models.py 188 KB


  1. import functools
  2. import json
  3. import logging
  4. import uuid
  5. from collections import namedtuple
  6. from io import StringIO
  7. from urllib.parse import urlparse
  8. from django import VERSION as DJANGO_VERSION
  9. from django import forms
  10. from django.apps import apps
  11. from django.conf import settings
  12. from django.contrib.auth.models import Group, Permission
  13. from django.contrib.contenttypes.models import ContentType
  14. from django.core import checks
  15. from django.core.cache import cache
  16. from django.core.exceptions import PermissionDenied, ValidationError
  17. from django.core.handlers.base import BaseHandler
  18. from django.core.handlers.wsgi import WSGIRequest
  19. from django.db import migrations, models, transaction
  20. from django.db.models import DEFERRED, Q, Value
  21. from django.db.models.expressions import OuterRef, Subquery
  22. from django.db.models.functions import Concat, Lower, Substr
  23. from django.db.models.signals import pre_save
  24. from django.dispatch import receiver
  25. from django.http import Http404
  26. from django.http.request import split_domain_port
  27. from django.template.response import TemplateResponse
  28. from django.urls import NoReverseMatch, reverse
  29. from django.utils import timezone, translation
  30. from django.utils.cache import patch_cache_control
  31. from django.utils.encoding import force_str
  32. from django.utils.functional import cached_property
  33. from django.utils.html import format_html
  34. from django.utils.module_loading import import_string
  35. from django.utils.safestring import mark_safe
  36. from django.utils.text import capfirst, slugify
  37. from django.utils.translation import gettext_lazy as _
  38. from modelcluster.fields import ParentalKey, ParentalManyToManyField
  39. from modelcluster.models import ClusterableModel, get_all_child_relations
  40. from treebeard.mp_tree import MP_Node
  41. from wagtail.core.fields import StreamField
  42. from wagtail.core.forms import TaskStateCommentForm
  43. from wagtail.core.logging import page_log_action_registry
  44. from wagtail.core.query import PageQuerySet, TreeQuerySet
  45. from wagtail.core.signals import (
  46. page_published, page_unpublished, post_page_move, pre_page_move, task_approved, task_cancelled,
  47. task_rejected, task_submitted, workflow_approved, workflow_cancelled, workflow_rejected,
  48. workflow_submitted)
  49. from wagtail.core.sites import get_site_for_hostname
  50. from wagtail.core.treebeard import TreebeardPathFixMixin
  51. from wagtail.core.url_routing import RouteResult
  52. from wagtail.core.utils import WAGTAIL_APPEND_SLASH, camelcase_to_underscore, resolve_model_string
  53. from wagtail.search import index
  54. from .utils import (
  55. find_available_slug, get_content_languages, get_supported_content_language_variant)
# Module-level logger; records emitted through it use the 'wagtail.core' logger name.
logger = logging.getLogger('wagtail.core')

# Name of a template context variable.
# NOTE(review): presumably the key under which a page is exposed to templates - its use is not visible here.
PAGE_TEMPLATE_VAR = 'page'
  58. def _extract_field_data(source, exclude_fields=None):
  59. """
  60. Get dictionaries representing the model's field data.
  61. This excludes many to many fields (which are handled by _copy_m2m_relations)'
  62. """
  63. exclude_fields = exclude_fields or []
  64. data_dict = {}
  65. for field in source._meta.get_fields():
  66. # Ignore explicitly excluded fields
  67. if field.name in exclude_fields:
  68. continue
  69. # Ignore reverse relations
  70. if field.auto_created:
  71. continue
  72. # Copy parental m2m relations
  73. if field.many_to_many:
  74. if isinstance(field, ParentalManyToManyField):
  75. parental_field = getattr(source, field.name)
  76. if hasattr(parental_field, 'all'):
  77. values = parental_field.all()
  78. if values:
  79. data_dict[field.name] = values
  80. continue
  81. # Ignore parent links (page_ptr)
  82. if isinstance(field, models.OneToOneField) and field.remote_field.parent_link:
  83. continue
  84. if DJANGO_VERSION >= (3, 0) and isinstance(field, models.ForeignKey):
  85. # Use attname to copy the ID instead of retrieving the instance
  86. # Note: We first need to set the field to None to unset any object
  87. # that's there already just setting _id on its own won't change the
  88. # field until its saved.
  89. # Before Django 3.0, Django won't find the new object if the field
  90. # was set to None in this way, so this optimisation isn't available
  91. # for Django 2.x.
  92. data_dict[field.name] = None
  93. data_dict[field.attname] = getattr(source, field.attname)
  94. else:
  95. data_dict[field.name] = getattr(source, field.name)
  96. return data_dict
  97. def _copy_m2m_relations(source, target, exclude_fields=None, update_attrs=None):
  98. """
  99. Copies non-ParentalManyToMany m2m relations
  100. """
  101. update_attrs = update_attrs or {}
  102. exclude_fields = exclude_fields or []
  103. for field in source._meta.get_fields():
  104. # Copy m2m relations. Ignore explicitly excluded fields, reverse relations, and Parental m2m fields.
  105. if field.many_to_many and field.name not in exclude_fields and not field.auto_created and not isinstance(field, ParentalManyToManyField):
  106. try:
  107. # Do not copy m2m links with a through model that has a ParentalKey to the model being copied - these will be copied as child objects
  108. through_model_parental_links = [field for field in field.through._meta.get_fields() if isinstance(field, ParentalKey) and (field.related_model == source.__class__ or field.related_model in source._meta.parents)]
  109. if through_model_parental_links:
  110. continue
  111. except AttributeError:
  112. pass
  113. if field.name in update_attrs:
  114. value = update_attrs[field.name]
  115. else:
  116. value = getattr(source, field.name).all()
  117. getattr(target, field.name).set(value)
  118. def _copy(source, exclude_fields=None, update_attrs=None):
  119. data_dict = _extract_field_data(source, exclude_fields=exclude_fields)
  120. target = source.__class__(**data_dict)
  121. if update_attrs:
  122. for field, value in update_attrs.items():
  123. if field not in data_dict:
  124. continue
  125. setattr(target, field, value)
  126. if isinstance(source, ClusterableModel):
  127. child_object_map = source.copy_all_child_relations(target, exclude=exclude_fields)
  128. else:
  129. child_object_map = {}
  130. return target, child_object_map
  131. class SiteManager(models.Manager):
  132. def get_queryset(self):
  133. return super(SiteManager, self).get_queryset().order_by(Lower("hostname"))
  134. def get_by_natural_key(self, hostname, port):
  135. return self.get(hostname=hostname, port=port)
# Record type with four fields, in order: site_id, root_path, root_url, language_code.
SiteRootPath = namedtuple('SiteRootPath', 'site_id root_path root_url language_code')
  137. class Site(models.Model):
  138. hostname = models.CharField(verbose_name=_('hostname'), max_length=255, db_index=True)
  139. port = models.IntegerField(
  140. verbose_name=_('port'),
  141. default=80,
  142. help_text=_(
  143. "Set this to something other than 80 if you need a specific port number to appear in URLs"
  144. " (e.g. development on port 8000). Does not affect request handling (so port forwarding still works)."
  145. )
  146. )
  147. site_name = models.CharField(
  148. verbose_name=_('site name'),
  149. max_length=255,
  150. blank=True,
  151. help_text=_("Human-readable name for the site.")
  152. )
  153. root_page = models.ForeignKey('Page', verbose_name=_('root page'), related_name='sites_rooted_here',
  154. on_delete=models.CASCADE)
  155. is_default_site = models.BooleanField(
  156. verbose_name=_('is default site'),
  157. default=False,
  158. help_text=_(
  159. "If true, this site will handle requests for all other hostnames that do not have a site entry of their own"
  160. )
  161. )
  162. objects = SiteManager()
  163. class Meta:
  164. unique_together = ('hostname', 'port')
  165. verbose_name = _('site')
  166. verbose_name_plural = _('sites')
  167. def natural_key(self):
  168. return (self.hostname, self.port)
  169. def __str__(self):
  170. default_suffix = " [{}]".format(_("default"))
  171. if self.site_name:
  172. return (
  173. self.site_name
  174. + (default_suffix if self.is_default_site else "")
  175. )
  176. else:
  177. return (
  178. self.hostname
  179. + ("" if self.port == 80 else (":%d" % self.port))
  180. + (default_suffix if self.is_default_site else "")
  181. )
  182. @staticmethod
  183. def find_for_request(request):
  184. """
  185. Find the site object responsible for responding to this HTTP
  186. request object. Try:
  187. * unique hostname first
  188. * then hostname and port
  189. * if there is no matching hostname at all, or no matching
  190. hostname:port combination, fall back to the unique default site,
  191. or raise an exception
  192. NB this means that high-numbered ports on an extant hostname may
  193. still be routed to a different hostname which is set as the default
  194. The site will be cached via request._wagtail_site
  195. """
  196. if request is None:
  197. return None
  198. if not hasattr(request, '_wagtail_site'):
  199. site = Site._find_for_request(request)
  200. setattr(request, '_wagtail_site', site)
  201. return request._wagtail_site
  202. @staticmethod
  203. def _find_for_request(request):
  204. hostname = split_domain_port(request.get_host())[0]
  205. port = request.get_port()
  206. site = None
  207. try:
  208. site = get_site_for_hostname(hostname, port)
  209. except Site.DoesNotExist:
  210. pass
  211. # copy old SiteMiddleware behavior
  212. return site
  213. @property
  214. def root_url(self):
  215. if self.port == 80:
  216. return 'http://%s' % self.hostname
  217. elif self.port == 443:
  218. return 'https://%s' % self.hostname
  219. else:
  220. return 'http://%s:%d' % (self.hostname, self.port)
  221. def clean_fields(self, exclude=None):
  222. super().clean_fields(exclude)
  223. # Only one site can have the is_default_site flag set
  224. try:
  225. default = Site.objects.get(is_default_site=True)
  226. except Site.DoesNotExist:
  227. pass
  228. except Site.MultipleObjectsReturned:
  229. raise
  230. else:
  231. if self.is_default_site and self.pk != default.pk:
  232. raise ValidationError(
  233. {'is_default_site': [
  234. _(
  235. "%(hostname)s is already configured as the default site."
  236. " You must unset that before you can save this site as default."
  237. )
  238. % {'hostname': default.hostname}
  239. ]}
  240. )
  241. @staticmethod
  242. def get_site_root_paths():
  243. """
  244. Return a list of `SiteRootPath` instances, most specific path
  245. first - used to translate url_paths into actual URLs with hostnames
  246. Each root path is an instance of the `SiteRootPath` named tuple,
  247. and have the following attributes:
  248. - `site_id` - The ID of the Site record
  249. - `root_path` - The internal URL path of the site's home page (for example '/home/')
  250. - `root_url` - The scheme/domain name of the site (for example 'https://www.example.com/')
  251. - `language_code` - The language code of the site (for example 'en')
  252. """
  253. result = cache.get('wagtail_site_root_paths')
  254. # Wagtail 2.11 changed the way site root paths were stored. This can cause an upgraded 2.11
  255. # site to break when loading cached site root paths that were cached with 2.10.2 or older
  256. # versions of Wagtail. The line below checks if the any of the cached site urls is consistent
  257. # with an older version of Wagtail and invalidates the cache.
  258. if result is None or any(len(site_record) == 3 for site_record in result):
  259. result = []
  260. for site in Site.objects.select_related('root_page', 'root_page__locale').order_by('-root_page__url_path', '-is_default_site', 'hostname'):
  261. if getattr(settings, 'WAGTAIL_I18N_ENABLED', False):
  262. result.extend([
  263. SiteRootPath(site.id, root_page.url_path, site.root_url, root_page.locale.language_code)
  264. for root_page in site.root_page.get_translations(inclusive=True).select_related('locale')
  265. ])
  266. else:
  267. result.append(SiteRootPath(site.id, site.root_page.url_path, site.root_url, site.root_page.locale.language_code))
  268. cache.set('wagtail_site_root_paths', result, 3600)
  269. return result
  270. def pk(obj):
  271. if isinstance(obj, models.Model):
  272. return obj.pk
  273. else:
  274. return obj
  275. class LocaleManager(models.Manager):
  276. def get_for_language(self, language_code):
  277. """
  278. Gets a Locale from a language code.
  279. """
  280. return self.get(language_code=get_supported_content_language_variant(language_code))
  281. class Locale(models.Model):
  282. #: The language code that represents this locale
  283. #:
  284. #: The language code can either be a language code on its own (such as ``en``, ``fr``),
  285. #: or it can include a region code (such as ``en-gb``, ``fr-fr``).
  286. language_code = models.CharField(max_length=100, unique=True)
  287. # Objects excludes any Locales that have been removed from LANGUAGES, This effectively disables them
  288. # The Locale management UI needs to be able to see these so we provide a separate manager `all_objects`
  289. objects = LocaleManager()
  290. all_objects = models.Manager()
  291. class Meta:
  292. ordering = [
  293. "language_code",
  294. ]
  295. @classmethod
  296. def get_default(cls):
  297. """
  298. Returns the default Locale based on the site's LANGUAGE_CODE setting
  299. """
  300. return cls.objects.get_for_language(settings.LANGUAGE_CODE)
  301. @classmethod
  302. def get_active(cls):
  303. """
  304. Returns the Locale that corresponds to the currently activated language in Django.
  305. """
  306. try:
  307. return cls.objects.get_for_language(translation.get_language())
  308. except (cls.DoesNotExist, LookupError):
  309. return cls.get_default()
  310. @transaction.atomic
  311. def delete(self, *args, **kwargs):
  312. # if we're deleting the locale used on the root page node, reassign that to a new locale first
  313. root_page_with_this_locale = Page.objects.filter(depth=1, locale=self)
  314. if root_page_with_this_locale.exists():
  315. # Select the default locale, if one exists and isn't the one being deleted
  316. try:
  317. new_locale = Locale.get_default()
  318. default_locale_is_ok = (new_locale != self)
  319. except (Locale.DoesNotExist, LookupError):
  320. default_locale_is_ok = False
  321. if not default_locale_is_ok:
  322. # fall back on any remaining locale
  323. new_locale = Locale.all_objects.exclude(pk=self.pk).first()
  324. root_page_with_this_locale.update(locale=new_locale)
  325. return super().delete(*args, **kwargs)
  326. def language_code_is_valid(self):
  327. return self.language_code in get_content_languages()
  328. def get_display_name(self):
  329. return get_content_languages().get(self.language_code)
  330. def __str__(self):
  331. return force_str(self.get_display_name() or self.language_code)
  332. class TranslatableMixin(models.Model):
  333. translation_key = models.UUIDField(default=uuid.uuid4, editable=False)
  334. locale = models.ForeignKey(Locale, on_delete=models.PROTECT, related_name="+", editable=False)
  335. class Meta:
  336. abstract = True
  337. unique_together = [("translation_key", "locale")]
  338. @classmethod
  339. def check(cls, **kwargs):
  340. errors = super(TranslatableMixin, cls).check(**kwargs)
  341. is_translation_model = cls.get_translation_model() is cls
  342. # Raise error if subclass has removed the unique_together constraint
  343. # No need to check this on multi-table-inheritance children though as it only needs to be applied to
  344. # the table that has the translation_key/locale fields
  345. if is_translation_model and ("translation_key", "locale") not in cls._meta.unique_together:
  346. errors.append(
  347. checks.Error(
  348. "{0}.{1} is missing a unique_together constraint for the translation key and locale fields"
  349. .format(cls._meta.app_label, cls.__name__),
  350. hint="Add ('translation_key', 'locale') to {}.Meta.unique_together".format(cls.__name__),
  351. obj=cls,
  352. id='wagtailcore.E003',
  353. )
  354. )
  355. return errors
  356. @property
  357. def localized(self):
  358. """
  359. Finds the translation in the current active language.
  360. If there is no translation in the active language, self is returned.
  361. """
  362. try:
  363. locale = Locale.get_active()
  364. except (LookupError, Locale.DoesNotExist):
  365. return self
  366. if locale.id == self.locale_id:
  367. return self
  368. return self.get_translation_or_none(locale) or self
  369. def get_translations(self, inclusive=False):
  370. """
  371. Returns a queryset containing the translations of this instance.
  372. """
  373. translations = self.__class__.objects.filter(
  374. translation_key=self.translation_key
  375. )
  376. if inclusive is False:
  377. translations = translations.exclude(id=self.id)
  378. return translations
  379. def get_translation(self, locale):
  380. """
  381. Finds the translation in the specified locale.
  382. If there is no translation in that locale, this raises a ``model.DoesNotExist`` exception.
  383. """
  384. return self.get_translations(inclusive=True).get(locale_id=pk(locale))
  385. def get_translation_or_none(self, locale):
  386. """
  387. Finds the translation in the specified locale.
  388. If there is no translation in that locale, this returns None.
  389. """
  390. try:
  391. return self.get_translation(locale)
  392. except self.__class__.DoesNotExist:
  393. return None
  394. def has_translation(self, locale):
  395. """
  396. Returns True if a translation exists in the specified locale.
  397. """
  398. return self.get_translations(inclusive=True).filter(locale_id=pk(locale)).exists()
  399. def copy_for_translation(self, locale):
  400. """
  401. Creates a copy of this instance with the specified locale.
  402. Note that the copy is initially unsaved.
  403. """
  404. translated, child_object_map = _copy(self)
  405. translated.locale = locale
  406. # Update locale on any translatable child objects as well
  407. # Note: If this is not a subclass of ClusterableModel, child_object_map will always be '{}'
  408. for (child_relation, old_pk), child_object in child_object_map.items():
  409. if isinstance(child_object, TranslatableMixin):
  410. child_object.locale = locale
  411. return translated
  412. def get_default_locale(self):
  413. """
  414. Finds the default locale to use for this object.
  415. This will be called just before the initial save.
  416. """
  417. # Check if the object has any parental keys to another translatable model
  418. # If so, take the locale from the object referenced in that parental key
  419. parental_keys = [
  420. field
  421. for field in self._meta.get_fields()
  422. if isinstance(field, ParentalKey)
  423. and issubclass(field.related_model, TranslatableMixin)
  424. ]
  425. if parental_keys:
  426. parent_id = parental_keys[0].value_from_object(self)
  427. return (
  428. parental_keys[0]
  429. .related_model.objects.defer().select_related("locale")
  430. .get(id=parent_id)
  431. .locale
  432. )
  433. return Locale.get_default()
  434. @classmethod
  435. def get_translation_model(cls):
  436. """
  437. Returns this model's "Translation model".
  438. The "Translation model" is the model that has the ``locale`` and
  439. ``translation_key`` fields.
  440. Typically this would be the current model, but it may be a
  441. super-class if multi-table inheritance is in use (as is the case
  442. for ``wagtailcore.Page``).
  443. """
  444. return cls._meta.get_field("locale").model
  445. def bootstrap_translatable_model(model, locale):
  446. """
  447. This function populates the "translation_key", and "locale" fields on model instances that were created
  448. before wagtail-localize was added to the site.
  449. This can be called from a data migration, or instead you could use the "boostrap_translatable_models"
  450. management command.
  451. """
  452. for instance in (
  453. model.objects.filter(translation_key__isnull=True).defer().iterator()
  454. ):
  455. instance.translation_key = uuid.uuid4()
  456. instance.locale = locale
  457. instance.save(update_fields=["translation_key", "locale"])
  458. class BootstrapTranslatableModel(migrations.RunPython):
  459. def __init__(self, model_string, language_code=None):
  460. if language_code is None:
  461. language_code = get_supported_content_language_variant(settings.LANGUAGE_CODE)
  462. def forwards(apps, schema_editor):
  463. model = apps.get_model(model_string)
  464. Locale = apps.get_model("wagtailcore.Locale")
  465. locale = Locale.objects.get(language_code=language_code)
  466. bootstrap_translatable_model(model, locale)
  467. def backwards(apps, schema_editor):
  468. pass
  469. super().__init__(forwards, backwards)
  470. class ParentNotTranslatedError(Exception):
  471. """
  472. Raised when a call to Page.copy_for_translation is made but the
  473. parent page is not translated and copy_parents is False.
  474. """
  475. pass
  476. class BootstrapTranslatableMixin(TranslatableMixin):
  477. """
  478. A version of TranslatableMixin without uniqueness constraints.
  479. This is to make it easy to transition existing models to being translatable.
  480. The process is as follows:
  481. - Add BootstrapTranslatableMixin to the model
  482. - Run makemigrations
  483. - Create a data migration for each app, then use the BootstrapTranslatableModel operation in
  484. wagtail.core.models on each model in that app
  485. - Change BootstrapTranslatableMixin to TranslatableMixin
  486. - Run makemigrations again
  487. - Migrate!
  488. """
  489. translation_key = models.UUIDField(null=True, editable=False)
  490. locale = models.ForeignKey(
  491. Locale, on_delete=models.PROTECT, null=True, related_name="+", editable=False
  492. )
  493. @classmethod
  494. def check(cls, **kwargs):
  495. # skip the check in TranslatableMixin that enforces the unique-together constraint
  496. return super(TranslatableMixin, cls).check(**kwargs)
  497. class Meta:
  498. abstract = True
  499. def get_translatable_models(include_subclasses=False):
  500. """
  501. Returns a list of all concrete models that inherit from TranslatableMixin.
  502. By default, this only includes models that are direct children of TranslatableMixin,
  503. to get all models, set the include_subclasses attribute to True.
  504. """
  505. translatable_models = [
  506. model
  507. for model in apps.get_models()
  508. if issubclass(model, TranslatableMixin) and not model._meta.abstract
  509. ]
  510. if include_subclasses is False:
  511. # Exclude models that inherit from another translatable model
  512. root_translatable_models = set()
  513. for model in translatable_models:
  514. root_translatable_models.add(model.get_translation_model())
  515. translatable_models = [
  516. model for model in translatable_models if model in root_translatable_models
  517. ]
  518. return translatable_models
  519. @receiver(pre_save)
  520. def set_locale_on_new_instance(sender, instance, **kwargs):
  521. if not isinstance(instance, TranslatableMixin):
  522. return
  523. if instance.locale_id is not None:
  524. return
  525. # If this is a fixture load, use the global default Locale
  526. # as the page tree is probably in an flux
  527. if kwargs["raw"]:
  528. instance.locale = Locale.get_default()
  529. return
  530. instance.locale = instance.get_default_locale()
