@@ -165,58 +165,13 @@ impl RpcClient {
165165 } )
166166 }
167167
168- #[ inline]
169- fn inner ( & self ) -> Arc < RpcClientInner > {
170- self . inner . clone ( )
171- }
172-
173- fn group_tasks_by_region < Task > (
174- & self ,
175- tasks : Vec < Task > ,
176- ) -> impl Future < Output = Result < GroupedTasks < Task > > >
177- where
178- Task : GroupingTask ,
179- {
180- let result: Option < GroupedTasks < Task > > = None ;
181- let inner = self . inner ( ) ;
182- loop_fn ( ( 0 , tasks, result) , move |( mut index, tasks, mut result) | {
183- if index == tasks. len ( ) {
184- future:: Either :: Left ( future:: ok ( Loop :: Break ( result) ) )
185- } else {
186- let inner = Arc :: clone ( & inner) ;
187- future:: Either :: Right ( inner. get_region ( tasks[ index] . key ( ) ) . map_ok (
188- move |location| {
189- while let Some ( item) = tasks. get ( index) {
190- if !location. contains ( item. key ( ) ) {
191- break ;
192- }
193- let ver_id = location. ver_id ( ) ;
194- let item = item. clone ( ) ;
195- if let Some ( ref mut grouped) = result {
196- grouped. add ( ver_id, item) ;
197- } else {
198- result = Some ( GroupedTasks :: new ( ver_id, item) ) ;
199- }
200- index += 1 ;
201- }
202- if index == tasks. len ( ) {
203- Loop :: Break ( result)
204- } else {
205- Loop :: Continue ( ( index, tasks, result) )
206- }
207- } ,
208- ) )
209- }
210- } )
211- . map_ok ( |r| r. unwrap_or_default ( ) )
212- }
213-
214168 pub fn raw_get (
215169 & self ,
216170 key : Key ,
217171 cf : Option < ColumnFamily > ,
218172 ) -> impl Future < Output = Result < Option < Value > > > {
219- self . inner ( )
173+ self . inner
174+ . clone ( )
220175 . raw ( & key)
221176 . and_then ( |context| context. client . raw_get ( context. region , cf, key) )
222177 . map_ok ( |value| if value. is_empty ( ) { None } else { Some ( value) } )
@@ -227,7 +182,7 @@ impl RpcClient {
227182 keys : Vec < Key > ,
228183 cf : Option < ColumnFamily > ,
229184 ) -> impl Future < Output = Result < Vec < KvPair > > > {
230- let inner = self . inner ( ) ;
185+ let inner = self . inner . clone ( ) ;
231186 self . group_tasks_by_region ( keys)
232187 . and_then ( move |task_groups| {
233188 let tasks: Vec < _ > = task_groups
@@ -260,7 +215,8 @@ impl RpcClient {
260215 future:: Either :: Left ( future:: err ( Error :: empty_value ( ) ) )
261216 } else {
262217 future:: Either :: Right (
263- self . inner ( )
218+ self . inner
219+ . clone ( )
264220 . raw ( & key)
265221 . and_then ( |context| context. client . raw_put ( context. region , cf, key, value) ) ,
266222 )
@@ -275,7 +231,7 @@ impl RpcClient {
275231 if pairs. iter ( ) . any ( |p| p. value ( ) . is_empty ( ) ) {
276232 future:: Either :: Left ( future:: err ( Error :: empty_value ( ) ) )
277233 } else {
278- let inner = self . inner ( ) ;
234+ let inner = self . inner . clone ( ) ;
279235 future:: Either :: Right (
280236 self . group_tasks_by_region ( pairs)
281237 . and_then ( move |task_groups| {
@@ -301,7 +257,8 @@ impl RpcClient {
301257 key : Key ,
302258 cf : Option < ColumnFamily > ,
303259 ) -> impl Future < Output = Result < ( ) > > {
304- self . inner ( )
260+ self . inner
261+ . clone ( )
305262 . raw ( & key)
306263 . and_then ( |context| context. client . raw_delete ( context. region , cf, key) )
307264 }
@@ -311,7 +268,7 @@ impl RpcClient {
311268 keys : Vec < Key > ,
312269 cf : Option < ColumnFamily > ,
313270 ) -> impl Future < Output = Result < ( ) > > {
314- let inner = self . inner ( ) ;
271+ let inner = self . inner . clone ( ) ;
315272 self . group_tasks_by_region ( keys)
316273 . and_then ( move |task_groups| {
317274 let tasks: Vec < _ > = task_groups
@@ -407,6 +364,47 @@ impl RpcClient {
407364 } ) )
408365 } )
409366 }
367+
368+ fn group_tasks_by_region < Task > (
369+ & self ,
370+ tasks : Vec < Task > ,
371+ ) -> impl Future < Output = Result < GroupedTasks < Task > > >
372+ where
373+ Task : GroupingTask ,
374+ {
375+ let result: Option < GroupedTasks < Task > > = None ;
376+ let inner = self . inner . clone ( ) ;
377+ loop_fn ( ( 0 , tasks, result) , move |( mut index, tasks, mut result) | {
378+ if index == tasks. len ( ) {
379+ future:: Either :: Left ( future:: ok ( Loop :: Break ( result) ) )
380+ } else {
381+ let inner = Arc :: clone ( & inner) ;
382+ future:: Either :: Right ( inner. get_region ( tasks[ index] . key ( ) ) . map_ok (
383+ move |location| {
384+ while let Some ( item) = tasks. get ( index) {
385+ if !location. contains ( item. key ( ) ) {
386+ break ;
387+ }
388+ let ver_id = location. ver_id ( ) ;
389+ let item = item. clone ( ) ;
390+ if let Some ( ref mut grouped) = result {
391+ grouped. add ( ver_id, item) ;
392+ } else {
393+ result = Some ( GroupedTasks :: new ( ver_id, item) ) ;
394+ }
395+ index += 1 ;
396+ }
397+ if index == tasks. len ( ) {
398+ Loop :: Break ( result)
399+ } else {
400+ Loop :: Continue ( ( index, tasks, result) )
401+ }
402+ } ,
403+ ) )
404+ }
405+ } )
406+ . map_ok ( |r| r. unwrap_or_default ( ) )
407+ }
410408}
411409
412410impl fmt:: Debug for RpcClient {
0 commit comments