Skip to content

Commit 4b034f3

Browse files
committed
Add builder for WebClientStreamableHttpTransport
1 parent 1680a27 commit 4b034f3

File tree

4 files changed

+74
-16
lines changed

4 files changed

+74
-16
lines changed

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java

Lines changed: 71 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import io.modelcontextprotocol.spec.McpTransportSessionNotFoundException;
1111
import io.modelcontextprotocol.spec.McpTransportSession;
1212
import io.modelcontextprotocol.spec.McpTransportStream;
13+
import io.modelcontextprotocol.util.Assert;
1314
import org.reactivestreams.Publisher;
1415
import org.slf4j.Logger;
1516
import org.slf4j.LoggerFactory;
@@ -38,6 +39,8 @@ public class WebClientStreamableHttpTransport implements McpClientTransport {
3839

3940
private static final Logger logger = LoggerFactory.getLogger(WebClientStreamableHttpTransport.class);
4041

42+
private static final String DEFAULT_ENDPOINT = "/mcp";
43+
4144
/**
4245
* Event type for JSON-RPC messages received through the SSE connection. The server
4346
* sends messages with this event type to transmit JSON-RPC protocol data.
@@ -64,7 +67,7 @@ public class WebClientStreamableHttpTransport implements McpClientTransport {
6467
private final AtomicReference<Consumer<Throwable>> exceptionHandler = new AtomicReference<>();
6568

6669
// TODO: builder
67-
public WebClientStreamableHttpTransport(ObjectMapper objectMapper, WebClient.Builder webClientBuilder,
70+
private WebClientStreamableHttpTransport(ObjectMapper objectMapper, WebClient.Builder webClientBuilder,
6871
String endpoint, boolean resumableStreams, boolean openConnectionOnStartup) {
6972
this.objectMapper = objectMapper;
7073
this.webClient = webClientBuilder.build();
@@ -74,15 +77,8 @@ public WebClientStreamableHttpTransport(ObjectMapper objectMapper, WebClient.Bui
7477
this.activeSession.set(createTransportSession());
7578
}
7679

77-
private DefaultMcpTransportSession createTransportSession() {
78-
Supplier<Publisher<Void>> onClose = () -> {
79-
DefaultMcpTransportSession transportSession = this.activeSession.get();
80-
return transportSession.sessionId().isEmpty() ? Mono.empty()
81-
: webClient.delete().uri(this.endpoint).headers(httpHeaders -> {
82-
httpHeaders.add("mcp-session-id", transportSession.sessionId().get());
83-
}).retrieve().toBodilessEntity().doOnError(e -> logger.info("Got response {}", e)).then();
84-
};
85-
return new DefaultMcpTransportSession(onClose);
80+
public static Builder builder(WebClient.Builder webClientBuilder) {
81+
return new Builder(webClientBuilder);
8682
}
8783

8884
@Override
@@ -97,6 +93,17 @@ public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchem
9793
});
9894
}
9995

96+
private DefaultMcpTransportSession createTransportSession() {
97+
Supplier<Publisher<Void>> onClose = () -> {
98+
DefaultMcpTransportSession transportSession = this.activeSession.get();
99+
return transportSession.sessionId().isEmpty() ? Mono.empty()
100+
: webClient.delete().uri(this.endpoint).headers(httpHeaders -> {
101+
httpHeaders.add("mcp-session-id", transportSession.sessionId().get());
102+
}).retrieve().toBodilessEntity().doOnError(e -> logger.info("Got response {}", e)).then();
103+
};
104+
return new DefaultMcpTransportSession(onClose);
105+
}
106+
100107
@Override
101108
public void setExceptionHandler(Consumer<Throwable> handler) {
102109
logger.debug("Exception handler registered");
@@ -385,4 +392,58 @@ private Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>> parse(Serve
385392
}
386393
}
387394

395+
public static class Builder {
396+
397+
private ObjectMapper objectMapper;
398+
399+
private WebClient.Builder webClientBuilder;
400+
401+
private String endpoint = DEFAULT_ENDPOINT;
402+
403+
private boolean resumableStreams = true;
404+
405+
private boolean openConnectionOnStartup = false;
406+
407+
private Builder(WebClient.Builder webClientBuilder) {
408+
Assert.notNull(webClientBuilder, "WebClient.Builder must not be null");
409+
this.webClientBuilder = webClientBuilder;
410+
}
411+
412+
public Builder objectMapper(ObjectMapper objectMapper) {
413+
Assert.notNull(objectMapper, "ObjectMapper must not be null");
414+
this.objectMapper = objectMapper;
415+
return this;
416+
}
417+
418+
public Builder webClientBuilder(WebClient.Builder webClientBuilder) {
419+
Assert.notNull(webClientBuilder, "WebClient.Builder must not be null");
420+
this.webClientBuilder = webClientBuilder;
421+
return this;
422+
}
423+
424+
public Builder endpoint(String endpoint) {
425+
Assert.hasText(endpoint, "endpoint must be a non-empty String");
426+
this.endpoint = endpoint;
427+
return this;
428+
}
429+
430+
public Builder resumableStreams(boolean resumableStreams) {
431+
this.resumableStreams = resumableStreams;
432+
return this;
433+
}
434+
435+
public Builder openConnectionOnStartup(boolean openConnectionOnStartup) {
436+
this.openConnectionOnStartup = openConnectionOnStartup;
437+
return this;
438+
}
439+
440+
public WebClientStreamableHttpTransport build() {
441+
ObjectMapper objectMapper = this.objectMapper != null ? this.objectMapper : new ObjectMapper();
442+
443+
return new WebClientStreamableHttpTransport(objectMapper, this.webClientBuilder, endpoint, resumableStreams,
444+
openConnectionOnStartup);
445+
}
446+
447+
}
448+
388449
}

mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebClientStreamableHttpAsyncClientResiliencyTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,7 @@ public class WebClientStreamableHttpAsyncClientResiliencyTests extends AbstractM
1111

1212
@Override
1313
protected McpClientTransport createMcpTransport() {
14-
return new WebClientStreamableHttpTransport(new ObjectMapper(), WebClient.builder().baseUrl(host), "/mcp", true,
15-
false);
14+
return WebClientStreamableHttpTransport.builder(WebClient.builder().baseUrl(host)).build();
1615
}
1716

1817
}

mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebClientStreamableHttpAsyncClientTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,7 @@ public class WebClientStreamableHttpAsyncClientTests extends AbstractMcpAsyncCli
2424

2525
@Override
2626
protected McpClientTransport createMcpTransport() {
27-
return new WebClientStreamableHttpTransport(new ObjectMapper(), WebClient.builder().baseUrl(host), "/mcp", true,
28-
false);
27+
return WebClientStreamableHttpTransport.builder(WebClient.builder().baseUrl(host)).build();
2928
}
3029

3130
@Override

mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/WebClientStreamableHttpSyncClientTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@ public class WebClientStreamableHttpSyncClientTests extends AbstractMcpSyncClien
2323

2424
@Override
2525
protected McpClientTransport createMcpTransport() {
26-
return new WebClientStreamableHttpTransport(new ObjectMapper(), WebClient.builder().baseUrl(host), "/mcp", true,
27-
false);
26+
return WebClientStreamableHttpTransport.builder(WebClient.builder().baseUrl(host)).build();
2827
}
2928

3029
@Override

0 commit comments

Comments
 (0)