123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416 |
- from datetime import date
- from decimal import Decimal
- from django.core.exceptions import FieldDoesNotExist
- from django.db.models.query import RawQuerySet
- from django.test import TestCase, skipUnlessDBFeature
- from .models import (
- Author,
- Book,
- BookFkAsPk,
- Coffee,
- FriendlyAuthor,
- MixedCaseIDColumn,
- Reviewer,
- )
class RawQueryTests(TestCase):
    """Tests for ``Model.objects.raw()`` and the ``RawQuerySet`` it returns."""

    @classmethod
    def setUpTestData(cls):
        """Create the authors, books, coffees and reviewers shared by the tests."""
        cls.a1 = Author.objects.create(
            first_name="Joe", last_name="Smith", dob=date(1950, 9, 20)
        )
        cls.a2 = Author.objects.create(
            first_name="Jill", last_name="Doe", dob=date(1920, 4, 2)
        )
        cls.a3 = Author.objects.create(
            first_name="Bob", last_name="Smith", dob=date(1986, 1, 25)
        )
        cls.a4 = Author.objects.create(
            first_name="Bill", last_name="Jones", dob=date(1932, 5, 10)
        )
        cls.b1 = Book.objects.create(
            title="The awesome book",
            author=cls.a1,
            paperback=False,
            opening_line=(
                "It was a bright cold day in April and the clocks were striking "
                "thirteen."
            ),
        )
        cls.b2 = Book.objects.create(
            title="The horrible book",
            author=cls.a1,
            paperback=True,
            opening_line=(
                "On an evening in the latter part of May a middle-aged man "
                "was walking homeward from Shaston to the village of Marlott, "
                "in the adjoining Vale of Blakemore, or Blackmoor."
            ),
        )
        cls.b3 = Book.objects.create(
            title="Another awesome book",
            author=cls.a1,
            paperback=False,
            opening_line="A squat gray building of only thirty-four stories.",
        )
        cls.b4 = Book.objects.create(
            title="Some other book",
            author=cls.a3,
            paperback=True,
            opening_line="It was the day my grandmother exploded.",
        )
        cls.c1 = Coffee.objects.create(brand="dunkin doughnuts")
        cls.c2 = Coffee.objects.create(brand="starbucks")
        cls.r1 = Reviewer.objects.create()
        cls.r2 = Reviewer.objects.create()
        cls.r1.reviewed.add(cls.b2, cls.b3, cls.b4)

    def assertSuccessfulRawQuery(
        self,
        model,
        query,
        expected_results,
        expected_annotations=(),
        params=(),
        translations=None,
    ):
        """
        Execute the passed query against the passed model and check the output

        ``params`` defaults to an immutable empty tuple rather than a shared
        mutable list; it is forwarded unchanged to ``model.objects.raw()``.
        """
        results = list(
            model.objects.raw(query, params=params, translations=translations)
        )
        self.assertProcessed(model, results, expected_results, expected_annotations)
        self.assertAnnotations(results, expected_annotations)

    def assertProcessed(self, model, results, orig, expected_annotations=()):
        """
        Compare the results of a raw query against expected results

        Every field in ``model._meta.fields`` must match in both value and
        type between each result and the expected item at the same position.
        """
        self.assertEqual(len(results), len(orig))
        # Lengths are equal at this point, so pairing with zip() drops nothing.
        for item, orig_item in zip(results, orig):
            # Copy the annotations onto the expected object. The comparison
            # below only reads the attnames listed in model._meta.fields.
            for annotation in expected_annotations:
                setattr(orig_item, *annotation)

            for field in model._meta.fields:
                actual = getattr(item, field.attname)
                expected = getattr(orig_item, field.attname)
                # All values on the model are equal
                self.assertEqual(actual, expected)
                # This includes checking that they are the same type
                self.assertEqual(type(actual), type(expected))

    def assertNoAnnotations(self, results):
        """
        The results of a raw query contain no annotations
        """
        self.assertAnnotations(results, ())

    def assertAnnotations(self, results, expected_annotations):
        """
        The passed raw query results contain the expected annotations

        ``expected_annotations`` is indexed by result position and holds
        ``(attribute_name, value)`` pairs. Nothing is checked when it is empty.
        """
        if expected_annotations:
            for index, result in enumerate(results):
                annotation, value = expected_annotations[index]
                self.assertTrue(hasattr(result, annotation))
                self.assertEqual(getattr(result, annotation), value)

    def test_rawqueryset_repr(self):
        """repr() of a RawQuerySet and of its query both embed the raw SQL."""
        queryset = RawQuerySet(raw_query="SELECT * FROM raw_query_author")
        self.assertEqual(
            repr(queryset), "<RawQuerySet: SELECT * FROM raw_query_author>"
        )
        self.assertEqual(
            repr(queryset.query), "<RawQuery: SELECT * FROM raw_query_author>"
        )

    def test_simple_raw_query(self):
        """
        Basic test of raw query with a simple database query
        """
        query = "SELECT * FROM raw_query_author"
        authors = Author.objects.all()
        self.assertSuccessfulRawQuery(Author, query, authors)

    def test_raw_query_lazy(self):
        """
        Raw queries are lazy: they aren't actually executed until they're
        iterated over.
        """
        q = Author.objects.raw("SELECT * FROM raw_query_author")
        self.assertIsNone(q.query.cursor)
        list(q)
        self.assertIsNotNone(q.query.cursor)

    def test_FK_raw_query(self):
        """
        Test of a simple raw query against a model containing a foreign key
        """
        query = "SELECT * FROM raw_query_book"
        books = Book.objects.all()
        self.assertSuccessfulRawQuery(Book, query, books)

    def test_db_column_handler(self):
        """
        Test of a simple raw query against a model containing a field with
        db_column defined.
        """
        query = "SELECT * FROM raw_query_coffee"
        coffees = Coffee.objects.all()
        self.assertSuccessfulRawQuery(Coffee, query, coffees)

    def test_pk_with_mixed_case_db_column(self):
        """
        A raw query with a model that has a pk db_column with mixed case.
        """
        query = "SELECT * FROM raw_query_mixedcaseidcolumn"
        queryset = MixedCaseIDColumn.objects.all()
        self.assertSuccessfulRawQuery(MixedCaseIDColumn, query, queryset)

    def test_order_handler(self):
        """Raw query tolerates columns being returned in any order."""
        selects = (
            "dob, last_name, first_name, id",
            "last_name, dob, first_name, id",
            "first_name, last_name, dob, id",
        )
        for select in selects:
            # subTest reports which column ordering failed and lets the
            # remaining orderings still run.
            with self.subTest(select=select):
                query = "SELECT %s FROM raw_query_author" % select
                authors = Author.objects.all()
                self.assertSuccessfulRawQuery(Author, query, authors)

    def test_translations(self):
        """
        Test of raw query's optional ability to translate unexpected result
        column names to specific model fields
        """
        query = (
            "SELECT first_name AS first, last_name AS last, dob, id "
            "FROM raw_query_author"
        )
        translations = {"first": "first_name", "last": "last_name"}
        authors = Author.objects.all()
        self.assertSuccessfulRawQuery(Author, query, authors, translations=translations)

    def test_params(self):
        """
        Test passing optional query parameters
        """
        query = "SELECT * FROM raw_query_author WHERE first_name = %s"
        author = Author.objects.all()[2]
        params = [author.first_name]
        qset = Author.objects.raw(query, params=params)
        results = list(qset)
        self.assertProcessed(Author, results, [author])
        self.assertNoAnnotations(results)
        self.assertEqual(len(results), 1)
        self.assertIsInstance(repr(qset), str)

    def test_params_none(self):
        """A query containing a single literal '%' runs with params=None."""
        query = "SELECT * FROM raw_query_author WHERE first_name like 'J%'"
        qset = Author.objects.raw(query, params=None)
        self.assertEqual(len(qset), 2)

    def test_escaped_percent(self):
        """With the default params, a literal percent sign is written '%%'."""
        query = "SELECT * FROM raw_query_author WHERE first_name like 'J%%'"
        qset = Author.objects.raw(query)
        self.assertEqual(len(qset), 2)

    @skipUnlessDBFeature("supports_paramstyle_pyformat")
    def test_pyformat_params(self):
        """
        Test passing optional query parameters
        """
        query = "SELECT * FROM raw_query_author WHERE first_name = %(first)s"
        author = Author.objects.all()[2]
        params = {"first": author.first_name}
        qset = Author.objects.raw(query, params=params)
        results = list(qset)
        self.assertProcessed(Author, results, [author])
        self.assertNoAnnotations(results)
        self.assertEqual(len(results), 1)
        self.assertIsInstance(repr(qset), str)

    def test_query_representation(self):
        """
        Test representation of raw query with parameters
        """
        # Named (dict) parameters.
        query = "SELECT * FROM raw_query_author WHERE last_name = %(last)s"
        qset = Author.objects.raw(query, {"last": "foo"})
        self.assertEqual(
            repr(qset),
            "<RawQuerySet: SELECT * FROM raw_query_author WHERE last_name = foo>",
        )
        self.assertEqual(
            repr(qset.query),
            "<RawQuery: SELECT * FROM raw_query_author WHERE last_name = foo>",
        )

        # Positional parameters supplied as a non-dict collection (a set here).
        query = "SELECT * FROM raw_query_author WHERE last_name = %s"
        qset = Author.objects.raw(query, {"foo"})
        self.assertEqual(
            repr(qset),
            "<RawQuerySet: SELECT * FROM raw_query_author WHERE last_name = foo>",
        )
        self.assertEqual(
            repr(qset.query),
            "<RawQuery: SELECT * FROM raw_query_author WHERE last_name = foo>",
        )

    def test_many_to_many(self):
        """
        Test of a simple raw query against a model containing a m2m field
        """
        query = "SELECT * FROM raw_query_reviewer"
        reviewers = Reviewer.objects.all()
        self.assertSuccessfulRawQuery(Reviewer, query, reviewers)

    def test_extra_conversions(self):
        """Extra translations are ignored."""
        query = "SELECT * FROM raw_query_author"
        translations = {"something": "else"}
        authors = Author.objects.all()
        self.assertSuccessfulRawQuery(Author, query, authors, translations=translations)

    def test_missing_fields(self):
        """Fields left out of the SELECT list are still readable on results."""
        query = "SELECT id, first_name, dob FROM raw_query_author"
        for author in Author.objects.raw(query):
            self.assertIsNotNone(author.first_name)
            # last_name isn't given, but it will be retrieved on demand
            self.assertIsNotNone(author.last_name)

    def test_missing_fields_without_PK(self):
        """Omitting the primary key from the SELECT raises FieldDoesNotExist."""
        query = "SELECT first_name, dob FROM raw_query_author"
        msg = "Raw query must include the primary key"
        with self.assertRaisesMessage(FieldDoesNotExist, msg):
            list(Author.objects.raw(query))

    def test_annotations(self):
        """Extra selected columns are exposed as attributes on the results."""
        query = (
            "SELECT a.*, count(b.id) as book_count "
            "FROM raw_query_author a "
            "LEFT JOIN raw_query_book b ON a.id = b.author_id "
            "GROUP BY a.id, a.first_name, a.last_name, a.dob ORDER BY a.id"
        )
        # One (name, value) pair per author, in pk order.
        expected_annotations = (
            ("book_count", 3),
            ("book_count", 0),
            ("book_count", 1),
            ("book_count", 0),
        )
        authors = Author.objects.order_by("pk")
        self.assertSuccessfulRawQuery(Author, query, authors, expected_annotations)

    def test_white_space_query(self):
        """A query string with leading whitespace is accepted."""
        query = " SELECT * FROM raw_query_author"
        authors = Author.objects.all()
        self.assertSuccessfulRawQuery(Author, query, authors)

    def test_multiple_iterations(self):
        """Iterating the same RawQuerySet twice yields the same rows each time."""
        query = "SELECT * FROM raw_query_author"
        normal_authors = Author.objects.all()
        raw_authors = Author.objects.raw(query)

        # First Iteration
        first_iterations = 0
        for index, raw_author in enumerate(raw_authors):
            self.assertEqual(normal_authors[index], raw_author)
            first_iterations += 1

        # Second Iteration
        second_iterations = 0
        for index, raw_author in enumerate(raw_authors):
            self.assertEqual(normal_authors[index], raw_author)
            second_iterations += 1

        self.assertEqual(first_iterations, second_iterations)

    def test_get_item(self):
        """Integer and slice indexing work; a string index raises TypeError."""
        # Indexing on RawQuerySets
        query = "SELECT * FROM raw_query_author ORDER BY id ASC"
        third_author = Author.objects.raw(query)[2]
        self.assertEqual(third_author.first_name, "Bob")

        first_two = Author.objects.raw(query)[0:2]
        self.assertEqual(len(first_two), 2)

        with self.assertRaises(TypeError):
            Author.objects.raw(query)["test"]

    def test_inheritance(self):
        """A raw query on FriendlyAuthor returns the pk of the created row."""
        f = FriendlyAuthor.objects.create(
            first_name="Wesley", last_name="Chun", dob=date(1962, 10, 28)
        )
        query = "SELECT * FROM raw_query_friendlyauthor"
        self.assertEqual([o.pk for o in FriendlyAuthor.objects.raw(query)], [f.pk])

    def test_query_count(self):
        """Materializing a raw queryset with list() issues exactly one query."""
        self.assertNumQueries(
            1, list, Author.objects.raw("SELECT * FROM raw_query_author")
        )

    def test_subquery_in_raw_sql(self):
        """A raw query selecting from a subquery executes without raising."""
        list(
            Book.objects.raw(
                "SELECT id FROM "
                "(SELECT * FROM raw_query_book WHERE paperback IS NOT NULL) sq"
            )
        )

    def test_db_column_name_is_used_in_raw_query(self):
        """
        Regression test that ensures the `column` attribute on the field is
        used to generate the list of fields included in the query, as opposed
        to the `attname`. This is important when the primary key is a
        ForeignKey field because `attname` and `column` are not necessarily the
        same.
        """
        b = BookFkAsPk.objects.create(book=self.b1)
        self.assertEqual(
            list(
                BookFkAsPk.objects.raw(
                    "SELECT not_the_default FROM raw_query_bookfkaspk"
                )
            ),
            [b],
        )

    def test_decimal_parameter(self):
        """A Decimal value can be passed as a raw query parameter."""
        c = Coffee.objects.create(brand="starbucks", price=20.5)
        qs = Coffee.objects.raw(
            "SELECT * FROM raw_query_coffee WHERE price >= %s", params=[Decimal(20)]
        )
        self.assertEqual(list(qs), [c])

    def test_result_caching(self):
        """Materializing the same RawQuerySet twice issues a single query."""
        with self.assertNumQueries(1):
            books = Book.objects.raw("SELECT * FROM raw_query_book")
            list(books)
            list(books)

    def test_iterator(self):
        """Each iterator() call issues its own query (two calls, two queries)."""
        with self.assertNumQueries(2):
            books = Book.objects.raw("SELECT * FROM raw_query_book")
            list(books.iterator())
            list(books.iterator())

    def test_bool(self):
        """bool() is True when rows are returned and False when none are."""
        self.assertIs(bool(Book.objects.raw("SELECT * FROM raw_query_book")), True)
        self.assertIs(
            bool(Book.objects.raw("SELECT * FROM raw_query_book WHERE id = 0")), False
        )

    def test_len(self):
        """len() returns the number of rows the raw query produces."""
        self.assertEqual(len(Book.objects.raw("SELECT * FROM raw_query_book")), 4)
        self.assertEqual(
            len(Book.objects.raw("SELECT * FROM raw_query_book WHERE id = 0")), 0
        )
|