diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java index 76e389d0..58bcd787 100644 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java @@ -70,6 +70,7 @@ public class DefaultMcpClient implements McpClient { private volatile String sessionId; private volatile ServerSchema serverSchema; private volatile boolean initialized = false; + private volatile boolean closed = false; private final List tools = new ArrayList<>(); private final Object initializedLock = LockUtils.newSynchronizedLock(); private final Object toolsLock = LockUtils.newSynchronizedLock(); @@ -99,6 +100,9 @@ public DefaultMcpClient(ObjectSerializer jsonSerializer, HttpClassicClient clien @Override public void initialize() { + if (this.closed) { + throw new IllegalStateException("The MCP client is closed."); + } HttpClassicClientRequest request = this.client.createRequest(HttpRequestMethod.GET, this.baseUri + this.sseEndpoint); Choir messages = this.client.exchangeStream(request, TextEvent.class); @@ -116,8 +120,8 @@ public void initialize() { .build(); messages.subscribeOn(threadPool).subscribe(subscription -> { log.info("Prepare to create SSE channel."); - subscription.request(Long.MAX_VALUE); this.subscription = subscription; + subscription.request(Long.MAX_VALUE); }, (subscription, textEvent) -> this.consumeTextEvent(textEvent), subscription -> log.info("SSE channel is completed."), @@ -242,6 +246,9 @@ private void recordServerSchema(JsonRpc.Response response) { @Override public List getTools() { + if (this.closed) { + throw new IllegalStateException("The MCP client is closed."); + } if (this.isNotInitialized()) { throw new IllegalStateException("MCP client is not initialized. Please wait a moment."); } @@ -278,6 +285,9 @@ private void getTools0(JsonRpc.Response response) { @Override public Object callTool(String name, Map arguments) { + if (this.closed) { + throw new IllegalStateException("The MCP client is closed."); + } if (this.isNotInitialized()) { throw new IllegalStateException("MCP client is not initialized. Please wait a moment."); } @@ -383,9 +393,14 @@ private boolean waitInitialized() { @Override public void close() throws IOException { - this.subscription.cancel(); + this.closed = true; + if (this.subscription != null) { + this.subscription.cancel(); + } try { - this.pingScheduler.shutdown(); + if (this.pingScheduler != null) { + this.pingScheduler.shutdown(); + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IOException(e);