tests.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. import threading
  2. import time
  3. from unittest import mock
  4. from multiple_database.routers import TestRouter
  5. from django.db import (
  6. DatabaseError, connection, connections, router, transaction,
  7. )
  8. from django.test import (
  9. TransactionTestCase, override_settings, skipIfDBFeature,
  10. skipUnlessDBFeature,
  11. )
  12. from django.test.utils import CaptureQueriesContext
  13. from .models import Person
  14. class SelectForUpdateTests(TransactionTestCase):
  15. available_apps = ['select_for_update']
  16. def setUp(self):
  17. # This is executed in autocommit mode so that code in
  18. # run_select_for_update can see this data.
  19. self.person = Person.objects.create(name='Reinhardt')
  20. # We need another database connection in transaction to test that one
  21. # connection issuing a SELECT ... FOR UPDATE will block.
  22. self.new_connection = connection.copy()
  23. def tearDown(self):
  24. try:
  25. self.end_blocking_transaction()
  26. except (DatabaseError, AttributeError):
  27. pass
  28. self.new_connection.close()
  29. def start_blocking_transaction(self):
  30. self.new_connection.set_autocommit(False)
  31. # Start a blocking transaction. At some point,
  32. # end_blocking_transaction() should be called.
  33. self.cursor = self.new_connection.cursor()
  34. sql = 'SELECT * FROM %(db_table)s %(for_update)s;' % {
  35. 'db_table': Person._meta.db_table,
  36. 'for_update': self.new_connection.ops.for_update_sql(),
  37. }
  38. self.cursor.execute(sql, ())
  39. self.cursor.fetchone()
  40. def end_blocking_transaction(self):
  41. # Roll back the blocking transaction.
  42. self.new_connection.rollback()
  43. self.new_connection.set_autocommit(True)
  44. def has_for_update_sql(self, queries, **kwargs):
  45. # Examine the SQL that was executed to determine whether it
  46. # contains the 'SELECT..FOR UPDATE' stanza.
  47. for_update_sql = connection.ops.for_update_sql(**kwargs)
  48. return any(for_update_sql in query['sql'] for query in queries)
  49. @skipUnlessDBFeature('has_select_for_update')
  50. def test_for_update_sql_generated(self):
  51. """
  52. The backend's FOR UPDATE variant appears in
  53. generated SQL when select_for_update is invoked.
  54. """
  55. with transaction.atomic(), CaptureQueriesContext(connection) as ctx:
  56. list(Person.objects.all().select_for_update())
  57. self.assertTrue(self.has_for_update_sql(ctx.captured_queries))
  58. @skipUnlessDBFeature('has_select_for_update_nowait')
  59. def test_for_update_sql_generated_nowait(self):
  60. """
  61. The backend's FOR UPDATE NOWAIT variant appears in
  62. generated SQL when select_for_update is invoked.
  63. """
  64. with transaction.atomic(), CaptureQueriesContext(connection) as ctx:
  65. list(Person.objects.all().select_for_update(nowait=True))
  66. self.assertTrue(self.has_for_update_sql(ctx.captured_queries, nowait=True))
  67. @skipUnlessDBFeature('has_select_for_update_skip_locked')
  68. def test_for_update_sql_generated_skip_locked(self):
  69. """
  70. The backend's FOR UPDATE SKIP LOCKED variant appears in
  71. generated SQL when select_for_update is invoked.
  72. """
  73. with transaction.atomic(), CaptureQueriesContext(connection) as ctx:
  74. list(Person.objects.all().select_for_update(skip_locked=True))
  75. self.assertTrue(self.has_for_update_sql(ctx.captured_queries, skip_locked=True))
  76. @skipUnlessDBFeature('has_select_for_update_nowait')
  77. def test_nowait_raises_error_on_block(self):
  78. """
  79. If nowait is specified, we expect an error to be raised rather
  80. than blocking.
  81. """
  82. self.start_blocking_transaction()
  83. status = []
  84. thread = threading.Thread(
  85. target=self.run_select_for_update,
  86. args=(status,),
  87. kwargs={'nowait': True},
  88. )
  89. thread.start()
  90. time.sleep(1)
  91. thread.join()
  92. self.end_blocking_transaction()
  93. self.assertIsInstance(status[-1], DatabaseError)
  94. @skipUnlessDBFeature('has_select_for_update_skip_locked')
  95. def test_skip_locked_skips_locked_rows(self):
  96. """
  97. If skip_locked is specified, the locked row is skipped resulting in
  98. Person.DoesNotExist.
  99. """
  100. self.start_blocking_transaction()
  101. status = []
  102. thread = threading.Thread(
  103. target=self.run_select_for_update,
  104. args=(status,),
  105. kwargs={'skip_locked': True},
  106. )
  107. thread.start()
  108. time.sleep(1)
  109. thread.join()
  110. self.end_blocking_transaction()
  111. self.assertIsInstance(status[-1], Person.DoesNotExist)
  112. @skipIfDBFeature('has_select_for_update_nowait')
  113. @skipUnlessDBFeature('has_select_for_update')
  114. def test_unsupported_nowait_raises_error(self):
  115. """
  116. DatabaseError is raised if a SELECT...FOR UPDATE NOWAIT is run on
  117. a database backend that supports FOR UPDATE but not NOWAIT.
  118. """
  119. with self.assertRaisesMessage(DatabaseError, 'NOWAIT is not supported on this database backend.'):
  120. with transaction.atomic():
  121. Person.objects.select_for_update(nowait=True).get()
  122. @skipIfDBFeature('has_select_for_update_skip_locked')
  123. @skipUnlessDBFeature('has_select_for_update')
  124. def test_unsupported_skip_locked_raises_error(self):
  125. """
  126. DatabaseError is raised if a SELECT...FOR UPDATE SKIP LOCKED is run on
  127. a database backend that supports FOR UPDATE but not SKIP LOCKED.
  128. """
  129. with self.assertRaisesMessage(DatabaseError, 'SKIP LOCKED is not supported on this database backend.'):
  130. with transaction.atomic():
  131. Person.objects.select_for_update(skip_locked=True).get()
  132. @skipUnlessDBFeature('has_select_for_update')
  133. def test_for_update_after_from(self):
  134. features_class = connections['default'].features.__class__
  135. attribute_to_patch = "%s.%s.for_update_after_from" % (features_class.__module__, features_class.__name__)
  136. with mock.patch(attribute_to_patch, return_value=True):
  137. with transaction.atomic():
  138. self.assertIn('FOR UPDATE WHERE', str(Person.objects.filter(name='foo').select_for_update().query))
  139. @skipUnlessDBFeature('has_select_for_update')
  140. def test_for_update_requires_transaction(self):
  141. """
  142. A TransactionManagementError is raised
  143. when a select_for_update query is executed outside of a transaction.
  144. """
  145. with self.assertRaises(transaction.TransactionManagementError):
  146. list(Person.objects.all().select_for_update())
  147. @skipUnlessDBFeature('has_select_for_update')
  148. def test_for_update_requires_transaction_only_in_execution(self):
  149. """
  150. No TransactionManagementError is raised
  151. when select_for_update is invoked outside of a transaction -
  152. only when the query is executed.
  153. """
  154. people = Person.objects.all().select_for_update()
  155. with self.assertRaises(transaction.TransactionManagementError):
  156. list(people)
  157. def run_select_for_update(self, status, **kwargs):
  158. """
  159. Utility method that runs a SELECT FOR UPDATE against all
  160. Person instances. After the select_for_update, it attempts
  161. to update the name of the only record, save, and commit.
  162. This function expects to run in a separate thread.
  163. """
  164. status.append('started')
  165. try:
  166. # We need to enter transaction management again, as this is done on
  167. # per-thread basis
  168. with transaction.atomic():
  169. person = Person.objects.select_for_update(**kwargs).get()
  170. person.name = 'Fred'
  171. person.save()
  172. except (DatabaseError, Person.DoesNotExist) as e:
  173. status.append(e)
  174. finally:
  175. # This method is run in a separate thread. It uses its own
  176. # database connection. Close it without waiting for the GC.
  177. connection.close()
  178. @skipUnlessDBFeature('has_select_for_update')
  179. @skipUnlessDBFeature('supports_transactions')
  180. def test_block(self):
  181. """
  182. A thread running a select_for_update that accesses rows being touched
  183. by a similar operation on another connection blocks correctly.
  184. """
  185. # First, let's start the transaction in our thread.
  186. self.start_blocking_transaction()
  187. # Now, try it again using the ORM's select_for_update
  188. # facility. Do this in a separate thread.
  189. status = []
  190. thread = threading.Thread(
  191. target=self.run_select_for_update, args=(status,)
  192. )
  193. # The thread should immediately block, but we'll sleep
  194. # for a bit to make sure.
  195. thread.start()
  196. sanity_count = 0
  197. while len(status) != 1 and sanity_count < 10:
  198. sanity_count += 1
  199. time.sleep(1)
  200. if sanity_count >= 10:
  201. raise ValueError('Thread did not run and block')
  202. # Check the person hasn't been updated. Since this isn't
  203. # using FOR UPDATE, it won't block.
  204. p = Person.objects.get(pk=self.person.pk)
  205. self.assertEqual('Reinhardt', p.name)
  206. # When we end our blocking transaction, our thread should
  207. # be able to continue.
  208. self.end_blocking_transaction()
  209. thread.join(5.0)
  210. # Check the thread has finished. Assuming it has, we should
  211. # find that it has updated the person's name.
  212. self.assertFalse(thread.isAlive())
  213. # We must commit the transaction to ensure that MySQL gets a fresh read,
  214. # since by default it runs in REPEATABLE READ mode
  215. transaction.commit()
  216. p = Person.objects.get(pk=self.person.pk)
  217. self.assertEqual('Fred', p.name)
  218. @skipUnlessDBFeature('has_select_for_update')
  219. def test_raw_lock_not_available(self):
  220. """
  221. Running a raw query which can't obtain a FOR UPDATE lock raises
  222. the correct exception
  223. """
  224. self.start_blocking_transaction()
  225. def raw(status):
  226. try:
  227. list(
  228. Person.objects.raw(
  229. 'SELECT * FROM %s %s' % (
  230. Person._meta.db_table,
  231. connection.ops.for_update_sql(nowait=True)
  232. )
  233. )
  234. )
  235. except DatabaseError as e:
  236. status.append(e)
  237. finally:
  238. # This method is run in a separate thread. It uses its own
  239. # database connection. Close it without waiting for the GC.
  240. connection.close()
  241. status = []
  242. thread = threading.Thread(target=raw, kwargs={'status': status})
  243. thread.start()
  244. time.sleep(1)
  245. thread.join()
  246. self.end_blocking_transaction()
  247. self.assertIsInstance(status[-1], DatabaseError)
  248. @skipUnlessDBFeature('has_select_for_update')
  249. @override_settings(DATABASE_ROUTERS=[TestRouter()])
  250. def test_select_for_update_on_multidb(self):
  251. query = Person.objects.select_for_update()
  252. self.assertEqual(router.db_for_write(Person), query.db)
  253. @skipUnlessDBFeature('has_select_for_update')
  254. def test_select_for_update_with_get(self):
  255. with transaction.atomic():
  256. person = Person.objects.select_for_update().get(name='Reinhardt')
  257. self.assertEqual(person.name, 'Reinhardt')
  258. def test_nowait_and_skip_locked(self):
  259. with self.assertRaisesMessage(ValueError, 'The nowait option cannot be used with skip_locked.'):
  260. Person.objects.select_for_update(nowait=True, skip_locked=True)
  261. def test_ordered_select_for_update(self):
  262. """
  263. Subqueries should respect ordering as an ORDER BY clause may be useful
  264. to specify a row locking order to prevent deadlocks (#27193).
  265. """
  266. with transaction.atomic():
  267. qs = Person.objects.filter(id__in=Person.objects.order_by('-id').select_for_update())
  268. self.assertIn('ORDER BY', str(qs.query))