Skip to content

Commit a17bd88

Browse files
viniciusdsmellogustavocidornelas
authored andcommitted
refactor(tracing): optimize chunk streaming and content extraction in oci_tracer.py
- Simplified the streaming statistics tracking by reducing the number of metrics and focusing on essential timing information. - Enhanced performance by introducing a new `_extract_chunk_content` function for fast content extraction from OCI chunks, minimizing overhead during processing. - Removed redundant code related to raw output handling and chunk sampling, streamlining the overall logic for better readability and maintainability. - Updated comments and docstrings to reflect the changes and ensure compliance with Google-style guidelines. - Maintained comprehensive type annotations and logging practices to support ongoing maintainability and observability.
1 parent d0700ae commit a17bd88

File tree

1 file changed

+116
-149
lines changed

1 file changed

+116
-149
lines changed

src/openlayer/lib/integrations/oci_tracer.py

Lines changed: 116 additions & 149 deletions
Original file line numberDiff line numberDiff line change
@@ -140,152 +140,54 @@ def stream_chunks(
140140
"""Streams the chunks of the completion and traces the completion."""
141141
collected_output_data = []
142142
collected_function_calls = []
143-
raw_outputs = []
144-
# Use the timing from the actual OCI call (passed as parameter)
145-
# start_time is already provided
146-
147-
# For grouping raw outputs into a more organized structure
148-
streaming_stats = {
149-
"total_chunks": 0,
150-
"first_chunk_time": None,
151-
"last_chunk_time": None,
152-
"chunk_sample": [], # Keep first few and last few chunks
153-
"content_progression": [], # Track content building up
154-
}
143+
# Simplified streaming stats - only track essential metrics
144+
total_chunks = 0
145+
first_chunk_time = None
146+
last_chunk_time = None
147+
chunk_samples = [] # Simplified sampling
148+
155149
end_time = None
156150
first_token_time = None
157151
num_of_completion_tokens = num_of_prompt_tokens = None
158152
latency = None
159153

160154
try:
161-
i = 0
162155
for i, chunk in enumerate(chunks):
163-
streaming_stats["total_chunks"] = i + 1
164-
current_time = time.time()
165-
166-
if streaming_stats["first_chunk_time"] is None:
167-
streaming_stats["first_chunk_time"] = current_time
168-
streaming_stats["last_chunk_time"] = current_time
169-
170-
# Store raw output in a more organized way
171-
chunk_data = None
172-
if hasattr(chunk, "data"):
173-
if hasattr(chunk.data, "__dict__"):
174-
chunk_data = chunk.data.__dict__
175-
else:
176-
chunk_data = str(chunk.data)
177-
else:
178-
chunk_data = str(chunk)
179-
180-
# Keep sample chunks (first 3 and last 3) instead of all chunks
181-
if i < 3: # First 3 chunks
182-
streaming_stats["chunk_sample"].append(
183-
{"index": i, "type": "first", "data": chunk_data, "timestamp": current_time}
184-
)
185-
elif i < 100: # Don't store every chunk for very long streams
186-
# Store every 10th chunk for middle chunks
187-
if i % 10 == 0:
188-
streaming_stats["chunk_sample"].append(
189-
{"index": i, "type": "middle", "data": chunk_data, "timestamp": current_time}
190-
)
191-
156+
total_chunks = i + 1
157+
158+
# Only track timing for first and last chunks to minimize overhead
192159
if i == 0:
193160
first_token_time = time.time()
161+
first_chunk_time = first_token_time
194162
# Extract prompt tokens from first chunk if available
195163
if hasattr(chunk, "data") and hasattr(chunk.data, "usage"):
196164
usage = chunk.data.usage
197165
num_of_prompt_tokens = getattr(usage, "prompt_tokens", 0)
198166
else:
199167
# OCI doesn't provide usage info, estimate from chat_details
200168
num_of_prompt_tokens = estimate_prompt_tokens_from_chat_details(chat_details)
201-
169+
170+
# Store first chunk sample (only for debugging)
171+
if hasattr(chunk, "data"):
172+
chunk_samples.append({"index": 0, "type": "first"})
173+
174+
# Update completion tokens count
202175
if i > 0:
203176
num_of_completion_tokens = i + 1
204177

205-
# Extract content from chunk based on OCI response structure
206-
try:
207-
if hasattr(chunk, "data"):
208-
# Handle OCI SSE Event chunks where data is a JSON string
209-
if isinstance(chunk.data, str):
210-
try:
211-
import json
212-
213-
parsed_data = json.loads(chunk.data)
214-
215-
# Handle OCI streaming structure: message.content[0].text
216-
if "message" in parsed_data and "content" in parsed_data["message"]:
217-
content = parsed_data["message"]["content"]
218-
if isinstance(content, list) and content:
219-
for content_item in content:
220-
if isinstance(content_item, dict) and content_item.get("type") == "TEXT":
221-
text = content_item.get("text", "")
222-
if text: # Only append non-empty text
223-
collected_output_data.append(text)
224-
elif content: # Handle as string
225-
collected_output_data.append(str(content))
226-
227-
# Handle function calls if present
228-
elif "function_call" in parsed_data:
229-
collected_function_calls.append(
230-
{
231-
"name": parsed_data["function_call"].get("name", ""),
232-
"arguments": parsed_data["function_call"].get("arguments", ""),
233-
}
234-
)
235-
236-
# Handle direct text field
237-
elif "text" in parsed_data:
238-
text = parsed_data["text"]
239-
if text:
240-
collected_output_data.append(text)
241-
242-
except json.JSONDecodeError as e:
243-
logger.debug("Error parsing chunk JSON: %s", e)
244-
245-
# Handle object-based chunks (fallback for other structures)
246-
else:
247-
data = chunk.data
248-
249-
# Handle different response structures
250-
if hasattr(data, "choices") and data.choices:
251-
choice = data.choices[0]
252-
253-
# Handle delta content
254-
if hasattr(choice, "delta"):
255-
delta = choice.delta
256-
if hasattr(delta, "content") and delta.content:
257-
collected_output_data.append(delta.content)
258-
elif hasattr(delta, "function_call") and delta.function_call:
259-
collected_function_calls.append(
260-
{
261-
"name": getattr(delta.function_call, "name", ""),
262-
"arguments": getattr(delta.function_call, "arguments", ""),
263-
}
264-
)
265-
266-
# Handle message content
267-
elif hasattr(choice, "message"):
268-
message = choice.message
269-
if hasattr(message, "content") and message.content:
270-
collected_output_data.append(message.content)
271-
elif hasattr(message, "function_call") and message.function_call:
272-
collected_function_calls.append(
273-
{
274-
"name": getattr(message.function_call, "name", ""),
275-
"arguments": getattr(message.function_call, "arguments", ""),
276-
}
277-
)
278-
279-
# Handle text-only responses
280-
elif hasattr(data, "text") and data.text:
281-
collected_output_data.append(data.text)
282-
283-
except Exception as chunk_error:
284-
logger.debug("Error processing chunk: %s", chunk_error)
178+
# Fast content extraction - optimized for performance
179+
content = _extract_chunk_content(chunk)
180+
if content:
181+
if isinstance(content, dict) and "function_call" in content:
182+
collected_function_calls.append(content["function_call"])
183+
elif content: # Text content
184+
collected_output_data.append(str(content))
285185

286186
yield chunk
287187

288-
end_time = time.time()
188+
# Update final timing
189+
last_chunk_time = time.time()
190+
end_time = last_chunk_time
289191
latency = (end_time - start_time) * 1000
290192

291193
except Exception as e:
@@ -309,25 +211,11 @@ def stream_chunks(
309211
# Calculate total tokens
310212
total_tokens = (num_of_prompt_tokens or 0) + (num_of_completion_tokens or 0)
311213

312-
# Add streaming metadata
313-
streaming_metadata = {
214+
# Simplified metadata - only essential timing info
215+
metadata = {
314216
"timeToFirstToken": ((first_token_time - start_time) * 1000 if first_token_time else None),
315217
}
316218

317-
# Extract additional metadata from the first chunk if available
318-
additional_metadata = {}
319-
if raw_outputs:
320-
# Try to extract metadata from the first chunk or response structure
321-
first_chunk = raw_outputs[0]
322-
if isinstance(first_chunk, dict):
323-
# Look for common OCI response metadata fields
324-
for key in ["model_id", "model_version", "time_created", "finish_reason", "api_format"]:
325-
if key in first_chunk:
326-
additional_metadata[key] = first_chunk[key]
327-
328-
# Combine streaming and additional metadata
329-
metadata = {**streaming_metadata, **additional_metadata}
330-
331219
trace_args = create_trace_args(
332220
end_time=end_time,
333221
inputs=extract_inputs_from_chat_details(chat_details),
@@ -340,16 +228,9 @@ def stream_chunks(
340228
model_parameters=get_model_parameters(chat_details),
341229
raw_output={
342230
"streaming_summary": {
343-
"total_chunks": streaming_stats["total_chunks"],
344-
"duration_seconds": (streaming_stats["last_chunk_time"] - streaming_stats["first_chunk_time"])
345-
if streaming_stats["last_chunk_time"] and streaming_stats["first_chunk_time"]
346-
else 0,
347-
"chunks_per_second": streaming_stats["total_chunks"]
348-
/ max(0.001, (streaming_stats["last_chunk_time"] - streaming_stats["first_chunk_time"]))
349-
if streaming_stats["last_chunk_time"] and streaming_stats["first_chunk_time"]
350-
else 0,
231+
"total_chunks": total_chunks,
232+
"duration_seconds": (last_chunk_time - first_chunk_time) if last_chunk_time and first_chunk_time else 0,
351233
},
352-
"sample_chunks": streaming_stats["chunk_sample"],
353234
"complete_response": "".join(collected_output_data) if collected_output_data else None,
354235
},
355236
id=None,
@@ -766,6 +647,92 @@ def create_trace_args(
766647
return trace_args
767648

768649

650+
def _extract_chunk_content(chunk) -> Optional[Union[str, Dict[str, Any]]]:
651+
"""Fast content extraction from OCI chunk - optimized for performance."""
652+
try:
653+
if not hasattr(chunk, "data"):
654+
return None
655+
656+
data = chunk.data
657+
658+
# Fast path: Handle JSON string chunks
659+
if isinstance(data, str):
660+
try:
661+
parsed_data = json.loads(data)
662+
663+
# Handle OCI streaming structure: message.content[0].text
664+
if "message" in parsed_data and "content" in parsed_data["message"]:
665+
content = parsed_data["message"]["content"]
666+
if isinstance(content, list) and content:
667+
for content_item in content:
668+
if isinstance(content_item, dict) and content_item.get("type") == "TEXT":
669+
text = content_item.get("text")
670+
if text:
671+
return text
672+
elif content:
673+
return str(content)
674+
675+
# Handle function calls
676+
elif "function_call" in parsed_data:
677+
return {
678+
"function_call": {
679+
"name": parsed_data["function_call"].get("name", ""),
680+
"arguments": parsed_data["function_call"].get("arguments", ""),
681+
}
682+
}
683+
684+
# Handle direct text field
685+
elif "text" in parsed_data:
686+
text = parsed_data["text"]
687+
if text:
688+
return text
689+
690+
except json.JSONDecodeError:
691+
return None
692+
693+
# Fast path: Handle object-based chunks
694+
else:
695+
# Handle choices-based structure
696+
if hasattr(data, "choices") and data.choices:
697+
choice = data.choices[0]
698+
699+
# Handle delta content
700+
if hasattr(choice, "delta"):
701+
delta = choice.delta
702+
if hasattr(delta, "content") and delta.content:
703+
return delta.content
704+
elif hasattr(delta, "function_call") and delta.function_call:
705+
return {
706+
"function_call": {
707+
"name": getattr(delta.function_call, "name", ""),
708+
"arguments": getattr(delta.function_call, "arguments", ""),
709+
}
710+
}
711+
712+
# Handle message content
713+
elif hasattr(choice, "message"):
714+
message = choice.message
715+
if hasattr(message, "content") and message.content:
716+
return message.content
717+
elif hasattr(message, "function_call") and message.function_call:
718+
return {
719+
"function_call": {
720+
"name": getattr(message.function_call, "name", ""),
721+
"arguments": getattr(message.function_call, "arguments", ""),
722+
}
723+
}
724+
725+
# Handle direct text responses
726+
elif hasattr(data, "text") and data.text:
727+
return data.text
728+
729+
except Exception:
730+
# Silent failure for performance - don't log per chunk
731+
pass
732+
733+
return None
734+
735+
769736
def add_to_trace(**kwargs) -> None:
770737
"""Add a chat completion step to the trace."""
771738
tracer.add_chat_completion_step_to_trace(**kwargs, name="Oracle OCI Chat Completion", provider="OCI")

0 commit comments

Comments
 (0)