test_constraints.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626
  1. import datetime
  2. from unittest import mock
  3. from django.db import (
  4. IntegrityError, NotSupportedError, connection, transaction,
  5. )
  6. from django.db.models import (
  7. CheckConstraint, Deferrable, F, Func, Q, UniqueConstraint,
  8. )
  9. from django.test import skipUnlessDBFeature
  10. from django.utils import timezone
  11. from . import PostgreSQLTestCase
  12. from .models import HotelReservation, RangesModel, Room, Scene
  13. try:
  14. from django.contrib.postgres.constraints import ExclusionConstraint
  15. from django.contrib.postgres.fields import DateTimeRangeField, RangeBoundary, RangeOperators
  16. from psycopg2.extras import DateRange, NumericRange
  17. except ImportError:
  18. pass
  19. class SchemaTests(PostgreSQLTestCase):
  20. get_opclass_query = '''
  21. SELECT opcname, c.relname FROM pg_opclass AS oc
  22. JOIN pg_index as i on oc.oid = ANY(i.indclass)
  23. JOIN pg_class as c on c.oid = i.indexrelid
  24. WHERE c.relname = %s
  25. '''
  26. def get_constraints(self, table):
  27. """Get the constraints on the table using a new cursor."""
  28. with connection.cursor() as cursor:
  29. return connection.introspection.get_constraints(cursor, table)
  30. def test_check_constraint_range_value(self):
  31. constraint_name = 'ints_between'
  32. self.assertNotIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  33. constraint = CheckConstraint(
  34. check=Q(ints__contained_by=NumericRange(10, 30)),
  35. name=constraint_name,
  36. )
  37. with connection.schema_editor() as editor:
  38. editor.add_constraint(RangesModel, constraint)
  39. self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  40. with self.assertRaises(IntegrityError), transaction.atomic():
  41. RangesModel.objects.create(ints=(20, 50))
  42. RangesModel.objects.create(ints=(10, 30))
  43. def test_check_constraint_daterange_contains(self):
  44. constraint_name = 'dates_contains'
  45. self.assertNotIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  46. constraint = CheckConstraint(
  47. check=Q(dates__contains=F('dates_inner')),
  48. name=constraint_name,
  49. )
  50. with connection.schema_editor() as editor:
  51. editor.add_constraint(RangesModel, constraint)
  52. self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  53. date_1 = datetime.date(2016, 1, 1)
  54. date_2 = datetime.date(2016, 1, 4)
  55. with self.assertRaises(IntegrityError), transaction.atomic():
  56. RangesModel.objects.create(
  57. dates=(date_1, date_2),
  58. dates_inner=(date_1, date_2.replace(day=5)),
  59. )
  60. RangesModel.objects.create(
  61. dates=(date_1, date_2),
  62. dates_inner=(date_1, date_2),
  63. )
  64. def test_check_constraint_datetimerange_contains(self):
  65. constraint_name = 'timestamps_contains'
  66. self.assertNotIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  67. constraint = CheckConstraint(
  68. check=Q(timestamps__contains=F('timestamps_inner')),
  69. name=constraint_name,
  70. )
  71. with connection.schema_editor() as editor:
  72. editor.add_constraint(RangesModel, constraint)
  73. self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  74. datetime_1 = datetime.datetime(2016, 1, 1)
  75. datetime_2 = datetime.datetime(2016, 1, 2, 12)
  76. with self.assertRaises(IntegrityError), transaction.atomic():
  77. RangesModel.objects.create(
  78. timestamps=(datetime_1, datetime_2),
  79. timestamps_inner=(datetime_1, datetime_2.replace(hour=13)),
  80. )
  81. RangesModel.objects.create(
  82. timestamps=(datetime_1, datetime_2),
  83. timestamps_inner=(datetime_1, datetime_2),
  84. )
  85. def test_opclass(self):
  86. constraint = UniqueConstraint(
  87. name='test_opclass',
  88. fields=['scene'],
  89. opclasses=['varchar_pattern_ops'],
  90. )
  91. with connection.schema_editor() as editor:
  92. editor.add_constraint(Scene, constraint)
  93. self.assertIn(constraint.name, self.get_constraints(Scene._meta.db_table))
  94. with editor.connection.cursor() as cursor:
  95. cursor.execute(self.get_opclass_query, [constraint.name])
  96. self.assertEqual(
  97. cursor.fetchall(),
  98. [('varchar_pattern_ops', constraint.name)],
  99. )
  100. # Drop the constraint.
  101. with connection.schema_editor() as editor:
  102. editor.remove_constraint(Scene, constraint)
  103. self.assertNotIn(constraint.name, self.get_constraints(Scene._meta.db_table))
  104. def test_opclass_multiple_columns(self):
  105. constraint = UniqueConstraint(
  106. name='test_opclass_multiple',
  107. fields=['scene', 'setting'],
  108. opclasses=['varchar_pattern_ops', 'text_pattern_ops'],
  109. )
  110. with connection.schema_editor() as editor:
  111. editor.add_constraint(Scene, constraint)
  112. with editor.connection.cursor() as cursor:
  113. cursor.execute(self.get_opclass_query, [constraint.name])
  114. expected_opclasses = (
  115. ('varchar_pattern_ops', constraint.name),
  116. ('text_pattern_ops', constraint.name),
  117. )
  118. self.assertCountEqual(cursor.fetchall(), expected_opclasses)
  119. def test_opclass_partial(self):
  120. constraint = UniqueConstraint(
  121. name='test_opclass_partial',
  122. fields=['scene'],
  123. opclasses=['varchar_pattern_ops'],
  124. condition=Q(setting__contains="Sir Bedemir's Castle"),
  125. )
  126. with connection.schema_editor() as editor:
  127. editor.add_constraint(Scene, constraint)
  128. with editor.connection.cursor() as cursor:
  129. cursor.execute(self.get_opclass_query, [constraint.name])
  130. self.assertCountEqual(
  131. cursor.fetchall(),
  132. [('varchar_pattern_ops', constraint.name)],
  133. )
  134. @skipUnlessDBFeature('supports_covering_indexes')
  135. def test_opclass_include(self):
  136. constraint = UniqueConstraint(
  137. name='test_opclass_include',
  138. fields=['scene'],
  139. opclasses=['varchar_pattern_ops'],
  140. include=['setting'],
  141. )
  142. with connection.schema_editor() as editor:
  143. editor.add_constraint(Scene, constraint)
  144. with editor.connection.cursor() as cursor:
  145. cursor.execute(self.get_opclass_query, [constraint.name])
  146. self.assertCountEqual(
  147. cursor.fetchall(),
  148. [('varchar_pattern_ops', constraint.name)],
  149. )
  150. class ExclusionConstraintTests(PostgreSQLTestCase):
  151. def get_constraints(self, table):
  152. """Get the constraints on the table using a new cursor."""
  153. with connection.cursor() as cursor:
  154. return connection.introspection.get_constraints(cursor, table)
  155. def test_invalid_condition(self):
  156. msg = 'ExclusionConstraint.condition must be a Q instance.'
  157. with self.assertRaisesMessage(ValueError, msg):
  158. ExclusionConstraint(
  159. index_type='GIST',
  160. name='exclude_invalid_condition',
  161. expressions=[(F('datespan'), RangeOperators.OVERLAPS)],
  162. condition=F('invalid'),
  163. )
  164. def test_invalid_index_type(self):
  165. msg = 'Exclusion constraints only support GiST or SP-GiST indexes.'
  166. with self.assertRaisesMessage(ValueError, msg):
  167. ExclusionConstraint(
  168. index_type='gin',
  169. name='exclude_invalid_index_type',
  170. expressions=[(F('datespan'), RangeOperators.OVERLAPS)],
  171. )
  172. def test_invalid_expressions(self):
  173. msg = 'The expressions must be a list of 2-tuples.'
  174. for expressions in (['foo'], [('foo')], [('foo_1', 'foo_2', 'foo_3')]):
  175. with self.subTest(expressions), self.assertRaisesMessage(ValueError, msg):
  176. ExclusionConstraint(
  177. index_type='GIST',
  178. name='exclude_invalid_expressions',
  179. expressions=expressions,
  180. )
  181. def test_empty_expressions(self):
  182. msg = 'At least one expression is required to define an exclusion constraint.'
  183. for empty_expressions in (None, []):
  184. with self.subTest(empty_expressions), self.assertRaisesMessage(ValueError, msg):
  185. ExclusionConstraint(
  186. index_type='GIST',
  187. name='exclude_empty_expressions',
  188. expressions=empty_expressions,
  189. )
  190. def test_invalid_deferrable(self):
  191. msg = 'ExclusionConstraint.deferrable must be a Deferrable instance.'
  192. with self.assertRaisesMessage(ValueError, msg):
  193. ExclusionConstraint(
  194. name='exclude_invalid_deferrable',
  195. expressions=[(F('datespan'), RangeOperators.OVERLAPS)],
  196. deferrable='invalid',
  197. )
  198. def test_deferrable_with_condition(self):
  199. msg = 'ExclusionConstraint with conditions cannot be deferred.'
  200. with self.assertRaisesMessage(ValueError, msg):
  201. ExclusionConstraint(
  202. name='exclude_invalid_condition',
  203. expressions=[(F('datespan'), RangeOperators.OVERLAPS)],
  204. condition=Q(cancelled=False),
  205. deferrable=Deferrable.DEFERRED,
  206. )
  207. def test_invalid_include_type(self):
  208. msg = 'ExclusionConstraint.include must be a list or tuple.'
  209. with self.assertRaisesMessage(ValueError, msg):
  210. ExclusionConstraint(
  211. name='exclude_invalid_include',
  212. expressions=[(F('datespan'), RangeOperators.OVERLAPS)],
  213. include='invalid',
  214. )
  215. def test_invalid_include_index_type(self):
  216. msg = 'Covering exclusion constraints only support GiST indexes.'
  217. with self.assertRaisesMessage(ValueError, msg):
  218. ExclusionConstraint(
  219. name='exclude_invalid_index_type',
  220. expressions=[(F('datespan'), RangeOperators.OVERLAPS)],
  221. include=['cancelled'],
  222. index_type='spgist',
  223. )
  224. def test_repr(self):
  225. constraint = ExclusionConstraint(
  226. name='exclude_overlapping',
  227. expressions=[
  228. (F('datespan'), RangeOperators.OVERLAPS),
  229. (F('room'), RangeOperators.EQUAL),
  230. ],
  231. )
  232. self.assertEqual(
  233. repr(constraint),
  234. "<ExclusionConstraint: index_type=GIST, expressions=["
  235. "(F(datespan), '&&'), (F(room), '=')]>",
  236. )
  237. constraint = ExclusionConstraint(
  238. name='exclude_overlapping',
  239. expressions=[(F('datespan'), RangeOperators.ADJACENT_TO)],
  240. condition=Q(cancelled=False),
  241. index_type='SPGiST',
  242. )
  243. self.assertEqual(
  244. repr(constraint),
  245. "<ExclusionConstraint: index_type=SPGiST, expressions=["
  246. "(F(datespan), '-|-')], condition=(AND: ('cancelled', False))>",
  247. )
  248. constraint = ExclusionConstraint(
  249. name='exclude_overlapping',
  250. expressions=[(F('datespan'), RangeOperators.ADJACENT_TO)],
  251. deferrable=Deferrable.IMMEDIATE,
  252. )
  253. self.assertEqual(
  254. repr(constraint),
  255. "<ExclusionConstraint: index_type=GIST, expressions=["
  256. "(F(datespan), '-|-')], deferrable=Deferrable.IMMEDIATE>",
  257. )
  258. constraint = ExclusionConstraint(
  259. name='exclude_overlapping',
  260. expressions=[(F('datespan'), RangeOperators.ADJACENT_TO)],
  261. include=['cancelled', 'room'],
  262. )
  263. self.assertEqual(
  264. repr(constraint),
  265. "<ExclusionConstraint: index_type=GIST, expressions=["
  266. "(F(datespan), '-|-')], include=('cancelled', 'room')>",
  267. )
  268. def test_eq(self):
  269. constraint_1 = ExclusionConstraint(
  270. name='exclude_overlapping',
  271. expressions=[
  272. (F('datespan'), RangeOperators.OVERLAPS),
  273. (F('room'), RangeOperators.EQUAL),
  274. ],
  275. condition=Q(cancelled=False),
  276. )
  277. constraint_2 = ExclusionConstraint(
  278. name='exclude_overlapping',
  279. expressions=[
  280. ('datespan', RangeOperators.OVERLAPS),
  281. ('room', RangeOperators.EQUAL),
  282. ],
  283. )
  284. constraint_3 = ExclusionConstraint(
  285. name='exclude_overlapping',
  286. expressions=[('datespan', RangeOperators.OVERLAPS)],
  287. condition=Q(cancelled=False),
  288. )
  289. constraint_4 = ExclusionConstraint(
  290. name='exclude_overlapping',
  291. expressions=[
  292. ('datespan', RangeOperators.OVERLAPS),
  293. ('room', RangeOperators.EQUAL),
  294. ],
  295. deferrable=Deferrable.DEFERRED,
  296. )
  297. constraint_5 = ExclusionConstraint(
  298. name='exclude_overlapping',
  299. expressions=[
  300. ('datespan', RangeOperators.OVERLAPS),
  301. ('room', RangeOperators.EQUAL),
  302. ],
  303. deferrable=Deferrable.IMMEDIATE,
  304. )
  305. constraint_6 = ExclusionConstraint(
  306. name='exclude_overlapping',
  307. expressions=[
  308. ('datespan', RangeOperators.OVERLAPS),
  309. ('room', RangeOperators.EQUAL),
  310. ],
  311. deferrable=Deferrable.IMMEDIATE,
  312. include=['cancelled'],
  313. )
  314. constraint_7 = ExclusionConstraint(
  315. name='exclude_overlapping',
  316. expressions=[
  317. ('datespan', RangeOperators.OVERLAPS),
  318. ('room', RangeOperators.EQUAL),
  319. ],
  320. include=['cancelled'],
  321. )
  322. self.assertEqual(constraint_1, constraint_1)
  323. self.assertEqual(constraint_1, mock.ANY)
  324. self.assertNotEqual(constraint_1, constraint_2)
  325. self.assertNotEqual(constraint_1, constraint_3)
  326. self.assertNotEqual(constraint_1, constraint_4)
  327. self.assertNotEqual(constraint_2, constraint_3)
  328. self.assertNotEqual(constraint_2, constraint_4)
  329. self.assertNotEqual(constraint_2, constraint_7)
  330. self.assertNotEqual(constraint_4, constraint_5)
  331. self.assertNotEqual(constraint_5, constraint_6)
  332. self.assertNotEqual(constraint_1, object())
  333. def test_deconstruct(self):
  334. constraint = ExclusionConstraint(
  335. name='exclude_overlapping',
  336. expressions=[('datespan', RangeOperators.OVERLAPS), ('room', RangeOperators.EQUAL)],
  337. )
  338. path, args, kwargs = constraint.deconstruct()
  339. self.assertEqual(path, 'django.contrib.postgres.constraints.ExclusionConstraint')
  340. self.assertEqual(args, ())
  341. self.assertEqual(kwargs, {
  342. 'name': 'exclude_overlapping',
  343. 'expressions': [('datespan', RangeOperators.OVERLAPS), ('room', RangeOperators.EQUAL)],
  344. })
  345. def test_deconstruct_index_type(self):
  346. constraint = ExclusionConstraint(
  347. name='exclude_overlapping',
  348. index_type='SPGIST',
  349. expressions=[('datespan', RangeOperators.OVERLAPS), ('room', RangeOperators.EQUAL)],
  350. )
  351. path, args, kwargs = constraint.deconstruct()
  352. self.assertEqual(path, 'django.contrib.postgres.constraints.ExclusionConstraint')
  353. self.assertEqual(args, ())
  354. self.assertEqual(kwargs, {
  355. 'name': 'exclude_overlapping',
  356. 'index_type': 'SPGIST',
  357. 'expressions': [('datespan', RangeOperators.OVERLAPS), ('room', RangeOperators.EQUAL)],
  358. })
  359. def test_deconstruct_condition(self):
  360. constraint = ExclusionConstraint(
  361. name='exclude_overlapping',
  362. expressions=[('datespan', RangeOperators.OVERLAPS), ('room', RangeOperators.EQUAL)],
  363. condition=Q(cancelled=False),
  364. )
  365. path, args, kwargs = constraint.deconstruct()
  366. self.assertEqual(path, 'django.contrib.postgres.constraints.ExclusionConstraint')
  367. self.assertEqual(args, ())
  368. self.assertEqual(kwargs, {
  369. 'name': 'exclude_overlapping',
  370. 'expressions': [('datespan', RangeOperators.OVERLAPS), ('room', RangeOperators.EQUAL)],
  371. 'condition': Q(cancelled=False),
  372. })
  373. def test_deconstruct_deferrable(self):
  374. constraint = ExclusionConstraint(
  375. name='exclude_overlapping',
  376. expressions=[('datespan', RangeOperators.OVERLAPS)],
  377. deferrable=Deferrable.DEFERRED,
  378. )
  379. path, args, kwargs = constraint.deconstruct()
  380. self.assertEqual(path, 'django.contrib.postgres.constraints.ExclusionConstraint')
  381. self.assertEqual(args, ())
  382. self.assertEqual(kwargs, {
  383. 'name': 'exclude_overlapping',
  384. 'expressions': [('datespan', RangeOperators.OVERLAPS)],
  385. 'deferrable': Deferrable.DEFERRED,
  386. })
  387. def test_deconstruct_include(self):
  388. constraint = ExclusionConstraint(
  389. name='exclude_overlapping',
  390. expressions=[('datespan', RangeOperators.OVERLAPS)],
  391. include=['cancelled', 'room'],
  392. )
  393. path, args, kwargs = constraint.deconstruct()
  394. self.assertEqual(path, 'django.contrib.postgres.constraints.ExclusionConstraint')
  395. self.assertEqual(args, ())
  396. self.assertEqual(kwargs, {
  397. 'name': 'exclude_overlapping',
  398. 'expressions': [('datespan', RangeOperators.OVERLAPS)],
  399. 'include': ('cancelled', 'room'),
  400. })
  401. def _test_range_overlaps(self, constraint):
  402. # Create exclusion constraint.
  403. self.assertNotIn(constraint.name, self.get_constraints(HotelReservation._meta.db_table))
  404. with connection.schema_editor() as editor:
  405. editor.add_constraint(HotelReservation, constraint)
  406. self.assertIn(constraint.name, self.get_constraints(HotelReservation._meta.db_table))
  407. # Add initial reservations.
  408. room101 = Room.objects.create(number=101)
  409. room102 = Room.objects.create(number=102)
  410. datetimes = [
  411. timezone.datetime(2018, 6, 20),
  412. timezone.datetime(2018, 6, 24),
  413. timezone.datetime(2018, 6, 26),
  414. timezone.datetime(2018, 6, 28),
  415. timezone.datetime(2018, 6, 29),
  416. ]
  417. HotelReservation.objects.create(
  418. datespan=DateRange(datetimes[0].date(), datetimes[1].date()),
  419. start=datetimes[0],
  420. end=datetimes[1],
  421. room=room102,
  422. )
  423. HotelReservation.objects.create(
  424. datespan=DateRange(datetimes[1].date(), datetimes[3].date()),
  425. start=datetimes[1],
  426. end=datetimes[3],
  427. room=room102,
  428. )
  429. # Overlap dates.
  430. with self.assertRaises(IntegrityError), transaction.atomic():
  431. reservation = HotelReservation(
  432. datespan=(datetimes[1].date(), datetimes[2].date()),
  433. start=datetimes[1],
  434. end=datetimes[2],
  435. room=room102,
  436. )
  437. reservation.save()
  438. # Valid range.
  439. HotelReservation.objects.bulk_create([
  440. # Other room.
  441. HotelReservation(
  442. datespan=(datetimes[1].date(), datetimes[2].date()),
  443. start=datetimes[1],
  444. end=datetimes[2],
  445. room=room101,
  446. ),
  447. # Cancelled reservation.
  448. HotelReservation(
  449. datespan=(datetimes[1].date(), datetimes[1].date()),
  450. start=datetimes[1],
  451. end=datetimes[2],
  452. room=room102,
  453. cancelled=True,
  454. ),
  455. # Other adjacent dates.
  456. HotelReservation(
  457. datespan=(datetimes[3].date(), datetimes[4].date()),
  458. start=datetimes[3],
  459. end=datetimes[4],
  460. room=room102,
  461. ),
  462. ])
  463. def test_range_overlaps_custom(self):
  464. class TsTzRange(Func):
  465. function = 'TSTZRANGE'
  466. output_field = DateTimeRangeField()
  467. constraint = ExclusionConstraint(
  468. name='exclude_overlapping_reservations_custom',
  469. expressions=[
  470. (TsTzRange('start', 'end', RangeBoundary()), RangeOperators.OVERLAPS),
  471. ('room', RangeOperators.EQUAL)
  472. ],
  473. condition=Q(cancelled=False),
  474. )
  475. self._test_range_overlaps(constraint)
  476. def test_range_overlaps(self):
  477. constraint = ExclusionConstraint(
  478. name='exclude_overlapping_reservations',
  479. expressions=[
  480. (F('datespan'), RangeOperators.OVERLAPS),
  481. ('room', RangeOperators.EQUAL)
  482. ],
  483. condition=Q(cancelled=False),
  484. )
  485. self._test_range_overlaps(constraint)
  486. def test_range_adjacent(self):
  487. constraint_name = 'ints_adjacent'
  488. self.assertNotIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  489. constraint = ExclusionConstraint(
  490. name=constraint_name,
  491. expressions=[('ints', RangeOperators.ADJACENT_TO)],
  492. )
  493. with connection.schema_editor() as editor:
  494. editor.add_constraint(RangesModel, constraint)
  495. self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  496. RangesModel.objects.create(ints=(20, 50))
  497. with self.assertRaises(IntegrityError), transaction.atomic():
  498. RangesModel.objects.create(ints=(10, 20))
  499. RangesModel.objects.create(ints=(10, 19))
  500. RangesModel.objects.create(ints=(51, 60))
  501. # Drop the constraint.
  502. with connection.schema_editor() as editor:
  503. editor.remove_constraint(RangesModel, constraint)
  504. self.assertNotIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  505. def test_range_adjacent_initially_deferred(self):
  506. constraint_name = 'ints_adjacent_deferred'
  507. self.assertNotIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  508. constraint = ExclusionConstraint(
  509. name=constraint_name,
  510. expressions=[('ints', RangeOperators.ADJACENT_TO)],
  511. deferrable=Deferrable.DEFERRED,
  512. )
  513. with connection.schema_editor() as editor:
  514. editor.add_constraint(RangesModel, constraint)
  515. self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  516. RangesModel.objects.create(ints=(20, 50))
  517. adjacent_range = RangesModel.objects.create(ints=(10, 20))
  518. # Constraint behavior can be changed with SET CONSTRAINTS.
  519. with self.assertRaises(IntegrityError):
  520. with transaction.atomic(), connection.cursor() as cursor:
  521. quoted_name = connection.ops.quote_name(constraint_name)
  522. cursor.execute('SET CONSTRAINTS %s IMMEDIATE' % quoted_name)
  523. # Remove adjacent range before the end of transaction.
  524. adjacent_range.delete()
  525. RangesModel.objects.create(ints=(10, 19))
  526. RangesModel.objects.create(ints=(51, 60))
  527. @skipUnlessDBFeature('supports_covering_gist_indexes')
  528. def test_range_adjacent_include(self):
  529. constraint_name = 'ints_adjacent_include'
  530. self.assertNotIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  531. constraint = ExclusionConstraint(
  532. name=constraint_name,
  533. expressions=[('ints', RangeOperators.ADJACENT_TO)],
  534. include=['decimals', 'ints'],
  535. index_type='gist',
  536. )
  537. with connection.schema_editor() as editor:
  538. editor.add_constraint(RangesModel, constraint)
  539. self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  540. RangesModel.objects.create(ints=(20, 50))
  541. with self.assertRaises(IntegrityError), transaction.atomic():
  542. RangesModel.objects.create(ints=(10, 20))
  543. RangesModel.objects.create(ints=(10, 19))
  544. RangesModel.objects.create(ints=(51, 60))
  545. @skipUnlessDBFeature('supports_covering_gist_indexes')
  546. def test_range_adjacent_include_condition(self):
  547. constraint_name = 'ints_adjacent_include_condition'
  548. self.assertNotIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  549. constraint = ExclusionConstraint(
  550. name=constraint_name,
  551. expressions=[('ints', RangeOperators.ADJACENT_TO)],
  552. include=['decimals'],
  553. condition=Q(id__gte=100),
  554. )
  555. with connection.schema_editor() as editor:
  556. editor.add_constraint(RangesModel, constraint)
  557. self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  558. @skipUnlessDBFeature('supports_covering_gist_indexes')
  559. def test_range_adjacent_include_deferrable(self):
  560. constraint_name = 'ints_adjacent_include_deferrable'
  561. self.assertNotIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  562. constraint = ExclusionConstraint(
  563. name=constraint_name,
  564. expressions=[('ints', RangeOperators.ADJACENT_TO)],
  565. include=['decimals'],
  566. deferrable=Deferrable.DEFERRED,
  567. )
  568. with connection.schema_editor() as editor:
  569. editor.add_constraint(RangesModel, constraint)
  570. self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  571. def test_include_not_supported(self):
  572. constraint_name = 'ints_adjacent_include_not_supported'
  573. constraint = ExclusionConstraint(
  574. name=constraint_name,
  575. expressions=[('ints', RangeOperators.ADJACENT_TO)],
  576. include=['id'],
  577. )
  578. msg = 'Covering exclusion constraints requires PostgreSQL 12+.'
  579. with connection.schema_editor() as editor:
  580. with mock.patch(
  581. 'django.db.backends.postgresql.features.DatabaseFeatures.supports_covering_gist_indexes',
  582. False,
  583. ):
  584. with self.assertRaisesMessage(NotSupportedError, msg):
  585. editor.add_constraint(RangesModel, constraint)