test_aggregates.py 13 KB


  1. import json
  2. from django.db.models.expressions import F, Value
  3. from django.test.testcases import skipUnlessDBFeature
  4. from django.test.utils import Approximate
  5. from . import PostgreSQLTestCase
  6. from .models import AggregateTestModel, StatTestModel
  7. try:
  8. from django.contrib.postgres.aggregates import (
  9. ArrayAgg, BitAnd, BitOr, BoolAnd, BoolOr, Corr, CovarPop, JSONBAgg,
  10. RegrAvgX, RegrAvgY, RegrCount, RegrIntercept, RegrR2, RegrSlope,
  11. RegrSXX, RegrSXY, RegrSYY, StatAggregate, StringAgg,
  12. )
  13. except ImportError:
  14. pass # psycopg2 is not installed
  15. class TestGeneralAggregate(PostgreSQLTestCase):
  16. @classmethod
  17. def setUpTestData(cls):
  18. AggregateTestModel.objects.create(boolean_field=True, char_field='Foo1', integer_field=0)
  19. AggregateTestModel.objects.create(boolean_field=False, char_field='Foo2', integer_field=1)
  20. AggregateTestModel.objects.create(boolean_field=False, char_field='Foo3', integer_field=2)
  21. AggregateTestModel.objects.create(boolean_field=True, char_field='Foo4', integer_field=0)
  22. def test_array_agg_charfield(self):
  23. values = AggregateTestModel.objects.aggregate(arrayagg=ArrayAgg('char_field'))
  24. self.assertEqual(values, {'arrayagg': ['Foo1', 'Foo2', 'Foo3', 'Foo4']})
  25. def test_array_agg_integerfield(self):
  26. values = AggregateTestModel.objects.aggregate(arrayagg=ArrayAgg('integer_field'))
  27. self.assertEqual(values, {'arrayagg': [0, 1, 2, 0]})
  28. def test_array_agg_booleanfield(self):
  29. values = AggregateTestModel.objects.aggregate(arrayagg=ArrayAgg('boolean_field'))
  30. self.assertEqual(values, {'arrayagg': [True, False, False, True]})
  31. def test_array_agg_empty_result(self):
  32. AggregateTestModel.objects.all().delete()
  33. values = AggregateTestModel.objects.aggregate(arrayagg=ArrayAgg('char_field'))
  34. self.assertEqual(values, {'arrayagg': []})
  35. values = AggregateTestModel.objects.aggregate(arrayagg=ArrayAgg('integer_field'))
  36. self.assertEqual(values, {'arrayagg': []})
  37. values = AggregateTestModel.objects.aggregate(arrayagg=ArrayAgg('boolean_field'))
  38. self.assertEqual(values, {'arrayagg': []})
  39. def test_bit_and_general(self):
  40. values = AggregateTestModel.objects.filter(
  41. integer_field__in=[0, 1]).aggregate(bitand=BitAnd('integer_field'))
  42. self.assertEqual(values, {'bitand': 0})
  43. def test_bit_and_on_only_true_values(self):
  44. values = AggregateTestModel.objects.filter(
  45. integer_field=1).aggregate(bitand=BitAnd('integer_field'))
  46. self.assertEqual(values, {'bitand': 1})
  47. def test_bit_and_on_only_false_values(self):
  48. values = AggregateTestModel.objects.filter(
  49. integer_field=0).aggregate(bitand=BitAnd('integer_field'))
  50. self.assertEqual(values, {'bitand': 0})
  51. def test_bit_and_empty_result(self):
  52. AggregateTestModel.objects.all().delete()
  53. values = AggregateTestModel.objects.aggregate(bitand=BitAnd('integer_field'))
  54. self.assertEqual(values, {'bitand': None})
  55. def test_bit_or_general(self):
  56. values = AggregateTestModel.objects.filter(
  57. integer_field__in=[0, 1]).aggregate(bitor=BitOr('integer_field'))
  58. self.assertEqual(values, {'bitor': 1})
  59. def test_bit_or_on_only_true_values(self):
  60. values = AggregateTestModel.objects.filter(
  61. integer_field=1).aggregate(bitor=BitOr('integer_field'))
  62. self.assertEqual(values, {'bitor': 1})
  63. def test_bit_or_on_only_false_values(self):
  64. values = AggregateTestModel.objects.filter(
  65. integer_field=0).aggregate(bitor=BitOr('integer_field'))
  66. self.assertEqual(values, {'bitor': 0})
  67. def test_bit_or_empty_result(self):
  68. AggregateTestModel.objects.all().delete()
  69. values = AggregateTestModel.objects.aggregate(bitor=BitOr('integer_field'))
  70. self.assertEqual(values, {'bitor': None})
  71. def test_bool_and_general(self):
  72. values = AggregateTestModel.objects.aggregate(booland=BoolAnd('boolean_field'))
  73. self.assertEqual(values, {'booland': False})
  74. def test_bool_and_empty_result(self):
  75. AggregateTestModel.objects.all().delete()
  76. values = AggregateTestModel.objects.aggregate(booland=BoolAnd('boolean_field'))
  77. self.assertEqual(values, {'booland': None})
  78. def test_bool_or_general(self):
  79. values = AggregateTestModel.objects.aggregate(boolor=BoolOr('boolean_field'))
  80. self.assertEqual(values, {'boolor': True})
  81. def test_bool_or_empty_result(self):
  82. AggregateTestModel.objects.all().delete()
  83. values = AggregateTestModel.objects.aggregate(boolor=BoolOr('boolean_field'))
  84. self.assertEqual(values, {'boolor': None})
  85. def test_string_agg_requires_delimiter(self):
  86. with self.assertRaises(TypeError):
  87. AggregateTestModel.objects.aggregate(stringagg=StringAgg('char_field'))
  88. def test_string_agg_charfield(self):
  89. values = AggregateTestModel.objects.aggregate(stringagg=StringAgg('char_field', delimiter=';'))
  90. self.assertEqual(values, {'stringagg': 'Foo1;Foo2;Foo3;Foo4'})
  91. def test_string_agg_empty_result(self):
  92. AggregateTestModel.objects.all().delete()
  93. values = AggregateTestModel.objects.aggregate(stringagg=StringAgg('char_field', delimiter=';'))
  94. self.assertEqual(values, {'stringagg': ''})
  95. @skipUnlessDBFeature('has_jsonb_agg')
  96. def test_json_agg(self):
  97. values = AggregateTestModel.objects.aggregate(jsonagg=JSONBAgg('char_field'))
  98. self.assertEqual(values, {'jsonagg': ['Foo1', 'Foo2', 'Foo3', 'Foo4']})
  99. @skipUnlessDBFeature('has_jsonb_agg')
  100. def test_json_agg_empty(self):
  101. values = AggregateTestModel.objects.none().aggregate(jsonagg=JSONBAgg('integer_field'))
  102. self.assertEqual(values, json.loads('{"jsonagg": []}'))
  103. class TestStringAggregateDistinct(PostgreSQLTestCase):
  104. @classmethod
  105. def setUpTestData(cls):
  106. AggregateTestModel.objects.create(char_field='Foo')
  107. AggregateTestModel.objects.create(char_field='Foo')
  108. AggregateTestModel.objects.create(char_field='Bar')
  109. def test_string_agg_distinct_false(self):
  110. values = AggregateTestModel.objects.aggregate(stringagg=StringAgg('char_field', delimiter=' ', distinct=False))
  111. self.assertEqual(values['stringagg'].count('Foo'), 2)
  112. self.assertEqual(values['stringagg'].count('Bar'), 1)
  113. def test_string_agg_distinct_true(self):
  114. values = AggregateTestModel.objects.aggregate(stringagg=StringAgg('char_field', delimiter=' ', distinct=True))
  115. self.assertEqual(values['stringagg'].count('Foo'), 1)
  116. self.assertEqual(values['stringagg'].count('Bar'), 1)
  117. class TestStatisticsAggregate(PostgreSQLTestCase):
  118. @classmethod
  119. def setUpTestData(cls):
  120. StatTestModel.objects.create(
  121. int1=1,
  122. int2=3,
  123. related_field=AggregateTestModel.objects.create(integer_field=0),
  124. )
  125. StatTestModel.objects.create(
  126. int1=2,
  127. int2=2,
  128. related_field=AggregateTestModel.objects.create(integer_field=1),
  129. )
  130. StatTestModel.objects.create(
  131. int1=3,
  132. int2=1,
  133. related_field=AggregateTestModel.objects.create(integer_field=2),
  134. )
  135. # Tests for base class (StatAggregate)
  136. def test_missing_arguments_raises_exception(self):
  137. with self.assertRaisesMessage(ValueError, 'Both y and x must be provided.'):
  138. StatAggregate(x=None, y=None)
  139. def test_correct_source_expressions(self):
  140. func = StatAggregate(x='test', y=13)
  141. self.assertIsInstance(func.source_expressions[0], Value)
  142. self.assertIsInstance(func.source_expressions[1], F)
  143. def test_alias_is_required(self):
  144. class SomeFunc(StatAggregate):
  145. function = 'TEST'
  146. with self.assertRaisesMessage(TypeError, 'Complex aggregates require an alias'):
  147. StatTestModel.objects.aggregate(SomeFunc(y='int2', x='int1'))
  148. # Test aggregates
  149. def test_corr_general(self):
  150. values = StatTestModel.objects.aggregate(corr=Corr(y='int2', x='int1'))
  151. self.assertEqual(values, {'corr': -1.0})
  152. def test_corr_empty_result(self):
  153. StatTestModel.objects.all().delete()
  154. values = StatTestModel.objects.aggregate(corr=Corr(y='int2', x='int1'))
  155. self.assertEqual(values, {'corr': None})
  156. def test_covar_pop_general(self):
  157. values = StatTestModel.objects.aggregate(covarpop=CovarPop(y='int2', x='int1'))
  158. self.assertEqual(values, {'covarpop': Approximate(-0.66, places=1)})
  159. def test_covar_pop_empty_result(self):
  160. StatTestModel.objects.all().delete()
  161. values = StatTestModel.objects.aggregate(covarpop=CovarPop(y='int2', x='int1'))
  162. self.assertEqual(values, {'covarpop': None})
  163. def test_covar_pop_sample(self):
  164. values = StatTestModel.objects.aggregate(covarpop=CovarPop(y='int2', x='int1', sample=True))
  165. self.assertEqual(values, {'covarpop': -1.0})
  166. def test_covar_pop_sample_empty_result(self):
  167. StatTestModel.objects.all().delete()
  168. values = StatTestModel.objects.aggregate(covarpop=CovarPop(y='int2', x='int1', sample=True))
  169. self.assertEqual(values, {'covarpop': None})
  170. def test_regr_avgx_general(self):
  171. values = StatTestModel.objects.aggregate(regravgx=RegrAvgX(y='int2', x='int1'))
  172. self.assertEqual(values, {'regravgx': 2.0})
  173. def test_regr_avgx_empty_result(self):
  174. StatTestModel.objects.all().delete()
  175. values = StatTestModel.objects.aggregate(regravgx=RegrAvgX(y='int2', x='int1'))
  176. self.assertEqual(values, {'regravgx': None})
  177. def test_regr_avgy_general(self):
  178. values = StatTestModel.objects.aggregate(regravgy=RegrAvgY(y='int2', x='int1'))
  179. self.assertEqual(values, {'regravgy': 2.0})
  180. def test_regr_avgy_empty_result(self):
  181. StatTestModel.objects.all().delete()
  182. values = StatTestModel.objects.aggregate(regravgy=RegrAvgY(y='int2', x='int1'))
  183. self.assertEqual(values, {'regravgy': None})
  184. def test_regr_count_general(self):
  185. values = StatTestModel.objects.aggregate(regrcount=RegrCount(y='int2', x='int1'))
  186. self.assertEqual(values, {'regrcount': 3})
  187. def test_regr_count_empty_result(self):
  188. StatTestModel.objects.all().delete()
  189. values = StatTestModel.objects.aggregate(regrcount=RegrCount(y='int2', x='int1'))
  190. self.assertEqual(values, {'regrcount': 0})
  191. def test_regr_intercept_general(self):
  192. values = StatTestModel.objects.aggregate(regrintercept=RegrIntercept(y='int2', x='int1'))
  193. self.assertEqual(values, {'regrintercept': 4})
  194. def test_regr_intercept_empty_result(self):
  195. StatTestModel.objects.all().delete()
  196. values = StatTestModel.objects.aggregate(regrintercept=RegrIntercept(y='int2', x='int1'))
  197. self.assertEqual(values, {'regrintercept': None})
  198. def test_regr_r2_general(self):
  199. values = StatTestModel.objects.aggregate(regrr2=RegrR2(y='int2', x='int1'))
  200. self.assertEqual(values, {'regrr2': 1})
  201. def test_regr_r2_empty_result(self):
  202. StatTestModel.objects.all().delete()
  203. values = StatTestModel.objects.aggregate(regrr2=RegrR2(y='int2', x='int1'))
  204. self.assertEqual(values, {'regrr2': None})
  205. def test_regr_slope_general(self):
  206. values = StatTestModel.objects.aggregate(regrslope=RegrSlope(y='int2', x='int1'))
  207. self.assertEqual(values, {'regrslope': -1})
  208. def test_regr_slope_empty_result(self):
  209. StatTestModel.objects.all().delete()
  210. values = StatTestModel.objects.aggregate(regrslope=RegrSlope(y='int2', x='int1'))
  211. self.assertEqual(values, {'regrslope': None})
  212. def test_regr_sxx_general(self):
  213. values = StatTestModel.objects.aggregate(regrsxx=RegrSXX(y='int2', x='int1'))
  214. self.assertEqual(values, {'regrsxx': 2.0})
  215. def test_regr_sxx_empty_result(self):
  216. StatTestModel.objects.all().delete()
  217. values = StatTestModel.objects.aggregate(regrsxx=RegrSXX(y='int2', x='int1'))
  218. self.assertEqual(values, {'regrsxx': None})
  219. def test_regr_sxy_general(self):
  220. values = StatTestModel.objects.aggregate(regrsxy=RegrSXY(y='int2', x='int1'))
  221. self.assertEqual(values, {'regrsxy': -2.0})
  222. def test_regr_sxy_empty_result(self):
  223. StatTestModel.objects.all().delete()
  224. values = StatTestModel.objects.aggregate(regrsxy=RegrSXY(y='int2', x='int1'))
  225. self.assertEqual(values, {'regrsxy': None})
  226. def test_regr_syy_general(self):
  227. values = StatTestModel.objects.aggregate(regrsyy=RegrSYY(y='int2', x='int1'))
  228. self.assertEqual(values, {'regrsyy': 2.0})
  229. def test_regr_syy_empty_result(self):
  230. StatTestModel.objects.all().delete()
  231. values = StatTestModel.objects.aggregate(regrsyy=RegrSYY(y='int2', x='int1'))
  232. self.assertEqual(values, {'regrsyy': None})
  233. def test_regr_avgx_with_related_obj_and_number_as_argument(self):
  234. """
  235. This is more complex test to check if JOIN on field and
  236. number as argument works as expected.
  237. """
  238. values = StatTestModel.objects.aggregate(complex_regravgx=RegrAvgX(y=5, x='related_field__integer_field'))
  239. self.assertEqual(values, {'complex_regravgx': 1.0})