123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301 |
- import os
- import re
- import tempfile
- import threading
- import unittest
- from contextlib import contextmanager
- from pathlib import Path
- from unittest import mock
- from django.core.exceptions import ImproperlyConfigured
- from django.db import (
- DEFAULT_DB_ALIAS,
- NotSupportedError,
- connection,
- connections,
- transaction,
- )
- from django.db.models import Aggregate, Avg, StdDev, Sum, Variance
- from django.db.utils import ConnectionHandler
- from django.test import SimpleTestCase, TestCase, TransactionTestCase, override_settings
- from django.test.utils import CaptureQueriesContext, isolate_apps
- from ..models import Item, Object, Square
@unittest.skipUnless(connection.vendor == "sqlite", "SQLite tests")
class Tests(TestCase):
    """General checks of behavior specific to the SQLite database backend."""

    longMessage = True

    def test_aggregation(self):
        """Raise NotSupportedError when aggregating on date/time fields."""
        for aggregate in (Sum, Avg, Variance, StdDev):
            # One subtest per aggregate so that a failure report names the
            # aggregate class that was wrongly accepted.
            with self.subTest(aggregate=aggregate.__name__):
                with self.assertRaises(NotSupportedError):
                    Item.objects.aggregate(aggregate("time"))
                with self.assertRaises(NotSupportedError):
                    Item.objects.aggregate(aggregate("date"))
                with self.assertRaises(NotSupportedError):
                    Item.objects.aggregate(aggregate("last_modified"))
                with self.assertRaises(NotSupportedError):
                    Item.objects.aggregate(
                        **{
                            "complex": aggregate("last_modified")
                            + aggregate("last_modified")
                        }
                    )

    def test_distinct_aggregation(self):
        """DISTINCT on a multi-argument aggregate raises NotSupportedError."""

        class DistinctAggregate(Aggregate):
            allow_distinct = True

        aggregate = DistinctAggregate("first", "second", distinct=True)
        msg = (
            "SQLite doesn't support DISTINCT on aggregate functions accepting "
            "multiple arguments."
        )
        with self.assertRaisesMessage(NotSupportedError, msg):
            connection.ops.check_expression_support(aggregate)

    def test_distinct_aggregation_multiple_args_no_distinct(self):
        """A multi-argument aggregate without DISTINCT passes the support check."""

        # Aggregate functions accept multiple arguments when DISTINCT isn't
        # used, e.g. GROUP_CONCAT().
        class DistinctAggregate(Aggregate):
            allow_distinct = True

        aggregate = DistinctAggregate("first", "second", distinct=False)
        # Passes simply by not raising.
        connection.ops.check_expression_support(aggregate)

    def test_memory_db_test_name(self):
        """A named in-memory db should be allowed where supported."""
        from django.db.backends.sqlite3.base import DatabaseWrapper

        settings_dict = {
            "TEST": {
                "NAME": "file:memorydb_test?mode=memory&cache=shared",
            }
        }
        creation = DatabaseWrapper(settings_dict).creation
        self.assertEqual(
            creation._get_test_db_name(),
            creation.connection.settings_dict["TEST"]["NAME"],
        )

    def test_regexp_function(self):
        """REGEXP yields the match result, or NULL if either operand is NULL."""
        tests = (
            ("test", r"[0-9]+", False),
            ("test", r"[a-z]+", True),
            ("test", None, None),
            (None, r"[a-z]+", None),
            (None, None, None),
        )
        for string, pattern, expected in tests:
            with self.subTest((string, pattern)):
                with connection.cursor() as cursor:
                    cursor.execute("SELECT %s REGEXP %s", [string, pattern])
                    value = cursor.fetchone()[0]
                # SQLite reports booleans as 0/1; normalize them so that
                # assertIs() can compare against True/False/None.
                value = bool(value) if value in {0, 1} else value
                self.assertIs(value, expected)

    def test_pathlib_name(self):
        """A pathlib.Path NAME creates the database file at that location."""
        with tempfile.TemporaryDirectory() as tmp:
            settings_dict = {
                "default": {
                    "ENGINE": "django.db.backends.sqlite3",
                    "NAME": Path(tmp) / "test.db",
                },
            }
            # Named ``handler`` so that the module-level ``connections``
            # import isn't shadowed inside this test.
            handler = ConnectionHandler(settings_dict)
            db = handler["default"]
            try:
                db.ensure_connection()
            finally:
                # Close even if connecting failed part-way: an open handle on
                # the database file can prevent the temporary directory from
                # being removed, which would mask the original error.
                db.close()
            self.assertTrue(os.path.isfile(os.path.join(tmp, "test.db")))

    @mock.patch.object(connection, "get_database_version", return_value=(3, 26))
    def test_check_database_version_supported(self, mocked_get_database_version):
        """A reported version of 3.26 is rejected as older than the 3.27 minimum."""
        msg = "SQLite 3.27 or later is required (found 3.26)."
        with self.assertRaisesMessage(NotSupportedError, msg):
            connection.check_database_version_supported()
        # The check must have consulted the (mocked) version lookup.
        self.assertTrue(mocked_get_database_version.called)
