test_aggregates.py 37 KB


  1. from django.db import transaction
  2. from django.db.models import (
  3. CharField,
  4. F,
  5. Func,
  6. IntegerField,
  7. JSONField,
  8. OuterRef,
  9. Q,
  10. Subquery,
  11. Value,
  12. Window,
  13. )
  14. from django.db.models.fields.json import KeyTextTransform, KeyTransform
  15. from django.db.models.functions import Cast, Concat, LPad, Substr
  16. from django.test.utils import Approximate
  17. from django.utils import timezone
  18. from django.utils.deprecation import RemovedInDjango61Warning
  19. from . import PostgreSQLTestCase
  20. from .models import AggregateTestModel, HotelReservation, Room, StatTestModel
  21. try:
  22. from django.contrib.postgres.aggregates import (
  23. ArrayAgg,
  24. BitAnd,
  25. BitOr,
  26. BitXor,
  27. BoolAnd,
  28. BoolOr,
  29. Corr,
  30. CovarPop,
  31. JSONBAgg,
  32. RegrAvgX,
  33. RegrAvgY,
  34. RegrCount,
  35. RegrIntercept,
  36. RegrR2,
  37. RegrSlope,
  38. RegrSXX,
  39. RegrSXY,
  40. RegrSYY,
  41. StatAggregate,
  42. StringAgg,
  43. )
  44. from django.contrib.postgres.fields import ArrayField
  45. except ImportError:
  46. pass # psycopg2 is not installed
  47. class TestGeneralAggregate(PostgreSQLTestCase):
  48. @classmethod
  49. def setUpTestData(cls):
  50. cls.aggs = AggregateTestModel.objects.bulk_create(
  51. [
  52. AggregateTestModel(
  53. boolean_field=True,
  54. char_field="Foo1",
  55. text_field="Text1",
  56. integer_field=0,
  57. ),
  58. AggregateTestModel(
  59. boolean_field=False,
  60. char_field="Foo2",
  61. text_field="Text2",
  62. integer_field=1,
  63. json_field={"lang": "pl"},
  64. ),
  65. AggregateTestModel(
  66. boolean_field=False,
  67. char_field="Foo4",
  68. text_field="Text4",
  69. integer_field=2,
  70. json_field={"lang": "en"},
  71. ),
  72. AggregateTestModel(
  73. boolean_field=True,
  74. char_field="Foo3",
  75. text_field="Text3",
  76. integer_field=0,
  77. json_field={"breed": "collie"},
  78. ),
  79. ]
  80. )
  81. def test_empty_result_set(self):
  82. AggregateTestModel.objects.all().delete()
  83. tests = [
  84. ArrayAgg("char_field"),
  85. ArrayAgg("integer_field"),
  86. ArrayAgg("boolean_field"),
  87. BitAnd("integer_field"),
  88. BitOr("integer_field"),
  89. BoolAnd("boolean_field"),
  90. BoolOr("boolean_field"),
  91. JSONBAgg("integer_field"),
  92. StringAgg("char_field", delimiter=";"),
  93. BitXor("integer_field"),
  94. ]
  95. for aggregation in tests:
  96. with self.subTest(aggregation=aggregation):
  97. # Empty result with non-execution optimization.
  98. with self.assertNumQueries(0):
  99. values = AggregateTestModel.objects.none().aggregate(
  100. aggregation=aggregation,
  101. )
  102. self.assertEqual(values, {"aggregation": None})
  103. # Empty result when query must be executed.
  104. with self.assertNumQueries(1):
  105. values = AggregateTestModel.objects.aggregate(
  106. aggregation=aggregation,
  107. )
  108. self.assertEqual(values, {"aggregation": None})
  109. def test_default_argument(self):
  110. AggregateTestModel.objects.all().delete()
  111. tests = [
  112. (ArrayAgg("char_field", default=["<empty>"]), ["<empty>"]),
  113. (ArrayAgg("integer_field", default=[0]), [0]),
  114. (ArrayAgg("boolean_field", default=[False]), [False]),
  115. (BitAnd("integer_field", default=0), 0),
  116. (BitOr("integer_field", default=0), 0),
  117. (BoolAnd("boolean_field", default=False), False),
  118. (BoolOr("boolean_field", default=False), False),
  119. (JSONBAgg("integer_field", default=["<empty>"]), ["<empty>"]),
  120. (
  121. JSONBAgg("integer_field", default=Value(["<empty>"], JSONField())),
  122. ["<empty>"],
  123. ),
  124. (StringAgg("char_field", delimiter=";", default="<empty>"), "<empty>"),
  125. (
  126. StringAgg("char_field", delimiter=";", default=Value("<empty>")),
  127. "<empty>",
  128. ),
  129. (BitXor("integer_field", default=0), 0),
  130. ]
  131. for aggregation, expected_result in tests:
  132. with self.subTest(aggregation=aggregation):
  133. # Empty result with non-execution optimization.
  134. with self.assertNumQueries(0):
  135. values = AggregateTestModel.objects.none().aggregate(
  136. aggregation=aggregation,
  137. )
  138. self.assertEqual(values, {"aggregation": expected_result})
  139. # Empty result when query must be executed.
  140. with transaction.atomic(), self.assertNumQueries(1):
  141. values = AggregateTestModel.objects.aggregate(
  142. aggregation=aggregation,
  143. )
  144. self.assertEqual(values, {"aggregation": expected_result})
  145. def test_ordering_warns_of_deprecation(self):
  146. msg = "The ordering argument is deprecated. Use order_by instead."
  147. with self.assertWarnsMessage(RemovedInDjango61Warning, msg) as ctx:
  148. values = AggregateTestModel.objects.aggregate(
  149. arrayagg=ArrayAgg("integer_field", ordering=F("integer_field").desc())
  150. )
  151. self.assertEqual(values, {"arrayagg": [2, 1, 0, 0]})
  152. self.assertEqual(ctx.filename, __file__)
  153. def test_ordering_and_order_by_causes_error(self):
  154. with self.assertWarns(RemovedInDjango61Warning):
  155. with self.assertRaisesMessage(
  156. TypeError,
  157. "Cannot specify both order_by and ordering.",
  158. ):
  159. AggregateTestModel.objects.aggregate(
  160. stringagg=StringAgg(
  161. "char_field",
  162. delimiter=Value("'"),
  163. order_by="char_field",
  164. ordering="char_field",
  165. )
  166. )
  167. def test_array_agg_charfield(self):
  168. values = AggregateTestModel.objects.aggregate(arrayagg=ArrayAgg("char_field"))
  169. self.assertEqual(values, {"arrayagg": ["Foo1", "Foo2", "Foo4", "Foo3"]})
  170. def test_array_agg_charfield_order_by(self):
  171. order_by_test_cases = (
  172. (F("char_field").desc(), ["Foo4", "Foo3", "Foo2", "Foo1"]),
  173. (F("char_field").asc(), ["Foo1", "Foo2", "Foo3", "Foo4"]),
  174. (F("char_field"), ["Foo1", "Foo2", "Foo3", "Foo4"]),
  175. (
  176. [F("boolean_field"), F("char_field").desc()],
  177. ["Foo4", "Foo2", "Foo3", "Foo1"],
  178. ),
  179. (
  180. (F("boolean_field"), F("char_field").desc()),
  181. ["Foo4", "Foo2", "Foo3", "Foo1"],
  182. ),
  183. ("char_field", ["Foo1", "Foo2", "Foo3", "Foo4"]),
  184. ("-char_field", ["Foo4", "Foo3", "Foo2", "Foo1"]),
  185. (Concat("char_field", Value("@")), ["Foo1", "Foo2", "Foo3", "Foo4"]),
  186. (Concat("char_field", Value("@")).desc(), ["Foo4", "Foo3", "Foo2", "Foo1"]),
  187. (
  188. (
  189. Substr("char_field", 1, 1),
  190. F("integer_field"),
  191. Substr("char_field", 4, 1).desc(),
  192. ),
  193. ["Foo3", "Foo1", "Foo2", "Foo4"],
  194. ),
  195. )
  196. for order_by, expected_output in order_by_test_cases:
  197. with self.subTest(order_by=order_by, expected_output=expected_output):
  198. values = AggregateTestModel.objects.aggregate(
  199. arrayagg=ArrayAgg("char_field", order_by=order_by)
  200. )
  201. self.assertEqual(values, {"arrayagg": expected_output})
  202. def test_array_agg_integerfield(self):
  203. values = AggregateTestModel.objects.aggregate(
  204. arrayagg=ArrayAgg("integer_field")
  205. )
  206. self.assertEqual(values, {"arrayagg": [0, 1, 2, 0]})
  207. def test_array_agg_integerfield_order_by(self):
  208. values = AggregateTestModel.objects.aggregate(
  209. arrayagg=ArrayAgg("integer_field", order_by=F("integer_field").desc())
  210. )
  211. self.assertEqual(values, {"arrayagg": [2, 1, 0, 0]})
  212. def test_array_agg_booleanfield(self):
  213. values = AggregateTestModel.objects.aggregate(
  214. arrayagg=ArrayAgg("boolean_field")
  215. )
  216. self.assertEqual(values, {"arrayagg": [True, False, False, True]})
  217. def test_array_agg_booleanfield_order_by(self):
  218. order_by_test_cases = (
  219. (F("boolean_field").asc(), [False, False, True, True]),
  220. (F("boolean_field").desc(), [True, True, False, False]),
  221. (F("boolean_field"), [False, False, True, True]),
  222. )
  223. for order_by, expected_output in order_by_test_cases:
  224. with self.subTest(order_by=order_by, expected_output=expected_output):
  225. values = AggregateTestModel.objects.aggregate(
  226. arrayagg=ArrayAgg("boolean_field", order_by=order_by)
  227. )
  228. self.assertEqual(values, {"arrayagg": expected_output})
  229. def test_array_agg_jsonfield(self):
  230. values = AggregateTestModel.objects.aggregate(
  231. arrayagg=ArrayAgg(
  232. KeyTransform("lang", "json_field"),
  233. filter=Q(json_field__lang__isnull=False),
  234. ),
  235. )
  236. self.assertEqual(values, {"arrayagg": ["pl", "en"]})
  237. def test_array_agg_jsonfield_order_by(self):
  238. values = AggregateTestModel.objects.aggregate(
  239. arrayagg=ArrayAgg(
  240. KeyTransform("lang", "json_field"),
  241. filter=Q(json_field__lang__isnull=False),
  242. order_by=KeyTransform("lang", "json_field"),
  243. ),
  244. )
  245. self.assertEqual(values, {"arrayagg": ["en", "pl"]})
  246. def test_array_agg_filter_and_order_by_params(self):
  247. values = AggregateTestModel.objects.aggregate(
  248. arrayagg=ArrayAgg(
  249. "char_field",
  250. filter=Q(json_field__has_key="lang"),
  251. order_by=LPad(Cast("integer_field", CharField()), 2, Value("0")),
  252. )
  253. )
  254. self.assertEqual(values, {"arrayagg": ["Foo2", "Foo4"]})
  255. def test_array_agg_filter(self):
  256. values = AggregateTestModel.objects.aggregate(
  257. arrayagg=ArrayAgg("integer_field", filter=Q(integer_field__gt=0)),
  258. )
  259. self.assertEqual(values, {"arrayagg": [1, 2]})
  260. def test_array_agg_lookups(self):
  261. aggr1 = AggregateTestModel.objects.create()
  262. aggr2 = AggregateTestModel.objects.create()
  263. StatTestModel.objects.create(related_field=aggr1, int1=1, int2=0)
  264. StatTestModel.objects.create(related_field=aggr1, int1=2, int2=0)
  265. StatTestModel.objects.create(related_field=aggr2, int1=3, int2=0)
  266. StatTestModel.objects.create(related_field=aggr2, int1=4, int2=0)
  267. qs = (
  268. StatTestModel.objects.values("related_field")
  269. .annotate(array=ArrayAgg("int1"))
  270. .filter(array__overlap=[2])
  271. .values_list("array", flat=True)
  272. )
  273. self.assertCountEqual(qs.get(), [1, 2])
  274. def test_array_agg_filter_index(self):
  275. aggr1 = AggregateTestModel.objects.create(integer_field=1)
  276. aggr2 = AggregateTestModel.objects.create(integer_field=2)
  277. StatTestModel.objects.bulk_create(
  278. [
  279. StatTestModel(related_field=aggr1, int1=1, int2=0),
  280. StatTestModel(related_field=aggr1, int1=2, int2=1),
  281. StatTestModel(related_field=aggr2, int1=3, int2=0),
  282. StatTestModel(related_field=aggr2, int1=4, int2=1),
  283. ]
  284. )
  285. qs = (
  286. AggregateTestModel.objects.filter(pk__in=[aggr1.pk, aggr2.pk])
  287. .annotate(
  288. array=ArrayAgg("stattestmodel__int1", filter=Q(stattestmodel__int2=0))
  289. )
  290. .annotate(array_value=F("array__0"))
  291. .values_list("array_value", flat=True)
  292. )
  293. self.assertCountEqual(qs, [1, 3])
  294. def test_array_agg_filter_slice(self):
  295. aggr1 = AggregateTestModel.objects.create(integer_field=1)
  296. aggr2 = AggregateTestModel.objects.create(integer_field=2)
  297. StatTestModel.objects.bulk_create(
  298. [
  299. StatTestModel(related_field=aggr1, int1=1, int2=0),
  300. StatTestModel(related_field=aggr1, int1=2, int2=1),
  301. StatTestModel(related_field=aggr2, int1=3, int2=0),
  302. StatTestModel(related_field=aggr2, int1=4, int2=1),
  303. StatTestModel(related_field=aggr2, int1=5, int2=0),
  304. ]
  305. )
  306. qs = (
  307. AggregateTestModel.objects.filter(pk__in=[aggr1.pk, aggr2.pk])
  308. .annotate(
  309. array=ArrayAgg("stattestmodel__int1", filter=Q(stattestmodel__int2=0))
  310. )
  311. .annotate(array_value=F("array__1_2"))
  312. .values_list("array_value", flat=True)
  313. )
  314. self.assertCountEqual(qs, [[], [5]])
  315. def test_bit_and_general(self):
  316. values = AggregateTestModel.objects.filter(integer_field__in=[0, 1]).aggregate(
  317. bitand=BitAnd("integer_field")
  318. )
  319. self.assertEqual(values, {"bitand": 0})
  320. def test_bit_and_on_only_true_values(self):
  321. values = AggregateTestModel.objects.filter(integer_field=1).aggregate(
  322. bitand=BitAnd("integer_field")
  323. )
  324. self.assertEqual(values, {"bitand": 1})
  325. def test_bit_and_on_only_false_values(self):
  326. values = AggregateTestModel.objects.filter(integer_field=0).aggregate(
  327. bitand=BitAnd("integer_field")
  328. )
  329. self.assertEqual(values, {"bitand": 0})
  330. def test_bit_or_general(self):
  331. values = AggregateTestModel.objects.filter(integer_field__in=[0, 1]).aggregate(
  332. bitor=BitOr("integer_field")
  333. )
  334. self.assertEqual(values, {"bitor": 1})
  335. def test_bit_or_on_only_true_values(self):
  336. values = AggregateTestModel.objects.filter(integer_field=1).aggregate(
  337. bitor=BitOr("integer_field")
  338. )
  339. self.assertEqual(values, {"bitor": 1})
  340. def test_bit_or_on_only_false_values(self):
  341. values = AggregateTestModel.objects.filter(integer_field=0).aggregate(
  342. bitor=BitOr("integer_field")
  343. )
  344. self.assertEqual(values, {"bitor": 0})
  345. def test_bit_xor_general(self):
  346. AggregateTestModel.objects.create(integer_field=3)
  347. values = AggregateTestModel.objects.filter(
  348. integer_field__in=[1, 3],
  349. ).aggregate(bitxor=BitXor("integer_field"))
  350. self.assertEqual(values, {"bitxor": 2})
  351. def test_bit_xor_on_only_true_values(self):
  352. values = AggregateTestModel.objects.filter(
  353. integer_field=1,
  354. ).aggregate(bitxor=BitXor("integer_field"))
  355. self.assertEqual(values, {"bitxor": 1})
  356. def test_bit_xor_on_only_false_values(self):
  357. values = AggregateTestModel.objects.filter(
  358. integer_field=0,
  359. ).aggregate(bitxor=BitXor("integer_field"))
  360. self.assertEqual(values, {"bitxor": 0})
  361. def test_bool_and_general(self):
  362. values = AggregateTestModel.objects.aggregate(booland=BoolAnd("boolean_field"))
  363. self.assertEqual(values, {"booland": False})
  364. def test_bool_and_q_object(self):
  365. values = AggregateTestModel.objects.aggregate(
  366. booland=BoolAnd(Q(integer_field__gt=2)),
  367. )
  368. self.assertEqual(values, {"booland": False})
  369. def test_bool_or_general(self):
  370. values = AggregateTestModel.objects.aggregate(boolor=BoolOr("boolean_field"))
  371. self.assertEqual(values, {"boolor": True})
  372. def test_bool_or_q_object(self):
  373. values = AggregateTestModel.objects.aggregate(
  374. boolor=BoolOr(Q(integer_field__gt=2)),
  375. )
  376. self.assertEqual(values, {"boolor": False})
  377. def test_string_agg_requires_delimiter(self):
  378. with self.assertRaises(TypeError):
  379. AggregateTestModel.objects.aggregate(stringagg=StringAgg("char_field"))
  380. def test_string_agg_delimiter_escaping(self):
  381. values = AggregateTestModel.objects.aggregate(
  382. stringagg=StringAgg("char_field", delimiter="'")
  383. )
  384. self.assertEqual(values, {"stringagg": "Foo1'Foo2'Foo4'Foo3"})
  385. def test_string_agg_charfield(self):
  386. values = AggregateTestModel.objects.aggregate(
  387. stringagg=StringAgg("char_field", delimiter=";")
  388. )
  389. self.assertEqual(values, {"stringagg": "Foo1;Foo2;Foo4;Foo3"})
  390. def test_string_agg_default_output_field(self):
  391. values = AggregateTestModel.objects.aggregate(
  392. stringagg=StringAgg("text_field", delimiter=";"),
  393. )
  394. self.assertEqual(values, {"stringagg": "Text1;Text2;Text4;Text3"})
  395. def test_string_agg_charfield_order_by(self):
  396. order_by_test_cases = (
  397. (F("char_field").desc(), "Foo4;Foo3;Foo2;Foo1"),
  398. (F("char_field").asc(), "Foo1;Foo2;Foo3;Foo4"),
  399. (F("char_field"), "Foo1;Foo2;Foo3;Foo4"),
  400. ("char_field", "Foo1;Foo2;Foo3;Foo4"),
  401. ("-char_field", "Foo4;Foo3;Foo2;Foo1"),
  402. (Concat("char_field", Value("@")), "Foo1;Foo2;Foo3;Foo4"),
  403. (Concat("char_field", Value("@")).desc(), "Foo4;Foo3;Foo2;Foo1"),
  404. )
  405. for order_by, expected_output in order_by_test_cases:
  406. with self.subTest(order_by=order_by, expected_output=expected_output):
  407. values = AggregateTestModel.objects.aggregate(
  408. stringagg=StringAgg("char_field", delimiter=";", order_by=order_by)
  409. )
  410. self.assertEqual(values, {"stringagg": expected_output})
  411. def test_string_agg_jsonfield_order_by(self):
  412. values = AggregateTestModel.objects.aggregate(
  413. stringagg=StringAgg(
  414. KeyTextTransform("lang", "json_field"),
  415. delimiter=";",
  416. order_by=KeyTextTransform("lang", "json_field"),
  417. output_field=CharField(),
  418. ),
  419. )
  420. self.assertEqual(values, {"stringagg": "en;pl"})
  421. def test_string_agg_filter(self):
  422. values = AggregateTestModel.objects.aggregate(
  423. stringagg=StringAgg(
  424. "char_field",
  425. delimiter=";",
  426. filter=Q(char_field__endswith="3") | Q(char_field__endswith="1"),
  427. )
  428. )
  429. self.assertEqual(values, {"stringagg": "Foo1;Foo3"})
  430. def test_orderable_agg_alternative_fields(self):
  431. values = AggregateTestModel.objects.aggregate(
  432. arrayagg=ArrayAgg("integer_field", order_by=F("char_field").asc())
  433. )
  434. self.assertEqual(values, {"arrayagg": [0, 1, 0, 2]})
  435. def test_jsonb_agg(self):
  436. values = AggregateTestModel.objects.aggregate(jsonbagg=JSONBAgg("char_field"))
  437. self.assertEqual(values, {"jsonbagg": ["Foo1", "Foo2", "Foo4", "Foo3"]})
  438. def test_jsonb_agg_charfield_order_by(self):
  439. order_by_test_cases = (
  440. (F("char_field").desc(), ["Foo4", "Foo3", "Foo2", "Foo1"]),
  441. (F("char_field").asc(), ["Foo1", "Foo2", "Foo3", "Foo4"]),
  442. (F("char_field"), ["Foo1", "Foo2", "Foo3", "Foo4"]),
  443. ("char_field", ["Foo1", "Foo2", "Foo3", "Foo4"]),
  444. ("-char_field", ["Foo4", "Foo3", "Foo2", "Foo1"]),
  445. (Concat("char_field", Value("@")), ["Foo1", "Foo2", "Foo3", "Foo4"]),
  446. (Concat("char_field", Value("@")).desc(), ["Foo4", "Foo3", "Foo2", "Foo1"]),
  447. )
  448. for order_by, expected_output in order_by_test_cases:
  449. with self.subTest(order_by=order_by, expected_output=expected_output):
  450. values = AggregateTestModel.objects.aggregate(
  451. jsonbagg=JSONBAgg("char_field", order_by=order_by),
  452. )
  453. self.assertEqual(values, {"jsonbagg": expected_output})
  454. def test_jsonb_agg_integerfield_order_by(self):
  455. values = AggregateTestModel.objects.aggregate(
  456. jsonbagg=JSONBAgg("integer_field", order_by=F("integer_field").desc()),
  457. )
  458. self.assertEqual(values, {"jsonbagg": [2, 1, 0, 0]})
  459. def test_jsonb_agg_booleanfield_order_by(self):
  460. order_by_test_cases = (
  461. (F("boolean_field").asc(), [False, False, True, True]),
  462. (F("boolean_field").desc(), [True, True, False, False]),
  463. (F("boolean_field"), [False, False, True, True]),
  464. )
  465. for order_by, expected_output in order_by_test_cases:
  466. with self.subTest(order_by=order_by, expected_output=expected_output):
  467. values = AggregateTestModel.objects.aggregate(
  468. jsonbagg=JSONBAgg("boolean_field", order_by=order_by),
  469. )
  470. self.assertEqual(values, {"jsonbagg": expected_output})
  471. def test_jsonb_agg_jsonfield_order_by(self):
  472. values = AggregateTestModel.objects.aggregate(
  473. jsonbagg=JSONBAgg(
  474. KeyTransform("lang", "json_field"),
  475. filter=Q(json_field__lang__isnull=False),
  476. order_by=KeyTransform("lang", "json_field"),
  477. ),
  478. )
  479. self.assertEqual(values, {"jsonbagg": ["en", "pl"]})
  480. def test_jsonb_agg_key_index_transforms(self):
  481. room101 = Room.objects.create(number=101)
  482. room102 = Room.objects.create(number=102)
  483. datetimes = [
  484. timezone.datetime(2018, 6, 20),
  485. timezone.datetime(2018, 6, 24),
  486. timezone.datetime(2018, 6, 28),
  487. ]
  488. HotelReservation.objects.create(
  489. datespan=(datetimes[0].date(), datetimes[1].date()),
  490. start=datetimes[0],
  491. end=datetimes[1],
  492. room=room102,
  493. requirements={"double_bed": True, "parking": True},
  494. )
  495. HotelReservation.objects.create(
  496. datespan=(datetimes[1].date(), datetimes[2].date()),
  497. start=datetimes[1],
  498. end=datetimes[2],
  499. room=room102,
  500. requirements={"double_bed": False, "sea_view": True, "parking": False},
  501. )
  502. HotelReservation.objects.create(
  503. datespan=(datetimes[0].date(), datetimes[2].date()),
  504. start=datetimes[0],
  505. end=datetimes[2],
  506. room=room101,
  507. requirements={"sea_view": False},
  508. )
  509. values = (
  510. Room.objects.annotate(
  511. requirements=JSONBAgg(
  512. "hotelreservation__requirements",
  513. order_by="-hotelreservation__start",
  514. )
  515. )
  516. .filter(requirements__0__sea_view=True)
  517. .values("number", "requirements")
  518. )
  519. self.assertSequenceEqual(
  520. values,
  521. [
  522. {
  523. "number": 102,
  524. "requirements": [
  525. {"double_bed": False, "sea_view": True, "parking": False},
  526. {"double_bed": True, "parking": True},
  527. ],
  528. },
  529. ],
  530. )
  531. def test_string_agg_array_agg_order_by_in_subquery(self):
  532. stats = []
  533. for i, agg in enumerate(AggregateTestModel.objects.order_by("char_field")):
  534. stats.append(StatTestModel(related_field=agg, int1=i, int2=i + 1))
  535. stats.append(StatTestModel(related_field=agg, int1=i + 1, int2=i))
  536. StatTestModel.objects.bulk_create(stats)
  537. for aggregate, expected_result in (
  538. (
  539. ArrayAgg("stattestmodel__int1", order_by="-stattestmodel__int2"),
  540. [
  541. ("Foo1", [0, 1]),
  542. ("Foo2", [1, 2]),
  543. ("Foo3", [2, 3]),
  544. ("Foo4", [3, 4]),
  545. ],
  546. ),
  547. (
  548. StringAgg(
  549. Cast("stattestmodel__int1", CharField()),
  550. delimiter=";",
  551. order_by="-stattestmodel__int2",
  552. ),
  553. [("Foo1", "0;1"), ("Foo2", "1;2"), ("Foo3", "2;3"), ("Foo4", "3;4")],
  554. ),
  555. ):
  556. with self.subTest(aggregate=aggregate.__class__.__name__):
  557. subquery = (
  558. AggregateTestModel.objects.filter(
  559. pk=OuterRef("pk"),
  560. )
  561. .annotate(agg=aggregate)
  562. .values("agg")
  563. )
  564. values = (
  565. AggregateTestModel.objects.annotate(
  566. agg=Subquery(subquery),
  567. )
  568. .order_by("char_field")
  569. .values_list("char_field", "agg")
  570. )
  571. self.assertEqual(list(values), expected_result)
  572. def test_string_agg_array_agg_filter_in_subquery(self):
  573. StatTestModel.objects.bulk_create(
  574. [
  575. StatTestModel(related_field=self.aggs[0], int1=0, int2=5),
  576. StatTestModel(related_field=self.aggs[0], int1=1, int2=4),
  577. StatTestModel(related_field=self.aggs[0], int1=2, int2=3),
  578. ]
  579. )
  580. for aggregate, expected_result in (
  581. (
  582. ArrayAgg("stattestmodel__int1", filter=Q(stattestmodel__int2__gt=3)),
  583. [("Foo1", [0, 1]), ("Foo2", None)],
  584. ),
  585. (
  586. StringAgg(
  587. Cast("stattestmodel__int2", CharField()),
  588. delimiter=";",
  589. filter=Q(stattestmodel__int1__lt=2),
  590. ),
  591. [("Foo1", "5;4"), ("Foo2", None)],
  592. ),
  593. ):
  594. with self.subTest(aggregate=aggregate.__class__.__name__):
  595. subquery = (
  596. AggregateTestModel.objects.filter(
  597. pk=OuterRef("pk"),
  598. )
  599. .annotate(agg=aggregate)
  600. .values("agg")
  601. )
  602. values = (
  603. AggregateTestModel.objects.annotate(
  604. agg=Subquery(subquery),
  605. )
  606. .filter(
  607. char_field__in=["Foo1", "Foo2"],
  608. )
  609. .order_by("char_field")
  610. .values_list("char_field", "agg")
  611. )
  612. self.assertEqual(list(values), expected_result)
  613. def test_string_agg_filter_in_subquery_with_exclude(self):
  614. subquery = (
  615. AggregateTestModel.objects.annotate(
  616. stringagg=StringAgg(
  617. "char_field",
  618. delimiter=";",
  619. filter=Q(char_field__endswith="1"),
  620. )
  621. )
  622. .exclude(stringagg="")
  623. .values("id")
  624. )
  625. self.assertSequenceEqual(
  626. AggregateTestModel.objects.filter(id__in=Subquery(subquery)),
  627. [self.aggs[0]],
  628. )
  629. def test_ordering_isnt_cleared_for_array_subquery(self):
  630. inner_qs = AggregateTestModel.objects.order_by("-integer_field")
  631. qs = AggregateTestModel.objects.annotate(
  632. integers=Func(
  633. Subquery(inner_qs.values("integer_field")),
  634. function="ARRAY",
  635. output_field=ArrayField(base_field=IntegerField()),
  636. ),
  637. )
  638. self.assertSequenceEqual(
  639. qs.first().integers,
  640. inner_qs.values_list("integer_field", flat=True),
  641. )
  642. def test_window(self):
  643. self.assertCountEqual(
  644. AggregateTestModel.objects.annotate(
  645. integers=Window(
  646. expression=ArrayAgg("char_field"),
  647. partition_by=F("integer_field"),
  648. )
  649. ).values("integers", "char_field"),
  650. [
  651. {"integers": ["Foo1", "Foo3"], "char_field": "Foo1"},
  652. {"integers": ["Foo1", "Foo3"], "char_field": "Foo3"},
  653. {"integers": ["Foo2"], "char_field": "Foo2"},
  654. {"integers": ["Foo4"], "char_field": "Foo4"},
  655. ],
  656. )
  657. def test_values_list(self):
  658. tests = [ArrayAgg("integer_field"), JSONBAgg("integer_field")]
  659. for aggregation in tests:
  660. with self.subTest(aggregation=aggregation):
  661. self.assertCountEqual(
  662. AggregateTestModel.objects.values_list(aggregation),
  663. [([0],), ([1],), ([2],), ([0],)],
  664. )
  665. class TestAggregateDistinct(PostgreSQLTestCase):
  666. @classmethod
  667. def setUpTestData(cls):
  668. AggregateTestModel.objects.create(char_field="Foo")
  669. AggregateTestModel.objects.create(char_field="Foo")
  670. AggregateTestModel.objects.create(char_field="Bar")
  671. def test_string_agg_distinct_false(self):
  672. values = AggregateTestModel.objects.aggregate(
  673. stringagg=StringAgg("char_field", delimiter=" ", distinct=False)
  674. )
  675. self.assertEqual(values["stringagg"].count("Foo"), 2)
  676. self.assertEqual(values["stringagg"].count("Bar"), 1)
  677. def test_string_agg_distinct_true(self):
  678. values = AggregateTestModel.objects.aggregate(
  679. stringagg=StringAgg("char_field", delimiter=" ", distinct=True)
  680. )
  681. self.assertEqual(values["stringagg"].count("Foo"), 1)
  682. self.assertEqual(values["stringagg"].count("Bar"), 1)
  683. def test_array_agg_distinct_false(self):
  684. values = AggregateTestModel.objects.aggregate(
  685. arrayagg=ArrayAgg("char_field", distinct=False)
  686. )
  687. self.assertEqual(sorted(values["arrayagg"]), ["Bar", "Foo", "Foo"])
  688. def test_array_agg_distinct_true(self):
  689. values = AggregateTestModel.objects.aggregate(
  690. arrayagg=ArrayAgg("char_field", distinct=True)
  691. )
  692. self.assertEqual(sorted(values["arrayagg"]), ["Bar", "Foo"])
  693. def test_jsonb_agg_distinct_false(self):
  694. values = AggregateTestModel.objects.aggregate(
  695. jsonbagg=JSONBAgg("char_field", distinct=False),
  696. )
  697. self.assertEqual(sorted(values["jsonbagg"]), ["Bar", "Foo", "Foo"])
  698. def test_jsonb_agg_distinct_true(self):
  699. values = AggregateTestModel.objects.aggregate(
  700. jsonbagg=JSONBAgg("char_field", distinct=True),
  701. )
  702. self.assertEqual(sorted(values["jsonbagg"]), ["Bar", "Foo"])
  703. class TestStatisticsAggregate(PostgreSQLTestCase):
  704. @classmethod
  705. def setUpTestData(cls):
  706. StatTestModel.objects.create(
  707. int1=1,
  708. int2=3,
  709. related_field=AggregateTestModel.objects.create(integer_field=0),
  710. )
  711. StatTestModel.objects.create(
  712. int1=2,
  713. int2=2,
  714. related_field=AggregateTestModel.objects.create(integer_field=1),
  715. )
  716. StatTestModel.objects.create(
  717. int1=3,
  718. int2=1,
  719. related_field=AggregateTestModel.objects.create(integer_field=2),
  720. )
  721. # Tests for base class (StatAggregate)
  722. def test_missing_arguments_raises_exception(self):
  723. with self.assertRaisesMessage(ValueError, "Both y and x must be provided."):
  724. StatAggregate(x=None, y=None)
  725. def test_correct_source_expressions(self):
  726. func = StatAggregate(x="test", y=13)
  727. self.assertIsInstance(func.source_expressions[0], Value)
  728. self.assertIsInstance(func.source_expressions[1], F)
  729. def test_alias_is_required(self):
  730. class SomeFunc(StatAggregate):
  731. function = "TEST"
  732. with self.assertRaisesMessage(TypeError, "Complex aggregates require an alias"):
  733. StatTestModel.objects.aggregate(SomeFunc(y="int2", x="int1"))
  734. # Test aggregates
  735. def test_empty_result_set(self):
  736. StatTestModel.objects.all().delete()
  737. tests = [
  738. (Corr(y="int2", x="int1"), None),
  739. (CovarPop(y="int2", x="int1"), None),
  740. (CovarPop(y="int2", x="int1", sample=True), None),
  741. (RegrAvgX(y="int2", x="int1"), None),
  742. (RegrAvgY(y="int2", x="int1"), None),
  743. (RegrCount(y="int2", x="int1"), 0),
  744. (RegrIntercept(y="int2", x="int1"), None),
  745. (RegrR2(y="int2", x="int1"), None),
  746. (RegrSlope(y="int2", x="int1"), None),
  747. (RegrSXX(y="int2", x="int1"), None),
  748. (RegrSXY(y="int2", x="int1"), None),
  749. (RegrSYY(y="int2", x="int1"), None),
  750. ]
  751. for aggregation, expected_result in tests:
  752. with self.subTest(aggregation=aggregation):
  753. # Empty result with non-execution optimization.
  754. with self.assertNumQueries(0):
  755. values = StatTestModel.objects.none().aggregate(
  756. aggregation=aggregation,
  757. )
  758. self.assertEqual(values, {"aggregation": expected_result})
  759. # Empty result when query must be executed.
  760. with self.assertNumQueries(1):
  761. values = StatTestModel.objects.aggregate(
  762. aggregation=aggregation,
  763. )
  764. self.assertEqual(values, {"aggregation": expected_result})
  765. def test_default_argument(self):
  766. StatTestModel.objects.all().delete()
  767. tests = [
  768. (Corr(y="int2", x="int1", default=0), 0),
  769. (CovarPop(y="int2", x="int1", default=0), 0),
  770. (CovarPop(y="int2", x="int1", sample=True, default=0), 0),
  771. (RegrAvgX(y="int2", x="int1", default=0), 0),
  772. (RegrAvgY(y="int2", x="int1", default=0), 0),
  773. # RegrCount() doesn't support the default argument.
  774. (RegrIntercept(y="int2", x="int1", default=0), 0),
  775. (RegrR2(y="int2", x="int1", default=0), 0),
  776. (RegrSlope(y="int2", x="int1", default=0), 0),
  777. (RegrSXX(y="int2", x="int1", default=0), 0),
  778. (RegrSXY(y="int2", x="int1", default=0), 0),
  779. (RegrSYY(y="int2", x="int1", default=0), 0),
  780. ]
  781. for aggregation, expected_result in tests:
  782. with self.subTest(aggregation=aggregation):
  783. # Empty result with non-execution optimization.
  784. with self.assertNumQueries(0):
  785. values = StatTestModel.objects.none().aggregate(
  786. aggregation=aggregation,
  787. )
  788. self.assertEqual(values, {"aggregation": expected_result})
  789. # Empty result when query must be executed.
  790. with self.assertNumQueries(1):
  791. values = StatTestModel.objects.aggregate(
  792. aggregation=aggregation,
  793. )
  794. self.assertEqual(values, {"aggregation": expected_result})
  795. def test_corr_general(self):
  796. values = StatTestModel.objects.aggregate(corr=Corr(y="int2", x="int1"))
  797. self.assertEqual(values, {"corr": -1.0})
  798. def test_covar_pop_general(self):
  799. values = StatTestModel.objects.aggregate(covarpop=CovarPop(y="int2", x="int1"))
  800. self.assertEqual(values, {"covarpop": Approximate(-0.66, places=1)})
  801. def test_covar_pop_sample(self):
  802. values = StatTestModel.objects.aggregate(
  803. covarpop=CovarPop(y="int2", x="int1", sample=True)
  804. )
  805. self.assertEqual(values, {"covarpop": -1.0})
  806. def test_regr_avgx_general(self):
  807. values = StatTestModel.objects.aggregate(regravgx=RegrAvgX(y="int2", x="int1"))
  808. self.assertEqual(values, {"regravgx": 2.0})
  809. def test_regr_avgy_general(self):
  810. values = StatTestModel.objects.aggregate(regravgy=RegrAvgY(y="int2", x="int1"))
  811. self.assertEqual(values, {"regravgy": 2.0})
  812. def test_regr_count_general(self):
  813. values = StatTestModel.objects.aggregate(
  814. regrcount=RegrCount(y="int2", x="int1")
  815. )
  816. self.assertEqual(values, {"regrcount": 3})
  817. def test_regr_count_default(self):
  818. msg = "RegrCount does not allow default."
  819. with self.assertRaisesMessage(TypeError, msg):
  820. RegrCount(y="int2", x="int1", default=0)
  821. def test_regr_intercept_general(self):
  822. values = StatTestModel.objects.aggregate(
  823. regrintercept=RegrIntercept(y="int2", x="int1")
  824. )
  825. self.assertEqual(values, {"regrintercept": 4})
  826. def test_regr_r2_general(self):
  827. values = StatTestModel.objects.aggregate(regrr2=RegrR2(y="int2", x="int1"))
  828. self.assertEqual(values, {"regrr2": 1})
  829. def test_regr_slope_general(self):
  830. values = StatTestModel.objects.aggregate(
  831. regrslope=RegrSlope(y="int2", x="int1")
  832. )
  833. self.assertEqual(values, {"regrslope": -1})
  834. def test_regr_sxx_general(self):
  835. values = StatTestModel.objects.aggregate(regrsxx=RegrSXX(y="int2", x="int1"))
  836. self.assertEqual(values, {"regrsxx": 2.0})
  837. def test_regr_sxy_general(self):
  838. values = StatTestModel.objects.aggregate(regrsxy=RegrSXY(y="int2", x="int1"))
  839. self.assertEqual(values, {"regrsxy": -2.0})
  840. def test_regr_syy_general(self):
  841. values = StatTestModel.objects.aggregate(regrsyy=RegrSYY(y="int2", x="int1"))
  842. self.assertEqual(values, {"regrsyy": 2.0})
  843. def test_regr_avgx_with_related_obj_and_number_as_argument(self):
  844. """
  845. This is more complex test to check if JOIN on field and
  846. number as argument works as expected.
  847. """
  848. values = StatTestModel.objects.aggregate(
  849. complex_regravgx=RegrAvgX(y=5, x="related_field__integer_field")
  850. )
  851. self.assertEqual(values, {"complex_regravgx": 1.0})