test_operations.py 107 KB


  1. from __future__ import unicode_literals
  2. import unittest
  3. from django.db import connection, migrations, models, transaction
  4. from django.db.migrations.migration import Migration
  5. from django.db.migrations.state import ProjectState
  6. from django.db.models.fields import NOT_PROVIDED
  7. from django.db.transaction import atomic
  8. from django.db.utils import IntegrityError
  9. from django.test import override_settings, skipUnlessDBFeature
  10. from .models import FoodManager, FoodQuerySet, UnicodeModel
  11. from .test_base import MigrationTestBase
  12. try:
  13. import sqlparse
  14. except ImportError:
  15. sqlparse = None
  16. class Mixin(object):
  17. pass
  18. class OperationTestBase(MigrationTestBase):
  19. """
  20. Common functions to help test operations.
  21. """
  22. def apply_operations(self, app_label, project_state, operations):
  23. migration = Migration('name', app_label)
  24. migration.operations = operations
  25. with connection.schema_editor() as editor:
  26. return migration.apply(project_state, editor)
  27. def unapply_operations(self, app_label, project_state, operations):
  28. migration = Migration('name', app_label)
  29. migration.operations = operations
  30. with connection.schema_editor() as editor:
  31. return migration.unapply(project_state, editor)
  32. def make_test_state(self, app_label, operation, **kwargs):
  33. """
  34. Makes a test state using set_up_test_model and returns the
  35. original state and the state after the migration is applied.
  36. """
  37. project_state = self.set_up_test_model(app_label, **kwargs)
  38. new_state = project_state.clone()
  39. operation.state_forwards(app_label, new_state)
  40. return project_state, new_state
  41. def set_up_test_model(
  42. self, app_label, second_model=False, third_model=False,
  43. related_model=False, mti_model=False, proxy_model=False, manager_model=False,
  44. unique_together=False, options=False, db_table=None, index_together=False):
  45. """
  46. Creates a test model state and database table.
  47. """
  48. # Delete the tables if they already exist
  49. table_names = [
  50. # Start with ManyToMany tables
  51. '_pony_stables', '_pony_vans',
  52. # Then standard model tables
  53. '_pony', '_stable', '_van',
  54. ]
  55. tables = [(app_label + table_name) for table_name in table_names]
  56. with connection.cursor() as cursor:
  57. table_names = connection.introspection.table_names(cursor)
  58. connection.disable_constraint_checking()
  59. sql_delete_table = connection.schema_editor().sql_delete_table
  60. with transaction.atomic():
  61. for table in tables:
  62. if table in table_names:
  63. cursor.execute(sql_delete_table % {
  64. "table": connection.ops.quote_name(table),
  65. })
  66. connection.enable_constraint_checking()
  67. # Make the "current" state
  68. model_options = {
  69. "swappable": "TEST_SWAP_MODEL",
  70. "index_together": [["weight", "pink"]] if index_together else [],
  71. "unique_together": [["pink", "weight"]] if unique_together else [],
  72. }
  73. if options:
  74. model_options["permissions"] = [("can_groom", "Can groom")]
  75. if db_table:
  76. model_options["db_table"] = db_table
  77. operations = [migrations.CreateModel(
  78. "Pony",
  79. [
  80. ("id", models.AutoField(primary_key=True)),
  81. ("pink", models.IntegerField(default=3)),
  82. ("weight", models.FloatField()),
  83. ],
  84. options=model_options,
  85. )]
  86. if second_model:
  87. operations.append(migrations.CreateModel(
  88. "Stable",
  89. [
  90. ("id", models.AutoField(primary_key=True)),
  91. ]
  92. ))
  93. if third_model:
  94. operations.append(migrations.CreateModel(
  95. "Van",
  96. [
  97. ("id", models.AutoField(primary_key=True)),
  98. ]
  99. ))
  100. if related_model:
  101. operations.append(migrations.CreateModel(
  102. "Rider",
  103. [
  104. ("id", models.AutoField(primary_key=True)),
  105. ("pony", models.ForeignKey("Pony", models.CASCADE)),
  106. ("friend", models.ForeignKey("self", models.CASCADE))
  107. ],
  108. ))
  109. if mti_model:
  110. operations.append(migrations.CreateModel(
  111. "ShetlandPony",
  112. fields=[
  113. ('pony_ptr', models.OneToOneField(
  114. 'Pony',
  115. models.CASCADE,
  116. auto_created=True,
  117. parent_link=True,
  118. primary_key=True,
  119. to_field='id',
  120. serialize=False,
  121. )),
  122. ("cuteness", models.IntegerField(default=1)),
  123. ],
  124. bases=['%s.Pony' % app_label],
  125. ))
  126. if proxy_model:
  127. operations.append(migrations.CreateModel(
  128. "ProxyPony",
  129. fields=[],
  130. options={"proxy": True},
  131. bases=['%s.Pony' % app_label],
  132. ))
  133. if manager_model:
  134. operations.append(migrations.CreateModel(
  135. "Food",
  136. fields=[
  137. ("id", models.AutoField(primary_key=True)),
  138. ],
  139. managers=[
  140. ("food_qs", FoodQuerySet.as_manager()),
  141. ("food_mgr", FoodManager("a", "b")),
  142. ("food_mgr_kwargs", FoodManager("x", "y", 3, 4)),
  143. ]
  144. ))
  145. return self.apply_operations(app_label, ProjectState(), operations)
  146. class OperationTests(OperationTestBase):
  147. """
  148. Tests running the operations and making sure they do what they say they do.
  149. Each test looks at their state changing, and then their database operation -
  150. both forwards and backwards.
  151. """
  152. def test_create_model(self):
  153. """
  154. Tests the CreateModel operation.
  155. Most other tests use this operation as part of setup, so check failures here first.
  156. """
  157. operation = migrations.CreateModel(
  158. "Pony",
  159. [
  160. ("id", models.AutoField(primary_key=True)),
  161. ("pink", models.IntegerField(default=1)),
  162. ],
  163. )
  164. self.assertEqual(operation.describe(), "Create model Pony")
  165. # Test the state alteration
  166. project_state = ProjectState()
  167. new_state = project_state.clone()
  168. operation.state_forwards("test_crmo", new_state)
  169. self.assertEqual(new_state.models["test_crmo", "pony"].name, "Pony")
  170. self.assertEqual(len(new_state.models["test_crmo", "pony"].fields), 2)
  171. # Test the database alteration
  172. self.assertTableNotExists("test_crmo_pony")
  173. with connection.schema_editor() as editor:
  174. operation.database_forwards("test_crmo", editor, project_state, new_state)
  175. self.assertTableExists("test_crmo_pony")
  176. # And test reversal
  177. with connection.schema_editor() as editor:
  178. operation.database_backwards("test_crmo", editor, new_state, project_state)
  179. self.assertTableNotExists("test_crmo_pony")
  180. # And deconstruction
  181. definition = operation.deconstruct()
  182. self.assertEqual(definition[0], "CreateModel")
  183. self.assertEqual(definition[1], [])
  184. self.assertEqual(sorted(definition[2].keys()), ["fields", "name"])
  185. # And default manager not in set
  186. operation = migrations.CreateModel("Foo", fields=[], managers=[("objects", models.Manager())])
  187. definition = operation.deconstruct()
  188. self.assertNotIn('managers', definition[2])
  189. def test_create_model_with_duplicate_field_name(self):
  190. with self.assertRaisesMessage(ValueError, 'Found duplicate value pink in CreateModel fields argument.'):
  191. migrations.CreateModel(
  192. "Pony",
  193. [
  194. ("id", models.AutoField(primary_key=True)),
  195. ("pink", models.TextField()),
  196. ("pink", models.IntegerField(default=1)),
  197. ],
  198. )
  199. def test_create_model_with_duplicate_base(self):
  200. message = 'Found duplicate value test_crmo.pony in CreateModel bases argument.'
  201. with self.assertRaisesMessage(ValueError, message):
  202. migrations.CreateModel(
  203. "Pony",
  204. fields=[],
  205. bases=("test_crmo.Pony", "test_crmo.Pony",),
  206. )
  207. with self.assertRaisesMessage(ValueError, message):
  208. migrations.CreateModel(
  209. "Pony",
  210. fields=[],
  211. bases=("test_crmo.Pony", "test_crmo.pony",),
  212. )
  213. message = 'Found duplicate value migrations.unicodemodel in CreateModel bases argument.'
  214. with self.assertRaisesMessage(ValueError, message):
  215. migrations.CreateModel(
  216. "Pony",
  217. fields=[],
  218. bases=(UnicodeModel, UnicodeModel,),
  219. )
  220. with self.assertRaisesMessage(ValueError, message):
  221. migrations.CreateModel(
  222. "Pony",
  223. fields=[],
  224. bases=(UnicodeModel, 'migrations.unicodemodel',),
  225. )
  226. with self.assertRaisesMessage(ValueError, message):
  227. migrations.CreateModel(
  228. "Pony",
  229. fields=[],
  230. bases=(UnicodeModel, 'migrations.UnicodeModel',),
  231. )
  232. message = "Found duplicate value <class 'django.db.models.base.Model'> in CreateModel bases argument."
  233. with self.assertRaisesMessage(ValueError, message):
  234. migrations.CreateModel(
  235. "Pony",
  236. fields=[],
  237. bases=(models.Model, models.Model,),
  238. )
  239. message = "Found duplicate value <class 'migrations.test_operations.Mixin'> in CreateModel bases argument."
  240. with self.assertRaisesMessage(ValueError, message):
  241. migrations.CreateModel(
  242. "Pony",
  243. fields=[],
  244. bases=(Mixin, Mixin,),
  245. )
  246. def test_create_model_with_duplicate_manager_name(self):
  247. with self.assertRaisesMessage(ValueError, 'Found duplicate value objects in CreateModel managers argument.'):
  248. migrations.CreateModel(
  249. "Pony",
  250. fields=[],
  251. managers=[
  252. ("objects", models.Manager()),
  253. ("objects", models.Manager()),
  254. ],
  255. )
  256. def test_create_model_with_unique_after(self):
  257. """
  258. Tests the CreateModel operation directly followed by an
  259. AlterUniqueTogether (bug #22844 - sqlite remake issues)
  260. """
  261. operation1 = migrations.CreateModel(
  262. "Pony",
  263. [
  264. ("id", models.AutoField(primary_key=True)),
  265. ("pink", models.IntegerField(default=1)),
  266. ],
  267. )
  268. operation2 = migrations.CreateModel(
  269. "Rider",
  270. [
  271. ("id", models.AutoField(primary_key=True)),
  272. ("number", models.IntegerField(default=1)),
  273. ("pony", models.ForeignKey("test_crmoua.Pony", models.CASCADE)),
  274. ],
  275. )
  276. operation3 = migrations.AlterUniqueTogether(
  277. "Rider",
  278. [
  279. ("number", "pony"),
  280. ],
  281. )
  282. # Test the database alteration
  283. project_state = ProjectState()
  284. self.assertTableNotExists("test_crmoua_pony")
  285. self.assertTableNotExists("test_crmoua_rider")
  286. with connection.schema_editor() as editor:
  287. new_state = project_state.clone()
  288. operation1.state_forwards("test_crmoua", new_state)
  289. operation1.database_forwards("test_crmoua", editor, project_state, new_state)
  290. project_state, new_state = new_state, new_state.clone()
  291. operation2.state_forwards("test_crmoua", new_state)
  292. operation2.database_forwards("test_crmoua", editor, project_state, new_state)
  293. project_state, new_state = new_state, new_state.clone()
  294. operation3.state_forwards("test_crmoua", new_state)
  295. operation3.database_forwards("test_crmoua", editor, project_state, new_state)
  296. self.assertTableExists("test_crmoua_pony")
  297. self.assertTableExists("test_crmoua_rider")
  298. def test_create_model_m2m(self):
  299. """
  300. Test the creation of a model with a ManyToMany field and the
  301. auto-created "through" model.
  302. """
  303. project_state = self.set_up_test_model("test_crmomm")
  304. operation = migrations.CreateModel(
  305. "Stable",
  306. [
  307. ("id", models.AutoField(primary_key=True)),
  308. ("ponies", models.ManyToManyField("Pony", related_name="stables"))
  309. ]
  310. )
  311. # Test the state alteration
  312. new_state = project_state.clone()
  313. operation.state_forwards("test_crmomm", new_state)
  314. # Test the database alteration
  315. self.assertTableNotExists("test_crmomm_stable_ponies")
  316. with connection.schema_editor() as editor:
  317. operation.database_forwards("test_crmomm", editor, project_state, new_state)
  318. self.assertTableExists("test_crmomm_stable")
  319. self.assertTableExists("test_crmomm_stable_ponies")
  320. self.assertColumnNotExists("test_crmomm_stable", "ponies")
  321. # Make sure the M2M field actually works
  322. with atomic():
  323. Pony = new_state.apps.get_model("test_crmomm", "Pony")
  324. Stable = new_state.apps.get_model("test_crmomm", "Stable")
  325. stable = Stable.objects.create()
  326. p1 = Pony.objects.create(pink=False, weight=4.55)
  327. p2 = Pony.objects.create(pink=True, weight=5.43)
  328. stable.ponies.add(p1, p2)
  329. self.assertEqual(stable.ponies.count(), 2)
  330. stable.ponies.all().delete()
  331. # And test reversal
  332. with connection.schema_editor() as editor:
  333. operation.database_backwards("test_crmomm", editor, new_state, project_state)
  334. self.assertTableNotExists("test_crmomm_stable")
  335. self.assertTableNotExists("test_crmomm_stable_ponies")
  336. def test_create_model_inheritance(self):
  337. """
  338. Tests the CreateModel operation on a multi-table inheritance setup.
  339. """
  340. project_state = self.set_up_test_model("test_crmoih")
  341. # Test the state alteration
  342. operation = migrations.CreateModel(
  343. "ShetlandPony",
  344. [
  345. ('pony_ptr', models.OneToOneField(
  346. 'test_crmoih.Pony',
  347. models.CASCADE,
  348. auto_created=True,
  349. primary_key=True,
  350. to_field='id',
  351. serialize=False,
  352. )),
  353. ("cuteness", models.IntegerField(default=1)),
  354. ],
  355. )
  356. new_state = project_state.clone()
  357. operation.state_forwards("test_crmoih", new_state)
  358. self.assertIn(("test_crmoih", "shetlandpony"), new_state.models)
  359. # Test the database alteration
  360. self.assertTableNotExists("test_crmoih_shetlandpony")
  361. with connection.schema_editor() as editor:
  362. operation.database_forwards("test_crmoih", editor, project_state, new_state)
  363. self.assertTableExists("test_crmoih_shetlandpony")
  364. # And test reversal
  365. with connection.schema_editor() as editor:
  366. operation.database_backwards("test_crmoih", editor, new_state, project_state)
  367. self.assertTableNotExists("test_crmoih_shetlandpony")
  368. def test_create_proxy_model(self):
  369. """
  370. Tests that CreateModel ignores proxy models.
  371. """
  372. project_state = self.set_up_test_model("test_crprmo")
  373. # Test the state alteration
  374. operation = migrations.CreateModel(
  375. "ProxyPony",
  376. [],
  377. options={"proxy": True},
  378. bases=("test_crprmo.Pony", ),
  379. )
  380. self.assertEqual(operation.describe(), "Create proxy model ProxyPony")
  381. new_state = project_state.clone()
  382. operation.state_forwards("test_crprmo", new_state)
  383. self.assertIn(("test_crprmo", "proxypony"), new_state.models)
  384. # Test the database alteration
  385. self.assertTableNotExists("test_crprmo_proxypony")
  386. self.assertTableExists("test_crprmo_pony")
  387. with connection.schema_editor() as editor:
  388. operation.database_forwards("test_crprmo", editor, project_state, new_state)
  389. self.assertTableNotExists("test_crprmo_proxypony")
  390. self.assertTableExists("test_crprmo_pony")
  391. # And test reversal
  392. with connection.schema_editor() as editor:
  393. operation.database_backwards("test_crprmo", editor, new_state, project_state)
  394. self.assertTableNotExists("test_crprmo_proxypony")
  395. self.assertTableExists("test_crprmo_pony")
  396. # And deconstruction
  397. definition = operation.deconstruct()
  398. self.assertEqual(definition[0], "CreateModel")
  399. self.assertEqual(definition[1], [])
  400. self.assertEqual(sorted(definition[2].keys()), ["bases", "fields", "name", "options"])
  401. def test_create_unmanaged_model(self):
  402. """
  403. Tests that CreateModel ignores unmanaged models.
  404. """
  405. project_state = self.set_up_test_model("test_crummo")
  406. # Test the state alteration
  407. operation = migrations.CreateModel(
  408. "UnmanagedPony",
  409. [],
  410. options={"proxy": True},
  411. bases=("test_crummo.Pony", ),
  412. )
  413. self.assertEqual(operation.describe(), "Create proxy model UnmanagedPony")
  414. new_state = project_state.clone()
  415. operation.state_forwards("test_crummo", new_state)
  416. self.assertIn(("test_crummo", "unmanagedpony"), new_state.models)
  417. # Test the database alteration
  418. self.assertTableNotExists("test_crummo_unmanagedpony")
  419. self.assertTableExists("test_crummo_pony")
  420. with connection.schema_editor() as editor:
  421. operation.database_forwards("test_crummo", editor, project_state, new_state)
  422. self.assertTableNotExists("test_crummo_unmanagedpony")
  423. self.assertTableExists("test_crummo_pony")
  424. # And test reversal
  425. with connection.schema_editor() as editor:
  426. operation.database_backwards("test_crummo", editor, new_state, project_state)
  427. self.assertTableNotExists("test_crummo_unmanagedpony")
  428. self.assertTableExists("test_crummo_pony")
  429. def test_create_model_managers(self):
  430. """
  431. Tests that the managers on a model are set.
  432. """
  433. project_state = self.set_up_test_model("test_cmoma")
  434. # Test the state alteration
  435. operation = migrations.CreateModel(
  436. "Food",
  437. fields=[
  438. ("id", models.AutoField(primary_key=True)),
  439. ],
  440. managers=[
  441. ("food_qs", FoodQuerySet.as_manager()),
  442. ("food_mgr", FoodManager("a", "b")),
  443. ("food_mgr_kwargs", FoodManager("x", "y", 3, 4)),
  444. ]
  445. )
  446. self.assertEqual(operation.describe(), "Create model Food")
  447. new_state = project_state.clone()
  448. operation.state_forwards("test_cmoma", new_state)
  449. self.assertIn(("test_cmoma", "food"), new_state.models)
  450. managers = new_state.models["test_cmoma", "food"].managers
  451. self.assertEqual(managers[0][0], "food_qs")
  452. self.assertIsInstance(managers[0][1], models.Manager)
  453. self.assertEqual(managers[1][0], "food_mgr")
  454. self.assertIsInstance(managers[1][1], FoodManager)
  455. self.assertEqual(managers[1][1].args, ("a", "b", 1, 2))
  456. self.assertEqual(managers[2][0], "food_mgr_kwargs")
  457. self.assertIsInstance(managers[2][1], FoodManager)
  458. self.assertEqual(managers[2][1].args, ("x", "y", 3, 4))
  459. def test_delete_model(self):
  460. """
  461. Tests the DeleteModel operation.
  462. """
  463. project_state = self.set_up_test_model("test_dlmo")
  464. # Test the state alteration
  465. operation = migrations.DeleteModel("Pony")
  466. self.assertEqual(operation.describe(), "Delete model Pony")
  467. new_state = project_state.clone()
  468. operation.state_forwards("test_dlmo", new_state)
  469. self.assertNotIn(("test_dlmo", "pony"), new_state.models)
  470. # Test the database alteration
  471. self.assertTableExists("test_dlmo_pony")
  472. with connection.schema_editor() as editor:
  473. operation.database_forwards("test_dlmo", editor, project_state, new_state)
  474. self.assertTableNotExists("test_dlmo_pony")
  475. # And test reversal
  476. with connection.schema_editor() as editor:
  477. operation.database_backwards("test_dlmo", editor, new_state, project_state)
  478. self.assertTableExists("test_dlmo_pony")
  479. # And deconstruction
  480. definition = operation.deconstruct()
  481. self.assertEqual(definition[0], "DeleteModel")
  482. self.assertEqual(definition[1], [])
  483. self.assertEqual(list(definition[2]), ["name"])
  484. def test_delete_proxy_model(self):
  485. """
  486. Tests the DeleteModel operation ignores proxy models.
  487. """
  488. project_state = self.set_up_test_model("test_dlprmo", proxy_model=True)
  489. # Test the state alteration
  490. operation = migrations.DeleteModel("ProxyPony")
  491. new_state = project_state.clone()
  492. operation.state_forwards("test_dlprmo", new_state)
  493. self.assertIn(("test_dlprmo", "proxypony"), project_state.models)
  494. self.assertNotIn(("test_dlprmo", "proxypony"), new_state.models)
  495. # Test the database alteration
  496. self.assertTableExists("test_dlprmo_pony")
  497. self.assertTableNotExists("test_dlprmo_proxypony")
  498. with connection.schema_editor() as editor:
  499. operation.database_forwards("test_dlprmo", editor, project_state, new_state)
  500. self.assertTableExists("test_dlprmo_pony")
  501. self.assertTableNotExists("test_dlprmo_proxypony")
  502. # And test reversal
  503. with connection.schema_editor() as editor:
  504. operation.database_backwards("test_dlprmo", editor, new_state, project_state)
  505. self.assertTableExists("test_dlprmo_pony")
  506. self.assertTableNotExists("test_dlprmo_proxypony")
  507. def test_rename_model(self):
  508. """
  509. Tests the RenameModel operation.
  510. """
  511. project_state = self.set_up_test_model("test_rnmo", related_model=True)
  512. # Test the state alteration
  513. operation = migrations.RenameModel("Pony", "Horse")
  514. self.assertEqual(operation.describe(), "Rename model Pony to Horse")
  515. # Test initial state and database
  516. self.assertIn(("test_rnmo", "pony"), project_state.models)
  517. self.assertNotIn(("test_rnmo", "horse"), project_state.models)
  518. self.assertTableExists("test_rnmo_pony")
  519. self.assertTableNotExists("test_rnmo_horse")
  520. if connection.features.supports_foreign_keys:
  521. self.assertFKExists("test_rnmo_rider", ["pony_id"], ("test_rnmo_pony", "id"))
  522. self.assertFKNotExists("test_rnmo_rider", ["pony_id"], ("test_rnmo_horse", "id"))
  523. # Migrate forwards
  524. new_state = project_state.clone()
  525. new_state = self.apply_operations("test_rnmo", new_state, [operation])
  526. # Test new state and database
  527. self.assertNotIn(("test_rnmo", "pony"), new_state.models)
  528. self.assertIn(("test_rnmo", "horse"), new_state.models)
  529. # RenameModel also repoints all incoming FKs and M2Ms
  530. self.assertEqual("test_rnmo.Horse", new_state.models["test_rnmo", "rider"].fields[1][1].remote_field.model)
  531. self.assertTableNotExists("test_rnmo_pony")
  532. self.assertTableExists("test_rnmo_horse")
  533. if connection.features.supports_foreign_keys:
  534. self.assertFKNotExists("test_rnmo_rider", ["pony_id"], ("test_rnmo_pony", "id"))
  535. self.assertFKExists("test_rnmo_rider", ["pony_id"], ("test_rnmo_horse", "id"))
  536. # Migrate backwards
  537. original_state = self.unapply_operations("test_rnmo", project_state, [operation])
  538. # Test original state and database
  539. self.assertIn(("test_rnmo", "pony"), original_state.models)
  540. self.assertNotIn(("test_rnmo", "horse"), original_state.models)
  541. self.assertEqual("Pony", original_state.models["test_rnmo", "rider"].fields[1][1].remote_field.model)
  542. self.assertTableExists("test_rnmo_pony")
  543. self.assertTableNotExists("test_rnmo_horse")
  544. if connection.features.supports_foreign_keys:
  545. self.assertFKExists("test_rnmo_rider", ["pony_id"], ("test_rnmo_pony", "id"))
  546. self.assertFKNotExists("test_rnmo_rider", ["pony_id"], ("test_rnmo_horse", "id"))
  547. # And deconstruction
  548. definition = operation.deconstruct()
  549. self.assertEqual(definition[0], "RenameModel")
  550. self.assertEqual(definition[1], [])
  551. self.assertEqual(definition[2], {'old_name': "Pony", 'new_name': "Horse"})
  552. def test_rename_model_with_self_referential_fk(self):
  553. """
  554. Tests the RenameModel operation on model with self referential FK.
  555. """
  556. project_state = self.set_up_test_model("test_rmwsrf", related_model=True)
  557. # Test the state alteration
  558. operation = migrations.RenameModel("Rider", "HorseRider")
  559. self.assertEqual(operation.describe(), "Rename model Rider to HorseRider")
  560. new_state = project_state.clone()
  561. operation.state_forwards("test_rmwsrf", new_state)
  562. self.assertNotIn(("test_rmwsrf", "rider"), new_state.models)
  563. self.assertIn(("test_rmwsrf", "horserider"), new_state.models)
  564. # Remember, RenameModel also repoints all incoming FKs and M2Ms
  565. self.assertEqual(
  566. "test_rmwsrf.HorseRider",
  567. new_state.models["test_rmwsrf", "horserider"].fields[2][1].remote_field.model
  568. )
  569. # Test the database alteration
  570. self.assertTableExists("test_rmwsrf_rider")
  571. self.assertTableNotExists("test_rmwsrf_horserider")
  572. if connection.features.supports_foreign_keys:
  573. self.assertFKExists("test_rmwsrf_rider", ["friend_id"], ("test_rmwsrf_rider", "id"))
  574. self.assertFKNotExists("test_rmwsrf_rider", ["friend_id"], ("test_rmwsrf_horserider", "id"))
  575. with connection.schema_editor() as editor:
  576. operation.database_forwards("test_rmwsrf", editor, project_state, new_state)
  577. self.assertTableNotExists("test_rmwsrf_rider")
  578. self.assertTableExists("test_rmwsrf_horserider")
  579. if connection.features.supports_foreign_keys:
  580. self.assertFKNotExists("test_rmwsrf_horserider", ["friend_id"], ("test_rmwsrf_rider", "id"))
  581. self.assertFKExists("test_rmwsrf_horserider", ["friend_id"], ("test_rmwsrf_horserider", "id"))
  582. # And test reversal
  583. with connection.schema_editor() as editor:
  584. operation.database_backwards("test_rmwsrf", editor, new_state, project_state)
  585. self.assertTableExists("test_rmwsrf_rider")
  586. self.assertTableNotExists("test_rmwsrf_horserider")
  587. if connection.features.supports_foreign_keys:
  588. self.assertFKExists("test_rmwsrf_rider", ["friend_id"], ("test_rmwsrf_rider", "id"))
  589. self.assertFKNotExists("test_rmwsrf_rider", ["friend_id"], ("test_rmwsrf_horserider", "id"))
  590. def test_rename_model_with_superclass_fk(self):
  591. """
  592. Tests the RenameModel operation on a model which has a superclass that
  593. has a foreign key.
  594. """
  595. project_state = self.set_up_test_model("test_rmwsc", related_model=True, mti_model=True)
  596. # Test the state alteration
  597. operation = migrations.RenameModel("ShetlandPony", "LittleHorse")
  598. self.assertEqual(operation.describe(), "Rename model ShetlandPony to LittleHorse")
  599. new_state = project_state.clone()
  600. operation.state_forwards("test_rmwsc", new_state)
  601. self.assertNotIn(("test_rmwsc", "shetlandpony"), new_state.models)
  602. self.assertIn(("test_rmwsc", "littlehorse"), new_state.models)
  603. # RenameModel shouldn't repoint the superclass's relations, only local ones
  604. self.assertEqual(
  605. project_state.models["test_rmwsc", "rider"].fields[1][1].remote_field.model,
  606. new_state.models["test_rmwsc", "rider"].fields[1][1].remote_field.model
  607. )
  608. # Before running the migration we have a table for Shetland Pony, not Little Horse
  609. self.assertTableExists("test_rmwsc_shetlandpony")
  610. self.assertTableNotExists("test_rmwsc_littlehorse")
  611. if connection.features.supports_foreign_keys:
  612. # and the foreign key on rider points to pony, not shetland pony
  613. self.assertFKExists("test_rmwsc_rider", ["pony_id"], ("test_rmwsc_pony", "id"))
  614. self.assertFKNotExists("test_rmwsc_rider", ["pony_id"], ("test_rmwsc_shetlandpony", "id"))
  615. with connection.schema_editor() as editor:
  616. operation.database_forwards("test_rmwsc", editor, project_state, new_state)
  617. # Now we have a little horse table, not shetland pony
  618. self.assertTableNotExists("test_rmwsc_shetlandpony")
  619. self.assertTableExists("test_rmwsc_littlehorse")
  620. if connection.features.supports_foreign_keys:
  621. # but the Foreign keys still point at pony, not little horse
  622. self.assertFKExists("test_rmwsc_rider", ["pony_id"], ("test_rmwsc_pony", "id"))
  623. self.assertFKNotExists("test_rmwsc_rider", ["pony_id"], ("test_rmwsc_littlehorse", "id"))
  624. def test_rename_model_with_self_referential_m2m(self):
  625. app_label = "test_rename_model_with_self_referential_m2m"
  626. project_state = self.apply_operations(app_label, ProjectState(), operations=[
  627. migrations.CreateModel("ReflexivePony", fields=[
  628. ("id", models.AutoField(primary_key=True)),
  629. ("ponies", models.ManyToManyField("self")),
  630. ]),
  631. ])
  632. project_state = self.apply_operations(app_label, project_state, operations=[
  633. migrations.RenameModel("ReflexivePony", "ReflexivePony2"),
  634. ])
  635. Pony = project_state.apps.get_model(app_label, "ReflexivePony2")
  636. pony = Pony.objects.create()
  637. pony.ponies.add(pony)
  638. def test_rename_model_with_m2m(self):
  639. app_label = "test_rename_model_with_m2m"
  640. project_state = self.apply_operations(app_label, ProjectState(), operations=[
  641. migrations.CreateModel("Rider", fields=[
  642. ("id", models.AutoField(primary_key=True)),
  643. ]),
  644. migrations.CreateModel("Pony", fields=[
  645. ("id", models.AutoField(primary_key=True)),
  646. ("riders", models.ManyToManyField("Rider")),
  647. ]),
  648. ])
  649. Pony = project_state.apps.get_model(app_label, "Pony")
  650. Rider = project_state.apps.get_model(app_label, "Rider")
  651. pony = Pony.objects.create()
  652. rider = Rider.objects.create()
  653. pony.riders.add(rider)
  654. project_state = self.apply_operations(app_label, project_state, operations=[
  655. migrations.RenameModel("Pony", "Pony2"),
  656. ])
  657. Pony = project_state.apps.get_model(app_label, "Pony2")
  658. Rider = project_state.apps.get_model(app_label, "Rider")
  659. pony = Pony.objects.create()
  660. rider = Rider.objects.create()
  661. pony.riders.add(rider)
  662. self.assertEqual(Pony.objects.count(), 2)
  663. self.assertEqual(Rider.objects.count(), 2)
  664. self.assertEqual(Pony._meta.get_field('riders').remote_field.through.objects.count(), 2)
  665. def test_rename_m2m_target_model(self):
  666. app_label = "test_rename_m2m_target_model"
  667. project_state = self.apply_operations(app_label, ProjectState(), operations=[
  668. migrations.CreateModel("Rider", fields=[
  669. ("id", models.AutoField(primary_key=True)),
  670. ]),
  671. migrations.CreateModel("Pony", fields=[
  672. ("id", models.AutoField(primary_key=True)),
  673. ("riders", models.ManyToManyField("Rider")),
  674. ]),
  675. ])
  676. Pony = project_state.apps.get_model(app_label, "Pony")
  677. Rider = project_state.apps.get_model(app_label, "Rider")
  678. pony = Pony.objects.create()
  679. rider = Rider.objects.create()
  680. pony.riders.add(rider)
  681. project_state = self.apply_operations(app_label, project_state, operations=[
  682. migrations.RenameModel("Rider", "Rider2"),
  683. ])
  684. Pony = project_state.apps.get_model(app_label, "Pony")
  685. Rider = project_state.apps.get_model(app_label, "Rider2")
  686. pony = Pony.objects.create()
  687. rider = Rider.objects.create()
  688. pony.riders.add(rider)
  689. self.assertEqual(Pony.objects.count(), 2)
  690. self.assertEqual(Rider.objects.count(), 2)
  691. self.assertEqual(Pony._meta.get_field('riders').remote_field.through.objects.count(), 2)
  692. def test_rename_m2m_through_model(self):
  693. app_label = "test_rename_through"
  694. project_state = self.apply_operations(app_label, ProjectState(), operations=[
  695. migrations.CreateModel("Rider", fields=[
  696. ("id", models.AutoField(primary_key=True)),
  697. ]),
  698. migrations.CreateModel("Pony", fields=[
  699. ("id", models.AutoField(primary_key=True)),
  700. ]),
  701. migrations.CreateModel("PonyRider", fields=[
  702. ("id", models.AutoField(primary_key=True)),
  703. ("rider", models.ForeignKey("test_rename_through.Rider", models.CASCADE)),
  704. ("pony", models.ForeignKey("test_rename_through.Pony", models.CASCADE)),
  705. ]),
  706. migrations.AddField(
  707. "Pony",
  708. "riders",
  709. models.ManyToManyField("test_rename_through.Rider", through="test_rename_through.PonyRider"),
  710. ),
  711. ])
  712. Pony = project_state.apps.get_model(app_label, "Pony")
  713. Rider = project_state.apps.get_model(app_label, "Rider")
  714. PonyRider = project_state.apps.get_model(app_label, "PonyRider")
  715. pony = Pony.objects.create()
  716. rider = Rider.objects.create()
  717. PonyRider.objects.create(pony=pony, rider=rider)
  718. project_state = self.apply_operations(app_label, project_state, operations=[
  719. migrations.RenameModel("PonyRider", "PonyRider2"),
  720. ])
  721. Pony = project_state.apps.get_model(app_label, "Pony")
  722. Rider = project_state.apps.get_model(app_label, "Rider")
  723. PonyRider = project_state.apps.get_model(app_label, "PonyRider2")
  724. pony = Pony.objects.first()
  725. rider = Rider.objects.create()
  726. PonyRider.objects.create(pony=pony, rider=rider)
  727. self.assertEqual(Pony.objects.count(), 1)
  728. self.assertEqual(Rider.objects.count(), 2)
  729. self.assertEqual(PonyRider.objects.count(), 2)
  730. self.assertEqual(pony.riders.count(), 2)
  731. def test_add_field(self):
  732. """
  733. Tests the AddField operation.
  734. """
  735. # Test the state alteration
  736. operation = migrations.AddField(
  737. "Pony",
  738. "height",
  739. models.FloatField(null=True, default=5),
  740. )
  741. self.assertEqual(operation.describe(), "Add field height to Pony")
  742. project_state, new_state = self.make_test_state("test_adfl", operation)
  743. self.assertEqual(len(new_state.models["test_adfl", "pony"].fields), 4)
  744. field = [
  745. f for n, f in new_state.models["test_adfl", "pony"].fields
  746. if n == "height"
  747. ][0]
  748. self.assertEqual(field.default, 5)
  749. # Test the database alteration
  750. self.assertColumnNotExists("test_adfl_pony", "height")
  751. with connection.schema_editor() as editor:
  752. operation.database_forwards("test_adfl", editor, project_state, new_state)
  753. self.assertColumnExists("test_adfl_pony", "height")
  754. # And test reversal
  755. with connection.schema_editor() as editor:
  756. operation.database_backwards("test_adfl", editor, new_state, project_state)
  757. self.assertColumnNotExists("test_adfl_pony", "height")
  758. # And deconstruction
  759. definition = operation.deconstruct()
  760. self.assertEqual(definition[0], "AddField")
  761. self.assertEqual(definition[1], [])
  762. self.assertEqual(sorted(definition[2]), ["field", "model_name", "name"])
  763. def test_add_charfield(self):
  764. """
  765. Tests the AddField operation on TextField.
  766. """
  767. project_state = self.set_up_test_model("test_adchfl")
  768. Pony = project_state.apps.get_model("test_adchfl", "Pony")
  769. pony = Pony.objects.create(weight=42)
  770. new_state = self.apply_operations("test_adchfl", project_state, [
  771. migrations.AddField(
  772. "Pony",
  773. "text",
  774. models.CharField(max_length=10, default="some text"),
  775. ),
  776. migrations.AddField(
  777. "Pony",
  778. "empty",
  779. models.CharField(max_length=10, default=""),
  780. ),
  781. # If not properly quoted digits would be interpreted as an int.
  782. migrations.AddField(
  783. "Pony",
  784. "digits",
  785. models.CharField(max_length=10, default="42"),
  786. ),
  787. # Manual quoting is fragile and could trip on quotes. Refs #xyz.
  788. migrations.AddField(
  789. "Pony",
  790. "quotes",
  791. models.CharField(max_length=10, default='"\'"'),
  792. ),
  793. ])
  794. Pony = new_state.apps.get_model("test_adchfl", "Pony")
  795. pony = Pony.objects.get(pk=pony.pk)
  796. self.assertEqual(pony.text, "some text")
  797. self.assertEqual(pony.empty, "")
  798. self.assertEqual(pony.digits, "42")
  799. self.assertEqual(pony.quotes, '"\'"')
  800. def test_add_textfield(self):
  801. """
  802. Tests the AddField operation on TextField.
  803. """
  804. project_state = self.set_up_test_model("test_adtxtfl")
  805. Pony = project_state.apps.get_model("test_adtxtfl", "Pony")
  806. pony = Pony.objects.create(weight=42)
  807. new_state = self.apply_operations("test_adtxtfl", project_state, [
  808. migrations.AddField(
  809. "Pony",
  810. "text",
  811. models.TextField(default="some text"),
  812. ),
  813. migrations.AddField(
  814. "Pony",
  815. "empty",
  816. models.TextField(default=""),
  817. ),
  818. # If not properly quoted digits would be interpreted as an int.
  819. migrations.AddField(
  820. "Pony",
  821. "digits",
  822. models.TextField(default="42"),
  823. ),
  824. # Manual quoting is fragile and could trip on quotes. Refs #xyz.
  825. migrations.AddField(
  826. "Pony",
  827. "quotes",
  828. models.TextField(default='"\'"'),
  829. ),
  830. ])
  831. Pony = new_state.apps.get_model("test_adtxtfl", "Pony")
  832. pony = Pony.objects.get(pk=pony.pk)
  833. self.assertEqual(pony.text, "some text")
  834. self.assertEqual(pony.empty, "")
  835. self.assertEqual(pony.digits, "42")
  836. self.assertEqual(pony.quotes, '"\'"')
  837. def test_add_binaryfield(self):
  838. """
  839. Tests the AddField operation on TextField/BinaryField.
  840. """
  841. project_state = self.set_up_test_model("test_adbinfl")
  842. Pony = project_state.apps.get_model("test_adbinfl", "Pony")
  843. pony = Pony.objects.create(weight=42)
  844. new_state = self.apply_operations("test_adbinfl", project_state, [
  845. migrations.AddField(
  846. "Pony",
  847. "blob",
  848. models.BinaryField(default=b"some text"),
  849. ),
  850. migrations.AddField(
  851. "Pony",
  852. "empty",
  853. models.BinaryField(default=b""),
  854. ),
  855. # If not properly quoted digits would be interpreted as an int.
  856. migrations.AddField(
  857. "Pony",
  858. "digits",
  859. models.BinaryField(default=b"42"),
  860. ),
  861. # Manual quoting is fragile and could trip on quotes. Refs #xyz.
  862. migrations.AddField(
  863. "Pony",
  864. "quotes",
  865. models.BinaryField(default=b'"\'"'),
  866. ),
  867. ])
  868. Pony = new_state.apps.get_model("test_adbinfl", "Pony")
  869. pony = Pony.objects.get(pk=pony.pk)
  870. # SQLite returns buffer/memoryview, cast to bytes for checking.
  871. self.assertEqual(bytes(pony.blob), b"some text")
  872. self.assertEqual(bytes(pony.empty), b"")
  873. self.assertEqual(bytes(pony.digits), b"42")
  874. self.assertEqual(bytes(pony.quotes), b'"\'"')
  875. def test_column_name_quoting(self):
  876. """
  877. Column names that are SQL keywords shouldn't cause problems when used
  878. in migrations (#22168).
  879. """
  880. project_state = self.set_up_test_model("test_regr22168")
  881. operation = migrations.AddField(
  882. "Pony",
  883. "order",
  884. models.IntegerField(default=0),
  885. )
  886. new_state = project_state.clone()
  887. operation.state_forwards("test_regr22168", new_state)
  888. with connection.schema_editor() as editor:
  889. operation.database_forwards("test_regr22168", editor, project_state, new_state)
  890. self.assertColumnExists("test_regr22168_pony", "order")
  891. def test_add_field_preserve_default(self):
  892. """
  893. Tests the AddField operation's state alteration
  894. when preserve_default = False.
  895. """
  896. project_state = self.set_up_test_model("test_adflpd")
  897. # Test the state alteration
  898. operation = migrations.AddField(
  899. "Pony",
  900. "height",
  901. models.FloatField(null=True, default=4),
  902. preserve_default=False,
  903. )
  904. new_state = project_state.clone()
  905. operation.state_forwards("test_adflpd", new_state)
  906. self.assertEqual(len(new_state.models["test_adflpd", "pony"].fields), 4)
  907. field = [
  908. f for n, f in new_state.models["test_adflpd", "pony"].fields
  909. if n == "height"
  910. ][0]
  911. self.assertEqual(field.default, NOT_PROVIDED)
  912. # Test the database alteration
  913. project_state.apps.get_model("test_adflpd", "pony").objects.create(
  914. weight=4,
  915. )
  916. self.assertColumnNotExists("test_adflpd_pony", "height")
  917. with connection.schema_editor() as editor:
  918. operation.database_forwards("test_adflpd", editor, project_state, new_state)
  919. self.assertColumnExists("test_adflpd_pony", "height")
  920. # And deconstruction
  921. definition = operation.deconstruct()
  922. self.assertEqual(definition[0], "AddField")
  923. self.assertEqual(definition[1], [])
  924. self.assertEqual(sorted(definition[2]), ["field", "model_name", "name", "preserve_default"])
  925. def test_add_field_m2m(self):
  926. """
  927. Tests the AddField operation with a ManyToManyField.
  928. """
  929. project_state = self.set_up_test_model("test_adflmm", second_model=True)
  930. # Test the state alteration
  931. operation = migrations.AddField("Pony", "stables", models.ManyToManyField("Stable", related_name="ponies"))
  932. new_state = project_state.clone()
  933. operation.state_forwards("test_adflmm", new_state)
  934. self.assertEqual(len(new_state.models["test_adflmm", "pony"].fields), 4)
  935. # Test the database alteration
  936. self.assertTableNotExists("test_adflmm_pony_stables")
  937. with connection.schema_editor() as editor:
  938. operation.database_forwards("test_adflmm", editor, project_state, new_state)
  939. self.assertTableExists("test_adflmm_pony_stables")
  940. self.assertColumnNotExists("test_adflmm_pony", "stables")
  941. # Make sure the M2M field actually works
  942. with atomic():
  943. Pony = new_state.apps.get_model("test_adflmm", "Pony")
  944. p = Pony.objects.create(pink=False, weight=4.55)
  945. p.stables.create()
  946. self.assertEqual(p.stables.count(), 1)
  947. p.stables.all().delete()
  948. # And test reversal
  949. with connection.schema_editor() as editor:
  950. operation.database_backwards("test_adflmm", editor, new_state, project_state)
  951. self.assertTableNotExists("test_adflmm_pony_stables")
  952. def test_alter_field_m2m(self):
  953. project_state = self.set_up_test_model("test_alflmm", second_model=True)
  954. project_state = self.apply_operations("test_alflmm", project_state, operations=[
  955. migrations.AddField("Pony", "stables", models.ManyToManyField("Stable", related_name="ponies"))
  956. ])
  957. Pony = project_state.apps.get_model("test_alflmm", "Pony")
  958. self.assertFalse(Pony._meta.get_field('stables').blank)
  959. project_state = self.apply_operations("test_alflmm", project_state, operations=[
  960. migrations.AlterField(
  961. "Pony", "stables", models.ManyToManyField(to="Stable", related_name="ponies", blank=True)
  962. )
  963. ])
  964. Pony = project_state.apps.get_model("test_alflmm", "Pony")
  965. self.assertTrue(Pony._meta.get_field('stables').blank)
  966. def test_repoint_field_m2m(self):
  967. project_state = self.set_up_test_model("test_alflmm", second_model=True, third_model=True)
  968. project_state = self.apply_operations("test_alflmm", project_state, operations=[
  969. migrations.AddField("Pony", "places", models.ManyToManyField("Stable", related_name="ponies"))
  970. ])
  971. Pony = project_state.apps.get_model("test_alflmm", "Pony")
  972. project_state = self.apply_operations("test_alflmm", project_state, operations=[
  973. migrations.AlterField("Pony", "places", models.ManyToManyField(to="Van", related_name="ponies"))
  974. ])
  975. # Ensure the new field actually works
  976. Pony = project_state.apps.get_model("test_alflmm", "Pony")
  977. p = Pony.objects.create(pink=False, weight=4.55)
  978. p.places.create()
  979. self.assertEqual(p.places.count(), 1)
  980. p.places.all().delete()
  981. def test_remove_field_m2m(self):
  982. project_state = self.set_up_test_model("test_rmflmm", second_model=True)
  983. project_state = self.apply_operations("test_rmflmm", project_state, operations=[
  984. migrations.AddField("Pony", "stables", models.ManyToManyField("Stable", related_name="ponies"))
  985. ])
  986. self.assertTableExists("test_rmflmm_pony_stables")
  987. with_field_state = project_state.clone()
  988. operations = [migrations.RemoveField("Pony", "stables")]
  989. project_state = self.apply_operations("test_rmflmm", project_state, operations=operations)
  990. self.assertTableNotExists("test_rmflmm_pony_stables")
  991. # And test reversal
  992. self.unapply_operations("test_rmflmm", with_field_state, operations=operations)
  993. self.assertTableExists("test_rmflmm_pony_stables")
  994. def test_remove_field_m2m_with_through(self):
  995. project_state = self.set_up_test_model("test_rmflmmwt", second_model=True)
  996. self.assertTableNotExists("test_rmflmmwt_ponystables")
  997. project_state = self.apply_operations("test_rmflmmwt", project_state, operations=[
  998. migrations.CreateModel("PonyStables", fields=[
  999. ("pony", models.ForeignKey('test_rmflmmwt.Pony', models.CASCADE)),
  1000. ("stable", models.ForeignKey('test_rmflmmwt.Stable', models.CASCADE)),
  1001. ]),
  1002. migrations.AddField(
  1003. "Pony", "stables",
  1004. models.ManyToManyField("Stable", related_name="ponies", through='test_rmflmmwt.PonyStables')
  1005. )
  1006. ])
  1007. self.assertTableExists("test_rmflmmwt_ponystables")
  1008. operations = [migrations.RemoveField("Pony", "stables")]
  1009. self.apply_operations("test_rmflmmwt", project_state, operations=operations)
  1010. def test_remove_field(self):
  1011. """
  1012. Tests the RemoveField operation.
  1013. """
  1014. project_state = self.set_up_test_model("test_rmfl")
  1015. # Test the state alteration
  1016. operation = migrations.RemoveField("Pony", "pink")
  1017. self.assertEqual(operation.describe(), "Remove field pink from Pony")
  1018. new_state = project_state.clone()
  1019. operation.state_forwards("test_rmfl", new_state)
  1020. self.assertEqual(len(new_state.models["test_rmfl", "pony"].fields), 2)
  1021. # Test the database alteration
  1022. self.assertColumnExists("test_rmfl_pony", "pink")
  1023. with connection.schema_editor() as editor:
  1024. operation.database_forwards("test_rmfl", editor, project_state, new_state)
  1025. self.assertColumnNotExists("test_rmfl_pony", "pink")
  1026. # And test reversal
  1027. with connection.schema_editor() as editor:
  1028. operation.database_backwards("test_rmfl", editor, new_state, project_state)
  1029. self.assertColumnExists("test_rmfl_pony", "pink")
  1030. # And deconstruction
  1031. definition = operation.deconstruct()
  1032. self.assertEqual(definition[0], "RemoveField")
  1033. self.assertEqual(definition[1], [])
  1034. self.assertEqual(definition[2], {'model_name': "Pony", 'name': 'pink'})
  1035. def test_remove_fk(self):
  1036. """
  1037. Tests the RemoveField operation on a foreign key.
  1038. """
  1039. project_state = self.set_up_test_model("test_rfk", related_model=True)
  1040. self.assertColumnExists("test_rfk_rider", "pony_id")
  1041. operation = migrations.RemoveField("Rider", "pony")
  1042. new_state = project_state.clone()
  1043. operation.state_forwards("test_rfk", new_state)
  1044. with connection.schema_editor() as editor:
  1045. operation.database_forwards("test_rfk", editor, project_state, new_state)
  1046. self.assertColumnNotExists("test_rfk_rider", "pony_id")
  1047. with connection.schema_editor() as editor:
  1048. operation.database_backwards("test_rfk", editor, new_state, project_state)
  1049. self.assertColumnExists("test_rfk_rider", "pony_id")
  1050. def test_alter_model_table(self):
  1051. """
  1052. Tests the AlterModelTable operation.
  1053. """
  1054. project_state = self.set_up_test_model("test_almota")
  1055. # Test the state alteration
  1056. operation = migrations.AlterModelTable("Pony", "test_almota_pony_2")
  1057. self.assertEqual(operation.describe(), "Rename table for Pony to test_almota_pony_2")
  1058. new_state = project_state.clone()
  1059. operation.state_forwards("test_almota", new_state)
  1060. self.assertEqual(new_state.models["test_almota", "pony"].options["db_table"], "test_almota_pony_2")
  1061. # Test the database alteration
  1062. self.assertTableExists("test_almota_pony")
  1063. self.assertTableNotExists("test_almota_pony_2")
  1064. with connection.schema_editor() as editor:
  1065. operation.database_forwards("test_almota", editor, project_state, new_state)
  1066. self.assertTableNotExists("test_almota_pony")
  1067. self.assertTableExists("test_almota_pony_2")
  1068. # And test reversal
  1069. with connection.schema_editor() as editor:
  1070. operation.database_backwards("test_almota", editor, new_state, project_state)
  1071. self.assertTableExists("test_almota_pony")
  1072. self.assertTableNotExists("test_almota_pony_2")
  1073. # And deconstruction
  1074. definition = operation.deconstruct()
  1075. self.assertEqual(definition[0], "AlterModelTable")
  1076. self.assertEqual(definition[1], [])
  1077. self.assertEqual(definition[2], {'name': "Pony", 'table': "test_almota_pony_2"})
  1078. def test_alter_model_table_noop(self):
  1079. """
  1080. Tests the AlterModelTable operation if the table name is not changed.
  1081. """
  1082. project_state = self.set_up_test_model("test_almota")
  1083. # Test the state alteration
  1084. operation = migrations.AlterModelTable("Pony", "test_almota_pony")
  1085. new_state = project_state.clone()
  1086. operation.state_forwards("test_almota", new_state)
  1087. self.assertEqual(new_state.models["test_almota", "pony"].options["db_table"], "test_almota_pony")
  1088. # Test the database alteration
  1089. self.assertTableExists("test_almota_pony")
  1090. with connection.schema_editor() as editor:
  1091. operation.database_forwards("test_almota", editor, project_state, new_state)
  1092. self.assertTableExists("test_almota_pony")
  1093. # And test reversal
  1094. with connection.schema_editor() as editor:
  1095. operation.database_backwards("test_almota", editor, new_state, project_state)
  1096. self.assertTableExists("test_almota_pony")
  1097. def test_alter_model_table_m2m(self):
  1098. """
  1099. AlterModelTable should rename auto-generated M2M tables.
  1100. """
  1101. app_label = "test_talflmltlm2m"
  1102. pony_db_table = 'pony_foo'
  1103. project_state = self.set_up_test_model(app_label, second_model=True, db_table=pony_db_table)
  1104. # Add the M2M field
  1105. first_state = project_state.clone()
  1106. operation = migrations.AddField("Pony", "stables", models.ManyToManyField("Stable"))
  1107. operation.state_forwards(app_label, first_state)
  1108. with connection.schema_editor() as editor:
  1109. operation.database_forwards(app_label, editor, project_state, first_state)
  1110. original_m2m_table = "%s_%s" % (pony_db_table, "stables")
  1111. new_m2m_table = "%s_%s" % (app_label, "pony_stables")
  1112. self.assertTableExists(original_m2m_table)
  1113. self.assertTableNotExists(new_m2m_table)
  1114. # Rename the Pony db_table which should also rename the m2m table.
  1115. second_state = first_state.clone()
  1116. operation = migrations.AlterModelTable(name='pony', table=None)
  1117. operation.state_forwards(app_label, second_state)
  1118. with connection.schema_editor() as editor:
  1119. operation.database_forwards(app_label, editor, first_state, second_state)
  1120. self.assertTableExists(new_m2m_table)
  1121. self.assertTableNotExists(original_m2m_table)
  1122. # And test reversal
  1123. with connection.schema_editor() as editor:
  1124. operation.database_backwards(app_label, editor, second_state, first_state)
  1125. self.assertTableExists(original_m2m_table)
  1126. self.assertTableNotExists(new_m2m_table)
  1127. def test_alter_field(self):
  1128. """
  1129. Tests the AlterField operation.
  1130. """
  1131. project_state = self.set_up_test_model("test_alfl")
  1132. # Test the state alteration
  1133. operation = migrations.AlterField("Pony", "pink", models.IntegerField(null=True))
  1134. self.assertEqual(operation.describe(), "Alter field pink on Pony")
  1135. new_state = project_state.clone()
  1136. operation.state_forwards("test_alfl", new_state)
  1137. self.assertIs(project_state.models["test_alfl", "pony"].get_field_by_name("pink").null, False)
  1138. self.assertIs(new_state.models["test_alfl", "pony"].get_field_by_name("pink").null, True)
  1139. # Test the database alteration
  1140. self.assertColumnNotNull("test_alfl_pony", "pink")
  1141. with connection.schema_editor() as editor:
  1142. operation.database_forwards("test_alfl", editor, project_state, new_state)
  1143. self.assertColumnNull("test_alfl_pony", "pink")
  1144. # And test reversal
  1145. with connection.schema_editor() as editor:
  1146. operation.database_backwards("test_alfl", editor, new_state, project_state)
  1147. self.assertColumnNotNull("test_alfl_pony", "pink")
  1148. # And deconstruction
  1149. definition = operation.deconstruct()
  1150. self.assertEqual(definition[0], "AlterField")
  1151. self.assertEqual(definition[1], [])
  1152. self.assertEqual(sorted(definition[2]), ["field", "model_name", "name"])
  1153. def test_alter_field_pk(self):
  1154. """
  1155. Tests the AlterField operation on primary keys (for things like PostgreSQL's SERIAL weirdness)
  1156. """
  1157. project_state = self.set_up_test_model("test_alflpk")
  1158. # Test the state alteration
  1159. operation = migrations.AlterField("Pony", "id", models.IntegerField(primary_key=True))
  1160. new_state = project_state.clone()
  1161. operation.state_forwards("test_alflpk", new_state)
  1162. self.assertIsInstance(project_state.models["test_alflpk", "pony"].get_field_by_name("id"), models.AutoField)
  1163. self.assertIsInstance(new_state.models["test_alflpk", "pony"].get_field_by_name("id"), models.IntegerField)
  1164. # Test the database alteration
  1165. with connection.schema_editor() as editor:
  1166. operation.database_forwards("test_alflpk", editor, project_state, new_state)
  1167. # And test reversal
  1168. with connection.schema_editor() as editor:
  1169. operation.database_backwards("test_alflpk", editor, new_state, project_state)
  1170. @skipUnlessDBFeature('supports_foreign_keys')
  1171. def test_alter_field_pk_fk(self):
  1172. """
  1173. Tests the AlterField operation on primary keys changes any FKs pointing to it.
  1174. """
  1175. project_state = self.set_up_test_model("test_alflpkfk", related_model=True)
  1176. # Test the state alteration
  1177. operation = migrations.AlterField("Pony", "id", models.FloatField(primary_key=True))
  1178. new_state = project_state.clone()
  1179. operation.state_forwards("test_alflpkfk", new_state)
  1180. self.assertIsInstance(project_state.models["test_alflpkfk", "pony"].get_field_by_name("id"), models.AutoField)
  1181. self.assertIsInstance(new_state.models["test_alflpkfk", "pony"].get_field_by_name("id"), models.FloatField)
  1182. def assertIdTypeEqualsFkType():
  1183. with connection.cursor() as cursor:
  1184. id_type, id_null = [
  1185. (c.type_code, c.null_ok)
  1186. for c in connection.introspection.get_table_description(cursor, "test_alflpkfk_pony")
  1187. if c.name == "id"
  1188. ][0]
  1189. fk_type, fk_null = [
  1190. (c.type_code, c.null_ok)
  1191. for c in connection.introspection.get_table_description(cursor, "test_alflpkfk_rider")
  1192. if c.name == "pony_id"
  1193. ][0]
  1194. self.assertEqual(id_type, fk_type)
  1195. self.assertEqual(id_null, fk_null)
  1196. assertIdTypeEqualsFkType()
  1197. # Test the database alteration
  1198. with connection.schema_editor() as editor:
  1199. operation.database_forwards("test_alflpkfk", editor, project_state, new_state)
  1200. assertIdTypeEqualsFkType()
  1201. # And test reversal
  1202. with connection.schema_editor() as editor:
  1203. operation.database_backwards("test_alflpkfk", editor, new_state, project_state)
  1204. assertIdTypeEqualsFkType()
  1205. def test_rename_field(self):
  1206. """
  1207. Tests the RenameField operation.
  1208. """
  1209. project_state = self.set_up_test_model("test_rnfl", unique_together=True, index_together=True)
  1210. # Test the state alteration
  1211. operation = migrations.RenameField("Pony", "pink", "blue")
  1212. self.assertEqual(operation.describe(), "Rename field pink on Pony to blue")
  1213. new_state = project_state.clone()
  1214. operation.state_forwards("test_rnfl", new_state)
  1215. self.assertIn("blue", [n for n, f in new_state.models["test_rnfl", "pony"].fields])
  1216. self.assertNotIn("pink", [n for n, f in new_state.models["test_rnfl", "pony"].fields])
  1217. # Make sure the unique_together has the renamed column too
  1218. self.assertIn("blue", new_state.models["test_rnfl", "pony"].options['unique_together'][0])
  1219. self.assertNotIn("pink", new_state.models["test_rnfl", "pony"].options['unique_together'][0])
  1220. # Make sure the index_together has the renamed column too
  1221. self.assertIn("blue", new_state.models["test_rnfl", "pony"].options['index_together'][0])
  1222. self.assertNotIn("pink", new_state.models["test_rnfl", "pony"].options['index_together'][0])
  1223. # Test the database alteration
  1224. self.assertColumnExists("test_rnfl_pony", "pink")
  1225. self.assertColumnNotExists("test_rnfl_pony", "blue")
  1226. with connection.schema_editor() as editor:
  1227. operation.database_forwards("test_rnfl", editor, project_state, new_state)
  1228. self.assertColumnExists("test_rnfl_pony", "blue")
  1229. self.assertColumnNotExists("test_rnfl_pony", "pink")
  1230. # Ensure the unique constraint has been ported over
  1231. with connection.cursor() as cursor:
  1232. cursor.execute("INSERT INTO test_rnfl_pony (blue, weight) VALUES (1, 1)")
  1233. with self.assertRaises(IntegrityError):
  1234. with atomic():
  1235. cursor.execute("INSERT INTO test_rnfl_pony (blue, weight) VALUES (1, 1)")
  1236. cursor.execute("DELETE FROM test_rnfl_pony")
  1237. # Ensure the index constraint has been ported over
  1238. self.assertIndexExists("test_rnfl_pony", ["weight", "blue"])
  1239. # And test reversal
  1240. with connection.schema_editor() as editor:
  1241. operation.database_backwards("test_rnfl", editor, new_state, project_state)
  1242. self.assertColumnExists("test_rnfl_pony", "pink")
  1243. self.assertColumnNotExists("test_rnfl_pony", "blue")
  1244. # Ensure the index constraint has been reset
  1245. self.assertIndexExists("test_rnfl_pony", ["weight", "pink"])
  1246. # And deconstruction
  1247. definition = operation.deconstruct()
  1248. self.assertEqual(definition[0], "RenameField")
  1249. self.assertEqual(definition[1], [])
  1250. self.assertEqual(definition[2], {'model_name': "Pony", 'old_name': "pink", 'new_name': "blue"})
  1251. def test_alter_unique_together(self):
  1252. """
  1253. Tests the AlterUniqueTogether operation.
  1254. """
  1255. project_state = self.set_up_test_model("test_alunto")
  1256. # Test the state alteration
  1257. operation = migrations.AlterUniqueTogether("Pony", [("pink", "weight")])
  1258. self.assertEqual(operation.describe(), "Alter unique_together for Pony (1 constraint(s))")
  1259. new_state = project_state.clone()
  1260. operation.state_forwards("test_alunto", new_state)
  1261. self.assertEqual(len(project_state.models["test_alunto", "pony"].options.get("unique_together", set())), 0)
  1262. self.assertEqual(len(new_state.models["test_alunto", "pony"].options.get("unique_together", set())), 1)
  1263. # Make sure we can insert duplicate rows
  1264. with connection.cursor() as cursor:
  1265. cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)")
  1266. cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)")
  1267. cursor.execute("DELETE FROM test_alunto_pony")
  1268. # Test the database alteration
  1269. with connection.schema_editor() as editor:
  1270. operation.database_forwards("test_alunto", editor, project_state, new_state)
  1271. cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)")
  1272. with self.assertRaises(IntegrityError):
  1273. with atomic():
  1274. cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)")
  1275. cursor.execute("DELETE FROM test_alunto_pony")
  1276. # And test reversal
  1277. with connection.schema_editor() as editor:
  1278. operation.database_backwards("test_alunto", editor, new_state, project_state)
  1279. cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)")
  1280. cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)")
  1281. cursor.execute("DELETE FROM test_alunto_pony")
  1282. # Test flat unique_together
  1283. operation = migrations.AlterUniqueTogether("Pony", ("pink", "weight"))
  1284. operation.state_forwards("test_alunto", new_state)
  1285. self.assertEqual(len(new_state.models["test_alunto", "pony"].options.get("unique_together", set())), 1)
  1286. # And deconstruction
  1287. definition = operation.deconstruct()
  1288. self.assertEqual(definition[0], "AlterUniqueTogether")
  1289. self.assertEqual(definition[1], [])
  1290. self.assertEqual(definition[2], {'name': "Pony", 'unique_together': {("pink", "weight")}})
  1291. def test_alter_unique_together_remove(self):
  1292. operation = migrations.AlterUniqueTogether("Pony", None)
  1293. self.assertEqual(operation.describe(), "Alter unique_together for Pony (0 constraint(s))")
  1294. def test_alter_index_together(self):
  1295. """
  1296. Tests the AlterIndexTogether operation.
  1297. """
  1298. project_state = self.set_up_test_model("test_alinto")
  1299. # Test the state alteration
  1300. operation = migrations.AlterIndexTogether("Pony", [("pink", "weight")])
  1301. self.assertEqual(operation.describe(), "Alter index_together for Pony (1 constraint(s))")
  1302. new_state = project_state.clone()
  1303. operation.state_forwards("test_alinto", new_state)
  1304. self.assertEqual(len(project_state.models["test_alinto", "pony"].options.get("index_together", set())), 0)
  1305. self.assertEqual(len(new_state.models["test_alinto", "pony"].options.get("index_together", set())), 1)
  1306. # Make sure there's no matching index
  1307. self.assertIndexNotExists("test_alinto_pony", ["pink", "weight"])
  1308. # Test the database alteration
  1309. with connection.schema_editor() as editor:
  1310. operation.database_forwards("test_alinto", editor, project_state, new_state)
  1311. self.assertIndexExists("test_alinto_pony", ["pink", "weight"])
  1312. # And test reversal
  1313. with connection.schema_editor() as editor:
  1314. operation.database_backwards("test_alinto", editor, new_state, project_state)
  1315. self.assertIndexNotExists("test_alinto_pony", ["pink", "weight"])
  1316. # And deconstruction
  1317. definition = operation.deconstruct()
  1318. self.assertEqual(definition[0], "AlterIndexTogether")
  1319. self.assertEqual(definition[1], [])
  1320. self.assertEqual(definition[2], {'name': "Pony", 'index_together': {("pink", "weight")}})
  1321. def test_alter_index_together_remove(self):
  1322. operation = migrations.AlterIndexTogether("Pony", None)
  1323. self.assertEqual(operation.describe(), "Alter index_together for Pony (0 constraint(s))")
  1324. def test_alter_model_options(self):
  1325. """
  1326. Tests the AlterModelOptions operation.
  1327. """
  1328. project_state = self.set_up_test_model("test_almoop")
  1329. # Test the state alteration (no DB alteration to test)
  1330. operation = migrations.AlterModelOptions("Pony", {"permissions": [("can_groom", "Can groom")]})
  1331. self.assertEqual(operation.describe(), "Change Meta options on Pony")
  1332. new_state = project_state.clone()
  1333. operation.state_forwards("test_almoop", new_state)
  1334. self.assertEqual(len(project_state.models["test_almoop", "pony"].options.get("permissions", [])), 0)
  1335. self.assertEqual(len(new_state.models["test_almoop", "pony"].options.get("permissions", [])), 1)
  1336. self.assertEqual(new_state.models["test_almoop", "pony"].options["permissions"][0][0], "can_groom")
  1337. # And deconstruction
  1338. definition = operation.deconstruct()
  1339. self.assertEqual(definition[0], "AlterModelOptions")
  1340. self.assertEqual(definition[1], [])
  1341. self.assertEqual(definition[2], {'name': "Pony", 'options': {"permissions": [("can_groom", "Can groom")]}})
  1342. def test_alter_model_options_emptying(self):
  1343. """
  1344. Tests that the AlterModelOptions operation removes keys from the dict (#23121)
  1345. """
  1346. project_state = self.set_up_test_model("test_almoop", options=True)
  1347. # Test the state alteration (no DB alteration to test)
  1348. operation = migrations.AlterModelOptions("Pony", {})
  1349. self.assertEqual(operation.describe(), "Change Meta options on Pony")
  1350. new_state = project_state.clone()
  1351. operation.state_forwards("test_almoop", new_state)
  1352. self.assertEqual(len(project_state.models["test_almoop", "pony"].options.get("permissions", [])), 1)
  1353. self.assertEqual(len(new_state.models["test_almoop", "pony"].options.get("permissions", [])), 0)
  1354. # And deconstruction
  1355. definition = operation.deconstruct()
  1356. self.assertEqual(definition[0], "AlterModelOptions")
  1357. self.assertEqual(definition[1], [])
  1358. self.assertEqual(definition[2], {'name': "Pony", 'options': {}})
  1359. def test_alter_order_with_respect_to(self):
  1360. """
  1361. Tests the AlterOrderWithRespectTo operation.
  1362. """
  1363. project_state = self.set_up_test_model("test_alorwrtto", related_model=True)
  1364. # Test the state alteration
  1365. operation = migrations.AlterOrderWithRespectTo("Rider", "pony")
  1366. self.assertEqual(operation.describe(), "Set order_with_respect_to on Rider to pony")
  1367. new_state = project_state.clone()
  1368. operation.state_forwards("test_alorwrtto", new_state)
  1369. self.assertIsNone(
  1370. project_state.models["test_alorwrtto", "rider"].options.get("order_with_respect_to", None)
  1371. )
  1372. self.assertEqual(
  1373. new_state.models["test_alorwrtto", "rider"].options.get("order_with_respect_to", None),
  1374. "pony"
  1375. )
  1376. # Make sure there's no matching index
  1377. self.assertColumnNotExists("test_alorwrtto_rider", "_order")
  1378. # Create some rows before alteration
  1379. rendered_state = project_state.apps
  1380. pony = rendered_state.get_model("test_alorwrtto", "Pony").objects.create(weight=50)
  1381. rendered_state.get_model("test_alorwrtto", "Rider").objects.create(pony=pony, friend_id=1)
  1382. rendered_state.get_model("test_alorwrtto", "Rider").objects.create(pony=pony, friend_id=2)
  1383. # Test the database alteration
  1384. with connection.schema_editor() as editor:
  1385. operation.database_forwards("test_alorwrtto", editor, project_state, new_state)
  1386. self.assertColumnExists("test_alorwrtto_rider", "_order")
  1387. # Check for correct value in rows
  1388. updated_riders = new_state.apps.get_model("test_alorwrtto", "Rider").objects.all()
  1389. self.assertEqual(updated_riders[0]._order, 0)
  1390. self.assertEqual(updated_riders[1]._order, 0)
  1391. # And test reversal
  1392. with connection.schema_editor() as editor:
  1393. operation.database_backwards("test_alorwrtto", editor, new_state, project_state)
  1394. self.assertColumnNotExists("test_alorwrtto_rider", "_order")
  1395. # And deconstruction
  1396. definition = operation.deconstruct()
  1397. self.assertEqual(definition[0], "AlterOrderWithRespectTo")
  1398. self.assertEqual(definition[1], [])
  1399. self.assertEqual(definition[2], {'name': "Rider", 'order_with_respect_to': "pony"})
  1400. def test_alter_model_managers(self):
  1401. """
  1402. Tests that the managers on a model are set.
  1403. """
  1404. project_state = self.set_up_test_model("test_almoma")
  1405. # Test the state alteration
  1406. operation = migrations.AlterModelManagers(
  1407. "Pony",
  1408. managers=[
  1409. ("food_qs", FoodQuerySet.as_manager()),
  1410. ("food_mgr", FoodManager("a", "b")),
  1411. ("food_mgr_kwargs", FoodManager("x", "y", 3, 4)),
  1412. ]
  1413. )
  1414. self.assertEqual(operation.describe(), "Change managers on Pony")
  1415. managers = project_state.models["test_almoma", "pony"].managers
  1416. self.assertEqual(managers, [])
  1417. new_state = project_state.clone()
  1418. operation.state_forwards("test_almoma", new_state)
  1419. self.assertIn(("test_almoma", "pony"), new_state.models)
  1420. managers = new_state.models["test_almoma", "pony"].managers
  1421. self.assertEqual(managers[0][0], "food_qs")
  1422. self.assertIsInstance(managers[0][1], models.Manager)
  1423. self.assertEqual(managers[1][0], "food_mgr")
  1424. self.assertIsInstance(managers[1][1], FoodManager)
  1425. self.assertEqual(managers[1][1].args, ("a", "b", 1, 2))
  1426. self.assertEqual(managers[2][0], "food_mgr_kwargs")
  1427. self.assertIsInstance(managers[2][1], FoodManager)
  1428. self.assertEqual(managers[2][1].args, ("x", "y", 3, 4))
  1429. rendered_state = new_state.apps
  1430. model = rendered_state.get_model('test_almoma', 'pony')
  1431. self.assertIsInstance(model.food_qs, models.Manager)
  1432. self.assertIsInstance(model.food_mgr, FoodManager)
  1433. self.assertIsInstance(model.food_mgr_kwargs, FoodManager)
  1434. def test_alter_model_managers_emptying(self):
  1435. """
  1436. Tests that the managers on a model are set.
  1437. """
  1438. project_state = self.set_up_test_model("test_almomae", manager_model=True)
  1439. # Test the state alteration
  1440. operation = migrations.AlterModelManagers("Food", managers=[])
  1441. self.assertEqual(operation.describe(), "Change managers on Food")
  1442. self.assertIn(("test_almomae", "food"), project_state.models)
  1443. managers = project_state.models["test_almomae", "food"].managers
  1444. self.assertEqual(managers[0][0], "food_qs")
  1445. self.assertIsInstance(managers[0][1], models.Manager)
  1446. self.assertEqual(managers[1][0], "food_mgr")
  1447. self.assertIsInstance(managers[1][1], FoodManager)
  1448. self.assertEqual(managers[1][1].args, ("a", "b", 1, 2))
  1449. self.assertEqual(managers[2][0], "food_mgr_kwargs")
  1450. self.assertIsInstance(managers[2][1], FoodManager)
  1451. self.assertEqual(managers[2][1].args, ("x", "y", 3, 4))
  1452. new_state = project_state.clone()
  1453. operation.state_forwards("test_almomae", new_state)
  1454. managers = new_state.models["test_almomae", "food"].managers
  1455. self.assertEqual(managers, [])
  1456. def test_alter_fk(self):
  1457. """
  1458. Tests that creating and then altering an FK works correctly
  1459. and deals with the pending SQL (#23091)
  1460. """
  1461. project_state = self.set_up_test_model("test_alfk")
  1462. # Test adding and then altering the FK in one go
  1463. create_operation = migrations.CreateModel(
  1464. name="Rider",
  1465. fields=[
  1466. ("id", models.AutoField(primary_key=True)),
  1467. ("pony", models.ForeignKey("Pony", models.CASCADE)),
  1468. ],
  1469. )
  1470. create_state = project_state.clone()
  1471. create_operation.state_forwards("test_alfk", create_state)
  1472. alter_operation = migrations.AlterField(
  1473. model_name='Rider',
  1474. name='pony',
  1475. field=models.ForeignKey("Pony", models.CASCADE, editable=False),
  1476. )
  1477. alter_state = create_state.clone()
  1478. alter_operation.state_forwards("test_alfk", alter_state)
  1479. with connection.schema_editor() as editor:
  1480. create_operation.database_forwards("test_alfk", editor, project_state, create_state)
  1481. alter_operation.database_forwards("test_alfk", editor, create_state, alter_state)
  1482. def test_alter_fk_non_fk(self):
  1483. """
  1484. Tests that altering an FK to a non-FK works (#23244)
  1485. """
  1486. # Test the state alteration
  1487. operation = migrations.AlterField(
  1488. model_name="Rider",
  1489. name="pony",
  1490. field=models.FloatField(),
  1491. )
  1492. project_state, new_state = self.make_test_state("test_afknfk", operation, related_model=True)
  1493. # Test the database alteration
  1494. self.assertColumnExists("test_afknfk_rider", "pony_id")
  1495. self.assertColumnNotExists("test_afknfk_rider", "pony")
  1496. with connection.schema_editor() as editor:
  1497. operation.database_forwards("test_afknfk", editor, project_state, new_state)
  1498. self.assertColumnExists("test_afknfk_rider", "pony")
  1499. self.assertColumnNotExists("test_afknfk_rider", "pony_id")
  1500. # And test reversal
  1501. with connection.schema_editor() as editor:
  1502. operation.database_backwards("test_afknfk", editor, new_state, project_state)
  1503. self.assertColumnExists("test_afknfk_rider", "pony_id")
  1504. self.assertColumnNotExists("test_afknfk_rider", "pony")
  1505. @unittest.skipIf(sqlparse is None and connection.features.requires_sqlparse_for_splitting, "Missing sqlparse")
  1506. def test_run_sql(self):
  1507. """
  1508. Tests the RunSQL operation.
  1509. """
  1510. project_state = self.set_up_test_model("test_runsql")
  1511. # Create the operation
  1512. operation = migrations.RunSQL(
  1513. # Use a multi-line string with a comment to test splitting on SQLite and MySQL respectively
  1514. "CREATE TABLE i_love_ponies (id int, special_thing varchar(15));\n"
  1515. "INSERT INTO i_love_ponies (id, special_thing) VALUES (1, 'i love ponies'); -- this is magic!\n"
  1516. "INSERT INTO i_love_ponies (id, special_thing) VALUES (2, 'i love django');\n"
  1517. "UPDATE i_love_ponies SET special_thing = 'Ponies' WHERE special_thing LIKE '%%ponies';"
  1518. "UPDATE i_love_ponies SET special_thing = 'Django' WHERE special_thing LIKE '%django';",
  1519. # Run delete queries to test for parameter substitution failure
  1520. # reported in #23426
  1521. "DELETE FROM i_love_ponies WHERE special_thing LIKE '%Django%';"
  1522. "DELETE FROM i_love_ponies WHERE special_thing LIKE '%%Ponies%%';"
  1523. "DROP TABLE i_love_ponies",
  1524. state_operations=[migrations.CreateModel("SomethingElse", [("id", models.AutoField(primary_key=True))])],
  1525. )
  1526. self.assertEqual(operation.describe(), "Raw SQL operation")
  1527. # Test the state alteration
  1528. new_state = project_state.clone()
  1529. operation.state_forwards("test_runsql", new_state)
  1530. self.assertEqual(len(new_state.models["test_runsql", "somethingelse"].fields), 1)
  1531. # Make sure there's no table
  1532. self.assertTableNotExists("i_love_ponies")
  1533. # Test SQL collection
  1534. with connection.schema_editor(collect_sql=True) as editor:
  1535. operation.database_forwards("test_runsql", editor, project_state, new_state)
  1536. self.assertIn("LIKE '%%ponies';", "\n".join(editor.collected_sql))
  1537. operation.database_backwards("test_runsql", editor, project_state, new_state)
  1538. self.assertIn("LIKE '%%Ponies%%';", "\n".join(editor.collected_sql))
  1539. # Test the database alteration
  1540. with connection.schema_editor() as editor:
  1541. operation.database_forwards("test_runsql", editor, project_state, new_state)
  1542. self.assertTableExists("i_love_ponies")
  1543. # Make sure all the SQL was processed
  1544. with connection.cursor() as cursor:
  1545. cursor.execute("SELECT COUNT(*) FROM i_love_ponies")
  1546. self.assertEqual(cursor.fetchall()[0][0], 2)
  1547. cursor.execute("SELECT COUNT(*) FROM i_love_ponies WHERE special_thing = 'Django'")
  1548. self.assertEqual(cursor.fetchall()[0][0], 1)
  1549. cursor.execute("SELECT COUNT(*) FROM i_love_ponies WHERE special_thing = 'Ponies'")
  1550. self.assertEqual(cursor.fetchall()[0][0], 1)
  1551. # And test reversal
  1552. self.assertTrue(operation.reversible)
  1553. with connection.schema_editor() as editor:
  1554. operation.database_backwards("test_runsql", editor, new_state, project_state)
  1555. self.assertTableNotExists("i_love_ponies")
  1556. # And deconstruction
  1557. definition = operation.deconstruct()
  1558. self.assertEqual(definition[0], "RunSQL")
  1559. self.assertEqual(definition[1], [])
  1560. self.assertEqual(sorted(definition[2]), ["reverse_sql", "sql", "state_operations"])
  1561. # And elidable reduction
  1562. self.assertIs(False, operation.reduce(operation, []))
  1563. elidable_operation = migrations.RunSQL('SELECT 1 FROM void;', elidable=True)
  1564. self.assertEqual(elidable_operation.reduce(operation, []), [operation])
  1565. def test_run_sql_params(self):
  1566. """
  1567. #23426 - RunSQL should accept parameters.
  1568. """
  1569. project_state = self.set_up_test_model("test_runsql")
  1570. # Create the operation
  1571. operation = migrations.RunSQL(
  1572. ["CREATE TABLE i_love_ponies (id int, special_thing varchar(15));"],
  1573. ["DROP TABLE i_love_ponies"],
  1574. )
  1575. param_operation = migrations.RunSQL(
  1576. # forwards
  1577. (
  1578. "INSERT INTO i_love_ponies (id, special_thing) VALUES (1, 'Django');",
  1579. ["INSERT INTO i_love_ponies (id, special_thing) VALUES (2, %s);", ['Ponies']],
  1580. ("INSERT INTO i_love_ponies (id, special_thing) VALUES (%s, %s);", (3, 'Python',)),
  1581. ),
  1582. # backwards
  1583. [
  1584. "DELETE FROM i_love_ponies WHERE special_thing = 'Django';",
  1585. ["DELETE FROM i_love_ponies WHERE special_thing = 'Ponies';", None],
  1586. ("DELETE FROM i_love_ponies WHERE id = %s OR special_thing = %s;", [3, 'Python']),
  1587. ]
  1588. )
  1589. # Make sure there's no table
  1590. self.assertTableNotExists("i_love_ponies")
  1591. new_state = project_state.clone()
  1592. # Test the database alteration
  1593. with connection.schema_editor() as editor:
  1594. operation.database_forwards("test_runsql", editor, project_state, new_state)
  1595. # Test parameter passing
  1596. with connection.schema_editor() as editor:
  1597. param_operation.database_forwards("test_runsql", editor, project_state, new_state)
  1598. # Make sure all the SQL was processed
  1599. with connection.cursor() as cursor:
  1600. cursor.execute("SELECT COUNT(*) FROM i_love_ponies")
  1601. self.assertEqual(cursor.fetchall()[0][0], 3)
  1602. with connection.schema_editor() as editor:
  1603. param_operation.database_backwards("test_runsql", editor, new_state, project_state)
  1604. with connection.cursor() as cursor:
  1605. cursor.execute("SELECT COUNT(*) FROM i_love_ponies")
  1606. self.assertEqual(cursor.fetchall()[0][0], 0)
  1607. # And test reversal
  1608. with connection.schema_editor() as editor:
  1609. operation.database_backwards("test_runsql", editor, new_state, project_state)
  1610. self.assertTableNotExists("i_love_ponies")
  1611. def test_run_sql_params_invalid(self):
  1612. """
  1613. #23426 - RunSQL should fail when a list of statements with an incorrect
  1614. number of tuples is given.
  1615. """
  1616. project_state = self.set_up_test_model("test_runsql")
  1617. new_state = project_state.clone()
  1618. operation = migrations.RunSQL(
  1619. # forwards
  1620. [
  1621. ["INSERT INTO foo (bar) VALUES ('buz');"]
  1622. ],
  1623. # backwards
  1624. (
  1625. ("DELETE FROM foo WHERE bar = 'buz';", 'invalid', 'parameter count'),
  1626. ),
  1627. )
  1628. with connection.schema_editor() as editor:
  1629. with self.assertRaisesMessage(ValueError, "Expected a 2-tuple but got 1"):
  1630. operation.database_forwards("test_runsql", editor, project_state, new_state)
  1631. with connection.schema_editor() as editor:
  1632. with self.assertRaisesMessage(ValueError, "Expected a 2-tuple but got 3"):
  1633. operation.database_backwards("test_runsql", editor, new_state, project_state)
  1634. def test_run_sql_noop(self):
  1635. """
  1636. #24098 - Tests no-op RunSQL operations.
  1637. """
  1638. operation = migrations.RunSQL(migrations.RunSQL.noop, migrations.RunSQL.noop)
  1639. with connection.schema_editor() as editor:
  1640. operation.database_forwards("test_runsql", editor, None, None)
  1641. operation.database_backwards("test_runsql", editor, None, None)
  1642. def test_run_python(self):
  1643. """
  1644. Tests the RunPython operation
  1645. """
  1646. project_state = self.set_up_test_model("test_runpython", mti_model=True)
  1647. # Create the operation
  1648. def inner_method(models, schema_editor):
  1649. Pony = models.get_model("test_runpython", "Pony")
  1650. Pony.objects.create(pink=1, weight=3.55)
  1651. Pony.objects.create(weight=5)
  1652. def inner_method_reverse(models, schema_editor):
  1653. Pony = models.get_model("test_runpython", "Pony")
  1654. Pony.objects.filter(pink=1, weight=3.55).delete()
  1655. Pony.objects.filter(weight=5).delete()
  1656. operation = migrations.RunPython(inner_method, reverse_code=inner_method_reverse)
  1657. self.assertEqual(operation.describe(), "Raw Python operation")
  1658. # Test the state alteration does nothing
  1659. new_state = project_state.clone()
  1660. operation.state_forwards("test_runpython", new_state)
  1661. self.assertEqual(new_state, project_state)
  1662. # Test the database alteration
  1663. self.assertEqual(project_state.apps.get_model("test_runpython", "Pony").objects.count(), 0)
  1664. with connection.schema_editor() as editor:
  1665. operation.database_forwards("test_runpython", editor, project_state, new_state)
  1666. self.assertEqual(project_state.apps.get_model("test_runpython", "Pony").objects.count(), 2)
  1667. # Now test reversal
  1668. self.assertTrue(operation.reversible)
  1669. with connection.schema_editor() as editor:
  1670. operation.database_backwards("test_runpython", editor, project_state, new_state)
  1671. self.assertEqual(project_state.apps.get_model("test_runpython", "Pony").objects.count(), 0)
  1672. # Now test we can't use a string
  1673. with self.assertRaises(ValueError):
  1674. migrations.RunPython("print 'ahahaha'")
  1675. # And deconstruction
  1676. definition = operation.deconstruct()
  1677. self.assertEqual(definition[0], "RunPython")
  1678. self.assertEqual(definition[1], [])
  1679. self.assertEqual(sorted(definition[2]), ["code", "reverse_code"])
  1680. # Also test reversal fails, with an operation identical to above but without reverse_code set
  1681. no_reverse_operation = migrations.RunPython(inner_method)
  1682. self.assertFalse(no_reverse_operation.reversible)
  1683. with connection.schema_editor() as editor:
  1684. no_reverse_operation.database_forwards("test_runpython", editor, project_state, new_state)
  1685. with self.assertRaises(NotImplementedError):
  1686. no_reverse_operation.database_backwards("test_runpython", editor, new_state, project_state)
  1687. self.assertEqual(project_state.apps.get_model("test_runpython", "Pony").objects.count(), 2)
  1688. def create_ponies(models, schema_editor):
  1689. Pony = models.get_model("test_runpython", "Pony")
  1690. pony1 = Pony.objects.create(pink=1, weight=3.55)
  1691. self.assertIsNot(pony1.pk, None)
  1692. pony2 = Pony.objects.create(weight=5)
  1693. self.assertIsNot(pony2.pk, None)
  1694. self.assertNotEqual(pony1.pk, pony2.pk)
  1695. operation = migrations.RunPython(create_ponies)
  1696. with connection.schema_editor() as editor:
  1697. operation.database_forwards("test_runpython", editor, project_state, new_state)
  1698. self.assertEqual(project_state.apps.get_model("test_runpython", "Pony").objects.count(), 4)
  1699. # And deconstruction
  1700. definition = operation.deconstruct()
  1701. self.assertEqual(definition[0], "RunPython")
  1702. self.assertEqual(definition[1], [])
  1703. self.assertEqual(sorted(definition[2]), ["code"])
  1704. def create_shetlandponies(models, schema_editor):
  1705. ShetlandPony = models.get_model("test_runpython", "ShetlandPony")
  1706. pony1 = ShetlandPony.objects.create(weight=4.0)
  1707. self.assertIsNot(pony1.pk, None)
  1708. pony2 = ShetlandPony.objects.create(weight=5.0)
  1709. self.assertIsNot(pony2.pk, None)
  1710. self.assertNotEqual(pony1.pk, pony2.pk)
  1711. operation = migrations.RunPython(create_shetlandponies)
  1712. with connection.schema_editor() as editor:
  1713. operation.database_forwards("test_runpython", editor, project_state, new_state)
  1714. self.assertEqual(project_state.apps.get_model("test_runpython", "Pony").objects.count(), 6)
  1715. self.assertEqual(project_state.apps.get_model("test_runpython", "ShetlandPony").objects.count(), 2)
  1716. # And elidable reduction
  1717. self.assertIs(False, operation.reduce(operation, []))
  1718. elidable_operation = migrations.RunPython(inner_method, elidable=True)
  1719. self.assertEqual(elidable_operation.reduce(operation, []), [operation])
  1720. def test_run_python_atomic(self):
  1721. """
  1722. Tests the RunPython operation correctly handles the "atomic" keyword
  1723. """
  1724. project_state = self.set_up_test_model("test_runpythonatomic", mti_model=True)
  1725. def inner_method(models, schema_editor):
  1726. Pony = models.get_model("test_runpythonatomic", "Pony")
  1727. Pony.objects.create(pink=1, weight=3.55)
  1728. raise ValueError("Adrian hates ponies.")
  1729. atomic_migration = Migration("test", "test_runpythonatomic")
  1730. atomic_migration.operations = [migrations.RunPython(inner_method)]
  1731. non_atomic_migration = Migration("test", "test_runpythonatomic")
  1732. non_atomic_migration.operations = [migrations.RunPython(inner_method, atomic=False)]
  1733. # If we're a fully-transactional database, both versions should rollback
  1734. if connection.features.can_rollback_ddl:
  1735. self.assertEqual(project_state.apps.get_model("test_runpythonatomic", "Pony").objects.count(), 0)
  1736. with self.assertRaises(ValueError):
  1737. with connection.schema_editor() as editor:
  1738. atomic_migration.apply(project_state, editor)
  1739. self.assertEqual(project_state.apps.get_model("test_runpythonatomic", "Pony").objects.count(), 0)
  1740. with self.assertRaises(ValueError):
  1741. with connection.schema_editor() as editor:
  1742. non_atomic_migration.apply(project_state, editor)
  1743. self.assertEqual(project_state.apps.get_model("test_runpythonatomic", "Pony").objects.count(), 0)
  1744. # Otherwise, the non-atomic operation should leave a row there
  1745. else:
  1746. self.assertEqual(project_state.apps.get_model("test_runpythonatomic", "Pony").objects.count(), 0)
  1747. with self.assertRaises(ValueError):
  1748. with connection.schema_editor() as editor:
  1749. atomic_migration.apply(project_state, editor)
  1750. self.assertEqual(project_state.apps.get_model("test_runpythonatomic", "Pony").objects.count(), 0)
  1751. with self.assertRaises(ValueError):
  1752. with connection.schema_editor() as editor:
  1753. non_atomic_migration.apply(project_state, editor)
  1754. self.assertEqual(project_state.apps.get_model("test_runpythonatomic", "Pony").objects.count(), 1)
  1755. # And deconstruction
  1756. definition = non_atomic_migration.operations[0].deconstruct()
  1757. self.assertEqual(definition[0], "RunPython")
  1758. self.assertEqual(definition[1], [])
  1759. self.assertEqual(sorted(definition[2]), ["atomic", "code"])
  1760. def test_run_python_related_assignment(self):
  1761. """
  1762. #24282 - Tests that model changes to a FK reverse side update the model
  1763. on the FK side as well.
  1764. """
  1765. def inner_method(models, schema_editor):
  1766. Author = models.get_model("test_authors", "Author")
  1767. Book = models.get_model("test_books", "Book")
  1768. author = Author.objects.create(name="Hemingway")
  1769. Book.objects.create(title="Old Man and The Sea", author=author)
  1770. create_author = migrations.CreateModel(
  1771. "Author",
  1772. [
  1773. ("id", models.AutoField(primary_key=True)),
  1774. ("name", models.CharField(max_length=100)),
  1775. ],
  1776. options={},
  1777. )
  1778. create_book = migrations.CreateModel(
  1779. "Book",
  1780. [
  1781. ("id", models.AutoField(primary_key=True)),
  1782. ("title", models.CharField(max_length=100)),
  1783. ("author", models.ForeignKey("test_authors.Author", models.CASCADE))
  1784. ],
  1785. options={},
  1786. )
  1787. add_hometown = migrations.AddField(
  1788. "Author",
  1789. "hometown",
  1790. models.CharField(max_length=100),
  1791. )
  1792. create_old_man = migrations.RunPython(inner_method, inner_method)
  1793. project_state = ProjectState()
  1794. new_state = project_state.clone()
  1795. with connection.schema_editor() as editor:
  1796. create_author.state_forwards("test_authors", new_state)
  1797. create_author.database_forwards("test_authors", editor, project_state, new_state)
  1798. project_state = new_state
  1799. new_state = new_state.clone()
  1800. with connection.schema_editor() as editor:
  1801. create_book.state_forwards("test_books", new_state)
  1802. create_book.database_forwards("test_books", editor, project_state, new_state)
  1803. project_state = new_state
  1804. new_state = new_state.clone()
  1805. with connection.schema_editor() as editor:
  1806. add_hometown.state_forwards("test_authors", new_state)
  1807. add_hometown.database_forwards("test_authors", editor, project_state, new_state)
  1808. project_state = new_state
  1809. new_state = new_state.clone()
  1810. with connection.schema_editor() as editor:
  1811. create_old_man.state_forwards("test_books", new_state)
  1812. create_old_man.database_forwards("test_books", editor, project_state, new_state)
  1813. def test_model_with_bigautofield(self):
  1814. """
  1815. A model with BigAutoField can be created.
  1816. """
  1817. def create_data(models, schema_editor):
  1818. Author = models.get_model("test_author", "Author")
  1819. Book = models.get_model("test_book", "Book")
  1820. author1 = Author.objects.create(name="Hemingway")
  1821. Book.objects.create(title="Old Man and The Sea", author=author1)
  1822. Book.objects.create(id=2 ** 33, title="A farewell to arms", author=author1)
  1823. author2 = Author.objects.create(id=2 ** 33, name="Remarque")
  1824. Book.objects.create(title="All quiet on the western front", author=author2)
  1825. Book.objects.create(title="Arc de Triomphe", author=author2)
  1826. create_author = migrations.CreateModel(
  1827. "Author",
  1828. [
  1829. ("id", models.BigAutoField(primary_key=True)),
  1830. ("name", models.CharField(max_length=100)),
  1831. ],
  1832. options={},
  1833. )
  1834. create_book = migrations.CreateModel(
  1835. "Book",
  1836. [
  1837. ("id", models.BigAutoField(primary_key=True)),
  1838. ("title", models.CharField(max_length=100)),
  1839. ("author", models.ForeignKey(to="test_author.Author", on_delete=models.CASCADE))
  1840. ],
  1841. options={},
  1842. )
  1843. fill_data = migrations.RunPython(create_data)
  1844. project_state = ProjectState()
  1845. new_state = project_state.clone()
  1846. with connection.schema_editor() as editor:
  1847. create_author.state_forwards("test_author", new_state)
  1848. create_author.database_forwards("test_author", editor, project_state, new_state)
  1849. project_state = new_state
  1850. new_state = new_state.clone()
  1851. with connection.schema_editor() as editor:
  1852. create_book.state_forwards("test_book", new_state)
  1853. create_book.database_forwards("test_book", editor, project_state, new_state)
  1854. project_state = new_state
  1855. new_state = new_state.clone()
  1856. with connection.schema_editor() as editor:
  1857. fill_data.state_forwards("fill_data", new_state)
  1858. fill_data.database_forwards("fill_data", editor, project_state, new_state)
  1859. def test_autofield_foreignfield_growth(self):
  1860. """
  1861. A field may be migrated from AutoField to BigAutoField.
  1862. """
  1863. def create_initial_data(models, schema_editor):
  1864. Article = models.get_model("test_article", "Article")
  1865. Blog = models.get_model("test_blog", "Blog")
  1866. blog = Blog.objects.create(name="web development done right")
  1867. Article.objects.create(name="Frameworks", blog=blog)
  1868. Article.objects.create(name="Programming Languages", blog=blog)
  1869. def create_big_data(models, schema_editor):
  1870. Article = models.get_model("test_article", "Article")
  1871. Blog = models.get_model("test_blog", "Blog")
  1872. blog2 = Blog.objects.create(name="Frameworks", id=2 ** 33)
  1873. Article.objects.create(name="Django", blog=blog2)
  1874. Article.objects.create(id=2 ** 33, name="Django2", blog=blog2)
  1875. create_blog = migrations.CreateModel(
  1876. "Blog",
  1877. [
  1878. ("id", models.AutoField(primary_key=True)),
  1879. ("name", models.CharField(max_length=100)),
  1880. ],
  1881. options={},
  1882. )
  1883. create_article = migrations.CreateModel(
  1884. "Article",
  1885. [
  1886. ("id", models.AutoField(primary_key=True)),
  1887. ("blog", models.ForeignKey(to="test_blog.Blog", on_delete=models.CASCADE)),
  1888. ("name", models.CharField(max_length=100)),
  1889. ("data", models.TextField(default="")),
  1890. ],
  1891. options={},
  1892. )
  1893. fill_initial_data = migrations.RunPython(create_initial_data, create_initial_data)
  1894. fill_big_data = migrations.RunPython(create_big_data, create_big_data)
  1895. grow_article_id = migrations.AlterField("Article", "id", models.BigAutoField(primary_key=True))
  1896. grow_blog_id = migrations.AlterField("Blog", "id", models.BigAutoField(primary_key=True))
  1897. project_state = ProjectState()
  1898. new_state = project_state.clone()
  1899. with connection.schema_editor() as editor:
  1900. create_blog.state_forwards("test_blog", new_state)
  1901. create_blog.database_forwards("test_blog", editor, project_state, new_state)
  1902. project_state = new_state
  1903. new_state = new_state.clone()
  1904. with connection.schema_editor() as editor:
  1905. create_article.state_forwards("test_article", new_state)
  1906. create_article.database_forwards("test_article", editor, project_state, new_state)
  1907. project_state = new_state
  1908. new_state = new_state.clone()
  1909. with connection.schema_editor() as editor:
  1910. fill_initial_data.state_forwards("fill_initial_data", new_state)
  1911. fill_initial_data.database_forwards("fill_initial_data", editor, project_state, new_state)
  1912. project_state = new_state
  1913. new_state = new_state.clone()
  1914. with connection.schema_editor() as editor:
  1915. grow_article_id.state_forwards("test_article", new_state)
  1916. grow_article_id.database_forwards("test_article", editor, project_state, new_state)
  1917. state = new_state.clone()
  1918. article = state.apps.get_model("test_article.Article")
  1919. self.assertIsInstance(article._meta.pk, models.BigAutoField)
  1920. project_state = new_state
  1921. new_state = new_state.clone()
  1922. with connection.schema_editor() as editor:
  1923. grow_blog_id.state_forwards("test_blog", new_state)
  1924. grow_blog_id.database_forwards("test_blog", editor, project_state, new_state)
  1925. state = new_state.clone()
  1926. blog = state.apps.get_model("test_blog.Blog")
  1927. self.assertIsInstance(blog._meta.pk, models.BigAutoField)
  1928. project_state = new_state
  1929. new_state = new_state.clone()
  1930. with connection.schema_editor() as editor:
  1931. fill_big_data.state_forwards("fill_big_data", new_state)
  1932. fill_big_data.database_forwards("fill_big_data", editor, project_state, new_state)
  1933. def test_run_python_noop(self):
  1934. """
  1935. #24098 - Tests no-op RunPython operations.
  1936. """
  1937. project_state = ProjectState()
  1938. new_state = project_state.clone()
  1939. operation = migrations.RunPython(migrations.RunPython.noop, migrations.RunPython.noop)
  1940. with connection.schema_editor() as editor:
  1941. operation.database_forwards("test_runpython", editor, project_state, new_state)
  1942. operation.database_backwards("test_runpython", editor, new_state, project_state)
  @unittest.skipIf(sqlparse is None and connection.features.requires_sqlparse_for_splitting, "Missing sqlparse")
  def test_separate_database_and_state(self):
      """
      SeparateDatabaseAndState: the state operation only changes the project
      state, the database operation only changes the database.
      """
      app_label = "test_separatedatabaseandstate"
      raw_table = "i_love_ponies"
      project_state = self.set_up_test_model(app_label)
      # One raw-SQL database operation paired with an unrelated state-only
      # CreateModel.
      operation = migrations.SeparateDatabaseAndState(
          database_operations=[
              migrations.RunSQL(
                  "CREATE TABLE i_love_ponies (id int, special_thing int);",
                  "DROP TABLE i_love_ponies;"
              ),
          ],
          state_operations=[
              migrations.CreateModel("SomethingElse", [("id", models.AutoField(primary_key=True))]),
          ],
      )
      self.assertEqual(operation.describe(), "Custom state/database change combination")

      # State side: the new model shows up, but no table has been created.
      new_state = project_state.clone()
      operation.state_forwards(app_label, new_state)
      something_else = new_state.models[app_label, "somethingelse"]
      self.assertEqual(len(something_else.fields), 1)
      self.assertTableNotExists(raw_table)

      # Database side: running forwards creates the raw table ...
      with connection.schema_editor() as editor:
          operation.database_forwards(app_label, editor, project_state, new_state)
      self.assertTableExists(raw_table)

      # ... and running backwards drops it again.
      self.assertTrue(operation.reversible)
      with connection.schema_editor() as editor:
          operation.database_backwards(app_label, editor, new_state, project_state)
      self.assertTableNotExists(raw_table)

      # Deconstruction exposes both operation lists as keyword arguments.
      name, args, kwargs = operation.deconstruct()
      self.assertEqual(name, "SeparateDatabaseAndState")
      self.assertEqual(args, [])
      self.assertEqual(sorted(kwargs), ["database_operations", "state_operations"])
  def test_separate_database_and_state2(self):
      """
      A complex SeparateDatabaseAndState operation: Multiple operations both
      for state and database. Verify the state dependencies within each list
      and that state ops don't affect the database.

      The two lists must stay isolated from each other: models named in the
      database operations must not appear in the project state, and models
      named in the state operations must not get a table.
      """
      app_label = "test_separatedatabaseandstate2"
      project_state = self.set_up_test_model(app_label)
      # Create the operation
      database_operations = [
          migrations.CreateModel(
              "ILovePonies",
              [("id", models.AutoField(primary_key=True))],
              options={"db_table": "iloveponies"},
          ),
          migrations.CreateModel(
              "ILoveMorePonies",
              # We use IntegerField and not AutoField because
              # the model is going to be deleted immediately
              # and with an AutoField this fails on Oracle
              [("id", models.IntegerField(primary_key=True))],
              options={"db_table": "ilovemoreponies"},
          ),
          migrations.DeleteModel("ILoveMorePonies"),
          migrations.CreateModel(
              "ILoveEvenMorePonies",
              [("id", models.AutoField(primary_key=True))],
              options={"db_table": "iloveevenmoreponies"},
          ),
      ]
      state_operations = [
          migrations.CreateModel(
              "SomethingElse",
              [("id", models.AutoField(primary_key=True))],
              options={"db_table": "somethingelse"},
          ),
          migrations.DeleteModel("SomethingElse"),
          migrations.CreateModel(
              "SomethingCompletelyDifferent",
              [("id", models.AutoField(primary_key=True))],
              options={"db_table": "somethingcompletelydifferent"},
          ),
      ]
      operation = migrations.SeparateDatabaseAndState(
          state_operations=state_operations,
          database_operations=database_operations,
      )
      # Test the state alteration
      new_state = project_state.clone()
      operation.state_forwards(app_label, new_state)

      def assertModelsAndTables(after_db):
          """
          Check that tables and models exist, or don't, as they should.

          ``after_db`` says whether the database operations are currently
          applied; only the two surviving database-side tables depend on it.
          """
          # State side: only the model left over from the state operations
          # may be present.
          self.assertNotIn((app_label, "somethingelse"), new_state.models)
          self.assertEqual(len(new_state.models[app_label, "somethingcompletelydifferent"].fields), 1)
          # Models named only in the database operations must never reach
          # the state. The keys use the same (app_label, lower-cased model
          # name) form as the lookups above.
          self.assertNotIn((app_label, "iloveponies"), new_state.models)
          self.assertNotIn((app_label, "ilovemoreponies"), new_state.models)
          self.assertNotIn((app_label, "iloveevenmoreponies"), new_state.models)
          # Database side: state-only models never get a table, and the
          # model created then deleted by the database operations leaves
          # none behind.
          self.assertTableNotExists("somethingelse")
          self.assertTableNotExists("somethingcompletelydifferent")
          self.assertTableNotExists("ilovemoreponies")
          if after_db:
              self.assertTableExists("iloveponies")
              self.assertTableExists("iloveevenmoreponies")
          else:
              self.assertTableNotExists("iloveponies")
              self.assertTableNotExists("iloveevenmoreponies")

      assertModelsAndTables(after_db=False)
      # Test the database alteration
      with connection.schema_editor() as editor:
          operation.database_forwards(app_label, editor, project_state, new_state)
      assertModelsAndTables(after_db=True)
      # And test reversal
      self.assertTrue(operation.reversible)
      with connection.schema_editor() as editor:
          operation.database_backwards(app_label, editor, new_state, project_state)
      assertModelsAndTables(after_db=False)
  class SwappableOperationTests(OperationTestBase):
      """
      Checks that key operations leave the database alone for swapped-out
      models. Only a few operations are covered here, since the behaviour
      lives in a common base class anyway.
      """

      available_apps = [
          "migrations",
          "django.contrib.auth",
          "django.contrib.contenttypes",
      ]

      def _assert_table_never_created(self, operation, app_label, table, project_state, new_state):
          """
          Run ``operation`` forwards and then backwards, asserting before,
          between and after that ``table`` does not exist.
          """
          self.assertTableNotExists(table)
          with connection.schema_editor() as editor:
              operation.database_forwards(app_label, editor, project_state, new_state)
          self.assertTableNotExists(table)
          # And the reversal must not create it either.
          with connection.schema_editor() as editor:
              operation.database_backwards(app_label, editor, new_state, project_state)
          self.assertTableNotExists(table)

      @override_settings(TEST_SWAP_MODEL="migrations.SomeFakeModel")
      def test_create_ignore_swapped(self):
          """
          Tests that the CreateModel operation ignores swapped models.
          """
          app_label = "test_crigsw"
          pony_fields = [
              ("id", models.AutoField(primary_key=True)),
              ("pink", models.IntegerField(default=1)),
          ]
          operation = migrations.CreateModel(
              "Pony",
              pony_fields,
              options={
                  "swappable": "TEST_SWAP_MODEL",
              },
          )
          # The state is still altered: the model must be recorded there.
          project_state = ProjectState()
          new_state = project_state.clone()
          operation.state_forwards(app_label, new_state)
          pony_state = new_state.models[app_label, "pony"]
          self.assertEqual(pony_state.name, "Pony")
          pony_state = new_state.models[app_label, "pony"]
          self.assertEqual(len(pony_state.fields), 2)
          # The database is not: no table in either direction.
          self._assert_table_never_created(
              operation, app_label, "test_crigsw_pony", project_state, new_state
          )

      @override_settings(TEST_SWAP_MODEL="migrations.SomeFakeModel")
      def test_delete_ignore_swapped(self):
          """
          Tests that the DeleteModel operation ignores swapped models.
          """
          app_label = "test_dligsw"
          operation = migrations.DeleteModel("Pony")
          project_state, new_state = self.make_test_state(app_label, operation)
          self._assert_table_never_created(
              operation, app_label, "test_dligsw_pony", project_state, new_state
          )

      @override_settings(TEST_SWAP_MODEL="migrations.SomeFakeModel")
      def test_add_field_ignore_swapped(self):
          """
          Tests that the AddField operation ignores swapped models.
          """
          app_label = "test_adfligsw"
          operation = migrations.AddField(
              "Pony",
              "height",
              models.FloatField(null=True, default=5),
          )
          project_state, new_state = self.make_test_state(app_label, operation)
          self._assert_table_never_created(
              operation, app_label, "test_adfligsw_pony", project_state, new_state
          )