tests.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. import copy
  2. import unittest
  3. from io import StringIO
  4. from unittest import mock
  5. from django.core.exceptions import ImproperlyConfigured
  6. from django.db import (
  7. DEFAULT_DB_ALIAS,
  8. DatabaseError,
  9. NotSupportedError,
  10. connection,
  11. connections,
  12. )
  13. from django.db.backends.base.base import BaseDatabaseWrapper
  14. from django.test import TestCase, override_settings
  15. try:
  16. from django.db.backends.postgresql.psycopg_any import errors, is_psycopg3
  17. except ImportError:
  18. is_psycopg3 = False
  19. @unittest.skipUnless(connection.vendor == "postgresql", "PostgreSQL tests")
  20. class Tests(TestCase):
  21. databases = {"default", "other"}
  22. def test_nodb_cursor(self):
  23. """
  24. The _nodb_cursor() fallbacks to the default connection database when
  25. access to the 'postgres' database is not granted.
  26. """
  27. orig_connect = BaseDatabaseWrapper.connect
  28. def mocked_connect(self):
  29. if self.settings_dict["NAME"] is None:
  30. raise DatabaseError()
  31. return orig_connect(self)
  32. with connection._nodb_cursor() as cursor:
  33. self.assertIs(cursor.closed, False)
  34. self.assertIsNotNone(cursor.db.connection)
  35. self.assertIsNone(cursor.db.settings_dict["NAME"])
  36. self.assertIs(cursor.closed, True)
  37. self.assertIsNone(cursor.db.connection)
  38. # Now assume the 'postgres' db isn't available
  39. msg = (
  40. "Normally Django will use a connection to the 'postgres' database "
  41. "to avoid running initialization queries against the production "
  42. "database when it's not needed (for example, when running tests). "
  43. "Django was unable to create a connection to the 'postgres' "
  44. "database and will use the first PostgreSQL database instead."
  45. )
  46. with self.assertWarnsMessage(RuntimeWarning, msg):
  47. with mock.patch(
  48. "django.db.backends.base.base.BaseDatabaseWrapper.connect",
  49. side_effect=mocked_connect,
  50. autospec=True,
  51. ):
  52. with mock.patch.object(
  53. connection,
  54. "settings_dict",
  55. {**connection.settings_dict, "NAME": "postgres"},
  56. ):
  57. with connection._nodb_cursor() as cursor:
  58. self.assertIs(cursor.closed, False)
  59. self.assertIsNotNone(cursor.db.connection)
  60. self.assertIs(cursor.closed, True)
  61. self.assertIsNone(cursor.db.connection)
  62. self.assertIsNotNone(cursor.db.settings_dict["NAME"])
  63. self.assertEqual(
  64. cursor.db.settings_dict["NAME"], connections["other"].settings_dict["NAME"]
  65. )
  66. # Cursor is yielded only for the first PostgreSQL database.
  67. with self.assertWarnsMessage(RuntimeWarning, msg):
  68. with mock.patch(
  69. "django.db.backends.base.base.BaseDatabaseWrapper.connect",
  70. side_effect=mocked_connect,
  71. autospec=True,
  72. ):
  73. with connection._nodb_cursor() as cursor:
  74. self.assertIs(cursor.closed, False)
  75. self.assertIsNotNone(cursor.db.connection)
  76. def test_nodb_cursor_raises_postgres_authentication_failure(self):
  77. """
  78. _nodb_cursor() re-raises authentication failure to the 'postgres' db
  79. when other connection to the PostgreSQL database isn't available.
  80. """
  81. def mocked_connect(self):
  82. raise DatabaseError()
  83. def mocked_all(self):
  84. test_connection = copy.copy(connections[DEFAULT_DB_ALIAS])
  85. test_connection.settings_dict = copy.deepcopy(connection.settings_dict)
  86. test_connection.settings_dict["NAME"] = "postgres"
  87. return [test_connection]
  88. msg = (
  89. "Normally Django will use a connection to the 'postgres' database "
  90. "to avoid running initialization queries against the production "
  91. "database when it's not needed (for example, when running tests). "
  92. "Django was unable to create a connection to the 'postgres' "
  93. "database and will use the first PostgreSQL database instead."
  94. )
  95. with self.assertWarnsMessage(RuntimeWarning, msg):
  96. mocker_connections_all = mock.patch(
  97. "django.utils.connection.BaseConnectionHandler.all",
  98. side_effect=mocked_all,
  99. autospec=True,
  100. )
  101. mocker_connect = mock.patch(
  102. "django.db.backends.base.base.BaseDatabaseWrapper.connect",
  103. side_effect=mocked_connect,
  104. autospec=True,
  105. )
  106. with mocker_connections_all, mocker_connect:
  107. with self.assertRaises(DatabaseError):
  108. with connection._nodb_cursor():
  109. pass
  110. def test_nodb_cursor_reraise_exceptions(self):
  111. with self.assertRaisesMessage(DatabaseError, "exception"):
  112. with connection._nodb_cursor():
  113. raise DatabaseError("exception")
  114. def test_database_name_too_long(self):
  115. from django.db.backends.postgresql.base import DatabaseWrapper
  116. settings = connection.settings_dict.copy()
  117. max_name_length = connection.ops.max_name_length()
  118. settings["NAME"] = "a" + (max_name_length * "a")
  119. msg = (
  120. "The database name '%s' (%d characters) is longer than "
  121. "PostgreSQL's limit of %s characters. Supply a shorter NAME in "
  122. "settings.DATABASES."
  123. ) % (settings["NAME"], max_name_length + 1, max_name_length)
  124. with self.assertRaisesMessage(ImproperlyConfigured, msg):
  125. DatabaseWrapper(settings).get_connection_params()
  126. def test_database_name_empty(self):
  127. from django.db.backends.postgresql.base import DatabaseWrapper
  128. settings = connection.settings_dict.copy()
  129. settings["NAME"] = ""
  130. msg = (
  131. "settings.DATABASES is improperly configured. Please supply the "
  132. "NAME or OPTIONS['service'] value."
  133. )
  134. with self.assertRaisesMessage(ImproperlyConfigured, msg):
  135. DatabaseWrapper(settings).get_connection_params()
  136. def test_service_name(self):
  137. from django.db.backends.postgresql.base import DatabaseWrapper
  138. settings = connection.settings_dict.copy()
  139. settings["OPTIONS"] = {"service": "my_service"}
  140. settings["NAME"] = ""
  141. params = DatabaseWrapper(settings).get_connection_params()
  142. self.assertEqual(params["service"], "my_service")
  143. self.assertNotIn("database", params)
  144. def test_service_name_default_db(self):
  145. # None is used to connect to the default 'postgres' db.
  146. from django.db.backends.postgresql.base import DatabaseWrapper
  147. settings = connection.settings_dict.copy()
  148. settings["NAME"] = None
  149. settings["OPTIONS"] = {"service": "django_test"}
  150. params = DatabaseWrapper(settings).get_connection_params()
  151. self.assertEqual(params["dbname"], "postgres")
  152. self.assertNotIn("service", params)
  153. def test_connect_and_rollback(self):
  154. """
  155. PostgreSQL shouldn't roll back SET TIME ZONE, even if the first
  156. transaction is rolled back (#17062).
  157. """
  158. new_connection = connection.copy()
  159. try:
  160. # Ensure the database default time zone is different than
  161. # the time zone in new_connection.settings_dict. We can
  162. # get the default time zone by reset & show.
  163. with new_connection.cursor() as cursor:
  164. cursor.execute("RESET TIMEZONE")
  165. cursor.execute("SHOW TIMEZONE")
  166. db_default_tz = cursor.fetchone()[0]
  167. new_tz = "Europe/Paris" if db_default_tz == "UTC" else "UTC"
  168. new_connection.close()
  169. # Invalidate timezone name cache, because the setting_changed
  170. # handler cannot know about new_connection.
  171. del new_connection.timezone_name
  172. # Fetch a new connection with the new_tz as default
  173. # time zone, run a query and rollback.
  174. with self.settings(TIME_ZONE=new_tz):
  175. new_connection.set_autocommit(False)
  176. new_connection.rollback()
  177. # Now let's see if the rollback rolled back the SET TIME ZONE.
  178. with new_connection.cursor() as cursor:
  179. cursor.execute("SHOW TIMEZONE")
  180. tz = cursor.fetchone()[0]
  181. self.assertEqual(new_tz, tz)
  182. finally:
  183. new_connection.close()
  184. def test_connect_non_autocommit(self):
  185. """
  186. The connection wrapper shouldn't believe that autocommit is enabled
  187. after setting the time zone when AUTOCOMMIT is False (#21452).
  188. """
  189. new_connection = connection.copy()
  190. new_connection.settings_dict["AUTOCOMMIT"] = False
  191. try:
  192. # Open a database connection.
  193. with new_connection.cursor():
  194. self.assertFalse(new_connection.get_autocommit())
  195. finally:
  196. new_connection.close()
  197. def test_connect_isolation_level(self):
  198. """
  199. The transaction level can be configured with
  200. DATABASES ['OPTIONS']['isolation_level'].
  201. """
  202. from django.db.backends.postgresql.psycopg_any import IsolationLevel
  203. # Since this is a django.test.TestCase, a transaction is in progress
  204. # and the isolation level isn't reported as 0. This test assumes that
  205. # PostgreSQL is configured with the default isolation level.
  206. # Check the level on the psycopg connection, not the Django wrapper.
  207. self.assertIsNone(connection.connection.isolation_level)
  208. new_connection = connection.copy()
  209. new_connection.settings_dict["OPTIONS"][
  210. "isolation_level"
  211. ] = IsolationLevel.SERIALIZABLE
  212. try:
  213. # Start a transaction so the isolation level isn't reported as 0.
  214. new_connection.set_autocommit(False)
  215. # Check the level on the psycopg connection, not the Django wrapper.
  216. self.assertEqual(
  217. new_connection.connection.isolation_level,
  218. IsolationLevel.SERIALIZABLE,
  219. )
  220. finally:
  221. new_connection.close()
  222. def test_connect_invalid_isolation_level(self):
  223. self.assertIsNone(connection.connection.isolation_level)
  224. new_connection = connection.copy()
  225. new_connection.settings_dict["OPTIONS"]["isolation_level"] = -1
  226. msg = (
  227. "Invalid transaction isolation level -1 specified. Use one of the "
  228. "psycopg.IsolationLevel values."
  229. )
  230. with self.assertRaisesMessage(ImproperlyConfigured, msg):
  231. new_connection.ensure_connection()
  232. def test_connect_role(self):
  233. """
  234. The session role can be configured with DATABASES
  235. ["OPTIONS"]["assume_role"].
  236. """
  237. try:
  238. custom_role = "django_nonexistent_role"
  239. new_connection = connection.copy()
  240. new_connection.settings_dict["OPTIONS"]["assume_role"] = custom_role
  241. msg = f'role "{custom_role}" does not exist'
  242. with self.assertRaisesMessage(errors.InvalidParameterValue, msg):
  243. new_connection.connect()
  244. finally:
  245. new_connection.close()
  246. @unittest.skipUnless(is_psycopg3, "psycopg3 specific test")
  247. def test_connect_server_side_binding(self):
  248. """
  249. The server-side parameters binding role can be enabled with DATABASES
  250. ["OPTIONS"]["server_side_binding"].
  251. """
  252. from django.db.backends.postgresql.base import ServerBindingCursor
  253. new_connection = connection.copy()
  254. new_connection.settings_dict["OPTIONS"]["server_side_binding"] = True
  255. try:
  256. new_connection.connect()
  257. self.assertEqual(
  258. new_connection.connection.cursor_factory,
  259. ServerBindingCursor,
  260. )
  261. finally:
  262. new_connection.close()
  263. def test_connect_custom_cursor_factory(self):
  264. """
  265. A custom cursor factory can be configured with DATABASES["options"]
  266. ["cursor_factory"].
  267. """
  268. from django.db.backends.postgresql.base import Cursor
  269. class MyCursor(Cursor):
  270. pass
  271. new_connection = connection.copy()
  272. new_connection.settings_dict["OPTIONS"]["cursor_factory"] = MyCursor
  273. try:
  274. new_connection.connect()
  275. self.assertEqual(new_connection.connection.cursor_factory, MyCursor)
  276. finally:
  277. new_connection.close()
  278. def test_connect_no_is_usable_checks(self):
  279. new_connection = connection.copy()
  280. try:
  281. with mock.patch.object(new_connection, "is_usable") as is_usable:
  282. new_connection.connect()
  283. is_usable.assert_not_called()
  284. finally:
  285. new_connection.close()
  286. def test_client_encoding_utf8_enforce(self):
  287. new_connection = connection.copy()
  288. new_connection.settings_dict["OPTIONS"]["client_encoding"] = "iso-8859-2"
  289. try:
  290. new_connection.connect()
  291. if is_psycopg3:
  292. self.assertEqual(new_connection.connection.info.encoding, "utf-8")
  293. else:
  294. self.assertEqual(new_connection.connection.encoding, "UTF8")
  295. finally:
  296. new_connection.close()
  297. def _select(self, val):
  298. with connection.cursor() as cursor:
  299. cursor.execute("SELECT %s::text[]", (val,))
  300. return cursor.fetchone()[0]
  301. def test_select_ascii_array(self):
  302. a = ["awef"]
  303. b = self._select(a)
  304. self.assertEqual(a[0], b[0])
  305. def test_select_unicode_array(self):
  306. a = ["ᄲawef"]
  307. b = self._select(a)
  308. self.assertEqual(a[0], b[0])
  309. def test_lookup_cast(self):
  310. from django.db.backends.postgresql.operations import DatabaseOperations
  311. do = DatabaseOperations(connection=None)
  312. lookups = (
  313. "iexact",
  314. "contains",
  315. "icontains",
  316. "startswith",
  317. "istartswith",
  318. "endswith",
  319. "iendswith",
  320. "regex",
  321. "iregex",
  322. )
  323. for lookup in lookups:
  324. with self.subTest(lookup=lookup):
  325. self.assertIn("::text", do.lookup_cast(lookup))
  326. def test_correct_extraction_psycopg_version(self):
  327. from django.db.backends.postgresql.base import Database, psycopg_version
  328. with mock.patch.object(Database, "__version__", "4.2.1 (dt dec pq3 ext lo64)"):
  329. self.assertEqual(psycopg_version(), (4, 2, 1))
  330. with mock.patch.object(
  331. Database, "__version__", "4.2b0.dev1 (dt dec pq3 ext lo64)"
  332. ):
  333. self.assertEqual(psycopg_version(), (4, 2))
  334. @override_settings(DEBUG=True)
  335. @unittest.skipIf(is_psycopg3, "psycopg2 specific test")
  336. def test_copy_to_expert_cursors(self):
  337. out = StringIO()
  338. copy_expert_sql = "COPY django_session TO STDOUT (FORMAT CSV, HEADER)"
  339. with connection.cursor() as cursor:
  340. cursor.copy_expert(copy_expert_sql, out)
  341. cursor.copy_to(out, "django_session")
  342. self.assertEqual(
  343. [q["sql"] for q in connection.queries],
  344. [copy_expert_sql, "COPY django_session TO STDOUT"],
  345. )
  346. @override_settings(DEBUG=True)
  347. @unittest.skipUnless(is_psycopg3, "psycopg3 specific test")
  348. def test_copy_cursors(self):
  349. copy_sql = "COPY django_session TO STDOUT (FORMAT CSV, HEADER)"
  350. with connection.cursor() as cursor:
  351. with cursor.copy(copy_sql) as copy:
  352. for row in copy:
  353. pass
  354. self.assertEqual([q["sql"] for q in connection.queries], [copy_sql])
  355. def test_get_database_version(self):
  356. new_connection = connection.copy()
  357. new_connection.pg_version = 110009
  358. self.assertEqual(new_connection.get_database_version(), (11, 9))
  359. @mock.patch.object(connection, "get_database_version", return_value=(11,))
  360. def test_check_database_version_supported(self, mocked_get_database_version):
  361. msg = "PostgreSQL 12 or later is required (found 11)."
  362. with self.assertRaisesMessage(NotSupportedError, msg):
  363. connection.check_database_version_supported()
  364. self.assertTrue(mocked_get_database_version.called)
  365. def test_compose_sql_when_no_connection(self):
  366. new_connection = connection.copy()
  367. try:
  368. self.assertEqual(
  369. new_connection.ops.compose_sql("SELECT %s", ["test"]),
  370. "SELECT 'test'",
  371. )
  372. finally:
  373. new_connection.close()