123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526 |
- import itertools
- from django.db import NotSupportedError
- from django.db.models import F
- from django.db.models.fields.tuple_lookups import (
- TupleExact,
- TupleGreaterThan,
- TupleGreaterThanOrEqual,
- TupleIn,
- TupleIsNull,
- TupleLessThan,
- TupleLessThanOrEqual,
- )
- from django.db.models.lookups import In
- from django.test import TestCase, skipUnlessDBFeature
- from .models import Contact, Customer
class TupleLookupsTests(TestCase):
    """Tests for tuple lookups on the ``Contact.customer`` relation.

    Most tests run the same expectations twice: once through the relation
    (``filter(customer...=...)``) and once through the explicit ``Tuple*``
    lookup built on the ``(customer_code, company_code)`` pair.

    ``setUpTestData()`` creates customers identified by the pair
    ``(customer_id, company)``. In ascending tuple order they are
    customer_1=(1, "a"), customer_2=(1, "b"), customer_5=(1, "e"),
    customer_3=(2, "c") and customer_4=(3, "d"); the lt/lte/gt/gte
    expectations follow that order. customer_4 has no contacts.
    """

    @classmethod
    def setUpTestData(cls):
        """Create the five customers and six contacts shared by every test."""
        super().setUpTestData()
        cls.customer_1 = Customer.objects.create(customer_id=1, company="a")
        cls.customer_2 = Customer.objects.create(customer_id=1, company="b")
        cls.customer_3 = Customer.objects.create(customer_id=2, company="c")
        cls.customer_4 = Customer.objects.create(customer_id=3, company="d")
        cls.customer_5 = Customer.objects.create(customer_id=1, company="e")
        cls.contact_1 = Contact.objects.create(customer=cls.customer_1)
        cls.contact_2 = Contact.objects.create(customer=cls.customer_1)
        cls.contact_3 = Contact.objects.create(customer=cls.customer_2)
        cls.contact_4 = Contact.objects.create(customer=cls.customer_3)
        cls.contact_5 = Contact.objects.create(customer=cls.customer_1)
        cls.contact_6 = Contact.objects.create(customer=cls.customer_5)

    # Private helpers (names start with "_" so they are not collected as tests).

    @staticmethod
    def _lhs():
        """Return a fresh left-hand side for an explicit tuple lookup."""
        return (F("customer_code"), F("company_code"))

    @staticmethod
    def _rhs(customer):
        """Return the ``(customer_id, company)`` values of ``customer``."""
        return (customer.customer_id, customer.company)

    def _contacts(self):
        """Return contact_1 .. contact_6 as a tuple, in creation order."""
        return (
            self.contact_1,
            self.contact_2,
            self.contact_3,
            self.contact_4,
            self.contact_5,
            self.contact_6,
        )

    def _assert_subquery_not_supported(self, lookup_name, filter_name, customer_id):
        """Assert that filtering ``Contact`` by a sliced subquery is rejected.

        ``lookup_name`` is the name quoted in the expected error message,
        ``filter_name`` is the keyword passed to ``filter()`` and
        ``customer_id`` selects the ``Customer`` row of the subquery.
        """
        # Built outside the context manager so that only the multi-column
        # filter itself can satisfy the assertRaisesMessage() check.
        subquery = Customer.objects.filter(id=customer_id)[:1]
        message = f"'{lookup_name}' doesn't support multi-column subqueries."
        with self.assertRaisesMessage(NotSupportedError, message):
            # The assertion also forces evaluation of the queryset, in case
            # the error is only raised once the query is compiled.
            self.assertSequenceEqual(
                Contact.objects.filter(**{filter_name: subquery}).order_by("id"), ()
            )

    def test_exact(self):
        """Match contacts by exact customer, via the relation and TupleExact."""
        c1, c2, c3, c4, c5, c6 = self._contacts()
        test_cases = (
            (self.customer_1, (c1, c2, c5)),
            (self.customer_2, (c3,)),
            (self.customer_3, (c4,)),
            (self.customer_4, ()),
            (self.customer_5, (c6,)),
        )
        for customer, contacts in test_cases:
            with self.subTest(
                "filter(customer=customer)",
                customer=customer,
                contacts=contacts,
            ):
                self.assertSequenceEqual(
                    Contact.objects.filter(customer=customer).order_by("id"), contacts
                )
            with self.subTest(
                "filter(TupleExact)",
                customer=customer,
                contacts=contacts,
            ):
                lookup = TupleExact(self._lhs(), self._rhs(customer))
                self.assertSequenceEqual(
                    Contact.objects.filter(lookup).order_by("id"), contacts
                )

    def test_exact_subquery(self):
        """An exact match against a sliced subquery raises NotSupportedError."""
        self._assert_subquery_not_supported("exact", "customer", self.customer_1.id)

    def test_in(self):
        """Match contacts whose customer is in a growing set of customers."""
        cust_1, cust_2, cust_3, cust_4, cust_5 = (
            self.customer_1,
            self.customer_2,
            self.customer_3,
            self.customer_4,
            self.customer_5,
        )
        c1, c2, c3, c4, c5, c6 = self._contacts()
        test_cases = (
            ((), ()),
            ((cust_1,), (c1, c2, c5)),
            ((cust_1, cust_2), (c1, c2, c3, c5)),
            ((cust_1, cust_2, cust_3), (c1, c2, c3, c4, c5)),
            # cust_4 has no contacts, so adding it changes nothing.
            ((cust_1, cust_2, cust_3, cust_4), (c1, c2, c3, c4, c5)),
            ((cust_1, cust_2, cust_3, cust_4, cust_5), (c1, c2, c3, c4, c5, c6)),
        )
        for customers, contacts in test_cases:
            with self.subTest(
                "filter(customer__in=customers)",
                customers=customers,
                contacts=contacts,
            ):
                self.assertSequenceEqual(
                    Contact.objects.filter(customer__in=customers).order_by("id"),
                    contacts,
                )
            with self.subTest(
                "filter(TupleIn)",
                customers=customers,
                contacts=contacts,
            ):
                rhs = [self._rhs(customer) for customer in customers]
                lookup = TupleIn(self._lhs(), rhs)
                self.assertSequenceEqual(
                    Contact.objects.filter(lookup).order_by("id"), contacts
                )

    @skipUnlessDBFeature("allow_sliced_subqueries_with_in")
    def test_in_subquery(self):
        """``customer__in`` accepts a sliced Customer queryset."""
        subquery = Customer.objects.filter(id=self.customer_1.id)[:1]
        self.assertSequenceEqual(
            Contact.objects.filter(customer__in=subquery).order_by("id"),
            (self.contact_1, self.contact_2, self.contact_5),
        )

    def test_tuple_in_subquery_must_be_query(self):
        """TupleIn rejects a non-Query expression as its right-hand side."""
        # If rhs is any non-Query object with an as_sql() function.
        rhs = In(F("customer_code"), [1, 2, 3])
        with self.assertRaisesMessage(
            ValueError,
            "'in' subquery lookup of ('customer_code', 'company_code') "
            "must be a Query object (received 'In')",
        ):
            TupleIn(self._lhs(), rhs)

    def test_tuple_in_subquery_must_have_2_fields(self):
        """TupleIn rejects a subquery selecting one field for a 2-field lhs."""
        rhs = Customer.objects.values_list("customer_id").query
        with self.assertRaisesMessage(
            ValueError,
            "'in' subquery lookup of ('customer_code', 'company_code') "
            "must have 2 fields (received 1)",
        ):
            TupleIn(self._lhs(), rhs)

    def test_tuple_in_subquery(self):
        """TupleIn with a two-column subquery selects the matching contacts."""
        customers = Customer.objects.values_list("customer_id", "company")
        c1, c2, c3, c4, c5, c6 = self._contacts()
        test_cases = (
            (self.customer_1, (c1, c2, c5)),
            (self.customer_2, (c3,)),
            (self.customer_3, (c4,)),
            (self.customer_4, ()),
            (self.customer_5, (c6,)),
        )
        for customer, contacts in test_cases:
            rhs = customers.filter(id=customer.id).query
            lookup = TupleIn(self._lhs(), rhs)
            # Built before subTest() so the SQL can label the sub-test.
            qs = Contact.objects.filter(lookup).order_by("id")
            with self.subTest(customer=customer.id, query=str(qs.query)):
                self.assertSequenceEqual(qs, contacts)

    def test_tuple_in_rhs_must_be_collection_of_tuples_or_lists(self):
        """TupleIn rejects a collection containing non tuple/list members."""
        test_cases = (
            (1, 2, 3),
            ((1, 2), (3, 4), None),
        )
        message = (
            "'in' lookup of ('customer_code', 'company_code') "
            "must be a collection of tuples or lists"
        )
        for rhs in test_cases:
            with (
                self.subTest(rhs=rhs),
                self.assertRaisesMessage(ValueError, message),
            ):
                TupleIn(self._lhs(), rhs)

    def test_tuple_in_rhs_must_have_2_elements_each(self):
        """TupleIn rejects members whose length differs from the lhs length."""
        test_cases = (
            ((),),
            ((1,),),
            ((1, 2, 3),),
        )
        message = (
            "'in' lookup of ('customer_code', 'company_code') "
            "must have 2 elements each"
        )
        for rhs in test_cases:
            with (
                self.subTest(rhs=rhs),
                self.assertRaisesMessage(ValueError, message),
            ):
                TupleIn(self._lhs(), rhs)

    def test_lt(self):
        """Match contacts whose customer sorts strictly before the given one."""
        c1, c2, c3, c4, c5, c6 = self._contacts()
        # Cases are listed in ascending (customer_id, company) order.
        test_cases = (
            (self.customer_1, ()),
            (self.customer_2, (c1, c2, c5)),
            (self.customer_5, (c1, c2, c3, c5)),
            (self.customer_3, (c1, c2, c3, c5, c6)),
            (self.customer_4, (c1, c2, c3, c4, c5, c6)),
        )
        for customer, contacts in test_cases:
            with self.subTest(
                "filter(customer__lt=customer)",
                customer=customer,
                contacts=contacts,
            ):
                self.assertSequenceEqual(
                    Contact.objects.filter(customer__lt=customer).order_by("id"),
                    contacts,
                )
            with self.subTest(
                "filter(TupleLessThan)",
                customer=customer,
                contacts=contacts,
            ):
                lookup = TupleLessThan(self._lhs(), self._rhs(customer))
                self.assertSequenceEqual(
                    Contact.objects.filter(lookup).order_by("id"), contacts
                )

    def test_lt_subquery(self):
        """``customer__lt`` against a sliced subquery raises NotSupportedError."""
        self._assert_subquery_not_supported("lt", "customer__lt", self.customer_1.id)

    def test_lte(self):
        """Match contacts whose customer sorts before or equal to the given one."""
        c1, c2, c3, c4, c5, c6 = self._contacts()
        # Cases are listed in ascending (customer_id, company) order.
        test_cases = (
            (self.customer_1, (c1, c2, c5)),
            (self.customer_2, (c1, c2, c3, c5)),
            (self.customer_5, (c1, c2, c3, c5, c6)),
            (self.customer_3, (c1, c2, c3, c4, c5, c6)),
            (self.customer_4, (c1, c2, c3, c4, c5, c6)),
        )
        for customer, contacts in test_cases:
            with self.subTest(
                "filter(customer__lte=customer)",
                customer=customer,
                contacts=contacts,
            ):
                self.assertSequenceEqual(
                    Contact.objects.filter(customer__lte=customer).order_by("id"),
                    contacts,
                )
            with self.subTest(
                "filter(TupleLessThanOrEqual)",
                customer=customer,
                contacts=contacts,
            ):
                lookup = TupleLessThanOrEqual(self._lhs(), self._rhs(customer))
                self.assertSequenceEqual(
                    Contact.objects.filter(lookup).order_by("id"), contacts
                )

    def test_lte_subquery(self):
        """``customer__lte`` against a sliced subquery raises NotSupportedError."""
        self._assert_subquery_not_supported(
            "lte", "customer__lte", self.customer_1.id
        )

    def test_gt(self):
        """Match contacts whose customer sorts strictly after the given one."""
        c1, c2, c3, c4, c5, c6 = self._contacts()
        # Cases are listed in ascending (customer_id, company) order.
        test_cases = (
            (self.customer_1, (c3, c4, c6)),
            (self.customer_2, (c4, c6)),
            (self.customer_5, (c4,)),
            (self.customer_3, ()),
            (self.customer_4, ()),
        )
        for customer, contacts in test_cases:
            with self.subTest(
                "filter(customer__gt=customer)",
                customer=customer,
                contacts=contacts,
            ):
                self.assertSequenceEqual(
                    Contact.objects.filter(customer__gt=customer).order_by("id"),
                    contacts,
                )
            with self.subTest(
                "filter(TupleGreaterThan)",
                customer=customer,
                contacts=contacts,
            ):
                lookup = TupleGreaterThan(self._lhs(), self._rhs(customer))
                self.assertSequenceEqual(
                    Contact.objects.filter(lookup).order_by("id"), contacts
                )

    def test_gt_subquery(self):
        """``customer__gt`` against a sliced subquery raises NotSupportedError."""
        self._assert_subquery_not_supported("gt", "customer__gt", self.customer_1.id)

    def test_gte(self):
        """Match contacts whose customer sorts after or equal to the given one."""
        c1, c2, c3, c4, c5, c6 = self._contacts()
        # Cases are listed in ascending (customer_id, company) order.
        test_cases = (
            (self.customer_1, (c1, c2, c3, c4, c5, c6)),
            (self.customer_2, (c3, c4, c6)),
            (self.customer_5, (c4, c6)),
            (self.customer_3, (c4,)),
            (self.customer_4, ()),
        )
        for customer, contacts in test_cases:
            with self.subTest(
                "filter(customer__gte=customer)",
                customer=customer,
                contacts=contacts,
            ):
                self.assertSequenceEqual(
                    Contact.objects.filter(customer__gte=customer).order_by("pk"),
                    contacts,
                )
            with self.subTest(
                "filter(TupleGreaterThanOrEqual)",
                customer=customer,
                contacts=contacts,
            ):
                lookup = TupleGreaterThanOrEqual(self._lhs(), self._rhs(customer))
                self.assertSequenceEqual(
                    Contact.objects.filter(lookup).order_by("id"), contacts
                )

    def test_gte_subquery(self):
        """``customer__gte`` against a sliced subquery raises NotSupportedError."""
        self._assert_subquery_not_supported(
            "gte", "customer__gte", self.customer_1.id
        )

    def test_isnull(self):
        """No contact has a null customer; every contact has a non-null one."""
        contacts = self._contacts()
        with self.subTest("filter(customer__isnull=True)"):
            self.assertSequenceEqual(
                Contact.objects.filter(customer__isnull=True).order_by("id"),
                (),
            )
        with self.subTest("filter(TupleIsNull(True))"):
            lookup = TupleIsNull(self._lhs(), True)
            self.assertSequenceEqual(
                Contact.objects.filter(lookup).order_by("id"),
                (),
            )
        with self.subTest("filter(customer__isnull=False)"):
            self.assertSequenceEqual(
                Contact.objects.filter(customer__isnull=False).order_by("id"),
                contacts,
            )
        with self.subTest("filter(TupleIsNull(False))"):
            lookup = TupleIsNull(self._lhs(), False)
            self.assertSequenceEqual(
                Contact.objects.filter(lookup).order_by("id"),
                contacts,
            )

    def test_isnull_subquery(self):
        """``customer__isnull`` given a sliced subquery raises NotSupportedError."""
        self._assert_subquery_not_supported("isnull", "customer__isnull", 0)

    def test_lookup_errors(self):
        """Relation lookups reject values that are not 2-element sequences."""
        m_2_elements = "'%s' lookup of 'customer' must have 2 elements"
        m_2_elements_each = "'in' lookup of 'customer' must have 2 elements each"
        test_cases = (
            ({"customer": 1}, m_2_elements % "exact"),
            ({"customer": (1, 2, 3)}, m_2_elements % "exact"),
            ({"customer__in": (1, 2, 3)}, m_2_elements_each),
            ({"customer__in": ("foo", "bar")}, m_2_elements_each),
            ({"customer__gt": 1}, m_2_elements % "gt"),
            ({"customer__gt": (1, 2, 3)}, m_2_elements % "gt"),
            ({"customer__gte": 1}, m_2_elements % "gte"),
            ({"customer__gte": (1, 2, 3)}, m_2_elements % "gte"),
            ({"customer__lt": 1}, m_2_elements % "lt"),
            ({"customer__lt": (1, 2, 3)}, m_2_elements % "lt"),
            ({"customer__lte": 1}, m_2_elements % "lte"),
            ({"customer__lte": (1, 2, 3)}, m_2_elements % "lte"),
        )
        for kwargs, message in test_cases:
            with (
                self.subTest(kwargs=kwargs),
                self.assertRaisesMessage(ValueError, message),
            ):
                Contact.objects.get(**kwargs)

    def test_tuple_lookup_names(self):
        """Each Tuple* lookup class exposes the expected ``lookup_name``."""
        test_cases = (
            (TupleExact, "exact"),
            (TupleGreaterThan, "gt"),
            (TupleGreaterThanOrEqual, "gte"),
            (TupleLessThan, "lt"),
            (TupleLessThanOrEqual, "lte"),
            (TupleIn, "in"),
            (TupleIsNull, "isnull"),
        )
        for lookup_class, lookup_name in test_cases:
            with self.subTest(lookup_name):
                self.assertEqual(lookup_class.lookup_name, lookup_name)

    def test_tuple_lookup_rhs_must_be_tuple_or_list(self):
        """Tuple lookups reject a right-hand side that is not a tuple or list."""
        test_cases = itertools.product(
            (
                TupleExact,
                TupleGreaterThan,
                TupleGreaterThanOrEqual,
                TupleLessThan,
                TupleLessThanOrEqual,
                TupleIn,
            ),
            (
                0,
                1,
                None,
                True,
                False,
                {"foo": "bar"},
            ),
        )
        for lookup_cls, rhs in test_cases:
            lookup_name = lookup_cls.lookup_name
            message = (
                f"'{lookup_name}' lookup of ('customer_code', 'company_code') "
                "must be a tuple or a list"
            )
            with (
                self.subTest(lookup_name=lookup_name, rhs=rhs),
                self.assertRaisesMessage(ValueError, message),
            ):
                lookup_cls(self._lhs(), rhs)

    def test_tuple_lookup_rhs_must_have_2_elements(self):
        """Tuple lookups reject a right-hand side of the wrong length."""
        test_cases = itertools.product(
            (
                TupleExact,
                TupleGreaterThan,
                TupleGreaterThanOrEqual,
                TupleLessThan,
                TupleLessThanOrEqual,
            ),
            (
                [],
                [1],
                [1, 2, 3],
                (),
                (1,),
                (1, 2, 3),
            ),
        )
        for lookup_cls, rhs in test_cases:
            lookup_name = lookup_cls.lookup_name
            message = (
                f"'{lookup_name}' lookup of ('customer_code', 'company_code') "
                "must have 2 elements"
            )
            with (
                self.subTest(lookup_name=lookup_name, rhs=rhs),
                self.assertRaisesMessage(ValueError, message),
            ):
                lookup_cls(self._lhs(), rhs)
|