File add-parallel-support-for-orchestrations.patch of Package salt.10899 in Project SUSE:SLE-12-SP3:GA
From cd89919706697e0b4bf0648443a2ec1edf2a52a0 Mon Sep 17 00:00:00 2001
From: Matt Phillips <mphillips81@bloomberg.net>
Date: Tue, 13 Feb 2018 19:46:23 -0500
Subject: [PATCH] add parallel support for orchestrations

Originally the parallel global state requisite did not work correctly
when invoked under an orch - this fixes that, as well as running any
other saltmod state (function, runner, wheel).

* join() parallel process instead of a recursive sleep

It's not clear to me why the recursive calls were chosen originally.
This should address https://github.com/saltstack/salt/issues/43668

* revisit previous join() behavior in check_requisites

Rather than join()'ing on each running process, we instead use
check_running to assert completion of our processes. This should
provide more correct timing results, as measuring the duration of a
longer-running join()'d process could trample a shorter parallel
process that just happened to be checked after it instead of before it.

* record start/stop duration for parallel processes separately

Previously, durations only recorded the time taken to spawn the
multiprocessing proc, not the actual time of completion, which is
completely wrong. This should capture the full duration correctly now.
We unfortunately have to duplicate the start & complete times instead
of using the passed-in start_time attr, as that value is only a time
(not a date), so it is impossible to re-calculate the full duration
from it alone (i.e., what happens if start_time is 23:59 with a
roll-over to the next day?).

This fixes #44828

* cherry-pick cdata KeyError prevention from #39832

@ninja- noticed there was already some useful code in
_call_parallel_target to mitigate KeyErrors for a potentially empty
cdata, but it wasn't being executed because the invoking method made
the same mistake before calling it. This moves that code up to
eliminate the potential stacktrace, and should close #39832.

* add integration test to runners/test_state to exercise parallel

This should hopefully exercise a few facets of parallel that were
previously not covered in the code base.

* removing prereq from test orch

It seems to be encountering preexisting failures unrelated to my
changes.

* fix parallel mode py3 compatibility

The parallel: True codepath had incompatibilities that were uncovered
by the added tests. Additionally, salt.serializers.msgpack is now used
to avoid other py2/py3 back-compat issues.
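[Editor's note] The roll-over argument above is the crux of the duration fix: a time-of-day value alone cannot anchor a subtraction across midnight. Below is a minimal sketch of the computation the patch adopts, using only the standard library; the helper name duration_ms is hypothetical, as the patch inlines this arithmetic in _call_parallel_target.

    import datetime

    def duration_ms(utc_start, utc_finish):
        # duration in milliseconds.microseconds, as the patch computes it
        # (note: delta.seconds ignores delta.days, which is fine for state
        # runs shorter than a day)
        delta = utc_finish - utc_start
        return (delta.seconds * 1000000 + delta.microseconds) / 1000.0

    utc_start = datetime.datetime.utcnow()   # full datetime: 23:59 -> 00:01 still subtracts cleanly
    # ... the parallel state body would run here ...
    utc_finish = datetime.datetime.utcnow()
    print(duration_ms(utc_start, utc_finish))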
---
 salt/modules/saltutil.py                |  3 +
 salt/runner.py                          |  2 +-
 salt/runners/state.py                   |  1 +
 salt/state.py                           | 72 ++++++++++++---------
 salt/utils/files.py                     |  6 +-
 tests/integration/runners/test_state.py | 86 +++++++++++++++++++++++++
 6 files changed, 136 insertions(+), 34 deletions(-)

diff --git a/salt/modules/saltutil.py b/salt/modules/saltutil.py
index b951983f5c..f5717db443 100644
--- a/salt/modules/saltutil.py
+++ b/salt/modules/saltutil.py
@@ -1506,6 +1506,9 @@ def runner(name, arg=None, kwarg=None, full_return=False, saltenv='base', jid=No
     if 'saltenv' in aspec.args:
         kwarg['saltenv'] = saltenv
 
+    if name in ['state.orchestrate', 'state.orch', 'state.sls']:
+        kwarg['orchestration_jid'] = jid
+
     if jid:
         salt.utils.event.fire_args(
             __opts__,
diff --git a/salt/runner.py b/salt/runner.py
index fea1031abb..ec389a45b0 100644
--- a/salt/runner.py
+++ b/salt/runner.py
@@ -232,7 +232,7 @@ class Runner(RunnerClient):
         else:
             user = salt.utils.user.get_specific_user()
 
-        if low['fun'] in ('state.orchestrate', 'state.orch'):
+        if low['fun'] in ['state.orchestrate', 'state.orch', 'state.sls']:
             low['kwarg']['orchestration_jid'] = async_pub['jid']
 
         # Run the runner!
diff --git a/salt/runners/state.py b/salt/runners/state.py
index a0e65a49df..993d00055a 100644
--- a/salt/runners/state.py
+++ b/salt/runners/state.py
@@ -103,6 +103,7 @@ def orchestrate(mods,
             saltenv=saltenv,
             pillarenv=pillarenv,
             pillar_enc=pillar_enc,
+            __pub_jid=orchestration_jid,
             orchestration_jid=orchestration_jid)
     ret = {'data': {minion.opts['id']: running}, 'outputter': 'highstate'}
     res = __utils__['state.check_result'](ret['data'])
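[Editor's note] These first three hunks all serve one purpose: threading the orchestration jid from the runner entry points into the kwargs of the state call, so parallel states can be correlated with the orch that spawned them. A rough sketch of the shared pattern, assuming a simplified low chunk; ORCH_FUNS and inject_orchestration_jid are hypothetical names, the real checks live in Runner.run and saltutil.runner.

    # list membership mirrors the patch; 'state.sls' is the newly covered runner
    ORCH_FUNS = ['state.orchestrate', 'state.orch', 'state.sls']

    def inject_orchestration_jid(low, jid):
        # low is the runner's low chunk, e.g. {'fun': 'state.sls', 'kwarg': {}}
        if low['fun'] in ORCH_FUNS:
            low.setdefault('kwarg', {})['orchestration_jid'] = jid
        return low

    print(inject_orchestration_jid({'fun': 'state.sls', 'kwarg': {}}, '20180213194623123456'))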
diff --git a/salt/state.py b/salt/state.py
index 4a59b77305..2746622ab2 100644
--- a/salt/state.py
+++ b/salt/state.py
@@ -43,6 +43,7 @@ import salt.utils.platform
 import salt.utils.process
 import salt.utils.url
 import salt.syspaths as syspaths
+from salt.serializers.msgpack import serialize as msgpack_serialize, deserialize as msgpack_deserialize
 from salt.template import compile_template, compile_template_str
 from salt.exceptions import (
     SaltRenderError,
@@ -54,11 +55,11 @@ from salt.utils.locales import sdecode
 import salt.utils.yamlloader as yamlloader
 
 # Import third party libs
+from msgpack import UnpackValueError
 # pylint: disable=import-error,no-name-in-module,redefined-builtin
 from salt.ext import six
 from salt.ext.six.moves import map, range, reload_module
 # pylint: enable=import-error,no-name-in-module,redefined-builtin
-import msgpack
 
 log = logging.getLogger(__name__)
 
@@ -1695,27 +1696,20 @@ class State(object):
             errors.extend(req_in_errors)
         return req_in_high, errors
 
-    def _call_parallel_target(self, cdata, low):
+    def _call_parallel_target(self, name, cdata, low):
         '''
         The target function to call that will create the parallel thread/process
         '''
+        # we need to re-record start/end duration here because it is impossible to
+        # correctly calculate further down the chain
+        utc_start_time = datetime.datetime.utcnow()
+
         tag = _gen_tag(low)
         try:
             ret = self.states[cdata['full']](*cdata['args'], **cdata['kwargs'])
         except Exception:
             trb = traceback.format_exc()
-            # There are a number of possibilities to not have the cdata
-            # populated with what we might have expected, so just be smart
-            # enough to not raise another KeyError as the name is easily
-            # guessable and fallback in all cases to present the real
-            # exception to the user
-            if len(cdata['args']) > 0:
-                name = cdata['args'][0]
-            elif 'name' in cdata['kwargs']:
-                name = cdata['kwargs']['name']
-            else:
-                name = low.get('name', low.get('__id__'))
             ret = {
                 'result': False,
                 'name': name,
@@ -1723,6 +1717,13 @@ class State(object):
                 'comment': 'An exception occurred in this state: {0}'.format(
                     trb)
             }
+
+        utc_finish_time = datetime.datetime.utcnow()
+        delta = (utc_finish_time - utc_start_time)
+        # duration in milliseconds.microseconds
+        duration = (delta.seconds * 1000000 + delta.microseconds) / 1000.0
+        ret['duration'] = duration
+
         troot = os.path.join(self.opts['cachedir'], self.jid)
         tfile = os.path.join(troot, _clean_tag(tag))
         if not os.path.isdir(troot):
@@ -1733,17 +1734,26 @@ class State(object):
             # and the attempt, we are safe to pass
             pass
         with salt.utils.files.fopen(tfile, 'wb+') as fp_:
-            fp_.write(msgpack.dumps(ret))
+            fp_.write(msgpack_serialize(ret))
 
     def call_parallel(self, cdata, low):
         '''
         Call the state defined in the given cdata in parallel
         '''
+        # There are a number of possibilities to not have the cdata
+        # populated with what we might have expected, so just be smart
+        # enough to not raise another KeyError as the name is easily
+        # guessable and fallback in all cases to present the real
+        # exception to the user
+        name = (cdata.get('args') or [None])[0] or cdata['kwargs'].get('name')
+        if not name:
+            name = low.get('name', low.get('__id__'))
+
         proc = salt.utils.process.MultiprocessingProcess(
             target=self._call_parallel_target,
-            args=(cdata, low))
+            args=(name, cdata, low))
         proc.start()
-        ret = {'name': cdata['args'][0],
+        ret = {'name': name,
               'result': None,
               'changes': {},
               'comment': 'Started in a seperate process',
@@ -1892,12 +1902,10 @@ class State(object):
             # enough to not raise another KeyError as the name is easily
             # guessable and fallback in all cases to present the real
             # exception to the user
-            if len(cdata['args']) > 0:
-                name = cdata['args'][0]
-            elif 'name' in cdata['kwargs']:
-                name = cdata['kwargs']['name']
-            else:
+            name = (cdata.get('args') or [None])[0] or cdata['kwargs'].get('name')
+            if not name:
                 name = low.get('name', low.get('__id__'))
+
             ret = {
                 'result': False,
                 'name': name,
@@ -1938,7 +1946,7 @@ class State(object):
         ret['start_time'] = local_start_time.time().isoformat()
         delta = (utc_finish_time - utc_start_time)
         # duration in milliseconds.microseconds
-        duration = (delta.seconds * 1000000 + delta.microseconds)/1000.0
+        duration = (delta.seconds * 1000000 + delta.microseconds) / 1000.0
         ret['duration'] = duration
         ret['__id__'] = low['__id__']
         log.info(
@@ -2106,7 +2114,7 @@ class State(object):
         while True:
             if self.reconcile_procs(running):
                 break
-            time.sleep(0.01)
+            time.sleep(0.0001)
         ret = dict(list(disabled.items()) + list(running.items()))
         return ret
 
@@ -2138,8 +2146,8 @@ class State(object):
                 tries = 0
                 with salt.utils.files.fopen(pause_path, 'rb') as fp_:
                     try:
-                        pdat = msgpack.loads(fp_.read())
-                    except msgpack.UnpackValueError:
+                        pdat = msgpack_deserialize(fp_.read())
+                    except UnpackValueError:
                         # Reading race condition
                         if tries > 10:
                             # Break out if there are a ton of read errors
@@ -2185,7 +2193,7 @@ class State(object):
                            'changes': {}}
                 try:
                     with salt.utils.files.fopen(ret_cache, 'rb') as fp_:
-                        ret = msgpack.loads(fp_.read())
+                        ret = msgpack_deserialize(fp_.read())
                 except (OSError, IOError):
                     ret = {'result': False,
                            'comment': 'Parallel cache failure',
@@ -2298,16 +2306,18 @@ class State(object):
             run_dict = self.pre
         else:
             run_dict = running
+
+        while True:
+            if self.reconcile_procs(run_dict):
+                break
+            time.sleep(0.0001)
+
         for chunk in chunks:
             tag = _gen_tag(chunk)
             if tag not in run_dict:
                 req_stats.add('unmet')
                 continue
-            if run_dict[tag].get('proc'):
-                # Run in parallel, first wait for a touch and then recheck
-                time.sleep(0.01)
-                return self.check_requisite(low, running, chunks, pre)
-            if r_state.startswith('onfail'):
+            if r_state == 'onfail':
                 if run_dict[tag]['result'] is True:
                     req_stats.add('onfail')  # At least one state is OK
                     continue
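[Editor's note] The name-recovery expression that now appears in both call_parallel and State.call collapses the old if/elif/else chain into a single fallback chain. A standalone sketch assuming the same cdata/low shapes; state_name_from is a hypothetical helper, the patch inlines the expression.

    def state_name_from(cdata, low):
        # try the first positional arg, then the 'name' kwarg, then fall
        # back to the low data's name/__id__ - never raise a KeyError while
        # we are busy formatting an error for the user
        name = (cdata.get('args') or [None])[0] or cdata['kwargs'].get('name')
        if not name:
            name = low.get('name', low.get('__id__'))
        return name

    print(state_name_from({'args': [], 'kwargs': {}}, {'__id__': 'sleep 1'}))  # -> sleep 1
    print(state_name_from({'args': ['foo'], 'kwargs': {}}, {}))                # -> foo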
diff --git a/salt/utils/files.py b/salt/utils/files.py
index 3ca73b9db8..d22bce9e19 100644
--- a/salt/utils/files.py
+++ b/salt/utils/files.py
@@ -16,7 +16,6 @@ import stat
 import subprocess
 import tempfile
 import time
-import urllib
 
 # Import Salt libs
 import salt.utils.path
@@ -36,6 +35,9 @@ except ImportError:
     # fcntl is not available on windows
     HAS_FCNTL = False
 
+from salt.ext.six.moves.urllib.parse import quote  # pylint: disable=no-name-in-module
+
+
 log = logging.getLogger(__name__)
 
 LOCAL_PROTOS = ('', 'file')
@@ -563,7 +565,7 @@ def safe_filename_leaf(file_basename):
     :codeauthor: Damon Atkins <https://github.com/damon-atkins>
     '''
     def _replace(re_obj):
-        return urllib.quote(re_obj.group(0), safe='')
+        return quote(re_obj.group(0), safe=u'')
     if not isinstance(file_basename, six.text_type):
         # the following string is not prefixed with u
         return re.sub('[\\\\:/*?"<>|]',
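[Editor's note] The files.py hunks are a pure py2/py3 portability fix: urllib.quote moved to urllib.parse.quote in Python 3, and the patch resolves this through six.moves. A self-contained sketch of the same escaping pattern, assuming a plain try/except import instead of salt.ext.six; the sample string is hypothetical.

    import re

    try:
        from urllib.parse import quote   # py3
    except ImportError:
        from urllib import quote         # py2

    def _replace(re_obj):
        # percent-encode every reserved filename character; nothing is safe
        return quote(re_obj.group(0), safe=u'')

    # backslash, colon, slash, etc. become %5C, %3A, %2F ...
    print(re.sub(u'[\\\\:/*?"<>|]', _replace, u'a:b/c'))  # -> a%3Ab%2Fc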
diff --git a/tests/integration/runners/test_state.py b/tests/integration/runners/test_state.py
index dac7b3f033..312fcffc01 100644
--- a/tests/integration/runners/test_state.py
+++ b/tests/integration/runners/test_state.py
@@ -6,10 +6,12 @@ Tests for the state runner
 # Import Python Libs
 from __future__ import absolute_import, print_function, unicode_literals
 import errno
+import logging
 import os
 import shutil
 import signal
 import tempfile
+import time
 import textwrap
 import threading
 from salt.ext.six.moves import queue
@@ -31,6 +33,8 @@ import salt.utils.yaml
 # Import 3rd-party libs
 from salt.ext import six
 
+log = logging.getLogger(__name__)
+
 
 class StateRunnerTest(ShellCase):
     '''
@@ -351,3 +355,85 @@ class OrchEventTest(ShellCase):
         finally:
             del listener
             signal.alarm(0)
+
+    def test_parallel_orchestrations(self):
+        '''
+        Test to confirm that the parallel state requisite works in orch.
+        We do this by running 10 test.sleep's of 10 seconds, and ensure it only takes roughly 10s
+        '''
+        self.write_conf({
+            'fileserver_backend': ['roots'],
+            'file_roots': {
+                'base': [self.base_env],
+            },
+        })
+
+        orch_sls = os.path.join(self.base_env, 'test_par_orch.sls')
+
+        with salt.utils.fopen(orch_sls, 'w') as fp_:
+            fp_.write(textwrap.dedent('''
+                {% for count in range(1, 20) %}
+
+                sleep {{ count }}:
+                  module.run:
+                    - name: test.sleep
+                    - length: 10
+                    - parallel: True
+
+                {% endfor %}
+
+                sleep 21:
+                  module.run:
+                    - name: test.sleep
+                    - length: 10
+                    - parallel: True
+                    - require:
+                      - module: sleep 1
+            '''))
+
+        orch_sls = os.path.join(self.base_env, 'test_par_orch.sls')
+
+        listener = salt.utils.event.get_event(
+            'master',
+            sock_dir=self.master_opts['sock_dir'],
+            transport=self.master_opts['transport'],
+            opts=self.master_opts)
+
+        start_time = time.time()
+        jid = self.run_run_plus(
+            'state.orchestrate',
+            'test_par_orch',
+            __reload_config=True).get('jid')
+
+        if jid is None:
+            raise Exception('jid missing from run_run_plus output')
+
+        signal.signal(signal.SIGALRM, self.alarm_handler)
+        signal.alarm(self.timeout)
+        received = False
+        try:
+            while True:
+                event = listener.get_event(full=True)
+                if event is None:
+                    continue
+
+                # if we receive the ret for this job before self.timeout (60),
+                # the test is implicitly successful; if it were happening in
+                # serial it would take at least 110 seconds.
+                if event['tag'] == 'salt/run/{0}/ret'.format(jid):
+                    received = True
+                    # Don't wrap this in a try/except. We want to know if the
+                    # data structure is different from what we expect!
+                    ret = event['data']['return']['data']['master']
+                    for state in ret:
+                        data = ret[state]
+                        # we expect each duration to be greater than 10s
+                        self.assertTrue(data['duration'] > 10000)
+                    break
+
+            # also confirm that the total runtime is roughly 30s (10s left as buffer)
+            self.assertTrue((time.time() - start_time) < 40)
+        finally:
+            self.assertTrue(received)
+            del listener
+            signal.alarm(0)
-- 
2.20.1
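[Editor's note] For reference, a minimal orchestration exercising the new behavior could look like the SLS below, modeled on the one the integration test writes; the file path /srv/salt/orch/parallel_sleep.sls is hypothetical. With parallel: True each test.sleep runs in its own process, so the run should take roughly one 10-second interval rather than the sum of all of them.

    # /srv/salt/orch/parallel_sleep.sls (hypothetical path)
    {% for count in range(1, 4) %}
    sleep {{ count }}:
      module.run:
        - name: test.sleep
        - length: 10
        - parallel: True
    {% endfor %}

Kicked off with `salt-run state.orchestrate orch.parallel_sleep`, mirroring the run_run_plus('state.orchestrate', ...) call in the test above.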