Skip to content

Commit e555da6

Browse files
committed
wip: HttpRequest.Builder customizer
1 parent 95df67e commit e555da6

File tree

4 files changed

+411
-268
lines changed

4 files changed

+411
-268
lines changed
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package io.modelcontextprotocol.client.transport;
2+
3+
import java.net.URI;
4+
import java.net.http.HttpRequest;
5+
import org.reactivestreams.Publisher;
6+
import reactor.core.publisher.Mono;
7+
import reactor.util.annotation.Nullable;
8+
9+
/**
10+
* Customize {@link HttpRequest.Builder} before sending out SSE or Streamable HTTP
11+
* transport.
12+
* <p>
13+
* When used in a non-blocking context, implementations MUST be non-blocking.
14+
*/
15+
public interface AsyncHttpRequestCustomizer {
16+
17+
Publisher<HttpRequest.Builder> customize(HttpRequest.Builder builder, String method, URI endpoint,
18+
@Nullable String body);
19+
20+
AsyncHttpRequestCustomizer NOOP = new Noop();
21+
22+
/**
23+
* Wrap a sync implementation in an async wrapper.
24+
* <p>
25+
* Do NOT use in a non-blocking context.
26+
*/
27+
static AsyncHttpRequestCustomizer fromSync(SyncHttpRequestCustomizer customizer) {
28+
return (builder, method, uri, body) -> Mono.defer(() -> {
29+
customizer.customize(builder, method, uri, body);
30+
return Mono.just(builder);
31+
});
32+
}
33+
34+
class Noop implements AsyncHttpRequestCustomizer {
35+
36+
@Override
37+
public Publisher<HttpRequest.Builder> customize(HttpRequest.Builder builder, String method, URI endpoint,
38+
String body) {
39+
return Mono.just(builder);
40+
}
41+
42+
}
43+
44+
}

mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java

Lines changed: 110 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,9 @@ public class HttpClientSseClientTransport implements McpClientTransport {
102102
*/
103103
protected final Sinks.One<String> messageEndpointSink = Sinks.one();
104104

105+
// TODO
106+
private final AsyncHttpRequestCustomizer httpRequestCustomizer;
107+
105108
/**
106109
* Creates a new transport instance with default HTTP client and object mapper.
107110
* @param baseUri the base URI of the MCP server
@@ -172,18 +175,38 @@ public HttpClientSseClientTransport(HttpClient.Builder clientBuilder, HttpReques
172175
* @param objectMapper the object mapper for JSON serialization/deserialization
173176
* @throws IllegalArgumentException if objectMapper, clientBuilder, or headers is null
174177
*/
178+
@Deprecated(forRemoval = true)
175179
HttpClientSseClientTransport(HttpClient httpClient, HttpRequest.Builder requestBuilder, String baseUri,
176180
String sseEndpoint, ObjectMapper objectMapper) {
181+
this(httpClient, requestBuilder, baseUri, sseEndpoint, objectMapper, AsyncHttpRequestCustomizer.NOOP);
182+
}
183+
184+
/**
185+
* Creates a new transport instance with custom HTTP client builder, object mapper,
186+
* and headers.
187+
* @param httpClient the HTTP client to use
188+
* @param requestBuilder the HTTP request builder to use
189+
* @param baseUri the base URI of the MCP server
190+
* @param sseEndpoint the SSE endpoint path
191+
* @param objectMapper the object mapper for JSON serialization/deserialization
192+
* @param httpRequestCustomizer customizer for the requestBuilder before sending
193+
* requests
194+
* @throws IllegalArgumentException if objectMapper, clientBuilder, or headers is null
195+
*/
196+
HttpClientSseClientTransport(HttpClient httpClient, HttpRequest.Builder requestBuilder, String baseUri,
197+
String sseEndpoint, ObjectMapper objectMapper, AsyncHttpRequestCustomizer httpRequestCustomizer) {
177198
Assert.notNull(objectMapper, "ObjectMapper must not be null");
178199
Assert.hasText(baseUri, "baseUri must not be empty");
179200
Assert.hasText(sseEndpoint, "sseEndpoint must not be empty");
180201
Assert.notNull(httpClient, "httpClient must not be null");
181202
Assert.notNull(requestBuilder, "requestBuilder must not be null");
203+
Assert.notNull(httpRequestCustomizer, "httpRequestCustomizer must not be null");
182204
this.baseUri = URI.create(baseUri);
183205
this.sseEndpoint = sseEndpoint;
184206
this.objectMapper = objectMapper;
185207
this.httpClient = httpClient;
186208
this.requestBuilder = requestBuilder;
209+
this.httpRequestCustomizer = httpRequestCustomizer;
187210
}
188211

189212
/**
@@ -213,6 +236,8 @@ public static class Builder {
213236
private HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
214237
.header("Content-Type", "application/json");
215238

239+
private AsyncHttpRequestCustomizer httpRequestCustomizer = AsyncHttpRequestCustomizer.NOOP;
240+
216241
/**
217242
* Creates a new builder instance.
218243
*/
@@ -310,96 +335,111 @@ public Builder objectMapper(ObjectMapper objectMapper) {
310335
return this;
311336
}
312337

338+
/**
339+
* In reactive, DONT USE THIS. Use AsyncHttpRequestCustomizer.
340+
*/
341+
public Builder httpRequestCustomizer(SyncHttpRequestCustomizer syncHttpRequestCustomizer) {
342+
this.httpRequestCustomizer = AsyncHttpRequestCustomizer.fromSync(syncHttpRequestCustomizer);
343+
return this;
344+
}
345+
346+
public Builder httpRequestCustomizer(AsyncHttpRequestCustomizer asyncHttpRequestCustomizer) {
347+
this.httpRequestCustomizer = asyncHttpRequestCustomizer;
348+
return this;
349+
}
350+
313351
/**
314352
* Builds a new {@link HttpClientSseClientTransport} instance.
315353
* @return a new transport instance
316354
*/
317355
public HttpClientSseClientTransport build() {
318356
return new HttpClientSseClientTransport(clientBuilder.build(), requestBuilder, baseUri, sseEndpoint,
319-
objectMapper);
357+
objectMapper, httpRequestCustomizer);
320358
}
321359

