1+ /*
2+ * Copyright 2025-2025 the original author or authors.
3+ */
4+
15package io .modelcontextprotocol .client .transport ;
26
37import java .io .IOException ;
2327
2428import io .modelcontextprotocol .spec .DefaultMcpTransportSession ;
2529import io .modelcontextprotocol .spec .DefaultMcpTransportStream ;
30+ import io .modelcontextprotocol .spec .HttpHeaders ;
2631import io .modelcontextprotocol .spec .McpClientTransport ;
2732import io .modelcontextprotocol .spec .McpError ;
2833import io .modelcontextprotocol .spec .McpSchema ;
34+ import io .modelcontextprotocol .spec .McpTransportException ;
2935import io .modelcontextprotocol .spec .McpTransportSession ;
3036import io .modelcontextprotocol .spec .McpTransportSessionNotFoundException ;
3137import io .modelcontextprotocol .spec .McpTransportStream ;
38+ import io .modelcontextprotocol .spec .ProtocolVersions ;
3239import io .modelcontextprotocol .util .Assert ;
40+ import io .modelcontextprotocol .util .Utils ;
3341import reactor .core .Disposable ;
3442import reactor .core .publisher .Flux ;
3543import reactor .core .publisher .Mono ;
6371 */
6472public class WebClientStreamableHttpTransport implements McpClientTransport {
6573
74+ private static final String MISSING_SESSION_ID = "[missing_session_id]" ;
75+
6676 private static final Logger logger = LoggerFactory .getLogger (WebClientStreamableHttpTransport .class );
6777
78+ private static final String MCP_PROTOCOL_VERSION = ProtocolVersions .MCP_2025_03_26 ;
79+
6880 private static final String DEFAULT_ENDPOINT = "/mcp" ;
6981
7082 /**
@@ -102,6 +114,11 @@ private WebClientStreamableHttpTransport(ObjectMapper objectMapper, WebClient.Bu
102114 this .activeSession .set (createTransportSession ());
103115 }
104116
117+ @ Override
118+ public List <String > protocolVersions () {
119+ return List .of (ProtocolVersions .MCP_2024_11_05 , ProtocolVersions .MCP_2025_03_26 );
120+ }
121+
105122 /**
106123 * Create a stateful builder for creating {@link WebClientStreamableHttpTransport}
107124 * instances.
@@ -127,12 +144,17 @@ public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchem
127144
128145 private DefaultMcpTransportSession createTransportSession () {
129146 Function <String , Publisher <Void >> onClose = sessionId -> sessionId == null ? Mono .empty ()
130- : webClient .delete ().uri (this .endpoint ).headers (httpHeaders -> {
131- httpHeaders .add ("mcp-session-id" , sessionId );
132- }).retrieve ().toBodilessEntity ().onErrorComplete (e -> {
133- logger .warn ("Got error when closing transport" , e );
134- return true ;
135- }).then ();
147+ : webClient .delete ()
148+ .uri (this .endpoint )
149+ .header (HttpHeaders .MCP_SESSION_ID , sessionId )
150+ .header (HttpHeaders .PROTOCOL_VERSION , MCP_PROTOCOL_VERSION )
151+ .retrieve ()
152+ .toBodilessEntity ()
153+ .onErrorComplete (e -> {
154+ logger .warn ("Got error when closing transport" , e );
155+ return true ;
156+ })
157+ .then ();
136158 return new DefaultMcpTransportSession (onClose );
137159 }
138160
@@ -185,10 +207,11 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
185207 Disposable connection = webClient .get ()
186208 .uri (this .endpoint )
187209 .accept (MediaType .TEXT_EVENT_STREAM )
210+ .header (HttpHeaders .PROTOCOL_VERSION , MCP_PROTOCOL_VERSION )
188211 .headers (httpHeaders -> {
189- transportSession .sessionId ().ifPresent (id -> httpHeaders .add ("mcp-session-id" , id ));
212+ transportSession .sessionId ().ifPresent (id -> httpHeaders .add (HttpHeaders . MCP_SESSION_ID , id ));
190213 if (stream != null ) {
191- stream .lastId ().ifPresent (id -> httpHeaders .add ("last-event-id" , id ));
214+ stream .lastId ().ifPresent (id -> httpHeaders .add (HttpHeaders . LAST_EVENT_ID , id ));
192215 }
193216 })
194217 .exchangeToFlux (response -> {
@@ -201,8 +224,13 @@ else if (isNotAllowed(response)) {
201224 return Flux .empty ();
202225 }
203226 else if (isNotFound (response )) {
204- String sessionIdRepresentation = sessionIdOrPlaceholder (transportSession );
205- return mcpSessionNotFoundError (sessionIdRepresentation );
227+ if (transportSession .sessionId ().isPresent ()) {
228+ String sessionIdRepresentation = sessionIdOrPlaceholder (transportSession );
229+ return mcpSessionNotFoundError (sessionIdRepresentation );
230+ }
231+ else {
232+ return this .extractError (response , MISSING_SESSION_ID );
233+ }
206234 }
207235 else {
208236 return response .<McpSchema .JSONRPCMessage >createError ().doOnError (e -> {
@@ -244,14 +272,15 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
244272
245273 Disposable connection = webClient .post ()
246274 .uri (this .endpoint )
247- .accept (MediaType .TEXT_EVENT_STREAM , MediaType .APPLICATION_JSON )
275+ .accept (MediaType .APPLICATION_JSON , MediaType .TEXT_EVENT_STREAM )
276+ .header (HttpHeaders .PROTOCOL_VERSION , MCP_PROTOCOL_VERSION )
248277 .headers (httpHeaders -> {
249- transportSession .sessionId ().ifPresent (id -> httpHeaders .add ("mcp-session-id" , id ));
278+ transportSession .sessionId ().ifPresent (id -> httpHeaders .add (HttpHeaders . MCP_SESSION_ID , id ));
250279 })
251280 .bodyValue (message )
252281 .exchangeToFlux (response -> {
253282 if (transportSession
254- .markInitialized (response .headers ().asHttpHeaders ().getFirst ("mcp-session-id" ))) {
283+ .markInitialized (response .headers ().asHttpHeaders ().getFirst (HttpHeaders . MCP_SESSION_ID ))) {
255284 // Once we have a session, we try to open an async stream for
256285 // the server to send notifications and requests out-of-band.
257286 reconnect (null ).contextWrite (sink .contextView ()).subscribe ();
@@ -287,7 +316,7 @@ else if (mediaType.isCompatibleWith(MediaType.APPLICATION_JSON)) {
287316 logger .trace ("Received response to POST for session {}" , sessionRepresentation );
288317 // communicate to caller the message was delivered
289318 sink .success ();
290- return responseFlux ( response );
319+ return directResponseFlux ( message , response );
291320 }
292321 else {
293322 logger .warn ("Unknown media type {} returned for POST in session {}" , contentType ,
@@ -297,10 +326,10 @@ else if (mediaType.isCompatibleWith(MediaType.APPLICATION_JSON)) {
297326 }
298327 }
299328 else {
300- if (isNotFound (response )) {
329+ if (isNotFound (response ) && ! sessionRepresentation . equals ( MISSING_SESSION_ID ) ) {
301330 return mcpSessionNotFoundError (sessionRepresentation );
302331 }
303- return extractError (response , sessionRepresentation );
332+ return this . extractError (response , sessionRepresentation );
304333 }
305334 })
306335 .flatMap (jsonRpcMessage -> this .handler .get ().apply (Mono .just (jsonRpcMessage )))
@@ -340,10 +369,11 @@ private Flux<McpSchema.JSONRPCMessage> extractError(ClientResponse response, Str
340369 McpSchema .JSONRPCResponse jsonRpcResponse = objectMapper .readValue (body ,
341370 McpSchema .JSONRPCResponse .class );
342371 jsonRpcError = jsonRpcResponse .error ();
343- toPropagate = new McpError (jsonRpcError );
372+ toPropagate = jsonRpcError != null ? new McpError (jsonRpcError )
373+ : new McpTransportException ("Can't parse the jsonResponse " + jsonRpcResponse );
344374 }
345375 catch (IOException ex ) {
346- toPropagate = new RuntimeException ("Sending request failed" , e );
376+ toPropagate = new McpTransportException ("Sending request failed, " + e . getMessage () , e );
347377 logger .debug ("Received content together with {} HTTP code response: {}" , response .statusCode (), body );
348378 }
349379
@@ -352,7 +382,11 @@ private Flux<McpSchema.JSONRPCMessage> extractError(ClientResponse response, Str
352382 // invalidate the session
353383 // https://github.com/modelcontextprotocol/typescript-sdk/issues/389
354384 if (responseException .getStatusCode ().isSameCodeAs (HttpStatus .BAD_REQUEST )) {
355- return Mono .error (new McpTransportSessionNotFoundException (sessionRepresentation , toPropagate ));
385+ if (!sessionRepresentation .equals (MISSING_SESSION_ID )) {
386+ return Mono .error (new McpTransportSessionNotFoundException (sessionRepresentation , toPropagate ));
387+ }
388+ return Mono .error (new McpTransportException ("Received 400 BAD REQUEST for session "
389+ + sessionRepresentation + ". " + toPropagate .getMessage (), toPropagate ));
356390 }
357391 return Mono .error (toPropagate );
358392 }).flux ();
@@ -381,18 +415,25 @@ private static boolean isEventStream(ClientResponse response) {
381415 }
382416
383417 private static String sessionIdOrPlaceholder (McpTransportSession <?> transportSession ) {
384- return transportSession .sessionId ().orElse ("[missing_session_id]" );
418+ return transportSession .sessionId ().orElse (MISSING_SESSION_ID );
385419 }
386420
387- private Flux <McpSchema .JSONRPCMessage > responseFlux (ClientResponse response ) {
421+ private Flux <McpSchema .JSONRPCMessage > directResponseFlux (McpSchema .JSONRPCMessage sentMessage ,
422+ ClientResponse response ) {
388423 return response .bodyToMono (String .class ).<Iterable <McpSchema .JSONRPCMessage >>handle ((responseMessage , s ) -> {
389424 try {
390- McpSchema .JSONRPCMessage jsonRpcResponse = McpSchema .deserializeJsonRpcMessage (objectMapper ,
391- responseMessage );
392- s .next (List .of (jsonRpcResponse ));
425+ if (sentMessage instanceof McpSchema .JSONRPCNotification && Utils .hasText (responseMessage )) {
426+ logger .warn ("Notification: {} received non-compliant response: {}" , sentMessage , responseMessage );
427+ s .complete ();
428+ }
429+ else {
430+ McpSchema .JSONRPCMessage jsonRpcResponse = McpSchema .deserializeJsonRpcMessage (objectMapper ,
431+ responseMessage );
432+ s .next (List .of (jsonRpcResponse ));
433+ }
393434 }
394435 catch (IOException e ) {
395- s .error (e );
436+ s .error (new McpTransportException ( e ) );
396437 }
397438 }).flatMapIterable (Function .identity ());
398439 }
@@ -419,11 +460,12 @@ private Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>> parse(Serve
419460 return Tuples .of (Optional .ofNullable (event .id ()), List .of (message ));
420461 }
421462 catch (IOException ioException ) {
422- throw new McpError ("Error parsing JSON-RPC message: " + event .data ());
463+ throw new McpTransportException ("Error parsing JSON-RPC message: " + event .data (), ioException );
423464 }
424465 }
425466 else {
426- throw new McpError ("Received unrecognized SSE event type: " + event .event ());
467+ logger .debug ("Received SSE event with type: {}" , event );
468+ return Tuples .of (Optional .empty (), List .of ());
427469 }
428470 }
429471
0 commit comments