|
@@ -3,9 +3,11 @@ import re
|
|
|
import tempfile
|
|
|
import threading
|
|
|
import unittest
|
|
|
+from contextlib import contextmanager
|
|
|
from pathlib import Path
|
|
|
from unittest import mock
|
|
|
|
|
|
+from django.core.exceptions import ImproperlyConfigured
|
|
|
from django.db import (
|
|
|
DEFAULT_DB_ALIAS,
|
|
|
NotSupportedError,
|
|
@@ -15,8 +17,8 @@ from django.db import (
|
|
|
)
|
|
|
from django.db.models import Aggregate, Avg, StdDev, Sum, Variance
|
|
|
from django.db.utils import ConnectionHandler
|
|
|
-from django.test import TestCase, TransactionTestCase, override_settings
|
|
|
-from django.test.utils import isolate_apps
|
|
|
+from django.test import SimpleTestCase, TestCase, TransactionTestCase, override_settings
|
|
|
+from django.test.utils import CaptureQueriesContext, isolate_apps
|
|
|
|
|
|
from ..models import Item, Object, Square
|
|
|
|
|
@@ -245,3 +247,55 @@ class ThreadSharing(TransactionTestCase):
|
|
|
for conn in thread_connections:
|
|
|
if conn is not main_connection:
|
|
|
conn.close()
|
|
|
+
|
|
|
+
|
|
|
+@unittest.skipUnless(connection.vendor == "sqlite", "SQLite tests")
|
|
|
+class TestTransactionMode(SimpleTestCase):
|
|
|
+ databases = {"default"}
|
|
|
+
|
|
|
+ def test_default_transaction_mode(self):
|
|
|
+ with CaptureQueriesContext(connection) as captured_queries:
|
|
|
+ with transaction.atomic():
|
|
|
+ pass
|
|
|
+
|
|
|
+ begin_query, commit_query = captured_queries
|
|
|
+ self.assertEqual(begin_query["sql"], "BEGIN")
|
|
|
+ self.assertEqual(commit_query["sql"], "COMMIT")
|
|
|
+
|
|
|
+ def test_invalid_transaction_mode(self):
|
|
|
+ msg = (
|
|
|
+ "settings.DATABASES['default']['OPTIONS']['transaction_mode'] is "
|
|
|
+ "improperly configured to 'invalid'. Use one of 'DEFERRED', 'EXCLUSIVE', "
|
|
|
+ "'IMMEDIATE', or None."
|
|
|
+ )
|
|
|
+ with self.change_transaction_mode("invalid") as new_connection:
|
|
|
+ with self.assertRaisesMessage(ImproperlyConfigured, msg):
|
|
|
+ new_connection.ensure_connection()
|
|
|
+
|
|
|
+ def test_valid_transaction_modes(self):
|
|
|
+ valid_transaction_modes = ("deferred", "immediate", "exclusive")
|
|
|
+ for transaction_mode in valid_transaction_modes:
|
|
|
+ with (
|
|
|
+ self.subTest(transaction_mode=transaction_mode),
|
|
|
+ self.change_transaction_mode(transaction_mode) as new_connection,
|
|
|
+ CaptureQueriesContext(new_connection) as captured_queries,
|
|
|
+ ):
|
|
|
+ new_connection.set_autocommit(
|
|
|
+ False, force_begin_transaction_with_broken_autocommit=True
|
|
|
+ )
|
|
|
+ new_connection.commit()
|
|
|
+ expected_transaction_mode = transaction_mode.upper()
|
|
|
+ begin_sql = captured_queries[0]["sql"]
|
|
|
+ self.assertEqual(begin_sql, f"BEGIN {expected_transaction_mode}")
|
|
|
+
|
|
|
+ @contextmanager
|
|
|
+ def change_transaction_mode(self, transaction_mode):
|
|
|
+ new_connection = connection.copy()
|
|
|
+ new_connection.settings_dict["OPTIONS"] = {
|
|
|
+ **new_connection.settings_dict["OPTIONS"],
|
|
|
+ "transaction_mode": transaction_mode,
|
|
|
+ }
|
|
|
+ try:
|
|
|
+ yield new_connection
|
|
|
+ finally:
|
|
|
+ new_connection.close()
|