Skip to content

Commit 1a14087

Browse files
authored
chore: always pass PyExecutionContext to the analyzer (#1195)
1 parent 2e5c046 commit 1a14087

File tree

4 files changed

+18
-14
lines changed

4 files changed

+18
-14
lines changed

python/cocoindex/flow.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,9 @@ class _FlowBuilderState:
459459
field_name_builder: _NameBuilder
460460

461461
def __init__(self, full_name: str):
462-
self.engine_flow_builder = _engine.FlowBuilder(full_name)
462+
self.engine_flow_builder = _engine.FlowBuilder(
463+
full_name, execution_context.event_loop
464+
)
463465
self.field_name_builder = _NameBuilder()
464466

465467
def get_data_slice(self, v: Any) -> _engine.DataSlice:
@@ -931,9 +933,7 @@ def _create_engine_flow() -> _engine.Flow:
931933
flow_builder_state, flow_builder_state.engine_flow_builder.root_scope()
932934
)
933935
fl_def(FlowBuilder(flow_builder_state), root_scope)
934-
return flow_builder_state.engine_flow_builder.build_flow(
935-
execution_context.event_loop
936-
)
936+
return flow_builder_state.engine_flow_builder.build_flow()
937937

938938
return Flow(flow_name, _create_engine_flow)
939939

src/builder/analyzed_flow.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,8 @@ impl AnalyzedTransientFlow {
5858
transient_flow: spec::TransientFlowSpec,
5959
py_exec_ctx: Option<crate::py::PythonExecutionContext>,
6060
) -> Result<Self> {
61-
let ctx = analyzer::build_flow_instance_context(&transient_flow.name, py_exec_ctx);
61+
let ctx =
62+
analyzer::build_flow_instance_context(&transient_flow.name, py_exec_ctx.map(Arc::new));
6263
let (output_type, data_schema, execution_plan_fut) =
6364
analyzer::analyze_transient_flow(&transient_flow, ctx).await?;
6465
Ok(Self {

src/builder/analyzer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1033,12 +1033,12 @@ impl AnalyzerContext {
10331033

10341034
pub fn build_flow_instance_context(
10351035
flow_inst_name: &str,
1036-
py_exec_ctx: Option<crate::py::PythonExecutionContext>,
1036+
py_exec_ctx: Option<Arc<crate::py::PythonExecutionContext>>,
10371037
) -> Arc<FlowInstanceContext> {
10381038
Arc::new(FlowInstanceContext {
10391039
flow_instance_name: flow_inst_name.to_string(),
10401040
auth_registry: get_auth_registry().clone(),
1041-
py_exec_ctx: py_exec_ctx.map(Arc::new),
1041+
py_exec_ctx: py_exec_ctx,
10421042
})
10431043
}
10441044

src/builder/flow_builder.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,7 @@ pub struct FlowBuilder {
247247
#[pymethods]
248248
impl FlowBuilder {
249249
#[new]
250-
pub fn new(py: Python<'_>, name: &str) -> PyResult<Self> {
250+
pub fn new(py: Python<'_>, name: &str, py_event_loop: Py<PyAny>) -> PyResult<Self> {
251251
let lib_context = py
252252
.allow_threads(|| -> anyhow::Result<Arc<LibContext>> {
253253
get_runtime().block_on(get_lib_context())
@@ -258,7 +258,13 @@ impl FlowBuilder {
258258
None,
259259
Arc::new(Mutex::new(DataScopeBuilder::new())),
260260
);
261-
let flow_inst_context = build_flow_instance_context(name, None);
261+
let flow_inst_context = build_flow_instance_context(
262+
name,
263+
Some(Arc::new(crate::py::PythonExecutionContext::new(
264+
py,
265+
py_event_loop,
266+
))),
267+
);
262268
let result = Self {
263269
lib_context,
264270
flow_inst_context,
@@ -606,18 +612,15 @@ impl FlowBuilder {
606612
}))
607613
}
608614

609-
pub fn build_flow(&self, py: Python<'_>, py_event_loop: Py<PyAny>) -> PyResult<py::Flow> {
615+
pub fn build_flow(&self, py: Python<'_>) -> PyResult<py::Flow> {
610616
let spec = spec::FlowInstanceSpec {
611617
name: self.flow_instance_name.clone(),
612618
import_ops: self.import_ops.clone(),
613619
reactive_ops: self.reactive_ops.clone(),
614620
export_ops: self.export_ops.clone(),
615621
declarations: self.declarations.clone(),
616622
};
617-
let flow_instance_ctx = build_flow_instance_context(
618-
&self.flow_instance_name,
619-
Some(crate::py::PythonExecutionContext::new(py, py_event_loop)),
620-
);
623+
let flow_instance_ctx = self.flow_inst_context.clone();
621624
let flow_ctx = py
622625
.allow_threads(|| {
623626
get_runtime().block_on(async move {

0 commit comments

Comments
 (0)