Skip to content

Commit f69caad

Browse files
committed
Introduce HttpRequest.Builder customizer for HttpClient-based transport
1 parent c3a0b18 commit f69caad

File tree

6 files changed

+648
-275
lines changed

6 files changed

+648
-275
lines changed
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package io.modelcontextprotocol.client.transport;
2+
3+
import java.net.URI;
4+
import java.net.http.HttpRequest;
5+
import org.reactivestreams.Publisher;
6+
import reactor.core.publisher.Mono;
7+
import reactor.core.scheduler.Schedulers;
8+
import reactor.util.annotation.Nullable;
9+
10+
/**
11+
* Customize {@link HttpRequest.Builder} before executing the request, in either SSE or
12+
* Streamable HTTP transport.
13+
* <p>
14+
* When used in a non-blocking context, implementations MUST be non-blocking.
15+
*
16+
* @author Daniel Garnier-Moiroux
17+
*/
18+
public interface AsyncHttpRequestCustomizer {
19+
20+
Publisher<HttpRequest.Builder> customize(HttpRequest.Builder builder, String method, URI endpoint,
21+
@Nullable String body);
22+
23+
AsyncHttpRequestCustomizer NOOP = new Noop();
24+
25+
/**
26+
* Wrap a sync implementation in an async wrapper.
27+
* <p>
28+
* Do NOT wrap a blocking implementation for use in a non-blocking context. For a
29+
* blocking implementation, consider using {@link Schedulers#boundedElastic()}.
30+
*/
31+
static AsyncHttpRequestCustomizer fromSync(SyncHttpRequestCustomizer customizer) {
32+
return (builder, method, uri, body) -> Mono.defer(() -> {
33+
customizer.customize(builder, method, uri, body);
34+
return Mono.just(builder);
35+
});
36+
}
37+
38+
class Noop implements AsyncHttpRequestCustomizer {
39+
40+
@Override
41+
public Publisher<HttpRequest.Builder> customize(HttpRequest.Builder builder, String method, URI endpoint,
42+
String body) {
43+
return Mono.just(builder);
44+
}
45+
46+
}
47+
48+
}

mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java

