|
7 | 7 | # Distributed under the terms of the Modified BSD License. |
8 | 8 | from __future__ import print_function |
9 | 9 |
|
| 10 | +import inspect |
10 | 11 | import json |
11 | 12 | import os |
12 | 13 | import sys |
|
20 | 21 | from jupyter_client.jsonutil import parse_date |
21 | 22 | from jupyter_client.session import Session |
22 | 23 | from tornado import ioloop |
23 | | -from tornado.gen import coroutine |
24 | | -from tornado.gen import maybe_future |
25 | 24 | from traitlets import Any |
26 | 25 | from traitlets import Bytes |
27 | 26 | from traitlets import Dict |
@@ -319,8 +318,7 @@ def dispatch_monitor_traffic(self, msg): |
319 | 318 | self.log.error("Unrecognized monitor topic: %r", switch) |
320 | 319 |
|
321 | 320 | @util.log_errors |
322 | | - @coroutine |
323 | | - def dispatch_query(self, msg): |
| 321 | + async def dispatch_query(self, msg): |
324 | 322 | """Route registration requests and queries from clients.""" |
325 | 323 | try: |
326 | 324 | idents, msg = self.session.feed_identities(msg) |
@@ -357,8 +355,8 @@ def dispatch_query(self, msg): |
357 | 355 |
|
358 | 356 | try: |
359 | 357 | f = handler(idents, msg) |
360 | | - if f: |
361 | | - yield maybe_future(f) |
| 358 | + if f and inspect.isawaitable(f): |
| 359 | + await f |
362 | 360 | except Exception: |
363 | 361 | content = error.wrap_exception() |
364 | 362 | self.log.error("Error handling request: %r", msg_type, exc_info=True) |
@@ -1450,16 +1448,15 @@ def db_query(self, client_id, msg): |
1450 | 1448 | buffers=buffers, |
1451 | 1449 | ) |
1452 | 1450 |
|
1453 | | - @coroutine |
1454 | | - def become_dask(self, client_id, msg): |
| 1451 | + async def become_dask(self, client_id, msg): |
1455 | 1452 | """Start a dask.distributed Scheduler.""" |
1456 | 1453 | if self.distributed_scheduler is None: |
1457 | 1454 | kwargs = msg['content'].get('scheduler_args', {}) |
1458 | 1455 | self.log.info("Becoming dask.distributed scheduler: %s", kwargs) |
1459 | 1456 | from distributed import Scheduler |
1460 | 1457 |
|
1461 | 1458 | self.distributed_scheduler = scheduler = Scheduler(**kwargs) |
1462 | | - yield scheduler.start() |
| 1459 | + await scheduler.start() |
1463 | 1460 | content = { |
1464 | 1461 | 'status': 'ok', |
1465 | 1462 | 'ip': self.distributed_scheduler.ip, |
|
0 commit comments