__init__.py 178 KB


  1. """
  2. wagtail.models is split into submodules for maintainability. All definitions intended as
  3. public should be imported here (with 'noqa' comments as required) and outside code should continue
  4. to import them from wagtail.models (e.g. `from wagtail.models import Site`, not
  5. `from wagtail.models.sites import Site`.)
  6. Submodules should take care to keep the direction of dependencies consistent; where possible they
  7. should implement low-level generic functionality which is then imported by higher-level models such
  8. as Page.
  9. """
  10. import functools
  11. import logging
  12. import uuid
  13. import warnings
  14. from io import StringIO
  15. from urllib.parse import urlparse
  16. from django import forms
  17. from django.conf import settings
  18. from django.contrib.auth.models import Group
  19. from django.contrib.contenttypes.fields import (
  20. GenericForeignKey,
  21. GenericRel,
  22. GenericRelation,
  23. )
  24. from django.contrib.contenttypes.models import ContentType
  25. from django.core import checks
  26. from django.core.cache import cache
  27. from django.core.exceptions import (
  28. ImproperlyConfigured,
  29. PermissionDenied,
  30. ValidationError,
  31. )
  32. from django.core.handlers.base import BaseHandler
  33. from django.core.handlers.wsgi import WSGIRequest
  34. from django.core.serializers.json import DjangoJSONEncoder
  35. from django.db import models, transaction
  36. from django.db.models import DEFERRED, Q, Value
  37. from django.db.models.expressions import OuterRef, Subquery
  38. from django.db.models.functions import Cast, Concat, Substr
  39. from django.dispatch import receiver
  40. from django.http import Http404
  41. from django.template.response import TemplateResponse
  42. from django.urls import NoReverseMatch, reverse
  43. from django.utils import timezone
  44. from django.utils import translation as translation
  45. from django.utils.cache import patch_cache_control
  46. from django.utils.encoding import force_str
  47. from django.utils.functional import cached_property
  48. from django.utils.module_loading import import_string
  49. from django.utils.text import capfirst, slugify
  50. from django.utils.translation import gettext_lazy as _
  51. from modelcluster.fields import ParentalKey
  52. from modelcluster.models import (
  53. ClusterableModel,
  54. get_all_child_relations,
  55. get_serializable_data_for_fields,
  56. model_from_serializable_data,
  57. )
  58. from treebeard.mp_tree import MP_Node
  59. from wagtail.actions.copy_for_translation import CopyPageForTranslationAction
  60. from wagtail.actions.copy_page import CopyPageAction
  61. from wagtail.actions.create_alias import CreatePageAliasAction
  62. from wagtail.actions.delete_page import DeletePageAction
  63. from wagtail.actions.move_page import MovePageAction
  64. from wagtail.actions.publish_page_revision import PublishPageRevisionAction
  65. from wagtail.actions.publish_revision import PublishRevisionAction
  66. from wagtail.actions.unpublish import UnpublishAction
  67. from wagtail.actions.unpublish_page import UnpublishPageAction
  68. from wagtail.coreutils import (
  69. WAGTAIL_APPEND_SLASH,
  70. camelcase_to_underscore,
  71. get_supported_content_language_variant,
  72. resolve_model_string,
  73. )
  74. from wagtail.fields import RichTextField, StreamField
  75. from wagtail.forms import TaskStateCommentForm
  76. from wagtail.locks import BasicLock, ScheduledForPublishLock, WorkflowLock
  77. from wagtail.log_actions import log
  78. from wagtail.query import PageQuerySet
  79. from wagtail.search import index
  80. from wagtail.signals import (
  81. page_published,
  82. page_slug_changed,
  83. pre_validate_delete,
  84. task_approved,
  85. task_cancelled,
  86. task_rejected,
  87. task_submitted,
  88. workflow_approved,
  89. workflow_cancelled,
  90. workflow_rejected,
  91. workflow_submitted,
  92. )
  93. from wagtail.url_routing import RouteResult
  94. from wagtail.utils.deprecation import RemovedInWagtail50Warning
  95. from .audit_log import ( # noqa
  96. BaseLogEntry,
  97. BaseLogEntryManager,
  98. LogEntryQuerySet,
  99. ModelLogEntry,
  100. )
  101. from .collections import ( # noqa
  102. BaseCollectionManager,
  103. Collection,
  104. CollectionManager,
  105. CollectionMember,
  106. CollectionViewRestriction,
  107. GroupCollectionPermission,
  108. GroupCollectionPermissionManager,
  109. get_root_collection_id,
  110. )
  111. from .copying import _copy, _copy_m2m_relations, _extract_field_data # noqa
  112. from .i18n import ( # noqa
  113. BootstrapTranslatableMixin,
  114. BootstrapTranslatableModel,
  115. Locale,
  116. LocaleManager,
  117. TranslatableMixin,
  118. bootstrap_translatable_model,
  119. get_translatable_models,
  120. )
  121. from .sites import Site, SiteManager, SiteRootPath # noqa
  122. from .view_restrictions import BaseViewRestriction
