123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653 |
- import os
- import re
- from io import StringIO
- from unittest import mock, skipUnless
- from django.core.management import call_command
- from django.db import connection
- from django.db.backends.base.introspection import TableInfo
- from django.test import TestCase, TransactionTestCase, skipUnlessDBFeature
- from .models import PeopleMoreData, test_collation
- def inspectdb_tables_only(table_name):
- """
- Limit introspection to tables created for models of this app.
- Some databases such as Oracle are extremely slow at introspection.
- """
- return table_name.startswith("inspectdb_")
- def inspectdb_views_only(table_name):
- return table_name.startswith("inspectdb_") and table_name.endswith(
- ("_materialized", "_view")
- )
- def special_table_only(table_name):
- return table_name.startswith("inspectdb_special")
- class InspectDBTestCase(TestCase):
- unique_re = re.compile(r".*unique_together = \((.+),\).*")
- def test_stealth_table_name_filter_option(self):
- out = StringIO()
- call_command("inspectdb", table_name_filter=inspectdb_tables_only, stdout=out)
- error_message = (
- "inspectdb has examined a table that should have been filtered out."
- )
- # contrib.contenttypes is one of the apps always installed when running
- # the Django test suite, check that one of its tables hasn't been
- # inspected
- self.assertNotIn(
- "class DjangoContentType(models.Model):", out.getvalue(), msg=error_message
- )
- def test_table_option(self):
- """
- inspectdb can inspect a subset of tables by passing the table names as
- arguments.
- """
- out = StringIO()
- call_command("inspectdb", "inspectdb_people", stdout=out)
- output = out.getvalue()
- self.assertIn("class InspectdbPeople(models.Model):", output)
- self.assertNotIn("InspectdbPeopledata", output)
- def make_field_type_asserter(self):
- """
- Call inspectdb and return a function to validate a field type in its
- output.
- """
- out = StringIO()
- call_command("inspectdb", "inspectdb_columntypes", stdout=out)
- output = out.getvalue()
- def assertFieldType(name, definition):
- out_def = re.search(r"^\s*%s = (models.*)$" % name, output, re.MULTILINE)[1]
- self.assertEqual(definition, out_def)
- return assertFieldType
- def test_field_types(self):
- """Test introspection of various Django field types"""
- assertFieldType = self.make_field_type_asserter()
- introspected_field_types = connection.features.introspected_field_types
- char_field_type = introspected_field_types["CharField"]
- # Inspecting Oracle DB doesn't produce correct results (#19884):
- # - it reports fields as blank=True when they aren't.
- if (
- not connection.features.interprets_empty_strings_as_nulls
- and char_field_type == "CharField"
- ):
- assertFieldType("char_field", "models.CharField(max_length=10)")
- assertFieldType(
- "null_char_field",
- "models.CharField(max_length=10, blank=True, null=True)",
- )
- assertFieldType("email_field", "models.CharField(max_length=254)")
- assertFieldType("file_field", "models.CharField(max_length=100)")
- assertFieldType("file_path_field", "models.CharField(max_length=100)")
- assertFieldType("slug_field", "models.CharField(max_length=50)")
- assertFieldType("text_field", "models.TextField()")
- assertFieldType("url_field", "models.CharField(max_length=200)")
- if char_field_type == "TextField":
- assertFieldType("char_field", "models.TextField()")
- assertFieldType(
- "null_char_field", "models.TextField(blank=True, null=True)"
- )
- assertFieldType("email_field", "models.TextField()")
- assertFieldType("file_field", "models.TextField()")
- assertFieldType("file_path_field", "models.TextField()")
- assertFieldType("slug_field", "models.TextField()")
- assertFieldType("text_field", "models.TextField()")
- assertFieldType("url_field", "models.TextField()")
- assertFieldType("date_field", "models.DateField()")
- assertFieldType("date_time_field", "models.DateTimeField()")
- if introspected_field_types["GenericIPAddressField"] == "GenericIPAddressField":
- assertFieldType("gen_ip_address_field", "models.GenericIPAddressField()")
- elif not connection.features.interprets_empty_strings_as_nulls:
- assertFieldType("gen_ip_address_field", "models.CharField(max_length=39)")
- assertFieldType(
- "time_field", "models.%s()" % introspected_field_types["TimeField"]
- )
- if connection.features.has_native_uuid_field:
- assertFieldType("uuid_field", "models.UUIDField()")
- elif not connection.features.interprets_empty_strings_as_nulls:
- assertFieldType("uuid_field", "models.CharField(max_length=32)")
- @skipUnlessDBFeature("can_introspect_json_field", "supports_json_field")
- def test_json_field(self):
- out = StringIO()
- call_command("inspectdb", "inspectdb_jsonfieldcolumntype", stdout=out)
- output = out.getvalue()
- if not connection.features.interprets_empty_strings_as_nulls:
- self.assertIn("json_field = models.JSONField()", output)
- self.assertIn(
- "null_json_field = models.JSONField(blank=True, null=True)", output
- )
- @skipUnlessDBFeature("supports_comments")
- def test_db_comments(self):
- out = StringIO()
- call_command("inspectdb", "inspectdb_dbcomment", stdout=out)
- output = out.getvalue()
- integer_field_type = connection.features.introspected_field_types[
- "IntegerField"
- ]
- self.assertIn(
- f"rank = models.{integer_field_type}("
- f"db_comment=\"'Rank' column comment\")",
- output,
- )
- self.assertIn(
- " db_table_comment = 'Custom table comment'",
- output,
- )
- @skipUnlessDBFeature("supports_collation_on_charfield")
- @skipUnless(test_collation, "Language collations are not supported.")
- def test_char_field_db_collation(self):
- out = StringIO()
- call_command("inspectdb", "inspectdb_charfielddbcollation", stdout=out)
- output = out.getvalue()
- if not connection.features.interprets_empty_strings_as_nulls:
- self.assertIn(
- "char_field = models.CharField(max_length=10, "
- "db_collation='%s')" % test_collation,
- output,
- )
- else:
- self.assertIn(
- "char_field = models.CharField(max_length=10, "
- "db_collation='%s', blank=True, null=True)" % test_collation,
- output,
- )
- @skipUnlessDBFeature("supports_collation_on_textfield")
- @skipUnless(test_collation, "Language collations are not supported.")
- def test_text_field_db_collation(self):
- out = StringIO()
- call_command("inspectdb", "inspectdb_textfielddbcollation", stdout=out)
- output = out.getvalue()
- if not connection.features.interprets_empty_strings_as_nulls:
- self.assertIn(
- "text_field = models.TextField(db_collation='%s')" % test_collation,
- output,
- )
- else:
- self.assertIn(
- "text_field = models.TextField(db_collation='%s, blank=True, "
- "null=True)" % test_collation,
- output,
- )
- @skipUnlessDBFeature("supports_unlimited_charfield")
- def test_char_field_unlimited(self):
- out = StringIO()
- call_command("inspectdb", "inspectdb_charfieldunlimited", stdout=out)
- output = out.getvalue()
- self.assertIn("char_field = models.CharField()", output)
- def test_number_field_types(self):
- """Test introspection of various Django field types"""
- assertFieldType = self.make_field_type_asserter()
- introspected_field_types = connection.features.introspected_field_types
- auto_field_type = connection.features.introspected_field_types["AutoField"]
- if auto_field_type != "AutoField":
- assertFieldType(
- "id", "models.%s(primary_key=True) # AutoField?" % auto_field_type
- )
- assertFieldType(
- "big_int_field", "models.%s()" % introspected_field_types["BigIntegerField"]
- )
- bool_field_type = introspected_field_types["BooleanField"]
- assertFieldType("bool_field", "models.{}()".format(bool_field_type))
- assertFieldType(
- "null_bool_field",
- "models.{}(blank=True, null=True)".format(bool_field_type),
- )
- if connection.vendor != "sqlite":
- assertFieldType(
- "decimal_field", "models.DecimalField(max_digits=6, decimal_places=1)"
- )
- else: # Guessed arguments on SQLite, see #5014
- assertFieldType(
- "decimal_field",
- "models.DecimalField(max_digits=10, decimal_places=5) "
- "# max_digits and decimal_places have been guessed, "
- "as this database handles decimal fields as float",
- )
- assertFieldType("float_field", "models.FloatField()")
- assertFieldType(
- "int_field", "models.%s()" % introspected_field_types["IntegerField"]
- )
- assertFieldType(
- "pos_int_field",
- "models.%s()" % introspected_field_types["PositiveIntegerField"],
- )
- assertFieldType(
- "pos_big_int_field",
- "models.%s()" % introspected_field_types["PositiveBigIntegerField"],
- )
- assertFieldType(
- "pos_small_int_field",
- "models.%s()" % introspected_field_types["PositiveSmallIntegerField"],
- )
- assertFieldType(
- "small_int_field",
- "models.%s()" % introspected_field_types["SmallIntegerField"],
- )
- @skipUnlessDBFeature("can_introspect_foreign_keys")
- def test_attribute_name_not_python_keyword(self):
- out = StringIO()
- call_command("inspectdb", table_name_filter=inspectdb_tables_only, stdout=out)
- output = out.getvalue()
- error_message = (
- "inspectdb generated an attribute name which is a Python keyword"
- )
- # Recursive foreign keys should be set to 'self'
- self.assertIn("parent = models.ForeignKey('self', models.DO_NOTHING)", output)
- self.assertNotIn(
- "from = models.ForeignKey(InspectdbPeople, models.DO_NOTHING)",
- output,
- msg=error_message,
- )
- # As InspectdbPeople model is defined after InspectdbMessage, it should
- # be quoted.
- self.assertIn(
- "from_field = models.ForeignKey('InspectdbPeople', models.DO_NOTHING, "
- "db_column='from_id')",
- output,
- )
- self.assertIn(
- "people_pk = models.OneToOneField(InspectdbPeople, models.DO_NOTHING, "
- "primary_key=True)",
- output,
- )
- self.assertIn(
- "people_unique = models.OneToOneField(InspectdbPeople, models.DO_NOTHING)",
- output,
- )
- @skipUnlessDBFeature("can_introspect_foreign_keys")
- def test_foreign_key_to_field(self):
- out = StringIO()
- call_command("inspectdb", "inspectdb_foreignkeytofield", stdout=out)
- self.assertIn(
- "to_field_fk = models.ForeignKey('InspectdbPeoplemoredata', "
- "models.DO_NOTHING, to_field='people_unique_id')",
- out.getvalue(),
- )
- def test_digits_column_name_introspection(self):
- """Introspection of column names consist/start with digits (#16536/#17676)"""
- char_field_type = connection.features.introspected_field_types["CharField"]
- out = StringIO()
- call_command("inspectdb", "inspectdb_digitsincolumnname", stdout=out)
- output = out.getvalue()
- error_message = "inspectdb generated a model field name which is a number"
- self.assertNotIn(
- " 123 = models.%s" % char_field_type, output, msg=error_message
- )
- self.assertIn("number_123 = models.%s" % char_field_type, output)
- error_message = (
- "inspectdb generated a model field name which starts with a digit"
- )
- self.assertNotIn(
- " 4extra = models.%s" % char_field_type, output, msg=error_message
- )
- self.assertIn("number_4extra = models.%s" % char_field_type, output)
- self.assertNotIn(
- " 45extra = models.%s" % char_field_type, output, msg=error_message
- )
- self.assertIn("number_45extra = models.%s" % char_field_type, output)
- def test_special_column_name_introspection(self):
- """
- Introspection of column names containing special characters,
- unsuitable for Python identifiers
- """
- out = StringIO()
- call_command("inspectdb", table_name_filter=special_table_only, stdout=out)
- output = out.getvalue()
- base_name = connection.introspection.identifier_converter("Field")
- integer_field_type = connection.features.introspected_field_types[
- "IntegerField"
- ]
- self.assertIn("field = models.%s()" % integer_field_type, output)
- self.assertIn(
- "field_field = models.%s(db_column='%s_')"
- % (integer_field_type, base_name),
- output,
- )
- self.assertIn(
- "field_field_0 = models.%s(db_column='%s__')"
- % (integer_field_type, base_name),
- output,
- )
- self.assertIn(
- "field_field_1 = models.%s(db_column='__field')" % integer_field_type,
- output,
- )
- self.assertIn(
- "prc_x = models.{}(db_column='prc(%) x')".format(integer_field_type), output
- )
- self.assertIn("tamaño = models.%s()" % integer_field_type, output)
- def test_table_name_introspection(self):
- """
- Introspection of table names containing special characters,
- unsuitable for Python identifiers
- """
- out = StringIO()
- call_command("inspectdb", table_name_filter=special_table_only, stdout=out)
- output = out.getvalue()
- self.assertIn("class InspectdbSpecialTableName(models.Model):", output)
- @skipUnlessDBFeature("supports_expression_indexes")
- def test_table_with_func_unique_constraint(self):
- out = StringIO()
- call_command("inspectdb", "inspectdb_funcuniqueconstraint", stdout=out)
- output = out.getvalue()
- self.assertIn("class InspectdbFuncuniqueconstraint(models.Model):", output)
- def test_managed_models(self):
- """
- By default the command generates models with `Meta.managed = False`.
- """
- out = StringIO()
- call_command("inspectdb", "inspectdb_columntypes", stdout=out)
- output = out.getvalue()
- self.longMessage = False
- self.assertIn(
- " managed = False",
- output,
- msg="inspectdb should generate unmanaged models.",
- )
- def test_unique_together_meta(self):
- out = StringIO()
- call_command("inspectdb", "inspectdb_uniquetogether", stdout=out)
- output = out.getvalue()
- self.assertIn(" unique_together = (('", output)
- unique_together_match = self.unique_re.findall(output)
- # There should be one unique_together tuple.
- self.assertEqual(len(unique_together_match), 1)
- fields = unique_together_match[0]
- # Fields with db_column = field name.
- self.assertIn("('field1', 'field2')", fields)
- # Fields from columns whose names are Python keywords.
- self.assertIn("('field1', 'field2')", fields)
- # Fields whose names normalize to the same Python field name and hence
- # are given an integer suffix.
- self.assertIn("('non_unique_column', 'non_unique_column_0')", fields)
- @skipUnless(connection.vendor == "postgresql", "PostgreSQL specific SQL")
- def test_unsupported_unique_together(self):
- """Unsupported index types (COALESCE here) are skipped."""
- with connection.cursor() as c:
- c.execute(
- "CREATE UNIQUE INDEX Findex ON %s "
- "(id, people_unique_id, COALESCE(message_id, -1))"
- % PeopleMoreData._meta.db_table
- )
- try:
- out = StringIO()
- call_command(
- "inspectdb",
- table_name_filter=lambda tn: tn.startswith(
- PeopleMoreData._meta.db_table
- ),
- stdout=out,
- )
- output = out.getvalue()
- self.assertIn("# A unique constraint could not be introspected.", output)
- self.assertEqual(
- self.unique_re.findall(output), ["('id', 'people_unique')"]
- )
- finally:
- with connection.cursor() as c:
- c.execute("DROP INDEX Findex")
- @skipUnless(
- connection.vendor == "sqlite",
- "Only patched sqlite's DatabaseIntrospection.data_types_reverse for this test",
- )
- def test_custom_fields(self):
- """
- Introspection of columns with a custom field (#21090)
- """
- out = StringIO()
- with mock.patch(
- "django.db.connection.introspection.data_types_reverse."
- "base_data_types_reverse",
- {
- "text": "myfields.TextField",
- "bigint": "BigIntegerField",
- },
- ):
- call_command("inspectdb", "inspectdb_columntypes", stdout=out)
- output = out.getvalue()
- self.assertIn("text_field = myfields.TextField()", output)
- self.assertIn("big_int_field = models.BigIntegerField()", output)
- def test_introspection_errors(self):
- """
- Introspection errors should not crash the command, and the error should
- be visible in the output.
- """
- out = StringIO()
- with mock.patch(
- "django.db.connection.introspection.get_table_list",
- return_value=[TableInfo(name="nonexistent", type="t")],
- ):
- call_command("inspectdb", stdout=out)
- output = out.getvalue()
- self.assertIn("# Unable to inspect table 'nonexistent'", output)
- # The error message depends on the backend
- self.assertIn("# The error was:", output)
- def test_same_relations(self):
- out = StringIO()
- call_command("inspectdb", "inspectdb_message", stdout=out)
- self.assertIn(
- "author = models.ForeignKey('InspectdbPeople', models.DO_NOTHING, "
- "related_name='inspectdbmessage_author_set')",
- out.getvalue(),
- )
- class InspectDBTransactionalTests(TransactionTestCase):
- available_apps = ["inspectdb"]
- def test_include_views(self):
- """inspectdb --include-views creates models for database views."""
- with connection.cursor() as cursor:
- cursor.execute(
- "CREATE VIEW inspectdb_people_view AS "
- "SELECT id, name FROM inspectdb_people"
- )
- out = StringIO()
- view_model = "class InspectdbPeopleView(models.Model):"
- view_managed = "managed = False # Created from a view."
- try:
- call_command(
- "inspectdb",
- table_name_filter=inspectdb_views_only,
- stdout=out,
- )
- no_views_output = out.getvalue()
- self.assertNotIn(view_model, no_views_output)
- self.assertNotIn(view_managed, no_views_output)
- call_command(
- "inspectdb",
- table_name_filter=inspectdb_views_only,
- include_views=True,
- stdout=out,
- )
- with_views_output = out.getvalue()
- self.assertIn(view_model, with_views_output)
- self.assertIn(view_managed, with_views_output)
- finally:
- with connection.cursor() as cursor:
- cursor.execute("DROP VIEW inspectdb_people_view")
- @skipUnlessDBFeature("can_introspect_materialized_views")
- def test_include_materialized_views(self):
- """inspectdb --include-views creates models for materialized views."""
- with connection.cursor() as cursor:
- cursor.execute(
- "CREATE MATERIALIZED VIEW inspectdb_people_materialized AS "
- "SELECT id, name FROM inspectdb_people"
- )
- out = StringIO()
- view_model = "class InspectdbPeopleMaterialized(models.Model):"
- view_managed = "managed = False # Created from a view."
- try:
- call_command(
- "inspectdb",
- table_name_filter=inspectdb_views_only,
- stdout=out,
- )
- no_views_output = out.getvalue()
- self.assertNotIn(view_model, no_views_output)
- self.assertNotIn(view_managed, no_views_output)
- call_command(
- "inspectdb",
- table_name_filter=inspectdb_views_only,
- include_views=True,
- stdout=out,
- )
- with_views_output = out.getvalue()
- self.assertIn(view_model, with_views_output)
- self.assertIn(view_managed, with_views_output)
- finally:
- with connection.cursor() as cursor:
- cursor.execute("DROP MATERIALIZED VIEW inspectdb_people_materialized")
- @skipUnless(connection.vendor == "postgresql", "PostgreSQL specific SQL")
- def test_include_partitions(self):
- """inspectdb --include-partitions creates models for partitions."""
- with connection.cursor() as cursor:
- cursor.execute(
- """\
- CREATE TABLE inspectdb_partition_parent (name text not null)
- PARTITION BY LIST (left(upper(name), 1))
- """
- )
- cursor.execute(
- """\
- CREATE TABLE inspectdb_partition_child
- PARTITION OF inspectdb_partition_parent
- FOR VALUES IN ('A', 'B', 'C')
- """
- )
- out = StringIO()
- partition_model_parent = "class InspectdbPartitionParent(models.Model):"
- partition_model_child = "class InspectdbPartitionChild(models.Model):"
- partition_managed = "managed = False # Created from a partition."
- try:
- call_command(
- "inspectdb", table_name_filter=inspectdb_tables_only, stdout=out
- )
- no_partitions_output = out.getvalue()
- self.assertIn(partition_model_parent, no_partitions_output)
- self.assertNotIn(partition_model_child, no_partitions_output)
- self.assertNotIn(partition_managed, no_partitions_output)
- call_command(
- "inspectdb",
- table_name_filter=inspectdb_tables_only,
- include_partitions=True,
- stdout=out,
- )
- with_partitions_output = out.getvalue()
- self.assertIn(partition_model_parent, with_partitions_output)
- self.assertIn(partition_model_child, with_partitions_output)
- self.assertIn(partition_managed, with_partitions_output)
- finally:
- with connection.cursor() as cursor:
- cursor.execute("DROP TABLE IF EXISTS inspectdb_partition_child")
- cursor.execute("DROP TABLE IF EXISTS inspectdb_partition_parent")
- @skipUnless(connection.vendor == "postgresql", "PostgreSQL specific SQL")
- def test_foreign_data_wrapper(self):
- with connection.cursor() as cursor:
- cursor.execute("CREATE EXTENSION IF NOT EXISTS file_fdw")
- cursor.execute(
- "CREATE SERVER inspectdb_server FOREIGN DATA WRAPPER file_fdw"
- )
- cursor.execute(
- connection.ops.compose_sql(
- """
- CREATE FOREIGN TABLE inspectdb_iris_foreign_table (
- petal_length real,
- petal_width real,
- sepal_length real,
- sepal_width real
- ) SERVER inspectdb_server OPTIONS (
- filename %s
- )
- """,
- [os.devnull],
- )
- )
- out = StringIO()
- foreign_table_model = "class InspectdbIrisForeignTable(models.Model):"
- foreign_table_managed = "managed = False"
- try:
- call_command(
- "inspectdb",
- table_name_filter=inspectdb_tables_only,
- stdout=out,
- )
- output = out.getvalue()
- self.assertIn(foreign_table_model, output)
- self.assertIn(foreign_table_managed, output)
- finally:
- with connection.cursor() as cursor:
- cursor.execute(
- "DROP FOREIGN TABLE IF EXISTS inspectdb_iris_foreign_table"
- )
- cursor.execute("DROP SERVER IF EXISTS inspectdb_server")
- cursor.execute("DROP EXTENSION IF EXISTS file_fdw")
- @skipUnlessDBFeature("create_test_table_with_composite_primary_key")
- def test_composite_primary_key(self):
- table_name = "test_table_composite_pk"
- with connection.cursor() as cursor:
- cursor.execute(
- connection.features.create_test_table_with_composite_primary_key
- )
- out = StringIO()
- if connection.vendor == "sqlite":
- field_type = connection.features.introspected_field_types["AutoField"]
- else:
- field_type = connection.features.introspected_field_types["IntegerField"]
- try:
- call_command("inspectdb", table_name, stdout=out)
- output = out.getvalue()
- self.assertIn(
- f"column_1 = models.{field_type}(primary_key=True) # The composite "
- f"primary key (column_1, column_2) found, that is not supported. The "
- f"first column is selected.",
- output,
- )
- self.assertIn(
- "column_2 = models.%s()"
- % connection.features.introspected_field_types["IntegerField"],
- output,
- )
- finally:
- with connection.cursor() as cursor:
- cursor.execute("DROP TABLE %s" % table_name)
|