|
@@ -1,257 +1,279 @@
|
|
|
-import gettext
|
|
|
+import contextlib
|
|
|
import os
|
|
|
+import py_compile
|
|
|
import shutil
|
|
|
+import sys
|
|
|
import tempfile
|
|
|
+import threading
|
|
|
+import time
|
|
|
+import zipfile
|
|
|
from importlib import import_module
|
|
|
-from unittest import mock
|
|
|
+from pathlib import Path
|
|
|
+from unittest import mock, skip
|
|
|
|
|
|
-import _thread
|
|
|
-
|
|
|
-from django import conf
|
|
|
-from django.contrib import admin
|
|
|
-from django.test import SimpleTestCase, override_settings
|
|
|
+from django.apps.registry import Apps
|
|
|
+from django.test import SimpleTestCase
|
|
|
from django.test.utils import extend_sys_path
|
|
|
from django.utils import autoreload
|
|
|
-from django.utils.translation import trans_real
|
|
|
-
|
|
|
-LOCALE_PATH = os.path.join(os.path.dirname(__file__), 'locale')
|
|
|
+from django.utils.autoreload import WatchmanUnavailable
|
|
|
|
|
|
|
|
|
-class TestFilenameGenerator(SimpleTestCase):
|
|
|
+class TestIterModulesAndFiles(SimpleTestCase):
|
|
|
+ def import_and_cleanup(self, name):
|
|
|
+ import_module(name)
|
|
|
+ self.addCleanup(lambda: sys.path_importer_cache.clear())
|
|
|
+ self.addCleanup(lambda: sys.modules.pop(name, None))
|
|
|
|
|
|
def clear_autoreload_caches(self):
|
|
|
- autoreload._cached_modules = set()
|
|
|
- autoreload._cached_filenames = []
|
|
|
+ autoreload.iter_modules_and_files.cache_clear()
|
|
|
|
|
|
def assertFileFound(self, filename):
|
|
|
+
|
|
|
+
|
|
|
+ resolved_filename = filename.resolve()
|
|
|
self.clear_autoreload_caches()
|
|
|
|
|
|
- self.assertIn(filename, autoreload.gen_filenames())
|
|
|
+ self.assertIn(resolved_filename, list(autoreload.iter_all_python_module_files()))
|
|
|
|
|
|
- self.assertIn(filename, autoreload.gen_filenames())
|
|
|
+ self.assertIn(resolved_filename, list(autoreload.iter_all_python_module_files()))
|
|
|
+ self.assertEqual(autoreload.iter_modules_and_files.cache_info().hits, 1)
|
|
|
|
|
|
def assertFileNotFound(self, filename):
|
|
|
+ resolved_filename = filename.resolve()
|
|
|
self.clear_autoreload_caches()
|
|
|
|
|
|
- self.assertNotIn(filename, autoreload.gen_filenames())
|
|
|
-
|
|
|
- self.assertNotIn(filename, autoreload.gen_filenames())
|
|
|
-
|
|
|
- def assertFileFoundOnlyNew(self, filename):
|
|
|
- self.clear_autoreload_caches()
|
|
|
-
|
|
|
- self.assertIn(filename, autoreload.gen_filenames(only_new=True))
|
|
|
+ self.assertNotIn(resolved_filename, list(autoreload.iter_all_python_module_files()))
|
|
|
|
|
|
- self.assertNotIn(filename, autoreload.gen_filenames(only_new=True))
|
|
|
+ self.assertNotIn(resolved_filename, list(autoreload.iter_all_python_module_files()))
|
|
|
+ self.assertEqual(autoreload.iter_modules_and_files.cache_info().hits, 1)
|
|
|
|
|
|
- def test_django_locales(self):
|
|
|
- """
|
|
|
- gen_filenames() yields the built-in Django locale files.
|
|
|
- """
|
|
|
- django_dir = os.path.join(os.path.dirname(conf.__file__), 'locale')
|
|
|
- django_mo = os.path.join(django_dir, 'nl', 'LC_MESSAGES', 'django.mo')
|
|
|
- self.assertFileFound(django_mo)
|
|
|
-
|
|
|
- @override_settings(LOCALE_PATHS=[LOCALE_PATH])
|
|
|
- def test_locale_paths_setting(self):
|
|
|
- """
|
|
|
- gen_filenames also yields from LOCALE_PATHS locales.
|
|
|
- """
|
|
|
- locale_paths_mo = os.path.join(LOCALE_PATH, 'nl', 'LC_MESSAGES', 'django.mo')
|
|
|
- self.assertFileFound(locale_paths_mo)
|
|
|
-
|
|
|
- @override_settings(INSTALLED_APPS=[])
|
|
|
- def test_project_root_locale(self):
|
|
|
- """
|
|
|
- gen_filenames() also yields from the current directory (project root).
|
|
|
- """
|
|
|
- old_cwd = os.getcwd()
|
|
|
- os.chdir(os.path.dirname(__file__))
|
|
|
- current_dir = os.path.join(os.path.dirname(__file__), 'locale')
|
|
|
- current_dir_mo = os.path.join(current_dir, 'nl', 'LC_MESSAGES', 'django.mo')
|
|
|
- try:
|
|
|
- self.assertFileFound(current_dir_mo)
|
|
|
- finally:
|
|
|
- os.chdir(old_cwd)
|
|
|
-
|
|
|
- @override_settings(INSTALLED_APPS=['django.contrib.admin'])
|
|
|
- def test_app_locales(self):
|
|
|
- """
|
|
|
- gen_filenames() also yields from locale dirs in installed apps.
|
|
|
- """
|
|
|
- admin_dir = os.path.join(os.path.dirname(admin.__file__), 'locale')
|
|
|
- admin_mo = os.path.join(admin_dir, 'nl', 'LC_MESSAGES', 'django.mo')
|
|
|
- self.assertFileFound(admin_mo)
|
|
|
-
|
|
|
- @override_settings(USE_I18N=False)
|
|
|
- def test_no_i18n(self):
|
|
|
- """
|
|
|
- If i18n machinery is disabled, there is no need for watching the
|
|
|
- locale files.
|
|
|
- """
|
|
|
- django_dir = os.path.join(os.path.dirname(conf.__file__), 'locale')
|
|
|
- django_mo = os.path.join(django_dir, 'nl', 'LC_MESSAGES', 'django.mo')
|
|
|
- self.assertFileNotFound(django_mo)
|
|
|
-
|
|
|
- def test_paths_are_native_strings(self):
|
|
|
- for filename in autoreload.gen_filenames():
|
|
|
- self.assertIsInstance(filename, str)
|
|
|
-
|
|
|
- def test_only_new_files(self):
|
|
|
- """
|
|
|
- When calling a second time gen_filenames with only_new = True, only
|
|
|
- files from newly loaded modules should be given.
|
|
|
- """
|
|
|
+ def temporary_file(self, filename):
|
|
|
dirname = tempfile.mkdtemp()
|
|
|
- filename = os.path.join(dirname, 'test_only_new_module.py')
|
|
|
self.addCleanup(shutil.rmtree, dirname)
|
|
|
- with open(filename, 'w'):
|
|
|
- pass
|
|
|
+ return Path(dirname) / filename
|
|
|
|
|
|
-
|
|
|
- self.clear_autoreload_caches()
|
|
|
- filenames = set(autoreload.gen_filenames(only_new=True))
|
|
|
- filenames_reference = set(autoreload.gen_filenames())
|
|
|
- self.assertEqual(filenames, filenames_reference)
|
|
|
-
|
|
|
-
|
|
|
- filenames = set(autoreload.gen_filenames(only_new=True))
|
|
|
- self.assertEqual(filenames, set())
|
|
|
+ def test_paths_are_pathlib_instances(self):
|
|
|
+ for filename in autoreload.iter_all_python_module_files():
|
|
|
+ self.assertIsInstance(filename, Path)
|
|
|
|
|
|
-
|
|
|
- with extend_sys_path(dirname):
|
|
|
- import_module('test_only_new_module')
|
|
|
- filenames = set(autoreload.gen_filenames(only_new=True))
|
|
|
- self.assertEqual(filenames, {filename})
|
|
|
-
|
|
|
- def test_deleted_removed(self):
|
|
|
+ def test_file_added(self):
|
|
|
"""
|
|
|
- When a file is deleted, gen_filenames() no longer returns it.
|
|
|
+ When a file is added, it's returned by iter_all_python_module_files().
|
|
|
"""
|
|
|
- dirname = tempfile.mkdtemp()
|
|
|
- filename = os.path.join(dirname, 'test_deleted_removed_module.py')
|
|
|
- self.addCleanup(shutil.rmtree, dirname)
|
|
|
- with open(filename, 'w'):
|
|
|
- pass
|
|
|
+ filename = self.temporary_file('test_deleted_removed_module.py')
|
|
|
+ filename.touch()
|
|
|
|
|
|
- with extend_sys_path(dirname):
|
|
|
- import_module('test_deleted_removed_module')
|
|
|
- self.assertFileFound(filename)
|
|
|
+ with extend_sys_path(str(filename.parent)):
|
|
|
+ self.import_and_cleanup('test_deleted_removed_module')
|
|
|
|
|
|
- os.unlink(filename)
|
|
|
- self.assertFileNotFound(filename)
|
|
|
+ self.assertFileFound(filename.absolute())
|
|
|
|
|
|
def test_check_errors(self):
|
|
|
"""
|
|
|
When a file containing an error is imported in a function wrapped by
|
|
|
check_errors(), gen_filenames() returns it.
|
|
|
"""
|
|
|
- dirname = tempfile.mkdtemp()
|
|
|
- filename = os.path.join(dirname, 'test_syntax_error.py')
|
|
|
- self.addCleanup(shutil.rmtree, dirname)
|
|
|
- with open(filename, 'w') as f:
|
|
|
- f.write("Ceci n'est pas du Python.")
|
|
|
+ filename = self.temporary_file('test_syntax_error.py')
|
|
|
+ filename.write_text("Ceci n'est pas du Python.")
|
|
|
|
|
|
- with extend_sys_path(dirname):
|
|
|
+ with extend_sys_path(str(filename.parent)):
|
|
|
with self.assertRaises(SyntaxError):
|
|
|
autoreload.check_errors(import_module)('test_syntax_error')
|
|
|
self.assertFileFound(filename)
|
|
|
|
|
|
- def test_check_errors_only_new(self):
|
|
|
- """
|
|
|
- When a file containing an error is imported in a function wrapped by
|
|
|
- check_errors(), gen_filenames(only_new=True) returns it.
|
|
|
- """
|
|
|
- dirname = tempfile.mkdtemp()
|
|
|
- filename = os.path.join(dirname, 'test_syntax_error.py')
|
|
|
- self.addCleanup(shutil.rmtree, dirname)
|
|
|
- with open(filename, 'w') as f:
|
|
|
- f.write("Ceci n'est pas du Python.")
|
|
|
-
|
|
|
- with extend_sys_path(dirname):
|
|
|
- with self.assertRaises(SyntaxError):
|
|
|
- autoreload.check_errors(import_module)('test_syntax_error')
|
|
|
- self.assertFileFoundOnlyNew(filename)
|
|
|
-
|
|
|
def test_check_errors_catches_all_exceptions(self):
|
|
|
"""
|
|
|
Since Python may raise arbitrary exceptions when importing code,
|
|
|
check_errors() must catch Exception, not just some subclasses.
|
|
|
"""
|
|
|
- dirname = tempfile.mkdtemp()
|
|
|
- filename = os.path.join(dirname, 'test_exception.py')
|
|
|
- self.addCleanup(shutil.rmtree, dirname)
|
|
|
- with open(filename, 'w') as f:
|
|
|
- f.write("raise Exception")
|
|
|
-
|
|
|
- with extend_sys_path(dirname):
|
|
|
+ filename = self.temporary_file('test_exception.py')
|
|
|
+ filename.write_text('raise Exception')
|
|
|
+ with extend_sys_path(str(filename.parent)):
|
|
|
with self.assertRaises(Exception):
|
|
|
autoreload.check_errors(import_module)('test_exception')
|
|
|
self.assertFileFound(filename)
|
|
|
|
|
|
-
|
|
|
-class CleanFilesTests(SimpleTestCase):
|
|
|
- TEST_MAP = {
|
|
|
-
|
|
|
- 'falsies': ([None, False], []),
|
|
|
- 'pycs': (['myfile.pyc'], ['myfile.py']),
|
|
|
- 'pyos': (['myfile.pyo'], ['myfile.py']),
|
|
|
- '$py.class': (['myclass$py.class'], ['myclass.py']),
|
|
|
- 'combined': (
|
|
|
- [None, 'file1.pyo', 'file2.pyc', 'myclass$py.class'],
|
|
|
- ['file1.py', 'file2.py', 'myclass.py'],
|
|
|
- )
|
|
|
- }
|
|
|
-
|
|
|
- def _run_tests(self, mock_files_exist=True):
|
|
|
- with mock.patch('django.utils.autoreload.os.path.exists', return_value=mock_files_exist):
|
|
|
- for description, values in self.TEST_MAP.items():
|
|
|
- filenames, expected_returned_filenames = values
|
|
|
- self.assertEqual(
|
|
|
- autoreload.clean_files(filenames),
|
|
|
- expected_returned_filenames if mock_files_exist else [],
|
|
|
- msg='{} failed for input file list: {}; returned file list: {}'.format(
|
|
|
- description, filenames, expected_returned_filenames
|
|
|
- ),
|
|
|
- )
|
|
|
-
|
|
|
- def test_files_exist(self):
|
|
|
+ def test_zip_reload(self):
|
|
|
"""
|
|
|
- If the file exists, any compiled files (pyc, pyo, $py.class) are
|
|
|
- transformed as their source files.
|
|
|
- """
|
|
|
- self._run_tests()
|
|
|
-
|
|
|
- def test_files_do_not_exist(self):
|
|
|
+ Modules imported from zipped files have their archive location included
|
|
|
+ in the result.
|
|
|
"""
|
|
|
- If the files don't exist, they aren't in the returned file list.
|
|
|
- """
|
|
|
- self._run_tests(mock_files_exist=False)
|
|
|
-
|
|
|
+ zip_file = self.temporary_file('zip_import.zip')
|
|
|
+ with zipfile.ZipFile(str(zip_file), 'w', zipfile.ZIP_DEFLATED) as zipf:
|
|
|
+ zipf.writestr('test_zipped_file.py', '')
|
|
|
+
|
|
|
+ with extend_sys_path(str(zip_file)):
|
|
|
+ self.import_and_cleanup('test_zipped_file')
|
|
|
+ self.assertFileFound(zip_file)
|
|
|
+
|
|
|
+ def test_bytecode_conversion_to_source(self):
|
|
|
+ """.pyc and .pyo files are included in the files list."""
|
|
|
+ filename = self.temporary_file('test_compiled.py')
|
|
|
+ filename.touch()
|
|
|
+ compiled_file = Path(py_compile.compile(str(filename), str(filename.with_suffix('.pyc'))))
|
|
|
+ filename.unlink()
|
|
|
+ with extend_sys_path(str(compiled_file.parent)):
|
|
|
+ self.import_and_cleanup('test_compiled')
|
|
|
+ self.assertFileFound(compiled_file)
|
|
|
+
|
|
|
+
|
|
|
+class TestCommonRoots(SimpleTestCase):
|
|
|
+ def test_common_roots(self):
|
|
|
+ paths = (
|
|
|
+ Path('/first/second'),
|
|
|
+ Path('/first/second/third'),
|
|
|
+ Path('/first/'),
|
|
|
+ Path('/root/first/'),
|
|
|
+ )
|
|
|
+ results = autoreload.common_roots(paths)
|
|
|
+ self.assertCountEqual(results, [Path('/first/'), Path('/root/first/')])
|
|
|
|
|
|
-class ResetTranslationsTests(SimpleTestCase):
|
|
|
|
|
|
+class TestSysPathDirectories(SimpleTestCase):
|
|
|
def setUp(self):
|
|
|
- self.gettext_translations = gettext._translations.copy()
|
|
|
- self.trans_real_translations = trans_real._translations.copy()
|
|
|
+ self._directory = tempfile.TemporaryDirectory()
|
|
|
+ self.directory = Path(self._directory.name).resolve().absolute()
|
|
|
+ self.file = self.directory / 'test'
|
|
|
+ self.file.touch()
|
|
|
|
|
|
def tearDown(self):
|
|
|
- gettext._translations = self.gettext_translations
|
|
|
- trans_real._translations = self.trans_real_translations
|
|
|
+ self._directory.cleanup()
|
|
|
+
|
|
|
+ def test_sys_paths_with_directories(self):
|
|
|
+ with extend_sys_path(str(self.file)):
|
|
|
+ paths = list(autoreload.sys_path_directories())
|
|
|
+ self.assertIn(self.file.parent, paths)
|
|
|
+
|
|
|
+ def test_sys_paths_non_existing(self):
|
|
|
+ nonexistant_file = Path(self.directory.name) / 'does_not_exist'
|
|
|
+ with extend_sys_path(str(nonexistant_file)):
|
|
|
+ paths = list(autoreload.sys_path_directories())
|
|
|
+ self.assertNotIn(nonexistant_file, paths)
|
|
|
+ self.assertNotIn(nonexistant_file.parent, paths)
|
|
|
+
|
|
|
+ def test_sys_paths_absolute(self):
|
|
|
+ paths = list(autoreload.sys_path_directories())
|
|
|
+ self.assertTrue(all(p.is_absolute() for p in paths))
|
|
|
+
|
|
|
+ def test_sys_paths_directories(self):
|
|
|
+ with extend_sys_path(str(self.directory)):
|
|
|
+ paths = list(autoreload.sys_path_directories())
|
|
|
+ self.assertIn(self.directory, paths)
|
|
|
+
|
|
|
+
|
|
|
+class GetReloaderTests(SimpleTestCase):
|
|
|
+ @mock.patch('django.utils.autoreload.WatchmanReloader')
|
|
|
+ def test_watchman_unavailable(self, mocked_watchman):
|
|
|
+ mocked_watchman.check_availability.side_effect = WatchmanUnavailable
|
|
|
+ self.assertIsInstance(autoreload.get_reloader(), autoreload.StatReloader)
|
|
|
+
|
|
|
+ @mock.patch.object(autoreload.WatchmanReloader, 'check_availability')
|
|
|
+ def test_watchman_available(self, mocked_available):
|
|
|
+
|
|
|
+ mocked_available.return_value = None
|
|
|
+ result = autoreload.get_reloader()
|
|
|
+ self.assertIsInstance(result, autoreload.WatchmanReloader)
|
|
|
+
|
|
|
+
|
|
|
+class RunWithReloaderTests(SimpleTestCase):
|
|
|
+ @mock.patch.dict(os.environ, {autoreload.DJANGO_AUTORELOAD_ENV: 'true'})
|
|
|
+ @mock.patch('django.utils.autoreload.get_reloader')
|
|
|
+ def test_swallows_keyboard_interrupt(self, mocked_get_reloader):
|
|
|
+ mocked_get_reloader.side_effect = KeyboardInterrupt()
|
|
|
+ autoreload.run_with_reloader(lambda: None)
|
|
|
+
|
|
|
+ @mock.patch.dict(os.environ, {autoreload.DJANGO_AUTORELOAD_ENV: 'false'})
|
|
|
+ @mock.patch('django.utils.autoreload.restart_with_reloader')
|
|
|
+ def test_calls_sys_exit(self, mocked_restart_reloader):
|
|
|
+ mocked_restart_reloader.return_value = 1
|
|
|
+ with self.assertRaises(SystemExit) as exc:
|
|
|
+ autoreload.run_with_reloader(lambda: None)
|
|
|
+ self.assertEqual(exc.exception.code, 1)
|
|
|
+
|
|
|
+ @mock.patch.dict(os.environ, {autoreload.DJANGO_AUTORELOAD_ENV: 'true'})
|
|
|
+ @mock.patch('django.utils.autoreload.start_django')
|
|
|
+ @mock.patch('django.utils.autoreload.get_reloader')
|
|
|
+ def test_calls_start_django(self, mocked_reloader, mocked_start_django):
|
|
|
+ mocked_reloader.return_value = mock.sentinel.RELOADER
|
|
|
+ autoreload.run_with_reloader(mock.sentinel.METHOD)
|
|
|
+ self.assertEqual(mocked_start_django.call_count, 1)
|
|
|
+ self.assertSequenceEqual(
|
|
|
+ mocked_start_django.call_args[0],
|
|
|
+ [mock.sentinel.RELOADER, mock.sentinel.METHOD]
|
|
|
+ )
|
|
|
+
|
|
|
|
|
|
- def test_resets_gettext(self):
|
|
|
- gettext._translations = {'foo': 'bar'}
|
|
|
- autoreload.reset_translations()
|
|
|
- self.assertEqual(gettext._translations, {})
|
|
|
+class StartDjangoTests(SimpleTestCase):
|
|
|
+ @mock.patch('django.utils.autoreload.StatReloader')
|
|
|
+ def test_watchman_becomes_unavailable(self, mocked_stat):
|
|
|
+ mocked_stat.should_stop.return_value = True
|
|
|
+ fake_reloader = mock.MagicMock()
|
|
|
+ fake_reloader.should_stop = False
|
|
|
+ fake_reloader.run.side_effect = autoreload.WatchmanUnavailable()
|
|
|
+
|
|
|
+ autoreload.start_django(fake_reloader, lambda: None)
|
|
|
+ self.assertEqual(mocked_stat.call_count, 1)
|
|
|
+
|
|
|
+ @mock.patch('django.utils.autoreload.ensure_echo_on')
|
|
|
+ def test_echo_on_called(self, mocked_echo):
|
|
|
+ fake_reloader = mock.MagicMock()
|
|
|
+ autoreload.start_django(fake_reloader, lambda: None)
|
|
|
+ self.assertEqual(mocked_echo.call_count, 1)
|
|
|
+
|
|
|
+ @mock.patch('django.utils.autoreload.check_errors')
|
|
|
+ def test_check_errors_called(self, mocked_check_errors):
|
|
|
+ fake_method = mock.MagicMock(return_value=None)
|
|
|
+ fake_reloader = mock.MagicMock()
|
|
|
+ autoreload.start_django(fake_reloader, fake_method)
|
|
|
+ self.assertCountEqual(mocked_check_errors.call_args[0], [fake_method])
|
|
|
+
|
|
|
+ @mock.patch('threading.Thread')
|
|
|
+ @mock.patch('django.utils.autoreload.check_errors')
|
|
|
+ def test_starts_thread_with_args(self, mocked_check_errors, mocked_thread):
|
|
|
+ fake_reloader = mock.MagicMock()
|
|
|
+ fake_main_func = mock.MagicMock()
|
|
|
+ fake_thread = mock.MagicMock()
|
|
|
+ mocked_check_errors.return_value = fake_main_func
|
|
|
+ mocked_thread.return_value = fake_thread
|
|
|
+ autoreload.start_django(fake_reloader, fake_main_func, 123, abc=123)
|
|
|
+ self.assertEqual(mocked_thread.call_count, 1)
|
|
|
+ self.assertEqual(
|
|
|
+ mocked_thread.call_args[1],
|
|
|
+ {'target': fake_main_func, 'args': (123,), 'kwargs': {'abc': 123}}
|
|
|
+ )
|
|
|
+ self.assertSequenceEqual(fake_thread.setDaemon.call_args[0], [True])
|
|
|
+ self.assertTrue(fake_thread.start.called)
|
|
|
+
|
|
|
+
|
|
|
+class TestCheckErrors(SimpleTestCase):
|
|
|
+ def test_mutates_error_files(self):
|
|
|
+ fake_method = mock.MagicMock(side_effect=RuntimeError())
|
|
|
+ wrapped = autoreload.check_errors(fake_method)
|
|
|
+ with mock.patch.object(autoreload, '_error_files') as mocked_error_files:
|
|
|
+ with self.assertRaises(RuntimeError):
|
|
|
+ wrapped()
|
|
|
+ self.assertEqual(mocked_error_files.append.call_count, 1)
|
|
|
|
|
|
- def test_resets_trans_real(self):
|
|
|
- trans_real._translations = {'foo': 'bar'}
|
|
|
- trans_real._default = 1
|
|
|
- trans_real._active = False
|
|
|
- autoreload.reset_translations()
|
|
|
- self.assertEqual(trans_real._translations, {})
|
|
|
- self.assertIsNone(trans_real._default)
|
|
|
- self.assertIsInstance(trans_real._active, _thread._local)
|
|
|
+
|
|
|
+class TestRaiseLastException(SimpleTestCase):
|
|
|
+ @mock.patch('django.utils.autoreload._exception', None)
|
|
|
+ def test_no_exception(self):
|
|
|
+
|
|
|
+ autoreload.raise_last_exception()
|
|
|
+
|
|
|
+ def test_raises_exception(self):
|
|
|
+ class MyException(Exception):
|
|
|
+ pass
|
|
|
+
|
|
|
+
|
|
|
+ try:
|
|
|
+ raise MyException('Test Message')
|
|
|
+ except MyException:
|
|
|
+ exc_info = sys.exc_info()
|
|
|
+
|
|
|
+ with mock.patch('django.utils.autoreload._exception', exc_info):
|
|
|
+ with self.assertRaises(MyException, msg='Test Message'):
|
|
|
+ autoreload.raise_last_exception()
|
|
|
|
|
|
|
|
|
class RestartWithReloaderTests(SimpleTestCase):
|
|
@@ -286,3 +308,363 @@ class RestartWithReloaderTests(SimpleTestCase):
|
|
|
autoreload.restart_with_reloader()
|
|
|
self.assertEqual(mock_call.call_count, 1)
|
|
|
self.assertEqual(mock_call.call_args[0][0], [self.executable, '-Wall', '-m', 'django'] + argv[1:])
|
|
|
+
|
|
|
+
|
|
|
+class ReloaderTests(SimpleTestCase):
|
|
|
+ RELOADER_CLS = None
|
|
|
+
|
|
|
+ def setUp(self):
|
|
|
+ self._tempdir = tempfile.TemporaryDirectory()
|
|
|
+ self.tempdir = Path(self._tempdir.name).resolve().absolute()
|
|
|
+ self.existing_file = self.ensure_file(self.tempdir / 'test.py')
|
|
|
+ self.nonexistant_file = (self.tempdir / 'does_not_exist.py').absolute()
|
|
|
+ self.reloader = self.RELOADER_CLS()
|
|
|
+
|
|
|
+ def tearDown(self):
|
|
|
+ self._tempdir.cleanup()
|
|
|
+ self.reloader.stop()
|
|
|
+
|
|
|
+ def ensure_file(self, path):
|
|
|
+ path.parent.mkdir(exist_ok=True, parents=True)
|
|
|
+ path.touch()
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ self.set_mtime(path, time.time())
|
|
|
+ return path.absolute()
|
|
|
+
|
|
|
+ def set_mtime(self, fp, value):
|
|
|
+ os.utime(str(fp), (value, value))
|
|
|
+
|
|
|
+ def increment_mtime(self, fp, by=1):
|
|
|
+ current_time = time.time()
|
|
|
+ self.set_mtime(fp, current_time + by)
|
|
|
+
|
|
|
+ @contextlib.contextmanager
|
|
|
+ def tick_twice(self):
|
|
|
+ ticker = self.reloader.tick()
|
|
|
+ next(ticker)
|
|
|
+ yield
|
|
|
+ next(ticker)
|
|
|
+
|
|
|
+
|
|
|
+class IntegrationTests:
|
|
|
+ @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
|
|
|
+ @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
|
|
|
+ def test_file(self, mocked_modules, notify_mock):
|
|
|
+ self.reloader.watch_file(self.existing_file)
|
|
|
+ with self.tick_twice():
|
|
|
+ self.increment_mtime(self.existing_file)
|
|
|
+ self.assertEqual(notify_mock.call_count, 1)
|
|
|
+ self.assertCountEqual(notify_mock.call_args[0], [self.existing_file])
|
|
|
+
|
|
|
+ @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
|
|
|
+ @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
|
|
|
+ def test_nonexistant_file(self, mocked_modules, notify_mock):
|
|
|
+ self.reloader.watch_file(self.nonexistant_file)
|
|
|
+ with self.tick_twice():
|
|
|
+ self.ensure_file(self.nonexistant_file)
|
|
|
+ self.assertEqual(notify_mock.call_count, 1)
|
|
|
+ self.assertCountEqual(notify_mock.call_args[0], [self.nonexistant_file])
|
|
|
+
|
|
|
+ @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
|
|
|
+ @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
|
|
|
+ def test_nonexistant_file_in_non_existing_directory(self, mocked_modules, notify_mock):
|
|
|
+ non_existing_directory = self.tempdir / 'non_existing_dir'
|
|
|
+ nonexistant_file = non_existing_directory / 'test'
|
|
|
+ self.reloader.watch_file(nonexistant_file)
|
|
|
+ with self.tick_twice():
|
|
|
+ self.ensure_file(nonexistant_file)
|
|
|
+ self.assertEqual(notify_mock.call_count, 1)
|
|
|
+ self.assertCountEqual(notify_mock.call_args[0], [nonexistant_file])
|
|
|
+
|
|
|
+ @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
|
|
|
+ @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
|
|
|
+ def test_glob(self, mocked_modules, notify_mock):
|
|
|
+ non_py_file = self.ensure_file(self.tempdir / 'non_py_file')
|
|
|
+ self.reloader.watch_dir(self.tempdir, '*.py')
|
|
|
+ with self.tick_twice():
|
|
|
+ self.increment_mtime(non_py_file)
|
|
|
+ self.increment_mtime(self.existing_file)
|
|
|
+ self.assertEqual(notify_mock.call_count, 1)
|
|
|
+ self.assertCountEqual(notify_mock.call_args[0], [self.existing_file])
|
|
|
+
|
|
|
+ @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
|
|
|
+ @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
|
|
|
+ def test_glob_non_existing_directory(self, mocked_modules, notify_mock):
|
|
|
+ non_existing_directory = self.tempdir / 'does_not_exist'
|
|
|
+ nonexistant_file = non_existing_directory / 'test.py'
|
|
|
+ self.reloader.watch_dir(non_existing_directory, '*.py')
|
|
|
+ with self.tick_twice():
|
|
|
+ self.ensure_file(nonexistant_file)
|
|
|
+ self.set_mtime(nonexistant_file, time.time())
|
|
|
+ self.assertEqual(notify_mock.call_count, 1)
|
|
|
+ self.assertCountEqual(notify_mock.call_args[0], [nonexistant_file])
|
|
|
+
|
|
|
+ @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
|
|
|
+ @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
|
|
|
+ def test_multiple_globs(self, mocked_modules, notify_mock):
|
|
|
+ self.ensure_file(self.tempdir / 'x.test')
|
|
|
+ self.reloader.watch_dir(self.tempdir, '*.py')
|
|
|
+ self.reloader.watch_dir(self.tempdir, '*.test')
|
|
|
+ with self.tick_twice():
|
|
|
+ self.increment_mtime(self.existing_file)
|
|
|
+ self.assertEqual(notify_mock.call_count, 1)
|
|
|
+ self.assertCountEqual(notify_mock.call_args[0], [self.existing_file])
|
|
|
+
|
|
|
+ @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
|
|
|
+ @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
|
|
|
+ def test_overlapping_globs(self, mocked_modules, notify_mock):
|
|
|
+ self.reloader.watch_dir(self.tempdir, '*.py')
|
|
|
+ self.reloader.watch_dir(self.tempdir, '*.p*')
|
|
|
+ with self.tick_twice():
|
|
|
+ self.increment_mtime(self.existing_file)
|
|
|
+ self.assertEqual(notify_mock.call_count, 1)
|
|
|
+ self.assertCountEqual(notify_mock.call_args[0], [self.existing_file])
|
|
|
+
|
|
|
+ @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
|
|
|
+ @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
|
|
|
+ def test_glob_recursive(self, mocked_modules, notify_mock):
|
|
|
+ non_py_file = self.ensure_file(self.tempdir / 'dir' / 'non_py_file')
|
|
|
+ py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py')
|
|
|
+ self.reloader.watch_dir(self.tempdir, '**/*.py')
|
|
|
+ with self.tick_twice():
|
|
|
+ self.increment_mtime(non_py_file)
|
|
|
+ self.increment_mtime(py_file)
|
|
|
+ self.assertEqual(notify_mock.call_count, 1)
|
|
|
+ self.assertCountEqual(notify_mock.call_args[0], [py_file])
|
|
|
+
|
|
|
+ @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
|
|
|
+ @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
|
|
|
+ def test_multiple_recursive_globs(self, mocked_modules, notify_mock):
|
|
|
+ non_py_file = self.ensure_file(self.tempdir / 'dir' / 'test.txt')
|
|
|
+ py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py')
|
|
|
+ self.reloader.watch_dir(self.tempdir, '**/*.txt')
|
|
|
+ self.reloader.watch_dir(self.tempdir, '**/*.py')
|
|
|
+ with self.tick_twice():
|
|
|
+ self.increment_mtime(non_py_file)
|
|
|
+ self.increment_mtime(py_file)
|
|
|
+ self.assertEqual(notify_mock.call_count, 2)
|
|
|
+ self.assertCountEqual(notify_mock.call_args_list, [mock.call(py_file), mock.call(non_py_file)])
|
|
|
+
|
|
|
+ @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
|
|
|
+ @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
|
|
|
+ def test_nested_glob_recursive(self, mocked_modules, notify_mock):
|
|
|
+ inner_py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py')
|
|
|
+ self.reloader.watch_dir(self.tempdir, '**/*.py')
|
|
|
+ self.reloader.watch_dir(inner_py_file.parent, '**/*.py')
|
|
|
+ with self.tick_twice():
|
|
|
+ self.increment_mtime(inner_py_file)
|
|
|
+ self.assertEqual(notify_mock.call_count, 1)
|
|
|
+ self.assertCountEqual(notify_mock.call_args[0], [inner_py_file])
|
|
|
+
|
|
|
+ @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
|
|
|
+ @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
|
|
|
+ def test_overlapping_glob_recursive(self, mocked_modules, notify_mock):
|
|
|
+ py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py')
|
|
|
+ self.reloader.watch_dir(self.tempdir, '**/*.p*')
|
|
|
+ self.reloader.watch_dir(self.tempdir, '**/*.py*')
|
|
|
+ with self.tick_twice():
|
|
|
+ self.increment_mtime(py_file)
|
|
|
+ self.assertEqual(notify_mock.call_count, 1)
|
|
|
+ self.assertCountEqual(notify_mock.call_args[0], [py_file])
|
|
|
+
|
|
|
+
|
|
|
+class BaseReloaderTests(ReloaderTests):
|
|
|
+ RELOADER_CLS = autoreload.BaseReloader
|
|
|
+
|
|
|
+ def test_watch_without_absolute(self):
|
|
|
+ with self.assertRaisesMessage(ValueError, 'test.py must be absolute.'):
|
|
|
+ self.reloader.watch_file('test.py')
|
|
|
+
|
|
|
+ def test_watch_with_single_file(self):
|
|
|
+ self.reloader.watch_file(self.existing_file)
|
|
|
+ watched_files = list(self.reloader.watched_files())
|
|
|
+ self.assertIn(self.existing_file, watched_files)
|
|
|
+
|
|
|
+ def test_watch_with_glob(self):
|
|
|
+ self.reloader.watch_dir(self.tempdir, '*.py')
|
|
|
+ watched_files = list(self.reloader.watched_files())
|
|
|
+ self.assertIn(self.existing_file, watched_files)
|
|
|
+
|
|
|
+ def test_watch_files_with_recursive_glob(self):
|
|
|
+ inner_file = self.ensure_file(self.tempdir / 'test' / 'test.py')
|
|
|
+ self.reloader.watch_dir(self.tempdir, '**/*.py')
|
|
|
+ watched_files = list(self.reloader.watched_files())
|
|
|
+ self.assertIn(self.existing_file, watched_files)
|
|
|
+ self.assertIn(inner_file, watched_files)
|
|
|
+
|
|
|
+ def test_run_loop_catches_stopiteration(self):
|
|
|
+ def mocked_tick():
|
|
|
+ yield
|
|
|
+
|
|
|
+ with mock.patch.object(self.reloader, 'tick', side_effect=mocked_tick) as tick:
|
|
|
+ self.reloader.run_loop()
|
|
|
+ self.assertEqual(tick.call_count, 1)
|
|
|
+
|
|
|
+ def test_run_loop_stop_and_return(self):
|
|
|
+ def mocked_tick(*args):
|
|
|
+ yield
|
|
|
+ self.reloader.stop()
|
|
|
+ return
|
|
|
+
|
|
|
+ with mock.patch.object(self.reloader, 'tick', side_effect=mocked_tick) as tick:
|
|
|
+ self.reloader.run_loop()
|
|
|
+
|
|
|
+ self.assertEqual(tick.call_count, 1)
|
|
|
+
|
|
|
+ def test_wait_for_apps_ready_checks_for_exception(self):
|
|
|
+ app_reg = Apps()
|
|
|
+ app_reg.ready_event.set()
|
|
|
+
|
|
|
+ dead_thread = threading.Thread()
|
|
|
+ self.assertFalse(self.reloader.wait_for_apps_ready(app_reg, dead_thread))
|
|
|
+
|
|
|
+ def test_wait_for_apps_ready_without_exception(self):
|
|
|
+ app_reg = Apps()
|
|
|
+ app_reg.ready_event.set()
|
|
|
+ thread = mock.MagicMock()
|
|
|
+ thread.is_alive.return_value = True
|
|
|
+ self.assertTrue(self.reloader.wait_for_apps_ready(app_reg, thread))
|
|
|
+
|
|
|
+
|
|
|
+def skip_unless_watchman_available():
|
|
|
+ try:
|
|
|
+ autoreload.WatchmanReloader.check_availability()
|
|
|
+ except WatchmanUnavailable as e:
|
|
|
+ return skip('Watchman unavailable: %s' % e)
|
|
|
+ return lambda func: func
|
|
|
+
|
|
|
+
|
|
|
+@skip_unless_watchman_available()
|
|
|
+class WatchmanReloaderTests(ReloaderTests, IntegrationTests):
|
|
|
+ RELOADER_CLS = autoreload.WatchmanReloader
|
|
|
+
|
|
|
+ def test_watch_glob_ignores_non_existing_directories_two_levels(self):
|
|
|
+ with mock.patch.object(self.reloader, '_subscribe') as mocked_subscribe:
|
|
|
+ self.reloader._watch_glob(self.tempdir / 'does_not_exist' / 'more', ['*'])
|
|
|
+ self.assertFalse(mocked_subscribe.called)
|
|
|
+
|
|
|
+ def test_watch_glob_uses_existing_parent_directories(self):
|
|
|
+ with mock.patch.object(self.reloader, '_subscribe') as mocked_subscribe:
|
|
|
+ self.reloader._watch_glob(self.tempdir / 'does_not_exist', ['*'])
|
|
|
+ self.assertSequenceEqual(
|
|
|
+ mocked_subscribe.call_args[0],
|
|
|
+ [
|
|
|
+ self.tempdir, 'glob-parent-does_not_exist:%s' % self.tempdir,
|
|
|
+ ['anyof', ['match', 'does_not_exist/*', 'wholename']]
|
|
|
+ ]
|
|
|
+ )
|
|
|
+
|
|
|
+ def test_watch_glob_multiple_patterns(self):
|
|
|
+ with mock.patch.object(self.reloader, '_subscribe') as mocked_subscribe:
|
|
|
+ self.reloader._watch_glob(self.tempdir, ['*', '*.py'])
|
|
|
+ self.assertSequenceEqual(
|
|
|
+ mocked_subscribe.call_args[0],
|
|
|
+ [
|
|
|
+ self.tempdir, 'glob:%s' % self.tempdir,
|
|
|
+ ['anyof', ['match', '*', 'wholename'], ['match', '*.py', 'wholename']]
|
|
|
+ ]
|
|
|
+ )
|
|
|
+
|
|
|
+ def test_watched_roots_contains_files(self):
|
|
|
+ paths = self.reloader.watched_roots([self.existing_file])
|
|
|
+ self.assertIn(self.existing_file.parent, paths)
|
|
|
+
|
|
|
+ def test_watched_roots_contains_directory_globs(self):
|
|
|
+ self.reloader.watch_dir(self.tempdir, '*.py')
|
|
|
+ paths = self.reloader.watched_roots([])
|
|
|
+ self.assertIn(self.tempdir, paths)
|
|
|
+
|
|
|
+ def test_watched_roots_contains_sys_path(self):
|
|
|
+ with extend_sys_path(str(self.tempdir)):
|
|
|
+ paths = self.reloader.watched_roots([])
|
|
|
+ self.assertIn(self.tempdir, paths)
|
|
|
+
|
|
|
+ def test_check_server_status(self):
|
|
|
+ self.assertTrue(self.reloader.check_server_status())
|
|
|
+
|
|
|
+ def test_check_server_status_raises_error(self):
|
|
|
+ with mock.patch.object(self.reloader.client, 'query') as mocked_query:
|
|
|
+ mocked_query.side_effect = Exception()
|
|
|
+ with self.assertRaises(autoreload.WatchmanUnavailable):
|
|
|
+ self.reloader.check_server_status()
|
|
|
+
|
|
|
+ @mock.patch('pywatchman.client')
|
|
|
+ def test_check_availability(self, mocked_client):
|
|
|
+ mocked_client().capabilityCheck.side_effect = Exception()
|
|
|
+ with self.assertRaisesMessage(WatchmanUnavailable, 'Cannot connect to the watchman service'):
|
|
|
+ self.RELOADER_CLS.check_availability()
|
|
|
+
|
|
|
+ @mock.patch('pywatchman.client')
|
|
|
+ def test_check_availability_lower_version(self, mocked_client):
|
|
|
+ mocked_client().capabilityCheck.return_value = {'version': '4.8.10'}
|
|
|
+ with self.assertRaisesMessage(WatchmanUnavailable, 'Watchman 4.9 or later is required.'):
|
|
|
+ self.RELOADER_CLS.check_availability()
|
|
|
+
|
|
|
+ def test_pywatchman_not_available(self):
|
|
|
+ with mock.patch.object(autoreload, 'pywatchman') as mocked:
|
|
|
+ mocked.__bool__.return_value = False
|
|
|
+ with self.assertRaisesMessage(WatchmanUnavailable, 'pywatchman not installed.'):
|
|
|
+ self.RELOADER_CLS.check_availability()
|
|
|
+
|
|
|
+ def test_update_watches_raises_exceptions(self):
|
|
|
+ class TestException(Exception):
|
|
|
+ pass
|
|
|
+
|
|
|
+ with mock.patch.object(self.reloader, '_update_watches') as mocked_watches:
|
|
|
+ with mock.patch.object(self.reloader, 'check_server_status') as mocked_server_status:
|
|
|
+ mocked_watches.side_effect = TestException()
|
|
|
+ mocked_server_status.return_value = True
|
|
|
+ with self.assertRaises(TestException):
|
|
|
+ self.reloader.update_watches()
|
|
|
+ self.assertIsInstance(mocked_server_status.call_args[0][0], TestException)
|
|
|
+
|
|
|
+
|
|
|
+class StatReloaderTests(ReloaderTests, IntegrationTests):
|
|
|
+ RELOADER_CLS = autoreload.StatReloader
|
|
|
+
|
|
|
+ def setUp(self):
|
|
|
+ super().setUp()
|
|
|
+
|
|
|
+ self.reloader.SLEEP_TIME = 0.01
|
|
|
+
|
|
|
+ def test_snapshot_files_ignores_missing_files(self):
|
|
|
+ with mock.patch.object(self.reloader, 'watched_files', return_value=[self.nonexistant_file]):
|
|
|
+ self.assertEqual(dict(self.reloader.snapshot_files()), {})
|
|
|
+
|
|
|
+ def test_snapshot_files_updates(self):
|
|
|
+ with mock.patch.object(self.reloader, 'watched_files', return_value=[self.existing_file]):
|
|
|
+ snapshot1 = dict(self.reloader.snapshot_files())
|
|
|
+ self.assertIn(self.existing_file, snapshot1)
|
|
|
+ self.increment_mtime(self.existing_file)
|
|
|
+ snapshot2 = dict(self.reloader.snapshot_files())
|
|
|
+ self.assertNotEqual(snapshot1[self.existing_file], snapshot2[self.existing_file])
|
|
|
+
|
|
|
+ def test_does_not_fire_without_changes(self):
|
|
|
+ with mock.patch.object(self.reloader, 'watched_files', return_value=[self.existing_file]), \
|
|
|
+ mock.patch.object(self.reloader, 'notify_file_changed') as notifier:
|
|
|
+ mtime = self.existing_file.stat().st_mtime
|
|
|
+ initial_snapshot = {self.existing_file: mtime}
|
|
|
+ second_snapshot = self.reloader.loop_files(initial_snapshot, time.time())
|
|
|
+ self.assertEqual(second_snapshot, {})
|
|
|
+ notifier.assert_not_called()
|
|
|
+
|
|
|
+ def test_fires_when_created(self):
|
|
|
+ with mock.patch.object(self.reloader, 'watched_files', return_value=[self.nonexistant_file]), \
|
|
|
+ mock.patch.object(self.reloader, 'notify_file_changed') as notifier:
|
|
|
+ self.nonexistant_file.touch()
|
|
|
+ mtime = self.nonexistant_file.stat().st_mtime
|
|
|
+ second_snapshot = self.reloader.loop_files({}, mtime - 1)
|
|
|
+ self.assertCountEqual(second_snapshot.keys(), [self.nonexistant_file])
|
|
|
+ notifier.assert_called_once_with(self.nonexistant_file)
|
|
|
+
|
|
|
+ def test_fires_with_changes(self):
|
|
|
+ with mock.patch.object(self.reloader, 'watched_files', return_value=[self.existing_file]), \
|
|
|
+ mock.patch.object(self.reloader, 'notify_file_changed') as notifier:
|
|
|
+ initial_snapshot = {self.existing_file: 1}
|
|
|
+ second_snapshot = self.reloader.loop_files(initial_snapshot, time.time())
|
|
|
+ notifier.assert_called_once_with(self.existing_file)
|
|
|
+ self.assertCountEqual(second_snapshot.keys(), [self.existing_file])
|