From 1e6dca49b9ec2c62688cac1c43709dd1a37fc1ec Mon Sep 17 00:00:00 2001 From: CodeCaster Date: Wed, 28 May 2025 20:48:55 +0800 Subject: [PATCH] [fel] Improve McpClient lifecycle management with closed state tracking - Added a closed flag to DefaultMcpClient to track client lifecycle state. - Prevent method invocations after closure by adding checks in initialize(), getTools(), and callTool(). - Safely handle potential nulls in close() to avoid exceptions during partial initialization. - Ensures consistent behavior and prevents resource leaks or usage after the client has been closed. This change improves reliability and correctness by enforcing proper usage of the MCP client throughout its lifecycle. --- .../mcp/client/support/DefaultMcpClient.java | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java index 76e389d0..58bcd787 100644 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java @@ -70,6 +70,7 @@ public class DefaultMcpClient implements McpClient { private volatile String sessionId; private volatile ServerSchema serverSchema; private volatile boolean initialized = false; + private volatile boolean closed = false; private final List tools = new ArrayList<>(); private final Object initializedLock = LockUtils.newSynchronizedLock(); private final Object toolsLock = LockUtils.newSynchronizedLock(); @@ -99,6 +100,9 @@ public DefaultMcpClient(ObjectSerializer jsonSerializer, HttpClassicClient clien @Override public void initialize() { + if (this.closed) { + throw new IllegalStateException("The MCP client is closed."); + } HttpClassicClientRequest request = this.client.createRequest(HttpRequestMethod.GET, this.baseUri + this.sseEndpoint); Choir messages = this.client.exchangeStream(request, TextEvent.class); @@ -116,8 +120,8 @@ public void initialize() { .build(); messages.subscribeOn(threadPool).subscribe(subscription -> { log.info("Prepare to create SSE channel."); - subscription.request(Long.MAX_VALUE); this.subscription = subscription; + subscription.request(Long.MAX_VALUE); }, (subscription, textEvent) -> this.consumeTextEvent(textEvent), subscription -> log.info("SSE channel is completed."), @@ -242,6 +246,9 @@ private void recordServerSchema(JsonRpc.Response response) { @Override public List getTools() { + if (this.closed) { + throw new IllegalStateException("The MCP client is closed."); + } if (this.isNotInitialized()) { throw new IllegalStateException("MCP client is not initialized. Please wait a moment."); } @@ -278,6 +285,9 @@ private void getTools0(JsonRpc.Response response) { @Override public Object callTool(String name, Map arguments) { + if (this.closed) { + throw new IllegalStateException("The MCP client is closed."); + } if (this.isNotInitialized()) { throw new IllegalStateException("MCP client is not initialized. Please wait a moment."); } @@ -383,9 +393,14 @@ private boolean waitInitialized() { @Override public void close() throws IOException { - this.subscription.cancel(); + this.closed = true; + if (this.subscription != null) { + this.subscription.cancel(); + } try { - this.pingScheduler.shutdown(); + if (this.pingScheduler != null) { + this.pingScheduler.shutdown(); + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IOException(e);