Skip to content

Commit 5f6185e

Browse files
committed
Introduce HttpRequest.Builder customizer for HttpClient-based transport
- Minor improvement: speed up HttpClientSseClientTransportTests by reusing the MCP Server container across tests. - Minor improvement: rename "messageSink" to "deliveredSink" in HttpClientStreamableHttpTransport#sendMessage
1 parent c3a0b18 commit 5f6185e

File tree

6 files changed

+526
-150
lines changed

6 files changed

+526
-150
lines changed
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2024-2025 the original author or authors.
3+
*/
4+
5+
package io.modelcontextprotocol.client.transport;
6+
7+
import java.net.URI;
8+
import java.net.http.HttpRequest;
9+
import org.reactivestreams.Publisher;
10+
import reactor.core.publisher.Mono;
11+
import reactor.core.scheduler.Schedulers;
12+
import reactor.util.annotation.Nullable;
13+
14+
/**
15+
* Customize {@link HttpRequest.Builder} before executing the request, in either SSE or
16+
* Streamable HTTP transport.
17+
* <p>
18+
* When used in a non-blocking context, implementations MUST be non-blocking.
19+
*
20+
* @author Daniel Garnier-Moiroux
21+
*/
22+
public interface AsyncHttpRequestCustomizer {
23+
24+
Publisher<HttpRequest.Builder> customize(HttpRequest.Builder builder, String method, URI endpoint,
25+
@Nullable String body);
26+
27+
AsyncHttpRequestCustomizer NOOP = new Noop();
28+
29+
/**
30+
* Wrap a sync implementation in an async wrapper.
31+
* <p>
32+
* Do NOT wrap a blocking implementation for use in a non-blocking context. For a
33+
* blocking implementation, consider using {@link Schedulers#boundedElastic()}.
34+
*/
35+
static AsyncHttpRequestCustomizer fromSync(SyncHttpRequestCustomizer customizer) {
36+
return (builder, method, uri, body) -> Mono.fromSupplier(() -> {
37+
customizer.customize(builder, method, uri, body);
38+
return builder;
39+
});
40+
}
41+
42+
class Noop implements AsyncHttpRequestCustomizer {
43+
44+
@Override
45+
public Publisher<HttpRequest.Builder> customize(HttpRequest.Builder builder, String method, URI endpoint,
46+
String body) {
47+
return Mono.just(builder);
48+
}
49+
50+
}
51+
52+
}

mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java

Lines changed: 74 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2024 - 2024 the original author or authors.
2+
* Copyright 2024 - 2025 the original author or authors.
33
*/
44
package io.modelcontextprotocol.client.transport;
55

