Skip to content

Commit 083c327

Browse files
authored
fix(mcp): remove faulty logic of trying to deduce HTTP errors (#3477)
1 parent a47c4e5 commit 083c327

File tree

1 file changed

+87
-57
lines changed
  • packages/opentelemetry-instrumentation-mcp/opentelemetry/instrumentation/mcp

1 file changed

+87
-57
lines changed

packages/opentelemetry-instrumentation-mcp/opentelemetry/instrumentation/mcp/instrumentation.py

Lines changed: 87 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,8 @@
11
from contextlib import asynccontextmanager
22
from dataclasses import dataclass
3-
from typing import Any, AsyncGenerator, Callable, Collection, Tuple, cast, Union
3+
from typing import Any, AsyncGenerator, Callable, Collection, Tuple, Union, cast
44
import json
55
import logging
6-
import re
7-
from http import HTTPStatus
86

97
from opentelemetry import context, propagate
108
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
@@ -18,7 +16,9 @@
1816

1917
from opentelemetry.instrumentation.mcp.version import __version__
2018
from opentelemetry.instrumentation.mcp.utils import dont_throw, Config
21-
from opentelemetry.instrumentation.mcp.fastmcp_instrumentation import FastMCPInstrumentor
19+
from opentelemetry.instrumentation.mcp.fastmcp_instrumentation import (
20+
FastMCPInstrumentor,
21+
)
2222

2323
_instruments = ("mcp >= 1.6.0",)
2424

@@ -42,13 +42,17 @@ def _instrument(self, **kwargs):
4242
# Instrument FastMCP Client to create a session-level span
4343
register_post_import_hook(
4444
lambda _: wrap_function_wrapper(
45-
"fastmcp.client", "Client.__aenter__", self._fastmcp_client_enter_wrapper(tracer)
45+
"fastmcp.client",
46+
"Client.__aenter__",
47+
self._fastmcp_client_enter_wrapper(tracer),
4648
),
4749
"fastmcp.client",
4850
)
4951
register_post_import_hook(
5052
lambda _: wrap_function_wrapper(
51-
"fastmcp.client", "Client.__aexit__", self._fastmcp_client_exit_wrapper(tracer)
53+
"fastmcp.client",
54+
"Client.__aexit__",
55+
self._fastmcp_client_exit_wrapper(tracer),
5256
),
5357
"fastmcp.client",
5458
)
@@ -121,9 +125,9 @@ async def traced_method(
121125
) -> AsyncGenerator[
122126
Union[
123127
Tuple[InstrumentedStreamReader, InstrumentedStreamWriter],
124-
Tuple[InstrumentedStreamReader, InstrumentedStreamWriter, Any]
128+
Tuple[InstrumentedStreamReader, InstrumentedStreamWriter, Any],
125129
],
126-
None
130+
None,
127131
]:
128132
async with wrapped(*args, **kwargs) as result:
129133
try:
@@ -136,12 +140,18 @@ async def traced_method(
136140
read_stream, write_stream, get_session_id_callback = result
137141
yield InstrumentedStreamReader(
138142
read_stream, tracer
139-
), InstrumentedStreamWriter(write_stream, tracer), get_session_id_callback
143+
), InstrumentedStreamWriter(
144+
write_stream, tracer
145+
), get_session_id_callback
140146
except Exception as e:
141-
logging.warning(f"mcp instrumentation _transport_wrapper exception: {e}")
147+
logging.warning(
148+
f"mcp instrumentation _transport_wrapper exception: {e}"
149+
)
142150
yield result
143151
except Exception as e:
144-
logging.warning(f"mcp instrumentation transport_wrapper exception: {e}")
152+
logging.warning(
153+
f"mcp instrumentation transport_wrapper exception: {e}"
154+
)
145155
yield result
146156

147157
return traced_method
@@ -190,24 +200,31 @@ async def traced_method(wrapped, instance, args, kwargs):
190200

191201
# Create different span types based on method
192202
if method == "tools/call":
193-
return await self._handle_tool_call(tracer, method, params, args, kwargs, wrapped)
203+
return await self._handle_tool_call(
204+
tracer, method, params, args, kwargs, wrapped
205+
)
194206
else:
195-
return await self._handle_mcp_method(tracer, method, args, kwargs, wrapped)
207+
return await self._handle_mcp_method(
208+
tracer, method, args, kwargs, wrapped
209+
)
196210

197211
return traced_method
198212

199213
def _fastmcp_client_enter_wrapper(self, tracer):
200214
"""Wrapper for FastMCP Client.__aenter__ to start a session trace"""
215+
201216
@dont_throw
202217
async def traced_method(wrapped, instance, args, kwargs):
203218
# Start a root span for the MCP client session and make it current
204219
span_context_manager = tracer.start_as_current_span("mcp.client.session")
205220
span = span_context_manager.__enter__()
206221
span.set_attribute(SpanAttributes.TRACELOOP_SPAN_KIND, "session")
207-
span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_NAME, "mcp.client.session")
222+
span.set_attribute(
223+
SpanAttributes.TRACELOOP_ENTITY_NAME, "mcp.client.session"
224+
)
208225

