123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502 |
- from datetime import datetime
- from functools import partialmethod
- from io import StringIO
- from unittest import mock, skipIf
- from django.core import serializers
- from django.core.serializers import SerializerDoesNotExist
- from django.core.serializers.base import ProgressBar
- from django.db import connection, transaction
- from django.http import HttpResponse
- from django.test import SimpleTestCase, override_settings, skipUnlessDBFeature
- from django.test.utils import Approximate
- from .models import (
- Actor,
- Article,
- Author,
- AuthorProfile,
- BaseModel,
- Category,
- Child,
- ComplexModel,
- Movie,
- Player,
- ProxyBaseModel,
- ProxyProxyBaseModel,
- Score,
- Team,
- )
@override_settings(
    SERIALIZATION_MODULES={
        "json2": "django.core.serializers.json",
    }
)
class SerializerRegistrationTests(SimpleTestCase):
    """
    Exercise the serializer registry: registration, unregistration and lookup.

    The "json2" format is declared through SERIALIZATION_MODULES so the tests
    can check that a settings-defined format is listed next to built-in ones.
    """

    def setUp(self):
        # Start each test from an empty registry, remembering the previous one
        # so that tearDown() can restore it.
        self.old_serializers = serializers._serializers
        serializers._serializers = {}

    def tearDown(self):
        serializers._serializers = self.old_serializers

    def test_register(self):
        """Registering a new serializer populates the full registry. Refs #14823"""
        serializers.register_serializer("json3", "django.core.serializers.json")

        public_formats = serializers.get_public_serializer_formats()
        self.assertIn("json3", public_formats)
        self.assertIn("json2", public_formats)
        self.assertIn("xml", public_formats)

    def test_unregister(self):
        """
        Unregistering a serializer doesn't cause the registry to be
        repopulated.
        """
        serializers.unregister_serializer("xml")
        serializers.register_serializer("json3", "django.core.serializers.json")

        public_formats = serializers.get_public_serializer_formats()

        self.assertNotIn("xml", public_formats)
        self.assertIn("json3", public_formats)

    def test_unregister_unknown_serializer(self):
        """Unregistering an unknown format raises SerializerDoesNotExist."""
        with self.assertRaises(SerializerDoesNotExist):
            serializers.unregister_serializer("nonsense")

    def test_builtin_serializers(self):
        """Requesting a list of serializer formats populates the registry"""
        all_formats = set(serializers.get_serializer_formats())
        public_formats = set(serializers.get_public_serializer_formats())

        # No trailing comma after the call: a comma would turn the statement
        # into a discarded one-element tuple.
        self.assertIn("xml", all_formats)
        self.assertIn("xml", public_formats)

        self.assertIn("json2", all_formats)
        self.assertIn("json2", public_formats)

        # "python" is expected in the full list but not in the public one.
        self.assertIn("python", all_formats)
        self.assertNotIn("python", public_formats)

    def test_get_unknown_serializer(self):
        """
        #15889: get_serializer('nonsense') raises a SerializerDoesNotExist
        """
        with self.assertRaises(SerializerDoesNotExist):
            serializers.get_serializer("nonsense")

        # The same failure must also be catchable as a KeyError.
        with self.assertRaises(KeyError):
            serializers.get_serializer("nonsense")

        # SerializerDoesNotExist is instantiated with the nonexistent format
        with self.assertRaisesMessage(SerializerDoesNotExist, "nonsense"):
            serializers.get_serializer("nonsense")

    def test_get_unknown_deserializer(self):
        """Looking up an unknown deserializer raises SerializerDoesNotExist."""
        with self.assertRaises(SerializerDoesNotExist):
            serializers.get_deserializer("nonsense")
