Skip to content

Commit 8c716e2

Browse files
authored
feat(attachment): support target attachment in the core setup engine (#1139)
* feat(attachment): add interface for target attachment
* feat(attachment): support target attachment in the core setup engine
1 parent 5db5971 commit 8c716e2

File tree

11 files changed

+323
-39
lines changed

11 files changed

+323
-39
lines changed

python/cocoindex/flow.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,7 @@ def export(
405405
/,
406406
*,
407407
primary_key_fields: Sequence[str],
408+
attachments: Sequence[op.TargetAttachmentSpec] = (),
408409
vector_indexes: Sequence[index.VectorIndexDef] = (),
409410
vector_index: Sequence[tuple[str, index.VectorSimilarityMetric]] = (),
410411
setup_by_user: bool = False,
@@ -436,6 +437,7 @@ def export(
436437
target_name,
437438
_spec_kind(target_spec),
438439
dump_engine_object(target_spec),
440+
dump_engine_object(attachments),
439441
dump_engine_object(index_options),
440442
self._engine_data_collector,
441443
setup_by_user,

python/cocoindex/op.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class OpCategory(Enum):
4646
SOURCE = "source"
4747
TARGET = "target"
4848
DECLARATION = "declaration"
49+
TARGET_ATTACHMENT = "target_attachment"
4950

5051

5152
@dataclass_transform()
@@ -81,6 +82,10 @@ class TargetSpec(metaclass=SpecMeta, category=OpCategory.TARGET): # pylint: dis
8182
"""A target spec. All of its subclasses can be instantiated like a dataclass, i.e. ClassName(field1=value1, field2=value2, ...)"""
8283

8384

85+
class TargetAttachmentSpec(metaclass=SpecMeta, category=OpCategory.TARGET_ATTACHMENT): # pylint: disable=too-few-public-methods
86+
"""A target attachment spec. All of its subclasses can be instantiated like a dataclass, i.e. ClassName(field1=value1, field2=value2, ...)"""
87+
88+
8489
class DeclarationSpec(metaclass=SpecMeta, category=OpCategory.DECLARATION): # pylint: disable=too-few-public-methods
8590
"""A declaration spec. All of its subclasses can be instantiated like a dataclass, i.e. ClassName(field1=value1, field2=value2, ...)"""
8691

src/base/spec.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,10 @@ impl fmt::Display for IndexOptions {
493493
pub struct ExportOpSpec {
494494
pub collector_name: FieldName,
495495
pub target: OpSpec,
496+
497+
#[serde(default, skip_serializing_if = "Vec::is_empty")]
498+
pub attachments: Vec<OpSpec>,
499+
496500
pub index_options: IndexOptions,
497501
pub setup_by_user: bool,
498502
}

src/builder/analyzer.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use crate::builder::exec_ctx::AnalyzedSetupState;
2-
use crate::ops::{get_function_factory, get_source_factory, get_target_factory};
2+
use crate::ops::{
3+
get_attachment_factory, get_function_factory, get_source_factory, get_target_factory,
4+
};
35
use crate::prelude::*;
46

57
use super::plan::*;
@@ -913,6 +915,27 @@ impl AnalyzerContext {
913915
let op_name = export_op.name.clone();
914916
let export_target_factory = export_op_group.target_factory.clone();
915917

918+
let attachments = export_op
919+
.spec
920+
.attachments
921+
.iter()
922+
.map(|attachment| {
923+
let attachment_factory = get_attachment_factory(&attachment.kind)?;
924+
let attachment_state = attachment_factory.get_state(
925+
&op_name,
926+
&export_op.spec.target.spec,
927+
serde_json::Value::Object(attachment.spec.clone()),
928+
)?;
929+
Ok((
930+
interface::AttachmentSetupKey(
931+
attachment.kind.clone(),
932+
attachment_state.setup_key,
933+
),
934+
attachment_state.setup_state,
935+
))
936+
})
937+
.collect::<Result<IndexMap<_, _>>>()?;
938+
916939
let export_op_ss = exec_ctx::AnalyzedTargetSetupState {
917940
target_kind: target_kind.to_string(),
918941
setup_key: data_coll_output.setup_key,
@@ -925,6 +948,7 @@ impl AnalyzerContext {
925948
.map(|field| field.value_type.typ.clone())
926949
.collect::<Box<[_]>>(),
927950
),
951+
attachments,
928952
};
929953
targets_analyzed_ss[*idx] = Some(export_op_ss);
930954

@@ -956,6 +980,7 @@ impl AnalyzerContext {
956980
desired_setup_state,
957981
setup_by_user: false,
958982
key_type: None,
983+
attachments: IndexMap::new(),
959984
};
960985
declarations_analyzed_ss.push(decl_ss);
961986
}

src/builder/exec_ctx.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ pub struct AnalyzedTargetSetupState {
2626
pub setup_by_user: bool,
2727
/// None for declarations.
2828
pub key_type: Option<Box<[schema::ValueType]>>,
29+
30+
pub attachments: IndexMap<interface::AttachmentSetupKey, serde_json::Value>,
2931
}
3032

3133
pub struct AnalyzedSetupState {
@@ -176,6 +178,7 @@ fn build_export_op_exec_ctx(
176178
} else {
177179
max_schema_version_id + 1
178180
};
181+
179182
match target_states.entry(resource_id) {
180183
indexmap::map::Entry::Occupied(entry) => {
181184
api_bail!(
@@ -194,6 +197,7 @@ fn build_export_op_exec_ctx(
194197
key_type: analyzed_target_ss.key_type.clone(),
195198
},
196199
state: analyzed_target_ss.desired_setup_state.clone(),
200+
attachments: analyzed_target_ss.attachments.clone(),
197201
});
198202
}
199203
}

src/builder/flow_builder.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -549,12 +549,13 @@ impl FlowBuilder {
549549
Ok(())
550550
}
551551

552-
#[pyo3(signature = (name, kind, op_spec, index_options, input, setup_by_user=false))]
552+
#[pyo3(signature = (name, kind, op_spec, attachments, index_options, input, setup_by_user=false))]
553553
pub fn export(
554554
&mut self,
555555
name: String,
556556
kind: String,
557557
op_spec: py::Pythonized<serde_json::Map<String, serde_json::Value>>,
558+
attachments: py::Pythonized<Vec<spec::OpSpec>>,
558559
index_options: py::Pythonized<spec::IndexOptions>,
559560
input: &DataCollector,
560561
setup_by_user: bool,
@@ -574,6 +575,7 @@ impl FlowBuilder {
574575
spec: spec::ExportOpSpec {
575576
collector_name: input.name.clone(),
576577
target: spec,
578+
attachments: attachments.into_inner(),
577579
index_options: index_options.into_inner(),
578580
setup_by_user,
579581
},

src/ops/interface.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,9 +316,52 @@ pub trait TargetFactory: Send + Sync {
316316
) -> Result<()>;
317317
}
318318

319+
/// Result of `TargetAttachmentFactory::get_state` for a single attachment.
///
/// The analyzer pairs `setup_key` with the attachment kind to build an
/// `AttachmentSetupKey`, and stores `setup_state` as the value under that key.
pub struct TargetAttachmentState {
320+
/// JSON key identifying this attachment's setup entry.
pub setup_key: serde_json::Value,
321+
/// JSON setup state associated with that key.
pub setup_state: serde_json::Value,
322+
}
323+
324+
/// A pending change to an attachment's setup, as returned (boxed) by
/// `TargetAttachmentFactory::diff_setup_states` when a change is needed.
#[async_trait]
325+
pub trait AttachmentSetupChangeAction {
326+
/// Returns a textual description of the change.
/// NOTE(review): presumably shown to the user before applying — confirm with callers.
fn describe_change(&self) -> String;
327+
328+
/// Asynchronously applies the change, returning an error if it fails.
async fn apply_change(&self) -> Result<()>;
329+
}
330+
331+
/// Factory for one kind of target attachment.
///
/// Implementations are registered as `ExecutorFactory::TargetAttachment` and
/// looked up by attachment kind.
pub trait TargetAttachmentFactory: Send + Sync {
332+
/// Normalizes a setup key. The JSON form of a key may change after a code change
/// (e.g. a new optional field, or a different field ordering) even when the
/// underlying value has not changed.
333+
/// Implementations should always return the canonical serialized form.
334+
fn normalize_setup_key(&self, key: &serde_json::Value) -> Result<serde_json::Value>;
335+
336+
/// Computes the setup key and setup state for one attachment.
///
/// * `target_name` - name of the target; the analyzer passes the export op's name.
/// * `target_spec` - spec of the target the attachment belongs to.
/// * `attachment_spec` - the attachment's own spec as a JSON value (the analyzer
///   wraps the spec map in `serde_json::Value::Object`).
fn get_state(
337+
&self,
338+
target_name: &str,
339+
target_spec: &serde_json::Map<String, serde_json::Value>,
340+
attachment_spec: serde_json::Value,
341+
) -> Result<TargetAttachmentState>;
342+
343+
/// Compares `new_state` with `existing_states` for the attachment identified by `key`.
///
/// Should return `Some` if and only if any changes are needed.
/// NOTE(review): `new_state` is optional; presumably `None` means the attachment
/// is no longer declared — confirm with callers.
344+
fn diff_setup_states(
345+
&self,
346+
key: &serde_json::Value,
347+
new_state: Option<serde_json::Value>,
348+
existing_states: setup::CombinedState<serde_json::Value>,
349+
) -> Result<Option<Box<dyn AttachmentSetupChangeAction + Send + Sync>>>;
350+
}
351+
319352
/// A factory of any supported kind, as accepted by `register_factory`.
///
/// Each variant wraps a shared (`Arc`) trait object, so cloning is cheap.
#[derive(Clone)]
320353
pub enum ExecutorFactory {
321354
/// Factory for sources.
Source(Arc<dyn SourceFactory + Send + Sync>),
322355
/// Factory for simple functions.
SimpleFunction(Arc<dyn SimpleFunctionFactory + Send + Sync>),
323356
/// Factory for export targets.
ExportTarget(Arc<dyn TargetFactory + Send + Sync>),
357+
/// Factory for target attachments; the registry keeps these in their own
/// map, keyed by the registered name.
TargetAttachment(Arc<dyn TargetAttachmentFactory + Send + Sync>),
358+
}
359+
360+
/// Key of one attachment's setup entry: `(attachment kind, setup key)`.
///
/// Field `0` is the attachment kind; field `1` is the `setup_key` returned by the
/// attachment factory's `get_state`. The analyzer uses this type as the key of
/// the per-target `attachments` map.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
361+
pub struct AttachmentSetupKey(pub String, pub serde_json::Value);
362+
363+
impl std::fmt::Display for AttachmentSetupKey {
364+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
365+
write!(f, "{}:{}", self.0, self.1)
366+
}
324367
}

src/ops/registration.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,13 @@ pub fn get_optional_target_factory(
5656
registry.get_target(kind).cloned()
5757
}
5858

59+
/// Looks up the target attachment factory registered under `kind`.
///
/// Returns `None` when no factory is registered for `kind`.
///
/// # Panics
/// Panics if `EXECUTOR_FACTORY_REGISTRY.read()` returns an error.
/// NOTE(review): the registry's lock type is not visible here; for a std
/// `RwLock` that would mean a poisoned lock — confirm.
pub fn get_optional_attachment_factory(
60+
kind: &str,
61+
) -> Option<std::sync::Arc<dyn super::interface::TargetAttachmentFactory + Send + Sync>> {
62+
let registry = EXECUTOR_FACTORY_REGISTRY.read().unwrap();
63+
// Clone the `Arc` so the returned factory does not borrow from the guard.
registry.get_target_attachment(kind).cloned()
64+
}
65+
5966
pub fn get_source_factory(
6067
kind: &str,
6168
) -> Result<std::sync::Arc<dyn super::interface::SourceFactory + Send + Sync>> {
@@ -77,6 +84,13 @@ pub fn get_target_factory(
7784
.ok_or_else(|| anyhow::anyhow!("Target factory not found for op kind: {}", kind))
7885
}
7986

87+
/// Looks up the target attachment factory registered under `kind`, treating a
/// missing factory as an error.
///
/// # Errors
/// Returns an `anyhow` error with the message
/// `Attachment factory not found for op kind: <kind>` when
/// `get_optional_attachment_factory` yields `None`.
pub fn get_attachment_factory(
88+
kind: &str,
89+
) -> Result<std::sync::Arc<dyn super::interface::TargetAttachmentFactory + Send + Sync>> {
90+
get_optional_attachment_factory(kind)
91+
.ok_or_else(|| anyhow::anyhow!("Attachment factory not found for op kind: {}", kind))
92+
}
93+
8094
pub fn register_factory(name: String, factory: ExecutorFactory) -> Result<()> {
8195
let mut registry = EXECUTOR_FACTORY_REGISTRY.write().unwrap();
8296
registry.register(name, factory)

src/ops/registry.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ pub struct ExecutorFactoryRegistry {
88
function_factories:
99
HashMap<String, Arc<dyn super::interface::SimpleFunctionFactory + Send + Sync>>,
1010
target_factories: HashMap<String, Arc<dyn super::interface::TargetFactory + Send + Sync>>,
11+
target_attachment_factories:
12+
HashMap<String, Arc<dyn super::interface::TargetAttachmentFactory + Send + Sync>>,
1113
}
1214

1315
impl Default for ExecutorFactoryRegistry {
@@ -22,6 +24,7 @@ impl ExecutorFactoryRegistry {
2224
source_factories: HashMap::new(),
2325
function_factories: HashMap::new(),
2426
target_factories: HashMap::new(),
27+
target_attachment_factories: HashMap::new(),
2528
}
2629
}
2730

@@ -61,6 +64,18 @@ impl ExecutorFactoryRegistry {
6164
}
6265
}
6366
}
67+
ExecutorFactory::TargetAttachment(target_attachment_factory) => {
68+
match self.target_attachment_factories.entry(name) {
69+
std::collections::hash_map::Entry::Occupied(entry) => Err(anyhow::anyhow!(
70+
"Target attachment factory with name already exists: {}",
71+
entry.key()
72+
)),
73+
std::collections::hash_map::Entry::Vacant(entry) => {
74+
entry.insert(target_attachment_factory);
75+
Ok(())
76+
}
77+
}
78+
}
6479
}
6580
}
6681

@@ -84,4 +99,11 @@ impl ExecutorFactoryRegistry {
8499
) -> Option<&Arc<dyn super::interface::TargetFactory + Send + Sync>> {
85100
self.target_factories.get(name)
86101
}
102+
103+
/// Returns a reference to the target attachment factory registered under
/// `name`, or `None` if no such factory has been registered.
pub fn get_target_attachment(
104+
&self,
105+
name: &str,
106+
) -> Option<&Arc<dyn super::interface::TargetAttachmentFactory + Send + Sync>> {
107+
self.target_attachment_factories.get(name)
108+
}
87109
}

0 commit comments

Comments
 (0)