Skip to content

Commit 91b2f1a

Browse files
replace tokio::spawn with tokio::select
1 parent 8f1a107 commit 91b2f1a

File tree

2 files changed

+160
-42
lines changed

2 files changed

+160
-42
lines changed

src/builder/analyzer.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ use futures::future::{BoxFuture, try_join3};
1515
use futures::{FutureExt, future::try_join_all};
1616
use tokio::time::Duration;
1717

18+
const TIMEOUT_THRESHOLD: u64 = 1800;
19+
const WARNING_THRESHOLD: u64 = 30;
20+
1821
#[derive(Debug)]
1922
pub(super) enum ValueTypeBuilder {
2023
Basic(BasicValueType),
@@ -737,9 +740,12 @@ impl AnalyzerContext {
737740
})?;
738741
let enable_cache = executor.enable_cache();
739742
let behavior_version = executor.behavior_version();
743+
// let timeout = executor.timeout()
744+
// .or(execution_options_timeout)
745+
// .or(Some(Duration::from_secs(300)));
740746
let timeout = executor.timeout()
741747
.or(execution_options_timeout)
742-
.or(Some(Duration::from_secs(300)));
748+
.or(Some(Duration::from_secs(TIMEOUT_THRESHOLD)));
743749
trace!("Finished building executor for transform op `{op_name}`, enable cache: {enable_cache}, behavior version: {behavior_version:?}");
744750
let function_exec_info = AnalyzedFunctionExecInfo {
745751
enable_cache,

src/execution/evaluator.rs

Lines changed: 153 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ use crate::{
1515

1616
use super::memoization::{EvaluationMemory, EvaluationMemoryOptions, evaluate_with_cell};
1717

18+
const TIMEOUT_THRESHOLD: u64 = 1800;
19+
const WARNING_THRESHOLD: u64 = 30;
20+
1821
#[derive(Debug)]
1922
pub struct ScopeValueBuilder {
2023
// TODO: Share the same lock for values produced in the same execution scope, for stricter atomicity.
@@ -381,25 +384,20 @@ async fn evaluate_op_scope(
381384
input_values.push(value?);
382385
}
383386

387+
// let timeout_duration = op
388+
// .function_exec_info
389+
// .timeout
390+
// .unwrap_or(Duration::from_secs(300));
391+
// let warn_duration = Duration::from_secs(30);
384392
let timeout_duration = op
385393
.function_exec_info
386394
.timeout
387-
.unwrap_or(Duration::from_secs(300));
388-
let warn_duration = Duration::from_secs(30);
395+
.unwrap_or(Duration::from_secs(TIMEOUT_THRESHOLD));
396+
let warn_duration = Duration::from_secs(WARNING_THRESHOLD);
389397

390398
let op_name_for_warning = op.name.clone();
391399
let op_kind_for_warning = op.op_kind.clone();
392-
let warn_handle = tokio::spawn(async move {
393-
tokio::time::sleep(warn_duration).await;
394-
eprintln!(
395-
"WARNING: Function '{}' ({}) is taking longer than 30s",
396-
op_kind_for_warning, op_name_for_warning
397-
);
398-
warn!(
399-
"Function '{}' ({}) is taking longer than 30s",
400-
op_kind_for_warning, op_name_for_warning
401-
);
402-
});
400+
403401
let result = if op.function_exec_info.enable_cache {
404402
let output_value_cell = memory.get_cache_entry(
405403
|| {
@@ -418,38 +416,152 @@ async fn evaluate_op_scope(
418416
op.executor.evaluate(input_values)
419417
});
420418

421-
let timeout_result = tokio::time::timeout(timeout_duration, eval_future).await;
422-
if timeout_result.is_err() {
423-
Err(anyhow!(
424-
"Function '{}' ({}) timed out after {} seconds",
425-
op.op_kind,
426-
op.name,
427-
timeout_duration.as_secs()
428-
))
429-
} else {
430-
timeout_result
431-
.unwrap()
432-
.and_then(|v| head_scope.define_field(&op.output, &v))
433-
}
419+
// Warning + timeout logic
420+
let mut eval_future = Box::pin(eval_future);
421+
let mut warned = false;
422+
let timeout_future = tokio::time::sleep(timeout_duration);
423+
tokio::pin!(timeout_future);
424+
425+
let res = loop {
426+
tokio::select! {
427+
res = &mut eval_future => {
428+
break Ok(res?);
429+
}
430+
_ = &mut timeout_future => {
431+
break Err(anyhow!(
432+
"Function '{}' ({}) timed out after {} seconds",
433+
op.op_kind, op.name, timeout_duration.as_secs()
434+
));
435+
}
436+
_ = tokio::time::sleep(warn_duration), if !warned => {
437+
eprintln!(
438+
"WARNING: Function '{}' ({}) is taking longer than 30s",
439+
op_kind_for_warning, op_name_for_warning
440+
);
441+
warn!(
442+
"Function '{}' ({}) is taking longer than {}s",
443+
op_kind_for_warning, op_name_for_warning, WARNING_THRESHOLD
444+
);
445+
warned = true;
446+
}
447+
}
448+
};
449+
450+
res.and_then(|v| head_scope.define_field(&op.output, &v))
434451
} else {
435452
let eval_future = op.executor.evaluate(input_values);
436453

437-
let timeout_result = tokio::time::timeout(timeout_duration, eval_future).await;
438-
if timeout_result.is_err() {
439-
Err(anyhow!(
440-
"Function '{}' ({}) timed out after {} seconds",
441-
op.op_kind,
442-
op.name,
443-
timeout_duration.as_secs()
444-
))
445-
} else {
446-
timeout_result
447-
.unwrap()
448-
.and_then(|v| head_scope.define_field(&op.output, &v))
449-
}
454+
// Warning + timeout logic
455+
let mut eval_future = Box::pin(eval_future);
456+
let mut warned = false;
457+
let timeout_future = tokio::time::sleep(timeout_duration);
458+
tokio::pin!(timeout_future);
459+
460+
let res = loop {
461+
tokio::select! {
462+
res = &mut eval_future => {
463+
break Ok(res?);
464+
}
465+
_ = &mut timeout_future => {
466+
break Err(anyhow!(
467+
"Function '{}' ({}) timed out after {} seconds",
468+
op.op_kind, op.name, timeout_duration.as_secs()
469+
));
470+
}
471+
_ = tokio::time::sleep(warn_duration), if !warned => {
472+
eprintln!(
473+
"WARNING: Function '{}' ({}) is taking longer than 30s",
474+
op_kind_for_warning, op_name_for_warning
475+
);
476+
warn!(
477+
"Function '{}' ({}) is taking longer than {}s",
478+
op_kind_for_warning, op_name_for_warning, WARNING_THRESHOLD
479+
);
480+
warned = true;
481+
}
482+
}
483+
};
484+
485+
res.and_then(|v| head_scope.define_field(&op.output, &v))
450486
};
451-
452-
warn_handle.abort();
487+
// let warn_handle = tokio::spawn(async move {
488+
// tokio::time::sleep(warn_duration).await;
489+
// eprintln!(
490+
// "WARNING: Function '{}' ({}) is taking longer than 30s",
491+
// op_kind_for_warning, op_name_for_warning
492+
// );
493+
// warn!(
494+
// "Function '{}' ({}) is taking longer than 30s",
495+
// op_kind_for_warning, op_name_for_warning
496+
// );
497+
// });
498+
499+
// let mut op_future = Box::pin(op.executor.evaluate(input_values));
500+
// let mut warned = false;
501+
// let warn_handle = loop{
502+
// tokio::select!{
503+
// res = &mut op_future => {
504+
// break res;
505+
// }
506+
// _ = tokio::time::sleep(warn_duration), if !warned => {
507+
// warn!(
508+
// "Function '{}' ({}) is taking longer than {}s",
509+
// op_kind_for_warning, op_name_for_warning, WARNING_THRESHOLD
510+
// );
511+
// warned = true;
512+
// }
513+
// }
514+
// };
515+
// let result = if op.function_exec_info.enable_cache {
516+
// let output_value_cell = memory.get_cache_entry(
517+
// || {
518+
// Ok(op
519+
// .function_exec_info
520+
// .fingerprinter
521+
// .clone()
522+
// .with(&input_values)?
523+
// .into_fingerprint())
524+
// },
525+
// &op.function_exec_info.output_type,
526+
// /*ttl=*/ None,
527+
// )?;
528+
529+
// let eval_future = evaluate_with_cell(output_value_cell.as_ref(), move || {
530+
// op.executor.evaluate(input_values)
531+
// });
532+
533+
// let timeout_result = tokio::time::timeout(timeout_duration, eval_future).await;
534+
// if timeout_result.is_err() {
535+
// Err(anyhow!(
536+
// "Function '{}' ({}) timed out after {} seconds",
537+
// op.op_kind,
538+
// op.name,
539+
// timeout_duration.as_secs()
540+
// ))
541+
// } else {
542+
// timeout_result
543+
// .unwrap()
544+
// .and_then(|v| head_scope.define_field(&op.output, &v))
545+
// }
546+
// } else {
547+
// let eval_future = op.executor.evaluate(input_values);
548+
549+
// let timeout_result = tokio::time::timeout(timeout_duration, eval_future).await;
550+
// if timeout_result.is_err() {
551+
// Err(anyhow!(
552+
// "Function '{}' ({}) timed out after {} seconds",
553+
// op.op_kind,
554+
// op.name,
555+
// timeout_duration.as_secs()
556+
// ))
557+
// } else {
558+
// timeout_result
559+
// .unwrap()
560+
// .and_then(|v| head_scope.define_field(&op.output, &v))
561+
// }
562+
// };
563+
564+
// warn_handle.abort();
453565

454566
if let Some(ref op_stats) = operation_in_process_stats {
455567
op_stats.finish_processing(&transform_key, 1);

0 commit comments

Comments
 (0)