# test_operations.py
  1. from django.db import connection, models, migrations, router
  2. from django.db.transaction import atomic
  3. from django.db.utils import IntegrityError
  4. from django.db.migrations.state import ProjectState
  5. from .test_base import MigrationTestBase
  class OperationTests(MigrationTestBase):
      """
      Tests running the operations and making sure they do what they say they do.
      Each test checks the operation's state change first, and then its
      database operation - both forwards and backwards.
      """

      def _run_forwards(self, operation, app_label, from_state, to_state):
          """
          Calls operation.database_forwards() inside its own schema editor.
          from_state and to_state are passed through unchanged.
          """
          with connection.schema_editor() as editor:
              operation.database_forwards(app_label, editor, from_state, to_state)

      def _run_backwards(self, operation, app_label, from_state, to_state):
          """
          Calls operation.database_backwards() inside its own schema editor.
          from_state and to_state are passed through unchanged.
          """
          with connection.schema_editor() as editor:
              operation.database_backwards(app_label, editor, from_state, to_state)

      def set_up_test_model(self, app_label, second_model=False):
          """
          Creates a test model state and database table.
          Registers a "Pony" model (id, pink, weight) under app_label and, when
          second_model is true, a "Stable" model with only an id. Returns the
          ProjectState that results from applying those CreateModel operations.
          """
          # Make the "current" state
          create_pony = migrations.CreateModel(
              "Pony",
              [
                  ("id", models.AutoField(primary_key=True)),
                  ("pink", models.IntegerField(default=3)),
                  ("weight", models.FloatField()),
              ],
          )
          operations = [create_pony]
          if second_model:
              operations.append(migrations.CreateModel("Stable", [("id", models.AutoField(primary_key=True))]))
          project_state = ProjectState()
          for operation in operations:
              operation.state_forwards(app_label, project_state)
          # Set up the database. A single schema editor is shared by all the
          # operations, and each one is handed the final state as its target.
          with connection.schema_editor() as editor:
              for operation in operations:
                  operation.database_forwards(app_label, editor, ProjectState(), project_state)
          return project_state

      def test_create_model(self):
          """
          Tests the CreateModel operation.
          Most other tests use this operation as part of setup, so check failures here first.
          """
          app_label = "test_crmo"
          table = "test_crmo_pony"
          operation = migrations.CreateModel(
              "Pony",
              [
                  ("id", models.AutoField(primary_key=True)),
                  ("pink", models.IntegerField(default=1)),
              ],
          )
          # Test the state alteration
          project_state = ProjectState()
          new_state = project_state.clone()
          operation.state_forwards(app_label, new_state)
          pony_state = new_state.models[app_label, "pony"]
          self.assertEqual(pony_state.name, "Pony")
          self.assertEqual(len(pony_state.fields), 2)
          # Test the database alteration
          self.assertTableNotExists(table)
          self._run_forwards(operation, app_label, project_state, new_state)
          self.assertTableExists(table)
          # And test reversal
          self._run_backwards(operation, app_label, new_state, project_state)
          self.assertTableNotExists(table)
          # And deconstruction
          name, args, kwargs = operation.deconstruct()
          self.assertEqual(name, "CreateModel")
          self.assertEqual(len(args), 2)
          self.assertEqual(len(kwargs), 0)
          self.assertEqual(args[0], "Pony")

      def test_delete_model(self):
          """
          Tests the DeleteModel operation.
          """
          app_label = "test_dlmo"
          table = "test_dlmo_pony"
          project_state = self.set_up_test_model(app_label)
          # Test the state alteration
          operation = migrations.DeleteModel("Pony")
          new_state = project_state.clone()
          operation.state_forwards(app_label, new_state)
          self.assertNotIn((app_label, "pony"), new_state.models)
          # Test the database alteration
          self.assertTableExists(table)
          self._run_forwards(operation, app_label, project_state, new_state)
          self.assertTableNotExists(table)
          # And test reversal
          self._run_backwards(operation, app_label, new_state, project_state)
          self.assertTableExists(table)

      def test_add_field(self):
          """
          Tests the AddField operation.
          """
          app_label = "test_adfl"
          table = "test_adfl_pony"
          project_state = self.set_up_test_model(app_label)
          # Test the state alteration
          operation = migrations.AddField("Pony", "height", models.FloatField(null=True))
          new_state = project_state.clone()
          operation.state_forwards(app_label, new_state)
          self.assertEqual(len(new_state.models[app_label, "pony"].fields), 4)
          # Test the database alteration
          self.assertColumnNotExists(table, "height")
          self._run_forwards(operation, app_label, project_state, new_state)
          self.assertColumnExists(table, "height")
          # And test reversal
          self._run_backwards(operation, app_label, new_state, project_state)
          self.assertColumnNotExists(table, "height")

      def test_add_field_m2m(self):
          """
          Tests the AddField operation with a ManyToManyField.
          """
          app_label = "test_adflmm"
          through_table = "test_adflmm_pony_stables"
          project_state = self.set_up_test_model(app_label, second_model=True)
          # Test the state alteration
          operation = migrations.AddField("Pony", "stables", models.ManyToManyField("Stable"))
          new_state = project_state.clone()
          operation.state_forwards(app_label, new_state)
          self.assertEqual(len(new_state.models[app_label, "pony"].fields), 4)
          # Test the database alteration
          self.assertTableNotExists(through_table)
          self._run_forwards(operation, app_label, project_state, new_state)
          self.assertTableExists(through_table)
          # The relation is expected in its own table, not as a column on the model's table
          self.assertColumnNotExists("test_adflmm_pony", "stables")
          # And test reversal
          self._run_backwards(operation, app_label, new_state, project_state)
          self.assertTableNotExists(through_table)

      def test_remove_field(self):
          """
          Tests the RemoveField operation.
          """
          app_label = "test_rmfl"
          table = "test_rmfl_pony"
          project_state = self.set_up_test_model(app_label)
          # Test the state alteration
          operation = migrations.RemoveField("Pony", "pink")
          new_state = project_state.clone()
          operation.state_forwards(app_label, new_state)
          self.assertEqual(len(new_state.models[app_label, "pony"].fields), 2)
          # Test the database alteration
          self.assertColumnExists(table, "pink")
          self._run_forwards(operation, app_label, project_state, new_state)
          self.assertColumnNotExists(table, "pink")
          # And test reversal
          self._run_backwards(operation, app_label, new_state, project_state)
          self.assertColumnExists(table, "pink")

      def test_alter_model_table(self):
          """
          Tests the AlterModelTable operation.
          """
          app_label = "test_almota"
          old_table = "test_almota_pony"
          new_table = "test_almota_pony_2"
          project_state = self.set_up_test_model(app_label)
          # Test the state alteration
          operation = migrations.AlterModelTable("Pony", "test_almota_pony_2")
          new_state = project_state.clone()
          operation.state_forwards(app_label, new_state)
          self.assertEqual(new_state.models[app_label, "pony"].options["db_table"], "test_almota_pony_2")
          # Test the database alteration
          self.assertTableExists(old_table)
          self.assertTableNotExists(new_table)
          self._run_forwards(operation, app_label, project_state, new_state)
          self.assertTableNotExists(old_table)
          self.assertTableExists(new_table)
          # And test reversal
          self._run_backwards(operation, app_label, new_state, project_state)
          self.assertTableExists(old_table)
          self.assertTableNotExists(new_table)

      def test_alter_field(self):
          """
          Tests the AlterField operation.
          """
          app_label = "test_alfl"
          table = "test_alfl_pony"
          project_state = self.set_up_test_model(app_label)
          # Test the state alteration
          operation = migrations.AlterField("Pony", "pink", models.IntegerField(null=True))
          new_state = project_state.clone()
          operation.state_forwards(app_label, new_state)
          # The original state must be left untouched; only the clone changes
          self.assertEqual(project_state.models[app_label, "pony"].get_field_by_name("pink").null, False)
          self.assertEqual(new_state.models[app_label, "pony"].get_field_by_name("pink").null, True)
          # Test the database alteration
          self.assertColumnNotNull(table, "pink")
          self._run_forwards(operation, app_label, project_state, new_state)
          self.assertColumnNull(table, "pink")
          # And test reversal
          self._run_backwards(operation, app_label, new_state, project_state)
          self.assertColumnNotNull(table, "pink")

      def test_rename_field(self):
          """
          Tests the RenameField operation.
          """
          app_label = "test_rnfl"
          table = "test_rnfl_pony"
          project_state = self.set_up_test_model(app_label)
          # Test the state alteration
          operation = migrations.RenameField("Pony", "pink", "blue")
          new_state = project_state.clone()
          operation.state_forwards(app_label, new_state)
          field_names = [name for name, field in new_state.models[app_label, "pony"].fields]
          self.assertIn("blue", field_names)
          self.assertNotIn("pink", field_names)
          # Test the database alteration
          self.assertColumnExists(table, "pink")
          self.assertColumnNotExists(table, "blue")
          self._run_forwards(operation, app_label, project_state, new_state)
          self.assertColumnExists(table, "blue")
          self.assertColumnNotExists(table, "pink")
          # And test reversal
          self._run_backwards(operation, app_label, new_state, project_state)
          self.assertColumnExists(table, "pink")
          self.assertColumnNotExists(table, "blue")

      def test_alter_unique_together(self):
          """
          Tests the AlterUniqueTogether operation.
          """
          app_label = "test_alunto"
          # The two rows differ in id only, so they collide on (pink, weight)
          insert_first = "INSERT INTO test_alunto_pony (id, pink, weight) VALUES (1, 1, 1)"
          insert_duplicate = "INSERT INTO test_alunto_pony (id, pink, weight) VALUES (2, 1, 1)"
          delete_all = "DELETE FROM test_alunto_pony"
          project_state = self.set_up_test_model(app_label)
          # Test the state alteration
          operation = migrations.AlterUniqueTogether("Pony", [("pink", "weight")])
          new_state = project_state.clone()
          operation.state_forwards(app_label, new_state)
          self.assertEqual(len(project_state.models[app_label, "pony"].options.get("unique_together", set())), 0)
          self.assertEqual(len(new_state.models[app_label, "pony"].options.get("unique_together", set())), 1)
          cursor = connection.cursor()
          try:
              # Make sure we can insert duplicate rows
              cursor.execute(insert_first)
              cursor.execute(insert_duplicate)
              cursor.execute(delete_all)
              # Test the database alteration
              self._run_forwards(operation, app_label, project_state, new_state)
              cursor.execute(insert_first)
              with self.assertRaises(IntegrityError):
                  # atomic() keeps the expected failure from breaking the
                  # statements that follow it
                  with atomic():
                      cursor.execute(insert_duplicate)
              cursor.execute(delete_all)
              # And test reversal
              self._run_backwards(operation, app_label, new_state, project_state)
              cursor.execute(insert_first)
              cursor.execute(insert_duplicate)
              cursor.execute(delete_all)
          finally:
              # Release the raw cursor even when one of the checks above fails
              cursor.close()

      def test_alter_index_together(self):
          """
          Tests the AlterIndexTogether operation.
          """
          app_label = "test_alinto"
          table = "test_alinto_pony"
          columns = ["pink", "weight"]
          project_state = self.set_up_test_model(app_label)
          # Test the state alteration
          operation = migrations.AlterIndexTogether("Pony", [("pink", "weight")])
          new_state = project_state.clone()
          operation.state_forwards(app_label, new_state)
          self.assertEqual(len(project_state.models[app_label, "pony"].options.get("index_together", set())), 0)
          self.assertEqual(len(new_state.models[app_label, "pony"].options.get("index_together", set())), 1)
          # Make sure there's no matching index
          self.assertIndexNotExists(table, columns)
          # Test the database alteration
          self._run_forwards(operation, app_label, project_state, new_state)
          self.assertIndexExists(table, columns)
          # And test reversal
          self._run_backwards(operation, app_label, new_state, project_state)
          self.assertIndexNotExists(table, columns)
  class MigrateNothingRouter(object):
      """
      A router that refuses to migrate any model on any database.
      """

      def allow_migrate(self, db, model):
          """
          Returns False regardless of the database alias or model passed in.
          """
          return False
  class MultiDBOperationTests(MigrationTestBase):
      """
      Tests operations while a router that rejects every migration is installed.
      """
      multi_db = True

      def setUp(self):
          # Chain to the base class so any fixture work it does is not skipped
          super(MultiDBOperationTests, self).setUp()
          # Install a router that vetoes all migrations, keeping the previous
          # routers around so tearDown can put them back
          self.old_routers = router.routers
          router.routers = [MigrateNothingRouter()]

      def tearDown(self):
          # Restore the routers that were active before the test
          router.routers = self.old_routers
          super(MultiDBOperationTests, self).tearDown()

      def test_create_model(self):
          """
          Tests that CreateModel honours multi-db settings.
          """
          app_label = "test_crmo"
          table = "test_crmo_pony"
          operation = migrations.CreateModel(
              "Pony",
              [
                  ("id", models.AutoField(primary_key=True)),
                  ("pink", models.IntegerField(default=1)),
              ],
          )
          # Test the state alteration
          project_state = ProjectState()
          new_state = project_state.clone()
          operation.state_forwards(app_label, new_state)
          # Test the database alteration: with the veto router installed the
          # table must still be missing after the forwards run
          self.assertTableNotExists(table)
          with connection.schema_editor() as editor:
              operation.database_forwards(app_label, editor, project_state, new_state)
          self.assertTableNotExists(table)
          # And test reversal
          with connection.schema_editor() as editor:
              operation.database_backwards(app_label, editor, new_state, project_state)
          self.assertTableNotExists(table)