12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014 |
- from __future__ import unicode_literals
- import unittest
- try:
- import sqlparse
- except ImportError:
- sqlparse = None
- from django import test
- from django.db import connection, migrations, models, router
- from django.db.migrations.migration import Migration
- from django.db.migrations.state import ProjectState
- from django.db.models.fields import NOT_PROVIDED
- from django.db.transaction import atomic
- from django.db.utils import IntegrityError, DatabaseError
- from .test_base import MigrationTestBase
- class OperationTests(MigrationTestBase):
- """
- Tests running the operations and making sure they do what they say they do.
- Each test looks at their state changing, and then their database operation -
- both forwards and backwards.
- """
def apply_operations(self, app_label, project_state, operations):
    """
    Wraps the given operations in a throwaway Migration for ``app_label``,
    applies it forwards inside a schema editor and returns whatever
    Migration.apply() returns.
    """
    throwaway = Migration('name', app_label)
    throwaway.operations = operations
    with connection.schema_editor() as schema_editor:
        outcome = throwaway.apply(project_state, schema_editor)
    return outcome
def unapply_operations(self, app_label, project_state, operations):
    """
    Wraps the given operations in a throwaway Migration for ``app_label``,
    unapplies it inside a schema editor and returns whatever
    Migration.unapply() returns.
    """
    throwaway = Migration('name', app_label)
    throwaway.operations = operations
    with connection.schema_editor() as schema_editor:
        outcome = throwaway.unapply(project_state, schema_editor)
    return outcome
def set_up_test_model(self, app_label, second_model=False, third_model=False, related_model=False, mti_model=False):
    """
    Builds the baseline model state for a test and creates its tables.

    Any leftover tables for ``app_label`` are dropped first (best effort),
    then a "Pony" model is created, plus the optional "Stable", "Van",
    "Rider" and "ShetlandPony" models when the matching flag is set.
    Returns the result of applying those CreateModel operations to an
    empty ProjectState.
    """
    # ManyToMany tables are listed first, then the standard model tables.
    leftover_suffixes = ("pony_stables", "pony_vans", "pony", "stable", "van")
    with connection.cursor() as cursor:
        for suffix in leftover_suffixes:
            try:
                cursor.execute("DROP TABLE %s_%s" % (app_label, suffix))
            except DatabaseError:
                # The table was not there; nothing to clean up.
                pass

    def auto_pk():
        # A fresh field instance is needed for every model.
        return ("id", models.AutoField(primary_key=True))

    # Make the "current" state
    operations = [
        migrations.CreateModel(
            "Pony",
            [
                auto_pk(),
                ("pink", models.IntegerField(default=3)),
                ("weight", models.FloatField()),
            ],
        ),
    ]
    if second_model:
        operations.append(migrations.CreateModel("Stable", [auto_pk()]))
    if third_model:
        operations.append(migrations.CreateModel("Van", [auto_pk()]))
    if related_model:
        rider_fields = [
            auto_pk(),
            ("pony", models.ForeignKey("Pony")),
        ]
        operations.append(migrations.CreateModel("Rider", rider_fields))
    if mti_model:
        parent_link = models.OneToOneField(
            auto_created=True,
            primary_key=True,
            to_field='id',
            serialize=False,
            to='Pony',
        )
        shetland_fields = [
            ('pony_ptr', parent_link),
            ("cuteness", models.IntegerField(default=1)),
        ]
        operations.append(migrations.CreateModel(
            "ShetlandPony",
            fields=shetland_fields,
            bases=['%s.Pony' % app_label],
        ))
    return self.apply_operations(app_label, ProjectState(), operations)
def test_create_model(self):
    """
    Tests the CreateModel operation: state change, table creation,
    reversal and deconstruction.
    Most other tests rely on CreateModel for their setup, so look at
    failures here before anything else.
    """
    app_label = "test_crmo"
    create_pony = migrations.CreateModel(
        "Pony",
        [
            ("id", models.AutoField(primary_key=True)),
            ("pink", models.IntegerField(default=1)),
        ],
    )
    # State alteration
    before = ProjectState()
    after = before.clone()
    create_pony.state_forwards(app_label, after)
    pony_state = after.models[app_label, "pony"]
    self.assertEqual(pony_state.name, "Pony")
    self.assertEqual(len(after.models[app_label, "pony"].fields), 2)
    # Database alteration
    self.assertTableNotExists("test_crmo_pony")
    with connection.schema_editor() as schema_editor:
        create_pony.database_forwards(app_label, schema_editor, before, after)
    self.assertTableExists("test_crmo_pony")
    # Reversal
    with connection.schema_editor() as schema_editor:
        create_pony.database_backwards(app_label, schema_editor, after, before)
    self.assertTableNotExists("test_crmo_pony")
    # Deconstruction: (name, positional args, keyword args)
    deconstructed = create_pony.deconstruct()
    self.assertEqual(deconstructed[0], "CreateModel")
    self.assertEqual(len(deconstructed[1]), 2)
    self.assertEqual(len(deconstructed[2]), 0)
    self.assertEqual(deconstructed[1][0], "Pony")
