Skip to content

Commit a644fcf

Browse files
committed
refactor: improve HTTP client error handling and cleanup debug code
- Replace exceptionallyCompose with whenComplete for better async error handling - Make ResponseSubscribers class package-private Signed-off-by: Christian Tzolov <christian.tzolov@broadcom.com>
1 parent b4273a0 commit a644fcf

File tree

2 files changed

+24
-14
lines changed

2 files changed

+24
-14
lines changed

mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -162,10 +162,13 @@ private Publisher<Void> createDelete(String sessionId) {
162162
.build();
163163

164164
return Mono.fromFuture(() -> this.httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())
165-
.exceptionallyCompose(e -> {
166-
logger.warn("Error sending message", e);
167-
168-
return CompletableFuture.failedFuture(e);
165+
.whenComplete((response, throwable) -> {
166+
if (throwable != null) {
167+
logger.warn("Error sending message", throwable);
168+
}
169+
else {
170+
logger.debug("SSE connection established successfully");
171+
}
169172
})).doOnError(e -> logger.warn("Got error when closing transport", e)).then();
170173
});
171174
}
@@ -233,10 +236,14 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
233236

234237
Disposable connection = Flux.<ResponseEvent>create(sseSink -> this.httpClient
235238
.sendAsync(request, responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo, sseSink))
236-
.exceptionallyCompose(e -> {
237-
logger.warn("Error sending message", e);
238-
sseSink.error(e);
239-
return CompletableFuture.failedFuture(e);
239+
.whenComplete((response, throwable) -> {
240+
if (throwable != null) {
241+
logger.warn("Error sending message", throwable);
242+
sseSink.error(throwable);
243+
}
244+
else {
245+
logger.debug("SSE connection established successfully");
246+
}
240247
})).flatMap(responseEvent -> {
241248
int statusCode = responseEvent.responseInfo().statusCode();
242249

@@ -369,10 +376,14 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sendMessage) {
369376

370377
// Create the async request with proper body subscriber selection
371378
Mono.fromFuture(this.httpClient.sendAsync(request, this.toSendMessageBodySubscriber(responseEventSink))
372-
.exceptionallyCompose(e -> {
373-
logger.warn("Error sending message", e);
374-
responseEventSink.error(e);
375-
return CompletableFuture.failedFuture(e);
379+
.whenComplete((response, throwable) -> {
380+
if (throwable != null) {
381+
logger.warn("Error sending message", throwable);
382+
responseEventSink.error(throwable);
383+
}
384+
else {
385+
logger.debug("SSE connection established successfully");
386+
}
376387
})).subscribe();
377388

378389
}).flatMap(responseEvent -> {

mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import reactor.core.publisher.BaseSubscriber;
2121
import reactor.core.publisher.FluxSink;
2222

23-
public class ResponseSubscribers {
23+
class ResponseSubscribers {
2424

2525
/**
2626
* Represents a Server-Sent Event with its standard fields.
@@ -328,7 +328,6 @@ protected void hookOnSubscribe(Subscription subscription) {
328328

329329
@Override
330330
protected void hookOnNext(String line) {
331-
System.out.println(">>>>>>>>>>>>>> Received line: " + line);
332331
}
333332

334333
/**

0 commit comments

Comments
 (0)