瀏覽代碼

Refs #25406 -- Removed exception hiding in MySQL test database creation during --keepdb.

Thanks Adam Johnson, Simon Charette and Tim Graham for reviews.
Mariusz Felisiak 8 年之前
父節點
當前提交
e1253bc26f
共有 2 個文件被更改,包括 66 次插入7 次删除
  1. 28 7
      django/db/backends/mysql/creation.py
  2. 38 0
      tests/backends/test_creation.py

+ 28 - 7
django/db/backends/mysql/creation.py

@@ -17,24 +17,45 @@ class DatabaseCreation(BaseDatabaseCreation):
             suffix.append('COLLATE %s' % test_settings['COLLATION'])
         return ' '.join(suffix)
 
+    def _execute_create_test_db(self, cursor, parameters, keepdb=False):
+        try:
+            if keepdb:
+                # If the database should be kept, add "IF NOT EXISTS" to avoid
+                # "database exists" error, also temporarily disable "database
+                # exists" warning.
+                cursor.execute('''
+                    SET @_tmp_sql_notes := @@sql_notes, sql_notes = 0;
+                    CREATE DATABASE IF NOT EXISTS %(dbname)s %(suffix)s;
+                    SET sql_notes = @_tmp_sql_notes;
+                ''' % parameters)
+            else:
+                super()._execute_create_test_db(cursor, parameters, keepdb)
+        except Exception as e:
+            if len(e.args) < 1 or e.args[0] != 1007:
+                # All errors except "database exists" (1007) cancel tests.
+                sys.stderr.write('Got an error creating the test database: %s\n' % e)
+                sys.exit(2)
+            else:
+                raise e
+
     def _clone_test_db(self, number, verbosity, keepdb=False):
-        qn = self.connection.ops.quote_name
         source_database_name = self.connection.settings_dict['NAME']
         target_database_name = self.get_test_db_clone_settings(number)['NAME']
-
+        test_db_params = {
+            'dbname': self.connection.ops.quote_name(target_database_name),
+            'suffix': self.sql_table_creation_suffix(),
+        }
         with self._nodb_connection.cursor() as cursor:
             try:
-                cursor.execute("CREATE DATABASE %s" % qn(target_database_name))
+                self._execute_create_test_db(cursor, test_db_params, keepdb)
             except Exception:
-                if keepdb:
-                    return
                 try:
                     if verbosity >= 1:
                         print("Destroying old test database for alias %s..." % (
                             self._get_database_display_str(verbosity, target_database_name),
                         ))
-                    cursor.execute("DROP DATABASE %s" % qn(target_database_name))
-                    cursor.execute("CREATE DATABASE %s" % qn(target_database_name))
+                    cursor.execute('DROP DATABASE %(dbname)s' % test_db_params)
+                    self._execute_create_test_db(cursor, test_db_params, keepdb)
                 except Exception as e:
                     sys.stderr.write("Got an error recreating the test database: %s\n" % e)
                     sys.exit(2)

+ 38 - 0
tests/backends/test_creation.py

@@ -8,6 +8,8 @@ from django.db import DEFAULT_DB_ALIAS, connection, connections
 from django.db.backends.base.creation import (
     TEST_DATABASE_PREFIX, BaseDatabaseCreation,
 )
+from django.db.backends.mysql.creation import \
+    DatabaseCreation as MySQLDatabaseCreation
 from django.db.backends.oracle.creation import \
     DatabaseCreation as OracleDatabaseCreation
 from django.db.backends.postgresql.creation import \
@@ -191,3 +193,39 @@ class OracleDatabaseCreationTests(TestCase):
                     creation._create_test_db(verbosity=0, keepdb=False)
                 with self.assertRaises(SystemExit):
                     creation._create_test_db(verbosity=0, keepdb=True)
+
+
+@unittest.skipUnless(connection.vendor == 'mysql', "MySQL specific tests")
+class MySQLDatabaseCreationTests(SimpleTestCase):
+
+    def _execute_raise_database_exists(self, cursor, parameters, keepdb=False):
+        raise DatabaseError(1007, "Can't create database '%s'; database exists" % parameters['dbname'])
+
+    def _execute_raise_access_denied(self, cursor, parameters, keepdb=False):
+        raise DatabaseError(1044, "Access denied for user")
+
+    def patch_test_db_creation(self, execute_create_test_db):
+        return mock.patch.object(BaseDatabaseCreation, '_execute_create_test_db', execute_create_test_db)
+
+    @mock.patch('sys.stdout', new_callable=StringIO)
+    @mock.patch('sys.stderr', new_callable=StringIO)
+    def test_create_test_db_database_exists(self, *mocked_objects):
+        # Simulate test database creation raising "database exists"
+        creation = MySQLDatabaseCreation(connection)
+        with self.patch_test_db_creation(self._execute_raise_database_exists):
+            with mock.patch('builtins.input', return_value='no'):
+                with self.assertRaises(SystemExit):
+                    # SystemExit is raised if the user answers "no" to the
+                    # prompt asking if it's okay to delete the test database.
+                    creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False)
+            # "Database exists" shouldn't appear when keepdb is on
+            creation._create_test_db(verbosity=0, autoclobber=False, keepdb=True)
+
+    @mock.patch('sys.stdout', new_callable=StringIO)
+    @mock.patch('sys.stderr', new_callable=StringIO)
+    def test_create_test_db_unexpected_error(self, *mocked_objects):
+        # Simulate test database creation raising unexpected error
+        creation = MySQLDatabaseCreation(connection)
+        with self.patch_test_db_creation(self._execute_raise_access_denied):
+            with self.assertRaises(SystemExit):
+                creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False)