Skip to content

Commit 12b18d1

Browse files
authored
Fix: race condition on process kill: sometimes FIFOs are still open (#107)
Log a critical message if the reader threads are still running. Let exceptions raised on stream close propagate (do not handle them) to protect against leaks. Use SIGTERM first for a graceful shutdown, then retry with SIGKILL.
1 parent c3d60ed commit 12b18d1

File tree

4 files changed

+37
-12
lines changed

4 files changed

+37
-12
lines changed

exec_helpers/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
"ExecResult",
5252
)
5353

54-
__version__ = "1.9.4"
54+
__version__ = "1.9.5"
5555
__author__ = "Alexey Stepanov"
5656
__author_email__ = "penguinolog@gmail.com"
5757
__maintainers__ = {

exec_helpers/_subprocess_helpers.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,27 @@ def kill_proc_tree(pid, including_parent=True): # type:(int, bool) -> None # p
4040
:param including_parent: kill also parent process
4141
:type including_parent: bool
4242
"""
43+
def safe_stop(proc, kill=False):  # type: (psutil.Process, bool) -> None
    """Stop a process without crashing if it has already exited.

    Exactly one stop request is issued per call.

    :param proc: process to stop
    :type proc: psutil.Process
    :param kill: call ``proc.kill()`` (forced stop) instead of
        ``proc.terminate()`` (graceful stop)
    :type kill: bool
    """
    try:
        if kill:
            proc.kill()
        else:
            # The branches are mutually exclusive: a forced kill must not be
            # followed by a redundant terminate request on the same process.
            proc.terminate()
    except psutil.NoSuchProcess:
        # The process is already gone, so there is nothing left to stop.
        pass
51+
4352
parent = psutil.Process(pid)
4453
children = parent.children(recursive=True)
4554
for child in children: # type: psutil.Process
46-
child.kill()
55+
safe_stop(child) # SIGTERM to allow cleanup
4756
_, alive = psutil.wait_procs(children, timeout=5)
48-
for proc in alive: # type: psutil.Process
49-
proc.kill() # 2nd shot
57+
for child in alive: # type: psutil.Process
58+
safe_stop(child, kill=True) # 2nd shot: SIGKILL
5059
if including_parent:
51-
parent.kill()
60+
safe_stop(parent) # SIGTERM to allow cleanup
61+
_, alive = psutil.wait_procs((parent,), timeout=5)
62+
if alive:
63+
safe_stop(parent, kill=True) # 2nd shot: SIGKILL
5264
parent.wait(5)
5365

5466

exec_helpers/subprocess_runner.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -104,13 +104,13 @@ def _exec_command( # type: ignore
104104
@threaded.threadpooled
105105
def poll_stdout(): # type: () -> None
106106
"""Sync stdout poll."""
107-
result.read_stdout(src=async_result.stdout, log=logger, verbose=verbose)
107+
result.read_stdout(src=async_result.stdout, log=self.logger, verbose=verbose)
108108
async_result.interface.wait() # wait for the end of execution
109109

110110
@threaded.threadpooled
111111
def poll_stderr(): # type: () -> None
112112
"""Sync stderr poll."""
113-
result.read_stderr(src=async_result.stderr, log=logger, verbose=verbose)
113+
result.read_stderr(src=async_result.stderr, log=self.logger, verbose=verbose)
114114

115115
def close_streams(): # type: () -> None
116116
"""Enforce FIFO closure."""
@@ -142,24 +142,34 @@ def close_streams(): # type: () -> None
142142
# Kill not ended process and wait for close
143143
try:
144144
# kill -9 for all subprocesses
145-
_subprocess_helpers.kill_proc_tree(async_result.interface.pid, including_parent=False)
145+
_subprocess_helpers.kill_proc_tree(async_result.interface.pid)
146146
async_result.interface.kill() # kill -9
147147
# Force stop cycle if no exit code after kill
148148
except OSError:
149149
exit_code = async_result.interface.poll()
150150
if exit_code is not None: # Nothing to kill
151-
logger.warning("{!s} has been completed just after timeout: please validate timeout.".format(command))
151+
self.logger.warning(
152+
"{!s} has been completed just after timeout: please validate timeout.".format(command))
152153
concurrent.futures.wait([stdout_future, stderr_future], timeout=0.1)
153154
result.exit_code = exit_code
154155
return result
155156
raise # Some other error
156157
finally:
157158
stdout_future.cancel()
158159
stderr_future.cancel()
160+
_, not_done = concurrent.futures.wait([stdout_future, stderr_future], timeout=5)
161+
if not_done:
162+
exit_code = async_result.interface.poll()
163+
if exit_code:
164+
self.logger.critical(
165+
"Process {!s} was closed with exit code {!s}, but FIFO buffers are still open".format(
166+
command, exit_code
167+
)
168+
)
159169
close_streams()
160170

161171
wait_err_msg = _log_templates.CMD_WAIT_ERROR.format(result=result, timeout=timeout)
162-
logger.debug(wait_err_msg)
172+
self.logger.debug(wait_err_msg)
163173
raise exceptions.ExecHelperTimeoutError(result=result, timeout=timeout)
164174

165175
def execute_async(

test/test_subprocess_special.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,11 +78,13 @@ def read_stream(stream): # type: (FakeFileStream) -> typing.Tuple[bytes]
7878
"positive_verbose": dict(stdout=(b" \n", b"2\n", b"3\n", b" \n"), verbose=True),
7979
"no_stdout": dict(),
8080
"IOError_on_stdout_read": dict(stdout=(b" \n", b"2\n", IOError())),
81-
"TimeoutError": dict(wait=(None,), poll=(None,), stdout=(), expect_exc=exec_helpers.ExecHelperTimeoutError),
81+
"TimeoutError": dict(
82+
wait=(None,), poll=(None, None, None,), stdout=(), expect_exc=exec_helpers.ExecHelperTimeoutError),
8283
"TimeoutError_closed": dict(
8384
wait=(None,), poll=(None, 0), stdout=(b" \n", b"2\n", b"3\n", b" \n"), kill=(OSError(),)
8485
),
85-
"TimeoutError_no_kill": dict(wait=(None,), poll=(None, None), stdout=(), kill=(OSError(),), expect_exc=OSError),
86+
"TimeoutError_no_kill": dict(
87+
wait=(None,), poll=(None, None, None,), stdout=(), kill=(OSError(),), expect_exc=OSError),
8688
"stdin_closed_PIPE_windows": dict(stdout=(b" \n", b"2\n", b"3\n", b" \n"), stdin="Warning", write=einval_exc),
8789
"stdin_broken_PIPE": dict(stdout=(b" \n", b"2\n", b"3\n", b" \n"), stdin="Warning", write=epipe_exc),
8890
"stdin_closed_PIPE": dict(stdout=(b" \n", b"2\n", b"3\n", b" \n"), stdin="Warning", write=eshutdown_exc),
@@ -152,6 +154,7 @@ def exec_result(run_parameters):
152154
@pytest.fixture
153155
def create_subprocess_shell(mocker, run_parameters):
154156
mocker.patch("psutil.Process")
157+
mocker.patch("psutil.wait_procs", return_value=([], []))
155158

156159
def create_mock(
157160
stdout=None, # type: typing.Optional[typing.Tuple]

0 commit comments

Comments
 (0)