322360
}
323361

324362
@Override
325363
public Mono<Void> connect(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> handler) {
364+
var uri = Utils.resolveUri(this.baseUri, this.sseEndpoint);
326365

327-
return Mono.create(sink -> {
328-
329-
HttpRequest request = requestBuilder.copy()
330-
.uri(Utils.resolveUri(this.baseUri, this.sseEndpoint))
366+
return Mono
367+
.just(requestBuilder.copy()
368+
.uri(uri)
331369
.header("Accept", "text/event-stream")
332370
.header("Cache-Control", "no-cache")
333-
.GET()
334-
.build();
335-
336-
Disposable connection = Flux.<ResponseEvent>create(sseSink -> this.httpClient
337-
.sendAsync(request, responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo, sseSink))
338-
.exceptionallyCompose(e -> {
339-
sseSink.error(e);
340-
return CompletableFuture.failedFuture(e);
341-
}))
342-
.map(responseEvent -> (ResponseSubscribers.SseResponseEvent) responseEvent)
343-
.flatMap(responseEvent -> {
344-
if (isClosing) {
345-
return Mono.empty();
346-
}
347-
348-
int statusCode = responseEvent.responseInfo().statusCode();
371+
.GET())
372+
.flatMap(builder -> Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null)))
373+
.map(HttpRequest.Builder::build)
374+
.flatMap(request -> Mono.create(sink -> {
375+
Disposable connection = Flux.<ResponseEvent>create(sseSink -> this.httpClient
376+
.sendAsync(request, responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo, sseSink))
377+
.exceptionallyCompose(e -> {
378+
sseSink.error(e);
379+
return CompletableFuture.failedFuture(e);
380+
}))
381+
.map(responseEvent -> (ResponseSubscribers.SseResponseEvent) responseEvent)
382+
.flatMap(responseEvent -> {
383+
if (isClosing) {
384+
return Mono.empty();
385+
}
349386

350-
if (statusCode >= 200 && statusCode < 300) {
351-
try {
352-
if (ENDPOINT_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
353-
String messageEndpointUri = responseEvent.sseEvent().data();
354-
if (this.messageEndpointSink.tryEmitValue(messageEndpointUri).isSuccess()) {
387+
int statusCode = responseEvent.responseInfo().statusCode();
388+
389+
if (statusCode >= 200 && statusCode < 300) {
390+
try {
391+
if (ENDPOINT_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
392+
String messageEndpointUri = responseEvent.sseEvent().data();
393+
if (this.messageEndpointSink.tryEmitValue(messageEndpointUri).isSuccess()) {
394+
sink.success();
395+
return Flux.empty(); // No further processing
396+
// needed
397+
}
398+
else {
399+
sink.error(new McpError("Failed to handle SSE endpoint event"));
400+
}
401+
}
402+
else if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
403+
JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper,
404+
responseEvent.sseEvent().data());
355405
sink.success();
356-
return Flux.empty(); // No further processing needed
406+
return Flux.just(message);
357407
}
358408
else {
359-
sink.error(new McpError("Failed to handle SSE endpoint event"));
409+
logger.error("Received unrecognized SSE event type: {}",
410+
responseEvent.sseEvent().event());
411+
sink.error(new McpError("Received unrecognized SSE event type: "
412+
+ responseEvent.sseEvent().event()));
360413
}
361414
}
362-
else if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
363-
JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper,
364-
responseEvent.sseEvent().data());
365-
sink.success();
366-
return Flux.just(message);
367-
}
368-
else {
369-
logger.error("Received unrecognized SSE event type: {}",
370-
responseEvent.sseEvent().event());
371-
sink.error(new McpError(
372-
"Received unrecognized SSE event type: " + responseEvent.sseEvent().event()));
415+
catch (IOException e) {
416+
logger.error("Error processing SSE event", e);
417+
sink.error(new McpError("Error processing SSE event"));
373418
}
374419
}
375-
catch (IOException e) {
376-
logger.error("Error processing SSE event", e);
377-
sink.error(new McpError("Error processing SSE event"));
420+
return Flux.<McpSchema.JSONRPCMessage>error(
421+
new RuntimeException("Failed to send message: " + responseEvent));
422+
423+
})
424+
.flatMap(jsonRpcMessage -> handler.apply(Mono.just(jsonRpcMessage)))
425+
.onErrorComplete(t -> {
426+
if (!isClosing) {
427+
logger.warn("SSE stream observed an error", t);
428+
sink.error(t);
378429
}
379-
}
380-
return Flux.<McpSchema.JSONRPCMessage>error(
381-
new RuntimeException("Failed to send message: " + responseEvent));
382-
383-
})
384-
.flatMap(jsonRpcMessage -> handler.apply(Mono.just(jsonRpcMessage)))
385-
.onErrorComplete(t -> {
386-
if (!isClosing) {
387-
logger.warn("SSE stream observed an error", t);
388-
sink.error(t);
389-
}
390-
return true;
391-
})
392-
.doFinally(s -> {
393-
Disposable ref = this.sseSubscription.getAndSet(null);
394-
if (ref != null && !ref.isDisposed()) {
395-
ref.dispose();
396-
}
397-
})
398-
.contextWrite(sink.contextView())
399-
.subscribe();
430+
return true;
431+
})
432+
.doFinally(s -> {
433+
Disposable ref = this.sseSubscription.getAndSet(null);
434+
if (ref != null && !ref.isDisposed()) {
435+
ref.dispose();
436+
}
437+
})
438+
.contextWrite(sink.contextView())
439+
.subscribe();
400440

401-
this.sseSubscription.set(connection);
402-
});
441+
this.sseSubscription.set(connection);
442+
}));
403443
}
404444

405445
/**
@@ -455,13 +495,11 @@ private Mono<String> serializeMessage(final JSONRPCMessage message) {
455495

456496
private Mono<HttpResponse<Void>> sendHttpPost(final String endpoint, final String body) {
457497
final URI requestUri = Utils.resolveUri(baseUri, endpoint);
458-
final HttpRequest request = this.requestBuilder.copy()
459-
.uri(requestUri)
460-
.POST(HttpRequest.BodyPublishers.ofString(body))
461-
.build();
462-
463-
// TODO: why discard the body?
464-
return Mono.fromFuture(httpClient.sendAsync(request, HttpResponse.BodyHandlers.discarding()));
498+
return Mono.just(this.requestBuilder.copy().uri(requestUri).POST(HttpRequest.BodyPublishers.ofString(body)))
499+
.flatMap(builder -> Mono.from(this.httpRequestCustomizer.customize(builder, "POST", requestUri, body)))
500+
.map(HttpRequest.Builder::build)
501+
// TODO: why discard the body?
502+
.flatMap(request -> Mono.fromFuture(httpClient.sendAsync(request, HttpResponse.BodyHandlers.discarding())));
465503
}
466504

467505
/**

0 commit comments

Comments
 (0)