Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
SUSE:SLE-12-SP5:GA
python3-base.36306
CVE-2023-6597-TempDir-cleaning-symlink.patch
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File CVE-2023-6597-TempDir-cleaning-symlink.patch of Package python3-base.36306
From d8f50e5e191027ad3358bafe6828a1263d3c0cda Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=B8ren=20L=C3=B8vborg?= <sorenl@unity3d.com> Date: Thu, 1 Dec 2022 14:30:26 +0100 Subject: [PATCH 01/12] tempfile tests: try not to leave NOUNLINK files behind During development, it becomes tiresome to have to manually clean up these files in case of unrelated TemporaryDirectory breakage. --- Lib/posixpath.py | 36 ++ Lib/tempfile.py | 148 ++++++++-- Lib/test/support/__init__.py | 31 ++ Lib/test/test_tempfile.py | 99 ++++++ Misc/NEWS.d/next/Library/2022-12-01-16-57-44.gh-issue-91133.LKMVCV.rst | 2 5 files changed, 301 insertions(+), 15 deletions(-) --- a/Lib/posixpath.py +++ b/Lib/posixpath.py @@ -455,3 +455,39 @@ def relpath(path, start=None): if not rel_list: return curdir return join(*rel_list) + +def commonpath(paths): + """Given a sequence of path names, returns the longest common sub-path.""" + + if not paths: + raise ValueError('commonpath() arg is an empty sequence') + + if isinstance(paths[0], bytes): + sep = b'/' + curdir = b'.' + else: + sep = '/' + curdir = '.' + + try: + split_paths = [path.split(sep) for path in paths] + + try: + isabs, = set(p[:1] == sep for p in paths) + except ValueError: + raise ValueError("Can't mix absolute and relative paths") from None + + split_paths = [[c for c in s if c and c != curdir] for s in split_paths] + s1 = min(split_paths) + s2 = max(split_paths) + common = s1 + for i, c in enumerate(s1): + if c != s2[i]: + common = s1[:i] + break + + prefix = sep if isabs else sep[:0] + return prefix + sep.join(common) + except (TypeError, AttributeError): + genericpath._check_arg_types('commonpath', *paths) + raise --- a/Lib/tempfile.py +++ b/Lib/tempfile.py @@ -21,7 +21,7 @@ __all__ = [ "mkstemp", "mkdtemp", # low level safe interfaces "mktemp", # deprecated unsafe interface "TMP_MAX", "gettempprefix", # constants - "tempdir", "gettempdir" + "tempdir", "gettempdir", "gettempdirb" ] @@ -82,6 +82,45 @@ def _exists(fn): else: return True + +def _infer_return_type(*args): + """Look at the type of all args and divine their implied return type.""" + return_type = None + for arg in args: + if arg is None: + continue + if isinstance(arg, bytes): + if return_type is str: + raise TypeError("Can't mix bytes and non-bytes in " + "path components.") + return_type = bytes + else: + if return_type is bytes: + raise TypeError("Can't mix bytes and non-bytes in " + "path components.") + return_type = str + if return_type is None: + return str # tempfile APIs return a str by default. + return return_type + + +def _sanitize_params(prefix, suffix, dir): + """Common parameter processing for most APIs in this module.""" + output_type = _infer_return_type(prefix, suffix, dir) + if suffix is None: + suffix = output_type() + if prefix is None: + if output_type is str: + prefix = template + else: + prefix = _os.fsencode(template) + if dir is None: + if output_type is str: + dir = gettempdir() + else: + dir = gettempdirb() + return prefix, suffix, dir, output_type + class _RandomNameSequence: """An instance of _RandomNameSequence generates an endless sequence of unpredictable strings which can safely be incorporated @@ -220,6 +259,22 @@ def _mkstemp_inner(dir, pre, suf, flags) raise FileExistsError(_errno.EEXIST, "No usable temporary file name found") +def _dont_follow_symlinks(func, path, *args): + # Pass follow_symlinks=False, unless not supported on this platform. + if func in _os.supports_follow_symlinks: + func(path, *args, follow_symlinks=False) + elif _os.name == 'nt' or not _os.path.islink(path): + func(path, *args) + +def _resetperms(path): + try: + chflags = _os.chflags + except AttributeError: + pass + else: + _dont_follow_symlinks(chflags, path, 0) + _dont_follow_symlinks(_os.chmod, path, 0o700) + # User visible interfaces. @@ -241,6 +296,12 @@ def gettempdir(): _once_lock.release() return tempdir + +def gettempdirb(): + """A bytes version of tempfile.gettempdir().""" + return _os.fsencode(gettempdir()) + + def mkstemp(suffix="", prefix=template, dir=None, text=False): """User-callable function to create and return a unique temporary file. The return value is a pair (fd, name) where fd is the @@ -277,7 +338,6 @@ def mkstemp(suffix="", prefix=template, return _mkstemp_inner(dir, prefix, suffix, flags) - def mkdtemp(suffix="", prefix=template, dir=None): """User-callable function to create and return a unique temporary directory. The return value is the pathname of the directory. @@ -291,17 +351,17 @@ def mkdtemp(suffix="", prefix=template, Caller is responsible for deleting the directory when done with it. """ - if dir is None: - dir = gettempdir() + prefix, suffix, dir, output_type = _sanitize_params(prefix, suffix, dir) names = _get_candidate_names() + if output_type is bytes: + names = map(_os.fsencode, names) for seq in range(TMP_MAX): name = next(names) file = _os.path.join(dir, prefix + name + suffix) try: _os.mkdir(file, 0o700) - return file except FileExistsError: continue # try again except PermissionError: @@ -312,10 +372,12 @@ def mkdtemp(suffix="", prefix=template, continue else: raise + return file raise FileExistsError(_errno.EEXIST, "No usable temporary directory name found") + def mktemp(suffix="", prefix=template, dir=None): """User-callable function to return a unique temporary file name. The file is not created. @@ -675,7 +737,7 @@ class SpooledTemporaryFile: return rv -class TemporaryDirectory(object): +class TemporaryDirectory: """Create and return a temporary directory. This has the same behavior as mkdtemp but can be used as a context manager. For example: @@ -684,20 +746,75 @@ class TemporaryDirectory(object): ... Upon exiting the context, the directory and everything contained - in it are removed. + in it are removed (unless delete=False is passed or an exception + is raised during cleanup and ignore_cleanup_errors is not True). + + Optional Arguments: + suffix - A str suffix for the directory name. (see mkdtemp) + prefix - A str prefix for the directory name. (see mkdtemp) + dir - A directory to create this temp dir in. (see mkdtemp) + ignore_cleanup_errors - False; ignore exceptions during cleanup? + delete - True; whether the directory is automatically deleted. """ - def __init__(self, suffix="", prefix=template, dir=None): + def __init__(self, suffix=None, prefix=None, dir=None, + ignore_cleanup_errors=False, *, delete=True): self.name = mkdtemp(suffix, prefix, dir) + self._ignore_cleanup_errors = ignore_cleanup_errors + self._delete = delete self._finalizer = _weakref.finalize( self, self._cleanup, self.name, - warn_message="Implicitly cleaning up {!r}".format(self)) + warn_message="Implicitly cleaning up {!r}".format(self), + ignore_errors=self._ignore_cleanup_errors, delete=self._delete) @classmethod - def _cleanup(cls, name, warn_message): - _shutil.rmtree(name) - _warnings.warn(warn_message, ResourceWarning) + def _rmtree(cls, name, ignore_errors=False, repeated=False): + def onexc(func, path, exc_info): + exc = exc_info[1] + if isinstance(exc, PermissionError): + if repeated and path == name: + if ignore_errors: + return + raise + + try: + if path != name: + _resetperms(_os.path.dirname(path)) + _resetperms(path) + try: + _os.unlink(path) + except IsADirectoryError: + cls._rmtree(path) + except PermissionError: + # The PermissionError handler was originally added for + # FreeBSD in directories, but it seems that it is raised + # on Windows too. + # bpo-43153: Calling _rmtree again may + # raise NotADirectoryError and mask the PermissionError. + # So we must re-raise the current PermissionError if + # path is not a directory. + if not _os.path.isdir(path) or _os.path.isjunction(path): + if ignore_errors: + return + raise + cls._rmtree(path, ignore_errors=ignore_errors, + repeated=(path == name)) + except FileNotFoundError: + pass + elif isinstance(exc, FileNotFoundError): + pass + else: + if not ignore_errors: + raise + + _shutil.rmtree(name, onerror=onexc) + + @classmethod + def _cleanup(cls, name, warn_message, ignore_errors=False, delete=True): + if delete: + cls._rmtree(name, ignore_errors=ignore_errors) + _warnings.warn(warn_message, ResourceWarning) def __repr__(self): return "<{} {!r}>".format(self.__class__.__name__, self.name) @@ -706,8 +823,9 @@ class TemporaryDirectory(object): return self.name def __exit__(self, exc, value, tb): - self.cleanup() + if self._delete: + self.cleanup() def cleanup(self): - if self._finalizer.detach(): - _shutil.rmtree(self.name) + if self._finalizer.detach() or _os.path.exists(self.name): + self._rmtree(self.name, ignore_errors=self._ignore_cleanup_errors) --- a/Lib/test/support/__init__.py +++ b/Lib/test/support/__init__.py @@ -119,6 +119,37 @@ class ResourceDenied(unittest.SkipTest): and unexpected skips. """ +_can_symlink = None + +TESTFN_ASCII = "{}_{}_tmp".format('@test', os.getpid()) + +def can_symlink(): + global _can_symlink + if _can_symlink is not None: + return _can_symlink + # WASI / wasmtime prevents symlinks with absolute paths, see man + # openat2(2) RESOLVE_BENEATH. Almost all symlink tests use absolute + # paths. Skip symlink tests on WASI for now. + src = os.path.abspath(TESTFN) + symlink_path = src + "can_symlink" + try: + os.symlink(src, symlink_path) + can = True + except (OSError, NotImplementedError, AttributeError): + can = False + else: + os.remove(symlink_path) + _can_symlink = can + return can + + +def skip_unless_symlink(test): + """Skip decorator for tests that require functional symlink""" + ok = can_symlink() + msg = "Requires functional symlink implementation" + return test if ok else unittest.skip(msg)(test) + + @contextlib.contextmanager def _ignore_deprecated_imports(ignore=True): """Context manager to suppress package and module deprecation --- a/Lib/test/test_tempfile.py +++ b/Lib/test/test_tempfile.py @@ -83,6 +83,7 @@ class TestExports(BaseTestCase): "TMP_MAX" : 1, "gettempprefix" : 1, "gettempdir" : 1, + "gettempdirb" : 1, "tempdir" : 1, "template" : 1, "SpooledTemporaryFile" : 1, @@ -1154,6 +1155,7 @@ class NulledModules: d.clear() d.update(c) + class TestTemporaryDirectory(BaseTestCase): """Test TemporaryDirectory().""" @@ -1214,6 +1216,103 @@ class TestTemporaryDirectory(BaseTestCas "were deleted") d2.cleanup() + @support.skip_unless_symlink + def test_cleanup_with_symlink_modes(self): + # cleanup() should not follow symlinks when fixing mode bits (#91133) + with self.do_create(recurse=0) as d2: + file1 = os.path.join(d2, 'file1') + open(file1, 'wb').close() + dir1 = os.path.join(d2, 'dir1') + os.mkdir(dir1) + for mode in range(8): + mode <<= 6 + with self.subTest(mode=format(mode, '03o')): + def test(target, target_is_directory): + d1 = self.do_create(recurse=0) + symlink = os.path.join(d1.name, 'symlink') + os.symlink(target, symlink, + target_is_directory=target_is_directory) + try: + os.chmod(symlink, mode, follow_symlinks=False) + except NotImplementedError: + pass + try: + os.chmod(symlink, mode) + except FileNotFoundError: + pass + os.chmod(d1.name, mode) + d1.cleanup() + self.assertFalse(os.path.exists(d1.name)) + + with self.subTest('nonexisting file'): + test('nonexisting', target_is_directory=False) + with self.subTest('nonexisting dir'): + test('nonexisting', target_is_directory=True) + + with self.subTest('existing file'): + os.chmod(file1, mode) + old_mode = os.stat(file1).st_mode + test(file1, target_is_directory=False) + new_mode = os.stat(file1).st_mode + self.assertEqual(new_mode, old_mode, + '%03o != %03o' % (new_mode, old_mode)) + + with self.subTest('existing dir'): + os.chmod(dir1, mode) + old_mode = os.stat(dir1).st_mode + test(dir1, target_is_directory=True) + new_mode = os.stat(dir1).st_mode + self.assertEqual(new_mode, old_mode, + '%03o != %03o' % (new_mode, old_mode)) + + @unittest.skipUnless(hasattr(os, 'chflags'), 'requires os.chflags') + @support.skip_unless_symlink + def test_cleanup_with_symlink_flags(self): + # cleanup() should not follow symlinks when fixing flags (#91133) + flags = stat.UF_IMMUTABLE | stat.UF_NOUNLINK + self.check_flags(flags) + + with self.do_create(recurse=0) as d2: + file1 = os.path.join(d2, 'file1') + open(file1, 'wb').close() + dir1 = os.path.join(d2, 'dir1') + os.mkdir(dir1) + def test(target, target_is_directory): + d1 = self.do_create(recurse=0) + symlink = os.path.join(d1.name, 'symlink') + os.symlink(target, symlink, + target_is_directory=target_is_directory) + try: + os.chflags(symlink, flags, follow_symlinks=False) + except NotImplementedError: + pass + try: + os.chflags(symlink, flags) + except FileNotFoundError: + pass + os.chflags(d1.name, flags) + d1.cleanup() + self.assertFalse(os.path.exists(d1.name)) + + with self.subTest('nonexisting file'): + test('nonexisting', target_is_directory=False) + with self.subTest('nonexisting dir'): + test('nonexisting', target_is_directory=True) + + with self.subTest('existing file'): + os.chflags(file1, flags) + old_flags = os.stat(file1).st_flags + test(file1, target_is_directory=False) + new_flags = os.stat(file1).st_flags + self.assertEqual(new_flags, old_flags) + + with self.subTest('existing dir'): + os.chflags(dir1, flags) + old_flags = os.stat(dir1).st_flags + test(dir1, target_is_directory=True) + new_flags = os.stat(dir1).st_flags + self.assertEqual(new_flags, old_flags) + @support.cpython_only def test_del_on_collection(self): # A TemporaryDirectory is deleted when garbage collected --- /dev/null +++ b/Misc/NEWS.d/next/Library/2022-12-01-16-57-44.gh-issue-91133.LKMVCV.rst @@ -0,0 +1,2 @@ +Fix a bug in :class:`tempfile.TemporaryDirectory` cleanup, which now no longer +dereferences symlinks when working around file system permission errors.
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor