1717 ClusterCrossSlotError ,
1818 ClusterDownError ,
1919 ClusterError ,
20+ ConnectionError ,
2021 DataError ,
2122 MasterDownError ,
2223 MovedError ,
@@ -374,6 +375,12 @@ class RedisCluster(RedisClusterCommands):
374375 ),
375376 )
376377
378+ ERRORS_ALLOW_RETRY = (
379+ ConnectionError ,
380+ TimeoutError ,
381+ ClusterDownError ,
382+ )
383+
377384 def __init__ (
378385 self ,
379386 host = None ,
@@ -385,8 +392,6 @@ def __init__(
385392 reinitialize_steps = 10 ,
386393 read_from_replicas = False ,
387394 url = None ,
388- retry_on_timeout = False ,
389- retry = None ,
390395 ** kwargs ,
391396 ):
392397 """
@@ -417,11 +422,6 @@ def __init__(
417422 :cluster_error_retry_attempts: 'int'
418423 Retry command execution attempts when encountering ClusterDownError
419424 or ConnectionError
420- :retry_on_timeout: 'bool'
421- To specify a retry policy, first set `retry_on_timeout` to `True`
422- then set `retry` to a valid `Retry` object
423- :retry: 'Retry'
424- a `Retry` object
425425 :reinitialize_steps: 'int'
426426 Specifies the number of MOVED errors that need to occur before
427427 reinitializing the whole cluster topology. If a MOVED error occurs
@@ -452,9 +452,6 @@ def __init__(
452452 "Argument 'db' is not possible to use in cluster mode"
453453 )
454454
455- if retry_on_timeout :
456- kwargs .update ({"retry_on_timeout" : retry_on_timeout , "retry" : retry })
457-
458455 # Get the startup node/s
459456 from_url = False
460457 if url is not None :
@@ -850,7 +847,7 @@ def _parse_target_nodes(self, target_nodes):
850847
851848 def execute_command (self , * args , ** kwargs ):
852849 """
853- Wrapper for ClusterDownError and ConnectionError error handling.
850+ Wrapper for ERRORS_ALLOW_RETRY error handling.
854851
855852 It will try the number of times specified by the config option
856853 "self.cluster_error_retry_attempts" which defaults to 3 unless manually
@@ -865,18 +862,19 @@ def execute_command(self, *args, **kwargs):
865862 dict<Any, ClusterNode>
866863 """
867864 target_nodes_specified = False
868- target_nodes = kwargs .pop ("target_nodes" , None )
869- if target_nodes is not None and not self ._is_nodes_flag (target_nodes ):
870- target_nodes = self ._parse_target_nodes (target_nodes )
865+ target_nodes = None
866+ passed_targets = kwargs .pop ("target_nodes" , None )
867+ if passed_targets is not None and not self ._is_nodes_flag (passed_targets ):
868+ target_nodes = self ._parse_target_nodes (passed_targets )
871869 target_nodes_specified = True
872- # If ClusterDownError/ConnectionError were thrown, the nodes
873- # and slots cache were reinitialized. We will retry executing the
874- # command with the updated cluster setup only when the target nodes
875- # can be determined again with the new cache tables. Therefore,
876- # when target nodes were passed to this function, we cannot retry
877- # the command execution since the nodes may not be valid anymore
878- # after the tables were reinitialized. So in case of passed target
879- # nodes, retry_attempts will be set to 1.
870+ # If an error that allows retrying was thrown, the nodes and slots
871+ # cache were reinitialized. We will retry executing the command with
872+ # the updated cluster setup only when the target nodes can be
873+ # determined again with the new cache tables. Therefore, when target
874+ # nodes were passed to this function, we cannot retry the command
875+ # execution since the nodes may not be valid anymore after the tables
876+ # were reinitialized. So in case of passed target nodes,
877+ # retry_attempts will be set to 1.
880878 retry_attempts = (
881879 1 if target_nodes_specified else self .cluster_error_retry_attempts
882880 )
@@ -887,7 +885,7 @@ def execute_command(self, *args, **kwargs):
887885 if not target_nodes_specified :
888886 # Determine the nodes to execute the command on
889887 target_nodes = self ._determine_nodes (
890- * args , ** kwargs , nodes_flag = target_nodes
888+ * args , ** kwargs , nodes_flag = passed_targets
891889 )
892890 if not target_nodes :
893891 raise RedisClusterException (
@@ -897,11 +895,14 @@ def execute_command(self, *args, **kwargs):
897895 res [node .name ] = self ._execute_command (node , * args , ** kwargs )
898896 # Return the processed result
899897 return self ._process_result (args [0 ], res , ** kwargs )
900- except (ClusterDownError , ConnectionError ) as e :
901- # The nodes and slots cache were reinitialized.
902- # Try again with the new cluster setup. All other errors
903- # should be raised.
904- exception = e
898+ except BaseException as e :
899+ if type (e ) in RedisCluster .ERRORS_ALLOW_RETRY :
900+ # The nodes and slots cache were reinitialized.
901+ # Try again with the new cluster setup.
902+ exception = e
903+ else :
904+ # All other errors should be raised.
905+ raise e
905906
906907 # If it fails the configured number of times then raise exception back
907908 # to caller of this method
@@ -953,11 +954,11 @@ def _execute_command(self, target_node, *args, **kwargs):
953954 )
954955 return response
955956
956- except (RedisClusterException , BusyLoadingError ):
957- log .exception ("RedisClusterException || BusyLoadingError" )
957+ except (RedisClusterException , BusyLoadingError ) as e :
958+ log .exception (type ( e ) )
958959 raise
959- except ConnectionError :
960- log .exception ("ConnectionError" )
960+ except ( ConnectionError , TimeoutError ) as e :
961+ log .exception (type ( e ) )
961962 # ConnectionError can also be raised if we couldn't get a
962963 # connection from the pool before timing out, so check that
963964 # this is an actual connection before attempting to disconnect.
@@ -976,13 +977,6 @@ def _execute_command(self, target_node, *args, **kwargs):
976977 # and try again with the new setup
977978 self .nodes_manager .initialize ()
978979 raise
979- except TimeoutError :
980- log .exception ("TimeoutError" )
981- if connection is not None :
982- connection .disconnect ()
983-
984- if ttl < self .RedisClusterRequestTTL / 2 :
985- time .sleep (0.05 )
986980 except MovedError as e :
987981 # First, we will try to patch the slots/nodes cache with the
988982 # redirected node output and try again. If MovedError exceeds
@@ -1016,7 +1010,7 @@ def _execute_command(self, target_node, *args, **kwargs):
10161010 # ClusterDownError can occur during a failover and to get
10171011 # self-healed, we will try to reinitialize the cluster layout
10181012 # and retry executing the command
1019- time .sleep (0.05 )
1013+ time .sleep (0.25 )
10201014 self .nodes_manager .initialize ()
10211015 raise e
10221016 except ResponseError as e :
@@ -1342,7 +1336,7 @@ def initialize(self):
13421336 raise RedisClusterException (
13431337 "Cluster mode is not enabled on this node"
13441338 )
1345- cluster_slots = r .execute_command ("CLUSTER SLOTS" )
1339+ cluster_slots = str_if_bytes ( r .execute_command ("CLUSTER SLOTS" ) )
13461340 startup_nodes_reachable = True
13471341 except (ConnectionError , TimeoutError ) as e :
13481342 msg = e .__str__
@@ -1631,29 +1625,28 @@ def get_redis_connection(self):
16311625 return self .node .redis_connection
16321626
16331627
1634- ERRORS_ALLOW_RETRY = (
1635- ConnectionError ,
1636- TimeoutError ,
1637- MovedError ,
1638- AskError ,
1639- TryAgainError ,
1640- )
1641-
1642-
16431628class ClusterPipeline (RedisCluster ):
16441629 """
16451630 Support for Redis pipeline
16461631 in cluster mode
16471632 """
16481633
1634+ ERRORS_ALLOW_RETRY = (
1635+ ConnectionError ,
1636+ TimeoutError ,
1637+ MovedError ,
1638+ AskError ,
1639+ TryAgainError ,
1640+ )
1641+
16491642 def __init__ (
16501643 self ,
16511644 nodes_manager ,
16521645 result_callbacks = None ,
16531646 cluster_response_callbacks = None ,
16541647 startup_nodes = None ,
16551648 read_from_replicas = False ,
1656- cluster_error_retry_attempts = 3 ,
1649+ cluster_error_retry_attempts = 5 ,
16571650 reinitialize_steps = 10 ,
16581651 ** kwargs ,
16591652 ):
@@ -1915,7 +1908,11 @@ def _send_cluster_commands(
19151908 # collect all the commands we are allowed to retry.
19161909 # (MOVED, ASK, or connection errors or timeout errors)
19171910 attempt = sorted (
1918- (c for c in attempt if isinstance (c .result , ERRORS_ALLOW_RETRY )),
1911+ (
1912+ c
1913+ for c in attempt
1914+ if isinstance (c .result , ClusterPipeline .ERRORS_ALLOW_RETRY )
1915+ ),
19191916 key = lambda x : x .position ,
19201917 )
19211918 if attempt and allow_redirections :
0 commit comments