Skip to content

Commit 0865453

Browse files
gustavocidornelas authored and whoseoyster committed
Completes OPEN-5692 Stream trace to Openlayer
1 parent 621c51b commit 0865453

File tree

5 files changed

+111
-34
lines changed

5 files changed

+111
-34
lines changed

openlayer/llm_monitors.py

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -126,14 +126,17 @@ def __init__(
126126
self.monitor_output_only = monitor_output_only
127127
self.monitoring_on = False
128128
self.df = pd.DataFrame(columns=["input", "output", "tokens", "latency"])
129-
130-
self.data_streamer = data_streamer.DataStreamer(
131-
openlayer_api_key=openlayer_api_key,
132-
openlayer_project_name=openlayer_project_name,
133-
openlayer_inference_pipeline_name=openlayer_inference_pipeline_name,
134-
openlayer_inference_pipeline_id=openlayer_inference_pipeline_id,
135-
publish=publish,
136-
)
129+
self.publish = publish
130+
self.data_streamer = None
131+
132+
if self.publish is True:
133+
self.data_streamer = data_streamer.DataStreamer(
134+
openlayer_api_key=openlayer_api_key,
135+
openlayer_project_name=openlayer_project_name,
136+
openlayer_inference_pipeline_name=openlayer_inference_pipeline_name,
137+
openlayer_inference_pipeline_id=openlayer_inference_pipeline_id,
138+
publish=publish,
139+
)
137140

138141
def __enter__(self):
139142
self.start_monitoring()
@@ -473,7 +476,7 @@ def start_monitoring(self) -> None:
473476
self.monitoring_on = True
474477
self._overwrite_completion_methods()
475478
print("All the calls to OpenAI models are now being monitored!")
476-
if self.data_streamer.publish:
479+
if self.publish:
477480
print(
478481
"Furthermore, since `publish` was set to True, the data is being"
479482
f" published to your '{self.data_streamer.openlayer_project_name}' Openlayer project."
@@ -502,7 +505,7 @@ def stop_monitoring(self):
502505
self._restore_completion_methods()
503506
self.monitoring_on = False
504507
print("Monitoring stopped.")
505-
if not self.data_streamer.publish:
508+
if not self.publish:
506509
print(
507510
"To publish the data collected so far to your Openlayer project, "
508511
"call the `publish_batch_data` method."
@@ -520,7 +523,7 @@ def _restore_completion_methods(self) -> None:
520523
def publish_batch_data(self):
521524
"""Manually publish the accumulated data to Openlayer when automatic publishing
522525
is disabled (i.e., ``publish=False``)."""
523-
if self.data_streamer.publish:
526+
if self.publish:
524527
print(
525528
"You have set `publish` to True, so every request you've made so far"
526529
" was already published to Openlayer."

openlayer/services/data_streamer.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
Validates the arguments needed for data streaming and handles the streaming
44
process.
55
"""
6+
67
import logging
78
from typing import Dict, Optional
89

@@ -79,6 +80,14 @@ def _validate_attributes(self) -> None:
7980
"or set the OPENLAYER_INFERENCE_PIPELINE_ID or"
8081
" OPENLAYER_INFERENCE_PIPELINE_NAME environment variables."
8182
)
83+
logger.info(
84+
"Data will be streamed to Openlayer project %s and inference pipeline %s.",
85+
self.openlayer_project_name,
86+
(
87+
self.openlayer_inference_pipeline_id
88+
or self.openlayer_inference_pipeline_name
89+
),
90+
)
8291

8392
def stream_data(self, data: Dict[str, any], config: Dict[str, any]) -> None:
8493
"""Stream data to the Openlayer platform.
@@ -90,6 +99,7 @@ def stream_data(self, data: Dict[str, any], config: Dict[str, any]) -> None:
9099

91100
self._check_inference_pipeline_ready()
92101
self.inference_pipeline.stream_data(stream_data=data, stream_config=config)
102+
logger.info("Data streamed to Openlayer.")
93103

94104
def _check_inference_pipeline_ready(self) -> None:
95105
"""Lazy load the inference pipeline and check if it is ready."""
@@ -144,3 +154,4 @@ def publish_batch_data(self, df: pd.DataFrame, config: Dict[str, any]) -> None:
144154
"""
145155
self._check_inference_pipeline_ready()
146156
self.inference_pipeline.publish_batch_data(batch_df=df, batch_config=config)
157+
logger.info("Batch of data published to Openlayer.")

openlayer/tracing/steps.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ def add_nested_step(self, nested_step: "Step") -> None:
2929
"""Adds a nested step to the current step."""
3030
self.steps.append(nested_step)
3131

32-
def update_data(self, **kwargs: Any) -> None:
33-
"""Updates the step data."""
32+
def log(self, **kwargs: Any) -> None:
33+
"""Logs step data."""
3434
for key, value in kwargs.items():
3535
if hasattr(self, key):
3636
setattr(self, key, value)

openlayer/tracing/tracer.py

Lines changed: 81 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,28 @@
11
"""Module with the logic to create and manage traces and steps."""
22

3+
import contextvars
34
import inspect
4-
from typing import Any, Dict, Optional, Generator
5+
import logging
6+
import time
57
from contextlib import contextmanager
6-
import contextvars
78
from functools import wraps
9+
from typing import Any, Dict, Generator, List, Optional, Tuple
810

9-
from . import steps
10-
from . import traces
11-
import time
11+
from ..services import data_streamer
12+
from . import steps, traces
13+
14+
logger = logging.getLogger(__name__)
15+
16+
_streamer = None
17+
try:
18+
_streamer = data_streamer.DataStreamer(publish=True)
19+
except Exception as exc:
20+
logger.error(
21+
"You have not provided enough information to upload traces to Openlayer."
22+
"\n%s \n"
23+
"To upload the traces, please provide the missing information and try again.",
24+
exc,
25+
)
1226

1327
_current_step = contextvars.ContextVar("current_step")
1428
_current_trace = contextvars.ContextVar("current_trace")
@@ -23,36 +37,85 @@ def create_step(
2337
metadata: Dict[str, any] = {},
2438
) -> Generator[steps.Step, None, None]:
2539
"""Starts a trace and yields a Step object."""
26-
new_step = steps.step_factory(
40+
new_step: steps.Step = steps.step_factory(
2741
step_type=step_type, name=name, inputs=inputs, output=output, metadata=metadata
2842
)
29-
parent_step = _current_step.get(None)
30-
is_root_step = parent_step is None
43+
44+
parent_step: Optional[steps.Step] = _current_step.get(None)
45+
is_root_step: bool = parent_step is None
3146

3247
if parent_step is None:
33-
print("Starting a new trace...")
48+
logger.debug("Starting a new trace...")
3449
current_trace = traces.Trace()
3550
_current_trace.set(current_trace) # Set the current trace in context
3651
current_trace.add_step(new_step)
3752
else:
38-
print(f"Adding step {name} to parent step {parent_step.name}")
53+
logger.debug(f"Adding step {name} to parent step {parent_step.name}")
3954
current_trace = _current_trace.get()
4055
parent_step.add_nested_step(new_step)
4156

4257
token = _current_step.set(new_step)
43-
4458
try:
4559
yield new_step
4660
finally:
4761
_current_step.reset(token)
4862
if is_root_step:
49-
print("Ending the trace...")
50-
print("-" * 80)
51-
print(current_trace.to_dict())
52-
print("-" * 80)
63+
logger.debug("Ending the trace...")
64+
trace_data, input_variable_names = process_trace_for_upload(current_trace)
65+
config = {
66+
"outputColumnName": "output",
67+
"inputVariableNames": input_variable_names,
68+
"label": "production",
69+
"groundTruthColumnName": "groundTruth",
70+
"latencyColumnName": "latency",
71+
}
72+
if isinstance(new_step, steps.OpenAIChatCompletionStep):
73+
config.update(
74+
{
75+
"costColumnName": "cost",
76+
"numOfTokenColumnName": "tokens",
77+
"prompt": new_step.inputs.get("prompt"),
78+
}
79+
)
80+
if _streamer:
81+
_streamer.stream_data(data=trace_data, config=config)
82+
else:
83+
logger.warning(
84+
"Trace computed but not uploaded to Openlayer. "
85+
"You have not provided enough information to upload traces to"
86+
" Openlayer."
87+
)
5388
else:
54-
# TODO: stream to Openlayer
55-
print(f"Ending step {name}")
89+
logger.debug(f"Ending step {name}")
90+
91+
92+
def process_trace_for_upload(trace: traces.Trace) -> Tuple[Dict[str, Any], List[str]]:
93+
"""Post processing of the trace data before uploading to Openlayer.
94+
95+
This is done to ensure backward compatibility with data on Openlayer.
96+
"""
97+
root_step = trace.steps[0]
98+
99+
input_variables = root_step.inputs
100+
input_variable_names = list(input_variables.keys())
101+
102+
trace_data = {
103+
**input_variables,
104+
"output": root_step.output,
105+
"groundTruth": root_step.ground_truth,
106+
"latency": root_step.latency,
107+
"steps": trace.to_dict(),
108+
}
109+
# Extra fields for openai_chat_completion step
110+
if isinstance(root_step, steps.OpenAIChatCompletionStep):
111+
trace_data.update(
112+
{
113+
"cost": root_step.cost,
114+
"tokens": root_step.prompt_tokens + root_step.completion_tokens,
115+
}
116+
)
117+
118+
return trace_data, input_variable_names
56119

57120

58121
def trace(*step_args, **step_kwargs):
@@ -74,7 +137,7 @@ def wrapper(*func_args, **func_kwargs):
74137
inputs.pop("self", None)
75138
inputs.pop("cls", None)
76139

77-
step.update_data(
140+
step.log(
78141
inputs=inputs,
79142
output=output,
80143
end_time=end_time,

openlayer/tracing/traces.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
"""Module with the Trace class."""
22

3-
from typing import Any, Dict
3+
from typing import Any, Dict, List
44

55
from .steps import Step
66

@@ -14,6 +14,6 @@ def add_step(self, step: Step) -> None:
1414
"""Adds a step to the trace."""
1515
self.steps.append(step)
1616

17-
def to_dict(self) -> Dict[str, Any]:
17+
def to_dict(self) -> List[Dict[str, Any]]:
1818
"""Dictionary representation of the Trace."""
19-
return {"rows": [step.to_dict() for step in self.steps]}
19+
return [step.to_dict() for step in self.steps]

0 commit comments

Comments (0)