From 32fc96fd5de2dc531ff4e5837607cc4f37163976 Mon Sep 17 00:00:00 2001 From: CodeCaster Date: Wed, 28 May 2025 16:59:00 +0800 Subject: [PATCH] [fel] Add close support in McpClient and release resources properly - Added Closeable interface to McpClient to support proper resource cleanup. - Implemented close() method in DefaultMcpClient to: - Cancel SSE subscription. - Shutdown ping scheduler gracefully. - Log client closure with context. - Added /close endpoint in TestController for testing purposes to trigger client shutdown via HTTP. This change ensures that the MCP client releases all held resources when no longer needed, improving reliability and preventing leaks. --- .../mcp/client/support/DefaultMcpClient.java | 21 +++++++++++++++++-- .../fel/tool/mcp/test/TestController.java | 17 +++++++++++++++ .../fel/tool/mcp/client/McpClient.java | 3 ++- 3 files changed, 38 insertions(+), 3 deletions(-) diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java index 77b32091..76e389d0 100644 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java @@ -22,6 +22,7 @@ import modelengine.fit.http.entity.TextEvent; import modelengine.fit.http.protocol.HttpRequestMethod; import modelengine.fitframework.flowable.Choir; +import modelengine.fitframework.flowable.Subscription; import modelengine.fitframework.log.Logger; import modelengine.fitframework.schedule.ExecutePolicy; import modelengine.fitframework.schedule.Task; @@ -76,6 +77,9 @@ public class DefaultMcpClient implements McpClient { private final Map pendingRequests = new ConcurrentHashMap<>(); private final Map pendingResults = new ConcurrentHashMap<>(); + private volatile Subscription subscription; + private volatile ThreadPoolScheduler pingScheduler; + /** * Constructs a new instance of the DefaultMcpClient. * @@ -113,11 +117,12 @@ public void initialize() { messages.subscribeOn(threadPool).subscribe(subscription -> { log.info("Prepare to create SSE channel."); subscription.request(Long.MAX_VALUE); + this.subscription = subscription; }, (subscription, textEvent) -> this.consumeTextEvent(textEvent), subscription -> log.info("SSE channel is completed."), (subscription, cause) -> log.error("SSE channel is failed.", cause)); - ThreadPoolScheduler pingScheduler = ThreadPoolScheduler.custom() + this.pingScheduler = ThreadPoolScheduler.custom() .threadPoolName("mcp-client-ping-" + this.name) .awaitTermination(3, TimeUnit.SECONDS) .isImmediateShutdown(true) @@ -127,7 +132,7 @@ public void initialize() { .workQueueCapacity(Integer.MAX_VALUE) .isDaemonThread(true) .build(); - pingScheduler.schedule(Task.builder() + this.pingScheduler.schedule(Task.builder() .runnable(this::pingServer) .policy(ExecutePolicy.fixedDelay(DELAY_MILLIS)) .build(), DELAY_MILLIS); @@ -375,4 +380,16 @@ private boolean waitInitialized() { } return this.initialized; } + + @Override + public void close() throws IOException { + this.subscription.cancel(); + try { + this.pingScheduler.shutdown(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } + log.info("Close MCP client. [name={}, sessionId={}]", this.name, this.sessionId); + } } diff --git a/framework/fel/java/plugins/tool-mcp-test/src/main/java/modelengine/fel/tool/mcp/test/TestController.java b/framework/fel/java/plugins/tool-mcp-test/src/main/java/modelengine/fel/tool/mcp/test/TestController.java index a43a186f..f6cb6b39 100644 --- a/framework/fel/java/plugins/tool-mcp-test/src/main/java/modelengine/fel/tool/mcp/test/TestController.java +++ b/framework/fel/java/plugins/tool-mcp-test/src/main/java/modelengine/fel/tool/mcp/test/TestController.java @@ -16,6 +16,7 @@ import modelengine.fit.http.annotation.RequestQuery; import modelengine.fitframework.annotation.Component; +import java.io.IOException; import java.util.List; import java.util.Map; @@ -55,6 +56,22 @@ public String initialize(@RequestQuery(name = "baseUri") String baseUri, return "Initialized"; } + /** + * Closes the MCP client and releases any resources associated with it. + * This method ensures that the MCP client is properly closed and resources are released. + * + * @return A string indicating that the close operation was successful. + */ + @PostMapping(path = "/close") + public String close() { + try { + this.client.close(); + } catch (IOException e) { + throw new IllegalStateException("Failed to close.", e); + } + return "Closed"; + } + /** * Retrieves a list of available tools from the MCP server. * This method calls the MCP client to fetch the list of tools and returns it to the caller. diff --git a/framework/fel/java/services/tool-mcp-client-service/src/main/java/modelengine/fel/tool/mcp/client/McpClient.java b/framework/fel/java/services/tool-mcp-client-service/src/main/java/modelengine/fel/tool/mcp/client/McpClient.java index 9f75bbfd..6a50201b 100644 --- a/framework/fel/java/services/tool-mcp-client-service/src/main/java/modelengine/fel/tool/mcp/client/McpClient.java +++ b/framework/fel/java/services/tool-mcp-client-service/src/main/java/modelengine/fel/tool/mcp/client/McpClient.java @@ -8,6 +8,7 @@ import modelengine.fel.tool.mcp.entity.Tool; +import java.io.Closeable; import java.util.List; import java.util.Map; @@ -20,7 +21,7 @@ * @author 季聿阶 * @since 2025-05-21 */ -public interface McpClient { +public interface McpClient extends Closeable { /** * Initializes the MCP Client. */