4040
4141
4242class ParallelSSHClient (object ):
43- """Uses :mod :`pssh.ssh_client.SSHClient`, performs tasks over SSH on multiple hosts in \
43+ """Uses :py:class :`pssh.ssh_client.SSHClient`, performs tasks over SSH on multiple hosts in \
4444 parallel.
4545
4646 Connections to hosts are established in parallel when ``run_command`` is called,
@@ -67,7 +67,7 @@ def __init__(self, hosts,
6767 to None which uses SSH default
6868 :type port: int
6969 :param pkey: (Optional) Client's private key to be used to connect with
70- :type pkey: :mod: `paramiko.PKey`
70+ :type pkey: :py:class: `paramiko.pkey .PKey`
7171 :param num_retries: (Optional) Number of retries for connection attempts \
7272 before the client gives up. Defaults to 3.
7373 :type num_retries: int
@@ -99,10 +99,10 @@ def __init__(self, hosts,
9999 :param proxy_pkey: (Optional) Private key to be used for authentication \
100100 with ``proxy_host``. Defaults to available keys from SSHAgent and user's \
101101 home directory keys
102- :type proxy_pkey: :mod: `paramiko.PKey`
102+ :type proxy_pkey: :py:class: `paramiko.pkey .PKey`
103103 :param agent: (Optional) SSH agent object to programmatically supply an \
104104 agent to override system SSH agent with
105- :type agent: :mod :`pssh.agent.SSHAgent`
105+ :type agent: :py:class :`pssh.agent.SSHAgent`
106106 :param host_config: (Optional) Per-host configuration for cases where \
107107 not all hosts use the same configuration values.
108108 :type host_config: dict
@@ -262,10 +262,10 @@ def __init__(self, hosts,
262262 **Per-Host configuration**
263263
264264 Per host configuration can be provided for any or all of user, password port
265- and private key. Private key value is a :mod: `paramiko.PKey` object as
266- returned by :mod :`pssh.utils.load_private_key`.
265+ and private key. Private key value is a :py:class: `paramiko.pkey .PKey` object as
266+ returned by :py:func :`pssh.utils.load_private_key`.
267267
268- :mod :`pssh.utils.load_private_key` accepts both file names and file-like
268+ :py:func :`pssh.utils.load_private_key` accepts both file names and file-like
269269 objects and will attempt to load all available key types, returning
270270 `None` if they all fail.
271271
@@ -361,26 +361,31 @@ def run_command(self, *args, **kwargs):
361361 :param use_shell: (Optional) Run command with or without shell. Defaults \
362362 to True - use shell defined in user login to run command string
363363 :type use_shell: bool
364+ :param use_pty: (Optional) Enable/Disable use of pseudo terminal \
365+ emulation. This is required in vast majority of cases, exception \
366+ being where a shell is not used and/or stdout/stderr/stdin buffers \
367+ are not required. Defaults to ``True``
368+ :type use_pty: bool
364369 :param host_args: (Optional) Format command string with per-host \
365370 arguments in ``host_args``. ``host_args`` length must equal length of \
366- host list - :mod :`pssh.exceptions.HostArgumentException` is raised \
371+ host list - :py:class :`pssh.exceptions.HostArgumentException` is raised \
367372 otherwise
368373 :type host_args: tuple or list
369374 :rtype: Dictionary with host as key as per \
370- :mod :`pssh.pssh_client.ParallelSSHClient.get_output`
375+ :py:func :`pssh.pssh_client.ParallelSSHClient.get_output`
371376
372- :raises: :mod :`pssh.exceptions.AuthenticationException` on \
377+ :raises: :py:class :`pssh.exceptions.AuthenticationException` on \
373378 authentication error
374- :raises: :mod :`pssh.exceptions.UnknownHostException` on DNS resolution \
379+ :raises: :py:class :`pssh.exceptions.UnknownHostException` on DNS resolution \
375380 error
376- :raises: :mod :`pssh.exceptions.ConnectionErrorException` on error \
381+ :raises: :py:class :`pssh.exceptions.ConnectionErrorException` on error \
377382 connecting
378- :raises: :mod :`pssh.exceptions.SSHException` on other undefined SSH \
383+ :raises: :py:class :`pssh.exceptions.SSHException` on other undefined SSH \
379384 errors
380- :raises: :mod :`pssh.exceptions.HostArgumentException` on number of \
385+ :raises: :py:class :`pssh.exceptions.HostArgumentException` on number of \
381386 host arguments not equal to number of hosts
382- :raises: `TypeError` on not enough host arguments for cmd string format
383- :raises: `KeyError` on no host argument key in arguments dict for cmd \
387+ :raises: :py:class: `TypeError` on not enough host arguments for cmd string format
388+ :raises: :py:class: `KeyError` on no host argument key in arguments dict for cmd \
384389 string format
385390
386391 **Example Usage**
@@ -431,7 +436,7 @@ def run_command(self, *args, **kwargs):
431436 large enough.
432437
433438 Iterating over stdout/stderr by definition implies blocking until
434- command has finished. To only see output as it comes in without blocking
439+ command has finished. To only log output as it comes in without blocking
435440 the host logger can be enabled - see `Enabling Host Logger` above.
436441
437442 .. code-block:: python
@@ -589,6 +594,7 @@ def run_command(self, *args, **kwargs):
589594 """
590595 stop_on_errors = kwargs .pop ('stop_on_errors' , True )
591596 host_args = kwargs .pop ('host_args' , None )
597+ output = {}
592598 if host_args :
593599 try :
594600 cmds = [self .pool .spawn (self ._exec_command , host ,
@@ -600,17 +606,17 @@ def run_command(self, *args, **kwargs):
600606 "Number of host arguments provided does not match "
601607 "number of hosts " )
602608 else :
603- cmds = [self .pool .spawn (self . _exec_command , host , * args , ** kwargs )
604- for host in self . hosts ]
605- output = {}
609+ cmds = [self .pool .spawn (
610+ self . _exec_command , host , * args , ** kwargs )
611+ for host in self . hosts ]
606612 for cmd in cmds :
607613 try :
608614 self .get_output (cmd , output )
609615 except Exception :
610616 if stop_on_errors :
611617 raise
612618 return output
613-
619+
614620 def _get_host_config_values (self , host ):
615621 _user = self .host_config .get (host , {}).get ('user' , self .user )
616622 _port = self .host_config .get (host , {}).get ('port' , self .port )
@@ -637,12 +643,12 @@ def _exec_command(self, host, *args, **kwargs):
637643 agent = self .agent ,
638644 channel_timeout = self .channel_timeout )
639645 return self .host_clients [host ].exec_command (* args , ** kwargs )
640-
646+
641647 def get_output (self , cmd , output ):
642648 """Get output from command.
643649
644650 :param cmd: Command to get output from
645- :type cmd: :mod :`gevent.Greenlet`
651+ :type cmd: :py:class :`gevent.Greenlet`
646652 :param output: Dictionary containing output to be updated with output \
647653 from cmd
648654 :type output: dict
@@ -689,6 +695,12 @@ def get_output(self, cmd, output):
689695 self ._update_host_output (output , host , None , None , None , None , None , cmd ,
690696 exception = ex )
691697 raise
698+ stdout = self .host_clients [host ].read_output_buffer (
699+ stdout , callback = self .get_exit_codes ,
700+ callback_args = (output ,))
701+ stderr = self .host_clients [host ].read_output_buffer (
702+ stderr , prefix = '\t [err]' , callback = self .get_exit_codes ,
703+ callback_args = (output ,))
692704 self ._update_host_output (output , host , self ._get_exit_code (channel ),
693705 channel , stdout , stderr , stdin , cmd )
694706
@@ -704,20 +716,15 @@ def _update_host_output(self, output, host, exit_code, channel, stdout,
704716 "key for %s to %s" , host , host , new_host )
705717 host = new_host
706718 output .setdefault (host , {})
707- output [host ].update ({'exit_code' : exit_code ,
708- 'channel' : channel ,
709- 'stdout' : self ._read_buff_ex_code (stdout , output ),
710- 'stderr' : self ._read_buff_ex_code (stderr , output ),
711- 'stdin' : stdin ,
712- 'cmd' : cmd ,
713- 'exception' : exception ,})
714-
715- def _read_buff_ex_code (self , _buffer , output ):
716- if _buffer :
717- for line in _buffer :
718- yield line
719- self .get_exit_codes (output )
720-
719+ output [host ].update ({
720+ 'exit_code' : exit_code ,
721+ 'channel' : channel ,
722+ 'stdout' : stdout ,
723+ 'stderr' : stderr ,
724+ 'stdin' : stdin ,
725+ 'cmd' : cmd ,
726+ 'exception' : exception ,})
727+
721728 def join (self , output ):
722729 """Block until all remote commands in output have finished
723730 and retrieve exit codes"""
@@ -726,34 +733,34 @@ def join(self, output):
726733 if output [host ]['channel' ]:
727734 output [host ]['channel' ].recv_exit_status ()
728735 self .get_exit_codes (output )
729-
736+
730737 def finished (self , output ):
731738 """Check if commands have finished without blocking
732739
733- :param output: As returned by :mod :`pssh.pssh_client.ParallelSSHClient.get_output`
740+ :param output: As returned by :py:func :`pssh.pssh_client.ParallelSSHClient.get_output`
734741 :rtype: bool
735742 """
736743 for host in output :
737744 chan = output [host ]['channel' ]
738745 if chan and not chan .closed :
739746 return False
740747 return True
741-
748+
742749 def get_exit_codes (self , output ):
743750 """Get exit code for all hosts in output *if available*.
744751 Output parameter is modified in-place.
745-
746- :param output: As returned by :mod :`pssh.pssh_client.ParallelSSHClient.get_output`
752+
753+ :param output: As returned by :py:func :`pssh.pssh_client.ParallelSSHClient.get_output`
747754 :rtype: None
748755 """
749756 for host in output :
750757 output [host ].update ({'exit_code' : self .get_exit_code (output [host ])})
751758
752759 def get_exit_code (self , host_output ):
753760 """Get exit code from host output *if available*.
754-
761+
755762 :param host_output: Per host output as returned by \
756- :mod :`pssh.pssh_client.ParallelSSHClient.get_output`
763+ :py:func :`pssh.pssh_client.ParallelSSHClient.get_output`
757764 :rtype: int or None if exit code not ready"""
758765 if not 'channel' in host_output :
759766 logger .error ("%s does not look like host output.." , host_output ,)
@@ -774,7 +781,7 @@ def copy_file(self, local_file, remote_file, recurse=False):
774781 This function returns a list of greenlets which can be
775782 `join`-ed on to wait for completion.
776783
777- :mod :`gevent.joinall` function may be used to join on all greenlets and
784+ :py:func :`gevent.joinall` function may be used to join on all greenlets and
778785 will also raise exceptions if called with ``raise_error=True`` - default
779786 is `False`.
780787
@@ -791,17 +798,17 @@ def copy_file(self, local_file, remote_file, recurse=False):
791798 :param recurse: Whether or not to descend into directories recursively.
792799 :type recurse: bool
793800
794- :raises: :mod :`ValueError` when a directory is supplied to local_file \
801+ :raises: :py:class :`ValueError` when a directory is supplied to local_file \
795802 and recurse is not set
796- :raises: :mod :`IOError` on I/O errors writing files
797- :raises: :mod :`OSError` on OS errors like permission denied
803+ :raises: :py:class :`IOError` on I/O errors writing files
804+ :raises: :py:class :`OSError` on OS errors like permission denied
798805
799806 .. note ::
800807
801808 Remote directories in `remote_file` that do not exist will be
802809 created as long as permissions allow.
803810
804- :rtype: List(:mod :`gevent.Greenlet`) of greenlets for remote copy \
811+ :rtype: List(:py:class :`gevent.Greenlet`) of greenlets for remote copy \
805812 commands
806813 """
807814 return [self .pool .spawn (self ._copy_file , host , local_file , remote_file ,
@@ -823,10 +830,10 @@ def copy_remote_file(self, remote_file, local_file, recurse=False,
823830 the resulting filename will be ``myfile_myhost`` for the file from host
824831 ``myhost``.
825832
826- This function, like :mod :`ParallelSSHClient.copy_file`, returns a list
833+ This function, like :py:func :`ParallelSSHClient.copy_file`, returns a list
827834 of greenlets which can be `join`-ed on to wait for completion.
828835
829- :mod :`gevent.joinall` function may be used to join on all greenlets and
836+ :py:func :`gevent.joinall` function may be used to join on all greenlets and
830837 will also raise exceptions if called with ``raise_error=True`` - default
831838 is `False`.
832839
@@ -857,7 +864,7 @@ def copy_remote_file(self, remote_file, local_file, recurse=False,
857864 File names will be de-duplicated by appending the hostname to the
858865 filepath separated by ``suffix_separator``.
859866
860- :rtype: list(:mod :`gevent.Greenlet`) of greenlets for remote copy \
867+ :rtype: list(:py:class :`gevent.Greenlet`) of greenlets for remote copy \
861868 commands
862869 """
863870 return [self .pool .spawn (
0 commit comments