def test_create_model_m2m(self):
    """
    Checks CreateModel for a model carrying a ManyToManyField: the
    auto-created "through" table must be created, be usable, and be
    removed again on reversal.
    """
    app_label = "test_crmomm"
    before = self.set_up_test_model(app_label)
    create_stable = migrations.CreateModel(
        "Stable",
        [
            ("id", models.AutoField(primary_key=True)),
            ("ponies", models.ManyToManyField("Pony", related_name="stables"))
        ]
    )
    # State alteration
    after = before.clone()
    create_stable.state_forwards(app_label, after)
    # Database alteration
    self.assertTableNotExists("test_crmomm_stable_ponies")
    with connection.schema_editor() as schema_editor:
        create_stable.database_forwards(app_label, schema_editor, before, after)
    self.assertTableExists("test_crmomm_stable")
    self.assertTableExists("test_crmomm_stable_ponies")
    # The relation lives in its own table, not in a column on the model.
    self.assertColumnNotExists("test_crmomm_stable", "ponies")
    # Exercise the relation through the rendered models
    with atomic():
        rendered = after.render()
        Pony = rendered.get_model(app_label, "Pony")
        Stable = rendered.get_model(app_label, "Stable")
        stable = Stable.objects.create()
        first = Pony.objects.create(pink=False, weight=4.55)
        second = Pony.objects.create(pink=True, weight=5.43)
        stable.ponies.add(first, second)
        self.assertEqual(stable.ponies.count(), 2)
        stable.ponies.all().delete()
    # Reversal
    with connection.schema_editor() as schema_editor:
        create_stable.database_backwards(app_label, schema_editor, after, before)
    self.assertTableNotExists("test_crmomm_stable")
    self.assertTableNotExists("test_crmomm_stable_ponies")
def test_create_model_inheritance(self):
    """
    Tests CreateModel for a child model in a multi-table inheritance
    setup (a "ShetlandPony" pointing at "Pony" via a one-to-one pointer).
    """
    app_label = "test_crmoih"
    before = self.set_up_test_model(app_label)
    # State alteration
    parent_pointer = models.OneToOneField(
        auto_created=True,
        primary_key=True,
        to_field='id',
        serialize=False,
        to='test_crmoih.Pony',
    )
    create_child = migrations.CreateModel(
        "ShetlandPony",
        [
            ('pony_ptr', parent_pointer),
            ("cuteness", models.IntegerField(default=1)),
        ],
    )
    after = before.clone()
    create_child.state_forwards(app_label, after)
    self.assertIn((app_label, "shetlandpony"), after.models)
    # Database alteration
    self.assertTableNotExists("test_crmoih_shetlandpony")
    with connection.schema_editor() as schema_editor:
        create_child.database_forwards(app_label, schema_editor, before, after)
    self.assertTableExists("test_crmoih_shetlandpony")
    # Reversal
    with connection.schema_editor() as schema_editor:
        create_child.database_backwards(app_label, schema_editor, after, before)
    self.assertTableNotExists("test_crmoih_shetlandpony")
def test_delete_model(self):
    """
    Tests the DeleteModel operation: the model leaves the state, its
    table is dropped, and reversal brings the table back.
    """
    app_label = "test_dlmo"
    before = self.set_up_test_model(app_label)
    # State alteration
    delete_pony = migrations.DeleteModel("Pony")
    after = before.clone()
    delete_pony.state_forwards(app_label, after)
    self.assertNotIn((app_label, "pony"), after.models)
    # Database alteration
    self.assertTableExists("test_dlmo_pony")
    with connection.schema_editor() as schema_editor:
        delete_pony.database_forwards(app_label, schema_editor, before, after)
    self.assertTableNotExists("test_dlmo_pony")
    # Reversal
    with connection.schema_editor() as schema_editor:
        delete_pony.database_backwards(app_label, schema_editor, after, before)
    self.assertTableExists("test_dlmo_pony")
def test_rename_model(self):
    """
    Tests the RenameModel operation.

    Checks that the state swaps "pony" for "horse", that the table is
    renamed in the database, and that reversing the operation restores
    the original table name.
    """
    project_state = self.set_up_test_model("test_rnmo")
    # Test the state alteration
    operation = migrations.RenameModel("Pony", "Horse")
    new_state = project_state.clone()
    operation.state_forwards("test_rnmo", new_state)
    self.assertNotIn(("test_rnmo", "pony"), new_state.models)
    self.assertIn(("test_rnmo", "horse"), new_state.models)
    # Test the database alteration
    self.assertTableExists("test_rnmo_pony")
    self.assertTableNotExists("test_rnmo_horse")
    with connection.schema_editor() as editor:
        operation.database_forwards("test_rnmo", editor, project_state, new_state)
    self.assertTableNotExists("test_rnmo_pony")
    self.assertTableExists("test_rnmo_horse")
    # And test reversal
    with connection.schema_editor() as editor:
        operation.database_backwards("test_rnmo", editor, new_state, project_state)
    # Check this test's own table; the previous assertion looked at
    # "test_dlmo_pony", which belongs to test_delete_model and says
    # nothing about whether the rename was reverted.
    self.assertTableExists("test_rnmo_pony")
    self.assertTableNotExists("test_rnmo_horse")
