123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581 |
- import unittest
- from migrations.test_base import OperationTestBase
- from django.db import IntegrityError, NotSupportedError, connection, transaction
- from django.db.migrations.state import ProjectState
- from django.db.models import CheckConstraint, Index, Q, UniqueConstraint
- from django.db.utils import ProgrammingError
- from django.test import modify_settings, override_settings
- from django.test.utils import CaptureQueriesContext
- from . import PostgreSQLTestCase
- try:
- from django.contrib.postgres.indexes import BrinIndex, BTreeIndex
- from django.contrib.postgres.operations import (
- AddConstraintNotValid,
- AddIndexConcurrently,
- BloomExtension,
- CreateCollation,
- CreateExtension,
- RemoveCollation,
- RemoveIndexConcurrently,
- ValidateConstraint,
- )
- except ImportError:
- pass
- @unittest.skipUnless(connection.vendor == "postgresql", "PostgreSQL specific tests.")
- @modify_settings(INSTALLED_APPS={"append": "migrations"})
- class AddIndexConcurrentlyTests(OperationTestBase):
- app_label = "test_add_concurrently"
- def test_requires_atomic_false(self):
- project_state = self.set_up_test_model(self.app_label)
- new_state = project_state.clone()
- operation = AddIndexConcurrently(
- "Pony",
- Index(fields=["pink"], name="pony_pink_idx"),
- )
- msg = (
- "The AddIndexConcurrently operation cannot be executed inside "
- "a transaction (set atomic = False on the migration)."
- )
- with self.assertRaisesMessage(NotSupportedError, msg):
- with connection.schema_editor(atomic=True) as editor:
- operation.database_forwards(
- self.app_label, editor, project_state, new_state
- )
- def test_add(self):
- project_state = self.set_up_test_model(self.app_label, index=False)
- table_name = "%s_pony" % self.app_label
- index = Index(fields=["pink"], name="pony_pink_idx")
- new_state = project_state.clone()
- operation = AddIndexConcurrently("Pony", index)
- self.assertEqual(
- operation.describe(),
- "Concurrently create index pony_pink_idx on field(s) pink of model Pony",
- )
- self.assertEqual(
- operation.formatted_description(),
- "+ Concurrently create index pony_pink_idx on field(s) pink of model Pony",
- )
- operation.state_forwards(self.app_label, new_state)
- self.assertEqual(
- len(new_state.models[self.app_label, "pony"].options["indexes"]), 1
- )
- self.assertIndexNotExists(table_name, ["pink"])
-
- with connection.schema_editor(atomic=False) as editor:
- operation.database_forwards(
- self.app_label, editor, project_state, new_state
- )
- self.assertIndexExists(table_name, ["pink"])
-
- with connection.schema_editor(atomic=False) as editor:
- operation.database_backwards(
- self.app_label, editor, new_state, project_state
- )
- self.assertIndexNotExists(table_name, ["pink"])
-
- name, args, kwargs = operation.deconstruct()
- self.assertEqual(name, "AddIndexConcurrently")
- self.assertEqual(args, [])
- self.assertEqual(kwargs, {"model_name": "Pony", "index": index})
- def test_add_other_index_type(self):
- project_state = self.set_up_test_model(self.app_label, index=False)
- table_name = "%s_pony" % self.app_label
- new_state = project_state.clone()
- operation = AddIndexConcurrently(
- "Pony",
- BrinIndex(fields=["pink"], name="pony_pink_brin_idx"),
- )
- self.assertIndexNotExists(table_name, ["pink"])
-
- with connection.schema_editor(atomic=False) as editor:
- operation.database_forwards(
- self.app_label, editor, project_state, new_state
- )
- self.assertIndexExists(table_name, ["pink"], index_type="brin")
-
- with connection.schema_editor(atomic=False) as editor:
- operation.database_backwards(
- self.app_label, editor, new_state, project_state
- )
- self.assertIndexNotExists(table_name, ["pink"])
- def test_add_with_options(self):
- project_state = self.set_up_test_model(self.app_label, index=False)
- table_name = "%s_pony" % self.app_label
- new_state = project_state.clone()
- index = BTreeIndex(fields=["pink"], name="pony_pink_btree_idx", fillfactor=70)
- operation = AddIndexConcurrently("Pony", index)
- self.assertIndexNotExists(table_name, ["pink"])
-
- with connection.schema_editor(atomic=False) as editor:
- operation.database_forwards(
- self.app_label, editor, project_state, new_state
- )
- self.assertIndexExists(table_name, ["pink"], index_type="btree")
-
- with connection.schema_editor(atomic=False) as editor:
- operation.database_backwards(
- self.app_label, editor, new_state, project_state
- )
- self.assertIndexNotExists(table_name, ["pink"])
- @unittest.skipUnless(connection.vendor == "postgresql", "PostgreSQL specific tests.")
- @modify_settings(INSTALLED_APPS={"append": "migrations"})
- class RemoveIndexConcurrentlyTests(OperationTestBase):
- app_label = "test_rm_concurrently"
- def test_requires_atomic_false(self):
- project_state = self.set_up_test_model(self.app_label, index=True)
- new_state = project_state.clone()
- operation = RemoveIndexConcurrently("Pony", "pony_pink_idx")
- msg = (
- "The RemoveIndexConcurrently operation cannot be executed inside "
- "a transaction (set atomic = False on the migration)."
- )
- with self.assertRaisesMessage(NotSupportedError, msg):
- with connection.schema_editor(atomic=True) as editor:
- operation.database_forwards(
- self.app_label, editor, project_state, new_state
- )
- def test_remove(self):
- project_state = self.set_up_test_model(self.app_label, index=True)
- table_name = "%s_pony" % self.app_label
- self.assertTableExists(table_name)
- new_state = project_state.clone()
- operation = RemoveIndexConcurrently("Pony", "pony_pink_idx")
- self.assertEqual(
- operation.describe(),
- "Concurrently remove index pony_pink_idx from Pony",
- )
- self.assertEqual(
- operation.formatted_description(),
- "- Concurrently remove index pony_pink_idx from Pony",
- )
- operation.state_forwards(self.app_label, new_state)
- self.assertEqual(
- len(new_state.models[self.app_label, "pony"].options["indexes"]), 0
- )
- self.assertIndexExists(table_name, ["pink"])
-
- with connection.schema_editor(atomic=False) as editor:
- operation.database_forwards(
- self.app_label, editor, project_state, new_state
- )
- self.assertIndexNotExists(table_name, ["pink"])
-
- with connection.schema_editor(atomic=False) as editor:
- operation.database_backwards(
- self.app_label, editor, new_state, project_state
- )
- self.assertIndexExists(table_name, ["pink"])
-
- name, args, kwargs = operation.deconstruct()
- self.assertEqual(name, "RemoveIndexConcurrently")
- self.assertEqual(args, [])
- self.assertEqual(kwargs, {"model_name": "Pony", "name": "pony_pink_idx"})
- class NoMigrationRouter:
- def allow_migrate(self, db, app_label, **hints):
- return False
- @unittest.skipUnless(connection.vendor == "postgresql", "PostgreSQL specific tests.")
- class CreateExtensionTests(PostgreSQLTestCase):
- app_label = "test_allow_create_extention"
- @override_settings(DATABASE_ROUTERS=[NoMigrationRouter()])
- def test_no_allow_migrate(self):
- operation = CreateExtension("tablefunc")
- self.assertEqual(
- operation.formatted_description(), "+ Creates extension tablefunc"
- )
- project_state = ProjectState()
- new_state = project_state.clone()
-
- with CaptureQueriesContext(connection) as captured_queries:
- with connection.schema_editor(atomic=False) as editor:
- operation.database_forwards(
- self.app_label, editor, project_state, new_state
- )
- self.assertEqual(len(captured_queries), 0)
-
- with CaptureQueriesContext(connection) as captured_queries:
- with connection.schema_editor(atomic=False) as editor:
- operation.database_backwards(
- self.app_label, editor, new_state, project_state
- )
- self.assertEqual(len(captured_queries), 0)
- def test_allow_migrate(self):
- operation = CreateExtension("tablefunc")
- self.assertEqual(
- operation.migration_name_fragment, "create_extension_tablefunc"
- )
- project_state = ProjectState()
- new_state = project_state.clone()
-
- with CaptureQueriesContext(connection) as captured_queries:
- with connection.schema_editor(atomic=False) as editor:
- operation.database_forwards(
- self.app_label, editor, project_state, new_state
- )
- self.assertEqual(len(captured_queries), 4)
- self.assertIn("CREATE EXTENSION IF NOT EXISTS", captured_queries[1]["sql"])
-
- with CaptureQueriesContext(connection) as captured_queries:
- with connection.schema_editor(atomic=False) as editor:
- operation.database_backwards(
- self.app_label, editor, new_state, project_state
- )
- self.assertEqual(len(captured_queries), 2)
- self.assertIn("DROP EXTENSION IF EXISTS", captured_queries[1]["sql"])
- def test_create_existing_extension(self):
- operation = BloomExtension()
- self.assertEqual(operation.migration_name_fragment, "create_extension_bloom")
- project_state = ProjectState()
- new_state = project_state.clone()
-
- with CaptureQueriesContext(connection) as captured_queries:
- with connection.schema_editor(atomic=False) as editor:
- operation.database_forwards(
- self.app_label, editor, project_state, new_state
- )
- self.assertEqual(len(captured_queries), 3)
- self.assertIn("SELECT", captured_queries[0]["sql"])
- def test_drop_nonexistent_extension(self):
- operation = CreateExtension("tablefunc")
- project_state = ProjectState()
- new_state = project_state.clone()
-
- with CaptureQueriesContext(connection) as captured_queries:
- with connection.schema_editor(atomic=False) as editor:
- operation.database_backwards(
- self.app_label, editor, project_state, new_state
- )
- self.assertEqual(len(captured_queries), 1)
- self.assertIn("SELECT", captured_queries[0]["sql"])
- @unittest.skipUnless(connection.vendor == "postgresql", "PostgreSQL specific tests.")
- class CreateCollationTests(PostgreSQLTestCase):
- app_label = "test_allow_create_collation"
- @override_settings(DATABASE_ROUTERS=[NoMigrationRouter()])
- def test_no_allow_migrate(self):
- operation = CreateCollation("C_test", locale="C")
- project_state = ProjectState()
- new_state = project_state.clone()
-
- with CaptureQueriesContext(connection) as captured_queries:
- with connection.schema_editor(atomic=False) as editor:
- operation.database_forwards(
- self.app_label, editor, project_state, new_state
- )
- self.assertEqual(len(captured_queries), 0)
-
- with CaptureQueriesContext(connection) as captured_queries:
- with connection.schema_editor(atomic=False) as editor:
- operation.database_backwards(
- self.app_label, editor, new_state, project_state
- )
- self.assertEqual(len(captured_queries), 0)
- def test_create(self):
- operation = CreateCollation("C_test", locale="C")
- self.assertEqual(operation.migration_name_fragment, "create_collation_c_test")
- self.assertEqual(operation.describe(), "Create collation C_test")
- self.assertEqual(operation.formatted_description(), "+ Create collation C_test")
- project_state = ProjectState()
- new_state = project_state.clone()
-
- with CaptureQueriesContext(connection) as captured_queries:
- with connection.schema_editor(atomic=False) as editor:
- operation.database_forwards(
- self.app_label, editor, project_state, new_state
- )
- self.assertEqual(len(captured_queries), 1)
- self.assertIn("CREATE COLLATION", captured_queries[0]["sql"])
-
- with self.assertRaisesMessage(ProgrammingError, "already exists"):
- with connection.schema_editor(atomic=True) as editor:
- operation.database_forwards(
- self.app_label, editor, project_state, new_state
- )
-
- with CaptureQueriesContext(connection) as captured_queries:
- with connection.schema_editor(atomic=False) as editor:
- operation.database_backwards(
- self.app_label, editor, new_state, project_state
- )
- self.assertEqual(len(captured_queries), 1)
- self.assertIn("DROP COLLATION", captured_queries[0]["sql"])
-
- name, args, kwargs = operation.deconstruct()
- self.assertEqual(name, "CreateCollation")
- self.assertEqual(args, [])
- self.assertEqual(kwargs, {"name": "C_test", "locale": "C"})
- def test_create_non_deterministic_collation(self):
- operation = CreateCollation(
- "case_insensitive_test",
- "und-u-ks-level2",
- provider="icu",
- deterministic=False,
- )
- project_state = ProjectState()
- new_state = project_state.clone()
-
- with CaptureQueriesContext(connection) as captured_queries:
- with connection.schema_editor(atomic=False) as editor:
- operation.database_forwards(
- self.app_label, editor, project_state, new_state
- )
- self.assertEqual(len(captured_queries), 1)
- self.assertIn("CREATE COLLATION", captured_queries[0]["sql"])
-
- with CaptureQueriesContext(connection) as captured_queries:
- with connection.schema_editor(atomic=False) as editor:
- operation.database_backwards(
- self.app_label, editor, new_state, project_state
- )
- self.assertEqual(len(captured_queries), 1)
- self.assertIn("DROP COLLATION", captured_queries[0]["sql"])
-
- name, args, kwargs = operation.deconstruct()
- self.assertEqual(name, "CreateCollation")
- self.assertEqual(args, [])
- self.assertEqual(
- kwargs,
- {
- "name": "case_insensitive_test",
- "locale": "und-u-ks-level2",
- "provider": "icu",
- "deterministic": False,
- },
- )
- def test_create_collation_alternate_provider(self):
- operation = CreateCollation(
- "german_phonebook_test",
- provider="icu",
- locale="de-u-co-phonebk",
- )
- project_state = ProjectState()
- new_state = project_state.clone()
-
- with CaptureQueriesContext(connection) as captured_queries:
- with connection.schema_editor(atomic=False) as editor:
- operation.database_forwards(
- self.app_label, editor, project_state, new_state
- )
- self.assertEqual(len(captured_queries), 1)
- self.assertIn("CREATE COLLATION", captured_queries[0]["sql"])
-
- with CaptureQueriesContext(connection) as captured_queries:
- with connection.schema_editor(atomic=False) as editor:
- operation.database_backwards(
- self.app_label, editor, new_state, project_state
- )
- self.assertEqual(len(captured_queries), 1)
- self.assertIn("DROP COLLATION", captured_queries[0]["sql"])
- @unittest.skipUnless(connection.vendor == "postgresql", "PostgreSQL specific tests.")
- class RemoveCollationTests(PostgreSQLTestCase):
- app_label = "test_allow_remove_collation"
- @override_settings(DATABASE_ROUTERS=[NoMigrationRouter()])
- def test_no_allow_migrate(self):
- operation = RemoveCollation("C_test", locale="C")
- project_state = ProjectState()
- new_state = project_state.clone()
-
- with CaptureQueriesContext(connection) as captured_queries:
- with connection.schema_editor(atomic=False) as editor:
- operation.database_forwards(
- self.app_label, editor, project_state, new_state
- )
- self.assertEqual(len(captured_queries), 0)
-
- with CaptureQueriesContext(connection) as captured_queries:
- with connection.schema_editor(atomic=False) as editor:
- operation.database_backwards(
- self.app_label, editor, new_state, project_state
- )
- self.assertEqual(len(captured_queries), 0)
- def test_remove(self):
- operation = CreateCollation("C_test", locale="C")
- project_state = ProjectState()
- new_state = project_state.clone()
- with connection.schema_editor(atomic=False) as editor:
- operation.database_forwards(
- self.app_label, editor, project_state, new_state
- )
- operation = RemoveCollation("C_test", locale="C")
- self.assertEqual(operation.migration_name_fragment, "remove_collation_c_test")
- self.assertEqual(operation.describe(), "Remove collation C_test")
- self.assertEqual(operation.formatted_description(), "- Remove collation C_test")
- project_state = ProjectState()
- new_state = project_state.clone()
-
- with CaptureQueriesContext(connection) as captured_queries:
- with connection.schema_editor(atomic=False) as editor:
- operation.database_forwards(
- self.app_label, editor, project_state, new_state
- )
- self.assertEqual(len(captured_queries), 1)
- self.assertIn("DROP COLLATION", captured_queries[0]["sql"])
-
- with self.assertRaisesMessage(ProgrammingError, "does not exist"):
- with connection.schema_editor(atomic=True) as editor:
- operation.database_forwards(
- self.app_label, editor, project_state, new_state
- )
-
- with CaptureQueriesContext(connection) as captured_queries:
- with connection.schema_editor(atomic=False) as editor:
- operation.database_backwards(
- self.app_label, editor, new_state, project_state
- )
- self.assertEqual(len(captured_queries), 1)
- self.assertIn("CREATE COLLATION", captured_queries[0]["sql"])
-
- name, args, kwargs = operation.deconstruct()
- self.assertEqual(name, "RemoveCollation")
- self.assertEqual(args, [])
- self.assertEqual(kwargs, {"name": "C_test", "locale": "C"})
- @unittest.skipUnless(connection.vendor == "postgresql", "PostgreSQL specific tests.")
- @modify_settings(INSTALLED_APPS={"append": "migrations"})
- class AddConstraintNotValidTests(OperationTestBase):
- app_label = "test_add_constraint_not_valid"
- def test_non_check_constraint_not_supported(self):
- constraint = UniqueConstraint(fields=["pink"], name="pony_pink_uniq")
- msg = "AddConstraintNotValid.constraint must be a check constraint."
- with self.assertRaisesMessage(TypeError, msg):
- AddConstraintNotValid(model_name="pony", constraint=constraint)
- def test_add(self):
- table_name = f"{self.app_label}_pony"
- constraint_name = "pony_pink_gte_check"
- constraint = CheckConstraint(check=Q(pink__gte=4), name=constraint_name)
- operation = AddConstraintNotValid("Pony", constraint=constraint)
- project_state, new_state = self.make_test_state(self.app_label, operation)
- self.assertEqual(
- operation.describe(),
- f"Create not valid constraint {constraint_name} on model Pony",
- )
- self.assertEqual(
- operation.formatted_description(),
- f"+ Create not valid constraint {constraint_name} on model Pony",
- )
- self.assertEqual(
- operation.migration_name_fragment,
- f"pony_{constraint_name}_not_valid",
- )
- self.assertEqual(
- len(new_state.models[self.app_label, "pony"].options["constraints"]),
- 1,
- )
- self.assertConstraintNotExists(table_name, constraint_name)
- Pony = new_state.apps.get_model(self.app_label, "Pony")
- self.assertEqual(len(Pony._meta.constraints), 1)
- Pony.objects.create(pink=2, weight=1.0)
-
- with connection.schema_editor(atomic=True) as editor:
- operation.database_forwards(
- self.app_label, editor, project_state, new_state
- )
- msg = f'check constraint "{constraint_name}"'
- with self.assertRaisesMessage(IntegrityError, msg), transaction.atomic():
- Pony.objects.create(pink=3, weight=1.0)
- self.assertConstraintExists(table_name, constraint_name)
-
- with connection.schema_editor(atomic=True) as editor:
- operation.database_backwards(
- self.app_label, editor, project_state, new_state
- )
- self.assertConstraintNotExists(table_name, constraint_name)
- Pony.objects.create(pink=3, weight=1.0)
-
- name, args, kwargs = operation.deconstruct()
- self.assertEqual(name, "AddConstraintNotValid")
- self.assertEqual(args, [])
- self.assertEqual(kwargs, {"model_name": "Pony", "constraint": constraint})
- @unittest.skipUnless(connection.vendor == "postgresql", "PostgreSQL specific tests.")
- @modify_settings(INSTALLED_APPS={"append": "migrations"})
- class ValidateConstraintTests(OperationTestBase):
- app_label = "test_validate_constraint"
- def test_validate(self):
- constraint_name = "pony_pink_gte_check"
- constraint = CheckConstraint(check=Q(pink__gte=4), name=constraint_name)
- operation = AddConstraintNotValid("Pony", constraint=constraint)
- project_state, new_state = self.make_test_state(self.app_label, operation)
- Pony = new_state.apps.get_model(self.app_label, "Pony")
- obj = Pony.objects.create(pink=2, weight=1.0)
-
- with connection.schema_editor(atomic=True) as editor:
- operation.database_forwards(
- self.app_label, editor, project_state, new_state
- )
- project_state = new_state
- new_state = new_state.clone()
- operation = ValidateConstraint("Pony", name=constraint_name)
- operation.state_forwards(self.app_label, new_state)
- self.assertEqual(
- operation.describe(),
- f"Validate constraint {constraint_name} on model Pony",
- )
- self.assertEqual(
- operation.formatted_description(),
- f"~ Validate constraint {constraint_name} on model Pony",
- )
- self.assertEqual(
- operation.migration_name_fragment,
- f"pony_validate_{constraint_name}",
- )
-
- with connection.schema_editor(atomic=True) as editor:
- msg = f'check constraint "{constraint_name}"'
- with self.assertRaisesMessage(IntegrityError, msg):
- operation.database_forwards(
- self.app_label, editor, project_state, new_state
- )
- obj.pink = 5
- obj.save()
- with connection.schema_editor(atomic=True) as editor:
- operation.database_forwards(
- self.app_label, editor, project_state, new_state
- )
-
- with connection.schema_editor() as editor:
- with self.assertNumQueries(0):
- operation.database_backwards(
- self.app_label, editor, new_state, project_state
- )
-
- name, args, kwargs = operation.deconstruct()
- self.assertEqual(name, "ValidateConstraint")
- self.assertEqual(args, [])
- self.assertEqual(kwargs, {"model_name": "Pony", "name": constraint_name})
|