test_operations.py 37 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799
  1. import unittest
  2. try:
  3. import sqlparse
  4. except ImportError:
  5. sqlparse = None
  6. from django.db import connection, migrations, models, router
  7. from django.db.migrations.migration import Migration
  8. from django.db.migrations.state import ProjectState
  9. from django.db.models.fields import NOT_PROVIDED
  10. from django.db.transaction import atomic
  11. from django.db.utils import IntegrityError, DatabaseError
  12. from .test_base import MigrationTestBase
  13. class OperationTests(MigrationTestBase):
  14. """
  15. Tests running the operations and making sure they do what they say they do.
  16. Each test looks at their state changing, and then their database operation -
  17. both forwards and backwards.
  18. """
  19. def apply_operations(self, app_label, project_state, operations):
  20. migration = Migration('name', app_label)
  21. migration.operations = operations
  22. with connection.schema_editor() as editor:
  23. return migration.apply(project_state, editor)
  24. def unapply_operations(self, app_label, project_state, operations):
  25. migration = Migration('name', app_label)
  26. migration.operations = operations
  27. with connection.schema_editor() as editor:
  28. return migration.unapply(project_state, editor)
  29. def set_up_test_model(self, app_label, second_model=False, related_model=False, mti_model=False):
  30. """
  31. Creates a test model state and database table.
  32. """
  33. # Delete the tables if they already exist
  34. with connection.cursor() as cursor:
  35. try:
  36. cursor.execute("DROP TABLE %s_pony" % app_label)
  37. except DatabaseError:
  38. pass
  39. try:
  40. cursor.execute("DROP TABLE %s_stable" % app_label)
  41. except DatabaseError:
  42. pass
  43. # Make the "current" state
  44. operations = [migrations.CreateModel(
  45. "Pony",
  46. [
  47. ("id", models.AutoField(primary_key=True)),
  48. ("pink", models.IntegerField(default=3)),
  49. ("weight", models.FloatField()),
  50. ],
  51. )]
  52. if second_model:
  53. operations.append(migrations.CreateModel(
  54. "Stable",
  55. [
  56. ("id", models.AutoField(primary_key=True)),
  57. ]
  58. ))
  59. if related_model:
  60. operations.append(migrations.CreateModel(
  61. "Rider",
  62. [
  63. ("id", models.AutoField(primary_key=True)),
  64. ("pony", models.ForeignKey("Pony")),
  65. ],
  66. ))
  67. if mti_model:
  68. operations.append(migrations.CreateModel(
  69. "ShetlandPony",
  70. fields=[
  71. ('pony_ptr', models.OneToOneField(
  72. auto_created=True,
  73. primary_key=True,
  74. to_field='id',
  75. serialize=False,
  76. to='Pony',
  77. )),
  78. ("cuteness", models.IntegerField(default=1)),
  79. ],
  80. bases=['%s.Pony' % app_label],
  81. ))
  82. return self.apply_operations(app_label, ProjectState(), operations)
  83. def test_create_model(self):
  84. """
  85. Tests the CreateModel operation.
  86. Most other tests use this operation as part of setup, so check failures here first.
  87. """
  88. operation = migrations.CreateModel(
  89. "Pony",
  90. [
  91. ("id", models.AutoField(primary_key=True)),
  92. ("pink", models.IntegerField(default=1)),
  93. ],
  94. )
  95. # Test the state alteration
  96. project_state = ProjectState()
  97. new_state = project_state.clone()
  98. operation.state_forwards("test_crmo", new_state)
  99. self.assertEqual(new_state.models["test_crmo", "pony"].name, "Pony")
  100. self.assertEqual(len(new_state.models["test_crmo", "pony"].fields), 2)
  101. # Test the database alteration
  102. self.assertTableNotExists("test_crmo_pony")
  103. with connection.schema_editor() as editor:
  104. operation.database_forwards("test_crmo", editor, project_state, new_state)
  105. self.assertTableExists("test_crmo_pony")
  106. # And test reversal
  107. with connection.schema_editor() as editor:
  108. operation.database_backwards("test_crmo", editor, new_state, project_state)
  109. self.assertTableNotExists("test_crmo_pony")
  110. # And deconstruction
  111. definition = operation.deconstruct()
  112. self.assertEqual(definition[0], "CreateModel")
  113. self.assertEqual(len(definition[1]), 2)
  114. self.assertEqual(len(definition[2]), 0)
  115. self.assertEqual(definition[1][0], "Pony")
  116. def test_create_model_m2m(self):
  117. """
  118. Test the creation of a model with a ManyToMany field and the
  119. auto-created "through" model.
  120. """
  121. project_state = self.set_up_test_model("test_crmomm")
  122. operation = migrations.CreateModel(
  123. "Stable",
  124. [
  125. ("id", models.AutoField(primary_key=True)),
  126. ("ponies", models.ManyToManyField("Pony", related_name="stables"))
  127. ]
  128. )
  129. # Test the state alteration
  130. new_state = project_state.clone()
  131. operation.state_forwards("test_crmomm", new_state)
  132. # Test the database alteration
  133. self.assertTableNotExists("test_crmomm_stable_ponies")
  134. with connection.schema_editor() as editor:
  135. operation.database_forwards("test_crmomm", editor, project_state, new_state)
  136. self.assertTableExists("test_crmomm_stable")
  137. self.assertTableExists("test_crmomm_stable_ponies")
  138. self.assertColumnNotExists("test_crmomm_stable", "ponies")
  139. # Make sure the M2M field actually works
  140. with atomic():
  141. new_apps = new_state.render()
  142. Pony = new_apps.get_model("test_crmomm", "Pony")
  143. Stable = new_apps.get_model("test_crmomm", "Stable")
  144. stable = Stable.objects.create()
  145. p1 = Pony.objects.create(pink=False, weight=4.55)
  146. p2 = Pony.objects.create(pink=True, weight=5.43)
  147. stable.ponies.add(p1, p2)
  148. self.assertEqual(stable.ponies.count(), 2)
  149. stable.ponies.all().delete()
  150. # And test reversal
  151. with connection.schema_editor() as editor:
  152. operation.database_backwards("test_crmomm", editor, new_state, project_state)
  153. self.assertTableNotExists("test_crmomm_stable")
  154. self.assertTableNotExists("test_crmomm_stable_ponies")
  155. def test_create_model_inheritance(self):
  156. """
  157. Tests the CreateModel operation on a multi-table inheritance setup.
  158. """
  159. project_state = self.set_up_test_model("test_crmoih")
  160. # Test the state alteration
  161. operation = migrations.CreateModel(
  162. "ShetlandPony",
  163. [
  164. ('pony_ptr', models.OneToOneField(
  165. auto_created=True,
  166. primary_key=True,
  167. to_field='id',
  168. serialize=False,
  169. to='test_crmoih.Pony',
  170. )),
  171. ("cuteness", models.IntegerField(default=1)),
  172. ],
  173. )
  174. new_state = project_state.clone()
  175. operation.state_forwards("test_crmoih", new_state)
  176. self.assertIn(("test_crmoih", "shetlandpony"), new_state.models)
  177. # Test the database alteration
  178. self.assertTableNotExists("test_crmoih_shetlandpony")
  179. with connection.schema_editor() as editor:
  180. operation.database_forwards("test_crmoih", editor, project_state, new_state)
  181. self.assertTableExists("test_crmoih_shetlandpony")
  182. # And test reversal
  183. with connection.schema_editor() as editor:
  184. operation.database_backwards("test_crmoih", editor, new_state, project_state)
  185. self.assertTableNotExists("test_crmoih_shetlandpony")
  186. def test_delete_model(self):
  187. """
  188. Tests the DeleteModel operation.
  189. """
  190. project_state = self.set_up_test_model("test_dlmo")
  191. # Test the state alteration
  192. operation = migrations.DeleteModel("Pony")
  193. new_state = project_state.clone()
  194. operation.state_forwards("test_dlmo", new_state)
  195. self.assertNotIn(("test_dlmo", "pony"), new_state.models)
  196. # Test the database alteration
  197. self.assertTableExists("test_dlmo_pony")
  198. with connection.schema_editor() as editor:
  199. operation.database_forwards("test_dlmo", editor, project_state, new_state)
  200. self.assertTableNotExists("test_dlmo_pony")
  201. # And test reversal
  202. with connection.schema_editor() as editor:
  203. operation.database_backwards("test_dlmo", editor, new_state, project_state)
  204. self.assertTableExists("test_dlmo_pony")
  205. def test_rename_model(self):
  206. """
  207. Tests the RenameModel operation.
  208. """
  209. project_state = self.set_up_test_model("test_rnmo")
  210. # Test the state alteration
  211. operation = migrations.RenameModel("Pony", "Horse")
  212. new_state = project_state.clone()
  213. operation.state_forwards("test_rnmo", new_state)
  214. self.assertNotIn(("test_rnmo", "pony"), new_state.models)
  215. self.assertIn(("test_rnmo", "horse"), new_state.models)
  216. # Test the database alteration
  217. self.assertTableExists("test_rnmo_pony")
  218. self.assertTableNotExists("test_rnmo_horse")
  219. with connection.schema_editor() as editor:
  220. operation.database_forwards("test_rnmo", editor, project_state, new_state)
  221. self.assertTableNotExists("test_rnmo_pony")
  222. self.assertTableExists("test_rnmo_horse")
  223. # And test reversal
  224. with connection.schema_editor() as editor:
  225. operation.database_backwards("test_rnmo", editor, new_state, project_state)
  226. self.assertTableExists("test_dlmo_pony")
  227. self.assertTableNotExists("test_rnmo_horse")
  228. # See #22248 - this will fail until that's fixed.
  229. #
  230. # def test_rename_model_with_related(self):
  231. # """
  232. # Tests the real-world combo of a RenameModel operation with AlterField
  233. # for a related field.
  234. # """
  235. # project_state = self.set_up_test_model(
  236. # "test_rnmowr", related_model=True)
  237. # # Test the state alterations
  238. # model_operation = migrations.RenameModel("Pony", "Horse")
  239. # new_state = project_state.clone()
  240. # model_operation.state_forwards("test_rnmowr", new_state)
  241. # self.assertNotIn(("test_rnmowr", "pony"), new_state.models)
  242. # self.assertIn(("test_rnmowr", "horse"), new_state.models)
  243. # self.assertEqual(
  244. # "Pony",
  245. # project_state.render().get_model("test_rnmowr", "rider")
  246. # ._meta.get_field_by_name("pony")[0].rel.to._meta.object_name)
  247. # field_operation = migrations.AlterField(
  248. # "Rider", "pony", models.ForeignKey("Horse"))
  249. # field_operation.state_forwards("test_rnmowr", new_state)
  250. # self.assertEqual(
  251. # "Horse",
  252. # new_state.render().get_model("test_rnmowr", "rider")
  253. # ._meta.get_field_by_name("pony")[0].rel.to._meta.object_name)
  254. # # Test the database alterations
  255. # self.assertTableExists("test_rnmowr_pony")
  256. # self.assertTableNotExists("test_rnmowr_horse")
  257. # with connection.schema_editor() as editor:
  258. # model_operation.database_forwards("test_rnmowr", editor, project_state, new_state)
  259. # field_operation.database_forwards("test_rnmowr", editor, project_state, new_state)
  260. # self.assertTableNotExists("test_rnmowr_pony")
  261. # self.assertTableExists("test_rnmowr_horse")
  262. # # And test reversal
  263. # with connection.schema_editor() as editor:
  264. # field_operation.database_backwards("test_rnmowr", editor, new_state, project_state)
  265. # model_operation.database_backwards("test_rnmowr", editor, new_state, project_state)
  266. # self.assertTableExists("test_rnmowr_pony")
  267. # self.assertTableNotExists("test_rnmowr_horse")
  268. def test_add_field(self):
  269. """
  270. Tests the AddField operation.
  271. """
  272. project_state = self.set_up_test_model("test_adfl")
  273. # Test the state alteration
  274. operation = migrations.AddField(
  275. "Pony",
  276. "height",
  277. models.FloatField(null=True, default=5),
  278. )
  279. new_state = project_state.clone()
  280. operation.state_forwards("test_adfl", new_state)
  281. self.assertEqual(len(new_state.models["test_adfl", "pony"].fields), 4)
  282. field = [
  283. f for n, f in new_state.models["test_adfl", "pony"].fields
  284. if n == "height"
  285. ][0]
  286. self.assertEqual(field.default, 5)
  287. # Test the database alteration
  288. self.assertColumnNotExists("test_adfl_pony", "height")
  289. with connection.schema_editor() as editor:
  290. operation.database_forwards("test_adfl", editor, project_state, new_state)
  291. self.assertColumnExists("test_adfl_pony", "height")
  292. # And test reversal
  293. with connection.schema_editor() as editor:
  294. operation.database_backwards("test_adfl", editor, new_state, project_state)
  295. self.assertColumnNotExists("test_adfl_pony", "height")
  296. def test_column_name_quoting(self):
  297. """
  298. Column names that are SQL keywords shouldn't cause problems when used
  299. in migrations (#22168).
  300. """
  301. project_state = self.set_up_test_model("test_regr22168")
  302. operation = migrations.AddField(
  303. "Pony",
  304. "order",
  305. models.IntegerField(default=0),
  306. )
  307. new_state = project_state.clone()
  308. operation.state_forwards("test_regr22168", new_state)
  309. with connection.schema_editor() as editor:
  310. operation.database_forwards("test_regr22168", editor, project_state, new_state)
  311. self.assertColumnExists("test_regr22168_pony", "order")
  312. def test_add_field_preserve_default(self):
  313. """
  314. Tests the AddField operation's state alteration
  315. when preserve_default = False.
  316. """
  317. project_state = self.set_up_test_model("test_adflpd")
  318. # Test the state alteration
  319. operation = migrations.AddField(
  320. "Pony",
  321. "height",
  322. models.FloatField(null=True, default=4),
  323. preserve_default=False,
  324. )
  325. new_state = project_state.clone()
  326. operation.state_forwards("test_adflpd", new_state)
  327. self.assertEqual(len(new_state.models["test_adflpd", "pony"].fields), 4)
  328. field = [
  329. f for n, f in new_state.models["test_adflpd", "pony"].fields
  330. if n == "height"
  331. ][0]
  332. self.assertEqual(field.default, NOT_PROVIDED)
  333. # Test the database alteration
  334. project_state.render().get_model("test_adflpd", "pony").objects.create(
  335. weight=4,
  336. )
  337. self.assertColumnNotExists("test_adflpd_pony", "height")
  338. with connection.schema_editor() as editor:
  339. operation.database_forwards("test_adflpd", editor, project_state, new_state)
  340. self.assertColumnExists("test_adflpd_pony", "height")
  341. def test_add_field_m2m(self):
  342. """
  343. Tests the AddField operation with a ManyToManyField.
  344. """
  345. project_state = self.set_up_test_model("test_adflmm", second_model=True)
  346. # Test the state alteration
  347. operation = migrations.AddField("Pony", "stables", models.ManyToManyField("Stable", related_name="ponies"))
  348. new_state = project_state.clone()
  349. operation.state_forwards("test_adflmm", new_state)
  350. self.assertEqual(len(new_state.models["test_adflmm", "pony"].fields), 4)
  351. # Test the database alteration
  352. self.assertTableNotExists("test_adflmm_pony_stables")
  353. with connection.schema_editor() as editor:
  354. operation.database_forwards("test_adflmm", editor, project_state, new_state)
  355. self.assertTableExists("test_adflmm_pony_stables")
  356. self.assertColumnNotExists("test_adflmm_pony", "stables")
  357. # Make sure the M2M field actually works
  358. with atomic():
  359. new_apps = new_state.render()
  360. Pony = new_apps.get_model("test_adflmm", "Pony")
  361. p = Pony.objects.create(pink=False, weight=4.55)
  362. p.stables.create()
  363. self.assertEqual(p.stables.count(), 1)
  364. p.stables.all().delete()
  365. # And test reversal
  366. with connection.schema_editor() as editor:
  367. operation.database_backwards("test_adflmm", editor, new_state, project_state)
  368. self.assertTableNotExists("test_adflmm_pony_stables")
  369. def test_alter_field_m2m(self):
  370. project_state = self.set_up_test_model("test_alflmm", second_model=True)
  371. project_state = self.apply_operations("test_alflmm", project_state, operations=[
  372. migrations.AddField("Pony", "stables", models.ManyToManyField("Stable", related_name="ponies"))
  373. ])
  374. new_apps = project_state.render()
  375. Pony = new_apps.get_model("test_alflmm", "Pony")
  376. self.assertFalse(Pony._meta.get_field('stables').blank)
  377. project_state = self.apply_operations("test_alflmm", project_state, operations=[
  378. migrations.AlterField("Pony", "stables", models.ManyToManyField(to="Stable", related_name="ponies", blank=True))
  379. ])
  380. new_apps = project_state.render()
  381. Pony = new_apps.get_model("test_alflmm", "Pony")
  382. self.assertTrue(Pony._meta.get_field('stables').blank)
  383. def test_remove_field_m2m(self):
  384. project_state = self.set_up_test_model("test_rmflmm", second_model=True)
  385. project_state = self.apply_operations("test_rmflmm", project_state, operations=[
  386. migrations.AddField("Pony", "stables", models.ManyToManyField("Stable", related_name="ponies"))
  387. ])
  388. self.assertTableExists("test_rmflmm_pony_stables")
  389. operations = [migrations.RemoveField("Pony", "stables")]
  390. self.apply_operations("test_rmflmm", project_state, operations=operations)
  391. self.assertTableNotExists("test_rmflmm_pony_stables")
  392. # And test reversal
  393. self.unapply_operations("test_rmflmm", project_state, operations=operations)
  394. self.assertTableExists("test_rmflmm_pony_stables")
  395. def test_remove_field_m2m_with_through(self):
  396. project_state = self.set_up_test_model("test_rmflmmwt", second_model=True)
  397. self.assertTableNotExists("test_rmflmmwt_ponystables")
  398. project_state = self.apply_operations("test_rmflmmwt", project_state, operations=[
  399. migrations.CreateModel("PonyStables", fields=[
  400. ("pony", models.ForeignKey('test_rmflmmwt.Pony')),
  401. ("stable", models.ForeignKey('test_rmflmmwt.Stable')),
  402. ]),
  403. migrations.AddField("Pony", "stables", models.ManyToManyField("Stable", related_name="ponies", through='test_rmflmmwt.PonyStables'))
  404. ])
  405. self.assertTableExists("test_rmflmmwt_ponystables")
  406. operations = [migrations.RemoveField("Pony", "stables")]
  407. self.apply_operations("test_rmflmmwt", project_state, operations=operations)
  408. def test_remove_field(self):
  409. """
  410. Tests the RemoveField operation.
  411. """
  412. project_state = self.set_up_test_model("test_rmfl")
  413. # Test the state alteration
  414. operation = migrations.RemoveField("Pony", "pink")
  415. new_state = project_state.clone()
  416. operation.state_forwards("test_rmfl", new_state)
  417. self.assertEqual(len(new_state.models["test_rmfl", "pony"].fields), 2)
  418. # Test the database alteration
  419. self.assertColumnExists("test_rmfl_pony", "pink")
  420. with connection.schema_editor() as editor:
  421. operation.database_forwards("test_rmfl", editor, project_state, new_state)
  422. self.assertColumnNotExists("test_rmfl_pony", "pink")
  423. # And test reversal
  424. with connection.schema_editor() as editor:
  425. operation.database_backwards("test_rmfl", editor, new_state, project_state)
  426. self.assertColumnExists("test_rmfl_pony", "pink")
  427. def test_remove_fk(self):
  428. """
  429. Tests the RemoveField operation on a foreign key.
  430. """
  431. project_state = self.set_up_test_model("test_rfk", related_model=True)
  432. self.assertColumnExists("test_rfk_rider", "pony_id")
  433. operation = migrations.RemoveField("Rider", "pony")
  434. new_state = project_state.clone()
  435. operation.state_forwards("test_rfk", new_state)
  436. with connection.schema_editor() as editor:
  437. operation.database_forwards("test_rfk", editor, project_state, new_state)
  438. self.assertColumnNotExists("test_rfk_rider", "pony_id")
  439. with connection.schema_editor() as editor:
  440. operation.database_backwards("test_rfk", editor, new_state, project_state)
  441. self.assertColumnExists("test_rfk_rider", "pony_id")
  442. def test_alter_model_table(self):
  443. """
  444. Tests the AlterModelTable operation.
  445. """
  446. project_state = self.set_up_test_model("test_almota")
  447. # Test the state alteration
  448. operation = migrations.AlterModelTable("Pony", "test_almota_pony_2")
  449. new_state = project_state.clone()
  450. operation.state_forwards("test_almota", new_state)
  451. self.assertEqual(new_state.models["test_almota", "pony"].options["db_table"], "test_almota_pony_2")
  452. # Test the database alteration
  453. self.assertTableExists("test_almota_pony")
  454. self.assertTableNotExists("test_almota_pony_2")
  455. with connection.schema_editor() as editor:
  456. operation.database_forwards("test_almota", editor, project_state, new_state)
  457. self.assertTableNotExists("test_almota_pony")
  458. self.assertTableExists("test_almota_pony_2")
  459. # And test reversal
  460. with connection.schema_editor() as editor:
  461. operation.database_backwards("test_almota", editor, new_state, project_state)
  462. self.assertTableExists("test_almota_pony")
  463. self.assertTableNotExists("test_almota_pony_2")
  464. def test_alter_field(self):
  465. """
  466. Tests the AlterField operation.
  467. """
  468. project_state = self.set_up_test_model("test_alfl")
  469. # Test the state alteration
  470. operation = migrations.AlterField("Pony", "pink", models.IntegerField(null=True))
  471. new_state = project_state.clone()
  472. operation.state_forwards("test_alfl", new_state)
  473. self.assertEqual(project_state.models["test_alfl", "pony"].get_field_by_name("pink").null, False)
  474. self.assertEqual(new_state.models["test_alfl", "pony"].get_field_by_name("pink").null, True)
  475. # Test the database alteration
  476. self.assertColumnNotNull("test_alfl_pony", "pink")
  477. with connection.schema_editor() as editor:
  478. operation.database_forwards("test_alfl", editor, project_state, new_state)
  479. self.assertColumnNull("test_alfl_pony", "pink")
  480. # And test reversal
  481. with connection.schema_editor() as editor:
  482. operation.database_backwards("test_alfl", editor, new_state, project_state)
  483. self.assertColumnNotNull("test_alfl_pony", "pink")
  484. def test_alter_field_pk(self):
  485. """
  486. Tests the AlterField operation on primary keys (for things like PostgreSQL's SERIAL weirdness)
  487. """
  488. project_state = self.set_up_test_model("test_alflpk")
  489. # Test the state alteration
  490. operation = migrations.AlterField("Pony", "id", models.IntegerField(primary_key=True))
  491. new_state = project_state.clone()
  492. operation.state_forwards("test_alflpk", new_state)
  493. self.assertIsInstance(project_state.models["test_alflpk", "pony"].get_field_by_name("id"), models.AutoField)
  494. self.assertIsInstance(new_state.models["test_alflpk", "pony"].get_field_by_name("id"), models.IntegerField)
  495. # Test the database alteration
  496. with connection.schema_editor() as editor:
  497. operation.database_forwards("test_alflpk", editor, project_state, new_state)
  498. # And test reversal
  499. with connection.schema_editor() as editor:
  500. operation.database_backwards("test_alflpk", editor, new_state, project_state)
  501. @unittest.skipUnless(connection.features.supports_foreign_keys, "No FK support")
  502. def test_alter_field_pk_fk(self):
  503. """
  504. Tests the AlterField operation on primary keys changes any FKs pointing to it.
  505. """
  506. project_state = self.set_up_test_model("test_alflpkfk", related_model=True)
  507. # Test the state alteration
  508. operation = migrations.AlterField("Pony", "id", models.FloatField(primary_key=True))
  509. new_state = project_state.clone()
  510. operation.state_forwards("test_alflpkfk", new_state)
  511. self.assertIsInstance(project_state.models["test_alflpkfk", "pony"].get_field_by_name("id"), models.AutoField)
  512. self.assertIsInstance(new_state.models["test_alflpkfk", "pony"].get_field_by_name("id"), models.FloatField)
  513. def assertIdTypeEqualsFkType():
  514. with connection.cursor() as cursor:
  515. id_type = [c.type_code for c in connection.introspection.get_table_description(cursor, "test_alflpkfk_pony") if c.name == "id"][0]
  516. fk_type = [c.type_code for c in connection.introspection.get_table_description(cursor, "test_alflpkfk_rider") if c.name == "pony_id"][0]
  517. self.assertEqual(id_type, fk_type)
  518. assertIdTypeEqualsFkType()
  519. # Test the database alteration
  520. with connection.schema_editor() as editor:
  521. operation.database_forwards("test_alflpkfk", editor, project_state, new_state)
  522. assertIdTypeEqualsFkType()
  523. # And test reversal
  524. with connection.schema_editor() as editor:
  525. operation.database_backwards("test_alflpkfk", editor, new_state, project_state)
  526. assertIdTypeEqualsFkType()
  527. def test_rename_field(self):
  528. """
  529. Tests the RenameField operation.
  530. """
  531. project_state = self.set_up_test_model("test_rnfl")
  532. # Test the state alteration
  533. operation = migrations.RenameField("Pony", "pink", "blue")
  534. new_state = project_state.clone()
  535. operation.state_forwards("test_rnfl", new_state)
  536. self.assertIn("blue", [n for n, f in new_state.models["test_rnfl", "pony"].fields])
  537. self.assertNotIn("pink", [n for n, f in new_state.models["test_rnfl", "pony"].fields])
  538. # Test the database alteration
  539. self.assertColumnExists("test_rnfl_pony", "pink")
  540. self.assertColumnNotExists("test_rnfl_pony", "blue")
  541. with connection.schema_editor() as editor:
  542. operation.database_forwards("test_rnfl", editor, project_state, new_state)
  543. self.assertColumnExists("test_rnfl_pony", "blue")
  544. self.assertColumnNotExists("test_rnfl_pony", "pink")
  545. # And test reversal
  546. with connection.schema_editor() as editor:
  547. operation.database_backwards("test_rnfl", editor, new_state, project_state)
  548. self.assertColumnExists("test_rnfl_pony", "pink")
  549. self.assertColumnNotExists("test_rnfl_pony", "blue")
  550. def test_alter_unique_together(self):
  551. """
  552. Tests the AlterUniqueTogether operation.
  553. """
  554. project_state = self.set_up_test_model("test_alunto")
  555. # Test the state alteration
  556. operation = migrations.AlterUniqueTogether("Pony", [("pink", "weight")])
  557. new_state = project_state.clone()
  558. operation.state_forwards("test_alunto", new_state)
  559. self.assertEqual(len(project_state.models["test_alunto", "pony"].options.get("unique_together", set())), 0)
  560. self.assertEqual(len(new_state.models["test_alunto", "pony"].options.get("unique_together", set())), 1)
  561. # Make sure we can insert duplicate rows
  562. with connection.cursor() as cursor:
  563. cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)")
  564. cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)")
  565. cursor.execute("DELETE FROM test_alunto_pony")
  566. # Test the database alteration
  567. with connection.schema_editor() as editor:
  568. operation.database_forwards("test_alunto", editor, project_state, new_state)
  569. cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)")
  570. with self.assertRaises(IntegrityError):
  571. with atomic():
  572. cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)")
  573. cursor.execute("DELETE FROM test_alunto_pony")
  574. # And test reversal
  575. with connection.schema_editor() as editor:
  576. operation.database_backwards("test_alunto", editor, new_state, project_state)
  577. cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)")
  578. cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)")
  579. cursor.execute("DELETE FROM test_alunto_pony")
  580. # Test flat unique_together
  581. operation = migrations.AlterUniqueTogether("Pony", ("pink", "weight"))
  582. operation.state_forwards("test_alunto", new_state)
  583. self.assertEqual(len(new_state.models["test_alunto", "pony"].options.get("unique_together", set())), 1)
  584. def test_alter_index_together(self):
  585. """
  586. Tests the AlterIndexTogether operation.
  587. """
  588. project_state = self.set_up_test_model("test_alinto")
  589. # Test the state alteration
  590. operation = migrations.AlterIndexTogether("Pony", [("pink", "weight")])
  591. new_state = project_state.clone()
  592. operation.state_forwards("test_alinto", new_state)
  593. self.assertEqual(len(project_state.models["test_alinto", "pony"].options.get("index_together", set())), 0)
  594. self.assertEqual(len(new_state.models["test_alinto", "pony"].options.get("index_together", set())), 1)
  595. # Make sure there's no matching index
  596. self.assertIndexNotExists("test_alinto_pony", ["pink", "weight"])
  597. # Test the database alteration
  598. with connection.schema_editor() as editor:
  599. operation.database_forwards("test_alinto", editor, project_state, new_state)
  600. self.assertIndexExists("test_alinto_pony", ["pink", "weight"])
  601. # And test reversal
  602. with connection.schema_editor() as editor:
  603. operation.database_backwards("test_alinto", editor, new_state, project_state)
  604. self.assertIndexNotExists("test_alinto_pony", ["pink", "weight"])
  605. @unittest.skipIf(sqlparse is None and connection.features.requires_sqlparse_for_splitting, "Missing sqlparse")
  606. def test_run_sql(self):
  607. """
  608. Tests the RunSQL operation.
  609. """
  610. project_state = self.set_up_test_model("test_runsql")
  611. # Create the operation
  612. operation = migrations.RunSQL(
  613. # Use a multi-line string with a commment to test splitting on SQLite and MySQL respectively
  614. "CREATE TABLE i_love_ponies (id int, special_thing int);\n"
  615. "INSERT INTO i_love_ponies (id, special_thing) VALUES (1, 42); -- this is magic!\n"
  616. "INSERT INTO i_love_ponies (id, special_thing) VALUES (2, 51);\n",
  617. "DROP TABLE i_love_ponies",
  618. state_operations=[migrations.CreateModel("SomethingElse", [("id", models.AutoField(primary_key=True))])],
  619. )
  620. # Test the state alteration
  621. new_state = project_state.clone()
  622. operation.state_forwards("test_runsql", new_state)
  623. self.assertEqual(len(new_state.models["test_runsql", "somethingelse"].fields), 1)
  624. # Make sure there's no table
  625. self.assertTableNotExists("i_love_ponies")
  626. # Test the database alteration
  627. with connection.schema_editor() as editor:
  628. operation.database_forwards("test_runsql", editor, project_state, new_state)
  629. self.assertTableExists("i_love_ponies")
  630. # Make sure all the SQL was processed
  631. with connection.cursor() as cursor:
  632. cursor.execute("SELECT COUNT(*) FROM i_love_ponies")
  633. self.assertEqual(cursor.fetchall()[0][0], 2)
  634. # And test reversal
  635. self.assertTrue(operation.reversible)
  636. with connection.schema_editor() as editor:
  637. operation.database_backwards("test_runsql", editor, new_state, project_state)
  638. self.assertTableNotExists("i_love_ponies")
  639. def test_run_python(self):
  640. """
  641. Tests the RunPython operation
  642. """
  643. project_state = self.set_up_test_model("test_runpython", mti_model=True)
  644. # Create the operation
  645. def inner_method(models, schema_editor):
  646. Pony = models.get_model("test_runpython", "Pony")
  647. Pony.objects.create(pink=1, weight=3.55)
  648. Pony.objects.create(weight=5)
  649. def inner_method_reverse(models, schema_editor):
  650. Pony = models.get_model("test_runpython", "Pony")
  651. Pony.objects.filter(pink=1, weight=3.55).delete()
  652. Pony.objects.filter(weight=5).delete()
  653. operation = migrations.RunPython(inner_method, reverse_code=inner_method_reverse)
  654. # Test the state alteration does nothing
  655. new_state = project_state.clone()
  656. operation.state_forwards("test_runpython", new_state)
  657. self.assertEqual(new_state, project_state)
  658. # Test the database alteration
  659. self.assertEqual(project_state.render().get_model("test_runpython", "Pony").objects.count(), 0)
  660. with connection.schema_editor() as editor:
  661. operation.database_forwards("test_runpython", editor, project_state, new_state)
  662. self.assertEqual(project_state.render().get_model("test_runpython", "Pony").objects.count(), 2)
  663. # Now test reversal
  664. self.assertTrue(operation.reversible)
  665. with connection.schema_editor() as editor:
  666. operation.database_backwards("test_runpython", editor, project_state, new_state)
  667. self.assertEqual(project_state.render().get_model("test_runpython", "Pony").objects.count(), 0)
  668. # Now test we can't use a string
  669. with self.assertRaises(ValueError):
  670. operation = migrations.RunPython("print 'ahahaha'")
  671. # Also test reversal fails, with an operation identical to above but without reverse_code set
  672. no_reverse_operation = migrations.RunPython(inner_method)
  673. self.assertFalse(no_reverse_operation.reversible)
  674. with connection.schema_editor() as editor:
  675. no_reverse_operation.database_forwards("test_runpython", editor, project_state, new_state)
  676. with self.assertRaises(NotImplementedError):
  677. no_reverse_operation.database_backwards("test_runpython", editor, new_state, project_state)
  678. self.assertEqual(project_state.render().get_model("test_runpython", "Pony").objects.count(), 2)
  679. def create_ponies(models, schema_editor):
  680. Pony = models.get_model("test_runpython", "Pony")
  681. pony1 = Pony.objects.create(pink=1, weight=3.55)
  682. self.assertIsNot(pony1.pk, None)
  683. pony2 = Pony.objects.create(weight=5)
  684. self.assertIsNot(pony2.pk, None)
  685. self.assertNotEqual(pony1.pk, pony2.pk)
  686. operation = migrations.RunPython(create_ponies)
  687. with connection.schema_editor() as editor:
  688. operation.database_forwards("test_runpython", editor, project_state, new_state)
  689. self.assertEqual(project_state.render().get_model("test_runpython", "Pony").objects.count(), 4)
  690. def create_shetlandponies(models, schema_editor):
  691. ShetlandPony = models.get_model("test_runpython", "ShetlandPony")
  692. pony1 = ShetlandPony.objects.create(weight=4.0)
  693. self.assertIsNot(pony1.pk, None)
  694. pony2 = ShetlandPony.objects.create(weight=5.0)
  695. self.assertIsNot(pony2.pk, None)
  696. self.assertNotEqual(pony1.pk, pony2.pk)
  697. operation = migrations.RunPython(create_shetlandponies)
  698. with connection.schema_editor() as editor:
  699. operation.database_forwards("test_runpython", editor, project_state, new_state)
  700. self.assertEqual(project_state.render().get_model("test_runpython", "Pony").objects.count(), 6)
  701. self.assertEqual(project_state.render().get_model("test_runpython", "ShetlandPony").objects.count(), 2)
  702. class MigrateNothingRouter(object):
  703. """
  704. A router that sends all writes to the other database.
  705. """
  706. def allow_migrate(self, db, model):
  707. return False
  708. class MultiDBOperationTests(MigrationTestBase):
  709. multi_db = True
  710. def setUp(self):
  711. # Make the 'other' database appear to be a follower of the 'default'
  712. self.old_routers = router.routers
  713. router.routers = [MigrateNothingRouter()]
  714. def tearDown(self):
  715. # Restore the 'other' database as an independent database
  716. router.routers = self.old_routers
  717. def test_create_model(self):
  718. """
  719. Tests that CreateModel honours multi-db settings.
  720. """
  721. operation = migrations.CreateModel(
  722. "Pony",
  723. [
  724. ("id", models.AutoField(primary_key=True)),
  725. ("pink", models.IntegerField(default=1)),
  726. ],
  727. )
  728. # Test the state alteration
  729. project_state = ProjectState()
  730. new_state = project_state.clone()
  731. operation.state_forwards("test_crmo", new_state)
  732. # Test the database alteration
  733. self.assertTableNotExists("test_crmo_pony")
  734. with connection.schema_editor() as editor:
  735. operation.database_forwards("test_crmo", editor, project_state, new_state)
  736. self.assertTableNotExists("test_crmo_pony")
  737. # And test reversal
  738. with connection.schema_editor() as editor:
  739. operation.database_backwards("test_crmo", editor, new_state, project_state)
  740. self.assertTableNotExists("test_crmo_pony")