test_operations.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. from __future__ import unicode_literals
  2. from django.db import connection, migrations, models
  3. from django.db.migrations.migration import Migration
  4. from django.db.migrations.state import ProjectState
  5. from django.test import TransactionTestCase, skipUnlessDBFeature
  6. from ..utils import mysql
  7. if connection.features.gis_enabled:
  8. from django.contrib.gis.db.models import fields
  9. try:
  10. GeometryColumns = connection.ops.geometry_columns()
  11. HAS_GEOMETRY_COLUMNS = True
  12. except NotImplementedError:
  13. HAS_GEOMETRY_COLUMNS = False
  14. @skipUnlessDBFeature("gis_enabled")
  15. class OperationTests(TransactionTestCase):
  16. available_apps = ["gis_tests.gis_migrations"]
  17. def tearDown(self):
  18. # Delete table after testing
  19. self.apply_operations('gis', self.current_state, [migrations.DeleteModel("Neighborhood")])
  20. super(OperationTests, self).tearDown()
  21. def get_table_description(self, table):
  22. with connection.cursor() as cursor:
  23. return connection.introspection.get_table_description(cursor, table)
  24. def assertColumnExists(self, table, column):
  25. self.assertIn(column, [c.name for c in self.get_table_description(table)])
  26. def assertColumnNotExists(self, table, column):
  27. self.assertNotIn(column, [c.name for c in self.get_table_description(table)])
  28. def apply_operations(self, app_label, project_state, operations):
  29. migration = Migration('name', app_label)
  30. migration.operations = operations
  31. with connection.schema_editor() as editor:
  32. return migration.apply(project_state, editor)
  33. def set_up_test_model(self):
  34. operations = [migrations.CreateModel(
  35. "Neighborhood",
  36. [
  37. ("id", models.AutoField(primary_key=True)),
  38. ('name', models.CharField(max_length=100, unique=True)),
  39. ('geom', fields.MultiPolygonField(srid=4326)),
  40. ],
  41. )]
  42. return self.apply_operations('gis', ProjectState(), operations)
  43. def assertGeometryColumnsCount(self, expected_count):
  44. table_name = "gis_neighborhood"
  45. if connection.features.uppercases_column_names:
  46. table_name = table_name.upper()
  47. self.assertEqual(
  48. GeometryColumns.objects.filter(**{
  49. GeometryColumns.table_name_col(): table_name,
  50. }).count(),
  51. expected_count
  52. )
  53. def test_add_gis_field(self):
  54. """
  55. Tests the AddField operation with a GIS-enabled column.
  56. """
  57. project_state = self.set_up_test_model()
  58. self.current_state = project_state
  59. operation = migrations.AddField(
  60. "Neighborhood",
  61. "path",
  62. fields.LineStringField(srid=4326),
  63. )
  64. new_state = project_state.clone()
  65. operation.state_forwards("gis", new_state)
  66. with connection.schema_editor() as editor:
  67. operation.database_forwards("gis", editor, project_state, new_state)
  68. self.current_state = new_state
  69. self.assertColumnExists("gis_neighborhood", "path")
  70. # Test GeometryColumns when available
  71. if HAS_GEOMETRY_COLUMNS:
  72. self.assertGeometryColumnsCount(2)
  73. if self.has_spatial_indexes:
  74. with connection.cursor() as cursor:
  75. indexes = connection.introspection.get_indexes(cursor, "gis_neighborhood")
  76. self.assertIn('path', indexes)
  77. def test_add_blank_gis_field(self):
  78. """
  79. Should be able to add a GeometryField with blank=True.
  80. """
  81. project_state = self.set_up_test_model()
  82. self.current_state = project_state
  83. operation = migrations.AddField(
  84. "Neighborhood",
  85. "path",
  86. fields.LineStringField(blank=True, srid=4326),
  87. )
  88. new_state = project_state.clone()
  89. operation.state_forwards("gis", new_state)
  90. with connection.schema_editor() as editor:
  91. operation.database_forwards("gis", editor, project_state, new_state)
  92. self.current_state = new_state
  93. self.assertColumnExists("gis_neighborhood", "path")
  94. # Test GeometryColumns when available
  95. if HAS_GEOMETRY_COLUMNS:
  96. self.assertGeometryColumnsCount(2)
  97. if self.has_spatial_indexes:
  98. with connection.cursor() as cursor:
  99. indexes = connection.introspection.get_indexes(cursor, "gis_neighborhood")
  100. self.assertIn('path', indexes)
  101. def test_remove_gis_field(self):
  102. """
  103. Tests the RemoveField operation with a GIS-enabled column.
  104. """
  105. project_state = self.set_up_test_model()
  106. self.current_state = project_state
  107. operation = migrations.RemoveField("Neighborhood", "geom")
  108. new_state = project_state.clone()
  109. operation.state_forwards("gis", new_state)
  110. with connection.schema_editor() as editor:
  111. operation.database_forwards("gis", editor, project_state, new_state)
  112. self.current_state = new_state
  113. self.assertColumnNotExists("gis_neighborhood", "geom")
  114. # Test GeometryColumns when available
  115. if HAS_GEOMETRY_COLUMNS:
  116. self.assertGeometryColumnsCount(0)
  117. def test_create_model_spatial_index(self):
  118. self.current_state = self.set_up_test_model()
  119. if not self.has_spatial_indexes:
  120. self.skipTest("No support for Spatial indexes")
  121. with connection.cursor() as cursor:
  122. indexes = connection.introspection.get_indexes(cursor, "gis_neighborhood")
  123. self.assertIn('geom', indexes)
  124. @property
  125. def has_spatial_indexes(self):
  126. if mysql:
  127. with connection.cursor() as cursor:
  128. return connection.introspection.supports_spatial_index(cursor, "gis_neighborhood")
  129. return True