Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
home:Alexander_Naumov:SLE-12:Update
salt.24749
implementation-of-held-unheld-functions-for-sta...
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File implementation-of-held-unheld-functions-for-state-pk.patch of Package salt.24749
From 3db1d2e6a3149291959991a583159c05a6a1e5ef Mon Sep 17 00:00:00 2001 From: Victor Zhestkov <35733135+vzhestkov@users.noreply.github.com> Date: Mon, 5 Jul 2021 18:39:49 +0300 Subject: [PATCH] Implementation of held/unheld functions for state pkg - 3000 (#388) * Implementation of held/unheld functions for state pkg --- salt/modules/zypperpkg.py | 216 ++++++++++++++----- salt/states/pkg.py | 310 +++++++++++++++++++++++++++ tests/unit/modules/test_zypperpkg.py | 133 +++++++++++- tests/unit/states/test_pkg.py | 145 +++++++++++++ 4 files changed, 748 insertions(+), 56 deletions(-) diff --git a/salt/modules/zypperpkg.py b/salt/modules/zypperpkg.py index 325999360d..7936993a6b 100644 --- a/salt/modules/zypperpkg.py +++ b/salt/modules/zypperpkg.py @@ -1963,6 +1963,76 @@ def purge(name=None, pkgs=None, root=None, inclusion_detection=False, **kwargs): return _uninstall(inclusion_detection, name=name, pkgs=pkgs, root=root) +def list_holds(pattern=None, full=True, root=None, **kwargs): + """ + List information on locked packages. + + .. note:: + This function returns the computed output of ``list_locks`` + to show exact locked packages. + + pattern + Regular expression used to match the package name + + full : True + Show the full hold definition including version and epoch. Set to + ``False`` to return just the name of the package(s) being held. + + root + Operate on a different root directory. + + + CLI Example: + + .. code-block:: bash + + salt '*' pkg.list_holds + salt '*' pkg.list_holds full=False + """ + locks = list_locks(root=root) + ret = [] + inst_pkgs = {} + for solv_name, lock in locks.items(): + if lock.get("type", "package") != "package": + continue + try: + found_pkgs = search( + solv_name, + root=root, + match=None if "*" in solv_name else "exact", + case_sensitive=(lock.get("case_sensitive", "on") == "on"), + installed_only=True, + details=True, + all_versions=True, + ignore_no_matching_item=True, + ) + except CommandExecutionError: + continue + if found_pkgs: + for pkg in found_pkgs: + if pkg not in inst_pkgs: + inst_pkgs.update( + info_installed( + pkg, root=root, attr="edition,epoch", all_versions=True + ) + ) + + ptrn_re = re.compile(r"{}-\S+".format(pattern)) if pattern else None + for pkg_name, pkg_editions in inst_pkgs.items(): + for pkg_info in pkg_editions: + pkg_ret = ( + "{}-{}:{}.*".format( + pkg_name, pkg_info.get("epoch", 0), pkg_info.get("edition") + ) + if full + else pkg_name + ) + if pkg_ret not in ret and (not ptrn_re or ptrn_re.match(pkg_ret)): + ret.append(pkg_ret) + + return ret + + def list_locks(root=None): ''' List current package locks. @@ -2031,46 +2101,72 @@ def clean_locks(root=None): return out -def unhold(name=None, pkgs=None, **kwargs): - ''' - Remove specified package lock. +def unhold(name=None, pkgs=None, root=None, **kwargs): + """ + Remove a package hold. + + name + A package name to unhold, or a comma-separated list of package names to + unhold. + + pkgs + A list of packages to unhold. The ``name`` parameter will be ignored if + this option is passed. root - operate on a different root directory. + Operate on a different root directory. + CLI Example: .. code-block:: bash - salt '*' pkg.remove_lock <package name> - salt '*' pkg.remove_lock <package1>,<package2>,<package3> - salt '*' pkg.remove_lock pkgs='["foo", "bar"]' - ''' + salt '*' pkg.unhold <package name> + salt '*' pkg.unhold <package1>,<package2>,<package3> + salt '*' pkg.unhold pkgs='["foo", "bar"]' + """ ret = {} - root = kwargs.get('root') - if (not name and not pkgs) or (name and pkgs): - raise CommandExecutionError('Name or packages must be specified.') - elif name: - pkgs = [name] + if not name and not pkgs: + raise CommandExecutionError("Name or packages must be specified.") - locks = list_locks(root) - try: - pkgs = list(__salt__['pkg_resource.parse_targets'](pkgs)[0].keys()) - except MinionError as exc: - raise CommandExecutionError(exc) + targets = [] + if pkgs: + targets.extend(pkgs) + else: + targets.append(name) + locks = list_locks(root=root) removed = [] - missing = [] - for pkg in pkgs: - if locks.get(pkg): - removed.append(pkg) - ret[pkg]['comment'] = 'Package {0} is no longer held.'.format(pkg) + + for target in targets: + version = None + if isinstance(target, dict): + (target, version) = next(iter(target.items())) + ret[target] = {"name": target, "changes": {}, "result": True, "comment": ""} + if locks.get(target): + lock_ver = None + if "version" in locks.get(target): + lock_ver = locks.get(target)["version"] + lock_ver = lock_ver.lstrip("= ") + if version and lock_ver != version: + ret[target]["result"] = False + ret[target][ + "comment" + ] = "Unable to unhold package {} as it is held with the other version.".format( + target + ) + else: + removed.append( + target if not lock_ver else "{}={}".format(target, lock_ver) + ) + ret[target]["changes"]["new"] = "" + ret[target]["changes"]["old"] = "hold" + ret[target]["comment"] = "Package {} is no longer held.".format(target) else: - missing.append(pkg) - ret[pkg]['comment'] = 'Package {0} unable to be unheld.'.format(pkg) + ret[target]["comment"] = "Package {} was already unheld.".format(target) if removed: - __zypper__(root=root).call('rl', *removed) + __zypper__(root=root).call("rl", *removed) return ret @@ -2111,50 +2207,60 @@ def remove_lock(packages, root=None, **kwargs): # pylint: disable=unused-argume return {'removed': len(removed), 'not_found': missing} -def hold(name=None, pkgs=None, **kwargs): - ''' - Add a package lock. Specify packages to lock by exact name. +def hold(name=None, pkgs=None, root=None, **kwargs): + """ + Add a package hold. Specify one of ``name`` and ``pkgs``. + + name + A package name to hold, or a comma-separated list of package names to + hold. + + pkgs + A list of packages to hold. The ``name`` parameter will be ignored if + this option is passed. root - operate on a different root directory. + Operate on a different root directory. + CLI Example: .. code-block:: bash - salt '*' pkg.add_lock <package name> - salt '*' pkg.add_lock <package1>,<package2>,<package3> - salt '*' pkg.add_lock pkgs='["foo", "bar"]' - - :param name: - :param pkgs: - :param kwargs: - :return: - ''' + salt '*' pkg.hold <package name> + salt '*' pkg.hold <package1>,<package2>,<package3> + salt '*' pkg.hold pkgs='["foo", "bar"]' + """ ret = {} - root = kwargs.get('root') - if (not name and not pkgs) or (name and pkgs): - raise CommandExecutionError('Name or packages must be specified.') - elif name: - pkgs = [name] + if not name and not pkgs: + raise CommandExecutionError("Name or packages must be specified.") + + targets = [] + if pkgs: + targets.extend(pkgs) + else: + targets.append(name) locks = list_locks(root=root) added = [] - try: - pkgs = list(__salt__['pkg_resource.parse_targets'](pkgs)[0].keys()) - except MinionError as exc: - raise CommandExecutionError(exc) - for pkg in pkgs: - ret[pkg] = {'name': pkg, 'changes': {}, 'result': False, 'comment': ''} - if not locks.get(pkg): - added.append(pkg) - ret[pkg]['comment'] = 'Package {0} is now being held.'.format(pkg) + for target in targets: + version = None + if isinstance(target, dict): + (target, version) = next(iter(target.items())) + ret[target] = {"name": target, "changes": {}, "result": True, "comment": ""} + if not locks.get(target): + added.append(target if not version else "{}={}".format(target, version)) + ret[target]["changes"]["new"] = "hold" + ret[target]["changes"]["old"] = "" + ret[target]["comment"] = "Package {} is now being held.".format(target) else: - ret[pkg]['comment'] = 'Package {0} is already set to be held.'.format(pkg) + ret[target]["comment"] = "Package {} is already set to be held.".format( + target + ) if added: - __zypper__(root=root).call('al', *added) + __zypper__(root=root).call("al", *added) return ret diff --git a/salt/states/pkg.py b/salt/states/pkg.py index 71ba29a27c..4d3b9f14bf 100644 --- a/salt/states/pkg.py +++ b/salt/states/pkg.py @@ -3350,3 +3350,313 @@ def mod_watch(name, **kwargs): 'changes': {}, 'comment': 'pkg.{0} does not work with the watch requisite'.format(sfun), 'result': False} + + +def held(name, version=None, pkgs=None, replace=False, **kwargs): + """ + Set package in 'hold' state, meaning it will not be changed. + + :param str name: + The name of the package to be held. This parameter is ignored + if ``pkgs`` is used. + + :param str version: + Hold a specific version of a package. + Full description of this parameter is in `installed` function. + + .. note:: + + This parameter make sense for Zypper-based systems. + Ignored for YUM/DNF and APT + + :param list pkgs: + A list of packages to be held. All packages listed under ``pkgs`` + will be held. + + .. code-block:: yaml + + mypkgs: + pkg.held: + - pkgs: + - foo + - bar: 1.2.3-4 + - baz + + .. note:: + + For Zypper-based systems the package could be held for + the version specified. YUM/DNF and APT ingore it. + + :param bool replace: + Force replacement of existings holds with specified. + By default, this parameter is set to ``False``. + """ + + if isinstance(pkgs, list) and len(pkgs) == 0 and not replace: + return { + "name": name, + "changes": {}, + "result": True, + "comment": "No packages to be held provided", + } + + # If just a name (and optionally a version) is passed, just pack them into + # the pkgs argument. + if name and pkgs is None: + if version: + pkgs = [{name: version}] + version = None + else: + pkgs = [name] + + locks = {} + vr_lock = False + if "pkg.list_locks" in __salt__: + locks = __salt__["pkg.list_locks"]() + vr_lock = True + elif "pkg.list_holds" in __salt__: + _locks = __salt__["pkg.list_holds"](full=True) + lock_re = re.compile(r"^(.+)-(\d+):(.*)\.\*") + for lock in _locks: + match = lock_re.match(lock) + if match: + epoch = match.group(2) + if epoch == "0": + epoch = "" + else: + epoch = "{}:".format(epoch) + locks.update( + {match.group(1): {"version": "{}{}".format(epoch, match.group(3))}} + ) + else: + locks.update({lock: {}}) + elif "pkg.get_selections" in __salt__: + _locks = __salt__["pkg.get_selections"](state="hold") + for lock in _locks.get("hold", []): + locks.update({lock: {}}) + else: + return { + "name": name, + "changes": {}, + "result": False, + "comment": "No any function to get the list of held packages available.\n" + "Check if the package manager supports package locking.", + } + + if "pkg.hold" not in __salt__: + return { + "name": name, + "changes": {}, + "result": False, + "comment": "`hold` function is not implemented for the package manager.", + } + + ret = {"name": name, "changes": {}, "result": True, "comment": ""} + comments = [] + + held_pkgs = set() + for pkg in pkgs: + if isinstance(pkg, dict): + (pkg_name, pkg_ver) = next(iter(pkg.items())) + else: + pkg_name = pkg + pkg_ver = None + lock_ver = None + if pkg_name in locks and "version" in locks[pkg_name]: + lock_ver = locks[pkg_name]["version"] + lock_ver = lock_ver.lstrip("= ") + held_pkgs.add(pkg_name) + if pkg_name not in locks or (vr_lock and lock_ver != pkg_ver): + if __opts__["test"]: + if pkg_name in locks: + comments.append( + "The following package's hold rule would be updated: {}{}".format( + pkg_name, + "" if not pkg_ver else " (version = {})".format(pkg_ver), + ) + ) + else: + comments.append( + "The following package would be held: {}{}".format( + pkg_name, + "" if not pkg_ver else " (version = {})".format(pkg_ver), + ) + ) + else: + unhold_ret = None + if pkg_name in locks: + unhold_ret = __salt__["pkg.unhold"](name=name, pkgs=[pkg_name]) + hold_ret = __salt__["pkg.hold"](name=name, pkgs=[pkg]) + if not hold_ret.get(pkg_name, {}).get("result", False): + ret["result"] = False + if ( + unhold_ret + and unhold_ret.get(pkg_name, {}).get("result", False) + and hold_ret + and hold_ret.get(pkg_name, {}).get("result", False) + ): + comments.append( + "Package {} was updated with hold rule".format(pkg_name) + ) + elif hold_ret and hold_ret.get(pkg_name, {}).get("result", False): + comments.append("Package {} is now being held".format(pkg_name)) + else: + comments.append("Package {} was not held".format(pkg_name)) + ret["changes"].update(hold_ret) + + if replace: + for pkg_name in locks: + if locks[pkg_name].get("type", "package") != "package": + continue + if __opts__["test"]: + if pkg_name not in held_pkgs: + comments.append( + "The following package would be unheld: {}".format(pkg_name) + ) + else: + if pkg_name not in held_pkgs: + unhold_ret = __salt__["pkg.unhold"](name=name, pkgs=[pkg_name]) + if not unhold_ret.get(pkg_name, {}).get("result", False): + ret["result"] = False + if unhold_ret and unhold_ret.get(pkg_name, {}).get("comment"): + comments.append(unhold_ret.get(pkg_name).get("comment")) + ret["changes"].update(unhold_ret) + + ret["comment"] = "\n".join(comments) + if not (ret["changes"] or ret["comment"]): + ret["comment"] = "No changes made" + + return ret + + +def unheld(name, version=None, pkgs=None, all=False, **kwargs): + """ + Unset package from 'hold' state, to allow operations with the package. + + :param str name: + The name of the package to be unheld. This parameter is ignored + if ``pkgs`` is used. + + :param str version: + Unhold a specific version of a package. + Full description of this parameter is in `installed` function. + + .. note:: + + This parameter make sense for Zypper-based systems. + Ignored for YUM/DNF and APT + + :param list pkgs: + A list of packages to be unheld. All packages listed under ``pkgs`` + will be unheld. + + .. code-block:: yaml + + mypkgs: + pkg.unheld: + - pkgs: + - foo + - bar: 1.2.3-4 + - baz + + .. note:: + + For Zypper-based systems the package could be held for + the version specified. YUM/DNF and APT ingore it. + For ``unheld`` there is no need to specify the exact version + to be unheld. + + :param bool all: + Force removing of all existings locks. + By default, this parameter is set to ``False``. + """ + + if isinstance(pkgs, list) and len(pkgs) == 0 and not all: + return { + "name": name, + "changes": {}, + "result": True, + "comment": "No packages to be unheld provided", + } + + # If just a name (and optionally a version) is passed, just pack them into + # the pkgs argument. + if name and pkgs is None: + pkgs = [{name: version}] + version = None + + locks = {} + vr_lock = False + if "pkg.list_locks" in __salt__: + locks = __salt__["pkg.list_locks"]() + vr_lock = True + elif "pkg.list_holds" in __salt__: + _locks = __salt__["pkg.list_holds"](full=True) + lock_re = re.compile(r"^(.+)-(\d+):(.*)\.\*") + for lock in _locks: + match = lock_re.match(lock) + if match: + epoch = match.group(2) + if epoch == "0": + epoch = "" + else: + epoch = "{}:".format(epoch) + locks.update( + {match.group(1): {"version": "{}{}".format(epoch, match.group(3))}} + ) + else: + locks.update({lock: {}}) + elif "pkg.get_selections" in __salt__: + _locks = __salt__["pkg.get_selections"](state="hold") + for lock in _locks.get("hold", []): + locks.update({lock: {}}) + else: + return { + "name": name, + "changes": {}, + "result": False, + "comment": "No any function to get the list of held packages available.\n" + "Check if the package manager supports package locking.", + } + + dpkgs = {} + for pkg in pkgs: + if isinstance(pkg, dict): + (pkg_name, pkg_ver) = next(iter(pkg.items())) + dpkgs.update({pkg_name: pkg_ver}) + else: + dpkgs.update({pkg: None}) + + ret = {"name": name, "changes": {}, "result": True, "comment": ""} + comments = [] + + for pkg_name in locks: + if locks[pkg_name].get("type", "package") != "package": + continue + lock_ver = None + if vr_lock and "version" in locks[pkg_name]: + lock_ver = locks[pkg_name]["version"] + lock_ver = lock_ver.lstrip("= ") + if all or (pkg_name in dpkgs and (not lock_ver or lock_ver == dpkgs[pkg_name])): + if __opts__["test"]: + comments.append( + "The following package would be unheld: {}{}".format( + pkg_name, + "" + if not dpkgs.get(pkg_name) + else " (version = {})".format(lock_ver), + ) + ) + else: + unhold_ret = __salt__["pkg.unhold"](name=name, pkgs=[pkg_name]) + if not unhold_ret.get(pkg_name, {}).get("result", False): + ret["result"] = False + if unhold_ret and unhold_ret.get(pkg_name, {}).get("comment"): + comments.append(unhold_ret.get(pkg_name).get("comment")) + ret["changes"].update(unhold_ret) + + ret["comment"] = "\n".join(comments) + if not (ret["changes"] or ret["comment"]): + ret["comment"] = "No changes made" + + return ret diff --git a/tests/unit/modules/test_zypperpkg.py b/tests/unit/modules/test_zypperpkg.py index dfe7f4f7a1..10d90660c6 100644 --- a/tests/unit/modules/test_zypperpkg.py +++ b/tests/unit/modules/test_zypperpkg.py @@ -1942,7 +1942,6 @@ pattern() = package-c""" python_shell=False, env={"ZYPP_READONLY_HACK": "1"}, ) - self.assertEqual(zypper.__context__, {"pkg.other_data": None}) def test_get_repo_keys(self): salt_mock = {"lowpkg.list_gpg_keys": MagicMock(return_value=True)} @@ -1994,3 +1993,135 @@ pattern() = package-c""" with patch("salt.modules.zypperpkg.__zypper__", zypper_mock): assert zypper.services_need_restart() == expected zypper_mock(root=None).nolock.call.assert_called_with("ps", "-sss") + + def test_pkg_hold(self): + """ + Tests holding packages with Zypper + """ + + # Test openSUSE 15.3 + list_locks_mock = { + "bar": {"type": "package", "match_type": "glob", "case_sensitive": "on"}, + "minimal_base": { + "type": "pattern", + "match_type": "glob", + "case_sensitive": "on", + }, + "baz": {"type": "package", "match_type": "glob", "case_sensitive": "on"}, + } + + cmd = MagicMock( + return_value={ + "pid": 1234, + "retcode": 0, + "stdout": "Specified lock has been successfully added.", + "stderr": "", + } + ) + with patch.object( + zypper, "list_locks", MagicMock(return_value=list_locks_mock) + ), patch.dict(zypper.__salt__, {"cmd.run_all": cmd}): + ret = zypper.hold("foo") + assert ret["foo"]["changes"]["old"] == "" + assert ret["foo"]["changes"]["new"] == "hold" + assert ret["foo"]["comment"] == "Package foo is now being held." + cmd.assert_called_once_with( + ["zypper", "--non-interactive", "--no-refresh", "al", "foo"], + env={}, + output_loglevel="trace", + python_shell=False, + ) + cmd.reset_mock() + ret = zypper.hold(pkgs=["foo", "bar"]) + assert ret["foo"]["changes"]["old"] == "" + assert ret["foo"]["changes"]["new"] == "hold" + assert ret["foo"]["comment"] == "Package foo is now being held." + assert ret["bar"]["changes"] == {} + assert ret["bar"]["comment"] == "Package bar is already set to be held." + cmd.assert_called_once_with( + ["zypper", "--non-interactive", "--no-refresh", "al", "foo"], + env={}, + output_loglevel="trace", + python_shell=False, + ) + + + def test_pkg_unhold(self): + """ + Tests unholding packages with Zypper + """ + + # Test openSUSE 15.3 + list_locks_mock = { + "bar": {"type": "package", "match_type": "glob", "case_sensitive": "on"}, + "minimal_base": { + "type": "pattern", + "match_type": "glob", + "case_sensitive": "on", + }, + "baz": {"type": "package", "match_type": "glob", "case_sensitive": "on"}, + } + + cmd = MagicMock( + return_value={ + "pid": 1234, + "retcode": 0, + "stdout": "1 lock has been successfully removed.", + "stderr": "", + } + ) + with patch.object( + zypper, "list_locks", MagicMock(return_value=list_locks_mock) + ), patch.dict(zypper.__salt__, {"cmd.run_all": cmd}): + ret = zypper.unhold("foo") + assert ret["foo"]["comment"] == "Package foo was already unheld." + cmd.assert_not_called() + cmd.reset_mock() + ret = zypper.unhold(pkgs=["foo", "bar"]) + assert ret["foo"]["changes"] == {} + assert ret["foo"]["comment"] == "Package foo was already unheld." + assert ret["bar"]["changes"]["old"] == "hold" + assert ret["bar"]["changes"]["new"] == "" + assert ret["bar"]["comment"] == "Package bar is no longer held." + cmd.assert_called_once_with( + ["zypper", "--non-interactive", "--no-refresh", "rl", "bar"], + env={}, + output_loglevel="trace", + python_shell=False, + ) + + + def test_pkg_list_holds(self): + """ + Tests listing of calculated held packages with Zypper + """ + + # Test openSUSE 15.3 + list_locks_mock = { + "bar": {"type": "package", "match_type": "glob", "case_sensitive": "on"}, + "minimal_base": { + "type": "pattern", + "match_type": "glob", + "case_sensitive": "on", + }, + "baz": {"type": "package", "match_type": "glob", "case_sensitive": "on"}, + } + installed_pkgs = { + "foo": [{"edition": "1.2.3-1.1"}], + "bar": [{"edition": "2.3.4-2.1", "epoch": "2"}], + } + + def zypper_search_mock(name, *_args, **_kwargs): + if name in installed_pkgs: + return {name: installed_pkgs.get(name)} + + with patch.object( + zypper, "list_locks", MagicMock(return_value=list_locks_mock) + ), patch.object( + zypper, "search", MagicMock(side_effect=zypper_search_mock) + ), patch.object( + zypper, "info_installed", MagicMock(side_effect=zypper_search_mock) + ): + ret = zypper.list_holds() + assert len(ret) == 1 + assert "bar-2:2.3.4-2.1.*" in ret diff --git a/tests/unit/states/test_pkg.py b/tests/unit/states/test_pkg.py index 38f72353fa..41d74a4706 100644 --- a/tests/unit/states/test_pkg.py +++ b/tests/unit/states/test_pkg.py @@ -232,3 +232,148 @@ class PkgTestCase(TestCase, LoaderModuleMockMixin): for installed_versions, operator, version, expected_result in test_parameters: msg = "installed_versions: {}, operator: {}, version: {}, expected_result: {}".format(installed_versions, operator, version, expected_result) self.assertEqual(expected_result, pkg._fulfills_version_spec(installed_versions, operator, version), msg) + + def test_held_unheld_Zypper(self): + self.pkgr_held_unheld("Zypper") + + def test_held_unheld_YUM(self): + self.pkgr_held_unheld("YUM/DNF") + + def test_held_unheld_APT(self): + self.pkgr_held_unheld("APT") + + def pkgr_held_unheld(self, package_manager): + """ + Test pkg.held and pkg.unheld with Zypper, YUM/DNF and APT + """ + + if package_manager == "Zypper": + list_holds_func = "pkg.list_locks" + list_holds_mock = MagicMock( + return_value={ + "bar": { + "type": "package", + "match_type": "glob", + "case_sensitive": "on", + }, + "minimal_base": { + "type": "pattern", + "match_type": "glob", + "case_sensitive": "on", + }, + "baz": { + "type": "package", + "match_type": "glob", + "case_sensitive": "on", + }, + } + ) + elif package_manager == "YUM/DNF": + list_holds_func = "pkg.list_holds" + list_holds_mock = MagicMock( + return_value=["bar-0:1.2.3-1.1.*", "baz-0:2.3.4-2.1.*"] + ) + elif package_manager == "APT": + list_holds_func = "pkg.get_selections" + list_holds_mock = MagicMock(return_value={"hold": ["bar", "baz"]}) + + def pkg_hold(name, pkgs=None, *_args, **__kwargs): + if name and pkgs is None: + pkgs = [name] + ret = {} + for pkg in pkgs: + ret.update( + { + pkg: { + "name": pkg, + "changes": {"new": "hold", "old": ""}, + "result": True, + "comment": "Package {} is now being held.".format(pkg), + } + } + ) + return ret + + def pkg_unhold(name, pkgs=None, *_args, **__kwargs): + if name and pkgs is None: + pkgs = [name] + ret = {} + for pkg in pkgs: + ret.update( + { + pkg: { + "name": pkg, + "changes": {"new": "", "old": "hold"}, + "result": True, + "comment": "Package {} is no longer held.".format(pkg), + } + } + ) + return ret + hold_mock = MagicMock(side_effect=pkg_hold) + unhold_mock = MagicMock(side_effect=pkg_unhold) + + # Testing with Zypper + with patch.dict( + pkg.__salt__, + { + list_holds_func: list_holds_mock, + "pkg.hold": hold_mock, + "pkg.unhold": unhold_mock, + }, + ), patch.dict( + pkg.__opts__, { + "test": False, + } + ): + # Holding one of two packages + ret = pkg.held("held-test", pkgs=["foo", "bar"]) + assert "foo" in ret["changes"] + assert len(ret["changes"]) == 1 + hold_mock.assert_called_once_with(name="held-test", pkgs=["foo"]) + unhold_mock.assert_not_called() + + hold_mock.reset_mock() + unhold_mock.reset_mock() + + # Holding one of two packages and replacing all the rest held packages + ret = pkg.held("held-test", pkgs=["foo", "bar"], replace=True) + assert "foo" in ret["changes"] + assert "baz" in ret["changes"] + assert len(ret["changes"]) == 2 + hold_mock.assert_called_once_with(name="held-test", pkgs=["foo"]) + unhold_mock.assert_called_once_with(name="held-test", pkgs=["baz"]) + + hold_mock.reset_mock() + unhold_mock.reset_mock() + + # Remove all holds + ret = pkg.held("held-test", pkgs=[], replace=True) + assert "bar" in ret["changes"] + assert "baz" in ret["changes"] + assert len(ret["changes"]) == 2 + hold_mock.assert_not_called() + unhold_mock.assert_any_call(name="held-test", pkgs=["baz"]) + unhold_mock.assert_any_call(name="held-test", pkgs=["bar"]) + + hold_mock.reset_mock() + unhold_mock.reset_mock() + + # Unolding one of two packages + ret = pkg.unheld("held-test", pkgs=["foo", "bar"]) + assert "bar" in ret["changes"] + assert len(ret["changes"]) == 1 + unhold_mock.assert_called_once_with(name="held-test", pkgs=["bar"]) + hold_mock.assert_not_called() + + hold_mock.reset_mock() + unhold_mock.reset_mock() + + # Remove all holds + ret = pkg.unheld("held-test", all=True) + assert "bar" in ret["changes"] + assert "baz" in ret["changes"] + assert len(ret["changes"]) == 2 + hold_mock.assert_not_called() + unhold_mock.assert_any_call(name="held-test", pkgs=["baz"]) + unhold_mock.assert_any_call(name="held-test", pkgs=["bar"]) -- 2.32.0
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor