Skip to content

Commit eff2af0

Browse files
committed
Codestyle changes
1 parent 97d2b3c commit eff2af0

File tree

9 files changed

+732
-292
lines changed

9 files changed

+732
-292
lines changed

redis/_parsers/commands.py

Lines changed: 115 additions & 78 deletions
Large diffs are not rendered by default.

redis/asyncio/cluster.py

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -437,8 +437,12 @@ def __init__(
437437
SLOT_ID: RequestPolicy.DEFAULT_KEYED,
438438
}
439439

440-
self._policies_callback_mapping: dict[Union[RequestPolicy, ResponsePolicy], Callable] = {
441-
RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [self.get_random_primary_or_all_nodes(command_name)],
440+
self._policies_callback_mapping: dict[
441+
Union[RequestPolicy, ResponsePolicy], Callable
442+
] = {
443+
RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [
444+
self.get_random_primary_or_all_nodes(command_name)
445+
],
442446
RequestPolicy.DEFAULT_KEYED: self.get_nodes_from_slot,
443447
RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()],
444448
RequestPolicy.ALL_SHARDS: self.get_primaries,
@@ -680,7 +684,9 @@ def get_special_nodes(self) -> Optional[list["ClusterNode"]]:
680684
Returns a list of nodes for commands with a special policy.
681685
"""
682686
if not self._aggregate_nodes:
683-
raise RedisClusterException('Cannot execute FT.CURSOR commands without FT.AGGREGATE')
687+
raise RedisClusterException(
688+
"Cannot execute FT.CURSOR commands without FT.AGGREGATE"
689+
)
684690

685691
return self._aggregate_nodes
686692

@@ -708,7 +714,11 @@ def set_response_callback(self, command: str, callback: ResponseCallbackT) -> No
708714
self.response_callbacks[command] = callback
709715

710716
async def _determine_nodes(
711-
self, command: str, *args: Any, request_policy: RequestPolicy, node_flag: Optional[str] = None
717+
self,
718+
command: str,
719+
*args: Any,
720+
request_policy: RequestPolicy,
721+
node_flag: Optional[str] = None,
712722
) -> List["ClusterNode"]:
713723
# Determine which nodes should be executed the command on.
714724
# Returns a list of target nodes.
@@ -855,7 +865,9 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any:
855865
)
856866
else:
857867
if command_flag in self._command_flags_mapping:
858-
command_policies = CommandPolicies(request_policy=self._command_flags_mapping[command_flag])
868+
command_policies = CommandPolicies(
869+
request_policy=self._command_flags_mapping[command_flag]
870+
)
859871
else:
860872
command_policies = CommandPolicies()
861873
elif not command_policies and target_nodes_specified:
@@ -876,7 +888,9 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any:
876888
if not target_nodes_specified:
877889
# Determine the nodes to execute the command on
878890
target_nodes = await self._determine_nodes(
879-
*args, request_policy=command_policies.request_policy, node_flag=passed_targets
891+
*args,
892+
request_policy=command_policies.request_policy,
893+
node_flag=passed_targets,
880894
)
881895
if not target_nodes:
882896
raise RedisClusterException(
@@ -890,7 +904,9 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any:
890904
ret = self.result_callbacks[command](
891905
command, {target_nodes[0].name: ret}, **kwargs
892906
)
893-
return self._policies_callback_mapping[command_policies.response_policy](ret)
907+
return self._policies_callback_mapping[
908+
command_policies.response_policy
909+
](ret)
894910
else:
895911
keys = [node.name for node in target_nodes]
896912
values = await asyncio.gather(
@@ -905,7 +921,9 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any:
905921
return self.result_callbacks[command](
906922
command, dict(zip(keys, values)), **kwargs
907923
)
908-
return self._policies_callback_mapping[command_policies.response_policy](dict(zip(keys, values)))
924+
return self._policies_callback_mapping[
925+
command_policies.response_policy
926+
](dict(zip(keys, values)))
909927
except Exception as e:
910928
if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
911929
# The nodes and slots cache were should be reinitialized.
@@ -2062,7 +2080,9 @@ async def _execute(
20622080
nodes = {}
20632081
for cmd in todo:
20642082
passed_targets = cmd.kwargs.pop("target_nodes", None)
2065-
command_policies = await client._policy_resolver.resolve(cmd.args[0].lower())
2083+
command_policies = await client._policy_resolver.resolve(
2084+
cmd.args[0].lower()
2085+
)
20662086

20672087
if passed_targets and not client._is_node_flag(passed_targets):
20682088
target_nodes = client._parse_target_nodes(passed_targets)
@@ -2088,12 +2108,17 @@ async def _execute(
20882108
else:
20892109
if command_flag in client._command_flags_mapping:
20902110
command_policies = CommandPolicies(
2091-
request_policy=client._command_flags_mapping[command_flag])
2111+
request_policy=client._command_flags_mapping[
2112+
command_flag
2113+
]
2114+
)
20922115
else:
20932116
command_policies = CommandPolicies()
20942117

20952118
target_nodes = await client._determine_nodes(
2096-
*cmd.args, request_policy=command_policies.request_policy, node_flag=passed_targets
2119+
*cmd.args,
2120+
request_policy=command_policies.request_policy,
2121+
node_flag=passed_targets,
20972122
)
20982123
if not target_nodes:
20992124
raise RedisClusterException(
@@ -2120,11 +2145,9 @@ async def _execute(
21202145
for cmd in todo:
21212146
if isinstance(cmd.result, (TryAgainError, MovedError, AskError)):
21222147
try:
2123-
cmd.result = client._policies_callback_mapping[cmd.command_policies.response_policy](
2124-
await client.execute_command(
2125-
*cmd.args, **cmd.kwargs
2126-
)
2127-
)
2148+
cmd.result = client._policies_callback_mapping[
2149+
cmd.command_policies.response_policy
2150+
](await client.execute_command(*cmd.args, **cmd.kwargs))
21282151
except Exception as e:
21292152
cmd.result = e
21302153

redis/cluster.py

Lines changed: 62 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -727,14 +727,20 @@ def __init__(
727727
SLOT_ID: RequestPolicy.DEFAULT_KEYED,
728728
}
729729

730-
self._policies_callback_mapping: dict[Union[RequestPolicy, ResponsePolicy], Callable] = {
731-
RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [self.get_random_primary_or_all_nodes(command_name)],
732-
RequestPolicy.DEFAULT_KEYED: lambda command, *args: self.get_nodes_from_slot(command, *args),
730+
self._policies_callback_mapping: dict[
731+
Union[RequestPolicy, ResponsePolicy], Callable
732+
] = {
733+
RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [
734+
self.get_random_primary_or_all_nodes(command_name)
735+
],
736+
RequestPolicy.DEFAULT_KEYED: lambda command,
737+
*args: self.get_nodes_from_slot(command, *args),
733738
RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()],
734739
RequestPolicy.ALL_SHARDS: self.get_primaries,
735740
RequestPolicy.ALL_NODES: self.get_nodes,
736741
RequestPolicy.ALL_REPLICAS: self.get_replicas,
737-
RequestPolicy.MULTI_SHARD: lambda *args, **kwargs: self._split_multi_shard_command(*args, **kwargs),
742+
RequestPolicy.MULTI_SHARD: lambda *args,
743+
**kwargs: self._split_multi_shard_command(*args, **kwargs),
738744
RequestPolicy.SPECIAL: self.get_special_nodes,
739745
ResponsePolicy.DEFAULT_KEYLESS: lambda res: res,
740746
ResponsePolicy.DEFAULT_KEYED: lambda res: res,
@@ -866,10 +872,12 @@ def _split_multi_shard_command(self, *args, **kwargs) -> list[dict]:
866872
commands = []
867873

868874
for key in keys:
869-
commands.append({
870-
'args': (args[0], key),
871-
'kwargs': kwargs,
872-
})
875+
commands.append(
876+
{
877+
"args": (args[0], key),
878+
"kwargs": kwargs,
879+
}
880+
)
873881

874882
return commands
875883

@@ -878,7 +886,9 @@ def get_special_nodes(self) -> Optional[list["ClusterNode"]]:
878886
Returns a list of nodes for commands with a special policy.
879887
"""
880888
if not self._aggregate_nodes:
881-
raise RedisClusterException('Cannot execute FT.CURSOR commands without FT.AGGREGATE')
889+
raise RedisClusterException(
890+
"Cannot execute FT.CURSOR commands without FT.AGGREGATE"
891+
)
882892

883893
return self._aggregate_nodes
884894

@@ -912,7 +922,6 @@ def _evaluate_all_succeeded(self, res):
912922

913923
return first_successful_response
914924

915-
916925
def set_default_node(self, node):
917926
"""
918927
Set the default node of the cluster.
@@ -1062,7 +1071,9 @@ def set_response_callback(self, command, callback):
10621071
"""Set a custom Response Callback"""
10631072
self.cluster_response_callbacks[command] = callback
10641073

1065-
def _determine_nodes(self, *args, request_policy: RequestPolicy, **kwargs) -> List["ClusterNode"]:
1074+
def _determine_nodes(
1075+
self, *args, request_policy: RequestPolicy, **kwargs
1076+
) -> List["ClusterNode"]:
10661077
"""
10671078
Determines a nodes the command should be executed on.
10681079
"""
@@ -1274,7 +1285,9 @@ def _internal_execute_command(self, *args, **kwargs):
12741285
)
12751286
else:
12761287
if command_flag in self._command_flags_mapping:
1277-
command_policies = CommandPolicies(request_policy=self._command_flags_mapping[command_flag])
1288+
command_policies = CommandPolicies(
1289+
request_policy=self._command_flags_mapping[command_flag]
1290+
)
12781291
else:
12791292
command_policies = CommandPolicies()
12801293
elif not command_policies and target_nodes_specified:
@@ -1297,7 +1310,9 @@ def _internal_execute_command(self, *args, **kwargs):
12971310
if not target_nodes_specified:
12981311
# Determine the nodes to execute the command on
12991312
target_nodes = self._determine_nodes(
1300-
*args, request_policy=command_policies.request_policy, nodes_flag=passed_targets
1313+
*args,
1314+
request_policy=command_policies.request_policy,
1315+
nodes_flag=passed_targets,
13011316
)
13021317
if not target_nodes:
13031318
raise RedisClusterException(
@@ -1315,7 +1330,12 @@ def _internal_execute_command(self, *args, **kwargs):
13151330
break
13161331

13171332
# Return the processed result
1318-
return self._process_result(args[0], res, response_policy=command_policies.response_policy, **kwargs)
1333+
return self._process_result(
1334+
args[0],
1335+
res,
1336+
response_policy=command_policies.response_policy,
1337+
**kwargs,
1338+
)
13191339
except Exception as e:
13201340
if retry_attempts > 0 and type(e) in self.__class__.ERRORS_ALLOW_RETRY:
13211341
if is_default_node:
@@ -2348,14 +2368,20 @@ def __init__(
23482368
SLOT_ID: RequestPolicy.DEFAULT_KEYED,
23492369
}
23502370

2351-
self._policies_callback_mapping: dict[Union[RequestPolicy, ResponsePolicy], Callable] = {
2352-
RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [self.get_random_primary_or_all_nodes(command_name)],
2353-
RequestPolicy.DEFAULT_KEYED: lambda command, *args: self.get_nodes_from_slot(command, *args),
2371+
self._policies_callback_mapping: dict[
2372+
Union[RequestPolicy, ResponsePolicy], Callable
2373+
] = {
2374+
RequestPolicy.DEFAULT_KEYLESS: lambda command_name: [
2375+
self.get_random_primary_or_all_nodes(command_name)
2376+
],
2377+
RequestPolicy.DEFAULT_KEYED: lambda command,
2378+
*args: self.get_nodes_from_slot(command, *args),
23542379
RequestPolicy.DEFAULT_NODE: lambda: [self.get_default_node()],
23552380
RequestPolicy.ALL_SHARDS: self.get_primaries,
23562381
RequestPolicy.ALL_NODES: self.get_nodes,
23572382
RequestPolicy.ALL_REPLICAS: self.get_replicas,
2358-
RequestPolicy.MULTI_SHARD: lambda *args, **kwargs: self._split_multi_shard_command(*args, **kwargs),
2383+
RequestPolicy.MULTI_SHARD: lambda *args,
2384+
**kwargs: self._split_multi_shard_command(*args, **kwargs),
23592385
RequestPolicy.SPECIAL: self.get_special_nodes,
23602386
ResponsePolicy.DEFAULT_KEYLESS: lambda res: res,
23612387
ResponsePolicy.DEFAULT_KEYED: lambda res: res,
@@ -2959,7 +2985,11 @@ def _send_cluster_commands(
29592985
else:
29602986
if not command_policies:
29612987
command = c.args[0].upper()
2962-
if len(c.args) >= 2 and f"{c.args[0]} {c.args[1]}".upper() in self._pipe.command_flags:
2988+
if (
2989+
len(c.args) >= 2
2990+
and f"{c.args[0]} {c.args[1]}".upper()
2991+
in self._pipe.command_flags
2992+
):
29632993
command = f"{c.args[0]} {c.args[1]}".upper()
29642994

29652995
# We only could resolve key properties if command is not
@@ -2981,12 +3011,17 @@ def _send_cluster_commands(
29813011
else:
29823012
if command_flag in self._pipe._command_flags_mapping:
29833013
command_policies = CommandPolicies(
2984-
request_policy=self._pipe._command_flags_mapping[command_flag])
3014+
request_policy=self._pipe._command_flags_mapping[
3015+
command_flag
3016+
]
3017+
)
29853018
else:
29863019
command_policies = CommandPolicies()
29873020

29883021
target_nodes = self._determine_nodes(
2989-
*c.args, request_policy=command_policies.request_policy, node_flag=passed_targets
3022+
*c.args,
3023+
request_policy=command_policies.request_policy,
3024+
node_flag=passed_targets,
29903025
)
29913026
if not target_nodes:
29923027
raise RedisClusterException(
@@ -3117,7 +3152,9 @@ def _send_cluster_commands(
31173152
if c.args[0] in self._pipe.cluster_response_callbacks:
31183153
# Remove keys entry, it needs only for cache.
31193154
c.options.pop("keys", None)
3120-
c.result = self._pipe._policies_callback_mapping[c.command_policies.response_policy](
3155+
c.result = self._pipe._policies_callback_mapping[
3156+
c.command_policies.response_policy
3157+
](
31213158
self._pipe.cluster_response_callbacks[c.args[0]](
31223159
c.result, **c.options
31233160
)
@@ -3152,7 +3189,9 @@ def _parse_target_nodes(self, target_nodes):
31523189
)
31533190
return nodes
31543191

3155-
def _determine_nodes(self, *args, request_policy: RequestPolicy, **kwargs) -> List["ClusterNode"]:
3192+
def _determine_nodes(
3193+
self, *args, request_policy: RequestPolicy, **kwargs
3194+
) -> List["ClusterNode"]:
31563195
# Determine which nodes should be executed the command on.
31573196
# Returns a list of target nodes.
31583197
command = args[0].upper()

0 commit comments

Comments
 (0)