123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377 |
- import sys
- from django.conf import settings
- from django.db.backends.base.creation import BaseDatabaseCreation
- from django.db.utils import DatabaseError
- from django.utils.crypto import get_random_string
- from django.utils.functional import cached_property
# Prefix prepended to a value from the main settings dict (e.g. USER or NAME)
# to derive the default name of the corresponding test object when the TEST
# settings do not specify one.
TEST_DATABASE_PREFIX = 'test_'
class DatabaseCreation(BaseDatabaseCreation):
    """
    Test database creation and destruction for the Oracle backend.

    The "test database" managed here consists of a tablespace, a temporary
    tablespace and a dedicated test user. All of them are created and dropped
    through the main (non-test) connection; see `_maindb_connection`.
    """

    @cached_property
    def _maindb_connection(self):
        """
        This is analogous to other backends' `_nodb_connection` property,
        which allows access to an "administrative" connection which can
        be used to manage the test databases.
        For Oracle, the only connection that can be used for that purpose
        is the main (non-test) connection.
        """
        settings_dict = settings.DATABASES[self.connection.alias]
        # After _switch_to_test_user() has run, USER/PASSWORD hold the test
        # credentials and the main ones are kept under SAVED_USER/SAVED_PASSWORD.
        user = settings_dict.get('SAVED_USER') or settings_dict['USER']
        password = settings_dict.get('SAVED_PASSWORD') or settings_dict['PASSWORD']
        # Work on a copy so that the global settings are left untouched.
        settings_dict = settings_dict.copy()
        settings_dict.update(USER=user, PASSWORD=password)
        DatabaseWrapper = type(self.connection)
        return DatabaseWrapper(settings_dict, alias=self.connection.alias)

    def _create_test_db(self, verbosity=1, autoclobber=False, keepdb=False):
        """
        Create the test tablespaces and the test user, then switch this
        connection's settings over to the test user.

        Each of the two steps is skipped when the matching TEST setting
        (CREATE_DB / CREATE_USER) is false. When a step fails because the
        object already exists, the old object is destroyed and recreated,
        after asking for confirmation unless `autoclobber` is true.

        Any other failure terminates the process with exit status 2; a
        refused confirmation terminates it with exit status 1.

        Return the NAME entry of this connection's settings dict.
        """
        parameters = self._get_test_db_params()
        with self._maindb_connection.cursor() as cursor:
            if self._test_database_create():
                try:
                    self._execute_test_db_creation(cursor, parameters, verbosity, keepdb)
                except Exception as e:
                    if 'ORA-01543' not in str(e):
                        # Every error except ORA-01543 (handled below as
                        # "the test database already exists") cancels tests.
                        sys.stderr.write("Got an error creating the test database: %s\n" % e)
                        sys.exit(2)
                    if not autoclobber:
                        confirm = input(
                            "It appears the test database, %s, already exists. "
                            "Type 'yes' to delete it, or 'no' to cancel: " % parameters['user'])
                    if autoclobber or confirm == 'yes':
                        if verbosity >= 1:
                            print("Destroying old test database for alias '%s'..." % self.connection.alias)
                        try:
                            self._execute_test_db_destruction(cursor, parameters, verbosity)
                        except DatabaseError as e:
                            if 'ORA-29857' in str(e):
                                # Objects in the tablespace prevent dropping
                                # it; a dedicated handler tries to recover.
                                self._handle_objects_preventing_db_destruction(cursor, parameters,
                                                                               verbosity, autoclobber)
                            else:
                                # Any other DatabaseError is fatal.
                                sys.stderr.write("Got an error destroying the old test database: %s\n" % e)
                                sys.exit(2)
                        except Exception as e:
                            sys.stderr.write("Got an error destroying the old test database: %s\n" % e)
                            sys.exit(2)
                        try:
                            self._execute_test_db_creation(cursor, parameters, verbosity, keepdb)
                        except Exception as e:
                            sys.stderr.write("Got an error recreating the test database: %s\n" % e)
                            sys.exit(2)
                    else:
                        print("Tests cancelled.")
                        sys.exit(1)

            if self._test_user_create():
                if verbosity >= 1:
                    print("Creating test user...")
                try:
                    self._create_test_user(cursor, parameters, verbosity, keepdb)
                except Exception as e:
                    if 'ORA-01920' not in str(e):
                        # Every error except ORA-01920 (handled below as
                        # "the test user already exists") cancels tests.
                        sys.stderr.write("Got an error creating the test user: %s\n" % e)
                        sys.exit(2)
                    if not autoclobber:
                        confirm = input(
                            "It appears the test user, %s, already exists. Type "
                            "'yes' to delete it, or 'no' to cancel: " % parameters['user'])
                    if autoclobber or confirm == 'yes':
                        try:
                            if verbosity >= 1:
                                print("Destroying old test user...")
                            self._destroy_test_user(cursor, parameters, verbosity)
                            if verbosity >= 1:
                                print("Creating test user...")
                            self._create_test_user(cursor, parameters, verbosity, keepdb)
                        except Exception as e:
                            sys.stderr.write("Got an error recreating the test user: %s\n" % e)
                            sys.exit(2)
                    else:
                        print("Tests cancelled.")
                        sys.exit(1)
        # Done with the main user: the test user and tablespaces now exist.
        self._maindb_connection.close()
        self._switch_to_test_user(parameters)
        return self.connection.settings_dict['NAME']

    def _switch_to_test_user(self, parameters):
        """
        Switch to the user that's used for creating the test database.
        Oracle doesn't have the concept of separate databases under the same
        user, so a separate user is used; see _create_test_db(). The main user
        is also needed for cleanup when testing is completed, so save its
        credentials in the SAVED_USER/SAVED_PASSWORD key in the settings dict.
        """
        real_settings = settings.DATABASES[self.connection.alias]
        # Remember the main credentials in both the global settings and this
        # connection's settings dict.
        real_settings['SAVED_USER'] = self.connection.settings_dict['SAVED_USER'] = \
            self.connection.settings_dict['USER']
        real_settings['SAVED_PASSWORD'] = self.connection.settings_dict['SAVED_PASSWORD'] = \
            self.connection.settings_dict['PASSWORD']
        real_test_settings = real_settings['TEST']
        test_settings = self.connection.settings_dict['TEST']
        # Install the test credentials everywhere the user name is looked up.
        real_test_settings['USER'] = real_settings['USER'] = test_settings['USER'] = \
            self.connection.settings_dict['USER'] = parameters['user']
        real_settings['PASSWORD'] = self.connection.settings_dict['PASSWORD'] = parameters['password']

    def set_as_test_mirror(self, primary_settings_dict):
        """
        Set this database up to be used in testing as a mirror of a primary
        database whose settings are given.
        """
        self.connection.settings_dict['USER'] = primary_settings_dict['USER']
        self.connection.settings_dict['PASSWORD'] = primary_settings_dict['PASSWORD']

    def _handle_objects_preventing_db_destruction(self, cursor, parameters, verbosity, autoclobber):
        """
        Recover from a failed tablespace drop caused by objects that prevent
        the old test database's destruction.

        If Django manages the test user (CREATE_USER is true), drop that user
        and then retry dropping the tablespaces, asking for confirmation
        first unless `autoclobber` is true. Otherwise, or if confirmation is
        refused, exit with status 1. Errors while dropping exit with status 2.
        """
        # The easy fix is to drop the test user -- but are we allowed to do so?
        print("There are objects in the old test database which prevent its destruction.")
        print("If they belong to the test user, deleting the user will allow the test "
              "database to be recreated.")
        print("Otherwise, you will need to find and remove each of these objects, "
              "or use a different tablespace.\n")
        if self._test_user_create():
            if not autoclobber:
                confirm = input("Type 'yes' to delete user %s: " % parameters['user'])
            if autoclobber or confirm == 'yes':
                try:
                    if verbosity >= 1:
                        print("Destroying old test user...")
                    self._destroy_test_user(cursor, parameters, verbosity)
                except Exception as e:
                    sys.stderr.write("Got an error destroying the test user: %s\n" % e)
                    sys.exit(2)
                try:
                    if verbosity >= 1:
                        print("Destroying old test database for alias '%s'..." % self.connection.alias)
                    self._execute_test_db_destruction(cursor, parameters, verbosity)
                except Exception as e:
                    sys.stderr.write("Got an error destroying the test database: %s\n" % e)
                    sys.exit(2)
            else:
                print("Tests cancelled -- test database cannot be recreated.")
                sys.exit(1)
        else:
            print("Django is configured to use pre-existing test user '%s',"
                  " and will not attempt to delete it.\n" % parameters['user'])
            print("Tests cancelled -- test database cannot be recreated.")
            sys.exit(1)

    def _destroy_test_db(self, test_database_name, verbosity=1):
        """
        Drop the test user and the test tablespaces using the main connection.

        Each step only runs when the matching TEST setting (CREATE_USER /
        CREATE_DB) is true. `test_database_name` is accepted for interface
        compatibility and is not used. Return None.

        The main connection is closed even if one of the DROP statements
        raises; the exception is still propagated to the caller.
        """
        # Restore the main credentials first: the default test names computed
        # by _get_test_db_params() are derived from the USER setting.
        self.connection.settings_dict['USER'] = self.connection.settings_dict['SAVED_USER']
        self.connection.settings_dict['PASSWORD'] = self.connection.settings_dict['SAVED_PASSWORD']
        self.connection.close()
        parameters = self._get_test_db_params()
        maindb_connection = self._maindb_connection
        try:
            with maindb_connection.cursor() as cursor:
                if self._test_user_create():
                    if verbosity >= 1:
                        print('Destroying test user...')
                    self._destroy_test_user(cursor, parameters, verbosity)
                if self._test_database_create():
                    if verbosity >= 1:
                        print('Destroying test database tables...')
                    self._execute_test_db_destruction(cursor, parameters, verbosity)
        finally:
            # Don't leave the administrative connection open on failure.
            maindb_connection.close()

    def _execute_test_db_creation(self, cursor, parameters, verbosity, keepdb=False):
        """
        Create the test tablespace and the temporary test tablespace.

        With `keepdb` true, an ORA-01543 error is tolerated silently; any
        other DatabaseError (and ORA-01543 without `keepdb`) propagates.
        """
        if verbosity >= 2:
            print("_create_test_db(): dbname = %s" % parameters['user'])
        statements = [
            """CREATE TABLESPACE %(tblspace)s
               DATAFILE '%(datafile)s' SIZE %(size)s
               REUSE AUTOEXTEND ON NEXT %(extsize)s MAXSIZE %(maxsize)s
            """,
            """CREATE TEMPORARY TABLESPACE %(tblspace_temp)s
               TEMPFILE '%(datafile_tmp)s' SIZE %(size_tmp)s
               REUSE AUTOEXTEND ON NEXT %(extsize_tmp)s MAXSIZE %(maxsize_tmp)s
            """,
        ]
        # Ignore the "already exists" error (ORA-01543) only when keepdb is on.
        acceptable_ora_err = 'ORA-01543' if keepdb else None
        self._execute_allow_fail_statements(cursor, statements, parameters, verbosity, acceptable_ora_err)

    def _create_test_user(self, cursor, parameters, verbosity, keepdb=False):
        """
        Create the test user and grant it the privileges needed by tests.

        With `keepdb` true, an ORA-01920 error is tolerated; in that case,
        if no TEST PASSWORD is configured, the existing user's password is
        reset to the one in `parameters`. The CREATE VIEW grant is attempted
        separately and an ORA-01031 failure of it is tolerated.
        """
        if verbosity >= 2:
            print("_create_test_user(): username = %s" % parameters['user'])
        statements = [
            """CREATE USER %(user)s
               IDENTIFIED BY "%(password)s"
               DEFAULT TABLESPACE %(tblspace)s
               TEMPORARY TABLESPACE %(tblspace_temp)s
               QUOTA UNLIMITED ON %(tblspace)s
            """,
            """GRANT CREATE SESSION,
                     CREATE TABLE,
                     CREATE SEQUENCE,
                     CREATE PROCEDURE,
                     CREATE TRIGGER
               TO %(user)s""",
        ]
        # Ignore the "already exists" error (ORA-01920) only when keepdb is on.
        acceptable_ora_err = 'ORA-01920' if keepdb else None
        success = self._execute_allow_fail_statements(cursor, statements, parameters, verbosity, acceptable_ora_err)
        # The user already existed and the password in `parameters` was
        # randomly generated (none is configured), so make the existing user
        # match it.
        if not success and self._test_settings_get('PASSWORD') is None:
            set_password = 'ALTER USER %(user)s IDENTIFIED BY "%(password)s"'
            self._execute_statements(cursor, [set_password], parameters, verbosity)
        # The CREATE VIEW privilege is treated as optional: failing to grant
        # it with ORA-01031 is only reported at high verbosity.
        extra = "GRANT CREATE VIEW TO %(user)s"
        success = self._execute_allow_fail_statements(cursor, [extra], parameters, verbosity, 'ORA-01031')
        if not success and verbosity >= 2:
            print("Failed to grant CREATE VIEW permission to test user. This may be ok.")

    def _execute_test_db_destruction(self, cursor, parameters, verbosity):
        """Drop the test tablespace and the temporary test tablespace."""
        if verbosity >= 2:
            print("_execute_test_db_destruction(): dbname=%s" % parameters['user'])
        statements = [
            'DROP TABLESPACE %(tblspace)s INCLUDING CONTENTS AND DATAFILES CASCADE CONSTRAINTS',
            'DROP TABLESPACE %(tblspace_temp)s INCLUDING CONTENTS AND DATAFILES CASCADE CONSTRAINTS',
        ]
        self._execute_statements(cursor, statements, parameters, verbosity)

    def _destroy_test_user(self, cursor, parameters, verbosity):
        """Drop the test user together with everything it owns (CASCADE)."""
        if verbosity >= 2:
            print("_destroy_test_user(): user=%s" % parameters['user'])
            print("Be patient. This can take some time...")
        statements = [
            'DROP USER %(user)s CASCADE',
        ]
        self._execute_statements(cursor, statements, parameters, verbosity)

    def _execute_statements(self, cursor, statements, parameters, verbosity, allow_quiet_fail=False):
        """
        Interpolate `parameters` into each statement template and execute it.

        Statements are echoed at verbosity >= 2. On failure the exception is
        always re-raised; it is additionally reported on stderr unless
        `allow_quiet_fail` is true and verbosity is below 2. Execution stops
        at the first failing statement.
        """
        for template in statements:
            stmt = template % parameters
            if verbosity >= 2:
                print(stmt)
            try:
                cursor.execute(stmt)
            except Exception as err:
                if (not allow_quiet_fail) or verbosity >= 2:
                    sys.stderr.write("Failed (%s)\n" % (err,))
                raise

    def _execute_allow_fail_statements(self, cursor, statements, parameters, verbosity, acceptable_ora_err):
        """
        Execute statements which are allowed to fail silently if the Oracle
        error code given by `acceptable_ora_err` is raised. Return True if the
        statements execute without an exception, or False otherwise.

        A falsy `acceptable_ora_err` (None or an empty string) means no error
        is acceptable: every DatabaseError is re-raised.
        """
        # An empty code must not be used for the substring test below, since
        # '' is contained in every error message.
        allow_quiet_fail = bool(acceptable_ora_err)
        try:
            self._execute_statements(cursor, statements, parameters, verbosity, allow_quiet_fail=allow_quiet_fail)
        except DatabaseError as err:
            if not allow_quiet_fail or acceptable_ora_err not in str(err):
                raise
            return False
        return True

    def _get_test_db_params(self):
        """
        Return the dict of values interpolated into the SQL statement
        templates used to create and destroy the test database and user.
        """
        return {
            'dbname': self._test_database_name(),
            'user': self._test_database_user(),
            'password': self._test_database_passwd(),
            'tblspace': self._test_database_tblspace(),
            'tblspace_temp': self._test_database_tblspace_tmp(),
            'datafile': self._test_database_tblspace_datafile(),
            'datafile_tmp': self._test_database_tblspace_tmp_datafile(),
            'maxsize': self._test_database_tblspace_maxsize(),
            'maxsize_tmp': self._test_database_tblspace_tmp_maxsize(),
            'size': self._test_database_tblspace_size(),
            'size_tmp': self._test_database_tblspace_tmp_size(),
            'extsize': self._test_database_tblspace_extsize(),
            'extsize_tmp': self._test_database_tblspace_tmp_extsize(),
        }

    def _test_settings_get(self, key, default=None, prefixed=None):
        """
        Return a value from the test settings dict, or a given default, or a
        prefixed entry from the main settings dict.

        `prefixed` names a key of the main settings dict. It is only used
        when the test setting (or `default`) is None, in which case the
        result is TEST_DATABASE_PREFIX followed by that main setting.
        """
        settings_dict = self.connection.settings_dict
        val = settings_dict['TEST'].get(key, default)
        if val is None and prefixed:
            val = TEST_DATABASE_PREFIX + settings_dict[prefixed]
        return val

    def _test_database_name(self):
        """Return TEST['NAME'], defaulting to the prefixed main NAME."""
        return self._test_settings_get('NAME', prefixed='NAME')

    def _test_database_create(self):
        """Return TEST['CREATE_DB'] (default True)."""
        return self._test_settings_get('CREATE_DB', default=True)

    def _test_user_create(self):
        """Return TEST['CREATE_USER'] (default True)."""
        return self._test_settings_get('CREATE_USER', default=True)

    def _test_database_user(self):
        """Return TEST['USER'], defaulting to the prefixed main USER."""
        return self._test_settings_get('USER', prefixed='USER')

    def _test_database_passwd(self):
        """
        Return TEST['PASSWORD']. If it is unset and Django creates the test
        user itself, return a freshly generated random 30-character password
        instead (a new one on every call).
        """
        password = self._test_settings_get('PASSWORD')
        if password is None and self._test_user_create():
            password = get_random_string(length=30)
        return password

    def _test_database_tblspace(self):
        """Return TEST['TBLSPACE'], defaulting to the prefixed main USER."""
        return self._test_settings_get('TBLSPACE', prefixed='USER')

    def _test_database_tblspace_tmp(self):
        """
        Return TEST['TBLSPACE_TMP'], defaulting to the prefixed main USER
        followed by '_temp'.

        A missing key and an explicit None are both treated as "not set", as
        in _test_database_tblspace(), so None is never interpolated into SQL.
        """
        settings_dict = self.connection.settings_dict
        tblspace_tmp = settings_dict['TEST'].get('TBLSPACE_TMP')
        if tblspace_tmp is None:
            tblspace_tmp = TEST_DATABASE_PREFIX + settings_dict['USER'] + '_temp'
        return tblspace_tmp

    def _test_database_tblspace_datafile(self):
        """Return TEST['DATAFILE'], defaulting to '<tablespace>.dbf'."""
        tblspace = '%s.dbf' % self._test_database_tblspace()
        return self._test_settings_get('DATAFILE', default=tblspace)

    def _test_database_tblspace_tmp_datafile(self):
        """Return TEST['DATAFILE_TMP'], defaulting to '<temp tablespace>.dbf'."""
        tblspace = '%s.dbf' % self._test_database_tblspace_tmp()
        return self._test_settings_get('DATAFILE_TMP', default=tblspace)

    def _test_database_tblspace_maxsize(self):
        """Return TEST['DATAFILE_MAXSIZE'] (default '500M')."""
        return self._test_settings_get('DATAFILE_MAXSIZE', default='500M')

    def _test_database_tblspace_tmp_maxsize(self):
        """Return TEST['DATAFILE_TMP_MAXSIZE'] (default '500M')."""
        return self._test_settings_get('DATAFILE_TMP_MAXSIZE', default='500M')

    def _test_database_tblspace_size(self):
        """Return TEST['DATAFILE_SIZE'] (default '50M')."""
        return self._test_settings_get('DATAFILE_SIZE', default='50M')

    def _test_database_tblspace_tmp_size(self):
        """Return TEST['DATAFILE_TMP_SIZE'] (default '50M')."""
        return self._test_settings_get('DATAFILE_TMP_SIZE', default='50M')

    def _test_database_tblspace_extsize(self):
        """Return TEST['DATAFILE_EXTSIZE'] (default '25M')."""
        return self._test_settings_get('DATAFILE_EXTSIZE', default='25M')

    def _test_database_tblspace_tmp_extsize(self):
        """Return TEST['DATAFILE_TMP_EXTSIZE'] (default '25M')."""
        return self._test_settings_get('DATAFILE_TMP_EXTSIZE', default='25M')

    def _get_test_db_name(self):
        """
        Return the 'production' DB name to get the test DB creation machinery
        to work. This isn't a great deal in this case because DB names as
        handled by Django don't have real counterparts in Oracle.
        """
        return self.connection.settings_dict['NAME']

    def test_db_signature(self):
        """
        Return a tuple identifying the test database: the HOST, PORT, ENGINE
        and NAME settings of this connection plus the test user name.
        """
        settings_dict = self.connection.settings_dict
        return (
            settings_dict['HOST'],
            settings_dict['PORT'],
            settings_dict['ENGINE'],
            settings_dict['NAME'],
            self._test_database_user(),
        )
|