209226
# Store the span context manager on the instance to properly exit it later
210-
setattr(instance, '_tracing_session_context_manager', span_context_manager)
227+
setattr(instance, "_tracing_session_context_manager", span_context_manager)
211228

212229
try:
213230
# Call the original method
@@ -218,28 +235,35 @@ async def traced_method(wrapped, instance, args, kwargs):
218235
span.record_exception(e)
219236
span.set_status(Status(StatusCode.ERROR, str(e)))
220237
raise
238+
221239
return traced_method
222240

223241
def _fastmcp_client_exit_wrapper(self, tracer):
224242
"""Wrapper for FastMCP Client.__aexit__ to end the session trace"""
243+
225244
@dont_throw
226245
async def traced_method(wrapped, instance, args, kwargs):
227246
try:
228247
# Call the original method first
229248
result = await wrapped(*args, **kwargs)
230249

231250
# End the session span context manager
232-
context_manager = getattr(instance, '_tracing_session_context_manager', None)
251+
context_manager = getattr(
252+
instance, "_tracing_session_context_manager", None
253+
)
233254
if context_manager:
234255
context_manager.__exit__(None, None, None)
235256

236257
return result
237258
except Exception as e:
238259
# End the session span context manager with exception info
239-
context_manager = getattr(instance, '_tracing_session_context_manager', None)
260+
context_manager = getattr(
261+
instance, "_tracing_session_context_manager", None
262+
)
240263
if context_manager:
241264
context_manager.__exit__(type(e), e, e.__traceback__)
242265
raise
266+
243267
return traced_method
244268

245269
async def _handle_tool_call(self, tracer, method, params, args, kwargs, wrapped):
@@ -260,26 +284,40 @@ async def _handle_tool_call(self, tracer, method, params, args, kwargs, wrapped)
260284

261285
with tracer.start_as_current_span(span_name) as span:
262286
# Set tool-specific attributes
263-
span.set_attribute(SpanAttributes.TRACELOOP_SPAN_KIND, TraceloopSpanKindValues.TOOL.value)
287+
span.set_attribute(
288+
SpanAttributes.TRACELOOP_SPAN_KIND, TraceloopSpanKindValues.TOOL.value
289+
)
264290
span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_NAME, entity_name)
265291

266292
# Add input
267293
clean_input = self._extract_clean_input(method, params)
268294
if clean_input:
269295
try:
270-
span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_INPUT, json.dumps(clean_input))
296+
span.set_attribute(
297+
SpanAttributes.TRACELOOP_ENTITY_INPUT, json.dumps(clean_input)
298+
)
271299
except (TypeError, ValueError):
272-
span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_INPUT, str(clean_input))
300+
span.set_attribute(
301+
SpanAttributes.TRACELOOP_ENTITY_INPUT, str(clean_input)
302+
)
273303

274-
return await self._execute_and_handle_result(span, method, args, kwargs, wrapped, clean_output=True)
304+
return await self._execute_and_handle_result(
305+
span, method, args, kwargs, wrapped, clean_output=True
306+
)
275307

276308
async def _handle_mcp_method(self, tracer, method, args, kwargs, wrapped):
277309
"""Handle non-tool MCP methods with simple serialization"""
278310
with tracer.start_as_current_span(f"{method}.mcp") as span:
279-
span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_INPUT, f"{serialize(args[0])}")
280-
return await self._execute_and_handle_result(span, method, args, kwargs, wrapped, clean_output=False)
281-
282-
async def _execute_and_handle_result(self, span, method, args, kwargs, wrapped, clean_output=False):
311+
span.set_attribute(
312+
SpanAttributes.TRACELOOP_ENTITY_INPUT, f"{serialize(args[0])}"
313+
)
314+
return await self._execute_and_handle_result(
315+
span, method, args, kwargs, wrapped, clean_output=False
316+
)
317+
318+
async def _execute_and_handle_result(
319+
self, span, method, args, kwargs, wrapped, clean_output=False
320+
):
283321
"""Execute the wrapped function and handle the result"""
284322
try:
285323
result = await wrapped(*args, **kwargs)
@@ -288,18 +326,25 @@ async def _execute_and_handle_result(self, span, method, args, kwargs, wrapped,
288326
clean_output_data = self._extract_clean_output(method, result)
289327
if clean_output_data:
290328
try:
291-
span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_OUTPUT, json.dumps(clean_output_data))
329+
span.set_attribute(
330+
SpanAttributes.TRACELOOP_ENTITY_OUTPUT,
331+
json.dumps(clean_output_data),
332+
)
292333
except (TypeError, ValueError):
293-
span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_OUTPUT, str(clean_output_data))
334+
span.set_attribute(
335+
SpanAttributes.TRACELOOP_ENTITY_OUTPUT,
336+
str(clean_output_data),
337+
)
294338
else:
295-
span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_OUTPUT, serialize(result))
339+
span.set_attribute(
340+
SpanAttributes.TRACELOOP_ENTITY_OUTPUT, serialize(result)
341+
)
296342
# Handle errors
297343
if hasattr(result, "isError") and result.isError:
298344
if len(result.content) > 0:
299-
span.set_status(Status(StatusCode.ERROR, f"{result.content[0].text}"))
300-
error_type = get_error_type(result.content[0].text)
301-
if error_type is not None:
302-
span.set_attribute(ERROR_TYPE, error_type)
345+
span.set_status(
346+
Status(StatusCode.ERROR, f"{result.content[0].text}")
347+
)
303348
else:
304349
span.set_status(Status(StatusCode.OK))
305350
return result
@@ -337,7 +382,7 @@ def _extract_clean_input(self, method: str, params: Any) -> dict:
337382
# Remove internal fields starting with _ and non-serializable objects
338383
clean_params = {}
339384
for k, v in params.__dict__.items():
340-
if not k.startswith('_'):
385+
if not k.startswith("_"):
341386
try:
342387
# Test if the value is JSON serializable
343388
json.dumps(v)
@@ -390,28 +435,18 @@ def _extract_clean_output(self, method: str, result: Any) -> dict:
390435
else:
391436
# For other methods, try to serialize result cleanly
392437
if hasattr(result, "__dict__"):
393-
clean_result = {k: v for k, v in result.__dict__.items() if not k.startswith('_')}
438+
clean_result = {
439+
k: v
440+
for k, v in result.__dict__.items()
441+
if not k.startswith("_")
442+
}
394443
return clean_result
395444
else:
396445
return {"result": str(result)}
397446
except Exception:
398447
return {}
399448

400449

401-
def get_error_type(error_message):
402-
if not isinstance(error_message, str):
403-
return None
404-
match = re.search(r"\b(4\d{2}|5\d{2})\b", error_message)
405-
if match:
406-
num = int(match.group())
407-
if 400 <= num <= 599:
408-
return HTTPStatus(num).name
409-
else:
410-
return None
411-
else:
412-
return None
413-
414-
415450
def serialize(request, depth=0, max_depth=4):
416451
"""Serialize input args to MCP server into JSON.
417452
The function accepts input object and converts into JSON
@@ -473,11 +508,11 @@ async def __aiter__(self) -> AsyncGenerator[Any, None]:
473508
async for item in self.__wrapped__:
474509
# Handle different item types based on what's available
475510
request = None
476-
if hasattr(item, 'message') and hasattr(item.message, 'root'):
511+
if hasattr(item, "message") and hasattr(item.message, "root"):
477512
request = item.message.root
478513
elif type(item) is JSONRPCMessage:
479514
request = cast(JSONRPCMessage, item).root
480-
elif hasattr(item, 'root'):
515+
elif hasattr(item, "root"):
481516
request = item.root
482517
else:
483518
yield item
@@ -518,11 +553,11 @@ async def send(self, item: Any) -> Any:
518553

519554
# Handle different item types based on what's available
520555
request = None
521-
if hasattr(item, 'message') and hasattr(item.message, 'root'):
556+
if hasattr(item, "message") and hasattr(item.message, "root"):
522557
request = item.message.root
523558
elif type(item) is JSONRPCMessage:
524559
request = cast(JSONRPCMessage, item).root
525-
elif hasattr(item, 'root'):
560+
elif hasattr(item, "root"):
526561
request = item.root
527562
else:
528563
return await self.__wrapped__.send(item)
@@ -540,11 +575,6 @@ async def send(self, item: Any) -> Any:
540575
f"{request.result['content'][0]['text']}",
541576
)
542577
)
543-
error_type = get_error_type(
544-
request.result["content"][0]["text"]
545-
)
546-
if error_type is not None:
547-
span.set_attribute(ERROR_TYPE, error_type)
548578
if hasattr(request, "id"):
549579
span.set_attribute(SpanAttributes.MCP_REQUEST_ID, f"{request.id}")
550580

0 commit comments

Comments
 (0)