Skip to content

Commit f1a6dfe

Browse files
author
Dan
committed
Refactored API - added run_command as the simplest method of using the library. Output is returned as generators by default. Added tests for new functions. Resolves #14
1 parent a410fc7 commit f1a6dfe

File tree

3 files changed

+150
-34
lines changed

3 files changed

+150
-34
lines changed

README.rst

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,10 @@ Run `ls` on two remote hosts in parallel.
4242
>>> from pssh import ParallelSSHClient
4343
>>> hosts = ['myhost1', 'myhost2']
4444
>>> client = ParallelSSHClient(hosts)
45-
>>> cmds = client.exec_command('ls -ltrh /tmp/aasdfasdf', sudo = True)
46-
>>> print [client.get_stdout(cmd) for cmd in cmds]
45+
>>> output = client.run_command('ls -ltrh /tmp/aasdfasdf', sudo=True)
46+
>>> for host in output: print output
4747
[localhost] drwxr-xr-x 6 xxx xxx 4.0K Jan 1 00:00 xxx
48-
[{'localhost': {'exit_code': 0}}]
48+
{'localhost': {'exit_code': 0, 'stdout': <generator>, 'stderr': <generator>, 'channel': channel}}
4949

5050
**************************
5151
Frequently asked questions
@@ -84,6 +84,14 @@ Frequently asked questions
8484
>>> my_key = paramiko.RSAKey.from_private_key_file(my_rsa_key)
8585
>>> client = ParallelSSHClient(pkey=my_key)
8686

87+
:Q:
88+
Why should I use this module and not, for example, `fabric <https://github.com/fabric/fabric>`_?
89+
90+
:A:
91+
Fabric is a port of `capistrano <https://github.com/capistrano/capistrano>`_ from ruby to python. Its design goals are to provide a faithful port of capistrano with capistrano's `tasks` and `roles` to python with interactive command line being the intended usage - its use as a library is non-standard and in many cases just plain broken.
92+
Furthermore, its parallel commands use a combination of both threads and processes with extremely high CPU usage while its running. Fabric currently stands at more than 130,000 lines of code, a large proportion of which is untested, particularly if used as a library as opposed to less than 700 currently in `ParallelSSH` with over 70% code test coverage.
93+
ParallelSSH's design goals are to provide a *library* for running *asynchronous* SSH commands with **minimal** load induced on the system by doing so with the inteded usage being completely programmatic and non-interactive - Fabric provides none of these goals.
94+
8795
********
8896
SFTP/SCP
8997
********

pssh.py

Lines changed: 97 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
See :mod:`pssh.ParallelSSHClient` and :mod:`pssh.SSHClient` for class documentation.
2929
"""
3030

31+
import warnings
3132
from socket import gaierror as sock_gaierror, error as sock_error
3233
import logging
3334
import paramiko
@@ -36,7 +37,7 @@
3637
from gevent import monkey
3738
monkey.patch_all()
3839

39-
host_logger = logging.getLogger('host_logging')
40+
host_logger = logging.getLogger('host_logger')
4041
handler = logging.StreamHandler()
4142
host_log_format = logging.Formatter('%(message)s')
4243
handler.setFormatter(host_log_format)
@@ -213,8 +214,9 @@ def exec_command(self, command, sudo=False, user=None, **kwargs):
213214
channel.get_pty()
214215
_stdout, _stderr = channel.makefile('rb'), \
215216
channel.makefile_stderr('rb')
216-
stdout, stderr = self._read_output_buffer(_stdout), \
217-
self._read_output_buffer(_stderr)
217+
stdout, stderr = self._read_output_buffer(_stdout,), \
218+
self._read_output_buffer(_stderr,
219+
prefix='\t[err]')
218220
if sudo and not user:
219221
command = 'sudo -S bash -c "%s"' % command.replace('"', '\\"')
220222
elif user:
@@ -229,12 +231,12 @@ def exec_command(self, command, sudo=False, user=None, **kwargs):
229231
gevent.sleep(.2)
230232
return channel, self.host, stdout, stderr
231233

232-
def _read_output_buffer(self, output_buffer):
233-
"""Read from output buffers,
234-
allowing coroutines to execute in between reading"""
234+
def _read_output_buffer(self, output_buffer, prefix=''):
235+
"""Read from output buffers and log to host_logger"""
235236
for line in output_buffer:
236-
gevent.sleep()
237-
yield line.strip()
237+
output = line.strip()
238+
host_logger.info("[%s]%s\t%s", self.host, prefix, output,)
239+
yield output
238240

239241
def _make_sftp(self):
240242
"""Make SFTP client from open transport"""
@@ -404,9 +406,38 @@ def __init__(self, hosts,
404406
# To hold host clients
405407
self.host_clients = dict((host, None) for host in hosts)
406408

409+
def run_command(self, *args, **kwargs):
410+
"""Run command on all hosts in parallel, honoring self.pool_size
411+
412+
:param args: Position arguments for command
413+
:type args: tuple
414+
:param kwargs: Keyword arguments for command
415+
:type kwargs: dict
416+
417+
:rtype: List of :mod:`gevent.Greenlet`
418+
419+
**Example**:
420+
421+
>>> output = client.exec_command('ls -ltrh')
422+
423+
Wait for completion, no stdout:
424+
425+
>>> client.pool.join()
426+
427+
Alternatively/in addition print stdout for each command:
428+
429+
>>> for host in output:
430+
>>> print output[host]['stdout']
431+
"""
432+
for host in self.hosts:
433+
self.pool.spawn(self._exec_command, host, *args, **kwargs)
434+
return self.get_output()
435+
407436
def exec_command(self, *args, **kwargs):
408437
"""Run command on all hosts in parallel, honoring self.pool_size
409438
439+
**Superseeded by :mod:`ParallelSSH.run_command`**
440+
410441
:param args: Position arguments for command
411442
:type args: tuple
412443
:param kwargs: Keyword arguments for command
@@ -446,35 +477,80 @@ def _exec_command(self, host, *args, **kwargs):
446477
timeout=self.timeout)
447478
return self.host_clients[host].exec_command(*args, **kwargs)
448479

