Skip to content

Commit 6e1b7b3

Browse files
fixed merge conflicts
1 parent 199bed9 commit 6e1b7b3

File tree

9 files changed

+59
-23
lines changed

9 files changed

+59
-23
lines changed

python/cocoindex/flow.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,7 @@ class _SourceRefreshOptions:
486486
class _ExecutionOptions:
487487
max_inflight_rows: int | None = None
488488
max_inflight_bytes: int | None = None
489-
timeout: int | None = None
489+
timeout: datetime.timedelta | None = None
490490

491491

492492
class FlowBuilder:

python/cocoindex/op.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
)
5050
from .runtime import to_async_call
5151
from .index import IndexOptions
52+
import datetime
5253

5354

5455
class OpCategory(Enum):
@@ -165,7 +166,7 @@ class OpArgs:
165166
batching: bool = False
166167
max_batch_size: int | None = None
167168
behavior_version: int | None = None
168-
timeout: int | None = None
169+
timeout: datetime.timedelta | None = None
169170
arg_relationship: tuple[ArgRelationship, str] | None = None
170171

171172

@@ -394,7 +395,7 @@ def enable_cache(self) -> bool:
394395
def behavior_version(self) -> int | None:
395396
return op_args.behavior_version
396397

397-
def timeout(self) -> int | None:
398+
def timeout(self) -> datetime.timedelta | None:
398399
if op_args.timeout is not None:
399400
return op_args.timeout
400401

rust/cocoindex/src/builder/analyzer.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ use crate::{
1212
};
1313
use futures::future::{BoxFuture, try_join3};
1414
use futures::{FutureExt, future::try_join_all};
15-
use tokio::time::Duration;
15+
use std::time::Duration;
1616
use utils::fingerprint::Fingerprinter;
1717

18-
const TIMEOUT_THRESHOLD: u64 = 1800;
18+
const TIMEOUT_THRESHOLD: Duration = Duration::from_secs(1800);
1919