# Package-wide logger; the logger name is the fixed string "wagtail".
logger = logging.getLogger("wagtail")
# NOTE(review): presumably the template context variable name under which a
# page is exposed - its usage is not visible in this section; confirm.
PAGE_TEMPLATE_VAR = "page"
# Name of the comments relation. Projects can override it through the
# WAGTAIL_COMMENTS_RELATION_NAME setting; otherwise "wagtail_admin_comments"
# is used.
COMMENTS_RELATION_NAME = getattr(
    settings, "WAGTAIL_COMMENTS_RELATION_NAME", "wagtail_admin_comments"
)
  128. @receiver(pre_validate_delete, sender=Locale)
  129. def reassign_root_page_locale_on_delete(sender, instance, **kwargs):
  130. # if we're deleting the locale used on the root page node, reassign that to a new locale first
  131. root_page_with_this_locale = Page.objects.filter(depth=1, locale=instance)
  132. if root_page_with_this_locale.exists():
  133. # Select the default locale, if one exists and isn't the one being deleted
  134. try:
  135. new_locale = Locale.get_default()
  136. default_locale_is_ok = new_locale != instance
  137. except (Locale.DoesNotExist, LookupError):
  138. default_locale_is_ok = False
  139. if not default_locale_is_ok:
  140. # fall back on any remaining locale
  141. new_locale = Locale.all_objects.exclude(pk=instance.pk).first()
  142. root_page_with_this_locale.update(locale=new_locale)
  143. PAGE_MODEL_CLASSES = []
  144. def get_page_models():
  145. """
  146. Returns a list of all non-abstract Page model classes defined in this project.
  147. """
  148. return PAGE_MODEL_CLASSES
  149. def get_default_page_content_type():
  150. """
  151. Returns the content type to use as a default for pages whose content type
  152. has been deleted.
  153. """
  154. return ContentType.objects.get_for_model(Page)
  155. @functools.lru_cache(maxsize=None)
  156. def get_streamfield_names(model_class):
  157. return tuple(
  158. field.name
  159. for field in model_class._meta.concrete_fields
  160. if isinstance(field, StreamField)
  161. )
  162. class BasePageManager(models.Manager):
  163. def get_queryset(self):
  164. return self._queryset_class(self.model).order_by("path")
  165. PageManager = BasePageManager.from_queryset(PageQuerySet)
  166. class PageBase(models.base.ModelBase):
  167. """Metaclass for Page"""
  168. def __init__(cls, name, bases, dct):
  169. super(PageBase, cls).__init__(name, bases, dct)
  170. if "template" not in dct:
  171. # Define a default template path derived from the app name and model name
  172. cls.template = "%s/%s.html" % (
  173. cls._meta.app_label,
  174. camelcase_to_underscore(name),
  175. )
  176. if "ajax_template" not in dct:
  177. cls.ajax_template = None
  178. cls._clean_subpage_models = (
  179. None # to be filled in on first call to cls.clean_subpage_models
  180. )
  181. cls._clean_parent_page_models = (
  182. None # to be filled in on first call to cls.clean_parent_page_models
  183. )
  184. # All pages should be creatable unless explicitly set otherwise.
  185. # This attribute is not inheritable.
  186. if "is_creatable" not in dct:
  187. cls.is_creatable = not cls._meta.abstract
  188. if not cls._meta.abstract:
  189. # register this type in the list of page content types
  190. PAGE_MODEL_CLASSES.append(cls)
  191. class RevisionMixin(models.Model):
  192. """A mixin that allows a model to have revisions."""
  193. latest_revision = models.ForeignKey(
  194. "wagtailcore.Revision",
  195. related_name="+",
  196. verbose_name=_("latest revision"),
  197. on_delete=models.SET_NULL,
  198. null=True,
  199. blank=True,
  200. editable=False,
  201. )
  202. @property
  203. def revisions(self):
  204. """
  205. Returns revisions that belong to the object.
  206. Subclasses should define a
  207. :class:`~django.contrib.contenttypes.fields.GenericRelation` to
  208. :class:`~wagtail.models.Revision` and override this property to return
  209. that ``GenericRelation``. This allows subclasses to customise the
  210. ``related_query_name`` of the ``GenericRelation`` and add custom logic
  211. (e.g. to always use the specific instance in ``Page``).
  212. """
  213. return Revision.objects.filter(
  214. content_type=self.get_content_type(),
  215. object_id=self.pk,
  216. )
  217. def get_base_content_type(self):
  218. parents = self._meta.get_parent_list()
  219. # Get the last non-abstract parent in the MRO as the base_content_type.
  220. # Note: for_concrete_model=False means that the model can be a proxy model.
  221. if parents:
  222. return ContentType.objects.get_for_model(
  223. parents[-1], for_concrete_model=False
  224. )
  225. # This model doesn't inherit from a non-abstract model,
  226. # use it as the base_content_type.
  227. return ContentType.objects.get_for_model(self, for_concrete_model=False)
  228. def get_content_type(self):
  229. return ContentType.objects.get_for_model(self, for_concrete_model=False)
  230. def get_latest_revision(self):
  231. return self.latest_revision
  232. def get_latest_revision_as_object(self):
  233. """
  234. Returns the latest revision of the object as an instance of the model.
  235. If no latest revision exists, returns the object itself.
  236. """
  237. latest_revision = self.get_latest_revision()
  238. if latest_revision:
  239. return latest_revision.as_object()
  240. return self
  241. def serializable_data(self):
  242. try:
  243. return super().serializable_data()
  244. except AttributeError:
  245. return get_serializable_data_for_fields(self)
  246. @classmethod
  247. def from_serializable_data(cls, data, check_fks=True, strict_fks=False):
  248. try:
  249. return super().from_serializable_data(data, check_fks, strict_fks)
  250. except AttributeError:
  251. return model_from_serializable_data(
  252. cls, data, check_fks=check_fks, strict_fks=strict_fks
  253. )
  254. def with_content_json(self, content):
  255. """
  256. Returns a new version of the object with field values updated to reflect changes
  257. in the provided ``content`` (which usually comes from a previously-saved revision).
  258. Certain field values are preserved in order to prevent errors if the returned
  259. object is saved, such as ``id``. The following field values are also preserved,
  260. as they are considered to be meaningful to the object as a whole, rather than
  261. to a specific revision:
  262. * ``latest_revision``
  263. If :class:`~wagtail.models.TranslatableMixin` is applied, the following field values
  264. are also preserved:
  265. * ``translation_key``
  266. * ``locale``
  267. """
  268. obj = self.from_serializable_data(content)
  269. # This should definitely never change between revisions
  270. obj.pk = self.pk
  271. # Ensure other values that are meaningful for the object as a whole
  272. # (rather than to a specific revision) are preserved
  273. obj.latest_revision = self.latest_revision
  274. if isinstance(self, TranslatableMixin):
  275. obj.translation_key = self.translation_key
  276. obj.locale = self.locale
  277. return obj
  278. def _update_from_revision(self, revision, changed=True):
  279. self.latest_revision = revision
  280. self.save(update_fields=["latest_revision"])
  281. def save_revision(
  282. self,
  283. user=None,
  284. submitted_for_moderation=False,
  285. approved_go_live_at=None,
  286. changed=True,
  287. log_action=False,
  288. previous_revision=None,
  289. clean=True,
  290. ):
  291. """
  292. Creates and saves a revision.
  293. :param user: The user performing the action.
  294. :param submitted_for_moderation: Indicates whether the object was submitted for moderation.
  295. :param approved_go_live_at: The date and time the revision is approved to go live.
  296. :param changed: Indicates whether there were any content changes.
  297. :param log_action: Flag for logging the action. Pass ``False`` to skip logging. Can be passed an action string.
  298. Defaults to ``"wagtail.edit"`` when no ``previous_revision`` param is passed, otherwise ``"wagtail.revert"``.
  299. :param previous_revision: Indicates a revision reversal. Should be set to the previous revision instance.
  300. :type previous_revision: Revision
  301. :param clean: Set this to ``False`` to skip cleaning object content before saving this revision.
  302. :return: The newly created revision.
  303. """
  304. if clean:
  305. self.full_clean()
  306. revision = Revision.objects.create(
  307. content_object=self,
  308. base_content_type=self.get_base_content_type(),
  309. submitted_for_moderation=submitted_for_moderation,
  310. user=user,
  311. approved_go_live_at=approved_go_live_at,
  312. content=self.serializable_data(),
  313. object_str=str(self),
  314. )
  315. self._update_from_revision(revision, changed)
  316. logger.info(
  317. 'Edited: "%s" pk=%d revision_id=%d', str(self), self.pk, revision.id
  318. )
  319. if log_action:
  320. if not previous_revision:
  321. log(
  322. instance=self,
  323. action=log_action
  324. if isinstance(log_action, str)
  325. else "wagtail.edit",
  326. user=user,
  327. revision=revision,
  328. content_changed=changed,
  329. )
  330. else:
  331. log(
  332. instance=self,
  333. action=log_action
  334. if isinstance(log_action, str)
  335. else "wagtail.revert",
  336. user=user,
  337. data={
  338. "revision": {
  339. "id": previous_revision.id,
  340. "created": previous_revision.created_at.strftime(
  341. "%d %b %Y %H:%M"
  342. ),
  343. }
  344. },
  345. revision=revision,
  346. content_changed=changed,
  347. )
  348. return revision
  349. class Meta:
  350. abstract = True
  351. class DraftStateMixin(models.Model):
  352. live = models.BooleanField(verbose_name=_("live"), default=True, editable=False)
  353. has_unpublished_changes = models.BooleanField(
  354. verbose_name=_("has unpublished changes"), default=False, editable=False
  355. )
  356. first_published_at = models.DateTimeField(
  357. verbose_name=_("first published at"), blank=True, null=True, db_index=True
  358. )
  359. last_published_at = models.DateTimeField(
  360. verbose_name=_("last published at"), null=True, editable=False
  361. )
  362. live_revision = models.ForeignKey(
  363. "wagtailcore.Revision",
  364. related_name="+",
  365. verbose_name=_("live revision"),
  366. on_delete=models.SET_NULL,
  367. null=True,
  368. blank=True,
  369. editable=False,
  370. )
  371. go_live_at = models.DateTimeField(
  372. verbose_name=_("go live date/time"), blank=True, null=True
  373. )
  374. expire_at = models.DateTimeField(
  375. verbose_name=_("expiry date/time"), blank=True, null=True
  376. )
  377. expired = models.BooleanField(
  378. verbose_name=_("expired"), default=False, editable=False
  379. )
  380. class Meta:
  381. abstract = True
  382. @classmethod
  383. def check(cls, **kwargs):
  384. return [
  385. *super().check(**kwargs),
  386. *cls._check_revision_mixin(),
  387. ]
  388. @classmethod
  389. def _check_revision_mixin(cls):
  390. mro = cls.mro()
  391. error = checks.Error(
  392. "DraftStateMixin requires RevisionMixin to be applied after DraftStateMixin.",
  393. hint="Add RevisionMixin to the model's base classes after DraftStateMixin.",
  394. obj=cls,
  395. id="wagtailcore.E004",
  396. )
  397. try:
  398. if mro.index(RevisionMixin) < mro.index(DraftStateMixin):
  399. return [error]
  400. except ValueError:
  401. return [error]
  402. return []
  403. @property
  404. def approved_schedule(self):
  405. return self.scheduled_revision is not None
  406. @property
  407. def status_string(self):
  408. if not self.live:
  409. if self.expired:
  410. return _("expired")
  411. elif self.approved_schedule:
  412. return _("scheduled")
  413. else:
  414. return _("draft")
  415. else:
  416. if self.approved_schedule:
  417. return _("live + scheduled")
  418. elif self.has_unpublished_changes:
  419. return _("live + draft")
  420. else:
  421. return _("live")
  422. def publish(
  423. self, revision, user=None, changed=True, log_action=True, previous_revision=None
  424. ):
  425. """
  426. Publish a revision of the object by applying the changes in the revision to the live object.
  427. :param revision: Revision to publish.
  428. :type revision: Revision
  429. :param user: The publishing user.
  430. :param changed: Indicated whether content has changed.
  431. :param log_action: Flag for the logging action, pass ``False`` to skip logging.
  432. :param previous_revision: Indicates a revision reversal. Should be set to the previous revision instance.
  433. :type previous_revision: Revision
  434. """
  435. return PublishRevisionAction(
  436. revision,
  437. user=user,
  438. changed=changed,
  439. log_action=log_action,
  440. previous_revision=previous_revision,
  441. ).execute()
  442. def unpublish(self, set_expired=False, commit=True, user=None, log_action=True):
  443. """
  444. Unpublish the live object.
  445. :param set_expired: Mark the object as expired.
  446. :param commit: Commit the changes to the database.
  447. :param user: The unpublishing user.
  448. :param log_action: Flag for the logging action, pass ``False`` to skip logging.
  449. """
  450. return UnpublishAction(
  451. self,
  452. set_expired=set_expired,
  453. commit=commit,
  454. user=user,
  455. log_action=log_action,
  456. ).execute()
  457. def with_content_json(self, content):
  458. """
  459. Similar to :meth:`RevisionMixin.with_content_json`,
  460. but with the following fields also preserved:
  461. * ``live``
  462. * ``has_unpublished_changes``
  463. * ``first_published_at``
  464. """
  465. obj = super().with_content_json(content)
  466. # Ensure other values that are meaningful for the object as a whole (rather than
  467. # to a specific revision) are preserved
  468. obj.live = self.live
  469. obj.has_unpublished_changes = self.has_unpublished_changes
  470. obj.first_published_at = self.first_published_at
  471. return obj
  472. def get_latest_revision_as_object(self):
  473. if not self.has_unpublished_changes:
  474. # Use the live database copy in preference to the revision record, as:
  475. # 1) this will pick up any changes that have been made directly to the model,
  476. # such as automated data imports;
  477. # 2) it ensures that inline child objects pick up real database IDs even if
  478. # those are absent from the revision data. (If this wasn't the case, the child
  479. # objects would be recreated with new IDs on next publish - see #1853)
  480. return self
  481. latest_revision = self.get_latest_revision()
  482. if latest_revision:
  483. return latest_revision.as_object()
  484. else:
  485. return self
  486. @cached_property
  487. def scheduled_revision(self):
  488. return self.revisions.filter(approved_go_live_at__isnull=False).first()
  489. def get_scheduled_revision_as_object(self):
  490. scheduled_revision = self.scheduled_revision
  491. return scheduled_revision and scheduled_revision.as_object()
  492. def _update_from_revision(self, revision, changed=True):
  493. update_fields = ["latest_revision"]
  494. self.latest_revision = revision
  495. if changed:
  496. self.has_unpublished_changes = True
  497. update_fields.append("has_unpublished_changes")
  498. self.save(update_fields=update_fields)
  499. class PreviewableMixin:
  500. """A mixin that allows a model to have previews."""
  501. def make_preview_request(
  502. self, original_request=None, preview_mode=None, extra_request_attrs=None
  503. ):
  504. """
  505. Simulate a request to this object, by constructing a fake HttpRequest object that is (as far
  506. as possible) representative of a real request to this object's front-end URL, and invoking
  507. serve_preview with that request (and the given preview_mode).
  508. Used for previewing / moderation and any other place where we
  509. want to display a view of this object in the admin interface without going through the regular
  510. page routing logic.
  511. If you pass in a real request object as original_request, additional information (e.g. client IP, cookies)
  512. will be included in the dummy request.
  513. """
  514. dummy_meta = self._get_dummy_headers(original_request)
  515. request = WSGIRequest(dummy_meta)
  516. # Add a flag to let middleware know that this is a dummy request.
  517. request.is_dummy = True
  518. if extra_request_attrs:
  519. for k, v in extra_request_attrs.items():
  520. setattr(request, k, v)
  521. obj = self
  522. # Build a custom django.core.handlers.BaseHandler subclass that invokes serve_preview as
  523. # the eventual view function called at the end of the middleware chain, rather than going
  524. # through the URL resolver
  525. class Handler(BaseHandler):
  526. def _get_response(self, request):
  527. request.is_preview = True
  528. request.preview_mode = preview_mode
  529. response = obj.serve_preview(request, preview_mode)
  530. if hasattr(response, "render") and callable(response.render):
  531. response = response.render()
  532. patch_cache_control(response, private=True)
  533. return response
  534. # Invoke this custom handler.
  535. handler = Handler()
  536. handler.load_middleware()
  537. return handler.get_response(request)
  538. def _get_dummy_headers(self, original_request=None):
  539. """
  540. Return a dict of META information to be included in a faked HttpRequest object to pass to
  541. serve_preview.
  542. """
  543. url = self._get_dummy_header_url(original_request)
  544. if url:
  545. url_info = urlparse(url)
  546. hostname = url_info.hostname
  547. path = url_info.path
  548. port = url_info.port or (443 if url_info.scheme == "https" else 80)
  549. scheme = url_info.scheme
  550. else:
  551. # Cannot determine a URL to this object - cobble one together based on
  552. # whatever we find in ALLOWED_HOSTS
  553. try:
  554. hostname = settings.ALLOWED_HOSTS[0]
  555. if hostname == "*":
  556. # '*' is a valid value to find in ALLOWED_HOSTS[0], but it's not a valid domain name.
  557. # So we pretend it isn't there.
  558. raise IndexError
  559. except IndexError:
  560. hostname = "localhost"
  561. path = "/"
  562. port = 80
  563. scheme = "http"
  564. http_host = hostname
  565. if port != (443 if scheme == "https" else 80):
  566. http_host = "%s:%s" % (http_host, port)
  567. dummy_values = {
  568. "REQUEST_METHOD": "GET",
  569. "PATH_INFO": path,
  570. "SERVER_NAME": hostname,
  571. "SERVER_PORT": port,
  572. "SERVER_PROTOCOL": "HTTP/1.1",
  573. "HTTP_HOST": http_host,
  574. "wsgi.version": (1, 0),
  575. "wsgi.input": StringIO(),
  576. "wsgi.errors": StringIO(),
  577. "wsgi.url_scheme": scheme,
  578. "wsgi.multithread": True,
  579. "wsgi.multiprocess": True,
  580. "wsgi.run_once": False,
  581. }
  582. # Add important values from the original request object, if it was provided.
  583. HEADERS_FROM_ORIGINAL_REQUEST = [
  584. "REMOTE_ADDR",
  585. "HTTP_X_FORWARDED_FOR",
  586. "HTTP_COOKIE",
  587. "HTTP_USER_AGENT",
  588. "HTTP_AUTHORIZATION",
  589. "wsgi.version",
  590. "wsgi.multithread",
  591. "wsgi.multiprocess",
  592. "wsgi.run_once",
  593. ]
  594. if settings.SECURE_PROXY_SSL_HEADER:
  595. HEADERS_FROM_ORIGINAL_REQUEST.append(settings.SECURE_PROXY_SSL_HEADER[0])
  596. if original_request:
  597. for header in HEADERS_FROM_ORIGINAL_REQUEST:
  598. if header in original_request.META:
  599. dummy_values[header] = original_request.META[header]
  600. return dummy_values
  601. def _get_dummy_header_url(self, original_request=None):
  602. """
  603. Return the URL that _get_dummy_headers() should use to set META headers
  604. for the faked HttpRequest.
  605. """
  606. return self.full_url
  607. def get_full_url(self):
  608. return None
  609. full_url = property(get_full_url)
  610. DEFAULT_PREVIEW_MODES = [("", _("Default"))]
  611. @property
  612. def preview_modes(self):
  613. """
  614. A list of ``(internal_name, display_name)`` tuples for the modes in which
  615. this object can be displayed for preview/moderation purposes. Ordinarily an object
  616. will only have one display mode, but subclasses can override this -
  617. for example, a page containing a form might have a default view of the form,
  618. and a post-submission 'thank you' page.
  619. Set to ``[]`` to completely disable previewing for this model.
  620. """
  621. return PreviewableMixin.DEFAULT_PREVIEW_MODES
  622. @property
  623. def default_preview_mode(self):
  624. """
  625. The default preview mode to use in live preview.
  626. This default is also used in areas that do not give the user the option of selecting a
  627. mode explicitly, e.g. in the moderator approval workflow.
  628. If ``preview_modes`` is empty, an ``IndexError`` will be raised.
  629. """
  630. return self.preview_modes[0][0]
  631. def is_previewable(self):
  632. """Returns ``True`` if at least one preview mode is specified in ``preview_modes``."""
  633. return bool(self.preview_modes)
  634. def serve_preview(self, request, mode_name):
  635. """
  636. Returns an HTTP response for use in object previews.
  637. This method can be overridden to implement custom rendering and/or
  638. routing logic.
  639. Any templates rendered during this process should use the ``request``
  640. object passed here - this ensures that ``request.user`` and other
  641. properties are set appropriately for the wagtail user bar to be
  642. displayed/hidden. This request will always be a GET.
  643. """
  644. return TemplateResponse(
  645. request,
  646. self.get_preview_template(request, mode_name),
  647. self.get_preview_context(request, mode_name),
  648. )
  649. def get_preview_context(self, request, mode_name):
  650. """
  651. Returns a context dictionary for use in templates for previewing this object.
  652. """
  653. return {"object": self, "request": request}
  654. def get_preview_template(self, request, mode_name):
  655. """
  656. Returns a template to be used when previewing this object.
  657. Subclasses of ``PreviewableMixin`` must override this method to return the
  658. template name to be used in the preview. Alternatively, subclasses can also
  659. override the ``serve_preview`` method to completely customise the preview
  660. rendering logic.
  661. """
  662. raise ImproperlyConfigured(
  663. "%s (subclass of PreviewableMixin) must override get_preview_template or serve_preview"
  664. % type(self).__name__
  665. )
  666. class LockableMixin(models.Model):
  667. locked = models.BooleanField(
  668. verbose_name=_("locked"), default=False, editable=False
  669. )
  670. locked_at = models.DateTimeField(
  671. verbose_name=_("locked at"), null=True, editable=False
  672. )
  673. locked_by = models.ForeignKey(
  674. settings.AUTH_USER_MODEL,
  675. verbose_name=_("locked by"),
  676. null=True,
  677. blank=True,
  678. editable=False,
  679. on_delete=models.SET_NULL,
  680. related_name="locked_pages",
  681. )
  682. locked_by.wagtail_reference_index_ignore = True
  683. class Meta:
  684. abstract = True
  685. def with_content_json(self, content):
  686. """
  687. Returns a new version of the object with field values updated to reflect changes
  688. in the provided ``content`` (which usually comes from a previously-saved revision).
  689. Certain field values are preserved in order to prevent errors if the returned
  690. object is saved, such as ``id``. The following field values are also preserved,
  691. as they are considered to be meaningful to the object as a whole, rather than
  692. to a specific revision:
  693. * ``locked``
  694. * ``locked_at``
  695. * ``locked_by``
  696. """
  697. obj = super().with_content_json(content)
  698. # Ensure other values that are meaningful for the object as a whole (rather than
  699. # to a specific revision) are preserved
  700. obj.locked = self.locked
  701. obj.locked_at = self.locked_at
  702. obj.locked_by = self.locked_by
  703. return obj
  704. def get_lock(self):
  705. """
  706. Returns a sub-class of BaseLock if the instance is locked, otherwise None
  707. """
  708. if self.locked:
  709. return BasicLock(self)
  710. if isinstance(self, DraftStateMixin) and self.scheduled_revision:
  711. return ScheduledForPublishLock(self)
  712. class AbstractPage(
  713. LockableMixin,
  714. PreviewableMixin,
  715. DraftStateMixin,
  716. RevisionMixin,
  717. TranslatableMixin,
  718. MP_Node,
  719. ):
  720. """
  721. Abstract superclass for Page. According to Django's inheritance rules, managers set on
  722. abstract models are inherited by subclasses, but managers set on concrete models that are extended
  723. via multi-table inheritance are not. We therefore need to attach PageManager to an abstract
  724. superclass to ensure that it is retained by subclasses of Page.
  725. """
  726. objects = PageManager()
  727. class Meta:
  728. abstract = True
  729. class Page(AbstractPage, index.Indexed, ClusterableModel, metaclass=PageBase):
  730. title = models.CharField(
  731. verbose_name=_("title"),
  732. max_length=255,
  733. help_text=_("The page title as you'd like it to be seen by the public"),
  734. )
  735. # to reflect title of a current draft in the admin UI
  # Non-editable copy of the title. save_revision() refreshes it from ``title``
  # and get_admin_display_title() prefers it over ``title``.
  draft_title = models.CharField(max_length=255, editable=False)

  # URL component for this page; set_url_path() appends it to the parent's
  # url_path. full_clean() auto-generates it from the title when left empty.
  slug = models.SlugField(
      verbose_name=_("slug"),
      allow_unicode=True,
      max_length=255,
      help_text=_(
          "The name of the page as it will appear in URLs e.g http://domain.com/blog/[my-slug]/"
      ),
  )

  # ContentType of the model class this page was created as (populated in
  # __init__ for new pages). When the referenced ContentType is deleted, the
  # value is replaced with the result of get_default_page_content_type().
  content_type = models.ForeignKey(
      ContentType,
      verbose_name=_("content type"),
      related_name="pages",
      on_delete=models.SET(get_default_page_content_type),
  )
  # NOTE(review): presumably keeps this relation out of the reference index - confirm.
  content_type.wagtail_reference_index_ignore = True

  # Full URL path of the page: "/" for a page without a parent, otherwise the
  # parent's url_path + slug + "/" (see set_url_path()).
  url_path = models.TextField(verbose_name=_("URL path"), blank=True, editable=False)

  # Optional owning user; save() falls back to it as the user of the
  # "wagtail.create" log entry when no explicit user is given.
  owner = models.ForeignKey(
      settings.AUTH_USER_MODEL,
      verbose_name=_("owner"),
      null=True,
      blank=True,
      editable=True,
      on_delete=models.SET_NULL,
      related_name="owned_pages",
  )
  # NOTE(review): presumably keeps this relation out of the reference index - confirm.
  owner.wagtail_reference_index_ignore = True

  seo_title = models.CharField(
      verbose_name=_("title tag"),
      max_length=255,
      blank=True,
      help_text=_(
          "The name of the page displayed on search engine results as the clickable headline."
      ),
  )

  # Class-level default that __init__ applies to ``show_in_menus`` on new
  # (unsaved) pages when the keyword was not passed explicitly.
  show_in_menus_default = False
  show_in_menus = models.BooleanField(
      verbose_name=_("show in menus"),
      default=False,
      help_text=_(
          "Whether a link to this page will appear in automatically generated menus"
      ),
  )

  search_description = models.TextField(
      verbose_name=_("meta description"),
      blank=True,
      help_text=_(
          "The descriptive text displayed underneath a headline in search engine results."
      ),
  )

  # Set by save_revision() to the creation time of the newest revision.
  latest_revision_created_at = models.DateTimeField(
      verbose_name=_("latest revision created at"), null=True, editable=False
  )

  # Raw generic relation to revisions; use the ``revisions`` property, which
  # goes through the specific page instance.
  _revisions = GenericRelation("wagtailcore.Revision", related_query_name="page")

  # If non-null, this page is an alias of the linked page
  # This means the page is kept in sync with the live version
  # of the linked pages and is not editable by users.
  alias_of = models.ForeignKey(
      "self",
      on_delete=models.SET_NULL,
      null=True,
      blank=True,
      editable=False,
      related_name="aliases",
  )
  # NOTE(review): presumably keeps this relation out of the reference index - confirm.
  alias_of.wagtail_reference_index_ignore = True
  # Search index configuration: ``title`` is searchable (boosted, with partial
  # matching) and autocompletable; the remaining fields are filter-only.
  search_fields = [
      index.SearchField("title", partial_match=True, boost=2),
      index.AutocompleteField("title"),
      index.FilterField("title"),
      index.FilterField("id"),
      index.FilterField("live"),
      index.FilterField("owner"),
      index.FilterField("content_type"),
      index.FilterField("path"),
      index.FilterField("depth"),
      index.FilterField("locked"),
      index.FilterField("show_in_menus"),
      index.FilterField("first_published_at"),
      index.FilterField("last_published_at"),
      index.FilterField("latest_revision_created_at"),
      index.FilterField("locale"),
      index.FilterField("translation_key"),
  ]

  # Do not allow plain Page instances to be created through the Wagtail admin
  is_creatable = False

  # Define the maximum number of instances this page type can have. Default to unlimited.
  max_count = None

  # Define the maximum number of instances this page can have under a specific parent. Default to unlimited.
  max_count_per_parent = None

  # An array of additional field names that will not be included when a Page is copied.
  exclude_fields_in_copy = []
  # Field names that are always left out of a copy.
  # NOTE(review): "path" appears twice in this list; the second entry is redundant.
  default_exclude_fields_in_copy = [
      "id",
      "path",
      "depth",
      "numchild",
      "url_path",
      "path",
      "postgres_index_entries",
      "index_entries",
      "latest_revision",
      COMMENTS_RELATION_NAME,
  ]

  # Define these attributes early to avoid masking errors. (Issue #3078)
  # The canonical definition is in wagtailadmin.panels.
  content_panels = []
  promote_panels = []
  settings_panels = []
  def __init__(self, *args, **kwargs):
      """
      Initialise the page and, for instances that have no ``id`` yet, fill in
      defaults that depend on the concrete model class.
      """
      super().__init__(*args, **kwargs)

      if self.id:
          # retrieved from the db rather than newly created; nothing to default
          return

      if not self.content_type_id:
          # set content type to correctly represent the model class
          # that this was created as
          self.content_type = ContentType.objects.get_for_model(self)

      if "show_in_menus" not in kwargs:
          # if the value is not set on submit refer to the model setting
          self.show_in_menus = self.show_in_menus_default
  857. def __str__(self):
  858. return self.title
  @property
  def revisions(self):
      """
      Revisions of this page, looked up through the specific page instance
      because revisions are always saved with the specific content_type.
      """
      specific_page = self.specific_deferred
      return specific_page._revisions
  def get_base_content_type(self):
      """
      Return the default Page model's ContentType.

      It is always used as the base_content_type so that page revisions can
      be queried without having to know the specific Page type.
      """
      base_content_type = get_default_page_content_type()
      return base_content_type
  def get_content_type(self):
      """
      Return the value of this page's ``content_type`` field.
      """
      page_content_type = self.content_type
      return page_content_type
  @classmethod
  def get_streamfield_names(cls):
      """
      Delegate to the module-level ``get_streamfield_names`` helper for this class.
      """
      streamfield_names = get_streamfield_names(cls)
      return streamfield_names
  def set_url_path(self, parent):
      """
      Populate the url_path field based on this page's slug and the specified parent page,
      and return the new value.

      (We pass a parent in here, rather than retrieving it via get_parent, so that we can give
      new unsaved pages a meaningful URL when previewing them; at that point the page has not
      been assigned a position in the tree, as far as treebeard is concerned.)
      """
      # a page without a parent is the tree root, which always has a url_path of '/'
      self.url_path = (parent.url_path + self.slug + "/") if parent else "/"
      return self.url_path
  887. @staticmethod
  888. def _slug_is_available(slug, parent_page, page=None):
  889. """
  890. Determine whether the given slug is available for use on a child page of
  891. parent_page. If 'page' is passed, the slug is intended for use on that page
  892. (and so it will be excluded from the duplicate check).
  893. """
  894. if parent_page is None:
  895. # the root page's slug can be whatever it likes...
  896. return True
  897. siblings = parent_page.get_children()
  898. if page:
  899. siblings = siblings.not_page(page)
  900. return not siblings.filter(slug=slug).exists()
  def _get_autogenerated_slug(self, base_slug):
      """
      Return ``base_slug`` if it is free among this page's siblings, otherwise
      the first free ``<base_slug>-<n>`` with n counting up from 2.
      """
      parent_page = self.get_parent()
      slug = base_slug
      counter = 1
      while True:
          if Page._slug_is_available(slug, parent_page, self):
              return slug
          # try with incrementing suffix until we find a slug which is available
          counter += 1
          slug = "%s-%d" % (base_slug, counter)
  def get_default_locale(self):
      """
      Finds the default locale to use for this page.

      A page with a parent takes the parent's locale; otherwise the
      superclass default is used.

      This will be called just before the initial save.
      """
      parent = self.get_parent()
      if parent is None:
          return super().get_default_locale()

      # Re-fetch the parent with its locale joined in and read the locale from that copy
      parent_queryset = parent.specific_class.objects.defer().select_related("locale")
      parent_with_locale = parent_queryset.get(id=parent.id)
      return parent_with_locale.locale
  def full_clean(self, *args, **kwargs):
      """
      Fill in slug, draft_title and locale when they are missing, then run
      the standard validation.
      """
      # Apply fixups that need to happen before per-field validation occurs

      if not self.slug:
          # Try to auto-populate slug from title
          unicode_allowed = getattr(settings, "WAGTAIL_ALLOW_UNICODE_SLUGS", True)
          slug_from_title = slugify(self.title, allow_unicode=unicode_allowed)

          # only proceed if we get a non-empty base slug back from slugify
          if slug_from_title:
              self.slug = self._get_autogenerated_slug(slug_from_title)

      if not self.draft_title:
          self.draft_title = self.title

      # Set the locale
      if self.locale_id is None:
          self.locale = self.get_default_locale()

      super().full_clean(*args, **kwargs)
  def clean(self):
      """
      Run the inherited clean step, then reject a slug that is already used
      by another child of this page's parent.
      """
      super().clean()
      slug_is_free = Page._slug_is_available(self.slug, self.get_parent(), self)
      if slug_is_free:
          return
      raise ValidationError({"slug": _("This slug is already in use")})
  def is_site_root(self):
      """
      Returns True if this page is the root of any site.

      This includes translations of site root pages as well.
      """
      try:
          # `_is_site_root` may be populated by `annotate_site_root_state` on `PageQuerySet` as a
          # performance optimisation
          return self._is_site_root
      except AttributeError:
          pass

      matching_sites = Site.objects.filter(
          root_page__translation_key=self.translation_key
      )
      return matching_sites.exists()
  @transaction.atomic
  # ensure that changes are only committed when we have updated all descendant URL paths, to preserve consistency
  def save(self, clean=True, user=None, log_action=False, **kwargs):
      """
      Overrides default method behaviour to make additional updates unique to pages,
      such as updating the ``url_path`` value of descendant page to reflect changes
      to this page's slug.

      New pages should generally be saved via the ``add_child()`` or ``add_sibling()``
      method of an existing page, which will correctly set the ``path`` and ``depth``
      fields on the new page before saving it.

      By default, pages are validated using ``full_clean()`` before attempting to
      save changes to the database, which helps to preserve validity when restoring
      pages from historic revisions (which might not necessarily reflect the current
      model state). This validation step can be bypassed by calling the method with
      ``clean=False``.

      :param clean: run ``full_clean()`` before saving.
      :param user: user recorded on the log entry; for a newly created page
          ``self.owner`` is used when this is not given.
      :param log_action: ``False`` (default) logs only page creation, ``None``
          suppresses logging entirely, and any other truthy value is used as
          the action name when logging a save of an existing page.
      :param kwargs: passed on to the superclass ``save()``. If ``update_fields``
          is present and does not contain ``"slug"``, the slug-change check is skipped.
      :return: the return value of the superclass ``save()``.
      """
      if clean:
          self.full_clean()

      slug_changed = False
      is_new = self.id is None

      if is_new:
          # we are creating a record. If we're doing things properly, this should happen
          # through a treebeard method like add_child, in which case the 'path' field
          # has been set and so we can safely call get_parent
          self.set_url_path(self.get_parent())
      else:
          # Check that we are committing the slug to the database
          # Basically: If update_fields has been specified, and slug is not included, skip this step
          if not (
              "update_fields" in kwargs and "slug" not in kwargs["update_fields"]
          ):
              # see if the slug has changed from the record in the db, in which case we need to
              # update url_path of self and all descendants. Even though we might not need it,
              # the specific page is fetched here for sending to the 'page_slug_changed' signal.
              old_record = Page.objects.get(id=self.id).specific
              if old_record.slug != self.slug:
                  self.set_url_path(self.get_parent())
                  slug_changed = True
                  # both paths are only assigned (and only read below) when slug_changed is True
                  old_url_path = old_record.url_path
                  new_url_path = self.url_path

      result = super().save(**kwargs)

      if slug_changed:
          self._update_descendant_url_paths(old_url_path, new_url_path)
          # Emit page_slug_changed signal on successful db commit
          transaction.on_commit(
              lambda: page_slug_changed.send(
                  sender=self.specific_class or self.__class__,
                  instance=self.specific,
                  instance_before=old_record,
              )
          )

      # Check if this is a root page of any sites and clear the 'wagtail_site_root_paths' key if so
      # Note: New translations of existing site roots are considered site roots as well, so we must
      #       always check if this page is a site root, even if it's new.
      if self.is_site_root():
          cache.delete("wagtail_site_root_paths")

      # Log
      if is_new:
          cls = type(self)
          logger.info(
              'Page created: "%s" id=%d content_type=%s.%s path=%s',
              self.title,
              self.id,
              cls._meta.app_label,
              cls.__name__,
              self.url_path,
          )

      if log_action is not None:
          # The default for log_action is False. i.e. don't log unless specifically instructed
          # Page creation is a special case that we want logged by default, but allow skipping it
          # explicitly by passing log_action=None
          if is_new:
              log(
                  instance=self,
                  action="wagtail.create",
                  user=user or self.owner,
                  content_changed=True,
              )
          elif log_action:
              log(instance=self, action=log_action, user=user)

      return result
  def delete(self, *args, **kwargs):
      """
      Delete this page by executing a ``DeletePageAction``.

      An optional ``user`` keyword argument is taken out of ``kwargs`` and
      handed to the action; everything else is forwarded to ``execute()``.
      """
      acting_user = kwargs.pop("user", None)
      action = DeletePageAction(self, user=acting_user)
      return action.execute(*args, **kwargs)
  @classmethod
  def check(cls, **kwargs):
      """
      Extend the inherited system checks with page-specific ones: cascading
      foreign keys (warning), a manager that is not a ``PageManager``, and
      invalid ``subpage_types`` / ``parent_page_types`` settings (errors).
      """
      errors = super(Page, cls).check(**kwargs)

      # Check that foreign keys from pages are not configured to cascade
      # This is the default Django behaviour which must be explicitly overridden
      # to prevent pages disappearing unexpectedly and the tree being corrupted

      # get names of foreign keys pointing to parent classes (such as page_ptr)
      parent_link_names = []
      for model in [cls] + list(cls._meta.get_parent_list()):
          for parent_link in model._meta.parents.values():
              if parent_link:
                  parent_link_names.append(parent_link.name)

      for field in cls._meta.fields:
          if not isinstance(field, models.ForeignKey):
              continue
          if field.name in parent_link_names:
              continue
          if field.remote_field.on_delete == models.CASCADE:
              errors.append(
                  checks.Warning(
                      "Field hasn't specified on_delete action",
                      hint="Set on_delete=models.SET_NULL and make sure the field is nullable or set on_delete=models.PROTECT. Wagtail does not allow simple database CASCADE because it will corrupt its tree storage.",
                      obj=field,
                      id="wagtailcore.W001",
                  )
              )

      if not isinstance(cls.objects, PageManager):
          errors.append(
              checks.Error(
                  "Manager does not inherit from PageManager",
                  hint="Ensure that custom Page managers inherit from wagtail.models.PageManager",
                  obj=cls,
                  id="wagtailcore.E002",
              )
          )

      try:
          cls.clean_subpage_models()
      except (ValueError, LookupError) as exc:
          errors.append(
              checks.Error(
                  "Invalid subpage_types setting for %s" % cls,
                  hint=str(exc),
                  id="wagtailcore.E002",
              )
          )

      try:
          cls.clean_parent_page_models()
      except (ValueError, LookupError) as exc:
          errors.append(
              checks.Error(
                  "Invalid parent_page_types setting for %s" % cls,
                  hint=str(exc),
                  id="wagtailcore.E002",
              )
          )

      return errors
  def _update_descendant_url_paths(self, old_url_path, new_url_path):
      """
      Rewrite ``url_path`` on every page below this one in a single UPDATE,
      swapping the leading ``old_url_path`` for ``new_url_path``.
      """
      descendants = Page.objects.filter(path__startswith=self.path).exclude(
          pk=self.pk
      )
      # Everything after the old prefix (Substr positions are 1-based)
      path_remainder = Substr("url_path", len(old_url_path) + 1)
      descendants.update(url_path=Concat(Value(new_url_path), path_remainder))
  def get_specific(self, deferred=False, copy_attrs=None, copy_attrs_exclude=None):
      """
      Return this page in its most specific subclassed form.

      Unless ``deferred=True`` is passed, the specific object is fetched
      with a database query so that all of its field values are available.
      With ``deferred=True`` the object is built from the values already
      present on this instance; that avoids the query, but accessing a
      specific field value on the result will trigger a query of its own.
      Use it when only custom methods or other non-field attributes of the
      specific class are needed.

      Non-field attribute values are copied by reference from this object
      to the returned one. By default that covers everything, including:

      * Values set by a queryset, for example: annotations, or values set as
        a result of using ``select_related()`` or ``prefetch_related()``.
      * Any ``cached_property`` values that have been evaluated.
      * Attributes set elsewhere in Python code.

      Pass ``copy_attrs`` to give the complete list of attribute names to
      copy, or ``copy_attrs_exclude`` to list attribute names to leave out.

      When this object is already an instance of the most specific class
      (e.g. an ``EventPage``), it is returned as is and no database queries
      or other operations are triggered.

      When the page was created using a page type that has since been
      removed from the codebase, a generic ``Page`` object is returned
      (without any custom field values or other functionality present on
      the original class). Usually, deleting these pages is the best course
      of action, but there is currently no safe way for Wagtail to do that
      at migration time.
      """
      model_class = self.specific_class

      # model_class is None when the codebase and database are out of sync
      # (e.g. the model exists on a different git branch and migrations were
      # not applied or reverted before switching branches); the best we can
      # do then is return the page in its current form. The same goes for a
      # page that is already an instance of the most specific class.
      if model_class is None or isinstance(self, model_class):
          return self

      if not deferred:
          # Fetch object from database
          specific_obj = model_class._default_manager.get(id=self.id)
      else:
          # Collect values in the order expected by __init__(), substituting
          # DEFERRED (or the pk, for the primary key field) for missing ones
          init_values = []
          for field in model_class._meta.concrete_fields:
              fallback = self.pk if field.primary_key else DEFERRED
              init_values.append(getattr(self, field.attname, fallback))
          # Create object from known attribute values
          specific_obj = model_class(*init_values)
          specific_obj._state.adding = self._state.adding

      # Copy non-field attribute values
      if copy_attrs is not None:
          for name in copy_attrs:
              if name in self.__dict__:
                  setattr(specific_obj, name, getattr(self, name))
      else:
          skipped = copy_attrs_exclude or ()
          for name, value in self.__dict__.items():
              if name in skipped:
                  continue
              # only set values that haven't already been set
              specific_obj.__dict__.setdefault(name, value)

      return specific_obj
  @cached_property
  def specific(self):
      """
      This page in its most specific subclassed form, with all field values
      fetched from the database. The result is cached in memory.
      """
      specific_page = self.get_specific()
      return specific_page
  @cached_property
  def specific_deferred(self):
      """
      This page in its most specific subclassed form, built without fetching
      any additional field values from the database. The result is cached
      in memory.
      """
      deferred_page = self.get_specific(deferred=True)
      return deferred_page
  @cached_property
  def specific_class(self):
      """
      Return the class that this page would be if instantiated in its
      most specific form.

      The value is ``None`` when the model class can no longer be found in
      the codebase. That holds whether the relevant ``ContentType`` has been
      removed by a database migration or is still present in the database
      (usually a result of switching between git branches without running
      or reverting database migrations beforehand).
      """
      content_type = self.cached_content_type
      return content_type.model_class()
  @property
  def cached_content_type(self):
      """
      This page's ``content_type``, looked up by id through the
      ``ContentType`` model's cached manager so that no database query is
      needed when the object is already in memory.
      """
      content_type_id = self.content_type_id
      return ContentType.objects.get_for_id(content_type_id)
  @property
  def page_type_display_name(self):
      """
      A human-readable version of this page's type; an empty string for the
      root page or when the specific class cannot be resolved.
      """
      if self.specific_class and not self.is_root():
          return self.specific_class.get_verbose_name()
      return ""
  def route(self, request, path_components):
      """
      Resolve ``path_components`` (the remaining URL segments) starting from
      this page.

      With no components left, this page is the target: a ``RouteResult``
      for it is returned if it is live, otherwise ``Http404`` is raised.
      Otherwise the first component is matched against the slugs of this
      page's children and routing continues on the matching child's specific
      instance; ``Http404`` is raised when no child matches.
      """
      if not path_components:
          # request is for this very page
          if self.live:
              return RouteResult(self)
          raise Http404

      # request is for a child of this page
      child_slug, remaining_components = path_components[0], path_components[1:]

      try:
          subpage = self.get_children().get(slug=child_slug)
      except Page.DoesNotExist:
          raise Http404

      return subpage.specific.route(request, remaining_components)
  def get_admin_display_title(self):
      """
      Return the title for this page as it should appear in the admin backend;
      override this if you wish to display extra contextual information about the page,
      such as language. By default, returns ``draft_title``.
      """
      draft = self.draft_title
      if draft:
          return draft
      # Fall back on title if draft_title is blank (which may happen if the page was created
      # in a fixture or migration that didn't explicitly handle draft_title)
      return self.title
  def save_revision(
      self,
      user=None,
      submitted_for_moderation=False,
      approved_go_live_at=None,
      changed=True,
      log_action=False,
      previous_revision=None,
      clean=True,
  ):
      """
      Create a new ``Revision`` from this page's current content and update
      the page's revision bookkeeping fields.

      :param user: stored on the revision and on any log entry.
      :param submitted_for_moderation: stored on the revision; when true an
          extra "submitted for moderation" message is written to the logger.
      :param approved_go_live_at: stored on the revision.
      :param changed: when true, ``has_unpublished_changes`` is set to True
          and saved; also recorded as ``content_changed`` on the log entry.
      :param log_action: falsy for no log entry. A string is used as the
          action name; any other truthy value logs ``"wagtail.edit"``, or
          ``"wagtail.revert"`` when ``previous_revision`` is given.
      :param previous_revision: the revision being reverted to, if any; its
          id and creation time are included in the log entry data.
      :param clean: run ``full_clean()`` before creating the revision.
      :return: the newly created revision.
      :raises RuntimeError: if called on a non-specific page instance or on
          an alias page.
      """
      # Raise error if this is not the specific version of the page
      if not isinstance(self, self.specific_class):
          raise RuntimeError(
              "page.save_revision() must be called on the specific version of the page. "
              "Call page.specific.save_revision() instead."
          )

      # Raise an error if this page is an alias.
      if self.alias_of_id:
          raise RuntimeError(
              "page.save_revision() was called on an alias page. "
              "Revisions are not required for alias pages as they are an exact copy of another page."
          )

      if clean:
          self.full_clean()

      new_comments = getattr(self, COMMENTS_RELATION_NAME).filter(pk__isnull=True)
      for comment in new_comments:
          # We need to ensure comments have an id in the revision, so positions can be identified correctly
          comment.save()

      revision = Revision.objects.create(
          content_object=self,
          base_content_type=self.get_base_content_type(),
          submitted_for_moderation=submitted_for_moderation,
          user=user,
          approved_go_live_at=approved_go_live_at,
          content=self.serializable_data(),
          object_str=str(self),
      )

      # Link the comments that were new at the start of this call to the revision just created
      for comment in new_comments:
          comment.revision_created = revision

      self.latest_revision_created_at = revision.created_at
      self.draft_title = self.title
      self.latest_revision = revision

      update_fields = [
          COMMENTS_RELATION_NAME,
          "latest_revision_created_at",
          "draft_title",
          "latest_revision",
      ]

      if changed:
          self.has_unpublished_changes = True
          update_fields.append("has_unpublished_changes")

      # NOTE(review): update_fields is never empty at this point, so this condition always holds.
      if update_fields:
          # clean=False because the fields we're updating don't need validation
          self.save(update_fields=update_fields, clean=False)

      # Log
      logger.info(
          'Page edited: "%s" id=%d revision_id=%d', self.title, self.id, revision.id
      )
      if log_action:
          if not previous_revision:
              log(
                  instance=self,
                  action=log_action
                  if isinstance(log_action, str)
                  else "wagtail.edit",
                  user=user,
                  revision=revision,
                  content_changed=changed,
              )
          else:
              log(
                  instance=self,
                  action=log_action
                  if isinstance(log_action, str)
                  else "wagtail.revert",
                  user=user,
                  data={
                      "revision": {
                          "id": previous_revision.id,
                          "created": previous_revision.created_at.strftime(
                              "%d %b %Y %H:%M"
                          ),
                      }
                  },
                  revision=revision,
                  content_changed=changed,
              )

      if submitted_for_moderation:
          logger.info(
              'Page submitted for moderation: "%s" id=%d revision_id=%d',
              self.title,
              self.id,
              revision.id,
          )

      return revision
  def get_latest_revision_as_page(self):
      """
      Deprecated alias of ``get_latest_revision_as_object()``; emits a
      ``RemovedInWagtail50Warning`` and returns that method's result.
      """
      message = (
          "Pages should use .get_latest_revision_as_object() instead of "
          ".get_latest_revision_as_page() to retrieve the latest revision as a "
          "Page instance."
      )
      # stacklevel=2 attributes the warning to the caller of this method
      warnings.warn(message, category=RemovedInWagtail50Warning, stacklevel=2)
      return self.get_latest_revision_as_object()
  def get_latest_revision_as_object(self):
      """
      Return the page as of its latest revision, or the specific live page
      when there are no unpublished changes or no revision exists.
      """
      if self.has_unpublished_changes:
          latest_revision = self.get_latest_revision()
          if latest_revision:
              return latest_revision.as_object()

      # Use the live database copy in preference to the revision record, as:
      # 1) this will pick up any changes that have been made directly to the model,
      #    such as automated data imports;
      # 2) it ensures that inline child objects pick up real database IDs even if
      #    those are absent from the revision data. (If this wasn't the case, the child
      #    objects would be recreated with new IDs on next publish - see #1853)
      return self.specific
  def update_aliases(
      self, *, revision=None, user=None, _content=None, _updated_ids=None
  ):
      """
      Publishes all aliases that follow this page with the latest content from this page.

      This is called by Wagtail whenever a page with aliases is published.

      Each alias is made live, saved without validation, announced through
      the ``page_published`` signal and logged as ``wagtail.publish``; the
      method then recurses into the aliases of that alias.

      :param revision: The revision of the original page that we are updating to (used for logging purposes).
      :type revision: Revision, optional
      :param user: The user who is publishing (used for logging purposes).
      :type user: User, optional
      :param _content: Internal. Serialised content to copy onto the aliases; computed from
          this page when omitted and then reused for the recursive calls.
      :param _updated_ids: Internal. IDs of the pages already visited higher up the recursion;
          aliases with one of these IDs are skipped.
      """
      specific_self = self.specific

      # Only compute this if necessary since it's quite a heavy operation
      if _content is None:
          _content = self.serializable_data()

      # IDs of the pages visited so far, including this one. This is just in case someone has
      # created an alias loop (which is impossible to do with the UI Wagtail provides):
      # recording the current page here is what stops the recursion from coming back round
      # to a page it has already processed.
      _updated_ids = list(_updated_ids or ()) + [self.id]

      for alias in self.specific_class.objects.filter(alias_of=self).exclude(
          id__in=_updated_ids
      ):
          # FIXME: Switch to the same fields that are excluded from copy
          # We can't do this right now because we can't exclude fields from with_content_json
          exclude_fields = [
              "id",
              "path",
              "depth",
              "numchild",
              "url_path",
              "index_entries",
              "postgres_index_entries",
          ]

          # Copy field content
          alias_updated = alias.with_content_json(_content)

          # Publish the alias if it's currently in draft
          alias_updated.live = True
          alias_updated.has_unpublished_changes = False

          # Copy child relations
          child_object_map = specific_self.copy_all_child_relations(
              target=alias_updated, exclude=exclude_fields
          )

          # Process child objects
          # This has two jobs:
          #  - If the alias is in a different locale, this updates the
          #    locale of any translatable child objects to match
          #  - If the alias is not a translation of the original, this
          #    changes the translation_key field of all child objects
          #    so they do not clash
          if child_object_map:
              alias_is_translation = alias.translation_key == self.translation_key

              def process_child_object(child_object):
                  if isinstance(child_object, TranslatableMixin):
                      # Child object's locale must always match the page
                      child_object.locale = alias_updated.locale

                      # If the alias isn't a translation of the original page,
                      # change the child object's translation_keys so they are
                      # not either
                      if not alias_is_translation:
                          child_object.translation_key = uuid.uuid4()

              for (rel, previous_id), child_objects in child_object_map.items():
                  if previous_id is None:
                      for child_object in child_objects:
                          process_child_object(child_object)
                  else:
                      process_child_object(child_objects)

          # Copy M2M relations
          _copy_m2m_relations(
              specific_self, alias_updated, exclude_fields=exclude_fields
          )

          # Don't change the aliases slug
          # Aliases can have their own slugs so they can be siblings of the original
          alias_updated.slug = alias.slug
          alias_updated.set_url_path(alias_updated.get_parent())

          # Aliases don't have revisions, so update fields that would normally be updated by save_revision
          alias_updated.draft_title = alias_updated.title
          alias_updated.latest_revision_created_at = self.latest_revision_created_at

          alias_updated.save(clean=False)

          page_published.send(
              sender=alias_updated.specific_class,
              instance=alias_updated,
              revision=revision,
              alias=True,
          )

          # Log the publish of the alias
          log(
              instance=alias_updated,
              action="wagtail.publish",
              user=user,
          )

          # Update any aliases of that alias

          # Design note:
          # It could be argued that this will be faster if we just changed these alias-of-alias
          # pages to all point to the original page and avoid having to update them recursively.
          #
          # But, it's useful to have a record of how aliases have been chained.
          # For example, In Wagtail Localize, we use aliases to create mirrored trees, but those
          # trees themselves could have aliases within them. If an alias within a tree is
          # converted to a regular page, we want the alias in the mirrored tree to follow that
          # new page and stop receiving updates from the original page.
          #
          # Doing it this way requires an extra lookup query per alias but this is small in
          # comparison to the work required to update the alias.

          # The user is forwarded so that nested alias publishes are logged against the same user
          alias.update_aliases(
              revision=revision,
              user=user,
              _content=_content,
              _updated_ids=_updated_ids,
          )

  update_aliases.alters_data = True
  def publish(
      self, revision, user=None, changed=True, log_action=True, previous_revision=None
  ):
      """
      Publish ``revision`` by executing a ``PublishPageRevisionAction``
      built from the given arguments, and return the action's result.
      """
      action = PublishPageRevisionAction(
          revision,
          user=user,
          changed=changed,
          log_action=log_action,
          previous_revision=previous_revision,
      )
      return action.execute()
  def unpublish(self, set_expired=False, commit=True, user=None, log_action=True):
      """
      Unpublish this page by executing an ``UnpublishPageAction`` built
      from the given arguments, and return the action's result.
      """
      action = UnpublishPageAction(
          self,
          set_expired=set_expired,
          commit=commit,
          user=user,
          log_action=log_action,
      )
      return action.execute()
  # Optional extra template variable name; when set, get_context() also exposes
  # the page under this key.
  context_object_name = None
  def get_context(self, request, *args, **kwargs):
      """
      Build the template context for this page: the page itself (under
      ``PAGE_TEMPLATE_VAR`` and ``"self"``), the request, and the page again
      under ``context_object_name`` when that is set.
      """
      context = {}
      context[PAGE_TEMPLATE_VAR] = self
      context["self"] = self
      context["request"] = request

      if self.context_object_name:
          context[self.context_object_name] = self

      return context
  def get_preview_context(self, request, mode_name):
      """
      Context for previewing this page; ``mode_name`` is not used here and
      the regular ``get_context(request)`` result is returned.
      """
      preview_context = self.get_context(request)
      return preview_context
  def get_template(self, request, *args, **kwargs):
      """
      Return the template to render: ``ajax_template`` (if set) for requests
      sent with ``X-Requested-With: XMLHttpRequest``, else ``template``.
      """
      if request.headers.get("x-requested-with") != "XMLHttpRequest":
          return self.template
      return self.ajax_template or self.template
  def get_preview_template(self, request, mode_name):
      """
      Template for previewing this page; ``mode_name`` is not used here and
      the regular ``get_template(request)`` result is returned.
      """
      preview_template = self.get_template(request)
      return preview_template
  def serve(self, request, *args, **kwargs):
      """
      Mark the request as a non-preview request and return a
      ``TemplateResponse`` built from ``get_template()`` and ``get_context()``.
      """
      request.is_preview = False

      template = self.get_template(request, *args, **kwargs)
      context = self.get_context(request, *args, **kwargs)
      return TemplateResponse(request, template, context)
  def is_navigable(self):
      """
      Return true if it's meaningful to browse subpages of this page -
      i.e. it currently has subpages,
      or it's at the top level (this rule necessary for empty out-of-the-box sites to have working navigation)
      """
      if not self.is_leaf():
          return True
      return self.depth == 2
  1517. def _get_site_root_paths(self, request=None):
  1518. """
  1519. Return ``Site.get_site_root_paths()``, using the cached copy on the
  1520. request object if available.
  1521. """
  1522. # if we have a request, use that to cache site_root_paths; otherwise, use self
  1523. cache_object = request if request else self
  1524. try:
  1525. return cache_object._wagtail_cached_site_root_paths
  1526. except AttributeError:
  1527. cache_object._wagtail_cached_site_root_paths = Site.get_site_root_paths()
  1528. return cache_object._wagtail_cached_site_root_paths
  1529. def _get_relevant_site_root_paths(self, cache_object=None):
  1530. """
  1531. .. versionadded::2.16
  1532. Returns a tuple of root paths for all sites this page belongs to.
  1533. """
  1534. return tuple(
  1535. srp
  1536. for srp in self._get_site_root_paths(cache_object)
  1537. if self.url_path.startswith(srp.root_path)
  1538. )
  1539. def get_url_parts(self, request=None):
  1540. """
  1541. Determine the URL for this page and return it as a tuple of
  1542. ``(site_id, site_root_url, page_url_relative_to_site_root)``.
  1543. Return None if the page is not routable.
  1544. This is used internally by the ``full_url``, ``url``, ``relative_url``
  1545. and ``get_site`` properties and methods; pages with custom URL routing
  1546. should override this method in order to have those operations return
  1547. the custom URLs.
  1548. Accepts an optional keyword argument ``request``, which may be used
  1549. to avoid repeated database / cache lookups. Typically, a page model
  1550. that overrides ``get_url_parts`` should not need to deal with
  1551. ``request`` directly, and should just pass it to the original method
  1552. when calling ``super``.
  1553. """
  1554. possible_sites = self._get_relevant_site_root_paths(request)
  1555. if not possible_sites:
  1556. return None
  1557. site_id, root_path, root_url, language_code = possible_sites[0]
  1558. site = Site.find_for_request(request)
  1559. if site:
  1560. for site_id, root_path, root_url, language_code in possible_sites:
  1561. if site_id == site.pk:
  1562. break
  1563. else:
  1564. site_id, root_path, root_url, language_code = possible_sites[0]
  1565. use_wagtail_i18n = getattr(settings, "WAGTAIL_I18N_ENABLED", False)
  1566. if use_wagtail_i18n:
  1567. # If the active language code is a variant of the page's language, then
  1568. # use that instead
  1569. # This is used when LANGUAGES contain more languages than WAGTAIL_CONTENT_LANGUAGES
  1570. try:
  1571. if (
  1572. get_supported_content_language_variant(translation.get_language())
  1573. == language_code
  1574. ):
  1575. language_code = translation.get_language()
  1576. except LookupError:
  1577. # active language code is not a recognised content language, so leave
  1578. # page's language code unchanged
  1579. pass
  1580. # The page may not be routable because wagtail_serve is not registered
  1581. # This may be the case if Wagtail is used headless
  1582. try:
  1583. if use_wagtail_i18n:
  1584. with translation.override(language_code):
  1585. page_path = reverse(
  1586. "wagtail_serve", args=(self.url_path[len(root_path) :],)
  1587. )
  1588. else:
  1589. page_path = reverse(
  1590. "wagtail_serve", args=(self.url_path[len(root_path) :],)
  1591. )
  1592. except NoReverseMatch:
  1593. return (site_id, None, None)
  1594. # Remove the trailing slash from the URL reverse generates if
  1595. # WAGTAIL_APPEND_SLASH is False and we're not trying to serve
  1596. # the root path
  1597. if not WAGTAIL_APPEND_SLASH and page_path != "/":
  1598. page_path = page_path.rstrip("/")
  1599. return (site_id, root_url, page_path)
  1600. def get_full_url(self, request=None):
  1601. """Return the full URL (including protocol / domain) to this page, or None if it is not routable"""
  1602. url_parts = self.get_url_parts(request=request)
  1603. if url_parts is None or url_parts[1] is None and url_parts[2] is None:
  1604. # page is not routable
  1605. return
  1606. site_id, root_url, page_path = url_parts
  1607. return root_url + page_path
  1608. full_url = property(get_full_url)
  1609. def get_url(self, request=None, current_site=None):
  1610. """
  1611. Return the 'most appropriate' URL for referring to this page from the pages we serve,
  1612. within the Wagtail backend and actual website templates;
  1613. this is the local URL (starting with '/') if we're only running a single site
  1614. (i.e. we know that whatever the current page is being served from, this link will be on the
  1615. same domain), and the full URL (with domain) if not.
  1616. Return None if the page is not routable.
  1617. Accepts an optional but recommended ``request`` keyword argument that, if provided, will
  1618. be used to cache site-level URL information (thereby avoiding repeated database / cache
  1619. lookups) and, via the ``Site.find_for_request()`` function, determine whether a relative
  1620. or full URL is most appropriate.
  1621. """
  1622. # ``current_site`` is purposefully undocumented, as one can simply pass the request and get
  1623. # a relative URL based on ``Site.find_for_request()``. Nonetheless, support it here to avoid
  1624. # copy/pasting the code to the ``relative_url`` method below.
  1625. if current_site is None and request is not None:
  1626. site = Site.find_for_request(request)
  1627. current_site = site
  1628. url_parts = self.get_url_parts(request=request)
  1629. if url_parts is None or url_parts[1] is None and url_parts[2] is None:
  1630. # page is not routable
  1631. return
  1632. site_id, root_url, page_path = url_parts
  1633. # Get number of unique sites in root paths
  1634. # Note: there may be more root paths to sites if there are multiple languages
  1635. num_sites = len(
  1636. {root_path[0] for root_path in self._get_site_root_paths(request)}
  1637. )
  1638. if (current_site is not None and site_id == current_site.id) or num_sites == 1:
  1639. # the site matches OR we're only running a single site, so a local URL is sufficient
  1640. return page_path
  1641. else:
  1642. return root_url + page_path
  1643. url = property(get_url)
  1644. def relative_url(self, current_site, request=None):
  1645. """
  1646. Return the 'most appropriate' URL for this page taking into account the site we're currently on;
  1647. a local URL if the site matches, or a fully qualified one otherwise.
  1648. Return None if the page is not routable.
  1649. Accepts an optional but recommended ``request`` keyword argument that, if provided, will
  1650. be used to cache site-level URL information (thereby avoiding repeated database / cache
  1651. lookups).
  1652. """
  1653. return self.get_url(request=request, current_site=current_site)
  1654. def get_site(self):
  1655. """
  1656. Return the Site object that this page belongs to.
  1657. """
  1658. url_parts = self.get_url_parts()
  1659. if url_parts is None:
  1660. # page is not routable
  1661. return
  1662. site_id, root_url, page_path = url_parts
  1663. return Site.objects.get(id=site_id)
  1664. @classmethod
  1665. def get_indexed_objects(cls):
  1666. content_type = ContentType.objects.get_for_model(cls)
  1667. return super(Page, cls).get_indexed_objects().filter(content_type=content_type)
  1668. def get_indexed_instance(self):
  1669. # This is accessed on save by the wagtailsearch signal handler, and in edge
  1670. # cases (e.g. loading test fixtures), may be called before the specific instance's
  1671. # entry has been created. In those cases, we aren't ready to be indexed yet, so
  1672. # return None.
  1673. try:
  1674. return self.specific
  1675. except self.specific_class.DoesNotExist:
  1676. return None
  1677. @classmethod
  1678. def clean_subpage_models(cls):
  1679. """
  1680. Returns the list of subpage types, normalised as model classes.
  1681. Throws ValueError if any entry in subpage_types cannot be recognised as a model name,
  1682. or LookupError if a model does not exist (or is not a Page subclass).
  1683. """
  1684. if cls._clean_subpage_models is None:
  1685. subpage_types = getattr(cls, "subpage_types", None)
  1686. if subpage_types is None:
  1687. # if subpage_types is not specified on the Page class, allow all page types as subpages
  1688. cls._clean_subpage_models = get_page_models()
  1689. else:
  1690. cls._clean_subpage_models = [
  1691. resolve_model_string(model_string, cls._meta.app_label)
  1692. for model_string in subpage_types
  1693. ]
  1694. for model in cls._clean_subpage_models:
  1695. if not issubclass(model, Page):
  1696. raise LookupError("%s is not a Page subclass" % model)
  1697. return cls._clean_subpage_models
  1698. @classmethod
  1699. def clean_parent_page_models(cls):
  1700. """
  1701. Returns the list of parent page types, normalised as model classes.
  1702. Throws ValueError if any entry in parent_page_types cannot be recognised as a model name,
  1703. or LookupError if a model does not exist (or is not a Page subclass).
  1704. """
  1705. if cls._clean_parent_page_models is None:
  1706. parent_page_types = getattr(cls, "parent_page_types", None)
  1707. if parent_page_types is None:
  1708. # if parent_page_types is not specified on the Page class, allow all page types as subpages
  1709. cls._clean_parent_page_models = get_page_models()
  1710. else:
  1711. cls._clean_parent_page_models = [
  1712. resolve_model_string(model_string, cls._meta.app_label)
  1713. for model_string in parent_page_types
  1714. ]
  1715. for model in cls._clean_parent_page_models:
  1716. if not issubclass(model, Page):
  1717. raise LookupError("%s is not a Page subclass" % model)
  1718. return cls._clean_parent_page_models
  1719. @classmethod
  1720. def allowed_parent_page_models(cls):
  1721. """
  1722. Returns the list of page types that this page type can be a subpage of,
  1723. as a list of model classes
  1724. """
  1725. return [
  1726. parent_model
  1727. for parent_model in cls.clean_parent_page_models()
  1728. if cls in parent_model.clean_subpage_models()
  1729. ]
  1730. @classmethod
  1731. def allowed_subpage_models(cls):
  1732. """
  1733. Returns the list of page types that this page type can have as subpages,
  1734. as a list of model classes
  1735. """
  1736. return [
  1737. subpage_model
  1738. for subpage_model in cls.clean_subpage_models()
  1739. if cls in subpage_model.clean_parent_page_models()
  1740. ]
  1741. @classmethod
  1742. def creatable_subpage_models(cls):
  1743. """
  1744. Returns the list of page types that may be created under this page type,
  1745. as a list of model classes
  1746. """
  1747. return [
  1748. page_model
  1749. for page_model in cls.allowed_subpage_models()
  1750. if page_model.is_creatable
  1751. ]
  1752. @classmethod
  1753. def can_exist_under(cls, parent):
  1754. """
  1755. Checks if this page type can exist as a subpage under a parent page
  1756. instance.
  1757. See also: :func:`Page.can_create_at` and :func:`Page.can_move_to`
  1758. """
  1759. return cls in parent.specific_class.allowed_subpage_models()
  1760. @classmethod
  1761. def can_create_at(cls, parent):
  1762. """
  1763. Checks if this page type can be created as a subpage under a parent
  1764. page instance.
  1765. """
  1766. can_create = cls.is_creatable and cls.can_exist_under(parent)
  1767. if cls.max_count is not None:
  1768. can_create = can_create and cls.objects.count() < cls.max_count
  1769. if cls.max_count_per_parent is not None:
  1770. can_create = (
  1771. can_create
  1772. and parent.get_children().type(cls).count() < cls.max_count_per_parent
  1773. )
  1774. return can_create
  1775. def can_move_to(self, parent):
  1776. """
  1777. Checks if this page instance can be moved to be a subpage of a parent
  1778. page instance.
  1779. """
  1780. # Prevent pages from being moved to different language sections
  1781. # The only page that can have multi-lingual children is the root page
  1782. parent_is_root = parent.depth == 1
  1783. if not parent_is_root and parent.locale_id != self.locale_id:
  1784. return False
  1785. return self.can_exist_under(parent)
  1786. @classmethod
  1787. def get_verbose_name(cls):
  1788. """
  1789. Returns the human-readable "verbose name" of this page model e.g "Blog page".
  1790. """
  1791. # This is similar to doing cls._meta.verbose_name.title()
  1792. # except this doesn't convert any characters to lowercase
  1793. return capfirst(cls._meta.verbose_name)
  1794. @classmethod
  1795. def get_page_description(cls):
  1796. """
  1797. Returns a page description if it's set. For example "A multi-purpose web page".
  1798. """
  1799. description = getattr(cls, "page_description", None)
  1800. # make sure that page_description is actually a string rather than a model field
  1801. if isinstance(description, str):
  1802. return description
  1803. elif getattr(description, "_delegate_text", None):
  1804. # description is a lazy object (e.g. the result of gettext_lazy())
  1805. return str(description)
  1806. else:
  1807. return ""
  1808. @property
  1809. def status_string(self):
  1810. if not self.live:
  1811. if self.expired:
  1812. return _("expired")
  1813. elif self.approved_schedule:
  1814. return _("scheduled")
  1815. elif self.workflow_in_progress:
  1816. return _("in moderation")
  1817. else:
  1818. return _("draft")
  1819. else:
  1820. if self.approved_schedule:
  1821. return _("live + scheduled")
  1822. elif self.workflow_in_progress:
  1823. return _("live + in moderation")
  1824. elif self.has_unpublished_changes:
  1825. return _("live + draft")
  1826. else:
  1827. return _("live")
  1828. @property
  1829. def approved_schedule(self):
  1830. # `_approved_schedule` may be populated by `annotate_approved_schedule` on `PageQuerySet` as a
  1831. # performance optimisation
  1832. if hasattr(self, "_approved_schedule"):
  1833. return self._approved_schedule
  1834. return self.scheduled_revision is not None
  1835. def has_unpublished_subtree(self):
  1836. """
  1837. An awkwardly-defined flag used in determining whether unprivileged editors have
  1838. permission to delete this article. Returns true if and only if this page is non-live,
  1839. and it has no live children.
  1840. """
  1841. return (not self.live) and (
  1842. not self.get_descendants().filter(live=True).exists()
  1843. )
  1844. def move(self, target, pos=None, user=None):
  1845. """
  1846. Extension to the treebeard 'move' method to ensure that url_path is updated,
  1847. and to emit a 'pre_page_move' and 'post_page_move' signals.
  1848. """
  1849. return MovePageAction(self, target, pos=pos, user=user).execute()
  1850. def copy(
  1851. self,
  1852. recursive=False,
  1853. to=None,
  1854. update_attrs=None,
  1855. copy_revisions=True,
  1856. keep_live=True,
  1857. user=None,
  1858. process_child_object=None,
  1859. exclude_fields=None,
  1860. log_action="wagtail.copy",
  1861. reset_translation_key=True,
  1862. ):
  1863. """
  1864. Copies a given page
  1865. :param log_action flag for logging the action. Pass None to skip logging.
  1866. Can be passed an action string. Defaults to 'wagtail.copy'
  1867. """
  1868. return CopyPageAction(
  1869. self,
  1870. to=to,
  1871. update_attrs=update_attrs,
  1872. exclude_fields=exclude_fields,
  1873. recursive=recursive,
  1874. copy_revisions=copy_revisions,
  1875. keep_live=keep_live,
  1876. user=user,
  1877. process_child_object=process_child_object,
  1878. log_action=log_action,
  1879. reset_translation_key=reset_translation_key,
  1880. ).execute(skip_permission_checks=True)
  1881. copy.alters_data = True
  1882. def create_alias(
  1883. self,
  1884. *,
  1885. recursive=False,
  1886. parent=None,
  1887. update_slug=None,
  1888. update_locale=None,
  1889. user=None,
  1890. log_action="wagtail.create_alias",
  1891. reset_translation_key=True,
  1892. _mpnode_attrs=None,
  1893. ):
  1894. return CreatePageAliasAction(
  1895. self,
  1896. recursive=recursive,
  1897. parent=parent,
  1898. update_slug=update_slug,
  1899. update_locale=update_locale,
  1900. user=user,
  1901. log_action=log_action,
  1902. reset_translation_key=reset_translation_key,
  1903. _mpnode_attrs=_mpnode_attrs,
  1904. ).execute()
  1905. create_alias.alters_data = True
  1906. def copy_for_translation(
  1907. self, locale, copy_parents=False, alias=False, exclude_fields=None
  1908. ):
  1909. """Creates a copy of this page in the specified locale."""
  1910. return CopyPageForTranslationAction(
  1911. self,
  1912. locale,
  1913. copy_parents=copy_parents,
  1914. alias=alias,
  1915. exclude_fields=exclude_fields,
  1916. ).execute()
  1917. copy_for_translation.alters_data = True
  1918. def permissions_for_user(self, user):
  1919. """
  1920. Return a PagePermissionsTester object defining what actions the user can perform on this page
  1921. """
  1922. user_perms = UserPagePermissionsProxy(user)
  1923. return user_perms.for_page(self)
  1924. def is_previewable(self):
  1925. """Returns True if at least one preview mode is specified"""
  1926. # It's possible that this will be called from a listing page using a plain Page queryset -
  1927. # if so, checking self.preview_modes would incorrectly give us the default set from
  1928. # Page.preview_modes. However, accessing self.specific.preview_modes would result in an N+1
  1929. # query problem. To avoid this (at least in the general case), we'll call .specific only if
  1930. # a check of the property at the class level indicates that preview_modes has been
  1931. # overridden from whatever type we're currently in.
  1932. page = self
  1933. if page.specific_class.preview_modes != type(page).preview_modes:
  1934. page = page.specific
  1935. return bool(page.preview_modes)
  1936. def get_lock(self):
  1937. # Standard locking should take precedence over workflow locking
  1938. # because it's possible for both to be used at the same time
  1939. lock = super().get_lock()
  1940. if lock:
  1941. return lock
  1942. current_workflow_task = self.current_workflow_task
  1943. if current_workflow_task:
  1944. return WorkflowLock(current_workflow_task, self)
  1945. def get_route_paths(self):
  1946. """
  1947. .. versionadded:: 2.16
  1948. Returns a list of paths that this page can be viewed at.
  1949. These values are combined with the dynamic portion of the page URL to
  1950. automatically create redirects when the page's URL changes.
  1951. .. note::
  1952. If using ``RoutablePageMixin``, you may want to override this method
  1953. to include the paths of popualar routes.
  1954. .. note::
  1955. Redirect paths are 'normalized' to apply consistent ordering to GET parameters,
  1956. so you don't need to include every variation. Fragment identifiers are discarded
  1957. too, so should be avoided.
  1958. """
  1959. return ["/"]
  1960. def get_cached_paths(self):
  1961. """
  1962. This returns a list of paths to invalidate in a frontend cache
  1963. """
  1964. return ["/"]
  1965. def get_sitemap_urls(self, request=None):
  1966. return [
  1967. {
  1968. "location": self.get_full_url(request),
  1969. # fall back on latest_revision_created_at if last_published_at is null
  1970. # (for backwards compatibility from before last_published_at was added)
  1971. "lastmod": (self.last_published_at or self.latest_revision_created_at),
  1972. }
  1973. ]
  1974. def get_static_site_paths(self):
  1975. """
  1976. This is a generator of URL paths to feed into a static site generator
  1977. Override this if you would like to create static versions of subpages
  1978. """
  1979. # Yield path for this page
  1980. yield "/"
  1981. # Yield paths for child pages
  1982. for child in self.get_children().live():
  1983. for path in child.specific.get_static_site_paths():
  1984. yield "/" + child.slug + path
  1985. def get_ancestors(self, inclusive=False):
  1986. """
  1987. Returns a queryset of the current page's ancestors, starting at the root page
  1988. and descending to the parent, or to the current page itself if ``inclusive`` is true.
  1989. """
  1990. return Page.objects.ancestor_of(self, inclusive)
  1991. def get_descendants(self, inclusive=False):
  1992. """
  1993. Returns a queryset of all pages underneath the current page, any number of levels deep.
  1994. If ``inclusive`` is true, the current page itself is included in the queryset.
  1995. """
  1996. return Page.objects.descendant_of(self, inclusive)
  1997. def get_siblings(self, inclusive=True):
  1998. """
  1999. Returns a queryset of all other pages with the same parent as the current page.
  2000. If ``inclusive`` is true, the current page itself is included in the queryset.
  2001. """
  2002. return Page.objects.sibling_of(self, inclusive)
  2003. def get_next_siblings(self, inclusive=False):
  2004. return self.get_siblings(inclusive).filter(path__gte=self.path).order_by("path")
  2005. def get_prev_siblings(self, inclusive=False):
  2006. return (
  2007. self.get_siblings(inclusive).filter(path__lte=self.path).order_by("-path")
  2008. )
  2009. def get_view_restrictions(self):
  2010. """
  2011. Return a query set of all page view restrictions that apply to this page.
  2012. This checks the current page and all ancestor pages for page view restrictions.
  2013. If any of those pages are aliases, it will resolve them to their source pages
  2014. before querying PageViewRestrictions so alias pages use the same view restrictions
  2015. as their source page and they cannot have their own.
  2016. """
  2017. page_ids_to_check = set()
  2018. def add_page_to_check_list(page):
  2019. # If the page is an alias, add the source page to the check list instead
  2020. if page.alias_of:
  2021. add_page_to_check_list(page.alias_of)
  2022. else:
  2023. page_ids_to_check.add(page.id)
  2024. # Check current page for view restrictions
  2025. add_page_to_check_list(self)
  2026. # Check each ancestor for view restrictions as well
  2027. for page in self.get_ancestors().only("alias_of"):
  2028. add_page_to_check_list(page)
  2029. return PageViewRestriction.objects.filter(page_id__in=page_ids_to_check)
    # Template rendered by ``serve_password_required_response``. Taken from
    # the ``PASSWORD_REQUIRED_TEMPLATE`` setting when it is defined, with
    # "wagtailcore/password_required.html" as the fallback.
    password_required_template = getattr(
        settings, "PASSWORD_REQUIRED_TEMPLATE", "wagtailcore/password_required.html"
    )
  2033. def serve_password_required_response(self, request, form, action_url):
  2034. """
  2035. Serve a response indicating that the user has been denied access to view this page,
  2036. and must supply a password.
  2037. form = a Django form object containing the password input
  2038. (and zero or more hidden fields that also need to be output on the template)
  2039. action_url = URL that this form should be POSTed to
  2040. """
  2041. context = self.get_context(request)
  2042. context["form"] = form
  2043. context["action_url"] = action_url
  2044. return TemplateResponse(request, self.password_required_template, context)
    def with_content_json(self, content):
        """
        Returns a new version of the page with field values updated to reflect changes
        in the provided ``content`` (which usually comes from a previously-saved
        page revision).

        Note that ``content`` may be modified in place: a legacy ``comments`` entry
        is moved to the key named by COMMENTS_RELATION_NAME before the data is
        deserialised.

        Certain field values are preserved in order to prevent errors if the returned
        page is saved, such as ``id``, ``content_type`` and some tree-related values.
        The following field values are also preserved, as they are considered to be
        meaningful to the page as a whole, rather than to a specific revision:

        * ``draft_title``
        * ``live``
        * ``has_unpublished_changes``
        * ``owner``
        * ``locked``
        * ``locked_by``
        * ``locked_at``
        * ``latest_revision``
        * ``latest_revision_created_at``
        * ``first_published_at``
        * ``translation_key``
        * ``locale``
        * ``alias_of``
        * ``wagtail_admin_comments`` (COMMENTS_RELATION_NAME)
        """
        # Old revisions (pre Wagtail 2.15) may have saved comment data under the name 'comments'
        # rather than the current relation name as set by COMMENTS_RELATION_NAME;
        # if a 'comments' field exists and looks like our comments model, alter the data to use
        # COMMENTS_RELATION_NAME before restoring
        if (
            COMMENTS_RELATION_NAME not in content
            and "comments" in content
            and isinstance(content["comments"], list)
            and len(content["comments"])
            and isinstance(content["comments"][0], dict)
            and "contentpath" in content["comments"][0]
        ):
            content[COMMENTS_RELATION_NAME] = content["comments"]
            del content["comments"]

        # Build an instance of the page's specific class from the stored data
        obj = self.specific_class.from_serializable_data(content)

        # These should definitely never change between revisions
        obj.id = self.id
        obj.pk = self.pk
        obj.content_type = self.content_type

        # Override possibly-outdated tree parameter fields
        obj.path = self.path
        obj.depth = self.depth
        obj.numchild = self.numchild

        # Update url_path to reflect potential slug changes, but maintaining the page's
        # existing tree position
        obj.set_url_path(self.get_parent())

        # Ensure other values that are meaningful for the page as a whole (rather than
        # to a specific revision) are preserved
        obj.draft_title = self.draft_title
        obj.live = self.live
        obj.has_unpublished_changes = self.has_unpublished_changes
        obj.owner = self.owner
        obj.locked = self.locked
        obj.locked_by = self.locked_by
        obj.locked_at = self.locked_at
        obj.latest_revision = self.latest_revision
        obj.latest_revision_created_at = self.latest_revision_created_at
        obj.first_published_at = self.first_published_at
        obj.translation_key = self.translation_key
        obj.locale = self.locale
        obj.alias_of_id = self.alias_of_id

        # Comments: keep the page's current unresolved comments, taking each one's
        # position from the revision's copy of that comment when it has one
        revision_comments = getattr(obj, COMMENTS_RELATION_NAME)
        page_comments = getattr(self, COMMENTS_RELATION_NAME).filter(
            resolved_at__isnull=True
        )
        for comment in page_comments:
            # attempt to retrieve the comment position from the revision's stored version
            # of the comment
            try:
                revision_comment = revision_comments.get(id=comment.id)
                comment.position = revision_comment.position
            except Comment.DoesNotExist:
                # the revision has no copy of this comment; leave its position unchanged
                pass
        # Replace the comments restored from the revision with the page's own
        setattr(obj, COMMENTS_RELATION_NAME, page_comments)

        return obj
  2122. @property
  2123. def has_workflow(self):
  2124. """Returns True if the page or an ancestor has an active workflow assigned, otherwise False"""
  2125. if not getattr(settings, "WAGTAIL_WORKFLOW_ENABLED", True):
  2126. return False
  2127. return (
  2128. self.get_ancestors(inclusive=True)
  2129. .filter(workflowpage__isnull=False)
  2130. .filter(workflowpage__workflow__active=True)
  2131. .exists()
  2132. )
  2133. def get_workflow(self):
  2134. """Returns the active workflow assigned to the page or its nearest ancestor"""
  2135. if not getattr(settings, "WAGTAIL_WORKFLOW_ENABLED", True):
  2136. return None
  2137. if hasattr(self, "workflowpage") and self.workflowpage.workflow.active:
  2138. return self.workflowpage.workflow
  2139. else:
  2140. try:
  2141. workflow = (
  2142. self.get_ancestors()
  2143. .filter(workflowpage__isnull=False)
  2144. .filter(workflowpage__workflow__active=True)
  2145. .order_by("-depth")
  2146. .first()
  2147. .workflowpage.workflow
  2148. )
  2149. except AttributeError:
  2150. workflow = None
  2151. return workflow
  2152. @property
  2153. def workflow_in_progress(self):
  2154. """Returns True if a workflow is in progress on the current page, otherwise False"""
  2155. if not getattr(settings, "WAGTAIL_WORKFLOW_ENABLED", True):
  2156. return False
  2157. # `_current_workflow_states` may be populated by `prefetch_workflow_states` on `PageQuerySet` as a
  2158. # performance optimisation
  2159. if hasattr(self, "_current_workflow_states"):
  2160. for state in self._current_workflow_states:
  2161. if state.status == WorkflowState.STATUS_IN_PROGRESS:
  2162. return True
  2163. return False
  2164. return WorkflowState.objects.filter(
  2165. page=self, status=WorkflowState.STATUS_IN_PROGRESS
  2166. ).exists()
  2167. @property
  2168. def current_workflow_state(self):
  2169. """Returns the in progress or needs changes workflow state on this page, if it exists"""
  2170. if not getattr(settings, "WAGTAIL_WORKFLOW_ENABLED", True):
  2171. return None
  2172. # `_current_workflow_states` may be populated by `prefetch_workflow_states` on `pagequeryset` as a
  2173. # performance optimisation
  2174. if hasattr(self, "_current_workflow_states"):
  2175. try:
  2176. return self._current_workflow_states[0]
  2177. except IndexError:
  2178. return
  2179. try:
  2180. return (
  2181. WorkflowState.objects.active()
  2182. .select_related("current_task_state__task")
  2183. .get(page=self)
  2184. )
  2185. except WorkflowState.DoesNotExist:
  2186. return
  2187. @property
  2188. def current_workflow_task_state(self):
  2189. """Returns (specific class of) the current task state of the workflow on this page, if it exists"""
  2190. current_workflow_state = self.current_workflow_state
  2191. if (
  2192. current_workflow_state
  2193. and current_workflow_state.status == WorkflowState.STATUS_IN_PROGRESS
  2194. and current_workflow_state.current_task_state
  2195. ):
  2196. return current_workflow_state.current_task_state.specific
  2197. @property
  2198. def current_workflow_task(self):
  2199. """Returns (specific class of) the current task in progress on this page, if it exists"""
  2200. current_workflow_task_state = self.current_workflow_task_state
  2201. if current_workflow_task_state:
  2202. return current_workflow_task_state.task.specific
    class Meta:
        # Translatable singular / plural display names for the model.
        verbose_name = _("page")
        verbose_name_plural = _("pages")
        # A (translation_key, locale) pair may appear on one row only.
        unique_together = [("translation_key", "locale")]
