123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244 |
- import copy
- import unittest
- from contextlib import contextmanager
- from io import StringIO
- from unittest import mock
- from django.db import DEFAULT_DB_ALIAS, connection, connections
- from django.db.backends.base.creation import (
- TEST_DATABASE_PREFIX, BaseDatabaseCreation,
- )
- from django.db.backends.mysql.creation import (
- DatabaseCreation as MySQLDatabaseCreation,
- )
- from django.db.backends.oracle.creation import (
- DatabaseCreation as OracleDatabaseCreation,
- )
- from django.db.utils import DatabaseError
- from django.test import SimpleTestCase, TestCase
- try:
- import psycopg2 # NOQA
- except ImportError:
- pass
- else:
- from psycopg2 import errorcodes
- from django.db.backends.postgresql.creation import \
- DatabaseCreation as PostgreSQLDatabaseCreation
class TestDbSignatureTests(SimpleTestCase):
    """Check the database name reported by ``test_db_signature()``.

    Every test works on a private copy of the default connection and compares
    ``signature[3]`` of ``BaseDatabaseCreation.test_db_signature()`` with the
    expected test database name.
    """

    def get_connection_copy(self):
        """Return a copy of the default connection whose settings can be edited freely."""
        # Get a copy of the default connection. (Can't use django.db.connection
        # because it'll modify the default connection itself.)
        default_connection = connections[DEFAULT_DB_ALIAS]
        test_connection = copy.copy(default_connection)
        # Deep-copy the settings: a shallow copy would still share nested
        # mutable values (such as the 'TEST' dict) with the live connection.
        test_connection.settings_dict = copy.deepcopy(default_connection.settings_dict)
        return test_connection

    def test_default_name(self):
        # A test db name isn't set.
        prod_name = 'hodor'
        test_connection = self.get_connection_copy()
        test_connection.settings_dict['NAME'] = prod_name
        test_connection.settings_dict['TEST'] = {'NAME': None}
        signature = BaseDatabaseCreation(test_connection).test_db_signature()
        self.assertEqual(signature[3], TEST_DATABASE_PREFIX + prod_name)

    def test_custom_test_name(self):
        # A regular test db name is set.
        test_name = 'hodor'
        test_connection = self.get_connection_copy()
        test_connection.settings_dict['TEST'] = {'NAME': test_name}
        signature = BaseDatabaseCreation(test_connection).test_db_signature()
        self.assertEqual(signature[3], test_name)

    def test_custom_test_name_with_test_prefix(self):
        # A test db name prefixed with TEST_DATABASE_PREFIX is set.
        test_name = TEST_DATABASE_PREFIX + 'hodor'
        test_connection = self.get_connection_copy()
        test_connection.settings_dict['TEST'] = {'NAME': test_name}
        signature = BaseDatabaseCreation(test_connection).test_db_signature()
        self.assertEqual(signature[3], test_name)
