123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390 |
- import datetime
- import pickle
- import django
- from django.db import models
- from django.test import TestCase
- from .models import (
- BinaryFieldModel,
- Container,
- Event,
- Group,
- Happening,
- M2MModel,
- MyEvent,
- )
- class PickleabilityTestCase(TestCase):
- @classmethod
- def setUpTestData(cls):
- cls.happening = (
- Happening.objects.create()
- ) # make sure the defaults are working (#20158)
- def assert_pickles(self, qs):
- self.assertEqual(list(pickle.loads(pickle.dumps(qs))), list(qs))
- def test_binaryfield(self):
- BinaryFieldModel.objects.create(data=b"binary data")
- self.assert_pickles(BinaryFieldModel.objects.all())
- def test_related_field(self):
- g = Group.objects.create(name="Ponies Who Own Maybachs")
- self.assert_pickles(Event.objects.filter(group=g.id))
- def test_datetime_callable_default_all(self):
- self.assert_pickles(Happening.objects.all())
- def test_datetime_callable_default_filter(self):
- self.assert_pickles(Happening.objects.filter(when=datetime.datetime.now()))
- def test_string_as_default(self):
- self.assert_pickles(Happening.objects.filter(name="test"))
- def test_standalone_method_as_default(self):
- self.assert_pickles(Happening.objects.filter(number1=1))
- def test_staticmethod_as_default(self):
- self.assert_pickles(Happening.objects.filter(number2=1))
- def test_filter_reverse_fk(self):
- self.assert_pickles(Group.objects.filter(event=1))
- def test_doesnotexist_exception(self):
- # Ticket #17776
- original = Event.DoesNotExist("Doesn't exist")
- unpickled = pickle.loads(pickle.dumps(original))
- # Exceptions are not equal to equivalent instances of themselves, so
- # can't just use assertEqual(original, unpickled)
- self.assertEqual(original.__class__, unpickled.__class__)
- self.assertEqual(original.args, unpickled.args)
- def test_doesnotexist_class(self):
- klass = Event.DoesNotExist
- self.assertIs(pickle.loads(pickle.dumps(klass)), klass)
- def test_multipleobjectsreturned_class(self):
- klass = Event.MultipleObjectsReturned
- self.assertIs(pickle.loads(pickle.dumps(klass)), klass)
- def test_forward_relatedobjectdoesnotexist_class(self):
- # ForwardManyToOneDescriptor
- klass = Event.group.RelatedObjectDoesNotExist
- self.assertIs(pickle.loads(pickle.dumps(klass)), klass)
- # ForwardOneToOneDescriptor
- klass = Happening.event.RelatedObjectDoesNotExist
- self.assertIs(pickle.loads(pickle.dumps(klass)), klass)
- def test_reverse_one_to_one_relatedobjectdoesnotexist_class(self):
- klass = Event.happening.RelatedObjectDoesNotExist
- self.assertIs(pickle.loads(pickle.dumps(klass)), klass)
- def test_manager_pickle(self):
- pickle.loads(pickle.dumps(Happening.objects))
- def test_model_pickle(self):
- """
- A model not defined on module level is picklable.
- """
- original = Container.SomeModel(pk=1)
- dumped = pickle.dumps(original)
- reloaded = pickle.loads(dumped)
- self.assertEqual(original, reloaded)
- # Also, deferred dynamic model works
- Container.SomeModel.objects.create(somefield=1)
- original = Container.SomeModel.objects.defer("somefield")[0]
- dumped = pickle.dumps(original)
- reloaded = pickle.loads(dumped)
- self.assertEqual(original, reloaded)
- self.assertEqual(original.somefield, reloaded.somefield)
- def test_model_pickle_m2m(self):
- """
- Test intentionally the automatically created through model.
- """
- m1 = M2MModel.objects.create()
- g1 = Group.objects.create(name="foof")
- m1.groups.add(g1)
- m2m_through = M2MModel._meta.get_field("groups").remote_field.through
- original = m2m_through.objects.get()
- dumped = pickle.dumps(original)
- reloaded = pickle.loads(dumped)
- self.assertEqual(original, reloaded)
- def test_model_pickle_dynamic(self):
- class Meta:
- proxy = True
- dynclass = type(
- "DynamicEventSubclass",
- (Event,),
- {"Meta": Meta, "__module__": Event.__module__},
- )
- original = dynclass(pk=1)
- dumped = pickle.dumps(original)
- reloaded = pickle.loads(dumped)
- self.assertEqual(original, reloaded)
- self.assertIs(reloaded.__class__, dynclass)
- def test_specialized_queryset(self):
- self.assert_pickles(Happening.objects.values("name"))
- self.assert_pickles(Happening.objects.values("name").dates("when", "year"))
- # With related field (#14515)
- self.assert_pickles(
- Event.objects.select_related("group")
- .order_by("title")
- .values_list("title", "group__name")
- )
- def test_pickle_prefetch_related_idempotence(self):
- g = Group.objects.create(name="foo")
- groups = Group.objects.prefetch_related("event_set")
- # First pickling
- groups = pickle.loads(pickle.dumps(groups))
- self.assertSequenceEqual(groups, [g])
- # Second pickling
- groups = pickle.loads(pickle.dumps(groups))
- self.assertSequenceEqual(groups, [g])
- def test_pickle_prefetch_queryset_usable_outside_of_prefetch(self):
- # Prefetch shouldn't affect the fetch-on-pickle behavior of the
- # queryset passed to it.
- Group.objects.create(name="foo")
- events = Event.objects.order_by("id")
- Group.objects.prefetch_related(models.Prefetch("event_set", queryset=events))
- with self.assertNumQueries(1):
- events2 = pickle.loads(pickle.dumps(events))
- with self.assertNumQueries(0):
- list(events2)
- def test_pickle_prefetch_queryset_still_usable(self):
- g = Group.objects.create(name="foo")
- groups = Group.objects.prefetch_related(
- models.Prefetch("event_set", queryset=Event.objects.order_by("id"))
- )
- groups2 = pickle.loads(pickle.dumps(groups))
- self.assertSequenceEqual(groups2.filter(id__gte=0), [g])
- def test_pickle_prefetch_queryset_not_evaluated(self):
- Group.objects.create(name="foo")
- groups = Group.objects.prefetch_related(
- models.Prefetch("event_set", queryset=Event.objects.order_by("id"))
- )
- list(groups) # evaluate QuerySet
- with self.assertNumQueries(0):
- pickle.loads(pickle.dumps(groups))
- def test_pickle_prefetch_related_with_m2m_and_objects_deletion(self):
- """
- #24831 -- Cached properties on ManyToOneRel created in QuerySet.delete()
- caused subsequent QuerySet pickling to fail.
- """
- g = Group.objects.create(name="foo")
- m2m = M2MModel.objects.create()
- m2m.groups.add(g)
- Group.objects.all().delete()
- m2ms = M2MModel.objects.prefetch_related("groups")
- m2ms = pickle.loads(pickle.dumps(m2ms))
- self.assertSequenceEqual(m2ms, [m2m])
- def test_pickle_boolean_expression_in_Q__queryset(self):
- group = Group.objects.create(name="group")
- Event.objects.create(title="event", group=group)
- groups = Group.objects.filter(
- models.Q(
- models.Exists(
- Event.objects.filter(group_id=models.OuterRef("id")),
- )
- ),
- )
- groups2 = pickle.loads(pickle.dumps(groups))
- self.assertSequenceEqual(groups2, [group])
- def test_pickle_exists_queryset_still_usable(self):
- group = Group.objects.create(name="group")
- Event.objects.create(title="event", group=group)
- groups = Group.objects.annotate(
- has_event=models.Exists(
- Event.objects.filter(group_id=models.OuterRef("id")),
- ),
- )
- groups2 = pickle.loads(pickle.dumps(groups))
- self.assertSequenceEqual(groups2.filter(has_event=True), [group])
- def test_pickle_exists_queryset_not_evaluated(self):
- group = Group.objects.create(name="group")
- Event.objects.create(title="event", group=group)
- groups = Group.objects.annotate(
- has_event=models.Exists(
- Event.objects.filter(group_id=models.OuterRef("id")),
- ),
- )
- list(groups) # evaluate QuerySet.
- with self.assertNumQueries(0):
- self.assert_pickles(groups)
- def test_pickle_exists_kwargs_queryset_not_evaluated(self):
- group = Group.objects.create(name="group")
- Event.objects.create(title="event", group=group)
- groups = Group.objects.annotate(
- has_event=models.Exists(
- queryset=Event.objects.filter(group_id=models.OuterRef("id")),
- ),
- )
- list(groups) # evaluate QuerySet.
- with self.assertNumQueries(0):
- self.assert_pickles(groups)
- def test_pickle_subquery_queryset_not_evaluated(self):
- group = Group.objects.create(name="group")
- Event.objects.create(title="event", group=group)
- groups = Group.objects.annotate(
- event_title=models.Subquery(
- Event.objects.filter(group_id=models.OuterRef("id")).values("title"),
- ),
- )
- list(groups) # evaluate QuerySet.
- with self.assertNumQueries(0):
- self.assert_pickles(groups)
- def test_pickle_filteredrelation(self):
- group = Group.objects.create(name="group")
- event_1 = Event.objects.create(title="Big event", group=group)
- event_2 = Event.objects.create(title="Small event", group=group)
- Happening.objects.bulk_create(
- [
- Happening(event=event_1, number1=5),
- Happening(event=event_2, number1=3),
- ]
- )
- groups = Group.objects.annotate(
- big_events=models.FilteredRelation(
- "event",
- condition=models.Q(event__title__startswith="Big"),
- ),
- ).annotate(sum_number=models.Sum("big_events__happening__number1"))
- groups_query = pickle.loads(pickle.dumps(groups.query))
- groups = Group.objects.all()
- groups.query = groups_query
- self.assertEqual(groups.get().sum_number, 5)
- def test_pickle_filteredrelation_m2m(self):
- group = Group.objects.create(name="group")
- m2mmodel = M2MModel.objects.create(added=datetime.date(2020, 1, 1))
- m2mmodel.groups.add(group)
- groups = Group.objects.annotate(
- first_m2mmodels=models.FilteredRelation(
- "m2mmodel",
- condition=models.Q(m2mmodel__added__year=2020),
- ),
- ).annotate(count_groups=models.Count("first_m2mmodels__groups"))
- groups_query = pickle.loads(pickle.dumps(groups.query))
- groups = Group.objects.all()
- groups.query = groups_query
- self.assertEqual(groups.get().count_groups, 1)
- def test_annotation_with_callable_default(self):
- # Happening.when has a callable default of datetime.datetime.now.
- qs = Happening.objects.annotate(latest_time=models.Max("when"))
- self.assert_pickles(qs)
- def test_annotation_values(self):
- qs = Happening.objects.values("name").annotate(latest_time=models.Max("when"))
- reloaded = Happening.objects.all()
- reloaded.query = pickle.loads(pickle.dumps(qs.query))
- self.assertEqual(
- reloaded.get(),
- {"name": "test", "latest_time": self.happening.when},
- )
- def test_annotation_values_list(self):
- # values_list() is reloaded to values() when using a pickled query.
- tests = [
- Happening.objects.values_list("name"),
- Happening.objects.values_list("name", flat=True),
- Happening.objects.values_list("name", named=True),
- ]
- for qs in tests:
- with self.subTest(qs._iterable_class.__name__):
- reloaded = Happening.objects.all()
- reloaded.query = pickle.loads(pickle.dumps(qs.query))
- self.assertEqual(reloaded.get(), {"name": "test"})
- def test_filter_deferred(self):
- qs = Happening.objects.all()
- qs._defer_next_filter = True
- qs = qs.filter(id=0)
- self.assert_pickles(qs)
- def test_missing_django_version_unpickling(self):
- """
- #21430 -- Verifies a warning is raised for querysets that are
- unpickled without a Django version
- """
- qs = Group.missing_django_version_objects.all()
- msg = "Pickled queryset instance's Django version is not specified."
- with self.assertRaisesMessage(RuntimeWarning, msg):
- pickle.loads(pickle.dumps(qs))
- def test_unsupported_unpickle(self):
- """
- #21430 -- Verifies a warning is raised for querysets that are
- unpickled with a different Django version than the current
- """
- qs = Group.previous_django_version_objects.all()
- msg = (
- "Pickled queryset instance's Django version 1.0 does not match "
- "the current version %s." % django.__version__
- )
- with self.assertRaisesMessage(RuntimeWarning, msg):
- pickle.loads(pickle.dumps(qs))
- def test_order_by_model_with_abstract_inheritance_and_meta_ordering(self):
- group = Group.objects.create(name="test")
- event = MyEvent.objects.create(title="test event", group=group)
- event.edition_set.create()
- self.assert_pickles(event.edition_set.order_by("event"))
- class InLookupTests(TestCase):
- @classmethod
- def setUpTestData(cls):
- for i in range(1, 3):
- group = Group.objects.create(name="Group {}".format(i))
- cls.e1 = Event.objects.create(title="Event 1", group=group)
- def test_in_lookup_queryset_evaluation(self):
- """
- Neither pickling nor unpickling a QuerySet.query with an __in=inner_qs
- lookup should evaluate inner_qs.
- """
- events = Event.objects.filter(group__in=Group.objects.all())
- with self.assertNumQueries(0):
- dumped = pickle.dumps(events.query)
- with self.assertNumQueries(0):
- reloaded = pickle.loads(dumped)
- reloaded_events = Event.objects.none()
- reloaded_events.query = reloaded
- self.assertSequenceEqual(reloaded_events, [self.e1])
- def test_in_lookup_query_evaluation(self):
- events = Event.objects.filter(group__in=Group.objects.values("id").query)
- with self.assertNumQueries(0):
- dumped = pickle.dumps(events.query)
- with self.assertNumQueries(0):
- reloaded = pickle.loads(dumped)
- reloaded_events = Event.objects.none()
- reloaded_events.query = reloaded
- self.assertSequenceEqual(reloaded_events, [self.e1])
|