Skip to content

Commit 10ea1e2

Browse files
committed
Add javadoc, clean up
1 parent 4b034f3 commit 10ea1e2

File tree

8 files changed

+144
-14
lines changed

8 files changed

+144
-14
lines changed

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,31 @@
3535
import java.util.function.Function;
3636
import java.util.function.Supplier;
3737

38+
/**
39+
* An implementation of the Streamable HTTP protocol as defined by the
40+
* <code>2025-03-26</code> version of the MCP specification.
41+
*
42+
* <p>
43+
* The transport is capable of resumability and reconnects. It reacts to transport-level
44+
* session invalidation and will propagate {@link McpTransportSessionNotFoundException
45+
* appropriate exceptions} to the higher level abstraction layer when needed in order to
46+
* allow proper state management. The implementation handles servers that are stateful and
47+
* provide session meta information, but can also communicate with stateless servers that
48+
* do not provide a session identifier and do not support SSE streams.
49+
* </p>
50+
* <p>
51+
* This implementation does not handle backwards compatibility with the <a href=
52+
* "https://modelcontextprotocol.io/specification/2024-11-05/basic/transports#http-with-sse">"HTTP
53+
* with SSE" transport</a>. In order to communicate over the phased-out
54+
* <code>2024-11-05</code> protocol, use {@link HttpClientSseClientTransport} or
55+
* {@link WebFluxSseClientTransport}.
56+
* </p>
57+
*
58+
* @author Dariusz Jędrzejczyk
59+
* @see <a href=
60+
* "https://modelcontextprotocol.io/specification/2025-03-26/basic/transports#streamable-http">Streamable
61+
* HTTP transport specification</a>
62+
*/
3863
public class WebClientStreamableHttpTransport implements McpClientTransport {
3964

4065
private static final Logger logger = LoggerFactory.getLogger(WebClientStreamableHttpTransport.class);
@@ -47,7 +72,7 @@ public class WebClientStreamableHttpTransport implements McpClientTransport {
4772
*/
4873
private static final String MESSAGE_EVENT_TYPE = "message";
4974

50-
public static final ParameterizedTypeReference<ServerSentEvent<String>> PARAMETERIZED_TYPE_REF = new ParameterizedTypeReference<>() {
75+
private static final ParameterizedTypeReference<ServerSentEvent<String>> PARAMETERIZED_TYPE_REF = new ParameterizedTypeReference<>() {
5176
};
5277

5378
private final ObjectMapper objectMapper;
@@ -66,7 +91,6 @@ public class WebClientStreamableHttpTransport implements McpClientTransport {
6691

6792
private final AtomicReference<Consumer<Throwable>> exceptionHandler = new AtomicReference<>();
6893

69-
// TODO: builder
7094
private WebClientStreamableHttpTransport(ObjectMapper objectMapper, WebClient.Builder webClientBuilder,
7195
String endpoint, boolean resumableStreams, boolean openConnectionOnStartup) {
7296
this.objectMapper = objectMapper;
@@ -77,6 +101,13 @@ private WebClientStreamableHttpTransport(ObjectMapper objectMapper, WebClient.Bu
77101
this.activeSession.set(createTransportSession());
78102
}
79103

104+
/**
105+
* Create a stateful builder for creating {@link WebClientStreamableHttpTransport}
106+
* instances.
107+
* @param webClientBuilder the {@link WebClient.Builder} to use
108+
* @return a builder which will create an instance of
109+
* {@link WebClientStreamableHttpTransport} once {@link Builder#build()} is called
110+
*/
80111
public static Builder builder(WebClient.Builder webClientBuilder) {
81112
return new Builder(webClientBuilder);
82113
}
@@ -392,6 +423,9 @@ private Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>> parse(Serve
392423
}
393424
}
394425

426+
/**
427+
* Builder for {@link WebClientStreamableHttpTransport}.
428+
*/
395429
public static class Builder {
396430

397431
private ObjectMapper objectMapper;
@@ -409,34 +443,71 @@ private Builder(WebClient.Builder webClientBuilder) {
409443
this.webClientBuilder = webClientBuilder;
410444
}
411445

446+
/**
447+
* Configure the {@link ObjectMapper} to use.
448+
* @param objectMapper instance to use
449+
* @return the builder instance
450+
*/
412451
public Builder objectMapper(ObjectMapper objectMapper) {
413452
Assert.notNull(objectMapper, "ObjectMapper must not be null");
414453
this.objectMapper = objectMapper;
415454
return this;
416455
}
417456

457+
/**
458+
* Configure the {@link WebClient.Builder} to construct the {@link WebClient}.
459+
* @param webClientBuilder instance to use
460+
* @return the builder instance
461+
*/
418462
public Builder webClientBuilder(WebClient.Builder webClientBuilder) {
419463
Assert.notNull(webClientBuilder, "WebClient.Builder must not be null");
420464
this.webClientBuilder = webClientBuilder;
421465
return this;
422466
}
423467

468+
/**
469+
* Configure the endpoint to make HTTP requests against.
470+
* @param endpoint endpoint to use
471+
* @return the builder instance
472+
*/
424473
public Builder endpoint(String endpoint) {
425474
Assert.hasText(endpoint, "endpoint must be a non-empty String");
426475
this.endpoint = endpoint;
427476
return this;
428477
}
429478

479+
/**
480+
* Configure whether to use the stream resumability feature by keeping track of
481+
* SSE event ids.
482+
* @param resumableStreams if {@code true} event ids will be tracked and upon
483+
* disconnection, the last seen id will be used upon reconnection as a header to
484+
* resume consuming messages.
485+
* @return the builder instance
486+
*/
430487
public Builder resumableStreams(boolean resumableStreams) {
431488
this.resumableStreams = resumableStreams;
432489
return this;
433490
}
434491

492+
/**
493+
* Configure whether the client should open an SSE connection upon startup. Not
494+
* all servers support this (although it is in theory possible with the current
495+
* specification), so use with caution. By default, this value is {@code false}.
496+
* @param openConnectionOnStartup if {@code true} the {@link #connect(Function)}
497+
* method call will try to open an SSE connection before sending any JSON-RPC
498+
* request
499+
* @return the builder instance
500+
*/
435501
public Builder openConnectionOnStartup(boolean openConnectionOnStartup) {
436502
this.openConnectionOnStartup = openConnectionOnStartup;
437503
return this;
438504
}
439505

506+
/**
507+
* Construct a fresh instance of {@link WebClientStreamableHttpTransport} using
508+
* the current builder configuration.
509+
* @return a new instance of {@link WebClientStreamableHttpTransport}
510+
*/
440511
public WebClientStreamableHttpTransport build() {
441512
ObjectMapper objectMapper = this.objectMapper != null ? this.objectMapper : new ObjectMapper();
442513

mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientResiliencyTests.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import eu.rekawek.toxiproxy.model.ToxicDirection;
66
import io.modelcontextprotocol.spec.McpClientTransport;
77
import io.modelcontextprotocol.spec.McpSchema;
8+
import io.modelcontextprotocol.spec.McpTransport;
89
import org.junit.jupiter.api.Test;
910
import org.slf4j.Logger;
1011
import org.slf4j.LoggerFactory;
@@ -24,6 +25,16 @@
2425

2526
import static org.assertj.core.api.Assertions.assertThatCode;
2627

28+
/**
29+
* Resiliency test suite for the {@link McpAsyncClient} that can be used with different
30+
* {@link McpTransport} implementations that support Streamable HTTP.
31+
*
32+
* The purpose of these tests is to allow validating the transport layer resiliency
33+
* instead of the functionality offered by the logical layer of MCP concepts such as
34+
* tools, resources, prompts, etc.
35+
*
36+
* @author Dariusz Jędrzejczyk
37+
*/
2738
public abstract class AbstractMcpAsyncClientResiliencyTests {
2839

2940
private static final Logger logger = LoggerFactory.getLogger(AbstractMcpAsyncClientResiliencyTests.class);

mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
* @see McpClient
7676
* @see McpSchema
7777
* @see McpClientSession
78+
* @see McpClientTransport
7879
*/
7980
public class McpAsyncClient {
8081

mcp/src/main/java/io/modelcontextprotocol/client/transport/StdioClientTransport.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -396,15 +396,4 @@ public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
396396
return this.objectMapper.convertValue(data, typeRef);
397397
}
398398

399-
private static void measureTime(Runnable op, String opName) {
400-
long start = System.nanoTime();
401-
try {
402-
op.run();
403-
}
404-
finally {
405-
long delta = System.nanoTime() - start;
406-
logger.info("{} took {}ms", opName, Duration.ofNanos(delta).toMillis());
407-
}
408-
}
409-
410399
}

mcp/src/main/java/io/modelcontextprotocol/spec/DefaultMcpTransportSession.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,13 @@
1212
import java.util.concurrent.atomic.AtomicReference;
1313
import java.util.function.Supplier;
1414

15+
/**
16+
* Default implementation of {@link McpTransportSession} which manages the open
17+
* connections using tye {@link Disposable} type and allows to perform clean up using the
18+
* {@link Disposable#dispose()} method.
19+
*
20+
* @author Dariusz Jędrzejczyk
21+
*/
1522
public class DefaultMcpTransportSession implements McpTransportSession<Disposable> {
1623

1724
private static final Logger logger = LoggerFactory.getLogger(DefaultMcpTransportSession.class);

mcp/src/main/java/io/modelcontextprotocol/spec/DefaultMcpTransportStream.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@
1212
import java.util.concurrent.atomic.AtomicReference;
1313
import java.util.function.Function;
1414

15+
/**
16+
* An implementation of {@link McpTransportStream} using Project Reactor types.
17+
*
18+
* @param <CONNECTION> the resource serving the stream
19+
* @author Dariusz Jędrzejczyk
20+
*/
1521
public class DefaultMcpTransportStream<CONNECTION> implements McpTransportStream<CONNECTION> {
1622

1723
private static final Logger logger = LoggerFactory.getLogger(DefaultMcpTransportStream.class);
@@ -27,6 +33,14 @@ public class DefaultMcpTransportStream<CONNECTION> implements McpTransportStream
2733

2834
private final Function<McpTransportStream<CONNECTION>, Publisher<CONNECTION>> reconnect;
2935

36+
/**
37+
* Constructs a new instance representing a particular stream that can resume using
38+
* the provided reconnect mechanism.
39+
* @param resumable whether the stream is resumable and should try to reconnect
40+
* @param reconnect the mechanism to use in case an error is observed on the current
41+
* event stream to asynchronously kick off a resumed stream consumption, potentially
42+
* using the stored {@link #lastId()}.
43+
*/
3044
public DefaultMcpTransportStream(boolean resumable,
3145
Function<McpTransportStream<CONNECTION>, Publisher<CONNECTION>> reconnect) {
3246
this.reconnect = reconnect;

mcp/src/main/java/io/modelcontextprotocol/spec/McpTransportSessionNotFoundException.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,20 @@
88
*/
99
public class McpTransportSessionNotFoundException extends RuntimeException {
1010

11+
/**
12+
* Construct an instance with a known {@link Exception cause}.
13+
* @param sessionId transport session identifier
14+
* @param cause the cause that was identified as a session not found error
15+
*/
1116
public McpTransportSessionNotFoundException(String sessionId, Exception cause) {
1217
super("Session " + sessionId + " not found on the server", cause);
13-
1418
}
1519

20+
/**
21+
* Construct an instance with the session identifier but without a {@link Exception
22+
* cause}.
23+
* @param sessionId transport session identifier
24+
*/
1625
public McpTransportSessionNotFoundException(String sessionId) {
1726
super("Session " + sessionId + " not found on the server");
1827
}

mcp/src/main/java/io/modelcontextprotocol/spec/McpTransportStream.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,40 @@
55

66
import java.util.Optional;
77

8+
/**
9+
* A representation of a stream at the transport layer of the MCP protocol. In particular,
10+
* it is currently used in the Streamable HTTP implementation to potentially be able to
11+
* resume a broken connection from where it left off by optionally keeping track of
12+
* attached SSE event ids.
13+
*
14+
* @param <CONNECTION> the resource on which the stream is being served and consumed via
15+
* this mechanism
16+
* @author Dariusz Jędrzejczyk
17+
*/
818
public interface McpTransportStream<CONNECTION> {
919

20+
/**
21+
* The last observed event identifier.
22+
* @return if not empty, contains the most recent event that was consumed
23+
*/
1024
Optional<String> lastId();
1125

26+
/**
27+
* An internal stream identifier used to distinguish streams while debugging.
28+
* @return a {@code long} stream identifier value
29+
*/
1230
long streamId();
1331

32+
/**
33+
* Allows keeping track of the transport stream of events (currently an SSE stream
34+
* from Streamable HTTP specification) and enable resumability and reconnects in case
35+
* of stream errors.
36+
* @param eventStream a {@link Publisher} of tuples (pairs) of an optional identifier
37+
* associated with a collection of messages
38+
* @return a flattened {@link Publisher} of
39+
* {@link io.modelcontextprotocol.spec.McpSchema.JSONRPCMessage JSON-RPC messages}
40+
* with the identifier stripped away
41+
*/
1442
Publisher<McpSchema.JSONRPCMessage> consumeSseStream(
1543
Publisher<Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>>> eventStream);
1644

0 commit comments

Comments
 (0)