123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220 |
- from django.db import connection, transaction
- from django.test import TransactionTestCase, skipUnlessDBFeature
- from .models import Thing
- class ForcedError(Exception):
- pass
- class TestConnectionOnCommit(TransactionTestCase):
- """
- Tests for transaction.on_commit().
- Creation/checking of database objects in parallel with callback tracking is
- to verify that the behavior of the two match in all tested cases.
- """
- available_apps = ['transaction_hooks']
- def setUp(self):
- self.notified = []
- def notify(self, id_):
- if id_ == 'error':
- raise ForcedError()
- self.notified.append(id_)
- def do(self, num):
- """Create a Thing instance and notify about it."""
- Thing.objects.create(num=num)
- transaction.on_commit(lambda: self.notify(num))
- def assertDone(self, nums):
- self.assertNotified(nums)
- self.assertEqual(sorted(t.num for t in Thing.objects.all()), sorted(nums))
- def assertNotified(self, nums):
- self.assertEqual(self.notified, nums)
- def test_executes_immediately_if_no_transaction(self):
- self.do(1)
- self.assertDone([1])
- def test_delays_execution_until_after_transaction_commit(self):
- with transaction.atomic():
- self.do(1)
- self.assertNotified([])
- self.assertDone([1])
- def test_does_not_execute_if_transaction_rolled_back(self):
- try:
- with transaction.atomic():
- self.do(1)
- raise ForcedError()
- except ForcedError:
- pass
- self.assertDone([])
- def test_executes_only_after_final_transaction_committed(self):
- with transaction.atomic():
- with transaction.atomic():
- self.do(1)
- self.assertNotified([])
- self.assertNotified([])
- self.assertDone([1])
- def test_discards_hooks_from_rolled_back_savepoint(self):
- with transaction.atomic():
- # one successful savepoint
- with transaction.atomic():
- self.do(1)
- # one failed savepoint
- try:
- with transaction.atomic():
- self.do(2)
- raise ForcedError()
- except ForcedError:
- pass
- # another successful savepoint
- with transaction.atomic():
- self.do(3)
- # only hooks registered during successful savepoints execute
- self.assertDone([1, 3])
- def test_no_hooks_run_from_failed_transaction(self):
- """If outer transaction fails, no hooks from within it run."""
- try:
- with transaction.atomic():
- with transaction.atomic():
- self.do(1)
- raise ForcedError()
- except ForcedError:
- pass
- self.assertDone([])
- def test_inner_savepoint_rolled_back_with_outer(self):
- with transaction.atomic():
- try:
- with transaction.atomic():
- with transaction.atomic():
- self.do(1)
- raise ForcedError()
- except ForcedError:
- pass
- self.do(2)
- self.assertDone([2])
- def test_no_savepoints_atomic_merged_with_outer(self):
- with transaction.atomic():
- with transaction.atomic():
- self.do(1)
- try:
- with transaction.atomic(savepoint=False):
- raise ForcedError()
- except ForcedError:
- pass
- self.assertDone([])
- def test_inner_savepoint_does_not_affect_outer(self):
- with transaction.atomic():
- with transaction.atomic():
- self.do(1)
- try:
- with transaction.atomic():
- raise ForcedError()
- except ForcedError:
- pass
- self.assertDone([1])
- def test_runs_hooks_in_order_registered(self):
- with transaction.atomic():
- self.do(1)
- with transaction.atomic():
- self.do(2)
- self.do(3)
- self.assertDone([1, 2, 3])
- def test_hooks_cleared_after_successful_commit(self):
- with transaction.atomic():
- self.do(1)
- with transaction.atomic():
- self.do(2)
- self.assertDone([1, 2]) # not [1, 1, 2]
- def test_hooks_cleared_after_rollback(self):
- try:
- with transaction.atomic():
- self.do(1)
- raise ForcedError()
- except ForcedError:
- pass
- with transaction.atomic():
- self.do(2)
- self.assertDone([2])
- @skipUnlessDBFeature('test_db_allows_multiple_connections')
- def test_hooks_cleared_on_reconnect(self):
- with transaction.atomic():
- self.do(1)
- connection.close()
- connection.connect()
- with transaction.atomic():
- self.do(2)
- self.assertDone([2])
- def test_error_in_hook_doesnt_prevent_clearing_hooks(self):
- try:
- with transaction.atomic():
- transaction.on_commit(lambda: self.notify('error'))
- except ForcedError:
- pass
- with transaction.atomic():
- self.do(1)
- self.assertDone([1])
- def test_db_query_in_hook(self):
- with transaction.atomic():
- Thing.objects.create(num=1)
- transaction.on_commit(
- lambda: [self.notify(t.num) for t in Thing.objects.all()]
- )
- self.assertDone([1])
- def test_transaction_in_hook(self):
- def on_commit():
- with transaction.atomic():
- t = Thing.objects.create(num=1)
- self.notify(t.num)
- with transaction.atomic():
- transaction.on_commit(on_commit)
- self.assertDone([1])
- def test_raises_exception_non_autocommit_mode(self):
- def should_never_be_called():
- raise AssertionError('this function should never be called')
- try:
- connection.set_autocommit(False)
- with self.assertRaises(transaction.TransactionManagementError):
- transaction.on_commit(should_never_be_called)
- finally:
- connection.set_autocommit(True)
|