Skip to content

Commit 0bcdbee

Browse files
committed
Address review and format
1 parent 9b0b02a commit 0bcdbee

File tree

6 files changed

+58
-58
lines changed

6 files changed

+58
-58
lines changed

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import io.modelcontextprotocol.spec.McpClientTransport;
88
import io.modelcontextprotocol.spec.McpError;
99
import io.modelcontextprotocol.spec.McpSchema;
10-
import io.modelcontextprotocol.spec.McpSessionNotFoundException;
10+
import io.modelcontextprotocol.spec.McpTransportSessionNotFoundException;
1111
import io.modelcontextprotocol.spec.McpTransportSession;
1212
import io.modelcontextprotocol.spec.McpTransportStream;
1313
import org.reactivestreams.Publisher;
@@ -77,16 +77,10 @@ public WebClientStreamableHttpTransport(ObjectMapper objectMapper, WebClient.Bui
7777
private DefaultMcpTransportSession createTransportSession() {
7878
Supplier<Publisher<Void>> onClose = () -> {
7979
DefaultMcpTransportSession transportSession = this.activeSession.get();
80-
return transportSession.sessionId().isEmpty() ? Mono.empty() : webClient
81-
.delete()
82-
.uri(this.endpoint)
83-
.headers(httpHeaders -> {
84-
httpHeaders.add("mcp-session-id", transportSession.sessionId().get());
85-
})
86-
.retrieve()
87-
.toBodilessEntity()
88-
.doOnError(e -> logger.info("Got response {}", e))
89-
.then();
80+
return transportSession.sessionId().isEmpty() ? Mono.empty()
81+
: webClient.delete().uri(this.endpoint).headers(httpHeaders -> {
82+
httpHeaders.add("mcp-session-id", transportSession.sessionId().get());
83+
}).retrieve().toBodilessEntity().doOnError(e -> logger.info("Got response {}", e)).then();
9084
};
9185
return new DefaultMcpTransportSession(onClose);
9286
}
@@ -111,7 +105,7 @@ public void setExceptionHandler(Consumer<Throwable> handler) {
111105

112106
private void handleException(Throwable t) {
113107
logger.debug("Handling exception for session {}", sessionIdOrPlaceholder(this.activeSession.get()), t);
114-
if (t instanceof McpSessionNotFoundException) {
108+
if (t instanceof McpTransportSessionNotFoundException) {
115109
McpTransportSession<?> invalidSession = this.activeSession.getAndSet(createTransportSession());
116110
logger.warn("Server does not recognize session {}. Invalidating.", invalidSession.sessionId());
117111
invalidSession.close();
@@ -291,7 +285,7 @@ else if (mediaType.isCompatibleWith(MediaType.APPLICATION_JSON)) {
291285
private static Flux<McpSchema.JSONRPCMessage> mcpSessionNotFoundError(String sessionRepresentation) {
292286
logger.warn("Session {} was not found on the MCP server", sessionRepresentation);
293287
// inform the stream/connection subscriber
294-
return Flux.error(new McpSessionNotFoundException(sessionRepresentation));
288+
return Flux.error(new McpTransportSessionNotFoundException(sessionRepresentation));
295289
}
296290

297291
private Flux<McpSchema.JSONRPCMessage> extractError(ClientResponse response, String sessionRepresentation) {
@@ -316,7 +310,7 @@ private Flux<McpSchema.JSONRPCMessage> extractError(ClientResponse response, Str
316310
// invalidate the session
317311
// https://github.com/modelcontextprotocol/typescript-sdk/issues/389
318312
if (responseException.getStatusCode().isSameCodeAs(HttpStatus.BAD_REQUEST)) {
319-
return Mono.error(new McpSessionNotFoundException(sessionRepresentation, toPropagate));
313+
return Mono.error(new McpTransportSessionNotFoundException(sessionRepresentation, toPropagate));
320314
}
321315
return Mono.empty();
322316
}).flux();

mcp-test/src/main/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientResiliencyTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -199,9 +199,11 @@ void testCallTool() {
199199
void testSessionClose() {
200200
withClient(createMcpTransport(), mcpAsyncClient -> {
201201
StepVerifier.create(mcpAsyncClient.initialize()).expectNextCount(1).verifyComplete();
202-
// In case of Streamable HTTP this call should issue a HTTP DELETE request invalidating the session
202+
// In case of Streamable HTTP this call should issue a HTTP DELETE request
203+
// invalidating the session
203204
StepVerifier.create(mcpAsyncClient.closeGracefully()).expectComplete().verify();
204-
// The next use should immediately re-initialize with no issue and send the request without any broken connections.
205+
// The next use should immediately re-initialize with no issue and send the
206+
// request without any broken connections.
205207
StepVerifier.create(mcpAsyncClient.ping()).expectNextCount(1).verifyComplete();
206208
});
207209
}

mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java

Lines changed: 40 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification;
3131
import io.modelcontextprotocol.spec.McpSchema.PaginatedRequest;
3232
import io.modelcontextprotocol.spec.McpSchema.Root;
33-
import io.modelcontextprotocol.spec.McpSessionNotFoundException;
33+
import io.modelcontextprotocol.spec.McpTransportSessionNotFoundException;
3434
import io.modelcontextprotocol.util.Assert;
3535
import io.modelcontextprotocol.util.Utils;
3636
import org.slf4j.Logger;
@@ -98,7 +98,7 @@ public class McpAsyncClient {
9898
public static final TypeReference<LoggingMessageNotification> LOGGING_MESSAGE_NOTIFICATION_TYPE_REF = new TypeReference<>() {
9999
};
100100

101-
private final AtomicReference<Initialization> initialization = new AtomicReference<>();
101+
private final AtomicReference<Initialization> initializationRef = new AtomicReference<>();
102102

103103
/**
104104
* The max timeout to await for the client-server connection to be initialized.
@@ -240,22 +240,27 @@ public class McpAsyncClient {
240240

241241
private void handleException(Throwable t) {
242242
logger.warn("Handling exception", t);
243-
if (t instanceof McpSessionNotFoundException) {
244-
Initialization previous = this.initialization.getAndSet(null);
243+
if (t instanceof McpTransportSessionNotFoundException) {
244+
Initialization previous = this.initializationRef.getAndSet(null);
245245
if (previous != null) {
246246
previous.close();
247247
}
248248
withSession("re-initializing", result -> Mono.empty()).subscribe();
249249
}
250250
}
251251

252+
private McpSchema.InitializeResult currentInitializationResult() {
253+
Initialization current = this.initializationRef.get();
254+
McpSchema.InitializeResult initializeResult = current != null ? current.result.get() : null;
255+
return initializeResult;
256+
}
257+
252258
/**
253259
* Get the server capabilities that define the supported features and functionality.
254260
* @return The server capabilities
255261
*/
256262
public McpSchema.ServerCapabilities getServerCapabilities() {
257-
Initialization current = this.initialization.get();
258-
McpSchema.InitializeResult initializeResult = current != null ? current.result.get() : null;
263+
McpSchema.InitializeResult initializeResult = currentInitializationResult();
259264
return initializeResult != null ? initializeResult.capabilities() : null;
260265
}
261266

@@ -265,8 +270,7 @@ public McpSchema.ServerCapabilities getServerCapabilities() {
265270
* @return The server instructions
266271
*/
267272
public String getServerInstructions() {
268-
Initialization current = this.initialization.get();
269-
McpSchema.InitializeResult initializeResult = current != null ? current.result.get() : null;
273+
McpSchema.InitializeResult initializeResult = currentInitializationResult();
270274
return initializeResult != null ? initializeResult.instructions() : null;
271275
}
272276

@@ -275,8 +279,7 @@ public String getServerInstructions() {
275279
* @return The server implementation details
276280
*/
277281
public McpSchema.Implementation getServerInfo() {
278-
Initialization current = this.initialization.get();
279-
McpSchema.InitializeResult initializeResult = current != null ? current.result.get() : null;
282+
McpSchema.InitializeResult initializeResult = currentInitializationResult();
280283
return initializeResult != null ? initializeResult.serverInfo() : null;
281284
}
282285

@@ -285,7 +288,7 @@ public McpSchema.Implementation getServerInfo() {
285288
* @return true if the client-server connection is initialized
286289
*/
287290
public boolean isInitialized() {
288-
Initialization current = this.initialization.get();
291+
Initialization current = this.initializationRef.get();
289292
return current != null && (current.result.get() != null);
290293
}
291294

@@ -309,7 +312,7 @@ public McpSchema.Implementation getClientInfo() {
309312
* Closes the client connection immediately.
310313
*/
311314
public void close() {
312-
Initialization current = this.initialization.getAndSet(null);
315+
Initialization current = this.initializationRef.getAndSet(null);
313316
if (current != null) {
314317
current.close();
315318
}
@@ -322,7 +325,7 @@ public void close() {
322325
*/
323326
public Mono<Void> closeGracefully() {
324327
return Mono.defer(() -> {
325-
Initialization current = this.initialization.getAndSet(null);
328+
Initialization current = this.initializationRef.getAndSet(null);
326329
Mono<?> sessionClose = current != null ? current.closeGracefully() : Mono.empty();
327330
return sessionClose.then(transport.closeGracefully());
328331
});
@@ -361,16 +364,16 @@ public Mono<McpSchema.InitializeResult> initialize() {
361364
return withSession("by explicit API call", init -> Mono.just(init.get()));
362365
}
363366

364-
private Mono<McpSchema.InitializeResult> doInitialize(McpClientSession session) {
367+
private Mono<McpSchema.InitializeResult> doInitialize(McpClientSession mcpClientSession) {
365368
String latestVersion = this.protocolVersions.get(this.protocolVersions.size() - 1);
366369

367370
McpSchema.InitializeRequest initializeRequest = new McpSchema.InitializeRequest(// @formatter:off
368371
latestVersion,
369372
this.clientCapabilities,
370373
this.clientInfo); // @formatter:on
371374

372-
Mono<McpSchema.InitializeResult> result = session.sendRequest(McpSchema.METHOD_INITIALIZE, initializeRequest,
373-
INITIALIZE_RESULT_TYPE_REF);
375+
Mono<McpSchema.InitializeResult> result = mcpClientSession.sendRequest(McpSchema.METHOD_INITIALIZE,
376+
initializeRequest, INITIALIZE_RESULT_TYPE_REF);
374377

375378
return result.flatMap(initializeResult -> {
376379
logger.info("Server response with Protocol: {}, Capabilities: {}, Info: {} and Instructions {}",
@@ -382,7 +385,7 @@ private Mono<McpSchema.InitializeResult> doInitialize(McpClientSession session)
382385
"Unsupported protocol version from the server: " + initializeResult.protocolVersion()));
383386
}
384387

385-
return session.sendNotification(McpSchema.METHOD_NOTIFICATION_INITIALIZED, null)
388+
return mcpClientSession.sendNotification(McpSchema.METHOD_NOTIFICATION_INITIALIZED, null)
386389
.thenReturn(initializeResult);
387390
});
388391
}
@@ -403,7 +406,7 @@ void setMcpClientSession(McpClientSession mcpClientSession) {
403406
this.mcpClientSession.set(mcpClientSession);
404407
}
405408

406-
McpClientSession session() {
409+
McpClientSession mcpSession() {
407410
return this.mcpClientSession.get();
408411
}
409412

@@ -427,11 +430,11 @@ void error(Throwable t) {
427430
}
428431

429432
void close() {
430-
this.session().close();
433+
this.mcpSession().close();
431434
}
432435

433436
Mono<Void> closeGracefully() {
434-
return this.session().closeGracefully();
437+
return this.mcpSession().closeGracefully();
435438
}
436439

437440
}
@@ -447,7 +450,7 @@ Mono<Void> closeGracefully() {
447450
private <T> Mono<T> withSession(String actionName, Function<Initialization, Mono<T>> operation) {
448451
return Mono.defer(() -> {
449452
Initialization newInit = Initialization.create();
450-
Initialization previous = this.initialization.compareAndExchange(null, newInit);
453+
Initialization previous = this.initializationRef.compareAndExchange(null, newInit);
451454

452455
boolean needsToInitialize = previous == null;
453456
logger.debug(needsToInitialize ? "Initialization process started" : "Joining previous initialization");
@@ -456,12 +459,12 @@ private <T> Mono<T> withSession(String actionName, Function<Initialization, Mono
456459
}
457460

458461
Mono<McpSchema.InitializeResult> initializationJob = needsToInitialize
459-
? doInitialize(newInit.session()).doOnNext(newInit::complete).onErrorResume(ex -> {
462+
? doInitialize(newInit.mcpSession()).doOnNext(newInit::complete).onErrorResume(ex -> {
460463
newInit.error(ex);
461464
return Mono.error(ex);
462465
}) : previous.await();
463466

464-
return initializationJob.map(initializeResult -> this.initialization.get())
467+
return initializationJob.map(initializeResult -> this.initializationRef.get())
465468
.timeout(this.initializationTimeout)
466469
.onErrorResume(ex -> {
467470
logger.warn("Failed to initialize", ex);
@@ -481,7 +484,7 @@ private <T> Mono<T> withSession(String actionName, Function<Initialization, Mono
481484
*/
482485
public Mono<Object> ping() {
483486
return this.withSession("pinging the server",
484-
init -> init.session().sendRequest(McpSchema.METHOD_PING, null, OBJECT_TYPE_REF));
487+
init -> init.mcpSession().sendRequest(McpSchema.METHOD_PING, null, OBJECT_TYPE_REF));
485488
}
486489

487490
// --------------------------
@@ -562,7 +565,7 @@ public Mono<Void> removeRoot(String rootUri) {
562565
*/
563566
public Mono<Void> rootsListChangedNotification() {
564567
return this.withSession("sending roots list changed notification",
565-
init -> init.session().sendNotification(McpSchema.METHOD_NOTIFICATION_ROOTS_LIST_CHANGED));
568+
init -> init.mcpSession().sendNotification(McpSchema.METHOD_NOTIFICATION_ROOTS_LIST_CHANGED));
566569
}
567570

568571
private RequestHandler<McpSchema.ListRootsResult> rootsListRequestHandler() {
@@ -612,7 +615,8 @@ public Mono<McpSchema.CallToolResult> callTool(McpSchema.CallToolRequest callToo
612615
if (init.get().capabilities().tools() == null) {
613616
return Mono.error(new McpError("Server does not provide tools capability"));
614617
}
615-
return init.session().sendRequest(McpSchema.METHOD_TOOLS_CALL, callToolRequest, CALL_TOOL_RESULT_TYPE_REF);
618+
return init.mcpSession()
619+
.sendRequest(McpSchema.METHOD_TOOLS_CALL, callToolRequest, CALL_TOOL_RESULT_TYPE_REF);
616620
});
617621
}
618622

@@ -634,7 +638,7 @@ public Mono<McpSchema.ListToolsResult> listTools(String cursor) {
634638
if (init.get().capabilities().tools() == null) {
635639
return Mono.error(new McpError("Server does not provide tools capability"));
636640
}
637-
return init.session()
641+
return init.mcpSession()
638642
.sendRequest(McpSchema.METHOD_TOOLS_LIST, new McpSchema.PaginatedRequest(cursor),
639643
LIST_TOOLS_RESULT_TYPE_REF);
640644
});
@@ -692,7 +696,7 @@ public Mono<McpSchema.ListResourcesResult> listResources(String cursor) {
692696
if (init.get().capabilities().resources() == null) {
693697
return Mono.error(new McpError("Server does not provide the resources capability"));
694698
}
695-
return init.session()
699+
return init.mcpSession()
696700
.sendRequest(McpSchema.METHOD_RESOURCES_LIST, new McpSchema.PaginatedRequest(cursor),
697701
LIST_RESOURCES_RESULT_TYPE_REF);
698702
});
@@ -724,7 +728,7 @@ public Mono<McpSchema.ReadResourceResult> readResource(McpSchema.ReadResourceReq
724728
if (init.get().capabilities().resources() == null) {
725729
return Mono.error(new McpError("Server does not provide the resources capability"));
726730
}
727-
return init.session()
731+
return init.mcpSession()
728732
.sendRequest(McpSchema.METHOD_RESOURCES_READ, readResourceRequest, READ_RESOURCE_RESULT_TYPE_REF);
729733
});
730734
}
@@ -753,7 +757,7 @@ public Mono<McpSchema.ListResourceTemplatesResult> listResourceTemplates(String
753757
if (init.get().capabilities().resources() == null) {
754758
return Mono.error(new McpError("Server does not provide the resources capability"));
755759
}
756-
return init.session()
760+
return init.mcpSession()
757761
.sendRequest(McpSchema.METHOD_RESOURCES_TEMPLATES_LIST, new McpSchema.PaginatedRequest(cursor),
758762
LIST_RESOURCE_TEMPLATES_RESULT_TYPE_REF);
759763
});
@@ -769,7 +773,7 @@ public Mono<McpSchema.ListResourceTemplatesResult> listResourceTemplates(String
769773
* @see #unsubscribeResource(McpSchema.UnsubscribeRequest)
770774
*/
771775
public Mono<Void> subscribeResource(McpSchema.SubscribeRequest subscribeRequest) {
772-
return this.withSession("subscribing to resources", init -> init.session()
776+
return this.withSession("subscribing to resources", init -> init.mcpSession()
773777
.sendRequest(McpSchema.METHOD_RESOURCES_SUBSCRIBE, subscribeRequest, VOID_TYPE_REFERENCE));
774778
}
775779

@@ -783,7 +787,7 @@ public Mono<Void> subscribeResource(McpSchema.SubscribeRequest subscribeRequest)
783787
* @see #subscribeResource(McpSchema.SubscribeRequest)
784788
*/
785789
public Mono<Void> unsubscribeResource(McpSchema.UnsubscribeRequest unsubscribeRequest) {
786-
return this.withSession("unsubscribing from resources", init -> init.session()
790+
return this.withSession("unsubscribing from resources", init -> init.mcpSession()
787791
.sendRequest(McpSchema.METHOD_RESOURCES_UNSUBSCRIBE, unsubscribeRequest, VOID_TYPE_REFERENCE));
788792
}
789793

@@ -825,7 +829,7 @@ public Mono<ListPromptsResult> listPrompts() {
825829
* @see #getPrompt(GetPromptRequest)
826830
*/
827831
public Mono<ListPromptsResult> listPrompts(String cursor) {
828-
return this.withSession("listing prompts", init -> init.session()
832+
return this.withSession("listing prompts", init -> init.mcpSession()
829833
.sendRequest(McpSchema.METHOD_PROMPT_LIST, new PaginatedRequest(cursor), LIST_PROMPTS_RESULT_TYPE_REF));
830834
}
831835

@@ -839,7 +843,7 @@ public Mono<ListPromptsResult> listPrompts(String cursor) {
839843
* @see #listPrompts()
840844
*/
841845
public Mono<GetPromptResult> getPrompt(GetPromptRequest getPromptRequest) {
842-
return this.withSession("getting prompts", init -> init.session()
846+
return this.withSession("getting prompts", init -> init.mcpSession()
843847
.sendRequest(McpSchema.METHOD_PROMPT_GET, getPromptRequest, GET_PROMPT_RESULT_TYPE_REF));
844848
}
845849

@@ -892,7 +896,7 @@ public Mono<Void> setLoggingLevel(LoggingLevel loggingLevel) {
892896

893897
return this.withSession("setting logging level", init -> {
894898
var params = new McpSchema.SetLevelRequest(loggingLevel);
895-
return init.session().sendRequest(McpSchema.METHOD_LOGGING_SET_LEVEL, params, OBJECT_TYPE_REF).then();
899+
return init.mcpSession().sendRequest(McpSchema.METHOD_LOGGING_SET_LEVEL, params, OBJECT_TYPE_REF).then();
896900
});
897901
}
898902

@@ -922,7 +926,7 @@ void setProtocolVersions(List<String> protocolVersions) {
922926
* @see McpSchema.CompleteResult
923927
*/
924928
public Mono<McpSchema.CompleteResult> completeCompletion(McpSchema.CompleteRequest completeRequest) {
925-
return this.withSession("complete completions", init -> init.session()
929+
return this.withSession("complete completions", init -> init.mcpSession()
926930
.sendRequest(McpSchema.METHOD_COMPLETION_COMPLETE, completeRequest, COMPLETION_COMPLETE_RESULT_TYPE_REF));
927931
}
928932

mcp/src/main/java/io/modelcontextprotocol/spec/DefaultMcpTransportSession.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ public class DefaultMcpTransportSession implements McpTransportSession<Disposabl
2525
private final Supplier<Publisher<Void>> onClose;
2626

2727
public DefaultMcpTransportSession(Supplier<Publisher<Void>> onClose) {
28-
this.onClose = onClose;
29-
}
28+
this.onClose = onClose;
29+
}
3030

3131
@Override
3232
public Optional<String> sessionId() {

mcp/src/main/java/io/modelcontextprotocol/spec/DefaultMcpTransportStream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public long streamId() {
4848
public Publisher<McpSchema.JSONRPCMessage> consumeSseStream(
4949
Publisher<Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>>> eventStream) {
5050
return Flux.deferContextual(ctx -> Flux.from(eventStream).doOnError(e -> {
51-
if (resumable && !(e instanceof McpSessionNotFoundException)) {
51+
if (resumable && !(e instanceof McpTransportSessionNotFoundException)) {
5252
Mono.from(reconnect.apply(this)).contextWrite(ctx).subscribe();
5353
}
5454
}).doOnNext(idAndMessage -> idAndMessage.getT1().ifPresent(id -> {

mcp/src/main/java/io/modelcontextprotocol/spec/McpSessionNotFoundException.java renamed to mcp/src/main/java/io/modelcontextprotocol/spec/McpTransportSessionNotFoundException.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@
66
*
77
* @author Dariusz Jędrzejczyk
88
*/
9-
public class McpSessionNotFoundException extends RuntimeException {
9+
public class McpTransportSessionNotFoundException extends RuntimeException {
1010

11-
public McpSessionNotFoundException(String sessionId, Exception cause) {
11+
public McpTransportSessionNotFoundException(String sessionId, Exception cause) {
1212
super("Session " + sessionId + " not found on the server", cause);
1313

1414
}
1515

16-
public McpSessionNotFoundException(String sessionId) {
16+
public McpTransportSessionNotFoundException(String sessionId) {
1717
super("Session " + sessionId + " not found on the server");
1818
}
1919

0 commit comments

Comments
 (0)