tests.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. import copy
  2. import unittest
  3. from io import StringIO
  4. from unittest import mock
  5. from django.core.exceptions import ImproperlyConfigured
  6. from django.db import (
  7. DEFAULT_DB_ALIAS,
  8. DatabaseError,
  9. NotSupportedError,
  10. connection,
  11. connections,
  12. )
  13. from django.db.backends.base.base import BaseDatabaseWrapper
  14. from django.test import TestCase, override_settings
  @unittest.skipUnless(connection.vendor == "postgresql", "PostgreSQL tests")
  class Tests(TestCase):
      """
      Tests for the PostgreSQL database backend.

      The whole class is skipped unless the default connection's vendor is
      "postgresql". Both the "default" and "other" aliases are declared
      because test_nodb_cursor() compares against connections["other"].

      NOTE(review): backend-specific names (DatabaseWrapper, psycopg2, ...)
      are imported inside the tests rather than at module level, presumably
      so this module can still be imported when the PostgreSQL driver isn't
      installed -- confirm before moving them.
      """

      databases = {"default", "other"}

      # RuntimeWarning text expected whenever _nodb_cursor() cannot connect to
      # the 'postgres' database. Shared by the _nodb_cursor() fallback tests so
      # the two copies of the message can't drift apart.
      NODB_FALLBACK_WARNING = (
          "Normally Django will use a connection to the 'postgres' database "
          "to avoid running initialization queries against the production "
          "database when it's not needed (for example, when running tests). "
          "Django was unable to create a connection to the 'postgres' "
          "database and will use the first PostgreSQL database instead."
      )

      def test_nodb_cursor(self):
          """
          The _nodb_cursor() falls back to the default connection database
          when access to the 'postgres' database is not granted.
          """
          orig_connect = BaseDatabaseWrapper.connect

          def mocked_connect(self):
              # Simulate "no access to the 'postgres' db": a wrapper whose NAME
              # is None fails to connect, every other wrapper connects normally.
              if self.settings_dict["NAME"] is None:
                  raise DatabaseError()
              return orig_connect(self)

          with connection._nodb_cursor() as cursor:
              self.assertIs(cursor.closed, False)
              self.assertIsNotNone(cursor.db.connection)
              self.assertIsNone(cursor.db.settings_dict["NAME"])
          self.assertIs(cursor.closed, True)
          self.assertIsNone(cursor.db.connection)

          # Now assume the 'postgres' db isn't available
          msg = self.NODB_FALLBACK_WARNING
          with self.assertWarnsMessage(RuntimeWarning, msg):
              with mock.patch(
                  "django.db.backends.base.base.BaseDatabaseWrapper.connect",
                  side_effect=mocked_connect,
                  autospec=True,
              ):
                  with mock.patch.object(
                      connection,
                      "settings_dict",
                      {**connection.settings_dict, "NAME": "postgres"},
                  ):
                      with connection._nodb_cursor() as cursor:
                          self.assertIs(cursor.closed, False)
                          self.assertIsNotNone(cursor.db.connection)
          self.assertIs(cursor.closed, True)
          self.assertIsNone(cursor.db.connection)
          self.assertIsNotNone(cursor.db.settings_dict["NAME"])
          self.assertEqual(
              cursor.db.settings_dict["NAME"],
              connections["other"].settings_dict["NAME"],
          )

          # Cursor is yielded only for the first PostgreSQL database.
          with self.assertWarnsMessage(RuntimeWarning, msg):
              with mock.patch(
                  "django.db.backends.base.base.BaseDatabaseWrapper.connect",
                  side_effect=mocked_connect,
                  autospec=True,
              ):
                  with connection._nodb_cursor() as cursor:
                      self.assertIs(cursor.closed, False)
                      self.assertIsNotNone(cursor.db.connection)

      def test_nodb_cursor_raises_postgres_authentication_failure(self):
          """
          _nodb_cursor() re-raises authentication failure to the 'postgres' db
          when other connection to the PostgreSQL database isn't available.
          """

          def mocked_connect(self):
              # Every connection attempt fails.
              raise DatabaseError()

          def mocked_all(self):
              # The only known connection is one named 'postgres', so there is
              # no other database left to fall back to.
              test_connection = copy.copy(connections[DEFAULT_DB_ALIAS])
              test_connection.settings_dict = copy.deepcopy(connection.settings_dict)
              test_connection.settings_dict["NAME"] = "postgres"
              return [test_connection]

          msg = self.NODB_FALLBACK_WARNING
          with self.assertWarnsMessage(RuntimeWarning, msg):
              mocker_connections_all = mock.patch(
                  "django.utils.connection.BaseConnectionHandler.all",
                  side_effect=mocked_all,
                  autospec=True,
              )
              mocker_connect = mock.patch(
                  "django.db.backends.base.base.BaseDatabaseWrapper.connect",
                  side_effect=mocked_connect,
                  autospec=True,
              )
              with mocker_connections_all, mocker_connect:
                  with self.assertRaises(DatabaseError):
                      with connection._nodb_cursor():
                          pass

      def test_nodb_cursor_reraise_exceptions(self):
          """Exceptions raised inside a _nodb_cursor() block propagate."""
          with self.assertRaisesMessage(DatabaseError, "exception"):
              with connection._nodb_cursor():
                  raise DatabaseError("exception")

      def test_database_name_too_long(self):
          """
          get_connection_params() raises ImproperlyConfigured when NAME is one
          character longer than connection.ops.max_name_length().
          """
          from django.db.backends.postgresql.base import DatabaseWrapper

          settings = connection.settings_dict.copy()
          max_name_length = connection.ops.max_name_length()
          settings["NAME"] = "a" + (max_name_length * "a")
          msg = (
              "The database name '%s' (%d characters) is longer than "
              "PostgreSQL's limit of %s characters. Supply a shorter NAME in "
              "settings.DATABASES."
          ) % (settings["NAME"], max_name_length + 1, max_name_length)
          with self.assertRaisesMessage(ImproperlyConfigured, msg):
              DatabaseWrapper(settings).get_connection_params()

      def test_database_name_empty(self):
          """
          get_connection_params() raises ImproperlyConfigured when NAME is an
          empty string.
          """
          from django.db.backends.postgresql.base import DatabaseWrapper

          settings = connection.settings_dict.copy()
          settings["NAME"] = ""
          msg = (
              "settings.DATABASES is improperly configured. Please supply the "
              "NAME or OPTIONS['service'] value."
          )
          with self.assertRaisesMessage(ImproperlyConfigured, msg):
              DatabaseWrapper(settings).get_connection_params()

      def test_service_name(self):
          """
          With OPTIONS['service'] set and an empty NAME, the connection params
          contain "service" and no "database" key.
          """
          from django.db.backends.postgresql.base import DatabaseWrapper

          settings = connection.settings_dict.copy()
          settings["OPTIONS"] = {"service": "my_service"}
          settings["NAME"] = ""
          params = DatabaseWrapper(settings).get_connection_params()
          self.assertEqual(params["service"], "my_service")
          self.assertNotIn("database", params)

      def test_service_name_default_db(self):
          """
          With NAME=None the connection params target the 'postgres' database
          and omit "service", even though OPTIONS['service'] is set.
          """
          # None is used to connect to the default 'postgres' db.
          from django.db.backends.postgresql.base import DatabaseWrapper

          settings = connection.settings_dict.copy()
          settings["NAME"] = None
          settings["OPTIONS"] = {"service": "django_test"}
          params = DatabaseWrapper(settings).get_connection_params()
          self.assertEqual(params["database"], "postgres")
          self.assertNotIn("service", params)

      def test_connect_and_rollback(self):
          """
          PostgreSQL shouldn't roll back SET TIME ZONE, even if the first
          transaction is rolled back (#17062).
          """
          new_connection = connection.copy()
          try:
              # Ensure the database default time zone is different than
              # the time zone in new_connection.settings_dict. We can
              # get the default time zone by reset & show.
              with new_connection.cursor() as cursor:
                  cursor.execute("RESET TIMEZONE")
                  cursor.execute("SHOW TIMEZONE")
                  db_default_tz = cursor.fetchone()[0]
              new_tz = "Europe/Paris" if db_default_tz == "UTC" else "UTC"
              new_connection.close()

              # Invalidate timezone name cache, because the setting_changed
              # handler cannot know about new_connection.
              del new_connection.timezone_name

              # Fetch a new connection with the new_tz as default
              # time zone, run a query and rollback.
              with self.settings(TIME_ZONE=new_tz):
                  new_connection.set_autocommit(False)
                  new_connection.rollback()

                  # Now let's see if the rollback rolled back the SET TIME ZONE.
                  with new_connection.cursor() as cursor:
                      cursor.execute("SHOW TIMEZONE")
                      tz = cursor.fetchone()[0]
                  self.assertEqual(new_tz, tz)
          finally:
              new_connection.close()

      def test_connect_non_autocommit(self):
          """
          The connection wrapper shouldn't believe that autocommit is enabled
          after setting the time zone when AUTOCOMMIT is False (#21452).
          """
          new_connection = connection.copy()
          new_connection.settings_dict["AUTOCOMMIT"] = False
          try:
              # Open a database connection.
              with new_connection.cursor():
                  self.assertFalse(new_connection.get_autocommit())
          finally:
              new_connection.close()

      def test_connect_isolation_level(self):
          """
          The transaction level can be configured with
          DATABASES ['OPTIONS']['isolation_level'].
          """
          from psycopg2.extensions import ISOLATION_LEVEL_SERIALIZABLE as serializable

          # Since this is a django.test.TestCase, a transaction is in progress
          # and the isolation level isn't reported as 0. This test assumes that
          # PostgreSQL is configured with the default isolation level.
          # Check the level on the psycopg2 connection, not the Django wrapper.
          self.assertIsNone(connection.connection.isolation_level)

          new_connection = connection.copy()
          new_connection.settings_dict["OPTIONS"]["isolation_level"] = serializable
          try:
              # Start a transaction so the isolation level isn't reported as 0.
              new_connection.set_autocommit(False)
              # Check the level on the psycopg2 connection, not the Django wrapper.
              self.assertEqual(new_connection.connection.isolation_level, serializable)
          finally:
              new_connection.close()

      def test_connect_no_is_usable_checks(self):
          """connect() on a fresh connection copy doesn't call is_usable()."""
          new_connection = connection.copy()
          try:
              with mock.patch.object(new_connection, "is_usable") as is_usable:
                  new_connection.connect()
              is_usable.assert_not_called()
          finally:
              new_connection.close()

      def _select(self, val):
          """
          Execute "SELECT %s" with val as the single query parameter and return
          the first column of the first row.
          """
          with connection.cursor() as cursor:
              cursor.execute("SELECT %s", (val,))
              return cursor.fetchone()[0]

      def test_select_ascii_array(self):
          """A list holding an ASCII string survives a SELECT round trip."""
          a = ["awef"]
          b = self._select(a)
          self.assertEqual(a[0], b[0])

      def test_select_unicode_array(self):
          """A list holding a non-ASCII string survives a SELECT round trip."""
          a = ["ᄲawef"]
          b = self._select(a)
          self.assertEqual(a[0], b[0])

      def test_lookup_cast(self):
          """
          lookup_cast() includes a "::text" cast for the text-matching lookups
          and a "::citext" cast when internal_type is one of the CI* fields.
          """
          from django.db.backends.postgresql.operations import DatabaseOperations

          do = DatabaseOperations(connection=None)
          lookups = (
              "iexact",
              "contains",
              "icontains",
              "startswith",
              "istartswith",
              "endswith",
              "iendswith",
              "regex",
              "iregex",
          )
          for lookup in lookups:
              with self.subTest(lookup=lookup):
                  self.assertIn("::text", do.lookup_cast(lookup))
          for lookup in lookups:
              for field_type in ("CICharField", "CIEmailField", "CITextField"):
                  with self.subTest(lookup=lookup, field_type=field_type):
                      self.assertIn(
                          "::citext", do.lookup_cast(lookup, internal_type=field_type)
                      )

      def test_correct_extraction_psycopg2_version(self):
          """
          psycopg2_version() returns the numeric version tuple: "4.2.1 (...)"
          gives (4, 2, 1) and the pre-release "4.2b0.dev1 (...)" gives (4, 2).
          """
          from django.db.backends.postgresql.base import psycopg2_version

          with mock.patch("psycopg2.__version__", "4.2.1 (dt dec pq3 ext lo64)"):
              self.assertEqual(psycopg2_version(), (4, 2, 1))
          with mock.patch("psycopg2.__version__", "4.2b0.dev1 (dt dec pq3 ext lo64)"):
              self.assertEqual(psycopg2_version(), (4, 2))

      @override_settings(DEBUG=True)
      def test_copy_cursors(self):
          """
          With DEBUG=True, copy_expert() and copy_to() calls are recorded in
          connection.queries.
          """
          out = StringIO()
          copy_expert_sql = "COPY django_session TO STDOUT (FORMAT CSV, HEADER)"
          with connection.cursor() as cursor:
              cursor.copy_expert(copy_expert_sql, out)
              cursor.copy_to(out, "django_session")
          self.assertEqual(
              [q["sql"] for q in connection.queries],
              [copy_expert_sql, "COPY django_session TO STDOUT"],
          )

      def test_get_database_version(self):
          """A pg_version of 110009 is reported as the tuple (11, 9)."""
          new_connection = connection.copy()
          new_connection.pg_version = 110009
          self.assertEqual(new_connection.get_database_version(), (11, 9))

      @mock.patch.object(connection, "get_database_version", return_value=(10,))
      def test_check_database_version_supported(self, mocked_get_database_version):
          """
          check_database_version_supported() raises NotSupportedError when the
          (mocked) database version is (10,).
          """
          msg = "PostgreSQL 11 or later is required (found 10)."
          with self.assertRaisesMessage(NotSupportedError, msg):
              connection.check_database_version_supported()
          self.assertTrue(mocked_get_database_version.called)