# test_constraints.py (31 KB)
  1. import datetime
  2. from unittest import mock
  3. from django.db import (
  4. IntegrityError, NotSupportedError, connection, transaction,
  5. )
  6. from django.db.models import (
  7. CheckConstraint, Deferrable, F, Func, Q, UniqueConstraint,
  8. )
  9. from django.test import skipUnlessDBFeature
  10. from django.utils import timezone
  11. from . import PostgreSQLTestCase
  12. from .models import HotelReservation, RangesModel, Room, Scene
  13. try:
  14. from psycopg2.extras import DateRange, NumericRange
  15. from django.contrib.postgres.constraints import ExclusionConstraint
  16. from django.contrib.postgres.fields import (
  17. DateTimeRangeField, RangeBoundary, RangeOperators,
  18. )
  19. except ImportError:
  20. pass
  21. class SchemaTests(PostgreSQLTestCase):
  22. get_opclass_query = '''
  23. SELECT opcname, c.relname FROM pg_opclass AS oc
  24. JOIN pg_index as i on oc.oid = ANY(i.indclass)
  25. JOIN pg_class as c on c.oid = i.indexrelid
  26. WHERE c.relname = %s
  27. '''
  28. def get_constraints(self, table):
  29. """Get the constraints on the table using a new cursor."""
  30. with connection.cursor() as cursor:
  31. return connection.introspection.get_constraints(cursor, table)
  32. def test_check_constraint_range_value(self):
  33. constraint_name = 'ints_between'
  34. self.assertNotIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  35. constraint = CheckConstraint(
  36. check=Q(ints__contained_by=NumericRange(10, 30)),
  37. name=constraint_name,
  38. )
  39. with connection.schema_editor() as editor:
  40. editor.add_constraint(RangesModel, constraint)
  41. self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  42. with self.assertRaises(IntegrityError), transaction.atomic():
  43. RangesModel.objects.create(ints=(20, 50))
  44. RangesModel.objects.create(ints=(10, 30))
  45. def test_check_constraint_daterange_contains(self):
  46. constraint_name = 'dates_contains'
  47. self.assertNotIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  48. constraint = CheckConstraint(
  49. check=Q(dates__contains=F('dates_inner')),
  50. name=constraint_name,
  51. )
  52. with connection.schema_editor() as editor:
  53. editor.add_constraint(RangesModel, constraint)
  54. self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  55. date_1 = datetime.date(2016, 1, 1)
  56. date_2 = datetime.date(2016, 1, 4)
  57. with self.assertRaises(IntegrityError), transaction.atomic():
  58. RangesModel.objects.create(
  59. dates=(date_1, date_2),
  60. dates_inner=(date_1, date_2.replace(day=5)),
  61. )
  62. RangesModel.objects.create(
  63. dates=(date_1, date_2),
  64. dates_inner=(date_1, date_2),
  65. )
  66. def test_check_constraint_datetimerange_contains(self):
  67. constraint_name = 'timestamps_contains'
  68. self.assertNotIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  69. constraint = CheckConstraint(
  70. check=Q(timestamps__contains=F('timestamps_inner')),
  71. name=constraint_name,
  72. )
  73. with connection.schema_editor() as editor:
  74. editor.add_constraint(RangesModel, constraint)
  75. self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  76. datetime_1 = datetime.datetime(2016, 1, 1)
  77. datetime_2 = datetime.datetime(2016, 1, 2, 12)
  78. with self.assertRaises(IntegrityError), transaction.atomic():
  79. RangesModel.objects.create(
  80. timestamps=(datetime_1, datetime_2),
  81. timestamps_inner=(datetime_1, datetime_2.replace(hour=13)),
  82. )
  83. RangesModel.objects.create(
  84. timestamps=(datetime_1, datetime_2),
  85. timestamps_inner=(datetime_1, datetime_2),
  86. )
  87. def test_opclass(self):
  88. constraint = UniqueConstraint(
  89. name='test_opclass',
  90. fields=['scene'],
  91. opclasses=['varchar_pattern_ops'],
  92. )
  93. with connection.schema_editor() as editor:
  94. editor.add_constraint(Scene, constraint)
  95. self.assertIn(constraint.name, self.get_constraints(Scene._meta.db_table))
  96. with editor.connection.cursor() as cursor:
  97. cursor.execute(self.get_opclass_query, [constraint.name])
  98. self.assertEqual(
  99. cursor.fetchall(),
  100. [('varchar_pattern_ops', constraint.name)],
  101. )
  102. # Drop the constraint.
  103. with connection.schema_editor() as editor:
  104. editor.remove_constraint(Scene, constraint)
  105. self.assertNotIn(constraint.name, self.get_constraints(Scene._meta.db_table))
  106. def test_opclass_multiple_columns(self):
  107. constraint = UniqueConstraint(
  108. name='test_opclass_multiple',
  109. fields=['scene', 'setting'],
  110. opclasses=['varchar_pattern_ops', 'text_pattern_ops'],
  111. )
  112. with connection.schema_editor() as editor:
  113. editor.add_constraint(Scene, constraint)
  114. with editor.connection.cursor() as cursor:
  115. cursor.execute(self.get_opclass_query, [constraint.name])
  116. expected_opclasses = (
  117. ('varchar_pattern_ops', constraint.name),
  118. ('text_pattern_ops', constraint.name),
  119. )
  120. self.assertCountEqual(cursor.fetchall(), expected_opclasses)
  121. def test_opclass_partial(self):
  122. constraint = UniqueConstraint(
  123. name='test_opclass_partial',
  124. fields=['scene'],
  125. opclasses=['varchar_pattern_ops'],
  126. condition=Q(setting__contains="Sir Bedemir's Castle"),
  127. )
  128. with connection.schema_editor() as editor:
  129. editor.add_constraint(Scene, constraint)
  130. with editor.connection.cursor() as cursor:
  131. cursor.execute(self.get_opclass_query, [constraint.name])
  132. self.assertCountEqual(
  133. cursor.fetchall(),
  134. [('varchar_pattern_ops', constraint.name)],
  135. )
  136. @skipUnlessDBFeature('supports_covering_indexes')
  137. def test_opclass_include(self):
  138. constraint = UniqueConstraint(
  139. name='test_opclass_include',
  140. fields=['scene'],
  141. opclasses=['varchar_pattern_ops'],
  142. include=['setting'],
  143. )
  144. with connection.schema_editor() as editor:
  145. editor.add_constraint(Scene, constraint)
  146. with editor.connection.cursor() as cursor:
  147. cursor.execute(self.get_opclass_query, [constraint.name])
  148. self.assertCountEqual(
  149. cursor.fetchall(),
  150. [('varchar_pattern_ops', constraint.name)],
  151. )
  152. class ExclusionConstraintTests(PostgreSQLTestCase):
  153. def get_constraints(self, table):
  154. """Get the constraints on the table using a new cursor."""
  155. with connection.cursor() as cursor:
  156. return connection.introspection.get_constraints(cursor, table)
  157. def test_invalid_condition(self):
  158. msg = 'ExclusionConstraint.condition must be a Q instance.'
  159. with self.assertRaisesMessage(ValueError, msg):
  160. ExclusionConstraint(
  161. index_type='GIST',
  162. name='exclude_invalid_condition',
  163. expressions=[(F('datespan'), RangeOperators.OVERLAPS)],
  164. condition=F('invalid'),
  165. )
  166. def test_invalid_index_type(self):
  167. msg = 'Exclusion constraints only support GiST or SP-GiST indexes.'
  168. with self.assertRaisesMessage(ValueError, msg):
  169. ExclusionConstraint(
  170. index_type='gin',
  171. name='exclude_invalid_index_type',
  172. expressions=[(F('datespan'), RangeOperators.OVERLAPS)],
  173. )
  174. def test_invalid_expressions(self):
  175. msg = 'The expressions must be a list of 2-tuples.'
  176. for expressions in (['foo'], [('foo')], [('foo_1', 'foo_2', 'foo_3')]):
  177. with self.subTest(expressions), self.assertRaisesMessage(ValueError, msg):
  178. ExclusionConstraint(
  179. index_type='GIST',
  180. name='exclude_invalid_expressions',
  181. expressions=expressions,
  182. )
  183. def test_empty_expressions(self):
  184. msg = 'At least one expression is required to define an exclusion constraint.'
  185. for empty_expressions in (None, []):
  186. with self.subTest(empty_expressions), self.assertRaisesMessage(ValueError, msg):
  187. ExclusionConstraint(
  188. index_type='GIST',
  189. name='exclude_empty_expressions',
  190. expressions=empty_expressions,
  191. )
  192. def test_invalid_deferrable(self):
  193. msg = 'ExclusionConstraint.deferrable must be a Deferrable instance.'
  194. with self.assertRaisesMessage(ValueError, msg):
  195. ExclusionConstraint(
  196. name='exclude_invalid_deferrable',
  197. expressions=[(F('datespan'), RangeOperators.OVERLAPS)],
  198. deferrable='invalid',
  199. )
  200. def test_deferrable_with_condition(self):
  201. msg = 'ExclusionConstraint with conditions cannot be deferred.'
  202. with self.assertRaisesMessage(ValueError, msg):
  203. ExclusionConstraint(
  204. name='exclude_invalid_condition',
  205. expressions=[(F('datespan'), RangeOperators.OVERLAPS)],
  206. condition=Q(cancelled=False),
  207. deferrable=Deferrable.DEFERRED,
  208. )
  209. def test_invalid_include_type(self):
  210. msg = 'ExclusionConstraint.include must be a list or tuple.'
  211. with self.assertRaisesMessage(ValueError, msg):
  212. ExclusionConstraint(
  213. name='exclude_invalid_include',
  214. expressions=[(F('datespan'), RangeOperators.OVERLAPS)],
  215. include='invalid',
  216. )
  217. def test_invalid_include_index_type(self):
  218. msg = 'Covering exclusion constraints only support GiST indexes.'
  219. with self.assertRaisesMessage(ValueError, msg):
  220. ExclusionConstraint(
  221. name='exclude_invalid_index_type',
  222. expressions=[(F('datespan'), RangeOperators.OVERLAPS)],
  223. include=['cancelled'],
  224. index_type='spgist',
  225. )
  226. def test_invalid_opclasses_type(self):
  227. msg = 'ExclusionConstraint.opclasses must be a list or tuple.'
  228. with self.assertRaisesMessage(ValueError, msg):
  229. ExclusionConstraint(
  230. name='exclude_invalid_opclasses',
  231. expressions=[(F('datespan'), RangeOperators.OVERLAPS)],
  232. opclasses='invalid',
  233. )
  234. def test_opclasses_and_expressions_same_length(self):
  235. msg = (
  236. 'ExclusionConstraint.expressions and '
  237. 'ExclusionConstraint.opclasses must have the same number of '
  238. 'elements.'
  239. )
  240. with self.assertRaisesMessage(ValueError, msg):
  241. ExclusionConstraint(
  242. name='exclude_invalid_expressions_opclasses_length',
  243. expressions=[(F('datespan'), RangeOperators.OVERLAPS)],
  244. opclasses=['foo', 'bar'],
  245. )
  246. def test_repr(self):
  247. constraint = ExclusionConstraint(
  248. name='exclude_overlapping',
  249. expressions=[
  250. (F('datespan'), RangeOperators.OVERLAPS),
  251. (F('room'), RangeOperators.EQUAL),
  252. ],
  253. )
  254. self.assertEqual(
  255. repr(constraint),
  256. "<ExclusionConstraint: index_type=GIST, expressions=["
  257. "(F(datespan), '&&'), (F(room), '=')]>",
  258. )
  259. constraint = ExclusionConstraint(
  260. name='exclude_overlapping',
  261. expressions=[(F('datespan'), RangeOperators.ADJACENT_TO)],
  262. condition=Q(cancelled=False),
  263. index_type='SPGiST',
  264. )
  265. self.assertEqual(
  266. repr(constraint),
  267. "<ExclusionConstraint: index_type=SPGiST, expressions=["
  268. "(F(datespan), '-|-')], condition=(AND: ('cancelled', False))>",
  269. )
  270. constraint = ExclusionConstraint(
  271. name='exclude_overlapping',
  272. expressions=[(F('datespan'), RangeOperators.ADJACENT_TO)],
  273. deferrable=Deferrable.IMMEDIATE,
  274. )
  275. self.assertEqual(
  276. repr(constraint),
  277. "<ExclusionConstraint: index_type=GIST, expressions=["
  278. "(F(datespan), '-|-')], deferrable=Deferrable.IMMEDIATE>",
  279. )
  280. constraint = ExclusionConstraint(
  281. name='exclude_overlapping',
  282. expressions=[(F('datespan'), RangeOperators.ADJACENT_TO)],
  283. include=['cancelled', 'room'],
  284. )
  285. self.assertEqual(
  286. repr(constraint),
  287. "<ExclusionConstraint: index_type=GIST, expressions=["
  288. "(F(datespan), '-|-')], include=('cancelled', 'room')>",
  289. )
  290. constraint = ExclusionConstraint(
  291. name='exclude_overlapping',
  292. expressions=[(F('datespan'), RangeOperators.ADJACENT_TO)],
  293. opclasses=['range_ops'],
  294. )
  295. self.assertEqual(
  296. repr(constraint),
  297. "<ExclusionConstraint: index_type=GIST, expressions=["
  298. "(F(datespan), '-|-')], opclasses=['range_ops']>",
  299. )
  300. def test_eq(self):
  301. constraint_1 = ExclusionConstraint(
  302. name='exclude_overlapping',
  303. expressions=[
  304. (F('datespan'), RangeOperators.OVERLAPS),
  305. (F('room'), RangeOperators.EQUAL),
  306. ],
  307. condition=Q(cancelled=False),
  308. )
  309. constraint_2 = ExclusionConstraint(
  310. name='exclude_overlapping',
  311. expressions=[
  312. ('datespan', RangeOperators.OVERLAPS),
  313. ('room', RangeOperators.EQUAL),
  314. ],
  315. )
  316. constraint_3 = ExclusionConstraint(
  317. name='exclude_overlapping',
  318. expressions=[('datespan', RangeOperators.OVERLAPS)],
  319. condition=Q(cancelled=False),
  320. )
  321. constraint_4 = ExclusionConstraint(
  322. name='exclude_overlapping',
  323. expressions=[
  324. ('datespan', RangeOperators.OVERLAPS),
  325. ('room', RangeOperators.EQUAL),
  326. ],
  327. deferrable=Deferrable.DEFERRED,
  328. )
  329. constraint_5 = ExclusionConstraint(
  330. name='exclude_overlapping',
  331. expressions=[
  332. ('datespan', RangeOperators.OVERLAPS),
  333. ('room', RangeOperators.EQUAL),
  334. ],
  335. deferrable=Deferrable.IMMEDIATE,
  336. )
  337. constraint_6 = ExclusionConstraint(
  338. name='exclude_overlapping',
  339. expressions=[
  340. ('datespan', RangeOperators.OVERLAPS),
  341. ('room', RangeOperators.EQUAL),
  342. ],
  343. deferrable=Deferrable.IMMEDIATE,
  344. include=['cancelled'],
  345. )
  346. constraint_7 = ExclusionConstraint(
  347. name='exclude_overlapping',
  348. expressions=[
  349. ('datespan', RangeOperators.OVERLAPS),
  350. ('room', RangeOperators.EQUAL),
  351. ],
  352. include=['cancelled'],
  353. )
  354. constraint_8 = ExclusionConstraint(
  355. name='exclude_overlapping',
  356. expressions=[
  357. ('datespan', RangeOperators.OVERLAPS),
  358. ('room', RangeOperators.EQUAL),
  359. ],
  360. include=['cancelled'],
  361. opclasses=['range_ops', 'range_ops']
  362. )
  363. constraint_9 = ExclusionConstraint(
  364. name='exclude_overlapping',
  365. expressions=[
  366. ('datespan', RangeOperators.OVERLAPS),
  367. ('room', RangeOperators.EQUAL),
  368. ],
  369. opclasses=['range_ops', 'range_ops']
  370. )
  371. self.assertEqual(constraint_1, constraint_1)
  372. self.assertEqual(constraint_1, mock.ANY)
  373. self.assertNotEqual(constraint_1, constraint_2)
  374. self.assertNotEqual(constraint_1, constraint_3)
  375. self.assertNotEqual(constraint_1, constraint_4)
  376. self.assertNotEqual(constraint_2, constraint_3)
  377. self.assertNotEqual(constraint_2, constraint_4)
  378. self.assertNotEqual(constraint_2, constraint_7)
  379. self.assertNotEqual(constraint_2, constraint_9)
  380. self.assertNotEqual(constraint_4, constraint_5)
  381. self.assertNotEqual(constraint_5, constraint_6)
  382. self.assertNotEqual(constraint_7, constraint_8)
  383. self.assertNotEqual(constraint_1, object())
  384. def test_deconstruct(self):
  385. constraint = ExclusionConstraint(
  386. name='exclude_overlapping',
  387. expressions=[('datespan', RangeOperators.OVERLAPS), ('room', RangeOperators.EQUAL)],
  388. )
  389. path, args, kwargs = constraint.deconstruct()
  390. self.assertEqual(path, 'django.contrib.postgres.constraints.ExclusionConstraint')
  391. self.assertEqual(args, ())
  392. self.assertEqual(kwargs, {
  393. 'name': 'exclude_overlapping',
  394. 'expressions': [('datespan', RangeOperators.OVERLAPS), ('room', RangeOperators.EQUAL)],
  395. })
  396. def test_deconstruct_index_type(self):
  397. constraint = ExclusionConstraint(
  398. name='exclude_overlapping',
  399. index_type='SPGIST',
  400. expressions=[('datespan', RangeOperators.OVERLAPS), ('room', RangeOperators.EQUAL)],
  401. )
  402. path, args, kwargs = constraint.deconstruct()
  403. self.assertEqual(path, 'django.contrib.postgres.constraints.ExclusionConstraint')
  404. self.assertEqual(args, ())
  405. self.assertEqual(kwargs, {
  406. 'name': 'exclude_overlapping',
  407. 'index_type': 'SPGIST',
  408. 'expressions': [('datespan', RangeOperators.OVERLAPS), ('room', RangeOperators.EQUAL)],
  409. })
  410. def test_deconstruct_condition(self):
  411. constraint = ExclusionConstraint(
  412. name='exclude_overlapping',
  413. expressions=[('datespan', RangeOperators.OVERLAPS), ('room', RangeOperators.EQUAL)],
  414. condition=Q(cancelled=False),
  415. )
  416. path, args, kwargs = constraint.deconstruct()
  417. self.assertEqual(path, 'django.contrib.postgres.constraints.ExclusionConstraint')
  418. self.assertEqual(args, ())
  419. self.assertEqual(kwargs, {
  420. 'name': 'exclude_overlapping',
  421. 'expressions': [('datespan', RangeOperators.OVERLAPS), ('room', RangeOperators.EQUAL)],
  422. 'condition': Q(cancelled=False),
  423. })
  424. def test_deconstruct_deferrable(self):
  425. constraint = ExclusionConstraint(
  426. name='exclude_overlapping',
  427. expressions=[('datespan', RangeOperators.OVERLAPS)],
  428. deferrable=Deferrable.DEFERRED,
  429. )
  430. path, args, kwargs = constraint.deconstruct()
  431. self.assertEqual(path, 'django.contrib.postgres.constraints.ExclusionConstraint')
  432. self.assertEqual(args, ())
  433. self.assertEqual(kwargs, {
  434. 'name': 'exclude_overlapping',
  435. 'expressions': [('datespan', RangeOperators.OVERLAPS)],
  436. 'deferrable': Deferrable.DEFERRED,
  437. })
  438. def test_deconstruct_include(self):
  439. constraint = ExclusionConstraint(
  440. name='exclude_overlapping',
  441. expressions=[('datespan', RangeOperators.OVERLAPS)],
  442. include=['cancelled', 'room'],
  443. )
  444. path, args, kwargs = constraint.deconstruct()
  445. self.assertEqual(path, 'django.contrib.postgres.constraints.ExclusionConstraint')
  446. self.assertEqual(args, ())
  447. self.assertEqual(kwargs, {
  448. 'name': 'exclude_overlapping',
  449. 'expressions': [('datespan', RangeOperators.OVERLAPS)],
  450. 'include': ('cancelled', 'room'),
  451. })
  452. def test_deconstruct_opclasses(self):
  453. constraint = ExclusionConstraint(
  454. name='exclude_overlapping',
  455. expressions=[('datespan', RangeOperators.OVERLAPS)],
  456. opclasses=['range_ops'],
  457. )
  458. path, args, kwargs = constraint.deconstruct()
  459. self.assertEqual(path, 'django.contrib.postgres.constraints.ExclusionConstraint')
  460. self.assertEqual(args, ())
  461. self.assertEqual(kwargs, {
  462. 'name': 'exclude_overlapping',
  463. 'expressions': [('datespan', RangeOperators.OVERLAPS)],
  464. 'opclasses': ['range_ops'],
  465. })
  466. def _test_range_overlaps(self, constraint):
  467. # Create exclusion constraint.
  468. self.assertNotIn(constraint.name, self.get_constraints(HotelReservation._meta.db_table))
  469. with connection.schema_editor() as editor:
  470. editor.add_constraint(HotelReservation, constraint)
  471. self.assertIn(constraint.name, self.get_constraints(HotelReservation._meta.db_table))
  472. # Add initial reservations.
  473. room101 = Room.objects.create(number=101)
  474. room102 = Room.objects.create(number=102)
  475. datetimes = [
  476. timezone.datetime(2018, 6, 20),
  477. timezone.datetime(2018, 6, 24),
  478. timezone.datetime(2018, 6, 26),
  479. timezone.datetime(2018, 6, 28),
  480. timezone.datetime(2018, 6, 29),
  481. ]
  482. HotelReservation.objects.create(
  483. datespan=DateRange(datetimes[0].date(), datetimes[1].date()),
  484. start=datetimes[0],
  485. end=datetimes[1],
  486. room=room102,
  487. )
  488. HotelReservation.objects.create(
  489. datespan=DateRange(datetimes[1].date(), datetimes[3].date()),
  490. start=datetimes[1],
  491. end=datetimes[3],
  492. room=room102,
  493. )
  494. # Overlap dates.
  495. with self.assertRaises(IntegrityError), transaction.atomic():
  496. reservation = HotelReservation(
  497. datespan=(datetimes[1].date(), datetimes[2].date()),
  498. start=datetimes[1],
  499. end=datetimes[2],
  500. room=room102,
  501. )
  502. reservation.save()
  503. # Valid range.
  504. HotelReservation.objects.bulk_create([
  505. # Other room.
  506. HotelReservation(
  507. datespan=(datetimes[1].date(), datetimes[2].date()),
  508. start=datetimes[1],
  509. end=datetimes[2],
  510. room=room101,
  511. ),
  512. # Cancelled reservation.
  513. HotelReservation(
  514. datespan=(datetimes[1].date(), datetimes[1].date()),
  515. start=datetimes[1],
  516. end=datetimes[2],
  517. room=room102,
  518. cancelled=True,
  519. ),
  520. # Other adjacent dates.
  521. HotelReservation(
  522. datespan=(datetimes[3].date(), datetimes[4].date()),
  523. start=datetimes[3],
  524. end=datetimes[4],
  525. room=room102,
  526. ),
  527. ])
  528. def test_range_overlaps_custom(self):
  529. class TsTzRange(Func):
  530. function = 'TSTZRANGE'
  531. output_field = DateTimeRangeField()
  532. constraint = ExclusionConstraint(
  533. name='exclude_overlapping_reservations_custom',
  534. expressions=[
  535. (TsTzRange('start', 'end', RangeBoundary()), RangeOperators.OVERLAPS),
  536. ('room', RangeOperators.EQUAL)
  537. ],
  538. condition=Q(cancelled=False),
  539. opclasses=['range_ops', 'gist_int4_ops'],
  540. )
  541. self._test_range_overlaps(constraint)
  542. def test_range_overlaps(self):
  543. constraint = ExclusionConstraint(
  544. name='exclude_overlapping_reservations',
  545. expressions=[
  546. (F('datespan'), RangeOperators.OVERLAPS),
  547. ('room', RangeOperators.EQUAL)
  548. ],
  549. condition=Q(cancelled=False),
  550. )
  551. self._test_range_overlaps(constraint)
  552. def test_range_adjacent(self):
  553. constraint_name = 'ints_adjacent'
  554. self.assertNotIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  555. constraint = ExclusionConstraint(
  556. name=constraint_name,
  557. expressions=[('ints', RangeOperators.ADJACENT_TO)],
  558. )
  559. with connection.schema_editor() as editor:
  560. editor.add_constraint(RangesModel, constraint)
  561. self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  562. RangesModel.objects.create(ints=(20, 50))
  563. with self.assertRaises(IntegrityError), transaction.atomic():
  564. RangesModel.objects.create(ints=(10, 20))
  565. RangesModel.objects.create(ints=(10, 19))
  566. RangesModel.objects.create(ints=(51, 60))
  567. # Drop the constraint.
  568. with connection.schema_editor() as editor:
  569. editor.remove_constraint(RangesModel, constraint)
  570. self.assertNotIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  571. def test_range_adjacent_initially_deferred(self):
  572. constraint_name = 'ints_adjacent_deferred'
  573. self.assertNotIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  574. constraint = ExclusionConstraint(
  575. name=constraint_name,
  576. expressions=[('ints', RangeOperators.ADJACENT_TO)],
  577. deferrable=Deferrable.DEFERRED,
  578. )
  579. with connection.schema_editor() as editor:
  580. editor.add_constraint(RangesModel, constraint)
  581. self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  582. RangesModel.objects.create(ints=(20, 50))
  583. adjacent_range = RangesModel.objects.create(ints=(10, 20))
  584. # Constraint behavior can be changed with SET CONSTRAINTS.
  585. with self.assertRaises(IntegrityError):
  586. with transaction.atomic(), connection.cursor() as cursor:
  587. quoted_name = connection.ops.quote_name(constraint_name)
  588. cursor.execute('SET CONSTRAINTS %s IMMEDIATE' % quoted_name)
  589. # Remove adjacent range before the end of transaction.
  590. adjacent_range.delete()
  591. RangesModel.objects.create(ints=(10, 19))
  592. RangesModel.objects.create(ints=(51, 60))
  593. @skipUnlessDBFeature('supports_covering_gist_indexes')
  594. def test_range_adjacent_include(self):
  595. constraint_name = 'ints_adjacent_include'
  596. self.assertNotIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  597. constraint = ExclusionConstraint(
  598. name=constraint_name,
  599. expressions=[('ints', RangeOperators.ADJACENT_TO)],
  600. include=['decimals', 'ints'],
  601. index_type='gist',
  602. )
  603. with connection.schema_editor() as editor:
  604. editor.add_constraint(RangesModel, constraint)
  605. self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  606. RangesModel.objects.create(ints=(20, 50))
  607. with self.assertRaises(IntegrityError), transaction.atomic():
  608. RangesModel.objects.create(ints=(10, 20))
  609. RangesModel.objects.create(ints=(10, 19))
  610. RangesModel.objects.create(ints=(51, 60))
  611. @skipUnlessDBFeature('supports_covering_gist_indexes')
  612. def test_range_adjacent_include_condition(self):
  613. constraint_name = 'ints_adjacent_include_condition'
  614. self.assertNotIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  615. constraint = ExclusionConstraint(
  616. name=constraint_name,
  617. expressions=[('ints', RangeOperators.ADJACENT_TO)],
  618. include=['decimals'],
  619. condition=Q(id__gte=100),
  620. )
  621. with connection.schema_editor() as editor:
  622. editor.add_constraint(RangesModel, constraint)
  623. self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  624. @skipUnlessDBFeature('supports_covering_gist_indexes')
  625. def test_range_adjacent_include_deferrable(self):
  626. constraint_name = 'ints_adjacent_include_deferrable'
  627. self.assertNotIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  628. constraint = ExclusionConstraint(
  629. name=constraint_name,
  630. expressions=[('ints', RangeOperators.ADJACENT_TO)],
  631. include=['decimals'],
  632. deferrable=Deferrable.DEFERRED,
  633. )
  634. with connection.schema_editor() as editor:
  635. editor.add_constraint(RangesModel, constraint)
  636. self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  637. def test_include_not_supported(self):
  638. constraint_name = 'ints_adjacent_include_not_supported'
  639. constraint = ExclusionConstraint(
  640. name=constraint_name,
  641. expressions=[('ints', RangeOperators.ADJACENT_TO)],
  642. include=['id'],
  643. )
  644. msg = 'Covering exclusion constraints requires PostgreSQL 12+.'
  645. with connection.schema_editor() as editor:
  646. with mock.patch(
  647. 'django.db.backends.postgresql.features.DatabaseFeatures.supports_covering_gist_indexes',
  648. False,
  649. ):
  650. with self.assertRaisesMessage(NotSupportedError, msg):
  651. editor.add_constraint(RangesModel, constraint)
  652. def test_range_adjacent_opclasses(self):
  653. constraint_name = 'ints_adjacent_opclasses'
  654. self.assertNotIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  655. constraint = ExclusionConstraint(
  656. name=constraint_name,
  657. expressions=[('ints', RangeOperators.ADJACENT_TO)],
  658. opclasses=['range_ops'],
  659. )
  660. with connection.schema_editor() as editor:
  661. editor.add_constraint(RangesModel, constraint)
  662. self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  663. RangesModel.objects.create(ints=(20, 50))
  664. with self.assertRaises(IntegrityError), transaction.atomic():
  665. RangesModel.objects.create(ints=(10, 20))
  666. RangesModel.objects.create(ints=(10, 19))
  667. RangesModel.objects.create(ints=(51, 60))
  668. # Drop the constraint.
  669. with connection.schema_editor() as editor:
  670. editor.remove_constraint(RangesModel, constraint)
  671. self.assertNotIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  672. def test_range_adjacent_opclasses_condition(self):
  673. constraint_name = 'ints_adjacent_opclasses_condition'
  674. self.assertNotIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  675. constraint = ExclusionConstraint(
  676. name=constraint_name,
  677. expressions=[('ints', RangeOperators.ADJACENT_TO)],
  678. opclasses=['range_ops'],
  679. condition=Q(id__gte=100),
  680. )
  681. with connection.schema_editor() as editor:
  682. editor.add_constraint(RangesModel, constraint)
  683. self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  684. def test_range_adjacent_opclasses_deferrable(self):
  685. constraint_name = 'ints_adjacent_opclasses_deferrable'
  686. self.assertNotIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  687. constraint = ExclusionConstraint(
  688. name=constraint_name,
  689. expressions=[('ints', RangeOperators.ADJACENT_TO)],
  690. opclasses=['range_ops'],
  691. deferrable=Deferrable.DEFERRED,
  692. )
  693. with connection.schema_editor() as editor:
  694. editor.add_constraint(RangesModel, constraint)
  695. self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  696. @skipUnlessDBFeature('supports_covering_gist_indexes')
  697. def test_range_adjacent_opclasses_include(self):
  698. constraint_name = 'ints_adjacent_opclasses_include'
  699. self.assertNotIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))
  700. constraint = ExclusionConstraint(
  701. name=constraint_name,
  702. expressions=[('ints', RangeOperators.ADJACENT_TO)],
  703. opclasses=['range_ops'],
  704. include=['decimals'],
  705. )
  706. with connection.schema_editor() as editor:
  707. editor.add_constraint(RangesModel, constraint)
  708. self.assertIn(constraint_name, self.get_constraints(RangesModel._meta.db_table))