@@ -216,12 +216,14 @@ fn attempt_place_unassigned_shards(
216216) -> Result < SchedulingSolution , NotEnoughCapacity > {
217217 let mut solution = partial_solution. clone ( ) ;
218218 for source in unassigned_shards {
219- let indexers_with_most_available_capacity =
220- compute_indexer_available_capacity ( problem, & solution)
221- . sorted_by_key ( |( indexer_ord, capacity) | Reverse ( ( * capacity, * indexer_ord) ) ) ;
219+ let mut indexers_with_most_available_capacity =
220+ compute_indexer_available_capacity ( problem, & solution) . collect_vec ( ) ;
221+ indexers_with_most_available_capacity
222+ . sort_by_key ( |( indexer_ord, capacity) | Reverse ( ( * capacity, * indexer_ord) ) ) ;
222223 place_unassigned_shards_single_source (
223224 source,
224225 indexers_with_most_available_capacity,
226+ problem. unscaled_indexer_cpu_capacities ( ) ,
225227 & mut solution,
226228 ) ?;
227229 }
@@ -241,7 +243,7 @@ fn place_unassigned_shards_with_affinity(
241243 for source in & unassigned_shards {
242244 // List of indexer with a non-null affinity and some available capacity, sorted by
243245 // (affinity, available capacity) in that order.
244- let indexers_with_affinity_and_available_capacity = source
246+ let indexers_with_available_capacity = source
245247 . affinities
246248 . iter ( )
247249 . filter ( |& ( _, & affinity) | affinity != 0u32 )
@@ -254,10 +256,12 @@ fn place_unassigned_shards_with_affinity(
254256 . sorted_by_key ( |( indexer_ord, affinity, capacity) | {
255257 Reverse ( ( * affinity, * capacity, * indexer_ord) )
256258 } )
257- . map ( |( indexer_ord, _, capacity) | ( indexer_ord, capacity) ) ;
259+ . map ( |( indexer_ord, _, capacity) | ( indexer_ord, capacity) )
260+ . collect_vec ( ) ;
258261 let _ = place_unassigned_shards_single_source (
259262 source,
260- indexers_with_affinity_and_available_capacity,
263+ indexers_with_available_capacity,
264+ problem. unscaled_indexer_cpu_capacities ( ) ,
261265 solution,
262266 ) ;
263267 }
@@ -346,26 +350,73 @@ struct NotEnoughCapacity;
346350/// amongst the node with their given node capacity.
347351fn place_unassigned_shards_single_source (
348352 source : & Source ,
349- mut indexer_with_capacities : impl Iterator < Item = ( IndexerOrd , CpuCapacity ) > ,
353+ mut indexer_with_capacities : Vec < ( IndexerOrd , CpuCapacity ) > ,
354+ unscaled_capacities : & [ CpuCapacity ] ,
350355 solution : & mut SchedulingSolution ,
351356) -> Result < ( ) , NotEnoughCapacity > {
352357 let mut num_shards = source. num_shards ;
353- while num_shards > 0 {
354- let Some ( ( indexer_ord, available_capacity) ) = indexer_with_capacities. next ( ) else {
355- return Err ( NotEnoughCapacity ) ;
356- } ;
357- let num_placable_shards = available_capacity. cpu_millis ( ) / source. load_per_shard ;
358- let num_shards_to_place = num_placable_shards. min ( num_shards) ;
358+ let mut previous_num_shards = u32:: MAX ;
359+ while previous_num_shards > num_shards {
360+ previous_num_shards = num_shards;
361+ let indexer_with_capacities_iter = indexer_with_capacities
362+ . iter_mut ( )
363+ . map ( |( indexer_ord, available_capacity) | ( * indexer_ord, available_capacity) ) ;
364+ place_unassigned_shards_single_source_iteration (
365+ source,
366+ & mut num_shards,
367+ indexer_with_capacities_iter,
368+ unscaled_capacities,
369+ solution,
370+ ) ;
371+ if num_shards == 0 {
372+ // All shards have been placed.
373+ return Ok ( ( ) ) ;
374+ }
375+ }
376+ // Last placement iteration didn't make progress,
377+ // we won't be able to place the remaining shards
378+ Err ( NotEnoughCapacity )
379+ }
380+
381+ /// Places as many shards as possible to indexers while respecting both the the
382+ /// remaining scaled node capacities and the original unscaled node capacities.
383+ fn place_unassigned_shards_single_source_iteration < ' a > (
384+ source : & Source ,
385+ remaining_shards_to_place : & mut u32 ,
386+ indexer_with_capacities : impl Iterator < Item = ( IndexerOrd , & ' a mut CpuCapacity ) > ,
387+ unscaled_capacities : & [ CpuCapacity ] ,
388+ solution : & mut SchedulingSolution ,
389+ ) {
390+ for ( indexer_ord, available_capacity) in indexer_with_capacities {
391+ if * remaining_shards_to_place == 0 {
392+ return ;
393+ }
394+ let num_placable_shards_into_scaled_capacity =
395+ available_capacity. cpu_millis ( ) / source. load_per_shard ;
396+
397+ // We limit each node's shard allocation per iteration to what fits in
398+ // its original capacity. This introduces a behavior that distributes
399+ // shards more evenly accross nodes when the system capacity is
400+ // over-subscribed. If the shard's load doesn't fit into the original
401+ // capacity, we still allow one shard to be placed.
402+ let num_placable_shards_into_original_capacity =
403+ ( unscaled_capacities[ indexer_ord] . cpu_millis ( ) / source. load_per_shard ) . max ( 1 ) ;
404+
405+ let num_shards_to_place = num_placable_shards_into_scaled_capacity
406+ . min ( num_placable_shards_into_original_capacity)
407+ . min ( * remaining_shards_to_place) ;
408+
359409 // Update the solution, the shard load, and the number of shards to place.
360410 if num_shards_to_place == 0u32 {
361411 // No need to fill indexer_assignments with empty assignments.
362412 continue ;
363413 }
364414 solution. indexer_assignments [ indexer_ord]
365415 . add_shards ( source. source_ord , num_shards_to_place) ;
366- num_shards -= num_shards_to_place;
416+ * remaining_shards_to_place -= num_shards_to_place;
417+ * available_capacity = * available_capacity
418+ - CpuCapacity :: from_cpu_millis ( num_shards_to_place * source. load_per_shard . get ( ) ) ;
367419 }
368- Ok ( ( ) )
369420}
370421
371422/// Compute the sources/shards that have not been assigned to any indexer yet.
@@ -419,7 +470,7 @@ mod tests {
419470 use std:: num:: NonZeroU32 ;
420471
421472 use proptest:: prelude:: * ;
422- use quickwit_proto:: indexing:: mcpu;
473+ use quickwit_proto:: indexing:: { PIPELINE_FULL_CAPACITY , mcpu} ;
423474
424475 use super :: * ;
425476
@@ -783,4 +834,28 @@ mod tests {
783834 solve( problem, solution) ;
784835 }
785836 }
837+
838+ #[ test]
839+ fn test_oversubscribing_sources_get_balanced ( ) {
840+ let mut problem: SchedulingProblem = SchedulingProblem :: with_indexer_cpu_capacities ( vec ! [
841+ mcpu( 8000 ) ,
842+ mcpu( 8000 ) ,
843+ mcpu( 8000 ) ,
844+ mcpu( 8000 ) ,
845+ ] ) ;
846+ for _ in 0 ..12 {
847+ problem. add_source (
848+ 4 ,
849+ NonZeroU32 :: new ( PIPELINE_FULL_CAPACITY . cpu_millis ( ) ) . unwrap ( ) ,
850+ ) ;
851+ }
852+
853+ let old_solution = problem. new_solution ( ) ;
854+ let solution = solve ( problem, old_solution) ;
855+ for assignement in & solution. indexer_assignments {
856+ for & num_shards in assignement. num_shards_per_source . values ( ) {
857+ assert_eq ! ( num_shards, 2 ) ;
858+ }
859+ }
860+ }
786861}
0 commit comments