22
33import asyncio
44import concurrent .futures
5- import contextlib
65import contextvars
76import functools
87import inspect
98import threading
109import time
11- import traceback
1210import warnings
1311import weakref
1412from collections import deque
@@ -285,9 +283,6 @@ def check_canceled_sync() -> bool:
285283 return True
286284
287285
288- # __threadpool_executor = ThreadPoolExecutor(thread_name_prefix="sub_asyncio")
289-
290-
291286def run_in_thread (func : Callable [..., _T ], / , * args : Any , ** kwargs : Any ) -> asyncio .Future [_T ]:
292287 global __tread_pool_executor
293288
@@ -296,12 +291,6 @@ def run_in_thread(func: Callable[..., _T], /, *args: Any, **kwargs: Any) -> asyn
296291 ctx = contextvars .copy_context ()
297292 func_call = functools .partial (ctx .run , func , * args , ** kwargs )
298293
299- # return cast(
300- # "asyncio.Future[_T]",
301- # # loop.run_in_executor(__threadpool_executor, cast(Callable[..., _T], func_call)),
302- # loop.run_in_executor(None, cast(Callable[..., _T], func_call)),
303- # )
304-
305294 executor = ThreadPoolExecutor (max_workers = 1 , thread_name_prefix = "sub_asyncio" )
306295 try :
307296 return cast (
@@ -341,33 +330,9 @@ async def create_inner_task(coro: Callable[..., Coroutine[Any, Any, _T]], *args:
341330 return await inner_task
342331
343332 def run (coro : Callable [..., Coroutine [Any , Any , _T ]], * args : Any , ** kwargs : Any ) -> _T :
344-
345- old_name = threading .current_thread ().name
346333 threading .current_thread ().name = coro .__qualname__
347- try :
348- return asyncio .run (create_inner_task (coro , * args , ** kwargs ))
349- finally :
350- threading .current_thread ().name = old_name
351-
352- # loop = asyncio.new_event_loop()
353-
354- # try:
355- # asyncio.set_event_loop(loop)
356-
357- # t = loop.create_task(create_inner_task(coro, *args, **kwargs), name=coro.__qualname__)
358-
359- # return loop.run_until_complete(t)
360- # finally:
361- # try:
362- # running_tasks = asyncio.all_tasks(loop)
363- # if running_tasks:
364- # loop.run_until_complete(asyncio.gather(*running_tasks, return_exceptions=True))
365334
366- # loop.run_until_complete(loop.shutdown_asyncgens())
367- # finally:
368- # asyncio.set_event_loop(None)
369- # loop.close()
370- # threading.current_thread().setName(old_name)
335+ return asyncio .run (create_inner_task (coro , * args , ** kwargs ))
371336
372337 cti = get_current_future_info ()
373338 result = run_in_thread (run , coro , * args , ** kwargs )
@@ -391,58 +356,6 @@ def done(task: asyncio.Future[_T]) -> None:
391356 return result
392357
393358
394- @contextlib .asynccontextmanager
395- async def async_lock (lock : threading .RLock ) -> AsyncGenerator [None , None ]:
396- import time
397-
398- start_time = time .monotonic ()
399- locked = lock .acquire (blocking = False )
400- while not locked :
401- if time .monotonic () - start_time >= 1800 :
402- raise TimeoutError ("Timeout waiting for lock" )
403-
404- await asyncio .sleep (0.001 )
405-
406- locked = lock .acquire (blocking = False )
407- try :
408- yield
409- finally :
410- if locked :
411- lock .release ()
412- # with lock:
413- # yield
414-
415-
416- class NewEvent :
417- def __init__ (self , value : bool = False ) -> None :
418- self ._event = threading .Event ()
419- if value :
420- self ._event .set ()
421-
422- def is_set (self ) -> bool :
423- return self ._event .is_set ()
424-
425- def set (self ) -> None :
426- self ._event .set ()
427-
428- def clear (self ) -> None :
429- self ._event .clear ()
430-
431- async def wait (self , timeout : Optional [float ] = None ) -> bool :
432- if timeout is not None and timeout > 0 :
433- start = time .monotonic ()
434- else :
435- start = None
436-
437- while not (result := self .is_set ()):
438- if start is not None and timeout is not None and (time .monotonic () - start ) > timeout :
439- break
440-
441- await asyncio .sleep (0 )
442-
443- return result
444-
445-
446359class Event :
447360 """Thread safe version of an async Event"""
448361
@@ -516,208 +429,6 @@ async def wait(self, timeout: Optional[float] = None) -> bool:
516429 return False
517430
518431
519- class Semaphore :
520- """Thread safe version of a Semaphore"""
521-
522- def __init__ (self , value : int = 1 ) -> None :
523- if value < 0 :
524- raise ValueError ("Semaphore initial value must be >= 0" )
525- self ._value = value
526- self ._waiters : Deque [asyncio .Future [Any ]] = deque ()
527-
528- self ._lock = threading .RLock ()
529-
530- def __repr__ (self ) -> str :
531- res = super ().__repr__ ()
532- extra = "locked" if self .locked () else f"unlocked, value:{ self ._value } "
533- if self ._waiters :
534- extra = f"{ extra } , waiters:{ len (self ._waiters )} "
535- return f"<{ res [1 :- 1 ]} [{ extra } ]>"
536-
537- async def _wake_up_next (self ) -> None :
538- async with async_lock (self ._lock ):
539- while self ._waiters :
540- waiter = self ._waiters .popleft ()
541-
542- if not waiter .done ():
543- if waiter .get_loop () == asyncio .get_running_loop ():
544- if not waiter .done ():
545- waiter .set_result (True )
546- else :
547- if waiter .get_loop ().is_running ():
548-
549- def set_result (w : asyncio .Future [Any ], ev : threading .Event ) -> None :
550- try :
551- if w .get_loop ().is_running () and not w .done ():
552- w .set_result (True )
553- finally :
554- ev .set ()
555-
556- if not waiter .done ():
557- done = threading .Event ()
558-
559- waiter .get_loop ().call_soon_threadsafe (set_result , waiter , done )
560-
561- start = time .monotonic ()
562- while not done .is_set ():
563-
564- if time .monotonic () - start > 120 :
565- raise TimeoutError ("Can't set future result." )
566-
567- await asyncio .sleep (0.001 )
568-
569- def locked (self ) -> bool :
570- with self ._lock :
571- return self ._value == 0
572-
573- async def acquire (self , timeout : Optional [float ] = None ) -> bool :
574- while True :
575- async with async_lock (self ._lock ):
576- if self ._value > 0 :
577- break
578- fut = create_sub_future ()
579- self ._waiters .append (fut )
580-
581- try :
582- await asyncio .wait_for (fut , timeout )
583- except asyncio .TimeoutError :
584- return False
585- except (SystemExit , KeyboardInterrupt ):
586- raise
587-
588- except BaseException :
589- if not fut .done ():
590- fut .cancel ()
591- if self ._value > 0 and not fut .cancelled ():
592- await self ._wake_up_next ()
593-
594- raise
595-
596- async with async_lock (self ._lock ):
597- self ._value -= 1
598-
599- return True
600-
601- async def release (self ) -> None :
602- self ._value += 1
603- await self ._wake_up_next ()
604-
605- async def __aenter__ (self ) -> None :
606- await self .acquire ()
607-
608- async def __aexit__ (
609- self , exc_type : Optional [Type [BaseException ]], exc_val : Optional [BaseException ], exc_tb : Optional [TracebackType ]
610- ) -> None :
611- await self .release ()
612-
613-
614- class BoundedSemaphore (Semaphore ):
615- """Thread safe version of a BoundedSemaphore"""
616-
617- def __init__ (self , value : int = 1 ) -> None :
618- self ._bound_value = value
619- super ().__init__ (value )
620-
621- async def release (self ) -> None :
622- if self ._value >= self ._bound_value :
623- raise ValueError ("BoundedSemaphore released too many times" )
624- await super ().release ()
625-
626-
627- class NewNewLock :
628- def __init__ (self ) -> None :
629- self ._lock = threading .Lock ()
630- self ._owner_thread : Optional [threading .Thread ] = None
631- self ._owner_task : Optional [asyncio .Task [Any ]] = None
632-
633- def locked (self ) -> bool :
634- return self ._lock .locked ()
635-
636- def acquire (self , blocking : bool = True , timeout : float = - 1 ) -> bool :
637- return self ._lock .acquire (blocking = blocking , timeout = timeout )
638-
639- def release (self ) -> None :
640- self ._lock .release ()
641- self ._owner_task = None
642- self ._owner_thread = None
643-
644- async def acquire_async (self , blocking : bool = True , timeout : float = - 1 ) -> bool :
645- start = time .monotonic ()
646- while not (aquired := self .acquire (blocking = False )):
647- if not blocking :
648- return False
649-
650- current = time .monotonic () - start
651- if timeout > 0 and current > timeout :
652- break
653-
654- if current > 30 and self ._owner_task is not None :
655- tb = traceback .format_stack (self ._owner_task .get_stack ()[0 ]) if self ._owner_task is not None else ""
656- warnings .warn (
657- f"locking takes to long { self ._owner_thread } { self ._owner_task } { tb } " ,
658- )
659-
660- await asyncio .sleep (0 )
661-
662- try :
663- await asyncio .sleep (0 )
664- except asyncio .CancelledError :
665- if aquired :
666- self ._lock .release ()
667- aquired = False
668- raise
669-
670- self ._owner_task = asyncio .current_task ()
671- self ._owner_thread = threading .current_thread ()
672-
673- return aquired
674-
675- async def release_async (self ) -> None :
676- self .release ()
677-
678- def __enter__ (self ) -> None :
679- self .acquire ()
680-
681- def __exit__ (
682- self , exc_type : Optional [Type [BaseException ]], exc_val : Optional [BaseException ], exc_tb : Optional [TracebackType ]
683- ) -> None :
684- self .release ()
685-
686- async def __aenter__ (self ) -> None :
687- await self .acquire_async ()
688-
689- async def __aexit__ (
690- self , exc_type : Optional [Type [BaseException ]], exc_val : Optional [BaseException ], exc_tb : Optional [TracebackType ]
691- ) -> None :
692- await self .release_async ()
693-
694-
695- class NewLock :
696- def __init__ (self ) -> None :
697- self ._block = BoundedSemaphore (value = 1 )
698-
699- def __repr__ (self ) -> str :
700- return "<%s _block=%s>" % (self .__class__ .__name__ , self ._block )
701-
702- async def acquire (self , timeout : Optional [float ] = None ) -> bool :
703- return await self ._block .acquire (timeout )
704-
705- async def release (self ) -> None :
706- await self ._block .release ()
707-
708- @property
709- def locked (self ) -> bool :
710- return self ._block .locked ()
711-
712- async def __aenter__ (self ) -> None :
713- await self .acquire ()
714-
715- async def __aexit__ (
716- self , exc_type : Optional [Type [BaseException ]], exc_val : Optional [BaseException ], exc_tb : Optional [TracebackType ]
717- ) -> None :
718- await self .release ()
719-
720-
721432class Lock :
722433 """Threadsafe version of an async Lock."""
723434
@@ -797,11 +508,11 @@ async def _wake_up_next(self) -> None:
797508 if fut in self ._waiters :
798509 self ._waiters .remove (fut )
799510
800- if fut .get_loop () == asyncio . get_running_loop ():
801- if not fut .done ():
802- fut .set_result ( True )
803- else :
804- if fut . get_loop (). is_running () :
511+ if fut .get_loop (). is_running () and not fut . get_loop (). is_closed ():
512+ if fut .get_loop () == asyncio . get_running_loop ():
513+ if not fut .done ():
514+ fut . set_result ( True )
515+ else :
805516
806517 def set_result (w : asyncio .Future [Any ], ev : threading .Event ) -> None :
807518 try :
@@ -822,6 +533,9 @@ def set_result(w: asyncio.Future[Any], ev: threading.Event) -> None:
822533 raise TimeoutError ("Can't set future result." )
823534
824535 await asyncio .sleep (0.001 )
536+ else :
537+ warnings .warn (f"Future { repr (fut )} loop is closed" )
538+ await self ._wake_up_next ()
825539
826540
827541class FutureInfo :
0 commit comments