# base.py (14 KB)
  1. from django.contrib.messages import Message, constants, get_level, set_level
  2. from django.contrib.messages.api import MessageFailure
  3. from django.contrib.messages.constants import DEFAULT_LEVELS
  4. from django.contrib.messages.storage import default_storage
  5. from django.http import HttpRequest, HttpResponse
  6. from django.test import modify_settings, override_settings
  7. from django.urls import reverse
  8. from django.utils.translation import gettext_lazy
  9. def add_level_messages(storage):
  10. """
  11. Add 6 messages from different levels (including a custom one) to a storage
  12. instance.
  13. """
  14. storage.add(constants.INFO, "A generic info message")
  15. storage.add(29, "Some custom level")
  16. storage.add(constants.DEBUG, "A debugging message", extra_tags="extra-tag")
  17. storage.add(constants.WARNING, "A warning")
  18. storage.add(constants.ERROR, "An error")
  19. storage.add(constants.SUCCESS, "This was a triumph.")
  20. class BaseTests:
  21. storage_class = default_storage
  22. levels = {
  23. "debug": constants.DEBUG,
  24. "info": constants.INFO,
  25. "success": constants.SUCCESS,
  26. "warning": constants.WARNING,
  27. "error": constants.ERROR,
  28. }
  29. def setUp(self):
  30. self.settings_override = override_settings(
  31. TEMPLATES=[
  32. {
  33. "BACKEND": "django.template.backends.django.DjangoTemplates",
  34. "DIRS": [],
  35. "APP_DIRS": True,
  36. "OPTIONS": {
  37. "context_processors": (
  38. "django.contrib.auth.context_processors.auth",
  39. "django.contrib.messages.context_processors.messages",
  40. ),
  41. },
  42. }
  43. ],
  44. ROOT_URLCONF="messages_tests.urls",
  45. MESSAGE_TAGS={},
  46. MESSAGE_STORAGE="%s.%s"
  47. % (self.storage_class.__module__, self.storage_class.__name__),
  48. SESSION_SERIALIZER="django.contrib.sessions.serializers.JSONSerializer",
  49. )
  50. self.settings_override.enable()
  51. def tearDown(self):
  52. self.settings_override.disable()
  53. def get_request(self):
  54. return HttpRequest()
  55. def get_response(self):
  56. return HttpResponse()
  57. def get_storage(self, data=None):
  58. """
  59. Return the storage backend, setting its loaded data to the ``data``
  60. argument.
  61. This method avoids the storage ``_get`` method from getting called so
  62. that other parts of the storage backend can be tested independent of
  63. the message retrieval logic.
  64. """
  65. storage = self.storage_class(self.get_request())
  66. storage._loaded_data = data or []
  67. return storage
  68. def test_repr(self):
  69. request = self.get_request()
  70. storage = self.storage_class(request)
  71. self.assertEqual(
  72. repr(storage),
  73. f"<{self.storage_class.__qualname__}: request=<HttpRequest>>",
  74. )
  75. def test_add(self):
  76. storage = self.get_storage()
  77. self.assertFalse(storage.added_new)
  78. storage.add(constants.INFO, "Test message 1")
  79. self.assertTrue(storage.added_new)
  80. storage.add(constants.INFO, "Test message 2", extra_tags="tag")
  81. self.assertEqual(len(storage), 2)
  82. def test_add_lazy_translation(self):
  83. storage = self.get_storage()
  84. response = self.get_response()
  85. storage.add(constants.INFO, gettext_lazy("lazy message"))
  86. storage.update(response)
  87. storing = self.stored_messages_count(storage, response)
  88. self.assertEqual(storing, 1)
  89. def test_no_update(self):
  90. storage = self.get_storage()
  91. response = self.get_response()
  92. storage.update(response)
  93. storing = self.stored_messages_count(storage, response)
  94. self.assertEqual(storing, 0)
  95. def test_add_update(self):
  96. storage = self.get_storage()
  97. response = self.get_response()
  98. storage.add(constants.INFO, "Test message 1")
  99. storage.add(constants.INFO, "Test message 1", extra_tags="tag")
  100. storage.update(response)
  101. storing = self.stored_messages_count(storage, response)
  102. self.assertEqual(storing, 2)
  103. def test_existing_add_read_update(self):
  104. storage = self.get_existing_storage()
  105. response = self.get_response()
  106. storage.add(constants.INFO, "Test message 3")
  107. list(storage) # Simulates a read
  108. storage.update(response)
  109. storing = self.stored_messages_count(storage, response)
  110. self.assertEqual(storing, 0)
  111. def test_existing_read_add_update(self):
  112. storage = self.get_existing_storage()
  113. response = self.get_response()
  114. list(storage) # Simulates a read
  115. storage.add(constants.INFO, "Test message 3")
  116. storage.update(response)
  117. storing = self.stored_messages_count(storage, response)
  118. self.assertEqual(storing, 1)
  119. @override_settings(MESSAGE_LEVEL=constants.DEBUG)
  120. def test_full_request_response_cycle(self):
  121. """
  122. With the message middleware enabled, messages are properly stored and
  123. retrieved across the full request/redirect/response cycle.
  124. """
  125. data = {
  126. "messages": ["Test message %d" % x for x in range(5)],
  127. }
  128. show_url = reverse("show_message")
  129. for level in ("debug", "info", "success", "warning", "error"):
  130. add_url = reverse("add_message", args=(level,))
  131. response = self.client.post(add_url, data, follow=True)
  132. self.assertRedirects(response, show_url)
  133. self.assertIn("messages", response.context)
  134. messages = [Message(self.levels[level], msg) for msg in data["messages"]]
  135. self.assertEqual(list(response.context["messages"]), messages)
  136. for msg in data["messages"]:
  137. self.assertContains(response, msg)
  138. @override_settings(MESSAGE_LEVEL=constants.DEBUG)
  139. def test_with_template_response(self):
  140. data = {
  141. "messages": ["Test message %d" % x for x in range(5)],
  142. }
  143. show_url = reverse("show_template_response")
  144. for level in self.levels:
  145. add_url = reverse("add_template_response", args=(level,))
  146. response = self.client.post(add_url, data, follow=True)
  147. self.assertRedirects(response, show_url)
  148. self.assertIn("messages", response.context)
  149. for msg in data["messages"]:
  150. self.assertContains(response, msg)
  151. # there shouldn't be any messages on second GET request
  152. response = self.client.get(show_url)
  153. for msg in data["messages"]:
  154. self.assertNotContains(response, msg)
  155. def test_context_processor_message_levels(self):
  156. show_url = reverse("show_template_response")
  157. response = self.client.get(show_url)
  158. self.assertIn("DEFAULT_MESSAGE_LEVELS", response.context)
  159. self.assertEqual(response.context["DEFAULT_MESSAGE_LEVELS"], DEFAULT_LEVELS)
  160. @override_settings(MESSAGE_LEVEL=constants.DEBUG)
  161. def test_multiple_posts(self):
  162. """
  163. Messages persist properly when multiple POSTs are made before a GET.
  164. """
  165. data = {
  166. "messages": ["Test message %d" % x for x in range(5)],
  167. }
  168. show_url = reverse("show_message")
  169. messages = []
  170. for level in ("debug", "info", "success", "warning", "error"):
  171. messages.extend(
  172. Message(self.levels[level], msg) for msg in data["messages"]
  173. )
  174. add_url = reverse("add_message", args=(level,))
  175. self.client.post(add_url, data)
  176. response = self.client.get(show_url)
  177. self.assertIn("messages", response.context)
  178. self.assertEqual(list(response.context["messages"]), messages)
  179. for msg in data["messages"]:
  180. self.assertContains(response, msg)
  181. @modify_settings(
  182. INSTALLED_APPS={"remove": "django.contrib.messages"},
  183. MIDDLEWARE={"remove": "django.contrib.messages.middleware.MessageMiddleware"},
  184. )
  185. @override_settings(
  186. MESSAGE_LEVEL=constants.DEBUG,
  187. TEMPLATES=[
  188. {
  189. "BACKEND": "django.template.backends.django.DjangoTemplates",
  190. "DIRS": [],
  191. "APP_DIRS": True,
  192. }
  193. ],
  194. )
  195. def test_middleware_disabled(self):
  196. """
  197. When the middleware is disabled, an exception is raised when one
  198. attempts to store a message.
  199. """
  200. data = {
  201. "messages": ["Test message %d" % x for x in range(5)],
  202. }
  203. reverse("show_message")
  204. for level in ("debug", "info", "success", "warning", "error"):
  205. add_url = reverse("add_message", args=(level,))
  206. with self.assertRaises(MessageFailure):
  207. self.client.post(add_url, data, follow=True)
  208. @modify_settings(
  209. INSTALLED_APPS={"remove": "django.contrib.messages"},
  210. MIDDLEWARE={"remove": "django.contrib.messages.middleware.MessageMiddleware"},
  211. )
  212. @override_settings(
  213. TEMPLATES=[
  214. {
  215. "BACKEND": "django.template.backends.django.DjangoTemplates",
  216. "DIRS": [],
  217. "APP_DIRS": True,
  218. }
  219. ],
  220. )
  221. def test_middleware_disabled_fail_silently(self):
  222. """
  223. When the middleware is disabled, an exception is not raised
  224. if 'fail_silently' is True.
  225. """
  226. data = {
  227. "messages": ["Test message %d" % x for x in range(5)],
  228. "fail_silently": True,
  229. }
  230. show_url = reverse("show_message")
  231. for level in ("debug", "info", "success", "warning", "error"):
  232. add_url = reverse("add_message", args=(level,))
  233. response = self.client.post(add_url, data, follow=True)
  234. self.assertRedirects(response, show_url)
  235. self.assertNotIn("messages", response.context)
  236. def stored_messages_count(self, storage, response):
  237. """
  238. Return the number of messages being stored after a
  239. ``storage.update()`` call.
  240. """
  241. raise NotImplementedError("This method must be set by a subclass.")
  242. def test_get(self):
  243. raise NotImplementedError("This method must be set by a subclass.")
  244. def get_existing_storage(self):
  245. return self.get_storage(
  246. [
  247. Message(constants.INFO, "Test message 1"),
  248. Message(constants.INFO, "Test message 2", extra_tags="tag"),
  249. ]
  250. )
  251. def test_existing_read(self):
  252. """
  253. Reading the existing storage doesn't cause the data to be lost.
  254. """
  255. storage = self.get_existing_storage()
  256. self.assertFalse(storage.used)
  257. # After iterating the storage engine directly, the used flag is set.
  258. data = list(storage)
  259. self.assertTrue(storage.used)
  260. # The data does not disappear because it has been iterated.
  261. self.assertEqual(data, list(storage))
  262. def test_existing_add(self):
  263. storage = self.get_existing_storage()
  264. self.assertFalse(storage.added_new)
  265. storage.add(constants.INFO, "Test message 3")
  266. self.assertTrue(storage.added_new)
  267. def test_default_level(self):
  268. # get_level works even with no storage on the request.
  269. request = self.get_request()
  270. self.assertEqual(get_level(request), constants.INFO)
  271. # get_level returns the default level if it hasn't been set.
  272. storage = self.get_storage()
  273. request._messages = storage
  274. self.assertEqual(get_level(request), constants.INFO)
  275. # Only messages of sufficient level get recorded.
  276. add_level_messages(storage)
  277. self.assertEqual(len(storage), 5)
  278. def test_low_level(self):
  279. request = self.get_request()
  280. storage = self.storage_class(request)
  281. request._messages = storage
  282. self.assertTrue(set_level(request, 5))
  283. self.assertEqual(get_level(request), 5)
  284. add_level_messages(storage)
  285. self.assertEqual(len(storage), 6)
  286. def test_high_level(self):
  287. request = self.get_request()
  288. storage = self.storage_class(request)
  289. request._messages = storage
  290. self.assertTrue(set_level(request, 30))
  291. self.assertEqual(get_level(request), 30)
  292. add_level_messages(storage)
  293. self.assertEqual(len(storage), 2)
  294. @override_settings(MESSAGE_LEVEL=29)
  295. def test_settings_level(self):
  296. request = self.get_request()
  297. storage = self.storage_class(request)
  298. self.assertEqual(get_level(request), 29)
  299. add_level_messages(storage)
  300. self.assertEqual(len(storage), 3)
  301. def test_tags(self):
  302. storage = self.get_storage()
  303. storage.level = 0
  304. add_level_messages(storage)
  305. storage.add(constants.INFO, "A generic info message", extra_tags=None)
  306. tags = [msg.tags for msg in storage]
  307. self.assertEqual(
  308. tags, ["info", "", "extra-tag debug", "warning", "error", "success", "info"]
  309. )
  310. def test_level_tag(self):
  311. storage = self.get_storage()
  312. storage.level = 0
  313. add_level_messages(storage)
  314. tags = [msg.level_tag for msg in storage]
  315. self.assertEqual(tags, ["info", "", "debug", "warning", "error", "success"])
  316. @override_settings(
  317. MESSAGE_TAGS={
  318. constants.INFO: "info",
  319. constants.DEBUG: "",
  320. constants.WARNING: "",
  321. constants.ERROR: "bad",
  322. 29: "custom",
  323. }
  324. )
  325. def test_custom_tags(self):
  326. storage = self.get_storage()
  327. storage.level = 0
  328. add_level_messages(storage)
  329. tags = [msg.tags for msg in storage]
  330. self.assertEqual(tags, ["info", "custom", "extra-tag", "", "bad", "success"])