- # See #22248 - this will fail until that's fixed.
- #
- # def test_rename_model_with_related(self):
- # """
- # Tests the real-world combo of a RenameModel operation with AlterField
- # for a related field.
- # """
- # project_state = self.set_up_test_model(
- # "test_rnmowr", related_model=True)
- # # Test the state alterations
- # model_operation = migrations.RenameModel("Pony", "Horse")
- # new_state = project_state.clone()
- # model_operation.state_forwards("test_rnmowr", new_state)
- # self.assertNotIn(("test_rnmowr", "pony"), new_state.models)
- # self.assertIn(("test_rnmowr", "horse"), new_state.models)
- # self.assertEqual(
- # "Pony",
- # project_state.render().get_model("test_rnmowr", "rider")
- # ._meta.get_field_by_name("pony")[0].rel.to._meta.object_name)
- # field_operation = migrations.AlterField(
- # "Rider", "pony", models.ForeignKey("Horse"))
- # field_operation.state_forwards("test_rnmowr", new_state)
- # self.assertEqual(
- # "Horse",
- # new_state.render().get_model("test_rnmowr", "rider")
- # ._meta.get_field_by_name("pony")[0].rel.to._meta.object_name)
- # # Test the database alterations
- # self.assertTableExists("test_rnmowr_pony")
- # self.assertTableNotExists("test_rnmowr_horse")
- # with connection.schema_editor() as editor:
- # model_operation.database_forwards("test_rnmowr", editor, project_state, new_state)
- # field_operation.database_forwards("test_rnmowr", editor, project_state, new_state)
- # self.assertTableNotExists("test_rnmowr_pony")
- # self.assertTableExists("test_rnmowr_horse")
- # # And test reversal
- # with connection.schema_editor() as editor:
- # field_operation.database_backwards("test_rnmowr", editor, new_state, project_state)
- # model_operation.database_backwards("test_rnmowr", editor, new_state, project_state)
- # self.assertTableExists("test_rnmowr_pony")
- # self.assertTableNotExists("test_rnmowr_horse")
def test_add_field(self):
    """
    Tests the AddField operation: the field shows up in the state with
    its default, the column is created, and reversal removes the column.
    """
    app_label = "test_adfl"
    before = self.set_up_test_model(app_label)
    # State alteration
    add_height = migrations.AddField(
        "Pony",
        "height",
        models.FloatField(null=True, default=5),
    )
    after = before.clone()
    add_height.state_forwards(app_label, after)
    pony_fields = after.models[app_label, "pony"].fields
    self.assertEqual(len(pony_fields), 4)
    matching = [
        field_obj for field_name, field_obj in after.models[app_label, "pony"].fields
        if field_name == "height"
    ]
    height = matching[0]
    self.assertEqual(height.default, 5)
    # Database alteration
    self.assertColumnNotExists("test_adfl_pony", "height")
    with connection.schema_editor() as schema_editor:
        add_height.database_forwards(app_label, schema_editor, before, after)
    self.assertColumnExists("test_adfl_pony", "height")
    # Reversal
    with connection.schema_editor() as schema_editor:
        add_height.database_backwards(app_label, schema_editor, after, before)
    self.assertColumnNotExists("test_adfl_pony", "height")
def test_add_charfield(self):
    """
    Tests the AddField operation on CharField: a row that exists before
    the fields are added must read back each field's default afterwards.
    """
    app_label = "test_adchfl"
    project_state = self.set_up_test_model(app_label)
    Pony = project_state.render().get_model(app_label, "Pony")
    pony = Pony.objects.create(weight=42)
    # (field name, default) pairs; the same pairs are checked after the
    # migration has run.
    defaults = [
        ("text", "some text"),
        ("empty", ""),
        # If not properly quoted digits would be interpreted as an int.
        ("digits", "42"),
        # Manual quoting is fragile and could trip on quotes. Refs #xyz.
        ("quotes", '"\'"'),
    ]
    new_state = self.apply_operations(app_label, project_state, [
        migrations.AddField(
            "Pony",
            field_name,
            models.CharField(max_length=10, default=default),
        )
        for field_name, default in defaults
    ])
    Pony = new_state.render().get_model(app_label, "Pony")
    pony = Pony.objects.get(pk=pony.pk)
    for field_name, default in defaults:
        self.assertEqual(getattr(pony, field_name), default)
def test_add_textfield(self):
    """
    Tests the AddField operation on TextField: a row that exists before
    the fields are added must read back each field's default afterwards.
    """
    app_label = "test_adtxtfl"
    project_state = self.set_up_test_model(app_label)
    Pony = project_state.render().get_model(app_label, "Pony")
    pony = Pony.objects.create(weight=42)
    # (field name, default) pairs; the same pairs are checked after the
    # migration has run.
    defaults = [
        ("text", "some text"),
        ("empty", ""),
        # If not properly quoted digits would be interpreted as an int.
        ("digits", "42"),
        # Manual quoting is fragile and could trip on quotes. Refs #xyz.
        ("quotes", '"\'"'),
    ]
    new_state = self.apply_operations(app_label, project_state, [
        migrations.AddField(
            "Pony",
            field_name,
            models.TextField(default=default),
        )
        for field_name, default in defaults
    ])
    Pony = new_state.render().get_model(app_label, "Pony")
    pony = Pony.objects.get(pk=pony.pk)
    for field_name, default in defaults:
        self.assertEqual(getattr(pony, field_name), default)
@test.skipUnlessDBFeature('supports_binary_field')
def test_add_binaryfield(self):
    """
    Tests the AddField operation on BinaryField: a row that exists
    before the fields are added must read back each field's default
    afterwards.
    """
    app_label = "test_adbinfl"
    project_state = self.set_up_test_model(app_label)
    Pony = project_state.render().get_model(app_label, "Pony")
    pony = Pony.objects.create(weight=42)
    # (field name, default) pairs; the same pairs are checked after the
    # migration has run.
    defaults = [
        ("blob", b"some text"),
        ("empty", b""),
        # If not properly quoted digits would be interpreted as an int.
        ("digits", b"42"),
        # Manual quoting is fragile and could trip on quotes. Refs #xyz.
        ("quotes", b'"\'"'),
    ]
    new_state = self.apply_operations(app_label, project_state, [
        migrations.AddField(
            "Pony",
            field_name,
            models.BinaryField(default=default),
        )
        for field_name, default in defaults
    ])
    Pony = new_state.render().get_model(app_label, "Pony")
    pony = Pony.objects.get(pk=pony.pk)
    for field_name, default in defaults:
        # SQLite returns buffer/memoryview, cast to bytes for checking.
        self.assertEqual(bytes(getattr(pony, field_name)), default)
