diff options
| author | Tom Forbes <tom@tomforb.es> | 2019-01-14 01:33:47 +0000 |
|---|---|---|
| committer | Tim Graham <timograham@gmail.com> | 2019-01-13 20:33:47 -0500 |
| commit | c8720e7696ca41f3262d5369365cc1bd72a216ca (patch) | |
| tree | 1fc858e37415196f06982d2af32f98e29044cde1 /tests/utils_tests/test_autoreload.py | |
| parent | a02a6fd5805f9f0e613b9951249555876b8c4041 (diff) | |
Fixed #27685 -- Added watchman support to the autoreloader.
Removed support for pyinotify (refs #9722).
Diffstat (limited to 'tests/utils_tests/test_autoreload.py')
| -rw-r--r-- | tests/utils_tests/test_autoreload.py | 768 |
1 files changed, 575 insertions, 193 deletions
diff --git a/tests/utils_tests/test_autoreload.py b/tests/utils_tests/test_autoreload.py index 486d62cd18..6aa272dd9a 100644 --- a/tests/utils_tests/test_autoreload.py +++ b/tests/utils_tests/test_autoreload.py @@ -1,257 +1,279 @@ -import gettext +import contextlib import os +import py_compile import shutil +import sys import tempfile +import threading +import time +import zipfile from importlib import import_module -from unittest import mock +from pathlib import Path +from unittest import mock, skip -import _thread - -from django import conf -from django.contrib import admin -from django.test import SimpleTestCase, override_settings +from django.apps.registry import Apps +from django.test import SimpleTestCase from django.test.utils import extend_sys_path from django.utils import autoreload -from django.utils.translation import trans_real - -LOCALE_PATH = os.path.join(os.path.dirname(__file__), 'locale') +from django.utils.autoreload import WatchmanUnavailable -class TestFilenameGenerator(SimpleTestCase): +class TestIterModulesAndFiles(SimpleTestCase): + def import_and_cleanup(self, name): + import_module(name) + self.addCleanup(lambda: sys.path_importer_cache.clear()) + self.addCleanup(lambda: sys.modules.pop(name, None)) def clear_autoreload_caches(self): - autoreload._cached_modules = set() - autoreload._cached_filenames = [] + autoreload.iter_modules_and_files.cache_clear() def assertFileFound(self, filename): + # Some temp directories are symlinks. Python resolves these fully while + # importing. + resolved_filename = filename.resolve() self.clear_autoreload_caches() # Test uncached access - self.assertIn(filename, autoreload.gen_filenames()) + self.assertIn(resolved_filename, list(autoreload.iter_all_python_module_files())) # Test cached access - self.assertIn(filename, autoreload.gen_filenames()) + self.assertIn(resolved_filename, list(autoreload.iter_all_python_module_files())) + self.assertEqual(autoreload.iter_modules_and_files.cache_info().hits, 1) def assertFileNotFound(self, filename): + resolved_filename = filename.resolve() self.clear_autoreload_caches() # Test uncached access - self.assertNotIn(filename, autoreload.gen_filenames()) - # Test cached access - self.assertNotIn(filename, autoreload.gen_filenames()) - - def assertFileFoundOnlyNew(self, filename): - self.clear_autoreload_caches() - # Test uncached access - self.assertIn(filename, autoreload.gen_filenames(only_new=True)) + self.assertNotIn(resolved_filename, list(autoreload.iter_all_python_module_files())) # Test cached access - self.assertNotIn(filename, autoreload.gen_filenames(only_new=True)) - - def test_django_locales(self): - """ - gen_filenames() yields the built-in Django locale files. - """ - django_dir = os.path.join(os.path.dirname(conf.__file__), 'locale') - django_mo = os.path.join(django_dir, 'nl', 'LC_MESSAGES', 'django.mo') - self.assertFileFound(django_mo) - - @override_settings(LOCALE_PATHS=[LOCALE_PATH]) - def test_locale_paths_setting(self): - """ - gen_filenames also yields from LOCALE_PATHS locales. - """ - locale_paths_mo = os.path.join(LOCALE_PATH, 'nl', 'LC_MESSAGES', 'django.mo') - self.assertFileFound(locale_paths_mo) - - @override_settings(INSTALLED_APPS=[]) - def test_project_root_locale(self): - """ - gen_filenames() also yields from the current directory (project root). - """ - old_cwd = os.getcwd() - os.chdir(os.path.dirname(__file__)) - current_dir = os.path.join(os.path.dirname(__file__), 'locale') - current_dir_mo = os.path.join(current_dir, 'nl', 'LC_MESSAGES', 'django.mo') - try: - self.assertFileFound(current_dir_mo) - finally: - os.chdir(old_cwd) - - @override_settings(INSTALLED_APPS=['django.contrib.admin']) - def test_app_locales(self): - """ - gen_filenames() also yields from locale dirs in installed apps. - """ - admin_dir = os.path.join(os.path.dirname(admin.__file__), 'locale') - admin_mo = os.path.join(admin_dir, 'nl', 'LC_MESSAGES', 'django.mo') - self.assertFileFound(admin_mo) + self.assertNotIn(resolved_filename, list(autoreload.iter_all_python_module_files())) + self.assertEqual(autoreload.iter_modules_and_files.cache_info().hits, 1) - @override_settings(USE_I18N=False) - def test_no_i18n(self): - """ - If i18n machinery is disabled, there is no need for watching the - locale files. - """ - django_dir = os.path.join(os.path.dirname(conf.__file__), 'locale') - django_mo = os.path.join(django_dir, 'nl', 'LC_MESSAGES', 'django.mo') - self.assertFileNotFound(django_mo) - - def test_paths_are_native_strings(self): - for filename in autoreload.gen_filenames(): - self.assertIsInstance(filename, str) - - def test_only_new_files(self): - """ - When calling a second time gen_filenames with only_new = True, only - files from newly loaded modules should be given. - """ + def temporary_file(self, filename): dirname = tempfile.mkdtemp() - filename = os.path.join(dirname, 'test_only_new_module.py') self.addCleanup(shutil.rmtree, dirname) - with open(filename, 'w'): - pass - - # Test uncached access - self.clear_autoreload_caches() - filenames = set(autoreload.gen_filenames(only_new=True)) - filenames_reference = set(autoreload.gen_filenames()) - self.assertEqual(filenames, filenames_reference) - - # Test cached access: no changes - filenames = set(autoreload.gen_filenames(only_new=True)) - self.assertEqual(filenames, set()) + return Path(dirname) / filename - # Test cached access: add a module - with extend_sys_path(dirname): - import_module('test_only_new_module') - filenames = set(autoreload.gen_filenames(only_new=True)) - self.assertEqual(filenames, {filename}) + def test_paths_are_pathlib_instances(self): + for filename in autoreload.iter_all_python_module_files(): + self.assertIsInstance(filename, Path) - def test_deleted_removed(self): + def test_file_added(self): """ - When a file is deleted, gen_filenames() no longer returns it. + When a file is added, it's returned by iter_all_python_module_files(). """ - dirname = tempfile.mkdtemp() - filename = os.path.join(dirname, 'test_deleted_removed_module.py') - self.addCleanup(shutil.rmtree, dirname) - with open(filename, 'w'): - pass + filename = self.temporary_file('test_deleted_removed_module.py') + filename.touch() - with extend_sys_path(dirname): - import_module('test_deleted_removed_module') - self.assertFileFound(filename) + with extend_sys_path(str(filename.parent)): + self.import_and_cleanup('test_deleted_removed_module') - os.unlink(filename) - self.assertFileNotFound(filename) + self.assertFileFound(filename.absolute()) def test_check_errors(self): """ When a file containing an error is imported in a function wrapped by check_errors(), gen_filenames() returns it. """ - dirname = tempfile.mkdtemp() - filename = os.path.join(dirname, 'test_syntax_error.py') - self.addCleanup(shutil.rmtree, dirname) - with open(filename, 'w') as f: - f.write("Ceci n'est pas du Python.") + filename = self.temporary_file('test_syntax_error.py') + filename.write_text("Ceci n'est pas du Python.") - with extend_sys_path(dirname): + with extend_sys_path(str(filename.parent)): with self.assertRaises(SyntaxError): autoreload.check_errors(import_module)('test_syntax_error') self.assertFileFound(filename) - def test_check_errors_only_new(self): - """ - When a file containing an error is imported in a function wrapped by - check_errors(), gen_filenames(only_new=True) returns it. - """ - dirname = tempfile.mkdtemp() - filename = os.path.join(dirname, 'test_syntax_error.py') - self.addCleanup(shutil.rmtree, dirname) - with open(filename, 'w') as f: - f.write("Ceci n'est pas du Python.") - - with extend_sys_path(dirname): - with self.assertRaises(SyntaxError): - autoreload.check_errors(import_module)('test_syntax_error') - self.assertFileFoundOnlyNew(filename) - def test_check_errors_catches_all_exceptions(self): """ Since Python may raise arbitrary exceptions when importing code, check_errors() must catch Exception, not just some subclasses. """ - dirname = tempfile.mkdtemp() - filename = os.path.join(dirname, 'test_exception.py') - self.addCleanup(shutil.rmtree, dirname) - with open(filename, 'w') as f: - f.write("raise Exception") - - with extend_sys_path(dirname): + filename = self.temporary_file('test_exception.py') + filename.write_text('raise Exception') + with extend_sys_path(str(filename.parent)): with self.assertRaises(Exception): autoreload.check_errors(import_module)('test_exception') self.assertFileFound(filename) - -class CleanFilesTests(SimpleTestCase): - TEST_MAP = { - # description: (input_file_list, expected_returned_file_list) - 'falsies': ([None, False], []), - 'pycs': (['myfile.pyc'], ['myfile.py']), - 'pyos': (['myfile.pyo'], ['myfile.py']), - '$py.class': (['myclass$py.class'], ['myclass.py']), - 'combined': ( - [None, 'file1.pyo', 'file2.pyc', 'myclass$py.class'], - ['file1.py', 'file2.py', 'myclass.py'], - ) - } - - def _run_tests(self, mock_files_exist=True): - with mock.patch('django.utils.autoreload.os.path.exists', return_value=mock_files_exist): - for description, values in self.TEST_MAP.items(): - filenames, expected_returned_filenames = values - self.assertEqual( - autoreload.clean_files(filenames), - expected_returned_filenames if mock_files_exist else [], - msg='{} failed for input file list: {}; returned file list: {}'.format( - description, filenames, expected_returned_filenames - ), - ) - - def test_files_exist(self): + def test_zip_reload(self): """ - If the file exists, any compiled files (pyc, pyo, $py.class) are - transformed as their source files. + Modules imported from zipped files have their archive location included + in the result. """ - self._run_tests() + zip_file = self.temporary_file('zip_import.zip') + with zipfile.ZipFile(str(zip_file), 'w', zipfile.ZIP_DEFLATED) as zipf: + zipf.writestr('test_zipped_file.py', '') - def test_files_do_not_exist(self): - """ - If the files don't exist, they aren't in the returned file list. - """ - self._run_tests(mock_files_exist=False) + with extend_sys_path(str(zip_file)): + self.import_and_cleanup('test_zipped_file') + self.assertFileFound(zip_file) + def test_bytecode_conversion_to_source(self): + """.pyc and .pyo files are included in the files list.""" + filename = self.temporary_file('test_compiled.py') + filename.touch() + compiled_file = Path(py_compile.compile(str(filename), str(filename.with_suffix('.pyc')))) + filename.unlink() + with extend_sys_path(str(compiled_file.parent)): + self.import_and_cleanup('test_compiled') + self.assertFileFound(compiled_file) -class ResetTranslationsTests(SimpleTestCase): +class TestCommonRoots(SimpleTestCase): + def test_common_roots(self): + paths = ( + Path('/first/second'), + Path('/first/second/third'), + Path('/first/'), + Path('/root/first/'), + ) + results = autoreload.common_roots(paths) + self.assertCountEqual(results, [Path('/first/'), Path('/root/first/')]) + + +class TestSysPathDirectories(SimpleTestCase): def setUp(self): - self.gettext_translations = gettext._translations.copy() - self.trans_real_translations = trans_real._translations.copy() + self._directory = tempfile.TemporaryDirectory() + self.directory = Path(self._directory.name).resolve().absolute() + self.file = self.directory / 'test' + self.file.touch() def tearDown(self): - gettext._translations = self.gettext_translations - trans_real._translations = self.trans_real_translations + self._directory.cleanup() + + def test_sys_paths_with_directories(self): + with extend_sys_path(str(self.file)): + paths = list(autoreload.sys_path_directories()) + self.assertIn(self.file.parent, paths) + + def test_sys_paths_non_existing(self): + nonexistant_file = Path(self.directory.name) / 'does_not_exist' + with extend_sys_path(str(nonexistant_file)): + paths = list(autoreload.sys_path_directories()) + self.assertNotIn(nonexistant_file, paths) + self.assertNotIn(nonexistant_file.parent, paths) + + def test_sys_paths_absolute(self): + paths = list(autoreload.sys_path_directories()) + self.assertTrue(all(p.is_absolute() for p in paths)) + + def test_sys_paths_directories(self): + with extend_sys_path(str(self.directory)): + paths = list(autoreload.sys_path_directories()) + self.assertIn(self.directory, paths) + + +class GetReloaderTests(SimpleTestCase): + @mock.patch('django.utils.autoreload.WatchmanReloader') + def test_watchman_unavailable(self, mocked_watchman): + mocked_watchman.check_availability.side_effect = WatchmanUnavailable + self.assertIsInstance(autoreload.get_reloader(), autoreload.StatReloader) + + @mock.patch.object(autoreload.WatchmanReloader, 'check_availability') + def test_watchman_available(self, mocked_available): + # If WatchmanUnavailable isn't raised, Watchman will be chosen. + mocked_available.return_value = None + result = autoreload.get_reloader() + self.assertIsInstance(result, autoreload.WatchmanReloader) + - def test_resets_gettext(self): - gettext._translations = {'foo': 'bar'} - autoreload.reset_translations() - self.assertEqual(gettext._translations, {}) +class RunWithReloaderTests(SimpleTestCase): + @mock.patch.dict(os.environ, {autoreload.DJANGO_AUTORELOAD_ENV: 'true'}) + @mock.patch('django.utils.autoreload.get_reloader') + def test_swallows_keyboard_interrupt(self, mocked_get_reloader): + mocked_get_reloader.side_effect = KeyboardInterrupt() + autoreload.run_with_reloader(lambda: None) # No exception - def test_resets_trans_real(self): - trans_real._translations = {'foo': 'bar'} - trans_real._default = 1 - trans_real._active = False - autoreload.reset_translations() - self.assertEqual(trans_real._translations, {}) - self.assertIsNone(trans_real._default) - self.assertIsInstance(trans_real._active, _thread._local) + @mock.patch.dict(os.environ, {autoreload.DJANGO_AUTORELOAD_ENV: 'false'}) + @mock.patch('django.utils.autoreload.restart_with_reloader') + def test_calls_sys_exit(self, mocked_restart_reloader): + mocked_restart_reloader.return_value = 1 + with self.assertRaises(SystemExit) as exc: + autoreload.run_with_reloader(lambda: None) + self.assertEqual(exc.exception.code, 1) + + @mock.patch.dict(os.environ, {autoreload.DJANGO_AUTORELOAD_ENV: 'true'}) + @mock.patch('django.utils.autoreload.start_django') + @mock.patch('django.utils.autoreload.get_reloader') + def test_calls_start_django(self, mocked_reloader, mocked_start_django): + mocked_reloader.return_value = mock.sentinel.RELOADER + autoreload.run_with_reloader(mock.sentinel.METHOD) + self.assertEqual(mocked_start_django.call_count, 1) + self.assertSequenceEqual( + mocked_start_django.call_args[0], + [mock.sentinel.RELOADER, mock.sentinel.METHOD] + ) + + +class StartDjangoTests(SimpleTestCase): + @mock.patch('django.utils.autoreload.StatReloader') + def test_watchman_becomes_unavailable(self, mocked_stat): + mocked_stat.should_stop.return_value = True + fake_reloader = mock.MagicMock() + fake_reloader.should_stop = False + fake_reloader.run.side_effect = autoreload.WatchmanUnavailable() + + autoreload.start_django(fake_reloader, lambda: None) + self.assertEqual(mocked_stat.call_count, 1) + + @mock.patch('django.utils.autoreload.ensure_echo_on') + def test_echo_on_called(self, mocked_echo): + fake_reloader = mock.MagicMock() + autoreload.start_django(fake_reloader, lambda: None) + self.assertEqual(mocked_echo.call_count, 1) + + @mock.patch('django.utils.autoreload.check_errors') + def test_check_errors_called(self, mocked_check_errors): + fake_method = mock.MagicMock(return_value=None) + fake_reloader = mock.MagicMock() + autoreload.start_django(fake_reloader, fake_method) + self.assertCountEqual(mocked_check_errors.call_args[0], [fake_method]) + + @mock.patch('threading.Thread') + @mock.patch('django.utils.autoreload.check_errors') + def test_starts_thread_with_args(self, mocked_check_errors, mocked_thread): + fake_reloader = mock.MagicMock() + fake_main_func = mock.MagicMock() + fake_thread = mock.MagicMock() + mocked_check_errors.return_value = fake_main_func + mocked_thread.return_value = fake_thread + autoreload.start_django(fake_reloader, fake_main_func, 123, abc=123) + self.assertEqual(mocked_thread.call_count, 1) + self.assertEqual( + mocked_thread.call_args[1], + {'target': fake_main_func, 'args': (123,), 'kwargs': {'abc': 123}} + ) + self.assertSequenceEqual(fake_thread.setDaemon.call_args[0], [True]) + self.assertTrue(fake_thread.start.called) + + +class TestCheckErrors(SimpleTestCase): + def test_mutates_error_files(self): + fake_method = mock.MagicMock(side_effect=RuntimeError()) + wrapped = autoreload.check_errors(fake_method) + with mock.patch.object(autoreload, '_error_files') as mocked_error_files: + with self.assertRaises(RuntimeError): + wrapped() + self.assertEqual(mocked_error_files.append.call_count, 1) + + +class TestRaiseLastException(SimpleTestCase): + @mock.patch('django.utils.autoreload._exception', None) + def test_no_exception(self): + # Should raise no exception if _exception is None + autoreload.raise_last_exception() + + def test_raises_exception(self): + class MyException(Exception): + pass + + # Create an exception + try: + raise MyException('Test Message') + except MyException: + exc_info = sys.exc_info() + + with mock.patch('django.utils.autoreload._exception', exc_info): + with self.assertRaises(MyException, msg='Test Message'): + autoreload.raise_last_exception() class RestartWithReloaderTests(SimpleTestCase): @@ -286,3 +308,363 @@ class RestartWithReloaderTests(SimpleTestCase): autoreload.restart_with_reloader() self.assertEqual(mock_call.call_count, 1) self.assertEqual(mock_call.call_args[0][0], [self.executable, '-Wall', '-m', 'django'] + argv[1:]) + + +class ReloaderTests(SimpleTestCase): + RELOADER_CLS = None + + def setUp(self): + self._tempdir = tempfile.TemporaryDirectory() + self.tempdir = Path(self._tempdir.name).resolve().absolute() + self.existing_file = self.ensure_file(self.tempdir / 'test.py') + self.nonexistant_file = (self.tempdir / 'does_not_exist.py').absolute() + self.reloader = self.RELOADER_CLS() + + def tearDown(self): + self._tempdir.cleanup() + self.reloader.stop() + + def ensure_file(self, path): + path.parent.mkdir(exist_ok=True, parents=True) + path.touch() + # On Linux and Windows updating the mtime of a file using touch() will set a timestamp + # value that is in the past, as the time value for the last kernel tick is used rather + # than getting the correct absolute time. + # To make testing simpler set the mtime to be the observed time when this function is + # called. + self.set_mtime(path, time.time()) + return path.absolute() + + def set_mtime(self, fp, value): + os.utime(str(fp), (value, value)) + + def increment_mtime(self, fp, by=1): + current_time = time.time() + self.set_mtime(fp, current_time + by) + + @contextlib.contextmanager + def tick_twice(self): + ticker = self.reloader.tick() + next(ticker) + yield + next(ticker) + + +class IntegrationTests: + @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed') + @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset()) + def test_file(self, mocked_modules, notify_mock): + self.reloader.watch_file(self.existing_file) + with self.tick_twice(): + self.increment_mtime(self.existing_file) + self.assertEqual(notify_mock.call_count, 1) + self.assertCountEqual(notify_mock.call_args[0], [self.existing_file]) + + @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed') + @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset()) + def test_nonexistant_file(self, mocked_modules, notify_mock): + self.reloader.watch_file(self.nonexistant_file) + with self.tick_twice(): + self.ensure_file(self.nonexistant_file) + self.assertEqual(notify_mock.call_count, 1) + self.assertCountEqual(notify_mock.call_args[0], [self.nonexistant_file]) + + @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed') + @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset()) + def test_nonexistant_file_in_non_existing_directory(self, mocked_modules, notify_mock): + non_existing_directory = self.tempdir / 'non_existing_dir' + nonexistant_file = non_existing_directory / 'test' + self.reloader.watch_file(nonexistant_file) + with self.tick_twice(): + self.ensure_file(nonexistant_file) + self.assertEqual(notify_mock.call_count, 1) + self.assertCountEqual(notify_mock.call_args[0], [nonexistant_file]) + + @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed') + @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset()) + def test_glob(self, mocked_modules, notify_mock): + non_py_file = self.ensure_file(self.tempdir / 'non_py_file') + self.reloader.watch_dir(self.tempdir, '*.py') + with self.tick_twice(): + self.increment_mtime(non_py_file) + self.increment_mtime(self.existing_file) + self.assertEqual(notify_mock.call_count, 1) + self.assertCountEqual(notify_mock.call_args[0], [self.existing_file]) + + @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed') + @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset()) + def test_glob_non_existing_directory(self, mocked_modules, notify_mock): + non_existing_directory = self.tempdir / 'does_not_exist' + nonexistant_file = non_existing_directory / 'test.py' + self.reloader.watch_dir(non_existing_directory, '*.py') + with self.tick_twice(): + self.ensure_file(nonexistant_file) + self.set_mtime(nonexistant_file, time.time()) + self.assertEqual(notify_mock.call_count, 1) + self.assertCountEqual(notify_mock.call_args[0], [nonexistant_file]) + + @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed') + @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset()) + def test_multiple_globs(self, mocked_modules, notify_mock): + self.ensure_file(self.tempdir / 'x.test') + self.reloader.watch_dir(self.tempdir, '*.py') + self.reloader.watch_dir(self.tempdir, '*.test') + with self.tick_twice(): + self.increment_mtime(self.existing_file) + self.assertEqual(notify_mock.call_count, 1) + self.assertCountEqual(notify_mock.call_args[0], [self.existing_file]) + + @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed') + @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset()) + def test_overlapping_globs(self, mocked_modules, notify_mock): + self.reloader.watch_dir(self.tempdir, '*.py') + self.reloader.watch_dir(self.tempdir, '*.p*') + with self.tick_twice(): + self.increment_mtime(self.existing_file) + self.assertEqual(notify_mock.call_count, 1) + self.assertCountEqual(notify_mock.call_args[0], [self.existing_file]) + + @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed') + @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset()) + def test_glob_recursive(self, mocked_modules, notify_mock): + non_py_file = self.ensure_file(self.tempdir / 'dir' / 'non_py_file') + py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py') + self.reloader.watch_dir(self.tempdir, '**/*.py') + with self.tick_twice(): + self.increment_mtime(non_py_file) + self.increment_mtime(py_file) + self.assertEqual(notify_mock.call_count, 1) + self.assertCountEqual(notify_mock.call_args[0], [py_file]) + + @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed') + @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset()) + def test_multiple_recursive_globs(self, mocked_modules, notify_mock): + non_py_file = self.ensure_file(self.tempdir / 'dir' / 'test.txt') + py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py') + self.reloader.watch_dir(self.tempdir, '**/*.txt') + self.reloader.watch_dir(self.tempdir, '**/*.py') + with self.tick_twice(): + self.increment_mtime(non_py_file) + self.increment_mtime(py_file) + self.assertEqual(notify_mock.call_count, 2) + self.assertCountEqual(notify_mock.call_args_list, [mock.call(py_file), mock.call(non_py_file)]) + + @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed') + @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset()) + def test_nested_glob_recursive(self, mocked_modules, notify_mock): + inner_py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py') + self.reloader.watch_dir(self.tempdir, '**/*.py') + self.reloader.watch_dir(inner_py_file.parent, '**/*.py') + with self.tick_twice(): + self.increment_mtime(inner_py_file) + self.assertEqual(notify_mock.call_count, 1) + self.assertCountEqual(notify_mock.call_args[0], [inner_py_file]) + + @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed') + @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset()) + def test_overlapping_glob_recursive(self, mocked_modules, notify_mock): + py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py') + self.reloader.watch_dir(self.tempdir, '**/*.p*') + self.reloader.watch_dir(self.tempdir, '**/*.py*') + with self.tick_twice(): + self.increment_mtime(py_file) + self.assertEqual(notify_mock.call_count, 1) + self.assertCountEqual(notify_mock.call_args[0], [py_file]) + + +class BaseReloaderTests(ReloaderTests): + RELOADER_CLS = autoreload.BaseReloader + + def test_watch_without_absolute(self): + with self.assertRaisesMessage(ValueError, 'test.py must be absolute.'): + self.reloader.watch_file('test.py') + + def test_watch_with_single_file(self): + self.reloader.watch_file(self.existing_file) + watched_files = list(self.reloader.watched_files()) + self.assertIn(self.existing_file, watched_files) + + def test_watch_with_glob(self): + self.reloader.watch_dir(self.tempdir, '*.py') + watched_files = list(self.reloader.watched_files()) + self.assertIn(self.existing_file, watched_files) + + def test_watch_files_with_recursive_glob(self): + inner_file = self.ensure_file(self.tempdir / 'test' / 'test.py') + self.reloader.watch_dir(self.tempdir, '**/*.py') + watched_files = list(self.reloader.watched_files()) + self.assertIn(self.existing_file, watched_files) + self.assertIn(inner_file, watched_files) + + def test_run_loop_catches_stopiteration(self): + def mocked_tick(): + yield + + with mock.patch.object(self.reloader, 'tick', side_effect=mocked_tick) as tick: + self.reloader.run_loop() + self.assertEqual(tick.call_count, 1) + + def test_run_loop_stop_and_return(self): + def mocked_tick(*args): + yield + self.reloader.stop() + return # Raises StopIteration + + with mock.patch.object(self.reloader, 'tick', side_effect=mocked_tick) as tick: + self.reloader.run_loop() + + self.assertEqual(tick.call_count, 1) + + def test_wait_for_apps_ready_checks_for_exception(self): + app_reg = Apps() + app_reg.ready_event.set() + # thread.is_alive() is False if it's not started. + dead_thread = threading.Thread() + self.assertFalse(self.reloader.wait_for_apps_ready(app_reg, dead_thread)) + + def test_wait_for_apps_ready_without_exception(self): + app_reg = Apps() + app_reg.ready_event.set() + thread = mock.MagicMock() + thread.is_alive.return_value = True + self.assertTrue(self.reloader.wait_for_apps_ready(app_reg, thread)) + + +def skip_unless_watchman_available(): + try: + autoreload.WatchmanReloader.check_availability() + except WatchmanUnavailable as e: + return skip('Watchman unavailable: %s' % e) + return lambda func: func + + +@skip_unless_watchman_available() +class WatchmanReloaderTests(ReloaderTests, IntegrationTests): + RELOADER_CLS = autoreload.WatchmanReloader + + def test_watch_glob_ignores_non_existing_directories_two_levels(self): + with mock.patch.object(self.reloader, '_subscribe') as mocked_subscribe: + self.reloader._watch_glob(self.tempdir / 'does_not_exist' / 'more', ['*']) + self.assertFalse(mocked_subscribe.called) + + def test_watch_glob_uses_existing_parent_directories(self): + with mock.patch.object(self.reloader, '_subscribe') as mocked_subscribe: + self.reloader._watch_glob(self.tempdir / 'does_not_exist', ['*']) + self.assertSequenceEqual( + mocked_subscribe.call_args[0], + [ + self.tempdir, 'glob-parent-does_not_exist:%s' % self.tempdir, + ['anyof', ['match', 'does_not_exist/*', 'wholename']] + ] + ) + + def test_watch_glob_multiple_patterns(self): + with mock.patch.object(self.reloader, '_subscribe') as mocked_subscribe: + self.reloader._watch_glob(self.tempdir, ['*', '*.py']) + self.assertSequenceEqual( + mocked_subscribe.call_args[0], + [ + self.tempdir, 'glob:%s' % self.tempdir, + ['anyof', ['match', '*', 'wholename'], ['match', '*.py', 'wholename']] + ] + ) + + def test_watched_roots_contains_files(self): + paths = self.reloader.watched_roots([self.existing_file]) + self.assertIn(self.existing_file.parent, paths) + + def test_watched_roots_contains_directory_globs(self): + self.reloader.watch_dir(self.tempdir, '*.py') + paths = self.reloader.watched_roots([]) + self.assertIn(self.tempdir, paths) + + def test_watched_roots_contains_sys_path(self): + with extend_sys_path(str(self.tempdir)): + paths = self.reloader.watched_roots([]) + self.assertIn(self.tempdir, paths) + + def test_check_server_status(self): + self.assertTrue(self.reloader.check_server_status()) + + def test_check_server_status_raises_error(self): + with mock.patch.object(self.reloader.client, 'query') as mocked_query: + mocked_query.side_effect = Exception() + with self.assertRaises(autoreload.WatchmanUnavailable): + self.reloader.check_server_status() + + @mock.patch('pywatchman.client') + def test_check_availability(self, mocked_client): + mocked_client().capabilityCheck.side_effect = Exception() + with self.assertRaisesMessage(WatchmanUnavailable, 'Cannot connect to the watchman service'): + self.RELOADER_CLS.check_availability() + + @mock.patch('pywatchman.client') + def test_check_availability_lower_version(self, mocked_client): + mocked_client().capabilityCheck.return_value = {'version': '4.8.10'} + with self.assertRaisesMessage(WatchmanUnavailable, 'Watchman 4.9 or later is required.'): + self.RELOADER_CLS.check_availability() + + def test_pywatchman_not_available(self): + with mock.patch.object(autoreload, 'pywatchman') as mocked: + mocked.__bool__.return_value = False + with self.assertRaisesMessage(WatchmanUnavailable, 'pywatchman not installed.'): + self.RELOADER_CLS.check_availability() + + def test_update_watches_raises_exceptions(self): + class TestException(Exception): + pass + + with mock.patch.object(self.reloader, '_update_watches') as mocked_watches: + with mock.patch.object(self.reloader, 'check_server_status') as mocked_server_status: + mocked_watches.side_effect = TestException() + mocked_server_status.return_value = True + with self.assertRaises(TestException): + self.reloader.update_watches() + self.assertIsInstance(mocked_server_status.call_args[0][0], TestException) + + +class StatReloaderTests(ReloaderTests, IntegrationTests): + RELOADER_CLS = autoreload.StatReloader + + def setUp(self): + super().setUp() + # Shorten the sleep time to speed up tests. + self.reloader.SLEEP_TIME = 0.01 + + def test_snapshot_files_ignores_missing_files(self): + with mock.patch.object(self.reloader, 'watched_files', return_value=[self.nonexistant_file]): + self.assertEqual(dict(self.reloader.snapshot_files()), {}) + + def test_snapshot_files_updates(self): + with mock.patch.object(self.reloader, 'watched_files', return_value=[self.existing_file]): + snapshot1 = dict(self.reloader.snapshot_files()) + self.assertIn(self.existing_file, snapshot1) + self.increment_mtime(self.existing_file) + snapshot2 = dict(self.reloader.snapshot_files()) + self.assertNotEqual(snapshot1[self.existing_file], snapshot2[self.existing_file]) + + def test_does_not_fire_without_changes(self): + with mock.patch.object(self.reloader, 'watched_files', return_value=[self.existing_file]), \ + mock.patch.object(self.reloader, 'notify_file_changed') as notifier: + mtime = self.existing_file.stat().st_mtime + initial_snapshot = {self.existing_file: mtime} + second_snapshot = self.reloader.loop_files(initial_snapshot, time.time()) + self.assertEqual(second_snapshot, {}) + notifier.assert_not_called() + + def test_fires_when_created(self): + with mock.patch.object(self.reloader, 'watched_files', return_value=[self.nonexistant_file]), \ + mock.patch.object(self.reloader, 'notify_file_changed') as notifier: + self.nonexistant_file.touch() + mtime = self.nonexistant_file.stat().st_mtime + second_snapshot = self.reloader.loop_files({}, mtime - 1) + self.assertCountEqual(second_snapshot.keys(), [self.nonexistant_file]) + notifier.assert_called_once_with(self.nonexistant_file) + + def test_fires_with_changes(self): + with mock.patch.object(self.reloader, 'watched_files', return_value=[self.existing_file]), \ + mock.patch.object(self.reloader, 'notify_file_changed') as notifier: + initial_snapshot = {self.existing_file: 1} + second_snapshot = self.reloader.loop_files(initial_snapshot, time.time()) + notifier.assert_called_once_with(self.existing_file) + self.assertCountEqual(second_snapshot.keys(), [self.existing_file]) |
