tests.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. import os
  2. import re
  3. from io import StringIO
  4. from django.contrib.gis.gdal import GDAL_VERSION, Driver, GDALException
  5. from django.contrib.gis.utils.ogrinspect import ogrinspect
  6. from django.core.management import call_command
  7. from django.db import connection, connections
  8. from django.test import SimpleTestCase, TestCase, skipUnlessDBFeature
  9. from django.test.utils import modify_settings
  10. from ..test_data import TEST_DATA
  11. from .models import AllOGRFields
  12. class InspectDbTests(TestCase):
  13. def test_geom_columns(self):
  14. """
  15. Test the geo-enabled inspectdb command.
  16. """
  17. out = StringIO()
  18. call_command(
  19. 'inspectdb',
  20. table_name_filter=lambda tn: tn == 'inspectapp_allogrfields',
  21. stdout=out
  22. )
  23. output = out.getvalue()
  24. if connection.features.supports_geometry_field_introspection:
  25. self.assertIn('geom = models.PolygonField()', output)
  26. self.assertIn('point = models.PointField()', output)
  27. else:
  28. self.assertIn('geom = models.GeometryField(', output)
  29. self.assertIn('point = models.GeometryField(', output)
  30. @skipUnlessDBFeature("supports_3d_storage")
  31. def test_3d_columns(self):
  32. out = StringIO()
  33. call_command(
  34. 'inspectdb',
  35. table_name_filter=lambda tn: tn == 'inspectapp_fields3d',
  36. stdout=out
  37. )
  38. output = out.getvalue()
  39. if connection.features.supports_geometry_field_introspection:
  40. self.assertIn('point = models.PointField(dim=3)', output)
  41. if connection.features.supports_geography:
  42. self.assertIn('pointg = models.PointField(geography=True, dim=3)', output)
  43. else:
  44. self.assertIn('pointg = models.PointField(dim=3)', output)
  45. self.assertIn('line = models.LineStringField(dim=3)', output)
  46. self.assertIn('poly = models.PolygonField(dim=3)', output)
  47. else:
  48. self.assertIn('point = models.GeometryField(', output)
  49. self.assertIn('pointg = models.GeometryField(', output)
  50. self.assertIn('line = models.GeometryField(', output)
  51. self.assertIn('poly = models.GeometryField(', output)
  52. @modify_settings(
  53. INSTALLED_APPS={'append': 'django.contrib.gis'},
  54. )
  55. class OGRInspectTest(SimpleTestCase):
  56. maxDiff = 1024
  57. def test_poly(self):
  58. shp_file = os.path.join(TEST_DATA, 'test_poly', 'test_poly.shp')
  59. model_def = ogrinspect(shp_file, 'MyModel')
  60. expected = [
  61. '# This is an auto-generated Django model module created by ogrinspect.',
  62. 'from django.contrib.gis.db import models',
  63. '',
  64. '',
  65. 'class MyModel(models.Model):',
  66. ' float = models.FloatField()',
  67. ' int = models.BigIntegerField()',
  68. ' str = models.CharField(max_length=80)',
  69. ' geom = models.PolygonField()',
  70. ]
  71. self.assertEqual(model_def, '\n'.join(expected))
  72. def test_poly_multi(self):
  73. shp_file = os.path.join(TEST_DATA, 'test_poly', 'test_poly.shp')
  74. model_def = ogrinspect(shp_file, 'MyModel', multi_geom=True)
  75. self.assertIn('geom = models.MultiPolygonField()', model_def)
  76. # Same test with a 25D-type geometry field
  77. shp_file = os.path.join(TEST_DATA, 'gas_lines', 'gas_leitung.shp')
  78. model_def = ogrinspect(shp_file, 'MyModel', multi_geom=True)
  79. srid = '-1' if GDAL_VERSION < (2, 3) else '31253'
  80. self.assertIn('geom = models.MultiLineStringField(srid=%s)' % srid, model_def)
  81. def test_date_field(self):
  82. shp_file = os.path.join(TEST_DATA, 'cities', 'cities.shp')
  83. model_def = ogrinspect(shp_file, 'City')
  84. expected = [
  85. '# This is an auto-generated Django model module created by ogrinspect.',
  86. 'from django.contrib.gis.db import models',
  87. '',
  88. '',
  89. 'class City(models.Model):',
  90. ' name = models.CharField(max_length=80)',
  91. ' population = models.BigIntegerField()',
  92. ' density = models.FloatField()',
  93. ' created = models.DateField()',
  94. ' geom = models.PointField()',
  95. ]
  96. self.assertEqual(model_def, '\n'.join(expected))
  97. def test_time_field(self):
  98. # Getting the database identifier used by OGR, if None returned
  99. # GDAL does not have the support compiled in.
  100. ogr_db = get_ogr_db_string()
  101. if not ogr_db:
  102. self.skipTest("Unable to setup an OGR connection to your database")
  103. try:
  104. # Writing shapefiles via GDAL currently does not support writing OGRTime
  105. # fields, so we need to actually use a database
  106. model_def = ogrinspect(ogr_db, 'Measurement',
  107. layer_key=AllOGRFields._meta.db_table,
  108. decimal=['f_decimal'])
  109. except GDALException:
  110. self.skipTest("Unable to setup an OGR connection to your database")
  111. self.assertTrue(model_def.startswith(
  112. '# This is an auto-generated Django model module created by ogrinspect.\n'
  113. 'from django.contrib.gis.db import models\n'
  114. '\n'
  115. '\n'
  116. 'class Measurement(models.Model):\n'
  117. ))
  118. # The ordering of model fields might vary depending on several factors (version of GDAL, etc.)
  119. if connection.vendor == 'sqlite':
  120. # SpatiaLite introspection is somewhat lacking (#29461).
  121. self.assertIn(' f_decimal = models.CharField(max_length=0)', model_def)
  122. else:
  123. self.assertIn(' f_decimal = models.DecimalField(max_digits=0, decimal_places=0)', model_def)
  124. self.assertIn(' f_int = models.IntegerField()', model_def)
  125. if not connection.ops.mariadb:
  126. # Probably a bug between GDAL and MariaDB on time fields.
  127. self.assertIn(' f_datetime = models.DateTimeField()', model_def)
  128. self.assertIn(' f_time = models.TimeField()', model_def)
  129. if connection.vendor == 'sqlite':
  130. self.assertIn(' f_float = models.CharField(max_length=0)', model_def)
  131. else:
  132. self.assertIn(' f_float = models.FloatField()', model_def)
  133. max_length = 0 if connection.vendor == 'sqlite' else 10
  134. self.assertIn(' f_char = models.CharField(max_length=%s)' % max_length, model_def)
  135. self.assertIn(' f_date = models.DateField()', model_def)
  136. # Some backends may have srid=-1
  137. self.assertIsNotNone(re.search(r' geom = models.PolygonField\(([^\)])*\)', model_def))
  138. def test_management_command(self):
  139. shp_file = os.path.join(TEST_DATA, 'cities', 'cities.shp')
  140. out = StringIO()
  141. call_command('ogrinspect', shp_file, 'City', stdout=out)
  142. output = out.getvalue()
  143. self.assertIn('class City(models.Model):', output)
  144. def test_mapping_option(self):
  145. expected = (
  146. " geom = models.PointField()\n"
  147. "\n"
  148. "\n"
  149. "# Auto-generated `LayerMapping` dictionary for City model\n"
  150. "city_mapping = {\n"
  151. " 'name': 'Name',\n"
  152. " 'population': 'Population',\n"
  153. " 'density': 'Density',\n"
  154. " 'created': 'Created',\n"
  155. " 'geom': 'POINT',\n"
  156. "}\n")
  157. shp_file = os.path.join(TEST_DATA, 'cities', 'cities.shp')
  158. out = StringIO()
  159. call_command('ogrinspect', shp_file, '--mapping', 'City', stdout=out)
  160. self.assertIn(expected, out.getvalue())
  161. def get_ogr_db_string():
  162. """
  163. Construct the DB string that GDAL will use to inspect the database.
  164. GDAL will create its own connection to the database, so we re-use the
  165. connection settings from the Django test.
  166. """
  167. db = connections.databases['default']
  168. # Map from the django backend into the OGR driver name and database identifier
  169. # https://gdal.org/drivers/vector/
  170. #
  171. # TODO: Support Oracle (OCI).
  172. drivers = {
  173. 'django.contrib.gis.db.backends.postgis': ('PostgreSQL', "PG:dbname='%(db_name)s'", ' '),
  174. 'django.contrib.gis.db.backends.mysql': ('MySQL', 'MYSQL:"%(db_name)s"', ','),
  175. 'django.contrib.gis.db.backends.spatialite': ('SQLite', '%(db_name)s', '')
  176. }
  177. db_engine = db['ENGINE']
  178. if db_engine not in drivers:
  179. return None
  180. drv_name, db_str, param_sep = drivers[db_engine]
  181. # Ensure that GDAL library has driver support for the database.
  182. try:
  183. Driver(drv_name)
  184. except GDALException:
  185. return None
  186. # SQLite/SpatiaLite in-memory databases
  187. if db['NAME'] == ":memory:":
  188. return None
  189. # Build the params of the OGR database connection string
  190. params = [db_str % {'db_name': db['NAME']}]
  191. def add(key, template):
  192. value = db.get(key, None)
  193. # Don't add the parameter if it is not in django's settings
  194. if value:
  195. params.append(template % value)
  196. add('HOST', "host='%s'")
  197. add('PORT', "port='%s'")
  198. add('USER', "user='%s'")
  199. add('PASSWORD', "password='%s'")
  200. return param_sep.join(params)