summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
authorTom Forbes <tom@tomforb.es>2019-01-14 01:33:47 +0000
committerTim Graham <timograham@gmail.com>2019-01-13 20:33:47 -0500
commitc8720e7696ca41f3262d5369365cc1bd72a216ca (patch)
tree1fc858e37415196f06982d2af32f98e29044cde1 /tests
parenta02a6fd5805f9f0e613b9951249555876b8c4041 (diff)
Fixed #27685 -- Added watchman support to the autoreloader.
Removed support for pyinotify (refs #9722).
Diffstat (limited to 'tests')
-rw-r--r--tests/apps/tests.py3
-rw-r--r--tests/i18n/tests.py68
-rw-r--r--tests/requirements/py3.txt1
-rw-r--r--tests/utils_tests/locale/nl/LC_MESSAGES/django.mobin367 -> 0 bytes
-rw-r--r--tests/utils_tests/locale/nl/LC_MESSAGES/django.po17
-rw-r--r--tests/utils_tests/test_autoreload.py768
6 files changed, 647 insertions, 210 deletions
diff --git a/tests/apps/tests.py b/tests/apps/tests.py
index cd22a4d45c..566aec60c3 100644
--- a/tests/apps/tests.py
+++ b/tests/apps/tests.py
@@ -48,6 +48,9 @@ class AppsTests(SimpleTestCase):
self.assertIs(apps.ready, True)
# Non-master app registries are populated in __init__.
self.assertIs(Apps().ready, True)
+ # The condition is set when apps are ready
+ self.assertIs(apps.ready_event.is_set(), True)
+ self.assertIs(Apps().ready_event.is_set(), True)
def test_bad_app_config(self):
"""
diff --git a/tests/i18n/tests.py b/tests/i18n/tests.py
index 7b54089cf2..2377c8992e 100644
--- a/tests/i18n/tests.py
+++ b/tests/i18n/tests.py
@@ -7,9 +7,12 @@ import re
import tempfile
from contextlib import contextmanager
from importlib import import_module
+from pathlib import Path
from threading import local
from unittest import mock
+import _thread
+
from django import forms
from django.apps import AppConfig
from django.conf import settings
@@ -33,6 +36,9 @@ from django.utils.translation import (
npgettext, npgettext_lazy, pgettext, to_language, to_locale, trans_null,
trans_real, ugettext, ugettext_lazy, ungettext, ungettext_lazy,
)
+from django.utils.translation.reloader import (
+ translation_file_changed, watch_for_translation_changes,
+)
from .forms import CompanyForm, I18nForm, SelectDateForm
from .models import Company, TestModel
@@ -1790,3 +1796,65 @@ class NonDjangoLanguageTests(SimpleTestCase):
def test_plural_non_django_language(self):
self.assertEqual(get_language(), 'xyz')
self.assertEqual(ngettext('year', 'years', 2), 'years')
+
+
+@override_settings(USE_I18N=True)
+class WatchForTranslationChangesTests(SimpleTestCase):
+ @override_settings(USE_I18N=False)
+ def test_i18n_disabled(self):
+ mocked_sender = mock.MagicMock()
+ watch_for_translation_changes(mocked_sender)
+ mocked_sender.watch_dir.assert_not_called()
+
+ def test_i18n_enabled(self):
+ mocked_sender = mock.MagicMock()
+ watch_for_translation_changes(mocked_sender)
+ self.assertGreater(mocked_sender.watch_dir.call_count, 1)
+
+ def test_i18n_locale_paths(self):
+ mocked_sender = mock.MagicMock()
+ with tempfile.TemporaryDirectory() as app_dir:
+ with self.settings(LOCALE_PATHS=[app_dir]):
+ watch_for_translation_changes(mocked_sender)
+ mocked_sender.watch_dir.assert_any_call(Path(app_dir), '**/*.mo')
+
+ def test_i18n_app_dirs(self):
+ mocked_sender = mock.MagicMock()
+ with self.settings(INSTALLED_APPS=['tests.i18n.sampleproject']):
+ watch_for_translation_changes(mocked_sender)
+ project_dir = Path(__file__).parent / 'sampleproject' / 'locale'
+ mocked_sender.watch_dir.assert_any_call(project_dir, '**/*.mo')
+
+ def test_i18n_local_locale(self):
+ mocked_sender = mock.MagicMock()
+ watch_for_translation_changes(mocked_sender)
+ locale_dir = Path(__file__).parent / 'locale'
+ mocked_sender.watch_dir.assert_any_call(locale_dir, '**/*.mo')
+
+
+class TranslationFileChangedTests(SimpleTestCase):
+ def setUp(self):
+ self.gettext_translations = gettext_module._translations.copy()
+ self.trans_real_translations = trans_real._translations.copy()
+
+ def tearDown(self):
+ gettext._translations = self.gettext_translations
+ trans_real._translations = self.trans_real_translations
+
+ def test_ignores_non_mo_files(self):
+ gettext_module._translations = {'foo': 'bar'}
+ path = Path('test.py')
+ self.assertIsNone(translation_file_changed(None, path))
+ self.assertEqual(gettext_module._translations, {'foo': 'bar'})
+
+ def test_resets_cache_with_mo_files(self):
+ gettext_module._translations = {'foo': 'bar'}
+ trans_real._translations = {'foo': 'bar'}
+ trans_real._default = 1
+ trans_real._active = False
+ path = Path('test.mo')
+ self.assertIs(translation_file_changed(None, path), True)
+ self.assertEqual(gettext_module._translations, {})
+ self.assertEqual(trans_real._translations, {})
+ self.assertIsNone(trans_real._default)
+ self.assertIsInstance(trans_real._active, _thread._local)
diff --git a/tests/requirements/py3.txt b/tests/requirements/py3.txt
index cc84522ca0..3f0a01e164 100644
--- a/tests/requirements/py3.txt
+++ b/tests/requirements/py3.txt
@@ -9,6 +9,7 @@ Pillow != 5.4.0
pylibmc; sys.platform != 'win32'
python-memcached >= 1.59
pytz
+pywatchman; sys.platform != 'win32'
PyYAML
selenium
sqlparse
diff --git a/tests/utils_tests/locale/nl/LC_MESSAGES/django.mo b/tests/utils_tests/locale/nl/LC_MESSAGES/django.mo
deleted file mode 100644
index 3ead8f2a31..0000000000
--- a/tests/utils_tests/locale/nl/LC_MESSAGES/django.mo
+++ /dev/null
Binary files differ
diff --git a/tests/utils_tests/locale/nl/LC_MESSAGES/django.po b/tests/utils_tests/locale/nl/LC_MESSAGES/django.po
deleted file mode 100644
index 6633f12b39..0000000000
--- a/tests/utils_tests/locale/nl/LC_MESSAGES/django.po
+++ /dev/null
@@ -1,17 +0,0 @@
-# SOME DESCRIPTIVE TITLE.
-# Copyright (C) YEAR THE PACKAGE'S COPYRIGHT HOLDER
-# This file is distributed under the same license as the PACKAGE package.
-# FIRST AUTHOR <EMAIL@ADDRESS>, YEAR.
-#
-#, fuzzy
-msgid ""
-msgstr ""
-"Project-Id-Version: PACKAGE VERSION\n"
-"Report-Msgid-Bugs-To: \n"
-"POT-Creation-Date: 2007-09-15 19:15+0200\n"
-"PO-Revision-Date: 2010-05-12 12:41-0300\n"
-"Last-Translator: FULL NAME <EMAIL@ADDRESS>\n"
-"Language-Team: LANGUAGE <LL@li.org>\n"
-"MIME-Version: 1.0\n"
-"Content-Type: text/plain; charset=UTF-8\n"
-"Content-Transfer-Encoding: 8bit\n"
diff --git a/tests/utils_tests/test_autoreload.py b/tests/utils_tests/test_autoreload.py
index 486d62cd18..6aa272dd9a 100644
--- a/tests/utils_tests/test_autoreload.py
+++ b/tests/utils_tests/test_autoreload.py
@@ -1,257 +1,279 @@
-import gettext
+import contextlib
import os
+import py_compile
import shutil
+import sys
import tempfile
+import threading
+import time
+import zipfile
from importlib import import_module
-from unittest import mock
+from pathlib import Path
+from unittest import mock, skip
-import _thread
-
-from django import conf
-from django.contrib import admin
-from django.test import SimpleTestCase, override_settings
+from django.apps.registry import Apps
+from django.test import SimpleTestCase
from django.test.utils import extend_sys_path
from django.utils import autoreload
-from django.utils.translation import trans_real
-
-LOCALE_PATH = os.path.join(os.path.dirname(__file__), 'locale')
+from django.utils.autoreload import WatchmanUnavailable
-class TestFilenameGenerator(SimpleTestCase):
+class TestIterModulesAndFiles(SimpleTestCase):
+ def import_and_cleanup(self, name):
+ import_module(name)
+ self.addCleanup(lambda: sys.path_importer_cache.clear())
+ self.addCleanup(lambda: sys.modules.pop(name, None))
def clear_autoreload_caches(self):
- autoreload._cached_modules = set()
- autoreload._cached_filenames = []
+ autoreload.iter_modules_and_files.cache_clear()
def assertFileFound(self, filename):
+ # Some temp directories are symlinks. Python resolves these fully while
+ # importing.
+ resolved_filename = filename.resolve()
self.clear_autoreload_caches()
# Test uncached access
- self.assertIn(filename, autoreload.gen_filenames())
+ self.assertIn(resolved_filename, list(autoreload.iter_all_python_module_files()))
# Test cached access
- self.assertIn(filename, autoreload.gen_filenames())
+ self.assertIn(resolved_filename, list(autoreload.iter_all_python_module_files()))
+ self.assertEqual(autoreload.iter_modules_and_files.cache_info().hits, 1)
def assertFileNotFound(self, filename):
+ resolved_filename = filename.resolve()
self.clear_autoreload_caches()
# Test uncached access
- self.assertNotIn(filename, autoreload.gen_filenames())
- # Test cached access
- self.assertNotIn(filename, autoreload.gen_filenames())
-
- def assertFileFoundOnlyNew(self, filename):
- self.clear_autoreload_caches()
- # Test uncached access
- self.assertIn(filename, autoreload.gen_filenames(only_new=True))
+ self.assertNotIn(resolved_filename, list(autoreload.iter_all_python_module_files()))
# Test cached access
- self.assertNotIn(filename, autoreload.gen_filenames(only_new=True))
-
- def test_django_locales(self):
- """
- gen_filenames() yields the built-in Django locale files.
- """
- django_dir = os.path.join(os.path.dirname(conf.__file__), 'locale')
- django_mo = os.path.join(django_dir, 'nl', 'LC_MESSAGES', 'django.mo')
- self.assertFileFound(django_mo)
-
- @override_settings(LOCALE_PATHS=[LOCALE_PATH])
- def test_locale_paths_setting(self):
- """
- gen_filenames also yields from LOCALE_PATHS locales.
- """
- locale_paths_mo = os.path.join(LOCALE_PATH, 'nl', 'LC_MESSAGES', 'django.mo')
- self.assertFileFound(locale_paths_mo)
-
- @override_settings(INSTALLED_APPS=[])
- def test_project_root_locale(self):
- """
- gen_filenames() also yields from the current directory (project root).
- """
- old_cwd = os.getcwd()
- os.chdir(os.path.dirname(__file__))
- current_dir = os.path.join(os.path.dirname(__file__), 'locale')
- current_dir_mo = os.path.join(current_dir, 'nl', 'LC_MESSAGES', 'django.mo')
- try:
- self.assertFileFound(current_dir_mo)
- finally:
- os.chdir(old_cwd)
-
- @override_settings(INSTALLED_APPS=['django.contrib.admin'])
- def test_app_locales(self):
- """
- gen_filenames() also yields from locale dirs in installed apps.
- """
- admin_dir = os.path.join(os.path.dirname(admin.__file__), 'locale')
- admin_mo = os.path.join(admin_dir, 'nl', 'LC_MESSAGES', 'django.mo')
- self.assertFileFound(admin_mo)
+ self.assertNotIn(resolved_filename, list(autoreload.iter_all_python_module_files()))
+ self.assertEqual(autoreload.iter_modules_and_files.cache_info().hits, 1)
- @override_settings(USE_I18N=False)
- def test_no_i18n(self):
- """
- If i18n machinery is disabled, there is no need for watching the
- locale files.
- """
- django_dir = os.path.join(os.path.dirname(conf.__file__), 'locale')
- django_mo = os.path.join(django_dir, 'nl', 'LC_MESSAGES', 'django.mo')
- self.assertFileNotFound(django_mo)
-
- def test_paths_are_native_strings(self):
- for filename in autoreload.gen_filenames():
- self.assertIsInstance(filename, str)
-
- def test_only_new_files(self):
- """
- When calling a second time gen_filenames with only_new = True, only
- files from newly loaded modules should be given.
- """
+ def temporary_file(self, filename):
dirname = tempfile.mkdtemp()
- filename = os.path.join(dirname, 'test_only_new_module.py')
self.addCleanup(shutil.rmtree, dirname)
- with open(filename, 'w'):
- pass
-
- # Test uncached access
- self.clear_autoreload_caches()
- filenames = set(autoreload.gen_filenames(only_new=True))
- filenames_reference = set(autoreload.gen_filenames())
- self.assertEqual(filenames, filenames_reference)
-
- # Test cached access: no changes
- filenames = set(autoreload.gen_filenames(only_new=True))
- self.assertEqual(filenames, set())
+ return Path(dirname) / filename
- # Test cached access: add a module
- with extend_sys_path(dirname):
- import_module('test_only_new_module')
- filenames = set(autoreload.gen_filenames(only_new=True))
- self.assertEqual(filenames, {filename})
+ def test_paths_are_pathlib_instances(self):
+ for filename in autoreload.iter_all_python_module_files():
+ self.assertIsInstance(filename, Path)
- def test_deleted_removed(self):
+ def test_file_added(self):
"""
- When a file is deleted, gen_filenames() no longer returns it.
+ When a file is added, it's returned by iter_all_python_module_files().
"""
- dirname = tempfile.mkdtemp()
- filename = os.path.join(dirname, 'test_deleted_removed_module.py')
- self.addCleanup(shutil.rmtree, dirname)
- with open(filename, 'w'):
- pass
+ filename = self.temporary_file('test_deleted_removed_module.py')
+ filename.touch()
- with extend_sys_path(dirname):
- import_module('test_deleted_removed_module')
- self.assertFileFound(filename)
+ with extend_sys_path(str(filename.parent)):
+ self.import_and_cleanup('test_deleted_removed_module')
- os.unlink(filename)
- self.assertFileNotFound(filename)
+ self.assertFileFound(filename.absolute())
def test_check_errors(self):
"""
When a file containing an error is imported in a function wrapped by
check_errors(), gen_filenames() returns it.
"""
- dirname = tempfile.mkdtemp()
- filename = os.path.join(dirname, 'test_syntax_error.py')
- self.addCleanup(shutil.rmtree, dirname)
- with open(filename, 'w') as f:
- f.write("Ceci n'est pas du Python.")
+ filename = self.temporary_file('test_syntax_error.py')
+ filename.write_text("Ceci n'est pas du Python.")
- with extend_sys_path(dirname):
+ with extend_sys_path(str(filename.parent)):
with self.assertRaises(SyntaxError):
autoreload.check_errors(import_module)('test_syntax_error')
self.assertFileFound(filename)
- def test_check_errors_only_new(self):
- """
- When a file containing an error is imported in a function wrapped by
- check_errors(), gen_filenames(only_new=True) returns it.
- """
- dirname = tempfile.mkdtemp()
- filename = os.path.join(dirname, 'test_syntax_error.py')
- self.addCleanup(shutil.rmtree, dirname)
- with open(filename, 'w') as f:
- f.write("Ceci n'est pas du Python.")
-
- with extend_sys_path(dirname):
- with self.assertRaises(SyntaxError):
- autoreload.check_errors(import_module)('test_syntax_error')
- self.assertFileFoundOnlyNew(filename)
-
def test_check_errors_catches_all_exceptions(self):
"""
Since Python may raise arbitrary exceptions when importing code,
check_errors() must catch Exception, not just some subclasses.
"""
- dirname = tempfile.mkdtemp()
- filename = os.path.join(dirname, 'test_exception.py')
- self.addCleanup(shutil.rmtree, dirname)
- with open(filename, 'w') as f:
- f.write("raise Exception")
-
- with extend_sys_path(dirname):
+ filename = self.temporary_file('test_exception.py')
+ filename.write_text('raise Exception')
+ with extend_sys_path(str(filename.parent)):
with self.assertRaises(Exception):
autoreload.check_errors(import_module)('test_exception')
self.assertFileFound(filename)
-
-class CleanFilesTests(SimpleTestCase):
- TEST_MAP = {
- # description: (input_file_list, expected_returned_file_list)
- 'falsies': ([None, False], []),
- 'pycs': (['myfile.pyc'], ['myfile.py']),
- 'pyos': (['myfile.pyo'], ['myfile.py']),
- '$py.class': (['myclass$py.class'], ['myclass.py']),
- 'combined': (
- [None, 'file1.pyo', 'file2.pyc', 'myclass$py.class'],
- ['file1.py', 'file2.py', 'myclass.py'],
- )
- }
-
- def _run_tests(self, mock_files_exist=True):
- with mock.patch('django.utils.autoreload.os.path.exists', return_value=mock_files_exist):
- for description, values in self.TEST_MAP.items():
- filenames, expected_returned_filenames = values
- self.assertEqual(
- autoreload.clean_files(filenames),
- expected_returned_filenames if mock_files_exist else [],
- msg='{} failed for input file list: {}; returned file list: {}'.format(
- description, filenames, expected_returned_filenames
- ),
- )
-
- def test_files_exist(self):
+ def test_zip_reload(self):
"""
- If the file exists, any compiled files (pyc, pyo, $py.class) are
- transformed as their source files.
+ Modules imported from zipped files have their archive location included
+ in the result.
"""
- self._run_tests()
+ zip_file = self.temporary_file('zip_import.zip')
+ with zipfile.ZipFile(str(zip_file), 'w', zipfile.ZIP_DEFLATED) as zipf:
+ zipf.writestr('test_zipped_file.py', '')
- def test_files_do_not_exist(self):
- """
- If the files don't exist, they aren't in the returned file list.
- """
- self._run_tests(mock_files_exist=False)
+ with extend_sys_path(str(zip_file)):
+ self.import_and_cleanup('test_zipped_file')
+ self.assertFileFound(zip_file)
+ def test_bytecode_conversion_to_source(self):
+ """.pyc and .pyo files are included in the files list."""
+ filename = self.temporary_file('test_compiled.py')
+ filename.touch()
+ compiled_file = Path(py_compile.compile(str(filename), str(filename.with_suffix('.pyc'))))
+ filename.unlink()
+ with extend_sys_path(str(compiled_file.parent)):
+ self.import_and_cleanup('test_compiled')
+ self.assertFileFound(compiled_file)
-class ResetTranslationsTests(SimpleTestCase):
+class TestCommonRoots(SimpleTestCase):
+ def test_common_roots(self):
+ paths = (
+ Path('/first/second'),
+ Path('/first/second/third'),
+ Path('/first/'),
+ Path('/root/first/'),
+ )
+ results = autoreload.common_roots(paths)
+ self.assertCountEqual(results, [Path('/first/'), Path('/root/first/')])
+
+
+class TestSysPathDirectories(SimpleTestCase):
def setUp(self):
- self.gettext_translations = gettext._translations.copy()
- self.trans_real_translations = trans_real._translations.copy()
+ self._directory = tempfile.TemporaryDirectory()
+ self.directory = Path(self._directory.name).resolve().absolute()
+ self.file = self.directory / 'test'
+ self.file.touch()
def tearDown(self):
- gettext._translations = self.gettext_translations
- trans_real._translations = self.trans_real_translations
+ self._directory.cleanup()
+
+ def test_sys_paths_with_directories(self):
+ with extend_sys_path(str(self.file)):
+ paths = list(autoreload.sys_path_directories())
+ self.assertIn(self.file.parent, paths)
+
+ def test_sys_paths_non_existing(self):
+ nonexistant_file = Path(self.directory.name) / 'does_not_exist'
+ with extend_sys_path(str(nonexistant_file)):
+ paths = list(autoreload.sys_path_directories())
+ self.assertNotIn(nonexistant_file, paths)
+ self.assertNotIn(nonexistant_file.parent, paths)
+
+ def test_sys_paths_absolute(self):
+ paths = list(autoreload.sys_path_directories())
+ self.assertTrue(all(p.is_absolute() for p in paths))
+
+ def test_sys_paths_directories(self):
+ with extend_sys_path(str(self.directory)):
+ paths = list(autoreload.sys_path_directories())
+ self.assertIn(self.directory, paths)
+
+
+class GetReloaderTests(SimpleTestCase):
+ @mock.patch('django.utils.autoreload.WatchmanReloader')
+ def test_watchman_unavailable(self, mocked_watchman):
+ mocked_watchman.check_availability.side_effect = WatchmanUnavailable
+ self.assertIsInstance(autoreload.get_reloader(), autoreload.StatReloader)
+
+ @mock.patch.object(autoreload.WatchmanReloader, 'check_availability')
+ def test_watchman_available(self, mocked_available):
+ # If WatchmanUnavailable isn't raised, Watchman will be chosen.
+ mocked_available.return_value = None
+ result = autoreload.get_reloader()
+ self.assertIsInstance(result, autoreload.WatchmanReloader)
+
- def test_resets_gettext(self):
- gettext._translations = {'foo': 'bar'}
- autoreload.reset_translations()
- self.assertEqual(gettext._translations, {})
+class RunWithReloaderTests(SimpleTestCase):
+ @mock.patch.dict(os.environ, {autoreload.DJANGO_AUTORELOAD_ENV: 'true'})
+ @mock.patch('django.utils.autoreload.get_reloader')
+ def test_swallows_keyboard_interrupt(self, mocked_get_reloader):
+ mocked_get_reloader.side_effect = KeyboardInterrupt()
+ autoreload.run_with_reloader(lambda: None) # No exception
- def test_resets_trans_real(self):
- trans_real._translations = {'foo': 'bar'}
- trans_real._default = 1
- trans_real._active = False
- autoreload.reset_translations()
- self.assertEqual(trans_real._translations, {})
- self.assertIsNone(trans_real._default)
- self.assertIsInstance(trans_real._active, _thread._local)
+ @mock.patch.dict(os.environ, {autoreload.DJANGO_AUTORELOAD_ENV: 'false'})
+ @mock.patch('django.utils.autoreload.restart_with_reloader')
+ def test_calls_sys_exit(self, mocked_restart_reloader):
+ mocked_restart_reloader.return_value = 1
+ with self.assertRaises(SystemExit) as exc:
+ autoreload.run_with_reloader(lambda: None)
+ self.assertEqual(exc.exception.code, 1)
+
+ @mock.patch.dict(os.environ, {autoreload.DJANGO_AUTORELOAD_ENV: 'true'})
+ @mock.patch('django.utils.autoreload.start_django')
+ @mock.patch('django.utils.autoreload.get_reloader')
+ def test_calls_start_django(self, mocked_reloader, mocked_start_django):
+ mocked_reloader.return_value = mock.sentinel.RELOADER
+ autoreload.run_with_reloader(mock.sentinel.METHOD)
+ self.assertEqual(mocked_start_django.call_count, 1)
+ self.assertSequenceEqual(
+ mocked_start_django.call_args[0],
+ [mock.sentinel.RELOADER, mock.sentinel.METHOD]
+ )
+
+
+class StartDjangoTests(SimpleTestCase):
+ @mock.patch('django.utils.autoreload.StatReloader')
+ def test_watchman_becomes_unavailable(self, mocked_stat):
+ mocked_stat.should_stop.return_value = True
+ fake_reloader = mock.MagicMock()
+ fake_reloader.should_stop = False
+ fake_reloader.run.side_effect = autoreload.WatchmanUnavailable()
+
+ autoreload.start_django(fake_reloader, lambda: None)
+ self.assertEqual(mocked_stat.call_count, 1)
+
+ @mock.patch('django.utils.autoreload.ensure_echo_on')
+ def test_echo_on_called(self, mocked_echo):
+ fake_reloader = mock.MagicMock()
+ autoreload.start_django(fake_reloader, lambda: None)
+ self.assertEqual(mocked_echo.call_count, 1)
+
+ @mock.patch('django.utils.autoreload.check_errors')
+ def test_check_errors_called(self, mocked_check_errors):
+ fake_method = mock.MagicMock(return_value=None)
+ fake_reloader = mock.MagicMock()
+ autoreload.start_django(fake_reloader, fake_method)
+ self.assertCountEqual(mocked_check_errors.call_args[0], [fake_method])
+
+ @mock.patch('threading.Thread')
+ @mock.patch('django.utils.autoreload.check_errors')
+ def test_starts_thread_with_args(self, mocked_check_errors, mocked_thread):
+ fake_reloader = mock.MagicMock()
+ fake_main_func = mock.MagicMock()
+ fake_thread = mock.MagicMock()
+ mocked_check_errors.return_value = fake_main_func
+ mocked_thread.return_value = fake_thread
+ autoreload.start_django(fake_reloader, fake_main_func, 123, abc=123)
+ self.assertEqual(mocked_thread.call_count, 1)
+ self.assertEqual(
+ mocked_thread.call_args[1],
+ {'target': fake_main_func, 'args': (123,), 'kwargs': {'abc': 123}}
+ )
+ self.assertSequenceEqual(fake_thread.setDaemon.call_args[0], [True])
+ self.assertTrue(fake_thread.start.called)
+
+
+class TestCheckErrors(SimpleTestCase):
+ def test_mutates_error_files(self):
+ fake_method = mock.MagicMock(side_effect=RuntimeError())
+ wrapped = autoreload.check_errors(fake_method)
+ with mock.patch.object(autoreload, '_error_files') as mocked_error_files:
+ with self.assertRaises(RuntimeError):
+ wrapped()
+ self.assertEqual(mocked_error_files.append.call_count, 1)
+
+
+class TestRaiseLastException(SimpleTestCase):
+ @mock.patch('django.utils.autoreload._exception', None)
+ def test_no_exception(self):
+ # Should raise no exception if _exception is None
+ autoreload.raise_last_exception()
+
+ def test_raises_exception(self):
+ class MyException(Exception):
+ pass
+
+ # Create an exception
+ try:
+ raise MyException('Test Message')
+ except MyException:
+ exc_info = sys.exc_info()
+
+ with mock.patch('django.utils.autoreload._exception', exc_info):
+ with self.assertRaises(MyException, msg='Test Message'):
+ autoreload.raise_last_exception()
class RestartWithReloaderTests(SimpleTestCase):
@@ -286,3 +308,363 @@ class RestartWithReloaderTests(SimpleTestCase):
autoreload.restart_with_reloader()
self.assertEqual(mock_call.call_count, 1)
self.assertEqual(mock_call.call_args[0][0], [self.executable, '-Wall', '-m', 'django'] + argv[1:])
+
+
+class ReloaderTests(SimpleTestCase):
+ RELOADER_CLS = None
+
+ def setUp(self):
+ self._tempdir = tempfile.TemporaryDirectory()
+ self.tempdir = Path(self._tempdir.name).resolve().absolute()
+ self.existing_file = self.ensure_file(self.tempdir / 'test.py')
+ self.nonexistant_file = (self.tempdir / 'does_not_exist.py').absolute()
+ self.reloader = self.RELOADER_CLS()
+
+ def tearDown(self):
+ self._tempdir.cleanup()
+ self.reloader.stop()
+
+ def ensure_file(self, path):
+ path.parent.mkdir(exist_ok=True, parents=True)
+ path.touch()
+ # On Linux and Windows updating the mtime of a file using touch() will set a timestamp
+ # value that is in the past, as the time value for the last kernel tick is used rather
+ # than getting the correct absolute time.
+ # To make testing simpler set the mtime to be the observed time when this function is
+ # called.
+ self.set_mtime(path, time.time())
+ return path.absolute()
+
+ def set_mtime(self, fp, value):
+ os.utime(str(fp), (value, value))
+
+ def increment_mtime(self, fp, by=1):
+ current_time = time.time()
+ self.set_mtime(fp, current_time + by)
+
+ @contextlib.contextmanager
+ def tick_twice(self):
+ ticker = self.reloader.tick()
+ next(ticker)
+ yield
+ next(ticker)
+
+
+class IntegrationTests:
+ @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
+ @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
+ def test_file(self, mocked_modules, notify_mock):
+ self.reloader.watch_file(self.existing_file)
+ with self.tick_twice():
+ self.increment_mtime(self.existing_file)
+ self.assertEqual(notify_mock.call_count, 1)
+ self.assertCountEqual(notify_mock.call_args[0], [self.existing_file])
+
+ @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
+ @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
+ def test_nonexistant_file(self, mocked_modules, notify_mock):
+ self.reloader.watch_file(self.nonexistant_file)
+ with self.tick_twice():
+ self.ensure_file(self.nonexistant_file)
+ self.assertEqual(notify_mock.call_count, 1)
+ self.assertCountEqual(notify_mock.call_args[0], [self.nonexistant_file])
+
+ @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
+ @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
+ def test_nonexistant_file_in_non_existing_directory(self, mocked_modules, notify_mock):
+ non_existing_directory = self.tempdir / 'non_existing_dir'
+ nonexistant_file = non_existing_directory / 'test'
+ self.reloader.watch_file(nonexistant_file)
+ with self.tick_twice():
+ self.ensure_file(nonexistant_file)
+ self.assertEqual(notify_mock.call_count, 1)
+ self.assertCountEqual(notify_mock.call_args[0], [nonexistant_file])
+
+ @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
+ @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
+ def test_glob(self, mocked_modules, notify_mock):
+ non_py_file = self.ensure_file(self.tempdir / 'non_py_file')
+ self.reloader.watch_dir(self.tempdir, '*.py')
+ with self.tick_twice():
+ self.increment_mtime(non_py_file)
+ self.increment_mtime(self.existing_file)
+ self.assertEqual(notify_mock.call_count, 1)
+ self.assertCountEqual(notify_mock.call_args[0], [self.existing_file])
+
+ @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
+ @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
+ def test_glob_non_existing_directory(self, mocked_modules, notify_mock):
+ non_existing_directory = self.tempdir / 'does_not_exist'
+ nonexistant_file = non_existing_directory / 'test.py'
+ self.reloader.watch_dir(non_existing_directory, '*.py')
+ with self.tick_twice():
+ self.ensure_file(nonexistant_file)
+ self.set_mtime(nonexistant_file, time.time())
+ self.assertEqual(notify_mock.call_count, 1)
+ self.assertCountEqual(notify_mock.call_args[0], [nonexistant_file])
+
+ @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
+ @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
+ def test_multiple_globs(self, mocked_modules, notify_mock):
+ self.ensure_file(self.tempdir / 'x.test')
+ self.reloader.watch_dir(self.tempdir, '*.py')
+ self.reloader.watch_dir(self.tempdir, '*.test')
+ with self.tick_twice():
+ self.increment_mtime(self.existing_file)
+ self.assertEqual(notify_mock.call_count, 1)
+ self.assertCountEqual(notify_mock.call_args[0], [self.existing_file])
+
+ @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
+ @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
+ def test_overlapping_globs(self, mocked_modules, notify_mock):
+ self.reloader.watch_dir(self.tempdir, '*.py')
+ self.reloader.watch_dir(self.tempdir, '*.p*')
+ with self.tick_twice():
+ self.increment_mtime(self.existing_file)
+ self.assertEqual(notify_mock.call_count, 1)
+ self.assertCountEqual(notify_mock.call_args[0], [self.existing_file])
+
+ @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
+ @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
+ def test_glob_recursive(self, mocked_modules, notify_mock):
+ non_py_file = self.ensure_file(self.tempdir / 'dir' / 'non_py_file')
+ py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py')
+ self.reloader.watch_dir(self.tempdir, '**/*.py')
+ with self.tick_twice():
+ self.increment_mtime(non_py_file)
+ self.increment_mtime(py_file)
+ self.assertEqual(notify_mock.call_count, 1)
+ self.assertCountEqual(notify_mock.call_args[0], [py_file])
+
+ @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
+ @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
+ def test_multiple_recursive_globs(self, mocked_modules, notify_mock):
+ non_py_file = self.ensure_file(self.tempdir / 'dir' / 'test.txt')
+ py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py')
+ self.reloader.watch_dir(self.tempdir, '**/*.txt')
+ self.reloader.watch_dir(self.tempdir, '**/*.py')
+ with self.tick_twice():
+ self.increment_mtime(non_py_file)
+ self.increment_mtime(py_file)
+ self.assertEqual(notify_mock.call_count, 2)
+ self.assertCountEqual(notify_mock.call_args_list, [mock.call(py_file), mock.call(non_py_file)])
+
+ @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
+ @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
+ def test_nested_glob_recursive(self, mocked_modules, notify_mock):
+ inner_py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py')
+ self.reloader.watch_dir(self.tempdir, '**/*.py')
+ self.reloader.watch_dir(inner_py_file.parent, '**/*.py')
+ with self.tick_twice():
+ self.increment_mtime(inner_py_file)
+ self.assertEqual(notify_mock.call_count, 1)
+ self.assertCountEqual(notify_mock.call_args[0], [inner_py_file])
+
+ @mock.patch('django.utils.autoreload.BaseReloader.notify_file_changed')
+ @mock.patch('django.utils.autoreload.iter_all_python_module_files', return_value=frozenset())
+ def test_overlapping_glob_recursive(self, mocked_modules, notify_mock):
+ py_file = self.ensure_file(self.tempdir / 'dir' / 'file.py')
+ self.reloader.watch_dir(self.tempdir, '**/*.p*')
+ self.reloader.watch_dir(self.tempdir, '**/*.py*')
+ with self.tick_twice():
+ self.increment_mtime(py_file)
+ self.assertEqual(notify_mock.call_count, 1)
+ self.assertCountEqual(notify_mock.call_args[0], [py_file])
+
+
+class BaseReloaderTests(ReloaderTests):
+ RELOADER_CLS = autoreload.BaseReloader
+
+ def test_watch_without_absolute(self):
+ with self.assertRaisesMessage(ValueError, 'test.py must be absolute.'):
+ self.reloader.watch_file('test.py')
+
+ def test_watch_with_single_file(self):
+ self.reloader.watch_file(self.existing_file)
+ watched_files = list(self.reloader.watched_files())
+ self.assertIn(self.existing_file, watched_files)
+
+ def test_watch_with_glob(self):
+ self.reloader.watch_dir(self.tempdir, '*.py')
+ watched_files = list(self.reloader.watched_files())
+ self.assertIn(self.existing_file, watched_files)
+
+ def test_watch_files_with_recursive_glob(self):
+ inner_file = self.ensure_file(self.tempdir / 'test' / 'test.py')
+ self.reloader.watch_dir(self.tempdir, '**/*.py')
+ watched_files = list(self.reloader.watched_files())
+ self.assertIn(self.existing_file, watched_files)
+ self.assertIn(inner_file, watched_files)
+
+ def test_run_loop_catches_stopiteration(self):
+ def mocked_tick():
+ yield
+
+ with mock.patch.object(self.reloader, 'tick', side_effect=mocked_tick) as tick:
+ self.reloader.run_loop()
+ self.assertEqual(tick.call_count, 1)
+
+ def test_run_loop_stop_and_return(self):
+ def mocked_tick(*args):
+ yield
+ self.reloader.stop()
+ return # Raises StopIteration
+
+ with mock.patch.object(self.reloader, 'tick', side_effect=mocked_tick) as tick:
+ self.reloader.run_loop()
+
+ self.assertEqual(tick.call_count, 1)
+
+ def test_wait_for_apps_ready_checks_for_exception(self):
+ app_reg = Apps()
+ app_reg.ready_event.set()
+ # thread.is_alive() is False if it's not started.
+ dead_thread = threading.Thread()
+ self.assertFalse(self.reloader.wait_for_apps_ready(app_reg, dead_thread))
+
+ def test_wait_for_apps_ready_without_exception(self):
+ app_reg = Apps()
+ app_reg.ready_event.set()
+ thread = mock.MagicMock()
+ thread.is_alive.return_value = True
+ self.assertTrue(self.reloader.wait_for_apps_ready(app_reg, thread))
+
+
+def skip_unless_watchman_available():
+ try:
+ autoreload.WatchmanReloader.check_availability()
+ except WatchmanUnavailable as e:
+ return skip('Watchman unavailable: %s' % e)
+ return lambda func: func
+
+
+@skip_unless_watchman_available()
+class WatchmanReloaderTests(ReloaderTests, IntegrationTests):
+ RELOADER_CLS = autoreload.WatchmanReloader
+
+ def test_watch_glob_ignores_non_existing_directories_two_levels(self):
+ with mock.patch.object(self.reloader, '_subscribe') as mocked_subscribe:
+ self.reloader._watch_glob(self.tempdir / 'does_not_exist' / 'more', ['*'])
+ self.assertFalse(mocked_subscribe.called)
+
+ def test_watch_glob_uses_existing_parent_directories(self):
+ with mock.patch.object(self.reloader, '_subscribe') as mocked_subscribe:
+ self.reloader._watch_glob(self.tempdir / 'does_not_exist', ['*'])
+ self.assertSequenceEqual(
+ mocked_subscribe.call_args[0],
+ [
+ self.tempdir, 'glob-parent-does_not_exist:%s' % self.tempdir,
+ ['anyof', ['match', 'does_not_exist/*', 'wholename']]
+ ]
+ )
+
+ def test_watch_glob_multiple_patterns(self):
+ with mock.patch.object(self.reloader, '_subscribe') as mocked_subscribe:
+ self.reloader._watch_glob(self.tempdir, ['*', '*.py'])
+ self.assertSequenceEqual(
+ mocked_subscribe.call_args[0],
+ [
+ self.tempdir, 'glob:%s' % self.tempdir,
+ ['anyof', ['match', '*', 'wholename'], ['match', '*.py', 'wholename']]
+ ]
+ )
+
+ def test_watched_roots_contains_files(self):
+ paths = self.reloader.watched_roots([self.existing_file])
+ self.assertIn(self.existing_file.parent, paths)
+
+ def test_watched_roots_contains_directory_globs(self):
+ self.reloader.watch_dir(self.tempdir, '*.py')
+ paths = self.reloader.watched_roots([])
+ self.assertIn(self.tempdir, paths)
+
+ def test_watched_roots_contains_sys_path(self):
+ with extend_sys_path(str(self.tempdir)):
+ paths = self.reloader.watched_roots([])
+ self.assertIn(self.tempdir, paths)
+
+ def test_check_server_status(self):
+ self.assertTrue(self.reloader.check_server_status())
+
+ def test_check_server_status_raises_error(self):
+ with mock.patch.object(self.reloader.client, 'query') as mocked_query:
+ mocked_query.side_effect = Exception()
+ with self.assertRaises(autoreload.WatchmanUnavailable):
+ self.reloader.check_server_status()
+
+ @mock.patch('pywatchman.client')
+ def test_check_availability(self, mocked_client):
+ mocked_client().capabilityCheck.side_effect = Exception()
+ with self.assertRaisesMessage(WatchmanUnavailable, 'Cannot connect to the watchman service'):
+ self.RELOADER_CLS.check_availability()
+
+ @mock.patch('pywatchman.client')
+ def test_check_availability_lower_version(self, mocked_client):
+ mocked_client().capabilityCheck.return_value = {'version': '4.8.10'}
+ with self.assertRaisesMessage(WatchmanUnavailable, 'Watchman 4.9 or later is required.'):
+ self.RELOADER_CLS.check_availability()
+
+ def test_pywatchman_not_available(self):
+ with mock.patch.object(autoreload, 'pywatchman') as mocked:
+ mocked.__bool__.return_value = False
+ with self.assertRaisesMessage(WatchmanUnavailable, 'pywatchman not installed.'):
+ self.RELOADER_CLS.check_availability()
+
+ def test_update_watches_raises_exceptions(self):
+ class TestException(Exception):
+ pass
+
+ with mock.patch.object(self.reloader, '_update_watches') as mocked_watches:
+ with mock.patch.object(self.reloader, 'check_server_status') as mocked_server_status:
+ mocked_watches.side_effect = TestException()
+ mocked_server_status.return_value = True
+ with self.assertRaises(TestException):
+ self.reloader.update_watches()
+ self.assertIsInstance(mocked_server_status.call_args[0][0], TestException)
+
+
+class StatReloaderTests(ReloaderTests, IntegrationTests):
+ RELOADER_CLS = autoreload.StatReloader
+
+ def setUp(self):
+ super().setUp()
+ # Shorten the sleep time to speed up tests.
+ self.reloader.SLEEP_TIME = 0.01
+
+ def test_snapshot_files_ignores_missing_files(self):
+ with mock.patch.object(self.reloader, 'watched_files', return_value=[self.nonexistant_file]):
+ self.assertEqual(dict(self.reloader.snapshot_files()), {})
+
+ def test_snapshot_files_updates(self):
+ with mock.patch.object(self.reloader, 'watched_files', return_value=[self.existing_file]):
+ snapshot1 = dict(self.reloader.snapshot_files())
+ self.assertIn(self.existing_file, snapshot1)
+ self.increment_mtime(self.existing_file)
+ snapshot2 = dict(self.reloader.snapshot_files())
+ self.assertNotEqual(snapshot1[self.existing_file], snapshot2[self.existing_file])
+
+ def test_does_not_fire_without_changes(self):
+ with mock.patch.object(self.reloader, 'watched_files', return_value=[self.existing_file]), \
+ mock.patch.object(self.reloader, 'notify_file_changed') as notifier:
+ mtime = self.existing_file.stat().st_mtime
+ initial_snapshot = {self.existing_file: mtime}
+ second_snapshot = self.reloader.loop_files(initial_snapshot, time.time())
+ self.assertEqual(second_snapshot, {})
+ notifier.assert_not_called()
+
+ def test_fires_when_created(self):
+ with mock.patch.object(self.reloader, 'watched_files', return_value=[self.nonexistant_file]), \
+ mock.patch.object(self.reloader, 'notify_file_changed') as notifier:
+ self.nonexistant_file.touch()
+ mtime = self.nonexistant_file.stat().st_mtime
+ second_snapshot = self.reloader.loop_files({}, mtime - 1)
+ self.assertCountEqual(second_snapshot.keys(), [self.nonexistant_file])
+ notifier.assert_called_once_with(self.nonexistant_file)
+
+ def test_fires_with_changes(self):
+ with mock.patch.object(self.reloader, 'watched_files', return_value=[self.existing_file]), \
+ mock.patch.object(self.reloader, 'notify_file_changed') as notifier:
+ initial_snapshot = {self.existing_file: 1}
+ second_snapshot = self.reloader.loop_files(initial_snapshot, time.time())
+ notifier.assert_called_once_with(self.existing_file)
+ self.assertCountEqual(second_snapshot.keys(), [self.existing_file])