123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
import json
import warnings

from django.contrib.postgres.fields import ArrayField
from django.db.models import Aggregate, BooleanField, JSONField, TextField, Value
from django.utils.deprecation import RemovedInDjango50Warning, RemovedInDjango51Warning

from .mixins import OrderableAggMixin
# Names exported by ``from <this module> import *``: the aggregate classes
# defined in this module.
__all__ = [
    "ArrayAgg",
    "BitAnd",
    "BitOr",
    "BitXor",
    "BoolAnd",
    "BoolOr",
    "JSONBAgg",
    "StringAgg",
]
# Sentinel meaning "the caller did not pass ``default``". A dedicated object is
# used because ``None`` is itself a value callers may pass explicitly.
NOT_PROVIDED = object()


class DeprecatedConvertValueMixin:
    """Preserve a legacy "no rows" value while ``default`` is not passed.

    Classes using this mixin must define ``deprecation_value`` (returned in
    place of ``None``) and ``deprecation_msg`` (the warning text). They may
    also define ``deprecation_empty_result_set_value``; when absent,
    ``deprecation_value`` is used for that purpose as well.
    """

    def __init__(self, *expressions, default=NOT_PROVIDED, **extra):
        """Record whether ``default`` was given, then delegate to the parent.

        When ``default`` is omitted, ``None`` is forwarded to the parent
        initializer and ``_default_provided`` is set to ``False``; any
        explicit value (including ``None``) sets it to ``True``.
        """
        if default is NOT_PROVIDED:
            default = None
            self._default_provided = False
        else:
            self._default_provided = True
        super().__init__(*expressions, default=default, **extra)

    def resolve_expression(self, *args, **kwargs):
        """Resolve via the parent and apply the legacy empty-result value.

        If no ``default`` was passed, ``empty_result_set_value`` on the
        resolved expression is set to ``deprecation_empty_result_set_value``
        when the class defines it, otherwise to ``deprecation_value``.
        """
        resolved = super().resolve_expression(*args, **kwargs)
        if not self._default_provided:
            resolved.empty_result_set_value = getattr(
                self, "deprecation_empty_result_set_value", self.deprecation_value
            )
        return resolved

    def convert_value(self, value, expression, connection):
        """Return ``value``, substituting the legacy value for ``None``.

        Only when ``value`` is ``None`` and no ``default`` was passed does this
        emit ``deprecation_msg`` as a ``RemovedInDjango50Warning`` and return
        ``deprecation_value``. ``expression`` and ``connection`` are unused
        here.
        """
        if value is None and not self._default_provided:
            warnings.warn(self.deprecation_msg, category=RemovedInDjango50Warning)
            return self.deprecation_value
        return value
class ArrayAgg(DeprecatedConvertValueMixin, OrderableAggMixin, Aggregate):
    """``ARRAY_AGG`` aggregate; its template has distinct and ordering slots."""

    function = "ARRAY_AGG"
    template = "%(function)s(%(distinct)s%(expressions)s %(ordering)s)"
    allow_distinct = True

    # Legacy value consumed by DeprecatedConvertValueMixin when no ``default``
    # is passed. Exposed as a property so each access builds a new list instead
    # of sharing one mutable list between instances.
    deprecation_value = property(lambda self: [])
    deprecation_msg = (
        "In Django 5.0, ArrayAgg() will return None instead of an empty list "
        "if there are no rows. Pass default=None to opt into the new behavior "
        "and silence this warning or default=[] to keep the previous behavior."
    )

    @property
    def output_field(self):
        """Return an ArrayField built on the first source expression's field."""
        return ArrayField(self.source_expressions[0].output_field)
class BitAnd(Aggregate):
    """Aggregate whose SQL function name is ``BIT_AND``."""

    function = "BIT_AND"
class BitOr(Aggregate):
    """Aggregate whose SQL function name is ``BIT_OR``."""

    function = "BIT_OR"
class BitXor(Aggregate):
    """Aggregate whose SQL function name is ``BIT_XOR``."""

    function = "BIT_XOR"
class BoolAnd(Aggregate):
    """Aggregate whose SQL function name is ``BOOL_AND``; yields a boolean."""

    function = "BOOL_AND"
    output_field = BooleanField()
