Skip to content

Commit 4b14fdf

Browse files
committed
Address rewview comments
Signed-off-by: Christian Tzolov <christian.tzolov@broadcom.com>
1 parent cf02a57 commit 4b14fdf

File tree

5 files changed

+44
-70
lines changed

5 files changed

+44
-70
lines changed

mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientResiliencyTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public abstract class AbstractMcpAsyncClientResiliencyTests {
7979
host = "http://" + ipAddressViaToxiproxy + ":" + portViaToxiproxy;
8080
}
8181

82-
private static void disconnect() {
82+
static void disconnect() {
8383
long start = System.nanoTime();
8484
try {
8585
// disconnect
@@ -96,7 +96,7 @@ private static void disconnect() {
9696
}
9797
}
9898

99-
private static void reconnect() {
99+
static void reconnect() {
100100
long start = System.nanoTime();
101101
try {
102102
proxy.toxics().get("RESET_UPSTREAM").remove();
@@ -110,7 +110,7 @@ private static void reconnect() {
110110
}
111111
}
112112

113-
private static void restartMcpServer() {
113+
static void restartMcpServer() {
114114
container.stop();
115115
container.start();
116116
}

mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import java.time.Duration;
1414
import java.util.List;
1515
import java.util.Optional;
16+
import java.util.concurrent.CompletionException;
1617
import java.util.concurrent.atomic.AtomicReference;
1718
import java.util.function.Consumer;
1819
import java.util.function.Function;
@@ -135,7 +136,7 @@ public static Builder builder(String baseUri) {
135136
public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> handler) {
136137
return Mono.deferContextual(ctx -> {
137138
this.handler.set(handler);
138-
if (openConnectionOnStartup) {
139+
if (this.openConnectionOnStartup) {
139140
logger.debug("Eagerly opening connection on startup");
140141
return this.reconnect(null).onErrorComplete(t -> {
141142
logger.warn("Eager connect failed ", t);
@@ -286,6 +287,7 @@ else if (statusCode == BAD_REQUEST) {
286287

287288
}).<McpSchema
288289
.JSONRPCMessage>flatMap(jsonrpcMessage -> this.handler.get().apply(Mono.just(jsonrpcMessage)))
290+
.onErrorMap(CompletionException.class, t -> t.getCause())
289291
.onErrorComplete(t -> {
290292
this.handleException(t);
291293
return true;
@@ -373,7 +375,7 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sendMessage) {
373375
else {
374376
logger.debug("SSE connection established successfully");
375377
}
376-
})).onErrorComplete().subscribe();
378+
})).onErrorMap(CompletionException.class, t -> t.getCause()).onErrorComplete().subscribe();
377379

378380
}).flatMap(responseEvent -> {
379381
if (transportSession.markInitialized(
@@ -464,19 +466,25 @@ else if (statusCode == BAD_REQUEST) {
464466

465467
return Flux.<McpSchema.JSONRPCMessage>error(
466468
new RuntimeException("Failed to send message: " + responseEvent));
467-
}).flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage))).onErrorComplete(t -> {
468-
// handle the error first
469-
this.handleException(t);
470-
// inform the caller of sendMessage
471-
messageSink.error(t);
472-
return true;
473-
}).doFinally(s -> {
474-
logger.debug("SendMessage finally: {}", s);
475-
Disposable ref = disposableRef.getAndSet(null);
476-
if (ref != null) {
477-
transportSession.removeConnection(ref);
478-
}
479-
}).contextWrite(messageSink.contextView()).subscribe();
469+
})
470+
.flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage)))
471+
.onErrorMap(CompletionException.class, t -> t.getCause())
472+
.onErrorComplete(t -> {
473+
// handle the error first
474+
this.handleException(t);
475+
// inform the caller of sendMessage
476+
messageSink.error(t);
477+
return true;
478+
})
479+
.doFinally(s -> {
480+
logger.debug("SendMessage finally: {}", s);
481+
Disposable ref = disposableRef.getAndSet(null);
482+
if (ref != null) {
483+
transportSession.removeConnection(ref);
484+
}
485+
})
486+
.contextWrite(messageSink.contextView())
487+
.subscribe();
480488

481489
disposableRef.set(connection);
482490
transportSession.addConnection(connection);

mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java

Lines changed: 12 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,20 @@
1515
import reactor.core.publisher.BaseSubscriber;
1616
import reactor.core.publisher.FluxSink;
1717

18+
/**
19+
* Utility class providing various {@link BodySubscriber} implementations for handling
20+
* different types of HTTP response bodies in the context of Model Context Protocol (MCP)
21+
* clients.
22+
*
23+
* <p>
24+
* Defines subscribers for processing Server-Sent Events (SSE), aggregate responses, and
25+
* bodiless responses.
26+
*
27+
* @author Christian Tzolov
28+
* @author Dariusz Jędrzejczyk
29+
*/
1830
class ResponseSubscribers {
1931

20-
/**
21-
* Represents a Server-Sent Event with its standard fields.
22-
*
23-
* @param id the event ID, may be {@code null}
24-
* @param event the event type, may be {@code null} (defaults to "message")
25-
* @param data the event payload data, never {@code null}
26-
*/
2732
record SseEvent(String id, String event, String data) {
2833
}
2934

@@ -115,10 +120,6 @@ public SseLineSubscriber(ResponseInfo responseInfo, FluxSink<ResponseEvent> sink
115120
this.responseInfo = responseInfo;
116121
}
117122

118-
/**
119-
* Initializes the subscription and sets up disposal callback.
120-
* @param subscription the {@link Subscription} to the upstream line source
121-
*/
122123
@Override
123124
protected void hookOnSubscribe(Subscription subscription) {
124125

@@ -132,12 +133,6 @@ protected void hookOnSubscribe(Subscription subscription) {
132133
});
133134
}
134135

