test_workflow.py 14 KB


  1. import datetime
  2. import pytz
  3. from django.contrib.auth import get_user_model
  4. from django.contrib.auth.models import Group
  5. from django.core.exceptions import ValidationError
  6. from django.db.utils import IntegrityError
  7. from django.test import TestCase, override_settings
  8. from freezegun import freeze_time
  9. from wagtail.core.models import (
  10. GroupApprovalTask, Page, Task, TaskState, Workflow, WorkflowPage, WorkflowState, WorkflowTask)
  11. from wagtail.tests.testapp.models import SimplePage
  12. class TestWorkflows(TestCase):
  13. fixtures = ['test.json']
  14. def create_workflow_and_tasks(self):
  15. workflow = Workflow.objects.create(name='test_workflow')
  16. task_1 = Task.objects.create(name='test_task_1')
  17. task_2 = Task.objects.create(name='test_task_2')
  18. WorkflowTask.objects.create(workflow=workflow, task=task_1, sort_order=1)
  19. WorkflowTask.objects.create(workflow=workflow, task=task_2, sort_order=2)
  20. return workflow, task_1, task_2
  21. def start_workflow_on_homepage(self):
  22. workflow, task_1, task_2 = self.create_workflow_and_tasks()
  23. homepage = Page.objects.get(url_path='/home/')
  24. homepage.save_revision()
  25. user = get_user_model().objects.first()
  26. workflow_state = workflow.start(homepage, user)
  27. return {'workflow_state': workflow_state, 'user': user, 'page': homepage, 'task_1': task_1, 'task_2': task_2, 'workflow': workflow}
  28. def test_create_workflow(self):
  29. # test creating and retrieving an empty Workflow from the db
  30. test_workflow = Workflow(name='test_workflow')
  31. test_workflow.save()
  32. retrieved_workflow = Workflow.objects.get(id=test_workflow.id)
  33. self.assertEqual(retrieved_workflow.name, 'test_workflow')
  34. def test_create_task(self):
  35. # test creating and retrieving a base Task from the db
  36. test_task = Task(name='test_task')
  37. test_task.save()
  38. retrieved_task = Task.objects.get(id=test_task.id)
  39. self.assertEqual(retrieved_task.name, 'test_task')
  40. def test_add_task_to_workflow(self):
  41. workflow = Workflow.objects.create(name='test_workflow')
  42. task = Task.objects.create(name='test_task')
  43. WorkflowTask.objects.create(workflow=workflow, task=task, sort_order=1)
  44. self.assertIn(task, Task.objects.filter(workflow_tasks__workflow=workflow))
  45. self.assertIn(workflow, Workflow.objects.filter(workflow_tasks__task=task))
  46. def test_add_workflow_to_page(self):
  47. # test adding a Workflow to a Page via WorkflowPage
  48. workflow = Workflow.objects.create(name='test_workflow')
  49. homepage = Page.objects.get(url_path='/home/')
  50. WorkflowPage.objects.create(page=homepage, workflow=workflow)
  51. homepage.refresh_from_db()
  52. self.assertEqual(homepage.workflowpage.workflow, workflow)
  53. def test_get_specific_task(self):
  54. # test ability to get instance of subclassed Task type using Task.specific
  55. group_approval_task = GroupApprovalTask.objects.create(name='test_group_approval')
  56. group_approval_task.groups.set(Group.objects.all())
  57. task = Task.objects.get(name='test_group_approval')
  58. specific_task = task.specific
  59. self.assertIsInstance(specific_task, GroupApprovalTask)
  60. def test_get_workflow_from_parent(self):
  61. # test ability to use Page.get_workflow() to retrieve a Workflow from a parent Page if none is set directly
  62. workflow = Workflow.objects.create(name='test_workflow')
  63. homepage = Page.objects.get(url_path='/home/')
  64. WorkflowPage.objects.create(page=homepage, workflow=workflow)
  65. hello_page = SimplePage(title="Hello world", slug='hello-world', content="hello")
  66. homepage.add_child(instance=hello_page)
  67. self.assertEqual(hello_page.get_workflow(), workflow)
  68. self.assertTrue(workflow.all_pages().filter(id=hello_page.id).exists())
  69. def test_get_workflow_from_closest_ancestor(self):
  70. # test that using Page.get_workflow() tries to get the workflow from itself, then the closest ancestor, and does
  71. # not get Workflows from further up the page tree first
  72. workflow_1 = Workflow.objects.create(name='test_workflow_1')
  73. workflow_2 = Workflow.objects.create(name='test_workflow_2')
  74. homepage = Page.objects.get(url_path='/home/')
  75. WorkflowPage.objects.create(page=homepage, workflow=workflow_1)
  76. hello_page = SimplePage(title="Hello world", slug='hello-world', content="hello")
  77. homepage.add_child(instance=hello_page)
  78. WorkflowPage.objects.create(page=hello_page, workflow=workflow_2)
  79. goodbye_page = SimplePage(title="Goodbye world", slug='goodbye-world', content="goodbye")
  80. hello_page.add_child(instance=goodbye_page)
  81. self.assertEqual(hello_page.get_workflow(), workflow_2)
  82. self.assertEqual(goodbye_page.get_workflow(), workflow_2)
  83. # Check the .all_pages() method
  84. self.assertFalse(workflow_1.all_pages().filter(id=hello_page.id).exists())
  85. self.assertFalse(workflow_1.all_pages().filter(id=goodbye_page.id).exists())
  86. self.assertTrue(workflow_2.all_pages().filter(id=hello_page.id).exists())
  87. self.assertTrue(workflow_2.all_pages().filter(id=goodbye_page.id).exists())
  88. @freeze_time("2017-01-01 12:00:00")
  89. def test_start_workflow_on_page(self):
  90. # test the first WorkflowState and TaskState models are set up correctly when Workflow.start(page) is used.
  91. workflow, task_1, task_2 = self.create_workflow_and_tasks()
  92. homepage = Page.objects.get(url_path='/home/')
  93. homepage.save_revision()
  94. user = get_user_model().objects.first()
  95. workflow_state = workflow.start(homepage, user)
  96. self.assertEqual(workflow_state.workflow, workflow)
  97. self.assertEqual(workflow_state.page, homepage)
  98. self.assertEqual(workflow_state.status, 'in_progress')
  99. self.assertEqual(workflow_state.created_at, datetime.datetime(2017, 1, 1, 12, 0, 0, tzinfo=pytz.utc))
  100. self.assertEqual(workflow_state.requested_by, user)
  101. task_state = workflow_state.current_task_state
  102. self.assertEqual(task_state.task, task_1)
  103. self.assertEqual(task_state.status, 'in_progress')
  104. self.assertEqual(task_state.page_revision, homepage.get_latest_revision())
  105. self.assertEqual(task_state.started_at, datetime.datetime(2017, 1, 1, 12, 0, 0, tzinfo=pytz.utc))
  106. self.assertEqual(task_state.finished_at, None)
  107. def test_error_when_starting_multiple_in_progress_workflows(self):
  108. # test trying to start multiple status='in_progress' workflows on a single page will trigger an IntegrityError
  109. self.start_workflow_on_homepage()
  110. with self.assertRaises((IntegrityError, ValidationError)):
  111. self.start_workflow_on_homepage()
  112. @freeze_time("2017-01-01 12:00:00")
  113. def test_approve_workflow(self):
  114. # tests that approving both TaskStates in a Workflow via Task.on_action approves tasks and publishes the revision correctly
  115. data = self.start_workflow_on_homepage()
  116. workflow_state = data['workflow_state']
  117. task_2 = data['task_2']
  118. page = data['page']
  119. task_state = workflow_state.current_task_state
  120. task_state.task.on_action(task_state, user=None, action_name='approve')
  121. self.assertEqual(task_state.finished_at, datetime.datetime(2017, 1, 1, 12, 0, 0, tzinfo=pytz.utc))
  122. self.assertEqual(task_state.status, 'approved')
  123. self.assertEqual(workflow_state.current_task_state.task, task_2)
  124. task_2.on_action(workflow_state.current_task_state, user=None, action_name='approve')
  125. self.assertEqual(workflow_state.status, 'approved')
  126. page.refresh_from_db()
  127. self.assertEqual(page.live_revision, workflow_state.current_task_state.page_revision)
  128. @override_settings(WAGTAIL_WORKFLOW_REQUIRE_REAPPROVAL_ON_EDIT=True)
  129. def test_workflow_resets_when_new_revision_created(self):
  130. # test that a Workflow on its second Task returns to its first task (upon WorkflowState.update()) if a new revision is created
  131. data = self.start_workflow_on_homepage()
  132. workflow_state = data['workflow_state']
  133. task_1 = data['task_1']
  134. task_2 = data['task_2']
  135. page = data['page']
  136. task_state = workflow_state.current_task_state
  137. task_state.task.on_action(task_state, user=None, action_name='approve')
  138. self.assertEqual(workflow_state.current_task_state.task, task_2)
  139. page.save_revision()
  140. workflow_state.refresh_from_db()
  141. task_state = workflow_state.current_task_state
  142. task_state.task.on_action(task_state, user=None, action_name='approve')
  143. workflow_state.refresh_from_db()
  144. task_state = workflow_state.current_task_state
  145. self.assertEqual(task_state.task, task_1)
  146. @override_settings(WAGTAIL_WORKFLOW_REQUIRE_REAPPROVAL_ON_EDIT=False)
  147. def test_workflow_does_not_reset_when_new_revision_created_if_reapproval_turned_off(self):
  148. # test that a Workflow on its second Task does not return to its first task (upon approval) if a new revision is created
  149. data = self.start_workflow_on_homepage()
  150. workflow_state = data['workflow_state']
  151. task_1 = data['task_1']
  152. task_2 = data['task_2']
  153. page = data['page']
  154. task_state = workflow_state.current_task_state
  155. task_state.task.on_action(task_state, user=None, action_name='approve')
  156. self.assertEqual(workflow_state.current_task_state.task, task_2)
  157. page.save_revision()
  158. workflow_state.refresh_from_db()
  159. task_state = workflow_state.current_task_state
  160. task_state.task.on_action(task_state, user=None, action_name='approve')
  161. workflow_state.refresh_from_db()
  162. task_state = workflow_state.current_task_state
  163. self.assertNotEqual(task_state.task, task_1)
  164. self.assertEqual(workflow_state.status, workflow_state.STATUS_APPROVED)
  165. def test_reject_workflow(self):
  166. # test that TaskState is marked as rejected upon Task.on_action with action=reject
  167. # and the WorkflowState as needs changes
  168. data = self.start_workflow_on_homepage()
  169. workflow_state = data['workflow_state']
  170. task_state = workflow_state.current_task_state
  171. task_state.task.on_action(task_state, user=None, action_name='reject')
  172. self.assertEqual(task_state.status, task_state.STATUS_REJECTED)
  173. self.assertEqual(workflow_state.status, workflow_state.STATUS_NEEDS_CHANGES)
  174. def test_resume_workflow(self):
  175. # test that a Workflow rejected on its second Task can be resumed on the second task
  176. data = self.start_workflow_on_homepage()
  177. workflow_state = data['workflow_state']
  178. task_2 = data['task_2']
  179. workflow_state.current_task_state.approve(user=None)
  180. workflow_state.refresh_from_db()
  181. workflow_state.current_task_state.reject(user=None)
  182. workflow_state.refresh_from_db()
  183. workflow_state.resume(user=None)
  184. self.assertEqual(workflow_state.status, workflow_state.STATUS_IN_PROGRESS)
  185. self.assertEqual(workflow_state.current_task_state.status, workflow_state.current_task_state.STATUS_IN_PROGRESS)
  186. self.assertEqual(workflow_state.current_task_state.task, task_2)
  187. self.assertTrue(workflow_state.is_active)
  188. def test_tasks_with_status_on_resubmission(self):
  189. # test that a Workflow rejected and resumed shows the status of the latest tasks when _`all_tasks_with_status` is called
  190. data = self.start_workflow_on_homepage()
  191. workflow_state = data['workflow_state']
  192. tasks = workflow_state.all_tasks_with_status()
  193. self.assertEqual(tasks[0].status, TaskState.STATUS_IN_PROGRESS)
  194. self.assertEqual(tasks[1].status_display, 'Not started')
  195. workflow_state.current_task_state.approve(user=None)
  196. workflow_state.refresh_from_db()
  197. workflow_state.current_task_state.reject(user=None)
  198. workflow_state.refresh_from_db()
  199. tasks = workflow_state.all_tasks_with_status()
  200. self.assertEqual(tasks[0].status, TaskState.STATUS_APPROVED)
  201. self.assertEqual(tasks[1].status, TaskState.STATUS_REJECTED)
  202. workflow_state.resume(user=None)
  203. tasks = workflow_state.all_tasks_with_status()
  204. self.assertEqual(tasks[0].status, TaskState.STATUS_APPROVED)
  205. self.assertEqual(tasks[1].status, TaskState.STATUS_IN_PROGRESS)
  206. def cancel_workflow(self):
  207. # test that cancelling a workflow state sets both current task state and its own statuses to cancelled, and cancels all in progress states
  208. data = self.start_workflow_on_homepage()
  209. workflow_state = data['workflow_state']
  210. workflow_state.cancel(user=None)
  211. self.assertEqual(workflow_state.status, WorkflowState.STATUS_CANCELLED)
  212. self.assertEqual(workflow_state.current_task_state.status, TaskState.STATUS_CANCELLED)
  213. self.assertFalse(TaskState.objects.filter(workflow_state=workflow_state, status=TaskState.STATUS_IN_PROGRESS).exists())
  214. self.assertFalse(workflow_state.is_active)
  215. def test_task_workflows(self):
  216. workflow = Workflow.objects.create(name='test_workflow')
  217. disabled_workflow = Workflow.objects.create(name="disabled_workflow", active=False)
  218. task = Task.objects.create(name='test_task')
  219. WorkflowTask.objects.create(workflow=workflow, task=task, sort_order=1)
  220. WorkflowTask.objects.create(workflow=disabled_workflow, task=task, sort_order=1)
  221. self.assertEqual(
  222. list(task.workflows), [workflow, disabled_workflow]
  223. )
  224. self.assertEqual(
  225. list(task.active_workflows), [workflow]
  226. )
  227. def test_is_at_final_task(self):
  228. # test that a Workflow rejected on its second Task can be resumed on the second task
  229. data = self.start_workflow_on_homepage()
  230. workflow_state = data['workflow_state']
  231. self.assertFalse(workflow_state.is_at_final_task)
  232. workflow_state.current_task_state.approve(user=None)
  233. workflow_state.refresh_from_db()
  234. self.assertTrue(workflow_state.is_at_final_task)