Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
Project not found: home:rhabacker:branches:KDE:Frameworks
systemsmanagement:Uyuni:Master:Ubuntu2404-Uyuni-Client-Tools
venv-salt-minion
make-salt-master-self-recoverable-on-killing-ev...
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File make-salt-master-self-recoverable-on-killing-eventpu.patch of Package venv-salt-minion
From 794b5d1aa7b8e880e9a21940183d241c6cbde9c9 Mon Sep 17 00:00:00 2001 From: Victor Zhestkov <vzhestkov@suse.com> Date: Wed, 15 May 2024 09:42:23 +0200 Subject: [PATCH] Make salt-master self recoverable on killing EventPublisher * Implement timeout and tries to transport.ipc.IPCClient.send * Make timeout and tries configurable for fire_event * Add test of timeout and tries * Prevent exceptions from tornado Future on closing the IPC connection --- salt/transport/ipc.py | 73 +++++++++++++++++--- salt/utils/event.py | 21 +++++- tests/pytests/unit/utils/event/test_event.py | 43 ++++++++++++ 3 files changed, 125 insertions(+), 12 deletions(-) diff --git a/salt/transport/ipc.py b/salt/transport/ipc.py index cee100b086..6631781c5c 100644 --- a/salt/transport/ipc.py +++ b/salt/transport/ipc.py @@ -2,7 +2,6 @@ IPC transport classes """ - import errno import logging import socket @@ -340,7 +339,8 @@ class IPCClient: try: log.trace("IPCClient: Connecting to socket: %s", self.socket_path) yield self.stream.connect(sock_addr) - self._connecting_future.set_result(True) + if self._connecting_future is not None: + self._connecting_future.set_result(True) break except Exception as e: # pylint: disable=broad-except if self.stream.closed(): @@ -350,7 +350,8 @@ class IPCClient: if self.stream is not None: self.stream.close() self.stream = None - self._connecting_future.set_exception(e) + if self._connecting_future is not None: + self._connecting_future.set_exception(e) break yield salt.ext.tornado.gen.sleep(1) @@ -365,7 +366,13 @@ class IPCClient: return self._closing = True - self._connecting_future = None + if self._connecting_future is not None: + try: + self._connecting_future.set_result(True) + self._connecting_future.exception() # pylint: disable=E0203 + except Exception as e: # pylint: disable=broad-except + log.warning("Unhandled connecting exception: %s", e, exc_info=True) + self._connecting_future = None log.debug("Closing %s instance", self.__class__.__name__) @@ -435,8 +442,6 @@ class IPCMessageClient(IPCClient): "close", ] - # FIXME timeout unimplemented - # FIXME tries unimplemented @salt.ext.tornado.gen.coroutine def send(self, msg, timeout=None, tries=None): """ @@ -445,12 +450,60 @@ class IPCMessageClient(IPCClient): If the socket is not currently connected, a connection will be established. :param dict msg: The message to be sent - :param int timeout: Timeout when sending message (Currently unimplemented) + :param int timeout: Timeout when sending message + :param int tries: Maximum numer of tries to send message """ - if not self.connected(): - yield self.connect() + if tries is None or tries < 1: + tries = 1 + due_time = None + if timeout is not None: + due_time = time.time() + timeout + _try = 1 + exc_count = 0 pack = salt.transport.frame.frame_msg_ipc(msg, raw_body=True) - yield self.stream.write(pack) + while _try <= tries: + if not self.connected(): + self.close() + self.stream = None + self._closing = False + try: + yield self.connect( + timeout=( + None if due_time is None else max(due_time - time.time(), 1) + ) + ) + except StreamClosedError: + log.warning( + "IPCMessageClient: Unable to reconnect IPC stream on sending message with ID: 0x%016x%s", + id(msg), + f", retry {_try} of {tries}" if tries > 1 else "", + ) + exc_count += 1 + if self.connected(): + try: + yield self.stream.write(pack) + return + except StreamClosedError: + if self._closing: + break + log.warning( + "IPCMessageClient: Stream was closed on sending message with ID: 0x%016x", + id(msg), + ) + exc_count += 1 + if exc_count == 1: + # Give one more chance in case if stream was detected as closed + # on the first write attempt + continue + cur_time = time.time() + _try += 1 + if _try > tries or (due_time is not None and cur_time > due_time): + return + yield salt.ext.tornado.gen.sleep( + 1 + if due_time is None + else (due_time - cur_time) / max(tries - _try + 1, 1) + ) class IPCMessageServer(IPCServer): diff --git a/salt/utils/event.py b/salt/utils/event.py index ef048335ae..36b530d1af 100644 --- a/salt/utils/event.py +++ b/salt/utils/event.py @@ -270,6 +270,10 @@ class SaltEvent: # and don't read out events from the buffer on an on-going basis, # the buffer will grow resulting in big memory usage. self.connect_pub() + self.pusher_send_timeout = self.opts.get( + "pusher_send_timeout", self.opts.get("timeout") + ) + self.pusher_send_tries = self.opts.get("pusher_send_tries", 3) @classmethod def __load_cache_regex(cls): @@ -839,10 +843,18 @@ class SaltEvent: ] ) msg = salt.utils.stringutils.to_bytes(event, "utf-8") + if timeout is None: + timeout_s = self.pusher_send_timeout + else: + timeout_s = float(timeout) / 1000 if self._run_io_loop_sync: with salt.utils.asynchronous.current_ioloop(self.io_loop): try: - self.pusher.send(msg) + self.pusher.send( + msg, + timeout=timeout_s, + tries=self.pusher_send_tries, + ) except Exception as exc: # pylint: disable=broad-except log.debug( "Publisher send failed with exception: %s", @@ -851,7 +863,12 @@ class SaltEvent: ) raise else: - self.io_loop.spawn_callback(self.pusher.send, msg) + self.io_loop.spawn_callback( + self.pusher.send, + msg, + timeout=timeout_s, + tries=self.pusher_send_tries, + ) return True def fire_master(self, data, tag, timeout=1000): diff --git a/tests/pytests/unit/utils/event/test_event.py b/tests/pytests/unit/utils/event/test_event.py index 3eadfaf6ba..fa9e420a93 100644 --- a/tests/pytests/unit/utils/event/test_event.py +++ b/tests/pytests/unit/utils/event/test_event.py @@ -447,3 +447,46 @@ def test_event_fire_ret_load(): ) assert mock_log_error.mock_calls[0].args[1] == "minion_id.example.org" assert mock_log_error.mock_calls[0].args[2] == "".join(test_traceback) + + +@pytest.mark.slow_test +def test_event_single_timeout_tries(sock_dir): + """Test an event is sent with timout and tries""" + + write_calls_count = 0 + real_stream_write = None + + @salt.ext.tornado.gen.coroutine + def write_mock(pack): + nonlocal write_calls_count + nonlocal real_stream_write + write_calls_count += 1 + if write_calls_count > 3: + yield real_stream_write(pack) + else: + raise salt.ext.tornado.iostream.StreamClosedError() + + with eventpublisher_process(str(sock_dir)), salt.utils.event.MasterEvent( + str(sock_dir), listen=True + ) as me: + me.fire_event({"data": "foo1"}, "evt1") + evt1 = me.get_event(tag="evt1") + _assert_got_event(evt1, {"data": "foo1"}) + real_stream_write = me.pusher.stream.write + with patch.object( + me.pusher, + "connected", + side_effect=[True, True, False, False, True, True], + ), patch.object( + me.pusher, + "connect", + side_effect=salt.ext.tornado.iostream.StreamClosedError, + ), patch.object( + me.pusher.stream, + "write", + write_mock, + ): + me.fire_event({"data": "bar2"}, "evt2", timeout=5000) + evt2 = me.get_event(tag="evt2") + _assert_got_event(evt2, {"data": "bar2"}) + assert write_calls_count == 4 -- 2.45.0
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor