Skip to content

Commit 266cee4

Browse files
committed
client transport support connection closed event
1 parent 2ee5853 commit 266cee4

File tree

5 files changed

+268
-55
lines changed

5 files changed

+268
-55
lines changed

mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java

Lines changed: 42 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import java.util.function.Consumer;
1717
import java.util.function.Function;
1818

19+
import io.modelcontextprotocol.client.transport.HttpClientStreamableHttpTransport.Builder;
20+
import io.modelcontextprotocol.client.transport.ResponseSubscribers.SseResponseEvent;
1921
import org.slf4j.Logger;
2022
import org.slf4j.LoggerFactory;
2123
import io.modelcontextprotocol.client.transport.ResponseSubscribers.ResponseEvent;
@@ -116,6 +118,8 @@ public class HttpClientSseClientTransport implements McpClientTransport {
116118
*/
117119
private final McpAsyncHttpClientRequestCustomizer httpRequestCustomizer;
118120

121+
private final AtomicReference<Consumer<Void>> connectionClosedHandler = new AtomicReference<>();
122+
119123
/**
120124
* Creates a new transport instance with custom HTTP client builder, object mapper,
121125
* and headers.
@@ -129,7 +133,8 @@ public class HttpClientSseClientTransport implements McpClientTransport {
129133
* @throws IllegalArgumentException if objectMapper, clientBuilder, or headers is null
130134
*/
131135
HttpClientSseClientTransport(HttpClient httpClient, HttpRequest.Builder requestBuilder, String baseUri,
132-
String sseEndpoint, McpJsonMapper jsonMapper, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer) {
136+
String sseEndpoint, McpJsonMapper jsonMapper, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer,
137+
Consumer<Void> connectionClosedHandler) {
133138
Assert.notNull(jsonMapper, "jsonMapper must not be null");
134139
Assert.hasText(baseUri, "baseUri must not be empty");
135140
Assert.hasText(sseEndpoint, "sseEndpoint must not be empty");
@@ -142,13 +147,28 @@ public class HttpClientSseClientTransport implements McpClientTransport {
142147
this.httpClient = httpClient;
143148
this.requestBuilder = requestBuilder;
144149
this.httpRequestCustomizer = httpRequestCustomizer;
150+
this.connectionClosedHandler.set(connectionClosedHandler);
145151
}
146152

147153
@Override
148154
public List<String> protocolVersions() {
149155
return List.of(ProtocolVersions.MCP_2024_11_05);
150156
}
151157

158+
@Override
159+
public void setConnectionClosedHandler(Consumer<Void> closedHandler) {
160+
logger.debug("Connection closed handler registered");
161+
connectionClosedHandler.set(closedHandler);
162+
}
163+
164+
private void handleConnectionClosed() {
165+
logger.debug("Handling connection closed");
166+
Consumer<Void> handler = this.connectionClosedHandler.get();
167+
if (handler != null) {
168+
handler.accept(null);
169+
}
170+
}
171+
152172
/**
153173
* Creates a new builder for {@link HttpClientSseClientTransport}.
154174
* @param baseUri the base URI of the MCP server
@@ -177,6 +197,8 @@ public static class Builder {
177197

178198
private Duration connectTimeout = Duration.ofSeconds(10);
179199

200+
private Consumer<Void> connectionClosedHandler = null;
201+
180202
/**
181203
* Creates a new builder instance.
182204
*/
@@ -320,14 +342,26 @@ public Builder connectTimeout(Duration connectTimeout) {
320342
return this;
321343
}
322344

345+
/**
346+
* Set the connection closed handler.
347+
* @param connectionClosedHandler the connection closed handler
348+
* @return this builder
349+
*/
350+
public Builder connectionClosedHandler(Consumer<Void> connectionClosedHandler) {
351+
Assert.notNull(connectionClosedHandler, "connectionClosedHandler must not be null");
352+
this.connectionClosedHandler = connectionClosedHandler;
353+
return this;
354+
}
355+
323356
/**
324357
* Builds a new {@link HttpClientSseClientTransport} instance.
325358
* @return a new transport instance
326359
*/
327360
public HttpClientSseClientTransport build() {
328361
HttpClient httpClient = this.clientBuilder.connectTimeout(this.connectTimeout).build();
329362
return new HttpClientSseClientTransport(httpClient, requestBuilder, baseUri, sseEndpoint,
330-
jsonMapper == null ? McpJsonMapper.getDefault() : jsonMapper, httpRequestCustomizer);
363+
jsonMapper == null ? McpJsonMapper.getDefault() : jsonMapper, httpRequestCustomizer,
364+
connectionClosedHandler);
331365
}
332366

333367
}
@@ -352,9 +386,7 @@ public Mono<Void> connect(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> h
352386
.exceptionallyCompose(e -> {
353387
sseSink.error(e);
354388
return CompletableFuture.failedFuture(e);
355-
}))
356-
.map(responseEvent -> (ResponseSubscribers.SseResponseEvent) responseEvent)
357-
.flatMap(responseEvent -> {
389+
})).map(responseEvent -> (SseResponseEvent) responseEvent).flatMap(responseEvent -> {
358390
if (isClosing) {
359391
return Mono.empty();
360392
}
@@ -388,26 +420,21 @@ else if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
388420
sink.error(new McpTransportException("Error processing SSE event", e));
389421
}
390422
}
391-
return Flux.<McpSchema.JSONRPCMessage>error(
392-
new RuntimeException("Failed to send message: " + responseEvent));
423+
return Flux.<JSONRPCMessage>error(new RuntimeException("Failed to send message: " + responseEvent));
393424