480+
def get_output(self, commands=None):
481+
"""Get output from running commands.
482+
483+
Stdout and stderr are also logged via the logger named ``host_logger``
484+
which is enabled by default.
485+
``host_logger`` output can be disabled by removing its handler.
486+
>>> logger = logging.getLogger('host_logger')
487+
>>> for handler in logger.handlers: logger.removeHandler(handler)
488+
489+
**Example usage**:
490+
491+
>>> output = client.get_output()
492+
>>> for host in output: print output[host]['stdout']
493+
<stdout>
494+
>>> # Get exit code after command has finished
495+
>>> self.get_exit_code(output[host])
496+
0
497+
498+
:param commands: (Optional) Override commands to get output from.
499+
Uses running commands in pool if not given
500+
:type commands: :mod:`gevent.Greenlet`
501+
:rtype: Dictionary with host as key as in:
502+
``{'myhost1': {'exit_code': exit code if ready else None,
503+
'channel' : SSH channel of command,
504+
'stdout' : <iterable>,
505+
'stderr' : <iterable>,}}``"""
506+
if not commands:
507+
commands = list(self.pool.greenlets)
508+
return {host: {'exit_code': self._get_exit_code(channel),
509+
'channel' : channel,
510+
'stdout' : stdout,
511+
'stderr' : stderr, }
512+
for cmd in commands
513+
for (channel, host, stdout, stderr) in [cmd.get()]}
514+
515+
def get_exit_code(self, host_output):
516+
"""Get exit code from host output if available
517+
:param host_output: Per host output as returned by `self.get_output`
518+
:rtype: int or None if exit code not ready"""
519+
if not 'channel' in host_output:
520+
logger.error("%s does not look like host output..", host_output,)
521+
return
522+
channel = host_output['channel']
523+
return self._get_exit_code(channel)
524+
525+
def _get_exit_code(self, channel):
526+
"""Get exit code from channel if ready"""
527+
if not channel.exit_status_ready():
528+
return
529+
channel.close()
530+
return channel.recv_exit_status()
531+
449532
def get_stdout(self, greenlet, return_buffers=False):
450533
"""Get/print stdout from greenlet and return exit code for host
451534
452-
:mod:`pssh.get_stdout` will close the open SSH channel but this does
453-
**not** close the established connection to the remote host, only the
454-
authenticated SSH channel within it. This is standard practise
455-
in SSH when a command has finished executing. A new command
456-
will open a new channel which is very fast on already established
457-
connections.
458-
459-
By default, stdout and stderr will be logged via the logger named \
460-
``host_logger`` unless ``return_buffers`` is set to ``True`` in which case \
461-
both buffers are instead returned along with the exit status.
462-
535+
**Deprecated** - use self.get_output() instead.
536+
463537
:param greenlet: Greenlet object containing an \
464538
SSH channel reference, hostname, stdout and stderr buffers
465539
:type greenlet: :mod:`gevent.Greenlet`
466-
540+
467541
:param return_buffers: Flag to turn on returning stdout and stderr \
468542
buffers along with exit code. Defaults to off.
469543
:type return_buffers: bool
470-
544+
471545
:rtype: Dictionary containing ``{host: {'exit_code': exit code}}`` entry \
472546
for example ``{'myhost1': {'exit_code': 0}}``
473547
:rtype: With ``return_buffers=True``: ``{'myhost1': {'exit_code': 0,
474548
'channel' : None or SSH channel of command if command is still executing,
475549
'stdout' : <iterable>,
476550
'stderr' : <iterable>,}}``
477551
"""
552+
warnings.warn("This method is being deprecated and will be removed in \
553+
future releases - use self.get_output instead", DeprecationWarning)
478554
gevent.sleep(.2)
479555
channel, host, stdout, stderr = greenlet.get()
480556
if channel.exit_status_ready():
@@ -496,16 +572,6 @@ def get_stdout(self, greenlet, return_buffers=False):
496572
'stdout' : stdout,
497573
'stderr' : stderr, }}
498574

