tests.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. from __future__ import unicode_literals
  2. import re
  3. import unittest
  4. from django.apps import apps
  5. from django.core.management.color import no_style
  6. from django.core.management.sql import (sql_create, sql_delete, sql_indexes,
  7. sql_destroy_indexes, sql_all)
  8. from django.db import connections, DEFAULT_DB_ALIAS
  9. from django.test import TestCase, ignore_warnings, override_settings
  10. from django.utils import six
  11. from django.utils.deprecation import RemovedInDjango20Warning
  12. # See also initial_sql_regress for 'custom_sql_for_model' tests
  13. class SQLCommandsTestCase(TestCase):
  14. """Tests for several functions in django/core/management/sql.py"""
  15. def count_ddl(self, output, cmd):
  16. return len([o for o in output if o.startswith(cmd)])
  17. def test_sql_create(self):
  18. app_config = apps.get_app_config('commands_sql')
  19. output = sql_create(app_config, no_style(), connections[DEFAULT_DB_ALIAS])
  20. tables = set()
  21. create_table_re = re.compile(r'^create table .(?P<table>[\w_]+).*', re.IGNORECASE)
  22. reference_re = re.compile(r'.* references .(?P<table>[\w_]+).*', re.IGNORECASE)
  23. for statement in output:
  24. create_table = create_table_re.match(statement)
  25. if create_table:
  26. # Lower since Oracle's table names are upper cased.
  27. tables.add(create_table.group('table').lower())
  28. continue
  29. reference = reference_re.match(statement)
  30. if reference:
  31. # Lower since Oracle's table names are upper cased.
  32. table = reference.group('table').lower()
  33. self.assertIn(
  34. table, tables, "The table %s is referenced before its creation." % table
  35. )
  36. self.assertEqual(tables, {
  37. 'commands_sql_comment', 'commands_sql_book', 'commands_sql_book_comments'
  38. })
  39. @unittest.skipUnless('PositiveIntegerField' in connections[DEFAULT_DB_ALIAS].data_type_check_constraints, 'Backend does not have checks.')
  40. def test_sql_create_check(self):
  41. """Regression test for #23416 -- Check that db_params['check'] is respected."""
  42. app_config = apps.get_app_config('commands_sql')
  43. output = sql_create(app_config, no_style(), connections[DEFAULT_DB_ALIAS])
  44. success = False
  45. for statement in output:
  46. if 'CHECK' in statement:
  47. success = True
  48. if not success:
  49. self.fail("'CHECK' not found in output %s" % output)
  50. def test_sql_delete(self):
  51. app_config = apps.get_app_config('commands_sql')
  52. output = sql_delete(app_config, no_style(), connections[DEFAULT_DB_ALIAS], close_connection=False)
  53. drop_tables = [o for o in output if o.startswith('DROP TABLE')]
  54. self.assertEqual(len(drop_tables), 3)
  55. # Lower so that Oracle's upper case tbl names wont break
  56. sql = drop_tables[-1].lower()
  57. six.assertRegex(self, sql, r'^drop table .commands_sql_comment.*')
  58. @ignore_warnings(category=RemovedInDjango20Warning)
  59. def test_sql_indexes(self):
  60. app_config = apps.get_app_config('commands_sql')
  61. output = sql_indexes(app_config, no_style(), connections[DEFAULT_DB_ALIAS])
  62. # Number of indexes is backend-dependent
  63. self.assertTrue(1 <= self.count_ddl(output, 'CREATE INDEX') <= 4)
  64. def test_sql_destroy_indexes(self):
  65. app_config = apps.get_app_config('commands_sql')
  66. output = sql_destroy_indexes(app_config, no_style(), connections[DEFAULT_DB_ALIAS])
  67. # Number of indexes is backend-dependent
  68. self.assertTrue(1 <= self.count_ddl(output, 'DROP INDEX') <= 4)
  69. @ignore_warnings(category=RemovedInDjango20Warning)
  70. def test_sql_all(self):
  71. app_config = apps.get_app_config('commands_sql')
  72. output = sql_all(app_config, no_style(), connections[DEFAULT_DB_ALIAS])
  73. self.assertEqual(self.count_ddl(output, 'CREATE TABLE'), 3)
  74. # Number of indexes is backend-dependent
  75. self.assertTrue(1 <= self.count_ddl(output, 'CREATE INDEX') <= 4)
  76. class TestRouter(object):
  77. def allow_migrate(self, db, model):
  78. return False
  79. @override_settings(DATABASE_ROUTERS=[TestRouter()])
  80. class SQLCommandsRouterTestCase(TestCase):
  81. def test_router_honored(self):
  82. app_config = apps.get_app_config('commands_sql')
  83. for sql_command in (sql_all, sql_create, sql_delete, sql_indexes, sql_destroy_indexes):
  84. if sql_command is sql_delete:
  85. output = sql_command(app_config, no_style(), connections[DEFAULT_DB_ALIAS], close_connection=False)
  86. # "App creates no tables in the database. Nothing to do."
  87. expected_output = 1
  88. else:
  89. output = sql_command(app_config, no_style(), connections[DEFAULT_DB_ALIAS])
  90. expected_output = 0
  91. self.assertEqual(len(output), expected_output,
  92. "%s command is not honoring routers" % sql_command.__name__)