@unittest.skipUnless(connection.vendor == "sqlite", "SQLite tests")
@isolate_apps("backends")
class SchemaTests(TransactionTestCase):
    """Schema editor checks that only apply to the SQLite backend."""

    available_apps = ["backends"]

    def test_autoincrement(self):
        """
        The id column is declared with the AUTOINCREMENT keyword so that
        generated values are monotonically increasing (#10164).
        """
        with connection.schema_editor(collect_sql=True) as schema_editor:
            schema_editor.create_model(Square)
            collected = schema_editor.collected_sql
        create_table_sql = collected[0]
        # Capture the column definition that follows the quoted "id" name.
        id_column = re.search('"id" ([^,]+),', create_table_sql)
        self.assertIsNotNone(id_column)
        self.assertEqual(
            "integer NOT NULL PRIMARY KEY AUTOINCREMENT",
            id_column.group(1),
            "Wrong SQL used to create an auto-increment column on SQLite",
        )

    def test_disable_constraint_checking_failure_disallowed(self):
        """
        Entering the schema editor inside an outer transaction raises
        NotSupportedError when foreign key checks were not disabled first.
        """
        expected_message = (
            "SQLite schema editor cannot be used while foreign key "
            "constraint checks are enabled. Make sure to disable them "
            "before entering a transaction.atomic() context because "
            "SQLite does not support disabling them in the middle of "
            "a multi-statement transaction."
        )
        with self.assertRaisesMessage(NotSupportedError, expected_message):
            with transaction.atomic():
                with connection.schema_editor(atomic=True):
                    pass

    def test_constraint_checks_disabled_atomic_allowed(self):
        """
        Entering the schema editor inside an outer transaction works when
        foreign key checks were disabled before the transaction started.
        """

        def foreign_keys_enforced():
            # Ask SQLite directly whether foreign key enforcement is on.
            with connection.cursor() as cursor:
                row = cursor.execute("PRAGMA foreign_keys").fetchone()
            return bool(row[0])

        with connection.constraint_checks_disabled():
            with transaction.atomic():
                with connection.schema_editor(atomic=True):
                    self.assertFalse(foreign_keys_enforced())
                # Still off after the schema editor exits.
                self.assertFalse(foreign_keys_enforced())
        # Back on once the outermost context has exited.
        self.assertTrue(foreign_keys_enforced())
@unittest.skipUnless(connection.vendor == "sqlite", "Test only for SQLite")
@override_settings(DEBUG=True)
class LastExecutedQueryTest(TestCase):
    """Checks of the SQL text recorded for executed queries under DEBUG=True."""

    def test_no_interpolation(self):
        # Running SQL containing a literal "%" without parameters shouldn't
        # raise an exception (#17158).
        sql = "SELECT strftime('%Y', 'now');"
        with connection.cursor() as cursor:
            cursor.execute(sql)
        last_logged = connection.queries[-1]
        self.assertEqual(last_logged["sql"], sql)

    def test_parameter_quoting(self):
        # The implementation of last_executed_queries isn't optimal. It's
        # worth testing that parameters are quoted (#14091).
        with connection.cursor() as cursor:
            cursor.execute("SELECT %s", ["\"'\\"])
        last_logged = connection.queries[-1]
        # Note that the single quote is repeated in the recorded SQL.
        self.assertEqual(last_logged["sql"], "SELECT '\"''\\'")

    def test_large_number_of_parameters(self):
        # If SQLITE_MAX_VARIABLE_NUMBER (default = 999) has been changed to be
        # greater than SQLITE_MAX_COLUMN (default = 2000), last_executed_query
        # can hit the SQLITE_MAX_COLUMN limit (#26063).
        param_count = 2001
        placeholders = ", ".join(["%s"] * param_count)
        sql = "SELECT MAX(%s)" % placeholders
        params = list(range(param_count))
        with connection.cursor() as cursor:
            # This should not raise an exception.
            cursor.db.ops.last_executed_query(cursor.cursor, sql, params)
