test_aggregates.py 36 KB


  1. from django.db import transaction
  2. from django.db.models import (
  3. CharField,
  4. F,
  5. Func,
  6. IntegerField,
  7. JSONField,
  8. OuterRef,
  9. Q,
  10. Subquery,
  11. Value,
  12. Window,
  13. )
  14. from django.db.models.fields.json import KeyTextTransform, KeyTransform
  15. from django.db.models.functions import Cast, Concat, LPad, Substr
  16. from django.test.utils import Approximate
  17. from django.utils import timezone
  18. from . import PostgreSQLTestCase
  19. from .models import AggregateTestModel, HotelReservation, Room, StatTestModel
  20. try:
  21. from django.contrib.postgres.aggregates import (
  22. ArrayAgg,
  23. BitAnd,
  24. BitOr,
  25. BitXor,
  26. BoolAnd,
  27. BoolOr,
  28. Corr,
  29. CovarPop,
  30. JSONBAgg,
  31. RegrAvgX,
  32. RegrAvgY,
  33. RegrCount,
  34. RegrIntercept,
  35. RegrR2,
  36. RegrSlope,
  37. RegrSXX,
  38. RegrSXY,
  39. RegrSYY,
  40. StatAggregate,
  41. StringAgg,
  42. )
  43. from django.contrib.postgres.fields import ArrayField
  44. except ImportError:
  45. pass # psycopg2 is not installed
  46. class TestGeneralAggregate(PostgreSQLTestCase):
  47. @classmethod
  48. def setUpTestData(cls):
  49. cls.aggs = AggregateTestModel.objects.bulk_create(
  50. [
  51. AggregateTestModel(
  52. boolean_field=True,
  53. char_field="Foo1",
  54. text_field="Text1",
  55. integer_field=0,
  56. ),
  57. AggregateTestModel(
  58. boolean_field=False,
  59. char_field="Foo2",
  60. text_field="Text2",
  61. integer_field=1,
  62. json_field={"lang": "pl"},
  63. ),
  64. AggregateTestModel(
  65. boolean_field=False,
  66. char_field="Foo4",
  67. text_field="Text4",
  68. integer_field=2,
  69. json_field={"lang": "en"},
  70. ),
  71. AggregateTestModel(
  72. boolean_field=True,
  73. char_field="Foo3",
  74. text_field="Text3",
  75. integer_field=0,
  76. json_field={"breed": "collie"},
  77. ),
  78. ]
  79. )
  80. def test_empty_result_set(self):
  81. AggregateTestModel.objects.all().delete()
  82. tests = [
  83. ArrayAgg("char_field"),
  84. ArrayAgg("integer_field"),
  85. ArrayAgg("boolean_field"),
  86. BitAnd("integer_field"),
  87. BitOr("integer_field"),
  88. BoolAnd("boolean_field"),
  89. BoolOr("boolean_field"),
  90. JSONBAgg("integer_field"),
  91. StringAgg("char_field", delimiter=";"),
  92. BitXor("integer_field"),
  93. ]
  94. for aggregation in tests:
  95. with self.subTest(aggregation=aggregation):
  96. # Empty result with non-execution optimization.
  97. with self.assertNumQueries(0):
  98. values = AggregateTestModel.objects.none().aggregate(
  99. aggregation=aggregation,
  100. )
  101. self.assertEqual(values, {"aggregation": None})
  102. # Empty result when query must be executed.
  103. with self.assertNumQueries(1):
  104. values = AggregateTestModel.objects.aggregate(
  105. aggregation=aggregation,
  106. )
  107. self.assertEqual(values, {"aggregation": None})
  108. def test_default_argument(self):
  109. AggregateTestModel.objects.all().delete()
  110. tests = [
  111. (ArrayAgg("char_field", default=["<empty>"]), ["<empty>"]),
  112. (ArrayAgg("integer_field", default=[0]), [0]),
  113. (ArrayAgg("boolean_field", default=[False]), [False]),
  114. (BitAnd("integer_field", default=0), 0),
  115. (BitOr("integer_field", default=0), 0),
  116. (BoolAnd("boolean_field", default=False), False),
  117. (BoolOr("boolean_field", default=False), False),
  118. (JSONBAgg("integer_field", default=["<empty>"]), ["<empty>"]),
  119. (
  120. JSONBAgg("integer_field", default=Value(["<empty>"], JSONField())),
  121. ["<empty>"],
  122. ),
  123. (StringAgg("char_field", delimiter=";", default="<empty>"), "<empty>"),
  124. (
  125. StringAgg("char_field", delimiter=";", default=Value("<empty>")),
  126. "<empty>",
  127. ),
  128. (BitXor("integer_field", default=0), 0),
  129. ]
  130. for aggregation, expected_result in tests:
  131. with self.subTest(aggregation=aggregation):
  132. # Empty result with non-execution optimization.
  133. with self.assertNumQueries(0):
  134. values = AggregateTestModel.objects.none().aggregate(
  135. aggregation=aggregation,
  136. )
  137. self.assertEqual(values, {"aggregation": expected_result})
  138. # Empty result when query must be executed.
  139. with transaction.atomic(), self.assertNumQueries(1):
  140. values = AggregateTestModel.objects.aggregate(
  141. aggregation=aggregation,
  142. )
  143. self.assertEqual(values, {"aggregation": expected_result})
  144. def test_array_agg_charfield(self):
  145. values = AggregateTestModel.objects.aggregate(arrayagg=ArrayAgg("char_field"))
  146. self.assertEqual(values, {"arrayagg": ["Foo1", "Foo2", "Foo4", "Foo3"]})
  147. def test_array_agg_charfield_ordering(self):
  148. ordering_test_cases = (
  149. (F("char_field").desc(), ["Foo4", "Foo3", "Foo2", "Foo1"]),
  150. (F("char_field").asc(), ["Foo1", "Foo2", "Foo3", "Foo4"]),
  151. (F("char_field"), ["Foo1", "Foo2", "Foo3", "Foo4"]),
  152. (
  153. [F("boolean_field"), F("char_field").desc()],
  154. ["Foo4", "Foo2", "Foo3", "Foo1"],
  155. ),
  156. (
  157. (F("boolean_field"), F("char_field").desc()),
  158. ["Foo4", "Foo2", "Foo3", "Foo1"],
  159. ),
  160. ("char_field", ["Foo1", "Foo2", "Foo3", "Foo4"]),
  161. ("-char_field", ["Foo4", "Foo3", "Foo2", "Foo1"]),
  162. (Concat("char_field", Value("@")), ["Foo1", "Foo2", "Foo3", "Foo4"]),
  163. (Concat("char_field", Value("@")).desc(), ["Foo4", "Foo3", "Foo2", "Foo1"]),
  164. (
  165. (
  166. Substr("char_field", 1, 1),
  167. F("integer_field"),
  168. Substr("char_field", 4, 1).desc(),
  169. ),
  170. ["Foo3", "Foo1", "Foo2", "Foo4"],
  171. ),
  172. )
  173. for ordering, expected_output in ordering_test_cases:
  174. with self.subTest(ordering=ordering, expected_output=expected_output):
  175. values = AggregateTestModel.objects.aggregate(
  176. arrayagg=ArrayAgg("char_field", ordering=ordering)
  177. )
  178. self.assertEqual(values, {"arrayagg": expected_output})
  179. def test_array_agg_integerfield(self):
  180. values = AggregateTestModel.objects.aggregate(
  181. arrayagg=ArrayAgg("integer_field")
  182. )
  183. self.assertEqual(values, {"arrayagg": [0, 1, 2, 0]})
  184. def test_array_agg_integerfield_ordering(self):
  185. values = AggregateTestModel.objects.aggregate(
  186. arrayagg=ArrayAgg("integer_field", ordering=F("integer_field").desc())
  187. )
  188. self.assertEqual(values, {"arrayagg": [2, 1, 0, 0]})
  189. def test_array_agg_booleanfield(self):
  190. values = AggregateTestModel.objects.aggregate(
  191. arrayagg=ArrayAgg("boolean_field")
  192. )
  193. self.assertEqual(values, {"arrayagg": [True, False, False, True]})
  194. def test_array_agg_booleanfield_ordering(self):
  195. ordering_test_cases = (
  196. (F("boolean_field").asc(), [False, False, True, True]),
  197. (F("boolean_field").desc(), [True, True, False, False]),
  198. (F("boolean_field"), [False, False, True, True]),
  199. )
  200. for ordering, expected_output in ordering_test_cases:
  201. with self.subTest(ordering=ordering, expected_output=expected_output):
  202. values = AggregateTestModel.objects.aggregate(
  203. arrayagg=ArrayAgg("boolean_field", ordering=ordering)
  204. )
  205. self.assertEqual(values, {"arrayagg": expected_output})
  206. def test_array_agg_jsonfield(self):
  207. values = AggregateTestModel.objects.aggregate(
  208. arrayagg=ArrayAgg(
  209. KeyTransform("lang", "json_field"),
  210. filter=Q(json_field__lang__isnull=False),
  211. ),
  212. )
  213. self.assertEqual(values, {"arrayagg": ["pl", "en"]})
  214. def test_array_agg_jsonfield_ordering(self):
  215. values = AggregateTestModel.objects.aggregate(
  216. arrayagg=ArrayAgg(
  217. KeyTransform("lang", "json_field"),
  218. filter=Q(json_field__lang__isnull=False),
  219. ordering=KeyTransform("lang", "json_field"),
  220. ),
  221. )
  222. self.assertEqual(values, {"arrayagg": ["en", "pl"]})
  223. def test_array_agg_filter_and_ordering_params(self):
  224. values = AggregateTestModel.objects.aggregate(
  225. arrayagg=ArrayAgg(
  226. "char_field",
  227. filter=Q(json_field__has_key="lang"),
  228. ordering=LPad(Cast("integer_field", CharField()), 2, Value("0")),
  229. )
  230. )
  231. self.assertEqual(values, {"arrayagg": ["Foo2", "Foo4"]})
  232. def test_array_agg_filter(self):
  233. values = AggregateTestModel.objects.aggregate(
  234. arrayagg=ArrayAgg("integer_field", filter=Q(integer_field__gt=0)),
  235. )
  236. self.assertEqual(values, {"arrayagg": [1, 2]})
  237. def test_array_agg_lookups(self):
  238. aggr1 = AggregateTestModel.objects.create()
  239. aggr2 = AggregateTestModel.objects.create()
  240. StatTestModel.objects.create(related_field=aggr1, int1=1, int2=0)
  241. StatTestModel.objects.create(related_field=aggr1, int1=2, int2=0)
  242. StatTestModel.objects.create(related_field=aggr2, int1=3, int2=0)
  243. StatTestModel.objects.create(related_field=aggr2, int1=4, int2=0)
  244. qs = (
  245. StatTestModel.objects.values("related_field")
  246. .annotate(array=ArrayAgg("int1"))
  247. .filter(array__overlap=[2])
  248. .values_list("array", flat=True)
  249. )
  250. self.assertCountEqual(qs.get(), [1, 2])
  251. def test_array_agg_filter_index(self):
  252. aggr1 = AggregateTestModel.objects.create(integer_field=1)
  253. aggr2 = AggregateTestModel.objects.create(integer_field=2)
  254. StatTestModel.objects.bulk_create(
  255. [
  256. StatTestModel(related_field=aggr1, int1=1, int2=0),
  257. StatTestModel(related_field=aggr1, int1=2, int2=1),
  258. StatTestModel(related_field=aggr2, int1=3, int2=0),
  259. StatTestModel(related_field=aggr2, int1=4, int2=1),
  260. ]
  261. )
  262. qs = (
  263. AggregateTestModel.objects.filter(pk__in=[aggr1.pk, aggr2.pk])
  264. .annotate(
  265. array=ArrayAgg("stattestmodel__int1", filter=Q(stattestmodel__int2=0))
  266. )
  267. .annotate(array_value=F("array__0"))
  268. .values_list("array_value", flat=True)
  269. )
  270. self.assertCountEqual(qs, [1, 3])
  271. def test_array_agg_filter_slice(self):
  272. aggr1 = AggregateTestModel.objects.create(integer_field=1)
  273. aggr2 = AggregateTestModel.objects.create(integer_field=2)
  274. StatTestModel.objects.bulk_create(
  275. [
  276. StatTestModel(related_field=aggr1, int1=1, int2=0),
  277. StatTestModel(related_field=aggr1, int1=2, int2=1),
  278. StatTestModel(related_field=aggr2, int1=3, int2=0),
  279. StatTestModel(related_field=aggr2, int1=4, int2=1),
  280. StatTestModel(related_field=aggr2, int1=5, int2=0),
  281. ]
  282. )
  283. qs = (
  284. AggregateTestModel.objects.filter(pk__in=[aggr1.pk, aggr2.pk])
  285. .annotate(
  286. array=ArrayAgg("stattestmodel__int1", filter=Q(stattestmodel__int2=0))
  287. )
  288. .annotate(array_value=F("array__1_2"))
  289. .values_list("array_value", flat=True)
  290. )
  291. self.assertCountEqual(qs, [[], [5]])
  292. def test_bit_and_general(self):
  293. values = AggregateTestModel.objects.filter(integer_field__in=[0, 1]).aggregate(
  294. bitand=BitAnd("integer_field")
  295. )
  296. self.assertEqual(values, {"bitand": 0})
  297. def test_bit_and_on_only_true_values(self):
  298. values = AggregateTestModel.objects.filter(integer_field=1).aggregate(
  299. bitand=BitAnd("integer_field")
  300. )
  301. self.assertEqual(values, {"bitand": 1})
  302. def test_bit_and_on_only_false_values(self):
  303. values = AggregateTestModel.objects.filter(integer_field=0).aggregate(
  304. bitand=BitAnd("integer_field")
  305. )
  306. self.assertEqual(values, {"bitand": 0})
  307. def test_bit_or_general(self):
  308. values = AggregateTestModel.objects.filter(integer_field__in=[0, 1]).aggregate(
  309. bitor=BitOr("integer_field")
  310. )
  311. self.assertEqual(values, {"bitor": 1})
  312. def test_bit_or_on_only_true_values(self):
  313. values = AggregateTestModel.objects.filter(integer_field=1).aggregate(
  314. bitor=BitOr("integer_field")
  315. )
  316. self.assertEqual(values, {"bitor": 1})
  317. def test_bit_or_on_only_false_values(self):
  318. values = AggregateTestModel.objects.filter(integer_field=0).aggregate(
  319. bitor=BitOr("integer_field")
  320. )
  321. self.assertEqual(values, {"bitor": 0})
  322. def test_bit_xor_general(self):
  323. AggregateTestModel.objects.create(integer_field=3)
  324. values = AggregateTestModel.objects.filter(
  325. integer_field__in=[1, 3],
  326. ).aggregate(bitxor=BitXor("integer_field"))
  327. self.assertEqual(values, {"bitxor": 2})
  328. def test_bit_xor_on_only_true_values(self):
  329. values = AggregateTestModel.objects.filter(
  330. integer_field=1,
  331. ).aggregate(bitxor=BitXor("integer_field"))
  332. self.assertEqual(values, {"bitxor": 1})
  333. def test_bit_xor_on_only_false_values(self):
  334. values = AggregateTestModel.objects.filter(
  335. integer_field=0,
  336. ).aggregate(bitxor=BitXor("integer_field"))
  337. self.assertEqual(values, {"bitxor": 0})
  338. def test_bool_and_general(self):
  339. values = AggregateTestModel.objects.aggregate(booland=BoolAnd("boolean_field"))
  340. self.assertEqual(values, {"booland": False})
  341. def test_bool_and_q_object(self):
  342. values = AggregateTestModel.objects.aggregate(
  343. booland=BoolAnd(Q(integer_field__gt=2)),
  344. )
  345. self.assertEqual(values, {"booland": False})
  346. def test_bool_or_general(self):
  347. values = AggregateTestModel.objects.aggregate(boolor=BoolOr("boolean_field"))
  348. self.assertEqual(values, {"boolor": True})
  349. def test_bool_or_q_object(self):
  350. values = AggregateTestModel.objects.aggregate(
  351. boolor=BoolOr(Q(integer_field__gt=2)),
  352. )
  353. self.assertEqual(values, {"boolor": False})
  354. def test_string_agg_requires_delimiter(self):
  355. with self.assertRaises(TypeError):
  356. AggregateTestModel.objects.aggregate(stringagg=StringAgg("char_field"))
  357. def test_string_agg_delimiter_escaping(self):
  358. values = AggregateTestModel.objects.aggregate(
  359. stringagg=StringAgg("char_field", delimiter="'")
  360. )
  361. self.assertEqual(values, {"stringagg": "Foo1'Foo2'Foo4'Foo3"})
  362. def test_string_agg_charfield(self):
  363. values = AggregateTestModel.objects.aggregate(
  364. stringagg=StringAgg("char_field", delimiter=";")
  365. )
  366. self.assertEqual(values, {"stringagg": "Foo1;Foo2;Foo4;Foo3"})
  367. def test_string_agg_default_output_field(self):
  368. values = AggregateTestModel.objects.aggregate(
  369. stringagg=StringAgg("text_field", delimiter=";"),
  370. )
  371. self.assertEqual(values, {"stringagg": "Text1;Text2;Text4;Text3"})
  372. def test_string_agg_charfield_ordering(self):
  373. ordering_test_cases = (
  374. (F("char_field").desc(), "Foo4;Foo3;Foo2;Foo1"),
  375. (F("char_field").asc(), "Foo1;Foo2;Foo3;Foo4"),
  376. (F("char_field"), "Foo1;Foo2;Foo3;Foo4"),
  377. ("char_field", "Foo1;Foo2;Foo3;Foo4"),
  378. ("-char_field", "Foo4;Foo3;Foo2;Foo1"),
  379. (Concat("char_field", Value("@")), "Foo1;Foo2;Foo3;Foo4"),
  380. (Concat("char_field", Value("@")).desc(), "Foo4;Foo3;Foo2;Foo1"),
  381. )
  382. for ordering, expected_output in ordering_test_cases:
  383. with self.subTest(ordering=ordering, expected_output=expected_output):
  384. values = AggregateTestModel.objects.aggregate(
  385. stringagg=StringAgg("char_field", delimiter=";", ordering=ordering)
  386. )
  387. self.assertEqual(values, {"stringagg": expected_output})
  388. def test_string_agg_jsonfield_ordering(self):
  389. values = AggregateTestModel.objects.aggregate(
  390. stringagg=StringAgg(
  391. KeyTextTransform("lang", "json_field"),
  392. delimiter=";",
  393. ordering=KeyTextTransform("lang", "json_field"),
  394. output_field=CharField(),
  395. ),
  396. )
  397. self.assertEqual(values, {"stringagg": "en;pl"})
  398. def test_string_agg_filter(self):
  399. values = AggregateTestModel.objects.aggregate(
  400. stringagg=StringAgg(
  401. "char_field",
  402. delimiter=";",
  403. filter=Q(char_field__endswith="3") | Q(char_field__endswith="1"),
  404. )
  405. )
  406. self.assertEqual(values, {"stringagg": "Foo1;Foo3"})
  407. def test_orderable_agg_alternative_fields(self):
  408. values = AggregateTestModel.objects.aggregate(
  409. arrayagg=ArrayAgg("integer_field", ordering=F("char_field").asc())
  410. )
  411. self.assertEqual(values, {"arrayagg": [0, 1, 0, 2]})
  412. def test_jsonb_agg(self):
  413. values = AggregateTestModel.objects.aggregate(jsonbagg=JSONBAgg("char_field"))
  414. self.assertEqual(values, {"jsonbagg": ["Foo1", "Foo2", "Foo4", "Foo3"]})
  415. def test_jsonb_agg_charfield_ordering(self):
  416. ordering_test_cases = (
  417. (F("char_field").desc(), ["Foo4", "Foo3", "Foo2", "Foo1"]),
  418. (F("char_field").asc(), ["Foo1", "Foo2", "Foo3", "Foo4"]),
  419. (F("char_field"), ["Foo1", "Foo2", "Foo3", "Foo4"]),
  420. ("char_field", ["Foo1", "Foo2", "Foo3", "Foo4"]),
  421. ("-char_field", ["Foo4", "Foo3", "Foo2", "Foo1"]),
  422. (Concat("char_field", Value("@")), ["Foo1", "Foo2", "Foo3", "Foo4"]),
  423. (Concat("char_field", Value("@")).desc(), ["Foo4", "Foo3", "Foo2", "Foo1"]),
  424. )
  425. for ordering, expected_output in ordering_test_cases:
  426. with self.subTest(ordering=ordering, expected_output=expected_output):
  427. values = AggregateTestModel.objects.aggregate(
  428. jsonbagg=JSONBAgg("char_field", ordering=ordering),
  429. )
  430. self.assertEqual(values, {"jsonbagg": expected_output})
  431. def test_jsonb_agg_integerfield_ordering(self):
  432. values = AggregateTestModel.objects.aggregate(
  433. jsonbagg=JSONBAgg("integer_field", ordering=F("integer_field").desc()),
  434. )
  435. self.assertEqual(values, {"jsonbagg": [2, 1, 0, 0]})
  436. def test_jsonb_agg_booleanfield_ordering(self):
  437. ordering_test_cases = (
  438. (F("boolean_field").asc(), [False, False, True, True]),
  439. (F("boolean_field").desc(), [True, True, False, False]),
  440. (F("boolean_field"), [False, False, True, True]),
  441. )
  442. for ordering, expected_output in ordering_test_cases:
  443. with self.subTest(ordering=ordering, expected_output=expected_output):
  444. values = AggregateTestModel.objects.aggregate(
  445. jsonbagg=JSONBAgg("boolean_field", ordering=ordering),
  446. )
  447. self.assertEqual(values, {"jsonbagg": expected_output})
  448. def test_jsonb_agg_jsonfield_ordering(self):
  449. values = AggregateTestModel.objects.aggregate(
  450. jsonbagg=JSONBAgg(
  451. KeyTransform("lang", "json_field"),
  452. filter=Q(json_field__lang__isnull=False),
  453. ordering=KeyTransform("lang", "json_field"),
  454. ),
  455. )
  456. self.assertEqual(values, {"jsonbagg": ["en", "pl"]})
  457. def test_jsonb_agg_key_index_transforms(self):
  458. room101 = Room.objects.create(number=101)
  459. room102 = Room.objects.create(number=102)
  460. datetimes = [
  461. timezone.datetime(2018, 6, 20),
  462. timezone.datetime(2018, 6, 24),
  463. timezone.datetime(2018, 6, 28),
  464. ]
  465. HotelReservation.objects.create(
  466. datespan=(datetimes[0].date(), datetimes[1].date()),
  467. start=datetimes[0],
  468. end=datetimes[1],
  469. room=room102,
  470. requirements={"double_bed": True, "parking": True},
  471. )
  472. HotelReservation.objects.create(
  473. datespan=(datetimes[1].date(), datetimes[2].date()),
  474. start=datetimes[1],
  475. end=datetimes[2],
  476. room=room102,
  477. requirements={"double_bed": False, "sea_view": True, "parking": False},
  478. )
  479. HotelReservation.objects.create(
  480. datespan=(datetimes[0].date(), datetimes[2].date()),
  481. start=datetimes[0],
  482. end=datetimes[2],
  483. room=room101,
  484. requirements={"sea_view": False},
  485. )
  486. values = (
  487. Room.objects.annotate(
  488. requirements=JSONBAgg(
  489. "hotelreservation__requirements",
  490. ordering="-hotelreservation__start",
  491. )
  492. )
  493. .filter(requirements__0__sea_view=True)
  494. .values("number", "requirements")
  495. )
  496. self.assertSequenceEqual(
  497. values,
  498. [
  499. {
  500. "number": 102,
  501. "requirements": [
  502. {"double_bed": False, "sea_view": True, "parking": False},
  503. {"double_bed": True, "parking": True},
  504. ],
  505. },
  506. ],
  507. )
  508. def test_string_agg_array_agg_ordering_in_subquery(self):
  509. stats = []
  510. for i, agg in enumerate(AggregateTestModel.objects.order_by("char_field")):
  511. stats.append(StatTestModel(related_field=agg, int1=i, int2=i + 1))
  512. stats.append(StatTestModel(related_field=agg, int1=i + 1, int2=i))
  513. StatTestModel.objects.bulk_create(stats)
  514. for aggregate, expected_result in (
  515. (
  516. ArrayAgg("stattestmodel__int1", ordering="-stattestmodel__int2"),
  517. [
  518. ("Foo1", [0, 1]),
  519. ("Foo2", [1, 2]),
  520. ("Foo3", [2, 3]),
  521. ("Foo4", [3, 4]),
  522. ],
  523. ),
  524. (
  525. StringAgg(
  526. Cast("stattestmodel__int1", CharField()),
  527. delimiter=";",
  528. ordering="-stattestmodel__int2",
  529. ),
  530. [("Foo1", "0;1"), ("Foo2", "1;2"), ("Foo3", "2;3"), ("Foo4", "3;4")],
  531. ),
  532. ):
  533. with self.subTest(aggregate=aggregate.__class__.__name__):
  534. subquery = (
  535. AggregateTestModel.objects.filter(
  536. pk=OuterRef("pk"),
  537. )
  538. .annotate(agg=aggregate)
  539. .values("agg")
  540. )
  541. values = (
  542. AggregateTestModel.objects.annotate(
  543. agg=Subquery(subquery),
  544. )
  545. .order_by("char_field")
  546. .values_list("char_field", "agg")
  547. )
  548. self.assertEqual(list(values), expected_result)
  549. def test_string_agg_array_agg_filter_in_subquery(self):
  550. StatTestModel.objects.bulk_create(
  551. [
  552. StatTestModel(related_field=self.aggs[0], int1=0, int2=5),
  553. StatTestModel(related_field=self.aggs[0], int1=1, int2=4),
  554. StatTestModel(related_field=self.aggs[0], int1=2, int2=3),
  555. ]
  556. )
  557. for aggregate, expected_result in (
  558. (
  559. ArrayAgg("stattestmodel__int1", filter=Q(stattestmodel__int2__gt=3)),
  560. [("Foo1", [0, 1]), ("Foo2", None)],
  561. ),
  562. (
  563. StringAgg(
  564. Cast("stattestmodel__int2", CharField()),
  565. delimiter=";",
  566. filter=Q(stattestmodel__int1__lt=2),
  567. ),
  568. [("Foo1", "5;4"), ("Foo2", None)],
  569. ),
  570. ):
  571. with self.subTest(aggregate=aggregate.__class__.__name__):
  572. subquery = (
  573. AggregateTestModel.objects.filter(
  574. pk=OuterRef("pk"),
  575. )
  576. .annotate(agg=aggregate)
  577. .values("agg")
  578. )
  579. values = (
  580. AggregateTestModel.objects.annotate(
  581. agg=Subquery(subquery),
  582. )
  583. .filter(
  584. char_field__in=["Foo1", "Foo2"],
  585. )
  586. .order_by("char_field")
  587. .values_list("char_field", "agg")
  588. )
  589. self.assertEqual(list(values), expected_result)
  590. def test_string_agg_filter_in_subquery_with_exclude(self):
  591. subquery = (
  592. AggregateTestModel.objects.annotate(
  593. stringagg=StringAgg(
  594. "char_field",
  595. delimiter=";",
  596. filter=Q(char_field__endswith="1"),
  597. )
  598. )
  599. .exclude(stringagg="")
  600. .values("id")
  601. )
  602. self.assertSequenceEqual(
  603. AggregateTestModel.objects.filter(id__in=Subquery(subquery)),
  604. [self.aggs[0]],
  605. )
  606. def test_ordering_isnt_cleared_for_array_subquery(self):
  607. inner_qs = AggregateTestModel.objects.order_by("-integer_field")
  608. qs = AggregateTestModel.objects.annotate(
  609. integers=Func(
  610. Subquery(inner_qs.values("integer_field")),
  611. function="ARRAY",
  612. output_field=ArrayField(base_field=IntegerField()),
  613. ),
  614. )
  615. self.assertSequenceEqual(
  616. qs.first().integers,
  617. inner_qs.values_list("integer_field", flat=True),
  618. )
  619. def test_window(self):
  620. self.assertCountEqual(
  621. AggregateTestModel.objects.annotate(
  622. integers=Window(
  623. expression=ArrayAgg("char_field"),
  624. partition_by=F("integer_field"),
  625. )
  626. ).values("integers", "char_field"),
  627. [
  628. {"integers": ["Foo1", "Foo3"], "char_field": "Foo1"},
  629. {"integers": ["Foo1", "Foo3"], "char_field": "Foo3"},
  630. {"integers": ["Foo2"], "char_field": "Foo2"},
  631. {"integers": ["Foo4"], "char_field": "Foo4"},
  632. ],
  633. )
  634. def test_values_list(self):
  635. tests = [ArrayAgg("integer_field"), JSONBAgg("integer_field")]
  636. for aggregation in tests:
  637. with self.subTest(aggregation=aggregation):
  638. self.assertCountEqual(
  639. AggregateTestModel.objects.values_list(aggregation),
  640. [([0],), ([1],), ([2],), ([0],)],
  641. )
  642. class TestAggregateDistinct(PostgreSQLTestCase):
  643. @classmethod
  644. def setUpTestData(cls):
  645. AggregateTestModel.objects.create(char_field="Foo")
  646. AggregateTestModel.objects.create(char_field="Foo")
  647. AggregateTestModel.objects.create(char_field="Bar")
  648. def test_string_agg_distinct_false(self):
  649. values = AggregateTestModel.objects.aggregate(
  650. stringagg=StringAgg("char_field", delimiter=" ", distinct=False)
  651. )
  652. self.assertEqual(values["stringagg"].count("Foo"), 2)
  653. self.assertEqual(values["stringagg"].count("Bar"), 1)
  654. def test_string_agg_distinct_true(self):
  655. values = AggregateTestModel.objects.aggregate(
  656. stringagg=StringAgg("char_field", delimiter=" ", distinct=True)
  657. )
  658. self.assertEqual(values["stringagg"].count("Foo"), 1)
  659. self.assertEqual(values["stringagg"].count("Bar"), 1)
  660. def test_array_agg_distinct_false(self):
  661. values = AggregateTestModel.objects.aggregate(
  662. arrayagg=ArrayAgg("char_field", distinct=False)
  663. )
  664. self.assertEqual(sorted(values["arrayagg"]), ["Bar", "Foo", "Foo"])
  665. def test_array_agg_distinct_true(self):
  666. values = AggregateTestModel.objects.aggregate(
  667. arrayagg=ArrayAgg("char_field", distinct=True)
  668. )
  669. self.assertEqual(sorted(values["arrayagg"]), ["Bar", "Foo"])
  670. def test_jsonb_agg_distinct_false(self):
  671. values = AggregateTestModel.objects.aggregate(
  672. jsonbagg=JSONBAgg("char_field", distinct=False),
  673. )
  674. self.assertEqual(sorted(values["jsonbagg"]), ["Bar", "Foo", "Foo"])
  675. def test_jsonb_agg_distinct_true(self):
  676. values = AggregateTestModel.objects.aggregate(
  677. jsonbagg=JSONBAgg("char_field", distinct=True),
  678. )
  679. self.assertEqual(sorted(values["jsonbagg"]), ["Bar", "Foo"])
  680. class TestStatisticsAggregate(PostgreSQLTestCase):
  681. @classmethod
  682. def setUpTestData(cls):
  683. StatTestModel.objects.create(
  684. int1=1,
  685. int2=3,
  686. related_field=AggregateTestModel.objects.create(integer_field=0),
  687. )
  688. StatTestModel.objects.create(
  689. int1=2,
  690. int2=2,
  691. related_field=AggregateTestModel.objects.create(integer_field=1),
  692. )
  693. StatTestModel.objects.create(
  694. int1=3,
  695. int2=1,
  696. related_field=AggregateTestModel.objects.create(integer_field=2),
  697. )
  698. # Tests for base class (StatAggregate)
  699. def test_missing_arguments_raises_exception(self):
  700. with self.assertRaisesMessage(ValueError, "Both y and x must be provided."):
  701. StatAggregate(x=None, y=None)
  702. def test_correct_source_expressions(self):
  703. func = StatAggregate(x="test", y=13)
  704. self.assertIsInstance(func.source_expressions[0], Value)
  705. self.assertIsInstance(func.source_expressions[1], F)
  706. def test_alias_is_required(self):
  707. class SomeFunc(StatAggregate):
  708. function = "TEST"
  709. with self.assertRaisesMessage(TypeError, "Complex aggregates require an alias"):
  710. StatTestModel.objects.aggregate(SomeFunc(y="int2", x="int1"))
  711. # Test aggregates
  712. def test_empty_result_set(self):
  713. StatTestModel.objects.all().delete()
  714. tests = [
  715. (Corr(y="int2", x="int1"), None),
  716. (CovarPop(y="int2", x="int1"), None),
  717. (CovarPop(y="int2", x="int1", sample=True), None),
  718. (RegrAvgX(y="int2", x="int1"), None),
  719. (RegrAvgY(y="int2", x="int1"), None),
  720. (RegrCount(y="int2", x="int1"), 0),
  721. (RegrIntercept(y="int2", x="int1"), None),
  722. (RegrR2(y="int2", x="int1"), None),
  723. (RegrSlope(y="int2", x="int1"), None),
  724. (RegrSXX(y="int2", x="int1"), None),
  725. (RegrSXY(y="int2", x="int1"), None),
  726. (RegrSYY(y="int2", x="int1"), None),
  727. ]
  728. for aggregation, expected_result in tests:
  729. with self.subTest(aggregation=aggregation):
  730. # Empty result with non-execution optimization.
  731. with self.assertNumQueries(0):
  732. values = StatTestModel.objects.none().aggregate(
  733. aggregation=aggregation,
  734. )
  735. self.assertEqual(values, {"aggregation": expected_result})
  736. # Empty result when query must be executed.
  737. with self.assertNumQueries(1):
  738. values = StatTestModel.objects.aggregate(
  739. aggregation=aggregation,
  740. )
  741. self.assertEqual(values, {"aggregation": expected_result})
  742. def test_default_argument(self):
  743. StatTestModel.objects.all().delete()
  744. tests = [
  745. (Corr(y="int2", x="int1", default=0), 0),
  746. (CovarPop(y="int2", x="int1", default=0), 0),
  747. (CovarPop(y="int2", x="int1", sample=True, default=0), 0),
  748. (RegrAvgX(y="int2", x="int1", default=0), 0),
  749. (RegrAvgY(y="int2", x="int1", default=0), 0),
  750. # RegrCount() doesn't support the default argument.
  751. (RegrIntercept(y="int2", x="int1", default=0), 0),
  752. (RegrR2(y="int2", x="int1", default=0), 0),
  753. (RegrSlope(y="int2", x="int1", default=0), 0),
  754. (RegrSXX(y="int2", x="int1", default=0), 0),
  755. (RegrSXY(y="int2", x="int1", default=0), 0),
  756. (RegrSYY(y="int2", x="int1", default=0), 0),
  757. ]
  758. for aggregation, expected_result in tests:
  759. with self.subTest(aggregation=aggregation):
  760. # Empty result with non-execution optimization.
  761. with self.assertNumQueries(0):
  762. values = StatTestModel.objects.none().aggregate(
  763. aggregation=aggregation,
  764. )
  765. self.assertEqual(values, {"aggregation": expected_result})
  766. # Empty result when query must be executed.
  767. with self.assertNumQueries(1):
  768. values = StatTestModel.objects.aggregate(
  769. aggregation=aggregation,
  770. )
  771. self.assertEqual(values, {"aggregation": expected_result})
  772. def test_corr_general(self):
  773. values = StatTestModel.objects.aggregate(corr=Corr(y="int2", x="int1"))
  774. self.assertEqual(values, {"corr": -1.0})
  775. def test_covar_pop_general(self):
  776. values = StatTestModel.objects.aggregate(covarpop=CovarPop(y="int2", x="int1"))
  777. self.assertEqual(values, {"covarpop": Approximate(-0.66, places=1)})
  778. def test_covar_pop_sample(self):
  779. values = StatTestModel.objects.aggregate(
  780. covarpop=CovarPop(y="int2", x="int1", sample=True)
  781. )
  782. self.assertEqual(values, {"covarpop": -1.0})
  783. def test_regr_avgx_general(self):
  784. values = StatTestModel.objects.aggregate(regravgx=RegrAvgX(y="int2", x="int1"))
  785. self.assertEqual(values, {"regravgx": 2.0})
  786. def test_regr_avgy_general(self):
  787. values = StatTestModel.objects.aggregate(regravgy=RegrAvgY(y="int2", x="int1"))
  788. self.assertEqual(values, {"regravgy": 2.0})
  789. def test_regr_count_general(self):
  790. values = StatTestModel.objects.aggregate(
  791. regrcount=RegrCount(y="int2", x="int1")
  792. )
  793. self.assertEqual(values, {"regrcount": 3})
  794. def test_regr_count_default(self):
  795. msg = "RegrCount does not allow default."
  796. with self.assertRaisesMessage(TypeError, msg):
  797. RegrCount(y="int2", x="int1", default=0)
  798. def test_regr_intercept_general(self):
  799. values = StatTestModel.objects.aggregate(
  800. regrintercept=RegrIntercept(y="int2", x="int1")
  801. )
  802. self.assertEqual(values, {"regrintercept": 4})
  803. def test_regr_r2_general(self):
  804. values = StatTestModel.objects.aggregate(regrr2=RegrR2(y="int2", x="int1"))
  805. self.assertEqual(values, {"regrr2": 1})
  806. def test_regr_slope_general(self):
  807. values = StatTestModel.objects.aggregate(
  808. regrslope=RegrSlope(y="int2", x="int1")
  809. )
  810. self.assertEqual(values, {"regrslope": -1})
  811. def test_regr_sxx_general(self):
  812. values = StatTestModel.objects.aggregate(regrsxx=RegrSXX(y="int2", x="int1"))
  813. self.assertEqual(values, {"regrsxx": 2.0})
  814. def test_regr_sxy_general(self):
  815. values = StatTestModel.objects.aggregate(regrsxy=RegrSXY(y="int2", x="int1"))
  816. self.assertEqual(values, {"regrsxy": -2.0})
  817. def test_regr_syy_general(self):
  818. values = StatTestModel.objects.aggregate(regrsyy=RegrSYY(y="int2", x="int1"))
  819. self.assertEqual(values, {"regrsyy": 2.0})
  820. def test_regr_avgx_with_related_obj_and_number_as_argument(self):
  821. """
  822. This is more complex test to check if JOIN on field and
  823. number as argument works as expected.
  824. """
  825. values = StatTestModel.objects.aggregate(
  826. complex_regravgx=RegrAvgX(y=5, x="related_field__integer_field")
  827. )
  828. self.assertEqual(values, {"complex_regravgx": 1.0})