Skip to content

Commit 1209d96

Browse files
committed
[minimcp] Add ContextManager for handler contexts
- Implement ContextManager for tracking active handler contexts - Add Context dataclass for holding request metadata (message, time_limiter, scope, responder) - Support thread-safe and async-safe context isolation using contextvars - Add helper methods (get_scope, get_responder) for common access patterns - Add comprehensive unit test suite
1 parent 0b2de2a commit 1209d96

File tree

2 files changed

+370
-0
lines changed

2 files changed

+370
-0
lines changed
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
import logging
2+
from contextlib import contextmanager
3+
from contextvars import ContextVar, Token
4+
from dataclasses import dataclass
5+
from typing import Generic, TypeVar
6+
7+
from mcp.server.minimcp.exceptions import ContextError
8+
from mcp.server.minimcp.limiter import TimeLimiter
9+
from mcp.server.minimcp.responder import Responder
10+
from mcp.types import JSONRPCMessage
11+
12+
logger = logging.getLogger(__name__)
13+
14+
15+
ScopeT = TypeVar("ScopeT", bound=object)
16+
17+
18+
@dataclass(slots=True) # Use NamedTuple once MCP drops support for Python 3.10
19+
class Context(Generic[ScopeT]):
20+
"""
21+
Context object holds request metadata available to message handlers.
22+
23+
The Context provides access to the current message, time limiter for managing
24+
idle timeout, optional scope object for passing custom data (auth, user info,
25+
session data, database handles, etc.), and optional responder for sending
26+
notifications back to the client.
27+
28+
Attributes:
29+
message: The parsed JSON-RPC request message being handled.
30+
time_limiter: TimeLimiter for managing handler idle timeout. Call
31+
time_limiter.reset() to extend the deadline during active processing.
32+
scope: Optional scope object passed to mcp.handle(). Use this to pass
33+
authentication details, user info, session data, or database handles
34+
to your handlers.
35+
responder: Optional responder for sending notifications to the client.
36+
Available when bidirectional communication is supported by the transport.
37+
"""
38+
39+
message: JSONRPCMessage
40+
time_limiter: TimeLimiter
41+
scope: ScopeT | None = None
42+
responder: Responder | None = None
43+
44+
45+
class ContextManager(Generic[ScopeT]):
46+
"""
47+
ContextManager tracks the currently active handler context.
48+
49+
The ContextManager provides access to request metadata (such as the message,
50+
scope, responder, and timeout) directly inside handlers. It uses contextvars
51+
to maintain thread-safe and async-safe context isolation across concurrent
52+
handler executions.
53+
54+
You can retrieve the current context using mcp.context.get(). If called
55+
outside of a handler, this method raises a ContextError.
56+
57+
For common use cases, helper methods (get_scope, get_responder) are provided
58+
to avoid null checks when accessing optional context attributes.
59+
"""
60+
61+
_ctx: ContextVar[Context[ScopeT]] = ContextVar("ctx")
62+
63+
@contextmanager
64+
def active(self, context: Context[ScopeT]):
65+
"""Set the active context for the current handler execution.
66+
67+
This context manager sets the context at the start of handler execution
68+
and clears it when the handler completes, ensuring proper cleanup.
69+
70+
Args:
71+
context: The context to make active during handler execution.
72+
73+
Yields:
74+
None. The context is accessible via get() during execution.
75+
"""
76+
# Set context
77+
token: Token[Context[ScopeT]] = self._ctx.set(context)
78+
try:
79+
yield
80+
finally:
81+
# Clear context
82+
self._ctx.reset(token)
83+
84+
def get(self) -> Context[ScopeT]:
85+
"""Get the current handler context.
86+
87+
Returns:
88+
The active Context object containing message, time_limiter, scope,
89+
and responder.
90+
91+
Raises:
92+
ContextError: If called outside of an active handler context.
93+
"""
94+
try:
95+
return self._ctx.get()
96+
except LookupError as e:
97+
msg = "No Context: Called mcp.context.get() outside of an active handler context"
98+
logger.error(msg)
99+
raise ContextError(msg) from e
100+
101+
def get_scope(self) -> ScopeT:
102+
"""Get the scope object from the current context.
103+
104+
This helper method retrieves the scope and raises an error if it's not
105+
available, avoiding the need for null checks in your code.
106+
107+
Returns:
108+
The scope object passed to mcp.handle().
109+
110+
Raises:
111+
ContextError: If called outside of an active handler context or if
112+
no scope was provided to mcp.handle().
113+
"""
114+
scope = self.get().scope
115+
if scope is None:
116+
raise ContextError("ContextError: Scope is not available in current context")
117+
return scope
118+
119+
def get_responder(self) -> Responder:
120+
"""Get the responder from the current context.
121+
122+
This helper method retrieves the responder and raises an error if it's not
123+
available, avoiding the need for null checks in your code. The responder
124+
is only available when using transports that support bidirectional
125+
communication (stdio, Streamable HTTP).
126+
127+
Returns:
128+
The Responder for sending notifications to the client.
129+
130+
Raises:
131+
ContextError: If called outside of an active handler context or if
132+
the responder is not available (e.g., when using HTTP transport).
133+
"""
134+
responder = self.get().responder
135+
if responder is None:
136+
raise ContextError("ContextError: Responder is not available in current context")
137+
return responder
Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
from unittest.mock import Mock
2+
3+
import pytest
4+
5+
import mcp.types as types
6+
from mcp.server.minimcp.exceptions import ContextError
7+
from mcp.server.minimcp.limiter import TimeLimiter
8+
from mcp.server.minimcp.managers.context_manager import Context, ContextManager
9+
from mcp.server.minimcp.responder import Responder
10+
11+
pytestmark = pytest.mark.anyio
12+
13+
14+
class TestContext:
15+
"""Test suite for Context dataclass."""
16+
17+
def test_context_creation_minimal(self):
18+
"""Test creating a Context with minimal required fields."""
19+
message = types.JSONRPCMessage(types.JSONRPCRequest(method="test", id=1, jsonrpc="2.0"))
20+
time_limiter = Mock(spec=TimeLimiter)
21+
22+
context = Context[None](message=message, time_limiter=time_limiter)
23+
24+
assert context.message == message
25+
assert context.time_limiter == time_limiter
26+
assert context.scope is None
27+
assert context.responder is None
28+
29+
def test_context_creation_with_all_fields(self):
30+
"""Test creating a Context with all fields."""
31+
message = types.JSONRPCMessage(types.JSONRPCRequest(method="test", id=1, jsonrpc="2.0"))
32+
time_limiter = Mock(spec=TimeLimiter)
33+
scope = {"test": "scope"}
34+
responder = Mock(spec=Responder)
35+
36+
context = Context(message=message, time_limiter=time_limiter, scope=scope, responder=responder)
37+
38+
assert context.message == message
39+
assert context.time_limiter == time_limiter
40+
assert context.scope == scope
41+
assert context.responder == responder
42+
43+
44+
class TestContextManager:
45+
"""Test suite for ContextManager class."""
46+
47+
@pytest.fixture
48+
def sample_context(self) -> Context[dict[str, str]]:
49+
"""Create a sample Context for testing."""
50+
message = types.JSONRPCMessage(types.JSONRPCRequest(method="test", id=1, jsonrpc="2.0"))
51+
time_limiter = Mock(spec=TimeLimiter)
52+
scope = {"test": "scope"}
53+
responder = Mock(spec=Responder)
54+
55+
return Context[dict[str, str]](message=message, time_limiter=time_limiter, scope=scope, responder=responder)
56+
57+
def test_get_without_active_context_raises_error(self):
58+
"""Test that get() raises ContextError when no context is active."""
59+
context_manager = ContextManager[None]()
60+
with pytest.raises(ContextError, match="outside of an active handler context"):
61+
context_manager.get()
62+
63+
def test_get_scope_without_active_context_raises_error(self):
64+
"""Test that get_scope() raises ContextError when no context is active."""
65+
context_manager = ContextManager[None]()
66+
with pytest.raises(ContextError, match="outside of an active handler context"):
67+
context_manager.get_scope()
68+
69+
def test_get_responder_without_active_context_raises_error(self):
70+
"""Test that get_responder() raises ContextError when no context is active."""
71+
context_manager = ContextManager[None]()
72+
with pytest.raises(ContextError, match="outside of an active handler context"):
73+
context_manager.get_responder()
74+
75+
def test_active_context_manager(self, sample_context: Context[dict[str, str]]):
76+
"""Test the active context manager sets and clears context properly."""
77+
context_manager = ContextManager[dict[str, str]]()
78+
# Verify no context initially
79+
with pytest.raises(ContextError):
80+
context_manager.get()
81+
82+
# Use context manager
83+
with context_manager.active(sample_context):
84+
# Context should be available
85+
retrieved_context = context_manager.get()
86+
assert retrieved_context == sample_context
87+
88+
# Context should be cleared after exiting
89+
with pytest.raises(ContextError):
90+
context_manager.get()
91+
92+
def test_get_within_active_context(self, sample_context: Context[dict[str, str]]):
93+
"""Test that get() returns the active context."""
94+
context_manager = ContextManager[dict[str, str]]()
95+
with context_manager.active(sample_context):
96+
retrieved_context = context_manager.get()
97+
assert retrieved_context == sample_context
98+
assert retrieved_context.message == sample_context.message
99+
assert retrieved_context.time_limiter == sample_context.time_limiter
100+
assert retrieved_context.scope == sample_context.scope
101+
assert retrieved_context.responder == sample_context.responder
102+
103+
def test_get_scope_within_active_context(self, sample_context: Context[dict[str, str]]):
104+
"""Test that get_scope() returns the scope from active context."""
105+
context_manager = ContextManager[dict[str, str]]()
106+
with context_manager.active(sample_context):
107+
scope = context_manager.get_scope()
108+
assert scope == sample_context.scope
109+
110+
def test_get_scope_when_scope_is_none_raises_error(self):
111+
"""Test that get_scope() raises ContextError when scope is None."""
112+
113+
context_manager = ContextManager[None]()
114+
message = types.JSONRPCMessage(types.JSONRPCRequest(method="test", id=1, jsonrpc="2.0"))
115+
time_limiter = Mock(spec=TimeLimiter)
116+
117+
context_without_scope = Context[None](message=message, time_limiter=time_limiter, scope=None)
118+
119+
with context_manager.active(context_without_scope):
120+
with pytest.raises(ContextError, match="Scope is not available in current context"):
121+
context_manager.get_scope()
122+
123+
def test_get_responder_within_active_context(self, sample_context: Context[dict[str, str]]):
124+
"""Test that get_responder() returns the responder from active context."""
125+
context_manager = ContextManager[dict[str, str]]()
126+
with context_manager.active(sample_context):
127+
responder = context_manager.get_responder()
128+
assert responder == sample_context.responder
129+
130+
def test_get_responder_when_responder_is_none_raises_error(self):
131+
"""Test that get_responder() raises ContextError when responder is None."""
132+
context_manager = ContextManager[None]()
133+
message = types.JSONRPCMessage(types.JSONRPCRequest(method="test", id=1, jsonrpc="2.0"))
134+
time_limiter = Mock(spec=TimeLimiter)
135+
context_without_responder = Context[None](message=message, time_limiter=time_limiter, responder=None)
136+
137+
with context_manager.active(context_without_responder):
138+
with pytest.raises(ContextError, match="Responder is not available in current context"):
139+
context_manager.get_responder()
140+
141+
def test_nested_context_managers(self):
142+
"""Test nested context managers work correctly."""
143+
context_manager = ContextManager[str]()
144+
message1 = types.JSONRPCMessage(types.JSONRPCRequest(method="test1", id=1, jsonrpc="2.0"))
145+
message2 = types.JSONRPCMessage(types.JSONRPCRequest(method="test2", id=2, jsonrpc="2.0"))
146+
time_limiter = Mock(spec=TimeLimiter)
147+
148+
context1 = Context[str](message=message1, time_limiter=time_limiter, scope="scope1")
149+
context2 = Context[str](message=message2, time_limiter=time_limiter, scope="scope2")
150+
151+
with context_manager.active(context1):
152+
assert context_manager.get_scope() == "scope1"
153+
154+
with context_manager.active(context2):
155+
assert context_manager.get_scope() == "scope2"
156+
157+
# Should return to outer context
158+
assert context_manager.get_scope() == "scope1"
159+
160+
def test_context_manager_exception_handling(self, sample_context: Context[dict[str, str]]):
161+
"""Test that context is properly cleared even when exception occurs."""
162+
context_manager = ContextManager[dict[str, str]]()
163+
with pytest.raises(ValueError):
164+
with context_manager.active(sample_context):
165+
# Context should be available
166+
assert context_manager.get() == sample_context
167+
# Raise exception
168+
raise ValueError("Test exception")
169+
170+
# Context should be cleared even after exception
171+
with pytest.raises(ContextError):
172+
context_manager.get()
173+
174+
def test_multiple_context_managers_share_context_var(self):
175+
"""Test that multiple ContextManager instances share the same ContextVar (by design)."""
176+
manager1 = ContextManager[str]()
177+
manager2 = ContextManager[str]()
178+
179+
message1 = types.JSONRPCMessage(types.JSONRPCRequest(method="test1", id=1, jsonrpc="2.0"))
180+
message2 = types.JSONRPCMessage(types.JSONRPCRequest(method="test2", id=2, jsonrpc="2.0"))
181+
time_limiter = Mock(spec=TimeLimiter)
182+
183+
context1 = Context[str](message=message1, time_limiter=time_limiter, scope="scope1")
184+
context2 = Context[str](message=message2, time_limiter=time_limiter, scope="scope2")
185+
186+
# ContextVar is shared across instances, so the last set context wins
187+
with manager1.active(context1):
188+
with manager2.active(context2):
189+
# Both managers see the same context (context2) since they share the ContextVar
190+
assert manager1.get_scope() == "scope2"
191+
assert manager2.get_scope() == "scope2"
192+
193+
async def test_context_var_isolation(self):
194+
"""Test that ContextVar properly isolates contexts across different execution contexts."""
195+
import anyio
196+
197+
context_manager = ContextManager[str]()
198+
199+
message1 = types.JSONRPCMessage(types.JSONRPCRequest(method="test1", id=1, jsonrpc="2.0"))
200+
message2 = types.JSONRPCMessage(types.JSONRPCRequest(method="test2", id=2, jsonrpc="2.0"))
201+
time_limiter = Mock(spec=TimeLimiter)
202+
203+
context1 = Context[str](message=message1, time_limiter=time_limiter, scope="scope1")
204+
context2 = Context[str](message=message2, time_limiter=time_limiter, scope="scope2")
205+
206+
async def task1():
207+
with context_manager.active(context1):
208+
await anyio.sleep(0.01)
209+
return context_manager.get_scope()
210+
211+
async def task2():
212+
with context_manager.active(context2):
213+
await anyio.sleep(0.01)
214+
return context_manager.get_scope()
215+
216+
results: list[str] = []
217+
218+
async def collect_task1():
219+
result = await task1()
220+
results.append(result)
221+
222+
async def collect_task2():
223+
result = await task2()
224+
results.append(result)
225+
226+
async with anyio.create_task_group() as tg:
227+
tg.start_soon(collect_task1)
228+
tg.start_soon(collect_task2)
229+
230+
# Results order may vary, so check both are present
231+
assert "scope1" in results
232+
assert "scope2" in results
233+
assert len(results) == 2

0 commit comments

Comments
 (0)