Skip to content

Commit 8bb3f8d

Browse files
committed
fix: add session management for stateful HTTP MCP connection
HTTP server uses stateful sessions. Updated test to: - Create MCPHttpSession class to manage session state - Extract session ID from SSE comments (: session: <id>) - Send X-Session-ID header in subsequent requests - Reuse requests.Session() for connection pooling This matches rmcp StreamableHttpService session management where: 1. First request creates session, returns ID in SSE comment 2. Subsequent requests must include X-Session-ID header 3. Session persists MCP state across tool calls Without session management, server returns 401 Unauthorized.
1 parent ec9a592 commit 8bb3f8d

File tree

1 file changed

+78
-44
lines changed

1 file changed

+78
-44
lines changed

test_http_agentic_tools.py

Lines changed: 78 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -161,57 +161,87 @@ def check_server():
161161
print(f" ./start_http_server.sh")
162162
return False
163163

164-
def send_mcp_request(payload, timeout=60):
165-
"""Send MCP request via HTTP POST and wait for SSE response."""
166-
try:
167-
start_time = time.time()
164+
class MCPHttpSession:
165+
"""Manages stateful MCP session over HTTP with SSE."""
166+
167+
def __init__(self, base_url):
168+
self.base_url = base_url
169+
self.session = requests.Session()
170+
self.session_id = None
171+
172+
def send_request(self, payload, timeout=60):
173+
"""Send MCP request via HTTP POST and wait for SSE response."""
174+
try:
175+
start_time = time.time()
168176

169-
# HTTP server returns SSE stream - need to accept text/event-stream
170-
response = requests.post(
171-
f"{BASE_URL}/mcp",
172-
headers={
177+
# HTTP server returns SSE stream - need to accept text/event-stream
178+
headers = {
173179
"Content-Type": "application/json",
174180
"Accept": "text/event-stream"
175-
},
176-
json=payload,
177-
timeout=timeout,
178-
stream=True
179-
)
181+
}
180182

181-
duration = time.time() - start_time
183+
# Add session ID if we have one
184+
if self.session_id:
185+
headers["X-Session-ID"] = self.session_id
182186

183-
if response.status_code == 200:
184-
# Parse SSE stream for JSON-RPC responses
185-
result_data = None
186-
for line in response.iter_lines(decode_unicode=True):
187-
if not line or line.startswith(':'):
188-
continue
189-
if line.startswith('data: '):
190-
data = line[6:] # Remove 'data: ' prefix
191-
try:
192-
event = json.loads(data)
193-
# Look for the final result
194-
if "result" in event:
195-
result_data = event
196-
break
197-
except json.JSONDecodeError:
187+
response = self.session.post(
188+
f"{self.base_url}/mcp",
189+
headers=headers,
190+
json=payload,
191+
timeout=timeout,
192+
stream=True
193+
)
194+
195+
if response.status_code == 200:
196+
# Parse SSE stream for JSON-RPC responses and session ID
197+
result_data = None
198+
199+
for line in response.iter_lines(decode_unicode=True):
200+
if not line:
201+
continue
202+
203+
# SSE comments (session info)
204+
if line.startswith(':'):
205+
# Check for session ID in comments
206+
if 'session:' in line:
207+
session_id = line.split('session:')[1].strip()
208+
if self.session_id is None:
209+
self.session_id = session_id
210+
print(f" 📝 Session ID: {session_id[:16]}...")
198211
continue
199212

200-
if result_data:
201-
return result_data, time.time() - start_time
213+
# SSE data events
214+
if line.startswith('data: '):
215+
data = line[6:] # Remove 'data: ' prefix
216+
try:
217+
event = json.loads(data)
218+
# Look for the final result
219+
if "result" in event:
220+
result_data = event
221+
# Don't break - keep reading to consume full stream
222+
elif "error" in event:
223+
print(f"❌ MCP error: {event['error']}")
224+
return None, time.time() - start_time
225+
except json.JSONDecodeError as e:
226+
print(f"⚠️ Failed to parse SSE data: {e}")
227+
continue
228+
229+
duration = time.time() - start_time
230+
if result_data:
231+
return result_data, duration
232+
else:
233+
print(f"⚠️ No result found in SSE stream")
234+
return None, duration
202235
else:
203-
print(f"⚠️ No result found in SSE stream")
236+
print(f"❌ HTTP {response.status_code}: {response.text[:200]}")
204237
return None, time.time() - start_time
205-
else:
206-
print(f"❌ HTTP {response.status_code}: {response.text[:200]}")
207-
return None, duration
208238

209-
except requests.exceptions.Timeout:
210-
print(f"⚠️ Request timed out after {timeout}s")
211-
return None, timeout
212-
except requests.exceptions.RequestException as e:
213-
print(f"❌ Request failed: {e}")
214-
return None, 0
239+
except requests.exceptions.Timeout:
240+
print(f"⚠️ Request timed out after {timeout}s")
241+
return None, timeout
242+
except requests.exceptions.RequestException as e:
243+
print(f"❌ Request failed: {e}")
244+
return None, 0
215245

216246
def extract_file_locations(structured_output):
217247
"""Extract file locations from structured output."""
@@ -251,6 +281,9 @@ def run():
251281
if not check_server():
252282
return 1
253283

284+
# Create stateful MCP session
285+
session = MCPHttpSession(BASE_URL)
286+
254287
# MCP initialization
255288
print("\n" + "=" * 72)
256289
print("Initializing MCP connection...")
@@ -267,10 +300,11 @@ def run():
267300
}
268301
}
269302

270-
init_response, _ = send_mcp_request(init_req, timeout=5)
303+
init_response, _ = session.send_request(init_req, timeout=5)
271304
if not init_response or "error" in init_response:
272305
print("❌ Initialization failed")
273-
print(json.dumps(init_response, indent=2))
306+
if init_response:
307+
print(json.dumps(init_response, indent=2))
274308
return 1
275309

276310
print("✓ MCP connection initialized")
@@ -294,7 +328,7 @@ def run():
294328
start_time = time.time()
295329
print(f"⏳ Sending request...", end="", flush=True)
296330

297-
response, duration = send_mcp_request(payload, timeout=timeout)
331+
response, duration = session.send_request(payload, timeout=timeout)
298332

299333
# Parse result
300334
success = False

0 commit comments

Comments
 (0)