Skip to content

Commit 57985c0

Browse files
committed
refactor: consolidate MCP client initialization logic
- Move session setup, completion, and error callbacks into doInitialize method - Rename variables in McpClientSession for better clarity (sink -> responseSink/resultSink) Signed-off-by: Christian Tzolov <christian.tzolov@broadcom.com>
1 parent b701a36 commit 57985c0

File tree

2 files changed

+18
-16
lines changed

2 files changed

+18
-16
lines changed

mcp/src/main/java/io/modelcontextprotocol/client/McpAsyncClient.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,12 @@ public Mono<McpSchema.InitializeResult> initialize() {
387387
return withSession("by explicit API call", init -> Mono.just(init.get()));
388388
}
389389

390-
private Mono<McpSchema.InitializeResult> doInitialize(McpClientSession mcpClientSession) {
390+
private Mono<McpSchema.InitializeResult> doInitialize(Initialization initializaiton) {
391+
392+
initializaiton.setMcpClientSession(this.sessionSupplier.get());
393+
394+
McpClientSession mcpClientSession = initializaiton.mcpSession();
395+
391396
String latestVersion = this.protocolVersions.get(this.protocolVersions.size() - 1);
392397

393398
McpSchema.InitializeRequest initializeRequest = new McpSchema.InitializeRequest(// @formatter:off
@@ -410,6 +415,9 @@ private Mono<McpSchema.InitializeResult> doInitialize(McpClientSession mcpClient
410415

411416
return mcpClientSession.sendNotification(McpSchema.METHOD_NOTIFICATION_INITIALIZED, null)
412417
.thenReturn(initializeResult);
418+
}).doOnNext(initializaiton::complete).onErrorResume(ex -> {
419+
initializaiton.error(ex);
420+
return Mono.error(ex);
413421
});
414422
}
415423

@@ -477,15 +485,9 @@ private <T> Mono<T> withSession(String actionName, Function<Initialization, Mono
477485

478486
boolean needsToInitialize = previous == null;
479487
logger.debug(needsToInitialize ? "Initialization process started" : "Joining previous initialization");
480-
if (needsToInitialize) {
481-
newInit.setMcpClientSession(this.sessionSupplier.get());
482-
}
483488

484-
Mono<McpSchema.InitializeResult> initializationJob = needsToInitialize
485-
? doInitialize(newInit.mcpSession()).doOnNext(newInit::complete).onErrorResume(ex -> {
486-
newInit.error(ex);
487-
return Mono.error(ex);
488-
}) : previous.await();
489+
Mono<McpSchema.InitializeResult> initializationJob = needsToInitialize ? doInitialize(newInit)
490+
: previous.await();
489491

490492
return initializationJob.map(initializeResult -> this.initializationRef.get())
491493
.timeout(this.initializationTimeout)

mcp/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -229,27 +229,27 @@ private String generateRequestId() {
229229
public <T> Mono<T> sendRequest(String method, Object requestParams, TypeReference<T> typeRef) {
230230
String requestId = this.generateRequestId();
231231

232-
return Mono.deferContextual(ctx -> Mono.<McpSchema.JSONRPCResponse>create(sink -> {
232+
return Mono.deferContextual(ctx -> Mono.<McpSchema.JSONRPCResponse>create(responseSink -> {
233233
logger.debug("Sending message for method {}", method);
234-
this.pendingResponses.put(requestId, sink);
234+
this.pendingResponses.put(requestId, responseSink);
235235
McpSchema.JSONRPCRequest jsonrpcRequest = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, method,
236236
requestId, requestParams);
237237
this.transport.sendMessage(jsonrpcRequest).contextWrite(ctx).subscribe(v -> {
238238
}, error -> {
239239
this.pendingResponses.remove(requestId);
240-
sink.error(error);
240+
responseSink.error(error);
241241
});
242-
})).timeout(this.requestTimeout).handle((jsonRpcResponse, sink) -> {
242+
})).timeout(this.requestTimeout).handle((jsonRpcResponse, resultSink) -> {
243243
if (jsonRpcResponse.error() != null) {
244244
logger.error("Error handling request: {}", jsonRpcResponse.error());
245-
sink.error(new McpError(jsonRpcResponse.error()));
245+
resultSink.error(new McpError(jsonRpcResponse.error()));
246246
}
247247
else {
248248
if (typeRef.getType().equals(Void.class)) {
249-
sink.complete();
249+
resultSink.complete();
250250
}
251251
else {
252-
sink.next(this.transport.unmarshalFrom(jsonRpcResponse.result(), typeRef));
252+
resultSink.next(this.transport.unmarshalFrom(jsonRpcResponse.result(), typeRef));
253253
}
254254
}
255255
});

0 commit comments

Comments
 (0)