test_operations.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. import unittest
  2. from migrations.test_base import OperationTestBase
  3. from django.db import NotSupportedError, connection
  4. from django.db.migrations.state import ProjectState
  5. from django.db.models import Index
  6. from django.test import modify_settings, override_settings
  7. from django.test.utils import CaptureQueriesContext
  8. from . import PostgreSQLTestCase
  9. try:
  10. from django.contrib.postgres.indexes import BrinIndex, BTreeIndex
  11. from django.contrib.postgres.operations import (
  12. AddIndexConcurrently, BloomExtension, CreateExtension,
  13. RemoveIndexConcurrently,
  14. )
  15. except ImportError:
  16. pass
  17. @unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL specific tests.')
  18. @modify_settings(INSTALLED_APPS={'append': 'migrations'})
  19. class AddIndexConcurrentlyTests(OperationTestBase):
  20. app_label = 'test_add_concurrently'
  21. def test_requires_atomic_false(self):
  22. project_state = self.set_up_test_model(self.app_label)
  23. new_state = project_state.clone()
  24. operation = AddIndexConcurrently(
  25. 'Pony',
  26. Index(fields=['pink'], name='pony_pink_idx'),
  27. )
  28. msg = (
  29. 'The AddIndexConcurrently operation cannot be executed inside '
  30. 'a transaction (set atomic = False on the migration).'
  31. )
  32. with self.assertRaisesMessage(NotSupportedError, msg):
  33. with connection.schema_editor(atomic=True) as editor:
  34. operation.database_forwards(self.app_label, editor, project_state, new_state)
  35. def test_add(self):
  36. project_state = self.set_up_test_model(self.app_label, index=False)
  37. table_name = '%s_pony' % self.app_label
  38. index = Index(fields=['pink'], name='pony_pink_idx')
  39. new_state = project_state.clone()
  40. operation = AddIndexConcurrently('Pony', index)
  41. self.assertEqual(
  42. operation.describe(),
  43. 'Concurrently create index pony_pink_idx on field(s) pink of '
  44. 'model Pony'
  45. )
  46. operation.state_forwards(self.app_label, new_state)
  47. self.assertEqual(len(new_state.models[self.app_label, 'pony'].options['indexes']), 1)
  48. self.assertIndexNotExists(table_name, ['pink'])
  49. # Add index.
  50. with connection.schema_editor(atomic=False) as editor:
  51. operation.database_forwards(self.app_label, editor, project_state, new_state)
  52. self.assertIndexExists(table_name, ['pink'])
  53. # Reversal.
  54. with connection.schema_editor(atomic=False) as editor:
  55. operation.database_backwards(self.app_label, editor, new_state, project_state)
  56. self.assertIndexNotExists(table_name, ['pink'])
  57. # Deconstruction.
  58. name, args, kwargs = operation.deconstruct()
  59. self.assertEqual(name, 'AddIndexConcurrently')
  60. self.assertEqual(args, [])
  61. self.assertEqual(kwargs, {'model_name': 'Pony', 'index': index})
  62. def test_add_other_index_type(self):
  63. project_state = self.set_up_test_model(self.app_label, index=False)
  64. table_name = '%s_pony' % self.app_label
  65. new_state = project_state.clone()
  66. operation = AddIndexConcurrently(
  67. 'Pony',
  68. BrinIndex(fields=['pink'], name='pony_pink_brin_idx'),
  69. )
  70. self.assertIndexNotExists(table_name, ['pink'])
  71. # Add index.
  72. with connection.schema_editor(atomic=False) as editor:
  73. operation.database_forwards(self.app_label, editor, project_state, new_state)
  74. self.assertIndexExists(table_name, ['pink'], index_type='brin')
  75. # Reversal.
  76. with connection.schema_editor(atomic=False) as editor:
  77. operation.database_backwards(self.app_label, editor, new_state, project_state)
  78. self.assertIndexNotExists(table_name, ['pink'])
  79. def test_add_with_options(self):
  80. project_state = self.set_up_test_model(self.app_label, index=False)
  81. table_name = '%s_pony' % self.app_label
  82. new_state = project_state.clone()
  83. index = BTreeIndex(fields=['pink'], name='pony_pink_btree_idx', fillfactor=70)
  84. operation = AddIndexConcurrently('Pony', index)
  85. self.assertIndexNotExists(table_name, ['pink'])
  86. # Add index.
  87. with connection.schema_editor(atomic=False) as editor:
  88. operation.database_forwards(self.app_label, editor, project_state, new_state)
  89. self.assertIndexExists(table_name, ['pink'], index_type='btree')
  90. # Reversal.
  91. with connection.schema_editor(atomic=False) as editor:
  92. operation.database_backwards(self.app_label, editor, new_state, project_state)
  93. self.assertIndexNotExists(table_name, ['pink'])
  94. @unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL specific tests.')
  95. @modify_settings(INSTALLED_APPS={'append': 'migrations'})
  96. class RemoveIndexConcurrentlyTests(OperationTestBase):
  97. app_label = 'test_rm_concurrently'
  98. def test_requires_atomic_false(self):
  99. project_state = self.set_up_test_model(self.app_label, index=True)
  100. new_state = project_state.clone()
  101. operation = RemoveIndexConcurrently('Pony', 'pony_pink_idx')
  102. msg = (
  103. 'The RemoveIndexConcurrently operation cannot be executed inside '
  104. 'a transaction (set atomic = False on the migration).'
  105. )
  106. with self.assertRaisesMessage(NotSupportedError, msg):
  107. with connection.schema_editor(atomic=True) as editor:
  108. operation.database_forwards(self.app_label, editor, project_state, new_state)
  109. def test_remove(self):
  110. project_state = self.set_up_test_model(self.app_label, index=True)
  111. table_name = '%s_pony' % self.app_label
  112. self.assertTableExists(table_name)
  113. new_state = project_state.clone()
  114. operation = RemoveIndexConcurrently('Pony', 'pony_pink_idx')
  115. self.assertEqual(
  116. operation.describe(),
  117. 'Concurrently remove index pony_pink_idx from Pony',
  118. )
  119. operation.state_forwards(self.app_label, new_state)
  120. self.assertEqual(len(new_state.models[self.app_label, 'pony'].options['indexes']), 0)
  121. self.assertIndexExists(table_name, ['pink'])
  122. # Remove index.
  123. with connection.schema_editor(atomic=False) as editor:
  124. operation.database_forwards(self.app_label, editor, project_state, new_state)
  125. self.assertIndexNotExists(table_name, ['pink'])
  126. # Reversal.
  127. with connection.schema_editor(atomic=False) as editor:
  128. operation.database_backwards(self.app_label, editor, new_state, project_state)
  129. self.assertIndexExists(table_name, ['pink'])
  130. # Deconstruction.
  131. name, args, kwargs = operation.deconstruct()
  132. self.assertEqual(name, 'RemoveIndexConcurrently')
  133. self.assertEqual(args, [])
  134. self.assertEqual(kwargs, {'model_name': 'Pony', 'name': 'pony_pink_idx'})
  135. class NoExtensionRouter():
  136. def allow_migrate(self, db, app_label, **hints):
  137. return False
  138. @unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL specific tests.')
  139. class CreateExtensionTests(PostgreSQLTestCase):
  140. app_label = 'test_allow_create_extention'
  141. @override_settings(DATABASE_ROUTERS=[NoExtensionRouter()])
  142. def test_no_allow_migrate(self):
  143. operation = CreateExtension('tablefunc')
  144. project_state = ProjectState()
  145. new_state = project_state.clone()
  146. # Don't create an extension.
  147. with CaptureQueriesContext(connection) as captured_queries:
  148. with connection.schema_editor(atomic=False) as editor:
  149. operation.database_forwards(self.app_label, editor, project_state, new_state)
  150. self.assertEqual(len(captured_queries), 0)
  151. # Reversal.
  152. with CaptureQueriesContext(connection) as captured_queries:
  153. with connection.schema_editor(atomic=False) as editor:
  154. operation.database_backwards(self.app_label, editor, new_state, project_state)
  155. self.assertEqual(len(captured_queries), 0)
  156. def test_allow_migrate(self):
  157. operation = CreateExtension('tablefunc')
  158. self.assertEqual(operation.migration_name_fragment, 'create_extension_tablefunc')
  159. project_state = ProjectState()
  160. new_state = project_state.clone()
  161. # Create an extension.
  162. with CaptureQueriesContext(connection) as captured_queries:
  163. with connection.schema_editor(atomic=False) as editor:
  164. operation.database_forwards(self.app_label, editor, project_state, new_state)
  165. self.assertEqual(len(captured_queries), 4)
  166. self.assertIn('CREATE EXTENSION IF NOT EXISTS', captured_queries[1]['sql'])
  167. # Reversal.
  168. with CaptureQueriesContext(connection) as captured_queries:
  169. with connection.schema_editor(atomic=False) as editor:
  170. operation.database_backwards(self.app_label, editor, new_state, project_state)
  171. self.assertEqual(len(captured_queries), 2)
  172. self.assertIn('DROP EXTENSION IF EXISTS', captured_queries[1]['sql'])
  173. def test_create_existing_extension(self):
  174. operation = BloomExtension()
  175. self.assertEqual(operation.migration_name_fragment, 'create_extension_bloom')
  176. project_state = ProjectState()
  177. new_state = project_state.clone()
  178. # Don't create an existing extension.
  179. with CaptureQueriesContext(connection) as captured_queries:
  180. with connection.schema_editor(atomic=False) as editor:
  181. operation.database_forwards(self.app_label, editor, project_state, new_state)
  182. self.assertEqual(len(captured_queries), 3)
  183. self.assertIn('SELECT', captured_queries[0]['sql'])
  184. def test_drop_nonexistent_extension(self):
  185. operation = CreateExtension('tablefunc')
  186. project_state = ProjectState()
  187. new_state = project_state.clone()
  188. # Don't drop a nonexistent extension.
  189. with CaptureQueriesContext(connection) as captured_queries:
  190. with connection.schema_editor(atomic=False) as editor:
  191. operation.database_backwards(self.app_label, editor, project_state, new_state)
  192. self.assertEqual(len(captured_queries), 1)
  193. self.assertIn('SELECT', captured_queries[0]['sql'])