Skip to content

Commit 038ba13

Browse files
authored
Implement asyncio API for subprocess (#95)
[x] API mimic sync version [x] partial migration to pytest for parametrized tests [x] Fix no stdin in exec result
1 parent d4c3a47 commit 038ba13

29 files changed

+1970
-848
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
.pytest_cache/*
77

88
### Generated code
9-
/exec_helpers/*.c
9+
/exec_helpers/**/*.c
1010

1111
### TortoiseGit template
1212
# Project-level settings

.travis.yml

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,11 @@ language: python
33
os: linux
44
install:
55
- &upgrade_python_toolset pip install --upgrade pip setuptools wheel
6-
- &install_test_deps pip install --upgrade pytest pytest-sugar
6+
- &install_test_deps pip install --upgrade asynctest mock pytest pytest-mock pytest-asyncio pytest-sugar
77
- &install_deps pip install -r CI_REQUIREMENTS.txt
88
- pip install --upgrade pytest-cov coveralls
99

1010
_python:
11-
- &python34
12-
name: "Python 3.4"
13-
python: 3.4
1411
- &python35
1512
name: "Python 3.5"
1613
python: 3.5
@@ -110,8 +107,6 @@ jobs:
110107
script:
111108
- black --check exec_helpers
112109

113-
- stage: test
114-
<<: *python34
115110
- stage: test
116111
<<: *python35
117112
- stage: test
@@ -121,8 +116,6 @@ jobs:
121116
- stage: test
122117
<<: *pypy3
123118

124-
- <<: *test_cythonized
125-
<<: *python34
126119
- <<: *test_cythonized
127120
<<: *python35
128121
- <<: *test_cythonized

README.rst

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,13 +44,12 @@ Pros:
4444

4545
::
4646

47-
Python 3.4
4847
Python 3.5
4948
Python 3.6
5049
Python 3.7
5150
PyPy3 3.5+
5251

53-
.. note:: For Python 2.7 and PyPy please use versions 1.x.x
52+
.. note:: For Python 2.7 and PyPy please use versions 1.x.x. For python 3.4 use versions 2.x.x
5453

5554
This package includes:
5655

@@ -64,6 +63,9 @@ This package includes:
6463
* `Subprocess` - `subprocess.Popen` wrapper with timeouts, polling and almost the same API, as `SSHClient`
6564
(except specific flags, like `cwd` for subprocess and `get_tty` for ssh).
6665

66+
* `async_api.Subprocess` - the same, as `Subprocess` helper, but works with asyncio.
67+
.. note:: for Windows `ProactorEventLoop` or another non-standard event loop should be used!
68+
6769
* `ExecResult` - class for execution results storage.
6870
Contains exit code, stdout, stderr and getters for decoding as JSON, YAML, string, bytearray and brief strings (up to 7 lines).
6971

@@ -315,6 +317,23 @@ Kwargs set properties:
315317

316318
.. note:: `shell=true` is always set.
317319

320+
async_api.Subprocess specific
321+
-----------------------------
322+
323+
All standard methods are coroutines. Async context manager also available.
324+
325+
Example:
326+
327+
.. code-block:: python
328+
329+
async with helper:
330+
result = await helper.execute(
331+
command, # type: str
332+
verbose=False, # type: bool
333+
timeout=1 * 60 * 60, # type: typing.Union[int, float, None]
334+
**kwargs
335+
)
336+
318337
Testing
319338
=======
320339
The main test mechanism for the package `exec-helpers` is using `tox`.

build_requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
Cython; platform_python_implementation == "CPython"
2-
wheel
2+
wheel==0.31.1
33
-r CI_REQUIREMENTS.txt
44
-r requirements.txt

doc/source/ExecResult.rst

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ API: ExecResult
66
.. py:module:: exec_helpers
77
.. py:currentmodule:: exec_helpers
88
9-
.. py:class:: ExecResult(object)
9+
.. py:class:: ExecResult()
1010
1111
Command execution result.
1212

@@ -27,6 +27,7 @@ API: ExecResult
2727
2828
``threading.RLock``
2929
Lock object for thread-safe operation.
30+
3031
.. versionadded:: 2.2.0
3132

3233
.. py:attribute:: stderr_lock

doc/source/SSHClient.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ API: SSHClient and SSHAuth.
325325
:rtype: ``bool``
326326

327327

328-
.. py:class:: SSHAuth(object)
328+
.. py:class:: SSHAuth()
329329
330330
SSH credentials object.
331331

exec_helpers/__init__.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,13 @@
2525
ExecHelperTimeoutError,
2626
)
2727

28-
from .exec_result import ExecResult
2928
from .api import ExecHelper
29+
from .exec_result import ExecResult
3030
from .ssh_auth import SSHAuth
31-
from ._ssh_client_base import SshExecuteAsyncResult
3231
from .ssh_client import SSHClient
32+
from ._ssh_client_base import SshExecuteAsyncResult
3333
from .subprocess_runner import Subprocess, SubprocessExecuteAsyncResult # nosec # Expected
34+
from . import async_api
3435

