123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256 |
- from __future__ import absolute_import
- import sys
- import time
- import unittest
- from django.conf import settings
- from django.db import transaction, connection
- from django.db.utils import ConnectionHandler, DEFAULT_DB_ALIAS, DatabaseError
- from django.test import (TransactionTestCase, skipIfDBFeature,
- skipUnlessDBFeature)
- from .models import Person
# A few of the tests below spawn a worker thread. The threading module may
# be missing from the interpreter, so import it defensively and build a
# decorator that skips those tests when it is unavailable.
try:
    import threading
except ImportError:
    threading = None

requires_threading = unittest.skipUnless(
    threading is not None, 'requires threading'
)
class SelectForUpdateTests(TransactionTestCase):
    """
    Tests for ``QuerySet.select_for_update()``.

    Every test works with two database connections: the default
    ``connection`` used by the ORM calls under test, and
    ``self.new_connection``, a second connection built from the same
    ``DEFAULT_DB_ALIAS`` settings, which the helper methods use to hold a
    ``SELECT ... FOR UPDATE`` lock.
    """

    available_apps = ['select_for_update']

    def setUp(self):
        """
        Create and commit one Person, open the second connection and turn
        on ``settings.DEBUG`` so executed SQL is recorded.
        """
        transaction.enter_transaction_management()
        self.person = Person.objects.create(name='Reinhardt')

        # We have to commit here so that code in run_select_for_update can
        # see this data.
        transaction.commit()

        # We need another database connection to test that one connection
        # issuing a SELECT ... FOR UPDATE will block.
        new_connections = ConnectionHandler(settings.DATABASES)
        self.new_connection = new_connections[DEFAULT_DB_ALIAS]
        self.new_connection.enter_transaction_management()

        # We need to set settings.DEBUG to True so we can capture
        # the output SQL to examine.
        self._old_debug = settings.DEBUG
        settings.DEBUG = True

    def tearDown(self):
        """
        Roll back the blocking transaction, abort transaction management on
        both connections, close the second connection and restore
        ``settings.DEBUG``.
        """
        try:
            # Roll back the blocking transaction while its connection is
            # still open (previously this ran only after close()). Errors
            # from a transaction a test has already ended are ignored.
            try:
                self.end_blocking_transaction()
            except (DatabaseError, AttributeError):
                pass

            # We don't really care if these fail - some of the tests will
            # have changed the transaction state in the course of their run.
            # Each abort has its own guard so that a failure on the default
            # connection does not skip the abort on the second connection.
            try:
                transaction.abort()
            except transaction.TransactionManagementError:
                pass
            try:
                self.new_connection.abort()
            except transaction.TransactionManagementError:
                pass
        finally:
            # Always close the extra connection and put DEBUG back, even if
            # one of the steps above raised something unexpected.
            try:
                self.new_connection.close()
            finally:
                settings.DEBUG = self._old_debug

    def start_blocking_transaction(self):
        """
        Run ``SELECT * FROM <person table> FOR UPDATE`` on the second
        connection and fetch one row.

        At some point, end_blocking_transaction() should be called.
        """
        self.cursor = self.new_connection.cursor()
        sql = 'SELECT * FROM %(db_table)s %(for_update)s;' % {
            'db_table': Person._meta.db_table,
            'for_update': self.new_connection.ops.for_update_sql(),
        }
        self.cursor.execute(sql, ())
        self.cursor.fetchone()

    def end_blocking_transaction(self):
        """Roll back the blocking transaction on the second connection."""
        self.new_connection.rollback()

    def has_for_update_sql(self, tested_connection, nowait=False):
        """
        Return True if the most recent query recorded on
        ``tested_connection`` contains that backend's FOR UPDATE clause
        (the NOWAIT variant when ``nowait`` is true).
        """
        for_update_sql = tested_connection.ops.for_update_sql(nowait)
        sql = tested_connection.queries[-1]['sql']
        return for_update_sql in sql

    @skipUnlessDBFeature('has_select_for_update')
    def test_for_update_sql_generated(self):
        """
        Test that the backend's FOR UPDATE variant appears in
        generated SQL when select_for_update is invoked.
        """
        list(Person.objects.all().select_for_update())
        self.assertTrue(self.has_for_update_sql(connection))

    @skipUnlessDBFeature('has_select_for_update_nowait')
    def test_for_update_sql_generated_nowait(self):
        """
        Test that the backend's FOR UPDATE NOWAIT variant appears in
        generated SQL when select_for_update is invoked.
        """
        list(Person.objects.all().select_for_update(nowait=True))
        self.assertTrue(self.has_for_update_sql(connection, nowait=True))

    @requires_threading
    @skipUnlessDBFeature('has_select_for_update_nowait')
    def test_nowait_raises_error_on_block(self):
        """
        If nowait is specified, we expect an error to be raised rather
        than blocking.
        """
        self.start_blocking_transaction()
        status = []
        thread = threading.Thread(
            target=self.run_select_for_update,
            args=(status,),
            kwargs={'nowait': True},
        )

        thread.start()
        time.sleep(1)
        thread.join()
        self.end_blocking_transaction()
        # The worker appends the DatabaseError it caught as its last entry.
        self.assertIsInstance(status[-1], DatabaseError)

    @skipIfDBFeature('has_select_for_update_nowait')
    @skipUnlessDBFeature('has_select_for_update')
    def test_unsupported_nowait_raises_error(self):
        """
        If a SELECT...FOR UPDATE NOWAIT is run on a database backend
        that supports FOR UPDATE but not NOWAIT, then we should find
        that a DatabaseError is raised.
        """
        self.assertRaises(
            DatabaseError,
            list,
            Person.objects.all().select_for_update(nowait=True)
        )

    def run_select_for_update(self, status, nowait=False):
        """
        Utility method that runs a SELECT FOR UPDATE against all
        Person instances. After the select_for_update, it attempts
        to update the name of the only record, save, and commit.

        This function expects to run in a separate thread.

        ``status`` is a list shared with the caller: 'started' is appended
        on entry, and any DatabaseError raised is appended afterwards.
        """
        status.append('started')
        try:
            # We need to enter transaction management again, as this is done
            # on per-thread basis
            transaction.enter_transaction_management()
            people = list(
                Person.objects.all().select_for_update(nowait=nowait)
            )
            people[0].name = 'Fred'
            people[0].save()
            transaction.commit()
        except DatabaseError as e:
            status.append(e)
        finally:
            # This method is run in a separate thread. It uses its own
            # database connection. Close it without waiting for the GC.
            transaction.abort()
            connection.close()

    @requires_threading
    @skipUnlessDBFeature('has_select_for_update')
    @skipUnlessDBFeature('supports_transactions')
    def test_block(self):
        """
        Check that a thread running a select_for_update that
        accesses rows being touched by a similar operation
        on another connection blocks correctly.
        """
        # First, let's start the transaction in our thread.
        self.start_blocking_transaction()

        # Now, try it again using the ORM's select_for_update
        # facility. Do this in a separate thread.
        status = []
        thread = threading.Thread(
            target=self.run_select_for_update, args=(status,)
        )

        # The thread should immediately block, but we'll sleep
        # for a bit to make sure.
        thread.start()
        sanity_count = 0
        while len(status) != 1 and sanity_count < 10:
            sanity_count += 1
            time.sleep(1)
        # Decide from the status list rather than the counter, so a thread
        # that reports in on the very last poll is not misreported.
        if len(status) != 1:
            raise ValueError('Thread did not run and block')

        # Check the person hasn't been updated. Since this isn't
        # using FOR UPDATE, it won't block.
        p = Person.objects.get(pk=self.person.pk)
        self.assertEqual('Reinhardt', p.name)

        # When we end our blocking transaction, our thread should
        # be able to continue.
        self.end_blocking_transaction()
        thread.join(5.0)

        # Check the thread has finished. Assuming it has, we should
        # find that it has updated the person's name.
        # (is_alive() is the supported spelling; isAlive() no longer exists
        # on current Python versions.)
        self.assertFalse(thread.is_alive())

        # We must commit the transaction to ensure that MySQL gets a fresh
        # read, since by default it runs in REPEATABLE READ mode
        transaction.commit()

        p = Person.objects.get(pk=self.person.pk)
        self.assertEqual('Fred', p.name)

    @requires_threading
    @skipUnlessDBFeature('has_select_for_update')
    def test_raw_lock_not_available(self):
        """
        Check that running a raw query which can't obtain a FOR UPDATE lock
        raises the correct exception
        """
        self.start_blocking_transaction()

        def raw(status):
            # Runs in the worker thread: issue the raw FOR UPDATE NOWAIT
            # query and record any DatabaseError in the shared list.
            try:
                list(
                    Person.objects.raw(
                        'SELECT * FROM %s %s' % (
                            Person._meta.db_table,
                            connection.ops.for_update_sql(nowait=True)
                        )
                    )
                )
            except DatabaseError as e:
                status.append(e)
            finally:
                # This method is run in a separate thread. It uses its own
                # database connection. Close it without waiting for the GC.
                connection.close()

        status = []
        thread = threading.Thread(target=raw, kwargs={'status': status})
        thread.start()
        time.sleep(1)
        thread.join()
        self.end_blocking_transaction()
        self.assertIsInstance(status[-1], DatabaseError)

    @skipUnlessDBFeature('has_select_for_update')
    def test_transaction_dirty_managed(self):
        """
        Check that a select_for_update sets the transaction to be
        dirty when executed under txn management. Setting the txn dirty
        means that it will be either committed or rolled back by Django,
        which will release any locks held by the SELECT FOR UPDATE.
        """
        # Only the side effect of evaluating the queryset matters here.
        list(Person.objects.select_for_update())
        self.assertTrue(transaction.is_dirty())
|