tests.py 16 KB


  1. from django.db import DatabaseError, connection
  2. from django.db.models import Index
  3. from django.test import TransactionTestCase, skipUnlessDBFeature
  4. from .models import (
  5. Article,
  6. ArticleReporter,
  7. CheckConstraintModel,
  8. City,
  9. Comment,
  10. Country,
  11. DbCommentModel,
  12. District,
  13. Reporter,
  14. UniqueConstraintConditionModel,
  15. )
  16. class IntrospectionTests(TransactionTestCase):
  17. available_apps = ["introspection"]
  18. def test_table_names(self):
  19. tl = connection.introspection.table_names()
  20. self.assertEqual(tl, sorted(tl))
  21. self.assertIn(
  22. Reporter._meta.db_table,
  23. tl,
  24. "'%s' isn't in table_list()." % Reporter._meta.db_table,
  25. )
  26. self.assertIn(
  27. Article._meta.db_table,
  28. tl,
  29. "'%s' isn't in table_list()." % Article._meta.db_table,
  30. )
  31. def test_django_table_names(self):
  32. with connection.cursor() as cursor:
  33. cursor.execute("CREATE TABLE django_ixn_test_table (id INTEGER);")
  34. tl = connection.introspection.django_table_names()
  35. cursor.execute("DROP TABLE django_ixn_test_table;")
  36. self.assertNotIn(
  37. "django_ixn_test_table",
  38. tl,
  39. "django_table_names() returned a non-Django table",
  40. )
  41. def test_django_table_names_retval_type(self):
  42. # Table name is a list #15216
  43. tl = connection.introspection.django_table_names(only_existing=True)
  44. self.assertIs(type(tl), list)
  45. tl = connection.introspection.django_table_names(only_existing=False)
  46. self.assertIs(type(tl), list)
  47. def test_table_names_with_views(self):
  48. with connection.cursor() as cursor:
  49. try:
  50. cursor.execute(
  51. "CREATE VIEW introspection_article_view AS SELECT headline "
  52. "from introspection_article;"
  53. )
  54. except DatabaseError as e:
  55. if "insufficient privileges" in str(e):
  56. self.fail("The test user has no CREATE VIEW privileges")
  57. else:
  58. raise
  59. try:
  60. self.assertIn(
  61. "introspection_article_view",
  62. connection.introspection.table_names(include_views=True),
  63. )
  64. self.assertNotIn(
  65. "introspection_article_view", connection.introspection.table_names()
  66. )
  67. finally:
  68. with connection.cursor() as cursor:
  69. cursor.execute("DROP VIEW introspection_article_view")
  70. def test_unmanaged_through_model(self):
  71. tables = connection.introspection.django_table_names()
  72. self.assertNotIn(ArticleReporter._meta.db_table, tables)
  73. def test_installed_models(self):
  74. tables = [Article._meta.db_table, Reporter._meta.db_table]
  75. models = connection.introspection.installed_models(tables)
  76. self.assertEqual(models, {Article, Reporter})
  77. def test_sequence_list(self):
  78. sequences = connection.introspection.sequence_list()
  79. reporter_seqs = [
  80. seq for seq in sequences if seq["table"] == Reporter._meta.db_table
  81. ]
  82. self.assertEqual(
  83. len(reporter_seqs), 1, "Reporter sequence not found in sequence_list()"
  84. )
  85. self.assertEqual(reporter_seqs[0]["column"], "id")
  86. def test_get_table_description_names(self):
  87. with connection.cursor() as cursor:
  88. desc = connection.introspection.get_table_description(
  89. cursor, Reporter._meta.db_table
  90. )
  91. self.assertEqual(
  92. [r[0] for r in desc], [f.column for f in Reporter._meta.fields]
  93. )
  94. def test_get_table_description_types(self):
  95. with connection.cursor() as cursor:
  96. desc = connection.introspection.get_table_description(
  97. cursor, Reporter._meta.db_table
  98. )
  99. self.assertEqual(
  100. [connection.introspection.get_field_type(r[1], r) for r in desc],
  101. [
  102. connection.features.introspected_field_types[field]
  103. for field in (
  104. "AutoField",
  105. "CharField",
  106. "CharField",
  107. "CharField",
  108. "BigIntegerField",
  109. "BinaryField",
  110. "SmallIntegerField",
  111. "DurationField",
  112. )
  113. ],
  114. )
  115. def test_get_table_description_col_lengths(self):
  116. with connection.cursor() as cursor:
  117. desc = connection.introspection.get_table_description(
  118. cursor, Reporter._meta.db_table
  119. )
  120. self.assertEqual(
  121. [
  122. r[2]
  123. for r in desc
  124. if connection.introspection.get_field_type(r[1], r) == "CharField"
  125. ],
  126. [30, 30, 254],
  127. )
  128. def test_get_table_description_nullable(self):
  129. with connection.cursor() as cursor:
  130. desc = connection.introspection.get_table_description(
  131. cursor, Reporter._meta.db_table
  132. )
  133. nullable_by_backend = connection.features.interprets_empty_strings_as_nulls
  134. self.assertEqual(
  135. [r[6] for r in desc],
  136. [
  137. False,
  138. nullable_by_backend,
  139. nullable_by_backend,
  140. nullable_by_backend,
  141. True,
  142. True,
  143. False,
  144. False,
  145. ],
  146. )
  147. def test_bigautofield(self):
  148. with connection.cursor() as cursor:
  149. desc = connection.introspection.get_table_description(
  150. cursor, City._meta.db_table
  151. )
  152. self.assertIn(
  153. connection.features.introspected_field_types["BigAutoField"],
  154. [connection.introspection.get_field_type(r[1], r) for r in desc],
  155. )
  156. def test_smallautofield(self):
  157. with connection.cursor() as cursor:
  158. desc = connection.introspection.get_table_description(
  159. cursor, Country._meta.db_table
  160. )
  161. self.assertIn(
  162. connection.features.introspected_field_types["SmallAutoField"],
  163. [connection.introspection.get_field_type(r[1], r) for r in desc],
  164. )
  165. @skipUnlessDBFeature("supports_comments")
  166. def test_db_comments(self):
  167. with connection.cursor() as cursor:
  168. desc = connection.introspection.get_table_description(
  169. cursor, DbCommentModel._meta.db_table
  170. )
  171. table_list = connection.introspection.get_table_list(cursor)
  172. self.assertEqual(
  173. ["'Name' column comment"],
  174. [field.comment for field in desc if field.name == "name"],
  175. )
  176. self.assertEqual(
  177. ["Custom table comment"],
  178. [
  179. table.comment
  180. for table in table_list
  181. if table.name == "introspection_dbcommentmodel"
  182. ],
  183. )
  184. # Regression test for #9991 - 'real' types in postgres
  185. @skipUnlessDBFeature("has_real_datatype")
  186. def test_postgresql_real_type(self):
  187. with connection.cursor() as cursor:
  188. cursor.execute("CREATE TABLE django_ixn_real_test_table (number REAL);")
  189. desc = connection.introspection.get_table_description(
  190. cursor, "django_ixn_real_test_table"
  191. )
  192. cursor.execute("DROP TABLE django_ixn_real_test_table;")
  193. self.assertEqual(
  194. connection.introspection.get_field_type(desc[0][1], desc[0]), "FloatField"
  195. )
  196. @skipUnlessDBFeature("can_introspect_foreign_keys")
  197. def test_get_relations(self):
  198. with connection.cursor() as cursor:
  199. relations = connection.introspection.get_relations(
  200. cursor, Article._meta.db_table
  201. )
  202. # That's {field_name: (field_name_other_table, other_table)}
  203. expected_relations = {
  204. "reporter_id": ("id", Reporter._meta.db_table),
  205. "response_to_id": ("id", Article._meta.db_table),
  206. }
  207. self.assertEqual(relations, expected_relations)
  208. # Removing a field shouldn't disturb get_relations (#17785)
  209. body = Article._meta.get_field("body")
  210. with connection.schema_editor() as editor:
  211. editor.remove_field(Article, body)
  212. with connection.cursor() as cursor:
  213. relations = connection.introspection.get_relations(
  214. cursor, Article._meta.db_table
  215. )
  216. with connection.schema_editor() as editor:
  217. editor.add_field(Article, body)
  218. self.assertEqual(relations, expected_relations)
  219. def test_get_primary_key_column(self):
  220. with connection.cursor() as cursor:
  221. primary_key_column = connection.introspection.get_primary_key_column(
  222. cursor, Article._meta.db_table
  223. )
  224. pk_fk_column = connection.introspection.get_primary_key_column(
  225. cursor, District._meta.db_table
  226. )
  227. self.assertEqual(primary_key_column, "id")
  228. self.assertEqual(pk_fk_column, "city_id")
  229. def test_get_constraints_index_types(self):
  230. with connection.cursor() as cursor:
  231. constraints = connection.introspection.get_constraints(
  232. cursor, Article._meta.db_table
  233. )
  234. index = {}
  235. index2 = {}
  236. for val in constraints.values():
  237. if val["columns"] == ["headline", "pub_date"]:
  238. index = val
  239. if val["columns"] == [
  240. "headline",
  241. "response_to_id",
  242. "pub_date",
  243. "reporter_id",
  244. ]:
  245. index2 = val
  246. self.assertEqual(index["type"], Index.suffix)
  247. self.assertEqual(index2["type"], Index.suffix)
  248. @skipUnlessDBFeature("supports_index_column_ordering")
  249. def test_get_constraints_indexes_orders(self):
  250. """
  251. Indexes have the 'orders' key with a list of 'ASC'/'DESC' values.
  252. """
  253. with connection.cursor() as cursor:
  254. constraints = connection.introspection.get_constraints(
  255. cursor, Article._meta.db_table
  256. )
  257. indexes_verified = 0
  258. expected_columns = [
  259. ["headline", "pub_date"],
  260. ["headline", "response_to_id", "pub_date", "reporter_id"],
  261. ]
  262. if connection.features.indexes_foreign_keys:
  263. expected_columns += [
  264. ["reporter_id"],
  265. ["response_to_id"],
  266. ]
  267. for val in constraints.values():
  268. if val["index"] and not (val["primary_key"] or val["unique"]):
  269. self.assertIn(val["columns"], expected_columns)
  270. self.assertEqual(val["orders"], ["ASC"] * len(val["columns"]))
  271. indexes_verified += 1
  272. self.assertEqual(indexes_verified, len(expected_columns))
  273. @skipUnlessDBFeature("supports_index_column_ordering", "supports_partial_indexes")
  274. def test_get_constraints_unique_indexes_orders(self):
  275. with connection.cursor() as cursor:
  276. constraints = connection.introspection.get_constraints(
  277. cursor,
  278. UniqueConstraintConditionModel._meta.db_table,
  279. )
  280. self.assertIn("cond_name_without_color_uniq", constraints)
  281. constraint = constraints["cond_name_without_color_uniq"]
  282. self.assertIs(constraint["unique"], True)
  283. self.assertEqual(constraint["columns"], ["name"])
  284. self.assertEqual(constraint["orders"], ["ASC"])
  285. def test_get_constraints(self):
  286. def assertDetails(
  287. details,
  288. cols,
  289. primary_key=False,
  290. unique=False,
  291. index=False,
  292. check=False,
  293. foreign_key=None,
  294. ):
  295. # Different backends have different values for same constraints:
  296. # PRIMARY KEY UNIQUE CONSTRAINT UNIQUE INDEX
  297. # MySQL pk=1 uniq=1 idx=1 pk=0 uniq=1 idx=1 pk=0 uniq=1 idx=1
  298. # PostgreSQL pk=1 uniq=1 idx=0 pk=0 uniq=1 idx=0 pk=0 uniq=1 idx=1
  299. # SQLite pk=1 uniq=0 idx=0 pk=0 uniq=1 idx=0 pk=0 uniq=1 idx=1
  300. if details["primary_key"]:
  301. details["unique"] = True
  302. if details["unique"]:
  303. details["index"] = False
  304. self.assertEqual(details["columns"], cols)
  305. self.assertEqual(details["primary_key"], primary_key)
  306. self.assertEqual(details["unique"], unique)
  307. self.assertEqual(details["index"], index)
  308. self.assertEqual(details["check"], check)
  309. self.assertEqual(details["foreign_key"], foreign_key)
  310. # Test custom constraints
  311. custom_constraints = {
  312. "article_email_pub_date_uniq",
  313. "email_pub_date_idx",
  314. }
  315. with connection.cursor() as cursor:
  316. constraints = connection.introspection.get_constraints(
  317. cursor, Comment._meta.db_table
  318. )
  319. if (
  320. connection.features.supports_column_check_constraints
  321. and connection.features.can_introspect_check_constraints
  322. ):
  323. constraints.update(
  324. connection.introspection.get_constraints(
  325. cursor, CheckConstraintModel._meta.db_table
  326. )
  327. )
  328. custom_constraints.add("up_votes_gte_0_check")
  329. assertDetails(
  330. constraints["up_votes_gte_0_check"], ["up_votes"], check=True
  331. )
  332. assertDetails(
  333. constraints["article_email_pub_date_uniq"],
  334. ["article_id", "email", "pub_date"],
  335. unique=True,
  336. )
  337. assertDetails(
  338. constraints["email_pub_date_idx"], ["email", "pub_date"], index=True
  339. )
  340. # Test field constraints
  341. field_constraints = set()
  342. for name, details in constraints.items():
  343. if name in custom_constraints:
  344. continue
  345. elif details["columns"] == ["up_votes"] and details["check"]:
  346. assertDetails(details, ["up_votes"], check=True)
  347. field_constraints.add(name)
  348. elif details["columns"] == ["voting_number"] and details["check"]:
  349. assertDetails(details, ["voting_number"], check=True)
  350. field_constraints.add(name)
  351. elif details["columns"] == ["ref"] and details["unique"]:
  352. assertDetails(details, ["ref"], unique=True)
  353. field_constraints.add(name)
  354. elif details["columns"] == ["voting_number"] and details["unique"]:
  355. assertDetails(details, ["voting_number"], unique=True)
  356. field_constraints.add(name)
  357. elif details["columns"] == ["article_id"] and details["index"]:
  358. assertDetails(details, ["article_id"], index=True)
  359. field_constraints.add(name)
  360. elif details["columns"] == ["id"] and details["primary_key"]:
  361. assertDetails(details, ["id"], primary_key=True, unique=True)
  362. field_constraints.add(name)
  363. elif details["columns"] == ["article_id"] and details["foreign_key"]:
  364. assertDetails(
  365. details, ["article_id"], foreign_key=("introspection_article", "id")
  366. )
  367. field_constraints.add(name)
  368. elif details["check"]:
  369. # Some databases (e.g. Oracle) include additional check
  370. # constraints.
  371. field_constraints.add(name)
  372. # All constraints are accounted for.
  373. self.assertEqual(
  374. constraints.keys() ^ (custom_constraints | field_constraints), set()
  375. )