123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212 |
- from unittest import mock, skipUnless
- from django.db import connection
- from django.db.models import Index
- from django.db.utils import DatabaseError
- from django.test import TransactionTestCase, skipUnlessDBFeature
- from .models import Article, ArticleReporter, City, District, Reporter
class IntrospectionTests(TransactionTestCase):
    """
    Exercise ``connection.introspection`` against the ``introspection`` app's
    models (Article, ArticleReporter, City, District, Reporter).

    Several tests issue DDL directly (scratch tables, a view, removing and
    re-adding a column). Each of them undoes its schema change in a
    ``finally`` block so the change is reverted even when the introspection
    call or an assertion fails.
    """

    available_apps = ['introspection']

    def test_table_names(self):
        """table_names() is sorted and contains the model tables."""
        tl = connection.introspection.table_names()
        self.assertEqual(tl, sorted(tl))
        self.assertIn(Reporter._meta.db_table, tl, "'%s' isn't in table_list()." % Reporter._meta.db_table)
        self.assertIn(Article._meta.db_table, tl, "'%s' isn't in table_list()." % Article._meta.db_table)

    def test_django_table_names(self):
        """django_table_names() doesn't report a table created by raw SQL."""
        with connection.cursor() as cursor:
            cursor.execute('CREATE TABLE django_ixn_test_table (id INTEGER);')
            try:
                tl = connection.introspection.django_table_names()
            finally:
                # Drop the scratch table even if introspection raised.
                cursor.execute("DROP TABLE django_ixn_test_table;")
        self.assertNotIn(
            'django_ixn_test_table', tl,
            "django_table_names() returned a non-Django table",
        )

    def test_django_table_names_retval_type(self):
        """django_table_names() returns a list for either only_existing value."""
        # Table name is a list #15216
        tl = connection.introspection.django_table_names(only_existing=True)
        self.assertIs(type(tl), list)
        tl = connection.introspection.django_table_names(only_existing=False)
        self.assertIs(type(tl), list)

    def test_table_names_with_views(self):
        """table_names() lists a view only when include_views=True."""
        with connection.cursor() as cursor:
            try:
                cursor.execute(
                    'CREATE VIEW introspection_article_view AS SELECT headline '
                    'from introspection_article;')
            except DatabaseError as e:
                if 'insufficient privileges' in str(e):
                    self.fail("The test user has no CREATE VIEW privileges")
                else:
                    raise
        try:
            self.assertIn('introspection_article_view', connection.introspection.table_names(include_views=True))
            self.assertNotIn('introspection_article_view', connection.introspection.table_names())
        finally:
            # The view is a schema object created by this test; remove it so
            # it isn't left behind in the test database.
            with connection.cursor() as cursor:
                cursor.execute('DROP VIEW introspection_article_view')

    def test_unmanaged_through_model(self):
        """django_table_names() leaves out the ArticleReporter table."""
        tables = connection.introspection.django_table_names()
        self.assertNotIn(ArticleReporter._meta.db_table, tables)

    def test_installed_models(self):
        """installed_models() maps table names back to their model classes."""
        tables = [Article._meta.db_table, Reporter._meta.db_table]
        models = connection.introspection.installed_models(tables)
        self.assertEqual(models, {Article, Reporter})

    def test_sequence_list(self):
        """sequence_list() has exactly one entry for Reporter, on column 'id'."""
        sequences = connection.introspection.sequence_list()
        reporter_seqs = [seq for seq in sequences if seq['table'] == Reporter._meta.db_table]
        self.assertEqual(len(reporter_seqs), 1, 'Reporter sequence not found in sequence_list()')
        self.assertEqual(reporter_seqs[0]['column'], 'id')

    def test_get_table_description_names(self):
        """Described column names match the Reporter fields' columns, in order."""
        with connection.cursor() as cursor:
            desc = connection.introspection.get_table_description(cursor, Reporter._meta.db_table)
        self.assertEqual(
            [r[0] for r in desc],
            [f.column for f in Reporter._meta.fields],
        )

    def test_get_table_description_types(self):
        """
        Described column types map to the expected field types, falling back
        to a more generic type where the backend's feature flags say it can't
        introspect the specific one.
        """
        with connection.cursor() as cursor:
            desc = connection.introspection.get_table_description(cursor, Reporter._meta.db_table)
        self.assertEqual(
            [datatype(r[1], r) for r in desc],
            [
                'AutoField' if connection.features.can_introspect_autofield else 'IntegerField',
                'CharField',
                'CharField',
                'CharField',
                'BigIntegerField' if connection.features.can_introspect_big_integer_field else 'IntegerField',
                'BinaryField' if connection.features.can_introspect_binary_field else 'TextField',
                'SmallIntegerField' if connection.features.can_introspect_small_integer_field else 'IntegerField',
            ],
        )

    def test_get_table_description_col_lengths(self):
        """Item 3 of each CharField column's description is its length."""
        with connection.cursor() as cursor:
            desc = connection.introspection.get_table_description(cursor, Reporter._meta.db_table)
        self.assertEqual(
            [r[3] for r in desc if datatype(r[1], r) == 'CharField'],
            [30, 30, 254],
        )

    @skipUnlessDBFeature('can_introspect_null')
    def test_get_table_description_nullable(self):
        """Item 6 of each column's description is its nullability."""
        with connection.cursor() as cursor:
            desc = connection.introspection.get_table_description(cursor, Reporter._meta.db_table)
        # The three CharField columns are expected to be reported as nullable
        # exactly when the backend interprets empty strings as NULLs.
        nullable_by_backend = connection.features.interprets_empty_strings_as_nulls
        self.assertEqual(
            [r[6] for r in desc],
            [False, nullable_by_backend, nullable_by_backend, nullable_by_backend, True, True, False],
        )

    @skipUnlessDBFeature('can_introspect_autofield')
    def test_bigautofield(self):
        """One of the City table's columns is introspected as a BigAutoField."""
        with connection.cursor() as cursor:
            desc = connection.introspection.get_table_description(cursor, City._meta.db_table)
        self.assertIn('BigAutoField', [datatype(r[1], r) for r in desc])

    # Regression test for #9991 - 'real' types in postgres
    @skipUnlessDBFeature('has_real_datatype')
    def test_postgresql_real_type(self):
        """A REAL column is introspected as a FloatField."""
        with connection.cursor() as cursor:
            cursor.execute("CREATE TABLE django_ixn_real_test_table (number REAL);")
            try:
                desc = connection.introspection.get_table_description(cursor, 'django_ixn_real_test_table')
            finally:
                # Drop the scratch table even if introspection raised.
                cursor.execute('DROP TABLE django_ixn_real_test_table;')
        self.assertEqual(datatype(desc[0][1], desc[0]), 'FloatField')

    @skipUnlessDBFeature('can_introspect_foreign_keys')
    def test_get_relations(self):
        """
        get_relations() reports Article's two foreign keys, both before and
        after a column is removed from the table.
        """
        with connection.cursor() as cursor:
            relations = connection.introspection.get_relations(cursor, Article._meta.db_table)

        # That's {field_name: (field_name_other_table, other_table)}
        expected_relations = {
            'reporter_id': ('id', Reporter._meta.db_table),
            'response_to_id': ('id', Article._meta.db_table),
        }
        self.assertEqual(relations, expected_relations)

        # Removing a field shouldn't disturb get_relations (#17785)
        body = Article._meta.get_field('body')
        with connection.schema_editor() as editor:
            editor.remove_field(Article, body)
        try:
            with connection.cursor() as cursor:
                relations = connection.introspection.get_relations(cursor, Article._meta.db_table)
        finally:
            # Re-add the column even if introspection raised, so the Article
            # table is restored to its original shape.
            with connection.schema_editor() as editor:
                editor.add_field(Article, body)
        self.assertEqual(relations, expected_relations)

    @skipUnless(connection.vendor == 'sqlite', "This is an sqlite-specific issue")
    def test_get_relations_alt_format(self):
        """
        With SQLite, foreign keys can be added with different syntaxes and
        formatting.
        """
        create_table_statements = [
            "CREATE TABLE track(id, art_id INTEGER, FOREIGN KEY(art_id) REFERENCES {}(id));",
            "CREATE TABLE track(id, art_id INTEGER, FOREIGN KEY (art_id) REFERENCES {}(id));"
        ]
        for statement in create_table_statements:
            # subTest() makes a failure report which statement triggered it.
            with self.subTest(statement=statement):
                with connection.cursor() as cursor:
                    # Feed the CREATE TABLE text to get_relations() through a
                    # stubbed fetchone() instead of creating a real table.
                    cursor.fetchone = mock.Mock(return_value=[statement.format(Article._meta.db_table)])
                    relations = connection.introspection.get_relations(cursor, 'mocked_table')
                self.assertEqual(relations, {'art_id': ('id', Article._meta.db_table)})

    @skipUnlessDBFeature('can_introspect_foreign_keys')
    def test_get_key_columns(self):
        """get_key_columns() yields (column, referenced table, referenced column)."""
        with connection.cursor() as cursor:
            key_columns = connection.introspection.get_key_columns(cursor, Article._meta.db_table)
        self.assertEqual(
            set(key_columns),
            {
                ('reporter_id', Reporter._meta.db_table, 'id'),
                ('response_to_id', Article._meta.db_table, 'id'),
            },
        )

    def test_get_primary_key_column(self):
        """get_primary_key_column() returns 'id' for Article and 'city_id' for District."""
        with connection.cursor() as cursor:
            primary_key_column = connection.introspection.get_primary_key_column(cursor, Article._meta.db_table)
            pk_fk_column = connection.introspection.get_primary_key_column(cursor, District._meta.db_table)
        self.assertEqual(primary_key_column, 'id')
        self.assertEqual(pk_fk_column, 'city_id')

    def test_get_constraints_index_types(self):
        """Article's two multi-column indexes are reported with type Index.suffix."""
        with connection.cursor() as cursor:
            constraints = connection.introspection.get_constraints(cursor, Article._meta.db_table)
        two_column_index = None
        four_column_index = None
        for details in constraints.values():
            if details['columns'] == ['headline', 'pub_date']:
                two_column_index = details
            if details['columns'] == ['headline', 'response_to_id', 'pub_date', 'reporter_id']:
                four_column_index = details
        # Fail with a clear message, rather than a KeyError on the 'type'
        # lookup below, when an expected index is absent.
        self.assertIsNotNone(
            two_column_index,
            "No constraint found on columns ['headline', 'pub_date']",
        )
        self.assertIsNotNone(
            four_column_index,
            "No constraint found on columns "
            "['headline', 'response_to_id', 'pub_date', 'reporter_id']",
        )
        self.assertEqual(two_column_index['type'], Index.suffix)
        self.assertEqual(four_column_index['type'], Index.suffix)

    @skipUnlessDBFeature('supports_index_column_ordering')
    def test_get_constraints_indexes_orders(self):
        """
        Indexes have the 'orders' key with a list of 'ASC'/'DESC' values.
        """
        with connection.cursor() as cursor:
            constraints = connection.introspection.get_constraints(cursor, Article._meta.db_table)
        indexes_verified = 0
        expected_columns = [
            ['reporter_id'],
            ['headline', 'pub_date'],
            ['response_to_id'],
            ['headline', 'response_to_id', 'pub_date', 'reporter_id'],
        ]
        for details in constraints.values():
            # Only plain indexes are checked; primary key and unique
            # constraints are skipped.
            if details['index'] and not (details['primary_key'] or details['unique']):
                self.assertIn(details['columns'], expected_columns)
                self.assertEqual(details['orders'], ['ASC'] * len(details['columns']))
                indexes_verified += 1
        self.assertEqual(indexes_verified, 4)
def datatype(dbtype, description):
    """
    Helper to convert a data type into a string.

    Pass ``dbtype`` and ``description`` to the backend's ``get_field_type()``
    and return the result. When the result is a tuple, return only its first
    element.
    """
    dt = connection.introspection.get_field_type(dbtype, description)
    # isinstance() (rather than an exact type() match) also unwraps tuple
    # subclasses such as namedtuples.
    return dt[0] if isinstance(dt, tuple) else dt
|