1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176217721782179218021812182218321842185218621872188218921902191219221932194219521962197219821992200220122022203220422052206220722082209221022112212221322142215221622172218221922202221222222232224222522262227222822292230223122322233223422352236223722382239224022412242224322442245224622472248224922502251225222532254225522562257225822592260226122622263226422652266226722682269227022712272227322742275227622772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569 |
- import datetime
- import pickle
- import unittest
- import uuid
- from collections import namedtuple
- from copy import deepcopy
- from decimal import Decimal
- from unittest import mock
- from django.core.exceptions import FieldError
- from django.db import DatabaseError, NotSupportedError, connection
- from django.db.models import (
- AutoField,
- Avg,
- BinaryField,
- BooleanField,
- Case,
- CharField,
- Count,
- DateField,
- DateTimeField,
- DecimalField,
- DurationField,
- Exists,
- Expression,
- ExpressionList,
- ExpressionWrapper,
- F,
- FloatField,
- Func,
- IntegerField,
- Max,
- Min,
- Model,
- OrderBy,
- OuterRef,
- Q,
- StdDev,
- Subquery,
- Sum,
- TimeField,
- UUIDField,
- Value,
- Variance,
- When,
- )
- from django.db.models.expressions import (
- Col,
- Combinable,
- CombinedExpression,
- RawSQL,
- Ref,
- )
- from django.db.models.functions import (
- Coalesce,
- Concat,
- Left,
- Length,
- Lower,
- Substr,
- Upper,
- )
- from django.db.models.sql import constants
- from django.db.models.sql.datastructures import Join
- from django.test import SimpleTestCase, TestCase, skipUnlessDBFeature
- from django.test.utils import (
- Approximate,
- CaptureQueriesContext,
- isolate_apps,
- register_lookup,
- )
- from django.utils.deprecation import RemovedInDjango50Warning
- from django.utils.functional import SimpleLazyObject
- from .models import (
- UUID,
- UUIDPK,
- Company,
- Employee,
- Experiment,
- Manager,
- Number,
- RemoteEmployee,
- Result,
- SimulationRun,
- Time,
- )
- class BasicExpressionsTests(TestCase):
- @classmethod
- def setUpTestData(cls):
- cls.example_inc = Company.objects.create(
- name="Example Inc.",
- num_employees=2300,
- num_chairs=5,
- ceo=Employee.objects.create(firstname="Joe", lastname="Smith", salary=10),
- )
- cls.foobar_ltd = Company.objects.create(
- name="Foobar Ltd.",
- num_employees=3,
- num_chairs=4,
- based_in_eu=True,
- ceo=Employee.objects.create(firstname="Frank", lastname="Meyer", salary=20),
- )
- cls.max = Employee.objects.create(
- firstname="Max", lastname="Mustermann", salary=30
- )
- cls.gmbh = Company.objects.create(
- name="Test GmbH", num_employees=32, num_chairs=1, ceo=cls.max
- )
- def setUp(self):
- self.company_query = Company.objects.values(
- "name", "num_employees", "num_chairs"
- ).order_by("name", "num_employees", "num_chairs")
- def test_annotate_values_aggregate(self):
- companies = (
- Company.objects.annotate(
- salaries=F("ceo__salary"),
- )
- .values("num_employees", "salaries")
- .aggregate(
- result=Sum(
- F("salaries") + F("num_employees"), output_field=IntegerField()
- ),
- )
- )
- self.assertEqual(companies["result"], 2395)
- def test_annotate_values_filter(self):
- companies = (
- Company.objects.annotate(
- foo=RawSQL("%s", ["value"]),
- )
- .filter(foo="value")
- .order_by("name")
- )
- self.assertSequenceEqual(
- companies,
- [self.example_inc, self.foobar_ltd, self.gmbh],
- )
- def test_annotate_values_count(self):
- companies = Company.objects.annotate(foo=RawSQL("%s", ["value"]))
- self.assertEqual(companies.count(), 3)
- @skipUnlessDBFeature("supports_boolean_expr_in_select_clause")
- def test_filtering_on_annotate_that_uses_q(self):
- self.assertEqual(
- Company.objects.annotate(
- num_employees_check=ExpressionWrapper(
- Q(num_employees__gt=3), output_field=BooleanField()
- )
- )
- .filter(num_employees_check=True)
- .count(),
- 2,
- )
- def test_filtering_on_q_that_is_boolean(self):
- self.assertEqual(
- Company.objects.filter(
- ExpressionWrapper(Q(num_employees__gt=3), output_field=BooleanField())
- ).count(),
- 2,
- )
- def test_filtering_on_rawsql_that_is_boolean(self):
- self.assertEqual(
- Company.objects.filter(
- RawSQL("num_employees > %s", (3,), output_field=BooleanField()),
- ).count(),
- 2,
- )
- def test_filter_inter_attribute(self):
- # We can filter on attribute relationships on same model obj, e.g.
- # find companies where the number of employees is greater
- # than the number of chairs.
- self.assertSequenceEqual(
- self.company_query.filter(num_employees__gt=F("num_chairs")),
- [
- {
- "num_chairs": 5,
- "name": "Example Inc.",
- "num_employees": 2300,
- },
- {"num_chairs": 1, "name": "Test GmbH", "num_employees": 32},
- ],
- )
- def test_update(self):
- # We can set one field to have the value of another field
- # Make sure we have enough chairs
- self.company_query.update(num_chairs=F("num_employees"))
- self.assertSequenceEqual(
- self.company_query,
- [
- {"num_chairs": 2300, "name": "Example Inc.", "num_employees": 2300},
- {"num_chairs": 3, "name": "Foobar Ltd.", "num_employees": 3},
- {"num_chairs": 32, "name": "Test GmbH", "num_employees": 32},
- ],
- )
- def test_arithmetic(self):
- # We can perform arithmetic operations in expressions
- # Make sure we have 2 spare chairs
- self.company_query.update(num_chairs=F("num_employees") + 2)
- self.assertSequenceEqual(
- self.company_query,
- [
- {"num_chairs": 2302, "name": "Example Inc.", "num_employees": 2300},
- {"num_chairs": 5, "name": "Foobar Ltd.", "num_employees": 3},
- {"num_chairs": 34, "name": "Test GmbH", "num_employees": 32},
- ],
- )
- def test_order_of_operations(self):
- # Law of order of operations is followed
- self.company_query.update(
- num_chairs=F("num_employees") + 2 * F("num_employees")
- )
- self.assertSequenceEqual(
- self.company_query,
- [
- {"num_chairs": 6900, "name": "Example Inc.", "num_employees": 2300},
- {"num_chairs": 9, "name": "Foobar Ltd.", "num_employees": 3},
- {"num_chairs": 96, "name": "Test GmbH", "num_employees": 32},
- ],
- )
- def test_parenthesis_priority(self):
- # Law of order of operations can be overridden by parentheses
- self.company_query.update(
- num_chairs=(F("num_employees") + 2) * F("num_employees")
- )
- self.assertSequenceEqual(
- self.company_query,
- [
- {"num_chairs": 5294600, "name": "Example Inc.", "num_employees": 2300},
- {"num_chairs": 15, "name": "Foobar Ltd.", "num_employees": 3},
- {"num_chairs": 1088, "name": "Test GmbH", "num_employees": 32},
- ],
- )
- def test_update_with_fk(self):
- # ForeignKey can become updated with the value of another ForeignKey.
- self.assertEqual(Company.objects.update(point_of_contact=F("ceo")), 3)
- self.assertQuerysetEqual(
- Company.objects.all(),
- ["Joe Smith", "Frank Meyer", "Max Mustermann"],
- lambda c: str(c.point_of_contact),
- ordered=False,
- )
- def test_update_with_none(self):
- Number.objects.create(integer=1, float=1.0)
- Number.objects.create(integer=2)
- Number.objects.filter(float__isnull=False).update(float=Value(None))
- self.assertQuerysetEqual(
- Number.objects.all(), [None, None], lambda n: n.float, ordered=False
- )
- def test_filter_with_join(self):
- # F Expressions can also span joins
- Company.objects.update(point_of_contact=F("ceo"))
- c = Company.objects.first()
- c.point_of_contact = Employee.objects.create(
- firstname="Guido", lastname="van Rossum"
- )
- c.save()
- self.assertQuerysetEqual(
- Company.objects.filter(ceo__firstname=F("point_of_contact__firstname")),
- ["Foobar Ltd.", "Test GmbH"],
- lambda c: c.name,
- ordered=False,
- )
- Company.objects.exclude(ceo__firstname=F("point_of_contact__firstname")).update(
- name="foo"
- )
- self.assertEqual(
- Company.objects.exclude(ceo__firstname=F("point_of_contact__firstname"))
- .get()
- .name,
- "foo",
- )
- msg = "Joined field references are not permitted in this query"
- with self.assertRaisesMessage(FieldError, msg):
- Company.objects.exclude(
- ceo__firstname=F("point_of_contact__firstname")
- ).update(name=F("point_of_contact__lastname"))
- def test_object_update(self):
- # F expressions can be used to update attributes on single objects
- self.gmbh.num_employees = F("num_employees") + 4
- self.gmbh.save()
- self.gmbh.refresh_from_db()
- self.assertEqual(self.gmbh.num_employees, 36)
- def test_new_object_save(self):
- # We should be able to use Funcs when inserting new data
- test_co = Company(
- name=Lower(Value("UPPER")), num_employees=32, num_chairs=1, ceo=self.max
- )
- test_co.save()
- test_co.refresh_from_db()
- self.assertEqual(test_co.name, "upper")
- def test_new_object_create(self):
- test_co = Company.objects.create(
- name=Lower(Value("UPPER")), num_employees=32, num_chairs=1, ceo=self.max
- )
- test_co.refresh_from_db()
- self.assertEqual(test_co.name, "upper")
- def test_object_create_with_aggregate(self):
- # Aggregates are not allowed when inserting new data
- msg = (
- "Aggregate functions are not allowed in this query "
- "(num_employees=Max(Value(1)))."
- )
- with self.assertRaisesMessage(FieldError, msg):
- Company.objects.create(
- name="Company",
- num_employees=Max(Value(1)),
- num_chairs=1,
- ceo=Employee.objects.create(
- firstname="Just", lastname="Doit", salary=30
- ),
- )
- def test_object_update_fk(self):
- # F expressions cannot be used to update attributes which are foreign
- # keys, or attributes which involve joins.
- test_gmbh = Company.objects.get(pk=self.gmbh.pk)
- msg = 'F(ceo)": "Company.point_of_contact" must be a "Employee" instance.'
- with self.assertRaisesMessage(ValueError, msg):
- test_gmbh.point_of_contact = F("ceo")
- test_gmbh.point_of_contact = self.gmbh.ceo
- test_gmbh.save()
- test_gmbh.name = F("ceo__lastname")
- msg = "Joined field references are not permitted in this query"
- with self.assertRaisesMessage(FieldError, msg):
- test_gmbh.save()
- def test_update_inherited_field_value(self):
- msg = "Joined field references are not permitted in this query"
- with self.assertRaisesMessage(FieldError, msg):
- RemoteEmployee.objects.update(adjusted_salary=F("salary") * 5)
- def test_object_update_unsaved_objects(self):
- # F expressions cannot be used to update attributes on objects which do
- # not yet exist in the database
- acme = Company(
- name="The Acme Widget Co.", num_employees=12, num_chairs=5, ceo=self.max
- )
- acme.num_employees = F("num_employees") + 16
- msg = (
- 'Failed to insert expression "Col(expressions_company, '
- 'expressions.Company.num_employees) + Value(16)" on '
- "expressions.Company.num_employees. F() expressions can only be "
- "used to update, not to insert."
- )
- with self.assertRaisesMessage(ValueError, msg):
- acme.save()
- acme.num_employees = 12
- acme.name = Lower(F("name"))
- msg = (
- 'Failed to insert expression "Lower(Col(expressions_company, '
- 'expressions.Company.name))" on expressions.Company.name. F() '
- "expressions can only be used to update, not to insert."
- )
- with self.assertRaisesMessage(ValueError, msg):
- acme.save()
- def test_ticket_11722_iexact_lookup(self):
- Employee.objects.create(firstname="John", lastname="Doe")
- test = Employee.objects.create(firstname="Test", lastname="test")
- queryset = Employee.objects.filter(firstname__iexact=F("lastname"))
- self.assertSequenceEqual(queryset, [test])
- def test_ticket_16731_startswith_lookup(self):
- Employee.objects.create(firstname="John", lastname="Doe")
- e2 = Employee.objects.create(firstname="Jack", lastname="Jackson")
- e3 = Employee.objects.create(firstname="Jack", lastname="jackson")
- self.assertSequenceEqual(
- Employee.objects.filter(lastname__startswith=F("firstname")),
- [e2, e3] if connection.features.has_case_insensitive_like else [e2],
- )
- qs = Employee.objects.filter(lastname__istartswith=F("firstname")).order_by(
- "pk"
- )
- self.assertSequenceEqual(qs, [e2, e3])
- def test_ticket_18375_join_reuse(self):
- # Reverse multijoin F() references and the lookup target the same join.
- # Pre #18375 the F() join was generated first and the lookup couldn't
- # reuse that join.
- qs = Employee.objects.filter(
- company_ceo_set__num_chairs=F("company_ceo_set__num_employees")
- )
- self.assertEqual(str(qs.query).count("JOIN"), 1)
- def test_ticket_18375_kwarg_ordering(self):
- # The next query was dict-randomization dependent - if the "gte=1"
- # was seen first, then the F() will reuse the join generated by the
- # gte lookup, if F() was seen first, then it generated a join the
- # other lookups could not reuse.
- qs = Employee.objects.filter(
- company_ceo_set__num_chairs=F("company_ceo_set__num_employees"),
- company_ceo_set__num_chairs__gte=1,
- )
- self.assertEqual(str(qs.query).count("JOIN"), 1)
- def test_ticket_18375_kwarg_ordering_2(self):
- # Another similar case for F() than above. Now we have the same join
- # in two filter kwargs, one in the lhs lookup, one in F. Here pre
- # #18375 the amount of joins generated was random if dict
- # randomization was enabled, that is the generated query dependent
- # on which clause was seen first.
- qs = Employee.objects.filter(
- company_ceo_set__num_employees=F("pk"),
- pk=F("company_ceo_set__num_employees"),
- )
- self.assertEqual(str(qs.query).count("JOIN"), 1)
- def test_ticket_18375_chained_filters(self):
- # F() expressions do not reuse joins from previous filter.
- qs = Employee.objects.filter(company_ceo_set__num_employees=F("pk")).filter(
- company_ceo_set__num_employees=F("company_ceo_set__num_employees")
- )
- self.assertEqual(str(qs.query).count("JOIN"), 2)
- def test_order_by_exists(self):
- mary = Employee.objects.create(
- firstname="Mary", lastname="Mustermann", salary=20
- )
- mustermanns_by_seniority = Employee.objects.filter(
- lastname="Mustermann"
- ).order_by(
- # Order by whether the employee is the CEO of a company
- Exists(Company.objects.filter(ceo=OuterRef("pk"))).desc()
- )
- self.assertSequenceEqual(mustermanns_by_seniority, [self.max, mary])
- def test_order_by_multiline_sql(self):
- raw_order_by = (
- RawSQL(
- """
- CASE WHEN num_employees > 1000
- THEN num_chairs
- ELSE 0 END
- """,
- [],
- ).desc(),
- RawSQL(
- """
- CASE WHEN num_chairs > 1
- THEN 1
- ELSE 0 END
- """,
- [],
- ).asc(),
- )
- for qs in (
- Company.objects.all(),
- Company.objects.distinct(),
- ):
- with self.subTest(qs=qs):
- self.assertSequenceEqual(
- qs.order_by(*raw_order_by),
- [self.example_inc, self.gmbh, self.foobar_ltd],
- )
- def test_outerref(self):
- inner = Company.objects.filter(point_of_contact=OuterRef("pk"))
- msg = (
- "This queryset contains a reference to an outer query and may only "
- "be used in a subquery."
- )
- with self.assertRaisesMessage(ValueError, msg):
- inner.exists()
- outer = Employee.objects.annotate(is_point_of_contact=Exists(inner))
- self.assertIs(outer.exists(), True)
- def test_exist_single_field_output_field(self):
- queryset = Company.objects.values("pk")
- self.assertIsInstance(Exists(queryset).output_field, BooleanField)
- def test_subquery(self):
- Company.objects.filter(name="Example Inc.").update(
- point_of_contact=Employee.objects.get(firstname="Joe", lastname="Smith"),
- ceo=self.max,
- )
- Employee.objects.create(firstname="Bob", lastname="Brown", salary=40)
- qs = (
- Employee.objects.annotate(
- is_point_of_contact=Exists(
- Company.objects.filter(point_of_contact=OuterRef("pk"))
- ),
- is_not_point_of_contact=~Exists(
- Company.objects.filter(point_of_contact=OuterRef("pk"))
- ),
- is_ceo_of_small_company=Exists(
- Company.objects.filter(num_employees__lt=200, ceo=OuterRef("pk"))
- ),
- is_ceo_small_2=~~Exists(
- Company.objects.filter(num_employees__lt=200, ceo=OuterRef("pk"))
- ),
- largest_company=Subquery(
- Company.objects.order_by("-num_employees")
- .filter(Q(ceo=OuterRef("pk")) | Q(point_of_contact=OuterRef("pk")))
- .values("name")[:1],
- output_field=CharField(),
- ),
- )
- .values(
- "firstname",
- "is_point_of_contact",
- "is_not_point_of_contact",
- "is_ceo_of_small_company",
- "is_ceo_small_2",
- "largest_company",
- )
- .order_by("firstname")
- )
- results = list(qs)
- # Could use Coalesce(subq, Value('')) instead except for the bug in
- # cx_Oracle mentioned in #23843.
- bob = results[0]
- if (
- bob["largest_company"] == ""
- and connection.features.interprets_empty_strings_as_nulls
- ):
- bob["largest_company"] = None
- self.assertEqual(
- results,
- [
- {
- "firstname": "Bob",
- "is_point_of_contact": False,
- "is_not_point_of_contact": True,
- "is_ceo_of_small_company": False,
- "is_ceo_small_2": False,
- "largest_company": None,
- },
- {
- "firstname": "Frank",
- "is_point_of_contact": False,
- "is_not_point_of_contact": True,
- "is_ceo_of_small_company": True,
- "is_ceo_small_2": True,
- "largest_company": "Foobar Ltd.",
- },
- {
- "firstname": "Joe",
- "is_point_of_contact": True,
- "is_not_point_of_contact": False,
- "is_ceo_of_small_company": False,
- "is_ceo_small_2": False,
- "largest_company": "Example Inc.",
- },
- {
- "firstname": "Max",
- "is_point_of_contact": False,
- "is_not_point_of_contact": True,
- "is_ceo_of_small_company": True,
- "is_ceo_small_2": True,
- "largest_company": "Example Inc.",
- },
- ],
- )
- # A less elegant way to write the same query: this uses a LEFT OUTER
- # JOIN and an IS NULL, inside a WHERE NOT IN which is probably less
- # efficient than EXISTS.
- self.assertCountEqual(
- qs.filter(is_point_of_contact=True).values("pk"),
- Employee.objects.exclude(company_point_of_contact_set=None).values("pk"),
- )
- def test_subquery_eq(self):
- qs = Employee.objects.annotate(
- is_ceo=Exists(Company.objects.filter(ceo=OuterRef("pk"))),
- is_point_of_contact=Exists(
- Company.objects.filter(point_of_contact=OuterRef("pk")),
- ),
- small_company=Exists(
- queryset=Company.objects.filter(num_employees__lt=200),
- ),
- ).filter(is_ceo=True, is_point_of_contact=False, small_company=True)
- self.assertNotEqual(
- qs.query.annotations["is_ceo"],
- qs.query.annotations["is_point_of_contact"],
- )
- self.assertNotEqual(
- qs.query.annotations["is_ceo"],
- qs.query.annotations["small_company"],
- )
- def test_subquery_sql(self):
- employees = Employee.objects.all()
- employees_subquery = Subquery(employees)
- self.assertIs(employees_subquery.query.subquery, True)
- self.assertIs(employees.query.subquery, False)
- compiler = employees_subquery.query.get_compiler(connection=connection)
- sql, _ = employees_subquery.as_sql(compiler, connection)
- self.assertIn("(SELECT ", sql)
- def test_in_subquery(self):
- # This is a contrived test (and you really wouldn't write this query),
- # but it is a succinct way to test the __in=Subquery() construct.
- small_companies = Company.objects.filter(num_employees__lt=200).values("pk")
- subquery_test = Company.objects.filter(pk__in=Subquery(small_companies))
- self.assertCountEqual(subquery_test, [self.foobar_ltd, self.gmbh])
- subquery_test2 = Company.objects.filter(
- pk=Subquery(small_companies.filter(num_employees=3))
- )
- self.assertCountEqual(subquery_test2, [self.foobar_ltd])
- def test_uuid_pk_subquery(self):
- u = UUIDPK.objects.create()
- UUID.objects.create(uuid_fk=u)
- qs = UUIDPK.objects.filter(id__in=Subquery(UUID.objects.values("uuid_fk__id")))
- self.assertCountEqual(qs, [u])
- def test_nested_subquery(self):
- inner = Company.objects.filter(point_of_contact=OuterRef("pk"))
- outer = Employee.objects.annotate(is_point_of_contact=Exists(inner))
- contrived = Employee.objects.annotate(
- is_point_of_contact=Subquery(
- outer.filter(pk=OuterRef("pk")).values("is_point_of_contact"),
- output_field=BooleanField(),
- ),
- )
- self.assertCountEqual(contrived.values_list(), outer.values_list())
- def test_nested_subquery_join_outer_ref(self):
- inner = Employee.objects.filter(pk=OuterRef("ceo__pk")).values("pk")
- qs = Employee.objects.annotate(
- ceo_company=Subquery(
- Company.objects.filter(
- ceo__in=inner,
- ceo__pk=OuterRef("pk"),
- ).values("pk"),
- ),
- )
- self.assertSequenceEqual(
- qs.values_list("ceo_company", flat=True),
- [self.example_inc.pk, self.foobar_ltd.pk, self.gmbh.pk],
- )
- def test_nested_subquery_outer_ref_2(self):
- first = Time.objects.create(time="09:00")
- second = Time.objects.create(time="17:00")
- third = Time.objects.create(time="21:00")
- SimulationRun.objects.bulk_create(
- [
- SimulationRun(start=first, end=second, midpoint="12:00"),
- SimulationRun(start=first, end=third, midpoint="15:00"),
- SimulationRun(start=second, end=first, midpoint="00:00"),
- ]
- )
- inner = Time.objects.filter(
- time=OuterRef(OuterRef("time")), pk=OuterRef("start")
- ).values("time")
- middle = SimulationRun.objects.annotate(other=Subquery(inner)).values("other")[
- :1
- ]
- outer = Time.objects.annotate(other=Subquery(middle, output_field=TimeField()))
- # This is a contrived example. It exercises the double OuterRef form.
- self.assertCountEqual(outer, [first, second, third])
- def test_nested_subquery_outer_ref_with_autofield(self):
- first = Time.objects.create(time="09:00")
- second = Time.objects.create(time="17:00")
- SimulationRun.objects.create(start=first, end=second, midpoint="12:00")
- inner = SimulationRun.objects.filter(start=OuterRef(OuterRef("pk"))).values(
- "start"
- )
- middle = Time.objects.annotate(other=Subquery(inner)).values("other")[:1]
- outer = Time.objects.annotate(
- other=Subquery(middle, output_field=IntegerField())
- )
- # This exercises the double OuterRef form with AutoField as pk.
- self.assertCountEqual(outer, [first, second])
- def test_annotations_within_subquery(self):
- Company.objects.filter(num_employees__lt=50).update(
- ceo=Employee.objects.get(firstname="Frank")
- )
- inner = (
- Company.objects.filter(ceo=OuterRef("pk"))
- .values("ceo")
- .annotate(total_employees=Sum("num_employees"))
- .values("total_employees")
- )
- outer = Employee.objects.annotate(total_employees=Subquery(inner)).filter(
- salary__lte=Subquery(inner)
- )
- self.assertSequenceEqual(
- outer.order_by("-total_employees").values("salary", "total_employees"),
- [
- {"salary": 10, "total_employees": 2300},
- {"salary": 20, "total_employees": 35},
- ],
- )
- def test_subquery_references_joined_table_twice(self):
- inner = Company.objects.filter(
- num_chairs__gte=OuterRef("ceo__salary"),
- num_employees__gte=OuterRef("point_of_contact__salary"),
- )
- # Another contrived example (there is no need to have a subquery here)
- outer = Company.objects.filter(pk__in=Subquery(inner.values("pk")))
- self.assertFalse(outer.exists())
- def test_subquery_filter_by_aggregate(self):
- Number.objects.create(integer=1000, float=1.2)
- Employee.objects.create(salary=1000)
- qs = Number.objects.annotate(
- min_valuable_count=Subquery(
- Employee.objects.filter(
- salary=OuterRef("integer"),
- )
- .annotate(cnt=Count("salary"))
- .filter(cnt__gt=0)
- .values("cnt")[:1]
- ),
- )
- self.assertEqual(qs.get().float, 1.2)
- def test_subquery_filter_by_lazy(self):
- self.max.manager = Manager.objects.create(name="Manager")
- self.max.save()
- max_manager = SimpleLazyObject(
- lambda: Manager.objects.get(pk=self.max.manager.pk)
- )
- qs = Company.objects.annotate(
- ceo_manager=Subquery(
- Employee.objects.filter(
- lastname=OuterRef("ceo__lastname"),
- ).values("manager"),
- ),
- ).filter(ceo_manager=max_manager)
- self.assertEqual(qs.get(), self.gmbh)
- def test_aggregate_subquery_annotation(self):
- with self.assertNumQueries(1) as ctx:
- aggregate = Company.objects.annotate(
- ceo_salary=Subquery(
- Employee.objects.filter(
- id=OuterRef("ceo_id"),
- ).values("salary")
- ),
- ).aggregate(
- ceo_salary_gt_20=Count("pk", filter=Q(ceo_salary__gt=20)),
- )
- self.assertEqual(aggregate, {"ceo_salary_gt_20": 1})
- # Aggregation over a subquery annotation doesn't annotate the subquery
- # twice in the inner query.
- sql = ctx.captured_queries[0]["sql"]
- self.assertLessEqual(sql.count("SELECT"), 3)
- # GROUP BY isn't required to aggregate over a query that doesn't
- # contain nested aggregates.
- self.assertNotIn("GROUP BY", sql)
- @skipUnlessDBFeature("supports_over_clause")
- def test_aggregate_rawsql_annotation(self):
- with self.assertNumQueries(1) as ctx:
- aggregate = Company.objects.annotate(
- salary=RawSQL("SUM(num_chairs) OVER (ORDER BY num_employees)", []),
- ).aggregate(
- count=Count("pk"),
- )
- self.assertEqual(aggregate, {"count": 3})
- sql = ctx.captured_queries[0]["sql"]
- self.assertNotIn("GROUP BY", sql)
- def test_explicit_output_field(self):
- class FuncA(Func):
- output_field = CharField()
- class FuncB(Func):
- pass
- expr = FuncB(FuncA())
- self.assertEqual(expr.output_field, FuncA.output_field)
- def test_outerref_mixed_case_table_name(self):
- inner = Result.objects.filter(result_time__gte=OuterRef("experiment__assigned"))
- outer = Result.objects.filter(pk__in=Subquery(inner.values("pk")))
- self.assertFalse(outer.exists())
- def test_outerref_with_operator(self):
- inner = Company.objects.filter(num_employees=OuterRef("ceo__salary") + 2)
- outer = Company.objects.filter(pk__in=Subquery(inner.values("pk")))
- self.assertEqual(outer.get().name, "Test GmbH")
- def test_nested_outerref_with_function(self):
- self.gmbh.point_of_contact = Employee.objects.get(lastname="Meyer")
- self.gmbh.save()
- inner = Employee.objects.filter(
- lastname__startswith=Left(OuterRef(OuterRef("lastname")), 1),
- )
- qs = Employee.objects.annotate(
- ceo_company=Subquery(
- Company.objects.filter(
- point_of_contact__in=inner,
- ceo__pk=OuterRef("pk"),
- ).values("name"),
- ),
- ).filter(ceo_company__isnull=False)
- self.assertEqual(qs.get().ceo_company, "Test GmbH")
- def test_annotation_with_outerref(self):
- gmbh_salary = Company.objects.annotate(
- max_ceo_salary_raise=Subquery(
- Company.objects.annotate(
- salary_raise=OuterRef("num_employees") + F("num_employees"),
- )
- .order_by("-salary_raise")
- .values("salary_raise")[:1],
- output_field=IntegerField(),
- ),
- ).get(pk=self.gmbh.pk)
- self.assertEqual(gmbh_salary.max_ceo_salary_raise, 2332)
- def test_annotation_with_nested_outerref(self):
- self.gmbh.point_of_contact = Employee.objects.get(lastname="Meyer")
- self.gmbh.save()
- inner = Employee.objects.annotate(
- outer_lastname=OuterRef(OuterRef("lastname")),
- ).filter(lastname__startswith=Left("outer_lastname", 1))
- qs = Employee.objects.annotate(
- ceo_company=Subquery(
- Company.objects.filter(
- point_of_contact__in=inner,
- ceo__pk=OuterRef("pk"),
- ).values("name"),
- ),
- ).filter(ceo_company__isnull=False)
- self.assertEqual(qs.get().ceo_company, "Test GmbH")
- def test_pickle_expression(self):
- expr = Value(1)
- expr.convert_value # populate cached property
- self.assertEqual(pickle.loads(pickle.dumps(expr)), expr)
- def test_incorrect_field_in_F_expression(self):
- with self.assertRaisesMessage(
- FieldError, "Cannot resolve keyword 'nope' into field."
- ):
- list(Employee.objects.filter(firstname=F("nope")))
- def test_incorrect_joined_field_in_F_expression(self):
- with self.assertRaisesMessage(
- FieldError, "Cannot resolve keyword 'nope' into field."
- ):
- list(Company.objects.filter(ceo__pk=F("point_of_contact__nope")))
- def test_exists_in_filter(self):
- inner = Company.objects.filter(ceo=OuterRef("pk")).values("pk")
- qs1 = Employee.objects.filter(Exists(inner))
- qs2 = Employee.objects.annotate(found=Exists(inner)).filter(found=True)
- self.assertCountEqual(qs1, qs2)
- self.assertFalse(Employee.objects.exclude(Exists(inner)).exists())
- self.assertCountEqual(qs2, Employee.objects.exclude(~Exists(inner)))
- def test_subquery_in_filter(self):
- inner = Company.objects.filter(ceo=OuterRef("pk")).values("based_in_eu")
- self.assertSequenceEqual(
- Employee.objects.filter(Subquery(inner)),
- [self.foobar_ltd.ceo],
- )
- def test_subquery_group_by_outerref_in_filter(self):
- inner = (
- Company.objects.annotate(
- employee=OuterRef("pk"),
- )
- .values("employee")
- .annotate(
- min_num_chairs=Min("num_chairs"),
- )
- .values("ceo")
- )
- self.assertIs(Employee.objects.filter(pk__in=Subquery(inner)).exists(), True)
- def test_case_in_filter_if_boolean_output_field(self):
- is_ceo = Company.objects.filter(ceo=OuterRef("pk"))
- is_poc = Company.objects.filter(point_of_contact=OuterRef("pk"))
- qs = Employee.objects.filter(
- Case(
- When(Exists(is_ceo), then=True),
- When(Exists(is_poc), then=True),
- default=False,
- output_field=BooleanField(),
- ),
- )
- self.assertCountEqual(qs, [self.example_inc.ceo, self.foobar_ltd.ceo, self.max])
- def test_boolean_expression_combined(self):
- is_ceo = Company.objects.filter(ceo=OuterRef("pk"))
- is_poc = Company.objects.filter(point_of_contact=OuterRef("pk"))
- self.gmbh.point_of_contact = self.max
- self.gmbh.save()
- self.assertCountEqual(
- Employee.objects.filter(Exists(is_ceo) | Exists(is_poc)),
- [self.example_inc.ceo, self.foobar_ltd.ceo, self.max],
- )
- self.assertCountEqual(
- Employee.objects.filter(Exists(is_ceo) & Exists(is_poc)),
- [self.max],
- )
- self.assertCountEqual(
- Employee.objects.filter(Exists(is_ceo) & Q(salary__gte=30)),
- [self.max],
- )
- self.assertCountEqual(
- Employee.objects.filter(Exists(is_poc) | Q(salary__lt=15)),
- [self.example_inc.ceo, self.max],
- )
- self.assertCountEqual(
- Employee.objects.filter(Q(salary__gte=30) & Exists(is_ceo)),
- [self.max],
- )
- self.assertCountEqual(
- Employee.objects.filter(Q(salary__lt=15) | Exists(is_poc)),
- [self.example_inc.ceo, self.max],
- )
- def test_boolean_expression_combined_with_empty_Q(self):
- is_poc = Company.objects.filter(point_of_contact=OuterRef("pk"))
- self.gmbh.point_of_contact = self.max
- self.gmbh.save()
- tests = [
- Exists(is_poc) & Q(),
- Q() & Exists(is_poc),
- Exists(is_poc) | Q(),
- Q() | Exists(is_poc),
- Q(Exists(is_poc)) & Q(),
- Q() & Q(Exists(is_poc)),
- Q(Exists(is_poc)) | Q(),
- Q() | Q(Exists(is_poc)),
- ]
- for conditions in tests:
- with self.subTest(conditions):
- self.assertCountEqual(Employee.objects.filter(conditions), [self.max])
- def test_boolean_expression_in_Q(self):
- is_poc = Company.objects.filter(point_of_contact=OuterRef("pk"))
- self.gmbh.point_of_contact = self.max
- self.gmbh.save()
- self.assertCountEqual(Employee.objects.filter(Q(Exists(is_poc))), [self.max])
- class IterableLookupInnerExpressionsTests(TestCase):
- @classmethod
- def setUpTestData(cls):
- ceo = Employee.objects.create(firstname="Just", lastname="Doit", salary=30)
- # MySQL requires that the values calculated for expressions don't pass
- # outside of the field's range, so it's inconvenient to use the values
- # in the more general tests.
- cls.c5020 = Company.objects.create(
- name="5020 Ltd", num_employees=50, num_chairs=20, ceo=ceo
- )
- cls.c5040 = Company.objects.create(
- name="5040 Ltd", num_employees=50, num_chairs=40, ceo=ceo
- )
- cls.c5050 = Company.objects.create(
- name="5050 Ltd", num_employees=50, num_chairs=50, ceo=ceo
- )
- cls.c5060 = Company.objects.create(
- name="5060 Ltd", num_employees=50, num_chairs=60, ceo=ceo
- )
- cls.c99300 = Company.objects.create(
- name="99300 Ltd", num_employees=99, num_chairs=300, ceo=ceo
- )
- def test_in_lookup_allows_F_expressions_and_expressions_for_integers(self):
- # __in lookups can use F() expressions for integers.
- queryset = Company.objects.filter(num_employees__in=([F("num_chairs") - 10]))
- self.assertSequenceEqual(queryset, [self.c5060])
- self.assertCountEqual(
- Company.objects.filter(
- num_employees__in=([F("num_chairs") - 10, F("num_chairs") + 10])
- ),
- [self.c5040, self.c5060],
- )
- self.assertCountEqual(
- Company.objects.filter(
- num_employees__in=(
- [F("num_chairs") - 10, F("num_chairs"), F("num_chairs") + 10]
- )
- ),
- [self.c5040, self.c5050, self.c5060],
- )
- def test_expressions_in_lookups_join_choice(self):
- midpoint = datetime.time(13, 0)
- t1 = Time.objects.create(time=datetime.time(12, 0))
- t2 = Time.objects.create(time=datetime.time(14, 0))
- s1 = SimulationRun.objects.create(start=t1, end=t2, midpoint=midpoint)
- SimulationRun.objects.create(start=t1, end=None, midpoint=midpoint)
- SimulationRun.objects.create(start=None, end=t2, midpoint=midpoint)
- SimulationRun.objects.create(start=None, end=None, midpoint=midpoint)
- queryset = SimulationRun.objects.filter(
- midpoint__range=[F("start__time"), F("end__time")]
- )
- self.assertSequenceEqual(queryset, [s1])
- for alias in queryset.query.alias_map.values():
- if isinstance(alias, Join):
- self.assertEqual(alias.join_type, constants.INNER)
- queryset = SimulationRun.objects.exclude(
- midpoint__range=[F("start__time"), F("end__time")]
- )
- self.assertQuerysetEqual(queryset, [], ordered=False)
- for alias in queryset.query.alias_map.values():
- if isinstance(alias, Join):
- self.assertEqual(alias.join_type, constants.LOUTER)
- def test_range_lookup_allows_F_expressions_and_expressions_for_integers(self):
- # Range lookups can use F() expressions for integers.
- Company.objects.filter(num_employees__exact=F("num_chairs"))
- self.assertCountEqual(
- Company.objects.filter(num_employees__range=(F("num_chairs"), 100)),
- [self.c5020, self.c5040, self.c5050],
- )
- self.assertCountEqual(
- Company.objects.filter(
- num_employees__range=(F("num_chairs") - 10, F("num_chairs") + 10)
- ),
- [self.c5040, self.c5050, self.c5060],
- )
- self.assertCountEqual(
- Company.objects.filter(num_employees__range=(F("num_chairs") - 10, 100)),
- [self.c5020, self.c5040, self.c5050, self.c5060],
- )
- self.assertCountEqual(
- Company.objects.filter(num_employees__range=(1, 100)),
- [self.c5020, self.c5040, self.c5050, self.c5060, self.c99300],
- )
- def test_range_lookup_namedtuple(self):
- EmployeeRange = namedtuple("EmployeeRange", ["minimum", "maximum"])
- qs = Company.objects.filter(
- num_employees__range=EmployeeRange(minimum=51, maximum=100),
- )
- self.assertSequenceEqual(qs, [self.c99300])
- @unittest.skipUnless(
- connection.vendor == "sqlite",
- "This defensive test only works on databases that don't validate parameter "
- "types",
- )
- def test_expressions_not_introduce_sql_injection_via_untrusted_string_inclusion(
- self,
- ):
- """
- This tests that SQL injection isn't possible using compilation of
- expressions in iterable filters, as their compilation happens before
- the main query compilation. It's limited to SQLite, as PostgreSQL,
- Oracle and other vendors have defense in depth against this by type
- checking. Testing against SQLite (the most permissive of the built-in
- databases) demonstrates that the problem doesn't exist while keeping
- the test simple.
- """
- queryset = Company.objects.filter(name__in=[F("num_chairs") + "1)) OR ((1==1"])
- self.assertQuerysetEqual(queryset, [], ordered=False)
- def test_in_lookup_allows_F_expressions_and_expressions_for_datetimes(self):
- start = datetime.datetime(2016, 2, 3, 15, 0, 0)
- end = datetime.datetime(2016, 2, 5, 15, 0, 0)
- experiment_1 = Experiment.objects.create(
- name="Integrity testing",
- assigned=start.date(),
- start=start,
- end=end,
- completed=end.date(),
- estimated_time=end - start,
- )
- experiment_2 = Experiment.objects.create(
- name="Taste testing",
- assigned=start.date(),
- start=start,
- end=end,
- completed=end.date(),
- estimated_time=end - start,
- )
- r1 = Result.objects.create(
- experiment=experiment_1,
- result_time=datetime.datetime(2016, 2, 4, 15, 0, 0),
- )
- Result.objects.create(
- experiment=experiment_1,
- result_time=datetime.datetime(2016, 3, 10, 2, 0, 0),
- )
- Result.objects.create(
- experiment=experiment_2,
- result_time=datetime.datetime(2016, 1, 8, 5, 0, 0),
- )
- within_experiment_time = [F("experiment__start"), F("experiment__end")]
- queryset = Result.objects.filter(result_time__range=within_experiment_time)
- self.assertSequenceEqual(queryset, [r1])
- class FTests(SimpleTestCase):
- def test_deepcopy(self):
- f = F("foo")
- g = deepcopy(f)
- self.assertEqual(f.name, g.name)
- def test_deconstruct(self):
- f = F("name")
- path, args, kwargs = f.deconstruct()
- self.assertEqual(path, "django.db.models.F")
- self.assertEqual(args, (f.name,))
- self.assertEqual(kwargs, {})
- def test_equal(self):
- f = F("name")
- same_f = F("name")
- other_f = F("username")
- self.assertEqual(f, same_f)
- self.assertNotEqual(f, other_f)
- def test_hash(self):
- d = {F("name"): "Bob"}
- self.assertIn(F("name"), d)
- self.assertEqual(d[F("name")], "Bob")
- def test_not_equal_Value(self):
- f = F("name")
- value = Value("name")
- self.assertNotEqual(f, value)
- self.assertNotEqual(value, f)
- class ExpressionsTests(TestCase):
- def test_F_reuse(self):
- f = F("id")
- n = Number.objects.create(integer=-1)
- c = Company.objects.create(
- name="Example Inc.",
- num_employees=2300,
- num_chairs=5,
- ceo=Employee.objects.create(firstname="Joe", lastname="Smith"),
- )
- c_qs = Company.objects.filter(id=f)
- self.assertEqual(c_qs.get(), c)
- # Reuse the same F-object for another queryset
- n_qs = Number.objects.filter(id=f)
- self.assertEqual(n_qs.get(), n)
- # The original query still works correctly
- self.assertEqual(c_qs.get(), c)
- def test_patterns_escape(self):
- r"""
- Special characters (e.g. %, _ and \) stored in database are
- properly escaped when using a pattern lookup with an expression
- refs #16731
- """
- Employee.objects.bulk_create(
- [
- Employee(firstname="Johnny", lastname="%John"),
- Employee(firstname="Jean-Claude", lastname="Claud_"),
- Employee(firstname="Jean-Claude", lastname="Claude%"),
- Employee(firstname="Johnny", lastname="Joh\\n"),
- Employee(firstname="Johnny", lastname="_ohn"),
- ]
- )
- claude = Employee.objects.create(firstname="Jean-Claude", lastname="Claude")
- john = Employee.objects.create(firstname="Johnny", lastname="John")
- john_sign = Employee.objects.create(firstname="%Joh\\nny", lastname="%Joh\\n")
- self.assertCountEqual(
- Employee.objects.filter(firstname__contains=F("lastname")),
- [john_sign, john, claude],
- )
- self.assertCountEqual(
- Employee.objects.filter(firstname__startswith=F("lastname")),
- [john_sign, john],
- )
- self.assertSequenceEqual(
- Employee.objects.filter(firstname__endswith=F("lastname")),
- [claude],
- )
- def test_insensitive_patterns_escape(self):
- r"""
- Special characters (e.g. %, _ and \) stored in database are
- properly escaped when using a case insensitive pattern lookup with an
- expression -- refs #16731
- """
- Employee.objects.bulk_create(
- [
- Employee(firstname="Johnny", lastname="%john"),
- Employee(firstname="Jean-Claude", lastname="claud_"),
- Employee(firstname="Jean-Claude", lastname="claude%"),
- Employee(firstname="Johnny", lastname="joh\\n"),
- Employee(firstname="Johnny", lastname="_ohn"),
- ]
- )
- claude = Employee.objects.create(firstname="Jean-Claude", lastname="claude")
- john = Employee.objects.create(firstname="Johnny", lastname="john")
- john_sign = Employee.objects.create(firstname="%Joh\\nny", lastname="%joh\\n")
- self.assertCountEqual(
- Employee.objects.filter(firstname__icontains=F("lastname")),
- [john_sign, john, claude],
- )
- self.assertCountEqual(
- Employee.objects.filter(firstname__istartswith=F("lastname")),
- [john_sign, john],
- )
- self.assertSequenceEqual(
- Employee.objects.filter(firstname__iendswith=F("lastname")),
- [claude],
- )
- @isolate_apps("expressions")
- class SimpleExpressionTests(SimpleTestCase):
- def test_equal(self):
- self.assertEqual(Expression(), Expression())
- self.assertEqual(
- Expression(IntegerField()), Expression(output_field=IntegerField())
- )
- self.assertEqual(Expression(IntegerField()), mock.ANY)
- self.assertNotEqual(Expression(IntegerField()), Expression(CharField()))
- class TestModel(Model):
- field = IntegerField()
- other_field = IntegerField()
- self.assertNotEqual(
- Expression(TestModel._meta.get_field("field")),
- Expression(TestModel._meta.get_field("other_field")),
- )
- def test_hash(self):
- self.assertEqual(hash(Expression()), hash(Expression()))
- self.assertEqual(
- hash(Expression(IntegerField())),
- hash(Expression(output_field=IntegerField())),
- )
- self.assertNotEqual(
- hash(Expression(IntegerField())),
- hash(Expression(CharField())),
- )
- class TestModel(Model):
- field = IntegerField()
- other_field = IntegerField()
- self.assertNotEqual(
- hash(Expression(TestModel._meta.get_field("field"))),
- hash(Expression(TestModel._meta.get_field("other_field"))),
- )
- class ExpressionsNumericTests(TestCase):
- @classmethod
- def setUpTestData(cls):
- Number(integer=-1).save()
- Number(integer=42).save()
- Number(integer=1337).save()
- Number.objects.update(float=F("integer"))
- def test_fill_with_value_from_same_object(self):
- """
- We can fill a value in all objects with an other value of the
- same object.
- """
- self.assertQuerysetEqual(
- Number.objects.all(),
- [(-1, -1), (42, 42), (1337, 1337)],
- lambda n: (n.integer, round(n.float)),
- ordered=False,
- )
- def test_increment_value(self):
- """
- We can increment a value of all objects in a query set.
- """
- self.assertEqual(
- Number.objects.filter(integer__gt=0).update(integer=F("integer") + 1), 2
- )
- self.assertQuerysetEqual(
- Number.objects.all(),
- [(-1, -1), (43, 42), (1338, 1337)],
- lambda n: (n.integer, round(n.float)),
- ordered=False,
- )
- def test_filter_not_equals_other_field(self):
- """
- We can filter for objects, where a value is not equals the value
- of an other field.
- """
- self.assertEqual(
- Number.objects.filter(integer__gt=0).update(integer=F("integer") + 1), 2
- )
- self.assertQuerysetEqual(
- Number.objects.exclude(float=F("integer")),
- [(43, 42), (1338, 1337)],
- lambda n: (n.integer, round(n.float)),
- ordered=False,
- )
- def test_filter_decimal_expression(self):
- obj = Number.objects.create(integer=0, float=1, decimal_value=Decimal("1"))
- qs = Number.objects.annotate(
- x=ExpressionWrapper(Value(1), output_field=DecimalField()),
- ).filter(Q(x=1, integer=0) & Q(x=Decimal("1")))
- self.assertSequenceEqual(qs, [obj])
- def test_complex_expressions(self):
- """
- Complex expressions of different connection types are possible.
- """
- n = Number.objects.create(integer=10, float=123.45)
- self.assertEqual(
- Number.objects.filter(pk=n.pk).update(float=F("integer") + F("float") * 2),
- 1,
- )
- self.assertEqual(Number.objects.get(pk=n.pk).integer, 10)
- self.assertEqual(
- Number.objects.get(pk=n.pk).float, Approximate(256.900, places=3)
- )
- def test_decimal_expression(self):
- n = Number.objects.create(integer=1, decimal_value=Decimal("0.5"))
- n.decimal_value = F("decimal_value") - Decimal("0.4")
- n.save()
- n.refresh_from_db()
- self.assertEqual(n.decimal_value, Decimal("0.1"))
- class ExpressionOperatorTests(TestCase):
- @classmethod
- def setUpTestData(cls):
- cls.n = Number.objects.create(integer=42, float=15.5)
- cls.n1 = Number.objects.create(integer=-42, float=-15.5)
- def test_lefthand_addition(self):
- # LH Addition of floats and integers
- Number.objects.filter(pk=self.n.pk).update(
- integer=F("integer") + 15, float=F("float") + 42.7
- )
- self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 57)
- self.assertEqual(
- Number.objects.get(pk=self.n.pk).float, Approximate(58.200, places=3)
- )
- def test_lefthand_subtraction(self):
- # LH Subtraction of floats and integers
- Number.objects.filter(pk=self.n.pk).update(
- integer=F("integer") - 15, float=F("float") - 42.7
- )
- self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 27)
- self.assertEqual(
- Number.objects.get(pk=self.n.pk).float, Approximate(-27.200, places=3)
- )
- def test_lefthand_multiplication(self):
- # Multiplication of floats and integers
- Number.objects.filter(pk=self.n.pk).update(
- integer=F("integer") * 15, float=F("float") * 42.7
- )
- self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 630)
- self.assertEqual(
- Number.objects.get(pk=self.n.pk).float, Approximate(661.850, places=3)
- )
- def test_lefthand_division(self):
- # LH Division of floats and integers
- Number.objects.filter(pk=self.n.pk).update(
- integer=F("integer") / 2, float=F("float") / 42.7
- )
- self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 21)
- self.assertEqual(
- Number.objects.get(pk=self.n.pk).float, Approximate(0.363, places=3)
- )
- def test_lefthand_modulo(self):
- # LH Modulo arithmetic on integers
- Number.objects.filter(pk=self.n.pk).update(integer=F("integer") % 20)
- self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 2)
- def test_lefthand_modulo_null(self):
- # LH Modulo arithmetic on integers.
- Employee.objects.create(firstname="John", lastname="Doe", salary=None)
- qs = Employee.objects.annotate(modsalary=F("salary") % 20)
- self.assertIsNone(qs.get().salary)
- def test_lefthand_bitwise_and(self):
- # LH Bitwise ands on integers
- Number.objects.filter(pk=self.n.pk).update(integer=F("integer").bitand(56))
- Number.objects.filter(pk=self.n1.pk).update(integer=F("integer").bitand(-56))
- self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 40)
- self.assertEqual(Number.objects.get(pk=self.n1.pk).integer, -64)
- def test_lefthand_bitwise_left_shift_operator(self):
- Number.objects.update(integer=F("integer").bitleftshift(2))
- self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 168)
- self.assertEqual(Number.objects.get(pk=self.n1.pk).integer, -168)
- def test_lefthand_bitwise_right_shift_operator(self):
- Number.objects.update(integer=F("integer").bitrightshift(2))
- self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 10)
- self.assertEqual(Number.objects.get(pk=self.n1.pk).integer, -11)
- def test_lefthand_bitwise_or(self):
- # LH Bitwise or on integers
- Number.objects.update(integer=F("integer").bitor(48))
- self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 58)
- self.assertEqual(Number.objects.get(pk=self.n1.pk).integer, -10)
- def test_lefthand_transformed_field_bitwise_or(self):
- Employee.objects.create(firstname="Max", lastname="Mustermann")
- with register_lookup(CharField, Length):
- qs = Employee.objects.annotate(bitor=F("lastname__length").bitor(48))
- self.assertEqual(qs.get().bitor, 58)
- def test_lefthand_power(self):
- # LH Power arithmetic operation on floats and integers
- Number.objects.filter(pk=self.n.pk).update(
- integer=F("integer") ** 2, float=F("float") ** 1.5
- )
- self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 1764)
- self.assertEqual(
- Number.objects.get(pk=self.n.pk).float, Approximate(61.02, places=2)
- )
- def test_lefthand_bitwise_xor(self):
- Number.objects.update(integer=F("integer").bitxor(48))
- self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 26)
- self.assertEqual(Number.objects.get(pk=self.n1.pk).integer, -26)
- def test_lefthand_bitwise_xor_null(self):
- employee = Employee.objects.create(firstname="John", lastname="Doe")
- Employee.objects.update(salary=F("salary").bitxor(48))
- employee.refresh_from_db()
- self.assertIsNone(employee.salary)
- def test_lefthand_bitwise_xor_right_null(self):
- employee = Employee.objects.create(firstname="John", lastname="Doe", salary=48)
- Employee.objects.update(salary=F("salary").bitxor(None))
- employee.refresh_from_db()
- self.assertIsNone(employee.salary)
- @unittest.skipUnless(
- connection.vendor == "oracle", "Oracle doesn't support bitwise XOR."
- )
- def test_lefthand_bitwise_xor_not_supported(self):
- msg = "Bitwise XOR is not supported in Oracle."
- with self.assertRaisesMessage(NotSupportedError, msg):
- Number.objects.update(integer=F("integer").bitxor(48))
- def test_right_hand_addition(self):
- # Right hand operators
- Number.objects.filter(pk=self.n.pk).update(
- integer=15 + F("integer"), float=42.7 + F("float")
- )
- # RH Addition of floats and integers
- self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 57)
- self.assertEqual(
- Number.objects.get(pk=self.n.pk).float, Approximate(58.200, places=3)
- )
- def test_right_hand_subtraction(self):
- Number.objects.filter(pk=self.n.pk).update(
- integer=15 - F("integer"), float=42.7 - F("float")
- )
- # RH Subtraction of floats and integers
- self.assertEqual(Number.objects.get(pk=self.n.pk).integer, -27)
- self.assertEqual(
- Number.objects.get(pk=self.n.pk).float, Approximate(27.200, places=3)
- )
- def test_right_hand_multiplication(self):
- # RH Multiplication of floats and integers
- Number.objects.filter(pk=self.n.pk).update(
- integer=15 * F("integer"), float=42.7 * F("float")
- )
- self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 630)
- self.assertEqual(
- Number.objects.get(pk=self.n.pk).float, Approximate(661.850, places=3)
- )
- def test_right_hand_division(self):
- # RH Division of floats and integers
- Number.objects.filter(pk=self.n.pk).update(
- integer=640 / F("integer"), float=42.7 / F("float")
- )
- self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 15)
- self.assertEqual(
- Number.objects.get(pk=self.n.pk).float, Approximate(2.755, places=3)
- )
- def test_right_hand_modulo(self):
- # RH Modulo arithmetic on integers
- Number.objects.filter(pk=self.n.pk).update(integer=69 % F("integer"))
- self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 27)
- def test_righthand_power(self):
- # RH Power arithmetic operation on floats and integers
- Number.objects.filter(pk=self.n.pk).update(
- integer=2 ** F("integer"), float=1.5 ** F("float")
- )
- self.assertEqual(Number.objects.get(pk=self.n.pk).integer, 4398046511104)
- self.assertEqual(
- Number.objects.get(pk=self.n.pk).float, Approximate(536.308, places=3)
- )
- class FTimeDeltaTests(TestCase):
- @classmethod
- def setUpTestData(cls):
- cls.sday = sday = datetime.date(2010, 6, 25)
- cls.stime = stime = datetime.datetime(2010, 6, 25, 12, 15, 30, 747000)
- midnight = datetime.time(0)
- delta0 = datetime.timedelta(0)
- delta1 = datetime.timedelta(microseconds=253000)
- delta2 = datetime.timedelta(seconds=44)
- delta3 = datetime.timedelta(hours=21, minutes=8)
- delta4 = datetime.timedelta(days=10)
- delta5 = datetime.timedelta(days=90)
- # Test data is set so that deltas and delays will be
- # strictly increasing.
- cls.deltas = []
- cls.delays = []
- cls.days_long = []
- # e0: started same day as assigned, zero duration
- end = stime + delta0
- cls.e0 = Experiment.objects.create(
- name="e0",
- assigned=sday,
- start=stime,
- end=end,
- completed=end.date(),
- estimated_time=delta0,
- )
- cls.deltas.append(delta0)
- cls.delays.append(
- cls.e0.start - datetime.datetime.combine(cls.e0.assigned, midnight)
- )
- cls.days_long.append(cls.e0.completed - cls.e0.assigned)
- # e1: started one day after assigned, tiny duration, data
- # set so that end time has no fractional seconds, which
- # tests an edge case on sqlite.
- delay = datetime.timedelta(1)
- end = stime + delay + delta1
- e1 = Experiment.objects.create(
- name="e1",
- assigned=sday,
- start=stime + delay,
- end=end,
- completed=end.date(),
- estimated_time=delta1,
- )
- cls.deltas.append(delta1)
- cls.delays.append(e1.start - datetime.datetime.combine(e1.assigned, midnight))
- cls.days_long.append(e1.completed - e1.assigned)
- # e2: started three days after assigned, small duration
- end = stime + delta2
- e2 = Experiment.objects.create(
- name="e2",
- assigned=sday - datetime.timedelta(3),
- start=stime,
- end=end,
- completed=end.date(),
- estimated_time=datetime.timedelta(hours=1),
- )
- cls.deltas.append(delta2)
- cls.delays.append(e2.start - datetime.datetime.combine(e2.assigned, midnight))
- cls.days_long.append(e2.completed - e2.assigned)
- # e3: started four days after assigned, medium duration
- delay = datetime.timedelta(4)
- end = stime + delay + delta3
- e3 = Experiment.objects.create(
- name="e3",
- assigned=sday,
- start=stime + delay,
- end=end,
- completed=end.date(),
- estimated_time=delta3,
- )
- cls.deltas.append(delta3)
- cls.delays.append(e3.start - datetime.datetime.combine(e3.assigned, midnight))
- cls.days_long.append(e3.completed - e3.assigned)
- # e4: started 10 days after assignment, long duration
- end = stime + delta4
- e4 = Experiment.objects.create(
- name="e4",
- assigned=sday - datetime.timedelta(10),
- start=stime,
- end=end,
- completed=end.date(),
- estimated_time=delta4 - datetime.timedelta(1),
- )
- cls.deltas.append(delta4)
- cls.delays.append(e4.start - datetime.datetime.combine(e4.assigned, midnight))
- cls.days_long.append(e4.completed - e4.assigned)
- # e5: started a month after assignment, very long duration
- delay = datetime.timedelta(30)
- end = stime + delay + delta5
- e5 = Experiment.objects.create(
- name="e5",
- assigned=sday,
- start=stime + delay,
- end=end,
- completed=end.date(),
- estimated_time=delta5,
- )
- cls.deltas.append(delta5)
- cls.delays.append(e5.start - datetime.datetime.combine(e5.assigned, midnight))
- cls.days_long.append(e5.completed - e5.assigned)
- cls.expnames = [e.name for e in Experiment.objects.all()]
- def test_multiple_query_compilation(self):
- # Ticket #21643
- queryset = Experiment.objects.filter(
- end__lt=F("start") + datetime.timedelta(hours=1)
- )
- q1 = str(queryset.query)
- q2 = str(queryset.query)
- self.assertEqual(q1, q2)
- def test_query_clone(self):
- # Ticket #21643 - Crash when compiling query more than once
- qs = Experiment.objects.filter(end__lt=F("start") + datetime.timedelta(hours=1))
- qs2 = qs.all()
- list(qs)
- list(qs2)
- # Intentionally no assert
- def test_delta_add(self):
- for i, delta in enumerate(self.deltas):
- test_set = [
- e.name for e in Experiment.objects.filter(end__lt=F("start") + delta)
- ]
- self.assertEqual(test_set, self.expnames[:i])
- test_set = [
- e.name for e in Experiment.objects.filter(end__lt=delta + F("start"))
- ]
- self.assertEqual(test_set, self.expnames[:i])
- test_set = [
- e.name for e in Experiment.objects.filter(end__lte=F("start") + delta)
- ]
- self.assertEqual(test_set, self.expnames[: i + 1])
- def test_delta_subtract(self):
- for i, delta in enumerate(self.deltas):
- test_set = [
- e.name for e in Experiment.objects.filter(start__gt=F("end") - delta)
- ]
- self.assertEqual(test_set, self.expnames[:i])
- test_set = [
- e.name for e in Experiment.objects.filter(start__gte=F("end") - delta)
- ]
- self.assertEqual(test_set, self.expnames[: i + 1])
- def test_exclude(self):
- for i, delta in enumerate(self.deltas):
- test_set = [
- e.name for e in Experiment.objects.exclude(end__lt=F("start") + delta)
- ]
- self.assertEqual(test_set, self.expnames[i:])
- test_set = [
- e.name for e in Experiment.objects.exclude(end__lte=F("start") + delta)
- ]
- self.assertEqual(test_set, self.expnames[i + 1 :])
- def test_date_comparison(self):
- for i, days in enumerate(self.days_long):
- test_set = [
- e.name
- for e in Experiment.objects.filter(completed__lt=F("assigned") + days)
- ]
- self.assertEqual(test_set, self.expnames[:i])
- test_set = [
- e.name
- for e in Experiment.objects.filter(completed__lte=F("assigned") + days)
- ]
- self.assertEqual(test_set, self.expnames[: i + 1])
- def test_datetime_and_durationfield_addition_with_filter(self):
- test_set = Experiment.objects.filter(end=F("start") + F("estimated_time"))
- self.assertGreater(test_set.count(), 0)
- self.assertEqual(
- [e.name for e in test_set],
- [
- e.name
- for e in Experiment.objects.all()
- if e.end == e.start + e.estimated_time
- ],
- )
- def test_datetime_and_duration_field_addition_with_annotate_and_no_output_field(
- self,
- ):
- test_set = Experiment.objects.annotate(
- estimated_end=F("start") + F("estimated_time")
- )
- self.assertEqual(
- [e.estimated_end for e in test_set],
- [e.start + e.estimated_time for e in test_set],
- )
- @skipUnlessDBFeature("supports_temporal_subtraction")
- def test_datetime_subtraction_with_annotate_and_no_output_field(self):
- test_set = Experiment.objects.annotate(
- calculated_duration=F("end") - F("start")
- )
- self.assertEqual(
- [e.calculated_duration for e in test_set],
- [e.end - e.start for e in test_set],
- )
- def test_mixed_comparisons1(self):
- for i, delay in enumerate(self.delays):
- test_set = [
- e.name
- for e in Experiment.objects.filter(assigned__gt=F("start") - delay)
- ]
- self.assertEqual(test_set, self.expnames[:i])
- test_set = [
- e.name
- for e in Experiment.objects.filter(assigned__gte=F("start") - delay)
- ]
- self.assertEqual(test_set, self.expnames[: i + 1])
- def test_mixed_comparisons2(self):
- for i, delay in enumerate(self.delays):
- delay = datetime.timedelta(delay.days)
- test_set = [
- e.name
- for e in Experiment.objects.filter(start__lt=F("assigned") + delay)
- ]
- self.assertEqual(test_set, self.expnames[:i])
- test_set = [
- e.name
- for e in Experiment.objects.filter(
- start__lte=F("assigned") + delay + datetime.timedelta(1)
- )
- ]
- self.assertEqual(test_set, self.expnames[: i + 1])
- def test_delta_update(self):
- for delta in self.deltas:
- exps = Experiment.objects.all()
- expected_durations = [e.duration() for e in exps]
- expected_starts = [e.start + delta for e in exps]
- expected_ends = [e.end + delta for e in exps]
- Experiment.objects.update(start=F("start") + delta, end=F("end") + delta)
- exps = Experiment.objects.all()
- new_starts = [e.start for e in exps]
- new_ends = [e.end for e in exps]
- new_durations = [e.duration() for e in exps]
- self.assertEqual(expected_starts, new_starts)
- self.assertEqual(expected_ends, new_ends)
- self.assertEqual(expected_durations, new_durations)
- def test_invalid_operator(self):
- with self.assertRaises(DatabaseError):
- list(Experiment.objects.filter(start=F("start") * datetime.timedelta(0)))
- def test_durationfield_add(self):
- zeros = [
- e.name
- for e in Experiment.objects.filter(start=F("start") + F("estimated_time"))
- ]
- self.assertEqual(zeros, ["e0"])
- end_less = [
- e.name
- for e in Experiment.objects.filter(end__lt=F("start") + F("estimated_time"))
- ]
- self.assertEqual(end_less, ["e2"])
- delta_math = [
- e.name
- for e in Experiment.objects.filter(
- end__gte=F("start") + F("estimated_time") + datetime.timedelta(hours=1)
- )
- ]
- self.assertEqual(delta_math, ["e4"])
- queryset = Experiment.objects.annotate(
- shifted=ExpressionWrapper(
- F("start") + Value(None, output_field=DurationField()),
- output_field=DateTimeField(),
- )
- )
- self.assertIsNone(queryset.first().shifted)
- def test_durationfield_multiply_divide(self):
- Experiment.objects.update(scalar=2)
- tests = [
- (Decimal("2"), 2),
- (F("scalar"), 2),
- (2, 2),
- (3.2, 3.2),
- ]
- for expr, scalar in tests:
- with self.subTest(expr=expr):
- qs = Experiment.objects.annotate(
- multiplied=ExpressionWrapper(
- expr * F("estimated_time"),
- output_field=DurationField(),
- ),
- divided=ExpressionWrapper(
- F("estimated_time") / expr,
- output_field=DurationField(),
- ),
- )
- for experiment in qs:
- self.assertEqual(
- experiment.multiplied,
- experiment.estimated_time * scalar,
- )
- self.assertEqual(
- experiment.divided,
- experiment.estimated_time / scalar,
- )
- def test_duration_expressions(self):
- for delta in self.deltas:
- qs = Experiment.objects.annotate(duration=F("estimated_time") + delta)
- for obj in qs:
- self.assertEqual(obj.duration, obj.estimated_time + delta)
- @skipUnlessDBFeature("supports_temporal_subtraction")
- def test_date_subtraction(self):
- queryset = Experiment.objects.annotate(
- completion_duration=F("completed") - F("assigned"),
- )
- at_least_5_days = {
- e.name
- for e in queryset.filter(
- completion_duration__gte=datetime.timedelta(days=5)
- )
- }
- self.assertEqual(at_least_5_days, {"e3", "e4", "e5"})
- at_least_120_days = {
- e.name
- for e in queryset.filter(
- completion_duration__gte=datetime.timedelta(days=120)
- )
- }
- self.assertEqual(at_least_120_days, {"e5"})
- less_than_5_days = {
- e.name
- for e in queryset.filter(completion_duration__lt=datetime.timedelta(days=5))
- }
- self.assertEqual(less_than_5_days, {"e0", "e1", "e2"})
- queryset = Experiment.objects.annotate(
- difference=F("completed") - Value(None, output_field=DateField()),
- )
- self.assertIsNone(queryset.first().difference)
- queryset = Experiment.objects.annotate(
- shifted=ExpressionWrapper(
- F("completed") - Value(None, output_field=DurationField()),
- output_field=DateField(),
- )
- )
- self.assertIsNone(queryset.first().shifted)
- @skipUnlessDBFeature("supports_temporal_subtraction")
- def test_date_subquery_subtraction(self):
- subquery = Experiment.objects.filter(pk=OuterRef("pk")).values("completed")
- queryset = Experiment.objects.annotate(
- difference=subquery - F("completed"),
- ).filter(difference=datetime.timedelta())
- self.assertTrue(queryset.exists())
- @skipUnlessDBFeature("supports_temporal_subtraction")
- def test_date_case_subtraction(self):
- queryset = Experiment.objects.annotate(
- date_case=Case(
- When(Q(name="e0"), then=F("completed")),
- output_field=DateField(),
- ),
- completed_value=Value(
- self.e0.completed,
- output_field=DateField(),
- ),
- difference=F("date_case") - F("completed_value"),
- ).filter(difference=datetime.timedelta())
- self.assertEqual(queryset.get(), self.e0)
- @skipUnlessDBFeature("supports_temporal_subtraction")
- def test_time_subtraction(self):
- Time.objects.create(time=datetime.time(12, 30, 15, 2345))
- queryset = Time.objects.annotate(
- difference=F("time") - Value(datetime.time(11, 15, 0)),
- )
- self.assertEqual(
- queryset.get().difference,
- datetime.timedelta(hours=1, minutes=15, seconds=15, microseconds=2345),
- )
- queryset = Time.objects.annotate(
- difference=F("time") - Value(None, output_field=TimeField()),
- )
- self.assertIsNone(queryset.first().difference)
- queryset = Time.objects.annotate(
- shifted=ExpressionWrapper(
- F("time") - Value(None, output_field=DurationField()),
- output_field=TimeField(),
- )
- )
- self.assertIsNone(queryset.first().shifted)
- @skipUnlessDBFeature("supports_temporal_subtraction")
- def test_time_subquery_subtraction(self):
- Time.objects.create(time=datetime.time(12, 30, 15, 2345))
- subquery = Time.objects.filter(pk=OuterRef("pk")).values("time")
- queryset = Time.objects.annotate(
- difference=subquery - F("time"),
- ).filter(difference=datetime.timedelta())
- self.assertTrue(queryset.exists())
- @skipUnlessDBFeature("supports_temporal_subtraction")
- def test_datetime_subtraction(self):
- under_estimate = [
- e.name
- for e in Experiment.objects.filter(estimated_time__gt=F("end") - F("start"))
- ]
- self.assertEqual(under_estimate, ["e2"])
- over_estimate = [
- e.name
- for e in Experiment.objects.filter(estimated_time__lt=F("end") - F("start"))
- ]
- self.assertEqual(over_estimate, ["e4"])
- queryset = Experiment.objects.annotate(
- difference=F("start") - Value(None, output_field=DateTimeField()),
- )
- self.assertIsNone(queryset.first().difference)
- queryset = Experiment.objects.annotate(
- shifted=ExpressionWrapper(
- F("start") - Value(None, output_field=DurationField()),
- output_field=DateTimeField(),
- )
- )
- self.assertIsNone(queryset.first().shifted)
- @skipUnlessDBFeature("supports_temporal_subtraction")
- def test_datetime_subquery_subtraction(self):
- subquery = Experiment.objects.filter(pk=OuterRef("pk")).values("start")
- queryset = Experiment.objects.annotate(
- difference=subquery - F("start"),
- ).filter(difference=datetime.timedelta())
- self.assertTrue(queryset.exists())
- @skipUnlessDBFeature("supports_temporal_subtraction")
- def test_datetime_subtraction_microseconds(self):
- delta = datetime.timedelta(microseconds=8999999999999999)
- Experiment.objects.update(end=F("start") + delta)
- qs = Experiment.objects.annotate(delta=F("end") - F("start"))
- for e in qs:
- self.assertEqual(e.delta, delta)
- def test_duration_with_datetime(self):
- # Exclude e1 which has very high precision so we can test this on all
- # backends regardless of whether or not it supports
- # microsecond_precision.
- over_estimate = (
- Experiment.objects.exclude(name="e1")
- .filter(
- completed__gt=self.stime + F("estimated_time"),
- )
- .order_by("name")
- )
- self.assertQuerysetEqual(over_estimate, ["e3", "e4", "e5"], lambda e: e.name)
- def test_duration_with_datetime_microseconds(self):
- delta = datetime.timedelta(microseconds=8999999999999999)
- qs = Experiment.objects.annotate(
- dt=ExpressionWrapper(
- F("start") + delta,
- output_field=DateTimeField(),
- )
- )
- for e in qs:
- self.assertEqual(e.dt, e.start + delta)
- def test_date_minus_duration(self):
- more_than_4_days = Experiment.objects.filter(
- assigned__lt=F("completed") - Value(datetime.timedelta(days=4))
- )
- self.assertQuerysetEqual(more_than_4_days, ["e3", "e4", "e5"], lambda e: e.name)
- def test_negative_timedelta_update(self):
- # subtract 30 seconds, 30 minutes, 2 hours and 2 days
- experiments = (
- Experiment.objects.filter(name="e0")
- .annotate(
- start_sub_seconds=F("start") + datetime.timedelta(seconds=-30),
- )
- .annotate(
- start_sub_minutes=F("start_sub_seconds")
- + datetime.timedelta(minutes=-30),
- )
- .annotate(
- start_sub_hours=F("start_sub_minutes") + datetime.timedelta(hours=-2),
- )
- .annotate(
- new_start=F("start_sub_hours") + datetime.timedelta(days=-2),
- )
- )
- expected_start = datetime.datetime(2010, 6, 23, 9, 45, 0)
- # subtract 30 microseconds
- experiments = experiments.annotate(
- new_start=F("new_start") + datetime.timedelta(microseconds=-30)
- )
- expected_start += datetime.timedelta(microseconds=+746970)
- experiments.update(start=F("new_start"))
- e0 = Experiment.objects.get(name="e0")
- self.assertEqual(e0.start, expected_start)
- class ValueTests(TestCase):
- def test_update_TimeField_using_Value(self):
- Time.objects.create()
- Time.objects.update(time=Value(datetime.time(1), output_field=TimeField()))
- self.assertEqual(Time.objects.get().time, datetime.time(1))
- def test_update_UUIDField_using_Value(self):
- UUID.objects.create()
- UUID.objects.update(
- uuid=Value(
- uuid.UUID("12345678901234567890123456789012"), output_field=UUIDField()
- )
- )
- self.assertEqual(
- UUID.objects.get().uuid, uuid.UUID("12345678901234567890123456789012")
- )
- def test_deconstruct(self):
- value = Value("name")
- path, args, kwargs = value.deconstruct()
- self.assertEqual(path, "django.db.models.Value")
- self.assertEqual(args, (value.value,))
- self.assertEqual(kwargs, {})
- def test_deconstruct_output_field(self):
- value = Value("name", output_field=CharField())
- path, args, kwargs = value.deconstruct()
- self.assertEqual(path, "django.db.models.Value")
- self.assertEqual(args, (value.value,))
- self.assertEqual(len(kwargs), 1)
- self.assertEqual(
- kwargs["output_field"].deconstruct(), CharField().deconstruct()
- )
- def test_repr(self):
- tests = [
- (None, "Value(None)"),
- ("str", "Value('str')"),
- (True, "Value(True)"),
- (42, "Value(42)"),
- (
- datetime.datetime(2019, 5, 15),
- "Value(datetime.datetime(2019, 5, 15, 0, 0))",
- ),
- (Decimal("3.14"), "Value(Decimal('3.14'))"),
- ]
- for value, expected in tests:
- with self.subTest(value=value):
- self.assertEqual(repr(Value(value)), expected)
- def test_equal(self):
- value = Value("name")
- self.assertEqual(value, Value("name"))
- self.assertNotEqual(value, Value("username"))
- def test_hash(self):
- d = {Value("name"): "Bob"}
- self.assertIn(Value("name"), d)
- self.assertEqual(d[Value("name")], "Bob")
- def test_equal_output_field(self):
- value = Value("name", output_field=CharField())
- same_value = Value("name", output_field=CharField())
- other_value = Value("name", output_field=TimeField())
- no_output_field = Value("name")
- self.assertEqual(value, same_value)
- self.assertNotEqual(value, other_value)
- self.assertNotEqual(value, no_output_field)
- def test_raise_empty_expressionlist(self):
- msg = "ExpressionList requires at least one expression"
- with self.assertRaisesMessage(ValueError, msg):
- ExpressionList()
- def test_compile_unresolved(self):
- # This test might need to be revisited later on if #25425 is enforced.
- compiler = Time.objects.all().query.get_compiler(connection=connection)
- value = Value("foo")
- self.assertEqual(value.as_sql(compiler, connection), ("%s", ["foo"]))
- value = Value("foo", output_field=CharField())
- self.assertEqual(value.as_sql(compiler, connection), ("%s", ["foo"]))
- def test_output_field_decimalfield(self):
- Time.objects.create()
- time = Time.objects.annotate(one=Value(1, output_field=DecimalField())).first()
- self.assertEqual(time.one, 1)
- def test_resolve_output_field(self):
- value_types = [
- ("str", CharField),
- (True, BooleanField),
- (42, IntegerField),
- (3.14, FloatField),
- (datetime.date(2019, 5, 15), DateField),
- (datetime.datetime(2019, 5, 15), DateTimeField),
- (datetime.time(3, 16), TimeField),
- (datetime.timedelta(1), DurationField),
- (Decimal("3.14"), DecimalField),
- (b"", BinaryField),
- (uuid.uuid4(), UUIDField),
- ]
- for value, output_field_type in value_types:
- with self.subTest(type=type(value)):
- expr = Value(value)
- self.assertIsInstance(expr.output_field, output_field_type)
- def test_resolve_output_field_failure(self):
- msg = "Cannot resolve expression type, unknown output_field"
- with self.assertRaisesMessage(FieldError, msg):
- Value(object()).output_field
- def test_output_field_does_not_create_broken_validators(self):
- """
- The output field for a given Value doesn't get cleaned & validated,
- however validators may still be instantiated for a given field type
- and this demonstrates that they don't throw an exception.
- """
- value_types = [
- "str",
- True,
- 42,
- 3.14,
- datetime.date(2019, 5, 15),
- datetime.datetime(2019, 5, 15),
- datetime.time(3, 16),
- datetime.timedelta(1),
- Decimal("3.14"),
- b"",
- uuid.uuid4(),
- ]
- for value in value_types:
- with self.subTest(type=type(value)):
- field = Value(value)._resolve_output_field()
- field.clean(value, model_instance=None)
- class ExistsTests(TestCase):
- def test_optimizations(self):
- with CaptureQueriesContext(connection) as context:
- list(
- Experiment.objects.values(
- exists=Exists(
- Experiment.objects.order_by("pk"),
- )
- ).order_by()
- )
- captured_queries = context.captured_queries
- self.assertEqual(len(captured_queries), 1)
- captured_sql = captured_queries[0]["sql"]
- self.assertNotIn(
- connection.ops.quote_name(Experiment._meta.pk.column),
- captured_sql,
- )
- self.assertIn(
- connection.ops.limit_offset_sql(None, 1),
- captured_sql,
- )
- self.assertNotIn("ORDER BY", captured_sql)
- def test_negated_empty_exists(self):
- manager = Manager.objects.create()
- qs = Manager.objects.filter(~Exists(Manager.objects.none()) & Q(pk=manager.pk))
- self.assertSequenceEqual(qs, [manager])
- def test_select_negated_empty_exists(self):
- manager = Manager.objects.create()
- qs = Manager.objects.annotate(
- not_exists=~Exists(Manager.objects.none())
- ).filter(pk=manager.pk)
- self.assertSequenceEqual(qs, [manager])
- self.assertIs(qs.get().not_exists, True)
- class FieldTransformTests(TestCase):
- @classmethod
- def setUpTestData(cls):
- cls.sday = sday = datetime.date(2010, 6, 25)
- cls.stime = stime = datetime.datetime(2010, 6, 25, 12, 15, 30, 747000)
- cls.ex1 = Experiment.objects.create(
- name="Experiment 1",
- assigned=sday,
- completed=sday + datetime.timedelta(2),
- estimated_time=datetime.timedelta(2),
- start=stime,
- end=stime + datetime.timedelta(2),
- )
- def test_month_aggregation(self):
- self.assertEqual(
- Experiment.objects.aggregate(month_count=Count("assigned__month")),
- {"month_count": 1},
- )
- def test_transform_in_values(self):
- self.assertSequenceEqual(
- Experiment.objects.values("assigned__month"),
- [{"assigned__month": 6}],
- )
- def test_multiple_transforms_in_values(self):
- self.assertSequenceEqual(
- Experiment.objects.values("end__date__month"),
- [{"end__date__month": 6}],
- )
- class ReprTests(SimpleTestCase):
- def test_expressions(self):
- self.assertEqual(
- repr(Case(When(a=1))),
- "<Case: CASE WHEN <Q: (AND: ('a', 1))> THEN Value(None), ELSE Value(None)>",
- )
- self.assertEqual(
- repr(When(Q(age__gte=18), then=Value("legal"))),
- "<When: WHEN <Q: (AND: ('age__gte', 18))> THEN Value('legal')>",
- )
- self.assertEqual(repr(Col("alias", "field")), "Col(alias, field)")
- self.assertEqual(repr(F("published")), "F(published)")
- self.assertEqual(
- repr(F("cost") + F("tax")), "<CombinedExpression: F(cost) + F(tax)>"
- )
- self.assertEqual(
- repr(ExpressionWrapper(F("cost") + F("tax"), IntegerField())),
- "ExpressionWrapper(F(cost) + F(tax))",
- )
- self.assertEqual(
- repr(Func("published", function="TO_CHAR")),
- "Func(F(published), function=TO_CHAR)",
- )
- self.assertEqual(repr(OrderBy(Value(1))), "OrderBy(Value(1), descending=False)")
- self.assertEqual(repr(RawSQL("table.col", [])), "RawSQL(table.col, [])")
- self.assertEqual(
- repr(Ref("sum_cost", Sum("cost"))), "Ref(sum_cost, Sum(F(cost)))"
- )
- self.assertEqual(repr(Value(1)), "Value(1)")
- self.assertEqual(
- repr(ExpressionList(F("col"), F("anothercol"))),
- "ExpressionList(F(col), F(anothercol))",
- )
- self.assertEqual(
- repr(ExpressionList(OrderBy(F("col"), descending=False))),
- "ExpressionList(OrderBy(F(col), descending=False))",
- )
- def test_functions(self):
- self.assertEqual(repr(Coalesce("a", "b")), "Coalesce(F(a), F(b))")
- self.assertEqual(repr(Concat("a", "b")), "Concat(ConcatPair(F(a), F(b)))")
- self.assertEqual(repr(Length("a")), "Length(F(a))")
- self.assertEqual(repr(Lower("a")), "Lower(F(a))")
- self.assertEqual(repr(Substr("a", 1, 3)), "Substr(F(a), Value(1), Value(3))")
- self.assertEqual(repr(Upper("a")), "Upper(F(a))")
- def test_aggregates(self):
- self.assertEqual(repr(Avg("a")), "Avg(F(a))")
- self.assertEqual(repr(Count("a")), "Count(F(a))")
- self.assertEqual(repr(Count("*")), "Count('*')")
- self.assertEqual(repr(Max("a")), "Max(F(a))")
- self.assertEqual(repr(Min("a")), "Min(F(a))")
- self.assertEqual(repr(StdDev("a")), "StdDev(F(a), sample=False)")
- self.assertEqual(repr(Sum("a")), "Sum(F(a))")
- self.assertEqual(
- repr(Variance("a", sample=True)), "Variance(F(a), sample=True)"
- )
- def test_distinct_aggregates(self):
- self.assertEqual(repr(Count("a", distinct=True)), "Count(F(a), distinct=True)")
- self.assertEqual(repr(Count("*", distinct=True)), "Count('*', distinct=True)")
- def test_filtered_aggregates(self):
- filter = Q(a=1)
- self.assertEqual(
- repr(Avg("a", filter=filter)), "Avg(F(a), filter=(AND: ('a', 1)))"
- )
- self.assertEqual(
- repr(Count("a", filter=filter)), "Count(F(a), filter=(AND: ('a', 1)))"
- )
- self.assertEqual(
- repr(Max("a", filter=filter)), "Max(F(a), filter=(AND: ('a', 1)))"
- )
- self.assertEqual(
- repr(Min("a", filter=filter)), "Min(F(a), filter=(AND: ('a', 1)))"
- )
- self.assertEqual(
- repr(StdDev("a", filter=filter)),
- "StdDev(F(a), filter=(AND: ('a', 1)), sample=False)",
- )
- self.assertEqual(
- repr(Sum("a", filter=filter)), "Sum(F(a), filter=(AND: ('a', 1)))"
- )
- self.assertEqual(
- repr(Variance("a", sample=True, filter=filter)),
- "Variance(F(a), filter=(AND: ('a', 1)), sample=True)",
- )
- self.assertEqual(
- repr(Count("a", filter=filter, distinct=True)),
- "Count(F(a), distinct=True, filter=(AND: ('a', 1)))",
- )
- class CombinableTests(SimpleTestCase):
- bitwise_msg = (
- "Use .bitand(), .bitor(), and .bitxor() for bitwise logical operations."
- )
- def test_negation(self):
- c = Combinable()
- self.assertEqual(-c, c * -1)
- def test_and(self):
- with self.assertRaisesMessage(NotImplementedError, self.bitwise_msg):
- Combinable() & Combinable()
- def test_or(self):
- with self.assertRaisesMessage(NotImplementedError, self.bitwise_msg):
- Combinable() | Combinable()
- def test_xor(self):
- with self.assertRaisesMessage(NotImplementedError, self.bitwise_msg):
- Combinable() ^ Combinable()
- def test_reversed_and(self):
- with self.assertRaisesMessage(NotImplementedError, self.bitwise_msg):
- object() & Combinable()
- def test_reversed_or(self):
- with self.assertRaisesMessage(NotImplementedError, self.bitwise_msg):
- object() | Combinable()
- def test_reversed_xor(self):
- with self.assertRaisesMessage(NotImplementedError, self.bitwise_msg):
- object() ^ Combinable()
- class CombinedExpressionTests(SimpleTestCase):
- def test_resolve_output_field_number(self):
- tests = [
- (IntegerField, AutoField, IntegerField),
- (AutoField, IntegerField, IntegerField),
- (IntegerField, DecimalField, DecimalField),
- (DecimalField, IntegerField, DecimalField),
- (IntegerField, FloatField, FloatField),
- (FloatField, IntegerField, FloatField),
- ]
- connectors = [Combinable.ADD, Combinable.SUB, Combinable.MUL, Combinable.DIV]
- for lhs, rhs, combined in tests:
- for connector in connectors:
- with self.subTest(
- lhs=lhs, connector=connector, rhs=rhs, combined=combined
- ):
- expr = CombinedExpression(
- Expression(lhs()),
- connector,
- Expression(rhs()),
- )
- self.assertIsInstance(expr.output_field, combined)
- def test_resolve_output_field_with_null(self):
- def null():
- return Value(None)
- tests = [
- # Numbers.
- (AutoField, Combinable.ADD, null),
- (DecimalField, Combinable.ADD, null),
- (FloatField, Combinable.ADD, null),
- (IntegerField, Combinable.ADD, null),
- (IntegerField, Combinable.SUB, null),
- (null, Combinable.ADD, IntegerField),
- # Dates.
- (DateField, Combinable.ADD, null),
- (DateTimeField, Combinable.ADD, null),
- (DurationField, Combinable.ADD, null),
- (TimeField, Combinable.ADD, null),
- (TimeField, Combinable.SUB, null),
- (null, Combinable.ADD, DateTimeField),
- (DateField, Combinable.SUB, null),
- ]
- for lhs, connector, rhs in tests:
- msg = (
- f"Cannot infer type of {connector!r} expression involving these types: "
- )
- with self.subTest(lhs=lhs, connector=connector, rhs=rhs):
- expr = CombinedExpression(
- Expression(lhs()),
- connector,
- Expression(rhs()),
- )
- with self.assertRaisesMessage(FieldError, msg):
- expr.output_field
- def test_resolve_output_field_dates(self):
- tests = [
- # Add - same type.
- (DateField, Combinable.ADD, DateField, FieldError),
- (DateTimeField, Combinable.ADD, DateTimeField, FieldError),
- (TimeField, Combinable.ADD, TimeField, FieldError),
- (DurationField, Combinable.ADD, DurationField, DurationField),
- # Add - different type.
- (DateField, Combinable.ADD, DurationField, DateTimeField),
- (DateTimeField, Combinable.ADD, DurationField, DateTimeField),
- (TimeField, Combinable.ADD, DurationField, TimeField),
- (DurationField, Combinable.ADD, DateField, DateTimeField),
- (DurationField, Combinable.ADD, DateTimeField, DateTimeField),
- (DurationField, Combinable.ADD, TimeField, TimeField),
- # Subtract - same type.
- (DateField, Combinable.SUB, DateField, DurationField),
- (DateTimeField, Combinable.SUB, DateTimeField, DurationField),
- (TimeField, Combinable.SUB, TimeField, DurationField),
- (DurationField, Combinable.SUB, DurationField, DurationField),
- # Subtract - different type.
- (DateField, Combinable.SUB, DurationField, DateTimeField),
- (DateTimeField, Combinable.SUB, DurationField, DateTimeField),
- (TimeField, Combinable.SUB, DurationField, TimeField),
- (DurationField, Combinable.SUB, DateField, FieldError),
- (DurationField, Combinable.SUB, DateTimeField, FieldError),
- (DurationField, Combinable.SUB, DateTimeField, FieldError),
- ]
- for lhs, connector, rhs, combined in tests:
- msg = (
- f"Cannot infer type of {connector!r} expression involving these types: "
- )
- with self.subTest(lhs=lhs, connector=connector, rhs=rhs, combined=combined):
- expr = CombinedExpression(
- Expression(lhs()),
- connector,
- Expression(rhs()),
- )
- if issubclass(combined, Exception):
- with self.assertRaisesMessage(combined, msg):
- expr.output_field
- else:
- self.assertIsInstance(expr.output_field, combined)
- def test_mixed_char_date_with_annotate(self):
- queryset = Experiment.objects.annotate(nonsense=F("name") + F("assigned"))
- msg = (
- "Cannot infer type of '+' expression involving these types: CharField, "
- "DateField. You must set output_field."
- )
- with self.assertRaisesMessage(FieldError, msg):
- list(queryset)
- class ExpressionWrapperTests(SimpleTestCase):
- def test_empty_group_by(self):
- expr = ExpressionWrapper(Value(3), output_field=IntegerField())
- self.assertEqual(expr.get_group_by_cols(alias=None), [])
- def test_non_empty_group_by(self):
- value = Value("f")
- value.output_field = None
- expr = ExpressionWrapper(Lower(value), output_field=IntegerField())
- group_by_cols = expr.get_group_by_cols(alias=None)
- self.assertEqual(group_by_cols, [expr.expression])
- self.assertEqual(group_by_cols[0].output_field, expr.output_field)
- class OrderByTests(SimpleTestCase):
- def test_equal(self):
- self.assertEqual(
- OrderBy(F("field"), nulls_last=True),
- OrderBy(F("field"), nulls_last=True),
- )
- self.assertNotEqual(
- OrderBy(F("field"), nulls_last=True),
- OrderBy(F("field")),
- )
- def test_hash(self):
- self.assertEqual(
- hash(OrderBy(F("field"), nulls_last=True)),
- hash(OrderBy(F("field"), nulls_last=True)),
- )
- self.assertNotEqual(
- hash(OrderBy(F("field"), nulls_last=True)),
- hash(OrderBy(F("field"))),
- )
- def test_nulls_false(self):
- # These tests will catch ValueError in Django 5.0 when passing False to
- # nulls_first and nulls_last becomes forbidden.
- # msg = "nulls_first and nulls_last values must be True or None."
- msg = (
- "Passing nulls_first=False or nulls_last=False is deprecated, use None "
- "instead."
- )
- with self.assertRaisesMessage(RemovedInDjango50Warning, msg):
- OrderBy(F("field"), nulls_first=False)
- with self.assertRaisesMessage(RemovedInDjango50Warning, msg):
- OrderBy(F("field"), nulls_last=False)
- with self.assertRaisesMessage(RemovedInDjango50Warning, msg):
- F("field").asc(nulls_first=False)
- with self.assertRaisesMessage(RemovedInDjango50Warning, msg):
- F("field").desc(nulls_last=False)
|