test_operations.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581
  1. import unittest
  2. from migrations.test_base import OperationTestBase
  3. from django.db import IntegrityError, NotSupportedError, connection, transaction
  4. from django.db.migrations.state import ProjectState
  5. from django.db.models import CheckConstraint, Index, Q, UniqueConstraint
  6. from django.db.utils import ProgrammingError
  7. from django.test import modify_settings, override_settings
  8. from django.test.utils import CaptureQueriesContext
  9. from . import PostgreSQLTestCase
  10. try:
  11. from django.contrib.postgres.indexes import BrinIndex, BTreeIndex
  12. from django.contrib.postgres.operations import (
  13. AddConstraintNotValid,
  14. AddIndexConcurrently,
  15. BloomExtension,
  16. CreateCollation,
  17. CreateExtension,
  18. RemoveCollation,
  19. RemoveIndexConcurrently,
  20. ValidateConstraint,
  21. )
  22. except ImportError:
  23. pass
  24. @unittest.skipUnless(connection.vendor == "postgresql", "PostgreSQL specific tests.")
  25. @modify_settings(INSTALLED_APPS={"append": "migrations"})
  26. class AddIndexConcurrentlyTests(OperationTestBase):
  27. app_label = "test_add_concurrently"
  28. def test_requires_atomic_false(self):
  29. project_state = self.set_up_test_model(self.app_label)
  30. new_state = project_state.clone()
  31. operation = AddIndexConcurrently(
  32. "Pony",
  33. Index(fields=["pink"], name="pony_pink_idx"),
  34. )
  35. msg = (
  36. "The AddIndexConcurrently operation cannot be executed inside "
  37. "a transaction (set atomic = False on the migration)."
  38. )
  39. with self.assertRaisesMessage(NotSupportedError, msg):
  40. with connection.schema_editor(atomic=True) as editor:
  41. operation.database_forwards(
  42. self.app_label, editor, project_state, new_state
  43. )
  44. def test_add(self):
  45. project_state = self.set_up_test_model(self.app_label, index=False)
  46. table_name = "%s_pony" % self.app_label
  47. index = Index(fields=["pink"], name="pony_pink_idx")
  48. new_state = project_state.clone()
  49. operation = AddIndexConcurrently("Pony", index)
  50. self.assertEqual(
  51. operation.describe(),
  52. "Concurrently create index pony_pink_idx on field(s) pink of model Pony",
  53. )
  54. self.assertEqual(
  55. operation.formatted_description(),
  56. "+ Concurrently create index pony_pink_idx on field(s) pink of model Pony",
  57. )
  58. operation.state_forwards(self.app_label, new_state)
  59. self.assertEqual(
  60. len(new_state.models[self.app_label, "pony"].options["indexes"]), 1
  61. )
  62. self.assertIndexNotExists(table_name, ["pink"])
  63. # Add index.
  64. with connection.schema_editor(atomic=False) as editor:
  65. operation.database_forwards(
  66. self.app_label, editor, project_state, new_state
  67. )
  68. self.assertIndexExists(table_name, ["pink"])
  69. # Reversal.
  70. with connection.schema_editor(atomic=False) as editor:
  71. operation.database_backwards(
  72. self.app_label, editor, new_state, project_state
  73. )
  74. self.assertIndexNotExists(table_name, ["pink"])
  75. # Deconstruction.
  76. name, args, kwargs = operation.deconstruct()
  77. self.assertEqual(name, "AddIndexConcurrently")
  78. self.assertEqual(args, [])
  79. self.assertEqual(kwargs, {"model_name": "Pony", "index": index})
  80. def test_add_other_index_type(self):
  81. project_state = self.set_up_test_model(self.app_label, index=False)
  82. table_name = "%s_pony" % self.app_label
  83. new_state = project_state.clone()
  84. operation = AddIndexConcurrently(
  85. "Pony",
  86. BrinIndex(fields=["pink"], name="pony_pink_brin_idx"),
  87. )
  88. self.assertIndexNotExists(table_name, ["pink"])
  89. # Add index.
  90. with connection.schema_editor(atomic=False) as editor:
  91. operation.database_forwards(
  92. self.app_label, editor, project_state, new_state
  93. )
  94. self.assertIndexExists(table_name, ["pink"], index_type="brin")
  95. # Reversal.
  96. with connection.schema_editor(atomic=False) as editor:
  97. operation.database_backwards(
  98. self.app_label, editor, new_state, project_state
  99. )
  100. self.assertIndexNotExists(table_name, ["pink"])
  101. def test_add_with_options(self):
  102. project_state = self.set_up_test_model(self.app_label, index=False)
  103. table_name = "%s_pony" % self.app_label
  104. new_state = project_state.clone()
  105. index = BTreeIndex(fields=["pink"], name="pony_pink_btree_idx", fillfactor=70)
  106. operation = AddIndexConcurrently("Pony", index)
  107. self.assertIndexNotExists(table_name, ["pink"])
  108. # Add index.
  109. with connection.schema_editor(atomic=False) as editor:
  110. operation.database_forwards(
  111. self.app_label, editor, project_state, new_state
  112. )
  113. self.assertIndexExists(table_name, ["pink"], index_type="btree")
  114. # Reversal.
  115. with connection.schema_editor(atomic=False) as editor:
  116. operation.database_backwards(
  117. self.app_label, editor, new_state, project_state
  118. )
  119. self.assertIndexNotExists(table_name, ["pink"])
  120. @unittest.skipUnless(connection.vendor == "postgresql", "PostgreSQL specific tests.")
  121. @modify_settings(INSTALLED_APPS={"append": "migrations"})
  122. class RemoveIndexConcurrentlyTests(OperationTestBase):
  123. app_label = "test_rm_concurrently"
  124. def test_requires_atomic_false(self):
  125. project_state = self.set_up_test_model(self.app_label, index=True)
  126. new_state = project_state.clone()
  127. operation = RemoveIndexConcurrently("Pony", "pony_pink_idx")
  128. msg = (
  129. "The RemoveIndexConcurrently operation cannot be executed inside "
  130. "a transaction (set atomic = False on the migration)."
  131. )
  132. with self.assertRaisesMessage(NotSupportedError, msg):
  133. with connection.schema_editor(atomic=True) as editor:
  134. operation.database_forwards(
  135. self.app_label, editor, project_state, new_state
  136. )
  137. def test_remove(self):
  138. project_state = self.set_up_test_model(self.app_label, index=True)
  139. table_name = "%s_pony" % self.app_label
  140. self.assertTableExists(table_name)
  141. new_state = project_state.clone()
  142. operation = RemoveIndexConcurrently("Pony", "pony_pink_idx")
  143. self.assertEqual(
  144. operation.describe(),
  145. "Concurrently remove index pony_pink_idx from Pony",
  146. )
  147. self.assertEqual(
  148. operation.formatted_description(),
  149. "- Concurrently remove index pony_pink_idx from Pony",
  150. )
  151. operation.state_forwards(self.app_label, new_state)
  152. self.assertEqual(
  153. len(new_state.models[self.app_label, "pony"].options["indexes"]), 0
  154. )
  155. self.assertIndexExists(table_name, ["pink"])
  156. # Remove index.
  157. with connection.schema_editor(atomic=False) as editor:
  158. operation.database_forwards(
  159. self.app_label, editor, project_state, new_state
  160. )
  161. self.assertIndexNotExists(table_name, ["pink"])
  162. # Reversal.
  163. with connection.schema_editor(atomic=False) as editor:
  164. operation.database_backwards(
  165. self.app_label, editor, new_state, project_state
  166. )
  167. self.assertIndexExists(table_name, ["pink"])
  168. # Deconstruction.
  169. name, args, kwargs = operation.deconstruct()
  170. self.assertEqual(name, "RemoveIndexConcurrently")
  171. self.assertEqual(args, [])
  172. self.assertEqual(kwargs, {"model_name": "Pony", "name": "pony_pink_idx"})
  173. class NoMigrationRouter:
  174. def allow_migrate(self, db, app_label, **hints):
  175. return False
  176. @unittest.skipUnless(connection.vendor == "postgresql", "PostgreSQL specific tests.")
  177. class CreateExtensionTests(PostgreSQLTestCase):
  178. app_label = "test_allow_create_extention"
  179. @override_settings(DATABASE_ROUTERS=[NoMigrationRouter()])
  180. def test_no_allow_migrate(self):
  181. operation = CreateExtension("tablefunc")
  182. self.assertEqual(
  183. operation.formatted_description(), "+ Creates extension tablefunc"
  184. )
  185. project_state = ProjectState()
  186. new_state = project_state.clone()
  187. # Don't create an extension.
  188. with CaptureQueriesContext(connection) as captured_queries:
  189. with connection.schema_editor(atomic=False) as editor:
  190. operation.database_forwards(
  191. self.app_label, editor, project_state, new_state
  192. )
  193. self.assertEqual(len(captured_queries), 0)
  194. # Reversal.
  195. with CaptureQueriesContext(connection) as captured_queries:
  196. with connection.schema_editor(atomic=False) as editor:
  197. operation.database_backwards(
  198. self.app_label, editor, new_state, project_state
  199. )
  200. self.assertEqual(len(captured_queries), 0)
  201. def test_allow_migrate(self):
  202. operation = CreateExtension("tablefunc")
  203. self.assertEqual(
  204. operation.migration_name_fragment, "create_extension_tablefunc"
  205. )
  206. project_state = ProjectState()
  207. new_state = project_state.clone()
  208. # Create an extension.
  209. with CaptureQueriesContext(connection) as captured_queries:
  210. with connection.schema_editor(atomic=False) as editor:
  211. operation.database_forwards(
  212. self.app_label, editor, project_state, new_state
  213. )
  214. self.assertEqual(len(captured_queries), 4)
  215. self.assertIn("CREATE EXTENSION IF NOT EXISTS", captured_queries[1]["sql"])
  216. # Reversal.
  217. with CaptureQueriesContext(connection) as captured_queries:
  218. with connection.schema_editor(atomic=False) as editor:
  219. operation.database_backwards(
  220. self.app_label, editor, new_state, project_state
  221. )
  222. self.assertEqual(len(captured_queries), 2)
  223. self.assertIn("DROP EXTENSION IF EXISTS", captured_queries[1]["sql"])
  224. def test_create_existing_extension(self):
  225. operation = BloomExtension()
  226. self.assertEqual(operation.migration_name_fragment, "create_extension_bloom")
  227. project_state = ProjectState()
  228. new_state = project_state.clone()
  229. # Don't create an existing extension.
  230. with CaptureQueriesContext(connection) as captured_queries:
  231. with connection.schema_editor(atomic=False) as editor:
  232. operation.database_forwards(
  233. self.app_label, editor, project_state, new_state
  234. )
  235. self.assertEqual(len(captured_queries), 3)
  236. self.assertIn("SELECT", captured_queries[0]["sql"])
  237. def test_drop_nonexistent_extension(self):
  238. operation = CreateExtension("tablefunc")
  239. project_state = ProjectState()
  240. new_state = project_state.clone()
  241. # Don't drop a nonexistent extension.
  242. with CaptureQueriesContext(connection) as captured_queries:
  243. with connection.schema_editor(atomic=False) as editor:
  244. operation.database_backwards(
  245. self.app_label, editor, project_state, new_state
  246. )
  247. self.assertEqual(len(captured_queries), 1)
  248. self.assertIn("SELECT", captured_queries[0]["sql"])
  249. @unittest.skipUnless(connection.vendor == "postgresql", "PostgreSQL specific tests.")
  250. class CreateCollationTests(PostgreSQLTestCase):
  251. app_label = "test_allow_create_collation"
  252. @override_settings(DATABASE_ROUTERS=[NoMigrationRouter()])
  253. def test_no_allow_migrate(self):
  254. operation = CreateCollation("C_test", locale="C")
  255. project_state = ProjectState()
  256. new_state = project_state.clone()
  257. # Don't create a collation.
  258. with CaptureQueriesContext(connection) as captured_queries:
  259. with connection.schema_editor(atomic=False) as editor:
  260. operation.database_forwards(
  261. self.app_label, editor, project_state, new_state
  262. )
  263. self.assertEqual(len(captured_queries), 0)
  264. # Reversal.
  265. with CaptureQueriesContext(connection) as captured_queries:
  266. with connection.schema_editor(atomic=False) as editor:
  267. operation.database_backwards(
  268. self.app_label, editor, new_state, project_state
  269. )
  270. self.assertEqual(len(captured_queries), 0)
  271. def test_create(self):
  272. operation = CreateCollation("C_test", locale="C")
  273. self.assertEqual(operation.migration_name_fragment, "create_collation_c_test")
  274. self.assertEqual(operation.describe(), "Create collation C_test")
  275. self.assertEqual(operation.formatted_description(), "+ Create collation C_test")
  276. project_state = ProjectState()
  277. new_state = project_state.clone()
  278. # Create a collation.
  279. with CaptureQueriesContext(connection) as captured_queries:
  280. with connection.schema_editor(atomic=False) as editor:
  281. operation.database_forwards(
  282. self.app_label, editor, project_state, new_state
  283. )
  284. self.assertEqual(len(captured_queries), 1)
  285. self.assertIn("CREATE COLLATION", captured_queries[0]["sql"])
  286. # Creating the same collation raises an exception.
  287. with self.assertRaisesMessage(ProgrammingError, "already exists"):
  288. with connection.schema_editor(atomic=True) as editor:
  289. operation.database_forwards(
  290. self.app_label, editor, project_state, new_state
  291. )
  292. # Reversal.
  293. with CaptureQueriesContext(connection) as captured_queries:
  294. with connection.schema_editor(atomic=False) as editor:
  295. operation.database_backwards(
  296. self.app_label, editor, new_state, project_state
  297. )
  298. self.assertEqual(len(captured_queries), 1)
  299. self.assertIn("DROP COLLATION", captured_queries[0]["sql"])
  300. # Deconstruction.
  301. name, args, kwargs = operation.deconstruct()
  302. self.assertEqual(name, "CreateCollation")
  303. self.assertEqual(args, [])
  304. self.assertEqual(kwargs, {"name": "C_test", "locale": "C"})
  305. def test_create_non_deterministic_collation(self):
  306. operation = CreateCollation(
  307. "case_insensitive_test",
  308. "und-u-ks-level2",
  309. provider="icu",
  310. deterministic=False,
  311. )
  312. project_state = ProjectState()
  313. new_state = project_state.clone()
  314. # Create a collation.
  315. with CaptureQueriesContext(connection) as captured_queries:
  316. with connection.schema_editor(atomic=False) as editor:
  317. operation.database_forwards(
  318. self.app_label, editor, project_state, new_state
  319. )
  320. self.assertEqual(len(captured_queries), 1)
  321. self.assertIn("CREATE COLLATION", captured_queries[0]["sql"])
  322. # Reversal.
  323. with CaptureQueriesContext(connection) as captured_queries:
  324. with connection.schema_editor(atomic=False) as editor:
  325. operation.database_backwards(
  326. self.app_label, editor, new_state, project_state
  327. )
  328. self.assertEqual(len(captured_queries), 1)
  329. self.assertIn("DROP COLLATION", captured_queries[0]["sql"])
  330. # Deconstruction.
  331. name, args, kwargs = operation.deconstruct()
  332. self.assertEqual(name, "CreateCollation")
  333. self.assertEqual(args, [])
  334. self.assertEqual(
  335. kwargs,
  336. {
  337. "name": "case_insensitive_test",
  338. "locale": "und-u-ks-level2",
  339. "provider": "icu",
  340. "deterministic": False,
  341. },
  342. )
  343. def test_create_collation_alternate_provider(self):
  344. operation = CreateCollation(
  345. "german_phonebook_test",
  346. provider="icu",
  347. locale="de-u-co-phonebk",
  348. )
  349. project_state = ProjectState()
  350. new_state = project_state.clone()
  351. # Create an collation.
  352. with CaptureQueriesContext(connection) as captured_queries:
  353. with connection.schema_editor(atomic=False) as editor:
  354. operation.database_forwards(
  355. self.app_label, editor, project_state, new_state
  356. )
  357. self.assertEqual(len(captured_queries), 1)
  358. self.assertIn("CREATE COLLATION", captured_queries[0]["sql"])
  359. # Reversal.
  360. with CaptureQueriesContext(connection) as captured_queries:
  361. with connection.schema_editor(atomic=False) as editor:
  362. operation.database_backwards(
  363. self.app_label, editor, new_state, project_state
  364. )
  365. self.assertEqual(len(captured_queries), 1)
  366. self.assertIn("DROP COLLATION", captured_queries[0]["sql"])
  367. @unittest.skipUnless(connection.vendor == "postgresql", "PostgreSQL specific tests.")
  368. class RemoveCollationTests(PostgreSQLTestCase):
  369. app_label = "test_allow_remove_collation"
  370. @override_settings(DATABASE_ROUTERS=[NoMigrationRouter()])
  371. def test_no_allow_migrate(self):
  372. operation = RemoveCollation("C_test", locale="C")
  373. project_state = ProjectState()
  374. new_state = project_state.clone()
  375. # Don't create a collation.
  376. with CaptureQueriesContext(connection) as captured_queries:
  377. with connection.schema_editor(atomic=False) as editor:
  378. operation.database_forwards(
  379. self.app_label, editor, project_state, new_state
  380. )
  381. self.assertEqual(len(captured_queries), 0)
  382. # Reversal.
  383. with CaptureQueriesContext(connection) as captured_queries:
  384. with connection.schema_editor(atomic=False) as editor:
  385. operation.database_backwards(
  386. self.app_label, editor, new_state, project_state
  387. )
  388. self.assertEqual(len(captured_queries), 0)
  389. def test_remove(self):
  390. operation = CreateCollation("C_test", locale="C")
  391. project_state = ProjectState()
  392. new_state = project_state.clone()
  393. with connection.schema_editor(atomic=False) as editor:
  394. operation.database_forwards(
  395. self.app_label, editor, project_state, new_state
  396. )
  397. operation = RemoveCollation("C_test", locale="C")
  398. self.assertEqual(operation.migration_name_fragment, "remove_collation_c_test")
  399. self.assertEqual(operation.describe(), "Remove collation C_test")
  400. self.assertEqual(operation.formatted_description(), "- Remove collation C_test")
  401. project_state = ProjectState()
  402. new_state = project_state.clone()
  403. # Remove a collation.
  404. with CaptureQueriesContext(connection) as captured_queries:
  405. with connection.schema_editor(atomic=False) as editor:
  406. operation.database_forwards(
  407. self.app_label, editor, project_state, new_state
  408. )
  409. self.assertEqual(len(captured_queries), 1)
  410. self.assertIn("DROP COLLATION", captured_queries[0]["sql"])
  411. # Removing a nonexistent collation raises an exception.
  412. with self.assertRaisesMessage(ProgrammingError, "does not exist"):
  413. with connection.schema_editor(atomic=True) as editor:
  414. operation.database_forwards(
  415. self.app_label, editor, project_state, new_state
  416. )
  417. # Reversal.
  418. with CaptureQueriesContext(connection) as captured_queries:
  419. with connection.schema_editor(atomic=False) as editor:
  420. operation.database_backwards(
  421. self.app_label, editor, new_state, project_state
  422. )
  423. self.assertEqual(len(captured_queries), 1)
  424. self.assertIn("CREATE COLLATION", captured_queries[0]["sql"])
  425. # Deconstruction.
  426. name, args, kwargs = operation.deconstruct()
  427. self.assertEqual(name, "RemoveCollation")
  428. self.assertEqual(args, [])
  429. self.assertEqual(kwargs, {"name": "C_test", "locale": "C"})
  430. @unittest.skipUnless(connection.vendor == "postgresql", "PostgreSQL specific tests.")
  431. @modify_settings(INSTALLED_APPS={"append": "migrations"})
  432. class AddConstraintNotValidTests(OperationTestBase):
  433. app_label = "test_add_constraint_not_valid"
  434. def test_non_check_constraint_not_supported(self):
  435. constraint = UniqueConstraint(fields=["pink"], name="pony_pink_uniq")
  436. msg = "AddConstraintNotValid.constraint must be a check constraint."
  437. with self.assertRaisesMessage(TypeError, msg):
  438. AddConstraintNotValid(model_name="pony", constraint=constraint)
  439. def test_add(self):
  440. table_name = f"{self.app_label}_pony"
  441. constraint_name = "pony_pink_gte_check"
  442. constraint = CheckConstraint(check=Q(pink__gte=4), name=constraint_name)
  443. operation = AddConstraintNotValid("Pony", constraint=constraint)
  444. project_state, new_state = self.make_test_state(self.app_label, operation)
  445. self.assertEqual(
  446. operation.describe(),
  447. f"Create not valid constraint {constraint_name} on model Pony",
  448. )
  449. self.assertEqual(
  450. operation.formatted_description(),
  451. f"+ Create not valid constraint {constraint_name} on model Pony",
  452. )
  453. self.assertEqual(
  454. operation.migration_name_fragment,
  455. f"pony_{constraint_name}_not_valid",
  456. )
  457. self.assertEqual(
  458. len(new_state.models[self.app_label, "pony"].options["constraints"]),
  459. 1,
  460. )
  461. self.assertConstraintNotExists(table_name, constraint_name)
  462. Pony = new_state.apps.get_model(self.app_label, "Pony")
  463. self.assertEqual(len(Pony._meta.constraints), 1)
  464. Pony.objects.create(pink=2, weight=1.0)
  465. # Add constraint.
  466. with connection.schema_editor(atomic=True) as editor:
  467. operation.database_forwards(
  468. self.app_label, editor, project_state, new_state
  469. )
  470. msg = f'check constraint "{constraint_name}"'
  471. with self.assertRaisesMessage(IntegrityError, msg), transaction.atomic():
  472. Pony.objects.create(pink=3, weight=1.0)
  473. self.assertConstraintExists(table_name, constraint_name)
  474. # Reversal.
  475. with connection.schema_editor(atomic=True) as editor:
  476. operation.database_backwards(
  477. self.app_label, editor, project_state, new_state
  478. )
  479. self.assertConstraintNotExists(table_name, constraint_name)
  480. Pony.objects.create(pink=3, weight=1.0)
  481. # Deconstruction.
  482. name, args, kwargs = operation.deconstruct()
  483. self.assertEqual(name, "AddConstraintNotValid")
  484. self.assertEqual(args, [])
  485. self.assertEqual(kwargs, {"model_name": "Pony", "constraint": constraint})
  486. @unittest.skipUnless(connection.vendor == "postgresql", "PostgreSQL specific tests.")
  487. @modify_settings(INSTALLED_APPS={"append": "migrations"})
  488. class ValidateConstraintTests(OperationTestBase):
  489. app_label = "test_validate_constraint"
  490. def test_validate(self):
  491. constraint_name = "pony_pink_gte_check"
  492. constraint = CheckConstraint(check=Q(pink__gte=4), name=constraint_name)
  493. operation = AddConstraintNotValid("Pony", constraint=constraint)
  494. project_state, new_state = self.make_test_state(self.app_label, operation)
  495. Pony = new_state.apps.get_model(self.app_label, "Pony")
  496. obj = Pony.objects.create(pink=2, weight=1.0)
  497. # Add constraint.
  498. with connection.schema_editor(atomic=True) as editor:
  499. operation.database_forwards(
  500. self.app_label, editor, project_state, new_state
  501. )
  502. project_state = new_state
  503. new_state = new_state.clone()
  504. operation = ValidateConstraint("Pony", name=constraint_name)
  505. operation.state_forwards(self.app_label, new_state)
  506. self.assertEqual(
  507. operation.describe(),
  508. f"Validate constraint {constraint_name} on model Pony",
  509. )
  510. self.assertEqual(
  511. operation.formatted_description(),
  512. f"~ Validate constraint {constraint_name} on model Pony",
  513. )
  514. self.assertEqual(
  515. operation.migration_name_fragment,
  516. f"pony_validate_{constraint_name}",
  517. )
  518. # Validate constraint.
  519. with connection.schema_editor(atomic=True) as editor:
  520. msg = f'check constraint "{constraint_name}"'
  521. with self.assertRaisesMessage(IntegrityError, msg):
  522. operation.database_forwards(
  523. self.app_label, editor, project_state, new_state
  524. )
  525. obj.pink = 5
  526. obj.save()
  527. with connection.schema_editor(atomic=True) as editor:
  528. operation.database_forwards(
  529. self.app_label, editor, project_state, new_state
  530. )
  531. # Reversal is a noop.
  532. with connection.schema_editor() as editor:
  533. with self.assertNumQueries(0):
  534. operation.database_backwards(
  535. self.app_label, editor, new_state, project_state
  536. )
  537. # Deconstruction.
  538. name, args, kwargs = operation.deconstruct()
  539. self.assertEqual(name, "ValidateConstraint")
  540. self.assertEqual(args, [])
  541. self.assertEqual(kwargs, {"model_name": "Pony", "name": constraint_name})