11import asyncio
22import socket
33import sys
4+ from logging import getLogger
45from typing import Callable , List , Optional , TypedDict , Union
56
67if sys .version_info .major >= 3 and sys .version_info .minor >= 11 :
1112from ..exceptions import ConnectionError , InvalidResponse , RedisError
1213from ..typing import EncodableT
1314from ..utils import HIREDIS_AVAILABLE
14- from .base import AsyncBaseParser , BaseParser
15+ from .base import (
16+ AsyncBaseParser ,
17+ AsyncPushNotificationsParser ,
18+ BaseParser ,
19+ PushNotificationsParser ,
20+ )
1521from .socket import (
1622 NONBLOCKING_EXCEPTION_ERROR_NUMBERS ,
1723 NONBLOCKING_EXCEPTIONS ,
@@ -32,21 +38,29 @@ class _HiredisReaderArgs(TypedDict, total=False):
3238 errors : Optional [str ]
3339
3440
35- class _HiredisParser (BaseParser ):
41+ class _HiredisParser (BaseParser , PushNotificationsParser ):
3642 "Parser class for connections using Hiredis"
3743
3844 def __init__ (self , socket_read_size ):
3945 if not HIREDIS_AVAILABLE :
4046 raise RedisError ("Hiredis is not installed" )
4147 self .socket_read_size = socket_read_size
4248 self ._buffer = bytearray (socket_read_size )
49+ self .pubsub_push_handler_func = self .handle_pubsub_push_response
50+ self .invalidation_push_handler_func = None
51+ self ._hiredis_PushNotificationType = None
4352
4453 def __del__ (self ):
4554 try :
4655 self .on_disconnect ()
4756 except Exception :
4857 pass
4958
59+ def handle_pubsub_push_response (self , response ):
60+ logger = getLogger ("push_response" )
61+ logger .debug ("Push response: " + str (response ))
62+ return response
63+
5064 def on_connect (self , connection , ** kwargs ):
5165 import hiredis
5266
@@ -64,6 +78,12 @@ def on_connect(self, connection, **kwargs):
6478 self ._reader = hiredis .Reader (** kwargs )
6579 self ._next_response = NOT_ENOUGH_DATA
6680
81+ try :
82+ self ._hiredis_PushNotificationType = hiredis .PushNotification
83+ except AttributeError :
84+ # hiredis < 3.2
85+ self ._hiredis_PushNotificationType = None
86+
6787 def on_disconnect (self ):
6888 self ._sock = None
6989 self ._reader = None
@@ -109,14 +129,24 @@ def read_from_socket(self, timeout=SENTINEL, raise_on_timeout=True):
109129 if custom_timeout :
110130 sock .settimeout (self ._socket_timeout )
111131
112- def read_response (self , disable_decoding = False ):
132+ def read_response (self , disable_decoding = False , push_request = False ):
113133 if not self ._reader :
114134 raise ConnectionError (SERVER_CLOSED_CONNECTION_ERROR )
115135
116136 # _next_response might be cached from a can_read() call
117137 if self ._next_response is not NOT_ENOUGH_DATA :
118138 response = self ._next_response
119139 self ._next_response = NOT_ENOUGH_DATA
140+ if self ._hiredis_PushNotificationType is not None and isinstance (
141+ response , self ._hiredis_PushNotificationType
142+ ):
143+ response = self .handle_push_response (response )
144+ if not push_request :
145+ return self .read_response (
146+ disable_decoding = disable_decoding , push_request = push_request
147+ )
148+ else :
149+ return response
120150 return response
121151
122152 if disable_decoding :
@@ -135,6 +165,16 @@ def read_response(self, disable_decoding=False):
135165 # happened
136166 if isinstance (response , ConnectionError ):
137167 raise response
168+ elif self ._hiredis_PushNotificationType is not None and isinstance (
169+ response , self ._hiredis_PushNotificationType
170+ ):
171+ response = self .handle_push_response (response )
172+ if not push_request :
173+ return self .read_response (
174+ disable_decoding = disable_decoding , push_request = push_request
175+ )
176+ else :
177+ return response
138178 elif (
139179 isinstance (response , list )
140180 and response
@@ -144,7 +184,7 @@ def read_response(self, disable_decoding=False):
144184 return response
145185
146186
147- class _AsyncHiredisParser (AsyncBaseParser ):
187+ class _AsyncHiredisParser (AsyncBaseParser , AsyncPushNotificationsParser ):
148188 """Async implementation of parser class for connections using Hiredis"""
149189
150190 __slots__ = ("_reader" ,)
@@ -154,6 +194,14 @@ def __init__(self, socket_read_size: int):
154194 raise RedisError ("Hiredis is not available." )
155195 super ().__init__ (socket_read_size = socket_read_size )
156196 self ._reader = None
197+ self .pubsub_push_handler_func = self .handle_pubsub_push_response
198+ self .invalidation_push_handler_func = None
199+ self ._hiredis_PushNotificationType = None
200+
201+ async def handle_pubsub_push_response (self , response ):
202+ logger = getLogger ("push_response" )
203+ logger .debug ("Push response: " + str (response ))
204+ return response
157205
158206 def on_connect (self , connection ):
159207 import hiredis
@@ -171,6 +219,14 @@ def on_connect(self, connection):
171219 self ._reader = hiredis .Reader (** kwargs )
172220 self ._connected = True
173221
222+ try :
223+ self ._hiredis_PushNotificationType = getattr (
224+ hiredis , "PushNotification" , None
225+ )
226+ except AttributeError :
227+ # hiredis < 3.2
228+ self ._hiredis_PushNotificationType = None
229+
174230 def on_disconnect (self ):
175231 self ._connected = False
176232
@@ -195,7 +251,7 @@ async def read_from_socket(self):
195251 return True
196252
197253 async def read_response (
198- self , disable_decoding : bool = False
254+ self , disable_decoding : bool = False , push_request : bool = False
199255 ) -> Union [EncodableT , List [EncodableT ]]:
200256 # If `on_disconnect()` has been called, prohibit any more reads
201257 # even if they could happen because data might be present.
@@ -207,6 +263,7 @@ async def read_response(
207263 response = self ._reader .gets (False )
208264 else :
209265 response = self ._reader .gets ()
266+
210267 while response is NOT_ENOUGH_DATA :
211268 await self .read_from_socket ()
212269 if disable_decoding :
@@ -219,6 +276,16 @@ async def read_response(
219276 # happened
220277 if isinstance (response , ConnectionError ):
221278 raise response
279+ elif self ._hiredis_PushNotificationType is not None and isinstance (
280+ response , self ._hiredis_PushNotificationType
281+ ):
282+ response = await self .handle_push_response (response )
283+ if not push_request :
284+ return await self .read_response (
285+ disable_decoding = disable_decoding , push_request = push_request
286+ )
287+ else :
288+ return response
222289 elif (
223290 isinstance (response , list )
224291 and response
0 commit comments