From cd07c0d5ba1b9dcbae41c8cc3d031bfc67d22b94 Mon Sep 17 00:00:00 2001 From: CodeCaster Date: Thu, 29 May 2025 11:22:21 +0800 Subject: [PATCH] [fel] Add configurable ping interval - Introduced configurable ping interval via constructor and configuration (application.yml) with a fallback default value (15,000 ms). - Moved ping scheduler creation to after initialization to ensure valid session state before sending pings. - Added parameter validation in DefaultMcpClient constructor using notNull and notBlank checks for better failure visibility. - Propagated ping interval configuration through DefaultMcpClientFactory using @Value injection. - Improved testability and lifecycle management by decoupling scheduler setup from constructor logic. This change increases flexibility and robustness of the MCP client, especially in different deployment and test environments. --- .../mcp/client/support/DefaultMcpClient.java | 44 ++++++++++--------- .../support/DefaultMcpClientFactory.java | 13 ++++-- .../src/main/resources/application.yml | 6 ++- 3 files changed, 39 insertions(+), 24 deletions(-) diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java index 58bcd787..66eb241d 100644 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java @@ -6,6 +6,8 @@ package modelengine.fel.tool.mcp.client.support; +import static modelengine.fitframework.inspection.Validation.notBlank; +import static modelengine.fitframework.inspection.Validation.notNull; import static modelengine.fitframework.util.ObjectUtils.cast; import modelengine.fel.tool.mcp.client.McpClient; @@ -57,7 +59,6 @@ */ public class DefaultMcpClient implements McpClient { private static final Logger log = Logger.get(DefaultMcpClient.class); - private static final long DELAY_MILLIS = 30_000L; private final ObjectSerializer jsonSerializer; private final HttpClassicClient client; @@ -65,6 +66,7 @@ public class DefaultMcpClient implements McpClient { private final String sseEndpoint; private final String name; private final AtomicLong id = new AtomicLong(0); + private final long pingInterval; private volatile String messageEndpoint; private volatile String sessionId; @@ -88,14 +90,16 @@ public class DefaultMcpClient implements McpClient { * @param client The HTTP client used for communication with the MCP server. * @param baseUri The base URI of the MCP server. * @param sseEndpoint The endpoint for the Server-Sent Events (SSE) connection. + * @param pingInterval The interval for sending ping messages to the MCP server. Unit: milliseconds. */ public DefaultMcpClient(ObjectSerializer jsonSerializer, HttpClassicClient client, String baseUri, - String sseEndpoint) { - this.jsonSerializer = jsonSerializer; - this.client = client; - this.baseUri = baseUri; - this.sseEndpoint = sseEndpoint; + String sseEndpoint, long pingInterval) { + this.jsonSerializer = notNull(jsonSerializer, "The json serializer cannot be null."); + this.client = notNull(client, "The http client cannot be null."); + this.baseUri = notBlank(baseUri, "The MCP server base URI cannot be blank."); + this.sseEndpoint = notBlank(sseEndpoint, "The MCP server SSE endpoint cannot be blank."); this.name = UuidUtils.randomUuidString(); + this.pingInterval = pingInterval > 0 ? pingInterval : 15_000; } @Override @@ -126,20 +130,6 @@ public void initialize() { (subscription, textEvent) -> this.consumeTextEvent(textEvent), subscription -> log.info("SSE channel is completed."), (subscription, cause) -> log.error("SSE channel is failed.", cause)); - this.pingScheduler = ThreadPoolScheduler.custom() - .threadPoolName("mcp-client-ping-" + this.name) - .awaitTermination(3, TimeUnit.SECONDS) - .isImmediateShutdown(true) - .corePoolSize(1) - .maximumPoolSize(1) - .keepAliveTime(60, TimeUnit.SECONDS) - .workQueueCapacity(Integer.MAX_VALUE) - .isDaemonThread(true) - .build(); - this.pingScheduler.schedule(Task.builder() - .runnable(this::pingServer) - .policy(ExecutePolicy.fixedDelay(DELAY_MILLIS)) - .build(), DELAY_MILLIS); if (!this.waitInitialized()) { throw new IllegalStateException("Failed to initialize."); } @@ -236,6 +226,20 @@ private void initializedMcpServer(JsonRpc.Response response) { } catch (IOException e) { throw new IllegalStateException(e); } + this.pingScheduler = ThreadPoolScheduler.custom() + .threadPoolName("mcp-client-ping-" + this.name) + .awaitTermination(3, TimeUnit.SECONDS) + .isImmediateShutdown(true) + .corePoolSize(1) + .maximumPoolSize(1) + .keepAliveTime(60, TimeUnit.SECONDS) + .workQueueCapacity(Integer.MAX_VALUE) + .isDaemonThread(true) + .build(); + this.pingScheduler.schedule(Task.builder() + .runnable(this::pingServer) + .policy(ExecutePolicy.fixedDelay(this.pingInterval)) + .build(), this.pingInterval); } private void recordServerSchema(JsonRpc.Response response) { diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java index f35bb082..4af4869e 100644 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java @@ -6,12 +6,15 @@ package modelengine.fel.tool.mcp.client.support; +import static modelengine.fitframework.inspection.Validation.notNull; + import modelengine.fel.tool.mcp.client.McpClient; import modelengine.fel.tool.mcp.client.McpClientFactory; import modelengine.fit.http.client.HttpClassicClient; import modelengine.fit.http.client.HttpClassicClientFactory; import modelengine.fitframework.annotation.Component; import modelengine.fitframework.annotation.Fit; +import modelengine.fitframework.annotation.Value; import modelengine.fitframework.serialization.ObjectSerializer; /** @@ -26,25 +29,29 @@ public class DefaultMcpClientFactory implements McpClientFactory { private final HttpClassicClient client; private final ObjectSerializer jsonSerializer; + private final long pingInterval; /** * Constructs a new instance of the DefaultMcpClientFactory. * * @param clientFactory The factory used to create the HTTP client. * @param jsonSerializer The JSON serializer used for serialization and deserialization. + * @param pingInterval The interval between ping requests. Units: milliseconds. */ public DefaultMcpClientFactory(HttpClassicClientFactory clientFactory, - @Fit(alias = "json") ObjectSerializer jsonSerializer) { + @Fit(alias = "json") ObjectSerializer jsonSerializer, + @Value("${mcp.client.ping-interval}") long pingInterval) { this.client = clientFactory.create(HttpClassicClientFactory.Config.builder() .connectTimeout(30_000) .socketTimeout(60_000) .connectionRequestTimeout(60_000) .build()); - this.jsonSerializer = jsonSerializer; + this.jsonSerializer = notNull(jsonSerializer, "The json serializer cannot be null."); + this.pingInterval = pingInterval; } @Override public McpClient create(String baseUri, String sseEndpoint) { - return new DefaultMcpClient(this.jsonSerializer, this.client, baseUri, sseEndpoint); + return new DefaultMcpClient(this.jsonSerializer, this.client, baseUri, sseEndpoint, this.pingInterval); } } diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/resources/application.yml b/framework/fel/java/plugins/tool-mcp-client/src/main/resources/application.yml index 9c9531e3..77b64240 100644 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/resources/application.yml +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/resources/application.yml @@ -1,4 +1,8 @@ fit: beans: packages: - - 'modelengine.fel.tool.mcp.client.support' \ No newline at end of file + - 'modelengine.fel.tool.mcp.client.support' + +mcp: + client: + ping-interval: 15000 \ No newline at end of file