|
4 | 4 | import asyncio |
5 | 5 | import atexit |
6 | 6 | import time |
| 7 | +from concurrent.futures import Future |
7 | 8 | from threading import Event, Thread |
8 | 9 | from typing import Any, Dict, List, Optional |
9 | 10 |
|
10 | 11 | import zmq |
11 | 12 | from tornado.ioloop import IOLoop |
12 | 13 | from traitlets import Instance, Type |
| 14 | +from traitlets.log import get_logger |
13 | 15 | from zmq.eventloop import zmqstream |
14 | 16 |
|
15 | 17 | from .channels import HBChannel |
@@ -45,7 +47,7 @@ def __init__( |
45 | 47 | session : :class:`session.Session` |
46 | 48 | The session to use. |
47 | 49 | loop |
48 | | - A pyzmq ioloop to connect the socket to using a ZMQStream |
| 50 | + A tornado ioloop to connect the socket to using a ZMQStream |
49 | 51 | """ |
50 | 52 | super().__init__() |
51 | 53 |
|
@@ -79,7 +81,30 @@ def stop(self) -> None: |
79 | 81 | self._is_alive = False |
80 | 82 |
|
81 | 83 | def close(self) -> None: |
82 | | - """ "Close the channel.""" |
| 84 | + """Close the channel.""" |
| 85 | + if self.stream is not None and self.ioloop is not None: |
| 86 | + # c.f.Future for threadsafe results |
| 87 | + f: Future = Future() |
| 88 | + |
| 89 | + def close_stream(): |
| 90 | + try: |
| 91 | + if self.stream is not None: |
| 92 | + self.stream.close(linger=0) |
| 93 | + self.stream = None |
| 94 | + except Exception as e: |
| 95 | + f.set_exception(e) |
| 96 | + else: |
| 97 | + f.set_result(None) |
| 98 | + |
| 99 | + self.ioloop.add_callback(close_stream) |
| 100 | + # wait for result |
| 101 | + try: |
| 102 | + f.result(timeout=5) |
| 103 | + except Exception as e: |
| 104 | + log = get_logger() |
| 105 | + msg = f"Error closing stream {self.stream}: {e}" |
| 106 | + log.warning(msg, RuntimeWarning, stacklevel=2) |
| 107 | + |
83 | 108 | if self.socket is not None: |
84 | 109 | try: |
85 | 110 | self.socket.close(linger=0) |
|
0 commit comments