123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410 |
- import datetime
- import unittest
- from copy import copy
- from decimal import Decimal
- from pathlib import Path
- from django.conf import settings
- from django.contrib.gis.gdal import DataSource
- from django.contrib.gis.utils.layermapping import (
- InvalidDecimal, InvalidString, LayerMapError, LayerMapping,
- MissingForeignKey,
- )
- from django.db import connection
- from django.test import TestCase, override_settings
- from .models import (
- City, County, CountyFeat, DoesNotAllowNulls, HasNulls, ICity1, ICity2,
- Interstate, Invalid, State, city_mapping, co_mapping, cofeat_mapping,
- has_nulls_mapping, inter_mapping,
- )
# Sample data lives in the 'data' directory next to this module's package
# directory (the parent of the directory containing this file).
shp_path = Path(__file__).resolve().parent.parent / 'data'
city_shp = shp_path / 'cities' / 'cities.shp'
co_shp = shp_path / 'counties' / 'counties.shp'
inter_shp = shp_path / 'interstates' / 'interstates.shp'
invalid_shp = shp_path / 'invalid' / 'emptypoints.shp'
has_nulls_geojson = shp_path / 'has_nulls' / 'has_nulls.geojson'

# Parallel lists consumed together via zip() by LayerMapTest.county_helper():
# the county name, the expected len() of that county's `mpoly`, and the
# expected name of the related state.
NAMES = ['Bexar', 'Galveston', 'Harris', 'Honolulu', 'Pueblo']
NUMS = [1, 2, 1, 19, 1]
STATES = ['Texas', 'Texas', 'Texas', 'Hawaii', 'Colorado']
class LayerMapTest(TestCase):
    """Tests for LayerMapping construction and save() using the sample data files."""

    def test_init(self):
        """Testing LayerMapping initialization."""
        # Three altered copies of the city mapping; each one is expected to be
        # rejected by the constructor.

        # An extra mapping key, 'foobar'.
        bad1 = copy(city_mapping)
        bad1['foobar'] = 'FooField'

        # A different source field name ('Nombre') for 'name'.
        bad2 = copy(city_mapping)
        bad2['name'] = 'Nombre'

        # 'CURVE' as the geometry type for 'point'.
        bad3 = copy(city_mapping)
        bad3['point'] = 'CURVE'

        for bad_map in (bad1, bad2, bad3):
            with self.assertRaises(LayerMapError):
                LayerMapping(City, city_shp, bad_map)

        # A bogus encoding name is expected to surface as a LookupError.
        with self.assertRaises(LookupError):
            LayerMapping(City, city_shp, city_mapping, encoding='foobar')

    def test_simple_layermap(self):
        """Test LayerMapping import of a simple point shapefile."""
        lm = LayerMapping(City, city_shp, city_mapping)
        lm.save()

        self.assertEqual(3, City.objects.count())

        # Re-read the shapefile directly and check that each feature's values
        # match the model instance saved under the same name.
        ds = DataSource(city_shp)
        layer = ds[0]
        for feat in layer:
            city = City.objects.get(name=feat['Name'].value)
            self.assertEqual(feat['Population'].value, city.population)
            self.assertEqual(Decimal(str(feat['Density'])), city.density)
            self.assertEqual(feat['Created'].value, city.dt)

            # Coordinates are compared to five decimal places.
            pnt1, pnt2 = feat.geom, city.point
            self.assertAlmostEqual(pnt1.x, pnt2.x, 5)
            self.assertAlmostEqual(pnt1.y, pnt2.y, 5)

    def test_data_source_str(self):
        """LayerMapping accepts the data source as a str path."""
        lm = LayerMapping(City, str(city_shp), city_mapping)
        lm.save()
        self.assertEqual(City.objects.count(), 3)

    def test_layermap_strict(self):
        """Testing the `strict` keyword, and import of a LineString shapefile."""
        # The mapping is built outside the assertRaises block so that only
        # save(strict=True) can satisfy the expected InvalidDecimal.
        lm = LayerMapping(Interstate, inter_shp, inter_mapping)
        with self.assertRaises(InvalidDecimal):
            lm.save(silent=True, strict=True)
        Interstate.objects.all().delete()

        # Without `strict`, the same import is expected to run to completion.
        lm = LayerMapping(Interstate, inter_shp, inter_mapping)
        lm.save(silent=True)

        self.assertEqual(2, Interstate.objects.count())

        ds = DataSource(inter_shp)

        # Only the first two features are compared with the saved models.
        valid_feats = ds[0][:2]
        for feat in valid_feats:
            istate = Interstate.objects.get(name=feat['Name'].value)

            if feat.fid == 0:
                self.assertEqual(Decimal(str(feat['Length'])), istate.length)
            elif feat.fid == 1:
                # For this feature the stored length is only expected to
                # agree with the source value to two decimal places.
                self.assertAlmostEqual(feat.get('Length'), float(istate.length), 2)

            for p1, p2 in zip(feat.geom, istate.path):
                self.assertAlmostEqual(p1[0], p2[0], 6)
                self.assertAlmostEqual(p1[1], p2[1], 6)

    def county_helper(self, county_feat=True):
        """
        Helper function for ensuring the integrity of the mapped County models.

        For each (name, n, state) triple from NAMES/NUMS/STATES, assert that
        exactly one County has that name, that its `mpoly` has length n, and
        that its state has the expected name. If `county_feat` is true, also
        assert that n CountyFeat rows share the name.
        """
        for name, n, st in zip(NAMES, NUMS, STATES):
            # get() fails unless exactly one County has this name.
            c = County.objects.get(name=name)
            self.assertEqual(n, len(c.mpoly))
            self.assertEqual(st, c.state.name)

            if county_feat:
                qs = CountyFeat.objects.filter(name=name)
                self.assertEqual(n, qs.count())

    def test_layermap_unique_multigeometry_fk(self):
        """
        Testing the `unique`, and `transform`, geometry collection conversion,
        and ForeignKey mappings.
        """
        # The following constructions are all expected to succeed; the
        # resulting objects are not needed, so they are not bound to a name.
        LayerMapping(County, co_shp, co_mapping, transform=False)

        # `source_srs` given as an integer and as a string.
        LayerMapping(County, co_shp, co_mapping, source_srs=4269)
        LayerMapping(County, co_shp, co_mapping, source_srs='NAD83')

        # `unique` given as a string and as a tuple.
        for arg in ('name', ('name', 'mpoly')):
            LayerMapping(County, co_shp, co_mapping, transform=False, unique=arg)

        # Invalid values for the `unique` keyword.
        for e, arg in ((TypeError, 5.0), (ValueError, 'foobar'), (ValueError, ('name', 'mpolygon'))):
            with self.assertRaises(e):
                LayerMapping(County, co_shp, co_mapping, transform=False, unique=arg)

        # With neither `transform=False` nor `source_srs`, construction is
        # expected to fail on backends that support transforms.
        if connection.features.supports_transform:
            with self.assertRaises(LayerMapError):
                LayerMapping(County, co_shp, co_mapping)

        # Invalid values for the 'state' mapping entry: a plain string, and a
        # dict using the key 'nombre'.
        bad_fk_map1 = copy(co_mapping)
        bad_fk_map1['state'] = 'name'
        bad_fk_map2 = copy(co_mapping)
        bad_fk_map2['state'] = {'nombre': 'State'}
        with self.assertRaises(TypeError):
            LayerMapping(County, co_shp, bad_fk_map1, transform=False)
        with self.assertRaises(LayerMapError):
            LayerMapping(County, co_shp, bad_fk_map2, transform=False)

        # No State rows have been created yet, so a strict save is expected
        # to raise MissingForeignKey.
        lm = LayerMapping(County, co_shp, co_mapping, transform=False, unique='name')
        with self.assertRaises(MissingForeignKey):
            lm.save(silent=True, strict=True)

        # Create the State rows so the 'state' mapping can be resolved.
        State.objects.bulk_create([
            State(name='Colorado'), State(name='Hawaii'), State(name='Texas')
        ])

        # With unique='name', county_helper() below expects a single County
        # per name whose `mpoly` has the length listed in NUMS.
        lm = LayerMapping(County, co_shp, co_mapping, transform=False, unique='name')
        lm.save(silent=True, strict=True)

        # Mapped without `unique`; county_helper() expects NUMS[i] CountyFeat
        # rows per name.
        lm = LayerMapping(CountyFeat, co_shp, cofeat_mapping, transform=False)
        lm.save(silent=True, strict=True)

        self.county_helper()

    def test_test_fid_range_step(self):
        """Tests the `fid_range` keyword and the `step` keyword of .save()."""
        def clear_counties():
            # Remove every County so the next save() starts from none.
            County.objects.all().delete()

        # The State rows are created once, up front; clear_counties() only
        # removes County rows.
        State.objects.bulk_create([
            State(name='Colorado'), State(name='Hawaii'), State(name='Texas')
        ])

        lm = LayerMapping(County, co_shp, co_mapping, transform=False, unique='name')

        # A float, a string, and a path are all expected to be rejected as
        # `fid_range` values with a TypeError.
        bad_ranges = (5.0, 'foo', co_shp)
        for bad in bad_ranges:
            with self.assertRaises(TypeError):
                lm.save(fid_range=bad)

        # Combining `step` with `fid_range` is expected to raise.
        fr = (3, 5)
        with self.assertRaises(LayerMapError):
            lm.save(fid_range=fr, step=10)
        lm.save(fid_range=fr)

        # That range is expected to produce a single County, 'Galveston'.
        qs = County.objects.all()
        self.assertEqual(1, qs.count())
        self.assertEqual('Galveston', qs[0].name)

        # `fid_range` given as slice objects: from 5 onwards, then up to 1.
        clear_counties()
        lm.save(fid_range=slice(5, None), silent=True, strict=True)
        lm.save(fid_range=slice(None, 1), silent=True, strict=True)

        # Ordering by name makes the (Honolulu, Pueblo) unpacking below
        # independent of the order the rows come back in by default.
        qs = County.objects.order_by('name')
        self.assertEqual(2, qs.count())
        hi, co = tuple(qs)
        hi_idx, co_idx = tuple(map(NAMES.index, ('Honolulu', 'Pueblo')))
        self.assertEqual('Pueblo', co.name)
        self.assertEqual(NUMS[co_idx], len(co.mpoly))
        self.assertEqual('Honolulu', hi.name)
        self.assertEqual(NUMS[hi_idx], len(hi.mpoly))

        # The same counties are expected for each of these `step` values.
        for st in (4, 7, 1000):
            clear_counties()
            lm.save(step=st, strict=True)
            self.county_helper(county_feat=False)

    def test_model_inheritance(self):
        """Tests LayerMapping on inherited models. See #12093."""
        icity_mapping = {
            'name': 'Name',
            'population': 'Population',
            'density': 'Density',
            'point': 'POINT',
            'dt': 'Created',
        }

        lm1 = LayerMapping(ICity1, city_shp, icity_mapping)
        lm1.save()

        lm2 = LayerMapping(ICity2, city_shp, icity_mapping)
        lm2.save()

        # Both imports are counted by ICity1, only the second by ICity2.
        self.assertEqual(6, ICity1.objects.count())
        self.assertEqual(3, ICity2.objects.count())

    def test_invalid_layer(self):
        """Tests LayerMapping on invalid geometries. See #15378."""
        invalid_mapping = {'point': 'POINT'}
        lm = LayerMapping(Invalid, invalid_shp, invalid_mapping, source_srs=4326)
        # No assertion follows: the test only requires that save() does not raise.
        lm.save(silent=True)

    def test_charfield_too_short(self):
        """A strict save raises InvalidString when 'Name' is mapped to `name_short`."""
        mapping = copy(city_mapping)
        mapping['name_short'] = 'Name'
        lm = LayerMapping(City, city_shp, mapping)
        with self.assertRaises(InvalidString):
            lm.save(silent=True, strict=True)

    def test_textfield(self):
        """String content fits also in a TextField"""
        mapping = copy(city_mapping)
        mapping['name_txt'] = 'Name'
        lm = LayerMapping(City, city_shp, mapping)
        lm.save(silent=True, strict=True)
        self.assertEqual(City.objects.count(), 3)
        self.assertEqual(City.objects.get(name='Houston').name_txt, "Houston")

    def test_encoded_name(self):
        """Test a layer containing utf-8-encoded name"""
        # Local path; the module-level `city_shp` is deliberately not used here.
        city_shp = shp_path / 'ch-city' / 'ch-city.shp'
        lm = LayerMapping(City, city_shp, city_mapping)
        lm.save(silent=True, strict=True)
        self.assertEqual(City.objects.count(), 1)
        self.assertEqual(City.objects.all()[0].name, "Zürich")

    def test_null_geom_with_unique(self):
        """LayerMapping may be created with a unique and a null geometry."""
        State.objects.bulk_create([State(name='Colorado'), State(name='Hawaii'), State(name='Texas')])
        hw = State.objects.get(name='Hawaii')
        # Pre-existing County with no geometry; the import is expected to
        # fill in its `mpoly`.
        hu = County.objects.create(name='Honolulu', state=hw, mpoly=None)
        lm = LayerMapping(County, co_shp, co_mapping, transform=False, unique='name')
        lm.save(silent=True, strict=True)
        hu.refresh_from_db()
        self.assertIsNotNone(hu.mpoly)
        self.assertEqual(hu.mpoly.ogr.num_coords, 449)

    def test_null_number_imported(self):
        """LayerMapping import of GeoJSON with a null numeric value."""
        lm = LayerMapping(HasNulls, has_nulls_geojson, has_nulls_mapping)
        lm.save()
        self.assertEqual(HasNulls.objects.count(), 3)
        self.assertEqual(HasNulls.objects.filter(num=0).count(), 1)
        self.assertEqual(HasNulls.objects.filter(num__isnull=True).count(), 1)

    def test_null_string_imported(self):
        """Test LayerMapping import of GeoJSON with a null string value."""
        lm = LayerMapping(HasNulls, has_nulls_geojson, has_nulls_mapping)
        lm.save()
        # A null must not have been stored as the literal string 'None'.
        self.assertEqual(HasNulls.objects.filter(name='None').count(), 0)
        # The expected count for name='' depends on whether the backend
        # interprets empty strings as nulls.
        num_empty = 1 if connection.features.interprets_empty_strings_as_nulls else 0
        self.assertEqual(HasNulls.objects.filter(name='').count(), num_empty)
        self.assertEqual(HasNulls.objects.filter(name__isnull=True).count(), 1)

    def test_nullable_boolean_imported(self):
        """LayerMapping import of GeoJSON with a nullable boolean value."""
        lm = LayerMapping(HasNulls, has_nulls_geojson, has_nulls_mapping)
        lm.save()
        self.assertEqual(HasNulls.objects.filter(boolean=True).count(), 1)
        self.assertEqual(HasNulls.objects.filter(boolean=False).count(), 1)
        self.assertEqual(HasNulls.objects.filter(boolean__isnull=True).count(), 1)

    def test_nullable_datetime_imported(self):
        """LayerMapping import of GeoJSON with a nullable date/time value."""
        lm = LayerMapping(HasNulls, has_nulls_geojson, has_nulls_mapping)
        lm.save()
        self.assertEqual(HasNulls.objects.filter(datetime__lt=datetime.date(1994, 8, 15)).count(), 1)
        self.assertEqual(HasNulls.objects.filter(datetime='2018-11-29T03:02:52').count(), 1)
        self.assertEqual(HasNulls.objects.filter(datetime__isnull=True).count(), 1)

    def test_uuids_imported(self):
        """LayerMapping import of GeoJSON with UUIDs."""
        lm = LayerMapping(HasNulls, has_nulls_geojson, has_nulls_mapping)
        lm.save()
        self.assertEqual(HasNulls.objects.filter(uuid='1378c26f-cbe6-44b0-929f-eb330d4991f5').count(), 1)

    def test_null_number_imported_not_allowed(self):
        """
        LayerMapping import of GeoJSON with nulls to fields that don't permit
        them.
        """
        lm = LayerMapping(DoesNotAllowNulls, has_nulls_geojson, has_nulls_mapping)
        lm.save(silent=True)
        # The assertion is an upper bound only: at most one row may be saved.
        self.assertLessEqual(DoesNotAllowNulls.objects.count(), 1)
class OtherRouter:
    """Database router that sends every read and write to the 'other' alias."""

    def db_for_read(self, model, **hints):
        """Return 'other' regardless of `model` and `hints`."""
        return 'other'

    def db_for_write(self, model, **hints):
        """Return the same alias that db_for_read() returns for these arguments."""
        return self.db_for_read(model, **hints)

    def allow_relation(self, obj1, obj2, **hints):
        """Allow every relation."""
        return True

    def allow_migrate(self, db, app_label, **hints):
        """Allow every migration."""
        return True
@override_settings(DATABASE_ROUTERS=[OtherRouter()])
class LayerMapRouterTest(TestCase):
    """Check which database alias LayerMapping selects when OtherRouter is the configured router."""

    # Both aliases are declared for this test case.
    databases = {'default', 'other'}

    @unittest.skipUnless(len(settings.DATABASES) > 1, 'multiple databases required')
    def test_layermapping_default_db(self):
        """With no explicit database given, `lm.using` is the alias chosen by the router."""
        lm = LayerMapping(City, city_shp, city_mapping)
        self.assertEqual(lm.using, 'other')
|