@unittest.skipUnless(connection.vendor == 'postgresql', "PostgreSQL-specific tests")
class PostgreSQLDatabaseCreationTests(SimpleTestCase):
    """PostgreSQL-only tests for the creation suffix and ``_create_test_db()`` error handling."""

    @contextmanager
    def changed_test_settings(self, **kwargs):
        """Temporarily override entries of ``connection.settings_dict['TEST']``.

        On exit, keys that existed before get their previous value back and
        keys that did not exist are removed again.
        """
        test_settings = connection.settings_dict['TEST']
        # Only values that are about to be overwritten need to be remembered.
        previous = {key: test_settings[key] for key in kwargs if key in test_settings}
        test_settings.update(kwargs)
        try:
            yield
        finally:
            for key in kwargs:
                if key in previous:
                    test_settings[key] = previous[key]
                else:
                    del test_settings[key]

    def check_sql_table_creation_suffix(self, settings, expected):
        """Assert that ``sql_table_creation_suffix()`` equals *expected* under the given TEST settings."""
        with self.changed_test_settings(**settings):
            actual = PostgreSQLDatabaseCreation(connection).sql_table_creation_suffix()
            self.assertEqual(actual, expected)

    def test_sql_table_creation_suffix_with_none_settings(self):
        self.check_sql_table_creation_suffix({'CHARSET': None, 'TEMPLATE': None}, "")

    def test_sql_table_creation_suffix_with_encoding(self):
        self.check_sql_table_creation_suffix({'CHARSET': 'UTF8'}, "WITH ENCODING 'UTF8'")

    def test_sql_table_creation_suffix_with_template(self):
        self.check_sql_table_creation_suffix({'TEMPLATE': 'template0'}, 'WITH TEMPLATE "template0"')

    def test_sql_table_creation_suffix_with_encoding_and_template(self):
        self.check_sql_table_creation_suffix(
            {'CHARSET': 'UTF8', 'TEMPLATE': 'template0'},
            '''WITH ENCODING 'UTF8' TEMPLATE "template0"''',
        )

    def _execute_raise_database_already_exists(self, cursor, parameters, keepdb=False):
        """Replacement for ``_execute_create_test_db`` that fails with DUPLICATE_DATABASE as the cause."""
        cause = DatabaseError('database %s already exists' % parameters['dbname'])
        cause.pgcode = errorcodes.DUPLICATE_DATABASE
        raise DatabaseError() from cause

    def _execute_raise_permission_denied(self, cursor, parameters, keepdb=False):
        """Replacement for ``_execute_create_test_db`` that fails with INSUFFICIENT_PRIVILEGE as the cause."""
        cause = DatabaseError('permission denied to create database')
        cause.pgcode = errorcodes.INSUFFICIENT_PRIVILEGE
        raise DatabaseError() from cause

    def patch_test_db_creation(self, execute_create_test_db):
        """Return a patcher swapping ``BaseDatabaseCreation._execute_create_test_db`` for the given callable."""
        return mock.patch.object(
            BaseDatabaseCreation,
            '_execute_create_test_db',
            execute_create_test_db,
        )

    @mock.patch('sys.stdout', new_callable=StringIO)
    @mock.patch('sys.stderr', new_callable=StringIO)
    def test_create_test_db(self, *mocked_objects):
        # *mocked_objects absorbs the mocks injected by the patch decorators.
        db_creation = PostgreSQLDatabaseCreation(connection)

        # Simulate test database creation raising "database already exists"
        with self.patch_test_db_creation(self._execute_raise_database_already_exists):
            # SystemExit is raised if the user answers "no" to the
            # prompt asking if it's okay to delete the test database.
            with mock.patch('builtins.input', return_value='no'), self.assertRaises(SystemExit):
                db_creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False)
            # "Database already exists" error is ignored when keepdb is on
            db_creation._create_test_db(verbosity=0, autoclobber=False, keepdb=True)

        # Simulate test database creation raising unexpected error
        with self.patch_test_db_creation(self._execute_raise_permission_denied):
            # The unexpected error must abort regardless of keepdb.
            for keepdb in (False, True):
                with self.assertRaises(SystemExit):
                    db_creation._create_test_db(verbosity=0, autoclobber=False, keepdb=keepdb)
