tests.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465
  1. import os
  2. import re
  3. from io import StringIO
  4. from unittest import mock, skipUnless
  5. from django.core.management import call_command
  6. from django.db import connection
  7. from django.db.backends.base.introspection import TableInfo
  8. from django.test import TestCase, TransactionTestCase, skipUnlessDBFeature
  9. from .models import PeopleMoreData, test_collation
  10. def inspectdb_tables_only(table_name):
  11. """
  12. Limit introspection to tables created for models of this app.
  13. Some databases such as Oracle are extremely slow at introspection.
  14. """
  15. return table_name.startswith('inspectdb_')
  16. def inspectdb_views_only(table_name):
  17. return (
  18. table_name.startswith('inspectdb_') and
  19. table_name.endswith(('_materialized', '_view'))
  20. )
  21. def special_table_only(table_name):
  22. return table_name.startswith('inspectdb_special')
  23. class InspectDBTestCase(TestCase):
  24. unique_re = re.compile(r'.*unique_together = \((.+),\).*')
  25. def test_stealth_table_name_filter_option(self):
  26. out = StringIO()
  27. call_command('inspectdb', table_name_filter=inspectdb_tables_only, stdout=out)
  28. error_message = "inspectdb has examined a table that should have been filtered out."
  29. # contrib.contenttypes is one of the apps always installed when running
  30. # the Django test suite, check that one of its tables hasn't been
  31. # inspected
  32. self.assertNotIn("class DjangoContentType(models.Model):", out.getvalue(), msg=error_message)
  33. def test_table_option(self):
  34. """
  35. inspectdb can inspect a subset of tables by passing the table names as
  36. arguments.
  37. """
  38. out = StringIO()
  39. call_command('inspectdb', 'inspectdb_people', stdout=out)
  40. output = out.getvalue()
  41. self.assertIn('class InspectdbPeople(models.Model):', output)
  42. self.assertNotIn("InspectdbPeopledata", output)
  43. def make_field_type_asserter(self):
  44. """Call inspectdb and return a function to validate a field type in its output"""
  45. out = StringIO()
  46. call_command('inspectdb', 'inspectdb_columntypes', stdout=out)
  47. output = out.getvalue()
  48. def assertFieldType(name, definition):
  49. out_def = re.search(r'^\s*%s = (models.*)$' % name, output, re.MULTILINE)[1]
  50. self.assertEqual(definition, out_def)
  51. return assertFieldType
  52. def test_field_types(self):
  53. """Test introspection of various Django field types"""
  54. assertFieldType = self.make_field_type_asserter()
  55. introspected_field_types = connection.features.introspected_field_types
  56. char_field_type = introspected_field_types['CharField']
  57. # Inspecting Oracle DB doesn't produce correct results (#19884):
  58. # - it reports fields as blank=True when they aren't.
  59. if not connection.features.interprets_empty_strings_as_nulls and char_field_type == 'CharField':
  60. assertFieldType('char_field', "models.CharField(max_length=10)")
  61. assertFieldType('null_char_field', "models.CharField(max_length=10, blank=True, null=True)")
  62. assertFieldType('email_field', "models.CharField(max_length=254)")
  63. assertFieldType('file_field', "models.CharField(max_length=100)")
  64. assertFieldType('file_path_field', "models.CharField(max_length=100)")
  65. assertFieldType('slug_field', "models.CharField(max_length=50)")
  66. assertFieldType('text_field', "models.TextField()")
  67. assertFieldType('url_field', "models.CharField(max_length=200)")
  68. if char_field_type == 'TextField':
  69. assertFieldType('char_field', 'models.TextField()')
  70. assertFieldType('null_char_field', 'models.TextField(blank=True, null=True)')
  71. assertFieldType('email_field', 'models.TextField()')
  72. assertFieldType('file_field', 'models.TextField()')
  73. assertFieldType('file_path_field', 'models.TextField()')
  74. assertFieldType('slug_field', 'models.TextField()')
  75. assertFieldType('text_field', 'models.TextField()')
  76. assertFieldType('url_field', 'models.TextField()')
  77. assertFieldType('date_field', "models.DateField()")
  78. assertFieldType('date_time_field', "models.DateTimeField()")
  79. if introspected_field_types['GenericIPAddressField'] == 'GenericIPAddressField':
  80. assertFieldType('gen_ip_address_field', "models.GenericIPAddressField()")
  81. elif not connection.features.interprets_empty_strings_as_nulls:
  82. assertFieldType('gen_ip_address_field', "models.CharField(max_length=39)")
  83. assertFieldType('time_field', 'models.%s()' % introspected_field_types['TimeField'])
  84. if connection.features.has_native_uuid_field:
  85. assertFieldType('uuid_field', "models.UUIDField()")
  86. elif not connection.features.interprets_empty_strings_as_nulls:
  87. assertFieldType('uuid_field', "models.CharField(max_length=32)")
  88. @skipUnlessDBFeature('can_introspect_json_field', 'supports_json_field')
  89. def test_json_field(self):
  90. out = StringIO()
  91. call_command('inspectdb', 'inspectdb_jsonfieldcolumntype', stdout=out)
  92. output = out.getvalue()
  93. if not connection.features.interprets_empty_strings_as_nulls:
  94. self.assertIn('json_field = models.JSONField()', output)
  95. self.assertIn('null_json_field = models.JSONField(blank=True, null=True)', output)
  96. @skipUnlessDBFeature('supports_collation_on_charfield')
  97. @skipUnless(test_collation, 'Language collations are not supported.')
  98. def test_char_field_db_collation(self):
  99. out = StringIO()
  100. call_command('inspectdb', 'inspectdb_charfielddbcollation', stdout=out)
  101. output = out.getvalue()
  102. if not connection.features.interprets_empty_strings_as_nulls:
  103. self.assertIn(
  104. "char_field = models.CharField(max_length=10, "
  105. "db_collation='%s')" % test_collation,
  106. output,
  107. )
  108. else:
  109. self.assertIn(
  110. "char_field = models.CharField(max_length=10, "
  111. "db_collation='%s', blank=True, null=True)" % test_collation,
  112. output,
  113. )
  114. @skipUnlessDBFeature('supports_collation_on_textfield')
  115. @skipUnless(test_collation, 'Language collations are not supported.')
  116. def test_text_field_db_collation(self):
  117. out = StringIO()
  118. call_command('inspectdb', 'inspectdb_textfielddbcollation', stdout=out)
  119. output = out.getvalue()
  120. if not connection.features.interprets_empty_strings_as_nulls:
  121. self.assertIn(
  122. "text_field = models.TextField(db_collation='%s')" % test_collation,
  123. output,
  124. )
  125. else:
  126. self.assertIn(
  127. "text_field = models.TextField(db_collation='%s, blank=True, "
  128. "null=True)" % test_collation,
  129. output,
  130. )
  131. def test_number_field_types(self):
  132. """Test introspection of various Django field types"""
  133. assertFieldType = self.make_field_type_asserter()
  134. introspected_field_types = connection.features.introspected_field_types
  135. auto_field_type = connection.features.introspected_field_types['AutoField']
  136. if auto_field_type != 'AutoField':
  137. assertFieldType('id', "models.%s(primary_key=True) # AutoField?" % auto_field_type)
  138. assertFieldType('big_int_field', 'models.%s()' % introspected_field_types['BigIntegerField'])
  139. bool_field_type = introspected_field_types['BooleanField']
  140. assertFieldType('bool_field', "models.{}()".format(bool_field_type))
  141. assertFieldType('null_bool_field', 'models.{}(blank=True, null=True)'.format(bool_field_type))
  142. if connection.vendor != 'sqlite':
  143. assertFieldType('decimal_field', "models.DecimalField(max_digits=6, decimal_places=1)")
  144. else: # Guessed arguments on SQLite, see #5014
  145. assertFieldType('decimal_field', "models.DecimalField(max_digits=10, decimal_places=5) "
  146. "# max_digits and decimal_places have been guessed, "
  147. "as this database handles decimal fields as float")
  148. assertFieldType('float_field', "models.FloatField()")
  149. assertFieldType('int_field', 'models.%s()' % introspected_field_types['IntegerField'])
  150. assertFieldType('pos_int_field', 'models.%s()' % introspected_field_types['PositiveIntegerField'])
  151. assertFieldType('pos_big_int_field', 'models.%s()' % introspected_field_types['PositiveBigIntegerField'])
  152. assertFieldType('pos_small_int_field', 'models.%s()' % introspected_field_types['PositiveSmallIntegerField'])
  153. assertFieldType('small_int_field', 'models.%s()' % introspected_field_types['SmallIntegerField'])
  154. @skipUnlessDBFeature('can_introspect_foreign_keys')
  155. def test_attribute_name_not_python_keyword(self):
  156. out = StringIO()
  157. call_command('inspectdb', table_name_filter=inspectdb_tables_only, stdout=out)
  158. output = out.getvalue()
  159. error_message = "inspectdb generated an attribute name which is a Python keyword"
  160. # Recursive foreign keys should be set to 'self'
  161. self.assertIn("parent = models.ForeignKey('self', models.DO_NOTHING)", output)
  162. self.assertNotIn(
  163. "from = models.ForeignKey(InspectdbPeople, models.DO_NOTHING)",
  164. output,
  165. msg=error_message,
  166. )
  167. # As InspectdbPeople model is defined after InspectdbMessage, it should be quoted
  168. self.assertIn(
  169. "from_field = models.ForeignKey('InspectdbPeople', models.DO_NOTHING, db_column='from_id')",
  170. output,
  171. )
  172. self.assertIn(
  173. 'people_pk = models.OneToOneField(InspectdbPeople, models.DO_NOTHING, primary_key=True)',
  174. output,
  175. )
  176. self.assertIn(
  177. 'people_unique = models.OneToOneField(InspectdbPeople, models.DO_NOTHING)',
  178. output,
  179. )
  180. def test_digits_column_name_introspection(self):
  181. """Introspection of column names consist/start with digits (#16536/#17676)"""
  182. char_field_type = connection.features.introspected_field_types['CharField']
  183. out = StringIO()
  184. call_command('inspectdb', 'inspectdb_digitsincolumnname', stdout=out)
  185. output = out.getvalue()
  186. error_message = "inspectdb generated a model field name which is a number"
  187. self.assertNotIn(' 123 = models.%s' % char_field_type, output, msg=error_message)
  188. self.assertIn('number_123 = models.%s' % char_field_type, output)
  189. error_message = "inspectdb generated a model field name which starts with a digit"
  190. self.assertNotIn(' 4extra = models.%s' % char_field_type, output, msg=error_message)
  191. self.assertIn('number_4extra = models.%s' % char_field_type, output)
  192. self.assertNotIn(' 45extra = models.%s' % char_field_type, output, msg=error_message)
  193. self.assertIn('number_45extra = models.%s' % char_field_type, output)
  194. def test_special_column_name_introspection(self):
  195. """
  196. Introspection of column names containing special characters,
  197. unsuitable for Python identifiers
  198. """
  199. out = StringIO()
  200. call_command('inspectdb', table_name_filter=special_table_only, stdout=out)
  201. output = out.getvalue()
  202. base_name = connection.introspection.identifier_converter('Field')
  203. integer_field_type = connection.features.introspected_field_types['IntegerField']
  204. self.assertIn("field = models.%s()" % integer_field_type, output)
  205. self.assertIn("field_field = models.%s(db_column='%s_')" % (integer_field_type, base_name), output)
  206. self.assertIn("field_field_0 = models.%s(db_column='%s__')" % (integer_field_type, base_name), output)
  207. self.assertIn("field_field_1 = models.%s(db_column='__field')" % integer_field_type, output)
  208. self.assertIn("prc_x = models.{}(db_column='prc(%) x')".format(integer_field_type), output)
  209. self.assertIn("tamaño = models.%s()" % integer_field_type, output)
  210. def test_table_name_introspection(self):
  211. """
  212. Introspection of table names containing special characters,
  213. unsuitable for Python identifiers
  214. """
  215. out = StringIO()
  216. call_command('inspectdb', table_name_filter=special_table_only, stdout=out)
  217. output = out.getvalue()
  218. self.assertIn("class InspectdbSpecialTableName(models.Model):", output)
  219. def test_managed_models(self):
  220. """By default the command generates models with `Meta.managed = False` (#14305)"""
  221. out = StringIO()
  222. call_command('inspectdb', 'inspectdb_columntypes', stdout=out)
  223. output = out.getvalue()
  224. self.longMessage = False
  225. self.assertIn(" managed = False", output, msg='inspectdb should generate unmanaged models.')
  226. def test_unique_together_meta(self):
  227. out = StringIO()
  228. call_command('inspectdb', 'inspectdb_uniquetogether', stdout=out)
  229. output = out.getvalue()
  230. self.assertIn(" unique_together = (('", output)
  231. unique_together_match = self.unique_re.findall(output)
  232. # There should be one unique_together tuple.
  233. self.assertEqual(len(unique_together_match), 1)
  234. fields = unique_together_match[0]
  235. # Fields with db_column = field name.
  236. self.assertIn("('field1', 'field2')", fields)
  237. # Fields from columns whose names are Python keywords.
  238. self.assertIn("('field1', 'field2')", fields)
  239. # Fields whose names normalize to the same Python field name and hence
  240. # are given an integer suffix.
  241. self.assertIn("('non_unique_column', 'non_unique_column_0')", fields)
  242. @skipUnless(connection.vendor == 'postgresql', 'PostgreSQL specific SQL')
  243. def test_unsupported_unique_together(self):
  244. """Unsupported index types (COALESCE here) are skipped."""
  245. with connection.cursor() as c:
  246. c.execute(
  247. 'CREATE UNIQUE INDEX Findex ON %s '
  248. '(id, people_unique_id, COALESCE(message_id, -1))' % PeopleMoreData._meta.db_table
  249. )
  250. try:
  251. out = StringIO()
  252. call_command(
  253. 'inspectdb',
  254. table_name_filter=lambda tn: tn.startswith(PeopleMoreData._meta.db_table),
  255. stdout=out,
  256. )
  257. output = out.getvalue()
  258. self.assertIn('# A unique constraint could not be introspected.', output)
  259. self.assertEqual(self.unique_re.findall(output), ["('id', 'people_unique')"])
  260. finally:
  261. with connection.cursor() as c:
  262. c.execute('DROP INDEX Findex')
  263. @skipUnless(connection.vendor == 'sqlite',
  264. "Only patched sqlite's DatabaseIntrospection.data_types_reverse for this test")
  265. def test_custom_fields(self):
  266. """
  267. Introspection of columns with a custom field (#21090)
  268. """
  269. out = StringIO()
  270. orig_data_types_reverse = connection.introspection.data_types_reverse
  271. try:
  272. connection.introspection.data_types_reverse = {
  273. 'text': 'myfields.TextField',
  274. 'bigint': 'BigIntegerField',
  275. }
  276. call_command('inspectdb', 'inspectdb_columntypes', stdout=out)
  277. output = out.getvalue()
  278. self.assertIn("text_field = myfields.TextField()", output)
  279. self.assertIn("big_int_field = models.BigIntegerField()", output)
  280. finally:
  281. connection.introspection.data_types_reverse = orig_data_types_reverse
  282. def test_introspection_errors(self):
  283. """
  284. Introspection errors should not crash the command, and the error should
  285. be visible in the output.
  286. """
  287. out = StringIO()
  288. with mock.patch('django.db.connection.introspection.get_table_list',
  289. return_value=[TableInfo(name='nonexistent', type='t')]):
  290. call_command('inspectdb', stdout=out)
  291. output = out.getvalue()
  292. self.assertIn("# Unable to inspect table 'nonexistent'", output)
  293. # The error message depends on the backend
  294. self.assertIn("# The error was:", output)
  295. class InspectDBTransactionalTests(TransactionTestCase):
  296. available_apps = ['inspectdb']
  297. def test_include_views(self):
  298. """inspectdb --include-views creates models for database views."""
  299. with connection.cursor() as cursor:
  300. cursor.execute(
  301. 'CREATE VIEW inspectdb_people_view AS '
  302. 'SELECT id, name FROM inspectdb_people'
  303. )
  304. out = StringIO()
  305. view_model = 'class InspectdbPeopleView(models.Model):'
  306. view_managed = 'managed = False # Created from a view.'
  307. try:
  308. call_command(
  309. 'inspectdb',
  310. table_name_filter=inspectdb_views_only,
  311. stdout=out,
  312. )
  313. no_views_output = out.getvalue()
  314. self.assertNotIn(view_model, no_views_output)
  315. self.assertNotIn(view_managed, no_views_output)
  316. call_command(
  317. 'inspectdb',
  318. table_name_filter=inspectdb_views_only,
  319. include_views=True,
  320. stdout=out,
  321. )
  322. with_views_output = out.getvalue()
  323. self.assertIn(view_model, with_views_output)
  324. self.assertIn(view_managed, with_views_output)
  325. finally:
  326. with connection.cursor() as cursor:
  327. cursor.execute('DROP VIEW inspectdb_people_view')
  328. @skipUnlessDBFeature('can_introspect_materialized_views')
  329. def test_include_materialized_views(self):
  330. """inspectdb --include-views creates models for materialized views."""
  331. with connection.cursor() as cursor:
  332. cursor.execute(
  333. 'CREATE MATERIALIZED VIEW inspectdb_people_materialized AS '
  334. 'SELECT id, name FROM inspectdb_people'
  335. )
  336. out = StringIO()
  337. view_model = 'class InspectdbPeopleMaterialized(models.Model):'
  338. view_managed = 'managed = False # Created from a view.'
  339. try:
  340. call_command(
  341. 'inspectdb',
  342. table_name_filter=inspectdb_views_only,
  343. stdout=out,
  344. )
  345. no_views_output = out.getvalue()
  346. self.assertNotIn(view_model, no_views_output)
  347. self.assertNotIn(view_managed, no_views_output)
  348. call_command(
  349. 'inspectdb',
  350. table_name_filter=inspectdb_views_only,
  351. include_views=True,
  352. stdout=out,
  353. )
  354. with_views_output = out.getvalue()
  355. self.assertIn(view_model, with_views_output)
  356. self.assertIn(view_managed, with_views_output)
  357. finally:
  358. with connection.cursor() as cursor:
  359. cursor.execute('DROP MATERIALIZED VIEW inspectdb_people_materialized')
  360. @skipUnless(connection.vendor == 'postgresql', 'PostgreSQL specific SQL')
  361. def test_include_partitions(self):
  362. """inspectdb --include-partitions creates models for partitions."""
  363. with connection.cursor() as cursor:
  364. cursor.execute('''\
  365. CREATE TABLE inspectdb_partition_parent (name text not null)
  366. PARTITION BY LIST (left(upper(name), 1))
  367. ''')
  368. cursor.execute('''\
  369. CREATE TABLE inspectdb_partition_child
  370. PARTITION OF inspectdb_partition_parent
  371. FOR VALUES IN ('A', 'B', 'C')
  372. ''')
  373. out = StringIO()
  374. partition_model_parent = 'class InspectdbPartitionParent(models.Model):'
  375. partition_model_child = 'class InspectdbPartitionChild(models.Model):'
  376. partition_managed = 'managed = False # Created from a partition.'
  377. try:
  378. call_command('inspectdb', table_name_filter=inspectdb_tables_only, stdout=out)
  379. no_partitions_output = out.getvalue()
  380. self.assertIn(partition_model_parent, no_partitions_output)
  381. self.assertNotIn(partition_model_child, no_partitions_output)
  382. self.assertNotIn(partition_managed, no_partitions_output)
  383. call_command('inspectdb', table_name_filter=inspectdb_tables_only, include_partitions=True, stdout=out)
  384. with_partitions_output = out.getvalue()
  385. self.assertIn(partition_model_parent, with_partitions_output)
  386. self.assertIn(partition_model_child, with_partitions_output)
  387. self.assertIn(partition_managed, with_partitions_output)
  388. finally:
  389. with connection.cursor() as cursor:
  390. cursor.execute('DROP TABLE IF EXISTS inspectdb_partition_child')
  391. cursor.execute('DROP TABLE IF EXISTS inspectdb_partition_parent')
  392. @skipUnless(connection.vendor == 'postgresql', 'PostgreSQL specific SQL')
  393. def test_foreign_data_wrapper(self):
  394. with connection.cursor() as cursor:
  395. cursor.execute('CREATE EXTENSION IF NOT EXISTS file_fdw')
  396. cursor.execute('CREATE SERVER inspectdb_server FOREIGN DATA WRAPPER file_fdw')
  397. cursor.execute('''\
  398. CREATE FOREIGN TABLE inspectdb_iris_foreign_table (
  399. petal_length real,
  400. petal_width real,
  401. sepal_length real,
  402. sepal_width real
  403. ) SERVER inspectdb_server OPTIONS (
  404. filename %s
  405. )
  406. ''', [os.devnull])
  407. out = StringIO()
  408. foreign_table_model = 'class InspectdbIrisForeignTable(models.Model):'
  409. foreign_table_managed = 'managed = False'
  410. try:
  411. call_command(
  412. 'inspectdb',
  413. table_name_filter=inspectdb_tables_only,
  414. stdout=out,
  415. )
  416. output = out.getvalue()
  417. self.assertIn(foreign_table_model, output)
  418. self.assertIn(foreign_table_managed, output)
  419. finally:
  420. with connection.cursor() as cursor:
  421. cursor.execute('DROP FOREIGN TABLE IF EXISTS inspectdb_iris_foreign_table')
  422. cursor.execute('DROP SERVER IF EXISTS inspectdb_server')
  423. cursor.execute('DROP EXTENSION IF EXISTS file_fdw')