Skip to content

Commit 0e3879d

Browse files
authored
Use process tree and kill tree instead of parent only. Fix #90 (#92)
* Use process tree and kill tree instead of parent only. Fix #90 Bump to 1.4.1 Signed-off-by: Alexey Stepanov <penguinolog@gmail.com> * Force to close FIFO buffers instead of waiting for garbage collector Signed-off-by: Alexey Stepanov <penguinolog@gmail.com>
1 parent c4ecd03 commit 0e3879d

File tree

7 files changed

+120
-60
lines changed

7 files changed

+120
-60
lines changed

exec_helpers/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
"ExecResult",
5252
)
5353

54-
__version__ = "1.4.0"
54+
__version__ = "1.4.1"
5555
__author__ = "Alexey Stepanov"
5656
__author_email__ = "penguinolog@gmail.com"
5757
__maintainers__ = {

exec_helpers/_ssh_client_base.py

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
import abc
2424
import base64
25-
import collections
25+
2626
# noinspection PyCompatibility
2727
import concurrent.futures
2828
import copy
@@ -98,19 +98,6 @@ class _MemorizedSSH(abc.ABCMeta):
9898

9999
__cache = {} # type: typing.Dict[typing.Tuple[str, int], SSHClientBase]
100100

101-
@classmethod
102-
def __prepare__(
103-
mcs, # type: typing.Type[_MemorizedSSH]
104-
name, # type: str
105-
bases, # type: typing.Iterable[typing.Type]
106-
**kwargs # type: typing.Any
107-
): # type: (...) -> collections.OrderedDict # pylint: disable=unused-argument
108-
"""Metaclass magic for object storage.
109-
110-
.. versionadded:: 1.2.0
111-
"""
112-
return collections.OrderedDict() # pragma: no cover
113-
114101
def __call__( # type: ignore
115102
cls, # type: _MemorizedSSH
116103
host, # type: str
@@ -650,7 +637,7 @@ def poll_streams(): # type: () -> None
650637
if stderr and interface.recv_stderr_ready():
651638
result.read_stderr(src=stderr, log=self.logger, verbose=verbose)
652639

653-
@threaded.threadpooled # type: ignore
640+
@threaded.threadpooled
654641
def poll_pipes(stop,): # type: (threading.Event) -> None
655642
"""Polling task for FIFO buffers.
656643
@@ -807,7 +794,7 @@ def execute_together(
807794
.. versionchanged:: 1.2.0 default timeout 1 hour
808795
.. versionchanged:: 1.2.0 log_mask_re regex rule for masking cmd
809796
"""
810-
@threaded.threadpooled # type: ignore
797+
@threaded.threadpooled
811798
def get_result(remote): # type: (SSHClientBase) -> exec_result.ExecResult
812799
"""Get result from remote call."""
813800
async_result = remote.execute_async(command, **kwargs) # type: SshExecuteAsyncResult

exec_helpers/proc_enums.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ def exit_code_to_enum(code): # type: (typing.Union[int, ExitCodes]) -> typing.U
155155
"""Convert exit code to enum if possible."""
156156
if isinstance(code, int) and code in ExitCodes.__members__.values():
157157
return ExitCodes(code)
158-
return code
158+
return code # pragma: no cover
159159

160160

161161
def exit_codes_to_enums(

exec_helpers/ssh_auth.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -209,15 +209,13 @@ def __ne__(self, other): # type: (typing.Any) -> bool
209209

210210
def __deepcopy__(self, memo): # type: (typing.Any) -> SSHAuth
211211
"""Helper for copy.deepcopy."""
212-
return self.__class__( # type: ignore
212+
return self.__class__(
213213
username=self.username, password=self.__password, key=self.__key, keys=copy.deepcopy(self.__keys)
214214
)
215215

216216
def __copy__(self): # type: () -> SSHAuth
217217
"""Copy self."""
218-
return self.__class__( # type: ignore
219-
username=self.username, password=self.__password, key=self.__key, keys=self.__keys
220-
)
218+
return self.__class__(username=self.username, password=self.__password, key=self.__key, keys=self.__keys)
221219

222220
def __repr__(self): # type: (...) -> str
223221
"""Representation for debug purposes."""

exec_helpers/subprocess_runner.py

Lines changed: 52 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,19 @@
2121
from __future__ import unicode_literals
2222

2323
import abc
24-
import collections
24+
2525
# noinspection PyCompatibility
2626
import concurrent.futures
2727
import errno
2828
import logging
2929
import os
30+
import platform
3031
import subprocess # nosec # Expected usage
3132
import threading
3233
import typing # noqa: F401 # pylint: disable=unused-import
3334

3435
import six
36+
import psutil # type: ignore
3537
import threaded
3638

3739
from exec_helpers import api
@@ -44,6 +46,39 @@
4446
devnull = open(os.devnull) # subprocess.DEVNULL is py3.3+
4547

4648

49+
# Adopt from:
50+
# https://stackoverflow.com/questions/1230669/subprocess-deleting-child-processes-in-windows
51+
def kill_proc_tree(pid, including_parent=True): # type: (int, bool) -> None # pragma: no cover
52+
"""Kill process tree.
53+
54+
:param pid: PID of parent process to kill
55+
:type pid: int
56+
:param including_parent: kill also parent process
57+
:type including_parent: bool
58+
"""
59+
parent = psutil.Process(pid)
60+
children = parent.children(recursive=True)
61+
for child in children: # type: psutil.Process
62+
child.kill()
63+
_, alive = psutil.wait_procs(children, timeout=5)
64+
for proc in alive: # type: psutil.Process
65+
proc.kill() # 2nd shot
66+
if including_parent:
67+
parent.kill()
68+
parent.wait(5)
69+
70+
71+
# Subprocess extra arguments.
72+
# Flags from:
73+
# https://stackoverflow.com/questions/13243807/popen-waiting-for-child-process-even-when-the-immediate-child-has-terminated
74+
kw = {} # type: typing.Dict[str, typing.Any]
75+
if "Windows" == platform.system(): # pragma: no cover
76+
kw["creationflags"] = 0x00000200
77+
else: # pragma: no cover
78+
kw["preexec_fn"] = os.setsid
79+
80+
81+
# noinspection PyTypeHints
4782
class SubprocessExecuteAsyncResult(api.ExecuteAsyncResult):
4883
"""Override original NamedTuple with proper typing."""
4984

@@ -70,19 +105,6 @@ def __call__(cls, *args, **kwargs): # type: (SingletonMeta, typing.Any, typing.
70105
cls._instances[cls] = super(SingletonMeta, cls).__call__(*args, **kwargs)
71106
return cls._instances[cls]
72107

73-
@classmethod
74-
def __prepare__(
75-
mcs,
76-
name, # type: str
77-
bases, # type: typing.Iterable[typing.Type]
78-
**kwargs # type: typing.Any
79-
): # type: (...) -> collections.OrderedDict # pylint: disable=unused-argument
80-
"""Metaclass magic for object storage.
81-
82-
.. versionadded:: 1.2.0
83-
"""
84-
return collections.OrderedDict() # pragma: no cover
85-
86108

87109
class Subprocess(six.with_metaclass(SingletonMeta, api.ExecHelper)):
88110
"""Subprocess helper with timeouts and lock-free FIFO."""
@@ -138,17 +160,24 @@ def _exec_command(
138160
139161
.. versionadded:: 1.2.0
140162
"""
141-
@threaded.threadpooled # type: ignore
163+
@threaded.threadpooled
142164
def poll_stdout(): # type: () -> None
143165
"""Sync stdout poll."""
144166
result.read_stdout(src=stdout, log=logger, verbose=verbose)
145167
interface.wait() # wait for the end of execution
146168

147-
@threaded.threadpooled # type: ignore
169+
@threaded.threadpooled
148170
def poll_stderr(): # type: () -> None
149171
"""Sync stderr poll."""
150172
result.read_stderr(src=stderr, log=logger, verbose=verbose)
151173

174+
def close_streams(): # type: () -> None
175+
"""Enforce FIFO closure."""
176+
if stdout is not None and not stdout.closed:
177+
stdout.close()
178+
if stderr is not None and not stderr.closed:
179+
stderr.close()
180+
152181
# Store command with hidden data
153182
cmd_for_log = self._mask_command(cmd=command, log_mask_re=log_mask_re)
154183

@@ -167,11 +196,13 @@ def poll_stderr(): # type: () -> None
167196
# Process closed?
168197
if exit_code is not None:
169198
result.exit_code = exit_code
199+
close_streams()
170200
return result
171201
# Kill not ended process and wait for close
172202
try:
203+
kill_proc_tree(interface.pid, including_parent=False) # kill -9 for all subprocesses
173204
interface.kill() # kill -9
174-
concurrent.futures.wait([stdout_future, stderr_future], timeout=5)
205+
concurrent.futures.wait([stdout_future, stderr_future], timeout=0.5)
175206
# Force stop cycle if no exit code after kill
176207
stdout_future.cancel()
177208
stderr_future.cancel()
@@ -182,6 +213,8 @@ def poll_stderr(): # type: () -> None
182213
result.exit_code = exit_code
183214
return result
184215
raise # Some other error
216+
finally:
217+
close_streams()
185218

186219
wait_err_msg = _log_templates.CMD_WAIT_ERROR.format(result=result, timeout=timeout)
187220
logger.debug(wait_err_msg)
@@ -244,6 +277,7 @@ def execute_async(
244277
cwd=kwargs.get("cwd", None),
245278
env=kwargs.get("env", None),
246279
universal_newlines=False,
280+
**kw
247281
)
248282

249283
if stdin is None:
@@ -264,6 +298,7 @@ def execute_async(
264298
elif exc.errno in (errno.EPIPE, errno.ESHUTDOWN): # pragma: no cover
265299
self.logger.warning("STDIN Send failed: broken PIPE")
266300
else:
301+
kill_proc_tree(process.pid)
267302
process.kill()
268303
raise
269304
try:

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ advanced-descriptors>=1.0 # Apache-2.0
77
typing >= 3.6 # PSF
88
futures>=3.1; python_version == "2.7"
99
enum34>=1.1; python_version == "2.7"
10+
psutil >= 5.0

0 commit comments

Comments
 (0)