Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
home:dirkmueller:acdc:as_python3_module
salt.16132
batch-async-catch-exceptions-and-safety-unregis...
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File batch-async-catch-exceptions-and-safety-unregister-a.patch of Package salt.16132
From c5edf396ffd66b6ac1479aa01367aae3eff7683d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Su=C3=A1rez=20Hern=C3=A1ndez?= <psuarezhernandez@suse.com> Date: Fri, 28 Feb 2020 15:11:53 +0000 Subject: [PATCH] Batch Async: Catch exceptions and safety unregister and close instances --- salt/cli/batch_async.py | 156 +++++++++++++++++++++++----------------- 1 file changed, 89 insertions(+), 67 deletions(-) diff --git a/salt/cli/batch_async.py b/salt/cli/batch_async.py index da069b64bd..b8f272ed67 100644 --- a/salt/cli/batch_async.py +++ b/salt/cli/batch_async.py @@ -13,7 +13,6 @@ import salt.client # pylint: enable=import-error,no-name-in-module,redefined-builtin import logging -import fnmatch log = logging.getLogger(__name__) @@ -104,22 +103,25 @@ class BatchAsync(object): def __event_handler(self, raw): if not self.event: return - mtag, data = self.event.unpack(raw, self.event.serial) - for (pattern, op) in self.patterns: - if mtag.startswith(pattern[:-1]): - minion = data['id'] - if op == 'ping_return': - self.minions.add(minion) - if self.targeted_minions == self.minions: - self.event.io_loop.spawn_callback(self.start_batch) - elif op == 'find_job_return': - if data.get("return", None): - self.find_job_returned.add(minion) - elif op == 'batch_run': - if minion in self.active: - self.active.remove(minion) - self.done_minions.add(minion) - self.event.io_loop.spawn_callback(self.schedule_next) + try: + mtag, data = self.event.unpack(raw, self.event.serial) + for (pattern, op) in self.patterns: + if mtag.startswith(pattern[:-1]): + minion = data['id'] + if op == 'ping_return': + self.minions.add(minion) + if self.targeted_minions == self.minions: + self.event.io_loop.spawn_callback(self.start_batch) + elif op == 'find_job_return': + if data.get("return", None): + self.find_job_returned.add(minion) + elif op == 'batch_run': + if minion in self.active: + self.active.remove(minion) + self.done_minions.add(minion) + self.event.io_loop.spawn_callback(self.schedule_next) + except Exception as ex: + log.error("Exception occured while processing event: {}".format(ex)) def _get_next(self): to_run = self.minions.difference( @@ -146,54 +148,59 @@ class BatchAsync(object): if timedout_minions: self.schedule_next() - if running: + if self.event and running: self.find_job_returned = self.find_job_returned.difference(running) self.event.io_loop.spawn_callback(self.find_job, running) @tornado.gen.coroutine def find_job(self, minions): - not_done = minions.difference(self.done_minions).difference(self.timedout_minions) - - if not_done: - jid = self.jid_gen() - find_job_return_pattern = 'salt/job/{0}/ret/*'.format(jid) - self.patterns.add((find_job_return_pattern, "find_job_return")) - self.event.subscribe(find_job_return_pattern, match_type='glob') - - ret = yield self.local.run_job_async( - not_done, - 'saltutil.find_job', - [self.batch_jid], - 'list', - gather_job_timeout=self.opts['gather_job_timeout'], - jid=jid, - **self.eauth) - yield tornado.gen.sleep(self.opts['gather_job_timeout']) - self.event.io_loop.spawn_callback( - self.check_find_job, - not_done, - jid) + if self.event: + not_done = minions.difference(self.done_minions).difference(self.timedout_minions) + try: + if not_done: + jid = self.jid_gen() + find_job_return_pattern = 'salt/job/{0}/ret/*'.format(jid) + self.patterns.add((find_job_return_pattern, "find_job_return")) + self.event.subscribe(find_job_return_pattern, match_type='glob') + ret = yield self.local.run_job_async( + not_done, + 'saltutil.find_job', + [self.batch_jid], + 'list', + gather_job_timeout=self.opts['gather_job_timeout'], + jid=jid, + **self.eauth) + yield tornado.gen.sleep(self.opts['gather_job_timeout']) + if self.event: + self.event.io_loop.spawn_callback( + self.check_find_job, + not_done, + jid) + except Exception as ex: + log.error("Exception occured handling batch async: {}. Aborting execution.".format(ex)) + self.close_safe() @tornado.gen.coroutine def start(self): - self.__set_event_handler() - ping_return = yield self.local.run_job_async( - self.opts['tgt'], - 'test.ping', - [], - self.opts.get( - 'selected_target_option', - self.opts.get('tgt_type', 'glob') - ), - gather_job_timeout=self.opts['gather_job_timeout'], - jid=self.ping_jid, - metadata=self.metadata, - **self.eauth) - self.targeted_minions = set(ping_return['minions']) - #start batching even if not all minions respond to ping - yield tornado.gen.sleep(self.batch_presence_ping_timeout or self.opts['gather_job_timeout']) - self.event.io_loop.spawn_callback(self.start_batch) - + if self.event: + self.__set_event_handler() + ping_return = yield self.local.run_job_async( + self.opts['tgt'], + 'test.ping', + [], + self.opts.get( + 'selected_target_option', + self.opts.get('tgt_type', 'glob') + ), + gather_job_timeout=self.opts['gather_job_timeout'], + jid=self.ping_jid, + metadata=self.metadata, + **self.eauth) + self.targeted_minions = set(ping_return['minions']) + #start batching even if not all minions respond to ping + yield tornado.gen.sleep(self.batch_presence_ping_timeout or self.opts['gather_job_timeout']) + if self.event: + self.event.io_loop.spawn_callback(self.start_batch) @tornado.gen.coroutine def start_batch(self): @@ -206,7 +213,8 @@ class BatchAsync(object): "metadata": self.metadata } ret = self.event.fire_event(data, "salt/batch/{0}/start".format(self.batch_jid)) - self.event.io_loop.spawn_callback(self.run_next) + if self.event: + self.event.io_loop.spawn_callback(self.run_next) @tornado.gen.coroutine def end_batch(self): @@ -221,11 +229,21 @@ class BatchAsync(object): "metadata": self.metadata } self.event.fire_event(data, "salt/batch/{0}/done".format(self.batch_jid)) - for (pattern, label) in self.patterns: - if label in ["ping_return", "batch_run"]: - self.event.unsubscribe(pattern, match_type='glob') - del self - gc.collect() + + # release to the IOLoop to allow the event to be published + # before closing batch async execution + yield tornado.gen.sleep(1) + self.close_safe() + + def close_safe(self): + for (pattern, label) in self.patterns: + self.event.unsubscribe(pattern, match_type='glob') + self.event.remove_event_handler(self.__event_handler) + self.event = None + self.local = None + self.ioloop = None + del self + gc.collect() @tornado.gen.coroutine def schedule_next(self): @@ -233,7 +251,8 @@ class BatchAsync(object): self.scheduled = True # call later so that we maybe gather more returns yield tornado.gen.sleep(self.batch_delay) - self.event.io_loop.spawn_callback(self.run_next) + if self.event: + self.event.io_loop.spawn_callback(self.run_next) @tornado.gen.coroutine def run_next(self): @@ -254,17 +273,20 @@ class BatchAsync(object): metadata=self.metadata) yield tornado.gen.sleep(self.opts['timeout']) - self.event.io_loop.spawn_callback(self.find_job, set(next_batch)) + + # The batch can be done already at this point, which means no self.event + if self.event: + self.event.io_loop.spawn_callback(self.find_job, set(next_batch)) except Exception as ex: - log.error("Error in scheduling next batch: %s", ex) + log.error("Error in scheduling next batch: %s. Aborting execution", ex) self.active = self.active.difference(next_batch) + self.close_safe() else: yield self.end_batch() gc.collect() def __del__(self): self.local = None - self.event.remove_event_handler(self.__event_handler) self.event = None self.ioloop = None gc.collect() -- 2.23.0
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor