tests.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569
  1. import copy
  2. import unittest
  3. from io import StringIO
  4. from unittest import mock
  5. from django.core.exceptions import ImproperlyConfigured
  6. from django.db import (
  7. DEFAULT_DB_ALIAS,
  8. DatabaseError,
  9. NotSupportedError,
  10. ProgrammingError,
  11. connection,
  12. connections,
  13. )
  14. from django.db.backends.base.base import BaseDatabaseWrapper
  15. from django.test import TestCase, override_settings
  16. try:
  17. from django.db.backends.postgresql.psycopg_any import errors, is_psycopg3
  18. except ImportError:
  19. is_psycopg3 = False
  20. def no_pool_connection(alias=None):
  21. new_connection = connection.copy(alias)
  22. new_connection.settings_dict = copy.deepcopy(connection.settings_dict)
  23. # Ensure that the second connection circumvents the pool, this is kind
  24. # of a hack, but we cannot easily change the pool connections.
  25. new_connection.settings_dict["OPTIONS"]["pool"] = False
  26. return new_connection
  27. @unittest.skipUnless(connection.vendor == "postgresql", "PostgreSQL tests")
  28. class Tests(TestCase):
  29. databases = {"default", "other"}
  30. def test_nodb_cursor(self):
  31. """
  32. The _nodb_cursor() fallbacks to the default connection database when
  33. access to the 'postgres' database is not granted.
  34. """
  35. orig_connect = BaseDatabaseWrapper.connect
  36. def mocked_connect(self):
  37. if self.settings_dict["NAME"] is None:
  38. raise DatabaseError()
  39. return orig_connect(self)
  40. with connection._nodb_cursor() as cursor:
  41. self.assertIs(cursor.closed, False)
  42. self.assertIsNotNone(cursor.db.connection)
  43. self.assertIsNone(cursor.db.settings_dict["NAME"])
  44. self.assertIs(cursor.closed, True)
  45. self.assertIsNone(cursor.db.connection)
  46. # Now assume the 'postgres' db isn't available
  47. msg = (
  48. "Normally Django will use a connection to the 'postgres' database "
  49. "to avoid running initialization queries against the production "
  50. "database when it's not needed (for example, when running tests). "
  51. "Django was unable to create a connection to the 'postgres' "
  52. "database and will use the first PostgreSQL database instead."
  53. )
  54. with self.assertWarnsMessage(RuntimeWarning, msg):
  55. with mock.patch(
  56. "django.db.backends.base.base.BaseDatabaseWrapper.connect",
  57. side_effect=mocked_connect,
  58. autospec=True,
  59. ):
  60. with mock.patch.object(
  61. connection,
  62. "settings_dict",
  63. {**connection.settings_dict, "NAME": "postgres"},
  64. ):
  65. with connection._nodb_cursor() as cursor:
  66. self.assertIs(cursor.closed, False)
  67. self.assertIsNotNone(cursor.db.connection)
  68. self.assertIs(cursor.closed, True)
  69. self.assertIsNone(cursor.db.connection)
  70. self.assertIsNotNone(cursor.db.settings_dict["NAME"])
  71. self.assertEqual(
  72. cursor.db.settings_dict["NAME"], connections["other"].settings_dict["NAME"]
  73. )
  74. # Cursor is yielded only for the first PostgreSQL database.
  75. with self.assertWarnsMessage(RuntimeWarning, msg):
  76. with mock.patch(
  77. "django.db.backends.base.base.BaseDatabaseWrapper.connect",
  78. side_effect=mocked_connect,
  79. autospec=True,
  80. ):
  81. with connection._nodb_cursor() as cursor:
  82. self.assertIs(cursor.closed, False)
  83. self.assertIsNotNone(cursor.db.connection)
  84. def test_nodb_cursor_raises_postgres_authentication_failure(self):
  85. """
  86. _nodb_cursor() re-raises authentication failure to the 'postgres' db
  87. when other connection to the PostgreSQL database isn't available.
  88. """
  89. def mocked_connect(self):
  90. raise DatabaseError()
  91. def mocked_all(self):
  92. test_connection = copy.copy(connections[DEFAULT_DB_ALIAS])
  93. test_connection.settings_dict = copy.deepcopy(connection.settings_dict)
  94. test_connection.settings_dict["NAME"] = "postgres"
  95. return [test_connection]
  96. msg = (
  97. "Normally Django will use a connection to the 'postgres' database "
  98. "to avoid running initialization queries against the production "
  99. "database when it's not needed (for example, when running tests). "
  100. "Django was unable to create a connection to the 'postgres' "
  101. "database and will use the first PostgreSQL database instead."
  102. )
  103. with self.assertWarnsMessage(RuntimeWarning, msg):
  104. mocker_connections_all = mock.patch(
  105. "django.utils.connection.BaseConnectionHandler.all",
  106. side_effect=mocked_all,
  107. autospec=True,
  108. )
  109. mocker_connect = mock.patch(
  110. "django.db.backends.base.base.BaseDatabaseWrapper.connect",
  111. side_effect=mocked_connect,
  112. autospec=True,
  113. )
  114. with mocker_connections_all, mocker_connect:
  115. with self.assertRaises(DatabaseError):
  116. with connection._nodb_cursor():
  117. pass
  118. def test_nodb_cursor_reraise_exceptions(self):
  119. with self.assertRaisesMessage(DatabaseError, "exception"):
  120. with connection._nodb_cursor():
  121. raise DatabaseError("exception")
  122. def test_database_name_too_long(self):
  123. from django.db.backends.postgresql.base import DatabaseWrapper
  124. settings = connection.settings_dict.copy()
  125. max_name_length = connection.ops.max_name_length()
  126. settings["NAME"] = "a" + (max_name_length * "a")
  127. msg = (
  128. "The database name '%s' (%d characters) is longer than "
  129. "PostgreSQL's limit of %s characters. Supply a shorter NAME in "
  130. "settings.DATABASES."
  131. ) % (settings["NAME"], max_name_length + 1, max_name_length)
  132. with self.assertRaisesMessage(ImproperlyConfigured, msg):
  133. DatabaseWrapper(settings).get_connection_params()
  134. def test_database_name_empty(self):
  135. from django.db.backends.postgresql.base import DatabaseWrapper
  136. settings = connection.settings_dict.copy()
  137. settings["NAME"] = ""
  138. msg = (
  139. "settings.DATABASES is improperly configured. Please supply the "
  140. "NAME or OPTIONS['service'] value."
  141. )
  142. with self.assertRaisesMessage(ImproperlyConfigured, msg):
  143. DatabaseWrapper(settings).get_connection_params()
  144. def test_service_name(self):
  145. from django.db.backends.postgresql.base import DatabaseWrapper
  146. settings = connection.settings_dict.copy()
  147. settings["OPTIONS"] = {"service": "my_service"}
  148. settings["NAME"] = ""
  149. params = DatabaseWrapper(settings).get_connection_params()
  150. self.assertEqual(params["service"], "my_service")
  151. self.assertNotIn("database", params)
  152. def test_service_name_default_db(self):
  153. # None is used to connect to the default 'postgres' db.
  154. from django.db.backends.postgresql.base import DatabaseWrapper
  155. settings = connection.settings_dict.copy()
  156. settings["NAME"] = None
  157. settings["OPTIONS"] = {"service": "django_test"}
  158. params = DatabaseWrapper(settings).get_connection_params()
  159. self.assertEqual(params["dbname"], "postgres")
  160. self.assertNotIn("service", params)
  161. def test_connect_and_rollback(self):
  162. """
  163. PostgreSQL shouldn't roll back SET TIME ZONE, even if the first
  164. transaction is rolled back (#17062).
  165. """
  166. new_connection = no_pool_connection()
  167. try:
  168. # Ensure the database default time zone is different than
  169. # the time zone in new_connection.settings_dict. We can
  170. # get the default time zone by reset & show.
  171. with new_connection.cursor() as cursor:
  172. cursor.execute("RESET TIMEZONE")
  173. cursor.execute("SHOW TIMEZONE")
  174. db_default_tz = cursor.fetchone()[0]
  175. new_tz = "Europe/Paris" if db_default_tz == "UTC" else "UTC"
  176. new_connection.close()
  177. # Invalidate timezone name cache, because the setting_changed
  178. # handler cannot know about new_connection.
  179. del new_connection.timezone_name
  180. # Fetch a new connection with the new_tz as default
  181. # time zone, run a query and rollback.
  182. with self.settings(TIME_ZONE=new_tz):
  183. new_connection.set_autocommit(False)
  184. new_connection.rollback()
  185. # Now let's see if the rollback rolled back the SET TIME ZONE.
  186. with new_connection.cursor() as cursor:
  187. cursor.execute("SHOW TIMEZONE")
  188. tz = cursor.fetchone()[0]
  189. self.assertEqual(new_tz, tz)
  190. finally:
  191. new_connection.close()
  192. def test_connect_non_autocommit(self):
  193. """
  194. The connection wrapper shouldn't believe that autocommit is enabled
  195. after setting the time zone when AUTOCOMMIT is False (#21452).
  196. """
  197. new_connection = no_pool_connection()
  198. new_connection.settings_dict["AUTOCOMMIT"] = False
  199. try:
  200. # Open a database connection.
  201. with new_connection.cursor():
  202. self.assertFalse(new_connection.get_autocommit())
  203. finally:
  204. new_connection.close()
  205. @unittest.skipUnless(is_psycopg3, "psycopg3 specific test")
  206. def test_connect_pool(self):
  207. from psycopg_pool import PoolTimeout
  208. new_connection = no_pool_connection(alias="default_pool")
  209. new_connection.settings_dict["OPTIONS"]["pool"] = {
  210. "min_size": 0,
  211. "max_size": 2,
  212. "timeout": 5,
  213. }
  214. self.assertIsNotNone(new_connection.pool)
  215. connections = []
  216. def get_connection():
  217. # copy() reuses the existing alias and as such the same pool.
  218. conn = new_connection.copy()
  219. conn.connect()
  220. connections.append(conn)
  221. return conn
  222. try:
  223. connection_1 = get_connection() # First connection.
  224. connection_1_backend_pid = connection_1.connection.info.backend_pid
  225. get_connection() # Get the second connection.
  226. with self.assertRaises(PoolTimeout):
  227. # The pool has a maximum of 2 connections.
  228. get_connection()
  229. connection_1.close() # Release back to the pool.
  230. connection_3 = get_connection()
  231. # Reuses the first connection as it is available.
  232. self.assertEqual(
  233. connection_3.connection.info.backend_pid, connection_1_backend_pid
  234. )
  235. finally:
  236. # Release all connections back to the pool.
  237. for conn in connections:
  238. conn.close()
  239. new_connection.close_pool()
  240. @unittest.skipUnless(is_psycopg3, "psycopg3 specific test")
  241. def test_connect_pool_set_to_true(self):
  242. new_connection = no_pool_connection(alias="default_pool")
  243. new_connection.settings_dict["OPTIONS"]["pool"] = True
  244. try:
  245. self.assertIsNotNone(new_connection.pool)
  246. finally:
  247. new_connection.close_pool()
  248. @unittest.skipUnless(is_psycopg3, "psycopg3 specific test")
  249. def test_connect_pool_with_timezone(self):
  250. new_time_zone = "Africa/Nairobi"
  251. new_connection = no_pool_connection(alias="default_pool")
  252. try:
  253. with new_connection.cursor() as cursor:
  254. cursor.execute("SHOW TIMEZONE")
  255. tz = cursor.fetchone()[0]
  256. self.assertNotEqual(new_time_zone, tz)
  257. finally:
  258. new_connection.close()
  259. del new_connection.timezone_name
  260. new_connection.settings_dict["OPTIONS"]["pool"] = True
  261. try:
  262. with self.settings(TIME_ZONE=new_time_zone):
  263. with new_connection.cursor() as cursor:
  264. cursor.execute("SHOW TIMEZONE")
  265. tz = cursor.fetchone()[0]
  266. self.assertEqual(new_time_zone, tz)
  267. finally:
  268. new_connection.close()
  269. new_connection.close_pool()
  270. @unittest.skipUnless(is_psycopg3, "psycopg3 specific test")
  271. def test_pooling_health_checks(self):
  272. new_connection = no_pool_connection(alias="default_pool")
  273. new_connection.settings_dict["OPTIONS"]["pool"] = True
  274. new_connection.settings_dict["CONN_HEALTH_CHECKS"] = False
  275. try:
  276. self.assertIsNone(new_connection.pool._check)
  277. finally:
  278. new_connection.close_pool()
  279. new_connection.settings_dict["CONN_HEALTH_CHECKS"] = True
  280. try:
  281. self.assertIsNotNone(new_connection.pool._check)
  282. finally:
  283. new_connection.close_pool()
  284. @unittest.skipUnless(is_psycopg3, "psycopg3 specific test")
  285. def test_cannot_open_new_connection_in_atomic_block(self):
  286. new_connection = no_pool_connection(alias="default_pool")
  287. new_connection.settings_dict["OPTIONS"]["pool"] = True
  288. msg = "Cannot open a new connection in an atomic block."
  289. new_connection.in_atomic_block = True
  290. new_connection.closed_in_transaction = True
  291. with self.assertRaisesMessage(ProgrammingError, msg):
  292. new_connection.ensure_connection()
  293. @unittest.skipUnless(is_psycopg3, "psycopg3 specific test")
  294. def test_pooling_not_support_persistent_connections(self):
  295. new_connection = no_pool_connection(alias="default_pool")
  296. new_connection.settings_dict["OPTIONS"]["pool"] = True
  297. new_connection.settings_dict["CONN_MAX_AGE"] = 10
  298. msg = "Pooling doesn't support persistent connections."
  299. with self.assertRaisesMessage(ImproperlyConfigured, msg):
  300. new_connection.pool
  301. @unittest.skipIf(is_psycopg3, "psycopg2 specific test")
  302. def test_connect_pool_setting_ignored_for_psycopg2(self):
  303. new_connection = no_pool_connection()
  304. new_connection.settings_dict["OPTIONS"]["pool"] = True
  305. msg = "Database pooling requires psycopg >= 3"
  306. with self.assertRaisesMessage(ImproperlyConfigured, msg):
  307. new_connection.connect()
  308. def test_connect_isolation_level(self):
  309. """
  310. The transaction level can be configured with
  311. DATABASES ['OPTIONS']['isolation_level'].
  312. """
  313. from django.db.backends.postgresql.psycopg_any import IsolationLevel
  314. # Since this is a django.test.TestCase, a transaction is in progress
  315. # and the isolation level isn't reported as 0. This test assumes that
  316. # PostgreSQL is configured with the default isolation level.
  317. # Check the level on the psycopg connection, not the Django wrapper.
  318. self.assertIsNone(connection.connection.isolation_level)
  319. new_connection = no_pool_connection()
  320. new_connection.settings_dict["OPTIONS"][
  321. "isolation_level"
  322. ] = IsolationLevel.SERIALIZABLE
  323. try:
  324. # Start a transaction so the isolation level isn't reported as 0.
  325. new_connection.set_autocommit(False)
  326. # Check the level on the psycopg connection, not the Django wrapper.
  327. self.assertEqual(
  328. new_connection.connection.isolation_level,
  329. IsolationLevel.SERIALIZABLE,
  330. )
  331. finally:
  332. new_connection.close()
  333. def test_connect_invalid_isolation_level(self):
  334. self.assertIsNone(connection.connection.isolation_level)
  335. new_connection = no_pool_connection()
  336. new_connection.settings_dict["OPTIONS"]["isolation_level"] = -1
  337. msg = (
  338. "Invalid transaction isolation level -1 specified. Use one of the "
  339. "psycopg.IsolationLevel values."
  340. )
  341. with self.assertRaisesMessage(ImproperlyConfigured, msg):
  342. new_connection.ensure_connection()
  343. def test_connect_role(self):
  344. """
  345. The session role can be configured with DATABASES
  346. ["OPTIONS"]["assume_role"].
  347. """
  348. try:
  349. custom_role = "django_nonexistent_role"
  350. new_connection = no_pool_connection()
  351. new_connection.settings_dict["OPTIONS"]["assume_role"] = custom_role
  352. msg = f'role "{custom_role}" does not exist'
  353. with self.assertRaisesMessage(errors.InvalidParameterValue, msg):
  354. new_connection.connect()
  355. finally:
  356. new_connection.close()
  357. @unittest.skipUnless(is_psycopg3, "psycopg3 specific test")
  358. def test_connect_server_side_binding(self):
  359. """
  360. The server-side parameters binding role can be enabled with DATABASES
  361. ["OPTIONS"]["server_side_binding"].
  362. """
  363. from django.db.backends.postgresql.base import ServerBindingCursor
  364. new_connection = no_pool_connection()
  365. new_connection.settings_dict["OPTIONS"]["server_side_binding"] = True
  366. try:
  367. new_connection.connect()
  368. self.assertEqual(
  369. new_connection.connection.cursor_factory,
  370. ServerBindingCursor,
  371. )
  372. finally:
  373. new_connection.close()
  374. def test_connect_custom_cursor_factory(self):
  375. """
  376. A custom cursor factory can be configured with DATABASES["options"]
  377. ["cursor_factory"].
  378. """
  379. from django.db.backends.postgresql.base import Cursor
  380. class MyCursor(Cursor):
  381. pass
  382. new_connection = no_pool_connection()
  383. new_connection.settings_dict["OPTIONS"]["cursor_factory"] = MyCursor
  384. try:
  385. new_connection.connect()
  386. self.assertEqual(new_connection.connection.cursor_factory, MyCursor)
  387. finally:
  388. new_connection.close()
  389. def test_connect_no_is_usable_checks(self):
  390. new_connection = no_pool_connection()
  391. try:
  392. with mock.patch.object(new_connection, "is_usable") as is_usable:
  393. new_connection.connect()
  394. is_usable.assert_not_called()
  395. finally:
  396. new_connection.close()
  397. def test_client_encoding_utf8_enforce(self):
  398. new_connection = no_pool_connection()
  399. new_connection.settings_dict["OPTIONS"]["client_encoding"] = "iso-8859-2"
  400. try:
  401. new_connection.connect()
  402. if is_psycopg3:
  403. self.assertEqual(new_connection.connection.info.encoding, "utf-8")
  404. else:
  405. self.assertEqual(new_connection.connection.encoding, "UTF8")
  406. finally:
  407. new_connection.close()
  408. def _select(self, val):
  409. with connection.cursor() as cursor:
  410. cursor.execute("SELECT %s::text[]", (val,))
  411. return cursor.fetchone()[0]
  412. def test_select_ascii_array(self):
  413. a = ["awef"]
  414. b = self._select(a)
  415. self.assertEqual(a[0], b[0])
  416. def test_select_unicode_array(self):
  417. a = ["ᄲawef"]
  418. b = self._select(a)
  419. self.assertEqual(a[0], b[0])
  420. def test_lookup_cast(self):
  421. from django.db.backends.postgresql.operations import DatabaseOperations
  422. do = DatabaseOperations(connection=None)
  423. lookups = (
  424. "iexact",
  425. "contains",
  426. "icontains",
  427. "startswith",
  428. "istartswith",
  429. "endswith",
  430. "iendswith",
  431. "regex",
  432. "iregex",
  433. )
  434. for lookup in lookups:
  435. with self.subTest(lookup=lookup):
  436. self.assertIn("::text", do.lookup_cast(lookup))
  437. def test_lookup_cast_isnull_noop(self):
  438. from django.db.backends.postgresql.operations import DatabaseOperations
  439. do = DatabaseOperations(connection=None)
  440. # Using __isnull lookup doesn't require casting.
  441. tests = [
  442. "CharField",
  443. "EmailField",
  444. "TextField",
  445. ]
  446. for field_type in tests:
  447. with self.subTest(field_type=field_type):
  448. self.assertEqual(do.lookup_cast("isnull", field_type), "%s")
  449. def test_correct_extraction_psycopg_version(self):
  450. from django.db.backends.postgresql.base import Database, psycopg_version
  451. with mock.patch.object(Database, "__version__", "4.2.1 (dt dec pq3 ext lo64)"):
  452. self.assertEqual(psycopg_version(), (4, 2, 1))
  453. with mock.patch.object(
  454. Database, "__version__", "4.2b0.dev1 (dt dec pq3 ext lo64)"
  455. ):
  456. self.assertEqual(psycopg_version(), (4, 2))
  457. @override_settings(DEBUG=True)
  458. @unittest.skipIf(is_psycopg3, "psycopg2 specific test")
  459. def test_copy_to_expert_cursors(self):
  460. out = StringIO()
  461. copy_expert_sql = "COPY django_session TO STDOUT (FORMAT CSV, HEADER)"
  462. with connection.cursor() as cursor:
  463. cursor.copy_expert(copy_expert_sql, out)
  464. cursor.copy_to(out, "django_session")
  465. self.assertEqual(
  466. [q["sql"] for q in connection.queries],
  467. [copy_expert_sql, "COPY django_session TO STDOUT"],
  468. )
  469. @override_settings(DEBUG=True)
  470. @unittest.skipUnless(is_psycopg3, "psycopg3 specific test")
  471. def test_copy_cursors(self):
  472. copy_sql = "COPY django_session TO STDOUT (FORMAT CSV, HEADER)"
  473. with connection.cursor() as cursor:
  474. with cursor.copy(copy_sql) as copy:
  475. for row in copy:
  476. pass
  477. self.assertEqual([q["sql"] for q in connection.queries], [copy_sql])
  478. def test_get_database_version(self):
  479. new_connection = no_pool_connection()
  480. new_connection.pg_version = 140009
  481. self.assertEqual(new_connection.get_database_version(), (14, 9))
  482. @mock.patch.object(connection, "get_database_version", return_value=(13,))
  483. def test_check_database_version_supported(self, mocked_get_database_version):
  484. msg = "PostgreSQL 14 or later is required (found 13)."
  485. with self.assertRaisesMessage(NotSupportedError, msg):
  486. connection.check_database_version_supported()
  487. self.assertTrue(mocked_get_database_version.called)
  488. def test_compose_sql_when_no_connection(self):
  489. new_connection = no_pool_connection()
  490. try:
  491. self.assertEqual(
  492. new_connection.ops.compose_sql("SELECT %s", ["test"]),
  493. "SELECT 'test'",
  494. )
  495. finally:
  496. new_connection.close()