Skip to content

Commit a710c08

Browse files
[FEATURE]: Add timeout to prevent long-running functions (Closes #658) (#1241)
* feat: add timeout to prevent long-running functions (Closes #658) * cleaned evaluator.rs * cleaned markers * replace tokio::spawn with tokio::select * cleaned commented code * added evaluate_with_timeout_and_warning * fixed merge conflicts * fix timeout type
1 parent 9e9ae74 commit a710c08

File tree

9 files changed

+128
-12
lines changed

9 files changed

+128
-12
lines changed

python/cocoindex/flow.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,7 @@ class _SourceRefreshOptions:
486486
class _ExecutionOptions:
487487
max_inflight_rows: int | None = None
488488
max_inflight_bytes: int | None = None
489+
timeout: datetime.timedelta | None = None
489490

490491

491492
class FlowBuilder:

python/cocoindex/op.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
)
5050
from .runtime import to_async_call
5151
from .index import IndexOptions
52+
import datetime
5253

5354

5455
class OpCategory(Enum):
@@ -154,6 +155,7 @@ class OpArgs:
154155
- max_batch_size: The maximum batch size for the executor. Only valid if `batching` is True.
155156
- behavior_version: The behavior version of the executor. Cache will be invalidated if it
156157
changes. Must be provided if `cache` is True.
158+
- timeout: Timeout for this function execution, given as a `datetime.timedelta`. None means fall back to the default (1800s).
157159
- arg_relationship: It specifies the relationship between an input argument and the output,
158160
e.g. `(ArgRelationship.CHUNKS_BASE_TEXT, "content")` means the output is chunks for the
159161
input argument with name `content`.
@@ -164,6 +166,7 @@ class OpArgs:
164166
batching: bool = False
165167
max_batch_size: int | None = None
166168
behavior_version: int | None = None
169+
timeout: datetime.timedelta | None = None
167170
arg_relationship: tuple[ArgRelationship, str] | None = None
168171

169172

@@ -202,6 +205,7 @@ def _register_op_factory(
202205

203206
class _WrappedExecutor:
204207
_executor: Any
208+
_spec: Any
205209
_args_info: list[_ArgInfo]
206210
_kwargs_info: dict[str, _ArgInfo]
207211
_result_encoder: Callable[[Any], Any]
@@ -391,6 +395,9 @@ def enable_cache(self) -> bool:
391395
def behavior_version(self) -> int | None:
392396
return op_args.behavior_version
393397

398+
def timeout(self) -> datetime.timedelta | None:
399+
return op_args.timeout
400+
394401
def batching_options(self) -> dict[str, Any] | None:
395402
if op_args.batching:
396403
return {

rust/cocoindex/src/base/spec.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,9 @@ pub struct ExecutionOptions {
234234

235235
#[serde(default, skip_serializing_if = "Option::is_none")]
236236
pub max_inflight_bytes: Option<usize>,
237+
238+
#[serde(default, skip_serializing_if = "Option::is_none")]
239+
pub timeout: Option<std::time::Duration>,
237240
}
238241

239242
impl ExecutionOptions {
@@ -289,6 +292,9 @@ impl fmt::Display for ImportOpSpec {
289292
pub struct TransformOpSpec {
290293
pub inputs: Vec<OpArgBinding>,
291294
pub op: OpSpec,
295+
296+
#[serde(default)]
297+
pub execution_options: ExecutionOptions,
292298
}
293299

294300
impl SpecFormatter for TransformOpSpec {

rust/cocoindex/src/builder/analyzer.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,11 @@ use crate::{
1212
};
1313
use futures::future::{BoxFuture, try_join3};
1414
use futures::{FutureExt, future::try_join_all};
15+
use std::time::Duration;
1516
use utils::fingerprint::Fingerprinter;
1617

18+
const TIMEOUT_THRESHOLD: Duration = Duration::from_secs(1800);
19+
1720
#[derive(Debug)]
1821
pub(super) enum ValueTypeBuilder {
1922
Basic(BasicValueType),
@@ -804,16 +807,22 @@ impl AnalyzerContext {
804807
let output =
805808
op_scope.add_op_output(reactive_op_name.clone(), output_enriched_type)?;
806809
let op_name = reactive_op_name.clone();
810+
let op_kind = op.op.kind.clone();
811+
let execution_options_timeout = op.execution_options.timeout;
807812
async move {
808813
trace!("Start building executor for transform op `{op_name}`");
809814
let executor = executor.await.with_context(|| {
810815
format!("Preparing for transform op: {op_name}")
811816
})?;
812817
let enable_cache = executor.enable_cache();
813818
let behavior_version = executor.behavior_version();
819+
let timeout = executor.timeout()
820+
.or(execution_options_timeout)
821+
.or(Some(TIMEOUT_THRESHOLD));
814822
trace!("Finished building executor for transform op `{op_name}`, enable cache: {enable_cache}, behavior version: {behavior_version:?}");
815823
let function_exec_info = AnalyzedFunctionExecInfo {
816824
enable_cache,
825+
timeout,
817826
behavior_version,
818827
fingerprinter: logic_fingerprinter
819828
.with(&behavior_version)?,
@@ -828,6 +837,7 @@ impl AnalyzerContext {
828837
}
829838
Ok(AnalyzedReactiveOp::Transform(AnalyzedTransformOp {
830839
name: op_name,
840+
op_kind,
831841
inputs: input_value_mappings,
832842
function_exec_info,
833843
executor,

rust/cocoindex/src/builder/flow_builder.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,7 @@ impl FlowBuilder {
461461
})
462462
.collect(),
463463
op: spec,
464+
execution_options: Default::default(),
464465
}),
465466
};
466467

rust/cocoindex/src/builder/plan.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::base::spec::FieldName;
33
use crate::prelude::*;
44

55
use crate::ops::interface::*;
6+
use std::time::Duration;
67
use utils::fingerprint::{Fingerprint, Fingerprinter};
78

89
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
@@ -64,6 +65,7 @@ pub struct AnalyzedImportOp {
6465

6566
pub struct AnalyzedFunctionExecInfo {
6667
pub enable_cache: bool,
68+
pub timeout: Option<Duration>,
6769
pub behavior_version: Option<u32>,
6870

6971
/// Fingerprinter of the function's behavior.
@@ -74,6 +76,7 @@ pub struct AnalyzedFunctionExecInfo {
7476

7577
pub struct AnalyzedTransformOp {
7678
pub name: String,
79+
pub op_kind: String,
7780
pub inputs: Vec<AnalyzedValueMapping>,
7881
pub function_exec_info: AnalyzedFunctionExecInfo,
7982
pub executor: Box<dyn SimpleFunctionExecutor>,

rust/cocoindex/src/execution/evaluator.rs

Lines changed: 74 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ use crate::prelude::*;
22

33
use anyhow::{Context, Ok};
44
use futures::future::try_join_all;
5+
use log::warn;
6+
use tokio::time::Duration;
57

68
use crate::base::value::EstimatedByteSize;
79
use crate::base::{schema, value};
@@ -11,6 +13,9 @@ use utils::immutable::RefList;
1113

1214
use super::memoization::{EvaluationMemory, EvaluationMemoryOptions, evaluate_with_cell};
1315

16+
const TIMEOUT_THRESHOLD: Duration = Duration::from_secs(1800);
17+
const WARNING_THRESHOLD: Duration = Duration::from_secs(30);
18+
1419
#[derive(Debug)]
1520
pub struct ScopeValueBuilder {
1621
// TODO: Share the same lock for values produced in the same execution scope, for stricter atomicity.
@@ -356,6 +361,43 @@ async fn evaluate_child_op_scope(
356361
})
357362
}
358363

364+
async fn evaluate_with_timeout_and_warning<F, T>(
365+
eval_future: F,
366+
timeout_duration: Duration,
367+
warn_duration: Duration,
368+
op_kind: String,
369+
op_name: String,
370+
) -> Result<T>
371+
where
372+
F: std::future::Future<Output = Result<T>>,
373+
{
374+
let mut eval_future = Box::pin(eval_future);
375+
let mut warned = false;
376+
let timeout_future = tokio::time::sleep(timeout_duration);
377+
tokio::pin!(timeout_future);
378+
379+
loop {
380+
tokio::select! {
381+
res = &mut eval_future => {
382+
return res;
383+
}
384+
_ = &mut timeout_future => {
385+
return Err(anyhow!(
386+
"Function '{}' ({}) timed out after {} seconds",
387+
op_kind, op_name, timeout_duration.as_secs()
388+
));
389+
}
390+
_ = tokio::time::sleep(warn_duration), if !warned => {
391+
warn!(
392+
"Function '{}' ({}) is taking longer than {}s",
393+
op_kind, op_name, WARNING_THRESHOLD.as_secs()
394+
);
395+
warned = true;
396+
}
397+
}
398+
}
399+
}
400+
359401
async fn evaluate_op_scope(
360402
op_scope: &AnalyzedOpScope,
361403
scoped_entries: RefList<'_, &ScopeEntry<'_>>,
@@ -378,6 +420,12 @@ async fn evaluate_op_scope(
378420
input_values.push(value?);
379421
}
380422

423+
let timeout_duration = op.function_exec_info.timeout.unwrap_or(TIMEOUT_THRESHOLD);
424+
let warn_duration = WARNING_THRESHOLD;
425+
426+
let op_name_for_warning = op.name.clone();
427+
let op_kind_for_warning = op.op_kind.clone();
428+
381429
let result = if op.function_exec_info.enable_cache {
382430
let output_value_cell = memory.get_cache_entry(
383431
|| {
@@ -391,18 +439,33 @@ async fn evaluate_op_scope(
391439
&op.function_exec_info.output_type,
392440
/*ttl=*/ None,
393441
)?;
394-
evaluate_with_cell(output_value_cell.as_ref(), move || {
442+
443+
let eval_future = evaluate_with_cell(output_value_cell.as_ref(), move || {
395444
op.executor.evaluate(input_values)
396-
})
397-
.await
398-
.and_then(|v| head_scope.define_field(&op.output, &v))
445+
});
446+
let v = evaluate_with_timeout_and_warning(
447+
eval_future,
448+
timeout_duration,
449+
warn_duration,
450+
op_kind_for_warning,
451+
op_name_for_warning,
452+
)
453+
.await?;
454+
455+
head_scope.define_field(&op.output, &v)
399456
} else {
400-
op.executor
401-
.evaluate(input_values)
402-
.await
403-
.and_then(|v| head_scope.define_field(&op.output, &v))
404-
}
405-
.with_context(|| format!("Evaluating Transform op `{}`", op.name,));
457+
let eval_future = op.executor.evaluate(input_values);
458+
let v = evaluate_with_timeout_and_warning(
459+
eval_future,
460+
timeout_duration,
461+
warn_duration,
462+
op_kind_for_warning,
463+
op_name_for_warning,
464+
)
465+
.await?;
466+
467+
head_scope.define_field(&op.output, &v)
468+
};
406469

407470
// Track transform operation completion
408471
if let Some(ref op_stats) = operation_in_process_stats {
@@ -411,7 +474,7 @@ async fn evaluate_op_scope(
411474
op_stats.finish_processing(&transform_key, 1);
412475
}
413476

414-
result?
477+
result.with_context(|| format!("Evaluating Transform op `{}`", op.name))?
415478
}
416479

417480
AnalyzedReactiveOp::ForEach(op) => {

rust/cocoindex/src/ops/interface.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,11 @@ pub trait SimpleFunctionExecutor: Send + Sync {
185185
fn behavior_version(&self) -> Option<u32> {
186186
None
187187
}
188+
189+
/// Per-function execution timeout. Returns None to fall back to the op's execution options and then to the default timeout (1800s).
190+
fn timeout(&self) -> Option<std::time::Duration> {
191+
None
192+
}
188193
}
189194

190195
#[async_trait]

rust/cocoindex/src/ops/py_factory.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ struct PyFunctionExecutor {
4343

4444
enable_cache: bool,
4545
behavior_version: Option<u32>,
46+
timeout: Option<std::time::Duration>,
4647
}
4748

4849
impl PyFunctionExecutor {
@@ -112,6 +113,10 @@ impl interface::SimpleFunctionExecutor for Arc<PyFunctionExecutor> {
112113
fn behavior_version(&self) -> Option<u32> {
113114
self.behavior_version
114115
}
116+
117+
fn timeout(&self) -> Option<std::time::Duration> {
118+
self.timeout
119+
}
115120
}
116121

117122
struct PyBatchedFunctionExecutor {
@@ -121,6 +126,7 @@ struct PyBatchedFunctionExecutor {
121126

122127
enable_cache: bool,
123128
behavior_version: Option<u32>,
129+
timeout: Option<std::time::Duration>,
124130
batching_options: batching::BatchingOptions,
125131
}
126132

@@ -240,7 +246,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
240246
.as_ref()
241247
.ok_or_else(|| anyhow!("Python execution context is missing"))?
242248
.clone();
243-
let (prepare_fut, enable_cache, behavior_version, batching_options) =
249+
let (prepare_fut, enable_cache, behavior_version, timeout, batching_options) =
244250
Python::with_gil(|py| -> anyhow::Result<_> {
245251
let prepare_coro = executor
246252
.call_method(py, "prepare", (), None)
@@ -260,6 +266,17 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
260266
.call_method(py, "behavior_version", (), None)
261267
.to_result_with_py_trace(py)?
262268
.extract::<Option<u32>>(py)?;
269+
// The Python executor's `timeout()` returns either `None` or a `datetime.timedelta`.
let timeout = executor
    .call_method(py, "timeout", (), None)
    .to_result_with_py_trace(py)?;
let timeout = if timeout.is_none(py) {
    None
} else {
    let td = timeout.into_bound(py);
    let total_seconds = td.call_method0("total_seconds")?.extract::<f64>()?;
    // `Duration::from_secs_f64` panics on negative (or non-finite) input, and a Python
    // `timedelta` may be negative. Convert fallibly so a bad value is reported as an
    // error instead of panicking.
    let duration = std::time::Duration::try_from_secs_f64(total_seconds)
        .map_err(|e| anyhow!("Invalid timeout of {total_seconds} seconds: {e}"))?;
    Some(duration)
};
263280
let batching_options = executor
264281
.call_method(py, "batching_options", (), None)
265282
.to_result_with_py_trace(py)?
@@ -271,6 +288,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
271288
prepare_fut,
272289
enable_cache,
273290
behavior_version,
291+
timeout,
274292
batching_options,
275293
))
276294
})?;
@@ -284,6 +302,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
284302
result_type,
285303
enable_cache,
286304
behavior_version,
305+
timeout,
287306
batching_options,
288307
}
289308
.into_fn_executor(),
@@ -297,6 +316,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
297316
result_type,
298317
enable_cache,
299318
behavior_version,
319+
timeout,
300320
}))
301321
};
302322
Ok(executor)

0 commit comments

Comments
 (0)