Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
home:darix:apps
salt
make-reactor-engine-less-blocking-the-eventpubl...
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File make-reactor-engine-less-blocking-the-eventpublisher.patch of Package salt
From 0d35f09288700f5c961567442c3fcc25838b8de4 Mon Sep 17 00:00:00 2001 From: Victor Zhestkov <vzhestkov@suse.com> Date: Wed, 15 May 2024 09:44:21 +0200 Subject: [PATCH] Make reactor engine less blocking the EventPublisher --- salt/utils/reactor.py | 45 +++++++++++++++++++++++++++---------------- 1 file changed, 28 insertions(+), 17 deletions(-) diff --git a/salt/utils/reactor.py b/salt/utils/reactor.py index 19420a51cf..78adad34da 100644 --- a/salt/utils/reactor.py +++ b/salt/utils/reactor.py @@ -1,10 +1,12 @@ """ Functions which implement running reactor jobs """ + import fnmatch import glob import logging import os +from threading import Lock import salt.client import salt.defaults.exitcodes @@ -194,13 +196,6 @@ class Reactor(salt.utils.process.SignalHandlingProcess, salt.state.Compiler): self.resolve_aliases(chunks) return chunks - def call_reactions(self, chunks): - """ - Execute the reaction state - """ - for chunk in chunks: - self.wrap.run(chunk) - def run(self): """ Enter into the server loop @@ -218,7 +213,7 @@ class Reactor(salt.utils.process.SignalHandlingProcess, salt.state.Compiler): ) as event: self.wrap = ReactWrap(self.opts) - for data in event.iter_events(full=True): + for data in event.iter_events(full=True, auto_reconnect=True): # skip all events fired by ourselves if data["data"].get("user") == self.wrap.event_user: continue @@ -268,15 +263,9 @@ class Reactor(salt.utils.process.SignalHandlingProcess, salt.state.Compiler): if not self.is_leader: continue else: - reactors = self.list_reactors(data["tag"]) - if not reactors: - continue - chunks = self.reactions(data["tag"], data["data"], reactors) - if chunks: - try: - self.call_reactions(chunks) - except SystemExit: - log.warning("Exit ignored by reactor") + self.wrap.call_reactions( + data, self.list_reactors, self.reactions + ) class ReactWrap: @@ -297,6 +286,7 @@ class ReactWrap: def __init__(self, opts): self.opts = opts + self._run_lock = Lock() if ReactWrap.client_cache is None: ReactWrap.client_cache = salt.utils.cache.CacheDict( opts["reactor_refresh_interval"] @@ -480,3 +470,24 @@ class ReactWrap: Wrap LocalCaller to execute remote exec functions locally on the Minion """ self.client_cache["caller"].cmd(fun, *kwargs["arg"], **kwargs["kwarg"]) + + def _call_reactions(self, data, list_reactors, get_reactions): + reactors = list_reactors(data["tag"]) + if not reactors: + return + chunks = get_reactions(data["tag"], data["data"], reactors) + if not chunks: + return + with self._run_lock: + try: + for chunk in chunks: + self.run(chunk) + except Exception as exc: # pylint: disable=broad-except + log.error( + "Exception while calling the reactions: %s", exc, exc_info=True + ) + + def call_reactions(self, data, list_reactors, get_reactions): + return self.pool.fire_async( + self._call_reactions, args=(data, list_reactors, get_reactions) + ) -- 2.45.0
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor