55from typing import Awaitable , Callable , List , Optional , Protocol , Union
66
77from redis .maintenance_events import (
8+ MaintenanceEvent ,
9+ NodeFailedOverEvent ,
10+ NodeFailingOverEvent ,
811 NodeMigratedEvent ,
912 NodeMigratingEvent ,
1013 NodeMovingEvent ,
@@ -167,20 +170,76 @@ async def read_response(
167170 raise NotImplementedError ()
168171
169172
170- _INVALIDATION_MESSAGE = (b"invalidate" , "invalidate" )
171- _MOVING_MESSAGE = (b"MOVING" , "MOVING" )
172- _MIGRATING_MESSAGE = (b"MIGRATING" , "MIGRATING" )
173- _MIGRATED_MESSAGE = (b"MIGRATED" , "MIGRATED" )
174- _FAILING_OVER_MESSAGE = (b"FAILING_OVER" , "FAILING_OVER" )
175- _FAILED_OVER_MESSAGE = (b"FAILED_OVER" , "FAILED_OVER" )
173+ class MaintenanceNotificationsParser :
174+ """Protocol defining maintenance push notification parsing functionality"""
175+
176+ @staticmethod
177+ def parse_maintenance_start_msg (response , notification_type ):
178+ # Expected message format is: <event_type> <seq_number> <time>
179+ id = response [1 ]
180+ ttl = response [2 ]
181+ return notification_type (id , ttl )
182+
183+ @staticmethod
184+ def parse_maintenance_completed_msg (response , notification_type ):
185+ # Expected message format is: <event_type> <seq_number>
186+ id = response [1 ]
187+ return notification_type (id )
188+
189+ @staticmethod
190+ def parse_moving_msg (response ):
191+ # Expected message format is: MOVING <seq_number> <time> <endpoint>
192+ id = response [1 ]
193+ ttl = response [2 ]
194+ if response [3 ] in [b"null" , "null" ]:
195+ host , port = None , None
196+ else :
197+ value = response [3 ]
198+ if isinstance (value , bytes ):
199+ value = value .decode ()
200+ host , port = value .split (":" )
201+ port = int (port ) if port is not None else None
202+
203+ return NodeMovingEvent (id , host , port , ttl )
204+
205+
206+ _INVALIDATION_MESSAGE = "invalidate"
207+ _MOVING_MESSAGE = "MOVING"
208+ _MIGRATING_MESSAGE = "MIGRATING"
209+ _MIGRATED_MESSAGE = "MIGRATED"
210+ _FAILING_OVER_MESSAGE = "FAILING_OVER"
211+ _FAILED_OVER_MESSAGE = "FAILED_OVER"
176212
177213_MAINTENANCE_MESSAGES = (
178- * _MIGRATING_MESSAGE ,
179- * _MIGRATED_MESSAGE ,
180- * _FAILING_OVER_MESSAGE ,
181- * _FAILED_OVER_MESSAGE ,
214+ _MIGRATING_MESSAGE ,
215+ _MIGRATED_MESSAGE ,
216+ _FAILING_OVER_MESSAGE ,
217+ _FAILED_OVER_MESSAGE ,
182218)
183219
220+ MSG_TYPE_TO_EVENT_PARSER_MAPPING : dict [str , tuple [type [MaintenanceEvent ], Callable ]] = {
221+ _MIGRATING_MESSAGE : (
222+ NodeMigratingEvent ,
223+ MaintenanceNotificationsParser .parse_maintenance_start_msg ,
224+ ),
225+ _MIGRATED_MESSAGE : (
226+ NodeMigratedEvent ,
227+ MaintenanceNotificationsParser .parse_maintenance_completed_msg ,
228+ ),
229+ _FAILING_OVER_MESSAGE : (
230+ NodeFailingOverEvent ,
231+ MaintenanceNotificationsParser .parse_maintenance_start_msg ,
232+ ),
233+ _FAILED_OVER_MESSAGE : (
234+ NodeFailedOverEvent ,
235+ MaintenanceNotificationsParser .parse_maintenance_completed_msg ,
236+ ),
237+ _MOVING_MESSAGE : (
238+ NodeMovingEvent ,
239+ MaintenanceNotificationsParser .parse_moving_msg ,
240+ ),
241+ }
242+
184243
185244class PushNotificationsParser (Protocol ):
186245 """Protocol defining RESP3-specific parsing functionality"""
@@ -196,39 +255,33 @@ def handle_pubsub_push_response(self, response):
196255
197256 def handle_push_response (self , response , ** kwargs ):
198257 msg_type = response [0 ]
258+ if isinstance (msg_type , bytes ):
259+ msg_type = msg_type .decode ()
260+
199261 if msg_type not in (
200- * _INVALIDATION_MESSAGE ,
262+ _INVALIDATION_MESSAGE ,
201263 * _MAINTENANCE_MESSAGES ,
202- * _MOVING_MESSAGE ,
264+ _MOVING_MESSAGE ,
203265 ):
204266 return self .pubsub_push_handler_func (response )
205267
206268 try :
207269 if (
208- msg_type in _INVALIDATION_MESSAGE
270+ msg_type == _INVALIDATION_MESSAGE
209271 and self .invalidation_push_handler_func
210272 ):
211273 return self .invalidation_push_handler_func (response )
212274
213- if msg_type in _MOVING_MESSAGE and self .node_moving_push_handler_func :
214- # Expected message format is: MOVING <seq_number> <time> <endpoint>
215- id = response [1 ]
216- ttl = response [2 ]
217- host , port = response [3 ].decode ().split (":" )
218- notification = NodeMovingEvent (id , host , port , ttl )
275+ if msg_type == _MOVING_MESSAGE and self .node_moving_push_handler_func :
276+ parser_function = MSG_TYPE_TO_EVENT_PARSER_MAPPING [msg_type ][1 ]
277+
278+ notification = parser_function (response )
219279 return self .node_moving_push_handler_func (notification )
220280
221281 if msg_type in _MAINTENANCE_MESSAGES and self .maintenance_push_handler_func :
222- notification = None
223-
224- if msg_type in _MIGRATING_MESSAGE :
225- # Expected message format is: MIGRATING <seq_number> <time> <shard_id-s>
226- id = response [1 ]
227- ttl = response [2 ]
228- notification = NodeMigratingEvent (id , ttl )
229- elif msg_type in _MIGRATED_MESSAGE :
230- id = response [1 ]
231- notification = NodeMigratedEvent (id )
282+ parser_function = MSG_TYPE_TO_EVENT_PARSER_MAPPING [msg_type ][1 ]
283+ notification_type = MSG_TYPE_TO_EVENT_PARSER_MAPPING [msg_type ][0 ]
284+ notification = parser_function (response , notification_type )
232285
233286 if notification is not None :
234287 return self .maintenance_push_handler_func (notification )
@@ -268,38 +321,35 @@ async def handle_push_response(self, response, **kwargs):
268321 """Handle push responses asynchronously"""
269322
270323 msg_type = response [0 ]
324+ if isinstance (msg_type , bytes ):
325+ msg_type = msg_type .decode ()
326+
271327 if msg_type not in (
272- * _INVALIDATION_MESSAGE ,
328+ _INVALIDATION_MESSAGE ,
273329 * _MAINTENANCE_MESSAGES ,
274- * _MOVING_MESSAGE ,
330+ _MOVING_MESSAGE ,
275331 ):
276332 return await self .pubsub_push_handler_func (response )
277333
278334 try :
279335 if (
280- msg_type in _INVALIDATION_MESSAGE
336+ msg_type == _INVALIDATION_MESSAGE
281337 and self .invalidation_push_handler_func
282338 ):
283339 return await self .invalidation_push_handler_func (response )
284340
285- if msg_type in _MOVING_MESSAGE and self . node_moving_push_handler_func :
286- # push notification from enterprise cluster for node moving
287- id = response [ 1 ]
288- ttl = response [ 2 ]
289- host , port = response [ 3 ]. split ( ":" )
290- notification = NodeMovingEvent ( id , host , port , ttl )
341+ if isinstance ( msg_type , bytes ) :
342+ msg_type = msg_type . decode ()
343+
344+ if msg_type == _MOVING_MESSAGE and self . node_moving_push_handler_func :
345+ parser_function = MSG_TYPE_TO_EVENT_PARSER_MAPPING [ msg_type ][ 1 ]
346+ notification = parser_function ( response )
291347 return await self .node_moving_push_handler_func (notification )
292348
293349 if msg_type in _MAINTENANCE_MESSAGES and self .maintenance_push_handler_func :
294- notification = None
295-
296- if msg_type in _MIGRATING_MESSAGE :
297- id = response [1 ]
298- ttl = response [2 ]
299- notification = NodeMigratingEvent (id , ttl )
300- elif msg_type in _MIGRATED_MESSAGE :
301- id = response [1 ]
302- notification = NodeMigratedEvent (id )
350+ parser_function = MSG_TYPE_TO_EVENT_PARSER_MAPPING [msg_type ][1 ]
351+ notification_type = MSG_TYPE_TO_EVENT_PARSER_MAPPING [msg_type ][0 ]
352+ notification = parser_function (response , notification_type )
303353
304354 if notification is not None :
305355 return await self .maintenance_push_handler_func (notification )
0 commit comments