tests.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. from datetime import date
  2. from decimal import Decimal
  3. from django.core.exceptions import FieldDoesNotExist
  4. from django.db.models.query import RawQuerySet
  5. from django.test import TestCase, skipUnlessDBFeature
  6. from .models import (
  7. Author, Book, BookFkAsPk, Coffee, FriendlyAuthor, MixedCaseIDColumn,
  8. Reviewer,
  9. )
  10. class RawQueryTests(TestCase):
  @classmethod
  def setUpTestData(cls):
      """
      Create the fixtures shared by the tests in this class: four authors,
      four books, two coffees and two reviewers. The first reviewer is
      linked to three of the books.
      """
      make_author = Author.objects.create
      cls.a1 = make_author(first_name='Joe', last_name='Smith', dob=date(1950, 9, 20))
      cls.a2 = make_author(first_name='Jill', last_name='Doe', dob=date(1920, 4, 2))
      cls.a3 = make_author(first_name='Bob', last_name='Smith', dob=date(1986, 1, 25))
      cls.a4 = make_author(first_name='Bill', last_name='Jones', dob=date(1932, 5, 10))

      make_book = Book.objects.create
      long_opening = (
          'On an evening in the latter part of May a middle-aged man '
          'was walking homeward from Shaston to the village of Marlott, '
          'in the adjoining Vale of Blakemore, or Blackmoor.'
      )
      cls.b1 = make_book(
          title='The awesome book',
          author=cls.a1,
          paperback=False,
          opening_line='It was a bright cold day in April and the clocks were striking thirteen.',
      )
      cls.b2 = make_book(
          title='The horrible book',
          author=cls.a1,
          paperback=True,
          opening_line=long_opening,
      )
      cls.b3 = make_book(
          title='Another awesome book',
          author=cls.a1,
          paperback=False,
          opening_line='A squat grey building of only thirty-four stories.',
      )
      cls.b4 = make_book(
          title='Some other book',
          author=cls.a3,
          paperback=True,
          opening_line='It was the day my grandmother exploded.',
      )

      make_coffee = Coffee.objects.create
      cls.c1 = make_coffee(brand='dunkin doughnuts')
      cls.c2 = make_coffee(brand='starbucks')

      cls.r1 = Reviewer.objects.create()
      cls.r2 = Reviewer.objects.create()
      # Only the first reviewer gets any reviewed books.
      cls.r1.reviewed.add(cls.b2, cls.b3, cls.b4)
  def assertSuccessfulRawQuery(self, model, query, expected_results,
                               expected_annotations=(), params=(), translations=None):
      """
      Execute the passed query against the passed model and check the output.

      ``query`` is run through ``model.objects.raw()`` with the given
      ``params`` and ``translations``. The resulting rows are compared with
      ``expected_results`` via ``assertProcessed()`` and then checked for
      ``expected_annotations`` via ``assertAnnotations()``.

      ``params`` defaults to an immutable empty tuple (matching
      ``expected_annotations``) so that no mutable object is shared between
      calls.
      """
      raw_queryset = model.objects.raw(query, params=params, translations=translations)
      # Materialise once so both helper assertions inspect the same rows.
      results = list(raw_queryset)
      self.assertProcessed(model, results, expected_results, expected_annotations)
      self.assertAnnotations(results, expected_annotations)
  def assertProcessed(self, model, results, orig, expected_annotations=()):
      """
      Check raw query results against the expected objects, field by field.

      Both sequences must have the same length. For every concrete field of
      ``model`` the value on the raw result and on the expected object must
      be equal and of the same type.
      """
      self.assertEqual(len(results), len(orig))
      for position, raw_obj in enumerate(results):
          expected_obj = orig[position]
          # Copy each (name, value) annotation pair onto the expected object.
          for name_and_value in expected_annotations:
              setattr(expected_obj, *name_and_value)
          for model_field in model._meta.fields:
              attname = model_field.attname
              # Values must match...
              self.assertEqual(
                  getattr(raw_obj, attname),
                  getattr(expected_obj, attname)
              )
              # ...and so must their types.
              self.assertEqual(
                  type(getattr(raw_obj, attname)),
                  type(getattr(expected_obj, attname))
              )
  def assertNoAnnotations(self, results):
      """
      Delegate to assertAnnotations() with an empty set of expectations.
      """
      self.assertAnnotations(results, expected_annotations=())
  def assertAnnotations(self, results, expected_annotations):
      """
      Check that each result carries its expected annotation.

      ``expected_annotations`` holds one ``(attribute_name, value)`` pair
      per result, matched by position. Nothing is checked when it is empty.
      """
      if not expected_annotations:
          return
      for position, row in enumerate(results):
          attr_name, expected_value = expected_annotations[position]
          self.assertTrue(hasattr(row, attr_name))
          self.assertEqual(getattr(row, attr_name), expected_value)
  def test_rawqueryset_repr(self):
      """The repr of a RawQuerySet and of its query both embed the SQL."""
      raw_qs = RawQuerySet(raw_query='SELECT * FROM raw_query_author')
      qs_repr = repr(raw_qs)
      self.assertEqual(qs_repr, '<RawQuerySet: SELECT * FROM raw_query_author>')
      query_repr = repr(raw_qs.query)
      self.assertEqual(query_repr, '<RawQuery: SELECT * FROM raw_query_author>')
  def test_simple_raw_query(self):
      """
      A plain ``SELECT *`` raw query yields the same authors as the ORM.
      """
      self.assertSuccessfulRawQuery(
          Author,
          "SELECT * FROM raw_query_author",
          Author.objects.all(),
      )
  def test_raw_query_lazy(self):
      """
      The query's cursor is unset until the raw queryset is iterated.
      """
      lazy_qs = Author.objects.raw('SELECT * FROM raw_query_author')
      # Before iteration: no cursor yet.
      self.assertIsNone(lazy_qs.query.cursor)
      list(lazy_qs)
      # After iteration: a cursor has been set.
      self.assertIsNotNone(lazy_qs.query.cursor)
  def test_FK_raw_query(self):
      """
      A simple raw query works against a model that has a foreign key.
      """
      self.assertSuccessfulRawQuery(
          Book,
          "SELECT * FROM raw_query_book",
          Book.objects.all(),
      )
  def test_db_column_handler(self):
      """
      A simple raw query works against a model with a field that defines
      db_column.
      """
      self.assertSuccessfulRawQuery(
          Coffee,
          "SELECT * FROM raw_query_coffee",
          Coffee.objects.all(),
      )
  def test_pk_with_mixed_case_db_column(self):
      """
      A raw query works when the model's pk db_column uses mixed case.
      """
      self.assertSuccessfulRawQuery(
          MixedCaseIDColumn,
          "SELECT * FROM raw_query_mixedcaseidcolumn",
          MixedCaseIDColumn.objects.all(),
      )
  def test_order_handler(self):
      """
      Raw queries tolerate the selected columns being returned in any order.
      """
      column_orders = [
          'dob, last_name, first_name, id',
          'last_name, dob, first_name, id',
          'first_name, last_name, dob, id',
      ]
      for columns in column_orders:
          sql = "SELECT %s FROM raw_query_author" % columns
          # A fresh ORM queryset is built for every column ordering.
          self.assertSuccessfulRawQuery(Author, sql, Author.objects.all())
  def test_translations(self):
      """
      Aliased result columns are mapped back onto model fields through the
      optional ``translations`` argument.
      """
      column_map = {'first': 'first_name', 'last': 'last_name'}
      self.assertSuccessfulRawQuery(
          Author,
          "SELECT first_name AS first, last_name AS last, dob, id FROM raw_query_author",
          Author.objects.all(),
          translations=column_map,
      )
  def test_params(self):
      """
      A positional query parameter supplied via ``params`` filters the rows.
      """
      sql = "SELECT * FROM raw_query_author WHERE first_name = %s"
      third_author = Author.objects.all()[2]
      raw_qs = Author.objects.raw(sql, params=[third_author.first_name])
      rows = list(raw_qs)
      self.assertProcessed(Author, rows, [third_author])
      self.assertNoAnnotations(rows)
      self.assertEqual(len(rows), 1)
      self.assertIsInstance(repr(raw_qs), str)
  def test_params_none(self):
      """With ``params=None`` a single ``%`` in the SQL is accepted as-is."""
      raw_qs = Author.objects.raw(
          "SELECT * FROM raw_query_author WHERE first_name like 'J%'",
          params=None,
      )
      self.assertEqual(len(raw_qs), 2)
  def test_escaped_percent(self):
      """With default params, a doubled ``%%`` in the SQL is accepted."""
      raw_qs = Author.objects.raw(
          "SELECT * FROM raw_query_author WHERE first_name like 'J%%'"
      )
      self.assertEqual(len(raw_qs), 2)
  @skipUnlessDBFeature('supports_paramstyle_pyformat')
  def test_pyformat_params(self):
      """
      A named (pyformat) query parameter supplied as a dict filters the rows.
      """
      sql = "SELECT * FROM raw_query_author WHERE first_name = %(first)s"
      third_author = Author.objects.all()[2]
      raw_qs = Author.objects.raw(sql, params={'first': third_author.first_name})
      rows = list(raw_qs)
      self.assertProcessed(Author, rows, [third_author])
      self.assertNoAnnotations(rows)
      self.assertEqual(len(rows), 1)
      self.assertIsInstance(repr(raw_qs), str)
  def test_query_representation(self):
      """
      The repr of a parametrised raw query shows the parameters substituted
      into the SQL, for both named and positional placeholders.
      """
      # Named placeholder with a dict of parameters.
      named_qs = Author.objects.raw(
          "SELECT * FROM raw_query_author WHERE last_name = %(last)s",
          {'last': 'foo'},
      )
      self.assertEqual(repr(named_qs), "<RawQuerySet: SELECT * FROM raw_query_author WHERE last_name = foo>")
      self.assertEqual(repr(named_qs.query), "<RawQuery: SELECT * FROM raw_query_author WHERE last_name = foo>")

      # Positional placeholder with a one-element set of parameters.
      positional_qs = Author.objects.raw(
          "SELECT * FROM raw_query_author WHERE last_name = %s",
          {'foo'},
      )
      self.assertEqual(repr(positional_qs), "<RawQuerySet: SELECT * FROM raw_query_author WHERE last_name = foo>")
      self.assertEqual(repr(positional_qs.query), "<RawQuery: SELECT * FROM raw_query_author WHERE last_name = foo>")
  def test_many_to_many(self):
      """
      A simple raw query works against a model that has an m2m field.
      """
      self.assertSuccessfulRawQuery(
          Reviewer,
          "SELECT * FROM raw_query_reviewer",
          Reviewer.objects.all(),
      )
  def test_extra_conversions(self):
      """
      Translations that match no result column are ignored.
      """
      unused_map = {'something': 'else'}
      self.assertSuccessfulRawQuery(
          Author,
          "SELECT * FROM raw_query_author",
          Author.objects.all(),
          translations=unused_map,
      )
  def test_missing_fields(self):
      """A field left out of the SELECT list is still readable on the result."""
      raw_qs = Author.objects.raw("SELECT id, first_name, dob FROM raw_query_author")
      for row in raw_qs:
          self.assertIsNotNone(row.first_name)
          # last_name was not selected; accessing it must still give a value.
          self.assertIsNotNone(row.last_name)
  def test_missing_fields_without_PK(self):
      """Iterating a raw query that omits the primary key raises FieldDoesNotExist."""
      raw_qs = Author.objects.raw("SELECT first_name, dob FROM raw_query_author")
      expected_message = 'Raw query must include the primary key'
      with self.assertRaisesMessage(FieldDoesNotExist, expected_message):
          list(raw_qs)
  def test_annotations(self):
      """Extra selected columns are exposed as annotations on each result."""
      sql = (
          "SELECT a.*, count(b.id) as book_count "
          "FROM raw_query_author a "
          "LEFT JOIN raw_query_book b ON a.id = b.author_id "
          "GROUP BY a.id, a.first_name, a.last_name, a.dob ORDER BY a.id"
      )
      # One (name, value) pair per author, in id order.
      per_author_counts = (3, 0, 1, 0)
      expected = tuple(('book_count', count) for count in per_author_counts)
      self.assertSuccessfulRawQuery(Author, sql, Author.objects.all(), expected)
  def test_white_space_query(self):
      """A query string that starts with whitespace is still handled."""
      self.assertSuccessfulRawQuery(
          Author,
          " SELECT * FROM raw_query_author",
          Author.objects.all(),
      )
  def test_multiple_iterations(self):
      """Iterating the same raw queryset twice yields the same rows both times."""
      orm_authors = Author.objects.all()
      raw_authors = Author.objects.raw("SELECT * FROM raw_query_author")

      def compare_one_pass():
          # Walk the raw queryset once, comparing against the ORM queryset
          # by position, and report how many rows were visited.
          visited = 0
          for position, raw_author in enumerate(raw_authors):
              self.assertEqual(orm_authors[position], raw_author)
              visited += 1
          return visited

      first_pass = compare_one_pass()
      second_pass = compare_one_pass()
      self.assertEqual(first_pass, second_pass)
  def test_get_item(self):
      """Raw querysets support integer indexes and slices but reject string keys."""
      sql = "SELECT * FROM raw_query_author ORDER BY id ASC"

      # Integer index.
      author_at_two = Author.objects.raw(sql)[2]
      self.assertEqual(author_at_two.first_name, 'Bob')

      # Slice.
      leading_pair = Author.objects.raw(sql)[0:2]
      self.assertEqual(len(leading_pair), 2)

      # Anything else is a TypeError.
      with self.assertRaises(TypeError):
          Author.objects.raw(sql)['test']
  def test_inheritance(self):
      """A raw query on FriendlyAuthor returns the one instance created here."""
      friendly = FriendlyAuthor.objects.create(first_name="Wesley", last_name="Chun", dob=date(1962, 10, 28))
      raw_qs = FriendlyAuthor.objects.raw("SELECT * FROM raw_query_friendlyauthor")
      found_pks = [row.pk for row in raw_qs]
      self.assertEqual(found_pks, [friendly.pk])
  def test_query_count(self):
      """Evaluating a raw queryset issues exactly one query."""
      # Build the lazy queryset first so only its evaluation is measured.
      raw_qs = Author.objects.raw("SELECT * FROM raw_query_author")
      with self.assertNumQueries(1):
          list(raw_qs)
  def test_subquery_in_raw_sql(self):
      """A raw query that selects from a subquery can be evaluated."""
      raw_qs = Book.objects.raw('SELECT id FROM (SELECT * FROM raw_query_book WHERE paperback IS NOT NULL) sq')
      list(raw_qs)
  def test_db_column_name_is_used_in_raw_query(self):
      """
      Regression test: the field's `column`, not its `attname`, must be used
      to build the list of fields included in the query. The two can differ
      when the primary key is a ForeignKey field.
      """
      fk_as_pk = BookFkAsPk.objects.create(book=self.b1)
      raw_qs = BookFkAsPk.objects.raw('SELECT not_the_default FROM raw_query_bookfkaspk')
      self.assertEqual(list(raw_qs), [fk_as_pk])
  def test_decimal_parameter(self):
      """A Decimal can be passed as a raw query parameter."""
      pricey = Coffee.objects.create(brand='starbucks', price=20.5)
      raw_qs = Coffee.objects.raw(
          "SELECT * FROM raw_query_coffee WHERE price >= %s",
          params=[Decimal(20)],
      )
      self.assertEqual(list(raw_qs), [pricey])
  def test_result_caching(self):
      """Evaluating the same raw queryset twice costs a single query."""
      with self.assertNumQueries(1):
          cached_qs = Book.objects.raw('SELECT * FROM raw_query_book')
          for _ in range(2):
              list(cached_qs)
  def test_iterator(self):
      """Each pass through ``iterator()`` issues its own query."""
      with self.assertNumQueries(2):
          raw_qs = Book.objects.raw('SELECT * FROM raw_query_book')
          for _ in range(2):
              list(raw_qs.iterator())
  def test_bool(self):
      """A raw queryset is truthy when it has rows and falsy when it has none."""
      with_rows = Book.objects.raw('SELECT * FROM raw_query_book')
      self.assertIs(bool(with_rows), True)
      without_rows = Book.objects.raw('SELECT * FROM raw_query_book WHERE id = 0')
      self.assertIs(bool(without_rows), False)
  def test_len(self):
      """``len()`` of a raw queryset is the number of rows it returns."""
      all_books = Book.objects.raw('SELECT * FROM raw_query_book')
      self.assertEqual(len(all_books), 4)
      no_books = Book.objects.raw('SELECT * FROM raw_query_book WHERE id = 0')
      self.assertEqual(len(no_books), 0)