tests.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. import re
  2. from io import StringIO
  3. from unittest import mock, skipUnless
  4. from django.core.management import call_command
  5. from django.db import connection
  6. from django.db.backends.base.introspection import TableInfo
  7. from django.test import TestCase, TransactionTestCase, skipUnlessDBFeature
  8. from .models import PeopleMoreData
  9. def inspectdb_tables_only(table_name):
  10. """
  11. Limit introspection to tables created for models of this app.
  12. Some databases such as Oracle are extremely slow at introspection.
  13. """
  14. return table_name.startswith('inspectdb_')
  15. def special_table_only(table_name):
  16. return table_name.startswith('inspectdb_special')
  17. class InspectDBTestCase(TestCase):
  18. unique_re = re.compile(r'.*unique_together = \((.+),\).*')
  19. def test_stealth_table_name_filter_option(self):
  20. out = StringIO()
  21. call_command('inspectdb', table_name_filter=inspectdb_tables_only, stdout=out)
  22. error_message = "inspectdb has examined a table that should have been filtered out."
  23. # contrib.contenttypes is one of the apps always installed when running
  24. # the Django test suite, check that one of its tables hasn't been
  25. # inspected
  26. self.assertNotIn("class DjangoContentType(models.Model):", out.getvalue(), msg=error_message)
  27. def test_table_option(self):
  28. """
  29. inspectdb can inspect a subset of tables by passing the table names as
  30. arguments.
  31. """
  32. out = StringIO()
  33. call_command('inspectdb', 'inspectdb_people', stdout=out)
  34. output = out.getvalue()
  35. self.assertIn('class InspectdbPeople(models.Model):', output)
  36. self.assertNotIn("InspectdbPeopledata", output)
  37. def make_field_type_asserter(self):
  38. """Call inspectdb and return a function to validate a field type in its output"""
  39. out = StringIO()
  40. call_command('inspectdb', 'inspectdb_columntypes', stdout=out)
  41. output = out.getvalue()
  42. def assertFieldType(name, definition):
  43. out_def = re.search(r'^\s*%s = (models.*)$' % name, output, re.MULTILINE).groups()[0]
  44. self.assertEqual(definition, out_def)
  45. return assertFieldType
  46. def test_field_types(self):
  47. """Test introspection of various Django field types"""
  48. assertFieldType = self.make_field_type_asserter()
  49. # Inspecting Oracle DB doesn't produce correct results (#19884):
  50. # - it reports fields as blank=True when they aren't.
  51. if not connection.features.interprets_empty_strings_as_nulls:
  52. assertFieldType('char_field', "models.CharField(max_length=10)")
  53. assertFieldType('null_char_field', "models.CharField(max_length=10, blank=True, null=True)")
  54. assertFieldType('email_field', "models.CharField(max_length=254)")
  55. assertFieldType('file_field', "models.CharField(max_length=100)")
  56. assertFieldType('file_path_field', "models.CharField(max_length=100)")
  57. assertFieldType('slug_field', "models.CharField(max_length=50)")
  58. assertFieldType('text_field', "models.TextField()")
  59. assertFieldType('url_field', "models.CharField(max_length=200)")
  60. assertFieldType('date_field', "models.DateField()")
  61. assertFieldType('date_time_field', "models.DateTimeField()")
  62. if connection.features.can_introspect_ip_address_field:
  63. assertFieldType('gen_ip_address_field', "models.GenericIPAddressField()")
  64. elif not connection.features.interprets_empty_strings_as_nulls:
  65. assertFieldType('gen_ip_address_field', "models.CharField(max_length=39)")
  66. if connection.features.can_introspect_time_field:
  67. assertFieldType('time_field', "models.TimeField()")
  68. if connection.features.has_native_uuid_field:
  69. assertFieldType('uuid_field', "models.UUIDField()")
  70. elif not connection.features.interprets_empty_strings_as_nulls:
  71. assertFieldType('uuid_field', "models.CharField(max_length=32)")
  72. def test_number_field_types(self):
  73. """Test introspection of various Django field types"""
  74. assertFieldType = self.make_field_type_asserter()
  75. if not connection.features.can_introspect_autofield:
  76. assertFieldType('id', "models.IntegerField(primary_key=True) # AutoField?")
  77. if connection.features.can_introspect_big_integer_field:
  78. assertFieldType('big_int_field', "models.BigIntegerField()")
  79. else:
  80. assertFieldType('big_int_field', "models.IntegerField()")
  81. bool_field_type = connection.features.introspected_boolean_field_type
  82. assertFieldType('bool_field', "models.{}()".format(bool_field_type))
  83. assertFieldType('null_bool_field', 'models.{}(blank=True, null=True)'.format(bool_field_type))
  84. if connection.features.can_introspect_decimal_field:
  85. assertFieldType('decimal_field', "models.DecimalField(max_digits=6, decimal_places=1)")
  86. else: # Guessed arguments on SQLite, see #5014
  87. assertFieldType('decimal_field', "models.DecimalField(max_digits=10, decimal_places=5) "
  88. "# max_digits and decimal_places have been guessed, "
  89. "as this database handles decimal fields as float")
  90. assertFieldType('float_field', "models.FloatField()")
  91. assertFieldType('int_field', "models.IntegerField()")
  92. if connection.features.can_introspect_positive_integer_field:
  93. assertFieldType('pos_int_field', "models.PositiveIntegerField()")
  94. else:
  95. assertFieldType('pos_int_field', "models.IntegerField()")
  96. if connection.features.can_introspect_positive_integer_field:
  97. if connection.features.can_introspect_small_integer_field:
  98. assertFieldType('pos_small_int_field', "models.PositiveSmallIntegerField()")
  99. else:
  100. assertFieldType('pos_small_int_field', "models.PositiveIntegerField()")
  101. else:
  102. if connection.features.can_introspect_small_integer_field:
  103. assertFieldType('pos_small_int_field', "models.SmallIntegerField()")
  104. else:
  105. assertFieldType('pos_small_int_field', "models.IntegerField()")
  106. if connection.features.can_introspect_small_integer_field:
  107. assertFieldType('small_int_field', "models.SmallIntegerField()")
  108. else:
  109. assertFieldType('small_int_field', "models.IntegerField()")
  110. @skipUnlessDBFeature('can_introspect_foreign_keys')
  111. def test_attribute_name_not_python_keyword(self):
  112. out = StringIO()
  113. call_command('inspectdb', table_name_filter=inspectdb_tables_only, stdout=out)
  114. output = out.getvalue()
  115. error_message = "inspectdb generated an attribute name which is a Python keyword"
  116. # Recursive foreign keys should be set to 'self'
  117. self.assertIn("parent = models.ForeignKey('self', models.DO_NOTHING)", output)
  118. self.assertNotIn(
  119. "from = models.ForeignKey(InspectdbPeople, models.DO_NOTHING)",
  120. output,
  121. msg=error_message,
  122. )
  123. # As InspectdbPeople model is defined after InspectdbMessage, it should be quoted
  124. self.assertIn(
  125. "from_field = models.ForeignKey('InspectdbPeople', models.DO_NOTHING, db_column='from_id')",
  126. output,
  127. )
  128. self.assertIn(
  129. "people_pk = models.ForeignKey(InspectdbPeople, models.DO_NOTHING, primary_key=True)",
  130. output,
  131. )
  132. self.assertIn(
  133. "people_unique = models.ForeignKey(InspectdbPeople, models.DO_NOTHING, unique=True)",
  134. output,
  135. )
  136. def test_digits_column_name_introspection(self):
  137. """Introspection of column names consist/start with digits (#16536/#17676)"""
  138. out = StringIO()
  139. call_command('inspectdb', 'inspectdb_digitsincolumnname', stdout=out)
  140. output = out.getvalue()
  141. error_message = "inspectdb generated a model field name which is a number"
  142. self.assertNotIn(" 123 = models.CharField", output, msg=error_message)
  143. self.assertIn("number_123 = models.CharField", output)
  144. error_message = "inspectdb generated a model field name which starts with a digit"
  145. self.assertNotIn(" 4extra = models.CharField", output, msg=error_message)
  146. self.assertIn("number_4extra = models.CharField", output)
  147. self.assertNotIn(" 45extra = models.CharField", output, msg=error_message)
  148. self.assertIn("number_45extra = models.CharField", output)
  149. def test_special_column_name_introspection(self):
  150. """
  151. Introspection of column names containing special characters,
  152. unsuitable for Python identifiers
  153. """
  154. out = StringIO()
  155. call_command('inspectdb', table_name_filter=special_table_only, stdout=out)
  156. output = out.getvalue()
  157. base_name = 'field' if connection.features.uppercases_column_names else 'Field'
  158. self.assertIn("field = models.IntegerField()", output)
  159. self.assertIn("field_field = models.IntegerField(db_column='%s_')" % base_name, output)
  160. self.assertIn("field_field_0 = models.IntegerField(db_column='%s__')" % base_name, output)
  161. self.assertIn("field_field_1 = models.IntegerField(db_column='__field')", output)
  162. self.assertIn("prc_x = models.IntegerField(db_column='prc(%) x')", output)
  163. self.assertIn("tamaño = models.IntegerField()", output)
  164. def test_table_name_introspection(self):
  165. """
  166. Introspection of table names containing special characters,
  167. unsuitable for Python identifiers
  168. """
  169. out = StringIO()
  170. call_command('inspectdb', table_name_filter=special_table_only, stdout=out)
  171. output = out.getvalue()
  172. self.assertIn("class InspectdbSpecialTableName(models.Model):", output)
  173. def test_managed_models(self):
  174. """By default the command generates models with `Meta.managed = False` (#14305)"""
  175. out = StringIO()
  176. call_command('inspectdb', 'inspectdb_columntypes', stdout=out)
  177. output = out.getvalue()
  178. self.longMessage = False
  179. self.assertIn(" managed = False", output, msg='inspectdb should generate unmanaged models.')
  180. def test_unique_together_meta(self):
  181. out = StringIO()
  182. call_command('inspectdb', 'inspectdb_uniquetogether', stdout=out)
  183. output = out.getvalue()
  184. self.assertIn(" unique_together = (('", output)
  185. unique_together_match = re.findall(self.unique_re, output)
  186. # There should be one unique_together tuple.
  187. self.assertEqual(len(unique_together_match), 1)
  188. fields = unique_together_match[0]
  189. # Fields with db_column = field name.
  190. self.assertIn("('field1', 'field2')", fields)
  191. # Fields from columns whose names are Python keywords.
  192. self.assertIn("('field1', 'field2')", fields)
  193. # Fields whose names normalize to the same Python field name and hence
  194. # are given an integer suffix.
  195. self.assertIn("('non_unique_column', 'non_unique_column_0')", fields)
  196. @skipUnless(connection.vendor == 'postgresql', 'PostgreSQL specific SQL')
  197. def test_unsupported_unique_together(self):
  198. """Unsupported index types (COALESCE here) are skipped."""
  199. with connection.cursor() as c:
  200. c.execute(
  201. 'CREATE UNIQUE INDEX Findex ON %s '
  202. '(id, people_unique_id, COALESCE(message_id, -1))' % PeopleMoreData._meta.db_table
  203. )
  204. try:
  205. out = StringIO()
  206. call_command(
  207. 'inspectdb',
  208. table_name_filter=lambda tn: tn.startswith(PeopleMoreData._meta.db_table),
  209. stdout=out,
  210. )
  211. output = out.getvalue()
  212. self.assertIn('# A unique constraint could not be introspected.', output)
  213. self.assertEqual(re.findall(self.unique_re, output), ["('id', 'people_unique')"])
  214. finally:
  215. with connection.cursor() as c:
  216. c.execute('DROP INDEX Findex')
  217. @skipUnless(connection.vendor == 'sqlite',
  218. "Only patched sqlite's DatabaseIntrospection.data_types_reverse for this test")
  219. def test_custom_fields(self):
  220. """
  221. Introspection of columns with a custom field (#21090)
  222. """
  223. out = StringIO()
  224. orig_data_types_reverse = connection.introspection.data_types_reverse
  225. try:
  226. connection.introspection.data_types_reverse = {
  227. 'text': 'myfields.TextField',
  228. 'bigint': 'BigIntegerField',
  229. }
  230. call_command('inspectdb', 'inspectdb_columntypes', stdout=out)
  231. output = out.getvalue()
  232. self.assertIn("text_field = myfields.TextField()", output)
  233. self.assertIn("big_int_field = models.BigIntegerField()", output)
  234. finally:
  235. connection.introspection.data_types_reverse = orig_data_types_reverse
  236. def test_introspection_errors(self):
  237. """
  238. Introspection errors should not crash the command, and the error should
  239. be visible in the output.
  240. """
  241. out = StringIO()
  242. with mock.patch('django.db.connection.introspection.get_table_list',
  243. return_value=[TableInfo(name='nonexistent', type='t')]):
  244. call_command('inspectdb', stdout=out)
  245. output = out.getvalue()
  246. self.assertIn("# Unable to inspect table 'nonexistent'", output)
  247. # The error message depends on the backend
  248. self.assertIn("# The error was:", output)
  249. class InspectDBTransactionalTests(TransactionTestCase):
  250. available_apps = None
  251. def test_include_views(self):
  252. """inspectdb --include-views creates models for database views."""
  253. with connection.cursor() as cursor:
  254. cursor.execute(
  255. 'CREATE VIEW inspectdb_people_view AS '
  256. 'SELECT id, name FROM inspectdb_people'
  257. )
  258. out = StringIO()
  259. view_model = 'class InspectdbPeopleView(models.Model):'
  260. view_managed = 'managed = False # Created from a view.'
  261. try:
  262. call_command('inspectdb', table_name_filter=inspectdb_tables_only, stdout=out)
  263. no_views_output = out.getvalue()
  264. self.assertNotIn(view_model, no_views_output)
  265. self.assertNotIn(view_managed, no_views_output)
  266. call_command('inspectdb', table_name_filter=inspectdb_tables_only, include_views=True, stdout=out)
  267. with_views_output = out.getvalue()
  268. self.assertIn(view_model, with_views_output)
  269. self.assertIn(view_managed, with_views_output)
  270. finally:
  271. with connection.cursor() as cursor:
  272. cursor.execute('DROP VIEW inspectdb_people_view')
  273. @skipUnless(connection.vendor == 'postgresql', 'PostgreSQL specific SQL')
  274. def test_include_materialized_views(self):
  275. """inspectdb --include-views creates models for database materialized views."""
  276. with connection.cursor() as cursor:
  277. cursor.execute(
  278. 'CREATE MATERIALIZED VIEW inspectdb_people_materialized_view AS '
  279. 'SELECT id, name FROM inspectdb_people'
  280. )
  281. out = StringIO()
  282. view_model = 'class InspectdbPeopleMaterializedView(models.Model):'
  283. view_managed = 'managed = False # Created from a view.'
  284. try:
  285. call_command('inspectdb', table_name_filter=inspectdb_tables_only, stdout=out)
  286. no_views_output = out.getvalue()
  287. self.assertNotIn(view_model, no_views_output)
  288. self.assertNotIn(view_managed, no_views_output)
  289. call_command('inspectdb', table_name_filter=inspectdb_tables_only, include_views=True, stdout=out)
  290. with_views_output = out.getvalue()
  291. self.assertIn(view_model, with_views_output)
  292. self.assertIn(view_managed, with_views_output)
  293. finally:
  294. with connection.cursor() as cursor:
  295. cursor.execute('DROP MATERIALIZED VIEW IF EXISTS inspectdb_people_materialized_view')
  296. @skipUnless(connection.vendor == 'postgresql', 'PostgreSQL specific SQL')
  297. def test_foreign_data_wrapper(self):
  298. with connection.cursor() as cursor:
  299. cursor.execute('CREATE EXTENSION IF NOT EXISTS file_fdw')
  300. cursor.execute('CREATE SERVER inspectdb_server FOREIGN DATA WRAPPER file_fdw')
  301. cursor.execute('''\
  302. CREATE FOREIGN TABLE inspectdb_iris_foreign_table (
  303. petal_length real,
  304. petal_width real,
  305. sepal_length real,
  306. sepal_width real
  307. ) SERVER inspectdb_server OPTIONS (
  308. filename '/dev/null'
  309. )
  310. ''')
  311. out = StringIO()
  312. foreign_table_model = 'class InspectdbIrisForeignTable(models.Model):'
  313. foreign_table_managed = 'managed = False'
  314. try:
  315. call_command('inspectdb', stdout=out)
  316. output = out.getvalue()
  317. self.assertIn(foreign_table_model, output)
  318. self.assertIn(foreign_table_managed, output)
  319. finally:
  320. with connection.cursor() as cursor:
  321. cursor.execute('DROP FOREIGN TABLE IF EXISTS inspectdb_iris_foreign_table')
  322. cursor.execute('DROP SERVER IF EXISTS inspectdb_server')
  323. cursor.execute('DROP EXTENSION IF EXISTS file_fdw')