123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242 |
- import copy
- import datetime
- import re
- from django.db import DatabaseError
- from django.db.backends.base.schema import (
- BaseDatabaseSchemaEditor,
- _related_non_m2m_objects,
- )
- from django.utils.duration import duration_iso_string
- class DatabaseSchemaEditor(BaseDatabaseSchemaEditor):
- sql_create_column = "ALTER TABLE %(table)s ADD %(column)s %(definition)s"
- sql_alter_column_type = "MODIFY %(column)s %(type)s"
- sql_alter_column_null = "MODIFY %(column)s NULL"
- sql_alter_column_not_null = "MODIFY %(column)s NOT NULL"
- sql_alter_column_default = "MODIFY %(column)s DEFAULT %(default)s"
- sql_alter_column_no_default = "MODIFY %(column)s DEFAULT NULL"
- sql_alter_column_no_default_null = sql_alter_column_no_default
- sql_alter_column_collate = "MODIFY %(column)s %(type)s%(collation)s"
- sql_delete_column = "ALTER TABLE %(table)s DROP COLUMN %(column)s"
- sql_create_column_inline_fk = (
- "CONSTRAINT %(name)s REFERENCES %(to_table)s(%(to_column)s)%(deferrable)s"
- )
- sql_delete_table = "DROP TABLE %(table)s CASCADE CONSTRAINTS"
- sql_create_index = "CREATE INDEX %(name)s ON %(table)s (%(columns)s)%(extra)s"
- def quote_value(self, value):
- if isinstance(value, (datetime.date, datetime.time, datetime.datetime)):
- return "'%s'" % value
- elif isinstance(value, datetime.timedelta):
- return "'%s'" % duration_iso_string(value)
- elif isinstance(value, str):
- return "'%s'" % value.replace("'", "''").replace("%", "%%")
- elif isinstance(value, (bytes, bytearray, memoryview)):
- return "'%s'" % value.hex()
- elif isinstance(value, bool):
- return "1" if value else "0"
- else:
- return str(value)
- def remove_field(self, model, field):
- # If the column is an identity column, drop the identity before
- # removing the field.
- if self._is_identity_column(model._meta.db_table, field.column):
- self._drop_identity(model._meta.db_table, field.column)
- super().remove_field(model, field)
- def delete_model(self, model):
- # Run superclass action
- super().delete_model(model)
- # Clean up manually created sequence.
- self.execute(
- """
- DECLARE
- i INTEGER;
- BEGIN
- SELECT COUNT(1) INTO i FROM USER_SEQUENCES
- WHERE SEQUENCE_NAME = '%(sq_name)s';
- IF i = 1 THEN
- EXECUTE IMMEDIATE 'DROP SEQUENCE "%(sq_name)s"';
- END IF;
- END;
- /"""
- % {
- "sq_name": self.connection.ops._get_no_autofield_sequence_name(
- model._meta.db_table
- )
- }
- )
- def alter_field(self, model, old_field, new_field, strict=False):
- try:
- super().alter_field(model, old_field, new_field, strict)
- except DatabaseError as e:
- description = str(e)
- # If we're changing type to an unsupported type we need a
- # SQLite-ish workaround
- if "ORA-22858" in description or "ORA-22859" in description:
- self._alter_field_type_workaround(model, old_field, new_field)
- # If an identity column is changing to a non-numeric type, drop the
- # identity first.
- elif "ORA-30675" in description:
- self._drop_identity(model._meta.db_table, old_field.column)
- self.alter_field(model, old_field, new_field, strict)
- # If a primary key column is changing to an identity column, drop
- # the primary key first.
- elif "ORA-30673" in description and old_field.primary_key:
- self._delete_primary_key(model, strict=True)
- self._alter_field_type_workaround(model, old_field, new_field)
- else:
- raise
- def _alter_field_type_workaround(self, model, old_field, new_field):
- """
- Oracle refuses to change from some type to other type.
- What we need to do instead is:
- - Add a nullable version of the desired field with a temporary name. If
- the new column is an auto field, then the temporary column can't be
- nullable.
- - Update the table to transfer values from old to new
- - Drop old column
- - Rename the new column and possibly drop the nullable property
- """
- # Make a new field that's like the new one but with a temporary
- # column name.
- new_temp_field = copy.deepcopy(new_field)
- new_temp_field.null = new_field.get_internal_type() not in (
- "AutoField",
- "BigAutoField",
- "SmallAutoField",
- )
- new_temp_field.column = self._generate_temp_name(new_field.column)
- # Add it
- self.add_field(model, new_temp_field)
- # Explicit data type conversion
- # https://docs.oracle.com/en/database/oracle/oracle-database/18/sqlrf
- # /Data-Type-Comparison-Rules.html#GUID-D0C5A47E-6F93-4C2D-9E49-4F2B86B359DD
- new_value = self.quote_name(old_field.column)
- old_type = old_field.db_type(self.connection)
- if re.match("^N?CLOB", old_type):
- new_value = "TO_CHAR(%s)" % new_value
- old_type = "VARCHAR2"
- if re.match("^N?VARCHAR2", old_type):
- new_internal_type = new_field.get_internal_type()
- if new_internal_type == "DateField":
- new_value = "TO_DATE(%s, 'YYYY-MM-DD')" % new_value
- elif new_internal_type == "DateTimeField":
- new_value = "TO_TIMESTAMP(%s, 'YYYY-MM-DD HH24:MI:SS.FF')" % new_value
- elif new_internal_type == "TimeField":
- # TimeField are stored as TIMESTAMP with a 1900-01-01 date part.
- new_value = "CONCAT('1900-01-01 ', %s)" % new_value
- new_value = "TO_TIMESTAMP(%s, 'YYYY-MM-DD HH24:MI:SS.FF')" % new_value
- # Transfer values across
- self.execute(
- "UPDATE %s set %s=%s"
- % (
- self.quote_name(model._meta.db_table),
- self.quote_name(new_temp_field.column),
- new_value,
- )
- )
- # Drop the old field
- self.remove_field(model, old_field)
- # Rename and possibly make the new field NOT NULL
- super().alter_field(model, new_temp_field, new_field)
- # Recreate foreign key (if necessary) because the old field is not
- # passed to the alter_field() and data types of new_temp_field and
- # new_field always match.
- new_type = new_field.db_type(self.connection)
- if (
- (old_field.primary_key and new_field.primary_key)
- or (old_field.unique and new_field.unique)
- ) and old_type != new_type:
- for _, rel in _related_non_m2m_objects(new_temp_field, new_field):
- if rel.field.db_constraint:
- self.execute(
- self._create_fk_sql(rel.related_model, rel.field, "_fk")
- )
- def _alter_column_type_sql(self, model, old_field, new_field, new_type):
- auto_field_types = {"AutoField", "BigAutoField", "SmallAutoField"}
- # Drop the identity if migrating away from AutoField.
- if (
- old_field.get_internal_type() in auto_field_types
- and new_field.get_internal_type() not in auto_field_types
- and self._is_identity_column(model._meta.db_table, new_field.column)
- ):
- self._drop_identity(model._meta.db_table, new_field.column)
- return super()._alter_column_type_sql(model, old_field, new_field, new_type)
- def normalize_name(self, name):
- """
- Get the properly shortened and uppercased identifier as returned by
- quote_name() but without the quotes.
- """
- nn = self.quote_name(name)
- if nn[0] == '"' and nn[-1] == '"':
- nn = nn[1:-1]
- return nn
- def _generate_temp_name(self, for_name):
- """Generate temporary names for workarounds that need temp columns."""
- suffix = hex(hash(for_name)).upper()[1:]
- return self.normalize_name(for_name + "_" + suffix)
- def prepare_default(self, value):
- return self.quote_value(value)
- def _field_should_be_indexed(self, model, field):
- create_index = super()._field_should_be_indexed(model, field)
- db_type = field.db_type(self.connection)
- if (
- db_type is not None
- and db_type.lower() in self.connection._limited_data_types
- ):
- return False
- return create_index
- def _is_identity_column(self, table_name, column_name):
- with self.connection.cursor() as cursor:
- cursor.execute(
- """
- SELECT
- CASE WHEN identity_column = 'YES' THEN 1 ELSE 0 END
- FROM user_tab_cols
- WHERE table_name = %s AND
- column_name = %s
- """,
- [self.normalize_name(table_name), self.normalize_name(column_name)],
- )
- row = cursor.fetchone()
- return row[0] if row else False
- def _drop_identity(self, table_name, column_name):
- self.execute(
- "ALTER TABLE %(table)s MODIFY %(column)s DROP IDENTITY"
- % {
- "table": self.quote_name(table_name),
- "column": self.quote_name(column_name),
- }
- )
- def _get_default_collation(self, table_name):
- with self.connection.cursor() as cursor:
- cursor.execute(
- """
- SELECT default_collation FROM user_tables WHERE table_name = %s
- """,
- [self.normalize_name(table_name)],
- )
- return cursor.fetchone()[0]
- def _alter_column_collation_sql(self, model, new_field, new_type, new_collation):
- if new_collation is None:
- new_collation = self._get_default_collation(model._meta.db_table)
- return super()._alter_column_collation_sql(
- model, new_field, new_type, new_collation
- )
|