def test_column_name_quoting(self):
    """
    A column whose name is an SQL keyword ("order") must not break
    AddField (#22168).
    """
    app_label = "test_regr22168"
    before = self.set_up_test_model(app_label)
    add_order = migrations.AddField(
        "Pony",
        "order",
        models.IntegerField(default=0),
    )
    after = before.clone()
    add_order.state_forwards(app_label, after)
    with connection.schema_editor() as schema_editor:
        add_order.database_forwards(app_label, schema_editor, before, after)
    self.assertColumnExists("test_regr22168_pony", "order")
def test_add_field_preserve_default(self):
    """
    Tests AddField with preserve_default=False: the default must not be
    kept on the field in the resulting state, while the column is still
    added to a table that already holds a row.
    """
    app_label = "test_adflpd"
    before = self.set_up_test_model(app_label)
    # State alteration
    add_height = migrations.AddField(
        "Pony",
        "height",
        models.FloatField(null=True, default=4),
        preserve_default=False,
    )
    after = before.clone()
    add_height.state_forwards(app_label, after)
    self.assertEqual(len(after.models[app_label, "pony"].fields), 4)
    matching = [
        field_obj for field_name, field_obj in after.models[app_label, "pony"].fields
        if field_name == "height"
    ]
    height = matching[0]
    self.assertEqual(height.default, NOT_PROVIDED)
    # Database alteration, with one pre-existing row in the table
    OldPony = before.render().get_model(app_label, "pony")
    OldPony.objects.create(
        weight=4,
    )
    self.assertColumnNotExists("test_adflpd_pony", "height")
    with connection.schema_editor() as schema_editor:
        add_height.database_forwards(app_label, schema_editor, before, after)
    self.assertColumnExists("test_adflpd_pony", "height")
def test_add_field_m2m(self):
    """
    Tests AddField with a ManyToManyField: a join table is created
    (and no column on the model table), the relation works, and
    reversal drops the join table.
    """
    app_label = "test_adflmm"
    before = self.set_up_test_model(app_label, second_model=True)
    # State alteration
    stables_field = models.ManyToManyField("Stable", related_name="ponies")
    add_stables = migrations.AddField("Pony", "stables", stables_field)
    after = before.clone()
    add_stables.state_forwards(app_label, after)
    self.assertEqual(len(after.models[app_label, "pony"].fields), 4)
    # Database alteration
    self.assertTableNotExists("test_adflmm_pony_stables")
    with connection.schema_editor() as schema_editor:
        add_stables.database_forwards(app_label, schema_editor, before, after)
    self.assertTableExists("test_adflmm_pony_stables")
    self.assertColumnNotExists("test_adflmm_pony", "stables")
    # Exercise the relation through the rendered model
    with atomic():
        rendered = after.render()
        Pony = rendered.get_model(app_label, "Pony")
        pony = Pony.objects.create(pink=False, weight=4.55)
        pony.stables.create()
        self.assertEqual(pony.stables.count(), 1)
        pony.stables.all().delete()
    # Reversal
    with connection.schema_editor() as schema_editor:
        add_stables.database_backwards(app_label, schema_editor, after, before)
    self.assertTableNotExists("test_adflmm_pony_stables")
def test_alter_field_m2m(self):
    """
    Tests AlterField on a ManyToManyField: switching the field to
    blank=True must be reflected on the rendered model.
    """
    app_label = "test_alflmm"
    state = self.set_up_test_model(app_label, second_model=True)

    add_m2m = migrations.AddField("Pony", "stables", models.ManyToManyField("Stable", related_name="ponies"))
    state = self.apply_operations(app_label, state, operations=[
        add_m2m
    ])
    Pony = state.render().get_model(app_label, "Pony")
    self.assertFalse(Pony._meta.get_field('stables').blank)

    make_blank = migrations.AlterField("Pony", "stables", models.ManyToManyField(to="Stable", related_name="ponies", blank=True))
    state = self.apply_operations(app_label, state, operations=[
        make_blank
    ])
    Pony = state.render().get_model(app_label, "Pony")
    self.assertTrue(Pony._meta.get_field('stables').blank)
def test_repoint_field_m2m(self):
    """
    Tests AlterField re-pointing a ManyToManyField from "Stable" to
    "Van", then checks the re-pointed relation is usable.
    """
    # NOTE(review): this app label is also used by test_alter_field_m2m;
    # confirm the shared tables cannot interfere between the two tests.
    app_label = "test_alflmm"
    state = self.set_up_test_model(app_label, second_model=True, third_model=True)

    to_stable = migrations.AddField("Pony", "places", models.ManyToManyField("Stable", related_name="ponies"))
    state = self.apply_operations(app_label, state, operations=[
        to_stable
    ])
    # Render the intermediate state and look the model up, as before.
    state.render().get_model(app_label, "Pony")

    to_van = migrations.AlterField("Pony", "places", models.ManyToManyField(to="Van", related_name="ponies"))
    state = self.apply_operations(app_label, state, operations=[
        to_van
    ])
    # Ensure the new field actually works
    Pony = state.render().get_model(app_label, "Pony")
    pony = Pony.objects.create(pink=False, weight=4.55)
    pony.places.create()
    self.assertEqual(pony.places.count(), 1)
    pony.places.all().delete()
