test_cookie.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. import json
  2. import random
  3. from django.conf import settings
  4. from django.contrib.messages import constants
  5. from django.contrib.messages.storage.base import Message
  6. from django.contrib.messages.storage.cookie import (
  7. CookieStorage, MessageDecoder, MessageEncoder,
  8. )
  9. from django.core.signing import get_cookie_signer
  10. from django.test import SimpleTestCase, override_settings
  11. from django.utils.crypto import get_random_string
  12. from django.utils.safestring import SafeData, mark_safe
  13. from .base import BaseTests
  14. def set_cookie_data(storage, messages, invalid=False, encode_empty=False):
  15. """
  16. Set ``request.COOKIES`` with the encoded data and remove the storage
  17. backend's loaded data cache.
  18. """
  19. encoded_data = storage._encode(messages, encode_empty=encode_empty)
  20. if invalid:
  21. # Truncate the first character so that the hash is invalid.
  22. encoded_data = encoded_data[1:]
  23. storage.request.COOKIES = {CookieStorage.cookie_name: encoded_data}
  24. if hasattr(storage, '_loaded_data'):
  25. del storage._loaded_data
  26. def stored_cookie_messages_count(storage, response):
  27. """
  28. Return an integer containing the number of messages stored.
  29. """
  30. # Get a list of cookies, excluding ones with a max-age of 0 (because
  31. # they have been marked for deletion).
  32. cookie = response.cookies.get(storage.cookie_name)
  33. if not cookie or cookie['max-age'] == 0:
  34. return 0
  35. data = storage._decode(cookie.value)
  36. if not data:
  37. return 0
  38. if data[-1] == CookieStorage.not_finished:
  39. data.pop()
  40. return len(data)
  41. @override_settings(SESSION_COOKIE_DOMAIN='.example.com', SESSION_COOKIE_SECURE=True, SESSION_COOKIE_HTTPONLY=True)
  42. class CookieTests(BaseTests, SimpleTestCase):
  43. storage_class = CookieStorage
  44. def stored_messages_count(self, storage, response):
  45. return stored_cookie_messages_count(storage, response)
  46. def test_get(self):
  47. storage = self.storage_class(self.get_request())
  48. # Set initial data.
  49. example_messages = ['test', 'me']
  50. set_cookie_data(storage, example_messages)
  51. # The message contains what's expected.
  52. self.assertEqual(list(storage), example_messages)
  53. @override_settings(SESSION_COOKIE_SAMESITE='Strict')
  54. def test_cookie_setings(self):
  55. """
  56. CookieStorage honors SESSION_COOKIE_DOMAIN, SESSION_COOKIE_SECURE, and
  57. SESSION_COOKIE_HTTPONLY (#15618, #20972).
  58. """
  59. # Test before the messages have been consumed
  60. storage = self.get_storage()
  61. response = self.get_response()
  62. storage.add(constants.INFO, 'test')
  63. storage.update(response)
  64. messages = storage._decode(response.cookies['messages'].value)
  65. self.assertEqual(len(messages), 1)
  66. self.assertEqual(messages[0].message, 'test')
  67. self.assertEqual(response.cookies['messages']['domain'], '.example.com')
  68. self.assertEqual(response.cookies['messages']['expires'], '')
  69. self.assertIs(response.cookies['messages']['secure'], True)
  70. self.assertIs(response.cookies['messages']['httponly'], True)
  71. self.assertEqual(response.cookies['messages']['samesite'], 'Strict')
  72. # Test deletion of the cookie (storing with an empty value) after the messages have been consumed
  73. storage = self.get_storage()
  74. response = self.get_response()
  75. storage.add(constants.INFO, 'test')
  76. for m in storage:
  77. pass # Iterate through the storage to simulate consumption of messages.
  78. storage.update(response)
  79. self.assertEqual(response.cookies['messages'].value, '')
  80. self.assertEqual(response.cookies['messages']['domain'], '.example.com')
  81. self.assertEqual(response.cookies['messages']['expires'], 'Thu, 01 Jan 1970 00:00:00 GMT')
  82. self.assertEqual(
  83. response.cookies['messages']['samesite'],
  84. settings.SESSION_COOKIE_SAMESITE,
  85. )
  86. def test_get_bad_cookie(self):
  87. request = self.get_request()
  88. storage = self.storage_class(request)
  89. # Set initial (invalid) data.
  90. example_messages = ['test', 'me']
  91. set_cookie_data(storage, example_messages, invalid=True)
  92. # The message actually contains what we expect.
  93. self.assertEqual(list(storage), [])
  94. def test_max_cookie_length(self):
  95. """
  96. If the data exceeds what is allowed in a cookie, older messages are
  97. removed before saving (and returned by the ``update`` method).
  98. """
  99. storage = self.get_storage()
  100. response = self.get_response()
  101. # When storing as a cookie, the cookie has constant overhead of approx
  102. # 54 chars, and each message has a constant overhead of about 37 chars
  103. # and a variable overhead of zero in the best case. We aim for a message
  104. # size which will fit 4 messages into the cookie, but not 5.
  105. # See also FallbackTest.test_session_fallback
  106. msg_size = int((CookieStorage.max_cookie_size - 54) / 4.5 - 37)
  107. first_msg = None
  108. # Generate the same (tested) content every time that does not get run
  109. # through zlib compression.
  110. random.seed(42)
  111. for i in range(5):
  112. msg = get_random_string(msg_size)
  113. storage.add(constants.INFO, msg)
  114. if i == 0:
  115. first_msg = msg
  116. unstored_messages = storage.update(response)
  117. cookie_storing = self.stored_messages_count(storage, response)
  118. self.assertEqual(cookie_storing, 4)
  119. self.assertEqual(len(unstored_messages), 1)
  120. self.assertEqual(unstored_messages[0].message, first_msg)
  121. def test_message_rfc6265(self):
  122. non_compliant_chars = ['\\', ',', ';', '"']
  123. messages = ['\\te,st', ';m"e', '\u2019', '123"NOTRECEIVED"']
  124. storage = self.get_storage()
  125. encoded = storage._encode(messages)
  126. for illegal in non_compliant_chars:
  127. self.assertEqual(encoded.find(illegal), -1)
  128. def test_json_encoder_decoder(self):
  129. """
  130. A complex nested data structure containing Message
  131. instances is properly encoded/decoded by the custom JSON
  132. encoder/decoder classes.
  133. """
  134. messages = [
  135. {
  136. 'message': Message(constants.INFO, 'Test message'),
  137. 'message_list': [
  138. Message(constants.INFO, 'message %s') for x in range(5)
  139. ] + [{'another-message': Message(constants.ERROR, 'error')}],
  140. },
  141. Message(constants.INFO, 'message %s'),
  142. ]
  143. encoder = MessageEncoder()
  144. value = encoder.encode(messages)
  145. decoded_messages = json.loads(value, cls=MessageDecoder)
  146. self.assertEqual(messages, decoded_messages)
  147. def test_safedata(self):
  148. """
  149. A message containing SafeData is keeping its safe status when
  150. retrieved from the message storage.
  151. """
  152. def encode_decode(data):
  153. message = Message(constants.DEBUG, data)
  154. encoded = storage._encode(message)
  155. decoded = storage._decode(encoded)
  156. return decoded.message
  157. storage = self.get_storage()
  158. self.assertIsInstance(encode_decode(mark_safe("<b>Hello Django!</b>")), SafeData)
  159. self.assertNotIsInstance(encode_decode("<b>Hello Django!</b>"), SafeData)
  160. def test_legacy_encode_decode(self):
  161. # RemovedInDjango41Warning: pre-Django 3.2 encoded messages will be
  162. # invalid.
  163. storage = self.storage_class(self.get_request())
  164. messages = ['this', 'that']
  165. # Encode/decode a message using the pre-Django 3.2 format.
  166. encoder = MessageEncoder()
  167. value = encoder.encode(messages)
  168. signer = get_cookie_signer(salt=storage.key_salt)
  169. encoded_messages = signer.sign(value)
  170. decoded_messages = storage._decode(encoded_messages)
  171. self.assertEqual(messages, decoded_messages)