tests.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. from datetime import date
  2. from decimal import Decimal
  3. from django.db.models.query import RawQuerySet
  4. from django.db.models.query_utils import InvalidQuery
  5. from django.test import TestCase, skipUnlessDBFeature
  6. from .models import (
  7. Author, Book, BookFkAsPk, Coffee, FriendlyAuthor, MixedCaseIDColumn,
  8. Reviewer,
  9. )
  10. class RawQueryTests(TestCase):
  11. @classmethod
  12. def setUpTestData(cls):
  13. cls.a1 = Author.objects.create(first_name='Joe', last_name='Smith', dob=date(1950, 9, 20))
  14. cls.a2 = Author.objects.create(first_name='Jill', last_name='Doe', dob=date(1920, 4, 2))
  15. cls.a3 = Author.objects.create(first_name='Bob', last_name='Smith', dob=date(1986, 1, 25))
  16. cls.a4 = Author.objects.create(first_name='Bill', last_name='Jones', dob=date(1932, 5, 10))
  17. cls.b1 = Book.objects.create(
  18. title='The awesome book', author=cls.a1, paperback=False,
  19. opening_line='It was a bright cold day in April and the clocks were striking thirteen.',
  20. )
  21. cls.b2 = Book.objects.create(
  22. title='The horrible book', author=cls.a1, paperback=True,
  23. opening_line=(
  24. 'On an evening in the latter part of May a middle-aged man '
  25. 'was walking homeward from Shaston to the village of Marlott, '
  26. 'in the adjoining Vale of Blakemore, or Blackmoor.'
  27. ),
  28. )
  29. cls.b3 = Book.objects.create(
  30. title='Another awesome book', author=cls.a1, paperback=False,
  31. opening_line='A squat grey building of only thirty-four stories.',
  32. )
  33. cls.b4 = Book.objects.create(
  34. title='Some other book', author=cls.a3, paperback=True,
  35. opening_line='It was the day my grandmother exploded.',
  36. )
  37. cls.c1 = Coffee.objects.create(brand='dunkin doughnuts')
  38. cls.c2 = Coffee.objects.create(brand='starbucks')
  39. cls.r1 = Reviewer.objects.create()
  40. cls.r2 = Reviewer.objects.create()
  41. cls.r1.reviewed.add(cls.b2, cls.b3, cls.b4)
  42. def assertSuccessfulRawQuery(self, model, query, expected_results,
  43. expected_annotations=(), params=[], translations=None):
  44. """
  45. Execute the passed query against the passed model and check the output
  46. """
  47. results = list(model.objects.raw(query, params=params, translations=translations))
  48. self.assertProcessed(model, results, expected_results, expected_annotations)
  49. self.assertAnnotations(results, expected_annotations)
  50. def assertProcessed(self, model, results, orig, expected_annotations=()):
  51. """
  52. Compare the results of a raw query against expected results
  53. """
  54. self.assertEqual(len(results), len(orig))
  55. for index, item in enumerate(results):
  56. orig_item = orig[index]
  57. for annotation in expected_annotations:
  58. setattr(orig_item, *annotation)
  59. for field in model._meta.fields:
  60. # All values on the model are equal
  61. self.assertEqual(
  62. getattr(item, field.attname),
  63. getattr(orig_item, field.attname)
  64. )
  65. # This includes checking that they are the same type
  66. self.assertEqual(
  67. type(getattr(item, field.attname)),
  68. type(getattr(orig_item, field.attname))
  69. )
  70. def assertNoAnnotations(self, results):
  71. """
  72. The results of a raw query contain no annotations
  73. """
  74. self.assertAnnotations(results, ())
  75. def assertAnnotations(self, results, expected_annotations):
  76. """
  77. The passed raw query results contain the expected annotations
  78. """
  79. if expected_annotations:
  80. for index, result in enumerate(results):
  81. annotation, value = expected_annotations[index]
  82. self.assertTrue(hasattr(result, annotation))
  83. self.assertEqual(getattr(result, annotation), value)
  84. def test_rawqueryset_repr(self):
  85. queryset = RawQuerySet(raw_query='SELECT * FROM raw_query_author')
  86. self.assertEqual(repr(queryset), '<RawQuerySet: SELECT * FROM raw_query_author>')
  87. self.assertEqual(repr(queryset.query), '<RawQuery: SELECT * FROM raw_query_author>')
  88. def test_simple_raw_query(self):
  89. """
  90. Basic test of raw query with a simple database query
  91. """
  92. query = "SELECT * FROM raw_query_author"
  93. authors = Author.objects.all()
  94. self.assertSuccessfulRawQuery(Author, query, authors)
  95. def test_raw_query_lazy(self):
  96. """
  97. Raw queries are lazy: they aren't actually executed until they're
  98. iterated over.
  99. """
  100. q = Author.objects.raw('SELECT * FROM raw_query_author')
  101. self.assertIsNone(q.query.cursor)
  102. list(q)
  103. self.assertIsNotNone(q.query.cursor)
  104. def test_FK_raw_query(self):
  105. """
  106. Test of a simple raw query against a model containing a foreign key
  107. """
  108. query = "SELECT * FROM raw_query_book"
  109. books = Book.objects.all()
  110. self.assertSuccessfulRawQuery(Book, query, books)
  111. def test_db_column_handler(self):
  112. """
  113. Test of a simple raw query against a model containing a field with
  114. db_column defined.
  115. """
  116. query = "SELECT * FROM raw_query_coffee"
  117. coffees = Coffee.objects.all()
  118. self.assertSuccessfulRawQuery(Coffee, query, coffees)
  119. def test_pk_with_mixed_case_db_column(self):
  120. """
  121. A raw query with a model that has a pk db_column with mixed case.
  122. """
  123. query = "SELECT * FROM raw_query_mixedcaseidcolumn"
  124. queryset = MixedCaseIDColumn.objects.all()
  125. self.assertSuccessfulRawQuery(MixedCaseIDColumn, query, queryset)
  126. def test_order_handler(self):
  127. """
  128. Test of raw raw query's tolerance for columns being returned in any
  129. order
  130. """
  131. selects = (
  132. ('dob, last_name, first_name, id'),
  133. ('last_name, dob, first_name, id'),
  134. ('first_name, last_name, dob, id'),
  135. )
  136. for select in selects:
  137. query = "SELECT %s FROM raw_query_author" % select
  138. authors = Author.objects.all()
  139. self.assertSuccessfulRawQuery(Author, query, authors)
  140. def test_translations(self):
  141. """
  142. Test of raw query's optional ability to translate unexpected result
  143. column names to specific model fields
  144. """
  145. query = "SELECT first_name AS first, last_name AS last, dob, id FROM raw_query_author"
  146. translations = {'first': 'first_name', 'last': 'last_name'}
  147. authors = Author.objects.all()
  148. self.assertSuccessfulRawQuery(Author, query, authors, translations=translations)
  149. def test_params(self):
  150. """
  151. Test passing optional query parameters
  152. """
  153. query = "SELECT * FROM raw_query_author WHERE first_name = %s"
  154. author = Author.objects.all()[2]
  155. params = [author.first_name]
  156. qset = Author.objects.raw(query, params=params)
  157. results = list(qset)
  158. self.assertProcessed(Author, results, [author])
  159. self.assertNoAnnotations(results)
  160. self.assertEqual(len(results), 1)
  161. self.assertIsInstance(repr(qset), str)
  162. @skipUnlessDBFeature('supports_paramstyle_pyformat')
  163. def test_pyformat_params(self):
  164. """
  165. Test passing optional query parameters
  166. """
  167. query = "SELECT * FROM raw_query_author WHERE first_name = %(first)s"
  168. author = Author.objects.all()[2]
  169. params = {'first': author.first_name}
  170. qset = Author.objects.raw(query, params=params)
  171. results = list(qset)
  172. self.assertProcessed(Author, results, [author])
  173. self.assertNoAnnotations(results)
  174. self.assertEqual(len(results), 1)
  175. self.assertIsInstance(repr(qset), str)
  176. def test_query_representation(self):
  177. """
  178. Test representation of raw query with parameters
  179. """
  180. query = "SELECT * FROM raw_query_author WHERE last_name = %(last)s"
  181. qset = Author.objects.raw(query, {'last': 'foo'})
  182. self.assertEqual(repr(qset), "<RawQuerySet: SELECT * FROM raw_query_author WHERE last_name = foo>")
  183. self.assertEqual(repr(qset.query), "<RawQuery: SELECT * FROM raw_query_author WHERE last_name = foo>")
  184. query = "SELECT * FROM raw_query_author WHERE last_name = %s"
  185. qset = Author.objects.raw(query, {'foo'})
  186. self.assertEqual(repr(qset), "<RawQuerySet: SELECT * FROM raw_query_author WHERE last_name = foo>")
  187. self.assertEqual(repr(qset.query), "<RawQuery: SELECT * FROM raw_query_author WHERE last_name = foo>")
  188. def test_many_to_many(self):
  189. """
  190. Test of a simple raw query against a model containing a m2m field
  191. """
  192. query = "SELECT * FROM raw_query_reviewer"
  193. reviewers = Reviewer.objects.all()
  194. self.assertSuccessfulRawQuery(Reviewer, query, reviewers)
  195. def test_extra_conversions(self):
  196. """
  197. Test to insure that extra translations are ignored.
  198. """
  199. query = "SELECT * FROM raw_query_author"
  200. translations = {'something': 'else'}
  201. authors = Author.objects.all()
  202. self.assertSuccessfulRawQuery(Author, query, authors, translations=translations)
  203. def test_missing_fields(self):
  204. query = "SELECT id, first_name, dob FROM raw_query_author"
  205. for author in Author.objects.raw(query):
  206. self.assertIsNotNone(author.first_name)
  207. # last_name isn't given, but it will be retrieved on demand
  208. self.assertIsNotNone(author.last_name)
  209. def test_missing_fields_without_PK(self):
  210. query = "SELECT first_name, dob FROM raw_query_author"
  211. with self.assertRaisesMessage(InvalidQuery, 'Raw query must include the primary key'):
  212. list(Author.objects.raw(query))
  213. def test_annotations(self):
  214. query = (
  215. "SELECT a.*, count(b.id) as book_count "
  216. "FROM raw_query_author a "
  217. "LEFT JOIN raw_query_book b ON a.id = b.author_id "
  218. "GROUP BY a.id, a.first_name, a.last_name, a.dob ORDER BY a.id"
  219. )
  220. expected_annotations = (
  221. ('book_count', 3),
  222. ('book_count', 0),
  223. ('book_count', 1),
  224. ('book_count', 0),
  225. )
  226. authors = Author.objects.all()
  227. self.assertSuccessfulRawQuery(Author, query, authors, expected_annotations)
  228. def test_white_space_query(self):
  229. query = " SELECT * FROM raw_query_author"
  230. authors = Author.objects.all()
  231. self.assertSuccessfulRawQuery(Author, query, authors)
  232. def test_multiple_iterations(self):
  233. query = "SELECT * FROM raw_query_author"
  234. normal_authors = Author.objects.all()
  235. raw_authors = Author.objects.raw(query)
  236. # First Iteration
  237. first_iterations = 0
  238. for index, raw_author in enumerate(raw_authors):
  239. self.assertEqual(normal_authors[index], raw_author)
  240. first_iterations += 1
  241. # Second Iteration
  242. second_iterations = 0
  243. for index, raw_author in enumerate(raw_authors):
  244. self.assertEqual(normal_authors[index], raw_author)
  245. second_iterations += 1
  246. self.assertEqual(first_iterations, second_iterations)
  247. def test_get_item(self):
  248. # Indexing on RawQuerySets
  249. query = "SELECT * FROM raw_query_author ORDER BY id ASC"
  250. third_author = Author.objects.raw(query)[2]
  251. self.assertEqual(third_author.first_name, 'Bob')
  252. first_two = Author.objects.raw(query)[0:2]
  253. self.assertEqual(len(first_two), 2)
  254. with self.assertRaises(TypeError):
  255. Author.objects.raw(query)['test']
  256. def test_inheritance(self):
  257. f = FriendlyAuthor.objects.create(first_name="Wesley", last_name="Chun", dob=date(1962, 10, 28))
  258. query = "SELECT * FROM raw_query_friendlyauthor"
  259. self.assertEqual(
  260. [o.pk for o in FriendlyAuthor.objects.raw(query)], [f.pk]
  261. )
  262. def test_query_count(self):
  263. self.assertNumQueries(1, list, Author.objects.raw("SELECT * FROM raw_query_author"))
  264. def test_subquery_in_raw_sql(self):
  265. list(Book.objects.raw('SELECT id FROM (SELECT * FROM raw_query_book WHERE paperback IS NOT NULL) sq'))
  266. def test_db_column_name_is_used_in_raw_query(self):
  267. """
  268. Regression test that ensures the `column` attribute on the field is
  269. used to generate the list of fields included in the query, as opposed
  270. to the `attname`. This is important when the primary key is a
  271. ForeignKey field because `attname` and `column` are not necessarily the
  272. same.
  273. """
  274. b = BookFkAsPk.objects.create(book=self.b1)
  275. self.assertEqual(list(BookFkAsPk.objects.raw('SELECT not_the_default FROM raw_query_bookfkaspk')), [b])
  276. def test_decimal_parameter(self):
  277. c = Coffee.objects.create(brand='starbucks', price=20.5)
  278. qs = Coffee.objects.raw("SELECT * FROM raw_query_coffee WHERE price >= %s", params=[Decimal(20)])
  279. self.assertEqual(list(qs), [c])
  280. def test_result_caching(self):
  281. with self.assertNumQueries(1):
  282. books = Book.objects.raw('SELECT * FROM raw_query_book')
  283. list(books)
  284. list(books)
  285. def test_iterator(self):
  286. with self.assertNumQueries(2):
  287. books = Book.objects.raw('SELECT * FROM raw_query_book')
  288. list(books.iterator())
  289. list(books.iterator())
  290. def test_bool(self):
  291. self.assertIs(bool(Book.objects.raw('SELECT * FROM raw_query_book')), True)
  292. self.assertIs(bool(Book.objects.raw('SELECT * FROM raw_query_book WHERE id = 0')), False)
  293. def test_len(self):
  294. self.assertEqual(len(Book.objects.raw('SELECT * FROM raw_query_book')), 4)
  295. self.assertEqual(len(Book.objects.raw('SELECT * FROM raw_query_book WHERE id = 0')), 0)