-
Notifications
You must be signed in to change notification settings - Fork 2.6k
Support for maintenance push notifications handling during server upgrade or maintenance procedures. #3756
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Support for maintenance push notifications handling during server upgrade or maintenance procedures. #3756
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
5139688
Hitless upgrade: Support initial implementation for synchronous Redis…
petyaslavova 6f13953
Adding handling of FAILING_OVER and FAILED_OVER events/push notificat…
petyaslavova f2c677a
Hitless upgrade: Adding handshake command to enable the notifications…
elena-kolevska 32d837a
Hitless-Upgrade: Add handling of MOVING push notification with "null"…
petyaslavova File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,17 @@ | ||
| import logging | ||
| import sys | ||
| from abc import ABC | ||
| from asyncio import IncompleteReadError, StreamReader, TimeoutError | ||
| from typing import Callable, List, Optional, Protocol, Union | ||
| from typing import Awaitable, Callable, List, Optional, Protocol, Union | ||
|
|
||
| from redis.maintenance_events import ( | ||
| MaintenanceEvent, | ||
| NodeFailedOverEvent, | ||
| NodeFailingOverEvent, | ||
| NodeMigratedEvent, | ||
| NodeMigratingEvent, | ||
| NodeMovingEvent, | ||
| ) | ||
|
|
||
| if sys.version_info.major >= 3 and sys.version_info.minor >= 11: | ||
| from asyncio import timeout as async_timeout | ||
|
|
@@ -50,6 +60,8 @@ | |
| "Client sent AUTH, but no password is set": AuthenticationError, | ||
| } | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class BaseParser(ABC): | ||
| EXCEPTION_CLASSES = { | ||
|
|
@@ -158,48 +170,195 @@ async def read_response( | |
| raise NotImplementedError() | ||
|
|
||
|
|
||
| _INVALIDATION_MESSAGE = [b"invalidate", "invalidate"] | ||
| class MaintenanceNotificationsParser: | ||
| """Protocol defining maintenance push notification parsing functionality""" | ||
|
|
||
| @staticmethod | ||
| def parse_maintenance_start_msg(response, notification_type): | ||
| # Expected message format is: <event_type> <seq_number> <time> | ||
| id = response[1] | ||
| ttl = response[2] | ||
| return notification_type(id, ttl) | ||
|
|
||
| @staticmethod | ||
| def parse_maintenance_completed_msg(response, notification_type): | ||
| # Expected message format is: <event_type> <seq_number> | ||
| id = response[1] | ||
| return notification_type(id) | ||
|
|
||
| @staticmethod | ||
| def parse_moving_msg(response): | ||
| # Expected message format is: MOVING <seq_number> <time> <endpoint> | ||
| id = response[1] | ||
| ttl = response[2] | ||
| if response[3] in [b"null", "null"]: | ||
| host, port = None, None | ||
| else: | ||
| value = response[3] | ||
| if isinstance(value, bytes): | ||
| value = value.decode() | ||
| host, port = value.split(":") | ||
| port = int(port) if port is not None else None | ||
|
|
||
| return NodeMovingEvent(id, host, port, ttl) | ||
|
|
||
|
|
||
| _INVALIDATION_MESSAGE = "invalidate" | ||
| _MOVING_MESSAGE = "MOVING" | ||
| _MIGRATING_MESSAGE = "MIGRATING" | ||
| _MIGRATED_MESSAGE = "MIGRATED" | ||
| _FAILING_OVER_MESSAGE = "FAILING_OVER" | ||
| _FAILED_OVER_MESSAGE = "FAILED_OVER" | ||
|
|
||
| _MAINTENANCE_MESSAGES = ( | ||
| _MIGRATING_MESSAGE, | ||
| _MIGRATED_MESSAGE, | ||
| _FAILING_OVER_MESSAGE, | ||
| _FAILED_OVER_MESSAGE, | ||
| ) | ||
|
|
||
| MSG_TYPE_TO_EVENT_PARSER_MAPPING: dict[str, tuple[type[MaintenanceEvent], Callable]] = { | ||
|
||
| _MIGRATING_MESSAGE: ( | ||
| NodeMigratingEvent, | ||
| MaintenanceNotificationsParser.parse_maintenance_start_msg, | ||
| ), | ||
| _MIGRATED_MESSAGE: ( | ||
| NodeMigratedEvent, | ||
| MaintenanceNotificationsParser.parse_maintenance_completed_msg, | ||
| ), | ||
| _FAILING_OVER_MESSAGE: ( | ||
| NodeFailingOverEvent, | ||
| MaintenanceNotificationsParser.parse_maintenance_start_msg, | ||
| ), | ||
| _FAILED_OVER_MESSAGE: ( | ||
| NodeFailedOverEvent, | ||
| MaintenanceNotificationsParser.parse_maintenance_completed_msg, | ||
| ), | ||
| _MOVING_MESSAGE: ( | ||
| NodeMovingEvent, | ||
| MaintenanceNotificationsParser.parse_moving_msg, | ||
| ), | ||
| } | ||
|
|
||
|
|
||
| class PushNotificationsParser(Protocol): | ||
| """Protocol defining RESP3-specific parsing functionality""" | ||
|
|
||
| pubsub_push_handler_func: Callable | ||
| invalidation_push_handler_func: Optional[Callable] = None | ||
| node_moving_push_handler_func: Optional[Callable] = None | ||
| maintenance_push_handler_func: Optional[Callable] = None | ||
|
|
||
| def handle_pubsub_push_response(self, response): | ||
| """Handle pubsub push responses""" | ||
| raise NotImplementedError() | ||
|
|
||
| def handle_push_response(self, response, **kwargs): | ||
| if response[0] not in _INVALIDATION_MESSAGE: | ||
| msg_type = response[0] | ||
| if isinstance(msg_type, bytes): | ||
| msg_type = msg_type.decode() | ||
|
|
||
| if msg_type not in ( | ||
| _INVALIDATION_MESSAGE, | ||
| *_MAINTENANCE_MESSAGES, | ||
| _MOVING_MESSAGE, | ||
| ): | ||
| return self.pubsub_push_handler_func(response) | ||
| if self.invalidation_push_handler_func: | ||
| return self.invalidation_push_handler_func(response) | ||
|
|
||
| try: | ||
| if ( | ||
| msg_type == _INVALIDATION_MESSAGE | ||
| and self.invalidation_push_handler_func | ||
| ): | ||
| return self.invalidation_push_handler_func(response) | ||
|
|
||
| if msg_type == _MOVING_MESSAGE and self.node_moving_push_handler_func: | ||
| parser_function = MSG_TYPE_TO_EVENT_PARSER_MAPPING[msg_type][1] | ||
|
|
||
| notification = parser_function(response) | ||
| return self.node_moving_push_handler_func(notification) | ||
|
|
||
| if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func: | ||
| parser_function = MSG_TYPE_TO_EVENT_PARSER_MAPPING[msg_type][1] | ||
| notification_type = MSG_TYPE_TO_EVENT_PARSER_MAPPING[msg_type][0] | ||
| notification = parser_function(response, notification_type) | ||
|
|
||
| if notification is not None: | ||
| return self.maintenance_push_handler_func(notification) | ||
| except Exception as e: | ||
| logger.error( | ||
| "Error handling {} message ({}): {}".format(msg_type, response, e) | ||
| ) | ||
|
|
||
| return None | ||
|
|
||
| def set_pubsub_push_handler(self, pubsub_push_handler_func): | ||
| self.pubsub_push_handler_func = pubsub_push_handler_func | ||
|
|
||
| def set_invalidation_push_handler(self, invalidation_push_handler_func): | ||
| self.invalidation_push_handler_func = invalidation_push_handler_func | ||
|
|
||
| def set_node_moving_push_handler(self, node_moving_push_handler_func): | ||
| self.node_moving_push_handler_func = node_moving_push_handler_func | ||
|
|
||
| def set_maintenance_push_handler(self, maintenance_push_handler_func): | ||
| self.maintenance_push_handler_func = maintenance_push_handler_func | ||
|
|
||
|
|
||
| class AsyncPushNotificationsParser(Protocol): | ||
| """Protocol defining async RESP3-specific parsing functionality""" | ||
|
|
||
| pubsub_push_handler_func: Callable | ||
| invalidation_push_handler_func: Optional[Callable] = None | ||
| node_moving_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None | ||
| maintenance_push_handler_func: Optional[Callable[..., Awaitable[None]]] = None | ||
|
|
||
| async def handle_pubsub_push_response(self, response): | ||
| """Handle pubsub push responses asynchronously""" | ||
| raise NotImplementedError() | ||
|
|
||
| async def handle_push_response(self, response, **kwargs): | ||
| """Handle push responses asynchronously""" | ||
| if response[0] not in _INVALIDATION_MESSAGE: | ||
|
|
||
| msg_type = response[0] | ||
| if isinstance(msg_type, bytes): | ||
| msg_type = msg_type.decode() | ||
|
|
||
| if msg_type not in ( | ||
| _INVALIDATION_MESSAGE, | ||
| *_MAINTENANCE_MESSAGES, | ||
| _MOVING_MESSAGE, | ||
| ): | ||
| return await self.pubsub_push_handler_func(response) | ||
| if self.invalidation_push_handler_func: | ||
| return await self.invalidation_push_handler_func(response) | ||
|
|
||
| try: | ||
| if ( | ||
| msg_type == _INVALIDATION_MESSAGE | ||
| and self.invalidation_push_handler_func | ||
| ): | ||
| return await self.invalidation_push_handler_func(response) | ||
|
|
||
| if isinstance(msg_type, bytes): | ||
| msg_type = msg_type.decode() | ||
|
|
||
| if msg_type == _MOVING_MESSAGE and self.node_moving_push_handler_func: | ||
| parser_function = MSG_TYPE_TO_EVENT_PARSER_MAPPING[msg_type][1] | ||
| notification = parser_function(response) | ||
| return await self.node_moving_push_handler_func(notification) | ||
|
|
||
| if msg_type in _MAINTENANCE_MESSAGES and self.maintenance_push_handler_func: | ||
| parser_function = MSG_TYPE_TO_EVENT_PARSER_MAPPING[msg_type][1] | ||
| notification_type = MSG_TYPE_TO_EVENT_PARSER_MAPPING[msg_type][0] | ||
| notification = parser_function(response, notification_type) | ||
|
|
||
| if notification is not None: | ||
| return await self.maintenance_push_handler_func(notification) | ||
| except Exception as e: | ||
| logger.error( | ||
| "Error handling {} message ({}): {}".format(msg_type, response, e) | ||
| ) | ||
|
|
||
| return None | ||
|
|
||
| def set_pubsub_push_handler(self, pubsub_push_handler_func): | ||
| """Set the pubsub push handler function""" | ||
|
|
@@ -209,6 +368,12 @@ def set_invalidation_push_handler(self, invalidation_push_handler_func): | |
| """Set the invalidation push handler function""" | ||
| self.invalidation_push_handler_func = invalidation_push_handler_func | ||
|
|
||
| def set_node_moving_push_handler(self, node_moving_push_handler_func): | ||
| self.node_moving_push_handler_func = node_moving_push_handler_func | ||
|
|
||
| def set_maintenance_push_handler(self, maintenance_push_handler_func): | ||
| self.maintenance_push_handler_func = maintenance_push_handler_func | ||
|
|
||
|
|
||
| class _AsyncRESPBase(AsyncBaseParser): | ||
| """Base class for async resp parsing""" | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.