test_operations.py 218 KB


  1. from django.core.exceptions import FieldDoesNotExist
  2. from django.db import IntegrityError, connection, migrations, models, transaction
  3. from django.db.migrations.migration import Migration
  4. from django.db.migrations.operations.fields import FieldOperation
  5. from django.db.migrations.state import ModelState, ProjectState
  6. from django.db.models.functions import Abs
  7. from django.db.transaction import atomic
  8. from django.test import SimpleTestCase, override_settings, skipUnlessDBFeature
  9. from django.test.utils import CaptureQueriesContext
  10. from .models import FoodManager, FoodQuerySet, UnicodeModel
  11. from .test_base import OperationTestBase
  class Mixin:
      """Empty plain-Python class used as a non-model entry in ``bases`` tuples."""
  14. class OperationTests(OperationTestBase):
  15. """
  16. Tests running the operations and making sure they do what they say they do.
  17. Each test looks at their state changing, and then their database operation -
  18. both forwards and backwards.
  19. """
  def test_create_model(self):
      """
      Exercise CreateModel: description, state, database, reversal and
      deconstruction.

      Most other tests use this operation as part of setup, so check failures
      here first.
      """
      create_pony = migrations.CreateModel(
          "Pony",
          [
              ("id", models.AutoField(primary_key=True)),
              ("pink", models.IntegerField(default=1)),
          ],
      )
      self.assertEqual(create_pony.describe(), "Create model Pony")
      self.assertEqual(create_pony.migration_name_fragment, "pony")
      # State: the new model is registered with both declared fields.
      before = ProjectState()
      after = before.clone()
      create_pony.state_forwards("test_crmo", after)
      pony_state = after.models["test_crmo", "pony"]
      self.assertEqual(pony_state.name, "Pony")
      self.assertEqual(len(pony_state.fields), 2)
      # Database: applying the operation creates the table...
      self.assertTableNotExists("test_crmo_pony")
      with connection.schema_editor() as schema_editor:
          create_pony.database_forwards("test_crmo", schema_editor, before, after)
      self.assertTableExists("test_crmo_pony")
      # ...and unapplying it removes the table again.
      with connection.schema_editor() as schema_editor:
          create_pony.database_backwards("test_crmo", schema_editor, after, before)
      self.assertTableNotExists("test_crmo_pony")
      # Deconstruction: class name, no positional args, keyword args.
      name, args, kwargs = create_pony.deconstruct()
      self.assertEqual(name, "CreateModel")
      self.assertEqual(args, [])
      self.assertEqual(sorted(kwargs), ["fields", "name"])
      # A single plain "objects" manager must not appear in the kwargs.
      create_foo = migrations.CreateModel(
          "Foo", fields=[], managers=[("objects", models.Manager())]
      )
      self.assertNotIn("managers", create_foo.deconstruct()[2])
  def test_create_model_with_duplicate_field_name(self):
      """CreateModel raises ValueError when a field name is listed twice."""
      expected_message = "Found duplicate value pink in CreateModel fields argument."
      fields_with_duplicate = [
          ("id", models.AutoField(primary_key=True)),
          ("pink", models.TextField()),
          ("pink", models.IntegerField(default=1)),
      ]
      with self.assertRaisesMessage(ValueError, expected_message):
          migrations.CreateModel("Pony", fields_with_duplicate)
  def test_create_model_with_duplicate_base(self):
      """
      CreateModel raises ValueError when ``bases`` names the same base twice.

      The cases cover string references (including ones differing only in
      case), model classes mixed with string references, ``models.Model``
      and a plain Python class.
      """
      pony_msg = "Found duplicate value test_crmo.pony in CreateModel bases argument."
      unicode_msg = (
          "Found duplicate value migrations.unicodemodel in CreateModel bases "
          "argument."
      )
      model_msg = (
          "Found duplicate value <class 'django.db.models.base.Model'> in "
          "CreateModel bases argument."
      )
      mixin_msg = (
          "Found duplicate value <class 'migrations.test_operations.Mixin'> in "
          "CreateModel bases argument."
      )
      # (expected message, bases) pairs, checked in this order.
      cases = [
          (pony_msg, ("test_crmo.Pony", "test_crmo.Pony")),
          (pony_msg, ("test_crmo.Pony", "test_crmo.pony")),
          (unicode_msg, (UnicodeModel, UnicodeModel)),
          (unicode_msg, (UnicodeModel, "migrations.unicodemodel")),
          (unicode_msg, (UnicodeModel, "migrations.UnicodeModel")),
          (model_msg, (models.Model, models.Model)),
          (mixin_msg, (Mixin, Mixin)),
      ]
      for expected_message, duplicate_bases in cases:
          with self.assertRaisesMessage(ValueError, expected_message):
              migrations.CreateModel("Pony", fields=[], bases=duplicate_bases)
  def test_create_model_with_duplicate_manager_name(self):
      """CreateModel raises ValueError when a manager name is listed twice."""
      expected_message = (
          "Found duplicate value objects in CreateModel managers argument."
      )
      managers_with_duplicate = [
          ("objects", models.Manager()),
          ("objects", models.Manager()),
      ]
      with self.assertRaisesMessage(ValueError, expected_message):
          migrations.CreateModel(
              "Pony", fields=[], managers=managers_with_duplicate
          )
  def test_create_model_with_unique_after(self):
      """
      Tests the CreateModel operation directly followed by an
      AlterUniqueTogether (bug #22844 - sqlite remake issues)
      """
      create_pony = migrations.CreateModel(
          "Pony",
          [
              ("id", models.AutoField(primary_key=True)),
              ("pink", models.IntegerField(default=1)),
          ],
      )
      create_rider = migrations.CreateModel(
          "Rider",
          [
              ("id", models.AutoField(primary_key=True)),
              ("number", models.IntegerField(default=1)),
              ("pony", models.ForeignKey("test_crmoua.Pony", models.CASCADE)),
          ],
      )
      make_unique = migrations.AlterUniqueTogether(
          "Rider",
          [
              ("number", "pony"),
          ],
      )
      self.assertTableNotExists("test_crmoua_pony")
      self.assertTableNotExists("test_crmoua_rider")
      # Run all three operations inside one schema editor, feeding each
      # operation the state produced by the previous one.
      from_state = ProjectState()
      with connection.schema_editor() as schema_editor:
          for step in (create_pony, create_rider, make_unique):
              to_state = from_state.clone()
              step.state_forwards("test_crmoua", to_state)
              step.database_forwards(
                  "test_crmoua", schema_editor, from_state, to_state
              )
              from_state = to_state
      self.assertTableExists("test_crmoua_pony")
      self.assertTableExists("test_crmoua_rider")
  def test_create_model_m2m(self):
      """
      Creating a model with a ManyToManyField also creates the auto-created
      "through" table, and both are removed on reversal.
      """
      old_state = self.set_up_test_model("test_crmomm")
      create_stable = migrations.CreateModel(
          "Stable",
          [
              ("id", models.AutoField(primary_key=True)),
              ("ponies", models.ManyToManyField("Pony", related_name="stables")),
          ],
      )
      # State.
      stable_state = old_state.clone()
      create_stable.state_forwards("test_crmomm", stable_state)
      # Database: the M2M lives in its own table, not in a column.
      self.assertTableNotExists("test_crmomm_stable_ponies")
      with connection.schema_editor() as schema_editor:
          create_stable.database_forwards(
              "test_crmomm", schema_editor, old_state, stable_state
          )
      self.assertTableExists("test_crmomm_stable")
      self.assertTableExists("test_crmomm_stable_ponies")
      self.assertColumnNotExists("test_crmomm_stable", "ponies")
      # The relation must be usable through the ORM.
      with atomic():
          pony_model = stable_state.apps.get_model("test_crmomm", "Pony")
          stable_model = stable_state.apps.get_model("test_crmomm", "Stable")
          barn = stable_model.objects.create()
          first_pony = pony_model.objects.create(pink=False, weight=4.55)
          second_pony = pony_model.objects.create(pink=True, weight=5.43)
          barn.ponies.add(first_pony, second_pony)
          self.assertEqual(barn.ponies.count(), 2)
          barn.ponies.all().delete()
      # Reversal drops both tables.
      with connection.schema_editor() as schema_editor:
          create_stable.database_backwards(
              "test_crmomm", schema_editor, stable_state, old_state
          )
      self.assertTableNotExists("test_crmomm_stable")
      self.assertTableNotExists("test_crmomm_stable_ponies")
  @skipUnlessDBFeature("supports_collation_on_charfield", "supports_foreign_keys")
  def test_create_fk_models_to_pk_field_db_collation(self):
      """
      A ForeignKey or OneToOneField column pointing at a primary key declared
      with db_collation is created with that same collation.
      """
      collation = connection.features.test_collations.get("non_default")
      if not collation:
          self.skipTest("Language collations are not supported.")
      app_label = "test_cfkmtopkfdbc"
      collated_pk = models.CharField(
          primary_key=True,
          max_length=10,
          db_collation=collation,
      )
      create_pony = migrations.CreateModel("Pony", [("id", collated_pk)])
      base_state = self.apply_operations(app_label, ProjectState(), [create_pony])

      # ForeignKey.
      create_rider = migrations.CreateModel(
          "Rider",
          [
              ("id", models.AutoField(primary_key=True)),
              ("pony", models.ForeignKey("Pony", models.CASCADE)),
          ],
      )
      rider_state = base_state.clone()
      create_rider.state_forwards(app_label, rider_state)
      with connection.schema_editor() as schema_editor:
          create_rider.database_forwards(
              app_label, schema_editor, base_state, rider_state
          )
      self.assertColumnCollation(f"{app_label}_rider", "pony_id", collation)
      # Undo before the next scenario.
      with connection.schema_editor() as schema_editor:
          create_rider.database_backwards(
              app_label, schema_editor, rider_state, base_state
          )

      # OneToOneField.
      create_shetland = migrations.CreateModel(
          "ShetlandPony",
          [
              (
                  "pony",
                  models.OneToOneField("Pony", models.CASCADE, primary_key=True),
              ),
              ("cuteness", models.IntegerField(default=1)),
          ],
      )
      shetland_state = base_state.clone()
      create_shetland.state_forwards(app_label, shetland_state)
      with connection.schema_editor() as schema_editor:
          create_shetland.database_forwards(
              app_label, schema_editor, base_state, shetland_state
          )
      self.assertColumnCollation(f"{app_label}_shetlandpony", "pony_id", collation)
      # Undo.
      with connection.schema_editor() as schema_editor:
          create_shetland.database_backwards(
              app_label, schema_editor, shetland_state, base_state
          )
  def test_create_model_inheritance(self):
      """
      CreateModel on a multi-table inheritance child creates and drops the
      child table.
      """
      parent_state = self.set_up_test_model("test_crmoih")
      parent_link = models.OneToOneField(
          "test_crmoih.Pony",
          models.CASCADE,
          auto_created=True,
          primary_key=True,
          to_field="id",
          serialize=False,
      )
      create_child = migrations.CreateModel(
          "ShetlandPony",
          [
              ("pony_ptr", parent_link),
              ("cuteness", models.IntegerField(default=1)),
          ],
      )
      # State.
      child_state = parent_state.clone()
      create_child.state_forwards("test_crmoih", child_state)
      self.assertIn(("test_crmoih", "shetlandpony"), child_state.models)
      # Database.
      self.assertTableNotExists("test_crmoih_shetlandpony")
      with connection.schema_editor() as schema_editor:
          create_child.database_forwards(
              "test_crmoih", schema_editor, parent_state, child_state
          )
      self.assertTableExists("test_crmoih_shetlandpony")
      # Reversal.
      with connection.schema_editor() as schema_editor:
          create_child.database_backwards(
              "test_crmoih", schema_editor, child_state, parent_state
          )
      self.assertTableNotExists("test_crmoih_shetlandpony")
  def test_create_proxy_model(self):
      """
      CreateModel ignores proxy models: the state gains the model but no
      table is created or dropped.
      """
      base_state = self.set_up_test_model("test_crprmo")
      create_proxy = migrations.CreateModel(
          "ProxyPony",
          [],
          options={"proxy": True},
          bases=("test_crprmo.Pony",),
      )
      self.assertEqual(create_proxy.describe(), "Create proxy model ProxyPony")
      # State.
      proxy_state = base_state.clone()
      create_proxy.state_forwards("test_crprmo", proxy_state)
      self.assertIn(("test_crprmo", "proxypony"), proxy_state.models)
      # Database: nothing changes going forwards...
      self.assertTableNotExists("test_crprmo_proxypony")
      self.assertTableExists("test_crprmo_pony")
      with connection.schema_editor() as schema_editor:
          create_proxy.database_forwards(
              "test_crprmo", schema_editor, base_state, proxy_state
          )
      self.assertTableNotExists("test_crprmo_proxypony")
      self.assertTableExists("test_crprmo_pony")
      # ...nor going backwards.
      with connection.schema_editor() as schema_editor:
          create_proxy.database_backwards(
              "test_crprmo", schema_editor, proxy_state, base_state
          )
      self.assertTableNotExists("test_crprmo_proxypony")
      self.assertTableExists("test_crprmo_pony")
      # Deconstruction.
      name, args, kwargs = create_proxy.deconstruct()
      self.assertEqual(name, "CreateModel")
      self.assertEqual(args, [])
      self.assertEqual(sorted(kwargs), ["bases", "fields", "name", "options"])
  def test_create_unmanaged_model(self):
      """
      CreateModel ignores unmanaged models.

      The model is declared with ``managed=False`` so that the unmanaged
      path is what gets exercised; the state must gain the model while the
      database is left untouched in both directions.
      """
      project_state = self.set_up_test_model("test_crummo")
      # Test the state alteration
      operation = migrations.CreateModel(
          "UnmanagedPony",
          [],
          # Previously {"proxy": True}, which only repeated the proxy-model
          # test and never covered unmanaged models.
          options={"managed": False},
          bases=("test_crummo.Pony",),
      )
      self.assertEqual(operation.describe(), "Create model UnmanagedPony")
      new_state = project_state.clone()
      operation.state_forwards("test_crummo", new_state)
      self.assertIn(("test_crummo", "unmanagedpony"), new_state.models)
      # Test the database alteration
      self.assertTableNotExists("test_crummo_unmanagedpony")
      self.assertTableExists("test_crummo_pony")
      with connection.schema_editor() as editor:
          operation.database_forwards("test_crummo", editor, project_state, new_state)
      self.assertTableNotExists("test_crummo_unmanagedpony")
      self.assertTableExists("test_crummo_pony")
      # And test reversal
      with connection.schema_editor() as editor:
          operation.database_backwards(
              "test_crummo", editor, new_state, project_state
          )
      self.assertTableNotExists("test_crummo_unmanagedpony")
      self.assertTableExists("test_crummo_pony")
  @skipUnlessDBFeature("supports_table_check_constraints")
  def test_create_model_with_constraint(self):
      """
      CreateModel with a CheckConstraint in its options creates a table that
      enforces the constraint.
      """
      pink_gt_2 = models.CheckConstraint(
          check=models.Q(pink__gt=2), name="test_constraint_pony_pink_gt_2"
      )
      create_pony = migrations.CreateModel(
          "Pony",
          [
              ("id", models.AutoField(primary_key=True)),
              ("pink", models.IntegerField(default=3)),
          ],
          options={"constraints": [pink_gt_2]},
      )
      # State: exactly one constraint is recorded on the model.
      empty_state = ProjectState()
      pony_state = empty_state.clone()
      create_pony.state_forwards("test_crmo", pony_state)
      recorded = pony_state.models["test_crmo", "pony"].options["constraints"]
      self.assertEqual(len(recorded), 1)
      # Database: a row violating the check is rejected.
      self.assertTableNotExists("test_crmo_pony")
      with connection.schema_editor() as schema_editor:
          create_pony.database_forwards(
              "test_crmo", schema_editor, empty_state, pony_state
          )
      self.assertTableExists("test_crmo_pony")
      with connection.cursor() as cursor, self.assertRaises(IntegrityError):
          cursor.execute("INSERT INTO test_crmo_pony (id, pink) VALUES (1, 1)")
      # Reversal.
      with connection.schema_editor() as schema_editor:
          create_pony.database_backwards(
              "test_crmo", schema_editor, pony_state, empty_state
          )
      self.assertTableNotExists("test_crmo_pony")
      # Deconstruction keeps the constraint in the options.
      name, args, kwargs = create_pony.deconstruct()
      self.assertEqual(name, "CreateModel")
      self.assertEqual(args, [])
      self.assertEqual(kwargs["options"]["constraints"], [pink_gt_2])
  @skipUnlessDBFeature("supports_table_check_constraints")
  def test_create_model_with_boolean_expression_in_check_constraint(self):
      """
      CreateModel accepts check constraints built from boolean expressions
      (RawSQL and ExpressionWrapper), and the resulting table enforces them.
      """
      app_label = "test_crmobechc"
      rawsql_constraint = models.CheckConstraint(
          check=models.expressions.RawSQL(
              "price < %s", (1000,), output_field=models.BooleanField()
          ),
          name=f"{app_label}_price_lt_1000_raw",
      )
      wrapper_constraint = models.CheckConstraint(
          check=models.expressions.ExpressionWrapper(
              models.Q(price__gt=500) | models.Q(price__lt=500),
              output_field=models.BooleanField(),
          ),
          name=f"{app_label}_price_neq_500_wrap",
      )
      operation = migrations.CreateModel(
          "Product",
          [
              ("id", models.AutoField(primary_key=True)),
              ("price", models.IntegerField(null=True)),
          ],
          options={"constraints": [rawsql_constraint, wrapper_constraint]},
      )

      project_state = ProjectState()
      new_state = project_state.clone()
      operation.state_forwards(app_label, new_state)
      # Add table.
      # Check the actual table name: the bare app label never names a table,
      # so asserting on it would pass regardless of the database state.
      self.assertTableNotExists(f"{app_label}_product")
      with connection.schema_editor() as editor:
          operation.database_forwards(app_label, editor, project_state, new_state)
      self.assertTableExists(f"{app_label}_product")
      insert_sql = f"INSERT INTO {app_label}_product (id, price) VALUES (%d, %d)"
      with connection.cursor() as cursor:
          # price < 1000 (raw SQL constraint).
          with self.assertRaises(IntegrityError):
              cursor.execute(insert_sql % (1, 1000))
          cursor.execute(insert_sql % (1, 999))
          # price != 500 (wrapped Q constraint).
          with self.assertRaises(IntegrityError):
              cursor.execute(insert_sql % (2, 500))
          cursor.execute(insert_sql % (2, 499))
  def test_create_model_with_partial_unique_constraint(self):
      """
      CreateModel with a conditional UniqueConstraint: uniqueness of ``pink``
      is only enforced for rows matching the condition, and only on backends
      reporting ``supports_partial_indexes``.
      """
      unique_pink_when_heavy = models.UniqueConstraint(
          fields=["pink"],
          condition=models.Q(weight__gt=5),
          name="test_constraint_pony_pink_for_weight_gt_5_uniq",
      )
      create_pony = migrations.CreateModel(
          "Pony",
          [
              ("id", models.AutoField(primary_key=True)),
              ("pink", models.IntegerField(default=3)),
              ("weight", models.FloatField()),
          ],
          options={"constraints": [unique_pink_when_heavy]},
      )
      # State.
      empty_state = ProjectState()
      pony_state = empty_state.clone()
      create_pony.state_forwards("test_crmo", pony_state)
      recorded = pony_state.models["test_crmo", "pony"].options["constraints"]
      self.assertEqual(len(recorded), 1)
      # Database.
      self.assertTableNotExists("test_crmo_pony")
      with connection.schema_editor() as schema_editor:
          create_pony.database_forwards(
              "test_crmo", schema_editor, empty_state, pony_state
          )
      self.assertTableExists("test_crmo_pony")
      # Two light ponies may share a colour; the first heavy one is fine too.
      pony_model = pony_state.apps.get_model("test_crmo", "Pony")
      for weight in (4.0, 4.0, 6.0):
          pony_model.objects.create(pink=1, weight=weight)
      # A second heavy pony with the same colour is rejected only where
      # partial indexes are supported.
      if connection.features.supports_partial_indexes:
          with self.assertRaises(IntegrityError):
              pony_model.objects.create(pink=1, weight=7.0)
      else:
          pony_model.objects.create(pink=1, weight=7.0)
      # Reversal.
      with connection.schema_editor() as schema_editor:
          create_pony.database_backwards(
              "test_crmo", schema_editor, pony_state, empty_state
          )
      self.assertTableNotExists("test_crmo_pony")
      # Deconstruction.
      name, args, kwargs = create_pony.deconstruct()
      self.assertEqual(name, "CreateModel")
      self.assertEqual(args, [])
      self.assertEqual(kwargs["options"]["constraints"], [unique_pink_when_heavy])
  def test_create_model_with_deferred_unique_constraint(self):
      """
      CreateModel with a deferrable UniqueConstraint: on supporting backends
      the check is deferred to the end of the transaction unless switched
      to IMMEDIATE; elsewhere duplicates are accepted.
      """
      deferred_unique = models.UniqueConstraint(
          fields=["pink"],
          name="deferrable_pink_constraint",
          deferrable=models.Deferrable.DEFERRED,
      )
      create_pony = migrations.CreateModel(
          "Pony",
          [
              ("id", models.AutoField(primary_key=True)),
              ("pink", models.IntegerField(default=3)),
          ],
          options={"constraints": [deferred_unique]},
      )
      empty_state = ProjectState()
      pony_state = empty_state.clone()
      create_pony.state_forwards("test_crmo", pony_state)
      recorded = pony_state.models["test_crmo", "pony"].options["constraints"]
      self.assertEqual(len(recorded), 1)
      self.assertTableNotExists("test_crmo_pony")
      # Create table.
      with connection.schema_editor() as schema_editor:
          create_pony.database_forwards(
              "test_crmo", schema_editor, empty_state, pony_state
          )
      self.assertTableExists("test_crmo_pony")
      pony_model = pony_state.apps.get_model("test_crmo", "Pony")
      pony_model.objects.create(pink=1)
      if connection.features.supports_deferrable_unique_constraints:
          # Deferred: a temporary duplicate is fine if fixed before commit.
          with transaction.atomic():
              duplicate = pony_model.objects.create(pink=1)
              duplicate.pink = 2
              duplicate.save()
          # With SET CONSTRAINTS ... IMMEDIATE the duplicate fails at once.
          with self.assertRaises(IntegrityError):
              with transaction.atomic(), connection.cursor() as cursor:
                  constraint_name = connection.ops.quote_name(deferred_unique.name)
                  cursor.execute("SET CONSTRAINTS %s IMMEDIATE" % constraint_name)
                  duplicate = pony_model.objects.create(pink=1)
                  duplicate.pink = 3
                  duplicate.save()
      else:
          pony_model.objects.create(pink=1)
      # Reversal.
      with connection.schema_editor() as schema_editor:
          create_pony.database_backwards(
              "test_crmo", schema_editor, pony_state, empty_state
          )
      self.assertTableNotExists("test_crmo_pony")
      # Deconstruction.
      name, args, kwargs = create_pony.deconstruct()
      self.assertEqual(name, "CreateModel")
      self.assertEqual(args, [])
      self.assertEqual(kwargs["options"]["constraints"], [deferred_unique])
  @skipUnlessDBFeature("supports_covering_indexes")
  def test_create_model_with_covering_unique_constraint(self):
      """
      CreateModel with a UniqueConstraint that has ``include`` columns still
      enforces uniqueness on the constrained field.
      """
      covering_unique = models.UniqueConstraint(
          fields=["pink"],
          include=["weight"],
          name="test_constraint_pony_pink_covering_weight",
      )
      create_pony = migrations.CreateModel(
          "Pony",
          [
              ("id", models.AutoField(primary_key=True)),
              ("pink", models.IntegerField(default=3)),
              ("weight", models.FloatField()),
          ],
          options={"constraints": [covering_unique]},
      )
      empty_state = ProjectState()
      pony_state = empty_state.clone()
      create_pony.state_forwards("test_crmo", pony_state)
      recorded = pony_state.models["test_crmo", "pony"].options["constraints"]
      self.assertEqual(len(recorded), 1)
      self.assertTableNotExists("test_crmo_pony")
      # Create table.
      with connection.schema_editor() as schema_editor:
          create_pony.database_forwards(
              "test_crmo", schema_editor, empty_state, pony_state
          )
      self.assertTableExists("test_crmo_pony")
      # Same "pink" with a different "weight" is still a duplicate.
      pony_model = pony_state.apps.get_model("test_crmo", "Pony")
      pony_model.objects.create(pink=1, weight=4.0)
      with self.assertRaises(IntegrityError):
          pony_model.objects.create(pink=1, weight=7.0)
      # Reversal.
      with connection.schema_editor() as schema_editor:
          create_pony.database_backwards(
              "test_crmo", schema_editor, pony_state, empty_state
          )
      self.assertTableNotExists("test_crmo_pony")
      # Deconstruction.
      name, args, kwargs = create_pony.deconstruct()
      self.assertEqual(name, "CreateModel")
      self.assertEqual(args, [])
      self.assertEqual(kwargs["options"]["constraints"], [covering_unique])
  def test_create_model_managers(self):
      """
      Managers passed to CreateModel end up on the model state, in order and
      with their constructor arguments.
      """
      base_state = self.set_up_test_model("test_cmoma")
      create_food = migrations.CreateModel(
          "Food",
          fields=[
              ("id", models.AutoField(primary_key=True)),
          ],
          managers=[
              ("food_qs", FoodQuerySet.as_manager()),
              ("food_mgr", FoodManager("a", "b")),
              ("food_mgr_kwargs", FoodManager("x", "y", 3, 4)),
          ],
      )
      self.assertEqual(create_food.describe(), "Create model Food")
      # State.
      food_state = base_state.clone()
      create_food.state_forwards("test_cmoma", food_state)
      self.assertIn(("test_cmoma", "food"), food_state.models)
      managers = food_state.models["test_cmoma", "food"].managers
      # Queryset-derived manager.
      qs_name, qs_manager = managers[0]
      self.assertEqual(qs_name, "food_qs")
      self.assertIsInstance(qs_manager, models.Manager)
      # FoodManager built from two positional arguments.
      mgr_name, mgr = managers[1]
      self.assertEqual(mgr_name, "food_mgr")
      self.assertIsInstance(mgr, FoodManager)
      self.assertEqual(mgr.args, ("a", "b", 1, 2))
      # FoodManager built from four positional arguments.
      full_name, full_mgr = managers[2]
      self.assertEqual(full_name, "food_mgr_kwargs")
      self.assertIsInstance(full_mgr, FoodManager)
      self.assertEqual(full_mgr.args, ("x", "y", 3, 4))
  def test_delete_model(self):
      """
      Exercise DeleteModel: description, state, database, reversal and
      deconstruction.
      """
      pony_state = self.set_up_test_model("test_dlmo")
      delete_pony = migrations.DeleteModel("Pony")
      self.assertEqual(delete_pony.describe(), "Delete model Pony")
      self.assertEqual(delete_pony.migration_name_fragment, "delete_pony")
      # State: the model disappears.
      deleted_state = pony_state.clone()
      delete_pony.state_forwards("test_dlmo", deleted_state)
      self.assertNotIn(("test_dlmo", "pony"), deleted_state.models)
      # Database: the table is dropped...
      self.assertTableExists("test_dlmo_pony")
      with connection.schema_editor() as schema_editor:
          delete_pony.database_forwards(
              "test_dlmo", schema_editor, pony_state, deleted_state
          )
      self.assertTableNotExists("test_dlmo_pony")
      # ...and recreated on reversal.
      with connection.schema_editor() as schema_editor:
          delete_pony.database_backwards(
              "test_dlmo", schema_editor, deleted_state, pony_state
          )
      self.assertTableExists("test_dlmo_pony")
      # Deconstruction.
      name, args, kwargs = delete_pony.deconstruct()
      self.assertEqual(name, "DeleteModel")
      self.assertEqual(args, [])
      self.assertEqual(list(kwargs), ["name"])
  def test_delete_proxy_model(self):
      """
      DeleteModel on a proxy model removes it from the state but leaves the
      database untouched.
      """
      with_proxy = self.set_up_test_model("test_dlprmo", proxy_model=True)
      delete_proxy = migrations.DeleteModel("ProxyPony")
      # State: only the new state loses the proxy.
      without_proxy = with_proxy.clone()
      delete_proxy.state_forwards("test_dlprmo", without_proxy)
      self.assertIn(("test_dlprmo", "proxypony"), with_proxy.models)
      self.assertNotIn(("test_dlprmo", "proxypony"), without_proxy.models)
      # Database: tables are unchanged going forwards...
      self.assertTableExists("test_dlprmo_pony")
      self.assertTableNotExists("test_dlprmo_proxypony")
      with connection.schema_editor() as schema_editor:
          delete_proxy.database_forwards(
              "test_dlprmo", schema_editor, with_proxy, without_proxy
          )
      self.assertTableExists("test_dlprmo_pony")
      self.assertTableNotExists("test_dlprmo_proxypony")
      # ...and going backwards.
      with connection.schema_editor() as schema_editor:
          delete_proxy.database_backwards(
              "test_dlprmo", schema_editor, without_proxy, with_proxy
          )
      self.assertTableExists("test_dlprmo_pony")
      self.assertTableNotExists("test_dlprmo_proxypony")
  def test_delete_mti_model(self):
      """
      DeleteModel on a multi-table inheritance child drops only the child
      table and restores it, including the parent link column, on reversal.
      """
      with_child = self.set_up_test_model("test_dlmtimo", mti_model=True)
      delete_child = migrations.DeleteModel("ShetlandPony")
      # State: only the new state loses the child model.
      without_child = with_child.clone()
      delete_child.state_forwards("test_dlmtimo", without_child)
      self.assertIn(("test_dlmtimo", "shetlandpony"), with_child.models)
      self.assertNotIn(("test_dlmtimo", "shetlandpony"), without_child.models)
      # Database: the child table goes away, the parent stays.
      self.assertTableExists("test_dlmtimo_pony")
      self.assertTableExists("test_dlmtimo_shetlandpony")
      self.assertColumnExists("test_dlmtimo_shetlandpony", "pony_ptr_id")
      with connection.schema_editor() as schema_editor:
          delete_child.database_forwards(
              "test_dlmtimo", schema_editor, with_child, without_child
          )
      self.assertTableExists("test_dlmtimo_pony")
      self.assertTableNotExists("test_dlmtimo_shetlandpony")
      # Reversal: the child table and its parent link column come back.
      with connection.schema_editor() as schema_editor:
          delete_child.database_backwards(
              "test_dlmtimo", schema_editor, without_child, with_child
          )
      self.assertTableExists("test_dlmtimo_pony")
      self.assertTableExists("test_dlmtimo_shetlandpony")
      self.assertColumnExists("test_dlmtimo_shetlandpony", "pony_ptr_id")
  def test_rename_model(self):
      """
      Exercise RenameModel: the state entry, the table and the incoming
      foreign key all move from Pony to Horse, and move back on reversal.
      """
      pony_state = self.set_up_test_model("test_rnmo", related_model=True)
      rename = migrations.RenameModel("Pony", "Horse")
      self.assertEqual(rename.describe(), "Rename model Pony to Horse")
      self.assertEqual(rename.migration_name_fragment, "rename_pony_horse")
      check_fks = connection.features.supports_foreign_keys
      # Starting point: only Pony exists, and Rider points at it.
      self.assertIn(("test_rnmo", "pony"), pony_state.models)
      self.assertNotIn(("test_rnmo", "horse"), pony_state.models)
      self.assertTableExists("test_rnmo_pony")
      self.assertTableNotExists("test_rnmo_horse")
      if check_fks:
          self.assertFKExists(
              "test_rnmo_rider", ["pony_id"], ("test_rnmo_pony", "id")
          )
          self.assertFKNotExists(
              "test_rnmo_rider", ["pony_id"], ("test_rnmo_horse", "id")
          )
      # Forwards.
      atomic_rename = connection.features.supports_atomic_references_rename
      horse_state = self.apply_operations(
          "test_rnmo", pony_state.clone(), [rename], atomic=atomic_rename
      )
      self.assertNotIn(("test_rnmo", "pony"), horse_state.models)
      self.assertIn(("test_rnmo", "horse"), horse_state.models)
      # RenameModel also repoints all incoming FKs and M2Ms
      rider_fk = horse_state.models["test_rnmo", "rider"].fields["pony"]
      self.assertEqual(rider_fk.remote_field.model, "test_rnmo.Horse")
      self.assertTableNotExists("test_rnmo_pony")
      self.assertTableExists("test_rnmo_horse")
      if check_fks:
          self.assertFKNotExists(
              "test_rnmo_rider", ["pony_id"], ("test_rnmo_pony", "id")
          )
          self.assertFKExists(
              "test_rnmo_rider", ["pony_id"], ("test_rnmo_horse", "id")
          )
      # Backwards.
      restored_state = self.unapply_operations(
          "test_rnmo", pony_state, [rename], atomic=atomic_rename
      )
      self.assertIn(("test_rnmo", "pony"), restored_state.models)
      self.assertNotIn(("test_rnmo", "horse"), restored_state.models)
      restored_fk = restored_state.models["test_rnmo", "rider"].fields["pony"]
      self.assertEqual(restored_fk.remote_field.model, "Pony")
      self.assertTableExists("test_rnmo_pony")
      self.assertTableNotExists("test_rnmo_horse")
      if check_fks:
          self.assertFKExists(
              "test_rnmo_rider", ["pony_id"], ("test_rnmo_pony", "id")
          )
          self.assertFKNotExists(
              "test_rnmo_rider", ["pony_id"], ("test_rnmo_horse", "id")
          )
      # Deconstruction.
      name, args, kwargs = rename.deconstruct()
      self.assertEqual(name, "RenameModel")
      self.assertEqual(args, [])
      self.assertEqual(kwargs, {"old_name": "Pony", "new_name": "Horse"})
  def test_rename_model_state_forwards(self):
      """
      RenameModel.state_forwards() must not cause the rendered apps to be
      cached on a state that had none, and must leave an already cached
      apps registry attached to the state.
      """
      app_label = "migrations"
      project_state = ProjectState()
      project_state.add_model(ModelState(app_label, "Foo", []))
      # Rename on a state whose apps have never been accessed.
      rename = migrations.RenameModel("Foo", "Bar")
      rename.state_forwards(app_label, project_state)
      self.assertNotIn("apps", project_state.__dict__)
      self.assertNotIn((app_label, "foo"), project_state.models)
      self.assertIn((app_label, "bar"), project_state.models)
      # Access the apps first, then rename back: the very same registry
      # object must still be returned by the state afterwards.
      cached_apps = project_state.apps
      rename_back = migrations.RenameModel("Bar", "Foo")
      rename_back.state_forwards(app_label, project_state)
      self.assertIs(project_state.apps, cached_apps)
      self.assertNotIn((app_label, "bar"), project_state.models)
      self.assertIn((app_label, "foo"), project_state.models)
  def test_rename_model_with_self_referential_fk(self):
      """
      RenameModel on a model carrying a foreign key to itself: the state,
      the table and the FK constraint are checked before the rename, after
      applying it, and after reversing it.
      """
      app_label = "test_rmwsrf"
      old_state = self.set_up_test_model(app_label, related_model=True)
      # State alteration.
      rename = migrations.RenameModel("Rider", "HorseRider")
      self.assertEqual(rename.describe(), "Rename model Rider to HorseRider")
      renamed_state = old_state.clone()
      rename.state_forwards(app_label, renamed_state)
      self.assertNotIn((app_label, "rider"), renamed_state.models)
      self.assertIn((app_label, "horserider"), renamed_state.models)
      # In the model state the "friend" field still refers to "self".
      friend = renamed_state.models[app_label, "horserider"].fields["friend"]
      self.assertEqual("self", friend.remote_field.model)
      # On the rendered model, the "horserider" relation targets the renamed
      # class itself.
      HorseRider = renamed_state.apps.get_model(app_label, "horserider")
      relation = HorseRider._meta.get_field("horserider")
      self.assertIs(relation.remote_field.model, HorseRider)
      # Database before the rename.
      rider_pk = ("test_rmwsrf_rider", "id")
      horserider_pk = ("test_rmwsrf_horserider", "id")
      self.assertTableExists("test_rmwsrf_rider")
      self.assertTableNotExists("test_rmwsrf_horserider")
      if connection.features.supports_foreign_keys:
          self.assertFKExists("test_rmwsrf_rider", ["friend_id"], rider_pk)
          self.assertFKNotExists("test_rmwsrf_rider", ["friend_id"], horserider_pk)
      # Forwards.
      use_atomic = connection.features.supports_atomic_references_rename
      with connection.schema_editor(atomic=use_atomic) as schema_editor:
          rename.database_forwards(
              app_label, schema_editor, old_state, renamed_state
          )
      self.assertTableNotExists("test_rmwsrf_rider")
      self.assertTableExists("test_rmwsrf_horserider")
      if connection.features.supports_foreign_keys:
          self.assertFKNotExists(
              "test_rmwsrf_horserider", ["friend_id"], rider_pk
          )
          self.assertFKExists(
              "test_rmwsrf_horserider", ["friend_id"], horserider_pk
          )
      # Backwards.
      with connection.schema_editor(atomic=use_atomic) as schema_editor:
          rename.database_backwards(
              app_label, schema_editor, renamed_state, old_state
          )
      self.assertTableExists("test_rmwsrf_rider")
      self.assertTableNotExists("test_rmwsrf_horserider")
      if connection.features.supports_foreign_keys:
          self.assertFKExists("test_rmwsrf_rider", ["friend_id"], rider_pk)
          self.assertFKNotExists("test_rmwsrf_rider", ["friend_id"], horserider_pk)
  def test_rename_model_with_superclass_fk(self):
      """
      RenameModel on a model (ShetlandPony) which has a superclass that has
      a foreign key: Rider's foreign key must keep pointing at the pony
      table while only the renamed model's table changes name.
      """
      app_label = "test_rmwsc"
      old_state = self.set_up_test_model(
          app_label, related_model=True, mti_model=True
      )
      # State alteration.
      rename = migrations.RenameModel("ShetlandPony", "LittleHorse")
      self.assertEqual(
          rename.describe(), "Rename model ShetlandPony to LittleHorse"
      )
      renamed_state = old_state.clone()
      rename.state_forwards(app_label, renamed_state)
      self.assertNotIn((app_label, "shetlandpony"), renamed_state.models)
      self.assertIn((app_label, "littlehorse"), renamed_state.models)
      # Only local relations are repointed, not the superclass's ones, so
      # Rider.pony must reference the same model in both states.
      target_before = (
          old_state.models[app_label, "rider"].fields["pony"].remote_field.model
      )
      target_after = (
          renamed_state.models[app_label, "rider"].fields["pony"].remote_field.model
      )
      self.assertEqual(target_before, target_after)
      # Prior to the migration there is a Shetland Pony table and no Little
      # Horse table.
      pony_pk = ("test_rmwsc_pony", "id")
      self.assertTableExists("test_rmwsc_shetlandpony")
      self.assertTableNotExists("test_rmwsc_littlehorse")
      if connection.features.supports_foreign_keys:
          # Rider's foreign key targets pony rather than shetland pony.
          self.assertFKExists("test_rmwsc_rider", ["pony_id"], pony_pk)
          self.assertFKNotExists(
              "test_rmwsc_rider", ["pony_id"], ("test_rmwsc_shetlandpony", "id")
          )
      use_atomic = connection.features.supports_atomic_references_rename
      with connection.schema_editor(atomic=use_atomic) as schema_editor:
          rename.database_forwards(
              app_label, schema_editor, old_state, renamed_state
          )
      # Afterwards the tables are swapped: little horse exists, shetland
      # pony does not.
      self.assertTableNotExists("test_rmwsc_shetlandpony")
      self.assertTableExists("test_rmwsc_littlehorse")
      if connection.features.supports_foreign_keys:
          # The foreign key still targets pony, not little horse.
          self.assertFKExists("test_rmwsc_rider", ["pony_id"], pony_pk)
          self.assertFKNotExists(
              "test_rmwsc_rider", ["pony_id"], ("test_rmwsc_littlehorse", "id")
          )
  def test_rename_model_with_self_referential_m2m(self):
      """
      After renaming a model that has a ManyToManyField to "self", an
      instance of the renamed model can be created and added to its own
      relation.
      """
      app_label = "test_rename_model_with_self_referential_m2m"
      create_model = migrations.CreateModel(
          "ReflexivePony",
          fields=[
              ("id", models.AutoField(primary_key=True)),
              ("ponies", models.ManyToManyField("self")),
          ],
      )
      state = self.apply_operations(
          app_label, ProjectState(), operations=[create_model]
      )
      rename_model = migrations.RenameModel("ReflexivePony", "ReflexivePony2")
      state = self.apply_operations(
          app_label,
          state,
          operations=[rename_model],
          atomic=connection.features.supports_atomic_references_rename,
      )
      ReflexivePony2 = state.apps.get_model(app_label, "ReflexivePony2")
      instance = ReflexivePony2.objects.create()
      instance.ponies.add(instance)
  def test_rename_model_with_m2m(self):
      """
      Renaming the model that declares a ManyToManyField keeps the rows
      created before the rename and leaves the relation usable afterwards.
      """
      app_label = "test_rename_model_with_m2m"
      create_rider = migrations.CreateModel(
          "Rider",
          fields=[("id", models.AutoField(primary_key=True))],
      )
      create_pony = migrations.CreateModel(
          "Pony",
          fields=[
              ("id", models.AutoField(primary_key=True)),
              ("riders", models.ManyToManyField("Rider")),
          ],
      )
      state = self.apply_operations(
          app_label, ProjectState(), operations=[create_rider, create_pony]
      )
      # One pony/rider pair linked before the rename.
      OldPony = state.apps.get_model(app_label, "Pony")
      OldRider = state.apps.get_model(app_label, "Rider")
      first_pony = OldPony.objects.create()
      first_rider = OldRider.objects.create()
      first_pony.riders.add(first_rider)
      rename_pony = migrations.RenameModel("Pony", "Pony2")
      state = self.apply_operations(
          app_label,
          state,
          operations=[rename_pony],
          atomic=connection.features.supports_atomic_references_rename,
      )
      # A second pair linked through the renamed model.
      Pony2 = state.apps.get_model(app_label, "Pony2")
      Rider = state.apps.get_model(app_label, "Rider")
      second_pony = Pony2.objects.create()
      second_rider = Rider.objects.create()
      second_pony.riders.add(second_rider)
      self.assertEqual(Pony2.objects.count(), 2)
      self.assertEqual(Rider.objects.count(), 2)
      through = Pony2._meta.get_field("riders").remote_field.through
      self.assertEqual(through.objects.count(), 2)
  def test_rename_model_with_db_table_noop(self):
      """
      RenameModel must run no queries, forwards or backwards, when the
      renamed model declares an explicit db_table.
      """
      app_label = "test_rmwdbtn"
      create_rider = migrations.CreateModel(
          "Rider",
          fields=[("id", models.AutoField(primary_key=True))],
          options={"db_table": "rider"},
      )
      create_pony = migrations.CreateModel(
          "Pony",
          fields=[
              ("id", models.AutoField(primary_key=True)),
              ("rider", models.ForeignKey("%s.Rider" % app_label, models.CASCADE)),
          ],
      )
      old_state = self.apply_operations(
          app_label, ProjectState(), operations=[create_rider, create_pony]
      )
      renamed_state = old_state.clone()
      rename = migrations.RenameModel("Rider", "Runner")
      rename.state_forwards(app_label, renamed_state)
      # The query counter is entered inside the schema editor, as in a
      # nested pair of "with" statements.
      with connection.schema_editor() as schema_editor, self.assertNumQueries(0):
          rename.database_forwards(
              app_label, schema_editor, old_state, renamed_state
          )
      with connection.schema_editor() as schema_editor, self.assertNumQueries(0):
          rename.database_backwards(
              app_label, schema_editor, renamed_state, old_state
          )
  def test_rename_m2m_target_model(self):
      """
      Renaming the model a ManyToManyField points to keeps the rows created
      before the rename and leaves the relation usable afterwards.
      """
      app_label = "test_rename_m2m_target_model"
      create_rider = migrations.CreateModel(
          "Rider",
          fields=[("id", models.AutoField(primary_key=True))],
      )
      create_pony = migrations.CreateModel(
          "Pony",
          fields=[
              ("id", models.AutoField(primary_key=True)),
              ("riders", models.ManyToManyField("Rider")),
          ],
      )
      state = self.apply_operations(
          app_label, ProjectState(), operations=[create_rider, create_pony]
      )
      # One pony/rider pair linked before the rename.
      OldPony = state.apps.get_model(app_label, "Pony")
      OldRider = state.apps.get_model(app_label, "Rider")
      first_pony = OldPony.objects.create()
      first_rider = OldRider.objects.create()
      first_pony.riders.add(first_rider)
      rename_rider = migrations.RenameModel("Rider", "Rider2")
      state = self.apply_operations(
          app_label,
          state,
          operations=[rename_rider],
          atomic=connection.features.supports_atomic_references_rename,
      )
      # A second pair linked to the renamed target model.
      Pony = state.apps.get_model(app_label, "Pony")
      Rider2 = state.apps.get_model(app_label, "Rider2")
      second_pony = Pony.objects.create()
      second_rider = Rider2.objects.create()
      second_pony.riders.add(second_rider)
      self.assertEqual(Pony.objects.count(), 2)
      self.assertEqual(Rider2.objects.count(), 2)
      through = Pony._meta.get_field("riders").remote_field.through
      self.assertEqual(through.objects.count(), 2)
  def test_rename_m2m_through_model(self):
      """
      Renaming the explicit through model of a ManyToManyField keeps the
      existing link rows, and rows created via the renamed model show up in
      the relation.
      """
      app_label = "test_rename_through"
      create_rider = migrations.CreateModel(
          "Rider",
          fields=[("id", models.AutoField(primary_key=True))],
      )
      create_pony = migrations.CreateModel(
          "Pony",
          fields=[("id", models.AutoField(primary_key=True))],
      )
      create_through = migrations.CreateModel(
          "PonyRider",
          fields=[
              ("id", models.AutoField(primary_key=True)),
              (
                  "rider",
                  models.ForeignKey("test_rename_through.Rider", models.CASCADE),
              ),
              (
                  "pony",
                  models.ForeignKey("test_rename_through.Pony", models.CASCADE),
              ),
          ],
      )
      add_riders = migrations.AddField(
          "Pony",
          "riders",
          models.ManyToManyField(
              "test_rename_through.Rider",
              through="test_rename_through.PonyRider",
          ),
      )
      state = self.apply_operations(
          app_label,
          ProjectState(),
          operations=[create_rider, create_pony, create_through, add_riders],
      )
      # Link one pony and one rider through the original through model.
      Pony = state.apps.get_model(app_label, "Pony")
      Rider = state.apps.get_model(app_label, "Rider")
      PonyRider = state.apps.get_model(app_label, "PonyRider")
      first_pony = Pony.objects.create()
      first_rider = Rider.objects.create()
      PonyRider.objects.create(pony=first_pony, rider=first_rider)
      state = self.apply_operations(
          app_label,
          state,
          operations=[migrations.RenameModel("PonyRider", "PonyRider2")],
      )
      # Link a second rider to the existing pony through the renamed model.
      Pony = state.apps.get_model(app_label, "Pony")
      Rider = state.apps.get_model(app_label, "Rider")
      PonyRider2 = state.apps.get_model(app_label, "PonyRider2")
      existing_pony = Pony.objects.first()
      second_rider = Rider.objects.create()
      PonyRider2.objects.create(pony=existing_pony, rider=second_rider)
      self.assertEqual(Pony.objects.count(), 1)
      self.assertEqual(Rider.objects.count(), 2)
      self.assertEqual(PonyRider2.objects.count(), 2)
      self.assertEqual(existing_pony.riders.count(), 2)
  def test_rename_m2m_model_after_rename_field(self):
      """
      A RenameModel that follows a RenameField in the same batch of
      operations renames the many-to-many column correctly.
      """
      app_label = "test_rename_multiple"
      create_pony = migrations.CreateModel(
          "Pony",
          fields=[
              ("id", models.AutoField(primary_key=True)),
              ("name", models.CharField(max_length=20)),
          ],
      )
      create_rider = migrations.CreateModel(
          "Rider",
          fields=[
              ("id", models.AutoField(primary_key=True)),
              (
                  "pony",
                  models.ForeignKey("test_rename_multiple.Pony", models.CASCADE),
              ),
          ],
      )
      create_ponyrider = migrations.CreateModel(
          "PonyRider",
          fields=[
              ("id", models.AutoField(primary_key=True)),
              ("riders", models.ManyToManyField("Rider")),
          ],
      )
      rename_field = migrations.RenameField(
          model_name="pony", old_name="name", new_name="fancy_name"
      )
      rename_model = migrations.RenameModel(old_name="Rider", new_name="Jockey")
      state = self.apply_operations(
          app_label,
          ProjectState(),
          operations=[
              create_pony,
              create_rider,
              create_ponyrider,
              rename_field,
              rename_model,
          ],
          atomic=connection.features.supports_atomic_references_rename,
      )
      Pony = state.apps.get_model(app_label, "Pony")
      Jockey = state.apps.get_model(app_label, "Jockey")
      PonyRider = state.apps.get_model(app_label, "PonyRider")
      # If none of these statements fails with "no such column", the column
      # was renamed correctly.
      a_pony = Pony.objects.create(fancy_name="a good name")
      a_jockey = Jockey.objects.create(pony=a_pony)
      link_holder = PonyRider.objects.create()
      link_holder.riders.add(a_jockey)
  def test_add_field(self):
      """
      AddField: description, state change, column creation and removal in
      the database, and deconstruction.
      """
      app_label = "test_adfl"
      add_height = migrations.AddField(
          "Pony",
          "height",
          models.FloatField(null=True, default=5),
      )
      self.assertEqual(add_height.describe(), "Add field height to Pony")
      self.assertEqual(add_height.migration_name_fragment, "pony_height")
      # State alteration.
      before, after = self.make_test_state(app_label, add_height)
      self.assertEqual(len(after.models[app_label, "pony"].fields), 4)
      height = after.models[app_label, "pony"].fields["height"]
      self.assertEqual(height.default, 5)
      # Forwards: the column appears.
      self.assertColumnNotExists("test_adfl_pony", "height")
      with connection.schema_editor() as schema_editor:
          add_height.database_forwards(app_label, schema_editor, before, after)
      self.assertColumnExists("test_adfl_pony", "height")
      # Backwards: the column is gone again.
      with connection.schema_editor() as schema_editor:
          add_height.database_backwards(app_label, schema_editor, after, before)
      self.assertColumnNotExists("test_adfl_pony", "height")
      # Deconstruction.
      deconstructed = add_height.deconstruct()
      self.assertEqual(deconstructed[0], "AddField")
      self.assertEqual(deconstructed[1], [])
      self.assertEqual(sorted(deconstructed[2]), ["field", "model_name", "name"])
  def test_add_charfield(self):
      """
      AddField with CharField columns: a row that existed before the fields
      were added reads back each field's default.
      """
      app_label = "test_adchfl"
      # (field name, default) pairs, added and checked in this order.
      field_defaults = [
          ("text", "some text"),
          ("empty", ""),
          # If not properly quoted digits would be interpreted as an int.
          ("digits", "42"),
          # Manual quoting is fragile and could trip on quotes. Refs #xyz.
          ("quotes", '"\'"'),
      ]
      old_state = self.set_up_test_model(app_label)
      OldPony = old_state.apps.get_model(app_label, "Pony")
      existing = OldPony.objects.create(weight=42)
      add_fields = [
          migrations.AddField(
              "Pony",
              name,
              models.CharField(max_length=10, default=default),
          )
          for name, default in field_defaults
      ]
      new_state = self.apply_operations(app_label, old_state, add_fields)
      Pony = new_state.apps.get_model(app_label, "Pony")
      reloaded = Pony.objects.get(pk=existing.pk)
      for name, default in field_defaults:
          self.assertEqual(getattr(reloaded, name), default)
  def test_add_textfield(self):
      """
      AddField with TextField columns: a row that existed before the fields
      were added reads back each field's default.
      """
      app_label = "test_adtxtfl"
      # (field name, default) pairs, added and checked in this order.
      field_defaults = [
          ("text", "some text"),
          ("empty", ""),
          # If not properly quoted digits would be interpreted as an int.
          ("digits", "42"),
          # Manual quoting is fragile and could trip on quotes. Refs #xyz.
          ("quotes", '"\'"'),
      ]
      old_state = self.set_up_test_model(app_label)
      OldPony = old_state.apps.get_model(app_label, "Pony")
      existing = OldPony.objects.create(weight=42)
      add_fields = [
          migrations.AddField(
              "Pony",
              name,
              models.TextField(default=default),
          )
          for name, default in field_defaults
      ]
      new_state = self.apply_operations(app_label, old_state, add_fields)
      Pony = new_state.apps.get_model(app_label, "Pony")
      reloaded = Pony.objects.get(pk=existing.pk)
      for name, default in field_defaults:
          self.assertEqual(getattr(reloaded, name), default)
  def test_add_binaryfield(self):
      """
      AddField with BinaryField columns: a row that existed before the
      fields were added reads back each field's default.
      """
      app_label = "test_adbinfl"
      # (field name, default) pairs, added and checked in this order.
      field_defaults = [
          ("blob", b"some text"),
          ("empty", b""),
          # If not properly quoted digits would be interpreted as an int.
          ("digits", b"42"),
          # Manual quoting is fragile and could trip on quotes. Refs #xyz.
          ("quotes", b'"\'"'),
      ]
      old_state = self.set_up_test_model(app_label)
      OldPony = old_state.apps.get_model(app_label, "Pony")
      existing = OldPony.objects.create(weight=42)
      add_fields = [
          migrations.AddField(
              "Pony",
              name,
              models.BinaryField(default=default),
          )
          for name, default in field_defaults
      ]
      new_state = self.apply_operations(app_label, old_state, add_fields)
      Pony = new_state.apps.get_model(app_label, "Pony")
      reloaded = Pony.objects.get(pk=existing.pk)
      # SQLite returns buffer/memoryview, so cast to bytes before comparing.
      for name, default in field_defaults:
          self.assertEqual(bytes(getattr(reloaded, name)), default)
  def test_column_name_quoting(self):
      """
      Adding a column whose name is an SQL keyword ("order") must work in a
      migration (#22168).
      """
      app_label = "test_regr22168"
      old_state = self.set_up_test_model(app_label)
      add_order = migrations.AddField(
          "Pony",
          "order",
          models.IntegerField(default=0),
      )
      new_state = old_state.clone()
      add_order.state_forwards(app_label, new_state)
      with connection.schema_editor() as schema_editor:
          add_order.database_forwards(app_label, schema_editor, old_state, new_state)
      self.assertColumnExists("test_regr22168_pony", "order")
  def test_add_field_preserve_default(self):
      """
      AddField with preserve_default=False: the default isn't kept on the
      field in the new state, the column is still added, and the flag shows
      up in the deconstructed kwargs.
      """
      app_label = "test_adflpd"
      old_state = self.set_up_test_model(app_label)
      # State alteration.
      add_height = migrations.AddField(
          "Pony",
          "height",
          models.FloatField(null=True, default=4),
          preserve_default=False,
      )
      new_state = old_state.clone()
      add_height.state_forwards(app_label, new_state)
      self.assertEqual(len(new_state.models[app_label, "pony"].fields), 4)
      height = new_state.models[app_label, "pony"].fields["height"]
      self.assertEqual(height.default, models.NOT_PROVIDED)
      # Database alteration, with one row already present in the table.
      OldPony = old_state.apps.get_model(app_label, "pony")
      OldPony.objects.create(
          weight=4,
      )
      self.assertColumnNotExists("test_adflpd_pony", "height")
      with connection.schema_editor() as schema_editor:
          add_height.database_forwards(app_label, schema_editor, old_state, new_state)
      self.assertColumnExists("test_adflpd_pony", "height")
      # Deconstruction.
      deconstructed = add_height.deconstruct()
      self.assertEqual(deconstructed[0], "AddField")
      self.assertEqual(deconstructed[1], [])
      self.assertEqual(
          sorted(deconstructed[2]),
          ["field", "model_name", "name", "preserve_default"],
      )
  def test_add_field_m2m(self):
      """
      AddField with a ManyToManyField: a through table is created (and no
      column on the model's own table), the relation is usable, and
      reversing the operation drops the through table.
      """
      app_label = "test_adflmm"
      old_state = self.set_up_test_model(app_label, second_model=True)
      # State alteration.
      add_stables = migrations.AddField(
          "Pony", "stables", models.ManyToManyField("Stable", related_name="ponies")
      )
      new_state = old_state.clone()
      add_stables.state_forwards(app_label, new_state)
      self.assertEqual(len(new_state.models[app_label, "pony"].fields), 4)
      # Database alteration.
      self.assertTableNotExists("test_adflmm_pony_stables")
      with connection.schema_editor() as schema_editor:
          add_stables.database_forwards(
              app_label, schema_editor, old_state, new_state
          )
      self.assertTableExists("test_adflmm_pony_stables")
      self.assertColumnNotExists("test_adflmm_pony", "stables")
      # Exercise the relation inside a transaction.
      with atomic():
          Pony = new_state.apps.get_model(app_label, "Pony")
          pony = Pony.objects.create(pink=False, weight=4.55)
          pony.stables.create()
          self.assertEqual(pony.stables.count(), 1)
          pony.stables.all().delete()
      # Reversal.
      with connection.schema_editor() as schema_editor:
          add_stables.database_backwards(
              app_label, schema_editor, new_state, old_state
          )
      self.assertTableNotExists("test_adflmm_pony_stables")
  def test_alter_field_m2m(self):
      """
      AlterField on a ManyToManyField: switching to blank=True is reflected
      on the rendered model's field.
      """
      app_label = "test_alflmm"
      state = self.set_up_test_model(app_label, second_model=True)
      add_stables = migrations.AddField(
          "Pony",
          "stables",
          models.ManyToManyField("Stable", related_name="ponies"),
      )
      state = self.apply_operations(app_label, state, operations=[add_stables])
      # As first added, the field is not blank.
      PonyBefore = state.apps.get_model(app_label, "Pony")
      self.assertFalse(PonyBefore._meta.get_field("stables").blank)
      make_blank = migrations.AlterField(
          "Pony",
          "stables",
          models.ManyToManyField(to="Stable", related_name="ponies", blank=True),
      )
      state = self.apply_operations(app_label, state, operations=[make_blank])
      # After the alteration it is.
      PonyAfter = state.apps.get_model(app_label, "Pony")
      self.assertTrue(PonyAfter._meta.get_field("stables").blank)
  def test_repoint_field_m2m(self):
      """
      AlterField that repoints a ManyToManyField from "Stable" to "Van":
      the repointed relation can create, count and delete related objects.
      """
      app_label = "test_alflmm"
      state = self.set_up_test_model(
          app_label, second_model=True, third_model=True
      )
      add_places = migrations.AddField(
          "Pony",
          "places",
          models.ManyToManyField("Stable", related_name="ponies"),
      )
      state = self.apply_operations(app_label, state, operations=[add_places])
      # The model is also looked up on this intermediate state (which
      # accesses state.apps); the name is rebound further down before use.
      Pony = state.apps.get_model(app_label, "Pony")
      repoint_places = migrations.AlterField(
          "Pony",
          "places",
          models.ManyToManyField(to="Van", related_name="ponies"),
      )
      state = self.apply_operations(app_label, state, operations=[repoint_places])
      # Exercise the repointed field.
      Pony = state.apps.get_model(app_label, "Pony")
      pony = Pony.objects.create(pink=False, weight=4.55)
      pony.places.create()
      self.assertEqual(pony.places.count(), 1)
      pony.places.all().delete()
  def test_remove_field_m2m(self):
      """
      RemoveField on a ManyToManyField drops the through table, and
      unapplying the operation brings the table back.
      """
      app_label = "test_rmflmm"
      state = self.set_up_test_model(app_label, second_model=True)
      add_stables = migrations.AddField(
          "Pony",
          "stables",
          models.ManyToManyField("Stable", related_name="ponies"),
      )
      state = self.apply_operations(app_label, state, operations=[add_stables])
      self.assertTableExists("test_rmflmm_pony_stables")
      # Keep a copy of the state that still has the field, for the reversal.
      state_with_field = state.clone()
      removal = [migrations.RemoveField("Pony", "stables")]
      state = self.apply_operations(app_label, state, operations=removal)
      self.assertTableNotExists("test_rmflmm_pony_stables")
      # Reversal.
      self.unapply_operations(app_label, state_with_field, operations=removal)
      self.assertTableExists("test_rmflmm_pony_stables")
  def test_remove_field_m2m_with_through(self):
      """
      A ManyToManyField with an explicit through model can be removed and
      its through model deleted in the same batch of operations.
      """
      app_label = "test_rmflmmwt"
      state = self.set_up_test_model(app_label, second_model=True)
      self.assertTableNotExists("test_rmflmmwt_ponystables")
      create_through = migrations.CreateModel(
          "PonyStables",
          fields=[
              ("pony", models.ForeignKey("test_rmflmmwt.Pony", models.CASCADE)),
              ("stable", models.ForeignKey("test_rmflmmwt.Stable", models.CASCADE)),
          ],
      )
      add_stables = migrations.AddField(
          "Pony",
          "stables",
          models.ManyToManyField(
              "Stable",
              related_name="ponies",
              through="test_rmflmmwt.PonyStables",
          ),
      )
      state = self.apply_operations(
          app_label, state, operations=[create_through, add_stables]
      )
      self.assertTableExists("test_rmflmmwt_ponystables")
      teardown = [
          migrations.RemoveField("Pony", "stables"),
          migrations.DeleteModel("PonyStables"),
      ]
      self.apply_operations(app_label, state, operations=teardown)
  def test_remove_field(self):
      """
      RemoveField: description, state change, column removal and
      restoration in the database, and deconstruction.
      """
      app_label = "test_rmfl"
      old_state = self.set_up_test_model(app_label)
      # State alteration.
      remove_pink = migrations.RemoveField("Pony", "pink")
      self.assertEqual(remove_pink.describe(), "Remove field pink from Pony")
      self.assertEqual(remove_pink.migration_name_fragment, "remove_pony_pink")
      new_state = old_state.clone()
      remove_pink.state_forwards(app_label, new_state)
      self.assertEqual(len(new_state.models[app_label, "pony"].fields), 2)
      # Forwards: the column disappears.
      self.assertColumnExists("test_rmfl_pony", "pink")
      with connection.schema_editor() as schema_editor:
          remove_pink.database_forwards(app_label, schema_editor, old_state, new_state)
      self.assertColumnNotExists("test_rmfl_pony", "pink")
      # Backwards: the column is back.
      with connection.schema_editor() as schema_editor:
          remove_pink.database_backwards(
              app_label, schema_editor, new_state, old_state
          )
      self.assertColumnExists("test_rmfl_pony", "pink")
      # Deconstruction.
      deconstructed = remove_pink.deconstruct()
      self.assertEqual(deconstructed[0], "RemoveField")
      self.assertEqual(deconstructed[1], [])
      self.assertEqual(deconstructed[2], {"model_name": "Pony", "name": "pink"})
  def test_remove_fk(self):
      """
      RemoveField on a foreign key drops its "pony_id" column, and reversing
      the operation restores it.
      """
      app_label = "test_rfk"
      old_state = self.set_up_test_model(app_label, related_model=True)
      self.assertColumnExists("test_rfk_rider", "pony_id")
      remove_pony = migrations.RemoveField("Rider", "pony")
      new_state = old_state.clone()
      remove_pony.state_forwards(app_label, new_state)
      # Forwards.
      with connection.schema_editor() as schema_editor:
          remove_pony.database_forwards(app_label, schema_editor, old_state, new_state)
      self.assertColumnNotExists("test_rfk_rider", "pony_id")
      # Backwards.
      with connection.schema_editor() as schema_editor:
          remove_pony.database_backwards(
              app_label, schema_editor, new_state, old_state
          )
      self.assertColumnExists("test_rfk_rider", "pony_id")
  def test_alter_model_table(self):
      """
      AlterModelTable: description, db_table option in the state, table
      rename forwards and backwards, and deconstruction.
      """
      app_label = "test_almota"
      old_state = self.set_up_test_model(app_label)
      # State alteration.
      alter_table = migrations.AlterModelTable("Pony", "test_almota_pony_2")
      self.assertEqual(
          alter_table.describe(), "Rename table for Pony to test_almota_pony_2"
      )
      self.assertEqual(alter_table.migration_name_fragment, "alter_pony_table")
      new_state = old_state.clone()
      alter_table.state_forwards(app_label, new_state)
      pony_options = new_state.models[app_label, "pony"].options
      self.assertEqual(pony_options["db_table"], "test_almota_pony_2")
      # Forwards: the table is renamed.
      self.assertTableExists("test_almota_pony")
      self.assertTableNotExists("test_almota_pony_2")
      with connection.schema_editor() as schema_editor:
          alter_table.database_forwards(
              app_label, schema_editor, old_state, new_state
          )
      self.assertTableNotExists("test_almota_pony")
      self.assertTableExists("test_almota_pony_2")
      # Backwards: the original name is restored.
      with connection.schema_editor() as schema_editor:
          alter_table.database_backwards(
              app_label, schema_editor, new_state, old_state
          )
      self.assertTableExists("test_almota_pony")
      self.assertTableNotExists("test_almota_pony_2")
      # Deconstruction.
      deconstructed = alter_table.deconstruct()
      self.assertEqual(deconstructed[0], "AlterModelTable")
      self.assertEqual(deconstructed[1], [])
      self.assertEqual(
          deconstructed[2], {"name": "Pony", "table": "test_almota_pony_2"}
      )
  1649. def test_alter_model_table_none(self):
  1650. """
  1651. Tests the AlterModelTable operation if the table name is set to None.
  1652. """
  1653. operation = migrations.AlterModelTable("Pony", None)
  1654. self.assertEqual(operation.describe(), "Rename table for Pony to (default)")
  1655. def test_alter_model_table_noop(self):
  1656. """
  1657. Tests the AlterModelTable operation if the table name is not changed.
  1658. """
  1659. project_state = self.set_up_test_model("test_almota")
  1660. # Test the state alteration
  1661. operation = migrations.AlterModelTable("Pony", "test_almota_pony")
  1662. new_state = project_state.clone()
  1663. operation.state_forwards("test_almota", new_state)
  1664. self.assertEqual(
  1665. new_state.models["test_almota", "pony"].options["db_table"],
  1666. "test_almota_pony",
  1667. )
  1668. # Test the database alteration
  1669. self.assertTableExists("test_almota_pony")
  1670. with connection.schema_editor() as editor:
  1671. operation.database_forwards("test_almota", editor, project_state, new_state)
  1672. self.assertTableExists("test_almota_pony")
  1673. # And test reversal
  1674. with connection.schema_editor() as editor:
  1675. operation.database_backwards(
  1676. "test_almota", editor, new_state, project_state
  1677. )
  1678. self.assertTableExists("test_almota_pony")
  1679. def test_alter_model_table_m2m(self):
  1680. """
  1681. AlterModelTable should rename auto-generated M2M tables.
  1682. """
  1683. app_label = "test_talflmltlm2m"
  1684. pony_db_table = "pony_foo"
  1685. project_state = self.set_up_test_model(
  1686. app_label, second_model=True, db_table=pony_db_table
  1687. )
  1688. # Add the M2M field
  1689. first_state = project_state.clone()
  1690. operation = migrations.AddField(
  1691. "Pony", "stables", models.ManyToManyField("Stable")
  1692. )
  1693. operation.state_forwards(app_label, first_state)
  1694. with connection.schema_editor() as editor:
  1695. operation.database_forwards(app_label, editor, project_state, first_state)
  1696. original_m2m_table = "%s_%s" % (pony_db_table, "stables")
  1697. new_m2m_table = "%s_%s" % (app_label, "pony_stables")
  1698. self.assertTableExists(original_m2m_table)
  1699. self.assertTableNotExists(new_m2m_table)
  1700. # Rename the Pony db_table which should also rename the m2m table.
  1701. second_state = first_state.clone()
  1702. operation = migrations.AlterModelTable(name="pony", table=None)
  1703. operation.state_forwards(app_label, second_state)
  1704. atomic_rename = connection.features.supports_atomic_references_rename
  1705. with connection.schema_editor(atomic=atomic_rename) as editor:
  1706. operation.database_forwards(app_label, editor, first_state, second_state)
  1707. self.assertTableExists(new_m2m_table)
  1708. self.assertTableNotExists(original_m2m_table)
  1709. # And test reversal
  1710. with connection.schema_editor(atomic=atomic_rename) as editor:
  1711. operation.database_backwards(app_label, editor, second_state, first_state)
  1712. self.assertTableExists(original_m2m_table)
  1713. self.assertTableNotExists(new_m2m_table)
  1714. def test_alter_field(self):
  1715. """
  1716. Tests the AlterField operation.
  1717. """
  1718. project_state = self.set_up_test_model("test_alfl")
  1719. # Test the state alteration
  1720. operation = migrations.AlterField(
  1721. "Pony", "pink", models.IntegerField(null=True)
  1722. )
  1723. self.assertEqual(operation.describe(), "Alter field pink on Pony")
  1724. self.assertEqual(operation.migration_name_fragment, "alter_pony_pink")
  1725. new_state = project_state.clone()
  1726. operation.state_forwards("test_alfl", new_state)
  1727. self.assertIs(
  1728. project_state.models["test_alfl", "pony"].fields["pink"].null, False
  1729. )
  1730. self.assertIs(new_state.models["test_alfl", "pony"].fields["pink"].null, True)
  1731. # Test the database alteration
  1732. self.assertColumnNotNull("test_alfl_pony", "pink")
  1733. with connection.schema_editor() as editor:
  1734. operation.database_forwards("test_alfl", editor, project_state, new_state)
  1735. self.assertColumnNull("test_alfl_pony", "pink")
  1736. # And test reversal
  1737. with connection.schema_editor() as editor:
  1738. operation.database_backwards("test_alfl", editor, new_state, project_state)
  1739. self.assertColumnNotNull("test_alfl_pony", "pink")
  1740. # And deconstruction
  1741. definition = operation.deconstruct()
  1742. self.assertEqual(definition[0], "AlterField")
  1743. self.assertEqual(definition[1], [])
  1744. self.assertEqual(sorted(definition[2]), ["field", "model_name", "name"])
  1745. def test_alter_field_add_db_column_noop(self):
  1746. """
  1747. AlterField operation is a noop when adding only a db_column and the
  1748. column name is not changed.
  1749. """
  1750. app_label = "test_afadbn"
  1751. project_state = self.set_up_test_model(app_label, related_model=True)
  1752. pony_table = "%s_pony" % app_label
  1753. new_state = project_state.clone()
  1754. operation = migrations.AlterField(
  1755. "Pony", "weight", models.FloatField(db_column="weight")
  1756. )
  1757. operation.state_forwards(app_label, new_state)
  1758. self.assertIsNone(
  1759. project_state.models[app_label, "pony"].fields["weight"].db_column,
  1760. )
  1761. self.assertEqual(
  1762. new_state.models[app_label, "pony"].fields["weight"].db_column,
  1763. "weight",
  1764. )
  1765. self.assertColumnExists(pony_table, "weight")
  1766. with connection.schema_editor() as editor:
  1767. with self.assertNumQueries(0):
  1768. operation.database_forwards(app_label, editor, project_state, new_state)
  1769. self.assertColumnExists(pony_table, "weight")
  1770. with connection.schema_editor() as editor:
  1771. with self.assertNumQueries(0):
  1772. operation.database_backwards(
  1773. app_label, editor, new_state, project_state
  1774. )
  1775. self.assertColumnExists(pony_table, "weight")
  1776. rider_table = "%s_rider" % app_label
  1777. new_state = project_state.clone()
  1778. operation = migrations.AlterField(
  1779. "Rider",
  1780. "pony",
  1781. models.ForeignKey("Pony", models.CASCADE, db_column="pony_id"),
  1782. )
  1783. operation.state_forwards(app_label, new_state)
  1784. self.assertIsNone(
  1785. project_state.models[app_label, "rider"].fields["pony"].db_column,
  1786. )
  1787. self.assertIs(
  1788. new_state.models[app_label, "rider"].fields["pony"].db_column,
  1789. "pony_id",
  1790. )
  1791. self.assertColumnExists(rider_table, "pony_id")
  1792. with connection.schema_editor() as editor:
  1793. with self.assertNumQueries(0):
  1794. operation.database_forwards(app_label, editor, project_state, new_state)
  1795. self.assertColumnExists(rider_table, "pony_id")
  1796. with connection.schema_editor() as editor:
  1797. with self.assertNumQueries(0):
  1798. operation.database_forwards(app_label, editor, new_state, project_state)
  1799. self.assertColumnExists(rider_table, "pony_id")
  1800. def test_alter_field_pk(self):
  1801. """
  1802. The AlterField operation on primary keys (things like PostgreSQL's
  1803. SERIAL weirdness).
  1804. """
  1805. project_state = self.set_up_test_model("test_alflpk")
  1806. # Test the state alteration
  1807. operation = migrations.AlterField(
  1808. "Pony", "id", models.IntegerField(primary_key=True)
  1809. )
  1810. new_state = project_state.clone()
  1811. operation.state_forwards("test_alflpk", new_state)
  1812. self.assertIsInstance(
  1813. project_state.models["test_alflpk", "pony"].fields["id"],
  1814. models.AutoField,
  1815. )
  1816. self.assertIsInstance(
  1817. new_state.models["test_alflpk", "pony"].fields["id"],
  1818. models.IntegerField,
  1819. )
  1820. # Test the database alteration
  1821. with connection.schema_editor() as editor:
  1822. operation.database_forwards("test_alflpk", editor, project_state, new_state)
  1823. # And test reversal
  1824. with connection.schema_editor() as editor:
  1825. operation.database_backwards(
  1826. "test_alflpk", editor, new_state, project_state
  1827. )
  1828. @skipUnlessDBFeature("supports_foreign_keys")
  1829. def test_alter_field_pk_fk(self):
  1830. """
  1831. Tests the AlterField operation on primary keys changes any FKs pointing to it.
  1832. """
  1833. project_state = self.set_up_test_model("test_alflpkfk", related_model=True)
  1834. project_state = self.apply_operations(
  1835. "test_alflpkfk",
  1836. project_state,
  1837. [
  1838. migrations.CreateModel(
  1839. "Stable",
  1840. fields=[
  1841. ("ponies", models.ManyToManyField("Pony")),
  1842. ],
  1843. ),
  1844. migrations.AddField(
  1845. "Pony",
  1846. "stables",
  1847. models.ManyToManyField("Stable"),
  1848. ),
  1849. ],
  1850. )
  1851. # Test the state alteration
  1852. operation = migrations.AlterField(
  1853. "Pony", "id", models.FloatField(primary_key=True)
  1854. )
  1855. new_state = project_state.clone()
  1856. operation.state_forwards("test_alflpkfk", new_state)
  1857. self.assertIsInstance(
  1858. project_state.models["test_alflpkfk", "pony"].fields["id"],
  1859. models.AutoField,
  1860. )
  1861. self.assertIsInstance(
  1862. new_state.models["test_alflpkfk", "pony"].fields["id"],
  1863. models.FloatField,
  1864. )
  1865. def assertIdTypeEqualsFkType():
  1866. with connection.cursor() as cursor:
  1867. id_type, id_null = [
  1868. (c.type_code, c.null_ok)
  1869. for c in connection.introspection.get_table_description(
  1870. cursor, "test_alflpkfk_pony"
  1871. )
  1872. if c.name == "id"
  1873. ][0]
  1874. fk_type, fk_null = [
  1875. (c.type_code, c.null_ok)
  1876. for c in connection.introspection.get_table_description(
  1877. cursor, "test_alflpkfk_rider"
  1878. )
  1879. if c.name == "pony_id"
  1880. ][0]
  1881. m2m_fk_type, m2m_fk_null = [
  1882. (c.type_code, c.null_ok)
  1883. for c in connection.introspection.get_table_description(
  1884. cursor,
  1885. "test_alflpkfk_pony_stables",
  1886. )
  1887. if c.name == "pony_id"
  1888. ][0]
  1889. remote_m2m_fk_type, remote_m2m_fk_null = [
  1890. (c.type_code, c.null_ok)
  1891. for c in connection.introspection.get_table_description(
  1892. cursor,
  1893. "test_alflpkfk_stable_ponies",
  1894. )
  1895. if c.name == "pony_id"
  1896. ][0]
  1897. self.assertEqual(id_type, fk_type)
  1898. self.assertEqual(id_type, m2m_fk_type)
  1899. self.assertEqual(id_type, remote_m2m_fk_type)
  1900. self.assertEqual(id_null, fk_null)
  1901. self.assertEqual(id_null, m2m_fk_null)
  1902. self.assertEqual(id_null, remote_m2m_fk_null)
  1903. assertIdTypeEqualsFkType()
  1904. # Test the database alteration
  1905. with connection.schema_editor() as editor:
  1906. operation.database_forwards(
  1907. "test_alflpkfk", editor, project_state, new_state
  1908. )
  1909. assertIdTypeEqualsFkType()
  1910. if connection.features.supports_foreign_keys:
  1911. self.assertFKExists(
  1912. "test_alflpkfk_pony_stables",
  1913. ["pony_id"],
  1914. ("test_alflpkfk_pony", "id"),
  1915. )
  1916. self.assertFKExists(
  1917. "test_alflpkfk_stable_ponies",
  1918. ["pony_id"],
  1919. ("test_alflpkfk_pony", "id"),
  1920. )
  1921. # And test reversal
  1922. with connection.schema_editor() as editor:
  1923. operation.database_backwards(
  1924. "test_alflpkfk", editor, new_state, project_state
  1925. )
  1926. assertIdTypeEqualsFkType()
  1927. if connection.features.supports_foreign_keys:
  1928. self.assertFKExists(
  1929. "test_alflpkfk_pony_stables",
  1930. ["pony_id"],
  1931. ("test_alflpkfk_pony", "id"),
  1932. )
  1933. self.assertFKExists(
  1934. "test_alflpkfk_stable_ponies",
  1935. ["pony_id"],
  1936. ("test_alflpkfk_pony", "id"),
  1937. )
  1938. @skipUnlessDBFeature("supports_collation_on_charfield", "supports_foreign_keys")
  1939. def test_alter_field_pk_fk_db_collation(self):
  1940. """
  1941. AlterField operation of db_collation on primary keys changes any FKs
  1942. pointing to it.
  1943. """
  1944. collation = connection.features.test_collations.get("non_default")
  1945. if not collation:
  1946. self.skipTest("Language collations are not supported.")
  1947. app_label = "test_alflpkfkdbc"
  1948. project_state = self.apply_operations(
  1949. app_label,
  1950. ProjectState(),
  1951. [
  1952. migrations.CreateModel(
  1953. "Pony",
  1954. [
  1955. ("id", models.CharField(primary_key=True, max_length=10)),
  1956. ],
  1957. ),
  1958. migrations.CreateModel(
  1959. "Rider",
  1960. [
  1961. ("pony", models.ForeignKey("Pony", models.CASCADE)),
  1962. ],
  1963. ),
  1964. migrations.CreateModel(
  1965. "Stable",
  1966. [
  1967. ("ponies", models.ManyToManyField("Pony")),
  1968. ],
  1969. ),
  1970. ],
  1971. )
  1972. # State alteration.
  1973. operation = migrations.AlterField(
  1974. "Pony",
  1975. "id",
  1976. models.CharField(
  1977. primary_key=True,
  1978. max_length=10,
  1979. db_collation=collation,
  1980. ),
  1981. )
  1982. new_state = project_state.clone()
  1983. operation.state_forwards(app_label, new_state)
  1984. # Database alteration.
  1985. with connection.schema_editor() as editor:
  1986. operation.database_forwards(app_label, editor, project_state, new_state)
  1987. self.assertColumnCollation(f"{app_label}_pony", "id", collation)
  1988. self.assertColumnCollation(f"{app_label}_rider", "pony_id", collation)
  1989. self.assertColumnCollation(f"{app_label}_stable_ponies", "pony_id", collation)
  1990. # Reversal.
  1991. with connection.schema_editor() as editor:
  1992. operation.database_backwards(app_label, editor, new_state, project_state)
  1993. def test_alter_field_pk_mti_fk(self):
  1994. app_label = "test_alflpkmtifk"
  1995. project_state = self.set_up_test_model(app_label, mti_model=True)
  1996. project_state = self.apply_operations(
  1997. app_label,
  1998. project_state,
  1999. [
  2000. migrations.CreateModel(
  2001. "ShetlandRider",
  2002. fields=[
  2003. (
  2004. "pony",
  2005. models.ForeignKey(
  2006. f"{app_label}.ShetlandPony", models.CASCADE
  2007. ),
  2008. ),
  2009. ],
  2010. ),
  2011. ],
  2012. )
  2013. operation = migrations.AlterField(
  2014. "Pony",
  2015. "id",
  2016. models.BigAutoField(primary_key=True),
  2017. )
  2018. new_state = project_state.clone()
  2019. operation.state_forwards(app_label, new_state)
  2020. self.assertIsInstance(
  2021. new_state.models[app_label, "pony"].fields["id"],
  2022. models.BigAutoField,
  2023. )
  2024. def _get_column_id_type(cursor, table, column):
  2025. return [
  2026. c.type_code
  2027. for c in connection.introspection.get_table_description(
  2028. cursor,
  2029. f"{app_label}_{table}",
  2030. )
  2031. if c.name == column
  2032. ][0]
  2033. def assertIdTypeEqualsMTIFkType():
  2034. with connection.cursor() as cursor:
  2035. parent_id_type = _get_column_id_type(cursor, "pony", "id")
  2036. child_id_type = _get_column_id_type(
  2037. cursor, "shetlandpony", "pony_ptr_id"
  2038. )
  2039. mti_id_type = _get_column_id_type(cursor, "shetlandrider", "pony_id")
  2040. self.assertEqual(parent_id_type, child_id_type)
  2041. self.assertEqual(parent_id_type, mti_id_type)
  2042. assertIdTypeEqualsMTIFkType()
  2043. # Alter primary key.
  2044. with connection.schema_editor() as editor:
  2045. operation.database_forwards(app_label, editor, project_state, new_state)
  2046. assertIdTypeEqualsMTIFkType()
  2047. if connection.features.supports_foreign_keys:
  2048. self.assertFKExists(
  2049. f"{app_label}_shetlandpony",
  2050. ["pony_ptr_id"],
  2051. (f"{app_label}_pony", "id"),
  2052. )
  2053. self.assertFKExists(
  2054. f"{app_label}_shetlandrider",
  2055. ["pony_id"],
  2056. (f"{app_label}_shetlandpony", "pony_ptr_id"),
  2057. )
  2058. # Reversal.
  2059. with connection.schema_editor() as editor:
  2060. operation.database_backwards(app_label, editor, new_state, project_state)
  2061. assertIdTypeEqualsMTIFkType()
  2062. if connection.features.supports_foreign_keys:
  2063. self.assertFKExists(
  2064. f"{app_label}_shetlandpony",
  2065. ["pony_ptr_id"],
  2066. (f"{app_label}_pony", "id"),
  2067. )
  2068. self.assertFKExists(
  2069. f"{app_label}_shetlandrider",
  2070. ["pony_id"],
  2071. (f"{app_label}_shetlandpony", "pony_ptr_id"),
  2072. )
  2073. def test_alter_field_pk_mti_and_fk_to_base(self):
  2074. app_label = "test_alflpkmtiftb"
  2075. project_state = self.set_up_test_model(
  2076. app_label,
  2077. mti_model=True,
  2078. related_model=True,
  2079. )
  2080. operation = migrations.AlterField(
  2081. "Pony",
  2082. "id",
  2083. models.BigAutoField(primary_key=True),
  2084. )
  2085. new_state = project_state.clone()
  2086. operation.state_forwards(app_label, new_state)
  2087. self.assertIsInstance(
  2088. new_state.models[app_label, "pony"].fields["id"],
  2089. models.BigAutoField,
  2090. )
  2091. def _get_column_id_type(cursor, table, column):
  2092. return [
  2093. c.type_code
  2094. for c in connection.introspection.get_table_description(
  2095. cursor,
  2096. f"{app_label}_{table}",
  2097. )
  2098. if c.name == column
  2099. ][0]
  2100. def assertIdTypeEqualsMTIFkType():
  2101. with connection.cursor() as cursor:
  2102. parent_id_type = _get_column_id_type(cursor, "pony", "id")
  2103. fk_id_type = _get_column_id_type(cursor, "rider", "pony_id")
  2104. child_id_type = _get_column_id_type(
  2105. cursor, "shetlandpony", "pony_ptr_id"
  2106. )
  2107. self.assertEqual(parent_id_type, child_id_type)
  2108. self.assertEqual(parent_id_type, fk_id_type)
  2109. assertIdTypeEqualsMTIFkType()
  2110. # Alter primary key.
  2111. with connection.schema_editor() as editor:
  2112. operation.database_forwards(app_label, editor, project_state, new_state)
  2113. assertIdTypeEqualsMTIFkType()
  2114. if connection.features.supports_foreign_keys:
  2115. self.assertFKExists(
  2116. f"{app_label}_shetlandpony",
  2117. ["pony_ptr_id"],
  2118. (f"{app_label}_pony", "id"),
  2119. )
  2120. self.assertFKExists(
  2121. f"{app_label}_rider",
  2122. ["pony_id"],
  2123. (f"{app_label}_pony", "id"),
  2124. )
  2125. # Reversal.
  2126. with connection.schema_editor() as editor:
  2127. operation.database_backwards(app_label, editor, new_state, project_state)
  2128. assertIdTypeEqualsMTIFkType()
  2129. if connection.features.supports_foreign_keys:
  2130. self.assertFKExists(
  2131. f"{app_label}_shetlandpony",
  2132. ["pony_ptr_id"],
  2133. (f"{app_label}_pony", "id"),
  2134. )
  2135. self.assertFKExists(
  2136. f"{app_label}_rider",
  2137. ["pony_id"],
  2138. (f"{app_label}_pony", "id"),
  2139. )
  2140. @skipUnlessDBFeature("supports_foreign_keys")
  2141. def test_alter_field_reloads_state_on_fk_with_to_field_target_type_change(self):
  2142. app_label = "test_alflrsfkwtflttc"
  2143. project_state = self.apply_operations(
  2144. app_label,
  2145. ProjectState(),
  2146. operations=[
  2147. migrations.CreateModel(
  2148. "Rider",
  2149. fields=[
  2150. ("id", models.AutoField(primary_key=True)),
  2151. ("code", models.IntegerField(unique=True)),
  2152. ],
  2153. ),
  2154. migrations.CreateModel(
  2155. "Pony",
  2156. fields=[
  2157. ("id", models.AutoField(primary_key=True)),
  2158. (
  2159. "rider",
  2160. models.ForeignKey(
  2161. "%s.Rider" % app_label, models.CASCADE, to_field="code"
  2162. ),
  2163. ),
  2164. ],
  2165. ),
  2166. ],
  2167. )
  2168. operation = migrations.AlterField(
  2169. "Rider",
  2170. "code",
  2171. models.CharField(max_length=100, unique=True),
  2172. )
  2173. self.apply_operations(app_label, project_state, operations=[operation])
  2174. id_type, id_null = [
  2175. (c.type_code, c.null_ok)
  2176. for c in self.get_table_description("%s_rider" % app_label)
  2177. if c.name == "code"
  2178. ][0]
  2179. fk_type, fk_null = [
  2180. (c.type_code, c.null_ok)
  2181. for c in self.get_table_description("%s_pony" % app_label)
  2182. if c.name == "rider_id"
  2183. ][0]
  2184. self.assertEqual(id_type, fk_type)
  2185. self.assertEqual(id_null, fk_null)
  2186. @skipUnlessDBFeature("supports_foreign_keys")
  2187. def test_alter_field_reloads_state_fk_with_to_field_related_name_target_type_change(
  2188. self,
  2189. ):
  2190. app_label = "test_alflrsfkwtflrnttc"
  2191. project_state = self.apply_operations(
  2192. app_label,
  2193. ProjectState(),
  2194. operations=[
  2195. migrations.CreateModel(
  2196. "Rider",
  2197. fields=[
  2198. ("id", models.AutoField(primary_key=True)),
  2199. ("code", models.PositiveIntegerField(unique=True)),
  2200. ],
  2201. ),
  2202. migrations.CreateModel(
  2203. "Pony",
  2204. fields=[
  2205. ("id", models.AutoField(primary_key=True)),
  2206. (
  2207. "rider",
  2208. models.ForeignKey(
  2209. "%s.Rider" % app_label,
  2210. models.CASCADE,
  2211. to_field="code",
  2212. related_name="+",
  2213. ),
  2214. ),
  2215. ],
  2216. ),
  2217. ],
  2218. )
  2219. operation = migrations.AlterField(
  2220. "Rider",
  2221. "code",
  2222. models.CharField(max_length=100, unique=True),
  2223. )
  2224. self.apply_operations(app_label, project_state, operations=[operation])
  2225. def test_alter_field_reloads_state_on_fk_target_changes(self):
  2226. """
  2227. If AlterField doesn't reload state appropriately, the second AlterField
  2228. crashes on MySQL due to not dropping the PonyRider.pony foreign key
  2229. constraint before modifying the column.
  2230. """
  2231. app_label = "alter_alter_field_reloads_state_on_fk_target_changes"
  2232. project_state = self.apply_operations(
  2233. app_label,
  2234. ProjectState(),
  2235. operations=[
  2236. migrations.CreateModel(
  2237. "Rider",
  2238. fields=[
  2239. ("id", models.CharField(primary_key=True, max_length=100)),
  2240. ],
  2241. ),
  2242. migrations.CreateModel(
  2243. "Pony",
  2244. fields=[
  2245. ("id", models.CharField(primary_key=True, max_length=100)),
  2246. (
  2247. "rider",
  2248. models.ForeignKey("%s.Rider" % app_label, models.CASCADE),
  2249. ),
  2250. ],
  2251. ),
  2252. migrations.CreateModel(
  2253. "PonyRider",
  2254. fields=[
  2255. ("id", models.AutoField(primary_key=True)),
  2256. (
  2257. "pony",
  2258. models.ForeignKey("%s.Pony" % app_label, models.CASCADE),
  2259. ),
  2260. ],
  2261. ),
  2262. ],
  2263. )
  2264. project_state = self.apply_operations(
  2265. app_label,
  2266. project_state,
  2267. operations=[
  2268. migrations.AlterField(
  2269. "Rider", "id", models.CharField(primary_key=True, max_length=99)
  2270. ),
  2271. migrations.AlterField(
  2272. "Pony", "id", models.CharField(primary_key=True, max_length=99)
  2273. ),
  2274. ],
  2275. )
  2276. def test_alter_field_reloads_state_on_fk_with_to_field_target_changes(self):
  2277. """
  2278. If AlterField doesn't reload state appropriately, the second AlterField
  2279. crashes on MySQL due to not dropping the PonyRider.pony foreign key
  2280. constraint before modifying the column.
  2281. """
  2282. app_label = "alter_alter_field_reloads_state_on_fk_with_to_field_target_changes"
  2283. project_state = self.apply_operations(
  2284. app_label,
  2285. ProjectState(),
  2286. operations=[
  2287. migrations.CreateModel(
  2288. "Rider",
  2289. fields=[
  2290. ("id", models.CharField(primary_key=True, max_length=100)),
  2291. ("slug", models.CharField(unique=True, max_length=100)),
  2292. ],
  2293. ),
  2294. migrations.CreateModel(
  2295. "Pony",
  2296. fields=[
  2297. ("id", models.CharField(primary_key=True, max_length=100)),
  2298. (
  2299. "rider",
  2300. models.ForeignKey(
  2301. "%s.Rider" % app_label, models.CASCADE, to_field="slug"
  2302. ),
  2303. ),
  2304. ("slug", models.CharField(unique=True, max_length=100)),
  2305. ],
  2306. ),
  2307. migrations.CreateModel(
  2308. "PonyRider",
  2309. fields=[
  2310. ("id", models.AutoField(primary_key=True)),
  2311. (
  2312. "pony",
  2313. models.ForeignKey(
  2314. "%s.Pony" % app_label, models.CASCADE, to_field="slug"
  2315. ),
  2316. ),
  2317. ],
  2318. ),
  2319. ],
  2320. )
  2321. project_state = self.apply_operations(
  2322. app_label,
  2323. project_state,
  2324. operations=[
  2325. migrations.AlterField(
  2326. "Rider", "slug", models.CharField(unique=True, max_length=99)
  2327. ),
  2328. migrations.AlterField(
  2329. "Pony", "slug", models.CharField(unique=True, max_length=99)
  2330. ),
  2331. ],
  2332. )
  2333. def test_rename_field_reloads_state_on_fk_target_changes(self):
  2334. """
  2335. If RenameField doesn't reload state appropriately, the AlterField
  2336. crashes on MySQL due to not dropping the PonyRider.pony foreign key
  2337. constraint before modifying the column.
  2338. """
  2339. app_label = "alter_rename_field_reloads_state_on_fk_target_changes"
  2340. project_state = self.apply_operations(
  2341. app_label,
  2342. ProjectState(),
  2343. operations=[
  2344. migrations.CreateModel(
  2345. "Rider",
  2346. fields=[
  2347. ("id", models.CharField(primary_key=True, max_length=100)),
  2348. ],
  2349. ),
  2350. migrations.CreateModel(
  2351. "Pony",
  2352. fields=[
  2353. ("id", models.CharField(primary_key=True, max_length=100)),
  2354. (
  2355. "rider",
  2356. models.ForeignKey("%s.Rider" % app_label, models.CASCADE),
  2357. ),
  2358. ],
  2359. ),
  2360. migrations.CreateModel(
  2361. "PonyRider",
  2362. fields=[
  2363. ("id", models.AutoField(primary_key=True)),
  2364. (
  2365. "pony",
  2366. models.ForeignKey("%s.Pony" % app_label, models.CASCADE),
  2367. ),
  2368. ],
  2369. ),
  2370. ],
  2371. )
  2372. project_state = self.apply_operations(
  2373. app_label,
  2374. project_state,
  2375. operations=[
  2376. migrations.RenameField("Rider", "id", "id2"),
  2377. migrations.AlterField(
  2378. "Pony", "id", models.CharField(primary_key=True, max_length=99)
  2379. ),
  2380. ],
  2381. atomic=connection.features.supports_atomic_references_rename,
  2382. )
  2383. def test_rename_field(self):
  2384. """
  2385. Tests the RenameField operation.
  2386. """
  2387. project_state = self.set_up_test_model(
  2388. "test_rnfl", unique_together=True, index_together=True
  2389. )
  2390. # Test the state alteration
  2391. operation = migrations.RenameField("Pony", "pink", "blue")
  2392. self.assertEqual(operation.describe(), "Rename field pink on Pony to blue")
  2393. self.assertEqual(operation.migration_name_fragment, "rename_pink_pony_blue")
  2394. new_state = project_state.clone()
  2395. operation.state_forwards("test_rnfl", new_state)
  2396. self.assertIn("blue", new_state.models["test_rnfl", "pony"].fields)
  2397. self.assertNotIn("pink", new_state.models["test_rnfl", "pony"].fields)
  2398. # Make sure the unique_together has the renamed column too
  2399. self.assertIn(
  2400. "blue", new_state.models["test_rnfl", "pony"].options["unique_together"][0]
  2401. )
  2402. self.assertNotIn(
  2403. "pink", new_state.models["test_rnfl", "pony"].options["unique_together"][0]
  2404. )
  2405. # Make sure the index_together has the renamed column too
  2406. self.assertIn(
  2407. "blue", new_state.models["test_rnfl", "pony"].options["index_together"][0]
  2408. )
  2409. self.assertNotIn(
  2410. "pink", new_state.models["test_rnfl", "pony"].options["index_together"][0]
  2411. )
  2412. # Test the database alteration
  2413. self.assertColumnExists("test_rnfl_pony", "pink")
  2414. self.assertColumnNotExists("test_rnfl_pony", "blue")
  2415. with connection.schema_editor() as editor:
  2416. operation.database_forwards("test_rnfl", editor, project_state, new_state)
  2417. self.assertColumnExists("test_rnfl_pony", "blue")
  2418. self.assertColumnNotExists("test_rnfl_pony", "pink")
  2419. # Ensure the unique constraint has been ported over
  2420. with connection.cursor() as cursor:
  2421. cursor.execute("INSERT INTO test_rnfl_pony (blue, weight) VALUES (1, 1)")
  2422. with self.assertRaises(IntegrityError):
  2423. with atomic():
  2424. cursor.execute(
  2425. "INSERT INTO test_rnfl_pony (blue, weight) VALUES (1, 1)"
  2426. )
  2427. cursor.execute("DELETE FROM test_rnfl_pony")
  2428. # Ensure the index constraint has been ported over
  2429. self.assertIndexExists("test_rnfl_pony", ["weight", "blue"])
  2430. # And test reversal
  2431. with connection.schema_editor() as editor:
  2432. operation.database_backwards("test_rnfl", editor, new_state, project_state)
  2433. self.assertColumnExists("test_rnfl_pony", "pink")
  2434. self.assertColumnNotExists("test_rnfl_pony", "blue")
  2435. # Ensure the index constraint has been reset
  2436. self.assertIndexExists("test_rnfl_pony", ["weight", "pink"])
  2437. # And deconstruction
  2438. definition = operation.deconstruct()
  2439. self.assertEqual(definition[0], "RenameField")
  2440. self.assertEqual(definition[1], [])
  2441. self.assertEqual(
  2442. definition[2],
  2443. {"model_name": "Pony", "old_name": "pink", "new_name": "blue"},
  2444. )
  2445. def test_rename_field_with_db_column(self):
  2446. project_state = self.apply_operations(
  2447. "test_rfwdbc",
  2448. ProjectState(),
  2449. operations=[
  2450. migrations.CreateModel(
  2451. "Pony",
  2452. fields=[
  2453. ("id", models.AutoField(primary_key=True)),
  2454. ("field", models.IntegerField(db_column="db_field")),
  2455. (
  2456. "fk_field",
  2457. models.ForeignKey(
  2458. "Pony",
  2459. models.CASCADE,
  2460. db_column="db_fk_field",
  2461. ),
  2462. ),
  2463. ],
  2464. ),
  2465. ],
  2466. )
  2467. new_state = project_state.clone()
  2468. operation = migrations.RenameField("Pony", "field", "renamed_field")
  2469. operation.state_forwards("test_rfwdbc", new_state)
  2470. self.assertIn("renamed_field", new_state.models["test_rfwdbc", "pony"].fields)
  2471. self.assertNotIn("field", new_state.models["test_rfwdbc", "pony"].fields)
  2472. self.assertColumnExists("test_rfwdbc_pony", "db_field")
  2473. with connection.schema_editor() as editor:
  2474. with self.assertNumQueries(0):
  2475. operation.database_forwards(
  2476. "test_rfwdbc", editor, project_state, new_state
  2477. )
  2478. self.assertColumnExists("test_rfwdbc_pony", "db_field")
  2479. with connection.schema_editor() as editor:
  2480. with self.assertNumQueries(0):
  2481. operation.database_backwards(
  2482. "test_rfwdbc", editor, new_state, project_state
  2483. )
  2484. self.assertColumnExists("test_rfwdbc_pony", "db_field")
  2485. new_state = project_state.clone()
  2486. operation = migrations.RenameField("Pony", "fk_field", "renamed_fk_field")
  2487. operation.state_forwards("test_rfwdbc", new_state)
  2488. self.assertIn(
  2489. "renamed_fk_field", new_state.models["test_rfwdbc", "pony"].fields
  2490. )
  2491. self.assertNotIn("fk_field", new_state.models["test_rfwdbc", "pony"].fields)
  2492. self.assertColumnExists("test_rfwdbc_pony", "db_fk_field")
  2493. with connection.schema_editor() as editor:
  2494. with self.assertNumQueries(0):
  2495. operation.database_forwards(
  2496. "test_rfwdbc", editor, project_state, new_state
  2497. )
  2498. self.assertColumnExists("test_rfwdbc_pony", "db_fk_field")
  2499. with connection.schema_editor() as editor:
  2500. with self.assertNumQueries(0):
  2501. operation.database_backwards(
  2502. "test_rfwdbc", editor, new_state, project_state
  2503. )
  2504. self.assertColumnExists("test_rfwdbc_pony", "db_fk_field")
  2505. def test_rename_field_case(self):
  2506. project_state = self.apply_operations(
  2507. "test_rfmx",
  2508. ProjectState(),
  2509. operations=[
  2510. migrations.CreateModel(
  2511. "Pony",
  2512. fields=[
  2513. ("id", models.AutoField(primary_key=True)),
  2514. ("field", models.IntegerField()),
  2515. ],
  2516. ),
  2517. ],
  2518. )
  2519. new_state = project_state.clone()
  2520. operation = migrations.RenameField("Pony", "field", "FiElD")
  2521. operation.state_forwards("test_rfmx", new_state)
  2522. self.assertIn("FiElD", new_state.models["test_rfmx", "pony"].fields)
  2523. self.assertColumnExists("test_rfmx_pony", "field")
  2524. with connection.schema_editor() as editor:
  2525. operation.database_forwards("test_rfmx", editor, project_state, new_state)
  2526. self.assertColumnExists(
  2527. "test_rfmx_pony",
  2528. connection.introspection.identifier_converter("FiElD"),
  2529. )
  2530. with connection.schema_editor() as editor:
  2531. operation.database_backwards("test_rfmx", editor, new_state, project_state)
  2532. self.assertColumnExists("test_rfmx_pony", "field")
  2533. def test_rename_missing_field(self):
  2534. state = ProjectState()
  2535. state.add_model(ModelState("app", "model", []))
  2536. with self.assertRaisesMessage(
  2537. FieldDoesNotExist, "app.model has no field named 'field'"
  2538. ):
  2539. migrations.RenameField("model", "field", "new_field").state_forwards(
  2540. "app", state
  2541. )
  2542. def test_rename_referenced_field_state_forward(self):
  2543. state = ProjectState()
  2544. state.add_model(
  2545. ModelState(
  2546. "app",
  2547. "Model",
  2548. [
  2549. ("id", models.AutoField(primary_key=True)),
  2550. ("field", models.IntegerField(unique=True)),
  2551. ],
  2552. )
  2553. )
  2554. state.add_model(
  2555. ModelState(
  2556. "app",
  2557. "OtherModel",
  2558. [
  2559. ("id", models.AutoField(primary_key=True)),
  2560. (
  2561. "fk",
  2562. models.ForeignKey("Model", models.CASCADE, to_field="field"),
  2563. ),
  2564. (
  2565. "fo",
  2566. models.ForeignObject(
  2567. "Model",
  2568. models.CASCADE,
  2569. from_fields=("fk",),
  2570. to_fields=("field",),
  2571. ),
  2572. ),
  2573. ],
  2574. )
  2575. )
  2576. operation = migrations.RenameField("Model", "field", "renamed")
  2577. new_state = state.clone()
  2578. operation.state_forwards("app", new_state)
  2579. self.assertEqual(
  2580. new_state.models["app", "othermodel"].fields["fk"].remote_field.field_name,
  2581. "renamed",
  2582. )
  2583. self.assertEqual(
  2584. new_state.models["app", "othermodel"].fields["fk"].from_fields, ["self"]
  2585. )
  2586. self.assertEqual(
  2587. new_state.models["app", "othermodel"].fields["fk"].to_fields, ("renamed",)
  2588. )
  2589. self.assertEqual(
  2590. new_state.models["app", "othermodel"].fields["fo"].from_fields, ("fk",)
  2591. )
  2592. self.assertEqual(
  2593. new_state.models["app", "othermodel"].fields["fo"].to_fields, ("renamed",)
  2594. )
  2595. operation = migrations.RenameField("OtherModel", "fk", "renamed_fk")
  2596. new_state = state.clone()
  2597. operation.state_forwards("app", new_state)
  2598. self.assertEqual(
  2599. new_state.models["app", "othermodel"]
  2600. .fields["renamed_fk"]
  2601. .remote_field.field_name,
  2602. "renamed",
  2603. )
  2604. self.assertEqual(
  2605. new_state.models["app", "othermodel"].fields["renamed_fk"].from_fields,
  2606. ("self",),
  2607. )
  2608. self.assertEqual(
  2609. new_state.models["app", "othermodel"].fields["renamed_fk"].to_fields,
  2610. ("renamed",),
  2611. )
  2612. self.assertEqual(
  2613. new_state.models["app", "othermodel"].fields["fo"].from_fields,
  2614. ("renamed_fk",),
  2615. )
  2616. self.assertEqual(
  2617. new_state.models["app", "othermodel"].fields["fo"].to_fields, ("renamed",)
  2618. )
  2619. def test_alter_unique_together(self):
  2620. """
  2621. Tests the AlterUniqueTogether operation.
  2622. """
  2623. project_state = self.set_up_test_model("test_alunto")
  2624. # Test the state alteration
  2625. operation = migrations.AlterUniqueTogether("Pony", [("pink", "weight")])
  2626. self.assertEqual(
  2627. operation.describe(), "Alter unique_together for Pony (1 constraint(s))"
  2628. )
  2629. self.assertEqual(
  2630. operation.migration_name_fragment,
  2631. "alter_pony_unique_together",
  2632. )
  2633. new_state = project_state.clone()
  2634. operation.state_forwards("test_alunto", new_state)
  2635. self.assertEqual(
  2636. len(
  2637. project_state.models["test_alunto", "pony"].options.get(
  2638. "unique_together", set()
  2639. )
  2640. ),
  2641. 0,
  2642. )
  2643. self.assertEqual(
  2644. len(
  2645. new_state.models["test_alunto", "pony"].options.get(
  2646. "unique_together", set()
  2647. )
  2648. ),
  2649. 1,
  2650. )
  2651. # Make sure we can insert duplicate rows
  2652. with connection.cursor() as cursor:
  2653. cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)")
  2654. cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)")
  2655. cursor.execute("DELETE FROM test_alunto_pony")
  2656. # Test the database alteration
  2657. with connection.schema_editor() as editor:
  2658. operation.database_forwards(
  2659. "test_alunto", editor, project_state, new_state
  2660. )
  2661. cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)")
  2662. with self.assertRaises(IntegrityError):
  2663. with atomic():
  2664. cursor.execute(
  2665. "INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)"
  2666. )
  2667. cursor.execute("DELETE FROM test_alunto_pony")
  2668. # And test reversal
  2669. with connection.schema_editor() as editor:
  2670. operation.database_backwards(
  2671. "test_alunto", editor, new_state, project_state
  2672. )
  2673. cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)")
  2674. cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)")
  2675. cursor.execute("DELETE FROM test_alunto_pony")
  2676. # Test flat unique_together
  2677. operation = migrations.AlterUniqueTogether("Pony", ("pink", "weight"))
  2678. operation.state_forwards("test_alunto", new_state)
  2679. self.assertEqual(
  2680. len(
  2681. new_state.models["test_alunto", "pony"].options.get(
  2682. "unique_together", set()
  2683. )
  2684. ),
  2685. 1,
  2686. )
  2687. # And deconstruction
  2688. definition = operation.deconstruct()
  2689. self.assertEqual(definition[0], "AlterUniqueTogether")
  2690. self.assertEqual(definition[1], [])
  2691. self.assertEqual(
  2692. definition[2], {"name": "Pony", "unique_together": {("pink", "weight")}}
  2693. )
  2694. def test_alter_unique_together_remove(self):
  2695. operation = migrations.AlterUniqueTogether("Pony", None)
  2696. self.assertEqual(
  2697. operation.describe(), "Alter unique_together for Pony (0 constraint(s))"
  2698. )
  2699. def test_add_index(self):
  2700. """
  2701. Test the AddIndex operation.
  2702. """
  2703. project_state = self.set_up_test_model("test_adin")
  2704. msg = (
  2705. "Indexes passed to AddIndex operations require a name argument. "
  2706. "<Index: fields=['pink']> doesn't have one."
  2707. )
  2708. with self.assertRaisesMessage(ValueError, msg):
  2709. migrations.AddIndex("Pony", models.Index(fields=["pink"]))
  2710. index = models.Index(fields=["pink"], name="test_adin_pony_pink_idx")
  2711. operation = migrations.AddIndex("Pony", index)
  2712. self.assertEqual(
  2713. operation.describe(),
  2714. "Create index test_adin_pony_pink_idx on field(s) pink of model Pony",
  2715. )
  2716. self.assertEqual(
  2717. operation.migration_name_fragment,
  2718. "pony_test_adin_pony_pink_idx",
  2719. )
  2720. new_state = project_state.clone()
  2721. operation.state_forwards("test_adin", new_state)
  2722. # Test the database alteration
  2723. self.assertEqual(
  2724. len(new_state.models["test_adin", "pony"].options["indexes"]), 1
  2725. )
  2726. self.assertIndexNotExists("test_adin_pony", ["pink"])
  2727. with connection.schema_editor() as editor:
  2728. operation.database_forwards("test_adin", editor, project_state, new_state)
  2729. self.assertIndexExists("test_adin_pony", ["pink"])
  2730. # And test reversal
  2731. with connection.schema_editor() as editor:
  2732. operation.database_backwards("test_adin", editor, new_state, project_state)
  2733. self.assertIndexNotExists("test_adin_pony", ["pink"])
  2734. # And deconstruction
  2735. definition = operation.deconstruct()
  2736. self.assertEqual(definition[0], "AddIndex")
  2737. self.assertEqual(definition[1], [])
  2738. self.assertEqual(definition[2], {"model_name": "Pony", "index": index})
  2739. def test_remove_index(self):
  2740. """
  2741. Test the RemoveIndex operation.
  2742. """
  2743. project_state = self.set_up_test_model("test_rmin", multicol_index=True)
  2744. self.assertTableExists("test_rmin_pony")
  2745. self.assertIndexExists("test_rmin_pony", ["pink", "weight"])
  2746. operation = migrations.RemoveIndex("Pony", "pony_test_idx")
  2747. self.assertEqual(operation.describe(), "Remove index pony_test_idx from Pony")
  2748. self.assertEqual(
  2749. operation.migration_name_fragment,
  2750. "remove_pony_pony_test_idx",
  2751. )
  2752. new_state = project_state.clone()
  2753. operation.state_forwards("test_rmin", new_state)
  2754. # Test the state alteration
  2755. self.assertEqual(
  2756. len(new_state.models["test_rmin", "pony"].options["indexes"]), 0
  2757. )
  2758. self.assertIndexExists("test_rmin_pony", ["pink", "weight"])
  2759. # Test the database alteration
  2760. with connection.schema_editor() as editor:
  2761. operation.database_forwards("test_rmin", editor, project_state, new_state)
  2762. self.assertIndexNotExists("test_rmin_pony", ["pink", "weight"])
  2763. # And test reversal
  2764. with connection.schema_editor() as editor:
  2765. operation.database_backwards("test_rmin", editor, new_state, project_state)
  2766. self.assertIndexExists("test_rmin_pony", ["pink", "weight"])
  2767. # And deconstruction
  2768. definition = operation.deconstruct()
  2769. self.assertEqual(definition[0], "RemoveIndex")
  2770. self.assertEqual(definition[1], [])
  2771. self.assertEqual(definition[2], {"model_name": "Pony", "name": "pony_test_idx"})
  2772. # Also test a field dropped with index - sqlite remake issue
  2773. operations = [
  2774. migrations.RemoveIndex("Pony", "pony_test_idx"),
  2775. migrations.RemoveField("Pony", "pink"),
  2776. ]
  2777. self.assertColumnExists("test_rmin_pony", "pink")
  2778. self.assertIndexExists("test_rmin_pony", ["pink", "weight"])
  2779. # Test database alteration
  2780. new_state = project_state.clone()
  2781. self.apply_operations("test_rmin", new_state, operations=operations)
  2782. self.assertColumnNotExists("test_rmin_pony", "pink")
  2783. self.assertIndexNotExists("test_rmin_pony", ["pink", "weight"])
  2784. # And test reversal
  2785. self.unapply_operations("test_rmin", project_state, operations=operations)
  2786. self.assertIndexExists("test_rmin_pony", ["pink", "weight"])
  2787. def test_rename_index(self):
  2788. app_label = "test_rnin"
  2789. project_state = self.set_up_test_model(app_label, index=True)
  2790. table_name = app_label + "_pony"
  2791. self.assertIndexNameExists(table_name, "pony_pink_idx")
  2792. self.assertIndexNameNotExists(table_name, "new_pony_test_idx")
  2793. operation = migrations.RenameIndex(
  2794. "Pony", new_name="new_pony_test_idx", old_name="pony_pink_idx"
  2795. )
  2796. self.assertEqual(
  2797. operation.describe(),
  2798. "Rename index pony_pink_idx on Pony to new_pony_test_idx",
  2799. )
  2800. self.assertEqual(
  2801. operation.migration_name_fragment,
  2802. "rename_pony_pink_idx_new_pony_test_idx",
  2803. )
  2804. new_state = project_state.clone()
  2805. operation.state_forwards(app_label, new_state)
  2806. # Rename index.
  2807. expected_queries = 1 if connection.features.can_rename_index else 2
  2808. with connection.schema_editor() as editor, self.assertNumQueries(
  2809. expected_queries
  2810. ):
  2811. operation.database_forwards(app_label, editor, project_state, new_state)
  2812. self.assertIndexNameNotExists(table_name, "pony_pink_idx")
  2813. self.assertIndexNameExists(table_name, "new_pony_test_idx")
  2814. # Reversal.
  2815. with connection.schema_editor() as editor, self.assertNumQueries(
  2816. expected_queries
  2817. ):
  2818. operation.database_backwards(app_label, editor, new_state, project_state)
  2819. self.assertIndexNameExists(table_name, "pony_pink_idx")
  2820. self.assertIndexNameNotExists(table_name, "new_pony_test_idx")
  2821. # Deconstruction.
  2822. definition = operation.deconstruct()
  2823. self.assertEqual(definition[0], "RenameIndex")
  2824. self.assertEqual(definition[1], [])
  2825. self.assertEqual(
  2826. definition[2],
  2827. {
  2828. "model_name": "Pony",
  2829. "old_name": "pony_pink_idx",
  2830. "new_name": "new_pony_test_idx",
  2831. },
  2832. )
  2833. def test_rename_index_arguments(self):
  2834. msg = "RenameIndex.old_name and old_fields are mutually exclusive."
  2835. with self.assertRaisesMessage(ValueError, msg):
  2836. migrations.RenameIndex(
  2837. "Pony",
  2838. new_name="new_idx_name",
  2839. old_name="old_idx_name",
  2840. old_fields=("weight", "pink"),
  2841. )
  2842. msg = "RenameIndex requires one of old_name and old_fields arguments to be set."
  2843. with self.assertRaisesMessage(ValueError, msg):
  2844. migrations.RenameIndex("Pony", new_name="new_idx_name")
  2845. def test_rename_index_unnamed_index(self):
  2846. app_label = "test_rninui"
  2847. project_state = self.set_up_test_model(app_label, index_together=True)
  2848. table_name = app_label + "_pony"
  2849. self.assertIndexNameNotExists(table_name, "new_pony_test_idx")
  2850. operation = migrations.RenameIndex(
  2851. "Pony", new_name="new_pony_test_idx", old_fields=("weight", "pink")
  2852. )
  2853. self.assertEqual(
  2854. operation.describe(),
  2855. "Rename unnamed index for ('weight', 'pink') on Pony to new_pony_test_idx",
  2856. )
  2857. self.assertEqual(
  2858. operation.migration_name_fragment,
  2859. "rename_pony_weight_pink_new_pony_test_idx",
  2860. )
  2861. new_state = project_state.clone()
  2862. operation.state_forwards(app_label, new_state)
  2863. # Rename index.
  2864. with connection.schema_editor() as editor:
  2865. operation.database_forwards(app_label, editor, project_state, new_state)
  2866. self.assertIndexNameExists(table_name, "new_pony_test_idx")
  2867. # Reverse is a no-op.
  2868. with connection.schema_editor() as editor, self.assertNumQueries(0):
  2869. operation.database_backwards(app_label, editor, new_state, project_state)
  2870. self.assertIndexNameExists(table_name, "new_pony_test_idx")
  2871. # Deconstruction.
  2872. definition = operation.deconstruct()
  2873. self.assertEqual(definition[0], "RenameIndex")
  2874. self.assertEqual(definition[1], [])
  2875. self.assertEqual(
  2876. definition[2],
  2877. {
  2878. "model_name": "Pony",
  2879. "new_name": "new_pony_test_idx",
  2880. "old_fields": ("weight", "pink"),
  2881. },
  2882. )
  2883. def test_rename_index_unknown_unnamed_index(self):
  2884. app_label = "test_rninuui"
  2885. project_state = self.set_up_test_model(app_label)
  2886. operation = migrations.RenameIndex(
  2887. "Pony", new_name="new_pony_test_idx", old_fields=("weight", "pink")
  2888. )
  2889. new_state = project_state.clone()
  2890. operation.state_forwards(app_label, new_state)
  2891. msg = "Found wrong number (0) of indexes for test_rninuui_pony(weight, pink)."
  2892. with connection.schema_editor() as editor:
  2893. with self.assertRaisesMessage(ValueError, msg):
  2894. operation.database_forwards(app_label, editor, project_state, new_state)
  2895. def test_add_index_state_forwards(self):
  2896. project_state = self.set_up_test_model("test_adinsf")
  2897. index = models.Index(fields=["pink"], name="test_adinsf_pony_pink_idx")
  2898. old_model = project_state.apps.get_model("test_adinsf", "Pony")
  2899. new_state = project_state.clone()
  2900. operation = migrations.AddIndex("Pony", index)
  2901. operation.state_forwards("test_adinsf", new_state)
  2902. new_model = new_state.apps.get_model("test_adinsf", "Pony")
  2903. self.assertIsNot(old_model, new_model)
  2904. def test_remove_index_state_forwards(self):
  2905. project_state = self.set_up_test_model("test_rminsf")
  2906. index = models.Index(fields=["pink"], name="test_rminsf_pony_pink_idx")
  2907. migrations.AddIndex("Pony", index).state_forwards("test_rminsf", project_state)
  2908. old_model = project_state.apps.get_model("test_rminsf", "Pony")
  2909. new_state = project_state.clone()
  2910. operation = migrations.RemoveIndex("Pony", "test_rminsf_pony_pink_idx")
  2911. operation.state_forwards("test_rminsf", new_state)
  2912. new_model = new_state.apps.get_model("test_rminsf", "Pony")
  2913. self.assertIsNot(old_model, new_model)
  2914. def test_rename_index_state_forwards(self):
  2915. app_label = "test_rnidsf"
  2916. project_state = self.set_up_test_model(app_label, index=True)
  2917. old_model = project_state.apps.get_model(app_label, "Pony")
  2918. new_state = project_state.clone()
  2919. operation = migrations.RenameIndex(
  2920. "Pony", new_name="new_pony_pink_idx", old_name="pony_pink_idx"
  2921. )
  2922. operation.state_forwards(app_label, new_state)
  2923. new_model = new_state.apps.get_model(app_label, "Pony")
  2924. self.assertIsNot(old_model, new_model)
  2925. self.assertEqual(new_model._meta.indexes[0].name, "new_pony_pink_idx")
  2926. def test_rename_index_state_forwards_unnamed_index(self):
  2927. app_label = "test_rnidsfui"
  2928. project_state = self.set_up_test_model(app_label, index_together=True)
  2929. old_model = project_state.apps.get_model(app_label, "Pony")
  2930. new_state = project_state.clone()
  2931. operation = migrations.RenameIndex(
  2932. "Pony", new_name="new_pony_pink_idx", old_fields=("weight", "pink")
  2933. )
  2934. operation.state_forwards(app_label, new_state)
  2935. new_model = new_state.apps.get_model(app_label, "Pony")
  2936. self.assertIsNot(old_model, new_model)
  2937. self.assertEqual(new_model._meta.index_together, tuple())
  2938. self.assertEqual(new_model._meta.indexes[0].name, "new_pony_pink_idx")
  2939. @skipUnlessDBFeature("supports_expression_indexes")
  2940. def test_add_func_index(self):
  2941. app_label = "test_addfuncin"
  2942. index_name = f"{app_label}_pony_abs_idx"
  2943. table_name = f"{app_label}_pony"
  2944. project_state = self.set_up_test_model(app_label)
  2945. index = models.Index(Abs("weight"), name=index_name)
  2946. operation = migrations.AddIndex("Pony", index)
  2947. self.assertEqual(
  2948. operation.describe(),
  2949. "Create index test_addfuncin_pony_abs_idx on Abs(F(weight)) on model Pony",
  2950. )
  2951. self.assertEqual(
  2952. operation.migration_name_fragment,
  2953. "pony_test_addfuncin_pony_abs_idx",
  2954. )
  2955. new_state = project_state.clone()
  2956. operation.state_forwards(app_label, new_state)
  2957. self.assertEqual(len(new_state.models[app_label, "pony"].options["indexes"]), 1)
  2958. self.assertIndexNameNotExists(table_name, index_name)
  2959. # Add index.
  2960. with connection.schema_editor() as editor:
  2961. operation.database_forwards(app_label, editor, project_state, new_state)
  2962. self.assertIndexNameExists(table_name, index_name)
  2963. # Reversal.
  2964. with connection.schema_editor() as editor:
  2965. operation.database_backwards(app_label, editor, new_state, project_state)
  2966. self.assertIndexNameNotExists(table_name, index_name)
  2967. # Deconstruction.
  2968. definition = operation.deconstruct()
  2969. self.assertEqual(definition[0], "AddIndex")
  2970. self.assertEqual(definition[1], [])
  2971. self.assertEqual(definition[2], {"model_name": "Pony", "index": index})
  2972. @skipUnlessDBFeature("supports_expression_indexes")
  2973. def test_remove_func_index(self):
  2974. app_label = "test_rmfuncin"
  2975. index_name = f"{app_label}_pony_abs_idx"
  2976. table_name = f"{app_label}_pony"
  2977. project_state = self.set_up_test_model(
  2978. app_label,
  2979. indexes=[
  2980. models.Index(Abs("weight"), name=index_name),
  2981. ],
  2982. )
  2983. self.assertTableExists(table_name)
  2984. self.assertIndexNameExists(table_name, index_name)
  2985. operation = migrations.RemoveIndex("Pony", index_name)
  2986. self.assertEqual(
  2987. operation.describe(),
  2988. "Remove index test_rmfuncin_pony_abs_idx from Pony",
  2989. )
  2990. self.assertEqual(
  2991. operation.migration_name_fragment,
  2992. "remove_pony_test_rmfuncin_pony_abs_idx",
  2993. )
  2994. new_state = project_state.clone()
  2995. operation.state_forwards(app_label, new_state)
  2996. self.assertEqual(len(new_state.models[app_label, "pony"].options["indexes"]), 0)
  2997. # Remove index.
  2998. with connection.schema_editor() as editor:
  2999. operation.database_forwards(app_label, editor, project_state, new_state)
  3000. self.assertIndexNameNotExists(table_name, index_name)
  3001. # Reversal.
  3002. with connection.schema_editor() as editor:
  3003. operation.database_backwards(app_label, editor, new_state, project_state)
  3004. self.assertIndexNameExists(table_name, index_name)
  3005. # Deconstruction.
  3006. definition = operation.deconstruct()
  3007. self.assertEqual(definition[0], "RemoveIndex")
  3008. self.assertEqual(definition[1], [])
  3009. self.assertEqual(definition[2], {"model_name": "Pony", "name": index_name})
  3010. @skipUnlessDBFeature("supports_expression_indexes")
  3011. def test_alter_field_with_func_index(self):
  3012. app_label = "test_alfuncin"
  3013. index_name = f"{app_label}_pony_idx"
  3014. table_name = f"{app_label}_pony"
  3015. project_state = self.set_up_test_model(
  3016. app_label,
  3017. indexes=[models.Index(Abs("pink"), name=index_name)],
  3018. )
  3019. operation = migrations.AlterField(
  3020. "Pony", "pink", models.IntegerField(null=True)
  3021. )
  3022. new_state = project_state.clone()
  3023. operation.state_forwards(app_label, new_state)
  3024. with connection.schema_editor() as editor:
  3025. operation.database_forwards(app_label, editor, project_state, new_state)
  3026. self.assertIndexNameExists(table_name, index_name)
  3027. with connection.schema_editor() as editor:
  3028. operation.database_backwards(app_label, editor, new_state, project_state)
  3029. self.assertIndexNameExists(table_name, index_name)
  3030. def test_alter_field_with_index(self):
  3031. """
  3032. Test AlterField operation with an index to ensure indexes created via
  3033. Meta.indexes don't get dropped with sqlite3 remake.
  3034. """
  3035. project_state = self.set_up_test_model("test_alflin", index=True)
  3036. operation = migrations.AlterField(
  3037. "Pony", "pink", models.IntegerField(null=True)
  3038. )
  3039. new_state = project_state.clone()
  3040. operation.state_forwards("test_alflin", new_state)
  3041. # Test the database alteration
  3042. self.assertColumnNotNull("test_alflin_pony", "pink")
  3043. with connection.schema_editor() as editor:
  3044. operation.database_forwards("test_alflin", editor, project_state, new_state)
  3045. # Index hasn't been dropped
  3046. self.assertIndexExists("test_alflin_pony", ["pink"])
  3047. # And test reversal
  3048. with connection.schema_editor() as editor:
  3049. operation.database_backwards(
  3050. "test_alflin", editor, new_state, project_state
  3051. )
  3052. # Ensure the index is still there
  3053. self.assertIndexExists("test_alflin_pony", ["pink"])
  3054. def test_alter_index_together(self):
  3055. """
  3056. Tests the AlterIndexTogether operation.
  3057. """
  3058. project_state = self.set_up_test_model("test_alinto")
  3059. # Test the state alteration
  3060. operation = migrations.AlterIndexTogether("Pony", [("pink", "weight")])
  3061. self.assertEqual(
  3062. operation.describe(), "Alter index_together for Pony (1 constraint(s))"
  3063. )
  3064. self.assertEqual(
  3065. operation.migration_name_fragment,
  3066. "alter_pony_index_together",
  3067. )
  3068. new_state = project_state.clone()
  3069. operation.state_forwards("test_alinto", new_state)
  3070. self.assertEqual(
  3071. len(
  3072. project_state.models["test_alinto", "pony"].options.get(
  3073. "index_together", set()
  3074. )
  3075. ),
  3076. 0,
  3077. )
  3078. self.assertEqual(
  3079. len(
  3080. new_state.models["test_alinto", "pony"].options.get(
  3081. "index_together", set()
  3082. )
  3083. ),
  3084. 1,
  3085. )
  3086. # Make sure there's no matching index
  3087. self.assertIndexNotExists("test_alinto_pony", ["pink", "weight"])
  3088. # Test the database alteration
  3089. with connection.schema_editor() as editor:
  3090. operation.database_forwards("test_alinto", editor, project_state, new_state)
  3091. self.assertIndexExists("test_alinto_pony", ["pink", "weight"])
  3092. # And test reversal
  3093. with connection.schema_editor() as editor:
  3094. operation.database_backwards(
  3095. "test_alinto", editor, new_state, project_state
  3096. )
  3097. self.assertIndexNotExists("test_alinto_pony", ["pink", "weight"])
  3098. # And deconstruction
  3099. definition = operation.deconstruct()
  3100. self.assertEqual(definition[0], "AlterIndexTogether")
  3101. self.assertEqual(definition[1], [])
  3102. self.assertEqual(
  3103. definition[2], {"name": "Pony", "index_together": {("pink", "weight")}}
  3104. )
  3105. def test_alter_index_together_remove(self):
  3106. operation = migrations.AlterIndexTogether("Pony", None)
  3107. self.assertEqual(
  3108. operation.describe(), "Alter index_together for Pony (0 constraint(s))"
  3109. )
  3110. @skipUnlessDBFeature("allows_multiple_constraints_on_same_fields")
  3111. def test_alter_index_together_remove_with_unique_together(self):
  3112. app_label = "test_alintoremove_wunto"
  3113. table_name = "%s_pony" % app_label
  3114. project_state = self.set_up_test_model(app_label, unique_together=True)
  3115. self.assertUniqueConstraintExists(table_name, ["pink", "weight"])
  3116. # Add index together.
  3117. new_state = project_state.clone()
  3118. operation = migrations.AlterIndexTogether("Pony", [("pink", "weight")])
  3119. operation.state_forwards(app_label, new_state)
  3120. with connection.schema_editor() as editor:
  3121. operation.database_forwards(app_label, editor, project_state, new_state)
  3122. self.assertIndexExists(table_name, ["pink", "weight"])
  3123. # Remove index together.
  3124. project_state = new_state
  3125. new_state = project_state.clone()
  3126. operation = migrations.AlterIndexTogether("Pony", set())
  3127. operation.state_forwards(app_label, new_state)
  3128. with connection.schema_editor() as editor:
  3129. operation.database_forwards(app_label, editor, project_state, new_state)
  3130. self.assertIndexNotExists(table_name, ["pink", "weight"])
  3131. self.assertUniqueConstraintExists(table_name, ["pink", "weight"])
  3132. @skipUnlessDBFeature("supports_table_check_constraints")
  3133. def test_add_constraint(self):
  3134. project_state = self.set_up_test_model("test_addconstraint")
  3135. gt_check = models.Q(pink__gt=2)
  3136. gt_constraint = models.CheckConstraint(
  3137. check=gt_check, name="test_add_constraint_pony_pink_gt_2"
  3138. )
  3139. gt_operation = migrations.AddConstraint("Pony", gt_constraint)
  3140. self.assertEqual(
  3141. gt_operation.describe(),
  3142. "Create constraint test_add_constraint_pony_pink_gt_2 on model Pony",
  3143. )
  3144. self.assertEqual(
  3145. gt_operation.migration_name_fragment,
  3146. "pony_test_add_constraint_pony_pink_gt_2",
  3147. )
  3148. # Test the state alteration
  3149. new_state = project_state.clone()
  3150. gt_operation.state_forwards("test_addconstraint", new_state)
  3151. self.assertEqual(
  3152. len(new_state.models["test_addconstraint", "pony"].options["constraints"]),
  3153. 1,
  3154. )
  3155. Pony = new_state.apps.get_model("test_addconstraint", "Pony")
  3156. self.assertEqual(len(Pony._meta.constraints), 1)
  3157. # Test the database alteration
  3158. with connection.schema_editor() as editor:
  3159. gt_operation.database_forwards(
  3160. "test_addconstraint", editor, project_state, new_state
  3161. )
  3162. with self.assertRaises(IntegrityError), transaction.atomic():
  3163. Pony.objects.create(pink=1, weight=1.0)
  3164. # Add another one.
  3165. lt_check = models.Q(pink__lt=100)
  3166. lt_constraint = models.CheckConstraint(
  3167. check=lt_check, name="test_add_constraint_pony_pink_lt_100"
  3168. )
  3169. lt_operation = migrations.AddConstraint("Pony", lt_constraint)
  3170. lt_operation.state_forwards("test_addconstraint", new_state)
  3171. self.assertEqual(
  3172. len(new_state.models["test_addconstraint", "pony"].options["constraints"]),
  3173. 2,
  3174. )
  3175. Pony = new_state.apps.get_model("test_addconstraint", "Pony")
  3176. self.assertEqual(len(Pony._meta.constraints), 2)
  3177. with connection.schema_editor() as editor:
  3178. lt_operation.database_forwards(
  3179. "test_addconstraint", editor, project_state, new_state
  3180. )
  3181. with self.assertRaises(IntegrityError), transaction.atomic():
  3182. Pony.objects.create(pink=100, weight=1.0)
  3183. # Test reversal
  3184. with connection.schema_editor() as editor:
  3185. gt_operation.database_backwards(
  3186. "test_addconstraint", editor, new_state, project_state
  3187. )
  3188. Pony.objects.create(pink=1, weight=1.0)
  3189. # Test deconstruction
  3190. definition = gt_operation.deconstruct()
  3191. self.assertEqual(definition[0], "AddConstraint")
  3192. self.assertEqual(definition[1], [])
  3193. self.assertEqual(
  3194. definition[2], {"model_name": "Pony", "constraint": gt_constraint}
  3195. )
  @skipUnlessDBFeature("supports_table_check_constraints")
  def test_add_constraint_percent_escaping(self):
      """
      Check constraints whose SQL involves "%" are created correctly.

      Three constraints are added in sequence to the same Author model: one
      built from a startswith lookup, one comparing against a literal "%",
      and one whose startswith right-hand side is an F() expression.
      """
      app_label = "add_constraint_string_quoting"
      operations = [
          migrations.CreateModel(
              "Author",
              fields=[
                  ("id", models.AutoField(primary_key=True)),
                  ("name", models.CharField(max_length=100)),
                  ("surname", models.CharField(max_length=100, default="")),
                  ("rebate", models.CharField(max_length=100)),
              ],
          ),
      ]
      state = self.apply_operations(app_label, ProjectState(), operations)

      def add_check_constraint(from_state, check, name):
          # Apply an AddConstraint for a CheckConstraint on Author, both to
          # the state and to the database, and return the resulting state.
          constraint = models.CheckConstraint(check=check, name=name)
          operation = migrations.AddConstraint("Author", constraint)
          to_state = from_state.clone()
          operation.state_forwards(app_label, to_state)
          with connection.schema_editor() as editor:
              operation.database_forwards(app_label, editor, from_state, to_state)
          return to_state

      # The "%" generated by the startswith lookup must be escaped in a way
      # that is still treated as a wildcard.
      state = add_check_constraint(
          state, models.Q(name__startswith="Albert"), "name_constraint"
      )
      Author = state.apps.get_model(app_label, "Author")
      with self.assertRaises(IntegrityError), transaction.atomic():
          Author.objects.create(name="Artur")
      # A literal "%" must be escaped in a way that is not treated as a
      # wildcard.
      state = add_check_constraint(
          state, models.Q(rebate__endswith="%"), "rebate_constraint"
      )
      Author = state.apps.get_model(app_label, "Author")
      with self.assertRaises(IntegrityError), transaction.atomic():
          Author.objects.create(name="Albert", rebate="10$")
      author = Author.objects.create(name="Albert", rebate="10%")
      self.assertEqual(Author.objects.get(), author)
      # "%" literals baked into the right-hand side must not be used for
      # parameter interpolation.
      state = add_check_constraint(
          state,
          ~models.Q(surname__startswith=models.F("name")),
          "name_constraint_rhs",
      )
      Author = state.apps.get_model(app_label, "Author")
      with self.assertRaises(IntegrityError), transaction.atomic():
          Author.objects.create(name="Albert", surname="Alberto")
  @skipUnlessDBFeature("supports_table_check_constraints")
  def test_add_or_constraint(self):
      """
      Tests AddConstraint with a check constraint built from OR-ed Q objects.
      """
      app_label = "test_addorconstraint"
      constraint_name = "add_constraint_or"
      old_state = self.set_up_test_model(app_label)
      pink_and_heavy = models.Q(pink__gt=2, weight__gt=2)
      negative_weight = models.Q(weight__lt=0)
      operation = migrations.AddConstraint(
          "Pony",
          models.CheckConstraint(
              check=pink_and_heavy | negative_weight, name=constraint_name
          ),
      )
      new_state = old_state.clone()
      operation.state_forwards(app_label, new_state)
      with connection.schema_editor() as editor:
          operation.database_forwards(app_label, editor, old_state, new_state)
      Pony = new_state.apps.get_model(app_label, "Pony")
      # Rows satisfying neither side of the OR are rejected.
      for rejected in ({"pink": 2, "weight": 3.0}, {"pink": 3, "weight": 1.0}):
          with self.assertRaises(IntegrityError), transaction.atomic():
              Pony.objects.create(**rejected)
      # Rows satisfying at least one side are accepted.
      accepted = [(3, -1.0), (1, -1.0), (3, 3.0)]
      Pony.objects.bulk_create(
          [Pony(pink=pink, weight=weight) for pink, weight in accepted]
      )
  @skipUnlessDBFeature("supports_table_check_constraints")
  def test_add_constraint_combinable(self):
      """
      Tests AddConstraint with a check constraint whose right-hand side is a
      combined expression (100 - F("unread")).
      """
      app_label = "test_addconstraint_combinable"
      create_book = migrations.CreateModel(
          "Book",
          fields=[
              ("id", models.AutoField(primary_key=True)),
              ("read", models.PositiveIntegerField()),
              ("unread", models.PositiveIntegerField()),
          ],
      )
      old_state = self.apply_operations(app_label, ProjectState(), [create_book])
      remaining = 100 - models.F("unread")
      sum_constraint = models.CheckConstraint(
          check=models.Q(read=remaining),
          name="test_addconstraint_combinable_sum_100",
      )
      operation = migrations.AddConstraint("Book", sum_constraint)
      new_state = old_state.clone()
      operation.state_forwards(app_label, new_state)
      with connection.schema_editor() as editor:
          operation.database_forwards(app_label, editor, old_state, new_state)
      Book = new_state.apps.get_model(app_label, "Book")
      # 70 + 10 != 100 violates the constraint; 70 + 30 == 100 satisfies it.
      with self.assertRaises(IntegrityError), transaction.atomic():
          Book.objects.create(read=70, unread=10)
      Book.objects.create(read=70, unread=30)
  @skipUnlessDBFeature("supports_table_check_constraints")
  def test_remove_constraint(self):
      """
      Tests the RemoveConstraint operation on a model that starts with two
      check constraints: both are removed in turn, then the first removal is
      reversed.
      """
      app_label = "test_removeconstraint"
      gt_name = "test_remove_constraint_pony_pink_gt_2"
      lt_name = "test_remove_constraint_pony_pink_lt_100"
      project_state = self.set_up_test_model(
          app_label,
          constraints=[
              models.CheckConstraint(check=models.Q(pink__gt=2), name=gt_name),
              models.CheckConstraint(check=models.Q(pink__lt=100), name=lt_name),
          ],
      )
      gt_operation = migrations.RemoveConstraint("Pony", gt_name)
      self.assertEqual(
          gt_operation.describe(),
          "Remove constraint test_remove_constraint_pony_pink_gt_2 from model Pony",
      )
      self.assertEqual(
          gt_operation.migration_name_fragment,
          "remove_pony_test_remove_constraint_pony_pink_gt_2",
      )
      # State alteration: one constraint is left.
      new_state = project_state.clone()
      gt_operation.state_forwards(app_label, new_state)
      remaining = new_state.models[app_label, "pony"].options["constraints"]
      self.assertEqual(len(remaining), 1)
      Pony = new_state.apps.get_model(app_label, "Pony")
      self.assertEqual(len(Pony._meta.constraints), 1)
      # Database alteration: pink=1 is accepted now, pink=100 is still
      # rejected by the remaining constraint.
      with connection.schema_editor() as editor:
          gt_operation.database_forwards(app_label, editor, project_state, new_state)
      Pony.objects.create(pink=1, weight=1.0).delete()
      with self.assertRaises(IntegrityError), transaction.atomic():
          Pony.objects.create(pink=100, weight=1.0)
      # Remove the other one.
      lt_operation = migrations.RemoveConstraint("Pony", lt_name)
      lt_operation.state_forwards(app_label, new_state)
      remaining = new_state.models[app_label, "pony"].options["constraints"]
      self.assertEqual(len(remaining), 0)
      Pony = new_state.apps.get_model(app_label, "Pony")
      self.assertEqual(len(Pony._meta.constraints), 0)
      with connection.schema_editor() as editor:
          lt_operation.database_forwards(app_label, editor, project_state, new_state)
      Pony.objects.create(pink=100, weight=1.0).delete()
      # Reversal of the first removal: pink=1 is rejected again.
      with connection.schema_editor() as editor:
          gt_operation.database_backwards(
              app_label, editor, new_state, project_state
          )
      with self.assertRaises(IntegrityError), transaction.atomic():
          Pony.objects.create(pink=1, weight=1.0)
      # Deconstruction.
      name, args, kwargs = gt_operation.deconstruct()
      self.assertEqual(name, "RemoveConstraint")
      self.assertEqual(args, [])
      self.assertEqual(
          kwargs,
          {"model_name": "Pony", "name": "test_remove_constraint_pony_pink_gt_2"},
      )
  def test_add_partial_unique_constraint(self):
      """
      Tests AddConstraint with a conditional (partial) UniqueConstraint.
      """
      app_label = "test_addpartialuniqueconstraint"
      initial_state = self.set_up_test_model(app_label)
      partial_unique_constraint = models.UniqueConstraint(
          fields=["pink"],
          condition=models.Q(weight__gt=5),
          name="test_constraint_pony_pink_for_weight_gt_5_uniq",
      )
      operation = migrations.AddConstraint("Pony", partial_unique_constraint)
      expected_description = (
          "Create constraint test_constraint_pony_pink_for_weight_gt_5_uniq "
          "on model Pony"
      )
      self.assertEqual(operation.describe(), expected_description)
      # State alteration: the constraint appears in the model options and on
      # the rendered model.
      altered_state = initial_state.clone()
      operation.state_forwards(app_label, altered_state)
      pony_options = altered_state.models[app_label, "pony"].options
      self.assertEqual(len(pony_options["constraints"]), 1)
      Pony = altered_state.apps.get_model(app_label, "Pony")
      self.assertEqual(len(Pony._meta.constraints), 1)
      # Database alteration.
      with connection.schema_editor() as editor:
          operation.database_forwards(
              app_label, editor, initial_state, altered_state
          )
      # Rows with weight <= 5 may share a pink value, and the first row with
      # weight > 5 is accepted.
      for weight in (4.0, 4.0, 6.0):
          Pony.objects.create(pink=1, weight=weight)
      # A second row with weight > 5 is expected to fail only on backends
      # that support partial indexes.
      if connection.features.supports_partial_indexes:
          with self.assertRaises(IntegrityError), transaction.atomic():
              Pony.objects.create(pink=1, weight=7.0)
      else:
          Pony.objects.create(pink=1, weight=7.0)
      # Reversal: afterwards the constraint no longer applies.
      with connection.schema_editor() as editor:
          operation.database_backwards(
              app_label, editor, altered_state, initial_state
          )
      Pony.objects.create(pink=1, weight=7.0)
      # Deconstruction.
      name, args, kwargs = operation.deconstruct()
      self.assertEqual(name, "AddConstraint")
      self.assertEqual(args, [])
      self.assertEqual(
          kwargs,
          {"model_name": "Pony", "constraint": partial_unique_constraint},
      )
  def test_remove_partial_unique_constraint(self):
      """
      Tests RemoveConstraint with a conditional (partial) UniqueConstraint.
      """
      app_label = "test_removepartialuniqueconstraint"
      constraint_name = "test_constraint_pony_pink_for_weight_gt_5_uniq"
      initial_state = self.set_up_test_model(
          app_label,
          constraints=[
              models.UniqueConstraint(
                  fields=["pink"],
                  condition=models.Q(weight__gt=5),
                  name=constraint_name,
              ),
          ],
      )
      gt_operation = migrations.RemoveConstraint("Pony", constraint_name)
      expected_description = (
          "Remove constraint test_constraint_pony_pink_for_weight_gt_5_uniq from "
          "model Pony"
      )
      self.assertEqual(gt_operation.describe(), expected_description)
      # State alteration: no constraints are left.
      altered_state = initial_state.clone()
      gt_operation.state_forwards(app_label, altered_state)
      pony_options = altered_state.models[app_label, "pony"].options
      self.assertEqual(len(pony_options["constraints"]), 0)
      Pony = altered_state.apps.get_model(app_label, "Pony")
      self.assertEqual(len(Pony._meta.constraints), 0)
      # Database alteration.
      with connection.schema_editor() as editor:
          gt_operation.database_forwards(
              app_label, editor, initial_state, altered_state
          )
      # With the constraint gone, duplicates are accepted on either side of
      # the former condition. The last row is deleted again right away.
      for weight in (4.0, 4.0, 6.0):
          Pony.objects.create(pink=1, weight=weight)
      Pony.objects.create(pink=1, weight=7.0).delete()
      # Reversal.
      with connection.schema_editor() as editor:
          gt_operation.database_backwards(
              app_label, editor, altered_state, initial_state
          )
      # The restored constraint is expected to reject a duplicate only on
      # backends that support partial indexes.
      if connection.features.supports_partial_indexes:
          with self.assertRaises(IntegrityError), transaction.atomic():
              Pony.objects.create(pink=1, weight=7.0)
      else:
          Pony.objects.create(pink=1, weight=7.0)
      # Deconstruction.
      name, args, kwargs = gt_operation.deconstruct()
      self.assertEqual(name, "RemoveConstraint")
      self.assertEqual(args, [])
      self.assertEqual(
          kwargs,
          {
              "model_name": "Pony",
              "name": "test_constraint_pony_pink_for_weight_gt_5_uniq",
          },
      )
  def test_add_deferred_unique_constraint(self):
      """
      Tests AddConstraint with a UniqueConstraint declared as
      Deferrable.DEFERRED.
      """
      app_label = "test_adddeferred_uc"
      initial_state = self.set_up_test_model(app_label)
      deferred_unique_constraint = models.UniqueConstraint(
          fields=["pink"],
          name="deferred_pink_constraint_add",
          deferrable=models.Deferrable.DEFERRED,
      )
      operation = migrations.AddConstraint("Pony", deferred_unique_constraint)
      self.assertEqual(
          operation.describe(),
          "Create constraint deferred_pink_constraint_add on model Pony",
      )
      # Add constraint.
      altered_state = initial_state.clone()
      operation.state_forwards(app_label, altered_state)
      pony_options = altered_state.models[app_label, "pony"].options
      self.assertEqual(len(pony_options["constraints"]), 1)
      Pony = altered_state.apps.get_model(app_label, "Pony")
      self.assertEqual(len(Pony._meta.constraints), 1)
      with connection.schema_editor() as editor:
          with CaptureQueriesContext(connection) as ctx:
              operation.database_forwards(
                  app_label, editor, initial_state, altered_state
              )
      Pony.objects.create(pink=1, weight=4.0)
      if not connection.features.supports_deferrable_unique_constraints:
          # Nothing should have been executed, so duplicates are accepted.
          self.assertEqual(len(ctx), 0)
          Pony.objects.create(pink=1, weight=4.0)
      else:
          # Unique constraint is deferred: a duplicate may exist inside the
          # transaction as long as it is resolved before the transaction
          # ends.
          with transaction.atomic():
              obj = Pony.objects.create(pink=1, weight=4.0)
              obj.pink = 2
              obj.save()
          # Constraint behavior can be changed with SET CONSTRAINTS.
          with self.assertRaises(IntegrityError):
              with transaction.atomic():
                  with connection.cursor() as cursor:
                      quoted_name = connection.ops.quote_name(
                          deferred_unique_constraint.name
                      )
                      cursor.execute("SET CONSTRAINTS %s IMMEDIATE" % quoted_name)
                      obj = Pony.objects.create(pink=1, weight=4.0)
                      obj.pink = 3
                      obj.save()
      # Reversal.
      with connection.schema_editor() as editor:
          operation.database_backwards(
              app_label, editor, altered_state, initial_state
          )
      # Constraint doesn't work.
      Pony.objects.create(pink=1, weight=4.0)
      # Deconstruction.
      name, args, kwargs = operation.deconstruct()
      self.assertEqual(name, "AddConstraint")
      self.assertEqual(args, [])
      self.assertEqual(
          kwargs,
          {"model_name": "Pony", "constraint": deferred_unique_constraint},
      )
  def test_remove_deferred_unique_constraint(self):
      """
      Tests RemoveConstraint with a UniqueConstraint declared as
      Deferrable.DEFERRED, and that reversing the removal brings the
      deferred behavior back.
      """
      app_label = "test_removedeferred_uc"
      deferred_unique_constraint = models.UniqueConstraint(
          fields=["pink"],
          name="deferred_pink_constraint_rm",
          deferrable=models.Deferrable.DEFERRED,
      )
      initial_state = self.set_up_test_model(
          app_label, constraints=[deferred_unique_constraint]
      )
      operation = migrations.RemoveConstraint(
          "Pony", deferred_unique_constraint.name
      )
      self.assertEqual(
          operation.describe(),
          "Remove constraint deferred_pink_constraint_rm from model Pony",
      )
      # Remove constraint.
      altered_state = initial_state.clone()
      operation.state_forwards(app_label, altered_state)
      pony_options = altered_state.models[app_label, "pony"].options
      self.assertEqual(len(pony_options["constraints"]), 0)
      Pony = altered_state.apps.get_model(app_label, "Pony")
      self.assertEqual(len(Pony._meta.constraints), 0)
      with connection.schema_editor() as editor:
          with CaptureQueriesContext(connection) as ctx:
              operation.database_forwards(
                  app_label, editor, initial_state, altered_state
              )
      # Constraint doesn't work.
      Pony.objects.create(pink=1, weight=4.0)
      Pony.objects.create(pink=1, weight=4.0).delete()
      deferrable_supported = (
          connection.features.supports_deferrable_unique_constraints
      )
      if not deferrable_supported:
          # Nothing should have been executed by the removal.
          self.assertEqual(len(ctx), 0)
      # Reversal.
      with connection.schema_editor() as editor:
          operation.database_backwards(
              app_label, editor, altered_state, initial_state
          )
      if not deferrable_supported:
          Pony.objects.create(pink=1, weight=4.0)
      else:
          # Unique constraint is deferred.
          with transaction.atomic():
              obj = Pony.objects.create(pink=1, weight=4.0)
              obj.pink = 2
              obj.save()
          # Constraint behavior can be changed with SET CONSTRAINTS.
          with self.assertRaises(IntegrityError):
              with transaction.atomic():
                  with connection.cursor() as cursor:
                      quoted_name = connection.ops.quote_name(
                          deferred_unique_constraint.name
                      )
                      cursor.execute("SET CONSTRAINTS %s IMMEDIATE" % quoted_name)
                      obj = Pony.objects.create(pink=1, weight=4.0)
                      obj.pink = 3
                      obj.save()
      # Deconstruction.
      name, args, kwargs = operation.deconstruct()
      self.assertEqual(name, "RemoveConstraint")
      self.assertEqual(args, [])
      self.assertEqual(
          kwargs,
          {
              "model_name": "Pony",
              "name": "deferred_pink_constraint_rm",
          },
      )
  def test_add_covering_unique_constraint(self):
      """
      Tests AddConstraint with a UniqueConstraint that has an ``include``
      list (covering constraint).
      """
      app_label = "test_addcovering_uc"
      initial_state = self.set_up_test_model(app_label)
      covering_unique_constraint = models.UniqueConstraint(
          fields=["pink"],
          name="covering_pink_constraint_add",
          include=["weight"],
      )
      operation = migrations.AddConstraint("Pony", covering_unique_constraint)
      self.assertEqual(
          operation.describe(),
          "Create constraint covering_pink_constraint_add on model Pony",
      )
      # Add constraint.
      altered_state = initial_state.clone()
      operation.state_forwards(app_label, altered_state)
      pony_options = altered_state.models[app_label, "pony"].options
      self.assertEqual(len(pony_options["constraints"]), 1)
      Pony = altered_state.apps.get_model(app_label, "Pony")
      self.assertEqual(len(Pony._meta.constraints), 1)
      with connection.schema_editor() as editor:
          with CaptureQueriesContext(connection) as ctx:
              operation.database_forwards(
                  app_label, editor, initial_state, altered_state
              )
      Pony.objects.create(pink=1, weight=4.0)
      if not connection.features.supports_covering_indexes:
          # Nothing should have been executed, so duplicates are accepted.
          self.assertEqual(len(ctx), 0)
          Pony.objects.create(pink=1, weight=4.0)
      else:
          with self.assertRaises(IntegrityError):
              Pony.objects.create(pink=1, weight=4.0)
      # Reversal.
      with connection.schema_editor() as editor:
          operation.database_backwards(
              app_label, editor, altered_state, initial_state
          )
      # Constraint doesn't work.
      Pony.objects.create(pink=1, weight=4.0)
      # Deconstruction.
      name, args, kwargs = operation.deconstruct()
      self.assertEqual(name, "AddConstraint")
      self.assertEqual(args, [])
      self.assertEqual(
          kwargs,
          {"model_name": "Pony", "constraint": covering_unique_constraint},
      )
  def test_remove_covering_unique_constraint(self):
      """
      Tests RemoveConstraint with a UniqueConstraint that has an ``include``
      list (covering constraint).
      """
      app_label = "test_removecovering_uc"
      covering_unique_constraint = models.UniqueConstraint(
          fields=["pink"],
          name="covering_pink_constraint_rm",
          include=["weight"],
      )
      initial_state = self.set_up_test_model(
          app_label, constraints=[covering_unique_constraint]
      )
      operation = migrations.RemoveConstraint(
          "Pony", covering_unique_constraint.name
      )
      self.assertEqual(
          operation.describe(),
          "Remove constraint covering_pink_constraint_rm from model Pony",
      )
      # Remove constraint.
      altered_state = initial_state.clone()
      operation.state_forwards(app_label, altered_state)
      pony_options = altered_state.models[app_label, "pony"].options
      self.assertEqual(len(pony_options["constraints"]), 0)
      Pony = altered_state.apps.get_model(app_label, "Pony")
      self.assertEqual(len(Pony._meta.constraints), 0)
      with connection.schema_editor() as editor:
          with CaptureQueriesContext(connection) as ctx:
              operation.database_forwards(
                  app_label, editor, initial_state, altered_state
              )
      # Constraint doesn't work.
      Pony.objects.create(pink=1, weight=4.0)
      Pony.objects.create(pink=1, weight=4.0).delete()
      covering_supported = connection.features.supports_covering_indexes
      if not covering_supported:
          # Nothing should have been executed by the removal.
          self.assertEqual(len(ctx), 0)
      # Reversal.
      with connection.schema_editor() as editor:
          operation.database_backwards(
              app_label, editor, altered_state, initial_state
          )
      if not covering_supported:
          Pony.objects.create(pink=1, weight=4.0)
      else:
          with self.assertRaises(IntegrityError):
              Pony.objects.create(pink=1, weight=4.0)
      # Deconstruction.
      name, args, kwargs = operation.deconstruct()
      self.assertEqual(name, "RemoveConstraint")
      self.assertEqual(args, [])
      self.assertEqual(
          kwargs,
          {
              "model_name": "Pony",
              "name": "covering_pink_constraint_rm",
          },
      )
  def test_alter_field_with_func_unique_constraint(self):
      """
      Altering a field that is part of an expression-style UniqueConstraint
      keeps the constraint's index in place, forwards and backwards.
      """
      app_label = "test_alfuncuc"
      constraint_name = f"{app_label}_pony_uq"
      table_name = f"{app_label}_pony"
      unique_constraint = models.UniqueConstraint(
          "pink", "weight", name=constraint_name
      )
      old_state = self.set_up_test_model(
          app_label, constraints=[unique_constraint]
      )
      nullable_pink = models.IntegerField(null=True)
      operation = migrations.AlterField("Pony", "pink", nullable_pink)
      new_state = old_state.clone()
      operation.state_forwards(app_label, new_state)
      # Forwards.
      with connection.schema_editor() as editor:
          operation.database_forwards(app_label, editor, old_state, new_state)
      expression_indexes = connection.features.supports_expression_indexes
      if expression_indexes:
          self.assertIndexNameExists(table_name, constraint_name)
      # Backwards.
      with connection.schema_editor() as editor:
          operation.database_backwards(app_label, editor, new_state, old_state)
      if expression_indexes:
          self.assertIndexNameExists(table_name, constraint_name)
  def test_add_func_unique_constraint(self):
      """
      Tests AddConstraint with a functional UniqueConstraint on Abs("weight").
      """
      app_label = "test_adfuncuc"
      constraint_name = f"{app_label}_pony_abs_uq"
      table_name = f"{app_label}_pony"
      old_state = self.set_up_test_model(app_label)
      constraint = models.UniqueConstraint(Abs("weight"), name=constraint_name)
      operation = migrations.AddConstraint("Pony", constraint)
      self.assertEqual(
          operation.describe(),
          "Create constraint test_adfuncuc_pony_abs_uq on model Pony",
      )
      self.assertEqual(
          operation.migration_name_fragment,
          "pony_test_adfuncuc_pony_abs_uq",
      )
      new_state = old_state.clone()
      operation.state_forwards(app_label, new_state)
      pony_options = new_state.models[app_label, "pony"].options
      self.assertEqual(len(pony_options["constraints"]), 1)
      # The index must not exist before the database is altered.
      self.assertIndexNameNotExists(table_name, constraint_name)
      # Add constraint.
      with connection.schema_editor() as editor:
          operation.database_forwards(app_label, editor, old_state, new_state)
      Pony = new_state.apps.get_model(app_label, "Pony")
      Pony.objects.create(weight=4.0)
      if not connection.features.supports_expression_indexes:
          # No index is expected, so a weight with the same absolute value
          # is accepted.
          self.assertIndexNameNotExists(table_name, constraint_name)
          Pony.objects.create(weight=-4.0)
      else:
          self.assertIndexNameExists(table_name, constraint_name)
          with self.assertRaises(IntegrityError):
              Pony.objects.create(weight=-4.0)
      # Reversal.
      with connection.schema_editor() as editor:
          operation.database_backwards(app_label, editor, new_state, old_state)
      self.assertIndexNameNotExists(table_name, constraint_name)
      # Constraint doesn't work.
      Pony.objects.create(weight=-4.0)
      # Deconstruction.
      name, args, kwargs = operation.deconstruct()
      self.assertEqual(name, "AddConstraint")
      self.assertEqual(args, [])
      self.assertEqual(
          kwargs,
          {"model_name": "Pony", "constraint": constraint},
      )
  def test_remove_func_unique_constraint(self):
      """
      Tests RemoveConstraint with a functional UniqueConstraint on
      Abs("weight").
      """
      app_label = "test_rmfuncuc"
      constraint_name = f"{app_label}_pony_abs_uq"
      table_name = f"{app_label}_pony"
      abs_constraint = models.UniqueConstraint(Abs("weight"), name=constraint_name)
      old_state = self.set_up_test_model(app_label, constraints=[abs_constraint])
      self.assertTableExists(table_name)
      expression_indexes = connection.features.supports_expression_indexes
      if expression_indexes:
          self.assertIndexNameExists(table_name, constraint_name)
      operation = migrations.RemoveConstraint("Pony", constraint_name)
      self.assertEqual(
          operation.describe(),
          "Remove constraint test_rmfuncuc_pony_abs_uq from model Pony",
      )
      self.assertEqual(
          operation.migration_name_fragment,
          "remove_pony_test_rmfuncuc_pony_abs_uq",
      )
      new_state = old_state.clone()
      operation.state_forwards(app_label, new_state)
      pony_options = new_state.models[app_label, "pony"].options
      self.assertEqual(len(pony_options["constraints"]), 0)
      Pony = new_state.apps.get_model(app_label, "Pony")
      self.assertEqual(len(Pony._meta.constraints), 0)
      # Remove constraint.
      with connection.schema_editor() as editor:
          operation.database_forwards(app_label, editor, old_state, new_state)
      self.assertIndexNameNotExists(table_name, constraint_name)
      # Constraint doesn't work.
      Pony.objects.create(pink=1, weight=4.0)
      Pony.objects.create(pink=1, weight=-4.0).delete()
      # Reversal.
      with connection.schema_editor() as editor:
          operation.database_backwards(app_label, editor, new_state, old_state)
      if not expression_indexes:
          self.assertIndexNameNotExists(table_name, constraint_name)
          Pony.objects.create(weight=-4.0)
      else:
          self.assertIndexNameExists(table_name, constraint_name)
          with self.assertRaises(IntegrityError):
              Pony.objects.create(weight=-4.0)
      # Deconstruction.
      name, args, kwargs = operation.deconstruct()
      self.assertEqual(name, "RemoveConstraint")
      self.assertEqual(args, [])
      self.assertEqual(kwargs, {"model_name": "Pony", "name": constraint_name})
  def test_alter_model_options(self):
      """
      Tests the AlterModelOptions operation.
      """
      app_label = "test_almoop"
      old_state = self.set_up_test_model(app_label)
      # Only the state alteration is checked (no DB alteration to test).
      operation = migrations.AlterModelOptions(
          "Pony", {"permissions": [("can_groom", "Can groom")]}
      )
      self.assertEqual(operation.describe(), "Change Meta options on Pony")
      self.assertEqual(operation.migration_name_fragment, "alter_pony_options")
      new_state = old_state.clone()
      operation.state_forwards(app_label, new_state)
      # The original state is untouched; the new one carries the permission.
      old_options = old_state.models[app_label, "pony"].options
      self.assertEqual(len(old_options.get("permissions", [])), 0)
      new_options = new_state.models[app_label, "pony"].options
      self.assertEqual(len(new_options.get("permissions", [])), 1)
      first_permission = new_state.models[app_label, "pony"].options[
          "permissions"
      ][0]
      self.assertEqual(first_permission[0], "can_groom")
      # And deconstruction
      name, args, kwargs = operation.deconstruct()
      self.assertEqual(name, "AlterModelOptions")
      self.assertEqual(args, [])
      self.assertEqual(
          kwargs,
          {"name": "Pony", "options": {"permissions": [("can_groom", "Can groom")]}},
      )
  def test_alter_model_options_emptying(self):
      """
      The AlterModelOptions operation removes keys from the dict (#23121)
      """
      app_label = "test_almoop"
      old_state = self.set_up_test_model(app_label, options=True)
      # Only the state alteration is checked (no DB alteration to test).
      operation = migrations.AlterModelOptions("Pony", {})
      self.assertEqual(operation.describe(), "Change Meta options on Pony")
      new_state = old_state.clone()
      operation.state_forwards(app_label, new_state)
      # The original state still has its permission; the new one has none.
      old_options = old_state.models[app_label, "pony"].options
      self.assertEqual(len(old_options.get("permissions", [])), 1)
      new_options = new_state.models[app_label, "pony"].options
      self.assertEqual(len(new_options.get("permissions", [])), 0)
      # And deconstruction
      name, args, kwargs = operation.deconstruct()
      self.assertEqual(name, "AlterModelOptions")
      self.assertEqual(args, [])
      self.assertEqual(kwargs, {"name": "Pony", "options": {}})
  def test_alter_order_with_respect_to(self):
      """
      Tests the AlterOrderWithRespectTo operation.
      """
      app_label = "test_alorwrtto"
      rider_table = "test_alorwrtto_rider"
      old_state = self.set_up_test_model(app_label, related_model=True)
      # State alteration.
      operation = migrations.AlterOrderWithRespectTo("Rider", "pony")
      self.assertEqual(
          operation.describe(), "Set order_with_respect_to on Rider to pony"
      )
      self.assertEqual(
          operation.migration_name_fragment,
          "alter_rider_order_with_respect_to",
      )
      new_state = old_state.clone()
      operation.state_forwards(app_label, new_state)
      old_rider_options = old_state.models[app_label, "rider"].options
      self.assertIsNone(old_rider_options.get("order_with_respect_to"))
      new_rider_options = new_state.models[app_label, "rider"].options
      self.assertEqual(new_rider_options.get("order_with_respect_to"), "pony")
      # The _order column must not exist before the database is altered.
      self.assertColumnNotExists(rider_table, "_order")
      # Create some rows before alteration: one pony and two riders, each
      # rider being its own friend.
      rendered_state = old_state.apps
      OldPony = rendered_state.get_model(app_label, "Pony")
      pony = OldPony.objects.create(weight=50)
      OldRider = rendered_state.get_model(app_label, "Rider")
      for _ in range(2):
          rider = OldRider.objects.create(pony=pony)
          rider.friend = rider
          rider.save()
      # Database alteration.
      with connection.schema_editor() as editor:
          operation.database_forwards(app_label, editor, old_state, new_state)
      self.assertColumnExists(rider_table, "_order")
      # The pre-existing rows are expected to have _order set to 0.
      NewRider = new_state.apps.get_model(app_label, "Rider")
      updated_riders = NewRider.objects.all()
      self.assertEqual(updated_riders[0]._order, 0)
      self.assertEqual(updated_riders[1]._order, 0)
      # Reversal.
      with connection.schema_editor() as editor:
          operation.database_backwards(app_label, editor, new_state, old_state)
      self.assertColumnNotExists(rider_table, "_order")
      # Deconstruction.
      name, args, kwargs = operation.deconstruct()
      self.assertEqual(name, "AlterOrderWithRespectTo")
      self.assertEqual(args, [])
      self.assertEqual(kwargs, {"name": "Rider", "order_with_respect_to": "pony"})
  def test_alter_model_managers(self):
      """
      AlterModelManagers sets the managers on a model, both in the state and
      on the rendered model class.
      """
      app_label = "test_almoma"
      old_state = self.set_up_test_model(app_label)
      # State alteration.
      operation = migrations.AlterModelManagers(
          "Pony",
          managers=[
              ("food_qs", FoodQuerySet.as_manager()),
              ("food_mgr", FoodManager("a", "b")),
              ("food_mgr_kwargs", FoodManager("x", "y", 3, 4)),
          ],
      )
      self.assertEqual(operation.describe(), "Change managers on Pony")
      self.assertEqual(operation.migration_name_fragment, "alter_pony_managers")
      # The model starts out without any managers in its state.
      self.assertEqual(old_state.models[app_label, "pony"].managers, [])
      new_state = old_state.clone()
      operation.state_forwards(app_label, new_state)
      self.assertIn((app_label, "pony"), new_state.models)
      managers = new_state.models[app_label, "pony"].managers
      # First entry: the queryset-based manager.
      qs_entry = managers[0]
      self.assertEqual(qs_entry[0], "food_qs")
      self.assertIsInstance(qs_entry[1], models.Manager)
      # Second entry: FoodManager built from two positional arguments.
      mgr_entry = managers[1]
      self.assertEqual(mgr_entry[0], "food_mgr")
      self.assertIsInstance(mgr_entry[1], FoodManager)
      self.assertEqual(mgr_entry[1].args, ("a", "b", 1, 2))
      # Third entry: FoodManager built from four positional arguments.
      kwargs_entry = managers[2]
      self.assertEqual(kwargs_entry[0], "food_mgr_kwargs")
      self.assertIsInstance(kwargs_entry[1], FoodManager)
      self.assertEqual(kwargs_entry[1].args, ("x", "y", 3, 4))
      # The rendered model exposes the managers as attributes.
      model = new_state.apps.get_model(app_label, "pony")
      self.assertIsInstance(model.food_qs, models.Manager)
      self.assertIsInstance(model.food_mgr, FoodManager)
      self.assertIsInstance(model.food_mgr_kwargs, FoodManager)
  4007. def test_alter_model_managers_emptying(self):
  4008. """
  4009. The managers on a model are set.
  4010. """
  4011. project_state = self.set_up_test_model("test_almomae", manager_model=True)
  4012. # Test the state alteration
  4013. operation = migrations.AlterModelManagers("Food", managers=[])
  4014. self.assertEqual(operation.describe(), "Change managers on Food")
  4015. self.assertIn(("test_almomae", "food"), project_state.models)
  4016. managers = project_state.models["test_almomae", "food"].managers
  4017. self.assertEqual(managers[0][0], "food_qs")
  4018. self.assertIsInstance(managers[0][1], models.Manager)
  4019. self.assertEqual(managers[1][0], "food_mgr")
  4020. self.assertIsInstance(managers[1][1], FoodManager)
  4021. self.assertEqual(managers[1][1].args, ("a", "b", 1, 2))
  4022. self.assertEqual(managers[2][0], "food_mgr_kwargs")
  4023. self.assertIsInstance(managers[2][1], FoodManager)
  4024. self.assertEqual(managers[2][1].args, ("x", "y", 3, 4))
  4025. new_state = project_state.clone()
  4026. operation.state_forwards("test_almomae", new_state)
  4027. managers = new_state.models["test_almomae", "food"].managers
  4028. self.assertEqual(managers, [])
  4029. def test_alter_fk(self):
  4030. """
  4031. Creating and then altering an FK works correctly
  4032. and deals with the pending SQL (#23091)
  4033. """
  4034. project_state = self.set_up_test_model("test_alfk")
  4035. # Test adding and then altering the FK in one go
  4036. create_operation = migrations.CreateModel(
  4037. name="Rider",
  4038. fields=[
  4039. ("id", models.AutoField(primary_key=True)),
  4040. ("pony", models.ForeignKey("Pony", models.CASCADE)),
  4041. ],
  4042. )
  4043. create_state = project_state.clone()
  4044. create_operation.state_forwards("test_alfk", create_state)
  4045. alter_operation = migrations.AlterField(
  4046. model_name="Rider",
  4047. name="pony",
  4048. field=models.ForeignKey("Pony", models.CASCADE, editable=False),
  4049. )
  4050. alter_state = create_state.clone()
  4051. alter_operation.state_forwards("test_alfk", alter_state)
  4052. with connection.schema_editor() as editor:
  4053. create_operation.database_forwards(
  4054. "test_alfk", editor, project_state, create_state
  4055. )
  4056. alter_operation.database_forwards(
  4057. "test_alfk", editor, create_state, alter_state
  4058. )
  4059. def test_alter_fk_non_fk(self):
  4060. """
  4061. Altering an FK to a non-FK works (#23244)
  4062. """
  4063. # Test the state alteration
  4064. operation = migrations.AlterField(
  4065. model_name="Rider",
  4066. name="pony",
  4067. field=models.FloatField(),
  4068. )
  4069. project_state, new_state = self.make_test_state(
  4070. "test_afknfk", operation, related_model=True
  4071. )
  4072. # Test the database alteration
  4073. self.assertColumnExists("test_afknfk_rider", "pony_id")
  4074. self.assertColumnNotExists("test_afknfk_rider", "pony")
  4075. with connection.schema_editor() as editor:
  4076. operation.database_forwards("test_afknfk", editor, project_state, new_state)
  4077. self.assertColumnExists("test_afknfk_rider", "pony")
  4078. self.assertColumnNotExists("test_afknfk_rider", "pony_id")
  4079. # And test reversal
  4080. with connection.schema_editor() as editor:
  4081. operation.database_backwards(
  4082. "test_afknfk", editor, new_state, project_state
  4083. )
  4084. self.assertColumnExists("test_afknfk_rider", "pony_id")
  4085. self.assertColumnNotExists("test_afknfk_rider", "pony")
  4086. def test_run_sql(self):
  4087. """
  4088. Tests the RunSQL operation.
  4089. """
  4090. project_state = self.set_up_test_model("test_runsql")
  4091. # Create the operation
  4092. operation = migrations.RunSQL(
  4093. # Use a multi-line string with a comment to test splitting on
  4094. # SQLite and MySQL respectively.
  4095. "CREATE TABLE i_love_ponies (id int, special_thing varchar(15));\n"
  4096. "INSERT INTO i_love_ponies (id, special_thing) "
  4097. "VALUES (1, 'i love ponies'); -- this is magic!\n"
  4098. "INSERT INTO i_love_ponies (id, special_thing) "
  4099. "VALUES (2, 'i love django');\n"
  4100. "UPDATE i_love_ponies SET special_thing = 'Ponies' "
  4101. "WHERE special_thing LIKE '%%ponies';"
  4102. "UPDATE i_love_ponies SET special_thing = 'Django' "
  4103. "WHERE special_thing LIKE '%django';",
  4104. # Run delete queries to test for parameter substitution failure
  4105. # reported in #23426
  4106. "DELETE FROM i_love_ponies WHERE special_thing LIKE '%Django%';"
  4107. "DELETE FROM i_love_ponies WHERE special_thing LIKE '%%Ponies%%';"
  4108. "DROP TABLE i_love_ponies",
  4109. state_operations=[
  4110. migrations.CreateModel(
  4111. "SomethingElse", [("id", models.AutoField(primary_key=True))]
  4112. )
  4113. ],
  4114. )
  4115. self.assertEqual(operation.describe(), "Raw SQL operation")
  4116. # Test the state alteration
  4117. new_state = project_state.clone()
  4118. operation.state_forwards("test_runsql", new_state)
  4119. self.assertEqual(
  4120. len(new_state.models["test_runsql", "somethingelse"].fields), 1
  4121. )
  4122. # Make sure there's no table
  4123. self.assertTableNotExists("i_love_ponies")
  4124. # Test SQL collection
  4125. with connection.schema_editor(collect_sql=True) as editor:
  4126. operation.database_forwards("test_runsql", editor, project_state, new_state)
  4127. self.assertIn("LIKE '%%ponies';", "\n".join(editor.collected_sql))
  4128. operation.database_backwards(
  4129. "test_runsql", editor, project_state, new_state
  4130. )
  4131. self.assertIn("LIKE '%%Ponies%%';", "\n".join(editor.collected_sql))
  4132. # Test the database alteration
  4133. with connection.schema_editor() as editor:
  4134. operation.database_forwards("test_runsql", editor, project_state, new_state)
  4135. self.assertTableExists("i_love_ponies")
  4136. # Make sure all the SQL was processed
  4137. with connection.cursor() as cursor:
  4138. cursor.execute("SELECT COUNT(*) FROM i_love_ponies")
  4139. self.assertEqual(cursor.fetchall()[0][0], 2)
  4140. cursor.execute(
  4141. "SELECT COUNT(*) FROM i_love_ponies WHERE special_thing = 'Django'"
  4142. )
  4143. self.assertEqual(cursor.fetchall()[0][0], 1)
  4144. cursor.execute(
  4145. "SELECT COUNT(*) FROM i_love_ponies WHERE special_thing = 'Ponies'"
  4146. )
  4147. self.assertEqual(cursor.fetchall()[0][0], 1)
  4148. # And test reversal
  4149. self.assertTrue(operation.reversible)
  4150. with connection.schema_editor() as editor:
  4151. operation.database_backwards(
  4152. "test_runsql", editor, new_state, project_state
  4153. )
  4154. self.assertTableNotExists("i_love_ponies")
  4155. # And deconstruction
  4156. definition = operation.deconstruct()
  4157. self.assertEqual(definition[0], "RunSQL")
  4158. self.assertEqual(definition[1], [])
  4159. self.assertEqual(
  4160. sorted(definition[2]), ["reverse_sql", "sql", "state_operations"]
  4161. )
  4162. # And elidable reduction
  4163. self.assertIs(False, operation.reduce(operation, []))
  4164. elidable_operation = migrations.RunSQL("SELECT 1 FROM void;", elidable=True)
  4165. self.assertEqual(elidable_operation.reduce(operation, []), [operation])
  4166. def test_run_sql_params(self):
  4167. """
  4168. #23426 - RunSQL should accept parameters.
  4169. """
  4170. project_state = self.set_up_test_model("test_runsql")
  4171. # Create the operation
  4172. operation = migrations.RunSQL(
  4173. ["CREATE TABLE i_love_ponies (id int, special_thing varchar(15));"],
  4174. ["DROP TABLE i_love_ponies"],
  4175. )
  4176. param_operation = migrations.RunSQL(
  4177. # forwards
  4178. (
  4179. "INSERT INTO i_love_ponies (id, special_thing) VALUES (1, 'Django');",
  4180. [
  4181. "INSERT INTO i_love_ponies (id, special_thing) VALUES (2, %s);",
  4182. ["Ponies"],
  4183. ],
  4184. (
  4185. "INSERT INTO i_love_ponies (id, special_thing) VALUES (%s, %s);",
  4186. (
  4187. 3,
  4188. "Python",
  4189. ),
  4190. ),
  4191. ),
  4192. # backwards
  4193. [
  4194. "DELETE FROM i_love_ponies WHERE special_thing = 'Django';",
  4195. ["DELETE FROM i_love_ponies WHERE special_thing = 'Ponies';", None],
  4196. (
  4197. "DELETE FROM i_love_ponies WHERE id = %s OR special_thing = %s;",
  4198. [3, "Python"],
  4199. ),
  4200. ],
  4201. )
  4202. # Make sure there's no table
  4203. self.assertTableNotExists("i_love_ponies")
  4204. new_state = project_state.clone()
  4205. # Test the database alteration
  4206. with connection.schema_editor() as editor:
  4207. operation.database_forwards("test_runsql", editor, project_state, new_state)
  4208. # Test parameter passing
  4209. with connection.schema_editor() as editor:
  4210. param_operation.database_forwards(
  4211. "test_runsql", editor, project_state, new_state
  4212. )
  4213. # Make sure all the SQL was processed
  4214. with connection.cursor() as cursor:
  4215. cursor.execute("SELECT COUNT(*) FROM i_love_ponies")
  4216. self.assertEqual(cursor.fetchall()[0][0], 3)
  4217. with connection.schema_editor() as editor:
  4218. param_operation.database_backwards(
  4219. "test_runsql", editor, new_state, project_state
  4220. )
  4221. with connection.cursor() as cursor:
  4222. cursor.execute("SELECT COUNT(*) FROM i_love_ponies")
  4223. self.assertEqual(cursor.fetchall()[0][0], 0)
  4224. # And test reversal
  4225. with connection.schema_editor() as editor:
  4226. operation.database_backwards(
  4227. "test_runsql", editor, new_state, project_state
  4228. )
  4229. self.assertTableNotExists("i_love_ponies")
  4230. def test_run_sql_params_invalid(self):
  4231. """
  4232. #23426 - RunSQL should fail when a list of statements with an incorrect
  4233. number of tuples is given.
  4234. """
  4235. project_state = self.set_up_test_model("test_runsql")
  4236. new_state = project_state.clone()
  4237. operation = migrations.RunSQL(
  4238. # forwards
  4239. [["INSERT INTO foo (bar) VALUES ('buz');"]],
  4240. # backwards
  4241. (("DELETE FROM foo WHERE bar = 'buz';", "invalid", "parameter count"),),
  4242. )
  4243. with connection.schema_editor() as editor:
  4244. with self.assertRaisesMessage(ValueError, "Expected a 2-tuple but got 1"):
  4245. operation.database_forwards(
  4246. "test_runsql", editor, project_state, new_state
  4247. )
  4248. with connection.schema_editor() as editor:
  4249. with self.assertRaisesMessage(ValueError, "Expected a 2-tuple but got 3"):
  4250. operation.database_backwards(
  4251. "test_runsql", editor, new_state, project_state
  4252. )
  4253. def test_run_sql_noop(self):
  4254. """
  4255. #24098 - Tests no-op RunSQL operations.
  4256. """
  4257. operation = migrations.RunSQL(migrations.RunSQL.noop, migrations.RunSQL.noop)
  4258. with connection.schema_editor() as editor:
  4259. operation.database_forwards("test_runsql", editor, None, None)
  4260. operation.database_backwards("test_runsql", editor, None, None)
  4261. def test_run_sql_add_missing_semicolon_on_collect_sql(self):
  4262. project_state = self.set_up_test_model("test_runsql")
  4263. new_state = project_state.clone()
  4264. tests = [
  4265. "INSERT INTO test_runsql_pony (pink, weight) VALUES (1, 1);\n",
  4266. "INSERT INTO test_runsql_pony (pink, weight) VALUES (1, 1)\n",
  4267. ]
  4268. for sql in tests:
  4269. with self.subTest(sql=sql):
  4270. operation = migrations.RunSQL(sql, migrations.RunPython.noop)
  4271. with connection.schema_editor(collect_sql=True) as editor:
  4272. operation.database_forwards(
  4273. "test_runsql", editor, project_state, new_state
  4274. )
  4275. collected_sql = "\n".join(editor.collected_sql)
  4276. self.assertEqual(collected_sql.count(";"), 1)
  4277. def test_run_python(self):
  4278. """
  4279. Tests the RunPython operation
  4280. """
  4281. project_state = self.set_up_test_model("test_runpython", mti_model=True)
  4282. # Create the operation
  4283. def inner_method(models, schema_editor):
  4284. Pony = models.get_model("test_runpython", "Pony")
  4285. Pony.objects.create(pink=1, weight=3.55)
  4286. Pony.objects.create(weight=5)
  4287. def inner_method_reverse(models, schema_editor):
  4288. Pony = models.get_model("test_runpython", "Pony")
  4289. Pony.objects.filter(pink=1, weight=3.55).delete()
  4290. Pony.objects.filter(weight=5).delete()
  4291. operation = migrations.RunPython(
  4292. inner_method, reverse_code=inner_method_reverse
  4293. )
  4294. self.assertEqual(operation.describe(), "Raw Python operation")
  4295. # Test the state alteration does nothing
  4296. new_state = project_state.clone()
  4297. operation.state_forwards("test_runpython", new_state)
  4298. self.assertEqual(new_state, project_state)
  4299. # Test the database alteration
  4300. self.assertEqual(
  4301. project_state.apps.get_model("test_runpython", "Pony").objects.count(), 0
  4302. )
  4303. with connection.schema_editor() as editor:
  4304. operation.database_forwards(
  4305. "test_runpython", editor, project_state, new_state
  4306. )
  4307. self.assertEqual(
  4308. project_state.apps.get_model("test_runpython", "Pony").objects.count(), 2
  4309. )
  4310. # Now test reversal
  4311. self.assertTrue(operation.reversible)
  4312. with connection.schema_editor() as editor:
  4313. operation.database_backwards(
  4314. "test_runpython", editor, project_state, new_state
  4315. )
  4316. self.assertEqual(
  4317. project_state.apps.get_model("test_runpython", "Pony").objects.count(), 0
  4318. )
  4319. # Now test we can't use a string
  4320. with self.assertRaisesMessage(
  4321. ValueError, "RunPython must be supplied with a callable"
  4322. ):
  4323. migrations.RunPython("print 'ahahaha'")
  4324. # And deconstruction
  4325. definition = operation.deconstruct()
  4326. self.assertEqual(definition[0], "RunPython")
  4327. self.assertEqual(definition[1], [])
  4328. self.assertEqual(sorted(definition[2]), ["code", "reverse_code"])
  4329. # Also test reversal fails, with an operation identical to above but
  4330. # without reverse_code set.
  4331. no_reverse_operation = migrations.RunPython(inner_method)
  4332. self.assertFalse(no_reverse_operation.reversible)
  4333. with connection.schema_editor() as editor:
  4334. no_reverse_operation.database_forwards(
  4335. "test_runpython", editor, project_state, new_state
  4336. )
  4337. with self.assertRaises(NotImplementedError):
  4338. no_reverse_operation.database_backwards(
  4339. "test_runpython", editor, new_state, project_state
  4340. )
  4341. self.assertEqual(
  4342. project_state.apps.get_model("test_runpython", "Pony").objects.count(), 2
  4343. )
  4344. def create_ponies(models, schema_editor):
  4345. Pony = models.get_model("test_runpython", "Pony")
  4346. pony1 = Pony.objects.create(pink=1, weight=3.55)
  4347. self.assertIsNot(pony1.pk, None)
  4348. pony2 = Pony.objects.create(weight=5)
  4349. self.assertIsNot(pony2.pk, None)
  4350. self.assertNotEqual(pony1.pk, pony2.pk)
  4351. operation = migrations.RunPython(create_ponies)
  4352. with connection.schema_editor() as editor:
  4353. operation.database_forwards(
  4354. "test_runpython", editor, project_state, new_state
  4355. )
  4356. self.assertEqual(
  4357. project_state.apps.get_model("test_runpython", "Pony").objects.count(), 4
  4358. )
  4359. # And deconstruction
  4360. definition = operation.deconstruct()
  4361. self.assertEqual(definition[0], "RunPython")
  4362. self.assertEqual(definition[1], [])
  4363. self.assertEqual(sorted(definition[2]), ["code"])
  4364. def create_shetlandponies(models, schema_editor):
  4365. ShetlandPony = models.get_model("test_runpython", "ShetlandPony")
  4366. pony1 = ShetlandPony.objects.create(weight=4.0)
  4367. self.assertIsNot(pony1.pk, None)
  4368. pony2 = ShetlandPony.objects.create(weight=5.0)
  4369. self.assertIsNot(pony2.pk, None)
  4370. self.assertNotEqual(pony1.pk, pony2.pk)
  4371. operation = migrations.RunPython(create_shetlandponies)
  4372. with connection.schema_editor() as editor:
  4373. operation.database_forwards(
  4374. "test_runpython", editor, project_state, new_state
  4375. )
  4376. self.assertEqual(
  4377. project_state.apps.get_model("test_runpython", "Pony").objects.count(), 6
  4378. )
  4379. self.assertEqual(
  4380. project_state.apps.get_model(
  4381. "test_runpython", "ShetlandPony"
  4382. ).objects.count(),
  4383. 2,
  4384. )
  4385. # And elidable reduction
  4386. self.assertIs(False, operation.reduce(operation, []))
  4387. elidable_operation = migrations.RunPython(inner_method, elidable=True)
  4388. self.assertEqual(elidable_operation.reduce(operation, []), [operation])
  4389. def test_run_python_atomic(self):
  4390. """
  4391. Tests the RunPython operation correctly handles the "atomic" keyword
  4392. """
  4393. project_state = self.set_up_test_model("test_runpythonatomic", mti_model=True)
  4394. def inner_method(models, schema_editor):
  4395. Pony = models.get_model("test_runpythonatomic", "Pony")
  4396. Pony.objects.create(pink=1, weight=3.55)
  4397. raise ValueError("Adrian hates ponies.")
  4398. # Verify atomicity when applying.
  4399. atomic_migration = Migration("test", "test_runpythonatomic")
  4400. atomic_migration.operations = [
  4401. migrations.RunPython(inner_method, reverse_code=inner_method)
  4402. ]
  4403. non_atomic_migration = Migration("test", "test_runpythonatomic")
  4404. non_atomic_migration.operations = [
  4405. migrations.RunPython(inner_method, reverse_code=inner_method, atomic=False)
  4406. ]
  4407. # If we're a fully-transactional database, both versions should rollback
  4408. if connection.features.can_rollback_ddl:
  4409. self.assertEqual(
  4410. project_state.apps.get_model(
  4411. "test_runpythonatomic", "Pony"
  4412. ).objects.count(),
  4413. 0,
  4414. )
  4415. with self.assertRaises(ValueError):
  4416. with connection.schema_editor() as editor:
  4417. atomic_migration.apply(project_state, editor)
  4418. self.assertEqual(
  4419. project_state.apps.get_model(
  4420. "test_runpythonatomic", "Pony"
  4421. ).objects.count(),
  4422. 0,
  4423. )
  4424. with self.assertRaises(ValueError):
  4425. with connection.schema_editor() as editor:
  4426. non_atomic_migration.apply(project_state, editor)
  4427. self.assertEqual(
  4428. project_state.apps.get_model(
  4429. "test_runpythonatomic", "Pony"
  4430. ).objects.count(),
  4431. 0,
  4432. )
  4433. # Otherwise, the non-atomic operation should leave a row there
  4434. else:
  4435. self.assertEqual(
  4436. project_state.apps.get_model(
  4437. "test_runpythonatomic", "Pony"
  4438. ).objects.count(),
  4439. 0,
  4440. )
  4441. with self.assertRaises(ValueError):
  4442. with connection.schema_editor() as editor:
  4443. atomic_migration.apply(project_state, editor)
  4444. self.assertEqual(
  4445. project_state.apps.get_model(
  4446. "test_runpythonatomic", "Pony"
  4447. ).objects.count(),
  4448. 0,
  4449. )
  4450. with self.assertRaises(ValueError):
  4451. with connection.schema_editor() as editor:
  4452. non_atomic_migration.apply(project_state, editor)
  4453. self.assertEqual(
  4454. project_state.apps.get_model(
  4455. "test_runpythonatomic", "Pony"
  4456. ).objects.count(),
  4457. 1,
  4458. )
  4459. # Reset object count to zero and verify atomicity when unapplying.
  4460. project_state.apps.get_model(
  4461. "test_runpythonatomic", "Pony"
  4462. ).objects.all().delete()
  4463. # On a fully-transactional database, both versions rollback.
  4464. if connection.features.can_rollback_ddl:
  4465. self.assertEqual(
  4466. project_state.apps.get_model(
  4467. "test_runpythonatomic", "Pony"
  4468. ).objects.count(),
  4469. 0,
  4470. )
  4471. with self.assertRaises(ValueError):
  4472. with connection.schema_editor() as editor:
  4473. atomic_migration.unapply(project_state, editor)
  4474. self.assertEqual(
  4475. project_state.apps.get_model(
  4476. "test_runpythonatomic", "Pony"
  4477. ).objects.count(),
  4478. 0,
  4479. )
  4480. with self.assertRaises(ValueError):
  4481. with connection.schema_editor() as editor:
  4482. non_atomic_migration.unapply(project_state, editor)
  4483. self.assertEqual(
  4484. project_state.apps.get_model(
  4485. "test_runpythonatomic", "Pony"
  4486. ).objects.count(),
  4487. 0,
  4488. )
  4489. # Otherwise, the non-atomic operation leaves a row there.
  4490. else:
  4491. self.assertEqual(
  4492. project_state.apps.get_model(
  4493. "test_runpythonatomic", "Pony"
  4494. ).objects.count(),
  4495. 0,
  4496. )
  4497. with self.assertRaises(ValueError):
  4498. with connection.schema_editor() as editor:
  4499. atomic_migration.unapply(project_state, editor)
  4500. self.assertEqual(
  4501. project_state.apps.get_model(
  4502. "test_runpythonatomic", "Pony"
  4503. ).objects.count(),
  4504. 0,
  4505. )
  4506. with self.assertRaises(ValueError):
  4507. with connection.schema_editor() as editor:
  4508. non_atomic_migration.unapply(project_state, editor)
  4509. self.assertEqual(
  4510. project_state.apps.get_model(
  4511. "test_runpythonatomic", "Pony"
  4512. ).objects.count(),
  4513. 1,
  4514. )
  4515. # Verify deconstruction.
  4516. definition = non_atomic_migration.operations[0].deconstruct()
  4517. self.assertEqual(definition[0], "RunPython")
  4518. self.assertEqual(definition[1], [])
  4519. self.assertEqual(sorted(definition[2]), ["atomic", "code", "reverse_code"])
  4520. def test_run_python_related_assignment(self):
  4521. """
  4522. #24282 - Model changes to a FK reverse side update the model
  4523. on the FK side as well.
  4524. """
  4525. def inner_method(models, schema_editor):
  4526. Author = models.get_model("test_authors", "Author")
  4527. Book = models.get_model("test_books", "Book")
  4528. author = Author.objects.create(name="Hemingway")
  4529. Book.objects.create(title="Old Man and The Sea", author=author)
  4530. create_author = migrations.CreateModel(
  4531. "Author",
  4532. [
  4533. ("id", models.AutoField(primary_key=True)),
  4534. ("name", models.CharField(max_length=100)),
  4535. ],
  4536. options={},
  4537. )
  4538. create_book = migrations.CreateModel(
  4539. "Book",
  4540. [
  4541. ("id", models.AutoField(primary_key=True)),
  4542. ("title", models.CharField(max_length=100)),
  4543. ("author", models.ForeignKey("test_authors.Author", models.CASCADE)),
  4544. ],
  4545. options={},
  4546. )
  4547. add_hometown = migrations.AddField(
  4548. "Author",
  4549. "hometown",
  4550. models.CharField(max_length=100),
  4551. )
  4552. create_old_man = migrations.RunPython(inner_method, inner_method)
  4553. project_state = ProjectState()
  4554. new_state = project_state.clone()
  4555. with connection.schema_editor() as editor:
  4556. create_author.state_forwards("test_authors", new_state)
  4557. create_author.database_forwards(
  4558. "test_authors", editor, project_state, new_state
  4559. )
  4560. project_state = new_state
  4561. new_state = new_state.clone()
  4562. with connection.schema_editor() as editor:
  4563. create_book.state_forwards("test_books", new_state)
  4564. create_book.database_forwards(
  4565. "test_books", editor, project_state, new_state
  4566. )
  4567. project_state = new_state
  4568. new_state = new_state.clone()
  4569. with connection.schema_editor() as editor:
  4570. add_hometown.state_forwards("test_authors", new_state)
  4571. add_hometown.database_forwards(
  4572. "test_authors", editor, project_state, new_state
  4573. )
  4574. project_state = new_state
  4575. new_state = new_state.clone()
  4576. with connection.schema_editor() as editor:
  4577. create_old_man.state_forwards("test_books", new_state)
  4578. create_old_man.database_forwards(
  4579. "test_books", editor, project_state, new_state
  4580. )
  4581. def test_model_with_bigautofield(self):
  4582. """
  4583. A model with BigAutoField can be created.
  4584. """
  4585. def create_data(models, schema_editor):
  4586. Author = models.get_model("test_author", "Author")
  4587. Book = models.get_model("test_book", "Book")
  4588. author1 = Author.objects.create(name="Hemingway")
  4589. Book.objects.create(title="Old Man and The Sea", author=author1)
  4590. Book.objects.create(id=2**33, title="A farewell to arms", author=author1)
  4591. author2 = Author.objects.create(id=2**33, name="Remarque")
  4592. Book.objects.create(title="All quiet on the western front", author=author2)
  4593. Book.objects.create(title="Arc de Triomphe", author=author2)
  4594. create_author = migrations.CreateModel(
  4595. "Author",
  4596. [
  4597. ("id", models.BigAutoField(primary_key=True)),
  4598. ("name", models.CharField(max_length=100)),
  4599. ],
  4600. options={},
  4601. )
  4602. create_book = migrations.CreateModel(
  4603. "Book",
  4604. [
  4605. ("id", models.BigAutoField(primary_key=True)),
  4606. ("title", models.CharField(max_length=100)),
  4607. (
  4608. "author",
  4609. models.ForeignKey(
  4610. to="test_author.Author", on_delete=models.CASCADE
  4611. ),
  4612. ),
  4613. ],
  4614. options={},
  4615. )
  4616. fill_data = migrations.RunPython(create_data)
  4617. project_state = ProjectState()
  4618. new_state = project_state.clone()
  4619. with connection.schema_editor() as editor:
  4620. create_author.state_forwards("test_author", new_state)
  4621. create_author.database_forwards(
  4622. "test_author", editor, project_state, new_state
  4623. )
  4624. project_state = new_state
  4625. new_state = new_state.clone()
  4626. with connection.schema_editor() as editor:
  4627. create_book.state_forwards("test_book", new_state)
  4628. create_book.database_forwards("test_book", editor, project_state, new_state)
  4629. project_state = new_state
  4630. new_state = new_state.clone()
  4631. with connection.schema_editor() as editor:
  4632. fill_data.state_forwards("fill_data", new_state)
  4633. fill_data.database_forwards("fill_data", editor, project_state, new_state)
  4634. def _test_autofield_foreignfield_growth(
  4635. self, source_field, target_field, target_value
  4636. ):
  4637. """
  4638. A field may be migrated in the following ways:
  4639. - AutoField to BigAutoField
  4640. - SmallAutoField to AutoField
  4641. - SmallAutoField to BigAutoField
  4642. """
  4643. def create_initial_data(models, schema_editor):
  4644. Article = models.get_model("test_article", "Article")
  4645. Blog = models.get_model("test_blog", "Blog")
  4646. blog = Blog.objects.create(name="web development done right")
  4647. Article.objects.create(name="Frameworks", blog=blog)
  4648. Article.objects.create(name="Programming Languages", blog=blog)
  4649. def create_big_data(models, schema_editor):
  4650. Article = models.get_model("test_article", "Article")
  4651. Blog = models.get_model("test_blog", "Blog")
  4652. blog2 = Blog.objects.create(name="Frameworks", id=target_value)
  4653. Article.objects.create(name="Django", blog=blog2)
  4654. Article.objects.create(id=target_value, name="Django2", blog=blog2)
  4655. create_blog = migrations.CreateModel(
  4656. "Blog",
  4657. [
  4658. ("id", source_field(primary_key=True)),
  4659. ("name", models.CharField(max_length=100)),
  4660. ],
  4661. options={},
  4662. )
  4663. create_article = migrations.CreateModel(
  4664. "Article",
  4665. [
  4666. ("id", source_field(primary_key=True)),
  4667. (
  4668. "blog",
  4669. models.ForeignKey(to="test_blog.Blog", on_delete=models.CASCADE),
  4670. ),
  4671. ("name", models.CharField(max_length=100)),
  4672. ("data", models.TextField(default="")),
  4673. ],
  4674. options={},
  4675. )
  4676. fill_initial_data = migrations.RunPython(
  4677. create_initial_data, create_initial_data
  4678. )
  4679. fill_big_data = migrations.RunPython(create_big_data, create_big_data)
  4680. grow_article_id = migrations.AlterField(
  4681. "Article", "id", target_field(primary_key=True)
  4682. )
  4683. grow_blog_id = migrations.AlterField(
  4684. "Blog", "id", target_field(primary_key=True)
  4685. )
  4686. project_state = ProjectState()
  4687. new_state = project_state.clone()
  4688. with connection.schema_editor() as editor:
  4689. create_blog.state_forwards("test_blog", new_state)
  4690. create_blog.database_forwards("test_blog", editor, project_state, new_state)
  4691. project_state = new_state
  4692. new_state = new_state.clone()
  4693. with connection.schema_editor() as editor:
  4694. create_article.state_forwards("test_article", new_state)
  4695. create_article.database_forwards(
  4696. "test_article", editor, project_state, new_state
  4697. )
  4698. project_state = new_state
  4699. new_state = new_state.clone()
  4700. with connection.schema_editor() as editor:
  4701. fill_initial_data.state_forwards("fill_initial_data", new_state)
  4702. fill_initial_data.database_forwards(
  4703. "fill_initial_data", editor, project_state, new_state
  4704. )
  4705. project_state = new_state
  4706. new_state = new_state.clone()
  4707. with connection.schema_editor() as editor:
  4708. grow_article_id.state_forwards("test_article", new_state)
  4709. grow_article_id.database_forwards(
  4710. "test_article", editor, project_state, new_state
  4711. )
  4712. state = new_state.clone()
  4713. article = state.apps.get_model("test_article.Article")
  4714. self.assertIsInstance(article._meta.pk, target_field)
  4715. project_state = new_state
  4716. new_state = new_state.clone()
  4717. with connection.schema_editor() as editor:
  4718. grow_blog_id.state_forwards("test_blog", new_state)
  4719. grow_blog_id.database_forwards(
  4720. "test_blog", editor, project_state, new_state
  4721. )
  4722. state = new_state.clone()
  4723. blog = state.apps.get_model("test_blog.Blog")
  4724. self.assertIsInstance(blog._meta.pk, target_field)
  4725. project_state = new_state
  4726. new_state = new_state.clone()
  4727. with connection.schema_editor() as editor:
  4728. fill_big_data.state_forwards("fill_big_data", new_state)
  4729. fill_big_data.database_forwards(
  4730. "fill_big_data", editor, project_state, new_state
  4731. )
  4732. def test_autofield__bigautofield_foreignfield_growth(self):
  4733. """A field may be migrated from AutoField to BigAutoField."""
  4734. self._test_autofield_foreignfield_growth(
  4735. models.AutoField,
  4736. models.BigAutoField,
  4737. 2**33,
  4738. )
  4739. def test_smallfield_autofield_foreignfield_growth(self):
  4740. """A field may be migrated from SmallAutoField to AutoField."""
  4741. self._test_autofield_foreignfield_growth(
  4742. models.SmallAutoField,
  4743. models.AutoField,
  4744. 2**22,
  4745. )
  4746. def test_smallfield_bigautofield_foreignfield_growth(self):
  4747. """A field may be migrated from SmallAutoField to BigAutoField."""
  4748. self._test_autofield_foreignfield_growth(
  4749. models.SmallAutoField,
  4750. models.BigAutoField,
  4751. 2**33,
  4752. )
  4753. def test_run_python_noop(self):
  4754. """
  4755. #24098 - Tests no-op RunPython operations.
  4756. """
  4757. project_state = ProjectState()
  4758. new_state = project_state.clone()
  4759. operation = migrations.RunPython(
  4760. migrations.RunPython.noop, migrations.RunPython.noop
  4761. )
  4762. with connection.schema_editor() as editor:
  4763. operation.database_forwards(
  4764. "test_runpython", editor, project_state, new_state
  4765. )
  4766. operation.database_backwards(
  4767. "test_runpython", editor, new_state, project_state
  4768. )
  4769. def test_separate_database_and_state(self):
  4770. """
  4771. Tests the SeparateDatabaseAndState operation.
  4772. """
  4773. project_state = self.set_up_test_model("test_separatedatabaseandstate")
  4774. # Create the operation
  4775. database_operation = migrations.RunSQL(
  4776. "CREATE TABLE i_love_ponies (id int, special_thing int);",
  4777. "DROP TABLE i_love_ponies;",
  4778. )
  4779. state_operation = migrations.CreateModel(
  4780. "SomethingElse", [("id", models.AutoField(primary_key=True))]
  4781. )
  4782. operation = migrations.SeparateDatabaseAndState(
  4783. state_operations=[state_operation], database_operations=[database_operation]
  4784. )
  4785. self.assertEqual(
  4786. operation.describe(), "Custom state/database change combination"
  4787. )
  4788. # Test the state alteration
  4789. new_state = project_state.clone()
  4790. operation.state_forwards("test_separatedatabaseandstate", new_state)
  4791. self.assertEqual(
  4792. len(
  4793. new_state.models[
  4794. "test_separatedatabaseandstate", "somethingelse"
  4795. ].fields
  4796. ),
  4797. 1,
  4798. )
  4799. # Make sure there's no table
  4800. self.assertTableNotExists("i_love_ponies")
  4801. # Test the database alteration
  4802. with connection.schema_editor() as editor:
  4803. operation.database_forwards(
  4804. "test_separatedatabaseandstate", editor, project_state, new_state
  4805. )
  4806. self.assertTableExists("i_love_ponies")
  4807. # And test reversal
  4808. self.assertTrue(operation.reversible)
  4809. with connection.schema_editor() as editor:
  4810. operation.database_backwards(
  4811. "test_separatedatabaseandstate", editor, new_state, project_state
  4812. )
  4813. self.assertTableNotExists("i_love_ponies")
  4814. # And deconstruction
  4815. definition = operation.deconstruct()
  4816. self.assertEqual(definition[0], "SeparateDatabaseAndState")
  4817. self.assertEqual(definition[1], [])
  4818. self.assertEqual(
  4819. sorted(definition[2]), ["database_operations", "state_operations"]
  4820. )
  4821. def test_separate_database_and_state2(self):
  4822. """
  4823. A complex SeparateDatabaseAndState operation: Multiple operations both
  4824. for state and database. Verify the state dependencies within each list
  4825. and that state ops don't affect the database.
  4826. """
  4827. app_label = "test_separatedatabaseandstate2"
  4828. project_state = self.set_up_test_model(app_label)
  4829. # Create the operation
  4830. database_operations = [
  4831. migrations.CreateModel(
  4832. "ILovePonies",
  4833. [("id", models.AutoField(primary_key=True))],
  4834. options={"db_table": "iloveponies"},
  4835. ),
  4836. migrations.CreateModel(
  4837. "ILoveMorePonies",
  4838. # We use IntegerField and not AutoField because
  4839. # the model is going to be deleted immediately
  4840. # and with an AutoField this fails on Oracle
  4841. [("id", models.IntegerField(primary_key=True))],
  4842. options={"db_table": "ilovemoreponies"},
  4843. ),
  4844. migrations.DeleteModel("ILoveMorePonies"),
  4845. migrations.CreateModel(
  4846. "ILoveEvenMorePonies",
  4847. [("id", models.AutoField(primary_key=True))],
  4848. options={"db_table": "iloveevenmoreponies"},
  4849. ),
  4850. ]
  4851. state_operations = [
  4852. migrations.CreateModel(
  4853. "SomethingElse",
  4854. [("id", models.AutoField(primary_key=True))],
  4855. options={"db_table": "somethingelse"},
  4856. ),
  4857. migrations.DeleteModel("SomethingElse"),
  4858. migrations.CreateModel(
  4859. "SomethingCompletelyDifferent",
  4860. [("id", models.AutoField(primary_key=True))],
  4861. options={"db_table": "somethingcompletelydifferent"},
  4862. ),
  4863. ]
  4864. operation = migrations.SeparateDatabaseAndState(
  4865. state_operations=state_operations,
  4866. database_operations=database_operations,
  4867. )
  4868. # Test the state alteration
  4869. new_state = project_state.clone()
  4870. operation.state_forwards(app_label, new_state)
  4871. def assertModelsAndTables(after_db):
  4872. # Tables and models exist, or don't, as they should:
  4873. self.assertNotIn((app_label, "somethingelse"), new_state.models)
  4874. self.assertEqual(
  4875. len(new_state.models[app_label, "somethingcompletelydifferent"].fields),
  4876. 1,
  4877. )
  4878. self.assertNotIn((app_label, "iloveponiesonies"), new_state.models)
  4879. self.assertNotIn((app_label, "ilovemoreponies"), new_state.models)
  4880. self.assertNotIn((app_label, "iloveevenmoreponies"), new_state.models)
  4881. self.assertTableNotExists("somethingelse")
  4882. self.assertTableNotExists("somethingcompletelydifferent")
  4883. self.assertTableNotExists("ilovemoreponies")
  4884. if after_db:
  4885. self.assertTableExists("iloveponies")
  4886. self.assertTableExists("iloveevenmoreponies")
  4887. else:
  4888. self.assertTableNotExists("iloveponies")
  4889. self.assertTableNotExists("iloveevenmoreponies")
  4890. assertModelsAndTables(after_db=False)
  4891. # Test the database alteration
  4892. with connection.schema_editor() as editor:
  4893. operation.database_forwards(app_label, editor, project_state, new_state)
  4894. assertModelsAndTables(after_db=True)
  4895. # And test reversal
  4896. self.assertTrue(operation.reversible)
  4897. with connection.schema_editor() as editor:
  4898. operation.database_backwards(app_label, editor, new_state, project_state)
  4899. assertModelsAndTables(after_db=False)
  4900. class SwappableOperationTests(OperationTestBase):
  4901. """
  4902. Key operations ignore swappable models
  4903. (we don't want to replicate all of them here, as the functionality
  4904. is in a common base class anyway)
  4905. """
  4906. available_apps = ["migrations"]
  4907. @override_settings(TEST_SWAP_MODEL="migrations.SomeFakeModel")
  4908. def test_create_ignore_swapped(self):
  4909. """
  4910. The CreateTable operation ignores swapped models.
  4911. """
  4912. operation = migrations.CreateModel(
  4913. "Pony",
  4914. [
  4915. ("id", models.AutoField(primary_key=True)),
  4916. ("pink", models.IntegerField(default=1)),
  4917. ],
  4918. options={
  4919. "swappable": "TEST_SWAP_MODEL",
  4920. },
  4921. )
  4922. # Test the state alteration (it should still be there!)
  4923. project_state = ProjectState()
  4924. new_state = project_state.clone()
  4925. operation.state_forwards("test_crigsw", new_state)
  4926. self.assertEqual(new_state.models["test_crigsw", "pony"].name, "Pony")
  4927. self.assertEqual(len(new_state.models["test_crigsw", "pony"].fields), 2)
  4928. # Test the database alteration
  4929. self.assertTableNotExists("test_crigsw_pony")
  4930. with connection.schema_editor() as editor:
  4931. operation.database_forwards("test_crigsw", editor, project_state, new_state)
  4932. self.assertTableNotExists("test_crigsw_pony")
  4933. # And test reversal
  4934. with connection.schema_editor() as editor:
  4935. operation.database_backwards(
  4936. "test_crigsw", editor, new_state, project_state
  4937. )
  4938. self.assertTableNotExists("test_crigsw_pony")
  4939. @override_settings(TEST_SWAP_MODEL="migrations.SomeFakeModel")
  4940. def test_delete_ignore_swapped(self):
  4941. """
  4942. Tests the DeleteModel operation ignores swapped models.
  4943. """
  4944. operation = migrations.DeleteModel("Pony")
  4945. project_state, new_state = self.make_test_state("test_dligsw", operation)
  4946. # Test the database alteration
  4947. self.assertTableNotExists("test_dligsw_pony")
  4948. with connection.schema_editor() as editor:
  4949. operation.database_forwards("test_dligsw", editor, project_state, new_state)
  4950. self.assertTableNotExists("test_dligsw_pony")
  4951. # And test reversal
  4952. with connection.schema_editor() as editor:
  4953. operation.database_backwards(
  4954. "test_dligsw", editor, new_state, project_state
  4955. )
  4956. self.assertTableNotExists("test_dligsw_pony")
  4957. @override_settings(TEST_SWAP_MODEL="migrations.SomeFakeModel")
  4958. def test_add_field_ignore_swapped(self):
  4959. """
  4960. Tests the AddField operation.
  4961. """
  4962. # Test the state alteration
  4963. operation = migrations.AddField(
  4964. "Pony",
  4965. "height",
  4966. models.FloatField(null=True, default=5),
  4967. )
  4968. project_state, new_state = self.make_test_state("test_adfligsw", operation)
  4969. # Test the database alteration
  4970. self.assertTableNotExists("test_adfligsw_pony")
  4971. with connection.schema_editor() as editor:
  4972. operation.database_forwards(
  4973. "test_adfligsw", editor, project_state, new_state
  4974. )
  4975. self.assertTableNotExists("test_adfligsw_pony")
  4976. # And test reversal
  4977. with connection.schema_editor() as editor:
  4978. operation.database_backwards(
  4979. "test_adfligsw", editor, new_state, project_state
  4980. )
  4981. self.assertTableNotExists("test_adfligsw_pony")
  4982. @override_settings(TEST_SWAP_MODEL="migrations.SomeFakeModel")
  4983. def test_indexes_ignore_swapped(self):
  4984. """
  4985. Add/RemoveIndex operations ignore swapped models.
  4986. """
  4987. operation = migrations.AddIndex(
  4988. "Pony", models.Index(fields=["pink"], name="my_name_idx")
  4989. )
  4990. project_state, new_state = self.make_test_state("test_adinigsw", operation)
  4991. with connection.schema_editor() as editor:
  4992. # No database queries should be run for swapped models
  4993. operation.database_forwards(
  4994. "test_adinigsw", editor, project_state, new_state
  4995. )
  4996. operation.database_backwards(
  4997. "test_adinigsw", editor, new_state, project_state
  4998. )
  4999. operation = migrations.RemoveIndex(
  5000. "Pony", models.Index(fields=["pink"], name="my_name_idx")
  5001. )
  5002. project_state, new_state = self.make_test_state("test_rminigsw", operation)
  5003. with connection.schema_editor() as editor:
  5004. operation.database_forwards(
  5005. "test_rminigsw", editor, project_state, new_state
  5006. )
  5007. operation.database_backwards(
  5008. "test_rminigsw", editor, new_state, project_state
  5009. )
  5010. class TestCreateModel(SimpleTestCase):
  5011. def test_references_model_mixin(self):
  5012. migrations.CreateModel(
  5013. "name",
  5014. fields=[],
  5015. bases=(Mixin, models.Model),
  5016. ).references_model("other_model", "migrations")
  5017. class FieldOperationTests(SimpleTestCase):
  5018. def test_references_model(self):
  5019. operation = FieldOperation(
  5020. "MoDel", "field", models.ForeignKey("Other", models.CASCADE)
  5021. )
  5022. # Model name match.
  5023. self.assertIs(operation.references_model("mOdEl", "migrations"), True)
  5024. # Referenced field.
  5025. self.assertIs(operation.references_model("oTher", "migrations"), True)
  5026. # Doesn't reference.
  5027. self.assertIs(operation.references_model("Whatever", "migrations"), False)
  5028. def test_references_field_by_name(self):
  5029. operation = FieldOperation("MoDel", "field", models.BooleanField(default=False))
  5030. self.assertIs(operation.references_field("model", "field", "migrations"), True)
  5031. def test_references_field_by_remote_field_model(self):
  5032. operation = FieldOperation(
  5033. "Model", "field", models.ForeignKey("Other", models.CASCADE)
  5034. )
  5035. self.assertIs(
  5036. operation.references_field("Other", "whatever", "migrations"), True
  5037. )
  5038. self.assertIs(
  5039. operation.references_field("Missing", "whatever", "migrations"), False
  5040. )
  5041. def test_references_field_by_from_fields(self):
  5042. operation = FieldOperation(
  5043. "Model",
  5044. "field",
  5045. models.fields.related.ForeignObject(
  5046. "Other", models.CASCADE, ["from"], ["to"]
  5047. ),
  5048. )
  5049. self.assertIs(operation.references_field("Model", "from", "migrations"), True)
  5050. self.assertIs(operation.references_field("Model", "to", "migrations"), False)
  5051. self.assertIs(operation.references_field("Other", "from", "migrations"), False)
  5052. self.assertIs(operation.references_field("Model", "to", "migrations"), False)
  5053. def test_references_field_by_to_fields(self):
  5054. operation = FieldOperation(
  5055. "Model",
  5056. "field",
  5057. models.ForeignKey("Other", models.CASCADE, to_field="field"),
  5058. )
  5059. self.assertIs(operation.references_field("Other", "field", "migrations"), True)
  5060. self.assertIs(
  5061. operation.references_field("Other", "whatever", "migrations"), False
  5062. )
  5063. self.assertIs(
  5064. operation.references_field("Missing", "whatever", "migrations"), False
  5065. )
  5066. def test_references_field_by_through(self):
  5067. operation = FieldOperation(
  5068. "Model", "field", models.ManyToManyField("Other", through="Through")
  5069. )
  5070. self.assertIs(
  5071. operation.references_field("Other", "whatever", "migrations"), True
  5072. )
  5073. self.assertIs(
  5074. operation.references_field("Through", "whatever", "migrations"), True
  5075. )
  5076. self.assertIs(
  5077. operation.references_field("Missing", "whatever", "migrations"), False
  5078. )
  5079. def test_reference_field_by_through_fields(self):
  5080. operation = FieldOperation(
  5081. "Model",
  5082. "field",
  5083. models.ManyToManyField(
  5084. "Other", through="Through", through_fields=("first", "second")
  5085. ),
  5086. )
  5087. self.assertIs(
  5088. operation.references_field("Other", "whatever", "migrations"), True
  5089. )
  5090. self.assertIs(
  5091. operation.references_field("Through", "whatever", "migrations"), False
  5092. )
  5093. self.assertIs(
  5094. operation.references_field("Through", "first", "migrations"), True
  5095. )
  5096. self.assertIs(
  5097. operation.references_field("Through", "second", "migrations"), True
  5098. )