Skip to content

Commit c01eda0

Browse files
authored
Allow parallel run of commands (#83)
* all execute methods can be started in parallel if required - use `lock` attribute * Change `execute_async` signature and use typed NamedTuple. Types are exported for typing purposes only.
1 parent 8117b28 commit c01eda0

File tree

7 files changed

+197
-113
lines changed

7 files changed

+197
-113
lines changed

doc/source/SSHClient.rst

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,11 +138,12 @@ API: SSHClient and SSHAuth.
138138
:param log_mask_re: regex lookup rule to mask command for logger.
139139
all MATCHED groups will be replaced by '<*masked*>'
140140
:type log_mask_re: typing.Optional[str]
141-
:rtype: ``typing.Tuple[paramiko.Channel, paramiko.ChannelFile, typing.Optional[paramiko.ChannelFile], typing.Optional[paramiko.ChannelFile]]``
141+
:rtype: SshExecuteAsyncResult
142142

143143
.. versionchanged:: 1.2.0 open_stdout and open_stderr flags
144144
.. versionchanged:: 1.2.0 stdin data
145145
.. versionchanged:: 1.2.0 get_pty moved to `**kwargs`
146+
.. versionchanged:: 2.1.0 Use typed NamedTuple as result
146147

147148
.. py:method:: execute(command, verbose=False, timeout=1*60*60, **kwargs)
148149
@@ -388,3 +389,24 @@ API: SSHClient and SSHAuth.
388389
:param log: Log on generic connection failure
389390
:type log: ``bool``
390391
:raises paramiko.AuthenticationException: Authentication failed.
392+
393+
394+
.. py:class:: SshExecuteAsyncResult
395+
396+
Typed NamedTuple
397+
398+
.. py:attribute:: interface
399+
400+
``paramiko.Channel``
401+
402+
.. py:attribute:: stdin
403+
404+
``paramiko.ChannelFile``
405+
406+
.. py:attribute:: stderr
407+
408+
``typing.Optional[paramiko.ChannelFile]``
409+
410+
.. py:attribute:: stdout
411+
412+
``typing.Optional[paramiko.ChannelFile]``

doc/source/Subprocess.rst

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,11 @@ API: Subprocess
5656
:param log_mask_re: regex lookup rule to mask command for logger.
5757
all MATCHED groups will be replaced by '<*masked*>'
5858
:type log_mask_re: ``typing.Optional[str]``
59-
:rtype: ``typing.Tuple[subprocess.Popen, None, typing.Optional[typing.IO], typing.Optional[typing.IO], ]``
59+
:rtype: SubprocessExecuteAsyncResult
60+
:raises OSError: impossible to process STDIN
6061

6162
.. versionadded:: 1.2.0
63+
.. versionchanged:: 2.1.0 Use typed NamedTuple as result
6264

6365
.. py:method:: execute(command, verbose=False, timeout=1*60*60, **kwargs)
6466
@@ -124,3 +126,24 @@ API: Subprocess
124126

125127
.. versionchanged:: 1.1.0 make method
126128
.. versionchanged:: 1.2.0 default timeout 1 hour
129+
130+
131+
.. py:class:: SubprocessExecuteAsyncResult
132+
133+
Typed NamedTuple
134+
135+
.. py:attribute:: interface
136+
137+
``subprocess.Popen``
138+
139+
.. py:attribute:: stdin
140+
141+
``typing.Optional[typing.IO]``
142+
143+
.. py:attribute:: stderr
144+
145+
``typing.Optional[typing.IO]``
146+
147+
.. py:attribute:: stdout
148+
149+
``typing.Optional[typing.IO]``

exec_helpers/__init__.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@
2828
from .exec_result import ExecResult
2929
from .api import ExecHelper
3030
from .ssh_auth import SSHAuth
31+
from ._ssh_client_base import SshExecuteAsyncResult
3132
from .ssh_client import SSHClient
32-
from .subprocess_runner import Subprocess # nosec # Expected
33+
from .subprocess_runner import Subprocess, SubprocessExecuteAsyncResult # nosec # Expected
3334

3435
__all__ = (
3536
'ExecHelperError',
@@ -40,8 +41,10 @@
4041
'ExecHelperTimeoutError',
4142
'ExecHelper',
4243
'SSHClient',
44+
'SshExecuteAsyncResult',
4345
'SSHAuth',
4446
'Subprocess',
47+
'SubprocessExecuteAsyncResult',
4548
'ExitCodes',
4649
'ExecResult',
4750
)

exec_helpers/_ssh_client_base.py

Lines changed: 46 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@
1616

1717
"""SSH client helper based on Paramiko. Base class."""
1818

19+
__all__ = (
20+
'SSHClientBase',
21+
'SshExecuteAsyncResult',
22+
)
23+
1924
import abc
2025
import base64
2126
import collections
@@ -43,18 +48,33 @@
4348
from exec_helpers import ssh_auth
4449
from exec_helpers import _log_templates
4550

46-
__all__ = ('SSHClientBase', )
47-
4851
logging.getLogger('paramiko').setLevel(logging.WARNING)
4952
logging.getLogger('iso8601').setLevel(logging.WARNING)
5053

5154

52-
_type_execute_async = typing.Tuple[
53-
paramiko.Channel,
54-
paramiko.ChannelFile,
55-
typing.Optional[paramiko.ChannelFile],
56-
typing.Optional[paramiko.ChannelFile]
57-
]
55+
class SshExecuteAsyncResult(api.ExecuteAsyncResult):
56+
"""Override original NamedTuple with proper typing."""
57+
58+
@property
59+
def interface(self) -> paramiko.Channel:
60+
"""Override original NamedTuple with proper typing."""
61+
return super(SshExecuteAsyncResult, self).interface
62+
63+
@property
64+
def stdin(self) -> paramiko.ChannelFile: # type: ignore
65+
"""Override original NamedTuple with proper typing."""
66+
return super(SshExecuteAsyncResult, self).stdin
67+
68+
@property
69+
def stderr(self) -> typing.Optional[paramiko.ChannelFile]: # type: ignore
70+
"""Override original NamedTuple with proper typing."""
71+
return super(SshExecuteAsyncResult, self).stderr
72+
73+
@property
74+
def stdout(self) -> typing.Optional[paramiko.ChannelFile]: # type: ignore
75+
"""Override original NamedTuple with proper typing."""
76+
return super(SshExecuteAsyncResult, self).stdout
77+
5878

5979
CPYTHON = 'CPython' == platform.python_implementation()
6080

@@ -584,7 +604,7 @@ def execute_async(
584604
verbose: bool = False,
585605
log_mask_re: typing.Optional[str] = None,
586606
**kwargs: typing.Any
587-
) -> _type_execute_async:
607+
) -> SshExecuteAsyncResult:
588608
"""Execute command in async mode and return channel with IO objects.
589609
590610
:param command: Command for execution
@@ -603,16 +623,20 @@ def execute_async(
603623
:param kwargs: additional parameters for call.
604624
:type kwargs: typing.Any
605625
:return: Tuple with control interface and file-like objects for STDIN/STDERR/STDOUT
606-
:rtype: typing.Tuple[
607-
paramiko.Channel,
608-
paramiko.ChannelFile,
609-
typing.Optional[paramiko.ChannelFile],
610-
typing.Optional[paramiko.ChannelFile],
611-
]
626+
:rtype: typing.NamedTuple(
627+
'SshExecuteAsyncResult',
628+
[
629+
('interface', paramiko.Channel),
630+
('stdin', paramiko.ChannelFile),
631+
('stderr', typing.Optional[paramiko.ChannelFile]),
632+
('stdout', typing.Optional[paramiko.ChannelFile]),
633+
]
634+
)
612635
613636
.. versionchanged:: 1.2.0 open_stdout and open_stderr flags
614637
.. versionchanged:: 1.2.0 stdin data
615638
.. versionchanged:: 1.2.0 get_pty moved to `**kwargs`
639+
.. versionchanged:: 2.1.0 Use typed NamedTuple as result
616640
"""
617641
cmd_for_log = self._mask_command(
618642
cmd=command,
@@ -657,7 +681,7 @@ def execute_async(
657681
else:
658682
self.logger.warning('STDIN Send failed: closed channel')
659683

660-
return chan, _stdin, stderr, stdout
684+
return SshExecuteAsyncResult(chan, _stdin, stderr, stdout)
661685

662686
def _exec_command(
663687
self,
@@ -887,18 +911,10 @@ def execute_together(
887911
@threaded.threadpooled # type: ignore
888912
def get_result(remote: 'SSHClientBase') -> exec_result.ExecResult:
889913
"""Get result from remote call."""
890-
(
891-
chan,
892-
_,
893-
stderr,
894-
stdout,
895-
) = remote.execute_async(
896-
command,
897-
**kwargs
898-
) # type: _type_execute_async
914+
async_result = remote.execute_async(command, **kwargs) # type: SshExecuteAsyncResult
899915

900-
chan.status_event.wait(timeout)
901-
exit_code = chan.recv_exit_status()
916+
async_result.interface.status_event.wait(timeout)
917+
exit_code = async_result.interface.recv_exit_status()
902918

903919
# pylint: disable=protected-access
904920
cmd_for_log = remote._mask_command(
@@ -908,11 +924,11 @@ def get_result(remote: 'SSHClientBase') -> exec_result.ExecResult:
908924
# pylint: enable=protected-access
909925

910926
result = exec_result.ExecResult(cmd=cmd_for_log)
911-
result.read_stdout(src=stdout)
912-
result.read_stderr(src=stderr)
927+
result.read_stdout(src=async_result.stdout)
928+
result.read_stderr(src=async_result.stderr)
913929
result.exit_code = exit_code
914930

915-
chan.close()
931+
async_result.interface.close()
916932
return result
917933

918934
expected = expected or [proc_enums.ExitCodes.EX_OK]

exec_helpers/api.py

Lines changed: 46 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,11 @@
1919
.. versionchanged:: 1.3.5 make API public to use as interface
2020
"""
2121

22+
__all__ = (
23+
'ExecHelper',
24+
'ExecuteAsyncResult',
25+
)
26+
2227
import abc
2328
import logging
2429
import re
@@ -31,6 +36,17 @@
3136
from exec_helpers import proc_enums
3237

3338

39+
ExecuteAsyncResult = typing.NamedTuple(
40+
'ExecuteAsyncResult',
41+
[
42+
('interface', typing.Any),
43+
('stdin', typing.Optional[typing.Any]),
44+
('stderr', typing.Optional[typing.Any]),
45+
('stdout', typing.Optional[typing.Any]),
46+
]
47+
)
48+
49+
3450
class ExecHelper(metaclass=abc.ABCMeta):
3551
"""ExecHelper global API."""
3652

@@ -142,7 +158,7 @@ def execute_async(
142158
verbose: bool = False,
143159
log_mask_re: typing.Optional[str] = None,
144160
**kwargs: typing.Any
145-
) -> typing.Tuple[typing.Any, typing.Any, typing.Any, typing.Any]:
161+
) -> ExecuteAsyncResult:
146162
"""Execute command in async mode and return remote interface with IO objects.
147163
148164
:param command: Command for execution
@@ -160,11 +176,20 @@ def execute_async(
160176
:type log_mask_re: typing.Optional[str]
161177
:param kwargs: additional parameters for call.
162178
:type kwargs: typing.Any
163-
:return: Tuple with control interface and file-like objects for STDIN/STDERR/STDOUT
164-
:rtype: typing.Tuple[typing.Any, typing.Any, typing.Any, typing.Any]
179+
:return: NamedTuple with control interface and file-like objects for STDIN/STDERR/STDOUT
180+
:rtype: typing.NamedTuple(
181+
'ExecuteAsyncResult',
182+
[
183+
('interface', typing.Any),
184+
('stdin', typing.Optional[typing.Any]),
185+
('stderr', typing.Optional[typing.Any]),
186+
('stdout', typing.Optional[typing.Any]),
187+
]
188+
)
165189
166190
.. versionchanged:: 1.2.0 open_stdout and open_stderr flags
167191
.. versionchanged:: 1.2.0 stdin data
192+
.. versionchanged:: 2.1.0 Use typed NamedTuple as result
168193
"""
169194
raise NotImplementedError # pragma: no cover
170195

@@ -229,34 +254,25 @@ def execute(
229254
:raises ExecHelperTimeoutError: Timeout exceeded
230255
231256
.. versionchanged:: 1.2.0 default timeout 1 hour
257+
.. versionchanged:: 2.1.0 Allow parallel calls
232258
"""
233-
with self.lock:
234-
(
235-
iface,
236-
_,
237-
stderr,
238-
stdout,
239-
) = self.execute_async(
240-
command,
241-
verbose=verbose,
242-
**kwargs
243-
)
244-
245-
result = self._exec_command(
246-
command=command,
247-
interface=iface,
248-
stdout=stdout,
249-
stderr=stderr,
250-
timeout=timeout,
251-
verbose=verbose,
252-
**kwargs
253-
)
254-
message = "Command {result.cmd!r} exit code: {result.exit_code!s}".format(result=result)
255-
self.logger.log( # type: ignore
256-
level=logging.INFO if verbose else logging.DEBUG,
257-
msg=message
258-
)
259-
return result
259+
async_result = self.execute_async(command, verbose=verbose, **kwargs) # type: ExecuteAsyncResult
260+
261+
result = self._exec_command(
262+
command=command,
263+
interface=async_result.interface,
264+
stdout=async_result.stdout,
265+
stderr=async_result.stderr,
266+
timeout=timeout,
267+
verbose=verbose,
268+
**kwargs
269+
)
270+
message = "Command {result.cmd!r} exit code: {result.exit_code!s}".format(result=result)
271+
self.logger.log( # type: ignore
272+
level=logging.INFO if verbose else logging.DEBUG,
273+
msg=message
274+
)
275+
return result
260276

261277
def check_call(
262278
self,

0 commit comments

Comments
 (0)