123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391 |
- import json
- from django.contrib.gis.db.models.fields import BaseSpatialField
- from django.contrib.gis.db.models.functions import Distance
- from django.contrib.gis.db.models.lookups import DistanceLookupBase, GISLookup
- from django.contrib.gis.gdal import GDALRaster
- from django.contrib.gis.geos import GEOSGeometry
- from django.contrib.gis.measure import D
- from django.contrib.gis.shortcuts import numpy
- from django.db import connection
- from django.db.models import F, Func, Q
- from django.test import TransactionTestCase, skipUnlessDBFeature
- from django.test.utils import CaptureQueriesContext
- from ..data.rasters.textrasters import JSON_RASTER
- from .models import RasterModel, RasterRelatedModel
- @skipUnlessDBFeature('supports_raster')
- class RasterFieldTest(TransactionTestCase):
- available_apps = ['gis_tests.rasterapp']
- def setUp(self):
- rast = GDALRaster({
- "srid": 4326,
- "origin": [0, 0],
- "scale": [-1, 1],
- "skew": [0, 0],
- "width": 5,
- "height": 5,
- "nr_of_bands": 2,
- "bands": [{"data": range(25)}, {"data": range(25, 50)}],
- })
- model_instance = RasterModel.objects.create(
- rast=rast,
- rastprojected=rast,
- geom="POINT (-95.37040 29.70486)",
- )
- RasterRelatedModel.objects.create(rastermodel=model_instance)
- def test_field_null_value(self):
- """
- Test creating a model where the RasterField has a null value.
- """
- r = RasterModel.objects.create(rast=None)
- r.refresh_from_db()
- self.assertIsNone(r.rast)
- def test_access_band_data_directly_from_queryset(self):
- RasterModel.objects.create(rast=JSON_RASTER)
- qs = RasterModel.objects.all()
- qs[0].rast.bands[0].data()
- def test_deserialize_with_pixeltype_flags(self):
- no_data = 3
- rast = GDALRaster({
- 'srid': 4326,
- 'origin': [0, 0],
- 'scale': [-1, 1],
- 'skew': [0, 0],
- 'width': 1,
- 'height': 1,
- 'nr_of_bands': 1,
- 'bands': [{'data': [no_data], 'nodata_value': no_data}],
- })
- r = RasterModel.objects.create(rast=rast)
- RasterModel.objects.filter(pk=r.pk).update(
- rast=Func(F('rast'), function='ST_SetBandIsNoData'),
- )
- r.refresh_from_db()
- band = r.rast.bands[0].data()
- if numpy:
- band = band.flatten().tolist()
- self.assertEqual(band, [no_data])
- self.assertEqual(r.rast.bands[0].nodata_value, no_data)
- def test_model_creation(self):
- """
- Test RasterField through a test model.
- """
- # Create model instance from JSON raster
- r = RasterModel.objects.create(rast=JSON_RASTER)
- r.refresh_from_db()
- # Test raster metadata properties
- self.assertEqual((5, 5), (r.rast.width, r.rast.height))
- self.assertEqual([0.0, -1.0, 0.0, 0.0, 0.0, 1.0], r.rast.geotransform)
- self.assertIsNone(r.rast.bands[0].nodata_value)
- # Compare srs
- self.assertEqual(r.rast.srs.srid, 4326)
- # Compare pixel values
- band = r.rast.bands[0].data()
- # If numpy, convert result to list
- if numpy:
- band = band.flatten().tolist()
- # Loop through rows in band data and assert single
- # value is as expected.
- self.assertEqual(
- [
- 0.0, 1.0, 2.0, 3.0, 4.0,
- 5.0, 6.0, 7.0, 8.0, 9.0,
- 10.0, 11.0, 12.0, 13.0, 14.0,
- 15.0, 16.0, 17.0, 18.0, 19.0,
- 20.0, 21.0, 22.0, 23.0, 24.0
- ],
- band
- )
- def test_implicit_raster_transformation(self):
- """
- Test automatic transformation of rasters with srid different from the
- field srid.
- """
- # Parse json raster
- rast = json.loads(JSON_RASTER)
- # Update srid to another value
- rast['srid'] = 3086
- # Save model and get it from db
- r = RasterModel.objects.create(rast=rast)
- r.refresh_from_db()
- # Confirm raster has been transformed to the default srid
- self.assertEqual(r.rast.srs.srid, 4326)
- # Confirm geotransform is in lat/lon
- expected = [
- -87.9298551266551, 9.459646421449934e-06, 0.0, 23.94249275457565,
- 0.0, -9.459646421449934e-06,
- ]
- for val, exp in zip(r.rast.geotransform, expected):
- self.assertAlmostEqual(exp, val)
- def test_verbose_name_arg(self):
- """
- RasterField should accept a positional verbose name argument.
- """
- self.assertEqual(
- RasterModel._meta.get_field('rast').verbose_name,
- 'A Verbose Raster Name'
- )
- def test_all_gis_lookups_with_rasters(self):
- """
- Evaluate all possible lookups for all input combinations (i.e.
- raster-raster, raster-geom, geom-raster) and for projected and
- unprojected coordinate systems. This test just checks that the lookup
- can be called, but doesn't check if the result makes logical sense.
- """
- from django.contrib.gis.db.backends.postgis.operations import (
- PostGISOperations,
- )
- # Create test raster and geom.
- rast = GDALRaster(json.loads(JSON_RASTER))
- stx_pnt = GEOSGeometry('POINT (-95.370401017314293 29.704867409475465)', 4326)
- stx_pnt.transform(3086)
- lookups = [
- (name, lookup)
- for name, lookup in BaseSpatialField.get_lookups().items()
- if issubclass(lookup, GISLookup)
- ]
- self.assertNotEqual(lookups, [], 'No lookups found')
- # Loop through all the GIS lookups.
- for name, lookup in lookups:
- # Construct lookup filter strings.
- combo_keys = [
- field + name for field in [
- 'rast__', 'rast__', 'rastprojected__0__', 'rast__',
- 'rastprojected__', 'geom__', 'rast__',
- ]
- ]
- if issubclass(lookup, DistanceLookupBase):
- # Set lookup values for distance lookups.
- combo_values = [
- (rast, 50, 'spheroid'),
- (rast, 0, 50, 'spheroid'),
- (rast, 0, D(km=1)),
- (stx_pnt, 0, 500),
- (stx_pnt, D(km=1000)),
- (rast, 500),
- (json.loads(JSON_RASTER), 500),
- ]
- elif name == 'relate':
- # Set lookup values for the relate lookup.
- combo_values = [
- (rast, 'T*T***FF*'),
- (rast, 0, 'T*T***FF*'),
- (rast, 0, 'T*T***FF*'),
- (stx_pnt, 0, 'T*T***FF*'),
- (stx_pnt, 'T*T***FF*'),
- (rast, 'T*T***FF*'),
- (json.loads(JSON_RASTER), 'T*T***FF*'),
- ]
- elif name == 'isvalid':
- # The isvalid lookup doesn't make sense for rasters.
- continue
- elif PostGISOperations.gis_operators[name].func:
- # Set lookup values for all function based operators.
- combo_values = [
- rast, (rast, 0), (rast, 0), (stx_pnt, 0), stx_pnt,
- rast, json.loads(JSON_RASTER)
- ]
- else:
- # Override band lookup for these, as it's not supported.
- combo_keys[2] = 'rastprojected__' + name
- # Set lookup values for all other operators.
- combo_values = [rast, None, rast, stx_pnt, stx_pnt, rast, json.loads(JSON_RASTER)]
- # Create query filter combinations.
- self.assertEqual(
- len(combo_keys),
- len(combo_values),
- 'Number of lookup names and values should be the same',
- )
- combos = [x for x in zip(combo_keys, combo_values) if x[1]]
- self.assertEqual(
- [(n, x) for n, x in enumerate(combos) if x in combos[:n]],
- [],
- 'There are repeated test lookups',
- )
- combos = [{k: v} for k, v in combos]
- for combo in combos:
- # Apply this query filter.
- qs = RasterModel.objects.filter(**combo)
- # Evaluate normal filter qs.
- self.assertIn(qs.count(), [0, 1])
- # Evaluate on conditional Q expressions.
- qs = RasterModel.objects.filter(Q(**combos[0]) & Q(**combos[1]))
- self.assertIn(qs.count(), [0, 1])
- def test_dwithin_gis_lookup_output_with_rasters(self):
- """
- Check the logical functionality of the dwithin lookup for different
- input parameters.
- """
- # Create test raster and geom.
- rast = GDALRaster(json.loads(JSON_RASTER))
- stx_pnt = GEOSGeometry('POINT (-95.370401017314293 29.704867409475465)', 4326)
- stx_pnt.transform(3086)
- # Filter raster with different lookup raster formats.
- qs = RasterModel.objects.filter(rastprojected__dwithin=(rast, D(km=1)))
- self.assertEqual(qs.count(), 1)
- qs = RasterModel.objects.filter(rastprojected__dwithin=(json.loads(JSON_RASTER), D(km=1)))
- self.assertEqual(qs.count(), 1)
- qs = RasterModel.objects.filter(rastprojected__dwithin=(JSON_RASTER, D(km=1)))
- self.assertEqual(qs.count(), 1)
- # Filter in an unprojected coordinate system.
- qs = RasterModel.objects.filter(rast__dwithin=(rast, 40))
- self.assertEqual(qs.count(), 1)
- # Filter with band index transform.
- qs = RasterModel.objects.filter(rast__1__dwithin=(rast, 1, 40))
- self.assertEqual(qs.count(), 1)
- qs = RasterModel.objects.filter(rast__1__dwithin=(rast, 40))
- self.assertEqual(qs.count(), 1)
- qs = RasterModel.objects.filter(rast__dwithin=(rast, 1, 40))
- self.assertEqual(qs.count(), 1)
- # Filter raster by geom.
- qs = RasterModel.objects.filter(rast__dwithin=(stx_pnt, 500))
- self.assertEqual(qs.count(), 1)
- qs = RasterModel.objects.filter(rastprojected__dwithin=(stx_pnt, D(km=10000)))
- self.assertEqual(qs.count(), 1)
- qs = RasterModel.objects.filter(rast__dwithin=(stx_pnt, 5))
- self.assertEqual(qs.count(), 0)
- qs = RasterModel.objects.filter(rastprojected__dwithin=(stx_pnt, D(km=100)))
- self.assertEqual(qs.count(), 0)
- # Filter geom by raster.
- qs = RasterModel.objects.filter(geom__dwithin=(rast, 500))
- self.assertEqual(qs.count(), 1)
- # Filter through related model.
- qs = RasterRelatedModel.objects.filter(rastermodel__rast__dwithin=(rast, 40))
- self.assertEqual(qs.count(), 1)
- # Filter through related model with band index transform
- qs = RasterRelatedModel.objects.filter(rastermodel__rast__1__dwithin=(rast, 40))
- self.assertEqual(qs.count(), 1)
- # Filter through conditional statements.
- qs = RasterModel.objects.filter(Q(rast__dwithin=(rast, 40)) & Q(rastprojected__dwithin=(stx_pnt, D(km=10000))))
- self.assertEqual(qs.count(), 1)
- # Filter through different lookup.
- qs = RasterModel.objects.filter(rastprojected__bbcontains=rast)
- self.assertEqual(qs.count(), 1)
- def test_lookup_input_tuple_too_long(self):
- rast = GDALRaster(json.loads(JSON_RASTER))
- msg = 'Tuple too long for lookup bbcontains.'
- with self.assertRaisesMessage(ValueError, msg):
- RasterModel.objects.filter(rast__bbcontains=(rast, 1, 2))
- def test_lookup_input_band_not_allowed(self):
- rast = GDALRaster(json.loads(JSON_RASTER))
- qs = RasterModel.objects.filter(rast__bbcontains=(rast, 1))
- msg = 'Band indices are not allowed for this operator, it works on bbox only.'
- with self.assertRaisesMessage(ValueError, msg):
- qs.count()
- def test_isvalid_lookup_with_raster_error(self):
- qs = RasterModel.objects.filter(rast__isvalid=True)
- msg = 'IsValid function requires a GeometryField in position 1, got RasterField.'
- with self.assertRaisesMessage(TypeError, msg):
- qs.count()
- def test_result_of_gis_lookup_with_rasters(self):
- # Point is in the interior
- qs = RasterModel.objects.filter(rast__contains=GEOSGeometry('POINT (-0.5 0.5)', 4326))
- self.assertEqual(qs.count(), 1)
- # Point is in the exterior
- qs = RasterModel.objects.filter(rast__contains=GEOSGeometry('POINT (0.5 0.5)', 4326))
- self.assertEqual(qs.count(), 0)
- # A point on the boundary is not contained properly
- qs = RasterModel.objects.filter(rast__contains_properly=GEOSGeometry('POINT (0 0)', 4326))
- self.assertEqual(qs.count(), 0)
- # Raster is located left of the point
- qs = RasterModel.objects.filter(rast__left=GEOSGeometry('POINT (1 0)', 4326))
- self.assertEqual(qs.count(), 1)
- def test_lookup_with_raster_bbox(self):
- rast = GDALRaster(json.loads(JSON_RASTER))
- # Shift raster upward
- rast.origin.y = 2
- # The raster in the model is not strictly below
- qs = RasterModel.objects.filter(rast__strictly_below=rast)
- self.assertEqual(qs.count(), 0)
- # Shift raster further upward
- rast.origin.y = 6
- # The raster in the model is strictly below
- qs = RasterModel.objects.filter(rast__strictly_below=rast)
- self.assertEqual(qs.count(), 1)
- def test_lookup_with_polygonized_raster(self):
- rast = GDALRaster(json.loads(JSON_RASTER))
- # Move raster to overlap with the model point on the left side
- rast.origin.x = -95.37040 + 1
- rast.origin.y = 29.70486
- # Raster overlaps with point in model
- qs = RasterModel.objects.filter(geom__intersects=rast)
- self.assertEqual(qs.count(), 1)
- # Change left side of raster to be nodata values
- rast.bands[0].data(data=[0, 0, 0, 1, 1], shape=(5, 1))
- rast.bands[0].nodata_value = 0
- qs = RasterModel.objects.filter(geom__intersects=rast)
- # Raster does not overlap anymore after polygonization
- # where the nodata zone is not included.
- self.assertEqual(qs.count(), 0)
- def test_lookup_value_error(self):
- # Test with invalid dict lookup parameter
- obj = {}
- msg = "Couldn't create spatial object from lookup value '%s'." % obj
- with self.assertRaisesMessage(ValueError, msg):
- RasterModel.objects.filter(geom__intersects=obj)
- # Test with invalid string lookup parameter
- obj = '00000'
- msg = "Couldn't create spatial object from lookup value '%s'." % obj
- with self.assertRaisesMessage(ValueError, msg):
- RasterModel.objects.filter(geom__intersects=obj)
- def test_db_function_errors(self):
- """
- Errors are raised when using DB functions with raster content.
- """
- point = GEOSGeometry("SRID=3086;POINT (-697024.9213808845 683729.1705516104)")
- rast = GDALRaster(json.loads(JSON_RASTER))
- msg = "Distance function requires a geometric argument in position 2."
- with self.assertRaisesMessage(TypeError, msg):
- RasterModel.objects.annotate(distance_from_point=Distance("geom", rast))
- with self.assertRaisesMessage(TypeError, msg):
- RasterModel.objects.annotate(distance_from_point=Distance("rastprojected", rast))
- msg = "Distance function requires a GeometryField in position 1, got RasterField."
- with self.assertRaisesMessage(TypeError, msg):
- RasterModel.objects.annotate(distance_from_point=Distance("rastprojected", point)).count()
- def test_lhs_with_index_rhs_without_index(self):
- with CaptureQueriesContext(connection) as queries:
- RasterModel.objects.filter(rast__0__contains=json.loads(JSON_RASTER)).exists()
- # It's easier to check the indexes in the generated SQL than to write
- # tests that cover all index combinations.
- self.assertRegex(queries[-1]['sql'], r'WHERE ST_Contains\([^)]*, 1, [^)]*, 1\)')
|