base.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375
  1. from django.contrib.messages import constants, get_level, set_level, utils
  2. from django.contrib.messages.api import MessageFailure
  3. from django.contrib.messages.constants import DEFAULT_LEVELS
  4. from django.contrib.messages.storage import base, default_storage
  5. from django.contrib.messages.storage.base import Message
  6. from django.http import HttpRequest, HttpResponse
  7. from django.test import modify_settings, override_settings
  8. from django.urls import reverse
  9. from django.utils.translation import gettext_lazy
  10. def add_level_messages(storage):
  11. """
  12. Add 6 messages from different levels (including a custom one) to a storage
  13. instance.
  14. """
  15. storage.add(constants.INFO, 'A generic info message')
  16. storage.add(29, 'Some custom level')
  17. storage.add(constants.DEBUG, 'A debugging message', extra_tags='extra-tag')
  18. storage.add(constants.WARNING, 'A warning')
  19. storage.add(constants.ERROR, 'An error')
  20. storage.add(constants.SUCCESS, 'This was a triumph.')
  21. class override_settings_tags(override_settings):
  22. def enable(self):
  23. super().enable()
  24. # LEVEL_TAGS is a constant defined in the
  25. # django.contrib.messages.storage.base module, so after changing
  26. # settings.MESSAGE_TAGS, update that constant also.
  27. self.old_level_tags = base.LEVEL_TAGS
  28. base.LEVEL_TAGS = utils.get_level_tags()
  29. def disable(self):
  30. super().disable()
  31. base.LEVEL_TAGS = self.old_level_tags
  32. class BaseTests:
  33. storage_class = default_storage
  34. levels = {
  35. 'debug': constants.DEBUG,
  36. 'info': constants.INFO,
  37. 'success': constants.SUCCESS,
  38. 'warning': constants.WARNING,
  39. 'error': constants.ERROR,
  40. }
  41. def setUp(self):
  42. self.settings_override = override_settings_tags(
  43. TEMPLATES=[{
  44. 'BACKEND': 'django.template.backends.django.DjangoTemplates',
  45. 'DIRS': [],
  46. 'APP_DIRS': True,
  47. 'OPTIONS': {
  48. 'context_processors': (
  49. 'django.contrib.auth.context_processors.auth',
  50. 'django.contrib.messages.context_processors.messages',
  51. ),
  52. },
  53. }],
  54. ROOT_URLCONF='messages_tests.urls',
  55. MESSAGE_TAGS='',
  56. MESSAGE_STORAGE='%s.%s' % (self.storage_class.__module__, self.storage_class.__name__),
  57. SESSION_SERIALIZER='django.contrib.sessions.serializers.JSONSerializer',
  58. )
  59. self.settings_override.enable()
  60. def tearDown(self):
  61. self.settings_override.disable()
  62. def get_request(self):
  63. return HttpRequest()
  64. def get_response(self):
  65. return HttpResponse()
  66. def get_storage(self, data=None):
  67. """
  68. Return the storage backend, setting its loaded data to the ``data``
  69. argument.
  70. This method avoids the storage ``_get`` method from getting called so
  71. that other parts of the storage backend can be tested independent of
  72. the message retrieval logic.
  73. """
  74. storage = self.storage_class(self.get_request())
  75. storage._loaded_data = data or []
  76. return storage
  77. def test_add(self):
  78. storage = self.get_storage()
  79. self.assertFalse(storage.added_new)
  80. storage.add(constants.INFO, 'Test message 1')
  81. self.assertTrue(storage.added_new)
  82. storage.add(constants.INFO, 'Test message 2', extra_tags='tag')
  83. self.assertEqual(len(storage), 2)
  84. def test_add_lazy_translation(self):
  85. storage = self.get_storage()
  86. response = self.get_response()
  87. storage.add(constants.INFO, gettext_lazy('lazy message'))
  88. storage.update(response)
  89. storing = self.stored_messages_count(storage, response)
  90. self.assertEqual(storing, 1)
  91. def test_no_update(self):
  92. storage = self.get_storage()
  93. response = self.get_response()
  94. storage.update(response)
  95. storing = self.stored_messages_count(storage, response)
  96. self.assertEqual(storing, 0)
  97. def test_add_update(self):
  98. storage = self.get_storage()
  99. response = self.get_response()
  100. storage.add(constants.INFO, 'Test message 1')
  101. storage.add(constants.INFO, 'Test message 1', extra_tags='tag')
  102. storage.update(response)
  103. storing = self.stored_messages_count(storage, response)
  104. self.assertEqual(storing, 2)
  105. def test_existing_add_read_update(self):
  106. storage = self.get_existing_storage()
  107. response = self.get_response()
  108. storage.add(constants.INFO, 'Test message 3')
  109. list(storage) # Simulates a read
  110. storage.update(response)
  111. storing = self.stored_messages_count(storage, response)
  112. self.assertEqual(storing, 0)
  113. def test_existing_read_add_update(self):
  114. storage = self.get_existing_storage()
  115. response = self.get_response()
  116. list(storage) # Simulates a read
  117. storage.add(constants.INFO, 'Test message 3')
  118. storage.update(response)
  119. storing = self.stored_messages_count(storage, response)
  120. self.assertEqual(storing, 1)
  121. @override_settings(MESSAGE_LEVEL=constants.DEBUG)
  122. def test_full_request_response_cycle(self):
  123. """
  124. With the message middleware enabled, messages are properly stored and
  125. retrieved across the full request/redirect/response cycle.
  126. """
  127. data = {
  128. 'messages': ['Test message %d' % x for x in range(5)],
  129. }
  130. show_url = reverse('show_message')
  131. for level in ('debug', 'info', 'success', 'warning', 'error'):
  132. add_url = reverse('add_message', args=(level,))
  133. response = self.client.post(add_url, data, follow=True)
  134. self.assertRedirects(response, show_url)
  135. self.assertIn('messages', response.context)
  136. messages = [Message(self.levels[level], msg) for msg in data['messages']]
  137. self.assertEqual(list(response.context['messages']), messages)
  138. for msg in data['messages']:
  139. self.assertContains(response, msg)
  140. @override_settings(MESSAGE_LEVEL=constants.DEBUG)
  141. def test_with_template_response(self):
  142. data = {
  143. 'messages': ['Test message %d' % x for x in range(5)],
  144. }
  145. show_url = reverse('show_template_response')
  146. for level in self.levels:
  147. add_url = reverse('add_template_response', args=(level,))
  148. response = self.client.post(add_url, data, follow=True)
  149. self.assertRedirects(response, show_url)
  150. self.assertIn('messages', response.context)
  151. for msg in data['messages']:
  152. self.assertContains(response, msg)
  153. # there shouldn't be any messages on second GET request
  154. response = self.client.get(show_url)
  155. for msg in data['messages']:
  156. self.assertNotContains(response, msg)
  157. def test_context_processor_message_levels(self):
  158. show_url = reverse('show_template_response')
  159. response = self.client.get(show_url)
  160. self.assertIn('DEFAULT_MESSAGE_LEVELS', response.context)
  161. self.assertEqual(response.context['DEFAULT_MESSAGE_LEVELS'], DEFAULT_LEVELS)
  162. @override_settings(MESSAGE_LEVEL=constants.DEBUG)
  163. def test_multiple_posts(self):
  164. """
  165. Messages persist properly when multiple POSTs are made before a GET.
  166. """
  167. data = {
  168. 'messages': ['Test message %d' % x for x in range(5)],
  169. }
  170. show_url = reverse('show_message')
  171. messages = []
  172. for level in ('debug', 'info', 'success', 'warning', 'error'):
  173. messages.extend(Message(self.levels[level], msg) for msg in data['messages'])
  174. add_url = reverse('add_message', args=(level,))
  175. self.client.post(add_url, data)
  176. response = self.client.get(show_url)
  177. self.assertIn('messages', response.context)
  178. self.assertEqual(list(response.context['messages']), messages)
  179. for msg in data['messages']:
  180. self.assertContains(response, msg)
  181. @modify_settings(
  182. INSTALLED_APPS={'remove': 'django.contrib.messages'},
  183. MIDDLEWARE={'remove': 'django.contrib.messages.middleware.MessageMiddleware'},
  184. )
  185. @override_settings(
  186. MESSAGE_LEVEL=constants.DEBUG,
  187. TEMPLATES=[{
  188. 'BACKEND': 'django.template.backends.django.DjangoTemplates',
  189. 'DIRS': [],
  190. 'APP_DIRS': True,
  191. }],
  192. )
  193. def test_middleware_disabled(self):
  194. """
  195. When the middleware is disabled, an exception is raised when one
  196. attempts to store a message.
  197. """
  198. data = {
  199. 'messages': ['Test message %d' % x for x in range(5)],
  200. }
  201. reverse('show_message')
  202. for level in ('debug', 'info', 'success', 'warning', 'error'):
  203. add_url = reverse('add_message', args=(level,))
  204. with self.assertRaises(MessageFailure):
  205. self.client.post(add_url, data, follow=True)
  206. @modify_settings(
  207. INSTALLED_APPS={'remove': 'django.contrib.messages'},
  208. MIDDLEWARE={'remove': 'django.contrib.messages.middleware.MessageMiddleware'},
  209. )
  210. @override_settings(
  211. TEMPLATES=[{
  212. 'BACKEND': 'django.template.backends.django.DjangoTemplates',
  213. 'DIRS': [],
  214. 'APP_DIRS': True,
  215. }],
  216. )
  217. def test_middleware_disabled_fail_silently(self):
  218. """
  219. When the middleware is disabled, an exception is not raised
  220. if 'fail_silently' = True
  221. """
  222. data = {
  223. 'messages': ['Test message %d' % x for x in range(5)],
  224. 'fail_silently': True,
  225. }
  226. show_url = reverse('show_message')
  227. for level in ('debug', 'info', 'success', 'warning', 'error'):
  228. add_url = reverse('add_message', args=(level,))
  229. response = self.client.post(add_url, data, follow=True)
  230. self.assertRedirects(response, show_url)
  231. self.assertNotIn('messages', response.context)
  232. def stored_messages_count(self, storage, response):
  233. """
  234. Return the number of messages being stored after a
  235. ``storage.update()`` call.
  236. """
  237. raise NotImplementedError('This method must be set by a subclass.')
  238. def test_get(self):
  239. raise NotImplementedError('This method must be set by a subclass.')
  240. def get_existing_storage(self):
  241. return self.get_storage([
  242. Message(constants.INFO, 'Test message 1'),
  243. Message(constants.INFO, 'Test message 2', extra_tags='tag'),
  244. ])
  245. def test_existing_read(self):
  246. """
  247. Reading the existing storage doesn't cause the data to be lost.
  248. """
  249. storage = self.get_existing_storage()
  250. self.assertFalse(storage.used)
  251. # After iterating the storage engine directly, the used flag is set.
  252. data = list(storage)
  253. self.assertTrue(storage.used)
  254. # The data does not disappear because it has been iterated.
  255. self.assertEqual(data, list(storage))
  256. def test_existing_add(self):
  257. storage = self.get_existing_storage()
  258. self.assertFalse(storage.added_new)
  259. storage.add(constants.INFO, 'Test message 3')
  260. self.assertTrue(storage.added_new)
  261. def test_default_level(self):
  262. # get_level works even with no storage on the request.
  263. request = self.get_request()
  264. self.assertEqual(get_level(request), constants.INFO)
  265. # get_level returns the default level if it hasn't been set.
  266. storage = self.get_storage()
  267. request._messages = storage
  268. self.assertEqual(get_level(request), constants.INFO)
  269. # Only messages of sufficient level get recorded.
  270. add_level_messages(storage)
  271. self.assertEqual(len(storage), 5)
  272. def test_low_level(self):
  273. request = self.get_request()
  274. storage = self.storage_class(request)
  275. request._messages = storage
  276. self.assertTrue(set_level(request, 5))
  277. self.assertEqual(get_level(request), 5)
  278. add_level_messages(storage)
  279. self.assertEqual(len(storage), 6)
  280. def test_high_level(self):
  281. request = self.get_request()
  282. storage = self.storage_class(request)
  283. request._messages = storage
  284. self.assertTrue(set_level(request, 30))
  285. self.assertEqual(get_level(request), 30)
  286. add_level_messages(storage)
  287. self.assertEqual(len(storage), 2)
  288. @override_settings(MESSAGE_LEVEL=29)
  289. def test_settings_level(self):
  290. request = self.get_request()
  291. storage = self.storage_class(request)
  292. self.assertEqual(get_level(request), 29)
  293. add_level_messages(storage)
  294. self.assertEqual(len(storage), 3)
  295. def test_tags(self):
  296. storage = self.get_storage()
  297. storage.level = 0
  298. add_level_messages(storage)
  299. storage.add(constants.INFO, 'A generic info message', extra_tags=None)
  300. tags = [msg.tags for msg in storage]
  301. self.assertEqual(tags, ['info', '', 'extra-tag debug', 'warning', 'error', 'success', 'info'])
  302. def test_level_tag(self):
  303. storage = self.get_storage()
  304. storage.level = 0
  305. add_level_messages(storage)
  306. tags = [msg.level_tag for msg in storage]
  307. self.assertEqual(tags, ['info', '', 'debug', 'warning', 'error', 'success'])
  308. @override_settings_tags(MESSAGE_TAGS={
  309. constants.INFO: 'info',
  310. constants.DEBUG: '',
  311. constants.WARNING: '',
  312. constants.ERROR: 'bad',
  313. 29: 'custom',
  314. })
  315. def test_custom_tags(self):
  316. storage = self.get_storage()
  317. storage.level = 0
  318. add_level_messages(storage)
  319. tags = [msg.tags for msg in storage]
  320. self.assertEqual(tags, ['info', 'custom', 'extra-tag', '', 'bad', 'success'])