Skip to content

Commit b4273a0

Browse files
committed
Add sse lost-connection test to verify that no timeouts are thrown
Signed-off-by: Christian Tzolov <christian.tzolov@broadcom.com>
1 parent 9b0ac03 commit b4273a0

File tree

1 file changed

+137
-0
lines changed

1 file changed

+137
-0
lines changed
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Copyright 2024-2024 the original author or authors.
3+
*/
4+
5+
package io.modelcontextprotocol.client;
6+
7+
import static org.assertj.core.api.Assertions.assertThatCode;
8+
9+
import java.io.IOException;
10+
import java.time.Duration;
11+
import java.util.concurrent.atomic.AtomicReference;
12+
import java.util.function.Consumer;
13+
14+
import org.junit.jupiter.api.Test;
15+
import org.junit.jupiter.api.Timeout;
16+
import org.slf4j.Logger;
17+
import org.slf4j.LoggerFactory;
18+
import org.testcontainers.containers.GenericContainer;
19+
import org.testcontainers.containers.Network;
20+
import org.testcontainers.containers.ToxiproxyContainer;
21+
import org.testcontainers.containers.wait.strategy.Wait;
22+
23+
import eu.rekawek.toxiproxy.Proxy;
24+
import eu.rekawek.toxiproxy.ToxiproxyClient;
25+
import eu.rekawek.toxiproxy.model.ToxicDirection;
26+
import io.modelcontextprotocol.client.transport.HttpClientSseClientTransport;
27+
import io.modelcontextprotocol.spec.McpClientTransport;
28+
import io.modelcontextprotocol.spec.McpSchema;
29+
import reactor.test.StepVerifier;
30+
31+
@Timeout(15)
32+
public class HttpSseMcpAsyncClientLostConnectionTests {
33+
34+
private static final Logger logger = LoggerFactory.getLogger(HttpSseMcpAsyncClientLostConnectionTests.class);
35+
36+
static Network network = Network.newNetwork();
37+
static String host = "http://localhost:3001";
38+
39+
// Uses the https://github.com/tzolov/mcp-everything-server-docker-image
40+
@SuppressWarnings("resource")
41+
static GenericContainer<?> container = new GenericContainer<>("docker.io/tzolov/mcp-everything-server:v2")
42+
.withCommand("node dist/index.js sse")
43+
.withLogConsumer(outputFrame -> System.out.println(outputFrame.getUtf8String()))
44+
.withNetwork(network)
45+
.withNetworkAliases("everything-server")
46+
.withExposedPorts(3001)
47+
.waitingFor(Wait.forHttp("/").forStatusCode(404));
48+
49+
static ToxiproxyContainer toxiproxy = new ToxiproxyContainer("ghcr.io/shopify/toxiproxy:2.5.0").withNetwork(network)
50+
.withExposedPorts(8474, 3000);
51+
52+
static Proxy proxy;
53+
54+
static {
55+
container.start();
56+
57+
toxiproxy.start();
58+
59+
final ToxiproxyClient toxiproxyClient = new ToxiproxyClient(toxiproxy.getHost(), toxiproxy.getControlPort());
60+
try {
61+
proxy = toxiproxyClient.createProxy("everything-server", "0.0.0.0:3000", "everything-server:3001");
62+
}
63+
catch (IOException e) {
64+
throw new RuntimeException("Can't create proxy!", e);
65+
}
66+
67+
final String ipAddressViaToxiproxy = toxiproxy.getHost();
68+
final int portViaToxiproxy = toxiproxy.getMappedPort(3000);
69+
70+
host = "http://" + ipAddressViaToxiproxy + ":" + portViaToxiproxy;
71+
}
72+
73+
static void disconnect() {
74+
long start = System.nanoTime();
75+
try {
76+
proxy.toxics().resetPeer("RESET_DOWNSTREAM", ToxicDirection.DOWNSTREAM, 0);
77+
proxy.toxics().resetPeer("RESET_UPSTREAM", ToxicDirection.UPSTREAM, 0);
78+
logger.info("Disconnect took {} ms", Duration.ofNanos(System.nanoTime() - start).toMillis());
79+
}
80+
catch (IOException e) {
81+
throw new RuntimeException("Failed to disconnect", e);
82+
}
83+
}
84+
85+
static void reconnect() {
86+
long start = System.nanoTime();
87+
try {
88+
proxy.toxics().get("RESET_UPSTREAM").remove();
89+
proxy.toxics().get("RESET_DOWNSTREAM").remove();
90+
logger.info("Reconnect took {} ms", Duration.ofNanos(System.nanoTime() - start).toMillis());
91+
}
92+
catch (IOException e) {
93+
throw new RuntimeException("Failed to reconnect", e);
94+
}
95+
}
96+
97+
McpAsyncClient client(McpClientTransport transport) {
98+
AtomicReference<McpAsyncClient> client = new AtomicReference<>();
99+
100+
assertThatCode(() -> {
101+
McpClient.AsyncSpec builder = McpClient.async(transport)
102+
.requestTimeout(Duration.ofSeconds(14))
103+
.initializationTimeout(Duration.ofSeconds(2))
104+
.capabilities(McpSchema.ClientCapabilities.builder().roots(true).build());
105+
client.set(builder.build());
106+
}).doesNotThrowAnyException();
107+
108+
return client.get();
109+
}
110+
111+
void withClient(McpClientTransport transport, Consumer<McpAsyncClient> c) {
112+
var client = client(transport);
113+
try {
114+
c.accept(client);
115+
}
116+
finally {
117+
StepVerifier.create(client.closeGracefully()).expectComplete().verify(Duration.ofSeconds(10));
118+
}
119+
}
120+
121+
@Test
122+
void testPingWithEaxctExceptionType() {
123+
withClient(HttpClientSseClientTransport.builder(host).build(), mcpAsyncClient -> {
124+
StepVerifier.create(mcpAsyncClient.initialize()).expectNextCount(1).verifyComplete();
125+
126+
disconnect();
127+
128+
// Veryfiy that the exception type is IOException and not TimeoutException
129+
StepVerifier.create(mcpAsyncClient.ping()).expectError(IOException.class).verify();
130+
131+
reconnect();
132+
133+
StepVerifier.create(mcpAsyncClient.ping()).expectNextCount(1).verifyComplete();
134+
});
135+
}
136+
137+
}

0 commit comments

Comments
 (0)