from collections import OrderedDict from importlib import import_module from itertools import zip_longest import json import os from pathlib import Path from PIL import Image import datetime from django.apps import apps from django.conf import settings from django.contrib import messages from django.contrib.contenttypes.fields import GenericForeignKey from django.contrib.contenttypes.models import ContentType from django.core.files.storage import default_storage from django.core.serializers.json import DjangoJSONEncoder from django.db.models import ( CharField, TextField, DateTimeField, Model, ForeignKey, PROTECT, CASCADE, QuerySet, ) from django.db.models.fields.files import FieldFile from django.db.models.signals import post_delete from django.dispatch import receiver from django.forms import Form, ImageField, FileField, URLField, EmailField from django.http import HttpResponseRedirect from django.template.response import TemplateResponse from django.utils.safestring import SafeData, mark_safe from django.utils.timezone import now from django.utils.translation import gettext_lazy as _ from wagtail.models import Page from wagtail.contrib.forms.models import ( AbstractForm, AbstractEmailForm, AbstractFormSubmission) from .blocks import FormStepBlock, FormFieldBlock class Step: def __init__(self, steps, index, struct_child): self.steps = steps self.index = index block = getattr(struct_child, 'block', None) if block is None: struct_child = [] if isinstance(block, FormStepBlock): self.name = struct_child.value['name'] self.form_fields = struct_child.value['form_fields'] else: self.name = '' self.form_fields = struct_child @property def index1(self): return self.index + 1 @property def url(self): return '%s?step=%s' % (self.steps.page.url, self.index1) def get_form_fields(self): form_fields = OrderedDict() field_blocks = self.form_fields for struct_child in field_blocks: block = struct_child.block if isinstance(block, FormFieldBlock): struct_value = struct_child.value field_name = block.get_slug(struct_value) form_fields[field_name] = block.get_field(struct_value) return form_fields def get_form_class(self): return type('WagtailForm', self.steps.page.get_form_class_bases(), self.get_form_fields()) def get_markups_and_bound_fields(self, form): for struct_child in self.form_fields: block = struct_child.block if isinstance(block, FormFieldBlock): struct_value = struct_child.value field_name = block.get_slug(struct_value) yield form[field_name], 'field' else: yield mark_safe(struct_child), 'markup' def __str__(self): if self.name: return self.name return _('Step %s') % self.index1 @property def badge(self): return (mark_safe('%s/%s') % (self.index1, len(self.steps))) def __html__(self): return '%s %s' % (self, self.badge) @property def is_active(self): return self.index == self.steps.current_index @property def is_last(self): return self.index1 == len(self.steps) @property def has_prev(self): return self.index > 0 @property def has_next(self): return self.index1 < len(self.steps) @property def prev(self): if self.has_prev: return self.steps[self.index-1] @property def next(self): if self.has_next: return self.steps[self.index+1] def get_existing_data(self, raw=False): data = self.steps.get_existing_data()[self.index] fields = self.get_form_fields() if not raw: class FakeField: storage = self.steps.get_storage() for field_name, value in data.items(): if field_name in fields and isinstance(fields[field_name], FileField): data[field_name] = FieldFile(None, FakeField, value) return data @property def is_available(self): return self.prev is None or self.prev.get_existing_data(raw=True) class StreamFormJSONEncoder(DjangoJSONEncoder): def default(self, o): try: from phonenumber_field.phonenumber import PhoneNumber except ImportError: pass else: if isinstance(o, PhoneNumber): return str(o) return super().default(o) class Steps(list): def __init__(self, page, request=None): self.page = page # TODO: Make it possible to change the `form_fields` attribute. self.form_fields = page.form_fields self.request = request has_steps = any(isinstance(struct_child.block, FormStepBlock) for struct_child in self.form_fields) if has_steps: steps = [Step(self, i, form_field) for i, form_field in enumerate(self.form_fields)] else: steps = [Step(self, 0, self.form_fields)] super().__init__(steps) def clamp_index(self, index: int): if index < 0: index = 0 if index >= len(self): index = len(self) - 1 while not self[index].is_available: index -= 1 return index @property def current_index(self): return self.request.session.get(self.page.current_step_session_key, 0) @property def current(self): return self[self.current_index] @current.setter def current(self, new_index: int): if not isinstance(new_index, int): raise TypeError('Use an integer to set the new current step.') self.request.session[self.page.current_step_session_key] = \ self.clamp_index(new_index) def forward(self, increment: int = 1): self.current = self.current_index + increment def backward(self, increment: int = 1): self.current = self.current_index - increment def get_submission(self): return self.page.get_submission(self.request) def get_existing_data(self): submission = self.get_submission() data = [] if submission is None else json.loads(submission.form_data) length_difference = len(self) - len(data) if length_difference > 0: data.extend([{}] * length_difference) return data def get_current_form(self): request = self.request if request.method == 'POST': step_value = request.POST.get('step', 'next') if step_value == 'prev': self.backward() else: return self.current.get_form_class()( request.POST, request.FILES, initial=self.current.get_existing_data()) return self.current.get_form_class()( initial=self.current.get_existing_data()) def get_storage(self): return self.page.get_storage() def save_files(self, form): submission = self.get_submission() for name, field in form.fields.items(): if isinstance(field, FileField): file = form.cleaned_data[name] if file == form.initial.get(name, ''): # Nothing submitted. form.cleaned_data[name] = file.name continue if submission is not None: submission.delete_file(name) if not file: # 'Clear' was checked. form.cleaned_data[name] = '' continue directory = self.request.session.session_key storage = self.get_storage() Path(storage.path(directory)).mkdir(parents=True, exist_ok=True) path = storage.get_available_name( str(Path(directory) / file.name)) with storage.open(path, 'wb+') as destination: for chunk in file.chunks(): destination.write(chunk) form.cleaned_data[name] = path def update_data(self): form = self.get_current_form() if form.is_valid(): form_data = self.get_existing_data() self.save_files(form) form_data[self.current_index] = form.cleaned_data form_data = json.dumps(form_data, cls=StreamFormJSONEncoder) is_complete = self.current.is_last submission = self.get_submission() submission.form_data = form_data if not submission.is_complete and is_complete: submission.status = submission.COMPLETE submission.save() if is_complete: self.current = 0 else: self.forward() return is_complete return False class SessionFormSubmission(AbstractFormSubmission): session_key = CharField(max_length=40, null=True, default=None) user = ForeignKey(settings.AUTH_USER_MODEL, null=True, blank=True, related_name='+', on_delete=PROTECT) thumbnails_by_path = TextField(default=json.dumps({})) last_modification = DateTimeField(_('last modification'), auto_now=True) INCOMPLETE = 'incomplete' COMPLETE = 'complete' REVIEWED = 'reviewed' APPROVED = 'approved' REJECTED = 'rejected' STATUSES = ( (INCOMPLETE, _('Not submitted')), (COMPLETE, _('In progress')), (REVIEWED, _('Under consideration')), (APPROVED, _('Approved')), (REJECTED, _('Rejected')), ) status = CharField(max_length=10, choices=STATUSES, default=INCOMPLETE) class Meta: verbose_name = _('form submission') verbose_name_plural = _('form submissions') unique_together = (('page', 'session_key'), ('page', 'user')) abstract = True @property def is_complete(self): return self.status != self.INCOMPLETE @property def form_page(self): return self.page.specific def get_session(self): return import_module(settings.SESSION_ENGINE).SessionStore( session_key=self.session_key) def reset_step(self): session = self.get_session() try: del session[self.form_page.current_step_session_key] except KeyError: pass else: session.save() def get_storage(self): return self.form_page.get_storage() def get_thumbnail_path(self, path, width=64, height=64): if not path: return '' variant = '%s×%s' % (width, height) thumbnails_by_path = json.loads(self.thumbnails_by_path) thumbnails_paths = thumbnails_by_path.get(path) if thumbnails_paths is None: thumbnails_by_path[path] = {} else: thumbnail_path = thumbnails_paths.get(variant) if thumbnail_path is not None: return thumbnail_path path = Path(path) thumbnail_path = str(path.with_suffix('.%s%s' % (variant, path.suffix))) storage = self.get_storage() thumbnail_path = storage.get_available_name(thumbnail_path) thumbnail = Image.open(storage.path(path)) thumbnail.thumbnail((width, height)) thumbnail.save(storage.path(thumbnail_path)) thumbnails_by_path[str(path)][variant] = thumbnail_path self.thumbnails_by_path = json.dumps(thumbnails_by_path, cls=StreamFormJSONEncoder) self.save() return thumbnail_path def get_fields(self, by_step=False): return self.form_page.get_form_fields(by_step=by_step) def get_existing_thumbnails(self, path): thumbnails_paths = json.loads(self.thumbnails_by_path).get(path, {}) for thumbnail_path in thumbnails_paths.values(): yield thumbnail_path def get_files_by_field(self): data = self.get_data(raw=True) files = {} for name, field in self.get_fields().items(): if isinstance(field, FileField): path = data.get(name) if path: files[name] = [path] + list( self.get_existing_thumbnails(path)) return files def get_all_files(self): for paths in self.get_files_by_field().values(): for path in paths: yield path def delete_file(self, field_name): thumbnails_by_path = json.loads(self.thumbnails_by_path) for path in self.get_files_by_field().get(field_name, ()): self.get_storage().delete(path) if path in thumbnails_by_path: del thumbnails_by_path[path] self.thumbnails_by_path = json.dumps(thumbnails_by_path, cls=StreamFormJSONEncoder) self.save() def render_email(self, value): return (mark_safe('%s') % (value, value)) def render_link(self, value): return (mark_safe('%s') % (value, value)) def render_image(self, value): storage = self.get_storage() return (mark_safe('') % (storage.url(value), storage.url(self.get_thumbnail_path(value)))) def render_file(self, value): return mark_safe('%s') % ( self.get_storage().url(value), Path(value).name ) def format_value(self, field, value): if value is None or value == '': return '-' new_value = self.form_page.format_value(field, value) if new_value != value: return new_value if value is True: return 'Yes' if value is False: return 'No' if isinstance(value, (list, tuple)): return ', '.join([self.format_value(field, item) for item in value]) if isinstance(value, datetime.date): return value if isinstance(field, EmailField): return self.render_email(value) if isinstance(field, URLField): return self.render_link(value) if isinstance(field, ImageField): return self.render_image(value) if isinstance(field, FileField): return self.render_file(value) if isinstance(value, SafeData) or hasattr(value, '__html__'): return value return str(value) def format_db_field(self, field_name, raw=False): method = getattr(self, 'get_%s_display' % field_name, None) if method is not None: return method() value = getattr(self, field_name) if raw: return value return self.format_value(self._meta.get_field(field_name).formfield(), value) def get_steps_data(self, raw=False): steps_data = json.loads(self.form_data) if raw: return steps_data fields_and_data_iterator = zip_longest(self.get_fields(by_step=True), steps_data, fillvalue={}) return [ OrderedDict([(name, self.format_value(field, step_data.get(name))) for name, field in step_fields.items()]) for step_fields, step_data in fields_and_data_iterator] def get_extra_data(self, raw=False): return self.form_page.get_extra_data(self, raw=raw) def get_data(self, raw=False, add_metadata=True): steps_data = self.get_steps_data(raw=raw) form_data = {} form_data.update(self.get_extra_data(raw=raw)) for step_data in steps_data: form_data.update(step_data) if add_metadata: form_data.update( status=self.format_db_field('status', raw=raw), user=self.format_db_field('user', raw=raw), submit_time=self.format_db_field('submit_time', raw=raw), last_modification=self.format_db_field('last_modification', raw=raw), ) return form_data def steps_with_data_iterator(self, raw=False): for step, step_data_fields, step_data in zip( self.form_page.get_steps(), self.form_page.get_data_fields(by_step=True), self.get_steps_data(raw=raw)): yield step, [(field_name, field_label, step_data[field_name]) for field_name, field_label in step_data_fields] @receiver(post_delete, sender=SessionFormSubmission) def delete_files(sender, **kwargs): instance = kwargs['instance'] instance.reset_step() storage = instance.get_storage() for path in instance.get_all_files(): storage.delete(path) # Automatically deletes ancestor folders if empty. directory = Path(path) while directory.parent != Path(directory.root): directory = directory.parent try: subdirectories, files = storage.listdir(directory) except FileNotFoundError: continue if not subdirectories and not files: Path(storage.path(directory)).rmdir() class SubmissionRevisionQuerySet(QuerySet): def for_submission(self, submission): return self.filter(**self.model.get_filters_for(submission)) def created(self): return self.filter(type=self.model.CREATED) def changed(self): return self.filter(type=self.model.CHANGED) def deleted(self): return self.filter(type=self.model.DELETED) class SubmissionRevision(Model): CREATED = 'created' CHANGED = 'changed' DELETED = 'deleted' TYPES = ( (CREATED, _('Created')), (CHANGED, _('Changed')), (DELETED, _('Deleted')), ) type = CharField(max_length=7, choices=TYPES) created_at = DateTimeField(auto_now_add=True) submission_ct = ForeignKey('contenttypes.ContentType', on_delete=CASCADE) submission_id = TextField() submission = GenericForeignKey('submission_ct', 'submission_id') data = TextField() summary = TextField() objects = SubmissionRevisionQuerySet.as_manager() class Meta: ordering = ('-created_at',) abstract = True @staticmethod def get_filters_for(submission): return { 'submission_ct': ContentType.objects.get_for_model(submission._meta.model), 'submission_id': str(submission.pk), } @classmethod def diff_summary(cls, page, data1, data2): diff = [] data_fields = page.get_data_fields() hidden_types = (tuple, list, dict) for k, label in data_fields: value1 = data1.get(k) value2 = data2.get(k) if value2 == value1 or not value1 and not value2: continue is_hidden = (isinstance(value1, hidden_types) or isinstance(value2, hidden_types)) # Escapes newlines as they are used as separator inside summaries. if isinstance(value1, str): value1 = value1.replace('\n', r'\n') if isinstance(value2, str): value2 = value2.replace('\n', r'\n') if value2 and not value1: diff.append( ((_('“%s” set.') % label) if is_hidden else (_('“%s” set to “%s”.')) % (label, value2))) elif value1 and not value2: diff.append(_('“%s” unset.') % label) else: diff.append(((_('“%s” changed.') % label) if is_hidden else (_('“%s” changed from “%s” to “%s”.') % (label, value1, value2)))) return '\n'.join(diff) @classmethod def create_from_submission(cls, submission, revision_type): page = submission.form_page try: previous = cls.objects.for_submission( submission).latest('created_at') except cls.DoesNotExist: previous_data = {} else: previous_data = previous.get_data() filters = cls.get_filters_for(submission) data = submission.get_data(raw=True, add_metadata=False) data['status'] = submission.status if revision_type == cls.CREATED: summary = _('Submission created.') elif revision_type == cls.DELETED: summary = _('Submission deleted.') else: summary = cls.diff_summary(page, previous_data, data) if not summary: # Nothing changed. return filters.update( type=revision_type, data=json.dumps(data, cls=StreamFormJSONEncoder), summary=summary ) return cls.objects.create(**filters) def get_data(self): return json.loads(self.data) # ORIGINAL NORIPYT CODE. # We don't want these receivers triggering. # @receiver(post_save) # def create_submission_changed_revision(sender, **kwargs): # if not issubclass(sender, SessionFormSubmission): # return # submission = kwargs['instance'] # created = kwargs['created'] # SubmissionRevision.create_from_submission( # submission, (SubmissionRevision.CREATED if created # else SubmissionRevision.CHANGED)) # @receiver(post_delete) # def create_submission_deleted_revision(sender, **kwargs): # if not issubclass(sender, SessionFormSubmission): # return # submission = kwargs['instance'] # SubmissionRevision.create_from_submission(submission, # SubmissionRevision.DELETED) class StreamFormMixin: preview_modes = Page.DEFAULT_PREVIEW_MODES @property def current_step_session_key(self): return '%s:step' % self.pk def get_steps(self, request=None): if not hasattr(self, 'steps'): steps = Steps(self, request=request) if request is None: return steps self.steps = steps return self.steps def get_form_fields(self, by_step=False): if by_step: return [step.get_form_fields() for step in self.get_steps()] form_fields = OrderedDict() for step_fields in self.get_form_fields(by_step=True): form_fields.update(step_fields) return form_fields def get_context(self, request, *args, **kwargs): context = super().get_context(request, *args, **kwargs) self.steps = self.get_steps(request) step_value = request.GET.get('step') if step_value is not None and step_value.isdigit(): self.steps.current = int(step_value) - 1 form = self.steps.get_current_form() context.update( steps=self.steps, step=self.steps.current, form=form, markups_and_bound_fields=list( self.steps.current.get_markups_and_bound_fields(form)), ) return context def get_storage(self): return default_storage @staticmethod def get_form_class_bases(): return Form, @staticmethod def get_submission_class(): return SessionFormSubmission def get_submission(self, request): Submission = self.get_submission_class() if request.user.is_authenticated: user_submission = Submission.objects.filter( user=request.user, page=self).order_by('-pk').first() if user_submission is None: return Submission(user=request.user, page=self, form_data='[]') return user_submission user_submission = Submission.objects.filter( session_key=request.session.session_key, page=self ).order_by('-pk').first() if user_submission is None: return Submission(session_key=request.session.session_key, page=self, form_data='[]') return user_submission def get_success_url(self): form_complete_models = [model for model in apps.get_models() if issubclass(model, FormCompleteMixin)] cts = (ContentType.objects .get_for_models(*form_complete_models).values()) first_child = self.get_children().filter(content_type__in=cts).first() if first_child is None: return self.url return first_child.url def serve_success(self, request, *args, **kwargs): url = self.get_success_url() if url == self.url: messages.success(request, _('Successfully submitted the form.')) return HttpResponseRedirect(url) def serve(self, request, *args, **kwargs): context = self.get_context(request) form = context['form'] if request.method == 'POST' and form.is_valid(): is_complete = self.steps.update_data() if is_complete: return self.serve_success(request, *args, **kwargs) return HttpResponseRedirect(self.url) return Page.serve(self, request, *args, **kwargs) def get_data_fields(self, by_step=False, add_metadata=True): if by_step: return [[(field_name, field.label) for field_name, field in step_fields.items()] for step_fields in self.get_form_fields(by_step=True)] data_fields = [] data_fields.extend(self.get_extra_data_fields()) if add_metadata: data_fields.extend(( ('status', _('Status')), ('user', _('User')), ('submit_time', _('First modification')), ('last_modification', _('Last modification')))) data_fields.extend([ (field_name, field_label) for step_data_fields in self.get_data_fields(by_step=True) for field_name, field_label in step_data_fields]) return data_fields def get_extra_data_fields(self): return () def get_extra_data(self, submission, raw=False): return {} def format_value(self, field, value): return value class ClosingFormMixin(Model): closing_at = DateTimeField() closed_template = None class Meta: abstract = True @property def is_closed(self): return now() > self.closing_at def get_closed_template(self, request, *args, **kwargs): if self.closed_template is None: template = self.get_template(request, *args, **kwargs) base, ext = os.path.splitext(template) return '%s_closed%s' % (base, ext) return self.closed_template def serve_closed(self, request, *args, **kwargs): return TemplateResponse( request, self.get_closed_template(request, *args, **kwargs), self.get_context(request, *args, **kwargs), ) def serve(self, request, *args, **kwargs): if self.is_closed: return self.serve_closed(request, *args, **kwargs) return super().serve(request, *args, **kwargs) class FormCompleteMixin: def get_form_page(self): return self.get_parent().specific def serve(self, request, *args, **kwargs): form_page = self.get_form_page() if isinstance(form_page, LoginRequiredMixin) \ and not request.user.is_authenticated(): return HttpResponseRedirect(form_page.url) self.submission = form_page.get_submission(request) if self.submission is not None and self.submission.is_complete \ or getattr(request, 'is_preview', False): return super().serve(request, *args, **kwargs) return HttpResponseRedirect(form_page.url) def get_context(self, *args, **kwargs): context = super().get_context(*args, **kwargs) if hasattr(self, 'submission'): context['submission'] = self.submission return context class LoginRequiredMixin: login_required_template = None def get_login_required_template(self, request, *args, **kwargs): if self.login_required_template is None: template = self.get_template(request, *args, **kwargs) base, ext = os.path.splitext(template) return '%s_login_required%s' % (base, ext) return self.login_required_template def serve_login_required(self, request, *args, **kwargs): return TemplateResponse( request, self.get_login_required_template(request, *args, **kwargs), self.get_context(request, *args, **kwargs), ) def serve(self, request, *args, **kwargs): if not request.user.is_authenticated(): return self.serve_login_required(request, *args, **kwargs) return super().serve(request, *args, **kwargs) class AbstractStreamForm(StreamFormMixin, AbstractForm): class Meta: abstract = True class AbstractEmailStreamForm(StreamFormMixin, AbstractEmailForm): class Meta: abstract = True