class Orderable(models.Model):
    """
    Abstract model adding a ``sort_order`` field, with instances ordered by
    that field by default.
    """

    # Nullable integer position; marked non-editable.
    sort_order = models.IntegerField(null=True, blank=True, editable=False)
    # Name of the field that holds the ordering value.
    sort_order_field = "sort_order"

    class Meta:
        # No table of its own; meant to be inherited.
        abstract = True
        ordering = ["sort_order"]
  2213. class RevisionQuerySet(models.QuerySet):
  2214. def page_revisions(self):
  2215. return self.filter(base_content_type=get_default_page_content_type())
  2216. def submitted(self):
  2217. return self.filter(submitted_for_moderation=True)
  2218. def for_instance(self, instance):
  2219. return self.filter(
  2220. content_type=ContentType.objects.get_for_model(
  2221. instance, for_concrete_model=False
  2222. ),
  2223. object_id=str(instance.pk),
  2224. )
  2225. class RevisionsManager(models.Manager):
  2226. def get_queryset(self):
  2227. return RevisionQuerySet(self.model, using=self._db)
  2228. def for_instance(self, instance):
  2229. return self.get_queryset().for_instance(instance)
  2230. class PageRevisionsManager(RevisionsManager):
  2231. def get_queryset(self):
  2232. return RevisionQuerySet(self.model, using=self._db).page_revisions()
  2233. def submitted(self):
  2234. return self.get_queryset().submitted()
  2235. class SubmittedRevisionsManager(models.Manager):
  2236. def get_queryset(self):
  2237. return RevisionQuerySet(self.model, using=self._db).submitted()
  2238. class Revision(models.Model):
  2239. content_type = models.ForeignKey(
  2240. ContentType, on_delete=models.CASCADE, related_name="+"
  2241. )
  2242. base_content_type = models.ForeignKey(
  2243. ContentType, on_delete=models.CASCADE, related_name="+"
  2244. )
  2245. object_id = models.CharField(
  2246. max_length=255,
  2247. verbose_name=_("object id"),
  2248. )
  2249. submitted_for_moderation = models.BooleanField(
  2250. verbose_name=_("submitted for moderation"), default=False, db_index=True
  2251. )
  2252. created_at = models.DateTimeField(db_index=True, verbose_name=_("created at"))
  2253. user = models.ForeignKey(
  2254. settings.AUTH_USER_MODEL,
  2255. verbose_name=_("user"),
  2256. null=True,
  2257. blank=True,
  2258. on_delete=models.SET_NULL,
  2259. related_name="wagtail_revisions",
  2260. )
  2261. object_str = models.TextField(default="")
  2262. content = models.JSONField(
  2263. verbose_name=_("content JSON"), encoder=DjangoJSONEncoder
  2264. )
  2265. approved_go_live_at = models.DateTimeField(
  2266. verbose_name=_("approved go live at"), null=True, blank=True, db_index=True
  2267. )
  2268. objects = RevisionsManager()
  2269. page_revisions = PageRevisionsManager()
  2270. submitted_revisions = SubmittedRevisionsManager()
  2271. content_object = GenericForeignKey(
  2272. "content_type", "object_id", for_concrete_model=False
  2273. )
  2274. wagtail_reference_index_ignore = True
  2275. @cached_property
  2276. def base_content_object(self):
  2277. return self.base_content_type.get_object_for_this_type(pk=self.object_id)
  2278. @property
  2279. def page(self):
  2280. warnings.warn(
  2281. "Revisions should access .content_object instead of .page "
  2282. "to retrieve the object.",
  2283. category=RemovedInWagtail50Warning,
  2284. stacklevel=2,
  2285. )
  2286. return self.content_object
  2287. @property
  2288. def page_id(self):
  2289. warnings.warn(
  2290. "Revisions should access .object_id instead of .page_id "
  2291. "to retrieve the object's primary key. For page revisions, "
  2292. "you may need to cast the object_id to integer first.",
  2293. category=RemovedInWagtail50Warning,
  2294. stacklevel=2,
  2295. )
  2296. return int(self.object_id)
  2297. def save(self, user=None, *args, **kwargs):
  2298. # Set default value for created_at to now
  2299. # We cannot use auto_now_add as that will override
  2300. # any value that is set before saving
  2301. if self.created_at is None:
  2302. self.created_at = timezone.now()
  2303. # Set default value for base_content_type to the content_type.
  2304. # Page revisions should set this to the default Page model's content type,
  2305. # but the distinction may not be necessary for models that do not use inheritance.
  2306. if self.base_content_type_id is None:
  2307. self.base_content_type_id = self.content_type_id
  2308. super().save(*args, **kwargs)
  2309. if self.submitted_for_moderation:
  2310. # ensure that all other revisions of this object have the 'submitted for moderation' flag unset
  2311. Revision.objects.filter(
  2312. base_content_type_id=self.base_content_type_id,
  2313. object_id=self.object_id,
  2314. ).exclude(id=self.id).update(submitted_for_moderation=False)
  2315. if (
  2316. self.approved_go_live_at is None
  2317. and "update_fields" in kwargs
  2318. and "approved_go_live_at" in kwargs["update_fields"]
  2319. ):
  2320. # Log scheduled revision publish cancellation
  2321. object = self.as_object()
  2322. # go_live_at = kwargs['update_fields'][]
  2323. log(
  2324. instance=object,
  2325. action="wagtail.schedule.cancel",
  2326. data={
  2327. "revision": {
  2328. "id": self.id,
  2329. "created": self.created_at.strftime("%d %b %Y %H:%M"),
  2330. "go_live_at": object.go_live_at.strftime("%d %b %Y %H:%M")
  2331. if object.go_live_at
  2332. else None,
  2333. }
  2334. },
  2335. user=user,
  2336. revision=self,
  2337. )
  2338. def as_object(self):
  2339. return self.content_object.with_content_json(self.content)
  2340. def as_page_object(self):
  2341. warnings.warn(
  2342. "Revisions should use .as_object() instead of .as_page_object() "
  2343. "to create the object.",
  2344. category=RemovedInWagtail50Warning,
  2345. stacklevel=2,
  2346. )
  2347. return self.as_object()
  2348. def approve_moderation(self, user=None):
  2349. if self.submitted_for_moderation:
  2350. logger.info(
  2351. 'Page moderation approved: "%s" id=%d revision_id=%d',
  2352. self.content_object.title,
  2353. self.content_object.id,
  2354. self.id,
  2355. )
  2356. log(
  2357. instance=self.as_object(),
  2358. action="wagtail.moderation.approve",
  2359. user=user,
  2360. revision=self,
  2361. )
  2362. self.publish()
  2363. def reject_moderation(self, user=None):
  2364. if self.submitted_for_moderation:
  2365. logger.info(
  2366. 'Page moderation rejected: "%s" id=%d revision_id=%d',
  2367. self.content_object.title,
  2368. self.content_object.id,
  2369. self.id,
  2370. )
  2371. log(
  2372. instance=self.as_object(),
  2373. action="wagtail.moderation.reject",
  2374. user=user,
  2375. revision=self,
  2376. )
  2377. self.submitted_for_moderation = False
  2378. self.save(update_fields=["submitted_for_moderation"])
  2379. def is_latest_revision(self):
  2380. if self.id is None:
  2381. # special case: a revision without an ID is presumed to be newly-created and is thus
  2382. # newer than any revision that might exist in the database
  2383. return True
  2384. latest_revision = (
  2385. Revision.objects.filter(
  2386. base_content_type_id=self.base_content_type_id,
  2387. object_id=self.object_id,
  2388. )
  2389. .order_by("-created_at", "-id")
  2390. .first()
  2391. )
  2392. return latest_revision == self
  2393. def delete(self):
  2394. # Update revision_created fields for comments that reference the current revision, if applicable.
  2395. try:
  2396. next_revision = self.get_next()
  2397. except Revision.DoesNotExist:
  2398. next_revision = None
  2399. if next_revision:
  2400. # move comments created on this revision to the next revision, as they may well still apply if they're unresolved
  2401. self.created_comments.all().update(revision_created=next_revision)
  2402. return super().delete()
  2403. def publish(self, user=None, changed=True, log_action=True, previous_revision=None):
  2404. return self.content_object.publish(
  2405. self,
  2406. user=user,
  2407. changed=changed,
  2408. log_action=log_action,
  2409. previous_revision=previous_revision,
  2410. )
  2411. def get_previous(self):
  2412. return self.get_previous_by_created_at(
  2413. base_content_type_id=self.base_content_type_id,
  2414. object_id=self.object_id,
  2415. )
  2416. def get_next(self):
  2417. return self.get_next_by_created_at(
  2418. base_content_type_id=self.base_content_type_id,
  2419. object_id=self.object_id,
  2420. )
  2421. def __str__(self):
  2422. return '"' + str(self.content_object) + '" at ' + str(self.created_at)
  2423. class Meta:
  2424. verbose_name = _("revision")
  2425. verbose_name_plural = _("revisions")
  2426. indexes = [
  2427. models.Index(
  2428. fields=["content_type", "object_id"],
  2429. name="content_object_idx",
  2430. ),
  2431. models.Index(
  2432. fields=["base_content_type", "object_id"],
  2433. name="base_content_object_idx",
  2434. ),
  2435. ]
  2436. PAGE_PERMISSION_TYPES = [
  2437. ("add", _("Add"), _("Add/edit pages you own")),
  2438. ("edit", _("Edit"), _("Edit any page")),
  2439. ("publish", _("Publish"), _("Publish any page")),
  2440. ("bulk_delete", _("Bulk delete"), _("Delete pages with children")),
  2441. ("lock", _("Lock"), _("Lock/unlock pages you've locked")),
  2442. ("unlock", _("Unlock"), _("Unlock any page")),
  2443. ]
  2444. PAGE_PERMISSION_TYPE_CHOICES = [
  2445. (identifier, long_label)
  2446. for identifier, short_label, long_label in PAGE_PERMISSION_TYPES
  2447. ]
  2448. class GroupPagePermission(models.Model):
  2449. group = models.ForeignKey(
  2450. Group,
  2451. verbose_name=_("group"),
  2452. related_name="page_permissions",
  2453. on_delete=models.CASCADE,
  2454. )
  2455. page = models.ForeignKey(
  2456. "Page",
  2457. verbose_name=_("page"),
  2458. related_name="group_permissions",
  2459. on_delete=models.CASCADE,
  2460. )
  2461. permission_type = models.CharField(
  2462. verbose_name=_("permission type"),
  2463. max_length=20,
  2464. choices=PAGE_PERMISSION_TYPE_CHOICES,
  2465. )
  2466. class Meta:
  2467. unique_together = ("group", "page", "permission_type")
  2468. verbose_name = _("group page permission")
  2469. verbose_name_plural = _("group page permissions")
  2470. def __str__(self):
  2471. return "Group %d ('%s') has permission '%s' on page %d ('%s')" % (
  2472. self.group.id,
  2473. self.group,
  2474. self.permission_type,
  2475. self.page.id,
  2476. self.page,
  2477. )
  2478. class UserPagePermissionsProxy:
  2479. """Helper object that encapsulates all the page permission rules that this user has
  2480. across the page hierarchy."""
  2481. def __init__(self, user):
  2482. self.user = user
  2483. if user.is_active and not user.is_superuser:
  2484. self.permissions = GroupPagePermission.objects.filter(
  2485. group__user=self.user
  2486. ).select_related("page")
  2487. def revisions_for_moderation(self):
  2488. """Return a queryset of page revisions awaiting moderation that this user has publish permission on"""
  2489. # Deal with the trivial cases first...
  2490. if not self.user.is_active:
  2491. return Revision.objects.none()
  2492. if self.user.is_superuser:
  2493. return Revision.page_revisions.submitted()
  2494. # get the list of pages for which they have direct publish permission
  2495. # (i.e. they can publish any page within this subtree)
  2496. publishable_pages_paths = (
  2497. self.permissions.filter(permission_type="publish")
  2498. .values_list("page__path", flat=True)
  2499. .distinct()
  2500. )
  2501. if not publishable_pages_paths:
  2502. return Revision.objects.none()
  2503. # compile a filter expression to apply to the Revision.page_revisions.submitted() queryset:
  2504. # return only those pages whose paths start with one of the publishable_pages paths
  2505. only_my_sections = Q(path__startswith=publishable_pages_paths[0])
  2506. for page_path in publishable_pages_paths[1:]:
  2507. only_my_sections = only_my_sections | Q(path__startswith=page_path)
  2508. # return the filtered queryset
  2509. return Revision.page_revisions.submitted().filter(
  2510. object_id__in=Page.objects.filter(only_my_sections).values_list(
  2511. Cast("pk", output_field=models.CharField()), flat=True
  2512. )
  2513. )
  2514. def for_page(self, page):
  2515. """Return a PagePermissionTester object that can be used to query whether this user has
  2516. permission to perform specific tasks on the given page"""
  2517. return PagePermissionTester(self, page)
  2518. def explorable_pages(self):
  2519. """Return a queryset of pages that the user has access to view in the
  2520. explorer (e.g. add/edit/publish permission). Includes all pages with
  2521. specific group permissions and also the ancestors of those pages (in
  2522. order to enable navigation in the explorer)"""
  2523. # Deal with the trivial cases first...
  2524. if not self.user.is_active:
  2525. return Page.objects.none()
  2526. if self.user.is_superuser:
  2527. return Page.objects.all()
  2528. explorable_pages = Page.objects.none()
  2529. # Creates a union queryset of all objects the user has access to add,
  2530. # edit and publish
  2531. for perm in self.permissions.filter(
  2532. Q(permission_type="add")
  2533. | Q(permission_type="edit")
  2534. | Q(permission_type="publish")
  2535. | Q(permission_type="lock")
  2536. ):
  2537. explorable_pages |= Page.objects.descendant_of(perm.page, inclusive=True)
  2538. # For all pages with specific permissions, add their ancestors as
  2539. # explorable. This will allow deeply nested pages to be accessed in the
  2540. # explorer. For example, in the hierarchy A>B>C>D where the user has
  2541. # 'edit' access on D, they will be able to navigate to D without having
  2542. # explicit access to A, B or C.
  2543. page_permissions = Page.objects.filter(group_permissions__in=self.permissions)
  2544. for page in page_permissions:
  2545. explorable_pages |= page.get_ancestors()
  2546. # Remove unnecessary top-level ancestors that the user has no access to
  2547. fca_page = page_permissions.first_common_ancestor()
  2548. explorable_pages = explorable_pages.filter(path__startswith=fca_page.path)
  2549. return explorable_pages
  2550. def editable_pages(self):
  2551. """Return a queryset of the pages that this user has permission to edit"""
  2552. # Deal with the trivial cases first...
  2553. if not self.user.is_active:
  2554. return Page.objects.none()
  2555. if self.user.is_superuser:
  2556. return Page.objects.all()
  2557. editable_pages = Page.objects.none()
  2558. for perm in self.permissions.filter(permission_type="add"):
  2559. # user has edit permission on any subpage of perm.page
  2560. # (including perm.page itself) that is owned by them
  2561. editable_pages |= Page.objects.descendant_of(
  2562. perm.page, inclusive=True
  2563. ).filter(owner=self.user)
  2564. for perm in self.permissions.filter(permission_type="edit"):
  2565. # user has edit permission on any subpage of perm.page
  2566. # (including perm.page itself) regardless of owner
  2567. editable_pages |= Page.objects.descendant_of(perm.page, inclusive=True)
  2568. return editable_pages
  2569. def can_edit_pages(self):
  2570. """Return True if the user has permission to edit any pages"""
  2571. return self.editable_pages().exists()
  2572. def publishable_pages(self):
  2573. """Return a queryset of the pages that this user has permission to publish"""
  2574. # Deal with the trivial cases first...
  2575. if not self.user.is_active:
  2576. return Page.objects.none()
  2577. if self.user.is_superuser:
  2578. return Page.objects.all()
  2579. publishable_pages = Page.objects.none()
  2580. for perm in self.permissions.filter(permission_type="publish"):
  2581. # user has publish permission on any subpage of perm.page
  2582. # (including perm.page itself)
  2583. publishable_pages |= Page.objects.descendant_of(perm.page, inclusive=True)
  2584. return publishable_pages
  2585. def can_publish_pages(self):
  2586. """Return True if the user has permission to publish any pages"""
  2587. return self.publishable_pages().exists()
  2588. def can_remove_locks(self):
  2589. """Returns True if the user has permission to unlock pages they have not locked"""
  2590. if self.user.is_superuser:
  2591. return True
  2592. if not self.user.is_active:
  2593. return False
  2594. else:
  2595. return self.permissions.filter(permission_type="unlock").exists()
  2596. class PagePermissionTester:
  2597. def __init__(self, user_perms, page):
  2598. self.user = user_perms.user
  2599. self.user_perms = user_perms
  2600. self.page = page
  2601. self.page_is_root = page.depth == 1 # Equivalent to page.is_root()
  2602. if self.user.is_active and not self.user.is_superuser:
  2603. self.permissions = {
  2604. perm.permission_type
  2605. for perm in user_perms.permissions
  2606. if self.page.path.startswith(perm.page.path)
  2607. }
  2608. def user_has_lock(self):
  2609. return self.page.locked_by_id == self.user.pk
  2610. def page_locked(self):
  2611. lock = self.page.get_lock()
  2612. return lock and lock.for_user(self.user)
  2613. def can_add_subpage(self):
  2614. if not self.user.is_active:
  2615. return False
  2616. specific_class = self.page.specific_class
  2617. if specific_class is None or not specific_class.creatable_subpage_models():
  2618. return False
  2619. return self.user.is_superuser or ("add" in self.permissions)
  2620. def can_edit(self):
  2621. if not self.user.is_active:
  2622. return False
  2623. if (
  2624. self.page_is_root
  2625. ): # root node is not a page and can never be edited, even by superusers
  2626. return False
  2627. if self.user.is_superuser:
  2628. return True
  2629. if "edit" in self.permissions:
  2630. return True
  2631. if "add" in self.permissions and self.page.owner_id == self.user.pk:
  2632. return True
  2633. current_workflow_task = self.page.current_workflow_task
  2634. if current_workflow_task:
  2635. if current_workflow_task.user_can_access_editor(self.page, self.user):
  2636. return True
  2637. return False
  2638. def can_delete(self, ignore_bulk=False):
  2639. if not self.user.is_active:
  2640. return False
  2641. if (
  2642. self.page_is_root
  2643. ): # root node is not a page and can never be deleted, even by superusers
  2644. return False
  2645. if self.user.is_superuser:
  2646. # superusers require no further checks
  2647. return True
  2648. # if the user does not have bulk_delete permission, they may only delete leaf pages
  2649. if (
  2650. "bulk_delete" not in self.permissions
  2651. and not self.page.is_leaf()
  2652. and not ignore_bulk
  2653. ):
  2654. return False
  2655. if "edit" in self.permissions:
  2656. # if the user does not have publish permission, we also need to confirm that there
  2657. # are no published pages here
  2658. if "publish" not in self.permissions:
  2659. pages_to_delete = self.page.get_descendants(inclusive=True)
  2660. if pages_to_delete.live().exists():
  2661. return False
  2662. return True
  2663. elif "add" in self.permissions:
  2664. pages_to_delete = self.page.get_descendants(inclusive=True)
  2665. if "publish" in self.permissions:
  2666. # we don't care about live state, but all pages must be owned by this user
  2667. # (i.e. eliminating pages owned by this user must give us the empty set)
  2668. return not pages_to_delete.exclude(owner=self.user).exists()
  2669. else:
  2670. # all pages must be owned by this user and non-live
  2671. # (i.e. eliminating non-live pages owned by this user must give us the empty set)
  2672. return not pages_to_delete.exclude(live=False, owner=self.user).exists()
  2673. else:
  2674. return False
  2675. def can_unpublish(self):
  2676. if not self.user.is_active:
  2677. return False
  2678. if (not self.page.live) or self.page_is_root:
  2679. return False
  2680. if self.page_locked():
  2681. return False
  2682. return self.user.is_superuser or ("publish" in self.permissions)
  2683. def can_publish(self):
  2684. if not self.user.is_active:
  2685. return False
  2686. if self.page_is_root:
  2687. return False
  2688. return self.user.is_superuser or ("publish" in self.permissions)
  2689. def can_submit_for_moderation(self):
  2690. return (
  2691. not self.page_locked()
  2692. and self.page.has_workflow
  2693. and not self.page.workflow_in_progress
  2694. )
  2695. def can_set_view_restrictions(self):
  2696. return self.can_publish()
  2697. def can_unschedule(self):
  2698. return self.can_publish()
  2699. def can_lock(self):
  2700. if self.user.is_superuser:
  2701. return True
  2702. current_workflow_task = self.page.current_workflow_task
  2703. if current_workflow_task:
  2704. return current_workflow_task.user_can_lock(self.page, self.user)
  2705. if "lock" in self.permissions:
  2706. return True
  2707. return False
  2708. def can_unlock(self):
  2709. if self.user.is_superuser:
  2710. return True
  2711. if self.user_has_lock():
  2712. return True
  2713. current_workflow_task = self.page.current_workflow_task
  2714. if current_workflow_task:
  2715. return current_workflow_task.user_can_unlock(self.page, self.user)
  2716. if "unlock" in self.permissions:
  2717. return True
  2718. return False
  2719. def can_publish_subpage(self):
  2720. """
  2721. Niggly special case for creating and publishing a page in one go.
  2722. Differs from can_publish in that we want to be able to publish subpages of root, but not
  2723. to be able to publish root itself. (Also, can_publish_subpage returns false if the page
  2724. does not allow subpages at all.)
  2725. """
  2726. if not self.user.is_active:
  2727. return False
  2728. specific_class = self.page.specific_class
  2729. if specific_class is None or not specific_class.creatable_subpage_models():
  2730. return False
  2731. return self.user.is_superuser or ("publish" in self.permissions)
  2732. def can_reorder_children(self):
  2733. """
  2734. Keep reorder permissions the same as publishing, since it immediately affects published pages
  2735. (and the use-cases for a non-admin needing to do it are fairly obscure...)
  2736. """
  2737. return self.can_publish_subpage()
  2738. def can_move(self):
  2739. """
  2740. Moving a page should be logically equivalent to deleting and re-adding it (and all its children).
  2741. As such, the permission test for 'can this be moved at all?' should be the same as for deletion.
  2742. (Further constraints will then apply on where it can be moved *to*.)
  2743. """
  2744. return self.can_delete(ignore_bulk=True)
  2745. def can_copy(self):
  2746. return not self.page_is_root
  2747. def can_move_to(self, destination):
  2748. # reject the logically impossible cases first
  2749. if self.page == destination or destination.is_descendant_of(self.page):
  2750. return False
  2751. # reject moves that are forbidden by subpage_types / parent_page_types rules
  2752. # (these rules apply to superusers too)
  2753. if not self.page.specific.can_move_to(destination):
  2754. return False
  2755. # shortcut the trivial 'everything' / 'nothing' permissions
  2756. if not self.user.is_active:
  2757. return False
  2758. if self.user.is_superuser:
  2759. return True
  2760. # check that the page can be moved at all
  2761. if not self.can_move():
  2762. return False
  2763. # Inspect permissions on the destination
  2764. destination_perms = self.user_perms.for_page(destination)
  2765. # we always need at least add permission in the target
  2766. if "add" not in destination_perms.permissions:
  2767. return False
  2768. if self.page.live or self.page.get_descendants().filter(live=True).exists():
  2769. # moving this page will entail publishing within the destination section
  2770. return "publish" in destination_perms.permissions
  2771. else:
  2772. # no publishing required, so the already-tested 'add' permission is sufficient
  2773. return True
  2774. def can_copy_to(self, destination, recursive=False):
  2775. # reject the logically impossible cases first
  2776. # recursive can't copy to the same tree otherwise it will be on infinite loop
  2777. if recursive and (
  2778. self.page == destination or destination.is_descendant_of(self.page)
  2779. ):
  2780. return False
  2781. # reject inactive users early
  2782. if not self.user.is_active:
  2783. return False
  2784. # reject early if pages of this type cannot be created at the destination
  2785. if not self.page.specific_class.can_create_at(destination):
  2786. return False
  2787. # skip permission checking for super users
  2788. if self.user.is_superuser:
  2789. return True
  2790. # Inspect permissions on the destination
  2791. destination_perms = self.user_perms.for_page(destination)
  2792. if not destination.specific_class.creatable_subpage_models():
  2793. return False
  2794. # we always need at least add permission in the target
  2795. if "add" not in destination_perms.permissions:
  2796. return False
  2797. return True
  2798. def can_view_revisions(self):
  2799. return not self.page_is_root
  2800. class PageViewRestriction(BaseViewRestriction):
  2801. page = models.ForeignKey(
  2802. "Page",
  2803. verbose_name=_("page"),
  2804. related_name="view_restrictions",
  2805. on_delete=models.CASCADE,
  2806. )
  2807. passed_view_restrictions_session_key = "passed_page_view_restrictions"
  2808. class Meta:
  2809. verbose_name = _("page view restriction")
  2810. verbose_name_plural = _("page view restrictions")
  2811. def save(self, user=None, **kwargs):
  2812. """
  2813. Custom save handler to include logging.
  2814. :param user: the user add/updating the view restriction
  2815. :param specific_instance: the specific model instance the restriction applies to
  2816. """
  2817. specific_instance = self.page.specific
  2818. is_new = self.id is None
  2819. super().save(**kwargs)
  2820. if specific_instance:
  2821. log(
  2822. instance=specific_instance,
  2823. action="wagtail.view_restriction.create"
  2824. if is_new
  2825. else "wagtail.view_restriction.edit",
  2826. user=user,
  2827. data={
  2828. "restriction": {
  2829. "type": self.restriction_type,
  2830. "title": force_str(
  2831. dict(self.RESTRICTION_CHOICES).get(self.restriction_type)
  2832. ),
  2833. }
  2834. },
  2835. )
  2836. def delete(self, user=None, **kwargs):
  2837. """
  2838. Custom delete handler to aid in logging
  2839. :param user: the user removing the view restriction
  2840. :param specific_instance: the specific model instance the restriction applies to
  2841. """
  2842. specific_instance = self.page.specific
  2843. if specific_instance:
  2844. log(
  2845. instance=specific_instance,
  2846. action="wagtail.view_restriction.delete",
  2847. user=user,
  2848. data={
  2849. "restriction": {
  2850. "type": self.restriction_type,
  2851. "title": force_str(
  2852. dict(self.RESTRICTION_CHOICES).get(self.restriction_type)
  2853. ),
  2854. }
  2855. },
  2856. )
  2857. return super().delete(**kwargs)
  2858. class WorkflowPage(models.Model):
  2859. page = models.OneToOneField(
  2860. "Page",
  2861. verbose_name=_("page"),
  2862. on_delete=models.CASCADE,
  2863. primary_key=True,
  2864. unique=True,
  2865. )
  2866. workflow = models.ForeignKey(
  2867. "Workflow",
  2868. related_name="workflow_pages",
  2869. verbose_name=_("workflow"),
  2870. on_delete=models.CASCADE,
  2871. )
  2872. def get_pages(self):
  2873. """
  2874. Returns a queryset of pages that are affected by this WorkflowPage link.
  2875. This includes all descendants of the page excluding any that have other WorkflowPages.
  2876. """
  2877. descendant_pages = Page.objects.descendant_of(self.page, inclusive=True)
  2878. descendant_workflow_pages = WorkflowPage.objects.filter(
  2879. page_id__in=descendant_pages.values_list("id", flat=True)
  2880. ).exclude(pk=self.pk)
  2881. for path, depth in descendant_workflow_pages.values_list(
  2882. "page__path", "page__depth"
  2883. ):
  2884. descendant_pages = descendant_pages.exclude(
  2885. path__startswith=path, depth__gte=depth
  2886. )
  2887. return descendant_pages
  2888. class Meta:
  2889. verbose_name = _("workflow page")
  2890. verbose_name_plural = _("workflow pages")
  2891. class WorkflowTask(Orderable):
  2892. workflow = ParentalKey(
  2893. "Workflow",
  2894. on_delete=models.CASCADE,
  2895. verbose_name=_("workflow_tasks"),
  2896. related_name="workflow_tasks",
  2897. )
  2898. task = models.ForeignKey(
  2899. "Task",
  2900. on_delete=models.CASCADE,
  2901. verbose_name=_("task"),
  2902. related_name="workflow_tasks",
  2903. limit_choices_to={"active": True},
  2904. )
  2905. class Meta(Orderable.Meta):
  2906. unique_together = [("workflow", "task")]
  2907. verbose_name = _("workflow task order")
  2908. verbose_name_plural = _("workflow task orders")
  2909. class TaskManager(models.Manager):
  2910. def active(self):
  2911. return self.filter(active=True)
  2912. class Task(models.Model):
  2913. name = models.CharField(max_length=255, verbose_name=_("name"))
  2914. content_type = models.ForeignKey(
  2915. ContentType,
  2916. verbose_name=_("content type"),
  2917. related_name="wagtail_tasks",
  2918. on_delete=models.CASCADE,
  2919. )
  2920. active = models.BooleanField(
  2921. verbose_name=_("active"),
  2922. default=True,
  2923. help_text=_(
  2924. "Active tasks can be added to workflows. Deactivating a task does not remove it from existing workflows."
  2925. ),
  2926. )
  2927. objects = TaskManager()
  2928. admin_form_fields = ["name"]
  2929. admin_form_readonly_on_edit_fields = ["name"]
  2930. def __init__(self, *args, **kwargs):
  2931. super().__init__(*args, **kwargs)
  2932. if not self.id:
  2933. # this model is being newly created
  2934. # rather than retrieved from the db;
  2935. if not self.content_type_id:
  2936. # set content type to correctly represent the model class
  2937. # that this was created as
  2938. self.content_type = ContentType.objects.get_for_model(self)
  2939. def __str__(self):
  2940. return self.name
  2941. @property
  2942. def workflows(self):
  2943. """Returns all ``Workflow`` instances that use this task"""
  2944. return Workflow.objects.filter(workflow_tasks__task=self)
  2945. @property
  2946. def active_workflows(self):
  2947. """Return a ``QuerySet``` of active workflows that this task is part of"""
  2948. return Workflow.objects.active().filter(workflow_tasks__task=self)
  2949. @classmethod
  2950. def get_verbose_name(cls):
  2951. """
  2952. Returns the human-readable "verbose name" of this task model e.g "Group approval task".
  2953. """
  2954. # This is similar to doing cls._meta.verbose_name.title()
  2955. # except this doesn't convert any characters to lowercase
  2956. return capfirst(cls._meta.verbose_name)
  2957. @cached_property
  2958. def specific(self):
  2959. """
  2960. Return this Task in its most specific subclassed form.
  2961. """
  2962. # the ContentType.objects manager keeps a cache, so this should potentially
  2963. # avoid a database lookup over doing self.content_type. I think.
  2964. content_type = ContentType.objects.get_for_id(self.content_type_id)
  2965. model_class = content_type.model_class()
  2966. if model_class is None:
  2967. # Cannot locate a model class for this content type. This might happen
  2968. # if the codebase and database are out of sync (e.g. the model exists
  2969. # on a different git branch and we haven't rolled back migrations before
  2970. # switching branches); if so, the best we can do is return the page
  2971. # unchanged.
  2972. return self
  2973. elif isinstance(self, model_class):
  2974. # self is already the an instance of the most specific class
  2975. return self
  2976. else:
  2977. return content_type.get_object_for_this_type(id=self.id)
  2978. task_state_class = None
  2979. @classmethod
  2980. def get_task_state_class(self):
  2981. return self.task_state_class or TaskState
  2982. def start(self, workflow_state, user=None):
  2983. """Start this task on the provided workflow state by creating an instance of TaskState"""
  2984. task_state = self.get_task_state_class()(workflow_state=workflow_state)
  2985. task_state.status = TaskState.STATUS_IN_PROGRESS
  2986. task_state.page_revision = workflow_state.page.get_latest_revision()
  2987. task_state.task = self
  2988. task_state.save()
  2989. task_submitted.send(
  2990. sender=task_state.specific.__class__,
  2991. instance=task_state.specific,
  2992. user=user,
  2993. )
  2994. return task_state
  2995. @transaction.atomic
  2996. def on_action(self, task_state, user, action_name, **kwargs):
  2997. """Performs an action on a task state determined by the ``action_name`` string passed"""
  2998. if action_name == "approve":
  2999. task_state.approve(user=user, **kwargs)
  3000. elif action_name == "reject":
  3001. task_state.reject(user=user, **kwargs)
  3002. def user_can_access_editor(self, page, user):
  3003. """Returns True if a user who would not normally be able to access the editor for the page should be able to if the page is currently on this task.
  3004. Note that returning False does not remove permissions from users who would otherwise have them."""
  3005. return False
  3006. def page_locked_for_user(self, page, user):
  3007. """Returns True if the page should be locked to a given user's edits. This can be used to prevent editing by non-reviewers."""
  3008. return False
  3009. def user_can_lock(self, page, user):
  3010. """Returns True if a user who would not normally be able to lock the page should be able to if the page is currently on this task.
  3011. Note that returning False does not remove permissions from users who would otherwise have them."""
  3012. return False
  3013. def user_can_unlock(self, page, user):
  3014. """Returns True if a user who would not normally be able to unlock the page should be able to if the page is currently on this task.
  3015. Note that returning False does not remove permissions from users who would otherwise have them."""
  3016. return False
  3017. def get_actions(self, page, user):
  3018. """
  3019. Get the list of action strings (name, verbose_name, whether the action requires additional data - see
  3020. ``get_form_for_action``) for actions the current user can perform for this task on the given page.
  3021. These strings should be the same as those able to be passed to ``on_action``
  3022. """
  3023. return []
  3024. def get_form_for_action(self, action):
  3025. return TaskStateCommentForm
  3026. def get_template_for_action(self, action):
  3027. return ""
  3028. def get_task_states_user_can_moderate(self, user, **kwargs):
  3029. """Returns a ``QuerySet`` of the task states the current user can moderate"""
  3030. return TaskState.objects.none()
  3031. @classmethod
  3032. def get_description(cls):
  3033. """Returns the task description."""
  3034. return ""
  @transaction.atomic
  def deactivate(self, user=None):
      """
      Mark this task as inactive and cancel its in-progress task states.

      ``active`` is set to False and saved first; then every ``TaskState``
      linked to this task with ``STATUS_IN_PROGRESS`` is cancelled on behalf
      of ``user``. Runs inside a single transaction.
      """
      self.active = False
      self.save()

      states_to_cancel = TaskState.objects.filter(
          task=self,
          status=TaskState.STATUS_IN_PROGRESS,
      )
      for task_state in states_to_cancel:
          task_state.cancel(user=user)
  class Meta:
      # Translatable singular / plural display names for the model.
      verbose_name_plural = _("tasks")
      verbose_name = _("task")
  class WorkflowManager(models.Manager):
      """Manager for ``Workflow`` with a shortcut for active workflows."""

      def active(self):
          """Return only the workflows whose ``active`` flag is True."""
          active_workflows = self.filter(active=True)
          return active_workflows
  class Workflow(ClusterableModel):
      """
      A named workflow that can be started on a page, producing a
      ``WorkflowState`` that tracks its progress through the linked tasks.
      """

      name = models.CharField(max_length=255, verbose_name=_("name"))
      active = models.BooleanField(
          verbose_name=_("active"),
          default=True,
          help_text=_(
              "Active workflows can be added to pages. Deactivating a workflow does not remove it from existing pages."
          ),
      )
      objects = WorkflowManager()

      def __str__(self):
          return self.name

      @property
      def tasks(self):
          """All ``Task`` instances linked to this workflow, in sort order."""
          linked_tasks = Task.objects.filter(workflow_tasks__workflow=self)
          return linked_tasks.order_by("workflow_tasks__sort_order")

      @transaction.atomic
      def start(self, page, user):
          """
          Start this workflow on ``page`` for ``user``.

          Creates and saves an in-progress ``WorkflowState``, advances it with
          ``update``, sends ``workflow_submitted``, writes a
          ``wagtail.workflow.start`` log entry and returns the state.
          """
          state = WorkflowState(
              page=page,
              workflow=self,
              status=WorkflowState.STATUS_IN_PROGRESS,
              requested_by=user,
          )
          state.save()
          state.update(user=user)
          workflow_submitted.send(sender=state.__class__, instance=state, user=user)

          # The state may have no current task state after update(); the log
          # entry then records None for both the next task and its state id.
          started_task_state = state.current_task_state
          if started_task_state:
              next_task_data = {
                  "id": started_task_state.task.id,
                  "title": started_task_state.task.name,
              }
              task_state_id = started_task_state.id
          else:
              next_task_data = None
              task_state_id = None

          workflow_data = {
              "id": self.id,
              "title": self.name,
              "status": state.status,
              "next": next_task_data,
              "task_state_id": task_state_id,
          }
          log(
              instance=page,
              action="wagtail.workflow.start",
              data={"workflow": workflow_data},
              revision=page.get_latest_revision(),
              user=user,
          )
          return state

      @transaction.atomic
      def deactivate(self, user=None):
          """
          Mark the workflow inactive and cancel its in-progress states.

          Every in-progress ``WorkflowState`` of this workflow is cancelled on
          behalf of ``user``, the ``WorkflowPage`` rows pointing at this
          workflow are deleted, and the workflow is saved with
          ``active=False``.
          """
          self.active = False

          states_to_cancel = WorkflowState.objects.filter(
              status=WorkflowState.STATUS_IN_PROGRESS,
              workflow=self,
          )
          for workflow_state in states_to_cancel:
              workflow_state.cancel(user=user)

          WorkflowPage.objects.filter(workflow=self).delete()
          self.save()

      def all_pages(self):
          """
          Return a queryset of all the pages that this Workflow applies to.
          """
          combined = Page.objects.none()
          for link in self.workflow_pages.all():
              combined |= link.get_pages()
          return combined

      class Meta:
          verbose_name = _("workflow")
          verbose_name_plural = _("workflows")
  class GroupApprovalTask(Task):
      """A task that is moderated by the members of the chosen groups."""

      groups = models.ManyToManyField(
          Group,
          verbose_name=_("groups"),
          help_text=_(
              "Pages at this step in a workflow will be moderated or approved by these groups of users"
          ),
      )

      admin_form_fields = Task.admin_form_fields + ["groups"]
      admin_form_widgets = {
          "groups": forms.CheckboxSelectMultiple,
      }

      def _user_is_in_task_groups(self, user):
          """Return True if ``user`` belongs to at least one of this task's groups."""
          return self.groups.filter(id__in=user.groups.all()).exists()

      def start(self, workflow_state, user=None):
          """
          Start the task, first releasing a page lock held by a non-reviewer.

          If the page is locked by someone who is not in any of this task's
          groups, the lock fields are cleared and saved before the parent
          ``start`` runs.
          """
          page = workflow_state.page
          locker = page.locked_by
          if locker and not locker.groups.filter(id__in=self.groups.all()).exists():
              page.locked = False
              page.locked_by = None
              page.locked_at = None
              page.save(update_fields=["locked", "locked_by", "locked_at"])
          return super().start(workflow_state, user=user)

      def user_can_access_editor(self, page, user):
          """Group members and superusers may access the editor."""
          return self._user_is_in_task_groups(user) or user.is_superuser

      def page_locked_for_user(self, page, user):
          """The page is locked for everyone except group members and superusers."""
          return not self._user_is_in_task_groups(user) and not user.is_superuser

      def user_can_lock(self, page, user):
          """Group members may lock the page while it is on this task."""
          return self._user_is_in_task_groups(user)

      def user_can_unlock(self, page, user):
          """This task grants no additional unlock permission."""
          return False

      def get_actions(self, page, user):
          """
          Return the (name, verbose_name, requires additional data) actions
          available to group members and superusers; an empty list otherwise.
          """
          if not (self._user_is_in_task_groups(user) or user.is_superuser):
              return []
          return [
              ("reject", _("Request changes"), True),
              ("approve", _("Approve"), False),
              ("approve", _("Approve with comment"), True),
          ]

      def get_task_states_user_can_moderate(self, user, **kwargs):
          """
          Return the in-progress task states of this task for group members
          and superusers; an empty queryset otherwise.
          """
          if not (self._user_is_in_task_groups(user) or user.is_superuser):
              return TaskState.objects.none()
          return TaskState.objects.filter(
              status=TaskState.STATUS_IN_PROGRESS, task=self.task_ptr
          )

      @classmethod
      def get_description(cls):
          return _("Members of the chosen Wagtail Groups can approve this task")

      class Meta:
          verbose_name = _("Group approval task")
          verbose_name_plural = _("Group approval tasks")
  class WorkflowStateManager(models.Manager):
      """Manager for ``WorkflowState`` with a shortcut for active states."""

      def active(self):
          """
          Return only the WorkflowStates that are STATUS_IN_PROGRESS or
          STATUS_NEEDS_CHANGES.
          """
          is_in_progress = Q(status=WorkflowState.STATUS_IN_PROGRESS)
          needs_changes = Q(status=WorkflowState.STATUS_NEEDS_CHANGES)
          return self.filter(is_in_progress | needs_changes)
  class WorkflowState(models.Model):
      """Tracks the status of a started Workflow on a Page."""

      STATUS_IN_PROGRESS = "in_progress"
      STATUS_APPROVED = "approved"
      STATUS_NEEDS_CHANGES = "needs_changes"
      STATUS_CANCELLED = "cancelled"
      STATUS_CHOICES = (
          (STATUS_IN_PROGRESS, _("In progress")),
          (STATUS_APPROVED, _("Approved")),
          (STATUS_NEEDS_CHANGES, _("Needs changes")),
          (STATUS_CANCELLED, _("Cancelled")),
      )

      page = models.ForeignKey(
          "Page",
          on_delete=models.CASCADE,
          verbose_name=_("page"),
          related_name="workflow_states",
      )
      workflow = models.ForeignKey(
          "Workflow",
          on_delete=models.CASCADE,
          verbose_name=_("workflow"),
          related_name="workflow_states",
      )
      status = models.fields.CharField(
          choices=STATUS_CHOICES,
          verbose_name=_("status"),
          max_length=50,
          default=STATUS_IN_PROGRESS,
      )
      created_at = models.DateTimeField(auto_now_add=True, verbose_name=_("created at"))
      requested_by = models.ForeignKey(
          settings.AUTH_USER_MODEL,
          verbose_name=_("requested by"),
          null=True,
          blank=True,
          editable=True,
          on_delete=models.SET_NULL,
          related_name="requested_workflows",
      )
      current_task_state = models.OneToOneField(
          "TaskState",
          on_delete=models.SET_NULL,
          null=True,
          blank=True,
          verbose_name=_("current task state"),
      )

      # Callable run when the workflow finishes successfully; it can be
      # replaced through the WAGTAIL_FINISH_WORKFLOW_ACTION setting.
      on_finish = import_string(
          getattr(
              settings,
              "WAGTAIL_FINISH_WORKFLOW_ACTION",
              "wagtail.workflows.publish_workflow_state",
          )
      )

      objects = WorkflowStateManager()

      def clean(self):
          """
          Reject a second active (in progress / needs changes) workflow state
          for the same page.
          """
          super().clean()
          if self.status not in (self.STATUS_IN_PROGRESS, self.STATUS_NEEDS_CHANGES):
              return

          # The unique constraint is conditional, and so not supported on the
          # MySQL backend - so an additional check is done here.
          other_active_states = (
              WorkflowState.objects.active().filter(page=self.page).exclude(pk=self.pk)
          )
          if other_active_states.exists():
              raise ValidationError(
                  _(
                      "There may only be one in progress or needs changes workflow state per page."
                  )
              )

      def save(self, *args, **kwargs):
          """Validate with ``full_clean`` before every save."""
          self.full_clean()
          return super().save(*args, **kwargs)

      def __str__(self):
          template = _("Workflow '{0}' on Page '{1}': {2}")
          return template.format(self.workflow, self.page, self.status)

      def resume(self, user=None):
          """
          Put a STATUS_NEEDS_CHANGES workflow state back into
          STATUS_IN_PROGRESS, and restart the current task.

          Raises ``PermissionDenied`` if the state is not STATUS_NEEDS_CHANGES.
          """
          if self.status != self.STATUS_NEEDS_CHANGES:
              raise PermissionDenied

          previous_task_state = self.current_task_state
          revision = previous_task_state.page_revision

          self.current_task_state = None
          self.status = self.STATUS_IN_PROGRESS
          self.save()

          workflow_data = {
              "id": self.workflow_id,
              "title": self.workflow.name,
              "status": self.status,
              "task_state_id": previous_task_state.id,
              "task": {
                  "id": previous_task_state.task.id,
                  "title": previous_task_state.task.name,
              },
          }
          log(
              instance=self.page.specific,
              action="wagtail.workflow.resume",
              data={"workflow": workflow_data},
              revision=revision,
              user=user,
          )
          return self.update(user=user, next_task=previous_task_state.task)

      def user_can_cancel(self, user):
          """
          Decide whether ``user`` may cancel this workflow state.

          Never allowed when the page is locked by somebody else. Otherwise
          allowed for the requester, the page owner, or a user for whom the
          in-progress current task offers an "approve" action.
          """
          if self.page.locked and self.page.locked_by != user:
              return False

          if user == self.requested_by or user == self.page.owner:
              return True

          task_state = self.current_task_state
          return (
              task_state
              and task_state.status == task_state.STATUS_IN_PROGRESS
              and "approve"
              in [action[0] for action in task_state.task.get_actions(self.page, user)]
          )

      def update(self, user=None, next_task=None):
          """
          Check the status of the current task, and progress (or end) the
          workflow if appropriate. If the workflow progresses, ``next_task``
          is started next when provided.
          """
          if self.status != self.STATUS_IN_PROGRESS:
              # Updating a completed or cancelled workflow should have no effect
              return

          try:
              current_status = self.current_task_state.status
          except AttributeError:
              # No current task state yet
              current_status = None

          if current_status == TaskState.STATUS_REJECTED:
              self.status = self.STATUS_NEEDS_CHANGES
              self.save()
              workflow_rejected.send(sender=self.__class__, instance=self, user=user)
              return

          next_task = next_task or self.get_next_task()
          if not next_task:
              # There is no uncompleted task left, so finish the workflow.
              self.finish(user=user)
              return

          ongoing_task_state = self.current_task_state
          if (
              ongoing_task_state
              and ongoing_task_state.status == ongoing_task_state.STATUS_IN_PROGRESS
          ):
              # The current task is still in progress: continue on it.
              return

          # Not on a task, or the current task is no longer in progress:
          # move to the next task.
          self.current_task_state = next_task.specific.start(self, user=user)
          self.save()

          # If the task has auto-approved, update the workflow again.
          started_task_state = self.current_task_state
          if started_task_state.status != started_task_state.STATUS_IN_PROGRESS:
              self.update(user=user)

      @property
      def successful_task_states(self):
          """
          Task states of this workflow state that are approved or skipped.

          When WAGTAIL_WORKFLOW_REQUIRE_REAPPROVAL_ON_EDIT is enabled, only
          those recorded against the page's latest revision are included.
          """
          approved_or_skipped = self.task_states.filter(
              Q(status=TaskState.STATUS_APPROVED) | Q(status=TaskState.STATUS_SKIPPED)
          )
          if not getattr(settings, "WAGTAIL_WORKFLOW_REQUIRE_REAPPROVAL_ON_EDIT", False):
              return approved_or_skipped
          return approved_or_skipped.filter(
              page_revision=self.page.get_latest_revision()
          )

      def get_next_task(self):
          """Returns the next active task, which has not been either approved or skipped"""
          remaining_tasks = Task.objects.filter(
              workflow_tasks__workflow=self.workflow, active=True
          ).exclude(task_states__in=self.successful_task_states)
          return remaining_tasks.order_by("workflow_tasks__sort_order").first()

      def cancel(self, user=None):
          """
          Cancel the workflow state.

          Raises ``PermissionDenied`` unless the state is in progress or needs
          changes. Saves the cancelled status, logs ``wagtail.workflow.cancel``,
          cancels every in-progress task state and sends ``workflow_cancelled``.
          """
          if self.status not in (self.STATUS_IN_PROGRESS, self.STATUS_NEEDS_CHANGES):
              raise PermissionDenied
          self.status = self.STATUS_CANCELLED
          self.save()

          task_state = self.current_task_state
          workflow_data = {
              "id": self.workflow_id,
              "title": self.workflow.name,
              "status": self.status,
              "task_state_id": task_state.id,
              "task": {
                  "id": task_state.task.id,
                  "title": task_state.task.name,
              },
          }
          log(
              instance=self.page.specific,
              action="wagtail.workflow.cancel",
              data={"workflow": workflow_data},
              revision=task_state.page_revision,
              user=user,
          )

          # Cancel all in progress task states
          in_progress = self.task_states.filter(status=TaskState.STATUS_IN_PROGRESS)
          for pending_state in in_progress:
              pending_state.specific.cancel(user=user)
          workflow_cancelled.send(sender=self.__class__, instance=self, user=user)

      @transaction.atomic
      def finish(self, user=None):
          """
          Finish a successful in progress workflow, marking it as approved and
          performing the ``on_finish`` action.

          Raises ``PermissionDenied`` if the state is not in progress.
          """
          if self.status != self.STATUS_IN_PROGRESS:
              raise PermissionDenied
          self.status = self.STATUS_APPROVED
          self.save()
          self.on_finish(user=user)
          workflow_approved.send(sender=self.__class__, instance=self, user=user)

      def copy_approved_task_states_to_revision(self, revision):
          """Create copies of the approved task states with ``page_revision`` set to ``revision``."""
          approved = TaskState.objects.filter(
              workflow_state=self, status=TaskState.STATUS_APPROVED
          )
          for approved_state in approved:
              approved_state.copy(update_attrs={"page_revision": revision})

      def revisions(self):
          """Returns all page revisions associated with task states linked to the current workflow state"""
          revision_ids = self.task_states.values_list("page_revision_id", flat=True)
          return Revision.page_revisions.filter(
              object_id=str(self.page_id),
              id__in=revision_ids,
          ).defer("content")

      def _get_applicable_task_states(self):
          """Returns the set of task states whose status applies to the current revision"""
          task_states = TaskState.objects.filter(workflow_state_id=self.id)
          if not getattr(settings, "WAGTAIL_WORKFLOW_REQUIRE_REAPPROVAL_ON_EDIT", False):
              return task_states

          # With WAGTAIL_WORKFLOW_REQUIRE_REAPPROVAL_ON_EDIT=True, only task
          # states created on the most recent revision apply.
          latest_revision_id = (
              self.revisions()
              .order_by("-created_at", "-id")
              .values_list("id", flat=True)
              .first()
          )
          return task_states.filter(page_revision_id=latest_revision_id)

      def all_tasks_with_status(self):
          """
          Returns a list of Task objects that are linked with this workflow state's
          workflow. The status of that task in this workflow state is annotated in the
          `.status` field. And a displayable version of that status is annotated in the
          `.status_display` field.

          This is different to querying TaskState as it also returns tasks that haven't
          been started yet (so won't have a TaskState).
          """
          # Task states whose status applies to the current revision
          applicable_states = self._get_applicable_task_states()

          latest_status = Subquery(
              applicable_states.filter(task_id=OuterRef("id"))
              .order_by("-started_at", "-id")
              .values("status")[:1]
          )
          tasks = list(self.workflow.tasks.annotate(status=latest_status))

          # Manually annotate status_display
          display_names = dict(TaskState.STATUS_CHOICES)
          for task in tasks:
              task.status_display = display_names.get(task.status, _("Not started"))

          return tasks

      def all_tasks_with_state(self):
          """
          Returns a list of Task objects that are linked with this WorkflowState's
          workflow, and have the latest task state.

          In a "Submit for moderation -> reject at step 1 -> resubmit -> accept" workflow, this ensures
          the task list reflects the accept, rather than the reject.
          """
          applicable_states = self._get_applicable_task_states()

          latest_state_id = Subquery(
              applicable_states.filter(task_id=OuterRef("id"))
              .order_by("-started_at", "-id")
              .values("id")[:1]
          )
          tasks = list(self.workflow.tasks.annotate(task_state_id=latest_state_id))

          states_by_id = {
              task_state.id: task_state for task_state in applicable_states
          }

          # Manually annotate task_state
          for task in tasks:
              task.task_state = states_by_id.get(task.task_state_id)

          return tasks

      @property
      def is_active(self):
          """True unless the workflow state is approved or cancelled."""
          return self.status not in [self.STATUS_APPROVED, self.STATUS_CANCELLED]

      @property
      def is_at_final_task(self):
          """
          True when the next task (see ``get_next_task``) is also the last
          active task of the workflow that has not been approved or skipped.
          """
          last_task = (
              Task.objects.filter(workflow_tasks__workflow=self.workflow, active=True)
              .exclude(task_states__in=self.successful_task_states)
              .order_by("workflow_tasks__sort_order")
              .last()
          )
          return self.get_next_task() == last_task

      class Meta:
          verbose_name = _("Workflow state")
          verbose_name_plural = _("Workflow states")
          # Prevent multiple STATUS_IN_PROGRESS/STATUS_NEEDS_CHANGES workflows
          # for the same page. This is only supported by specific databases
          # (e.g. Postgres, SQL Server), so is checked additionally on save.
          constraints = [
              models.UniqueConstraint(
                  fields=["page"],
                  condition=Q(status__in=("in_progress", "needs_changes")),
                  name="unique_in_progress_workflow",
              )
          ]
  class TaskStateManager(models.Manager):
      """Manager for ``TaskState``."""

      def reviewable_by(self, user):
          """
          Return the union of the task states that each active task reports
          as moderatable by ``user``.
          """
          reviewable = TaskState.objects.none()
          for active_task in Task.objects.filter(active=True):
              moderatable = active_task.specific.get_task_states_user_can_moderate(
                  user=user
              )
              reviewable = reviewable | moderatable
          return reviewable
  class TaskState(models.Model):
      """Tracks the status of a given Task for a particular page revision."""

      STATUS_IN_PROGRESS = "in_progress"
      STATUS_APPROVED = "approved"
      STATUS_REJECTED = "rejected"
      STATUS_SKIPPED = "skipped"
      STATUS_CANCELLED = "cancelled"
      STATUS_CHOICES = (
          (STATUS_IN_PROGRESS, _("In progress")),
          (STATUS_APPROVED, _("Approved")),
          (STATUS_REJECTED, _("Rejected")),
          (STATUS_SKIPPED, _("Skipped")),
          (STATUS_CANCELLED, _("Cancelled")),
      )

      workflow_state = models.ForeignKey(
          "WorkflowState",
          on_delete=models.CASCADE,
          verbose_name=_("workflow state"),
          related_name="task_states",
      )
      page_revision = models.ForeignKey(
          "Revision",
          on_delete=models.CASCADE,
          verbose_name=_("page revision"),
          related_name="task_states",
      )
      task = models.ForeignKey(
          "Task",
          on_delete=models.CASCADE,
          verbose_name=_("task"),
          related_name="task_states",
      )
      status = models.fields.CharField(
          choices=STATUS_CHOICES,
          verbose_name=_("status"),
          max_length=50,
          default=STATUS_IN_PROGRESS,
      )
      started_at = models.DateTimeField(verbose_name=_("started at"), auto_now_add=True)
      finished_at = models.DateTimeField(
          verbose_name=_("finished at"), blank=True, null=True
      )
      finished_by = models.ForeignKey(
          settings.AUTH_USER_MODEL,
          verbose_name=_("finished by"),
          null=True,
          blank=True,
          on_delete=models.SET_NULL,
          related_name="finished_task_states",
      )
      comment = models.TextField(blank=True)
      content_type = models.ForeignKey(
          ContentType,
          verbose_name=_("content type"),
          related_name="wagtail_task_states",
          on_delete=models.CASCADE,
      )

      # Field names left out by ``copy``: the defaults, plus a list that is
      # empty here and is concatenated with them in ``copy``.
      exclude_fields_in_copy = []
      default_exclude_fields_in_copy = ["id"]

      objects = TaskStateManager()

      def __init__(self, *args, **kwargs):
          super().__init__(*args, **kwargs)
          # A newly created instance (no id yet) without a content type gets
          # the content type of the model class it was created as.
          if not self.id and not self.content_type_id:
              self.content_type = ContentType.objects.get_for_model(self)

      def __str__(self):
          template = _("Task '{0}' on Page Revision '{1}': {2}")
          return template.format(self.task, self.page_revision, self.status)

      @cached_property
      def specific(self):
          """
          Return this TaskState in its most specific subclassed form.
          """
          # The ContentType.objects manager keeps a cache, so this should
          # potentially avoid a database lookup over doing self.content_type.
          content_type = ContentType.objects.get_for_id(self.content_type_id)
          model_class = content_type.model_class()

          # model_class is None when no model class can be located for this
          # content type. This might happen if the codebase and database are
          # out of sync (e.g. the model exists on a different git branch and
          # migrations were not rolled back before switching branches); the
          # best we can do then is return this task state unchanged. The same
          # applies when self is already an instance of the most specific class.
          if model_class is None or isinstance(self, model_class):
              return self

          return content_type.get_object_for_this_type(id=self.id)

      @transaction.atomic
      def approve(self, user=None, update=True, comment=""):
          """
          Approve the task state and update the workflow state.

          Raises ``PermissionDenied`` if the task state is not in progress.
          Sends ``task_approved`` and returns this task state.
          """
          if self.status != self.STATUS_IN_PROGRESS:
              raise PermissionDenied

          self.status = self.STATUS_APPROVED
          self.finished_at = timezone.now()
          self.finished_by = user
          self.comment = comment
          self.save()

          self.log_state_change_action(user, "approve")
          if update:
              self.workflow_state.update(user=user)

          task_approved.send(
              sender=self.specific.__class__, instance=self.specific, user=user
          )
          return self

      @transaction.atomic
      def reject(self, user=None, update=True, comment=""):
          """
          Reject the task state and update the workflow state.

          Raises ``PermissionDenied`` if the task state is not in progress.
          Sends ``task_rejected`` and returns this task state.
          """
          if self.status != self.STATUS_IN_PROGRESS:
              raise PermissionDenied

          self.status = self.STATUS_REJECTED
          self.finished_at = timezone.now()
          self.finished_by = user
          self.comment = comment
          self.save()

          self.log_state_change_action(user, "reject")
          if update:
              self.workflow_state.update(user=user)

          task_rejected.send(
              sender=self.specific.__class__, instance=self.specific, user=user
          )
          return self

      @cached_property
      def task_type_started_at(self):
          """Finds the first chronological started_at for successive TaskStates - ie started_at if the task had not been restarted"""
          newest_first = (
              TaskState.objects.filter(workflow_state=self.workflow_state)
              .order_by("-started_at")
              .select_related("task")
          )
          earliest_start = None
          for candidate in newest_first:
              if candidate.task == self.task:
                  earliest_start = candidate.started_at
              elif earliest_start:
                  # A different task interrupts the run of states for this task
                  break
          return earliest_start

      @transaction.atomic
      def cancel(self, user=None, resume=False, comment=""):
          """
          Cancel the task state and update the workflow state. If ``resume``
          is True, the workflow state is passed the current task as
          ``next_task``, causing it to start a new task state on the current
          task if possible.

          Sends ``task_cancelled`` and returns this task state.
          """
          self.status = self.STATUS_CANCELLED
          self.finished_at = timezone.now()
          self.comment = comment
          self.finished_by = user
          self.save()

          if resume:
              self.workflow_state.update(user=user, next_task=self.task.specific)
          else:
              self.workflow_state.update(user=user)

          task_cancelled.send(
              sender=self.specific.__class__, instance=self.specific, user=user
          )
          return self

      def copy(self, update_attrs=None, exclude_fields=None):
          """
          Copy this task state, excluding the attributes in the
          ``exclude_fields`` list and updating any attributes to values
          specified in the ``update_attrs`` dictionary of
          ``attribute``: ``new value`` pairs.
          """
          excluded = (
              self.default_exclude_fields_in_copy
              + self.exclude_fields_in_copy
              + (exclude_fields or [])
          )
          duplicate, _child_object_map = _copy(self.specific, excluded, update_attrs)
          duplicate.save()
          _copy_m2m_relations(self, duplicate, exclude_fields=excluded)
          return duplicate

      def get_comment(self):
          """
          Returns a string that is displayed in workflow history.

          This could be a comment by the reviewer, or generated.
          Use mark_safe to return HTML.
          """
          return self.comment

      def log_state_change_action(self, user, action):
          """Log the approval/rejection action"""
          page = self.page_revision.as_object()
          next_task = self.workflow_state.get_next_task()
          if next_task:
              next_task_data = {"id": next_task.id, "title": next_task.name}
          else:
              next_task_data = None

          workflow_data = {
              "id": self.workflow_state.workflow.id,
              "title": self.workflow_state.workflow.name,
              "status": self.status,
              "task_state_id": self.id,
              "task": {
                  "id": self.task.id,
                  "title": self.task.name,
              },
              "next": next_task_data,
          }
          log(
              instance=page,
              action="wagtail.workflow.{}".format(action),
              user=user,
              data={
                  "workflow": workflow_data,
                  "comment": self.get_comment(),
              },
              revision=self.page_revision,
          )

      class Meta:
          verbose_name = _("Task state")
          verbose_name_plural = _("Task states")
  class PageLogEntryQuerySet(LogEntryQuerySet):
      """Log entry queryset that reports every page type as plain "Page"."""

      def get_content_type_ids(self):
          """
          Return the set of content type ids covered by this queryset: just
          the ``Page`` content type when any entry exists, otherwise empty.
          """
          # For reporting purposes, pages of all types are combined under a
          # single "Page" object type.
          if not self.exists():
              return set()
          return {ContentType.objects.get_for_model(Page).pk}

      def filter_on_content_type(self, content_type):
          """Return everything for the ``Page`` content type, nothing for any other."""
          page_content_type = ContentType.objects.get_for_model(Page)
          if content_type == page_content_type:
              return self
          return self.none()
  class PageLogEntryManager(BaseLogEntryManager):
      """Manager for ``PageLogEntry``."""

      def get_queryset(self):
          return PageLogEntryQuerySet(self.model, using=self._db)

      def get_instance_title(self, instance):
          specific_page = instance.specific_deferred
          return specific_page.get_admin_display_title()

      def log_action(self, instance, action, **kwargs):
          """Log ``action`` against ``instance``, recording it as the entry's page."""
          kwargs.update(page=instance)
          return super().log_action(instance, action, **kwargs)

      def viewable_by_user(self, user):
          """
          Return the log entries ``user`` may see: those for pages the user
          can explore, plus - for superusers and users who can add to or edit
          the root page - entries for pages that have a deleted log entry.
          """
          explorable_page_ids = (
              UserPagePermissionsProxy(user).explorable_pages().values_list("pk", flat=True)
          )
          q = Q(page__in=explorable_page_ids)

          root_page_permissions = Page.get_first_root_node().permissions_for_user(user)
          may_see_deleted = (
              user.is_superuser
              or root_page_permissions.can_add_subpage()
              or root_page_permissions.can_edit()
          )
          if may_see_deleted:
              # Include deleted entries
              deleted_page_ids = PageLogEntry.objects.filter(deleted=True).values(
                  "page_id"
              )
              q = q | Q(page_id__in=Subquery(deleted_page_ids))

          return PageLogEntry.objects.filter(q)
  class PageLogEntry(BaseLogEntry):
      """A log entry recorded against a page."""

      # No database constraint and DO_NOTHING on delete are set on this
      # relation; ``related_name="+"`` disables the reverse accessor.
      page = models.ForeignKey(
          "wagtailcore.Page",
          on_delete=models.DO_NOTHING,
          db_constraint=False,
          related_name="+",
      )

      objects = PageLogEntryManager()

      class Meta:
          ordering = ["-timestamp", "-id"]
          verbose_name = _("page log entry")
          verbose_name_plural = _("page log entries")

      def __str__(self):
          # ``%s`` rather than ``%d`` for the primary key: an unsaved entry has
          # ``pk`` None, and ``%d`` would raise TypeError when the instance is
          # printed. Output is unchanged for integer pks.
          return "PageLogEntry %s: '%s' on '%s' with id %s" % (
              self.pk,
              self.action,
              self.object_verbose_name(),
              self.page_id,
          )

      @cached_property
      def object_id(self):
          """The id of the logged object, which is the page id."""
          return self.page_id

      @cached_property
      def message(self):
          """The entry's message; the 'edit' action is shown as 'Draft saved'."""
          # for page log entries, the 'edit' action should show as 'Draft saved'
          if self.action == "wagtail.edit":
              return _("Draft saved")
          else:
              return super().message
  class Comment(ClusterableModel):
      """
      A comment on a field, or a field within a streamfield block
      """

      page = ParentalKey(
          Page, on_delete=models.CASCADE, related_name=COMMENTS_RELATION_NAME
      )
      user = models.ForeignKey(
          settings.AUTH_USER_MODEL,
          on_delete=models.CASCADE,
          related_name=COMMENTS_RELATION_NAME,
      )
      text = models.TextField()

      # The field, or field within a streamfield block, that the comment is
      # applied on, in the form: 'field', or 'field.block_id.field'.
      # This must be unchanging across all revisions, so we will not support
      # (current-format) ListBlock or the contents of InlinePanels initially.
      contentpath = models.TextField()

      # The position within a field, to be interpreted by the field's frontend
      # widget. It may change between revisions.
      position = models.TextField(blank=True)

      created_at = models.DateTimeField(auto_now_add=True)
      updated_at = models.DateTimeField(auto_now=True)

      revision_created = models.ForeignKey(
          Revision,
          on_delete=models.CASCADE,
          related_name="created_comments",
          null=True,
          blank=True,
      )

      resolved_at = models.DateTimeField(null=True, blank=True)
      resolved_by = models.ForeignKey(
          settings.AUTH_USER_MODEL,
          on_delete=models.SET_NULL,
          related_name="comments_resolved",
          null=True,
          blank=True,
      )

      class Meta:
          verbose_name = _("comment")
          verbose_name_plural = _("comments")

      def __str__(self):
          return "Comment on Page '{0}', left by {1}: '{2}'".format(
              self.page, self.user, self.text
          )

      def save(self, update_position=False, **kwargs):
          """
          Save the comment, leaving ``position`` alone unless asked otherwise.

          The position is only written when ``update_position`` is True or
          ``"position"`` is listed in ``update_fields``, as the position will
          normally be retrieved from the revision.

          ``update_fields`` may contain field names (the standard Django
          form) or field objects; both are accepted.
          """
          update_fields = kwargs.pop("update_fields", None)
          if not update_position and (
              not update_fields or "position" not in update_fields
          ):
              if self.id:
                  # The instance is already saved; we can use `update_fields`
                  if update_fields:
                      # Callers normally pass plain field names here, which
                      # have no ``.name`` attribute; accept both strings and
                      # field objects instead of raising AttributeError.
                      candidate_names = [
                          getattr(field, "name", field) for field in update_fields
                      ]
                  else:
                      candidate_names = [
                          field.name for field in self._meta.get_fields()
                      ]
                  update_fields = [
                      name for name in candidate_names if name not in {"position", "id"}
                  ]
              else:
                  # This is a new instance, we have to preserve and then
                  # restore the position via a variable
                  position = self.position
                  result = super().save(**kwargs)
                  self.position = position
                  return result
          return super().save(update_fields=update_fields, **kwargs)

      def _log(self, action, page_revision=None, user=None):
          """Write a log entry for ``action`` against the comment's page."""
          log(
              instance=self.page,
              action=action,
              user=user,
              revision=page_revision,
              data={
                  "comment": {
                      "id": self.pk,
                      "contentpath": self.contentpath,
                      "text": self.text,
                  }
              },
          )

      def log_create(self, **kwargs):
          self._log("wagtail.comments.create", **kwargs)

      def log_edit(self, **kwargs):
          self._log("wagtail.comments.edit", **kwargs)

      def log_resolve(self, **kwargs):
          self._log("wagtail.comments.resolve", **kwargs)

      def log_delete(self, **kwargs):
          self._log("wagtail.comments.delete", **kwargs)
  3884. class CommentReply(models.Model):
  3885. comment = ParentalKey(Comment, on_delete=models.CASCADE, related_name="replies")
  3886. user = models.ForeignKey(
  3887. settings.AUTH_USER_MODEL,
  3888. on_delete=models.CASCADE,
  3889. related_name="comment_replies",
  3890. )
  3891. text = models.TextField()
  3892. created_at = models.DateTimeField(auto_now_add=True)
  3893. updated_at = models.DateTimeField(auto_now=True)
  3894. class Meta:
  3895. verbose_name = _("comment reply")
  3896. verbose_name_plural = _("comment replies")
  3897. def __str__(self):
  3898. return "CommentReply left by '{0}': '{1}'".format(self.user, self.text)
  3899. def _log(self, action, page_revision=None, user=None):
  3900. log(
  3901. instance=self.comment.page,
  3902. action=action,
  3903. user=user,
  3904. revision=page_revision,
  3905. data={
  3906. "comment": {
  3907. "id": self.comment.pk,
  3908. "contentpath": self.comment.contentpath,
  3909. "text": self.comment.text,
  3910. },
  3911. "reply": {
  3912. "id": self.pk,
  3913. "text": self.text,
  3914. },
  3915. },
  3916. )
  3917. def log_create(self, **kwargs):
  3918. self._log("wagtail.comments.create_reply", **kwargs)
  3919. def log_edit(self, **kwargs):
  3920. self._log("wagtail.comments.edit_reply", **kwargs)
  3921. def log_delete(self, **kwargs):
  3922. self._log("wagtail.comments.delete_reply", **kwargs)
