123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403 |
- from django.db import DatabaseError, connection
- from django.db.models import Index
- from django.test import TransactionTestCase, skipUnlessDBFeature
- from .models import (
- Article,
- ArticleReporter,
- CheckConstraintModel,
- City,
- Comment,
- Country,
- DbCommentModel,
- District,
- Reporter,
- UniqueConstraintConditionModel,
- )
class IntrospectionTests(TransactionTestCase):
    """
    Exercise ``connection.introspection`` against the tables of the models
    imported from this app's ``models`` module.
    """

    available_apps = ["introspection"]

    def test_table_names(self):
        """table_names() is sorted and includes the Reporter and Article tables."""
        tl = connection.introspection.table_names()
        self.assertEqual(tl, sorted(tl))
        self.assertIn(
            Reporter._meta.db_table,
            tl,
            "'%s' isn't in table_list()." % Reporter._meta.db_table,
        )
        self.assertIn(
            Article._meta.db_table,
            tl,
            "'%s' isn't in table_list()." % Article._meta.db_table,
        )

    def test_django_table_names(self):
        """A table created with raw SQL isn't returned by django_table_names()."""
        with connection.cursor() as cursor:
            cursor.execute("CREATE TABLE django_ixn_test_table (id INTEGER);")
            try:
                tl = connection.introspection.django_table_names()
            finally:
                # Drop the scratch table even if introspection raises, so it
                # can't leak into later tests.
                cursor.execute("DROP TABLE django_ixn_test_table;")
        self.assertNotIn(
            "django_ixn_test_table",
            tl,
            "django_table_names() returned a non-Django table",
        )

    def test_django_table_names_retval_type(self):
        """django_table_names() returns a plain list for both only_existing values."""
        # Table name is a list #15216
        tl = connection.introspection.django_table_names(only_existing=True)
        self.assertIs(type(tl), list)
        tl = connection.introspection.django_table_names(only_existing=False)
        self.assertIs(type(tl), list)

    def test_table_names_with_views(self):
        """A view is listed by table_names() only when include_views=True."""
        with connection.cursor() as cursor:
            try:
                cursor.execute(
                    "CREATE VIEW introspection_article_view AS SELECT headline "
                    "from introspection_article;"
                )
            except DatabaseError as e:
                # Report missing privileges as a test failure with a clear
                # message; any other database error propagates unchanged.
                if "insufficient privileges" in str(e):
                    self.fail("The test user has no CREATE VIEW privileges")
                else:
                    raise
        try:
            self.assertIn(
                "introspection_article_view",
                connection.introspection.table_names(include_views=True),
            )
            self.assertNotIn(
                "introspection_article_view", connection.introspection.table_names()
            )
        finally:
            with connection.cursor() as cursor:
                cursor.execute("DROP VIEW introspection_article_view")

    def test_unmanaged_through_model(self):
        """ArticleReporter's table isn't returned by django_table_names()."""
        # NOTE(review): presumably because ArticleReporter is an unmanaged
        # through model, as the test name suggests -- confirm in models.py.
        tables = connection.introspection.django_table_names()
        self.assertNotIn(ArticleReporter._meta.db_table, tables)

    def test_installed_models(self):
        """installed_models() maps table names back to their model classes."""
        tables = [Article._meta.db_table, Reporter._meta.db_table]
        models = connection.introspection.installed_models(tables)
        self.assertEqual(models, {Article, Reporter})

    def test_sequence_list(self):
        """sequence_list() has exactly one entry for Reporter, on column "id"."""
        sequences = connection.introspection.sequence_list()
        reporter_seqs = [
            seq for seq in sequences if seq["table"] == Reporter._meta.db_table
        ]
        self.assertEqual(
            len(reporter_seqs), 1, "Reporter sequence not found in sequence_list()"
        )
        self.assertEqual(reporter_seqs[0]["column"], "id")

    def test_get_table_description_names(self):
        """Introspected column names match Reporter's field columns, in order."""
        with connection.cursor() as cursor:
            desc = connection.introspection.get_table_description(
                cursor, Reporter._meta.db_table
            )
        self.assertEqual(
            [r[0] for r in desc], [f.column for f in Reporter._meta.fields]
        )

    def test_get_table_description_types(self):
        """Introspected field types of the Reporter table match the expected list."""
        with connection.cursor() as cursor:
            desc = connection.introspection.get_table_description(
                cursor, Reporter._meta.db_table
            )
        # Expected names go through introspected_field_types so that each
        # backend can substitute the type it actually reports.
        self.assertEqual(
            [connection.introspection.get_field_type(r[1], r) for r in desc],
            [
                connection.features.introspected_field_types[field]
                for field in (
                    "AutoField",
                    "CharField",
                    "CharField",
                    "CharField",
                    "BigIntegerField",
                    "BinaryField",
                    "SmallIntegerField",
                    "DurationField",
                )
            ],
        )

    def test_get_table_description_col_lengths(self):
        """The third description element of Reporter's CharField columns matches."""
        with connection.cursor() as cursor:
            desc = connection.introspection.get_table_description(
                cursor, Reporter._meta.db_table
            )
        self.assertEqual(
            [
                r[2]
                for r in desc
                if connection.introspection.get_field_type(r[1], r) == "CharField"
            ],
            [30, 30, 254],
        )

    def test_get_table_description_nullable(self):
        """The seventh description element (r[6]) matches per Reporter column."""
        with connection.cursor() as cursor:
            desc = connection.introspection.get_table_description(
                cursor, Reporter._meta.db_table
            )
        # The expectation for columns 2-4 follows the backend's
        # interprets_empty_strings_as_nulls feature flag.
        nullable_by_backend = connection.features.interprets_empty_strings_as_nulls
        self.assertEqual(
            [r[6] for r in desc],
            [
                False,
                nullable_by_backend,
                nullable_by_backend,
                nullable_by_backend,
                True,
                True,
                False,
                False,
            ],
        )

    def test_bigautofield(self):
        """The City table has a column introspected as the BigAutoField type."""
        with connection.cursor() as cursor:
            desc = connection.introspection.get_table_description(
                cursor, City._meta.db_table
            )
        self.assertIn(
            connection.features.introspected_field_types["BigAutoField"],
            [connection.introspection.get_field_type(r[1], r) for r in desc],
        )

    def test_smallautofield(self):
        """The Country table has a column introspected as the SmallAutoField type."""
        with connection.cursor() as cursor:
            desc = connection.introspection.get_table_description(
                cursor, Country._meta.db_table
            )
        self.assertIn(
            connection.features.introspected_field_types["SmallAutoField"],
            [connection.introspection.get_field_type(r[1], r) for r in desc],
        )

    @skipUnlessDBFeature("supports_comments")
    def test_db_comments(self):
        """Column and table comments of DbCommentModel are introspected."""
        with connection.cursor() as cursor:
            desc = connection.introspection.get_table_description(
                cursor, DbCommentModel._meta.db_table
            )
            table_list = connection.introspection.get_table_list(cursor)
        self.assertEqual(
            ["'Name' column comment"],
            [field.comment for field in desc if field.name == "name"],
        )
        self.assertEqual(
            ["Custom table comment"],
            [
                table.comment
                for table in table_list
                if table.name == "introspection_dbcommentmodel"
            ],
        )

    # Regression test for #9991 - 'real' types in postgres
    @skipUnlessDBFeature("has_real_datatype")
    def test_postgresql_real_type(self):
        """A REAL column is introspected as FloatField."""
        with connection.cursor() as cursor:
            cursor.execute("CREATE TABLE django_ixn_real_test_table (number REAL);")
            try:
                desc = connection.introspection.get_table_description(
                    cursor, "django_ixn_real_test_table"
                )
            finally:
                # Drop the scratch table even if introspection raises, so it
                # can't leak into later tests.
                cursor.execute("DROP TABLE django_ixn_real_test_table;")
        self.assertEqual(
            connection.introspection.get_field_type(desc[0][1], desc[0]), "FloatField"
        )

    @skipUnlessDBFeature("can_introspect_foreign_keys")
    def test_get_relations(self):
        """get_relations() reports Article's relations, also after a field removal."""
        with connection.cursor() as cursor:
            relations = connection.introspection.get_relations(
                cursor, Article._meta.db_table
            )

        # That's {field_name: (field_name_other_table, other_table)}
        expected_relations = {
            "reporter_id": ("id", Reporter._meta.db_table),
            "response_to_id": ("id", Article._meta.db_table),
        }
        self.assertEqual(relations, expected_relations)

        # Removing a field shouldn't disturb get_relations (#17785)
        body = Article._meta.get_field("body")
        with connection.schema_editor() as editor:
            editor.remove_field(Article, body)
        try:
            with connection.cursor() as cursor:
                relations = connection.introspection.get_relations(
                    cursor, Article._meta.db_table
                )
        finally:
            # Restore the column even if introspection raises, so the
            # Article table keeps its schema for the remaining tests.
            with connection.schema_editor() as editor:
                editor.add_field(Article, body)
        self.assertEqual(relations, expected_relations)

    def test_get_primary_key_column(self):
        """get_primary_key_column() returns "id" for Article, "city_id" for District."""
        with connection.cursor() as cursor:
            primary_key_column = connection.introspection.get_primary_key_column(
                cursor, Article._meta.db_table
            )
            pk_fk_column = connection.introspection.get_primary_key_column(
                cursor, District._meta.db_table
            )
        self.assertEqual(primary_key_column, "id")
        self.assertEqual(pk_fk_column, "city_id")

    def test_get_constraints_index_types(self):
        """Article's two multi-column constraints have the Index.suffix type."""
        with connection.cursor() as cursor:
            constraints = connection.introspection.get_constraints(
                cursor, Article._meta.db_table
            )
        # Locate each constraint by its exact column list; the constraint
        # names themselves aren't relied on.
        index = {}
        index2 = {}
        for val in constraints.values():
            if val["columns"] == ["headline", "pub_date"]:
                index = val
            if val["columns"] == [
                "headline",
                "response_to_id",
                "pub_date",
                "reporter_id",
            ]:
                index2 = val
        self.assertEqual(index["type"], Index.suffix)
        self.assertEqual(index2["type"], Index.suffix)

    @skipUnlessDBFeature("supports_index_column_ordering")
    def test_get_constraints_indexes_orders(self):
        """
        Indexes have the 'orders' key with a list of 'ASC'/'DESC' values.
        """
        with connection.cursor() as cursor:
            constraints = connection.introspection.get_constraints(
                cursor, Article._meta.db_table
            )
        indexes_verified = 0
        expected_columns = [
            ["headline", "pub_date"],
            ["headline", "response_to_id", "pub_date", "reporter_id"],
        ]
        if connection.features.indexes_foreign_keys:
            expected_columns += [
                ["reporter_id"],
                ["response_to_id"],
            ]
        for val in constraints.values():
            # Only plain indexes are checked; primary keys and unique
            # constraints are skipped.
            if val["index"] and not (val["primary_key"] or val["unique"]):
                self.assertIn(val["columns"], expected_columns)
                self.assertEqual(val["orders"], ["ASC"] * len(val["columns"]))
                indexes_verified += 1
        # Every expected index must have been seen.
        self.assertEqual(indexes_verified, len(expected_columns))

    @skipUnlessDBFeature("supports_index_column_ordering", "supports_partial_indexes")
    def test_get_constraints_unique_indexes_orders(self):
        """cond_name_without_color_uniq is unique on "name" with ASC ordering."""
        with connection.cursor() as cursor:
            constraints = connection.introspection.get_constraints(
                cursor,
                UniqueConstraintConditionModel._meta.db_table,
            )
        self.assertIn("cond_name_without_color_uniq", constraints)
        constraint = constraints["cond_name_without_color_uniq"]
        self.assertIs(constraint["unique"], True)
        self.assertEqual(constraint["columns"], ["name"])
        self.assertEqual(constraint["orders"], ["ASC"])

    def test_get_constraints(self):
        """
        Every constraint introspected on the Comment table (and on
        CheckConstraintModel's table when check constraints can be
        introspected) has the expected details and is accounted for.
        """

        def assertDetails(
            details,
            cols,
            primary_key=False,
            unique=False,
            index=False,
            check=False,
            foreign_key=None,
        ):
            """
            Normalize the backend-specific flags of ``details`` in place, then
            compare its columns and flags with the expected values.
            """
            # Different backends have different values for same constraints:
            #               PRIMARY KEY       UNIQUE CONSTRAINT  UNIQUE INDEX
            # MySQL      pk=1 uniq=1 idx=1  pk=0 uniq=1 idx=1  pk=0 uniq=1 idx=1
            # PostgreSQL pk=1 uniq=1 idx=0  pk=0 uniq=1 idx=0  pk=0 uniq=1 idx=1
            # SQLite     pk=1 uniq=0 idx=0  pk=0 uniq=1 idx=0  pk=0 uniq=1 idx=1
            if details["primary_key"]:
                details["unique"] = True
            if details["unique"]:
                details["index"] = False
            self.assertEqual(details["columns"], cols)
            self.assertEqual(details["primary_key"], primary_key)
            self.assertEqual(details["unique"], unique)
            self.assertEqual(details["index"], index)
            self.assertEqual(details["check"], check)
            self.assertEqual(details["foreign_key"], foreign_key)

        # Test custom constraints
        custom_constraints = {
            "article_email_pub_date_uniq",
            "email_pub_date_idx",
        }
        with connection.cursor() as cursor:
            constraints = connection.introspection.get_constraints(
                cursor, Comment._meta.db_table
            )
            if (
                connection.features.supports_column_check_constraints
                and connection.features.can_introspect_check_constraints
            ):
                # Merge in CheckConstraintModel's constraints so the named
                # check constraint can be verified alongside Comment's.
                constraints.update(
                    connection.introspection.get_constraints(
                        cursor, CheckConstraintModel._meta.db_table
                    )
                )
                custom_constraints.add("up_votes_gte_0_check")
                assertDetails(
                    constraints["up_votes_gte_0_check"], ["up_votes"], check=True
                )
        assertDetails(
            constraints["article_email_pub_date_uniq"],
            ["article_id", "email", "pub_date"],
            unique=True,
        )
        assertDetails(
            constraints["email_pub_date_idx"], ["email", "pub_date"], index=True
        )
        # Test field constraints
        field_constraints = set()
        for name, details in constraints.items():
            # The branch conditions read the raw flags; assertDetails()
            # normalizes them only once a branch has been chosen.
            if name in custom_constraints:
                continue
            elif details["columns"] == ["up_votes"] and details["check"]:
                assertDetails(details, ["up_votes"], check=True)
                field_constraints.add(name)
            elif details["columns"] == ["voting_number"] and details["check"]:
                assertDetails(details, ["voting_number"], check=True)
                field_constraints.add(name)
            elif details["columns"] == ["ref"] and details["unique"]:
                assertDetails(details, ["ref"], unique=True)
                field_constraints.add(name)
            elif details["columns"] == ["voting_number"] and details["unique"]:
                assertDetails(details, ["voting_number"], unique=True)
                field_constraints.add(name)
            elif details["columns"] == ["article_id"] and details["index"]:
                assertDetails(details, ["article_id"], index=True)
                field_constraints.add(name)
            elif details["columns"] == ["id"] and details["primary_key"]:
                assertDetails(details, ["id"], primary_key=True, unique=True)
                field_constraints.add(name)
            elif details["columns"] == ["article_id"] and details["foreign_key"]:
                assertDetails(
                    details, ["article_id"], foreign_key=("introspection_article", "id")
                )
                field_constraints.add(name)
            elif details["check"]:
                # Some databases (e.g. Oracle) include additional check
                # constraints.
                field_constraints.add(name)
        # All constraints are accounted for.
        self.assertEqual(
            constraints.keys() ^ (custom_constraints | field_constraints), set()
        )
|