class BoolOr(Aggregate):
    """Aggregate whose SQL function name is ``BOOL_OR``; yields a boolean."""

    function = "BOOL_OR"
    output_field = BooleanField()
class JSONBAgg(DeprecatedConvertValueMixin, OrderableAggMixin, Aggregate):
    """``JSONB_AGG`` aggregate; its template has distinct and ordering slots."""

    function = "JSONB_AGG"
    template = "%(function)s(%(distinct)s%(expressions)s %(ordering)s)"
    allow_distinct = True
    output_field = JSONField()

    # Legacy values consumed by DeprecatedConvertValueMixin when no ``default``
    # is passed. ``deprecation_value`` is an encoded JSON string while the
    # empty-result-set variant is a Python list.
    # NOTE(review): presumably the string form is decoded later by the
    # JSONField output field -- confirm against the converter order.
    deprecation_value = "[]"
    deprecation_empty_result_set_value = property(lambda self: [])
    deprecation_msg = (
        "In Django 5.0, JSONBAgg() will return None instead of an empty list "
        "if there are no rows. Pass default=None to opt into the new behavior "
        "and silence this warning or default=[] to keep the previous "
        "behavior."
    )

    def __init__(self, *expressions, default=NOT_PROVIDED, **extra):
        """Initialize and normalize deprecated string ``default`` values.

        After delegating to the parent initializer, a ``default`` that is a
        ``Value`` wrapping a ``str`` whose output field is not a ``JSONField``
        is handled in one of two ways, each emitting a
        ``RemovedInDjango51Warning``:

        * the string is not valid JSON: the existing default keeps its value
          but has its ``output_field`` replaced with this aggregate's;
        * the string is valid JSON: the default is replaced by a ``Value``
          holding the decoded object, typed with this aggregate's output
          field.

        Any other ``default`` is left as the parent initializer stored it.
        """
        super().__init__(*expressions, default=default, **extra)
        if (
            isinstance(default, Value)
            and isinstance(default.value, str)
            and not isinstance(default.output_field, JSONField)
        ):
            value = default.value
            try:
                decoded = json.loads(value)
            except json.JSONDecodeError:
                # Plain (non-JSON) string: keep it, but type it as JSON.
                warnings.warn(
                    "Passing a Value() with an output_field that isn't a JSONField as "
                    "JSONBAgg(default) is deprecated. Pass default="
                    f"Value({value!r}, output_field=JSONField()) instead.",
                    stacklevel=2,
                    category=RemovedInDjango51Warning,
                )
                # ``self.default`` is expected to have been set by the parent
                # initializer from the ``default`` argument.
                self.default.output_field = self.output_field
            else:
                # Encoded JSON string: use the decoded object as the default.
                self.default = Value(decoded, self.output_field)
                warnings.warn(
                    "Passing an encoded JSON string as JSONBAgg(default) is "
                    f"deprecated. Pass default={decoded!r} instead.",
                    stacklevel=2,
                    category=RemovedInDjango51Warning,
                )
class StringAgg(DeprecatedConvertValueMixin, OrderableAggMixin, Aggregate):
    """``STRING_AGG`` aggregate; its template has distinct and ordering slots."""

    function = "STRING_AGG"
    template = "%(function)s(%(distinct)s%(expressions)s %(ordering)s)"
    allow_distinct = True
    output_field = TextField()

    # Legacy value consumed by DeprecatedConvertValueMixin when no ``default``
    # is passed.
    deprecation_value = ""
    deprecation_msg = (
        "In Django 5.0, StringAgg() will return None instead of an empty "
        "string if there are no rows. Pass default=None to opt into the new "
        'behavior and silence this warning or default="" to keep the previous behavior.'
    )

    def __init__(self, expression, delimiter, **extra):
        """Build the aggregate from ``expression`` and a literal delimiter.

        ``delimiter`` is coerced with ``str()`` and wrapped in a ``Value`` so
        that it is passed to the parent initializer as the second expression.
        Remaining keyword arguments are forwarded unchanged.
        """
        delimiter_expr = Value(str(delimiter))
        super().__init__(expression, delimiter_expr, **extra)
|