2020
#[derive(Debug)]
2121
pub(super) enum ValueTypeBuilder {
@@ -818,7 +818,7 @@ impl AnalyzerContext {
818818
let behavior_version = executor.behavior_version();
819819
let timeout = executor.timeout()
820820
.or(execution_options_timeout)
821-
.or(Some(Duration::from_secs(TIMEOUT_THRESHOLD)));
821+
.or(Some(TIMEOUT_THRESHOLD));
822822
trace!("Finished building executor for transform op `{op_name}`, enable cache: {enable_cache}, behavior version: {behavior_version:?}");
823823
let function_exec_info = AnalyzedFunctionExecInfo {
824824
enable_cache,

rust/cocoindex/src/builder/plan.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ use crate::base::spec::FieldName;
33
use crate::prelude::*;
44

55
use crate::ops::interface::*;
6-
use utils::fingerprint::{Fingerprint, Fingerprinter};
76
use std::time::Duration;
7+
use utils::fingerprint::{Fingerprint, Fingerprinter};
88

99
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
1010
pub struct AnalyzedLocalFieldReference {

rust/cocoindex/src/execution/evaluator.rs

Lines changed: 49 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ use utils::immutable::RefList;
1313

1414
use super::memoization::{EvaluationMemory, EvaluationMemoryOptions, evaluate_with_cell};
1515

16-
const TIMEOUT_THRESHOLD: u64 = 1800;
17-
const WARNING_THRESHOLD: u64 = 30;
16+
const TIMEOUT_THRESHOLD: Duration = Duration::from_secs(1800);
17+
const WARNING_THRESHOLD: Duration = Duration::from_secs(30);
1818

1919
#[derive(Debug)]
2020
pub struct ScopeValueBuilder {
@@ -390,7 +390,7 @@ where
390390
_ = tokio::time::sleep(warn_duration), if !warned => {
391391
warn!(
392392
"Function '{}' ({}) is taking longer than {}s",
393-
op_kind, op_name, WARNING_THRESHOLD
393+
op_kind, op_name, WARNING_THRESHOLD.as_secs()
394394
);
395395
warned = true;
396396
}
@@ -408,21 +408,20 @@ async fn evaluate_op_scope(
408408
for reactive_op in op_scope.reactive_ops.iter() {
409409
match reactive_op {
410410
AnalyzedReactiveOp::Transform(op) => {
411-
let transform_key = format!("transform/{}{}", op_scope.scope_qualifier, op.name);
412-
411+
// Track transform operation start
413412
if let Some(ref op_stats) = operation_in_process_stats {
413+
let transform_key =
414+
format!("transform/{}{}", op_scope.scope_qualifier, op.name);
414415
op_stats.start_processing(&transform_key, 1);
415416
}
416417

417418
let mut input_values = Vec::with_capacity(op.inputs.len());
418419
for value in assemble_input_values(&op.inputs, scoped_entries) {
419420
input_values.push(value?);
420421
}
421-
let timeout_duration = op
422-
.function_exec_info
423-
.timeout
424-
.unwrap_or(Duration::from_secs(TIMEOUT_THRESHOLD));
425-
let warn_duration = Duration::from_secs(WARNING_THRESHOLD);
422+
423+
let timeout_duration = op.function_exec_info.timeout.unwrap_or(TIMEOUT_THRESHOLD);
424+
let warn_duration = WARNING_THRESHOLD;
426425

427426
let op_name_for_warning = op.name.clone();
428427
let op_kind_for_warning = op.op_kind.clone();
@@ -468,7 +467,10 @@ async fn evaluate_op_scope(
468467
head_scope.define_field(&op.output, &v)
469468
};
470469

470+
// Track transform operation completion
471471
if let Some(ref op_stats) = operation_in_process_stats {
472+
let transform_key =
473+
format!("transform/{}{}", op_scope.scope_qualifier, op.name);
472474
op_stats.finish_processing(&transform_key, 1);
473475
}
474476

@@ -574,6 +576,43 @@ async fn evaluate_op_scope(
574576
let collector_entry = scoped_entries
575577
.headn(op.collector_ref.scope_up_level as usize)
576578
.ok_or_else(|| anyhow::anyhow!("Collector level out of bound"))?;
579+
580+
// Assemble input values
581+
let input_values: Vec<value::Value> =
582+
assemble_input_values(&op.input.fields, scoped_entries)
583+
.collect::<Result<Vec<_>>>()?;
584+
585+
// Create field_values vector for all fields in the merged schema
586+
let mut field_values = op
587+
.field_index_mapping
588+
.iter()
589+
.map(|idx| {
590+
idx.map_or(value::Value::Null, |input_idx| {
591+
input_values[input_idx].clone()
592+
})
593+
})
594+
.collect::<Vec<_>>();
595+
596+
// Handle auto_uuid_field (assumed to be at position 0 for efficiency)
597+
if op.has_auto_uuid_field {
598+
if let Some(uuid_idx) = op.collector_schema.auto_uuid_field_idx {
599+
let uuid = memory.next_uuid(
600+
op.fingerprinter
601+
.clone()
602+
.with(
603+
&field_values
604+
.iter()
605+
.enumerate()
606+
.filter(|(i, _)| *i != uuid_idx)
607+
.map(|(_, v)| v)
608+
.collect::<Vec<_>>(),
609+
)?
610+
.into_fingerprint(),
611+
)?;
612+
field_values[uuid_idx] = value::Value::Basic(value::BasicValue::Uuid(uuid));
613+
}
614+
}
615+
577616
{
578617
let mut collected_records = collector_entry.collected_values
579618
[op.collector_ref.local.collector_idx as usize]

rust/cocoindex/src/execution/live_updater.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,6 @@ impl SourceUpdateTask {
172172
let mut change_stream = change_stream;
173173
let retry_options = retryable::RetryOptions {
174174
retry_timeout: None,
175-
per_call_timeout: None,
176175
initial_backoff: std::time::Duration::from_secs(5),
177176
max_backoff: std::time::Duration::from_secs(60),
178177
};

rust/cocoindex/src/ops/interface.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ pub trait SimpleFunctionExecutor: Send + Sync {
186186
None
187187
}
188188

189-
/// Returns None to use the default timeout (300s)
189+
/// Returns None to use the default timeout (1800s)
190190
fn timeout(&self) -> Option<std::time::Duration> {
191191
None
192192
}

rust/cocoindex/src/ops/py_factory.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ impl interface::SimpleFunctionFactory for PyFunctionFactory {
281281
Ok((
282282
prepare_fut,
283283
enable_cache,
284-
behavior_version,
284+
behavior_version,
285285
timeout,
286286
batching_options,
287287
))

rust/utils/src/retryable.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,6 @@ pub fn Ok<T>(value: T) -> Result<T> {
109109

110110
pub struct RetryOptions {
111111
pub retry_timeout: Option<Duration>,
112-
pub per_call_timeout: Option<Duration>,
113112
pub initial_backoff: Duration,
114113
pub max_backoff: Duration,
115114
}
@@ -118,7 +117,6 @@ impl Default for RetryOptions {
118117
fn default() -> Self {
119118
Self {
120119
retry_timeout: Some(DEFAULT_RETRY_TIMEOUT),
121-
per_call_timeout: None,
122120
initial_backoff: Duration::from_millis(100),
123121
max_backoff: Duration::from_secs(10),
124122
}
@@ -127,7 +125,6 @@ impl Default for RetryOptions {
127125

128126
pub static HEAVY_LOADED_OPTIONS: RetryOptions = RetryOptions {
129127
retry_timeout: Some(DEFAULT_RETRY_TIMEOUT),
130-
per_call_timeout: Some(Duration::from_secs(300)),
131128
initial_backoff: Duration::from_secs(1),
132129
max_backoff: Duration::from_secs(60),
133130
};

0 commit comments

Comments
 (0)