tests.py 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666
  1. import threading
  2. import time
  3. from unittest import mock
  4. from multiple_database.routers import TestRouter
  5. from django.core.exceptions import FieldError
  6. from django.db import (
  7. DatabaseError,
  8. NotSupportedError,
  9. connection,
  10. connections,
  11. router,
  12. transaction,
  13. )
  14. from django.test import (
  15. TransactionTestCase,
  16. override_settings,
  17. skipIfDBFeature,
  18. skipUnlessDBFeature,
  19. )
  20. from django.test.utils import CaptureQueriesContext
  21. from .models import (
  22. City,
  23. CityCountryProxy,
  24. Country,
  25. EUCity,
  26. EUCountry,
  27. Person,
  28. PersonProfile,
  29. )
  30. class SelectForUpdateTests(TransactionTestCase):
  31. available_apps = ["select_for_update"]
  32. def setUp(self):
  33. # This is executed in autocommit mode so that code in
  34. # run_select_for_update can see this data.
  35. self.country1 = Country.objects.create(name="Belgium")
  36. self.country2 = Country.objects.create(name="France")
  37. self.city1 = City.objects.create(name="Liberchies", country=self.country1)
  38. self.city2 = City.objects.create(name="Samois-sur-Seine", country=self.country2)
  39. self.person = Person.objects.create(
  40. name="Reinhardt", born=self.city1, died=self.city2
  41. )
  42. self.person_profile = PersonProfile.objects.create(person=self.person)
  43. # We need another database connection in transaction to test that one
  44. # connection issuing a SELECT ... FOR UPDATE will block.
  45. self.new_connection = connection.copy()
  46. def tearDown(self):
  47. try:
  48. self.end_blocking_transaction()
  49. except (DatabaseError, AttributeError):
  50. pass
  51. self.new_connection.close()
  52. def start_blocking_transaction(self):
  53. self.new_connection.set_autocommit(False)
  54. # Start a blocking transaction. At some point,
  55. # end_blocking_transaction() should be called.
  56. self.cursor = self.new_connection.cursor()
  57. sql = "SELECT * FROM %(db_table)s %(for_update)s;" % {
  58. "db_table": Person._meta.db_table,
  59. "for_update": self.new_connection.ops.for_update_sql(),
  60. }
  61. self.cursor.execute(sql, ())
  62. self.cursor.fetchone()
  63. def end_blocking_transaction(self):
  64. # Roll back the blocking transaction.
  65. self.cursor.close()
  66. self.new_connection.rollback()
  67. self.new_connection.set_autocommit(True)
  68. def has_for_update_sql(self, queries, **kwargs):
  69. # Examine the SQL that was executed to determine whether it
  70. # contains the 'SELECT..FOR UPDATE' stanza.
  71. for_update_sql = connection.ops.for_update_sql(**kwargs)
  72. return any(for_update_sql in query["sql"] for query in queries)
  73. @skipUnlessDBFeature("has_select_for_update")
  74. def test_for_update_sql_generated(self):
  75. """
  76. The backend's FOR UPDATE variant appears in
  77. generated SQL when select_for_update is invoked.
  78. """
  79. with transaction.atomic(), CaptureQueriesContext(connection) as ctx:
  80. list(Person.objects.select_for_update())
  81. self.assertTrue(self.has_for_update_sql(ctx.captured_queries))
  82. @skipUnlessDBFeature("has_select_for_update_nowait")
  83. def test_for_update_sql_generated_nowait(self):
  84. """
  85. The backend's FOR UPDATE NOWAIT variant appears in
  86. generated SQL when select_for_update is invoked.
  87. """
  88. with transaction.atomic(), CaptureQueriesContext(connection) as ctx:
  89. list(Person.objects.select_for_update(nowait=True))
  90. self.assertTrue(self.has_for_update_sql(ctx.captured_queries, nowait=True))
  91. @skipUnlessDBFeature("has_select_for_update_skip_locked")
  92. def test_for_update_sql_generated_skip_locked(self):
  93. """
  94. The backend's FOR UPDATE SKIP LOCKED variant appears in
  95. generated SQL when select_for_update is invoked.
  96. """
  97. with transaction.atomic(), CaptureQueriesContext(connection) as ctx:
  98. list(Person.objects.select_for_update(skip_locked=True))
  99. self.assertTrue(self.has_for_update_sql(ctx.captured_queries, skip_locked=True))
  100. @skipUnlessDBFeature("has_select_for_no_key_update")
  101. def test_update_sql_generated_no_key(self):
  102. """
  103. The backend's FOR NO KEY UPDATE variant appears in generated SQL when
  104. select_for_update() is invoked.
  105. """
  106. with transaction.atomic(), CaptureQueriesContext(connection) as ctx:
  107. list(Person.objects.select_for_update(no_key=True))
  108. self.assertIs(self.has_for_update_sql(ctx.captured_queries, no_key=True), True)
  109. @skipUnlessDBFeature("has_select_for_update_of")
  110. def test_for_update_sql_generated_of(self):
  111. """
  112. The backend's FOR UPDATE OF variant appears in the generated SQL when
  113. select_for_update() is invoked.
  114. """
  115. with transaction.atomic(), CaptureQueriesContext(connection) as ctx:
  116. list(
  117. Person.objects.select_related(
  118. "born__country",
  119. )
  120. .select_for_update(
  121. of=("born__country",),
  122. )
  123. .select_for_update(of=("self", "born__country"))
  124. )
  125. features = connections["default"].features
  126. if features.select_for_update_of_column:
  127. expected = [
  128. 'select_for_update_person"."id',
  129. 'select_for_update_country"."entity_ptr_id',
  130. ]
  131. else:
  132. expected = ["select_for_update_person", "select_for_update_country"]
  133. expected = [connection.ops.quote_name(value) for value in expected]
  134. self.assertTrue(self.has_for_update_sql(ctx.captured_queries, of=expected))
  135. @skipUnlessDBFeature("has_select_for_update_of")
  136. def test_for_update_sql_model_inheritance_generated_of(self):
  137. with transaction.atomic(), CaptureQueriesContext(connection) as ctx:
  138. list(EUCountry.objects.select_for_update(of=("self",)))
  139. if connection.features.select_for_update_of_column:
  140. expected = ['select_for_update_eucountry"."country_ptr_id']
  141. else:
  142. expected = ["select_for_update_eucountry"]
  143. expected = [connection.ops.quote_name(value) for value in expected]
  144. self.assertTrue(self.has_for_update_sql(ctx.captured_queries, of=expected))
  145. @skipUnlessDBFeature("has_select_for_update_of")
  146. def test_for_update_sql_model_inheritance_ptr_generated_of(self):
  147. with transaction.atomic(), CaptureQueriesContext(connection) as ctx:
  148. list(
  149. EUCountry.objects.select_for_update(
  150. of=(
  151. "self",
  152. "country_ptr",
  153. )
  154. )
  155. )
  156. if connection.features.select_for_update_of_column:
  157. expected = [
  158. 'select_for_update_eucountry"."country_ptr_id',
  159. 'select_for_update_country"."entity_ptr_id',
  160. ]
  161. else:
  162. expected = ["select_for_update_eucountry", "select_for_update_country"]
  163. expected = [connection.ops.quote_name(value) for value in expected]
  164. self.assertTrue(self.has_for_update_sql(ctx.captured_queries, of=expected))
  165. @skipUnlessDBFeature("has_select_for_update_of")
  166. def test_for_update_sql_related_model_inheritance_generated_of(self):
  167. with transaction.atomic(), CaptureQueriesContext(connection) as ctx:
  168. list(
  169. EUCity.objects.select_related("country").select_for_update(
  170. of=("self", "country"),
  171. )
  172. )
  173. if connection.features.select_for_update_of_column:
  174. expected = [
  175. 'select_for_update_eucity"."id',
  176. 'select_for_update_eucountry"."country_ptr_id',
  177. ]
  178. else:
  179. expected = ["select_for_update_eucity", "select_for_update_eucountry"]
  180. expected = [connection.ops.quote_name(value) for value in expected]
  181. self.assertTrue(self.has_for_update_sql(ctx.captured_queries, of=expected))
  182. @skipUnlessDBFeature("has_select_for_update_of")
  183. def test_for_update_sql_model_inheritance_nested_ptr_generated_of(self):
  184. with transaction.atomic(), CaptureQueriesContext(connection) as ctx:
  185. list(
  186. EUCity.objects.select_related("country").select_for_update(
  187. of=(
  188. "self",
  189. "country__country_ptr",
  190. ),
  191. )
  192. )
  193. if connection.features.select_for_update_of_column:
  194. expected = [
  195. 'select_for_update_eucity"."id',
  196. 'select_for_update_country"."entity_ptr_id',
  197. ]
  198. else:
  199. expected = ["select_for_update_eucity", "select_for_update_country"]
  200. expected = [connection.ops.quote_name(value) for value in expected]
  201. self.assertTrue(self.has_for_update_sql(ctx.captured_queries, of=expected))
  202. @skipUnlessDBFeature("has_select_for_update_of")
  203. def test_for_update_sql_multilevel_model_inheritance_ptr_generated_of(self):
  204. with transaction.atomic(), CaptureQueriesContext(connection) as ctx:
  205. list(
  206. EUCountry.objects.select_for_update(
  207. of=("country_ptr", "country_ptr__entity_ptr"),
  208. )
  209. )
  210. if connection.features.select_for_update_of_column:
  211. expected = [
  212. 'select_for_update_country"."entity_ptr_id',
  213. 'select_for_update_entity"."id',
  214. ]
  215. else:
  216. expected = ["select_for_update_country", "select_for_update_entity"]
  217. expected = [connection.ops.quote_name(value) for value in expected]
  218. self.assertTrue(self.has_for_update_sql(ctx.captured_queries, of=expected))
  219. @skipUnlessDBFeature("has_select_for_update_of")
  220. def test_for_update_sql_model_proxy_generated_of(self):
  221. with transaction.atomic(), CaptureQueriesContext(connection) as ctx:
  222. list(
  223. CityCountryProxy.objects.select_related("country",).select_for_update(
  224. of=("country",),
  225. )
  226. )
  227. if connection.features.select_for_update_of_column:
  228. expected = ['select_for_update_country"."entity_ptr_id']
  229. else:
  230. expected = ["select_for_update_country"]
  231. expected = [connection.ops.quote_name(value) for value in expected]
  232. self.assertTrue(self.has_for_update_sql(ctx.captured_queries, of=expected))
  233. @skipUnlessDBFeature("has_select_for_update_of")
  234. def test_for_update_of_followed_by_values(self):
  235. with transaction.atomic():
  236. values = list(Person.objects.select_for_update(of=("self",)).values("pk"))
  237. self.assertEqual(values, [{"pk": self.person.pk}])
  238. @skipUnlessDBFeature("has_select_for_update_of")
  239. def test_for_update_of_followed_by_values_list(self):
  240. with transaction.atomic():
  241. values = list(
  242. Person.objects.select_for_update(of=("self",)).values_list("pk")
  243. )
  244. self.assertEqual(values, [(self.person.pk,)])
  245. @skipUnlessDBFeature("has_select_for_update_of")
  246. def test_for_update_of_self_when_self_is_not_selected(self):
  247. """
  248. select_for_update(of=['self']) when the only columns selected are from
  249. related tables.
  250. """
  251. with transaction.atomic():
  252. values = list(
  253. Person.objects.select_related("born")
  254. .select_for_update(of=("self",))
  255. .values("born__name")
  256. )
  257. self.assertEqual(values, [{"born__name": self.city1.name}])
  258. @skipUnlessDBFeature(
  259. "has_select_for_update_of",
  260. "supports_select_for_update_with_limit",
  261. )
  262. def test_for_update_of_with_exists(self):
  263. with transaction.atomic():
  264. qs = Person.objects.select_for_update(of=("self", "born"))
  265. self.assertIs(qs.exists(), True)
  266. @skipUnlessDBFeature("has_select_for_update_nowait")
  267. def test_nowait_raises_error_on_block(self):
  268. """
  269. If nowait is specified, we expect an error to be raised rather
  270. than blocking.
  271. """
  272. self.start_blocking_transaction()
  273. status = []
  274. thread = threading.Thread(
  275. target=self.run_select_for_update,
  276. args=(status,),
  277. kwargs={"nowait": True},
  278. )
  279. thread.start()
  280. time.sleep(1)
  281. thread.join()
  282. self.end_blocking_transaction()
  283. self.assertIsInstance(status[-1], DatabaseError)
  284. @skipUnlessDBFeature("has_select_for_update_skip_locked")
  285. def test_skip_locked_skips_locked_rows(self):
  286. """
  287. If skip_locked is specified, the locked row is skipped resulting in
  288. Person.DoesNotExist.
  289. """
  290. self.start_blocking_transaction()
  291. status = []
  292. thread = threading.Thread(
  293. target=self.run_select_for_update,
  294. args=(status,),
  295. kwargs={"skip_locked": True},
  296. )
  297. thread.start()
  298. time.sleep(1)
  299. thread.join()
  300. self.end_blocking_transaction()
  301. self.assertIsInstance(status[-1], Person.DoesNotExist)
  302. @skipIfDBFeature("has_select_for_update_nowait")
  303. @skipUnlessDBFeature("has_select_for_update")
  304. def test_unsupported_nowait_raises_error(self):
  305. """
  306. NotSupportedError is raised if a SELECT...FOR UPDATE NOWAIT is run on
  307. a database backend that supports FOR UPDATE but not NOWAIT.
  308. """
  309. with self.assertRaisesMessage(
  310. NotSupportedError, "NOWAIT is not supported on this database backend."
  311. ):
  312. with transaction.atomic():
  313. Person.objects.select_for_update(nowait=True).get()
  314. @skipIfDBFeature("has_select_for_update_skip_locked")
  315. @skipUnlessDBFeature("has_select_for_update")
  316. def test_unsupported_skip_locked_raises_error(self):
  317. """
  318. NotSupportedError is raised if a SELECT...FOR UPDATE SKIP LOCKED is run
  319. on a database backend that supports FOR UPDATE but not SKIP LOCKED.
  320. """
  321. with self.assertRaisesMessage(
  322. NotSupportedError, "SKIP LOCKED is not supported on this database backend."
  323. ):
  324. with transaction.atomic():
  325. Person.objects.select_for_update(skip_locked=True).get()
  326. @skipIfDBFeature("has_select_for_update_of")
  327. @skipUnlessDBFeature("has_select_for_update")
  328. def test_unsupported_of_raises_error(self):
  329. """
  330. NotSupportedError is raised if a SELECT...FOR UPDATE OF... is run on
  331. a database backend that supports FOR UPDATE but not OF.
  332. """
  333. msg = "FOR UPDATE OF is not supported on this database backend."
  334. with self.assertRaisesMessage(NotSupportedError, msg):
  335. with transaction.atomic():
  336. Person.objects.select_for_update(of=("self",)).get()
  337. @skipIfDBFeature("has_select_for_no_key_update")
  338. @skipUnlessDBFeature("has_select_for_update")
  339. def test_unsuported_no_key_raises_error(self):
  340. """
  341. NotSupportedError is raised if a SELECT...FOR NO KEY UPDATE... is run
  342. on a database backend that supports FOR UPDATE but not NO KEY.
  343. """
  344. msg = "FOR NO KEY UPDATE is not supported on this database backend."
  345. with self.assertRaisesMessage(NotSupportedError, msg):
  346. with transaction.atomic():
  347. Person.objects.select_for_update(no_key=True).get()
  348. @skipUnlessDBFeature("has_select_for_update", "has_select_for_update_of")
  349. def test_unrelated_of_argument_raises_error(self):
  350. """
  351. FieldError is raised if a non-relation field is specified in of=(...).
  352. """
  353. msg = (
  354. "Invalid field name(s) given in select_for_update(of=(...)): %s. "
  355. "Only relational fields followed in the query are allowed. "
  356. "Choices are: self, born, born__country, "
  357. "born__country__entity_ptr."
  358. )
  359. invalid_of = [
  360. ("nonexistent",),
  361. ("name",),
  362. ("born__nonexistent",),
  363. ("born__name",),
  364. ("born__nonexistent", "born__name"),
  365. ]
  366. for of in invalid_of:
  367. with self.subTest(of=of):
  368. with self.assertRaisesMessage(FieldError, msg % ", ".join(of)):
  369. with transaction.atomic():
  370. Person.objects.select_related(
  371. "born__country"
  372. ).select_for_update(of=of).get()
  373. @skipUnlessDBFeature("has_select_for_update", "has_select_for_update_of")
  374. def test_related_but_unselected_of_argument_raises_error(self):
  375. """
  376. FieldError is raised if a relation field that is not followed in the
  377. query is specified in of=(...).
  378. """
  379. msg = (
  380. "Invalid field name(s) given in select_for_update(of=(...)): %s. "
  381. "Only relational fields followed in the query are allowed. "
  382. "Choices are: self, born, profile."
  383. )
  384. for name in ["born__country", "died", "died__country"]:
  385. with self.subTest(name=name):
  386. with self.assertRaisesMessage(FieldError, msg % name):
  387. with transaction.atomic():
  388. Person.objects.select_related("born", "profile",).exclude(
  389. profile=None
  390. ).select_for_update(of=(name,)).get()
  391. @skipUnlessDBFeature("has_select_for_update", "has_select_for_update_of")
  392. def test_model_inheritance_of_argument_raises_error_ptr_in_choices(self):
  393. msg = (
  394. "Invalid field name(s) given in select_for_update(of=(...)): "
  395. "name. Only relational fields followed in the query are allowed. "
  396. "Choices are: self, %s."
  397. )
  398. with self.assertRaisesMessage(
  399. FieldError,
  400. msg % "country, country__country_ptr, country__country_ptr__entity_ptr",
  401. ):
  402. with transaction.atomic():
  403. EUCity.objects.select_related(
  404. "country",
  405. ).select_for_update(of=("name",)).get()
  406. with self.assertRaisesMessage(
  407. FieldError, msg % "country_ptr, country_ptr__entity_ptr"
  408. ):
  409. with transaction.atomic():
  410. EUCountry.objects.select_for_update(of=("name",)).get()
  411. @skipUnlessDBFeature("has_select_for_update", "has_select_for_update_of")
  412. def test_model_proxy_of_argument_raises_error_proxy_field_in_choices(self):
  413. msg = (
  414. "Invalid field name(s) given in select_for_update(of=(...)): "
  415. "name. Only relational fields followed in the query are allowed. "
  416. "Choices are: self, country, country__entity_ptr."
  417. )
  418. with self.assertRaisesMessage(FieldError, msg):
  419. with transaction.atomic():
  420. CityCountryProxy.objects.select_related(
  421. "country",
  422. ).select_for_update(of=("name",)).get()
  423. @skipUnlessDBFeature("has_select_for_update", "has_select_for_update_of")
  424. def test_reverse_one_to_one_of_arguments(self):
  425. """
  426. Reverse OneToOneFields may be included in of=(...) as long as NULLs
  427. are excluded because LEFT JOIN isn't allowed in SELECT FOR UPDATE.
  428. """
  429. with transaction.atomic():
  430. person = (
  431. Person.objects.select_related(
  432. "profile",
  433. )
  434. .exclude(profile=None)
  435. .select_for_update(of=("profile",))
  436. .get()
  437. )
  438. self.assertEqual(person.profile, self.person_profile)
  439. @skipUnlessDBFeature("has_select_for_update")
  440. def test_for_update_after_from(self):
  441. features_class = connections["default"].features.__class__
  442. attribute_to_patch = "%s.%s.for_update_after_from" % (
  443. features_class.__module__,
  444. features_class.__name__,
  445. )
  446. with mock.patch(attribute_to_patch, return_value=True):
  447. with transaction.atomic():
  448. self.assertIn(
  449. "FOR UPDATE WHERE",
  450. str(Person.objects.filter(name="foo").select_for_update().query),
  451. )
  452. @skipUnlessDBFeature("has_select_for_update")
  453. def test_for_update_requires_transaction(self):
  454. """
  455. A TransactionManagementError is raised
  456. when a select_for_update query is executed outside of a transaction.
  457. """
  458. msg = "select_for_update cannot be used outside of a transaction."
  459. with self.assertRaisesMessage(transaction.TransactionManagementError, msg):
  460. list(Person.objects.select_for_update())
  461. @skipUnlessDBFeature("has_select_for_update")
  462. def test_for_update_requires_transaction_only_in_execution(self):
  463. """
  464. No TransactionManagementError is raised
  465. when select_for_update is invoked outside of a transaction -
  466. only when the query is executed.
  467. """
  468. people = Person.objects.select_for_update()
  469. msg = "select_for_update cannot be used outside of a transaction."
  470. with self.assertRaisesMessage(transaction.TransactionManagementError, msg):
  471. list(people)
  472. @skipUnlessDBFeature("supports_select_for_update_with_limit")
  473. def test_select_for_update_with_limit(self):
  474. other = Person.objects.create(name="Grappeli", born=self.city1, died=self.city2)
  475. with transaction.atomic():
  476. qs = list(Person.objects.order_by("pk").select_for_update()[1:2])
  477. self.assertEqual(qs[0], other)
  478. @skipIfDBFeature("supports_select_for_update_with_limit")
  479. def test_unsupported_select_for_update_with_limit(self):
  480. msg = (
  481. "LIMIT/OFFSET is not supported with select_for_update on this database "
  482. "backend."
  483. )
  484. with self.assertRaisesMessage(NotSupportedError, msg):
  485. with transaction.atomic():
  486. list(Person.objects.order_by("pk").select_for_update()[1:2])
  487. def run_select_for_update(self, status, **kwargs):
  488. """
  489. Utility method that runs a SELECT FOR UPDATE against all
  490. Person instances. After the select_for_update, it attempts
  491. to update the name of the only record, save, and commit.
  492. This function expects to run in a separate thread.
  493. """
  494. status.append("started")
  495. try:
  496. # We need to enter transaction management again, as this is done on
  497. # per-thread basis
  498. with transaction.atomic():
  499. person = Person.objects.select_for_update(**kwargs).get()
  500. person.name = "Fred"
  501. person.save()
  502. except (DatabaseError, Person.DoesNotExist) as e:
  503. status.append(e)
  504. finally:
  505. # This method is run in a separate thread. It uses its own
  506. # database connection. Close it without waiting for the GC.
  507. connection.close()
  508. @skipUnlessDBFeature("has_select_for_update")
  509. @skipUnlessDBFeature("supports_transactions")
  510. def test_block(self):
  511. """
  512. A thread running a select_for_update that accesses rows being touched
  513. by a similar operation on another connection blocks correctly.
  514. """
  515. # First, let's start the transaction in our thread.
  516. self.start_blocking_transaction()
  517. # Now, try it again using the ORM's select_for_update
  518. # facility. Do this in a separate thread.
  519. status = []
  520. thread = threading.Thread(target=self.run_select_for_update, args=(status,))
  521. # The thread should immediately block, but we'll sleep
  522. # for a bit to make sure.
  523. thread.start()
  524. sanity_count = 0
  525. while len(status) != 1 and sanity_count < 10:
  526. sanity_count += 1
  527. time.sleep(1)
  528. if sanity_count >= 10:
  529. raise ValueError("Thread did not run and block")
  530. # Check the person hasn't been updated. Since this isn't
  531. # using FOR UPDATE, it won't block.
  532. p = Person.objects.get(pk=self.person.pk)
  533. self.assertEqual("Reinhardt", p.name)
  534. # When we end our blocking transaction, our thread should
  535. # be able to continue.
  536. self.end_blocking_transaction()
  537. thread.join(5.0)
  538. # Check the thread has finished. Assuming it has, we should
  539. # find that it has updated the person's name.
  540. self.assertFalse(thread.is_alive())
  541. # We must commit the transaction to ensure that MySQL gets a fresh read,
  542. # since by default it runs in REPEATABLE READ mode
  543. transaction.commit()
  544. p = Person.objects.get(pk=self.person.pk)
  545. self.assertEqual("Fred", p.name)
  546. @skipUnlessDBFeature("has_select_for_update")
  547. def test_raw_lock_not_available(self):
  548. """
  549. Running a raw query which can't obtain a FOR UPDATE lock raises
  550. the correct exception
  551. """
  552. self.start_blocking_transaction()
  553. def raw(status):
  554. try:
  555. list(
  556. Person.objects.raw(
  557. "SELECT * FROM %s %s"
  558. % (
  559. Person._meta.db_table,
  560. connection.ops.for_update_sql(nowait=True),
  561. )
  562. )
  563. )
  564. except DatabaseError as e:
  565. status.append(e)
  566. finally:
  567. # This method is run in a separate thread. It uses its own
  568. # database connection. Close it without waiting for the GC.
  569. # Connection cannot be closed on Oracle because cursor is still
  570. # open.
  571. if connection.vendor != "oracle":
  572. connection.close()
  573. status = []
  574. thread = threading.Thread(target=raw, kwargs={"status": status})
  575. thread.start()
  576. time.sleep(1)
  577. thread.join()
  578. self.end_blocking_transaction()
  579. self.assertIsInstance(status[-1], DatabaseError)
  580. @skipUnlessDBFeature("has_select_for_update")
  581. @override_settings(DATABASE_ROUTERS=[TestRouter()])
  582. def test_select_for_update_on_multidb(self):
  583. query = Person.objects.select_for_update()
  584. self.assertEqual(router.db_for_write(Person), query.db)
  585. @skipUnlessDBFeature("has_select_for_update")
  586. def test_select_for_update_with_get(self):
  587. with transaction.atomic():
  588. person = Person.objects.select_for_update().get(name="Reinhardt")
  589. self.assertEqual(person.name, "Reinhardt")
  590. def test_nowait_and_skip_locked(self):
  591. with self.assertRaisesMessage(
  592. ValueError, "The nowait option cannot be used with skip_locked."
  593. ):
  594. Person.objects.select_for_update(nowait=True, skip_locked=True)
  595. def test_ordered_select_for_update(self):
  596. """
  597. Subqueries should respect ordering as an ORDER BY clause may be useful
  598. to specify a row locking order to prevent deadlocks (#27193).
  599. """
  600. with transaction.atomic():
  601. qs = Person.objects.filter(
  602. id__in=Person.objects.order_by("-id").select_for_update()
  603. )
  604. self.assertIn("ORDER BY", str(qs.query))