tests.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653
  1. import os
  2. import re
  3. from io import StringIO
  4. from unittest import mock, skipUnless
  5. from django.core.management import call_command
  6. from django.db import connection
  7. from django.db.backends.base.introspection import TableInfo
  8. from django.test import TestCase, TransactionTestCase, skipUnlessDBFeature
  9. from .models import PeopleMoreData, test_collation
  def inspectdb_tables_only(table_name):
      """
      Tell inspectdb whether ``table_name`` was created for this app's models.

      Restricting introspection this way keeps the tests quick on databases
      where introspection is extremely slow (Oracle, for one).
      """
      app_prefix = "inspectdb_"
      return table_name.startswith(app_prefix)
  def inspectdb_views_only(table_name):
      """
      Tell inspectdb whether ``table_name`` is one of this app's views.

      A name qualifies when it starts with the app prefix and ends with
      either "_materialized" or "_view".
      """
      if not table_name.startswith("inspectdb_"):
          return False
      view_suffixes = ("_materialized", "_view")
      return table_name.endswith(view_suffixes)
  def special_table_only(table_name):
      """Tell inspectdb whether ``table_name`` starts with "inspectdb_special"."""
      special_prefix = "inspectdb_special"
      return table_name.startswith(special_prefix)
  22. class InspectDBTestCase(TestCase):
  23. unique_re = re.compile(r".*unique_together = \((.+),\).*")
  def test_stealth_table_name_filter_option(self):
      """Tables rejected by ``table_name_filter`` are not inspected."""
      stdout = StringIO()
      call_command(
          "inspectdb", table_name_filter=inspectdb_tables_only, stdout=stdout
      )
      generated_models = stdout.getvalue()
      # contrib.contenttypes is one of the apps always installed when running
      # the Django test suite, so its table makes a reliable canary: it must
      # not show up in the generated module.
      self.assertNotIn(
          "class DjangoContentType(models.Model):",
          generated_models,
          msg=(
              "inspectdb has examined a table that should have been "
              "filtered out."
          ),
      )
  def test_table_option(self):
      """
      Table names given as positional arguments restrict inspectdb to that
      subset of tables.
      """
      stdout = StringIO()
      call_command("inspectdb", "inspectdb_people", stdout=stdout)
      generated_models = stdout.getvalue()
      self.assertIn("class InspectdbPeople(models.Model):", generated_models)
      self.assertNotIn("InspectdbPeopledata", generated_models)
  def make_field_type_asserter(self):
      """
      Call inspectdb on ``inspectdb_columntypes`` and return a function to
      validate a field type in its output.

      The returned ``assertFieldType(name, definition)`` looks for the line
      ``<name> = models...`` in the generated module and asserts that the
      part after ``=`` equals ``definition``. When no such line exists it
      fails with a descriptive assertion message instead of a ``TypeError``
      from subscripting ``None``.
      """
      out = StringIO()
      call_command("inspectdb", "inspectdb_columntypes", stdout=out)
      output = out.getvalue()

      def assertFieldType(name, definition):
          # Escape the name so it is always matched literally.
          match = re.search(
              r"^\s*%s = (models.*)$" % re.escape(name), output, re.MULTILINE
          )
          self.assertIsNotNone(
              match,
              msg="inspectdb did not generate a field named %r." % name,
          )
          self.assertEqual(definition, match[1])

      return assertFieldType
  def test_field_types(self):
      """
      Check the definitions generated for the character, date/time, IP
      address and UUID columns.
      """
      assertFieldType = self.make_field_type_asserter()
      features = connection.features
      introspected_field_types = features.introspected_field_types
      char_field_type = introspected_field_types["CharField"]
      empty_strings_are_null = features.interprets_empty_strings_as_nulls

      # Inspecting Oracle DB doesn't produce correct results (#19884):
      # - it reports fields as blank=True when they aren't.
      if char_field_type == "CharField" and not empty_strings_are_null:
          expected_char_definitions = (
              ("char_field", "models.CharField(max_length=10)"),
              (
                  "null_char_field",
                  "models.CharField(max_length=10, blank=True, null=True)",
              ),
              ("email_field", "models.CharField(max_length=254)"),
              ("file_field", "models.CharField(max_length=100)"),
              ("file_path_field", "models.CharField(max_length=100)"),
              ("slug_field", "models.CharField(max_length=50)"),
              ("text_field", "models.TextField()"),
              ("url_field", "models.CharField(max_length=200)"),
          )
          for column, definition in expected_char_definitions:
              assertFieldType(column, definition)

      # Backends that introspect character columns as TextField.
      if char_field_type == "TextField":
          expected_text_definitions = (
              ("char_field", "models.TextField()"),
              ("null_char_field", "models.TextField(blank=True, null=True)"),
              ("email_field", "models.TextField()"),
              ("file_field", "models.TextField()"),
              ("file_path_field", "models.TextField()"),
              ("slug_field", "models.TextField()"),
              ("text_field", "models.TextField()"),
              ("url_field", "models.TextField()"),
          )
          for column, definition in expected_text_definitions:
              assertFieldType(column, definition)

      assertFieldType("date_field", "models.DateField()")
      assertFieldType("date_time_field", "models.DateTimeField()")

      ip_field_type = introspected_field_types["GenericIPAddressField"]
      if ip_field_type == "GenericIPAddressField":
          assertFieldType(
              "gen_ip_address_field", "models.GenericIPAddressField()"
          )
      elif not empty_strings_are_null:
          assertFieldType(
              "gen_ip_address_field", "models.CharField(max_length=39)"
          )

      time_field_type = introspected_field_types["TimeField"]
      assertFieldType("time_field", f"models.{time_field_type}()")

      if features.has_native_uuid_field:
          assertFieldType("uuid_field", "models.UUIDField()")
      elif not empty_strings_are_null:
          assertFieldType("uuid_field", "models.CharField(max_length=32)")
  @skipUnlessDBFeature("can_introspect_json_field", "supports_json_field")
  def test_json_field(self):
      """JSON columns are generated as ``JSONField``."""
      stdout = StringIO()
      call_command("inspectdb", "inspectdb_jsonfieldcolumntype", stdout=stdout)
      generated_models = stdout.getvalue()

      expected_lines = []
      # The non-null column is only checked on backends that do not treat
      # empty strings as NULL.
      if not connection.features.interprets_empty_strings_as_nulls:
          expected_lines.append("json_field = models.JSONField()")
      expected_lines.append(
          "null_json_field = models.JSONField(blank=True, null=True)"
      )
      for line in expected_lines:
          self.assertIn(line, generated_models)
  @skipUnlessDBFeature("supports_comments")
  def test_db_comments(self):
      """Column and table comments appear in the generated model."""
      stdout = StringIO()
      call_command("inspectdb", "inspectdb_dbcomment", stdout=stdout)
      generated_models = stdout.getvalue()

      field_types = connection.features.introspected_field_types
      integer_field_type = field_types["IntegerField"]
      expected_column = (
          f"rank = models.{integer_field_type}("
          "db_comment=\"'Rank' column comment\")"
      )
      expected_table_option = " db_table_comment = 'Custom table comment'"

      self.assertIn(expected_column, generated_models)
      self.assertIn(expected_table_option, generated_models)
  @skipUnlessDBFeature("supports_collation_on_charfield")
  @skipUnless(test_collation, "Language collations are not supported.")
  def test_char_field_db_collation(self):
      """The collation of a CharField column is emitted as ``db_collation``."""
      stdout = StringIO()
      call_command("inspectdb", "inspectdb_charfielddbcollation", stdout=stdout)

      if connection.features.interprets_empty_strings_as_nulls:
          # On these backends the column is also reported as nullable.
          expected = (
              "char_field = models.CharField(max_length=10, "
              "db_collation='%s', blank=True, null=True)" % test_collation
          )
      else:
          expected = (
              "char_field = models.CharField(max_length=10, "
              "db_collation='%s')" % test_collation
          )
      self.assertIn(expected, stdout.getvalue())
  @skipUnlessDBFeature("supports_collation_on_textfield")
  @skipUnless(test_collation, "Language collations are not supported.")
  def test_text_field_db_collation(self):
      """The collation of a TextField column is emitted as ``db_collation``."""
      out = StringIO()
      call_command("inspectdb", "inspectdb_textfielddbcollation", stdout=out)
      output = out.getvalue()
      if not connection.features.interprets_empty_strings_as_nulls:
          self.assertIn(
              "text_field = models.TextField(db_collation='%s')" % test_collation,
              output,
          )
      else:
          # The collation value is quoted on both sides, matching the
          # CharField variant of this test; the closing quote was missing.
          self.assertIn(
              "text_field = models.TextField(db_collation='%s', blank=True, "
              "null=True)" % test_collation,
              output,
          )
  @skipUnlessDBFeature("supports_unlimited_charfield")
  def test_char_field_unlimited(self):
      """An unlimited character column is generated as a bare ``CharField()``."""
      stdout = StringIO()
      call_command("inspectdb", "inspectdb_charfieldunlimited", stdout=stdout)
      generated_models = stdout.getvalue()
      self.assertIn("char_field = models.CharField()", generated_models)
  def test_number_field_types(self):
      """
      Check the definitions generated for the id, boolean, decimal, float
      and integer columns.
      """
      assertFieldType = self.make_field_type_asserter()
      introspected_field_types = connection.features.introspected_field_types

      def plain(field_class):
          # Argument-less definition for whatever this backend introspects
          # the given Django field class as.
          return "models.%s()" % introspected_field_types[field_class]

      auto_field_type = introspected_field_types["AutoField"]
      if auto_field_type != "AutoField":
          # NOTE(review): the spacing before "#" is kept exactly as found in
          # this copy of the file; confirm it matches inspectdb's output.
          assertFieldType(
              "id", "models.%s(primary_key=True) # AutoField?" % auto_field_type
          )

      assertFieldType("big_int_field", plain("BigIntegerField"))

      bool_field_type = introspected_field_types["BooleanField"]
      assertFieldType("bool_field", f"models.{bool_field_type}()")
      assertFieldType(
          "null_bool_field", f"models.{bool_field_type}(blank=True, null=True)"
      )

      if connection.vendor == "sqlite":
          # Guessed arguments on SQLite, see #5014
          # NOTE(review): spacing before "#" kept as found here; confirm.
          decimal_definition = (
              "models.DecimalField(max_digits=10, decimal_places=5) "
              "# max_digits and decimal_places have been guessed, "
              "as this database handles decimal fields as float"
          )
      else:
          decimal_definition = (
              "models.DecimalField(max_digits=6, decimal_places=1)"
          )
      assertFieldType("decimal_field", decimal_definition)

      assertFieldType("float_field", "models.FloatField()")

      integer_columns = (
          ("int_field", "IntegerField"),
          ("pos_int_field", "PositiveIntegerField"),
          ("pos_big_int_field", "PositiveBigIntegerField"),
          ("pos_small_int_field", "PositiveSmallIntegerField"),
          ("small_int_field", "SmallIntegerField"),
      )
      for column, field_class in integer_columns:
          assertFieldType(column, plain(field_class))
  @skipUnlessDBFeature("can_introspect_foreign_keys")
  def test_attribute_name_not_python_keyword(self):
      """
      Relation attributes never use a Python keyword as their name, and
      relation targets are rendered as expected.
      """
      stdout = StringIO()
      call_command(
          "inspectdb", table_name_filter=inspectdb_tables_only, stdout=stdout
      )
      generated_models = stdout.getvalue()

      # Recursive foreign keys should be set to 'self'
      self.assertIn(
          "parent = models.ForeignKey('self', models.DO_NOTHING)",
          generated_models,
      )
      self.assertNotIn(
          "from = models.ForeignKey(InspectdbPeople, models.DO_NOTHING)",
          generated_models,
          msg="inspectdb generated an attribute name which is a Python keyword",
      )
      expected_relations = (
          # As InspectdbPeople model is defined after InspectdbMessage, it
          # should be quoted.
          "from_field = models.ForeignKey('InspectdbPeople', models.DO_NOTHING, "
          "db_column='from_id')",
          "people_pk = models.OneToOneField(InspectdbPeople, models.DO_NOTHING, "
          "primary_key=True)",
          "people_unique = models.OneToOneField(InspectdbPeople, models.DO_NOTHING)",
      )
      for relation in expected_relations:
          self.assertIn(relation, generated_models)
  @skipUnlessDBFeature("can_introspect_foreign_keys")
  def test_foreign_key_to_field(self):
      """A foreign key to a non-primary-key column is generated with ``to_field``."""
      stdout = StringIO()
      call_command("inspectdb", "inspectdb_foreignkeytofield", stdout=stdout)
      expected = (
          "to_field_fk = models.ForeignKey('InspectdbPeoplemoredata', "
          "models.DO_NOTHING, to_field='people_unique_id')"
      )
      self.assertIn(expected, stdout.getvalue())
  def test_digits_column_name_introspection(self):
      """Introspection of column names consist/start with digits (#16536/#17676)"""
      char_field_type = connection.features.introspected_field_types["CharField"]
      stdout = StringIO()
      call_command("inspectdb", "inspectdb_digitsincolumnname", stdout=stdout)
      generated_models = stdout.getvalue()

      number_message = "inspectdb generated a model field name which is a number"
      digit_message = (
          "inspectdb generated a model field name which starts with a digit"
      )
      # (raw column name, failure message if it is used verbatim)
      cases = (
          ("123", number_message),
          ("4extra", digit_message),
          ("45extra", digit_message),
      )
      for column, error_message in cases:
          # The raw name must not be used as the attribute name ...
          self.assertNotIn(
              " %s = models.%s" % (column, char_field_type),
              generated_models,
              msg=error_message,
          )
          # ... the "number_"-prefixed name is expected instead.
          self.assertIn(
              "number_%s = models.%s" % (column, char_field_type),
              generated_models,
          )
  def test_special_column_name_introspection(self):
      """
      Column names with characters that are not valid in Python identifiers
      are converted to usable field names.
      """
      stdout = StringIO()
      call_command(
          "inspectdb", table_name_filter=special_table_only, stdout=stdout
      )
      generated_models = stdout.getvalue()

      base_name = connection.introspection.identifier_converter("Field")
      field_types = connection.features.introspected_field_types
      integer_field_type = field_types["IntegerField"]

      expected_fields = (
          "field = models.%s()" % integer_field_type,
          "field_field = models.%s(db_column='%s_')"
          % (integer_field_type, base_name),
          "field_field_0 = models.%s(db_column='%s__')"
          % (integer_field_type, base_name),
          "field_field_1 = models.%s(db_column='__field')" % integer_field_type,
          "prc_x = models.{}(db_column='prc(%) x')".format(integer_field_type),
          "tamaño = models.%s()" % integer_field_type,
      )
      for field_line in expected_fields:
          self.assertIn(field_line, generated_models)
  def test_table_name_introspection(self):
      """
      Table names with characters that are not valid in Python identifiers
      are converted to a usable class name.
      """
      stdout = StringIO()
      call_command(
          "inspectdb", table_name_filter=special_table_only, stdout=stdout
      )
      generated_models = stdout.getvalue()
      self.assertIn(
          "class InspectdbSpecialTableName(models.Model):", generated_models
      )
  @skipUnlessDBFeature("supports_expression_indexes")
  def test_table_with_func_unique_constraint(self):
      """A model class is generated for ``inspectdb_funcuniqueconstraint``."""
      stdout = StringIO()
      call_command("inspectdb", "inspectdb_funcuniqueconstraint", stdout=stdout)
      generated_models = stdout.getvalue()
      self.assertIn(
          "class InspectdbFuncuniqueconstraint(models.Model):", generated_models
      )
  def test_managed_models(self):
      """
      By default the command generates models with `Meta.managed = False`.
      """
      stdout = StringIO()
      call_command("inspectdb", "inspectdb_columntypes", stdout=stdout)
      generated_models = stdout.getvalue()
      # Report only the custom message on failure.
      self.longMessage = False
      self.assertIn(
          " managed = False",
          generated_models,
          msg="inspectdb should generate unmanaged models.",
      )
  def test_unique_together_meta(self):
      """The generated ``Meta.unique_together`` lists the expected field pairs."""
      stdout = StringIO()
      call_command("inspectdb", "inspectdb_uniquetogether", stdout=stdout)
      generated_models = stdout.getvalue()
      self.assertIn(" unique_together = (('", generated_models)

      matches = self.unique_re.findall(generated_models)
      # There should be one unique_together tuple.
      self.assertEqual(len(matches), 1)
      (fields,) = matches

      expected_pairs = (
          # Fields with db_column = field name.
          "('field1', 'field2')",
          # Fields from columns whose names are Python keywords.
          # NOTE(review): this repeats the pair above, which does not match
          # the comment; confirm the intended pair against the model.
          "('field1', 'field2')",
          # Fields whose names normalize to the same Python field name and
          # hence are given an integer suffix.
          "('non_unique_column', 'non_unique_column_0')",
      )
      for pair in expected_pairs:
          self.assertIn(pair, fields)
  @skipUnless(connection.vendor == "postgresql", "PostgreSQL specific SQL")
  def test_unsupported_unique_together(self):
      """Unsupported index types (COALESCE here) are skipped."""
      table = PeopleMoreData._meta.db_table
      with connection.cursor() as cursor:
          cursor.execute(
              "CREATE UNIQUE INDEX Findex ON %s "
              "(id, people_unique_id, COALESCE(message_id, -1))" % table
          )
      try:
          stdout = StringIO()
          call_command(
              "inspectdb",
              table_name_filter=lambda name: name.startswith(table),
              stdout=stdout,
          )
          generated_models = stdout.getvalue()
          self.assertIn(
              "# A unique constraint could not be introspected.",
              generated_models,
          )
          introspected_pairs = self.unique_re.findall(generated_models)
          self.assertEqual(introspected_pairs, ["('id', 'people_unique')"])
      finally:
          # Drop the index created above whether or not the assertions pass.
          with connection.cursor() as cursor:
              cursor.execute("DROP INDEX Findex")
  @skipUnless(
      connection.vendor == "sqlite",
      "Only patched sqlite's DatabaseIntrospection.data_types_reverse for this test",
  )
  def test_custom_fields(self):
      """
      Introspection of columns with a custom field (#21090)
      """
      patched_types = {
          "text": "myfields.TextField",
          "bigint": "BigIntegerField",
      }
      patch_target = (
          "django.db.connection.introspection.data_types_reverse."
          "base_data_types_reverse"
      )
      stdout = StringIO()
      with mock.patch(patch_target, patched_types):
          call_command("inspectdb", "inspectdb_columntypes", stdout=stdout)
      generated_models = stdout.getvalue()
      # A dotted type name is emitted as-is; a bare one gets the "models."
      # prefix.
      self.assertIn("text_field = myfields.TextField()", generated_models)
      self.assertIn("big_int_field = models.BigIntegerField()", generated_models)
  def test_introspection_errors(self):
      """
      A table that cannot be introspected does not crash the command; the
      failure is reported in the generated output instead.
      """
      fake_tables = [TableInfo(name="nonexistent", type="t")]
      stdout = StringIO()
      with mock.patch(
          "django.db.connection.introspection.get_table_list",
          return_value=fake_tables,
      ):
          call_command("inspectdb", stdout=stdout)
      generated_models = stdout.getvalue()
      self.assertIn("# Unable to inspect table 'nonexistent'", generated_models)
      # The error message depends on the backend
      self.assertIn("# The error was:", generated_models)
  def test_same_relations(self):
      """The ``author`` foreign key is generated with an explicit ``related_name``."""
      stdout = StringIO()
      call_command("inspectdb", "inspectdb_message", stdout=stdout)
      expected = (
          "author = models.ForeignKey('InspectdbPeople', models.DO_NOTHING, "
          "related_name='inspectdbmessage_author_set')"
      )
      self.assertIn(expected, stdout.getvalue())
  428. class InspectDBTransactionalTests(TransactionTestCase):
  429. available_apps = ["inspectdb"]
  def test_include_views(self):
      """inspectdb --include-views creates models for database views."""
      view_model = "class InspectdbPeopleView(models.Model):"
      # NOTE(review): spacing before "#" kept exactly as found in this copy
      # of the file; confirm it matches inspectdb's output.
      view_managed = "managed = False # Created from a view."
      with connection.cursor() as cursor:
          cursor.execute(
              "CREATE VIEW inspectdb_people_view AS "
              "SELECT id, name FROM inspectdb_people"
          )
      # Both runs write to the same buffer, so the second value read also
      # contains the output of the first run.
      stdout = StringIO()
      common_options = {
          "table_name_filter": inspectdb_views_only,
          "stdout": stdout,
      }
      try:
          call_command("inspectdb", **common_options)
          without_views = stdout.getvalue()
          for snippet in (view_model, view_managed):
              self.assertNotIn(snippet, without_views)

          call_command("inspectdb", include_views=True, **common_options)
          with_views = stdout.getvalue()
          for snippet in (view_model, view_managed):
              self.assertIn(snippet, with_views)
      finally:
          with connection.cursor() as cursor:
              cursor.execute("DROP VIEW inspectdb_people_view")
  @skipUnlessDBFeature("can_introspect_materialized_views")
  def test_include_materialized_views(self):
      """inspectdb --include-views creates models for materialized views."""
      view_model = "class InspectdbPeopleMaterialized(models.Model):"
      # NOTE(review): spacing before "#" kept exactly as found in this copy
      # of the file; confirm it matches inspectdb's output.
      view_managed = "managed = False # Created from a view."
      with connection.cursor() as cursor:
          cursor.execute(
              "CREATE MATERIALIZED VIEW inspectdb_people_materialized AS "
              "SELECT id, name FROM inspectdb_people"
          )
      # Both runs write to the same buffer, so the second value read also
      # contains the output of the first run.
      stdout = StringIO()
      common_options = {
          "table_name_filter": inspectdb_views_only,
          "stdout": stdout,
      }
      try:
          call_command("inspectdb", **common_options)
          without_views = stdout.getvalue()
          for snippet in (view_model, view_managed):
              self.assertNotIn(snippet, without_views)

          call_command("inspectdb", include_views=True, **common_options)
          with_views = stdout.getvalue()
          for snippet in (view_model, view_managed):
              self.assertIn(snippet, with_views)
      finally:
          with connection.cursor() as cursor:
              cursor.execute(
                  "DROP MATERIALIZED VIEW inspectdb_people_materialized"
              )
  @skipUnless(connection.vendor == "postgresql", "PostgreSQL specific SQL")
  def test_include_partitions(self):
      """
      inspectdb --include-partitions creates models for partitions.

      The tables are created inside the ``try`` block so that the
      ``DROP TABLE IF EXISTS`` cleanup also runs when only part of the
      setup succeeded. Each inspectdb run writes to its own buffer so the
      second set of assertions cannot be satisfied by output of the first.
      """
      partition_model_parent = "class InspectdbPartitionParent(models.Model):"
      partition_model_child = "class InspectdbPartitionChild(models.Model):"
      # NOTE(review): spacing before "#" kept exactly as found in this copy
      # of the file; confirm it matches inspectdb's output.
      partition_managed = "managed = False # Created from a partition."
      try:
          with connection.cursor() as cursor:
              cursor.execute(
                  """\
                  CREATE TABLE inspectdb_partition_parent (name text not null)
                  PARTITION BY LIST (left(upper(name), 1))
                  """
              )
              cursor.execute(
                  """\
                  CREATE TABLE inspectdb_partition_child
                  PARTITION OF inspectdb_partition_parent
                  FOR VALUES IN ('A', 'B', 'C')
                  """
              )

          out = StringIO()
          call_command(
              "inspectdb", table_name_filter=inspectdb_tables_only, stdout=out
          )
          no_partitions_output = out.getvalue()
          self.assertIn(partition_model_parent, no_partitions_output)
          self.assertNotIn(partition_model_child, no_partitions_output)
          self.assertNotIn(partition_managed, no_partitions_output)

          # Fresh buffer: only the output of the second run is examined.
          out = StringIO()
          call_command(
              "inspectdb",
              table_name_filter=inspectdb_tables_only,
              include_partitions=True,
              stdout=out,
          )
          with_partitions_output = out.getvalue()
          self.assertIn(partition_model_parent, with_partitions_output)
          self.assertIn(partition_model_child, with_partitions_output)
          self.assertIn(partition_managed, with_partitions_output)
      finally:
          with connection.cursor() as cursor:
              cursor.execute("DROP TABLE IF EXISTS inspectdb_partition_child")
              cursor.execute("DROP TABLE IF EXISTS inspectdb_partition_parent")
  @skipUnless(connection.vendor == "postgresql", "PostgreSQL specific SQL")
  def test_foreign_data_wrapper(self):
      """
      A foreign table is inspected like a regular table and generated as an
      unmanaged model.

      The extension, server and foreign table are created inside the
      ``try`` block so that the ``DROP ... IF EXISTS`` cleanup also runs
      when only part of the setup succeeded.
      """
      foreign_table_model = "class InspectdbIrisForeignTable(models.Model):"
      foreign_table_managed = "managed = False"
      try:
          with connection.cursor() as cursor:
              cursor.execute("CREATE EXTENSION IF NOT EXISTS file_fdw")
              cursor.execute(
                  "CREATE SERVER inspectdb_server FOREIGN DATA WRAPPER file_fdw"
              )
              cursor.execute(
                  connection.ops.compose_sql(
                      """
                      CREATE FOREIGN TABLE inspectdb_iris_foreign_table (
                          petal_length real,
                          petal_width real,
                          sepal_length real,
                          sepal_width real
                      ) SERVER inspectdb_server OPTIONS (
                          filename %s
                      )
                      """,
                      [os.devnull],
                  )
              )

          out = StringIO()
          call_command(
              "inspectdb",
              table_name_filter=inspectdb_tables_only,
              stdout=out,
          )
          output = out.getvalue()
          self.assertIn(foreign_table_model, output)
          self.assertIn(foreign_table_managed, output)
      finally:
          # Drop in reverse order of creation.
          with connection.cursor() as cursor:
              cursor.execute(
                  "DROP FOREIGN TABLE IF EXISTS inspectdb_iris_foreign_table"
              )
              cursor.execute("DROP SERVER IF EXISTS inspectdb_server")
              cursor.execute("DROP EXTENSION IF EXISTS file_fdw")
  @skipUnlessDBFeature("create_test_table_with_composite_primary_key")
  def test_composite_primary_key(self):
      """
      For a table with a two-column primary key, only the first column is
      generated with ``primary_key=True``, and a comment explains why.
      """
      table_name = "test_table_composite_pk"
      features = connection.features
      with connection.cursor() as cursor:
          cursor.execute(features.create_test_table_with_composite_primary_key)
      stdout = StringIO()
      field_types = features.introspected_field_types
      # On SQLite the first column is expected as the AutoField type.
      pk_type_key = "AutoField" if connection.vendor == "sqlite" else "IntegerField"
      pk_field_type = field_types[pk_type_key]
      try:
          call_command("inspectdb", table_name, stdout=stdout)
          generated_models = stdout.getvalue()
          # NOTE(review): spacing before "#" kept exactly as found in this
          # copy of the file; confirm it matches inspectdb's output.
          expected_pk = (
              f"column_1 = models.{pk_field_type}(primary_key=True) # The composite "
              "primary key (column_1, column_2) found, that is not supported. The "
              "first column is selected."
          )
          self.assertIn(expected_pk, generated_models)
          self.assertIn(
              "column_2 = models.%s()" % field_types["IntegerField"],
              generated_models,
          )
      finally:
          with connection.cursor() as cursor:
              cursor.execute("DROP TABLE %s" % table_name)