Skip to content

Commit 28c417a

Browse files
replace tokio::spawn with tokio::select
1 parent 85a56b6 commit 28c417a

File tree

2 files changed

+160
-42
lines changed

2 files changed

+160
-42
lines changed

rust/cocoindex/src/builder/analyzer.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ use futures::{FutureExt, future::try_join_all};
1515
use tokio::time::Duration;
1616
use utils::fingerprint::Fingerprinter;
1717

18+
const TIMEOUT_THRESHOLD: u64 = 1800;
19+
const WARNING_THRESHOLD: u64 = 30;
20+
1821
#[derive(Debug)]
1922
pub(super) enum ValueTypeBuilder {
2023
Basic(BasicValueType),
@@ -814,9 +817,12 @@ impl AnalyzerContext {
814817
})?;
815818
let enable_cache = executor.enable_cache();
816819
let behavior_version = executor.behavior_version();
820+
// let timeout = executor.timeout()
821+
// .or(execution_options_timeout)
822+
// .or(Some(Duration::from_secs(300)));
817823
let timeout = executor.timeout()
818824
.or(execution_options_timeout)
819-
.or(Some(Duration::from_secs(300)));
825+
.or(Some(Duration::from_secs(TIMEOUT_THRESHOLD)));
820826
trace!("Finished building executor for transform op `{op_name}`, enable cache: {enable_cache}, behavior version: {behavior_version:?}");
821827
let function_exec_info = AnalyzedFunctionExecInfo {
822828
enable_cache,

rust/cocoindex/src/execution/evaluator.rs

Lines changed: 153 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ use utils::immutable::RefList;
1313

1414
use super::memoization::{EvaluationMemory, EvaluationMemoryOptions, evaluate_with_cell};
1515

16+
const TIMEOUT_THRESHOLD: u64 = 1800;
17+
const WARNING_THRESHOLD: u64 = 30;
18+
1619
#[derive(Debug)]
1720
pub struct ScopeValueBuilder {
1821
// TODO: Share the same lock for values produced in the same execution scope, for stricter atomicity.
@@ -379,25 +382,20 @@ async fn evaluate_op_scope(
379382
input_values.push(value?);
380383
}
381384

385+
// let timeout_duration = op
386+
// .function_exec_info
387+
// .timeout
388+
// .unwrap_or(Duration::from_secs(300));
389+
// let warn_duration = Duration::from_secs(30);
382390
let timeout_duration = op
383391
.function_exec_info
384392
.timeout
385-
.unwrap_or(Duration::from_secs(300));
386-
let warn_duration = Duration::from_secs(30);
393+
.unwrap_or(Duration::from_secs(TIMEOUT_THRESHOLD));
394+
let warn_duration = Duration::from_secs(WARNING_THRESHOLD);
387395

388396
let op_name_for_warning = op.name.clone();
389397
let op_kind_for_warning = op.op_kind.clone();
390-
let warn_handle = tokio::spawn(async move {
391-
tokio::time::sleep(warn_duration).await;
392-
eprintln!(
393-
"WARNING: Function '{}' ({}) is taking longer than 30s",
394-
op_kind_for_warning, op_name_for_warning
395-
);
396-
warn!(
397-
"Function '{}' ({}) is taking longer than 30s",
398-
op_kind_for_warning, op_name_for_warning
399-
);
400-
});
398+
401399
let result = if op.function_exec_info.enable_cache {
402400
let output_value_cell = memory.get_cache_entry(
403401
|| {
@@ -416,38 +414,152 @@ async fn evaluate_op_scope(
416414
op.executor.evaluate(input_values)
417415
});
418416

419-
let timeout_result = tokio::time::timeout(timeout_duration, eval_future).await;
420-
if timeout_result.is_err() {
421-
Err(anyhow!(
422-
"Function '{}' ({}) timed out after {} seconds",
423-
op.op_kind,
424-
op.name,
425-
timeout_duration.as_secs()
426-
))
427-
} else {
428-
timeout_result
429-
.unwrap()
430-
.and_then(|v| head_scope.define_field(&op.output, &v))
431-
}
417+
// Warning + timeout logic
418+
let mut eval_future = Box::pin(eval_future);
419+
let mut warned = false;
420+
let timeout_future = tokio::time::sleep(timeout_duration);
421+
tokio::pin!(timeout_future);
422+
423+
let res = loop {
424+
tokio::select! {
425+
res = &mut eval_future => {
426+
break Ok(res?);
427+
}
428+
_ = &mut timeout_future => {
429+
break Err(anyhow!(
430+
"Function '{}' ({}) timed out after {} seconds",
431+
op.op_kind, op.name, timeout_duration.as_secs()
432+
));
433+
}
434+
_ = tokio::time::sleep(warn_duration), if !warned => {
435+
eprintln!(
436+
"WARNING: Function '{}' ({}) is taking longer than 30s",
437+
op_kind_for_warning, op_name_for_warning
438+
);
439+
warn!(
440+
"Function '{}' ({}) is taking longer than {}s",
441+
op_kind_for_warning, op_name_for_warning, WARNING_THRESHOLD
442+
);
443+
warned = true;
444+
}
445+
}
446+
};
447+
448+
res.and_then(|v| head_scope.define_field(&op.output, &v))
432449
} else {
433450
let eval_future = op.executor.evaluate(input_values);
434451

435-
let timeout_result = tokio::time::timeout(timeout_duration, eval_future).await;
436-
if timeout_result.is_err() {
437-
Err(anyhow!(
438-
"Function '{}' ({}) timed out after {} seconds",
439-
op.op_kind,
440-
op.name,
441-
timeout_duration.as_secs()
442-
))
443-
} else {
444-
timeout_result
445-
.unwrap()
446-
.and_then(|v| head_scope.define_field(&op.output, &v))
447-
}
452+
// Warning + timeout logic
453+
let mut eval_future = Box::pin(eval_future);
454+
let mut warned = false;
455+
let timeout_future = tokio::time::sleep(timeout_duration);
456+
tokio::pin!(timeout_future);
457+
458+
let res = loop {
459+
tokio::select! {
460+
res = &mut eval_future => {
461+
break Ok(res?);
462+
}
463+
_ = &mut timeout_future => {
464+
break Err(anyhow!(
465+
"Function '{}' ({}) timed out after {} seconds",
466+
op.op_kind, op.name, timeout_duration.as_secs()
467+
));
468+
}
469+
_ = tokio::time::sleep(warn_duration), if !warned => {
470+
eprintln!(
471+
"WARNING: Function '{}' ({}) is taking longer than 30s",
472+
op_kind_for_warning, op_name_for_warning
473+
);
474+
warn!(
475+
"Function '{}' ({}) is taking longer than {}s",
476+
op_kind_for_warning, op_name_for_warning, WARNING_THRESHOLD
477+
);
478+
warned = true;
479+
}
480+
}
481+
};
482+
483+
res.and_then(|v| head_scope.define_field(&op.output, &v))
448484
};
449-
450-
warn_handle.abort();
485+
// let warn_handle = tokio::spawn(async move {
486+
// tokio::time::sleep(warn_duration).await;
487+
// eprintln!(
488+
// "WARNING: Function '{}' ({}) is taking longer than 30s",
489+
// op_kind_for_warning, op_name_for_warning
490+
// );
491+
// warn!(
492+
// "Function '{}' ({}) is taking longer than 30s",
493+
// op_kind_for_warning, op_name_for_warning
494+
// );
495+
// });
496+
497+
// let mut op_future = Box::pin(op.executor.evaluate(input_values));
498+
// let mut warned = false;
499+
// let warn_handle = loop{
500+
// tokio::select!{
501+
// res = &mut op_future => {
502+
// break res;
503+
// }
504+
// _ = tokio::time::sleep(warn_duration), if !warned => {
505+
// warn!(
506+
// "Function '{}' ({}) is taking longer than {}s",
507+
// op_kind_for_warning, op_name_for_warning, WARNING_THRESHOLD
508+
// );
509+
// warned = true;
510+
// }
511+
// }
512+
// };
513+
// let result = if op.function_exec_info.enable_cache {
514+
// let output_value_cell = memory.get_cache_entry(
515+
// || {
516+
// Ok(op
517+
// .function_exec_info
518+
// .fingerprinter
519+
// .clone()
520+
// .with(&input_values)?
521+
// .into_fingerprint())
522+
// },
523+
// &op.function_exec_info.output_type,
524+
// /*ttl=*/ None,
525+
// )?;
526+
527+
// let eval_future = evaluate_with_cell(output_value_cell.as_ref(), move || {
528+
// op.executor.evaluate(input_values)
529+
// });
530+
531+
// let timeout_result = tokio::time::timeout(timeout_duration, eval_future).await;
532+
// if timeout_result.is_err() {
533+
// Err(anyhow!(
534+
// "Function '{}' ({}) timed out after {} seconds",
535+
// op.op_kind,
536+
// op.name,
537+
// timeout_duration.as_secs()
538+
// ))
539+
// } else {
540+
// timeout_result
541+
// .unwrap()
542+
// .and_then(|v| head_scope.define_field(&op.output, &v))
543+
// }
544+
// } else {
545+
// let eval_future = op.executor.evaluate(input_values);
546+
547+
// let timeout_result = tokio::time::timeout(timeout_duration, eval_future).await;
548+
// if timeout_result.is_err() {
549+
// Err(anyhow!(
550+
// "Function '{}' ({}) timed out after {} seconds",
551+
// op.op_kind,
552+
// op.name,
553+
// timeout_duration.as_secs()
554+
// ))
555+
// } else {
556+
// timeout_result
557+
// .unwrap()
558+
// .and_then(|v| head_scope.define_field(&op.output, &v))
559+
// }
560+
// };
561+
562+
// warn_handle.abort();
451563

452564
if let Some(ref op_stats) = operation_in_process_stats {
453565
op_stats.finish_processing(&transform_key, 1);

0 commit comments

Comments
 (0)