tests.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. import os
  2. import unittest
  3. from copy import copy
  4. from decimal import Decimal
  5. from django.conf import settings
  6. from django.contrib.gis.gdal import HAS_GDAL
  7. from django.contrib.gis.geos import HAS_GEOS
  8. from django.db import connection
  9. from django.test import TestCase, override_settings, skipUnlessDBFeature
  10. if HAS_GEOS and HAS_GDAL:
  11. from django.contrib.gis.utils.layermapping import (
  12. LayerMapping, LayerMapError, InvalidDecimal, InvalidString,
  13. MissingForeignKey,
  14. )
  15. from django.contrib.gis.gdal import DataSource
  16. from .models import (
  17. City, County, CountyFeat, Interstate, ICity1, ICity2, Invalid, State,
  18. city_mapping, co_mapping, cofeat_mapping, inter_mapping,
  19. )
  20. shp_path = os.path.realpath(os.path.join(os.path.dirname(__file__), os.pardir, 'data'))
  21. city_shp = os.path.join(shp_path, 'cities', 'cities.shp')
  22. co_shp = os.path.join(shp_path, 'counties', 'counties.shp')
  23. inter_shp = os.path.join(shp_path, 'interstates', 'interstates.shp')
  24. invalid_shp = os.path.join(shp_path, 'invalid', 'emptypoints.shp')
  25. # Dictionaries to hold what's expected in the county shapefile.
  26. NAMES = ['Bexar', 'Galveston', 'Harris', 'Honolulu', 'Pueblo']
  27. NUMS = [1, 2, 1, 19, 1] # Number of polygons for each.
  28. STATES = ['Texas', 'Texas', 'Texas', 'Hawaii', 'Colorado']
  29. @skipUnlessDBFeature("gis_enabled")
  30. class LayerMapTest(TestCase):
  31. def test_init(self):
  32. "Testing LayerMapping initialization."
  33. # Model field that does not exist.
  34. bad1 = copy(city_mapping)
  35. bad1['foobar'] = 'FooField'
  36. # Shapefile field that does not exist.
  37. bad2 = copy(city_mapping)
  38. bad2['name'] = 'Nombre'
  39. # Nonexistent geographic field type.
  40. bad3 = copy(city_mapping)
  41. bad3['point'] = 'CURVE'
  42. # Incrementing through the bad mapping dictionaries and
  43. # ensuring that a LayerMapError is raised.
  44. for bad_map in (bad1, bad2, bad3):
  45. with self.assertRaises(LayerMapError):
  46. LayerMapping(City, city_shp, bad_map)
  47. # A LookupError should be thrown for bogus encodings.
  48. with self.assertRaises(LookupError):
  49. LayerMapping(City, city_shp, city_mapping, encoding='foobar')
  50. def test_simple_layermap(self):
  51. "Test LayerMapping import of a simple point shapefile."
  52. # Setting up for the LayerMapping.
  53. lm = LayerMapping(City, city_shp, city_mapping)
  54. lm.save()
  55. # There should be three cities in the shape file.
  56. self.assertEqual(3, City.objects.count())
  57. # Opening up the shapefile, and verifying the values in each
  58. # of the features made it to the model.
  59. ds = DataSource(city_shp)
  60. layer = ds[0]
  61. for feat in layer:
  62. city = City.objects.get(name=feat['Name'].value)
  63. self.assertEqual(feat['Population'].value, city.population)
  64. self.assertEqual(Decimal(str(feat['Density'])), city.density)
  65. self.assertEqual(feat['Created'].value, city.dt)
  66. # Comparing the geometries.
  67. pnt1, pnt2 = feat.geom, city.point
  68. self.assertAlmostEqual(pnt1.x, pnt2.x, 5)
  69. self.assertAlmostEqual(pnt1.y, pnt2.y, 5)
  70. def test_layermap_strict(self):
  71. "Testing the `strict` keyword, and import of a LineString shapefile."
  72. # When the `strict` keyword is set an error encountered will force
  73. # the importation to stop.
  74. with self.assertRaises(InvalidDecimal):
  75. lm = LayerMapping(Interstate, inter_shp, inter_mapping)
  76. lm.save(silent=True, strict=True)
  77. Interstate.objects.all().delete()
  78. # This LayerMapping should work b/c `strict` is not set.
  79. lm = LayerMapping(Interstate, inter_shp, inter_mapping)
  80. lm.save(silent=True)
  81. # Two interstate should have imported correctly.
  82. self.assertEqual(2, Interstate.objects.count())
  83. # Verifying the values in the layer w/the model.
  84. ds = DataSource(inter_shp)
  85. # Only the first two features of this shapefile are valid.
  86. valid_feats = ds[0][:2]
  87. for feat in valid_feats:
  88. istate = Interstate.objects.get(name=feat['Name'].value)
  89. if feat.fid == 0:
  90. self.assertEqual(Decimal(str(feat['Length'])), istate.length)
  91. elif feat.fid == 1:
  92. # Everything but the first two decimal digits were truncated,
  93. # because the Interstate model's `length` field has decimal_places=2.
  94. self.assertAlmostEqual(feat.get('Length'), float(istate.length), 2)
  95. for p1, p2 in zip(feat.geom, istate.path):
  96. self.assertAlmostEqual(p1[0], p2[0], 6)
  97. self.assertAlmostEqual(p1[1], p2[1], 6)
  98. def county_helper(self, county_feat=True):
  99. "Helper function for ensuring the integrity of the mapped County models."
  100. for name, n, st in zip(NAMES, NUMS, STATES):
  101. # Should only be one record b/c of `unique` keyword.
  102. c = County.objects.get(name=name)
  103. self.assertEqual(n, len(c.mpoly))
  104. self.assertEqual(st, c.state.name) # Checking ForeignKey mapping.
  105. # Multiple records because `unique` was not set.
  106. if county_feat:
  107. qs = CountyFeat.objects.filter(name=name)
  108. self.assertEqual(n, qs.count())
  109. def test_layermap_unique_multigeometry_fk(self):
  110. "Testing the `unique`, and `transform`, geometry collection conversion, and ForeignKey mappings."
  111. # All the following should work.
  112. # Telling LayerMapping that we want no transformations performed on the data.
  113. lm = LayerMapping(County, co_shp, co_mapping, transform=False)
  114. # Specifying the source spatial reference system via the `source_srs` keyword.
  115. lm = LayerMapping(County, co_shp, co_mapping, source_srs=4269)
  116. lm = LayerMapping(County, co_shp, co_mapping, source_srs='NAD83')
  117. # Unique may take tuple or string parameters.
  118. for arg in ('name', ('name', 'mpoly')):
  119. lm = LayerMapping(County, co_shp, co_mapping, transform=False, unique=arg)
  120. # Now test for failures
  121. # Testing invalid params for the `unique` keyword.
  122. for e, arg in ((TypeError, 5.0), (ValueError, 'foobar'), (ValueError, ('name', 'mpolygon'))):
  123. with self.assertRaises(e):
  124. LayerMapping(County, co_shp, co_mapping, transform=False, unique=arg)
  125. # No source reference system defined in the shapefile, should raise an error.
  126. if connection.features.supports_transform:
  127. with self.assertRaises(LayerMapError):
  128. LayerMapping(County, co_shp, co_mapping)
  129. # Passing in invalid ForeignKey mapping parameters -- must be a dictionary
  130. # mapping for the model the ForeignKey points to.
  131. bad_fk_map1 = copy(co_mapping)
  132. bad_fk_map1['state'] = 'name'
  133. bad_fk_map2 = copy(co_mapping)
  134. bad_fk_map2['state'] = {'nombre': 'State'}
  135. with self.assertRaises(TypeError):
  136. LayerMapping(County, co_shp, bad_fk_map1, transform=False)
  137. with self.assertRaises(LayerMapError):
  138. LayerMapping(County, co_shp, bad_fk_map2, transform=False)
  139. # There exist no State models for the ForeignKey mapping to work -- should raise
  140. # a MissingForeignKey exception (this error would be ignored if the `strict`
  141. # keyword is not set).
  142. lm = LayerMapping(County, co_shp, co_mapping, transform=False, unique='name')
  143. with self.assertRaises(MissingForeignKey):
  144. lm.save(silent=True, strict=True)
  145. # Now creating the state models so the ForeignKey mapping may work.
  146. State.objects.bulk_create([
  147. State(name='Colorado'), State(name='Hawaii'), State(name='Texas')
  148. ])
  149. # If a mapping is specified as a collection, all OGR fields that
  150. # are not collections will be converted into them. For example,
  151. # a Point column would be converted to MultiPoint. Other things being done
  152. # w/the keyword args:
  153. # `transform=False`: Specifies that no transform is to be done; this
  154. # has the effect of ignoring the spatial reference check (because the
  155. # county shapefile does not have implicit spatial reference info).
  156. #
  157. # `unique='name'`: Creates models on the condition that they have
  158. # unique county names; geometries from each feature however will be
  159. # appended to the geometry collection of the unique model. Thus,
  160. # all of the various islands in Honolulu county will be in in one
  161. # database record with a MULTIPOLYGON type.
  162. lm = LayerMapping(County, co_shp, co_mapping, transform=False, unique='name')
  163. lm.save(silent=True, strict=True)
  164. # A reference that doesn't use the unique keyword; a new database record will
  165. # created for each polygon.
  166. lm = LayerMapping(CountyFeat, co_shp, cofeat_mapping, transform=False)
  167. lm.save(silent=True, strict=True)
  168. # The county helper is called to ensure integrity of County models.
  169. self.county_helper()
  170. def test_test_fid_range_step(self):
  171. "Tests the `fid_range` keyword and the `step` keyword of .save()."
  172. # Function for clearing out all the counties before testing.
  173. def clear_counties():
  174. County.objects.all().delete()
  175. State.objects.bulk_create([
  176. State(name='Colorado'), State(name='Hawaii'), State(name='Texas')
  177. ])
  178. # Initializing the LayerMapping object to use in these tests.
  179. lm = LayerMapping(County, co_shp, co_mapping, transform=False, unique='name')
  180. # Bad feature id ranges should raise a type error.
  181. bad_ranges = (5.0, 'foo', co_shp)
  182. for bad in bad_ranges:
  183. with self.assertRaises(TypeError):
  184. lm.save(fid_range=bad)
  185. # Step keyword should not be allowed w/`fid_range`.
  186. fr = (3, 5) # layer[3:5]
  187. with self.assertRaises(LayerMapError):
  188. lm.save(fid_range=fr, step=10)
  189. lm.save(fid_range=fr)
  190. # Features IDs 3 & 4 are for Galveston County, Texas -- only
  191. # one model is returned because the `unique` keyword was set.
  192. qs = County.objects.all()
  193. self.assertEqual(1, qs.count())
  194. self.assertEqual('Galveston', qs[0].name)
  195. # Features IDs 5 and beyond for Honolulu County, Hawaii, and
  196. # FID 0 is for Pueblo County, Colorado.
  197. clear_counties()
  198. lm.save(fid_range=slice(5, None), silent=True, strict=True) # layer[5:]
  199. lm.save(fid_range=slice(None, 1), silent=True, strict=True) # layer[:1]
  200. # Only Pueblo & Honolulu counties should be present because of
  201. # the `unique` keyword. Have to set `order_by` on this QuerySet
  202. # or else MySQL will return a different ordering than the other dbs.
  203. qs = County.objects.order_by('name')
  204. self.assertEqual(2, qs.count())
  205. hi, co = tuple(qs)
  206. hi_idx, co_idx = tuple(map(NAMES.index, ('Honolulu', 'Pueblo')))
  207. self.assertEqual('Pueblo', co.name)
  208. self.assertEqual(NUMS[co_idx], len(co.mpoly))
  209. self.assertEqual('Honolulu', hi.name)
  210. self.assertEqual(NUMS[hi_idx], len(hi.mpoly))
  211. # Testing the `step` keyword -- should get the same counties
  212. # regardless of we use a step that divides equally, that is odd,
  213. # or that is larger than the dataset.
  214. for st in (4, 7, 1000):
  215. clear_counties()
  216. lm.save(step=st, strict=True)
  217. self.county_helper(county_feat=False)
  218. def test_model_inheritance(self):
  219. "Tests LayerMapping on inherited models. See #12093."
  220. icity_mapping = {'name': 'Name',
  221. 'population': 'Population',
  222. 'density': 'Density',
  223. 'point': 'POINT',
  224. 'dt': 'Created',
  225. }
  226. # Parent model has geometry field.
  227. lm1 = LayerMapping(ICity1, city_shp, icity_mapping)
  228. lm1.save()
  229. # Grandparent has geometry field.
  230. lm2 = LayerMapping(ICity2, city_shp, icity_mapping)
  231. lm2.save()
  232. self.assertEqual(6, ICity1.objects.count())
  233. self.assertEqual(3, ICity2.objects.count())
  234. def test_invalid_layer(self):
  235. "Tests LayerMapping on invalid geometries. See #15378."
  236. invalid_mapping = {'point': 'POINT'}
  237. lm = LayerMapping(Invalid, invalid_shp, invalid_mapping,
  238. source_srs=4326)
  239. lm.save(silent=True)
  240. def test_charfield_too_short(self):
  241. mapping = copy(city_mapping)
  242. mapping['name_short'] = 'Name'
  243. lm = LayerMapping(City, city_shp, mapping)
  244. with self.assertRaises(InvalidString):
  245. lm.save(silent=True, strict=True)
  246. def test_textfield(self):
  247. "String content fits also in a TextField"
  248. mapping = copy(city_mapping)
  249. mapping['name_txt'] = 'Name'
  250. lm = LayerMapping(City, city_shp, mapping)
  251. lm.save(silent=True, strict=True)
  252. self.assertEqual(City.objects.count(), 3)
  253. self.assertEqual(City.objects.get(name='Houston').name_txt, "Houston")
  254. def test_encoded_name(self):
  255. """ Test a layer containing utf-8-encoded name """
  256. city_shp = os.path.join(shp_path, 'ch-city', 'ch-city.shp')
  257. lm = LayerMapping(City, city_shp, city_mapping)
  258. lm.save(silent=True, strict=True)
  259. self.assertEqual(City.objects.count(), 1)
  260. self.assertEqual(City.objects.all()[0].name, "Zürich")
  261. class OtherRouter:
  262. def db_for_read(self, model, **hints):
  263. return 'other'
  264. def db_for_write(self, model, **hints):
  265. return self.db_for_read(model, **hints)
  266. def allow_relation(self, obj1, obj2, **hints):
  267. return None
  268. def allow_migrate(self, db, app_label, **hints):
  269. return True
  270. @skipUnlessDBFeature("gis_enabled")
  271. @override_settings(DATABASE_ROUTERS=[OtherRouter()])
  272. class LayerMapRouterTest(TestCase):
  273. @unittest.skipUnless(len(settings.DATABASES) > 1, 'multiple databases required')
  274. def test_layermapping_default_db(self):
  275. lm = LayerMapping(City, city_shp, city_mapping)
  276. self.assertEqual(lm.using, 'other')