Skip to content

Commit 5c1d3a5

Browse files
viniciusdsmellogustavocidornelas
authored andcommitted
feat: enhance LangFlow integration with improved context detection
1 parent 7362f53 commit 5c1d3a5

File tree

1 file changed

+52
-35
lines changed

1 file changed

+52
-35
lines changed

src/openlayer/lib/integrations/langchain_callback.py

Lines changed: 52 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,21 @@
22

33
# pylint: disable=unused-argument
44
import time
5-
from typing import Any, Dict, List, Optional, Union, TYPE_CHECKING, Callable
5+
from typing import Any, Dict, List, Optional, Union, Callable
66
from uuid import UUID
77

88
try:
9-
from langchain import schema as langchain_schema
10-
from langchain.callbacks.base import BaseCallbackHandler, AsyncCallbackHandler
9+
try:
10+
from langchain_core import messages as langchain_schema
11+
from langchain_core.callbacks.base import BaseCallbackHandler, AsyncCallbackHandler
12+
except ImportError:
13+
from langchain import schema as langchain_schema
14+
from langchain.callbacks.base import BaseCallbackHandler, AsyncCallbackHandler
1115

1216
HAVE_LANGCHAIN = True
1317
except ImportError:
1418
HAVE_LANGCHAIN = False
1519

16-
if TYPE_CHECKING:
17-
from langchain import schema as langchain_schema
18-
from langchain.callbacks.base import BaseCallbackHandler, AsyncCallbackHandler
1920

2021
from ..tracing import tracer, steps, traces, enums
2122
from .. import utils
@@ -50,6 +51,8 @@ def __init__(self, **kwargs: Any) -> None:
5051
self.metadata: Dict[str, Any] = kwargs or {}
5152
self.steps: Dict[UUID, steps.Step] = {}
5253
self.root_steps: set[UUID] = set() # Track which steps are root
54+
# Track standalone traces (consistent with async handler)
55+
self._traces_by_root: Dict[UUID, traces.Trace] = {}
5356
# Extract inference_id from kwargs if provided
5457
self._inference_id = kwargs.get("inference_id")
5558
# Extract metadata_transformer from kwargs if provided
@@ -94,17 +97,17 @@ def _start_step(
9497
current_trace = tracer.get_current_trace()
9598

9699
if current_step is not None:
97-
# We're inside a @trace() decorated function - add as nested step
100+
# We're inside an existing step context - add as nested
98101
current_step.add_nested_step(step)
99102
elif current_trace is not None:
100-
# There's an existing trace but no current step
103+
# Existing trace but no current step - add to trace
101104
current_trace.add_step(step)
105+
# Don't track in _traces_by_root since we're using external trace
102106
else:
103-
# No existing trace - create new one (standalone mode)
104-
current_trace = traces.Trace()
105-
tracer._current_trace.set(current_trace)
106-
tracer._rag_context.set(None)
107-
current_trace.add_step(step)
107+
# No existing context - create standalone trace
108+
trace = traces.Trace()
109+
trace.add_step(step)
110+
self._traces_by_root[run_id] = trace
108111

109112
# Track root steps (those without parent_run_id)
110113
if parent_run_id is None:
@@ -151,23 +154,22 @@ def _end_step(
151154
if hasattr(step, key):
152155
setattr(step, key, value)
153156

154-
# Only upload trace if this was a root step and we're not in a @trace() context
155-
if is_root_step and tracer.get_current_step() is None:
156-
self._process_and_upload_trace(step)
157+
# Only upload if this is a standalone trace (not integrated with external trace)
158+
# If current_step is set, we're part of a larger trace and shouldn't upload
159+
if is_root_step and run_id in self._traces_by_root and tracer.get_current_step() is None:
160+
trace = self._traces_by_root.pop(run_id)
161+
self._process_and_upload_trace(trace)
157162

158-
def _process_and_upload_trace(self, root_step: steps.Step) -> None:
163+
def _process_and_upload_trace(self, trace: traces.Trace) -> None:
159164
"""Process and upload the completed trace (only for standalone root steps)."""
160-
current_trace = tracer.get_current_trace()
161-
if not current_trace:
165+
if not trace:
162166
return
163167

164168
# Convert all LangChain objects in the trace once at the end
165-
self._convert_step_objects_recursively(root_step)
166-
for step in current_trace.steps:
167-
if step != root_step: # Avoid converting root_step twice
168-
self._convert_step_objects_recursively(step)
169+
for step in trace.steps:
170+
self._convert_step_objects_recursively(step)
169171

170-
trace_data, input_variable_names = tracer.post_process_trace(current_trace)
172+
trace_data, input_variable_names = tracer.post_process_trace(trace)
171173

172174
config = dict(
173175
tracer.ConfigLlmData(
@@ -1098,15 +1100,29 @@ def _start_step(
10981100
parent_step = self.steps[parent_run_id]
10991101
parent_step.add_nested_step(step)
11001102
else:
1101-
# This is a root step - create a new trace
1102-
trace = traces.Trace()
1103-
trace.add_step(step)
1104-
self._traces_by_root[run_id] = trace
1105-
self.root_steps.add(run_id)
1106-
1107-
# Override step ID with custom inference_id if provided
1108-
if self._inference_id is not None:
1109-
step.id = self._inference_id
1103+
# Check if we're in an existing trace context via ContextVars
1104+
current_step = tracer.get_current_step()
1105+
current_trace = tracer.get_current_trace()
1106+
1107+
if current_step is not None:
1108+
# We're inside an existing step context - add as nested
1109+
current_step.add_nested_step(step)
1110+
elif current_trace is not None:
1111+
# Existing trace but no current step - add to trace
1112+
current_trace.add_step(step)
1113+
# Don't track in _traces_by_root since we're using external trace
1114+
else:
1115+
# No existing context - create standalone trace
1116+
trace = traces.Trace()
1117+
trace.add_step(step)
1118+
self._traces_by_root[run_id] = trace
1119+
1120+
# Track root steps
1121+
if parent_run_id is None:
1122+
self.root_steps.add(run_id)
1123+
# Override step ID with custom inference_id if provided
1124+
if self._inference_id is not None:
1125+
step.id = self._inference_id
11101126

11111127
self.steps[run_id] = step
11121128
return step
@@ -1146,8 +1162,9 @@ def _end_step(
11461162
if hasattr(step, key):
11471163
setattr(step, key, value)
11481164

1149-
# If this is a root step, process and upload the trace
1150-
if is_root_step and run_id in self._traces_by_root:
1165+
# Only upload if this is a standalone trace (not integrated with external trace)
1166+
# If current_step is set, we're part of a larger trace and shouldn't upload
1167+
if is_root_step and run_id in self._traces_by_root and tracer.get_current_step() is None:
11511168
trace = self._traces_by_root.pop(run_id)
11521169
self._process_and_upload_async_trace(trace)
11531170

0 commit comments

Comments
 (0)