Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
SUSE:SLE-12:GA
pssh
0005-Add-an-explicit-API-entrypoint.patch
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File 0005-Add-an-explicit-API-entrypoint.patch of Package pssh
From 90fb623eac7451c60623125f1087c4b80097d276 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristoffer=20Gr=C3=B6nlund?= <krig@koru.se> Date: Tue, 7 Jan 2014 16:30:48 +0100 Subject: [PATCH 5/8] Add an explicit API entrypoint api.py has convencience wrappers for call, copy and slurp. These are easy-to-use programmatic versions of pssh, pscp and pslurp. --- psshlib/api.py | 332 +++++++++++++++++++++++++++++++++++++++++++++++++++ psshlib/callbacks.py | 59 +++++++++ psshlib/cli.py | 6 +- psshlib/manager.py | 38 ++++-- psshlib/task.py | 96 +++++++-------- test/api.py | 76 ++++++++++++ 6 files changed, 538 insertions(+), 69 deletions(-) create mode 100644 psshlib/api.py create mode 100644 psshlib/callbacks.py create mode 100644 test/api.py diff --git a/psshlib/api.py b/psshlib/api.py new file mode 100644 index 000000000000..085d83fc5b7c --- /dev/null +++ b/psshlib/api.py @@ -0,0 +1,332 @@ +# Copyright (c) 2013, Kristoffer Gronlund +# +# psshlib API +# +# Exposes an API for performing +# parallel SSH operations +# +# Three commands are supplied: +# +# call(hosts, cmdline, opts) +# +# copy(hosts, src, dst, opts) +# +# slurp(hosts, src, dst, opts) +# +# call returns {host: (rc, stdout, stdin) | error} +# copy returns {host: path | error} +# slurp returns {host: path | error} +# +# error is an error object which has an error message (or more) +# +# opts is command line options as given to pssh/pscp/pslurp +# +# call: Executes the given command on a set of hosts, collecting the output +# copy: Copies files from the local machine to a set of remote hosts +# slurp: Copies files from a set of remote hosts to local folders + +import os +import sys +from psshlib.cli import DEFAULT_PARALLELISM, DEFAULT_TIMEOUT +from psshlib.manager import Manager, FatalError +from psshlib.task import Task + + +class Error(object): + """ + Returned instead of a result for a host + in case of an error during the processing for + that host. + """ + def __init__(self, msg, task): + self.msg = msg + self.task = task + + def __str__(self): + if self.task and self.task.errorbuffer: + return "%s, Error output: %s" % (self.msg, + self.task.errorbuffer) + return self.msg + + +class Options(object): + """ + Common options for call, copy and slurp. + """ + limit = DEFAULT_PARALLELISM # Max number of parallel threads + timeout = DEFAULT_TIMEOUT # Timeout in seconds + askpass = False # Ask for a password + outdir = None # Write stdout to a file per host in this directory + errdir = None # Write stderr to a file per host in this directory + ssh_options = [] # Extra options to pass to SSH + ssh_extra = [] # Extra arguments to pass to SSH + verbose = False # Warning and diagnostic messages + quiet = False # Silence extra output + print_out = False # Print output to stdout when received + inline = True # Store stdout and stderr in memory buffers + inline_stdout = False # Store stdout in memory buffer + input_stream = None # Stream to read stdin from + default_user = None # User to connect as (unless overridden per host) + recursive = True # (copy, slurp only) Copy recursively + localdir = None # (slurp only) Local base directory to copy to + + +def _expand_host_port_user(lst): + """ + Input: list containing hostnames, (host, port)-tuples or (host, port, user)-tuples. + Output: list of (host, port, user)-tuples. + """ + def expand(v): + if isinstance(v, basestring): + return (v, None, None) + elif len(v) == 1: + return (v[0], None, None) + elif len(v) == 2: + return (v[0], v[1], None) + else: + return v + return [expand(x) for x in lst] + + +class _CallOutputBuilder(object): + def __init__(self): + self.finished_tasks = [] + + def finished(self, task, n): + """Called when Task is complete""" + self.finished_tasks.append(task) + + def result(self, manager): + """Called when all Tasks are complete to generate result""" + ret = {} + for task in self.finished_tasks: + if task.failures: + ret[task.host] = Error(', '.join(task.failures), task) + else: + ret[task.host] = (task.exitstatus, + task.outputbuffer or manager.outdir, + task.errorbuffer or manager.errdir) + return ret + + +def _build_call_cmd(host, port, user, cmdline, options, extra): + cmd = ['ssh', host, + '-o', 'NumberOfPasswordPrompts=1', + '-o', 'SendEnv=PSSH_NODENUM PSSH_HOST'] + if options: + for opt in options: + cmd += ['-o', opt] + if user: + cmd += ['-l', user] + if port: + cmd += ['-p', port] + if extra: + cmd.extend(extra) + if cmdline: + cmd.append(cmdline) + return cmd + + +def call(hosts, cmdline, opts=Options()): + """ + Executes the given command on a set of hosts, collecting the output + Returns {host: (rc, stdout, stdin) | Error} + """ + if opts.outdir and not os.path.exists(opts.outdir): + os.makedirs(opts.outdir) + if opts.errdir and not os.path.exists(opts.errdir): + os.makedirs(opts.errdir) + manager = Manager(limit=opts.limit, + timeout=opts.timeout, + askpass=opts.askpass, + outdir=opts.outdir, + errdir=opts.errdir, + callbacks=_CallOutputBuilder()) + for host, port, user in _expand_host_port_user(hosts): + cmd = _build_call_cmd(host, port, user, cmdline, + options=opts.ssh_options, + extra=opts.ssh_extra) + t = Task(host, port, user, cmd, + stdin=opts.input_stream, + verbose=opts.verbose, + quiet=opts.quiet, + print_out=opts.print_out, + inline=opts.inline, + inline_stdout=opts.inline_stdout, + default_user=opts.default_user) + manager.add_task(t) + try: + return manager.run() + except FatalError: + sys.exit(1) + + +class _CopyOutputBuilder(object): + def __init__(self): + self.finished_tasks = [] + + def finished(self, task, n): + self.finished_tasks.append(task) + + def result(self, manager): + ret = {} + for task in self.finished_tasks: + if task.failures: + ret[task.host] = Error(', '.join(task.failures), task) + else: + ret[task.host] = (task.exitstatus, + task.outputbuffer or manager.outdir, + task.errorbuffer or manager.errdir) + return ret + + +def _build_copy_cmd(host, port, user, src, dst, opts): + cmd = ['scp', '-qC'] + if opts.ssh_options: + for opt in opts.ssh_options: + cmd += ['-o', opt] + if port: + cmd += ['-P', port] + if opts.recursive: + cmd.append('-r') + if opts.ssh_extra: + cmd.extend(opts.ssh_extra) + cmd.append(src) + if user: + cmd.append('%s@%s:%s' % (user, host, dst)) + else: + cmd.append('%s:%s' % (host, dst)) + return cmd + + +def copy(hosts, src, dst, opts=Options()): + """ + Copies from the local node to a set of remote hosts + hosts: [(host, port, user)...] + src: local path + dst: remote path + opts: CopyOptions (optional) + Returns {host: (rc, stdout, stdin) | Error} + """ + if opts.outdir and not os.path.exists(opts.outdir): + os.makedirs(opts.outdir) + if opts.errdir and not os.path.exists(opts.errdir): + os.makedirs(opts.errdir) + manager = Manager(limit=opts.limit, + timeout=opts.timeout, + askpass=opts.askpass, + outdir=opts.outdir, + errdir=opts.errdir, + callbacks=_CopyOutputBuilder()) + for host, port, user in _expand_host_port_user(hosts): + cmd = _build_copy_cmd(host, port, user, src, dst, opts) + t = Task(host, port, user, cmd, + stdin=opts.input_stream, + verbose=opts.verbose, + quiet=opts.quiet, + print_out=opts.print_out, + inline=opts.inline, + inline_stdout=opts.inline_stdout, + default_user=opts.default_user) + manager.add_task(t) + try: + return manager.run() + except FatalError: + sys.exit(1) + + +class _SlurpOutputBuilder(object): + def __init__(self, localdirs): + self.finished_tasks = [] + self.localdirs = localdirs + + def finished(self, task, n): + self.finished_tasks.append(task) + + def result(self, manager): + ret = {} + for task in self.finished_tasks: + if task.failures: + ret[task.host] = Error(', '.join(task.failures), task) + else: + # TODO: save name of output file in Task + ret[task.host] = (task.exitstatus, + task.outputbuffer or manager.outdir, + task.errorbuffer or manager.errdir, + self.localdirs.get(task.host, None) +) + return ret + + +def _slurp_make_local_dirs(hosts, dst, opts): + if opts.localdir and not os.path.exists(opts.localdir): + os.makedirs(opts.localdir) + localdirs = {} + for host, port, user in _expand_host_port_user(hosts): + if opts.localdir: + dirname = os.path.join(opts.localdir, host) + else: + dirname = host + if not os.path.exists(dirname): + os.makedirs(dirname) + localdirs[host] = os.path.join(dirname, dst) + return localdirs + + +def _build_slurp_cmd(host, port, user, src, dst, opts): + cmd = ['scp', '-qC'] + if opts.ssh_options: + for opt in opts.ssh_options: + cmd += ['-o', opt] + if port: + cmd += ['-P', port] + if opts.recursive: + cmd.append('-r') + if opts.ssh_extra: + cmd.extend(opts.ssh_extra) + if user: + cmd.append('%s@%s:%s' % (user, host, src)) + else: + cmd.append('%s:%s' % (host, src)) + cmd.append(dst) + return cmd + + +def slurp(hosts, src, dst, opts=Options()): + """ + Copies from the remote node to the local node + hosts: [(host, port, user)...] + src: remote path + dst: local path + opts: CopyOptions (optional) + Returns {host: (rc, stdout, stdin, localpath) | Error} + """ + if os.path.isabs(dst): + raise ValueError("slurp: Destination must be a relative path") + localdirs = _slurp_make_local_dirs(hosts, dst, opts) + if opts.outdir and not os.path.exists(opts.outdir): + os.makedirs(opts.outdir) + if opts.errdir and not os.path.exists(opts.errdir): + os.makedirs(opts.errdir) + manager = Manager(limit=opts.limit, + timeout=opts.timeout, + askpass=opts.askpass, + outdir=opts.outdir, + errdir=opts.errdir, + callbacks=_SlurpOutputBuilder(localdirs)) + for host, port, user in _expand_host_port_user(hosts): + localpath = localdirs[host] + cmd = _build_slurp_cmd(host, port, user, src, localpath, opts) + t = Task(host, port, user, cmd, + stdin=opts.input_stream, + verbose=opts.verbose, + quiet=opts.quiet, + print_out=opts.print_out, + inline=opts.inline, + inline_stdout=opts.inline_stdout, + default_user=opts.default_user) + manager.add_task(t) + try: + return manager.run() + except FatalError: + sys.exit(1) diff --git a/psshlib/callbacks.py b/psshlib/callbacks.py new file mode 100644 index 000000000000..a00126ba4460 --- /dev/null +++ b/psshlib/callbacks.py @@ -0,0 +1,59 @@ +# Copyright (c) 2009-2012, Andrew McNabb +# Copyright (c) 2013, Kristoffer Gronlund + +import sys +import time + +from psshlib import color + + +class DefaultCallbacks(object): + """ + Passed to the Manager and called when events occur. + """ + def finished(self, task, n): + """Pretty prints a status report after the Task completes. + task: a Task object + n: Index in sequence of completed tasks. + """ + error = ', '.join(task.failures) + tstamp = time.asctime().split()[3] # Current time + if color.has_colors(sys.stdout): + progress = color.c("[%s]" % color.B(n)) + success = color.g("[%s]" % color.B("SUCCESS")) + failure = color.r("[%s]" % color.B("FAILURE")) + stderr = color.r("Stderr: ") + error = color.r(color.B(error)) + else: + progress = "[%s]" % n + success = "[SUCCESS]" + failure = "[FAILURE]" + stderr = "Stderr: " + host = task.pretty_host + if task.failures: + print(' '.join((progress, tstamp, failure, host, error))) + else: + print(' '.join((progress, tstamp, success, host))) + # NOTE: The extra flushes are to ensure that the data is output in + # the correct order with the C implementation of io. + if task.outputbuffer: + sys.stdout.flush() + try: + sys.stdout.buffer.write(task.outputbuffer) + sys.stdout.flush() + except AttributeError: + sys.stdout.write(task.outputbuffer) + if task.errorbuffer: + sys.stdout.write(stderr) + # Flush the TextIOWrapper before writing to the binary buffer. + sys.stdout.flush() + try: + sys.stdout.buffer.write(task.errorbuffer) + except AttributeError: + sys.stdout.write(task.errorbuffer) + + def result(self, manager): + """ + When all Tasks are completed, generate a result to return. + """ + return [task.exitstatus for task in manager.save_tasks if task in manager.done] diff --git a/psshlib/cli.py b/psshlib/cli.py index 58bd9332ce3d..611fadb6eb16 100644 --- a/psshlib/cli.py +++ b/psshlib/cli.py @@ -9,8 +9,8 @@ import textwrap from psshlib import version -_DEFAULT_PARALLELISM = 32 -_DEFAULT_TIMEOUT = 0 # "infinity" by default +DEFAULT_PARALLELISM = 32 +DEFAULT_TIMEOUT = 0 # "infinity" by default def common_parser(): @@ -60,7 +60,7 @@ def common_parser(): def common_defaults(**kwargs): - defaults = dict(par=_DEFAULT_PARALLELISM, timeout=_DEFAULT_TIMEOUT) + defaults = dict(par=DEFAULT_PARALLELISM, timeout=DEFAULT_TIMEOUT) defaults.update(**kwargs) envvars = [('user', 'PSSH_USER'), ('par', 'PSSH_PAR'), diff --git a/psshlib/manager.py b/psshlib/manager.py index 06e1cf31a010..3a72a07005bf 100644 --- a/psshlib/manager.py +++ b/psshlib/manager.py @@ -1,4 +1,5 @@ # Copyright (c) 2009-2012, Andrew McNabb +# Copyright (c) 2013, Kristoffer Gronlund from errno import EINTR import os @@ -15,6 +16,8 @@ except ImportError: from psshlib.askpass_server import PasswordServer from psshlib import psshutil +from psshlib.cli import DEFAULT_PARALLELISM, DEFAULT_TIMEOUT +from psshlib.callbacks import DefaultCallbacks READ_SIZE = 1 << 16 @@ -34,13 +37,29 @@ class Manager(object): limit: Maximum number of commands running at once. timeout: Maximum allowed execution time in seconds. """ - def __init__(self, opts): - self.limit = opts.par - self.timeout = opts.timeout - self.askpass = opts.askpass - self.outdir = opts.outdir - self.errdir = opts.errdir + def __init__(self, + limit=DEFAULT_PARALLELISM, + timeout=DEFAULT_TIMEOUT, + askpass=False, + outdir=None, + errdir=None, + callbacks=DefaultCallbacks()): + # Backwards compatibility with old __init__ + # format: Only argument is an options dict + if not isinstance(limit, int): + self.limit = limit.par + self.timeout = limit.timeout + self.askpass = limit.askpass + self.outdir = limit.outdir + self.errdir = limit.errdir + else: + self.limit = limit + self.timeout = timeout + self.askpass = askpass + self.outdir = outdir + self.errdir = errdir self.iomap = make_iomap() + self.callbacks = callbacks self.taskcount = 0 self.tasks = [] @@ -91,8 +110,7 @@ class Manager(object): writer.signal_quit() writer.join() - statuses = [task.exitstatus for task in self.save_tasks if task in self.done] - return statuses + return self.callbacks.result(self) def clear_sigchld_handler(self): signal.signal(signal.SIGCHLD, signal.SIG_DFL) @@ -195,10 +213,10 @@ class Manager(object): self.finished(task) def finished(self, task): - """Marks a task as complete and reports its status to stdout.""" + """Marks a task as complete and reports its status as finished.""" self.done.append(task) n = len(self.done) - task.report(n) + self.callbacks.finished(task, n) class IOMap(object): diff --git a/psshlib/task.py b/psshlib/task.py index 0c74db517ae4..c17cc96b986e 100644 --- a/psshlib/task.py +++ b/psshlib/task.py @@ -1,4 +1,5 @@ # Copyright (c) 2009-2012, Andrew McNabb +# Copyright (c) 2013, Kristoffer Gronlund from errno import EINTR from subprocess import Popen, PIPE @@ -9,7 +10,6 @@ import time import traceback from psshlib import askpass_client -from psshlib import color BUFFER_SIZE = 1 << 16 @@ -25,7 +25,38 @@ class Task(object): Upon completion, the `exitstatus` attribute is set to the exit status of the process. """ - def __init__(self, host, port, user, cmd, opts, stdin=None): + def __init__(self, + host, + port, + user, + cmd, + verbose=False, + quiet=False, + stdin=None, + print_out=False, + inline=False, + inline_stdout=False, + default_user=None): + + # Backwards compatibility: + if not isinstance(verbose, bool): + opts = verbose + verbose = opts.verbose + quiet = opts.quiet + try: + print_out = bool(opts.print_out) + except AttributeError: + print_out = False + try: + inline = bool(opts.inline) + except AttributeError: + inline = False + try: + inline_stdout = bool(opts.inline_stdout) + except AttributeError: + inline_stdout = False + default_user = opts.user + self.exitstatus = None self.host = host @@ -33,7 +64,7 @@ class Task(object): self.port = port self.cmd = cmd - if user != opts.user: + if user and user != default_user: self.pretty_host = '@'.join((user, self.pretty_host)) if port: self.pretty_host = ':'.join((self.pretty_host, port)) @@ -55,20 +86,11 @@ class Task(object): self.errfile = None # Set options. - self.verbose = opts.verbose - self.quiet = opts.quiet - try: - self.print_out = bool(opts.print_out) - except AttributeError: - self.print_out = False - try: - self.inline = bool(opts.inline) - except AttributeError: - self.inline = False - try: - self.inline_stdout = bool(opts.inline_stdout) - except AttributeError: - self.inline_stdout = False + self.verbose = verbose + self.quiet = quiet + self.print_out = print_out + self.inline = inline + self.inline_stdout = inline_stdout def start(self, nodenum, iomap, writer, askpass_socket=None): """Starts the process and registers files with the IOMap.""" @@ -252,43 +274,5 @@ class Task(object): exc = str(e) self.failures.append(exc) - def report(self, n): - """Pretty prints a status report after the Task completes.""" - error = ', '.join(self.failures) - tstamp = time.asctime().split()[3] # Current time - if color.has_colors(sys.stdout): - progress = color.c("[%s]" % color.B(n)) - success = color.g("[%s]" % color.B("SUCCESS")) - failure = color.r("[%s]" % color.B("FAILURE")) - stderr = color.r("Stderr: ") - error = color.r(color.B(error)) - else: - progress = "[%s]" % n - success = "[SUCCESS]" - failure = "[FAILURE]" - stderr = "Stderr: " - host = self.pretty_host - if not self.quiet: - if self.failures: - print(' '.join((progress, tstamp, failure, host, error))) - else: - print(' '.join((progress, tstamp, success, host))) - # NOTE: The extra flushes are to ensure that the data is output in - # the correct order with the C implementation of io. - if self.outputbuffer: - sys.stdout.flush() - try: - sys.stdout.buffer.write(self.outputbuffer) - sys.stdout.flush() - except AttributeError: - sys.stdout.write(self.outputbuffer) - if self.errorbuffer: - sys.stdout.write(stderr) - # Flush the TextIOWrapper before writing to the binary buffer. - sys.stdout.flush() - try: - sys.stdout.buffer.write(self.errorbuffer) - except AttributeError: - sys.stdout.write(self.errorbuffer) - # vim:ts=4:sw=4:et: + diff --git a/test/api.py b/test/api.py new file mode 100644 index 000000000000..092a75ce97d0 --- /dev/null +++ b/test/api.py @@ -0,0 +1,76 @@ +#!/usr/bin/python + +# Copyright (c) 2013, Kristoffer Gronlund + +import os +import sys +import unittest +import tempfile +import shutil + +basedir, bin = os.path.split(os.path.dirname(os.path.abspath(sys.argv[0]))) +sys.path.insert(0, "%s" % basedir) + +print basedir + +from psshlib import api as pssh + +if os.getenv("TEST_HOSTS") is None: + raise Exception("Must define TEST_HOSTS") +g_hosts = os.getenv("TEST_HOSTS").split() + +if os.getenv("TEST_USER") is None: + raise Exception("Must define TEST_USER") +g_user = os.getenv("TEST_USER") + +class CallTest(unittest.TestCase): + def testSimpleCall(self): + opts = pssh.Options() + opts.default_user = g_user + for host, result in pssh.call(g_hosts, "ls -l /", opts).iteritems(): + rc, out, err = result + self.assertEqual(rc, 0) + self.assert_(len(out) > 0) + + def testUptime(self): + opts = pssh.Options() + opts.default_user = g_user + for host, result in pssh.call(g_hosts, "uptime", opts).iteritems(): + rc, out, err = result + self.assertEqual(rc, 0) + self.assert_(out.find("load average") != -1) + + def testFailingCall(self): + opts = pssh.Options() + opts.default_user = g_user + for host, result in pssh.call(g_hosts, "touch /foofoo/barbar/jfikjfdj", opts).iteritems(): + self.assert_(isinstance(result, pssh.Error)) + self.assert_(str(result).find('with error code') != -1) + +class CopySlurpTest(unittest.TestCase): + def setUp(self): + self.tmpDir = tempfile.mkdtemp() + + def tearDown(self): + shutil.rmtree(self.tmpDir) + + def testCopyFile(self): + opts = pssh.Options() + opts.default_user = g_user + opts.localdir = self.tmpDir + by_host = pssh.copy(g_hosts, "/etc/hosts", "/tmp/pssh.test", opts) + for host, result in by_host.iteritems(): + rc, _, _ = result + self.assertEqual(rc, 0) + + by_host = pssh.slurp(g_hosts, "/tmp/pssh.test", "pssh.test", opts) + for host, result in by_host.iteritems(): + rc, _, _, path = result + self.assertEqual(rc, 0) + self.assert_(path.endswith('%s/pssh.test' % (host))) + +if __name__ == '__main__': + suite = unittest.TestSuite() + suite.addTest(unittest.makeSuite(CallTest, "test")) + suite.addTest(unittest.makeSuite(CopySlurpTest, "test")) + unittest.TextTestRunner().run(suite) -- 1.8.4
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor