@@ -693,16 +693,24 @@ async def execute_command(self, *args: EncodableT):
693693 # legitimate message off the stack if the connection is already
694694 # subscribed to one or more channels
695695
696+ await self .connect ()
697+ connection = self .connection
698+ kwargs = {"check_health" : not self .subscribed }
699+ await self ._execute (connection , connection .send_command , * args , ** kwargs )
700+
701+ async def connect (self ):
702+ """
703+ Ensure that the PubSub is connected
704+ """
696705 if self .connection is None :
697706 self .connection = await self .connection_pool .get_connection (
698707 "pubsub" , self .shard_hint
699708 )
700709 # register a callback that re-subscribes to any channels we
701710 # were listening to when we were disconnected
702711 self .connection .register_connect_callback (self .on_connect )
703- connection = self .connection
704- kwargs = {"check_health" : not self .subscribed }
705- await self ._execute (connection , connection .send_command , * args , ** kwargs )
712+ else :
713+ await self .connection .connect ()
706714
707715 async def _disconnect_raise_connect (self , conn , error ):
708716 """
@@ -962,6 +970,7 @@ async def run(
962970 if handler is None :
963971 raise PubSubError (f"Pattern: '{ pattern } ' has no handler registered" )
964972
973+ await self .connect ()
965974 while True :
966975 try :
967976 await self .get_message (
0 commit comments