Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
home:Alexander_Naumov:SLE-12:Update
salt.24749
fix-multiple-security-issues-bsc-1197417.patch
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File fix-multiple-security-issues-bsc-1197417.patch of Package salt.24749
From c56be5236f29416ecfbd746ee8d25b886e7a8893 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Su=C3=A1rez=20Hern=C3=A1ndez?= <psuarezhernandez@suse.com> Date: Wed, 23 Mar 2022 16:16:49 +0000 Subject: [PATCH] Fix multiple security issues (bsc#1197417) * Sign authentication replies to prevent MiTM (CVE-2020-22935) * Sign pillar data to prevent MiTM attacks. (CVE-2022-22934) * Prevent job and fileserver replays (CVE-2022-22936) * Fixed targeting bug, especially visible when using syndic and user auth. (CVE-2022-22941) --- requirements/static/linux.in | 5 +- salt/crypt.py | 308 +++--- salt/master.py | 65 +- salt/pillar/__init__.py | 5 +- salt/transport/mixins/auth.py | 323 +++--- salt/transport/tcp.py | 173 ++- salt/transport/zeromq.py | 157 ++- salt/utils/minions.py | 25 +- tests/pytests/unit/test_crypt.py | 149 +++ tests/pytests/unit/transport/test_zeromq.py | 1042 +++++++++++++++++++ tests/unit/transport/mixins.py | 12 +- tests/unit/transport/test_tcp.py | 91 +- tests/unit/transport/test_zeromq.py | 124 ++- 13 files changed, 2008 insertions(+), 471 deletions(-) create mode 100644 tests/pytests/unit/transport/test_zeromq.py diff --git a/requirements/static/linux.in b/requirements/static/linux.in index c20b006e2b..bed8943683 100644 --- a/requirements/static/linux.in +++ b/requirements/static/linux.in @@ -1,5 +1,6 @@ apache-libcloud==2.0.0 boto3 +azure==4.0.0; sys_platform != "win32" boto>=2.46.0 certifi cffi @@ -20,8 +21,8 @@ kubernetes<4.0 libnacl==1.6.0 mock>=3.0.5 more-itertools==5.0.0 -moto -paramiko>=2.1.6 +moto; python_version >= '3.6' +paramiko>=2.1.6; python_version >= '3.6' psutil pygit2 pyinotify diff --git a/salt/crypt.py b/salt/crypt.py index be426c6ab3..d025d807be 100644 --- a/salt/crypt.py +++ b/salt/crypt.py @@ -20,6 +20,7 @@ import logging import stat import traceback import binascii +import uuid import weakref import getpass import salt.ext.tornado.gen @@ -255,7 +256,11 @@ def verify_signature(pubkey_path, message, signature): md = EVP.MessageDigest('sha1') md.update(salt.utils.stringutils.to_bytes(message)) digest = md.final() - return pubkey.verify(digest, signature) + try: + return pubkey.verify(digest, signature) + except RSA.RSAError as exc: + log.debug("Signature verification failed: %s", exc.args[0]) + return False else: verifier = PKCS1_v1_5.new(pubkey) return verifier.verify(SHA.new(salt.utils.stringutils.to_bytes(message)), signature) @@ -655,10 +660,20 @@ class AsyncAuth(object): self._authenticate_future.set_exception(error) else: key = self.__key(self.opts) - AsyncAuth.creds_map[key] = creds - self._creds = creds - self._crypticle = Crypticle(self.opts, creds['aes']) - self._authenticate_future.set_result(True) # mark the sign-in as complete + if key not in AsyncAuth.creds_map: + log.debug("%s Got new master aes key.", self) + AsyncAuth.creds_map[key] = creds + self._creds = creds + self._crypticle = Crypticle(self.opts, creds["aes"]) + elif self._creds["aes"] != creds["aes"]: + log.debug("%s The master's aes key has changed.", self) + AsyncAuth.creds_map[key] = creds + self._creds = creds + self._crypticle = Crypticle(self.opts, creds["aes"]) + + self._authenticate_future.set_result( + True + ) # mark the sign-in as complete # Notify the bus about creds change if self.opts.get('auth_events') is True: with salt.utils.event.get_event(self.opts.get('__role'), opts=self.opts, listen=False) as event: @@ -681,7 +696,6 @@ class AsyncAuth(object): with the publication port and the shared AES key. ''' - auth = {} auth_timeout = self.opts.get('auth_timeout', None) if auth_timeout is not None: @@ -693,10 +707,6 @@ class AsyncAuth(object): if auth_tries is not None: tries = auth_tries - m_pub_fn = os.path.join(self.opts['pki_dir'], self.mpub) - - auth['master_uri'] = self.opts['master_uri'] - close_channel = False if not channel: close_channel = True @@ -722,55 +732,88 @@ class AsyncAuth(object): finally: if close_channel: channel.close() + ret = self.handle_signin_response(sign_in_payload, payload) + raise salt.ext.tornado.gen.Return(ret) - if not isinstance(payload, dict): - log.error('Sign-in attempt failed: %s', payload) - raise salt.ext.tornado.gen.Return(False) - if 'load' in payload: - if 'ret' in payload['load']: - if not payload['load']['ret']: - if self.opts['rejected_retry']: - log.error( - 'The Salt Master has rejected this minion\'s public ' - 'key.\nTo repair this issue, delete the public key ' - 'for this minion on the Salt Master.\nThe Salt ' - 'Minion will attempt to to re-authenicate.' - ) - raise salt.ext.tornado.gen.Return('retry') - else: - log.critical( - 'The Salt Master has rejected this minion\'s public ' - 'key!\nTo repair this issue, delete the public key ' - 'for this minion on the Salt Master and restart this ' - 'minion.\nOr restart the Salt Master in open mode to ' - 'clean out the keys. The Salt Minion will now exit.' - ) - # Add a random sleep here for systems that are using a - # a service manager to immediately restart the service - # to avoid overloading the system - time.sleep(random.randint(10, 20)) - sys.exit(salt.defaults.exitcodes.EX_NOPERM) - # has the master returned that its maxed out with minions? - elif payload['load']['ret'] == 'full': - raise salt.ext.tornado.gen.Return('full') - else: + def handle_signin_response(self, sign_in_payload, payload): + auth = {} + m_pub_fn = os.path.join(self.opts["pki_dir"], self.mpub) + auth["master_uri"] = self.opts["master_uri"] + if not isinstance(payload, dict) or "load" not in payload: + log.error("Sign-in attempt failed: %s", payload) + return False + + clear_signed_data = payload["load"] + clear_signature = payload["sig"] + payload = self.serial.loads(clear_signed_data) + + if "pub_key" in payload: + auth["aes"] = self.verify_master( + payload, master_pub="token" in sign_in_payload + ) + if not auth["aes"]: + log.critical( + "The Salt Master server's public key did not authenticate!\n" + "The master may need to be updated if it is a version of Salt " + "lower than %s, or\n" + "If you are confident that you are connecting to a valid Salt " + "Master, then remove the master public key and restart the " + "Salt Minion.\nThe master public key can be found " + "at:\n%s", + salt.version.__version__, + m_pub_fn, + ) + raise SaltClientError("Invalid master key") + + master_pubkey_path = os.path.join(self.opts["pki_dir"], self.mpub) + if os.path.exists(master_pubkey_path) and not verify_signature( + master_pubkey_path, clear_signed_data, clear_signature + ): + log.critical("The payload signature did not validate.") + raise SaltClientError("Invalid signature") + + if payload["nonce"] != sign_in_payload["nonce"]: + log.critical("The payload nonce did not validate.") + raise SaltClientError("Invalid nonce") + + if "ret" in payload: + if not payload["ret"]: + if self.opts["rejected_retry"]: log.error( - 'The Salt Master has cached the public key for this ' - 'node, this salt minion will wait for %s seconds ' - 'before attempting to re-authenticate', - self.opts['acceptance_wait_time'] + "The Salt Master has rejected this minion's public " + "key.\nTo repair this issue, delete the public key " + "for this minion on the Salt Master.\nThe Salt " + "Minion will attempt to re-authenicate." ) - raise salt.ext.tornado.gen.Return('retry') - auth['aes'] = self.verify_master(payload, master_pub='token' in sign_in_payload) - if not auth['aes']: - log.critical( - 'The Salt Master server\'s public key did not authenticate!\n' - 'The master may need to be updated if it is a version of Salt ' - 'lower than %s, or\n' - 'If you are confident that you are connecting to a valid Salt ' - 'Master, then remove the master public key and restart the ' - 'Salt Minion.\nThe master public key can be found ' - 'at:\n%s', salt.version.__version__, m_pub_fn + return "retry" + else: + log.critical( + "The Salt Master has rejected this minion's public " + "key!\nTo repair this issue, delete the public key " + "for this minion on the Salt Master and restart this " + "minion.\nOr restart the Salt Master in open mode to " + "clean out the keys. The Salt Minion will now exit." + ) + # Add a random sleep here for systems that are using a + # a service manager to immediately restart the service + # to avoid overloading the system + time.sleep(random.randint(10, 20)) + sys.exit(salt.defaults.exitcodes.EX_NOPERM) + # has the master returned that its maxed out with minions? + elif payload["ret"] == "full": + return "full" + else: + log.error( + "The Salt Master has cached the public key for this " + "node, this salt minion will wait for %s seconds " + "before attempting to re-authenticate", + self.opts["acceptance_wait_time"], + ) + return "retry" + + if self.opts.get("syndic_master", False): # Is syndic + syndic_finger = self.opts.get( + "syndic_finger", self.opts.get("master_finger", False) ) raise SaltClientError('Invalid master key') if self.opts.get('syndic_master', False): # Is syndic @@ -779,11 +822,17 @@ class AsyncAuth(object): if salt.utils.crypt.pem_finger(m_pub_fn, sum_type=self.opts['hash_type']) != syndic_finger: self._finger_fail(syndic_finger, m_pub_fn) else: - if self.opts.get('master_finger', False): - if salt.utils.crypt.pem_finger(m_pub_fn, sum_type=self.opts['hash_type']) != self.opts['master_finger']: - self._finger_fail(self.opts['master_finger'], m_pub_fn) - auth['publish_port'] = payload['publish_port'] - raise salt.ext.tornado.gen.Return(auth) + if self.opts.get("master_finger", False): + if ( + salt.utils.crypt.pem_finger( + m_pub_fn, sum_type=self.opts["hash_type"] + ) + != self.opts["master_finger"] + ): + self._finger_fail(self.opts["master_finger"], m_pub_fn) + + auth["publish_port"] = payload["publish_port"] + return auth def get_keys(self): ''' @@ -827,9 +876,10 @@ class AsyncAuth(object): :rtype: dict ''' payload = {} - payload['cmd'] = '_auth' - payload['id'] = self.opts['id'] - if 'autosign_grains' in self.opts: + payload["cmd"] = "_auth" + payload["id"] = self.opts["id"] + payload["nonce"] = uuid.uuid4().hex + if "autosign_grains" in self.opts: autosign_grains = {} for grain in self.opts['autosign_grains']: autosign_grains[grain] = self.opts['grains'].get(grain, None) @@ -1191,13 +1241,15 @@ class SAuth(AsyncAuth): self.token = Crypticle.generate_key_string() else: self.token = salt.utils.stringutils.to_bytes(Crypticle.generate_key_string()) + self.serial = salt.payload.Serial(self.opts) - self.pub_path = os.path.join(self.opts['pki_dir'], 'minion.pub') - self.rsa_path = os.path.join(self.opts['pki_dir'], 'minion.pem') - if 'syndic_master' in self.opts: - self.mpub = 'syndic_master.pub' - elif 'alert_master' in self.opts: - self.mpub = 'monitor_master.pub' + self.pub_path = os.path.join(self.opts["pki_dir"], "minion.pub") + self.rsa_path = os.path.join(self.opts["pki_dir"], "minion.pem") + self._creds = None + if "syndic_master" in self.opts: + self.mpub = "syndic_master.pub" + elif "alert_master" in self.opts: + self.mpub = "monitor_master.pub" else: self.mpub = 'minion_master.pub' if not os.path.isfile(self.pub_path): @@ -1253,8 +1305,14 @@ class SAuth(AsyncAuth): log.debug('Authentication wait time is %s', acceptance_wait_time) continue break - self._creds = creds - self._crypticle = Crypticle(self.opts, creds['aes']) + if self._creds is None: + log.error("%s Got new master aes key.", self) + self._creds = creds + self._crypticle = Crypticle(self.opts, creds["aes"]) + elif self._creds["aes"] != creds["aes"]: + log.error("%s The master's aes key has changed.", self) + self._creds = creds + self._crypticle = Crypticle(self.opts, creds["aes"]) def sign_in(self, timeout=60, safe=True, tries=1, channel=None): ''' @@ -1309,63 +1367,7 @@ class SAuth(AsyncAuth): if close_channel: channel.close() - if 'load' in payload: - if 'ret' in payload['load']: - if not payload['load']['ret']: - if self.opts['rejected_retry']: - log.error( - 'The Salt Master has rejected this minion\'s public ' - 'key.\nTo repair this issue, delete the public key ' - 'for this minion on the Salt Master.\nThe Salt ' - 'Minion will attempt to to re-authenicate.' - ) - return 'retry' - else: - log.critical( - 'The Salt Master has rejected this minion\'s public ' - 'key!\nTo repair this issue, delete the public key ' - 'for this minion on the Salt Master and restart this ' - 'minion.\nOr restart the Salt Master in open mode to ' - 'clean out the keys. The Salt Minion will now exit.' - ) - sys.exit(salt.defaults.exitcodes.EX_NOPERM) - # has the master returned that its maxed out with minions? - elif payload['load']['ret'] == 'full': - return 'full' - else: - log.error( - 'The Salt Master has cached the public key for this ' - 'node. If this is the first time connecting to this ' - 'master then this key may need to be accepted using ' - '\'salt-key -a %s\' on the salt master. This salt ' - 'minion will wait for %s seconds before attempting ' - 'to re-authenticate.', - self.opts['id'], self.opts['acceptance_wait_time'] - ) - return 'retry' - auth['aes'] = self.verify_master(payload, master_pub='token' in sign_in_payload) - if not auth['aes']: - log.critical( - 'The Salt Master server\'s public key did not authenticate!\n' - 'The master may need to be updated if it is a version of Salt ' - 'lower than %s, or\n' - 'If you are confident that you are connecting to a valid Salt ' - 'Master, then remove the master public key and restart the ' - 'Salt Minion.\nThe master public key can be found ' - 'at:\n%s', salt.version.__version__, m_pub_fn - ) - sys.exit(42) - if self.opts.get('syndic_master', False): # Is syndic - syndic_finger = self.opts.get('syndic_finger', self.opts.get('master_finger', False)) - if syndic_finger: - if salt.utils.crypt.pem_finger(m_pub_fn, sum_type=self.opts['hash_type']) != syndic_finger: - self._finger_fail(syndic_finger, m_pub_fn) - else: - if self.opts.get('master_finger', False): - if salt.utils.crypt.pem_finger(m_pub_fn, sum_type=self.opts['hash_type']) != self.opts['master_finger']: - self._finger_fail(self.opts['master_finger'], m_pub_fn) - auth['publish_port'] = payload['publish_port'] - return auth + return self.handle_signin_response(sign_in_payload, payload) class Crypticle(object): @@ -1380,11 +1382,11 @@ class Crypticle(object): AES_BLOCK_SIZE = 16 SIG_SIZE = hashlib.sha256().digest_size - def __init__(self, opts, key_string, key_size=192): + def __init__(self, opts, key_string, key_size=192, serial=0): self.key_string = key_string self.keys = self.extract_keys(self.key_string, key_size) self.key_size = key_size - self.serial = salt.payload.Serial(opts) + self.serial = serial @classmethod def generate_key_string(cls, key_size=192): @@ -1464,19 +1466,45 @@ class Crypticle(object): else: return data[:-data[-1]] - def dumps(self, obj): - ''' + def dumps(self, obj, nonce=None): + """ Serialize and encrypt a python object - ''' - return self.encrypt(self.PICKLE_PAD + self.serial.dumps(obj)) + """ + if nonce: + toencrypt = ( + self.PICKLE_PAD + nonce.encode() + salt.payload.Serial({}).dumps(obj) + ) + else: + toencrypt = self.PICKLE_PAD + salt.payload.Serial({}).dumps(obj) + return self.encrypt(toencrypt) - def loads(self, data, raw=False): - ''' + def loads(self, data, raw=False, nonce=None): + """ Decrypt and un-serialize a python object - ''' + """ data = self.decrypt(data) # simple integrity check to verify that we got meaningful data if not data.startswith(self.PICKLE_PAD): return {} - load = self.serial.loads(data[len(self.PICKLE_PAD):], raw=raw) - return load + data = data[len(self.PICKLE_PAD) :] + if nonce: + ret_nonce = data[:32].decode() + data = data[32:] + if ret_nonce != nonce: + raise SaltClientError("Nonce verification error") + payload = salt.payload.Serial({}).loads(data, raw=raw) + if isinstance(payload, dict): + if "serial" in payload: + serial = payload.pop("serial") + if serial <= self.serial: + log.critical( + "A message with an invalid serial was received.\n" + "this serial: %d\n" + "last serial: %d\n" + "The minion will not honor this request.", + serial, + self.serial, + ) + return {} + self.serial = serial + return payload diff --git a/salt/master.py b/salt/master.py index 8bccc00036..dda920b511 100644 --- a/salt/master.py +++ b/salt/master.py @@ -138,6 +138,51 @@ class SMaster(object): ''' return salt.daemons.masterapi.access_keys(self.opts) + @classmethod + def get_serial(cls, opts=None, event=None, lock=True): + if lock: + with cls.secrets["aes"]["secret"].get_lock(): + if cls.secrets["aes"]["serial"].value == sys.maxsize: + cls.rotate_secrets(opts, event, use_lock=False) + else: + cls.secrets["aes"]["serial"].value += 1 + return cls.secrets["aes"]["serial"].value + else: + if cls.secrets["aes"]["serial"].value == sys.maxsize: + cls.rotate_secrets(opts, event, use_lock=False) + else: + cls.secrets["aes"]["serial"].value += 1 + return cls.secrets["aes"]["serial"].value + + @classmethod + def rotate_secrets(cls, opts=None, event=None, use_lock=True): + log.info("Rotating master AES key") + if opts is None: + opts = {} + + for secret_key, secret_map in cls.secrets.items(): + # should be unnecessary-- since no one else should be modifying + if use_lock: + with secret_map["secret"].get_lock(): + secret_map["secret"].value = salt.utils.stringutils.to_bytes( + secret_map["reload"]() + ) + if "serial" in secret_map: + secret_map["serial"].value = 0 + else: + secret_map["secret"].value = salt.utils.stringutils.to_bytes( + secret_map["reload"]() + ) + if "serial" in secret_map: + secret_map["serial"].value = 0 + if event: + event.fire_event({"rotate_{}_key".format(secret_key): True}, tag="key") + + if opts.get("ping_on_rotate"): + # Ping all minions to get them to pick up the new key + log.debug("Pinging all connected minions due to key rotation") + salt.utils.master.ping_all_connected_minions(opts) + class Maintenance(salt.utils.process.SignalHandlingProcess): ''' @@ -298,18 +343,8 @@ class Maintenance(salt.utils.process.SignalHandlingProcess): to_rotate = True if to_rotate: - log.info('Rotating master AES key') - for secret_key, secret_map in six.iteritems(SMaster.secrets): - # should be unnecessary-- since no one else should be modifying - with secret_map['secret'].get_lock(): - secret_map['secret'].value = salt.utils.stringutils.to_bytes(secret_map['reload']()) - self.event.fire_event({'rotate_{0}_key'.format(secret_key): True}, tag='key') + SMaster.rotate_secrets(self.opts, self.event) self.rotate = now - if self.opts.get('ping_on_rotate'): - # Ping all minions to get them to pick up the new key - log.debug('Pinging all connected minions ' - 'due to key rotation') - salt.utils.master.ping_all_connected_minions(self.opts) def handle_git_pillar(self): ''' @@ -671,9 +706,13 @@ class Master(SMaster): salt.crypt.Crypticle.generate_key_string() ) ), - 'reload': salt.crypt.Crypticle.generate_key_string + "serial": multiprocessing.Value( + ctypes.c_longlong, lock=False # We'll use the lock from 'secret' + ), + "reload": salt.crypt.Crypticle.generate_key_string, } - log.info('Creating master process manager') + + log.info("Creating master process manager") # Since there are children having their own ProcessManager we should wait for kill more time. self.process_manager = salt.utils.process.ProcessManager(wait_for_kill=5) pub_channels = [] diff --git a/salt/pillar/__init__.py b/salt/pillar/__init__.py index 6b36cda708..81a0417d0f 100644 --- a/salt/pillar/__init__.py +++ b/salt/pillar/__init__.py @@ -13,7 +13,7 @@ import logging import salt.ext.tornado.gen import sys import traceback -import inspect +import uuid # Import salt libs import salt.loader @@ -178,6 +178,9 @@ class AsyncRemotePillar(RemotePillarMixin): load, dictkey='pillar', ) + except salt.crypt.AuthenticationError as exc: + log.error(exc.message) + raise SaltClientError("Exception getting pillar.") except Exception: # pylint: disable=broad-except log.exception('Exception getting pillar:') raise SaltClientError('Exception getting pillar.') diff --git a/salt/transport/mixins/auth.py b/salt/transport/mixins/auth.py index ec736227ab..cfc1ef36a8 100644 --- a/salt/transport/mixins/auth.py +++ b/salt/transport/mixins/auth.py @@ -107,10 +107,10 @@ class AESReqServerMixin(object): self.master_key = salt.crypt.MasterKeys(self.opts) - def _encrypt_private(self, ret, dictkey, target): - ''' + def _encrypt_private(self, ret, dictkey, target, nonce=None, sign_messages=True): + """ The server equivalent of ReqChannel.crypted_transfer_decode_dictentry - ''' + """ # encrypt with a specific AES key pubfn = os.path.join(self.opts['pki_dir'], 'minions', @@ -124,9 +124,8 @@ class AESReqServerMixin(object): except (ValueError, IndexError, TypeError): return self.crypticle.dumps({}) except IOError: - log.error('AES key not found') - return {'error': 'AES key not found'} - + log.error("AES key not found") + return {"error": "AES key not found"} pret = {} if not six.PY2: key = salt.utils.stringutils.to_bytes(key) @@ -134,12 +133,34 @@ class AESReqServerMixin(object): pret['key'] = pub.public_encrypt(key, RSA.pkcs1_oaep_padding) else: cipher = PKCS1_OAEP.new(pub) - pret['key'] = cipher.encrypt(key) - pret[dictkey] = pcrypt.dumps( - ret if ret is not False else {} - ) + pret["key"] = cipher.encrypt(key) + if ret is False: + ret = {} + if sign_messages: + if nonce is None: + return {"error": "Nonce not included in request"} + tosign = salt.payload.Serial({}).dumps( + {"key": pret["key"], "pillar": ret, "nonce": nonce} + ) + master_pem_path = os.path.join(self.opts["pki_dir"], "master.pem") + signed_msg = { + "data": tosign, + "sig": salt.crypt.sign_message(master_pem_path, tosign), + } + pret[dictkey] = pcrypt.dumps(signed_msg) + else: + pret[dictkey] = pcrypt.dumps(ret) return pret + def _clear_signed(self, load): + master_pem_path = os.path.join(self.opts["pki_dir"], "master.pem") + tosign = salt.payload.Serial({}).dumps(load) + return { + "enc": "clear", + "load": tosign, + "sig": salt.crypt.sign_message(master_pem_path, tosign), + } + def _update_aes(self): ''' Check to see if a fresh AES key is available and update the components @@ -161,8 +182,8 @@ class AESReqServerMixin(object): payload['load'] = self.crypticle.loads(payload['load']) return payload - def _auth(self, load): - ''' + def _auth(self, load, sign_messages=False): + """ Authenticate the client, use the sent public key to encrypt the AES key which was generated at start up. @@ -175,13 +196,15 @@ class AESReqServerMixin(object): # Make an RSA key with the pub key # Encrypt the AES key as an encrypted salt.payload # Package the return and return it - ''' + """ - if not salt.utils.verify.valid_id(self.opts, load['id']): - log.info('Authentication request from invalid id %s', load['id']) - return {'enc': 'clear', - 'load': {'ret': False}} - log.info('Authentication request from %s', load['id']) + if not salt.utils.verify.valid_id(self.opts, load["id"]): + log.info("Authentication request from invalid id %s", load["id"]) + if sign_messages: + return self._clear_signed({"ret": False, "nonce": load["nonce"]}) + else: + return {"enc": "clear", "load": {"ret": False}} + log.info("Authentication request from %s", load["id"]) # 0 is default which should be 'unlimited' if self.opts['max_minions'] > 0: @@ -209,10 +232,16 @@ class AESReqServerMixin(object): 'id': load['id'], 'pub': load['pub']} - if self.opts.get('auth_events') is True: - self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth')) - return {'enc': 'clear', - 'load': {'ret': 'full'}} + if self.opts.get("auth_events") is True: + self.event.fire_event( + eload, salt.utils.event.tagify(prefix="auth") + ) + if sign_messages: + return self._clear_signed( + {"ret": "full", "nonce": load["nonce"]} + ) + else: + return {"enc": "clear", "load": {"ret": "full"}} # Check if key is configured to be auto-rejected/signed auto_reject = self.auto_key.check_autoreject(load['id']) @@ -236,16 +265,17 @@ class AESReqServerMixin(object): pass elif os.path.isfile(pubfn_rejected): # The key has been rejected, don't place it in pending - log.info('Public key rejected for %s. Key is present in ' - 'rejection key dir.', load['id']) - eload = {'result': False, - 'id': load['id'], - 'pub': load['pub']} - if self.opts.get('auth_events') is True: - self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth')) - return {'enc': 'clear', - 'load': {'ret': False}} - + log.info( + "Public key rejected for %s. Key is present in " "rejection key dir.", + load["id"], + ) + eload = {"result": False, "id": load["id"], "pub": load["pub"]} + if self.opts.get("auth_events") is True: + self.event.fire_event(eload, salt.utils.event.tagify(prefix="auth")) + if sign_messages: + return self._clear_signed({"ret": False, "nonce": load["nonce"]}) + else: + return {"enc": "clear", "load": {"ret": False}} elif os.path.isfile(pubfn): # The key has been accepted, check it with salt.utils.files.fopen(pubfn, 'r') as pubfn_handle: @@ -256,29 +286,37 @@ class AESReqServerMixin(object): 'the Salt cluster.', load['id'] ) # put denied minion key into minions_denied - with salt.utils.files.fopen(pubfn_denied, 'w+') as fp_: - fp_.write(load['pub']) - eload = {'result': False, - 'id': load['id'], - 'act': 'denied', - 'pub': load['pub']} - if self.opts.get('auth_events') is True: - self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth')) - return {'enc': 'clear', - 'load': {'ret': False}} + with salt.utils.files.fopen(pubfn_denied, "w+") as fp_: + fp_.write(load["pub"]) + eload = { + "result": False, + "id": load["id"], + "act": "denied", + "pub": load["pub"], + } + if self.opts.get("auth_events") is True: + self.event.fire_event( + eload, salt.utils.event.tagify(prefix="auth") + ) + if sign_messages: + return self._clear_signed( + {"ret": False, "nonce": load["nonce"]} + ) + else: + return {"enc": "clear", "load": {"ret": False}} elif not os.path.isfile(pubfn_pend): # The key has not been accepted, this is a new minion if os.path.isdir(pubfn_pend): # The key path is a directory, error out - log.info('New public key %s is a directory', load['id']) - eload = {'result': False, - 'id': load['id'], - 'pub': load['pub']} - if self.opts.get('auth_events') is True: - self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth')) - return {'enc': 'clear', - 'load': {'ret': False}} + log.info("New public key %s is a directory", load["id"]) + eload = {"result": False, "id": load["id"], "pub": load["pub"]} + if self.opts.get("auth_events") is True: + self.event.fire_event(eload, salt.utils.event.tagify(prefix="auth")) + if sign_messages: + return self._clear_signed({"ret": False, "nonce": load["nonce"]}) + else: + return {"enc": "clear", "load": {"ret": False}} if auto_reject: key_path = pubfn_rejected @@ -297,17 +335,22 @@ class AESReqServerMixin(object): if key_path is not None: # Write the key to the appropriate location - with salt.utils.files.fopen(key_path, 'w+') as fp_: - fp_.write(load['pub']) - ret = {'enc': 'clear', - 'load': {'ret': key_result}} - eload = {'result': key_result, - 'act': key_act, - 'id': load['id'], - 'pub': load['pub']} - if self.opts.get('auth_events') is True: - self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth')) - return ret + with salt.utils.files.fopen(key_path, "w+") as fp_: + fp_.write(load["pub"]) + eload = { + "result": key_result, + "act": key_act, + "id": load["id"], + "pub": load["pub"], + } + if self.opts.get("auth_events") is True: + self.event.fire_event(eload, salt.utils.event.tagify(prefix="auth")) + if sign_messages: + return self._clear_signed( + {"ret": key_result, "nonce": load["nonce"]} + ) + else: + return {"enc": "clear", "load": {"ret": key_result}} elif os.path.isfile(pubfn_pend): # This key is in the pending dir and is awaiting acceptance @@ -319,17 +362,22 @@ class AESReqServerMixin(object): shutil.move(pubfn_pend, pubfn_rejected) except (IOError, OSError): pass - log.info('Pending public key for %s rejected via ' - 'autoreject_file', load['id']) - ret = {'enc': 'clear', - 'load': {'ret': False}} - eload = {'result': False, - 'act': 'reject', - 'id': load['id'], - 'pub': load['pub']} - if self.opts.get('auth_events') is True: - self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth')) - return ret + log.info( + "Pending public key for %s rejected via " "autoreject_file", + load["id"], + ) + eload = { + "result": False, + "act": "reject", + "id": load["id"], + "pub": load["pub"], + } + if self.opts.get("auth_events") is True: + self.event.fire_event(eload, salt.utils.event.tagify(prefix="auth")) + if sign_messages: + return self._clear_signed({"ret": False, "nonce": load["nonce"]}) + else: + return {"enc": "clear", "load": {"ret": False}} elif not auto_sign: # This key is in the pending dir and is not being auto-signed. @@ -344,30 +392,46 @@ class AESReqServerMixin(object): 'attempt to compromise the Salt cluster.', load['id'] ) # put denied minion key into minions_denied - with salt.utils.files.fopen(pubfn_denied, 'w+') as fp_: - fp_.write(load['pub']) - eload = {'result': False, - 'id': load['id'], - 'act': 'denied', - 'pub': load['pub']} - if self.opts.get('auth_events') is True: - self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth')) - return {'enc': 'clear', - 'load': {'ret': False}} + with salt.utils.files.fopen(pubfn_denied, "w+") as fp_: + fp_.write(load["pub"]) + eload = { + "result": False, + "id": load["id"], + "act": "denied", + "pub": load["pub"], + } + if self.opts.get("auth_events") is True: + self.event.fire_event( + eload, salt.utils.event.tagify(prefix="auth") + ) + if sign_messages: + return self._clear_signed( + {"ret": False, "nonce": load["nonce"]} + ) + else: + return {"enc": "clear", "load": {"ret": False}} else: log.info( 'Authentication failed from host %s, the key is in ' 'pending and needs to be accepted with salt-key ' '-a %s', load['id'], load['id'] ) - eload = {'result': True, - 'act': 'pend', - 'id': load['id'], - 'pub': load['pub']} - if self.opts.get('auth_events') is True: - self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth')) - return {'enc': 'clear', - 'load': {'ret': True}} + eload = { + "result": True, + "act": "pend", + "id": load["id"], + "pub": load["pub"], + } + if self.opts.get("auth_events") is True: + self.event.fire_event( + eload, salt.utils.event.tagify(prefix="auth") + ) + if sign_messages: + return self._clear_signed( + {"ret": True, "nonce": load["nonce"]} + ) + else: + return {"enc": "clear", "load": {"ret": True}} else: # This key is in pending and has been configured to be # auto-signed. Check to see if it is the same key, and if @@ -381,28 +445,32 @@ class AESReqServerMixin(object): 'attempt to compromise the Salt cluster.', load['id'] ) # put denied minion key into minions_denied - with salt.utils.files.fopen(pubfn_denied, 'w+') as fp_: - fp_.write(load['pub']) - eload = {'result': False, - 'id': load['id'], - 'pub': load['pub']} - if self.opts.get('auth_events') is True: - self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth')) - return {'enc': 'clear', - 'load': {'ret': False}} + with salt.utils.files.fopen(pubfn_denied, "w+") as fp_: + fp_.write(load["pub"]) + eload = {"result": False, "id": load["id"], "pub": load["pub"]} + if self.opts.get("auth_events") is True: + self.event.fire_event( + eload, salt.utils.event.tagify(prefix="auth") + ) + if sign_messages: + return self._clear_signed( + {"ret": False, "nonce": load["nonce"]} + ) + else: + return {"enc": "clear", "load": {"ret": False}} else: os.remove(pubfn_pend) else: # Something happened that I have not accounted for, FAIL! - log.warning('Unaccounted for authentication failure') - eload = {'result': False, - 'id': load['id'], - 'pub': load['pub']} - if self.opts.get('auth_events') is True: - self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth')) - return {'enc': 'clear', - 'load': {'ret': False}} + log.warning("Unaccounted for authentication failure") + eload = {"result": False, "id": load["id"], "pub": load["pub"]} + if self.opts.get("auth_events") is True: + self.event.fire_event(eload, salt.utils.event.tagify(prefix="auth")) + if sign_messages: + return self._clear_signed({"ret": False, "nonce": load["nonce"]}) + else: + return {"enc": "clear", "load": {"ret": False}} log.info('Authentication accepted from %s', load['id']) # only write to disk if you are adding the file, and in open mode, @@ -415,14 +483,16 @@ class AESReqServerMixin(object): if os.path.isfile(pubfn): with salt.utils.files.fopen(pubfn, 'r') as fp_: disk_key = fp_.read() - if load['pub'] and load['pub'] != disk_key: - log.debug('Host key change detected in open mode.') - with salt.utils.files.fopen(pubfn, 'w+') as fp_: - fp_.write(load['pub']) - elif not load['pub']: - log.error('Public key is empty: {0}'.format(load['id'])) - return {'enc': 'clear', - 'load': {'ret': False}} + if load["pub"] and load["pub"] != disk_key: + log.debug("Host key change detected in open mode.") + with salt.utils.files.fopen(pubfn, "w+") as fp_: + fp_.write(load["pub"]) + elif not load["pub"]: + log.error("Public key is empty: %s", load["id"]) + if sign_messages: + return self._clear_signed({"ret": False, "nonce": load["nonce"]}) + else: + return {"enc": "clear", "load": {"ret": False}} pub = None @@ -436,8 +506,10 @@ class AESReqServerMixin(object): pub = salt.crypt.get_rsa_pub_key(pubfn) except salt.crypt.InvalidKeyError as err: log.error('Corrupt public key "%s": %s', pubfn, err) - return {'enc': 'clear', - 'load': {'ret': False}} + if sign_messages: + return self._clear_signed({"ret": False, "nonce": load["nonce"]}) + else: + return {"enc": "clear", "load": {"ret": False}} if not HAS_M2: cipher = PKCS1_OAEP.new(pub) @@ -507,14 +579,15 @@ class AESReqServerMixin(object): ret['aes'] = pub.public_encrypt(aes, RSA.pkcs1_oaep_padding) else: - ret['aes'] = cipher.encrypt(aes) + ret["aes"] = cipher.encrypt(aes) + # Be aggressive about the signature digest = salt.utils.stringutils.to_bytes(hashlib.sha256(aes).hexdigest()) - ret['sig'] = salt.crypt.private_encrypt(self.master_key.key, digest) - eload = {'result': True, - 'act': 'accept', - 'id': load['id'], - 'pub': load['pub']} - if self.opts.get('auth_events') is True: - self.event.fire_event(eload, salt.utils.event.tagify(prefix='auth')) + ret["sig"] = salt.crypt.private_encrypt(self.master_key.key, digest) + eload = {"result": True, "act": "accept", "id": load["id"], "pub": load["pub"]} + if self.opts.get("auth_events") is True: + self.event.fire_event(eload, salt.utils.event.tagify(prefix="auth")) + if sign_messages: + ret["nonce"] = load["nonce"] + return self._clear_signed(ret) return ret diff --git a/salt/transport/tcp.py b/salt/transport/tcp.py index 12ef24e86f..9b2f6641ab 100644 --- a/salt/transport/tcp.py +++ b/salt/transport/tcp.py @@ -16,6 +16,7 @@ import sys import time import threading import traceback +import uuid import weakref # Import Salt Libs @@ -339,12 +340,17 @@ class AsyncTCPReqChannel(salt.transport.client.ReqChannel): def _package_load(self, load): return { - 'enc': self.crypt, - 'load': load, + "enc": self.crypt, + "load": load, + "version": 2, } @salt.ext.tornado.gen.coroutine - def crypted_transfer_decode_dictentry(self, load, dictkey=None, tries=3, timeout=60): + def crypted_transfer_decode_dictentry( + self, load, dictkey=None, tries=3, timeout=60 + ): + nonce = uuid.uuid4().hex + load["nonce"] = nonce if not self.auth.authenticated: yield self.auth.authenticate() ret = yield self.message_client.send(self._package_load(self.auth.crypticle.dumps(load)), timeout=timeout) @@ -353,12 +359,30 @@ class AsyncTCPReqChannel(salt.transport.client.ReqChannel): aes = key.private_decrypt(ret['key'], RSA.pkcs1_oaep_padding) else: cipher = PKCS1_OAEP.new(key) - aes = cipher.decrypt(ret['key']) + aes = cipher.decrypt(ret["key"]) + + # Decrypt using the public key. pcrypt = salt.crypt.Crypticle(self.opts, aes) - data = pcrypt.loads(ret[dictkey]) - if six.PY3: - data = salt.transport.frame.decode_embedded_strs(data) - raise salt.ext.tornado.gen.Return(data) + signed_msg = pcrypt.loads(ret[dictkey]) + + # Validate the master's signature. + master_pubkey_path = os.path.join(self.opts["pki_dir"], "minion_master.pub") + if not salt.crypt.verify_signature( + master_pubkey_path, signed_msg["data"], signed_msg["sig"] + ): + raise salt.crypt.AuthenticationError( + "Pillar payload signature failed to validate." + ) + + # Make sure the signed key matches the key we used to decrypt the data. + data = salt.payload.Serial({}).loads(signed_msg["data"]) + if data["key"] != ret["key"]: + raise salt.crypt.AuthenticationError("Key verification failed.") + + # Validate the nonce. + if data["nonce"] != nonce: + raise salt.crypt.AuthenticationError("Pillar nonce verification failed.") + raise salt.ext.tornado.gen.Return(data["pillar"]) @salt.ext.tornado.gen.coroutine def _crypted_transfer(self, load, tries=3, timeout=60): @@ -368,6 +392,10 @@ class AsyncTCPReqChannel(salt.transport.client.ReqChannel): Indeed, we can fail too early in case of a master restart during a minion state execution call ''' + nonce = uuid.uuid4().hex + if load and isinstance(load, dict): + load["nonce"] = nonce + @salt.ext.tornado.gen.coroutine def _do_transfer(): data = yield self.message_client.send(self._package_load(self.auth.crypticle.dumps(load)), @@ -378,9 +406,8 @@ class AsyncTCPReqChannel(salt.transport.client.ReqChannel): # communication, we do not subscribe to return events, we just # upload the results to the master if data: - data = self.auth.crypticle.loads(data) - if six.PY3: - data = salt.transport.frame.decode_embedded_strs(data) + data = self.auth.crypticle.loads(data, nonce=nonce) + data = salt.transport.frame.decode_embedded_strs(data) raise salt.ext.tornado.gen.Return(data) if not self.auth.authenticated: @@ -448,8 +475,9 @@ class AsyncTCPPubChannel(salt.transport.mixins.auth.AESPubClientMixin, salt.tran def _package_load(self, load): return { - 'enc': self.crypt, - 'load': load, + "enc": self.crypt, + "load": load, + "version": 2, } @salt.ext.tornado.gen.coroutine @@ -719,13 +747,31 @@ class TCPReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.tra stream.send(self.serial.dumps('bad load: id {0} is not a string'.format(id_))) raise salt.ext.tornado.gen.Return() + version = 0 + if "version" in payload: + version = payload["version"] + + sign_messages = False + if version > 1: + sign_messages = True + # intercept the "_auth" commands, since the main daemon shouldn't know # anything about our key auth - if payload['enc'] == 'clear' and payload.get('load', {}).get('cmd') == '_auth': - yield stream.write(salt.transport.frame.frame_msg( - self._auth(payload['load']), header=header)) + if ( + payload["enc"] == "clear" + and payload.get("load", {}).get("cmd") == "_auth" + ): + yield stream.write( + salt.transport.frame.frame_msg( + self._auth(payload["load"], sign_messages), header=header + ) + ) raise salt.ext.tornado.gen.Return() + nonce = None + if version > 1: + nonce = payload["load"].pop("nonce", None) + # TODO: test try: ret, req_opts = yield self.payload_handler(payload) @@ -739,13 +785,21 @@ class TCPReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, salt.tra req_fun = req_opts.get('fun', 'send') if req_fun == 'send_clear': stream.write(salt.transport.frame.frame_msg(ret, header=header)) - elif req_fun == 'send': - stream.write(salt.transport.frame.frame_msg(self.crypticle.dumps(ret), header=header)) - elif req_fun == 'send_private': - stream.write(salt.transport.frame.frame_msg(self._encrypt_private(ret, - req_opts['key'], - req_opts['tgt'], - ), header=header)) + elif req_fun == "send": + stream.write( + salt.transport.frame.frame_msg( + self.crypticle.dumps(ret, nonce), header=header + ) + ) + elif req_fun == "send_private": + stream.write( + salt.transport.frame.frame_msg( + self._encrypt_private( + ret, req_opts["key"], req_opts["tgt"], nonce, sign_messages, + ), + header=header, + ) + ) else: log.error('Unknown req_fun %s', req_fun) # always attempt to return an error to the minion @@ -1274,8 +1328,9 @@ class PubServer(salt.ext.tornado.tcpserver.TCPServer, object): ''' TCP publisher ''' - def __init__(self, opts, io_loop=None): - super(PubServer, self).__init__(ssl_options=opts.get('ssl')) + + def __init__(self, opts, io_loop=None, pack_publish=lambda _: _): + super().__init__(ssl_options=opts.get("ssl")) self.io_loop = io_loop self.opts = opts self._closing = False @@ -1300,6 +1355,10 @@ class PubServer(salt.ext.tornado.tcpserver.TCPServer, object): opts=self.opts, listen=False ) + self._pack_publish = pack_publish + + def pack_publish(self, payload): + return self._pack_publish(payload) def close(self): if self._closing: @@ -1406,8 +1465,9 @@ class PubServer(salt.ext.tornado.tcpserver.TCPServer, object): # TODO: ACK the publish through IPC @salt.ext.tornado.gen.coroutine def publish_payload(self, package, _): - log.debug('TCP PubServer sending payload: %s', package) - payload = salt.transport.frame.frame_msg(package['payload']) + log.debug("TCP PubServer sending payload: %s", package) + package = self.pack_publish(package) + payload = salt.transport.frame.frame_msg(package["payload"]) to_remove = [] if 'topic_lst' in package: @@ -1482,7 +1542,9 @@ class TCPPubServerChannel(salt.transport.server.PubServerChannel): self.io_loop = salt.ext.tornado.ioloop.IOLoop.current() # Spin up the publisher - pub_server = PubServer(self.opts, io_loop=self.io_loop) + pub_server = PubServer( + self.opts, io_loop=self.io_loop, pack_publish=self.pack_publish + ) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) _set_tcp_keepalive(sock, self.opts) @@ -1523,30 +1585,20 @@ class TCPPubServerChannel(salt.transport.server.PubServerChannel): ''' process_manager.add_process(self._publish_daemon, kwargs=kwargs) - def publish(self, load): - ''' + def pack_publish(self, load): + """ Publish "load" to minions - ''' - payload = {'enc': 'aes'} - - crypticle = salt.crypt.Crypticle(self.opts, salt.master.SMaster.secrets['aes']['secret'].value) - payload['load'] = crypticle.dumps(load) - if self.opts['sign_pub_messages']: - master_pem_path = os.path.join(self.opts['pki_dir'], 'master.pem') - log.debug("Signing data packet") - payload['sig'] = salt.crypt.sign_message(master_pem_path, payload['load']) - # Use the Salt IPC server - if self.opts.get('ipc_mode', '') == 'tcp': - pull_uri = int(self.opts.get('tcp_master_publish_pull', 4514)) - else: - pull_uri = os.path.join(self.opts['sock_dir'], 'publish_pull.ipc') - # TODO: switch to the actual asynchronous interface - #pub_sock = salt.transport.ipc.IPCMessageClient(self.opts, io_loop=self.io_loop) - pub_sock = salt.utils.asynchronous.SyncWrapper( - salt.transport.ipc.IPCMessageClient, - (pull_uri,) + """ + payload = {"enc": "aes"} + load["serial"] = salt.master.SMaster.get_serial() + crypticle = salt.crypt.Crypticle( + self.opts, salt.master.SMaster.secrets["aes"]["secret"].value ) - pub_sock.connect() + payload["load"] = crypticle.dumps(load) + if self.opts["sign_pub_messages"]: + master_pem_path = os.path.join(self.opts["pki_dir"], "master.pem") + log.debug("Signing data packet") + payload["sig"] = salt.crypt.sign_message(master_pem_path, payload["load"]) int_payload = {'payload': self.serial.dumps(payload)} @@ -1562,6 +1614,23 @@ class TCPPubServerChannel(salt.transport.server.PubServerChannel): # Send list of miions thru so zmq can target them int_payload['topic_lst'] = match_ids else: - int_payload['topic_lst'] = load['tgt'] + int_payload["topic_lst"] = load["tgt"] + return int_payload + + def publish(self, load): + """ + Publish "load" to minions + """ # Send it over IPC! - pub_sock.send(int_payload) + # Use the Salt IPC server + # TODO: switch to the actual asynchronous interface + # pub_sock = salt.transport.ipc.IPCMessageClient(self.opts, io_loop=self.io_loop) + if self.opts.get("ipc_mode", "") == "tcp": + pull_uri = int(self.opts.get("tcp_master_publish_pull", 4514)) + else: + pull_uri = os.path.join(self.opts["sock_dir"], "publish_pull.ipc") + pub_sock = salt.utils.asynchronous.SyncWrapper( + salt.transport.ipc.IPCMessageClient, (pull_uri,), loop_kwarg="io_loop", + ) + pub_sock.connect() + pub_sock.send(load) diff --git a/salt/transport/zeromq.py b/salt/transport/zeromq.py index a5844e9132..540abb8853 100644 --- a/salt/transport/zeromq.py +++ b/salt/transport/zeromq.py @@ -13,6 +13,8 @@ import signal import socket import hashlib import logging +import signal +import uuid import weakref import threading from random import randint @@ -21,6 +23,7 @@ from random import randint import salt.auth import salt.crypt import salt.log.setup +import salt.utils.crypt import salt.utils.event import salt.utils.files import salt.utils.minions @@ -65,6 +68,7 @@ except ImportError: except ImportError: from Crypto.Cipher import PKCS1_OAEP + log = logging.getLogger(__name__) @@ -79,11 +83,12 @@ def _get_master_uri(master_ip, rc = zmq_connect(socket, "tcp://192.168.1.17:5555;192.168.1.1:5555"); assert (rc == 0); Source: http://api.zeromq.org/4-1:zmq-tcp ''' - from salt.utils.zeromq import ip_bracket - master_uri = 'tcp://{master_ip}:{master_port}'.format( - master_ip=ip_bracket(master_ip), master_port=master_port) + from salt.utils.zeromq import ip_bracket + master_uri = "tcp://{master_ip}:{master_port}".format( + master_ip=ip_bracket(master_ip), master_port=master_port + ) if source_ip or source_port: if LIBZMQ_VERSION_INFO >= (4, 1, 6) and ZMQ_VERSION_INFO >= (16, 0, 1): # The source:port syntax for ZeroMQ has been added in libzmq 4.1.6 @@ -292,23 +297,30 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel): def _package_load(self, load): return { - 'enc': self.crypt, - 'load': load, + "enc": self.crypt, + "load": load, + "version": 2, } @salt.ext.tornado.gen.coroutine - def crypted_transfer_decode_dictentry(self, load, dictkey=None, tries=3, timeout=60): + def crypted_transfer_decode_dictentry( + self, load, dictkey=None, tries=3, timeout=60 + ): + nonce = uuid.uuid4().hex + load["nonce"] = nonce if not self.auth.authenticated: # Return control back to the caller, continue when authentication succeeds yield self.auth.authenticate() - # Return control to the caller. When send() completes, resume by populating ret with the Future.result + + # Return control to the caller. When send() completes, resume by + # populating ret with the Future.result ret = yield self.message_client.send( self._package_load(self.auth.crypticle.dumps(load)), timeout=timeout, tries=tries, ) - key = self.auth.get_keys() - if 'key' not in ret: + + if "key" not in ret: # Reauth in the case our key is deleted on the master side. yield self.auth.authenticate() ret = yield self.message_client.send( @@ -316,17 +328,37 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel): timeout=timeout, tries=tries, ) + + key = self.auth.get_keys() if HAS_M2: aes = key.private_decrypt(ret['key'], RSA.pkcs1_oaep_padding) else: cipher = PKCS1_OAEP.new(key) - aes = cipher.decrypt(ret['key']) + aes = cipher.decrypt(ret["key"]) + + # Decrypt using the public key. pcrypt = salt.crypt.Crypticle(self.opts, aes) - data = pcrypt.loads(ret[dictkey]) - if six.PY3: - data = salt.transport.frame.decode_embedded_strs(data) - raise salt.ext.tornado.gen.Return(data) + signed_msg = pcrypt.loads(ret[dictkey]) + + # Validate the master's signature. + master_pubkey_path = os.path.join(self.opts["pki_dir"], "minion_master.pub") + if not salt.crypt.verify_signature( + master_pubkey_path, signed_msg["data"], signed_msg["sig"] + ): + raise salt.crypt.AuthenticationError( + "Pillar payload signature failed to validate." + ) + + # Make sure the signed key matches the key we used to decrypt the data. + data = salt.payload.Serial({}).loads(signed_msg["data"]) + if data["key"] != ret["key"]: + raise salt.crypt.AuthenticationError("Key verification failed.") + + # Validate the nonce. + if data["nonce"] != nonce: + raise salt.crypt.AuthenticationError("Pillar nonce verification failed.") + raise salt.ext.tornado.gen.Return(data["pillar"]) @salt.ext.tornado.gen.coroutine def _crypted_transfer(self, load, tries=3, timeout=60, raw=False): @@ -343,6 +375,10 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel): :param int tries: The number of times to make before failure :param int timeout: The number of seconds on a response before failing ''' + nonce = uuid.uuid4().hex + if load and isinstance(load, dict): + load["nonce"] = nonce + @salt.ext.tornado.gen.coroutine def _do_transfer(): # Yield control to the caller. When send() completes, resume by populating data with the Future.result @@ -356,7 +392,7 @@ class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel): # communication, we do not subscribe to return events, we just # upload the results to the master if data: - data = self.auth.crypticle.loads(data, raw) + data = self.auth.crypticle.loads(data, raw, nonce) if six.PY3 and not raw: data = salt.transport.frame.decode_embedded_strs(data) raise salt.ext.tornado.gen.Return(data) @@ -774,12 +810,24 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, stream.send(self.serial.dumps('bad load: id {0} is not a string'.format(id_))) raise salt.ext.tornado.gen.Return() + version = 0 + if "version" in payload: + version = payload["version"] + + sign_messages = False + if version > 1: + sign_messages = True + # intercept the "_auth" commands, since the main daemon shouldn't know # anything about our key auth - if payload['enc'] == 'clear' and payload.get('load', {}).get('cmd') == '_auth': - stream.send(self.serial.dumps(self._auth(payload['load']))) + if payload["enc"] == "clear" and payload.get("load", {}).get("cmd") == "_auth": + stream.send(self.serial.dumps(self._auth(payload["load"], sign_messages))) raise salt.ext.tornado.gen.Return() + nonce = None + if version > 1: + nonce = payload["load"].pop("nonce", None) + # TODO: test try: # Take the payload_handler function that was registered when we created the channel @@ -794,13 +842,16 @@ class ZeroMQReqServerChannel(salt.transport.mixins.auth.AESReqServerMixin, req_fun = req_opts.get('fun', 'send') if req_fun == 'send_clear': stream.send(self.serial.dumps(ret)) - elif req_fun == 'send': - stream.send(self.serial.dumps(self.crypticle.dumps(ret))) - elif req_fun == 'send_private': - stream.send(self.serial.dumps(self._encrypt_private(ret, - req_opts['key'], - req_opts['tgt'], - ))) + elif req_fun == "send": + stream.send(self.serial.dumps(self.crypticle.dumps(ret, nonce))) + elif req_fun == "send_private": + stream.send( + self.serial.dumps( + self._encrypt_private( + ret, req_opts["key"], req_opts["tgt"], nonce, sign_messages, + ) + ) + ) else: log.error('Unknown req_fun %s', req_fun) # always attempt to return an error to the minion @@ -878,6 +929,7 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel): if log_queue: salt.log.setup.set_multiprocessing_logging_queue(log_queue) salt.log.setup.setup_multiprocessing_logging(log_queue) + salt.utils.crypt.reinit_crypto() # Set up the context context = zmq.Context(1) @@ -929,7 +981,9 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel): try: log.debug('Publish daemon getting data from puller %s', pull_uri) package = pull_sock.recv() - log.debug('Publish daemon received payload. size=%d', len(package)) + log.debug("Publish daemon received payload. size=%d", len(package)) + load = salt.payload.Serial({}).loads(package) + package = self.pack_publish(load) unpacked_package = salt.payload.unpackage(package) if six.PY3: @@ -1012,17 +1066,20 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel): self.pub_close() ctx = zmq.Context.instance() self._sock_data.sock = ctx.socket(zmq.PUSH) - self.pub_sock.setsockopt(zmq.LINGER, -1) - if self.opts.get('ipc_mode', '') == 'tcp': - pull_uri = 'tcp://127.0.0.1:{0}'.format( - self.opts.get('tcp_master_publish_pull', 4514) - ) + self._sock_data.sock.setsockopt(zmq.LINGER, -1) + self._sock_data.sock.setsockopt(zmq.SNDHWM, self.opts.get("pub_hwm", 1000)) + self._sock_data.sock.setsockopt(zmq.RCVHWM, self.opts.get("pub_hwm", 1000)) + self._sock_data.sock.setsockopt(zmq.BACKLOG, self.opts.get("zmq_backlog", 1000)) + if self.opts.get("ipc_mode", "") == "tcp": + pull_uri = "tcp://127.0.0.1:{}".format( + self.opts.get("tcp_master_publish_pull", 4514) + ) else: pull_uri = 'ipc://{0}'.format( os.path.join(self.opts['sock_dir'], 'publish_pull.ipc') ) log.debug("Connecting to pub server: %s", pull_uri) - self.pub_sock.connect(pull_uri) + self._sock_data.sock.connect(pull_uri) return self._sock_data.sock def pub_close(self): @@ -1032,20 +1089,23 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel): ''' if hasattr(self._sock_data, 'sock'): self._sock_data.sock.close() - delattr(self._sock_data, 'sock') + self._sock_data.sock = None - def publish(self, load): - ''' - Publish "load" to minions. This send the load to the publisher daemon - process with does the actual sending to minions. + def pack_publish(self, load): + """ + Package the "load" for a publish to minions. This send the load to the + publisher daemon process with does the actual sending to minions. :param dict load: A load to be sent across the wire to minions - ''' - payload = {'enc': 'aes'} - crypticle = salt.crypt.Crypticle(self.opts, salt.master.SMaster.secrets['aes']['secret'].value) - payload['load'] = crypticle.dumps(load) - if self.opts['sign_pub_messages']: - master_pem_path = os.path.join(self.opts['pki_dir'], 'master.pem') + """ + payload = {"enc": "aes"} + load["serial"] = salt.master.SMaster.get_serial() + crypticle = salt.crypt.Crypticle( + self.opts, salt.master.SMaster.secrets["aes"]["secret"].value + ) + payload["load"] = crypticle.dumps(load) + if self.opts["sign_pub_messages"]: + master_pem_path = os.path.join(self.opts["pki_dir"], "master.pem") log.debug("Signing data packet") payload['sig'] = salt.crypt.sign_message(master_pem_path, payload['load']) int_payload = {'payload': self.serial.dumps(payload)} @@ -1070,10 +1130,19 @@ class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel): 'Sending payload to publish daemon. jid=%s size=%d', load.get('jid', None), len(payload), ) + return payload + + def publish(self, load): + """ + Publish "load" to minions. This send the load to the publisher daemon + process with does the actual sending to minions. + + :param dict load: A load to be sent across the wire to minions + """ if not self.pub_sock: self.pub_connect() - self.pub_sock.send(payload) - log.debug('Sent payload to publish daemon.') + self.pub_sock.send(self.serial.dumps(load)) + log.debug("Sent payload to publish daemon.") class AsyncReqMessageClientPool(salt.transport.MessageClientPool): diff --git a/salt/utils/minions.py b/salt/utils/minions.py index b02ab34777..6995a29131 100644 --- a/salt/utils/minions.py +++ b/salt/utils/minions.py @@ -742,21 +742,28 @@ class CkMinions(object): return _res def validate_tgt(self, valid, expr, tgt_type, minions=None, expr_form=None): - ''' - Return a Bool. This function returns if the expression sent in is - within the scope of the valid expression - ''' + """ + Validate the target minions against the possible valid minions. + + If ``minions`` is provided, they will be compared against the valid + minions. Otherwise, ``expr`` and ``tgt_type`` will be used to expand + to a list of target minions. - v_minions = set(self.check_minions(valid, 'compound').get('minions', [])) + Return True if all of the requested minions are valid minions, + otherwise return False. + """ + + v_minions = set(self.check_minions(valid, "compound").get("minions", [])) + if not v_minions: + # There are no valid minions, so it doesn't matter what we are + # targeting - this is a fail. + return False if minions is None: _res = self.check_minions(expr, tgt_type) minions = set(_res['minions']) else: minions = set(minions) - d_bool = not bool(minions.difference(v_minions)) - if len(v_minions) == len(minions) and d_bool: - return True - return d_bool + return minions.issubset(v_minions) def match_check(self, regex, fun): ''' diff --git a/tests/pytests/unit/test_crypt.py b/tests/pytests/unit/test_crypt.py index aa8f439b8c..6ffd912166 100644 --- a/tests/pytests/unit/test_crypt.py +++ b/tests/pytests/unit/test_crypt.py @@ -4,10 +4,159 @@ tests.pytests.unit.test_crypt Unit tests for salt's crypt module """ +import uuid + import pytest import salt.crypt +import salt.master import salt.utils.files +PRIV_KEY = """ +-----BEGIN RSA PRIVATE KEY----- +MIIEogIBAAKCAQEAoAsMPt+4kuIG6vKyw9r3+OuZrVBee/2vDdVetW+Js5dTlgrJ +aghWWn3doGmKlEjqh7E4UTa+t2Jd6w8RSLnyHNJ/HpVhMG0M07MF6FMfILtDrrt8 +ZX7eDVt8sx5gCEpYI+XG8Y07Ga9i3Hiczt+fu6HYwu96HggmG2pqkOrn3iGfqBvV +YVFJzSZYe7e4c1PeEs0xYcrA4k+apyGsMtpef8vRUrNicRLc7dAcvfhtgt2DXEZ2 +d72t/CR4ygtUvPXzisaTPW0G7OWAheCloqvTIIPQIjR8htFxGTz02STVXfnhnJ0Z +k8KhqKF2v1SQvIYxsZU7jaDgl5i3zpeh58cYOwIDAQABAoIBABZUJEO7Y91+UnfC +H6XKrZEZkcnH7j6/UIaOD9YhdyVKxhsnax1zh1S9vceNIgv5NltzIsfV6vrb6v2K +Dx/F7Z0O0zR5o+MlO8ZncjoNKskex10gBEWG00Uqz/WPlddiQ/TSMJTv3uCBAzp+ +S2Zjdb4wYPUlgzSgb2ygxrhsRahMcSMG9PoX6klxMXFKMD1JxiY8QfAHahPzQXy9 +F7COZ0fCVo6BE+MqNuQ8tZeIxu8mOULQCCkLFwXmkz1FpfK/kNRmhIyhxwvCS+z4 +JuErW3uXfE64RLERiLp1bSxlDdpvRO2R41HAoNELTsKXJOEt4JANRHm/CeyA5wsh +NpscufUCgYEAxhgPfcMDy2v3nL6KtkgYjdcOyRvsAF50QRbEa8ldO+87IoMDD/Oe +osFERJ5hhyyEO78QnaLVegnykiw5DWEF02RKMhD/4XU+1UYVhY0wJjKQIBadsufB +2dnaKjvwzUhPh5BrBqNHl/FXwNCRDiYqXa79eWCPC9OFbZcUWWq70s8CgYEAztOI +61zRfmXJ7f70GgYbHg+GA7IrsAcsGRITsFR82Ho0lqdFFCxz7oK8QfL6bwMCGKyk +nzk+twh6hhj5UNp18KN8wktlo02zTgzgemHwaLa2cd6xKgmAyuPiTgcgnzt5LVNG +FOjIWkLwSlpkDTl7ZzY2QSy7t+mq5d750fpIrtUCgYBWXZUbcpPL88WgDB7z/Bjg +dlvW6JqLSqMK4b8/cyp4AARbNp12LfQC55o5BIhm48y/M70tzRmfvIiKnEc/gwaE +NJx4mZrGFFURrR2i/Xx5mt/lbZbRsmN89JM+iKWjCpzJ8PgIi9Wh9DIbOZOUhKVB +9RJEAgo70LvCnPTdS0CaVwKBgDJW3BllAvw/rBFIH4OB/vGnF5gosmdqp3oGo1Ik +jipmPAx6895AH4tquIVYrUl9svHsezjhxvjnkGK5C115foEuWXw0u60uiTiy+6Pt +2IS0C93VNMulenpnUrppE7CN2iWFAiaura0CY9fE/lsVpYpucHAWgi32Kok+ZxGL +WEttAoGAN9Ehsz4LeQxEj3x8wVeEMHF6OsznpwYsI2oVh6VxpS4AjgKYqeLVcnNi +TlZFsuQcqgod8OgzA91tdB+Rp86NygmWD5WzeKXpCOg9uA+y/YL+0sgZZHsuvbK6 +PllUgXdYxqClk/hdBFB7v9AQoaj7K9Ga22v32msftYDQRJ94xOI= +-----END RSA PRIVATE KEY----- +""" + + +PUB_KEY = """ +-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAoAsMPt+4kuIG6vKyw9r3 ++OuZrVBee/2vDdVetW+Js5dTlgrJaghWWn3doGmKlEjqh7E4UTa+t2Jd6w8RSLny +HNJ/HpVhMG0M07MF6FMfILtDrrt8ZX7eDVt8sx5gCEpYI+XG8Y07Ga9i3Hiczt+f +u6HYwu96HggmG2pqkOrn3iGfqBvVYVFJzSZYe7e4c1PeEs0xYcrA4k+apyGsMtpe +f8vRUrNicRLc7dAcvfhtgt2DXEZ2d72t/CR4ygtUvPXzisaTPW0G7OWAheCloqvT +IIPQIjR8htFxGTz02STVXfnhnJ0Zk8KhqKF2v1SQvIYxsZU7jaDgl5i3zpeh58cY +OwIDAQAB +-----END PUBLIC KEY----- +""" + +PRIV_KEY2 = """ +-----BEGIN RSA PRIVATE KEY----- +MIIEogIBAAKCAQEAp+8cTxguO6Vg+YO92VfHgNld3Zy8aM3JbZvpJcjTnis+YFJ7 +Zlkcc647yPRRwY9nYBNywahnt5kIeuT1rTvTsMBZWvmUoEVUj1Xg8XXQkBvb9Ozy +Gqy/G/p8KDDpzMP/U+XCnUeHiXTZrgnqgBIc2cKeCVvWFqDi0GRFGzyaXLaX3PPm +M7DJ0MIPL1qgmcDq6+7Ze0gJ9SrDYFAeLmbuT1OqDfufXWQl/82JXeiwU2cOpqWq +7n5fvPOWim7l1tzQ+dSiMRRm0xa6uNexCJww3oJSwvMbAmgzvOhqqhlqv+K7u0u7 +FrFFojESsL36Gq4GBrISnvu2tk7u4GGNTYYQbQIDAQABAoIBAADrqWDQnd5DVZEA +lR+WINiWuHJAy/KaIC7K4kAMBgbxrz2ZbiY9Ok/zBk5fcnxIZDVtXd1sZicmPlro +GuWodIxdPZAnWpZ3UtOXUayZK/vCP1YsH1agmEqXuKsCu6Fc+K8VzReOHxLUkmXn +FYM+tixGahXcjEOi/aNNTWitEB6OemRM1UeLJFzRcfyXiqzHpHCIZwBpTUAsmzcG +QiVDkMTKubwo/m+PVXburX2CGibUydctgbrYIc7EJvyx/cpRiPZXo1PhHQWdu4Y1 +SOaC66WLsP/wqvtHo58JQ6EN/gjSsbAgGGVkZ1xMo66nR+pLpR27coS7o03xCks6 +DY/0mukCgYEAuLIGgBnqoh7YsOBLd/Bc1UTfDMxJhNseo+hZemtkSXz2Jn51322F +Zw/FVN4ArXgluH+XsOhvG/MFFpojwZSrb0Qq5b1MRdo9qycq8lGqNtlN1WHqosDQ +zW29kpL0tlRrSDpww3wRESsN9rH5XIrJ1b3ZXuO7asR+KBVQMy/+NcUCgYEA6MSC +c+fywltKPgmPl5j0DPoDe5SXE/6JQy7w/vVGrGfWGf/zEJmhzS2R+CcfTTEqaT0T +Yw8+XbFgKAqsxwtE9MUXLTVLI3sSUyE4g7blCYscOqhZ8ItCUKDXWkSpt++rG0Um +1+cEJP/0oCazG6MWqvBC4NpQ1nzh46QpjWqMwokCgYAKDLXJ1p8rvx3vUeUJW6zR +dfPlEGCXuAyMwqHLxXgpf4EtSwhC5gSyPOtx2LqUtcrnpRmt6JfTH4ARYMW9TMef +QEhNQ+WYj213mKP/l235mg1gJPnNbUxvQR9lkFV8bk+AGJ32JRQQqRUTbU+yN2MQ +HEptnVqfTp3GtJIultfwOQKBgG+RyYmu8wBP650izg33BXu21raEeYne5oIqXN+I +R5DZ0JjzwtkBGroTDrVoYyuH1nFNEh7YLqeQHqvyufBKKYo9cid8NQDTu+vWr5UK +tGvHnwdKrJmM1oN5JOAiq0r7+QMAOWchVy449VNSWWV03aeftB685iR5BXkstbIQ +EVopAoGAfcGBTAhmceK/4Q83H/FXBWy0PAa1kZGg/q8+Z0KY76AqyxOVl0/CU/rB +3tO3sKhaMTHPME/MiQjQQGoaK1JgPY6JHYvly2KomrJ8QTugqNGyMzdVJkXAK2AM +GAwC8ivAkHf8CHrHa1W7l8t2IqBjW1aRt7mOW92nfG88Hck0Mbo= +-----END RSA PRIVATE KEY----- +""" + + +PUB_KEY2 = """ +-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAp+8cTxguO6Vg+YO92VfH +gNld3Zy8aM3JbZvpJcjTnis+YFJ7Zlkcc647yPRRwY9nYBNywahnt5kIeuT1rTvT +sMBZWvmUoEVUj1Xg8XXQkBvb9OzyGqy/G/p8KDDpzMP/U+XCnUeHiXTZrgnqgBIc +2cKeCVvWFqDi0GRFGzyaXLaX3PPmM7DJ0MIPL1qgmcDq6+7Ze0gJ9SrDYFAeLmbu +T1OqDfufXWQl/82JXeiwU2cOpqWq7n5fvPOWim7l1tzQ+dSiMRRm0xa6uNexCJww +3oJSwvMbAmgzvOhqqhlqv+K7u0u7FrFFojESsL36Gq4GBrISnvu2tk7u4GGNTYYQ +bQIDAQAB +-----END PUBLIC KEY----- +""" + + +def test_cryptical_dumps_no_nonce(): + master_crypt = salt.crypt.Crypticle({}, salt.crypt.Crypticle.generate_key_string()) + data = {"foo": "bar"} + ret = master_crypt.dumps(data) + + # Validate message structure + assert isinstance(ret, bytes) + une = master_crypt.decrypt(ret) + une.startswith(master_crypt.PICKLE_PAD) + assert salt.payload.Serial({}).loads(une[len(master_crypt.PICKLE_PAD) :]) == data + + # Validate load back to orig data + assert master_crypt.loads(ret) == data + + +def test_cryptical_dumps_valid_nonce(): + nonce = uuid.uuid4().hex + master_crypt = salt.crypt.Crypticle({}, salt.crypt.Crypticle.generate_key_string()) + data = {"foo": "bar"} + ret = master_crypt.dumps(data, nonce=nonce) + + assert isinstance(ret, bytes) + une = master_crypt.decrypt(ret) + une.startswith(master_crypt.PICKLE_PAD) + nonce_and_data = une[len(master_crypt.PICKLE_PAD) :] + assert nonce_and_data.startswith(nonce.encode()) + assert salt.payload.Serial({}).loads(nonce_and_data[len(nonce) :]) == data + + assert master_crypt.loads(ret, nonce=nonce) == data + + +def test_cryptical_dumps_invalid_nonce(): + nonce = uuid.uuid4().hex + master_crypt = salt.crypt.Crypticle({}, salt.crypt.Crypticle.generate_key_string()) + data = {"foo": "bar"} + ret = master_crypt.dumps(data, nonce=nonce) + assert isinstance(ret, bytes) + with pytest.raises(salt.crypt.SaltClientError, match="Nonce verification error"): + assert master_crypt.loads(ret, nonce="abcde") + + +def test_verify_signature(tmpdir): + tmpdir.join("foo.pem").write(PRIV_KEY.strip()) + tmpdir.join("foo.pub").write(PUB_KEY.strip()) + tmpdir.join("bar.pem").write(PRIV_KEY2.strip()) + tmpdir.join("bar.pub").write(PUB_KEY2.strip()) + msg = b"foo bar" + sig = salt.crypt.sign_message(str(tmpdir.join("foo.pem")), msg) + assert salt.crypt.verify_signature(str(tmpdir.join("foo.pub")), msg, sig) + + +def test_verify_signature_bad_sig(tmpdir): + tmpdir.join("foo.pem").write(PRIV_KEY.strip()) + tmpdir.join("foo.pub").write(PUB_KEY.strip()) + tmpdir.join("bar.pem").write(PRIV_KEY2.strip()) + tmpdir.join("bar.pub").write(PUB_KEY2.strip()) + msg = b"foo bar" + sig = salt.crypt.sign_message(str(tmpdir.join("foo.pem")), msg) + assert not salt.crypt.verify_signature(str(tmpdir.join("bar.pub")), msg, sig) def test_get_rsa_pub_key_bad_key(tmp_path): """ diff --git a/tests/pytests/unit/transport/test_zeromq.py b/tests/pytests/unit/transport/test_zeromq.py new file mode 100644 index 0000000000..16c8f1aa49 --- /dev/null +++ b/tests/pytests/unit/transport/test_zeromq.py @@ -0,0 +1,1042 @@ +""" + :codeauthor: Thomas Jackson <jacksontj.89@gmail.com> +""" + +import ctypes +import logging +import multiprocessing +import os +import uuid + +import pytest +import salt.config +import salt.crypt +import salt.exceptions +import salt.ext.tornado.gen +import salt.ext.tornado.ioloop +import salt.log.setup +import salt.transport.client +import salt.transport.server +import salt.transport.zeromq +import salt.utils.platform +import salt.utils.process +import salt.utils.stringutils +from salt.master import SMaster +from tests.support.mock import MagicMock + +try: + from M2Crypto import RSA + + HAS_M2 = True +except ImportError: + HAS_M2 = False + try: + from Cryptodome.Cipher import PKCS1_OAEP + except ImportError: + from Crypto.Cipher import PKCS1_OAEP # nosec + +log = logging.getLogger(__name__) + +MASTER_PRIV_KEY = """ +-----BEGIN RSA PRIVATE KEY----- +MIIEogIBAAKCAQEAoAsMPt+4kuIG6vKyw9r3+OuZrVBee/2vDdVetW+Js5dTlgrJ +aghWWn3doGmKlEjqh7E4UTa+t2Jd6w8RSLnyHNJ/HpVhMG0M07MF6FMfILtDrrt8 +ZX7eDVt8sx5gCEpYI+XG8Y07Ga9i3Hiczt+fu6HYwu96HggmG2pqkOrn3iGfqBvV +YVFJzSZYe7e4c1PeEs0xYcrA4k+apyGsMtpef8vRUrNicRLc7dAcvfhtgt2DXEZ2 +d72t/CR4ygtUvPXzisaTPW0G7OWAheCloqvTIIPQIjR8htFxGTz02STVXfnhnJ0Z +k8KhqKF2v1SQvIYxsZU7jaDgl5i3zpeh58cYOwIDAQABAoIBABZUJEO7Y91+UnfC +H6XKrZEZkcnH7j6/UIaOD9YhdyVKxhsnax1zh1S9vceNIgv5NltzIsfV6vrb6v2K +Dx/F7Z0O0zR5o+MlO8ZncjoNKskex10gBEWG00Uqz/WPlddiQ/TSMJTv3uCBAzp+ +S2Zjdb4wYPUlgzSgb2ygxrhsRahMcSMG9PoX6klxMXFKMD1JxiY8QfAHahPzQXy9 +F7COZ0fCVo6BE+MqNuQ8tZeIxu8mOULQCCkLFwXmkz1FpfK/kNRmhIyhxwvCS+z4 +JuErW3uXfE64RLERiLp1bSxlDdpvRO2R41HAoNELTsKXJOEt4JANRHm/CeyA5wsh +NpscufUCgYEAxhgPfcMDy2v3nL6KtkgYjdcOyRvsAF50QRbEa8ldO+87IoMDD/Oe +osFERJ5hhyyEO78QnaLVegnykiw5DWEF02RKMhD/4XU+1UYVhY0wJjKQIBadsufB +2dnaKjvwzUhPh5BrBqNHl/FXwNCRDiYqXa79eWCPC9OFbZcUWWq70s8CgYEAztOI +61zRfmXJ7f70GgYbHg+GA7IrsAcsGRITsFR82Ho0lqdFFCxz7oK8QfL6bwMCGKyk +nzk+twh6hhj5UNp18KN8wktlo02zTgzgemHwaLa2cd6xKgmAyuPiTgcgnzt5LVNG +FOjIWkLwSlpkDTl7ZzY2QSy7t+mq5d750fpIrtUCgYBWXZUbcpPL88WgDB7z/Bjg +dlvW6JqLSqMK4b8/cyp4AARbNp12LfQC55o5BIhm48y/M70tzRmfvIiKnEc/gwaE +NJx4mZrGFFURrR2i/Xx5mt/lbZbRsmN89JM+iKWjCpzJ8PgIi9Wh9DIbOZOUhKVB +9RJEAgo70LvCnPTdS0CaVwKBgDJW3BllAvw/rBFIH4OB/vGnF5gosmdqp3oGo1Ik +jipmPAx6895AH4tquIVYrUl9svHsezjhxvjnkGK5C115foEuWXw0u60uiTiy+6Pt +2IS0C93VNMulenpnUrppE7CN2iWFAiaura0CY9fE/lsVpYpucHAWgi32Kok+ZxGL +WEttAoGAN9Ehsz4LeQxEj3x8wVeEMHF6OsznpwYsI2oVh6VxpS4AjgKYqeLVcnNi +TlZFsuQcqgod8OgzA91tdB+Rp86NygmWD5WzeKXpCOg9uA+y/YL+0sgZZHsuvbK6 +PllUgXdYxqClk/hdBFB7v9AQoaj7K9Ga22v32msftYDQRJ94xOI= +-----END RSA PRIVATE KEY----- +""" + + +MASTER_PUB_KEY = """ +-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAoAsMPt+4kuIG6vKyw9r3 ++OuZrVBee/2vDdVetW+Js5dTlgrJaghWWn3doGmKlEjqh7E4UTa+t2Jd6w8RSLny +HNJ/HpVhMG0M07MF6FMfILtDrrt8ZX7eDVt8sx5gCEpYI+XG8Y07Ga9i3Hiczt+f +u6HYwu96HggmG2pqkOrn3iGfqBvVYVFJzSZYe7e4c1PeEs0xYcrA4k+apyGsMtpe +f8vRUrNicRLc7dAcvfhtgt2DXEZ2d72t/CR4ygtUvPXzisaTPW0G7OWAheCloqvT +IIPQIjR8htFxGTz02STVXfnhnJ0Zk8KhqKF2v1SQvIYxsZU7jaDgl5i3zpeh58cY +OwIDAQAB +-----END PUBLIC KEY----- +""" + +MASTER2_PRIV_KEY = """ +-----BEGIN RSA PRIVATE KEY----- +MIIEogIBAAKCAQEAp+8cTxguO6Vg+YO92VfHgNld3Zy8aM3JbZvpJcjTnis+YFJ7 +Zlkcc647yPRRwY9nYBNywahnt5kIeuT1rTvTsMBZWvmUoEVUj1Xg8XXQkBvb9Ozy +Gqy/G/p8KDDpzMP/U+XCnUeHiXTZrgnqgBIc2cKeCVvWFqDi0GRFGzyaXLaX3PPm +M7DJ0MIPL1qgmcDq6+7Ze0gJ9SrDYFAeLmbuT1OqDfufXWQl/82JXeiwU2cOpqWq +7n5fvPOWim7l1tzQ+dSiMRRm0xa6uNexCJww3oJSwvMbAmgzvOhqqhlqv+K7u0u7 +FrFFojESsL36Gq4GBrISnvu2tk7u4GGNTYYQbQIDAQABAoIBAADrqWDQnd5DVZEA +lR+WINiWuHJAy/KaIC7K4kAMBgbxrz2ZbiY9Ok/zBk5fcnxIZDVtXd1sZicmPlro +GuWodIxdPZAnWpZ3UtOXUayZK/vCP1YsH1agmEqXuKsCu6Fc+K8VzReOHxLUkmXn +FYM+tixGahXcjEOi/aNNTWitEB6OemRM1UeLJFzRcfyXiqzHpHCIZwBpTUAsmzcG +QiVDkMTKubwo/m+PVXburX2CGibUydctgbrYIc7EJvyx/cpRiPZXo1PhHQWdu4Y1 +SOaC66WLsP/wqvtHo58JQ6EN/gjSsbAgGGVkZ1xMo66nR+pLpR27coS7o03xCks6 +DY/0mukCgYEAuLIGgBnqoh7YsOBLd/Bc1UTfDMxJhNseo+hZemtkSXz2Jn51322F +Zw/FVN4ArXgluH+XsOhvG/MFFpojwZSrb0Qq5b1MRdo9qycq8lGqNtlN1WHqosDQ +zW29kpL0tlRrSDpww3wRESsN9rH5XIrJ1b3ZXuO7asR+KBVQMy/+NcUCgYEA6MSC +c+fywltKPgmPl5j0DPoDe5SXE/6JQy7w/vVGrGfWGf/zEJmhzS2R+CcfTTEqaT0T +Yw8+XbFgKAqsxwtE9MUXLTVLI3sSUyE4g7blCYscOqhZ8ItCUKDXWkSpt++rG0Um +1+cEJP/0oCazG6MWqvBC4NpQ1nzh46QpjWqMwokCgYAKDLXJ1p8rvx3vUeUJW6zR +dfPlEGCXuAyMwqHLxXgpf4EtSwhC5gSyPOtx2LqUtcrnpRmt6JfTH4ARYMW9TMef +QEhNQ+WYj213mKP/l235mg1gJPnNbUxvQR9lkFV8bk+AGJ32JRQQqRUTbU+yN2MQ +HEptnVqfTp3GtJIultfwOQKBgG+RyYmu8wBP650izg33BXu21raEeYne5oIqXN+I +R5DZ0JjzwtkBGroTDrVoYyuH1nFNEh7YLqeQHqvyufBKKYo9cid8NQDTu+vWr5UK +tGvHnwdKrJmM1oN5JOAiq0r7+QMAOWchVy449VNSWWV03aeftB685iR5BXkstbIQ +EVopAoGAfcGBTAhmceK/4Q83H/FXBWy0PAa1kZGg/q8+Z0KY76AqyxOVl0/CU/rB +3tO3sKhaMTHPME/MiQjQQGoaK1JgPY6JHYvly2KomrJ8QTugqNGyMzdVJkXAK2AM +GAwC8ivAkHf8CHrHa1W7l8t2IqBjW1aRt7mOW92nfG88Hck0Mbo= +-----END RSA PRIVATE KEY----- +""" + + +MASTER2_PUB_KEY = """ +-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAp+8cTxguO6Vg+YO92VfH +gNld3Zy8aM3JbZvpJcjTnis+YFJ7Zlkcc647yPRRwY9nYBNywahnt5kIeuT1rTvT +sMBZWvmUoEVUj1Xg8XXQkBvb9OzyGqy/G/p8KDDpzMP/U+XCnUeHiXTZrgnqgBIc +2cKeCVvWFqDi0GRFGzyaXLaX3PPmM7DJ0MIPL1qgmcDq6+7Ze0gJ9SrDYFAeLmbu +T1OqDfufXWQl/82JXeiwU2cOpqWq7n5fvPOWim7l1tzQ+dSiMRRm0xa6uNexCJww +3oJSwvMbAmgzvOhqqhlqv+K7u0u7FrFFojESsL36Gq4GBrISnvu2tk7u4GGNTYYQ +bQIDAQAB +-----END PUBLIC KEY----- +""" + + +MASTER_SIGNING_PRIV = """ +-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEAtieqrBMTM0MSIbhPKkDcozHqyXKyL/+bXYYw+iVPsns7c7bJ +zBqenLQlWoRVyrVyBFrrwQSrKu/0Mqn3l639iOGPlUoR3I7aZKIpyEdDkqd3xGIC +e+BtNNDqhUai67L63hEdG+iYAchi8UZw3LZGtcGpJ3FkBH4cYFX9EOam2QjbD7WY +EO7m1+j6XEYIOTCmAP9dGAvBbU0Jblc+wYxG3qNr+2dBWsK76QXWEqib2VSOGP+z +gjJa8tqY7PXXdOJpalQXNphmD/4o4pHKR4Euy0yL/1oMkpacmrV61LWB8Trnx9nS +9gdVrUteQF/cL1KAGwOsdVmiLpHfvqLLRqSAAQIDAQABAoIBABjB+HEN4Kixf4fk +wKHKEhL+SF6b/7sFX00NXZ/KLXRhSnnWSMQ8g/1hgMg2P2DfW4FbCDsCUu9xkLvI +HTZY+CJAIh9U42uaYPWXkt09TmJi76TZ+2Nx4/XvRUjbCm7Fs1I2ekHeUbbAUS5g ++BsPjTnL+h05zLHNoDa5yT0gVGIgFsQcX/w38arZCe8Rjp9le7PXUB5IIqASsDiw +t8zJvdyWToeXd0WswCHTQu5coHvKo5MCjIZZ1Ink1yJcCCc3rKDc+q3jB2z9T9oW +cUsKzJ4VuleiYj1eRxFITBmXbjKrb/GPRRUkeqCQbs68Hyj2d3UtOFDPeF4vng/3 +jGsHPq8CgYEA0AHAbwykVC6NMa37BTvEqcKoxbjTtErxR+yczlmVDfma9vkwtZvx +FJdbS/+WGA/ucDby5x5b2T5k1J9ueMR86xukb+HnyS0WKsZ94Ie8WnJAcbp+38M6 +7LD0u74Cgk93oagDAzUHqdLq9cXxv/ppBpxVB1Uvu8DfVMHj+wt6ie8CgYEA4C7u +u+6b8EmbGqEdtlPpScKG0WFstJEDGXRARDCRiVP2w6wm25v8UssCPvWcwf8U1Hoq +lhMY+H6a5dnRRiNYql1MGQAsqMi7VeJNYb0B1uxi7X8MPM+SvXoAglX7wm1z0cVy +O4CE5sEKbBg6aQabx1x9tzdrm80SKuSsLc5HRQ8CgYEAp/mCKSuQWNru8ruJBwTp +IB4upN1JOUN77ZVKW+lD0XFMjz1U9JPl77b65ziTQQM8jioRpkqB6cHVM088qxIh +vssn06Iex/s893YrmPKETJYPLMhqRNEn+JQ+To53ADykY0uGg0SD18SYMbmULHBP ++CKvF6jXT0vGDnA1ZzoxzskCgYEA2nQhYrRS9EVlhP93KpJ+A8gxA5tCCHo+YPFt +JoWFbCKLlYUNoHZR3IPCPoOsK0Zbj+kz0mXtsUf9vPkR+py669haLQqEejyQgFIz +QYiiYEKc6/0feapzvXtDP751w7JQaBtVAzJrT0jQ1SCO2oT8C7rPLlgs3fdpOq72 +MPSPcnUCgYBWHm6bn4HvaoUSr0v2hyD9fHZS/wDTnlXVe5c1XXgyKlJemo5dvycf +HUCmN/xIuO6AsiMdqIzv+arNJdboz+O+bNtS43LkTJfEH3xj2/DdUogdvOgG/iPM +u9KBT1h+euws7PqC5qt4vqLwCTTCZXmUS8Riv+62RCC3kZ5AbpT3ZA== +-----END RSA PRIVATE KEY----- +""" + +MASTER_SIGNING_PUB = """ +-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAtieqrBMTM0MSIbhPKkDc +ozHqyXKyL/+bXYYw+iVPsns7c7bJzBqenLQlWoRVyrVyBFrrwQSrKu/0Mqn3l639 +iOGPlUoR3I7aZKIpyEdDkqd3xGICe+BtNNDqhUai67L63hEdG+iYAchi8UZw3LZG +tcGpJ3FkBH4cYFX9EOam2QjbD7WYEO7m1+j6XEYIOTCmAP9dGAvBbU0Jblc+wYxG +3qNr+2dBWsK76QXWEqib2VSOGP+zgjJa8tqY7PXXdOJpalQXNphmD/4o4pHKR4Eu +y0yL/1oMkpacmrV61LWB8Trnx9nS9gdVrUteQF/cL1KAGwOsdVmiLpHfvqLLRqSA +AQIDAQAB +-----END PUBLIC KEY----- +""" + +MINION_PRIV_KEY = """ +-----BEGIN RSA PRIVATE KEY----- +MIIEowIBAAKCAQEAsT6TwnlI0L7urjXu6D5E11tFJ/NglQ45jW/WN9tAUNvphq6Q +cjJCd/aWmdqlqe7ix8y9M/8rgwghRQsnPXblVBvPwFcUEXhMRnOGzqbq/0zyQX01 +KecT0plBhlDt2lTyCLU6E4XCqyLbPfOxgXzsVqM0/TnzRtpVvGNy+5N4eFGylrjb +cJhPxKt2G9TDOCM/hYacDs5RVIYQQmcYb8LJq7G3++FfWpYRDaxdKoHNFDspEynd +jzr67hgThnwzc388OKNJx/7B2atwPTunPb3YBjgwDyRO/01OKK4gUHdw5KoctFgp +kDCDjwjemlyXV+MYODRTIdtOlAP83ZkntEuLoQIDAQABAoIBAAJOKNtvFGfF2l9H +S4CXZSUGU0a+JaCkR+wmnjsPwPn/dXDpAe8nGpidpNicPWqRm6WABjeQHaxda+fB +lpSrRtEdo3zoi2957xQJ5wddDtI1pmXJQrdbm0H/K39oIg/Xtv/IZT769TM6OtVg +paUxG/aftmeGXDtGfIL8w1jkuPABRBLOakWQA9uVdeG19KTU0Ag8ilpJdEX64uFJ +W75bpVjT+KO/6aV1inuCntQSP097aYvUWajRwuiYVJOxoBZHme3IObcE6mdnYXeQ +wblyWBpJUHrOS4MP4HCODV2pHKZ2rr7Nwhh8lMNw/eY9OP0ifz2AcAqe3sUMQOKP +T0qRC6ECgYEAyeU5JvUPOpxXvvChYh6gJ8pYTIh1ueDP0O5e4t3vhz6lfy9DKtRN +ROJLUorHvw/yVXMR72nT07a0z2VswcrUSw8ov3sI53F0NkLGEafQ35lVhTGs4vTl +CFoQCuAKPsxeUl4AIbfbpkDsLGQqzW1diFArK7YeQkpGuGaGodXl480CgYEA4L40 +x5cUXnAhTPsybo7sbcpiwFHoGblmdkvpYvHA2QxtNSi2iHHdqGo8qP1YsZjKQn58 +371NhtqidrJ6i/8EBFP1dy+y/jr9qYlZNNGcQeBi+lshrEOIf1ct56KePG79s8lm +DmD1OY8tO2R37+Py46Nq1n6viT/ST4NjLQI3GyUCgYEAiOswSDA3ZLs0cqRD/gPg +/zsliLmehTFmHj4aEWcLkz+0Ar3tojUaNdX12QOPFQ7efH6uMhwl8NVeZ6xUBlTk +hgbAzqLE1hjGBCpiowSZDZqyOcMHiV8ll/VkHcv0hsQYT2m6UyOaDXTH9g70TB6Y +KOKddGZsvO4cad/1+/jQkB0CgYAzDEEkzLY9tS57M9uCrUgasAu6L2CO50PUvu1m +Ig9xvZbYqkS7vVFhva/FmrYYsOHQNLbcgz0m0mZwm52mSuh4qzFoPxdjE7cmWSJA +ExRxCiyxPR3q6PQKKJ0urgtPIs7RlX9u6KsKxfC6OtnbTWWQO0A7NE9e13ZHxUoz +oPsvWQKBgCa0+Fb2lzUeiQz9bV1CBkWneDZUXuZHmabAZomokX+h/bq+GcJFzZjW +3kAHwYkIy9IAy3SyO/6CP0V3vAye1p+XbotiwsQ/XZnr0pflSQL3J1l1CyN3aopg +Niv7k/zBn15B72aK73R/CpUSk9W/eJGqk1NcNwf8hJHsboRYx6BR +-----END RSA PRIVATE KEY----- +""" + + +MINION_PUB_KEY = """ +-----BEGIN PUBLIC KEY----- +MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAsT6TwnlI0L7urjXu6D5E +11tFJ/NglQ45jW/WN9tAUNvphq6QcjJCd/aWmdqlqe7ix8y9M/8rgwghRQsnPXbl +VBvPwFcUEXhMRnOGzqbq/0zyQX01KecT0plBhlDt2lTyCLU6E4XCqyLbPfOxgXzs +VqM0/TnzRtpVvGNy+5N4eFGylrjbcJhPxKt2G9TDOCM/hYacDs5RVIYQQmcYb8LJ +q7G3++FfWpYRDaxdKoHNFDspEyndjzr67hgThnwzc388OKNJx/7B2atwPTunPb3Y +BjgwDyRO/01OKK4gUHdw5KoctFgpkDCDjwjemlyXV+MYODRTIdtOlAP83ZkntEuL +oQIDAQAB +-----END PUBLIC KEY----- +""" + +AES_KEY = "8wxWlOaMMQ4d3yT74LL4+hGrGTf65w8VgrcNjLJeLRQ2Q6zMa8ItY2EQUgMKKDb7JY+RnPUxbB0=" + + +@pytest.fixture +def pki_dir(tmpdir): + madir = tmpdir.mkdir("master") + mapriv = madir.join("master.pem") + mapriv.write(MASTER_PRIV_KEY.strip()) + mapub = madir.join("master.pub") + mapub.write(MASTER_PUB_KEY.strip()) + + maspriv = madir.join("master_sign.pem") + maspriv.write(MASTER_SIGNING_PRIV.strip()) + maspub = madir.join("master_sign.pub") + maspub.write(MASTER_SIGNING_PUB.strip()) + + for sdir in [ + "minions_autosign", + "minions_denied", + "minions_pre", + "minions_rejected", + ]: + madir.mkdir(sdir) + + mipub = madir.mkdir("minions").join("minion") + mipub.write(MINION_PUB_KEY.strip()) + + midir = tmpdir.mkdir("minion") + mipub = midir.join("minion.pub") + mipub.write(MINION_PUB_KEY.strip()) + mipriv = midir.join("minion.pem") + mipriv.write(MINION_PRIV_KEY.strip()) + mimapriv = midir.join("minion_master.pub") + mimapriv.write(MASTER_PUB_KEY.strip()) + mimaspriv = midir.join("master_sign.pub") + mimaspriv.write(MASTER_SIGNING_PUB.strip()) + try: + yield tmpdir + finally: + tmpdir.remove() + + +def test_req_server_chan_encrypt_v2(pki_dir): + loop = salt.ext.tornado.ioloop.IOLoop.current() + opts = { + "worker_threads": 1, + "master_uri": "tcp://127.0.0.1:4506", + "interface": "127.0.0.1", + "ret_port": 4506, + "ipv6": False, + "zmq_monitor": False, + "mworker_queue_niceness": False, + "sock_dir": ".", + "pki_dir": str(pki_dir.join("master")), + "id": "minion", + "__role": "minion", + "keysize": 4096, + } + server = salt.transport.zeromq.ZeroMQReqServerChannel(opts) + dictkey = "pillar" + nonce = "abcdefg" + pillar_data = {"pillar1": "meh"} + ret = server._encrypt_private(pillar_data, dictkey, "minion", nonce) + assert "key" in ret + assert dictkey in ret + + key = salt.crypt.get_rsa_key(str(pki_dir.join("minion", "minion.pem")), None) + if HAS_M2: + aes = key.private_decrypt(ret["key"], RSA.pkcs1_oaep_padding) + else: + cipher = PKCS1_OAEP.new(key) + aes = cipher.decrypt(ret["key"]) + pcrypt = salt.crypt.Crypticle(opts, aes) + signed_msg = pcrypt.loads(ret[dictkey]) + + assert "sig" in signed_msg + assert "data" in signed_msg + data = salt.payload.Serial({}).loads(signed_msg["data"]) + assert "key" in data + assert data["key"] == ret["key"] + assert "key" in data + assert data["nonce"] == nonce + assert "pillar" in data + assert data["pillar"] == pillar_data + + +def test_req_server_chan_encrypt_v1(pki_dir): + loop = salt.ext.tornado.ioloop.IOLoop.current() + opts = { + "worker_threads": 1, + "master_uri": "tcp://127.0.0.1:4506", + "interface": "127.0.0.1", + "ret_port": 4506, + "ipv6": False, + "zmq_monitor": False, + "mworker_queue_niceness": False, + "sock_dir": ".", + "pki_dir": str(pki_dir.join("master")), + "id": "minion", + "__role": "minion", + "keysize": 4096, + } + server = salt.transport.zeromq.ZeroMQReqServerChannel(opts) + dictkey = "pillar" + nonce = "abcdefg" + pillar_data = {"pillar1": "meh"} + ret = server._encrypt_private(pillar_data, dictkey, "minion", sign_messages=False) + + assert "key" in ret + assert dictkey in ret + + key = salt.crypt.get_rsa_key(str(pki_dir.join("minion", "minion.pem")), None) + if HAS_M2: + aes = key.private_decrypt(ret["key"], RSA.pkcs1_oaep_padding) + else: + cipher = PKCS1_OAEP.new(key) + aes = cipher.decrypt(ret["key"]) + pcrypt = salt.crypt.Crypticle(opts, aes) + data = pcrypt.loads(ret[dictkey]) + assert data == pillar_data + + +def test_req_chan_decode_data_dict_entry_v1(pki_dir): + mockloop = MagicMock() + opts = { + "master_uri": "tcp://127.0.0.1:4506", + "interface": "127.0.0.1", + "ret_port": 4506, + "ipv6": False, + "sock_dir": ".", + "pki_dir": str(pki_dir.join("minion")), + "id": "minion", + "__role": "minion", + "keysize": 4096, + } + master_opts = dict(opts, pki_dir=str(pki_dir.join("master"))) + server = salt.transport.zeromq.ZeroMQReqServerChannel(master_opts) + client = salt.transport.zeromq.AsyncZeroMQReqChannel(opts, io_loop=mockloop) + dictkey = "pillar" + target = "minion" + pillar_data = {"pillar1": "meh"} + ret = server._encrypt_private(pillar_data, dictkey, target, sign_messages=False) + key = client.auth.get_keys() + if HAS_M2: + aes = key.private_decrypt(ret["key"], RSA.pkcs1_oaep_padding) + else: + cipher = PKCS1_OAEP.new(key) + aes = cipher.decrypt(ret["key"]) + pcrypt = salt.crypt.Crypticle(client.opts, aes) + ret_pillar_data = pcrypt.loads(ret[dictkey]) + assert ret_pillar_data == pillar_data + + +async def test_req_chan_decode_data_dict_entry_v2(pki_dir): + mockloop = MagicMock() + opts = { + "master_uri": "tcp://127.0.0.1:4506", + "interface": "127.0.0.1", + "ret_port": 4506, + "ipv6": False, + "sock_dir": ".", + "pki_dir": str(pki_dir.join("minion")), + "id": "minion", + "__role": "minion", + "keysize": 4096, + } + master_opts = dict(opts, pki_dir=str(pki_dir.join("master"))) + server = salt.transport.zeromq.ZeroMQReqServerChannel(master_opts) + client = salt.transport.zeromq.AsyncZeroMQReqChannel(opts, io_loop=mockloop) + + dictkey = "pillar" + target = "minion" + pillar_data = {"pillar1": "meh"} + + # Mock auth and message client. + auth = client.auth + auth._crypticle = salt.crypt.Crypticle(opts, AES_KEY) + client.auth = MagicMock() + client.auth.authenticated = True + client.auth.get_keys = auth.get_keys + client.auth.crypticle.dumps = auth.crypticle.dumps + client.auth.crypticle.loads = auth.crypticle.loads + client.message_client = MagicMock() + + @salt.ext.tornado.gen.coroutine + def mocksend(msg, timeout=60, tries=3): + client.message_client.msg = msg + load = client.auth.crypticle.loads(msg["load"]) + ret = server._encrypt_private( + pillar_data, dictkey, target, nonce=load["nonce"], sign_messages=True + ) + raise salt.ext.tornado.gen.Return(ret) + + client.message_client.send = mocksend + + # Note the 'ver' value in 'load' does not represent the the 'version' sent + # in the top level of the transport's message. + load = { + "id": target, + "grains": {}, + "saltenv": "base", + "pillarenv": "base", + "pillar_override": True, + "extra_minion_data": {}, + "ver": "2", + "cmd": "_pillar", + } + ret = await client.crypted_transfer_decode_dictentry(load, dictkey="pillar",) + assert "version" in client.message_client.msg + assert client.message_client.msg["version"] == 2 + assert ret == {"pillar1": "meh"} + + +async def test_req_chan_decode_data_dict_entry_v2_bad_nonce(pki_dir): + mockloop = MagicMock() + opts = { + "master_uri": "tcp://127.0.0.1:4506", + "interface": "127.0.0.1", + "ret_port": 4506, + "ipv6": False, + "sock_dir": ".", + "pki_dir": str(pki_dir.join("minion")), + "id": "minion", + "__role": "minion", + "keysize": 4096, + } + master_opts = dict(opts, pki_dir=str(pki_dir.join("master"))) + server = salt.transport.zeromq.ZeroMQReqServerChannel(master_opts) + client = salt.transport.zeromq.AsyncZeroMQReqChannel(opts, io_loop=mockloop) + + dictkey = "pillar" + badnonce = "abcdefg" + target = "minion" + pillar_data = {"pillar1": "meh"} + + # Mock auth and message client. + auth = client.auth + auth._crypticle = salt.crypt.Crypticle(opts, AES_KEY) + client.auth = MagicMock() + client.auth.authenticated = True + client.auth.get_keys = auth.get_keys + client.auth.crypticle.dumps = auth.crypticle.dumps + client.auth.crypticle.loads = auth.crypticle.loads + client.message_client = MagicMock() + ret = server._encrypt_private( + pillar_data, dictkey, target, nonce=badnonce, sign_messages=True + ) + + @salt.ext.tornado.gen.coroutine + def mocksend(msg, timeout=60, tries=3): + client.message_client.msg = msg + raise salt.ext.tornado.gen.Return(ret) + + client.message_client.send = mocksend + + # Note the 'ver' value in 'load' does not represent the the 'version' sent + # in the top level of the transport's message. + load = { + "id": target, + "grains": {}, + "saltenv": "base", + "pillarenv": "base", + "pillar_override": True, + "extra_minion_data": {}, + "ver": "2", + "cmd": "_pillar", + } + + with pytest.raises(salt.crypt.AuthenticationError) as excinfo: + ret = await client.crypted_transfer_decode_dictentry(load, dictkey="pillar",) + assert "Pillar nonce verification failed." == excinfo.value.message + + +async def test_req_chan_decode_data_dict_entry_v2_bad_signature(pki_dir): + mockloop = MagicMock() + opts = { + "master_uri": "tcp://127.0.0.1:4506", + "interface": "127.0.0.1", + "ret_port": 4506, + "ipv6": False, + "sock_dir": ".", + "pki_dir": str(pki_dir.join("minion")), + "id": "minion", + "__role": "minion", + "keysize": 4096, + } + master_opts = dict(opts, pki_dir=str(pki_dir.join("master"))) + server = salt.transport.zeromq.ZeroMQReqServerChannel(master_opts) + client = salt.transport.zeromq.AsyncZeroMQReqChannel(opts, io_loop=mockloop) + + dictkey = "pillar" + badnonce = "abcdefg" + target = "minion" + pillar_data = {"pillar1": "meh"} + + # Mock auth and message client. + auth = client.auth + auth._crypticle = salt.crypt.Crypticle(opts, AES_KEY) + client.auth = MagicMock() + client.auth.authenticated = True + client.auth.get_keys = auth.get_keys + client.auth.crypticle.dumps = auth.crypticle.dumps + client.auth.crypticle.loads = auth.crypticle.loads + client.message_client = MagicMock() + + @salt.ext.tornado.gen.coroutine + def mocksend(msg, timeout=60, tries=3): + client.message_client.msg = msg + load = client.auth.crypticle.loads(msg["load"]) + ret = server._encrypt_private( + pillar_data, dictkey, target, nonce=load["nonce"], sign_messages=True + ) + + key = client.auth.get_keys() + if HAS_M2: + aes = key.private_decrypt(ret["key"], RSA.pkcs1_oaep_padding) + else: + cipher = PKCS1_OAEP.new(key) + aes = cipher.decrypt(ret["key"]) + pcrypt = salt.crypt.Crypticle(client.opts, aes) + signed_msg = pcrypt.loads(ret[dictkey]) + # Changing the pillar data will cause the signature verification to + # fail. + data = salt.payload.Serial({}).loads(signed_msg["data"]) + data["pillar"] = {"pillar1": "bar"} + signed_msg["data"] = salt.payload.Serial({}).dumps(data) + ret[dictkey] = pcrypt.dumps(signed_msg) + raise salt.ext.tornado.gen.Return(ret) + + client.message_client.send = mocksend + + # Note the 'ver' value in 'load' does not represent the the 'version' sent + # in the top level of the transport's message. + load = { + "id": target, + "grains": {}, + "saltenv": "base", + "pillarenv": "base", + "pillar_override": True, + "extra_minion_data": {}, + "ver": "2", + "cmd": "_pillar", + } + + with pytest.raises(salt.crypt.AuthenticationError) as excinfo: + ret = await client.crypted_transfer_decode_dictentry(load, dictkey="pillar",) + assert "Pillar payload signature failed to validate." == excinfo.value.message + + +async def test_req_chan_decode_data_dict_entry_v2_bad_key(pki_dir): + mockloop = MagicMock() + opts = { + "master_uri": "tcp://127.0.0.1:4506", + "interface": "127.0.0.1", + "ret_port": 4506, + "ipv6": False, + "sock_dir": ".", + "pki_dir": str(pki_dir.join("minion")), + "id": "minion", + "__role": "minion", + "keysize": 4096, + } + master_opts = dict(opts, pki_dir=str(pki_dir.join("master"))) + server = salt.transport.zeromq.ZeroMQReqServerChannel(master_opts) + client = salt.transport.zeromq.AsyncZeroMQReqChannel(opts, io_loop=mockloop) + + dictkey = "pillar" + badnonce = "abcdefg" + target = "minion" + pillar_data = {"pillar1": "meh"} + + # Mock auth and message client. + auth = client.auth + auth._crypticle = salt.crypt.Crypticle(opts, AES_KEY) + client.auth = MagicMock() + client.auth.authenticated = True + client.auth.get_keys = auth.get_keys + client.auth.crypticle.dumps = auth.crypticle.dumps + client.auth.crypticle.loads = auth.crypticle.loads + client.message_client = MagicMock() + + @salt.ext.tornado.gen.coroutine + def mocksend(msg, timeout=60, tries=3): + client.message_client.msg = msg + load = client.auth.crypticle.loads(msg["load"]) + ret = server._encrypt_private( + pillar_data, dictkey, target, nonce=load["nonce"], sign_messages=True + ) + + key = client.auth.get_keys() + if HAS_M2: + aes = key.private_decrypt(ret["key"], RSA.pkcs1_oaep_padding) + else: + cipher = PKCS1_OAEP.new(key) + aes = cipher.decrypt(ret["key"]) + pcrypt = salt.crypt.Crypticle(client.opts, aes) + signed_msg = pcrypt.loads(ret[dictkey]) + + # Now encrypt with a different key + key = salt.crypt.Crypticle.generate_key_string() + pcrypt = salt.crypt.Crypticle(opts, key) + pubfn = os.path.join(master_opts["pki_dir"], "minions", "minion") + pub = salt.crypt.get_rsa_pub_key(pubfn) + ret[dictkey] = pcrypt.dumps(signed_msg) + key = salt.utils.stringutils.to_bytes(key) + if HAS_M2: + ret["key"] = pub.public_encrypt(key, RSA.pkcs1_oaep_padding) + else: + cipher = PKCS1_OAEP.new(pub) + ret["key"] = cipher.encrypt(key) + raise salt.ext.tornado.gen.Return(ret) + + client.message_client.send = mocksend + + # Note the 'ver' value in 'load' does not represent the the 'version' sent + # in the top level of the transport's message. + load = { + "id": target, + "grains": {}, + "saltenv": "base", + "pillarenv": "base", + "pillar_override": True, + "extra_minion_data": {}, + "ver": "2", + "cmd": "_pillar", + } + + with pytest.raises(salt.crypt.AuthenticationError) as excinfo: + ret = await client.crypted_transfer_decode_dictentry(load, dictkey="pillar",) + assert "Key verification failed." == excinfo.value.message + + +async def test_req_serv_auth_v1(pki_dir): + mockloop = MagicMock() + opts = { + "master_uri": "tcp://127.0.0.1:4506", + "interface": "127.0.0.1", + "ret_port": 4506, + "ipv6": False, + "sock_dir": ".", + "pki_dir": str(pki_dir.join("minion")), + "id": "minion", + "__role": "minion", + "keysize": 4096, + "max_minions": 0, + "auto_accept": False, + "open_mode": False, + "key_pass": None, + "master_sign_pubkey": False, + "publish_port": 4505, + "auth_mode": 1, + } + SMaster.secrets["aes"] = { + "secret": multiprocessing.Array( + ctypes.c_char, + salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string()), + ), + "reload": salt.crypt.Crypticle.generate_key_string, + } + master_opts = dict(opts, pki_dir=str(pki_dir.join("master"))) + server = salt.transport.zeromq.ZeroMQReqServerChannel(master_opts) + server.auto_key = salt.daemons.masterapi.AutoKey(server.opts) + server.cache_cli = False + server.master_key = salt.crypt.MasterKeys(server.opts) + + pub = salt.crypt.get_rsa_pub_key(str(pki_dir.join("minion", "minion.pub"))) + token = salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string()) + nonce = uuid.uuid4().hex + + # We need to read the public key with fopen otherwise the newlines might + # not match on windows. + with salt.utils.files.fopen(str(pki_dir.join("minion", "minion.pub")), "r") as fp: + pub_key = fp.read() + + load = { + "cmd": "_auth", + "id": "minion", + "token": token, + "pub": pub_key, + } + ret = server._auth(load, sign_messages=False) + assert "load" not in ret + + +async def test_req_serv_auth_v2(pki_dir): + mockloop = MagicMock() + opts = { + "master_uri": "tcp://127.0.0.1:4506", + "interface": "127.0.0.1", + "ret_port": 4506, + "ipv6": False, + "sock_dir": ".", + "pki_dir": str(pki_dir.join("minion")), + "id": "minion", + "__role": "minion", + "keysize": 4096, + "max_minions": 0, + "auto_accept": False, + "open_mode": False, + "key_pass": None, + "master_sign_pubkey": False, + "publish_port": 4505, + "auth_mode": 1, + } + SMaster.secrets["aes"] = { + "secret": multiprocessing.Array( + ctypes.c_char, + salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string()), + ), + "reload": salt.crypt.Crypticle.generate_key_string, + } + master_opts = dict(opts, pki_dir=str(pki_dir.join("master"))) + server = salt.transport.zeromq.ZeroMQReqServerChannel(master_opts) + server.auto_key = salt.daemons.masterapi.AutoKey(server.opts) + server.cache_cli = False + server.master_key = salt.crypt.MasterKeys(server.opts) + + pub = salt.crypt.get_rsa_pub_key(str(pki_dir.join("minion", "minion.pub"))) + token = salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string()) + nonce = uuid.uuid4().hex + + # We need to read the public key with fopen otherwise the newlines might + # not match on windows. + with salt.utils.files.fopen(str(pki_dir.join("minion", "minion.pub")), "r") as fp: + pub_key = fp.read() + + load = { + "cmd": "_auth", + "id": "minion", + "nonce": nonce, + "token": token, + "pub": pub_key, + } + ret = server._auth(load, sign_messages=True) + assert "sig" in ret + assert "load" in ret + + +async def test_req_chan_auth_v2(pki_dir, io_loop): + mockloop = MagicMock() + opts = { + "master_uri": "tcp://127.0.0.1:4506", + "interface": "127.0.0.1", + "ret_port": 4506, + "ipv6": False, + "sock_dir": ".", + "pki_dir": str(pki_dir.join("minion")), + "id": "minion", + "__role": "minion", + "keysize": 4096, + "max_minions": 0, + "auto_accept": False, + "open_mode": False, + "key_pass": None, + "publish_port": 4505, + "auth_mode": 1, + } + SMaster.secrets["aes"] = { + "secret": multiprocessing.Array( + ctypes.c_char, + salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string()), + ), + "reload": salt.crypt.Crypticle.generate_key_string, + } + master_opts = dict(opts, pki_dir=str(pki_dir.join("master"))) + master_opts["master_sign_pubkey"] = False + server = salt.transport.zeromq.ZeroMQReqServerChannel(master_opts) + server.auto_key = salt.daemons.masterapi.AutoKey(server.opts) + server.cache_cli = False + server.master_key = salt.crypt.MasterKeys(server.opts) + opts["verify_master_pubkey_sign"] = False + opts["always_verify_signature"] = False + client = salt.transport.zeromq.AsyncZeroMQReqChannel(opts, io_loop=io_loop) + signin_payload = client.auth.minion_sign_in_payload() + pload = client._package_load(signin_payload) + assert "version" in pload + assert pload["version"] == 2 + + ret = server._auth(pload["load"], sign_messages=True) + assert "sig" in ret + ret = client.auth.handle_signin_response(signin_payload, ret) + assert "aes" in ret + assert "master_uri" in ret + assert "publish_port" in ret + + +async def test_req_chan_auth_v2_with_master_signing(pki_dir, io_loop): + mockloop = MagicMock() + opts = { + "master_uri": "tcp://127.0.0.1:4506", + "interface": "127.0.0.1", + "ret_port": 4506, + "ipv6": False, + "sock_dir": ".", + "pki_dir": str(pki_dir.join("minion")), + "id": "minion", + "__role": "minion", + "keysize": 4096, + "max_minions": 0, + "auto_accept": False, + "open_mode": False, + "key_pass": None, + "publish_port": 4505, + "auth_mode": 1, + } + SMaster.secrets["aes"] = { + "secret": multiprocessing.Array( + ctypes.c_char, + salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string()), + ), + "reload": salt.crypt.Crypticle.generate_key_string, + } + master_opts = dict(opts, pki_dir=str(pki_dir.join("master"))) + master_opts["master_sign_pubkey"] = True + master_opts["master_use_pubkey_signature"] = False + master_opts["signing_key_pass"] = True + master_opts["master_sign_key_name"] = "master_sign" + server = salt.transport.zeromq.ZeroMQReqServerChannel(master_opts) + server.auto_key = salt.daemons.masterapi.AutoKey(server.opts) + server.cache_cli = False + server.master_key = salt.crypt.MasterKeys(server.opts) + opts["verify_master_pubkey_sign"] = True + opts["always_verify_signature"] = True + opts["master_sign_key_name"] = "master_sign" + opts["master"] = "master" + + assert ( + pki_dir.join("minion", "minion_master.pub").read() + == pki_dir.join("master", "master.pub").read() + ) + + client = salt.transport.zeromq.AsyncZeroMQReqChannel(opts, io_loop=io_loop) + signin_payload = client.auth.minion_sign_in_payload() + pload = client._package_load(signin_payload) + assert "version" in pload + assert pload["version"] == 2 + + server_reply = server._auth(pload["load"], sign_messages=True) + # With version 2 we always get a clear signed response + assert "enc" in server_reply + assert server_reply["enc"] == "clear" + assert "sig" in server_reply + assert "load" in server_reply + ret = client.auth.handle_signin_response(signin_payload, server_reply) + assert "aes" in ret + assert "master_uri" in ret + assert "publish_port" in ret + + # Now create a new master key pair and try auth with it. + mapriv = pki_dir.join("master", "master.pem") + mapriv.remove() + mapriv.write(MASTER2_PRIV_KEY.strip()) + mapub = pki_dir.join("master", "master.pub") + mapub.remove() + mapub.write(MASTER2_PUB_KEY.strip()) + + server = salt.transport.zeromq.ZeroMQReqServerChannel(master_opts) + server.auto_key = salt.daemons.masterapi.AutoKey(server.opts) + server.cache_cli = False + server.master_key = salt.crypt.MasterKeys(server.opts) + + signin_payload = client.auth.minion_sign_in_payload() + pload = client._package_load(signin_payload) + server_reply = server._auth(pload["load"], sign_messages=True) + ret = client.auth.handle_signin_response(signin_payload, server_reply) + + assert "aes" in ret + assert "master_uri" in ret + assert "publish_port" in ret + + assert ( + pki_dir.join("minion", "minion_master.pub").read() + == pki_dir.join("master", "master.pub").read() + ) + + +async def test_req_chan_auth_v2_new_minion_with_master_pub(pki_dir, io_loop): + + pki_dir.join("master", "minions", "minion").remove() + mockloop = MagicMock() + opts = { + "master_uri": "tcp://127.0.0.1:4506", + "interface": "127.0.0.1", + "ret_port": 4506, + "ipv6": False, + "sock_dir": ".", + "pki_dir": str(pki_dir.join("minion")), + "id": "minion", + "__role": "minion", + "keysize": 4096, + "max_minions": 0, + "auto_accept": False, + "open_mode": False, + "key_pass": None, + "publish_port": 4505, + "auth_mode": 1, + "acceptance_wait_time": 3, + } + SMaster.secrets["aes"] = { + "secret": multiprocessing.Array( + ctypes.c_char, + salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string()), + ), + "reload": salt.crypt.Crypticle.generate_key_string, + } + master_opts = dict(opts, pki_dir=str(pki_dir.join("master"))) + master_opts["master_sign_pubkey"] = False + server = salt.transport.zeromq.ZeroMQReqServerChannel(master_opts) + server.auto_key = salt.daemons.masterapi.AutoKey(server.opts) + server.cache_cli = False + server.master_key = salt.crypt.MasterKeys(server.opts) + opts["verify_master_pubkey_sign"] = False + opts["always_verify_signature"] = False + client = salt.transport.zeromq.AsyncZeroMQReqChannel(opts, io_loop=io_loop) + signin_payload = client.auth.minion_sign_in_payload() + pload = client._package_load(signin_payload) + assert "version" in pload + assert pload["version"] == 2 + + ret = server._auth(pload["load"], sign_messages=True) + assert "sig" in ret + ret = client.auth.handle_signin_response(signin_payload, ret) + assert ret == "retry" + + +async def test_req_chan_auth_v2_new_minion_with_master_pub_bad_sig(pki_dir, io_loop): + + pki_dir.join("master", "minions", "minion").remove() + + # Give the master a different key than the minion has. + mapriv = pki_dir.join("master", "master.pem") + mapriv.remove() + mapriv.write(MASTER2_PRIV_KEY.strip()) + mapub = pki_dir.join("master", "master.pub") + mapub.remove() + mapub.write(MASTER2_PUB_KEY.strip()) + + mockloop = MagicMock() + opts = { + "master_uri": "tcp://127.0.0.1:4506", + "interface": "127.0.0.1", + "ret_port": 4506, + "ipv6": False, + "sock_dir": ".", + "pki_dir": str(pki_dir.join("minion")), + "id": "minion", + "__role": "minion", + "keysize": 4096, + "max_minions": 0, + "auto_accept": False, + "open_mode": False, + "key_pass": None, + "publish_port": 4505, + "auth_mode": 1, + "acceptance_wait_time": 3, + } + SMaster.secrets["aes"] = { + "secret": multiprocessing.Array( + ctypes.c_char, + salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string()), + ), + "reload": salt.crypt.Crypticle.generate_key_string, + } + master_opts = dict(opts, pki_dir=str(pki_dir.join("master"))) + master_opts["master_sign_pubkey"] = False + server = salt.transport.zeromq.ZeroMQReqServerChannel(master_opts) + server.auto_key = salt.daemons.masterapi.AutoKey(server.opts) + server.cache_cli = False + server.master_key = salt.crypt.MasterKeys(server.opts) + opts["verify_master_pubkey_sign"] = False + opts["always_verify_signature"] = False + client = salt.transport.zeromq.AsyncZeroMQReqChannel(opts, io_loop=io_loop) + signin_payload = client.auth.minion_sign_in_payload() + pload = client._package_load(signin_payload) + assert "version" in pload + assert pload["version"] == 2 + + ret = server._auth(pload["load"], sign_messages=True) + assert "sig" in ret + with pytest.raises(salt.crypt.SaltClientError, match="Invalid signature"): + ret = client.auth.handle_signin_response(signin_payload, ret) + + +async def test_req_chan_auth_v2_new_minion_without_master_pub(pki_dir, io_loop): + + pki_dir.join("master", "minions", "minion").remove() + pki_dir.join("minion", "minion_master.pub").remove() + mockloop = MagicMock() + opts = { + "master_uri": "tcp://127.0.0.1:4506", + "interface": "127.0.0.1", + "ret_port": 4506, + "ipv6": False, + "sock_dir": ".", + "pki_dir": str(pki_dir.join("minion")), + "id": "minion", + "__role": "minion", + "keysize": 4096, + "max_minions": 0, + "auto_accept": False, + "open_mode": False, + "key_pass": None, + "publish_port": 4505, + "auth_mode": 1, + "acceptance_wait_time": 3, + } + SMaster.secrets["aes"] = { + "secret": multiprocessing.Array( + ctypes.c_char, + salt.utils.stringutils.to_bytes(salt.crypt.Crypticle.generate_key_string()), + ), + "reload": salt.crypt.Crypticle.generate_key_string, + } + master_opts = dict(opts, pki_dir=str(pki_dir.join("master"))) + master_opts["master_sign_pubkey"] = False + server = salt.transport.zeromq.ZeroMQReqServerChannel(master_opts) + server.auto_key = salt.daemons.masterapi.AutoKey(server.opts) + server.cache_cli = False + server.master_key = salt.crypt.MasterKeys(server.opts) + opts["verify_master_pubkey_sign"] = False + opts["always_verify_signature"] = False + client = salt.transport.zeromq.AsyncZeroMQReqChannel(opts, io_loop=io_loop) + signin_payload = client.auth.minion_sign_in_payload() + pload = client._package_load(signin_payload) + assert "version" in pload + assert pload["version"] == 2 + + ret = server._auth(pload["load"], sign_messages=True) + assert "sig" in ret + ret = client.auth.handle_signin_response(signin_payload, ret) + assert ret == "retry" diff --git a/tests/unit/transport/mixins.py b/tests/unit/transport/mixins.py index 540cd33d73..a29182a3f3 100644 --- a/tests/unit/transport/mixins.py +++ b/tests/unit/transport/mixins.py @@ -9,6 +9,7 @@ import salt.transport.client # Import 3rd-party libs from salt.ext import six import salt.ext.tornado.gen +import salt.transport.client def run_loop_in_thread(loop, evt): @@ -30,8 +31,7 @@ def run_loop_in_thread(loop, evt): loop.close() -class ReqChannelMixin(object): - +class ReqChannelMixin: def test_basic(self): ''' Test a variety of messages, make sure we get the expected responses @@ -42,8 +42,8 @@ class ReqChannelMixin(object): {'baz': 'qux', 'list': [1, 2, 3]}, ] for msg in msgs: - ret = self.channel.send(msg, timeout=2, tries=1) - self.assertEqual(ret['load'], msg) + ret = self.channel.send(dict(msg), timeout=2, tries=1) + self.assertEqual(ret["load"], msg) def test_normalization(self): ''' @@ -57,7 +57,7 @@ class ReqChannelMixin(object): ] for msg in msgs: ret = self.channel.send(msg, timeout=2, tries=1) - for k, v in six.iteritems(ret['load']): + for k, v in ret["load"].items(): self.assertEqual(types[k], type(v)) def test_badload(self): @@ -70,7 +70,7 @@ class ReqChannelMixin(object): self.assertEqual(ret, 'payload and load must be a dict') -class PubChannelMixin(object): +class PubChannelMixin: def test_basic(self): self.pub = None diff --git a/tests/unit/transport/test_tcp.py b/tests/unit/transport/test_tcp.py index 6cc29e1414..edf3d9b12e 100644 --- a/tests/unit/transport/test_tcp.py +++ b/tests/unit/transport/test_tcp.py @@ -27,6 +27,13 @@ from salt.transport.tcp import SaltMessageClientPool, SaltMessageClient, TCPPubS # Import Salt Testing libs from tests.support.unit import TestCase, skipIf from tests.support.helpers import get_unused_localhost_port, flaky +from salt.ext.tornado.testing import AsyncTestCase, gen_test +from salt.transport.tcp import ( + SaltMessageClient, + SaltMessageClientPool, + TCPPubServerChannel, +) +from tests.support.helpers import flaky, slowTest from tests.support.mixins import AdaptedConfigurationTestCaseMixin from tests.support.mock import MagicMock, patch from tests.unit.transport.mixins import PubChannelMixin, ReqChannelMixin, run_loop_in_thread @@ -61,11 +68,13 @@ class BaseTCPReqCase(TestCase, AdaptedConfigurationTestCaseMixin): ) cls.minion_config = cls.get_temp_config( - 'minion', - **{'transport': 'tcp', - 'master_ip': '127.0.0.1', - 'master_port': ret_port, - 'master_uri': 'tcp://127.0.0.1:{0}'.format(ret_port)} + "minion", + **{ + "transport": "tcp", + "master_ip": "127.0.0.1", + "master_port": ret_port, + "master_uri": "tcp://127.0.0.1:{}".format(ret_port), + } ) cls.process_manager = salt.utils.process.ProcessManager(name='ReqServer_ProcessManager') @@ -174,12 +183,14 @@ class BaseTCPPubCase(AsyncTestCase, AdaptedConfigurationTestCaseMixin): ) cls.minion_config = cls.get_temp_config( - 'minion', - **{'transport': 'tcp', - 'master_ip': '127.0.0.1', - 'auth_timeout': 1, - 'master_port': ret_port, - 'master_uri': 'tcp://127.0.0.1:{0}'.format(ret_port)} + "minion", + **{ + "transport": "tcp", + "master_ip": "127.0.0.1", + "auth_timeout": 1, + "master_port": ret_port, + "master_uri": "tcp://127.0.0.1:{}".format(ret_port), + } ) cls.process_manager = salt.utils.process.ProcessManager(name='ReqServer_ProcessManager') @@ -216,17 +227,17 @@ class BaseTCPPubCase(AsyncTestCase, AdaptedConfigurationTestCaseMixin): del cls.req_server_channel def setUp(self): - super(BaseTCPPubCase, self).setUp() + super().setUp() self._start_handlers = dict(self.io_loop._handlers) def tearDown(self): - super(BaseTCPPubCase, self).tearDown() + super().tearDown() failures = [] - for k, v in six.iteritems(self.io_loop._handlers): + for k, v in self.io_loop._handlers.items(): if self._start_handlers.get(k) != v: failures.append((k, v)) if failures: - raise Exception('FDs still attached to the IOLoop: {0}'.format(failures)) + raise Exception("FDs still attached to the IOLoop: {}".format(failures)) del self.channel del self._start_handlers @@ -260,7 +271,7 @@ class AsyncPubChannelTest(BaseTCPPubCase, PubChannelMixin): class SaltMessageClientPoolTest(AsyncTestCase): def setUp(self): - super(SaltMessageClientPoolTest, self).setUp() + super().setUp() sock_pool_size = 5 with patch('salt.transport.tcp.SaltMessageClient.__init__', MagicMock(return_value=None)): self.message_client_pool = SaltMessageClientPool({'sock_pool_size': sock_pool_size}, @@ -271,7 +282,7 @@ class SaltMessageClientPoolTest(AsyncTestCase): def tearDown(self): with patch('salt.transport.tcp.SaltMessageClient.close', MagicMock(return_value=None)): del self.original_message_clients - super(SaltMessageClientPoolTest, self).tearDown() + super().tearDown() def test_send(self): for message_client_mock in self.message_client_pool.message_clients: @@ -384,66 +395,64 @@ class SaltMessageClientCleanupTest(TestCase, AdaptedConfigurationTestCaseMixin): class TCPPubServerChannelTest(TestCase, AdaptedConfigurationTestCaseMixin): - @patch('salt.master.SMaster.secrets') - @patch('salt.crypt.Crypticle') - @patch('salt.utils.asynchronous.SyncWrapper') - def test_publish_filtering(self, sync_wrapper, crypticle, secrets): - opts = self.get_temp_config('master') + @patch("salt.master.SMaster.secrets") + @patch("salt.crypt.Crypticle") + def test_publish_filtering(self, crypticle, secrets): + opts = self.get_temp_config("master") opts["sign_pub_messages"] = False channel = TCPPubServerChannel(opts) - wrap = MagicMock() crypt = MagicMock() crypt.dumps.return_value = {"test": "value"} secrets.return_value = {"aes": {"secret": None}} crypticle.return_value = crypt - sync_wrapper.return_value = wrap # try simple publish with glob tgt_type - channel.publish({"test": "value", "tgt_type": "glob", "tgt": "*"}) - payload = wrap.send.call_args[0][0] + payload = channel.pack_publish( + {"test": "value", "tgt_type": "glob", "tgt": "*"} + ) # verify we send it without any specific topic assert "topic_lst" not in payload # try simple publish with list tgt_type - channel.publish({"test": "value", "tgt_type": "list", "tgt": ["minion01"]}) - payload = wrap.send.call_args[0][0] + payload = channel.pack_publish( + {"test": "value", "tgt_type": "list", "tgt": ["minion01"]} + ) # verify we send it with correct topic assert "topic_lst" in payload self.assertEqual(payload["topic_lst"], ["minion01"]) # try with syndic settings - opts['order_masters'] = True - channel.publish({"test": "value", "tgt_type": "list", "tgt": ["minion01"]}) - payload = wrap.send.call_args[0][0] + opts["order_masters"] = True + payload = channel.pack_publish( + {"test": "value", "tgt_type": "list", "tgt": ["minion01"]} + ) # verify we send it without topic for syndics assert "topic_lst" not in payload - @patch('salt.utils.minions.CkMinions.check_minions') - @patch('salt.master.SMaster.secrets') - @patch('salt.crypt.Crypticle') - @patch('salt.utils.asynchronous.SyncWrapper') - def test_publish_filtering_str_list(self, sync_wrapper, crypticle, secrets, check_minions): - opts = self.get_temp_config('master') + @patch("salt.utils.minions.CkMinions.check_minions") + @patch("salt.master.SMaster.secrets") + @patch("salt.crypt.Crypticle") + def test_publish_filtering_str_list(self, crypticle, secrets, check_minions): + opts = self.get_temp_config("master") opts["sign_pub_messages"] = False channel = TCPPubServerChannel(opts) - wrap = MagicMock() crypt = MagicMock() crypt.dumps.return_value = {"test": "value"} secrets.return_value = {"aes": {"secret": None}} crypticle.return_value = crypt - sync_wrapper.return_value = wrap check_minions.return_value = {"minions": ["minion02"]} # try simple publish with list tgt_type - channel.publish({"test": "value", "tgt_type": "list", "tgt": "minion02"}) - payload = wrap.send.call_args[0][0] + payload = channel.pack_publish( + {"test": "value", "tgt_type": "list", "tgt": "minion02"} + ) # verify we send it with correct topic assert "topic_lst" in payload diff --git a/tests/unit/transport/test_zeromq.py b/tests/unit/transport/test_zeromq.py index a68de7c21c..38921465e7 100644 --- a/tests/unit/transport/test_zeromq.py +++ b/tests/unit/transport/test_zeromq.py @@ -400,6 +400,7 @@ class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin): ctypes.c_char, six.b(salt.crypt.Crypticle.generate_key_string()), ), + "serial": multiprocessing.Value(ctypes.c_longlong, lock=False), } cls.minion_config = cls.get_temp_config( 'minion', @@ -448,10 +449,14 @@ class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin): sock.connect(pub_uri) last_msg = time.time() serial = salt.payload.Serial(opts) - crypticle = salt.crypt.Crypticle(opts, salt.master.SMaster.secrets['aes']['secret'].value) + crypticle = salt.crypt.Crypticle( + opts, salt.master.SMaster.secrets["aes"]["secret"].value + ) + unpacker = salt.utils.msgpack.Unpacker() + stop = False while time.time() - last_msg < timeout: try: - payload = sock.recv(zmq.NOBLOCK) + wire_bytes = sock.recv(zmq.NOBLOCK) except zmq.ZMQError: time.sleep(.01) else: @@ -459,13 +464,21 @@ class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin): if messages != 1: messages -= 1 continue - payload = crypticle.loads(serial.loads(payload)['load']) - if 'stop' in payload: - break - last_msg = time.time() - results.append(payload['jid']) - @skipIf(salt.utils.platform.is_windows(), 'Skip on Windows OS') + unpacker.feed(wire_bytes) + for w_payload in unpacker: + payload = crypticle.loads(w_payload[b"load"]) + if not payload: + continue + if "stop" in payload: + stop = True + break + last_msg = time.time() + results.append(payload["jid"]) + if stop: + break + + @slowTest def test_publish_to_pubserv_ipc(self): ''' Test sending 10K messags to ZeroMQPubServerChannel using IPC transport @@ -496,6 +509,7 @@ class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin): server_channel.pub_close() assert len(results) == send_num, (len(results), set(expect).difference(results)) + @slowTest def test_zeromq_publish_port(self): ''' test when connecting that we @@ -576,13 +590,14 @@ class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin): Test sending messags to publisher using UDP with zeromq_filtering enabled ''' - opts = dict(self.master_config, ipc_mode='ipc', - pub_hwm=0, zmq_filtering=True, acceptance_wait_time=5) - server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts) - server_channel.pre_fork(self.process_manager, kwargs={ - 'log_queue': salt.log.setup.get_multiprocessing_logging_queue() - }) - pub_uri = 'tcp://{interface}:{publish_port}'.format(**server_channel.opts) + opts = dict( + self.master_config, + ipc_mode="ipc", + pub_hwm=0, + zmq_filtering=True, + acceptance_wait_time=5, + ) + pub_uri = "tcp://{interface}:{publish_port}".format(**opts) send_num = 1 expect = [] results = [] @@ -590,20 +605,47 @@ class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin): args=(self.minion_config, pub_uri, results,), kwargs={'messages': 2}) gather.start() - # Allow time for server channel to start, especially on windows - time.sleep(2) - expect.append(send_num) - load = {'tgt_type': 'glob', 'tgt': '*', 'jid': send_num} - with patch('salt.utils.minions.CkMinions.check_minions', - MagicMock(return_value={'minions': ['minion'], 'missing': [], - 'ssh_minions': False})): - server_channel.publish(load) - server_channel.publish( - {'tgt_type': 'glob', 'tgt': '*', 'stop': True} - ) - gather.join() - server_channel.pub_close() - assert len(results) == send_num, (len(results), set(expect).difference(results)) + with patch( + "salt.utils.minions.CkMinions.check_minions", + MagicMock( + return_value={ + "minions": ["minion"], + "missing": [], + "ssh_minions": False, + } + ), + ): + # Allow time for server channel to start, especially on windows + time.sleep(2) + server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts) + server_channel.pre_fork( + self.process_manager, + kwargs={ + "log_queue": salt.log.setup.get_multiprocessing_logging_queue() + }, + ) + time.sleep(2) + expect.append(send_num) + load = {"tgt_type": "glob", "tgt": "*", "jid": send_num} + with patch( + "salt.utils.minions.CkMinions.check_minions", + MagicMock( + return_value={ + "minions": ["minion"], + "missing": [], + "ssh_minions": False, + } + ), + ): + server_channel.publish(load) + server_channel.publish({"tgt_type": "glob", "tgt": "*", "stop": True}) + time.sleep(0.3) + server_channel.pub_close() + gather.join() + assert len(results) == send_num, ( + len(results), + set(expect).difference(results), + ) def test_publish_to_pubserv_tcp(self): ''' @@ -636,7 +678,8 @@ class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin): for i in range(num): load = {'tgt_type': 'glob', 'tgt': '*', 'jid': '{}-{}'.format(sid, i)} server_channel.publish(load) - server_channel.close() + time.sleep(0.3) + server_channel.pub_close() @staticmethod def _send_large(opts, sid, num=10, size=250000 * 3): @@ -644,7 +687,8 @@ class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin): for i in range(num): load = {'tgt_type': 'glob', 'tgt': '*', 'jid': '{}-{}'.format(sid, i), 'xdata': '0' * size} server_channel.publish(load) - server_channel.close() + time.sleep(0.3) + server_channel.pub_close() def test_issue_36469_tcp(self): ''' @@ -652,19 +696,23 @@ class PubServerChannel(TestCase, AdaptedConfigurationTestCaseMixin): https://github.com/saltstack/salt/issues/36469 ''' - opts = dict(self.master_config, ipc_mode='tcp', pub_hwm=0) - server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts) - server_channel.pre_fork(self.process_manager, kwargs={ - 'log_queue': salt.log.setup.get_multiprocessing_logging_queue() - }) + opts = dict(self.master_config, ipc_mode="tcp", pub_hwm=0) send_num = 10 * 4 expect = [] results = [] pub_uri = 'tcp://{interface}:{publish_port}'.format(**opts) # Allow time for server channel to start, especially on windows - time.sleep(2) - gather = threading.Thread(target=self._gather_results, args=(self.minion_config, pub_uri, results,)) + gather = threading.Thread( + target=self._gather_results, args=(self.minion_config, pub_uri, results,) + ) gather.start() + time.sleep(2) + server_channel = salt.transport.zeromq.ZeroMQPubServerChannel(opts) + server_channel.pre_fork( + self.process_manager, + kwargs={"log_queue": salt.log.setup.get_multiprocessing_logging_queue()}, + ) + time.sleep(2) with ThreadPoolExecutor(max_workers=4) as executor: executor.submit(self._send_small, opts, 1) executor.submit(self._send_small, opts, 2) -- 2.35.1
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor