Skip to content

Commit c846086

Browse files
committed
feat: add instrumentation interface for observability
Add a pluggable instrumentation interface for monitoring MCP request/response lifecycle. This lays groundwork for OpenTelemetry and other observability integrations. Changes: - Define Instrumenter protocol with on_request_start, on_request_end, and on_error hooks - Add NoOpInstrumenter as default implementation with minimal overhead - Wire instrumenter into ServerSession and ClientSession constructors - Add instrumentation calls in Server._handle_request for server-side monitoring - Add request_id to log records via extra field for correlation - Add comprehensive tests for instrumentation protocol - Add documentation with examples and best practices Addresses #421
1 parent 5983a65 commit c846086

File tree

7 files changed

+589
-3
lines changed

7 files changed

+589
-3
lines changed

docs/instrumentation.md

Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
# Instrumentation
2+
3+
The MCP Python SDK provides a pluggable instrumentation interface for monitoring request/response lifecycle. This enables integration with OpenTelemetry, custom metrics, logging frameworks, and other observability tools.
4+
5+
## Overview
6+
7+
The `Instrumenter` protocol defines three hooks:
8+
9+
- `on_request_start`: Called when a request starts processing
10+
- `on_request_end`: Called when a request completes (successfully or not)
11+
- `on_error`: Called when an error occurs during request processing
12+
13+
Any hook may be a no-op, but an instrumenter should still define all three methods because the server calls each of them. Exceptions raised by instrumentation hooks are logged but do not affect request processing.
14+
15+
## Basic Usage
16+
17+
### Server-Side Instrumentation
18+
19+
```python
20+
from mcp.server.lowlevel import Server
from mcp.server.stdio import stdio_server
21+
from mcp.shared.instrumentation import Instrumenter
22+
from mcp.types import RequestId
23+
24+
class MyInstrumenter:
25+
"""Custom instrumenter implementation."""
26+
27+
def on_request_start(
28+
self,
29+
request_id: RequestId,
30+
request_type: str,
31+
method: str | None = None,
32+
**metadata,
33+
) -> None:
34+
print(f"Request {request_id} started: {request_type}")
35+
36+
def on_request_end(
37+
self,
38+
request_id: RequestId,
39+
request_type: str,
40+
success: bool,
41+
duration_seconds: float | None = None,
42+
**metadata,
43+
) -> None:
44+
status = "succeeded" if success else "failed"
45+
print(f"Request {request_id} {status}" + (f" in {duration_seconds:.3f}s" if duration_seconds is not None else ""))
46+
47+
def on_error(
48+
self,
49+
request_id: RequestId | None,
50+
error: Exception,
51+
error_type: str,
52+
**metadata,
53+
) -> None:
54+
print(f"Error in request {request_id}: {error_type} - {error}")
55+
56+
# Create server with custom instrumenter
57+
server = Server("my-server")
58+
59+
# Pass instrumenter when running the server
60+
async def run_server():
61+
async with stdio_server() as (read_stream, write_stream):
62+
await server.run(
63+
read_stream,
64+
write_stream,
65+
server.create_initialization_options(),
66+
instrumenter=MyInstrumenter(),
67+
)
68+
```
69+
70+
### Client-Side Instrumentation
71+
72+
```python
73+
from mcp.client.session import ClientSession
74+
from mcp.shared.instrumentation import Instrumenter
75+
76+
# Create client session with instrumenter
77+
async with ClientSession(
78+
read_stream=read_stream,
79+
write_stream=write_stream,
80+
instrumenter=MyInstrumenter(),
81+
) as session:
82+
await session.initialize()
83+
# Use session...
84+
```
85+
86+
## Metadata
87+
88+
Instrumentation hooks receive metadata via `**metadata` keyword arguments:
89+
90+
- `on_request_start` metadata:
91+
- `session_type`: "server" or "client"
92+
- Any additional context provided by the framework
93+
94+
- `on_request_end` metadata:
95+
- `cancelled`: True if the request was cancelled
96+
- `error`: Error message, when available (currently set only when no handler is registered for the request, i.e. "Method not found")
97+
- Any additional context
98+
99+
- `on_error` metadata:
100+
- Additional error context
101+
102+
## Request ID
103+
104+
The `request_id` parameter is consistent across all hooks for a given request, allowing you to correlate the request lifecycle. The `request_id` is also added to log records via the `extra` field, so you can filter logs by request.
105+
106+
## OpenTelemetry Integration
107+
108+
A full OpenTelemetry instrumenter will be provided in a future release or as a separate package. Here's a basic example to get started:
109+
110+
```python
111+
from opentelemetry import trace
112+
from opentelemetry.trace import Status, StatusCode
113+
114+
tracer = trace.get_tracer(__name__)
115+
116+
class OpenTelemetryInstrumenter:
117+
def __init__(self):
118+
self.spans = {}
119+
120+
def on_request_start(self, request_id, request_type, **metadata):
121+
span = tracer.start_span(
122+
f"mcp.request.{request_type}",
123+
attributes={
124+
"mcp.request_id": str(request_id),
125+
"mcp.request_type": request_type,
126+
**metadata,
127+
}
128+
)
129+
self.spans[request_id] = span
130+
131+
def on_request_end(self, request_id, request_type, success, duration_seconds=None, **metadata):
132+
if span := self.spans.pop(request_id, None):
133+
if duration_seconds is not None:
134+
span.set_attribute("mcp.duration_seconds", duration_seconds)
135+
span.set_status(Status(StatusCode.OK if success else StatusCode.ERROR))
136+
span.end()
137+
138+
def on_error(self, request_id, error, error_type, **metadata):
139+
if span := self.spans.get(request_id):
140+
span.record_exception(error)
141+
span.set_status(Status(StatusCode.ERROR, str(error)))
142+
```
143+
144+
## Default Behavior
145+
146+
If no instrumenter is provided, a no-op implementation is used automatically. This has minimal overhead and doesn't affect request processing.
147+
148+
```python
149+
from mcp.shared.instrumentation import get_default_instrumenter
150+
151+
# Get the default no-op instrumenter
152+
instrumenter = get_default_instrumenter()
153+
```
154+
155+
## Best Practices
156+
157+
1. **Keep hooks fast**: Instrumentation hooks are called synchronously in the request path. Keep processing minimal to avoid impacting request latency.
158+
159+
2. **Handle errors gracefully**: Exceptions in instrumentation hooks are caught and logged, but it's best to handle errors within your instrumenter.
160+
161+
3. **Use appropriate metadata**: Include relevant context in metadata fields to aid debugging and analysis.
162+
163+
4. **Consider sampling**: For high-volume servers, consider implementing sampling in your instrumenter to reduce overhead.
164+
165+
## Example: Custom Metrics
166+
167+
```python
168+
from collections import defaultdict
169+
from typing import Dict
170+
171+
class MetricsInstrumenter:
172+
"""Track request counts and durations."""
173+
174+
def __init__(self):
175+
self.request_counts: Dict[str, int] = defaultdict(int)
176+
self.request_durations: Dict[str, list[float]] = defaultdict(list)
177+
self.error_counts: Dict[str, int] = defaultdict(int)
178+
179+
def on_request_start(self, request_id, request_type, **metadata):
180+
self.request_counts[request_type] += 1
181+
182+
def on_request_end(self, request_id, request_type, success, duration_seconds=None, **metadata):
183+
if duration_seconds is not None:
184+
self.request_durations[request_type].append(duration_seconds)
185+
186+
def on_error(self, request_id, error, error_type, **metadata):
187+
self.error_counts[error_type] += 1
188+
189+
def get_stats(self):
190+
"""Get statistics summary."""
191+
stats = {}
192+
for request_type, durations in self.request_durations.items():
193+
if durations:
194+
avg_duration = sum(durations) / len(durations)
195+
stats[request_type] = {
196+
"count": self.request_counts[request_type],
197+
"avg_duration": avg_duration,
198+
}
199+
return stats
200+
```
201+
202+
## Future Work
203+
204+
- Full OpenTelemetry integration as a separate module
205+
- Additional built-in instrumenters (Prometheus, StatsD, etc.)
206+
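- Client-side request instrumentation (`ClientSession` currently accepts and exposes an instrumenter, but hooks are only invoked on the server side)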
207+
- Async hook support for long-running instrumentation operations
208+

mkdocs.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ nav:
1818
- Low-Level Server: low-level-server.md
1919
- Authorization: authorization.md
2020
- Testing: testing.md
21+
- Instrumentation: instrumentation.md
2122
- API Reference: api.md
2223

2324
theme:

src/mcp/client/session.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import mcp.types as types
1111
from mcp.shared.context import RequestContext
12+
from mcp.shared.instrumentation import Instrumenter, get_default_instrumenter
1213
from mcp.shared.message import SessionMessage
1314
from mcp.shared.session import BaseSession, ProgressFnT, RequestResponder
1415
from mcp.shared.version import SUPPORTED_PROTOCOL_VERSIONS
@@ -118,6 +119,7 @@ def __init__(
118119
logging_callback: LoggingFnT | None = None,
119120
message_handler: MessageHandlerFnT | None = None,
120121
client_info: types.Implementation | None = None,
122+
instrumenter: Instrumenter | None = None,
121123
) -> None:
122124
super().__init__(
123125
read_stream,
@@ -127,6 +129,7 @@ def __init__(
127129
read_timeout_seconds=read_timeout_seconds,
128130
)
129131
self._client_info = client_info or DEFAULT_CLIENT_INFO
132+
self._instrumenter = instrumenter or get_default_instrumenter()
130133
self._sampling_callback = sampling_callback or _default_sampling_callback
131134
self._elicitation_callback = elicitation_callback or _default_elicitation_callback
132135
self._list_roots_callback = list_roots_callback or _default_list_roots_callback
@@ -135,6 +138,11 @@ def __init__(
135138
self._tool_output_schemas: dict[str, dict[str, Any] | None] = {}
136139
self._server_capabilities: types.ServerCapabilities | None = None
137140

141+
@property
142+
def instrumenter(self) -> Instrumenter:
143+
"""Get the instrumenter for this session."""
144+
return self._instrumenter
145+
138146
async def initialize(self) -> types.InitializeResult:
139147
sampling = types.SamplingCapability() if self._sampling_callback is not _default_sampling_callback else None
140148
elicitation = (

src/mcp/server/lowlevel/server.py

Lines changed: 72 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ async def main():
7070
import contextvars
7171
import json
7272
import logging
73+
import time
7374
import warnings
7475
from collections.abc import AsyncIterator, Awaitable, Callable, Iterable
7576
from contextlib import AbstractAsyncContextManager, AsyncExitStack, asynccontextmanager
@@ -85,6 +86,7 @@ async def main():
8586
from mcp.server.lowlevel.func_inspection import create_call_wrapper
8687
from mcp.server.lowlevel.helper_types import ReadResourceContents
8788
from mcp.server.models import InitializationOptions
89+
from mcp.shared.instrumentation import Instrumenter
8890
from mcp.server.session import ServerSession
8991
from mcp.shared.context import RequestContext
9092
from mcp.shared.exceptions import McpError
@@ -615,6 +617,7 @@ async def run(
615617
# the initialization lifecycle, but can do so with any available node
616618
# rather than requiring initialization for each connection.
617619
stateless: bool = False,
620+
instrumenter: Instrumenter | None = None,
618621
):
619622
async with AsyncExitStack() as stack:
620623
lifespan_context = await stack.enter_async_context(self.lifespan(self))
@@ -624,6 +627,7 @@ async def run(
624627
write_stream,
625628
initialization_options,
626629
stateless=stateless,
630+
instrumenter=instrumenter,
627631
)
628632
)
629633

@@ -674,11 +678,27 @@ async def _handle_request(
674678
lifespan_context: LifespanResultT,
675679
raise_exceptions: bool,
676680
):
677-
logger.info("Processing request of type %s", type(req).__name__)
681+
request_type = type(req).__name__
682+
log_extra = {"request_id": str(message.request_id)}
683+
logger.info("Processing request of type %s", request_type, extra=log_extra)
684+
685+
# Start instrumentation
686+
start_time = time.monotonic()
687+
try:
688+
session.instrumenter.on_request_start(
689+
request_id=message.request_id,
690+
request_type=request_type,
691+
session_type="server",
692+
)
693+
except Exception: # pragma: no cover
694+
logger.exception("Error in instrumentation on_request_start")
695+
678696
if handler := self.request_handlers.get(type(req)): # type: ignore
679-
logger.debug("Dispatching request of type %s", type(req).__name__)
697+
logger.debug("Dispatching request of type %s", request_type, extra=log_extra)
680698

681699
token = None
700+
response = None
701+
success = False
682702
try:
683703
# Extract request context from message metadata
684704
request_data = None
@@ -699,22 +719,61 @@ async def _handle_request(
699719
)
700720
)
701721
response = await handler(req)
722+
success = not isinstance(response, types.ErrorData)
702723
except McpError as err: # pragma: no cover
703724
response = err.error
725+
try:
726+
session.instrumenter.on_error(
727+
request_id=message.request_id,
728+
error=err,
729+
error_type=type(err).__name__,
730+
)
731+
except Exception: # pragma: no cover
732+
logger.exception("Error in instrumentation on_error")
704733
except anyio.get_cancelled_exc_class(): # pragma: no cover
705734
logger.info(
706735
"Request %s cancelled - duplicate response suppressed",
707736
message.request_id,
737+
extra=log_extra,
708738
)
739+
try:
740+
session.instrumenter.on_request_end(
741+
request_id=message.request_id,
742+
request_type=request_type,
743+
success=False,
744+
duration_seconds=time.monotonic() - start_time,
745+
cancelled=True,
746+
)
747+
except Exception: # pragma: no cover
748+
logger.exception("Error in instrumentation on_request_end")
709749
return
710750
except Exception as err: # pragma: no cover
751+
try:
752+
session.instrumenter.on_error(
753+
request_id=message.request_id,
754+
error=err,
755+
error_type=type(err).__name__,
756+
)
757+
except Exception: # pragma: no cover
758+
logger.exception("Error in instrumentation on_error")
711759
if raise_exceptions:
712760
raise err
713761
response = types.ErrorData(code=0, message=str(err), data=None)
714762
finally:
715763
# Reset the global state after we are done
716764
if token is not None: # pragma: no branch
717765
request_ctx.reset(token)
766+
767+
# End instrumentation
768+
try:
769+
session.instrumenter.on_request_end(
770+
request_id=message.request_id,
771+
request_type=request_type,
772+
success=success,
773+
duration_seconds=time.monotonic() - start_time,
774+
)
775+
except Exception: # pragma: no cover
776+
logger.exception("Error in instrumentation on_request_end")
718777

719778
await message.respond(response)
720779
else: # pragma: no cover
@@ -724,8 +783,18 @@ async def _handle_request(
724783
message="Method not found",
725784
)
726785
)
786+
try:
787+
session.instrumenter.on_request_end(
788+
request_id=message.request_id,
789+
request_type=request_type,
790+
success=False,
791+
duration_seconds=time.monotonic() - start_time,
792+
error="Method not found",
793+
)
794+
except Exception: # pragma: no cover
795+
logger.exception("Error in instrumentation on_request_end")
727796

728-
logger.debug("Response sent")
797+
logger.debug("Response sent", extra=log_extra)
729798

730799
async def _handle_notification(self, notify: Any):
731800
if handler := self.notification_handlers.get(type(notify)): # type: ignore

0 commit comments

Comments
 (0)