@@ -679,6 +679,7 @@ async def _register_subscription(
679679 groups ,
680680 publish_callback ,
681681 unsubscribed_callback ,
682+ initial_payload ,
682683 notification_queue_limit = None ,
683684 ):
684685 """Register a new subscription when client subscribes.
@@ -697,6 +698,7 @@ async def _register_subscription(
697698 subscription groups current subscription belongs to.
698699 unsubscribed_callback: Called to notify when a client
699700 unsubscribes.
701+ initial_payload: Initial payload.
700702 notification_queue_limit: LImit for the subscribtion
701703 notification queue. Default is used if not set.
702704
@@ -720,13 +722,25 @@ async def _register_subscription(
720722 queue_size = self .subscription_notification_queue_limit
721723 notification_queue = asyncio .Queue (maxsize = queue_size )
722724
725+ # Enqueue the initial payload.
726+ if initial_payload is not self .SKIP :
727+ notification_queue .put_nowait (Serializer .serialize (initial_payload ))
728+
723729 # Start an endless task which listens the `notification_queue`
724730 # and invokes subscription "resolver" on new notifications.
725731 async def notifier ():
726732 """Watch the notification queue and notify clients."""
727733
728734 # Assert we run in a proper thread.
729735 self ._assert_thread ()
736+
737+ # Dirty hack to partially workaround the race between:
738+ # 1) call to `result.subscribe` in `_on_gql_start`; and
739+ # 2) call to `trigger.on_next` below in this function.
740+ # The first call must be earlier. Otherwise, first one or more notifications
741+ # may be lost.
742+ await asyncio .sleep (1 )
743+
730744 while True :
731745 serialized_payload = await notification_queue .get ()
732746
0 commit comments