@unittest.skipUnless(connection.vendor == "sqlite", "SQLite tests")
class EscapingChecks(TestCase):
    """
    Every test defined here is also run with settings.DEBUG=True by the
    EscapingChecksDebug test case, so that CursorDebugWrapper is covered too.
    """

    def test_parameter_escaping(self):
        # '%s' escaping support for sqlite3 (#13648).
        with connection.cursor() as cursor:
            cursor.execute("select strftime('%s', date('now'))")
            rows = cursor.fetchall()
        first_value = rows[0][0]
        # The returned value should be a non-zero integer.
        self.assertTrue(int(first_value))
@unittest.skipUnless(connection.vendor == "sqlite", "SQLite tests")
@override_settings(DEBUG=True)
class EscapingChecksDebug(EscapingChecks):
    """Run the inherited EscapingChecks tests again with settings.DEBUG=True."""
@unittest.skipUnless(connection.vendor == "sqlite", "SQLite tests")
class ThreadSharing(TransactionTestCase):
    """Objects created from a second thread are visible to the main thread."""

    available_apps = ["backends"]

    def test_database_sharing_in_threads(self):
        """
        Create one object in the main thread and one in a worker thread, then
        check that the main thread counts both.
        """
        seen_connections = []

        def create_and_record():
            # Record the raw connection of whichever thread runs this, so it
            # can be closed during cleanup.
            Object.objects.create()
            seen_connections.append(connections[DEFAULT_DB_ALIAS].connection)

        main_connection = connections[DEFAULT_DB_ALIAS].connection
        try:
            create_and_record()
            worker = threading.Thread(target=create_and_record)
            worker.start()
            worker.join()
            self.assertEqual(Object.objects.count(), 2)
        finally:
            for raw_connection in seen_connections:
                # The main thread's connection is left open for the test
                # machinery; only the other recorded connections are closed.
                if raw_connection is main_connection:
                    continue
                raw_connection.close()
@unittest.skipUnless(connection.vendor == "sqlite", "SQLite tests")
class TestTransactionMode(SimpleTestCase):
    """Checks of the ``transaction_mode`` entry in the database OPTIONS."""

    databases = {"default"}

    def test_default_transaction_mode(self):
        """Without a transaction mode, atomic() issues a plain BEGIN and COMMIT."""
        with CaptureQueriesContext(connection) as captured_queries, transaction.atomic():
            pass

        executed_sql = [query["sql"] for query in captured_queries]
        # Exactly two statements are expected; unpacking fails otherwise.
        begin_sql, commit_sql = executed_sql
        self.assertEqual(begin_sql, "BEGIN")
        self.assertEqual(commit_sql, "COMMIT")

    def test_invalid_transaction_mode(self):
        """An unknown transaction mode raises ImproperlyConfigured on connect."""
        expected_message = (
            "settings.DATABASES['default']['OPTIONS']['transaction_mode'] is "
            "improperly configured to 'invalid'. Use one of 'DEFERRED', 'EXCLUSIVE', "
            "'IMMEDIATE', or None."
        )
        with (
            self.change_transaction_mode("invalid") as new_connection,
            self.assertRaisesMessage(ImproperlyConfigured, expected_message),
        ):
            new_connection.ensure_connection()

    def test_valid_transaction_modes(self):
        """Each valid mode is upper-cased and appended to the BEGIN statement."""
        for transaction_mode in ("deferred", "immediate", "exclusive"):
            expected_transaction_mode = transaction_mode.upper()
            with self.subTest(transaction_mode=transaction_mode):
                with self.change_transaction_mode(transaction_mode) as new_connection:
                    with CaptureQueriesContext(new_connection) as captured_queries:
                        new_connection.set_autocommit(
                            False,
                            force_begin_transaction_with_broken_autocommit=True,
                        )
                        new_connection.commit()
                        first_statement = captured_queries[0]["sql"]
                        self.assertEqual(
                            first_statement, f"BEGIN {expected_transaction_mode}"
                        )

    @contextmanager
    def change_transaction_mode(self, transaction_mode):
        """
        Yield a copy of the default connection whose OPTIONS carry the given
        ``transaction_mode``, and close that copy on exit.
        """
        new_connection = connection.copy()
        # Build a fresh OPTIONS dict rather than mutating the existing one.
        options = dict(new_connection.settings_dict["OPTIONS"])
        options["transaction_mode"] = transaction_mode
        new_connection.settings_dict["OPTIONS"] = options
        try:
            yield new_connection
        finally:
            new_connection.close()
|