1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256325732583259326032613262326332643265326632673268326932703271327232733274327532763277327832793280328132823283328432853286328732883289329032913292329332943295329632973298329933003301330233033304330533063307330833093310331133123313331433153316331733183319332033213322332333243325332633273328332933303331333233333334333533363337333833393340334133423343334433453346334733483349335033513352335333543355335633573358335933603361336233633364336533663367336833693370337133723373337433753376337733783379338033813382338333843385338633873388338933903391339233933394339533963397339833993400340134023403340434053406340734083409341034113412341334143415341634173418341934203421342234233424342534263427342834293430343134323433343434353436343734383439344034413442344334443445344634473448344934503451345234533454345534563457345834593460346134623463346434653466346734683469347034713472347334743475347634773478347934803481348234833484348534863487348834893490349134923493349434953496349734983499350035013502350335043505350635073508350935103511351235133514351535163517351835193520352135223523352435253526352735283529353035313532353335343535353635373538353935403541354235433544354535463547354835493550355135523553355435553556355735583559356035613562356335643565356635673568356935703571357235733574357535763577357835793580358135823583358435853586358735883589359035913592359335943595359635973598359936003601360236033604360536063607360836093610361136123613361436153616361736183619362036213622362336243625362636273628362936303631363236333634363536363637363836393640364136423643364436453646364736483649365036513652365336543655365636573658365936603661366236633664366536663667366836693670367136723673367436753676367736783679368036813682368336843685368636873688368936903691369236933694369536963697369836993700370137023703370437053706370737083709371037113712371337143715371637173718371937203721372237233724372537263727372837293730373137323733373437353736373737383739374037413742374337443745374637473748374937503751375237533754375537563757375837593760376137623763376437653766376737683769377037713772377337743775377637773778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427742784279428042814282428342844285428642874288428942904291429242934294429542964297429842994300430143024303430443054306430743084309431043114312431343144315431643174318431943204321432243234324432543264327432843294330433143324333433443354336433743384339434043414342434343444345434643474348434943504351435243534354435543564357435843594360436143624363436443654366436743684369437043714372437343744375437643774378437943804381438243834384438543864387438843894390439143924393439443954396439743984399440044014402440344044405440644074408440944104411441244134414441544164417441844194420442144224423442444254426442744284429443044314432443344344435443644374438443944404441444244434444444544464447444844494450445144524453445444554456445744584459446044614462446344644465446644674468446944704471447244734474447544764477447844794480448144824483448444854486448744884489449044914492449344944495449644974498449945004501450245034504450545064507450845094510451145124513451445154516451745184519452045214522452345244525452645274528452945304531453245334534453545364537453845394540454145424543454445454546454745484549455045514552455345544555455645574558455945604561456245634564456545664567456845694570457145724573457445754576457745784579458045814582458345844585458645874588458945904591459245934594459545964597459845994600460146024603460446054606460746084609461046114612461346144615461646174618461946204621462246234624462546264627462846294630463146324633463446354636463746384639464046414642464346444645464646474648464946504651465246534654465546564657465846594660466146624663466446654666466746684669467046714672467346744675467646774678467946804681468246834684468546864687468846894690469146924693469446954696469746984699470047014702470347044705470647074708470947104711471247134714471547164717471847194720472147224723472447254726472747284729473047314732473347344735473647374738473947404741474247434744474547464747474847494750475147524753475447554756475747584759476047614762476347644765476647674768476947704771477247734774477547764777477847794780478147824783478447854786478747884789479047914792479347944795479647974798479948004801480248034804480548064807480848094810481148124813481448154816481748184819482048214822482348244825482648274828482948304831483248334834483548364837483848394840484148424843484448454846484748484849485048514852485348544855485648574858485948604861486248634864486548664867486848694870487148724873487448754876487748784879488048814882488348844885488648874888488948904891489248934894489548964897489848994900490149024903490449054906490749084909491049114912491349144915491649174918491949204921492249234924492549264927492849294930493149324933493449354936493749384939494049414942494349444945494649474948494949504951495249534954495549564957 |
- """
- wagtail.models is split into submodules for maintainability. All definitions intended as
- public should be imported here (with 'noqa' comments as required) and outside code should continue
- to import them from wagtail.models (e.g. `from wagtail.models import Site`, not
- `from wagtail.models.sites import Site`.)
- Submodules should take care to keep the direction of dependencies consistent; where possible they
- should implement low-level generic functionality which is then imported by higher-level models such
- as Page.
- """
- import functools
- import logging
- import uuid
- import warnings
- from io import StringIO
- from urllib.parse import urlparse
- from django import forms
- from django.conf import settings
- from django.contrib.auth.models import Group
- from django.contrib.contenttypes.fields import (
- GenericForeignKey,
- GenericRel,
- GenericRelation,
- )
- from django.contrib.contenttypes.models import ContentType
- from django.core import checks
- from django.core.cache import cache
- from django.core.exceptions import (
- ImproperlyConfigured,
- PermissionDenied,
- ValidationError,
- )
- from django.core.handlers.base import BaseHandler
- from django.core.handlers.wsgi import WSGIRequest
- from django.core.serializers.json import DjangoJSONEncoder
- from django.db import models, transaction
- from django.db.models import DEFERRED, Q, Value
- from django.db.models.expressions import OuterRef, Subquery
- from django.db.models.functions import Cast, Concat, Substr
- from django.dispatch import receiver
- from django.http import Http404
- from django.template.response import TemplateResponse
- from django.urls import NoReverseMatch, reverse
- from django.utils import timezone
- from django.utils import translation as translation
- from django.utils.cache import patch_cache_control
- from django.utils.encoding import force_str
- from django.utils.functional import cached_property
- from django.utils.module_loading import import_string
- from django.utils.text import capfirst, slugify
- from django.utils.translation import gettext_lazy as _
- from modelcluster.fields import ParentalKey
- from modelcluster.models import (
- ClusterableModel,
- get_all_child_relations,
- get_serializable_data_for_fields,
- model_from_serializable_data,
- )
- from treebeard.mp_tree import MP_Node
- from wagtail.actions.copy_for_translation import CopyPageForTranslationAction
- from wagtail.actions.copy_page import CopyPageAction
- from wagtail.actions.create_alias import CreatePageAliasAction
- from wagtail.actions.delete_page import DeletePageAction
- from wagtail.actions.move_page import MovePageAction
- from wagtail.actions.publish_page_revision import PublishPageRevisionAction
- from wagtail.actions.publish_revision import PublishRevisionAction
- from wagtail.actions.unpublish import UnpublishAction
- from wagtail.actions.unpublish_page import UnpublishPageAction
- from wagtail.coreutils import (
- WAGTAIL_APPEND_SLASH,
- camelcase_to_underscore,
- get_supported_content_language_variant,
- resolve_model_string,
- )
- from wagtail.fields import RichTextField, StreamField
- from wagtail.forms import TaskStateCommentForm
- from wagtail.locks import BasicLock, ScheduledForPublishLock, WorkflowLock
- from wagtail.log_actions import log
- from wagtail.query import PageQuerySet
- from wagtail.search import index
- from wagtail.signals import (
- page_published,
- page_slug_changed,
- pre_validate_delete,
- task_approved,
- task_cancelled,
- task_rejected,
- task_submitted,
- workflow_approved,
- workflow_cancelled,
- workflow_rejected,
- workflow_submitted,
- )
- from wagtail.url_routing import RouteResult
- from wagtail.utils.deprecation import RemovedInWagtail50Warning
- from .audit_log import ( # noqa
- BaseLogEntry,
- BaseLogEntryManager,
- LogEntryQuerySet,
- ModelLogEntry,
- )
- from .collections import ( # noqa
- BaseCollectionManager,
- Collection,
- CollectionManager,
- CollectionMember,
- CollectionViewRestriction,
- GroupCollectionPermission,
- GroupCollectionPermissionManager,
- get_root_collection_id,
- )
- from .copying import _copy, _copy_m2m_relations, _extract_field_data # noqa
- from .i18n import ( # noqa
- BootstrapTranslatableMixin,
- BootstrapTranslatableModel,
- Locale,
- LocaleManager,
- TranslatableMixin,
- bootstrap_translatable_model,
- get_translatable_models,
- )
- from .sites import Site, SiteManager, SiteRootPath # noqa
- from .view_restrictions import BaseViewRestriction
- logger = logging.getLogger("wagtail")
- PAGE_TEMPLATE_VAR = "page"
- COMMENTS_RELATION_NAME = getattr(
- settings, "WAGTAIL_COMMENTS_RELATION_NAME", "wagtail_admin_comments"
- )
- @receiver(pre_validate_delete, sender=Locale)
- def reassign_root_page_locale_on_delete(sender, instance, **kwargs):
- # if we're deleting the locale used on the root page node, reassign that to a new locale first
- root_page_with_this_locale = Page.objects.filter(depth=1, locale=instance)
- if root_page_with_this_locale.exists():
- # Select the default locale, if one exists and isn't the one being deleted
- try:
- new_locale = Locale.get_default()
- default_locale_is_ok = new_locale != instance
- except (Locale.DoesNotExist, LookupError):
- default_locale_is_ok = False
- if not default_locale_is_ok:
- # fall back on any remaining locale
- new_locale = Locale.all_objects.exclude(pk=instance.pk).first()
- root_page_with_this_locale.update(locale=new_locale)
- PAGE_MODEL_CLASSES = []
- def get_page_models():
- """
- Returns a list of all non-abstract Page model classes defined in this project.
- """
- return PAGE_MODEL_CLASSES
- def get_default_page_content_type():
- """
- Returns the content type to use as a default for pages whose content type
- has been deleted.
- """
- return ContentType.objects.get_for_model(Page)
- @functools.lru_cache(maxsize=None)
- def get_streamfield_names(model_class):
- return tuple(
- field.name
- for field in model_class._meta.concrete_fields
- if isinstance(field, StreamField)
- )
- class BasePageManager(models.Manager):
- def get_queryset(self):
- return self._queryset_class(self.model).order_by("path")
- PageManager = BasePageManager.from_queryset(PageQuerySet)
- class PageBase(models.base.ModelBase):
- """Metaclass for Page"""
- def __init__(cls, name, bases, dct):
- super(PageBase, cls).__init__(name, bases, dct)
- if "template" not in dct:
- # Define a default template path derived from the app name and model name
- cls.template = "%s/%s.html" % (
- cls._meta.app_label,
- camelcase_to_underscore(name),
- )
- if "ajax_template" not in dct:
- cls.ajax_template = None
- cls._clean_subpage_models = (
- None # to be filled in on first call to cls.clean_subpage_models
- )
- cls._clean_parent_page_models = (
- None # to be filled in on first call to cls.clean_parent_page_models
- )
- # All pages should be creatable unless explicitly set otherwise.
- # This attribute is not inheritable.
- if "is_creatable" not in dct:
- cls.is_creatable = not cls._meta.abstract
- if not cls._meta.abstract:
- # register this type in the list of page content types
- PAGE_MODEL_CLASSES.append(cls)
- class RevisionMixin(models.Model):
- """A mixin that allows a model to have revisions."""
- latest_revision = models.ForeignKey(
- "wagtailcore.Revision",
- related_name="+",
- verbose_name=_("latest revision"),
- on_delete=models.SET_NULL,
- null=True,
- blank=True,
- editable=False,
- )
- @property
- def revisions(self):
- """
- Returns revisions that belong to the object.
- Subclasses should define a
- :class:`~django.contrib.contenttypes.fields.GenericRelation` to
- :class:`~wagtail.models.Revision` and override this property to return
- that ``GenericRelation``. This allows subclasses to customise the
- ``related_query_name`` of the ``GenericRelation`` and add custom logic
- (e.g. to always use the specific instance in ``Page``).
- """
- return Revision.objects.filter(
- content_type=self.get_content_type(),
- object_id=self.pk,
- )
- def get_base_content_type(self):
- parents = self._meta.get_parent_list()
- # Get the last non-abstract parent in the MRO as the base_content_type.
- # Note: for_concrete_model=False means that the model can be a proxy model.
- if parents:
- return ContentType.objects.get_for_model(
- parents[-1], for_concrete_model=False
- )
- # This model doesn't inherit from a non-abstract model,
- # use it as the base_content_type.
- return ContentType.objects.get_for_model(self, for_concrete_model=False)
- def get_content_type(self):
- return ContentType.objects.get_for_model(self, for_concrete_model=False)
- def get_latest_revision(self):
- return self.latest_revision
- def get_latest_revision_as_object(self):
- """
- Returns the latest revision of the object as an instance of the model.
- If no latest revision exists, returns the object itself.
- """
- latest_revision = self.get_latest_revision()
- if latest_revision:
- return latest_revision.as_object()
- return self
- def serializable_data(self):
- try:
- return super().serializable_data()
- except AttributeError:
- return get_serializable_data_for_fields(self)
- @classmethod
- def from_serializable_data(cls, data, check_fks=True, strict_fks=False):
- try:
- return super().from_serializable_data(data, check_fks, strict_fks)
- except AttributeError:
- return model_from_serializable_data(
- cls, data, check_fks=check_fks, strict_fks=strict_fks
- )
- def with_content_json(self, content):
- """
- Returns a new version of the object with field values updated to reflect changes
- in the provided ``content`` (which usually comes from a previously-saved revision).
- Certain field values are preserved in order to prevent errors if the returned
- object is saved, such as ``id``. The following field values are also preserved,
- as they are considered to be meaningful to the object as a whole, rather than
- to a specific revision:
- * ``latest_revision``
- If :class:`~wagtail.models.TranslatableMixin` is applied, the following field values
- are also preserved:
- * ``translation_key``
- * ``locale``
- """
- obj = self.from_serializable_data(content)
- # This should definitely never change between revisions
- obj.pk = self.pk
- # Ensure other values that are meaningful for the object as a whole
- # (rather than to a specific revision) are preserved
- obj.latest_revision = self.latest_revision
- if isinstance(self, TranslatableMixin):
- obj.translation_key = self.translation_key
- obj.locale = self.locale
- return obj
- def _update_from_revision(self, revision, changed=True):
- self.latest_revision = revision
- self.save(update_fields=["latest_revision"])
- def save_revision(
- self,
- user=None,
- submitted_for_moderation=False,
- approved_go_live_at=None,
- changed=True,
- log_action=False,
- previous_revision=None,
- clean=True,
- ):
- """
- Creates and saves a revision.
- :param user: The user performing the action.
- :param submitted_for_moderation: Indicates whether the object was submitted for moderation.
- :param approved_go_live_at: The date and time the revision is approved to go live.
- :param changed: Indicates whether there were any content changes.
- :param log_action: Flag for logging the action. Pass ``False`` to skip logging. Can be passed an action string.
- Defaults to ``"wagtail.edit"`` when no ``previous_revision`` param is passed, otherwise ``"wagtail.revert"``.
- :param previous_revision: Indicates a revision reversal. Should be set to the previous revision instance.
- :type previous_revision: Revision
- :param clean: Set this to ``False`` to skip cleaning object content before saving this revision.
- :return: The newly created revision.
- """
- if clean:
- self.full_clean()
- revision = Revision.objects.create(
- content_object=self,
- base_content_type=self.get_base_content_type(),
- submitted_for_moderation=submitted_for_moderation,
- user=user,
- approved_go_live_at=approved_go_live_at,
- content=self.serializable_data(),
- object_str=str(self),
- )
- self._update_from_revision(revision, changed)
- logger.info(
- 'Edited: "%s" pk=%d revision_id=%d', str(self), self.pk, revision.id
- )
- if log_action:
- if not previous_revision:
- log(
- instance=self,
- action=log_action
- if isinstance(log_action, str)
- else "wagtail.edit",
- user=user,
- revision=revision,
- content_changed=changed,
- )
- else:
- log(
- instance=self,
- action=log_action
- if isinstance(log_action, str)
- else "wagtail.revert",
- user=user,
- data={
- "revision": {
- "id": previous_revision.id,
- "created": previous_revision.created_at.strftime(
- "%d %b %Y %H:%M"
- ),
- }
- },
- revision=revision,
- content_changed=changed,
- )
- return revision
- class Meta:
- abstract = True
- class DraftStateMixin(models.Model):
- live = models.BooleanField(verbose_name=_("live"), default=True, editable=False)
- has_unpublished_changes = models.BooleanField(
- verbose_name=_("has unpublished changes"), default=False, editable=False
- )
- first_published_at = models.DateTimeField(
- verbose_name=_("first published at"), blank=True, null=True, db_index=True
- )
- last_published_at = models.DateTimeField(
- verbose_name=_("last published at"), null=True, editable=False
- )
- live_revision = models.ForeignKey(
- "wagtailcore.Revision",
- related_name="+",
- verbose_name=_("live revision"),
- on_delete=models.SET_NULL,
- null=True,
- blank=True,
- editable=False,
- )
- go_live_at = models.DateTimeField(
- verbose_name=_("go live date/time"), blank=True, null=True
- )
- expire_at = models.DateTimeField(
- verbose_name=_("expiry date/time"), blank=True, null=True
- )
- expired = models.BooleanField(
- verbose_name=_("expired"), default=False, editable=False
- )
- class Meta:
- abstract = True
- @classmethod
- def check(cls, **kwargs):
- return [
- *super().check(**kwargs),
- *cls._check_revision_mixin(),
- ]
- @classmethod
- def _check_revision_mixin(cls):
- mro = cls.mro()
- error = checks.Error(
- "DraftStateMixin requires RevisionMixin to be applied after DraftStateMixin.",
- hint="Add RevisionMixin to the model's base classes after DraftStateMixin.",
- obj=cls,
- id="wagtailcore.E004",
- )
- try:
- if mro.index(RevisionMixin) < mro.index(DraftStateMixin):
- return [error]
- except ValueError:
- return [error]
- return []
- @property
- def approved_schedule(self):
- return self.scheduled_revision is not None
- @property
- def status_string(self):
- if not self.live:
- if self.expired:
- return _("expired")
- elif self.approved_schedule:
- return _("scheduled")
- else:
- return _("draft")
- else:
- if self.approved_schedule:
- return _("live + scheduled")
- elif self.has_unpublished_changes:
- return _("live + draft")
- else:
- return _("live")
- def publish(
- self, revision, user=None, changed=True, log_action=True, previous_revision=None
- ):
- """
- Publish a revision of the object by applying the changes in the revision to the live object.
- :param revision: Revision to publish.
- :type revision: Revision
- :param user: The publishing user.
- :param changed: Indicated whether content has changed.
- :param log_action: Flag for the logging action, pass ``False`` to skip logging.
- :param previous_revision: Indicates a revision reversal. Should be set to the previous revision instance.
- :type previous_revision: Revision
- """
- return PublishRevisionAction(
- revision,
- user=user,
- changed=changed,
- log_action=log_action,
- previous_revision=previous_revision,
- ).execute()
- def unpublish(self, set_expired=False, commit=True, user=None, log_action=True):
- """
- Unpublish the live object.
- :param set_expired: Mark the object as expired.
- :param commit: Commit the changes to the database.
- :param user: The unpublishing user.
- :param log_action: Flag for the logging action, pass ``False`` to skip logging.
- """
- return UnpublishAction(
- self,
- set_expired=set_expired,
- commit=commit,
- user=user,
- log_action=log_action,
- ).execute()
- def with_content_json(self, content):
- """
- Similar to :meth:`RevisionMixin.with_content_json`,
- but with the following fields also preserved:
- * ``live``
- * ``has_unpublished_changes``
- * ``first_published_at``
- """
- obj = super().with_content_json(content)
- # Ensure other values that are meaningful for the object as a whole (rather than
- # to a specific revision) are preserved
- obj.live = self.live
- obj.has_unpublished_changes = self.has_unpublished_changes
- obj.first_published_at = self.first_published_at
- return obj
- def get_latest_revision_as_object(self):
- if not self.has_unpublished_changes:
- # Use the live database copy in preference to the revision record, as:
- # 1) this will pick up any changes that have been made directly to the model,
- # such as automated data imports;
- # 2) it ensures that inline child objects pick up real database IDs even if
- # those are absent from the revision data. (If this wasn't the case, the child
- # objects would be recreated with new IDs on next publish - see #1853)
- return self
- latest_revision = self.get_latest_revision()
- if latest_revision:
- return latest_revision.as_object()
- else:
- return self
- @cached_property
- def scheduled_revision(self):
- return self.revisions.filter(approved_go_live_at__isnull=False).first()
- def get_scheduled_revision_as_object(self):
- scheduled_revision = self.scheduled_revision
- return scheduled_revision and scheduled_revision.as_object()
- def _update_from_revision(self, revision, changed=True):
- update_fields = ["latest_revision"]
- self.latest_revision = revision
- if changed:
- self.has_unpublished_changes = True
- update_fields.append("has_unpublished_changes")
- self.save(update_fields=update_fields)
- class PreviewableMixin:
- """A mixin that allows a model to have previews."""
- def make_preview_request(
- self, original_request=None, preview_mode=None, extra_request_attrs=None
- ):
- """
- Simulate a request to this object, by constructing a fake HttpRequest object that is (as far
- as possible) representative of a real request to this object's front-end URL, and invoking
- serve_preview with that request (and the given preview_mode).
- Used for previewing / moderation and any other place where we
- want to display a view of this object in the admin interface without going through the regular
- page routing logic.
- If you pass in a real request object as original_request, additional information (e.g. client IP, cookies)
- will be included in the dummy request.
- """
- dummy_meta = self._get_dummy_headers(original_request)
- request = WSGIRequest(dummy_meta)
- # Add a flag to let middleware know that this is a dummy request.
- request.is_dummy = True
- if extra_request_attrs:
- for k, v in extra_request_attrs.items():
- setattr(request, k, v)
- obj = self
- # Build a custom django.core.handlers.BaseHandler subclass that invokes serve_preview as
- # the eventual view function called at the end of the middleware chain, rather than going
- # through the URL resolver
- class Handler(BaseHandler):
- def _get_response(self, request):
- request.is_preview = True
- request.preview_mode = preview_mode
- response = obj.serve_preview(request, preview_mode)
- if hasattr(response, "render") and callable(response.render):
- response = response.render()
- patch_cache_control(response, private=True)
- return response
- # Invoke this custom handler.
- handler = Handler()
- handler.load_middleware()
- return handler.get_response(request)
- def _get_dummy_headers(self, original_request=None):
- """
- Return a dict of META information to be included in a faked HttpRequest object to pass to
- serve_preview.
- """
- url = self._get_dummy_header_url(original_request)
- if url:
- url_info = urlparse(url)
- hostname = url_info.hostname
- path = url_info.path
- port = url_info.port or (443 if url_info.scheme == "https" else 80)
- scheme = url_info.scheme
- else:
- # Cannot determine a URL to this object - cobble one together based on
- # whatever we find in ALLOWED_HOSTS
- try:
- hostname = settings.ALLOWED_HOSTS[0]
- if hostname == "*":
- # '*' is a valid value to find in ALLOWED_HOSTS[0], but it's not a valid domain name.
- # So we pretend it isn't there.
- raise IndexError
- except IndexError:
- hostname = "localhost"
- path = "/"
- port = 80
- scheme = "http"
- http_host = hostname
- if port != (443 if scheme == "https" else 80):
- http_host = "%s:%s" % (http_host, port)
- dummy_values = {
- "REQUEST_METHOD": "GET",
- "PATH_INFO": path,
- "SERVER_NAME": hostname,
- "SERVER_PORT": port,
- "SERVER_PROTOCOL": "HTTP/1.1",
- "HTTP_HOST": http_host,
- "wsgi.version": (1, 0),
- "wsgi.input": StringIO(),
- "wsgi.errors": StringIO(),
- "wsgi.url_scheme": scheme,
- "wsgi.multithread": True,
- "wsgi.multiprocess": True,
- "wsgi.run_once": False,
- }
- # Add important values from the original request object, if it was provided.
- HEADERS_FROM_ORIGINAL_REQUEST = [
- "REMOTE_ADDR",
- "HTTP_X_FORWARDED_FOR",
- "HTTP_COOKIE",
- "HTTP_USER_AGENT",
- "HTTP_AUTHORIZATION",
- "wsgi.version",
- "wsgi.multithread",
- "wsgi.multiprocess",
- "wsgi.run_once",
- ]
- if settings.SECURE_PROXY_SSL_HEADER:
- HEADERS_FROM_ORIGINAL_REQUEST.append(settings.SECURE_PROXY_SSL_HEADER[0])
- if original_request:
- for header in HEADERS_FROM_ORIGINAL_REQUEST:
- if header in original_request.META:
- dummy_values[header] = original_request.META[header]
- return dummy_values
- def _get_dummy_header_url(self, original_request=None):
- """
- Return the URL that _get_dummy_headers() should use to set META headers
- for the faked HttpRequest.
- """
- return self.full_url
- def get_full_url(self):
- return None
- full_url = property(get_full_url)
- DEFAULT_PREVIEW_MODES = [("", _("Default"))]
- @property
- def preview_modes(self):
- """
- A list of ``(internal_name, display_name)`` tuples for the modes in which
- this object can be displayed for preview/moderation purposes. Ordinarily an object
- will only have one display mode, but subclasses can override this -
- for example, a page containing a form might have a default view of the form,
- and a post-submission 'thank you' page.
- Set to ``[]`` to completely disable previewing for this model.
- """
- return PreviewableMixin.DEFAULT_PREVIEW_MODES
- @property
- def default_preview_mode(self):
- """
- The default preview mode to use in live preview.
- This default is also used in areas that do not give the user the option of selecting a
- mode explicitly, e.g. in the moderator approval workflow.
- If ``preview_modes`` is empty, an ``IndexError`` will be raised.
- """
- return self.preview_modes[0][0]
- def is_previewable(self):
- """Returns ``True`` if at least one preview mode is specified in ``preview_modes``."""
- return bool(self.preview_modes)
- def serve_preview(self, request, mode_name):
- """
- Returns an HTTP response for use in object previews.
- This method can be overridden to implement custom rendering and/or
- routing logic.
- Any templates rendered during this process should use the ``request``
- object passed here - this ensures that ``request.user`` and other
- properties are set appropriately for the wagtail user bar to be
- displayed/hidden. This request will always be a GET.
- """
- return TemplateResponse(
- request,
- self.get_preview_template(request, mode_name),
- self.get_preview_context(request, mode_name),
- )
- def get_preview_context(self, request, mode_name):
- """
- Returns a context dictionary for use in templates for previewing this object.
- """
- return {"object": self, "request": request}
- def get_preview_template(self, request, mode_name):
- """
- Returns a template to be used when previewing this object.
- Subclasses of ``PreviewableMixin`` must override this method to return the
- template name to be used in the preview. Alternatively, subclasses can also
- override the ``serve_preview`` method to completely customise the preview
- rendering logic.
- """
- raise ImproperlyConfigured(
- "%s (subclass of PreviewableMixin) must override get_preview_template or serve_preview"
- % type(self).__name__
- )
- class LockableMixin(models.Model):
- locked = models.BooleanField(
- verbose_name=_("locked"), default=False, editable=False
- )
- locked_at = models.DateTimeField(
- verbose_name=_("locked at"), null=True, editable=False
- )
- locked_by = models.ForeignKey(
- settings.AUTH_USER_MODEL,
- verbose_name=_("locked by"),
- null=True,
- blank=True,
- editable=False,
- on_delete=models.SET_NULL,
- related_name="locked_pages",
- )
- locked_by.wagtail_reference_index_ignore = True
- class Meta:
- abstract = True
- def with_content_json(self, content):
- """
- Returns a new version of the object with field values updated to reflect changes
- in the provided ``content`` (which usually comes from a previously-saved revision).
- Certain field values are preserved in order to prevent errors if the returned
- object is saved, such as ``id``. The following field values are also preserved,
- as they are considered to be meaningful to the object as a whole, rather than
- to a specific revision:
- * ``locked``
- * ``locked_at``
- * ``locked_by``
- """
- obj = super().with_content_json(content)
- # Ensure other values that are meaningful for the object as a whole (rather than
- # to a specific revision) are preserved
- obj.locked = self.locked
- obj.locked_at = self.locked_at
- obj.locked_by = self.locked_by
- return obj
- def get_lock(self):
- """
- Returns a sub-class of BaseLock if the instance is locked, otherwise None
- """
- if self.locked:
- return BasicLock(self)
- if isinstance(self, DraftStateMixin) and self.scheduled_revision:
- return ScheduledForPublishLock(self)
- class AbstractPage(
- LockableMixin,
- PreviewableMixin,
- DraftStateMixin,
- RevisionMixin,
- TranslatableMixin,
- MP_Node,
- ):
- """
- Abstract superclass for Page. According to Django's inheritance rules, managers set on
- abstract models are inherited by subclasses, but managers set on concrete models that are extended
- via multi-table inheritance are not. We therefore need to attach PageManager to an abstract
- superclass to ensure that it is retained by subclasses of Page.
- """
- objects = PageManager()
- class Meta:
- abstract = True
- class Page(AbstractPage, index.Indexed, ClusterableModel, metaclass=PageBase):
- title = models.CharField(
- verbose_name=_("title"),
- max_length=255,
- help_text=_("The page title as you'd like it to be seen by the public"),
- )
- # to reflect title of a current draft in the admin UI
- draft_title = models.CharField(max_length=255, editable=False)
- slug = models.SlugField(
- verbose_name=_("slug"),
- allow_unicode=True,
- max_length=255,
- help_text=_(
- "The name of the page as it will appear in URLs e.g http://domain.com/blog/[my-slug]/"
- ),
- )
- content_type = models.ForeignKey(
- ContentType,
- verbose_name=_("content type"),
- related_name="pages",
- on_delete=models.SET(get_default_page_content_type),
- )
- content_type.wagtail_reference_index_ignore = True
- url_path = models.TextField(verbose_name=_("URL path"), blank=True, editable=False)
- owner = models.ForeignKey(
- settings.AUTH_USER_MODEL,
- verbose_name=_("owner"),
- null=True,
- blank=True,
- editable=True,
- on_delete=models.SET_NULL,
- related_name="owned_pages",
- )
- owner.wagtail_reference_index_ignore = True
- seo_title = models.CharField(
- verbose_name=_("title tag"),
- max_length=255,
- blank=True,
- help_text=_(
- "The name of the page displayed on search engine results as the clickable headline."
- ),
- )
- show_in_menus_default = False
- show_in_menus = models.BooleanField(
- verbose_name=_("show in menus"),
- default=False,
- help_text=_(
- "Whether a link to this page will appear in automatically generated menus"
- ),
- )
- search_description = models.TextField(
- verbose_name=_("meta description"),
- blank=True,
- help_text=_(
- "The descriptive text displayed underneath a headline in search engine results."
- ),
- )
- latest_revision_created_at = models.DateTimeField(
- verbose_name=_("latest revision created at"), null=True, editable=False
- )
- _revisions = GenericRelation("wagtailcore.Revision", related_query_name="page")
- # If non-null, this page is an alias of the linked page
- # This means the page is kept in sync with the live version
- # of the linked pages and is not editable by users.
- alias_of = models.ForeignKey(
- "self",
- on_delete=models.SET_NULL,
- null=True,
- blank=True,
- editable=False,
- related_name="aliases",
- )
- alias_of.wagtail_reference_index_ignore = True
- search_fields = [
- index.SearchField("title", partial_match=True, boost=2),
- index.AutocompleteField("title"),
- index.FilterField("title"),
- index.FilterField("id"),
- index.FilterField("live"),
- index.FilterField("owner"),
- index.FilterField("content_type"),
- index.FilterField("path"),
- index.FilterField("depth"),
- index.FilterField("locked"),
- index.FilterField("show_in_menus"),
- index.FilterField("first_published_at"),
- index.FilterField("last_published_at"),
- index.FilterField("latest_revision_created_at"),
- index.FilterField("locale"),
- index.FilterField("translation_key"),
- ]
- # Do not allow plain Page instances to be created through the Wagtail admin
- is_creatable = False
- # Define the maximum number of instances this page type can have. Default to unlimited.
- max_count = None
- # Define the maximum number of instances this page can have under a specific parent. Default to unlimited.
- max_count_per_parent = None
- # An array of additional field names that will not be included when a Page is copied.
- exclude_fields_in_copy = []
- default_exclude_fields_in_copy = [
- "id",
- "path",
- "depth",
- "numchild",
- "url_path",
- "path",
- "postgres_index_entries",
- "index_entries",
- "latest_revision",
- COMMENTS_RELATION_NAME,
- ]
- # Define these attributes early to avoid masking errors. (Issue #3078)
- # The canonical definition is in wagtailadmin.panels.
- content_panels = []
- promote_panels = []
- settings_panels = []
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- if not self.id:
- # this model is being newly created
- # rather than retrieved from the db;
- if not self.content_type_id:
- # set content type to correctly represent the model class
- # that this was created as
- self.content_type = ContentType.objects.get_for_model(self)
- if "show_in_menus" not in kwargs:
- # if the value is not set on submit refer to the model setting
- self.show_in_menus = self.show_in_menus_default
- def __str__(self):
- return self.title
- @property
- def revisions(self):
- # Always use the specific page instance when querying for revisions as
- # they are always saved with the specific content_type.
- return self.specific_deferred._revisions
- def get_base_content_type(self):
- # We want to always use the default Page model's ContentType as the
- # base_content_type so that we can query for page revisions without
- # having to know the specific Page type.
- return get_default_page_content_type()
- def get_content_type(self):
- return self.content_type
- @classmethod
- def get_streamfield_names(cls):
- return get_streamfield_names(cls)
- def set_url_path(self, parent):
- """
- Populate the url_path field based on this page's slug and the specified parent page.
- (We pass a parent in here, rather than retrieving it via get_parent, so that we can give
- new unsaved pages a meaningful URL when previewing them; at that point the page has not
- been assigned a position in the tree, as far as treebeard is concerned.
- """
- if parent:
- self.url_path = parent.url_path + self.slug + "/"
- else:
- # a page without a parent is the tree root, which always has a url_path of '/'
- self.url_path = "/"
- return self.url_path
- @staticmethod
- def _slug_is_available(slug, parent_page, page=None):
- """
- Determine whether the given slug is available for use on a child page of
- parent_page. If 'page' is passed, the slug is intended for use on that page
- (and so it will be excluded from the duplicate check).
- """
- if parent_page is None:
- # the root page's slug can be whatever it likes...
- return True
- siblings = parent_page.get_children()
- if page:
- siblings = siblings.not_page(page)
- return not siblings.filter(slug=slug).exists()
- def _get_autogenerated_slug(self, base_slug):
- candidate_slug = base_slug
- suffix = 1
- parent_page = self.get_parent()
- while not Page._slug_is_available(candidate_slug, parent_page, self):
- # try with incrementing suffix until we find a slug which is available
- suffix += 1
- candidate_slug = "%s-%d" % (base_slug, suffix)
- return candidate_slug
- def get_default_locale(self):
- """
- Finds the default locale to use for this page.
- This will be called just before the initial save.
- """
- parent = self.get_parent()
- if parent is not None:
- return (
- parent.specific_class.objects.defer()
- .select_related("locale")
- .get(id=parent.id)
- .locale
- )
- return super().get_default_locale()
- def full_clean(self, *args, **kwargs):
- # Apply fixups that need to happen before per-field validation occurs
- if not self.slug:
- # Try to auto-populate slug from title
- allow_unicode = getattr(settings, "WAGTAIL_ALLOW_UNICODE_SLUGS", True)
- base_slug = slugify(self.title, allow_unicode=allow_unicode)
- # only proceed if we get a non-empty base slug back from slugify
- if base_slug:
- self.slug = self._get_autogenerated_slug(base_slug)
- if not self.draft_title:
- self.draft_title = self.title
- # Set the locale
- if self.locale_id is None:
- self.locale = self.get_default_locale()
- super().full_clean(*args, **kwargs)
- def clean(self):
- super().clean()
- if not Page._slug_is_available(self.slug, self.get_parent(), self):
- raise ValidationError({"slug": _("This slug is already in use")})
- def is_site_root(self):
- """
- Returns True if this page is the root of any site.
- This includes translations of site root pages as well.
- """
- # `_is_site_root` may be populated by `annotate_site_root_state` on `PageQuerySet` as a
- # performance optimisation
- if hasattr(self, "_is_site_root"):
- return self._is_site_root
- return Site.objects.filter(
- root_page__translation_key=self.translation_key
- ).exists()
- @transaction.atomic
- # ensure that changes are only committed when we have updated all descendant URL paths, to preserve consistency
- def save(self, clean=True, user=None, log_action=False, **kwargs):
- """
- Overrides default method behaviour to make additional updates unique to pages,
- such as updating the ``url_path`` value of descendant page to reflect changes
- to this page's slug.
- New pages should generally be saved via the ``add_child()`` or ``add_sibling()``
- method of an existing page, which will correctly set the ``path`` and ``depth``
- fields on the new page before saving it.
- By default, pages are validated using ``full_clean()`` before attempting to
- save changes to the database, which helps to preserve validity when restoring
- pages from historic revisions (which might not necessarily reflect the current
- model state). This validation step can be bypassed by calling the method with
- ``clean=False``.
- """
- if clean:
- self.full_clean()
- slug_changed = False
- is_new = self.id is None
- if is_new:
- # we are creating a record. If we're doing things properly, this should happen
- # through a treebeard method like add_child, in which case the 'path' field
- # has been set and so we can safely call get_parent
- self.set_url_path(self.get_parent())
- else:
- # Check that we are committing the slug to the database
- # Basically: If update_fields has been specified, and slug is not included, skip this step
- if not (
- "update_fields" in kwargs and "slug" not in kwargs["update_fields"]
- ):
- # see if the slug has changed from the record in the db, in which case we need to
- # update url_path of self and all descendants. Even though we might not need it,
- # the specific page is fetched here for sending to the 'page_slug_changed' signal.
- old_record = Page.objects.get(id=self.id).specific
- if old_record.slug != self.slug:
- self.set_url_path(self.get_parent())
- slug_changed = True
- old_url_path = old_record.url_path
- new_url_path = self.url_path
- result = super().save(**kwargs)
- if slug_changed:
- self._update_descendant_url_paths(old_url_path, new_url_path)
- # Emit page_slug_changed signal on successful db commit
- transaction.on_commit(
- lambda: page_slug_changed.send(
- sender=self.specific_class or self.__class__,
- instance=self.specific,
- instance_before=old_record,
- )
- )
- # Check if this is a root page of any sites and clear the 'wagtail_site_root_paths' key if so
- # Note: New translations of existing site roots are considered site roots as well, so we must
- # always check if this page is a site root, even if it's new.
- if self.is_site_root():
- cache.delete("wagtail_site_root_paths")
- # Log
- if is_new:
- cls = type(self)
- logger.info(
- 'Page created: "%s" id=%d content_type=%s.%s path=%s',
- self.title,
- self.id,
- cls._meta.app_label,
- cls.__name__,
- self.url_path,
- )
- if log_action is not None:
- # The default for log_action is False. i.e. don't log unless specifically instructed
- # Page creation is a special case that we want logged by default, but allow skipping it
- # explicitly by passing log_action=None
- if is_new:
- log(
- instance=self,
- action="wagtail.create",
- user=user or self.owner,
- content_changed=True,
- )
- elif log_action:
- log(instance=self, action=log_action, user=user)
- return result
- def delete(self, *args, **kwargs):
- user = kwargs.pop("user", None)
- return DeletePageAction(self, user=user).execute(*args, **kwargs)
- @classmethod
- def check(cls, **kwargs):
- errors = super(Page, cls).check(**kwargs)
- # Check that foreign keys from pages are not configured to cascade
- # This is the default Django behaviour which must be explicitly overridden
- # to prevent pages disappearing unexpectedly and the tree being corrupted
- # get names of foreign keys pointing to parent classes (such as page_ptr)
- field_exceptions = [
- field.name
- for model in [cls] + list(cls._meta.get_parent_list())
- for field in model._meta.parents.values()
- if field
- ]
- for field in cls._meta.fields:
- if (
- isinstance(field, models.ForeignKey)
- and field.name not in field_exceptions
- ):
- if field.remote_field.on_delete == models.CASCADE:
- errors.append(
- checks.Warning(
- "Field hasn't specified on_delete action",
- hint="Set on_delete=models.SET_NULL and make sure the field is nullable or set on_delete=models.PROTECT. Wagtail does not allow simple database CASCADE because it will corrupt its tree storage.",
- obj=field,
- id="wagtailcore.W001",
- )
- )
- if not isinstance(cls.objects, PageManager):
- errors.append(
- checks.Error(
- "Manager does not inherit from PageManager",
- hint="Ensure that custom Page managers inherit from wagtail.models.PageManager",
- obj=cls,
- id="wagtailcore.E002",
- )
- )
- try:
- cls.clean_subpage_models()
- except (ValueError, LookupError) as e:
- errors.append(
- checks.Error(
- "Invalid subpage_types setting for %s" % cls,
- hint=str(e),
- id="wagtailcore.E002",
- )
- )
- try:
- cls.clean_parent_page_models()
- except (ValueError, LookupError) as e:
- errors.append(
- checks.Error(
- "Invalid parent_page_types setting for %s" % cls,
- hint=str(e),
- id="wagtailcore.E002",
- )
- )
- return errors
- def _update_descendant_url_paths(self, old_url_path, new_url_path):
- (
- Page.objects.filter(path__startswith=self.path)
- .exclude(pk=self.pk)
- .update(
- url_path=Concat(
- Value(new_url_path), Substr("url_path", len(old_url_path) + 1)
- )
- )
- )
- def get_specific(self, deferred=False, copy_attrs=None, copy_attrs_exclude=None):
- """
- Return this page in its most specific subclassed form.
- By default, a database query is made to fetch all field values for the
- specific object. If you only require access to custom methods or other
- non-field attributes on the specific object, you can use
- ``deferred=True`` to avoid this query. However, any attempts to access
- specific field values from the returned object will trigger additional
- database queries.
- By default, references to all non-field attribute values are copied
- from current object to the returned one. This includes:
- * Values set by a queryset, for example: annotations, or values set as
- a result of using ``select_related()`` or ``prefetch_related()``.
- * Any ``cached_property`` values that have been evaluated.
- * Attributes set elsewhere in Python code.
- For fine-grained control over which non-field values are copied to the
- returned object, you can use ``copy_attrs`` to specify a complete list
- of attribute names to include. Alternatively, you can use
- ``copy_attrs_exclude`` to specify a list of attribute names to exclude.
- If called on a page object that is already an instance of the most
- specific class (e.g. an ``EventPage``), the object will be returned
- as is, and no database queries or other operations will be triggered.
- If the page was originally created using a page type that has since
- been removed from the codebase, a generic ``Page`` object will be
- returned (without any custom field values or other functionality
- present on the original class). Usually, deleting these pages is the
- best course of action, but there is currently no safe way for Wagtail
- to do that at migration time.
- """
- model_class = self.specific_class
- if model_class is None:
- # The codebase and database are out of sync (e.g. the model exists
- # on a different git branch and migrations were not applied or
- # reverted before switching branches). So, the best we can do is
- # return the page in it's current form.
- return self
- if isinstance(self, model_class):
- # self is already an instance of the most specific class.
- return self
- if deferred:
- # Generate a tuple of values in the order expected by __init__(),
- # with missing values substituted with DEFERRED ()
- values = tuple(
- getattr(self, f.attname, self.pk if f.primary_key else DEFERRED)
- for f in model_class._meta.concrete_fields
- )
- # Create object from known attribute values
- specific_obj = model_class(*values)
- specific_obj._state.adding = self._state.adding
- else:
- # Fetch object from database
- specific_obj = model_class._default_manager.get(id=self.id)
- # Copy non-field attribute values
- if copy_attrs is not None:
- for attr in (attr for attr in copy_attrs if attr in self.__dict__):
- setattr(specific_obj, attr, getattr(self, attr))
- else:
- exclude = copy_attrs_exclude or ()
- for k, v in ((k, v) for k, v in self.__dict__.items() if k not in exclude):
- # only set values that haven't already been set
- specific_obj.__dict__.setdefault(k, v)
- return specific_obj
- @cached_property
- def specific(self):
- """
- Returns this page in its most specific subclassed form with all field
- values fetched from the database. The result is cached in memory.
- """
- return self.get_specific()
- @cached_property
- def specific_deferred(self):
- """
- Returns this page in its most specific subclassed form without any
- additional field values being fetched from the database. The result
- is cached in memory.
- """
- return self.get_specific(deferred=True)
- @cached_property
- def specific_class(self):
- """
- Return the class that this page would be if instantiated in its
- most specific form.
- If the model class can no longer be found in the codebase, and the
- relevant ``ContentType`` has been removed by a database migration,
- the return value will be ``None``.
- If the model class can no longer be found in the codebase, but the
- relevant ``ContentType`` is still present in the database (usually a
- result of switching between git branches without running or reverting
- database migrations beforehand), the return value will be ``None``.
- """
- return self.cached_content_type.model_class()
- @property
- def cached_content_type(self):
- """
- Return this page's ``content_type`` value from the ``ContentType``
- model's cached manager, which will avoid a database query if the
- object is already in memory.
- """
- return ContentType.objects.get_for_id(self.content_type_id)
- @property
- def page_type_display_name(self):
- """
- A human-readable version of this page's type
- """
- if not self.specific_class or self.is_root():
- return ""
- else:
- return self.specific_class.get_verbose_name()
- def route(self, request, path_components):
- if path_components:
- # request is for a child of this page
- child_slug = path_components[0]
- remaining_components = path_components[1:]
- try:
- subpage = self.get_children().get(slug=child_slug)
- except Page.DoesNotExist:
- raise Http404
- return subpage.specific.route(request, remaining_components)
- else:
- # request is for this very page
- if self.live:
- return RouteResult(self)
- else:
- raise Http404
- def get_admin_display_title(self):
- """
- Return the title for this page as it should appear in the admin backend;
- override this if you wish to display extra contextual information about the page,
- such as language. By default, returns ``draft_title``.
- """
- # Fall back on title if draft_title is blank (which may happen if the page was created
- # in a fixture or migration that didn't explicitly handle draft_title)
- return self.draft_title or self.title
- def save_revision(
- self,
- user=None,
- submitted_for_moderation=False,
- approved_go_live_at=None,
- changed=True,
- log_action=False,
- previous_revision=None,
- clean=True,
- ):
- # Raise error if this is not the specific version of the page
- if not isinstance(self, self.specific_class):
- raise RuntimeError(
- "page.save_revision() must be called on the specific version of the page. "
- "Call page.specific.save_revision() instead."
- )
- # Raise an error if this page is an alias.
- if self.alias_of_id:
- raise RuntimeError(
- "page.save_revision() was called on an alias page. "
- "Revisions are not required for alias pages as they are an exact copy of another page."
- )
- if clean:
- self.full_clean()
- new_comments = getattr(self, COMMENTS_RELATION_NAME).filter(pk__isnull=True)
- for comment in new_comments:
- # We need to ensure comments have an id in the revision, so positions can be identified correctly
- comment.save()
- revision = Revision.objects.create(
- content_object=self,
- base_content_type=self.get_base_content_type(),
- submitted_for_moderation=submitted_for_moderation,
- user=user,
- approved_go_live_at=approved_go_live_at,
- content=self.serializable_data(),
- object_str=str(self),
- )
- for comment in new_comments:
- comment.revision_created = revision
- self.latest_revision_created_at = revision.created_at
- self.draft_title = self.title
- self.latest_revision = revision
- update_fields = [
- COMMENTS_RELATION_NAME,
- "latest_revision_created_at",
- "draft_title",
- "latest_revision",
- ]
- if changed:
- self.has_unpublished_changes = True
- update_fields.append("has_unpublished_changes")
- if update_fields:
- # clean=False because the fields we're updating don't need validation
- self.save(update_fields=update_fields, clean=False)
- # Log
- logger.info(
- 'Page edited: "%s" id=%d revision_id=%d', self.title, self.id, revision.id
- )
- if log_action:
- if not previous_revision:
- log(
- instance=self,
- action=log_action
- if isinstance(log_action, str)
- else "wagtail.edit",
- user=user,
- revision=revision,
- content_changed=changed,
- )
- else:
- log(
- instance=self,
- action=log_action
- if isinstance(log_action, str)
- else "wagtail.revert",
- user=user,
- data={
- "revision": {
- "id": previous_revision.id,
- "created": previous_revision.created_at.strftime(
- "%d %b %Y %H:%M"
- ),
- }
- },
- revision=revision,
- content_changed=changed,
- )
- if submitted_for_moderation:
- logger.info(
- 'Page submitted for moderation: "%s" id=%d revision_id=%d',
- self.title,
- self.id,
- revision.id,
- )
- return revision
- def get_latest_revision_as_page(self):
- warnings.warn(
- "Pages should use .get_latest_revision_as_object() instead of "
- ".get_latest_revision_as_page() to retrieve the latest revision as a "
- "Page instance.",
- category=RemovedInWagtail50Warning,
- stacklevel=2,
- )
- return self.get_latest_revision_as_object()
- def get_latest_revision_as_object(self):
- if not self.has_unpublished_changes:
- # Use the live database copy in preference to the revision record, as:
- # 1) this will pick up any changes that have been made directly to the model,
- # such as automated data imports;
- # 2) it ensures that inline child objects pick up real database IDs even if
- # those are absent from the revision data. (If this wasn't the case, the child
- # objects would be recreated with new IDs on next publish - see #1853)
- return self.specific
- latest_revision = self.get_latest_revision()
- if latest_revision:
- return latest_revision.as_object()
- else:
- return self.specific
- def update_aliases(
- self, *, revision=None, user=None, _content=None, _updated_ids=None
- ):
- """
- Publishes all aliases that follow this page with the latest content from this page.
- This is called by Wagtail whenever a page with aliases is published.
- :param revision: The revision of the original page that we are updating to (used for logging purposes).
- :type revision: Revision, optional
- :param user: The user who is publishing (used for logging purposes).
- :type user: User, optional
- """
- specific_self = self.specific
- # Only compute this if necessary since it's quite a heavy operation
- if _content is None:
- _content = self.serializable_data()
- # A list of IDs that have already been updated. This is just in case someone has
- # created an alias loop (which is impossible to do with the UI Wagtail provides)
- _updated_ids = _updated_ids or []
- for alias in self.specific_class.objects.filter(alias_of=self).exclude(
- id__in=_updated_ids
- ):
- # FIXME: Switch to the same fields that are excluded from copy
- # We can't do this right now because we can't exclude fields from with_content_json
- exclude_fields = [
- "id",
- "path",
- "depth",
- "numchild",
- "url_path",
- "path",
- "index_entries",
- "postgres_index_entries",
- ]
- # Copy field content
- alias_updated = alias.with_content_json(_content)
- # Publish the alias if it's currently in draft
- alias_updated.live = True
- alias_updated.has_unpublished_changes = False
- # Copy child relations
- child_object_map = specific_self.copy_all_child_relations(
- target=alias_updated, exclude=exclude_fields
- )
- # Process child objects
- # This has two jobs:
- # - If the alias is in a different locale, this updates the
- # locale of any translatable child objects to match
- # - If the alias is not a translation of the original, this
- # changes the translation_key field of all child objects
- # so they do not clash
- if child_object_map:
- alias_is_translation = alias.translation_key == self.translation_key
- def process_child_object(child_object):
- if isinstance(child_object, TranslatableMixin):
- # Child object's locale must always match the page
- child_object.locale = alias_updated.locale
- # If the alias isn't a translation of the original page,
- # change the child object's translation_keys so they are
- # not either
- if not alias_is_translation:
- child_object.translation_key = uuid.uuid4()
- for (rel, previous_id), child_objects in child_object_map.items():
- if previous_id is None:
- for child_object in child_objects:
- process_child_object(child_object)
- else:
- process_child_object(child_objects)
- # Copy M2M relations
- _copy_m2m_relations(
- specific_self, alias_updated, exclude_fields=exclude_fields
- )
- # Don't change the aliases slug
- # Aliases can have their own slugs so they can be siblings of the original
- alias_updated.slug = alias.slug
- alias_updated.set_url_path(alias_updated.get_parent())
- # Aliases don't have revisions, so update fields that would normally be updated by save_revision
- alias_updated.draft_title = alias_updated.title
- alias_updated.latest_revision_created_at = self.latest_revision_created_at
- alias_updated.save(clean=False)
- page_published.send(
- sender=alias_updated.specific_class,
- instance=alias_updated,
- revision=revision,
- alias=True,
- )
- # Log the publish of the alias
- log(
- instance=alias_updated,
- action="wagtail.publish",
- user=user,
- )
- # Update any aliases of that alias
- # Design note:
- # It could be argued that this will be faster if we just changed these alias-of-alias
- # pages to all point to the original page and avoid having to update them recursively.
- #
- # But, it's useful to have a record of how aliases have been chained.
- # For example, In Wagtail Localize, we use aliases to create mirrored trees, but those
- # trees themselves could have aliases within them. If an alias within a tree is
- # converted to a regular page, we want the alias in the mirrored tree to follow that
- # new page and stop receiving updates from the original page.
- #
- # Doing it this way requires an extra lookup query per alias but this is small in
- # comparison to the work required to update the alias.
- alias.update_aliases(
- revision=revision,
- _content=_content,
- _updated_ids=_updated_ids,
- )
- update_aliases.alters_data = True
- def publish(
- self, revision, user=None, changed=True, log_action=True, previous_revision=None
- ):
- return PublishPageRevisionAction(
- revision,
- user=user,
- changed=changed,
- log_action=log_action,
- previous_revision=previous_revision,
- ).execute()
- def unpublish(self, set_expired=False, commit=True, user=None, log_action=True):
- return UnpublishPageAction(
- self,
- set_expired=set_expired,
- commit=commit,
- user=user,
- log_action=log_action,
- ).execute()
- context_object_name = None
- def get_context(self, request, *args, **kwargs):
- context = {
- PAGE_TEMPLATE_VAR: self,
- "self": self,
- "request": request,
- }
- if self.context_object_name:
- context[self.context_object_name] = self
- return context
- def get_preview_context(self, request, mode_name):
- return self.get_context(request)
- def get_template(self, request, *args, **kwargs):
- if request.headers.get("x-requested-with") == "XMLHttpRequest":
- return self.ajax_template or self.template
- else:
- return self.template
- def get_preview_template(self, request, mode_name):
- return self.get_template(request)
- def serve(self, request, *args, **kwargs):
- request.is_preview = False
- return TemplateResponse(
- request,
- self.get_template(request, *args, **kwargs),
- self.get_context(request, *args, **kwargs),
- )
- def is_navigable(self):
- """
- Return true if it's meaningful to browse subpages of this page -
- i.e. it currently has subpages,
- or it's at the top level (this rule necessary for empty out-of-the-box sites to have working navigation)
- """
- return (not self.is_leaf()) or self.depth == 2
- def _get_site_root_paths(self, request=None):
- """
- Return ``Site.get_site_root_paths()``, using the cached copy on the
- request object if available.
- """
- # if we have a request, use that to cache site_root_paths; otherwise, use self
- cache_object = request if request else self
- try:
- return cache_object._wagtail_cached_site_root_paths
- except AttributeError:
- cache_object._wagtail_cached_site_root_paths = Site.get_site_root_paths()
- return cache_object._wagtail_cached_site_root_paths
- def _get_relevant_site_root_paths(self, cache_object=None):
- """
- .. versionadded::2.16
- Returns a tuple of root paths for all sites this page belongs to.
- """
- return tuple(
- srp
- for srp in self._get_site_root_paths(cache_object)
- if self.url_path.startswith(srp.root_path)
- )
- def get_url_parts(self, request=None):
- """
- Determine the URL for this page and return it as a tuple of
- ``(site_id, site_root_url, page_url_relative_to_site_root)``.
- Return None if the page is not routable.
- This is used internally by the ``full_url``, ``url``, ``relative_url``
- and ``get_site`` properties and methods; pages with custom URL routing
- should override this method in order to have those operations return
- the custom URLs.
- Accepts an optional keyword argument ``request``, which may be used
- to avoid repeated database / cache lookups. Typically, a page model
- that overrides ``get_url_parts`` should not need to deal with
- ``request`` directly, and should just pass it to the original method
- when calling ``super``.
- """
- possible_sites = self._get_relevant_site_root_paths(request)
- if not possible_sites:
- return None
- site_id, root_path, root_url, language_code = possible_sites[0]
- site = Site.find_for_request(request)
- if site:
- for site_id, root_path, root_url, language_code in possible_sites:
- if site_id == site.pk:
- break
- else:
- site_id, root_path, root_url, language_code = possible_sites[0]
- use_wagtail_i18n = getattr(settings, "WAGTAIL_I18N_ENABLED", False)
- if use_wagtail_i18n:
- # If the active language code is a variant of the page's language, then
- # use that instead
- # This is used when LANGUAGES contain more languages than WAGTAIL_CONTENT_LANGUAGES
- try:
- if (
- get_supported_content_language_variant(translation.get_language())
- == language_code
- ):
- language_code = translation.get_language()
- except LookupError:
- # active language code is not a recognised content language, so leave
- # page's language code unchanged
- pass
- # The page may not be routable because wagtail_serve is not registered
- # This may be the case if Wagtail is used headless
- try:
- if use_wagtail_i18n:
- with translation.override(language_code):
- page_path = reverse(
- "wagtail_serve", args=(self.url_path[len(root_path) :],)
- )
- else:
- page_path = reverse(
- "wagtail_serve", args=(self.url_path[len(root_path) :],)
- )
- except NoReverseMatch:
- return (site_id, None, None)
- # Remove the trailing slash from the URL reverse generates if
- # WAGTAIL_APPEND_SLASH is False and we're not trying to serve
- # the root path
- if not WAGTAIL_APPEND_SLASH and page_path != "/":
- page_path = page_path.rstrip("/")
- return (site_id, root_url, page_path)
- def get_full_url(self, request=None):
- """Return the full URL (including protocol / domain) to this page, or None if it is not routable"""
- url_parts = self.get_url_parts(request=request)
- if url_parts is None or url_parts[1] is None and url_parts[2] is None:
- # page is not routable
- return
- site_id, root_url, page_path = url_parts
- return root_url + page_path
- full_url = property(get_full_url)
- def get_url(self, request=None, current_site=None):
- """
- Return the 'most appropriate' URL for referring to this page from the pages we serve,
- within the Wagtail backend and actual website templates;
- this is the local URL (starting with '/') if we're only running a single site
- (i.e. we know that whatever the current page is being served from, this link will be on the
- same domain), and the full URL (with domain) if not.
- Return None if the page is not routable.
- Accepts an optional but recommended ``request`` keyword argument that, if provided, will
- be used to cache site-level URL information (thereby avoiding repeated database / cache
- lookups) and, via the ``Site.find_for_request()`` function, determine whether a relative
- or full URL is most appropriate.
- """
- # ``current_site`` is purposefully undocumented, as one can simply pass the request and get
- # a relative URL based on ``Site.find_for_request()``. Nonetheless, support it here to avoid
- # copy/pasting the code to the ``relative_url`` method below.
- if current_site is None and request is not None:
- site = Site.find_for_request(request)
- current_site = site
- url_parts = self.get_url_parts(request=request)
- if url_parts is None or url_parts[1] is None and url_parts[2] is None:
- # page is not routable
- return
- site_id, root_url, page_path = url_parts
- # Get number of unique sites in root paths
- # Note: there may be more root paths to sites if there are multiple languages
- num_sites = len(
- {root_path[0] for root_path in self._get_site_root_paths(request)}
- )
- if (current_site is not None and site_id == current_site.id) or num_sites == 1:
- # the site matches OR we're only running a single site, so a local URL is sufficient
- return page_path
- else:
- return root_url + page_path
- url = property(get_url)
- def relative_url(self, current_site, request=None):
- """
- Return the 'most appropriate' URL for this page taking into account the site we're currently on;
- a local URL if the site matches, or a fully qualified one otherwise.
- Return None if the page is not routable.
- Accepts an optional but recommended ``request`` keyword argument that, if provided, will
- be used to cache site-level URL information (thereby avoiding repeated database / cache
- lookups).
- """
- return self.get_url(request=request, current_site=current_site)
- def get_site(self):
- """
- Return the Site object that this page belongs to.
- """
- url_parts = self.get_url_parts()
- if url_parts is None:
- # page is not routable
- return
- site_id, root_url, page_path = url_parts
- return Site.objects.get(id=site_id)
- @classmethod
- def get_indexed_objects(cls):
- content_type = ContentType.objects.get_for_model(cls)
- return super(Page, cls).get_indexed_objects().filter(content_type=content_type)
- def get_indexed_instance(self):
- # This is accessed on save by the wagtailsearch signal handler, and in edge
- # cases (e.g. loading test fixtures), may be called before the specific instance's
- # entry has been created. In those cases, we aren't ready to be indexed yet, so
- # return None.
- try:
- return self.specific
- except self.specific_class.DoesNotExist:
- return None
- @classmethod
- def clean_subpage_models(cls):
- """
- Returns the list of subpage types, normalised as model classes.
- Throws ValueError if any entry in subpage_types cannot be recognised as a model name,
- or LookupError if a model does not exist (or is not a Page subclass).
- """
- if cls._clean_subpage_models is None:
- subpage_types = getattr(cls, "subpage_types", None)
- if subpage_types is None:
- # if subpage_types is not specified on the Page class, allow all page types as subpages
- cls._clean_subpage_models = get_page_models()
- else:
- cls._clean_subpage_models = [
- resolve_model_string(model_string, cls._meta.app_label)
- for model_string in subpage_types
- ]
- for model in cls._clean_subpage_models:
- if not issubclass(model, Page):
- raise LookupError("%s is not a Page subclass" % model)
- return cls._clean_subpage_models
- @classmethod
- def clean_parent_page_models(cls):
- """
- Returns the list of parent page types, normalised as model classes.
- Throws ValueError if any entry in parent_page_types cannot be recognised as a model name,
- or LookupError if a model does not exist (or is not a Page subclass).
- """
- if cls._clean_parent_page_models is None:
- parent_page_types = getattr(cls, "parent_page_types", None)
- if parent_page_types is None:
- # if parent_page_types is not specified on the Page class, allow all page types as subpages
- cls._clean_parent_page_models = get_page_models()
- else:
- cls._clean_parent_page_models = [
- resolve_model_string(model_string, cls._meta.app_label)
- for model_string in parent_page_types
- ]
- for model in cls._clean_parent_page_models:
- if not issubclass(model, Page):
- raise LookupError("%s is not a Page subclass" % model)
- return cls._clean_parent_page_models
- @classmethod
- def allowed_parent_page_models(cls):
- """
- Returns the list of page types that this page type can be a subpage of,
- as a list of model classes
- """
- return [
- parent_model
- for parent_model in cls.clean_parent_page_models()
- if cls in parent_model.clean_subpage_models()
- ]
- @classmethod
- def allowed_subpage_models(cls):
- """
- Returns the list of page types that this page type can have as subpages,
- as a list of model classes
- """
- return [
- subpage_model
- for subpage_model in cls.clean_subpage_models()
- if cls in subpage_model.clean_parent_page_models()
- ]
- @classmethod
- def creatable_subpage_models(cls):
- """
- Returns the list of page types that may be created under this page type,
- as a list of model classes
- """
- return [
- page_model
- for page_model in cls.allowed_subpage_models()
- if page_model.is_creatable
- ]
- @classmethod
- def can_exist_under(cls, parent):
- """
- Checks if this page type can exist as a subpage under a parent page
- instance.
- See also: :func:`Page.can_create_at` and :func:`Page.can_move_to`
- """
- return cls in parent.specific_class.allowed_subpage_models()
- @classmethod
- def can_create_at(cls, parent):
- """
- Checks if this page type can be created as a subpage under a parent
- page instance.
- """
- can_create = cls.is_creatable and cls.can_exist_under(parent)
- if cls.max_count is not None:
- can_create = can_create and cls.objects.count() < cls.max_count
- if cls.max_count_per_parent is not None:
- can_create = (
- can_create
- and parent.get_children().type(cls).count() < cls.max_count_per_parent
- )
- return can_create
- def can_move_to(self, parent):
- """
- Checks if this page instance can be moved to be a subpage of a parent
- page instance.
- """
- # Prevent pages from being moved to different language sections
- # The only page that can have multi-lingual children is the root page
- parent_is_root = parent.depth == 1
- if not parent_is_root and parent.locale_id != self.locale_id:
- return False
- return self.can_exist_under(parent)
- @classmethod
- def get_verbose_name(cls):
- """
- Returns the human-readable "verbose name" of this page model e.g "Blog page".
- """
- # This is similar to doing cls._meta.verbose_name.title()
- # except this doesn't convert any characters to lowercase
- return capfirst(cls._meta.verbose_name)
- @classmethod
- def get_page_description(cls):
- """
- Returns a page description if it's set. For example "A multi-purpose web page".
- """
- description = getattr(cls, "page_description", None)
- # make sure that page_description is actually a string rather than a model field
- if isinstance(description, str):
- return description
- elif getattr(description, "_delegate_text", None):
- # description is a lazy object (e.g. the result of gettext_lazy())
- return str(description)
- else:
- return ""
- @property
- def status_string(self):
- if not self.live:
- if self.expired:
- return _("expired")
- elif self.approved_schedule:
- return _("scheduled")
- elif self.workflow_in_progress:
- return _("in moderation")
- else:
- return _("draft")
- else:
- if self.approved_schedule:
- return _("live + scheduled")
- elif self.workflow_in_progress:
- return _("live + in moderation")
- elif self.has_unpublished_changes:
- return _("live + draft")
- else:
- return _("live")
- @property
- def approved_schedule(self):
- # `_approved_schedule` may be populated by `annotate_approved_schedule` on `PageQuerySet` as a
- # performance optimisation
- if hasattr(self, "_approved_schedule"):
- return self._approved_schedule
- return self.scheduled_revision is not None
- def has_unpublished_subtree(self):
- """
- An awkwardly-defined flag used in determining whether unprivileged editors have
- permission to delete this article. Returns true if and only if this page is non-live,
- and it has no live children.
- """
- return (not self.live) and (
- not self.get_descendants().filter(live=True).exists()
- )
- def move(self, target, pos=None, user=None):
- """
- Extension to the treebeard 'move' method to ensure that url_path is updated,
- and to emit a 'pre_page_move' and 'post_page_move' signals.
- """
- return MovePageAction(self, target, pos=pos, user=user).execute()
- def copy(
- self,
- recursive=False,
- to=None,
- update_attrs=None,
- copy_revisions=True,
- keep_live=True,
- user=None,
- process_child_object=None,
- exclude_fields=None,
- log_action="wagtail.copy",
- reset_translation_key=True,
- ):
- """
- Copies a given page
- :param log_action flag for logging the action. Pass None to skip logging.
- Can be passed an action string. Defaults to 'wagtail.copy'
- """
- return CopyPageAction(
- self,
- to=to,
- update_attrs=update_attrs,
- exclude_fields=exclude_fields,
- recursive=recursive,
- copy_revisions=copy_revisions,
- keep_live=keep_live,
- user=user,
- process_child_object=process_child_object,
- log_action=log_action,
- reset_translation_key=reset_translation_key,
- ).execute(skip_permission_checks=True)
- copy.alters_data = True
- def create_alias(
- self,
- *,
- recursive=False,
- parent=None,
- update_slug=None,
- update_locale=None,
- user=None,
- log_action="wagtail.create_alias",
- reset_translation_key=True,
- _mpnode_attrs=None,
- ):
- return CreatePageAliasAction(
- self,
- recursive=recursive,
- parent=parent,
- update_slug=update_slug,
- update_locale=update_locale,
- user=user,
- log_action=log_action,
- reset_translation_key=reset_translation_key,
- _mpnode_attrs=_mpnode_attrs,
- ).execute()
- create_alias.alters_data = True
- def copy_for_translation(
- self, locale, copy_parents=False, alias=False, exclude_fields=None
- ):
- """Creates a copy of this page in the specified locale."""
- return CopyPageForTranslationAction(
- self,
- locale,
- copy_parents=copy_parents,
- alias=alias,
- exclude_fields=exclude_fields,
- ).execute()
- copy_for_translation.alters_data = True
- def permissions_for_user(self, user):
- """
- Return a PagePermissionsTester object defining what actions the user can perform on this page
- """
- user_perms = UserPagePermissionsProxy(user)
- return user_perms.for_page(self)
- def is_previewable(self):
- """Returns True if at least one preview mode is specified"""
- # It's possible that this will be called from a listing page using a plain Page queryset -
- # if so, checking self.preview_modes would incorrectly give us the default set from
- # Page.preview_modes. However, accessing self.specific.preview_modes would result in an N+1
- # query problem. To avoid this (at least in the general case), we'll call .specific only if
- # a check of the property at the class level indicates that preview_modes has been
- # overridden from whatever type we're currently in.
- page = self
- if page.specific_class.preview_modes != type(page).preview_modes:
- page = page.specific
- return bool(page.preview_modes)
- def get_lock(self):
- # Standard locking should take precedence over workflow locking
- # because it's possible for both to be used at the same time
- lock = super().get_lock()
- if lock:
- return lock
- current_workflow_task = self.current_workflow_task
- if current_workflow_task:
- return WorkflowLock(current_workflow_task, self)
- def get_route_paths(self):
- """
- .. versionadded:: 2.16
- Returns a list of paths that this page can be viewed at.
- These values are combined with the dynamic portion of the page URL to
- automatically create redirects when the page's URL changes.
- .. note::
- If using ``RoutablePageMixin``, you may want to override this method
- to include the paths of popualar routes.
- .. note::
- Redirect paths are 'normalized' to apply consistent ordering to GET parameters,
- so you don't need to include every variation. Fragment identifiers are discarded
- too, so should be avoided.
- """
- return ["/"]
- def get_cached_paths(self):
- """
- This returns a list of paths to invalidate in a frontend cache
- """
- return ["/"]
- def get_sitemap_urls(self, request=None):
- return [
- {
- "location": self.get_full_url(request),
- # fall back on latest_revision_created_at if last_published_at is null
- # (for backwards compatibility from before last_published_at was added)
- "lastmod": (self.last_published_at or self.latest_revision_created_at),
- }
- ]
- def get_static_site_paths(self):
- """
- This is a generator of URL paths to feed into a static site generator
- Override this if you would like to create static versions of subpages
- """
- # Yield path for this page
- yield "/"
- # Yield paths for child pages
- for child in self.get_children().live():
- for path in child.specific.get_static_site_paths():
- yield "/" + child.slug + path
- def get_ancestors(self, inclusive=False):
- """
- Returns a queryset of the current page's ancestors, starting at the root page
- and descending to the parent, or to the current page itself if ``inclusive`` is true.
- """
- return Page.objects.ancestor_of(self, inclusive)
- def get_descendants(self, inclusive=False):
- """
- Returns a queryset of all pages underneath the current page, any number of levels deep.
- If ``inclusive`` is true, the current page itself is included in the queryset.
- """
- return Page.objects.descendant_of(self, inclusive)
- def get_siblings(self, inclusive=True):
- """
- Returns a queryset of all other pages with the same parent as the current page.
- If ``inclusive`` is true, the current page itself is included in the queryset.
- """
- return Page.objects.sibling_of(self, inclusive)
- def get_next_siblings(self, inclusive=False):
- return self.get_siblings(inclusive).filter(path__gte=self.path).order_by("path")
- def get_prev_siblings(self, inclusive=False):
- return (
- self.get_siblings(inclusive).filter(path__lte=self.path).order_by("-path")
- )
- def get_view_restrictions(self):
- """
- Return a query set of all page view restrictions that apply to this page.
- This checks the current page and all ancestor pages for page view restrictions.
- If any of those pages are aliases, it will resolve them to their source pages
- before querying PageViewRestrictions so alias pages use the same view restrictions
- as their source page and they cannot have their own.
- """
- page_ids_to_check = set()
- def add_page_to_check_list(page):
- # If the page is an alias, add the source page to the check list instead
- if page.alias_of:
- add_page_to_check_list(page.alias_of)
- else:
- page_ids_to_check.add(page.id)
- # Check current page for view restrictions
- add_page_to_check_list(self)
- # Check each ancestor for view restrictions as well
- for page in self.get_ancestors().only("alias_of"):
- add_page_to_check_list(page)
- return PageViewRestriction.objects.filter(page_id__in=page_ids_to_check)
- password_required_template = getattr(
- settings, "PASSWORD_REQUIRED_TEMPLATE", "wagtailcore/password_required.html"
- )
- def serve_password_required_response(self, request, form, action_url):
- """
- Serve a response indicating that the user has been denied access to view this page,
- and must supply a password.
- form = a Django form object containing the password input
- (and zero or more hidden fields that also need to be output on the template)
- action_url = URL that this form should be POSTed to
- """
- context = self.get_context(request)
- context["form"] = form
- context["action_url"] = action_url
- return TemplateResponse(request, self.password_required_template, context)
- def with_content_json(self, content):
- """
- Returns a new version of the page with field values updated to reflect changes
- in the provided ``content`` (which usually comes from a previously-saved
- page revision).
- Certain field values are preserved in order to prevent errors if the returned
- page is saved, such as ``id``, ``content_type`` and some tree-related values.
- The following field values are also preserved, as they are considered to be
- meaningful to the page as a whole, rather than to a specific revision:
- * ``draft_title``
- * ``live``
- * ``has_unpublished_changes``
- * ``owner``
- * ``locked``
- * ``locked_by``
- * ``locked_at``
- * ``latest_revision``
- * ``latest_revision_created_at``
- * ``first_published_at``
- * ``alias_of``
- * ``wagtail_admin_comments`` (COMMENTS_RELATION_NAME)
- """
- # Old revisions (pre Wagtail 2.15) may have saved comment data under the name 'comments'
- # rather than the current relation name as set by COMMENTS_RELATION_NAME;
- # if a 'comments' field exists and looks like our comments model, alter the data to use
- # COMMENTS_RELATION_NAME before restoring
- if (
- COMMENTS_RELATION_NAME not in content
- and "comments" in content
- and isinstance(content["comments"], list)
- and len(content["comments"])
- and isinstance(content["comments"][0], dict)
- and "contentpath" in content["comments"][0]
- ):
- content[COMMENTS_RELATION_NAME] = content["comments"]
- del content["comments"]
- obj = self.specific_class.from_serializable_data(content)
- # These should definitely never change between revisions
- obj.id = self.id
- obj.pk = self.pk
- obj.content_type = self.content_type
- # Override possibly-outdated tree parameter fields
- obj.path = self.path
- obj.depth = self.depth
- obj.numchild = self.numchild
- # Update url_path to reflect potential slug changes, but maintaining the page's
- # existing tree position
- obj.set_url_path(self.get_parent())
- # Ensure other values that are meaningful for the page as a whole (rather than
- # to a specific revision) are preserved
- obj.draft_title = self.draft_title
- obj.live = self.live
- obj.has_unpublished_changes = self.has_unpublished_changes
- obj.owner = self.owner
- obj.locked = self.locked
- obj.locked_by = self.locked_by
- obj.locked_at = self.locked_at
- obj.latest_revision = self.latest_revision
- obj.latest_revision_created_at = self.latest_revision_created_at
- obj.first_published_at = self.first_published_at
- obj.translation_key = self.translation_key
- obj.locale = self.locale
- obj.alias_of_id = self.alias_of_id
- revision_comments = getattr(obj, COMMENTS_RELATION_NAME)
- page_comments = getattr(self, COMMENTS_RELATION_NAME).filter(
- resolved_at__isnull=True
- )
- for comment in page_comments:
- # attempt to retrieve the comment position from the revision's stored version
- # of the comment
- try:
- revision_comment = revision_comments.get(id=comment.id)
- comment.position = revision_comment.position
- except Comment.DoesNotExist:
- pass
- setattr(obj, COMMENTS_RELATION_NAME, page_comments)
- return obj
- @property
- def has_workflow(self):
- """Returns True if the page or an ancestor has an active workflow assigned, otherwise False"""
- if not getattr(settings, "WAGTAIL_WORKFLOW_ENABLED", True):
- return False
- return (
- self.get_ancestors(inclusive=True)
- .filter(workflowpage__isnull=False)
- .filter(workflowpage__workflow__active=True)
- .exists()
- )
- def get_workflow(self):
- """Returns the active workflow assigned to the page or its nearest ancestor"""
- if not getattr(settings, "WAGTAIL_WORKFLOW_ENABLED", True):
- return None
- if hasattr(self, "workflowpage") and self.workflowpage.workflow.active:
- return self.workflowpage.workflow
- else:
- try:
- workflow = (
- self.get_ancestors()
- .filter(workflowpage__isnull=False)
- .filter(workflowpage__workflow__active=True)
- .order_by("-depth")
- .first()
- .workflowpage.workflow
- )
- except AttributeError:
- workflow = None
- return workflow
- @property
- def workflow_in_progress(self):
- """Returns True if a workflow is in progress on the current page, otherwise False"""
- if not getattr(settings, "WAGTAIL_WORKFLOW_ENABLED", True):
- return False
- # `_current_workflow_states` may be populated by `prefetch_workflow_states` on `PageQuerySet` as a
- # performance optimisation
- if hasattr(self, "_current_workflow_states"):
- for state in self._current_workflow_states:
- if state.status == WorkflowState.STATUS_IN_PROGRESS:
- return True
- return False
- return WorkflowState.objects.filter(
- page=self, status=WorkflowState.STATUS_IN_PROGRESS
- ).exists()
- @property
- def current_workflow_state(self):
- """Returns the in progress or needs changes workflow state on this page, if it exists"""
- if not getattr(settings, "WAGTAIL_WORKFLOW_ENABLED", True):
- return None
- # `_current_workflow_states` may be populated by `prefetch_workflow_states` on `pagequeryset` as a
- # performance optimisation
- if hasattr(self, "_current_workflow_states"):
- try:
- return self._current_workflow_states[0]
- except IndexError:
- return
- try:
- return (
- WorkflowState.objects.active()
- .select_related("current_task_state__task")
- .get(page=self)
- )
- except WorkflowState.DoesNotExist:
- return
- @property
- def current_workflow_task_state(self):
- """Returns (specific class of) the current task state of the workflow on this page, if it exists"""
- current_workflow_state = self.current_workflow_state
- if (
- current_workflow_state
- and current_workflow_state.status == WorkflowState.STATUS_IN_PROGRESS
- and current_workflow_state.current_task_state
- ):
- return current_workflow_state.current_task_state.specific
- @property
- def current_workflow_task(self):
- """Returns (specific class of) the current task in progress on this page, if it exists"""
- current_workflow_task_state = self.current_workflow_task_state
- if current_workflow_task_state:
- return current_workflow_task_state.task.specific
- class Meta:
- verbose_name = _("page")
- verbose_name_plural = _("pages")
- unique_together = [("translation_key", "locale")]
- class Orderable(models.Model):
- sort_order = models.IntegerField(null=True, blank=True, editable=False)
- sort_order_field = "sort_order"
- class Meta:
- abstract = True
- ordering = ["sort_order"]
- class RevisionQuerySet(models.QuerySet):
- def page_revisions(self):
- return self.filter(base_content_type=get_default_page_content_type())
- def submitted(self):
- return self.filter(submitted_for_moderation=True)
- def for_instance(self, instance):
- return self.filter(
- content_type=ContentType.objects.get_for_model(
- instance, for_concrete_model=False
- ),
- object_id=str(instance.pk),
- )
- class RevisionsManager(models.Manager):
- def get_queryset(self):
- return RevisionQuerySet(self.model, using=self._db)
- def for_instance(self, instance):
- return self.get_queryset().for_instance(instance)
- class PageRevisionsManager(RevisionsManager):
- def get_queryset(self):
- return RevisionQuerySet(self.model, using=self._db).page_revisions()
- def submitted(self):
- return self.get_queryset().submitted()
- class SubmittedRevisionsManager(models.Manager):
- def get_queryset(self):
- return RevisionQuerySet(self.model, using=self._db).submitted()
- class Revision(models.Model):
- content_type = models.ForeignKey(
- ContentType, on_delete=models.CASCADE, related_name="+"
- )
- base_content_type = models.ForeignKey(
- ContentType, on_delete=models.CASCADE, related_name="+"
- )
- object_id = models.CharField(
- max_length=255,
- verbose_name=_("object id"),
- )
- submitted_for_moderation = models.BooleanField(
- verbose_name=_("submitted for moderation"), default=False, db_index=True
- )
- created_at = models.DateTimeField(db_index=True, verbose_name=_("created at"))
- user = models.ForeignKey(
- settings.AUTH_USER_MODEL,
- verbose_name=_("user"),
- null=True,
- blank=True,
- on_delete=models.SET_NULL,
- related_name="wagtail_revisions",
- )
- object_str = models.TextField(default="")
- content = models.JSONField(
- verbose_name=_("content JSON"), encoder=DjangoJSONEncoder
- )
- approved_go_live_at = models.DateTimeField(
- verbose_name=_("approved go live at"), null=True, blank=True, db_index=True
- )
- objects = RevisionsManager()
- page_revisions = PageRevisionsManager()
- submitted_revisions = SubmittedRevisionsManager()
- content_object = GenericForeignKey(
- "content_type", "object_id", for_concrete_model=False
- )
- wagtail_reference_index_ignore = True
- @cached_property
- def base_content_object(self):
- return self.base_content_type.get_object_for_this_type(pk=self.object_id)
- @property
- def page(self):
- warnings.warn(
- "Revisions should access .content_object instead of .page "
- "to retrieve the object.",
- category=RemovedInWagtail50Warning,
- stacklevel=2,
- )
- return self.content_object
- @property
- def page_id(self):
- warnings.warn(
- "Revisions should access .object_id instead of .page_id "
- "to retrieve the object's primary key. For page revisions, "
- "you may need to cast the object_id to integer first.",
- category=RemovedInWagtail50Warning,
- stacklevel=2,
- )
- return int(self.object_id)
- def save(self, user=None, *args, **kwargs):
- # Set default value for created_at to now
- # We cannot use auto_now_add as that will override
- # any value that is set before saving
- if self.created_at is None:
- self.created_at = timezone.now()
- # Set default value for base_content_type to the content_type.
- # Page revisions should set this to the default Page model's content type,
- # but the distinction may not be necessary for models that do not use inheritance.
- if self.base_content_type_id is None:
- self.base_content_type_id = self.content_type_id
- super().save(*args, **kwargs)
- if self.submitted_for_moderation:
- # ensure that all other revisions of this object have the 'submitted for moderation' flag unset
- Revision.objects.filter(
- base_content_type_id=self.base_content_type_id,
- object_id=self.object_id,
- ).exclude(id=self.id).update(submitted_for_moderation=False)
- if (
- self.approved_go_live_at is None
- and "update_fields" in kwargs
- and "approved_go_live_at" in kwargs["update_fields"]
- ):
- # Log scheduled revision publish cancellation
- object = self.as_object()
- # go_live_at = kwargs['update_fields'][]
- log(
- instance=object,
- action="wagtail.schedule.cancel",
- data={
- "revision": {
- "id": self.id,
- "created": self.created_at.strftime("%d %b %Y %H:%M"),
- "go_live_at": object.go_live_at.strftime("%d %b %Y %H:%M")
- if object.go_live_at
- else None,
- }
- },
- user=user,
- revision=self,
- )
- def as_object(self):
- return self.content_object.with_content_json(self.content)
- def as_page_object(self):
- warnings.warn(
- "Revisions should use .as_object() instead of .as_page_object() "
- "to create the object.",
- category=RemovedInWagtail50Warning,
- stacklevel=2,
- )
- return self.as_object()
- def approve_moderation(self, user=None):
- if self.submitted_for_moderation:
- logger.info(
- 'Page moderation approved: "%s" id=%d revision_id=%d',
- self.content_object.title,
- self.content_object.id,
- self.id,
- )
- log(
- instance=self.as_object(),
- action="wagtail.moderation.approve",
- user=user,
- revision=self,
- )
- self.publish()
- def reject_moderation(self, user=None):
- if self.submitted_for_moderation:
- logger.info(
- 'Page moderation rejected: "%s" id=%d revision_id=%d',
- self.content_object.title,
- self.content_object.id,
- self.id,
- )
- log(
- instance=self.as_object(),
- action="wagtail.moderation.reject",
- user=user,
- revision=self,
- )
- self.submitted_for_moderation = False
- self.save(update_fields=["submitted_for_moderation"])
- def is_latest_revision(self):
- if self.id is None:
- # special case: a revision without an ID is presumed to be newly-created and is thus
- # newer than any revision that might exist in the database
- return True
- latest_revision = (
- Revision.objects.filter(
- base_content_type_id=self.base_content_type_id,
- object_id=self.object_id,
- )
- .order_by("-created_at", "-id")
- .first()
- )
- return latest_revision == self
- def delete(self):
- # Update revision_created fields for comments that reference the current revision, if applicable.
- try:
- next_revision = self.get_next()
- except Revision.DoesNotExist:
- next_revision = None
- if next_revision:
- # move comments created on this revision to the next revision, as they may well still apply if they're unresolved
- self.created_comments.all().update(revision_created=next_revision)
- return super().delete()
- def publish(self, user=None, changed=True, log_action=True, previous_revision=None):
- return self.content_object.publish(
- self,
- user=user,
- changed=changed,
- log_action=log_action,
- previous_revision=previous_revision,
- )
- def get_previous(self):
- return self.get_previous_by_created_at(
- base_content_type_id=self.base_content_type_id,
- object_id=self.object_id,
- )
- def get_next(self):
- return self.get_next_by_created_at(
- base_content_type_id=self.base_content_type_id,
- object_id=self.object_id,
- )
- def __str__(self):
- return '"' + str(self.content_object) + '" at ' + str(self.created_at)
- class Meta:
- verbose_name = _("revision")
- verbose_name_plural = _("revisions")
- indexes = [
- models.Index(
- fields=["content_type", "object_id"],
- name="content_object_idx",
- ),
- models.Index(
- fields=["base_content_type", "object_id"],
- name="base_content_object_idx",
- ),
- ]
- PAGE_PERMISSION_TYPES = [
- ("add", _("Add"), _("Add/edit pages you own")),
- ("edit", _("Edit"), _("Edit any page")),
- ("publish", _("Publish"), _("Publish any page")),
- ("bulk_delete", _("Bulk delete"), _("Delete pages with children")),
- ("lock", _("Lock"), _("Lock/unlock pages you've locked")),
- ("unlock", _("Unlock"), _("Unlock any page")),
- ]
- PAGE_PERMISSION_TYPE_CHOICES = [
- (identifier, long_label)
- for identifier, short_label, long_label in PAGE_PERMISSION_TYPES
- ]
- class GroupPagePermission(models.Model):
- group = models.ForeignKey(
- Group,
- verbose_name=_("group"),
- related_name="page_permissions",
- on_delete=models.CASCADE,
- )
- page = models.ForeignKey(
- "Page",
- verbose_name=_("page"),
- related_name="group_permissions",
- on_delete=models.CASCADE,
- )
- permission_type = models.CharField(
- verbose_name=_("permission type"),
- max_length=20,
- choices=PAGE_PERMISSION_TYPE_CHOICES,
- )
- class Meta:
- unique_together = ("group", "page", "permission_type")
- verbose_name = _("group page permission")
- verbose_name_plural = _("group page permissions")
- def __str__(self):
- return "Group %d ('%s') has permission '%s' on page %d ('%s')" % (
- self.group.id,
- self.group,
- self.permission_type,
- self.page.id,
- self.page,
- )
- class UserPagePermissionsProxy:
- """Helper object that encapsulates all the page permission rules that this user has
- across the page hierarchy."""
- def __init__(self, user):
- self.user = user
- if user.is_active and not user.is_superuser:
- self.permissions = GroupPagePermission.objects.filter(
- group__user=self.user
- ).select_related("page")
- def revisions_for_moderation(self):
- """Return a queryset of page revisions awaiting moderation that this user has publish permission on"""
- # Deal with the trivial cases first...
- if not self.user.is_active:
- return Revision.objects.none()
- if self.user.is_superuser:
- return Revision.page_revisions.submitted()
- # get the list of pages for which they have direct publish permission
- # (i.e. they can publish any page within this subtree)
- publishable_pages_paths = (
- self.permissions.filter(permission_type="publish")
- .values_list("page__path", flat=True)
- .distinct()
- )
- if not publishable_pages_paths:
- return Revision.objects.none()
- # compile a filter expression to apply to the Revision.page_revisions.submitted() queryset:
- # return only those pages whose paths start with one of the publishable_pages paths
- only_my_sections = Q(path__startswith=publishable_pages_paths[0])
- for page_path in publishable_pages_paths[1:]:
- only_my_sections = only_my_sections | Q(path__startswith=page_path)
- # return the filtered queryset
- return Revision.page_revisions.submitted().filter(
- object_id__in=Page.objects.filter(only_my_sections).values_list(
- Cast("pk", output_field=models.CharField()), flat=True
- )
- )
- def for_page(self, page):
- """Return a PagePermissionTester object that can be used to query whether this user has
- permission to perform specific tasks on the given page"""
- return PagePermissionTester(self, page)
- def explorable_pages(self):
- """Return a queryset of pages that the user has access to view in the
- explorer (e.g. add/edit/publish permission). Includes all pages with
- specific group permissions and also the ancestors of those pages (in
- order to enable navigation in the explorer)"""
- # Deal with the trivial cases first...
- if not self.user.is_active:
- return Page.objects.none()
- if self.user.is_superuser:
- return Page.objects.all()
- explorable_pages = Page.objects.none()
- # Creates a union queryset of all objects the user has access to add,
- # edit and publish
- for perm in self.permissions.filter(
- Q(permission_type="add")
- | Q(permission_type="edit")
- | Q(permission_type="publish")
- | Q(permission_type="lock")
- ):
- explorable_pages |= Page.objects.descendant_of(perm.page, inclusive=True)
- # For all pages with specific permissions, add their ancestors as
- # explorable. This will allow deeply nested pages to be accessed in the
- # explorer. For example, in the hierarchy A>B>C>D where the user has
- # 'edit' access on D, they will be able to navigate to D without having
- # explicit access to A, B or C.
- page_permissions = Page.objects.filter(group_permissions__in=self.permissions)
- for page in page_permissions:
- explorable_pages |= page.get_ancestors()
- # Remove unnecessary top-level ancestors that the user has no access to
- fca_page = page_permissions.first_common_ancestor()
- explorable_pages = explorable_pages.filter(path__startswith=fca_page.path)
- return explorable_pages
- def editable_pages(self):
- """Return a queryset of the pages that this user has permission to edit"""
- # Deal with the trivial cases first...
- if not self.user.is_active:
- return Page.objects.none()
- if self.user.is_superuser:
- return Page.objects.all()
- editable_pages = Page.objects.none()
- for perm in self.permissions.filter(permission_type="add"):
- # user has edit permission on any subpage of perm.page
- # (including perm.page itself) that is owned by them
- editable_pages |= Page.objects.descendant_of(
- perm.page, inclusive=True
- ).filter(owner=self.user)
- for perm in self.permissions.filter(permission_type="edit"):
- # user has edit permission on any subpage of perm.page
- # (including perm.page itself) regardless of owner
- editable_pages |= Page.objects.descendant_of(perm.page, inclusive=True)
- return editable_pages
- def can_edit_pages(self):
- """Return True if the user has permission to edit any pages"""
- return self.editable_pages().exists()
- def publishable_pages(self):
- """Return a queryset of the pages that this user has permission to publish"""
- # Deal with the trivial cases first...
- if not self.user.is_active:
- return Page.objects.none()
- if self.user.is_superuser:
- return Page.objects.all()
- publishable_pages = Page.objects.none()
- for perm in self.permissions.filter(permission_type="publish"):
- # user has publish permission on any subpage of perm.page
- # (including perm.page itself)
- publishable_pages |= Page.objects.descendant_of(perm.page, inclusive=True)
- return publishable_pages
- def can_publish_pages(self):
- """Return True if the user has permission to publish any pages"""
- return self.publishable_pages().exists()
- def can_remove_locks(self):
- """Returns True if the user has permission to unlock pages they have not locked"""
- if self.user.is_superuser:
- return True
- if not self.user.is_active:
- return False
- else:
- return self.permissions.filter(permission_type="unlock").exists()
- class PagePermissionTester:
- def __init__(self, user_perms, page):
- self.user = user_perms.user
- self.user_perms = user_perms
- self.page = page
- self.page_is_root = page.depth == 1 # Equivalent to page.is_root()
- if self.user.is_active and not self.user.is_superuser:
- self.permissions = {
- perm.permission_type
- for perm in user_perms.permissions
- if self.page.path.startswith(perm.page.path)
- }
- def user_has_lock(self):
- return self.page.locked_by_id == self.user.pk
- def page_locked(self):
- lock = self.page.get_lock()
- return lock and lock.for_user(self.user)
- def can_add_subpage(self):
- if not self.user.is_active:
- return False
- specific_class = self.page.specific_class
- if specific_class is None or not specific_class.creatable_subpage_models():
- return False
- return self.user.is_superuser or ("add" in self.permissions)
- def can_edit(self):
- if not self.user.is_active:
- return False
- if (
- self.page_is_root
- ): # root node is not a page and can never be edited, even by superusers
- return False
- if self.user.is_superuser:
- return True
- if "edit" in self.permissions:
- return True
- if "add" in self.permissions and self.page.owner_id == self.user.pk:
- return True
- current_workflow_task = self.page.current_workflow_task
- if current_workflow_task:
- if current_workflow_task.user_can_access_editor(self.page, self.user):
- return True
- return False
- def can_delete(self, ignore_bulk=False):
- if not self.user.is_active:
- return False
- if (
- self.page_is_root
- ): # root node is not a page and can never be deleted, even by superusers
- return False
- if self.user.is_superuser:
- # superusers require no further checks
- return True
- # if the user does not have bulk_delete permission, they may only delete leaf pages
- if (
- "bulk_delete" not in self.permissions
- and not self.page.is_leaf()
- and not ignore_bulk
- ):
- return False
- if "edit" in self.permissions:
- # if the user does not have publish permission, we also need to confirm that there
- # are no published pages here
- if "publish" not in self.permissions:
- pages_to_delete = self.page.get_descendants(inclusive=True)
- if pages_to_delete.live().exists():
- return False
- return True
- elif "add" in self.permissions:
- pages_to_delete = self.page.get_descendants(inclusive=True)
- if "publish" in self.permissions:
- # we don't care about live state, but all pages must be owned by this user
- # (i.e. eliminating pages owned by this user must give us the empty set)
- return not pages_to_delete.exclude(owner=self.user).exists()
- else:
- # all pages must be owned by this user and non-live
- # (i.e. eliminating non-live pages owned by this user must give us the empty set)
- return not pages_to_delete.exclude(live=False, owner=self.user).exists()
- else:
- return False
- def can_unpublish(self):
- if not self.user.is_active:
- return False
- if (not self.page.live) or self.page_is_root:
- return False
- if self.page_locked():
- return False
- return self.user.is_superuser or ("publish" in self.permissions)
- def can_publish(self):
- if not self.user.is_active:
- return False
- if self.page_is_root:
- return False
- return self.user.is_superuser or ("publish" in self.permissions)
- def can_submit_for_moderation(self):
- return (
- not self.page_locked()
- and self.page.has_workflow
- and not self.page.workflow_in_progress
- )
- def can_set_view_restrictions(self):
- return self.can_publish()
- def can_unschedule(self):
- return self.can_publish()
- def can_lock(self):
- if self.user.is_superuser:
- return True
- current_workflow_task = self.page.current_workflow_task
- if current_workflow_task:
- return current_workflow_task.user_can_lock(self.page, self.user)
- if "lock" in self.permissions:
- return True
- return False
- def can_unlock(self):
- if self.user.is_superuser:
- return True
- if self.user_has_lock():
- return True
- current_workflow_task = self.page.current_workflow_task
- if current_workflow_task:
- return current_workflow_task.user_can_unlock(self.page, self.user)
- if "unlock" in self.permissions:
- return True
- return False
- def can_publish_subpage(self):
- """
- Niggly special case for creating and publishing a page in one go.
- Differs from can_publish in that we want to be able to publish subpages of root, but not
- to be able to publish root itself. (Also, can_publish_subpage returns false if the page
- does not allow subpages at all.)
- """
- if not self.user.is_active:
- return False
- specific_class = self.page.specific_class
- if specific_class is None or not specific_class.creatable_subpage_models():
- return False
- return self.user.is_superuser or ("publish" in self.permissions)
- def can_reorder_children(self):
- """
- Keep reorder permissions the same as publishing, since it immediately affects published pages
- (and the use-cases for a non-admin needing to do it are fairly obscure...)
- """
- return self.can_publish_subpage()
- def can_move(self):
- """
- Moving a page should be logically equivalent to deleting and re-adding it (and all its children).
- As such, the permission test for 'can this be moved at all?' should be the same as for deletion.
- (Further constraints will then apply on where it can be moved *to*.)
- """
- return self.can_delete(ignore_bulk=True)
- def can_copy(self):
- return not self.page_is_root
- def can_move_to(self, destination):
- # reject the logically impossible cases first
- if self.page == destination or destination.is_descendant_of(self.page):
- return False
- # reject moves that are forbidden by subpage_types / parent_page_types rules
- # (these rules apply to superusers too)
- if not self.page.specific.can_move_to(destination):
- return False
- # shortcut the trivial 'everything' / 'nothing' permissions
- if not self.user.is_active:
- return False
- if self.user.is_superuser:
- return True
- # check that the page can be moved at all
- if not self.can_move():
- return False
- # Inspect permissions on the destination
- destination_perms = self.user_perms.for_page(destination)
- # we always need at least add permission in the target
- if "add" not in destination_perms.permissions:
- return False
- if self.page.live or self.page.get_descendants().filter(live=True).exists():
- # moving this page will entail publishing within the destination section
- return "publish" in destination_perms.permissions
- else:
- # no publishing required, so the already-tested 'add' permission is sufficient
- return True
- def can_copy_to(self, destination, recursive=False):
- # reject the logically impossible cases first
- # recursive can't copy to the same tree otherwise it will be on infinite loop
- if recursive and (
- self.page == destination or destination.is_descendant_of(self.page)
- ):
- return False
- # reject inactive users early
- if not self.user.is_active:
- return False
- # reject early if pages of this type cannot be created at the destination
- if not self.page.specific_class.can_create_at(destination):
- return False
- # skip permission checking for super users
- if self.user.is_superuser:
- return True
- # Inspect permissions on the destination
- destination_perms = self.user_perms.for_page(destination)
- if not destination.specific_class.creatable_subpage_models():
- return False
- # we always need at least add permission in the target
- if "add" not in destination_perms.permissions:
- return False
- return True
- def can_view_revisions(self):
- return not self.page_is_root
- class PageViewRestriction(BaseViewRestriction):
- page = models.ForeignKey(
- "Page",
- verbose_name=_("page"),
- related_name="view_restrictions",
- on_delete=models.CASCADE,
- )
- passed_view_restrictions_session_key = "passed_page_view_restrictions"
- class Meta:
- verbose_name = _("page view restriction")
- verbose_name_plural = _("page view restrictions")
- def save(self, user=None, **kwargs):
- """
- Custom save handler to include logging.
- :param user: the user add/updating the view restriction
- :param specific_instance: the specific model instance the restriction applies to
- """
- specific_instance = self.page.specific
- is_new = self.id is None
- super().save(**kwargs)
- if specific_instance:
- log(
- instance=specific_instance,
- action="wagtail.view_restriction.create"
- if is_new
- else "wagtail.view_restriction.edit",
- user=user,
- data={
- "restriction": {
- "type": self.restriction_type,
- "title": force_str(
- dict(self.RESTRICTION_CHOICES).get(self.restriction_type)
- ),
- }
- },
- )
- def delete(self, user=None, **kwargs):
- """
- Custom delete handler to aid in logging
- :param user: the user removing the view restriction
- :param specific_instance: the specific model instance the restriction applies to
- """
- specific_instance = self.page.specific
- if specific_instance:
- log(
- instance=specific_instance,
- action="wagtail.view_restriction.delete",
- user=user,
- data={
- "restriction": {
- "type": self.restriction_type,
- "title": force_str(
- dict(self.RESTRICTION_CHOICES).get(self.restriction_type)
- ),
- }
- },
- )
- return super().delete(**kwargs)
- class WorkflowPage(models.Model):
- page = models.OneToOneField(
- "Page",
- verbose_name=_("page"),
- on_delete=models.CASCADE,
- primary_key=True,
- unique=True,
- )
- workflow = models.ForeignKey(
- "Workflow",
- related_name="workflow_pages",
- verbose_name=_("workflow"),
- on_delete=models.CASCADE,
- )
- def get_pages(self):
- """
- Returns a queryset of pages that are affected by this WorkflowPage link.
- This includes all descendants of the page excluding any that have other WorkflowPages.
- """
- descendant_pages = Page.objects.descendant_of(self.page, inclusive=True)
- descendant_workflow_pages = WorkflowPage.objects.filter(
- page_id__in=descendant_pages.values_list("id", flat=True)
- ).exclude(pk=self.pk)
- for path, depth in descendant_workflow_pages.values_list(
- "page__path", "page__depth"
- ):
- descendant_pages = descendant_pages.exclude(
- path__startswith=path, depth__gte=depth
- )
- return descendant_pages
- class Meta:
- verbose_name = _("workflow page")
- verbose_name_plural = _("workflow pages")
- class WorkflowTask(Orderable):
- workflow = ParentalKey(
- "Workflow",
- on_delete=models.CASCADE,
- verbose_name=_("workflow_tasks"),
- related_name="workflow_tasks",
- )
- task = models.ForeignKey(
- "Task",
- on_delete=models.CASCADE,
- verbose_name=_("task"),
- related_name="workflow_tasks",
- limit_choices_to={"active": True},
- )
- class Meta(Orderable.Meta):
- unique_together = [("workflow", "task")]
- verbose_name = _("workflow task order")
- verbose_name_plural = _("workflow task orders")
- class TaskManager(models.Manager):
- def active(self):
- return self.filter(active=True)
- class Task(models.Model):
- name = models.CharField(max_length=255, verbose_name=_("name"))
- content_type = models.ForeignKey(
- ContentType,
- verbose_name=_("content type"),
- related_name="wagtail_tasks",
- on_delete=models.CASCADE,
- )
- active = models.BooleanField(
- verbose_name=_("active"),
- default=True,
- help_text=_(
- "Active tasks can be added to workflows. Deactivating a task does not remove it from existing workflows."
- ),
- )
- objects = TaskManager()
- admin_form_fields = ["name"]
- admin_form_readonly_on_edit_fields = ["name"]
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- if not self.id:
- # this model is being newly created
- # rather than retrieved from the db;
- if not self.content_type_id:
- # set content type to correctly represent the model class
- # that this was created as
- self.content_type = ContentType.objects.get_for_model(self)
- def __str__(self):
- return self.name
- @property
- def workflows(self):
- """Returns all ``Workflow`` instances that use this task"""
- return Workflow.objects.filter(workflow_tasks__task=self)
- @property
- def active_workflows(self):
- """Return a ``QuerySet``` of active workflows that this task is part of"""
- return Workflow.objects.active().filter(workflow_tasks__task=self)
- @classmethod
- def get_verbose_name(cls):
- """
- Returns the human-readable "verbose name" of this task model e.g "Group approval task".
- """
- # This is similar to doing cls._meta.verbose_name.title()
- # except this doesn't convert any characters to lowercase
- return capfirst(cls._meta.verbose_name)
- @cached_property
- def specific(self):
- """
- Return this Task in its most specific subclassed form.
- """
- # the ContentType.objects manager keeps a cache, so this should potentially
- # avoid a database lookup over doing self.content_type. I think.
- content_type = ContentType.objects.get_for_id(self.content_type_id)
- model_class = content_type.model_class()
- if model_class is None:
- # Cannot locate a model class for this content type. This might happen
- # if the codebase and database are out of sync (e.g. the model exists
- # on a different git branch and we haven't rolled back migrations before
- # switching branches); if so, the best we can do is return the page
- # unchanged.
- return self
- elif isinstance(self, model_class):
- # self is already the an instance of the most specific class
- return self
- else:
- return content_type.get_object_for_this_type(id=self.id)
- task_state_class = None
- @classmethod
- def get_task_state_class(self):
- return self.task_state_class or TaskState
- def start(self, workflow_state, user=None):
- """Start this task on the provided workflow state by creating an instance of TaskState"""
- task_state = self.get_task_state_class()(workflow_state=workflow_state)
- task_state.status = TaskState.STATUS_IN_PROGRESS
- task_state.page_revision = workflow_state.page.get_latest_revision()
- task_state.task = self
- task_state.save()
- task_submitted.send(
- sender=task_state.specific.__class__,
- instance=task_state.specific,
- user=user,
- )
- return task_state
- @transaction.atomic
- def on_action(self, task_state, user, action_name, **kwargs):
- """Performs an action on a task state determined by the ``action_name`` string passed"""
- if action_name == "approve":
- task_state.approve(user=user, **kwargs)
- elif action_name == "reject":
- task_state.reject(user=user, **kwargs)
- def user_can_access_editor(self, page, user):
- """Returns True if a user who would not normally be able to access the editor for the page should be able to if the page is currently on this task.
- Note that returning False does not remove permissions from users who would otherwise have them."""
- return False
- def page_locked_for_user(self, page, user):
- """Returns True if the page should be locked to a given user's edits. This can be used to prevent editing by non-reviewers."""
- return False
- def user_can_lock(self, page, user):
- """Returns True if a user who would not normally be able to lock the page should be able to if the page is currently on this task.
- Note that returning False does not remove permissions from users who would otherwise have them."""
- return False
- def user_can_unlock(self, page, user):
- """Returns True if a user who would not normally be able to unlock the page should be able to if the page is currently on this task.
- Note that returning False does not remove permissions from users who would otherwise have them."""
- return False
- def get_actions(self, page, user):
- """
- Get the list of action strings (name, verbose_name, whether the action requires additional data - see
- ``get_form_for_action``) for actions the current user can perform for this task on the given page.
- These strings should be the same as those able to be passed to ``on_action``
- """
- return []
- def get_form_for_action(self, action):
- return TaskStateCommentForm
- def get_template_for_action(self, action):
- return ""
- def get_task_states_user_can_moderate(self, user, **kwargs):
- """Returns a ``QuerySet`` of the task states the current user can moderate"""
- return TaskState.objects.none()
- @classmethod
- def get_description(cls):
- """Returns the task description."""
- return ""
- @transaction.atomic
- def deactivate(self, user=None):
- """Set ``active`` to False and cancel all in progress task states linked to this task"""
- self.active = False
- self.save()
- in_progress_states = TaskState.objects.filter(
- task=self, status=TaskState.STATUS_IN_PROGRESS
- )
- for state in in_progress_states:
- state.cancel(user=user)
- class Meta:
- verbose_name = _("task")
- verbose_name_plural = _("tasks")
- class WorkflowManager(models.Manager):
- def active(self):
- return self.filter(active=True)
- class Workflow(ClusterableModel):
- name = models.CharField(max_length=255, verbose_name=_("name"))
- active = models.BooleanField(
- verbose_name=_("active"),
- default=True,
- help_text=_(
- "Active workflows can be added to pages. Deactivating a workflow does not remove it from existing pages."
- ),
- )
- objects = WorkflowManager()
- def __str__(self):
- return self.name
- @property
- def tasks(self):
- """Returns all ``Task`` instances linked to this workflow"""
- return Task.objects.filter(workflow_tasks__workflow=self).order_by(
- "workflow_tasks__sort_order"
- )
- @transaction.atomic
- def start(self, page, user):
- """Initiates a workflow by creating an instance of ``WorkflowState``"""
- state = WorkflowState(
- page=page,
- workflow=self,
- status=WorkflowState.STATUS_IN_PROGRESS,
- requested_by=user,
- )
- state.save()
- state.update(user=user)
- workflow_submitted.send(sender=state.__class__, instance=state, user=user)
- next_task_data = None
- if state.current_task_state:
- next_task_data = {
- "id": state.current_task_state.task.id,
- "title": state.current_task_state.task.name,
- }
- log(
- instance=page,
- action="wagtail.workflow.start",
- data={
- "workflow": {
- "id": self.id,
- "title": self.name,
- "status": state.status,
- "next": next_task_data,
- "task_state_id": state.current_task_state.id
- if state.current_task_state
- else None,
- }
- },
- revision=page.get_latest_revision(),
- user=user,
- )
- return state
- @transaction.atomic
- def deactivate(self, user=None):
- """Sets the workflow as inactive, and cancels all in progress instances of ``WorkflowState`` linked to this workflow"""
- self.active = False
- in_progress_states = WorkflowState.objects.filter(
- workflow=self, status=WorkflowState.STATUS_IN_PROGRESS
- )
- for state in in_progress_states:
- state.cancel(user=user)
- WorkflowPage.objects.filter(workflow=self).delete()
- self.save()
- def all_pages(self):
- """
- Returns a queryset of all the pages that this Workflow applies to.
- """
- pages = Page.objects.none()
- for workflow_page in self.workflow_pages.all():
- pages |= workflow_page.get_pages()
- return pages
- class Meta:
- verbose_name = _("workflow")
- verbose_name_plural = _("workflows")
- class GroupApprovalTask(Task):
- groups = models.ManyToManyField(
- Group,
- verbose_name=_("groups"),
- help_text=_(
- "Pages at this step in a workflow will be moderated or approved by these groups of users"
- ),
- )
- admin_form_fields = Task.admin_form_fields + ["groups"]
- admin_form_widgets = {
- "groups": forms.CheckboxSelectMultiple,
- }
- def start(self, workflow_state, user=None):
- if workflow_state.page.locked_by:
- # If the person who locked the page isn't in one of the groups, unlock the page
- if not workflow_state.page.locked_by.groups.filter(
- id__in=self.groups.all()
- ).exists():
- workflow_state.page.locked = False
- workflow_state.page.locked_by = None
- workflow_state.page.locked_at = None
- workflow_state.page.save(
- update_fields=["locked", "locked_by", "locked_at"]
- )
- return super().start(workflow_state, user=user)
- def user_can_access_editor(self, page, user):
- return (
- self.groups.filter(id__in=user.groups.all()).exists() or user.is_superuser
- )
- def page_locked_for_user(self, page, user):
- return not (
- self.groups.filter(id__in=user.groups.all()).exists() or user.is_superuser
- )
- def user_can_lock(self, page, user):
- return self.groups.filter(id__in=user.groups.all()).exists()
- def user_can_unlock(self, page, user):
- return False
- def get_actions(self, page, user):
- if self.groups.filter(id__in=user.groups.all()).exists() or user.is_superuser:
- return [
- ("reject", _("Request changes"), True),
- ("approve", _("Approve"), False),
- ("approve", _("Approve with comment"), True),
- ]
- return []
- def get_task_states_user_can_moderate(self, user, **kwargs):
- if self.groups.filter(id__in=user.groups.all()).exists() or user.is_superuser:
- return TaskState.objects.filter(
- status=TaskState.STATUS_IN_PROGRESS, task=self.task_ptr
- )
- else:
- return TaskState.objects.none()
- @classmethod
- def get_description(cls):
- return _("Members of the chosen Wagtail Groups can approve this task")
- class Meta:
- verbose_name = _("Group approval task")
- verbose_name_plural = _("Group approval tasks")
- class WorkflowStateManager(models.Manager):
- def active(self):
- """
- Filters to only STATUS_IN_PROGRESS and STATUS_NEEDS_CHANGES WorkflowStates
- """
- return self.filter(
- Q(status=WorkflowState.STATUS_IN_PROGRESS)
- | Q(status=WorkflowState.STATUS_NEEDS_CHANGES)
- )
- class WorkflowState(models.Model):
- """Tracks the status of a started Workflow on a Page."""
- STATUS_IN_PROGRESS = "in_progress"
- STATUS_APPROVED = "approved"
- STATUS_NEEDS_CHANGES = "needs_changes"
- STATUS_CANCELLED = "cancelled"
- STATUS_CHOICES = (
- (STATUS_IN_PROGRESS, _("In progress")),
- (STATUS_APPROVED, _("Approved")),
- (STATUS_NEEDS_CHANGES, _("Needs changes")),
- (STATUS_CANCELLED, _("Cancelled")),
- )
- page = models.ForeignKey(
- "Page",
- on_delete=models.CASCADE,
- verbose_name=_("page"),
- related_name="workflow_states",
- )
- workflow = models.ForeignKey(
- "Workflow",
- on_delete=models.CASCADE,
- verbose_name=_("workflow"),
- related_name="workflow_states",
- )
- status = models.fields.CharField(
- choices=STATUS_CHOICES,
- verbose_name=_("status"),
- max_length=50,
- default=STATUS_IN_PROGRESS,
- )
- created_at = models.DateTimeField(auto_now_add=True, verbose_name=_("created at"))
- requested_by = models.ForeignKey(
- settings.AUTH_USER_MODEL,
- verbose_name=_("requested by"),
- null=True,
- blank=True,
- editable=True,
- on_delete=models.SET_NULL,
- related_name="requested_workflows",
- )
- current_task_state = models.OneToOneField(
- "TaskState",
- on_delete=models.SET_NULL,
- null=True,
- blank=True,
- verbose_name=_("current task state"),
- )
- # allows a custom function to be called on finishing the Workflow successfully.
- on_finish = import_string(
- getattr(
- settings,
- "WAGTAIL_FINISH_WORKFLOW_ACTION",
- "wagtail.workflows.publish_workflow_state",
- )
- )
- objects = WorkflowStateManager()
- def clean(self):
- super().clean()
- if self.status in (self.STATUS_IN_PROGRESS, self.STATUS_NEEDS_CHANGES):
- # The unique constraint is conditional, and so not supported on the MySQL backend - so an additional check is done here
- if (
- WorkflowState.objects.active()
- .filter(page=self.page)
- .exclude(pk=self.pk)
- .exists()
- ):
- raise ValidationError(
- _(
- "There may only be one in progress or needs changes workflow state per page."
- )
- )
- def save(self, *args, **kwargs):
- self.full_clean()
- return super().save(*args, **kwargs)
- def __str__(self):
- return _("Workflow '{0}' on Page '{1}': {2}").format(
- self.workflow, self.page, self.status
- )
- def resume(self, user=None):
- """Put a STATUS_NEEDS_CHANGES workflow state back into STATUS_IN_PROGRESS, and restart the current task"""
- if self.status != self.STATUS_NEEDS_CHANGES:
- raise PermissionDenied
- revision = self.current_task_state.page_revision
- current_task_state = self.current_task_state
- self.current_task_state = None
- self.status = self.STATUS_IN_PROGRESS
- self.save()
- log(
- instance=self.page.specific,
- action="wagtail.workflow.resume",
- data={
- "workflow": {
- "id": self.workflow_id,
- "title": self.workflow.name,
- "status": self.status,
- "task_state_id": current_task_state.id,
- "task": {
- "id": current_task_state.task.id,
- "title": current_task_state.task.name,
- },
- }
- },
- revision=revision,
- user=user,
- )
- return self.update(user=user, next_task=current_task_state.task)
- def user_can_cancel(self, user):
- if self.page.locked and self.page.locked_by != user:
- return False
- return (
- user == self.requested_by
- or user == self.page.owner
- or (
- self.current_task_state
- and self.current_task_state.status
- == self.current_task_state.STATUS_IN_PROGRESS
- and "approve"
- in [
- action[0]
- for action in self.current_task_state.task.get_actions(
- self.page, user
- )
- ]
- )
- )
- def update(self, user=None, next_task=None):
- """Checks the status of the current task, and progresses (or ends) the workflow if appropriate. If the workflow progresses,
- next_task will be used to start a specific task next if provided."""
- if self.status != self.STATUS_IN_PROGRESS:
- # Updating a completed or cancelled workflow should have no effect
- return
- try:
- current_status = self.current_task_state.status
- except AttributeError:
- current_status = None
- if current_status == TaskState.STATUS_REJECTED:
- self.status = self.STATUS_NEEDS_CHANGES
- self.save()
- workflow_rejected.send(sender=self.__class__, instance=self, user=user)
- else:
- if not next_task:
- next_task = self.get_next_task()
- if next_task:
- if (
- (not self.current_task_state)
- or self.current_task_state.status
- != self.current_task_state.STATUS_IN_PROGRESS
- ):
- # if not on a task, or the next task to move to is not the current task (ie current task's status is
- # not STATUS_IN_PROGRESS), move to the next task
- self.current_task_state = next_task.specific.start(self, user=user)
- self.save()
- # if task has auto-approved, update the workflow again
- if (
- self.current_task_state.status
- != self.current_task_state.STATUS_IN_PROGRESS
- ):
- self.update(user=user)
- # otherwise, continue on the current task
- else:
- # if there is no uncompleted task, finish the workflow.
- self.finish(user=user)
- @property
- def successful_task_states(self):
- successful_task_states = self.task_states.filter(
- Q(status=TaskState.STATUS_APPROVED) | Q(status=TaskState.STATUS_SKIPPED)
- )
- if getattr(settings, "WAGTAIL_WORKFLOW_REQUIRE_REAPPROVAL_ON_EDIT", False):
- successful_task_states = successful_task_states.filter(
- page_revision=self.page.get_latest_revision()
- )
- return successful_task_states
- def get_next_task(self):
- """Returns the next active task, which has not been either approved or skipped"""
- return (
- Task.objects.filter(workflow_tasks__workflow=self.workflow, active=True)
- .exclude(task_states__in=self.successful_task_states)
- .order_by("workflow_tasks__sort_order")
- .first()
- )
- def cancel(self, user=None):
- """Cancels the workflow state"""
- if self.status not in (self.STATUS_IN_PROGRESS, self.STATUS_NEEDS_CHANGES):
- raise PermissionDenied
- self.status = self.STATUS_CANCELLED
- self.save()
- log(
- instance=self.page.specific,
- action="wagtail.workflow.cancel",
- data={
- "workflow": {
- "id": self.workflow_id,
- "title": self.workflow.name,
- "status": self.status,
- "task_state_id": self.current_task_state.id,
- "task": {
- "id": self.current_task_state.task.id,
- "title": self.current_task_state.task.name,
- },
- }
- },
- revision=self.current_task_state.page_revision,
- user=user,
- )
- for state in self.task_states.filter(status=TaskState.STATUS_IN_PROGRESS):
- # Cancel all in progress task states
- state.specific.cancel(user=user)
- workflow_cancelled.send(sender=self.__class__, instance=self, user=user)
- @transaction.atomic
- def finish(self, user=None):
- """Finishes a successful in progress workflow, marking it as approved and performing the ``on_finish`` action"""
- if self.status != self.STATUS_IN_PROGRESS:
- raise PermissionDenied
- self.status = self.STATUS_APPROVED
- self.save()
- self.on_finish(user=user)
- workflow_approved.send(sender=self.__class__, instance=self, user=user)
- def copy_approved_task_states_to_revision(self, revision):
- """This creates copies of previously approved task states with page_revision set to a different revision."""
- approved_states = TaskState.objects.filter(
- workflow_state=self, status=TaskState.STATUS_APPROVED
- )
- for state in approved_states:
- state.copy(update_attrs={"page_revision": revision})
- def revisions(self):
- """Returns all page revisions associated with task states linked to the current workflow state"""
- return Revision.page_revisions.filter(
- object_id=str(self.page_id),
- id__in=self.task_states.values_list("page_revision_id", flat=True),
- ).defer("content")
- def _get_applicable_task_states(self):
- """Returns the set of task states whose status applies to the current revision"""
- task_states = TaskState.objects.filter(workflow_state_id=self.id)
- # If WAGTAIL_WORKFLOW_REQUIRE_REAPPROVAL_ON_EDIT=True, this is only task states created on the current revision
- if getattr(settings, "WAGTAIL_WORKFLOW_REQUIRE_REAPPROVAL_ON_EDIT", False):
- latest_revision_id = (
- self.revisions()
- .order_by("-created_at", "-id")
- .values_list("id", flat=True)
- .first()
- )
- task_states = task_states.filter(page_revision_id=latest_revision_id)
- return task_states
- def all_tasks_with_status(self):
- """
- Returns a list of Task objects that are linked with this workflow state's
- workflow. The status of that task in this workflow state is annotated in the
- `.status` field. And a displayable version of that status is annotated in the
- `.status_display` field.
- This is different to querying TaskState as it also returns tasks that haven't
- been started yet (so won't have a TaskState).
- """
- # Get the set of task states whose status applies to the current revision
- task_states = self._get_applicable_task_states()
- tasks = list(
- self.workflow.tasks.annotate(
- status=Subquery(
- task_states.filter(
- task_id=OuterRef("id"),
- )
- .order_by("-started_at", "-id")
- .values("status")[:1]
- ),
- )
- )
- # Manually annotate status_display
- status_choices = dict(TaskState.STATUS_CHOICES)
- for task in tasks:
- task.status_display = status_choices.get(task.status, _("Not started"))
- return tasks
- def all_tasks_with_state(self):
- """
- Returns a list of Task objects that are linked with this WorkflowState's
- workflow, and have the latest task state.
- In a "Submit for moderation -> reject at step 1 -> resubmit -> accept" workflow, this ensures
- the task list reflects the accept, rather than the reject.
- """
- task_states = self._get_applicable_task_states()
- tasks = list(
- self.workflow.tasks.annotate(
- task_state_id=Subquery(
- task_states.filter(
- task_id=OuterRef("id"),
- )
- .order_by("-started_at", "-id")
- .values("id")[:1]
- ),
- )
- )
- task_states = {task_state.id: task_state for task_state in task_states}
- # Manually annotate task_state
- for task in tasks:
- task.task_state = task_states.get(task.task_state_id)
- return tasks
- @property
- def is_active(self):
- return self.status not in [self.STATUS_APPROVED, self.STATUS_CANCELLED]
- @property
- def is_at_final_task(self):
- """Returns the next active task, which has not been either approved or skipped"""
- last_task = (
- Task.objects.filter(workflow_tasks__workflow=self.workflow, active=True)
- .exclude(task_states__in=self.successful_task_states)
- .order_by("workflow_tasks__sort_order")
- .last()
- )
- return self.get_next_task() == last_task
- class Meta:
- verbose_name = _("Workflow state")
- verbose_name_plural = _("Workflow states")
- # prevent multiple STATUS_IN_PROGRESS/STATUS_NEEDS_CHANGES workflows for the same page. This is only supported by specific databases (e.g. Postgres, SQL Server), so is checked additionally on save.
- constraints = [
- models.UniqueConstraint(
- fields=["page"],
- condition=Q(status__in=("in_progress", "needs_changes")),
- name="unique_in_progress_workflow",
- )
- ]
- class TaskStateManager(models.Manager):
- def reviewable_by(self, user):
- tasks = Task.objects.filter(active=True)
- states = TaskState.objects.none()
- for task in tasks:
- states = states | task.specific.get_task_states_user_can_moderate(user=user)
- return states
- class TaskState(models.Model):
- """Tracks the status of a given Task for a particular page revision."""
- STATUS_IN_PROGRESS = "in_progress"
- STATUS_APPROVED = "approved"
- STATUS_REJECTED = "rejected"
- STATUS_SKIPPED = "skipped"
- STATUS_CANCELLED = "cancelled"
- STATUS_CHOICES = (
- (STATUS_IN_PROGRESS, _("In progress")),
- (STATUS_APPROVED, _("Approved")),
- (STATUS_REJECTED, _("Rejected")),
- (STATUS_SKIPPED, _("Skipped")),
- (STATUS_CANCELLED, _("Cancelled")),
- )
- workflow_state = models.ForeignKey(
- "WorkflowState",
- on_delete=models.CASCADE,
- verbose_name=_("workflow state"),
- related_name="task_states",
- )
- page_revision = models.ForeignKey(
- "Revision",
- on_delete=models.CASCADE,
- verbose_name=_("page revision"),
- related_name="task_states",
- )
- task = models.ForeignKey(
- "Task",
- on_delete=models.CASCADE,
- verbose_name=_("task"),
- related_name="task_states",
- )
- status = models.fields.CharField(
- choices=STATUS_CHOICES,
- verbose_name=_("status"),
- max_length=50,
- default=STATUS_IN_PROGRESS,
- )
- started_at = models.DateTimeField(verbose_name=_("started at"), auto_now_add=True)
- finished_at = models.DateTimeField(
- verbose_name=_("finished at"), blank=True, null=True
- )
- finished_by = models.ForeignKey(
- settings.AUTH_USER_MODEL,
- verbose_name=_("finished by"),
- null=True,
- blank=True,
- on_delete=models.SET_NULL,
- related_name="finished_task_states",
- )
- comment = models.TextField(blank=True)
- content_type = models.ForeignKey(
- ContentType,
- verbose_name=_("content type"),
- related_name="wagtail_task_states",
- on_delete=models.CASCADE,
- )
- exclude_fields_in_copy = []
- default_exclude_fields_in_copy = ["id"]
- objects = TaskStateManager()
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- if not self.id:
- # this model is being newly created
- # rather than retrieved from the db;
- if not self.content_type_id:
- # set content type to correctly represent the model class
- # that this was created as
- self.content_type = ContentType.objects.get_for_model(self)
- def __str__(self):
- return _("Task '{0}' on Page Revision '{1}': {2}").format(
- self.task, self.page_revision, self.status
- )
- @cached_property
- def specific(self):
- """
- Return this TaskState in its most specific subclassed form.
- """
- # the ContentType.objects manager keeps a cache, so this should potentially
- # avoid a database lookup over doing self.content_type. I think.
- content_type = ContentType.objects.get_for_id(self.content_type_id)
- model_class = content_type.model_class()
- if model_class is None:
- # Cannot locate a model class for this content type. This might happen
- # if the codebase and database are out of sync (e.g. the model exists
- # on a different git branch and we haven't rolled back migrations before
- # switching branches); if so, the best we can do is return the page
- # unchanged.
- return self
- elif isinstance(self, model_class):
- # self is already the an instance of the most specific class
- return self
- else:
- return content_type.get_object_for_this_type(id=self.id)
- @transaction.atomic
- def approve(self, user=None, update=True, comment=""):
- """Approve the task state and update the workflow state"""
- if self.status != self.STATUS_IN_PROGRESS:
- raise PermissionDenied
- self.status = self.STATUS_APPROVED
- self.finished_at = timezone.now()
- self.finished_by = user
- self.comment = comment
- self.save()
- self.log_state_change_action(user, "approve")
- if update:
- self.workflow_state.update(user=user)
- task_approved.send(
- sender=self.specific.__class__, instance=self.specific, user=user
- )
- return self
- @transaction.atomic
- def reject(self, user=None, update=True, comment=""):
- """Reject the task state and update the workflow state"""
- if self.status != self.STATUS_IN_PROGRESS:
- raise PermissionDenied
- self.status = self.STATUS_REJECTED
- self.finished_at = timezone.now()
- self.finished_by = user
- self.comment = comment
- self.save()
- self.log_state_change_action(user, "reject")
- if update:
- self.workflow_state.update(user=user)
- task_rejected.send(
- sender=self.specific.__class__, instance=self.specific, user=user
- )
- return self
- @cached_property
- def task_type_started_at(self):
- """Finds the first chronological started_at for successive TaskStates - ie started_at if the task had not been restarted"""
- task_states = (
- TaskState.objects.filter(workflow_state=self.workflow_state)
- .order_by("-started_at")
- .select_related("task")
- )
- started_at = None
- for task_state in task_states:
- if task_state.task == self.task:
- started_at = task_state.started_at
- elif started_at:
- break
- return started_at
- @transaction.atomic
- def cancel(self, user=None, resume=False, comment=""):
- """Cancel the task state and update the workflow state. If ``resume`` is set to True, then upon update the workflow state
- is passed the current task as ``next_task``, causing it to start a new task state on the current task if possible"""
- self.status = self.STATUS_CANCELLED
- self.finished_at = timezone.now()
- self.comment = comment
- self.finished_by = user
- self.save()
- if resume:
- self.workflow_state.update(user=user, next_task=self.task.specific)
- else:
- self.workflow_state.update(user=user)
- task_cancelled.send(
- sender=self.specific.__class__, instance=self.specific, user=user
- )
- return self
- def copy(self, update_attrs=None, exclude_fields=None):
- """Copy this task state, excluding the attributes in the ``exclude_fields`` list and updating any attributes to values
- specified in the ``update_attrs`` dictionary of ``attribute``: ``new value`` pairs"""
- exclude_fields = (
- self.default_exclude_fields_in_copy
- + self.exclude_fields_in_copy
- + (exclude_fields or [])
- )
- instance, child_object_map = _copy(self.specific, exclude_fields, update_attrs)
- instance.save()
- _copy_m2m_relations(self, instance, exclude_fields=exclude_fields)
- return instance
- def get_comment(self):
- """
- Returns a string that is displayed in workflow history.
- This could be a comment by the reviewer, or generated.
- Use mark_safe to return HTML.
- """
- return self.comment
- def log_state_change_action(self, user, action):
- """Log the approval/rejection action"""
- page = self.page_revision.as_object()
- next_task = self.workflow_state.get_next_task()
- next_task_data = None
- if next_task:
- next_task_data = {"id": next_task.id, "title": next_task.name}
- log(
- instance=page,
- action="wagtail.workflow.{}".format(action),
- user=user,
- data={
- "workflow": {
- "id": self.workflow_state.workflow.id,
- "title": self.workflow_state.workflow.name,
- "status": self.status,
- "task_state_id": self.id,
- "task": {
- "id": self.task.id,
- "title": self.task.name,
- },
- "next": next_task_data,
- },
- "comment": self.get_comment(),
- },
- revision=self.page_revision,
- )
- class Meta:
- verbose_name = _("Task state")
- verbose_name_plural = _("Task states")
- class PageLogEntryQuerySet(LogEntryQuerySet):
- def get_content_type_ids(self):
- # for reporting purposes, pages of all types are combined under a single "Page"
- # object type
- if self.exists():
- return {ContentType.objects.get_for_model(Page).pk}
- else:
- return set()
- def filter_on_content_type(self, content_type):
- if content_type == ContentType.objects.get_for_model(Page):
- return self
- else:
- return self.none()
- class PageLogEntryManager(BaseLogEntryManager):
- def get_queryset(self):
- return PageLogEntryQuerySet(self.model, using=self._db)
- def get_instance_title(self, instance):
- return instance.specific_deferred.get_admin_display_title()
- def log_action(self, instance, action, **kwargs):
- kwargs.update(page=instance)
- return super().log_action(instance, action, **kwargs)
- def viewable_by_user(self, user):
- q = Q(
- page__in=UserPagePermissionsProxy(user)
- .explorable_pages()
- .values_list("pk", flat=True)
- )
- root_page_permissions = Page.get_first_root_node().permissions_for_user(user)
- if (
- user.is_superuser
- or root_page_permissions.can_add_subpage()
- or root_page_permissions.can_edit()
- ):
- # Include deleted entries
- q = q | Q(
- page_id__in=Subquery(
- PageLogEntry.objects.filter(deleted=True).values("page_id")
- )
- )
- return PageLogEntry.objects.filter(q)
- class PageLogEntry(BaseLogEntry):
- page = models.ForeignKey(
- "wagtailcore.Page",
- on_delete=models.DO_NOTHING,
- db_constraint=False,
- related_name="+",
- )
- objects = PageLogEntryManager()
- class Meta:
- ordering = ["-timestamp", "-id"]
- verbose_name = _("page log entry")
- verbose_name_plural = _("page log entries")
- def __str__(self):
- return "PageLogEntry %d: '%s' on '%s' with id %s" % (
- self.pk,
- self.action,
- self.object_verbose_name(),
- self.page_id,
- )
- @cached_property
- def object_id(self):
- return self.page_id
- @cached_property
- def message(self):
- # for page log entries, the 'edit' action should show as 'Draft saved'
- if self.action == "wagtail.edit":
- return _("Draft saved")
- else:
- return super().message
- class Comment(ClusterableModel):
- """
- A comment on a field, or a field within a streamfield block
- """
- page = ParentalKey(
- Page, on_delete=models.CASCADE, related_name=COMMENTS_RELATION_NAME
- )
- user = models.ForeignKey(
- settings.AUTH_USER_MODEL,
- on_delete=models.CASCADE,
- related_name=COMMENTS_RELATION_NAME,
- )
- text = models.TextField()
- contentpath = models.TextField()
- # This stores the field or field within a streamfield block that the comment is applied on, in the form: 'field', or 'field.block_id.field'
- # This must be unchanging across all revisions, so we will not support (current-format) ListBlock or the contents of InlinePanels initially.
- position = models.TextField(blank=True)
- # This stores the position within a field, to be interpreted by the field's frontend widget. It may change between revisions
- created_at = models.DateTimeField(auto_now_add=True)
- updated_at = models.DateTimeField(auto_now=True)
- revision_created = models.ForeignKey(
- Revision,
- on_delete=models.CASCADE,
- related_name="created_comments",
- null=True,
- blank=True,
- )
- resolved_at = models.DateTimeField(null=True, blank=True)
- resolved_by = models.ForeignKey(
- settings.AUTH_USER_MODEL,
- on_delete=models.SET_NULL,
- related_name="comments_resolved",
- null=True,
- blank=True,
- )
- class Meta:
- verbose_name = _("comment")
- verbose_name_plural = _("comments")
- def __str__(self):
- return "Comment on Page '{0}', left by {1}: '{2}'".format(
- self.page, self.user, self.text
- )
- def save(self, update_position=False, **kwargs):
- # Don't save the position unless specifically instructed to, as the position will normally be retrieved from the revision
- update_fields = kwargs.pop("update_fields", None)
- if not update_position and (
- not update_fields or "position" not in update_fields
- ):
- if self.id:
- # The instance is already saved; we can use `update_fields`
- update_fields = (
- update_fields if update_fields else self._meta.get_fields()
- )
- update_fields = [
- field.name
- for field in update_fields
- if field.name not in {"position", "id"}
- ]
- else:
- # This is a new instance, we have to preserve and then restore the position via a variable
- position = self.position
- result = super().save(**kwargs)
- self.position = position
- return result
- return super().save(update_fields=update_fields, **kwargs)
- def _log(self, action, page_revision=None, user=None):
- log(
- instance=self.page,
- action=action,
- user=user,
- revision=page_revision,
- data={
- "comment": {
- "id": self.pk,
- "contentpath": self.contentpath,
- "text": self.text,
- }
- },
- )
- def log_create(self, **kwargs):
- self._log("wagtail.comments.create", **kwargs)
- def log_edit(self, **kwargs):
- self._log("wagtail.comments.edit", **kwargs)
- def log_resolve(self, **kwargs):
- self._log("wagtail.comments.resolve", **kwargs)
- def log_delete(self, **kwargs):
- self._log("wagtail.comments.delete", **kwargs)
- class CommentReply(models.Model):
- comment = ParentalKey(Comment, on_delete=models.CASCADE, related_name="replies")
- user = models.ForeignKey(
- settings.AUTH_USER_MODEL,
- on_delete=models.CASCADE,
- related_name="comment_replies",
- )
- text = models.TextField()
- created_at = models.DateTimeField(auto_now_add=True)
- updated_at = models.DateTimeField(auto_now=True)
- class Meta:
- verbose_name = _("comment reply")
- verbose_name_plural = _("comment replies")
- def __str__(self):
- return "CommentReply left by '{0}': '{1}'".format(self.user, self.text)
- def _log(self, action, page_revision=None, user=None):
- log(
- instance=self.comment.page,
- action=action,
- user=user,
- revision=page_revision,
- data={
- "comment": {
- "id": self.comment.pk,
- "contentpath": self.comment.contentpath,
- "text": self.comment.text,
- },
- "reply": {
- "id": self.pk,
- "text": self.text,
- },
- },
- )
- def log_create(self, **kwargs):
- self._log("wagtail.comments.create_reply", **kwargs)
- def log_edit(self, **kwargs):
- self._log("wagtail.comments.edit_reply", **kwargs)
- def log_delete(self, **kwargs):
- self._log("wagtail.comments.delete_reply", **kwargs)
- class PageSubscription(models.Model):
- user = models.ForeignKey(
- settings.AUTH_USER_MODEL,
- on_delete=models.CASCADE,
- related_name="page_subscriptions",
- )
- page = models.ForeignKey(Page, on_delete=models.CASCADE, related_name="subscribers")
- comment_notifications = models.BooleanField()
- class Meta:
- unique_together = [
- ("page", "user"),
- ]
- class ReferenceGroups:
- def __init__(self, qs):
- self.qs = qs
- def __iter__(self):
- object = None
- references = []
- for reference in self.qs.order_by("base_content_type", "object_id"):
- if object != (reference.base_content_type, reference.object_id):
- if object is not None:
- yield object[0].get_object_for_this_type(pk=object[1]), references
- references = []
- object = (reference.base_content_type, reference.object_id)
- references.append(reference)
- if references:
- yield object[0].get_object_for_this_type(pk=object[1]), references
- def __len__(self):
- return (
- self.qs.order_by("base_content_type", "object_id")
- .values("base_content_type", "object_id")
- .distinct()
- .count()
- )
- def count(self):
- return len(self)
- def __getitem__(self, key):
- return list(self)[key]
- class ReferenceIndexQuerySet(models.QuerySet):
- def group_by_source_object(self):
- return ReferenceGroups(self)
- class ReferenceIndex(models.Model):
- """
- Records references between objects for quick retrieval of object usage.
- References are extracted from Foreign Keys, Chooser Blocks in StreamFields, and links in Rich Text Fields.
- This index allows us to efficiently find all of the references to a particular object from all of these sources.
- """
- # The object where the reference was extracted from
- # content_type represents the content type of the model that contains
- # the field where the reference came from. If the model sub-classes another
- # concrete model (such as Page), that concrete model will be set in
- # base_content_type, otherwise it would be the same as content_type
- content_type = models.ForeignKey(
- ContentType, on_delete=models.CASCADE, related_name="+"
- )
- base_content_type = models.ForeignKey(
- ContentType, on_delete=models.CASCADE, related_name="+"
- )
- object_id = models.CharField(
- max_length=255,
- verbose_name=_("object id"),
- )
- # The object that has been referenced
- # to_content_type is always the base content type of the referenced object
- to_content_type = models.ForeignKey(
- ContentType, on_delete=models.CASCADE, related_name="+"
- )
- to_object_id = models.CharField(
- max_length=255,
- verbose_name=_("object id"),
- )
- # The model_path is the path to the field on content_type where the reference was extracted from.
- # the content_path is the path to a specific block on the instance where the reference is extracted from.
- # These are dotted path, always starting with a field orchild relation name. If
- # the reference was extracted from an inline panel or streamfield, other components
- # of the path can be used to locate where the reference was extracted.
- #
- # For example, say we have a StreamField called 'body' which has a struct block type
- # called 'my_struct_block' that has a field called 'my_field'. If we extracted a
- # reference from that field, the model_path would be set to the following:
- #
- # 'body.my_struct_block.my_field'
- #
- # The content path would follow the same format, but anything repeatable would be replaced by an ID.
- # For example:
- #
- # 'body.bdc70d8b-e7a2-4c2a-bf43-2a3e3fcbbe86.my_field'
- #
- # We can use the model_path with the 'content_type' to find the original definition of
- # the field block and display information to the user about where the reference was
- # extracted from.
- #
- # We can use the content_path to link the user directly to the block/field that contains
- # the reference.
- model_path = models.TextField()
- content_path = models.TextField()
- # We need a separate hash field for content_path in order to use it in a unique key because
- # MySQL has a limit to the size of fields that are included in unique keys
- content_path_hash = models.UUIDField()
- objects = ReferenceIndexQuerySet.as_manager()
- wagtail_reference_index_ignore = True
- class Meta:
- unique_together = [
- (
- "base_content_type",
- "object_id",
- "to_content_type",
- "to_object_id",
- "content_path_hash",
- )
- ]
- @classmethod
- def _get_base_content_type(cls, model_or_object):
- parents = model_or_object._meta.get_parent_list()
- if parents:
- return ContentType.objects.get_for_model(
- parents[-1], for_concrete_model=False
- )
- else:
- return ContentType.objects.get_for_model(
- model_or_object, for_concrete_model=False
- )
- @classmethod
- def model_is_indexible(cls, model, allow_child_models=False):
- """
- Returns True if the given model may have outbound references that we would be interested in recording in the index.
- """
- if getattr(model, "wagtail_reference_index_ignore", False):
- return False
- # Don't check any models that have a parental key, references from these will be collected from the parent
- if not allow_child_models and any(
- [isinstance(field, ParentalKey) for field in model._meta.get_fields()]
- ):
- return False
- for field in model._meta.get_fields():
- if field.is_relation and field.many_to_one:
- if getattr(field, "wagtail_reference_index_ignore", False):
- continue
- if getattr(
- field.related_model, "wagtail_reference_index_ignore", False
- ):
- continue
- if isinstance(field, (ParentalKey, GenericRel)):
- continue
- return True
- if isinstance(field, (StreamField, RichTextField)):
- return True
- if issubclass(model, ClusterableModel):
- for child_relation in get_all_child_relations(model):
- if cls.model_is_indexible(
- child_relation.related_model,
- allow_child_models=True,
- ):
- return True
- return False
- @classmethod
- def _extract_references_from_object(cls, object):
- # Extract references from fields
- for field in object._meta.get_fields():
- if field.is_relation and field.many_to_one:
- if getattr(field, "wagtail_reference_index_ignore", False):
- continue
- if getattr(
- field.related_model, "wagtail_reference_index_ignore", False
- ):
- continue
- if isinstance(field, (ParentalKey, GenericRel)):
- continue
- if isinstance(field, GenericForeignKey):
- ct_field = object._meta.get_field(field.ct_field)
- fk_field = object._meta.get_field(field.fk_field)
- yield ct_field.value_from_object(object), str(
- fk_field.value_from_object(object)
- ), field.name, field.name
- continue
- if isinstance(field, GenericRel):
- continue
- value = field.value_from_object(object)
- if value is not None:
- yield cls._get_base_content_type(field.related_model).id, str(
- value
- ), field.name, field.name
- if isinstance(field, (StreamField, RichTextField)):
- value = field.value_from_object(object)
- if value is not None:
- yield from (
- (
- cls._get_base_content_type(to_model).id,
- to_object_id,
- f"{field.name}.{model_path}",
- f"{field.name}.{content_path}",
- )
- for to_model, to_object_id, model_path, content_path in field.extract_references(
- value
- )
- )
- # Extract references from child relations
- if isinstance(object, ClusterableModel):
- for child_relation in get_all_child_relations(object):
- relation_name = child_relation.get_accessor_name()
- child_objects = getattr(object, relation_name).all()
- for child_object in child_objects:
- yield from (
- (
- to_content_type_id,
- to_object_id,
- f"{relation_name}.item.{model_path}",
- f"{relation_name}.{str(child_object.id)}.{content_path}",
- )
- for to_content_type_id, to_object_id, model_path, content_path in cls._extract_references_from_object(
- child_object
- )
- )
- @classmethod
- def _get_content_path_hash(cls, content_path):
- return uuid.uuid5(
- uuid.UUID("bdc70d8b-e7a2-4c2a-bf43-2a3e3fcbbe86"), content_path
- )
- @classmethod
- def create_or_update_for_object(cls, object):
- # Extract new references
- references = set(cls._extract_references_from_object(object))
- # Find existing references in the database so we know what to add/delete
- content_type = ContentType.objects.get_for_model(object)
- base_content_type = cls._get_base_content_type(object)
- existing_references = {
- (to_content_type_id, to_object_id, model_path, content_path): id
- for id, to_content_type_id, to_object_id, model_path, content_path in cls.objects.filter(
- base_content_type=base_content_type, object_id=object.pk
- ).values_list(
- "id", "to_content_type", "to_object_id", "model_path", "content_path"
- )
- }
- # Add new references
- new_references = references - set(existing_references.keys())
- cls.objects.bulk_create(
- [
- cls(
- content_type=content_type,
- base_content_type=base_content_type,
- object_id=object.pk,
- to_content_type_id=to_content_type_id,
- to_object_id=to_object_id,
- model_path=model_path,
- content_path=content_path,
- content_path_hash=cls._get_content_path_hash(content_path),
- )
- for to_content_type_id, to_object_id, model_path, content_path in new_references
- ]
- )
- # Delete removed references
- deleted_references = set(existing_references.keys()) - references
- cls.objects.filter(
- id__in=[existing_references[reference] for reference in deleted_references]
- ).delete()
- @classmethod
- def remove_for_object(cls, object):
- base_content_type = cls._get_base_content_type(object)
- cls.objects.filter(
- base_content_type=base_content_type, object_id=object.pk
- ).delete()
- @classmethod
- def get_references_for_object(cls, object):
- return cls.objects.filter(
- base_content_type_id=cls._get_base_content_type(object),
- object_id=object.pk,
- )
- @classmethod
- def get_references_to(cls, object):
- return cls.objects.filter(
- to_content_type_id=cls._get_base_content_type(object),
- to_object_id=object.pk,
- )
- def describe_source_field(self):
- model_path_components = self.model_path.split(".")
- field_name = model_path_components[0]
- field = self.content_type.model_class()._meta.get_field(field_name)
- # ManyToOneRel (reverse accessor for ParentalKey) does not have a verbose name. So get the name of the child field instead
- if isinstance(field, models.ManyToOneRel):
- child_field = field.related_model._meta.get_field(model_path_components[2])
- return capfirst(child_field.verbose_name)
- else:
- return capfirst(field.verbose_name)
|