2525 loop_cls = asyncio .ProactorEventLoop # type: ignore[attr-defined,misc]
2626
2727
28+ # pylint: disable=logging-fstring-interpolation
29+
2830class AsyncioEventLoop (BaseEventLoop , asyncio .Protocol ,
2931 asyncio .SubprocessProtocol ):
3032 """`BaseEventLoop` subclass that uses `asyncio` as a backend."""
@@ -42,6 +44,7 @@ def connection_made(self, transport):
4244
4345 def connection_lost (self , exc ):
4446 """Used to signal `asyncio.Protocol` of a lost connection."""
47+ debug (f"connection_lost: exc = { exc } " )
4548 self ._on_error (exc .args [0 ] if exc else 'EOF' )
4649
4750 def data_received (self , data : bytes ) -> None :
@@ -71,6 +74,7 @@ def pipe_data_received(self, fd, data):
7174
7275 def process_exited (self ) -> None :
7376 """Used to signal `asyncio.SubprocessProtocol` when the child exits."""
77+ debug ("process_exited" )
7478 self ._on_error ('EOF' )
7579
7680 def _init (self ) -> None :
@@ -81,50 +85,61 @@ def _init(self) -> None:
8185 self ._child_watcher = None
8286
8387 def _connect_tcp (self , address : str , port : int ) -> None :
84- coroutine = self ._loop .create_connection (self ._fact , address , port )
85- self ._loop .run_until_complete (coroutine )
88+ async def connect_tcp ():
89+ await self ._loop .create_connection (self ._fact , address , port )
90+ debug (f"tcp connection successful: { address } :{ port } " )
91+
92+ self ._loop .run_until_complete (connect_tcp ())
8693
8794 def _connect_socket (self , path : str ) -> None :
88- if os .name == 'nt' :
89- coroutine = self ._loop .create_pipe_connection ( # type: ignore[attr-defined]
90- self ._fact , path
91- )
92- else :
93- coroutine = self ._loop .create_unix_connection (self ._fact , path )
94- self ._loop .run_until_complete (coroutine )
95+ async def connect_socket ():
96+ if os .name == 'nt' :
97+ transport , _ = await self ._loop .create_pipe_connection (self ._fact , path )
98+ else :
99+ transport , _ = await self ._loop .create_unix_connection (self ._fact , path )
100+ debug ("socket connection successful: %s" , transport )
101+
102+ self ._loop .run_until_complete (connect_socket ())
95103
96104 def _connect_stdio (self ) -> None :
97- if os .name == 'nt' :
98- pipe : Any = PipeHandle (
99- msvcrt .get_osfhandle (sys .stdin .fileno ()) # type: ignore[attr-defined]
100- )
101- else :
102- pipe = sys .stdin
103- coroutine = self ._loop .connect_read_pipe (self ._fact , pipe )
104- self ._loop .run_until_complete (coroutine )
105- debug ("native stdin connection successful" )
105+ async def connect_stdin ():
106+ if os .name == 'nt' :
107+ pipe = PipeHandle (msvcrt .get_osfhandle (sys .stdin .fileno ()))
108+ else :
109+ pipe = sys .stdin
110+ await self ._loop .connect_read_pipe (self ._fact , pipe )
111+ debug ("native stdin connection successful" )
112+ self ._loop .run_until_complete (connect_stdin ())
106113
107114 # Make sure subprocesses don't clobber stdout,
108115 # send the output to stderr instead.
109116 rename_stdout = os .dup (sys .stdout .fileno ())
110117 os .dup2 (sys .stderr .fileno (), sys .stdout .fileno ())
111118
112- if os .name == 'nt' :
113- pipe = PipeHandle (
114- msvcrt .get_osfhandle (rename_stdout ) # type: ignore[attr-defined]
115- )
116- else :
117- pipe = os .fdopen (rename_stdout , 'wb' )
118- coroutine = self ._loop .connect_write_pipe (self ._fact , pipe ) # type: ignore[assignment]
119- self ._loop .run_until_complete (coroutine )
120- debug ("native stdout connection successful" )
119+ async def connect_stdout ():
120+ if os .name == 'nt' :
121+ pipe = PipeHandle (msvcrt .get_osfhandle (rename_stdout ))
122+ else :
123+ pipe = os .fdopen (rename_stdout , 'wb' )
124+
125+ await self ._loop .connect_write_pipe (self ._fact , pipe )
126+ debug ("native stdout connection successful" )
127+
128+ self ._loop .run_until_complete (connect_stdout ())
121129
122130 def _connect_child (self , argv : List [str ]) -> None :
123131 if os .name != 'nt' :
124- self ._child_watcher = asyncio .get_child_watcher ()
125- self ._child_watcher .attach_loop (self ._loop )
126- coroutine = self ._loop .subprocess_exec (self ._fact , * argv )
127- self ._loop .run_until_complete (coroutine )
132+ # see #238, #241
133+ _child_watcher = asyncio .get_child_watcher ()
134+ _child_watcher .attach_loop (self ._loop )
135+
136+ async def create_subprocess ():
137+ transport : asyncio .SubprocessTransport
138+ transport , protocol = await self ._loop .subprocess_exec (self ._fact , * argv )
139+ pid = transport .get_pid ()
140+ debug ("child subprocess_exec successful, PID = %s" , pid )
141+
142+ self ._loop .run_until_complete (create_subprocess ())
128143
129144 def _start_reading (self ) -> None :
130145 pass
0 commit comments