Skip to content

Commit 9072fb6

Browse files
committed
fix todo clear pending responses
1 parent 7a07c75 commit 9072fb6

File tree

1 file changed

+16
-12
lines changed

1 file changed

+16
-12
lines changed

mcp-core/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,27 +4,27 @@
44

55
package io.modelcontextprotocol.spec;
66

7-
import java.time.Duration;
8-
import java.util.Map;
9-
import java.util.concurrent.ConcurrentHashMap;
10-
import java.util.concurrent.atomic.AtomicBoolean;
11-
import java.util.concurrent.atomic.AtomicInteger;
12-
import java.util.concurrent.atomic.AtomicLong;
13-
import java.util.concurrent.atomic.AtomicReference;
14-
157
import io.modelcontextprotocol.common.McpTransportContext;
8+
import io.modelcontextprotocol.json.TypeRef;
169
import io.modelcontextprotocol.server.McpAsyncServerExchange;
1710
import io.modelcontextprotocol.server.McpInitRequestHandler;
1811
import io.modelcontextprotocol.server.McpNotificationHandler;
1912
import io.modelcontextprotocol.server.McpRequestHandler;
20-
import io.modelcontextprotocol.json.TypeRef;
2113
import io.modelcontextprotocol.util.Assert;
2214
import org.slf4j.Logger;
2315
import org.slf4j.LoggerFactory;
2416
import reactor.core.publisher.Mono;
2517
import reactor.core.publisher.MonoSink;
2618
import reactor.core.publisher.Sinks;
2719

20+
import java.time.Duration;
21+
import java.util.Map;
22+
import java.util.concurrent.ConcurrentHashMap;
23+
import java.util.concurrent.atomic.AtomicBoolean;
24+
import java.util.concurrent.atomic.AtomicInteger;
25+
import java.util.concurrent.atomic.AtomicLong;
26+
import java.util.concurrent.atomic.AtomicReference;
27+
2828
/**
2929
* Represents a Model Context Protocol (MCP) session on the server side. It manages
3030
* bidirectional JSON-RPC communication with the client.
@@ -37,7 +37,9 @@ public class McpServerSession implements McpLoggableSession {
3737

3838
private final String id;
3939

40-
/** Duration to wait for request responses before timing out */
40+
/**
41+
* Duration to wait for request responses before timing out
42+
*/
4143
private final Duration requestTimeout;
4244

4345
private final AtomicLong requestCounter = new AtomicLong(0);
@@ -348,8 +350,9 @@ private MethodNotFoundError getMethodNotFoundError(String method) {
348350

349351
@Override
350352
public Mono<Void> closeGracefully() {
351-
// TODO: clear pendingResponses and emit errors?
352353
if (this.closed.compareAndSet(false, true)) {
354+
this.pendingResponses.forEach((id, response) -> response.error(new RuntimeException("Session closed")));
355+
this.pendingResponses.clear();
353356
return this.transport.closeGracefully();
354357
}
355358
else {
@@ -359,8 +362,9 @@ public Mono<Void> closeGracefully() {
359362

360363
@Override
361364
public void close() {
362-
// TODO: clear pendingResponses and emit errors?
363365
if (this.closed.compareAndSet(false, true)) {
366+
this.pendingResponses.forEach((id, response) -> response.error(new RuntimeException("Session closed")));
367+
this.pendingResponses.clear();
364368
this.transport.close();
365369
}
366370
}

0 commit comments

Comments
 (0)