Skip to content

Commit 9afbb8e

Browse files
committed
Improve error handling and logging
Signed-off-by: Dariusz Jędrzejczyk <dariusz.jedrzejczyk@broadcom.com>
1 parent a644fcf commit 9afbb8e

File tree

6 files changed

+36
-53
lines changed

6 files changed

+36
-53
lines changed

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -129,11 +129,10 @@ private DefaultMcpTransportSession createTransportSession() {
129129
Function<String, Publisher<Void>> onClose = sessionId -> sessionId == null ? Mono.empty()
130130
: webClient.delete().uri(this.endpoint).headers(httpHeaders -> {
131131
httpHeaders.add("mcp-session-id", sessionId);
132-
})
133-
.retrieve()
134-
.toBodilessEntity()
135-
.doOnError(e -> logger.warn("Got error when closing transport", e))
136-
.then();
132+
}).retrieve().toBodilessEntity().onErrorComplete(e -> {
133+
logger.warn("Got error when closing transport", e);
134+
return true;
135+
}).then();
137136
return new DefaultMcpTransportSession(onClose);
138137
}
139138

@@ -305,12 +304,12 @@ else if (mediaType.isCompatibleWith(MediaType.APPLICATION_JSON)) {
305304
}
306305
})
307306
.flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage)))
308-
.onErrorResume(t -> {
307+
.onErrorComplete(t -> {
309308
// handle the error first
310309
this.handleException(t);
311310
// inform the caller of sendMessage
312311
sink.error(t);
313-
return Flux.empty();
312+
return true;
314313
})
315314
.doFinally(s -> {
316315
Disposable ref = disposableRef.getAndSet(null);

mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,6 @@ public Mono<Void> connect(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> h
336336
Disposable connection = Flux.<ResponseEvent>create(sseSink -> this.httpClient
337337
.sendAsync(request, responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo, sseSink))
338338
.exceptionallyCompose(e -> {
339-
logger.warn("Error sending message", e);
340339
sseSink.error(e);
341340
return CompletableFuture.failedFuture(e);
342341
})).flatMap(responseEvent -> {
@@ -386,6 +385,9 @@ else if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
386385
}
387386
return Mono.empty();
388387

388+
}).onErrorComplete(t -> {
389+
logger.warn("SSE stream observed an error", t);
390+
return true;
389391
}).doFinally(s -> {
390392
Disposable ref = this.sseSubscription.getAndSet(null);
391393
if (ref != null && !ref.isDisposed()) {
@@ -459,11 +461,7 @@ private Mono<HttpResponse<Void>> sendHttpPost(final String endpoint, final Strin
459461
.POST(HttpRequest.BodyPublishers.ofString(body))
460462
.build();
461463

462-
return Mono.fromFuture(
463-
httpClient.sendAsync(request, HttpResponse.BodyHandlers.discarding()).exceptionallyCompose(e -> {
464-
logger.warn("Error sending message", e);
465-
return CompletableFuture.failedFuture(e);
466-
}));
464+
return Mono.fromFuture(httpClient.sendAsync(request, HttpResponse.BodyHandlers.discarding()));
467465
}
468466

469467
/**

mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java

Lines changed: 15 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,10 @@ public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchem
138138
this.handler.set(handler);
139139
if (openConnectionOnStartup) {
140140
logger.debug("Eagerly opening connection on startup");
141-
return this.reconnect(null).then();
141+
return this.reconnect(null).onErrorComplete(t -> {
142+
logger.warn("Eager connect failed ", t);
143+
return true;
144+
}).then();
142145
}
143146
return Mono.empty();
144147
});
@@ -151,26 +154,14 @@ private DefaultMcpTransportSession createTransportSession() {
151154
}
152155

153156
private Publisher<Void> createDelete(String sessionId) {
154-
155-
return Mono.defer(() -> { // Do we need to defer this?
156-
157-
HttpRequest request = this.requestBuilder.copy()
158-
.uri(Utils.resolveUri(this.baseUri, this.endpoint))
159-
.header("Cache-Control", "no-cache")
160-
.header("mcp-session-id", sessionId)
161-
.DELETE()
162-
.build();
163-
164-
return Mono.fromFuture(() -> this.httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())
165-
.whenComplete((response, throwable) -> {
166-
if (throwable != null) {
167-
logger.warn("Error sending message", throwable);
168-
}
169-
else {
170-
logger.debug("SSE connection established successfully");
171-
}
172-
})).doOnError(e -> logger.warn("Got error when closing transport", e)).then();
173-
});
157+
HttpRequest request = this.requestBuilder.copy()
158+
.uri(Utils.resolveUri(this.baseUri, this.endpoint))
159+
.header("Cache-Control", "no-cache")
160+
.header("mcp-session-id", sessionId)
161+
.DELETE()
162+
.build();
163+
164+
return Mono.fromFuture(() -> this.httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())).then();
174165
}
175166

176167
@Override
@@ -238,7 +229,6 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
238229
.sendAsync(request, responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo, sseSink))
239230
.whenComplete((response, throwable) -> {
240231
if (throwable != null) {
241-
logger.warn("Error sending message", throwable);
242232
sseSink.error(throwable);
243233
}
244234
else {
@@ -378,13 +368,12 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sendMessage) {
378368
Mono.fromFuture(this.httpClient.sendAsync(request, this.toSendMessageBodySubscriber(responseEventSink))
379369
.whenComplete((response, throwable) -> {
380370
if (throwable != null) {
381-
logger.warn("Error sending message", throwable);
382371
responseEventSink.error(throwable);
383372
}
384373
else {
385374
logger.debug("SSE connection established successfully");
386375
}
387-
})).subscribe();
376+
})).onErrorComplete().subscribe();
388377

389378
}).flatMap(responseEvent -> {
390379
if (transportSession.markInitialized(
@@ -467,12 +456,12 @@ else if (statusCode == BAD_REQUEST) {
467456

468457
return Flux.<McpSchema.JSONRPCMessage>error(
469458
new RuntimeException("Failed to send message: " + responseEvent));
470-
}).flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage))).onErrorResume(t -> {
459+
}).flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage))).onErrorComplete(t -> {
471460
// handle the error first
472461
this.handleException(t);
473462
// inform the caller of sendMessage
474463
messageSink.error(t);
475-
return Flux.empty();
464+
return true;
476465
}).doFinally(s -> {
477466
logger.debug("SendMessage finally: {}", s);
478467
Disposable ref = disposableRef.getAndSet(null);

mcp/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -141,14 +141,18 @@ else if (message instanceof McpSchema.JSONRPCRequest request) {
141141
var errorResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(), null,
142142
new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.INTERNAL_ERROR,
143143
error.getMessage(), null));
144-
return this.transport.sendMessage(errorResponse).then(Mono.empty());
145-
}).flatMap(this.transport::sendMessage).subscribe();
144+
return Mono.just(errorResponse);
145+
}).flatMap(this.transport::sendMessage).onErrorComplete(t -> {
146+
logger.warn("Issue sending response to the client, ", t);
147+
return true;
148+
}).subscribe();
146149
}
147150
else if (message instanceof McpSchema.JSONRPCNotification notification) {
148151
logger.debug("Received notification: {}", notification);
149-
handleIncomingNotification(notification)
150-
.doOnError(error -> logger.error("Error handling notification: {}", error.getMessage()))
151-
.subscribe();
152+
handleIncomingNotification(notification).onErrorComplete(t -> {
153+
logger.error("Error handling notification: {}", t.getMessage());
154+
return true;
155+
}).subscribe();
152156
}
153157
else {
154158
logger.warn("Received unknown message type: {}", message);
@@ -171,11 +175,7 @@ private Mono<McpSchema.JSONRPCResponse> handleIncomingRequest(McpSchema.JSONRPCR
171175
}
172176

173177
return handler.handle(request.params())
174-
.map(result -> new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(), result, null))
175-
.onErrorResume(error -> Mono.just(new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(),
176-
null, new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.INTERNAL_ERROR,
177-
error.getMessage(), null)))); // TODO: add error message
178-
// through the data field
178+
.map(result -> new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(), result, null));
179179
});
180180
}
181181

mcp/src/test/java/io/modelcontextprotocol/client/HttpClientStreamableHttpAsyncClientResiliencyTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ protected McpClientTransport createMcpTransport() {
2222
}
2323

2424
@Test
25-
void testPingWithEaxctExceptionType() {
25+
void testPingWithExactExceptionType() {
2626
withClient(createMcpTransport(), mcpAsyncClient -> {
2727
StepVerifier.create(mcpAsyncClient.initialize()).expectNextCount(1).verifyComplete();
2828

mcp/src/test/resources/logback.xml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@
1616

1717
<!-- Spec package -->
1818
<logger name="org.springframework.ai.mcp.spec" level="INFO"/>
19-
20-
<logger name="io.modelcontextprotocol.spec.McpClientSession" level="DEBUG"/>
21-
<logger name="io.modelcontextprotocol.client.transport" level="DEBUG"/>
2219

2320
<!-- Root logger -->
2421
<root level="INFO">

0 commit comments

Comments
 (0)