123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209 |
- import operator
- import uuid
- from unittest import mock
- from django import forms
- from django.core import serializers
- from django.core.exceptions import ValidationError
- from django.core.serializers.json import DjangoJSONEncoder
- from django.db import (
- DataError,
- IntegrityError,
- NotSupportedError,
- OperationalError,
- connection,
- models,
- transaction,
- )
- from django.db.models import (
- Count,
- ExpressionWrapper,
- F,
- IntegerField,
- JSONField,
- OuterRef,
- Q,
- Subquery,
- Transform,
- Value,
- )
- from django.db.models.expressions import RawSQL
- from django.db.models.fields.json import (
- KT,
- HasKey,
- KeyTextTransform,
- KeyTransform,
- KeyTransformFactory,
- KeyTransformTextLookupMixin,
- )
- from django.db.models.functions import Cast
- from django.test import SimpleTestCase, TestCase, skipIfDBFeature, skipUnlessDBFeature
- from django.test.utils import CaptureQueriesContext
- from .models import (
- CustomJSONDecoder,
- CustomSerializationJSONModel,
- JSONModel,
- NullableJSONModel,
- RelatedJSONModel,
- )
- @skipUnlessDBFeature("supports_json_field")
- class JSONFieldTests(TestCase):
- def test_invalid_value(self):
- msg = "is not JSON serializable"
- with self.assertRaisesMessage(TypeError, msg):
- NullableJSONModel.objects.create(
- value={
- "uuid": uuid.UUID("d85e2076-b67c-4ee7-8c3a-2bf5a2cc2475"),
- }
- )
- def test_custom_encoder_decoder(self):
- value = {"uuid": uuid.UUID("{d85e2076-b67c-4ee7-8c3a-2bf5a2cc2475}")}
- obj = NullableJSONModel(value_custom=value)
- obj.clean_fields()
- obj.save()
- obj.refresh_from_db()
- self.assertEqual(obj.value_custom, value)
- def test_db_check_constraints(self):
- value = "{@!invalid json value 123 $!@#"
- with mock.patch.object(DjangoJSONEncoder, "encode", return_value=value):
- with self.assertRaises((IntegrityError, DataError, OperationalError)):
- NullableJSONModel.objects.create(value_custom=value)
- class TestMethods(SimpleTestCase):
- def test_deconstruct(self):
- field = models.JSONField()
- name, path, args, kwargs = field.deconstruct()
- self.assertEqual(path, "django.db.models.JSONField")
- self.assertEqual(args, [])
- self.assertEqual(kwargs, {})
- def test_deconstruct_custom_encoder_decoder(self):
- field = models.JSONField(encoder=DjangoJSONEncoder, decoder=CustomJSONDecoder)
- name, path, args, kwargs = field.deconstruct()
- self.assertEqual(kwargs["encoder"], DjangoJSONEncoder)
- self.assertEqual(kwargs["decoder"], CustomJSONDecoder)
- def test_get_transforms(self):
- @models.JSONField.register_lookup
- class MyTransform(Transform):
- lookup_name = "my_transform"
- field = models.JSONField()
- transform = field.get_transform("my_transform")
- self.assertIs(transform, MyTransform)
- models.JSONField._unregister_lookup(MyTransform)
- transform = field.get_transform("my_transform")
- self.assertIsInstance(transform, KeyTransformFactory)
- def test_key_transform_text_lookup_mixin_non_key_transform(self):
- transform = Transform("test")
- msg = (
- "Transform should be an instance of KeyTransform in order to use "
- "this lookup."
- )
- with self.assertRaisesMessage(TypeError, msg):
- KeyTransformTextLookupMixin(transform)
- def test_get_prep_value(self):
- class JSONFieldGetPrepValue(models.JSONField):
- def get_prep_value(self, value):
- if value is True:
- return {"value": True}
- return value
- def noop_adapt_json_value(value, encoder):
- return value
- field = JSONFieldGetPrepValue()
- with mock.patch.object(
- connection.ops, "adapt_json_value", noop_adapt_json_value
- ):
- self.assertEqual(
- field.get_db_prep_value(True, connection, prepared=False),
- {"value": True},
- )
- self.assertIs(
- field.get_db_prep_value(True, connection, prepared=True), True
- )
- self.assertEqual(field.get_db_prep_value(1, connection, prepared=False), 1)
- class TestValidation(SimpleTestCase):
- def test_invalid_encoder(self):
- msg = "The encoder parameter must be a callable object."
- with self.assertRaisesMessage(ValueError, msg):
- models.JSONField(encoder=DjangoJSONEncoder())
- def test_invalid_decoder(self):
- msg = "The decoder parameter must be a callable object."
- with self.assertRaisesMessage(ValueError, msg):
- models.JSONField(decoder=CustomJSONDecoder())
- def test_validation_error(self):
- field = models.JSONField()
- msg = "Value must be valid JSON."
- value = uuid.UUID("{d85e2076-b67c-4ee7-8c3a-2bf5a2cc2475}")
- with self.assertRaisesMessage(ValidationError, msg):
- field.clean({"uuid": value}, None)
- def test_custom_encoder(self):
- field = models.JSONField(encoder=DjangoJSONEncoder)
- value = uuid.UUID("{d85e2076-b67c-4ee7-8c3a-2bf5a2cc2475}")
- field.clean({"uuid": value}, None)
- class TestFormField(SimpleTestCase):
- def test_formfield(self):
- model_field = models.JSONField()
- form_field = model_field.formfield()
- self.assertIsInstance(form_field, forms.JSONField)
- def test_formfield_custom_encoder_decoder(self):
- model_field = models.JSONField(
- encoder=DjangoJSONEncoder, decoder=CustomJSONDecoder
- )
- form_field = model_field.formfield()
- self.assertIs(form_field.encoder, DjangoJSONEncoder)
- self.assertIs(form_field.decoder, CustomJSONDecoder)
- class TestSerialization(SimpleTestCase):
- test_data = (
- '[{"fields": {"value": %s}, "model": "model_fields.jsonmodel", "pk": null}]'
- )
- test_values = (
- # (Python value, serialized value),
- ({"a": "b", "c": None}, '{"a": "b", "c": null}'),
- ("abc", '"abc"'),
- ('{"a": "a"}', '"{\\"a\\": \\"a\\"}"'),
- )
- def test_dumping(self):
- for value, serialized in self.test_values:
- with self.subTest(value=value):
- instance = JSONModel(value=value)
- data = serializers.serialize("json", [instance])
- self.assertJSONEqual(data, self.test_data % serialized)
- def test_loading(self):
- for value, serialized in self.test_values:
- with self.subTest(value=value):
- instance = list(
- serializers.deserialize("json", self.test_data % serialized)
- )[0].object
- self.assertEqual(instance.value, value)
- def test_xml_serialization(self):
- test_xml_data = (
- '<django-objects version="1.0">'
- '<object model="model_fields.nullablejsonmodel">'
- '<field name="value" type="JSONField">%s'
- "</field></object></django-objects>"
- )
- for value, serialized in self.test_values:
- with self.subTest(value=value):
- instance = NullableJSONModel(value=value)
- data = serializers.serialize("xml", [instance], fields=["value"])
- self.assertXMLEqual(data, test_xml_data % serialized)
- new_instance = list(serializers.deserialize("xml", data))[0].object
- self.assertEqual(new_instance.value, instance.value)
- @skipUnlessDBFeature("supports_json_field")
- class TestSaveLoad(TestCase):
- def test_null(self):
- obj = NullableJSONModel(value=None)
- obj.save()
- obj.refresh_from_db()
- self.assertIsNone(obj.value)
- @skipUnlessDBFeature("supports_primitives_in_json_field")
- def test_json_null_different_from_sql_null(self):
- json_null = NullableJSONModel.objects.create(value=Value(None, JSONField()))
- NullableJSONModel.objects.update(value=Value(None, JSONField()))
- json_null.refresh_from_db()
- sql_null = NullableJSONModel.objects.create(value=None)
- sql_null.refresh_from_db()
- # 'null' is not equal to NULL in the database.
- self.assertSequenceEqual(
- NullableJSONModel.objects.filter(value=Value(None, JSONField())),
- [json_null],
- )
- self.assertSequenceEqual(
- NullableJSONModel.objects.filter(value=None),
- [json_null],
- )
- self.assertSequenceEqual(
- NullableJSONModel.objects.filter(value__isnull=True),
- [sql_null],
- )
- # 'null' is equal to NULL in Python (None).
- self.assertEqual(json_null.value, sql_null.value)
- @skipUnlessDBFeature("supports_primitives_in_json_field")
- def test_primitives(self):
- values = [
- True,
- 1,
- 1.45,
- "String",
- "",
- ]
- for value in values:
- with self.subTest(value=value):
- obj = JSONModel(value=value)
- obj.save()
- obj.refresh_from_db()
- self.assertEqual(obj.value, value)
- def test_dict(self):
- values = [
- {},
- {"name": "John", "age": 20, "height": 180.3},
- {"a": True, "b": {"b1": False, "b2": None}},
- ]
- for value in values:
- with self.subTest(value=value):
- obj = JSONModel.objects.create(value=value)
- obj.refresh_from_db()
- self.assertEqual(obj.value, value)
- def test_list(self):
- values = [
- [],
- ["John", 20, 180.3],
- [True, [False, None]],
- ]
- for value in values:
- with self.subTest(value=value):
- obj = JSONModel.objects.create(value=value)
- obj.refresh_from_db()
- self.assertEqual(obj.value, value)
- def test_realistic_object(self):
- value = {
- "name": "John",
- "age": 20,
- "pets": [
- {"name": "Kit", "type": "cat", "age": 2},
- {"name": "Max", "type": "dog", "age": 1},
- ],
- "courses": [
- ["A1", "A2", "A3"],
- ["B1", "B2"],
- ["C1"],
- ],
- }
- obj = JSONModel.objects.create(value=value)
- obj.refresh_from_db()
- self.assertEqual(obj.value, value)
- def test_bulk_update_custom_get_prep_value(self):
- objs = CustomSerializationJSONModel.objects.bulk_create(
- [CustomSerializationJSONModel(pk=1, json_field={"version": "1"})]
- )
- objs[0].json_field["version"] = "1-alpha"
- CustomSerializationJSONModel.objects.bulk_update(objs, ["json_field"])
- self.assertSequenceEqual(
- CustomSerializationJSONModel.objects.values("json_field"),
- [{"json_field": '{"version": "1-alpha"}'}],
- )
- @skipUnlessDBFeature("supports_json_field")
- class TestQuerying(TestCase):
- @classmethod
- def setUpTestData(cls):
- cls.primitives = [True, False, "yes", 7, 9.6]
- values = [
- None,
- [],
- {},
- {"a": "b", "c": 14},
- {
- "a": "b",
- "c": 14,
- "d": ["e", {"f": "g"}],
- "h": True,
- "i": False,
- "j": None,
- "k": {"l": "m"},
- "n": [None, True, False],
- "o": '"quoted"',
- "p": 4.2,
- "r": {"s": True, "t": False},
- },
- [1, [2]],
- {"k": True, "l": False, "foo": "bax"},
- {
- "foo": "bar",
- "baz": {"a": "b", "c": "d"},
- "bar": ["foo", "bar"],
- "bax": {"foo": "bar"},
- },
- ]
- cls.objs = [NullableJSONModel.objects.create(value=value) for value in values]
- if connection.features.supports_primitives_in_json_field:
- cls.objs.extend(
- [
- NullableJSONModel.objects.create(value=value)
- for value in cls.primitives
- ]
- )
- cls.raw_sql = "%s::jsonb" if connection.vendor == "postgresql" else "%s"
- def test_exact(self):
- self.assertSequenceEqual(
- NullableJSONModel.objects.filter(value__exact={}),
- [self.objs[2]],
- )
- def test_exact_complex(self):
- self.assertSequenceEqual(
- NullableJSONModel.objects.filter(value__exact={"a": "b", "c": 14}),
- [self.objs[3]],
- )
- def test_icontains(self):
- self.assertCountEqual(
- NullableJSONModel.objects.filter(value__icontains="BaX"),
- self.objs[6:8],
- )
- def test_isnull(self):
- self.assertSequenceEqual(
- NullableJSONModel.objects.filter(value__isnull=True),
- [self.objs[0]],
- )
- def test_ordering_by_transform(self):
- mariadb = connection.vendor == "mysql" and connection.mysql_is_mariadb
- values = [
- {"ord": 93, "name": "bar"},
- {"ord": 22.1, "name": "foo"},
- {"ord": -1, "name": "baz"},
- {"ord": 21.931902, "name": "spam"},
- {"ord": -100291029, "name": "eggs"},
- ]
- for field_name in ["value", "value_custom"]:
- with self.subTest(field=field_name):
- objs = [
- NullableJSONModel.objects.create(**{field_name: value})
- for value in values
- ]
- query = NullableJSONModel.objects.filter(
- **{"%s__name__isnull" % field_name: False},
- ).order_by("%s__ord" % field_name)
- expected = [objs[4], objs[2], objs[3], objs[1], objs[0]]
- if mariadb or connection.vendor == "oracle":
- # MariaDB and Oracle return JSON values as strings.
- expected = [objs[2], objs[4], objs[3], objs[1], objs[0]]
- self.assertSequenceEqual(query, expected)
- def test_ordering_grouping_by_key_transform(self):
- base_qs = NullableJSONModel.objects.filter(value__d__0__isnull=False)
- for qs in (
- base_qs.order_by("value__d__0"),
- base_qs.annotate(
- key=KeyTransform("0", KeyTransform("d", "value"))
- ).order_by("key"),
- ):
- self.assertSequenceEqual(qs, [self.objs[4]])
- none_val = "" if connection.features.interprets_empty_strings_as_nulls else None
- qs = NullableJSONModel.objects.filter(value__isnull=False)
- self.assertQuerySetEqual(
- qs.filter(value__isnull=False)
- .annotate(key=KT("value__d__1__f"))
- .values("key")
- .annotate(count=Count("key"))
- .order_by("count"),
- [(none_val, 0), ("g", 1)],
- operator.itemgetter("key", "count"),
- )
- def test_ordering_grouping_by_count(self):
- qs = (
- NullableJSONModel.objects.filter(
- value__isnull=False,
- )
- .values("value__d__0")
- .annotate(count=Count("value__d__0"))
- .order_by("count")
- )
- self.assertQuerySetEqual(qs, [0, 1], operator.itemgetter("count"))
- def test_order_grouping_custom_decoder(self):
- NullableJSONModel.objects.create(value_custom={"a": "b"})
- qs = NullableJSONModel.objects.filter(value_custom__isnull=False)
- self.assertSequenceEqual(
- qs.values(
- "value_custom__a",
- )
- .annotate(
- count=Count("id"),
- )
- .order_by("value_custom__a"),
- [{"value_custom__a": "b", "count": 1}],
- )
- def test_key_transform_raw_expression(self):
- expr = RawSQL(self.raw_sql, ['{"x": "bar"}'])
- self.assertSequenceEqual(
- NullableJSONModel.objects.filter(value__foo=KeyTransform("x", expr)),
- [self.objs[7]],
- )
- def test_nested_key_transform_raw_expression(self):
- expr = RawSQL(self.raw_sql, ['{"x": {"y": "bar"}}'])
- self.assertSequenceEqual(
- NullableJSONModel.objects.filter(
- value__foo=KeyTransform("y", KeyTransform("x", expr))
- ),
- [self.objs[7]],
- )
- def test_key_transform_expression(self):
- self.assertSequenceEqual(
- NullableJSONModel.objects.filter(value__d__0__isnull=False)
- .annotate(
- key=KeyTransform("d", "value"),
- chain=KeyTransform("0", "key"),
- expr=KeyTransform("0", Cast("key", models.JSONField())),
- )
- .filter(chain=F("expr")),
- [self.objs[4]],
- )
- def test_key_transform_annotation_expression(self):
- obj = NullableJSONModel.objects.create(value={"d": ["e", "e"]})
- self.assertSequenceEqual(
- NullableJSONModel.objects.filter(value__d__0__isnull=False)
- .annotate(
- key=F("value__d"),
- chain=F("key__0"),
- expr=Cast("key", models.JSONField()),
- )
- .filter(chain=F("expr__1")),
- [obj],
- )
- def test_nested_key_transform_expression(self):
- self.assertSequenceEqual(
- NullableJSONModel.objects.filter(value__d__0__isnull=False)
- .annotate(
- key=KeyTransform("d", "value"),
- chain=KeyTransform("f", KeyTransform("1", "key")),
- expr=KeyTransform(
- "f", KeyTransform("1", Cast("key", models.JSONField()))
- ),
- )
- .filter(chain=F("expr")),
- [self.objs[4]],
- )
- def test_nested_key_transform_annotation_expression(self):
- obj = NullableJSONModel.objects.create(
- value={"d": ["e", {"f": "g"}, {"f": "g"}]},
- )
- self.assertSequenceEqual(
- NullableJSONModel.objects.filter(value__d__0__isnull=False)
- .annotate(
- key=F("value__d"),
- chain=F("key__1__f"),
- expr=Cast("key", models.JSONField()),
- )
- .filter(chain=F("expr__2__f")),
- [obj],
- )
- def test_nested_key_transform_on_subquery(self):
- self.assertSequenceEqual(
- NullableJSONModel.objects.filter(value__d__0__isnull=False)
- .annotate(
- subquery_value=Subquery(
- NullableJSONModel.objects.filter(pk=OuterRef("pk")).values("value")
- ),
- key=KeyTransform("d", "subquery_value"),
- chain=KeyTransform("f", KeyTransform("1", "key")),
- )
- .filter(chain="g"),
- [self.objs[4]],
- )
- def test_key_text_transform_char_lookup(self):
- qs = NullableJSONModel.objects.annotate(
- char_value=KeyTextTransform("foo", "value"),
- ).filter(char_value__startswith="bar")
- self.assertSequenceEqual(qs, [self.objs[7]])
- qs = NullableJSONModel.objects.annotate(
- char_value=KeyTextTransform(1, KeyTextTransform("bar", "value")),
- ).filter(char_value__startswith="bar")
- self.assertSequenceEqual(qs, [self.objs[7]])
- def test_expression_wrapper_key_transform(self):
- self.assertCountEqual(
- NullableJSONModel.objects.annotate(
- expr=ExpressionWrapper(
- KeyTransform("c", "value"),
- output_field=IntegerField(),
- ),
- ).filter(expr__isnull=False),
- self.objs[3:5],
- )
- def test_has_key(self):
- self.assertCountEqual(
- NullableJSONModel.objects.filter(value__has_key="a"),
- [self.objs[3], self.objs[4]],
- )
- def test_has_key_null_value(self):
- self.assertSequenceEqual(
- NullableJSONModel.objects.filter(value__has_key="j"),
- [self.objs[4]],
- )
- def test_has_key_deep(self):
- tests = [
- (Q(value__baz__has_key="a"), self.objs[7]),
- (
- Q(value__has_key=KeyTransform("a", KeyTransform("baz", "value"))),
- self.objs[7],
- ),
- (Q(value__has_key=F("value__baz__a")), self.objs[7]),
- (
- Q(value__has_key=KeyTransform("c", KeyTransform("baz", "value"))),
- self.objs[7],
- ),
- (Q(value__has_key=F("value__baz__c")), self.objs[7]),
- (Q(value__d__1__has_key="f"), self.objs[4]),
- (
- Q(
- value__has_key=KeyTransform(
- "f", KeyTransform("1", KeyTransform("d", "value"))
- )
- ),
- self.objs[4],
- ),
- (Q(value__has_key=F("value__d__1__f")), self.objs[4]),
- ]
- for condition, expected in tests:
- with self.subTest(condition=condition):
- self.assertSequenceEqual(
- NullableJSONModel.objects.filter(condition),
- [expected],
- )
- def test_has_key_literal_lookup(self):
- self.assertSequenceEqual(
- NullableJSONModel.objects.filter(
- HasKey(Value({"foo": "bar"}, JSONField()), "foo")
- ).order_by("id"),
- self.objs,
- )
- def test_has_key_list(self):
- obj = NullableJSONModel.objects.create(value=[{"a": 1}, {"b": "x"}])
- tests = [
- Q(value__1__has_key="b"),
- Q(value__has_key=KeyTransform("b", KeyTransform(1, "value"))),
- Q(value__has_key=KeyTransform("b", KeyTransform("1", "value"))),
- Q(value__has_key=F("value__1__b")),
- ]
- for condition in tests:
- with self.subTest(condition=condition):
- self.assertSequenceEqual(
- NullableJSONModel.objects.filter(condition),
- [obj],
- )
- def test_has_keys(self):
- self.assertSequenceEqual(
- NullableJSONModel.objects.filter(value__has_keys=["a", "c", "h"]),
- [self.objs[4]],
- )
- def test_has_any_keys(self):
- self.assertCountEqual(
- NullableJSONModel.objects.filter(value__has_any_keys=["c", "l"]),
- [self.objs[3], self.objs[4], self.objs[6]],
- )
- def test_has_key_number(self):
- obj = NullableJSONModel.objects.create(
- value={
- "123": "value",
- "nested": {"456": "bar", "lorem": "abc", "999": True},
- "array": [{"789": "baz", "777": "def", "ipsum": 200}],
- "000": "val",
- }
- )
- tests = [
- Q(value__has_key="123"),
- Q(value__nested__has_key="456"),
- Q(value__array__0__has_key="789"),
- Q(value__has_keys=["nested", "123", "array", "000"]),
- Q(value__nested__has_keys=["lorem", "999", "456"]),
- Q(value__array__0__has_keys=["789", "ipsum", "777"]),
- Q(value__has_any_keys=["000", "nonexistent"]),
- Q(value__nested__has_any_keys=["999", "nonexistent"]),
- Q(value__array__0__has_any_keys=["777", "nonexistent"]),
- ]
- for condition in tests:
- with self.subTest(condition=condition):
- self.assertSequenceEqual(
- NullableJSONModel.objects.filter(condition),
- [obj],
- )
- @skipUnlessDBFeature("supports_json_field_contains")
- def test_contains(self):
- tests = [
- ({}, self.objs[2:5] + self.objs[6:8]),
- ({"baz": {"a": "b", "c": "d"}}, [self.objs[7]]),
- ({"baz": {"a": "b"}}, [self.objs[7]]),
- ({"baz": {"c": "d"}}, [self.objs[7]]),
- ({"k": True, "l": False}, [self.objs[6]]),
- ({"d": ["e", {"f": "g"}]}, [self.objs[4]]),
- ({"d": ["e"]}, [self.objs[4]]),
- ({"d": [{"f": "g"}]}, [self.objs[4]]),
- ([1, [2]], [self.objs[5]]),
- ([1], [self.objs[5]]),
- ([[2]], [self.objs[5]]),
- ({"n": [None, True, False]}, [self.objs[4]]),
- ({"j": None}, [self.objs[4]]),
- ]
- for value, expected in tests:
- with self.subTest(value=value):
- qs = NullableJSONModel.objects.filter(value__contains=value)
- self.assertCountEqual(qs, expected)
- @skipIfDBFeature("supports_json_field_contains")
- def test_contains_unsupported(self):
- msg = "contains lookup is not supported on this database backend."
- with self.assertRaisesMessage(NotSupportedError, msg):
- NullableJSONModel.objects.filter(
- value__contains={"baz": {"a": "b", "c": "d"}},
- ).get()
- @skipUnlessDBFeature(
- "supports_primitives_in_json_field",
- "supports_json_field_contains",
- )
- def test_contains_primitives(self):
- for value in self.primitives:
- with self.subTest(value=value):
- qs = NullableJSONModel.objects.filter(value__contains=value)
- self.assertIs(qs.exists(), True)
- @skipUnlessDBFeature("supports_json_field_contains")
- def test_contained_by(self):
- qs = NullableJSONModel.objects.filter(
- value__contained_by={"a": "b", "c": 14, "h": True}
- )
- self.assertCountEqual(qs, self.objs[2:4])
- @skipIfDBFeature("supports_json_field_contains")
- def test_contained_by_unsupported(self):
- msg = "contained_by lookup is not supported on this database backend."
- with self.assertRaisesMessage(NotSupportedError, msg):
- NullableJSONModel.objects.filter(value__contained_by={"a": "b"}).get()
- def test_deep_values(self):
- qs = NullableJSONModel.objects.values_list("value__k__l").order_by("pk")
- expected_objs = [(None,)] * len(self.objs)
- expected_objs[4] = ("m",)
- self.assertSequenceEqual(qs, expected_objs)
- @skipUnlessDBFeature("can_distinct_on_fields")
- def test_deep_distinct(self):
- query = NullableJSONModel.objects.distinct("value__k__l").values_list(
- "value__k__l"
- )
- expected = [("m",), (None,)]
- if not connection.features.nulls_order_largest:
- expected.reverse()
- self.assertSequenceEqual(query, expected)
- def test_isnull_key(self):
- # key__isnull=False works the same as has_key='key'.
- self.assertCountEqual(
- NullableJSONModel.objects.filter(value__a__isnull=True),
- self.objs[:3] + self.objs[5:],
- )
- self.assertCountEqual(
- NullableJSONModel.objects.filter(value__j__isnull=True),
- self.objs[:4] + self.objs[5:],
- )
- self.assertCountEqual(
- NullableJSONModel.objects.filter(value__a__isnull=False),
- [self.objs[3], self.objs[4]],
- )
- self.assertSequenceEqual(
- NullableJSONModel.objects.filter(value__j__isnull=False),
- [self.objs[4]],
- )
- def test_isnull_key_or_none(self):
- obj = NullableJSONModel.objects.create(value={"a": None})
- self.assertCountEqual(
- NullableJSONModel.objects.filter(
- Q(value__a__isnull=True) | Q(value__a=None)
- ),
- self.objs[:3] + self.objs[5:] + [obj],
- )
- def test_none_key(self):
- self.assertSequenceEqual(
- NullableJSONModel.objects.filter(value__j=None),
- [self.objs[4]],
- )
- def test_none_key_exclude(self):
- obj = NullableJSONModel.objects.create(value={"j": 1})
- if connection.vendor == "oracle":
- # Oracle supports filtering JSON objects with NULL keys, but the
- # current implementation doesn't support it.
- self.assertSequenceEqual(
- NullableJSONModel.objects.exclude(value__j=None),
- self.objs[1:4] + self.objs[5:] + [obj],
- )
- else:
- self.assertSequenceEqual(
- NullableJSONModel.objects.exclude(value__j=None), [obj]
- )
- def test_shallow_list_lookup(self):
- self.assertSequenceEqual(
- NullableJSONModel.objects.filter(value__0=1),
- [self.objs[5]],
- )
- def test_shallow_obj_lookup(self):
- self.assertCountEqual(
- NullableJSONModel.objects.filter(value__a="b"),
- [self.objs[3], self.objs[4]],
- )
- def test_obj_subquery_lookup(self):
- qs = NullableJSONModel.objects.annotate(
- field=Subquery(
- NullableJSONModel.objects.filter(pk=OuterRef("pk")).values("value")
- ),
- ).filter(field__a="b")
- self.assertCountEqual(qs, [self.objs[3], self.objs[4]])
- def test_deep_lookup_objs(self):
- self.assertSequenceEqual(
- NullableJSONModel.objects.filter(value__k__l="m"),
- [self.objs[4]],
- )
- def test_shallow_lookup_obj_target(self):
- self.assertSequenceEqual(
- NullableJSONModel.objects.filter(value__k={"l": "m"}),
- [self.objs[4]],
- )
- def test_deep_lookup_array(self):
- self.assertSequenceEqual(
- NullableJSONModel.objects.filter(value__1__0=2),
- [self.objs[5]],
- )
- def test_deep_lookup_mixed(self):
- self.assertSequenceEqual(
- NullableJSONModel.objects.filter(value__d__1__f="g"),
- [self.objs[4]],
- )
- def test_deep_lookup_transform(self):
- self.assertCountEqual(
- NullableJSONModel.objects.filter(value__c__gt=2),
- [self.objs[3], self.objs[4]],
- )
- self.assertCountEqual(
- NullableJSONModel.objects.filter(value__c__gt=2.33),
- [self.objs[3], self.objs[4]],
- )
- self.assertIs(NullableJSONModel.objects.filter(value__c__lt=5).exists(), False)
- def test_lookups_special_chars(self):
- test_keys = [
- "CONTROL",
- "single'",
- "dollar$",
- "dot.dot",
- "with space",
- "back\\slash",
- "question?mark",
- "user@name",
- "emo🤡'ji",
- "com,ma",
- "curly{{{brace}}}s",
- "escape\uffff'seq'\uffffue\uffff'nce",
- ]
- json_value = {key: "some value" for key in test_keys}
- obj = NullableJSONModel.objects.create(value=json_value)
- obj.refresh_from_db()
- self.assertEqual(obj.value, json_value)
- for key in test_keys:
- lookups = {
- "has_key": Q(value__has_key=key),
- "has_keys": Q(value__has_keys=[key, "CONTROL"]),
- "has_any_keys": Q(value__has_any_keys=[key, "does_not_exist"]),
- "exact": Q(**{f"value__{key}": "some value"}),
- }
- for lookup, condition in lookups.items():
- results = NullableJSONModel.objects.filter(condition)
- with self.subTest(key=key, lookup=lookup):
- self.assertSequenceEqual(results, [obj])
- def test_lookups_special_chars_double_quotes(self):
- test_keys = [
- 'double"',
- "m\\i@x. m🤡'a,t{{{ch}}}e?d$\"'es\uffff'ca\uffff'pe",
- ]
- json_value = {key: "some value" for key in test_keys}
- obj = NullableJSONModel.objects.create(value=json_value)
- obj.refresh_from_db()
- self.assertEqual(obj.value, json_value)
- self.assertSequenceEqual(
- NullableJSONModel.objects.filter(value__has_keys=test_keys), [obj]
- )
- for key in test_keys:
- with self.subTest(key=key):
- results = NullableJSONModel.objects.filter(
- Q(value__has_key=key),
- Q(value__has_any_keys=[key, "does_not_exist"]),
- Q(**{f"value__{key}": "some value"}),
- )
- self.assertSequenceEqual(results, [obj])
- def test_lookup_exclude(self):
- tests = [
- (Q(value__a="b"), [self.objs[0]]),
- (Q(value__foo="bax"), [self.objs[0], self.objs[7]]),
- ]
- for condition, expected in tests:
- self.assertCountEqual(
- NullableJSONModel.objects.exclude(condition),
- expected,
- )
- self.assertCountEqual(
- NullableJSONModel.objects.filter(~condition),
- expected,
- )
- def test_lookup_exclude_nonexistent_key(self):
- # Values without the key are ignored.
- condition = Q(value__foo="bax")
- objs_with_value = [self.objs[6]]
- objs_with_different_value = [self.objs[0], self.objs[7]]
- self.assertCountEqual(
- NullableJSONModel.objects.exclude(condition),
- objs_with_different_value,
- )
- self.assertSequenceEqual(
- NullableJSONModel.objects.exclude(~condition),
- objs_with_value,
- )
- self.assertCountEqual(
- NullableJSONModel.objects.filter(condition | ~condition),
- objs_with_value + objs_with_different_value,
- )
- self.assertCountEqual(
- NullableJSONModel.objects.exclude(condition & ~condition),
- objs_with_value + objs_with_different_value,
- )
- # Add the __isnull lookup to get an exhaustive set.
- self.assertCountEqual(
- NullableJSONModel.objects.exclude(condition & Q(value__foo__isnull=False)),
- self.objs[0:6] + self.objs[7:],
- )
- self.assertSequenceEqual(
- NullableJSONModel.objects.filter(condition & Q(value__foo__isnull=False)),
- objs_with_value,
- )
- def test_usage_in_subquery(self):
- self.assertCountEqual(
- NullableJSONModel.objects.filter(
- id__in=NullableJSONModel.objects.filter(value__c=14),
- ),
- self.objs[3:5],
- )
- @skipUnlessDBFeature("supports_json_field_contains")
- def test_array_key_contains(self):
- tests = [
- ([], [self.objs[7]]),
- ("bar", [self.objs[7]]),
- (["bar"], [self.objs[7]]),
- ("ar", []),
- ]
- for value, expected in tests:
- with self.subTest(value=value):
- self.assertSequenceEqual(
- NullableJSONModel.objects.filter(value__bar__contains=value),
- expected,
- )
- def test_key_iexact(self):
- self.assertIs(
- NullableJSONModel.objects.filter(value__foo__iexact="BaR").exists(), True
- )
- self.assertIs(
- NullableJSONModel.objects.filter(value__foo__iexact='"BaR"').exists(), False
- )
- def test_key_in(self):
- tests = [
- ("value__c__in", [14], self.objs[3:5]),
- ("value__c__in", [14, 15], self.objs[3:5]),
- ("value__0__in", [1], [self.objs[5]]),
- ("value__0__in", [1, 3], [self.objs[5]]),
- ("value__foo__in", ["bar"], [self.objs[7]]),
- (
- "value__foo__in",
- [KeyTransform("foo", KeyTransform("bax", "value"))],
- [self.objs[7]],
- ),
- ("value__foo__in", [F("value__bax__foo")], [self.objs[7]]),
- (
- "value__foo__in",
- [KeyTransform("foo", KeyTransform("bax", "value")), "baz"],
- [self.objs[7]],
- ),
- ("value__foo__in", [F("value__bax__foo"), "baz"], [self.objs[7]]),
- ("value__foo__in", ["bar", "baz"], [self.objs[7]]),
- ("value__bar__in", [["foo", "bar"]], [self.objs[7]]),
- ("value__bar__in", [["foo", "bar"], ["a"]], [self.objs[7]]),
- ("value__bax__in", [{"foo": "bar"}, {"a": "b"}], [self.objs[7]]),
- ("value__h__in", [True, "foo"], [self.objs[4]]),
- ("value__i__in", [False, "foo"], [self.objs[4]]),
- ]
- for lookup, value, expected in tests:
- with self.subTest(lookup=lookup, value=value), transaction.atomic():
- self.assertCountEqual(
- NullableJSONModel.objects.filter(**{lookup: value}),
- expected,
- )
- def test_key_values(self):
- qs = NullableJSONModel.objects.filter(value__h=True)
- tests = [
- ("value__a", "b"),
- ("value__c", 14),
- ("value__d", ["e", {"f": "g"}]),
- ("value__h", True),
- ("value__i", False),
- ("value__j", None),
- ("value__k", {"l": "m"}),
- ("value__n", [None, True, False]),
- ("value__p", 4.2),
- ("value__r", {"s": True, "t": False}),
- ]
- for lookup, expected in tests:
- with self.subTest(lookup=lookup):
- self.assertEqual(qs.values_list(lookup, flat=True).get(), expected)
- def test_key_values_boolean(self):
- qs = NullableJSONModel.objects.filter(value__h=True, value__i=False)
- tests = [
- ("value__h", True),
- ("value__i", False),
- ]
- for lookup, expected in tests:
- with self.subTest(lookup=lookup):
- self.assertIs(qs.values_list(lookup, flat=True).get(), expected)
- @skipUnlessDBFeature("supports_json_field_contains")
- def test_key_contains(self):
- self.assertIs(
- NullableJSONModel.objects.filter(value__foo__contains="ar").exists(), False
- )
- self.assertIs(
- NullableJSONModel.objects.filter(value__foo__contains="bar").exists(), True
- )
- def test_key_icontains(self):
- self.assertIs(
- NullableJSONModel.objects.filter(value__foo__icontains="Ar").exists(), True
- )
- def test_key_startswith(self):
- self.assertIs(
- NullableJSONModel.objects.filter(value__foo__startswith="b").exists(), True
- )
- def test_key_istartswith(self):
- self.assertIs(
- NullableJSONModel.objects.filter(value__foo__istartswith="B").exists(), True
- )
- def test_key_endswith(self):
- self.assertIs(
- NullableJSONModel.objects.filter(value__foo__endswith="r").exists(), True
- )
- def test_key_iendswith(self):
- self.assertIs(
- NullableJSONModel.objects.filter(value__foo__iendswith="R").exists(), True
- )
- def test_key_regex(self):
- self.assertIs(
- NullableJSONModel.objects.filter(value__foo__regex=r"^bar$").exists(), True
- )
- def test_key_iregex(self):
- self.assertIs(
- NullableJSONModel.objects.filter(value__foo__iregex=r"^bAr$").exists(), True
- )
- def test_key_quoted_string(self):
- self.assertEqual(
- NullableJSONModel.objects.filter(value__o='"quoted"').get(),
- self.objs[4],
- )
- @skipUnlessDBFeature("has_json_operators")
- def test_key_sql_injection(self):
- with CaptureQueriesContext(connection) as queries:
- self.assertIs(
- NullableJSONModel.objects.filter(
- **{
- """value__test' = '"a"') OR 1 = 1 OR ('d""": "x",
- }
- ).exists(),
- False,
- )
- self.assertIn(
- """."value" -> 'test'' = ''"a"'') OR 1 = 1 OR (''d') = '"x"'""",
- queries[0]["sql"],
- )
- @skipIfDBFeature("has_json_operators")
- def test_key_sql_injection_escape(self):
- query = str(
- JSONModel.objects.filter(
- **{
- """value__test") = '"a"' OR 1 = 1 OR ("d""": "x",
- }
- ).query
- )
- self.assertIn('"test\\"', query)
- self.assertIn('\\"d', query)
- def test_key_escape(self):
- obj = NullableJSONModel.objects.create(value={"%total": 10})
- self.assertEqual(
- NullableJSONModel.objects.filter(**{"value__%total": 10}).get(), obj
- )
- def test_none_key_and_exact_lookup(self):
- self.assertSequenceEqual(
- NullableJSONModel.objects.filter(value__a="b", value__j=None),
- [self.objs[4]],
- )
- def test_lookups_with_key_transform(self):
- tests = (
- ("value__baz__has_key", "c"),
- ("value__baz__has_keys", ["a", "c"]),
- ("value__baz__has_any_keys", ["a", "x"]),
- ("value__has_key", KeyTextTransform("foo", "value")),
- )
- for lookup, value in tests:
- with self.subTest(lookup=lookup):
- self.assertIs(
- NullableJSONModel.objects.filter(
- **{lookup: value},
- ).exists(),
- True,
- )
- @skipUnlessDBFeature("supports_json_field_contains")
- def test_contains_contained_by_with_key_transform(self):
- tests = [
- ("value__d__contains", "e"),
- ("value__d__contains", [{"f": "g"}]),
- ("value__contains", KeyTransform("bax", "value")),
- ("value__contains", F("value__bax")),
- ("value__baz__contains", {"a": "b"}),
- ("value__baz__contained_by", {"a": "b", "c": "d", "e": "f"}),
- (
- "value__contained_by",
- KeyTransform(
- "x",
- RawSQL(
- self.raw_sql,
- ['{"x": {"a": "b", "c": 1, "d": "e"}}'],
- ),
- ),
- ),
- ]
- # For databases where {'f': 'g'} (without surrounding []) matches
- # [{'f': 'g'}].
- if not connection.features.json_key_contains_list_matching_requires_list:
- tests.append(("value__d__contains", {"f": "g"}))
- for lookup, value in tests:
- with self.subTest(lookup=lookup, value=value):
- self.assertIs(
- NullableJSONModel.objects.filter(
- **{lookup: value},
- ).exists(),
- True,
- )
- def test_join_key_transform_annotation_expression(self):
- related_obj = RelatedJSONModel.objects.create(
- value={"d": ["f", "e"]},
- json_model=self.objs[4],
- )
- RelatedJSONModel.objects.create(
- value={"d": ["e", "f"]},
- json_model=self.objs[4],
- )
- self.assertSequenceEqual(
- RelatedJSONModel.objects.annotate(
- key=F("value__d"),
- related_key=F("json_model__value__d"),
- chain=F("key__1"),
- expr=Cast("key", models.JSONField()),
- ).filter(chain=F("related_key__0")),
- [related_obj],
- )
- def test_key_text_transform_from_lookup(self):
- qs = NullableJSONModel.objects.annotate(b=KT("value__bax__foo")).filter(
- b__contains="ar",
- )
- self.assertSequenceEqual(qs, [self.objs[7]])
- qs = NullableJSONModel.objects.annotate(c=KT("value__o")).filter(
- c__contains="uot",
- )
- self.assertSequenceEqual(qs, [self.objs[4]])
- def test_key_text_transform_from_lookup_invalid(self):
- msg = "Lookup must contain key or index transforms."
- with self.assertRaisesMessage(ValueError, msg):
- KT("value")
- with self.assertRaisesMessage(ValueError, msg):
- KT("")
- def test_literal_annotation_filtering(self):
- all_objects = NullableJSONModel.objects.order_by("id")
- qs = all_objects.annotate(data=Value({"foo": "bar"}, JSONField())).filter(
- data__foo="bar"
- )
- self.assertQuerySetEqual(qs, all_objects)
|