123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- from __future__ import unicode_literals
- import re
- import unittest
- from django.apps import apps
- from django.core.management.color import no_style
- from django.core.management.sql import (sql_create, sql_delete, sql_indexes,
- sql_destroy_indexes, sql_all)
- from django.db import connections, DEFAULT_DB_ALIAS
- from django.test import TestCase, ignore_warnings, override_settings
- from django.utils import six
- from django.utils.deprecation import RemovedInDjango20Warning
- # See also initial_sql_regress for 'custom_sql_for_model' tests
- class SQLCommandsTestCase(TestCase):
- """Tests for several functions in django/core/management/sql.py"""
- def count_ddl(self, output, cmd):
- return len([o for o in output if o.startswith(cmd)])
- def test_sql_create(self):
- app_config = apps.get_app_config('commands_sql')
- output = sql_create(app_config, no_style(), connections[DEFAULT_DB_ALIAS])
- tables = set()
- create_table_re = re.compile(r'^create table .(?P<table>[\w_]+).*', re.IGNORECASE)
- reference_re = re.compile(r'.* references .(?P<table>[\w_]+).*', re.IGNORECASE)
- for statement in output:
- create_table = create_table_re.match(statement)
- if create_table:
- # Lower since Oracle's table names are upper cased.
- tables.add(create_table.group('table').lower())
- continue
- reference = reference_re.match(statement)
- if reference:
- # Lower since Oracle's table names are upper cased.
- table = reference.group('table').lower()
- self.assertIn(
- table, tables, "The table %s is referenced before its creation." % table
- )
- self.assertEqual(tables, {
- 'commands_sql_comment', 'commands_sql_book', 'commands_sql_book_comments'
- })
- @unittest.skipUnless('PositiveIntegerField' in connections[DEFAULT_DB_ALIAS].data_type_check_constraints, 'Backend does not have checks.')
- def test_sql_create_check(self):
- """Regression test for #23416 -- Check that db_params['check'] is respected."""
- app_config = apps.get_app_config('commands_sql')
- output = sql_create(app_config, no_style(), connections[DEFAULT_DB_ALIAS])
- success = False
- for statement in output:
- if 'CHECK' in statement:
- success = True
- if not success:
- self.fail("'CHECK' not found in output %s" % output)
- def test_sql_delete(self):
- app_config = apps.get_app_config('commands_sql')
- output = sql_delete(app_config, no_style(), connections[DEFAULT_DB_ALIAS], close_connection=False)
- drop_tables = [o for o in output if o.startswith('DROP TABLE')]
- self.assertEqual(len(drop_tables), 3)
- # Lower so that Oracle's upper case tbl names wont break
- sql = drop_tables[-1].lower()
- six.assertRegex(self, sql, r'^drop table .commands_sql_comment.*')
- @ignore_warnings(category=RemovedInDjango20Warning)
- def test_sql_indexes(self):
- app_config = apps.get_app_config('commands_sql')
- output = sql_indexes(app_config, no_style(), connections[DEFAULT_DB_ALIAS])
- # Number of indexes is backend-dependent
- self.assertTrue(1 <= self.count_ddl(output, 'CREATE INDEX') <= 4)
- def test_sql_destroy_indexes(self):
- app_config = apps.get_app_config('commands_sql')
- output = sql_destroy_indexes(app_config, no_style(), connections[DEFAULT_DB_ALIAS])
- # Number of indexes is backend-dependent
- self.assertTrue(1 <= self.count_ddl(output, 'DROP INDEX') <= 4)
- @ignore_warnings(category=RemovedInDjango20Warning)
- def test_sql_all(self):
- app_config = apps.get_app_config('commands_sql')
- output = sql_all(app_config, no_style(), connections[DEFAULT_DB_ALIAS])
- self.assertEqual(self.count_ddl(output, 'CREATE TABLE'), 3)
- # Number of indexes is backend-dependent
- self.assertTrue(1 <= self.count_ddl(output, 'CREATE INDEX') <= 4)
- class TestRouter(object):
- def allow_migrate(self, db, model):
- return False
- @override_settings(DATABASE_ROUTERS=[TestRouter()])
- class SQLCommandsRouterTestCase(TestCase):
- def test_router_honored(self):
- app_config = apps.get_app_config('commands_sql')
- for sql_command in (sql_all, sql_create, sql_delete, sql_indexes, sql_destroy_indexes):
- if sql_command is sql_delete:
- output = sql_command(app_config, no_style(), connections[DEFAULT_DB_ALIAS], close_connection=False)
- # "App creates no tables in the database. Nothing to do."
- expected_output = 1
- else:
- output = sql_command(app_config, no_style(), connections[DEFAULT_DB_ALIAS])
- expected_output = 0
- self.assertEqual(len(output), expected_output,
- "%s command is not honoring routers" % sql_command.__name__)
|