class SerializersTestBase:
    """
    Format-independent serializer tests shared by the concrete test cases.

    Besides ``serializer_name``, the methods below read attributes and call
    helpers that are not defined in this class and are expected to come from
    the subclass: ``_validate_output()``, ``_get_field_values()``,
    ``_get_pk_values()``, ``pkless_str`` and ``mapping_ordering_str``.
    """

    serializer_name = None  # Set by subclasses to the serialization format name

    @classmethod
    def setUpTestData(cls):
        """Create three categories, two authors and two categorised articles."""
        sports, music, op_ed = (
            Category.objects.create(name=label)
            for label in ("Sports", "Music", "Op-Ed")
        )

        cls.joe = Author.objects.create(name="Joe")
        cls.jane = Author.objects.create(name="Jane")

        cls.a1 = Article(
            author=cls.jane,
            headline="Poker has no place on ESPN",
            pub_date=datetime(2006, 6, 16, 11, 0),
        )
        cls.a1.save()
        cls.a1.categories.set([sports, op_ed])

        cls.a2 = Article(
            author=cls.joe,
            headline="Time to reform copyright",
            pub_date=datetime(2006, 6, 16, 13, 0, 11, 345),
        )
        cls.a2.save()
        cls.a2.categories.set([music, op_ed])

    def test_serialize(self):
        """Serializing every article yields output the subclass validates."""
        output = serializers.serialize(self.serializer_name, Article.objects.all())
        self.assertTrue(self._validate_output(output))

    def test_serializer_roundtrip(self):
        """Both fixture articles come back when the output is deserialized."""
        output = serializers.serialize(self.serializer_name, Article.objects.all())
        restored = list(serializers.deserialize(self.serializer_name, output))
        self.assertEqual(len(restored), 2)

    def test_serialize_to_stream(self):
        """Writing to a stream produces the same text as returning a string."""
        obj = ComplexModel(field1="first", field2="second", field3="third")
        obj.save_base(raw=True)

        for stream in (StringIO(), HttpResponse()):
            # Write into the stream first ...
            serializers.serialize(self.serializer_name, [obj], indent=2, stream=stream)
            # ... then serialize to a plain string for comparison.
            expected = serializers.serialize(self.serializer_name, [obj], indent=2)

            # HttpResponse exposes bytes in .content; StringIO exposes text.
            if isinstance(stream, StringIO):
                written = stream.getvalue()
            else:
                written = stream.content.decode()
            self.assertEqual(expected, written)

    def test_serialize_specific_fields(self):
        """Only the fields named in ``fields`` survive the roundtrip."""
        obj = ComplexModel(field1="first", field2="second", field3="third")
        obj.save_base(raw=True)

        payload = serializers.serialize(
            self.serializer_name, [obj], indent=2, fields=("field1", "field3")
        )
        restored = next(serializers.deserialize(self.serializer_name, payload))

        # field2 was left out of the payload, so it comes back empty.
        self.assertEqual(restored.object.field1, "first")
        self.assertEqual(restored.object.field2, "")
        self.assertEqual(restored.object.field3, "third")

    def test_altering_serialized_output(self):
        """
        Editing the serialized text and saving the deserialized objects
        changes the stored data.
        """
        old_headline = "Poker has no place on ESPN"
        new_headline = "Poker has no place on television"
        payload = serializers.serialize(self.serializer_name, Article.objects.all())
        payload = payload.replace(old_headline, new_headline)
        deserialized = list(serializers.deserialize(self.serializer_name, payload))

        # Nothing has been saved yet: only the old headline exists.
        self.assertTrue(Article.objects.filter(headline=old_headline))
        self.assertFalse(Article.objects.filter(headline=new_headline))

        for item in deserialized:
            item.save()

        # Saving swapped the headline.
        self.assertTrue(Article.objects.filter(headline=new_headline))
        self.assertFalse(Article.objects.filter(headline=old_headline))

    def test_one_to_one_as_pk(self):
        """
        A OneToOneField used as the primary key is not listed among the
        serialized fields; it takes the place of the pk identifier.
        """
        AuthorProfile.objects.create(
            author=self.joe, date_of_birth=datetime(1970, 1, 1)
        )
        payload = serializers.serialize(
            self.serializer_name, AuthorProfile.objects.all()
        )
        self.assertFalse(self._get_field_values(payload, "author"))

        for item in serializers.deserialize(self.serializer_name, payload):
            self.assertEqual(item.object.pk, self.joe.pk)

    def test_serialize_field_subset(self):
        """Passing ``fields`` restricts the output to those fields."""
        requested = ("headline", "pub_date")
        omitted = ("author", "categories")
        payload = serializers.serialize(
            self.serializer_name, Article.objects.all(), fields=requested
        )

        for name in omitted:
            self.assertFalse(self._get_field_values(payload, name))

        for name in requested:
            self.assertTrue(self._get_field_values(payload, name))

    def test_serialize_unicode_roundtrip(self):
        """Non-ASCII field values survive serialization and deserialization."""
        actor_name = "Za\u017c\u00f3\u0142\u0107"
        movie_title = "G\u0119\u015bl\u0105 ja\u017a\u0144"
        # The movie is built around the still-unsaved actor; the actor is
        # saved first, then the movie.
        actor = Actor(name=actor_name)
        movie = Movie(title=movie_title, actor=actor)
        actor.save()
        movie.save()

        payload = serializers.serialize(self.serializer_name, [movie])
        self.assertEqual(self._get_field_values(payload, "title")[0], movie_title)
        self.assertEqual(self._get_field_values(payload, "actor")[0], actor_name)

        restored = list(serializers.deserialize(self.serializer_name, payload))
        restored_movie = restored[0].object
        self.assertEqual(restored_movie.title, movie_title)

    def test_unicode_serialization(self):
        """A non-ASCII name appears verbatim in the output and roundtrips."""
        unicode_name = "יוניקוד"
        payload = serializers.serialize(
            self.serializer_name, [Author(name=unicode_name)]
        )
        self.assertIn(unicode_name, payload)
        restored = list(serializers.deserialize(self.serializer_name, payload))
        self.assertEqual(restored[0].object.name, unicode_name)

    def test_serialize_progressbar(self):
        """The progress output ends with a completely filled progress bar."""
        fake_stdout = StringIO()
        serializers.serialize(
            self.serializer_name,
            Article.objects.all(),
            progress_output=fake_stdout,
            object_count=Article.objects.count(),
        )
        full_bar = "[" + "." * ProgressBar.progress_width + "]\n"
        self.assertTrue(fake_stdout.getvalue().endswith(full_bar))

    def test_serialize_superfluous_queries(self):
        """
        Serializing an object with a ForeignKey issues no queries (#17602).
        """
        actor = Actor(name="Actor name")
        actor.save()
        movie = Movie(title="Movie title", actor_id=actor.pk)
        movie.save()

        with self.assertNumQueries(0):
            serializers.serialize(self.serializer_name, [movie])

    def test_serialize_prefetch_related_m2m(self):
        """Prefetched m2m data is reused instead of being queried per object."""
        # One query for the Article table and one for each prefetched m2m
        # field.
        with self.assertNumQueries(3):
            serializers.serialize(
                self.serializer_name,
                Article.objects.prefetch_related("categories", "meta_data"),
            )
        # One query for the Article table, and two m2m queries for each
        # article.
        with self.assertNumQueries(5):
            serializers.serialize(self.serializer_name, Article.objects.all())

    def test_serialize_with_null_pk(self):
        """
        An unsaved object serializes with a falsy pk and deserializes to an
        instance whose id is None.
        """
        unsaved = Category(name="Reference")
        payload = serializers.serialize(self.serializer_name, [unsaved])
        first_pk = self._get_pk_values(payload)[0]
        self.assertFalse(first_pk)

        restored = list(serializers.deserialize(self.serializer_name, payload))
        self.assertIsNone(restored[0].object.id)

    def test_float_serialization(self):
        """A float field roundtrips to approximately the same value."""
        score = Score(score=3.4)
        score.save()
        payload = serializers.serialize(self.serializer_name, [score])
        restored = list(serializers.deserialize(self.serializer_name, payload))
        self.assertEqual(restored[0].object.score, Approximate(3.4, places=1))

    def test_deferred_field_serialization(self):
        """An instance loaded with a deferred field roundtrips as an Author."""
        author = Author.objects.create(name="Victor Hugo")
        author = Author.objects.defer("name").get(pk=author.pk)
        payload = serializers.serialize(self.serializer_name, [author])
        restored = list(serializers.deserialize(self.serializer_name, payload))
        self.assertIsInstance(restored[0].object, Author)

    def test_custom_field_serialization(self):
        """A custom field value is emitted as its string and roundtrips."""
        team_str = "Spartak Moskva"
        player = Player()
        player.name = "Soslan Djanaev"
        player.rank = 1
        player.team = Team(team_str)
        player.save()

        payload = serializers.serialize(self.serializer_name, Player.objects.all())
        team_values = self._get_field_values(payload, "team")
        self.assertTrue(team_values)
        self.assertEqual(team_values[0], team_str)

        restored = list(serializers.deserialize(self.serializer_name, payload))
        self.assertEqual(
            restored[0].object.team.to_string(), player.team.to_string()
        )

    def test_pre_1000ad_date(self):
        """Years before 1000AD are zero-padded in the output (#12524)."""
        article = Article.objects.create(
            author=self.jane,
            headline="Nobody remembers the early years",
            pub_date=datetime(1, 2, 3, 4, 5, 6),
        )

        payload = serializers.serialize(self.serializer_name, [article])
        date_values = self._get_field_values(payload, "pub_date")
        # Formats may separate date and time with "T"; normalise to a space.
        self.assertEqual(date_values[0].replace("T", " "), "0001-02-03 04:05:06")

    def test_pkless_serialized_strings(self):
        """
        Serialized data without primary keys can be deserialized and saved.
        """
        restored = list(
            serializers.deserialize(self.serializer_name, self.pkless_str)
        )
        for item in restored:
            self.assertFalse(item.object.id)
            item.save()
        self.assertEqual(Category.objects.count(), 5)

    def test_deterministic_mapping_ordering(self):
        """Mappings such as ``fields`` are emitted in a fixed order (#24558)."""
        output = serializers.serialize(self.serializer_name, [self.a1], indent=2)
        categories = self.a1.categories.values_list("pk", flat=True)
        substitutions = {
            "article_pk": self.a1.pk,
            "author_pk": self.a1.author_id,
            "first_category_pk": categories[0],
            "second_category_pk": categories[1],
        }
        self.assertEqual(output, self.mapping_ordering_str % substitutions)

    def test_deserialize_force_insert(self):
        """save() on a deserialized object forwards force_insert to save_base."""
        payload = serializers.serialize(self.serializer_name, [self.a1])
        restored = list(serializers.deserialize(self.serializer_name, payload))
        deserial_obj = restored[0]
        with mock.patch("django.db.models.Model") as mock_model:
            deserial_obj.save(force_insert=False)
            mock_model.save_base.assert_called_with(
                deserial_obj.object, raw=True, using=None, force_insert=False
            )

    @skipUnlessDBFeature("can_defer_constraint_checks")
    def test_serialize_proxy_model(self):
        """
        Proxy and proxy-of-proxy querysets serialize like the base model once
        "proxy" is stripped from the output. This test always uses "json".
        """
        BaseModel.objects.create(parent_data=1)
        base_data = serializers.serialize("json", BaseModel.objects.all())
        proxy_data = serializers.serialize("json", ProxyBaseModel.objects.all())
        proxy_proxy_data = serializers.serialize(
            "json", ProxyProxyBaseModel.objects.all()
        )
        self.assertEqual(base_data, proxy_data.replace("proxy", ""))
        self.assertEqual(base_data, proxy_proxy_data.replace("proxy", ""))

    def test_serialize_inherited_fields(self):
        """Fields inherited from the parent model are absent from child output."""
        child_1 = Child.objects.create(parent_data="a", child_data="b")
        child_2 = Child.objects.create(parent_data="c", child_data="d")
        child_1.parent_m2m.add(child_2)
        payload = serializers.serialize(self.serializer_name, [child_1, child_2])
        self.assertEqual(self._get_field_values(payload, "parent_m2m"), [])
        self.assertEqual(self._get_field_values(payload, "parent_data"), [])

    def test_serialize_only_pk(self):
        """
        Without natural foreign keys, the second and third captured queries
        do not mention the ``meta_data_id`` and ``kind`` columns.
        """
        with self.assertNumQueries(5) as ctx:
            serializers.serialize(
                self.serializer_name,
                Article.objects.all(),
                use_natural_foreign_keys=False,
            )

        quote = connection.ops.quote_name
        categories_sql = ctx[1]["sql"]
        self.assertNotIn(quote("meta_data_id"), categories_sql)
        meta_data_sql = ctx[2]["sql"]
        self.assertNotIn(quote("kind"), meta_data_sql)

    def test_serialize_no_only_pk_with_natural_keys(self):
        """
        With natural foreign keys, the third captured query does select the
        ``kind`` column.
        """
        with self.assertNumQueries(5) as ctx:
            serializers.serialize(
                self.serializer_name,
                Article.objects.all(),
                use_natural_foreign_keys=True,
            )

        quote = connection.ops.quote_name
        categories_sql = ctx[1]["sql"]
        self.assertNotIn(quote("meta_data_id"), categories_sql)
        # CategoryMetaData has natural_key().
        meta_data_sql = ctx[2]["sql"]
        self.assertIn(quote("kind"), meta_data_sql)
