11from inspect import isawaitable
2-
32from asyncio import ensure_future , wait , shield
43from websockets import ConnectionClosed
54from graphql .execution .executors .asyncio import AsyncioExecutor
65
7- from .base import ConnectionClosedException , BaseConnectionContext , BaseSubscriptionServer
6+ from .base import (
7+ ConnectionClosedException , BaseConnectionContext , BaseSubscriptionServer )
88from .observable_aiter import setup_observable_extension
99
1010from .constants import (
11- GQL_CONNECTION_ACK ,
12- GQL_CONNECTION_ERROR ,
13- GQL_COMPLETE
14- )
11+ GQL_CONNECTION_ACK , GQL_CONNECTION_ERROR , GQL_COMPLETE )
1512
1613setup_observable_extension ()
1714
@@ -45,7 +42,8 @@ def __init__(self, schema, keep_alive=True, loop=None):
4542 def get_graphql_params (self , * args , ** kwargs ):
4643 params = super (WsLibSubscriptionServer ,
4744 self ).get_graphql_params (* args , ** kwargs )
48- return dict (params , return_promise = True , executor = AsyncioExecutor (loop = self .loop ))
45+ return dict (params , return_promise = True ,
46+ executor = AsyncioExecutor (loop = self .loop ))
4947
5048 async def _handle (self , ws , request_context ):
5149 connection_context = WsLibConnectionContext (ws , request_context )
@@ -87,9 +85,11 @@ async def on_connect(self, connection_context, payload):
8785 async def on_connection_init (self , connection_context , op_id , payload ):
8886 try :
8987 await self .on_connect (connection_context , payload )
90- await self .send_message (connection_context , op_type = GQL_CONNECTION_ACK )
88+ await self .send_message (
89+ connection_context , op_type = GQL_CONNECTION_ACK )
9190 except Exception as e :
92- await self .send_error (connection_context , op_id , e , GQL_CONNECTION_ERROR )
91+ await self .send_error (
92+ connection_context , op_id , e , GQL_CONNECTION_ERROR )
9393 await connection_context .close (1011 )
9494
9595 async def on_start (self , connection_context , op_id , params ):
@@ -100,14 +100,16 @@ async def on_start(self, connection_context, op_id, params):
100100 execution_result = await execution_result
101101
102102 if not hasattr (execution_result , '__aiter__' ):
103- await self .send_execution_result (connection_context , op_id , execution_result )
103+ await self .send_execution_result (
104+ connection_context , op_id , execution_result )
104105 else :
105106 iterator = await execution_result .__aiter__ ()
106107 connection_context .register_operation (op_id , iterator )
107108 async for single_result in iterator :
108109 if not connection_context .has_operation (op_id ):
109110 break
110- await self .send_execution_result (connection_context , op_id , single_result )
111+ await self .send_execution_result (
112+ connection_context , op_id , single_result )
111113 await self .send_message (connection_context , op_id , GQL_COMPLETE )
112114
113115 async def on_stop (self , connection_context , op_id ):
0 commit comments