Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
SUSE:SLE-15-SP1:Update
salt.21004
fix-memory-leak-produced-by-batch-async-find_jo...
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File fix-memory-leak-produced-by-batch-async-find_jobs-me.patch of Package salt.21004
From 00c538383e463febba492e74577ae64be80d4d00 Mon Sep 17 00:00:00 2001 From: Mihai Dinca <mdinca@suse.de> Date: Mon, 16 Sep 2019 11:27:30 +0200 Subject: [PATCH] Fix memory leak produced by batch async find_jobs mechanism (bsc#1140912) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Multiple fixes: - use different JIDs per find_job - fix bug in detection of find_job returns - fix timeout passed from request payload - better cleanup at the end of batching Co-authored-by: Pablo Suárez Hernández <psuarezhernandez@suse.com> --- salt/cli/batch_async.py | 59 ++++++++++++++++++++++++++++------------- salt/client/__init__.py | 1 + salt/master.py | 2 -- 3 files changed, 41 insertions(+), 21 deletions(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index 7225491228..388b709416 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -73,6 +73,7 @@ class BatchAsync: self.done_minions = set() self.active = set() self.initialized = False + self.jid_gen = jid_gen self.ping_jid = jid_gen() self.batch_jid = jid_gen() self.find_job_jid = jid_gen() @@ -91,14 +92,11 @@ class BatchAsync: def __set_event_handler(self): ping_return_pattern = "salt/job/{}/ret/*".format(self.ping_jid) batch_return_pattern = "salt/job/{}/ret/*".format(self.batch_jid) - find_job_return_pattern = "salt/job/{}/ret/*".format(self.find_job_jid) self.event.subscribe(ping_return_pattern, match_type="glob") self.event.subscribe(batch_return_pattern, match_type="glob") - self.event.subscribe(find_job_return_pattern, match_type="glob") - self.event.patterns = { + self.patterns = { (ping_return_pattern, "ping_return"), (batch_return_pattern, "batch_run"), - (find_job_return_pattern, "find_job_return"), } self.event.set_event_handler(self.__event_handler) @@ -106,7 +104,7 @@ class BatchAsync: if not self.event: return mtag, data = self.event.unpack(raw, self.event.serial) - for (pattern, op) in self.event.patterns: + for (pattern, op) in self.patterns: if fnmatch.fnmatch(mtag, pattern): minion = data["id"] if op == "ping_return": @@ -114,7 +112,8 @@ class BatchAsync: if self.targeted_minions == self.minions: self.event.io_loop.spawn_callback(self.start_batch) elif op == "find_job_return": - self.find_job_returned.add(minion) + if data.get("return", None): + self.find_job_returned.add(minion) elif op == "batch_run": if minion in self.active: self.active.remove(minion) @@ -134,7 +133,11 @@ class BatchAsync: return set(list(to_run)[:next_batch_size]) @tornado.gen.coroutine - def check_find_job(self, batch_minions): + def check_find_job(self, batch_minions, jid): + find_job_return_pattern = "salt/job/{}/ret/*".format(jid) + self.event.unsubscribe(find_job_return_pattern, match_type="glob") + self.patterns.remove((find_job_return_pattern, "find_job_return")) + timedout_minions = batch_minions.difference(self.find_job_returned).difference( self.done_minions ) @@ -143,27 +146,39 @@ class BatchAsync: running = batch_minions.difference(self.done_minions).difference( self.timedout_minions ) + if timedout_minions: self.schedule_next() + if running: + self.find_job_returned = self.find_job_returned.difference(running) self.event.io_loop.add_callback(self.find_job, running) @tornado.gen.coroutine def find_job(self, minions): - not_done = minions.difference(self.done_minions) - ping_return = yield self.local.run_job_async( - not_done, - "saltutil.find_job", - [self.batch_jid], - "list", - gather_job_timeout=self.opts["gather_job_timeout"], - jid=self.find_job_jid, - **self.eauth - ) - self.event.io_loop.call_later( - self.opts["gather_job_timeout"], self.check_find_job, not_done + not_done = minions.difference(self.done_minions).difference( + self.timedout_minions ) + if not_done: + jid = self.jid_gen() + find_job_return_pattern = "salt/job/{}/ret/*".format(jid) + self.patterns.add((find_job_return_pattern, "find_job_return")) + self.event.subscribe(find_job_return_pattern, match_type="glob") + + ret = yield self.local.run_job_async( + not_done, + "saltutil.find_job", + [self.batch_jid], + "list", + gather_job_timeout=self.opts["gather_job_timeout"], + jid=jid, + **self.eauth + ) + self.event.io_loop.call_later( + self.opts["gather_job_timeout"], self.check_find_job, not_done, jid + ) + @tornado.gen.coroutine def start(self): self.__set_event_handler() @@ -211,6 +226,9 @@ class BatchAsync: } self.event.fire_event(data, "salt/batch/{}/done".format(self.batch_jid)) self.event.remove_event_handler(self.__event_handler) + for (pattern, label) in self.patterns: + if label in ["ping_return", "batch_run"]: + self.event.unsubscribe(pattern, match_type="glob") def schedule_next(self): if not self.scheduled: @@ -235,11 +253,14 @@ class BatchAsync: jid=self.batch_jid, metadata=self.metadata, ) + self.event.io_loop.call_later( self.opts["timeout"], self.find_job, set(next_batch) ) except Exception as ex: + log.error("Error in scheduling next batch: %s", ex) self.active = self.active.difference(next_batch) else: self.end_batch() self.scheduled = False + yield diff --git a/salt/client/__init__.py b/salt/client/__init__.py index 1e9f11df4c..cc8fd4048d 100644 --- a/salt/client/__init__.py +++ b/salt/client/__init__.py @@ -1776,6 +1776,7 @@ class LocalClient: "key": self.key, "tgt_type": tgt_type, "ret": ret, + "timeout": timeout, "jid": jid, } diff --git a/salt/master.py b/salt/master.py index b9bc1a7a67..7a99af357a 100644 --- a/salt/master.py +++ b/salt/master.py @@ -2232,8 +2232,6 @@ class ClearFuncs(TransportMethods): def publish_batch(self, clear_load, minions, missing): batch_load = {} batch_load.update(clear_load) - import salt.cli.batch_async - batch = salt.cli.batch_async.BatchAsync( self.local.opts, functools.partial(self._prep_jid, clear_load, {}), -- 2.29.2
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor