Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

package modelengine.fel.tool.mcp.client.support;

import static modelengine.fitframework.inspection.Validation.notBlank;
import static modelengine.fitframework.inspection.Validation.notNull;
import static modelengine.fitframework.util.ObjectUtils.cast;

import modelengine.fel.tool.mcp.client.McpClient;
Expand Down Expand Up @@ -57,14 +59,14 @@
*/
public class DefaultMcpClient implements McpClient {
private static final Logger log = Logger.get(DefaultMcpClient.class);
private static final long DELAY_MILLIS = 30_000L;

private final ObjectSerializer jsonSerializer;
private final HttpClassicClient client;
private final String baseUri;
private final String sseEndpoint;
private final String name;
private final AtomicLong id = new AtomicLong(0);
private final long pingInterval;

private volatile String messageEndpoint;
private volatile String sessionId;
Expand All @@ -88,14 +90,16 @@ public class DefaultMcpClient implements McpClient {
* @param client The HTTP client used for communication with the MCP server.
* @param baseUri The base URI of the MCP server.
* @param sseEndpoint The endpoint for the Server-Sent Events (SSE) connection.
* @param pingInterval The interval for sending ping messages to the MCP server. Unit: milliseconds.
*/
public DefaultMcpClient(ObjectSerializer jsonSerializer, HttpClassicClient client, String baseUri,
String sseEndpoint) {
this.jsonSerializer = jsonSerializer;
this.client = client;
this.baseUri = baseUri;
this.sseEndpoint = sseEndpoint;
String sseEndpoint, long pingInterval) {
this.jsonSerializer = notNull(jsonSerializer, "The json serializer cannot be null.");
this.client = notNull(client, "The http client cannot be null.");
this.baseUri = notBlank(baseUri, "The MCP server base URI cannot be blank.");
this.sseEndpoint = notBlank(sseEndpoint, "The MCP server SSE endpoint cannot be blank.");
this.name = UuidUtils.randomUuidString();
this.pingInterval = pingInterval > 0 ? pingInterval : 15_000;
}

@Override
Expand Down Expand Up @@ -126,20 +130,6 @@ public void initialize() {
(subscription, textEvent) -> this.consumeTextEvent(textEvent),
subscription -> log.info("SSE channel is completed."),
(subscription, cause) -> log.error("SSE channel is failed.", cause));
this.pingScheduler = ThreadPoolScheduler.custom()
.threadPoolName("mcp-client-ping-" + this.name)
.awaitTermination(3, TimeUnit.SECONDS)
.isImmediateShutdown(true)
.corePoolSize(1)
.maximumPoolSize(1)
.keepAliveTime(60, TimeUnit.SECONDS)
.workQueueCapacity(Integer.MAX_VALUE)
.isDaemonThread(true)
.build();
this.pingScheduler.schedule(Task.builder()
.runnable(this::pingServer)
.policy(ExecutePolicy.fixedDelay(DELAY_MILLIS))
.build(), DELAY_MILLIS);
if (!this.waitInitialized()) {
throw new IllegalStateException("Failed to initialize.");
}
Expand Down Expand Up @@ -236,6 +226,20 @@ private void initializedMcpServer(JsonRpc.Response<Long> response) {
} catch (IOException e) {
throw new IllegalStateException(e);
}
this.pingScheduler = ThreadPoolScheduler.custom()
.threadPoolName("mcp-client-ping-" + this.name)
.awaitTermination(3, TimeUnit.SECONDS)
.isImmediateShutdown(true)
.corePoolSize(1)
.maximumPoolSize(1)
.keepAliveTime(60, TimeUnit.SECONDS)
.workQueueCapacity(Integer.MAX_VALUE)
.isDaemonThread(true)
.build();
this.pingScheduler.schedule(Task.builder()
.runnable(this::pingServer)
.policy(ExecutePolicy.fixedDelay(this.pingInterval))
.build(), this.pingInterval);
}

private void recordServerSchema(JsonRpc.Response<Long> response) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@

package modelengine.fel.tool.mcp.client.support;

import static modelengine.fitframework.inspection.Validation.notNull;

import modelengine.fel.tool.mcp.client.McpClient;
import modelengine.fel.tool.mcp.client.McpClientFactory;
import modelengine.fit.http.client.HttpClassicClient;
import modelengine.fit.http.client.HttpClassicClientFactory;
import modelengine.fitframework.annotation.Component;
import modelengine.fitframework.annotation.Fit;
import modelengine.fitframework.annotation.Value;
import modelengine.fitframework.serialization.ObjectSerializer;

/**
Expand All @@ -26,25 +29,29 @@
public class DefaultMcpClientFactory implements McpClientFactory {
private final HttpClassicClient client;
private final ObjectSerializer jsonSerializer;
private final long pingInterval;

/**
* Constructs a new instance of the DefaultMcpClientFactory.
*
* @param clientFactory The factory used to create the HTTP client.
* @param jsonSerializer The JSON serializer used for serialization and deserialization.
* @param pingInterval The interval between ping requests. Units: milliseconds.
*/
public DefaultMcpClientFactory(HttpClassicClientFactory clientFactory,
@Fit(alias = "json") ObjectSerializer jsonSerializer) {
@Fit(alias = "json") ObjectSerializer jsonSerializer,
@Value("${mcp.client.ping-interval}") long pingInterval) {
this.client = clientFactory.create(HttpClassicClientFactory.Config.builder()
.connectTimeout(30_000)
.socketTimeout(60_000)
.connectionRequestTimeout(60_000)
.build());
this.jsonSerializer = jsonSerializer;
this.jsonSerializer = notNull(jsonSerializer, "The json serializer cannot be null.");
this.pingInterval = pingInterval;
}

@Override
public McpClient create(String baseUri, String sseEndpoint) {
return new DefaultMcpClient(this.jsonSerializer, this.client, baseUri, sseEndpoint);
return new DefaultMcpClient(this.jsonSerializer, this.client, baseUri, sseEndpoint, this.pingInterval);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
fit:
beans:
packages:
- 'modelengine.fel.tool.mcp.client.support'
- 'modelengine.fel.tool.mcp.client.support'

mcp:
client:
ping-interval: 15000
Loading