@@ -728,15 +728,23 @@ async def notifier():
728728 # Assert we run in a proper thread.
729729 self ._assert_thread ()
730730 while True :
731- payload = await notification_queue .get ()
731+ serialized_payload = await notification_queue .get ()
732+
732733 # Run a subscription's `publish` method (invoked by the
733734 # `trigger.on_next` function) within the threadpool used
734735 # for processing other GraphQL resolver functions.
735- # NOTE: `lambda` is important to run the deserialization
736+ # NOTE: it is important to run the deserialization
736737 # in the worker thread as well.
737- await self ._run_in_worker (
738- lambda : trigger .on_next (Serializer .deserialize (payload ))
739- )
738+ def workload ():
739+ try :
740+ payload = Serializer .deserialize (serialized_payload )
741+ except Exception as ex : # pylint: disable=broad-except
742+ trigger .on_error (f"Cannot deserialize payload. { ex } " )
743+ else :
744+ trigger .on_next (payload )
745+
746+ await self ._run_in_worker (workload )
747+
740748 # Message processed. This allows `Queue.join` to work.
741749 notification_queue .task_done ()
742750
@@ -746,23 +754,23 @@ async def notifier():
746754 lambda publish_returned : publish_returned is not self .SKIP
747755 )
748756
749- # Start listening for broadcasts (subscribe to the Channels
750- # groups), spawn the notification processing task and put
751- # subscription information into the registry.
752- # NOTE: Update of `_sids_by_group` & `_subscriptions` must be
753- # atomic i.e. without `awaits` in between.
757+ # Start listening for broadcasts (subscribe to the Channels
758+ # groups), spawn the notification processing task and put
759+ # subscription information into the registry.
760+ # NOTE: Update of `_sids_by_group` & `_subscriptions` must be
761+ # atomic i.e. without `awaits` in between.
754762 waitlist = []
755- for group in groups :
756- self ._sids_by_group .setdefault (group , []).append (operation_id )
763+ for group in groups :
764+ self ._sids_by_group .setdefault (group , []).append (operation_id )
757765 waitlist .append (self ._channel_layer .group_add (group , self .channel_name ))
758766 notifier_task = self ._spawn_background_task (notifier ())
759- self ._subscriptions [operation_id ] = self ._SubInf (
760- groups = groups ,
761- sid = operation_id ,
762- unsubscribed_callback = unsubscribed_callback ,
763- notification_queue = notification_queue ,
764- notifier_task = notifier_task ,
765- )
767+ self ._subscriptions [operation_id ] = self ._SubInf (
768+ groups = groups ,
769+ sid = operation_id ,
770+ unsubscribed_callback = unsubscribed_callback ,
771+ notification_queue = notification_queue ,
772+ notifier_task = notifier_task ,
773+ )
766774
767775 await asyncio .wait (waitlist )
768776
0 commit comments