Skip to content

Commit f846e1c

Browse files
nirgaclaude
andauthored
fix(agno): add streaming support for Agent.run() and Agent.arun() (#3483)
Co-authored-by: Claude <noreply@anthropic.com>
1 parent bc97e56 commit f846e1c

File tree

9 files changed

+1619
-86
lines changed

9 files changed

+1619
-86
lines changed

packages/opentelemetry-instrumentation-agno/opentelemetry/instrumentation/agno/__init__.py

Lines changed: 190 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
_FunctionCallAExecuteWrapper,
1111
)
1212
from opentelemetry.instrumentation.agno.config import Config
13+
from opentelemetry.instrumentation.agno.streaming import AgnoAsyncStream, AgnoStream
1314
from opentelemetry.instrumentation.agno.utils import (
1415
dont_throw,
1516
should_send_prompts,
@@ -149,12 +150,16 @@ def __call__(self, wrapped, instance, args, kwargs):
149150
) or context_api.get_value("suppress_agno_instrumentation"):
150151
return wrapped(*args, **kwargs)
151152

152-
span_name = f"{getattr(instance, 'name', 'unknown')}.agent"
153+
is_streaming = kwargs.get("stream", False)
154+
155+
if is_streaming:
156+
span_name = f"{getattr(instance, 'name', 'unknown')}.agent"
157+
158+
span = self._tracer.start_span(
159+
span_name,
160+
kind=SpanKind.CLIENT,
161+
)
153162