# Registry of page model classes; the PageBase metaclass appends each non-abstract class to it.
PAGE_MODEL_CLASSES = []


def get_page_models():
    """
    Returns a list of all non-abstract Page model classes defined in this project.

    The returned object is the module-level registry list itself, not a copy.
    """
    return PAGE_MODEL_CLASSES
  537. def get_default_page_content_type():
  538. """
  539. Returns the content type to use as a default for pages whose content type
  540. has been deleted.
  541. """
  542. return ContentType.objects.get_for_model(Page)
  543. @functools.lru_cache(maxsize=None)
  544. def get_streamfield_names(model_class):
  545. return tuple(
  546. field.name for field in model_class._meta.concrete_fields
  547. if isinstance(field, StreamField)
  548. )
  549. class BasePageManager(models.Manager):
  550. def get_queryset(self):
  551. return self._queryset_class(self.model).order_by('path')
  552. PageManager = BasePageManager.from_queryset(PageQuerySet)
  553. class PageBase(models.base.ModelBase):
  554. """Metaclass for Page"""
  555. def __init__(cls, name, bases, dct):
  556. super(PageBase, cls).__init__(name, bases, dct)
  557. if 'template' not in dct:
  558. # Define a default template path derived from the app name and model name
  559. cls.template = "%s/%s.html" % (cls._meta.app_label, camelcase_to_underscore(name))
  560. if 'ajax_template' not in dct:
  561. cls.ajax_template = None
  562. cls._clean_subpage_models = None # to be filled in on first call to cls.clean_subpage_models
  563. cls._clean_parent_page_models = None # to be filled in on first call to cls.clean_parent_page_models
  564. # All pages should be creatable unless explicitly set otherwise.
  565. # This attribute is not inheritable.
  566. if 'is_creatable' not in dct:
  567. cls.is_creatable = not cls._meta.abstract
  568. if not cls._meta.abstract:
  569. # register this type in the list of page content types
  570. PAGE_MODEL_CLASSES.append(cls)
  571. class AbstractPage(TranslatableMixin, TreebeardPathFixMixin, MP_Node):
  572. """
  573. Abstract superclass for Page. According to Django's inheritance rules, managers set on
  574. abstract models are inherited by subclasses, but managers set on concrete models that are extended
  575. via multi-table inheritance are not. We therefore need to attach PageManager to an abstract
  576. superclass to ensure that it is retained by subclasses of Page.
  577. """
  578. objects = PageManager()
  579. class Meta:
  580. abstract = True
  581. class Page(AbstractPage, index.Indexed, ClusterableModel, metaclass=PageBase):
  582. title = models.CharField(
  583. verbose_name=_('title'),
  584. max_length=255,
  585. help_text=_("The page title as you'd like it to be seen by the public")
  586. )
  587. # to reflect title of a current draft in the admin UI
  588. draft_title = models.CharField(
  589. max_length=255,
  590. editable=False
  591. )
  592. slug = models.SlugField(
  593. verbose_name=_('slug'),
  594. allow_unicode=True,
  595. max_length=255,
  596. help_text=_("The name of the page as it will appear in URLs e.g http://domain.com/blog/[my-slug]/")
  597. )
  598. content_type = models.ForeignKey(
  599. ContentType,
  600. verbose_name=_('content type'),
  601. related_name='pages',
  602. on_delete=models.SET(get_default_page_content_type)
  603. )
  604. live = models.BooleanField(verbose_name=_('live'), default=True, editable=False)
  605. has_unpublished_changes = models.BooleanField(
  606. verbose_name=_('has unpublished changes'),
  607. default=False,
  608. editable=False
  609. )
  610. url_path = models.TextField(verbose_name=_('URL path'), blank=True, editable=False)
  611. owner = models.ForeignKey(
  612. settings.AUTH_USER_MODEL,
  613. verbose_name=_('owner'),
  614. null=True,
  615. blank=True,
  616. editable=True,
  617. on_delete=models.SET_NULL,
  618. related_name='owned_pages'
  619. )
  620. seo_title = models.CharField(
  621. verbose_name=_("title tag"),
  622. max_length=255,
  623. blank=True,
  624. help_text=_("The name of the page displayed on search engine results as the clickable headline.")
  625. )
  626. show_in_menus_default = False
  627. show_in_menus = models.BooleanField(
  628. verbose_name=_('show in menus'),
  629. default=False,
  630. help_text=_("Whether a link to this page will appear in automatically generated menus")
  631. )
  632. search_description = models.TextField(
  633. verbose_name=_('meta description'),
  634. blank=True,
  635. help_text=_("The descriptive text displayed underneath a headline in search engine results.")
  636. )
  637. go_live_at = models.DateTimeField(
  638. verbose_name=_("go live date/time"),
  639. blank=True,
  640. null=True
  641. )
  642. expire_at = models.DateTimeField(
  643. verbose_name=_("expiry date/time"),
  644. blank=True,
  645. null=True
  646. )
  647. expired = models.BooleanField(verbose_name=_('expired'), default=False, editable=False)
  648. locked = models.BooleanField(verbose_name=_('locked'), default=False, editable=False)
  649. locked_at = models.DateTimeField(verbose_name=_('locked at'), null=True, editable=False)
  650. locked_by = models.ForeignKey(
  651. settings.AUTH_USER_MODEL,
  652. verbose_name=_('locked by'),
  653. null=True,
  654. blank=True,
  655. editable=False,
  656. on_delete=models.SET_NULL,
  657. related_name='locked_pages'
  658. )
  659. first_published_at = models.DateTimeField(
  660. verbose_name=_('first published at'),
  661. blank=True,
  662. null=True,
  663. db_index=True
  664. )
  665. last_published_at = models.DateTimeField(
  666. verbose_name=_('last published at'),
  667. null=True,
  668. editable=False
  669. )
  670. latest_revision_created_at = models.DateTimeField(
  671. verbose_name=_('latest revision created at'),
  672. null=True,
  673. editable=False
  674. )
  675. live_revision = models.ForeignKey(
  676. 'PageRevision',
  677. related_name='+',
  678. verbose_name=_('live revision'),
  679. on_delete=models.SET_NULL,
  680. null=True,
  681. blank=True,
  682. editable=False
  683. )
  684. # If non-null, this page is an alias of the linked page
  685. # This means the page is kept in sync with the live version
  686. # of the linked pages and is not editable by users.
  687. alias_of = models.ForeignKey(
  688. 'self',
  689. on_delete=models.SET_NULL,
  690. null=True,
  691. blank=True,
  692. editable=False,
  693. related_name='aliases',
  694. )
  695. search_fields = [
  696. index.SearchField('title', partial_match=True, boost=2),
  697. index.AutocompleteField('title'),
  698. index.FilterField('title'),
  699. index.FilterField('id'),
  700. index.FilterField('live'),
  701. index.FilterField('owner'),
  702. index.FilterField('content_type'),
  703. index.FilterField('path'),
  704. index.FilterField('depth'),
  705. index.FilterField('locked'),
  706. index.FilterField('show_in_menus'),
  707. index.FilterField('first_published_at'),
  708. index.FilterField('last_published_at'),
  709. index.FilterField('latest_revision_created_at'),
  710. index.FilterField('locale'),
  711. index.FilterField('translation_key'),
  712. ]
  713. # Do not allow plain Page instances to be created through the Wagtail admin
  714. is_creatable = False
  715. # Define the maximum number of instances this page type can have. Default to unlimited.
  716. max_count = None
  717. # Define the maximum number of instances this page can have under a specific parent. Default to unlimited.
  718. max_count_per_parent = None
  719. # An array of additional field names that will not be included when a Page is copied.
  720. exclude_fields_in_copy = []
  721. default_exclude_fields_in_copy = ['id', 'path', 'depth', 'numchild', 'url_path', 'path', 'index_entries']
  722. # Define these attributes early to avoid masking errors. (Issue #3078)
  723. # The canonical definition is in wagtailadmin.edit_handlers.
  724. content_panels = []
  725. promote_panels = []
  726. settings_panels = []
  727. def __init__(self, *args, **kwargs):
  728. super().__init__(*args, **kwargs)
  729. if not self.id:
  730. # this model is being newly created
  731. # rather than retrieved from the db;
  732. if not self.content_type_id:
  733. # set content type to correctly represent the model class
  734. # that this was created as
  735. self.content_type = ContentType.objects.get_for_model(self)
  736. if 'show_in_menus' not in kwargs:
  737. # if the value is not set on submit refer to the model setting
  738. self.show_in_menus = self.show_in_menus_default
  739. def __str__(self):
  740. return self.title
  741. @classmethod
  742. def get_streamfield_names(cls):
  743. return get_streamfield_names(cls)
  744. def set_url_path(self, parent):
  745. """
  746. Populate the url_path field based on this page's slug and the specified parent page.
  747. (We pass a parent in here, rather than retrieving it via get_parent, so that we can give
  748. new unsaved pages a meaningful URL when previewing them; at that point the page has not
  749. been assigned a position in the tree, as far as treebeard is concerned.
  750. """
  751. if parent:
  752. self.url_path = parent.url_path + self.slug + '/'
  753. else:
  754. # a page without a parent is the tree root, which always has a url_path of '/'
  755. self.url_path = '/'
  756. return self.url_path
  757. @staticmethod
  758. def _slug_is_available(slug, parent_page, page=None):
  759. """
  760. Determine whether the given slug is available for use on a child page of
  761. parent_page. If 'page' is passed, the slug is intended for use on that page
  762. (and so it will be excluded from the duplicate check).
  763. """
  764. if parent_page is None:
  765. # the root page's slug can be whatever it likes...
  766. return True
  767. siblings = parent_page.get_children()
  768. if page:
  769. siblings = siblings.not_page(page)
  770. return not siblings.filter(slug=slug).exists()
  771. def _get_autogenerated_slug(self, base_slug):
  772. candidate_slug = base_slug
  773. suffix = 1
  774. parent_page = self.get_parent()
  775. while not Page._slug_is_available(candidate_slug, parent_page, self):
  776. # try with incrementing suffix until we find a slug which is available
  777. suffix += 1
  778. candidate_slug = "%s-%d" % (base_slug, suffix)
  779. return candidate_slug
  780. def get_default_locale(self):
  781. """
  782. Finds the default locale to use for this page.
  783. This will be called just before the initial save.
  784. """
  785. parent = self.get_parent()
  786. if parent is not None:
  787. return (
  788. parent.specific_class.objects.defer().select_related("locale")
  789. .get(id=parent.id)
  790. .locale
  791. )
  792. return super().get_default_locale()
  793. def full_clean(self, *args, **kwargs):
  794. # Apply fixups that need to happen before per-field validation occurs
  795. if not self.slug:
  796. # Try to auto-populate slug from title
  797. allow_unicode = getattr(settings, 'WAGTAIL_ALLOW_UNICODE_SLUGS', True)
  798. base_slug = slugify(self.title, allow_unicode=allow_unicode)
  799. # only proceed if we get a non-empty base slug back from slugify
  800. if base_slug:
  801. self.slug = self._get_autogenerated_slug(base_slug)
  802. if not self.draft_title:
  803. self.draft_title = self.title
  804. # Set the locale
  805. if self.locale_id is None:
  806. self.locale = self.get_default_locale()
  807. super().full_clean(*args, **kwargs)
  808. def clean(self):
  809. super().clean()
  810. if not Page._slug_is_available(self.slug, self.get_parent(), self):
  811. raise ValidationError({'slug': _("This slug is already in use")})
  812. def is_site_root(self):
  813. """
  814. Returns True if this page is the root of any site.
  815. This includes translations of site root pages as well.
  816. """
  817. return Site.objects.filter(root_page__translation_key=self.translation_key).exists()
  818. @transaction.atomic
  819. # ensure that changes are only committed when we have updated all descendant URL paths, to preserve consistency
  820. def save(self, clean=True, user=None, log_action=False, **kwargs):
  821. """
  822. Overrides default method behaviour to make additional updates unique to pages,
  823. such as updating the ``url_path`` value of descendant page to reflect changes
  824. to this page's slug.
  825. New pages should generally be saved via the ``add_child()`` or ``add_sibling()``
  826. method of an existing page, which will correctly set the ``path`` and ``depth``
  827. fields on the new page before saving it.
  828. By default, pages are validated using ``full_clean()`` before attempting to
  829. save changes to the database, which helps to preserve validity when restoring
  830. pages from historic revisions (which might not necessarily reflect the current
  831. model state). This validation step can be bypassed by calling the method with
  832. ``clean=False``.
  833. """
  834. if clean:
  835. self.full_clean()
  836. update_descendant_url_paths = False
  837. is_new = self.id is None
  838. if is_new:
  839. # we are creating a record. If we're doing things properly, this should happen
  840. # through a treebeard method like add_child, in which case the 'path' field
  841. # has been set and so we can safely call get_parent
  842. self.set_url_path(self.get_parent())
  843. else:
  844. # Check that we are committing the slug to the database
  845. # Basically: If update_fields has been specified, and slug is not included, skip this step
  846. if not ('update_fields' in kwargs and 'slug' not in kwargs['update_fields']):
  847. # see if the slug has changed from the record in the db, in which case we need to
  848. # update url_path of self and all descendants
  849. old_record = Page.objects.get(id=self.id)
  850. if old_record.slug != self.slug:
  851. self.set_url_path(self.get_parent())
  852. update_descendant_url_paths = True
  853. old_url_path = old_record.url_path
  854. new_url_path = self.url_path
  855. result = super().save(**kwargs)
  856. if not is_new and update_descendant_url_paths:
  857. self._update_descendant_url_paths(old_url_path, new_url_path)
  858. # Check if this is a root page of any sites and clear the 'wagtail_site_root_paths' key if so
  859. # Note: New translations of existing site roots are considered site roots as well, so we must
  860. # always check if this page is a site root, even if it's new.
  861. if self.is_site_root():
  862. cache.delete('wagtail_site_root_paths')
  863. # Log
  864. if is_new:
  865. cls = type(self)
  866. logger.info(
  867. "Page created: \"%s\" id=%d content_type=%s.%s path=%s",
  868. self.title,
  869. self.id,
  870. cls._meta.app_label,
  871. cls.__name__,
  872. self.url_path
  873. )
  874. if log_action is not None:
  875. # The default for log_action is False. i.e. don't log unless specifically instructed
  876. # Page creation is a special case that we want logged by default, but allow skipping it
  877. # explicitly by passing log_action=None
  878. if is_new:
  879. PageLogEntry.objects.log_action(
  880. instance=self,
  881. action='wagtail.create',
  882. user=user or self.owner,
  883. content_changed=True,
  884. )
  885. elif log_action:
  886. PageLogEntry.objects.log_action(
  887. instance=self,
  888. action=log_action,
  889. user=user
  890. )
  891. return result
  892. def delete(self, *args, **kwargs):
  893. # Ensure that deletion always happens on an instance of Page, not a specific subclass. This
  894. # works around a bug in treebeard <= 3.0 where calling SpecificPage.delete() fails to delete
  895. # child pages that are not instances of SpecificPage
  896. if type(self) is Page:
  897. user = kwargs.pop('user', None)
  898. def log_deletion(page, user):
  899. PageLogEntry.objects.log_action(
  900. instance=page,
  901. action='wagtail.delete',
  902. user=user,
  903. deleted=True,
  904. )
  905. if self.get_children().exists():
  906. for child in self.get_children():
  907. log_deletion(child.specific, user)
  908. log_deletion(self.specific, user)
  909. # this is a Page instance, so carry on as we were
  910. return super().delete(*args, **kwargs)
  911. else:
  912. # retrieve an actual Page instance and delete that instead of self
  913. return Page.objects.get(id=self.id).delete(*args, **kwargs)
  914. @classmethod
  915. def check(cls, **kwargs):
  916. errors = super(Page, cls).check(**kwargs)
  917. # Check that foreign keys from pages are not configured to cascade
  918. # This is the default Django behaviour which must be explicitly overridden
  919. # to prevent pages disappearing unexpectedly and the tree being corrupted
  920. # get names of foreign keys pointing to parent classes (such as page_ptr)
  921. field_exceptions = [field.name
  922. for model in [cls] + list(cls._meta.get_parent_list())
  923. for field in model._meta.parents.values() if field]
  924. for field in cls._meta.fields:
  925. if isinstance(field, models.ForeignKey) and field.name not in field_exceptions:
  926. if field.remote_field.on_delete == models.CASCADE:
  927. errors.append(
  928. checks.Warning(
  929. "Field hasn't specified on_delete action",
  930. hint="Set on_delete=models.SET_NULL and make sure the field is nullable or set on_delete=models.PROTECT. Wagtail does not allow simple database CASCADE because it will corrupt its tree storage.",
  931. obj=field,
  932. id='wagtailcore.W001',
  933. )
  934. )
  935. if not isinstance(cls.objects, PageManager):
  936. errors.append(
  937. checks.Error(
  938. "Manager does not inherit from PageManager",
  939. hint="Ensure that custom Page managers inherit from wagtail.core.models.PageManager",
  940. obj=cls,
  941. id='wagtailcore.E002',
  942. )
  943. )
  944. try:
  945. cls.clean_subpage_models()
  946. except (ValueError, LookupError) as e:
  947. errors.append(
  948. checks.Error(
  949. "Invalid subpage_types setting for %s" % cls,
  950. hint=str(e),
  951. id='wagtailcore.E002'
  952. )
  953. )
  954. try:
  955. cls.clean_parent_page_models()
  956. except (ValueError, LookupError) as e:
  957. errors.append(
  958. checks.Error(
  959. "Invalid parent_page_types setting for %s" % cls,
  960. hint=str(e),
  961. id='wagtailcore.E002'
  962. )
  963. )
  964. return errors
  965. def _update_descendant_url_paths(self, old_url_path, new_url_path):
  966. (
  967. Page.objects
  968. .filter(path__startswith=self.path)
  969. .exclude(pk=self.pk)
  970. .update(
  971. url_path=Concat(
  972. Value(new_url_path),
  973. Substr('url_path', len(old_url_path) + 1)
  974. )
  975. )
  976. )
  977. def get_specific(self, deferred=False, copy_attrs=None, copy_attrs_exclude=None):
  978. """
  979. .. versionadded:: 2.12
  980. Return this page in its most specific subclassed form.
  981. .. versionchanged:: 2.13
  982. * When ``copy_attrs`` is not supplied, all known non-field attribute
  983. values are copied to the returned object. Previously, no non-field
  984. values would be copied.
  985. * The ``copy_attrs_exclude`` option was added.
  986. By default, a database query is made to fetch all field values for the
  987. specific object. If you only require access to custom methods or other
  988. non-field attributes on the specific object, you can use
  989. ``deferred=True`` to avoid this query. However, any attempts to access
  990. specific field values from the returned object will trigger additional
  991. database queries.
  992. By default, references to all non-field attribute values are copied
  993. from current object to the returned one. This includes:
  994. * Values set by a queryset, for example: annotations, or values set as
  995. a result of using ``select_related()`` or ``prefetch_related()``.
  996. * Any ``cached_property`` values that have been evaluated.
  997. * Attributes set elsewhere in Python code.
  998. For fine-grained control over which non-field values are copied to the
  999. returned object, you can use ``copy_attrs`` to specify a complete list
  1000. of attribute names to include. Alternatively, you can use
  1001. ``copy_attrs_exclude`` to specify a list of attribute names to exclude.
  1002. If called on a page object that is already an instance of the most
  1003. specific class (e.g. an ``EventPage``), the object will be returned
  1004. as is, and no database queries or other operations will be triggered.
  1005. If the page was originally created using a page type that has since
  1006. been removed from the codebase, a generic ``Page`` object will be
  1007. returned (without any custom field values or other functionality
  1008. present on the original class). Usually, deleting these pages is the
  1009. best course of action, but there is currently no safe way for Wagtail
  1010. to do that at migration time.
  1011. """
  1012. model_class = self.specific_class
  1013. if model_class is None:
  1014. # The codebase and database are out of sync (e.g. the model exists
  1015. # on a different git branch and migrations were not applied or
  1016. # reverted before switching branches). So, the best we can do is
  1017. # return the page in it's current form.
  1018. return self
  1019. if isinstance(self, model_class):
  1020. # self is already the an instance of the most specific class
  1021. return self
  1022. if deferred:
  1023. # Generate a tuple of values in the order expected by __init__(),
  1024. # with missing values substituted with DEFERRED ()
  1025. values = tuple(
  1026. getattr(self, f.attname, self.pk if f.primary_key else DEFERRED)
  1027. for f in model_class._meta.concrete_fields
  1028. )
  1029. # Create object from known attribute values
  1030. specific_obj = model_class(*values)
  1031. specific_obj._state.adding = self._state.adding
  1032. else:
  1033. # Fetch object from database
  1034. specific_obj = model_class._default_manager.get(id=self.id)
  1035. # Copy non-field attribute values
  1036. if copy_attrs is not None:
  1037. for attr in (attr for attr in copy_attrs if attr in self.__dict__):
  1038. setattr(specific_obj, attr, getattr(self, attr))
  1039. else:
  1040. exclude = copy_attrs_exclude or ()
  1041. for k, v in (
  1042. (k, v) for k, v in self.__dict__.items()
  1043. if k not in exclude
  1044. ):
  1045. # only set values that haven't already been set
  1046. specific_obj.__dict__.setdefault(k, v)
  1047. return specific_obj
  1048. @cached_property
  1049. def specific(self):
  1050. """
  1051. Returns this page in its most specific subclassed form with all field
  1052. values fetched from the database. The result is cached in memory.
  1053. """
  1054. return self.get_specific()
  1055. @cached_property
  1056. def specific_deferred(self):
  1057. """
  1058. .. versionadded:: 2.12
  1059. Returns this page in its most specific subclassed form without any
  1060. additional field values being fetched from the database. The result
  1061. is cached in memory.
  1062. """
  1063. return self.get_specific(deferred=True)
  1064. @cached_property
  1065. def specific_class(self):
  1066. """
  1067. Return the class that this page would be if instantiated in its
  1068. most specific form.
  1069. If the model class can no longer be found in the codebase, and the
  1070. relevant ``ContentType`` has been removed by a database migration,
  1071. the return value will be ``None``.
  1072. If the model class can no longer be found in the codebase, but the
  1073. relevant ``ContentType`` is still present in the database (usually a
  1074. result of switching between git branches without running or reverting
  1075. database migrations beforehand), the return value will be ``None``.
  1076. """
  1077. return self.cached_content_type.model_class()
  1078. @property
  1079. def cached_content_type(self):
  1080. """
  1081. .. versionadded:: 2.10
  1082. Return this page's ``content_type`` value from the ``ContentType``
  1083. model's cached manager, which will avoid a database query if the
  1084. object is already in memory.
  1085. """
  1086. return ContentType.objects.get_for_id(self.content_type_id)
  1087. @property
  1088. def localized_draft(self):
  1089. """
  1090. Finds the translation in the current active language.
  1091. If there is no translation in the active language, self is returned.
  1092. Note: This will return translations that are in draft. If you want to exclude
  1093. these, use the ``.localized`` attribute.
  1094. """
  1095. try:
  1096. locale = Locale.get_active()
  1097. except (LookupError, Locale.DoesNotExist):
  1098. return self
  1099. if locale.id == self.locale_id:
  1100. return self
  1101. return self.get_translation_or_none(locale) or self
  1102. @property
  1103. def localized(self):
  1104. """
  1105. Finds the translation in the current active language.
  1106. If there is no translation in the active language, self is returned.
  1107. Note: This will not return the translation if it is in draft.
  1108. If you want to include drafts, use the ``.localized_draft`` attribute instead.
  1109. """
  1110. localized = self.localized_draft
  1111. if not localized.live:
  1112. return self
  1113. return localized
  1114. def route(self, request, path_components):
  1115. if path_components:
  1116. # request is for a child of this page
  1117. child_slug = path_components[0]
  1118. remaining_components = path_components[1:]
  1119. try:
  1120. subpage = self.get_children().get(slug=child_slug)
  1121. except Page.DoesNotExist:
  1122. raise Http404
  1123. return subpage.specific.route(request, remaining_components)
  1124. else:
  1125. # request is for this very page
  1126. if self.live:
  1127. return RouteResult(self)
  1128. else:
  1129. raise Http404
  1130. def get_admin_display_title(self):
  1131. """
  1132. Return the title for this page as it should appear in the admin backend;
  1133. override this if you wish to display extra contextual information about the page,
  1134. such as language. By default, returns ``draft_title``.
  1135. """
  1136. # Fall back on title if draft_title is blank (which may happen if the page was created
  1137. # in a fixture or migration that didn't explicitly handle draft_title)
  1138. return self.draft_title or self.title
  1139. def save_revision(self, user=None, submitted_for_moderation=False, approved_go_live_at=None, changed=True,
  1140. log_action=False, previous_revision=None, clean=True):
  1141. """
  1142. Creates and saves a page revision.
  1143. :param user: the user performing the action
  1144. :param submitted_for_moderation: indicates whether the page was submitted for moderation
  1145. :param approved_go_live_at: the date and time the revision is approved to go live
  1146. :param changed: indicates whether there were any content changes
  1147. :param log_action: flag for logging the action. Pass False to skip logging. Can be passed an action string.
  1148. Defaults to 'wagtail.edit' when no 'previous_revision' param is passed, otherwise 'wagtail.revert'
  1149. :param previous_revision: indicates a revision reversal. Should be set to the previous revision instance
  1150. :param clean: Set this to False to skip cleaning page content before saving this revision
  1151. :return: the newly created revision
  1152. """
  1153. # Raise an error if this page is an alias.
  1154. if self.alias_of_id:
  1155. raise RuntimeError(
  1156. "save_revision() was called on an alias page. "
  1157. "Revisions are not required for alias pages as they are an exact copy of another page."
  1158. )
  1159. if clean:
  1160. self.full_clean()
  1161. # Create revision
  1162. revision = self.revisions.create(
  1163. content_json=self.to_json(),
  1164. user=user,
  1165. submitted_for_moderation=submitted_for_moderation,
  1166. approved_go_live_at=approved_go_live_at,
  1167. )
  1168. update_fields = []
  1169. self.latest_revision_created_at = revision.created_at
  1170. update_fields.append('latest_revision_created_at')
  1171. self.draft_title = self.title
  1172. update_fields.append('draft_title')
  1173. if changed:
  1174. self.has_unpublished_changes = True
  1175. update_fields.append('has_unpublished_changes')
  1176. if update_fields:
  1177. # clean=False because the fields we're updating don't need validation
  1178. self.save(update_fields=update_fields, clean=False)
  1179. # Log
  1180. logger.info("Page edited: \"%s\" id=%d revision_id=%d", self.title, self.id, revision.id)
  1181. if log_action:
  1182. if not previous_revision:
  1183. PageLogEntry.objects.log_action(
  1184. instance=self,
  1185. action=log_action if isinstance(log_action, str) else 'wagtail.edit',
  1186. user=user,
  1187. revision=revision,
  1188. content_changed=changed,
  1189. )
  1190. else:
  1191. PageLogEntry.objects.log_action(
  1192. instance=self,
  1193. action=log_action if isinstance(log_action, str) else 'wagtail.revert',
  1194. user=user,
  1195. data={
  1196. 'revision': {
  1197. 'id': previous_revision.id,
  1198. 'created': previous_revision.created_at.strftime("%d %b %Y %H:%M")
  1199. }
  1200. },
  1201. revision=revision,
  1202. content_changed=changed,
  1203. )
  1204. if submitted_for_moderation:
  1205. logger.info("Page submitted for moderation: \"%s\" id=%d revision_id=%d", self.title, self.id, revision.id)
  1206. return revision
  1207. def get_latest_revision(self):
  1208. return self.revisions.order_by('-created_at', '-id').first()
  1209. def get_latest_revision_as_page(self):
  1210. if not self.has_unpublished_changes:
  1211. # Use the live database copy in preference to the revision record, as:
  1212. # 1) this will pick up any changes that have been made directly to the model,
  1213. # such as automated data imports;
  1214. # 2) it ensures that inline child objects pick up real database IDs even if
  1215. # those are absent from the revision data. (If this wasn't the case, the child
  1216. # objects would be recreated with new IDs on next publish - see #1853)
  1217. return self.specific
  1218. latest_revision = self.get_latest_revision()
  1219. if latest_revision:
  1220. return latest_revision.as_page_object()
  1221. else:
  1222. return self.specific
  1223. def update_aliases(self, *, revision=None, user=None, _content_json=None, _updated_ids=None):
  1224. """
  1225. Publishes all aliases that follow this page with the latest content from this page.
  1226. This is called by Wagtail whenever a page with aliases is published.
  1227. :param revision: The revision of the original page that we are updating to (used for logging purposes)
  1228. :type revision: PageRevision, optional
  1229. :param user: The user who is publishing (used for logging purposes)
  1230. :type user: User, optional
  1231. """
  1232. specific_self = self.specific
  1233. # Only compute this if necessary since it's quite a heavy operation
  1234. if _content_json is None:
  1235. _content_json = self.to_json()
  1236. # A list of IDs that have already been updated. This is just in case someone has
  1237. # created an alias loop (which is impossible to do with the UI Wagtail provides)
  1238. _updated_ids = _updated_ids or []
  1239. for alias in self.specific_class.objects.filter(alias_of=self).exclude(id__in=_updated_ids):
  1240. # FIXME: Switch to the same fields that are excluded from copy
  1241. # We can't do this right now because we can't exclude fields from with_content_json
  1242. exclude_fields = ['id', 'path', 'depth', 'numchild', 'url_path', 'path', 'index_entries']
  1243. # Copy field content
  1244. alias_updated = alias.with_content_json(_content_json)
  1245. # Publish the alias if it's currently in draft
  1246. alias_updated.live = True
  1247. alias_updated.has_unpublished_changes = False
  1248. # Copy child relations
  1249. child_object_map = specific_self.copy_all_child_relations(target=alias_updated, exclude=exclude_fields)
  1250. # Process child objects
  1251. # This has two jobs:
  1252. # - If the alias is in a different locale, this updates the
  1253. # locale of any translatable child objects to match
  1254. # - If the alias is not a translation of the original, this
  1255. # changes the translation_key field of all child objects
  1256. # so they do not clash
  1257. if child_object_map:
  1258. alias_is_translation = alias.translation_key == self.translation_key
  1259. def process_child_object(child_object):
  1260. if isinstance(child_object, TranslatableMixin):
  1261. # Child object's locale must always match the page
  1262. child_object.locale = alias_updated.locale
  1263. # If the alias isn't a translation of the original page,
  1264. # change the child object's translation_keys so they are
  1265. # not either
  1266. if not alias_is_translation:
  1267. child_object.translation_key = uuid.uuid4()
  1268. for (rel, previous_id), child_objects in child_object_map.items():
  1269. if previous_id is None:
  1270. for child_object in child_objects:
  1271. process_child_object(child_object)
  1272. else:
  1273. process_child_object(child_objects)
  1274. # Copy M2M relations
  1275. _copy_m2m_relations(specific_self, alias_updated, exclude_fields=exclude_fields)
  1276. # Don't change the aliases slug
  1277. # Aliases can have their own slugs so they can be siblings of the original
  1278. alias_updated.slug = alias.slug
  1279. alias_updated.set_url_path(alias_updated.get_parent())
  1280. # Aliases don't have revisions, so update fields that would normally be updated by save_revision
  1281. alias_updated.draft_title = alias_updated.title
  1282. alias_updated.latest_revision_created_at = self.latest_revision_created_at
  1283. alias_updated.save(clean=False)
  1284. page_published.send(sender=alias_updated.specific_class, instance=alias_updated, revision=revision, alias=True)
  1285. # Log the publish of the alias
  1286. PageLogEntry.objects.log_action(
  1287. instance=alias_updated,
  1288. action='wagtail.publish',
  1289. user=user,
  1290. )
  1291. # Update any aliases of that alias
  1292. # Design note:
  1293. # It could be argued that this will be faster if we just changed these alias-of-alias
  1294. # pages to all point to the original page and avoid having to update them recursively.
  1295. #
  1296. # But, it's useful to have a record of how aliases have been chained.
  1297. # For example, In Wagtail Localize, we use aliases to create mirrored trees, but those
  1298. # trees themselves could have aliases within them. If an alias within a tree is
  1299. # converted to a regular page, we want the alias in the mirrored tree to follow that
  1300. # new page and stop receiving updates from the original page.
  1301. #
  1302. # Doing it this way requires an extra lookup query per alias but this is small in
  1303. # comparison to the work required to update the alias.
  1304. alias.update_aliases(revision=revision, _content_json=_content_json, _updated_ids=_updated_ids)
  1305. update_aliases.alters_data = True
  1306. def unpublish(self, set_expired=False, commit=True, user=None, log_action=True):
  1307. """
  1308. Unpublish the page by setting ``live`` to ``False``. Does nothing if ``live`` is already ``False``
  1309. :param log_action: flag for logging the action. Pass False to skip logging. Can be passed an action string.
  1310. Defaults to 'wagtail.unpublish'
  1311. """
  1312. if self.live:
  1313. self.live = False
  1314. self.has_unpublished_changes = True
  1315. self.live_revision = None
  1316. if set_expired:
  1317. self.expired = True
  1318. if commit:
  1319. # using clean=False to bypass validation
  1320. self.save(clean=False)
  1321. page_unpublished.send(sender=self.specific_class, instance=self.specific)
  1322. if log_action:
  1323. PageLogEntry.objects.log_action(
  1324. instance=self,
  1325. action=log_action if isinstance(log_action, str) else 'wagtail.unpublish',
  1326. user=user,
  1327. )
  1328. logger.info("Page unpublished: \"%s\" id=%d", self.title, self.id)
  1329. self.revisions.update(approved_go_live_at=None)
  1330. # Unpublish aliases
  1331. for alias in self.aliases.all():
  1332. alias.unpublish()
  1333. context_object_name = None
  1334. def get_context(self, request, *args, **kwargs):
  1335. context = {
  1336. PAGE_TEMPLATE_VAR: self,
  1337. 'self': self,
  1338. 'request': request,
  1339. }
  1340. if self.context_object_name:
  1341. context[self.context_object_name] = self
  1342. return context
  1343. def get_template(self, request, *args, **kwargs):
  1344. if request.is_ajax():
  1345. return self.ajax_template or self.template
  1346. else:
  1347. return self.template
  1348. def serve(self, request, *args, **kwargs):
  1349. request.is_preview = getattr(request, 'is_preview', False)
  1350. return TemplateResponse(
  1351. request,
  1352. self.get_template(request, *args, **kwargs),
  1353. self.get_context(request, *args, **kwargs)
  1354. )
  1355. def is_navigable(self):
  1356. """
  1357. Return true if it's meaningful to browse subpages of this page -
  1358. i.e. it currently has subpages,
  1359. or it's at the top level (this rule necessary for empty out-of-the-box sites to have working navigation)
  1360. """
  1361. return (not self.is_leaf()) or self.depth == 2
  1362. def _get_site_root_paths(self, request=None):
  1363. """
  1364. Return ``Site.get_site_root_paths()``, using the cached copy on the
  1365. request object if available.
  1366. """
  1367. # if we have a request, use that to cache site_root_paths; otherwise, use self
  1368. cache_object = request if request else self
  1369. try:
  1370. return cache_object._wagtail_cached_site_root_paths
  1371. except AttributeError:
  1372. cache_object._wagtail_cached_site_root_paths = Site.get_site_root_paths()
  1373. return cache_object._wagtail_cached_site_root_paths
  1374. def get_url_parts(self, request=None):
  1375. """
  1376. Determine the URL for this page and return it as a tuple of
  1377. ``(site_id, site_root_url, page_url_relative_to_site_root)``.
  1378. Return None if the page is not routable.
  1379. This is used internally by the ``full_url``, ``url``, ``relative_url``
  1380. and ``get_site`` properties and methods; pages with custom URL routing
  1381. should override this method in order to have those operations return
  1382. the custom URLs.
  1383. Accepts an optional keyword argument ``request``, which may be used
  1384. to avoid repeated database / cache lookups. Typically, a page model
  1385. that overrides ``get_url_parts`` should not need to deal with
  1386. ``request`` directly, and should just pass it to the original method
  1387. when calling ``super``.
  1388. """
  1389. possible_sites = [
  1390. (pk, path, url, language_code)
  1391. for pk, path, url, language_code in self._get_site_root_paths(request)
  1392. if self.url_path.startswith(path)
  1393. ]
  1394. if not possible_sites:
  1395. return None
  1396. site_id, root_path, root_url, language_code = possible_sites[0]
  1397. site = Site.find_for_request(request)
  1398. if site:
  1399. for site_id, root_path, root_url, language_code in possible_sites:
  1400. if site_id == site.pk:
  1401. break
  1402. else:
  1403. site_id, root_path, root_url, language_code = possible_sites[0]
  1404. use_wagtail_i18n = getattr(settings, 'WAGTAIL_I18N_ENABLED', False)
  1405. if use_wagtail_i18n:
  1406. # If the active language code is a variant of the page's language, then
  1407. # use that instead
  1408. # This is used when LANGUAGES contain more languages than WAGTAIL_CONTENT_LANGUAGES
  1409. try:
  1410. if get_supported_content_language_variant(translation.get_language()) == language_code:
  1411. language_code = translation.get_language()
  1412. except LookupError:
  1413. # active language code is not a recognised content language, so leave
  1414. # page's language code unchanged
  1415. pass
  1416. # The page may not be routable because wagtail_serve is not registered
  1417. # This may be the case if Wagtail is used headless
  1418. try:
  1419. if use_wagtail_i18n:
  1420. with translation.override(language_code):
  1421. page_path = reverse(
  1422. 'wagtail_serve', args=(self.url_path[len(root_path):],))
  1423. else:
  1424. page_path = reverse(
  1425. 'wagtail_serve', args=(self.url_path[len(root_path):],))
  1426. except NoReverseMatch:
  1427. return (site_id, None, None)
  1428. # Remove the trailing slash from the URL reverse generates if
  1429. # WAGTAIL_APPEND_SLASH is False and we're not trying to serve
  1430. # the root path
  1431. if not WAGTAIL_APPEND_SLASH and page_path != '/':
  1432. page_path = page_path.rstrip('/')
  1433. return (site_id, root_url, page_path)
  def get_full_url(self, request=None):
      """
      Return this page's absolute URL (site root URL followed by the page path).

      ``request`` is forwarded unchanged to ``get_url_parts``. ``None`` is returned
      when the page is not routable, i.e. when ``get_url_parts`` yields ``None`` or
      yields neither a root URL nor a page path.
      """
      parts = self.get_url_parts(request=request)
      if parts is None:
          return None
      if parts[1] is None and parts[2] is None:
          # Neither a root URL nor a path is available: page is not routable
          return None

      _site_id, base_url, path = parts
      return base_url + path

  full_url = property(get_full_url)
  def get_url(self, request=None, current_site=None):
      """
      Return the most suitable URL for linking to this page.

      A local URL (the page path alone) is returned when the page lives on the
      current site, or when only one site exists among the site root paths, since
      the link is then guaranteed to stay on the same domain. Otherwise the root
      URL of the page's site is prepended to produce a full URL.

      Return None if the page is not routable.

      Passing ``request`` is optional but recommended: it is handed to
      ``get_url_parts`` / ``_get_site_root_paths`` and, when ``current_site`` is
      not given, is used with ``Site.find_for_request()`` to work out which site
      the link is being rendered on.
      """
      # ``current_site`` is intentionally left out of the public documentation;
      # it exists so that ``relative_url`` can delegate here instead of
      # duplicating this logic.
      if current_site is None and request is not None:
          current_site = Site.find_for_request(request)

      parts = self.get_url_parts(request=request)
      if parts is None:
          return None
      if parts[1] is None and parts[2] is None:
          # page is not routable
          return None

      site_id, root_url, page_path = parts

      # Count distinct sites: with multiple languages a single site may
      # contribute several root path entries.
      distinct_site_ids = {entry[0] for entry in self._get_site_root_paths(request)}

      on_current_site = current_site is not None and site_id == current_site.id
      if on_current_site or len(distinct_site_ids) == 1:
          # Same site, or single-site installation: a local URL is enough
          return page_path
      return root_url + page_path

  url = property(get_url)
  def relative_url(self, current_site, request=None):
      """
      Return the URL for this page as seen from ``current_site``.

      Delegates to ``get_url``: the result is a local URL when the page belongs
      to ``current_site`` and a fully qualified URL otherwise, or None if the
      page is not routable.

      ``request`` is optional but recommended; it is passed through to
      ``get_url`` so that site-level URL lookups can be reused.
      """
      return self.get_url(current_site=current_site, request=request)
  def get_site(self):
      """
      Return the Site this page belongs to, or None if the page is not routable.

      The site is looked up by the id reported as the first element of
      ``get_url_parts()``.
      """
      parts = self.get_url_parts()
      if parts is not None:
          site_pk, _root_url, _page_path = parts
          return Site.objects.get(id=site_pk)

      # page is not routable
      return None
  @classmethod
  def get_indexed_objects(cls):
      """
      Return the parent implementation's indexed objects, restricted to rows
      whose ``content_type`` is the content type of ``cls``.
      """
      own_content_type = ContentType.objects.get_for_model(cls)
      inherited_queryset = super(Page, cls).get_indexed_objects()
      return inherited_queryset.filter(content_type=own_content_type)
  def get_indexed_instance(self):
      """
      Return the specific instance of this page for indexing, or None if it
      does not exist yet.

      The wagtailsearch signal handler calls this on save. In edge cases such as
      loading test fixtures, that can happen before the specific model's row has
      been created; the page is not ready to be indexed then, so None is returned.
      """
      try:
          specific_page = self.specific
      except self.specific_class.DoesNotExist:
          return None
      return specific_page
  @classmethod
  def clean_subpage_models(cls):
      """
      Return the subpage types of this page class as a list of model classes.

      The result is computed once and cached on ``cls._clean_subpage_models``.
      Raises ValueError if an entry of ``subpage_types`` is not recognisable as a
      model name, or LookupError if the model does not exist or is not a Page
      subclass.
      """
      if cls._clean_subpage_models is not None:
          # Already normalised on an earlier call
          return cls._clean_subpage_models

      declared_types = getattr(cls, 'subpage_types', None)
      if declared_types is None:
          # No restriction declared on the class: every page type may be a subpage
          cls._clean_subpage_models = get_page_models()
          return cls._clean_subpage_models

      resolved_models = []
      for model_string in declared_types:
          resolved_models.append(resolve_model_string(model_string, cls._meta.app_label))
      cls._clean_subpage_models = resolved_models

      for model in resolved_models:
          if not issubclass(model, Page):
              raise LookupError("%s is not a Page subclass" % model)

      return cls._clean_subpage_models
  @classmethod
  def clean_parent_page_models(cls):
      """
      Return the parent page types of this page class as a list of model classes.

      The result is computed once and cached on ``cls._clean_parent_page_models``.
      Raises ValueError if an entry of ``parent_page_types`` is not recognisable
      as a model name, or LookupError if the model does not exist or is not a
      Page subclass.
      """
      if cls._clean_parent_page_models is not None:
          # Already normalised on an earlier call
          return cls._clean_parent_page_models

      declared_types = getattr(cls, 'parent_page_types', None)
      if declared_types is None:
          # No restriction declared on the class: every page type may be a parent
          cls._clean_parent_page_models = get_page_models()
          return cls._clean_parent_page_models

      resolved_models = []
      for model_string in declared_types:
          resolved_models.append(resolve_model_string(model_string, cls._meta.app_label))
      cls._clean_parent_page_models = resolved_models

      for model in resolved_models:
          if not issubclass(model, Page):
              raise LookupError("%s is not a Page subclass" % model)

      return cls._clean_parent_page_models
  @classmethod
  def allowed_parent_page_models(cls):
      """
      Return, as a list of model classes, the page types this page type may be
      placed under.

      A candidate parent qualifies only if it is listed by
      ``clean_parent_page_models()`` and itself accepts ``cls`` as a subpage.
      """
      accepted = []
      for candidate in cls.clean_parent_page_models():
          if cls in candidate.clean_subpage_models():
              accepted.append(candidate)
      return accepted
  @classmethod
  def allowed_subpage_models(cls):
      """
      Return, as a list of model classes, the page types that may exist beneath
      this page type.

      A candidate subpage qualifies only if it is listed by
      ``clean_subpage_models()`` and itself accepts ``cls`` as a parent.
      """
      accepted = []
      for candidate in cls.clean_subpage_models():
          if cls in candidate.clean_parent_page_models():
              accepted.append(candidate)
      return accepted
  @classmethod
  def creatable_subpage_models(cls):
      """
      Return, as a list of model classes, the allowed subpage types whose
      ``is_creatable`` flag is set.
      """
      creatable = []
      for candidate in cls.allowed_subpage_models():
          if candidate.is_creatable:
              creatable.append(candidate)
      return creatable
  @classmethod
  def can_exist_under(cls, parent):
      """
      Tell whether this page type is permitted as a subpage of the given
      parent page instance, judged by the parent's specific class.

      See also: :func:`Page.can_create_at` and :func:`Page.can_move_to`
      """
      permitted_models = parent.specific_class.allowed_subpage_models()
      return cls in permitted_models
  @classmethod
  def can_create_at(cls, parent):
      """
      Tell whether a new page of this type may be created beneath the given
      parent page instance.

      Creation requires the type to be creatable and allowed under ``parent``.
      When ``max_count`` is set, the total number of existing pages of this type
      must be below it; when ``max_count_per_parent`` is set, the number of
      children of this type under ``parent`` must be below it.
      """
      permitted = cls.is_creatable and cls.can_exist_under(parent)

      # The count queries only run while creation is still permitted
      if permitted and cls.max_count is not None:
          permitted = cls.objects.count() < cls.max_count

      if permitted and cls.max_count_per_parent is not None:
          siblings_of_type = parent.get_children().type(cls)
          permitted = siblings_of_type.count() < cls.max_count_per_parent

      return permitted
  def can_move_to(self, parent):
      """
      Tell whether this page instance may be moved beneath the given parent
      page instance.

      Pages cannot be moved into a section of a different language: only the
      root page (depth 1) may hold children of several locales. Apart from that
      restriction the answer is whatever ``can_exist_under(parent)`` reports.
      """
      crosses_locale = parent.depth != 1 and parent.locale_id != self.locale_id
      if crosses_locale:
          return False
      return self.can_exist_under(parent)
  @classmethod
  def get_verbose_name(cls):
      """
      Return the human-readable "verbose name" of this page model, e.g. "Blog page".
      """
      # capfirst only upper-cases the first character; unlike str.title() it
      # leaves the case of every other character untouched.
      verbose_name = cls._meta.verbose_name
      return capfirst(verbose_name)
  @property
  def status_string(self):
      """
      Translatable label summarising the page's publication state.

      Live pages report "live", optionally suffixed with the pending state
      (scheduled, in moderation, or draft changes). Non-live pages report
      expired, scheduled, in moderation or draft. In both cases the first
      matching state in that order wins.
      """
      if self.live:
          if self.approved_schedule:
              return _("live + scheduled")
          if self.workflow_in_progress:
              return _("live + in moderation")
          if self.has_unpublished_changes:
              return _("live + draft")
          return _("live")

      if self.expired:
          return _("expired")
      if self.approved_schedule:
          return _("scheduled")
      if self.workflow_in_progress:
          return _("in moderation")
      return _("draft")
  @property
  def approved_schedule(self):
      """True if any revision of this page has ``approved_go_live_at`` set."""
      scheduled_revisions = self.revisions.exclude(approved_go_live_at__isnull=True)
      return scheduled_revisions.exists()
  def has_unpublished_subtree(self):
      """
      Return True if and only if this page is not live and none of its
      descendants is live.

      This somewhat awkward flag is used when deciding whether unprivileged
      editors may delete the page.
      """
      if self.live:
          return False
      live_descendants = self.get_descendants().filter(live=True)
      return not live_descendants.exists()
  def move(self, target, pos=None, user=None):
      """
      Extension to the treebeard 'move' method to ensure that url_path is updated,
      and to emit the 'pre_page_move' and 'post_page_move' signals.

      :param target: the page the move is relative to
      :param pos: treebeard position string. For 'first-child', 'last-child' and
          'sorted-child' the target becomes the new parent; for any other value
          (including None) the new parent is the target's parent.
      :param user: the user performing the move; recorded in the audit log entry
      """
      # Determine old and new parents
      parent_before = self.get_parent()
      if pos in ('first-child', 'last-child', 'sorted-child'):
          parent_after = target
      else:
          parent_after = target.get_parent()

      # Determine old and new url_paths
      # Fetching new object to avoid affecting `self`
      old_self = Page.objects.get(id=self.id)
      old_url_path = old_self.url_path
      new_url_path = old_self.set_url_path(parent=parent_after)

      # Emit pre_page_move signal
      pre_page_move.send(
          sender=self.specific_class or self.__class__,
          instance=self,
          parent_page_before=parent_before,
          parent_page_after=parent_after,
          url_path_before=old_url_path,
          url_path_after=new_url_path,
      )

      # Only commit when all descendants are properly updated
      with transaction.atomic():
          # Allow treebeard to update `path` values
          super().move(target, pos=pos)

          # Treebeard's move method doesn't actually update the in-memory instance,
          # so we need to work with a freshly loaded one now
          new_self = Page.objects.get(id=self.id)
          new_self.url_path = new_url_path
          new_self.save()

          # Update descendant paths if url_path has changed
          if old_url_path != new_url_path:
              new_self._update_descendant_url_paths(old_url_path, new_url_path)

      # Emit post_page_move signal
      post_page_move.send(
          sender=self.specific_class or self.__class__,
          instance=new_self,
          parent_page_before=parent_before,
          parent_page_after=parent_after,
          url_path_before=old_url_path,
          url_path_after=new_url_path,
      )

      # Log
      # A reorder is a move that keeps the same parent. The comparison must use
      # the computed new parent rather than `target`, because for sibling
      # positions `target` is a sibling of the page, not its parent.
      is_reorder = parent_before.id == parent_after.id
      PageLogEntry.objects.log_action(
          instance=self,
          action='wagtail.reorder' if is_reorder else 'wagtail.move',
          user=user,
          data={
              'source': {
                  'id': parent_before.id,
                  'title': parent_before.specific_deferred.get_admin_display_title()
              },
              'destination': {
                  'id': parent_after.id,
                  'title': parent_after.specific_deferred.get_admin_display_title()
              }
          }
      )
      logger.info("Page moved: \"%s\" id=%d path=%s", self.title, self.id, new_url_path)
  def copy(self, recursive=False, to=None, update_attrs=None, copy_revisions=True, keep_live=True, user=None,
           process_child_object=None, exclude_fields=None, log_action='wagtail.copy', reset_translation_key=True, _mpnode_attrs=None):
      """
      Copies a given page and returns the copy.

      :param recursive: also copy this page's child pages beneath the copy
      :param to: page to create the copy under; when omitted the copy is added as
          a sibling of this page
      :param update_attrs: dict of field values to set on the copy (and on the
          new revision created for it)
      :param copy_revisions: copy this page's revisions to the new page
      :param keep_live: when False, the copy is created non-live with its
          publication fields cleared
      :param user: becomes the owner of the copy and the user of the new revision
          and log entries
      :param process_child_object: optional callable invoked as
          ``process_child_object(original_page, page_copy, child_relation, child_object)``
          for every copied child object
      :param exclude_fields: field names to exclude, in addition to
          ``default_exclude_fields_in_copy`` and ``exclude_fields_in_copy``
      :param log_action: flag for logging the action. Pass None to skip logging.
          Can be passed an action string. Defaults to 'wagtail.copy'. The same
          value is used for recursively copied child pages.
      :param reset_translation_key: give the copy (and its translatable child
          objects) a new translation_key
      :param _mpnode_attrs: internal; ``(path, depth)`` of an already reserved
          tree position, used by the recursive calls
      :raises RuntimeError: if called on an unsaved page
      :raises Exception: if a recursive copy targets the page itself or one of
          its descendants
      """
      if self._state.adding:
          raise RuntimeError('Page.copy() called on an unsaved page')

      exclude_fields = self.default_exclude_fields_in_copy + self.exclude_fields_in_copy + (exclude_fields or [])
      specific_self = self.specific
      if keep_live:
          base_update_attrs = {
              'alias_of': None,
          }
      else:
          base_update_attrs = {
              'live': False,
              'has_unpublished_changes': True,
              'live_revision': None,
              'first_published_at': None,
              'last_published_at': None,
              'alias_of': None,
          }

      if user:
          base_update_attrs['owner'] = user

      # When we're not copying for translation, we should give the translation_key a new value
      if reset_translation_key:
          base_update_attrs['translation_key'] = uuid.uuid4()

      if update_attrs:
          base_update_attrs.update(update_attrs)

      page_copy, child_object_map = _copy(specific_self, exclude_fields=exclude_fields, update_attrs=base_update_attrs)

      # Save copied child objects and run process_child_object on them if we need to
      for (child_relation, old_pk), child_object in child_object_map.items():
          if process_child_object:
              process_child_object(specific_self, page_copy, child_relation, child_object)

          # When we're not copying for translation, we should give the translation_key a new value for each child object as well
          if reset_translation_key and isinstance(child_object, TranslatableMixin):
              child_object.translation_key = uuid.uuid4()

      # Save the new page
      if _mpnode_attrs:
          # We've got a tree position already reserved. Perform a quick save
          page_copy.path = _mpnode_attrs[0]
          page_copy.depth = _mpnode_attrs[1]
          page_copy.save(clean=False)

      else:
          if to:
              if recursive and (to == self or to.is_descendant_of(self)):
                  raise Exception("You cannot copy a tree branch recursively into itself")
              page_copy = to.add_child(instance=page_copy)
          else:
              page_copy = self.add_sibling(instance=page_copy)

          _mpnode_attrs = (page_copy.path, page_copy.depth)

      _copy_m2m_relations(specific_self, page_copy, exclude_fields=exclude_fields, update_attrs=base_update_attrs)

      # Copy revisions
      if copy_revisions:
          for revision in self.revisions.all():
              revision.pk = None
              revision.submitted_for_moderation = False
              revision.approved_go_live_at = None
              revision.page = page_copy

              # Update ID fields in content
              revision_content = json.loads(revision.content_json)
              revision_content['pk'] = page_copy.pk

              for child_relation in get_all_child_relations(specific_self):
                  accessor_name = child_relation.get_accessor_name()
                  try:
                      child_objects = revision_content[accessor_name]
                  except KeyError:
                      # KeyErrors are possible if the revision was created
                      # before this child relation was added to the database
                      continue

                  for child_object in child_objects:
                      child_object[child_relation.field.name] = page_copy.pk

                      # Remap primary key to copied versions
                      # If the primary key is not recognised (eg, the child object has been deleted from the database)
                      # set the primary key to None
                      copied_child_object = child_object_map.get((child_relation, child_object['pk']))
                      child_object['pk'] = copied_child_object.pk if copied_child_object else None

              revision.content_json = json.dumps(revision_content)

              # Save
              revision.save()

      # Create a new revision
      # This code serves a few purposes:
      # * It makes sure update_attrs gets applied to the latest revision
      # * It bumps the last_revision_created_at value so the new page gets ordered as if it was just created
      # * It sets the user of the new revision so it's possible to see who copied the page by looking at its history
      latest_revision = page_copy.get_latest_revision_as_page()

      if update_attrs:
          for field, value in update_attrs.items():
              setattr(latest_revision, field, value)

      latest_revision_as_page_revision = latest_revision.save_revision(user=user, changed=False, clean=False)
      if keep_live:
          page_copy.live_revision = latest_revision_as_page_revision
          page_copy.last_published_at = latest_revision_as_page_revision.created_at
          page_copy.first_published_at = latest_revision_as_page_revision.created_at
          page_copy.save(clean=False)

      if page_copy.live:
          page_published.send(
              sender=page_copy.specific_class, instance=page_copy,
              revision=latest_revision_as_page_revision
          )

      # Log
      if log_action:
          parent = specific_self.get_parent()
          PageLogEntry.objects.log_action(
              instance=page_copy,
              action=log_action,
              user=user,
              data={
                  'page': {
                      'id': page_copy.id,
                      'title': page_copy.get_admin_display_title()
                  },
                  'source': {'id': parent.id, 'title': parent.specific_deferred.get_admin_display_title()} if parent else None,
                  'destination': {'id': to.id, 'title': to.specific_deferred.get_admin_display_title()} if to else None,
                  'keep_live': page_copy.live and keep_live
              },
          )
          if page_copy.live and keep_live:
              # Log the publish if the user chose to keep the copied page live
              PageLogEntry.objects.log_action(
                  instance=page_copy,
                  action='wagtail.publish',
                  user=user,
                  revision=latest_revision_as_page_revision,
              )
      logger.info("Page copied: \"%s\" id=%d from=%d", page_copy.title, page_copy.id, self.id)

      # Copy child pages
      if recursive:
          numchild = 0

          for child_page in self.get_children().specific():
              newdepth = _mpnode_attrs[1] + 1
              child_mpnode_attrs = (
                  Page._get_path(_mpnode_attrs[0], newdepth, numchild),
                  newdepth
              )
              numchild += 1
              child_page.copy(
                  recursive=True,
                  to=page_copy,
                  copy_revisions=copy_revisions,
                  keep_live=keep_live,
                  user=user,
                  process_child_object=process_child_object,
                  # Propagate the caller's choice so that log_action=None (or a
                  # custom action) applies to the whole copied subtree, matching
                  # what create_alias() does for its recursive calls.
                  log_action=log_action,
                  _mpnode_attrs=child_mpnode_attrs
              )

          if numchild > 0:
              page_copy.numchild = numchild
              page_copy.save(clean=False, update_fields=['numchild'])

      return page_copy

  copy.alters_data = True
  def create_alias(self, *, recursive=False, parent=None, update_slug=None, update_locale=None, user=None, log_action='wagtail.create_alias', reset_translation_key=True, _mpnode_attrs=None):
      """
      Creates an alias of the given page.

      An alias is like a copy, but an alias remains in sync with the original page. They
      are not directly editable and do not have revisions.

      You can convert an alias into a regular page by setting the .alias_of attribute to None
      and creating an initial revision.

      :param recursive: create aliases of the page's subtree, defaults to False
      :type recursive: boolean, optional
      :param parent: The page to create the new alias under; when omitted the alias is added as a sibling of this page
      :type parent: Page, optional
      :param update_slug: The slug of the new alias page, defaults to the slug of the original page
      :type update_slug: string, optional
      :param update_locale: The locale of the new alias page, defaults to the locale of the original page
      :type update_locale: Locale, optional
      :param user: The user who is performing this action. This user would be assigned as the owner of the new page and appear in the audit log
      :type user: User, optional
      :param log_action: Override the log action with a custom one, or pass None to skip logging, defaults to 'wagtail.create_alias'
      :type log_action: string or None, optional
      :param reset_translation_key: Generate new translation_keys for the page and any translatable child objects, defaults to True
      :type reset_translation_key: boolean, optional
      :raises Exception: if a recursive alias targets the page itself or one of its descendants
      """
      specific_self = self.specific

      # FIXME: Switch to the same fields that are excluded from copy
      # We can't do this right now because we can't exclude fields from with_content_json
      # which we use for updating aliases
      exclude_fields = ['id', 'path', 'depth', 'numchild', 'url_path', 'index_entries']

      update_attrs = {
          'alias_of': self,

          # Aliases don't have revisions so the draft title should always match the live title
          'draft_title': self.title,

          # Likewise, an alias page can't have unpublished changes if it's live
          'has_unpublished_changes': not self.live,
      }

      if update_slug:
          update_attrs['slug'] = update_slug

      if update_locale:
          update_attrs['locale'] = update_locale

      if user:
          update_attrs['owner'] = user

      # When we're not copying for translation, we should give the translation_key a new value
      if reset_translation_key:
          update_attrs['translation_key'] = uuid.uuid4()

      alias, child_object_map = _copy(specific_self, update_attrs=update_attrs, exclude_fields=exclude_fields)

      # Update any translatable child objects
      for (child_relation, old_pk), child_object in child_object_map.items():
          if isinstance(child_object, TranslatableMixin):
              if update_locale:
                  child_object.locale = update_locale

              # When we're not copying for translation, we should give the translation_key a new value for each child object as well
              if reset_translation_key:
                  child_object.translation_key = uuid.uuid4()

      # Save the new page
      if _mpnode_attrs:
          # We've got a tree position already reserved. Perform a quick save
          alias.path = _mpnode_attrs[0]
          alias.depth = _mpnode_attrs[1]
          alias.save(clean=False)

      else:
          if parent:
              if recursive and (parent == self or parent.is_descendant_of(self)):
                  raise Exception("You cannot copy a tree branch recursively into itself")
              alias = parent.add_child(instance=alias)
          else:
              alias = self.add_sibling(instance=alias)

          _mpnode_attrs = (alias.path, alias.depth)

      _copy_m2m_relations(specific_self, alias, exclude_fields=exclude_fields)

      # Log
      if log_action:
          source_parent = specific_self.get_parent()
          PageLogEntry.objects.log_action(
              instance=alias,
              action=log_action,
              user=user,
              data={
                  'page': {
                      'id': alias.id,
                      'title': alias.get_admin_display_title()
                  },
                  'source': {'id': source_parent.id, 'title': source_parent.specific_deferred.get_admin_display_title()} if source_parent else None,
                  'destination': {'id': parent.id, 'title': parent.specific_deferred.get_admin_display_title()} if parent else None,
              },
          )
          if alias.live:
              # Log the publish
              PageLogEntry.objects.log_action(
                  instance=alias,
                  action='wagtail.publish',
                  user=user,
              )

      logger.info("Page alias created: \"%s\" id=%d from=%d", alias.title, alias.id, self.id)

      # Copy child pages
      if recursive:
          numchild = 0

          for child_page in self.get_children().specific():
              newdepth = _mpnode_attrs[1] + 1
              child_mpnode_attrs = (
                  Page._get_path(_mpnode_attrs[0], newdepth, numchild),
                  newdepth
              )
              numchild += 1
              # update_slug is deliberately not forwarded: it applies to the
              # top-level alias only, children keep their own slugs
              child_page.create_alias(
                  recursive=True,
                  parent=alias,
                  update_locale=update_locale,
                  user=user,
                  log_action=log_action,
                  reset_translation_key=reset_translation_key,
                  _mpnode_attrs=child_mpnode_attrs
              )

          if numchild > 0:
              alias.numchild = numchild
              alias.save(clean=False, update_fields=['numchild'])

      return alias

  create_alias.alters_data = True
  @transaction.atomic
  def copy_for_translation(self, locale, copy_parents=False, alias=False, exclude_fields=None):
      """
      Create a copy of this page in the given locale and return it.

      The copy is created beneath the translation of this page's parent. For
      instance, translating an English blog post into French places the new
      page under the French version of the blog index. With ``alias=True`` an
      alias is created instead of a draft copy.

      When the parent has no translation in ``locale`` a
      ``ParentNotTranslatedError`` is raised, unless ``copy_parents=True`` is
      passed, in which case the missing parents are created as aliases first.

      ``exclude_fields`` lists fields that should be blanked in the copy.

      Because ``.copy()`` is used internally, fields listed in
      ``.exclude_fields_in_copy`` are excluded from the translation as well.
      """
      # Locate the page under which the translation has to be created
      source_parent = self.get_parent().specific
      new_slug = self.slug

      if source_parent.is_root():
          # The root page is never duplicated for translation; the new locale
          # is created as a sibling of the existing page. Both pages then share
          # a section, so the language code is appended to tell the slugs apart.
          destination = source_parent
          new_slug += "-" + locale.language_code
      else:
          try:
              destination = source_parent.get_translation(locale)
          except source_parent.__class__.DoesNotExist:
              if not copy_parents:
                  raise ParentNotTranslatedError

              destination = source_parent.copy_for_translation(
                  locale, copy_parents=True, alias=True
              )

      # Make sure the slug is free beneath the destination page
      new_slug = find_available_slug(destination, new_slug)

      if alias:
          return self.create_alias(
              parent=destination,
              update_slug=new_slug,
              update_locale=locale,
              reset_translation_key=False,
          )

      # Translatable child objects must move to the new locale too
      def process_child_object(
          original_page, page_copy, child_relation, child_object
      ):
          if isinstance(child_object, TranslatableMixin):
              child_object.locale = locale

      return self.copy(
          to=destination,
          update_attrs={
              "locale": locale,
              "slug": new_slug,
          },
          copy_revisions=False,
          keep_live=False,
          reset_translation_key=False,
          process_child_object=process_child_object,
          exclude_fields=exclude_fields,
      )

  copy_for_translation.alters_data = True
  def permissions_for_user(self, user):
      """
      Return a PagePermissionsTester object describing which actions ``user``
      may perform on this page.
      """
      return UserPagePermissionsProxy(user).for_page(self)
  def make_preview_request(self, original_request=None, preview_mode=None, extra_request_attrs=None):
      """
      Simulate a request to this page and return the response.

      A fake HttpRequest is built to resemble, as closely as possible, a real
      request to the page's front-end URL, and ``serve_preview`` is invoked with
      it (and the given ``preview_mode``) at the end of the middleware chain.
      This is used for previewing / moderation and anywhere else the admin needs
      to render the page without going through the regular page routing logic.

      When a real request is supplied as ``original_request``, extra information
      from it (e.g. client IP, cookies) is carried over to the dummy request.
      Entries of ``extra_request_attrs`` are set as attributes on the dummy
      request.
      """
      request = WSGIRequest(self._get_dummy_headers(original_request))

      # Lets middleware recognise that this is a dummy request
      request.is_dummy = True

      if extra_request_attrs:
          for attr_name, attr_value in extra_request_attrs.items():
              setattr(request, attr_name, attr_value)

      # Inside the handler ``self`` refers to the handler, so keep the page
      # reachable under another name.
      previewed_page = self

      # A BaseHandler subclass whose final "view" is serve_preview, bypassing
      # the URL resolver while still running the middleware chain.
      class Handler(BaseHandler):
          def _get_response(self, request):
              response = previewed_page.serve_preview(request, preview_mode)
              needs_render = hasattr(response, 'render') and callable(response.render)
              if needs_render:
                  response = response.render()
              return response

      # Run the request through the custom handler
      preview_handler = Handler()
      preview_handler.load_middleware()
      return preview_handler.get_response(request)
  def _get_dummy_headers(self, original_request=None):
      """
      Build the META dict for the faked HttpRequest that is passed to
      serve_preview.

      Host, port, path and scheme are taken from ``_get_dummy_header_url()``
      when it yields a URL, and otherwise guessed from ALLOWED_HOSTS. A fixed
      set of headers is then copied over from ``original_request`` if given.
      """
      url = self._get_dummy_header_url(original_request)
      if url:
          parsed = urlparse(url)
          hostname = parsed.hostname
          path = parsed.path
          port = parsed.port or (443 if parsed.scheme == 'https' else 80)
          scheme = parsed.scheme
      else:
          # No URL can be determined for this page, so improvise one from
          # whatever ALLOWED_HOSTS offers
          path, port, scheme = '/', 80, 'http'
          try:
              hostname = settings.ALLOWED_HOSTS[0]
              if hostname == '*':
                  # '*' is legal as ALLOWED_HOSTS[0] but is not a domain name,
                  # so treat it the same as an empty list
                  raise IndexError
          except IndexError:
              hostname = 'localhost'

      # The port is only spelled out in the Host header when it is not the
      # default one for the scheme
      default_port = 443 if scheme == 'https' else 80
      if port == default_port:
          http_host = hostname
      else:
          http_host = '%s:%s' % (hostname, port)

      dummy_values = {
          'REQUEST_METHOD': 'GET',
          'PATH_INFO': path,
          'SERVER_NAME': hostname,
          'SERVER_PORT': port,
          'SERVER_PROTOCOL': 'HTTP/1.1',
          'HTTP_HOST': http_host,
          'wsgi.version': (1, 0),
          'wsgi.input': StringIO(),
          'wsgi.errors': StringIO(),
          'wsgi.url_scheme': scheme,
          'wsgi.multithread': True,
          'wsgi.multiprocess': True,
          'wsgi.run_once': False,
      }

      # Values worth carrying over from the original request, when there is one
      copied_headers = [
          'REMOTE_ADDR', 'HTTP_X_FORWARDED_FOR', 'HTTP_COOKIE', 'HTTP_USER_AGENT', 'HTTP_AUTHORIZATION',
          'wsgi.version', 'wsgi.multithread', 'wsgi.multiprocess', 'wsgi.run_once',
      ]
      if settings.SECURE_PROXY_SSL_HEADER:
          copied_headers.append(settings.SECURE_PROXY_SSL_HEADER[0])

      if original_request:
          original_meta = original_request.META
          dummy_values.update(
              (name, original_meta[name])
              for name in copied_headers
              if name in original_meta
          )

      return dummy_values
  2139. def _get_dummy_header_url(self, original_request=None):
  2140. """
  2141. Return the URL that _get_dummy_headers() should use to set META headers
  2142. for the faked HttpRequest.
  2143. """
  2144. return self.full_url
  2145. DEFAULT_PREVIEW_MODES = [('', _('Default'))]
  2146. @property
  2147. def preview_modes(self):
  2148. """
  2149. A list of (internal_name, display_name) tuples for the modes in which
  2150. this page can be displayed for preview/moderation purposes. Ordinarily a page
  2151. will only have one display mode, but subclasses of Page can override this -
  2152. for example, a page containing a form might have a default view of the form,
  2153. and a post-submission 'thank you' page
  2154. """
  2155. return Page.DEFAULT_PREVIEW_MODES
  2156. @property
  2157. def default_preview_mode(self):
  2158. """
  2159. The preview mode to use in workflows that do not give the user the option of selecting a
  2160. mode explicitly, e.g. moderator approval. Will raise IndexError if preview_modes is empty
  2161. """
  2162. return self.preview_modes[0][0]
  2163. def is_previewable(self):
  2164. """Returns True if at least one preview mode is specified"""
  2165. # It's possible that this will be called from a listing page using a plain Page queryset -
  2166. # if so, checking self.preview_modes would incorrectly give us the default set from
  2167. # Page.preview_modes. However, accessing self.specific.preview_modes would result in an N+1
  2168. # query problem. To avoid this (at least in the general case), we'll call .specific only if
  2169. # a check of the property at the class level indicates that preview_modes has been
  2170. # overridden from whatever type we're currently in.
  2171. page = self
  2172. if page.specific_class.preview_modes != type(page).preview_modes:
  2173. page = page.specific
  2174. return bool(page.preview_modes)
  2175. def serve_preview(self, request, mode_name):
  2176. """
  2177. Return an HTTP response for use in page previews. Normally this would be equivalent
  2178. to self.serve(request), since we obviously want the preview to be indicative of how
  2179. it looks on the live site. However, there are a couple of cases where this is not
  2180. appropriate, and custom behaviour is required:
  2181. 1) The page has custom routing logic that derives some additional required
  2182. args/kwargs to be passed to serve(). The routing mechanism is bypassed when
  2183. previewing, so there's no way to know what args we should pass. In such a case,
  2184. the page model needs to implement its own version of serve_preview.
  2185. 2) The page has several different renderings that we would like to be able to see
  2186. when previewing - for example, a form page might have one rendering that displays
  2187. the form, and another rendering to display a landing page when the form is posted.
  2188. This can be done by setting a custom preview_modes list on the page model -
  2189. Wagtail will allow the user to specify one of those modes when previewing, and
  2190. pass the chosen mode_name to serve_preview so that the page model can decide how
  2191. to render it appropriately. (Page models that do not specify their own preview_modes
  2192. list will always receive an empty string as mode_name.)
  2193. Any templates rendered during this process should use the 'request' object passed
  2194. here - this ensures that request.user and other properties are set appropriately for
  2195. the wagtail user bar to be displayed. This request will always be a GET.
  2196. """
  2197. request.is_preview = True
  2198. response = self.serve(request)
  2199. patch_cache_control(response, private=True)
  2200. return response
  2201. def get_cached_paths(self):
  2202. """
  2203. This returns a list of paths to invalidate in a frontend cache
  2204. """
  2205. return ['/']
  2206. def get_sitemap_urls(self, request=None):
  2207. return [
  2208. {
  2209. 'location': self.get_full_url(request),
  2210. # fall back on latest_revision_created_at if last_published_at is null
  2211. # (for backwards compatibility from before last_published_at was added)
  2212. 'lastmod': (self.last_published_at or self.latest_revision_created_at),
  2213. }
  2214. ]
  2215. def get_static_site_paths(self):
  2216. """
  2217. This is a generator of URL paths to feed into a static site generator
  2218. Override this if you would like to create static versions of subpages
  2219. """
  2220. # Yield path for this page
  2221. yield '/'
  2222. # Yield paths for child pages
  2223. for child in self.get_children().live():
  2224. for path in child.specific.get_static_site_paths():
  2225. yield '/' + child.slug + path
  2226. def get_ancestors(self, inclusive=False):
  2227. """
  2228. Returns a queryset of the current page's ancestors, starting at the root page
  2229. and descending to the parent, or to the current page itself if ``inclusive`` is true.
  2230. """
  2231. return Page.objects.ancestor_of(self, inclusive)
  2232. def get_descendants(self, inclusive=False):
  2233. """
  2234. Returns a queryset of all pages underneath the current page, any number of levels deep.
  2235. If ``inclusive`` is true, the current page itself is included in the queryset.
  2236. """
  2237. return Page.objects.descendant_of(self, inclusive)
  2238. def get_siblings(self, inclusive=True):
  2239. """
  2240. Returns a queryset of all other pages with the same parent as the current page.
  2241. If ``inclusive`` is true, the current page itself is included in the queryset.
  2242. """
  2243. return Page.objects.sibling_of(self, inclusive)
  2244. def get_next_siblings(self, inclusive=False):
  2245. return self.get_siblings(inclusive).filter(path__gte=self.path).order_by('path')
  2246. def get_prev_siblings(self, inclusive=False):
  2247. return self.get_siblings(inclusive).filter(path__lte=self.path).order_by('-path')
  2248. def get_view_restrictions(self):
  2249. """
  2250. Return a query set of all page view restrictions that apply to this page.
  2251. This checks the current page and all ancestor pages for page view restrictions.
  2252. If any of those pages are aliases, it will resolve them to their source pages
  2253. before querying PageViewRestrictions so alias pages use the same view restrictions
  2254. as their source page and they cannot have their own.
  2255. """
  2256. page_ids_to_check = set()
  2257. def add_page_to_check_list(page):
  2258. # If the page is an alias, add the source page to the check list instead
  2259. if page.alias_of:
  2260. add_page_to_check_list(page.alias_of)
  2261. else:
  2262. page_ids_to_check.add(page.id)
  2263. # Check current page for view restrictions
  2264. add_page_to_check_list(self)
  2265. # Check each ancestor for view restrictions as well
  2266. for page in self.get_ancestors().only('alias_of'):
  2267. add_page_to_check_list(page)
  2268. return PageViewRestriction.objects.filter(page_id__in=page_ids_to_check)
  2269. password_required_template = getattr(settings, 'PASSWORD_REQUIRED_TEMPLATE', 'wagtailcore/password_required.html')
  2270. def serve_password_required_response(self, request, form, action_url):
  2271. """
  2272. Serve a response indicating that the user has been denied access to view this page,
  2273. and must supply a password.
  2274. form = a Django form object containing the password input
  2275. (and zero or more hidden fields that also need to be output on the template)
  2276. action_url = URL that this form should be POSTed to
  2277. """
  2278. context = self.get_context(request)
  2279. context['form'] = form
  2280. context['action_url'] = action_url
  2281. return TemplateResponse(request, self.password_required_template, context)
  2282. def with_content_json(self, content_json):
  2283. """
  2284. Returns a new version of the page with field values updated to reflect changes
  2285. in the provided ``content_json`` (which usually comes from a previously-saved
  2286. page revision).
  2287. Certain field values are preserved in order to prevent errors if the returned
  2288. page is saved, such as ``id``, ``content_type`` and some tree-related values.
  2289. The following field values are also preserved, as they are considered to be
  2290. meaningful to the page as a whole, rather than to a specific revision:
  2291. * ``draft_title``
  2292. * ``live``
  2293. * ``has_unpublished_changes``
  2294. * ``owner``
  2295. * ``locked``
  2296. * ``locked_by``
  2297. * ``locked_at``
  2298. * ``latest_revision_created_at``
  2299. * ``first_published_at``
  2300. * ``alias_of``
  2301. """
  2302. obj = self.specific_class.from_json(content_json)
  2303. # These should definitely never change between revisions
  2304. obj.id = self.id
  2305. obj.pk = self.pk
  2306. obj.content_type = self.content_type
  2307. # Override possibly-outdated tree parameter fields
  2308. obj.path = self.path
  2309. obj.depth = self.depth
  2310. obj.numchild = self.numchild
  2311. # Update url_path to reflect potential slug changes, but maintining the page's
  2312. # existing tree position
  2313. obj.set_url_path(self.get_parent())
  2314. # Ensure other values that are meaningful for the page as a whole (rather than
  2315. # to a specific revision) are preserved
  2316. obj.draft_title = self.draft_title
  2317. obj.live = self.live
  2318. obj.has_unpublished_changes = self.has_unpublished_changes
  2319. obj.owner = self.owner
  2320. obj.locked = self.locked
  2321. obj.locked_by = self.locked_by
  2322. obj.locked_at = self.locked_at
  2323. obj.latest_revision_created_at = self.latest_revision_created_at
  2324. obj.first_published_at = self.first_published_at
  2325. obj.translation_key = self.translation_key
  2326. obj.locale = self.locale
  2327. obj.alias_of_id = self.alias_of_id
  2328. return obj
  2329. @property
  2330. def has_workflow(self):
  2331. """Returns True if the page or an ancestor has an active workflow assigned, otherwise False"""
  2332. return self.get_ancestors(inclusive=True).filter(workflowpage__isnull=False).filter(workflowpage__workflow__active=True).exists()
  2333. def get_workflow(self):
  2334. """Returns the active workflow assigned to the page or its nearest ancestor"""
  2335. if hasattr(self, 'workflowpage') and self.workflowpage.workflow.active:
  2336. return self.workflowpage.workflow
  2337. else:
  2338. try:
  2339. workflow = self.get_ancestors().filter(workflowpage__isnull=False).filter(workflowpage__workflow__active=True).order_by(
  2340. '-depth').first().workflowpage.workflow
  2341. except AttributeError:
  2342. workflow = None
  2343. return workflow
  2344. @property
  2345. def workflow_in_progress(self):
  2346. """Returns True if a workflow is in progress on the current page, otherwise False"""
  2347. return WorkflowState.objects.filter(page=self, status=WorkflowState.STATUS_IN_PROGRESS).exists()
  2348. @property
  2349. def current_workflow_state(self):
  2350. """Returns the in progress or needs changes workflow state on this page, if it exists"""
  2351. try:
  2352. return WorkflowState.objects.active().get(page=self)
  2353. except WorkflowState.DoesNotExist:
  2354. return
  2355. @property
  2356. def current_workflow_task_state(self):
  2357. """Returns (specific class of) the current task state of the workflow on this page, if it exists"""
  2358. if self.current_workflow_state and self.current_workflow_state.status == WorkflowState.STATUS_IN_PROGRESS and self.current_workflow_state.current_task_state:
  2359. return self.current_workflow_state.current_task_state.specific
  2360. @property
  2361. def current_workflow_task(self):
  2362. """Returns (specific class of) the current task in progress on this page, if it exists"""
  2363. if self.current_workflow_task_state:
  2364. return self.current_workflow_task_state.task.specific
  2365. class Meta:
  2366. verbose_name = _('page')
  2367. verbose_name_plural = _('pages')
  2368. unique_together = [("translation_key", "locale")]
  2369. class Orderable(models.Model):
  2370. sort_order = models.IntegerField(null=True, blank=True, editable=False)
  2371. sort_order_field = 'sort_order'
  2372. class Meta:
  2373. abstract = True
  2374. ordering = ['sort_order']
  2375. class SubmittedRevisionsManager(models.Manager):
  2376. def get_queryset(self):
  2377. return super().get_queryset().filter(submitted_for_moderation=True)
  2378. class PageRevision(models.Model):
  2379. page = models.ForeignKey('Page', verbose_name=_('page'), related_name='revisions', on_delete=models.CASCADE)
  2380. submitted_for_moderation = models.BooleanField(
  2381. verbose_name=_('submitted for moderation'),
  2382. default=False,
  2383. db_index=True
  2384. )
  2385. created_at = models.DateTimeField(db_index=True, verbose_name=_('created at'))
  2386. user = models.ForeignKey(
  2387. settings.AUTH_USER_MODEL, verbose_name=_('user'), null=True, blank=True,
  2388. on_delete=models.SET_NULL
  2389. )
  2390. content_json = models.TextField(verbose_name=_('content JSON'))
  2391. approved_go_live_at = models.DateTimeField(
  2392. verbose_name=_('approved go live at'),
  2393. null=True,
  2394. blank=True,
  2395. db_index=True
  2396. )
  2397. objects = models.Manager()
  2398. submitted_revisions = SubmittedRevisionsManager()
  2399. def save(self, user=None, *args, **kwargs):
  2400. # Set default value for created_at to now
  2401. # We cannot use auto_now_add as that will override
  2402. # any value that is set before saving
  2403. if self.created_at is None:
  2404. self.created_at = timezone.now()
  2405. super().save(*args, **kwargs)
  2406. if self.submitted_for_moderation:
  2407. # ensure that all other revisions of this page have the 'submitted for moderation' flag unset
  2408. self.page.revisions.exclude(id=self.id).update(submitted_for_moderation=False)
  2409. if (
  2410. self.approved_go_live_at is None
  2411. and 'update_fields' in kwargs and 'approved_go_live_at' in kwargs['update_fields']
  2412. ):
  2413. # Log scheduled revision publish cancellation
  2414. page = self.as_page_object()
  2415. # go_live_at = kwargs['update_fields'][]
  2416. PageLogEntry.objects.log_action(
  2417. instance=page,
  2418. action='wagtail.schedule.cancel',
  2419. data={
  2420. 'revision': {
  2421. 'id': self.id,
  2422. 'created': self.created_at.strftime("%d %b %Y %H:%M"),
  2423. 'go_live_at': page.go_live_at.strftime("%d %b %Y %H:%M") if page.go_live_at else None,
  2424. }
  2425. },
  2426. user=user,
  2427. revision=self,
  2428. )
  2429. def as_page_object(self):
  2430. return self.page.specific.with_content_json(self.content_json)
  2431. def approve_moderation(self, user=None):
  2432. if self.submitted_for_moderation:
  2433. logger.info("Page moderation approved: \"%s\" id=%d revision_id=%d", self.page.title, self.page.id, self.id)
  2434. PageLogEntry.objects.log_action(
  2435. instance=self.as_page_object(),
  2436. action='wagtail.moderation.approve',
  2437. user=user,
  2438. revision=self,
  2439. )
  2440. self.publish()
  2441. def reject_moderation(self, user=None):
  2442. if self.submitted_for_moderation:
  2443. logger.info("Page moderation rejected: \"%s\" id=%d revision_id=%d", self.page.title, self.page.id, self.id)
  2444. PageLogEntry.objects.log_action(
  2445. instance=self.as_page_object(),
  2446. action='wagtail.moderation.reject',
  2447. user=user,
  2448. revision=self,
  2449. )
  2450. self.submitted_for_moderation = False
  2451. self.save(update_fields=['submitted_for_moderation'])
  2452. def is_latest_revision(self):
  2453. if self.id is None:
  2454. # special case: a revision without an ID is presumed to be newly-created and is thus
  2455. # newer than any revision that might exist in the database
  2456. return True
  2457. latest_revision = PageRevision.objects.filter(page_id=self.page_id).order_by('-created_at', '-id').first()
  2458. return (latest_revision == self)
  2459. def publish(self, user=None, changed=True, log_action=True, previous_revision=None):
  2460. """
  2461. Publishes or schedules revision for publishing.
  2462. :param user: the publishing user
  2463. :param changed: indicated whether content has changed
  2464. :param log_action:
  2465. flag for the logging action. Pass False to skip logging. Cannot pass an action string as the method
  2466. performs several actions: "publish", "revert" (and publish the reverted revision),
  2467. "schedule publishing with a live revision", "schedule revision reversal publishing, with a live revision",
  2468. "schedule publishing", "schedule revision reversal publishing"
  2469. :param previous_revision: indicates a revision reversal. Should be set to the previous revision instance
  2470. """
  2471. page = self.as_page_object()
  2472. def log_scheduling_action(revision, user=None, changed=changed):
  2473. PageLogEntry.objects.log_action(
  2474. instance=page,
  2475. action='wagtail.publish.schedule',
  2476. user=user,
  2477. data={
  2478. 'revision': {
  2479. 'id': revision.id,
  2480. 'created': revision.created_at.strftime("%d %b %Y %H:%M"),
  2481. 'go_live_at': page.go_live_at.strftime("%d %b %Y %H:%M"),
  2482. 'has_live_version': page.live,
  2483. }
  2484. },
  2485. revision=revision,
  2486. content_changed=changed,
  2487. )
  2488. if page.go_live_at and page.go_live_at > timezone.now():
  2489. page.has_unpublished_changes = True
  2490. # Instead set the approved_go_live_at of this revision
  2491. self.approved_go_live_at = page.go_live_at
  2492. self.save()
  2493. # And clear the the approved_go_live_at of any other revisions
  2494. page.revisions.exclude(id=self.id).update(approved_go_live_at=None)
  2495. # if we are updating a currently live page skip the rest
  2496. if page.live_revision:
  2497. # Log scheduled publishing
  2498. if log_action:
  2499. log_scheduling_action(self, user, changed)
  2500. return
  2501. # if we have a go_live in the future don't make the page live
  2502. page.live = False
  2503. else:
  2504. page.live = True
  2505. # at this point, the page has unpublished changes iff there are newer revisions than this one
  2506. page.has_unpublished_changes = not self.is_latest_revision()
  2507. # If page goes live clear the approved_go_live_at of all revisions
  2508. page.revisions.update(approved_go_live_at=None)
  2509. page.expired = False # When a page is published it can't be expired
  2510. # Set first_published_at, last_published_at and live_revision
  2511. # if the page is being published now
  2512. if page.live:
  2513. now = timezone.now()
  2514. page.last_published_at = now
  2515. page.live_revision = self
  2516. if page.first_published_at is None:
  2517. page.first_published_at = now
  2518. if previous_revision:
  2519. previous_revision_page = previous_revision.as_page_object()
  2520. old_page_title = previous_revision_page.title if page.title != previous_revision_page.title else None
  2521. else:
  2522. try:
  2523. previous = self.get_previous()
  2524. except PageRevision.DoesNotExist:
  2525. previous = None
  2526. old_page_title = previous.page.title if previous and page.title != previous.page.title else None
  2527. else:
  2528. # Unset live_revision if the page is going live in the future
  2529. page.live_revision = None
  2530. page.save()
  2531. self.submitted_for_moderation = False
  2532. page.revisions.update(submitted_for_moderation=False)
  2533. workflow_state = page.current_workflow_state
  2534. if workflow_state and getattr(settings, 'WAGTAIL_WORKFLOW_CANCEL_ON_PUBLISH', True):
  2535. workflow_state.cancel(user=user)
  2536. if page.live:
  2537. page_published.send(sender=page.specific_class, instance=page.specific, revision=self)
  2538. # Update alias pages
  2539. page.update_aliases(revision=self, user=user, _content_json=self.content_json)
  2540. if log_action:
  2541. data = None
  2542. if previous_revision:
  2543. data = {
  2544. 'revision': {
  2545. 'id': previous_revision.id,
  2546. 'created': previous_revision.created_at.strftime("%d %b %Y %H:%M")
  2547. }
  2548. }
  2549. if old_page_title:
  2550. data = data or {}
  2551. data['title'] = {
  2552. 'old': old_page_title,
  2553. 'new': page.title,
  2554. }
  2555. PageLogEntry.objects.log_action(
  2556. instance=page,
  2557. action='wagtail.rename',
  2558. user=user,
  2559. data=data,
  2560. revision=self,
  2561. )
  2562. PageLogEntry.objects.log_action(
  2563. instance=page,
  2564. action=log_action if isinstance(log_action, str) else 'wagtail.publish',
  2565. user=user,
  2566. data=data,
  2567. revision=self,
  2568. content_changed=changed,
  2569. )
  2570. logger.info("Page published: \"%s\" id=%d revision_id=%d", page.title, page.id, self.id)
  2571. elif page.go_live_at:
  2572. logger.info(
  2573. "Page scheduled for publish: \"%s\" id=%d revision_id=%d go_live_at=%s",
  2574. page.title,
  2575. page.id,
  2576. self.id,
  2577. page.go_live_at.isoformat()
  2578. )
  2579. if log_action:
  2580. log_scheduling_action(self, user, changed)
  2581. def get_previous(self):
  2582. return self.get_previous_by_created_at(page=self.page)
  2583. def get_next(self):
  2584. return self.get_next_by_created_at(page=self.page)
  2585. def __str__(self):
  2586. return '"' + str(self.page) + '" at ' + str(self.created_at)
  2587. class Meta:
  2588. verbose_name = _('page revision')
  2589. verbose_name_plural = _('page revisions')
  2590. PAGE_PERMISSION_TYPES = [
  2591. ('add', _("Add"), _("Add/edit pages you own")),
  2592. ('edit', _("Edit"), _("Edit any page")),
  2593. ('publish', _("Publish"), _("Publish any page")),
  2594. ('bulk_delete', _("Bulk delete"), _("Delete pages with children")),
  2595. ('lock', _("Lock"), _("Lock/unlock pages you've locked")),
  2596. ('unlock', _("Unlock"), _("Unlock any page")),
  2597. ]
  2598. PAGE_PERMISSION_TYPE_CHOICES = [
  2599. (identifier, long_label)
  2600. for identifier, short_label, long_label in PAGE_PERMISSION_TYPES
  2601. ]
  2602. class GroupPagePermission(models.Model):
  2603. group = models.ForeignKey(Group, verbose_name=_('group'), related_name='page_permissions', on_delete=models.CASCADE)
  2604. page = models.ForeignKey('Page', verbose_name=_('page'), related_name='group_permissions', on_delete=models.CASCADE)
  2605. permission_type = models.CharField(
  2606. verbose_name=_('permission type'),
  2607. max_length=20,
  2608. choices=PAGE_PERMISSION_TYPE_CHOICES
  2609. )
  2610. class Meta:
  2611. unique_together = ('group', 'page', 'permission_type')
  2612. verbose_name = _('group page permission')
  2613. verbose_name_plural = _('group page permissions')
  2614. def __str__(self):
  2615. return "Group %d ('%s') has permission '%s' on page %d ('%s')" % (
  2616. self.group.id, self.group,
  2617. self.permission_type,
  2618. self.page.id, self.page
  2619. )
  2620. class UserPagePermissionsProxy:
  2621. """Helper object that encapsulates all the page permission rules that this user has
  2622. across the page hierarchy."""
  2623. def __init__(self, user):
  2624. self.user = user
  2625. if user.is_active and not user.is_superuser:
  2626. self.permissions = GroupPagePermission.objects.filter(group__user=self.user).select_related('page')
  2627. def revisions_for_moderation(self):
  2628. """Return a queryset of page revisions awaiting moderation that this user has publish permission on"""
  2629. # Deal with the trivial cases first...
  2630. if not self.user.is_active:
  2631. return PageRevision.objects.none()
  2632. if self.user.is_superuser:
  2633. return PageRevision.submitted_revisions.all()
  2634. # get the list of pages for which they have direct publish permission
  2635. # (i.e. they can publish any page within this subtree)
  2636. publishable_pages_paths = self.permissions.filter(
  2637. permission_type='publish'
  2638. ).values_list('page__path', flat=True).distinct()
  2639. if not publishable_pages_paths:
  2640. return PageRevision.objects.none()
  2641. # compile a filter expression to apply to the PageRevision.submitted_revisions manager:
  2642. # return only those pages whose paths start with one of the publishable_pages paths
  2643. only_my_sections = Q(page__path__startswith=publishable_pages_paths[0])
  2644. for page_path in publishable_pages_paths[1:]:
  2645. only_my_sections = only_my_sections | Q(page__path__startswith=page_path)
  2646. # return the filtered queryset
  2647. return PageRevision.submitted_revisions.filter(only_my_sections)
  2648. def for_page(self, page):
  2649. """Return a PagePermissionTester object that can be used to query whether this user has
  2650. permission to perform specific tasks on the given page"""
  2651. return PagePermissionTester(self, page)
  2652. def explorable_pages(self):
  2653. """Return a queryset of pages that the user has access to view in the
  2654. explorer (e.g. add/edit/publish permission). Includes all pages with
  2655. specific group permissions and also the ancestors of those pages (in
  2656. order to enable navigation in the explorer)"""
  2657. # Deal with the trivial cases first...
  2658. if not self.user.is_active:
  2659. return Page.objects.none()
  2660. if self.user.is_superuser:
  2661. return Page.objects.all()
  2662. explorable_pages = Page.objects.none()
  2663. # Creates a union queryset of all objects the user has access to add,
  2664. # edit and publish
  2665. for perm in self.permissions.filter(
  2666. Q(permission_type="add")
  2667. | Q(permission_type="edit")
  2668. | Q(permission_type="publish")
  2669. | Q(permission_type="lock")
  2670. ):
  2671. explorable_pages |= Page.objects.descendant_of(
  2672. perm.page, inclusive=True
  2673. )
  2674. # For all pages with specific permissions, add their ancestors as
  2675. # explorable. This will allow deeply nested pages to be accessed in the
  2676. # explorer. For example, in the hierarchy A>B>C>D where the user has
  2677. # 'edit' access on D, they will be able to navigate to D without having
  2678. # explicit access to A, B or C.
  2679. page_permissions = Page.objects.filter(group_permissions__in=self.permissions)
  2680. for page in page_permissions:
  2681. explorable_pages |= page.get_ancestors()
  2682. # Remove unnecessary top-level ancestors that the user has no access to
  2683. fca_page = page_permissions.first_common_ancestor()
  2684. explorable_pages = explorable_pages.filter(path__startswith=fca_page.path)
  2685. return explorable_pages
  2686. def editable_pages(self):
  2687. """Return a queryset of the pages that this user has permission to edit"""
  2688. # Deal with the trivial cases first...
  2689. if not self.user.is_active:
  2690. return Page.objects.none()
  2691. if self.user.is_superuser:
  2692. return Page.objects.all()
  2693. editable_pages = Page.objects.none()
  2694. for perm in self.permissions.filter(permission_type='add'):
  2695. # user has edit permission on any subpage of perm.page
  2696. # (including perm.page itself) that is owned by them
  2697. editable_pages |= Page.objects.descendant_of(perm.page, inclusive=True).filter(owner=self.user)
  2698. for perm in self.permissions.filter(permission_type='edit'):
  2699. # user has edit permission on any subpage of perm.page
  2700. # (including perm.page itself) regardless of owner
  2701. editable_pages |= Page.objects.descendant_of(perm.page, inclusive=True)
  2702. return editable_pages
  2703. def can_edit_pages(self):
  2704. """Return True if the user has permission to edit any pages"""
  2705. return self.editable_pages().exists()
  2706. def publishable_pages(self):
  2707. """Return a queryset of the pages that this user has permission to publish"""
  2708. # Deal with the trivial cases first...
  2709. if not self.user.is_active:
  2710. return Page.objects.none()
  2711. if self.user.is_superuser:
  2712. return Page.objects.all()
  2713. publishable_pages = Page.objects.none()
  2714. for perm in self.permissions.filter(permission_type='publish'):
  2715. # user has publish permission on any subpage of perm.page
  2716. # (including perm.page itself)
  2717. publishable_pages |= Page.objects.descendant_of(perm.page, inclusive=True)
  2718. return publishable_pages
  2719. def can_publish_pages(self):
  2720. """Return True if the user has permission to publish any pages"""
  2721. return self.publishable_pages().exists()
  2722. def can_remove_locks(self):
  2723. """Returns True if the user has permission to unlock pages they have not locked"""
  2724. if self.user.is_superuser:
  2725. return True
  2726. if not self.user.is_active:
  2727. return False
  2728. else:
  2729. return self.permissions.filter(permission_type='unlock').exists()
  2730. class PagePermissionTester:
  2731. def __init__(self, user_perms, page):
  2732. self.user = user_perms.user
  2733. self.user_perms = user_perms
  2734. self.page = page
  2735. self.page_is_root = page.depth == 1 # Equivalent to page.is_root()
  2736. if self.user.is_active and not self.user.is_superuser:
  2737. self.permissions = set(
  2738. perm.permission_type for perm in user_perms.permissions
  2739. if self.page.path.startswith(perm.page.path)
  2740. )
  2741. def user_has_lock(self):
  2742. return self.page.locked_by_id == self.user.pk
  2743. def page_locked(self):
  2744. if self.page.current_workflow_task:
  2745. if self.page.current_workflow_task.page_locked_for_user(self.page, self.user):
  2746. return True
  2747. if not self.page.locked:
  2748. # Page is not locked
  2749. return False
  2750. if getattr(settings, 'WAGTAILADMIN_GLOBAL_PAGE_EDIT_LOCK', False):
  2751. # All locks are global
  2752. return True
  2753. else:
  2754. # Locked only if the current user was not the one who locked the page
  2755. return not self.user_has_lock()
  2756. def can_add_subpage(self):
  2757. if not self.user.is_active:
  2758. return False
  2759. specific_class = self.page.specific_class
  2760. if specific_class is None or not specific_class.creatable_subpage_models():
  2761. return False
  2762. return self.user.is_superuser or ('add' in self.permissions)
  2763. def can_edit(self):
  2764. if not self.user.is_active:
  2765. return False
  2766. if self.page_is_root: # root node is not a page and can never be edited, even by superusers
  2767. return False
  2768. if self.user.is_superuser:
  2769. return True
  2770. if 'edit' in self.permissions:
  2771. return True
  2772. if 'add' in self.permissions and self.page.owner_id == self.user.pk:
  2773. return True
  2774. if self.page.current_workflow_task:
  2775. if self.page.current_workflow_task.user_can_access_editor(self.page, self.user):
  2776. return True
  2777. return False
  2778. def can_delete(self, ignore_bulk=False):
  2779. if not self.user.is_active:
  2780. return False
  2781. if self.page_is_root: # root node is not a page and can never be deleted, even by superusers
  2782. return False
  2783. if self.user.is_superuser:
  2784. # superusers require no further checks
  2785. return True
  2786. # if the user does not have bulk_delete permission, they may only delete leaf pages
  2787. if 'bulk_delete' not in self.permissions and not self.page.is_leaf() and not ignore_bulk:
  2788. return False
  2789. if 'edit' in self.permissions:
  2790. # if the user does not have publish permission, we also need to confirm that there
  2791. # are no published pages here
  2792. if 'publish' not in self.permissions:
  2793. pages_to_delete = self.page.get_descendants(inclusive=True)
  2794. if pages_to_delete.live().exists():
  2795. return False
  2796. return True
  2797. elif 'add' in self.permissions:
  2798. pages_to_delete = self.page.get_descendants(inclusive=True)
  2799. if 'publish' in self.permissions:
  2800. # we don't care about live state, but all pages must be owned by this user
  2801. # (i.e. eliminating pages owned by this user must give us the empty set)
  2802. return not pages_to_delete.exclude(owner=self.user).exists()
  2803. else:
  2804. # all pages must be owned by this user and non-live
  2805. # (i.e. eliminating non-live pages owned by this user must give us the empty set)
  2806. return not pages_to_delete.exclude(live=False, owner=self.user).exists()
  2807. else:
  2808. return False
  2809. def can_unpublish(self):
  2810. if not self.user.is_active:
  2811. return False
  2812. if (not self.page.live) or self.page_is_root:
  2813. return False
  2814. if self.page_locked():
  2815. return False
  2816. return self.user.is_superuser or ('publish' in self.permissions)
  2817. def can_publish(self):
  2818. if not self.user.is_active:
  2819. return False
  2820. if self.page_is_root:
  2821. return False
  2822. return self.user.is_superuser or ('publish' in self.permissions)
  2823. def can_submit_for_moderation(self):
  2824. return not self.page_locked() and self.page.has_workflow and not self.page.workflow_in_progress
  2825. def can_set_view_restrictions(self):
  2826. return self.can_publish()
  2827. def can_unschedule(self):
  2828. return self.can_publish()
  2829. def can_lock(self):
  2830. if self.user.is_superuser:
  2831. return True
  2832. if self.page.current_workflow_task:
  2833. return self.page.current_workflow_task.user_can_lock(self.page, self.user)
  2834. if 'lock' in self.permissions:
  2835. return True
  2836. return False
  2837. def can_unlock(self):
  2838. if self.user.is_superuser:
  2839. return True
  2840. if self.user_has_lock():
  2841. return True
  2842. if self.page.current_workflow_task:
  2843. return self.page.current_workflow_task.user_can_unlock(self.page, self.user)
  2844. if 'unlock' in self.permissions:
  2845. return True
  2846. return False
  2847. def can_publish_subpage(self):
  2848. """
  2849. Niggly special case for creating and publishing a page in one go.
  2850. Differs from can_publish in that we want to be able to publish subpages of root, but not
  2851. to be able to publish root itself. (Also, can_publish_subpage returns false if the page
  2852. does not allow subpages at all.)
  2853. """
  2854. if not self.user.is_active:
  2855. return False
  2856. specific_class = self.page.specific_class
  2857. if specific_class is None or not specific_class.creatable_subpage_models():
  2858. return False
  2859. return self.user.is_superuser or ('publish' in self.permissions)
  2860. def can_reorder_children(self):
  2861. """
  2862. Keep reorder permissions the same as publishing, since it immediately affects published pages
  2863. (and the use-cases for a non-admin needing to do it are fairly obscure...)
  2864. """
  2865. return self.can_publish_subpage()
  2866. def can_move(self):
  2867. """
  2868. Moving a page should be logically equivalent to deleting and re-adding it (and all its children).
  2869. As such, the permission test for 'can this be moved at all?' should be the same as for deletion.
  2870. (Further constraints will then apply on where it can be moved *to*.)
  2871. """
  2872. return self.can_delete(ignore_bulk=True)
  2873. def can_copy(self):
  2874. return not self.page_is_root
  2875. def can_move_to(self, destination):
  2876. # reject the logically impossible cases first
  2877. if self.page == destination or destination.is_descendant_of(self.page):
  2878. return False
  2879. # reject moves that are forbidden by subpage_types / parent_page_types rules
  2880. # (these rules apply to superusers too)
  2881. if not self.page.specific.can_move_to(destination):
  2882. return False
  2883. # shortcut the trivial 'everything' / 'nothing' permissions
  2884. if not self.user.is_active:
  2885. return False
  2886. if self.user.is_superuser:
  2887. return True
  2888. # check that the page can be moved at all
  2889. if not self.can_move():
  2890. return False
  2891. # Inspect permissions on the destination
  2892. destination_perms = self.user_perms.for_page(destination)
  2893. # we always need at least add permission in the target
  2894. if 'add' not in destination_perms.permissions:
  2895. return False
  2896. if self.page.live or self.page.get_descendants().filter(live=True).exists():
  2897. # moving this page will entail publishing within the destination section
  2898. return ('publish' in destination_perms.permissions)
  2899. else:
  2900. # no publishing required, so the already-tested 'add' permission is sufficient
  2901. return True
  2902. def can_copy_to(self, destination, recursive=False):
  2903. # reject the logically impossible cases first
  2904. # recursive can't copy to the same tree otherwise it will be on infinite loop
  2905. if recursive and (self.page == destination or destination.is_descendant_of(self.page)):
  2906. return False
  2907. # reject inactive users early
  2908. if not self.user.is_active:
  2909. return False
  2910. # reject early if pages of this type cannot be created at the destination
  2911. if not self.page.specific_class.can_create_at(destination):
  2912. return False
  2913. # skip permission checking for super users
  2914. if self.user.is_superuser:
  2915. return True
  2916. # Inspect permissions on the destination
  2917. destination_perms = self.user_perms.for_page(destination)
  2918. if not destination.specific_class.creatable_subpage_models():
  2919. return False
  2920. # we always need at least add permission in the target
  2921. if 'add' not in destination_perms.permissions:
  2922. return False
  2923. return True
  2924. def can_view_revisions(self):
  2925. return not self.page_is_root
  2926. class BaseViewRestriction(models.Model):
  2927. NONE = 'none'
  2928. PASSWORD = 'password'
  2929. GROUPS = 'groups'
  2930. LOGIN = 'login'
  2931. RESTRICTION_CHOICES = (
  2932. (NONE, _("Public")),
  2933. (LOGIN, _("Private, accessible to logged-in users")),
  2934. (PASSWORD, _("Private, accessible with the following password")),
  2935. (GROUPS, _("Private, accessible to users in specific groups")),
  2936. )
  2937. restriction_type = models.CharField(
  2938. max_length=20, choices=RESTRICTION_CHOICES)
  2939. password = models.CharField(verbose_name=_('password'), max_length=255, blank=True)
  2940. groups = models.ManyToManyField(Group, verbose_name=_('groups'), blank=True)
  2941. def accept_request(self, request):
  2942. if self.restriction_type == BaseViewRestriction.PASSWORD:
  2943. passed_restrictions = request.session.get(self.passed_view_restrictions_session_key, [])
  2944. if self.id not in passed_restrictions:
  2945. return False
  2946. elif self.restriction_type == BaseViewRestriction.LOGIN:
  2947. if not request.user.is_authenticated:
  2948. return False
  2949. elif self.restriction_type == BaseViewRestriction.GROUPS:
  2950. if not request.user.is_superuser:
  2951. current_user_groups = request.user.groups.all()
  2952. if not any(group in current_user_groups for group in self.groups.all()):
  2953. return False
  2954. return True
  2955. def mark_as_passed(self, request):
  2956. """
  2957. Update the session data in the request to mark the user as having passed this
  2958. view restriction
  2959. """
  2960. has_existing_session = (settings.SESSION_COOKIE_NAME in request.COOKIES)
  2961. passed_restrictions = request.session.setdefault(self.passed_view_restrictions_session_key, [])
  2962. if self.id not in passed_restrictions:
  2963. passed_restrictions.append(self.id)
  2964. request.session[self.passed_view_restrictions_session_key] = passed_restrictions
  2965. if not has_existing_session:
  2966. # if this is a session we've created, set it to expire at the end
  2967. # of the browser session
  2968. request.session.set_expiry(0)
  2969. class Meta:
  2970. abstract = True
  2971. verbose_name = _('view restriction')
  2972. verbose_name_plural = _('view restrictions')
  2973. def save(self, user=None, specific_instance=None, **kwargs):
  2974. """
  2975. Custom save handler to include logging.
  2976. :param user: the user add/updating the view restriction
  2977. :param specific_instance: the specific model instance the restriction applies to
  2978. """
  2979. is_new = self.id is None
  2980. super().save(**kwargs)
  2981. if specific_instance:
  2982. PageLogEntry.objects.log_action(
  2983. instance=specific_instance,
  2984. action='wagtail.view_restriction.create' if is_new else 'wagtail.view_restriction.edit',
  2985. user=user,
  2986. data={
  2987. 'restriction': {
  2988. 'type': self.restriction_type,
  2989. 'title': force_str(dict(self.RESTRICTION_CHOICES).get(self.restriction_type))
  2990. }
  2991. }
  2992. )
  2993. def delete(self, user=None, specific_instance=None, **kwargs):
  2994. """
  2995. Custom delete handler to aid in logging
  2996. :param user: the user removing the view restriction
  2997. :param specific_instance: the specific model instance the restriction applies to
  2998. """
  2999. if specific_instance:
  3000. PageLogEntry.objects.log_action(
  3001. instance=specific_instance,
  3002. action='wagtail.view_restriction.delete',
  3003. user=user,
  3004. data={
  3005. 'restriction': {
  3006. 'type': self.restriction_type,
  3007. 'title': force_str(dict(self.RESTRICTION_CHOICES).get(self.restriction_type))
  3008. }
  3009. }
  3010. )
  3011. return super().delete(**kwargs)
  3012. class PageViewRestriction(BaseViewRestriction):
  3013. page = models.ForeignKey(
  3014. 'Page', verbose_name=_('page'), related_name='view_restrictions', on_delete=models.CASCADE
  3015. )
  3016. passed_view_restrictions_session_key = 'passed_page_view_restrictions'
  3017. class Meta:
  3018. verbose_name = _('page view restriction')
  3019. verbose_name_plural = _('page view restrictions')
  3020. def save(self, user=None, **kwargs):
  3021. return super().save(user, specific_instance=self.page.specific, **kwargs)
  3022. def delete(self, user=None, **kwargs):
  3023. return super().delete(user, specific_instance=self.page.specific, **kwargs)
  3024. class BaseCollectionManager(models.Manager):
  3025. def get_queryset(self):
  3026. return TreeQuerySet(self.model).order_by('path')
  3027. CollectionManager = BaseCollectionManager.from_queryset(TreeQuerySet)