Lines changed: 124 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,11 @@ public class HttpClientSseClientTransport implements McpClientTransport {
102102
*/
103103
protected final Sinks.One<String> messageEndpointSink = Sinks.one();
104104

105+
/**
106+
* Customizer to modify requests before they are executed.
107+
*/
108+
private final AsyncHttpRequestCustomizer httpRequestCustomizer;
109+
105110
/**
106111
* Creates a new transport instance with default HTTP client and object mapper.
107112
* @param baseUri the base URI of the MCP server
@@ -172,18 +177,38 @@ public HttpClientSseClientTransport(HttpClient.Builder clientBuilder, HttpReques
172177
* @param objectMapper the object mapper for JSON serialization/deserialization
173178
* @throws IllegalArgumentException if objectMapper, clientBuilder, or headers is null
174179
*/
180+
@Deprecated(forRemoval = true)
175181
HttpClientSseClientTransport(HttpClient httpClient, HttpRequest.Builder requestBuilder, String baseUri,
176182
String sseEndpoint, ObjectMapper objectMapper) {
183+
this(httpClient, requestBuilder, baseUri, sseEndpoint, objectMapper, AsyncHttpRequestCustomizer.NOOP);
184+
}
185+
186+
/**
187+
* Creates a new transport instance with custom HTTP client builder, object mapper,
188+
* and headers.
189+
* @param httpClient the HTTP client to use
190+
* @param requestBuilder the HTTP request builder to use
191+
* @param baseUri the base URI of the MCP server
192+
* @param sseEndpoint the SSE endpoint path
193+
* @param objectMapper the object mapper for JSON serialization/deserialization
194+
* @param httpRequestCustomizer customizer for the requestBuilder before executing
195+
* requests
196+
* @throws IllegalArgumentException if objectMapper, clientBuilder, or headers is null
197+
*/
198+
HttpClientSseClientTransport(HttpClient httpClient, HttpRequest.Builder requestBuilder, String baseUri,
199+
String sseEndpoint, ObjectMapper objectMapper, AsyncHttpRequestCustomizer httpRequestCustomizer) {
177200
Assert.notNull(objectMapper, "ObjectMapper must not be null");
178201
Assert.hasText(baseUri, "baseUri must not be empty");
179202
Assert.hasText(sseEndpoint, "sseEndpoint must not be empty");
180203
Assert.notNull(httpClient, "httpClient must not be null");
181204
Assert.notNull(requestBuilder, "requestBuilder must not be null");
205+
Assert.notNull(httpRequestCustomizer, "httpRequestCustomizer must not be null");
182206
this.baseUri = URI.create(baseUri);
183207
this.sseEndpoint = sseEndpoint;
184208
this.objectMapper = objectMapper;
185209
this.httpClient = httpClient;
186210
this.requestBuilder = requestBuilder;
211+
this.httpRequestCustomizer = httpRequestCustomizer;
187212
}
188213

189214
/**
@@ -213,6 +238,8 @@ public static class Builder {
213238
private HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
214239
.header("Content-Type", "application/json");
215240

241+
private AsyncHttpRequestCustomizer httpRequestCustomizer = AsyncHttpRequestCustomizer.NOOP;
242+
216243
/**
217244
* Creates a new builder instance.
218245
*/
@@ -310,94 +337,124 @@ public Builder objectMapper(ObjectMapper objectMapper) {
310337
return this;
311338
}
312339

340+
/**
341+
* Sets the customizer for {@link HttpRequest.Builder}, to modify requests before
342+
* executing them.
343+
* <p>
344+
* Do NOT use a blocking {@link SyncHttpRequestCustomizer} in a non-blocking
345+
* context. Use {@link #asyncHttpRequestCustomizer(AsyncHttpRequestCustomizer)}
346+
* instead.
347+
* @param syncHttpRequestCustomizer the request customizer
348+
* @return this builder
349+
*/
350+
public Builder httpRequestCustomizer(SyncHttpRequestCustomizer syncHttpRequestCustomizer) {
351+
this.httpRequestCustomizer = AsyncHttpRequestCustomizer.fromSync(syncHttpRequestCustomizer);
352+
return this;
353+
}
354+
355+
/**
356+
* Sets the customizer for {@link HttpRequest.Builder}, to modify requests before
357+
* executing them.
358+
* <p>
359+
* Do NOT use a blocking implementation in a non-blocking context.
360+
* @param asyncHttpRequestCustomizer the request customizer
361+
* @return this builder
362+
*/
363+
public Builder asyncHttpRequestCustomizer(AsyncHttpRequestCustomizer asyncHttpRequestCustomizer) {
364+
this.httpRequestCustomizer = asyncHttpRequestCustomizer;
365+
return this;
366+
}
367+
313368
/**
314369
* Builds a new {@link HttpClientSseClientTransport} instance.
315370
* @return a new transport instance
316371
*/
317372
public HttpClientSseClientTransport build() {
318373
return new HttpClientSseClientTransport(clientBuilder.build(), requestBuilder, baseUri, sseEndpoint,
319-
objectMapper);
374+
objectMapper, httpRequestCustomizer);
320375
}
321376

322377
}
323378

