|
@@ -0,0 +1,289 @@
|
|
|
+import os
|
|
|
+from optparse import make_option
|
|
|
+
|
|
|
+from django.conf import settings
|
|
|
+from django.core.exceptions import ImproperlyConfigured
|
|
|
+from django.test import TestCase
|
|
|
+from django.test.utils import setup_test_environment, teardown_test_environment
|
|
|
+from django.utils import unittest
|
|
|
+from django.utils.unittest import TestSuite, defaultTestLoader
|
|
|
+
|
|
|
+
|
|
|
+class DiscoverRunner(object):
|
|
|
+ """
|
|
|
+ A Django test runner that uses unittest2 test discovery.
|
|
|
+ """
|
|
|
+
|
|
|
+ test_loader = defaultTestLoader
|
|
|
+ reorder_by = (TestCase, )
|
|
|
+ option_list = (
|
|
|
+ make_option('-t', '--top-level-directory',
|
|
|
+ action='store', dest='top_level', default=None,
|
|
|
+ help='Top level of project for unittest discovery.'),
|
|
|
+ make_option('-p', '--pattern', action='store', dest='pattern',
|
|
|
+ default="test*.py",
|
|
|
+ help='The test matching pattern. Defaults to test*.py.'),
|
|
|
+ )
|
|
|
+
|
|
|
+ def __init__(self, pattern=None, top_level=None,
|
|
|
+ verbosity=1, interactive=True, failfast=False,
|
|
|
+ **kwargs):
|
|
|
+
|
|
|
+ self.pattern = pattern
|
|
|
+ self.top_level = top_level
|
|
|
+
|
|
|
+ self.verbosity = verbosity
|
|
|
+ self.interactive = interactive
|
|
|
+ self.failfast = failfast
|
|
|
+
|
|
|
+ def setup_test_environment(self, **kwargs):
|
|
|
+ setup_test_environment()
|
|
|
+ settings.DEBUG = False
|
|
|
+ unittest.installHandler()
|
|
|
+
|
|
|
+ def build_suite(self, test_labels=None, extra_tests=None, **kwargs):
|
|
|
+ suite = TestSuite()
|
|
|
+ test_labels = test_labels or ['.']
|
|
|
+ extra_tests = extra_tests or []
|
|
|
+
|
|
|
+ discover_kwargs = {}
|
|
|
+ if self.pattern is not None:
|
|
|
+ discover_kwargs['pattern'] = self.pattern
|
|
|
+ if self.top_level is not None:
|
|
|
+ discover_kwargs['top_level_dir'] = self.top_level
|
|
|
+
|
|
|
+ for label in test_labels:
|
|
|
+ kwargs = discover_kwargs.copy()
|
|
|
+ tests = None
|
|
|
+
|
|
|
+ label_as_path = os.path.abspath(label)
|
|
|
+
|
|
|
+ # if a module, or "module.ClassName[.method_name]", just run those
|
|
|
+ if not os.path.exists(label_as_path):
|
|
|
+ tests = self.test_loader.loadTestsFromName(label)
|
|
|
+ elif os.path.isdir(label_as_path) and not self.top_level:
|
|
|
+ # Try to be a bit smarter than unittest about finding the
|
|
|
+ # default top-level for a given directory path, to avoid
|
|
|
+ # breaking relative imports. (Unittest's default is to set
|
|
|
+ # top-level equal to the path, which means relative imports
|
|
|
+ # will result in "Attempted relative import in non-package.").
|
|
|
+
|
|
|
+ # We'd be happy to skip this and require dotted module paths
|
|
|
+ # (which don't cause this problem) instead of file paths (which
|
|
|
+ # do), but in the case of a directory in the cwd, which would
|
|
|
+ # be equally valid if considered as a top-level module or as a
|
|
|
+ # directory path, unittest unfortunately prefers the latter.
|
|
|
+
|
|
|
+ top_level = label_as_path
|
|
|
+ while True:
|
|
|
+ init_py = os.path.join(top_level, '__init__.py')
|
|
|
+ if os.path.exists(init_py):
|
|
|
+ try_next = os.path.dirname(top_level)
|
|
|
+ if try_next == top_level:
|
|
|
+ # __init__.py all the way down? give up.
|
|
|
+ break
|
|
|
+ top_level = try_next
|
|
|
+ continue
|
|
|
+ break
|
|
|
+ kwargs['top_level_dir'] = top_level
|
|
|
+
|
|
|
+
|
|
|
+ if not (tests and tests.countTestCases()):
|
|
|
+ # if no tests found, it's probably a package; try discovery
|
|
|
+ tests = self.test_loader.discover(start_dir=label, **kwargs)
|
|
|
+
|
|
|
+ # make unittest forget the top-level dir it calculated from this
|
|
|
+ # run, to support running tests from two different top-levels.
|
|
|
+ self.test_loader._top_level_dir = None
|
|
|
+
|
|
|
+ suite.addTests(tests)
|
|
|
+
|
|
|
+ for test in extra_tests:
|
|
|
+ suite.addTest(test)
|
|
|
+
|
|
|
+ return reorder_suite(suite, self.reorder_by)
|
|
|
+
|
|
|
+ def setup_databases(self, **kwargs):
|
|
|
+ return setup_databases(self.verbosity, self.interactive, **kwargs)
|
|
|
+
|
|
|
+ def run_suite(self, suite, **kwargs):
|
|
|
+ return unittest.TextTestRunner(
|
|
|
+ verbosity=self.verbosity,
|
|
|
+ failfast=self.failfast,
|
|
|
+ ).run(suite)
|
|
|
+
|
|
|
+ def teardown_databases(self, old_config, **kwargs):
|
|
|
+ """
|
|
|
+ Destroys all the non-mirror databases.
|
|
|
+ """
|
|
|
+ old_names, mirrors = old_config
|
|
|
+ for connection, old_name, destroy in old_names:
|
|
|
+ if destroy:
|
|
|
+ connection.creation.destroy_test_db(old_name, self.verbosity)
|
|
|
+
|
|
|
+ def teardown_test_environment(self, **kwargs):
|
|
|
+ unittest.removeHandler()
|
|
|
+ teardown_test_environment()
|
|
|
+
|
|
|
+ def suite_result(self, suite, result, **kwargs):
|
|
|
+ return len(result.failures) + len(result.errors)
|
|
|
+
|
|
|
+ def run_tests(self, test_labels, extra_tests=None, **kwargs):
|
|
|
+ """
|
|
|
+ Run the unit tests for all the test labels in the provided list.
|
|
|
+
|
|
|
+ Test labels should be dotted Python paths to test modules, test
|
|
|
+ classes, or test methods.
|
|
|
+
|
|
|
+ A list of 'extra' tests may also be provided; these tests
|
|
|
+ will be added to the test suite.
|
|
|
+
|
|
|
+ Returns the number of tests that failed.
|
|
|
+ """
|
|
|
+ self.setup_test_environment()
|
|
|
+ suite = self.build_suite(test_labels, extra_tests)
|
|
|
+ old_config = self.setup_databases()
|
|
|
+ result = self.run_suite(suite)
|
|
|
+ self.teardown_databases(old_config)
|
|
|
+ self.teardown_test_environment()
|
|
|
+ return self.suite_result(suite, result)
|
|
|
+
|
|
|
+
|
|
|
+def dependency_ordered(test_databases, dependencies):
|
|
|
+ """
|
|
|
+ Reorder test_databases into an order that honors the dependencies
|
|
|
+ described in TEST_DEPENDENCIES.
|
|
|
+ """
|
|
|
+ ordered_test_databases = []
|
|
|
+ resolved_databases = set()
|
|
|
+
|
|
|
+ # Maps db signature to dependencies of all it's aliases
|
|
|
+ dependencies_map = {}
|
|
|
+
|
|
|
+ # sanity check - no DB can depend on it's own alias
|
|
|
+ for sig, (_, aliases) in test_databases:
|
|
|
+ all_deps = set()
|
|
|
+ for alias in aliases:
|
|
|
+ all_deps.update(dependencies.get(alias, []))
|
|
|
+ if not all_deps.isdisjoint(aliases):
|
|
|
+ raise ImproperlyConfigured(
|
|
|
+ "Circular dependency: databases %r depend on each other, "
|
|
|
+ "but are aliases." % aliases)
|
|
|
+ dependencies_map[sig] = all_deps
|
|
|
+
|
|
|
+ while test_databases:
|
|
|
+ changed = False
|
|
|
+ deferred = []
|
|
|
+
|
|
|
+ # Try to find a DB that has all it's dependencies met
|
|
|
+ for signature, (db_name, aliases) in test_databases:
|
|
|
+ if dependencies_map[signature].issubset(resolved_databases):
|
|
|
+ resolved_databases.update(aliases)
|
|
|
+ ordered_test_databases.append((signature, (db_name, aliases)))
|
|
|
+ changed = True
|
|
|
+ else:
|
|
|
+ deferred.append((signature, (db_name, aliases)))
|
|
|
+
|
|
|
+ if not changed:
|
|
|
+ raise ImproperlyConfigured(
|
|
|
+ "Circular dependency in TEST_DEPENDENCIES")
|
|
|
+ test_databases = deferred
|
|
|
+ return ordered_test_databases
|
|
|
+
|
|
|
+
|
|
|
+def reorder_suite(suite, classes):
|
|
|
+ """
|
|
|
+ Reorders a test suite by test type.
|
|
|
+
|
|
|
+ `classes` is a sequence of types
|
|
|
+
|
|
|
+ All tests of type classes[0] are placed first, then tests of type
|
|
|
+ classes[1], etc. Tests with no match in classes are placed last.
|
|
|
+ """
|
|
|
+ class_count = len(classes)
|
|
|
+ bins = [unittest.TestSuite() for i in range(class_count+1)]
|
|
|
+ partition_suite(suite, classes, bins)
|
|
|
+ for i in range(class_count):
|
|
|
+ bins[0].addTests(bins[i+1])
|
|
|
+ return bins[0]
|
|
|
+
|
|
|
+
|
|
|
+def partition_suite(suite, classes, bins):
|
|
|
+ """
|
|
|
+ Partitions a test suite by test type.
|
|
|
+
|
|
|
+ classes is a sequence of types
|
|
|
+ bins is a sequence of TestSuites, one more than classes
|
|
|
+
|
|
|
+ Tests of type classes[i] are added to bins[i],
|
|
|
+ tests with no match found in classes are place in bins[-1]
|
|
|
+ """
|
|
|
+ for test in suite:
|
|
|
+ if isinstance(test, unittest.TestSuite):
|
|
|
+ partition_suite(test, classes, bins)
|
|
|
+ else:
|
|
|
+ for i in range(len(classes)):
|
|
|
+ if isinstance(test, classes[i]):
|
|
|
+ bins[i].addTest(test)
|
|
|
+ break
|
|
|
+ else:
|
|
|
+ bins[-1].addTest(test)
|
|
|
+
|
|
|
+
|
|
|
+def setup_databases(verbosity, interactive, **kwargs):
|
|
|
+ from django.db import connections, DEFAULT_DB_ALIAS
|
|
|
+
|
|
|
+ # First pass -- work out which databases actually need to be created,
|
|
|
+ # and which ones are test mirrors or duplicate entries in DATABASES
|
|
|
+ mirrored_aliases = {}
|
|
|
+ test_databases = {}
|
|
|
+ dependencies = {}
|
|
|
+ for alias in connections:
|
|
|
+ connection = connections[alias]
|
|
|
+ if connection.settings_dict['TEST_MIRROR']:
|
|
|
+ # If the database is marked as a test mirror, save
|
|
|
+ # the alias.
|
|
|
+ mirrored_aliases[alias] = (
|
|
|
+ connection.settings_dict['TEST_MIRROR'])
|
|
|
+ else:
|
|
|
+ # Store a tuple with DB parameters that uniquely identify it.
|
|
|
+ # If we have two aliases with the same values for that tuple,
|
|
|
+ # we only need to create the test database once.
|
|
|
+ item = test_databases.setdefault(
|
|
|
+ connection.creation.test_db_signature(),
|
|
|
+ (connection.settings_dict['NAME'], set())
|
|
|
+ )
|
|
|
+ item[1].add(alias)
|
|
|
+
|
|
|
+ if 'TEST_DEPENDENCIES' in connection.settings_dict:
|
|
|
+ dependencies[alias] = (
|
|
|
+ connection.settings_dict['TEST_DEPENDENCIES'])
|
|
|
+ else:
|
|
|
+ if alias != DEFAULT_DB_ALIAS:
|
|
|
+ dependencies[alias] = connection.settings_dict.get(
|
|
|
+ 'TEST_DEPENDENCIES', [DEFAULT_DB_ALIAS])
|
|
|
+
|
|
|
+ # Second pass -- actually create the databases.
|
|
|
+ old_names = []
|
|
|
+ mirrors = []
|
|
|
+
|
|
|
+ for signature, (db_name, aliases) in dependency_ordered(
|
|
|
+ test_databases.items(), dependencies):
|
|
|
+ test_db_name = None
|
|
|
+ # Actually create the database for the first connection
|
|
|
+
|
|
|
+ for alias in aliases:
|
|
|
+ connection = connections[alias]
|
|
|
+ old_names.append((connection, db_name, True))
|
|
|
+ if test_db_name is None:
|
|
|
+ test_db_name = connection.creation.create_test_db(
|
|
|
+ verbosity, autoclobber=not interactive)
|
|
|
+ else:
|
|
|
+ connection.settings_dict['NAME'] = test_db_name
|
|
|
+
|
|
|
+ for alias, mirror_alias in mirrored_aliases.items():
|
|
|
+ mirrors.append((alias, connections[alias].settings_dict['NAME']))
|
|
|
+ connections[alias].settings_dict['NAME'] = (
|
|
|
+ connections[mirror_alias].settings_dict['NAME'])
|
|
|
+
|
|
|
+ return old_names, mirrors
|