tests.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564
  1. import threading
  2. import time
  3. from unittest import mock
  4. from multiple_database.routers import TestRouter
  5. from django.core.exceptions import FieldError
  6. from django.db import (
  7. DatabaseError, NotSupportedError, connection, connections, router,
  8. transaction,
  9. )
  10. from django.test import (
  11. TransactionTestCase, override_settings, skipIfDBFeature,
  12. skipUnlessDBFeature,
  13. )
  14. from django.test.utils import CaptureQueriesContext
  15. from .models import City, Country, EUCity, EUCountry, Person, PersonProfile
  16. class SelectForUpdateTests(TransactionTestCase):
  17. available_apps = ['select_for_update']
  def setUp(self):
      """
      Populate the database and prepare a second connection for lock tests.
      """
      # The rows are created in autocommit mode, so the code in
      # run_select_for_update can see them.
      self.country1 = belgium = Country.objects.create(name='Belgium')
      self.country2 = france = Country.objects.create(name='France')
      self.city1 = birthplace = City.objects.create(name='Liberchies', country=belgium)
      self.city2 = deathplace = City.objects.create(name='Samois-sur-Seine', country=france)
      self.person = reinhardt = Person.objects.create(
          name='Reinhardt', born=birthplace, died=deathplace,
      )
      self.person_profile = PersonProfile.objects.create(person=reinhardt)

      # A copy of the default connection acts as the competing session:
      # a connection issuing SELECT ... FOR UPDATE is expected to block
      # the other one.
      self.new_connection = connection.copy()
  def tearDown(self):
      """
      Release the blocking transaction (if any) and close the extra
      connection.
      """
      try:
          self.end_blocking_transaction()
      except (DatabaseError, AttributeError):
          # Best-effort cleanup. AttributeError is raised when
          # start_blocking_transaction() never ran, because self.cursor is
          # only set there.
          pass
      finally:
          # Close the copied connection even if an unexpected exception
          # escapes the cleanup above.
          self.new_connection.close()
  def start_blocking_transaction(self):
      """
      Begin a transaction on the secondary connection and run a
      SELECT ... FOR UPDATE over the Person table inside it.

      end_blocking_transaction() should be called at some point afterwards.
      """
      blocker = self.new_connection
      blocker.set_autocommit(False)
      # The cursor is kept on the instance so end_blocking_transaction()
      # can close it later.
      self.cursor = blocker.cursor()
      statement = 'SELECT * FROM %(db_table)s %(for_update)s;' % {
          'db_table': Person._meta.db_table,
          'for_update': blocker.ops.for_update_sql(),
      }
      self.cursor.execute(statement, ())
      self.cursor.fetchone()
  def end_blocking_transaction(self):
      """
      Close the blocking cursor, roll back its transaction, and switch the
      secondary connection back to autocommit.
      """
      self.cursor.close()
      blocker = self.new_connection
      blocker.rollback()
      blocker.set_autocommit(True)
  def has_for_update_sql(self, queries, **kwargs):
      """
      Return True if any query in `queries` contains the FOR UPDATE clause
      that the default connection generates for the given keyword arguments.

      Each item of `queries` is indexed with the 'sql' key.
      """
      clause = connection.ops.for_update_sql(**kwargs)
      for entry in queries:
          if clause in entry['sql']:
              return True
      return False
  @skipUnlessDBFeature('has_select_for_update')
  def test_for_update_sql_generated(self):
      """
      Calling select_for_update() puts the backend's FOR UPDATE clause into
      the executed SQL.
      """
      with transaction.atomic():
          with CaptureQueriesContext(connection) as captured:
              list(Person.objects.all().select_for_update())
      found = self.has_for_update_sql(captured.captured_queries)
      self.assertTrue(found)
  @skipUnlessDBFeature('has_select_for_update_nowait')
  def test_for_update_sql_generated_nowait(self):
      """
      Calling select_for_update(nowait=True) puts the backend's
      FOR UPDATE NOWAIT clause into the executed SQL.
      """
      with transaction.atomic():
          with CaptureQueriesContext(connection) as captured:
              list(Person.objects.all().select_for_update(nowait=True))
      found = self.has_for_update_sql(captured.captured_queries, nowait=True)
      self.assertTrue(found)
  @skipUnlessDBFeature('has_select_for_update_skip_locked')
  def test_for_update_sql_generated_skip_locked(self):
      """
      Calling select_for_update(skip_locked=True) puts the backend's
      FOR UPDATE SKIP LOCKED clause into the executed SQL.
      """
      with transaction.atomic():
          with CaptureQueriesContext(connection) as captured:
              list(Person.objects.all().select_for_update(skip_locked=True))
      found = self.has_for_update_sql(captured.captured_queries, skip_locked=True)
      self.assertTrue(found)
  @skipUnlessDBFeature('has_select_for_no_key_update')
  def test_update_sql_generated_no_key(self):
      """
      Calling select_for_update(no_key=True) puts the backend's
      FOR NO KEY UPDATE clause into the executed SQL.
      """
      with transaction.atomic():
          with CaptureQueriesContext(connection) as captured:
              list(Person.objects.all().select_for_update(no_key=True))
      found = self.has_for_update_sql(captured.captured_queries, no_key=True)
      self.assertIs(found, True)
  @skipUnlessDBFeature('has_select_for_update_of')
  def test_for_update_sql_generated_of(self):
      """
      Calling select_for_update(of=...) puts the backend's FOR UPDATE OF
      clause into the executed SQL.
      """
      # select_for_update() is chained twice; the expected clause below
      # matches the `of` value given to the second call.
      locked_people = Person.objects.select_related(
          'born__country',
      ).select_for_update(
          of=('born__country',),
      ).select_for_update(
          of=('self', 'born__country')
      )
      with transaction.atomic():
          with CaptureQueriesContext(connection) as captured:
              list(locked_people)

      # Some backends name columns rather than tables in the OF clause.
      if connections['default'].features.select_for_update_of_column:
          targets = (
              'select_for_update_person"."id',
              'select_for_update_country"."entity_ptr_id',
          )
      else:
          targets = ('select_for_update_person', 'select_for_update_country')
      quoted = [connection.ops.quote_name(target) for target in targets]
      self.assertTrue(self.has_for_update_sql(captured.captured_queries, of=quoted))
  @skipUnlessDBFeature('has_select_for_update_of')
  def test_for_update_sql_model_inheritance_generated_of(self):
      """
      of=('self',) on the inherited EUCountry model names the EUCountry
      table (or its country_ptr_id column) in the FOR UPDATE OF clause.
      """
      with transaction.atomic():
          with CaptureQueriesContext(connection) as captured:
              list(EUCountry.objects.select_for_update(of=('self',)))

      if connection.features.select_for_update_of_column:
          targets = ('select_for_update_eucountry"."country_ptr_id',)
      else:
          targets = ('select_for_update_eucountry',)
      quoted = [connection.ops.quote_name(target) for target in targets]
      self.assertTrue(self.has_for_update_sql(captured.captured_queries, of=quoted))
  @skipUnlessDBFeature('has_select_for_update_of')
  def test_for_update_sql_model_inheritance_ptr_generated_of(self):
      """
      of=('self', 'country_ptr') on EUCountry names both the EUCountry and
      the Country tables (or columns) in the FOR UPDATE OF clause.
      """
      with transaction.atomic():
          with CaptureQueriesContext(connection) as captured:
              list(EUCountry.objects.select_for_update(of=('self', 'country_ptr',)))

      if connection.features.select_for_update_of_column:
          targets = (
              'select_for_update_eucountry"."country_ptr_id',
              'select_for_update_country"."entity_ptr_id',
          )
      else:
          targets = ('select_for_update_eucountry', 'select_for_update_country')
      quoted = [connection.ops.quote_name(target) for target in targets]
      self.assertTrue(self.has_for_update_sql(captured.captured_queries, of=quoted))
  @skipUnlessDBFeature('has_select_for_update_of')
  def test_for_update_sql_related_model_inheritance_generated_of(self):
      """
      of=('self', 'country') on EUCity with select_related('country') names
      the EUCity and EUCountry tables (or columns) in the FOR UPDATE OF
      clause.
      """
      locked_cities = EUCity.objects.select_related('country').select_for_update(
          of=('self', 'country'),
      )
      with transaction.atomic():
          with CaptureQueriesContext(connection) as captured:
              list(locked_cities)

      if connection.features.select_for_update_of_column:
          targets = (
              'select_for_update_eucity"."id',
              'select_for_update_eucountry"."country_ptr_id',
          )
      else:
          targets = ('select_for_update_eucity', 'select_for_update_eucountry')
      quoted = [connection.ops.quote_name(target) for target in targets]
      self.assertTrue(self.has_for_update_sql(captured.captured_queries, of=quoted))
  @skipUnlessDBFeature('has_select_for_update_of')
  def test_for_update_sql_model_inheritance_nested_ptr_generated_of(self):
      """
      of=('self', 'country__country_ptr') on EUCity names the EUCity and
      Country tables (or columns) in the FOR UPDATE OF clause.
      """
      locked_cities = EUCity.objects.select_related('country').select_for_update(
          of=('self', 'country__country_ptr',),
      )
      with transaction.atomic():
          with CaptureQueriesContext(connection) as captured:
              list(locked_cities)

      if connection.features.select_for_update_of_column:
          targets = (
              'select_for_update_eucity"."id',
              'select_for_update_country"."entity_ptr_id',
          )
      else:
          targets = ('select_for_update_eucity', 'select_for_update_country')
      quoted = [connection.ops.quote_name(target) for target in targets]
      self.assertTrue(self.has_for_update_sql(captured.captured_queries, of=quoted))
  @skipUnlessDBFeature('has_select_for_update_of')
  def test_for_update_sql_multilevel_model_inheritance_ptr_generated_of(self):
      """
      of=('country_ptr', 'country_ptr__entity_ptr') on EUCountry names the
      Country and Entity tables (or columns) in the FOR UPDATE OF clause.
      """
      locked_countries = EUCountry.objects.select_for_update(
          of=('country_ptr', 'country_ptr__entity_ptr'),
      )
      with transaction.atomic():
          with CaptureQueriesContext(connection) as captured:
              list(locked_countries)

      if connection.features.select_for_update_of_column:
          targets = (
              'select_for_update_country"."entity_ptr_id',
              'select_for_update_entity"."id',
          )
      else:
          targets = ('select_for_update_country', 'select_for_update_entity')
      quoted = [connection.ops.quote_name(target) for target in targets]
      self.assertTrue(self.has_for_update_sql(captured.captured_queries, of=quoted))
  @skipUnlessDBFeature('has_select_for_update_of')
  def test_for_update_of_followed_by_values(self):
      """
      select_for_update(of=('self',)) followed by values('pk') returns the
      single person's pk as a dict.
      """
      locked_people = Person.objects.select_for_update(of=('self',))
      with transaction.atomic():
          rows = list(locked_people.values('pk'))
      self.assertEqual(rows, [{'pk': self.person.pk}])
  @skipUnlessDBFeature('has_select_for_update_of')
  def test_for_update_of_followed_by_values_list(self):
      """
      select_for_update(of=('self',)) followed by values_list('pk') returns
      the single person's pk as a 1-tuple.
      """
      locked_people = Person.objects.select_for_update(of=('self',))
      with transaction.atomic():
          rows = list(locked_people.values_list('pk'))
      self.assertEqual(rows, [(self.person.pk,)])
  @skipUnlessDBFeature('has_select_for_update_of')
  def test_for_update_of_self_when_self_is_not_selected(self):
      """
      select_for_update(of=['self']) works when every selected column comes
      from a related table.
      """
      locked_people = Person.objects.select_related('born').select_for_update(of=('self',))
      with transaction.atomic():
          rows = list(locked_people.values('born__name'))
      self.assertEqual(rows, [{'born__name': self.city1.name}])
  @skipUnlessDBFeature('has_select_for_update_nowait')
  def test_nowait_raises_error_on_block(self):
      """
      With nowait=True, a select_for_update on locked rows reports a
      DatabaseError instead of blocking.
      """
      self.start_blocking_transaction()
      outcomes = []
      worker = threading.Thread(
          target=self.run_select_for_update,
          args=(outcomes,),
          kwargs={'nowait': True},
      )
      worker.start()
      time.sleep(1)
      worker.join()
      self.end_blocking_transaction()
      # The worker appends the exception it caught after its 'started'
      # marker, so the last entry is the one to inspect.
      last_outcome = outcomes[-1]
      self.assertIsInstance(last_outcome, DatabaseError)
  @skipUnlessDBFeature('has_select_for_update_skip_locked')
  def test_skip_locked_skips_locked_rows(self):
      """
      With skip_locked=True, the locked row is skipped, so the worker's
      get() ends in Person.DoesNotExist.
      """
      self.start_blocking_transaction()
      outcomes = []
      worker = threading.Thread(
          target=self.run_select_for_update,
          args=(outcomes,),
          kwargs={'skip_locked': True},
      )
      worker.start()
      time.sleep(1)
      worker.join()
      self.end_blocking_transaction()
      # The worker appends the exception it caught after its 'started'
      # marker, so the last entry is the one to inspect.
      last_outcome = outcomes[-1]
      self.assertIsInstance(last_outcome, Person.DoesNotExist)
  @skipIfDBFeature('has_select_for_update_nowait')
  @skipUnlessDBFeature('has_select_for_update')
  def test_unsupported_nowait_raises_error(self):
      """
      A backend that supports FOR UPDATE but not NOWAIT raises
      NotSupportedError for SELECT ... FOR UPDATE NOWAIT.
      """
      expected_message = 'NOWAIT is not supported on this database backend.'
      with self.assertRaisesMessage(NotSupportedError, expected_message), transaction.atomic():
          Person.objects.select_for_update(nowait=True).get()
  @skipIfDBFeature('has_select_for_update_skip_locked')
  @skipUnlessDBFeature('has_select_for_update')
  def test_unsupported_skip_locked_raises_error(self):
      """
      A backend that supports FOR UPDATE but not SKIP LOCKED raises
      NotSupportedError for SELECT ... FOR UPDATE SKIP LOCKED.
      """
      expected_message = 'SKIP LOCKED is not supported on this database backend.'
      with self.assertRaisesMessage(NotSupportedError, expected_message), transaction.atomic():
          Person.objects.select_for_update(skip_locked=True).get()
  @skipIfDBFeature('has_select_for_update_of')
  @skipUnlessDBFeature('has_select_for_update')
  def test_unsupported_of_raises_error(self):
      """
      A backend that supports FOR UPDATE but not OF raises
      NotSupportedError for SELECT ... FOR UPDATE OF ....
      """
      expected_message = 'FOR UPDATE OF is not supported on this database backend.'
      with self.assertRaisesMessage(NotSupportedError, expected_message), transaction.atomic():
          Person.objects.select_for_update(of=('self',)).get()
  @skipIfDBFeature('has_select_for_no_key_update')
  @skipUnlessDBFeature('has_select_for_update')
  def test_unsuported_no_key_raises_error(self):
      """
      A backend that supports FOR UPDATE but not NO KEY raises
      NotSupportedError for SELECT ... FOR NO KEY UPDATE ....
      """
      expected_message = 'FOR NO KEY UPDATE is not supported on this database backend.'
      with self.assertRaisesMessage(NotSupportedError, expected_message), transaction.atomic():
          Person.objects.select_for_update(no_key=True).get()
  @skipUnlessDBFeature('has_select_for_update', 'has_select_for_update_of')
  def test_unrelated_of_argument_raises_error(self):
      """
      Naming a non-relation or nonexistent field in of=(...) raises
      FieldError listing the offending names and the valid choices.
      """
      template = (
          'Invalid field name(s) given in select_for_update(of=(...)): %s. '
          'Only relational fields followed in the query are allowed. '
          'Choices are: self, born, born__country, '
          'born__country__entity_ptr.'
      )
      bad_of_values = (
          ('nonexistent',),
          ('name',),
          ('born__nonexistent',),
          ('born__name',),
          ('born__nonexistent', 'born__name'),
      )
      for of in bad_of_values:
          # The error message lists the rejected names comma-separated.
          expected_message = template % ', '.join(of)
          with self.subTest(of=of):
              with self.assertRaisesMessage(FieldError, expected_message), transaction.atomic():
                  Person.objects.select_related('born__country').select_for_update(of=of).get()
  @skipUnlessDBFeature('has_select_for_update', 'has_select_for_update_of')
  def test_related_but_unselected_of_argument_raises_error(self):
      """
      Naming a relation in of=(...) that the query does not follow raises
      FieldError.
      """
      template = (
          'Invalid field name(s) given in select_for_update(of=(...)): %s. '
          'Only relational fields followed in the query are allowed. '
          'Choices are: self, born, profile.'
      )
      # Only 'born' and 'profile' are followed by this queryset.
      followed = Person.objects.select_related(
          'born', 'profile',
      ).exclude(profile=None)
      for name in ('born__country', 'died', 'died__country'):
          with self.subTest(name=name):
              with self.assertRaisesMessage(FieldError, template % name), transaction.atomic():
                  followed.select_for_update(of=(name,)).get()
  @skipUnlessDBFeature('has_select_for_update', 'has_select_for_update_of')
  def test_model_inheritance_of_argument_raises_error_ptr_in_choices(self):
      """
      For inherited models, the FieldError raised for an invalid of=(...)
      name lists the parent-link (*_ptr) paths among the valid choices.
      """
      template = (
          'Invalid field name(s) given in select_for_update(of=(...)): '
          'name. Only relational fields followed in the query are allowed. '
          'Choices are: self, %s.'
      )

      city_choices = 'country, country__country_ptr, country__country_ptr__entity_ptr'
      with self.assertRaisesMessage(FieldError, template % city_choices), transaction.atomic():
          EUCity.objects.select_related(
              'country',
          ).select_for_update(of=('name',)).get()

      country_choices = 'country_ptr, country_ptr__entity_ptr'
      with self.assertRaisesMessage(FieldError, template % country_choices), transaction.atomic():
          EUCountry.objects.select_for_update(of=('name',)).get()
  @skipUnlessDBFeature('has_select_for_update', 'has_select_for_update_of')
  def test_reverse_one_to_one_of_arguments(self):
      """
      A reverse OneToOneField can be named in of=(...) provided NULLs are
      excluded, because LEFT JOIN isn't allowed in SELECT FOR UPDATE.
      """
      with_profile = Person.objects.select_related(
          'profile',
      ).exclude(profile=None)
      with transaction.atomic():
          locked = with_profile.select_for_update(of=('profile',)).get()
          self.assertEqual(locked.profile, self.person_profile)
  @skipUnlessDBFeature('has_select_for_update')
  def test_for_update_after_from(self):
      """
      With the for_update_after_from feature flag enabled, the compiled
      query places FOR UPDATE directly before WHERE.
      """
      features_class = connections['default'].features.__class__
      # Patch the flag with a real boolean. Passing return_value=True to
      # mock.patch() would install a MagicMock that only works because mocks
      # are truthy; the flag is read as an attribute and never called.
      with mock.patch.object(features_class, 'for_update_after_from', True):
          with transaction.atomic():
              sql = str(Person.objects.filter(name='foo').select_for_update().query)
              self.assertIn('FOR UPDATE WHERE', sql)
  @skipUnlessDBFeature('has_select_for_update')
  def test_for_update_requires_transaction(self):
      """
      Executing a select_for_update query outside of a transaction raises
      TransactionManagementError.
      """
      expected_error = transaction.TransactionManagementError
      expected_message = 'select_for_update cannot be used outside of a transaction.'
      with self.assertRaisesMessage(expected_error, expected_message):
          list(Person.objects.all().select_for_update())
  @skipUnlessDBFeature('has_select_for_update')
  def test_for_update_requires_transaction_only_in_execution(self):
      """
      Building a select_for_update queryset outside of a transaction is
      fine; TransactionManagementError is raised only when it is executed.
      """
      # Built outside the assertion block: construction itself must not raise.
      unevaluated = Person.objects.all().select_for_update()
      expected_error = transaction.TransactionManagementError
      expected_message = 'select_for_update cannot be used outside of a transaction.'
      with self.assertRaisesMessage(expected_error, expected_message):
          list(unevaluated)
  @skipUnlessDBFeature('supports_select_for_update_with_limit')
  def test_select_for_update_with_limit(self):
      """
      Slicing a select_for_update queryset returns the expected row.
      """
      # A second person, so the [1:2] slice ordered by pk has a row to return.
      grappeli = Person.objects.create(name='Grappeli', born=self.city1, died=self.city2)
      with transaction.atomic():
          sliced = Person.objects.all().order_by('pk').select_for_update()[1:2]
          rows = list(sliced)
          self.assertEqual(rows[0], grappeli)
  @skipIfDBFeature('supports_select_for_update_with_limit')
  def test_unsupported_select_for_update_with_limit(self):
      """
      On backends without LIMIT/OFFSET support for select_for_update,
      evaluating a sliced select_for_update queryset raises
      NotSupportedError.
      """
      expected_message = 'LIMIT/OFFSET is not supported with select_for_update on this database backend.'
      with self.assertRaisesMessage(NotSupportedError, expected_message), transaction.atomic():
          list(Person.objects.all().order_by('pk').select_for_update()[1:2])
  def run_select_for_update(self, status, **kwargs):
      """
      Thread target: fetch the Person row with select_for_update(**kwargs),
      rename it to 'Fred' and save it inside an atomic block.

      'started' is appended to `status` first; if a DatabaseError or
      Person.DoesNotExist is raised, the exception is appended as well.
      """
      status.append('started')
      try:
          # Transaction management is per-thread, so it has to be entered
          # again here.
          with transaction.atomic():
              locked_person = Person.objects.select_for_update(**kwargs).get()
              locked_person.name = 'Fred'
              locked_person.save()
      except (DatabaseError, Person.DoesNotExist) as exc:
          status.append(exc)
      finally:
          # This runs in a separate thread with its own database
          # connection; close it now rather than waiting for the GC.
          connection.close()
  @skipUnlessDBFeature('has_select_for_update')
  @skipUnlessDBFeature('supports_transactions')
  def test_block(self):
      """
      A thread running a select_for_update that accesses rows being touched
      by a similar operation on another connection blocks correctly.
      """
      # First, start the blocking transaction on the secondary connection.
      self.start_blocking_transaction()

      # Now try the same through the ORM's select_for_update facility, in a
      # separate thread.
      status = []
      thread = threading.Thread(
          target=self.run_select_for_update, args=(status,)
      )

      # The thread should block right away. Poll for up to 10 seconds until
      # it has recorded exactly its 'started' marker.
      thread.start()
      sanity_count = 0
      while len(status) != 1 and sanity_count < 10:
          sanity_count += 1
          time.sleep(1)
      # Check the observed state, not the retry counter, so a thread that
      # reports in during the final sleep is not flagged as a failure.
      if len(status) != 1:
          raise ValueError('Thread did not run and block')

      # Check the person hasn't been updated. Since this isn't using
      # FOR UPDATE, it won't block.
      p = Person.objects.get(pk=self.person.pk)
      self.assertEqual('Reinhardt', p.name)

      # Once the blocking transaction ends, the thread should be able to
      # continue.
      self.end_blocking_transaction()
      thread.join(5.0)

      # Check the thread has finished. If it has, it should have updated
      # the person's name.
      self.assertFalse(thread.is_alive())

      # We must commit the transaction to ensure that MySQL gets a fresh
      # read, since by default it runs in REPEATABLE READ mode.
      transaction.commit()

      p = Person.objects.get(pk=self.person.pk)
      self.assertEqual('Fred', p.name)
  @skipUnlessDBFeature('has_select_for_update')
  def test_raw_lock_not_available(self):
      """
      A raw query that cannot obtain its FOR UPDATE lock raises
      DatabaseError.
      """
      self.start_blocking_transaction()

      def attempt_raw_lock(outcomes):
          # Thread target: run a raw locking query with the nowait variant
          # of the FOR UPDATE clause and record any DatabaseError.
          try:
              raw_sql = 'SELECT * FROM %s %s' % (
                  Person._meta.db_table,
                  connection.ops.for_update_sql(nowait=True),
              )
              list(Person.objects.raw(raw_sql))
          except DatabaseError as exc:
              outcomes.append(exc)
          finally:
              # This runs in a separate thread with its own database
              # connection; close it now rather than waiting for the GC.
              # On Oracle the connection cannot be closed because the
              # cursor is still open.
              if connection.vendor != 'oracle':
                  connection.close()

      outcomes = []
      worker = threading.Thread(target=attempt_raw_lock, args=(outcomes,))
      worker.start()
      time.sleep(1)
      worker.join()
      self.end_blocking_transaction()
      last_outcome = outcomes[-1]
      self.assertIsInstance(last_outcome, DatabaseError)
  @skipUnlessDBFeature('has_select_for_update')
  @override_settings(DATABASE_ROUTERS=[TestRouter()])
  def test_select_for_update_on_multidb(self):
      """
      A select_for_update queryset uses the database the router picks for
      writes to Person.
      """
      locked_people = Person.objects.select_for_update()
      write_alias = router.db_for_write(Person)
      self.assertEqual(write_alias, locked_people.db)
  @skipUnlessDBFeature('has_select_for_update')
  def test_select_for_update_with_get(self):
      """
      get() on a select_for_update queryset returns the matching person.
      """
      locked_people = Person.objects.select_for_update()
      with transaction.atomic():
          found = locked_people.get(name='Reinhardt')
      self.assertEqual(found.name, 'Reinhardt')
  def test_nowait_and_skip_locked(self):
      """
      Passing both nowait=True and skip_locked=True raises ValueError.
      """
      expected_message = 'The nowait option cannot be used with skip_locked.'
      with self.assertRaisesMessage(ValueError, expected_message):
          Person.objects.select_for_update(nowait=True, skip_locked=True)
  def test_ordered_select_for_update(self):
      """
      A select_for_update subquery keeps its ORDER BY clause, since ordering
      may be needed to fix a row locking order and prevent deadlocks
      (#27193).
      """
      with transaction.atomic():
          ordered_locked = Person.objects.order_by('-id').select_for_update()
          outer = Person.objects.filter(id__in=ordered_locked)
          compiled_sql = str(outer.query)
          self.assertIn('ORDER BY', compiled_sql)