test_operations.py 37 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786
  1. import unittest
  2. from django.db import connection, migrations, models, router
  3. from django.db.migrations.migration import Migration
  4. from django.db.migrations.state import ProjectState
  5. from django.db.models.fields import NOT_PROVIDED
  6. from django.db.transaction import atomic
  7. from django.db.utils import IntegrityError
  8. from .test_base import MigrationTestBase
  9. class OperationTests(MigrationTestBase):
  10. """
  11. Tests running the operations and making sure they do what they say they do.
  12. Each test looks at their state changing, and then their database operation -
  13. both forwards and backwards.
  14. """
  15. def apply_operations(self, app_label, project_state, operations):
  16. migration = Migration('name', app_label)
  17. migration.operations = operations
  18. with connection.schema_editor() as editor:
  19. return migration.apply(project_state, editor)
  20. def unapply_operations(self, app_label, project_state, operations):
  21. migration = Migration('name', app_label)
  22. migration.operations = operations
  23. with connection.schema_editor() as editor:
  24. return migration.unapply(project_state, editor)
  25. def set_up_test_model(self, app_label, second_model=False, related_model=False, mti_model=False):
  26. """
  27. Creates a test model state and database table.
  28. """
  29. # Delete the tables if they already exist
  30. with connection.cursor() as cursor:
  31. try:
  32. cursor.execute("DROP TABLE %s_pony" % app_label)
  33. except:
  34. pass
  35. try:
  36. cursor.execute("DROP TABLE %s_stable" % app_label)
  37. except:
  38. pass
  39. # Make the "current" state
  40. operations = [migrations.CreateModel(
  41. "Pony",
  42. [
  43. ("id", models.AutoField(primary_key=True)),
  44. ("pink", models.IntegerField(default=3)),
  45. ("weight", models.FloatField()),
  46. ],
  47. )]
  48. if second_model:
  49. operations.append(migrations.CreateModel(
  50. "Stable",
  51. [
  52. ("id", models.AutoField(primary_key=True)),
  53. ]
  54. ))
  55. if related_model:
  56. operations.append(migrations.CreateModel(
  57. "Rider",
  58. [
  59. ("id", models.AutoField(primary_key=True)),
  60. ("pony", models.ForeignKey("Pony")),
  61. ],
  62. ))
  63. if mti_model:
  64. operations.append(migrations.CreateModel(
  65. "ShetlandPony",
  66. fields=[
  67. ('pony_ptr', models.OneToOneField(
  68. auto_created=True,
  69. primary_key=True,
  70. to_field='id',
  71. serialize=False,
  72. to='Pony',
  73. )),
  74. ("cuteness", models.IntegerField(default=1)),
  75. ],
  76. bases=['%s.Pony' % app_label],
  77. ))
  78. return self.apply_operations(app_label, ProjectState(), operations)
  79. def test_create_model(self):
  80. """
  81. Tests the CreateModel operation.
  82. Most other tests use this operation as part of setup, so check failures here first.
  83. """
  84. operation = migrations.CreateModel(
  85. "Pony",
  86. [
  87. ("id", models.AutoField(primary_key=True)),
  88. ("pink", models.IntegerField(default=1)),
  89. ],
  90. )
  91. # Test the state alteration
  92. project_state = ProjectState()
  93. new_state = project_state.clone()
  94. operation.state_forwards("test_crmo", new_state)
  95. self.assertEqual(new_state.models["test_crmo", "pony"].name, "Pony")
  96. self.assertEqual(len(new_state.models["test_crmo", "pony"].fields), 2)
  97. # Test the database alteration
  98. self.assertTableNotExists("test_crmo_pony")
  99. with connection.schema_editor() as editor:
  100. operation.database_forwards("test_crmo", editor, project_state, new_state)
  101. self.assertTableExists("test_crmo_pony")
  102. # And test reversal
  103. with connection.schema_editor() as editor:
  104. operation.database_backwards("test_crmo", editor, new_state, project_state)
  105. self.assertTableNotExists("test_crmo_pony")
  106. # And deconstruction
  107. definition = operation.deconstruct()
  108. self.assertEqual(definition[0], "CreateModel")
  109. self.assertEqual(len(definition[1]), 2)
  110. self.assertEqual(len(definition[2]), 0)
  111. self.assertEqual(definition[1][0], "Pony")
  112. def test_create_model_m2m(self):
  113. """
  114. Test the creation of a model with a ManyToMany field and the
  115. auto-created "through" model.
  116. """
  117. project_state = self.set_up_test_model("test_crmomm")
  118. operation = migrations.CreateModel(
  119. "Stable",
  120. [
  121. ("id", models.AutoField(primary_key=True)),
  122. ("ponies", models.ManyToManyField("Pony", related_name="stables"))
  123. ]
  124. )
  125. # Test the state alteration
  126. new_state = project_state.clone()
  127. operation.state_forwards("test_crmomm", new_state)
  128. # Test the database alteration
  129. self.assertTableNotExists("test_crmomm_stable_ponies")
  130. with connection.schema_editor() as editor:
  131. operation.database_forwards("test_crmomm", editor, project_state, new_state)
  132. self.assertTableExists("test_crmomm_stable")
  133. self.assertTableExists("test_crmomm_stable_ponies")
  134. self.assertColumnNotExists("test_crmomm_stable", "ponies")
  135. # Make sure the M2M field actually works
  136. with atomic():
  137. new_apps = new_state.render()
  138. Pony = new_apps.get_model("test_crmomm", "Pony")
  139. Stable = new_apps.get_model("test_crmomm", "Stable")
  140. stable = Stable.objects.create()
  141. p1 = Pony.objects.create(pink=False, weight=4.55)
  142. p2 = Pony.objects.create(pink=True, weight=5.43)
  143. stable.ponies.add(p1, p2)
  144. self.assertEqual(stable.ponies.count(), 2)
  145. stable.ponies.all().delete()
  146. # And test reversal
  147. with connection.schema_editor() as editor:
  148. operation.database_backwards("test_crmomm", editor, new_state, project_state)
  149. self.assertTableNotExists("test_crmomm_stable")
  150. self.assertTableNotExists("test_crmomm_stable_ponies")
  151. def test_create_model_inheritance(self):
  152. """
  153. Tests the CreateModel operation on a multi-table inheritance setup.
  154. """
  155. project_state = self.set_up_test_model("test_crmoih")
  156. # Test the state alteration
  157. operation = migrations.CreateModel(
  158. "ShetlandPony",
  159. [
  160. ('pony_ptr', models.OneToOneField(
  161. auto_created=True,
  162. primary_key=True,
  163. to_field='id',
  164. serialize=False,
  165. to='test_crmoih.Pony',
  166. )),
  167. ("cuteness", models.IntegerField(default=1)),
  168. ],
  169. )
  170. new_state = project_state.clone()
  171. operation.state_forwards("test_crmoih", new_state)
  172. self.assertIn(("test_crmoih", "shetlandpony"), new_state.models)
  173. # Test the database alteration
  174. self.assertTableNotExists("test_crmoih_shetlandpony")
  175. with connection.schema_editor() as editor:
  176. operation.database_forwards("test_crmoih", editor, project_state, new_state)
  177. self.assertTableExists("test_crmoih_shetlandpony")
  178. # And test reversal
  179. with connection.schema_editor() as editor:
  180. operation.database_backwards("test_crmoih", editor, new_state, project_state)
  181. self.assertTableNotExists("test_crmoih_shetlandpony")
  182. def test_delete_model(self):
  183. """
  184. Tests the DeleteModel operation.
  185. """
  186. project_state = self.set_up_test_model("test_dlmo")
  187. # Test the state alteration
  188. operation = migrations.DeleteModel("Pony")
  189. new_state = project_state.clone()
  190. operation.state_forwards("test_dlmo", new_state)
  191. self.assertNotIn(("test_dlmo", "pony"), new_state.models)
  192. # Test the database alteration
  193. self.assertTableExists("test_dlmo_pony")
  194. with connection.schema_editor() as editor:
  195. operation.database_forwards("test_dlmo", editor, project_state, new_state)
  196. self.assertTableNotExists("test_dlmo_pony")
  197. # And test reversal
  198. with connection.schema_editor() as editor:
  199. operation.database_backwards("test_dlmo", editor, new_state, project_state)
  200. self.assertTableExists("test_dlmo_pony")
  201. def test_rename_model(self):
  202. """
  203. Tests the RenameModel operation.
  204. """
  205. project_state = self.set_up_test_model("test_rnmo")
  206. # Test the state alteration
  207. operation = migrations.RenameModel("Pony", "Horse")
  208. new_state = project_state.clone()
  209. operation.state_forwards("test_rnmo", new_state)
  210. self.assertNotIn(("test_rnmo", "pony"), new_state.models)
  211. self.assertIn(("test_rnmo", "horse"), new_state.models)
  212. # Test the database alteration
  213. self.assertTableExists("test_rnmo_pony")
  214. self.assertTableNotExists("test_rnmo_horse")
  215. with connection.schema_editor() as editor:
  216. operation.database_forwards("test_rnmo", editor, project_state, new_state)
  217. self.assertTableNotExists("test_rnmo_pony")
  218. self.assertTableExists("test_rnmo_horse")
  219. # And test reversal
  220. with connection.schema_editor() as editor:
  221. operation.database_backwards("test_rnmo", editor, new_state, project_state)
  222. self.assertTableExists("test_dlmo_pony")
  223. self.assertTableNotExists("test_rnmo_horse")
  224. # See #22248 - this will fail until that's fixed.
  225. #
  226. # def test_rename_model_with_related(self):
  227. # """
  228. # Tests the real-world combo of a RenameModel operation with AlterField
  229. # for a related field.
  230. # """
  231. # project_state = self.set_up_test_model(
  232. # "test_rnmowr", related_model=True)
  233. # # Test the state alterations
  234. # model_operation = migrations.RenameModel("Pony", "Horse")
  235. # new_state = project_state.clone()
  236. # model_operation.state_forwards("test_rnmowr", new_state)
  237. # self.assertNotIn(("test_rnmowr", "pony"), new_state.models)
  238. # self.assertIn(("test_rnmowr", "horse"), new_state.models)
  239. # self.assertEqual(
  240. # "Pony",
  241. # project_state.render().get_model("test_rnmowr", "rider")
  242. # ._meta.get_field_by_name("pony")[0].rel.to._meta.object_name)
  243. # field_operation = migrations.AlterField(
  244. # "Rider", "pony", models.ForeignKey("Horse"))
  245. # field_operation.state_forwards("test_rnmowr", new_state)
  246. # self.assertEqual(
  247. # "Horse",
  248. # new_state.render().get_model("test_rnmowr", "rider")
  249. # ._meta.get_field_by_name("pony")[0].rel.to._meta.object_name)
  250. # # Test the database alterations
  251. # self.assertTableExists("test_rnmowr_pony")
  252. # self.assertTableNotExists("test_rnmowr_horse")
  253. # with connection.schema_editor() as editor:
  254. # model_operation.database_forwards("test_rnmowr", editor, project_state, new_state)
  255. # field_operation.database_forwards("test_rnmowr", editor, project_state, new_state)
  256. # self.assertTableNotExists("test_rnmowr_pony")
  257. # self.assertTableExists("test_rnmowr_horse")
  258. # # And test reversal
  259. # with connection.schema_editor() as editor:
  260. # field_operation.database_backwards("test_rnmowr", editor, new_state, project_state)
  261. # model_operation.database_backwards("test_rnmowr", editor, new_state, project_state)
  262. # self.assertTableExists("test_rnmowr_pony")
  263. # self.assertTableNotExists("test_rnmowr_horse")
  264. def test_add_field(self):
  265. """
  266. Tests the AddField operation.
  267. """
  268. project_state = self.set_up_test_model("test_adfl")
  269. # Test the state alteration
  270. operation = migrations.AddField(
  271. "Pony",
  272. "height",
  273. models.FloatField(null=True, default=5),
  274. )
  275. new_state = project_state.clone()
  276. operation.state_forwards("test_adfl", new_state)
  277. self.assertEqual(len(new_state.models["test_adfl", "pony"].fields), 4)
  278. field = [
  279. f for n, f in new_state.models["test_adfl", "pony"].fields
  280. if n == "height"
  281. ][0]
  282. self.assertEqual(field.default, 5)
  283. # Test the database alteration
  284. self.assertColumnNotExists("test_adfl_pony", "height")
  285. with connection.schema_editor() as editor:
  286. operation.database_forwards("test_adfl", editor, project_state, new_state)
  287. self.assertColumnExists("test_adfl_pony", "height")
  288. # And test reversal
  289. with connection.schema_editor() as editor:
  290. operation.database_backwards("test_adfl", editor, new_state, project_state)
  291. self.assertColumnNotExists("test_adfl_pony", "height")
  292. def test_column_name_quoting(self):
  293. """
  294. Column names that are SQL keywords shouldn't cause problems when used
  295. in migrations (#22168).
  296. """
  297. project_state = self.set_up_test_model("test_regr22168")
  298. operation = migrations.AddField(
  299. "Pony",
  300. "order",
  301. models.IntegerField(default=0),
  302. )
  303. new_state = project_state.clone()
  304. operation.state_forwards("test_regr22168", new_state)
  305. with connection.schema_editor() as editor:
  306. operation.database_forwards("test_regr22168", editor, project_state, new_state)
  307. self.assertColumnExists("test_regr22168_pony", "order")
  308. def test_add_field_preserve_default(self):
  309. """
  310. Tests the AddField operation's state alteration
  311. when preserve_default = False.
  312. """
  313. project_state = self.set_up_test_model("test_adflpd")
  314. # Test the state alteration
  315. operation = migrations.AddField(
  316. "Pony",
  317. "height",
  318. models.FloatField(null=True, default=4),
  319. preserve_default=False,
  320. )
  321. new_state = project_state.clone()
  322. operation.state_forwards("test_adflpd", new_state)
  323. self.assertEqual(len(new_state.models["test_adflpd", "pony"].fields), 4)
  324. field = [
  325. f for n, f in new_state.models["test_adflpd", "pony"].fields
  326. if n == "height"
  327. ][0]
  328. self.assertEqual(field.default, NOT_PROVIDED)
  329. # Test the database alteration
  330. project_state.render().get_model("test_adflpd", "pony").objects.create(
  331. weight=4,
  332. )
  333. self.assertColumnNotExists("test_adflpd_pony", "height")
  334. with connection.schema_editor() as editor:
  335. operation.database_forwards("test_adflpd", editor, project_state, new_state)
  336. self.assertColumnExists("test_adflpd_pony", "height")
  337. def test_add_field_m2m(self):
  338. """
  339. Tests the AddField operation with a ManyToManyField.
  340. """
  341. project_state = self.set_up_test_model("test_adflmm", second_model=True)
  342. # Test the state alteration
  343. operation = migrations.AddField("Pony", "stables", models.ManyToManyField("Stable", related_name="ponies"))
  344. new_state = project_state.clone()
  345. operation.state_forwards("test_adflmm", new_state)
  346. self.assertEqual(len(new_state.models["test_adflmm", "pony"].fields), 4)
  347. # Test the database alteration
  348. self.assertTableNotExists("test_adflmm_pony_stables")
  349. with connection.schema_editor() as editor:
  350. operation.database_forwards("test_adflmm", editor, project_state, new_state)
  351. self.assertTableExists("test_adflmm_pony_stables")
  352. self.assertColumnNotExists("test_adflmm_pony", "stables")
  353. # Make sure the M2M field actually works
  354. with atomic():
  355. new_apps = new_state.render()
  356. Pony = new_apps.get_model("test_adflmm", "Pony")
  357. p = Pony.objects.create(pink=False, weight=4.55)
  358. p.stables.create()
  359. self.assertEqual(p.stables.count(), 1)
  360. p.stables.all().delete()
  361. # And test reversal
  362. with connection.schema_editor() as editor:
  363. operation.database_backwards("test_adflmm", editor, new_state, project_state)
  364. self.assertTableNotExists("test_adflmm_pony_stables")
  365. def test_alter_field_m2m(self):
  366. project_state = self.set_up_test_model("test_alflmm", second_model=True)
  367. project_state = self.apply_operations("test_alflmm", project_state, operations=[
  368. migrations.AddField("Pony", "stables", models.ManyToManyField("Stable", related_name="ponies"))
  369. ])
  370. new_apps = project_state.render()
  371. Pony = new_apps.get_model("test_alflmm", "Pony")
  372. self.assertFalse(Pony._meta.get_field('stables').blank)
  373. project_state = self.apply_operations("test_alflmm", project_state, operations=[
  374. migrations.AlterField("Pony", "stables", models.ManyToManyField(to="Stable", related_name="ponies", blank=True))
  375. ])
  376. new_apps = project_state.render()
  377. Pony = new_apps.get_model("test_alflmm", "Pony")
  378. self.assertTrue(Pony._meta.get_field('stables').blank)
  379. def test_remove_field_m2m(self):
  380. project_state = self.set_up_test_model("test_rmflmm", second_model=True)
  381. project_state = self.apply_operations("test_rmflmm", project_state, operations=[
  382. migrations.AddField("Pony", "stables", models.ManyToManyField("Stable", related_name="ponies"))
  383. ])
  384. self.assertTableExists("test_rmflmm_pony_stables")
  385. operations = [migrations.RemoveField("Pony", "stables")]
  386. self.apply_operations("test_rmflmm", project_state, operations=operations)
  387. self.assertTableNotExists("test_rmflmm_pony_stables")
  388. # And test reversal
  389. self.unapply_operations("test_rmflmm", project_state, operations=operations)
  390. self.assertTableExists("test_rmflmm_pony_stables")
  391. def test_remove_field_m2m_with_through(self):
  392. project_state = self.set_up_test_model("test_rmflmmwt", second_model=True)
  393. self.assertTableNotExists("test_rmflmmwt_ponystables")
  394. project_state = self.apply_operations("test_rmflmmwt", project_state, operations=[
  395. migrations.CreateModel("PonyStables", fields=[
  396. ("pony", models.ForeignKey('test_rmflmmwt.Pony')),
  397. ("stable", models.ForeignKey('test_rmflmmwt.Stable')),
  398. ]),
  399. migrations.AddField("Pony", "stables", models.ManyToManyField("Stable", related_name="ponies", through='test_rmflmmwt.PonyStables'))
  400. ])
  401. self.assertTableExists("test_rmflmmwt_ponystables")
  402. operations = [migrations.RemoveField("Pony", "stables")]
  403. self.apply_operations("test_rmflmmwt", project_state, operations=operations)
  404. def test_remove_field(self):
  405. """
  406. Tests the RemoveField operation.
  407. """
  408. project_state = self.set_up_test_model("test_rmfl")
  409. # Test the state alteration
  410. operation = migrations.RemoveField("Pony", "pink")
  411. new_state = project_state.clone()
  412. operation.state_forwards("test_rmfl", new_state)
  413. self.assertEqual(len(new_state.models["test_rmfl", "pony"].fields), 2)
  414. # Test the database alteration
  415. self.assertColumnExists("test_rmfl_pony", "pink")
  416. with connection.schema_editor() as editor:
  417. operation.database_forwards("test_rmfl", editor, project_state, new_state)
  418. self.assertColumnNotExists("test_rmfl_pony", "pink")
  419. # And test reversal
  420. with connection.schema_editor() as editor:
  421. operation.database_backwards("test_rmfl", editor, new_state, project_state)
  422. self.assertColumnExists("test_rmfl_pony", "pink")
  423. def test_remove_fk(self):
  424. """
  425. Tests the RemoveField operation on a foreign key.
  426. """
  427. project_state = self.set_up_test_model("test_rfk", related_model=True)
  428. self.assertColumnExists("test_rfk_rider", "pony_id")
  429. operation = migrations.RemoveField("Rider", "pony")
  430. new_state = project_state.clone()
  431. operation.state_forwards("test_rfk", new_state)
  432. with connection.schema_editor() as editor:
  433. operation.database_forwards("test_rfk", editor, project_state, new_state)
  434. self.assertColumnNotExists("test_rfk_rider", "pony_id")
  435. with connection.schema_editor() as editor:
  436. operation.database_backwards("test_rfk", editor, new_state, project_state)
  437. self.assertColumnExists("test_rfk_rider", "pony_id")
  438. def test_alter_model_table(self):
  439. """
  440. Tests the AlterModelTable operation.
  441. """
  442. project_state = self.set_up_test_model("test_almota")
  443. # Test the state alteration
  444. operation = migrations.AlterModelTable("Pony", "test_almota_pony_2")
  445. new_state = project_state.clone()
  446. operation.state_forwards("test_almota", new_state)
  447. self.assertEqual(new_state.models["test_almota", "pony"].options["db_table"], "test_almota_pony_2")
  448. # Test the database alteration
  449. self.assertTableExists("test_almota_pony")
  450. self.assertTableNotExists("test_almota_pony_2")
  451. with connection.schema_editor() as editor:
  452. operation.database_forwards("test_almota", editor, project_state, new_state)
  453. self.assertTableNotExists("test_almota_pony")
  454. self.assertTableExists("test_almota_pony_2")
  455. # And test reversal
  456. with connection.schema_editor() as editor:
  457. operation.database_backwards("test_almota", editor, new_state, project_state)
  458. self.assertTableExists("test_almota_pony")
  459. self.assertTableNotExists("test_almota_pony_2")
  460. def test_alter_field(self):
  461. """
  462. Tests the AlterField operation.
  463. """
  464. project_state = self.set_up_test_model("test_alfl")
  465. # Test the state alteration
  466. operation = migrations.AlterField("Pony", "pink", models.IntegerField(null=True))
  467. new_state = project_state.clone()
  468. operation.state_forwards("test_alfl", new_state)
  469. self.assertEqual(project_state.models["test_alfl", "pony"].get_field_by_name("pink").null, False)
  470. self.assertEqual(new_state.models["test_alfl", "pony"].get_field_by_name("pink").null, True)
  471. # Test the database alteration
  472. self.assertColumnNotNull("test_alfl_pony", "pink")
  473. with connection.schema_editor() as editor:
  474. operation.database_forwards("test_alfl", editor, project_state, new_state)
  475. self.assertColumnNull("test_alfl_pony", "pink")
  476. # And test reversal
  477. with connection.schema_editor() as editor:
  478. operation.database_backwards("test_alfl", editor, new_state, project_state)
  479. self.assertColumnNotNull("test_alfl_pony", "pink")
  480. def test_alter_field_pk(self):
  481. """
  482. Tests the AlterField operation on primary keys (for things like PostgreSQL's SERIAL weirdness)
  483. """
  484. project_state = self.set_up_test_model("test_alflpk")
  485. # Test the state alteration
  486. operation = migrations.AlterField("Pony", "id", models.IntegerField(primary_key=True))
  487. new_state = project_state.clone()
  488. operation.state_forwards("test_alflpk", new_state)
  489. self.assertIsInstance(project_state.models["test_alflpk", "pony"].get_field_by_name("id"), models.AutoField)
  490. self.assertIsInstance(new_state.models["test_alflpk", "pony"].get_field_by_name("id"), models.IntegerField)
  491. # Test the database alteration
  492. with connection.schema_editor() as editor:
  493. operation.database_forwards("test_alflpk", editor, project_state, new_state)
  494. # And test reversal
  495. with connection.schema_editor() as editor:
  496. operation.database_backwards("test_alflpk", editor, new_state, project_state)
  497. @unittest.skipUnless(connection.features.supports_foreign_keys, "No FK support")
  498. def test_alter_field_pk_fk(self):
  499. """
  500. Tests the AlterField operation on primary keys changes any FKs pointing to it.
  501. """
  502. project_state = self.set_up_test_model("test_alflpkfk", related_model=True)
  503. # Test the state alteration
  504. operation = migrations.AlterField("Pony", "id", models.FloatField(primary_key=True))
  505. new_state = project_state.clone()
  506. operation.state_forwards("test_alflpkfk", new_state)
  507. self.assertIsInstance(project_state.models["test_alflpkfk", "pony"].get_field_by_name("id"), models.AutoField)
  508. self.assertIsInstance(new_state.models["test_alflpkfk", "pony"].get_field_by_name("id"), models.FloatField)
  509. def assertIdTypeEqualsFkType():
  510. with connection.cursor() as cursor:
  511. id_type = [c.type_code for c in connection.introspection.get_table_description(cursor, "test_alflpkfk_pony") if c.name == "id"][0]
  512. fk_type = [c.type_code for c in connection.introspection.get_table_description(cursor, "test_alflpkfk_rider") if c.name == "pony_id"][0]
  513. self.assertEqual(id_type, fk_type)
  514. assertIdTypeEqualsFkType()
  515. # Test the database alteration
  516. with connection.schema_editor() as editor:
  517. operation.database_forwards("test_alflpkfk", editor, project_state, new_state)
  518. assertIdTypeEqualsFkType()
  519. # And test reversal
  520. with connection.schema_editor() as editor:
  521. operation.database_backwards("test_alflpkfk", editor, new_state, project_state)
  522. assertIdTypeEqualsFkType()
  523. def test_rename_field(self):
  524. """
  525. Tests the RenameField operation.
  526. """
  527. project_state = self.set_up_test_model("test_rnfl")
  528. # Test the state alteration
  529. operation = migrations.RenameField("Pony", "pink", "blue")
  530. new_state = project_state.clone()
  531. operation.state_forwards("test_rnfl", new_state)
  532. self.assertIn("blue", [n for n, f in new_state.models["test_rnfl", "pony"].fields])
  533. self.assertNotIn("pink", [n for n, f in new_state.models["test_rnfl", "pony"].fields])
  534. # Test the database alteration
  535. self.assertColumnExists("test_rnfl_pony", "pink")
  536. self.assertColumnNotExists("test_rnfl_pony", "blue")
  537. with connection.schema_editor() as editor:
  538. operation.database_forwards("test_rnfl", editor, project_state, new_state)
  539. self.assertColumnExists("test_rnfl_pony", "blue")
  540. self.assertColumnNotExists("test_rnfl_pony", "pink")
  541. # And test reversal
  542. with connection.schema_editor() as editor:
  543. operation.database_backwards("test_rnfl", editor, new_state, project_state)
  544. self.assertColumnExists("test_rnfl_pony", "pink")
  545. self.assertColumnNotExists("test_rnfl_pony", "blue")
  546. def test_alter_unique_together(self):
  547. """
  548. Tests the AlterUniqueTogether operation.
  549. """
  550. project_state = self.set_up_test_model("test_alunto")
  551. # Test the state alteration
  552. operation = migrations.AlterUniqueTogether("Pony", [("pink", "weight")])
  553. new_state = project_state.clone()
  554. operation.state_forwards("test_alunto", new_state)
  555. self.assertEqual(len(project_state.models["test_alunto", "pony"].options.get("unique_together", set())), 0)
  556. self.assertEqual(len(new_state.models["test_alunto", "pony"].options.get("unique_together", set())), 1)
  557. # Make sure we can insert duplicate rows
  558. with connection.cursor() as cursor:
  559. cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)")
  560. cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)")
  561. cursor.execute("DELETE FROM test_alunto_pony")
  562. # Test the database alteration
  563. with connection.schema_editor() as editor:
  564. operation.database_forwards("test_alunto", editor, project_state, new_state)
  565. cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)")
  566. with self.assertRaises(IntegrityError):
  567. with atomic():
  568. cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)")
  569. cursor.execute("DELETE FROM test_alunto_pony")
  570. # And test reversal
  571. with connection.schema_editor() as editor:
  572. operation.database_backwards("test_alunto", editor, new_state, project_state)
  573. cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)")
  574. cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)")
  575. cursor.execute("DELETE FROM test_alunto_pony")
  576. # Test flat unique_together
  577. operation = migrations.AlterUniqueTogether("Pony", ("pink", "weight"))
  578. operation.state_forwards("test_alunto", new_state)
  579. self.assertEqual(len(new_state.models["test_alunto", "pony"].options.get("unique_together", set())), 1)
  580. def test_alter_index_together(self):
  581. """
  582. Tests the AlterIndexTogether operation.
  583. """
  584. project_state = self.set_up_test_model("test_alinto")
  585. # Test the state alteration
  586. operation = migrations.AlterIndexTogether("Pony", [("pink", "weight")])
  587. new_state = project_state.clone()
  588. operation.state_forwards("test_alinto", new_state)
  589. self.assertEqual(len(project_state.models["test_alinto", "pony"].options.get("index_together", set())), 0)
  590. self.assertEqual(len(new_state.models["test_alinto", "pony"].options.get("index_together", set())), 1)
  591. # Make sure there's no matching index
  592. self.assertIndexNotExists("test_alinto_pony", ["pink", "weight"])
  593. # Test the database alteration
  594. with connection.schema_editor() as editor:
  595. operation.database_forwards("test_alinto", editor, project_state, new_state)
  596. self.assertIndexExists("test_alinto_pony", ["pink", "weight"])
  597. # And test reversal
  598. with connection.schema_editor() as editor:
  599. operation.database_backwards("test_alinto", editor, new_state, project_state)
  600. self.assertIndexNotExists("test_alinto_pony", ["pink", "weight"])
  601. def test_run_sql(self):
  602. """
  603. Tests the RunSQL operation.
  604. """
  605. project_state = self.set_up_test_model("test_runsql")
  606. # Create the operation
  607. operation = migrations.RunSQL(
  608. "CREATE TABLE i_love_ponies (id int, special_thing int)",
  609. "DROP TABLE i_love_ponies",
  610. state_operations=[migrations.CreateModel("SomethingElse", [("id", models.AutoField(primary_key=True))])],
  611. )
  612. # Test the state alteration
  613. new_state = project_state.clone()
  614. operation.state_forwards("test_runsql", new_state)
  615. self.assertEqual(len(new_state.models["test_runsql", "somethingelse"].fields), 1)
  616. # Make sure there's no table
  617. self.assertTableNotExists("i_love_ponies")
  618. # Test the database alteration
  619. with connection.schema_editor() as editor:
  620. operation.database_forwards("test_runsql", editor, project_state, new_state)
  621. self.assertTableExists("i_love_ponies")
  622. # And test reversal
  623. self.assertTrue(operation.reversible)
  624. with connection.schema_editor() as editor:
  625. operation.database_backwards("test_runsql", editor, new_state, project_state)
  626. self.assertTableNotExists("i_love_ponies")
  627. def test_run_python(self):
  628. """
  629. Tests the RunPython operation
  630. """
  631. project_state = self.set_up_test_model("test_runpython", mti_model=True)
  632. # Create the operation
  633. def inner_method(models, schema_editor):
  634. Pony = models.get_model("test_runpython", "Pony")
  635. Pony.objects.create(pink=1, weight=3.55)
  636. Pony.objects.create(weight=5)
  637. def inner_method_reverse(models, schema_editor):
  638. Pony = models.get_model("test_runpython", "Pony")
  639. Pony.objects.filter(pink=1, weight=3.55).delete()
  640. Pony.objects.filter(weight=5).delete()
  641. operation = migrations.RunPython(inner_method, reverse_code=inner_method_reverse)
  642. # Test the state alteration does nothing
  643. new_state = project_state.clone()
  644. operation.state_forwards("test_runpython", new_state)
  645. self.assertEqual(new_state, project_state)
  646. # Test the database alteration
  647. self.assertEqual(project_state.render().get_model("test_runpython", "Pony").objects.count(), 0)
  648. with connection.schema_editor() as editor:
  649. operation.database_forwards("test_runpython", editor, project_state, new_state)
  650. self.assertEqual(project_state.render().get_model("test_runpython", "Pony").objects.count(), 2)
  651. # Now test reversal
  652. self.assertTrue(operation.reversible)
  653. with connection.schema_editor() as editor:
  654. operation.database_backwards("test_runpython", editor, project_state, new_state)
  655. self.assertEqual(project_state.render().get_model("test_runpython", "Pony").objects.count(), 0)
  656. # Now test we can't use a string
  657. with self.assertRaises(ValueError):
  658. operation = migrations.RunPython("print 'ahahaha'")
  659. # Also test reversal fails, with an operation identical to above but without reverse_code set
  660. no_reverse_operation = migrations.RunPython(inner_method)
  661. self.assertFalse(no_reverse_operation.reversible)
  662. with connection.schema_editor() as editor:
  663. no_reverse_operation.database_forwards("test_runpython", editor, project_state, new_state)
  664. with self.assertRaises(NotImplementedError):
  665. no_reverse_operation.database_backwards("test_runpython", editor, new_state, project_state)
  666. self.assertEqual(project_state.render().get_model("test_runpython", "Pony").objects.count(), 2)
  667. def create_ponies(models, schema_editor):
  668. Pony = models.get_model("test_runpython", "Pony")
  669. pony1 = Pony.objects.create(pink=1, weight=3.55)
  670. self.assertIsNot(pony1.pk, None)
  671. pony2 = Pony.objects.create(weight=5)
  672. self.assertIsNot(pony2.pk, None)
  673. self.assertNotEqual(pony1.pk, pony2.pk)
  674. operation = migrations.RunPython(create_ponies)
  675. with connection.schema_editor() as editor:
  676. operation.database_forwards("test_runpython", editor, project_state, new_state)
  677. self.assertEqual(project_state.render().get_model("test_runpython", "Pony").objects.count(), 4)
  678. def create_shetlandponies(models, schema_editor):
  679. ShetlandPony = models.get_model("test_runpython", "ShetlandPony")
  680. pony1 = ShetlandPony.objects.create(weight=4.0)
  681. self.assertIsNot(pony1.pk, None)
  682. pony2 = ShetlandPony.objects.create(weight=5.0)
  683. self.assertIsNot(pony2.pk, None)
  684. self.assertNotEqual(pony1.pk, pony2.pk)
  685. operation = migrations.RunPython(create_shetlandponies)
  686. with connection.schema_editor() as editor:
  687. operation.database_forwards("test_runpython", editor, project_state, new_state)
  688. self.assertEqual(project_state.render().get_model("test_runpython", "Pony").objects.count(), 6)
  689. self.assertEqual(project_state.render().get_model("test_runpython", "ShetlandPony").objects.count(), 2)
  690. class MigrateNothingRouter(object):
  691. """
  692. A router that sends all writes to the other database.
  693. """
  694. def allow_migrate(self, db, model):
  695. return False
  696. class MultiDBOperationTests(MigrationTestBase):
  697. multi_db = True
  698. def setUp(self):
  699. # Make the 'other' database appear to be a slave of the 'default'
  700. self.old_routers = router.routers
  701. router.routers = [MigrateNothingRouter()]
  702. def tearDown(self):
  703. # Restore the 'other' database as an independent database
  704. router.routers = self.old_routers
  705. def test_create_model(self):
  706. """
  707. Tests that CreateModel honours multi-db settings.
  708. """
  709. operation = migrations.CreateModel(
  710. "Pony",
  711. [
  712. ("id", models.AutoField(primary_key=True)),
  713. ("pink", models.IntegerField(default=1)),
  714. ],
  715. )
  716. # Test the state alteration
  717. project_state = ProjectState()
  718. new_state = project_state.clone()
  719. operation.state_forwards("test_crmo", new_state)
  720. # Test the database alteration
  721. self.assertTableNotExists("test_crmo_pony")
  722. with connection.schema_editor() as editor:
  723. operation.database_forwards("test_crmo", editor, project_state, new_state)
  724. self.assertTableNotExists("test_crmo_pony")
  725. # And test reversal
  726. with connection.schema_editor() as editor:
  727. operation.database_backwards("test_crmo", editor, new_state, project_state)
  728. self.assertTableNotExists("test_crmo_pony")