views.py 11 KB


  1. import mimetypes
  2. import os
  3. from datetime import datetime
  4. from django.http import (
  5. Http404,
  6. HttpResponse,
  7. HttpResponsePermanentRedirect,
  8. JsonResponse,
  9. )
  10. from django.contrib.auth.decorators import login_required, permission_required
  11. from django.contrib.contenttypes.models import ContentType
  12. from django.core.paginator import (
  13. Paginator,
  14. InvalidPage,
  15. EmptyPage,
  16. PageNotAnInteger,
  17. )
  18. from django.shortcuts import redirect, render
  19. from django.utils import timezone
  20. from django.utils.translation import ngettext, gettext_lazy as _
  21. from django.views.decorators.http import require_POST
  22. from icalendar import Calendar
  23. from wagtail.admin import messages
  24. from wagtail.models import Page, get_page_models
  25. from wagtail.search.backends import get_search_backend
  26. from wagtail.search.backends.database.mysql.mysql import MySQLSearchBackend
  27. from coderedcms import utils
  28. from coderedcms.forms import SearchForm
  29. from coderedcms.models import CoderedPage, LayoutSettings
  30. from coderedcms.importexport import (
  31. convert_csv_to_json,
  32. import_pages,
  33. ImportPagesFromCSVFileForm,
  34. )
  35. from coderedcms.settings import crx_settings
  36. from coderedcms.templatetags.coderedcms_tags import get_name_of_class
  37. def search(request):
  38. """
  39. Searches pages across the entire site.
  40. """
  41. search_form = SearchForm(request.GET)
  42. pagetypes = []
  43. results = None
  44. results_paginated = None
  45. if search_form.is_valid():
  46. search_query = search_form.cleaned_data["s"]
  47. search_model = search_form.cleaned_data["t"]
  48. # get all page models
  49. pagemodels = sorted(get_page_models(), key=get_name_of_class)
  50. # filter based on is search_filterable
  51. for model in pagemodels:
  52. if hasattr(model, "search_filterable") and model.search_filterable:
  53. pagetypes.append(model)
  54. results = Page.objects.live()
  55. if search_model:
  56. try:
  57. # If provided a model name, try to get it
  58. model = ContentType.objects.get(
  59. model=search_model
  60. ).model_class()
  61. # Workaround for Wagtail MySQL search bug.
  62. # See: https://github.com/wagtail/wagtail/issues/11273
  63. backend = get_search_backend()
  64. if type(backend) is MySQLSearchBackend:
  65. results = model.objects.live()
  66. else:
  67. results = results.type(model)
  68. except ContentType.DoesNotExist:
  69. # Maintain existing behavior of only returning objects if the page type is real
  70. results = None
  71. # get and paginate results
  72. if results:
  73. results = results.search(search_query)
  74. paginator = Paginator(
  75. results, LayoutSettings.for_request(request).search_num_results
  76. )
  77. page = request.GET.get("p", 1)
  78. try:
  79. results_paginated = paginator.page(page)
  80. except PageNotAnInteger:
  81. results_paginated = paginator.page(1)
  82. except EmptyPage:
  83. results_paginated = paginator.page(1)
  84. except InvalidPage:
  85. results_paginated = paginator.page(1)
  86. # Render template
  87. return render(
  88. request,
  89. "coderedcms/pages/search.html",
  90. {
  91. "request": request,
  92. "pagetypes": pagetypes,
  93. "form": search_form,
  94. "results": results,
  95. "results_paginated": results_paginated,
  96. },
  97. )
  98. @login_required
  99. def serve_protected_file(request, path):
  100. """
  101. Function that serves protected files uploaded from forms.
  102. """
  103. # Fully resolve all provided paths.
  104. mediapath = os.path.abspath(crx_settings.CRX_PROTECTED_MEDIA_ROOT)
  105. fullpath = os.path.abspath(os.path.join(mediapath, path))
  106. # Path must be a sub-path of the PROTECTED_MEDIA_ROOT, and exist.
  107. if fullpath.startswith(mediapath) and os.path.isfile(fullpath):
  108. mimetype, encoding = mimetypes.guess_type(fullpath)
  109. with open(fullpath, "rb") as f:
  110. response = HttpResponse(f.read(), content_type=mimetype)
  111. if encoding:
  112. response["Content-Encoding"] = encoding
  113. return response
  114. raise Http404()
  115. def favicon(request):
  116. icon = LayoutSettings.for_request(request).favicon
  117. if icon:
  118. return HttpResponsePermanentRedirect(
  119. icon.get_rendition("fill-256x256|format-webp").url
  120. )
  121. raise Http404()
  122. def robots(request):
  123. return render(request, "robots.txt", content_type="text/plain")
  124. @require_POST
  125. def event_generate_single_ical_for_event(request):
  126. # Parse input.
  127. try:
  128. event_pk = request.POST["event_pk"]
  129. except KeyError:
  130. return HttpResponse("event_pk required", status=400)
  131. try:
  132. dt_start_str = utils.fix_ical_datetime_format(
  133. request.POST["datetime_start"]
  134. )
  135. dt_end_str = utils.fix_ical_datetime_format(
  136. request.POST["datetime_end"]
  137. )
  138. dt_start = None
  139. dt_end = None
  140. if dt_start_str:
  141. dt_start = datetime.strptime(dt_start_str, "%Y-%m-%dT%H:%M:%S%z")
  142. if dt_end_str:
  143. dt_end = datetime.strptime(dt_end_str, "%Y-%m-%dT%H:%M:%S%z")
  144. except KeyError:
  145. return HttpResponse(
  146. "datetime_start and datetime_end required.",
  147. status=400,
  148. )
  149. except ValueError:
  150. return HttpResponse(
  151. "datetime_start and datetime_end must be valid datetimes.",
  152. status=400,
  153. )
  154. # Get the page.
  155. try:
  156. event = CoderedPage.objects.get(pk=event_pk).specific
  157. except (CoderedPage.DoesNotExist, ValueError):
  158. raise Http404("Event does not exist")
  159. # Generate the ical file.
  160. ical = Calendar()
  161. ical.add("prodid", "-//Wagtail CRX//")
  162. ical.add("version", "2.0")
  163. ical.add_component(
  164. event.create_single_ical(dt_start=dt_start, dt_end=dt_end)
  165. )
  166. response = HttpResponse(ical.to_ical(), content_type="text/calendar")
  167. response["Filename"] = "{0}.ics".format(event.slug)
  168. response["Content-Disposition"] = "attachment; filename={0}.ics".format(
  169. event.slug
  170. )
  171. return response
  172. @require_POST
  173. def event_generate_recurring_ical_for_event(request):
  174. # Parse input.
  175. try:
  176. event_pk = request.POST["event_pk"]
  177. except KeyError:
  178. return HttpResponse("event_pk required", status=400)
  179. # Get the page.
  180. try:
  181. event = CoderedPage.objects.get(pk=event_pk).specific
  182. except (CoderedPage.DoesNotExist, ValueError):
  183. raise Http404("Event does not exist")
  184. # Generate the ical file.
  185. ical = Calendar()
  186. ical.add("prodid", "-//Wagtail CRX//")
  187. ical.add("version", "2.0")
  188. for e in event.create_recurring_ical():
  189. ical.add_component(e)
  190. response = HttpResponse(ical.to_ical(), content_type="text/calendar")
  191. response["Filename"] = "{0}.ics".format(event.slug)
  192. response["Content-Disposition"] = "attachment; filename={0}.ics".format(
  193. event.slug
  194. )
  195. return response
  196. @require_POST
  197. def event_generate_ical_for_calendar(request):
  198. # Parse input.
  199. try:
  200. page_id = request.POST["page_id"]
  201. except KeyError:
  202. return HttpResponse("page_id required", status=400)
  203. # Get the page.
  204. try:
  205. page = CoderedPage.objects.get(pk=page_id).specific
  206. except (CoderedPage.DoesNotExist, ValueError):
  207. raise Http404("Page does not exist")
  208. # Generate the ical file.
  209. ical = Calendar()
  210. ical.add("prodid", "-//Wagtail CRX//")
  211. ical.add("version", "2.0")
  212. for event_page in page.get_index_children():
  213. for e in event_page.specific.create_recurring_ical():
  214. ical.add_component(e)
  215. response = HttpResponse(ical.to_ical(), content_type="text/calendar")
  216. response["Filename"] = "calendar.ics"
  217. response["Content-Disposition"] = "attachment; filename=calendar.ics"
  218. return response
  219. def event_get_calendar_events(request):
  220. """
  221. JSON list of events compatible with fullcalendar.js
  222. """
  223. # Parse input.
  224. try:
  225. page_id = request.GET["pid"]
  226. except KeyError:
  227. return HttpResponse("pid required", status=400)
  228. start = None
  229. end = None
  230. start_str = request.GET.get("start", None)
  231. end_str = request.GET.get("end", None)
  232. try:
  233. if start_str:
  234. start = timezone.make_aware(
  235. datetime.strptime(start_str[:10], "%Y-%m-%d"),
  236. )
  237. if end_str:
  238. end = timezone.make_aware(
  239. datetime.strptime(end_str[:10], "%Y-%m-%d"),
  240. )
  241. except ValueError:
  242. return HttpResponse(
  243. "start and end must be valid datetimes.", status=400
  244. )
  245. # Get the page.
  246. try:
  247. page = CoderedPage.objects.get(pk=page_id).specific
  248. except (CoderedPage.DoesNotExist, ValueError):
  249. raise Http404("Page does not exist")
  250. return JsonResponse(
  251. page.get_calendar_events(start=start, end=end), safe=False
  252. )
  253. @login_required
  254. @permission_required(
  255. "wagtailadmin.access_admin",
  256. login_url="wagtailadmin_login",
  257. )
  258. def import_index(request):
  259. """
  260. Landing page to replace wagtailimportexport.
  261. """
  262. return render(request, "wagtailimportexport/index.html")
  263. @login_required
  264. @permission_required(
  265. "wagtailadmin.access_admin",
  266. login_url="wagtailadmin_login",
  267. )
  268. def import_pages_from_csv_file(request):
  269. """
  270. Overwrite of the `import_pages` view from wagtailimportexport. By default, the `import_pages`
  271. view expects a json file to be uploaded. This view converts the uploaded csv into the json
  272. format that the importer expects.
  273. """
  274. if request.method == "POST":
  275. form = ImportPagesFromCSVFileForm(request.POST, request.FILES)
  276. if form.is_valid():
  277. import_data = convert_csv_to_json(
  278. form.cleaned_data["file"].read().decode("utf-8").splitlines(),
  279. form.cleaned_data["page_type"],
  280. )
  281. parent_page = form.cleaned_data["parent_page"]
  282. try:
  283. page_count = import_pages(import_data, parent_page)
  284. except LookupError as e:
  285. messages.error(
  286. request, _("Import failed: %(reason)s") % {"reason": e}
  287. )
  288. else:
  289. messages.success(
  290. request,
  291. ngettext(
  292. "%(count)s page imported.",
  293. "%(count)s pages imported.",
  294. page_count,
  295. )
  296. % {"count": page_count},
  297. )
  298. return redirect("wagtailadmin_explore", parent_page.pk)
  299. else:
  300. form = ImportPagesFromCSVFileForm()
  301. return render(
  302. request,
  303. "wagtailimportexport/import_from_csv.html",
  304. {
  305. "form": form,
  306. },
  307. )