|
8 | 8 | from threading import Event |
9 | 9 | from threading import Thread |
10 | 10 | from typing import Any |
11 | | -from typing import Awaitable |
12 | 11 | from typing import Dict |
13 | 12 | from typing import List |
14 | 13 | from typing import Optional |
15 | | -from typing import Union |
16 | 14 |
|
17 | 15 | import zmq |
18 | 16 | from traitlets import Instance |
|
30 | 28 | # during garbage collection of threads at exit |
31 | 29 |
|
32 | 30 |
|
33 | | -async def get_msg(msg: Awaitable) -> Union[List[bytes], List[zmq.Message]]: |
34 | | - return await msg |
35 | | - |
36 | | - |
37 | 31 | class ThreadedZMQSocketChannel(object): |
38 | 32 | """A ZMQ socket invoking a callback in the ioloop""" |
39 | 33 |
|
@@ -68,6 +62,7 @@ def __init__( |
68 | 62 | evt = Event() |
69 | 63 |
|
70 | 64 | def setup_stream(): |
| 65 | + assert self.socket is not None |
71 | 66 | self.stream = zmqstream.ZMQStream(self.socket, self.ioloop) |
72 | 67 | self.stream.on_recv(self._handle_recv) |
73 | 68 | evt.set() |
@@ -113,13 +108,11 @@ def thread_send(): |
113 | 108 | assert self.ioloop is not None |
114 | 109 | self.ioloop.add_callback(thread_send) |
115 | 110 |
|
116 | | - def _handle_recv(self, future_msg: Awaitable) -> None: |
| 111 | + def _handle_recv(self, msg_list: List[bytes]) -> None: |
117 | 112 | """Callback for stream.on_recv. |
118 | 113 |
|
119 | 114 | Unpacks message, and calls handlers with it. |
120 | 115 | """ |
121 | | - assert self.ioloop is not None |
122 | | - msg_list = self.ioloop._asyncio_event_loop.run_until_complete(get_msg(future_msg)) |
123 | 116 | assert self.session is not None |
124 | 117 | ident, smsg = self.session.feed_identities(msg_list) |
125 | 118 | msg = self.session.deserialize(smsg) |
|
0 commit comments