Skip to content

Commit a930f54

Browse files
feat: add timeout to prevent long-running functions (Closes #658)
1 parent 1d5bf05 commit a930f54

File tree

11 files changed

+156
-17
lines changed

11 files changed

+156
-17
lines changed

python/cocoindex/flow.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,7 @@ class _SourceRefreshOptions:
486486
class _ExecutionOptions:
487487
max_inflight_rows: int | None = None
488488
max_inflight_bytes: int | None = None
489+
timeout: int | None = None
489490

490491

491492
class FlowBuilder:

python/cocoindex/op.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ class OpArgs:
154154
- max_batch_size: The maximum batch size for the executor. Only valid if `batching` is True.
155155
- behavior_version: The behavior version of the executor. Cache will be invalidated if it
156156
changes. Must be provided if `cache` is True.
157+
- timeout: Timeout in seconds for a single execution of this function. None means the default (300s) is used.
157158
- arg_relationship: It specifies the relationship between an input argument and the output,
158159
e.g. `(ArgRelationship.CHUNKS_BASE_TEXT, "content")` means the output is chunks for the
159160
input argument with name `content`.
@@ -164,6 +165,7 @@ class OpArgs:
164165
batching: bool = False
165166
max_batch_size: int | None = None
166167
behavior_version: int | None = None
168+
timeout: int | None = None
167169
arg_relationship: tuple[ArgRelationship, str] | None = None
168170

169171

@@ -202,6 +204,7 @@ def _register_op_factory(
202204

203205
class _WrappedExecutor:
204206
_executor: Any
207+
_spec: Any
205208
_args_info: list[_ArgInfo]
206209
_kwargs_info: dict[str, _ArgInfo]
207210
_result_encoder: Callable[[Any], Any]
@@ -391,6 +394,12 @@ def enable_cache(self) -> bool:
391394
def behavior_version(self) -> int | None:
392395
return op_args.behavior_version
393396

397+
def timeout(self) -> int | None:
398+
if op_args.timeout is not None:
399+
return op_args.timeout
400+
401+
return None
402+
394403
def batching_options(self) -> dict[str, Any] | None:
395404
if op_args.batching:
396405
return {

rust/cocoindex/src/base/spec.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,9 @@ pub struct ExecutionOptions {
234234

235235
#[serde(default, skip_serializing_if = "Option::is_none")]
236236
pub max_inflight_bytes: Option<usize>,
237+
238+
#[serde(default, skip_serializing_if = "Option::is_none")]
239+
pub timeout: Option<std::time::Duration>,
237240
}
238241

239242
impl ExecutionOptions {
@@ -289,6 +292,9 @@ impl fmt::Display for ImportOpSpec {
289292
pub struct TransformOpSpec {
290293
pub inputs: Vec<OpArgBinding>,
291294
pub op: OpSpec,
295+
296+
#[serde(default)]
297+
pub execution_options: ExecutionOptions,
292298
}
293299

294300
impl SpecFormatter for TransformOpSpec {

rust/cocoindex/src/builder/analyzer.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::{
1212
};
1313
use futures::future::{BoxFuture, try_join3};
1414
use futures::{FutureExt, future::try_join_all};
15+
use tokio::time::Duration;
1516
use utils::fingerprint::Fingerprinter;
1617

1718
#[derive(Debug)]
@@ -804,16 +805,22 @@ impl AnalyzerContext {
804805
let output =
805806
op_scope.add_op_output(reactive_op_name.clone(), output_enriched_type)?;
806807
let op_name = reactive_op_name.clone();
808+
let op_kind = op.op.kind.clone();
809+
let execution_options_timeout = op.execution_options.timeout;
807810
async move {
808811
trace!("Start building executor for transform op `{op_name}`");
809812
let executor = executor.await.with_context(|| {
810813
format!("Preparing for transform op: {op_name}")
811814
})?;
812815
let enable_cache = executor.enable_cache();
813816
let behavior_version = executor.behavior_version();
817+
let timeout = executor.timeout()
818+
.or(execution_options_timeout)
819+
.or(Some(Duration::from_secs(300)));
814820
trace!("Finished building executor for transform op `{op_name}`, enable cache: {enable_cache}, behavior version: {behavior_version:?}");
815821
let function_exec_info = AnalyzedFunctionExecInfo {
816822
enable_cache,
823+
timeout,
817824
behavior_version,
818825
fingerprinter: logic_fingerprinter
819826
.with(&behavior_version)?,
@@ -828,6 +835,7 @@ impl AnalyzerContext {
828835
}
829836
Ok(AnalyzedReactiveOp::Transform(AnalyzedTransformOp {
830837
name: op_name,
838+
op_kind,
831839
inputs: input_value_mappings,
832840
function_exec_info,
833841
executor,

rust/cocoindex/src/builder/flow_builder.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,7 @@ impl FlowBuilder {
461461
})
462462
.collect(),
463463
op: spec,
464+
execution_options: Default::default(),
464465
}),
465466
};
466467

rust/cocoindex/src/builder/plan.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use crate::prelude::*;
44

55
use crate::ops::interface::*;
66
use utils::fingerprint::{Fingerprint, Fingerprinter};
7+
use std::time::Duration;
78

89
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
910
pub struct AnalyzedLocalFieldReference {
@@ -64,6 +65,7 @@ pub struct AnalyzedImportOp {
6465

6566
pub struct AnalyzedFunctionExecInfo {
6667
pub enable_cache: bool,
68+
pub timeout: Option<Duration>,
6769
pub behavior_version: Option<u32>,
6870

6971
/// Fingerprinter of the function's behavior.
@@ -74,6 +76,7 @@ pub struct AnalyzedFunctionExecInfo {
7476

7577
pub struct AnalyzedTransformOp {
7678
pub name: String,
79+
pub op_kind: String,
7780
pub inputs: Vec<AnalyzedValueMapping>,
7881
pub function_exec_info: AnalyzedFunctionExecInfo,
7982
pub executor: Box<dyn SimpleFunctionExecutor>,

rust/cocoindex/src/execution/evaluator.rs

Lines changed: 103 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ use crate::prelude::*;
22

33
use anyhow::{Context, Ok};
44
use futures::future::try_join_all;
5+
use log::warn;
6+
use tokio::time::Duration;
57

68
use crate::base::value::EstimatedByteSize;
79
use crate::base::{schema, value};
@@ -366,10 +368,13 @@ async fn evaluate_op_scope(
366368
for reactive_op in op_scope.reactive_ops.iter() {
367369
match reactive_op {
368370
AnalyzedReactiveOp::Transform(op) => {
371+
let transform_key = format!("transform/{}{}", op_scope.scope_qualifier, op.name);
372+
373+
// eprintln!("🔍 DEBUG: Transform op '{}' (function: {}) starting, timeout: {:?}",
374+
// op.name, op.op_kind, op.function_exec_info.timeout);
375+
369376
// Track transform operation start
370377
if let Some(ref op_stats) = operation_in_process_stats {
371-
let transform_key =
372-
format!("transform/{}{}", op_scope.scope_qualifier, op.name);
373378
op_stats.start_processing(&transform_key, 1);
374379
}
375380

@@ -378,6 +383,28 @@ async fn evaluate_op_scope(
378383
input_values.push(value?);
379384
}
380385

386+
let timeout_duration = op
387+
.function_exec_info
388+
.timeout
389+
.unwrap_or(Duration::from_secs(300));
390+
let warn_duration = Duration::from_secs(30);
391+
392+
let op_name_for_warning = op.name.clone();
393+
let op_kind_for_warning = op.op_kind.clone();
394+
let warn_handle = tokio::spawn(async move {
395+
tokio::time::sleep(warn_duration).await;
396+
// eprintln!("WARNING: Function '{}' is taking longer than 30s", op_name_for_warning);
397+
// warn!("Function '{}' is taking longer than 30s", op_name_for_warning);
398+
eprintln!(
399+
"⚠️ WARNING: Function '{}' ({}) is taking longer than 30s",
400+
op_kind_for_warning, op_name_for_warning
401+
); // ✅ Show both
402+
warn!(
403+
"Function '{}' ({}) is taking longer than 30s",
404+
op_kind_for_warning, op_name_for_warning
405+
);
406+
});
407+
// Execute with timeout
381408
let result = if op.function_exec_info.enable_cache {
382409
let output_value_cell = memory.get_cache_entry(
383410
|| {
@@ -391,27 +418,88 @@ async fn evaluate_op_scope(
391418
&op.function_exec_info.output_type,
392419
/*ttl=*/ None,
393420
)?;
394-
evaluate_with_cell(output_value_cell.as_ref(), move || {
421+
422+
let eval_future = evaluate_with_cell(output_value_cell.as_ref(), move || {
395423
op.executor.evaluate(input_values)
396-
})
397-
.await
398-
.and_then(|v| head_scope.define_field(&op.output, &v))
424+
});
425+
426+
// Handle timeout
427+
let timeout_result = tokio::time::timeout(timeout_duration, eval_future).await;
428+
if timeout_result.is_err() {
429+
Err(anyhow!(
430+
// "Function '{}' timed out after {} seconds",
431+
"Function '{}' ({}) timed out after {} seconds",
432+
op.op_kind,
433+
op.name,
434+
timeout_duration.as_secs()
435+
))
436+
} else {
437+
timeout_result
438+
.unwrap()
439+
.and_then(|v| head_scope.define_field(&op.output, &v))
440+
}
399441
} else {
400-
op.executor
401-
.evaluate(input_values)
402-
.await
403-
.and_then(|v| head_scope.define_field(&op.output, &v))
404-
}
405-
.with_context(|| format!("Evaluating Transform op `{}`", op.name,));
442+
let eval_future = op.executor.evaluate(input_values);
443+
444+
// Handle timeout
445+
let timeout_result = tokio::time::timeout(timeout_duration, eval_future).await;
446+
if timeout_result.is_err() {
447+
Err(anyhow!(
448+
// "Function '{}' timed out after {} seconds",
449+
"Function '{}' ({}) timed out after {} seconds",
450+
op.op_kind,
451+
op.name,
452+
timeout_duration.as_secs()
453+
))
454+
} else {
455+
timeout_result
456+
.unwrap()
457+
.and_then(|v| head_scope.define_field(&op.output, &v))
458+
}
459+
};
460+
461+
warn_handle.abort();
406462

407463
// Track transform operation completion
408464
if let Some(ref op_stats) = operation_in_process_stats {
409-
let transform_key =
410-
format!("transform/{}{}", op_scope.scope_qualifier, op.name);
411465
op_stats.finish_processing(&transform_key, 1);
412466
}
413467

414-
result?
468+
result.with_context(|| format!("Evaluating Transform op `{}`", op.name))?
469+
// let result = if op.function_exec_info.enable_cache {
470+
// let output_value_cell = memory.get_cache_entry(
471+
// || {
472+
// Ok(op
473+
// .function_exec_info
474+
// .fingerprinter
475+
// .clone()
476+
// .with(&input_values)?
477+
// .into_fingerprint())
478+
// },
479+
// &op.function_exec_info.output_type,
480+
// /*ttl=*/ None,
481+
// )?;
482+
// evaluate_with_cell(output_value_cell.as_ref(), move || {
483+
// op.executor.evaluate(input_values)
484+
// })
485+
// .await
486+
// .and_then(|v| head_scope.define_field(&op.output, &v))
487+
// } else {
488+
// op.executor
489+
// .evaluate(input_values)
490+
// .await
491+
// .and_then(|v| head_scope.define_field(&op.output, &v))
492+
// }
493+
// .with_context(|| format!("Evaluating Transform op `{}`", op.name,));
494+
495+
// // Track transform operation completion
496+
// if let Some(ref op_stats) = operation_in_process_stats {
497+
// let transform_key =
498+
// format!("transform/{}{}", op_scope.scope_qualifier, op.name);
499+
// op_stats.finish_processing(&transform_key, 1);
500+
// }
501+
502+
// result?
415503
}
416504

417505
AnalyzedReactiveOp::ForEach(op) => {

rust/cocoindex/src/execution/live_updater.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ impl SourceUpdateTask {
172172
let mut change_stream = change_stream;
173173
let retry_options = retryable::RetryOptions {
174174
retry_timeout: None,
175+
per_call_timeout: None,
175176
initial_backoff: std::time::Duration::from_secs(5),
176177
max_backoff: std::time::Duration::from_secs(60),
177178
};

rust/cocoindex/src/ops/interface.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,11 @@ pub trait SimpleFunctionExecutor: Send + Sync {
185185
fn behavior_version(&self) -> Option<u32> {
186186
None
187187
}
188+
189+
/// Per-evaluation timeout for this executor. `None` defers to the op's execution options, then to the default (300s).
190+
fn timeout(&self) -> Option<std::time::Duration> {
191+
None
192+
}
188193
}
189194

190195
#[async_trait]

rust/cocoindex/src/ops/py_factory.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ struct PyFunctionExecutor {
4343

4444
enable_cache: bool,
4545
behavior_version: Option<u32>,
46+
timeout: Option<std::time::Duration>,
4647
}
4748

4849
impl PyFunctionExecutor {
@@ -112,6 +113,10 @@ impl interface::SimpleFunctionExecutor for Arc<PyFunctionExecutor> {
112113
fn behavior_version(&self) -> Option<u32> {
113114
self.behavior_version
114115
}
116+
117+
fn timeout(&self) -> Option<std::time::Duration> {
118+
self.timeout
119+
}
115120
}
116121

117122
struct PyBatchedFunctionExecutor {
@@ -121,6 +126,7 @@ struct PyBatchedFunctionExecutor {
121126

122127
enable_cache: bool,
123128
behavior_version: Option<u32>,
129+
timeout: Option<std::time::Duration>,
124130
batching_options: batching::BatchingOptions,
125131
}
126132

@@ -240,7 +246,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
240246
.as_ref()
241247
.ok_or_else(|| anyhow!("Python execution context is missing"))?
242248
.clone();
243-
let (prepare_fut, enable_cache, behavior_version, batching_options) =
249+
let (prepare_fut, enable_cache, behavior_version, timeout, batching_options) =
244250
Python::with_gil(|py| -> anyhow::Result<_> {
245251
let prepare_coro = executor
246252
.call_method(py, "prepare", (), None)
@@ -260,6 +266,11 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
260266
.call_method(py, "behavior_version", (), None)
261267
.to_result_with_py_trace(py)?
262268
.extract::<Option<u32>>(py)?;
269+
let timeout = executor
270+
.call_method(py, "timeout", (), None)
271+
.to_result_with_py_trace(py)?
272+
.extract::<Option<u64>>(py)?
273+
.map(std::time::Duration::from_secs);
263274
let batching_options = executor
264275
.call_method(py, "batching_options", (), None)
265276
.to_result_with_py_trace(py)?
@@ -270,7 +281,8 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
270281
Ok((
271282
prepare_fut,
272283
enable_cache,
273-
behavior_version,
284+
behavior_version,
285+
timeout,
274286
batching_options,
275287
))
276288
})?;
@@ -284,6 +296,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
284296
result_type,
285297
enable_cache,
286298
behavior_version,
299+
timeout,
287300
batching_options,
288301
}
289302
.into_fn_executor(),
@@ -297,6 +310,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
297310
result_type,
298311
enable_cache,
299312
behavior_version,
313+
timeout,
300314
}))
301315
};
302316
Ok(executor)

0 commit comments

Comments
 (0)