def test_remove_field_m2m(self):
    """
    Tests RemoveField on a ManyToManyField: the join table is dropped
    when the field is removed and recreated when the removal is undone.
    """
    app_label = "test_rmflmm"
    state = self.set_up_test_model(app_label, second_model=True)

    add_m2m = migrations.AddField("Pony", "stables", models.ManyToManyField("Stable", related_name="ponies"))
    state = self.apply_operations(app_label, state, operations=[
        add_m2m
    ])
    self.assertTableExists("test_rmflmm_pony_stables")

    removal = [migrations.RemoveField("Pony", "stables")]
    self.apply_operations(app_label, state, operations=removal)
    self.assertTableNotExists("test_rmflmm_pony_stables")

    # Reversal
    self.unapply_operations(app_label, state, operations=removal)
    self.assertTableExists("test_rmflmm_pony_stables")
def test_remove_field_m2m_with_through(self):
    """
    Tests that RemoveField can be applied to a ManyToManyField that uses
    an explicit "through" model. Only successful application is checked.
    """
    app_label = "test_rmflmmwt"
    state = self.set_up_test_model(app_label, second_model=True)
    self.assertTableNotExists("test_rmflmmwt_ponystables")

    create_through = migrations.CreateModel("PonyStables", fields=[
        ("pony", models.ForeignKey('test_rmflmmwt.Pony')),
        ("stable", models.ForeignKey('test_rmflmmwt.Stable')),
    ])
    add_m2m = migrations.AddField("Pony", "stables", models.ManyToManyField("Stable", related_name="ponies", through='test_rmflmmwt.PonyStables'))
    state = self.apply_operations(app_label, state, operations=[
        create_through,
        add_m2m,
    ])
    self.assertTableExists("test_rmflmmwt_ponystables")

    removal = [migrations.RemoveField("Pony", "stables")]
    self.apply_operations(app_label, state, operations=removal)
def test_remove_field(self):
    """
    Tests the RemoveField operation: the field leaves the state, its
    column is dropped, and reversal restores the column.
    """
    app_label = "test_rmfl"
    before = self.set_up_test_model(app_label)
    # State alteration
    remove_pink = migrations.RemoveField("Pony", "pink")
    after = before.clone()
    remove_pink.state_forwards(app_label, after)
    self.assertEqual(len(after.models[app_label, "pony"].fields), 2)
    # Database alteration
    self.assertColumnExists("test_rmfl_pony", "pink")
    with connection.schema_editor() as schema_editor:
        remove_pink.database_forwards(app_label, schema_editor, before, after)
    self.assertColumnNotExists("test_rmfl_pony", "pink")
    # Reversal
    with connection.schema_editor() as schema_editor:
        remove_pink.database_backwards(app_label, schema_editor, after, before)
    self.assertColumnExists("test_rmfl_pony", "pink")
def test_remove_fk(self):
    """
    Tests the RemoveField operation on a foreign key: the "pony_id"
    column disappears going forwards and returns on reversal.
    """
    app_label = "test_rfk"
    before = self.set_up_test_model(app_label, related_model=True)
    self.assertColumnExists("test_rfk_rider", "pony_id")
    remove_fk = migrations.RemoveField("Rider", "pony")
    after = before.clone()
    remove_fk.state_forwards(app_label, after)
    # Forwards
    with connection.schema_editor() as schema_editor:
        remove_fk.database_forwards(app_label, schema_editor, before, after)
    self.assertColumnNotExists("test_rfk_rider", "pony_id")
    # Backwards
    with connection.schema_editor() as schema_editor:
        remove_fk.database_backwards(app_label, schema_editor, after, before)
    self.assertColumnExists("test_rfk_rider", "pony_id")
def test_alter_model_table(self):
    """
    Tests the AlterModelTable operation: db_table changes in the state,
    the table is renamed in the database, and reversal renames it back.
    """
    app_label = "test_almota"
    before = self.set_up_test_model(app_label)
    # State alteration
    retable = migrations.AlterModelTable("Pony", "test_almota_pony_2")
    after = before.clone()
    retable.state_forwards(app_label, after)
    pony_options = after.models[app_label, "pony"].options
    self.assertEqual(pony_options["db_table"], "test_almota_pony_2")
    # Database alteration
    self.assertTableExists("test_almota_pony")
    self.assertTableNotExists("test_almota_pony_2")
    with connection.schema_editor() as schema_editor:
        retable.database_forwards(app_label, schema_editor, before, after)
    self.assertTableNotExists("test_almota_pony")
    self.assertTableExists("test_almota_pony_2")
    # Reversal
    with connection.schema_editor() as schema_editor:
        retable.database_backwards(app_label, schema_editor, after, before)
    self.assertTableExists("test_almota_pony")
    self.assertTableNotExists("test_almota_pony_2")
def test_alter_field(self):
    """
    Tests the AlterField operation by making "pink" nullable: the state
    and the column's nullability change, and reversal undoes it.
    """
    app_label = "test_alfl"
    before = self.set_up_test_model(app_label)
    # State alteration
    make_nullable = migrations.AlterField("Pony", "pink", models.IntegerField(null=True))
    after = before.clone()
    make_nullable.state_forwards(app_label, after)
    old_pink = before.models[app_label, "pony"].get_field_by_name("pink")
    self.assertEqual(old_pink.null, False)
    new_pink = after.models[app_label, "pony"].get_field_by_name("pink")
    self.assertEqual(new_pink.null, True)
    # Database alteration
    self.assertColumnNotNull("test_alfl_pony", "pink")
    with connection.schema_editor() as schema_editor:
        make_nullable.database_forwards(app_label, schema_editor, before, after)
    self.assertColumnNull("test_alfl_pony", "pink")
    # Reversal
    with connection.schema_editor() as schema_editor:
        make_nullable.database_backwards(app_label, schema_editor, after, before)
    self.assertColumnNotNull("test_alfl_pony", "pink")