135-
/**
136-
* Processes each line from the SSE stream according to the SSE protocol. Empty
137-
* lines trigger event emission, other lines are parsed for data, id, or event
138-
* type.
139-
* @param line the line to process from the SSE stream
140-
*/
141136
@Override
142137
protected void hookOnNext(String line) {
143138
if (line.isEmpty()) {
@@ -172,9 +167,6 @@ else if (line.startsWith("event:")) {
172167
}
173168
}
174169

175-
/**
176-
* Called when the upstream line source completes normally.
177-
*/
178170
@Override
179171
protected void hookOnComplete() {
180172
if (this.eventBuilder.length() > 0) {
@@ -185,10 +177,6 @@ protected void hookOnComplete() {
185177
this.sink.complete();
186178
}
187179

188-
/**
189-
* Called when an error occurs in the upstream line source.
190-
* @param throwable the error that occurred
191-
*/
192180
@Override
193181
protected void hookOnError(Throwable throwable) {
194182
this.sink.error(throwable);
@@ -225,10 +213,6 @@ public AggregateSubscriber(ResponseInfo responseInfo, FluxSink<ResponseEvent> si
225213
this.responseInfo = responseInfo;
226214
}
227215

228-
/**
229-
* Initializes the subscription and sets up disposal callback.
230-
* @param subscription the {@link Subscription} to the upstream line source
231-
*/
232216
@Override
233217
protected void hookOnSubscribe(Subscription subscription) {
234218
sink.onRequest(subscription::request);
@@ -237,18 +221,11 @@ protected void hookOnSubscribe(Subscription subscription) {
237221
sink.onDispose(subscription::cancel);
238222
}
239223

240-
/**
241-
* Aggregate each line from the Http response.
242-
* @param line next line to process from the Http response
243-
*/
244224
@Override
245225
protected void hookOnNext(String line) {
246226
this.eventBuilder.append(line).append("\n");
247227
}
248228

249-
/**
250-
* Called when the upstream line source completes normally.
251-
*/
252229
@Override
253230
protected void hookOnComplete() {
254231
if (this.eventBuilder.length() > 0) {
@@ -258,10 +235,6 @@ protected void hookOnComplete() {
258235
this.sink.complete();
259236
}
260237

261-
/**
262-
* Called when an error occurs in the upstream line source.
263-
* @param throwable the error that occurred
264-
*/
265238
@Override
266239
protected void hookOnError(Throwable throwable) {
267240
this.sink.error(throwable);
@@ -283,10 +256,6 @@ public BodilessResponseLineSubscriber(ResponseInfo responseInfo, FluxSink<Respon
283256
this.responseInfo = responseInfo;
284257
}
285258

286-
/**
287-
* Initializes the subscription and sets up disposal callback.
288-
* @param subscription the {@link Subscription} to the upstream line source
289-
*/
290259
@Override
291260
protected void hookOnSubscribe(Subscription subscription) {
292261

@@ -300,9 +269,6 @@ protected void hookOnSubscribe(Subscription subscription) {
300269
});
301270
}
302271

303-
/**
304-
* Called when the upstream line source completes normally.
305-
*/
306272
@Override
307273
protected void hookOnComplete() {
308274
// emit dummy event to be able to inspect the response info
@@ -313,10 +279,6 @@ protected void hookOnComplete() {
313279
this.sink.complete();
314280
}
315281

316-
/**
317-
* Called when an error occurs in the upstream line source.
318-
* @param throwable the error that occurred
319-
*/
320282
@Override
321283
protected void hookOnError(Throwable throwable) {
322284
this.sink.error(throwable);

mcp/src/test/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientResiliencyTests.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
/*
2+
* Copyright 2024-2024 the original author or authors.
3+
*/
14
package io.modelcontextprotocol.client;
25

36
import eu.rekawek.toxiproxy.Proxy;
@@ -35,6 +38,7 @@
3538
*
3639
* @author Dariusz Jędrzejczyk
3740
*/
41+
// KEEP IN SYNC with the class in mcp-test module
3842
public abstract class AbstractMcpAsyncClientResiliencyTests {
3943

4044
private static final Logger logger = LoggerFactory.getLogger(AbstractMcpAsyncClientResiliencyTests.class);

mcp/src/test/java/io/modelcontextprotocol/client/HttpClientStreamableHttpAsyncClientResiliencyTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
package io.modelcontextprotocol.client;
66

7-
import java.util.concurrent.CompletionException;
7+
import java.io.IOException;
88

99
import org.junit.jupiter.api.Test;
1010
import org.junit.jupiter.api.Timeout;
@@ -28,7 +28,7 @@ void testPingWithExactExceptionType() {
2828

2929
disconnect();
3030

31-
StepVerifier.create(mcpAsyncClient.ping()).expectError(CompletionException.class).verify();
31+
StepVerifier.create(mcpAsyncClient.ping()).expectError(IOException.class).verify();
3232

3333
reconnect();
3434

0 commit comments

Comments
 (0)