66
77import java .util .Map ;
88import java .util .function .BiFunction ;
9- import java .util .function .Supplier ;
109
1110import com .fasterxml .jackson .databind .ObjectMapper ;
1211import io .modelcontextprotocol .client .McpAsyncClient ;
1312import io .modelcontextprotocol .client .McpClient ;
14- import io .modelcontextprotocol .client .McpSyncClient ;
1513import io .modelcontextprotocol .client .transport .WebClientStreamableHttpTransport ;
1614import io .modelcontextprotocol .client .transport .WebFluxSseClientTransport ;
1715import io .modelcontextprotocol .server .McpAsyncServerExchange ;
4947 *
5048 * <p>
5149 * This test class validates the end-to-end flow of transport context propagation in MCP
52- * communication for asynchronous server implementations. It tests various combinations of
53- * client types (sync/async) and server transport mechanisms (stateless, streamable, SSE)
54- * to ensure proper context handling across different configurations.
50+ * communication for asynchronous client and server implementations. It tests various
51+ * combinations of client types and server transport mechanisms (stateless, streamable,
52+ * SSE) to ensure proper context handling across different configurations.
5553 *
5654 * <h2>Context Propagation Flow</h2>
5755 * <ol>
58- * <li>Client sets a value in its transport context (either via thread-local for sync or
59- * Reactor context for async)</li>
56+ * <li>Client sets a value in its transport context via thread-local Reactor context</li>
6057 * <li>Client-side context provider extracts the value and adds it as an HTTP header to
6158 * the request</li>
6259 * <li>Server-side context extractor reads the header from the incoming request</li>
6764 *
6865 * @author Daniel Garnier-Moiroux
6966 * @author Christian Tzolov
70- * @see McpTransportContext
71- * @see McpTransportContextExtractor
72- * @see WebFluxStatelessServerTransport
73- * @see WebFluxStreamableServerTransportProvider
74- * @see WebFluxSseServerTransportProvider
7567 */
7668@ Timeout (15 )
7769public class AsyncServerMcpTransportContextIntegrationTests {
7870
7971 private static final int PORT = TestUtil .findAvailablePort ();
8072
81- private static final ThreadLocal <String > SYNC_CLIENT_SIDE_HEADER_VALUE_HOLDER = new ThreadLocal <>();
82-
8373 private static final String HEADER_NAME = "x-test" ;
8474
85- // Sync client context provider
86- private final Supplier <McpTransportContext > syncClientContextProvider = () -> {
87- var headerValue = SYNC_CLIENT_SIDE_HEADER_VALUE_HOLDER .get ();
88- return headerValue != null ? McpTransportContext .create (Map .of ("client-side-header-value" , headerValue ))
89- : McpTransportContext .EMPTY ;
90- };
91-
9275 // Async client context provider
9376 ExchangeFilterFunction asyncClientContextProvider = (request , next ) -> Mono .deferContextual (ctx -> {
94- var context = ctx .getOrDefault (McpTransportContext .KEY , McpTransportContext .EMPTY );
77+ var transportContext = ctx .getOrDefault (McpTransportContext .KEY , McpTransportContext .EMPTY );
9578 // // do stuff with the context
96- var headerValue = context .get ("client-side-header-value" );
79+ var headerValue = transportContext .get ("client-side-header-value" );
9780 if (headerValue == null ) {
9881 return next .exchange (request );
9982 }
@@ -156,26 +139,10 @@ public class AsyncServerMcpTransportContextIntegrationTests {
156139 .build ())
157140 .build ();
158141
159- // Sync clients
160- private final McpSyncClient syncStreamableClient = McpClient
161- .sync (WebClientStreamableHttpTransport
162- .builder (WebClient .builder ().baseUrl ("http://localhost:" + PORT ).filter (asyncClientContextProvider ))
163- .build ())
164- .transportContextProvider (syncClientContextProvider )
165- .build ();
166-
167- private final McpSyncClient syncSseClient = McpClient
168- .sync (WebFluxSseClientTransport
169- .builder (WebClient .builder ().baseUrl ("http://localhost:" + PORT ).filter (asyncClientContextProvider ))
170- .build ())
171- .transportContextProvider (syncClientContextProvider )
172- .build ();
173-
174142 private DisposableServer httpServer ;
175143
176144 @ AfterEach
177145 public void after () {
178- SYNC_CLIENT_SIDE_HEADER_VALUE_HOLDER .remove ();
179146 if (statelessServerTransport != null ) {
180147 statelessServerTransport .closeGracefully ().block ();
181148 }
@@ -185,36 +152,15 @@ public void after() {
185152 if (sseServerTransport != null ) {
186153 sseServerTransport .closeGracefully ().block ();
187154 }
155+ if (asyncStreamableClient != null ) {
156+ asyncStreamableClient .closeGracefully ().block ();
157+ }
158+ if (asyncSseClient != null ) {
159+ asyncSseClient .closeGracefully ().block ();
160+ }
188161 stopHttpServer ();
189162 }
190163
191- @ Test
192- void syncClientStatelessServer () {
193-
194- startHttpServer (statelessServerTransport .getRouterFunction ());
195-
196- var mcpServer = McpServer .async (statelessServerTransport )
197- .capabilities (McpSchema .ServerCapabilities .builder ().tools (true ).build ())
198- .tools (new McpStatelessServerFeatures .AsyncToolSpecification (tool , asyncStatelessHandler ))
199- .build ();
200-
201- McpSchema .InitializeResult initResult = syncStreamableClient .initialize ();
202- assertThat (initResult ).isNotNull ();
203-
204- SYNC_CLIENT_SIDE_HEADER_VALUE_HOLDER .set ("some important value" );
205- McpSchema .CallToolResult response = syncStreamableClient
206- .callTool (new McpSchema .CallToolRequest ("test-tool" , Map .of ()));
207-
208- assertThat (response ).isNotNull ();
209- assertThat (response .content ()).hasSize (1 )
210- .first ()
211- .extracting (McpSchema .TextContent .class ::cast )
212- .extracting (McpSchema .TextContent ::text )
213- .isEqualTo ("some important value" );
214-
215- mcpServer .close ();
216- }
217-
218164 @ Test
219165 void asyncClientStatelessServer () {
220166
@@ -247,33 +193,6 @@ void asyncClientStatelessServer() {
247193 mcpServer .close ();
248194 }
249195
250- @ Test
251- void syncClientStreamableServer () {
252-
253- startHttpServer (streamableServerTransport .getRouterFunction ());
254-
255- var mcpServer = McpServer .async (streamableServerTransport )
256- .capabilities (McpSchema .ServerCapabilities .builder ().tools (true ).build ())
257- .tools (new McpServerFeatures .AsyncToolSpecification (tool , null , asyncStatefulHandler ))
258- .build ();
259-
260- McpSchema .InitializeResult initResult = syncStreamableClient .initialize ();
261- assertThat (initResult ).isNotNull ();
262-
263- SYNC_CLIENT_SIDE_HEADER_VALUE_HOLDER .set ("some important value" );
264- McpSchema .CallToolResult response = syncStreamableClient
265- .callTool (new McpSchema .CallToolRequest ("test-tool" , Map .of ()));
266-
267- assertThat (response ).isNotNull ();
268- assertThat (response .content ()).hasSize (1 )
269- .first ()
270- .extracting (McpSchema .TextContent .class ::cast )
271- .extracting (McpSchema .TextContent ::text )
272- .isEqualTo ("some important value" );
273-
274- mcpServer .close ();
275- }
276-
277196 @ Test
278197 void asyncClientStreamableServer () {
279198
@@ -306,33 +225,6 @@ void asyncClientStreamableServer() {
306225 mcpServer .close ();
307226 }
308227
309- @ Test
310- void syncClientSseServer () {
311-
312- startHttpServer (sseServerTransport .getRouterFunction ());
313-
314- var mcpServer = McpServer .async (sseServerTransport )
315- .capabilities (McpSchema .ServerCapabilities .builder ().tools (true ).build ())
316- .tools (new McpServerFeatures .AsyncToolSpecification (tool , null , asyncStatefulHandler ))
317- .build ();
318-
319- McpSchema .InitializeResult initResult = syncSseClient .initialize ();
320- assertThat (initResult ).isNotNull ();
321-
322- SYNC_CLIENT_SIDE_HEADER_VALUE_HOLDER .set ("some important value" );
323- McpSchema .CallToolResult response = syncSseClient
324- .callTool (new McpSchema .CallToolRequest ("test-tool" , Map .of ()));
325-
326- assertThat (response ).isNotNull ();
327- assertThat (response .content ()).hasSize (1 )
328- .first ()
329- .extracting (McpSchema .TextContent .class ::cast )
330- .extracting (McpSchema .TextContent ::text )
331- .isEqualTo ("some important value" );
332-
333- mcpServer .close ();
334- }
335-
336228 @ Test
337229 void asyncClientSseServer () {
338230
0 commit comments