test_operations.py 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750
  1. import unittest
  2. from django.db import connection, models, migrations, router
  3. from django.db.models.fields import NOT_PROVIDED
  4. from django.db.transaction import atomic
  5. from django.db.utils import IntegrityError
  6. from django.db.migrations.state import ProjectState
  7. from .test_base import MigrationTestBase
  8. class OperationTests(MigrationTestBase):
  9. """
  10. Tests running the operations and making sure they do what they say they do.
  11. Each test looks at their state changing, and then their database operation -
  12. both forwards and backwards.
  13. """
  14. def apply_operations(self, app_label, project_state, operations):
  15. new_state = project_state.clone()
  16. for operation in operations:
  17. operation.state_forwards(app_label, new_state)
  18. # Set up the database
  19. with connection.schema_editor() as editor:
  20. for operation in operations:
  21. operation.database_forwards(app_label, editor, project_state, new_state)
  22. return new_state
  23. def set_up_test_model(self, app_label, second_model=False, related_model=False, mti_model=False):
  24. """
  25. Creates a test model state and database table.
  26. """
  27. # Delete the tables if they already exist
  28. with connection.cursor() as cursor:
  29. try:
  30. cursor.execute("DROP TABLE %s_pony" % app_label)
  31. except:
  32. pass
  33. try:
  34. cursor.execute("DROP TABLE %s_stable" % app_label)
  35. except:
  36. pass
  37. # Make the "current" state
  38. operations = [migrations.CreateModel(
  39. "Pony",
  40. [
  41. ("id", models.AutoField(primary_key=True)),
  42. ("pink", models.IntegerField(default=3)),
  43. ("weight", models.FloatField()),
  44. ],
  45. )]
  46. if second_model:
  47. operations.append(migrations.CreateModel(
  48. "Stable",
  49. [
  50. ("id", models.AutoField(primary_key=True)),
  51. ]
  52. ))
  53. if related_model:
  54. operations.append(migrations.CreateModel(
  55. "Rider",
  56. [
  57. ("id", models.AutoField(primary_key=True)),
  58. ("pony", models.ForeignKey("Pony")),
  59. ],
  60. ))
  61. if mti_model:
  62. operations.append(migrations.CreateModel(
  63. "ShetlandPony",
  64. fields=[
  65. ('pony_ptr', models.OneToOneField(
  66. auto_created=True,
  67. primary_key=True,
  68. to_field='id',
  69. serialize=False,
  70. to='Pony',
  71. )),
  72. ("cuteness", models.IntegerField(default=1)),
  73. ],
  74. bases=['%s.Pony' % app_label],
  75. ))
  76. return self.apply_operations(app_label, ProjectState(), operations)
  77. def test_create_model(self):
  78. """
  79. Tests the CreateModel operation.
  80. Most other tests use this operation as part of setup, so check failures here first.
  81. """
  82. operation = migrations.CreateModel(
  83. "Pony",
  84. [
  85. ("id", models.AutoField(primary_key=True)),
  86. ("pink", models.IntegerField(default=1)),
  87. ],
  88. )
  89. # Test the state alteration
  90. project_state = ProjectState()
  91. new_state = project_state.clone()
  92. operation.state_forwards("test_crmo", new_state)
  93. self.assertEqual(new_state.models["test_crmo", "pony"].name, "Pony")
  94. self.assertEqual(len(new_state.models["test_crmo", "pony"].fields), 2)
  95. # Test the database alteration
  96. self.assertTableNotExists("test_crmo_pony")
  97. with connection.schema_editor() as editor:
  98. operation.database_forwards("test_crmo", editor, project_state, new_state)
  99. self.assertTableExists("test_crmo_pony")
  100. # And test reversal
  101. with connection.schema_editor() as editor:
  102. operation.database_backwards("test_crmo", editor, new_state, project_state)
  103. self.assertTableNotExists("test_crmo_pony")
  104. # And deconstruction
  105. definition = operation.deconstruct()
  106. self.assertEqual(definition[0], "CreateModel")
  107. self.assertEqual(len(definition[1]), 2)
  108. self.assertEqual(len(definition[2]), 0)
  109. self.assertEqual(definition[1][0], "Pony")
  110. def test_create_model_m2m(self):
  111. """
  112. Test the creation of a model with a ManyToMany field and the
  113. auto-created "through" model.
  114. """
  115. project_state = self.set_up_test_model("test_crmomm")
  116. operation = migrations.CreateModel(
  117. "Stable",
  118. [
  119. ("id", models.AutoField(primary_key=True)),
  120. ("ponies", models.ManyToManyField("Pony", related_name="stables"))
  121. ]
  122. )
  123. # Test the state alteration
  124. new_state = project_state.clone()
  125. operation.state_forwards("test_crmomm", new_state)
  126. # Test the database alteration
  127. self.assertTableNotExists("test_crmomm_stable_ponies")
  128. with connection.schema_editor() as editor:
  129. operation.database_forwards("test_crmomm", editor, project_state, new_state)
  130. self.assertTableExists("test_crmomm_stable")
  131. self.assertTableExists("test_crmomm_stable_ponies")
  132. self.assertColumnNotExists("test_crmomm_stable", "ponies")
  133. # Make sure the M2M field actually works
  134. with atomic():
  135. new_apps = new_state.render()
  136. Pony = new_apps.get_model("test_crmomm", "Pony")
  137. Stable = new_apps.get_model("test_crmomm", "Stable")
  138. stable = Stable.objects.create()
  139. p1 = Pony.objects.create(pink=False, weight=4.55)
  140. p2 = Pony.objects.create(pink=True, weight=5.43)
  141. stable.ponies.add(p1, p2)
  142. self.assertEqual(stable.ponies.count(), 2)
  143. stable.ponies.all().delete()
  144. # And test reversal
  145. with connection.schema_editor() as editor:
  146. operation.database_backwards("test_crmomm", editor, new_state, project_state)
  147. self.assertTableNotExists("test_crmomm_stable")
  148. self.assertTableNotExists("test_crmomm_stable_ponies")
  149. def test_create_model_inheritance(self):
  150. """
  151. Tests the CreateModel operation on a multi-table inheritance setup.
  152. """
  153. project_state = self.set_up_test_model("test_crmoih")
  154. # Test the state alteration
  155. operation = migrations.CreateModel(
  156. "ShetlandPony",
  157. [
  158. ('pony_ptr', models.OneToOneField(
  159. auto_created=True,
  160. primary_key=True,
  161. to_field='id',
  162. serialize=False,
  163. to='test_crmoih.Pony',
  164. )),
  165. ("cuteness", models.IntegerField(default=1)),
  166. ],
  167. )
  168. new_state = project_state.clone()
  169. operation.state_forwards("test_crmoih", new_state)
  170. self.assertIn(("test_crmoih", "shetlandpony"), new_state.models)
  171. # Test the database alteration
  172. self.assertTableNotExists("test_crmoih_shetlandpony")
  173. with connection.schema_editor() as editor:
  174. operation.database_forwards("test_crmoih", editor, project_state, new_state)
  175. self.assertTableExists("test_crmoih_shetlandpony")
  176. # And test reversal
  177. with connection.schema_editor() as editor:
  178. operation.database_backwards("test_crmoih", editor, new_state, project_state)
  179. self.assertTableNotExists("test_crmoih_shetlandpony")
  180. def test_delete_model(self):
  181. """
  182. Tests the DeleteModel operation.
  183. """
  184. project_state = self.set_up_test_model("test_dlmo")
  185. # Test the state alteration
  186. operation = migrations.DeleteModel("Pony")
  187. new_state = project_state.clone()
  188. operation.state_forwards("test_dlmo", new_state)
  189. self.assertNotIn(("test_dlmo", "pony"), new_state.models)
  190. # Test the database alteration
  191. self.assertTableExists("test_dlmo_pony")
  192. with connection.schema_editor() as editor:
  193. operation.database_forwards("test_dlmo", editor, project_state, new_state)
  194. self.assertTableNotExists("test_dlmo_pony")
  195. # And test reversal
  196. with connection.schema_editor() as editor:
  197. operation.database_backwards("test_dlmo", editor, new_state, project_state)
  198. self.assertTableExists("test_dlmo_pony")
  199. def test_rename_model(self):
  200. """
  201. Tests the RenameModel operation.
  202. """
  203. project_state = self.set_up_test_model("test_rnmo")
  204. # Test the state alteration
  205. operation = migrations.RenameModel("Pony", "Horse")
  206. new_state = project_state.clone()
  207. operation.state_forwards("test_rnmo", new_state)
  208. self.assertNotIn(("test_rnmo", "pony"), new_state.models)
  209. self.assertIn(("test_rnmo", "horse"), new_state.models)
  210. # Test the database alteration
  211. self.assertTableExists("test_rnmo_pony")
  212. self.assertTableNotExists("test_rnmo_horse")
  213. with connection.schema_editor() as editor:
  214. operation.database_forwards("test_rnmo", editor, project_state, new_state)
  215. self.assertTableNotExists("test_rnmo_pony")
  216. self.assertTableExists("test_rnmo_horse")
  217. # And test reversal
  218. with connection.schema_editor() as editor:
  219. operation.database_backwards("test_rnmo", editor, new_state, project_state)
  220. self.assertTableExists("test_dlmo_pony")
  221. self.assertTableNotExists("test_rnmo_horse")
  222. # See #22248 - this will fail until that's fixed.
  223. #
  224. # def test_rename_model_with_related(self):
  225. # """
  226. # Tests the real-world combo of a RenameModel operation with AlterField
  227. # for a related field.
  228. # """
  229. # project_state = self.set_up_test_model(
  230. # "test_rnmowr", related_model=True)
  231. # # Test the state alterations
  232. # model_operation = migrations.RenameModel("Pony", "Horse")
  233. # new_state = project_state.clone()
  234. # model_operation.state_forwards("test_rnmowr", new_state)
  235. # self.assertNotIn(("test_rnmowr", "pony"), new_state.models)
  236. # self.assertIn(("test_rnmowr", "horse"), new_state.models)
  237. # self.assertEqual(
  238. # "Pony",
  239. # project_state.render().get_model("test_rnmowr", "rider")
  240. # ._meta.get_field_by_name("pony")[0].rel.to._meta.object_name)
  241. # field_operation = migrations.AlterField(
  242. # "Rider", "pony", models.ForeignKey("Horse"))
  243. # field_operation.state_forwards("test_rnmowr", new_state)
  244. # self.assertEqual(
  245. # "Horse",
  246. # new_state.render().get_model("test_rnmowr", "rider")
  247. # ._meta.get_field_by_name("pony")[0].rel.to._meta.object_name)
  248. # # Test the database alterations
  249. # self.assertTableExists("test_rnmowr_pony")
  250. # self.assertTableNotExists("test_rnmowr_horse")
  251. # with connection.schema_editor() as editor:
  252. # model_operation.database_forwards("test_rnmowr", editor, project_state, new_state)
  253. # field_operation.database_forwards("test_rnmowr", editor, project_state, new_state)
  254. # self.assertTableNotExists("test_rnmowr_pony")
  255. # self.assertTableExists("test_rnmowr_horse")
  256. # # And test reversal
  257. # with connection.schema_editor() as editor:
  258. # field_operation.database_backwards("test_rnmowr", editor, new_state, project_state)
  259. # model_operation.database_backwards("test_rnmowr", editor, new_state, project_state)
  260. # self.assertTableExists("test_rnmowr_pony")
  261. # self.assertTableNotExists("test_rnmowr_horse")
  262. def test_add_field(self):
  263. """
  264. Tests the AddField operation.
  265. """
  266. project_state = self.set_up_test_model("test_adfl")
  267. # Test the state alteration
  268. operation = migrations.AddField(
  269. "Pony",
  270. "height",
  271. models.FloatField(null=True, default=5),
  272. )
  273. new_state = project_state.clone()
  274. operation.state_forwards("test_adfl", new_state)
  275. self.assertEqual(len(new_state.models["test_adfl", "pony"].fields), 4)
  276. field = [
  277. f for n, f in new_state.models["test_adfl", "pony"].fields
  278. if n == "height"
  279. ][0]
  280. self.assertEqual(field.default, 5)
  281. # Test the database alteration
  282. self.assertColumnNotExists("test_adfl_pony", "height")
  283. with connection.schema_editor() as editor:
  284. operation.database_forwards("test_adfl", editor, project_state, new_state)
  285. self.assertColumnExists("test_adfl_pony", "height")
  286. # And test reversal
  287. with connection.schema_editor() as editor:
  288. operation.database_backwards("test_adfl", editor, new_state, project_state)
  289. self.assertColumnNotExists("test_adfl_pony", "height")
  290. def test_column_name_quoting(self):
  291. """
  292. Column names that are SQL keywords shouldn't cause problems when used
  293. in migrations (#22168).
  294. """
  295. project_state = self.set_up_test_model("test_regr22168")
  296. operation = migrations.AddField(
  297. "Pony",
  298. "order",
  299. models.IntegerField(default=0),
  300. )
  301. new_state = project_state.clone()
  302. operation.state_forwards("test_regr22168", new_state)
  303. with connection.schema_editor() as editor:
  304. operation.database_forwards("test_regr22168", editor, project_state, new_state)
  305. self.assertColumnExists("test_regr22168_pony", "order")
  306. def test_add_field_preserve_default(self):
  307. """
  308. Tests the AddField operation's state alteration
  309. when preserve_default = False.
  310. """
  311. project_state = self.set_up_test_model("test_adflpd")
  312. # Test the state alteration
  313. operation = migrations.AddField(
  314. "Pony",
  315. "height",
  316. models.FloatField(null=True, default=4),
  317. preserve_default=False,
  318. )
  319. new_state = project_state.clone()
  320. operation.state_forwards("test_adflpd", new_state)
  321. self.assertEqual(len(new_state.models["test_adflpd", "pony"].fields), 4)
  322. field = [
  323. f for n, f in new_state.models["test_adflpd", "pony"].fields
  324. if n == "height"
  325. ][0]
  326. self.assertEqual(field.default, NOT_PROVIDED)
  327. # Test the database alteration
  328. project_state.render().get_model("test_adflpd", "pony").objects.create(
  329. weight=4,
  330. )
  331. self.assertColumnNotExists("test_adflpd_pony", "height")
  332. with connection.schema_editor() as editor:
  333. operation.database_forwards("test_adflpd", editor, project_state, new_state)
  334. self.assertColumnExists("test_adflpd_pony", "height")
  335. def test_add_field_m2m(self):
  336. """
  337. Tests the AddField operation with a ManyToManyField.
  338. """
  339. project_state = self.set_up_test_model("test_adflmm", second_model=True)
  340. # Test the state alteration
  341. operation = migrations.AddField("Pony", "stables", models.ManyToManyField("Stable", related_name="ponies"))
  342. new_state = project_state.clone()
  343. operation.state_forwards("test_adflmm", new_state)
  344. self.assertEqual(len(new_state.models["test_adflmm", "pony"].fields), 4)
  345. # Test the database alteration
  346. self.assertTableNotExists("test_adflmm_pony_stables")
  347. with connection.schema_editor() as editor:
  348. operation.database_forwards("test_adflmm", editor, project_state, new_state)
  349. self.assertTableExists("test_adflmm_pony_stables")
  350. self.assertColumnNotExists("test_adflmm_pony", "stables")
  351. # Make sure the M2M field actually works
  352. with atomic():
  353. new_apps = new_state.render()
  354. Pony = new_apps.get_model("test_adflmm", "Pony")
  355. p = Pony.objects.create(pink=False, weight=4.55)
  356. p.stables.create()
  357. self.assertEqual(p.stables.count(), 1)
  358. p.stables.all().delete()
  359. # And test reversal
  360. with connection.schema_editor() as editor:
  361. operation.database_backwards("test_adflmm", editor, new_state, project_state)
  362. self.assertTableNotExists("test_adflmm_pony_stables")
  363. def test_alter_field_m2m(self):
  364. project_state = self.set_up_test_model("test_alflmm", second_model=True)
  365. project_state = self.apply_operations("test_alflmm", project_state, operations=[
  366. migrations.AddField("Pony", "stables", models.ManyToManyField("Stable", related_name="ponies"))
  367. ])
  368. new_apps = project_state.render()
  369. Pony = new_apps.get_model("test_alflmm", "Pony")
  370. self.assertFalse(Pony._meta.get_field('stables').blank)
  371. project_state = self.apply_operations("test_alflmm", project_state, operations=[
  372. migrations.AlterField("Pony", "stables", models.ManyToManyField(to="Stable", related_name="ponies", blank=True))
  373. ])
  374. new_apps = project_state.render()
  375. Pony = new_apps.get_model("test_alflmm", "Pony")
  376. self.assertTrue(Pony._meta.get_field('stables').blank)
  377. def test_remove_field(self):
  378. """
  379. Tests the RemoveField operation.
  380. """
  381. project_state = self.set_up_test_model("test_rmfl")
  382. # Test the state alteration
  383. operation = migrations.RemoveField("Pony", "pink")
  384. new_state = project_state.clone()
  385. operation.state_forwards("test_rmfl", new_state)
  386. self.assertEqual(len(new_state.models["test_rmfl", "pony"].fields), 2)
  387. # Test the database alteration
  388. self.assertColumnExists("test_rmfl_pony", "pink")
  389. with connection.schema_editor() as editor:
  390. operation.database_forwards("test_rmfl", editor, project_state, new_state)
  391. self.assertColumnNotExists("test_rmfl_pony", "pink")
  392. # And test reversal
  393. with connection.schema_editor() as editor:
  394. operation.database_backwards("test_rmfl", editor, new_state, project_state)
  395. self.assertColumnExists("test_rmfl_pony", "pink")
  396. def test_remove_fk(self):
  397. """
  398. Tests the RemoveField operation on a foreign key.
  399. """
  400. project_state = self.set_up_test_model("test_rfk", related_model=True)
  401. self.assertColumnExists("test_rfk_rider", "pony_id")
  402. operation = migrations.RemoveField("Rider", "pony")
  403. new_state = project_state.clone()
  404. operation.state_forwards("test_rfk", new_state)
  405. with connection.schema_editor() as editor:
  406. operation.database_forwards("test_rfk", editor, project_state, new_state)
  407. self.assertColumnNotExists("test_rfk_rider", "pony_id")
  408. with connection.schema_editor() as editor:
  409. operation.database_backwards("test_rfk", editor, new_state, project_state)
  410. self.assertColumnExists("test_rfk_rider", "pony_id")
  411. def test_alter_model_table(self):
  412. """
  413. Tests the AlterModelTable operation.
  414. """
  415. project_state = self.set_up_test_model("test_almota")
  416. # Test the state alteration
  417. operation = migrations.AlterModelTable("Pony", "test_almota_pony_2")
  418. new_state = project_state.clone()
  419. operation.state_forwards("test_almota", new_state)
  420. self.assertEqual(new_state.models["test_almota", "pony"].options["db_table"], "test_almota_pony_2")
  421. # Test the database alteration
  422. self.assertTableExists("test_almota_pony")
  423. self.assertTableNotExists("test_almota_pony_2")
  424. with connection.schema_editor() as editor:
  425. operation.database_forwards("test_almota", editor, project_state, new_state)
  426. self.assertTableNotExists("test_almota_pony")
  427. self.assertTableExists("test_almota_pony_2")
  428. # And test reversal
  429. with connection.schema_editor() as editor:
  430. operation.database_backwards("test_almota", editor, new_state, project_state)
  431. self.assertTableExists("test_almota_pony")
  432. self.assertTableNotExists("test_almota_pony_2")
  433. def test_alter_field(self):
  434. """
  435. Tests the AlterField operation.
  436. """
  437. project_state = self.set_up_test_model("test_alfl")
  438. # Test the state alteration
  439. operation = migrations.AlterField("Pony", "pink", models.IntegerField(null=True))
  440. new_state = project_state.clone()
  441. operation.state_forwards("test_alfl", new_state)
  442. self.assertEqual(project_state.models["test_alfl", "pony"].get_field_by_name("pink").null, False)
  443. self.assertEqual(new_state.models["test_alfl", "pony"].get_field_by_name("pink").null, True)
  444. # Test the database alteration
  445. self.assertColumnNotNull("test_alfl_pony", "pink")
  446. with connection.schema_editor() as editor:
  447. operation.database_forwards("test_alfl", editor, project_state, new_state)
  448. self.assertColumnNull("test_alfl_pony", "pink")
  449. # And test reversal
  450. with connection.schema_editor() as editor:
  451. operation.database_backwards("test_alfl", editor, new_state, project_state)
  452. self.assertColumnNotNull("test_alfl_pony", "pink")
  453. def test_alter_field_pk(self):
  454. """
  455. Tests the AlterField operation on primary keys (for things like PostgreSQL's SERIAL weirdness)
  456. """
  457. project_state = self.set_up_test_model("test_alflpk")
  458. # Test the state alteration
  459. operation = migrations.AlterField("Pony", "id", models.IntegerField(primary_key=True))
  460. new_state = project_state.clone()
  461. operation.state_forwards("test_alflpk", new_state)
  462. self.assertIsInstance(project_state.models["test_alflpk", "pony"].get_field_by_name("id"), models.AutoField)
  463. self.assertIsInstance(new_state.models["test_alflpk", "pony"].get_field_by_name("id"), models.IntegerField)
  464. # Test the database alteration
  465. with connection.schema_editor() as editor:
  466. operation.database_forwards("test_alflpk", editor, project_state, new_state)
  467. # And test reversal
  468. with connection.schema_editor() as editor:
  469. operation.database_backwards("test_alflpk", editor, new_state, project_state)
  470. @unittest.skipUnless(connection.features.supports_foreign_keys, "No FK support")
  471. def test_alter_field_pk_fk(self):
  472. """
  473. Tests the AlterField operation on primary keys changes any FKs pointing to it.
  474. """
  475. project_state = self.set_up_test_model("test_alflpkfk", related_model=True)
  476. # Test the state alteration
  477. operation = migrations.AlterField("Pony", "id", models.FloatField(primary_key=True))
  478. new_state = project_state.clone()
  479. operation.state_forwards("test_alflpkfk", new_state)
  480. self.assertIsInstance(project_state.models["test_alflpkfk", "pony"].get_field_by_name("id"), models.AutoField)
  481. self.assertIsInstance(new_state.models["test_alflpkfk", "pony"].get_field_by_name("id"), models.FloatField)
  482. def assertIdTypeEqualsFkType():
  483. with connection.cursor() as cursor:
  484. id_type = [c.type_code for c in connection.introspection.get_table_description(cursor, "test_alflpkfk_pony") if c.name == "id"][0]
  485. fk_type = [c.type_code for c in connection.introspection.get_table_description(cursor, "test_alflpkfk_rider") if c.name == "pony_id"][0]
  486. self.assertEqual(id_type, fk_type)
  487. assertIdTypeEqualsFkType()
  488. # Test the database alteration
  489. with connection.schema_editor() as editor:
  490. operation.database_forwards("test_alflpkfk", editor, project_state, new_state)
  491. assertIdTypeEqualsFkType()
  492. # And test reversal
  493. with connection.schema_editor() as editor:
  494. operation.database_backwards("test_alflpkfk", editor, new_state, project_state)
  495. assertIdTypeEqualsFkType()
  496. def test_rename_field(self):
  497. """
  498. Tests the RenameField operation.
  499. """
  500. project_state = self.set_up_test_model("test_rnfl")
  501. # Test the state alteration
  502. operation = migrations.RenameField("Pony", "pink", "blue")
  503. new_state = project_state.clone()
  504. operation.state_forwards("test_rnfl", new_state)
  505. self.assertIn("blue", [n for n, f in new_state.models["test_rnfl", "pony"].fields])
  506. self.assertNotIn("pink", [n for n, f in new_state.models["test_rnfl", "pony"].fields])
  507. # Test the database alteration
  508. self.assertColumnExists("test_rnfl_pony", "pink")
  509. self.assertColumnNotExists("test_rnfl_pony", "blue")
  510. with connection.schema_editor() as editor:
  511. operation.database_forwards("test_rnfl", editor, project_state, new_state)
  512. self.assertColumnExists("test_rnfl_pony", "blue")
  513. self.assertColumnNotExists("test_rnfl_pony", "pink")
  514. # And test reversal
  515. with connection.schema_editor() as editor:
  516. operation.database_backwards("test_rnfl", editor, new_state, project_state)
  517. self.assertColumnExists("test_rnfl_pony", "pink")
  518. self.assertColumnNotExists("test_rnfl_pony", "blue")
  519. def test_alter_unique_together(self):
  520. """
  521. Tests the AlterUniqueTogether operation.
  522. """
  523. project_state = self.set_up_test_model("test_alunto")
  524. # Test the state alteration
  525. operation = migrations.AlterUniqueTogether("Pony", [("pink", "weight")])
  526. new_state = project_state.clone()
  527. operation.state_forwards("test_alunto", new_state)
  528. self.assertEqual(len(project_state.models["test_alunto", "pony"].options.get("unique_together", set())), 0)
  529. self.assertEqual(len(new_state.models["test_alunto", "pony"].options.get("unique_together", set())), 1)
  530. # Make sure we can insert duplicate rows
  531. with connection.cursor() as cursor:
  532. cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)")
  533. cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)")
  534. cursor.execute("DELETE FROM test_alunto_pony")
  535. # Test the database alteration
  536. with connection.schema_editor() as editor:
  537. operation.database_forwards("test_alunto", editor, project_state, new_state)
  538. cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)")
  539. with self.assertRaises(IntegrityError):
  540. with atomic():
  541. cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)")
  542. cursor.execute("DELETE FROM test_alunto_pony")
  543. # And test reversal
  544. with connection.schema_editor() as editor:
  545. operation.database_backwards("test_alunto", editor, new_state, project_state)
  546. cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)")
  547. cursor.execute("INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)")
  548. cursor.execute("DELETE FROM test_alunto_pony")
  549. # Test flat unique_together
  550. operation = migrations.AlterUniqueTogether("Pony", ("pink", "weight"))
  551. operation.state_forwards("test_alunto", new_state)
  552. self.assertEqual(len(new_state.models["test_alunto", "pony"].options.get("unique_together", set())), 1)
  553. def test_alter_index_together(self):
  554. """
  555. Tests the AlterIndexTogether operation.
  556. """
  557. project_state = self.set_up_test_model("test_alinto")
  558. # Test the state alteration
  559. operation = migrations.AlterIndexTogether("Pony", [("pink", "weight")])
  560. new_state = project_state.clone()
  561. operation.state_forwards("test_alinto", new_state)
  562. self.assertEqual(len(project_state.models["test_alinto", "pony"].options.get("index_together", set())), 0)
  563. self.assertEqual(len(new_state.models["test_alinto", "pony"].options.get("index_together", set())), 1)
  564. # Make sure there's no matching index
  565. self.assertIndexNotExists("test_alinto_pony", ["pink", "weight"])
  566. # Test the database alteration
  567. with connection.schema_editor() as editor:
  568. operation.database_forwards("test_alinto", editor, project_state, new_state)
  569. self.assertIndexExists("test_alinto_pony", ["pink", "weight"])
  570. # And test reversal
  571. with connection.schema_editor() as editor:
  572. operation.database_backwards("test_alinto", editor, new_state, project_state)
  573. self.assertIndexNotExists("test_alinto_pony", ["pink", "weight"])
  574. def test_run_sql(self):
  575. """
  576. Tests the RunSQL operation.
  577. """
  578. project_state = self.set_up_test_model("test_runsql")
  579. # Create the operation
  580. operation = migrations.RunSQL(
  581. "CREATE TABLE i_love_ponies (id int, special_thing int)",
  582. "DROP TABLE i_love_ponies",
  583. state_operations=[migrations.CreateModel("SomethingElse", [("id", models.AutoField(primary_key=True))])],
  584. )
  585. # Test the state alteration
  586. new_state = project_state.clone()
  587. operation.state_forwards("test_runsql", new_state)
  588. self.assertEqual(len(new_state.models["test_runsql", "somethingelse"].fields), 1)
  589. # Make sure there's no table
  590. self.assertTableNotExists("i_love_ponies")
  591. # Test the database alteration
  592. with connection.schema_editor() as editor:
  593. operation.database_forwards("test_runsql", editor, project_state, new_state)
  594. self.assertTableExists("i_love_ponies")
  595. # And test reversal
  596. self.assertTrue(operation.reversible)
  597. with connection.schema_editor() as editor:
  598. operation.database_backwards("test_runsql", editor, new_state, project_state)
  599. self.assertTableNotExists("i_love_ponies")
  def test_run_python(self):
      """
      Tests the RunPython operation.

      Checks, in order: that the operation leaves project state untouched,
      that running it forwards and backwards changes the row count as
      expected, that a non-callable argument is rejected, that an operation
      built without reverse_code is irreversible, and that objects created
      inside the operation receive distinct primary keys.
      """
      project_state = self.set_up_test_model("test_runpython", mti_model=True)

      def count(model_name):
          # Re-render the state on every call, as each inline assertion did.
          model = project_state.render().get_model("test_runpython", model_name)
          return model.objects.count()

      # Create the operation
      def inner_method(models, schema_editor):
          Pony = models.get_model("test_runpython", "Pony")
          Pony.objects.create(pink=1, weight=3.55)
          Pony.objects.create(weight=5)

      def inner_method_reverse(models, schema_editor):
          Pony = models.get_model("test_runpython", "Pony")
          Pony.objects.filter(pink=1, weight=3.55).delete()
          Pony.objects.filter(weight=5).delete()

      operation = migrations.RunPython(inner_method, reverse_code=inner_method_reverse)
      # Test the state alteration does nothing
      new_state = project_state.clone()
      operation.state_forwards("test_runpython", new_state)
      self.assertEqual(new_state, project_state)
      # Test the database alteration
      self.assertEqual(count("Pony"), 0)
      with connection.schema_editor() as editor:
          operation.database_forwards("test_runpython", editor, project_state, new_state)
      self.assertEqual(count("Pony"), 2)
      # Now test reversal. Going backwards starts from new_state and ends at
      # project_state, the same argument order the irreversible case below uses.
      self.assertTrue(operation.reversible)
      with connection.schema_editor() as editor:
          operation.database_backwards("test_runpython", editor, new_state, project_state)
      self.assertEqual(count("Pony"), 0)
      # Now test we can't use a string
      with self.assertRaises(ValueError):
          migrations.RunPython("print 'ahahaha'")
      # Also test reversal fails, with an operation identical to above but without reverse_code set
      no_reverse_operation = migrations.RunPython(inner_method)
      self.assertFalse(no_reverse_operation.reversible)
      with connection.schema_editor() as editor:
          no_reverse_operation.database_forwards("test_runpython", editor, project_state, new_state)
          with self.assertRaises(NotImplementedError):
              no_reverse_operation.database_backwards("test_runpython", editor, new_state, project_state)
      # The failed reversal must leave the two forwards-created rows in place.
      self.assertEqual(count("Pony"), 2)

      def create_ponies(models, schema_editor):
          Pony = models.get_model("test_runpython", "Pony")
          pony1 = Pony.objects.create(pink=1, weight=3.55)
          self.assertIsNotNone(pony1.pk)
          pony2 = Pony.objects.create(weight=5)
          self.assertIsNotNone(pony2.pk)
          self.assertNotEqual(pony1.pk, pony2.pk)

      operation = migrations.RunPython(create_ponies)
      with connection.schema_editor() as editor:
          operation.database_forwards("test_runpython", editor, project_state, new_state)
      self.assertEqual(count("Pony"), 4)

      def create_shetlandponies(models, schema_editor):
          ShetlandPony = models.get_model("test_runpython", "ShetlandPony")
          pony1 = ShetlandPony.objects.create(weight=4.0)
          self.assertIsNotNone(pony1.pk)
          pony2 = ShetlandPony.objects.create(weight=5.0)
          self.assertIsNotNone(pony2.pk)
          self.assertNotEqual(pony1.pk, pony2.pk)

      operation = migrations.RunPython(create_shetlandponies)
      with connection.schema_editor() as editor:
          operation.database_forwards("test_runpython", editor, project_state, new_state)
      # Two new ShetlandPony objects are expected to raise the Pony count by two as well.
      self.assertEqual(count("Pony"), 6)
      self.assertEqual(count("ShetlandPony"), 2)
  class MigrateNothingRouter(object):
      """
      A router that refuses to migrate any model on any database.
      """

      def allow_migrate(self, db, model):
          """Return False unconditionally; db and model are ignored."""
          return False
  class MultiDBOperationTests(MigrationTestBase):
      """
      Operation tests that run with MigrateNothingRouter as the only router.
      """
      multi_db = True

      def setUp(self):
          # Remember the active routers, then swap in one whose
          # allow_migrate() always answers False.
          self.old_routers = router.routers
          router.routers = [MigrateNothingRouter()]

      def tearDown(self):
          # Put back whatever routers were installed before setUp ran.
          router.routers = self.old_routers

      def test_create_model(self):
          """
          CreateModel must leave the database untouched when the router
          disallows migrating: the table is absent before, after running
          forwards, and after running backwards.
          """
          app_label, table_name = "test_crmo", "test_crmo_pony"
          pony_fields = [
              ("id", models.AutoField(primary_key=True)),
              ("pink", models.IntegerField(default=1)),
          ]
          create_pony = migrations.CreateModel("Pony", pony_fields)
          # Apply the operation to a copy of an empty project state
          state_before = ProjectState()
          state_after = state_before.clone()
          create_pony.state_forwards(app_label, state_after)
          # Forwards: no table beforehand, and still none afterwards
          self.assertTableNotExists(table_name)
          with connection.schema_editor() as schema_editor:
              create_pony.database_forwards(app_label, schema_editor, state_before, state_after)
          self.assertTableNotExists(table_name)
          # Backwards: the table remains absent
          with connection.schema_editor() as schema_editor:
              create_pony.database_backwards(app_label, schema_editor, state_after, state_before)
          self.assertTableNotExists(table_name)