Skip to content

Commit 6170283

Browse files
committed
Remove union-like type, remove unnecessary code, centralize deserialization
Signed-off-by: Dariusz Jędrzejczyk <dariusz.jedrzejczyk@broadcom.com>
1 parent 9afbb8e commit 6170283

File tree

3 files changed

+104
-122
lines changed

3 files changed

+104
-122
lines changed

mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java

Lines changed: 27 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,9 @@ public Mono<Void> connect(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> h
338338
.exceptionallyCompose(e -> {
339339
sseSink.error(e);
340340
return CompletableFuture.failedFuture(e);
341-
})).flatMap(responseEvent -> {
341+
}))
342+
.map(responseEvent -> (ResponseSubscribers.SseResponseEvent) responseEvent)
343+
.flatMap(responseEvent -> {
342344
if (isClosing) {
343345
return Mono.empty();
344346
}
@@ -378,22 +380,23 @@ else if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
378380
return Flux.<McpSchema.JSONRPCMessage>error(
379381
new RuntimeException("Failed to send message: " + responseEvent));
380382

381-
}).flatMap(jsonRpcMessage -> handler.apply(Mono.just(jsonRpcMessage))).onErrorResume(t -> {
383+
})
384+
.flatMap(jsonRpcMessage -> handler.apply(Mono.just(jsonRpcMessage)))
385+
.onErrorComplete(t -> {
382386
if (!isClosing) {
383-
logger.error("SSE connection error", t);
387+
logger.warn("SSE stream observed an error", t);
384388
sink.error(t);
385389
}
386-
return Mono.empty();
387-
388-
}).onErrorComplete(t -> {
389-
logger.warn("SSE stream observed an error", t);
390390
return true;
391-
}).doFinally(s -> {
391+
})
392+
.doFinally(s -> {
392393
Disposable ref = this.sseSubscription.getAndSet(null);
393394
if (ref != null && !ref.isDisposed()) {
394395
ref.dispose();
395396
}
396-
}).contextWrite(sink.contextView()).subscribe();
397+
})
398+
.contextWrite(sink.contextView())
399+
.subscribe();
397400

398401
this.sseSubscription.set(connection);
399402
});
@@ -417,28 +420,19 @@ public Mono<Void> sendMessage(JSONRPCMessage message) {
417420
return Mono.empty();
418421
}
419422

420-
try {
421-
return this.serializeMessage(message)
422-
.flatMap(body -> sendHttpPost(messageEndpointUri, body))
423-
.doOnNext(response -> {
424-
if (response.statusCode() != 200 && response.statusCode() != 201 && response.statusCode() != 202
425-
&& response.statusCode() != 206) {
426-
logger.error("Error sending message: {}", response.statusCode());
427-
}
428-
})
429-
.doOnError(error -> {
430-
if (!isClosing) {
431-
logger.error("Error sending message: {}", error.getMessage());
432-
}
433-
})
434-
.then();
435-
}
436-
catch (Exception e) {
437-
if (!isClosing) {
438-
return Mono.error(new RuntimeException("Failed to serialize message", e));
439-
}
440-
return Mono.empty();
441-
}
423+
return this.serializeMessage(message)
424+
.flatMap(body -> sendHttpPost(messageEndpointUri, body))
425+
.doOnNext(response -> {
426+
if (response.statusCode() != 200 && response.statusCode() != 201 && response.statusCode() != 202
427+
&& response.statusCode() != 206) {
428+
logger.error("Error sending message: {}", response.statusCode());
429+
}
430+
})
431+
.doOnError(error -> {
432+
if (!isClosing) {
433+
logger.error("Error sending message: {}", error.getMessage());
434+
}
435+
});
442436
}).then();
443437

444438
}
@@ -449,6 +443,7 @@ private Mono<String> serializeMessage(final JSONRPCMessage message) {
449443
return Mono.just(objectMapper.writeValueAsString(message));
450444
}
451445
catch (IOException e) {
446+
// TODO: why McpError and not RuntimeException?
452447
return Mono.error(new McpError("Failed to serialize message"));
453448
}
454449
});
@@ -461,6 +456,7 @@ private Mono<HttpResponse<Void>> sendHttpPost(final String endpoint, final Strin
461456
.POST(HttpRequest.BodyPublishers.ofString(body))
462457
.build();
463458

459+
// TODO: why discard the body?
464460
return Mono.fromFuture(httpClient.sendAsync(request, HttpResponse.BodyHandlers.discarding()));
465461
}
466462

mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java