def test_alter_field_pk(self):
    """
    Tests the AlterField operation on a primary key (for things like
    PostgreSQL's SERIAL weirdness). The database steps are only checked
    for running without error.
    """
    app_label = "test_alflpk"
    before = self.set_up_test_model(app_label)
    # State alteration
    retype_pk = migrations.AlterField("Pony", "id", models.IntegerField(primary_key=True))
    after = before.clone()
    retype_pk.state_forwards(app_label, after)
    old_pk = before.models[app_label, "pony"].get_field_by_name("id")
    self.assertIsInstance(old_pk, models.AutoField)
    new_pk = after.models[app_label, "pony"].get_field_by_name("id")
    self.assertIsInstance(new_pk, models.IntegerField)
    # Database alteration
    with connection.schema_editor() as schema_editor:
        retype_pk.database_forwards(app_label, schema_editor, before, after)
    # Reversal
    with connection.schema_editor() as schema_editor:
        retype_pk.database_backwards(app_label, schema_editor, after, before)
@unittest.skipUnless(connection.features.supports_foreign_keys, "No FK support")
def test_alter_field_pk_fk(self):
    """
    Tests that AlterField on a primary key also changes the foreign keys
    pointing at it: the introspected column type of Pony.id and
    Rider.pony_id must match before, after, and after reversal.
    """
    app_label = "test_alflpkfk"
    before = self.set_up_test_model(app_label, related_model=True)
    # State alteration
    retype_pk = migrations.AlterField("Pony", "id", models.FloatField(primary_key=True))
    after = before.clone()
    retype_pk.state_forwards(app_label, after)
    old_pk = before.models[app_label, "pony"].get_field_by_name("id")
    self.assertIsInstance(old_pk, models.AutoField)
    new_pk = after.models[app_label, "pony"].get_field_by_name("id")
    self.assertIsInstance(new_pk, models.FloatField)

    def assertIdTypeEqualsFkType():
        # Compare the introspected type codes of the PK and the FK column.
        with connection.cursor() as cursor:
            pony_columns = connection.introspection.get_table_description(cursor, "test_alflpkfk_pony")
            id_type = [col.type_code for col in pony_columns if col.name == "id"][0]
            rider_columns = connection.introspection.get_table_description(cursor, "test_alflpkfk_rider")
            fk_type = [col.type_code for col in rider_columns if col.name == "pony_id"][0]
        self.assertEqual(id_type, fk_type)

    assertIdTypeEqualsFkType()
    # Database alteration
    with connection.schema_editor() as schema_editor:
        retype_pk.database_forwards(app_label, schema_editor, before, after)
    assertIdTypeEqualsFkType()
    # Reversal
    with connection.schema_editor() as schema_editor:
        retype_pk.database_backwards(app_label, schema_editor, after, before)
    assertIdTypeEqualsFkType()
def test_rename_field(self):
    """
    Tests the RenameField operation: "pink" becomes "blue" in the state
    and in the table, and reversal restores the original column name.
    """
    app_label = "test_rnfl"
    before = self.set_up_test_model(app_label)
    # State alteration
    rename = migrations.RenameField("Pony", "pink", "blue")
    after = before.clone()
    rename.state_forwards(app_label, after)
    self.assertIn("blue", [name for name, _field in after.models[app_label, "pony"].fields])
    self.assertNotIn("pink", [name for name, _field in after.models[app_label, "pony"].fields])
    # Database alteration
    self.assertColumnExists("test_rnfl_pony", "pink")
    self.assertColumnNotExists("test_rnfl_pony", "blue")
    with connection.schema_editor() as schema_editor:
        rename.database_forwards(app_label, schema_editor, before, after)
    self.assertColumnExists("test_rnfl_pony", "blue")
    self.assertColumnNotExists("test_rnfl_pony", "pink")
    # Reversal
    with connection.schema_editor() as schema_editor:
        rename.database_backwards(app_label, schema_editor, after, before)
    self.assertColumnExists("test_rnfl_pony", "pink")
    self.assertColumnNotExists("test_rnfl_pony", "blue")
def test_alter_unique_together(self):
    """
    Tests the AlterUniqueTogether operation: duplicate (pink, weight)
    rows are accepted before the constraint, rejected with an
    IntegrityError once it is applied, and accepted again after
    reversal. Also checks that a flat tuple is accepted as the
    unique_together value.
    """
    app_label = "test_alunto"
    insert_row = "INSERT INTO test_alunto_pony (pink, weight) VALUES (1, 1)"
    clear_rows = "DELETE FROM test_alunto_pony"

    before = self.set_up_test_model(app_label)
    # State alteration
    constrain = migrations.AlterUniqueTogether("Pony", [("pink", "weight")])
    after = before.clone()
    constrain.state_forwards(app_label, after)
    self.assertEqual(len(before.models[app_label, "pony"].options.get("unique_together", set())), 0)
    self.assertEqual(len(after.models[app_label, "pony"].options.get("unique_together", set())), 1)
    with connection.cursor() as cursor:
        # Without the constraint, duplicate rows are fine
        cursor.execute(insert_row)
        cursor.execute(insert_row)
        cursor.execute(clear_rows)
        # Database alteration
        with connection.schema_editor() as schema_editor:
            constrain.database_forwards(app_label, schema_editor, before, after)
        cursor.execute(insert_row)
        with self.assertRaises(IntegrityError):
            # atomic() confines the failed insert so the cursor stays usable
            with atomic():
                cursor.execute(insert_row)
        cursor.execute(clear_rows)
        # Reversal: duplicates are allowed again
        with connection.schema_editor() as schema_editor:
            constrain.database_backwards(app_label, schema_editor, after, before)
        cursor.execute(insert_row)
        cursor.execute(insert_row)
        cursor.execute(clear_rows)
    # Flat unique_together
    flat = migrations.AlterUniqueTogether("Pony", ("pink", "weight"))
    flat.state_forwards(app_label, after)
    self.assertEqual(len(after.models[app_label, "pony"].options.get("unique_together", set())), 1)
