Skip to content

Commit 19d02f2

Browse files
committed
optimize and add test for listening sse closed
1 parent 90ecbb4 commit 19d02f2

File tree

4 files changed

+53
-14
lines changed

4 files changed

+53
-14
lines changed

mcp-spring/mcp-spring-webmvc/src/test/java/io/modelcontextprotocol/server/WebMvcStreamableIntegrationTests.java

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -81,17 +81,24 @@ public void before() {
8181
throw new RuntimeException("Failed to start Tomcat", e);
8282
}
8383

84-
clientBuilders
85-
.put("httpclient",
86-
McpClient.sync(HttpClientStreamableHttpTransport.builder("http://localhost:" + PORT)
87-
.endpoint(MESSAGE_ENDPOINT)
88-
.build()).initializationTimeout(Duration.ofHours(10)).requestTimeout(Duration.ofHours(10)));
89-
90-
clientBuilders.put("webflux",
91-
McpClient.sync(WebClientStreamableHttpTransport
92-
.builder(WebClient.builder().baseUrl("http://localhost:" + PORT))
93-
.endpoint(MESSAGE_ENDPOINT)
94-
.build()));
84+
var httpClientTransport = HttpClientStreamableHttpTransport.builder("http://localhost:" + PORT)
85+
.endpoint(MESSAGE_ENDPOINT)
86+
.openConnectionOnStartup(true)
87+
.build();
88+
89+
clientTransportBuilders.put("httpclient", httpClientTransport);
90+
clientBuilders.put("httpclient",
91+
McpClient.sync(httpClientTransport)
92+
.initializationTimeout(Duration.ofHours(10))
93+
.requestTimeout(Duration.ofHours(10)));
94+
var webClientTransport = WebClientStreamableHttpTransport
95+
.builder(WebClient.builder().baseUrl("http://localhost:" + PORT))
96+
.endpoint(MESSAGE_ENDPOINT)
97+
.openConnectionOnStartup(true)
98+
.build();
99+
clientTransportBuilders.put("webflux", webClientTransport);
100+
101+
clientBuilders.put("webflux", McpClient.sync(webClientTransport));
95102

96103
// Get the transport from Spring context
97104
this.mcpServerTransportProvider = tomcatServer.appContext()

mcp-spring/mcp-spring-webmvc/src/test/resources/logback.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
<logger name="io.modelcontextprotocol.server.transport" level="INFO"/>
1919

2020
<!-- Spec package -->
21-
<logger name="io.modelcontextprotocol.spec" level="INFO"/>
21+
<logger name="io.modelcontextprotocol.spec" level="DEBUG"/>
2222

2323
<!-- Root logger -->
2424
<root level="INFO">

mcp-test/src/main/java/io/modelcontextprotocol/AbstractMcpClientServerIntegrationTests.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
import static org.awaitility.Awaitility.await;
1313
import static org.mockito.Mockito.mock;
1414

15+
import java.io.ByteArrayOutputStream;
16+
import java.io.IOException;
17+
import java.io.PrintStream;
1518
import java.net.URI;
1619
import java.net.http.HttpClient;
1720
import java.net.http.HttpRequest;
@@ -29,6 +32,7 @@
2932
import java.util.function.Function;
3033
import java.util.stream.Collectors;
3134

35+
import io.modelcontextprotocol.spec.McpClientTransport;
3236
import org.junit.jupiter.params.ParameterizedTest;
3337
import org.junit.jupiter.params.provider.ValueSource;
3438

@@ -66,6 +70,8 @@ public abstract class AbstractMcpClientServerIntegrationTests {
6670

6771
protected ConcurrentHashMap<String, McpClient.SyncSpec> clientBuilders = new ConcurrentHashMap<>();
6872

73+
protected ConcurrentHashMap<String, McpClientTransport> clientTransportBuilders = new ConcurrentHashMap<>();
74+
6975
abstract protected void prepareClients(int port, String mcpEndpoint);
7076

7177
abstract protected McpServer.AsyncSpecification<?> prepareAsyncServerBuilder();
@@ -836,7 +842,7 @@ void testThrowingToolCallIsCaughtBeforeTimeout(String clientType) {
836842

837843
@ParameterizedTest(name = "{0} : {displayName} ")
838844
@ValueSource(strings = { "httpclient", "webflux" })
839-
void testToolCallSuccessWithTranportContextExtraction(String clientType) {
845+
void testToolCallSuccessWithTransportContextExtraction(String clientType) {
840846

841847
var clientBuilder = clientBuilders.get(clientType);
842848

@@ -999,6 +1005,32 @@ void testInitialize(String clientType) {
9991005
mcpServer.close();
10001006
}
10011007

1008+
@ParameterizedTest(name = "{0} : {displayName} ")
1009+
@ValueSource(strings = { "httpclient", "webflux" })
1010+
void testListeningStreamWillClosedWhenNew(String clientType) throws IOException {
1011+
var clientTransport = clientTransportBuilders.get(clientType);
1012+
if (clientTransport == null) {
1013+
return;
1014+
}
1015+
PrintStream originalOut = System.out;
1016+
ByteArrayOutputStream capturedOutput = new ByteArrayOutputStream();
1017+
System.setOut(new PrintStream(capturedOutput));
1018+
1019+
var clientBuilder = clientBuilders.get(clientType);
1020+
var mcpServer = prepareSyncServerBuilder().build();
1021+
var mcpClient = clientBuilder.build();
1022+
InitializeResult initResult = mcpClient.initialize();
1023+
assertThat(initResult).isNotNull();
1024+
clientTransport.connect(message -> Mono.empty()).subscribe();
1025+
await().atMost(Duration.ofSeconds(1)).untilAsserted(() -> {
1026+
assertThat(capturedOutput.toString().contains("Listening stream already exists for this session")).isTrue();
1027+
});
1028+
System.setOut(originalOut);
1029+
capturedOutput.close();
1030+
mcpClient.close();
1031+
mcpServer.close();
1032+
}
1033+
10021034
// ---------------------------------------
10031035
// Logging Tests
10041036
// ---------------------------------------

mcp/src/main/java/io/modelcontextprotocol/spec/McpStreamableServerSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ public Mono<Void> delete() {
139139
public Mono<McpStreamableServerSessionStream> listeningStream(McpStreamableServerTransport transport) {
140140
McpStreamableServerSessionStream listeningStream = new McpStreamableServerSessionStream(transport);
141141
McpLoggableSession oldStream = this.listeningStreamRef.getAndSet(listeningStream);
142-
if (oldStream != null) {
142+
if (oldStream != null && !(oldStream instanceof MissingMcpTransportSession)) {
143143
logger.debug(
144144
"Listening stream already exists for this session:{} and will be closed to make way for the new listening SSE stream",
145145
this.id);

0 commit comments

Comments
 (0)