tests.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. from __future__ import absolute_import
  2. from django.db import connection, connections, transaction, DEFAULT_DB_ALIAS, DatabaseError
  3. from django.db.transaction import commit_on_success, commit_manually, TransactionManagementError
  4. from django.test import TransactionTestCase, skipUnlessDBFeature
  5. from django.test.utils import override_settings
  6. from django.utils.unittest import skipIf, skipUnless, expectedFailure
  7. from .models import Mod, M2mA, M2mB
  8. class TestTransactionClosing(TransactionTestCase):
  9. """
  10. Tests to make sure that transactions are properly closed
  11. when they should be, and aren't left pending after operations
  12. have been performed in them. Refs #9964.
  13. """
  14. def test_raw_committed_on_success(self):
  15. """
  16. Make sure a transaction consisting of raw SQL execution gets
  17. committed by the commit_on_success decorator.
  18. """
  19. @commit_on_success
  20. def raw_sql():
  21. "Write a record using raw sql under a commit_on_success decorator"
  22. cursor = connection.cursor()
  23. cursor.execute("INSERT into transactions_regress_mod (fld) values (18)")
  24. raw_sql()
  25. # Rollback so that if the decorator didn't commit, the record is unwritten
  26. transaction.rollback()
  27. self.assertEqual(Mod.objects.count(), 1)
  28. # Check that the record is in the DB
  29. obj = Mod.objects.all()[0]
  30. self.assertEqual(obj.fld, 18)
  31. def test_commit_manually_enforced(self):
  32. """
  33. Make sure that under commit_manually, even "read-only" transaction require closure
  34. (commit or rollback), and a transaction left pending is treated as an error.
  35. """
  36. @commit_manually
  37. def non_comitter():
  38. "Execute a managed transaction with read-only operations and fail to commit"
  39. Mod.objects.count()
  40. self.assertRaises(TransactionManagementError, non_comitter)
  41. def test_commit_manually_commit_ok(self):
  42. """
  43. Test that under commit_manually, a committed transaction is accepted by the transaction
  44. management mechanisms
  45. """
  46. @commit_manually
  47. def committer():
  48. """
  49. Perform a database query, then commit the transaction
  50. """
  51. Mod.objects.count()
  52. transaction.commit()
  53. try:
  54. committer()
  55. except TransactionManagementError:
  56. self.fail("Commit did not clear the transaction state")
  57. def test_commit_manually_rollback_ok(self):
  58. """
  59. Test that under commit_manually, a rolled-back transaction is accepted by the transaction
  60. management mechanisms
  61. """
  62. @commit_manually
  63. def roller_back():
  64. """
  65. Perform a database query, then rollback the transaction
  66. """
  67. Mod.objects.count()
  68. transaction.rollback()
  69. try:
  70. roller_back()
  71. except TransactionManagementError:
  72. self.fail("Rollback did not clear the transaction state")
  73. def test_commit_manually_enforced_after_commit(self):
  74. """
  75. Test that under commit_manually, if a transaction is committed and an operation is
  76. performed later, we still require the new transaction to be closed
  77. """
  78. @commit_manually
  79. def fake_committer():
  80. "Query, commit, then query again, leaving with a pending transaction"
  81. Mod.objects.count()
  82. transaction.commit()
  83. Mod.objects.count()
  84. self.assertRaises(TransactionManagementError, fake_committer)
  85. @skipUnlessDBFeature('supports_transactions')
  86. def test_reuse_cursor_reference(self):
  87. """
  88. Make sure transaction closure is enforced even when the queries are performed
  89. through a single cursor reference retrieved in the beginning
  90. (this is to show why it is wrong to set the transaction dirty only when a cursor
  91. is fetched from the connection).
  92. """
  93. @commit_on_success
  94. def reuse_cursor_ref():
  95. """
  96. Fetch a cursor, perform an query, rollback to close the transaction,
  97. then write a record (in a new transaction) using the same cursor object
  98. (reference). All this under commit_on_success, so the second insert should
  99. be committed.
  100. """
  101. cursor = connection.cursor()
  102. cursor.execute("INSERT into transactions_regress_mod (fld) values (2)")
  103. transaction.rollback()
  104. cursor.execute("INSERT into transactions_regress_mod (fld) values (2)")
  105. reuse_cursor_ref()
  106. # Rollback so that if the decorator didn't commit, the record is unwritten
  107. transaction.rollback()
  108. self.assertEqual(Mod.objects.count(), 1)
  109. obj = Mod.objects.all()[0]
  110. self.assertEqual(obj.fld, 2)
  111. def test_failing_query_transaction_closed(self):
  112. """
  113. Make sure that under commit_on_success, a transaction is rolled back even if
  114. the first database-modifying operation fails.
  115. This is prompted by http://code.djangoproject.com/ticket/6669 (and based on sample
  116. code posted there to exemplify the problem): Before Django 1.3,
  117. transactions were only marked "dirty" by the save() function after it successfully
  118. wrote the object to the database.
  119. """
  120. from django.contrib.auth.models import User
  121. @transaction.commit_on_success
  122. def create_system_user():
  123. "Create a user in a transaction"
  124. user = User.objects.create_user(username='system', password='iamr00t',
  125. email='root@SITENAME.com')
  126. # Redundant, just makes sure the user id was read back from DB
  127. Mod.objects.create(fld=user.pk)
  128. # Create a user
  129. create_system_user()
  130. with self.assertRaises(DatabaseError):
  131. # The second call to create_system_user should fail for violating
  132. # a unique constraint (it's trying to re-create the same user)
  133. create_system_user()
  134. # Try to read the database. If the last transaction was indeed closed,
  135. # this should cause no problems
  136. User.objects.all()[0]
  137. @override_settings(DEBUG=True)
  138. def test_failing_query_transaction_closed_debug(self):
  139. """
  140. Regression for #6669. Same test as above, with DEBUG=True.
  141. """
  142. self.test_failing_query_transaction_closed()
  143. @skipIf(connection.vendor == 'sqlite' and
  144. (connection.settings_dict['NAME'] == ':memory:' or
  145. not connection.settings_dict['NAME']),
  146. 'Test uses multiple connections, but in-memory sqlite does not support this')
  147. class TestNewConnection(TransactionTestCase):
  148. """
  149. Check that new connections don't have special behaviour.
  150. """
  151. def setUp(self):
  152. self._old_backend = connections[DEFAULT_DB_ALIAS]
  153. settings = self._old_backend.settings_dict.copy()
  154. new_backend = self._old_backend.__class__(settings, DEFAULT_DB_ALIAS)
  155. connections[DEFAULT_DB_ALIAS] = new_backend
  156. def tearDown(self):
  157. try:
  158. connections[DEFAULT_DB_ALIAS].abort()
  159. except Exception:
  160. import ipdb; ipdb.set_trace()
  161. finally:
  162. connections[DEFAULT_DB_ALIAS].close()
  163. connections[DEFAULT_DB_ALIAS] = self._old_backend
  164. # TODO: update this test to account for database-level autocommit.
  165. @expectedFailure
  166. def test_commit(self):
  167. """
  168. Users are allowed to commit and rollback connections.
  169. """
  170. # The starting value is False, not None.
  171. self.assertIs(connection._dirty, False)
  172. list(Mod.objects.all())
  173. self.assertTrue(connection.is_dirty())
  174. connection.commit()
  175. self.assertFalse(connection.is_dirty())
  176. list(Mod.objects.all())
  177. self.assertTrue(connection.is_dirty())
  178. connection.rollback()
  179. self.assertFalse(connection.is_dirty())
  180. def test_enter_exit_management(self):
  181. orig_dirty = connection._dirty
  182. connection.enter_transaction_management()
  183. connection.leave_transaction_management()
  184. self.assertEqual(orig_dirty, connection._dirty)
  185. @skipUnless(connection.vendor == 'postgresql',
  186. "This test only valid for PostgreSQL")
  187. class TestPostgresAutocommitAndIsolation(TransactionTestCase):
  188. """
  189. Tests to make sure psycopg2's autocommit mode and isolation level
  190. is restored after entering and leaving transaction management.
  191. Refs #16047, #18130.
  192. """
  193. def setUp(self):
  194. from psycopg2.extensions import (ISOLATION_LEVEL_AUTOCOMMIT,
  195. ISOLATION_LEVEL_SERIALIZABLE,
  196. TRANSACTION_STATUS_IDLE)
  197. self._autocommit = ISOLATION_LEVEL_AUTOCOMMIT
  198. self._serializable = ISOLATION_LEVEL_SERIALIZABLE
  199. self._idle = TRANSACTION_STATUS_IDLE
  200. # We want a clean backend with autocommit = True, so
  201. # first we need to do a bit of work to have that.
  202. self._old_backend = connections[DEFAULT_DB_ALIAS]
  203. settings = self._old_backend.settings_dict.copy()
  204. opts = settings['OPTIONS'].copy()
  205. opts['isolation_level'] = ISOLATION_LEVEL_SERIALIZABLE
  206. settings['OPTIONS'] = opts
  207. new_backend = self._old_backend.__class__(settings, DEFAULT_DB_ALIAS)
  208. connections[DEFAULT_DB_ALIAS] = new_backend
  209. def tearDown(self):
  210. try:
  211. connections[DEFAULT_DB_ALIAS].abort()
  212. finally:
  213. connections[DEFAULT_DB_ALIAS].close()
  214. connections[DEFAULT_DB_ALIAS] = self._old_backend
  215. def test_initial_autocommit_state(self):
  216. # Autocommit is activated when the connection is created.
  217. connection.cursor().close()
  218. self.assertTrue(connection.autocommit)
  219. def test_transaction_management(self):
  220. transaction.enter_transaction_management()
  221. self.assertFalse(connection.autocommit)
  222. self.assertEqual(connection.isolation_level, self._serializable)
  223. transaction.leave_transaction_management()
  224. self.assertTrue(connection.autocommit)
  225. def test_transaction_stacking(self):
  226. transaction.enter_transaction_management()
  227. self.assertFalse(connection.autocommit)
  228. self.assertEqual(connection.isolation_level, self._serializable)
  229. transaction.enter_transaction_management()
  230. self.assertFalse(connection.autocommit)
  231. self.assertEqual(connection.isolation_level, self._serializable)
  232. transaction.leave_transaction_management()
  233. self.assertFalse(connection.autocommit)
  234. self.assertEqual(connection.isolation_level, self._serializable)
  235. transaction.leave_transaction_management()
  236. self.assertTrue(connection.autocommit)
  237. def test_enter_autocommit(self):
  238. transaction.enter_transaction_management()
  239. self.assertFalse(connection.autocommit)
  240. self.assertEqual(connection.isolation_level, self._serializable)
  241. list(Mod.objects.all())
  242. self.assertTrue(transaction.is_dirty())
  243. # Enter autocommit mode again.
  244. transaction.enter_transaction_management(False)
  245. self.assertFalse(transaction.is_dirty())
  246. self.assertEqual(
  247. connection.connection.get_transaction_status(),
  248. self._idle)
  249. list(Mod.objects.all())
  250. self.assertFalse(transaction.is_dirty())
  251. transaction.leave_transaction_management()
  252. self.assertFalse(connection.autocommit)
  253. self.assertEqual(connection.isolation_level, self._serializable)
  254. transaction.leave_transaction_management()
  255. self.assertTrue(connection.autocommit)
  256. class TestManyToManyAddTransaction(TransactionTestCase):
  257. def test_manyrelated_add_commit(self):
  258. "Test for https://code.djangoproject.com/ticket/16818"
  259. a = M2mA.objects.create()
  260. b = M2mB.objects.create(fld=10)
  261. a.others.add(b)
  262. # We're in a TransactionTestCase and have not changed transaction
  263. # behavior from default of "autocommit", so this rollback should not
  264. # actually do anything. If it does in fact undo our add, that's a bug
  265. # that the bulk insert was not auto-committed.
  266. transaction.rollback()
  267. self.assertEqual(a.others.count(), 1)
  268. class SavepointTest(TransactionTestCase):
  269. @skipIf(connection.vendor == 'sqlite',
  270. "SQLite doesn't support savepoints in managed mode")
  271. @skipUnlessDBFeature('uses_savepoints')
  272. def test_savepoint_commit(self):
  273. @commit_manually
  274. def work():
  275. mod = Mod.objects.create(fld=1)
  276. pk = mod.pk
  277. sid = transaction.savepoint()
  278. Mod.objects.filter(pk=pk).update(fld=10)
  279. transaction.savepoint_commit(sid)
  280. mod2 = Mod.objects.get(pk=pk)
  281. transaction.commit()
  282. self.assertEqual(mod2.fld, 10)
  283. work()
  284. @skipIf(connection.vendor == 'sqlite',
  285. "SQLite doesn't support savepoints in managed mode")
  286. @skipIf(connection.vendor == 'mysql' and
  287. connection.features._mysql_storage_engine == 'MyISAM',
  288. "MyISAM MySQL storage engine doesn't support savepoints")
  289. @skipUnlessDBFeature('uses_savepoints')
  290. def test_savepoint_rollback(self):
  291. @commit_manually
  292. def work():
  293. mod = Mod.objects.create(fld=1)
  294. pk = mod.pk
  295. sid = transaction.savepoint()
  296. Mod.objects.filter(pk=pk).update(fld=20)
  297. transaction.savepoint_rollback(sid)
  298. mod2 = Mod.objects.get(pk=pk)
  299. transaction.commit()
  300. self.assertEqual(mod2.fld, 1)
  301. work()