|
@@ -12,8 +12,8 @@ from django.db.models.aggregates import (
|
|
|
Avg, Count, Max, Min, StdDev, Sum, Variance,
|
|
|
)
|
|
|
from django.db.models.expressions import (
|
|
|
- Case, Col, ExpressionWrapper, F, Func, OrderBy, Random, RawSQL, Ref, Value,
|
|
|
- When,
|
|
|
+ Case, Col, Exists, ExpressionWrapper, F, Func, OrderBy, OuterRef, Random,
|
|
|
+ RawSQL, Ref, Subquery, Value, When,
|
|
|
)
|
|
|
from django.db.models.functions import (
|
|
|
Coalesce, Concat, Length, Lower, Substr, Upper,
|
|
@@ -32,15 +32,15 @@ from .models import (
|
|
|
class BasicExpressionsTests(TestCase):
|
|
|
@classmethod
|
|
|
def setUpTestData(cls):
|
|
|
- Company.objects.create(
|
|
|
+ cls.example_inc = Company.objects.create(
|
|
|
name="Example Inc.", num_employees=2300, num_chairs=5,
|
|
|
ceo=Employee.objects.create(firstname="Joe", lastname="Smith", salary=10)
|
|
|
)
|
|
|
- Company.objects.create(
|
|
|
+ cls.foobar_ltd = Company.objects.create(
|
|
|
name="Foobar Ltd.", num_employees=3, num_chairs=4,
|
|
|
ceo=Employee.objects.create(firstname="Frank", lastname="Meyer", salary=20)
|
|
|
)
|
|
|
- Company.objects.create(
|
|
|
+ cls.gmbh = Company.objects.create(
|
|
|
name="Test GmbH", num_employees=32, num_chairs=1,
|
|
|
ceo=Employee.objects.create(firstname="Max", lastname="Mustermann", salary=30)
|
|
|
)
|
|
@@ -387,6 +387,136 @@ class BasicExpressionsTests(TestCase):
|
|
|
)
|
|
|
self.assertEqual(str(qs.query).count('JOIN'), 2)
|
|
|
|
|
|
+ def test_outerref(self):
|
|
|
+ inner = Company.objects.filter(point_of_contact=OuterRef('pk'))
|
|
|
+ msg = (
|
|
|
+ 'This queryset contains a reference to an outer query and may only '
|
|
|
+ 'be used in a subquery.'
|
|
|
+ )
|
|
|
+ with self.assertRaisesMessage(ValueError, msg):
|
|
|
+ inner.exists()
|
|
|
+
|
|
|
+ outer = Employee.objects.annotate(is_point_of_contact=Exists(inner))
|
|
|
+ self.assertIs(outer.exists(), True)
|
|
|
+
|
|
|
+ def test_subquery(self):
|
|
|
+ Company.objects.filter(name='Example Inc.').update(
|
|
|
+ point_of_contact=Employee.objects.get(firstname='Joe', lastname='Smith'),
|
|
|
+ ceo=Employee.objects.get(firstname='Max', lastname='Mustermann'),
|
|
|
+ )
|
|
|
+ Employee.objects.create(firstname='Bob', lastname='Brown', salary=40)
|
|
|
+ qs = Employee.objects.annotate(
|
|
|
+ is_point_of_contact=Exists(Company.objects.filter(point_of_contact=OuterRef('pk'))),
|
|
|
+ is_not_point_of_contact=~Exists(Company.objects.filter(point_of_contact=OuterRef('pk'))),
|
|
|
+ is_ceo_of_small_company=Exists(Company.objects.filter(num_employees__lt=200, ceo=OuterRef('pk'))),
|
|
|
+ is_ceo_small_2=~~Exists(Company.objects.filter(num_employees__lt=200, ceo=OuterRef('pk'))),
|
|
|
+ largest_company=Subquery(Company.objects.order_by('-num_employees').filter(
|
|
|
+ models.Q(ceo=OuterRef('pk')) | models.Q(point_of_contact=OuterRef('pk'))
|
|
|
+ ).values('name')[:1], output_field=models.CharField())
|
|
|
+ ).values(
|
|
|
+ 'firstname',
|
|
|
+ 'is_point_of_contact',
|
|
|
+ 'is_not_point_of_contact',
|
|
|
+ 'is_ceo_of_small_company',
|
|
|
+ 'is_ceo_small_2',
|
|
|
+ 'largest_company',
|
|
|
+ ).order_by('firstname')
|
|
|
+
|
|
|
+ results = list(qs)
|
|
|
+ # Could use Coalesce(subq, Value('')) instead except for the bug in
|
|
|
+ # cx_Oracle mentioned in #23843.
|
|
|
+ bob = results[0]
|
|
|
+ if bob['largest_company'] == '' and connection.features.interprets_empty_strings_as_nulls:
|
|
|
+ bob['largest_company'] = None
|
|
|
+
|
|
|
+ self.assertEqual(results, [
|
|
|
+ {
|
|
|
+ 'firstname': 'Bob',
|
|
|
+ 'is_point_of_contact': False,
|
|
|
+ 'is_not_point_of_contact': True,
|
|
|
+ 'is_ceo_of_small_company': False,
|
|
|
+ 'is_ceo_small_2': False,
|
|
|
+ 'largest_company': None,
|
|
|
+ },
|
|
|
+ {
|
|
|
+ 'firstname': 'Frank',
|
|
|
+ 'is_point_of_contact': False,
|
|
|
+ 'is_not_point_of_contact': True,
|
|
|
+ 'is_ceo_of_small_company': True,
|
|
|
+ 'is_ceo_small_2': True,
|
|
|
+ 'largest_company': 'Foobar Ltd.',
|
|
|
+ },
|
|
|
+ {
|
|
|
+ 'firstname': 'Joe',
|
|
|
+ 'is_point_of_contact': True,
|
|
|
+ 'is_not_point_of_contact': False,
|
|
|
+ 'is_ceo_of_small_company': False,
|
|
|
+ 'is_ceo_small_2': False,
|
|
|
+ 'largest_company': 'Example Inc.',
|
|
|
+ },
|
|
|
+ {
|
|
|
+ 'firstname': 'Max',
|
|
|
+ 'is_point_of_contact': False,
|
|
|
+ 'is_not_point_of_contact': True,
|
|
|
+ 'is_ceo_of_small_company': True,
|
|
|
+ 'is_ceo_small_2': True,
|
|
|
+ 'largest_company': 'Example Inc.'
|
|
|
+ }
|
|
|
+ ])
|
|
|
+ # A less elegant way to write the same query: this uses a LEFT OUTER
|
|
|
+ # JOIN and an IS NULL, inside a WHERE NOT IN which is probably less
|
|
|
+ # efficient than EXISTS.
|
|
|
+ self.assertCountEqual(
|
|
|
+ qs.filter(is_point_of_contact=True).values('pk'),
|
|
|
+ Employee.objects.exclude(company_point_of_contact_set=None).values('pk')
|
|
|
+ )
|
|
|
+
|
|
|
+ def test_in_subquery(self):
|
|
|
+ # This is a contrived test (and you really wouldn't write this query),
|
|
|
+ # but it is a succinct way to test the __in=Subquery() construct.
|
|
|
+ small_companies = Company.objects.filter(num_employees__lt=200).values('pk')
|
|
|
+ subquery_test = Company.objects.filter(pk__in=Subquery(small_companies))
|
|
|
+ self.assertCountEqual(subquery_test, [self.foobar_ltd, self.gmbh])
|
|
|
+ subquery_test2 = Company.objects.filter(pk=Subquery(small_companies.filter(num_employees=3)))
|
|
|
+ self.assertCountEqual(subquery_test2, [self.foobar_ltd])
|
|
|
+
|
|
|
+ def test_nested_subquery(self):
|
|
|
+ inner = Company.objects.filter(point_of_contact=OuterRef('pk'))
|
|
|
+ outer = Employee.objects.annotate(is_point_of_contact=Exists(inner))
|
|
|
+ contrived = Employee.objects.annotate(
|
|
|
+ is_point_of_contact=Subquery(
|
|
|
+ outer.filter(pk=OuterRef('pk')).values('is_point_of_contact'),
|
|
|
+ output_field=models.BooleanField(),
|
|
|
+ ),
|
|
|
+ )
|
|
|
+ self.assertCountEqual(contrived.values_list(), outer.values_list())
|
|
|
+
|
|
|
+ def test_nested_subquery_outer_ref_2(self):
|
|
|
+ first = Time.objects.create(time='09:00')
|
|
|
+ second = Time.objects.create(time='17:00')
|
|
|
+ third = Time.objects.create(time='21:00')
|
|
|
+ SimulationRun.objects.bulk_create([
|
|
|
+ SimulationRun(start=first, end=second, midpoint='12:00'),
|
|
|
+ SimulationRun(start=first, end=third, midpoint='15:00'),
|
|
|
+ SimulationRun(start=second, end=first, midpoint='00:00'),
|
|
|
+ ])
|
|
|
+ inner = Time.objects.filter(time=OuterRef(OuterRef('time')), pk=OuterRef('start')).values('time')
|
|
|
+ middle = SimulationRun.objects.annotate(other=Subquery(inner)).values('other')[:1]
|
|
|
+ outer = Time.objects.annotate(other=Subquery(middle, output_field=models.TimeField()))
|
|
|
+ # This is a contrived example. It exercises the double OuterRef form.
|
|
|
+ self.assertCountEqual(outer, [first, second, third])
|
|
|
+
|
|
|
+ def test_annotations_within_subquery(self):
|
|
|
+ Company.objects.filter(num_employees__lt=50).update(ceo=Employee.objects.get(firstname='Frank'))
|
|
|
+ inner = Company.objects.filter(
|
|
|
+ ceo=OuterRef('pk')
|
|
|
+ ).values('ceo').annotate(total_employees=models.Sum('num_employees')).values('total_employees')
|
|
|
+ outer = Employee.objects.annotate(total_employees=Subquery(inner)).filter(salary__lte=Subquery(inner))
|
|
|
+ self.assertSequenceEqual(
|
|
|
+ outer.order_by('-total_employees').values('salary', 'total_employees'),
|
|
|
+ [{'salary': 10, 'total_employees': 2300}, {'salary': 20, 'total_employees': 35}],
|
|
|
+ )
|
|
|
+
|
|
|
|
|
|
class IterableLookupInnerExpressionsTests(TestCase):
|
|
|
@classmethod
|