499-
# WIP
500-
# def wait_for_exit_status(self, channel):
501-
# """Block and wait for exit status on channel.
502-
# WARNING - this will block forever if the command executed never exits
503-
# :rtype: int - Exit code of command executed"""
504-
# while not channel.exit_status_ready():
505-
# gevent.sleep()
506-
# channel.close()
507-
# return channel.recv_exit_status()
508-
509575
def copy_file(self, local_file, remote_file):
510576
"""Copy local file to remote file in parallel
511577

tests/test_pssh_client.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,48 @@ def test_pssh_client_exec_command_get_buffers(self):
9696
del client
9797
server.join()
9898

99+
def test_pssh_client_run_command_get_output(self):
100+
server = start_server({ self.fake_cmd : self.fake_resp }, self.listen_socket)
101+
client = ParallelSSHClient(['127.0.0.1'], port=self.listen_port,
102+
pkey=self.user_key)
103+
output = client.run_command(self.fake_cmd)
104+
# import ipdb; ipdb.set_trace()
105+
expected_exit_code = 0
106+
expected_stdout = [self.fake_resp]
107+
expected_stderr = []
108+
exit_code = output['127.0.0.1']['exit_code']
109+
stdout = list(output['127.0.0.1']['stdout'])
110+
stderr = list(output['127.0.0.1']['stderr'])
111+
self.assertEqual(expected_exit_code, exit_code,
112+
msg = "Got unexpected exit code - %s, expected %s" %
113+
(exit_code,
114+
expected_exit_code,))
115+
self.assertEqual(expected_stdout, stdout,
116+
msg = "Got unexpected stdout - %s, expected %s" %
117+
(stdout,
118+
expected_stdout,))
119+
self.assertEqual(expected_stderr, stderr,
120+
msg = "Got unexpected stderr - %s, expected %s" %
121+
(stderr,
122+
expected_stderr,))
123+
del client
124+
server.join()
125+
126+
def test_pssh_client_run_long_command(self):
127+
expected_lines = 5
128+
server = start_server({ self.long_running_cmd :
129+
self.long_running_response(expected_lines) },
130+
self.listen_socket)
131+
client = ParallelSSHClient(['127.0.0.1'], port=self.listen_port,
132+
pkey=self.user_key)
133+
output = client.run_command(self.long_running_cmd)
134+
self.assertTrue('127.0.0.1' in output, msg="Got no output for command")
135+
stdout = list(output['127.0.0.1']['stdout'])
136+
self.assertTrue(len(stdout) == expected_lines, msg="Expected %s lines of response, got %s" %
137+
(expected_lines, len(stdout)))
138+
del client
139+
server.kill()
140+
99141
def test_pssh_client_auth_failure(self):
100142
server = start_server({ self.fake_cmd : self.fake_resp },
101143
self.listen_socket, fail_auth=True)

0 commit comments

Comments
 (0)