2
0

tests.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669
  1. from unittest import mock
  2. from django.db import connection, transaction
  3. from django.test import TestCase, skipIfDBFeature, skipUnlessDBFeature
  4. from django.utils.deprecation import RemovedInDjango60Warning
  5. from .models import (
  6. Article,
  7. InheritedArticleA,
  8. InheritedArticleB,
  9. NullablePublicationThrough,
  10. NullableTargetArticle,
  11. Publication,
  12. User,
  13. )
  14. class ManyToManyTests(TestCase):
  15. @classmethod
  16. def setUpTestData(cls):
  17. # Create a couple of Publications.
  18. cls.p1 = Publication.objects.create(title="The Python Journal")
  19. cls.p2 = Publication.objects.create(title="Science News")
  20. cls.p3 = Publication.objects.create(title="Science Weekly")
  21. cls.p4 = Publication.objects.create(title="Highlights for Children")
  22. cls.a1 = Article.objects.create(
  23. headline="Django lets you build web apps easily"
  24. )
  25. cls.a1.publications.add(cls.p1)
  26. cls.a2 = Article.objects.create(headline="NASA uses Python")
  27. cls.a2.publications.add(cls.p1, cls.p2, cls.p3, cls.p4)
  28. cls.a3 = Article.objects.create(headline="NASA finds intelligent life on Earth")
  29. cls.a3.publications.add(cls.p2)
  30. cls.a4 = Article.objects.create(headline="Oxygen-free diet works wonders")
  31. cls.a4.publications.add(cls.p2)
  32. def test_add(self):
  33. # Create an Article.
  34. a5 = Article(headline="Django lets you create web apps easily")
  35. # You can't associate it with a Publication until it's been saved.
  36. msg = (
  37. '"<Article: Django lets you create web apps easily>" needs to have '
  38. 'a value for field "id" before this many-to-many relationship can be used.'
  39. )
  40. with self.assertRaisesMessage(ValueError, msg):
  41. getattr(a5, "publications")
  42. # Save it!
  43. a5.save()
  44. # Associate the Article with a Publication.
  45. a5.publications.add(self.p1)
  46. self.assertSequenceEqual(a5.publications.all(), [self.p1])
  47. # Create another Article, and set it to appear in both Publications.
  48. a6 = Article(headline="ESA uses Python")
  49. a6.save()
  50. a6.publications.add(self.p1, self.p2)
  51. a6.publications.add(self.p3)
  52. # Adding a second time is OK
  53. a6.publications.add(self.p3)
  54. self.assertSequenceEqual(
  55. a6.publications.all(),
  56. [self.p2, self.p3, self.p1],
  57. )
  58. # Adding an object of the wrong type raises TypeError
  59. msg = (
  60. "'Publication' instance expected, got <Article: Django lets you create web "
  61. "apps easily>"
  62. )
  63. with self.assertRaisesMessage(TypeError, msg):
  64. with transaction.atomic():
  65. a6.publications.add(a5)
  66. # Add a Publication directly via publications.add by using keyword arguments.
  67. p5 = a6.publications.create(title="Highlights for Adults")
  68. self.assertSequenceEqual(
  69. a6.publications.all(),
  70. [p5, self.p2, self.p3, self.p1],
  71. )
  72. def test_add_remove_set_by_pk(self):
  73. a5 = Article.objects.create(headline="Django lets you create web apps easily")
  74. a5.publications.add(self.p1.pk)
  75. self.assertSequenceEqual(a5.publications.all(), [self.p1])
  76. a5.publications.set([self.p2.pk])
  77. self.assertSequenceEqual(a5.publications.all(), [self.p2])
  78. a5.publications.remove(self.p2.pk)
  79. self.assertSequenceEqual(a5.publications.all(), [])
  80. def test_add_remove_set_by_to_field(self):
  81. user_1 = User.objects.create(username="Jean")
  82. user_2 = User.objects.create(username="Joe")
  83. a5 = Article.objects.create(headline="Django lets you create web apps easily")
  84. a5.authors.add(user_1.username)
  85. self.assertSequenceEqual(a5.authors.all(), [user_1])
  86. a5.authors.set([user_2.username])
  87. self.assertSequenceEqual(a5.authors.all(), [user_2])
  88. a5.authors.remove(user_2.username)
  89. self.assertSequenceEqual(a5.authors.all(), [])
  90. def test_related_manager_refresh(self):
  91. user_1 = User.objects.create(username="Jean")
  92. user_2 = User.objects.create(username="Joe")
  93. self.a3.authors.add(user_1.username)
  94. self.assertSequenceEqual(user_1.article_set.all(), [self.a3])
  95. # Change the username on a different instance of the same user.
  96. user_1_from_db = User.objects.get(pk=user_1.pk)
  97. self.assertSequenceEqual(user_1_from_db.article_set.all(), [self.a3])
  98. user_1_from_db.username = "Paul"
  99. self.a3.authors.set([user_2.username])
  100. user_1_from_db.save()
  101. # Assign a different article.
  102. self.a4.authors.add(user_1_from_db.username)
  103. self.assertSequenceEqual(user_1_from_db.article_set.all(), [self.a4])
  104. # Refresh the instance with an evaluated related manager.
  105. user_1.refresh_from_db()
  106. self.assertEqual(user_1.username, "Paul")
  107. self.assertSequenceEqual(user_1.article_set.all(), [self.a4])
  108. def test_add_remove_invalid_type(self):
  109. msg = "Field 'id' expected a number but got 'invalid'."
  110. for method in ["add", "remove"]:
  111. with self.subTest(method), self.assertRaisesMessage(ValueError, msg):
  112. getattr(self.a1.publications, method)("invalid")
  113. def test_reverse_add(self):
  114. # Adding via the 'other' end of an m2m
  115. a5 = Article(headline="NASA finds intelligent life on Mars")
  116. a5.save()
  117. self.p2.article_set.add(a5)
  118. self.assertSequenceEqual(
  119. self.p2.article_set.all(),
  120. [self.a3, a5, self.a2, self.a4],
  121. )
  122. self.assertSequenceEqual(a5.publications.all(), [self.p2])
  123. # Adding via the other end using keywords
  124. a6 = self.p2.article_set.create(headline="Carbon-free diet works wonders")
  125. self.assertSequenceEqual(
  126. self.p2.article_set.all(),
  127. [a6, self.a3, a5, self.a2, self.a4],
  128. )
  129. a6 = self.p2.article_set.all()[3]
  130. self.assertSequenceEqual(
  131. a6.publications.all(),
  132. [self.p4, self.p2, self.p3, self.p1],
  133. )
  134. @skipUnlessDBFeature("supports_ignore_conflicts")
  135. def test_fast_add_ignore_conflicts(self):
  136. """
  137. A single query is necessary to add auto-created through instances if
  138. the database backend supports bulk_create(ignore_conflicts) and no
  139. m2m_changed signals receivers are connected.
  140. """
  141. with self.assertNumQueries(1):
  142. self.a1.publications.add(self.p1, self.p2)
  143. @skipIfDBFeature("supports_ignore_conflicts")
  144. def test_add_existing_different_type(self):
  145. # A single SELECT query is necessary to compare existing values to the
  146. # provided one; no INSERT should be attempted.
  147. with self.assertNumQueries(1):
  148. self.a1.publications.add(str(self.p1.pk))
  149. self.assertEqual(self.a1.publications.get(), self.p1)
  150. @skipUnlessDBFeature("supports_ignore_conflicts")
  151. def test_slow_add_ignore_conflicts(self):
  152. manager_cls = self.a1.publications.__class__
  153. # Simulate a race condition between the missing ids retrieval and
  154. # the bulk insertion attempt.
  155. missing_target_ids = {self.p1.id}
  156. # Disable fast-add to test the case where the slow add path is taken.
  157. add_plan = (True, False, False)
  158. with mock.patch.object(
  159. manager_cls, "_get_missing_target_ids", return_value=missing_target_ids
  160. ) as mocked:
  161. with mock.patch.object(manager_cls, "_get_add_plan", return_value=add_plan):
  162. self.a1.publications.add(self.p1)
  163. mocked.assert_called_once()
  164. def test_related_sets(self):
  165. # Article objects have access to their related Publication objects.
  166. self.assertSequenceEqual(self.a1.publications.all(), [self.p1])
  167. self.assertSequenceEqual(
  168. self.a2.publications.all(),
  169. [self.p4, self.p2, self.p3, self.p1],
  170. )
  171. # Publication objects have access to their related Article objects.
  172. self.assertSequenceEqual(
  173. self.p2.article_set.all(),
  174. [self.a3, self.a2, self.a4],
  175. )
  176. self.assertSequenceEqual(
  177. self.p1.article_set.all(),
  178. [self.a1, self.a2],
  179. )
  180. self.assertSequenceEqual(
  181. Publication.objects.get(id=self.p4.id).article_set.all(),
  182. [self.a2],
  183. )
  184. def test_selects(self):
  185. # We can perform kwarg queries across m2m relationships
  186. self.assertSequenceEqual(
  187. Article.objects.filter(publications__id__exact=self.p1.id),
  188. [self.a1, self.a2],
  189. )
  190. self.assertSequenceEqual(
  191. Article.objects.filter(publications__pk=self.p1.id),
  192. [self.a1, self.a2],
  193. )
  194. self.assertSequenceEqual(
  195. Article.objects.filter(publications=self.p1.id),
  196. [self.a1, self.a2],
  197. )
  198. self.assertSequenceEqual(
  199. Article.objects.filter(publications=self.p1),
  200. [self.a1, self.a2],
  201. )
  202. self.assertSequenceEqual(
  203. Article.objects.filter(publications__title__startswith="Science"),
  204. [self.a3, self.a2, self.a2, self.a4],
  205. )
  206. self.assertSequenceEqual(
  207. Article.objects.filter(
  208. publications__title__startswith="Science"
  209. ).distinct(),
  210. [self.a3, self.a2, self.a4],
  211. )
  212. # The count() function respects distinct() as well.
  213. self.assertEqual(
  214. Article.objects.filter(publications__title__startswith="Science").count(), 4
  215. )
  216. self.assertEqual(
  217. Article.objects.filter(publications__title__startswith="Science")
  218. .distinct()
  219. .count(),
  220. 3,
  221. )
  222. self.assertSequenceEqual(
  223. Article.objects.filter(
  224. publications__in=[self.p1.id, self.p2.id]
  225. ).distinct(),
  226. [self.a1, self.a3, self.a2, self.a4],
  227. )
  228. self.assertSequenceEqual(
  229. Article.objects.filter(publications__in=[self.p1.id, self.p2]).distinct(),
  230. [self.a1, self.a3, self.a2, self.a4],
  231. )
  232. self.assertSequenceEqual(
  233. Article.objects.filter(publications__in=[self.p1, self.p2]).distinct(),
  234. [self.a1, self.a3, self.a2, self.a4],
  235. )
  236. # Excluding a related item works as you would expect, too (although the SQL
  237. # involved is a little complex).
  238. self.assertSequenceEqual(
  239. Article.objects.exclude(publications=self.p2),
  240. [self.a1],
  241. )
  242. def test_reverse_selects(self):
  243. # Reverse m2m queries are supported (i.e., starting at the table that
  244. # doesn't have a ManyToManyField).
  245. python_journal = [self.p1]
  246. self.assertSequenceEqual(
  247. Publication.objects.filter(id__exact=self.p1.id), python_journal
  248. )
  249. self.assertSequenceEqual(
  250. Publication.objects.filter(pk=self.p1.id), python_journal
  251. )
  252. self.assertSequenceEqual(
  253. Publication.objects.filter(article__headline__startswith="NASA"),
  254. [self.p4, self.p2, self.p2, self.p3, self.p1],
  255. )
  256. self.assertSequenceEqual(
  257. Publication.objects.filter(article__id__exact=self.a1.id), python_journal
  258. )
  259. self.assertSequenceEqual(
  260. Publication.objects.filter(article__pk=self.a1.id), python_journal
  261. )
  262. self.assertSequenceEqual(
  263. Publication.objects.filter(article=self.a1.id), python_journal
  264. )
  265. self.assertSequenceEqual(
  266. Publication.objects.filter(article=self.a1), python_journal
  267. )
  268. self.assertSequenceEqual(
  269. Publication.objects.filter(article__in=[self.a1.id, self.a2.id]).distinct(),
  270. [self.p4, self.p2, self.p3, self.p1],
  271. )
  272. self.assertSequenceEqual(
  273. Publication.objects.filter(article__in=[self.a1.id, self.a2]).distinct(),
  274. [self.p4, self.p2, self.p3, self.p1],
  275. )
  276. self.assertSequenceEqual(
  277. Publication.objects.filter(article__in=[self.a1, self.a2]).distinct(),
  278. [self.p4, self.p2, self.p3, self.p1],
  279. )
  280. def test_delete(self):
  281. # If we delete a Publication, its Articles won't be able to access it.
  282. self.p1.delete()
  283. self.assertSequenceEqual(
  284. Publication.objects.all(),
  285. [self.p4, self.p2, self.p3],
  286. )
  287. self.assertSequenceEqual(self.a1.publications.all(), [])
  288. # If we delete an Article, its Publications won't be able to access it.
  289. self.a2.delete()
  290. self.assertSequenceEqual(
  291. Article.objects.all(),
  292. [self.a1, self.a3, self.a4],
  293. )
  294. self.assertSequenceEqual(
  295. self.p2.article_set.all(),
  296. [self.a3, self.a4],
  297. )
  298. def test_bulk_delete(self):
  299. # Bulk delete some Publications - references to deleted publications should go
  300. Publication.objects.filter(title__startswith="Science").delete()
  301. self.assertSequenceEqual(
  302. Publication.objects.all(),
  303. [self.p4, self.p1],
  304. )
  305. self.assertSequenceEqual(
  306. Article.objects.all(),
  307. [self.a1, self.a3, self.a2, self.a4],
  308. )
  309. self.assertSequenceEqual(
  310. self.a2.publications.all(),
  311. [self.p4, self.p1],
  312. )
  313. # Bulk delete some articles - references to deleted objects should go
  314. q = Article.objects.filter(headline__startswith="Django")
  315. self.assertSequenceEqual(q, [self.a1])
  316. q.delete()
  317. # After the delete, the QuerySet cache needs to be cleared,
  318. # and the referenced objects should be gone
  319. self.assertSequenceEqual(q, [])
  320. self.assertSequenceEqual(self.p1.article_set.all(), [self.a2])
  321. def test_remove(self):
  322. # Removing publication from an article:
  323. self.assertSequenceEqual(
  324. self.p2.article_set.all(),
  325. [self.a3, self.a2, self.a4],
  326. )
  327. self.a4.publications.remove(self.p2)
  328. self.assertSequenceEqual(
  329. self.p2.article_set.all(),
  330. [self.a3, self.a2],
  331. )
  332. self.assertSequenceEqual(self.a4.publications.all(), [])
  333. # And from the other end
  334. self.p2.article_set.remove(self.a3)
  335. self.assertSequenceEqual(self.p2.article_set.all(), [self.a2])
  336. self.assertSequenceEqual(self.a3.publications.all(), [])
  337. def test_set(self):
  338. self.p2.article_set.set([self.a4, self.a3])
  339. self.assertSequenceEqual(
  340. self.p2.article_set.all(),
  341. [self.a3, self.a4],
  342. )
  343. self.assertSequenceEqual(self.a4.publications.all(), [self.p2])
  344. self.a4.publications.set([self.p3.id])
  345. self.assertSequenceEqual(self.p2.article_set.all(), [self.a3])
  346. self.assertSequenceEqual(self.a4.publications.all(), [self.p3])
  347. self.p2.article_set.set([])
  348. self.assertSequenceEqual(self.p2.article_set.all(), [])
  349. self.a4.publications.set([])
  350. self.assertSequenceEqual(self.a4.publications.all(), [])
  351. self.p2.article_set.set([self.a4, self.a3], clear=True)
  352. self.assertSequenceEqual(
  353. self.p2.article_set.all(),
  354. [self.a3, self.a4],
  355. )
  356. self.assertSequenceEqual(self.a4.publications.all(), [self.p2])
  357. self.a4.publications.set([self.p3.id], clear=True)
  358. self.assertSequenceEqual(self.p2.article_set.all(), [self.a3])
  359. self.assertSequenceEqual(self.a4.publications.all(), [self.p3])
  360. self.p2.article_set.set([], clear=True)
  361. self.assertSequenceEqual(self.p2.article_set.all(), [])
  362. self.a4.publications.set([], clear=True)
  363. self.assertSequenceEqual(self.a4.publications.all(), [])
  364. def test_set_existing_different_type(self):
  365. # Existing many-to-many relations remain the same for values provided
  366. # with a different type.
  367. ids = set(
  368. Publication.article_set.through.objects.filter(
  369. article__in=[self.a4, self.a3],
  370. publication=self.p2,
  371. ).values_list("id", flat=True)
  372. )
  373. self.p2.article_set.set([str(self.a4.pk), str(self.a3.pk)])
  374. new_ids = set(
  375. Publication.article_set.through.objects.filter(
  376. publication=self.p2,
  377. ).values_list("id", flat=True)
  378. )
  379. self.assertEqual(ids, new_ids)
  380. def test_assign_forward(self):
  381. msg = (
  382. "Direct assignment to the reverse side of a many-to-many set is "
  383. "prohibited. Use article_set.set() instead."
  384. )
  385. with self.assertRaisesMessage(TypeError, msg):
  386. self.p2.article_set = [self.a4, self.a3]
  387. def test_assign_reverse(self):
  388. msg = (
  389. "Direct assignment to the forward side of a many-to-many "
  390. "set is prohibited. Use publications.set() instead."
  391. )
  392. with self.assertRaisesMessage(TypeError, msg):
  393. self.a1.publications = [self.p1, self.p2]
  394. def test_assign(self):
  395. # Relation sets can be assigned using set().
  396. self.p2.article_set.set([self.a4, self.a3])
  397. self.assertSequenceEqual(
  398. self.p2.article_set.all(),
  399. [self.a3, self.a4],
  400. )
  401. self.assertSequenceEqual(self.a4.publications.all(), [self.p2])
  402. self.a4.publications.set([self.p3.id])
  403. self.assertSequenceEqual(self.p2.article_set.all(), [self.a3])
  404. self.assertSequenceEqual(self.a4.publications.all(), [self.p3])
  405. # An alternate to calling clear() is to set an empty set.
  406. self.p2.article_set.set([])
  407. self.assertSequenceEqual(self.p2.article_set.all(), [])
  408. self.a4.publications.set([])
  409. self.assertSequenceEqual(self.a4.publications.all(), [])
  410. def test_assign_ids(self):
  411. # Relation sets can also be set using primary key values
  412. self.p2.article_set.set([self.a4.id, self.a3.id])
  413. self.assertSequenceEqual(
  414. self.p2.article_set.all(),
  415. [self.a3, self.a4],
  416. )
  417. self.assertSequenceEqual(self.a4.publications.all(), [self.p2])
  418. self.a4.publications.set([self.p3.id])
  419. self.assertSequenceEqual(self.p2.article_set.all(), [self.a3])
  420. self.assertSequenceEqual(self.a4.publications.all(), [self.p3])
  421. def test_forward_assign_with_queryset(self):
  422. # Querysets used in m2m assignments are pre-evaluated so their value
  423. # isn't affected by the clearing operation in ManyRelatedManager.set()
  424. # (#19816).
  425. self.a1.publications.set([self.p1, self.p2])
  426. qs = self.a1.publications.filter(title="The Python Journal")
  427. self.a1.publications.set(qs)
  428. self.assertEqual(1, self.a1.publications.count())
  429. self.assertEqual(1, qs.count())
  430. def test_reverse_assign_with_queryset(self):
  431. # Querysets used in M2M assignments are pre-evaluated so their value
  432. # isn't affected by the clearing operation in ManyRelatedManager.set()
  433. # (#19816).
  434. self.p1.article_set.set([self.a1, self.a2])
  435. qs = self.p1.article_set.filter(
  436. headline="Django lets you build web apps easily"
  437. )
  438. self.p1.article_set.set(qs)
  439. self.assertEqual(1, self.p1.article_set.count())
  440. self.assertEqual(1, qs.count())
  441. def test_clear(self):
  442. # Relation sets can be cleared:
  443. self.p2.article_set.clear()
  444. self.assertSequenceEqual(self.p2.article_set.all(), [])
  445. self.assertSequenceEqual(self.a4.publications.all(), [])
  446. # And you can clear from the other end
  447. self.p2.article_set.add(self.a3, self.a4)
  448. self.assertSequenceEqual(
  449. self.p2.article_set.all(),
  450. [self.a3, self.a4],
  451. )
  452. self.assertSequenceEqual(self.a4.publications.all(), [self.p2])
  453. self.a4.publications.clear()
  454. self.assertSequenceEqual(self.a4.publications.all(), [])
  455. self.assertSequenceEqual(self.p2.article_set.all(), [self.a3])
  456. def test_clear_after_prefetch(self):
  457. a4 = Article.objects.prefetch_related("publications").get(id=self.a4.id)
  458. self.assertSequenceEqual(a4.publications.all(), [self.p2])
  459. a4.publications.clear()
  460. self.assertSequenceEqual(a4.publications.all(), [])
  461. def test_remove_after_prefetch(self):
  462. a4 = Article.objects.prefetch_related("publications").get(id=self.a4.id)
  463. self.assertSequenceEqual(a4.publications.all(), [self.p2])
  464. a4.publications.remove(self.p2)
  465. self.assertSequenceEqual(a4.publications.all(), [])
  466. def test_add_after_prefetch(self):
  467. a4 = Article.objects.prefetch_related("publications").get(id=self.a4.id)
  468. self.assertEqual(a4.publications.count(), 1)
  469. a4.publications.add(self.p1)
  470. self.assertEqual(a4.publications.count(), 2)
  471. def test_create_after_prefetch(self):
  472. a4 = Article.objects.prefetch_related("publications").get(id=self.a4.id)
  473. self.assertSequenceEqual(a4.publications.all(), [self.p2])
  474. p5 = a4.publications.create(title="Django beats")
  475. self.assertCountEqual(a4.publications.all(), [self.p2, p5])
  476. def test_set_after_prefetch(self):
  477. a4 = Article.objects.prefetch_related("publications").get(id=self.a4.id)
  478. self.assertEqual(a4.publications.count(), 1)
  479. a4.publications.set([self.p2, self.p1])
  480. self.assertEqual(a4.publications.count(), 2)
  481. a4.publications.set([self.p1])
  482. self.assertEqual(a4.publications.count(), 1)
  483. def test_add_then_remove_after_prefetch(self):
  484. a4 = Article.objects.prefetch_related("publications").get(id=self.a4.id)
  485. self.assertEqual(a4.publications.count(), 1)
  486. a4.publications.add(self.p1)
  487. self.assertEqual(a4.publications.count(), 2)
  488. a4.publications.remove(self.p1)
  489. self.assertSequenceEqual(a4.publications.all(), [self.p2])
  490. def test_inherited_models_selects(self):
  491. """
  492. #24156 - Objects from child models where the parent's m2m field uses
  493. related_name='+' should be retrieved correctly.
  494. """
  495. a = InheritedArticleA.objects.create()
  496. b = InheritedArticleB.objects.create()
  497. a.publications.add(self.p1, self.p2)
  498. self.assertSequenceEqual(
  499. a.publications.all(),
  500. [self.p2, self.p1],
  501. )
  502. self.assertSequenceEqual(b.publications.all(), [])
  503. b.publications.add(self.p3)
  504. self.assertSequenceEqual(
  505. a.publications.all(),
  506. [self.p2, self.p1],
  507. )
  508. self.assertSequenceEqual(b.publications.all(), [self.p3])
  509. def test_custom_default_manager_exists_count(self):
  510. a5 = Article.objects.create(headline="deleted")
  511. a5.publications.add(self.p2)
  512. with self.assertNumQueries(2) as ctx:
  513. self.assertEqual(
  514. self.p2.article_set.count(), self.p2.article_set.all().count()
  515. )
  516. self.assertIn("JOIN", ctx.captured_queries[0]["sql"])
  517. with self.assertNumQueries(2) as ctx:
  518. self.assertEqual(
  519. self.p3.article_set.exists(), self.p3.article_set.all().exists()
  520. )
  521. self.assertIn("JOIN", ctx.captured_queries[0]["sql"])
  522. def test_get_prefetch_queryset_warning(self):
  523. articles = Article.objects.all()
  524. msg = (
  525. "get_prefetch_queryset() is deprecated. Use get_prefetch_querysets() "
  526. "instead."
  527. )
  528. with self.assertWarnsMessage(RemovedInDjango60Warning, msg) as ctx:
  529. self.a1.publications.get_prefetch_queryset(articles)
  530. self.assertEqual(ctx.filename, __file__)
  531. def test_get_prefetch_querysets_invalid_querysets_length(self):
  532. articles = Article.objects.all()
  533. msg = (
  534. "querysets argument of get_prefetch_querysets() should have a length of 1."
  535. )
  536. with self.assertRaisesMessage(ValueError, msg):
  537. self.a1.publications.get_prefetch_querysets(
  538. instances=articles,
  539. querysets=[Publication.objects.all(), Publication.objects.all()],
  540. )
  541. class ManyToManyQueryTests(TestCase):
  542. """
  543. SQL is optimized to reference the through table without joining against the
  544. related table when using count() and exists() functions on a queryset for
  545. many to many relations. The optimization applies to the case where there
  546. are no filters.
  547. """
  548. @classmethod
  549. def setUpTestData(cls):
  550. cls.article = Article.objects.create(
  551. headline="Django lets you build Web apps easily"
  552. )
  553. cls.nullable_target_article = NullableTargetArticle.objects.create(
  554. headline="The python is good"
  555. )
  556. NullablePublicationThrough.objects.create(
  557. article=cls.nullable_target_article, publication=None
  558. )
  559. @skipUnlessDBFeature("supports_foreign_keys")
  560. def test_count_join_optimization(self):
  561. with self.assertNumQueries(1) as ctx:
  562. self.article.publications.count()
  563. self.assertNotIn("JOIN", ctx.captured_queries[0]["sql"])
  564. with self.assertNumQueries(1) as ctx:
  565. self.article.publications.count()
  566. self.assertNotIn("JOIN", ctx.captured_queries[0]["sql"])
  567. self.assertEqual(self.nullable_target_article.publications.count(), 0)
  568. def test_count_join_optimization_disabled(self):
  569. with (
  570. mock.patch.object(connection.features, "supports_foreign_keys", False),
  571. self.assertNumQueries(1) as ctx,
  572. ):
  573. self.article.publications.count()
  574. self.assertIn("JOIN", ctx.captured_queries[0]["sql"])
  575. @skipUnlessDBFeature("supports_foreign_keys")
  576. def test_exists_join_optimization(self):
  577. with self.assertNumQueries(1) as ctx:
  578. self.article.publications.exists()
  579. self.assertNotIn("JOIN", ctx.captured_queries[0]["sql"])
  580. self.article.publications.prefetch_related()
  581. with self.assertNumQueries(1) as ctx:
  582. self.article.publications.exists()
  583. self.assertNotIn("JOIN", ctx.captured_queries[0]["sql"])
  584. self.assertIs(self.nullable_target_article.publications.exists(), False)
  585. def test_exists_join_optimization_disabled(self):
  586. with (
  587. mock.patch.object(connection.features, "supports_foreign_keys", False),
  588. self.assertNumQueries(1) as ctx,
  589. ):
  590. self.article.publications.exists()
  591. self.assertIn("JOIN", ctx.captured_queries[0]["sql"])
  592. def test_prefetch_related_no_queries_optimization_disabled(self):
  593. qs = Article.objects.prefetch_related("publications")
  594. article = qs.get()
  595. with self.assertNumQueries(0):
  596. article.publications.count()
  597. with self.assertNumQueries(0):
  598. article.publications.exists()