tests.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406
  1. # -*- coding: utf-8 -*-
  2. from __future__ import unicode_literals
  3. from datetime import datetime
  4. from django.core import serializers
  5. from django.core.serializers import SerializerDoesNotExist
  6. from django.core.serializers.base import ProgressBar
  7. from django.db import connection, transaction
  8. from django.http import HttpResponse
  9. from django.test import (
  10. SimpleTestCase, mock, override_settings, skipUnlessDBFeature,
  11. )
  12. from django.test.utils import Approximate
  13. from django.utils.functional import curry
  14. from django.utils.six import StringIO
  15. from .models import (
  16. Actor, Article, Author, AuthorProfile, BaseModel, Category, ComplexModel,
  17. Movie, Player, ProxyBaseModel, ProxyProxyBaseModel, Score, Team,
  18. )
  19. @override_settings(
  20. SERIALIZATION_MODULES={
  21. "json2": "django.core.serializers.json",
  22. }
  23. )
  24. class SerializerRegistrationTests(SimpleTestCase):
  25. def setUp(self):
  26. self.old_serializers = serializers._serializers
  27. serializers._serializers = {}
  28. def tearDown(self):
  29. serializers._serializers = self.old_serializers
  30. def test_register(self):
  31. "Registering a new serializer populates the full registry. Refs #14823"
  32. serializers.register_serializer('json3', 'django.core.serializers.json')
  33. public_formats = serializers.get_public_serializer_formats()
  34. self.assertIn('json3', public_formats)
  35. self.assertIn('json2', public_formats)
  36. self.assertIn('xml', public_formats)
  37. def test_unregister(self):
  38. "Unregistering a serializer doesn't cause the registry to be repopulated. Refs #14823"
  39. serializers.unregister_serializer('xml')
  40. serializers.register_serializer('json3', 'django.core.serializers.json')
  41. public_formats = serializers.get_public_serializer_formats()
  42. self.assertNotIn('xml', public_formats)
  43. self.assertIn('json3', public_formats)
  44. def test_unregister_unknown_serializer(self):
  45. with self.assertRaises(SerializerDoesNotExist):
  46. serializers.unregister_serializer("nonsense")
  47. def test_builtin_serializers(self):
  48. "Requesting a list of serializer formats popuates the registry"
  49. all_formats = set(serializers.get_serializer_formats())
  50. public_formats = set(serializers.get_public_serializer_formats())
  51. self.assertIn('xml', all_formats),
  52. self.assertIn('xml', public_formats)
  53. self.assertIn('json2', all_formats)
  54. self.assertIn('json2', public_formats)
  55. self.assertIn('python', all_formats)
  56. self.assertNotIn('python', public_formats)
  57. def test_get_unknown_serializer(self):
  58. """
  59. #15889: get_serializer('nonsense') raises a SerializerDoesNotExist
  60. """
  61. with self.assertRaises(SerializerDoesNotExist):
  62. serializers.get_serializer("nonsense")
  63. with self.assertRaises(KeyError):
  64. serializers.get_serializer("nonsense")
  65. # SerializerDoesNotExist is instantiated with the nonexistent format
  66. with self.assertRaises(SerializerDoesNotExist) as cm:
  67. serializers.get_serializer("nonsense")
  68. self.assertEqual(cm.exception.args, ("nonsense",))
  69. def test_get_unknown_deserializer(self):
  70. with self.assertRaises(SerializerDoesNotExist):
  71. serializers.get_deserializer("nonsense")
  72. class SerializersTestBase(object):
  73. serializer_name = None # Set by subclasses to the serialization format name
  74. @staticmethod
  75. def _comparison_value(value):
  76. return value
  77. def setUp(self):
  78. sports = Category.objects.create(name="Sports")
  79. music = Category.objects.create(name="Music")
  80. op_ed = Category.objects.create(name="Op-Ed")
  81. self.joe = Author.objects.create(name="Joe")
  82. self.jane = Author.objects.create(name="Jane")
  83. self.a1 = Article(
  84. author=self.jane,
  85. headline="Poker has no place on ESPN",
  86. pub_date=datetime(2006, 6, 16, 11, 00)
  87. )
  88. self.a1.save()
  89. self.a1.categories.set([sports, op_ed])
  90. self.a2 = Article(
  91. author=self.joe,
  92. headline="Time to reform copyright",
  93. pub_date=datetime(2006, 6, 16, 13, 00, 11, 345)
  94. )
  95. self.a2.save()
  96. self.a2.categories.set([music, op_ed])
  97. def test_serialize(self):
  98. """Tests that basic serialization works."""
  99. serial_str = serializers.serialize(self.serializer_name,
  100. Article.objects.all())
  101. self.assertTrue(self._validate_output(serial_str))
  102. def test_serializer_roundtrip(self):
  103. """Tests that serialized content can be deserialized."""
  104. serial_str = serializers.serialize(self.serializer_name,
  105. Article.objects.all())
  106. models = list(serializers.deserialize(self.serializer_name, serial_str))
  107. self.assertEqual(len(models), 2)
  108. def test_serialize_to_stream(self):
  109. obj = ComplexModel(field1='first', field2='second', field3='third')
  110. obj.save_base(raw=True)
  111. # Serialize the test database to a stream
  112. for stream in (StringIO(), HttpResponse()):
  113. serializers.serialize(self.serializer_name, [obj], indent=2, stream=stream)
  114. # Serialize normally for a comparison
  115. string_data = serializers.serialize(self.serializer_name, [obj], indent=2)
  116. # Check that the two are the same
  117. if isinstance(stream, StringIO):
  118. self.assertEqual(string_data, stream.getvalue())
  119. else:
  120. self.assertEqual(string_data, stream.content.decode('utf-8'))
  121. def test_serialize_specific_fields(self):
  122. obj = ComplexModel(field1='first', field2='second', field3='third')
  123. obj.save_base(raw=True)
  124. # Serialize then deserialize the test database
  125. serialized_data = serializers.serialize(
  126. self.serializer_name, [obj], indent=2, fields=('field1', 'field3')
  127. )
  128. result = next(serializers.deserialize(self.serializer_name, serialized_data))
  129. # Check that the deserialized object contains data in only the serialized fields.
  130. self.assertEqual(result.object.field1, 'first')
  131. self.assertEqual(result.object.field2, '')
  132. self.assertEqual(result.object.field3, 'third')
  133. def test_altering_serialized_output(self):
  134. """
  135. Tests the ability to create new objects by
  136. modifying serialized content.
  137. """
  138. old_headline = "Poker has no place on ESPN"
  139. new_headline = "Poker has no place on television"
  140. serial_str = serializers.serialize(self.serializer_name,
  141. Article.objects.all())
  142. serial_str = serial_str.replace(old_headline, new_headline)
  143. models = list(serializers.deserialize(self.serializer_name, serial_str))
  144. # Prior to saving, old headline is in place
  145. self.assertTrue(Article.objects.filter(headline=old_headline))
  146. self.assertFalse(Article.objects.filter(headline=new_headline))
  147. for model in models:
  148. model.save()
  149. # After saving, new headline is in place
  150. self.assertTrue(Article.objects.filter(headline=new_headline))
  151. self.assertFalse(Article.objects.filter(headline=old_headline))
  152. def test_one_to_one_as_pk(self):
  153. """
  154. Tests that if you use your own primary key field
  155. (such as a OneToOneField), it doesn't appear in the
  156. serialized field list - it replaces the pk identifier.
  157. """
  158. profile = AuthorProfile(author=self.joe,
  159. date_of_birth=datetime(1970, 1, 1))
  160. profile.save()
  161. serial_str = serializers.serialize(self.serializer_name,
  162. AuthorProfile.objects.all())
  163. self.assertFalse(self._get_field_values(serial_str, 'author'))
  164. for obj in serializers.deserialize(self.serializer_name, serial_str):
  165. self.assertEqual(obj.object.pk, self._comparison_value(self.joe.pk))
  166. def test_serialize_field_subset(self):
  167. """Tests that output can be restricted to a subset of fields"""
  168. valid_fields = ('headline', 'pub_date')
  169. invalid_fields = ("author", "categories")
  170. serial_str = serializers.serialize(self.serializer_name,
  171. Article.objects.all(),
  172. fields=valid_fields)
  173. for field_name in invalid_fields:
  174. self.assertFalse(self._get_field_values(serial_str, field_name))
  175. for field_name in valid_fields:
  176. self.assertTrue(self._get_field_values(serial_str, field_name))
  177. def test_serialize_unicode(self):
  178. """Tests that unicode makes the roundtrip intact"""
  179. actor_name = "Za\u017c\u00f3\u0142\u0107"
  180. movie_title = 'G\u0119\u015bl\u0105 ja\u017a\u0144'
  181. ac = Actor(name=actor_name)
  182. mv = Movie(title=movie_title, actor=ac)
  183. ac.save()
  184. mv.save()
  185. serial_str = serializers.serialize(self.serializer_name, [mv])
  186. self.assertEqual(self._get_field_values(serial_str, "title")[0], movie_title)
  187. self.assertEqual(self._get_field_values(serial_str, "actor")[0], actor_name)
  188. obj_list = list(serializers.deserialize(self.serializer_name, serial_str))
  189. mv_obj = obj_list[0].object
  190. self.assertEqual(mv_obj.title, movie_title)
  191. def test_serialize_progressbar(self):
  192. fake_stdout = StringIO()
  193. serializers.serialize(
  194. self.serializer_name, Article.objects.all(),
  195. progress_output=fake_stdout, object_count=Article.objects.count()
  196. )
  197. self.assertTrue(
  198. fake_stdout.getvalue().endswith('[' + '.' * ProgressBar.progress_width + ']\n')
  199. )
  200. def test_serialize_superfluous_queries(self):
  201. """Ensure no superfluous queries are made when serializing ForeignKeys
  202. #17602
  203. """
  204. ac = Actor(name='Actor name')
  205. ac.save()
  206. mv = Movie(title='Movie title', actor_id=ac.pk)
  207. mv.save()
  208. with self.assertNumQueries(0):
  209. serializers.serialize(self.serializer_name, [mv])
  210. def test_serialize_with_null_pk(self):
  211. """
  212. Tests that serialized data with no primary key results
  213. in a model instance with no id
  214. """
  215. category = Category(name="Reference")
  216. serial_str = serializers.serialize(self.serializer_name, [category])
  217. pk_value = self._get_pk_values(serial_str)[0]
  218. self.assertFalse(pk_value)
  219. cat_obj = list(serializers.deserialize(self.serializer_name,
  220. serial_str))[0].object
  221. self.assertEqual(cat_obj.id, None)
  222. def test_float_serialization(self):
  223. """Tests that float values serialize and deserialize intact"""
  224. sc = Score(score=3.4)
  225. sc.save()
  226. serial_str = serializers.serialize(self.serializer_name, [sc])
  227. deserial_objs = list(serializers.deserialize(self.serializer_name,
  228. serial_str))
  229. self.assertEqual(deserial_objs[0].object.score, Approximate(3.4, places=1))
  230. def test_deferred_field_serialization(self):
  231. author = Author.objects.create(name='Victor Hugo')
  232. author = Author.objects.defer('name').get(pk=author.pk)
  233. serial_str = serializers.serialize(self.serializer_name, [author])
  234. deserial_objs = list(serializers.deserialize(self.serializer_name, serial_str))
  235. # Check the class instead of using isinstance() because model instances
  236. # with deferred fields (e.g. Author_Deferred_name) will pass isinstance.
  237. self.assertEqual(deserial_objs[0].object.__class__, Author)
  238. def test_custom_field_serialization(self):
  239. """Tests that custom fields serialize and deserialize intact"""
  240. team_str = "Spartak Moskva"
  241. player = Player()
  242. player.name = "Soslan Djanaev"
  243. player.rank = 1
  244. player.team = Team(team_str)
  245. player.save()
  246. serial_str = serializers.serialize(self.serializer_name,
  247. Player.objects.all())
  248. team = self._get_field_values(serial_str, "team")
  249. self.assertTrue(team)
  250. self.assertEqual(team[0], team_str)
  251. deserial_objs = list(serializers.deserialize(self.serializer_name, serial_str))
  252. self.assertEqual(deserial_objs[0].object.team.to_string(),
  253. player.team.to_string())
  254. def test_pre_1000ad_date(self):
  255. """Tests that year values before 1000AD are properly formatted"""
  256. # Regression for #12524 -- dates before 1000AD get prefixed
  257. # 0's on the year
  258. a = Article.objects.create(
  259. author=self.jane,
  260. headline="Nobody remembers the early years",
  261. pub_date=datetime(1, 2, 3, 4, 5, 6))
  262. serial_str = serializers.serialize(self.serializer_name, [a])
  263. date_values = self._get_field_values(serial_str, "pub_date")
  264. self.assertEqual(date_values[0].replace('T', ' '), "0001-02-03 04:05:06")
  265. def test_pkless_serialized_strings(self):
  266. """
  267. Tests that serialized strings without PKs
  268. can be turned into models
  269. """
  270. deserial_objs = list(serializers.deserialize(self.serializer_name,
  271. self.pkless_str))
  272. for obj in deserial_objs:
  273. self.assertFalse(obj.object.id)
  274. obj.save()
  275. self.assertEqual(Category.objects.all().count(), 5)
  276. def test_deterministic_mapping_ordering(self):
  277. """Mapping such as fields should be deterministically ordered. (#24558)"""
  278. output = serializers.serialize(self.serializer_name, [self.a1], indent=2)
  279. categories = self.a1.categories.values_list('pk', flat=True)
  280. self.assertEqual(output, self.mapping_ordering_str % {
  281. 'article_pk': self.a1.pk,
  282. 'author_pk': self.a1.author_id,
  283. 'first_category_pk': categories[0],
  284. 'second_category_pk': categories[1],
  285. })
  286. def test_deserialize_force_insert(self):
  287. """Tests that deserialized content can be saved with force_insert as a parameter."""
  288. serial_str = serializers.serialize(self.serializer_name, [self.a1])
  289. deserial_obj = list(serializers.deserialize(self.serializer_name, serial_str))[0]
  290. with mock.patch('django.db.models.Model') as mock_model:
  291. deserial_obj.save(force_insert=False)
  292. mock_model.save_base.assert_called_with(deserial_obj.object, raw=True, using=None, force_insert=False)
  293. @skipUnlessDBFeature('can_defer_constraint_checks')
  294. def test_serialize_proxy_model(self):
  295. BaseModel.objects.create(parent_data=1)
  296. base_objects = BaseModel.objects.all()
  297. proxy_objects = ProxyBaseModel.objects.all()
  298. proxy_proxy_objects = ProxyProxyBaseModel.objects.all()
  299. base_data = serializers.serialize("json", base_objects)
  300. proxy_data = serializers.serialize("json", proxy_objects)
  301. proxy_proxy_data = serializers.serialize("json", proxy_proxy_objects)
  302. self.assertEqual(base_data, proxy_data.replace('proxy', ''))
  303. self.assertEqual(base_data, proxy_proxy_data.replace('proxy', ''))
  304. class SerializersTransactionTestBase(object):
  305. available_apps = ['serializers']
  306. @skipUnlessDBFeature('supports_forward_references')
  307. def test_forward_refs(self):
  308. """
  309. Tests that objects ids can be referenced before they are
  310. defined in the serialization data.
  311. """
  312. # The deserialization process needs to run in a transaction in order
  313. # to test forward reference handling.
  314. with transaction.atomic():
  315. objs = serializers.deserialize(self.serializer_name, self.fwd_ref_str)
  316. with connection.constraint_checks_disabled():
  317. for obj in objs:
  318. obj.save()
  319. for model_cls in (Category, Author, Article):
  320. self.assertEqual(model_cls.objects.all().count(), 1)
  321. art_obj = Article.objects.all()[0]
  322. self.assertEqual(art_obj.categories.all().count(), 1)
  323. self.assertEqual(art_obj.author.name, "Agnes")
  324. def register_tests(test_class, method_name, test_func, exclude=None):
  325. """
  326. Dynamically create serializer tests to ensure that all registered
  327. serializers are automatically tested.
  328. """
  329. formats = [
  330. f for f in serializers.get_serializer_formats()
  331. if (not isinstance(serializers.get_serializer(f), serializers.BadSerializer)
  332. and not f == 'geojson'
  333. and (exclude is None or f not in exclude))
  334. ]
  335. for format_ in formats:
  336. setattr(test_class, method_name % format_, curry(test_func, format_))