tests.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. import unittest
  2. from unittest import mock
  3. from django.core.exceptions import ImproperlyConfigured
  4. from django.db import DatabaseError, connection, connections
  5. from django.test import TestCase
  6. @unittest.skipUnless(connection.vendor == 'postgresql', 'PostgreSQL tests')
  7. class Tests(TestCase):
  8. def test_nodb_connection(self):
  9. """
  10. The _nodb_connection property fallbacks to the default connection
  11. database when access to the 'postgres' database is not granted.
  12. """
  13. def mocked_connect(self):
  14. if self.settings_dict['NAME'] is None:
  15. raise DatabaseError()
  16. return ''
  17. nodb_conn = connection._nodb_connection
  18. self.assertIsNone(nodb_conn.settings_dict['NAME'])
  19. # Now assume the 'postgres' db isn't available
  20. msg = (
  21. "Normally Django will use a connection to the 'postgres' database "
  22. "to avoid running initialization queries against the production "
  23. "database when it's not needed (for example, when running tests). "
  24. "Django was unable to create a connection to the 'postgres' "
  25. "database and will use the first PostgreSQL database instead."
  26. )
  27. with self.assertWarnsMessage(RuntimeWarning, msg):
  28. with mock.patch('django.db.backends.base.base.BaseDatabaseWrapper.connect',
  29. side_effect=mocked_connect, autospec=True):
  30. with mock.patch.object(
  31. connection,
  32. 'settings_dict',
  33. {**connection.settings_dict, 'NAME': 'postgres'},
  34. ):
  35. nodb_conn = connection._nodb_connection
  36. self.assertIsNotNone(nodb_conn.settings_dict['NAME'])
  37. self.assertEqual(nodb_conn.settings_dict['NAME'], connections['other'].settings_dict['NAME'])
  38. def test_database_name_too_long(self):
  39. from django.db.backends.postgresql.base import DatabaseWrapper
  40. settings = connection.settings_dict.copy()
  41. max_name_length = connection.ops.max_name_length()
  42. settings['NAME'] = 'a' + (max_name_length * 'a')
  43. msg = (
  44. 'Database names longer than %d characters are not supported by '
  45. 'PostgreSQL. Supply a shorter NAME in settings.DATABASES.'
  46. ) % max_name_length
  47. with self.assertRaisesMessage(ImproperlyConfigured, msg):
  48. DatabaseWrapper(settings).get_connection_params()
  49. def test_connect_and_rollback(self):
  50. """
  51. PostgreSQL shouldn't roll back SET TIME ZONE, even if the first
  52. transaction is rolled back (#17062).
  53. """
  54. new_connection = connection.copy()
  55. try:
  56. # Ensure the database default time zone is different than
  57. # the time zone in new_connection.settings_dict. We can
  58. # get the default time zone by reset & show.
  59. with new_connection.cursor() as cursor:
  60. cursor.execute("RESET TIMEZONE")
  61. cursor.execute("SHOW TIMEZONE")
  62. db_default_tz = cursor.fetchone()[0]
  63. new_tz = 'Europe/Paris' if db_default_tz == 'UTC' else 'UTC'
  64. new_connection.close()
  65. # Invalidate timezone name cache, because the setting_changed
  66. # handler cannot know about new_connection.
  67. del new_connection.timezone_name
  68. # Fetch a new connection with the new_tz as default
  69. # time zone, run a query and rollback.
  70. with self.settings(TIME_ZONE=new_tz):
  71. new_connection.set_autocommit(False)
  72. new_connection.rollback()
  73. # Now let's see if the rollback rolled back the SET TIME ZONE.
  74. with new_connection.cursor() as cursor:
  75. cursor.execute("SHOW TIMEZONE")
  76. tz = cursor.fetchone()[0]
  77. self.assertEqual(new_tz, tz)
  78. finally:
  79. new_connection.close()
  80. def test_connect_non_autocommit(self):
  81. """
  82. The connection wrapper shouldn't believe that autocommit is enabled
  83. after setting the time zone when AUTOCOMMIT is False (#21452).
  84. """
  85. new_connection = connection.copy()
  86. new_connection.settings_dict['AUTOCOMMIT'] = False
  87. try:
  88. # Open a database connection.
  89. new_connection.cursor()
  90. self.assertFalse(new_connection.get_autocommit())
  91. finally:
  92. new_connection.close()
  93. def test_connect_isolation_level(self):
  94. """
  95. The transaction level can be configured with
  96. DATABASES ['OPTIONS']['isolation_level'].
  97. """
  98. import psycopg2
  99. from psycopg2.extensions import (
  100. ISOLATION_LEVEL_READ_COMMITTED as read_committed,
  101. ISOLATION_LEVEL_SERIALIZABLE as serializable,
  102. )
  103. # Since this is a django.test.TestCase, a transaction is in progress
  104. # and the isolation level isn't reported as 0. This test assumes that
  105. # PostgreSQL is configured with the default isolation level.
  106. # Check the level on the psycopg2 connection, not the Django wrapper.
  107. default_level = read_committed if psycopg2.__version__ < '2.7' else None
  108. self.assertEqual(connection.connection.isolation_level, default_level)
  109. new_connection = connection.copy()
  110. new_connection.settings_dict['OPTIONS']['isolation_level'] = serializable
  111. try:
  112. # Start a transaction so the isolation level isn't reported as 0.
  113. new_connection.set_autocommit(False)
  114. # Check the level on the psycopg2 connection, not the Django wrapper.
  115. self.assertEqual(new_connection.connection.isolation_level, serializable)
  116. finally:
  117. new_connection.close()
  118. def _select(self, val):
  119. with connection.cursor() as cursor:
  120. cursor.execute('SELECT %s', (val,))
  121. return cursor.fetchone()[0]
  122. def test_select_ascii_array(self):
  123. a = ['awef']
  124. b = self._select(a)
  125. self.assertEqual(a[0], b[0])
  126. def test_select_unicode_array(self):
  127. a = ['ᄲawef']
  128. b = self._select(a)
  129. self.assertEqual(a[0], b[0])
  130. def test_lookup_cast(self):
  131. from django.db.backends.postgresql.operations import DatabaseOperations
  132. do = DatabaseOperations(connection=None)
  133. lookups = (
  134. 'iexact', 'contains', 'icontains', 'startswith', 'istartswith',
  135. 'endswith', 'iendswith', 'regex', 'iregex',
  136. )
  137. for lookup in lookups:
  138. with self.subTest(lookup=lookup):
  139. self.assertIn('::text', do.lookup_cast(lookup))
  140. for lookup in lookups:
  141. for field_type in ('CICharField', 'CIEmailField', 'CITextField'):
  142. with self.subTest(lookup=lookup, field_type=field_type):
  143. self.assertIn('::citext', do.lookup_cast(lookup, internal_type=field_type))
  144. def test_correct_extraction_psycopg2_version(self):
  145. from django.db.backends.postgresql.base import psycopg2_version
  146. with mock.patch('psycopg2.__version__', '4.2.1 (dt dec pq3 ext lo64)'):
  147. self.assertEqual(psycopg2_version(), (4, 2, 1))
  148. with mock.patch('psycopg2.__version__', '4.2b0.dev1 (dt dec pq3 ext lo64)'):
  149. self.assertEqual(psycopg2_version(), (4, 2))