class PageSubscription(models.Model):
    """
    Links a user to a page together with that user's notification flag.

    At most one row may exist per (page, user) pair. Rows are removed when
    either the user or the page is deleted.
    """

    # Reachable from the user as `page_subscriptions`.
    user = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        on_delete=models.CASCADE,
        related_name="page_subscriptions",
    )
    # Reachable from the page as `subscribers`.
    page = models.ForeignKey(Page, on_delete=models.CASCADE, related_name="subscribers")
    # No default is declared, so a value must be supplied when creating a row.
    # NOTE(review): presumably toggles comment notifications for this
    # user/page pair - confirm against the code that reads it.
    comment_notifications = models.BooleanField()

    class Meta:
        unique_together = [
            ("page", "user"),
        ]
  3935. class ReferenceGroups:
  3936. def __init__(self, qs):
  3937. self.qs = qs
  3938. def __iter__(self):
  3939. object = None
  3940. references = []
  3941. for reference in self.qs.order_by("base_content_type", "object_id"):
  3942. if object != (reference.base_content_type, reference.object_id):
  3943. if object is not None:
  3944. yield object[0].get_object_for_this_type(pk=object[1]), references
  3945. references = []
  3946. object = (reference.base_content_type, reference.object_id)
  3947. references.append(reference)
  3948. if references:
  3949. yield object[0].get_object_for_this_type(pk=object[1]), references
  3950. def __len__(self):
  3951. return (
  3952. self.qs.order_by("base_content_type", "object_id")
  3953. .values("base_content_type", "object_id")
  3954. .distinct()
  3955. .count()
  3956. )
  3957. def count(self):
  3958. return len(self)
  3959. def __getitem__(self, key):
  3960. return list(self)[key]
  3961. class ReferenceIndexQuerySet(models.QuerySet):
  3962. def group_by_source_object(self):
  3963. return ReferenceGroups(self)
  3964. class ReferenceIndex(models.Model):
  3965. """
  3966. Records references between objects for quick retrieval of object usage.
  3967. References are extracted from Foreign Keys, Chooser Blocks in StreamFields, and links in Rich Text Fields.
  3968. This index allows us to efficiently find all of the references to a particular object from all of these sources.
  3969. """
  3970. # The object where the reference was extracted from
  3971. # content_type represents the content type of the model that contains
  3972. # the field where the reference came from. If the model sub-classes another
  3973. # concrete model (such as Page), that concrete model will be set in
  3974. # base_content_type, otherwise it would be the same as content_type
  3975. content_type = models.ForeignKey(
  3976. ContentType, on_delete=models.CASCADE, related_name="+"
  3977. )
  3978. base_content_type = models.ForeignKey(
  3979. ContentType, on_delete=models.CASCADE, related_name="+"
  3980. )
  3981. object_id = models.CharField(
  3982. max_length=255,
  3983. verbose_name=_("object id"),
  3984. )
  3985. # The object that has been referenced
  3986. # to_content_type is always the base content type of the referenced object
  3987. to_content_type = models.ForeignKey(
  3988. ContentType, on_delete=models.CASCADE, related_name="+"
  3989. )
  3990. to_object_id = models.CharField(
  3991. max_length=255,
  3992. verbose_name=_("object id"),
  3993. )
  3994. # The model_path is the path to the field on content_type where the reference was extracted from.
  3995. # The content_path is the path to a specific block on the instance where the reference is extracted from.
  3996. # These are dotted paths, always starting with a field or child relation name. If
  3997. # the reference was extracted from an inline panel or streamfield, other components
  3998. # of the path can be used to locate where the reference was extracted.
  3999. #
  4000. # For example, say we have a StreamField called 'body' which has a struct block type
  4001. # called 'my_struct_block' that has a field called 'my_field'. If we extracted a
  4002. # reference from that field, the model_path would be set to the following:
  4003. #
  4004. # 'body.my_struct_block.my_field'
  4005. #
  4006. # The content path would follow the same format, but anything repeatable would be replaced by an ID.
  4007. # For example:
  4008. #
  4009. # 'body.bdc70d8b-e7a2-4c2a-bf43-2a3e3fcbbe86.my_field'
  4010. #
  4011. # We can use the model_path with the 'content_type' to find the original definition of
  4012. # the field block and display information to the user about where the reference was
  4013. # extracted from.
  4014. #
  4015. # We can use the content_path to link the user directly to the block/field that contains
  4016. # the reference.
  4017. model_path = models.TextField()
  4018. content_path = models.TextField()
  4019. # We need a separate hash field for content_path in order to use it in a unique key because
  4020. # MySQL has a limit to the size of fields that are included in unique keys
  4021. content_path_hash = models.UUIDField()
  4022. objects = ReferenceIndexQuerySet.as_manager()
  4023. wagtail_reference_index_ignore = True
    class Meta:
        # A source object may hold at most one row per combination of target
        # object and content path. The content path takes part in the key
        # through its hash column rather than through the text column itself.
        unique_together = [
            (
                "base_content_type",
                "object_id",
                "to_content_type",
                "to_object_id",
                "content_path_hash",
            )
        ]
  4034. @classmethod
  4035. def _get_base_content_type(cls, model_or_object):
  4036. parents = model_or_object._meta.get_parent_list()
  4037. if parents:
  4038. return ContentType.objects.get_for_model(
  4039. parents[-1], for_concrete_model=False
  4040. )
  4041. else:
  4042. return ContentType.objects.get_for_model(
  4043. model_or_object, for_concrete_model=False
  4044. )
  4045. @classmethod
  4046. def model_is_indexible(cls, model, allow_child_models=False):
  4047. """
  4048. Returns True if the given model may have outbound references that we would be interested in recording in the index.
  4049. """
  4050. if getattr(model, "wagtail_reference_index_ignore", False):
  4051. return False
  4052. # Don't check any models that have a parental key, references from these will be collected from the parent
  4053. if not allow_child_models and any(
  4054. [isinstance(field, ParentalKey) for field in model._meta.get_fields()]
  4055. ):
  4056. return False
  4057. for field in model._meta.get_fields():
  4058. if field.is_relation and field.many_to_one:
  4059. if getattr(field, "wagtail_reference_index_ignore", False):
  4060. continue
  4061. if getattr(
  4062. field.related_model, "wagtail_reference_index_ignore", False
  4063. ):
  4064. continue
  4065. if isinstance(field, (ParentalKey, GenericRel)):
  4066. continue
  4067. return True
  4068. if isinstance(field, (StreamField, RichTextField)):
  4069. return True
  4070. if issubclass(model, ClusterableModel):
  4071. for child_relation in get_all_child_relations(model):
  4072. if cls.model_is_indexible(
  4073. child_relation.related_model,
  4074. allow_child_models=True,
  4075. ):
  4076. return True
  4077. return False
  4078. @classmethod
  4079. def _extract_references_from_object(cls, object):
  4080. # Extract references from fields
  4081. for field in object._meta.get_fields():
  4082. if field.is_relation and field.many_to_one:
  4083. if getattr(field, "wagtail_reference_index_ignore", False):
  4084. continue
  4085. if getattr(
  4086. field.related_model, "wagtail_reference_index_ignore", False
  4087. ):
  4088. continue
  4089. if isinstance(field, (ParentalKey, GenericRel)):
  4090. continue
  4091. if isinstance(field, GenericForeignKey):
  4092. ct_field = object._meta.get_field(field.ct_field)
  4093. fk_field = object._meta.get_field(field.fk_field)
  4094. yield ct_field.value_from_object(object), str(
  4095. fk_field.value_from_object(object)
  4096. ), field.name, field.name
  4097. continue
  4098. if isinstance(field, GenericRel):
  4099. continue
  4100. value = field.value_from_object(object)
  4101. if value is not None:
  4102. yield cls._get_base_content_type(field.related_model).id, str(
  4103. value
  4104. ), field.name, field.name
  4105. if isinstance(field, (StreamField, RichTextField)):
  4106. value = field.value_from_object(object)
  4107. if value is not None:
  4108. yield from (
  4109. (
  4110. cls._get_base_content_type(to_model).id,
  4111. to_object_id,
  4112. f"{field.name}.{model_path}",
  4113. f"{field.name}.{content_path}",
  4114. )
  4115. for to_model, to_object_id, model_path, content_path in field.extract_references(
  4116. value
  4117. )
  4118. )
  4119. # Extract references from child relations
  4120. if isinstance(object, ClusterableModel):
  4121. for child_relation in get_all_child_relations(object):
  4122. relation_name = child_relation.get_accessor_name()
  4123. child_objects = getattr(object, relation_name).all()
  4124. for child_object in child_objects:
  4125. yield from (
  4126. (
  4127. to_content_type_id,
  4128. to_object_id,
  4129. f"{relation_name}.item.{model_path}",
  4130. f"{relation_name}.{str(child_object.id)}.{content_path}",
  4131. )
  4132. for to_content_type_id, to_object_id, model_path, content_path in cls._extract_references_from_object(
  4133. child_object
  4134. )
  4135. )
  4136. @classmethod
  4137. def _get_content_path_hash(cls, content_path):
  4138. return uuid.uuid5(
  4139. uuid.UUID("bdc70d8b-e7a2-4c2a-bf43-2a3e3fcbbe86"), content_path
  4140. )
  4141. @classmethod
  4142. def create_or_update_for_object(cls, object):
  4143. # Extract new references
  4144. references = set(cls._extract_references_from_object(object))
  4145. # Find existing references in the database so we know what to add/delete
  4146. content_type = ContentType.objects.get_for_model(object)
  4147. base_content_type = cls._get_base_content_type(object)
  4148. existing_references = {
  4149. (to_content_type_id, to_object_id, model_path, content_path): id
  4150. for id, to_content_type_id, to_object_id, model_path, content_path in cls.objects.filter(
  4151. base_content_type=base_content_type, object_id=object.pk
  4152. ).values_list(
  4153. "id", "to_content_type", "to_object_id", "model_path", "content_path"
  4154. )
  4155. }
  4156. # Add new references
  4157. new_references = references - set(existing_references.keys())
  4158. cls.objects.bulk_create(
  4159. [
  4160. cls(
  4161. content_type=content_type,
  4162. base_content_type=base_content_type,
  4163. object_id=object.pk,
  4164. to_content_type_id=to_content_type_id,
  4165. to_object_id=to_object_id,
  4166. model_path=model_path,
  4167. content_path=content_path,
  4168. content_path_hash=cls._get_content_path_hash(content_path),
  4169. )
  4170. for to_content_type_id, to_object_id, model_path, content_path in new_references
  4171. ]
  4172. )
  4173. # Delete removed references
  4174. deleted_references = set(existing_references.keys()) - references
  4175. cls.objects.filter(
  4176. id__in=[existing_references[reference] for reference in deleted_references]
  4177. ).delete()
  4178. @classmethod
  4179. def remove_for_object(cls, object):
  4180. base_content_type = cls._get_base_content_type(object)
  4181. cls.objects.filter(
  4182. base_content_type=base_content_type, object_id=object.pk
  4183. ).delete()
  4184. @classmethod
  4185. def get_references_for_object(cls, object):
  4186. return cls.objects.filter(
  4187. base_content_type_id=cls._get_base_content_type(object),
  4188. object_id=object.pk,
  4189. )
  4190. @classmethod
  4191. def get_references_to(cls, object):
  4192. return cls.objects.filter(
  4193. to_content_type_id=cls._get_base_content_type(object),
  4194. to_object_id=object.pk,
  4195. )
  4196. def describe_source_field(self):
  4197. model_path_components = self.model_path.split(".")
  4198. field_name = model_path_components[0]
  4199. field = self.content_type.model_class()._meta.get_field(field_name)
  4200. # ManyToOneRel (reverse accessor for ParentalKey) does not have a verbose name. So get the name of the child field instead
  4201. if isinstance(field, models.ManyToOneRel):
  4202. child_field = field.related_model._meta.get_field(model_path_components[2])
  4203. return capfirst(child_field.verbose_name)
  4204. else:
  4205. return capfirst(field.verbose_name)