@@ -102,6 +102,11 @@ public class HttpClientSseClientTransport implements McpClientTransport {
102102
*/
103103
protected final Sinks.One<String> messageEndpointSink = Sinks.one();
104104

105+
/**
106+
* Customizer to modify requests before they are executed.
107+
*/
108+
private final AsyncHttpRequestCustomizer httpRequestCustomizer;
109+
105110
/**
106111
* Creates a new transport instance with default HTTP client and object mapper.
107112
* @param baseUri the base URI of the MCP server
@@ -172,18 +177,38 @@ public HttpClientSseClientTransport(HttpClient.Builder clientBuilder, HttpReques
172177
* @param objectMapper the object mapper for JSON serialization/deserialization
173178
* @throws IllegalArgumentException if objectMapper, clientBuilder, or headers is null
174179
*/
180+
@Deprecated(forRemoval = true)
175181
HttpClientSseClientTransport(HttpClient httpClient, HttpRequest.Builder requestBuilder, String baseUri,
176182
String sseEndpoint, ObjectMapper objectMapper) {
183+
this(httpClient, requestBuilder, baseUri, sseEndpoint, objectMapper, AsyncHttpRequestCustomizer.NOOP);
184+
}
185+
186+
/**
187+
* Creates a new transport instance with custom HTTP client builder, object mapper,
188+
* and headers.
189+
* @param httpClient the HTTP client to use
190+
* @param requestBuilder the HTTP request builder to use
191+
* @param baseUri the base URI of the MCP server
192+
* @param sseEndpoint the SSE endpoint path
193+
* @param objectMapper the object mapper for JSON serialization/deserialization
194+
* @param httpRequestCustomizer customizer for the requestBuilder before executing
195+
* requests
196+
* @throws IllegalArgumentException if objectMapper, clientBuilder, or headers is null
197+
*/
198+
HttpClientSseClientTransport(HttpClient httpClient, HttpRequest.Builder requestBuilder, String baseUri,
199+
String sseEndpoint, ObjectMapper objectMapper, AsyncHttpRequestCustomizer httpRequestCustomizer) {
177200
Assert.notNull(objectMapper, "ObjectMapper must not be null");
178201
Assert.hasText(baseUri, "baseUri must not be empty");
179202
Assert.hasText(sseEndpoint, "sseEndpoint must not be empty");
180203
Assert.notNull(httpClient, "httpClient must not be null");
181204
Assert.notNull(requestBuilder, "requestBuilder must not be null");
205+
Assert.notNull(httpRequestCustomizer, "httpRequestCustomizer must not be null");
182206
this.baseUri = URI.create(baseUri);
183207
this.sseEndpoint = sseEndpoint;
184208
this.objectMapper = objectMapper;
185209
this.httpClient = httpClient;
186210
this.requestBuilder = requestBuilder;
211+
this.httpRequestCustomizer = httpRequestCustomizer;
187212
}
188213

189214
/**
@@ -213,6 +238,8 @@ public static class Builder {
213238
private HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
214239
.header("Content-Type", "application/json");
215240

241+
private AsyncHttpRequestCustomizer httpRequestCustomizer = AsyncHttpRequestCustomizer.NOOP;
242+
216243
/**
217244
* Creates a new builder instance.
218245
*/
@@ -310,31 +337,60 @@ public Builder objectMapper(ObjectMapper objectMapper) {
310337
return this;
311338
}
312339

340+
/**
341+
* Sets the customizer for {@link HttpRequest.Builder}, to modify requests before
342+
* executing them.
343+
* <p>
344+
* Do NOT use a blocking {@link SyncHttpRequestCustomizer} in a non-blocking
345+
* context. Use {@link #asyncHttpRequestCustomizer(AsyncHttpRequestCustomizer)}
346+
* instead.
347+
* @param syncHttpRequestCustomizer the request customizer
348+
* @return this builder
349+
*/
350+
public Builder httpRequestCustomizer(SyncHttpRequestCustomizer syncHttpRequestCustomizer) {
351+
this.httpRequestCustomizer = AsyncHttpRequestCustomizer.fromSync(syncHttpRequestCustomizer);
352+
return this;
353+
}
354+
355+
/**
356+
* Sets the customizer for {@link HttpRequest.Builder}, to modify requests before
357+
* executing them.
358+
* <p>
359+
* Do NOT use a blocking implementation in a non-blocking context.
360+
* @param asyncHttpRequestCustomizer the request customizer
361+
* @return this builder
362+
*/
363+
public Builder asyncHttpRequestCustomizer(AsyncHttpRequestCustomizer asyncHttpRequestCustomizer) {
364+
this.httpRequestCustomizer = asyncHttpRequestCustomizer;
365+
return this;
366+
}
367+
313368
/**
314369
* Builds a new {@link HttpClientSseClientTransport} instance.
315370
* @return a new transport instance
316371
*/
317372
public HttpClientSseClientTransport build() {
318373
return new HttpClientSseClientTransport(clientBuilder.build(), requestBuilder, baseUri, sseEndpoint,
319-
objectMapper);
374+
objectMapper, httpRequestCustomizer);
320375
}
321376

322377
}
323378

324379
@Override
325380
public Mono<Void> connect(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> handler) {
381+
var uri = Utils.resolveUri(this.baseUri, this.sseEndpoint);
326382

327-
return Mono.create(sink -> {
328-
329-
HttpRequest request = requestBuilder.copy()
330-
.uri(Utils.resolveUri(this.baseUri, this.sseEndpoint))
383+
return Mono.defer(() -> {
384+
var builder = requestBuilder.copy()
385+
.uri(uri)
331386
.header("Accept", "text/event-stream")
332387
.header("Cache-Control", "no-cache")
333-
.GET()
334-
.build();
335-
388+
.GET();
389+
return Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null));
390+
}).flatMap(requestBuilder -> Mono.create(sink -> {
336391
Disposable connection = Flux.<ResponseEvent>create(sseSink -> this.httpClient
337-
.sendAsync(request, responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo, sseSink))
392+
.sendAsync(requestBuilder.build(),
393+
responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo, sseSink))
338394
.exceptionallyCompose(e -> {
339395
sseSink.error(e);
340396
return CompletableFuture.failedFuture(e);
@@ -397,7 +453,7 @@ else if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
397453
.subscribe();
398454

399455
this.sseSubscription.set(connection);
400-
});
456+
}));
401457
}
402458

403459
/**
@@ -453,13 +509,13 @@ private Mono<String> serializeMessage(final JSONRPCMessage message) {
453509

454510
private Mono<HttpResponse<String>> sendHttpPost(final String endpoint, final String body) {
455511
final URI requestUri = Utils.resolveUri(baseUri, endpoint);
456-
final HttpRequest request = this.requestBuilder.copy()
457-
.uri(requestUri)
458-
.POST(HttpRequest.BodyPublishers.ofString(body))
459-
.build();
460-
461-
// TODO: why discard the body?
462-
return Mono.fromFuture(httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()));
512+
return Mono.defer(() -> {
513+
var builder = this.requestBuilder.copy().uri(requestUri).POST(HttpRequest.BodyPublishers.ofString(body));
514+
return Mono.from(this.httpRequestCustomizer.customize(builder, "POST", requestUri, body));
515+
}).flatMap(customizedBuilder -> {
516+
var request = customizedBuilder.build();
517+
return Mono.fromFuture(httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()));
518+
});
463519
}
464520

465521
/**

0 commit comments

Comments
 (0)