@unittest.skipUnless(connection.vendor == 'oracle', "Oracle specific tests")
@mock.patch.object(OracleDatabaseCreation, '_maindb_connection', return_value=connection)
@mock.patch('sys.stdout', new_callable=StringIO)
@mock.patch('sys.stderr', new_callable=StringIO)
class OracleDatabaseCreationTests(TestCase):
    """Oracle-only tests for how ``_create_test_db()`` reacts to failing statements."""

    def _execute_raise_user_already_exists(self, cursor, statements, parameters, verbosity, allow_quiet_fail=False):
        """Replacement for ``_execute_statements`` that fails only for ``CREATE USER``."""
        # Raise "user already exists" only in test user creation
        if not statements:
            return
        if statements[0].startswith('CREATE USER'):
            raise DatabaseError("ORA-01920: user name 'string' conflicts with another user or role name")

    def _execute_raise_tablespace_already_exists(
        self, cursor, statements, parameters, verbosity, allow_quiet_fail=False
    ):
        """Replacement for ``_execute_statements`` that always reports an existing tablespace."""
        raise DatabaseError("ORA-01543: tablespace 'string' already exists")

    def _execute_raise_insufficient_privileges(
        self, cursor, statements, parameters, verbosity, allow_quiet_fail=False
    ):
        """Replacement for ``_execute_statements`` that always reports insufficient privileges."""
        raise DatabaseError("ORA-01031: insufficient privileges")

    def _test_database_passwd(self):
        """Replacement for ``OracleDatabaseCreation._test_database_passwd``."""
        # Mocked to avoid test user password changed
        return connection.settings_dict['SAVED_PASSWORD']

    def patch_execute_statements(self, execute_statements):
        """Return a patcher swapping ``OracleDatabaseCreation._execute_statements`` for the given callable."""
        return mock.patch.object(
            OracleDatabaseCreation,
            '_execute_statements',
            execute_statements,
        )

    @mock.patch.object(OracleDatabaseCreation, '_test_user_create', return_value=False)
    def test_create_test_db(self, *mocked_objects):
        # *mocked_objects absorbs the mocks injected by the patch decorators.
        db_creation = OracleDatabaseCreation(connection)

        # Simulate test database creation raising "tablespace already exists"
        with self.patch_execute_statements(self._execute_raise_tablespace_already_exists):
            # SystemExit is raised if the user answers "no" to the
            # prompt asking if it's okay to delete the test tablespace.
            with mock.patch('builtins.input', return_value='no'), self.assertRaises(SystemExit):
                db_creation._create_test_db(verbosity=0, keepdb=False)
            # "Tablespace already exists" error is ignored when keepdb is on
            db_creation._create_test_db(verbosity=0, keepdb=True)

        # Simulate test database creation raising unexpected error
        with self.patch_execute_statements(self._execute_raise_insufficient_privileges):
            # The unexpected error must abort regardless of keepdb.
            for keepdb in (False, True):
                with self.assertRaises(SystemExit):
                    db_creation._create_test_db(verbosity=0, keepdb=keepdb)

    @mock.patch.object(OracleDatabaseCreation, '_test_database_create', return_value=False)
    def test_create_test_user(self, *mocked_objects):
        # *mocked_objects absorbs the mocks injected by the patch decorators.
        db_creation = OracleDatabaseCreation(connection)
        passwd_patcher = mock.patch.object(
            OracleDatabaseCreation,
            '_test_database_passwd',
            self._test_database_passwd,
        )
        with passwd_patcher:
            # Simulate test user creation raising "user already exists"
            with self.patch_execute_statements(self._execute_raise_user_already_exists):
                # SystemExit is raised if the user answers "no" to the
                # prompt asking if it's okay to delete the test user.
                with mock.patch('builtins.input', return_value='no'), self.assertRaises(SystemExit):
                    db_creation._create_test_db(verbosity=0, keepdb=False)
                # "User already exists" error is ignored when keepdb is on
                db_creation._create_test_db(verbosity=0, keepdb=True)

            # Simulate test user creation raising unexpected error
            with self.patch_execute_statements(self._execute_raise_insufficient_privileges):
                # The unexpected error must abort regardless of keepdb.
                for keepdb in (False, True):
                    with self.assertRaises(SystemExit):
                        db_creation._create_test_db(verbosity=0, keepdb=keepdb)
@unittest.skipUnless(connection.vendor == 'mysql', "MySQL specific tests")
class MySQLDatabaseCreationTests(SimpleTestCase):
    """MySQL-only tests for how ``_create_test_db()`` reacts to creation errors."""

    def _execute_raise_database_exists(self, cursor, parameters, keepdb=False):
        """Replacement for ``_execute_create_test_db`` that fails with error 1007 (database exists)."""
        message = "Can't create database '%s'; database exists" % parameters['dbname']
        raise DatabaseError(1007, message)

    def _execute_raise_access_denied(self, cursor, parameters, keepdb=False):
        """Replacement for ``_execute_create_test_db`` that fails with error 1044 (access denied)."""
        raise DatabaseError(1044, "Access denied for user")

    def patch_test_db_creation(self, execute_create_test_db):
        """Return a patcher swapping ``BaseDatabaseCreation._execute_create_test_db`` for the given callable."""
        return mock.patch.object(
            BaseDatabaseCreation,
            '_execute_create_test_db',
            execute_create_test_db,
        )

    @mock.patch('sys.stdout', new_callable=StringIO)
    @mock.patch('sys.stderr', new_callable=StringIO)
    def test_create_test_db_database_exists(self, *mocked_objects):
        # Simulate test database creation raising "database exists"
        db_creation = MySQLDatabaseCreation(connection)
        with self.patch_test_db_creation(self._execute_raise_database_exists):
            # SystemExit is raised if the user answers "no" to the
            # prompt asking if it's okay to delete the test database.
            with mock.patch('builtins.input', return_value='no'), self.assertRaises(SystemExit):
                db_creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False)
            # "Database exists" shouldn't appear when keepdb is on
            db_creation._create_test_db(verbosity=0, autoclobber=False, keepdb=True)

    @mock.patch('sys.stdout', new_callable=StringIO)
    @mock.patch('sys.stderr', new_callable=StringIO)
    def test_create_test_db_unexpected_error(self, *mocked_objects):
        # Simulate test database creation raising unexpected error
        db_creation = MySQLDatabaseCreation(connection)
        with self.patch_test_db_creation(self._execute_raise_access_denied), self.assertRaises(SystemExit):
            db_creation._create_test_db(verbosity=0, autoclobber=False, keepdb=False)
|