44import logging
55from typing import (
66 Any ,
7+ Awaitable ,
78 Callable ,
89 List ,
910 Optional ,
1011 cast ,
1112)
13+ from weakref import WeakSet
1214
1315from eth .tools .logging import TraceLogger
1416
@@ -30,6 +32,8 @@ def __init__(self) -> None:
3032class BaseService (ABC , CancellableMixin ):
3133 logger : TraceLogger = None
3234 _child_services : List ['BaseService' ]
35+ # Use a WeakSet so that we don't have to bother updating it when tasks finish.
36+ _tasks : 'WeakSet[asyncio.Future[Any]]'
3337 _finished_callbacks : List [Callable [['BaseService' ], None ]]
3438 # Number of seconds cancel() will wait for run() to finish.
3539 _wait_until_finished_timeout = 5
@@ -45,6 +49,7 @@ def __init__(self,
4549 self .events = ServiceEvents ()
4650 self ._run_lock = asyncio .Lock ()
4751 self ._child_services = []
52+ self ._tasks = WeakSet ()
4853 self ._finished_callbacks = []
4954
5055 self ._loop = loop
@@ -114,12 +119,26 @@ async def run(
114119 def add_finished_callback (self , finished_callback : Callable [['BaseService' ], None ]) -> None :
115120 self ._finished_callbacks .append (finished_callback )
116121
117- def run_child_service (self , child_service : 'BaseService' ) -> 'asyncio.Future[Any]' :
122+ def run_task (self , awaitable : Awaitable [Any ]) -> None :
123+ """Run the given awaitable in the background.
124+
125+ The awaitable should return whenever this service's cancel token is triggered.
126+
127+ If it raises OperationCancelled, that is caught and ignored.
128+ """
129+ async def f () -> None :
130+ try :
131+ await awaitable
132+ except OperationCancelled :
133+ pass
134+ self ._tasks .add (asyncio .ensure_future (f ()))
135+
136+ def run_child_service (self , child_service : 'BaseService' ) -> None :
118137 """
119138 Run a child service and keep a reference to it to be considered during the cleanup.
120139 """
121140 self ._child_services .append (child_service )
122- return asyncio . ensure_future (child_service .run ())
141+ self . run_task (child_service .run ())
123142
124143 async def _run_in_executor (self , callback : Callable [..., Any ], * args : Any ) -> Any :
125144 loop = self .get_event_loop ()
@@ -136,6 +155,7 @@ async def cleanup(self) -> None:
136155 await asyncio .gather (* [
137156 child_service .events .cleaned_up .wait ()
138157 for child_service in self ._child_services ],
158+ * [task for task in self ._tasks ],
139159 self ._cleanup ()
140160 )
141161 self .events .cleaned_up .set ()
@@ -155,10 +175,20 @@ async def cancel(self) -> None:
155175 await asyncio .wait_for (
156176 self .events .cleaned_up .wait (), timeout = self ._wait_until_finished_timeout )
157177 except asyncio .futures .TimeoutError :
158- self .logger .info ("Timed out waiting for %s to finish its cleanup, exiting anyway" , self )
178+ self .logger .info (
179+ "Timed out waiting for %s to finish its cleanup, forcibly cancelling pending "
180+ "tasks and exiting anyway" , self )
181+ self ._forcibly_cancel_all_tasks ()
182+ # Sleep a bit because the Future.cancel() method just schedules the callbacks, so we
183+ # need to give the event loop a chance to actually call them.
184+ await asyncio .sleep (0.5 )
159185 else :
160186 self .logger .debug ("%s finished cleanly" , self )
161187
188+ def _forcibly_cancel_all_tasks (self ) -> None :
189+ for task in self ._tasks :
190+ task .cancel ()
191+
162192 @property
163193 def is_running (self ) -> bool :
164194 return self ._run_lock .locked ()
@@ -187,13 +217,12 @@ async def _run(self) -> None:
187217 """
188218 raise NotImplementedError ()
189219
190- @abstractmethod
191220 async def _cleanup (self ) -> None :
192221 """Clean up any resources held by this service.
193222
194223 Called after the service's _run() method returns.
195224 """
196- raise NotImplementedError ()
225+ pass
197226
198227
199228def service_timeout (timeout : int ) -> Callable [..., Any ]:
0 commit comments