3536
__all__ = (
3637
"ExecHelperError",
@@ -47,9 +48,10 @@
4748
"SubprocessExecuteAsyncResult",
4849
"ExitCodes",
4950
"ExecResult",
51+
"async_api",
5052
)
5153

52-
__version__ = "2.2.0"
54+
__version__ = "3.0.0"
5355
__author__ = "Alexey Stepanov"
5456
__author_email__ = "penguinolog@gmail.com"
5557
__maintainers__ = {

exec_helpers/_ssh_client_base.py

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -614,12 +614,10 @@ def execute_async(
614614

615615
return SshExecuteAsyncResult(chan, _stdin, stderr, stdout)
616616

617-
def _exec_command(
617+
def _exec_command( # type: ignore
618618
self,
619619
command: str,
620-
interface: paramiko.channel.Channel,
621-
stdout: typing.Optional[paramiko.ChannelFile],
622-
stderr: typing.Optional[paramiko.ChannelFile],
620+
async_result: SshExecuteAsyncResult,
623621
timeout: typing.Union[int, float, None],
624622
verbose: bool = False,
625623
log_mask_re: typing.Optional[str] = None,
@@ -629,12 +627,8 @@ def _exec_command(
629627
630628
:param command: executed command (for logs)
631629
:type command: str
632-
:param interface: interface to control execution
633-
:type interface: paramiko.channel.Channel
634-
:param stdout: source for STDOUT read
635-
:type stdout: typing.Optional[paramiko.ChannelFile]
636-
:param stderr: source for STDERR read
637-
:type stderr: typing.Optional[paramiko.ChannelFile]
630+
:param async_result: execute_async result
631+
:type async_result: SshExecuteAsyncResult
638632
:param timeout: timeout before stop execution with TimeoutError
639633
:type timeout: typing.Union[int, float, None]
640634
:param verbose: produce log.info records for STDOUT/STDERR
@@ -653,10 +647,10 @@ def _exec_command(
653647

654648
def poll_streams() -> None:
655649
"""Poll FIFO buffers if data available."""
656-
if stdout and interface.recv_ready():
657-
result.read_stdout(src=stdout, log=self.logger, verbose=verbose)
658-
if stderr and interface.recv_stderr_ready():
659-
result.read_stderr(src=stderr, log=self.logger, verbose=verbose)
650+
if async_result.stdout and async_result.interface.recv_ready():
651+
result.read_stdout(src=async_result.stdout, log=self.logger, verbose=verbose)
652+
if async_result.stderr and async_result.interface.recv_stderr_ready():
653+
result.read_stderr(src=async_result.stderr, log=self.logger, verbose=verbose)
660654

661655
@threaded.threadpooled
662656
def poll_pipes(stop: threading.Event) -> None:
@@ -666,21 +660,21 @@ def poll_pipes(stop: threading.Event) -> None:
666660
"""
667661
while not stop.is_set():
668662
time.sleep(0.1)
669-
if stdout or stderr:
663+
if async_result.stdout or async_result.stderr:
670664
poll_streams()
671665

672-
if interface.status_event.is_set():
673-
result.read_stdout(src=stdout, log=self.logger, verbose=verbose)
674-
result.read_stderr(src=stderr, log=self.logger, verbose=verbose)
675-
result.exit_code = interface.exit_status
666+
if async_result.interface.status_event.is_set():
667+
result.read_stdout(src=async_result.stdout, log=self.logger, verbose=verbose)
668+
result.read_stderr(src=async_result.stderr, log=self.logger, verbose=verbose)
669+
result.exit_code = async_result.interface.exit_status
676670

677671
stop.set()
678672

679673
# channel.status_event.wait(timeout)
680674
cmd_for_log = self._mask_command(cmd=command, log_mask_re=log_mask_re)
681675

682676
# Store command with hidden data
683-
result = exec_result.ExecResult(cmd=cmd_for_log)
677+
result = exec_result.ExecResult(cmd=cmd_for_log, stdin=kwargs.get("stdin"))
684678

685679
stop_event = threading.Event()
686680

@@ -693,11 +687,11 @@ def poll_pipes(stop: threading.Event) -> None:
693687

694688
# Process closed?
695689
if stop_event.is_set():
696-
interface.close()
690+
async_result.interface.close()
697691
return result
698692

699693
stop_event.set()
700-
interface.close()
694+
async_result.interface.close()
701695
future.cancel()
702696

703697
wait_err_msg = _log_templates.CMD_WAIT_ERROR.format(result=result, timeout=timeout)
@@ -774,9 +768,15 @@ def execute_through_host(
774768

775769
channel.exec_command(command) # nosec # Sanitize on caller side
776770

771+
async_result = SshExecuteAsyncResult(interface=channel, stdin=None, stdout=stdout, stderr=stderr)
772+
777773
# noinspection PyDictCreation
778774
result = self._exec_command(
779-
command, channel, stdout, stderr, timeout, verbose=verbose, log_mask_re=kwargs.get("log_mask_re", None)
775+
command,
776+
async_result=async_result,
777+
timeout=timeout,
778+
verbose=verbose,
779+
log_mask_re=kwargs.get("log_mask_re", None),
780780
)
781781

782782
intermediate_channel.close()
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
# Copyright 2018 Alexey Stepanov aka penguinolog.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
# not use this file except in compliance with the License. You may obtain
5+
# a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12+
# License for the specific language governing permissions and limitations
13+
# under the License.
14+
15+
"""Python subprocess shared code."""
16+
17+
__all__ = ("kill_proc_tree", "subprocess_kw")
18+
19+
import platform
20+
21+
# pylint: disable=unused-import
22+
import typing # noqa: F401
23+
24+
# pylint: enable=unused-import
25+
26+
import psutil # type: ignore
27+
28+
29+
# Adopt from:
30+
# https://stackoverflow.com/questions/1230669/subprocess-deleting-child-processes-in-windows
31+
def kill_proc_tree(pid: int, including_parent: bool = True) -> None: # pragma: no cover
32+
"""Kill process tree.
33+
34+
:param pid: PID of parent process to kill
35+
:type pid: int
36+
:param including_parent: kill also parent process
37+
:type including_parent: bool
38+
"""
39+
parent = psutil.Process(pid)
40+
children = parent.children(recursive=True)
41+
for child in children: # type: psutil.Process
42+
child.kill()
43+
_, alive = psutil.wait_procs(children, timeout=5)
44+
for proc in alive: # type: psutil.Process
45+
proc.kill() # 2nd shot
46+
if including_parent:
47+
parent.kill()
48+
parent.wait(5)
49+
50+
51+
# Subprocess extra arguments.
52+
# Flags from:
53+
# https://stackoverflow.com/questions/13243807/popen-waiting-for-child-process-even-when-the-immediate-child-has-terminated
54+
subprocess_kw = {} # type: typing.Dict[str, typing.Any]
55+
if "Windows" == platform.system(): # pragma: no cover
56+
subprocess_kw["creationflags"] = 0x00000200
57+
else: # pragma: no cover
58+
subprocess_kw["start_new_session"] = True

exec_helpers/api.py

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -184,9 +184,7 @@ def execute_async(
184184
def _exec_command(
185185
self,
186186
command: str,
187-
interface: typing.Any,
188-
stdout: typing.Any,
189-
stderr: typing.Any,
187+
async_result: ExecuteAsyncResult,
190188
timeout: typing.Union[int, float, None],
191189
verbose: bool = False,
192190
log_mask_re: typing.Optional[str] = None,
@@ -196,12 +194,8 @@ def _exec_command(
196194
197195
:param command: Command for execution
198196
:type command: str
199-
:param interface: Control interface
200-
:type interface: typing.Any
201-
:param stdout: STDOUT pipe or file-like object
202-
:type stdout: typing.Any
203-
:param stderr: STDERR pipe or file-like object
204-
:type stderr: typing.Any
197+
:param async_result: execute_async result
198+
:type async_result: SubprocessExecuteAsyncResult
205199
:param timeout: Timeout for command execution
206200
:type timeout: typing.Union[int, float, None]
207201
:param verbose: produce verbose log record on command call
@@ -246,13 +240,7 @@ def execute(
246240
async_result = self.execute_async(command, verbose=verbose, **kwargs) # type: ExecuteAsyncResult
247241

248242
result = self._exec_command(
249-
command=command,
250-
interface=async_result.interface,
251-
stdout=async_result.stdout,
252-
stderr=async_result.stderr,
253-
timeout=timeout,
254-
verbose=verbose,
255-
**kwargs
243+
command=command, async_result=async_result, timeout=timeout, verbose=verbose, **kwargs
256244
)
257245
message = "Command {result.cmd!r} exit code: {result.exit_code!s}".format(result=result)
258246
self.logger.log(level=logging.INFO if verbose else logging.DEBUG, msg=message) # type: ignore
@@ -300,7 +288,7 @@ def check_call(
300288
append=error_info + "\n" if error_info else "", result=ret, expected=expected
301289
)
302290
)
303-
self.logger.error(message)
291+
self.logger.error(msg=message)
304292
if raise_on_err:
305293
raise exceptions.CalledProcessError(result=ret, expected=expected)
306294
return ret
@@ -340,10 +328,10 @@ def check_stderr(
340328
)
341329
if ret.stderr:
342330
message = (
343-
"{append}Command {result.cmd!r} STDERR while not expected\n"
331+
"{append}Command {result.cmd!r} output contains STDERR while not expected\n"
344332
"\texit code: {result.exit_code!s}".format(append=error_info + "\n" if error_info else "", result=ret)
345333
)
346-
self.logger.error(message)
334+
self.logger.error(msg=message)
347335
if raise_on_err:
348336
raise exceptions.CalledProcessError(result=ret, expected=kwargs.get("expected"))
349337
return ret

0 commit comments

Comments
 (0)