@@ -65,7 +65,7 @@ async def __aenter__(self):
6565 "Listening for incoming events from broadcast channel (first listener started)"
6666 )
6767 # Start task listening on incoming broadcasts
68- self ._event_broadcaster .start_reader_task ()
68+ await self ._event_broadcaster .start_reader_task ()
6969
7070 if self ._share :
7171 self ._event_broadcaster ._share_count += 1
@@ -159,6 +159,7 @@ def __init__(
159159 self ._context_manager = None
160160 self ._context_manager_lock = asyncio .Lock ()
161161 self ._tasks = set ()
162+ self .listening_broadcast_channel = None
162163
163164 async def __broadcast_notifications__ (self , subscription : Subscription , data ):
164165 """
@@ -221,9 +222,12 @@ async def __aenter__(self):
221222 return await self ._context_manager .__aenter__ ()
222223
223224 async def __aexit__ (self , exc_type , exc , tb ):
225+ if self .listening_broadcast_channel is not None :
226+ await self .listening_broadcast_channel .disconnect ()
227+ self .listening_broadcast_channel = None
224228 await self ._context_manager .__aexit__ (exc_type , exc , tb )
225229
226- def start_reader_task (self ):
230+ async def start_reader_task (self ):
227231 """Spawn a task reading incoming broadcasts and posting them to the intreal notifier
228232 Raises:
229233 BroadcasterAlreadyStarted: if called more than once per context
@@ -237,6 +241,20 @@ def start_reader_task(self):
237241 "No need for listen task, already started broadcast listen task for this notifier"
238242 )
239243 return
244+
245+ # Init new broadcast channel for reading
246+ try :
247+ if self .listening_broadcast_channel is None :
248+ self .listening_broadcast_channel = self ._broadcast_type (
249+ self ._broadcast_url
250+ )
251+ await self .listening_broadcast_channel .connect ()
252+ except Exception as e :
253+ logger .error (
254+ f"Failed to connect to broadcast channel for reading incoming events: { e } "
255+ )
256+ raise e
257+
240258 # Trigger the task
241259 logger .debug ("Spawning broadcast listen task" )
242260 self ._subscription_task = asyncio .create_task (self .__read_notifications__ ())
@@ -249,44 +267,41 @@ async def __read_notifications__(self):
249267 """
250268 read incoming broadcasts and posting them to the intreal notifier
251269 """
252- logger .info ("Starting broadcaster listener" )
253- # Init new broadcast channel for reading
254- listening_broadcast_channel = self ._broadcast_type (self ._broadcast_url )
255- async with listening_broadcast_channel :
256- # Subscribe to our channel
257- async with listening_broadcast_channel .subscribe (
258- channel = self ._channel
259- ) as subscriber :
260- async for event in subscriber :
261- try :
262- notification = BroadcastNotification .parse_raw (event .message )
263- # Avoid re-publishing our own broadcasts
264- if notification .notifier_id != self ._id :
265- logger .debug (
266- "Handling incoming broadcast event: {}" .format (
267- {
268- "topics" : notification .topics ,
269- "src" : notification .notifier_id ,
270- }
271- )
270+ logger .debug ("Starting broadcaster listener" )
271+ # Subscribe to our channel
272+ async with self .listening_broadcast_channel .subscribe (
273+ channel = self ._channel
274+ ) as subscriber :
275+ async for event in subscriber :
276+ try :
277+ notification = BroadcastNotification .parse_raw (event .message )
278+ # Avoid re-publishing our own broadcasts
279+ if notification .notifier_id != self ._id :
280+ logger .debug (
281+ "Handling incoming broadcast event: {}" .format (
282+ {
283+ "topics" : notification .topics ,
284+ "src" : notification .notifier_id ,
285+ }
272286 )
273- # Notify subscribers of message received from broadcast
274- task = asyncio . create_task (
275- self . _notifier . notify (
276- notification . topics ,
277- notification .data ,
278- notifier_id = self . _id ,
279- )
287+ )
288+ # Notify subscribers of message received from broadcast
289+ task = asyncio . create_task (
290+ self . _notifier . notify (
291+ notification .topics ,
292+ notification . data ,
293+ notifier_id = self . _id ,
280294 )
295+ )
281296
282- self ._tasks .add (task )
297+ self ._tasks .add (task )
283298
284- def cleanup (task ):
285- self ._tasks .remove (task )
299+ def cleanup (task ):
300+ self ._tasks .remove (task )
286301
287- task .add_done_callback (cleanup )
288- except :
289- logger .exception ("Failed handling incoming broadcast" )
290- logger .info (
291- "No more events to read from subscriber (underlying connection closed)"
292- )
302+ task .add_done_callback (cleanup )
303+ except :
304+ logger .exception ("Failed handling incoming broadcast" )
305+ logger .info (
306+ "No more events to read from subscriber (underlying connection closed)"
307+ )
0 commit comments