Skip to content

Commit ef541a1

Browse files
committed
Rename WebFluxSseClientTransport to WebClientSseClientTransport
Currently, we have HTTP `McpClientTransport` implementations: - HttpClientSseClientTransport - HttpClientStreamableHttpTransport - WebFluxSseClientTransport - WebClientStreamableHttpTransport WebFlux is specific for the server side, `WebFluxSseClientTransport` should align with `WebClientStreamableHttpTransport`. Deprecate `WebFluxSseClientTransport` for backward compatability, it should be removed in the future. Signed-off-by: Yanming Zhou <zhouyanming@gmail.com>
1 parent 5e035ea commit ef541a1

File tree

13 files changed

+1004
-357
lines changed

13 files changed

+1004
-357
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,362 @@
1+
/*
2+
* Copyright 2024 - 2024 the original author or authors.
3+
*/
4+
5+
package io.modelcontextprotocol.client.transport;
6+
7+
import com.fasterxml.jackson.core.type.TypeReference;
8+
import com.fasterxml.jackson.databind.ObjectMapper;
9+
import io.modelcontextprotocol.spec.*;
10+
import io.modelcontextprotocol.spec.McpSchema.JSONRPCMessage;
11+
import io.modelcontextprotocol.util.Assert;
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
14+
import org.springframework.core.ParameterizedTypeReference;
15+
import org.springframework.http.MediaType;
16+
import org.springframework.http.codec.ServerSentEvent;
17+
import org.springframework.web.reactive.function.client.WebClient;
18+
import reactor.core.Disposable;
19+
import reactor.core.publisher.Flux;
20+
import reactor.core.publisher.Mono;
21+
import reactor.core.publisher.Sinks;
22+
import reactor.core.publisher.SynchronousSink;
23+
import reactor.core.scheduler.Schedulers;
24+
import reactor.util.retry.Retry;
25+
import reactor.util.retry.Retry.RetrySignal;
26+
27+
import java.io.IOException;
28+
import java.util.List;
29+
import java.util.function.BiConsumer;
30+
import java.util.function.Function;
31+
32+
/**
33+
* Server-Sent Events (SSE) implementation of the {@link McpTransport} that follows the
34+
* MCP HTTP with SSE transport specification.
35+
*
36+
* <p>
37+
* This transport establishes a bidirectional communication channel where:
38+
* <ul>
39+
* <li>Inbound messages are received through an SSE connection from the server</li>
40+
* <li>Outbound messages are sent via HTTP POST requests to a server-provided
41+
* endpoint</li>
42+
* </ul>
43+
*
44+
* <p>
45+
* The message flow follows these steps:
46+
* <ol>
47+
* <li>The client establishes an SSE connection to the server's /sse endpoint</li>
48+
* <li>The server sends an 'endpoint' event containing the URI for sending messages</li>
49+
* </ol>
50+
*
51+
* This implementation uses {@link WebClient} for HTTP communications and supports JSON
52+
* serialization/deserialization of messages.
53+
*
54+
* NOTE: This class is temporarily used by deprecated {@link WebFluxSseClientTransport},
55+
* it should be merged into {@link WebClientSseClientTransport} once
56+
* {@link WebFluxSseClientTransport} is removed.
57+
*
58+
* @author Christian Tzolov
59+
* @author Yanming Zhou
60+
* @see <a href=
61+
* "https://spec.modelcontextprotocol.io/specification/basic/transports/#http-with-sse">MCP
62+
* HTTP with SSE Transport Specification</a>
63+
*/
64+
abstract class InternalWebClientSseClientTransport implements McpClientTransport {
65+
66+
private final Logger logger = LoggerFactory.getLogger(getClass());
67+
68+
private static final String MCP_PROTOCOL_VERSION = ProtocolVersions.MCP_2024_11_05;
69+
70+
/**
71+
* Event type for JSON-RPC messages received through the SSE connection. The server
72+
* sends messages with this event type to transmit JSON-RPC protocol data.
73+
*/
74+
private static final String MESSAGE_EVENT_TYPE = "message";
75+
76+
/**
77+
* Event type for receiving the message endpoint URI from the server. The server MUST
78+
* send this event when a client connects, providing the URI where the client should
79+
* send its messages via HTTP POST.
80+
*/
81+
private static final String ENDPOINT_EVENT_TYPE = "endpoint";
82+
83+
/**
84+
* Default SSE endpoint path as specified by the MCP transport specification. This
85+
* endpoint is used to establish the SSE connection with the server.
86+
*/
87+
static final String DEFAULT_SSE_ENDPOINT = "/sse";
88+
89+
/**
90+
* Type reference for parsing SSE events containing string data.
91+
*/
92+
private static final ParameterizedTypeReference<ServerSentEvent<String>> SSE_TYPE = new ParameterizedTypeReference<>() {
93+
};
94+
95+
/**
96+
* WebClient instance for handling both SSE connections and HTTP POST requests. Used
97+
* for establishing the SSE connection and sending outbound messages.
98+
*/
99+
private final WebClient webClient;
100+
101+
/**
102+
* ObjectMapper for serializing outbound messages and deserializing inbound messages.
103+
* Handles conversion between JSON-RPC messages and their string representation.
104+
*/
105+
protected ObjectMapper objectMapper;
106+
107+
/**
108+
* Subscription for the SSE connection handling inbound messages. Used for cleanup
109+
* during transport shutdown.
110+
*/
111+
private Disposable inboundSubscription;
112+
113+
/**
114+
* Flag indicating if the transport is in the process of shutting down. Used to
115+
* prevent new operations during shutdown and handle cleanup gracefully.
116+
*/
117+
private volatile boolean isClosing = false;
118+
119+
/**
120+
* Sink for managing the message endpoint URI provided by the server. Stores the most
121+
* recent endpoint URI and makes it available for outbound message processing.
122+
*/
123+
protected final Sinks.One<String> messageEndpointSink = Sinks.one();
124+
125+
/**
126+
* The SSE endpoint URI provided by the server. Used for sending outbound messages via
127+
* HTTP POST requests.
128+
*/
129+
private String sseEndpoint;
130+
131+
/**
132+
* Constructs a new SseClientTransport with the specified WebClient builder. Uses a
133+
* default ObjectMapper instance for JSON processing.
134+
* @param webClientBuilder the WebClient.Builder to use for creating the WebClient
135+
* instance
136+
* @throws IllegalArgumentException if webClientBuilder is null
137+
*/
138+
public InternalWebClientSseClientTransport(WebClient.Builder webClientBuilder) {
139+
this(webClientBuilder, new ObjectMapper());
140+
}
141+
142+
/**
143+
* Constructs a new SseClientTransport with the specified WebClient builder and
144+
* ObjectMapper. Initializes both inbound and outbound message processing pipelines.
145+
* @param webClientBuilder the WebClient.Builder to use for creating the WebClient
146+
* instance
147+
* @param objectMapper the ObjectMapper to use for JSON processing
148+
* @throws IllegalArgumentException if either parameter is null
149+
*/
150+
public InternalWebClientSseClientTransport(WebClient.Builder webClientBuilder, ObjectMapper objectMapper) {
151+
this(webClientBuilder, objectMapper, DEFAULT_SSE_ENDPOINT);
152+
}
153+
154+
/**
155+
* Constructs a new SseClientTransport with the specified WebClient builder and
156+
* ObjectMapper. Initializes both inbound and outbound message processing pipelines.
157+
* @param webClientBuilder the WebClient.Builder to use for creating the WebClient
158+
* instance
159+
* @param objectMapper the ObjectMapper to use for JSON processing
160+
* @param sseEndpoint the SSE endpoint URI to use for establishing the connection
161+
* @throws IllegalArgumentException if either parameter is null
162+
*/
163+
public InternalWebClientSseClientTransport(WebClient.Builder webClientBuilder, ObjectMapper objectMapper,
164+
String sseEndpoint) {
165+
Assert.notNull(objectMapper, "ObjectMapper must not be null");
166+
Assert.notNull(webClientBuilder, "WebClient.Builder must not be null");
167+
Assert.hasText(sseEndpoint, "SSE endpoint must not be null or empty");
168+
169+
this.objectMapper = objectMapper;
170+
this.webClient = webClientBuilder.build();
171+
this.sseEndpoint = sseEndpoint;
172+
}
173+
174+
@Override
175+
public List<String> protocolVersions() {
176+
return List.of(MCP_PROTOCOL_VERSION);
177+
}
178+
179+
/**
180+
* Establishes a connection to the MCP server using Server-Sent Events (SSE). This
181+
* method initiates the SSE connection and sets up the message processing pipeline.
182+
*
183+
* <p>
184+
* The connection process follows these steps:
185+
* <ol>
186+
* <li>Establishes an SSE connection to the server's /sse endpoint</li>
187+
* <li>Waits for the server to send an 'endpoint' event with the message posting
188+
* URI</li>
189+
* <li>Sets up message handling for incoming JSON-RPC messages</li>
190+
* </ol>
191+
*
192+
* <p>
193+
* The connection is considered established only after receiving the endpoint event
194+
* from the server.
195+
* @param handler a function that processes incoming JSON-RPC messages and returns
196+
* responses
197+
* @return a Mono that completes when the connection is fully established
198+
* @throws McpError if there's an error processing SSE events or if an unrecognized
199+
* event type is received
200+
*/
201+
@Override
202+
public Mono<Void> connect(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> handler) {
203+
// TODO: Avoid eager connection opening and enable resilience
204+
// -> upon disconnects, re-establish connection
205+
// -> allow optimizing for eager connection start using a constructor flag
206+
Flux<ServerSentEvent<String>> events = eventStream();
207+
this.inboundSubscription = events.concatMap(event -> Mono.just(event).<JSONRPCMessage>handle((e, s) -> {
208+
if (ENDPOINT_EVENT_TYPE.equals(event.event())) {
209+
String messageEndpointUri = event.data();
210+
if (messageEndpointSink.tryEmitValue(messageEndpointUri).isSuccess()) {
211+
s.complete();
212+
}
213+
else {
214+
// TODO: clarify with the spec if multiple events can be
215+
// received
216+
s.error(new McpError("Failed to handle SSE endpoint event"));
217+
}
218+
}
219+
else if (MESSAGE_EVENT_TYPE.equals(event.event())) {
220+
try {
221+
JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.objectMapper, event.data());
222+
s.next(message);
223+
}
224+
catch (IOException ioException) {
225+
s.error(ioException);
226+
}
227+
}
228+
else {
229+
logger.debug("Received unrecognized SSE event type: {}", event);
230+
s.complete();
231+
}
232+
}).transform(handler)).subscribe();
233+
234+
// The connection is established once the server sends the endpoint event
235+
return messageEndpointSink.asMono().then();
236+
}
237+
238+
/**
239+
* Sends a JSON-RPC message to the server using the endpoint provided during
240+
* connection.
241+
*
242+
* <p>
243+
* Messages are sent via HTTP POST requests to the server-provided endpoint URI. The
244+
* message is serialized to JSON before transmission. If the transport is in the
245+
* process of closing, the message send operation is skipped gracefully.
246+
* @param message the JSON-RPC message to send
247+
* @return a Mono that completes when the message has been sent successfully
248+
* @throws RuntimeException if message serialization fails
249+
*/
250+
@Override
251+
public Mono<Void> sendMessage(JSONRPCMessage message) {
252+
// The messageEndpoint is the endpoint URI to send the messages
253+
// It is provided by the server as part of the endpoint event
254+
return messageEndpointSink.asMono().flatMap(messageEndpointUri -> {
255+
if (isClosing) {
256+
return Mono.empty();
257+
}
258+
try {
259+
String jsonText = this.objectMapper.writeValueAsString(message);
260+
return webClient.post()
261+
.uri(messageEndpointUri)
262+
.contentType(MediaType.APPLICATION_JSON)
263+
.header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION)
264+
.bodyValue(jsonText)
265+
.retrieve()
266+
.toBodilessEntity()
267+
.doOnSuccess(response -> {
268+
logger.debug("Message sent successfully");
269+
})
270+
.doOnError(error -> {
271+
if (!isClosing) {
272+
logger.error("Error sending message: {}", error.getMessage());
273+
}
274+
});
275+
}
276+
catch (IOException e) {
277+
if (!isClosing) {
278+
return Mono.error(new RuntimeException("Failed to serialize message", e));
279+
}
280+
return Mono.empty();
281+
}
282+
}).then(); // TODO: Consider non-200-ok response
283+
}
284+
285+
/**
286+
* Initializes and starts the inbound SSE event processing. Establishes the SSE
287+
* connection and sets up event handling for both message and endpoint events.
288+
* Includes automatic retry logic for handling transient connection failures.
289+
*/
290+
// visible for tests
291+
protected Flux<ServerSentEvent<String>> eventStream() {// @formatter:off
292+
return this.webClient
293+
.get()
294+
.uri(this.sseEndpoint)
295+
.accept(MediaType.TEXT_EVENT_STREAM)
296+
.header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION)
297+
.retrieve()
298+
.bodyToFlux(SSE_TYPE)
299+
.retryWhen(Retry.from(retrySignal -> retrySignal.handle(inboundRetryHandler)));
300+
} // @formatter:on
301+
302+
/**
303+
* Retry handler for the inbound SSE stream. Implements the retry logic for handling
304+
* connection failures and other errors.
305+
*/
306+
private BiConsumer<RetrySignal, SynchronousSink<Object>> inboundRetryHandler = (retrySpec, sink) -> {
307+
if (isClosing) {
308+
logger.debug("SSE connection closed during shutdown");
309+
sink.error(retrySpec.failure());
310+
return;
311+
}
312+
if (retrySpec.failure() instanceof IOException) {
313+
logger.debug("Retrying SSE connection after IO error");
314+
sink.next(retrySpec);
315+
return;
316+
}
317+
logger.error("Fatal SSE error, not retrying: {}", retrySpec.failure().getMessage());
318+
sink.error(retrySpec.failure());
319+
};
320+
321+
/**
322+
* Implements graceful shutdown of the transport. Cleans up all resources including
323+
* subscriptions and schedulers. Ensures orderly shutdown of both inbound and outbound
324+
* message processing.
325+
* @return a Mono that completes when shutdown is finished
326+
*/
327+
@Override
328+
public Mono<Void> closeGracefully() { // @formatter:off
329+
return Mono.fromRunnable(() -> {
330+
isClosing = true;
331+
332+
// Dispose of subscriptions
333+
334+
if (inboundSubscription != null) {
335+
inboundSubscription.dispose();
336+
}
337+
338+
})
339+
.then()
340+
.subscribeOn(Schedulers.boundedElastic());
341+
} // @formatter:on
342+
343+
/**
344+
* Unmarshalls data from a generic Object into the specified type using the configured
345+
* ObjectMapper.
346+
*
347+
* <p>
348+
* This method is particularly useful when working with JSON-RPC parameters or result
349+
* objects that need to be converted to specific Java types. It leverages Jackson's
350+
* type conversion capabilities to handle complex object structures.
351+
* @param <T> the target type to convert the data into
352+
* @param data the source object to convert
353+
* @param typeRef the TypeReference describing the target type
354+
* @return the unmarshalled object of type T
355+
* @throws IllegalArgumentException if the conversion cannot be performed
356+
*/
357+
@Override
358+
public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
359+
return this.objectMapper.convertValue(data, typeRef);
360+
}
361+
362+
}

0 commit comments

Comments
 (0)