123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838 |
- from collections import OrderedDict
- from importlib import import_module
- from itertools import zip_longest
- import json
- import os
- from pathlib import Path
- from PIL import Image
- import datetime
- from django.apps import apps
- from django.conf import settings
- from django.contrib import messages
- from django.contrib.contenttypes.fields import GenericForeignKey
- from django.contrib.contenttypes.models import ContentType
- from django.core.files.storage import default_storage
- from django.core.serializers.json import DjangoJSONEncoder
- from django.db.models import (
- CharField, TextField, DateTimeField, Model, ForeignKey, PROTECT, CASCADE,
- QuerySet,
- )
- from django.db.models.fields.files import FieldFile
- from django.db.models.signals import post_delete
- from django.dispatch import receiver
- from django.forms import Form, ImageField, FileField, URLField, EmailField
- from django.http import HttpResponseRedirect
- from django.template.response import TemplateResponse
- from django.utils.safestring import SafeData, mark_safe
- from django.utils.timezone import now
- from django.utils.translation import gettext_lazy as _
- from wagtail.models import Page
- from wagtail.contrib.forms.models import (
- AbstractForm, AbstractEmailForm, AbstractFormSubmission)
- from .blocks import FormStepBlock, FormFieldBlock
- class Step:
- def __init__(self, steps, index, struct_child):
- self.steps = steps
- self.index = index
- block = getattr(struct_child, 'block', None)
- if block is None:
- struct_child = []
- if isinstance(block, FormStepBlock):
- self.name = struct_child.value['name']
- self.form_fields = struct_child.value['form_fields']
- else:
- self.name = ''
- self.form_fields = struct_child
- @property
- def index1(self):
- return self.index + 1
- @property
- def url(self):
- return '%s?step=%s' % (self.steps.page.url, self.index1)
- def get_form_fields(self):
- form_fields = OrderedDict()
- field_blocks = self.form_fields
- for struct_child in field_blocks:
- block = struct_child.block
- if isinstance(block, FormFieldBlock):
- struct_value = struct_child.value
- field_name = block.get_slug(struct_value)
- form_fields[field_name] = block.get_field(struct_value)
- return form_fields
- def get_form_class(self):
- return type('WagtailForm', self.steps.page.get_form_class_bases(),
- self.get_form_fields())
- def get_markups_and_bound_fields(self, form):
- for struct_child in self.form_fields:
- block = struct_child.block
- if isinstance(block, FormFieldBlock):
- struct_value = struct_child.value
- field_name = block.get_slug(struct_value)
- yield form[field_name], 'field'
- else:
- yield mark_safe(struct_child), 'markup'
- def __str__(self):
- if self.name:
- return self.name
- return _('Step %s') % self.index1
- @property
- def badge(self):
- return (mark_safe('<span class="badge">%s/%s</span>')
- % (self.index1, len(self.steps)))
- def __html__(self):
- return '%s %s' % (self, self.badge)
- @property
- def is_active(self):
- return self.index == self.steps.current_index
- @property
- def is_last(self):
- return self.index1 == len(self.steps)
- @property
- def has_prev(self):
- return self.index > 0
- @property
- def has_next(self):
- return self.index1 < len(self.steps)
- @property
- def prev(self):
- if self.has_prev:
- return self.steps[self.index-1]
- @property
- def next(self):
- if self.has_next:
- return self.steps[self.index+1]
- def get_existing_data(self, raw=False):
- data = self.steps.get_existing_data()[self.index]
- fields = self.get_form_fields()
- if not raw:
- class FakeField:
- storage = self.steps.get_storage()
- for field_name, value in data.items():
- if field_name in fields and isinstance(fields[field_name],
- FileField):
- data[field_name] = FieldFile(None, FakeField, value)
- return data
- @property
- def is_available(self):
- return self.prev is None or self.prev.get_existing_data(raw=True)
- class StreamFormJSONEncoder(DjangoJSONEncoder):
- def default(self, o):
- try:
- from phonenumber_field.phonenumber import PhoneNumber
- except ImportError:
- pass
- else:
- if isinstance(o, PhoneNumber):
- return str(o)
- return super().default(o)
- class Steps(list):
- def __init__(self, page, request=None):
- self.page = page
- # TODO: Make it possible to change the `form_fields` attribute.
- self.form_fields = page.form_fields
- self.request = request
- has_steps = any(isinstance(struct_child.block, FormStepBlock)
- for struct_child in self.form_fields)
- if has_steps:
- steps = [Step(self, i, form_field)
- for i, form_field in enumerate(self.form_fields)]
- else:
- steps = [Step(self, 0, self.form_fields)]
- super().__init__(steps)
- def clamp_index(self, index: int):
- if index < 0:
- index = 0
- if index >= len(self):
- index = len(self) - 1
- while not self[index].is_available:
- index -= 1
- return index
- @property
- def current_index(self):
- return self.request.session.get(self.page.current_step_session_key, 0)
- @property
- def current(self):
- return self[self.current_index]
- @current.setter
- def current(self, new_index: int):
- if not isinstance(new_index, int):
- raise TypeError('Use an integer to set the new current step.')
- self.request.session[self.page.current_step_session_key] = \
- self.clamp_index(new_index)
- def forward(self, increment: int = 1):
- self.current = self.current_index + increment
- def backward(self, increment: int = 1):
- self.current = self.current_index - increment
- def get_submission(self):
- return self.page.get_submission(self.request)
- def get_existing_data(self):
- submission = self.get_submission()
- data = [] if submission is None else json.loads(submission.form_data)
- length_difference = len(self) - len(data)
- if length_difference > 0:
- data.extend([{}] * length_difference)
- return data
- def get_current_form(self):
- request = self.request
- if request.method == 'POST':
- step_value = request.POST.get('step', 'next')
- if step_value == 'prev':
- self.backward()
- else:
- return self.current.get_form_class()(
- request.POST, request.FILES,
- initial=self.current.get_existing_data())
- return self.current.get_form_class()(
- initial=self.current.get_existing_data())
- def get_storage(self):
- return self.page.get_storage()
- def save_files(self, form):
- submission = self.get_submission()
- for name, field in form.fields.items():
- if isinstance(field, FileField):
- file = form.cleaned_data[name]
- if file == form.initial.get(name, ''): # Nothing submitted.
- form.cleaned_data[name] = file.name
- continue
- if submission is not None:
- submission.delete_file(name)
- if not file: # 'Clear' was checked.
- form.cleaned_data[name] = ''
- continue
- directory = self.request.session.session_key
- storage = self.get_storage()
- Path(storage.path(directory)).mkdir(parents=True,
- exist_ok=True)
- path = storage.get_available_name(
- str(Path(directory) / file.name))
- with storage.open(path, 'wb+') as destination:
- for chunk in file.chunks():
- destination.write(chunk)
- form.cleaned_data[name] = path
- def update_data(self):
- form = self.get_current_form()
- if form.is_valid():
- form_data = self.get_existing_data()
- self.save_files(form)
- form_data[self.current_index] = form.cleaned_data
- form_data = json.dumps(form_data, cls=StreamFormJSONEncoder)
- is_complete = self.current.is_last
- submission = self.get_submission()
- submission.form_data = form_data
- if not submission.is_complete and is_complete:
- submission.status = submission.COMPLETE
- submission.save()
- if is_complete:
- self.current = 0
- else:
- self.forward()
- return is_complete
- return False
- class SessionFormSubmission(AbstractFormSubmission):
- session_key = CharField(max_length=40, null=True, default=None)
- user = ForeignKey(settings.AUTH_USER_MODEL, null=True, blank=True,
- related_name='+', on_delete=PROTECT)
- thumbnails_by_path = TextField(default=json.dumps({}))
- last_modification = DateTimeField(_('last modification'), auto_now=True)
- INCOMPLETE = 'incomplete'
- COMPLETE = 'complete'
- REVIEWED = 'reviewed'
- APPROVED = 'approved'
- REJECTED = 'rejected'
- STATUSES = (
- (INCOMPLETE, _('Not submitted')),
- (COMPLETE, _('In progress')),
- (REVIEWED, _('Under consideration')),
- (APPROVED, _('Approved')),
- (REJECTED, _('Rejected')),
- )
- status = CharField(max_length=10, choices=STATUSES, default=INCOMPLETE)
- class Meta:
- verbose_name = _('form submission')
- verbose_name_plural = _('form submissions')
- unique_together = (('page', 'session_key'),
- ('page', 'user'))
- abstract = True
- @property
- def is_complete(self):
- return self.status != self.INCOMPLETE
- @property
- def form_page(self):
- return self.page.specific
- def get_session(self):
- return import_module(settings.SESSION_ENGINE).SessionStore(
- session_key=self.session_key)
- def reset_step(self):
- session = self.get_session()
- try:
- del session[self.form_page.current_step_session_key]
- except KeyError:
- pass
- else:
- session.save()
- def get_storage(self):
- return self.form_page.get_storage()
- def get_thumbnail_path(self, path, width=64, height=64):
- if not path:
- return ''
- variant = '%s×%s' % (width, height)
- thumbnails_by_path = json.loads(self.thumbnails_by_path)
- thumbnails_paths = thumbnails_by_path.get(path)
- if thumbnails_paths is None:
- thumbnails_by_path[path] = {}
- else:
- thumbnail_path = thumbnails_paths.get(variant)
- if thumbnail_path is not None:
- return thumbnail_path
- path = Path(path)
- thumbnail_path = str(path.with_suffix('.%s%s'
- % (variant, path.suffix)))
- storage = self.get_storage()
- thumbnail_path = storage.get_available_name(thumbnail_path)
- thumbnail = Image.open(storage.path(path))
- thumbnail.thumbnail((width, height))
- thumbnail.save(storage.path(thumbnail_path))
- thumbnails_by_path[str(path)][variant] = thumbnail_path
- self.thumbnails_by_path = json.dumps(thumbnails_by_path,
- cls=StreamFormJSONEncoder)
- self.save()
- return thumbnail_path
- def get_fields(self, by_step=False):
- return self.form_page.get_form_fields(by_step=by_step)
- def get_existing_thumbnails(self, path):
- thumbnails_paths = json.loads(self.thumbnails_by_path).get(path, {})
- for thumbnail_path in thumbnails_paths.values():
- yield thumbnail_path
- def get_files_by_field(self):
- data = self.get_data(raw=True)
- files = {}
- for name, field in self.get_fields().items():
- if isinstance(field, FileField):
- path = data.get(name)
- if path:
- files[name] = [path] + list(
- self.get_existing_thumbnails(path))
- return files
- def get_all_files(self):
- for paths in self.get_files_by_field().values():
- for path in paths:
- yield path
- def delete_file(self, field_name):
- thumbnails_by_path = json.loads(self.thumbnails_by_path)
- for path in self.get_files_by_field().get(field_name, ()):
- self.get_storage().delete(path)
- if path in thumbnails_by_path:
- del thumbnails_by_path[path]
- self.thumbnails_by_path = json.dumps(thumbnails_by_path,
- cls=StreamFormJSONEncoder)
- self.save()
- def render_email(self, value):
- return (mark_safe('<a href="mailto:%s" target="_blank">%s</a>')
- % (value, value))
- def render_link(self, value):
- return (mark_safe('<a href="%s" target="_blank">%s</a>')
- % (value, value))
- def render_image(self, value):
- storage = self.get_storage()
- return (mark_safe('<a href="%s" target="_blank"><img src="%s" /></a>')
- % (storage.url(value),
- storage.url(self.get_thumbnail_path(value))))
- def render_file(self, value):
- return mark_safe('<a href="%s" target="_blank">%s</a>') % (
- self.get_storage().url(value),
- Path(value).name
- )
- def format_value(self, field, value):
- if value is None or value == '':
- return '-'
- new_value = self.form_page.format_value(field, value)
- if new_value != value:
- return new_value
- if value is True:
- return 'Yes'
- if value is False:
- return 'No'
- if isinstance(value, (list, tuple)):
- return ', '.join([self.format_value(field, item)
- for item in value])
- if isinstance(value, datetime.date):
- return value
- if isinstance(field, EmailField):
- return self.render_email(value)
- if isinstance(field, URLField):
- return self.render_link(value)
- if isinstance(field, ImageField):
- return self.render_image(value)
- if isinstance(field, FileField):
- return self.render_file(value)
- if isinstance(value, SafeData) or hasattr(value, '__html__'):
- return value
- return str(value)
- def format_db_field(self, field_name, raw=False):
- method = getattr(self, 'get_%s_display' % field_name, None)
- if method is not None:
- return method()
- value = getattr(self, field_name)
- if raw:
- return value
- return self.format_value(self._meta.get_field(field_name).formfield(),
- value)
- def get_steps_data(self, raw=False):
- steps_data = json.loads(self.form_data)
- if raw:
- return steps_data
- fields_and_data_iterator = zip_longest(self.get_fields(by_step=True),
- steps_data, fillvalue={})
- return [
- OrderedDict([(name, self.format_value(field, step_data.get(name)))
- for name, field in step_fields.items()])
- for step_fields, step_data in fields_and_data_iterator]
- def get_extra_data(self, raw=False):
- return self.form_page.get_extra_data(self, raw=raw)
- def get_data(self, raw=False, add_metadata=True):
- steps_data = self.get_steps_data(raw=raw)
- form_data = {}
- form_data.update(self.get_extra_data(raw=raw))
- for step_data in steps_data:
- form_data.update(step_data)
- if add_metadata:
- form_data.update(
- status=self.format_db_field('status', raw=raw),
- user=self.format_db_field('user', raw=raw),
- submit_time=self.format_db_field('submit_time', raw=raw),
- last_modification=self.format_db_field('last_modification',
- raw=raw),
- )
- return form_data
- def steps_with_data_iterator(self, raw=False):
- for step, step_data_fields, step_data in zip(
- self.form_page.get_steps(),
- self.form_page.get_data_fields(by_step=True),
- self.get_steps_data(raw=raw)):
- yield step, [(field_name, field_label, step_data[field_name])
- for field_name, field_label in step_data_fields]
- @receiver(post_delete, sender=SessionFormSubmission)
- def delete_files(sender, **kwargs):
- instance = kwargs['instance']
- instance.reset_step()
- storage = instance.get_storage()
- for path in instance.get_all_files():
- storage.delete(path)
- # Automatically deletes ancestor folders if empty.
- directory = Path(path)
- while directory.parent != Path(directory.root):
- directory = directory.parent
- try:
- subdirectories, files = storage.listdir(directory)
- except FileNotFoundError:
- continue
- if not subdirectories and not files:
- Path(storage.path(directory)).rmdir()
- class SubmissionRevisionQuerySet(QuerySet):
- def for_submission(self, submission):
- return self.filter(**self.model.get_filters_for(submission))
- def created(self):
- return self.filter(type=self.model.CREATED)
- def changed(self):
- return self.filter(type=self.model.CHANGED)
- def deleted(self):
- return self.filter(type=self.model.DELETED)
- class SubmissionRevision(Model):
- CREATED = 'created'
- CHANGED = 'changed'
- DELETED = 'deleted'
- TYPES = (
- (CREATED, _('Created')),
- (CHANGED, _('Changed')),
- (DELETED, _('Deleted')),
- )
- type = CharField(max_length=7, choices=TYPES)
- created_at = DateTimeField(auto_now_add=True)
- submission_ct = ForeignKey('contenttypes.ContentType', on_delete=CASCADE)
- submission_id = TextField()
- submission = GenericForeignKey('submission_ct', 'submission_id')
- data = TextField()
- summary = TextField()
- objects = SubmissionRevisionQuerySet.as_manager()
- class Meta:
- ordering = ('-created_at',)
- abstract = True
- @staticmethod
- def get_filters_for(submission):
- return {
- 'submission_ct':
- ContentType.objects.get_for_model(submission._meta.model),
- 'submission_id': str(submission.pk),
- }
- @classmethod
- def diff_summary(cls, page, data1, data2):
- diff = []
- data_fields = page.get_data_fields()
- hidden_types = (tuple, list, dict)
- for k, label in data_fields:
- value1 = data1.get(k)
- value2 = data2.get(k)
- if value2 == value1 or not value1 and not value2:
- continue
- is_hidden = (isinstance(value1, hidden_types)
- or isinstance(value2, hidden_types))
- # Escapes newlines as they are used as separator inside summaries.
- if isinstance(value1, str):
- value1 = value1.replace('\n', r'\n')
- if isinstance(value2, str):
- value2 = value2.replace('\n', r'\n')
- if value2 and not value1:
- diff.append(
- ((_('“%s” set.') % label) if is_hidden
- else (_('“%s” set to “%s”.')) % (label, value2)))
- elif value1 and not value2:
- diff.append(_('“%s” unset.') % label)
- else:
- diff.append(((_('“%s” changed.') % label) if is_hidden
- else (_('“%s” changed from “%s” to “%s”.')
- % (label, value1, value2))))
- return '\n'.join(diff)
- @classmethod
- def create_from_submission(cls, submission, revision_type):
- page = submission.form_page
- try:
- previous = cls.objects.for_submission(
- submission).latest('created_at')
- except cls.DoesNotExist:
- previous_data = {}
- else:
- previous_data = previous.get_data()
- filters = cls.get_filters_for(submission)
- data = submission.get_data(raw=True, add_metadata=False)
- data['status'] = submission.status
- if revision_type == cls.CREATED:
- summary = _('Submission created.')
- elif revision_type == cls.DELETED:
- summary = _('Submission deleted.')
- else:
- summary = cls.diff_summary(page, previous_data, data)
- if not summary: # Nothing changed.
- return
- filters.update(
- type=revision_type, data=json.dumps(data, cls=StreamFormJSONEncoder), summary=summary
- )
- return cls.objects.create(**filters)
- def get_data(self):
- return json.loads(self.data)
- # ORIGINAL NORIPYT CODE.
- # We don't want these receivers triggering.
- # @receiver(post_save)
- # def create_submission_changed_revision(sender, **kwargs):
- # if not issubclass(sender, SessionFormSubmission):
- # return
- # submission = kwargs['instance']
- # created = kwargs['created']
- # SubmissionRevision.create_from_submission(
- # submission, (SubmissionRevision.CREATED if created
- # else SubmissionRevision.CHANGED))
- # @receiver(post_delete)
- # def create_submission_deleted_revision(sender, **kwargs):
- # if not issubclass(sender, SessionFormSubmission):
- # return
- # submission = kwargs['instance']
- # SubmissionRevision.create_from_submission(submission,
- # SubmissionRevision.DELETED)
- class StreamFormMixin:
- preview_modes = Page.DEFAULT_PREVIEW_MODES
- @property
- def current_step_session_key(self):
- return '%s:step' % self.pk
- def get_steps(self, request=None):
- if not hasattr(self, 'steps'):
- steps = Steps(self, request=request)
- if request is None:
- return steps
- self.steps = steps
- return self.steps
- def get_form_fields(self, by_step=False):
- if by_step:
- return [step.get_form_fields() for step in self.get_steps()]
- form_fields = OrderedDict()
- for step_fields in self.get_form_fields(by_step=True):
- form_fields.update(step_fields)
- return form_fields
- def get_context(self, request, *args, **kwargs):
- context = super().get_context(request, *args, **kwargs)
- self.steps = self.get_steps(request)
- step_value = request.GET.get('step')
- if step_value is not None and step_value.isdigit():
- self.steps.current = int(step_value) - 1
- form = self.steps.get_current_form()
- context.update(
- steps=self.steps,
- step=self.steps.current,
- form=form,
- markups_and_bound_fields=list(
- self.steps.current.get_markups_and_bound_fields(form)),
- )
- return context
- def get_storage(self):
- return default_storage
- @staticmethod
- def get_form_class_bases():
- return Form,
- @staticmethod
- def get_submission_class():
- return SessionFormSubmission
- def get_submission(self, request):
- Submission = self.get_submission_class()
- if request.user.is_authenticated:
- user_submission = Submission.objects.filter(
- user=request.user, page=self).order_by('-pk').first()
- if user_submission is None:
- return Submission(user=request.user, page=self, form_data='[]')
- return user_submission
- user_submission = Submission.objects.filter(
- session_key=request.session.session_key, page=self
- ).order_by('-pk').first()
- if user_submission is None:
- return Submission(session_key=request.session.session_key,
- page=self, form_data='[]')
- return user_submission
- def get_success_url(self):
- form_complete_models = [model for model in apps.get_models()
- if issubclass(model, FormCompleteMixin)]
- cts = (ContentType.objects
- .get_for_models(*form_complete_models).values())
- first_child = self.get_children().filter(content_type__in=cts).first()
- if first_child is None:
- return self.url
- return first_child.url
- def serve_success(self, request, *args, **kwargs):
- url = self.get_success_url()
- if url == self.url:
- messages.success(request,
- _('Successfully submitted the form.'))
- return HttpResponseRedirect(url)
- def serve(self, request, *args, **kwargs):
- context = self.get_context(request)
- form = context['form']
- if request.method == 'POST' and form.is_valid():
- is_complete = self.steps.update_data()
- if is_complete:
- return self.serve_success(request, *args, **kwargs)
- return HttpResponseRedirect(self.url)
- return Page.serve(self, request, *args, **kwargs)
- def get_data_fields(self, by_step=False, add_metadata=True):
- if by_step:
- return [[(field_name, field.label)
- for field_name, field in step_fields.items()]
- for step_fields in self.get_form_fields(by_step=True)]
- data_fields = []
- data_fields.extend(self.get_extra_data_fields())
- if add_metadata:
- data_fields.extend((
- ('status', _('Status')),
- ('user', _('User')),
- ('submit_time', _('First modification')),
- ('last_modification', _('Last modification'))))
- data_fields.extend([
- (field_name, field_label)
- for step_data_fields in self.get_data_fields(by_step=True)
- for field_name, field_label in step_data_fields])
- return data_fields
- def get_extra_data_fields(self):
- return ()
- def get_extra_data(self, submission, raw=False):
- return {}
- def format_value(self, field, value):
- return value
- class ClosingFormMixin(Model):
- closing_at = DateTimeField()
- closed_template = None
- class Meta:
- abstract = True
- @property
- def is_closed(self):
- return now() > self.closing_at
- def get_closed_template(self, request, *args, **kwargs):
- if self.closed_template is None:
- template = self.get_template(request, *args, **kwargs)
- base, ext = os.path.splitext(template)
- return '%s_closed%s' % (base, ext)
- return self.closed_template
- def serve_closed(self, request, *args, **kwargs):
- return TemplateResponse(
- request,
- self.get_closed_template(request, *args, **kwargs),
- self.get_context(request, *args, **kwargs),
- )
- def serve(self, request, *args, **kwargs):
- if self.is_closed:
- return self.serve_closed(request, *args, **kwargs)
- return super().serve(request, *args, **kwargs)
- class FormCompleteMixin:
- def get_form_page(self):
- return self.get_parent().specific
- def serve(self, request, *args, **kwargs):
- form_page = self.get_form_page()
- if isinstance(form_page, LoginRequiredMixin) \
- and not request.user.is_authenticated():
- return HttpResponseRedirect(form_page.url)
- self.submission = form_page.get_submission(request)
- if self.submission is not None and self.submission.is_complete \
- or getattr(request, 'is_preview', False):
- return super().serve(request, *args, **kwargs)
- return HttpResponseRedirect(form_page.url)
- def get_context(self, *args, **kwargs):
- context = super().get_context(*args, **kwargs)
- if hasattr(self, 'submission'):
- context['submission'] = self.submission
- return context
- class LoginRequiredMixin:
- login_required_template = None
- def get_login_required_template(self, request, *args, **kwargs):
- if self.login_required_template is None:
- template = self.get_template(request, *args, **kwargs)
- base, ext = os.path.splitext(template)
- return '%s_login_required%s' % (base, ext)
- return self.login_required_template
- def serve_login_required(self, request, *args, **kwargs):
- return TemplateResponse(
- request,
- self.get_login_required_template(request, *args, **kwargs),
- self.get_context(request, *args, **kwargs),
- )
- def serve(self, request, *args, **kwargs):
- if not request.user.is_authenticated():
- return self.serve_login_required(request, *args, **kwargs)
- return super().serve(request, *args, **kwargs)
- class AbstractStreamForm(StreamFormMixin, AbstractForm):
- class Meta:
- abstract = True
- class AbstractEmailStreamForm(StreamFormMixin, AbstractEmailForm):
- class Meta:
- abstract = True
|