394-
})
395-
.flatMap(jsonRpcMessage -> handler.apply(Mono.just(jsonRpcMessage)))
396-
.onErrorComplete(t -> {
425+
}).flatMap(jsonRpcMessage -> handler.apply(Mono.just(jsonRpcMessage))).onErrorComplete(t -> {
397426
if (!isClosing) {
398427
logger.warn("SSE stream observed an error", t);
399428
sink.error(t);
400429
}
401430
return true;
402-
})
403-
.doFinally(s -> {
431+
}).doFinally(s -> {
404432
Disposable ref = this.sseSubscription.getAndSet(null);
405433
if (ref != null && !ref.isDisposed()) {
406434
ref.dispose();
407435
}
408-
})
409-
.contextWrite(sink.contextView())
410-
.subscribe();
436+
handleConnectionClosed();
437+
}).contextWrite(sink.contextView()).subscribe();
411438

412439
this.sseSubscription.set(connection);
413440
}));

mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java

Lines changed: 57 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -4,31 +4,12 @@
44

55
package io.modelcontextprotocol.client.transport;
66

7-
import java.io.IOException;
8-
import java.net.URI;
9-
import java.net.http.HttpClient;
10-
import java.net.http.HttpRequest;
11-
import java.net.http.HttpResponse;
12-
import java.net.http.HttpResponse.BodyHandler;
13-
import java.time.Duration;
14-
import java.util.List;
15-
import java.util.Optional;
16-
import java.util.concurrent.CompletionException;
17-
import java.util.concurrent.atomic.AtomicReference;
18-
import java.util.function.Consumer;
19-
import java.util.function.Function;
20-
21-
import org.reactivestreams.Publisher;
22-
import org.slf4j.Logger;
23-
import org.slf4j.LoggerFactory;
24-
25-
import io.modelcontextprotocol.json.TypeRef;
26-
import io.modelcontextprotocol.json.McpJsonMapper;
27-
7+
import io.modelcontextprotocol.client.transport.ResponseSubscribers.ResponseEvent;
288
import io.modelcontextprotocol.client.transport.customizer.McpAsyncHttpClientRequestCustomizer;
299
import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer;
30-
import io.modelcontextprotocol.client.transport.ResponseSubscribers.ResponseEvent;
3110
import io.modelcontextprotocol.common.McpTransportContext;
11+
import io.modelcontextprotocol.json.McpJsonMapper;
12+
import io.modelcontextprotocol.json.TypeRef;
3213
import io.modelcontextprotocol.spec.DefaultMcpTransportSession;
3314
import io.modelcontextprotocol.spec.DefaultMcpTransportStream;
3415
import io.modelcontextprotocol.spec.HttpHeaders;
@@ -41,13 +22,30 @@
4122
import io.modelcontextprotocol.spec.ProtocolVersions;
4223
import io.modelcontextprotocol.util.Assert;
4324
import io.modelcontextprotocol.util.Utils;
25+
import org.reactivestreams.Publisher;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
4428
import reactor.core.Disposable;
4529
import reactor.core.publisher.Flux;
4630
import reactor.core.publisher.FluxSink;
4731
import reactor.core.publisher.Mono;
4832
import reactor.util.function.Tuple2;
4933
import reactor.util.function.Tuples;
5034

35+
import java.io.IOException;
36+
import java.net.URI;
37+
import java.net.http.HttpClient;
38+
import java.net.http.HttpRequest;
39+
import java.net.http.HttpResponse;
40+
import java.net.http.HttpResponse.BodyHandler;
41+
import java.time.Duration;
42+
import java.util.List;
43+
import java.util.Optional;
44+
import java.util.concurrent.CompletionException;
45+
import java.util.concurrent.atomic.AtomicReference;
46+
import java.util.function.Consumer;
47+
import java.util.function.Function;
48+
5149
/**
5250
* An implementation of the Streamable HTTP protocol as defined by the
5351
* <code>2025-03-26</code> version of the MCP specification.
@@ -87,7 +85,9 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {
8785
*/
8886
private final HttpClient httpClient;
8987

90-
/** HTTP request builder for building requests to send messages to the server */
88+
/**
89+
* HTTP request builder for building requests to send messages to the server
90+
*/
9191
private final HttpRequest.Builder requestBuilder;
9292