154-
with self._tracer.start_as_current_span(
155-
span_name,
156-
kind=SpanKind.CLIENT,
157-
) as span:
158163
try:
159164
span.set_attribute(GenAIAttributes.GEN_AI_SYSTEM, "agno")
160165
span.set_attribute(
@@ -181,51 +186,100 @@ def __call__(self, wrapped, instance, args, kwargs):
181186

182187
start_time = time.time()
183188

184-
result = wrapped(*args, **kwargs)
189+
response = wrapped(*args, **kwargs)
185190

186-
duration = time.time() - start_time
191+
return AgnoStream(
192+
span,
193+
response,
194+
instance,
195+
start_time,
196+
self._duration_histogram,
197+
self._token_histogram,
198+
)
187199

188-
if hasattr(result, "content") and should_send_prompts():
200+
except Exception as e:
201+
span.set_status(Status(StatusCode.ERROR, str(e)))
202+
span.record_exception(e)
203+
span.end()
204+
raise
205+
else:
206+
span_name = f"{getattr(instance, 'name', 'unknown')}.agent"
207+
208+
with self._tracer.start_as_current_span(
209+
span_name,
210+
kind=SpanKind.CLIENT,
211+
) as span:
212+
try:
213+
span.set_attribute(GenAIAttributes.GEN_AI_SYSTEM, "agno")
189214
span.set_attribute(
190-
SpanAttributes.TRACELOOP_ENTITY_OUTPUT, str(result.content)
215+
SpanAttributes.TRACELOOP_SPAN_KIND,
216+
TraceloopSpanKindValues.AGENT.value,
191217
)
192218

193-
if hasattr(result, "run_id"):
194-
span.set_attribute("agno.run.id", result.run_id)
219+
if hasattr(instance, "name"):
220+
span.set_attribute(GenAIAttributes.GEN_AI_AGENT_NAME, instance.name)
195221

196-
if hasattr(result, "metrics"):
197-
metrics = result.metrics
198-
if hasattr(metrics, "input_tokens"):
199-
span.set_attribute(
200-
GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS,
201-
metrics.input_tokens,
222+
if hasattr(instance, "model") and instance.model:
223+
model_name = getattr(
224+
instance.model, "id", getattr(instance.model, "name", "unknown")
202225
)
203-
if hasattr(metrics, "output_tokens"):
226+
span.set_attribute(GenAIAttributes.GEN_AI_REQUEST_MODEL, model_name)
227+
228+
if args and should_send_prompts():
229+
input_message = str(args[0])
204230
span.set_attribute(
205-
GenAIAttributes.GEN_AI_USAGE_OUTPUT_TOKENS,
206-
metrics.output_tokens,
231+
SpanAttributes.TRACELOOP_ENTITY_INPUT, input_message
207232
)
208-
if hasattr(metrics, "total_tokens"):
233+
234+
import time
235+
236+
start_time = time.time()
237+
238+
result = wrapped(*args, **kwargs)
239+
240+
duration = time.time() - start_time
241+
242+
if hasattr(result, "content") and should_send_prompts():
209243
span.set_attribute(
210-
SpanAttributes.LLM_USAGE_TOTAL_TOKENS, metrics.total_tokens
244+
SpanAttributes.TRACELOOP_ENTITY_OUTPUT, str(result.content)
211245
)
212246

213-
span.set_status(Status(StatusCode.OK))
214-
215-
self._duration_histogram.record(
216-
duration,
217-
attributes={
218-
GenAIAttributes.GEN_AI_SYSTEM: "agno",
219-
SpanAttributes.TRACELOOP_SPAN_KIND: TraceloopSpanKindValues.AGENT.value,
220-
},
221-
)
247+
if hasattr(result, "run_id"):
248+
span.set_attribute("agno.run.id", result.run_id)
249+
250+
if hasattr(result, "metrics"):
251+
metrics = result.metrics
252+
if hasattr(metrics, "input_tokens"):
253+
span.set_attribute(
254+
GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS,
255+
metrics.input_tokens,
256+
)
257+
if hasattr(metrics, "output_tokens"):
258+
span.set_attribute(
259+
GenAIAttributes.GEN_AI_USAGE_OUTPUT_TOKENS,
260+
metrics.output_tokens,
261+
)
262+
if hasattr(metrics, "total_tokens"):
263+
span.set_attribute(
264+
SpanAttributes.LLM_USAGE_TOTAL_TOKENS, metrics.total_tokens
265+
)
266+
267+
span.set_status(Status(StatusCode.OK))
268+
269+
self._duration_histogram.record(
270+
duration,
271+
attributes={
272+
GenAIAttributes.GEN_AI_SYSTEM: "agno",
273+
SpanAttributes.TRACELOOP_SPAN_KIND: TraceloopSpanKindValues.AGENT.value,
274+
},
275+
)
222276

223-
return result
277+
return result
224278

225-
except Exception as e:
226-
span.set_status(Status(StatusCode.ERROR, str(e)))
227-
span.record_exception(e)
228-
raise
279+
except Exception as e:
280+
span.set_status(Status(StatusCode.ERROR, str(e)))
281+
span.record_exception(e)
282+
raise
229283

230284

231285
class _AgentARunWrapper:
@@ -238,19 +292,23 @@ def __init__(self, tracer, duration_histogram, token_histogram):
238292
self._token_histogram = token_histogram
239293

240294
@dont_throw
241-
async def __call__(self, wrapped, instance, args, kwargs):
295+
def __call__(self, wrapped, instance, args, kwargs):
242296
"""Wrap the Agent.arun() call with tracing instrumentation."""
243297
if context_api.get_value(
244298
context_api._SUPPRESS_INSTRUMENTATION_KEY
245299
) or context_api.get_value("suppress_agno_instrumentation"):
246-
return await wrapped(*args, **kwargs)
300+
return wrapped(*args, **kwargs)
247301

248-
span_name = f"{getattr(instance, 'name', 'unknown')}.agent"
302+
is_streaming = kwargs.get("stream", False)
303+
304+
if is_streaming:
305+
span_name = f"{getattr(instance, 'name', 'unknown')}.agent"
306+
307+
span = self._tracer.start_span(
308+
span_name,
309+
kind=SpanKind.CLIENT,
310+
)
249311

250-
with self._tracer.start_as_current_span(
251-
span_name,
252-
kind=SpanKind.CLIENT,
253-
) as span:
254312
try:
255313
span.set_attribute(GenAIAttributes.GEN_AI_SYSTEM, "agno")
256314
span.set_attribute(
@@ -277,51 +335,103 @@ async def __call__(self, wrapped, instance, args, kwargs):
277335

278336
start_time = time.time()
279337

280-
result = await wrapped(*args, **kwargs)
338+
response = wrapped(*args, **kwargs)
281339

282-
duration = time.time() - start_time
283-
284-
if hasattr(result, "content") and should_send_prompts():
285-
span.set_attribute(
286-
SpanAttributes.TRACELOOP_ENTITY_OUTPUT, str(result.content)
287-
)
288-
289-
if hasattr(result, "run_id"):
290-
span.set_attribute("agno.run.id", result.run_id)
340+
return AgnoAsyncStream(
341+
span,
342+
response,
343+
instance,
344+
start_time,
345+
self._duration_histogram,
346+
self._token_histogram,
347+
)
291348

292-
if hasattr(result, "metrics"):
293-
metrics = result.metrics
294-
if hasattr(metrics, "input_tokens"):
295-
span.set_attribute(
296-
GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS,
297-
metrics.input_tokens,
298-
)
299-
if hasattr(metrics, "output_tokens"):
300-
span.set_attribute(
301-
GenAIAttributes.GEN_AI_USAGE_OUTPUT_TOKENS,
302-
metrics.output_tokens,
303-
)
304-
if hasattr(metrics, "total_tokens"):
349+
except Exception as e:
350+
span.set_status(Status(StatusCode.ERROR, str(e)))
351+
span.record_exception(e)
352+
span.end()
353+
raise
354+
else:
355+
async def async_wrapper():
356+
span_name = f"{getattr(instance, 'name', 'unknown')}.agent"
357+
358+
with self._tracer.start_as_current_span(
359+
span_name,
360+
kind=SpanKind.CLIENT,
361+
) as span:
362+
try:
363+
span.set_attribute(GenAIAttributes.GEN_AI_SYSTEM, "agno")
305364
span.set_attribute(
306-
SpanAttributes.LLM_USAGE_TOTAL_TOKENS, metrics.total_tokens
365+
SpanAttributes.TRACELOOP_SPAN_KIND,
366+
TraceloopSpanKindValues.AGENT.value,
307367
)
308368

309-
span.set_status(Status(StatusCode.OK))
369+
if hasattr(instance, "name"):
370+
span.set_attribute(GenAIAttributes.GEN_AI_AGENT_NAME, instance.name)
371+
372+
if hasattr(instance, "model") and instance.model:
373+
model_name = getattr(
374+
instance.model, "id", getattr(instance.model, "name", "unknown")
375+
)
376+
span.set_attribute(GenAIAttributes.GEN_AI_REQUEST_MODEL, model_name)
377+
378+
if args and should_send_prompts():
379+
input_message = str(args[0])
380+
span.set_attribute(
381+
SpanAttributes.TRACELOOP_ENTITY_INPUT, input_message
382+
)
383+
384+
import time
385+
386+
start_time = time.time()
387+
388+
result = await wrapped(*args, **kwargs)
389+
390+
duration = time.time() - start_time
391+
392+
if hasattr(result, "content") and should_send_prompts():
393+
span.set_attribute(
394+
SpanAttributes.TRACELOOP_ENTITY_OUTPUT, str(result.content)
395+
)
396+
397+
if hasattr(result, "run_id"):
398+
span.set_attribute("agno.run.id", result.run_id)
399+
400+
if hasattr(result, "metrics"):
401+
metrics = result.metrics
402+
if hasattr(metrics, "input_tokens"):
403+
span.set_attribute(
404+
GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS,
405+
metrics.input_tokens,
406+
)
407+
if hasattr(metrics, "output_tokens"):
408+
span.set_attribute(
409+
GenAIAttributes.GEN_AI_USAGE_OUTPUT_TOKENS,
410+
metrics.output_tokens,
411+
)
412+
if hasattr(metrics, "total_tokens"):
413+
span.set_attribute(
414+
SpanAttributes.LLM_USAGE_TOTAL_TOKENS, metrics.total_tokens
415+
)
416+
417+
span.set_status(Status(StatusCode.OK))
418+
419+
self._duration_histogram.record(
420+
duration,
421+
attributes={
422+
GenAIAttributes.GEN_AI_SYSTEM: "agno",
423+
SpanAttributes.TRACELOOP_SPAN_KIND: TraceloopSpanKindValues.AGENT.value,
424+
},
425+
)
310426

311-
self._duration_histogram.record(
312-
duration,
313-
attributes={
314-
GenAIAttributes.GEN_AI_SYSTEM: "agno",
315-
SpanAttributes.TRACELOOP_SPAN_KIND: TraceloopSpanKindValues.AGENT.value,
316-
},
317-
)
427+
return result
318428

319-
return result
429+
except Exception as e:
430+
span.set_status(Status(StatusCode.ERROR, str(e)))
431+
span.record_exception(e)
432+
raise
320433

321-
except Exception as e:
322-
span.set_status(Status(StatusCode.ERROR, str(e)))
323-
span.record_exception(e)
324-
raise
434+
return async_wrapper()
325435

326436

327437
class _TeamRunWrapper:

0 commit comments

Comments
 (0)