|
@@ -1,925 +0,0 @@
|
|
|
-import datetime
|
|
|
-import json
|
|
|
-import os
|
|
|
-from collections import OrderedDict
|
|
|
-from importlib import import_module
|
|
|
-from itertools import zip_longest
|
|
|
-from pathlib import Path
|
|
|
-
|
|
|
-from django.apps import apps
|
|
|
-from django.conf import settings
|
|
|
-from django.contrib import messages
|
|
|
-from django.contrib.contenttypes.fields import GenericForeignKey
|
|
|
-from django.contrib.contenttypes.models import ContentType
|
|
|
-from django.core.files.storage import default_storage
|
|
|
-from django.core.serializers.json import DjangoJSONEncoder
|
|
|
-from django.db.models import CASCADE
|
|
|
-from django.db.models import PROTECT
|
|
|
-from django.db.models import CharField
|
|
|
-from django.db.models import DateTimeField
|
|
|
-from django.db.models import ForeignKey
|
|
|
-from django.db.models import Model
|
|
|
-from django.db.models import QuerySet
|
|
|
-from django.db.models import TextField
|
|
|
-from django.db.models.fields.files import FieldFile
|
|
|
-from django.db.models.signals import post_delete
|
|
|
-from django.dispatch import receiver
|
|
|
-from django.forms import EmailField
|
|
|
-from django.forms import FileField
|
|
|
-from django.forms import Form
|
|
|
-from django.forms import ImageField
|
|
|
-from django.forms import URLField
|
|
|
-from django.http import HttpResponseRedirect
|
|
|
-from django.template.response import TemplateResponse
|
|
|
-from django.utils.safestring import SafeData
|
|
|
-from django.utils.safestring import mark_safe
|
|
|
-from django.utils.timezone import now
|
|
|
-from django.utils.translation import gettext_lazy as _
|
|
|
-from PIL import Image
|
|
|
-from wagtail.contrib.forms.models import AbstractEmailForm
|
|
|
-from wagtail.contrib.forms.models import AbstractForm
|
|
|
-from wagtail.contrib.forms.models import AbstractFormSubmission
|
|
|
-
|
|
|
-from .blocks import FormFieldBlock
|
|
|
-from .blocks import FormStepBlock
|
|
|
-
|
|
|
-
|
|
|
-class Step:
|
|
|
- def __init__(self, steps, index, struct_child):
|
|
|
- self.steps = steps
|
|
|
- self.index = index
|
|
|
- block = getattr(struct_child, "block", None)
|
|
|
- if block is None:
|
|
|
- struct_child = []
|
|
|
- if isinstance(block, FormStepBlock):
|
|
|
- self.name = struct_child.value["name"]
|
|
|
- self.form_fields = struct_child.value["form_fields"]
|
|
|
- else:
|
|
|
- self.name = ""
|
|
|
- self.form_fields = struct_child
|
|
|
-
|
|
|
- @property
|
|
|
- def index1(self):
|
|
|
- return self.index + 1
|
|
|
-
|
|
|
- @property
|
|
|
- def url(self):
|
|
|
- return "%s?step=%s" % (self.steps.page.url, self.index1)
|
|
|
-
|
|
|
- def get_form_fields(self):
|
|
|
- form_fields = OrderedDict()
|
|
|
- field_blocks = self.form_fields
|
|
|
- for struct_child in field_blocks:
|
|
|
- block = struct_child.block
|
|
|
- if isinstance(block, FormFieldBlock):
|
|
|
- struct_value = struct_child.value
|
|
|
- field_name = block.get_slug(struct_value)
|
|
|
- form_fields[field_name] = block.get_field(struct_value)
|
|
|
- return form_fields
|
|
|
-
|
|
|
- def get_form_class(self):
|
|
|
- return type(
|
|
|
- "WagtailForm",
|
|
|
- self.steps.page.get_form_class_bases(),
|
|
|
- self.get_form_fields(),
|
|
|
- )
|
|
|
-
|
|
|
- def get_markups_and_bound_fields(self, form):
|
|
|
- for struct_child in self.form_fields:
|
|
|
- block = struct_child.block
|
|
|
- if isinstance(block, FormFieldBlock):
|
|
|
- struct_value = struct_child.value
|
|
|
- field_name = block.get_slug(struct_value)
|
|
|
- yield form[field_name], "field"
|
|
|
- else:
|
|
|
- yield mark_safe(struct_child), "markup"
|
|
|
-
|
|
|
- def __str__(self):
|
|
|
- if self.name:
|
|
|
- return self.name
|
|
|
- return _("Step %s") % self.index1
|
|
|
-
|
|
|
- @property
|
|
|
- def badge(self):
|
|
|
- return mark_safe('<span class="badge">%s/%s</span>') % (
|
|
|
- self.index1,
|
|
|
- len(self.steps),
|
|
|
- )
|
|
|
-
|
|
|
- def __html__(self):
|
|
|
- return "%s %s" % (self, self.badge)
|
|
|
-
|
|
|
- @property
|
|
|
- def is_active(self):
|
|
|
- return self.index == self.steps.current_index
|
|
|
-
|
|
|
- @property
|
|
|
- def is_last(self):
|
|
|
- return self.index1 == len(self.steps)
|
|
|
-
|
|
|
- @property
|
|
|
- def has_prev(self):
|
|
|
- return self.index > 0
|
|
|
-
|
|
|
- @property
|
|
|
- def has_next(self):
|
|
|
- return self.index1 < len(self.steps)
|
|
|
-
|
|
|
- @property
|
|
|
- def prev(self):
|
|
|
- if self.has_prev:
|
|
|
- return self.steps[self.index - 1]
|
|
|
-
|
|
|
- @property
|
|
|
- def next(self):
|
|
|
- if self.has_next:
|
|
|
- return self.steps[self.index + 1]
|
|
|
-
|
|
|
- def get_existing_data(self, raw=False):
|
|
|
- data = self.steps.get_existing_data()[self.index]
|
|
|
- fields = self.get_form_fields()
|
|
|
- if not raw:
|
|
|
-
|
|
|
- class FakeField:
|
|
|
- storage = self.steps.get_storage()
|
|
|
-
|
|
|
- for field_name, value in data.items():
|
|
|
- if field_name in fields and isinstance(
|
|
|
- fields[field_name], FileField
|
|
|
- ):
|
|
|
- data[field_name] = FieldFile(None, FakeField, value)
|
|
|
- return data
|
|
|
-
|
|
|
- @property
|
|
|
- def is_available(self):
|
|
|
- return self.prev is None or self.prev.get_existing_data(raw=True)
|
|
|
-
|
|
|
-
|
|
|
-class StreamFormJSONEncoder(DjangoJSONEncoder):
|
|
|
- def default(self, o):
|
|
|
- try:
|
|
|
- from phonenumber_field.phonenumber import PhoneNumber
|
|
|
- except ImportError:
|
|
|
- pass
|
|
|
- else:
|
|
|
- if isinstance(o, PhoneNumber):
|
|
|
- return str(o)
|
|
|
-
|
|
|
- return super().default(o)
|
|
|
-
|
|
|
-
|
|
|
-class Steps(list):
|
|
|
- def __init__(self, page, request=None):
|
|
|
- self.page = page
|
|
|
- # TODO: Make it possible to change the `form_fields` attribute.
|
|
|
- self.form_fields = page.form_fields
|
|
|
- self.request = request
|
|
|
- has_steps = any(
|
|
|
- isinstance(struct_child.block, FormStepBlock)
|
|
|
- for struct_child in self.form_fields
|
|
|
- )
|
|
|
- if has_steps:
|
|
|
- steps = [
|
|
|
- Step(self, i, form_field)
|
|
|
- for i, form_field in enumerate(self.form_fields)
|
|
|
- ]
|
|
|
- else:
|
|
|
- steps = [Step(self, 0, self.form_fields)]
|
|
|
- super().__init__(steps)
|
|
|
-
|
|
|
- def clamp_index(self, index: int):
|
|
|
- if index < 0:
|
|
|
- index = 0
|
|
|
- if index >= len(self):
|
|
|
- index = len(self) - 1
|
|
|
- while not self[index].is_available:
|
|
|
- index -= 1
|
|
|
- return index
|
|
|
-
|
|
|
- @property
|
|
|
- def current_index(self):
|
|
|
- return self.request.session.get(self.page.current_step_session_key, 0)
|
|
|
-
|
|
|
- @property
|
|
|
- def current(self):
|
|
|
- return self[self.current_index]
|
|
|
-
|
|
|
- @current.setter
|
|
|
- def current(self, new_index: int):
|
|
|
- if not isinstance(new_index, int):
|
|
|
- raise TypeError("Use an integer to set the new current step.")
|
|
|
- self.request.session[self.page.current_step_session_key] = (
|
|
|
- self.clamp_index(new_index)
|
|
|
- )
|
|
|
-
|
|
|
- def forward(self, increment: int = 1):
|
|
|
- self.current = self.current_index + increment
|
|
|
-
|
|
|
- def backward(self, increment: int = 1):
|
|
|
- self.current = self.current_index - increment
|
|
|
-
|
|
|
- def get_submission(self):
|
|
|
- return self.page.get_submission(self.request)
|
|
|
-
|
|
|
- def get_existing_data(self):
|
|
|
- submission = self.get_submission()
|
|
|
- data = [] if submission is None else json.loads(submission.form_data)
|
|
|
- length_difference = len(self) - len(data)
|
|
|
- if length_difference > 0:
|
|
|
- data.extend([{}] * length_difference)
|
|
|
- return data
|
|
|
-
|
|
|
- def get_current_form(self):
|
|
|
- request = self.request
|
|
|
- if request.method == "POST":
|
|
|
- step_value = request.POST.get("step", "next")
|
|
|
- if step_value == "prev":
|
|
|
- self.backward()
|
|
|
- else:
|
|
|
- return self.current.get_form_class()(
|
|
|
- request.POST,
|
|
|
- request.FILES,
|
|
|
- initial=self.current.get_existing_data(),
|
|
|
- )
|
|
|
- return self.current.get_form_class()(
|
|
|
- initial=self.current.get_existing_data()
|
|
|
- )
|
|
|
-
|
|
|
- def get_storage(self):
|
|
|
- return self.page.get_storage()
|
|
|
-
|
|
|
- def save_files(self, form):
|
|
|
- submission = self.get_submission()
|
|
|
- for name, field in form.fields.items():
|
|
|
- if isinstance(field, FileField):
|
|
|
- file = form.cleaned_data[name]
|
|
|
- if file == form.initial.get(name, ""): # Nothing submitted.
|
|
|
- form.cleaned_data[name] = file.name
|
|
|
- continue
|
|
|
- if submission is not None:
|
|
|
- submission.delete_file(name)
|
|
|
- if not file: # 'Clear' was checked.
|
|
|
- form.cleaned_data[name] = ""
|
|
|
- continue
|
|
|
- directory = self.request.session.session_key
|
|
|
- storage = self.get_storage()
|
|
|
- Path(storage.path(directory)).mkdir(parents=True, exist_ok=True)
|
|
|
- path = storage.get_available_name(
|
|
|
- str(Path(directory) / file.name)
|
|
|
- )
|
|
|
- with storage.open(path, "wb+") as destination:
|
|
|
- for chunk in file.chunks():
|
|
|
- destination.write(chunk)
|
|
|
- form.cleaned_data[name] = path
|
|
|
-
|
|
|
- def update_data(self):
|
|
|
- form = self.get_current_form()
|
|
|
- if form.is_valid():
|
|
|
- form_data = self.get_existing_data()
|
|
|
- self.save_files(form)
|
|
|
- form_data[self.current_index] = form.cleaned_data
|
|
|
- form_data = json.dumps(form_data, cls=StreamFormJSONEncoder)
|
|
|
- is_complete = self.current.is_last
|
|
|
- submission = self.get_submission()
|
|
|
- submission.form_data = form_data
|
|
|
- if not submission.is_complete and is_complete:
|
|
|
- submission.status = submission.COMPLETE
|
|
|
- submission.save()
|
|
|
- if is_complete:
|
|
|
- self.current = 0
|
|
|
- else:
|
|
|
- self.forward()
|
|
|
- return is_complete
|
|
|
- return False
|
|
|
-
|
|
|
-
|
|
|
-class SessionFormSubmission(AbstractFormSubmission):
|
|
|
- session_key = CharField(max_length=40, null=True, default=None)
|
|
|
- user = ForeignKey(
|
|
|
- settings.AUTH_USER_MODEL,
|
|
|
- null=True,
|
|
|
- blank=True,
|
|
|
- related_name="+",
|
|
|
- on_delete=PROTECT,
|
|
|
- )
|
|
|
- thumbnails_by_path = TextField(default=json.dumps({}))
|
|
|
- last_modification = DateTimeField(_("last modification"), auto_now=True)
|
|
|
- INCOMPLETE = "incomplete"
|
|
|
- COMPLETE = "complete"
|
|
|
- REVIEWED = "reviewed"
|
|
|
- APPROVED = "approved"
|
|
|
- REJECTED = "rejected"
|
|
|
- STATUSES = (
|
|
|
- (INCOMPLETE, _("Not submitted")),
|
|
|
- (COMPLETE, _("In progress")),
|
|
|
- (REVIEWED, _("Under consideration")),
|
|
|
- (APPROVED, _("Approved")),
|
|
|
- (REJECTED, _("Rejected")),
|
|
|
- )
|
|
|
- status = CharField(max_length=10, choices=STATUSES, default=INCOMPLETE)
|
|
|
-
|
|
|
- class Meta:
|
|
|
- verbose_name = _("form submission")
|
|
|
- verbose_name_plural = _("form submissions")
|
|
|
- unique_together = (("page", "session_key"), ("page", "user"))
|
|
|
- abstract = True
|
|
|
-
|
|
|
- @property
|
|
|
- def is_complete(self):
|
|
|
- return self.status != self.INCOMPLETE
|
|
|
-
|
|
|
- @property
|
|
|
- def form_page(self):
|
|
|
- return self.page.specific
|
|
|
-
|
|
|
- def get_session(self):
|
|
|
- return import_module(settings.SESSION_ENGINE).SessionStore(
|
|
|
- session_key=self.session_key
|
|
|
- )
|
|
|
-
|
|
|
- def reset_step(self):
|
|
|
- session = self.get_session()
|
|
|
- try:
|
|
|
- del session[self.form_page.current_step_session_key]
|
|
|
- except KeyError:
|
|
|
- pass
|
|
|
- else:
|
|
|
- session.save()
|
|
|
-
|
|
|
- def get_storage(self):
|
|
|
- return self.form_page.get_storage()
|
|
|
-
|
|
|
- def get_thumbnail_path(self, path, width=64, height=64):
|
|
|
- if not path:
|
|
|
- return ""
|
|
|
- variant = "%s×%s" % (width, height)
|
|
|
- thumbnails_by_path = json.loads(self.thumbnails_by_path)
|
|
|
- thumbnails_paths = thumbnails_by_path.get(path)
|
|
|
- if thumbnails_paths is None:
|
|
|
- thumbnails_by_path[path] = {}
|
|
|
- else:
|
|
|
- thumbnail_path = thumbnails_paths.get(variant)
|
|
|
- if thumbnail_path is not None:
|
|
|
- return thumbnail_path
|
|
|
-
|
|
|
- path = Path(path)
|
|
|
- thumbnail_path = str(path.with_suffix(".%s%s" % (variant, path.suffix)))
|
|
|
- storage = self.get_storage()
|
|
|
- thumbnail_path = storage.get_available_name(thumbnail_path)
|
|
|
-
|
|
|
- thumbnail = Image.open(storage.path(path))
|
|
|
- thumbnail.thumbnail((width, height))
|
|
|
- thumbnail.save(storage.path(thumbnail_path))
|
|
|
-
|
|
|
- thumbnails_by_path[str(path)][variant] = thumbnail_path
|
|
|
- self.thumbnails_by_path = json.dumps(
|
|
|
- thumbnails_by_path, cls=StreamFormJSONEncoder
|
|
|
- )
|
|
|
- self.save()
|
|
|
- return thumbnail_path
|
|
|
-
|
|
|
- def get_fields(self, by_step=False):
|
|
|
- return self.form_page.get_form_fields(by_step=by_step)
|
|
|
-
|
|
|
- def get_existing_thumbnails(self, path):
|
|
|
- thumbnails_paths = json.loads(self.thumbnails_by_path).get(path, {})
|
|
|
- for thumbnail_path in thumbnails_paths.values():
|
|
|
- yield thumbnail_path
|
|
|
-
|
|
|
- def get_files_by_field(self):
|
|
|
- data = self.get_data(raw=True)
|
|
|
- files = {}
|
|
|
- for name, field in self.get_fields().items():
|
|
|
- if isinstance(field, FileField):
|
|
|
- path = data.get(name)
|
|
|
- if path:
|
|
|
- files[name] = [path] + list(
|
|
|
- self.get_existing_thumbnails(path)
|
|
|
- )
|
|
|
- return files
|
|
|
-
|
|
|
- def get_all_files(self):
|
|
|
- for paths in self.get_files_by_field().values():
|
|
|
- for path in paths:
|
|
|
- yield path
|
|
|
-
|
|
|
- def delete_file(self, field_name):
|
|
|
- thumbnails_by_path = json.loads(self.thumbnails_by_path)
|
|
|
- for path in self.get_files_by_field().get(field_name, ()):
|
|
|
- self.get_storage().delete(path)
|
|
|
- if path in thumbnails_by_path:
|
|
|
- del thumbnails_by_path[path]
|
|
|
- self.thumbnails_by_path = json.dumps(
|
|
|
- thumbnails_by_path, cls=StreamFormJSONEncoder
|
|
|
- )
|
|
|
- self.save()
|
|
|
-
|
|
|
- def render_email(self, value):
|
|
|
- return mark_safe('<a href="mailto:%s" target="_blank">%s</a>') % (
|
|
|
- value,
|
|
|
- value,
|
|
|
- )
|
|
|
-
|
|
|
- def render_link(self, value):
|
|
|
- return mark_safe('<a href="%s" target="_blank">%s</a>') % (value, value)
|
|
|
-
|
|
|
- def render_image(self, value):
|
|
|
- storage = self.get_storage()
|
|
|
- return mark_safe(
|
|
|
- '<a href="%s" target="_blank"><img src="%s" /></a>'
|
|
|
- ) % (storage.url(value), storage.url(self.get_thumbnail_path(value)))
|
|
|
-
|
|
|
- def render_file(self, value):
|
|
|
- return mark_safe('<a href="%s" target="_blank">%s</a>') % (
|
|
|
- self.get_storage().url(value),
|
|
|
- Path(value).name,
|
|
|
- )
|
|
|
-
|
|
|
- def format_value(self, field, value):
|
|
|
- if value is None or value == "":
|
|
|
- return "-"
|
|
|
- new_value = self.form_page.format_value(field, value)
|
|
|
- if new_value != value:
|
|
|
- return new_value
|
|
|
- if value is True:
|
|
|
- return "Yes"
|
|
|
- if value is False:
|
|
|
- return "No"
|
|
|
- if isinstance(value, (list, tuple)):
|
|
|
- return ", ".join([self.format_value(field, item) for item in value])
|
|
|
- if isinstance(value, datetime.date):
|
|
|
- return value
|
|
|
- if isinstance(field, EmailField):
|
|
|
- return self.render_email(value)
|
|
|
- if isinstance(field, URLField):
|
|
|
- return self.render_link(value)
|
|
|
- if isinstance(field, ImageField):
|
|
|
- return self.render_image(value)
|
|
|
- if isinstance(field, FileField):
|
|
|
- return self.render_file(value)
|
|
|
- if isinstance(value, SafeData) or hasattr(value, "__html__"):
|
|
|
- return value
|
|
|
- return str(value)
|
|
|
-
|
|
|
- def format_db_field(self, field_name, raw=False):
|
|
|
- method = getattr(self, "get_%s_display" % field_name, None)
|
|
|
- if method is not None:
|
|
|
- return method()
|
|
|
- value = getattr(self, field_name)
|
|
|
- if raw:
|
|
|
- return value
|
|
|
- return self.format_value(
|
|
|
- self._meta.get_field(field_name).formfield(), value
|
|
|
- )
|
|
|
-
|
|
|
- def get_steps_data(self, raw=False):
|
|
|
- steps_data = json.loads(self.form_data)
|
|
|
- if raw:
|
|
|
- return steps_data
|
|
|
- fields_and_data_iterator = zip_longest(
|
|
|
- self.get_fields(by_step=True), steps_data, fillvalue={}
|
|
|
- )
|
|
|
- return [
|
|
|
- OrderedDict(
|
|
|
- [
|
|
|
- (name, self.format_value(field, step_data.get(name)))
|
|
|
- for name, field in step_fields.items()
|
|
|
- ]
|
|
|
- )
|
|
|
- for step_fields, step_data in fields_and_data_iterator
|
|
|
- ]
|
|
|
-
|
|
|
- def get_extra_data(self, raw=False):
|
|
|
- return self.form_page.get_extra_data(self, raw=raw)
|
|
|
-
|
|
|
- def get_data(self, raw=False, add_metadata=True):
|
|
|
- steps_data = self.get_steps_data(raw=raw)
|
|
|
- form_data = {}
|
|
|
- form_data.update(self.get_extra_data(raw=raw))
|
|
|
- for step_data in steps_data:
|
|
|
- form_data.update(step_data)
|
|
|
- if add_metadata:
|
|
|
- form_data.update(
|
|
|
- status=self.format_db_field("status", raw=raw),
|
|
|
- user=self.format_db_field("user", raw=raw),
|
|
|
- submit_time=self.format_db_field("submit_time", raw=raw),
|
|
|
- last_modification=self.format_db_field(
|
|
|
- "last_modification", raw=raw
|
|
|
- ),
|
|
|
- )
|
|
|
- return form_data
|
|
|
-
|
|
|
- def steps_with_data_iterator(self, raw=False):
|
|
|
- for step, step_data_fields, step_data in zip(
|
|
|
- self.form_page.get_steps(),
|
|
|
- self.form_page.get_data_fields(by_step=True),
|
|
|
- self.get_steps_data(raw=raw),
|
|
|
- ):
|
|
|
- yield (
|
|
|
- step,
|
|
|
- [
|
|
|
- (field_name, field_label, step_data[field_name])
|
|
|
- for field_name, field_label in step_data_fields
|
|
|
- ],
|
|
|
- )
|
|
|
-
|
|
|
-
|
|
|
-@receiver(post_delete, sender=SessionFormSubmission)
|
|
|
-def delete_files(sender, **kwargs):
|
|
|
- instance = kwargs["instance"]
|
|
|
- instance.reset_step()
|
|
|
- storage = instance.get_storage()
|
|
|
- for path in instance.get_all_files():
|
|
|
- storage.delete(path)
|
|
|
-
|
|
|
- # Automatically deletes ancestor folders if empty.
|
|
|
- directory = Path(path)
|
|
|
- while directory.parent != Path(directory.root):
|
|
|
- directory = directory.parent
|
|
|
- try:
|
|
|
- subdirectories, files = storage.listdir(directory)
|
|
|
- except FileNotFoundError:
|
|
|
- continue
|
|
|
- if not subdirectories and not files:
|
|
|
- Path(storage.path(directory)).rmdir()
|
|
|
-
|
|
|
-
|
|
|
-class SubmissionRevisionQuerySet(QuerySet):
|
|
|
- def for_submission(self, submission):
|
|
|
- return self.filter(**self.model.get_filters_for(submission))
|
|
|
-
|
|
|
- def created(self):
|
|
|
- return self.filter(type=self.model.CREATED)
|
|
|
-
|
|
|
- def changed(self):
|
|
|
- return self.filter(type=self.model.CHANGED)
|
|
|
-
|
|
|
- def deleted(self):
|
|
|
- return self.filter(type=self.model.DELETED)
|
|
|
-
|
|
|
-
|
|
|
-class SubmissionRevision(Model):
|
|
|
- CREATED = "created"
|
|
|
- CHANGED = "changed"
|
|
|
- DELETED = "deleted"
|
|
|
- TYPES = (
|
|
|
- (CREATED, _("Created")),
|
|
|
- (CHANGED, _("Changed")),
|
|
|
- (DELETED, _("Deleted")),
|
|
|
- )
|
|
|
- type = CharField(max_length=7, choices=TYPES)
|
|
|
- created_at = DateTimeField(auto_now_add=True)
|
|
|
- submission_ct = ForeignKey("contenttypes.ContentType", on_delete=CASCADE)
|
|
|
- submission_id = TextField()
|
|
|
- submission = GenericForeignKey("submission_ct", "submission_id")
|
|
|
- data = TextField()
|
|
|
- summary = TextField()
|
|
|
-
|
|
|
- objects = SubmissionRevisionQuerySet.as_manager()
|
|
|
-
|
|
|
- class Meta:
|
|
|
- ordering = ("-created_at",)
|
|
|
- abstract = True
|
|
|
-
|
|
|
- @staticmethod
|
|
|
- def get_filters_for(submission):
|
|
|
- return {
|
|
|
- "submission_ct": ContentType.objects.get_for_model(
|
|
|
- submission._meta.model
|
|
|
- ),
|
|
|
- "submission_id": str(submission.pk),
|
|
|
- }
|
|
|
-
|
|
|
- @classmethod
|
|
|
- def diff_summary(cls, page, data1, data2):
|
|
|
- diff = []
|
|
|
- data_fields = page.get_data_fields()
|
|
|
- hidden_types = (tuple, list, dict)
|
|
|
- for k, label in data_fields:
|
|
|
- value1 = data1.get(k)
|
|
|
- value2 = data2.get(k)
|
|
|
- if value2 == value1 or not value1 and not value2:
|
|
|
- continue
|
|
|
- is_hidden = isinstance(value1, hidden_types) or isinstance(
|
|
|
- value2, hidden_types
|
|
|
- )
|
|
|
-
|
|
|
- # Escapes newlines as they are used as separator inside summaries.
|
|
|
- if isinstance(value1, str):
|
|
|
- value1 = value1.replace("\n", r"\n")
|
|
|
- if isinstance(value2, str):
|
|
|
- value2 = value2.replace("\n", r"\n")
|
|
|
-
|
|
|
- if value2 and not value1:
|
|
|
- diff.append(
|
|
|
- (
|
|
|
- (_("“%s” set.") % label)
|
|
|
- if is_hidden
|
|
|
- else (_("“%s” set to “%s”.")) % (label, value2)
|
|
|
- )
|
|
|
- )
|
|
|
- elif value1 and not value2:
|
|
|
- diff.append(_("“%s” unset.") % label)
|
|
|
- else:
|
|
|
- diff.append(
|
|
|
- (
|
|
|
- (_("“%s” changed.") % label)
|
|
|
- if is_hidden
|
|
|
- else (
|
|
|
- _("“%s” changed from “%s” to “%s”.")
|
|
|
- % (label, value1, value2)
|
|
|
- )
|
|
|
- )
|
|
|
- )
|
|
|
- return "\n".join(diff)
|
|
|
-
|
|
|
- @classmethod
|
|
|
- def create_from_submission(cls, submission, revision_type):
|
|
|
- page = submission.form_page
|
|
|
- try:
|
|
|
- previous = cls.objects.for_submission(submission).latest(
|
|
|
- "created_at"
|
|
|
- )
|
|
|
- except cls.DoesNotExist:
|
|
|
- previous_data = {}
|
|
|
- else:
|
|
|
- previous_data = previous.get_data()
|
|
|
- filters = cls.get_filters_for(submission)
|
|
|
- data = submission.get_data(raw=True, add_metadata=False)
|
|
|
- data["status"] = submission.status
|
|
|
- if revision_type == cls.CREATED:
|
|
|
- summary = _("Submission created.")
|
|
|
- elif revision_type == cls.DELETED:
|
|
|
- summary = _("Submission deleted.")
|
|
|
- else:
|
|
|
- summary = cls.diff_summary(page, previous_data, data)
|
|
|
- if not summary: # Nothing changed.
|
|
|
- return
|
|
|
- filters.update(
|
|
|
- type=revision_type,
|
|
|
- data=json.dumps(data, cls=StreamFormJSONEncoder),
|
|
|
- summary=summary,
|
|
|
- )
|
|
|
- return cls.objects.create(**filters)
|
|
|
-
|
|
|
- def get_data(self):
|
|
|
- return json.loads(self.data)
|
|
|
-
|
|
|
-
|
|
|
-# ORIGINAL NORIPYT CODE.
|
|
|
-# We don't want these receivers triggering.
|
|
|
-
|
|
|
-# @receiver(post_save)
|
|
|
-# def create_submission_changed_revision(sender, **kwargs):
|
|
|
-# if not issubclass(sender, SessionFormSubmission):
|
|
|
-# return
|
|
|
-# submission = kwargs['instance']
|
|
|
-# created = kwargs['created']
|
|
|
-# SubmissionRevision.create_from_submission(
|
|
|
-# submission, (SubmissionRevision.CREATED if created
|
|
|
-# else SubmissionRevision.CHANGED))
|
|
|
-
|
|
|
-
|
|
|
-# @receiver(post_delete)
|
|
|
-# def create_submission_deleted_revision(sender, **kwargs):
|
|
|
-# if not issubclass(sender, SessionFormSubmission):
|
|
|
-# return
|
|
|
-# submission = kwargs['instance']
|
|
|
-# SubmissionRevision.create_from_submission(submission,
|
|
|
-# SubmissionRevision.DELETED)
|
|
|
-
|
|
|
-
|
|
|
-class StreamFormMixin:
|
|
|
- @property
|
|
|
- def current_step_session_key(self):
|
|
|
- return "%s:step" % self.pk
|
|
|
-
|
|
|
- def get_steps(self, request=None):
|
|
|
- if not hasattr(self, "steps"):
|
|
|
- steps = Steps(self, request=request)
|
|
|
- if request is None:
|
|
|
- return steps
|
|
|
- self.steps = steps
|
|
|
- return self.steps
|
|
|
-
|
|
|
- def get_form_fields(self, by_step=False):
|
|
|
- if by_step:
|
|
|
- return [step.get_form_fields() for step in self.get_steps()]
|
|
|
- form_fields = OrderedDict()
|
|
|
- for step_fields in self.get_form_fields(by_step=True):
|
|
|
- form_fields.update(step_fields)
|
|
|
- return form_fields
|
|
|
-
|
|
|
- def get_context(self, request, *args, **kwargs):
|
|
|
- context = super().get_context(request, *args, **kwargs)
|
|
|
- self.steps = self.get_steps(request)
|
|
|
- step_value = request.GET.get("step")
|
|
|
- if step_value is not None and step_value.isdigit():
|
|
|
- self.steps.current = int(step_value) - 1
|
|
|
- form = self.steps.get_current_form()
|
|
|
- context.update(
|
|
|
- steps=self.steps,
|
|
|
- step=self.steps.current,
|
|
|
- form=form,
|
|
|
- markups_and_bound_fields=list(
|
|
|
- self.steps.current.get_markups_and_bound_fields(form)
|
|
|
- ),
|
|
|
- )
|
|
|
- return context
|
|
|
-
|
|
|
- def get_storage(self):
|
|
|
- return default_storage
|
|
|
-
|
|
|
- @staticmethod
|
|
|
- def get_form_class_bases():
|
|
|
- return (Form,)
|
|
|
-
|
|
|
- @staticmethod
|
|
|
- def get_submission_class():
|
|
|
- return SessionFormSubmission
|
|
|
-
|
|
|
- def get_submission(self, request):
|
|
|
- Submission = self.get_submission_class()
|
|
|
- if request.user.is_authenticated:
|
|
|
- user_submission = (
|
|
|
- Submission.objects.filter(user=request.user, page=self)
|
|
|
- .order_by("-pk")
|
|
|
- .first()
|
|
|
- )
|
|
|
- if user_submission is None:
|
|
|
- return Submission(user=request.user, page=self, form_data="[]")
|
|
|
- return user_submission
|
|
|
-
|
|
|
- user_submission = (
|
|
|
- Submission.objects.filter(
|
|
|
- session_key=request.session.session_key, page=self
|
|
|
- )
|
|
|
- .order_by("-pk")
|
|
|
- .first()
|
|
|
- )
|
|
|
- if user_submission is None:
|
|
|
- return Submission(
|
|
|
- session_key=request.session.session_key,
|
|
|
- page=self,
|
|
|
- form_data="[]",
|
|
|
- )
|
|
|
- return user_submission
|
|
|
-
|
|
|
- def get_success_url(self):
|
|
|
- form_complete_models = [
|
|
|
- model
|
|
|
- for model in apps.get_models()
|
|
|
- if issubclass(model, FormCompleteMixin)
|
|
|
- ]
|
|
|
- cts = ContentType.objects.get_for_models(*form_complete_models).values()
|
|
|
- first_child = self.get_children().filter(content_type__in=cts).first()
|
|
|
- if first_child is None:
|
|
|
- return self.url
|
|
|
- return first_child.url
|
|
|
-
|
|
|
- def serve_success(self, request, *args, **kwargs):
|
|
|
- url = self.get_success_url()
|
|
|
- if url == self.url:
|
|
|
- messages.success(request, _("Successfully submitted the form."))
|
|
|
- return HttpResponseRedirect(url)
|
|
|
-
|
|
|
- def serve(self, request, *args, **kwargs):
|
|
|
- context = self.get_context(request)
|
|
|
- form = context["form"]
|
|
|
- if request.method == "POST" and form.is_valid():
|
|
|
- is_complete = self.steps.update_data()
|
|
|
- if is_complete:
|
|
|
- return self.serve_success(request, *args, **kwargs)
|
|
|
- return HttpResponseRedirect(self.url)
|
|
|
- return super().serve(self, request, *args, **kwargs)
|
|
|
-
|
|
|
- def get_data_fields(self, by_step=False, add_metadata=True):
|
|
|
- if by_step:
|
|
|
- return [
|
|
|
- [
|
|
|
- (field_name, field.label)
|
|
|
- for field_name, field in step_fields.items()
|
|
|
- ]
|
|
|
- for step_fields in self.get_form_fields(by_step=True)
|
|
|
- ]
|
|
|
-
|
|
|
- data_fields = []
|
|
|
- data_fields.extend(self.get_extra_data_fields())
|
|
|
- if add_metadata:
|
|
|
- data_fields.extend(
|
|
|
- (
|
|
|
- ("status", _("Status")),
|
|
|
- ("user", _("User")),
|
|
|
- ("submit_time", _("First modification")),
|
|
|
- ("last_modification", _("Last modification")),
|
|
|
- )
|
|
|
- )
|
|
|
- data_fields.extend(
|
|
|
- [
|
|
|
- (field_name, field_label)
|
|
|
- for step_data_fields in self.get_data_fields(by_step=True)
|
|
|
- for field_name, field_label in step_data_fields
|
|
|
- ]
|
|
|
- )
|
|
|
- return data_fields
|
|
|
-
|
|
|
- def get_extra_data_fields(self):
|
|
|
- return ()
|
|
|
-
|
|
|
- def get_extra_data(self, submission, raw=False):
|
|
|
- return {}
|
|
|
-
|
|
|
- def format_value(self, field, value):
|
|
|
- return value
|
|
|
-
|
|
|
-
|
|
|
-class ClosingFormMixin(Model):
|
|
|
- closing_at = DateTimeField()
|
|
|
-
|
|
|
- closed_template = None
|
|
|
-
|
|
|
- class Meta:
|
|
|
- abstract = True
|
|
|
-
|
|
|
- @property
|
|
|
- def is_closed(self):
|
|
|
- return now() > self.closing_at
|
|
|
-
|
|
|
- def get_closed_template(self, request, *args, **kwargs):
|
|
|
- if self.closed_template is None:
|
|
|
- template = self.get_template(request, *args, **kwargs)
|
|
|
- base, ext = os.path.splitext(template)
|
|
|
- return "%s_closed%s" % (base, ext)
|
|
|
- return self.closed_template
|
|
|
-
|
|
|
- def serve_closed(self, request, *args, **kwargs):
|
|
|
- return TemplateResponse(
|
|
|
- request,
|
|
|
- self.get_closed_template(request, *args, **kwargs),
|
|
|
- self.get_context(request, *args, **kwargs),
|
|
|
- )
|
|
|
-
|
|
|
- def serve(self, request, *args, **kwargs):
|
|
|
- if self.is_closed:
|
|
|
- return self.serve_closed(request, *args, **kwargs)
|
|
|
- return super().serve(request, *args, **kwargs)
|
|
|
-
|
|
|
-
|
|
|
-class FormCompleteMixin:
|
|
|
- def get_form_page(self):
|
|
|
- return self.get_parent().specific
|
|
|
-
|
|
|
- def serve(self, request, *args, **kwargs):
|
|
|
- form_page = self.get_form_page()
|
|
|
- if (
|
|
|
- isinstance(form_page, LoginRequiredMixin)
|
|
|
- and not request.user.is_authenticated()
|
|
|
- ):
|
|
|
- return HttpResponseRedirect(form_page.url)
|
|
|
- self.submission = form_page.get_submission(request)
|
|
|
- if (
|
|
|
- self.submission is not None
|
|
|
- and self.submission.is_complete
|
|
|
- or getattr(request, "is_preview", False)
|
|
|
- ):
|
|
|
- return super().serve(request, *args, **kwargs)
|
|
|
- return HttpResponseRedirect(form_page.url)
|
|
|
-
|
|
|
- def get_context(self, *args, **kwargs):
|
|
|
- context = super().get_context(*args, **kwargs)
|
|
|
- if hasattr(self, "submission"):
|
|
|
- context["submission"] = self.submission
|
|
|
- return context
|
|
|
-
|
|
|
-
|
|
|
-class LoginRequiredMixin:
|
|
|
- login_required_template = None
|
|
|
-
|
|
|
- def get_login_required_template(self, request, *args, **kwargs):
|
|
|
- if self.login_required_template is None:
|
|
|
- template = self.get_template(request, *args, **kwargs)
|
|
|
- base, ext = os.path.splitext(template)
|
|
|
- return "%s_login_required%s" % (base, ext)
|
|
|
- return self.login_required_template
|
|
|
-
|
|
|
- def serve_login_required(self, request, *args, **kwargs):
|
|
|
- return TemplateResponse(
|
|
|
- request,
|
|
|
- self.get_login_required_template(request, *args, **kwargs),
|
|
|
- self.get_context(request, *args, **kwargs),
|
|
|
- )
|
|
|
-
|
|
|
- def serve(self, request, *args, **kwargs):
|
|
|
- if not request.user.is_authenticated():
|
|
|
- return self.serve_login_required(request, *args, **kwargs)
|
|
|
- return super().serve(request, *args, **kwargs)
|
|
|
-
|
|
|
-
|
|
|
-class AbstractStreamForm(StreamFormMixin, AbstractForm):
|
|
|
- class Meta:
|
|
|
- abstract = True
|
|
|
-
|
|
|
-
|
|
|
-class AbstractEmailStreamForm(StreamFormMixin, AbstractEmailForm):
|
|
|
- class Meta:
|
|
|
- abstract = True
|