Skip to content

Commit c6e062d

Browse files
author
Dan
committed
Make cmd starting async. Resolves #56. Updated tests and documentation.
1 parent adea3c7 commit c6e062d

File tree

4 files changed

+53
-29
lines changed

4 files changed

+53
-29
lines changed

pssh/pssh_client.py

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,15 @@ def __init__(self, hosts,
163163
>>> import paramiko
164164
>>> client_key = paramiko.RSAKey.from_private_key_file('user.key')
165165
>>> client = ParallelSSHClient(['myhost1', 'myhost2'], pkey=client_key)
166+
167+
**Example with expression as host list**
168+
169+
Any type of iterator may be used as host list, including generator and
170+
list comprehension expressions.
171+
172+
>>> hosts = ['dc1.myhost1', 'dc2.myhost2']
173+
>>> client = ParallelSSHClient([h for h in hosts if h.find('dc1')])
174+
>>> client.run_command(<..>)
166175
167176
.. note ::
168177
@@ -184,7 +193,7 @@ def __init__(self, hosts,
184193
185194
Connection is terminated.
186195
"""
187-
self.pool_size = len(hosts) if len(hosts) < pool_size else pool_size
196+
self.pool_size = pool_size
188197
self.pool = gevent.pool.Pool(size=self.pool_size)
189198
self.hosts = hosts
190199
self.user = user
@@ -202,15 +211,21 @@ def __init__(self, hosts,
202211
def run_command(self, *args, **kwargs):
203212
"""Run command on all hosts in parallel, honoring self.pool_size,
204213
and return output buffers.
205-
214+
206215
This function will block until all commands have **started** and
207-
then return immediately. Any connection and/or authentication exceptions
208-
will be raised here and need catching.
209-
216+
then return immediately.
217+
218+
Any connection and/or authentication exceptions will be raised here
219+
and need catching _unless_ `run_command` is called with
220+
`stop_on_errors=False`.
221+
210222
:param args: Positional arguments for command
211223
:type args: tuple
212224
:param sudo: (Optional) Run with sudo. Defaults to False
213225
:type sudo: bool
226+
:param user: (Optional) User to run command as. Requires sudo access \
227+
for that user from the logged in user account.
228+
:type user: str
214229
:param stop_on_errors: (Optional) Raise exception on errors running command. \
215230
Defaults to True. With stop_on_errors set to False, exceptions are instead \
216231
added to output of `run_command`. See example usage below.
@@ -235,18 +250,20 @@ def run_command(self, *args, **kwargs):
235250
236251
>>> for host in output:
237252
>>> for line in output[host]['stdout']: print line
238-
253+
239254
*Get exit codes after command has finished*
240-
255+
241256
>>> client.get_exit_codes(output)
242257
>>> for host in output:
243258
>>> ... print output[host]['exit_code']
244259
0
245260
0
246261
247-
*Wait for completion, no stdout printing*
262+
*Wait for completion, no stdout/stderr*
248263
249264
>>> client.join(output)
265+
>>> print output[host]['exit_code']
266+
0
250267
251268
*Run with sudo*
252269
@@ -370,33 +387,40 @@ def get_output(self, cmd, output):
370387
try:
371388
host = ex.args[1]
372389
except IndexError:
373-
logger.error("Got exception with no host argument - cannot update output data with %s", ex)
390+
logger.error("Got exception with no host argument - "
391+
"cannot update output data with %s", ex)
374392
raise ex
375393
self._update_host_output(output, host, None, None, None, None, cmd,
376394
exception=ex)
377395
raise ex
378396
self._update_host_output(output, host, self._get_exit_code(channel),
379397
channel, stdout, stderr, cmd)
380398

381-
def _update_host_output(self, output, host, exit_code, channel, stdout, stderr, cmd,
382-
exception=None):
399+
def _update_host_output(self, output, host, exit_code, channel, stdout,
400+
stderr, cmd, exception=None):
383401
"""Update host output with given data"""
384402
if host in output:
385403
new_host = "_".join([host,
386404
''.join(random.choice(
387405
string.ascii_lowercase + string.digits)
388406
for _ in xrange(8))])
389-
logger.warning("Already have output for host %s - changing host key for %s to %s",
390-
host, host, new_host)
407+
logger.warning("Already have output for host %s - changing host "
408+
"key for %s to %s", host, host, new_host)
391409
host = new_host
392410
output.setdefault(host, {})
393411
output[host].update({'exit_code' : exit_code,
394412
'channel' : channel,
395-
'stdout' : stdout,
396-
'stderr' : stderr,
413+
'stdout' : self._read_buff_ex_code(stdout, output),
414+
'stderr' : self._read_buff_ex_code(stderr, output),
397415
'cmd' : cmd,
398416
'exception' : exception,})
399-
417+
418+
def _read_buff_ex_code(self, _buffer, output):
419+
if _buffer:
420+
for line in _buffer:
421+
yield line
422+
self.get_exit_codes(output)
423+
400424
def join(self, output):
401425
"""Block until all remote commands in output have finished
402426
and retrieve exit codes"""

pssh/ssh_client.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -229,9 +229,7 @@ def exec_command(self, command, sudo=False, user=None, **kwargs):
229229
logger.debug("Running command %s on %s", command, self.host)
230230
channel.exec_command(command, **kwargs)
231231
logger.debug("Command started")
232-
while not (channel.recv_ready() or channel.closed or
233-
channel.exit_status_ready()):
234-
gevent.sleep(.2)
232+
gevent.sleep(0)
235233
return channel, self.host, stdout, stderr
236234

237235
def _read_output_buffer(self, output_buffer, prefix=''):

tests/test_pssh_client.py

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,8 @@ def test_pssh_client_exec_command(self):
7474
def test_pssh_client_no_stdout_non_zero_exit_code(self):
7575
output = self.client.run_command('exit 1')
7676
expected_exit_code = 1
77+
self.client.join(output)
7778
exit_code = output[self.host]['exit_code']
78-
self.client.pool.join()
7979
self.assertEqual(expected_exit_code, exit_code,
8080
msg="Got unexpected exit code - %s, expected %s" %
8181
(exit_code,
@@ -114,9 +114,9 @@ def test_pssh_client_run_command_get_output(self):
114114
expected_exit_code = 0
115115
expected_stdout = [self.fake_resp]
116116
expected_stderr = []
117-
exit_code = output[self.host]['exit_code']
118117
stdout = list(output[self.host]['stdout'])
119118
stderr = list(output[self.host]['stderr'])
119+
exit_code = output[self.host]['exit_code']
120120
self.assertEqual(expected_exit_code, exit_code,
121121
msg="Got unexpected exit code - %s, expected %s" %
122122
(exit_code,
@@ -141,9 +141,9 @@ def test_pssh_client_run_command_get_output_explicit(self):
141141
expected_exit_code = 0
142142
expected_stdout = [self.fake_resp]
143143
expected_stderr = []
144-
exit_code = output[self.host]['exit_code']
145144
stdout = list(output[self.host]['stdout'])
146145
stderr = list(output[self.host]['stderr'])
146+
exit_code = output[self.host]['exit_code']
147147
self.assertEqual(expected_exit_code, exit_code,
148148
msg="Got unexpected exit code - %s, expected %s" %
149149
(exit_code,
@@ -378,22 +378,25 @@ def test_pssh_client_directory(self):
378378
shutil.rmtree(remote_test_path)
379379

380380
def test_pssh_pool_size(self):
381-
"""Test pool size logic"""
381+
"""Test setting pool size to non default values"""
382382
hosts = ['host-%01d' % d for d in xrange(5)]
383-
client = ParallelSSHClient(hosts)
384-
expected, actual = len(hosts), client.pool.size
383+
pool_size = 2
384+
client = ParallelSSHClient(hosts, pool_size=pool_size)
385+
expected, actual = pool_size, client.pool.size
385386
self.assertEqual(expected, actual,
386387
msg="Expected pool size to be %s, got %s" % (
387388
expected, actual,))
388389
hosts = ['host-%01d' % d for d in xrange(15)]
389-
client = ParallelSSHClient(hosts)
390+
pool_size = 5
391+
client = ParallelSSHClient(hosts, pool_size=pool_size)
390392
expected, actual = client.pool_size, client.pool.size
391393
self.assertEqual(expected, actual,
392394
msg="Expected pool size to be %s, got %s" % (
393395
expected, actual,))
394396
hosts = ['host-%01d' % d for d in xrange(15)]
395-
client = ParallelSSHClient(hosts, pool_size=len(hosts)+5)
396-
expected, actual = len(hosts), client.pool.size
397+
pool_size = len(hosts)+5
398+
client = ParallelSSHClient(hosts, pool_size=pool_size)
399+
expected, actual = pool_size, client.pool.size
397400
self.assertEqual(expected, actual,
398401
msg="Expected pool size to be %s, got %s" % (
399402
expected, actual,))

tests/test_ssh_client.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,6 @@ def test_ssh_agent_authentication(self):
205205
client = SSHClient(self.host, port=self.listen_port,
206206
agent=agent)
207207
channel, host, stdout, stderr = client.exec_command(self.fake_cmd)
208-
channel.close()
209208
output = list(stdout)
210209
stderr = list(stderr)
211210
expected = [self.fake_resp]

0 commit comments

Comments
 (0)