def test_alter_index_together(self):
    """
    Exercise AlterIndexTogether: the state change, creation of the index
    in the database, and its removal on reversal.
    """
    app_label = "test_alinto"
    table = "test_alinto_pony"

    def index_together_size(state):
        # Number of index_together entries recorded on the Pony model state.
        return len(state.models[app_label, "pony"].options.get("index_together", set()))

    before = self.set_up_test_model(app_label)
    # State: one entry appears after the operation, none before.
    alter = migrations.AlterIndexTogether("Pony", [("pink", "weight")])
    after = before.clone()
    alter.state_forwards(app_label, after)
    self.assertEqual(index_together_size(before), 0)
    self.assertEqual(index_together_size(after), 1)
    # Database: no matching index exists until the operation has run.
    self.assertIndexNotExists(table, ["pink", "weight"])
    with connection.schema_editor() as editor:
        alter.database_forwards(app_label, editor, before, after)
    self.assertIndexExists(table, ["pink", "weight"])
    # Reversal: the index is gone again.
    with connection.schema_editor() as editor:
        alter.database_backwards(app_label, editor, after, before)
    self.assertIndexNotExists(table, ["pink", "weight"])
@unittest.skipIf(sqlparse is None and connection.features.requires_sqlparse_for_splitting, "Missing sqlparse")
def test_run_sql(self):
    """
    Exercise RunSQL: its state_operations, the forwards SQL, and the
    reverse SQL.
    """
    app_label = "test_runsql"
    before = self.set_up_test_model(app_label)
    # A multi-line string with a trailing SQL comment, used to test
    # statement splitting on SQLite and MySQL respectively.
    forward_sql = (
        "CREATE TABLE i_love_ponies (id int, special_thing int);\n"
        "INSERT INTO i_love_ponies (id, special_thing) VALUES (1, 42); -- this is magic!\n"
        "INSERT INTO i_love_ponies (id, special_thing) VALUES (2, 51);\n"
    )
    run_sql = migrations.RunSQL(
        forward_sql,
        "DROP TABLE i_love_ponies",
        state_operations=[
            migrations.CreateModel("SomethingElse", [("id", models.AutoField(primary_key=True))]),
        ],
    )
    # State: the model declared in state_operations shows up with one field.
    after = before.clone()
    run_sql.state_forwards(app_label, after)
    self.assertEqual(len(after.models[app_label, "somethingelse"].fields), 1)
    # Database: the table only exists once the forwards SQL has run.
    self.assertTableNotExists("i_love_ponies")
    with connection.schema_editor() as editor:
        run_sql.database_forwards(app_label, editor, before, after)
    self.assertTableExists("i_love_ponies")
    # Both INSERT statements must have been executed.
    with connection.cursor() as cursor:
        cursor.execute("SELECT COUNT(*) FROM i_love_ponies")
        row_total = cursor.fetchall()[0][0]
        self.assertEqual(row_total, 2)
    # Reversal: the reverse SQL drops the table.
    self.assertTrue(run_sql.reversible)
    with connection.schema_editor() as editor:
        run_sql.database_backwards(app_label, editor, after, before)
    self.assertTableNotExists("i_love_ponies")
