|
5 | 5 | import atexit |
6 | 6 | import time |
7 | 7 | from concurrent.futures import Future |
8 | | -from threading import Event, Thread |
| 8 | +from functools import partial |
| 9 | +from threading import Thread |
9 | 10 | from typing import Any, Dict, List, Optional |
10 | 11 |
|
11 | 12 | import zmq |
@@ -54,17 +55,22 @@ def __init__( |
54 | 55 | self.socket = socket |
55 | 56 | self.session = session |
56 | 57 | self.ioloop = loop |
57 | | - evt = Event() |
| 58 | + f: Future = Future() |
58 | 59 |
|
59 | 60 | def setup_stream(): |
60 | | - assert self.socket is not None |
61 | | - self.stream = zmqstream.ZMQStream(self.socket, self.ioloop) |
62 | | - self.stream.on_recv(self._handle_recv) |
63 | | - evt.set() |
| 61 | + try: |
| 62 | + assert self.socket is not None |
| 63 | + self.stream = zmqstream.ZMQStream(self.socket, self.ioloop) |
| 64 | + self.stream.on_recv(self._handle_recv) |
| 65 | + except Exception as e: |
| 66 | + f.set_exception(e) |
| 67 | + else: |
| 68 | + f.set_result(None) |
64 | 69 |
|
65 | 70 | assert self.ioloop is not None |
66 | 71 | self.ioloop.add_callback(setup_stream) |
67 | | - evt.wait() |
| 72 | + # don't wait forever, raise any errors |
| 73 | + f.result(timeout=10) |
68 | 74 |
|
69 | 75 | _is_alive = False |
70 | 76 |
|
@@ -179,13 +185,31 @@ def flush(self, timeout: float = 1.0) -> None: |
179 | 185 | """ |
180 | 186 | # We do the IOLoop callback process twice to ensure that the IOLoop |
181 | 187 | # gets to perform at least one full poll. |
182 | | - stop_time = time.time() + timeout |
| 188 | + stop_time = time.monotonic() + timeout |
183 | 189 | assert self.ioloop is not None |
| 190 | + if self.stream is None or self.stream.closed(): |
| 191 | + # don't bother scheduling flush on a thread if we're closed |
| 192 | + _msg = "Attempt to flush closed stream" |
| 193 | + raise OSError(_msg) |
| 194 | + |
| 195 | + def flush(f): |
| 196 | + try: |
| 197 | + self._flush() |
| 198 | + except Exception as e: |
| 199 | + f.set_exception(e) |
| 200 | + else: |
| 201 | + f.set_result(None) |
| 202 | + |
184 | 203 | for _ in range(2): |
185 | | - self._flushed = False |
186 | | - self.ioloop.add_callback(self._flush) |
187 | | - while not self._flushed and time.time() < stop_time: |
188 | | - time.sleep(0.01) |
| 204 | + f: Future = Future() |
| 205 | + self.ioloop.add_callback(partial(flush, f)) |
| 206 | + # wait for async flush, re-raise any errors |
| 207 | + timeout = max(stop_time - time.monotonic(), 0) |
| 208 | + try: |
| 209 | + f.result(max(stop_time - time.monotonic(), 0)) |
| 210 | + except TimeoutError: |
| 211 | + # flush with a timeout means stop waiting, not raise |
| 212 | + return |
189 | 213 |
|
190 | 214 | def _flush(self) -> None: |
191 | 215 | """Callback for :method:`self.flush`.""" |
@@ -219,24 +243,32 @@ def start(self) -> None: |
219 | 243 | Don't return until self.ioloop is defined, |
220 | 244 | which is created in the thread |
221 | 245 | """ |
222 | | - self._start_event = Event() |
| 246 | + self._start_future: Future = Future() |
223 | 247 | Thread.start(self) |
224 | | - self._start_event.wait() |
| 248 | + # wait for start, re-raise any errors |
| 249 | + self._start_future.result(timeout=10) |
225 | 250 |
|
226 | 251 | def run(self) -> None: |
227 | 252 | """Run my loop, ignoring EINTR events in the poller""" |
228 | | - loop = asyncio.new_event_loop() |
229 | | - asyncio.set_event_loop(loop) |
| 253 | + try: |
| 254 | + loop = asyncio.new_event_loop() |
| 255 | + asyncio.set_event_loop(loop) |
| 256 | + |
| 257 | + async def assign_ioloop(): |
| 258 | + self.ioloop = IOLoop.current() |
| 259 | + |
| 260 | + loop.run_until_complete(assign_ioloop()) |
| 261 | + except Exception as e: |
| 262 | + self._start_future.set_exception(e) |
| 263 | + else: |
| 264 | + self._start_future.set_result(None) |
| 265 | + |
230 | 266 | loop.run_until_complete(self._async_run()) |
231 | 267 |
|
232 | 268 | async def _async_run(self): |
233 | | - self.ioloop = IOLoop.current() |
234 | | - # signal that self.ioloop is defined |
235 | | - self._start_event.set() |
236 | | - while True: |
| 269 | + """Run forever (until self._exiting is set)""" |
| 270 | + while not self._exiting: |
237 | 271 | await asyncio.sleep(1) |
238 | | - if self._exiting: |
239 | | - break |
240 | 272 |
|
241 | 273 | def stop(self) -> None: |
242 | 274 | """Stop the channel's event loop and join its thread. |
|
0 commit comments