tests.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. from __future__ import unicode_literals
  2. import threading
  3. import time
  4. from multiple_database.routers import TestRouter
  5. from django.db import DatabaseError, connection, router, transaction
  6. from django.test import (
  7. TransactionTestCase, override_settings, skipIfDBFeature,
  8. skipUnlessDBFeature,
  9. )
  10. from django.test.utils import CaptureQueriesContext
  11. from .models import Person
  12. class SelectForUpdateTests(TransactionTestCase):
  13. available_apps = ['select_for_update']
  14. def setUp(self):
  15. # This is executed in autocommit mode so that code in
  16. # run_select_for_update can see this data.
  17. self.person = Person.objects.create(name='Reinhardt')
  18. # We need another database connection in transaction to test that one
  19. # connection issuing a SELECT ... FOR UPDATE will block.
  20. self.new_connection = connection.copy()
  21. def tearDown(self):
  22. try:
  23. self.end_blocking_transaction()
  24. except (DatabaseError, AttributeError):
  25. pass
  26. self.new_connection.close()
  27. def start_blocking_transaction(self):
  28. self.new_connection.set_autocommit(False)
  29. # Start a blocking transaction. At some point,
  30. # end_blocking_transaction() should be called.
  31. self.cursor = self.new_connection.cursor()
  32. sql = 'SELECT * FROM %(db_table)s %(for_update)s;' % {
  33. 'db_table': Person._meta.db_table,
  34. 'for_update': self.new_connection.ops.for_update_sql(),
  35. }
  36. self.cursor.execute(sql, ())
  37. self.cursor.fetchone()
  38. def end_blocking_transaction(self):
  39. # Roll back the blocking transaction.
  40. self.new_connection.rollback()
  41. self.new_connection.set_autocommit(True)
  42. def has_for_update_sql(self, queries, nowait=False):
  43. # Examine the SQL that was executed to determine whether it
  44. # contains the 'SELECT..FOR UPDATE' stanza.
  45. for_update_sql = connection.ops.for_update_sql(nowait)
  46. return any(for_update_sql in query['sql'] for query in queries)
  47. @skipUnlessDBFeature('has_select_for_update')
  48. def test_for_update_sql_generated(self):
  49. """
  50. Test that the backend's FOR UPDATE variant appears in
  51. generated SQL when select_for_update is invoked.
  52. """
  53. with transaction.atomic(), CaptureQueriesContext(connection) as ctx:
  54. list(Person.objects.all().select_for_update())
  55. self.assertTrue(self.has_for_update_sql(ctx.captured_queries))
  56. @skipUnlessDBFeature('has_select_for_update_nowait')
  57. def test_for_update_sql_generated_nowait(self):
  58. """
  59. Test that the backend's FOR UPDATE NOWAIT variant appears in
  60. generated SQL when select_for_update is invoked.
  61. """
  62. with transaction.atomic(), CaptureQueriesContext(connection) as ctx:
  63. list(Person.objects.all().select_for_update(nowait=True))
  64. self.assertTrue(self.has_for_update_sql(ctx.captured_queries, nowait=True))
  65. @skipUnlessDBFeature('has_select_for_update_nowait')
  66. def test_nowait_raises_error_on_block(self):
  67. """
  68. If nowait is specified, we expect an error to be raised rather
  69. than blocking.
  70. """
  71. self.start_blocking_transaction()
  72. status = []
  73. thread = threading.Thread(
  74. target=self.run_select_for_update,
  75. args=(status,),
  76. kwargs={'nowait': True},
  77. )
  78. thread.start()
  79. time.sleep(1)
  80. thread.join()
  81. self.end_blocking_transaction()
  82. self.assertIsInstance(status[-1], DatabaseError)
  83. @skipIfDBFeature('has_select_for_update_nowait')
  84. @skipUnlessDBFeature('has_select_for_update')
  85. def test_unsupported_nowait_raises_error(self):
  86. """
  87. If a SELECT...FOR UPDATE NOWAIT is run on a database backend
  88. that supports FOR UPDATE but not NOWAIT, then we should find
  89. that a DatabaseError is raised.
  90. """
  91. with self.assertRaises(DatabaseError):
  92. list(Person.objects.all().select_for_update(nowait=True))
  93. @skipUnlessDBFeature('has_select_for_update')
  94. def test_for_update_requires_transaction(self):
  95. """
  96. Test that a TransactionManagementError is raised
  97. when a select_for_update query is executed outside of a transaction.
  98. """
  99. with self.assertRaises(transaction.TransactionManagementError):
  100. list(Person.objects.all().select_for_update())
  101. @skipUnlessDBFeature('has_select_for_update')
  102. def test_for_update_requires_transaction_only_in_execution(self):
  103. """
  104. Test that no TransactionManagementError is raised
  105. when select_for_update is invoked outside of a transaction -
  106. only when the query is executed.
  107. """
  108. people = Person.objects.all().select_for_update()
  109. with self.assertRaises(transaction.TransactionManagementError):
  110. list(people)
  111. def run_select_for_update(self, status, nowait=False):
  112. """
  113. Utility method that runs a SELECT FOR UPDATE against all
  114. Person instances. After the select_for_update, it attempts
  115. to update the name of the only record, save, and commit.
  116. This function expects to run in a separate thread.
  117. """
  118. status.append('started')
  119. try:
  120. # We need to enter transaction management again, as this is done on
  121. # per-thread basis
  122. with transaction.atomic():
  123. people = list(
  124. Person.objects.all().select_for_update(nowait=nowait)
  125. )
  126. people[0].name = 'Fred'
  127. people[0].save()
  128. except DatabaseError as e:
  129. status.append(e)
  130. finally:
  131. # This method is run in a separate thread. It uses its own
  132. # database connection. Close it without waiting for the GC.
  133. connection.close()
  134. @skipUnlessDBFeature('has_select_for_update')
  135. @skipUnlessDBFeature('supports_transactions')
  136. def test_block(self):
  137. """
  138. Check that a thread running a select_for_update that
  139. accesses rows being touched by a similar operation
  140. on another connection blocks correctly.
  141. """
  142. # First, let's start the transaction in our thread.
  143. self.start_blocking_transaction()
  144. # Now, try it again using the ORM's select_for_update
  145. # facility. Do this in a separate thread.
  146. status = []
  147. thread = threading.Thread(
  148. target=self.run_select_for_update, args=(status,)
  149. )
  150. # The thread should immediately block, but we'll sleep
  151. # for a bit to make sure.
  152. thread.start()
  153. sanity_count = 0
  154. while len(status) != 1 and sanity_count < 10:
  155. sanity_count += 1
  156. time.sleep(1)
  157. if sanity_count >= 10:
  158. raise ValueError('Thread did not run and block')
  159. # Check the person hasn't been updated. Since this isn't
  160. # using FOR UPDATE, it won't block.
  161. p = Person.objects.get(pk=self.person.pk)
  162. self.assertEqual('Reinhardt', p.name)
  163. # When we end our blocking transaction, our thread should
  164. # be able to continue.
  165. self.end_blocking_transaction()
  166. thread.join(5.0)
  167. # Check the thread has finished. Assuming it has, we should
  168. # find that it has updated the person's name.
  169. self.assertFalse(thread.isAlive())
  170. # We must commit the transaction to ensure that MySQL gets a fresh read,
  171. # since by default it runs in REPEATABLE READ mode
  172. transaction.commit()
  173. p = Person.objects.get(pk=self.person.pk)
  174. self.assertEqual('Fred', p.name)
  175. @skipUnlessDBFeature('has_select_for_update')
  176. def test_raw_lock_not_available(self):
  177. """
  178. Check that running a raw query which can't obtain a FOR UPDATE lock
  179. raises the correct exception
  180. """
  181. self.start_blocking_transaction()
  182. def raw(status):
  183. try:
  184. list(
  185. Person.objects.raw(
  186. 'SELECT * FROM %s %s' % (
  187. Person._meta.db_table,
  188. connection.ops.for_update_sql(nowait=True)
  189. )
  190. )
  191. )
  192. except DatabaseError as e:
  193. status.append(e)
  194. finally:
  195. # This method is run in a separate thread. It uses its own
  196. # database connection. Close it without waiting for the GC.
  197. connection.close()
  198. status = []
  199. thread = threading.Thread(target=raw, kwargs={'status': status})
  200. thread.start()
  201. time.sleep(1)
  202. thread.join()
  203. self.end_blocking_transaction()
  204. self.assertIsInstance(status[-1], DatabaseError)
  205. @skipUnlessDBFeature('has_select_for_update')
  206. @override_settings(DATABASE_ROUTERS=[TestRouter()])
  207. def test_select_for_update_on_multidb(self):
  208. query = Person.objects.select_for_update()
  209. self.assertEqual(router.db_for_write(Person), query.db)
  210. @skipUnlessDBFeature('has_select_for_update')
  211. def test_select_for_update_with_get(self):
  212. with transaction.atomic():
  213. person = Person.objects.select_for_update().get(name='Reinhardt')
  214. self.assertEqual(person.name, 'Reinhardt')