123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215 |
- import unittest
- from migrations.test_base import OperationTestBase
- from django.db import NotSupportedError, connection
- from django.db.migrations.state import ProjectState
- from django.db.models import Index
- from django.test import modify_settings, override_settings
- from django.test.utils import CaptureQueriesContext
- from . import PostgreSQLTestCase
- try:
- from django.contrib.postgres.indexes import BrinIndex, BTreeIndex
- from django.contrib.postgres.operations import (
- AddIndexConcurrently, BloomExtension, CreateExtension,
- RemoveIndexConcurrently,
- )
- except ImportError:
- pass
- @unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL specific tests.')
- @modify_settings(INSTALLED_APPS={'append': 'migrations'})
- class AddIndexConcurrentlyTests(OperationTestBase):
- app_label = 'test_add_concurrently'
- def test_requires_atomic_false(self):
- project_state = self.set_up_test_model(self.app_label)
- new_state = project_state.clone()
- operation = AddIndexConcurrently(
- 'Pony',
- Index(fields=['pink'], name='pony_pink_idx'),
- )
- msg = (
- 'The AddIndexConcurrently operation cannot be executed inside '
- 'a transaction (set atomic = False on the migration).'
- )
- with self.assertRaisesMessage(NotSupportedError, msg):
- with connection.schema_editor(atomic=True) as editor:
- operation.database_forwards(self.app_label, editor, project_state, new_state)
- def test_add(self):
- project_state = self.set_up_test_model(self.app_label, index=False)
- table_name = '%s_pony' % self.app_label
- index = Index(fields=['pink'], name='pony_pink_idx')
- new_state = project_state.clone()
- operation = AddIndexConcurrently('Pony', index)
- self.assertEqual(
- operation.describe(),
- 'Concurrently create index pony_pink_idx on field(s) pink of '
- 'model Pony'
- )
- operation.state_forwards(self.app_label, new_state)
- self.assertEqual(len(new_state.models[self.app_label, 'pony'].options['indexes']), 1)
- self.assertIndexNotExists(table_name, ['pink'])
- # Add index.
- with connection.schema_editor(atomic=False) as editor:
- operation.database_forwards(self.app_label, editor, project_state, new_state)
- self.assertIndexExists(table_name, ['pink'])
- # Reversal.
- with connection.schema_editor(atomic=False) as editor:
- operation.database_backwards(self.app_label, editor, new_state, project_state)
- self.assertIndexNotExists(table_name, ['pink'])
- # Deconstruction.
- name, args, kwargs = operation.deconstruct()
- self.assertEqual(name, 'AddIndexConcurrently')
- self.assertEqual(args, [])
- self.assertEqual(kwargs, {'model_name': 'Pony', 'index': index})
- def test_add_other_index_type(self):
- project_state = self.set_up_test_model(self.app_label, index=False)
- table_name = '%s_pony' % self.app_label
- new_state = project_state.clone()
- operation = AddIndexConcurrently(
- 'Pony',
- BrinIndex(fields=['pink'], name='pony_pink_brin_idx'),
- )
- self.assertIndexNotExists(table_name, ['pink'])
- # Add index.
- with connection.schema_editor(atomic=False) as editor:
- operation.database_forwards(self.app_label, editor, project_state, new_state)
- self.assertIndexExists(table_name, ['pink'], index_type='brin')
- # Reversal.
- with connection.schema_editor(atomic=False) as editor:
- operation.database_backwards(self.app_label, editor, new_state, project_state)
- self.assertIndexNotExists(table_name, ['pink'])
- def test_add_with_options(self):
- project_state = self.set_up_test_model(self.app_label, index=False)
- table_name = '%s_pony' % self.app_label
- new_state = project_state.clone()
- index = BTreeIndex(fields=['pink'], name='pony_pink_btree_idx', fillfactor=70)
- operation = AddIndexConcurrently('Pony', index)
- self.assertIndexNotExists(table_name, ['pink'])
- # Add index.
- with connection.schema_editor(atomic=False) as editor:
- operation.database_forwards(self.app_label, editor, project_state, new_state)
- self.assertIndexExists(table_name, ['pink'], index_type='btree')
- # Reversal.
- with connection.schema_editor(atomic=False) as editor:
- operation.database_backwards(self.app_label, editor, new_state, project_state)
- self.assertIndexNotExists(table_name, ['pink'])
- @unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL specific tests.')
- @modify_settings(INSTALLED_APPS={'append': 'migrations'})
- class RemoveIndexConcurrentlyTests(OperationTestBase):
- app_label = 'test_rm_concurrently'
- def test_requires_atomic_false(self):
- project_state = self.set_up_test_model(self.app_label, index=True)
- new_state = project_state.clone()
- operation = RemoveIndexConcurrently('Pony', 'pony_pink_idx')
- msg = (
- 'The RemoveIndexConcurrently operation cannot be executed inside '
- 'a transaction (set atomic = False on the migration).'
- )
- with self.assertRaisesMessage(NotSupportedError, msg):
- with connection.schema_editor(atomic=True) as editor:
- operation.database_forwards(self.app_label, editor, project_state, new_state)
- def test_remove(self):
- project_state = self.set_up_test_model(self.app_label, index=True)
- table_name = '%s_pony' % self.app_label
- self.assertTableExists(table_name)
- new_state = project_state.clone()
- operation = RemoveIndexConcurrently('Pony', 'pony_pink_idx')
- self.assertEqual(
- operation.describe(),
- 'Concurrently remove index pony_pink_idx from Pony',
- )
- operation.state_forwards(self.app_label, new_state)
- self.assertEqual(len(new_state.models[self.app_label, 'pony'].options['indexes']), 0)
- self.assertIndexExists(table_name, ['pink'])
- # Remove index.
- with connection.schema_editor(atomic=False) as editor:
- operation.database_forwards(self.app_label, editor, project_state, new_state)
- self.assertIndexNotExists(table_name, ['pink'])
- # Reversal.
- with connection.schema_editor(atomic=False) as editor:
- operation.database_backwards(self.app_label, editor, new_state, project_state)
- self.assertIndexExists(table_name, ['pink'])
- # Deconstruction.
- name, args, kwargs = operation.deconstruct()
- self.assertEqual(name, 'RemoveIndexConcurrently')
- self.assertEqual(args, [])
- self.assertEqual(kwargs, {'model_name': 'Pony', 'name': 'pony_pink_idx'})
- class NoExtensionRouter():
- def allow_migrate(self, db, app_label, **hints):
- return False
- @unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL specific tests.')
- class CreateExtensionTests(PostgreSQLTestCase):
- app_label = 'test_allow_create_extention'
- @override_settings(DATABASE_ROUTERS=[NoExtensionRouter()])
- def test_no_allow_migrate(self):
- operation = CreateExtension('tablefunc')
- project_state = ProjectState()
- new_state = project_state.clone()
- # Don't create an extension.
- with CaptureQueriesContext(connection) as captured_queries:
- with connection.schema_editor(atomic=False) as editor:
- operation.database_forwards(self.app_label, editor, project_state, new_state)
- self.assertEqual(len(captured_queries), 0)
- # Reversal.
- with CaptureQueriesContext(connection) as captured_queries:
- with connection.schema_editor(atomic=False) as editor:
- operation.database_backwards(self.app_label, editor, new_state, project_state)
- self.assertEqual(len(captured_queries), 0)
- def test_allow_migrate(self):
- operation = CreateExtension('tablefunc')
- self.assertEqual(operation.migration_name_fragment, 'create_extension_tablefunc')
- project_state = ProjectState()
- new_state = project_state.clone()
- # Create an extension.
- with CaptureQueriesContext(connection) as captured_queries:
- with connection.schema_editor(atomic=False) as editor:
- operation.database_forwards(self.app_label, editor, project_state, new_state)
- self.assertEqual(len(captured_queries), 4)
- self.assertIn('CREATE EXTENSION IF NOT EXISTS', captured_queries[1]['sql'])
- # Reversal.
- with CaptureQueriesContext(connection) as captured_queries:
- with connection.schema_editor(atomic=False) as editor:
- operation.database_backwards(self.app_label, editor, new_state, project_state)
- self.assertEqual(len(captured_queries), 2)
- self.assertIn('DROP EXTENSION IF EXISTS', captured_queries[1]['sql'])
- def test_create_existing_extension(self):
- operation = BloomExtension()
- self.assertEqual(operation.migration_name_fragment, 'create_extension_bloom')
- project_state = ProjectState()
- new_state = project_state.clone()
- # Don't create an existing extension.
- with CaptureQueriesContext(connection) as captured_queries:
- with connection.schema_editor(atomic=False) as editor:
- operation.database_forwards(self.app_label, editor, project_state, new_state)
- self.assertEqual(len(captured_queries), 3)
- self.assertIn('SELECT', captured_queries[0]['sql'])
- def test_drop_nonexistent_extension(self):
- operation = CreateExtension('tablefunc')
- project_state = ProjectState()
- new_state = project_state.clone()
- # Don't drop a nonexistent extension.
- with CaptureQueriesContext(connection) as captured_queries:
- with connection.schema_editor(atomic=False) as editor:
- operation.database_backwards(self.app_label, editor, project_state, new_state)
- self.assertEqual(len(captured_queries), 1)
- self.assertIn('SELECT', captured_queries[0]['sql'])
|