tests.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. from __future__ import unicode_literals
  2. import time
  3. import unittest
  4. from django.conf import settings
  5. from django.db import transaction, connection, router
  6. from django.db.utils import ConnectionHandler, DEFAULT_DB_ALIAS, DatabaseError
  7. from django.test import (TransactionTestCase, skipIfDBFeature,
  8. skipUnlessDBFeature)
  9. from django.test import override_settings
  10. from multiple_database.routers import TestRouter
  11. from .models import Person
  12. # Some tests require threading, which might not be available. So create a
  13. # skip-test decorator for those test functions.
  14. try:
  15. import threading
  16. except ImportError:
  17. threading = None
  18. requires_threading = unittest.skipUnless(threading, 'requires threading')
  19. # We need to set settings.DEBUG to True so we can capture the output SQL
  20. # to examine.
  21. @override_settings(DEBUG=True)
  22. class SelectForUpdateTests(TransactionTestCase):
  23. available_apps = ['select_for_update']
  24. def setUp(self):
  25. # This is executed in autocommit mode so that code in
  26. # run_select_for_update can see this data.
  27. self.person = Person.objects.create(name='Reinhardt')
  28. # We need another database connection in transaction to test that one
  29. # connection issuing a SELECT ... FOR UPDATE will block.
  30. new_connections = ConnectionHandler(settings.DATABASES)
  31. self.new_connection = new_connections[DEFAULT_DB_ALIAS]
  32. def tearDown(self):
  33. try:
  34. self.end_blocking_transaction()
  35. except (DatabaseError, AttributeError):
  36. pass
  37. self.new_connection.close()
  38. def start_blocking_transaction(self):
  39. self.new_connection.set_autocommit(False)
  40. # Start a blocking transaction. At some point,
  41. # end_blocking_transaction() should be called.
  42. self.cursor = self.new_connection.cursor()
  43. sql = 'SELECT * FROM %(db_table)s %(for_update)s;' % {
  44. 'db_table': Person._meta.db_table,
  45. 'for_update': self.new_connection.ops.for_update_sql(),
  46. }
  47. self.cursor.execute(sql, ())
  48. self.cursor.fetchone()
  49. def end_blocking_transaction(self):
  50. # Roll back the blocking transaction.
  51. self.new_connection.rollback()
  52. self.new_connection.set_autocommit(True)
  53. def has_for_update_sql(self, tested_connection, nowait=False):
  54. # Examine the SQL that was executed to determine whether it
  55. # contains the 'SELECT..FOR UPDATE' stanza.
  56. for_update_sql = tested_connection.ops.for_update_sql(nowait)
  57. sql = tested_connection.queries[-1]['sql']
  58. return bool(sql.find(for_update_sql) > -1)
  59. @skipUnlessDBFeature('has_select_for_update')
  60. def test_for_update_sql_generated(self):
  61. """
  62. Test that the backend's FOR UPDATE variant appears in
  63. generated SQL when select_for_update is invoked.
  64. """
  65. with transaction.atomic():
  66. list(Person.objects.all().select_for_update())
  67. self.assertTrue(self.has_for_update_sql(connection))
  68. @skipUnlessDBFeature('has_select_for_update_nowait')
  69. def test_for_update_sql_generated_nowait(self):
  70. """
  71. Test that the backend's FOR UPDATE NOWAIT variant appears in
  72. generated SQL when select_for_update is invoked.
  73. """
  74. with transaction.atomic():
  75. list(Person.objects.all().select_for_update(nowait=True))
  76. self.assertTrue(self.has_for_update_sql(connection, nowait=True))
  77. @requires_threading
  78. @skipUnlessDBFeature('has_select_for_update_nowait')
  79. def test_nowait_raises_error_on_block(self):
  80. """
  81. If nowait is specified, we expect an error to be raised rather
  82. than blocking.
  83. """
  84. self.start_blocking_transaction()
  85. status = []
  86. thread = threading.Thread(
  87. target=self.run_select_for_update,
  88. args=(status,),
  89. kwargs={'nowait': True},
  90. )
  91. thread.start()
  92. time.sleep(1)
  93. thread.join()
  94. self.end_blocking_transaction()
  95. self.assertIsInstance(status[-1], DatabaseError)
  96. @skipIfDBFeature('has_select_for_update_nowait')
  97. @skipUnlessDBFeature('has_select_for_update')
  98. def test_unsupported_nowait_raises_error(self):
  99. """
  100. If a SELECT...FOR UPDATE NOWAIT is run on a database backend
  101. that supports FOR UPDATE but not NOWAIT, then we should find
  102. that a DatabaseError is raised.
  103. """
  104. self.assertRaises(
  105. DatabaseError,
  106. list,
  107. Person.objects.all().select_for_update(nowait=True)
  108. )
  109. @skipUnlessDBFeature('has_select_for_update')
  110. def test_for_update_requires_transaction(self):
  111. """
  112. Test that a TransactionManagementError is raised
  113. when a select_for_update query is executed outside of a transaction.
  114. """
  115. with self.assertRaises(transaction.TransactionManagementError):
  116. list(Person.objects.all().select_for_update())
  117. @skipUnlessDBFeature('has_select_for_update')
  118. def test_for_update_requires_transaction_only_in_execution(self):
  119. """
  120. Test that no TransactionManagementError is raised
  121. when select_for_update is invoked outside of a transaction -
  122. only when the query is executed.
  123. """
  124. people = Person.objects.all().select_for_update()
  125. with self.assertRaises(transaction.TransactionManagementError):
  126. list(people)
  127. def run_select_for_update(self, status, nowait=False):
  128. """
  129. Utility method that runs a SELECT FOR UPDATE against all
  130. Person instances. After the select_for_update, it attempts
  131. to update the name of the only record, save, and commit.
  132. This function expects to run in a separate thread.
  133. """
  134. status.append('started')
  135. try:
  136. # We need to enter transaction management again, as this is done on
  137. # per-thread basis
  138. with transaction.atomic():
  139. people = list(
  140. Person.objects.all().select_for_update(nowait=nowait)
  141. )
  142. people[0].name = 'Fred'
  143. people[0].save()
  144. except DatabaseError as e:
  145. status.append(e)
  146. finally:
  147. # This method is run in a separate thread. It uses its own
  148. # database connection. Close it without waiting for the GC.
  149. connection.close()
  150. @requires_threading
  151. @skipUnlessDBFeature('has_select_for_update')
  152. @skipUnlessDBFeature('supports_transactions')
  153. def test_block(self):
  154. """
  155. Check that a thread running a select_for_update that
  156. accesses rows being touched by a similar operation
  157. on another connection blocks correctly.
  158. """
  159. # First, let's start the transaction in our thread.
  160. self.start_blocking_transaction()
  161. # Now, try it again using the ORM's select_for_update
  162. # facility. Do this in a separate thread.
  163. status = []
  164. thread = threading.Thread(
  165. target=self.run_select_for_update, args=(status,)
  166. )
  167. # The thread should immediately block, but we'll sleep
  168. # for a bit to make sure.
  169. thread.start()
  170. sanity_count = 0
  171. while len(status) != 1 and sanity_count < 10:
  172. sanity_count += 1
  173. time.sleep(1)
  174. if sanity_count >= 10:
  175. raise ValueError('Thread did not run and block')
  176. # Check the person hasn't been updated. Since this isn't
  177. # using FOR UPDATE, it won't block.
  178. p = Person.objects.get(pk=self.person.pk)
  179. self.assertEqual('Reinhardt', p.name)
  180. # When we end our blocking transaction, our thread should
  181. # be able to continue.
  182. self.end_blocking_transaction()
  183. thread.join(5.0)
  184. # Check the thread has finished. Assuming it has, we should
  185. # find that it has updated the person's name.
  186. self.assertFalse(thread.isAlive())
  187. # We must commit the transaction to ensure that MySQL gets a fresh read,
  188. # since by default it runs in REPEATABLE READ mode
  189. transaction.commit()
  190. p = Person.objects.get(pk=self.person.pk)
  191. self.assertEqual('Fred', p.name)
  192. @requires_threading
  193. @skipUnlessDBFeature('has_select_for_update')
  194. def test_raw_lock_not_available(self):
  195. """
  196. Check that running a raw query which can't obtain a FOR UPDATE lock
  197. raises the correct exception
  198. """
  199. self.start_blocking_transaction()
  200. def raw(status):
  201. try:
  202. list(
  203. Person.objects.raw(
  204. 'SELECT * FROM %s %s' % (
  205. Person._meta.db_table,
  206. connection.ops.for_update_sql(nowait=True)
  207. )
  208. )
  209. )
  210. except DatabaseError as e:
  211. status.append(e)
  212. finally:
  213. # This method is run in a separate thread. It uses its own
  214. # database connection. Close it without waiting for the GC.
  215. connection.close()
  216. status = []
  217. thread = threading.Thread(target=raw, kwargs={'status': status})
  218. thread.start()
  219. time.sleep(1)
  220. thread.join()
  221. self.end_blocking_transaction()
  222. self.assertIsInstance(status[-1], DatabaseError)
  223. @skipUnlessDBFeature('has_select_for_update')
  224. def test_select_for_update_on_multidb(self):
  225. old_routers = router.routers
  226. try:
  227. router.routers = [TestRouter()]
  228. query = Person.objects.select_for_update()
  229. self.assertEqual(router.db_for_write(Person), query.db)
  230. finally:
  231. router.routers = old_routers
  232. @skipUnlessDBFeature('has_select_for_update')
  233. def test_select_for_update_with_get(self):
  234. with transaction.atomic():
  235. person = Person.objects.select_for_update().get(name='Reinhardt')
  236. self.assertEqual(person.name, 'Reinhardt')