class CollectionViewRestriction(BaseViewRestriction):
    """A view restriction attached to a collection."""
    # the collection this restriction is attached to; reachable from the
    # collection side as ``view_restrictions``
    collection = models.ForeignKey(
        'Collection',
        verbose_name=_('collection'),
        related_name='view_restrictions',
        on_delete=models.CASCADE
    )

    # session key under which passed collection restrictions are recorded
    passed_view_restrictions_session_key = 'passed_collection_view_restrictions'

    class Meta:
        verbose_name = _('collection view restriction')
        verbose_name_plural = _('collection view restrictions')
  3039. class Collection(TreebeardPathFixMixin, MP_Node):
  3040. """
  3041. A location in which resources such as images and documents can be grouped
  3042. """
  3043. name = models.CharField(max_length=255, verbose_name=_('name'))
  3044. objects = CollectionManager()
  3045. # Tell treebeard to order Collections' paths such that they are ordered by name at each level.
  3046. node_order_by = ['name']
  3047. def __str__(self):
  3048. return self.name
  3049. def get_ancestors(self, inclusive=False):
  3050. return Collection.objects.ancestor_of(self, inclusive)
  3051. def get_descendants(self, inclusive=False):
  3052. return Collection.objects.descendant_of(self, inclusive)
  3053. def get_siblings(self, inclusive=True):
  3054. return Collection.objects.sibling_of(self, inclusive)
  3055. def get_next_siblings(self, inclusive=False):
  3056. return self.get_siblings(inclusive).filter(path__gte=self.path).order_by('path')
  3057. def get_prev_siblings(self, inclusive=False):
  3058. return self.get_siblings(inclusive).filter(path__lte=self.path).order_by('-path')
  3059. def get_view_restrictions(self):
  3060. """Return a query set of all collection view restrictions that apply to this collection"""
  3061. return CollectionViewRestriction.objects.filter(collection__in=self.get_ancestors(inclusive=True))
  3062. def get_indented_name(self, indentation_start_depth=2, html=False):
  3063. """
  3064. Renders this Collection's name as a formatted string that displays its hierarchical depth via indentation.
  3065. If indentation_start_depth is supplied, the Collection's depth is rendered relative to that depth.
  3066. indentation_start_depth defaults to 2, the depth of the first non-Root Collection.
  3067. Pass html=True to get a HTML representation, instead of the default plain-text.
  3068. Example text output: " ↳ Pies"
  3069. Example HTML output: "&nbsp;&nbsp;&nbsp;&nbsp;&#x21b3 Pies"
  3070. """
  3071. display_depth = self.depth - indentation_start_depth
  3072. # A Collection with a display depth of 0 or less (Root's can be -1), should have no indent.
  3073. if display_depth <= 0:
  3074. return self.name
  3075. # Indent each level of depth by 4 spaces (the width of the ↳ character in our admin font), then add ↳
  3076. # before adding the name.
  3077. if html:
  3078. # NOTE: &#x21b3 is the hex HTML entity for ↳.
  3079. return format_html(
  3080. "{indent}{icon} {name}",
  3081. indent=mark_safe('&nbsp;' * 4 * display_depth),
  3082. icon=mark_safe('&#x21b3'),
  3083. name=self.name
  3084. )
  3085. # Output unicode plain-text version
  3086. return "{}↳ {}".format(' ' * 4 * display_depth, self.name)
  3087. class Meta:
  3088. verbose_name = _('collection')
  3089. verbose_name_plural = _('collections')
  3090. def get_root_collection_id():
  3091. return Collection.get_first_root_node().id
