@@ -184,25 +184,12 @@ def _execute_node_commands(self, n):
184184
185185 n .read ()
186186
187- def _send_cluster_commands (self , stack , raise_on_error = True , allow_redirections = True ):
188- """
189- Send a bunch of cluster commands to the redis cluster.
190-
191- `allow_redirections` If the pipeline should follow `ASK` & `MOVED` responses
192- automatically. If set to false it will raise RedisClusterException.
193- """
194- # the first time sending the commands we send all of the commands that were queued up.
195- # if we have to run through it again, we only retry the commands that failed.
196- attempt = sorted (stack , key = lambda x : x .position )
197-
198- # build a list of node objects based on node names we need to
187+ def _get_commands_by_node (self , cmds ):
199188 nodes = {}
200189 proxy_node_by_master = {}
201190 connection_by_node = {}
202191
203- # as we move through each command that still needs to be processed,
204- # we figure out the slot number that command maps to, then from the slot determine the node.
205- for c in attempt :
192+ for c in cmds :
206193 # refer to our internal node -> slot table that tells us where a given
207194 # command should route to.
208195 slot = self ._determine_slot (* c .args )
@@ -218,8 +205,6 @@ def _send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=
218205 # little hack to make sure the node name is populated. probably could clean this up.
219206 self .connection_pool .nodes .set_node_name (node )
220207
221- # now that we know the name of the node ( it's just a string in the form of host:port )
222- # we can build a list of commands for each node.
223208 node_name = node ['name' ]
224209 if node_name not in nodes :
225210 if node_name in connection_by_node :
@@ -231,6 +216,29 @@ def _send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=
231216
232217 nodes [node_name ].append (c )
233218
219+ return nodes
220+
221+ def _execute_single_command (self , cmd ):
222+ try :
223+ # send each command individually like we do in the main client.
224+ cmd .result = super (ClusterPipeline , self ).execute_command (* cmd .args , ** cmd .options )
225+ except RedisError as e :
226+ cmd .result = e
227+
228+ def _send_cluster_commands (self , stack , raise_on_error = True , allow_redirections = True ):
229+ """
230+ Send a bunch of cluster commands to the redis cluster.
231+
232+ `allow_redirections` If the pipeline should follow `ASK` & `MOVED` responses
233+ automatically. If set to false it will raise RedisClusterException.
234+ """
235+ # the first time sending the commands we send all of the commands that were queued up.
236+ # if we have to run through it again, we only retry the commands that failed.
237+ attempt = sorted (stack , key = lambda x : x .position )
238+
239+ # build a list of node objects based on node names we need to
240+ nodes = self ._get_commands_by_node (attempt )
241+
234242 # send the commands in sequence.
235243 # we write to all the open sockets for each node first, before reading anything
236244 # this allows us to flush all the requests out across the network essentially in parallel
@@ -280,14 +288,13 @@ def _send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=
280288 # flag to rebuild the slots table from scratch. So MOVED errors should
281289 # correct themselves fairly quickly.
282290
283- log .debug ("pipeline has failed commands: {}" .format (attempt ))
291+ log .debug ("pipeline has failed commands: {}" .format ([c .result for c in attempt ]))
292+
284293 self .connection_pool .nodes .increment_reinitialize_counter (len (attempt ))
294+ events = []
285295 for c in attempt :
286- try :
287- # send each command individually like we do in the main client.
288- c .result = super (ClusterPipeline , self ).execute_command (* c .args , ** c .options )
289- except RedisError as e :
290- c .result = e
296+ events .append (gevent .spawn (self ._execute_single_command , c ))
297+ gevent .joinall (events )
291298
292299 # turn the response back into a simple flat array that corresponds
293300 # to the sequence of commands issued in the stack in pipeline.execute()
0 commit comments