class SerializerAPITests(SimpleTestCase):
    """Tests of the serializer class-level API."""

    def test_stream_class(self):
        """A Serializer subclass writes through its ``stream_class``."""

        class File:
            """Minimal write-only stream that collects what is written."""

            def __init__(self):
                self.lines = []

            def write(self, line):
                self.lines.append(line)

            def getvalue(self):
                return "".join(self.lines)

        class Serializer(serializers.json.Serializer):
            stream_class = File

        expected = (
            '[{"model": "serializers.score", "pk": 1, "fields": {"score": 3.4}}]'
        )
        serializer = Serializer()
        data = serializer.serialize([Score(id=1, score=3.4)])

        self.assertIs(serializer.stream_class, File)
        self.assertIsInstance(serializer.stream, File)
        self.assertEqual(data, expected)
class SerializersTransactionTestBase:
    """
    Serializer tests that run their deserialization inside a transaction.

    ``serializer_name`` and ``fwd_ref_str`` are read from ``self`` but are not
    defined here; subclasses are expected to provide them.
    """

    available_apps = ["serializers"]

    @skipUnlessDBFeature("supports_forward_references")
    def test_forward_refs(self):
        """
        Serialized data may reference an object id before the object itself
        is defined.
        """
        # Forward reference handling can only be exercised when the
        # deserialization runs inside a transaction.
        with transaction.atomic():
            deserialized = serializers.deserialize(
                self.serializer_name, self.fwd_ref_str
            )
            with connection.constraint_checks_disabled():
                for item in deserialized:
                    item.save()

        for model_cls in (Category, Author, Article):
            self.assertEqual(model_cls.objects.count(), 1)

        article = Article.objects.all()[0]
        self.assertEqual(article.categories.count(), 1)
        self.assertEqual(article.author.name, "Agnes")
def register_tests(test_class, method_name, test_func, exclude=()):
    """
    Attach one copy of ``test_func`` per registered serializer format.

    ``method_name`` is a ``%``-template that receives the format name and
    yields the attribute name set on ``test_class``. Each attached method is
    ``test_func`` with the format bound as its first argument after ``self``.
    "geojson" and every format listed in ``exclude`` are left out. A format
    whose serializer is a ``BadSerializer`` instance is still attached, but
    wrapped in ``skipIf`` so that it is skipped.
    """
    for format_ in serializers.get_serializer_formats():
        if format_ == "geojson" or format_ in exclude:
            continue

        library_missing = isinstance(
            serializers.get_serializer(format_), serializers.BadSerializer
        )
        skip_reason = (
            "The Python library for the %s serializer is not installed." % format_
        )
        guarded_test = skipIf(library_missing, skip_reason)(test_func)

        setattr(test_class, method_name % format_, partialmethod(guarded_test, format_))
|