Lines changed: 41 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import java.time.Duration;
1414
import java.util.List;
1515
import java.util.Optional;
16-
import java.util.concurrent.CompletableFuture;
1716
import java.util.concurrent.atomic.AtomicReference;
1817
import java.util.function.Consumer;
1918
import java.util.function.Function;
@@ -60,7 +59,7 @@
6059
* "https://modelcontextprotocol.io/specification/2024-11-05/basic/transports#http-with-sse">"HTTP
6160
* with SSE" transport</a>. In order to communicate over the phased-out
6261
* <code>2024-11-05</code> protocol, use {@link HttpClientSseClientTransport} or
63-
* {@link WebFluxSseClientTransport}.
62+
* {@code WebFluxSseClientTransport}.
6463
* </p>
6564
*
6665
* @author Christian Tzolov
@@ -234,7 +233,9 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
234233
else {
235234
logger.debug("SSE connection established successfully");
236235
}
237-
})).flatMap(responseEvent -> {
236+
}))
237+
.map(responseEvent -> (ResponseSubscribers.SseResponseEvent) responseEvent)
238+
.flatMap(responseEvent -> {
238239
int statusCode = responseEvent.responseInfo().statusCode();
239240

240241
if (statusCode >= 200 && statusCode < 300) {
@@ -319,12 +320,11 @@ private BodyHandler<Void> toSendMessageBodySubscriber(FluxSink<ResponseEvent> si
319320
else if (contentType.contains(APPLICATION_JSON)) {
320321
// For JSON responses and others, use string subscriber
321322
logger.debug("Received response, using string subscriber");
322-
return ResponseSubscribers.jsonoBodySubscriber(responseInfo, sink);
323+
return ResponseSubscribers.aggregateBodySubscriber(responseInfo, sink);
323324
}
324325

325326
logger.debug("Received Bodyless response, using discarding subscriber");
326-
// return HttpResponse.BodySubscribers.discarding();
327-
return ResponseSubscribers.bodylessBodySubscriber(responseInfo, sink);
327+
return ResponseSubscribers.bodilessBodySubscriber(responseInfo, sink);
328328
};
329329

330330
return responseBodyHandler;
@@ -404,34 +404,42 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sendMessage) {
404404
return Flux.empty();
405405
}
406406
else if (contentType.contains(TEXT_EVENT_STREAM)) {
407-
try {
408-
// We don't support batching ATM and probably won't since the
409-
// next version considers removing it.
410-
McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.objectMapper,
411-
responseEvent.sseEvent().data());
412-
413-
Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>> idWithMessages = Tuples
414-
.of(Optional.ofNullable(responseEvent.sseEvent().id()), List.of(message));
415-
416-
McpTransportStream<Disposable> sessionStream = new DefaultMcpTransportStream<>(
417-
this.resumableStreams, this::reconnect);
418-
419-
logger.debug("Connected stream {}", sessionStream.streamId());
420-
421-
messageSink.success();
422-
423-
return Flux.from(sessionStream.consumeSseStream(Flux.just(idWithMessages)));
424-
425-
}
426-
catch (IOException ioException) {
427-
return Flux.<McpSchema.JSONRPCMessage>error(
428-
new McpError("Error parsing JSON-RPC message: " + responseEvent.sseEvent().data()));
429-
}
407+
return Flux.just(((ResponseSubscribers.SseResponseEvent) responseEvent).sseEvent())
408+
.flatMap(sseEvent -> {
409+
try {
410+
// We don't support batching ATM and probably won't
411+
// since the
412+
// next version considers removing it.
413+
McpSchema.JSONRPCMessage message = McpSchema
414+
.deserializeJsonRpcMessage(this.objectMapper, sseEvent.data());
415+
416+
Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>> idWithMessages = Tuples
417+
.of(Optional.ofNullable(sseEvent.id()), List.of(message));
418+
419+
McpTransportStream<Disposable> sessionStream = new DefaultMcpTransportStream<>(
420+
this.resumableStreams, this::reconnect);
421+
422+
logger.debug("Connected stream {}", sessionStream.streamId());
423+
424+
messageSink.success();
425+
426+
return Flux.from(sessionStream.consumeSseStream(Flux.just(idWithMessages)));
427+
}
428+
catch (IOException ioException) {
429+
return Flux.<McpSchema.JSONRPCMessage>error(
430+
new McpError("Error parsing JSON-RPC message: " + sseEvent.data()));
431+
}
432+
});
430433
}
431434
else if (contentType.contains(APPLICATION_JSON)) {
432-
McpSchema.JSONRPCMessage jsonRpcResponse = responseEvent.jsonRpcMessage();
433435
messageSink.success();
434-
return Flux.just(jsonRpcResponse); // ???
436+
String data = ((ResponseSubscribers.AggregateResponseEvent) responseEvent).data();
437+
try {
438+
return Mono.just(McpSchema.deserializeJsonRpcMessage(objectMapper, data));
439+
}
440+
catch (IOException e) {
441+
return Mono.error(e);
442+
}
435443
}
436444
logger.warn("Unknown media type {} returned for POST in session {}", contentType,
437445
sessionRepresentation);
@@ -489,9 +497,9 @@ public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
489497
*/
490498
public static class Builder {
491499

492-
private ObjectMapper objectMapper;
500+
private final String baseUri;
493501

494-
private String baseUri;
502+
private ObjectMapper objectMapper;
495503

496504
private HttpClient.Builder clientBuilder = HttpClient.newBuilder()
497505
.version(HttpClient.Version.HTTP_1_1)

0 commit comments

Comments
 (0)