@@ -25,7 +25,7 @@ use quickwit_common::pubsub::{EventBroker, EventSubscriber};
2525use quickwit_common:: { rate_limited_error, rate_limited_warn} ;
2626use quickwit_proto:: control_plane:: {
2727 ControlPlaneService , ControlPlaneServiceClient , GetOrCreateOpenShardsRequest ,
28- GetOrCreateOpenShardsSubrequest ,
28+ GetOrCreateOpenShardsSubrequest , GetOrCreateOpenShardsSuccess ,
2929} ;
3030use quickwit_proto:: indexing:: ShardPositionsUpdate ;
3131use quickwit_proto:: ingest:: ingester:: {
@@ -45,13 +45,14 @@ use tracing::{error, info};
4545
4646use super :: broadcast:: LocalShardsUpdate ;
4747use super :: debouncing:: {
48- DebouncedGetOrCreateOpenShardsRequest , GetOrCreateOpenShardsRequestDebouncer ,
48+ DebouncedGetOrCreateOpenShardsRequest , GetOrCreateOpenShardsErrorDebounced ,
49+ GetOrCreateOpenShardsRequestDebouncer , Rendezvous ,
4950} ;
5051use super :: ingester:: PERSIST_REQUEST_TIMEOUT ;
5152use super :: metrics:: IngestResultMetrics ;
5253use super :: routing_table:: { NextOpenShardError , RoutingTable } ;
53- use super :: workbench:: IngestWorkbench ;
54- use super :: { pending_subrequests , IngesterPool } ;
54+ use super :: workbench:: { pending_subrequests_for_attempt , IngestWorkbench } ;
55+ use super :: IngesterPool ;
5556use crate :: { get_ingest_router_buffer_size, LeaderId } ;
5657
5758/// Duration after which ingest requests time out with [`IngestV2Error::Timeout`].
@@ -155,7 +156,7 @@ impl IngestRouter {
155156 }
156157
157158 /// Inspects the shard table for each subrequest and returns the appropriate
158- /// [`GetOrCreateOpenShardsRequest`] request if open shards do not exist for all the them .
159+ /// [`GetOrCreateOpenShardsRequest`] request for those without open shards .
159160 async fn make_get_or_create_open_shard_request (
160161 & self ,
161162 workbench : & mut IngestWorkbench ,
@@ -169,7 +170,9 @@ impl IngestRouter {
169170
170171 let mut state_guard = self . state . lock ( ) . await ;
171172
172- for subrequest in pending_subrequests ( & workbench. subworkbenches ) {
173+ for subrequest in
174+ pending_subrequests_for_attempt ( & workbench. subworkbenches , workbench. num_attempts )
175+ {
173176 if !state_guard. routing_table . has_open_shards (
174177 & subrequest. index_id ,
175178 & subrequest. source_id ,
@@ -192,7 +195,7 @@ impl IngestRouter {
192195 debounced_request. push_subrequest ( subrequest, permit) ;
193196 }
194197 Err ( barrier) => {
195- debounced_request. push_barrier ( barrier) ;
198+ debounced_request. push_barrier ( subrequest . subrequest_id , barrier) ;
196199 }
197200 }
198201 }
@@ -219,28 +222,65 @@ impl IngestRouter {
219222 workbench : & mut IngestWorkbench ,
220223 debounced_request : DebouncedGetOrCreateOpenShardsRequest ,
221224 ) {
222- let ( request_opt, rendezvous) = debounced_request. take ( ) ;
225+ let ( request_opt, mut rendezvous) = debounced_request. take ( ) ;
226+
227+ let Some ( request) = request_opt else {
228+ return ;
229+ } ;
230+
231+ let successes = self
232+ . try_get_or_create_open_shards ( request, workbench, & mut rendezvous)
233+ . await ;
234+
235+ self . populate_routing_table ( successes) . await ;
223236
224- if let Some ( request) = request_opt {
225- self . populate_routing_table ( workbench, request) . await ;
237+ let rendezvouz_errors = rendezvous. wait ( ) . await ;
238+ for ( subrequest_id, error) in rendezvouz_errors. into_iter ( ) {
239+ match error {
240+ GetOrCreateOpenShardsErrorDebounced :: ControlPlaneError ( control_plane_error) => {
241+ workbench. record_get_or_create_open_shards_error (
242+ subrequest_id,
243+ & control_plane_error,
244+ ) ;
245+ }
246+ GetOrCreateOpenShardsErrorDebounced :: Failure ( failure) => {
247+ workbench. record_get_or_create_open_shards_failure ( failure) ;
248+ }
249+ }
226250 }
227- rendezvous. wait ( ) . await ;
228251 }
229252
230- /// Issues a [`GetOrCreateOpenShardsRequest`] request to the control plane and populates the
231- /// shard table according to the response received.
232- async fn populate_routing_table (
253+ /// Issues a [`GetOrCreateOpenShardsRequest`] request, returning the
254+ /// successful shard creations if any and recording the failures to the
255+ /// provided workbench and rendezvous
256+ async fn try_get_or_create_open_shards (
233257 & self ,
234- workbench : & mut IngestWorkbench ,
235258 request : GetOrCreateOpenShardsRequest ,
236- ) {
259+ workbench : & mut IngestWorkbench ,
260+ rendezvous : & mut Rendezvous ,
261+ ) -> Vec < GetOrCreateOpenShardsSuccess > {
237262 if request. subrequests . is_empty ( ) {
238- return ;
263+ return Vec :: new ( ) ;
239264 }
240- let response_result = self . control_plane . get_or_create_open_shards ( request) . await ;
265+ let response_result = self
266+ . control_plane
267+ . get_or_create_open_shards ( request. clone ( ) )
268+ . await ;
241269 let response = match response_result {
242270 Ok ( response) => response,
243271 Err ( control_plane_error) => {
272+ for subrequest in & request. subrequests {
273+ rendezvous. write_error (
274+ subrequest. subrequest_id ,
275+ GetOrCreateOpenShardsErrorDebounced :: ControlPlaneError (
276+ control_plane_error. clone ( ) ,
277+ ) ,
278+ ) ;
279+ workbench. record_get_or_create_open_shards_error (
280+ subrequest. subrequest_id ,
281+ & control_plane_error,
282+ ) ;
283+ }
244284 if workbench. is_last_attempt ( ) {
245285 rate_limited_error ! (
246286 limit_per_min = 10 ,
@@ -252,23 +292,34 @@ impl IngestRouter {
252292 "failed to get open shards from control plane: {control_plane_error}"
253293 ) ;
254294 } ;
255- return ;
295+ return Vec :: new ( ) ;
256296 }
257297 } ;
258- let mut state_guard = self . state . lock ( ) . await ;
259298
260- for success in response. successes {
299+ for failure in response. failures {
300+ rendezvous. write_error (
301+ failure. subrequest_id ,
302+ GetOrCreateOpenShardsErrorDebounced :: Failure ( failure. clone ( ) ) ,
303+ ) ;
304+ workbench. record_get_or_create_open_shards_failure ( failure) ;
305+ }
306+
307+ response. successes
308+ }
309+
310+ async fn populate_routing_table ( & self , successes : Vec < GetOrCreateOpenShardsSuccess > ) {
311+ if successes. is_empty ( ) {
312+ return ;
313+ }
314+
315+ let mut state_guard = self . state . lock ( ) . await ;
316+ for success in successes {
261317 state_guard. routing_table . replace_shards (
262318 success. index_uid ( ) . clone ( ) ,
263319 success. source_id ,
264320 success. open_shards ,
265321 ) ;
266322 }
267- drop ( state_guard) ;
268-
269- for failure in response. failures {
270- workbench. record_get_or_create_open_shards_failure ( failure) ;
271- }
272323 }
273324
274325 async fn process_persist_results (
@@ -375,7 +426,9 @@ impl IngestRouter {
375426 let rate_limited_shards: & HashSet < ShardId > = & workbench. rate_limited_shards ;
376427 let state_guard = self . state . lock ( ) . await ;
377428
378- for subrequest in pending_subrequests ( & workbench. subworkbenches ) {
429+ for subrequest in
430+ pending_subrequests_for_attempt ( & workbench. subworkbenches , workbench. num_attempts )
431+ {
379432 let next_open_shard_res_opt = state_guard
380433 . routing_table
381434 . find_entry ( & subrequest. index_id , & subrequest. source_id )
@@ -547,6 +600,7 @@ fn update_ingest_metrics(ingest_result: &IngestV2Result<IngestResponseV2>, num_s
547600 ingest_results_metrics. router_load_shedding . inc ( )
548601 }
549602 IngestFailureReason :: LoadShedding => ingest_results_metrics. load_shedding . inc ( ) ,
603+ IngestFailureReason :: Unavailable => ingest_results_metrics. unavailable . inc ( ) ,
550604 }
551605 }
552606 }
@@ -1026,9 +1080,15 @@ mod tests {
10261080 closed_shards : Vec :: new ( ) ,
10271081 unavailable_leaders : Vec :: new ( ) ,
10281082 } ;
1029- router
1030- . populate_routing_table ( & mut workbench, get_or_create_open_shards_request)
1083+ let successes = router
1084+ . try_get_or_create_open_shards (
1085+ get_or_create_open_shards_request,
1086+ & mut workbench,
1087+ & mut Rendezvous :: default ( ) ,
1088+ )
10311089 . await ;
1090+ assert_eq ! ( successes. len( ) , 2 ) ;
1091+ router. populate_routing_table ( successes) . await ;
10321092
10331093 let state_guard = router. state . lock ( ) . await ;
10341094 let routing_table = & state_guard. routing_table ;
@@ -1430,7 +1490,7 @@ mod tests {
14301490 let subworkbench = workbench. subworkbenches . get ( & 1 ) . unwrap ( ) ;
14311491 assert ! ( matches!(
14321492 subworkbench. last_failure_opt,
1433- Some ( SubworkbenchFailure :: Unavailable )
1493+ Some ( SubworkbenchFailure :: IngesterUnavailable )
14341494 ) ) ;
14351495 }
14361496
0 commit comments