@@ -36,15 +36,13 @@ def __init__(self, *, listening_addr: str='127.0.0.1',
3636 self .listen_task = None
3737
3838 async def _wait (self , work ):
39- work_task = asyncio .ensure_future (work , loop = self .loop )
40- stop_event_task = asyncio .ensure_future (self .stop_event .wait (),
41- loop = self .loop )
39+ work_task = asyncio .ensure_future (work )
40+ stop_event_task = asyncio .ensure_future (self .stop_event .wait ())
4241
4342 try :
4443 await asyncio .wait (
4544 [work_task , stop_event_task ],
46- return_when = asyncio .FIRST_COMPLETED ,
47- loop = self .loop )
45+ return_when = asyncio .FIRST_COMPLETED )
4846
4947 if self .stop_event .is_set ():
5048 raise StopServer ()
@@ -58,7 +56,8 @@ async def _wait(self, work):
5856
5957 def start (self ):
6058 started = threading .Event ()
61- self .thread = threading .Thread (target = self ._start , args = (started ,))
59+ self .thread = threading .Thread (
60+ target = self ._start_thread , args = (started ,))
6261 self .thread .start ()
6362 if not started .wait (timeout = 2 ):
6463 raise RuntimeError ('fuzzer proxy failed to start' )
@@ -70,13 +69,14 @@ def stop(self):
7069 def _stop (self ):
7170 self .stop_event .set ()
7271
73- def _start (self , started_event ):
72+ def _start_thread (self , started_event ):
7473 self .loop = asyncio .new_event_loop ()
74+ asyncio .set_event_loop (self .loop )
7575
76- self .connectivity = asyncio .Event (loop = self . loop )
76+ self .connectivity = asyncio .Event ()
7777 self .connectivity .set ()
78- self .connectivity_loss = asyncio .Event (loop = self . loop )
79- self .stop_event = asyncio .Event (loop = self . loop )
78+ self .connectivity_loss = asyncio .Event ()
79+ self .stop_event = asyncio .Event ()
8080
8181 if self .listening_port is None :
8282 self .listening_port = cluster .find_available_port ()
@@ -92,15 +92,15 @@ def _start(self, started_event):
9292 self .loop .close ()
9393
9494 async def _main (self , started_event ):
95- self .listen_task = asyncio .ensure_future (self .listen (), loop = self . loop )
95+ self .listen_task = asyncio .ensure_future (self .listen ())
9696 # Notify the main thread that we are ready to go.
9797 started_event .set ()
9898 try :
9999 await self .listen_task
100100 finally :
101101 for c in list (self .connections ):
102102 c .close ()
103- await asyncio .sleep (0.01 , loop = self . loop )
103+ await asyncio .sleep (0.01 )
104104 if hasattr (self .loop , 'remove_reader' ):
105105 self .loop .remove_reader (self .sock .fileno ())
106106 self .sock .close ()
@@ -176,15 +176,15 @@ def close(self):
176176
177177 async def handle (self ):
178178 self .proxy_to_backend_task = asyncio .ensure_future (
179- self .proxy_to_backend (), loop = self . loop )
179+ self .proxy_to_backend ())
180180
181181 self .proxy_from_backend_task = asyncio .ensure_future (
182- self .proxy_from_backend (), loop = self . loop )
182+ self .proxy_from_backend ())
183183
184184 try :
185185 await asyncio .wait (
186186 [self .proxy_to_backend_task , self .proxy_from_backend_task ],
187- loop = self . loop , return_when = asyncio .FIRST_COMPLETED )
187+ return_when = asyncio .FIRST_COMPLETED )
188188
189189 finally :
190190 # Asyncio fails to properly remove the readers and writers
@@ -201,17 +201,14 @@ async def handle(self):
201201
202202 async def _read (self , sock , n ):
203203 read_task = asyncio .ensure_future (
204- self .loop .sock_recv (sock , n ),
205- loop = self .loop )
204+ self .loop .sock_recv (sock , n ))
206205 conn_event_task = asyncio .ensure_future (
207- self .connectivity_loss .wait (),
208- loop = self .loop )
206+ self .connectivity_loss .wait ())
209207
210208 try :
211209 await asyncio .wait (
212210 [read_task , conn_event_task ],
213- return_when = asyncio .FIRST_COMPLETED ,
214- loop = self .loop )
211+ return_when = asyncio .FIRST_COMPLETED )
215212
216213 if self .connectivity_loss .is_set ():
217214 return None
@@ -225,15 +222,14 @@ async def _read(self, sock, n):
225222
226223 async def _write (self , sock , data ):
227224 write_task = asyncio .ensure_future (
228- self .loop .sock_sendall (sock , data ), loop = self . loop )
225+ self .loop .sock_sendall (sock , data ))
229226 conn_event_task = asyncio .ensure_future (
230- self .connectivity_loss .wait (), loop = self . loop )
227+ self .connectivity_loss .wait ())
231228
232229 try :
233230 await asyncio .wait (
234231 [write_task , conn_event_task ],
235- return_when = asyncio .FIRST_COMPLETED ,
236- loop = self .loop )
232+ return_when = asyncio .FIRST_COMPLETED )
237233
238234 if self .connectivity_loss .is_set ():
239235 return None
0 commit comments