Skip to content

Commit f98d3b4

Browse files
authored
Subprocess: increase number of threads and return to blocking pipes. (#61)
Found several use-cases, when non-blocking pipes can cause data loose. In tests not checking mised stdout/stderr because of expected race condition during log output. Signed-off-by: Alexey Stepanov <penguinolog@gmail.com>
1 parent 05578d8 commit f98d3b4

File tree

5 files changed

+94
-351
lines changed

5 files changed

+94
-351
lines changed

exec_helpers/exec_result.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -185,10 +185,7 @@ def __poll_stream(
185185
if log:
186186
log.log(
187187
level=logging.INFO if verbose else logging.DEBUG,
188-
msg=line.decode(
189-
'utf-8',
190-
errors='backslashreplace'
191-
).rstrip()
188+
msg=line.decode('utf-8', errors='backslashreplace').rstrip()
192189
)
193190
except IOError:
194191
pass

exec_helpers/subprocess_runner.py

Lines changed: 37 additions & 177 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,8 @@
2626
import errno
2727
import logging
2828
import os
29-
import select
30-
import sys
3129
import subprocess # nosec # Expected usage
3230
import threading
33-
import time
3431
import typing # noqa # pylint: disable=unused-import
3532

3633
import six
@@ -45,19 +42,6 @@
4542
# noinspection PyUnresolvedReferences
4643
devnull = open(os.devnull) # subprocess.DEVNULL is py3.3+
4744

48-
_win = sys.platform == "win32" # type: bool
49-
_posix = 'posix' in sys.builtin_module_names # type: bool
50-
51-
if _posix: # pragma: no cover
52-
import fcntl # pylint: disable=import-error
53-
54-
elif _win: # pragma: no cover
55-
import ctypes
56-
from ctypes import wintypes # pylint: disable=import-error
57-
from ctypes import windll # pylint: disable=import-error
58-
# noinspection PyUnresolvedReferences
59-
import msvcrt # pylint: disable=import-error
60-
6145

6246
class SingletonMeta(type):
6347
"""Metaclass for Singleton.
@@ -72,9 +56,8 @@ def __call__(cls, *args, **kwargs):
7256
"""Singleton."""
7357
with cls._lock:
7458
if cls not in cls._instances:
75-
cls._instances[cls] = super(
76-
SingletonMeta, cls
77-
).__call__(*args, **kwargs)
59+
# noinspection PySuperArguments
60+
cls._instances[cls] = super(SingletonMeta, cls).__call__(*args, **kwargs)
7861
return cls._instances[cls]
7962

8063
@classmethod
@@ -91,77 +74,6 @@ def __prepare__(
9174
return collections.OrderedDict() # pragma: no cover
9275

9376

94-
def set_nonblocking_pipe(pipe): # type: (typing.Any) -> None
95-
"""Set PIPE unblocked to allow polling of all pipes in parallel."""
96-
if pipe is None: # pragma: no cover
97-
return
98-
99-
descriptor = pipe.fileno() # pragma: no cover
100-
101-
if _posix: # pragma: no cover
102-
# Get flags
103-
flags = fcntl.fcntl(descriptor, fcntl.F_GETFL)
104-
105-
# Set nonblock mode
106-
fcntl.fcntl(descriptor, fcntl.F_SETFL, flags | os.O_NONBLOCK)
107-
108-
elif _win: # pragma: no cover
109-
# noinspection PyPep8Naming
110-
SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
111-
SetNamedPipeHandleState.argtypes = [
112-
wintypes.HANDLE, # hNamedPipe
113-
wintypes.LPDWORD, # lpMode
114-
wintypes.LPDWORD, # lpMaxCollectionCount
115-
wintypes.LPDWORD, # lpCollectDataTimeout
116-
]
117-
SetNamedPipeHandleState.restype = wintypes.BOOL
118-
# noinspection PyPep8Naming
119-
PIPE_NOWAIT = wintypes.DWORD(0x00000001)
120-
handle = msvcrt.get_osfhandle(descriptor)
121-
122-
windll.kernel32.SetNamedPipeHandleState(
123-
handle,
124-
ctypes.byref(PIPE_NOWAIT), None, None
125-
)
126-
127-
128-
def set_blocking_pipe(pipe): # type: (typing.Any) -> None
129-
"""Set pipe blocking mode for final read on process close.
130-
131-
This will allow to read pipe until closed on remote side.
132-
"""
133-
if pipe is None: # pragma: no cover
134-
return
135-
136-
descriptor = pipe.fileno() # pragma: no cover
137-
138-
if _posix: # pragma: no cover
139-
# Get flags
140-
flags = fcntl.fcntl(descriptor, fcntl.F_GETFL)
141-
142-
# Set block mode
143-
fcntl.fcntl(descriptor, fcntl.F_SETFL, flags & (flags ^ os.O_NONBLOCK))
144-
145-
elif _win: # pragma: no cover
146-
# noinspection PyPep8Naming
147-
SetNamedPipeHandleState = windll.kernel32.SetNamedPipeHandleState
148-
SetNamedPipeHandleState.argtypes = [
149-
wintypes.HANDLE, # hNamedPipe
150-
wintypes.LPDWORD, # lpMode
151-
wintypes.LPDWORD, # lpMaxCollectionCount
152-
wintypes.LPDWORD, # lpCollectDataTimeout
153-
]
154-
SetNamedPipeHandleState.restype = wintypes.BOOL
155-
# noinspection PyPep8Naming
156-
PIPE_WAIT = wintypes.DWORD(0x00000000)
157-
handle = msvcrt.get_osfhandle(descriptor)
158-
159-
windll.kernel32.SetNamedPipeHandleState(
160-
handle,
161-
ctypes.byref(PIPE_WAIT), None, None
162-
)
163-
164-
16577
class Subprocess(six.with_metaclass(SingletonMeta, api.ExecHelper)):
16678
"""Subprocess helper with timeouts and lock-free FIFO."""
16779

@@ -179,10 +91,7 @@ def __init__(
17991
18092
.. versionchanged:: 1.2.0 log_mask_re regex rule for masking cmd
18193
"""
182-
super(Subprocess, self).__init__(
183-
logger=logger,
184-
log_mask_re=log_mask_re
185-
)
94+
super(Subprocess, self).__init__(logger=logger, log_mask_re=log_mask_re)
18695
self.__process = None
18796

18897
def _exec_command(
@@ -218,98 +127,57 @@ def _exec_command(
218127
219128
.. versionadded:: 1.2.0
220129
"""
221-
def poll_streams():
222-
"""Poll streams to the result object."""
223-
if _win: # pragma: no cover
224-
# select.select is not supported on windows
225-
result.read_stdout(src=stdout, log=logger, verbose=verbose)
226-
result.read_stderr(src=stderr, log=logger, verbose=verbose)
227-
else: # pragma: no cover
228-
rlist, _, _ = select.select(
229-
[item for item in (stdout, stderr) if item is not None],
230-
[],
231-
[])
232-
if rlist:
233-
if stdout in rlist:
234-
result.read_stdout(
235-
src=stdout,
236-
log=logger,
237-
verbose=verbose
238-
)
239-
if stderr in rlist:
240-
result.read_stderr(
241-
src=stderr,
242-
log=logger,
243-
verbose=verbose
244-
)
130+
@threaded.threadpooled
131+
def poll_stdout():
132+
"""Sync stdout poll."""
133+
result.read_stdout(
134+
src=stdout,
135+
log=logger,
136+
verbose=verbose
137+
)
245138

246139
@threaded.threadpooled
247-
def poll_pipes(stop, ): # type: (threading.Event) -> None
248-
"""Polling task for FIFO buffers.
249-
250-
:type stop: Event
251-
"""
252-
while not stop.is_set():
253-
time.sleep(0.1)
254-
if stdout or stderr:
255-
poll_streams()
256-
257-
interface.poll()
258-
259-
if interface.returncode is not None:
260-
set_blocking_pipe(stdout)
261-
set_blocking_pipe(stderr)
262-
result.read_stdout(
263-
src=stdout,
264-
log=logger,
265-
verbose=verbose
266-
)
267-
result.read_stderr(
268-
src=stderr,
269-
log=logger,
270-
verbose=verbose
271-
)
272-
result.exit_code = interface.returncode
273-
274-
stop.set()
140+
def poll_stderr():
141+
"""Sync stderr poll."""
142+
result.read_stderr(
143+
src=stderr,
144+
log=logger,
145+
verbose=verbose
146+
)
275147

276148
# Store command with hidden data
277-
cmd_for_log = self._mask_command(
278-
cmd=command,
279-
log_mask_re=log_mask_re
280-
)
149+
cmd_for_log = self._mask_command(cmd=command, log_mask_re=log_mask_re)
281150

282151
result = exec_result.ExecResult(cmd=cmd_for_log)
283-
stop_event = threading.Event()
284152

285153
# pylint: disable=assignment-from-no-return
286-
future = poll_pipes(stop_event) # type: concurrent.futures.Future
154+
stdout_future = poll_stdout() # type: concurrent.futures.Future
155+
stderr_future = poll_stderr() # type: concurrent.futures.Future
287156
# pylint: enable=assignment-from-no-return
288-
# wait for process close
289157

290-
concurrent.futures.wait([future], timeout)
158+
concurrent.futures.wait([stdout_future, stderr_future], timeout=timeout) # Wait real timeout here
159+
exit_code = interface.poll() # Update exit code
291160

292161
# Process closed?
293-
if stop_event.is_set():
162+
if exit_code is not None:
163+
result.exit_code = exit_code
294164
return result
295165
# Kill not ended process and wait for close
296166
try:
297167
interface.kill() # kill -9
298-
stop_event.wait(5)
168+
concurrent.futures.wait([stdout_future, stderr_future], timeout=5)
299169
# Force stop cycle if no exit code after kill
300-
stop_event.set()
301-
future.cancel()
170+
stdout_future.cancel()
171+
stderr_future.cancel()
302172
except OSError:
303-
# Nothing to kill
304-
logger.warning(
305-
u"{!s} has been completed just after timeout: "
306-
"please validate timeout.".format(command))
307-
return result
308-
309-
wait_err_msg = _log_templates.CMD_WAIT_ERROR.format(
310-
result=result,
311-
timeout=timeout
312-
)
173+
exit_code = interface.poll()
174+
if exit_code is not None: # Nothing to kill
175+
logger.warning("{!s} has been completed just after timeout: please validate timeout.".format(command))
176+
result.exit_code = exit_code
177+
return result
178+
raise # Some other error
179+
180+
wait_err_msg = _log_templates.CMD_WAIT_ERROR.format(result=result, timeout=timeout)
313181
logger.debug(wait_err_msg)
314182
raise exceptions.ExecHelperTimeoutError(result=result, timeout=timeout)
315183

@@ -347,10 +215,7 @@ def execute_async(
347215
348216
.. versionadded:: 1.2.0
349217
"""
350-
cmd_for_log = self._mask_command(
351-
cmd=command,
352-
log_mask_re=log_mask_re
353-
)
218+
cmd_for_log = self._mask_command(cmd=command, log_mask_re=log_mask_re)
354219

355220
self.logger.log(
356221
level=logging.INFO if verbose else logging.DEBUG,
@@ -395,9 +260,4 @@ def execute_async(
395260
process.kill()
396261
raise
397262

398-
if open_stdout:
399-
set_nonblocking_pipe(process.stdout)
400-
if open_stderr:
401-
set_nonblocking_pipe(process.stderr)
402-
403263
return process, None, process.stderr, process.stdout

exec_helpers/subprocess_runner.pyi

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,6 @@ from exec_helpers import exec_result, api
99
logger: logging.Logger
1010
devnull: typing.IO
1111

12-
_win: bool
13-
_posix: bool
14-
1512
class SingletonMeta(type):
1613
_instances: typing.Dict[typing.Type, typing.Any] = ...
1714
_lock: threading.RLock = ...
@@ -26,12 +23,6 @@ class SingletonMeta(type):
2623
**kwargs: typing.Dict
2724
) -> collections.OrderedDict: ...
2825

29-
30-
def set_nonblocking_pipe(pipe: typing.Any) -> None: ...
31-
32-
def set_blocking_pipe(pipe: typing.Any) -> None: ...
33-
34-
3526
class Subprocess(api.ExecHelper, metaclass=SingletonMeta):
3627
def __init__(self, log_mask_re: typing.Optional[str] = ...) -> None: ...
3728

0 commit comments

Comments
 (0)