9393
/**
@@ -124,9 +124,12 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {
124124

125125
private final AtomicReference<Consumer<Throwable>> exceptionHandler = new AtomicReference<>();
126126

127+
private final AtomicReference<Consumer<Void>> connectionClosedHandler = new AtomicReference<>();
128+
127129
private HttpClientStreamableHttpTransport(McpJsonMapper jsonMapper, HttpClient httpClient,
128130
HttpRequest.Builder requestBuilder, String baseUri, String endpoint, boolean resumableStreams,
129-
boolean openConnectionOnStartup, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer) {
131+
boolean openConnectionOnStartup, McpAsyncHttpClientRequestCustomizer httpRequestCustomizer,
132+
Consumer<Void> connectionClosedHandler) {
130133
this.jsonMapper = jsonMapper;
131134
this.httpClient = httpClient;
132135
this.requestBuilder = requestBuilder;
@@ -136,6 +139,7 @@ private HttpClientStreamableHttpTransport(McpJsonMapper jsonMapper, HttpClient h
136139
this.openConnectionOnStartup = openConnectionOnStartup;
137140
this.activeSession.set(createTransportSession());
138141
this.httpRequestCustomizer = httpRequestCustomizer;
142+
this.connectionClosedHandler.set(connectionClosedHandler);
139143
}
140144

141145
@Override
@@ -193,6 +197,12 @@ public void setExceptionHandler(Consumer<Throwable> handler) {
193197
this.exceptionHandler.set(handler);
194198
}
195199

200+
@Override
201+
public void setConnectionClosedHandler(Consumer<Void> closedHandler) {
202+
logger.debug("Connection closed handler registered");
203+
this.connectionClosedHandler.set(closedHandler);
204+
}
205+
196206
private void handleException(Throwable t) {
197207
logger.debug("Handling exception for session {}", sessionIdOrPlaceholder(this.activeSession.get()), t);
198208
if (t instanceof McpTransportSessionNotFoundException) {
@@ -206,6 +216,14 @@ private void handleException(Throwable t) {
206216
}
207217
}
208218

219+
private void handleConnectionClosed() {
220+
logger.debug("Handling connection closed for session {}", sessionIdOrPlaceholder(this.activeSession.get()));
221+
Consumer<Void> handler = this.connectionClosedHandler.get();
222+
if (handler != null) {
223+
handler.accept(null);
224+
}
225+
}
226+
209227
@Override
210228
public Mono<Void> closeGracefully() {
211229
return Mono.defer(() -> {
@@ -356,6 +374,7 @@ else if (statusCode == BAD_REQUEST) {
356374
if (ref != null) {
357375
transportSession.removeConnection(ref);
358376
}
377+
this.handleConnectionClosed();
359378
}))
360379
.contextWrite(ctx)
361380
.subscribe();
@@ -615,6 +634,8 @@ public static class Builder {
615634

616635
private Duration connectTimeout = Duration.ofSeconds(10);
617636

637+
private Consumer<Void> connectionClosedHandler = null;
638+
618639
/**
619640
* Creates a new builder with the specified base URI.
620641
* @param baseUri the base URI of the MCP server
@@ -763,6 +784,17 @@ public Builder connectTimeout(Duration connectTimeout) {
763784
return this;
764785
}
765786

787+
/**
788+
* Set the connection closed handler.
789+
* @param connectionClosedHandler the connection closed handler
790+
* @return this builder
791+
*/
792+
public Builder connectionClosedHandler(Consumer<Void> connectionClosedHandler) {
793+
Assert.notNull(connectionClosedHandler, "connectionClosedHandler must not be null");
794+
this.connectionClosedHandler = connectionClosedHandler;
795+
return this;
796+
}
797+
766798
/**
767799
* Construct a fresh instance of {@link HttpClientStreamableHttpTransport} using
768800
* the current builder configuration.
@@ -772,7 +804,7 @@ public HttpClientStreamableHttpTransport build() {
772804
HttpClient httpClient = this.clientBuilder.connectTimeout(this.connectTimeout).build();
773805
return new HttpClientStreamableHttpTransport(jsonMapper == null ? McpJsonMapper.getDefault() : jsonMapper,
774806
httpClient, requestBuilder, baseUri, endpoint, resumableStreams, openConnectionOnStartup,
775-
httpRequestCustomizer);
807+
httpRequestCustomizer, connectionClosedHandler);
776808
}
777809

778810
}

mcp-core/src/main/java/io/modelcontextprotocol/spec/McpClientTransport.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,12 @@ public interface McpClientTransport extends McpTransport {
3838
default void setExceptionHandler(Consumer<Throwable> handler) {
3939
}
4040

41+
/**
42+
* Sets the handler for the transport closed event.
43+
* @param closedHandler Allows reacting to transport closed event by the higher layers
44+
*/
45+
default void setConnectionClosedHandler(Consumer<Void> closedHandler) {
46+
47+
}
48+
4149
}

0 commit comments

Comments
 (0)