Skip to content

Commit 1c674cd

Browse files
committed
test: Add additional MCP transport context integration tests
- Add integration tests for transport context propagation between MCP clients and servers - Test both sync and async server implementations across all transport types (stateless, streamable, SSE) - Cover Spring WebFlux and WebMVC environments with dedicated test suites - Validate context flow through HTTP headers for authentication, correlation IDs, and metadata - Rename existing McpTransportContextIntegrationTests to SyncServerMcpTransportContextIntegrationTests for clarity Signed-off-by: Christian Tzolov <christian.tzolov@broadcom.com>
1 parent 629464b commit 1c674cd

File tree

6 files changed

+1363
-2
lines changed

6 files changed

+1363
-2
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,381 @@
1+
/*
2+
* Copyright 2024-2025 the original author or authors.
3+
*/
4+
5+
package io.modelcontextprotocol.common;
6+
7+
import java.util.Map;
8+
import java.util.function.BiFunction;
9+
import java.util.function.Supplier;
10+
11+
import com.fasterxml.jackson.databind.ObjectMapper;
12+
import io.modelcontextprotocol.client.McpAsyncClient;
13+
import io.modelcontextprotocol.client.McpClient;
14+
import io.modelcontextprotocol.client.McpSyncClient;
15+
import io.modelcontextprotocol.client.transport.WebClientStreamableHttpTransport;
16+
import io.modelcontextprotocol.client.transport.WebFluxSseClientTransport;
17+
import io.modelcontextprotocol.server.McpAsyncServerExchange;
18+
import io.modelcontextprotocol.server.McpServer;
19+
import io.modelcontextprotocol.server.McpServerFeatures;
20+
import io.modelcontextprotocol.server.McpStatelessServerFeatures;
21+
import io.modelcontextprotocol.server.McpTransportContextExtractor;
22+
import io.modelcontextprotocol.server.TestUtil;
23+
import io.modelcontextprotocol.server.transport.WebFluxSseServerTransportProvider;
24+
import io.modelcontextprotocol.server.transport.WebFluxStatelessServerTransport;
25+
import io.modelcontextprotocol.server.transport.WebFluxStreamableServerTransportProvider;
26+
import io.modelcontextprotocol.spec.McpSchema;
27+
import org.junit.jupiter.api.AfterEach;
28+
import org.junit.jupiter.api.Test;
29+
import org.junit.jupiter.api.Timeout;
30+
import reactor.core.publisher.Mono;
31+
import reactor.netty.DisposableServer;
32+
import reactor.netty.http.server.HttpServer;
33+
import reactor.test.StepVerifier;
34+
35+
import org.springframework.http.server.reactive.HttpHandler;
36+
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
37+
import org.springframework.web.reactive.function.client.ClientRequest;
38+
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
39+
import org.springframework.web.reactive.function.client.WebClient;
40+
import org.springframework.web.reactive.function.server.RouterFunction;
41+
import org.springframework.web.reactive.function.server.RouterFunctions;
42+
import org.springframework.web.reactive.function.server.ServerRequest;
43+
44+
import static org.assertj.core.api.Assertions.assertThat;
45+
46+
/**
47+
* Integration tests for {@link McpTransportContext} propagation between MCP clients and
48+
* async servers using Spring WebFlux infrastructure.
49+
*
50+
* <p>
51+
* This test class validates the end-to-end flow of transport context propagation in MCP
52+
* communication for asynchronous server implementations. It tests various combinations of
53+
* client types (sync/async) and server transport mechanisms (stateless, streamable, SSE)
54+
* to ensure proper context handling across different configurations.
55+
*
56+
* <h2>Context Propagation Flow</h2>
57+
* <ol>
58+
* <li>Client sets a value in its transport context (either via thread-local for sync or
59+
* Reactor context for async)</li>
60+
* <li>Client-side context provider extracts the value and adds it as an HTTP header to
61+
* the request</li>
62+
* <li>Server-side context extractor reads the header from the incoming request</li>
63+
* <li>Server handler receives the extracted context and returns the value as the tool
64+
* call result</li>
65+
* <li>Test verifies the round-trip context propagation was successful</li>
66+
* </ol>
67+
*
68+
* @author Daniel Garnier-Moiroux
69+
* @author Christian Tzolov
70+
* @see McpTransportContext
71+
* @see McpTransportContextExtractor
72+
* @see WebFluxStatelessServerTransport
73+
* @see WebFluxStreamableServerTransportProvider
74+
* @see WebFluxSseServerTransportProvider
75+
*/
76+
@Timeout(15)
77+
public class AsyncServerMcpTransportContextIntegrationTests {
78+
79+
private static final int PORT = TestUtil.findAvailablePort();
80+
81+
private static final ThreadLocal<String> SYNC_CLIENT_SIDE_HEADER_VALUE_HOLDER = new ThreadLocal<>();
82+
83+
private static final String HEADER_NAME = "x-test";
84+
85+
// Sync client context provider
86+
private final Supplier<McpTransportContext> syncClientContextProvider = () -> {
87+
var headerValue = SYNC_CLIENT_SIDE_HEADER_VALUE_HOLDER.get();
88+
return headerValue != null ? McpTransportContext.create(Map.of("client-side-header-value", headerValue))
89+
: McpTransportContext.EMPTY;
90+
};
91+
92+
// Async client context provider
93+
ExchangeFilterFunction asyncClientContextProvider = (request, next) -> Mono.deferContextual(ctx -> {
94+
var context = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
95+
// // do stuff with the context
96+
var headerValue = context.get("client-side-header-value");
97+
if (headerValue == null) {
98+
return next.exchange(request);
99+
}
100+
var reqWithHeader = ClientRequest.from(request).header(HEADER_NAME, headerValue.toString()).build();
101+
return next.exchange(reqWithHeader);
102+
});
103+
104+
// Tools
105+
private final McpSchema.Tool tool = McpSchema.Tool.builder()
106+
.name("test-tool")
107+
.description("return the value of the x-test header from call tool request")
108+
.build();
109+
110+
private final BiFunction<McpTransportContext, McpSchema.CallToolRequest, Mono<McpSchema.CallToolResult>> asyncStatelessHandler = (
111+
transportContext, request) -> {
112+
return Mono
113+
.just(new McpSchema.CallToolResult(transportContext.get("server-side-header-value").toString(), null));
114+
};
115+
116+
private final BiFunction<McpAsyncServerExchange, McpSchema.CallToolRequest, Mono<McpSchema.CallToolResult>> asyncStatefulHandler = (
117+
exchange, request) -> {
118+
return asyncStatelessHandler.apply(exchange.transportContext(), request);
119+
};
120+
121+
// Server context extractor
122+
private final McpTransportContextExtractor<ServerRequest> serverContextExtractor = (ServerRequest r) -> {
123+
var headerValue = r.headers().firstHeader(HEADER_NAME);
124+
return headerValue != null ? McpTransportContext.create(Map.of("server-side-header-value", headerValue))
125+
: McpTransportContext.EMPTY;
126+
};
127+
128+
// Server transports
129+
private final WebFluxStatelessServerTransport statelessServerTransport = WebFluxStatelessServerTransport.builder()
130+
.objectMapper(new ObjectMapper())
131+
.contextExtractor(serverContextExtractor)
132+
.build();
133+
134+
private final WebFluxStreamableServerTransportProvider streamableServerTransport = WebFluxStreamableServerTransportProvider
135+
.builder()
136+
.objectMapper(new ObjectMapper())
137+
.contextExtractor(serverContextExtractor)
138+
.build();
139+
140+
private final WebFluxSseServerTransportProvider sseServerTransport = WebFluxSseServerTransportProvider.builder()
141+
.objectMapper(new ObjectMapper())
142+
.contextExtractor(serverContextExtractor)
143+
.messageEndpoint("/mcp/message")
144+
.build();
145+
146+
// Async clients
147+
private final McpAsyncClient asyncStreamableClient = McpClient
148+
.async(WebClientStreamableHttpTransport
149+
.builder(WebClient.builder().baseUrl("http://localhost:" + PORT).filter(asyncClientContextProvider))
150+
.build())
151+
.build();
152+
153+
private final McpAsyncClient asyncSseClient = McpClient
154+
.async(WebFluxSseClientTransport
155+
.builder(WebClient.builder().baseUrl("http://localhost:" + PORT).filter(asyncClientContextProvider))
156+
.build())
157+
.build();
158+
159+
// Sync clients
160+
private final McpSyncClient syncStreamableClient = McpClient
161+
.sync(WebClientStreamableHttpTransport
162+
.builder(WebClient.builder().baseUrl("http://localhost:" + PORT).filter(asyncClientContextProvider))
163+
.build())
164+
.transportContextProvider(syncClientContextProvider)
165+
.build();
166+
167+
private final McpSyncClient syncSseClient = McpClient
168+
.sync(WebFluxSseClientTransport
169+
.builder(WebClient.builder().baseUrl("http://localhost:" + PORT).filter(asyncClientContextProvider))
170+
.build())
171+
.transportContextProvider(syncClientContextProvider)
172+
.build();
173+
174+
private DisposableServer httpServer;
175+
176+
@AfterEach
177+
public void after() {
178+
SYNC_CLIENT_SIDE_HEADER_VALUE_HOLDER.remove();
179+
if (statelessServerTransport != null) {
180+
statelessServerTransport.closeGracefully().block();
181+
}
182+
if (streamableServerTransport != null) {
183+
streamableServerTransport.closeGracefully().block();
184+
}
185+
if (sseServerTransport != null) {
186+
sseServerTransport.closeGracefully().block();
187+
}
188+
stopHttpServer();
189+
}
190+
191+
@Test
192+
void syncClientStatelessServer() {
193+
194+
startHttpServer(statelessServerTransport.getRouterFunction());
195+
196+
var mcpServer = McpServer.async(statelessServerTransport)
197+
.capabilities(McpSchema.ServerCapabilities.builder().tools(true).build())
198+
.tools(new McpStatelessServerFeatures.AsyncToolSpecification(tool, asyncStatelessHandler))
199+
.build();
200+
201+
McpSchema.InitializeResult initResult = syncStreamableClient.initialize();
202+
assertThat(initResult).isNotNull();
203+
204+
SYNC_CLIENT_SIDE_HEADER_VALUE_HOLDER.set("some important value");
205+
McpSchema.CallToolResult response = syncStreamableClient
206+
.callTool(new McpSchema.CallToolRequest("test-tool", Map.of()));
207+
208+
assertThat(response).isNotNull();
209+
assertThat(response.content()).hasSize(1)
210+
.first()
211+
.extracting(McpSchema.TextContent.class::cast)
212+
.extracting(McpSchema.TextContent::text)
213+
.isEqualTo("some important value");
214+
215+
mcpServer.close();
216+
}
217+
218+
@Test
219+
void asyncClientStatelessServer() {
220+
221+
startHttpServer(statelessServerTransport.getRouterFunction());
222+
223+
var mcpServer = McpServer.async(statelessServerTransport)
224+
.capabilities(McpSchema.ServerCapabilities.builder().tools(true).build())
225+
.tools(new McpStatelessServerFeatures.AsyncToolSpecification(tool, asyncStatelessHandler))
226+
.build();
227+
228+
StepVerifier.create(asyncStreamableClient.initialize()).assertNext(initResult -> {
229+
assertThat(initResult).isNotNull();
230+
}).verifyComplete();
231+
232+
// Test tool call with context
233+
StepVerifier
234+
.create(asyncStreamableClient.callTool(new McpSchema.CallToolRequest("test-tool", Map.of()))
235+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY,
236+
McpTransportContext.create(Map.of("client-side-header-value", "some important value")))))
237+
.assertNext(response -> {
238+
assertThat(response).isNotNull();
239+
assertThat(response.content()).hasSize(1)
240+
.first()
241+
.extracting(McpSchema.TextContent.class::cast)
242+
.extracting(McpSchema.TextContent::text)
243+
.isEqualTo("some important value");
244+
})
245+
.verifyComplete();
246+
247+
mcpServer.close();
248+
}
249+
250+
@Test
251+
void syncClientStreamableServer() {
252+
253+
startHttpServer(streamableServerTransport.getRouterFunction());
254+
255+
var mcpServer = McpServer.async(streamableServerTransport)
256+
.capabilities(McpSchema.ServerCapabilities.builder().tools(true).build())
257+
.tools(new McpServerFeatures.AsyncToolSpecification(tool, null, asyncStatefulHandler))
258+
.build();
259+
260+
McpSchema.InitializeResult initResult = syncStreamableClient.initialize();
261+
assertThat(initResult).isNotNull();
262+
263+
SYNC_CLIENT_SIDE_HEADER_VALUE_HOLDER.set("some important value");
264+
McpSchema.CallToolResult response = syncStreamableClient
265+
.callTool(new McpSchema.CallToolRequest("test-tool", Map.of()));
266+
267+
assertThat(response).isNotNull();
268+
assertThat(response.content()).hasSize(1)
269+
.first()
270+
.extracting(McpSchema.TextContent.class::cast)
271+
.extracting(McpSchema.TextContent::text)
272+
.isEqualTo("some important value");
273+
274+
mcpServer.close();
275+
}
276+
277+
@Test
278+
void asyncClientStreamableServer() {
279+
280+
startHttpServer(streamableServerTransport.getRouterFunction());
281+
282+
var mcpServer = McpServer.async(streamableServerTransport)
283+
.capabilities(McpSchema.ServerCapabilities.builder().tools(true).build())
284+
.tools(new McpServerFeatures.AsyncToolSpecification(tool, null, asyncStatefulHandler))
285+
.build();
286+
287+
StepVerifier.create(asyncStreamableClient.initialize()).assertNext(initResult -> {
288+
assertThat(initResult).isNotNull();
289+
}).verifyComplete();
290+
291+
// Test tool call with context
292+
StepVerifier
293+
.create(asyncStreamableClient.callTool(new McpSchema.CallToolRequest("test-tool", Map.of()))
294+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY,
295+
McpTransportContext.create(Map.of("client-side-header-value", "some important value")))))
296+
.assertNext(response -> {
297+
assertThat(response).isNotNull();
298+
assertThat(response.content()).hasSize(1)
299+
.first()
300+
.extracting(McpSchema.TextContent.class::cast)
301+
.extracting(McpSchema.TextContent::text)
302+
.isEqualTo("some important value");
303+
})
304+
.verifyComplete();
305+
306+
mcpServer.close();
307+
}
308+
309+
@Test
310+
void syncClientSseServer() {
311+
312+
startHttpServer(sseServerTransport.getRouterFunction());
313+
314+
var mcpServer = McpServer.async(sseServerTransport)
315+
.capabilities(McpSchema.ServerCapabilities.builder().tools(true).build())
316+
.tools(new McpServerFeatures.AsyncToolSpecification(tool, null, asyncStatefulHandler))
317+
.build();
318+
319+
McpSchema.InitializeResult initResult = syncSseClient.initialize();
320+
assertThat(initResult).isNotNull();
321+
322+
SYNC_CLIENT_SIDE_HEADER_VALUE_HOLDER.set("some important value");
323+
McpSchema.CallToolResult response = syncSseClient
324+
.callTool(new McpSchema.CallToolRequest("test-tool", Map.of()));
325+
326+
assertThat(response).isNotNull();
327+
assertThat(response.content()).hasSize(1)
328+
.first()
329+
.extracting(McpSchema.TextContent.class::cast)
330+
.extracting(McpSchema.TextContent::text)
331+
.isEqualTo("some important value");
332+
333+
mcpServer.close();
334+
}
335+
336+
@Test
337+
void asyncClientSseServer() {
338+
339+
startHttpServer(sseServerTransport.getRouterFunction());
340+
341+
var mcpServer = McpServer.async(sseServerTransport)
342+
.capabilities(McpSchema.ServerCapabilities.builder().tools(true).build())
343+
.tools(new McpServerFeatures.AsyncToolSpecification(tool, null, asyncStatefulHandler))
344+
.build();
345+
346+
StepVerifier.create(asyncSseClient.initialize()).assertNext(initResult -> {
347+
assertThat(initResult).isNotNull();
348+
}).verifyComplete();
349+
350+
// Test tool call with context
351+
StepVerifier
352+
.create(asyncSseClient.callTool(new McpSchema.CallToolRequest("test-tool", Map.of()))
353+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY,
354+
McpTransportContext.create(Map.of("client-side-header-value", "some important value")))))
355+
.assertNext(response -> {
356+
assertThat(response).isNotNull();
357+
assertThat(response.content()).hasSize(1)
358+
.first()
359+
.extracting(McpSchema.TextContent.class::cast)
360+
.extracting(McpSchema.TextContent::text)
361+
.isEqualTo("some important value");
362+
})
363+
.verifyComplete();
364+
365+
mcpServer.close();
366+
}
367+
368+
private void startHttpServer(RouterFunction<?> routerFunction) {
369+
370+
HttpHandler httpHandler = RouterFunctions.toHttpHandler(routerFunction);
371+
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
372+
this.httpServer = HttpServer.create().port(PORT).handle(adapter).bindNow();
373+
}
374+
375+
private void stopHttpServer() {
376+
if (httpServer != null) {
377+
httpServer.disposeNow();
378+
}
379+
}
380+
381+
}

0 commit comments

Comments
 (0)