summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--django/core/cache/backends/filebased.py10
-rw-r--r--django/core/files/storage/filesystem.py13
-rw-r--r--django/utils/_os.py58
-rw-r--r--docs/releases/4.2.29.txt15
-rw-r--r--tests/utils_tests/test_os_utils.py166
5 files changed, 245 insertions, 17 deletions
diff --git a/django/core/cache/backends/filebased.py b/django/core/cache/backends/filebased.py
index 215fefbcc0..32a231d125 100644
--- a/django/core/cache/backends/filebased.py
+++ b/django/core/cache/backends/filebased.py
@@ -10,6 +10,7 @@ import zlib
from django.core.cache.backends.base import DEFAULT_TIMEOUT, BaseCache
from django.core.files import locks
from django.core.files.move import file_move_safe
+from django.utils._os import safe_makedirs
from django.utils.crypto import md5
@@ -114,13 +115,10 @@ class FileBasedCache(BaseCache):
self._delete(fname)
def _createdir(self):
- # Set the umask because os.makedirs() doesn't apply the "mode" argument
+ # Workaround because os.makedirs() doesn't apply the "mode" argument
# to intermediate-level directories.
- old_umask = os.umask(0o077)
- try:
- os.makedirs(self._dir, 0o700, exist_ok=True)
- finally:
- os.umask(old_umask)
+ # https://github.com/python/cpython/issues/86533
+ safe_makedirs(self._dir, mode=0o700, exist_ok=True)
def _key_to_file(self, key, version=None):
"""
diff --git a/django/core/files/storage/filesystem.py b/django/core/files/storage/filesystem.py
index 85fc4eff9f..90a7adde8e 100644
--- a/django/core/files/storage/filesystem.py
+++ b/django/core/files/storage/filesystem.py
@@ -6,7 +6,7 @@ from django.conf import settings
from django.core.files import File, locks
from django.core.files.move import file_move_safe
from django.core.signals import setting_changed
-from django.utils._os import safe_join
+from django.utils._os import safe_join, safe_makedirs
from django.utils.deconstruct import deconstructible
from django.utils.encoding import filepath_to_uri
from django.utils.functional import cached_property
@@ -74,15 +74,10 @@ class FileSystemStorage(Storage, StorageSettingsMixin):
directory = os.path.dirname(full_path)
try:
if self.directory_permissions_mode is not None:
- # Set the umask because os.makedirs() doesn't apply the "mode"
+ # Workaround because os.makedirs() doesn't apply the "mode"
# argument to intermediate-level directories.
- old_umask = os.umask(0o777 & ~self.directory_permissions_mode)
- try:
- os.makedirs(
- directory, self.directory_permissions_mode, exist_ok=True
- )
- finally:
- os.umask(old_umask)
+ # https://github.com/python/cpython/issues/86533
+ safe_makedirs(directory, self.directory_permissions_mode, exist_ok=True)
else:
os.makedirs(directory, exist_ok=True)
except FileExistsError:
diff --git a/django/utils/_os.py b/django/utils/_os.py
index f85fe09248..8054fadf4c 100644
--- a/django/utils/_os.py
+++ b/django/utils/_os.py
@@ -1,11 +1,67 @@
import os
import tempfile
-from os.path import abspath, dirname, join, normcase, sep
+from os.path import abspath, curdir, dirname, join, normcase, sep
from pathlib import Path
from django.core.exceptions import SuspiciousFileOperation
+# Copied verbatim (minus `os.path` fixes) from:
+# https://github.com/python/cpython/pull/23901.
+# Python versions >= PY315 may include this fix, so periodic checks are needed
+# to remove this vendored copy of `makedirs` once solved upstream.
+def makedirs(name, mode=0o777, exist_ok=False, *, parent_mode=None):
+ """makedirs(name [, mode=0o777][, exist_ok=False][, parent_mode=None])
+
+ Super-mkdir; create a leaf directory and all intermediate ones. Works like
+ mkdir, except that any intermediate path segment (not just the rightmost)
+ will be created if it does not exist. If the target directory already
+ exists, raise an OSError if exist_ok is False. Otherwise no exception is
+ raised. If parent_mode is not None, it will be used as the mode for any
+ newly-created, intermediate-level directories. Otherwise, intermediate
+ directories are created with the default permissions (respecting umask).
+ This is recursive.
+
+ """
+ head, tail = os.path.split(name)
+ if not tail:
+ head, tail = os.path.split(head)
+ if head and tail and not os.path.exists(head):
+ try:
+ if parent_mode is not None:
+ makedirs(
+ head, mode=parent_mode, exist_ok=exist_ok, parent_mode=parent_mode
+ )
+ else:
+ makedirs(head, exist_ok=exist_ok)
+ except FileExistsError:
+ # Defeats race condition when another thread created the path
+ pass
+ cdir = curdir
+ if isinstance(tail, bytes):
+ cdir = bytes(curdir, "ASCII")
+ if tail == cdir: # xxx/newdir/. exists if xxx/newdir exists
+ return
+ try:
+ os.mkdir(name, mode)
+ # PY315: The call to `chmod()` is not in the CPython proposed code.
+ # Apply `chmod()` after `mkdir()` to enforce the exact requested
+ # permissions, since the kernel masks the mode argument with the
+ # process umask. This guarantees consistent directory permissions
+ # without mutating global umask state.
+ os.chmod(name, mode)
+ except OSError:
+ # Cannot rely on checking for EEXIST, since the operating system
+ # could give priority to other errors like EACCES or EROFS
+ if not exist_ok or not os.path.isdir(name):
+ raise
+
+
+def safe_makedirs(name, mode=0o777, exist_ok=False):
+ """Create directories recursively with explicit `mode` on each level."""
+ makedirs(name=name, mode=mode, exist_ok=exist_ok, parent_mode=mode)
+
+
def safe_join(base, *paths):
"""
Join one or more path components to the base path component intelligently.
diff --git a/docs/releases/4.2.29.txt b/docs/releases/4.2.29.txt
index b780264929..71170a5763 100644
--- a/docs/releases/4.2.29.txt
+++ b/docs/releases/4.2.29.txt
@@ -28,3 +28,18 @@ the previous behavior of ``URLField.to_python()``.
This issue has severity "moderate" according to the :ref:`Django security
policy <security-disclosure>`.
+
+CVE-2026-25674: Potential incorrect permissions on newly created file system objects
+====================================================================================
+
+Django's file-system storage and file-based cache backends used the process
+``umask`` to control permissions when creating directories. In multi-threaded
+environments, one thread's temporary umask change can affect other threads'
+file and directory creation, resulting in file system objects being created
+with unintended permissions.
+
+Django now applies the requested permissions via :func:`~os.chmod` after
+:func:`~os.mkdir`, removing the dependency on the process-wide umask.
+
+This issue has severity "low" according to the :ref:`Django security policy
+<security-disclosure>`.
diff --git a/tests/utils_tests/test_os_utils.py b/tests/utils_tests/test_os_utils.py
index 7204167688..290e418e64 100644
--- a/tests/utils_tests/test_os_utils.py
+++ b/tests/utils_tests/test_os_utils.py
@@ -1,9 +1,173 @@
import os
+import stat
+import sys
+import tempfile
import unittest
from pathlib import Path
from django.core.exceptions import SuspiciousFileOperation
-from django.utils._os import safe_join, to_path
+from django.utils._os import safe_join, safe_makedirs, to_path
+
+
+class SafeMakeDirsTests(unittest.TestCase):
+ def setUp(self):
+ tmp = tempfile.TemporaryDirectory()
+ self.base = tmp.name
+ self.addCleanup(tmp.cleanup)
+
+ def assertDirMode(self, path, expected):
+ self.assertIs(os.path.isdir(path), True)
+ if sys.platform == "win32":
+ # Windows partially supports chmod: dirs always end up with 0o777.
+ expected = 0o777
+
+ # These tests assume a typical process umask (0o022 or similar): they
+ # create directories with modes like 0o755 and 0o700, which don't have
+ # group/world write bits, so a typical umask doesn't change the final
+ # permissions. On unexpected failures, check whether umask has changed.
+ self.assertEqual(stat.S_IMODE(os.stat(path).st_mode), expected)
+
+ def test_creates_directory_hierarchy_with_permissions(self):
+ path = os.path.join(self.base, "a", "b", "c")
+ safe_makedirs(path, mode=0o755)
+
+ self.assertDirMode(os.path.join(self.base, "a"), 0o755)
+ self.assertDirMode(os.path.join(self.base, "a", "b"), 0o755)
+ self.assertDirMode(path, 0o755)
+
+ def test_existing_directory_exist_ok(self):
+ path = os.path.join(self.base, "a")
+ os.mkdir(path, 0o700)
+
+ safe_makedirs(path, mode=0o755, exist_ok=True)
+
+ self.assertDirMode(path, 0o700)
+
+ def test_existing_directory_exist_ok_false_raises(self):
+ path = os.path.join(self.base, "a")
+ os.mkdir(path)
+
+ with self.assertRaises(FileExistsError):
+ safe_makedirs(path, mode=0o755, exist_ok=False)
+
+ def test_existing_file_at_target_raises(self):
+ path = os.path.join(self.base, "a")
+ with open(path, "w") as f:
+ f.write("x")
+
+ with self.assertRaises(FileExistsError):
+ safe_makedirs(path, mode=0o755, exist_ok=False)
+
+ with self.assertRaises(FileExistsError):
+ safe_makedirs(path, mode=0o755, exist_ok=True)
+
+ def test_file_in_intermediate_path_raises(self):
+ file_path = os.path.join(self.base, "a")
+ with open(file_path, "w") as f:
+ f.write("x")
+
+ path = os.path.join(file_path, "b")
+
+ expected = FileNotFoundError if sys.platform == "win32" else NotADirectoryError
+
+ with self.assertRaises(expected):
+ safe_makedirs(path, mode=0o755, exist_ok=False)
+
+ with self.assertRaises(expected):
+ safe_makedirs(path, mode=0o755, exist_ok=True)
+
+ def test_existing_parent_preserves_permissions(self):
+ a = os.path.join(self.base, "a")
+ b = os.path.join(a, "b")
+
+ os.mkdir(a, 0o700)
+
+ safe_makedirs(b, mode=0o755, exist_ok=False)
+
+ self.assertDirMode(a, 0o700)
+ self.assertDirMode(b, 0o755)
+
+ c = os.path.join(a, "c")
+ safe_makedirs(c, mode=0o750, exist_ok=True)
+
+ self.assertDirMode(a, 0o700)
+ self.assertDirMode(c, 0o750)
+
+ def test_path_is_normalized(self):
+ path = os.path.join(self.base, "a", "b", "..", "c")
+ safe_makedirs(path, mode=0o755)
+
+ self.assertDirMode(os.path.normpath(path), 0o755)
+ self.assertIs(os.path.isdir(os.path.join(self.base, "a", "c")), True)
+
+ def test_permissions_unaffected_by_process_umask(self):
+ path = os.path.join(self.base, "a", "b", "c")
+ # `umask()` returns the current mask, so it'll be restored on cleanup.
+ self.addCleanup(os.umask, os.umask(0o077))
+
+ safe_makedirs(path, mode=0o755)
+
+ self.assertDirMode(os.path.join(self.base, "a"), 0o755)
+ self.assertDirMode(os.path.join(self.base, "a", "b"), 0o755)
+ self.assertDirMode(path, 0o755)
+
+ def test_permissions_correct_despite_concurrent_umask_change(self):
+ path = os.path.join(self.base, "a", "b", "c")
+ original_mkdir = os.mkdir
+ # `umask()` returns the current mask, so it'll be restored on cleanup.
+ self.addCleanup(os.umask, os.umask(0o000))
+
+ def mkdir_changing_umask(p, mode):
+ # Simulate a concurrent thread changing the process umask.
+ os.umask(0o077)
+ original_mkdir(p, mode)
+
+ with unittest.mock.patch("os.mkdir", side_effect=mkdir_changing_umask):
+ safe_makedirs(path, mode=0o755)
+
+ self.assertDirMode(os.path.join(self.base, "a"), 0o755)
+ self.assertDirMode(os.path.join(self.base, "a", "b"), 0o755)
+ self.assertDirMode(path, 0o755)
+
+ def test_race_condition_exist_ok_false(self):
+ path = os.path.join(self.base, "a", "b")
+
+ original_mkdir = os.mkdir
+ call_count = [0]
+
+ # `safe_makedirs()` calls `os.mkdir()` for each level in the path.
+ # For path "a/b", mkdir is called twice: once for "a", once for "b".
+ def mkdir_with_race(p, mode):
+ call_count[0] += 1
+ if call_count[0] == 1:
+ original_mkdir(p, mode)
+ else:
+ raise FileExistsError(f"Directory exists: '{p}'")
+
+ with unittest.mock.patch("os.mkdir", side_effect=mkdir_with_race):
+ with self.assertRaises(FileExistsError):
+ safe_makedirs(path, mode=0o755, exist_ok=False)
+
+ def test_race_condition_exist_ok_true(self):
+ path = os.path.join(self.base, "a", "b")
+
+ original_mkdir = os.mkdir
+ call_count = [0]
+
+ def mkdir_with_race(p, mode):
+ call_count[0] += 1
+ if call_count[0] == 1:
+ original_mkdir(p, mode)
+ else:
+ # Simulate other thread creating the directory during the race.
+ # The directory needs to exist for `exist_ok=True` to succeed.
+ original_mkdir(p, mode)
+ raise FileExistsError(f"Directory exists: '{p}'")
+
+ with unittest.mock.patch("os.mkdir", side_effect=mkdir_with_race):
+ safe_makedirs(path, mode=0o755, exist_ok=True)
+
+ self.assertIs(os.path.isdir(path), True)
class SafeJoinTests(unittest.TestCase):