base.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. from django import http
  2. from django.contrib.messages import constants, get_level, set_level, utils
  3. from django.contrib.messages.api import MessageFailure
  4. from django.contrib.messages.constants import DEFAULT_LEVELS
  5. from django.contrib.messages.storage import base, default_storage
  6. from django.contrib.messages.storage.base import Message
  7. from django.test import modify_settings, override_settings
  8. from django.urls import reverse
  9. from django.utils.translation import ugettext_lazy
  10. def add_level_messages(storage):
  11. """
  12. Adds 6 messages from different levels (including a custom one) to a storage
  13. instance.
  14. """
  15. storage.add(constants.INFO, 'A generic info message')
  16. storage.add(29, 'Some custom level')
  17. storage.add(constants.DEBUG, 'A debugging message', extra_tags='extra-tag')
  18. storage.add(constants.WARNING, 'A warning')
  19. storage.add(constants.ERROR, 'An error')
  20. storage.add(constants.SUCCESS, 'This was a triumph.')
  21. class override_settings_tags(override_settings):
  22. def enable(self):
  23. super(override_settings_tags, self).enable()
  24. # LEVEL_TAGS is a constant defined in the
  25. # django.contrib.messages.storage.base module, so after changing
  26. # settings.MESSAGE_TAGS, we need to update that constant too.
  27. self.old_level_tags = base.LEVEL_TAGS
  28. base.LEVEL_TAGS = utils.get_level_tags()
  29. def disable(self):
  30. super(override_settings_tags, self).disable()
  31. base.LEVEL_TAGS = self.old_level_tags
  32. class BaseTests(object):
  33. storage_class = default_storage
  34. levels = {
  35. 'debug': constants.DEBUG,
  36. 'info': constants.INFO,
  37. 'success': constants.SUCCESS,
  38. 'warning': constants.WARNING,
  39. 'error': constants.ERROR,
  40. }
  41. def setUp(self):
  42. self.settings_override = override_settings_tags(
  43. TEMPLATES=[{
  44. 'BACKEND': 'django.template.backends.django.DjangoTemplates',
  45. 'DIRS': [],
  46. 'APP_DIRS': True,
  47. 'OPTIONS': {
  48. 'context_processors': (
  49. 'django.contrib.auth.context_processors.auth',
  50. 'django.contrib.messages.context_processors.messages',
  51. ),
  52. },
  53. }],
  54. ROOT_URLCONF='messages_tests.urls',
  55. MESSAGE_TAGS='',
  56. MESSAGE_STORAGE='%s.%s' % (self.storage_class.__module__,
  57. self.storage_class.__name__),
  58. SESSION_SERIALIZER='django.contrib.sessions.serializers.JSONSerializer',
  59. )
  60. self.settings_override.enable()
  61. def tearDown(self):
  62. self.settings_override.disable()
  63. def get_request(self):
  64. return http.HttpRequest()
  65. def get_response(self):
  66. return http.HttpResponse()
  67. def get_storage(self, data=None):
  68. """
  69. Returns the storage backend, setting its loaded data to the ``data``
  70. argument.
  71. This method avoids the storage ``_get`` method from getting called so
  72. that other parts of the storage backend can be tested independent of
  73. the message retrieval logic.
  74. """
  75. storage = self.storage_class(self.get_request())
  76. storage._loaded_data = data or []
  77. return storage
  78. def test_add(self):
  79. storage = self.get_storage()
  80. self.assertFalse(storage.added_new)
  81. storage.add(constants.INFO, 'Test message 1')
  82. self.assertTrue(storage.added_new)
  83. storage.add(constants.INFO, 'Test message 2', extra_tags='tag')
  84. self.assertEqual(len(storage), 2)
  85. def test_add_lazy_translation(self):
  86. storage = self.get_storage()
  87. response = self.get_response()
  88. storage.add(constants.INFO, ugettext_lazy('lazy message'))
  89. storage.update(response)
  90. storing = self.stored_messages_count(storage, response)
  91. self.assertEqual(storing, 1)
  92. def test_no_update(self):
  93. storage = self.get_storage()
  94. response = self.get_response()
  95. storage.update(response)
  96. storing = self.stored_messages_count(storage, response)
  97. self.assertEqual(storing, 0)
  98. def test_add_update(self):
  99. storage = self.get_storage()
  100. response = self.get_response()
  101. storage.add(constants.INFO, 'Test message 1')
  102. storage.add(constants.INFO, 'Test message 1', extra_tags='tag')
  103. storage.update(response)
  104. storing = self.stored_messages_count(storage, response)
  105. self.assertEqual(storing, 2)
  106. def test_existing_add_read_update(self):
  107. storage = self.get_existing_storage()
  108. response = self.get_response()
  109. storage.add(constants.INFO, 'Test message 3')
  110. list(storage) # Simulates a read
  111. storage.update(response)
  112. storing = self.stored_messages_count(storage, response)
  113. self.assertEqual(storing, 0)
  114. def test_existing_read_add_update(self):
  115. storage = self.get_existing_storage()
  116. response = self.get_response()
  117. list(storage) # Simulates a read
  118. storage.add(constants.INFO, 'Test message 3')
  119. storage.update(response)
  120. storing = self.stored_messages_count(storage, response)
  121. self.assertEqual(storing, 1)
  122. @override_settings(MESSAGE_LEVEL=constants.DEBUG)
  123. def test_full_request_response_cycle(self):
  124. """
  125. With the message middleware enabled, tests that messages are properly
  126. stored and then retrieved across the full request/redirect/response
  127. cycle.
  128. """
  129. data = {
  130. 'messages': ['Test message %d' % x for x in range(5)],
  131. }
  132. show_url = reverse('show_message')
  133. for level in ('debug', 'info', 'success', 'warning', 'error'):
  134. add_url = reverse('add_message', args=(level,))
  135. response = self.client.post(add_url, data, follow=True)
  136. self.assertRedirects(response, show_url)
  137. self.assertIn('messages', response.context)
  138. messages = [Message(self.levels[level], msg) for msg in data['messages']]
  139. self.assertEqual(list(response.context['messages']), messages)
  140. for msg in data['messages']:
  141. self.assertContains(response, msg)
  142. @override_settings(MESSAGE_LEVEL=constants.DEBUG)
  143. def test_with_template_response(self):
  144. data = {
  145. 'messages': ['Test message %d' % x for x in range(5)],
  146. }
  147. show_url = reverse('show_template_response')
  148. for level in self.levels.keys():
  149. add_url = reverse('add_template_response', args=(level,))
  150. response = self.client.post(add_url, data, follow=True)
  151. self.assertRedirects(response, show_url)
  152. self.assertIn('messages', response.context)
  153. for msg in data['messages']:
  154. self.assertContains(response, msg)
  155. # there shouldn't be any messages on second GET request
  156. response = self.client.get(show_url)
  157. for msg in data['messages']:
  158. self.assertNotContains(response, msg)
  159. def test_context_processor_message_levels(self):
  160. show_url = reverse('show_template_response')
  161. response = self.client.get(show_url)
  162. self.assertIn('DEFAULT_MESSAGE_LEVELS', response.context)
  163. self.assertEqual(response.context['DEFAULT_MESSAGE_LEVELS'], DEFAULT_LEVELS)
  164. @override_settings(MESSAGE_LEVEL=constants.DEBUG)
  165. def test_multiple_posts(self):
  166. """
  167. Tests that messages persist properly when multiple POSTs are made
  168. before a GET.
  169. """
  170. data = {
  171. 'messages': ['Test message %d' % x for x in range(5)],
  172. }
  173. show_url = reverse('show_message')
  174. messages = []
  175. for level in ('debug', 'info', 'success', 'warning', 'error'):
  176. messages.extend(Message(self.levels[level], msg) for msg in data['messages'])
  177. add_url = reverse('add_message', args=(level,))
  178. self.client.post(add_url, data)
  179. response = self.client.get(show_url)
  180. self.assertIn('messages', response.context)
  181. self.assertEqual(list(response.context['messages']), messages)
  182. for msg in data['messages']:
  183. self.assertContains(response, msg)
  184. @modify_settings(
  185. INSTALLED_APPS={'remove': 'django.contrib.messages'},
  186. MIDDLEWARE={'remove': 'django.contrib.messages.middleware.MessageMiddleware'},
  187. )
  188. @override_settings(
  189. MESSAGE_LEVEL=constants.DEBUG,
  190. TEMPLATES=[{
  191. 'BACKEND': 'django.template.backends.django.DjangoTemplates',
  192. 'DIRS': [],
  193. 'APP_DIRS': True,
  194. }],
  195. )
  196. def test_middleware_disabled(self):
  197. """
  198. Tests that, when the middleware is disabled, an exception is raised
  199. when one attempts to store a message.
  200. """
  201. data = {
  202. 'messages': ['Test message %d' % x for x in range(5)],
  203. }
  204. reverse('show_message')
  205. for level in ('debug', 'info', 'success', 'warning', 'error'):
  206. add_url = reverse('add_message', args=(level,))
  207. with self.assertRaises(MessageFailure):
  208. self.client.post(add_url, data, follow=True)
  209. @modify_settings(
  210. INSTALLED_APPS={'remove': 'django.contrib.messages'},
  211. MIDDLEWARE={'remove': 'django.contrib.messages.middleware.MessageMiddleware'},
  212. )
  213. @override_settings(
  214. TEMPLATES=[{
  215. 'BACKEND': 'django.template.backends.django.DjangoTemplates',
  216. 'DIRS': [],
  217. 'APP_DIRS': True,
  218. }],
  219. )
  220. def test_middleware_disabled_fail_silently(self):
  221. """
  222. Tests that, when the middleware is disabled, an exception is not
  223. raised if 'fail_silently' = True
  224. """
  225. data = {
  226. 'messages': ['Test message %d' % x for x in range(5)],
  227. 'fail_silently': True,
  228. }
  229. show_url = reverse('show_message')
  230. for level in ('debug', 'info', 'success', 'warning', 'error'):
  231. add_url = reverse('add_message', args=(level,))
  232. response = self.client.post(add_url, data, follow=True)
  233. self.assertRedirects(response, show_url)
  234. self.assertNotIn('messages', response.context)
  235. def stored_messages_count(self, storage, response):
  236. """
  237. Returns the number of messages being stored after a
  238. ``storage.update()`` call.
  239. """
  240. raise NotImplementedError('This method must be set by a subclass.')
  241. def test_get(self):
  242. raise NotImplementedError('This method must be set by a subclass.')
  243. def get_existing_storage(self):
  244. return self.get_storage([Message(constants.INFO, 'Test message 1'),
  245. Message(constants.INFO, 'Test message 2',
  246. extra_tags='tag')])
  247. def test_existing_read(self):
  248. """
  249. Tests that reading the existing storage doesn't cause the data to be
  250. lost.
  251. """
  252. storage = self.get_existing_storage()
  253. self.assertFalse(storage.used)
  254. # After iterating the storage engine directly, the used flag is set.
  255. data = list(storage)
  256. self.assertTrue(storage.used)
  257. # The data does not disappear because it has been iterated.
  258. self.assertEqual(data, list(storage))
  259. def test_existing_add(self):
  260. storage = self.get_existing_storage()
  261. self.assertFalse(storage.added_new)
  262. storage.add(constants.INFO, 'Test message 3')
  263. self.assertTrue(storage.added_new)
  264. def test_default_level(self):
  265. # get_level works even with no storage on the request.
  266. request = self.get_request()
  267. self.assertEqual(get_level(request), constants.INFO)
  268. # get_level returns the default level if it hasn't been set.
  269. storage = self.get_storage()
  270. request._messages = storage
  271. self.assertEqual(get_level(request), constants.INFO)
  272. # Only messages of sufficient level get recorded.
  273. add_level_messages(storage)
  274. self.assertEqual(len(storage), 5)
  275. def test_low_level(self):
  276. request = self.get_request()
  277. storage = self.storage_class(request)
  278. request._messages = storage
  279. self.assertTrue(set_level(request, 5))
  280. self.assertEqual(get_level(request), 5)
  281. add_level_messages(storage)
  282. self.assertEqual(len(storage), 6)
  283. def test_high_level(self):
  284. request = self.get_request()
  285. storage = self.storage_class(request)
  286. request._messages = storage
  287. self.assertTrue(set_level(request, 30))
  288. self.assertEqual(get_level(request), 30)
  289. add_level_messages(storage)
  290. self.assertEqual(len(storage), 2)
  291. @override_settings(MESSAGE_LEVEL=29)
  292. def test_settings_level(self):
  293. request = self.get_request()
  294. storage = self.storage_class(request)
  295. self.assertEqual(get_level(request), 29)
  296. add_level_messages(storage)
  297. self.assertEqual(len(storage), 3)
  298. def test_tags(self):
  299. storage = self.get_storage()
  300. storage.level = 0
  301. add_level_messages(storage)
  302. tags = [msg.tags for msg in storage]
  303. self.assertEqual(tags, ['info', '', 'extra-tag debug', 'warning', 'error', 'success'])
  304. def test_level_tag(self):
  305. storage = self.get_storage()
  306. storage.level = 0
  307. add_level_messages(storage)
  308. tags = [msg.level_tag for msg in storage]
  309. self.assertEqual(tags, ['info', '', 'debug', 'warning', 'error', 'success'])
  310. @override_settings_tags(MESSAGE_TAGS={
  311. constants.INFO: 'info',
  312. constants.DEBUG: '',
  313. constants.WARNING: '',
  314. constants.ERROR: 'bad',
  315. 29: 'custom',
  316. }
  317. )
  318. def test_custom_tags(self):
  319. storage = self.get_storage()
  320. storage.level = 0
  321. add_level_messages(storage)
  322. tags = [msg.tags for msg in storage]
  323. self.assertEqual(tags, ['info', 'custom', 'extra-tag', '', 'bad', 'success'])