|
@@ -1,11 +1,18 @@
|
|
|
+from datetime import date
|
|
|
+from decimal import Decimal
|
|
|
from unittest import mock
|
|
|
|
|
|
from django.db import connection, transaction
|
|
|
-from django.db.models import Case, Count, F, FilteredRelation, Q, When
|
|
|
+from django.db.models import (
|
|
|
+ Case, Count, DecimalField, F, FilteredRelation, Q, Sum, When,
|
|
|
+)
|
|
|
from django.test import TestCase
|
|
|
from django.test.testcases import skipUnlessDBFeature
|
|
|
|
|
|
-from .models import Author, Book, Borrower, Editor, RentalSession, Reservation
|
|
|
+from .models import (
|
|
|
+ Author, Book, BookDailySales, Borrower, Currency, Editor, ExchangeRate,
|
|
|
+ RentalSession, Reservation, Seller,
|
|
|
+)
|
|
|
|
|
|
|
|
|
class FilteredRelationTests(TestCase):
|
|
@@ -279,28 +286,148 @@ class FilteredRelationTests(TestCase):
|
|
|
qs = Author.objects.filter(id__in=inner_qs)
|
|
|
self.assertSequenceEqual(qs, [self.author1])
|
|
|
|
|
|
- def test_with_foreign_key_error(self):
|
|
|
+ def test_nested_foreign_key(self):
|
|
|
+ qs = Author.objects.annotate(
|
|
|
+ book_editor_worked_with=FilteredRelation(
|
|
|
+ 'book__editor',
|
|
|
+ condition=Q(book__title__icontains='book by'),
|
|
|
+ ),
|
|
|
+ ).filter(
|
|
|
+ book_editor_worked_with__isnull=False,
|
|
|
+ ).select_related(
|
|
|
+ 'book_editor_worked_with',
|
|
|
+ ).order_by('pk', 'book_editor_worked_with__pk')
|
|
|
+ with self.assertNumQueries(1):
|
|
|
+ self.assertQuerysetEqual(qs, [
|
|
|
+ (self.author1, self.editor_a),
|
|
|
+ (self.author2, self.editor_b),
|
|
|
+ (self.author2, self.editor_b),
|
|
|
+ ], lambda x: (x, x.book_editor_worked_with))
|
|
|
+
|
|
|
+ def test_nested_foreign_key_nested_field(self):
|
|
|
+ qs = Author.objects.annotate(
|
|
|
+ book_editor_worked_with=FilteredRelation(
|
|
|
+ 'book__editor',
|
|
|
+ condition=Q(book__title__icontains='book by')
|
|
|
+ ),
|
|
|
+ ).filter(
|
|
|
+ book_editor_worked_with__isnull=False,
|
|
|
+ ).values(
|
|
|
+ 'name', 'book_editor_worked_with__name',
|
|
|
+ ).order_by('name', 'book_editor_worked_with__name').distinct()
|
|
|
+ self.assertSequenceEqual(qs, [
|
|
|
+ {'name': self.author1.name, 'book_editor_worked_with__name': self.editor_a.name},
|
|
|
+ {'name': self.author2.name, 'book_editor_worked_with__name': self.editor_b.name},
|
|
|
+ ])
|
|
|
+
|
|
|
+ def test_nested_foreign_key_filtered_base_object(self):
|
|
|
+ qs = Author.objects.annotate(
|
|
|
+ alice_editors=FilteredRelation(
|
|
|
+ 'book__editor',
|
|
|
+ condition=Q(name='Alice'),
|
|
|
+ ),
|
|
|
+ ).values(
|
|
|
+ 'name', 'alice_editors__pk',
|
|
|
+ ).order_by('name', 'alice_editors__name').distinct()
|
|
|
+ self.assertSequenceEqual(qs, [
|
|
|
+ {'name': self.author1.name, 'alice_editors__pk': self.editor_a.pk},
|
|
|
+ {'name': self.author2.name, 'alice_editors__pk': None},
|
|
|
+ ])
|
|
|
+
|
|
|
+ def test_nested_m2m_filtered(self):
|
|
|
+ qs = Book.objects.annotate(
|
|
|
+ favorite_book=FilteredRelation(
|
|
|
+ 'author__favorite_books',
|
|
|
+ condition=Q(author__favorite_books__title__icontains='book by')
|
|
|
+ ),
|
|
|
+ ).values(
|
|
|
+ 'title', 'favorite_book__pk',
|
|
|
+ ).order_by('title', 'favorite_book__title')
|
|
|
+ self.assertSequenceEqual(qs, [
|
|
|
+ {'title': self.book1.title, 'favorite_book__pk': self.book2.pk},
|
|
|
+ {'title': self.book1.title, 'favorite_book__pk': self.book3.pk},
|
|
|
+ {'title': self.book4.title, 'favorite_book__pk': self.book2.pk},
|
|
|
+ {'title': self.book4.title, 'favorite_book__pk': self.book3.pk},
|
|
|
+ {'title': self.book2.title, 'favorite_book__pk': None},
|
|
|
+ {'title': self.book3.title, 'favorite_book__pk': None},
|
|
|
+ ])
|
|
|
+
|
|
|
+ def test_nested_chained_relations(self):
|
|
|
+ qs = Author.objects.annotate(
|
|
|
+ my_books=FilteredRelation(
|
|
|
+ 'book', condition=Q(book__title__icontains='book by'),
|
|
|
+ ),
|
|
|
+ preferred_by_authors=FilteredRelation(
|
|
|
+ 'my_books__preferred_by_authors',
|
|
|
+ condition=Q(my_books__preferred_by_authors__name='Alice'),
|
|
|
+ ),
|
|
|
+ ).annotate(
|
|
|
+ author=F('name'),
|
|
|
+ book_title=F('my_books__title'),
|
|
|
+ preferred_by_author_pk=F('preferred_by_authors'),
|
|
|
+ ).order_by('author', 'book_title', 'preferred_by_author_pk')
|
|
|
+ self.assertQuerysetEqual(qs, [
|
|
|
+ ('Alice', 'The book by Alice', None),
|
|
|
+ ('Jane', 'The book by Jane A', self.author1.pk),
|
|
|
+ ('Jane', 'The book by Jane B', self.author1.pk),
|
|
|
+ ], lambda x: (x.author, x.book_title, x.preferred_by_author_pk))
|
|
|
+
|
|
|
+ def test_deep_nested_foreign_key(self):
|
|
|
+ qs = Book.objects.annotate(
|
|
|
+ author_favorite_book_editor=FilteredRelation(
|
|
|
+ 'author__favorite_books__editor',
|
|
|
+ condition=Q(author__favorite_books__title__icontains='Jane A'),
|
|
|
+ ),
|
|
|
+ ).filter(
|
|
|
+ author_favorite_book_editor__isnull=False,
|
|
|
+ ).select_related(
|
|
|
+ 'author_favorite_book_editor',
|
|
|
+ ).order_by('pk', 'author_favorite_book_editor__pk')
|
|
|
+ with self.assertNumQueries(1):
|
|
|
+ self.assertQuerysetEqual(qs, [
|
|
|
+ (self.book1, self.editor_b),
|
|
|
+ (self.book4, self.editor_b),
|
|
|
+ ], lambda x: (x, x.author_favorite_book_editor))
|
|
|
+
|
|
|
+ def test_relation_name_lookup(self):
|
|
|
msg = (
|
|
|
- "FilteredRelation's condition doesn't support nested relations "
|
|
|
- "(got 'author__favorite_books__author')."
|
|
|
+ "FilteredRelation's relation_name cannot contain lookups (got "
|
|
|
+ "'book__title__icontains')."
|
|
|
)
|
|
|
with self.assertRaisesMessage(ValueError, msg):
|
|
|
- list(Book.objects.annotate(
|
|
|
- alice_favorite_books=FilteredRelation(
|
|
|
- 'author__favorite_books',
|
|
|
- condition=Q(author__favorite_books__author=self.author1),
|
|
|
- )
|
|
|
- ))
|
|
|
+ Author.objects.annotate(
|
|
|
+ book_title=FilteredRelation(
|
|
|
+ 'book__title__icontains',
|
|
|
+ condition=Q(book__title='Poem by Alice'),
|
|
|
+ ),
|
|
|
+ )
|
|
|
+
|
|
|
+ def test_condition_outside_relation_name(self):
|
|
|
+ msg = (
|
|
|
+ "FilteredRelation's condition doesn't support relations outside "
|
|
|
+ "the 'book__editor' (got 'book__author__name__icontains')."
|
|
|
+ )
|
|
|
+ with self.assertRaisesMessage(ValueError, msg):
|
|
|
+ Author.objects.annotate(
|
|
|
+ book_editor=FilteredRelation(
|
|
|
+ 'book__editor',
|
|
|
+ condition=Q(book__author__name__icontains='book'),
|
|
|
+ ),
|
|
|
+ )
|
|
|
|
|
|
- def test_with_foreign_key_on_condition_error(self):
|
|
|
+ def test_condition_deeper_relation_name(self):
|
|
|
msg = (
|
|
|
"FilteredRelation's condition doesn't support nested relations "
|
|
|
- "(got 'book__editor__name__icontains')."
|
|
|
+ "deeper than the relation_name (got "
|
|
|
+ "'book__editor__name__icontains' for 'book')."
|
|
|
)
|
|
|
with self.assertRaisesMessage(ValueError, msg):
|
|
|
- list(Author.objects.annotate(
|
|
|
- book_edited_by_b=FilteredRelation('book', condition=Q(book__editor__name__icontains='b')),
|
|
|
- ))
|
|
|
+ Author.objects.annotate(
|
|
|
+ book_editor=FilteredRelation(
|
|
|
+ 'book',
|
|
|
+ condition=Q(book__editor__name__icontains='b'),
|
|
|
+ ),
|
|
|
+ )
|
|
|
|
|
|
def test_with_empty_relation_name_error(self):
|
|
|
with self.assertRaisesMessage(ValueError, 'relation_name cannot be empty.'):
|
|
@@ -424,3 +551,123 @@ class FilteredRelationAggregationTests(TestCase):
|
|
|
).distinct()
|
|
|
self.assertEqual(qs.count(), 1)
|
|
|
self.assertSequenceEqual(qs.annotate(total=Count('pk')).values('total'), [{'total': 1}])
|
|
|
+
|
|
|
+
|
|
|
+class FilteredRelationAnalyticalAggregationTests(TestCase):
|
|
|
+ @classmethod
|
|
|
+ def setUpTestData(cls):
|
|
|
+ author = Author.objects.create(name='Author')
|
|
|
+ editor = Editor.objects.create(name='Editor')
|
|
|
+ cls.book1 = Book.objects.create(
|
|
|
+ title='Poem by Alice',
|
|
|
+ editor=editor,
|
|
|
+ author=author,
|
|
|
+ )
|
|
|
+ cls.book2 = Book.objects.create(
|
|
|
+ title='The book by Jane A',
|
|
|
+ editor=editor,
|
|
|
+ author=author,
|
|
|
+ )
|
|
|
+ cls.book3 = Book.objects.create(
|
|
|
+ title='The book by Jane B',
|
|
|
+ editor=editor,
|
|
|
+ author=author,
|
|
|
+ )
|
|
|
+ cls.seller1 = Seller.objects.create(name='Seller 1')
|
|
|
+ cls.seller2 = Seller.objects.create(name='Seller 2')
|
|
|
+ cls.usd = Currency.objects.create(currency='USD')
|
|
|
+ cls.eur = Currency.objects.create(currency='EUR')
|
|
|
+ cls.sales_date1 = date(2020, 7, 6)
|
|
|
+ cls.sales_date2 = date(2020, 7, 7)
|
|
|
+ ExchangeRate.objects.bulk_create([
|
|
|
+ ExchangeRate(
|
|
|
+ rate_date=cls.sales_date1,
|
|
|
+ from_currency=cls.usd,
|
|
|
+ to_currency=cls.eur,
|
|
|
+ rate=0.40,
|
|
|
+ ),
|
|
|
+ ExchangeRate(
|
|
|
+ rate_date=cls.sales_date1,
|
|
|
+ from_currency=cls.eur,
|
|
|
+ to_currency=cls.usd,
|
|
|
+ rate=1.60,
|
|
|
+ ),
|
|
|
+ ExchangeRate(
|
|
|
+ rate_date=cls.sales_date2,
|
|
|
+ from_currency=cls.usd,
|
|
|
+ to_currency=cls.eur,
|
|
|
+ rate=0.50,
|
|
|
+ ),
|
|
|
+ ExchangeRate(
|
|
|
+ rate_date=cls.sales_date2,
|
|
|
+ from_currency=cls.eur,
|
|
|
+ to_currency=cls.usd,
|
|
|
+ rate=1.50,
|
|
|
+ ),
|
|
|
+ ExchangeRate(
|
|
|
+ rate_date=cls.sales_date2,
|
|
|
+ from_currency=cls.usd,
|
|
|
+ to_currency=cls.usd,
|
|
|
+ rate=1.00,
|
|
|
+ ),
|
|
|
+ ])
|
|
|
+ BookDailySales.objects.bulk_create([
|
|
|
+ BookDailySales(
|
|
|
+ book=cls.book1,
|
|
|
+ sale_date=cls.sales_date1,
|
|
|
+ currency=cls.usd,
|
|
|
+ sales=100.00,
|
|
|
+ seller=cls.seller1,
|
|
|
+ ),
|
|
|
+ BookDailySales(
|
|
|
+ book=cls.book2,
|
|
|
+ sale_date=cls.sales_date1,
|
|
|
+ currency=cls.eur,
|
|
|
+ sales=200.00,
|
|
|
+ seller=cls.seller1,
|
|
|
+ ),
|
|
|
+ BookDailySales(
|
|
|
+ book=cls.book1,
|
|
|
+ sale_date=cls.sales_date2,
|
|
|
+ currency=cls.usd,
|
|
|
+ sales=50.00,
|
|
|
+ seller=cls.seller2,
|
|
|
+ ),
|
|
|
+ BookDailySales(
|
|
|
+ book=cls.book2,
|
|
|
+ sale_date=cls.sales_date2,
|
|
|
+ currency=cls.eur,
|
|
|
+ sales=100.00,
|
|
|
+ seller=cls.seller2,
|
|
|
+ ),
|
|
|
+ ])
|
|
|
+
|
|
|
+ def test_aggregate(self):
|
|
|
+ tests = [
|
|
|
+ Q(daily_sales__sale_date__gte=self.sales_date2),
|
|
|
+ ~Q(daily_sales__seller=self.seller1),
|
|
|
+ ]
|
|
|
+ for condition in tests:
|
|
|
+ with self.subTest(condition=condition):
|
|
|
+ qs = Book.objects.annotate(
|
|
|
+ recent_sales=FilteredRelation('daily_sales', condition=condition),
|
|
|
+ recent_sales_rates=FilteredRelation(
|
|
|
+ 'recent_sales__currency__rates_from',
|
|
|
+ condition=Q(
|
|
|
+ recent_sales__currency__rates_from__rate_date=F('recent_sales__sale_date'),
|
|
|
+ recent_sales__currency__rates_from__to_currency=self.usd,
|
|
|
+ ),
|
|
|
+ ),
|
|
|
+ ).annotate(
|
|
|
+ sales_sum=Sum(
|
|
|
+ F('recent_sales__sales') * F('recent_sales_rates__rate'),
|
|
|
+ output_field=DecimalField(),
|
|
|
+ ),
|
|
|
+ ).values('title', 'sales_sum').order_by(
|
|
|
+ F('sales_sum').desc(nulls_last=True),
|
|
|
+ )
|
|
|
+ self.assertSequenceEqual(qs, [
|
|
|
+ {'title': self.book2.title, 'sales_sum': Decimal(150.00)},
|
|
|
+ {'title': self.book1.title, 'sales_sum': Decimal(50.00)},
|
|
|
+ {'title': self.book3.title, 'sales_sum': None},
|
|
|
+ ])
|