Skip to content

Commit edd09df

Browse files
feat: add timeout to prevent long-running functions (Closes #658)
1 parent cd96429 commit edd09df

File tree

11 files changed

+155
-17
lines changed

11 files changed

+155
-17
lines changed

python/cocoindex/flow.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,7 @@ class _SourceRefreshOptions:
486486
class _ExecutionOptions:
# Execution option fields; each one defaults to None (not set).
487487
max_inflight_rows: int | None = None
488488
max_inflight_bytes: int | None = None
489+
# Timeout for the operation; presumably in seconds, matching `OpArgs.timeout` — TODO confirm.
# NOTE(review): the Rust counterpart (`ExecutionOptions.timeout` in src/base/spec.rs) is declared
# as `Option<std::time::Duration>`. serde's default encoding of a Duration is a {secs, nanos}
# struct, not a bare integer, so confirm how this int is converted when it crosses to the engine.
timeout: int | None = None
489490

490491

491492
class FlowBuilder:

python/cocoindex/op.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ class OpArgs:
154154
- batching: Whether the executor will be batched.
155155
- behavior_version: The behavior version of the executor. Cache will be invalidated if it
156156
changes. Must be provided if `cache` is True.
157+
- timeout: Timeout in seconds for this function execution. None means use default (300s).
157158
- arg_relationship: It specifies the relationship between an input argument and the output,
158159
e.g. `(ArgRelationship.CHUNKS_BASE_TEXT, "content")` means the output is chunks for the
159160
input argument with name `content`.
@@ -163,6 +164,7 @@ class OpArgs:
163164
cache: bool = False
164165
batching: bool = False
165166
behavior_version: int | None = None
167+
timeout: int | None = None
166168
arg_relationship: tuple[ArgRelationship, str] | None = None
167169

168170

@@ -201,6 +203,7 @@ def _register_op_factory(
201203

202204
class _WrappedExecutor:
203205
_executor: Any
206+
_spec: Any
204207
_args_info: list[_ArgInfo]
205208
_kwargs_info: dict[str, _ArgInfo]
206209
_result_encoder: Callable[[Any], Any]
@@ -390,6 +393,12 @@ def enable_cache(self) -> bool:
390393
def behavior_version(self) -> int | None:
391394
return op_args.behavior_version
392395

396+
def timeout(self) -> int | None:
    """Return the timeout configured for this op, in seconds.

    The value is `op_args.timeout` passed through unchanged. `None` means no
    per-function timeout was configured, in which case the default applies.
    """
    return op_args.timeout
401+
393402
if category == OpCategory.FUNCTION:
394403
_engine.register_function_factory(
395404
op_kind,

src/base/spec.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,9 @@ pub struct ExecutionOptions {
234234

235235
#[serde(default, skip_serializing_if = "Option::is_none")]
236236
pub max_inflight_bytes: Option<usize>,
237+
238+
#[serde(default, skip_serializing_if = "Option::is_none")]
239+
pub timeout: Option<std::time::Duration>,
237240
}
238241

239242
impl ExecutionOptions {
@@ -289,6 +292,9 @@ impl fmt::Display for ImportOpSpec {
289292
pub struct TransformOpSpec {
290293
pub inputs: Vec<OpArgBinding>,
291294
pub op: OpSpec,
295+
296+
/// Execution options for this transform op.
///
/// `#[serde(default)]` lets a spec that omits this field still deserialize; the field is then
/// filled with `ExecutionOptions::default()`.
///
/// NOTE(review): the transform call site in src/builder/flow_builder.rs sets this to
/// `Default::default()`, so a non-default value seems to arrive only through deserialized
/// specs — confirm that is intended.
#[serde(default)]
297+
pub execution_options: ExecutionOptions,
292298
}
293299

294300
impl SpecFormatter for TransformOpSpec {

src/builder/analyzer.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use crate::{
1313
};
1414
use futures::future::{BoxFuture, try_join3};
1515
use futures::{FutureExt, future::try_join_all};
16+
use tokio::time::Duration;
1617

1718
#[derive(Debug)]
1819
pub(super) enum ValueTypeBuilder {
@@ -727,16 +728,22 @@ impl AnalyzerContext {
727728
let output =
728729
op_scope.add_op_output(reactive_op.name.clone(), output_enriched_type)?;
729730
let op_name = reactive_op.name.clone();
731+
let op_kind = op.op.kind.clone();
732+
let execution_options_timeout = op.execution_options.timeout;
730733
async move {
731734
trace!("Start building executor for transform op `{op_name}`");
732735
let executor = executor.await.with_context(|| {
733736
format!("Preparing for transform op: {op_name}")
734737
})?;
735738
let enable_cache = executor.enable_cache();
736739
let behavior_version = executor.behavior_version();
740+
let timeout = executor.timeout()
741+
.or(execution_options_timeout)
742+
.or(Some(Duration::from_secs(300)));
737743
trace!("Finished building executor for transform op `{op_name}`, enable cache: {enable_cache}, behavior version: {behavior_version:?}");
738744
let function_exec_info = AnalyzedFunctionExecInfo {
739745
enable_cache,
746+
timeout,
740747
behavior_version,
741748
fingerprinter: logic_fingerprinter
742749
.with(&behavior_version)?,
@@ -751,6 +758,7 @@ impl AnalyzerContext {
751758
}
752759
Ok(AnalyzedReactiveOp::Transform(AnalyzedTransformOp {
753760
name: op_name,
761+
op_kind,
754762
inputs: input_value_mappings,
755763
function_exec_info,
756764
executor,

src/builder/flow_builder.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,7 @@ impl FlowBuilder {
461461
})
462462
.collect(),
463463
op: spec,
464+
execution_options: Default::default(),
464465
}),
465466
};
466467

src/builder/plan.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::prelude::*;
33

44
use crate::ops::interface::*;
55
use crate::utils::fingerprint::{Fingerprint, Fingerprinter};
6+
use std::time::Duration;
67

78
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
89
pub struct AnalyzedLocalFieldReference {
@@ -63,6 +64,7 @@ pub struct AnalyzedImportOp {
6364

6465
pub struct AnalyzedFunctionExecInfo {
6566
pub enable_cache: bool,
67+
pub timeout: Option<Duration>,
6668
pub behavior_version: Option<u32>,
6769

6870
/// Fingerprinter of the function's behavior.
@@ -73,6 +75,7 @@ pub struct AnalyzedFunctionExecInfo {
7375

7476
pub struct AnalyzedTransformOp {
7577
pub name: String,
78+
pub op_kind: String,
7679
pub inputs: Vec<AnalyzedValueMapping>,
7780
pub function_exec_info: AnalyzedFunctionExecInfo,
7881
pub executor: Box<dyn SimpleFunctionExecutor>,

src/execution/evaluator.rs

Lines changed: 103 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ use crate::prelude::*;
22

33
use anyhow::{Context, Ok};
44
use futures::future::try_join_all;
5+
use log::warn;
6+
use tokio::time::Duration;
57

68
use crate::base::value::EstimatedByteSize;
79
use crate::builder::{AnalyzedTransientFlow, plan::*};
@@ -368,10 +370,13 @@ async fn evaluate_op_scope(
368370
for reactive_op in op_scope.reactive_ops.iter() {
369371
match reactive_op {
370372
AnalyzedReactiveOp::Transform(op) => {
373+
let transform_key = format!("transform/{}{}", op_scope.scope_qualifier, op.name);
374+
375+
// eprintln!("🔍 DEBUG: Transform op '{}' (function: {}) starting, timeout: {:?}",
376+
// op.name, op.op_kind, op.function_exec_info.timeout);
377+
371378
// Track transform operation start
372379
if let Some(ref op_stats) = operation_in_process_stats {
373-
let transform_key =
374-
format!("transform/{}{}", op_scope.scope_qualifier, op.name);
375380
op_stats.start_processing(&transform_key, 1);
376381
}
377382

@@ -380,6 +385,28 @@ async fn evaluate_op_scope(
380385
input_values.push(value?);
381386
}
382387

388+
let timeout_duration = op
389+
.function_exec_info
390+
.timeout
391+
.unwrap_or(Duration::from_secs(300));
392+
let warn_duration = Duration::from_secs(30);
393+
394+
let op_name_for_warning = op.name.clone();
395+
let op_kind_for_warning = op.op_kind.clone();
396+
let warn_handle = tokio::spawn(async move {
397+
tokio::time::sleep(warn_duration).await;
398+
// eprintln!("WARNING: Function '{}' is taking longer than 30s", op_name_for_warning);
399+
// warn!("Function '{}' is taking longer than 30s", op_name_for_warning);
400+
eprintln!(
401+
"⚠️ WARNING: Function '{}' ({}) is taking longer than 30s",
402+
op_kind_for_warning, op_name_for_warning
403+
); // ✅ Show both
404+
warn!(
405+
"Function '{}' ({}) is taking longer than 30s",
406+
op_kind_for_warning, op_name_for_warning
407+
);
408+
});
409+
// Execute with timeout
383410
let result = if op.function_exec_info.enable_cache {
384411
let output_value_cell = memory.get_cache_entry(
385412
|| {
@@ -393,27 +420,88 @@ async fn evaluate_op_scope(
393420
&op.function_exec_info.output_type,
394421
/*ttl=*/ None,
395422
)?;
396-
evaluate_with_cell(output_value_cell.as_ref(), move || {
423+
424+
let eval_future = evaluate_with_cell(output_value_cell.as_ref(), move || {
397425
op.executor.evaluate(input_values)
398-
})
399-
.await
400-
.and_then(|v| head_scope.define_field(&op.output, &v))
426+
});
427+
428+
// Handle timeout
429+
let timeout_result = tokio::time::timeout(timeout_duration, eval_future).await;
430+
if timeout_result.is_err() {
431+
Err(anyhow!(
432+
// "Function '{}' timed out after {} seconds",
433+
"Function '{}' ({}) timed out after {} seconds",
434+
op.op_kind,
435+
op.name,
436+
timeout_duration.as_secs()
437+
))
438+
} else {
439+
timeout_result
440+
.unwrap()
441+
.and_then(|v| head_scope.define_field(&op.output, &v))
442+
}
401443
} else {
402-
op.executor
403-
.evaluate(input_values)
404-
.await
405-
.and_then(|v| head_scope.define_field(&op.output, &v))
406-
}
407-
.with_context(|| format!("Evaluating Transform op `{}`", op.name,));
444+
let eval_future = op.executor.evaluate(input_values);
445+
446+
// Handle timeout
447+
let timeout_result = tokio::time::timeout(timeout_duration, eval_future).await;
448+
if timeout_result.is_err() {
449+
Err(anyhow!(
450+
// "Function '{}' timed out after {} seconds",
451+
"Function '{}' ({}) timed out after {} seconds",
452+
op.op_kind,
453+
op.name,
454+
timeout_duration.as_secs()
455+
))
456+
} else {
457+
timeout_result
458+
.unwrap()
459+
.and_then(|v| head_scope.define_field(&op.output, &v))
460+
}
461+
};
462+
463+
warn_handle.abort();
408464

409465
// Track transform operation completion
410466
if let Some(ref op_stats) = operation_in_process_stats {
411-
let transform_key =
412-
format!("transform/{}{}", op_scope.scope_qualifier, op.name);
413467
op_stats.finish_processing(&transform_key, 1);
414468
}
415469

416-
result?
470+
result.with_context(|| format!("Evaluating Transform op `{}`", op.name))?
471+
// let result = if op.function_exec_info.enable_cache {
472+
// let output_value_cell = memory.get_cache_entry(
473+
// || {
474+
// Ok(op
475+
// .function_exec_info
476+
// .fingerprinter
477+
// .clone()
478+
// .with(&input_values)?
479+
// .into_fingerprint())
480+
// },
481+
// &op.function_exec_info.output_type,
482+
// /*ttl=*/ None,
483+
// )?;
484+
// evaluate_with_cell(output_value_cell.as_ref(), move || {
485+
// op.executor.evaluate(input_values)
486+
// })
487+
// .await
488+
// .and_then(|v| head_scope.define_field(&op.output, &v))
489+
// } else {
490+
// op.executor
491+
// .evaluate(input_values)
492+
// .await
493+
// .and_then(|v| head_scope.define_field(&op.output, &v))
494+
// }
495+
// .with_context(|| format!("Evaluating Transform op `{}`", op.name,));
496+
497+
// // Track transform operation completion
498+
// if let Some(ref op_stats) = operation_in_process_stats {
499+
// let transform_key =
500+
// format!("transform/{}{}", op_scope.scope_qualifier, op.name);
501+
// op_stats.finish_processing(&transform_key, 1);
502+
// }
503+
504+
// result?
417505
}
418506

419507
AnalyzedReactiveOp::ForEach(op) => {

src/execution/live_updater.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ impl SourceUpdateTask {
172172
let mut change_stream = change_stream;
173173
let retry_options = retryable::RetryOptions {
174174
retry_timeout: None,
175+
per_call_timeout: None,
175176
initial_backoff: std::time::Duration::from_secs(5),
176177
max_backoff: std::time::Duration::from_secs(60),
177178
};

src/ops/interface.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,11 @@ pub trait SimpleFunctionExecutor: Send + Sync {
185185
fn behavior_version(&self) -> Option<u32> {
186186
None
187187
}
188+
189+
/// Per-function execution timeout.
///
/// The default implementation returns `None`. When this is `None`, the analyzer
/// (src/builder/analyzer.rs) falls back to the op's `execution_options.timeout` and, if that
/// is also unset, to 300 seconds. Implementors override this to supply their own limit.
190+
fn timeout(&self) -> Option<std::time::Duration> {
191+
None
192+
}
188193
}
189194

190195
#[async_trait]

src/ops/py_factory.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ struct PyFunctionExecutor {
4343

4444
enable_cache: bool,
4545
behavior_version: Option<u32>,
46+
timeout: Option<std::time::Duration>,
4647
}
4748

4849
impl PyFunctionExecutor {
@@ -112,6 +113,10 @@ impl interface::SimpleFunctionExecutor for Arc<PyFunctionExecutor> {
112113
fn behavior_version(&self) -> Option<u32> {
113114
self.behavior_version
114115
}
116+
117+
/// Returns the timeout stored on this executor, or `None` when none was set.
///
/// NOTE(review): the value appears to come from the Python executor's `timeout()` result,
/// read as whole seconds when the factory builds the executor — confirm at the construction site.
fn timeout(&self) -> Option<std::time::Duration> {
118+
self.timeout
119+
}
115120
}
116121

117122
struct PyBatchedFunctionExecutor {
@@ -121,6 +126,7 @@ struct PyBatchedFunctionExecutor {
121126

122127
enable_cache: bool,
123128
behavior_version: Option<u32>,
129+
timeout: Option<std::time::Duration>,
124130
}
125131

126132
#[async_trait]
@@ -237,7 +243,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
237243
.as_ref()
238244
.ok_or_else(|| anyhow!("Python execution context is missing"))?
239245
.clone();
240-
let (prepare_fut, enable_cache, behavior_version) =
246+
let (prepare_fut, enable_cache, behavior_version, timeout) =
241247
Python::with_gil(|py| -> anyhow::Result<_> {
242248
let prepare_coro = executor
243249
.call_method(py, "prepare", (), None)
@@ -257,7 +263,12 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
257263
.call_method(py, "behavior_version", (), None)
258264
.to_result_with_py_trace(py)?
259265
.extract::<Option<u32>>(py)?;
260-
Ok((prepare_fut, enable_cache, behavior_version))
266+
let timeout = executor
267+
.call_method(py, "timeout", (), None)
268+
.to_result_with_py_trace(py)?
269+
.extract::<Option<u64>>(py)?
270+
.map(std::time::Duration::from_secs);
271+
Ok((prepare_fut, enable_cache, behavior_version, timeout))
261272
})?;
262273
prepare_fut.await?;
263274
let executor: Box<dyn interface::SimpleFunctionExecutor> = if self.batching {
@@ -268,6 +279,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
268279
result_type,
269280
enable_cache,
270281
behavior_version,
282+
timeout,
271283
}
272284
.into_fn_executor(),
273285
)
@@ -280,6 +292,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
280292
result_type,
281293
enable_cache,
282294
behavior_version,
295+
timeout,
283296
}))
284297
};
285298
Ok(executor)

0 commit comments

Comments
 (0)