diff options
| author | Tom Forbes <tom@tomforb.es> | 2019-01-14 01:33:47 +0000 |
|---|---|---|
| committer | Tim Graham <timograham@gmail.com> | 2019-01-13 20:33:47 -0500 |
| commit | c8720e7696ca41f3262d5369365cc1bd72a216ca (patch) | |
| tree | 1fc858e37415196f06982d2af32f98e29044cde1 /tests | |
| parent | a02a6fd5805f9f0e613b9951249555876b8c4041 (diff) | |
Fixed #27685 -- Added watchman support to the autoreloader.
Removed support for pyinotify (refs #9722).
Diffstat (limited to 'tests')
| -rw-r--r-- | tests/apps/tests.py | 3 | ||||
| -rw-r--r-- | tests/i18n/tests.py | 68 | ||||
| -rw-r--r-- | tests/requirements/py3.txt | 1 | ||||
| -rw-r--r-- | tests/utils_tests/locale/nl/LC_MESSAGES/django.mo | bin | 367 -> 0 bytes | |||
| -rw-r--r-- | tests/utils_tests/locale/nl/LC_MESSAGES/django.po | 17 | ||||
| -rw-r--r-- | tests/utils_tests/test_autoreload.py | 768 |
6 files changed, 647 insertions, 210 deletions
diff --git a/tests/apps/tests.py b/tests/apps/tests.py index cd22a4d45c..566aec60c3 100644 --- a/tests/apps/tests.py +++ b/tests/apps/tests.py @@ -48,6 +48,9 @@ class AppsTests(SimpleTestCase): self.assertIs(apps.ready, True) # Non-master app registries are populated in __init__. self.assertIs(Apps().ready, True) + # The condition is set when apps are ready + self.assertIs(apps.ready_event.is_set(), True) + self.assertIs(Apps().ready_event.is_set(), True) def test_bad_app_config(self): """ diff --git a/tests/i18n/tests.py b/tests/i18n/tests.py index 7b54089cf2..2377c8992e 100644 --- a/tests/i18n/tests.py +++ b/tests/i18n/tests.py @@ -7,9 +7,12 @@ import re import tempfile from contextlib import contextmanager from importlib import import_module +from pathlib import Path from threading import local from unittest import mock +import _thread + from django import forms from django.apps import AppConfig from django.conf import settings @@ -33,6 +36,9 @@ from django.utils.translation import ( npgettext, npgettext_lazy, pgettext, to_language, to_locale, trans_null, trans_real, ugettext, ugettext_lazy, ungettext, ungettext_lazy, ) +from django.utils.translation.reloader import ( + translation_file_changed, watch_for_translation_changes, +) from .forms import CompanyForm, I18nForm, SelectDateForm from .models import Company, TestModel @@ -1790,3 +1796,65 @@ class NonDjangoLanguageTests(SimpleTestCase): def test_plural_non_django_language(self): self.assertEqual(get_language(), 'xyz') self.assertEqual(ngettext('year', 'years', 2), 'years') + + +@override_settings(USE_I18N=True) +class WatchForTranslationChangesTests(SimpleTestCase): + @override_settings(USE_I18N=False) + def test_i18n_disabled(self): + mocked_sender = mock.MagicMock() + watch_for_translation_changes(mocked_sender) + mocked_sender.watch_dir.assert_not_called() + + def test_i18n_enabled(self): + mocked_sender = mock.MagicMock() + watch_for_translation_changes(mocked_sender) + self.assertGreater(mocked_sender.watch_dir.call_count, 1) + + def test_i18n_locale_paths(self): + mocked_sender = mock.MagicMock() + with tempfile.TemporaryDirectory() as app_dir: + with self.settings(LOCALE_PATHS=[app_dir]): + watch_for_translation_changes(mocked_sender) + mocked_sender.watch_dir.assert_any_call(Path(app_dir), '**/*.mo') + + def test_i18n_app_dirs(self): + mocked_sender = mock.MagicMock() + with self.settings(INSTALLED_APPS=['tests.i18n.sampleproject']): + watch_for_translation_changes(mocked_sender) + project_dir = Path(__file__).parent / 'sampleproject' / 'locale' + mocked_sender.watch_dir.assert_any_call(project_dir, '**/*.mo') + + def test_i18n_local_locale(self): + mocked_sender = mock.MagicMock() + watch_for_translation_changes(mocked_sender) + locale_dir = Path(__file__).parent / 'locale' + mocked_sender.watch_dir.assert_any_call(locale_dir, '**/*.mo') + + +class TranslationFileChangedTests(SimpleTestCase): + def setUp(self): + self.gettext_translations = gettext_module._translations.copy() + self.trans_real_translations = trans_real._translations.copy() + + def tearDown(self): + gettext._translations = self.gettext_translations + trans_real._translations = self.trans_real_translations + + def test_ignores_non_mo_files(self): + gettext_module._translations = {'foo': 'bar'} + path = Path('test.py') + self.assertIsNone(translation_file_changed(None, path)) + self.assertEqual(gettext_module._translations, {'foo': 'bar'}) + + def test_resets_cache_with_mo_files(self): + gettext_module._translations = {'foo': 'bar'} + trans_real._translations = {'foo': 'bar'} + trans_real._default = 1 + trans_real._active = False + path = Path('test.mo') + self.assertIs(translation_file_changed(None, path), True) + self.assertEqual(gettext_module._translations, {}) + self.assertEqual(trans_real._translations, {}) + self.assertIsNone(trans_real._default) + self.assertIsInstance(trans_real._active, _thread._local) diff --git a/tests/requirements/py3.txt b/tests/requirements/py3.txt index cc84522ca0..3f0a01e164 100644 --- a/tests/requirements/py3.txt +++ b/tests/requirements/py3.txt @@ -9,6 +9,7 @@ Pillow != 5.4.0 pylibmc; sys.platform != 'win32' python-memcached >= 1.59 pytz +pywatchman; sys.platform != 'win32' PyYAML selenium sqlparse diff --git a/tests/utils_tests/locale/nl/LC_MESSAGES/django.mo b/tests/utils_tests/locale/nl/LC_MESSAGES/django.mo Binary files differdeleted file mode 100644 index 3ead8f2a31..0000000000 --- a/tests/utils_tests/locale/nl/LC_MESSAGES/django.mo +++ /dev/null diff --git a/tests/utils_tests/locale/nl/LC_MESSAGES/django.po b/tests/utils_tests/locale/nl/LC_MESSAGES/django.po deleted file mode 100644 index 6633f12b39..0000000000 --- a/tests/utils_tests/locale/nl/LC_MESSAGES/django.po +++ /dev/null @@ -1,17 +0,0 @@ -# SOME DESCRIPTIVE TITLE. -# Copyright (C) YEAR THE PACKAGE'S COPYRIGHT HOLDER -# This file is distributed under the same license as the PACKAGE package. -# FIRST AUTHOR <EMAIL@ADDRESS>, YEAR. -# -#, fuzzy -msgid "" -msgstr "" -"Project-Id-Version: PACKAGE VERSION\n" -"Report-Msgid-Bugs-To: \n" -"POT-Creation-Date: 2007-09-15 19:15+0200\n" -"PO-Revision-Date: 2010-05-12 12:41-0300\n" -"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n" -"Language-Team: LANGUAGE <LL@li.org>\n" -"MIME-Version: 1.0\n" -"Content-Type: text/plain; charset=UTF-8\n" -"Content-Transfer-Encoding: 8bit\n" diff --git a/tests/utils_tests/test_autoreload.py b/tests/utils_tests/test_autoreload.py index 486d62cd18..6aa272dd9a 100644 --- a/tests/utils_tests/test_autoreload.py +++ b/tests/utils_tests/test_autoreload.py @@ -1,257 +1,279 @@ -import gettext +import contextlib import os +import py_compile import shutil +import sys import tempfile +import threading +import time +import zipfile from importlib import import_module -from unittest import mock +from pathlib import Path +from unittest import mock, skip -import _thread - -from django import conf -from django.contrib import admin -from django.test import SimpleTestCase, override_settings +from django.apps.registry import Apps +from django.test import SimpleTestCase from django.test.utils import extend_sys_path from django.utils import autoreload -from django.utils.translation import trans_real - -LOCALE_PATH = os.path.join(os.path.dirname(__file__), 'locale') +from django.utils.autoreload import WatchmanUnavailable -class TestFilenameGenerator(SimpleTestCase): +class TestIterModulesAndFiles(SimpleTestCase): + def import_and_cleanup(self, name): + import_module(name) + self.addCleanup(lambda: sys.path_importer_cache.clear()) + self.addCleanup(lambda: sys.modules.pop(name, None)) def clear_autoreload_caches(self): - autoreload._cached_modules = set() - autoreload._cached_filenames = [] + autoreload.iter_modules_and_files.cache_clear() def assertFileFound(self, filename): + # Some temp directories are symlinks. Python resolves these fully while + # importing. + resolved_filename = filename.resolve() self.clear_autoreload_caches() # Test uncached access - self.assertIn(filename, autoreload.gen_filenames()) + self.assertIn(resolved_filename, list(autoreload.iter_all_python_module_files())) # Test cached access - self.assertIn(filename, autoreload.gen_filenames()) + self.assertIn(resolved_filename, list(autoreload.iter_all_python_module_files())) + self.assertEqual(autoreload.iter_modules_and_files.cache_info().hits, 1) def assertFileNotFound(self, filename): + resolved_filename = filename.resolve() self.clear_autoreload_caches() # Test uncached access - self.assertNotIn(filename, autoreload.gen_filenames()) - # Test cached access - self.assertNotIn(filename, autoreload.gen_filenames()) - - def assertFileFoundOnlyNew(self, filename): - self.clear_autoreload_caches() - # Test uncached access - self.assertIn(filename, autoreload.gen_filenames(only_new=True)) + self.assertNotIn(resolved_filename, list(autoreload.iter_all_python_module_files())) # Test cached access - self.assertNotIn(filename, autoreload.gen_filenames(only_new=True)) - - def test_django_locales(self): - """ - gen_filenames() yields the built-in Django locale files. - """ - django_dir = os.path.join(os.path.dirname(conf.__file__), 'locale') - django_mo = os.path.join(django_dir, 'nl', 'LC_MESSAGES', 'django.mo') - self.assertFileFound(django_mo) - - @override_settings(LOCALE_PATHS=[LOCALE_PATH]) - def test_locale_paths_setting(self): - """ - gen_filenames also yields from LOCALE_PATHS locales. - """ - locale_paths_mo = os.path.join(LOCALE_PATH, 'nl', 'LC_MESSAGES', 'django.mo') - self.assertFileFound(locale_paths_mo) - - @override_settings(INSTALLED_APPS=[]) - def test_project_root_locale(self): - """ - gen_filenames() also yields from the current directory (project root). - """ - old_cwd = os.getcwd() - os.chdir(os.path.dirname(__file__)) - current_dir = os.path.join(os.path.dirname(__file__), 'locale') - current_dir_mo = os.path.join(current_dir, 'nl', 'LC_MESSAGES', 'django.mo') - try: - self.assertFileFound(current_dir_mo) - finally: - os.chdir(old_cwd) - - @override_settings(INSTALLED_APPS=['django.contrib.admin']) - def test_app_locales(self): - """ - gen_filenames() also yields from locale dirs in installed apps. - """ - admin_dir = os.path.join(os.path.dirname(admin.__file__), 'locale') - admin_mo = os.path.join(admin_dir, 'nl', 'LC_MESSAGES', 'django.mo') - self.assertFileFound(admin_mo) + self.assertNotIn(resolved_filename, list(autoreload.iter_all_python_module_files())) + self.assertEqual(autoreload.iter_modules_and_files.cache_info().hits, 1) - @override_settings(USE_I18N=False) - def test_no_i18n(self): - """ - If i18n machinery is disabled, there is no need for watching the - locale files. - """ - django_dir = os.path.join(os.path.dirname(conf.__file__), 'locale') - django_mo = os.path.join(django_dir, 'nl', 'LC_MESSAGES', 'django.mo') - self.assertFileNotFound(django_mo) - - def test_paths_are_native_strings(self): - for filename in autoreload.gen_filenames(): - self.assertIsInstance(filename, str) - - def test_only_new_files(self): - """ - When calling a second time gen_filenames with only_new = True, only - files from newly loaded modules should be given. - """ + def temporary_file(self, filename): dirname = tempfile.mkdtemp() - filename = os.path.join(dirname, 'test_only_new_module.py') self.addCleanup(shutil.rmtree, dirname) - with open(filename, 'w'): - pass - - # Test uncached access - self.clear_autoreload_caches() - filenames = set(autoreload.gen_filenames(only_new=True)) - filenames_reference = set(autoreload.gen_filenames()) - self.assertEqual(filenames, filenames_reference) - - # Test cached access: no changes - filenames = set(autoreload.gen_filenames(only_new=True)) - self.assertEqual(filenames, set()) + return Path(dirname) / filename - # Test cached access: add a module - with extend_sys_path(dirname): - import_module('test_only_new_module') - filenames = set(autoreload.gen_filenames(only_new=True)) - self.assertEqual(filenames, {filename}) + def test_paths_are_pathlib_instances(self): + for filename in autoreload.iter_all_python_module_files(): + self.assertIsInstance(filename, Path) - def test_deleted_removed(self): + def test_file_added(self): """ - When a file is deleted, gen_filenames() no longer returns it. + When a file is added, it's returned by iter_all_python_module_files(). """ - dirname = tempfile.mkdtemp() - filename = os.path.join(dirname, 'test_deleted_removed_module.py') - self.addCleanup(shutil.rmtree, dirname) - with open(filename, 'w'): - pass + filename = self.temporary_file('test_deleted_removed_module.py') + filename.touch() - with extend_sys_path(dirname): - import_module('test_deleted_removed_module') - self.assertFileFound(filename) + with extend_sys_path(str(filename.parent)): + self.import_and_cleanup('test_deleted_removed_module') - os.unlink(filename) - self.assertFileNotFound(filename) + self.assertFileFound(filename.absolute()) def test_check_errors(self): """ When a file containing an error is imported in a function wrapped by check_errors(), gen_filenames() returns it. """ - dirname = tempfile.mkdtemp() - filename = os.path.join(dirname, 'test_syntax_error.py') - self.addCleanup(shutil.rmtree, dirname) - with open(filename, 'w') as f: - f.write("Ceci n'est pas du Python.") + filename = self.temporary_file('test_syntax_error.py') + filename.write_text("Ceci n'est pas du Python.") - with extend_sys_path(dirname): + with extend_sys_path(str(filename.parent)): with self.assertRaises(SyntaxError): autoreload.check_errors(import_module)('test_syntax_error') self.assertFileFound(filename) - def test_check_errors_only_new(self): - """ - When a file containing an error is imported in a function wrapped by - check_errors(), gen_filenames(only_new=True) returns it. - """ - dirname = tempfile.mkdtemp() - filename = os.path.join(dirname, 'test_syntax_error.py') - self.addCleanup(shutil.rmtree, dirname) - with open(filename, 'w') as f: - f.write("Ceci n'est pas du Python.") - - with extend_sys_path(dirname): - with self.assertRaises(SyntaxError): - autoreload.check_errors(import_module)('test_syntax_error') - self.assertFileFoundOnlyNew(filename) - def test_check_errors_catches_all_exceptions(self): """ Since Python may raise arbitrary exceptions when importing code, check_errors() must catch Exception, not just some subclasses. """ - dirname = tempfile.mkdtemp() - filename = os.path.join(dirname, 'test_exception.py') - self.addCleanup(shutil.rmtree, dirname) - with open(filename, 'w') as f: - f.write("raise Exception") - - with extend_sys_path(dirname): + filename = self.temporary_file('test_exception.py') + filename.write_text('raise Exception') + with extend_sys_path(str(filename.parent)): with self.assertRaises(Exception): autoreload.check_errors(import_module)('test_exception') self.assertFileFound(filename) - -class CleanFilesTests(SimpleTestCase): - TEST_MAP = { - # description: (input_file_list, expected_returned_file_list) - 'falsies': ([None, False], []), - 'pycs': (['myfile.pyc'], ['myfile.py']), - 'pyos': (['myfile.pyo'], ['myfile.py']), - '$py.class': (['myclass$py.class'], ['myclass.py']), - 'combined': ( - [None, 'file1.pyo', 'file2.pyc', 'myclass$py.class'], - ['file1.py', 'file2.py', 'myclass.py'], - ) - } - - def _run_tests(self, mock_files_exist=True): - with mock.patch('django.utils.autoreload.os.path.exists', return_value=mock_files_exist): - for description, values in self.TEST_MAP.items(): - filenames, expected_returned_filenames = values - self.assertEqual( - autoreload.clean_files(filenames), - expected_returned_filenames if mock_files_exist else [], - msg='{} failed for input file list: {}; returned file list: {}'.format( - description, filenames, expected_returned_filenames - ), - ) - - def test_files_exist(self): + def test_zip_reload(self): """ - If the file exists, any compiled files (pyc, pyo, $py.class) are - transformed as their source files. + Modules imported from zipped files have their archive location included + in the result. """ - self._run_tests() + zip_file = self.temporary_file('zip_import.zip') + with zipfile.ZipFile(str(zip_file), 'w', zipfile.ZIP_DEFLATED) as zipf: + zipf.writestr('test_zipped_file.py', '') - def test_files_do_not_exist(self): - """ - If the files don't exist, they aren't in the returned file list. - """ - self._run_tests(mock_files_exist=False) + with extend_sys_path(str(zip_file)): + self.import_and_cleanup('test_zipped_file') + self.assertFileFound(zip_file) + def test_bytecode_conversion_to_source(self): + """.pyc and .pyo files are included in the files list.""" + filename = self.temporary_file('test_compiled.py') + filename.touch() + compiled_file = Path(py_compile.compile(str(filename), str(filename.with_suffix('.pyc')))) + filename.unlink() + with extend_sys_path(str(compiled_file.parent)): + self.import_and_cleanup('test_compiled') + self.assertFileFound(compiled_file) -class ResetTranslationsTests(SimpleTestCase): +class TestCommonRoots(SimpleTestCase): + def test_common_roots(self): + paths = ( + Path('/first/second'), + Path('/first/second/third'), + Path('/first/'), + Path('/root/first/'), + ) + results = autoreload.common_roots(paths) + self.assertCountEqual(results, [Path('/first/'), Path('/root/first/')]) + + +class TestSysPathDirectories(SimpleTestCase): def setUp(self): - self.gettext_translations = gettext._translations.copy() - self.trans_real_translations = trans_real._translations.copy() + self._directory = tempfile.TemporaryDirectory() + self.directory = Path(self._directory.name).resolve().absolute() + self.file = self.directory / 'test' + self.file.touch() def tearDown(self): - gettext._translations = self.gettext_translations - trans_real._translations = self.trans_real_translations + self._directory.cleanup() + + def test_sys_paths_with_directories(self): + with extend_sys_path(str(self.file)): + paths = list(autoreload.sys_path_directories()) + self.assertIn(self.file.parent, paths) + + def test_sys_paths_non_existing(self): + nonexistant_file = Path(self.directory.name) / 'does_not_exist' + with extend_sys_path(str(nonexistant_file)): + paths = list(autoreload.sys_path_directories()) + self.assertNotIn(nonexistant_file, paths) + self.assertNotIn(nonexistant_file.parent, paths) + + def test_sys_paths_absolute(self): + paths = list(autoreload.sys_path_directories()) + self.assertTrue(all(p.is_absolute() for p in paths)) + + def test_sys_paths_directories(self): + with extend_sys_path(str(self.directory)): + paths = list(autoreload.sys_path_directories()) + self.assertIn(self.directory, paths) + + +class GetReloaderTests(SimpleTestCase): + @mock.patch('django.utils.autoreload.WatchmanReloader') + def test_watchman_unavailable(self, mocked_watchman): + mocked_watchman.check_availability.side_effect = WatchmanUnavailable + self.assertIsInstance(autoreload.get_reloader(), autoreload.StatReloader) + + @mock.patch.object(autoreload.WatchmanReloader, 'check_availability') + def test_watchman_available(self, mocked_available): + # If WatchmanUnavailable isn't raised, Watchman will be chosen. + mocked_available.return_value = None + result = autoreload.get_reloader() + self.assertIsInstance(result, autoreload.WatchmanReloader) + - def test_resets_gettext(self): - gettext._translations = {'foo': 'bar'} - autoreload.reset_translations() - self.assertEqual(gettext._translations, {}) +class RunWithReloaderTests(SimpleTestCase): + @mock.patch.dict(os.environ, {autoreload.DJANGO_AUTORELOAD_ENV: 'true'}) + @mock.patch('django.utils.autoreload.get_reloader') + def test_swallows_keyboard_interrupt(self, mocked_get_reloader): + mocked_get_reloader.side_effect = KeyboardInterrupt() + autoreload.run_with_reloader(lambda: None) # No exception - def test_resets_trans_real(self): - trans_real._translations = {'foo': 'bar'} - trans_real._default = 1 - trans_real._active = False - autoreload.reset_translations() - self.assertEqual(trans_real._translations, {}) - self.assertIsNone(trans_real._default) - self.assertIsInstance(trans_real._active, _thread._local) + @mock.patch.dict(os.environ, {autoreload.DJANGO_AUTORELOAD_ENV: 'false'}) + @mock.patch('django.utils.autoreload.restart_with_reloader') + def test_calls_sys_exit(self, mocked_restart_reloader): + mocked_restart_reloader.return_value = 1 + with self.assertRaises(SystemExit) as exc: + autoreload.run_with_reloader(lambda: None) + self.assertEqual(exc.exception.code, 1) + + @mock.patch.dict(os.environ, {autoreload.DJANGO_AUTORELOAD_ENV: 'true'}) + @mock.patch('django.utils.autoreload.start_django') + @mock.patch('django.utils.autoreload.get_reloader') + def test_calls_start_django(self, mocked_reloader, mocked_start_django): + mocked_reloader.return_value = mock.sentinel.RELOADER + autoreload.run_with_reloader(mock.sentinel.METHOD) + self.assertEqual(mocked_start_django.call_count, 1) + self.assertSequenceEqual( + mocked_start_django.call_args[0], + [mock.sentinel.RELOADER, mock.sentinel.METHOD] + ) + + +class StartDjangoTests(SimpleTestCase): + @mock.patch('django.utils.autoreload.StatReloader') + def test_watchman_becomes_unavailable(self, mocked_stat): + mocked_stat.should_stop.return_value = True + fake_reloader = mock.MagicMock() + fake_reloader.should_stop = False + fake_reloader.run.side_effect = autoreload.WatchmanUnavailable() + + autoreload.start_django(fake_reloader, lambda: None) + self.assertEqual(mocked_stat.call_count, 1) + + @mock.patch('django.utils.autoreload.ensure_echo_on') + def test_echo_on_called(self, mocked_echo): + fake_reloader = mock.MagicMock() + autoreload.start_django(fake_reloader, lambda: None) + self.assertEqual(mocked_echo.call_count, 1) + + @mock.patch('django.utils.autoreload.check_errors') + def test_check_errors_called(self, mocked_check_errors): + fake_method = mock.MagicMock(return_value=None) + fake_reloader = mock.MagicMock() + autoreload.start_django(fake_reloader, fake_method) + self.assertCountEqual(mocked_check_errors.call_args[0], [fake_method]) + + @mock.patch('threading.Thread') + @mock.patch('django.utils.autoreload.check_errors') + def test_starts_thread_with_args(self, mocked_check_errors, mocked_thread): + fake_reloader = mock.MagicMock() + fake_main_func = mock.MagicMock() + fake_thread = mock.MagicMock() + mocked_check_errors.return_value = fake_main_func + mocked_thread.return_value = fake_thread + autoreload.start_django(fake_reloader, fake_main_func, 123, abc=123) + self.assertEqual(mocked_thread.call_count, 1) + self.assertEqual( + mocked_thread.call_args[1], + {'target': fake_main_func, 'args': (123,), 'kwargs': {'abc': 123}} + ) + self.assertSequenceEqual(fake_thread.setDaemon.call_args[0], [True]) + self.assertTrue(fake_thread.start.called) + + +class TestCheckErrors(SimpleTestCase): + def test_mutates_error_files(self): + fake_method = mock.MagicMock(side_effect=RuntimeError()) + wrapped = autoreload.check_errors(fake_method) + with mock.patch.object(autoreload, '_error_files') as mocked_error_files: + with self.assertRaises(RuntimeError): + wrapped() + self.assertEqual(mocked_error_files.append.call_count, 1) + + +class TestRaiseLastException(SimpleTestCase): + @mock.patch('django.utils.autoreload._exception', None) + def test_no_exception(self): + # Should raise no exception if _exception is None + autoreload.raise_last_exception() + + def test_raises_exception(self): + class MyException(Exception): + pass + + # Create an exception + try: + raise MyException('Test Message') + except MyException: + exc_info = sys.exc_info() + + with mock.patch('django.utils.autoreload._exception', exc_info): + with self.assertRaises(MyException, msg='Test Message'): + autoreload.raise_last_exception() class RestartWithReloaderTests(SimpleTestCase): @@ -286,3 +308,363 @@ class RestartWithReloaderTests(SimpleTestCase): autoreload.restart_with_reloader() self.assertEqual(mock_call.call_count, 1) self.assertEqual(mock_call.call_args[0][0], [self.executable, '-Wall', '-m', 'django'] + argv[1:]) + + +class ReloaderTests(SimpleTestCase): + RELOADER_CLS = None + + def setUp(self): + self._tempdir = tempfile.TemporaryDirectory() + self.tempdir = Path(self._tempdir.name).resolve().absolute() + self.existing_file = self.ensure_file(self.tempdir / 'test.py') + self.nonexistant_file = (self.tempdir / 'does_not_exist.py').absolute() + self.reloader = self.RELOADER_CLS() + + def tearDown(self): + self._tempdir.cleanup() + self.reloader.stop() + + def ensure_file(self, path): + path.parent.mkdir(exist_ok=True, parents=True) + path.touch() + # On Linux and Windows updating the mtime of a file using touch() will set a timestamp + # value that is in the past, as the time value for the last kernel tick is used rather + # than getting the correct absolute time. + # To make testing simpler set the mtime to be the observed time when this function is + # called. + self.set_mtime(path, time.time()) + return path.absolute() + + def set_mtime(self, fp, value): + os.utime(str(fp), (value, value)) + + def increment_mtime(self, fp, by=1): + current_time = time.time() + self.set_mtime(fp, current_time + by) + + @contextlib.contextmanager + def tick_twice(self): + ticker = self.reloader.tick() + next(ticker) + yield + next(ticker) + + +class IntegrationTests: + @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed') + @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset()) + def test_file(self, mocked_modules, notify_mock): + self.reloader.watch_file(self.existing_file) + with self.tick_twice(): + self.increment_mtime(self.existing_file) + self.assertEqual(notify_mock.call_count, 1) + self.assertCountEqual(notify_mock.call_args[0], [self.existing_file]) + + @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed') + @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset()) + def test_nonexistant_file(self, mocked_modules, notify_mock): + self.reloader.watch_file(self.nonexistant_file) + with self.tick_twice(): + self.ensure_file(self.nonexistant_file) + self.assertEqual(notify_mock.call_count, 1) + self.assertCountEqual(notify_mock.call_args[0], [self.nonexistant_file]) + + @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed') + @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset()) + def test_nonexistant_file_in_non_existing_directory(self, mocked_modules, notify_mock): + non_existing_directory = self.tempdir / 'non_existing_dir' + nonexistant_file = non_existing_directory / 'test' + self.reloader.watch_file(nonexistant_file) + with self.tick_twice(): + self.ensure_file(nonexistant_file) + self.assertEqual(notify_mock.call_count, 1) + self.assertCountEqual(notify_mock.call_args[0], [nonexistant_file]) + + @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed') + @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset()) + def test_glob(self, mocked_modules, notify_mock): + non_py_file = self.ensure_file(self.tempdir / 'non_py_file') + self.reloader.watch_dir(self.tempdir, '*.py') + with self.tick_twice(): + self.increment_mtime(non_py_file) + self.increment_mtime(self.existing_file) + self.assertEqual(notify_mock.call_count, 1) + self.assertCountEqual(notify_mock.call_args[0], [self.existing_file]) + + @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed') + @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset()) + def test_glob_non_existing_directory(self, mocked_modules, notify_mock): + non_existing_directory = self.tempdir / 'does_not_exist' + nonexistant_file = non_existing_directory / 'test.py' + self.reloader.watch_dir(non_existing_directory, '*.py') + with self.tick_twice(): + self.ensure_file(nonexistant_file) + self.set_mtime(nonexistant_file, time.time()) + self.assertEqual(notify_mock.call_count, 1) + self.assertCountEqual(notify_mock.call_args[0], [nonexistant_file]) + + @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed') + @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset()) + def test_multiple_globs(self, mocked_modules, notify_mock): + self.ensure_file(self.tempdir / 'x.test') + self.reloader.watch_dir(self.tempdir, '*.py') + self.reloader.watch_dir(self.tempdir, '*.test') + with self.tick_twice(): + self.increment_mtime(self.existing_file) + self.assertEqual(notify_mock.call_count, 1) + self.assertCountEqual(notify_mock.call_args[0], [self.existing_file]) + + @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed') + @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset()) + def test_overlapping_globs(self, mocked_modules, notify_mock): + self.reloader.watch_dir(self.tempdir, '*.py') + self.reloader.watch_dir(self.tempdir, '*.p*') + with self.tick_twice(): + self.increment_mtime(self.existing_file) + self.assertEqual(notify_mock.call_count, 1) + self.assertCountEqual(notify_mock.call_args[0], [self.existing_file]) + + @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed') + @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset()) + def test_glob_recursive(self, mocked_modules, notify_mock): + non_py_file = self.ensure_file(self.tempdir / 'dir' / 'non_py_file') + py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py') + self.reloader.watch_dir(self.tempdir, '**/*.py') + with self.tick_twice(): + self.increment_mtime(non_py_file) + self.increment_mtime(py_file) + self.assertEqual(notify_mock.call_count, 1) + self.assertCountEqual(notify_mock.call_args[0], [py_file]) + + @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed') + @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset()) + def test_multiple_recursive_globs(self, mocked_modules, notify_mock): + non_py_file = self.ensure_file(self.tempdir / 'dir' / 'test.txt') + py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py') + self.reloader.watch_dir(self.tempdir, '**/*.txt') + self.reloader.watch_dir(self.tempdir, '**/*.py') + with self.tick_twice(): + self.increment_mtime(non_py_file) + self.increment_mtime(py_file) + self.assertEqual(notify_mock.call_count, 2) + self.assertCountEqual(notify_mock.call_args_list, [mock.call(py_file), mock.call(non_py_file)]) + + @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed') + @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset()) + def test_nested_glob_recursive(self, mocked_modules, notify_mock): + inner_py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py') + self.reloader.watch_dir(self.tempdir, '**/*.py') + self.reloader.watch_dir(inner_py_file.parent, '**/*.py') + with self.tick_twice(): + self.increment_mtime(inner_py_file) + self.assertEqual(notify_mock.call_count, 1) + self.assertCountEqual(notify_mock.call_args[0], [inner_py_file]) + + @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed') + @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset()) + def test_overlapping_glob_recursive(self, mocked_modules, notify_mock): + py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py') + self.reloader.watch_dir(self.tempdir, '**/*.p*') + self.reloader.watch_dir(self.tempdir, '**/*.py*') + with self.tick_twice(): + self.increment_mtime(py_file) + self.assertEqual(notify_mock.call_count, 1) + self.assertCountEqual(notify_mock.call_args[0], [py_file]) + + +class BaseReloaderTests(ReloaderTests): + RELOADER_CLS = autoreload.BaseReloader + + def test_watch_without_absolute(self): + with self.assertRaisesMessage(ValueError, 'test.py must be absolute.'): + self.reloader.watch_file('test.py') + + def test_watch_with_single_file(self): + self.reloader.watch_file(self.existing_file) + watched_files = list(self.reloader.watched_files()) + self.assertIn(self.existing_file, watched_files) + + def test_watch_with_glob(self): + self.reloader.watch_dir(self.tempdir, '*.py') + watched_files = list(self.reloader.watched_files()) + self.assertIn(self.existing_file, watched_files) + + def test_watch_files_with_recursive_glob(self): + inner_file = self.ensure_file(self.tempdir / 'test' / 'test.py') + self.reloader.watch_dir(self.tempdir, '**/*.py') + watched_files = list(self.reloader.watched_files()) + self.assertIn(self.existing_file, watched_files) + self.assertIn(inner_file, watched_files) + + def test_run_loop_catches_stopiteration(self): + def mocked_tick(): + yield + + with mock.patch.object(self.reloader, 'tick', side_effect=mocked_tick) as tick: + self.reloader.run_loop() + self.assertEqual(tick.call_count, 1) + + def test_run_loop_stop_and_return(self): + def mocked_tick(*args): + yield + self.reloader.stop() + return # Raises StopIteration + + with mock.patch.object(self.reloader, 'tick', side_effect=mocked_tick) as tick: + self.reloader.run_loop() + + self.assertEqual(tick.call_count, 1) + + def test_wait_for_apps_ready_checks_for_exception(self): + app_reg = Apps() + app_reg.ready_event.set() + # thread.is_alive() is False if it's not started. + dead_thread = threading.Thread() + self.assertFalse(self.reloader.wait_for_apps_ready(app_reg, dead_thread)) + + def test_wait_for_apps_ready_without_exception(self): + app_reg = Apps() + app_reg.ready_event.set() + thread = mock.MagicMock() + thread.is_alive.return_value = True + self.assertTrue(self.reloader.wait_for_apps_ready(app_reg, thread)) + + +def skip_unless_watchman_available(): + try: + autoreload.WatchmanReloader.check_availability() + except WatchmanUnavailable as e: + return skip('Watchman unavailable: %s' % e) + return lambda func: func + + +@skip_unless_watchman_available() +class WatchmanReloaderTests(ReloaderTests, IntegrationTests): + RELOADER_CLS = autoreload.WatchmanReloader + + def test_watch_glob_ignores_non_existing_directories_two_levels(self): + with mock.patch.object(self.reloader, '_subscribe') as mocked_subscribe: + self.reloader._watch_glob(self.tempdir / 'does_not_exist' / 'more', ['*']) + self.assertFalse(mocked_subscribe.called) + + def test_watch_glob_uses_existing_parent_directories(self): + with mock.patch.object(self.reloader, '_subscribe') as mocked_subscribe: + self.reloader._watch_glob(self.tempdir / 'does_not_exist', ['*']) + self.assertSequenceEqual( + mocked_subscribe.call_args[0], + [ + self.tempdir, 'glob-parent-does_not_exist:%s' % self.tempdir, + ['anyof', ['match', 'does_not_exist/*', 'wholename']] + ] + ) + + def test_watch_glob_multiple_patterns(self): + with mock.patch.object(self.reloader, '_subscribe') as mocked_subscribe: + self.reloader._watch_glob(self.tempdir, ['*', '*.py']) + self.assertSequenceEqual( + mocked_subscribe.call_args[0], + [ + self.tempdir, 'glob:%s' % self.tempdir, + ['anyof', ['match', '*', 'wholename'], ['match', '*.py', 'wholename']] + ] + ) + + def test_watched_roots_contains_files(self): + paths = self.reloader.watched_roots([self.existing_file]) + self.assertIn(self.existing_file.parent, paths) + + def test_watched_roots_contains_directory_globs(self): + self.reloader.watch_dir(self.tempdir, '*.py') + paths = self.reloader.watched_roots([]) + self.assertIn(self.tempdir, paths) + + def test_watched_roots_contains_sys_path(self): + with extend_sys_path(str(self.tempdir)): + paths = self.reloader.watched_roots([]) + self.assertIn(self.tempdir, paths) + + def test_check_server_status(self): + self.assertTrue(self.reloader.check_server_status()) + + def test_check_server_status_raises_error(self): + with mock.patch.object(self.reloader.client, 'query') as mocked_query: + mocked_query.side_effect = Exception() + with self.assertRaises(autoreload.WatchmanUnavailable): + self.reloader.check_server_status() + + @mock.patch('pywatchman.client') + def test_check_availability(self, mocked_client): + mocked_client().capabilityCheck.side_effect = Exception() + with self.assertRaisesMessage(WatchmanUnavailable, 'Cannot connect to the watchman service'): + self.RELOADER_CLS.check_availability() + + @mock.patch('pywatchman.client') + def test_check_availability_lower_version(self, mocked_client): + mocked_client().capabilityCheck.return_value = {'version': '4.8.10'} + with self.assertRaisesMessage(WatchmanUnavailable, 'Watchman 4.9 or later is required.'): + self.RELOADER_CLS.check_availability() + + def test_pywatchman_not_available(self): + with mock.patch.object(autoreload, 'pywatchman') as mocked: + mocked.__bool__.return_value = False + with self.assertRaisesMessage(WatchmanUnavailable, 'pywatchman not installed.'): + self.RELOADER_CLS.check_availability() + + def test_update_watches_raises_exceptions(self): + class TestException(Exception): + pass + + with mock.patch.object(self.reloader, '_update_watches') as mocked_watches: + with mock.patch.object(self.reloader, 'check_server_status') as mocked_server_status: + mocked_watches.side_effect = TestException() + mocked_server_status.return_value = True + with self.assertRaises(TestException): + self.reloader.update_watches() + self.assertIsInstance(mocked_server_status.call_args[0][0], TestException) + + +class StatReloaderTests(ReloaderTests, IntegrationTests): + RELOADER_CLS = autoreload.StatReloader + + def setUp(self): + super().setUp() + # Shorten the sleep time to speed up tests. + self.reloader.SLEEP_TIME = 0.01 + + def test_snapshot_files_ignores_missing_files(self): + with mock.patch.object(self.reloader, 'watched_files', return_value=[self.nonexistant_file]): + self.assertEqual(dict(self.reloader.snapshot_files()), {}) + + def test_snapshot_files_updates(self): + with mock.patch.object(self.reloader, 'watched_files', return_value=[self.existing_file]): + snapshot1 = dict(self.reloader.snapshot_files()) + self.assertIn(self.existing_file, snapshot1) + self.increment_mtime(self.existing_file) + snapshot2 = dict(self.reloader.snapshot_files()) + self.assertNotEqual(snapshot1[self.existing_file], snapshot2[self.existing_file]) + + def test_does_not_fire_without_changes(self): + with mock.patch.object(self.reloader, 'watched_files', return_value=[self.existing_file]), \ + mock.patch.object(self.reloader, 'notify_file_changed') as notifier: + mtime = self.existing_file.stat().st_mtime + initial_snapshot = {self.existing_file: mtime} + second_snapshot = self.reloader.loop_files(initial_snapshot, time.time()) + self.assertEqual(second_snapshot, {}) + notifier.assert_not_called() + + def test_fires_when_created(self): + with mock.patch.object(self.reloader, 'watched_files', return_value=[self.nonexistant_file]), \ + mock.patch.object(self.reloader, 'notify_file_changed') as notifier: + self.nonexistant_file.touch() + mtime = self.nonexistant_file.stat().st_mtime + second_snapshot = self.reloader.loop_files({}, mtime - 1) + self.assertCountEqual(second_snapshot.keys(), [self.nonexistant_file]) + notifier.assert_called_once_with(self.nonexistant_file) + + def test_fires_with_changes(self): + with mock.patch.object(self.reloader, 'watched_files', return_value=[self.existing_file]), \ + mock.patch.object(self.reloader, 'notify_file_changed') as notifier: + initial_snapshot = {self.existing_file: 1} + second_snapshot = self.reloader.loop_files(initial_snapshot, time.time()) + notifier.assert_called_once_with(self.existing_file) + self.assertCountEqual(second_snapshot.keys(), [self.existing_file]) |