def test_run_python(self):
    """
    Tests the RunPython operation.

    Checks, in order, that:
    - state_forwards leaves the project state unchanged;
    - running forwards creates two Pony rows and running backwards
      removes them again;
    - passing a string instead of a callable raises ValueError;
    - an operation built without reverse_code reports itself as not
      reversible and raises NotImplementedError when run backwards;
    - rows created inside the callback get distinct, non-None primary
      keys, for both Pony and ShetlandPony.
    """
    app_label = "test_runpython"
    project_state = self.set_up_test_model(app_label, mti_model=True)

    def row_count(model_name):
        # Re-render the state on every call and count the rows currently
        # stored for the given model.
        return project_state.render().get_model(app_label, model_name).objects.count()

    # The callbacks name their first argument ``apps`` so that it does not
    # shadow the module-level ``models`` import.
    def inner_method(apps, schema_editor):
        Pony = apps.get_model(app_label, "Pony")
        Pony.objects.create(pink=1, weight=3.55)
        Pony.objects.create(weight=5)

    def inner_method_reverse(apps, schema_editor):
        Pony = apps.get_model(app_label, "Pony")
        Pony.objects.filter(pink=1, weight=3.55).delete()
        Pony.objects.filter(weight=5).delete()

    operation = migrations.RunPython(inner_method, reverse_code=inner_method_reverse)
    # Test the state alteration does nothing
    new_state = project_state.clone()
    operation.state_forwards(app_label, new_state)
    self.assertEqual(new_state, project_state)
    # Test the database alteration
    self.assertEqual(row_count("Pony"), 0)
    with connection.schema_editor() as editor:
        operation.database_forwards(app_label, editor, project_state, new_state)
    self.assertEqual(row_count("Pony"), 2)
    # Now test reversal. Going backwards starts from the post-operation
    # state, so new_state comes first, matching the other backwards calls.
    self.assertTrue(operation.reversible)
    with connection.schema_editor() as editor:
        operation.database_backwards(app_label, editor, new_state, project_state)
    self.assertEqual(row_count("Pony"), 0)
    # Now test we can't use a string
    with self.assertRaises(ValueError):
        migrations.RunPython("print 'ahahaha'")
    # Also test reversal fails, with an operation identical to above but
    # without reverse_code set
    no_reverse_operation = migrations.RunPython(inner_method)
    self.assertFalse(no_reverse_operation.reversible)
    with connection.schema_editor() as editor:
        no_reverse_operation.database_forwards(app_label, editor, project_state, new_state)
        with self.assertRaises(NotImplementedError):
            no_reverse_operation.database_backwards(app_label, editor, new_state, project_state)
    # The forwards run above left its two rows in place.
    self.assertEqual(row_count("Pony"), 2)

    def create_ponies(apps, schema_editor):
        Pony = apps.get_model(app_label, "Pony")
        pony1 = Pony.objects.create(pink=1, weight=3.55)
        self.assertIsNot(pony1.pk, None)
        pony2 = Pony.objects.create(weight=5)
        self.assertIsNot(pony2.pk, None)
        self.assertNotEqual(pony1.pk, pony2.pk)

    operation = migrations.RunPython(create_ponies)
    with connection.schema_editor() as editor:
        operation.database_forwards(app_label, editor, project_state, new_state)
    self.assertEqual(row_count("Pony"), 4)

    def create_shetlandponies(apps, schema_editor):
        ShetlandPony = apps.get_model(app_label, "ShetlandPony")
        pony1 = ShetlandPony.objects.create(weight=4.0)
        self.assertIsNot(pony1.pk, None)
        pony2 = ShetlandPony.objects.create(weight=5.0)
        self.assertIsNot(pony2.pk, None)
        self.assertNotEqual(pony1.pk, pony2.pk)

    operation = migrations.RunPython(create_shetlandponies)
    with connection.schema_editor() as editor:
        operation.database_forwards(app_label, editor, project_state, new_state)
    # The two ShetlandPony rows are expected to raise the Pony count too.
    self.assertEqual(row_count("Pony"), 6)
    self.assertEqual(row_count("ShetlandPony"), 2)
def test_run_python_atomic(self):
    """
    Check how RunPython's "atomic" keyword affects what is left behind
    when the callback creates a row and then raises.
    """
    app_label = "test_runpythonatomic"
    project_state = self.set_up_test_model(app_label, mti_model=True)

    def failing_create(apps, schema_editor):
        # Insert one row, then fail.
        Pony = apps.get_model(app_label, "Pony")
        Pony.objects.create(pink=1, weight=3.55)
        raise ValueError("Adrian hates ponies.")

    def pony_count():
        return project_state.render().get_model(app_label, "Pony").objects.count()

    def apply_expecting_failure(migration):
        # The ValueError raised by the callback must propagate out of apply().
        with self.assertRaises(ValueError):
            with connection.schema_editor() as editor:
                migration.apply(project_state, editor)

    atomic_migration = Migration("test", app_label)
    atomic_migration.operations = [migrations.RunPython(failing_create)]
    non_atomic_migration = Migration("test", app_label)
    non_atomic_migration.operations = [migrations.RunPython(failing_create, atomic=False)]
    # When the database can roll back DDL, both variants are expected to
    # leave nothing behind; otherwise the non-atomic one leaves its row.
    if connection.features.can_rollback_ddl:
        rows_after_non_atomic = 0
    else:
        rows_after_non_atomic = 1
    self.assertEqual(pony_count(), 0)
    apply_expecting_failure(atomic_migration)
    self.assertEqual(pony_count(), 0)
    apply_expecting_failure(non_atomic_migration)
    self.assertEqual(pony_count(), rows_after_non_atomic)
class MigrateNothingRouter(object):
    """
    A router that refuses to migrate any model on any database.
    """

    def allow_migrate(self, db, model):
        """
        Return False unconditionally; ``db`` and ``model`` are ignored.
        """
        return False
class MultiDBOperationTests(MigrationTestBase):
    """
    Runs a migration operation while a router that rejects every
    migration (MigrateNothingRouter) is installed.
    """
    multi_db = True

    def setUp(self):
        # Remember the active routers, then install the one that vetoes
        # every migration.
        self.old_routers = router.routers
        router.routers = [MigrateNothingRouter()]

    def tearDown(self):
        # Put back the routers that were active before setUp.
        router.routers = self.old_routers

    def test_create_model(self):
        """
        Tests that CreateModel honours multi-db settings: with the router
        installed, the table must not exist before the operation, after
        running it forwards, or after running it backwards.
        """
        app_label = "test_crmo"
        table = "test_crmo_pony"
        create = migrations.CreateModel(
            "Pony",
            [
                ("id", models.AutoField(primary_key=True)),
                ("pink", models.IntegerField(default=1)),
            ],
        )
        # State alteration
        before = ProjectState()
        after = before.clone()
        create.state_forwards(app_label, after)
        # Database alteration: no table before or after going forwards.
        self.assertTableNotExists(table)
        with connection.schema_editor() as editor:
            create.database_forwards(app_label, editor, before, after)
        self.assertTableNotExists(table)
        # Reversal: still no table.
        with connection.schema_editor() as editor:
            create.database_backwards(app_label, editor, after, before)
        self.assertTableNotExists(table)
|