123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460 |
- import pickle
- from datetime import datetime
- from functools import partialmethod
- from io import StringIO
- from unittest import mock, skipIf
- from django.core import serializers
- from django.core.serializers import SerializerDoesNotExist
- from django.core.serializers.base import PickleSerializer, ProgressBar
- from django.db import connection, transaction
- from django.http import HttpResponse
- from django.test import SimpleTestCase, override_settings, skipUnlessDBFeature
- from django.test.utils import Approximate, ignore_warnings
- from django.utils.deprecation import RemovedInDjango50Warning
- from .models import (
- Actor, Article, Author, AuthorProfile, BaseModel, Category, Child,
- ComplexModel, Movie, Player, ProxyBaseModel, ProxyProxyBaseModel, Score,
- Team,
- )
- @override_settings(
- SERIALIZATION_MODULES={
- "json2": "django.core.serializers.json",
- }
- )
- class SerializerRegistrationTests(SimpleTestCase):
- def setUp(self):
- self.old_serializers = serializers._serializers
- serializers._serializers = {}
- def tearDown(self):
- serializers._serializers = self.old_serializers
- def test_register(self):
- "Registering a new serializer populates the full registry. Refs #14823"
- serializers.register_serializer('json3', 'django.core.serializers.json')
- public_formats = serializers.get_public_serializer_formats()
- self.assertIn('json3', public_formats)
- self.assertIn('json2', public_formats)
- self.assertIn('xml', public_formats)
- def test_unregister(self):
- "Unregistering a serializer doesn't cause the registry to be repopulated. Refs #14823"
- serializers.unregister_serializer('xml')
- serializers.register_serializer('json3', 'django.core.serializers.json')
- public_formats = serializers.get_public_serializer_formats()
- self.assertNotIn('xml', public_formats)
- self.assertIn('json3', public_formats)
- def test_unregister_unknown_serializer(self):
- with self.assertRaises(SerializerDoesNotExist):
- serializers.unregister_serializer("nonsense")
- def test_builtin_serializers(self):
- "Requesting a list of serializer formats populates the registry"
- all_formats = set(serializers.get_serializer_formats())
- public_formats = set(serializers.get_public_serializer_formats())
- self.assertIn('xml', all_formats),
- self.assertIn('xml', public_formats)
- self.assertIn('json2', all_formats)
- self.assertIn('json2', public_formats)
- self.assertIn('python', all_formats)
- self.assertNotIn('python', public_formats)
- def test_get_unknown_serializer(self):
- """
- #15889: get_serializer('nonsense') raises a SerializerDoesNotExist
- """
- with self.assertRaises(SerializerDoesNotExist):
- serializers.get_serializer("nonsense")
- with self.assertRaises(KeyError):
- serializers.get_serializer("nonsense")
- # SerializerDoesNotExist is instantiated with the nonexistent format
- with self.assertRaisesMessage(SerializerDoesNotExist, 'nonsense'):
- serializers.get_serializer("nonsense")
- def test_get_unknown_deserializer(self):
- with self.assertRaises(SerializerDoesNotExist):
- serializers.get_deserializer("nonsense")
- class SerializersTestBase:
- serializer_name = None # Set by subclasses to the serialization format name
- @classmethod
- def setUpTestData(cls):
- sports = Category.objects.create(name="Sports")
- music = Category.objects.create(name="Music")
- op_ed = Category.objects.create(name="Op-Ed")
- cls.joe = Author.objects.create(name='Joe')
- cls.jane = Author.objects.create(name='Jane')
- cls.a1 = Article(
- author=cls.jane,
- headline="Poker has no place on ESPN",
- pub_date=datetime(2006, 6, 16, 11, 00)
- )
- cls.a1.save()
- cls.a1.categories.set([sports, op_ed])
- cls.a2 = Article(
- author=cls.joe,
- headline="Time to reform copyright",
- pub_date=datetime(2006, 6, 16, 13, 00, 11, 345)
- )
- cls.a2.save()
- cls.a2.categories.set([music, op_ed])
- def test_serialize(self):
- """Basic serialization works."""
- serial_str = serializers.serialize(self.serializer_name, Article.objects.all())
- self.assertTrue(self._validate_output(serial_str))
- def test_serializer_roundtrip(self):
- """Serialized content can be deserialized."""
- serial_str = serializers.serialize(self.serializer_name, Article.objects.all())
- models = list(serializers.deserialize(self.serializer_name, serial_str))
- self.assertEqual(len(models), 2)
- def test_serialize_to_stream(self):
- obj = ComplexModel(field1='first', field2='second', field3='third')
- obj.save_base(raw=True)
- # Serialize the test database to a stream
- for stream in (StringIO(), HttpResponse()):
- serializers.serialize(self.serializer_name, [obj], indent=2, stream=stream)
- # Serialize normally for a comparison
- string_data = serializers.serialize(self.serializer_name, [obj], indent=2)
- # The two are the same
- if isinstance(stream, StringIO):
- self.assertEqual(string_data, stream.getvalue())
- else:
- self.assertEqual(string_data, stream.content.decode())
- def test_serialize_specific_fields(self):
- obj = ComplexModel(field1='first', field2='second', field3='third')
- obj.save_base(raw=True)
- # Serialize then deserialize the test database
- serialized_data = serializers.serialize(
- self.serializer_name, [obj], indent=2, fields=('field1', 'field3')
- )
- result = next(serializers.deserialize(self.serializer_name, serialized_data))
- # The deserialized object contains data in only the serialized fields.
- self.assertEqual(result.object.field1, 'first')
- self.assertEqual(result.object.field2, '')
- self.assertEqual(result.object.field3, 'third')
- def test_altering_serialized_output(self):
- """
- The ability to create new objects by modifying serialized content.
- """
- old_headline = "Poker has no place on ESPN"
- new_headline = "Poker has no place on television"
- serial_str = serializers.serialize(self.serializer_name, Article.objects.all())
- serial_str = serial_str.replace(old_headline, new_headline)
- models = list(serializers.deserialize(self.serializer_name, serial_str))
- # Prior to saving, old headline is in place
- self.assertTrue(Article.objects.filter(headline=old_headline))
- self.assertFalse(Article.objects.filter(headline=new_headline))
- for model in models:
- model.save()
- # After saving, new headline is in place
- self.assertTrue(Article.objects.filter(headline=new_headline))
- self.assertFalse(Article.objects.filter(headline=old_headline))
- def test_one_to_one_as_pk(self):
- """
- If you use your own primary key field (such as a OneToOneField), it
- doesn't appear in the serialized field list - it replaces the pk
- identifier.
- """
- AuthorProfile.objects.create(author=self.joe, date_of_birth=datetime(1970, 1, 1))
- serial_str = serializers.serialize(self.serializer_name, AuthorProfile.objects.all())
- self.assertFalse(self._get_field_values(serial_str, 'author'))
- for obj in serializers.deserialize(self.serializer_name, serial_str):
- self.assertEqual(obj.object.pk, self.joe.pk)
- def test_serialize_field_subset(self):
- """Output can be restricted to a subset of fields"""
- valid_fields = ('headline', 'pub_date')
- invalid_fields = ("author", "categories")
- serial_str = serializers.serialize(self.serializer_name, Article.objects.all(), fields=valid_fields)
- for field_name in invalid_fields:
- self.assertFalse(self._get_field_values(serial_str, field_name))
- for field_name in valid_fields:
- self.assertTrue(self._get_field_values(serial_str, field_name))
- def test_serialize_unicode_roundtrip(self):
- """Unicode makes the roundtrip intact"""
- actor_name = "Za\u017c\u00f3\u0142\u0107"
- movie_title = 'G\u0119\u015bl\u0105 ja\u017a\u0144'
- ac = Actor(name=actor_name)
- mv = Movie(title=movie_title, actor=ac)
- ac.save()
- mv.save()
- serial_str = serializers.serialize(self.serializer_name, [mv])
- self.assertEqual(self._get_field_values(serial_str, "title")[0], movie_title)
- self.assertEqual(self._get_field_values(serial_str, "actor")[0], actor_name)
- obj_list = list(serializers.deserialize(self.serializer_name, serial_str))
- mv_obj = obj_list[0].object
- self.assertEqual(mv_obj.title, movie_title)
- def test_unicode_serialization(self):
- unicode_name = 'יוניקוד'
- data = serializers.serialize(self.serializer_name, [Author(name=unicode_name)])
- self.assertIn(unicode_name, data)
- objs = list(serializers.deserialize(self.serializer_name, data))
- self.assertEqual(objs[0].object.name, unicode_name)
- def test_serialize_progressbar(self):
- fake_stdout = StringIO()
- serializers.serialize(
- self.serializer_name, Article.objects.all(),
- progress_output=fake_stdout, object_count=Article.objects.count()
- )
- self.assertTrue(
- fake_stdout.getvalue().endswith('[' + '.' * ProgressBar.progress_width + ']\n')
- )
- def test_serialize_superfluous_queries(self):
- """Ensure no superfluous queries are made when serializing ForeignKeys
- #17602
- """
- ac = Actor(name='Actor name')
- ac.save()
- mv = Movie(title='Movie title', actor_id=ac.pk)
- mv.save()
- with self.assertNumQueries(0):
- serializers.serialize(self.serializer_name, [mv])
- def test_serialize_prefetch_related_m2m(self):
- # One query for the Article table and one for each prefetched m2m
- # field.
- with self.assertNumQueries(3):
- serializers.serialize(
- self.serializer_name,
- Article.objects.all().prefetch_related('categories', 'meta_data'),
- )
- # One query for the Article table, and two m2m queries for each
- # article.
- with self.assertNumQueries(5):
- serializers.serialize(self.serializer_name, Article.objects.all())
- def test_serialize_with_null_pk(self):
- """
- Serialized data with no primary key results
- in a model instance with no id
- """
- category = Category(name="Reference")
- serial_str = serializers.serialize(self.serializer_name, [category])
- pk_value = self._get_pk_values(serial_str)[0]
- self.assertFalse(pk_value)
- cat_obj = list(serializers.deserialize(self.serializer_name, serial_str))[0].object
- self.assertIsNone(cat_obj.id)
- def test_float_serialization(self):
- """Float values serialize and deserialize intact"""
- sc = Score(score=3.4)
- sc.save()
- serial_str = serializers.serialize(self.serializer_name, [sc])
- deserial_objs = list(serializers.deserialize(self.serializer_name, serial_str))
- self.assertEqual(deserial_objs[0].object.score, Approximate(3.4, places=1))
- def test_deferred_field_serialization(self):
- author = Author.objects.create(name='Victor Hugo')
- author = Author.objects.defer('name').get(pk=author.pk)
- serial_str = serializers.serialize(self.serializer_name, [author])
- deserial_objs = list(serializers.deserialize(self.serializer_name, serial_str))
- self.assertIsInstance(deserial_objs[0].object, Author)
- def test_custom_field_serialization(self):
- """Custom fields serialize and deserialize intact"""
- team_str = "Spartak Moskva"
- player = Player()
- player.name = "Soslan Djanaev"
- player.rank = 1
- player.team = Team(team_str)
- player.save()
- serial_str = serializers.serialize(self.serializer_name, Player.objects.all())
- team = self._get_field_values(serial_str, "team")
- self.assertTrue(team)
- self.assertEqual(team[0], team_str)
- deserial_objs = list(serializers.deserialize(self.serializer_name, serial_str))
- self.assertEqual(deserial_objs[0].object.team.to_string(), player.team.to_string())
- def test_pre_1000ad_date(self):
- """Year values before 1000AD are properly formatted"""
- # Regression for #12524 -- dates before 1000AD get prefixed
- # 0's on the year
- a = Article.objects.create(
- author=self.jane,
- headline="Nobody remembers the early years",
- pub_date=datetime(1, 2, 3, 4, 5, 6))
- serial_str = serializers.serialize(self.serializer_name, [a])
- date_values = self._get_field_values(serial_str, "pub_date")
- self.assertEqual(date_values[0].replace('T', ' '), "0001-02-03 04:05:06")
- def test_pkless_serialized_strings(self):
- """
- Serialized strings without PKs can be turned into models
- """
- deserial_objs = list(serializers.deserialize(self.serializer_name, self.pkless_str))
- for obj in deserial_objs:
- self.assertFalse(obj.object.id)
- obj.save()
- self.assertEqual(Category.objects.all().count(), 5)
- def test_deterministic_mapping_ordering(self):
- """Mapping such as fields should be deterministically ordered. (#24558)"""
- output = serializers.serialize(self.serializer_name, [self.a1], indent=2)
- categories = self.a1.categories.values_list('pk', flat=True)
- self.assertEqual(output, self.mapping_ordering_str % {
- 'article_pk': self.a1.pk,
- 'author_pk': self.a1.author_id,
- 'first_category_pk': categories[0],
- 'second_category_pk': categories[1],
- })
- def test_deserialize_force_insert(self):
- """Deserialized content can be saved with force_insert as a parameter."""
- serial_str = serializers.serialize(self.serializer_name, [self.a1])
- deserial_obj = list(serializers.deserialize(self.serializer_name, serial_str))[0]
- with mock.patch('django.db.models.Model') as mock_model:
- deserial_obj.save(force_insert=False)
- mock_model.save_base.assert_called_with(deserial_obj.object, raw=True, using=None, force_insert=False)
- @skipUnlessDBFeature('can_defer_constraint_checks')
- def test_serialize_proxy_model(self):
- BaseModel.objects.create(parent_data=1)
- base_objects = BaseModel.objects.all()
- proxy_objects = ProxyBaseModel.objects.all()
- proxy_proxy_objects = ProxyProxyBaseModel.objects.all()
- base_data = serializers.serialize("json", base_objects)
- proxy_data = serializers.serialize("json", proxy_objects)
- proxy_proxy_data = serializers.serialize("json", proxy_proxy_objects)
- self.assertEqual(base_data, proxy_data.replace('proxy', ''))
- self.assertEqual(base_data, proxy_proxy_data.replace('proxy', ''))
- def test_serialize_inherited_fields(self):
- child_1 = Child.objects.create(parent_data='a', child_data='b')
- child_2 = Child.objects.create(parent_data='c', child_data='d')
- child_1.parent_m2m.add(child_2)
- child_data = serializers.serialize(self.serializer_name, [child_1, child_2])
- self.assertEqual(self._get_field_values(child_data, 'parent_m2m'), [])
- self.assertEqual(self._get_field_values(child_data, 'parent_data'), [])
- class SerializerAPITests(SimpleTestCase):
- def test_stream_class(self):
- class File:
- def __init__(self):
- self.lines = []
- def write(self, line):
- self.lines.append(line)
- def getvalue(self):
- return ''.join(self.lines)
- class Serializer(serializers.json.Serializer):
- stream_class = File
- serializer = Serializer()
- data = serializer.serialize([Score(id=1, score=3.4)])
- self.assertIs(serializer.stream_class, File)
- self.assertIsInstance(serializer.stream, File)
- self.assertEqual(data, '[{"model": "serializers.score", "pk": 1, "fields": {"score": 3.4}}]')
- class SerializersTransactionTestBase:
- available_apps = ['serializers']
- @skipUnlessDBFeature('supports_forward_references')
- def test_forward_refs(self):
- """
- Objects ids can be referenced before they are
- defined in the serialization data.
- """
- # The deserialization process needs to run in a transaction in order
- # to test forward reference handling.
- with transaction.atomic():
- objs = serializers.deserialize(self.serializer_name, self.fwd_ref_str)
- with connection.constraint_checks_disabled():
- for obj in objs:
- obj.save()
- for model_cls in (Category, Author, Article):
- self.assertEqual(model_cls.objects.all().count(), 1)
- art_obj = Article.objects.all()[0]
- self.assertEqual(art_obj.categories.all().count(), 1)
- self.assertEqual(art_obj.author.name, "Agnes")
- class PickleSerializerTests(SimpleTestCase):
- @ignore_warnings(category=RemovedInDjango50Warning)
- def test_serializer_protocol(self):
- serializer = PickleSerializer(protocol=3)
- self.assertEqual(serializer.protocol, 3)
- # If protocol is not provided, it defaults to pickle.HIGHEST_PROTOCOL
- serializer = PickleSerializer()
- self.assertEqual(serializer.protocol, pickle.HIGHEST_PROTOCOL)
- @ignore_warnings(category=RemovedInDjango50Warning)
- def test_serializer_loads_dumps(self):
- serializer = PickleSerializer()
- test_data = 'test data'
- dump = serializer.dumps(test_data)
- self.assertEqual(serializer.loads(dump), test_data)
- def test_serializer_warning(self):
- msg = (
- 'PickleSerializer is deprecated due to its security risk. Use '
- 'JSONSerializer instead.'
- )
- with self.assertRaisesMessage(RemovedInDjango50Warning, msg):
- PickleSerializer()
- def register_tests(test_class, method_name, test_func, exclude=()):
- """
- Dynamically create serializer tests to ensure that all registered
- serializers are automatically tested.
- """
- for format_ in serializers.get_serializer_formats():
- if format_ == 'geojson' or format_ in exclude:
- continue
- decorated_func = skipIf(
- isinstance(serializers.get_serializer(format_), serializers.BadSerializer),
- 'The Python library for the %s serializer is not installed.' % format_,
- )(test_func)
- setattr(test_class, method_name % format_, partialmethod(decorated_func, format_))
|