diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java index 77b32091..76e389d0 100644 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java @@ -22,6 +22,7 @@ import modelengine.fit.http.entity.TextEvent; import modelengine.fit.http.protocol.HttpRequestMethod; import modelengine.fitframework.flowable.Choir; +import modelengine.fitframework.flowable.Subscription; import modelengine.fitframework.log.Logger; import modelengine.fitframework.schedule.ExecutePolicy; import modelengine.fitframework.schedule.Task; @@ -76,6 +77,9 @@ public class DefaultMcpClient implements McpClient { private final Map pendingRequests = new ConcurrentHashMap<>(); private final Map pendingResults = new ConcurrentHashMap<>(); + private volatile Subscription subscription; + private volatile ThreadPoolScheduler pingScheduler; + /** * Constructs a new instance of the DefaultMcpClient. * @@ -113,11 +117,12 @@ public void initialize() { messages.subscribeOn(threadPool).subscribe(subscription -> { log.info("Prepare to create SSE channel."); subscription.request(Long.MAX_VALUE); + this.subscription = subscription; }, (subscription, textEvent) -> this.consumeTextEvent(textEvent), subscription -> log.info("SSE channel is completed."), (subscription, cause) -> log.error("SSE channel is failed.", cause)); - ThreadPoolScheduler pingScheduler = ThreadPoolScheduler.custom() + this.pingScheduler = ThreadPoolScheduler.custom() .threadPoolName("mcp-client-ping-" + this.name) .awaitTermination(3, TimeUnit.SECONDS) .isImmediateShutdown(true) @@ -127,7 +132,7 @@ public void initialize() { .workQueueCapacity(Integer.MAX_VALUE) .isDaemonThread(true) .build(); - pingScheduler.schedule(Task.builder() + this.pingScheduler.schedule(Task.builder() .runnable(this::pingServer) .policy(ExecutePolicy.fixedDelay(DELAY_MILLIS)) .build(), DELAY_MILLIS); @@ -375,4 +380,16 @@ private boolean waitInitialized() { } return this.initialized; } + + @Override + public void close() throws IOException { + this.subscription.cancel(); + try { + this.pingScheduler.shutdown(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } + log.info("Close MCP client. [name={}, sessionId={}]", this.name, this.sessionId); + } } diff --git a/framework/fel/java/plugins/tool-mcp-test/src/main/java/modelengine/fel/tool/mcp/test/TestController.java b/framework/fel/java/plugins/tool-mcp-test/src/main/java/modelengine/fel/tool/mcp/test/TestController.java index a43a186f..f6cb6b39 100644 --- a/framework/fel/java/plugins/tool-mcp-test/src/main/java/modelengine/fel/tool/mcp/test/TestController.java +++ b/framework/fel/java/plugins/tool-mcp-test/src/main/java/modelengine/fel/tool/mcp/test/TestController.java @@ -16,6 +16,7 @@ import modelengine.fit.http.annotation.RequestQuery; import modelengine.fitframework.annotation.Component; +import java.io.IOException; import java.util.List; import java.util.Map; @@ -55,6 +56,22 @@ public String initialize(@RequestQuery(name = "baseUri") String baseUri, return "Initialized"; } + /** + * Closes the MCP client and releases any resources associated with it. + * This method ensures that the MCP client is properly closed and resources are released. + * + * @return A string indicating that the close operation was successful. + */ + @PostMapping(path = "/close") + public String close() { + try { + this.client.close(); + } catch (IOException e) { + throw new IllegalStateException("Failed to close.", e); + } + return "Closed"; + } + /** * Retrieves a list of available tools from the MCP server. * This method calls the MCP client to fetch the list of tools and returns it to the caller. diff --git a/framework/fel/java/services/tool-mcp-client-service/src/main/java/modelengine/fel/tool/mcp/client/McpClient.java b/framework/fel/java/services/tool-mcp-client-service/src/main/java/modelengine/fel/tool/mcp/client/McpClient.java index 9f75bbfd..6a50201b 100644 --- a/framework/fel/java/services/tool-mcp-client-service/src/main/java/modelengine/fel/tool/mcp/client/McpClient.java +++ b/framework/fel/java/services/tool-mcp-client-service/src/main/java/modelengine/fel/tool/mcp/client/McpClient.java @@ -8,6 +8,7 @@ import modelengine.fel.tool.mcp.entity.Tool; +import java.io.Closeable; import java.util.List; import java.util.Map; @@ -20,7 +21,7 @@ * @author 季聿阶 * @since 2025-05-21 */ -public interface McpClient { +public interface McpClient extends Closeable { /** * Initializes the MCP Client. */