11from typing import (
2+ AsyncIterator ,
23 Awaitable ,
34 TypeVar ,
45)
56
67from cancel_token import CancelToken
78
9+ TReturn = TypeVar ('TReturn' )
10+
811
912class CancellableMixin :
1013 cancel_token : CancelToken = None
1114
12- _TReturn = TypeVar ('_TReturn' )
13-
1415 async def wait (self ,
15- awaitable : Awaitable [_TReturn ],
16+ awaitable : Awaitable [TReturn ],
1617 token : CancelToken = None ,
17- timeout : float = None ) -> _TReturn :
18+ timeout : float = None ) -> TReturn :
1819 """See wait_first()"""
1920 return await self .wait_first (awaitable , token = token , timeout = timeout )
2021
2122 async def wait_first (self ,
22- * awaitables : Awaitable [_TReturn ],
23+ * awaitables : Awaitable [TReturn ],
2324 token : CancelToken = None ,
24- timeout : float = None ) -> _TReturn :
25+ timeout : float = None ) -> TReturn :
2526 """
2627 Wait for the first awaitable to complete, unless we timeout or the token chain is triggered.
2728
@@ -39,3 +40,30 @@ async def wait_first(self,
3940 else :
4041 token_chain = token .chain (self .cancel_token )
4142 return await token_chain .cancellable_wait (* awaitables , timeout = timeout )
43+
44+ async def wait_iter (
45+ self ,
46+ aiterable : AsyncIterator [TReturn ],
47+ token : CancelToken = None ,
48+ timeout : float = None ) -> AsyncIterator [TReturn ]:
49+ """
50+ Iterate through an async iterator, raising the OperationCancelled exception if the token is
51+ triggered. For example:
52+
53+ ::
54+
55+ async for val in self.wait_iter(my_async_iterator()):
56+ do_stuff(val)
57+
58+ See :meth:`CancellableMixin.wait_first` for using arguments ``token`` and ``timeout``
59+ """
60+ aiter = aiterable .__aiter__ ()
61+ while True :
62+ try :
63+ yield await self .wait (
64+ aiter .__anext__ (),
65+ token = token ,
66+ timeout = timeout ,
67+ )
68+ except StopAsyncIteration :
69+ break
0 commit comments