class CollectionMember(models.Model):
    """
    Abstract base class for models that are categorised into collections.

    Adds a ``collection`` foreign key and registers it as a search filter field.
    """
    # defaults to whatever get_root_collection_id() returns when the value is needed;
    # related_name='+' means no reverse accessor is created on Collection
    collection = models.ForeignKey(
        Collection,
        default=get_root_collection_id,
        verbose_name=_('collection'),
        related_name='+',
        on_delete=models.CASCADE
    )

    # lets search results be filtered by collection
    search_fields = [
        index.FilterField('collection'),
    ]

    class Meta:
        abstract = True
  3108. class GroupCollectionPermissionManager(models.Manager):
  3109. def get_by_natural_key(self, group, collection, permission):
  3110. return self.get(group=group,
  3111. collection=collection,
  3112. permission=permission)
  3113. class GroupCollectionPermission(models.Model):
  3114. """
  3115. A rule indicating that a group has permission for some action (e.g. "create document")
  3116. within a specified collection.
  3117. """
  3118. group = models.ForeignKey(
  3119. Group,
  3120. verbose_name=_('group'),
  3121. related_name='collection_permissions',
  3122. on_delete=models.CASCADE
  3123. )
  3124. collection = models.ForeignKey(
  3125. Collection,
  3126. verbose_name=_('collection'),
  3127. related_name='group_permissions',
  3128. on_delete=models.CASCADE
  3129. )
  3130. permission = models.ForeignKey(
  3131. Permission,
  3132. verbose_name=_('permission'),
  3133. on_delete=models.CASCADE
  3134. )
  3135. def __str__(self):
  3136. return "Group %d ('%s') has permission '%s' on collection %d ('%s')" % (
  3137. self.group.id, self.group,
  3138. self.permission,
  3139. self.collection.id, self.collection
  3140. )
  3141. def natural_key(self):
  3142. return (self.group, self.collection, self.permission)
  3143. objects = GroupCollectionPermissionManager()
  3144. class Meta:
  3145. unique_together = ('group', 'collection', 'permission')
  3146. verbose_name = _('group collection permission')
  3147. verbose_name_plural = _('group collection permissions')
  3148. class WorkflowPage(models.Model):
  3149. page = models.OneToOneField(
  3150. 'Page',
  3151. verbose_name=_('page'),
  3152. on_delete=models.CASCADE,
  3153. primary_key=True,
  3154. unique=True
  3155. )
  3156. workflow = models.ForeignKey(
  3157. 'Workflow',
  3158. related_name='workflow_pages',
  3159. verbose_name=_('workflow'),
  3160. on_delete=models.CASCADE,
  3161. )
  3162. def get_pages(self):
  3163. """
  3164. Returns a queryset of pages that are affected by this WorkflowPage link.
  3165. This includes all descendants of the page excluding any that have other WorkflowPages.
  3166. """
  3167. descendant_pages = Page.objects.descendant_of(self.page, inclusive=True)
  3168. descendant_workflow_pages = WorkflowPage.objects.filter(page_id__in=descendant_pages.values_list('id', flat=True)).exclude(pk=self.pk)
  3169. for path, depth in descendant_workflow_pages.values_list('page__path', 'page__depth'):
  3170. descendant_pages = descendant_pages.exclude(path__startswith=path, depth__gte=depth)
  3171. return descendant_pages
  3172. class Meta:
  3173. verbose_name = _('workflow page')
  3174. verbose_name_plural = _('workflow pages')
