diff --git a/framework/fel/java/plugins/tool-mcp-client/pom.xml b/framework/fel/java/plugins/tool-mcp-client/pom.xml index a8d490b5..df882d44 100644 --- a/framework/fel/java/plugins/tool-mcp-client/pom.xml +++ b/framework/fel/java/plugins/tool-mcp-client/pom.xml @@ -37,6 +37,11 @@ org.fitframework.fel tool-mcp-client-service + + io.modelcontextprotocol.sdk + mcp + 0.15.0 + diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java deleted file mode 100644 index 0b8c1faa..00000000 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java +++ /dev/null @@ -1,429 +0,0 @@ -/*--------------------------------------------------------------------------------------------- - * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. - * This file is a part of the ModelEngine Project. - * Licensed under the MIT License. See License.txt in the project root for license information. - *--------------------------------------------------------------------------------------------*/ - -package modelengine.fel.tool.mcp.client.support; - -import static modelengine.fitframework.inspection.Validation.notBlank; -import static modelengine.fitframework.inspection.Validation.notNull; -import static modelengine.fitframework.util.ObjectUtils.cast; - -import modelengine.fel.tool.mcp.client.McpClient; -import modelengine.fel.tool.mcp.entity.ClientSchema; -import modelengine.fel.tool.mcp.entity.Event; -import modelengine.fel.tool.mcp.entity.JsonRpc; -import modelengine.fel.tool.mcp.entity.Method; -import modelengine.fel.tool.mcp.entity.ServerSchema; -import modelengine.fel.tool.mcp.entity.Tool; -import modelengine.fit.http.client.HttpClassicClient; -import modelengine.fit.http.client.HttpClassicClientRequest; -import modelengine.fit.http.client.HttpClassicClientResponse; -import modelengine.fit.http.entity.Entity; -import modelengine.fit.http.entity.TextEvent; -import modelengine.fit.http.protocol.HttpRequestMethod; -import modelengine.fitframework.flowable.Choir; -import modelengine.fitframework.flowable.Subscription; -import modelengine.fitframework.log.Logger; -import modelengine.fitframework.schedule.ExecutePolicy; -import modelengine.fitframework.schedule.Task; -import modelengine.fitframework.schedule.ThreadPoolExecutor; -import modelengine.fitframework.schedule.ThreadPoolScheduler; -import modelengine.fitframework.serialization.ObjectSerializer; -import modelengine.fitframework.util.CollectionUtils; -import modelengine.fitframework.util.LockUtils; -import modelengine.fitframework.util.MapBuilder; -import modelengine.fitframework.util.ObjectUtils; -import modelengine.fitframework.util.StringUtils; -import modelengine.fitframework.util.ThreadUtils; -import modelengine.fitframework.util.UuidUtils; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; -import java.util.function.Consumer; - -/** - * Represents a default implementation of the MCP client, responsible for interacting with the MCP server. - * This class provides methods for initializing the client, retrieving tools, and calling tools. - * - * @author 季聿阶 - * @since 2025-05-21 - */ -public class DefaultMcpClient implements McpClient { - private static final Logger log = Logger.get(DefaultMcpClient.class); - - private final ObjectSerializer jsonSerializer; - private final HttpClassicClient client; - private final String baseUri; - private final String sseEndpoint; - private final String name; - private final AtomicLong id = new AtomicLong(0); - private final long pingInterval; - - private volatile String messageEndpoint; - private volatile String sessionId; - private volatile ServerSchema serverSchema; - private volatile boolean initialized = false; - private volatile boolean closed = false; - private final Object initializedLock = LockUtils.newSynchronizedLock(); - private final Map>> responseConsumers = new ConcurrentHashMap<>(); - private final Map pendingRequests = new ConcurrentHashMap<>(); - private final Map pendingResults = new ConcurrentHashMap<>(); - - private volatile Subscription subscription; - private volatile ThreadPoolScheduler pingScheduler; - - /** - * Constructs a new instance of the DefaultMcpClient. - * - * @param jsonSerializer The serializer used for JSON serialization and deserialization. - * @param client The HTTP client used for communication with the MCP server. - * @param baseUri The base URI of the MCP server. - * @param sseEndpoint The endpoint for the Server-Sent Events (SSE) connection. - * @param pingInterval The interval for sending ping messages to the MCP server. Unit: milliseconds. - */ - public DefaultMcpClient(ObjectSerializer jsonSerializer, HttpClassicClient client, String baseUri, - String sseEndpoint, long pingInterval) { - this.jsonSerializer = notNull(jsonSerializer, "The json serializer cannot be null."); - this.client = notNull(client, "The http client cannot be null."); - this.baseUri = notBlank(baseUri, "The MCP server base URI cannot be blank."); - this.sseEndpoint = notBlank(sseEndpoint, "The MCP server SSE endpoint cannot be blank."); - this.name = UuidUtils.randomUuidString(); - this.pingInterval = pingInterval > 0 ? pingInterval : 15_000; - } - - @Override - public void initialize() { - if (this.closed) { - throw new IllegalStateException("The MCP client is closed."); - } - HttpClassicClientRequest request = - this.client.createRequest(HttpRequestMethod.GET, this.baseUri + this.sseEndpoint); - Choir messages = this.client.exchangeStream(request, TextEvent.class); - ThreadPoolExecutor threadPool = ThreadPoolExecutor.custom() - .threadPoolName("mcp-client-" + this.name) - .awaitTermination(3, TimeUnit.SECONDS) - .isImmediateShutdown(true) - .corePoolSize(1) - .maximumPoolSize(1) - .keepAliveTime(60, TimeUnit.SECONDS) - .workQueueCapacity(Integer.MAX_VALUE) - .isDaemonThread(true) - .exceptionHandler((thread, cause) -> log.warn("Exception in MCP client pool.", cause)) - .rejectedExecutionHandler(new java.util.concurrent.ThreadPoolExecutor.AbortPolicy()) - .build(); - messages.subscribeOn(threadPool).subscribe(subscription -> { - log.info("Prepare to create SSE channel."); - this.subscription = subscription; - subscription.request(Long.MAX_VALUE); - }, - (subscription, textEvent) -> this.consumeTextEvent(textEvent), - subscription -> log.info("SSE channel is completed."), - (subscription, cause) -> log.error("SSE channel is failed.", cause)); - if (!this.waitInitialized()) { - throw new IllegalStateException("Failed to initialize."); - } - } - - private void consumeTextEvent(TextEvent textEvent) { - log.info("Receive message from MCP server. [id={}, event={}, message={}]", - textEvent.id(), - textEvent.event(), - textEvent.data()); - if (StringUtils.isBlank(textEvent.event()) || StringUtils.isBlank((String) textEvent.data())) { - return; - } - if (Objects.equals(textEvent.event(), Event.ENDPOINT.code())) { - this.initializeMcpServer(textEvent); - return; - } - Map jsonRpc = this.jsonSerializer.deserialize((String) textEvent.data(), Object.class); - Object messageId = jsonRpc.get("id"); - if (messageId == null) { - // Notification message, ignore. - return; - } - long actualId = Long.parseLong(messageId.toString()); - Consumer> consumer = this.responseConsumers.remove(actualId); - if (consumer == null) { - log.info("No consumer registered. [id={}]", actualId); - return; - } - Object error = jsonRpc.get("error"); - JsonRpc.Response response; - if (error == null) { - response = JsonRpc.createResponse(actualId, jsonRpc.get("result")); - } else { - response = JsonRpc.createResponseWithError(actualId, error); - } - consumer.accept(response); - } - - private void pingServer() { - if (this.isNotInitialized()) { - log.info("MCP client is not initialized and {} method will be delayed.", Method.PING.code()); - return; - } - this.post2McpServer(Method.PING, null, null); - } - - private void initializeMcpServer(TextEvent textEvent) { - this.messageEndpoint = textEvent.data().toString(); - ClientSchema schema = new ClientSchema("2024-11-05", - new ClientSchema.Capabilities(), - new ClientSchema.Info("FIT MCP Client", "3.6.0-SNAPSHOT")); - this.post2McpServer(Method.INITIALIZE, schema, (request, currentId) -> { - this.sessionId = request.queries() - .first("session_id") - .orElseThrow(() -> new IllegalStateException("The session_id cannot be empty.")); - this.responseConsumers.put(currentId, this::initializedMcpServer); - }); - } - - private void initializedMcpServer(JsonRpc.Response response) { - if (response.error() != null) { - log.error("Abort send {} method to MCP server. [sessionId={}, error={}]", - Method.NOTIFICATION_INITIALIZED.code(), - this.sessionId, - response.error()); - throw new IllegalStateException(response.error().toString()); - } - this.recordServerSchema(response); - HttpClassicClientRequest request = - this.client.createRequest(HttpRequestMethod.POST, this.baseUri + this.messageEndpoint); - JsonRpc.Notification notification = JsonRpc.createNotification(Method.NOTIFICATION_INITIALIZED.code()); - request.entity(Entity.createObject(request, notification)); - log.info("Send {} method to MCP server. [sessionId={}, notification={}]", - Method.NOTIFICATION_INITIALIZED.code(), - this.sessionId, - notification); - try (HttpClassicClientResponse exchange = request.exchange(Object.class)) { - if (exchange.statusCode() >= 200 && exchange.statusCode() < 300) { - log.info("Send {} method to MCP server successfully. [sessionId={}, statusCode={}]", - Method.NOTIFICATION_INITIALIZED.code(), - this.sessionId, - exchange.statusCode()); - } else { - log.error("Failed to {} MCP server. [sessionId={}, statusCode={}]", - Method.NOTIFICATION_INITIALIZED.code(), - this.sessionId, - exchange.statusCode()); - } - } catch (IOException e) { - throw new IllegalStateException(e); - } - synchronized (this.initializedLock) { - this.initialized = true; - this.initializedLock.notifyAll(); - } - this.pingScheduler = ThreadPoolScheduler.custom() - .threadPoolName("mcp-client-ping-" + this.name) - .awaitTermination(3, TimeUnit.SECONDS) - .isImmediateShutdown(true) - .corePoolSize(1) - .maximumPoolSize(1) - .keepAliveTime(60, TimeUnit.SECONDS) - .workQueueCapacity(Integer.MAX_VALUE) - .isDaemonThread(true) - .build(); - this.pingScheduler.schedule(Task.builder() - .runnable(this::pingServer) - .policy(ExecutePolicy.fixedDelay(this.pingInterval)) - .build(), this.pingInterval); - } - - private void recordServerSchema(JsonRpc.Response response) { - Map mapResult = cast(response.result()); - this.serverSchema = ServerSchema.create(mapResult); - log.info("MCP server has initialized. [server={}]", this.serverSchema); - } - - @Override - public List getTools() { - if (this.closed) { - throw new IllegalStateException("The MCP client is closed."); - } - if (this.isNotInitialized()) { - throw new IllegalStateException("MCP client is not initialized. Please wait a moment."); - } - long requestId = this.post2McpServer(Method.TOOLS_LIST, null, (request, currentId) -> { - this.responseConsumers.put(currentId, this::getTools0); - this.pendingRequests.put(currentId, true); - }); - while (this.pendingRequests.get(requestId)) { - ThreadUtils.sleep(100); - } - Result result = this.pendingResults.remove(requestId); - this.pendingRequests.remove(requestId); - if (result.isSuccess()) { - return ObjectUtils.cast(result.getContent()); - } else { - throw new IllegalStateException(result.getError()); - } - } - - private void getTools0(JsonRpc.Response response) { - if (response.error() != null) { - String error = StringUtils.format("Failed to get tools list from MCP server. [sessionId={0}, response={1}]", - this.sessionId, - response); - this.pendingResults.put(response.id(), Result.error(error)); - this.pendingRequests.put(response.id(), false); - return; - } - Map result = cast(response.result()); - List> rawTools = cast(result.get("tools")); - List tools = new ArrayList<>(rawTools.stream() - .map(rawTool -> ObjectUtils.toCustomObject(rawTool, Tool.class)) - .toList()); - this.pendingResults.put(response.id(), Result.success(tools)); - this.pendingRequests.put(response.id(), false); - } - - @Override - public Object callTool(String name, Map arguments) { - if (this.closed) { - throw new IllegalStateException("The MCP client is closed."); - } - if (this.isNotInitialized()) { - throw new IllegalStateException("MCP client is not initialized. Please wait a moment."); - } - long requestId = this.post2McpServer(Method.TOOLS_CALL, - MapBuilder.get().put("name", name).put("arguments", arguments).build(), - (request, currentId) -> { - this.responseConsumers.put(currentId, this::callTools0); - this.pendingRequests.put(currentId, true); - }); - while (this.pendingRequests.get(requestId)) { - ThreadUtils.sleep(100); - } - Result result = this.pendingResults.remove(requestId); - this.pendingRequests.remove(requestId); - if (result.isSuccess()) { - return result.getContent(); - } else { - throw new IllegalStateException(result.getError()); - } - } - - private void callTools0(JsonRpc.Response response) { - if (response.error() != null) { - String error = StringUtils.format("Failed to call tool from MCP server. [sessionId={0}, response={1}]", - this.sessionId, - response); - this.pendingResults.put(response.id(), Result.error(error)); - this.pendingRequests.put(response.id(), false); - return; - } - Map result = cast(response.result()); - boolean isError = cast(result.get("isError")); - if (isError) { - String error = StringUtils.format("Failed to call tool from MCP server. [sessionId={0}, result={1}]", - this.sessionId, - result); - this.pendingResults.put(response.id(), Result.error(error)); - this.pendingRequests.put(response.id(), false); - return; - } - List> rawContents = cast(result.get("content")); - if (CollectionUtils.isEmpty(rawContents)) { - String error = StringUtils.format( - "Failed to call tool from MCP server: no result returned. [sessionId={0}, result={1}]", - this.sessionId, - result); - this.pendingResults.put(response.id(), Result.error(error)); - this.pendingRequests.put(response.id(), false); - return; - } - Map rawContent = rawContents.get(0); - this.pendingResults.put(response.id(), Result.success(rawContent.get("text"))); - this.pendingRequests.put(response.id(), false); - } - - private long post2McpServer(Method method, Object requestParams, - BiConsumer requestConsumer) { - HttpClassicClientRequest request = - this.client.createRequest(HttpRequestMethod.POST, this.baseUri + this.messageEndpoint); - long currentId = this.getNextId(); - if (requestConsumer != null) { - requestConsumer.accept(request, currentId); - } - JsonRpc.Request rpcRequest = JsonRpc.createRequest(currentId, method.code(), requestParams); - request.entity(Entity.createObject(request, rpcRequest)); - log.info("Send {} method to MCP server. [sessionId={}, request={}]", method.code(), this.sessionId, rpcRequest); - try (HttpClassicClientResponse exchange = request.exchange(Object.class)) { - if (exchange.statusCode() >= 200 && exchange.statusCode() < 300) { - log.info("Send {} method to MCP server successfully. [sessionId={}, statusCode={}]", - method.code(), - this.sessionId, - exchange.statusCode()); - } else { - log.error("Failed to {} MCP server. [sessionId={}, statusCode={}]", - method.code(), - this.sessionId, - exchange.statusCode()); - } - } catch (IOException e) { - throw new IllegalStateException(StringUtils.format("Failed to {0} MCP server. [sessionId={1}]", - method.code(), - this.sessionId), e); - } - return currentId; - } - - private long getNextId() { - long tmpId = this.id.getAndIncrement(); - if (tmpId < 0) { - this.id.set(0); - return 0; - } - return tmpId; - } - - private boolean isNotInitialized() { - return !this.initialized; - } - - private boolean waitInitialized() { - if (this.initialized) { - return true; - } - synchronized (this.initializedLock) { - if (this.initialized) { - return true; - } - try { - this.initializedLock.wait(60_000L); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException("Failed to initialize.", e); - } - } - return this.initialized; - } - - @Override - public void close() throws IOException { - this.closed = true; - if (this.subscription != null) { - this.subscription.cancel(); - } - try { - if (this.pingScheduler != null) { - this.pingScheduler.shutdown(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException(e); - } - log.info("Close MCP client. [name={}, sessionId={}]", this.name, this.sessionId); - } -} diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java index 4af4869e..330a3172 100644 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java @@ -6,52 +6,33 @@ package modelengine.fel.tool.mcp.client.support; -import static modelengine.fitframework.inspection.Validation.notNull; - import modelengine.fel.tool.mcp.client.McpClient; import modelengine.fel.tool.mcp.client.McpClientFactory; -import modelengine.fit.http.client.HttpClassicClient; -import modelengine.fit.http.client.HttpClassicClientFactory; import modelengine.fitframework.annotation.Component; -import modelengine.fitframework.annotation.Fit; import modelengine.fitframework.annotation.Value; -import modelengine.fitframework.serialization.ObjectSerializer; /** - * Represents a factory for creating instances of the DefaultMcpClient. - * This class is responsible for initializing and configuring the HTTP client and JSON serializer - * required by the DefaultMcpClient. + * Represents a factory for creating instances of the {@link DefaultMcpStreamableClient}. + * This class is responsible for initializing and configuring. * * @author 季聿阶 * @since 2025-05-21 */ @Component public class DefaultMcpClientFactory implements McpClientFactory { - private final HttpClassicClient client; - private final ObjectSerializer jsonSerializer; - private final long pingInterval; + private final int requestTimeoutSeconds; /** * Constructs a new instance of the DefaultMcpClientFactory. * - * @param clientFactory The factory used to create the HTTP client. - * @param jsonSerializer The JSON serializer used for serialization and deserialization. - * @param pingInterval The interval between ping requests. Units: milliseconds. + * @param requestTimeoutSeconds The timeout duration of requests. Units: seconds. */ - public DefaultMcpClientFactory(HttpClassicClientFactory clientFactory, - @Fit(alias = "json") ObjectSerializer jsonSerializer, - @Value("${mcp.client.ping-interval}") long pingInterval) { - this.client = clientFactory.create(HttpClassicClientFactory.Config.builder() - .connectTimeout(30_000) - .socketTimeout(60_000) - .connectionRequestTimeout(60_000) - .build()); - this.jsonSerializer = notNull(jsonSerializer, "The json serializer cannot be null."); - this.pingInterval = pingInterval; + public DefaultMcpClientFactory(@Value("${mcp.client.request.timeout-seconds}") int requestTimeoutSeconds) { + this.requestTimeoutSeconds = requestTimeoutSeconds > 0 ? requestTimeoutSeconds : 180; } @Override public McpClient create(String baseUri, String sseEndpoint) { - return new DefaultMcpClient(this.jsonSerializer, this.client, baseUri, sseEndpoint, this.pingInterval); + return new DefaultMcpStreamableClient(baseUri, sseEndpoint, requestTimeoutSeconds); } } diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientLogHandler.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientLogHandler.java new file mode 100644 index 00000000..bffb88df --- /dev/null +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientLogHandler.java @@ -0,0 +1,44 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package modelengine.fel.tool.mcp.client.support; + +import io.modelcontextprotocol.spec.McpSchema; +import modelengine.fitframework.log.Logger; + +/** + * Handles MCP client messages received from MCP server, + * including logging notifications and elicitation requests. + * + * @author 黄可欣 + * @since 2025-11-03 + */ +public class DefaultMcpClientLogHandler { + private static final Logger log = Logger.get(DefaultMcpClientLogHandler.class); + private final String clientId; + + /** + * Constructs a new instance of DefaultMcpClientLogHandler. + * + * @param clientId The unique identifier of the MCP client. + */ + public DefaultMcpClientLogHandler(String clientId) { + this.clientId = clientId; + } + + /** + * Handles logging messages received from the MCP server. + * Includes the client UUID in the log message for tracking. + * + * @param notification The {@link McpSchema.LoggingMessageNotification} containing the log level and data. + */ + public void handleLoggingMessage(McpSchema.LoggingMessageNotification notification) { + log.info("Received logging message from MCP server. [clientId={}, level={}, data={}]", + this.clientId, + notification.level(), + notification.data()); + } +} diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java new file mode 100644 index 00000000..44bca847 --- /dev/null +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java @@ -0,0 +1,289 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package modelengine.fel.tool.mcp.client.support; + +import static modelengine.fitframework.inspection.Validation.notBlank; + +import com.fasterxml.jackson.databind.ObjectMapper; + +import io.modelcontextprotocol.client.McpSyncClient; +import io.modelcontextprotocol.client.transport.HttpClientStreamableHttpTransport; +import io.modelcontextprotocol.json.jackson.JacksonMcpJsonMapper; +import io.modelcontextprotocol.json.schema.jackson.DefaultJsonSchemaValidator; +import io.modelcontextprotocol.spec.McpSchema; +import modelengine.fel.tool.mcp.client.McpClient; +import modelengine.fel.tool.mcp.entity.Tool; +import modelengine.fitframework.log.Logger; +import modelengine.fitframework.util.StringUtils; +import modelengine.fitframework.util.UuidUtils; + +import java.io.IOException; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * A default implementation of the MCP client that uses the MCP SDK's streamable HTTP transport. + * + *

Not thread-safe: This class is not thread-safe. External synchronization is required + * if instances are to be accessed by multiple threads concurrently. + * + * @author 黄可欣 + * @since 2025-11-03 + */ +public class DefaultMcpStreamableClient implements McpClient { + private static final Logger log = Logger.get(DefaultMcpStreamableClient.class); + + private final String clientId; + private final McpSyncClient mcpSyncClient; + private final DefaultMcpClientLogHandler logHandler; + + private volatile boolean initialized = false; + private volatile boolean closed = false; + + /** + * Constructs a new instance of the DefaultMcpStreamableClient. + * + * @param baseUri The base URI of the MCP server. + * @param sseEndpoint The endpoint for the Server-Sent Events (SSE) connection. + * @param requestTimeoutSeconds The timeout duration of requests. Units: seconds. + */ + public DefaultMcpStreamableClient(String baseUri, String sseEndpoint, int requestTimeoutSeconds) { + this.clientId = UuidUtils.randomUuidString(); + notBlank(baseUri, "The MCP server base URI cannot be blank."); + notBlank(sseEndpoint, "The MCP server SSE endpoint cannot be blank."); + log.info("Creating MCP client. [clientId={}, baseUri={}]", this.clientId, baseUri); + ObjectMapper mapper = new ObjectMapper(); + HttpClientStreamableHttpTransport transport = HttpClientStreamableHttpTransport.builder(baseUri) + .jsonMapper(new JacksonMcpJsonMapper(mapper)) + .endpoint(sseEndpoint) + .build(); + + this.logHandler = new DefaultMcpClientLogHandler(this.clientId); + this.mcpSyncClient = io.modelcontextprotocol.client.McpClient.sync(transport) + .requestTimeout(Duration.ofSeconds(requestTimeoutSeconds)) + .capabilities(McpSchema.ClientCapabilities.builder().build()) + .loggingConsumer(this.logHandler::handleLoggingMessage) + .jsonSchemaValidator(new DefaultJsonSchemaValidator(mapper)) + .build(); + } + + @Override + public String getClientId() { + return this.clientId; + } + + /** + * Initializes the MCP client connection. + * + * @throws IllegalStateException if the client has already been closed. + */ + @Override + public void initialize() { + this.ensureNotClosed(); + this.mcpSyncClient.initialize(); + this.initialized = true; + log.info("MCP client initialized successfully. [clientId={}]", this.clientId); + } + + /** + * Retrieves the list of available tools from the MCP server. + * + * @return A {@link List} of {@link Tool} objects representing the available tools. + * @throws IllegalStateException if the client is closed, not initialized, or if + * the server request fails. + */ + @Override + public List getTools() { + this.ensureReady(); + try { + McpSchema.ListToolsResult result = this.mcpSyncClient.listTools(); + if (result == null || result.tools() == null) { + log.warn("Failed to get tools list: result is null. [clientId={}]", this.clientId); + throw new IllegalStateException("Failed to get tools list from MCP server: result is null."); + } + + List tools = result.tools().stream().map(this::convertToFelTool).collect(Collectors.toList()); + + log.info("Successfully retrieved tools list. [clientId={}, count={}]", this.clientId, tools.size()); + tools.forEach(tool -> log.debug("Tool information. [name={}, description={}]", + tool.getName(), + tool.getDescription())); + return tools; + } catch (Exception e) { + log.error("Failed to get tools list. [clientId={}, error={}]", this.clientId, e.getMessage()); + throw new IllegalStateException(StringUtils.format("Failed to get tools from MCP server. [error={0}]", + e.getMessage()), e); + } + } + + /** + * Invokes a specific tool on the MCP server with the provided arguments. + * + * @param name The name of the tool to invoke, as a {@link String}. + * @param arguments The arguments to pass to the tool, as a {@link Map} of parameter names to values. + * @return The result of the tool invocation. For text content, returns the text as a {@link String}. + * For image content, returns the {@link McpSchema.ImageContent} object. + * Returns {@code null} if the tool returns empty content. + * @throws IllegalStateException if the client is closed, not initialized, if the tool + * returns an error, or if the server request fails. + */ + @Override + public Object callTool(String name, Map arguments) { + this.ensureReady(); + try { + log.info("Calling tool. [clientId={}, name={}, arguments={}]", this.clientId, name, arguments); + McpSchema.CallToolResult result = + this.mcpSyncClient.callTool(new McpSchema.CallToolRequest(name, arguments)); + + if (result == null) { + log.error("Failed to call tool: result is null. [clientId={}, name={}]", this.clientId, name); + throw new IllegalStateException(StringUtils.format("Failed to call tool: result is null. [name={0}]", + name)); + } + return this.processToolResult(result, name); + } catch (Exception e) { + log.error("Failed to call tool. [clientId={}, name={}, error={}]", this.clientId, name, e.getMessage()); + throw new IllegalStateException(StringUtils.format("Failed to call tool. [name={0}, error={1}]", + name, + e.getMessage()), e); + } + } + + /** + * Processes the tool call result and extracts the content. + * Handles error cases and different content types (text, image, etc.). + * + * @param result The {@link McpSchema.CallToolResult} returned from the tool call. + * @param name The name of the tool that was called. + * @return The extracted content. For text content, returns the text as a {@link String}. + * For image content, returns the {@link McpSchema.ImageContent} object. + * Returns {@code null} if the tool returns empty content. + * @throws IllegalStateException if the tool returns an error. + */ + private Object processToolResult(McpSchema.CallToolResult result, String name) { + if (result.isError() != null && result.isError()) { + String errorDetails = this.extractErrorDetails(result.content()); + log.error("Tool returned an error. [clientId={}, name={}, details={}]", this.clientId, name, errorDetails); + throw new IllegalStateException(StringUtils.format("Tool returned an error. [name={0}, details={1}]", + name, + errorDetails)); + } + + if (result.content() == null || result.content().isEmpty()) { + log.warn("Tool returned empty content. [clientId={}, name={}]", this.clientId, name); + return null; + } + + Object content = result.content().get(0); + if (content instanceof McpSchema.TextContent textContent) { + log.info("Successfully called tool. [clientId={}, name={}, result={}]", + this.clientId, + name, + textContent.text()); + return textContent.text(); + } else if (content instanceof McpSchema.ImageContent imageContent) { + log.info("Successfully called tool: image content. [clientId={}, name={}]", this.clientId, name); + return imageContent; + } else { + log.info("Successfully called tool. [clientId={}, name={}, contentType={}]", + this.clientId, + name, + content.getClass().getSimpleName()); + return content; + } + } + + /** + * Closes the MCP client connection and releases associated resources. + * + *

Note: This method is not thread-safe. Callers should ensure that this method + * is not invoked concurrently from multiple threads. + * + * @throws IOException if an I/O error occurs during the close operation. + */ + @Override + public void close() throws IOException { + this.ensureNotClosed(); + this.closed = true; + this.mcpSyncClient.closeGracefully(); + log.info("MCP client closed. [clientId={}]", this.clientId); + } + + /** + * Converts an MCP SDK Tool to a FEL Tool entity. + * + * @param mcpTool The MCP SDK {@link McpSchema.Tool} to convert. + * @return A FEL {@link Tool} entity with the corresponding name, description, and input schema. + */ + private Tool convertToFelTool(McpSchema.Tool mcpTool) { + Tool tool = new Tool(); + tool.setName(mcpTool.name()); + tool.setDescription(mcpTool.description()); + + // Convert JsonSchema to Map + McpSchema.JsonSchema inputSchema = mcpTool.inputSchema(); + if (inputSchema != null) { + Map schemaMap = new HashMap<>(); + schemaMap.put("type", inputSchema.type()); + if (inputSchema.properties() != null) { + schemaMap.put("properties", inputSchema.properties()); + } + if (inputSchema.required() != null) { + schemaMap.put("required", inputSchema.required()); + } + tool.setInputSchema(schemaMap); + } + + return tool; + } + + /** + * Ensures the MCP client is not closed. + * + * @throws IllegalStateException if the client is closed. + */ + private void ensureNotClosed() { + if (this.closed) { + throw new IllegalStateException(StringUtils.format("The MCP client is already closed. [clientId={0}]", + this.clientId)); + } + } + + /** + * Ensures the MCP client is ready for operations (not closed and initialized). + * + * @throws IllegalStateException if the client is closed or not initialized. + */ + private void ensureReady() { + this.ensureNotClosed(); + if (!this.initialized) { + throw new IllegalStateException(StringUtils.format("MCP client is not initialized. [clientId={0}]", + this.clientId)); + } + } + + /** + * Extracts error details from tool result content. + * + * @param content The content list from the tool result. + * @return The error details as a string. + */ + private String extractErrorDetails(List content) { + if (content != null && !content.isEmpty()) { + McpSchema.Content errorContent = content.get(0); + if (errorContent instanceof McpSchema.TextContent textContent) { + return textContent.text(); + } else { + return errorContent.toString(); + } + } + return ""; + } +} diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/Result.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/Result.java deleted file mode 100644 index 4b655024..00000000 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/Result.java +++ /dev/null @@ -1,72 +0,0 @@ -/*--------------------------------------------------------------------------------------------- - * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. - * This file is a part of the ModelEngine Project. - * Licensed under the MIT License. See License.txt in the project root for license information. - *--------------------------------------------------------------------------------------------*/ - -package modelengine.fel.tool.mcp.client.support; - -/** - * 表示调用 MCP 的结果。 - * - * @author 季聿阶 - * @since 2025-08-04 - */ -public class Result { - private final boolean success; - private final Object content; - private final String error; - - private Result(boolean success, Object content, String error) { - this.success = success; - this.content = content; - this.error = error; - } - - /** - * 创建一个成功的结果。 - * - * @param content 表示成功结果的内容的 {@link Object}。 - * @return 表示成功结果的对象的 {@link Result}。 - */ - public static Result success(Object content) { - return new Result(true, content, null); - } - - /** - * 创建一个失败的结果。 - * - * @param error 表示错误结果的信息的 {@link String}。 - * @return 表示错误结果的对象的 {@link Result}。 - */ - public static Result error(String error) { - return new Result(false, null, error); - } - - /** - * 获取结果是否成功。 - * - * @return 如果结果成功,则返回 {@code true};否则返回 {@code false}。 - */ - public boolean isSuccess() { - return this.success; - } - - /** - * 获取结果内容。 - * - * @return 表示结果内容的 {@link Object}。 - */ - public Object getContent() { - return this.content; - } - - /** - * 获取结果错误信息。 - * - * @return 表示错误信息的 {@link String}。 - */ - public String getError() { - return this.error; - } -} diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/resources/application.yml b/framework/fel/java/plugins/tool-mcp-client/src/main/resources/application.yml index 77b64240..18a50455 100644 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/resources/application.yml +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/resources/application.yml @@ -5,4 +5,5 @@ fit: mcp: client: - ping-interval: 15000 \ No newline at end of file + request: + timeout-seconds: 300 \ No newline at end of file diff --git a/framework/fel/java/plugins/tool-mcp-server/pom.xml b/framework/fel/java/plugins/tool-mcp-server/pom.xml index b6072ea4..c0cc60f0 100644 --- a/framework/fel/java/plugins/tool-mcp-server/pom.xml +++ b/framework/fel/java/plugins/tool-mcp-server/pom.xml @@ -44,7 +44,7 @@ io.modelcontextprotocol.sdk mcp - 0.14.1 + 0.15.0 diff --git a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/transport/FitMcpStreamableServerTransportProvider.java b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/transport/FitMcpStreamableServerTransportProvider.java index 324c427d..9bf5bbfe 100644 --- a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/transport/FitMcpStreamableServerTransportProvider.java +++ b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/transport/FitMcpStreamableServerTransportProvider.java @@ -133,18 +133,21 @@ public void setSessionFactory(McpStreamableServerSession.Factory sessionFactory) @Override public Mono notifyClients(String method, Object params) { if (this.sessions.isEmpty()) { - logger.debug("No active sessions to broadcast message to"); + logger.debug("No active sessions to broadcast message."); return Mono.empty(); } - logger.info("Attempting to broadcast message to {} active sessions", this.sessions.size()); + logger.info("Attempting to broadcast message. [sessionCount={}]", this.sessions.size()); return Mono.fromRunnable(() -> { this.sessions.values().parallelStream().forEach(session -> { try { session.sendNotification(method, params).block(); } catch (Exception e) { - logger.error("Failed to send message to session {}: {}", session.getId(), e.getMessage(), e); + logger.error("Failed to send message to session. [sessionId={}, error={}]", + session.getId(), + e.getMessage(), + e); } }); }); @@ -159,18 +162,21 @@ public Mono notifyClients(String method, Object params) { public Mono closeGracefully() { return Mono.fromRunnable(() -> { this.isClosing = true; - logger.info("Initiating graceful shutdown with {} active sessions", this.sessions.size()); + logger.info("Initiating graceful shutdown. [sessionCount={}]", this.sessions.size()); this.sessions.values().parallelStream().forEach(session -> { try { session.closeGracefully().block(); } catch (Exception e) { - logger.error("Failed to close session {}: {}", session.getId(), e.getMessage(), e); + logger.error("Failed to close session. [sessionId={}, error={}]", + session.getId(), + e.getMessage(), + e); } }); this.sessions.clear(); - logger.info("Graceful shutdown completed"); + logger.info("Graceful shutdown completed."); }).then().doOnSuccess(v -> { if (this.keepAliveScheduler != null) { this.keepAliveScheduler.shutdown(); @@ -204,7 +210,7 @@ public Object handleGet(HttpClassicServerRequest request, HttpClassicServerRespo } String sessionId = request.headers().first(HttpHeaders.MCP_SESSION_ID).orElse(""); McpStreamableServerSession session = this.sessions.get(sessionId); - logger.info("[GET] Handling GET request for session: {}", sessionId); + logger.info("[GET] Receiving GET request. [sessionId={}]", sessionId); McpTransportContext transportContext = this.contextExtractor.extract(request); try { @@ -220,7 +226,7 @@ public Object handleGet(HttpClassicServerRequest request, HttpClassicServerRespo } }); } catch (Exception e) { - logger.error("Failed to handle GET request for session {}: {}", sessionId, e.getMessage(), e); + logger.error("[GET] Failed to handle GET request. [sessionId={}, error={}]", sessionId, e.getMessage(), e); response.statusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.statusCode()); return null; } @@ -252,18 +258,18 @@ public Object handlePost(HttpClassicServerRequest request, HttpClassicServerResp // Handle JSONRPCMessage if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest && jsonrpcRequest.method() .equals(McpSchema.METHOD_INITIALIZE)) { - logger.info("[POST] Handling initialize method, with receiving message: {}", requestBody); + logger.info("[POST] Handling initialize method. [requestBody={}]", requestBody); return handleInitializeRequest(request, response, jsonrpcRequest); } else { return handleJsonRpcMessage(message, request, requestBody, transportContext, response); } } catch (IllegalArgumentException | IOException e) { - logger.error("Failed to deserialize message: {}", e.getMessage(), e); + logger.error("[POST] Failed to deserialize message. [error={}]", e.getMessage(), e); response.statusCode(HttpResponseStatus.BAD_REQUEST.statusCode()); return Entity.createObject(response, McpError.builder(McpSchema.ErrorCodes.PARSE_ERROR).message("Invalid message format").build()); } catch (Exception e) { - logger.error("Error handling message: {}", e.getMessage(), e); + logger.error("[POST] Error handling message. [error={}]", e.getMessage(), e); response.statusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.statusCode()); return Entity.createObject(response, McpError.builder(McpSchema.ErrorCodes.INTERNAL_ERROR).message(e.getMessage()).build()); @@ -295,7 +301,7 @@ public Object handleDelete(HttpClassicServerRequest request, HttpClassicServerRe } String sessionId = request.headers().first(HttpHeaders.MCP_SESSION_ID).orElse(""); McpStreamableServerSession session = this.sessions.get(sessionId); - logger.info("[DELETE] Receiving delete request from session: {}", sessionId); + logger.info("[DELETE] Receiving delete request. [sessionId={}]", sessionId); McpTransportContext transportContext = this.contextExtractor.extract(request); try { @@ -304,7 +310,7 @@ public Object handleDelete(HttpClassicServerRequest request, HttpClassicServerRe response.statusCode(HttpResponseStatus.OK.statusCode()); return null; } catch (Exception e) { - logger.error("Failed to delete session {}: {}", sessionId, e.getMessage(), e); + logger.error("[DELETE] Failed to delete session. [sessionId={}, error={}]", sessionId, e.getMessage(), e); response.statusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.statusCode()); return Entity.createObject(response, McpError.builder(McpSchema.ErrorCodes.INTERNAL_ERROR).message(e.getMessage()).build()); @@ -392,7 +398,7 @@ private void handleReplaySseRequest(HttpClassicServerRequest request, McpTranspo String sessionId, McpStreamableServerSession session, FitStreamableMcpSessionTransport sessionTransport, Emitter emitter) { String lastId = request.headers().first(HttpHeaders.LAST_EVENT_ID).orElse("0"); - logger.info("[GET] Receiving replay request from session: {}", sessionId); + logger.info("[GET] Handling replay request. [sessionId={}]", sessionId); try { session.replay(lastId) @@ -404,12 +410,12 @@ private void handleReplaySseRequest(HttpClassicServerRequest request, McpTranspo .contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext)) .block(); } catch (Exception e) { - logger.error("Failed to replay message: {}", e.getMessage(), e); + logger.error("[GET] Failed to replay message. [error={}]", e.getMessage(), e); emitter.fail(e); } }); } catch (Exception e) { - logger.error("Failed to replay messages: {}", e.getMessage(), e); + logger.error("[GET] Failed to replay messages. [error={}]", e.getMessage(), e); emitter.fail(e); } } @@ -426,7 +432,7 @@ private void handleReplaySseRequest(HttpClassicServerRequest request, McpTranspo */ private void handleEstablishSseRequest(String sessionId, McpStreamableServerSession session, FitStreamableMcpSessionTransport sessionTransport, Emitter emitter) { - logger.info("[GET] Receiving Get request to establish new SSE for session: {}", sessionId); + logger.info("[GET] Handling GET request to establish new SSE. [sessionId={}]", sessionId); McpStreamableServerSession.McpStreamableServerSessionStream listeningStream = session.listeningStream(sessionTransport); @@ -438,21 +444,25 @@ public void onEmittedData(TextEvent data) { @Override public void onCompleted() { - logger.info("[SSE] Completed SSE emitting for session: {}", sessionId); + logger.info("[SSE] Completed SSE emitting. [sessionId={}]", sessionId); try { listeningStream.close(); } catch (Exception e) { - logger.warn("[SSE] Error closing listeningStream on complete: {}", e.getMessage()); + logger.warn("[SSE] Error closing listeningStream on complete. [sessionId={}, error={}]", + sessionId, + e.getMessage()); } } @Override public void onFailed(Exception cause) { - logger.warn("[SSE] SSE failed for session: {}, cause: {}", sessionId, cause.getMessage()); + logger.warn("[SSE] SSE failed. [sessionId={}, cause={}]", sessionId, cause.getMessage()); try { listeningStream.close(); } catch (Exception e) { - logger.warn("[SSE] Error closing listeningStream on failure: {}", e.getMessage()); + logger.warn("[SSE] Error closing listeningStream on failure. [sessionId={}, error={}]", + sessionId, + e.getMessage()); } } }); @@ -484,11 +494,11 @@ private Object handleInitializeRequest(HttpClassicServerRequest request, HttpCla response.statusCode(HttpResponseStatus.OK.statusCode()); response.headers().set("Content-Type", MimeType.APPLICATION_JSON.value()); response.headers().set(HttpHeaders.MCP_SESSION_ID, init.session().getId()); - logger.info("[POST] Sending initialize message via HTTP response to session {}", init.session().getId()); + logger.info("[POST] Sending initialize message via HTTP response. [sessionId={}]", init.session().getId()); return Entity.createObject(response, new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, jsonrpcRequest.id(), initResult, null)); } catch (Exception e) { - logger.error("Failed to initialize session: {}", e.getMessage(), e); + logger.error("[POST] Failed to initialize session. [error={}]", e.getMessage(), e); response.statusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.statusCode()); return Entity.createObject(response, McpError.builder(McpSchema.ErrorCodes.INTERNAL_ERROR).message(e.getMessage()).build()); @@ -515,7 +525,7 @@ private Object handleJsonRpcMessage(McpSchema.JSONRPCMessage message, HttpClassi } String sessionId = request.headers().first(HttpHeaders.MCP_SESSION_ID).orElse(""); McpStreamableServerSession session = this.sessions.get(sessionId); - logger.info("[POST] Receiving message from session {}: {}", sessionId, requestBody); + logger.info("[POST] Receiving message from session. [sessionId={}, requestBody={}]", sessionId, requestBody); if (message instanceof McpSchema.JSONRPCResponse jsonrpcResponse) { handleJsonRpcResponse(jsonrpcResponse, session, transportContext, response); @@ -590,12 +600,12 @@ public void onEmittedData(TextEvent data) { @Override public void onCompleted() { - logger.info("[SSE] Completed SSE emitting for session: {}", sessionId); + logger.info("[SSE] Completed SSE emitting. [sessionId={}]", sessionId); } @Override public void onFailed(Exception e) { - logger.warn("[SSE] SSE failed for session: {}, cause: {}", sessionId, e.getMessage()); + logger.warn("[SSE] SSE failed. [sessionId={}, cause={}]", sessionId, e.getMessage()); } }); @@ -607,7 +617,7 @@ public void onFailed(Exception e) { .contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext)) .block(); } catch (Exception e) { - logger.error("Failed to handle request stream: {}", e.getMessage(), e); + logger.error("[POST] Failed to handle request stream. [error={}]", e.getMessage(), e); emitter.fail(e); } }); @@ -643,7 +653,7 @@ private class FitStreamableMcpSessionTransport implements McpStreamableServerTra this.sessionId = sessionId; this.emitter = emitter; this.response = response; - logger.info("[SSE] Building SSE for session: {} ", sessionId); + logger.info("[SSE] Building SSE emitter. [sessionId={}]", sessionId); } /** @@ -669,20 +679,21 @@ public Mono sendMessage(McpSchema.JSONRPCMessage message) { public Mono sendMessage(McpSchema.JSONRPCMessage message, String messageId) { return Mono.fromRunnable(() -> { if (this.closed) { - logger.info("Attempted to send message to closed session: {}", this.sessionId); + logger.info("[SSE] Attempted to send message to closed session. [sessionId={}]", this.sessionId); return; } this.lock.lock(); try { if (this.closed) { - logger.info("Session {} was closed during message send attempt", this.sessionId); + logger.info("[SSE] Session was closed during message send attempt. [sessionId={}]", + this.sessionId); return; } // Check if connection is still active before sending if (!this.response.isActive()) { - logger.warn("[SSE] Connection inactive detected while sending message for session: {}", + logger.warn("[SSE] Connection inactive detected while sending message. [sessionId={}]", this.sessionId); this.close(); return; @@ -693,13 +704,18 @@ public Mono sendMessage(McpSchema.JSONRPCMessage message, String messageId TextEvent.custom().id(this.sessionId).event(Event.MESSAGE.code()).data(jsonText).build(); this.emitter.emit(textEvent); - logger.info("[SSE] Sending message to session {}: {}", this.sessionId, jsonText); + logger.info("[SSE] Sending message to session. [sessionId={}, jsonText={}]", + this.sessionId, + jsonText); } catch (Exception e) { - logger.error("Failed to send message to session {}: {}", this.sessionId, e.getMessage(), e); + logger.error("[SSE] Failed to send message to session. [sessionId={}, error={}]", + this.sessionId, + e.getMessage(), + e); try { this.emitter.fail(e); } catch (Exception errorException) { - logger.error("Failed to send error to SSE builder for session {}: {}", + logger.error("[SSE] Failed to send error to SSE builder. [sessionId={}, error={}]", this.sessionId, errorException.getMessage(), errorException); @@ -741,16 +757,18 @@ public void close() { this.lock.lock(); try { if (this.closed) { - logger.info("Session transport {} already closed", this.sessionId); + logger.info("[SSE] Session transport already closed. [sessionId={}]", this.sessionId); return; } this.closed = true; this.emitter.complete(); - logger.info("[SSE] Closed SSE builder successfully for session {}", sessionId); + logger.info("[SSE] Closed SSE builder successfully. [sessionId={}]", sessionId); } catch (Exception e) { - logger.warn("Failed to complete SSE builder for session {}: {}", sessionId, e.getMessage()); + logger.warn("[SSE] Failed to complete SSE builder. [sessionId={}, error={}]", + sessionId, + e.getMessage()); } finally { this.lock.unlock(); } diff --git a/framework/fel/java/plugins/tool-mcp-test/src/main/java/modelengine/fel/tool/mcp/test/TestController.java b/framework/fel/java/plugins/tool-mcp-test/src/main/java/modelengine/fel/tool/mcp/test/TestController.java index f6cb6b39..110dc3cb 100644 --- a/framework/fel/java/plugins/tool-mcp-test/src/main/java/modelengine/fel/tool/mcp/test/TestController.java +++ b/framework/fel/java/plugins/tool-mcp-test/src/main/java/modelengine/fel/tool/mcp/test/TestController.java @@ -46,6 +46,8 @@ public TestController(McpClientFactory mcpClientFactory) { * Initializes the MCP client by creating an instance using the provided factory and initializing it. * This method sets up the connection to the MCP server and prepares it for further interactions. * + * @param baseUri The base URI of the MCP server. + * @param sseEndpoint The SSE endpoint of the MCP server. * @return A string indicating that the initialization was successful. */ @PostMapping(path = "/initialize") diff --git a/framework/fel/java/services/tool-mcp-client-service/src/main/java/modelengine/fel/tool/mcp/client/McpClient.java b/framework/fel/java/services/tool-mcp-client-service/src/main/java/modelengine/fel/tool/mcp/client/McpClient.java index 6a50201b..98318b0a 100644 --- a/framework/fel/java/services/tool-mcp-client-service/src/main/java/modelengine/fel/tool/mcp/client/McpClient.java +++ b/framework/fel/java/services/tool-mcp-client-service/src/main/java/modelengine/fel/tool/mcp/client/McpClient.java @@ -22,6 +22,13 @@ * @since 2025-05-21 */ public interface McpClient extends Closeable { + /** + * Gets the unique identifier of this MCP client instance. + * + * @return The client ID as a {@link String}. + */ + String getClientId(); + /** * Initializes the MCP Client. */