From 38898ad94f5d8c1d6ffd39600db52306c2365dec Mon Sep 17 00:00:00 2001 From: daveads Date: Fri, 13 Dec 2024 01:40:26 +0100 Subject: [PATCH] error logging --- fastapi_websocket_pubsub/event_broadcaster.py | 104 +++++++++--------- 1 file changed, 54 insertions(+), 50 deletions(-) diff --git a/fastapi_websocket_pubsub/event_broadcaster.py b/fastapi_websocket_pubsub/event_broadcaster.py index 4aa2d55..163f0c2 100644 --- a/fastapi_websocket_pubsub/event_broadcaster.py +++ b/fastapi_websocket_pubsub/event_broadcaster.py @@ -161,6 +161,23 @@ def __init__( self._tasks = set() self.listening_broadcast_channel = None + async def _connect_broadcast_channel(self): + """Helper method to connect to broadcast channel with proper error handling""" + try: + if self.listening_broadcast_channel is None: + self.listening_broadcast_channel = self._broadcast_type(self._broadcast_url) + await self.listening_broadcast_channel.connect() + logger.info(f"Successfully connected to broadcast channel at {self._broadcast_url}") + except ConnectionError as e: + error_msg = f"Failed to connect to broadcast channel - Connection error: {str(e)}" + logger.error(error_msg) + raise ConnectionError(error_msg) from e + except Exception as e: + error_msg = f"Failed to connect to broadcast channel: {e.__class__.__name__}: {str(e)}" + logger.error(error_msg) + raise + + async def __broadcast_notifications__(self, subscription: Subscription, data): """ Share incoming internal notifications with the entire broadcast channel @@ -225,34 +242,14 @@ async def __aexit__(self, exc_type, exc, tb): await self._context_manager.__aexit__(exc_type, exc, tb) async def start_reader_task(self): - """Spawn a task reading incoming broadcasts and posting them to the intreal notifier - Raises: - BroadcasterAlreadyStarted: if called more than once per context - Returns: - the spawned task - """ - # Make sure a task wasn't started already + """Spawn a task reading incoming broadcasts with improved error handling""" if self._subscription_task is not None: - # we already started a task for this worker process - logger.debug( - "No need for listen task, already started broadcast listen task for this notifier" - ) + logger.debug("Broadcast listen task already started") return - # Init new broadcast channel for reading - try: - if self.listening_broadcast_channel is None: - self.listening_broadcast_channel = self._broadcast_type( - self._broadcast_url - ) - await self.listening_broadcast_channel.connect() - except Exception as e: - logger.error( - f"Failed to connect to broadcast channel for reading incoming events: {e}" - ) - raise e + # Connect with proper error handling + await self._connect_broadcast_channel() - # Trigger the task logger.debug("Spawning broadcast listen task") self._subscription_task = asyncio.create_task(self.__read_notifications__()) return self._subscription_task @@ -260,30 +257,26 @@ async def start_reader_task(self): def get_reader_task(self): return self._subscription_task + + async def __read_notifications__(self): - """ - read incoming broadcasts and posting them to the intreal notifier - """ + """Read incoming broadcasts with improved error handling""" logger.debug("Starting broadcaster listener") + try: - # Subscribe to our channel - async with self.listening_broadcast_channel.subscribe( - channel=self._channel - ) as subscriber: + if self.listening_broadcast_channel is None: + raise RuntimeError("Broadcast channel not initialized") + + async with self.listening_broadcast_channel.subscribe(channel=self._channel) as subscriber: async for event in subscriber: try: notification = BroadcastNotification.parse_raw(event.message) - # Avoid re-publishing our own broadcasts if notification.notifier_id != self._id: logger.debug( - "Handling incoming broadcast event: {}".format( - { - "topics": notification.topics, - "src": notification.notifier_id, - } - ) + f"Handling incoming broadcast event: " + f"topics={notification.topics}, src={notification.notifier_id}" ) - # Notify subscribers of message received from broadcast + task = asyncio.create_task( self._notifier.notify( notification.topics, @@ -293,17 +286,28 @@ async def __read_notifications__(self): ) self._tasks.add(task) - - def cleanup(task): - self._tasks.remove(task) - - task.add_done_callback(cleanup) - except: - logger.exception("Failed handling incoming broadcast") - logger.info( - "No more events to read from subscriber (underlying connection closed)" - ) + task.add_done_callback(lambda t: self._tasks.remove(t)) + + except Exception as e: + logger.error(f"Failed handling incoming broadcast: {str(e)}") + # Log full traceback for debugging + logger.exception("Full error traceback:") + + except ConnectionError as e: + logger.error(f"Broadcast channel connection error: {str(e)}") + raise + + except Exception as e: + logger.error(f"Error in broadcast listener: {e.__class__.__name__}: {str(e)}") + logger.exception("Full error traceback:") + raise + finally: if self.listening_broadcast_channel is not None: - await self.listening_broadcast_channel.disconnect() + try: + await self.listening_broadcast_channel.disconnect() + except Exception as e: + logger.error(f"Error disconnecting broadcast channel: {str(e)}") self.listening_broadcast_channel = None + + logger.info("Broadcast listener stopped") \ No newline at end of file