@@ -222,9 +222,6 @@ async def __aenter__(self):
222222 return await self ._context_manager .__aenter__ ()
223223
224224 async def __aexit__ (self , exc_type , exc , tb ):
225- if self .listening_broadcast_channel is not None :
226- await self .listening_broadcast_channel .disconnect ()
227- self .listening_broadcast_channel = None
228225 await self ._context_manager .__aexit__ (exc_type , exc , tb )
229226
230227 async def start_reader_task (self ):
@@ -268,40 +265,45 @@ async def __read_notifications__(self):
268265 read incoming broadcasts and posting them to the intreal notifier
269266 """
270267 logger .debug ("Starting broadcaster listener" )
271- # Subscribe to our channel
272- async with self .listening_broadcast_channel .subscribe (
273- channel = self ._channel
274- ) as subscriber :
275- async for event in subscriber :
276- try :
277- notification = BroadcastNotification .parse_raw (event .message )
278- # Avoid re-publishing our own broadcasts
279- if notification .notifier_id != self ._id :
280- logger .debug (
281- "Handling incoming broadcast event: {}" .format (
282- {
283- "topics" : notification .topics ,
284- "src" : notification .notifier_id ,
285- }
268+ try :
269+ # Subscribe to our channel
270+ async with self .listening_broadcast_channel .subscribe (
271+ channel = self ._channel
272+ ) as subscriber :
273+ async for event in subscriber :
274+ try :
275+ notification = BroadcastNotification .parse_raw (event .message )
276+ # Avoid re-publishing our own broadcasts
277+ if notification .notifier_id != self ._id :
278+ logger .debug (
279+ "Handling incoming broadcast event: {}" .format (
280+ {
281+ "topics" : notification .topics ,
282+ "src" : notification .notifier_id ,
283+ }
284+ )
286285 )
287- )
288- # Notify subscribers of message received from broadcast
289- task = asyncio . create_task (
290- self . _notifier . notify (
291- notification .topics ,
292- notification . data ,
293- notifier_id = self . _id ,
286+ # Notify subscribers of message received from broadcast
287+ task = asyncio . create_task (
288+ self . _notifier . notify (
289+ notification . topics ,
290+ notification .data ,
291+ notifier_id = self . _id ,
292+ )
294293 )
295- )
296294
297- self ._tasks .add (task )
295+ self ._tasks .add (task )
298296
299- def cleanup (task ):
300- self ._tasks .remove (task )
297+ def cleanup (task ):
298+ self ._tasks .remove (task )
301299
302- task .add_done_callback (cleanup )
303- except :
304- logger .exception ("Failed handling incoming broadcast" )
305- logger .info (
306- "No more events to read from subscriber (underlying connection closed)"
307- )
300+ task .add_done_callback (cleanup )
301+ except :
302+ logger .exception ("Failed handling incoming broadcast" )
303+ logger .info (
304+ "No more events to read from subscriber (underlying connection closed)"
305+ )
306+ finally :
307+ if self .listening_broadcast_channel is not None :
308+ await self .listening_broadcast_channel .disconnect ()
309+ self .listening_broadcast_channel = None
0 commit comments