test_base.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. from unittest.mock import MagicMock, patch
  2. from django.db import DEFAULT_DB_ALIAS, connection, connections
  3. from django.db.backends.base.base import BaseDatabaseWrapper
  4. from django.test import SimpleTestCase, TestCase, skipUnlessDBFeature
  5. from ..models import Square
  6. class DatabaseWrapperTests(SimpleTestCase):
  7. def test_repr(self):
  8. conn = connections[DEFAULT_DB_ALIAS]
  9. self.assertEqual(
  10. repr(conn),
  11. f"<DatabaseWrapper vendor={connection.vendor!r} alias='default'>",
  12. )
  13. def test_initialization_class_attributes(self):
  14. """
  15. The "initialization" class attributes like client_class and
  16. creation_class should be set on the class and reflected in the
  17. corresponding instance attributes of the instantiated backend.
  18. """
  19. conn = connections[DEFAULT_DB_ALIAS]
  20. conn_class = type(conn)
  21. attr_names = [
  22. ('client_class', 'client'),
  23. ('creation_class', 'creation'),
  24. ('features_class', 'features'),
  25. ('introspection_class', 'introspection'),
  26. ('ops_class', 'ops'),
  27. ('validation_class', 'validation'),
  28. ]
  29. for class_attr_name, instance_attr_name in attr_names:
  30. class_attr_value = getattr(conn_class, class_attr_name)
  31. self.assertIsNotNone(class_attr_value)
  32. instance_attr_value = getattr(conn, instance_attr_name)
  33. self.assertIsInstance(instance_attr_value, class_attr_value)
  34. def test_initialization_display_name(self):
  35. self.assertEqual(BaseDatabaseWrapper.display_name, 'unknown')
  36. self.assertNotEqual(connection.display_name, 'unknown')
  37. class ExecuteWrapperTests(TestCase):
  38. @staticmethod
  39. def call_execute(connection, params=None):
  40. ret_val = '1' if params is None else '%s'
  41. sql = 'SELECT ' + ret_val + connection.features.bare_select_suffix
  42. with connection.cursor() as cursor:
  43. cursor.execute(sql, params)
  44. def call_executemany(self, connection, params=None):
  45. # executemany() must use an update query. Make sure it does nothing
  46. # by putting a false condition in the WHERE clause.
  47. sql = 'DELETE FROM {} WHERE 0=1 AND 0=%s'.format(Square._meta.db_table)
  48. if params is None:
  49. params = [(i,) for i in range(3)]
  50. with connection.cursor() as cursor:
  51. cursor.executemany(sql, params)
  52. @staticmethod
  53. def mock_wrapper():
  54. return MagicMock(side_effect=lambda execute, *args: execute(*args))
  55. def test_wrapper_invoked(self):
  56. wrapper = self.mock_wrapper()
  57. with connection.execute_wrapper(wrapper):
  58. self.call_execute(connection)
  59. self.assertTrue(wrapper.called)
  60. (_, sql, params, many, context), _ = wrapper.call_args
  61. self.assertIn('SELECT', sql)
  62. self.assertIsNone(params)
  63. self.assertIs(many, False)
  64. self.assertEqual(context['connection'], connection)
  65. def test_wrapper_invoked_many(self):
  66. wrapper = self.mock_wrapper()
  67. with connection.execute_wrapper(wrapper):
  68. self.call_executemany(connection)
  69. self.assertTrue(wrapper.called)
  70. (_, sql, param_list, many, context), _ = wrapper.call_args
  71. self.assertIn('DELETE', sql)
  72. self.assertIsInstance(param_list, (list, tuple))
  73. self.assertIs(many, True)
  74. self.assertEqual(context['connection'], connection)
  75. def test_database_queried(self):
  76. wrapper = self.mock_wrapper()
  77. with connection.execute_wrapper(wrapper):
  78. with connection.cursor() as cursor:
  79. sql = 'SELECT 17' + connection.features.bare_select_suffix
  80. cursor.execute(sql)
  81. seventeen = cursor.fetchall()
  82. self.assertEqual(list(seventeen), [(17,)])
  83. self.call_executemany(connection)
  84. def test_nested_wrapper_invoked(self):
  85. outer_wrapper = self.mock_wrapper()
  86. inner_wrapper = self.mock_wrapper()
  87. with connection.execute_wrapper(outer_wrapper), connection.execute_wrapper(inner_wrapper):
  88. self.call_execute(connection)
  89. self.assertEqual(inner_wrapper.call_count, 1)
  90. self.call_executemany(connection)
  91. self.assertEqual(inner_wrapper.call_count, 2)
  92. def test_outer_wrapper_blocks(self):
  93. def blocker(*args):
  94. pass
  95. wrapper = self.mock_wrapper()
  96. c = connection # This alias shortens the next line.
  97. with c.execute_wrapper(wrapper), c.execute_wrapper(blocker), c.execute_wrapper(wrapper):
  98. with c.cursor() as cursor:
  99. cursor.execute("The database never sees this")
  100. self.assertEqual(wrapper.call_count, 1)
  101. cursor.executemany("The database never sees this %s", [("either",)])
  102. self.assertEqual(wrapper.call_count, 2)
  103. def test_wrapper_gets_sql(self):
  104. wrapper = self.mock_wrapper()
  105. sql = "SELECT 'aloha'" + connection.features.bare_select_suffix
  106. with connection.execute_wrapper(wrapper), connection.cursor() as cursor:
  107. cursor.execute(sql)
  108. (_, reported_sql, _, _, _), _ = wrapper.call_args
  109. self.assertEqual(reported_sql, sql)
  110. def test_wrapper_connection_specific(self):
  111. wrapper = self.mock_wrapper()
  112. with connections['other'].execute_wrapper(wrapper):
  113. self.assertEqual(connections['other'].execute_wrappers, [wrapper])
  114. self.call_execute(connection)
  115. self.assertFalse(wrapper.called)
  116. self.assertEqual(connection.execute_wrappers, [])
  117. self.assertEqual(connections['other'].execute_wrappers, [])
  118. class ConnectionHealthChecksTests(SimpleTestCase):
  119. databases = {'default'}
  120. def setUp(self):
  121. # All test cases here need newly configured and created connections.
  122. # Use the default db connection for convenience.
  123. connection.close()
  124. self.addCleanup(connection.close)
  125. def patch_settings_dict(self, conn_health_checks):
  126. self.settings_dict_patcher = patch.dict(connection.settings_dict, {
  127. **connection.settings_dict,
  128. 'CONN_MAX_AGE': None,
  129. 'CONN_HEALTH_CHECKS': conn_health_checks,
  130. })
  131. self.settings_dict_patcher.start()
  132. self.addCleanup(self.settings_dict_patcher.stop)
  133. def run_query(self):
  134. with connection.cursor() as cursor:
  135. cursor.execute('SELECT 42' + connection.features.bare_select_suffix)
  136. @skipUnlessDBFeature('test_db_allows_multiple_connections')
  137. def test_health_checks_enabled(self):
  138. self.patch_settings_dict(conn_health_checks=True)
  139. self.assertIsNone(connection.connection)
  140. # Newly created connections are considered healthy without performing
  141. # the health check.
  142. with patch.object(connection, 'is_usable', side_effect=AssertionError):
  143. self.run_query()
  144. old_connection = connection.connection
  145. # Simulate request_finished.
  146. connection.close_if_unusable_or_obsolete()
  147. self.assertIs(old_connection, connection.connection)
  148. # Simulate connection health check failing.
  149. with patch.object(connection, 'is_usable', return_value=False) as mocked_is_usable:
  150. self.run_query()
  151. new_connection = connection.connection
  152. # A new connection is established.
  153. self.assertIsNot(new_connection, old_connection)
  154. # Only one health check per "request" is performed, so the next
  155. # query will carry on even if the health check fails. Next query
  156. # succeeds because the real connection is healthy and only the
  157. # health check failure is mocked.
  158. self.run_query()
  159. self.assertIs(new_connection, connection.connection)
  160. self.assertEqual(mocked_is_usable.call_count, 1)
  161. # Simulate request_finished.
  162. connection.close_if_unusable_or_obsolete()
  163. # The underlying connection is being reused further with health checks
  164. # succeeding.
  165. self.run_query()
  166. self.run_query()
  167. self.assertIs(new_connection, connection.connection)
  168. @skipUnlessDBFeature('test_db_allows_multiple_connections')
  169. def test_health_checks_enabled_errors_occurred(self):
  170. self.patch_settings_dict(conn_health_checks=True)
  171. self.assertIsNone(connection.connection)
  172. # Newly created connections are considered healthy without performing
  173. # the health check.
  174. with patch.object(connection, 'is_usable', side_effect=AssertionError):
  175. self.run_query()
  176. old_connection = connection.connection
  177. # Simulate errors_occurred.
  178. connection.errors_occurred = True
  179. # Simulate request_started (the connection is healthy).
  180. connection.close_if_unusable_or_obsolete()
  181. # Persistent connections are enabled.
  182. self.assertIs(old_connection, connection.connection)
  183. # No additional health checks after the one in
  184. # close_if_unusable_or_obsolete() are executed during this "request"
  185. # when running queries.
  186. with patch.object(connection, 'is_usable', side_effect=AssertionError):
  187. self.run_query()
  188. @skipUnlessDBFeature('test_db_allows_multiple_connections')
  189. def test_health_checks_disabled(self):
  190. self.patch_settings_dict(conn_health_checks=False)
  191. self.assertIsNone(connection.connection)
  192. # Newly created connections are considered healthy without performing
  193. # the health check.
  194. with patch.object(connection, 'is_usable', side_effect=AssertionError):
  195. self.run_query()
  196. old_connection = connection.connection
  197. # Simulate request_finished.
  198. connection.close_if_unusable_or_obsolete()
  199. # Persistent connections are enabled (connection is not).
  200. self.assertIs(old_connection, connection.connection)
  201. # Health checks are not performed.
  202. with patch.object(connection, 'is_usable', side_effect=AssertionError):
  203. self.run_query()
  204. # Health check wasn't performed and the connection is unchanged.
  205. self.assertIs(old_connection, connection.connection)
  206. self.run_query()
  207. # The connection is unchanged after the next query either during
  208. # the current "request".
  209. self.assertIs(old_connection, connection.connection)
  210. @skipUnlessDBFeature('test_db_allows_multiple_connections')
  211. def test_set_autocommit_health_checks_enabled(self):
  212. self.patch_settings_dict(conn_health_checks=True)
  213. self.assertIsNone(connection.connection)
  214. # Newly created connections are considered healthy without performing
  215. # the health check.
  216. with patch.object(connection, 'is_usable', side_effect=AssertionError):
  217. # Simulate outermost atomic block: changing autocommit for
  218. # a connection.
  219. connection.set_autocommit(False)
  220. self.run_query()
  221. connection.commit()
  222. connection.set_autocommit(True)
  223. old_connection = connection.connection
  224. # Simulate request_finished.
  225. connection.close_if_unusable_or_obsolete()
  226. # Persistent connections are enabled.
  227. self.assertIs(old_connection, connection.connection)
  228. # Simulate connection health check failing.
  229. with patch.object(connection, 'is_usable', return_value=False) as mocked_is_usable:
  230. # Simulate outermost atomic block: changing autocommit for
  231. # a connection.
  232. connection.set_autocommit(False)
  233. new_connection = connection.connection
  234. self.assertIsNot(new_connection, old_connection)
  235. # Only one health check per "request" is performed, so a query will
  236. # carry on even if the health check fails. This query succeeds
  237. # because the real connection is healthy and only the health check
  238. # failure is mocked.
  239. self.run_query()
  240. connection.commit()
  241. connection.set_autocommit(True)
  242. # The connection is unchanged.
  243. self.assertIs(new_connection, connection.connection)
  244. self.assertEqual(mocked_is_usable.call_count, 1)
  245. # Simulate request_finished.
  246. connection.close_if_unusable_or_obsolete()
  247. # The underlying connection is being reused further with health checks
  248. # succeeding.
  249. connection.set_autocommit(False)
  250. self.run_query()
  251. connection.commit()
  252. connection.set_autocommit(True)
  253. self.assertIs(new_connection, connection.connection)