Skip to content

Commit 3ef98ef

Browse files
committed
Fix the simulateIncomingMessage coordination
Signed-off-by: Christian Tzolov <christian.tzolov@broadcom.com>
1 parent 139a65c commit 3ef98ef

File tree

2 files changed

+14
-35
lines changed

2 files changed

+14
-35
lines changed

mcp/src/test/java/io/modelcontextprotocol/MockMcpTransport.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
package io.modelcontextprotocol;
66

7+
import java.util.concurrent.TimeUnit;
78
import java.util.concurrent.atomic.AtomicInteger;
89
import java.util.function.Function;
910

@@ -33,37 +34,40 @@ public class MockMcpTransport implements ClientMcpTransport, ServerMcpTransport
3334

3435
private final Flux<McpSchema.JSONRPCMessage> outboundView = outgoing.asFlux().cache(1);
3536

37+
java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(1);
38+
3639
public void simulateIncomingMessage(McpSchema.JSONRPCMessage message) {
3740
if (inbound.tryEmitNext(message).isFailure()) {
3841
throw new RuntimeException("Failed to emit message " + message);
3942
}
4043
inboundMessageCount.incrementAndGet();
41-
42-
try {
43-
Thread.sleep(200);
44-
}
45-
catch (InterruptedException e) {
46-
e.printStackTrace();
47-
}
44+
latch = new java.util.concurrent.CountDownLatch(1);
4845
}
4946

5047
@Override
5148
public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
5249
if (outgoing.tryEmitNext(message).isFailure()) {
5350
return Mono.error(new RuntimeException("Can't emit outgoing message " + message));
5451
}
52+
latch.countDown();
5553
return Mono.empty();
5654
}
5755

5856
public McpSchema.JSONRPCRequest getLastSentMessageAsRequest() {
59-
return (JSONRPCRequest) outboundView.blockFirst();
57+
return (JSONRPCRequest) getLastSentMessage();
6058
}
6159

6260
public McpSchema.JSONRPCNotification getLastSentMessageAsNotifiation() {
63-
return (JSONRPCNotification) outboundView.blockFirst();
61+
return (JSONRPCNotification) getLastSentMessage();
6462
}
6563

6664
public McpSchema.JSONRPCMessage getLastSentMessage() {
65+
try {
66+
latch.await(200, TimeUnit.MILLISECONDS);
67+
}
68+
catch (InterruptedException e) {
69+
e.printStackTrace();
70+
}
6771
return outboundView.blockFirst();
6872
}
6973

mcp/src/test/java/io/modelcontextprotocol/client/McpAsyncClientResponseHandlerTests.java

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,6 @@ private InitializeResult initialization(McpAsyncClient asyncMcpClient, MockMcpTr
3737
.build(),
3838
new McpSchema.Implementation("test-server", "1.0.0"), "Test instructions");
3939

40-
// Use CountDownLatch to coordinate between threads
41-
java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(1);
42-
4340
// Create a Mono that will handle the initialization and response simulation
4441
return asyncMcpClient.initialize().doOnSubscribe(subscription -> {
4542
// Run in a separate reactive context to avoid blocking the main subscription
@@ -51,17 +48,7 @@ private InitializeResult initialization(McpAsyncClient asyncMcpClient, MockMcpTr
5148
McpSchema.JSONRPCResponse initResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION,
5249
initRequest.id(), mockInitResult, null);
5350
transport.simulateIncomingMessage(initResponse);
54-
latch.countDown();
5551
}).subscribeOn(reactor.core.scheduler.Schedulers.boundedElastic()).subscribe();
56-
}).doOnTerminate(() -> {
57-
try {
58-
// Wait for the response simulation to complete
59-
latch.await(5, java.util.concurrent.TimeUnit.SECONDS);
60-
}
61-
catch (InterruptedException e) {
62-
Thread.currentThread().interrupt();
63-
throw new RuntimeException("Interrupted while waiting for initialization", e);
64-
}
6552
}).block();
6653
}
6754

@@ -82,9 +69,6 @@ void testSuccessfulInitialization() {
8269
McpSchema.InitializeResult mockInitResult = new McpSchema.InitializeResult(McpSchema.LATEST_PROTOCOL_VERSION,
8370
mockServerCapabilities, mockServerInfo, "Test instructions");
8471

85-
// Use CountDownLatch to coordinate between threads
86-
java.util.concurrent.CountDownLatch latch = new java.util.concurrent.CountDownLatch(1);
87-
8872
// Start initialization with reactive handling
8973
InitializeResult result = asyncMcpClient.initialize().doOnSubscribe(subscription -> {
9074
// Run in a separate reactive context to avoid blocking the main subscription
@@ -96,17 +80,8 @@ void testSuccessfulInitialization() {
9680
McpSchema.JSONRPCResponse initResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION,
9781
initRequest.id(), mockInitResult, null);
9882
transport.simulateIncomingMessage(initResponse);
99-
latch.countDown();
83+
// latch.countDown();
10084
}).subscribeOn(reactor.core.scheduler.Schedulers.boundedElastic()).subscribe();
101-
}).doOnTerminate(() -> {
102-
try {
103-
// Wait for the response simulation to complete
104-
latch.await(5, java.util.concurrent.TimeUnit.SECONDS);
105-
}
106-
catch (InterruptedException e) {
107-
Thread.currentThread().interrupt();
108-
throw new RuntimeException("Interrupted while waiting for initialization", e);
109-
}
11085
}).block();
11186

11287
// Verify initialized notification was sent

0 commit comments

Comments
 (0)