Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
SUSE:SLE-15-SP1:GA
salt.10902
python-3.7-support.patch
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File python-3.7-support.patch of Package salt.10902
From 1c18b65e6cee9f84843f99f9f30b6d088f687930 Mon Sep 17 00:00:00 2001 From: Dmitry Kuzmenko <dmitry.kuzmenko@dsr-corporation.com> Date: Mon, 16 Apr 2018 22:41:44 +0300 Subject: [PATCH] Python 3.7 support Tornado 5.0 compatibility fixes Allow running pytest>=3.5.0 Run off of a temporary config use salt.utils.zeromq add io_loop handling to runtests engine fix pylint fix _create_stream and tornado 5.0 This should be the last fix for tornado 5.0 allow using tornado 5.0 Fix last test for tornado Thank you so much @bdarnell for helping me get this last fix. With this, the test suite passes with tornado 5. fix pylint Rename module to full wording Fix imports Fix docstring typo Fix CLI config Fix comments Fix docstrings Rename async function to asynchronous Change internal function signatures to avoid reserved word Remove internal variables/properties with the reserved words Fix local opts from CLI Fix log error/info/warning and exception messages Cleanup docstrings at module level Fix function signatures in Cassandra module Lintfix: PEP8 requires two empty lines Deprecate 'async' parameter in Mandrill API Revert api call: it is about "functionname_async" suffix. Add 'async' backward compatibility Update docstring Use kwargs instead of directly named parameters Support original API Fix nag-message Keep runner API unchanged fix unicode literals Remove async keyword, moving it into the kwargs. Fix configuration setting --- requirements/base.txt | 2 +- requirements/dev_python27.txt | 4 +- requirements/dev_python34.txt | 4 +- requirements/pytest.txt | 2 +- salt/client/api.py | 6 +- salt/client/mixins.py | 4 +- salt/cloud/clouds/msazure.py | 2 +- salt/cloud/clouds/profitbricks.py | 2 +- salt/cloud/clouds/xen.py | 2 +- salt/daemons/masterapi.py | 6 +- salt/engines/ircbot.py | 5 +- salt/engines/slack.py | 4 +- salt/engines/webhook.py | 3 +- salt/master.py | 6 +- salt/minion.py | 19 +-- salt/modules/cassandra_cql.py | 22 +-- salt/modules/mandrill.py | 21 ++- salt/modules/saltutil.py | 6 +- salt/netapi/rest_cherrypy/app.py | 4 +- salt/netapi/rest_cherrypy/event_processor.py | 2 +- salt/netapi/rest_tornado/__init__.py | 2 +- salt/netapi/rest_tornado/event_processor.py | 2 +- salt/netapi/rest_tornado/saltnado.py | 16 +-- .../rest_tornado/saltnado_websockets.py | 2 +- salt/returners/cassandra_cql_return.py | 8 +- salt/runner.py | 10 +- salt/thorium/runner.py | 6 +- salt/thorium/wheel.py | 4 +- salt/transport/client.py | 2 +- salt/transport/ipc.py | 47 +++---- salt/transport/server.py | 2 +- salt/transport/tcp.py | 127 +++++++++--------- salt/utils/{async.py => asynchronous.py} | 22 +-- salt/utils/event.py | 18 +-- salt/utils/process.py | 4 +- salt/utils/thin.py | 2 +- salt/wheel/__init__.py | 4 +- tests/conftest.py | 25 +--- .../files/engines/runtests_engine.py | 11 +- .../netapi/rest_tornado/test_app.py | 2 +- tests/support/case.py | 27 +++- tests/unit/test_minion.py | 7 +- tests/unit/utils/test_async.py | 20 +-- 43 files changed, 254 insertions(+), 242 deletions(-) rename salt/utils/{async.py => asynchronous.py} (81%) diff --git a/requirements/base.txt b/requirements/base.txt index de490ed07f..08d836574f 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -3,6 +3,6 @@ msgpack-python>0.3,!=0.5.5 PyYAML MarkupSafe requests>=1.0.0 -tornado>=4.2.1,<5.0 +tornado>=4.2.1,<6.0 # Required by Tornado to handle threads stuff. futures>=2.0 diff --git a/requirements/dev_python27.txt b/requirements/dev_python27.txt index 8834842ecd..42ba18893a 100644 --- a/requirements/dev_python27.txt +++ b/requirements/dev_python27.txt @@ -1,4 +1,4 @@ --r base.txt +-r base-py2.txt mock apache-libcloud>=0.14.0 @@ -6,7 +6,7 @@ boto>=2.32.1 boto3>=1.2.1 moto>=0.3.6 SaltPyLint>=v2017.3.6 -pytest +pytest>=3.5.0 git+https://github.com/eisensheng/pytest-catchlog.git@develop#egg=Pytest-catchlog git+https://github.com/saltstack/pytest-salt.git@master#egg=pytest-salt testinfra>=1.7.0 diff --git a/requirements/dev_python34.txt b/requirements/dev_python34.txt index 19a2574d62..25b9cf45cf 100644 --- a/requirements/dev_python34.txt +++ b/requirements/dev_python34.txt @@ -1,4 +1,4 @@ --r base.txt +-r base-py3.txt mock apache-libcloud>=0.14.0 @@ -11,7 +11,7 @@ moto>=0.3.6 # prevent it from being successfully installed (at least on Python 3.4). httpretty SaltPyLint>=v2017.2.29 -pytest +pytest>=3.5.0 git+https://github.com/saltstack/pytest-salt.git@master#egg=pytest-salt git+https://github.com/eisensheng/pytest-catchlog.git@develop#egg=Pytest-catchlog testinfra>=1.7.0 diff --git a/requirements/pytest.txt b/requirements/pytest.txt index 60e4e44e0e..e5ec2acff3 100644 --- a/requirements/pytest.txt +++ b/requirements/pytest.txt @@ -1,3 +1,3 @@ -pytest +pytest>=3.5.0 pytest-helpers-namespace pytest-tempdir diff --git a/salt/client/api.py b/salt/client/api.py index ac6f6de24a..b2aab460fa 100644 --- a/salt/client/api.py +++ b/salt/client/api.py @@ -93,7 +93,7 @@ class APIClient(object): The cmd dict items are as follows: - mode: either 'sync' or 'async'. Defaults to 'async' if missing + mode: either 'sync' or 'asynchronous'. Defaults to 'asynchronous' if missing fun: required. If the function is to be run on the master using either a wheel or runner client then the fun: includes either 'wheel.' or 'runner.' as a prefix and has three parts separated by '.'. @@ -120,7 +120,7 @@ class APIClient(object): ''' cmd = dict(cmd) # make copy client = 'minion' # default to local minion client - mode = cmd.get('mode', 'async') # default to 'async' + mode = cmd.get('mode', 'async') # check for wheel or runner prefix to fun name to use wheel or runner client funparts = cmd.get('fun', '').split('.') @@ -162,7 +162,7 @@ class APIClient(object): ''' return self.runnerClient.master_call(**kwargs) - runner_sync = runner_async # always runner async, so works in either mode + runner_sync = runner_async # always runner asynchronous, so works in either mode def wheel_sync(self, **kwargs): ''' diff --git a/salt/client/mixins.py b/salt/client/mixins.py index 29b6077661..4182fa5b81 100644 --- a/salt/client/mixins.py +++ b/salt/client/mixins.py @@ -458,7 +458,7 @@ class SyncClientMixin(object): class AsyncClientMixin(object): ''' - A mixin for *Client interfaces to enable easy async function execution + A mixin for *Client interfaces to enable easy asynchronous function execution ''' client = None tag_prefix = None @@ -510,7 +510,7 @@ class AsyncClientMixin(object): tag = salt.utils.event.tagify(jid, prefix=self.tag_prefix) return {'tag': tag, 'jid': jid} - def async(self, fun, low, user='UNKNOWN', pub=None): + def asynchronous(self, fun, low, user='UNKNOWN', pub=None): ''' Execute the function in a multiprocess and return the event tag to use to watch for the return diff --git a/salt/cloud/clouds/msazure.py b/salt/cloud/clouds/msazure.py index 9d48616db6..0994b0c3a4 100644 --- a/salt/cloud/clouds/msazure.py +++ b/salt/cloud/clouds/msazure.py @@ -885,7 +885,7 @@ def _wait_for_async(conn, request_id): while result.status == 'InProgress': count = count + 1 if count > 120: - raise ValueError('Timed out waiting for async operation to complete.') + raise ValueError('Timed out waiting for asynchronous operation to complete.') time.sleep(5) result = conn.get_operation_status(request_id) diff --git a/salt/cloud/clouds/profitbricks.py b/salt/cloud/clouds/profitbricks.py index 1ce0a162f0..8d13bf7b70 100644 --- a/salt/cloud/clouds/profitbricks.py +++ b/salt/cloud/clouds/profitbricks.py @@ -1098,7 +1098,7 @@ def _wait_for_completion(conn, promise, wait_timeout, msg): ) raise Exception( - 'Timed out waiting for async operation {0} "{1}" to complete.'.format( + 'Timed out waiting for asynchronous operation {0} "{1}" to complete.'.format( msg, six.text_type(promise['requestId']) ) ) diff --git a/salt/cloud/clouds/xen.py b/salt/cloud/clouds/xen.py index 5823fd7cc9..e29b6a98f1 100644 --- a/salt/cloud/clouds/xen.py +++ b/salt/cloud/clouds/xen.py @@ -724,7 +724,7 @@ def _wait_for_ip(name, session): def _run_async_task(task=None, session=None): ''' - Run XenAPI task in async mode to prevent timeouts + Run XenAPI task in asynchronous mode to prevent timeouts ''' if task is None or session is None: return None diff --git a/salt/daemons/masterapi.py b/salt/daemons/masterapi.py index 4fe00934cd..e23b370b63 100644 --- a/salt/daemons/masterapi.py +++ b/salt/daemons/masterapi.py @@ -1068,9 +1068,9 @@ class LocalFuncs(object): try: fun = load.pop('fun') runner_client = salt.runner.RunnerClient(self.opts) - return runner_client.async(fun, - load.get('kwarg', {}), - username) + return runner_client.asynchronous(fun, + load.get('kwarg', {}), + username) except Exception as exc: log.exception('Exception occurred while introspecting %s') return {'error': {'name': exc.__class__.__name__, diff --git a/salt/engines/ircbot.py b/salt/engines/ircbot.py index e3b8778cd2..c93eb5fb95 100644 --- a/salt/engines/ircbot.py +++ b/salt/engines/ircbot.py @@ -97,14 +97,15 @@ class IRCClient(object): self.allow_nicks = allow_nicks self.disable_query = disable_query self.io_loop = tornado.ioloop.IOLoop(make_current=False) + self.io_loop.make_current() self._connect() def _connect(self): _sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) if self.ssl is True: - self._stream = tornado.iostream.SSLIOStream(_sock, ssl_options={'cert_reqs': ssl.CERT_NONE}, io_loop=self.io_loop) + self._stream = tornado.iostream.SSLIOStream(_sock, ssl_options={'cert_reqs': ssl.CERT_NONE}) else: - self._stream = tornado.iostream.IOStream(_sock, io_loop=self.io_loop) + self._stream = tornado.iostream.IOStream(_sock) self._stream.set_close_callback(self.on_closed) self._stream.connect((self.host, self.port), self.on_connect) diff --git a/salt/engines/slack.py b/salt/engines/slack.py index 6537c29412..b761d388db 100644 --- a/salt/engines/slack.py +++ b/salt/engines/slack.py @@ -717,7 +717,7 @@ class SlackClient(object): :param interval: time to wait between ending a loop and beginning the next ''' - log.debug('Going to run a command async') + log.debug('Going to run a command asynchronous') runner_functions = sorted(salt.runner.Runner(__opts__).functions) # Parse args and kwargs cmd = msg['cmdline'][0] @@ -739,7 +739,7 @@ class SlackClient(object): log.debug('Command %s will run via runner_functions', cmd) # pylint is tripping # pylint: disable=missing-whitespace-after-comma - job_id_dict = runner.async(cmd, {'args': args, 'kwargs': kwargs}) + job_id_dict = runner.asynchronous(cmd, {'args': args, 'kwargs': kwargs}) job_id = job_id_dict['jid'] # Default to trying to run as a client module. diff --git a/salt/engines/webhook.py b/salt/engines/webhook.py index a00eb813a1..6c92113fb7 100644 --- a/salt/engines/webhook.py +++ b/salt/engines/webhook.py @@ -81,6 +81,7 @@ def start(address=None, port=5000, ssl_crt=None, ssl_key=None): if all([ssl_crt, ssl_key]): ssl_options = {"certfile": ssl_crt, "keyfile": ssl_key} io_loop = tornado.ioloop.IOLoop(make_current=False) - http_server = tornado.httpserver.HTTPServer(application, ssl_options=ssl_options, io_loop=io_loop) + io_loop.make_current() + http_server = tornado.httpserver.HTTPServer(application, ssl_options=ssl_options) http_server.listen(port, address=address) io_loop.start() diff --git a/salt/master.py b/salt/master.py index 6fb37ece1a..6d34d13de4 100644 --- a/salt/master.py +++ b/salt/master.py @@ -1878,9 +1878,9 @@ class ClearFuncs(object): try: fun = clear_load.pop('fun') runner_client = salt.runner.RunnerClient(self.opts) - return runner_client.async(fun, - clear_load.get('kwarg', {}), - username) + return runner_client.asynchronous(fun, + clear_load.get('kwarg', {}), + username) except Exception as exc: log.error('Exception occurred while introspecting %s: %s', fun, exc) return {'error': {'name': exc.__class__.__name__, diff --git a/salt/minion.py b/salt/minion.py index 4a30e70be5..173a43d06a 100644 --- a/salt/minion.py +++ b/salt/minion.py @@ -920,7 +920,7 @@ class MinionManager(MinionBase): install_zmq() self.io_loop = ZMQDefaultLoop.current() self.process_manager = ProcessManager(name='MultiMinionProcessManager') - self.io_loop.spawn_callback(self.process_manager.run, async=True) + self.io_loop.spawn_callback(self.process_manager.run, **{'async': True}) # Tornado backward compat def __del__(self): self.destroy() @@ -1117,7 +1117,7 @@ class Minion(MinionBase): time.sleep(sleep_time) self.process_manager = ProcessManager(name='MinionProcessManager') - self.io_loop.spawn_callback(self.process_manager.run, async=True) + self.io_loop.spawn_callback(self.process_manager.run, **{'async': True}) # We don't have the proxy setup yet, so we can't start engines # Engines need to be able to access __proxy__ if not salt.utils.platform.is_proxy(): @@ -2487,13 +2487,15 @@ class Minion(MinionBase): if beacons and self.connected: self._fire_master(events=beacons) - new_periodic_callbacks['beacons'] = tornado.ioloop.PeriodicCallback(handle_beacons, loop_interval * 1000, io_loop=self.io_loop) + new_periodic_callbacks['beacons'] = tornado.ioloop.PeriodicCallback( + handle_beacons, loop_interval * 1000) if before_connect: # Make sure there is a chance for one iteration to occur before connect handle_beacons() if 'cleanup' not in self.periodic_callbacks: - new_periodic_callbacks['cleanup'] = tornado.ioloop.PeriodicCallback(self._fallback_cleanups, loop_interval * 1000, io_loop=self.io_loop) + new_periodic_callbacks['cleanup'] = tornado.ioloop.PeriodicCallback( + self._fallback_cleanups, loop_interval * 1000) # start all the other callbacks for periodic_cb in six.itervalues(new_periodic_callbacks): @@ -2545,14 +2547,15 @@ class Minion(MinionBase): # TODO: actually listen to the return and change period def handle_schedule(): self.process_schedule(self, loop_interval) - new_periodic_callbacks['schedule'] = tornado.ioloop.PeriodicCallback(handle_schedule, 1000, io_loop=self.io_loop) + new_periodic_callbacks['schedule'] = tornado.ioloop.PeriodicCallback(handle_schedule, 1000) if before_connect: # Make sure there is a chance for one iteration to occur before connect handle_schedule() if 'cleanup' not in self.periodic_callbacks: - new_periodic_callbacks['cleanup'] = tornado.ioloop.PeriodicCallback(self._fallback_cleanups, loop_interval * 1000, io_loop=self.io_loop) + new_periodic_callbacks['cleanup'] = tornado.ioloop.PeriodicCallback( + self._fallback_cleanups, loop_interval * 1000) # start all the other callbacks for periodic_cb in six.itervalues(new_periodic_callbacks): @@ -2609,7 +2612,7 @@ class Minion(MinionBase): self._fire_master('ping', 'minion_ping', sync=False, timeout_handler=ping_timeout_handler) except Exception: log.warning('Attempt to ping master failed.', exc_on_loglevel=logging.DEBUG) - self.periodic_callbacks['ping'] = tornado.ioloop.PeriodicCallback(ping_master, ping_interval * 1000, io_loop=self.io_loop) + self.periodic_callbacks['ping'] = tornado.ioloop.PeriodicCallback(ping_master, ping_interval * 1000) self.periodic_callbacks['ping'].start() # add handler to subscriber @@ -3078,7 +3081,7 @@ class SyndicManager(MinionBase): # forward events every syndic_event_forward_timeout self.forward_events = tornado.ioloop.PeriodicCallback(self._forward_events, self.opts['syndic_event_forward_timeout'] * 1000, - io_loop=self.io_loop) + ) self.forward_events.start() # Make sure to gracefully handle SIGUSR1 diff --git a/salt/modules/cassandra_cql.py b/salt/modules/cassandra_cql.py index 82b211bddf..30db93dccc 100644 --- a/salt/modules/cassandra_cql.py +++ b/salt/modules/cassandra_cql.py @@ -93,6 +93,7 @@ from salt.exceptions import CommandExecutionError # Import 3rd-party libs from salt.ext import six from salt.ext.six.moves import range +import salt.utils.versions SSL_VERSION = 'ssl_version' @@ -128,7 +129,7 @@ def __virtual__(): def _async_log_errors(errors): - log.error('Cassandra_cql async call returned: %s', errors) + log.error('Cassandra_cql asynchronous call returned: %s', errors) def _load_properties(property_name, config_option, set_default=False, default=None): @@ -361,9 +362,8 @@ def cql_query(query, contact_points=None, port=None, cql_user=None, cql_pass=Non return ret -def cql_query_with_prepare(query, statement_name, statement_arguments, async=False, - callback_errors=None, - contact_points=None, port=None, cql_user=None, cql_pass=None): +def cql_query_with_prepare(query, statement_name, statement_arguments, callback_errors=None, contact_points=None, + port=None, cql_user=None, cql_pass=None, **kwargs): ''' Run a query on a Cassandra cluster and return a dictionary. @@ -377,8 +377,8 @@ def cql_query_with_prepare(query, statement_name, statement_arguments, async=Fal :type statement_name: str :param statement_arguments: Bind parameters for the SQL statement :type statement_arguments: list[str] - :param async: Run this query in asynchronous mode - :type async: bool + :param async: Run this query in asynchronous mode + :type async: bool :param callback_errors: Function to call after query runs if there is an error :type callback_errors: Function callable :param contact_points: The Cassandra cluster addresses, can either be a string or a list of IPs. @@ -401,12 +401,14 @@ def cql_query_with_prepare(query, statement_name, statement_arguments, async=Fal # Insert data asynchronously salt this-node cassandra_cql.cql_query_with_prepare "name_insert" "INSERT INTO USERS (first_name, last_name) VALUES (?, ?)" \ - statement_arguments=['John','Doe'], async=True + statement_arguments=['John','Doe'], asynchronous=True # Select data, should not be asynchronous because there is not currently a facility to return data from a future salt this-node cassandra_cql.cql_query_with_prepare "name_select" "SELECT * FROM USERS WHERE first_name=?" \ statement_arguments=['John'] ''' + # Backward-compatibility with Python 3.7: "async" is a reserved word + asynchronous = kwargs.get('async', False) try: cluster, session = _connect(contact_points=contact_points, port=port, cql_user=cql_user, cql_pass=cql_pass) @@ -431,7 +433,7 @@ def cql_query_with_prepare(query, statement_name, statement_arguments, async=Fal ret = [] try: - if async: + if asynchronous: future_results = session.execute_async(bound_statement.bind(statement_arguments)) # future_results.add_callbacks(_async_log_errors) else: @@ -441,7 +443,7 @@ def cql_query_with_prepare(query, statement_name, statement_arguments, async=Fal msg = "ERROR: Cassandra query failed: {0} reason: {1}".format(query, e) raise CommandExecutionError(msg) - if not async and results: + if not asynchronous and results: for result in results: values = {} for key, value in six.iteritems(result): @@ -456,7 +458,7 @@ def cql_query_with_prepare(query, statement_name, statement_arguments, async=Fal # If this was a synchronous call, then we either have an empty list # because there was no return, or we have a return - # If this was an async call we only return the empty list + # If this was an asynchronous call we only return the empty list return ret diff --git a/salt/modules/mandrill.py b/salt/modules/mandrill.py index 248939d09c..7044060154 100644 --- a/salt/modules/mandrill.py +++ b/salt/modules/mandrill.py @@ -24,6 +24,7 @@ import logging # Import Salt libs import salt.utils.json +import salt.utils.versions # import third party try: @@ -137,12 +138,13 @@ def _http_request(url, def send(message, - async=False, + asynchronous=False, ip_pool=None, send_at=None, api_url=None, api_version=None, - api_key=None): + api_key=None, + **kwargs): ''' Send out the email using the details from the ``message`` argument. @@ -151,14 +153,14 @@ def send(message, sent as dictionary with at fields as specified in the Mandrill API documentation. - async: ``False`` + asynchronous: ``False`` Enable a background sending mode that is optimized for bulk sending. - In async mode, messages/send will immediately return a status of - "queued" for every recipient. To handle rejections when sending in async + In asynchronous mode, messages/send will immediately return a status of + "queued" for every recipient. To handle rejections when sending in asynchronous mode, set up a webhook for the 'reject' event. Defaults to false for messages with no more than 10 recipients; messages with more than 10 recipients are always sent asynchronously, regardless of the value of - async. + asynchronous. ip_pool The name of the dedicated ip pool that should be used to send the @@ -229,6 +231,11 @@ def send(message, result: True ''' + if 'async' in kwargs: # Remove this in Sodium + salt.utils.versions.warn_until('Sodium', 'Parameter "async" is renamed to "asynchronous" ' + 'and will be removed in version {version}.') + asynchronous = bool(kwargs['async']) + params = _get_api_params(api_url=api_url, api_version=api_version, api_key=api_key) @@ -238,7 +245,7 @@ def send(message, data = { 'key': params['api_key'], 'message': message, - 'async': async, + 'async': asynchronous, 'ip_pool': ip_pool, 'send_at': send_at } diff --git a/salt/modules/saltutil.py b/salt/modules/saltutil.py index 50d1f14ed9..b951983f5c 100644 --- a/salt/modules/saltutil.py +++ b/salt/modules/saltutil.py @@ -947,10 +947,11 @@ def refresh_pillar(): ret = False # Effectively a no-op, since we can't really return without an event system return ret + pillar_refresh = salt.utils.functools.alias_function(refresh_pillar, 'pillar_refresh') -def refresh_modules(async=True): +def refresh_modules(**kwargs): ''' Signal the minion to refresh the module and grain data @@ -964,8 +965,9 @@ def refresh_modules(async=True): salt '*' saltutil.refresh_modules ''' + asynchronous = bool(kwargs.get('async', True)) try: - if async: + if asynchronous: # If we're going to block, first setup a listener ret = __salt__['event.fire']({}, 'module_refresh') else: diff --git a/salt/netapi/rest_cherrypy/app.py b/salt/netapi/rest_cherrypy/app.py index d4b6c93f28..c1d333adcd 100644 --- a/salt/netapi/rest_cherrypy/app.py +++ b/salt/netapi/rest_cherrypy/app.py @@ -529,7 +529,7 @@ described above, the most effective and most scalable way to use both Salt and salt-api is to run commands asynchronously using the ``local_async``, ``runner_async``, and ``wheel_async`` clients. -Running async jobs results in being able to process 3x more commands per second +Running asynchronous jobs results in being able to process 3x more commands per second for ``LocalClient`` and 17x more commands per second for ``RunnerClient``, in addition to much less network traffic and memory requirements. Job returns can be fetched from Salt's job cache via the ``/jobs/<jid>`` endpoint, or they can @@ -2534,7 +2534,7 @@ class WebsocketEndpoint(object): parent_pipe, child_pipe = Pipe() handler.pipe = parent_pipe handler.opts = self.opts - # Process to handle async push to a client. + # Process to handle asynchronous push to a client. # Each GET request causes a process to be kicked off. proc = Process(target=event_stream, args=(handler, child_pipe)) proc.start() diff --git a/salt/netapi/rest_cherrypy/event_processor.py b/salt/netapi/rest_cherrypy/event_processor.py index e409a00180..f0cf6d361a 100644 --- a/salt/netapi/rest_cherrypy/event_processor.py +++ b/salt/netapi/rest_cherrypy/event_processor.py @@ -180,7 +180,7 @@ class SaltInfo(object): 'expr_type': 'list', 'mode': 'client', 'client': 'local', - 'async': 'local_async', + 'asynchronous': 'local_async', 'token': token, }) diff --git a/salt/netapi/rest_tornado/__init__.py b/salt/netapi/rest_tornado/__init__.py index 5ed2769068..5b4dc03bdb 100644 --- a/salt/netapi/rest_tornado/__init__.py +++ b/salt/netapi/rest_tornado/__init__.py @@ -128,6 +128,6 @@ def start(): raise SystemExit(1) try: - tornado.ioloop.IOLoop.instance().start() + tornado.ioloop.IOLoop.current().start() except KeyboardInterrupt: raise SystemExit(0) diff --git a/salt/netapi/rest_tornado/event_processor.py b/salt/netapi/rest_tornado/event_processor.py index d8c338836e..70a379e2c5 100644 --- a/salt/netapi/rest_tornado/event_processor.py +++ b/salt/netapi/rest_tornado/event_processor.py @@ -194,7 +194,7 @@ class SaltInfo(object): 'expr_type': 'list', 'mode': 'client', 'client': 'local', - 'async': 'local_async', + 'asynchronous': 'local_async', 'token': token, }) diff --git a/salt/netapi/rest_tornado/saltnado.py b/salt/netapi/rest_tornado/saltnado.py index 7f927e9473..c69b5e0976 100644 --- a/salt/netapi/rest_tornado/saltnado.py +++ b/salt/netapi/rest_tornado/saltnado.py @@ -202,14 +202,12 @@ import tornado.ioloop import tornado.web import tornado.gen from tornado.concurrent import Future -from zmq.eventloop import ioloop -from salt.ext import six # pylint: enable=import-error - -# instantiate the zmq IOLoop (specialized poller) -ioloop.install() +import salt.utils +salt.utils.zeromq.install_zmq() # salt imports +import salt.ext.six as six import salt.netapi import salt.utils.args import salt.utils.event @@ -245,7 +243,7 @@ def _json_dumps(obj, **kwargs): # # master side # - "runner" (done) -# - "wheel" (need async api...) +# - "wheel" (need asynchronous api...) AUTH_TOKEN_HEADER = 'X-Auth-Token' @@ -274,7 +272,7 @@ class Any(Future): class EventListener(object): ''' Class responsible for listening to the salt master event bus and updating - futures. This is the core of what makes this async, this allows us to do + futures. This is the core of what makes this asynchronous, this allows us to do non-blocking work in the main processes and "wait" for an event to happen ''' @@ -324,7 +322,7 @@ class EventListener(object): timeout=None ): ''' - Get an event (async of course) return a future that will get it later + Get an event (asynchronous of course) return a future that will get it later ''' # if the request finished, no reason to allow event fetching, since we # can't send back to the client @@ -629,7 +627,7 @@ class SaltAuthHandler(BaseSaltAPIHandler): # pylint: disable=W0223 self.write(self.serialize(ret)) - # TODO: make async? Underlying library isn't... and we ARE making disk calls :( + # TODO: make asynchronous? Underlying library isn't... and we ARE making disk calls :( def post(self): ''' :ref:`Authenticate <rest_tornado-auth>` against Salt's eauth system diff --git a/salt/netapi/rest_tornado/saltnado_websockets.py b/salt/netapi/rest_tornado/saltnado_websockets.py index 89cdfd039a..cf6d51852f 100644 --- a/salt/netapi/rest_tornado/saltnado_websockets.py +++ b/salt/netapi/rest_tornado/saltnado_websockets.py @@ -411,7 +411,7 @@ class FormattedEventsHandler(AllEventsHandler): # pylint: disable=W0223,W0232 'tgt': '*', 'token': self.token, 'mode': 'client', - 'async': 'local_async', + 'asynchronous': 'local_async', 'client': 'local' }) while True: diff --git a/salt/returners/cassandra_cql_return.py b/salt/returners/cassandra_cql_return.py index 8e92e32147..0ec8c2db27 100644 --- a/salt/returners/cassandra_cql_return.py +++ b/salt/returners/cassandra_cql_return.py @@ -204,7 +204,7 @@ def returner(ret): __salt__['cassandra_cql.cql_query_with_prepare'](query, 'returner_return', tuple(statement_arguments), - async=True) + asynchronous=True) except CommandExecutionError: log.critical('Could not insert into salt_returns with Cassandra returner.') raise @@ -228,7 +228,7 @@ def returner(ret): __salt__['cassandra_cql.cql_query_with_prepare'](query, 'returner_minion', tuple(statement_arguments), - async=True) + asynchronous=True) except CommandExecutionError: log.critical('Could not store minion ID with Cassandra returner.') raise @@ -270,7 +270,7 @@ def event_return(events): try: __salt__['cassandra_cql.cql_query_with_prepare'](query, 'salt_events', statement_arguments, - async=True) + asynchronous=True) except CommandExecutionError: log.critical('Could not store events with Cassandra returner.') raise @@ -300,7 +300,7 @@ def save_load(jid, load, minions=None): try: __salt__['cassandra_cql.cql_query_with_prepare'](query, 'save_load', statement_arguments, - async=True) + asynchronous=True) except CommandExecutionError: log.critical('Could not save load in jids table.') raise diff --git a/salt/runner.py b/salt/runner.py index d96a8af38c..fea1031abb 100644 --- a/salt/runner.py +++ b/salt/runner.py @@ -240,13 +240,13 @@ class Runner(RunnerClient): if self.opts.get('eauth'): async_pub = self.cmd_async(low) else: - async_pub = self.async(self.opts['fun'], - low, - user=user, - pub=async_pub) + async_pub = self.asynchronous(self.opts['fun'], + low, + user=user, + pub=async_pub) # by default: info will be not enougth to be printed out ! log.warning( - 'Running in async mode. Results of this execution may ' + 'Running in asynchronous mode. Results of this execution may ' 'be collected by attaching to the master event bus or ' 'by examing the master job cache, if configured. ' 'This execution is running under tag %s', async_pub['tag'] diff --git a/salt/thorium/runner.py b/salt/thorium/runner.py index d6235d40e7..9545eac35c 100644 --- a/salt/thorium/runner.py +++ b/salt/thorium/runner.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- ''' -React by calling async runners +React by calling asynchronous runners ''' # Import python libs from __future__ import absolute_import, print_function, unicode_literals @@ -14,7 +14,7 @@ def cmd( arg=(), **kwargs): ''' - Execute a runner async: + Execute a runner asynchronous: USAGE: @@ -42,7 +42,7 @@ def cmd( func = name local_opts = {} local_opts.update(__opts__) - local_opts['async'] = True # ensure this will be run async + local_opts['async'] = True # ensure this will be run asynchronous local_opts.update({ 'fun': func, 'arg': arg, diff --git a/salt/thorium/wheel.py b/salt/thorium/wheel.py index 7c98eff4bd..e3c4bf1701 100644 --- a/salt/thorium/wheel.py +++ b/salt/thorium/wheel.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- ''' -React by calling async runners +React by calling asynchronous runners ''' # Import python libs from __future__ import absolute_import, print_function, unicode_literals @@ -14,7 +14,7 @@ def cmd( arg=(), **kwargs): ''' - Execute a runner async: + Execute a runner asynchronous: USAGE: diff --git a/salt/transport/client.py b/salt/transport/client.py index 86c4962f94..ca83ac9376 100644 --- a/salt/transport/client.py +++ b/salt/transport/client.py @@ -10,7 +10,7 @@ from __future__ import absolute_import, print_function, unicode_literals import logging # Import Salt Libs -from salt.utils.async import SyncWrapper +from salt.utils.asynchronous import SyncWrapper log = logging.getLogger(__name__) diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py index 7ea9bf8c87..84aa781ef0 100644 --- a/salt/transport/ipc.py +++ b/salt/transport/ipc.py @@ -130,11 +130,11 @@ class IPCServer(object): else: self.sock = tornado.netutil.bind_unix_socket(self.socket_path) - tornado.netutil.add_accept_handler( - self.sock, - self.handle_connection, - io_loop=self.io_loop, - ) + with salt.utils.asynchronous.current_ioloop(self.io_loop): + tornado.netutil.add_accept_handler( + self.sock, + self.handle_connection, + ) self._started = True @tornado.gen.coroutine @@ -196,10 +196,10 @@ class IPCServer(object): log.trace('IPCServer: Handling connection ' 'to address: %s', address) try: - stream = IOStream( - connection, - io_loop=self.io_loop, - ) + with salt.utils.asynchronous.current_ioloop(self.io_loop): + stream = IOStream( + connection, + ) self.io_loop.spawn_callback(self.handle_stream, stream) except Exception as exc: log.error('IPC streaming error: %s', exc) @@ -329,10 +329,10 @@ class IPCClient(object): break if self.stream is None: - self.stream = IOStream( - socket.socket(sock_type, socket.SOCK_STREAM), - io_loop=self.io_loop, - ) + with salt.utils.asynchronous.current_ioloop(self.io_loop): + self.stream = IOStream( + socket.socket(sock_type, socket.SOCK_STREAM), + ) try: log.trace('IPCClient: Connecting to socket: %s', self.socket_path) @@ -510,11 +510,11 @@ class IPCMessagePublisher(object): else: self.sock = tornado.netutil.bind_unix_socket(self.socket_path) - tornado.netutil.add_accept_handler( - self.sock, - self.handle_connection, - io_loop=self.io_loop, - ) + with salt.utils.asynchronous.current_ioloop(self.io_loop): + tornado.netutil.add_accept_handler( + self.sock, + self.handle_connection, + ) self._started = True @tornado.gen.coroutine @@ -545,17 +545,14 @@ class IPCMessagePublisher(object): def handle_connection(self, connection, address): log.trace('IPCServer: Handling connection to address: %s', address) try: + kwargs = {} if self.opts['ipc_write_buffer'] > 0: + kwargs['max_write_buffer_size'] = self.opts['ipc_write_buffer'] log.trace('Setting IPC connection write buffer: %s', (self.opts['ipc_write_buffer'])) + with salt.utils.asynchronous.current_ioloop(self.io_loop): stream = IOStream( connection, - io_loop=self.io_loop, - max_write_buffer_size=self.opts['ipc_write_buffer'] - ) - else: - stream = IOStream( - connection, - io_loop=self.io_loop + **kwargs ) self.streams.add(stream) diff --git a/salt/transport/server.py b/salt/transport/server.py index 46c14bdb39..1d67dc98af 100644 --- a/salt/transport/server.py +++ b/salt/transport/server.py @@ -55,7 +55,7 @@ class ReqServerChannel(object): ''' Do anything you need post-fork. This should handle all incoming payloads and call payload_handler. You will also be passed io_loop, for all of your - async needs + asynchronous needs ''' pass diff --git a/salt/transport/tcp.py b/salt/transport/tcp.py index 334ed0a3ad..6d39ed877b 100644 --- a/salt/transport/tcp.py +++ b/salt/transport/tcp.py @@ -19,7 +19,7 @@ import errno # Import Salt Libs import salt.crypt -import salt.utils.async +import salt.utils.asynchronous import salt.utils.event import salt.utils.platform import salt.utils.process @@ -475,7 +475,7 @@ class AsyncTCPPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.tran 'tok': self.tok, 'data': data, 'tag': tag} - req_channel = salt.utils.async.SyncWrapper( + req_channel = salt.utils.asynchronous.SyncWrapper( AsyncTCPReqChannel, (self.opts,) ) try: @@ -597,23 +597,22 @@ class TCPReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.tra self.payload_handler = payload_handler self.io_loop = io_loop self.serial = salt.payload.Serial(self.opts) - if USE_LOAD_BALANCER: - self.req_server = LoadBalancerWorker(self.socket_queue, - self.handle_message, - io_loop=self.io_loop, - ssl_options=self.opts.get('ssl')) - else: - if salt.utils.platform.is_windows(): - self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - _set_tcp_keepalive(self._socket, self.opts) - self._socket.setblocking(0) - self._socket.bind((self.opts['interface'], int(self.opts['ret_port']))) - self.req_server = SaltMessageServer(self.handle_message, - io_loop=self.io_loop, - ssl_options=self.opts.get('ssl')) - self.req_server.add_socket(self._socket) - self._socket.listen(self.backlog) + with salt.utils.asynchronous.current_ioloop(self.io_loop): + if USE_LOAD_BALANCER: + self.req_server = LoadBalancerWorker(self.socket_queue, + self.handle_message, + ssl_options=self.opts.get('ssl')) + else: + if salt.utils.platform.is_windows(): + self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + _set_tcp_keepalive(self._socket, self.opts) + self._socket.setblocking(0) + self._socket.bind((self.opts['interface'], int(self.opts['ret_port']))) + self.req_server = SaltMessageServer(self.handle_message, + ssl_options=self.opts.get('ssl')) + self.req_server.add_socket(self._socket) + self._socket.listen(self.backlog) salt.transport.mixins.auth.AESReqServerMixin.post_fork(self, payload_handler, io_loop) @tornado.gen.coroutine @@ -698,6 +697,7 @@ class SaltMessageServer(tornado.tcpserver.TCPServer, object): ''' def __init__(self, message_handler, *args, **kwargs): super(SaltMessageServer, self).__init__(*args, **kwargs) + self.io_loop = tornado.ioloop.IOLoop.current() self.clients = [] self.message_handler = message_handler @@ -774,10 +774,9 @@ class TCPClientKeepAlive(tornado.tcpclient.TCPClient): ''' Override _create_stream() in TCPClient to enable keep alive support. ''' - def __init__(self, opts, resolver=None, io_loop=None): + def __init__(self, opts, resolver=None): self.opts = opts - super(TCPClientKeepAlive, self).__init__( - resolver=resolver, io_loop=io_loop) + super(TCPClientKeepAlive, self).__init__(resolver=resolver) def _create_stream(self, max_buffer_size, af, addr, **kwargs): # pylint: disable=unused-argument ''' @@ -793,9 +792,10 @@ class TCPClientKeepAlive(tornado.tcpclient.TCPClient): _set_tcp_keepalive(sock, self.opts) stream = tornado.iostream.IOStream( sock, - io_loop=self.io_loop, max_buffer_size=max_buffer_size) - return stream.connect(addr) + if tornado.version_info < (5,): + return stream.connect(addr) + return stream, stream.connect(addr) class SaltMessageClientPool(salt.transport.MessageClientPool): @@ -855,8 +855,8 @@ class SaltMessageClient(object): self.io_loop = io_loop or tornado.ioloop.IOLoop.current() - self._tcp_client = TCPClientKeepAlive( - opts, io_loop=self.io_loop, resolver=resolver) + with salt.utils.asynchronous.current_ioloop(self.io_loop): + self._tcp_client = TCPClientKeepAlive(opts, resolver=resolver) self._mid = 1 self._max_messages = int((1 << 31) - 2) # number of IDs before we wrap @@ -879,33 +879,33 @@ class SaltMessageClient(object): return self._closing = True if hasattr(self, '_stream') and not self._stream.closed(): - self._stream.close() - if self._read_until_future is not None: - # This will prevent this message from showing up: - # '[ERROR ] Future exception was never retrieved: - # StreamClosedError' - # This happens because the logic is always waiting to read - # the next message and the associated read future is marked - # 'StreamClosedError' when the stream is closed. - self._read_until_future.exc_info() - if (not self._stream_return_future.done() and - self.io_loop != tornado.ioloop.IOLoop.current( - instance=False)): - # If _stream_return() hasn't completed, it means the IO - # Loop is stopped (such as when using - # 'salt.utils.async.SyncWrapper'). Ensure that - # _stream_return() completes by restarting the IO Loop. - # This will prevent potential errors on shutdown. - orig_loop = tornado.ioloop.IOLoop.current() - self.io_loop.make_current() - try: + # If _stream_return() hasn't completed, it means the IO + # Loop is stopped (such as when using + # 'salt.utils.asynchronous.SyncWrapper'). Ensure that + # _stream_return() completes by restarting the IO Loop. + # This will prevent potential errors on shutdown. + try: + orig_loop = tornado.ioloop.IOLoop.current() + self.io_loop.make_current() + self._stream.close() + if self._read_until_future is not None: + # This will prevent this message from showing up: + # '[ERROR ] Future exception was never retrieved: + # StreamClosedError' + # This happens because the logic is always waiting to read + # the next message and the associated read future is marked + # 'StreamClosedError' when the stream is closed. + self._read_until_future.exception() + if (not self._stream_return_future.done() and + self.io_loop != tornado.ioloop.IOLoop.current( + instance=False)): self.io_loop.add_future( self._stream_return_future, lambda future: self.io_loop.stop() ) self.io_loop.start() - finally: - orig_loop.make_current() + finally: + orig_loop.make_current() self._tcp_client.close() # Clear callback references to allow the object that they belong to # to be deleted. @@ -945,21 +945,21 @@ class SaltMessageClient(object): if self._closing: break try: - if (self.source_ip or self.source_port) and tornado.version_info >= (4, 5): - ### source_ip and source_port are supported only in Tornado >= 4.5 - # See http://www.tornadoweb.org/en/stable/releases/v4.5.0.html - # Otherwise will just ignore these args - self._stream = yield self._tcp_client.connect(self.host, - self.port, - ssl_options=self.opts.get('ssl'), - source_ip=self.source_ip, - source_port=self.source_port) - else: - if self.source_ip or self.source_port: + kwargs = {} + if self.source_ip or self.source_port: + if tornado.version_info >= (4, 5): + ### source_ip and source_port are supported only in Tornado >= 4.5 + # See http://www.tornadoweb.org/en/stable/releases/v4.5.0.html + # Otherwise will just ignore these args + kwargs = {'source_ip': self.source_ip, + 'source_port': self.source_port} + else: log.warning('If you need a certain source IP/port, consider upgrading Tornado >= 4.5') + with salt.utils.asynchronous.current_ioloop(self.io_loop): self._stream = yield self._tcp_client.connect(self.host, self.port, - ssl_options=self.opts.get('ssl')) + ssl_options=self.opts.get('ssl'), + **kwargs) self._connecting_future.set_result(True) break except Exception as e: @@ -1162,7 +1162,8 @@ class PubServer(tornado.tcpserver.TCPServer, object): TCP publisher ''' def __init__(self, opts, io_loop=None): - super(PubServer, self).__init__(io_loop=io_loop, ssl_options=opts.get('ssl')) + super(PubServer, self).__init__(ssl_options=opts.get('ssl')) + self.io_loop = io_loop self.opts = opts self._closing = False self.clients = set() @@ -1429,9 +1430,9 @@ class TCPPubServerChannel(salt.transport.server.PubServerChannel): pull_uri = int(self.opts.get('tcp_master_publish_pull', 4514)) else: pull_uri = os.path.join(self.opts['sock_dir'], 'publish_pull.ipc') - # TODO: switch to the actual async interface + # TODO: switch to the actual asynchronous interface #pub_sock = salt.transport.ipc.IPCMessageClient(self.opts, io_loop=self.io_loop) - pub_sock = salt.utils.async.SyncWrapper( + pub_sock = salt.utils.asynchronous.SyncWrapper( salt.transport.ipc.IPCMessageClient, (pull_uri,) ) diff --git a/salt/utils/async.py b/salt/utils/asynchronous.py similarity index 81% rename from salt/utils/async.py rename to salt/utils/asynchronous.py index 55d21d0ccc..16a7088360 100644 --- a/salt/utils/async.py +++ b/salt/utils/asynchronous.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- ''' -Helpers/utils for working with tornado async stuff +Helpers/utils for working with tornado asynchronous stuff ''' from __future__ import absolute_import, print_function, unicode_literals @@ -30,9 +30,9 @@ class SyncWrapper(object): This is uses as a simple wrapper, for example: - async = AsyncClass() + asynchronous = AsyncClass() # this method would reguarly return a future - future = async.async_method() + future = asynchronous.async_method() sync = SyncWrapper(async_factory_method, (arg1, arg2), {'kwarg1': 'val'}) # the sync wrapper will automatically wait on the future @@ -46,15 +46,15 @@ class SyncWrapper(object): kwargs['io_loop'] = self.io_loop with current_ioloop(self.io_loop): - self.async = method(*args, **kwargs) + self.asynchronous = method(*args, **kwargs) def __getattribute__(self, key): try: return object.__getattribute__(self, key) except AttributeError as ex: - if key == 'async': + if key == 'asynchronous': raise ex - attr = getattr(self.async, key) + attr = getattr(self.asynchronous, key) if hasattr(attr, '__call__'): def wrap(*args, **kwargs): # Overload the ioloop for the func call-- since it might call .current() @@ -75,15 +75,15 @@ class SyncWrapper(object): def __del__(self): ''' - On deletion of the async wrapper, make sure to clean up the async stuff + On deletion of the asynchronous wrapper, make sure to clean up the asynchronous stuff ''' - if hasattr(self, 'async'): - if hasattr(self.async, 'close'): + if hasattr(self, 'asynchronous'): + if hasattr(self.asynchronous, 'close'): # Certain things such as streams should be closed before # their associated io_loop is closed to allow for proper # cleanup. - self.async.close() - del self.async + self.asynchronous.close() + del self.asynchronous self.io_loop.close() del self.io_loop elif hasattr(self, 'io_loop'): diff --git a/salt/utils/event.py b/salt/utils/event.py index cb9f5018e3..19fbfc7a78 100644 --- a/salt/utils/event.py +++ b/salt/utils/event.py @@ -72,7 +72,7 @@ import tornado.iostream # Import salt libs import salt.config import salt.payload -import salt.utils.async +import salt.utils.asynchronous import salt.utils.cache import salt.utils.dicttrim import salt.utils.platform @@ -221,7 +221,7 @@ class SaltEvent(object): :param Bool keep_loop: Pass a boolean to determine if we want to keep the io loop or destroy it when the event handle is destroyed. This is useful when using event - loops from within third party async code + loops from within third party asynchronous code ''' self.serial = salt.payload.Serial({'serial': 'msgpack'}) self.keep_loop = keep_loop @@ -357,7 +357,7 @@ class SaltEvent(object): return True if self._run_io_loop_sync: - with salt.utils.async.current_ioloop(self.io_loop): + with salt.utils.asynchronous.current_ioloop(self.io_loop): if self.subscriber is None: self.subscriber = salt.transport.ipc.IPCMessageSubscriber( self.puburi, @@ -376,7 +376,7 @@ class SaltEvent(object): io_loop=self.io_loop ) - # For the async case, the connect will be defered to when + # For the asynchronous case, the connect will be defered to when # set_event_handler() is invoked. self.cpub = True return self.cpub @@ -402,7 +402,7 @@ class SaltEvent(object): return True if self._run_io_loop_sync: - with salt.utils.async.current_ioloop(self.io_loop): + with salt.utils.asynchronous.current_ioloop(self.io_loop): if self.pusher is None: self.pusher = salt.transport.ipc.IPCMessageClient( self.pulluri, @@ -420,7 +420,7 @@ class SaltEvent(object): self.pulluri, io_loop=self.io_loop ) - # For the async case, the connect will be deferred to when + # For the asynchronous case, the connect will be deferred to when # fire_event() is invoked. self.cpush = True return self.cpush @@ -625,7 +625,7 @@ class SaltEvent(object): ret = self._check_pending(tag, match_func) if ret is None: - with salt.utils.async.current_ioloop(self.io_loop): + with salt.utils.asynchronous.current_ioloop(self.io_loop): if auto_reconnect: raise_errors = self.raise_errors self.raise_errors = True @@ -736,7 +736,7 @@ class SaltEvent(object): serialized_data]) msg = salt.utils.stringutils.to_bytes(event, 'utf-8') if self._run_io_loop_sync: - with salt.utils.async.current_ioloop(self.io_loop): + with salt.utils.asynchronous.current_ioloop(self.io_loop): try: self.io_loop.run_sync(lambda: self.pusher.send(msg)) except Exception as ex: @@ -1079,7 +1079,7 @@ class EventPublisher(salt.utils.process.SignalHandlingMultiprocessingProcess): ''' salt.utils.process.appendproctitle(self.__class__.__name__) self.io_loop = tornado.ioloop.IOLoop() - with salt.utils.async.current_ioloop(self.io_loop): + with salt.utils.asynchronous.current_ioloop(self.io_loop): if self.opts['ipc_mode'] == 'tcp': epub_uri = int(self.opts['tcp_master_pub_port']) epull_uri = int(self.opts['tcp_master_pull_port']) diff --git a/salt/utils/process.py b/salt/utils/process.py index 6dafaf0441..82719b9034 100644 --- a/salt/utils/process.py +++ b/salt/utils/process.py @@ -472,7 +472,7 @@ class ProcessManager(object): del self._process_map[pid] @gen.coroutine - def run(self, async=False): + def run(self, asynchronous=False): ''' Load and start all available api modules ''' @@ -495,7 +495,7 @@ class ProcessManager(object): # The event-based subprocesses management code was removed from here # because os.wait() conflicts with the subprocesses management logic # implemented in `multiprocessing` package. See #35480 for details. - if async: + if asynchronous: yield gen.sleep(10) else: time.sleep(10) diff --git a/salt/utils/thin.py b/salt/utils/thin.py index b99e407583..9a74b8d7d6 100644 --- a/salt/utils/thin.py +++ b/salt/utils/thin.py @@ -701,7 +701,7 @@ def gen_min(cachedir, extra_mods='', overwrite=False, so_mods='', 'salt/utils/openstack', 'salt/utils/openstack/__init__.py', 'salt/utils/openstack/swift.py', - 'salt/utils/async.py', + 'salt/utils/asynchronous.py', 'salt/utils/process.py', 'salt/utils/jinja.py', 'salt/utils/rsax931.py', diff --git a/salt/wheel/__init__.py b/salt/wheel/__init__.py index abfd776342..65092ef974 100644 --- a/salt/wheel/__init__.py +++ b/salt/wheel/__init__.py @@ -57,7 +57,7 @@ class WheelClient(salt.client.mixins.SyncClientMixin, return self.low(fun, kwargs, print_event=kwargs.get('print_event', True), full_return=kwargs.get('full_return', False)) # TODO: Inconsistent with runner client-- the runner client's master_call gives - # an async return, unlike this + # an asynchronous return, unlike this def master_call(self, **kwargs): ''' Execute a wheel function through the master network interface (eauth). @@ -120,7 +120,7 @@ class WheelClient(salt.client.mixins.SyncClientMixin, {'jid': '20131219224744416681', 'tag': 'salt/wheel/20131219224744416681'} ''' fun = low.pop('fun') - return self.async(fun, low) + return self.asynchronous(fun, low) def cmd(self, fun, arg=None, pub_data=None, kwarg=None, print_event=True, full_return=False): ''' diff --git a/tests/conftest.py b/tests/conftest.py index 7591cd46df..d2add41d41 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -55,13 +55,17 @@ import salt.log.setup from salt.utils.odict import OrderedDict # Define the pytest plugins we rely on -pytest_plugins = ['pytest_catchlog', 'tempdir', 'helpers_namespace'] # pylint: disable=invalid-name +pytest_plugins = ['tempdir', 'helpers_namespace'] # pylint: disable=invalid-name # Define where not to collect tests from collect_ignore = ['setup.py'] log = logging.getLogger('salt.testsuite') +# Reset logging root handlers +for handler in logging.root.handlers: + logging.root.removeHandler(handler) + def pytest_tempdir_basename(): ''' @@ -197,25 +201,6 @@ def pytest_configure(config): called after command line options have been parsed and all plugins and initial conftest files been loaded. ''' - # Configure the console logger based on the catch_log settings. - # Most importantly, shutdown Salt's null, store and temporary logging queue handlers - catch_log = config.pluginmanager.getplugin('_catch_log') - cli_logging_handler = catch_log.log_cli_handler - # Add the pytest_catchlog CLI log handler to the logging root - logging.root.addHandler(cli_logging_handler) - cli_level = cli_logging_handler.level - cli_level = config._catchlog_log_cli_level - cli_format = cli_logging_handler.formatter._fmt - cli_date_format = cli_logging_handler.formatter.datefmt - # Setup the console logger which shuts down the null and the temporary queue handlers - salt.log.setup_console_logger( - log_level=salt.log.setup.LOG_VALUES_TO_LEVELS.get(cli_level, 'error'), - log_format=cli_format, - date_format=cli_date_format - ) - # Disable the store logging queue handler - salt.log.setup.setup_extended_logging({'extension_modules': ''}) - config.addinivalue_line('norecursedirs', os.path.join(CODE_DIR, 'templates')) config.addinivalue_line( 'markers', diff --git a/tests/integration/files/engines/runtests_engine.py b/tests/integration/files/engines/runtests_engine.py index 9247a3dfb4..426ab2a5b2 100644 --- a/tests/integration/files/engines/runtests_engine.py +++ b/tests/integration/files/engines/runtests_engine.py @@ -21,6 +21,7 @@ import logging # Import salt libs import salt.utils.event +import salt.utils.asynchronous # Import 3rd-party libs from tornado import gen @@ -69,11 +70,11 @@ class PyTestEngine(object): self.sock.bind(('localhost', port)) # become a server socket self.sock.listen(5) - netutil.add_accept_handler( - self.sock, - self.handle_connection, - io_loop=self.io_loop, - ) + with salt.utils.asynchronous.current_ioloop(self.io_loop): + netutil.add_accept_handler( + self.sock, + self.handle_connection, + ) def handle_connection(self, connection, address): log.warning('Accepted connection from %s. Role: %s', address, self.opts['__role']) diff --git a/tests/integration/netapi/rest_tornado/test_app.py b/tests/integration/netapi/rest_tornado/test_app.py index 96bb42f0f9..b6a572a0b5 100644 --- a/tests/integration/netapi/rest_tornado/test_app.py +++ b/tests/integration/netapi/rest_tornado/test_app.py @@ -372,7 +372,7 @@ class TestMinionSaltAPIHandler(_SaltnadoIntegrationTestCase): def test_post_with_incorrect_client(self): ''' - The /minions endpoint is async only, so if you try something else + The /minions endpoint is asynchronous only, so if you try something else make sure you get an error ''' # get a token for this test diff --git a/tests/support/case.py b/tests/support/case.py index 8dbf733972..bf568a26e3 100644 --- a/tests/support/case.py +++ b/tests/support/case.py @@ -13,7 +13,7 @@ # pylint: disable=repr-flag-used-in-string # Import python libs -from __future__ import absolute_import +from __future__ import absolute_import, unicode_literals import os import re import sys @@ -139,15 +139,28 @@ class ShellTestCase(TestCase, AdaptedConfigurationTestCaseMixin): ) return self.run_script('salt-ssh', arg_str, with_retcode=with_retcode, catch_stderr=catch_stderr, raw=True) - def run_run(self, arg_str, with_retcode=False, catch_stderr=False, async=False, timeout=60, config_dir=None): + def run_run(self, + arg_str, + with_retcode=False, + catch_stderr=False, + asynchronous=False, + timeout=60, + config_dir=None, + **kwargs): ''' Execute salt-run ''' - arg_str = '-c {0}{async_flag} -t {timeout} {1}'.format(config_dir or self.get_config_dir(), - arg_str, - timeout=timeout, - async_flag=' --async' if async else '') - return self.run_script('salt-run', arg_str, with_retcode=with_retcode, catch_stderr=catch_stderr) + asynchronous = kwargs.get('async', asynchronous) + arg_str = '-c {0}{async_flag} -t {timeout} {1}'.format( + config_dir or self.get_config_dir(), + arg_str, + timeout=timeout, + async_flag=' --async' if asynchronous else '') + return self.run_script('salt-run', + arg_str, + with_retcode=with_retcode, + catch_stderr=catch_stderr, + timeout=timeout) def run_run_plus(self, fun, *arg, **kwargs): ''' diff --git a/tests/unit/test_minion.py b/tests/unit/test_minion.py index 8ccd214012..01046d3bc7 100644 --- a/tests/unit/test_minion.py +++ b/tests/unit/test_minion.py @@ -11,6 +11,7 @@ import os # Import Salt Testing libs from tests.support.unit import TestCase, skipIf from tests.support.mock import NO_MOCK, NO_MOCK_REASON, patch, MagicMock +from tests.support.mixins import AdaptedConfigurationTestCaseMixin from tests.support.helpers import skip_if_not_root # Import salt libs import salt.minion @@ -24,7 +25,7 @@ __opts__ = {} @skipIf(NO_MOCK, NO_MOCK_REASON) -class MinionTestCase(TestCase): +class MinionTestCase(TestCase, AdaptedConfigurationTestCaseMixin): def test_invalid_master_address(self): with patch.dict(__opts__, {'ipv6': False, 'master': float('127.0'), 'master_port': '4555', 'retry_dns': False}): self.assertRaises(SaltSystemExit, salt.minion.resolve_dns, __opts__) @@ -187,7 +188,7 @@ class MinionTestCase(TestCase): patch('salt.minion.Minion.sync_connect_master', MagicMock(side_effect=RuntimeError('stop execution'))), \ patch('salt.utils.process.SignalHandlingMultiprocessingProcess.start', MagicMock(return_value=True)), \ patch('salt.utils.process.SignalHandlingMultiprocessingProcess.join', MagicMock(return_value=True)): - mock_opts = copy.copy(salt.config.DEFAULT_MINION_OPTS) + mock_opts = self.get_config('minion', from_scratch=True) mock_opts['beacons_before_connect'] = True minion = salt.minion.Minion(mock_opts, io_loop=tornado.ioloop.IOLoop()) try: @@ -211,7 +212,7 @@ class MinionTestCase(TestCase): patch('salt.minion.Minion.sync_connect_master', MagicMock(side_effect=RuntimeError('stop execution'))), \ patch('salt.utils.process.SignalHandlingMultiprocessingProcess.start', MagicMock(return_value=True)), \ patch('salt.utils.process.SignalHandlingMultiprocessingProcess.join', MagicMock(return_value=True)): - mock_opts = copy.copy(salt.config.DEFAULT_MINION_OPTS) + mock_opts = self.get_config('minion', from_scratch=True) mock_opts['scheduler_before_connect'] = True minion = salt.minion.Minion(mock_opts, io_loop=tornado.ioloop.IOLoop()) try: diff --git a/tests/unit/utils/test_async.py b/tests/unit/utils/test_async.py index c93538f0dd..694a7aebfe 100644 --- a/tests/unit/utils/test_async.py +++ b/tests/unit/utils/test_async.py @@ -8,7 +8,7 @@ import tornado.testing import tornado.gen from tornado.testing import AsyncTestCase -import salt.utils.async as async +import salt.utils.asynchronous as asynchronous class HelperA(object): @@ -24,7 +24,7 @@ class HelperA(object): class HelperB(object): def __init__(self, a=None, io_loop=None): if a is None: - a = async.SyncWrapper(HelperA) + a = asynchronous.SyncWrapper(HelperA) self.a = a @tornado.gen.coroutine @@ -38,7 +38,7 @@ class TestSyncWrapper(AsyncTestCase): @tornado.testing.gen_test def test_helpers(self): ''' - Test that the helper classes do what we expect within a regular async env + Test that the helper classes do what we expect within a regular asynchronous env ''' ha = HelperA() ret = yield ha.sleep() @@ -50,29 +50,29 @@ class TestSyncWrapper(AsyncTestCase): def test_basic_wrap(self): ''' - Test that we can wrap an async caller. + Test that we can wrap an asynchronous caller. ''' - sync = async.SyncWrapper(HelperA) + sync = asynchronous.SyncWrapper(HelperA) ret = sync.sleep() self.assertTrue(ret) def test_double(self): ''' - Test when the async wrapper object itself creates a wrap of another thing + Test when the asynchronous wrapper object itself creates a wrap of another thing This works fine since the second wrap is based on the first's IOLoop so we don't have to worry about complex start/stop mechanics ''' - sync = async.SyncWrapper(HelperB) + sync = asynchronous.SyncWrapper(HelperB) ret = sync.sleep() self.assertFalse(ret) def test_double_sameloop(self): ''' - Test async wrappers initiated from the same IOLoop, to ensure that + Test asynchronous wrappers initiated from the same IOLoop, to ensure that we don't wire up both to the same IOLoop (since it causes MANY problems). ''' - a = async.SyncWrapper(HelperA) - sync = async.SyncWrapper(HelperB, (a,)) + a = asynchronous.SyncWrapper(HelperA) + sync = asynchronous.SyncWrapper(HelperB, (a,)) ret = sync.sleep() self.assertFalse(ret) -- 2.17.1
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor