|
@@ -8,11 +8,13 @@ from decimal import Decimal
|
|
|
import re
|
|
|
import threading
|
|
|
import unittest
|
|
|
+import warnings
|
|
|
|
|
|
from django.conf import settings
|
|
|
from django.core.management.color import no_style
|
|
|
from django.db import (connection, connections, DEFAULT_DB_ALIAS,
|
|
|
- DatabaseError, IntegrityError, transaction)
|
|
|
+ DatabaseError, IntegrityError, reset_queries, transaction)
|
|
|
+from django.db.backends import BaseDatabaseWrapper
|
|
|
from django.db.backends.signals import connection_created
|
|
|
from django.db.backends.postgresql_psycopg2 import version as pg_version
|
|
|
from django.db.backends.utils import format_number, CursorWrapper
|
|
@@ -630,7 +632,7 @@ class BackendTestCase(TestCase):
|
|
|
# to use ProgrammingError).
|
|
|
with self.assertRaises(connection.features.closed_cursor_error_class):
|
|
|
# cursor should be closed, so no queries should be possible.
|
|
|
- cursor.execute("select 1")
|
|
|
+ cursor.execute("SELECT 1" + connection.features.bare_select_suffix)
|
|
|
|
|
|
@unittest.skipUnless(connection.vendor == 'postgresql',
|
|
|
"Psycopg2 specific cursor.closed attribute needed")
|
|
@@ -666,6 +668,62 @@ class BackendTestCase(TestCase):
|
|
|
except Exception:
|
|
|
pass
|
|
|
|
|
|
+ @override_settings(DEBUG=True)
|
|
|
+ def test_queries(self):
|
|
|
+ """
|
|
|
+ Test the documented API of connection.queries.
|
|
|
+ """
|
|
|
+ reset_queries()
|
|
|
+
|
|
|
+ with connection.cursor() as cursor:
|
|
|
+ cursor.execute("SELECT 1" + connection.features.bare_select_suffix)
|
|
|
+ self.assertEqual(1, len(connection.queries))
|
|
|
+
|
|
|
+ self.assertIsInstance(connection.queries, list)
|
|
|
+ self.assertIsInstance(connection.queries[0], dict)
|
|
|
+ self.assertItemsEqual(connection.queries[0].keys(), ['sql', 'time'])
|
|
|
+
|
|
|
+ reset_queries()
|
|
|
+ self.assertEqual(0, len(connection.queries))
|
|
|
+
|
|
|
+
|
|
|
+ # Unfortunately with sqlite3 the in-memory test database cannot be closed.
|
|
|
+ @skipUnlessDBFeature('test_db_allows_multiple_connections')
|
|
|
+ @override_settings(DEBUG=True)
|
|
|
+ def test_queries_limit(self):
|
|
|
+ """
|
|
|
+ Test that the backend doesn't store an unlimited number of queries.
|
|
|
+
|
|
|
+ Regression for #12581.
|
|
|
+ """
|
|
|
+ old_queries_limit = BaseDatabaseWrapper.queries_limit
|
|
|
+ BaseDatabaseWrapper.queries_limit = 3
|
|
|
+ new_connections = ConnectionHandler(settings.DATABASES)
|
|
|
+ new_connection = new_connections[DEFAULT_DB_ALIAS]
|
|
|
+
|
|
|
+ try:
|
|
|
+ with new_connection.cursor() as cursor:
|
|
|
+ cursor.execute("SELECT 1" + new_connection.features.bare_select_suffix)
|
|
|
+ cursor.execute("SELECT 2" + new_connection.features.bare_select_suffix)
|
|
|
+
|
|
|
+ with warnings.catch_warnings(record=True) as w:
|
|
|
+ self.assertEqual(2, len(new_connection.queries))
|
|
|
+ self.assertEqual(0, len(w))
|
|
|
+
|
|
|
+ with new_connection.cursor() as cursor:
|
|
|
+ cursor.execute("SELECT 3" + new_connection.features.bare_select_suffix)
|
|
|
+ cursor.execute("SELECT 4" + new_connection.features.bare_select_suffix)
|
|
|
+
|
|
|
+ with warnings.catch_warnings(record=True) as w:
|
|
|
+ self.assertEqual(3, len(new_connection.queries))
|
|
|
+ self.assertEqual(1, len(w))
|
|
|
+ self.assertEqual(str(w[0].message), "Limit for query logging "
|
|
|
+ "exceeded, only the last 3 queries will be returned.")
|
|
|
+
|
|
|
+ finally:
|
|
|
+ BaseDatabaseWrapper.queries_limit = old_queries_limit
|
|
|
+ new_connection.close()
|
|
|
+
|
|
|
|
|
|
# We don't make these tests conditional because that means we would need to
|
|
|
# check and differentiate between:
|