Skip to content

Commit 8910825

Browse files
diptanu and Julio Martinez authored
added queue size config (#1497)
* added queue size config
* Move comments about queue size to struct field.

---------

Co-authored-by: Julio Martinez <julio@tensorflow.ai>
1 parent 1798006 commit 8910825

File tree

5 files changed: 33 additions and 16 deletions.

server/processor/src/graph_processor.rs

Lines changed: 14 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -24,10 +24,15 @@ pub struct GraphProcessor {
2424
pub task_cache: Arc<task_cache::TaskCache>,
2525
pub state_transition_latency: Histogram<f64>,
2626
pub processor_processing_latency: Histogram<f64>,
27+
pub queue_size: u32,
2728
}
2829

2930
impl GraphProcessor {
30-
pub fn new(indexify_state: Arc<IndexifyState>, task_cache: Arc<task_cache::TaskCache>) -> Self {
31+
pub fn new(
32+
indexify_state: Arc<IndexifyState>,
33+
task_cache: Arc<task_cache::TaskCache>,
34+
queue_size: u32,
35+
) -> Self {
3136
let meter = opentelemetry::global::meter("processor_metrics");
3237

3338
let processor_processing_latency = meter
@@ -49,6 +54,7 @@ impl GraphProcessor {
4954
task_cache,
5055
state_transition_latency,
5156
processor_processing_latency,
57+
queue_size,
5258
}
5359
}
5460

@@ -214,7 +220,7 @@ impl GraphProcessor {
214220

215221
scheduler_update.extend(self.task_cache.try_allocate(indexes.clone()));
216222

217-
scheduler_update.extend(task_allocator::allocate(indexes, clock)?);
223+
scheduler_update.extend(task_allocator::allocate(indexes, clock, self.queue_size)?);
218224
StateMachineUpdateRequest {
219225
payload: RequestPayload::SchedulerUpdate(Box::new(scheduler_update)),
220226
processed_state_changes: vec![state_change.clone()],
@@ -223,8 +229,12 @@ impl GraphProcessor {
223229
ChangeType::ExecutorUpserted(_) |
224230
ChangeType::ExecutorRemoved(_) |
225231
ChangeType::TombStoneExecutor(_) => {
226-
let scheduler_update =
227-
task_allocator::invoke(indexes, clock, &state_change.change_type)?;
232+
let scheduler_update = task_allocator::invoke(
233+
indexes,
234+
clock,
235+
&state_change.change_type,
236+
self.queue_size,
237+
)?;
228238
StateMachineUpdateRequest {
229239
payload: RequestPayload::SchedulerUpdate(Box::new(scheduler_update)),
230240
processed_state_changes: vec![state_change.clone()],

server/processor/src/task_allocator.rs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,30 +29,32 @@ use state_store::{
2929
};
3030
use tracing::{debug, error, info, info_span, warn};
3131

32-
// Maximum number of allocations per executor.
33-
//
34-
// In the future, this should be a dynamic value based on:
35-
// - function concurrency configuration
36-
// - function batching configuration
37-
// - function timeout configuration
38-
const MAX_ALLOCATIONS_PER_FN_EXECUTOR: usize = 20;
39-
4032
struct TaskAllocationProcessor<'a> {
4133
in_memory_state: &'a mut InMemoryState,
4234
clock: u64,
35+
36+
// Maximum number of allocations per executor.
37+
//
38+
// In the future, this should be a dynamic value based on:
39+
// - function concurrency configuration
40+
// - function batching configuration
41+
// - function timeout configuration
42+
queue_size: u32,
4343
}
4444

4545
#[tracing::instrument(skip(in_memory_state, clock, change))]
4646
pub fn invoke(
4747
in_memory_state: Arc<RwLock<InMemoryState>>,
4848
clock: u64,
4949
change: &ChangeType,
50+
queue_size: u32,
5051
) -> Result<SchedulerUpdateRequest> {
5152
let mut in_memory_state = in_memory_state.write().unwrap();
5253

5354
let mut task_allocator = TaskAllocationProcessor {
5455
in_memory_state: &mut in_memory_state.deref_mut(),
5556
clock,
57+
queue_size,
5658
};
5759

5860
task_allocator.invoke(change)
@@ -64,12 +66,14 @@ pub fn invoke(
6466
pub fn allocate(
6567
in_memory_state: Arc<RwLock<InMemoryState>>,
6668
clock: u64,
69+
queue_size: u32,
6770
) -> Result<SchedulerUpdateRequest> {
6871
let mut in_memory_state = in_memory_state.write().unwrap();
6972

7073
let mut task_allocator = TaskAllocationProcessor {
7174
in_memory_state: &mut in_memory_state.deref_mut(),
7275
clock,
76+
queue_size,
7377
};
7478

7579
task_allocator.allocate()
@@ -255,7 +259,7 @@ impl<'a> TaskAllocationProcessor<'a> {
255259
let mut update = SchedulerUpdateRequest::default();
256260
let mut function_executors = self
257261
.in_memory_state
258-
.candidate_function_executors(task, MAX_ALLOCATIONS_PER_FN_EXECUTOR)?;
262+
.candidate_function_executors(task, self.queue_size)?;
259263
if function_executors.function_executors.is_empty() &&
260264
function_executors.num_pending_function_executors == 0
261265
{
@@ -269,7 +273,7 @@ impl<'a> TaskAllocationProcessor<'a> {
269273
)?;
270274
function_executors = self
271275
.in_memory_state
272-
.candidate_function_executors(task, MAX_ALLOCATIONS_PER_FN_EXECUTOR)?;
276+
.candidate_function_executors(task, self.queue_size)?;
273277
}
274278
info!(
275279
num_function_executors = function_executors.function_executors.len(),

server/src/config.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub struct ServerConfig {
1717
pub blob_storage: BlobStorageConfig,
1818
pub tracing: TracingConfig,
1919
pub executor: ExecutorConfig,
20+
pub queue_size: u32,
2021
}
2122

2223
impl Default for ServerConfig {
@@ -30,6 +31,7 @@ impl Default for ServerConfig {
3031
blob_storage: Default::default(),
3132
tracing: TracingConfig::default(),
3233
executor: ExecutorConfig::default(),
34+
queue_size: 20,
3335
}
3436
}
3537
}

server/src/service.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ impl Service {
8080
let graph_processor = Arc::new(GraphProcessor::new(
8181
indexify_state.clone(),
8282
task_cache.clone(),
83+
config.queue_size,
8384
));
8485
Ok(Self {
8586
config,

server/state_store/src/in_memory_state.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1017,7 +1017,7 @@ impl InMemoryState {
10171017
pub fn candidate_function_executors(
10181018
&self,
10191019
task: &Task,
1020-
capacity_threshold: usize,
1020+
capacity_threshold: u32,
10211021
) -> Result<CandidateFunctionExecutors> {
10221022
let mut candidates = Vec::new();
10231023
let fn_uri = FunctionURI::from(task);
@@ -1043,7 +1043,7 @@ impl InMemoryState {
10431043
.and_then(|alloc_map| alloc_map.get(&function_executor.function_executor.id))
10441044
.map(|allocs| allocs.len())
10451045
.unwrap_or(0);
1046-
if allocation_count < capacity_threshold {
1046+
if (allocation_count as u32) < capacity_threshold {
10471047
candidates.push(function_executor.clone());
10481048
}
10491049
}

0 commit comments

Comments
 (0)