Skip to content

Commit 9b0b02a

Browse files
committed
Handle HTTP DELETE upon client close
1 parent e402f75 commit 9b0b02a

File tree

4 files changed

+46
-9
lines changed

4 files changed

+46
-9
lines changed

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io.modelcontextprotocol.spec.McpSessionNotFoundException;
1111
import io.modelcontextprotocol.spec.McpTransportSession;
1212
import io.modelcontextprotocol.spec.McpTransportStream;
13+
import org.reactivestreams.Publisher;
1314
import org.slf4j.Logger;
1415
import org.slf4j.LoggerFactory;
1516
import org.springframework.core.ParameterizedTypeReference;
@@ -31,6 +32,7 @@
3132
import java.util.concurrent.atomic.AtomicReference;
3233
import java.util.function.Consumer;
3334
import java.util.function.Function;
35+
import java.util.function.Supplier;
3436

3537
public class WebClientStreamableHttpTransport implements McpClientTransport {
3638

@@ -69,7 +71,24 @@ public WebClientStreamableHttpTransport(ObjectMapper objectMapper, WebClient.Bui
6971
this.endpoint = endpoint;
7072
this.resumableStreams = resumableStreams;
7173
this.openConnectionOnStartup = openConnectionOnStartup;
72-
this.activeSession.set(new DefaultMcpTransportSession());
74+
this.activeSession.set(createTransportSession());
75+
}
76+
77+
private DefaultMcpTransportSession createTransportSession() {
78+
Supplier<Publisher<Void>> onClose = () -> {
79+
DefaultMcpTransportSession transportSession = this.activeSession.get();
80+
return transportSession.sessionId().isEmpty() ? Mono.empty() : webClient
81+
.delete()
82+
.uri(this.endpoint)
83+
.headers(httpHeaders -> {
84+
httpHeaders.add("mcp-session-id", transportSession.sessionId().get());
85+
})
86+
.retrieve()
87+
.toBodilessEntity()
88+
.doOnError(e -> logger.info("Got response {}", e))
89+
.then();
90+
};
91+
return new DefaultMcpTransportSession(onClose);
7392
}
7493

7594
@Override
@@ -93,7 +112,7 @@ public void setExceptionHandler(Consumer<Throwable> handler) {
93112
private void handleException(Throwable t) {
94113
logger.debug("Handling exception for session {}", sessionIdOrPlaceholder(this.activeSession.get()), t);
95114
if (t instanceof McpSessionNotFoundException) {
96-
McpTransportSession<?> invalidSession = this.activeSession.getAndSet(new DefaultMcpTransportSession());
115+
McpTransportSession<?> invalidSession = this.activeSession.getAndSet(createTransportSession());
97116
logger.warn("Server does not recognize session {}. Invalidating.", invalidSession.sessionId());
98117
invalidSession.close();
99118
}
@@ -107,7 +126,7 @@ private void handleException(Throwable t) {
107126
public Mono<Void> closeGracefully() {
108127
return Mono.defer(() -> {
109128
logger.debug("Graceful close triggered");
110-
DefaultMcpTransportSession currentSession = this.activeSession.get();
129+
DefaultMcpTransportSession currentSession = this.activeSession.getAndSet(createTransportSession());
111130
if (currentSession != null) {
112131
return currentSession.closeGracefully();
113132
}

mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientResiliencyTests.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,4 +195,15 @@ void testCallTool() {
195195
});
196196
}
197197

198+
@Test
199+
void testSessionClose() {
200+
withClient(createMcpTransport(), mcpAsyncClient -> {
201+
StepVerifier.create(mcpAsyncClient.initialize()).expectNextCount(1).verifyComplete();
202+
// In case of Streamable HTTP this call should issue a HTTP DELETE request invalidating the session
203+
StepVerifier.create(mcpAsyncClient.closeGracefully()).expectComplete().verify();
204+
// The next use should immediately re-initialize with no issue and send the request without any broken connections.
205+
StepVerifier.create(mcpAsyncClient.ping()).expectNextCount(1).verifyComplete();
206+
});
207+
}
208+
198209
}

mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -321,9 +321,11 @@ public void close() {
321321
* @return A Mono that completes when the connection is closed
322322
*/
323323
public Mono<Void> closeGracefully() {
324-
Initialization current = this.initialization.getAndSet(null);
325-
Mono<?> sessionClose = current != null ? current.closeGracefully() : Mono.empty();
326-
return sessionClose.then(transport.closeGracefully());
324+
return Mono.defer(() -> {
325+
Initialization current = this.initialization.getAndSet(null);
326+
Mono<?> sessionClose = current != null ? current.closeGracefully() : Mono.empty();
327+
return sessionClose.then(transport.closeGracefully());
328+
});
327329
}
328330

329331
// --------------------------

mcp/src/main/java/io/modelcontextprotocol/spec/DefaultMcpTransportSession.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.modelcontextprotocol.spec;
22

3+
import org.reactivestreams.Publisher;
34
import org.slf4j.Logger;
45
import org.slf4j.LoggerFactory;
56
import reactor.core.Disposable;
@@ -9,6 +10,7 @@
910
import java.util.Optional;
1011
import java.util.concurrent.atomic.AtomicBoolean;
1112
import java.util.concurrent.atomic.AtomicReference;
13+
import java.util.function.Supplier;
1214

1315
public class DefaultMcpTransportSession implements McpTransportSession<Disposable> {
1416

@@ -20,8 +22,11 @@ public class DefaultMcpTransportSession implements McpTransportSession<Disposabl
2022

2123
private final AtomicReference<String> sessionId = new AtomicReference<>();
2224

23-
public DefaultMcpTransportSession() {
24-
}
25+
private final Supplier<Publisher<Void>> onClose;
26+
27+
public DefaultMcpTransportSession(Supplier<Publisher<Void>> onClose) {
28+
this.onClose = onClose;
29+
}
2530

2631
@Override
2732
public Optional<String> sessionId() {
@@ -61,7 +66,7 @@ public void close() {
6166

6267
@Override
6368
public Mono<Void> closeGracefully() {
64-
return Mono.fromRunnable(this.openConnections::dispose);
69+
return Mono.from(this.onClose.get()).then(Mono.fromRunnable(this.openConnections::dispose));
6570
}
6671

6772
}

0 commit comments

Comments
 (0)