324379
@Override
325380
public Mono<Void> connect(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> handler) {
381+
var uri = Utils.resolveUri(this.baseUri, this.sseEndpoint);
326382

327-
return Mono.create(sink -> {
328-
329-
HttpRequest request = requestBuilder.copy()
330-
.uri(Utils.resolveUri(this.baseUri, this.sseEndpoint))
383+
return Mono
384+
.just(requestBuilder.copy()
385+
.uri(uri)
331386
.header("Accept", "text/event-stream")
332387
.header("Cache-Control", "no-cache")
333-
.GET()
334-
.build();
335-
336-
Disposable connection = Flux.<ResponseEvent>create(sseSink -> this.httpClient
337-
.sendAsync(request, responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo, sseSink))
338-
.exceptionallyCompose(e -> {
339-
sseSink.error(e);
340-
return CompletableFuture.failedFuture(e);
341-
}))
342-
.map(responseEvent -> (ResponseSubscribers.SseResponseEvent) responseEvent)
343-
.flatMap(responseEvent -> {
344-
if (isClosing) {
345-
return Mono.empty();
346-
}
347-
348-
int statusCode = responseEvent.responseInfo().statusCode();
388+
.GET())
389+
.flatMap(builder -> Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null)))
390+
.map(HttpRequest.Builder::build)
391+
.flatMap(request -> Mono.create(sink -> {
392+
Disposable connection = Flux.<ResponseEvent>create(sseSink -> this.httpClient
393+
.sendAsync(request, responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo, sseSink))
394+
.exceptionallyCompose(e -> {
395+
sseSink.error(e);
396+
return CompletableFuture.failedFuture(e);
397+
}))
398+
.map(responseEvent -> (ResponseSubscribers.SseResponseEvent) responseEvent)
399+
.flatMap(responseEvent -> {
400+
if (isClosing) {
401+
return Mono.empty();
402+
}
349403

350-
if (statusCode >= 200 && statusCode < 300) {
351-
try {
352-
if (ENDPOINT_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
353-
String messageEndpointUri = responseEvent.sseEvent().data();
354-
if (this.messageEndpointSink.tryEmitValue(messageEndpointUri).isSuccess()) {
404+
int statusCode = responseEvent.responseInfo().statusCode();
405+
406+
if (statusCode >= 200 && statusCode < 300) {
407+
try {
408+
if (ENDPOINT_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
409+
String messageEndpointUri = responseEvent.sseEvent().data();
410+
if (this.messageEndpointSink.tryEmitValue(messageEndpointUri).isSuccess()) {
411+
sink.success();
412+
return Flux.empty(); // No further processing
413+
// needed
414+
}
415+
else {
416+
sink.error(new McpError("Failed to handle SSE endpoint event"));
417+
}
418+
}
419+
else if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
420+
JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper,
421+
responseEvent.sseEvent().data());
355422
sink.success();
356-
return Flux.empty(); // No further processing needed
423+
return Flux.just(message);
357424
}
358425
else {
359-
sink.error(new McpError("Failed to handle SSE endpoint event"));
426+
logger.debug("Received unrecognized SSE event type: {}", responseEvent.sseEvent());
427+
sink.success();
360428
}
361429
}
362-
else if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
363-
JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper,
364-
responseEvent.sseEvent().data());
365-
sink.success();
366-
return Flux.just(message);
367-
}
368-
else {
369-
logger.debug("Received unrecognized SSE event type: {}", responseEvent.sseEvent());
370-
sink.success();
430+
catch (IOException e) {
431+
logger.error("Error processing SSE event", e);
432+
sink.error(new McpError("Error processing SSE event"));
371433
}
372434
}
373-
catch (IOException e) {
374-
logger.error("Error processing SSE event", e);
375-
sink.error(new McpError("Error processing SSE event"));
435+
return Flux.<McpSchema.JSONRPCMessage>error(
436+
new RuntimeException("Failed to send message: " + responseEvent));
437+
438+
})
439+
.flatMap(jsonRpcMessage -> handler.apply(Mono.just(jsonRpcMessage)))
440+
.onErrorComplete(t -> {
441+
if (!isClosing) {
442+
logger.warn("SSE stream observed an error", t);
443+
sink.error(t);
376444
}
377-
}
378-
return Flux.<McpSchema.JSONRPCMessage>error(
379-
new RuntimeException("Failed to send message: " + responseEvent));
380-
381-
})
382-
.flatMap(jsonRpcMessage -> handler.apply(Mono.just(jsonRpcMessage)))
383-
.onErrorComplete(t -> {
384-
if (!isClosing) {
385-
logger.warn("SSE stream observed an error", t);
386-
sink.error(t);
387-
}
388-
return true;
389-
})
390-
.doFinally(s -> {
391-
Disposable ref = this.sseSubscription.getAndSet(null);
392-
if (ref != null && !ref.isDisposed()) {
393-
ref.dispose();
394-
}
395-
})
396-
.contextWrite(sink.contextView())
397-
.subscribe();
445+
return true;
446+
})
447+
.doFinally(s -> {
448+
Disposable ref = this.sseSubscription.getAndSet(null);
449+
if (ref != null && !ref.isDisposed()) {
450+
ref.dispose();
451+
}
452+
})
453+
.contextWrite(sink.contextView())
454+
.subscribe();
398455

399-
this.sseSubscription.set(connection);
400-
});
456+
this.sseSubscription.set(connection);
457+
}));
401458
}
402459

403460
/**
@@ -453,13 +510,10 @@ private Mono<String> serializeMessage(final JSONRPCMessage message) {
453510

454511
private Mono<HttpResponse<String>> sendHttpPost(final String endpoint, final String body) {
455512
final URI requestUri = Utils.resolveUri(baseUri, endpoint);
456-
final HttpRequest request = this.requestBuilder.copy()
457-
.uri(requestUri)
458-
.POST(HttpRequest.BodyPublishers.ofString(body))
459-
.build();
460-
461-
// TODO: why discard the body?
462-
return Mono.fromFuture(httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()));
513+
return Mono.just(this.requestBuilder.copy().uri(requestUri).POST(HttpRequest.BodyPublishers.ofString(body)))
514+
.flatMap(builder -> Mono.from(this.httpRequestCustomizer.customize(builder, "POST", requestUri, body)))
515+
.map(HttpRequest.Builder::build)
516+
.flatMap(request -> Mono.fromFuture(httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())));
463517
}
464518

465519
/**

0 commit comments

Comments
 (0)