@@ -31,6 +31,10 @@ def is_sentinal(cls, event: str | bytes | type[_SENTINAL]) -> TypeIs[type[_SENTI
3131 return event is cls
3232
3333
34+ class StopStreaming (Exception ): # noqa: N818
35+ """Raised by an EventListener implementation to signal that the entire EventManagerListener should stop streaming events."""
36+
37+
3438class EventListenerManager :
3539 """Manages event listeners and provides a shared event queue for them to push events into.
3640
@@ -65,8 +69,12 @@ def _listener_wrapper(self, listener: EventListener) -> None:
6569 if not self .running :
6670 break
6771
72+ except StopStreaming :
73+ logger .debug ("Event listener requested to stop streaming." )
74+ self .event_queue .put (_SENTINAL )
75+
6876 except Exception :
69- logger .exception ("Error in wrapped listener." )
77+ logger .exception ("Unexpected error in wrapped listener %s. Stopping just this listener." , listener )
7078
7179 def stop (self ) -> None :
7280 """Stops the event generation loop and waits for all threads to finish.
@@ -79,10 +87,12 @@ def stop(self) -> None:
7987 self .event_queue .put (_SENTINAL )
8088
8189 for thread in self .listeners_lookup_by_thread :
82- self .listeners_lookup_by_thread [thread ].stop ()
83- thread .join ()
90+ logger .debug ("Stopping listener %s." )
91+ if thread .is_alive ():
92+ self .listeners_lookup_by_thread [thread ].stop ()
93+ thread .join ()
8494
85- logger .info ("All listeners have been stopped." )
95+ logger .info ("All listeners have been stopped." )
8696
8797 def event_stream (self ) -> Generator [str | bytes , None , None ]:
8898 """Starts all registered listeners, sets the running flag to True, and yields events from the shared queue."""
@@ -96,6 +106,7 @@ def event_stream(self) -> Generator[str | bytes, None, None]:
96106 while True :
97107 event = self .event_queue .get ()
98108 if _SENTINAL .is_sentinal (event ):
109+ logger .debug ("Sentinal received, stopping event stream." )
99110 break # Exit loop immediately if the sentinal is received
100111 yield event
101112 finally :
0 commit comments