class WorkflowTask(Orderable):
    """
    Through-model placing a Task in a Workflow; a given task can be linked to
    a given workflow only once (see ``unique_together``).
    """
    # the workflow this entry belongs to
    workflow = ParentalKey('Workflow', on_delete=models.CASCADE, verbose_name=_('workflow_tasks'),
                           related_name='workflow_tasks')
    # the task at this position; form choices are limited to active tasks
    task = models.ForeignKey('Task', on_delete=models.CASCADE, verbose_name=_('task'), related_name='workflow_tasks',
                             limit_choices_to={'active': True})

    class Meta(Orderable.Meta):
        unique_together = [('workflow', 'task')]
        verbose_name = _('workflow task order')
        verbose_name_plural = _('workflow task orders')
  3184. class TaskManager(models.Manager):
  3185. def active(self):
  3186. return self.filter(active=True)
  3187. class Task(models.Model):
  3188. name = models.CharField(max_length=255, verbose_name=_('name'))
  3189. content_type = models.ForeignKey(
  3190. ContentType,
  3191. verbose_name=_('content type'),
  3192. related_name='wagtail_tasks',
  3193. on_delete=models.CASCADE
  3194. )
  3195. active = models.BooleanField(verbose_name=_('active'), default=True, help_text=_(
  3196. "Active tasks can be added to workflows. Deactivating a task does not remove it from existing workflows."))
  3197. objects = TaskManager()
  3198. admin_form_fields = ['name']
  3199. admin_form_readonly_on_edit_fields = ['name']
  3200. def __init__(self, *args, **kwargs):
  3201. super().__init__(*args, **kwargs)
  3202. if not self.id:
  3203. # this model is being newly created
  3204. # rather than retrieved from the db;
  3205. if not self.content_type_id:
  3206. # set content type to correctly represent the model class
  3207. # that this was created as
  3208. self.content_type = ContentType.objects.get_for_model(self)
  3209. def __str__(self):
  3210. return self.name
  3211. @property
  3212. def workflows(self):
  3213. """Returns all ``Workflow`` instances that use this task"""
  3214. return Workflow.objects.filter(workflow_tasks__task=self)
  3215. @property
  3216. def active_workflows(self):
  3217. """Return a ``QuerySet``` of active workflows that this task is part of"""
  3218. return Workflow.objects.active().filter(workflow_tasks__task=self)
  3219. @classmethod
  3220. def get_verbose_name(cls):
  3221. """
  3222. Returns the human-readable "verbose name" of this task model e.g "Group approval task".
  3223. """
  3224. # This is similar to doing cls._meta.verbose_name.title()
  3225. # except this doesn't convert any characters to lowercase
  3226. return capfirst(cls._meta.verbose_name)
  3227. @cached_property
  3228. def specific(self):
  3229. """
  3230. Return this Task in its most specific subclassed form.
  3231. """
  3232. # the ContentType.objects manager keeps a cache, so this should potentially
  3233. # avoid a database lookup over doing self.content_type. I think.
  3234. content_type = ContentType.objects.get_for_id(self.content_type_id)
  3235. model_class = content_type.model_class()
  3236. if model_class is None:
  3237. # Cannot locate a model class for this content type. This might happen
  3238. # if the codebase and database are out of sync (e.g. the model exists
  3239. # on a different git branch and we haven't rolled back migrations before
  3240. # switching branches); if so, the best we can do is return the page
  3241. # unchanged.
  3242. return self
  3243. elif isinstance(self, model_class):
  3244. # self is already the an instance of the most specific class
  3245. return self
  3246. else:
  3247. return content_type.get_object_for_this_type(id=self.id)
  3248. task_state_class = None
  3249. @classmethod
  3250. def get_task_state_class(self):
  3251. return self.task_state_class or TaskState
  3252. def start(self, workflow_state, user=None):
  3253. """Start this task on the provided workflow state by creating an instance of TaskState"""
  3254. task_state = self.get_task_state_class()(workflow_state=workflow_state)
  3255. task_state.status = TaskState.STATUS_IN_PROGRESS
  3256. task_state.page_revision = workflow_state.page.get_latest_revision()
  3257. task_state.task = self
  3258. task_state.save()
  3259. task_submitted.send(sender=task_state.specific.__class__, instance=task_state.specific, user=user)
  3260. return task_state
  3261. @transaction.atomic
  3262. def on_action(self, task_state, user, action_name, **kwargs):
  3263. """Performs an action on a task state determined by the ``action_name`` string passed"""
  3264. if action_name == 'approve':
  3265. task_state.approve(user=user, **kwargs)
  3266. elif action_name == 'reject':
  3267. task_state.reject(user=user, **kwargs)
  3268. def user_can_access_editor(self, page, user):
  3269. """Returns True if a user who would not normally be able to access the editor for the page should be able to if the page is currently on this task.
  3270. Note that returning False does not remove permissions from users who would otherwise have them."""
  3271. return False
  3272. def page_locked_for_user(self, page, user):
  3273. """Returns True if the page should be locked to a given user's edits. This can be used to prevent editing by non-reviewers."""
  3274. return False
  3275. def user_can_lock(self, page, user):
  3276. """Returns True if a user who would not normally be able to lock the page should be able to if the page is currently on this task.
  3277. Note that returning False does not remove permissions from users who would otherwise have them."""
  3278. return False
  3279. def user_can_unlock(self, page, user):
  3280. """Returns True if a user who would not normally be able to unlock the page should be able to if the page is currently on this task.
  3281. Note that returning False does not remove permissions from users who would otherwise have them."""
  3282. return False
  3283. def get_actions(self, page, user):
  3284. """
  3285. Get the list of action strings (name, verbose_name, whether the action requires additional data - see
  3286. ``get_form_for_action``) for actions the current user can perform for this task on the given page.
  3287. These strings should be the same as those able to be passed to ``on_action``
  3288. """
  3289. return []
  3290. def get_form_for_action(self, action):
  3291. return TaskStateCommentForm
  3292. def get_template_for_action(self, action):
  3293. return ''
  3294. def get_task_states_user_can_moderate(self, user, **kwargs):
  3295. """Returns a ``QuerySet`` of the task states the current user can moderate"""
  3296. return TaskState.objects.none()
  3297. @classmethod
  3298. def get_description(cls):
  3299. """Returns the task description."""
  3300. return ''
  3301. @transaction.atomic
  3302. def deactivate(self, user=None):
  3303. """Set ``active`` to False and cancel all in progress task states linked to this task"""
  3304. self.active = False
  3305. self.save()
  3306. in_progress_states = TaskState.objects.filter(task=self, status=TaskState.STATUS_IN_PROGRESS)
  3307. for state in in_progress_states:
  3308. state.cancel(user=user)
  3309. class Meta:
  3310. verbose_name = _('task')
  3311. verbose_name_plural = _('tasks')
  3312. class WorkflowManager(models.Manager):
  3313. def active(self):
  3314. return self.filter(active=True)
  3315. class Workflow(ClusterableModel):
  3316. name = models.CharField(max_length=255, verbose_name=_('name'))
  3317. active = models.BooleanField(verbose_name=_('active'), default=True, help_text=_(
  3318. "Active workflows can be added to pages. Deactivating a workflow does not remove it from existing pages."))
  3319. objects = WorkflowManager()
  3320. def __str__(self):
  3321. return self.name
  3322. @property
  3323. def tasks(self):
  3324. """Returns all ``Task`` instances linked to this workflow"""
  3325. return Task.objects.filter(workflow_tasks__workflow=self).order_by('workflow_tasks__sort_order')
  3326. @transaction.atomic
  3327. def start(self, page, user):
  3328. """Initiates a workflow by creating an instance of ``WorkflowState``"""
  3329. state = WorkflowState(page=page, workflow=self, status=WorkflowState.STATUS_IN_PROGRESS, requested_by=user)
  3330. state.save()
  3331. state.update(user=user)
  3332. workflow_submitted.send(sender=state.__class__, instance=state, user=user)
  3333. next_task_data = None
  3334. if state.current_task_state:
  3335. next_task_data = {
  3336. 'id': state.current_task_state.task.id,
  3337. 'title': state.current_task_state.task.name,
  3338. }
  3339. PageLogEntry.objects.log_action(
  3340. instance=page,
  3341. action='wagtail.workflow.start',
  3342. data={
  3343. 'workflow': {
  3344. 'id': self.id,
  3345. 'title': self.name,
  3346. 'status': state.status,
  3347. 'next': next_task_data,
  3348. 'task_state_id': state.current_task_state.id if state.current_task_state else None,
  3349. }
  3350. },
  3351. revision=page.get_latest_revision(),
  3352. user=user,
  3353. )
  3354. return state
  3355. @transaction.atomic
  3356. def deactivate(self, user=None):
  3357. """Sets the workflow as inactive, and cancels all in progress instances of ``WorkflowState`` linked to this workflow"""
  3358. self.active = False
  3359. in_progress_states = WorkflowState.objects.filter(workflow=self, status=WorkflowState.STATUS_IN_PROGRESS)
  3360. for state in in_progress_states:
  3361. state.cancel(user=user)
  3362. WorkflowPage.objects.filter(workflow=self).delete()
  3363. self.save()
  3364. def all_pages(self):
  3365. """
  3366. Returns a queryset of all the pages that this Workflow applies to.
  3367. """
  3368. pages = Page.objects.none()
  3369. for workflow_page in self.workflow_pages.all():
  3370. pages |= workflow_page.get_pages()
  3371. return pages
  3372. class Meta:
  3373. verbose_name = _('workflow')
  3374. verbose_name_plural = _('workflows')
  3375. class GroupApprovalTask(Task):
  3376. groups = models.ManyToManyField(Group, verbose_name=_('groups'), help_text=_('Pages at this step in a workflow will be moderated or approved by these groups of users'))
  3377. admin_form_fields = Task.admin_form_fields + ['groups']
  3378. admin_form_widgets = {
  3379. 'groups': forms.CheckboxSelectMultiple,
  3380. }
  3381. def start(self, workflow_state, user=None):
  3382. if workflow_state.page.locked_by:
  3383. # If the person who locked the page isn't in one of the groups, unlock the page
  3384. if not workflow_state.page.locked_by.groups.filter(id__in=self.groups.all()).exists():
  3385. workflow_state.page.locked = False
  3386. workflow_state.page.locked_by = None
  3387. workflow_state.page.locked_at = None
  3388. workflow_state.page.save(update_fields=['locked', 'locked_by', 'locked_at'])
  3389. return super().start(workflow_state, user=user)
  3390. def user_can_access_editor(self, page, user):
  3391. return self.groups.filter(id__in=user.groups.all()).exists() or user.is_superuser
  3392. def page_locked_for_user(self, page, user):
  3393. return not (self.groups.filter(id__in=user.groups.all()).exists() or user.is_superuser)
  3394. def user_can_lock(self, page, user):
  3395. return self.groups.filter(id__in=user.groups.all()).exists()
  3396. def user_can_unlock(self, page, user):
  3397. return False
  3398. def get_actions(self, page, user):
  3399. if self.groups.filter(id__in=user.groups.all()).exists() or user.is_superuser:
  3400. return [
  3401. ('reject', _("Request changes"), True),
  3402. ('approve', _("Approve"), False),
  3403. ('approve', _("Approve with comment"), True),
  3404. ]
  3405. return []
  3406. def get_task_states_user_can_moderate(self, user, **kwargs):
  3407. if self.groups.filter(id__in=user.groups.all()).exists() or user.is_superuser:
  3408. return TaskState.objects.filter(status=TaskState.STATUS_IN_PROGRESS, task=self.task_ptr)
  3409. else:
  3410. return TaskState.objects.none()
  3411. @classmethod
  3412. def get_description(cls):
  3413. return _("Members of the chosen Wagtail Groups can approve this task")
  3414. class Meta:
  3415. verbose_name = _('Group approval task')
  3416. verbose_name_plural = _('Group approval tasks')
  3417. class WorkflowStateManager(models.Manager):
  3418. def active(self):
  3419. """
  3420. Filters to only STATUS_IN_PROGRESS and STATUS_NEEDS_CHANGES WorkflowStates
  3421. """
  3422. return self.filter(Q(status=WorkflowState.STATUS_IN_PROGRESS) | Q(status=WorkflowState.STATUS_NEEDS_CHANGES))
  3423. class WorkflowState(models.Model):
  3424. """Tracks the status of a started Workflow on a Page."""
  3425. STATUS_IN_PROGRESS = 'in_progress'
  3426. STATUS_APPROVED = 'approved'
  3427. STATUS_NEEDS_CHANGES = 'needs_changes'
  3428. STATUS_CANCELLED = 'cancelled'
  3429. STATUS_CHOICES = (
  3430. (STATUS_IN_PROGRESS, _("In progress")),
  3431. (STATUS_APPROVED, _("Approved")),
  3432. (STATUS_NEEDS_CHANGES, _("Needs changes")),
  3433. (STATUS_CANCELLED, _("Cancelled")),
  3434. )
  3435. page = models.ForeignKey('Page', on_delete=models.CASCADE, verbose_name=_("page"), related_name='workflow_states')
  3436. workflow = models.ForeignKey('Workflow', on_delete=models.CASCADE, verbose_name=_('workflow'), related_name='workflow_states')
  3437. status = models.fields.CharField(choices=STATUS_CHOICES, verbose_name=_("status"), max_length=50, default=STATUS_IN_PROGRESS)
  3438. created_at = models.DateTimeField(auto_now_add=True, verbose_name=_("created at"))
  3439. requested_by = models.ForeignKey(settings.AUTH_USER_MODEL,
  3440. verbose_name=_('requested by'),
  3441. null=True,
  3442. blank=True,
  3443. editable=True,
  3444. on_delete=models.SET_NULL,
  3445. related_name='requested_workflows')
  3446. current_task_state = models.OneToOneField('TaskState', on_delete=models.SET_NULL, null=True, blank=True,
  3447. verbose_name=_("current task state"))
  3448. # allows a custom function to be called on finishing the Workflow successfully.
  3449. on_finish = import_string(getattr(settings, 'WAGTAIL_FINISH_WORKFLOW_ACTION', 'wagtail.core.workflows.publish_workflow_state'))
  3450. objects = WorkflowStateManager()
  3451. def clean(self):
  3452. super().clean()
  3453. if self.status in (self.STATUS_IN_PROGRESS, self.STATUS_NEEDS_CHANGES):
  3454. # The unique constraint is conditional, and so not supported on the MySQL backend - so an additional check is done here
  3455. if WorkflowState.objects.active().filter(page=self.page).exclude(pk=self.pk).exists():
  3456. raise ValidationError(_('There may only be one in progress or needs changes workflow state per page.'))
  3457. def save(self, *args, **kwargs):
  3458. self.full_clean()
  3459. return super().save(*args, **kwargs)
  3460. def __str__(self):
  3461. return _("Workflow '{0}' on Page '{1}': {2}").format(self.workflow, self.page, self.status)
  3462. def resume(self, user=None):
  3463. """Put a STATUS_NEEDS_CHANGES workflow state back into STATUS_IN_PROGRESS, and restart the current task"""
  3464. if self.status != self.STATUS_NEEDS_CHANGES:
  3465. raise PermissionDenied
  3466. revision = self.current_task_state.page_revision
  3467. current_task_state = self.current_task_state
  3468. self.current_task_state = None
  3469. self.status = self.STATUS_IN_PROGRESS
  3470. self.save()
  3471. PageLogEntry.objects.log_action(
  3472. instance=self.page.specific,
  3473. action='wagtail.workflow.resume',
  3474. data={
  3475. 'workflow': {
  3476. 'id': self.workflow_id,
  3477. 'title': self.workflow.name,
  3478. 'status': self.status,
  3479. 'task_state_id': current_task_state.id,
  3480. 'task': {
  3481. 'id': current_task_state.task.id,
  3482. 'title': current_task_state.task.name,
  3483. },
  3484. }
  3485. },
  3486. revision=revision,
  3487. user=user,
  3488. )
  3489. return self.update(user=user, next_task=current_task_state.task)
  3490. def user_can_cancel(self, user):
  3491. if self.page.locked and self.page.locked_by != user:
  3492. return False
  3493. return user == self.requested_by or user == self.page.owner or (self.current_task_state and self.current_task_state.status == self.current_task_state.STATUS_IN_PROGRESS and 'approve' in [action[0] for action in self.current_task_state.task.get_actions(self.page, user)])
  3494. def update(self, user=None, next_task=None):
  3495. """Checks the status of the current task, and progresses (or ends) the workflow if appropriate. If the workflow progresses,
  3496. next_task will be used to start a specific task next if provided."""
  3497. if self.status != self.STATUS_IN_PROGRESS:
  3498. # Updating a completed or cancelled workflow should have no effect
  3499. return
  3500. try:
  3501. current_status = self.current_task_state.status
  3502. except AttributeError:
  3503. current_status = None
  3504. if current_status == TaskState.STATUS_REJECTED:
  3505. self.status = self.STATUS_NEEDS_CHANGES
  3506. self.save()
  3507. workflow_rejected.send(sender=self.__class__, instance=self, user=user)
  3508. else:
  3509. if not next_task:
  3510. next_task = self.get_next_task()
  3511. if next_task:
  3512. if (not self.current_task_state) or self.current_task_state.status != self.current_task_state.STATUS_IN_PROGRESS:
  3513. # if not on a task, or the next task to move to is not the current task (ie current task's status is
  3514. # not STATUS_IN_PROGRESS), move to the next task
  3515. self.current_task_state = next_task.specific.start(self, user=user)
  3516. self.save()
  3517. # if task has auto-approved, update the workflow again
  3518. if self.current_task_state.status != self.current_task_state.STATUS_IN_PROGRESS:
  3519. self.update(user=user)
  3520. # otherwise, continue on the current task
  3521. else:
  3522. # if there is no uncompleted task, finish the workflow.
  3523. self.finish(user=user)
  3524. @property
  3525. def successful_task_states(self):
  3526. successful_task_states = self.task_states.filter(
  3527. Q(status=TaskState.STATUS_APPROVED) | Q(status=TaskState.STATUS_SKIPPED)
  3528. )
  3529. if getattr(settings, "WAGTAIL_WORKFLOW_REQUIRE_REAPPROVAL_ON_EDIT", False):
  3530. successful_task_states = successful_task_states.filter(page_revision=self.page.get_latest_revision())
  3531. return successful_task_states
  3532. def get_next_task(self):
  3533. """Returns the next active task, which has not been either approved or skipped"""
  3534. return (
  3535. Task.objects.filter(workflow_tasks__workflow=self.workflow, active=True)
  3536. .exclude(
  3537. task_states__in=self.successful_task_states
  3538. ).order_by('workflow_tasks__sort_order').first()
  3539. )
  3540. def cancel(self, user=None):
  3541. """Cancels the workflow state"""
  3542. if self.status not in (self.STATUS_IN_PROGRESS, self.STATUS_NEEDS_CHANGES):
  3543. raise PermissionDenied
  3544. self.status = self.STATUS_CANCELLED
  3545. self.save()
  3546. PageLogEntry.objects.log_action(
  3547. instance=self.page.specific,
  3548. action='wagtail.workflow.cancel',
  3549. data={
  3550. 'workflow': {
  3551. 'id': self.workflow_id,
  3552. 'title': self.workflow.name,
  3553. 'status': self.status,
  3554. 'task_state_id': self.current_task_state.id,
  3555. 'task': {
  3556. 'id': self.current_task_state.task.id,
  3557. 'title': self.current_task_state.task.name,
  3558. },
  3559. }
  3560. },
  3561. revision=self.current_task_state.page_revision,
  3562. user=user,
  3563. )
  3564. for state in self.task_states.filter(status=TaskState.STATUS_IN_PROGRESS):
  3565. # Cancel all in progress task states
  3566. state.specific.cancel(user=user)
  3567. workflow_cancelled.send(sender=self.__class__, instance=self, user=user)
  3568. @transaction.atomic
  3569. def finish(self, user=None):
  3570. """Finishes a successful in progress workflow, marking it as approved and performing the ``on_finish`` action"""
  3571. if self.status != self.STATUS_IN_PROGRESS:
  3572. raise PermissionDenied
  3573. self.status = self.STATUS_APPROVED
  3574. self.save()
  3575. self.on_finish(user=user)
  3576. workflow_approved.send(sender=self.__class__, instance=self, user=user)
  3577. def copy_approved_task_states_to_revision(self, revision):
  3578. """This creates copies of previously approved task states with page_revision set to a different revision."""
  3579. approved_states = TaskState.objects.filter(workflow_state=self, status=TaskState.STATUS_APPROVED)
  3580. for state in approved_states:
  3581. state.copy(update_attrs={'page_revision': revision})
  3582. def revisions(self):
  3583. """Returns all page revisions associated with task states linked to the current workflow state"""
  3584. return PageRevision.objects.filter(
  3585. page_id=self.page_id,
  3586. id__in=self.task_states.values_list('page_revision_id', flat=True)
  3587. ).defer('content_json')
  3588. def _get_applicable_task_states(self):
  3589. """Returns the set of task states whose status applies to the current revision"""
  3590. task_states = TaskState.objects.filter(workflow_state_id=self.id)
  3591. # If WAGTAIL_WORKFLOW_REQUIRE_REAPPROVAL_ON_EDIT=True, this is only task states created on the current revision
  3592. if getattr(settings, "WAGTAIL_WORKFLOW_REQUIRE_REAPPROVAL_ON_EDIT", False):
  3593. latest_revision_id = self.revisions().order_by('-created_at', '-id').values_list('id', flat=True).first()
  3594. task_states = task_states.filter(page_revision_id=latest_revision_id)
  3595. return task_states
  3596. def all_tasks_with_status(self):
  3597. """
  3598. Returns a list of Task objects that are linked with this workflow state's
  3599. workflow. The status of that task in this workflow state is annotated in the
  3600. `.status` field. And a displayable version of that status is annotated in the
  3601. `.status_display` field.
  3602. This is different to querying TaskState as it also returns tasks that haven't
  3603. been started yet (so won't have a TaskState).
  3604. """
  3605. # Get the set of task states whose status applies to the current revision
  3606. task_states = self._get_applicable_task_states()
  3607. tasks = list(
  3608. self.workflow.tasks.annotate(
  3609. status=Subquery(
  3610. task_states.filter(
  3611. task_id=OuterRef('id'),
  3612. ).order_by(
  3613. '-started_at', '-id'
  3614. ).values('status')[:1]
  3615. ),
  3616. )
  3617. )
  3618. # Manually annotate status_display
  3619. status_choices = dict(TaskState.STATUS_CHOICES)
  3620. for task in tasks:
  3621. task.status_display = status_choices.get(task.status, _("Not started"))
  3622. return tasks
  3623. def all_tasks_with_state(self):
  3624. """
  3625. Returns a list of Task objects that are linked with this WorkflowState's
  3626. workflow, and have the latest task state.
  3627. In a "Submit for moderation -> reject at step 1 -> resubmit -> accept" workflow, this ensures
  3628. the task list reflects the accept, rather than the reject.
  3629. """
  3630. task_states = self._get_applicable_task_states()
  3631. tasks = list(
  3632. self.workflow.tasks.annotate(
  3633. task_state_id=Subquery(
  3634. task_states.filter(
  3635. task_id=OuterRef('id'),
  3636. ).order_by(
  3637. '-started_at', '-id'
  3638. ).values('id')[:1]
  3639. ),
  3640. )
  3641. )
  3642. task_states = {task_state.id: task_state for task_state in task_states}
  3643. # Manually annotate task_state
  3644. for task in tasks:
  3645. task.task_state = task_states.get(task.task_state_id)
  3646. return tasks
  3647. @property
  3648. def is_active(self):
  3649. return self.status not in [self.STATUS_APPROVED, self.STATUS_CANCELLED]
  3650. @property
  3651. def is_at_final_task(self):
  3652. """Returns the next active task, which has not been either approved or skipped"""
  3653. last_task = Task.objects.filter(workflow_tasks__workflow=self.workflow, active=True)\
  3654. .exclude(task_states__in=self.successful_task_states)\
  3655. .order_by('workflow_tasks__sort_order').last()
  3656. return self.get_next_task() == last_task
    class Meta:
        verbose_name = _('Workflow state')
        verbose_name_plural = _('Workflow states')
        # prevent multiple STATUS_IN_PROGRESS/STATUS_NEEDS_CHANGES workflows for the same page. This is only supported by specific databases (e.g. Postgres, SQL Server), so is checked additionally on save.
        # NOTE(review): the literals 'in_progress' / 'needs_changes' presumably mirror the
        # STATUS_IN_PROGRESS / STATUS_NEEDS_CHANGES constants defined on this model - confirm
        # they stay in sync if those constants ever change.
        constraints = [
            models.UniqueConstraint(fields=['page'], condition=Q(status__in=('in_progress', 'needs_changes')), name='unique_in_progress_workflow')
        ]
  3664. class TaskStateManager(models.Manager):
  3665. def reviewable_by(self, user):
  3666. tasks = Task.objects.filter(active=True)
  3667. states = TaskState.objects.none()
  3668. for task in tasks:
  3669. states = states | task.specific.get_task_states_user_can_moderate(user=user)
  3670. return states
  3671. class TaskState(models.Model):
  3672. """Tracks the status of a given Task for a particular page revision."""
  3673. STATUS_IN_PROGRESS = 'in_progress'
  3674. STATUS_APPROVED = 'approved'
  3675. STATUS_REJECTED = 'rejected'
  3676. STATUS_SKIPPED = 'skipped'
  3677. STATUS_CANCELLED = 'cancelled'
  3678. STATUS_CHOICES = (
  3679. (STATUS_IN_PROGRESS, _("In progress")),
  3680. (STATUS_APPROVED, _("Approved")),
  3681. (STATUS_REJECTED, _("Rejected")),
  3682. (STATUS_SKIPPED, _("Skipped")),
  3683. (STATUS_CANCELLED, _("Cancelled")),
  3684. )
  3685. workflow_state = models.ForeignKey('WorkflowState', on_delete=models.CASCADE, verbose_name=_('workflow state'), related_name='task_states')
  3686. page_revision = models.ForeignKey('PageRevision', on_delete=models.CASCADE, verbose_name=_('page revision'), related_name='task_states')
  3687. task = models.ForeignKey('Task', on_delete=models.CASCADE, verbose_name=_('task'), related_name='task_states')
  3688. status = models.fields.CharField(choices=STATUS_CHOICES, verbose_name=_("status"), max_length=50, default=STATUS_IN_PROGRESS)
  3689. started_at = models.DateTimeField(verbose_name=_('started at'), auto_now_add=True)
  3690. finished_at = models.DateTimeField(verbose_name=_('finished at'), blank=True, null=True)
  3691. finished_by = models.ForeignKey(
  3692. settings.AUTH_USER_MODEL,
  3693. verbose_name=_('finished by'),
  3694. null=True,
  3695. blank=True,
  3696. on_delete=models.SET_NULL,
  3697. related_name='finished_task_states'
  3698. )
  3699. comment = models.TextField(blank=True)
  3700. content_type = models.ForeignKey(
  3701. ContentType,
  3702. verbose_name=_('content type'),
  3703. related_name='wagtail_task_states',
  3704. on_delete=models.CASCADE
  3705. )
  3706. exclude_fields_in_copy = []
  3707. default_exclude_fields_in_copy = ['id']
  3708. objects = TaskStateManager()
  3709. def __init__(self, *args, **kwargs):
  3710. super().__init__(*args, **kwargs)
  3711. if not self.id:
  3712. # this model is being newly created
  3713. # rather than retrieved from the db;
  3714. if not self.content_type_id:
  3715. # set content type to correctly represent the model class
  3716. # that this was created as
  3717. self.content_type = ContentType.objects.get_for_model(self)
  3718. def __str__(self):
  3719. return _("Task '{0}' on Page Revision '{1}': {2}").format(self.task, self.page_revision, self.status)
  3720. @cached_property
  3721. def specific(self):
  3722. """
  3723. Return this TaskState in its most specific subclassed form.
  3724. """
  3725. # the ContentType.objects manager keeps a cache, so this should potentially
  3726. # avoid a database lookup over doing self.content_type. I think.
  3727. content_type = ContentType.objects.get_for_id(self.content_type_id)
  3728. model_class = content_type.model_class()
  3729. if model_class is None:
  3730. # Cannot locate a model class for this content type. This might happen
  3731. # if the codebase and database are out of sync (e.g. the model exists
  3732. # on a different git branch and we haven't rolled back migrations before
  3733. # switching branches); if so, the best we can do is return the page
  3734. # unchanged.
  3735. return self
  3736. elif isinstance(self, model_class):
  3737. # self is already the an instance of the most specific class
  3738. return self
  3739. else:
  3740. return content_type.get_object_for_this_type(id=self.id)
  3741. @transaction.atomic
  3742. def approve(self, user=None, update=True, comment=''):
  3743. """Approve the task state and update the workflow state"""
  3744. if self.status != self.STATUS_IN_PROGRESS:
  3745. raise PermissionDenied
  3746. self.status = self.STATUS_APPROVED
  3747. self.finished_at = timezone.now()
  3748. self.finished_by = user
  3749. self.comment = comment
  3750. self.save()
  3751. self.log_state_change_action(user, 'approve')
  3752. if update:
  3753. self.workflow_state.update(user=user)
  3754. task_approved.send(sender=self.specific.__class__, instance=self.specific, user=user)
  3755. return self
  3756. @transaction.atomic
  3757. def reject(self, user=None, update=True, comment=''):
  3758. """Reject the task state and update the workflow state"""
  3759. if self.status != self.STATUS_IN_PROGRESS:
  3760. raise PermissionDenied
  3761. self.status = self.STATUS_REJECTED
  3762. self.finished_at = timezone.now()
  3763. self.finished_by = user
  3764. self.comment = comment
  3765. self.save()
  3766. self.log_state_change_action(user, 'reject')
  3767. if update:
  3768. self.workflow_state.update(user=user)
  3769. task_rejected.send(sender=self.specific.__class__, instance=self.specific, user=user)
  3770. return self
  3771. @cached_property
  3772. def task_type_started_at(self):
  3773. """Finds the first chronological started_at for successive TaskStates - ie started_at if the task had not been restarted"""
  3774. task_states = TaskState.objects.filter(workflow_state=self.workflow_state).order_by('-started_at').select_related('task')
  3775. started_at = None
  3776. for task_state in task_states:
  3777. if task_state.task == self.task:
  3778. started_at = task_state.started_at
  3779. elif started_at:
  3780. break
  3781. return started_at
  3782. @transaction.atomic
  3783. def cancel(self, user=None, resume=False, comment=''):
  3784. """Cancel the task state and update the workflow state. If ``resume`` is set to True, then upon update the workflow state
  3785. is passed the current task as ``next_task``, causing it to start a new task state on the current task if possible"""
  3786. self.status = self.STATUS_CANCELLED
  3787. self.finished_at = timezone.now()
  3788. self.comment = comment
  3789. self.finished_by = user
  3790. self.save()
  3791. if resume:
  3792. self.workflow_state.update(user=user, next_task=self.task.specific)
  3793. else:
  3794. self.workflow_state.update(user=user)
  3795. task_cancelled.send(sender=self.specific.__class__, instance=self.specific, user=user)
  3796. return self
  3797. def copy(self, update_attrs=None, exclude_fields=None):
  3798. """Copy this task state, excluding the attributes in the ``exclude_fields`` list and updating any attributes to values
  3799. specified in the ``update_attrs`` dictionary of ``attribute``: ``new value`` pairs"""
  3800. exclude_fields = self.default_exclude_fields_in_copy + self.exclude_fields_in_copy + (exclude_fields or [])
  3801. instance, child_object_map = _copy(self.specific, exclude_fields, update_attrs)
  3802. instance.save()
  3803. _copy_m2m_relations(self, instance, exclude_fields=exclude_fields)
  3804. return instance
  3805. def get_comment(self):
  3806. """
  3807. Returns a string that is displayed in workflow history.
  3808. This could be a comment by the reviewer, or generated.
  3809. Use mark_safe to return HTML.
  3810. """
  3811. return self.comment
  3812. def log_state_change_action(self, user, action):
  3813. """Log the approval/rejection action"""
  3814. page = self.page_revision.as_page_object()
  3815. next_task = self.workflow_state.get_next_task()
  3816. next_task_data = None
  3817. if next_task:
  3818. next_task_data = {
  3819. 'id': next_task.id,
  3820. 'title': next_task.name
  3821. }
  3822. PageLogEntry.objects.log_action(
  3823. instance=page,
  3824. action='wagtail.workflow.{}'.format(action),
  3825. user=user,
  3826. data={
  3827. 'workflow': {
  3828. 'id': self.workflow_state.workflow.id,
  3829. 'title': self.workflow_state.workflow.name,
  3830. 'status': self.status,
  3831. 'task_state_id': self.id,
  3832. 'task': {
  3833. 'id': self.task.id,
  3834. 'title': self.task.name,
  3835. },
  3836. 'next': next_task_data,
  3837. },
  3838. 'comment': self.get_comment()
  3839. },
  3840. revision=self.page_revision
  3841. )
  3842. class Meta:
  3843. verbose_name = _('Task state')
  3844. verbose_name_plural = _('Task states')
  3845. class BaseLogEntryManager(models.Manager):
  3846. def log_action(self, instance, action, **kwargs):
  3847. """
  3848. :param instance: The model instance we are logging an action for
  3849. :param action: The action. Should be namespaced to app (e.g. wagtail.create, wagtail.workflow.start)
  3850. :param kwargs: Addition fields to for the model deriving from BaseLogEntry
  3851. - user: The user performing the action
  3852. - title: the instance title
  3853. - data: any additional metadata
  3854. - content_changed, deleted - Boolean flags
  3855. :return: The new log entry
  3856. """
  3857. data = kwargs.pop('data', '')
  3858. title = kwargs.pop('title', None)
  3859. if not title:
  3860. if isinstance(instance, Page):
  3861. title = instance.specific_deferred.get_admin_display_title()
  3862. else:
  3863. title = str(instance)
  3864. timestamp = kwargs.pop('timestamp', timezone.now())
  3865. return self.model.objects.create(
  3866. content_type=ContentType.objects.get_for_model(instance, for_concrete_model=False),
  3867. label=title,
  3868. action=action,
  3869. timestamp=timestamp,
  3870. data_json=json.dumps(data),
  3871. **kwargs,
  3872. )
  3873. def get_for_model(self, model):
  3874. # Return empty queryset if the given object is not valid.
  3875. if not issubclass(model, models.Model):
  3876. return self.none()
  3877. ct = ContentType.objects.get_for_model(model)
  3878. return self.filter(content_type=ct)
  3879. def get_for_user(self, user_id):
  3880. return self.filter(user=user_id)
  3881. class PageLogEntryManager(BaseLogEntryManager):
  3882. def log_action(self, instance, action, **kwargs):
  3883. kwargs.update(page=instance)
  3884. return super().log_action(instance, action, **kwargs)
  3885. class BaseLogEntry(models.Model):
  3886. content_type = models.ForeignKey(
  3887. ContentType,
  3888. models.SET_NULL,
  3889. verbose_name=_('content type'),
  3890. blank=True, null=True,
  3891. related_name='+',
  3892. )
  3893. label = models.TextField()
  3894. action = models.CharField(max_length=255, blank=True, db_index=True)
  3895. data_json = models.TextField(blank=True)
  3896. timestamp = models.DateTimeField(verbose_name=_('timestamp (UTC)'))
  3897. user = models.ForeignKey(
  3898. settings.AUTH_USER_MODEL,
  3899. null=True, # Null if actioned by system
  3900. blank=True,
  3901. on_delete=models.DO_NOTHING,
  3902. db_constraint=False,
  3903. related_name='+',
  3904. )
  3905. # Flags for additional context to the 'action' made by the user (or system).
  3906. content_changed = models.BooleanField(default=False, db_index=True)
  3907. deleted = models.BooleanField(default=False)
  3908. objects = BaseLogEntryManager()
  3909. action_registry = None
  3910. class Meta:
  3911. abstract = True
  3912. verbose_name = _('log entry')
  3913. verbose_name_plural = _('log entries')
  3914. ordering = ['-timestamp']
  3915. def save(self, *args, **kwargs):
  3916. self.full_clean()
  3917. return super().save(*args, **kwargs)
  3918. def clean(self):
  3919. self.action_registry.scan_for_actions()
  3920. if self.action not in self.action_registry.actions:
  3921. raise ValidationError({'action': _("The log action '{}' has not been registered.").format(self.action)})
  3922. def __str__(self):
  3923. return "LogEntry %d: '%s' on '%s'" % (
  3924. self.pk, self.action, self.object_verbose_name()
  3925. )
  3926. @cached_property
  3927. def user_display_name(self):
  3928. """
  3929. Returns the display name of the associated user;
  3930. get_full_name if available and non-empty, otherwise get_username.
  3931. Defaults to 'system' when none is provided
  3932. """
  3933. if self.user_id:
  3934. try:
  3935. user = self.user
  3936. except self._meta.get_field('user').related_model.DoesNotExist:
  3937. # User has been deleted
  3938. return _('user %(id)d (deleted)') % {'id': self.user_id}
  3939. try:
  3940. full_name = user.get_full_name().strip()
  3941. except AttributeError:
  3942. full_name = ''
  3943. return full_name or user.get_username()
  3944. else:
  3945. return _('system')
  3946. @cached_property
  3947. def data(self):
  3948. """
  3949. Provides deserialized data
  3950. """
  3951. if self.data_json:
  3952. return json.loads(self.data_json)
  3953. else:
  3954. return {}
  3955. @cached_property
  3956. def object_verbose_name(self):
  3957. model_class = self.content_type.model_class()
  3958. if model_class is None:
  3959. return self.content_type_id
  3960. return model_class._meta.verbose_name.title
  3961. def object_id(self):
  3962. raise NotImplementedError
  3963. @cached_property
  3964. def comment(self):
  3965. if self.data:
  3966. return self.data.get('comment', '')
  3967. return ''
  3968. class PageLogEntry(BaseLogEntry):
  3969. page = models.ForeignKey(
  3970. 'wagtailcore.Page',
  3971. on_delete=models.DO_NOTHING,
  3972. db_constraint=False,
  3973. related_name='+'
  3974. )
  3975. # Pointer to a specific page revision
  3976. revision = models.ForeignKey(
  3977. 'wagtailcore.PageRevision',
  3978. null=True,
  3979. blank=True,
  3980. on_delete=models.DO_NOTHING,
  3981. db_constraint=False,
  3982. related_name='+',
  3983. )
  3984. objects = PageLogEntryManager()
  3985. action_registry = page_log_action_registry
  3986. class Meta:
  3987. ordering = ['-timestamp', '-id']
  3988. verbose_name = _('page log entry')
  3989. verbose_name_plural = _('page log entries')
  3990. def __str__(self):
  3991. return "PageLogEntry %d: '%s' on '%s' with id %s" % (
  3992. self.pk, self.action, self.object_verbose_name(), self.page_id
  3993. )
  3994. @cached_property
  3995. def object_id(self):
  3996. return self.page_id