From 05b6d2fc240cd02435ffcd2fde74373e13f01c77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=8F=AF=E6=AC=A3?= <2218887102@qq.com> Date: Mon, 20 Oct 2025 16:35:28 +0800 Subject: [PATCH 01/16] MCP SDK Client first commit. --- .../fel/java/plugins/tool-mcp-client/pom.xml | 6 ++ .../support/DefaultMcpStreamableClient.java | 65 +++++++++++++++++++ 2 files changed, 71 insertions(+) create mode 100644 framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java diff --git a/framework/fel/java/plugins/tool-mcp-client/pom.xml b/framework/fel/java/plugins/tool-mcp-client/pom.xml index 856c68aa..0fd9e6af 100644 --- a/framework/fel/java/plugins/tool-mcp-client/pom.xml +++ b/framework/fel/java/plugins/tool-mcp-client/pom.xml @@ -37,6 +37,12 @@ org.fitframework.fel tool-mcp-client-service + + io.modelcontextprotocol.sdk + mcp + 0.14.0 + + diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java new file mode 100644 index 00000000..5ea06eae --- /dev/null +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java @@ -0,0 +1,65 @@ +package modelengine.fel.tool.mcp.client.support; + +import io.modelcontextprotocol.client.McpClient; +import io.modelcontextprotocol.client.McpSyncClient; +import io.modelcontextprotocol.client.transport.HttpClientStreamableHttpTransport; +import io.modelcontextprotocol.json.McpJsonMapper; +import io.modelcontextprotocol.spec.McpSchema; +import modelengine.fitframework.annotation.Bean; +import modelengine.fitframework.annotation.Component; +import modelengine.fitframework.log.Logger; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Scanner; + +@Component +public class DefaultMcpStreamableClient { + private static final Logger log = Logger.get(DefaultMcpStreamableClient.class); + + @Bean + public HttpClientStreamableHttpTransport mcpTransport() { + return HttpClientStreamableHttpTransport.builder("http://localhost:9000") + .jsonMapper(McpJsonMapper.getDefault()) + .endpoint("/mcp") + .build(); + } + + @Bean + public McpSyncClient mcpSyncClient(HttpClientStreamableHttpTransport transport) { + return McpClient.sync(transport) + .requestTimeout(Duration.ofSeconds(10)) + .capabilities(McpSchema.ClientCapabilities.builder() + .roots(true) // Enable roots capability + .elicitation() + .build()) + .loggingConsumer(DefaultMcpStreamableClient::handleLoggingMessage) + .elicitation(DefaultMcpStreamableClient::handleElicitationRequest) + .build(); + } + + public static void handleLoggingMessage(McpSchema.LoggingMessageNotification notification) { + System.out.println("[Received log] " + notification.level() + + " - " + notification.data()); + } + + public static McpSchema.ElicitResult handleElicitationRequest(McpSchema.ElicitRequest request) { + Map schema = request.requestedSchema(); + Map userData = new HashMap<>(); + + System.out.println("[ElicitationMessage] "+ request.message()); + + // Check what information is being requested + if (schema != null && schema.containsKey("properties")) { + Map properties = (Map) schema.get("properties"); + if (properties.containsKey("message")) { + System.out.print("[ElicitationRequest] Input additional message: "); + Scanner scanner = new Scanner(System.in); + String input = scanner.nextLine(); + userData.put("message", input); + } + } + return new McpSchema.ElicitResult(McpSchema.ElicitResult.Action.ACCEPT, userData); + } +} From 40c2bdae2aecdad33e187d62663b22a7b1289e5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=8F=AF=E6=AC=A3?= <2218887102@qq.com> Date: Mon, 20 Oct 2025 16:35:28 +0800 Subject: [PATCH 02/16] MCP SDK Client first commit. --- .../fel/java/plugins/tool-mcp-client/pom.xml | 6 ++ .../support/DefaultMcpStreamableClient.java | 65 +++++++++++++++++++ 2 files changed, 71 insertions(+) create mode 100644 framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java diff --git a/framework/fel/java/plugins/tool-mcp-client/pom.xml b/framework/fel/java/plugins/tool-mcp-client/pom.xml index a8d490b5..940d3722 100644 --- a/framework/fel/java/plugins/tool-mcp-client/pom.xml +++ b/framework/fel/java/plugins/tool-mcp-client/pom.xml @@ -37,6 +37,12 @@ org.fitframework.fel tool-mcp-client-service + + io.modelcontextprotocol.sdk + mcp + 0.14.0 + + diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java new file mode 100644 index 00000000..5ea06eae --- /dev/null +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java @@ -0,0 +1,65 @@ +package modelengine.fel.tool.mcp.client.support; + +import io.modelcontextprotocol.client.McpClient; +import io.modelcontextprotocol.client.McpSyncClient; +import io.modelcontextprotocol.client.transport.HttpClientStreamableHttpTransport; +import io.modelcontextprotocol.json.McpJsonMapper; +import io.modelcontextprotocol.spec.McpSchema; +import modelengine.fitframework.annotation.Bean; +import modelengine.fitframework.annotation.Component; +import modelengine.fitframework.log.Logger; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Scanner; + +@Component +public class DefaultMcpStreamableClient { + private static final Logger log = Logger.get(DefaultMcpStreamableClient.class); + + @Bean + public HttpClientStreamableHttpTransport mcpTransport() { + return HttpClientStreamableHttpTransport.builder("http://localhost:9000") + .jsonMapper(McpJsonMapper.getDefault()) + .endpoint("/mcp") + .build(); + } + + @Bean + public McpSyncClient mcpSyncClient(HttpClientStreamableHttpTransport transport) { + return McpClient.sync(transport) + .requestTimeout(Duration.ofSeconds(10)) + .capabilities(McpSchema.ClientCapabilities.builder() + .roots(true) // Enable roots capability + .elicitation() + .build()) + .loggingConsumer(DefaultMcpStreamableClient::handleLoggingMessage) + .elicitation(DefaultMcpStreamableClient::handleElicitationRequest) + .build(); + } + + public static void handleLoggingMessage(McpSchema.LoggingMessageNotification notification) { + System.out.println("[Received log] " + notification.level() + + " - " + notification.data()); + } + + public static McpSchema.ElicitResult handleElicitationRequest(McpSchema.ElicitRequest request) { + Map schema = request.requestedSchema(); + Map userData = new HashMap<>(); + + System.out.println("[ElicitationMessage] "+ request.message()); + + // Check what information is being requested + if (schema != null && schema.containsKey("properties")) { + Map properties = (Map) schema.get("properties"); + if (properties.containsKey("message")) { + System.out.print("[ElicitationRequest] Input additional message: "); + Scanner scanner = new Scanner(System.in); + String input = scanner.nextLine(); + userData.put("message", input); + } + } + return new McpSchema.ElicitResult(McpSchema.ElicitResult.Action.ACCEPT, userData); + } +} From 2e901e898aab8e83f1e9dac504fb9b3775ae63d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=8F=AF=E6=AC=A3?= <2218887102@qq.com> Date: Mon, 3 Nov 2025 11:25:09 +0800 Subject: [PATCH 03/16] =?UTF-8?q?=E5=AE=9E=E7=8E=B0fit=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../fel/java/plugins/tool-mcp-client/pom.xml | 2 +- .../mcp/client/support/DefaultMcpClient.java | 429 ------------------ .../support/DefaultMcpClientFactory.java | 32 +- .../support/DefaultMcpStreamableClient.java | 217 +++++++-- .../support/McpClientMessageHandler.java | 59 +++ .../fel/tool/mcp/client/support/Result.java | 72 --- .../src/main/resources/application.yml | 3 +- 7 files changed, 256 insertions(+), 558 deletions(-) delete mode 100644 framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java create mode 100644 framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/McpClientMessageHandler.java delete mode 100644 framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/Result.java diff --git a/framework/fel/java/plugins/tool-mcp-client/pom.xml b/framework/fel/java/plugins/tool-mcp-client/pom.xml index 940d3722..c6b8a7ef 100644 --- a/framework/fel/java/plugins/tool-mcp-client/pom.xml +++ b/framework/fel/java/plugins/tool-mcp-client/pom.xml @@ -40,7 +40,7 @@ io.modelcontextprotocol.sdk mcp - 0.14.0 + 0.14.1 diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java deleted file mode 100644 index 0b8c1faa..00000000 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java +++ /dev/null @@ -1,429 +0,0 @@ -/*--------------------------------------------------------------------------------------------- - * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. - * This file is a part of the ModelEngine Project. - * Licensed under the MIT License. See License.txt in the project root for license information. - *--------------------------------------------------------------------------------------------*/ - -package modelengine.fel.tool.mcp.client.support; - -import static modelengine.fitframework.inspection.Validation.notBlank; -import static modelengine.fitframework.inspection.Validation.notNull; -import static modelengine.fitframework.util.ObjectUtils.cast; - -import modelengine.fel.tool.mcp.client.McpClient; -import modelengine.fel.tool.mcp.entity.ClientSchema; -import modelengine.fel.tool.mcp.entity.Event; -import modelengine.fel.tool.mcp.entity.JsonRpc; -import modelengine.fel.tool.mcp.entity.Method; -import modelengine.fel.tool.mcp.entity.ServerSchema; -import modelengine.fel.tool.mcp.entity.Tool; -import modelengine.fit.http.client.HttpClassicClient; -import modelengine.fit.http.client.HttpClassicClientRequest; -import modelengine.fit.http.client.HttpClassicClientResponse; -import modelengine.fit.http.entity.Entity; -import modelengine.fit.http.entity.TextEvent; -import modelengine.fit.http.protocol.HttpRequestMethod; -import modelengine.fitframework.flowable.Choir; -import modelengine.fitframework.flowable.Subscription; -import modelengine.fitframework.log.Logger; -import modelengine.fitframework.schedule.ExecutePolicy; -import modelengine.fitframework.schedule.Task; -import modelengine.fitframework.schedule.ThreadPoolExecutor; -import modelengine.fitframework.schedule.ThreadPoolScheduler; -import modelengine.fitframework.serialization.ObjectSerializer; -import modelengine.fitframework.util.CollectionUtils; -import modelengine.fitframework.util.LockUtils; -import modelengine.fitframework.util.MapBuilder; -import modelengine.fitframework.util.ObjectUtils; -import modelengine.fitframework.util.StringUtils; -import modelengine.fitframework.util.ThreadUtils; -import modelengine.fitframework.util.UuidUtils; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.function.BiConsumer; -import java.util.function.Consumer; - -/** - * Represents a default implementation of the MCP client, responsible for interacting with the MCP server. - * This class provides methods for initializing the client, retrieving tools, and calling tools. - * - * @author 季聿阶 - * @since 2025-05-21 - */ -public class DefaultMcpClient implements McpClient { - private static final Logger log = Logger.get(DefaultMcpClient.class); - - private final ObjectSerializer jsonSerializer; - private final HttpClassicClient client; - private final String baseUri; - private final String sseEndpoint; - private final String name; - private final AtomicLong id = new AtomicLong(0); - private final long pingInterval; - - private volatile String messageEndpoint; - private volatile String sessionId; - private volatile ServerSchema serverSchema; - private volatile boolean initialized = false; - private volatile boolean closed = false; - private final Object initializedLock = LockUtils.newSynchronizedLock(); - private final Map>> responseConsumers = new ConcurrentHashMap<>(); - private final Map pendingRequests = new ConcurrentHashMap<>(); - private final Map pendingResults = new ConcurrentHashMap<>(); - - private volatile Subscription subscription; - private volatile ThreadPoolScheduler pingScheduler; - - /** - * Constructs a new instance of the DefaultMcpClient. - * - * @param jsonSerializer The serializer used for JSON serialization and deserialization. - * @param client The HTTP client used for communication with the MCP server. - * @param baseUri The base URI of the MCP server. - * @param sseEndpoint The endpoint for the Server-Sent Events (SSE) connection. - * @param pingInterval The interval for sending ping messages to the MCP server. Unit: milliseconds. - */ - public DefaultMcpClient(ObjectSerializer jsonSerializer, HttpClassicClient client, String baseUri, - String sseEndpoint, long pingInterval) { - this.jsonSerializer = notNull(jsonSerializer, "The json serializer cannot be null."); - this.client = notNull(client, "The http client cannot be null."); - this.baseUri = notBlank(baseUri, "The MCP server base URI cannot be blank."); - this.sseEndpoint = notBlank(sseEndpoint, "The MCP server SSE endpoint cannot be blank."); - this.name = UuidUtils.randomUuidString(); - this.pingInterval = pingInterval > 0 ? pingInterval : 15_000; - } - - @Override - public void initialize() { - if (this.closed) { - throw new IllegalStateException("The MCP client is closed."); - } - HttpClassicClientRequest request = - this.client.createRequest(HttpRequestMethod.GET, this.baseUri + this.sseEndpoint); - Choir messages = this.client.exchangeStream(request, TextEvent.class); - ThreadPoolExecutor threadPool = ThreadPoolExecutor.custom() - .threadPoolName("mcp-client-" + this.name) - .awaitTermination(3, TimeUnit.SECONDS) - .isImmediateShutdown(true) - .corePoolSize(1) - .maximumPoolSize(1) - .keepAliveTime(60, TimeUnit.SECONDS) - .workQueueCapacity(Integer.MAX_VALUE) - .isDaemonThread(true) - .exceptionHandler((thread, cause) -> log.warn("Exception in MCP client pool.", cause)) - .rejectedExecutionHandler(new java.util.concurrent.ThreadPoolExecutor.AbortPolicy()) - .build(); - messages.subscribeOn(threadPool).subscribe(subscription -> { - log.info("Prepare to create SSE channel."); - this.subscription = subscription; - subscription.request(Long.MAX_VALUE); - }, - (subscription, textEvent) -> this.consumeTextEvent(textEvent), - subscription -> log.info("SSE channel is completed."), - (subscription, cause) -> log.error("SSE channel is failed.", cause)); - if (!this.waitInitialized()) { - throw new IllegalStateException("Failed to initialize."); - } - } - - private void consumeTextEvent(TextEvent textEvent) { - log.info("Receive message from MCP server. [id={}, event={}, message={}]", - textEvent.id(), - textEvent.event(), - textEvent.data()); - if (StringUtils.isBlank(textEvent.event()) || StringUtils.isBlank((String) textEvent.data())) { - return; - } - if (Objects.equals(textEvent.event(), Event.ENDPOINT.code())) { - this.initializeMcpServer(textEvent); - return; - } - Map jsonRpc = this.jsonSerializer.deserialize((String) textEvent.data(), Object.class); - Object messageId = jsonRpc.get("id"); - if (messageId == null) { - // Notification message, ignore. - return; - } - long actualId = Long.parseLong(messageId.toString()); - Consumer> consumer = this.responseConsumers.remove(actualId); - if (consumer == null) { - log.info("No consumer registered. [id={}]", actualId); - return; - } - Object error = jsonRpc.get("error"); - JsonRpc.Response response; - if (error == null) { - response = JsonRpc.createResponse(actualId, jsonRpc.get("result")); - } else { - response = JsonRpc.createResponseWithError(actualId, error); - } - consumer.accept(response); - } - - private void pingServer() { - if (this.isNotInitialized()) { - log.info("MCP client is not initialized and {} method will be delayed.", Method.PING.code()); - return; - } - this.post2McpServer(Method.PING, null, null); - } - - private void initializeMcpServer(TextEvent textEvent) { - this.messageEndpoint = textEvent.data().toString(); - ClientSchema schema = new ClientSchema("2024-11-05", - new ClientSchema.Capabilities(), - new ClientSchema.Info("FIT MCP Client", "3.6.0-SNAPSHOT")); - this.post2McpServer(Method.INITIALIZE, schema, (request, currentId) -> { - this.sessionId = request.queries() - .first("session_id") - .orElseThrow(() -> new IllegalStateException("The session_id cannot be empty.")); - this.responseConsumers.put(currentId, this::initializedMcpServer); - }); - } - - private void initializedMcpServer(JsonRpc.Response response) { - if (response.error() != null) { - log.error("Abort send {} method to MCP server. [sessionId={}, error={}]", - Method.NOTIFICATION_INITIALIZED.code(), - this.sessionId, - response.error()); - throw new IllegalStateException(response.error().toString()); - } - this.recordServerSchema(response); - HttpClassicClientRequest request = - this.client.createRequest(HttpRequestMethod.POST, this.baseUri + this.messageEndpoint); - JsonRpc.Notification notification = JsonRpc.createNotification(Method.NOTIFICATION_INITIALIZED.code()); - request.entity(Entity.createObject(request, notification)); - log.info("Send {} method to MCP server. [sessionId={}, notification={}]", - Method.NOTIFICATION_INITIALIZED.code(), - this.sessionId, - notification); - try (HttpClassicClientResponse exchange = request.exchange(Object.class)) { - if (exchange.statusCode() >= 200 && exchange.statusCode() < 300) { - log.info("Send {} method to MCP server successfully. [sessionId={}, statusCode={}]", - Method.NOTIFICATION_INITIALIZED.code(), - this.sessionId, - exchange.statusCode()); - } else { - log.error("Failed to {} MCP server. [sessionId={}, statusCode={}]", - Method.NOTIFICATION_INITIALIZED.code(), - this.sessionId, - exchange.statusCode()); - } - } catch (IOException e) { - throw new IllegalStateException(e); - } - synchronized (this.initializedLock) { - this.initialized = true; - this.initializedLock.notifyAll(); - } - this.pingScheduler = ThreadPoolScheduler.custom() - .threadPoolName("mcp-client-ping-" + this.name) - .awaitTermination(3, TimeUnit.SECONDS) - .isImmediateShutdown(true) - .corePoolSize(1) - .maximumPoolSize(1) - .keepAliveTime(60, TimeUnit.SECONDS) - .workQueueCapacity(Integer.MAX_VALUE) - .isDaemonThread(true) - .build(); - this.pingScheduler.schedule(Task.builder() - .runnable(this::pingServer) - .policy(ExecutePolicy.fixedDelay(this.pingInterval)) - .build(), this.pingInterval); - } - - private void recordServerSchema(JsonRpc.Response response) { - Map mapResult = cast(response.result()); - this.serverSchema = ServerSchema.create(mapResult); - log.info("MCP server has initialized. [server={}]", this.serverSchema); - } - - @Override - public List getTools() { - if (this.closed) { - throw new IllegalStateException("The MCP client is closed."); - } - if (this.isNotInitialized()) { - throw new IllegalStateException("MCP client is not initialized. Please wait a moment."); - } - long requestId = this.post2McpServer(Method.TOOLS_LIST, null, (request, currentId) -> { - this.responseConsumers.put(currentId, this::getTools0); - this.pendingRequests.put(currentId, true); - }); - while (this.pendingRequests.get(requestId)) { - ThreadUtils.sleep(100); - } - Result result = this.pendingResults.remove(requestId); - this.pendingRequests.remove(requestId); - if (result.isSuccess()) { - return ObjectUtils.cast(result.getContent()); - } else { - throw new IllegalStateException(result.getError()); - } - } - - private void getTools0(JsonRpc.Response response) { - if (response.error() != null) { - String error = StringUtils.format("Failed to get tools list from MCP server. [sessionId={0}, response={1}]", - this.sessionId, - response); - this.pendingResults.put(response.id(), Result.error(error)); - this.pendingRequests.put(response.id(), false); - return; - } - Map result = cast(response.result()); - List> rawTools = cast(result.get("tools")); - List tools = new ArrayList<>(rawTools.stream() - .map(rawTool -> ObjectUtils.toCustomObject(rawTool, Tool.class)) - .toList()); - this.pendingResults.put(response.id(), Result.success(tools)); - this.pendingRequests.put(response.id(), false); - } - - @Override - public Object callTool(String name, Map arguments) { - if (this.closed) { - throw new IllegalStateException("The MCP client is closed."); - } - if (this.isNotInitialized()) { - throw new IllegalStateException("MCP client is not initialized. Please wait a moment."); - } - long requestId = this.post2McpServer(Method.TOOLS_CALL, - MapBuilder.get().put("name", name).put("arguments", arguments).build(), - (request, currentId) -> { - this.responseConsumers.put(currentId, this::callTools0); - this.pendingRequests.put(currentId, true); - }); - while (this.pendingRequests.get(requestId)) { - ThreadUtils.sleep(100); - } - Result result = this.pendingResults.remove(requestId); - this.pendingRequests.remove(requestId); - if (result.isSuccess()) { - return result.getContent(); - } else { - throw new IllegalStateException(result.getError()); - } - } - - private void callTools0(JsonRpc.Response response) { - if (response.error() != null) { - String error = StringUtils.format("Failed to call tool from MCP server. [sessionId={0}, response={1}]", - this.sessionId, - response); - this.pendingResults.put(response.id(), Result.error(error)); - this.pendingRequests.put(response.id(), false); - return; - } - Map result = cast(response.result()); - boolean isError = cast(result.get("isError")); - if (isError) { - String error = StringUtils.format("Failed to call tool from MCP server. [sessionId={0}, result={1}]", - this.sessionId, - result); - this.pendingResults.put(response.id(), Result.error(error)); - this.pendingRequests.put(response.id(), false); - return; - } - List> rawContents = cast(result.get("content")); - if (CollectionUtils.isEmpty(rawContents)) { - String error = StringUtils.format( - "Failed to call tool from MCP server: no result returned. [sessionId={0}, result={1}]", - this.sessionId, - result); - this.pendingResults.put(response.id(), Result.error(error)); - this.pendingRequests.put(response.id(), false); - return; - } - Map rawContent = rawContents.get(0); - this.pendingResults.put(response.id(), Result.success(rawContent.get("text"))); - this.pendingRequests.put(response.id(), false); - } - - private long post2McpServer(Method method, Object requestParams, - BiConsumer requestConsumer) { - HttpClassicClientRequest request = - this.client.createRequest(HttpRequestMethod.POST, this.baseUri + this.messageEndpoint); - long currentId = this.getNextId(); - if (requestConsumer != null) { - requestConsumer.accept(request, currentId); - } - JsonRpc.Request rpcRequest = JsonRpc.createRequest(currentId, method.code(), requestParams); - request.entity(Entity.createObject(request, rpcRequest)); - log.info("Send {} method to MCP server. [sessionId={}, request={}]", method.code(), this.sessionId, rpcRequest); - try (HttpClassicClientResponse exchange = request.exchange(Object.class)) { - if (exchange.statusCode() >= 200 && exchange.statusCode() < 300) { - log.info("Send {} method to MCP server successfully. [sessionId={}, statusCode={}]", - method.code(), - this.sessionId, - exchange.statusCode()); - } else { - log.error("Failed to {} MCP server. [sessionId={}, statusCode={}]", - method.code(), - this.sessionId, - exchange.statusCode()); - } - } catch (IOException e) { - throw new IllegalStateException(StringUtils.format("Failed to {0} MCP server. [sessionId={1}]", - method.code(), - this.sessionId), e); - } - return currentId; - } - - private long getNextId() { - long tmpId = this.id.getAndIncrement(); - if (tmpId < 0) { - this.id.set(0); - return 0; - } - return tmpId; - } - - private boolean isNotInitialized() { - return !this.initialized; - } - - private boolean waitInitialized() { - if (this.initialized) { - return true; - } - synchronized (this.initializedLock) { - if (this.initialized) { - return true; - } - try { - this.initializedLock.wait(60_000L); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IllegalStateException("Failed to initialize.", e); - } - } - return this.initialized; - } - - @Override - public void close() throws IOException { - this.closed = true; - if (this.subscription != null) { - this.subscription.cancel(); - } - try { - if (this.pingScheduler != null) { - this.pingScheduler.shutdown(); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException(e); - } - log.info("Close MCP client. [name={}, sessionId={}]", this.name, this.sessionId); - } -} diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java index 4af4869e..d319ac8d 100644 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java @@ -8,6 +8,9 @@ import static modelengine.fitframework.inspection.Validation.notNull; +import io.modelcontextprotocol.client.transport.HttpClientStreamableHttpTransport; +import io.modelcontextprotocol.json.McpJsonMapper; +import io.modelcontextprotocol.spec.McpSchema; import modelengine.fel.tool.mcp.client.McpClient; import modelengine.fel.tool.mcp.client.McpClientFactory; import modelengine.fit.http.client.HttpClassicClient; @@ -17,41 +20,30 @@ import modelengine.fitframework.annotation.Value; import modelengine.fitframework.serialization.ObjectSerializer; +import java.time.Duration; + /** - * Represents a factory for creating instances of the DefaultMcpClient. - * This class is responsible for initializing and configuring the HTTP client and JSON serializer - * required by the DefaultMcpClient. + * Represents a factory for creating instances of the {@link DefaultMcpStreamableClient}. + * This class is responsible for initializing and configuring. * * @author 季聿阶 * @since 2025-05-21 */ @Component public class DefaultMcpClientFactory implements McpClientFactory { - private final HttpClassicClient client; - private final ObjectSerializer jsonSerializer; - private final long pingInterval; + private final int requestTimeoutSeconds; /** * Constructs a new instance of the DefaultMcpClientFactory. * - * @param clientFactory The factory used to create the HTTP client. - * @param jsonSerializer The JSON serializer used for serialization and deserialization. - * @param pingInterval The interval between ping requests. Units: milliseconds. + * @param requestTimeoutSeconds The timeout duration of requests. Units: seconds. */ - public DefaultMcpClientFactory(HttpClassicClientFactory clientFactory, - @Fit(alias = "json") ObjectSerializer jsonSerializer, - @Value("${mcp.client.ping-interval}") long pingInterval) { - this.client = clientFactory.create(HttpClassicClientFactory.Config.builder() - .connectTimeout(30_000) - .socketTimeout(60_000) - .connectionRequestTimeout(60_000) - .build()); - this.jsonSerializer = notNull(jsonSerializer, "The json serializer cannot be null."); - this.pingInterval = pingInterval; + public DefaultMcpClientFactory(@Value("${mcp.client.request.timeout-seconds}")int requestTimeoutSeconds) { + this.requestTimeoutSeconds = requestTimeoutSeconds; } @Override public McpClient create(String baseUri, String sseEndpoint) { - return new DefaultMcpClient(this.jsonSerializer, this.client, baseUri, sseEndpoint, this.pingInterval); + return new DefaultMcpStreamableClient(baseUri, sseEndpoint, requestTimeoutSeconds); } } diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java index 5ea06eae..161c144c 100644 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java @@ -1,65 +1,212 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + package modelengine.fel.tool.mcp.client.support; -import io.modelcontextprotocol.client.McpClient; import io.modelcontextprotocol.client.McpSyncClient; import io.modelcontextprotocol.client.transport.HttpClientStreamableHttpTransport; import io.modelcontextprotocol.json.McpJsonMapper; import io.modelcontextprotocol.spec.McpSchema; -import modelengine.fitframework.annotation.Bean; -import modelengine.fitframework.annotation.Component; +import modelengine.fel.tool.mcp.client.McpClient; +import modelengine.fel.tool.mcp.entity.Tool; import modelengine.fitframework.log.Logger; +import java.io.IOException; import java.time.Duration; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.Scanner; +import java.util.stream.Collectors; + +import static modelengine.fitframework.inspection.Validation.notBlank; -@Component -public class DefaultMcpStreamableClient { +/** + * A default implementation of the MCP client that uses the MCP SDK's streamable HTTP transport. + * + * @author 黄可欣 + * @since 2025-11-03 + */ +public class DefaultMcpStreamableClient implements McpClient { private static final Logger log = Logger.get(DefaultMcpStreamableClient.class); - @Bean - public HttpClientStreamableHttpTransport mcpTransport() { - return HttpClientStreamableHttpTransport.builder("http://localhost:9000") + private final McpSyncClient mcpSyncClient; + private volatile boolean initialized = false; + private volatile boolean closed = false; + + /** + * Constructs a new instance of the DefaultMcpStreamableClient. + * + * @param baseUri The base URI of the MCP server. + * @param sseEndpoint The endpoint for the Server-Sent Events (SSE) connection. + * @param requestTimeoutSeconds The timeout duration of requests. Units: seconds. + */ + public DefaultMcpStreamableClient(String baseUri, String sseEndpoint, int requestTimeoutSeconds) { + notBlank(baseUri, "The MCP server base URI cannot be blank."); + notBlank(sseEndpoint, "The MCP server SSE endpoint cannot be blank."); + HttpClientStreamableHttpTransport transport = HttpClientStreamableHttpTransport.builder(baseUri) .jsonMapper(McpJsonMapper.getDefault()) - .endpoint("/mcp") + .endpoint(sseEndpoint) .build(); - } - - @Bean - public McpSyncClient mcpSyncClient(HttpClientStreamableHttpTransport transport) { - return McpClient.sync(transport) - .requestTimeout(Duration.ofSeconds(10)) + this.mcpSyncClient = io.modelcontextprotocol.client.McpClient.sync(transport) + .requestTimeout(Duration.ofSeconds(requestTimeoutSeconds > 0 ? requestTimeoutSeconds : 5)) .capabilities(McpSchema.ClientCapabilities.builder() - .roots(true) // Enable roots capability .elicitation() .build()) - .loggingConsumer(DefaultMcpStreamableClient::handleLoggingMessage) - .elicitation(DefaultMcpStreamableClient::handleElicitationRequest) + .loggingConsumer(McpClientMessageHandler::handleLoggingMessage) + .elicitation(McpClientMessageHandler::handleElicitationRequest) .build(); } - public static void handleLoggingMessage(McpSchema.LoggingMessageNotification notification) { - System.out.println("[Received log] " + notification.level() + - " - " + notification.data()); + /** + * Initializes the MCP client connection. + * + * @throws IllegalStateException if the client has already been closed. + */ + @Override + public void initialize() { + if (this.closed) { + throw new IllegalStateException("The MCP client is closed."); + } + mcpSyncClient.initialize(); + this.initialized = true; + log.info("MCP client initialized successfully."); + } + + /** + * Retrieves the list of available tools from the MCP server. + * + * @return A {@link List} of {@link Tool} objects representing the available tools. + * @throws IllegalStateException if the client is closed, not initialized, or if + * the server request fails. + */ + @Override + public List getTools() { + if (this.closed) { + throw new IllegalStateException("The MCP client is closed."); + } + if (!this.initialized) { + throw new IllegalStateException("MCP client is not initialized. Please wait a moment."); + } + + try { + McpSchema.ListToolsResult result = this.mcpSyncClient.listTools(); + if (result == null || result.tools() == null) { + log.warn("Failed to get tools from MCP server: result is null."); + throw new IllegalStateException("Failed to get tools from MCP server: result is null."); + } + + List tools = result.tools().stream() + .map(this::convertToFelTool) + .collect(Collectors.toList()); + + log.info("Successfully retrieved {} tools from MCP server.", tools.size()); + return tools; + } catch (Exception e) { + log.error("Failed to get tools from MCP server: {}", e); + throw new IllegalStateException("Failed to get tools from MCP server: " + e.getMessage(), e); + } + } + + /** + * Invokes a specific tool on the MCP server with the provided arguments. + * + * @param name The name of the tool to invoke, as a {@link String}. + * @param arguments The arguments to pass to the tool, as a {@link Map} of parameter names to values. + * @return The result of the tool invocation. For text content, returns the text as a {@link String}. + * For image content, returns the {@link McpSchema.ImageContent} object. + * Returns {@code null} if the tool returns empty content. + * @throws IllegalStateException if the client is closed, not initialized, if the tool + * returns an error, or if the server request fails. + */ + @Override + public Object callTool(String name, Map arguments) { + if (this.closed) { + throw new IllegalStateException("The MCP client is closed."); + } + if (!this.initialized) { + throw new IllegalStateException("MCP client is not initialized. Please wait a moment."); + } + + try { + log.info("Calling tool: {} with arguments: {}", name, arguments); + McpSchema.CallToolResult result = this.mcpSyncClient.callTool( + new McpSchema.CallToolRequest(name, arguments) + ); + + if (result == null) { + log.error("Failed to call tool '{}': result is null.", name); + throw new IllegalStateException("Failed to call tool '" + name + "': result is null."); + } + + if (result.isError() != null && result.isError()) { + String errorMsg = "Tool '" + name + "' returned an error."; + log.error(errorMsg); + throw new IllegalStateException(errorMsg); + } + + if (result.content() == null || result.content().isEmpty()) { + log.warn("Tool '{}' returned empty content.", name); + return null; + } + + Object content = result.content().get(0); + if (content instanceof McpSchema.TextContent textContent) { + log.info("Successfully called tool '{}', result: {}", name, textContent.text()); + return textContent.text(); + } else if (content instanceof McpSchema.ImageContent imageContent) { + log.info("Successfully called tool '{}', returned image content.", name); + return imageContent; + } else { + log.info("Successfully called tool '{}', content type: {}", name, content.getClass().getSimpleName()); + return content; + } + + } catch (Exception e) { + log.error("Failed to call tool '{}' from MCP server.", name, e); + throw new IllegalStateException("Failed to call tool '" + name + "': " + e.getMessage(), e); + } } - public static McpSchema.ElicitResult handleElicitationRequest(McpSchema.ElicitRequest request) { - Map schema = request.requestedSchema(); - Map userData = new HashMap<>(); + /** + * Closes the MCP client connection and releases associated resources. + * + * @throws IOException if an I/O error occurs during the close operation. + */ + @Override + public void close() throws IOException { + this.closed = true; + this.mcpSyncClient.closeGracefully(); + log.info("MCP client closed."); + } - System.out.println("[ElicitationMessage] "+ request.message()); + /** + * Converts an MCP SDK Tool to a FEL Tool entity. + * + * @param mcpTool The MCP SDK {@link McpSchema.Tool} to convert. + * @return A FEL {@link Tool} entity with the corresponding name, description, and input schema. + */ + private Tool convertToFelTool(McpSchema.Tool mcpTool) { + Tool tool = new Tool(); + tool.setName(mcpTool.name()); + tool.setDescription(mcpTool.description()); - // Check what information is being requested - if (schema != null && schema.containsKey("properties")) { - Map properties = (Map) schema.get("properties"); - if (properties.containsKey("message")) { - System.out.print("[ElicitationRequest] Input additional message: "); - Scanner scanner = new Scanner(System.in); - String input = scanner.nextLine(); - userData.put("message", input); + // Convert JsonSchema to Map + McpSchema.JsonSchema inputSchema = mcpTool.inputSchema(); + if (inputSchema != null) { + Map schemaMap = new HashMap<>(); + schemaMap.put("type", inputSchema.type()); + if (inputSchema.properties() != null) { + schemaMap.put("properties", inputSchema.properties()); + } + if (inputSchema.required() != null) { + schemaMap.put("required", inputSchema.required()); } + tool.setInputSchema(schemaMap); } - return new McpSchema.ElicitResult(McpSchema.ElicitResult.Action.ACCEPT, userData); + + return tool; } } diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/McpClientMessageHandler.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/McpClientMessageHandler.java new file mode 100644 index 00000000..fb75106b --- /dev/null +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/McpClientMessageHandler.java @@ -0,0 +1,59 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package modelengine.fel.tool.mcp.client.support; + +import io.modelcontextprotocol.spec.McpSchema; +import modelengine.fitframework.annotation.Component; +import modelengine.fitframework.log.Logger; + +import java.util.HashMap; +import java.util.Map; +import java.util.Scanner; + +/** + * Handles MCP client messages received from MCP server, + * including logging notifications and elicitation requests. + * + * @author 黄可欣 + * @since 2025-11-03 + */ +@Component +public class McpClientMessageHandler { + private static final Logger log = Logger.get(McpClientMessageHandler.class); + + /** + * Handles logging messages received from the MCP server. + * + * @param notification The {@link McpSchema.LoggingMessageNotification} containing the log level and data. + */ + public static void handleLoggingMessage(McpSchema.LoggingMessageNotification notification) { + log.info("[Client] log: {}-{}", notification.level(), notification.data()); + } + + /** + * Handles elicitation requests from the MCP server. + * + * @param request The {@link McpSchema.ElicitRequest} containing the elicitation message and schema. + * @return A {@link McpSchema.ElicitResult} with the action {@code ACCEPT} and any collected user data. + */ + public static McpSchema.ElicitResult handleElicitationRequest(McpSchema.ElicitRequest request) { + Map schema = request.requestedSchema(); + Map userData = new HashMap<>(); + + log.info("[Client]get elicitation: {}", request.message()); + if (schema != null && schema.containsKey("properties")) { + Map properties = (Map) schema.get("properties"); + if (properties.containsKey("message")) { + log.info("[ElicitationRequest] Input additional message: "); + Scanner scanner = new Scanner(System.in); + String input = scanner.nextLine(); + userData.put("message", input); + } + } + return new McpSchema.ElicitResult(McpSchema.ElicitResult.Action.ACCEPT, userData); + } +} diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/Result.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/Result.java deleted file mode 100644 index 4b655024..00000000 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/Result.java +++ /dev/null @@ -1,72 +0,0 @@ -/*--------------------------------------------------------------------------------------------- - * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. - * This file is a part of the ModelEngine Project. - * Licensed under the MIT License. See License.txt in the project root for license information. - *--------------------------------------------------------------------------------------------*/ - -package modelengine.fel.tool.mcp.client.support; - -/** - * 表示调用 MCP 的结果。 - * - * @author 季聿阶 - * @since 2025-08-04 - */ -public class Result { - private final boolean success; - private final Object content; - private final String error; - - private Result(boolean success, Object content, String error) { - this.success = success; - this.content = content; - this.error = error; - } - - /** - * 创建一个成功的结果。 - * - * @param content 表示成功结果的内容的 {@link Object}。 - * @return 表示成功结果的对象的 {@link Result}。 - */ - public static Result success(Object content) { - return new Result(true, content, null); - } - - /** - * 创建一个失败的结果。 - * - * @param error 表示错误结果的信息的 {@link String}。 - * @return 表示错误结果的对象的 {@link Result}。 - */ - public static Result error(String error) { - return new Result(false, null, error); - } - - /** - * 获取结果是否成功。 - * - * @return 如果结果成功,则返回 {@code true};否则返回 {@code false}。 - */ - public boolean isSuccess() { - return this.success; - } - - /** - * 获取结果内容。 - * - * @return 表示结果内容的 {@link Object}。 - */ - public Object getContent() { - return this.content; - } - - /** - * 获取结果错误信息。 - * - * @return 表示错误信息的 {@link String}。 - */ - public String getError() { - return this.error; - } -} diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/resources/application.yml b/framework/fel/java/plugins/tool-mcp-client/src/main/resources/application.yml index 77b64240..106eaa55 100644 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/resources/application.yml +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/resources/application.yml @@ -5,4 +5,5 @@ fit: mcp: client: - ping-interval: 15000 \ No newline at end of file + request: + timeout-seconds: 10 \ No newline at end of file From 4cf12f979dedd5afd5e9d06846b97599588f7676 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=8F=AF=E6=AC=A3?= <2218887102@qq.com> Date: Mon, 3 Nov 2025 15:08:18 +0800 Subject: [PATCH 04/16] =?UTF-8?q?=E5=9B=A0=E4=B8=BAjsonMapper=E9=97=AE?= =?UTF-8?q?=E9=A2=98=E6=9A=82=E6=97=B6=E5=9B=9E=E9=80=800.12.0=E7=89=88?= =?UTF-8?q?=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- framework/fel/java/plugins/tool-mcp-client/pom.xml | 2 +- .../mcp/client/support/DefaultMcpClientFactory.java | 11 ----------- .../client/support/DefaultMcpStreamableClient.java | 5 +++-- .../mcp/client/support/McpClientMessageHandler.java | 2 +- 4 files changed, 5 insertions(+), 15 deletions(-) diff --git a/framework/fel/java/plugins/tool-mcp-client/pom.xml b/framework/fel/java/plugins/tool-mcp-client/pom.xml index c6b8a7ef..f54443e6 100644 --- a/framework/fel/java/plugins/tool-mcp-client/pom.xml +++ b/framework/fel/java/plugins/tool-mcp-client/pom.xml @@ -40,7 +40,7 @@ io.modelcontextprotocol.sdk mcp - 0.14.1 + 0.12.0 diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java index d319ac8d..3831af92 100644 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java @@ -6,21 +6,10 @@ package modelengine.fel.tool.mcp.client.support; -import static modelengine.fitframework.inspection.Validation.notNull; - -import io.modelcontextprotocol.client.transport.HttpClientStreamableHttpTransport; -import io.modelcontextprotocol.json.McpJsonMapper; -import io.modelcontextprotocol.spec.McpSchema; import modelengine.fel.tool.mcp.client.McpClient; import modelengine.fel.tool.mcp.client.McpClientFactory; -import modelengine.fit.http.client.HttpClassicClient; -import modelengine.fit.http.client.HttpClassicClientFactory; import modelengine.fitframework.annotation.Component; -import modelengine.fitframework.annotation.Fit; import modelengine.fitframework.annotation.Value; -import modelengine.fitframework.serialization.ObjectSerializer; - -import java.time.Duration; /** * Represents a factory for creating instances of the {@link DefaultMcpStreamableClient}. diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java index 161c144c..2f67bd85 100644 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java @@ -6,9 +6,9 @@ package modelengine.fel.tool.mcp.client.support; +import com.fasterxml.jackson.databind.ObjectMapper; import io.modelcontextprotocol.client.McpSyncClient; import io.modelcontextprotocol.client.transport.HttpClientStreamableHttpTransport; -import io.modelcontextprotocol.json.McpJsonMapper; import io.modelcontextprotocol.spec.McpSchema; import modelengine.fel.tool.mcp.client.McpClient; import modelengine.fel.tool.mcp.entity.Tool; @@ -47,7 +47,7 @@ public DefaultMcpStreamableClient(String baseUri, String sseEndpoint, int reques notBlank(baseUri, "The MCP server base URI cannot be blank."); notBlank(sseEndpoint, "The MCP server SSE endpoint cannot be blank."); HttpClientStreamableHttpTransport transport = HttpClientStreamableHttpTransport.builder(baseUri) - .jsonMapper(McpJsonMapper.getDefault()) + .objectMapper(new ObjectMapper()) .endpoint(sseEndpoint) .build(); this.mcpSyncClient = io.modelcontextprotocol.client.McpClient.sync(transport) @@ -103,6 +103,7 @@ public List getTools() { .collect(Collectors.toList()); log.info("Successfully retrieved {} tools from MCP server.", tools.size()); + tools.forEach(tool -> log.info("Tool - Name: {}, Description: {}", tool.getName(), tool.getDescription())); return tools; } catch (Exception e) { log.error("Failed to get tools from MCP server: {}", e); diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/McpClientMessageHandler.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/McpClientMessageHandler.java index fb75106b..88949b96 100644 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/McpClientMessageHandler.java +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/McpClientMessageHandler.java @@ -48,7 +48,7 @@ public static McpSchema.ElicitResult handleElicitationRequest(McpSchema.ElicitRe if (schema != null && schema.containsKey("properties")) { Map properties = (Map) schema.get("properties"); if (properties.containsKey("message")) { - log.info("[ElicitationRequest] Input additional message: "); + log.info("[ElicitationRequest] Please input additional message: "); Scanner scanner = new Scanner(System.in); String input = scanner.nextLine(); userData.put("message", input); From 4aa02a148914ec7f1908f67a2eb9ff19bccae6ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=8F=AF=E6=AC=A3?= <2218887102@qq.com> Date: Mon, 3 Nov 2025 17:31:22 +0800 Subject: [PATCH 05/16] =?UTF-8?q?SDK=E7=89=88=E6=9C=AC=E6=9B=B4=E6=96=B0?= =?UTF-8?q?=E4=B8=BA0.15.0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- framework/fel/java/plugins/tool-mcp-client/pom.xml | 3 +-- .../mcp/client/support/DefaultMcpStreamableClient.java | 8 +++++++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/framework/fel/java/plugins/tool-mcp-client/pom.xml b/framework/fel/java/plugins/tool-mcp-client/pom.xml index f54443e6..df882d44 100644 --- a/framework/fel/java/plugins/tool-mcp-client/pom.xml +++ b/framework/fel/java/plugins/tool-mcp-client/pom.xml @@ -40,10 +40,9 @@ io.modelcontextprotocol.sdk mcp - 0.12.0 + 0.15.0 - org.junit.jupiter diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java index 2f67bd85..2dbdd4be 100644 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java @@ -9,6 +9,10 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.modelcontextprotocol.client.McpSyncClient; import io.modelcontextprotocol.client.transport.HttpClientStreamableHttpTransport; +import io.modelcontextprotocol.json.McpJsonMapper; +import io.modelcontextprotocol.json.jackson.JacksonMcpJsonMapper; +import io.modelcontextprotocol.json.schema.JsonSchemaValidator; +import io.modelcontextprotocol.json.schema.jackson.DefaultJsonSchemaValidator; import io.modelcontextprotocol.spec.McpSchema; import modelengine.fel.tool.mcp.client.McpClient; import modelengine.fel.tool.mcp.entity.Tool; @@ -46,8 +50,9 @@ public class DefaultMcpStreamableClient implements McpClient { public DefaultMcpStreamableClient(String baseUri, String sseEndpoint, int requestTimeoutSeconds) { notBlank(baseUri, "The MCP server base URI cannot be blank."); notBlank(sseEndpoint, "The MCP server SSE endpoint cannot be blank."); + ObjectMapper mapper = new ObjectMapper(); HttpClientStreamableHttpTransport transport = HttpClientStreamableHttpTransport.builder(baseUri) - .objectMapper(new ObjectMapper()) + .jsonMapper(new JacksonMcpJsonMapper(mapper)) .endpoint(sseEndpoint) .build(); this.mcpSyncClient = io.modelcontextprotocol.client.McpClient.sync(transport) @@ -57,6 +62,7 @@ public DefaultMcpStreamableClient(String baseUri, String sseEndpoint, int reques .build()) .loggingConsumer(McpClientMessageHandler::handleLoggingMessage) .elicitation(McpClientMessageHandler::handleElicitationRequest) + .jsonSchemaValidator(new DefaultJsonSchemaValidator(mapper)) .build(); } From f94556ebfc989470ee5a73d53c6349ae0c35ac6d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=8F=AF=E6=AC=A3?= <2218887102@qq.com> Date: Wed, 5 Nov 2025 11:39:01 +0800 Subject: [PATCH 06/16] =?UTF-8?q?=E6=A0=BC=E5=BC=8F=E5=8C=96=EF=BC=8C?= =?UTF-8?q?=E5=8A=A0callTool=E9=94=99=E8=AF=AF=E6=97=A5=E5=BF=97=E6=9B=B4?= =?UTF-8?q?=E6=96=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../support/DefaultMcpClientFactory.java | 2 +- .../support/DefaultMcpStreamableClient.java | 44 ++++++++++--------- 2 files changed, 24 insertions(+), 22 deletions(-) diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java index 3831af92..69300206 100644 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java @@ -27,7 +27,7 @@ public class DefaultMcpClientFactory implements McpClientFactory { * * @param requestTimeoutSeconds The timeout duration of requests. Units: seconds. */ - public DefaultMcpClientFactory(@Value("${mcp.client.request.timeout-seconds}")int requestTimeoutSeconds) { + public DefaultMcpClientFactory(@Value("${mcp.client.request.timeout-seconds}") int requestTimeoutSeconds) { this.requestTimeoutSeconds = requestTimeoutSeconds; } diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java index 2dbdd4be..e303faa1 100644 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java @@ -6,12 +6,13 @@ package modelengine.fel.tool.mcp.client.support; +import static modelengine.fitframework.inspection.Validation.notBlank; + import com.fasterxml.jackson.databind.ObjectMapper; + import io.modelcontextprotocol.client.McpSyncClient; import io.modelcontextprotocol.client.transport.HttpClientStreamableHttpTransport; -import io.modelcontextprotocol.json.McpJsonMapper; import io.modelcontextprotocol.json.jackson.JacksonMcpJsonMapper; -import io.modelcontextprotocol.json.schema.JsonSchemaValidator; import io.modelcontextprotocol.json.schema.jackson.DefaultJsonSchemaValidator; import io.modelcontextprotocol.spec.McpSchema; import modelengine.fel.tool.mcp.client.McpClient; @@ -25,8 +26,6 @@ import java.util.Map; import java.util.stream.Collectors; -import static modelengine.fitframework.inspection.Validation.notBlank; - /** * A default implementation of the MCP client that uses the MCP SDK's streamable HTTP transport. * @@ -57,9 +56,7 @@ public DefaultMcpStreamableClient(String baseUri, String sseEndpoint, int reques .build(); this.mcpSyncClient = io.modelcontextprotocol.client.McpClient.sync(transport) .requestTimeout(Duration.ofSeconds(requestTimeoutSeconds > 0 ? requestTimeoutSeconds : 5)) - .capabilities(McpSchema.ClientCapabilities.builder() - .elicitation() - .build()) + .capabilities(McpSchema.ClientCapabilities.builder().elicitation().build()) .loggingConsumer(McpClientMessageHandler::handleLoggingMessage) .elicitation(McpClientMessageHandler::handleElicitationRequest) .jsonSchemaValidator(new DefaultJsonSchemaValidator(mapper)) @@ -96,18 +93,16 @@ public List getTools() { if (!this.initialized) { throw new IllegalStateException("MCP client is not initialized. Please wait a moment."); } - + try { McpSchema.ListToolsResult result = this.mcpSyncClient.listTools(); if (result == null || result.tools() == null) { log.warn("Failed to get tools from MCP server: result is null."); throw new IllegalStateException("Failed to get tools from MCP server: result is null."); } - - List tools = result.tools().stream() - .map(this::convertToFelTool) - .collect(Collectors.toList()); - + + List tools = result.tools().stream().map(this::convertToFelTool).collect(Collectors.toList()); + log.info("Successfully retrieved {} tools from MCP server.", tools.size()); tools.forEach(tool -> log.info("Tool - Name: {}, Description: {}", tool.getName(), tool.getDescription())); return tools; @@ -136,24 +131,31 @@ public Object callTool(String name, Map arguments) { if (!this.initialized) { throw new IllegalStateException("MCP client is not initialized. Please wait a moment."); } - + try { log.info("Calling tool: {} with arguments: {}", name, arguments); - McpSchema.CallToolResult result = this.mcpSyncClient.callTool( - new McpSchema.CallToolRequest(name, arguments) - ); - + McpSchema.CallToolResult result = + this.mcpSyncClient.callTool(new McpSchema.CallToolRequest(name, arguments)); + if (result == null) { log.error("Failed to call tool '{}': result is null.", name); throw new IllegalStateException("Failed to call tool '" + name + "': result is null."); } if (result.isError() != null && result.isError()) { - String errorMsg = "Tool '" + name + "' returned an error."; + String errorMsg = "Tool '" + name + "' returned an error"; + if (result.content() != null && !result.content().isEmpty()) { + Object errorContent = result.content().get(0); + if (errorContent instanceof McpSchema.TextContent textContent) { + errorMsg += ": " + textContent.text(); + } else { + errorMsg += ": " + errorContent; + } + } log.error(errorMsg); throw new IllegalStateException(errorMsg); } - + if (result.content() == null || result.content().isEmpty()) { log.warn("Tool '{}' returned empty content.", name); return null; @@ -170,7 +172,7 @@ public Object callTool(String name, Map arguments) { log.info("Successfully called tool '{}', content type: {}", name, content.getClass().getSimpleName()); return content; } - + } catch (Exception e) { log.error("Failed to call tool '{}' from MCP server.", name, e); throw new IllegalStateException("Failed to call tool '" + name + "': " + e.getMessage(), e); From 3ce782c3b96c4f11c54748dfefb557732145a4b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=8F=AF=E6=AC=A3?= <2218887102@qq.com> Date: Wed, 5 Nov 2025 11:40:51 +0800 Subject: [PATCH 07/16] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=AE=A2=E6=88=B7?= =?UTF-8?q?=E7=AB=AF=E9=BB=98=E8=AE=A4=E8=B6=85=E6=97=B6=E6=97=B6=E9=97=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../plugins/tool-mcp-client/src/main/resources/application.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/resources/application.yml b/framework/fel/java/plugins/tool-mcp-client/src/main/resources/application.yml index 106eaa55..18a50455 100644 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/resources/application.yml +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/resources/application.yml @@ -6,4 +6,4 @@ fit: mcp: client: request: - timeout-seconds: 10 \ No newline at end of file + timeout-seconds: 300 \ No newline at end of file From cc8116affcfae8b214439d3ec2d1e06d17a50ff9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=8F=AF=E6=AC=A3?= <2218887102@qq.com> Date: Fri, 7 Nov 2025 15:34:28 +0800 Subject: [PATCH 08/16] =?UTF-8?q?=E4=BC=98=E5=8C=96=E8=BF=87=E9=95=BF?= =?UTF-8?q?=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../support/DefaultMcpStreamableClient.java | 74 +++++++++++-------- 1 file changed, 43 insertions(+), 31 deletions(-) diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java index e303faa1..5a271d8d 100644 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java @@ -131,7 +131,6 @@ public Object callTool(String name, Map arguments) { if (!this.initialized) { throw new IllegalStateException("MCP client is not initialized. Please wait a moment."); } - try { log.info("Calling tool: {} with arguments: {}", name, arguments); McpSchema.CallToolResult result = @@ -141,41 +140,54 @@ public Object callTool(String name, Map arguments) { log.error("Failed to call tool '{}': result is null.", name); throw new IllegalStateException("Failed to call tool '" + name + "': result is null."); } + return processToolResult(result, name); + } catch (Exception e) { + log.error("Failed to call tool '{}' from MCP server.", name, e); + throw new IllegalStateException("Failed to call tool '" + name + "': " + e.getMessage(), e); + } + } - if (result.isError() != null && result.isError()) { - String errorMsg = "Tool '" + name + "' returned an error"; - if (result.content() != null && !result.content().isEmpty()) { - Object errorContent = result.content().get(0); - if (errorContent instanceof McpSchema.TextContent textContent) { - errorMsg += ": " + textContent.text(); - } else { - errorMsg += ": " + errorContent; - } + /** + * Processes the tool call result and extracts the content. + * Handles error cases and different content types (text, image, etc.). + * + * @param result The {@link McpSchema.CallToolResult} returned from the tool call. + * @param name The name of the tool that was called. + * @return The extracted content. For text content, returns the text as a {@link String}. + * For image content, returns the {@link McpSchema.ImageContent} object. + * Returns {@code null} if the tool returns empty content. + * @throws IllegalStateException if the tool returns an error. + */ + private Object processToolResult(McpSchema.CallToolResult result, String name) { + if (result.isError() != null && result.isError()) { + String errorMsg = "Tool '" + name + "' returned an error"; + if (result.content() != null && !result.content().isEmpty()) { + Object errorContent = result.content().get(0); + if (errorContent instanceof McpSchema.TextContent textContent) { + errorMsg += ": " + textContent.text(); + } else { + errorMsg += ": " + errorContent; } - log.error(errorMsg); - throw new IllegalStateException(errorMsg); - } - - if (result.content() == null || result.content().isEmpty()) { - log.warn("Tool '{}' returned empty content.", name); - return null; } + log.error(errorMsg); + throw new IllegalStateException(errorMsg); + } - Object content = result.content().get(0); - if (content instanceof McpSchema.TextContent textContent) { - log.info("Successfully called tool '{}', result: {}", name, textContent.text()); - return textContent.text(); - } else if (content instanceof McpSchema.ImageContent imageContent) { - log.info("Successfully called tool '{}', returned image content.", name); - return imageContent; - } else { - log.info("Successfully called tool '{}', content type: {}", name, content.getClass().getSimpleName()); - return content; - } + if (result.content() == null || result.content().isEmpty()) { + log.warn("Tool '{}' returned empty content.", name); + return null; + } - } catch (Exception e) { - log.error("Failed to call tool '{}' from MCP server.", name, e); - throw new IllegalStateException("Failed to call tool '" + name + "': " + e.getMessage(), e); + Object content = result.content().get(0); + if (content instanceof McpSchema.TextContent textContent) { + log.info("Successfully called tool '{}', result: {}", name, textContent.text()); + return textContent.text(); + } else if (content instanceof McpSchema.ImageContent imageContent) { + log.info("Successfully called tool '{}', returned image content.", name); + return imageContent; + } else { + log.info("Successfully called tool '{}', content type: {}", name, content.getClass().getSimpleName()); + return content; } } From c38ae25c37bcf848b8527f622416a93f5fcc8cd2 Mon Sep 17 00:00:00 2001 From: 3200105739 <3200105739@zju.edu.cn> Date: Sat, 8 Nov 2025 18:08:35 +0800 Subject: [PATCH 09/16] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=B7=A5=E5=8E=82?= =?UTF-8?q?=E7=B1=BB=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../support/DefaultMcpClientFactory.java | 29 +++- .../DefaultMcpClientMessageHandler.java | 36 +++++ .../support/DefaultMcpStreamableClient.java | 142 ++++++++++++------ .../support/McpClientMessageHandler.java | 59 -------- .../fel/java/plugins/tool-mcp-server/pom.xml | 2 +- .../services/tool-mcp-client-service/pom.xml | 5 + .../fel/tool/mcp/client/McpClient.java | 7 + .../fel/tool/mcp/client/McpClientFactory.java | 41 ++++- 8 files changed, 211 insertions(+), 110 deletions(-) create mode 100644 framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientMessageHandler.java delete mode 100644 framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/McpClientMessageHandler.java diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java index 69300206..4791828d 100644 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java @@ -6,11 +6,15 @@ package modelengine.fel.tool.mcp.client.support; +import io.modelcontextprotocol.spec.McpSchema; import modelengine.fel.tool.mcp.client.McpClient; import modelengine.fel.tool.mcp.client.McpClientFactory; import modelengine.fitframework.annotation.Component; import modelengine.fitframework.annotation.Value; +import java.util.function.Consumer; +import java.util.function.Function; + /** * Represents a factory for creating instances of the {@link DefaultMcpStreamableClient}. * This class is responsible for initializing and configuring. @@ -28,11 +32,32 @@ public class DefaultMcpClientFactory implements McpClientFactory { * @param requestTimeoutSeconds The timeout duration of requests. Units: seconds. */ public DefaultMcpClientFactory(@Value("${mcp.client.request.timeout-seconds}") int requestTimeoutSeconds) { - this.requestTimeoutSeconds = requestTimeoutSeconds; + this.requestTimeoutSeconds = requestTimeoutSeconds > 0 ? requestTimeoutSeconds : 180; } @Override public McpClient create(String baseUri, String sseEndpoint) { - return new DefaultMcpStreamableClient(baseUri, sseEndpoint, requestTimeoutSeconds); + return create(baseUri, sseEndpoint, DefaultMcpClientMessageHandler::defaultLoggingMessageHandler, null); + } + + @Override + public McpClient create(String baseUri, String sseEndpoint, Consumer loggingConsumer) { + return create(baseUri, sseEndpoint, loggingConsumer, null); + } + + @Override + public McpClient create(String baseUri, String sseEndpoint, Function elicitationHandler) { + return create(baseUri, sseEndpoint, DefaultMcpClientMessageHandler::defaultLoggingMessageHandler, elicitationHandler); + } + + @Override + public McpClient create(String baseUri, String sseEndpoint, + Consumer loggingConsumer, + Function elicitationHandler) { + return new DefaultMcpStreamableClient(baseUri, + sseEndpoint, + requestTimeoutSeconds, + loggingConsumer, + elicitationHandler); } } diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientMessageHandler.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientMessageHandler.java new file mode 100644 index 00000000..091b100a --- /dev/null +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientMessageHandler.java @@ -0,0 +1,36 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package modelengine.fel.tool.mcp.client.support; + +import io.modelcontextprotocol.spec.McpSchema; +import modelengine.fitframework.annotation.Component; +import modelengine.fitframework.log.Logger; + +import java.util.HashMap; +import java.util.Map; +import java.util.Scanner; + +/** + * Handles MCP client messages received from MCP server, + * including logging notifications and elicitation requests. + * + * @author 黄可欣 + * @since 2025-11-03 + */ +@Component +public class DefaultMcpClientMessageHandler { + private static final Logger log = Logger.get(DefaultMcpClientMessageHandler.class); + + /** + * Handles logging messages received from the MCP server. + * + * @param notification The {@link McpSchema.LoggingMessageNotification} containing the log level and data. + */ + public static void defaultLoggingMessageHandler(McpSchema.LoggingMessageNotification notification) { + log.info("[Client] log: {}-{}", notification.level(), notification.data()); + } +} diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java index 5a271d8d..3565096d 100644 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java @@ -24,6 +24,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; +import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Collectors; /** @@ -35,6 +38,7 @@ public class DefaultMcpStreamableClient implements McpClient { private static final Logger log = Logger.get(DefaultMcpStreamableClient.class); + private final String clientId; private final McpSyncClient mcpSyncClient; private volatile boolean initialized = false; private volatile boolean closed = false; @@ -45,22 +49,44 @@ public class DefaultMcpStreamableClient implements McpClient { * @param baseUri The base URI of the MCP server. * @param sseEndpoint The endpoint for the Server-Sent Events (SSE) connection. * @param requestTimeoutSeconds The timeout duration of requests. Units: seconds. + * @param loggingConsumer The consumer to handle logging messages from the MCP server. + * @param elicitationHandler The function to handle elicitation requests from the MCP server. */ - public DefaultMcpStreamableClient(String baseUri, String sseEndpoint, int requestTimeoutSeconds) { + public DefaultMcpStreamableClient(String baseUri, String sseEndpoint, int requestTimeoutSeconds, + Consumer loggingConsumer, + Function elicitationHandler) { + this.clientId = UUID.randomUUID().toString(); notBlank(baseUri, "The MCP server base URI cannot be blank."); notBlank(sseEndpoint, "The MCP server SSE endpoint cannot be blank."); + log.info("Creating MCP client [{}] for server: {}", clientId, baseUri); ObjectMapper mapper = new ObjectMapper(); HttpClientStreamableHttpTransport transport = HttpClientStreamableHttpTransport.builder(baseUri) .jsonMapper(new JacksonMcpJsonMapper(mapper)) .endpoint(sseEndpoint) .build(); - this.mcpSyncClient = io.modelcontextprotocol.client.McpClient.sync(transport) - .requestTimeout(Duration.ofSeconds(requestTimeoutSeconds > 0 ? requestTimeoutSeconds : 5)) - .capabilities(McpSchema.ClientCapabilities.builder().elicitation().build()) - .loggingConsumer(McpClientMessageHandler::handleLoggingMessage) - .elicitation(McpClientMessageHandler::handleElicitationRequest) - .jsonSchemaValidator(new DefaultJsonSchemaValidator(mapper)) - .build(); + + if (elicitationHandler != null) { + this.mcpSyncClient = io.modelcontextprotocol.client.McpClient.sync(transport) + .requestTimeout(Duration.ofSeconds(requestTimeoutSeconds)) + .capabilities(McpSchema.ClientCapabilities.builder().elicitation().build()) + .loggingConsumer(loggingConsumer) + .elicitation(elicitationHandler) + .jsonSchemaValidator(new DefaultJsonSchemaValidator(mapper)) + .build(); + } else { + this.mcpSyncClient = io.modelcontextprotocol.client.McpClient.sync(transport) + .requestTimeout(Duration.ofSeconds(requestTimeoutSeconds)) + .capabilities(McpSchema.ClientCapabilities.builder().build()) + .loggingConsumer(loggingConsumer) + .jsonSchemaValidator(new DefaultJsonSchemaValidator(mapper)) + .build(); + } + + } + + @Override + public String getClientId() { + return clientId; } /** @@ -70,12 +96,10 @@ public DefaultMcpStreamableClient(String baseUri, String sseEndpoint, int reques */ @Override public void initialize() { - if (this.closed) { - throw new IllegalStateException("The MCP client is closed."); - } + ensureNotClosed(); mcpSyncClient.initialize(); this.initialized = true; - log.info("MCP client initialized successfully."); + log.info("MCP client [{}] initialized successfully.", clientId); } /** @@ -87,27 +111,22 @@ public void initialize() { */ @Override public List getTools() { - if (this.closed) { - throw new IllegalStateException("The MCP client is closed."); - } - if (!this.initialized) { - throw new IllegalStateException("MCP client is not initialized. Please wait a moment."); - } + ensureReady(); try { McpSchema.ListToolsResult result = this.mcpSyncClient.listTools(); if (result == null || result.tools() == null) { - log.warn("Failed to get tools from MCP server: result is null."); + log.warn("MCP client [{}] failed to get tools: result is null.", clientId); throw new IllegalStateException("Failed to get tools from MCP server: result is null."); } List tools = result.tools().stream().map(this::convertToFelTool).collect(Collectors.toList()); - log.info("Successfully retrieved {} tools from MCP server.", tools.size()); - tools.forEach(tool -> log.info("Tool - Name: {}, Description: {}", tool.getName(), tool.getDescription())); + log.info("MCP client [{}] successfully retrieved {} tools.", clientId, tools.size()); + tools.forEach(tool -> log.debug("Tool - Name: {}, Description: {}", tool.getName(), tool.getDescription())); return tools; } catch (Exception e) { - log.error("Failed to get tools from MCP server: {}", e); + log.error("MCP client [{}] failed to get tools: {}", clientId, e); throw new IllegalStateException("Failed to get tools from MCP server: " + e.getMessage(), e); } } @@ -125,28 +144,43 @@ public List getTools() { */ @Override public Object callTool(String name, Map arguments) { - if (this.closed) { - throw new IllegalStateException("The MCP client is closed."); - } - if (!this.initialized) { - throw new IllegalStateException("MCP client is not initialized. Please wait a moment."); - } + ensureReady(); try { - log.info("Calling tool: {} with arguments: {}", name, arguments); + log.info("MCP client [{}] calling tool: {} with arguments: {}", clientId, name, arguments); McpSchema.CallToolResult result = this.mcpSyncClient.callTool(new McpSchema.CallToolRequest(name, arguments)); if (result == null) { - log.error("Failed to call tool '{}': result is null.", name); + log.error("MCP client [{}] failed to call tool '{}': result is null.", clientId, name); throw new IllegalStateException("Failed to call tool '" + name + "': result is null."); } return processToolResult(result, name); } catch (Exception e) { - log.error("Failed to call tool '{}' from MCP server.", name, e); + log.error("MCP client [{}] failed to call tool '{}': {}", clientId, name, e); throw new IllegalStateException("Failed to call tool '" + name + "': " + e.getMessage(), e); } } + /** + * Builds an error message from tool result content. + * + * @param name The name of the tool that was called. + * @param content The content list from the tool result. + * @return The formatted error message. + */ + private String buildToolErrorMessage(String name, List content) { + String errorMsg = "Tool '" + name + "' returned an error"; + if (content != null && !content.isEmpty()) { + McpSchema.Content errorContent = content.get(0); + if (errorContent instanceof McpSchema.TextContent textContent) { + errorMsg += ": " + textContent.text(); + } else { + errorMsg += ": " + errorContent; + } + } + return errorMsg; + } + /** * Processes the tool call result and extracts the content. * Handles error cases and different content types (text, image, etc.). @@ -160,33 +194,25 @@ public Object callTool(String name, Map arguments) { */ private Object processToolResult(McpSchema.CallToolResult result, String name) { if (result.isError() != null && result.isError()) { - String errorMsg = "Tool '" + name + "' returned an error"; - if (result.content() != null && !result.content().isEmpty()) { - Object errorContent = result.content().get(0); - if (errorContent instanceof McpSchema.TextContent textContent) { - errorMsg += ": " + textContent.text(); - } else { - errorMsg += ": " + errorContent; - } - } - log.error(errorMsg); + String errorMsg = buildToolErrorMessage(name, result.content()); + log.error("MCP client [{}]: {}", clientId, errorMsg); throw new IllegalStateException(errorMsg); } if (result.content() == null || result.content().isEmpty()) { - log.warn("Tool '{}' returned empty content.", name); + log.warn("MCP client [{}] tool '{}' returned empty content.", clientId, name); return null; } Object content = result.content().get(0); if (content instanceof McpSchema.TextContent textContent) { - log.info("Successfully called tool '{}', result: {}", name, textContent.text()); + log.info("MCP client [{}] successfully called tool '{}', result: {}", clientId, name, textContent.text()); return textContent.text(); } else if (content instanceof McpSchema.ImageContent imageContent) { - log.info("Successfully called tool '{}', returned image content.", name); + log.info("MCP client [{}] successfully called tool '{}', returned image content.", clientId, name); return imageContent; } else { - log.info("Successfully called tool '{}', content type: {}", name, content.getClass().getSimpleName()); + log.info("MCP client [{}] successfully called tool '{}', content type: {}", clientId, name, content.getClass().getSimpleName()); return content; } } @@ -198,9 +224,10 @@ private Object processToolResult(McpSchema.CallToolResult result, String name) { */ @Override public void close() throws IOException { + ensureNotClosed(); this.closed = true; this.mcpSyncClient.closeGracefully(); - log.info("MCP client closed."); + log.info("MCP client [{}] closed.", clientId); } /** @@ -230,4 +257,27 @@ private Tool convertToFelTool(McpSchema.Tool mcpTool) { return tool; } + + /** + * Ensures the MCP client is not closed. + * + * @throws IllegalStateException if the client is closed. + */ + private void ensureNotClosed() { + if (this.closed) { + throw new IllegalStateException("The MCP client is closed."); + } + } + + /** + * Ensures the MCP client is ready for operations (not closed and initialized). + * + * @throws IllegalStateException if the client is closed or not initialized. + */ + private void ensureReady() { + ensureNotClosed(); + if (!this.initialized) { + throw new IllegalStateException("MCP client is not initialized. Please wait a moment."); + } + } } diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/McpClientMessageHandler.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/McpClientMessageHandler.java deleted file mode 100644 index 88949b96..00000000 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/McpClientMessageHandler.java +++ /dev/null @@ -1,59 +0,0 @@ -/*--------------------------------------------------------------------------------------------- - * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. - * This file is a part of the ModelEngine Project. - * Licensed under the MIT License. See License.txt in the project root for license information. - *--------------------------------------------------------------------------------------------*/ - -package modelengine.fel.tool.mcp.client.support; - -import io.modelcontextprotocol.spec.McpSchema; -import modelengine.fitframework.annotation.Component; -import modelengine.fitframework.log.Logger; - -import java.util.HashMap; -import java.util.Map; -import java.util.Scanner; - -/** - * Handles MCP client messages received from MCP server, - * including logging notifications and elicitation requests. - * - * @author 黄可欣 - * @since 2025-11-03 - */ -@Component -public class McpClientMessageHandler { - private static final Logger log = Logger.get(McpClientMessageHandler.class); - - /** - * Handles logging messages received from the MCP server. - * - * @param notification The {@link McpSchema.LoggingMessageNotification} containing the log level and data. - */ - public static void handleLoggingMessage(McpSchema.LoggingMessageNotification notification) { - log.info("[Client] log: {}-{}", notification.level(), notification.data()); - } - - /** - * Handles elicitation requests from the MCP server. - * - * @param request The {@link McpSchema.ElicitRequest} containing the elicitation message and schema. - * @return A {@link McpSchema.ElicitResult} with the action {@code ACCEPT} and any collected user data. - */ - public static McpSchema.ElicitResult handleElicitationRequest(McpSchema.ElicitRequest request) { - Map schema = request.requestedSchema(); - Map userData = new HashMap<>(); - - log.info("[Client]get elicitation: {}", request.message()); - if (schema != null && schema.containsKey("properties")) { - Map properties = (Map) schema.get("properties"); - if (properties.containsKey("message")) { - log.info("[ElicitationRequest] Please input additional message: "); - Scanner scanner = new Scanner(System.in); - String input = scanner.nextLine(); - userData.put("message", input); - } - } - return new McpSchema.ElicitResult(McpSchema.ElicitResult.Action.ACCEPT, userData); - } -} diff --git a/framework/fel/java/plugins/tool-mcp-server/pom.xml b/framework/fel/java/plugins/tool-mcp-server/pom.xml index b6072ea4..c0cc60f0 100644 --- a/framework/fel/java/plugins/tool-mcp-server/pom.xml +++ b/framework/fel/java/plugins/tool-mcp-server/pom.xml @@ -44,7 +44,7 @@ io.modelcontextprotocol.sdk mcp - 0.14.1 + 0.15.0 diff --git a/framework/fel/java/services/tool-mcp-client-service/pom.xml b/framework/fel/java/services/tool-mcp-client-service/pom.xml index 8a2fb8f1..a2f0cb0a 100644 --- a/framework/fel/java/services/tool-mcp-client-service/pom.xml +++ b/framework/fel/java/services/tool-mcp-client-service/pom.xml @@ -33,6 +33,11 @@ org.fitframework.fel tool-mcp-common + + io.modelcontextprotocol.sdk + mcp + 0.15.0 + diff --git a/framework/fel/java/services/tool-mcp-client-service/src/main/java/modelengine/fel/tool/mcp/client/McpClient.java b/framework/fel/java/services/tool-mcp-client-service/src/main/java/modelengine/fel/tool/mcp/client/McpClient.java index 6a50201b..98318b0a 100644 --- a/framework/fel/java/services/tool-mcp-client-service/src/main/java/modelengine/fel/tool/mcp/client/McpClient.java +++ b/framework/fel/java/services/tool-mcp-client-service/src/main/java/modelengine/fel/tool/mcp/client/McpClient.java @@ -22,6 +22,13 @@ * @since 2025-05-21 */ public interface McpClient extends Closeable { + /** + * Gets the unique identifier of this MCP client instance. + * + * @return The client ID as a {@link String}. + */ + String getClientId(); + /** * Initializes the MCP Client. */ diff --git a/framework/fel/java/services/tool-mcp-client-service/src/main/java/modelengine/fel/tool/mcp/client/McpClientFactory.java b/framework/fel/java/services/tool-mcp-client-service/src/main/java/modelengine/fel/tool/mcp/client/McpClientFactory.java index d52c639d..c6dfc6fb 100644 --- a/framework/fel/java/services/tool-mcp-client-service/src/main/java/modelengine/fel/tool/mcp/client/McpClientFactory.java +++ b/framework/fel/java/services/tool-mcp-client-service/src/main/java/modelengine/fel/tool/mcp/client/McpClientFactory.java @@ -6,6 +6,11 @@ package modelengine.fel.tool.mcp.client; +import io.modelcontextprotocol.spec.McpSchema; + +import java.util.function.Consumer; +import java.util.function.Function; + /** * Indicates the factory of {@link McpClient}. *

@@ -16,11 +21,43 @@ */ public interface McpClientFactory { /** - * Creates a {@link McpClient} instance. + * Creates a {@link McpClient} instance with default logging consumer but without elicitation ability. * * @param baseUri The base URI of the MCP server. * @param sseEndpoint The SSE endpoint of the MCP server. - * @return The connected {@link McpClient} instance. + * @return The connected {@link McpClient} instance with default logging consumer but without elicitation ability. */ McpClient create(String baseUri, String sseEndpoint); + + /** + * Creates a {@link McpClient} instance with custom logging consumer but without elicitation ability. + * + * @param baseUri The base URI of the MCP server. + * @param sseEndpoint The SSE endpoint of the MCP server. + * @param loggingConsumer The consumer to handle logging messages from the MCP server. + * @return The connected {@link McpClient} instance with custom logging consumer but without elicitation ability. + */ + McpClient create(String baseUri, String sseEndpoint, Consumer loggingConsumer); + + /** + * Creates a {@link McpClient} instance with default logging consumer and elicitation ability. + * + * @param baseUri The base URI of the MCP server. + * @param sseEndpoint The SSE endpoint of the MCP server. + * @param elicitationHandler The function to handle elicitation requests from the MCP server. + * @return The connected {@link McpClient} instance with default logging consumer and elicitation ability. + */ + McpClient create(String baseUri, String sseEndpoint, Function elicitationHandler); + + /** + * Creates a {@link McpClient} instance with custom message handlers and elicitation ability. + * + * @param baseUri The base URI of the MCP server. + * @param sseEndpoint The SSE endpoint of the MCP server. + * @param loggingConsumer The consumer to handle logging messages from the MCP server. + * @param elicitationHandler The function to handle elicitation requests from the MCP server. + * @return The connected {@link McpClient} instance with custom message handlers and elicitation ability. + */ + McpClient create(String baseUri, String sseEndpoint, Consumer loggingConsumer, + Function elicitationHandler); } \ No newline at end of file From 9d5e552ce3cea6dfafaa9177889ed49704e0e8e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9C=C3=A2=E9=BB=84=E5=8F=AF=E6=AC=A3?= <3200105739@zju.edu.cn> Date: Sat, 8 Nov 2025 22:18:43 +0800 Subject: [PATCH 10/16] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=EF=BC=8C=E4=BF=AE=E6=94=B9log=E6=A0=BC=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../support/DefaultMcpClientFactory.java | 11 ++- .../DefaultMcpClientMessageHandler.java | 4 +- .../support/DefaultMcpStreamableClient.java | 91 ++++++++++--------- .../fel/tool/mcp/test/TestController.java | 86 +++++++++++++++++- .../fel/tool/mcp/client/McpClientFactory.java | 6 +- 5 files changed, 146 insertions(+), 52 deletions(-) diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java index 4791828d..202b4b65 100644 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java @@ -41,13 +41,18 @@ public McpClient create(String baseUri, String sseEndpoint) { } @Override - public McpClient create(String baseUri, String sseEndpoint, Consumer loggingConsumer) { + public McpClient create(String baseUri, String sseEndpoint, + Consumer loggingConsumer) { return create(baseUri, sseEndpoint, loggingConsumer, null); } @Override - public McpClient create(String baseUri, String sseEndpoint, Function elicitationHandler) { - return create(baseUri, sseEndpoint, DefaultMcpClientMessageHandler::defaultLoggingMessageHandler, elicitationHandler); + public McpClient create(String baseUri, String sseEndpoint, + Function elicitationHandler) { + return create(baseUri, + sseEndpoint, + DefaultMcpClientMessageHandler::defaultLoggingMessageHandler, + elicitationHandler); } @Override diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientMessageHandler.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientMessageHandler.java index 091b100a..f8209e33 100644 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientMessageHandler.java +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientMessageHandler.java @@ -31,6 +31,8 @@ public class DefaultMcpClientMessageHandler { * @param notification The {@link McpSchema.LoggingMessageNotification} containing the log level and data. */ public static void defaultLoggingMessageHandler(McpSchema.LoggingMessageNotification notification) { - log.info("[Client] log: {}-{}", notification.level(), notification.data()); + log.info("Received logging message from MCP server. [level={}, data={}]", + notification.level(), + notification.data()); } } diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java index 3565096d..88cf6891 100644 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java @@ -58,7 +58,7 @@ public DefaultMcpStreamableClient(String baseUri, String sseEndpoint, int reques this.clientId = UUID.randomUUID().toString(); notBlank(baseUri, "The MCP server base URI cannot be blank."); notBlank(sseEndpoint, "The MCP server SSE endpoint cannot be blank."); - log.info("Creating MCP client [{}] for server: {}", clientId, baseUri); + log.info("Creating MCP client. [clientId={}, baseUri={}]", clientId, baseUri); ObjectMapper mapper = new ObjectMapper(); HttpClientStreamableHttpTransport transport = HttpClientStreamableHttpTransport.builder(baseUri) .jsonMapper(new JacksonMcpJsonMapper(mapper)) @@ -99,7 +99,7 @@ public void initialize() { ensureNotClosed(); mcpSyncClient.initialize(); this.initialized = true; - log.info("MCP client [{}] initialized successfully.", clientId); + log.info("MCP client initialized successfully. [clientId={}]", clientId); } /** @@ -112,22 +112,23 @@ public void initialize() { @Override public List getTools() { ensureReady(); - try { McpSchema.ListToolsResult result = this.mcpSyncClient.listTools(); if (result == null || result.tools() == null) { - log.warn("MCP client [{}] failed to get tools: result is null.", clientId); + log.warn("Failed to get tools: result is null. [clientId={}]", clientId); throw new IllegalStateException("Failed to get tools from MCP server: result is null."); } List tools = result.tools().stream().map(this::convertToFelTool).collect(Collectors.toList()); - log.info("MCP client [{}] successfully retrieved {} tools.", clientId, tools.size()); - tools.forEach(tool -> log.debug("Tool - Name: {}, Description: {}", tool.getName(), tool.getDescription())); + log.info("Successfully retrieved tools. [clientId={}, count={}]", clientId, tools.size()); + tools.forEach(tool -> log.debug("Tool information. [name={}, description={}]", + tool.getName(), + tool.getDescription())); return tools; } catch (Exception e) { - log.error("MCP client [{}] failed to get tools: {}", clientId, e); - throw new IllegalStateException("Failed to get tools from MCP server: " + e.getMessage(), e); + log.error("Failed to get tools. [clientId={}, error={}]", clientId, e.getMessage()); + throw new IllegalStateException("Failed to get tools from MCP server. [error=" + e.getMessage() + "]", e); } } @@ -146,39 +147,20 @@ public List getTools() { public Object callTool(String name, Map arguments) { ensureReady(); try { - log.info("MCP client [{}] calling tool: {} with arguments: {}", clientId, name, arguments); + log.info("Calling tool. [clientId={}, name={}, arguments={}]", clientId, name, arguments); McpSchema.CallToolResult result = this.mcpSyncClient.callTool(new McpSchema.CallToolRequest(name, arguments)); if (result == null) { - log.error("MCP client [{}] failed to call tool '{}': result is null.", clientId, name); - throw new IllegalStateException("Failed to call tool '" + name + "': result is null."); + log.error("Failed to call tool: result is null. [clientId={}, name={}]", clientId, name); + throw new IllegalStateException("Failed to call tool: result is null. [name=" + name + "]"); } return processToolResult(result, name); } catch (Exception e) { - log.error("MCP client [{}] failed to call tool '{}': {}", clientId, name, e); - throw new IllegalStateException("Failed to call tool '" + name + "': " + e.getMessage(), e); - } - } - - /** - * Builds an error message from tool result content. - * - * @param name The name of the tool that was called. - * @param content The content list from the tool result. - * @return The formatted error message. - */ - private String buildToolErrorMessage(String name, List content) { - String errorMsg = "Tool '" + name + "' returned an error"; - if (content != null && !content.isEmpty()) { - McpSchema.Content errorContent = content.get(0); - if (errorContent instanceof McpSchema.TextContent textContent) { - errorMsg += ": " + textContent.text(); - } else { - errorMsg += ": " + errorContent; - } + log.error("Failed to call tool. [clientId={}, name={}, error={}]", clientId, name, e.getMessage()); + throw new IllegalStateException("Failed to call tool. [name=" + name + ", error=" + e.getMessage() + "]", + e); } - return errorMsg; } /** @@ -194,25 +176,29 @@ private String buildToolErrorMessage(String name, List conten */ private Object processToolResult(McpSchema.CallToolResult result, String name) { if (result.isError() != null && result.isError()) { - String errorMsg = buildToolErrorMessage(name, result.content()); - log.error("MCP client [{}]: {}", clientId, errorMsg); - throw new IllegalStateException(errorMsg); + String errorDetails = extractErrorDetails(result.content()); + log.error("Tool returned an error. [clientId={}, name={}, details={}]", clientId, name, errorDetails); + throw new IllegalStateException( + "Tool returned an error. [name=" + name + ", details=" + errorDetails + "]"); } if (result.content() == null || result.content().isEmpty()) { - log.warn("MCP client [{}] tool '{}' returned empty content.", clientId, name); + log.warn("Tool returned empty content. [clientId={}, name={}]", clientId, name); return null; } Object content = result.content().get(0); if (content instanceof McpSchema.TextContent textContent) { - log.info("MCP client [{}] successfully called tool '{}', result: {}", clientId, name, textContent.text()); + log.info("Successfully called tool. [clientId={}, name={}, result={}]", clientId, name, textContent.text()); return textContent.text(); } else if (content instanceof McpSchema.ImageContent imageContent) { - log.info("MCP client [{}] successfully called tool '{}', returned image content.", clientId, name); + log.info("Successfully called tool: image content. [clientId={}, name={}]", clientId, name); return imageContent; } else { - log.info("MCP client [{}] successfully called tool '{}', content type: {}", clientId, name, content.getClass().getSimpleName()); + log.info("Successfully called tool. [clientId={}, name={}, contentType={}]", + clientId, + name, + content.getClass().getSimpleName()); return content; } } @@ -227,7 +213,7 @@ public void close() throws IOException { ensureNotClosed(); this.closed = true; this.mcpSyncClient.closeGracefully(); - log.info("MCP client [{}] closed.", clientId); + log.info("MCP client closed. [clientId={}]", clientId); } /** @@ -265,7 +251,7 @@ private Tool convertToFelTool(McpSchema.Tool mcpTool) { */ private void ensureNotClosed() { if (this.closed) { - throw new IllegalStateException("The MCP client is closed."); + throw new IllegalStateException("The MCP client is closed. [clientId=" + clientId + "]"); } } @@ -277,7 +263,26 @@ private void ensureNotClosed() { private void ensureReady() { ensureNotClosed(); if (!this.initialized) { - throw new IllegalStateException("MCP client is not initialized. Please wait a moment."); + throw new IllegalStateException( + "MCP client is not initialized. Please wait a moment. [clientId=" + clientId + "]"); + } + } + + /** + * Extracts error details from tool result content. + * + * @param content The content list from the tool result. + * @return The error details as a string. + */ + private String extractErrorDetails(List content) { + if (content != null && !content.isEmpty()) { + McpSchema.Content errorContent = content.get(0); + if (errorContent instanceof McpSchema.TextContent textContent) { + return textContent.text(); + } else { + return errorContent.toString(); + } } + return ""; } } diff --git a/framework/fel/java/plugins/tool-mcp-test/src/main/java/modelengine/fel/tool/mcp/test/TestController.java b/framework/fel/java/plugins/tool-mcp-test/src/main/java/modelengine/fel/tool/mcp/test/TestController.java index f6cb6b39..fc765f10 100644 --- a/framework/fel/java/plugins/tool-mcp-test/src/main/java/modelengine/fel/tool/mcp/test/TestController.java +++ b/framework/fel/java/plugins/tool-mcp-test/src/main/java/modelengine/fel/tool/mcp/test/TestController.java @@ -6,6 +6,7 @@ package modelengine.fel.tool.mcp.test; +import io.modelcontextprotocol.spec.McpSchema; import modelengine.fel.tool.mcp.client.McpClient; import modelengine.fel.tool.mcp.client.McpClientFactory; import modelengine.fel.tool.mcp.entity.Tool; @@ -15,8 +16,10 @@ import modelengine.fit.http.annotation.RequestMapping; import modelengine.fit.http.annotation.RequestQuery; import modelengine.fitframework.annotation.Component; +import modelengine.fitframework.log.Logger; import java.io.IOException; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -30,6 +33,8 @@ @Component @RequestMapping(path = "/mcp-test") public class TestController { + private static final Logger log = Logger.get(TestController.class); + private final McpClientFactory mcpClientFactory; private McpClient client; @@ -43,10 +48,12 @@ public TestController(McpClientFactory mcpClientFactory) { } /** - * Initializes the MCP client by creating an instance using the provided factory and initializing it. - * This method sets up the connection to the MCP server and prepares it for further interactions. + * Initializes the MCP client with default settings (default logging, no elicitation). + * This method creates an instance using the provided factory and initializes it. * - * @return A string indicating that the initialization was successful. + * @param baseUri The base URI of the MCP server. + * @param sseEndpoint The SSE endpoint of the MCP server. + * @return A map with clientId and status message. */ @PostMapping(path = "/initialize") public String initialize(@RequestQuery(name = "baseUri") String baseUri, @@ -56,6 +63,55 @@ public String initialize(@RequestQuery(name = "baseUri") String baseUri, return "Initialized"; } + /** + * Initializes the MCP client with custom logging consumer but without elicitation. + * This demonstrates using a custom logging handler. + * + * @param baseUri The base URI of the MCP server. + * @param sseEndpoint The SSE endpoint of the MCP server. + * @return A string indicating that the initialization was successful. + */ + @PostMapping(path = "/initialize-with-log") + public String initializeWithCustomLogging(@RequestQuery(name = "baseUri") String baseUri, + @RequestQuery(name = "sseEndpoint") String sseEndpoint) { + this.client = this.mcpClientFactory.create(baseUri, sseEndpoint, this::loggingConsumer); + this.client.initialize(); + return "Initialized with custom logging"; + } + + /** + * Initializes the MCP client with elicitation capability. + * This demonstrates enabling elicitation with default logging. + * + * @param baseUri The base URI of the MCP server. + * @param sseEndpoint The SSE endpoint of the MCP server. + * @return A string indicating that the initialization was successful. + */ + @PostMapping(path = "/initialize-with-elicitation") + public String initializeWithElicitation(@RequestQuery(name = "baseUri") String baseUri, + @RequestQuery(name = "sseEndpoint") String sseEndpoint) { + this.client = this.mcpClientFactory.create(baseUri, sseEndpoint, this::elicitationHandler); + this.client.initialize(); + return "Initialized with elicitation"; + } + + /** + * Initializes the MCP client with both custom logging and elicitation. + * This demonstrates full customization of the client. + * + * @param baseUri The base URI of the MCP server. + * @param sseEndpoint The SSE endpoint of the MCP server. + * @return A string indicating that the initialization was successful. + */ + @PostMapping(path = "/initialize-full") + public String initializeFullCustom(@RequestQuery(name = "baseUri") String baseUri, + @RequestQuery(name = "sseEndpoint") String sseEndpoint) { + this.client = + this.mcpClientFactory.create(baseUri, sseEndpoint, this::loggingConsumer, this::elicitationHandler); + this.client.initialize(); + return "Initialized with full customization"; + } + /** * Closes the MCP client and releases any resources associated with it. * This method ensures that the MCP client is properly closed and resources are released. @@ -95,4 +151,28 @@ public List toolsList() { public Object toolsCall(@RequestQuery(name = "name") String name, @RequestBody Map jsonArgs) { return this.client.callTool(name, jsonArgs); } + + /** + * Custom logging consumer for MCP client. + * + * @param notification The logging message notification from MCP server. + */ + private void loggingConsumer(McpSchema.LoggingMessageNotification notification) { + log.info("Custom logging handler received message. [level={}, data={}]", + notification.level(), + notification.data()); + } + + /** + * Elicitation handler for MCP client. + * + * @param request The elicitation request from MCP server. + * @return The elicitation result with action and user data. + */ + private McpSchema.ElicitResult elicitationHandler(McpSchema.ElicitRequest request) { + log.info("Elicitation request received. [message={}, schema={}]", request.message(), request.requestedSchema()); + Map userData = new HashMap<>(); + userData.put("response", "Auto-accepted by test controller"); + return new McpSchema.ElicitResult(McpSchema.ElicitResult.Action.ACCEPT, userData); + } } \ No newline at end of file diff --git a/framework/fel/java/services/tool-mcp-client-service/src/main/java/modelengine/fel/tool/mcp/client/McpClientFactory.java b/framework/fel/java/services/tool-mcp-client-service/src/main/java/modelengine/fel/tool/mcp/client/McpClientFactory.java index c6dfc6fb..8f0a87d4 100644 --- a/framework/fel/java/services/tool-mcp-client-service/src/main/java/modelengine/fel/tool/mcp/client/McpClientFactory.java +++ b/framework/fel/java/services/tool-mcp-client-service/src/main/java/modelengine/fel/tool/mcp/client/McpClientFactory.java @@ -37,7 +37,8 @@ public interface McpClientFactory { * @param loggingConsumer The consumer to handle logging messages from the MCP server. * @return The connected {@link McpClient} instance with custom logging consumer but without elicitation ability. */ - McpClient create(String baseUri, String sseEndpoint, Consumer loggingConsumer); + McpClient create(String baseUri, String sseEndpoint, + Consumer loggingConsumer); /** * Creates a {@link McpClient} instance with default logging consumer and elicitation ability. @@ -47,7 +48,8 @@ public interface McpClientFactory { * @param elicitationHandler The function to handle elicitation requests from the MCP server. * @return The connected {@link McpClient} instance with default logging consumer and elicitation ability. */ - McpClient create(String baseUri, String sseEndpoint, Function elicitationHandler); + McpClient create(String baseUri, String sseEndpoint, + Function elicitationHandler); /** * Creates a {@link McpClient} instance with custom message handlers and elicitation ability. From 2cb20b36e70c65cbb69ffe64c10eea0a35a7cb6c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9C=C3=A2=E9=BB=84=E5=8F=AF=E6=AC=A3?= <3200105739@zju.edu.cn> Date: Sat, 8 Nov 2025 22:42:01 +0800 Subject: [PATCH 11/16] =?UTF-8?q?=E4=BF=AE=E6=94=B9client=E5=92=8Cserver?= =?UTF-8?q?=E9=83=A8=E5=88=86=E6=97=A5=E5=BF=97=E8=BE=93=E5=87=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../client/support/DefaultMcpStreamableClient.java | 12 ++++++------ .../FitMcpStreamableServerTransportProvider.java | 8 ++++---- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java index 88cf6891..5e028706 100644 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java @@ -115,19 +115,19 @@ public List getTools() { try { McpSchema.ListToolsResult result = this.mcpSyncClient.listTools(); if (result == null || result.tools() == null) { - log.warn("Failed to get tools: result is null. [clientId={}]", clientId); - throw new IllegalStateException("Failed to get tools from MCP server: result is null."); + log.warn("Failed to get tools list: result is null. [clientId={}]", clientId); + throw new IllegalStateException("Failed to get tools list from MCP server: result is null."); } List tools = result.tools().stream().map(this::convertToFelTool).collect(Collectors.toList()); - log.info("Successfully retrieved tools. [clientId={}, count={}]", clientId, tools.size()); + log.info("Successfully retrieved tools list. [clientId={}, count={}]", clientId, tools.size()); tools.forEach(tool -> log.debug("Tool information. [name={}, description={}]", tool.getName(), tool.getDescription())); return tools; } catch (Exception e) { - log.error("Failed to get tools. [clientId={}, error={}]", clientId, e.getMessage()); + log.error("Failed to get tools list. [clientId={}, error={}]", clientId, e.getMessage()); throw new IllegalStateException("Failed to get tools from MCP server. [error=" + e.getMessage() + "]", e); } } @@ -251,7 +251,7 @@ private Tool convertToFelTool(McpSchema.Tool mcpTool) { */ private void ensureNotClosed() { if (this.closed) { - throw new IllegalStateException("The MCP client is closed. [clientId=" + clientId + "]"); + throw new IllegalStateException("The MCP client is already closed. [clientId=" + clientId + "]"); } } @@ -264,7 +264,7 @@ private void ensureReady() { ensureNotClosed(); if (!this.initialized) { throw new IllegalStateException( - "MCP client is not initialized. Please wait a moment. [clientId=" + clientId + "]"); + "MCP client is not initialized. [clientId=" + clientId + "]"); } } diff --git a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/transport/FitMcpStreamableServerTransportProvider.java b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/transport/FitMcpStreamableServerTransportProvider.java index 324c427d..d3da8acf 100644 --- a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/transport/FitMcpStreamableServerTransportProvider.java +++ b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/transport/FitMcpStreamableServerTransportProvider.java @@ -204,7 +204,7 @@ public Object handleGet(HttpClassicServerRequest request, HttpClassicServerRespo } String sessionId = request.headers().first(HttpHeaders.MCP_SESSION_ID).orElse(""); McpStreamableServerSession session = this.sessions.get(sessionId); - logger.info("[GET] Handling GET request for session: {}", sessionId); + logger.info("[GET] Receiving GET request from session: {}", sessionId); McpTransportContext transportContext = this.contextExtractor.extract(request); try { @@ -392,7 +392,7 @@ private void handleReplaySseRequest(HttpClassicServerRequest request, McpTranspo String sessionId, McpStreamableServerSession session, FitStreamableMcpSessionTransport sessionTransport, Emitter emitter) { String lastId = request.headers().first(HttpHeaders.LAST_EVENT_ID).orElse("0"); - logger.info("[GET] Receiving replay request from session: {}", sessionId); + logger.info("[GET] Handling replay request from session: {}", sessionId); try { session.replay(lastId) @@ -426,7 +426,7 @@ private void handleReplaySseRequest(HttpClassicServerRequest request, McpTranspo */ private void handleEstablishSseRequest(String sessionId, McpStreamableServerSession session, FitStreamableMcpSessionTransport sessionTransport, Emitter emitter) { - logger.info("[GET] Receiving Get request to establish new SSE for session: {}", sessionId); + logger.info("[GET] Handling Get request to establish new SSE for session: {}", sessionId); McpStreamableServerSession.McpStreamableServerSessionStream listeningStream = session.listeningStream(sessionTransport); @@ -643,7 +643,7 @@ private class FitStreamableMcpSessionTransport implements McpStreamableServerTra this.sessionId = sessionId; this.emitter = emitter; this.response = response; - logger.info("[SSE] Building SSE for session: {} ", sessionId); + logger.info("[SSE] Building SSE emitter for session: {} ", sessionId); } /** From e492942cd8a63e38f085fd1ca559fc66aec8dcbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=8F=AF=E6=AC=A3?= <3200105739@zju.edu.cn> Date: Sun, 9 Nov 2025 15:55:47 +0800 Subject: [PATCH 12/16] =?UTF-8?q?=E7=A7=BB=E9=99=A4elicitation=E8=83=BD?= =?UTF-8?q?=E5=8A=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../support/DefaultMcpClientFactory.java | 32 +----- ...r.java => DefaultMcpClientLogHandler.java} | 26 +++-- .../support/DefaultMcpStreamableClient.java | 97 +++++++++---------- ...tMcpStreamableServerTransportProvider.java | 92 +++++++++++------- .../fel/tool/mcp/test/TestController.java | 51 +--------- .../fel/tool/mcp/client/McpClientFactory.java | 43 +------- 6 files changed, 121 insertions(+), 220 deletions(-) rename framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/{DefaultMcpClientMessageHandler.java => DefaultMcpClientLogHandler.java} (63%) diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java index 202b4b65..330a3172 100644 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientFactory.java @@ -6,15 +6,11 @@ package modelengine.fel.tool.mcp.client.support; -import io.modelcontextprotocol.spec.McpSchema; import modelengine.fel.tool.mcp.client.McpClient; import modelengine.fel.tool.mcp.client.McpClientFactory; import modelengine.fitframework.annotation.Component; import modelengine.fitframework.annotation.Value; -import java.util.function.Consumer; -import java.util.function.Function; - /** * Represents a factory for creating instances of the {@link DefaultMcpStreamableClient}. * This class is responsible for initializing and configuring. @@ -37,32 +33,6 @@ public DefaultMcpClientFactory(@Value("${mcp.client.request.timeout-seconds}") i @Override public McpClient create(String baseUri, String sseEndpoint) { - return create(baseUri, sseEndpoint, DefaultMcpClientMessageHandler::defaultLoggingMessageHandler, null); - } - - @Override - public McpClient create(String baseUri, String sseEndpoint, - Consumer loggingConsumer) { - return create(baseUri, sseEndpoint, loggingConsumer, null); - } - - @Override - public McpClient create(String baseUri, String sseEndpoint, - Function elicitationHandler) { - return create(baseUri, - sseEndpoint, - DefaultMcpClientMessageHandler::defaultLoggingMessageHandler, - elicitationHandler); - } - - @Override - public McpClient create(String baseUri, String sseEndpoint, - Consumer loggingConsumer, - Function elicitationHandler) { - return new DefaultMcpStreamableClient(baseUri, - sseEndpoint, - requestTimeoutSeconds, - loggingConsumer, - elicitationHandler); + return new DefaultMcpStreamableClient(baseUri, sseEndpoint, requestTimeoutSeconds); } } diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientMessageHandler.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientLogHandler.java similarity index 63% rename from framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientMessageHandler.java rename to framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientLogHandler.java index f8209e33..bffb88df 100644 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientMessageHandler.java +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClientLogHandler.java @@ -7,13 +7,8 @@ package modelengine.fel.tool.mcp.client.support; import io.modelcontextprotocol.spec.McpSchema; -import modelengine.fitframework.annotation.Component; import modelengine.fitframework.log.Logger; -import java.util.HashMap; -import java.util.Map; -import java.util.Scanner; - /** * Handles MCP client messages received from MCP server, * including logging notifications and elicitation requests. @@ -21,17 +16,28 @@ * @author 黄可欣 * @since 2025-11-03 */ -@Component -public class DefaultMcpClientMessageHandler { - private static final Logger log = Logger.get(DefaultMcpClientMessageHandler.class); +public class DefaultMcpClientLogHandler { + private static final Logger log = Logger.get(DefaultMcpClientLogHandler.class); + private final String clientId; + + /** + * Constructs a new instance of DefaultMcpClientLogHandler. + * + * @param clientId The unique identifier of the MCP client. + */ + public DefaultMcpClientLogHandler(String clientId) { + this.clientId = clientId; + } /** * Handles logging messages received from the MCP server. + * Includes the client UUID in the log message for tracking. * * @param notification The {@link McpSchema.LoggingMessageNotification} containing the log level and data. */ - public static void defaultLoggingMessageHandler(McpSchema.LoggingMessageNotification notification) { - log.info("Received logging message from MCP server. [level={}, data={}]", + public void handleLoggingMessage(McpSchema.LoggingMessageNotification notification) { + log.info("Received logging message from MCP server. [clientId={}, level={}, data={}]", + this.clientId, notification.level(), notification.data()); } diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java index 5e028706..9546c0f0 100644 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java @@ -18,15 +18,14 @@ import modelengine.fel.tool.mcp.client.McpClient; import modelengine.fel.tool.mcp.entity.Tool; import modelengine.fitframework.log.Logger; +import modelengine.fitframework.util.StringUtils; +import modelengine.fitframework.util.UuidUtils; import java.io.IOException; import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.UUID; -import java.util.function.Consumer; -import java.util.function.Function; import java.util.stream.Collectors; /** @@ -40,6 +39,8 @@ public class DefaultMcpStreamableClient implements McpClient { private final String clientId; private final McpSyncClient mcpSyncClient; + private final DefaultMcpClientLogHandler logHandler; + private volatile boolean initialized = false; private volatile boolean closed = false; @@ -49,44 +50,30 @@ public class DefaultMcpStreamableClient implements McpClient { * @param baseUri The base URI of the MCP server. * @param sseEndpoint The endpoint for the Server-Sent Events (SSE) connection. * @param requestTimeoutSeconds The timeout duration of requests. Units: seconds. - * @param loggingConsumer The consumer to handle logging messages from the MCP server. - * @param elicitationHandler The function to handle elicitation requests from the MCP server. */ - public DefaultMcpStreamableClient(String baseUri, String sseEndpoint, int requestTimeoutSeconds, - Consumer loggingConsumer, - Function elicitationHandler) { - this.clientId = UUID.randomUUID().toString(); + public DefaultMcpStreamableClient(String baseUri, String sseEndpoint, int requestTimeoutSeconds) { + this.clientId = UuidUtils.randomUuidString(); notBlank(baseUri, "The MCP server base URI cannot be blank."); notBlank(sseEndpoint, "The MCP server SSE endpoint cannot be blank."); - log.info("Creating MCP client. [clientId={}, baseUri={}]", clientId, baseUri); + log.info("Creating MCP client. [clientId={}, baseUri={}]", this.clientId, baseUri); ObjectMapper mapper = new ObjectMapper(); HttpClientStreamableHttpTransport transport = HttpClientStreamableHttpTransport.builder(baseUri) .jsonMapper(new JacksonMcpJsonMapper(mapper)) .endpoint(sseEndpoint) .build(); - if (elicitationHandler != null) { - this.mcpSyncClient = io.modelcontextprotocol.client.McpClient.sync(transport) - .requestTimeout(Duration.ofSeconds(requestTimeoutSeconds)) - .capabilities(McpSchema.ClientCapabilities.builder().elicitation().build()) - .loggingConsumer(loggingConsumer) - .elicitation(elicitationHandler) - .jsonSchemaValidator(new DefaultJsonSchemaValidator(mapper)) - .build(); - } else { - this.mcpSyncClient = io.modelcontextprotocol.client.McpClient.sync(transport) - .requestTimeout(Duration.ofSeconds(requestTimeoutSeconds)) - .capabilities(McpSchema.ClientCapabilities.builder().build()) - .loggingConsumer(loggingConsumer) - .jsonSchemaValidator(new DefaultJsonSchemaValidator(mapper)) - .build(); - } - + this.logHandler = new DefaultMcpClientLogHandler(this.clientId); + this.mcpSyncClient = io.modelcontextprotocol.client.McpClient.sync(transport) + .requestTimeout(Duration.ofSeconds(requestTimeoutSeconds)) + .capabilities(McpSchema.ClientCapabilities.builder().build()) + .loggingConsumer(this.logHandler::handleLoggingMessage) + .jsonSchemaValidator(new DefaultJsonSchemaValidator(mapper)) + .build(); } @Override public String getClientId() { - return clientId; + return this.clientId; } /** @@ -97,9 +84,9 @@ public String getClientId() { @Override public void initialize() { ensureNotClosed(); - mcpSyncClient.initialize(); + this.mcpSyncClient.initialize(); this.initialized = true; - log.info("MCP client initialized successfully. [clientId={}]", clientId); + log.info("MCP client initialized successfully. [clientId={}]", this.clientId); } /** @@ -115,20 +102,21 @@ public List getTools() { try { McpSchema.ListToolsResult result = this.mcpSyncClient.listTools(); if (result == null || result.tools() == null) { - log.warn("Failed to get tools list: result is null. [clientId={}]", clientId); + log.warn("Failed to get tools list: result is null. [clientId={}]", this.clientId); throw new IllegalStateException("Failed to get tools list from MCP server: result is null."); } List tools = result.tools().stream().map(this::convertToFelTool).collect(Collectors.toList()); - log.info("Successfully retrieved tools list. [clientId={}, count={}]", clientId, tools.size()); + log.info("Successfully retrieved tools list. [clientId={}, count={}]", this.clientId, tools.size()); tools.forEach(tool -> log.debug("Tool information. [name={}, description={}]", tool.getName(), tool.getDescription())); return tools; } catch (Exception e) { - log.error("Failed to get tools list. [clientId={}, error={}]", clientId, e.getMessage()); - throw new IllegalStateException("Failed to get tools from MCP server. [error=" + e.getMessage() + "]", e); + log.error("Failed to get tools list. [clientId={}, error={}]", this.clientId, e.getMessage()); + throw new IllegalStateException(StringUtils.format("Failed to get tools from MCP server. [error={0}]", + e.getMessage()), e); } } @@ -147,19 +135,21 @@ public List getTools() { public Object callTool(String name, Map arguments) { ensureReady(); try { - log.info("Calling tool. [clientId={}, name={}, arguments={}]", clientId, name, arguments); + log.info("Calling tool. [clientId={}, name={}, arguments={}]", this.clientId, name, arguments); McpSchema.CallToolResult result = this.mcpSyncClient.callTool(new McpSchema.CallToolRequest(name, arguments)); if (result == null) { - log.error("Failed to call tool: result is null. [clientId={}, name={}]", clientId, name); - throw new IllegalStateException("Failed to call tool: result is null. [name=" + name + "]"); + log.error("Failed to call tool: result is null. [clientId={}, name={}]", this.clientId, name); + throw new IllegalStateException(StringUtils.format("Failed to call tool: result is null. [name={0}]", + name)); } return processToolResult(result, name); } catch (Exception e) { - log.error("Failed to call tool. [clientId={}, name={}, error={}]", clientId, name, e.getMessage()); - throw new IllegalStateException("Failed to call tool. [name=" + name + ", error=" + e.getMessage() + "]", - e); + log.error("Failed to call tool. [clientId={}, name={}, error={}]", this.clientId, name, e.getMessage()); + throw new IllegalStateException(StringUtils.format("Failed to call tool. [name={0}, error={1}]", + name, + e.getMessage()), e); } } @@ -177,26 +167,30 @@ public Object callTool(String name, Map arguments) { private Object processToolResult(McpSchema.CallToolResult result, String name) { if (result.isError() != null && result.isError()) { String errorDetails = extractErrorDetails(result.content()); - log.error("Tool returned an error. [clientId={}, name={}, details={}]", clientId, name, errorDetails); - throw new IllegalStateException( - "Tool returned an error. [name=" + name + ", details=" + errorDetails + "]"); + log.error("Tool returned an error. [clientId={}, name={}, details={}]", this.clientId, name, errorDetails); + throw new IllegalStateException(StringUtils.format("Tool returned an error. [name={0}, details={1}]", + name, + errorDetails)); } if (result.content() == null || result.content().isEmpty()) { - log.warn("Tool returned empty content. [clientId={}, name={}]", clientId, name); + log.warn("Tool returned empty content. [clientId={}, name={}]", this.clientId, name); return null; } Object content = result.content().get(0); if (content instanceof McpSchema.TextContent textContent) { - log.info("Successfully called tool. [clientId={}, name={}, result={}]", clientId, name, textContent.text()); + log.info("Successfully called tool. [clientId={}, name={}, result={}]", + this.clientId, + name, + textContent.text()); return textContent.text(); } else if (content instanceof McpSchema.ImageContent imageContent) { - log.info("Successfully called tool: image content. [clientId={}, name={}]", clientId, name); + log.info("Successfully called tool: image content. [clientId={}, name={}]", this.clientId, name); return imageContent; } else { log.info("Successfully called tool. [clientId={}, name={}, contentType={}]", - clientId, + this.clientId, name, content.getClass().getSimpleName()); return content; @@ -213,7 +207,7 @@ public void close() throws IOException { ensureNotClosed(); this.closed = true; this.mcpSyncClient.closeGracefully(); - log.info("MCP client closed. [clientId={}]", clientId); + log.info("MCP client closed. [clientId={}]", this.clientId); } /** @@ -251,7 +245,8 @@ private Tool convertToFelTool(McpSchema.Tool mcpTool) { */ private void ensureNotClosed() { if (this.closed) { - throw new IllegalStateException("The MCP client is already closed. [clientId=" + clientId + "]"); + throw new IllegalStateException(StringUtils.format("The MCP client is already closed. [clientId={0}]", + this.clientId)); } } @@ -263,8 +258,8 @@ private void ensureNotClosed() { private void ensureReady() { ensureNotClosed(); if (!this.initialized) { - throw new IllegalStateException( - "MCP client is not initialized. [clientId=" + clientId + "]"); + throw new IllegalStateException(StringUtils.format("MCP client is not initialized. [clientId={0}]", + this.clientId)); } } diff --git a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/transport/FitMcpStreamableServerTransportProvider.java b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/transport/FitMcpStreamableServerTransportProvider.java index d3da8acf..9bf5bbfe 100644 --- a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/transport/FitMcpStreamableServerTransportProvider.java +++ b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/transport/FitMcpStreamableServerTransportProvider.java @@ -133,18 +133,21 @@ public void setSessionFactory(McpStreamableServerSession.Factory sessionFactory) @Override public Mono notifyClients(String method, Object params) { if (this.sessions.isEmpty()) { - logger.debug("No active sessions to broadcast message to"); + logger.debug("No active sessions to broadcast message."); return Mono.empty(); } - logger.info("Attempting to broadcast message to {} active sessions", this.sessions.size()); + logger.info("Attempting to broadcast message. [sessionCount={}]", this.sessions.size()); return Mono.fromRunnable(() -> { this.sessions.values().parallelStream().forEach(session -> { try { session.sendNotification(method, params).block(); } catch (Exception e) { - logger.error("Failed to send message to session {}: {}", session.getId(), e.getMessage(), e); + logger.error("Failed to send message to session. [sessionId={}, error={}]", + session.getId(), + e.getMessage(), + e); } }); }); @@ -159,18 +162,21 @@ public Mono notifyClients(String method, Object params) { public Mono closeGracefully() { return Mono.fromRunnable(() -> { this.isClosing = true; - logger.info("Initiating graceful shutdown with {} active sessions", this.sessions.size()); + logger.info("Initiating graceful shutdown. [sessionCount={}]", this.sessions.size()); this.sessions.values().parallelStream().forEach(session -> { try { session.closeGracefully().block(); } catch (Exception e) { - logger.error("Failed to close session {}: {}", session.getId(), e.getMessage(), e); + logger.error("Failed to close session. [sessionId={}, error={}]", + session.getId(), + e.getMessage(), + e); } }); this.sessions.clear(); - logger.info("Graceful shutdown completed"); + logger.info("Graceful shutdown completed."); }).then().doOnSuccess(v -> { if (this.keepAliveScheduler != null) { this.keepAliveScheduler.shutdown(); @@ -204,7 +210,7 @@ public Object handleGet(HttpClassicServerRequest request, HttpClassicServerRespo } String sessionId = request.headers().first(HttpHeaders.MCP_SESSION_ID).orElse(""); McpStreamableServerSession session = this.sessions.get(sessionId); - logger.info("[GET] Receiving GET request from session: {}", sessionId); + logger.info("[GET] Receiving GET request. [sessionId={}]", sessionId); McpTransportContext transportContext = this.contextExtractor.extract(request); try { @@ -220,7 +226,7 @@ public Object handleGet(HttpClassicServerRequest request, HttpClassicServerRespo } }); } catch (Exception e) { - logger.error("Failed to handle GET request for session {}: {}", sessionId, e.getMessage(), e); + logger.error("[GET] Failed to handle GET request. [sessionId={}, error={}]", sessionId, e.getMessage(), e); response.statusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.statusCode()); return null; } @@ -252,18 +258,18 @@ public Object handlePost(HttpClassicServerRequest request, HttpClassicServerResp // Handle JSONRPCMessage if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest && jsonrpcRequest.method() .equals(McpSchema.METHOD_INITIALIZE)) { - logger.info("[POST] Handling initialize method, with receiving message: {}", requestBody); + logger.info("[POST] Handling initialize method. [requestBody={}]", requestBody); return handleInitializeRequest(request, response, jsonrpcRequest); } else { return handleJsonRpcMessage(message, request, requestBody, transportContext, response); } } catch (IllegalArgumentException | IOException e) { - logger.error("Failed to deserialize message: {}", e.getMessage(), e); + logger.error("[POST] Failed to deserialize message. [error={}]", e.getMessage(), e); response.statusCode(HttpResponseStatus.BAD_REQUEST.statusCode()); return Entity.createObject(response, McpError.builder(McpSchema.ErrorCodes.PARSE_ERROR).message("Invalid message format").build()); } catch (Exception e) { - logger.error("Error handling message: {}", e.getMessage(), e); + logger.error("[POST] Error handling message. [error={}]", e.getMessage(), e); response.statusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.statusCode()); return Entity.createObject(response, McpError.builder(McpSchema.ErrorCodes.INTERNAL_ERROR).message(e.getMessage()).build()); @@ -295,7 +301,7 @@ public Object handleDelete(HttpClassicServerRequest request, HttpClassicServerRe } String sessionId = request.headers().first(HttpHeaders.MCP_SESSION_ID).orElse(""); McpStreamableServerSession session = this.sessions.get(sessionId); - logger.info("[DELETE] Receiving delete request from session: {}", sessionId); + logger.info("[DELETE] Receiving delete request. [sessionId={}]", sessionId); McpTransportContext transportContext = this.contextExtractor.extract(request); try { @@ -304,7 +310,7 @@ public Object handleDelete(HttpClassicServerRequest request, HttpClassicServerRe response.statusCode(HttpResponseStatus.OK.statusCode()); return null; } catch (Exception e) { - logger.error("Failed to delete session {}: {}", sessionId, e.getMessage(), e); + logger.error("[DELETE] Failed to delete session. [sessionId={}, error={}]", sessionId, e.getMessage(), e); response.statusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.statusCode()); return Entity.createObject(response, McpError.builder(McpSchema.ErrorCodes.INTERNAL_ERROR).message(e.getMessage()).build()); @@ -392,7 +398,7 @@ private void handleReplaySseRequest(HttpClassicServerRequest request, McpTranspo String sessionId, McpStreamableServerSession session, FitStreamableMcpSessionTransport sessionTransport, Emitter emitter) { String lastId = request.headers().first(HttpHeaders.LAST_EVENT_ID).orElse("0"); - logger.info("[GET] Handling replay request from session: {}", sessionId); + logger.info("[GET] Handling replay request. [sessionId={}]", sessionId); try { session.replay(lastId) @@ -404,12 +410,12 @@ private void handleReplaySseRequest(HttpClassicServerRequest request, McpTranspo .contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext)) .block(); } catch (Exception e) { - logger.error("Failed to replay message: {}", e.getMessage(), e); + logger.error("[GET] Failed to replay message. [error={}]", e.getMessage(), e); emitter.fail(e); } }); } catch (Exception e) { - logger.error("Failed to replay messages: {}", e.getMessage(), e); + logger.error("[GET] Failed to replay messages. [error={}]", e.getMessage(), e); emitter.fail(e); } } @@ -426,7 +432,7 @@ private void handleReplaySseRequest(HttpClassicServerRequest request, McpTranspo */ private void handleEstablishSseRequest(String sessionId, McpStreamableServerSession session, FitStreamableMcpSessionTransport sessionTransport, Emitter emitter) { - logger.info("[GET] Handling Get request to establish new SSE for session: {}", sessionId); + logger.info("[GET] Handling GET request to establish new SSE. [sessionId={}]", sessionId); McpStreamableServerSession.McpStreamableServerSessionStream listeningStream = session.listeningStream(sessionTransport); @@ -438,21 +444,25 @@ public void onEmittedData(TextEvent data) { @Override public void onCompleted() { - logger.info("[SSE] Completed SSE emitting for session: {}", sessionId); + logger.info("[SSE] Completed SSE emitting. [sessionId={}]", sessionId); try { listeningStream.close(); } catch (Exception e) { - logger.warn("[SSE] Error closing listeningStream on complete: {}", e.getMessage()); + logger.warn("[SSE] Error closing listeningStream on complete. [sessionId={}, error={}]", + sessionId, + e.getMessage()); } } @Override public void onFailed(Exception cause) { - logger.warn("[SSE] SSE failed for session: {}, cause: {}", sessionId, cause.getMessage()); + logger.warn("[SSE] SSE failed. [sessionId={}, cause={}]", sessionId, cause.getMessage()); try { listeningStream.close(); } catch (Exception e) { - logger.warn("[SSE] Error closing listeningStream on failure: {}", e.getMessage()); + logger.warn("[SSE] Error closing listeningStream on failure. [sessionId={}, error={}]", + sessionId, + e.getMessage()); } } }); @@ -484,11 +494,11 @@ private Object handleInitializeRequest(HttpClassicServerRequest request, HttpCla response.statusCode(HttpResponseStatus.OK.statusCode()); response.headers().set("Content-Type", MimeType.APPLICATION_JSON.value()); response.headers().set(HttpHeaders.MCP_SESSION_ID, init.session().getId()); - logger.info("[POST] Sending initialize message via HTTP response to session {}", init.session().getId()); + logger.info("[POST] Sending initialize message via HTTP response. [sessionId={}]", init.session().getId()); return Entity.createObject(response, new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, jsonrpcRequest.id(), initResult, null)); } catch (Exception e) { - logger.error("Failed to initialize session: {}", e.getMessage(), e); + logger.error("[POST] Failed to initialize session. [error={}]", e.getMessage(), e); response.statusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.statusCode()); return Entity.createObject(response, McpError.builder(McpSchema.ErrorCodes.INTERNAL_ERROR).message(e.getMessage()).build()); @@ -515,7 +525,7 @@ private Object handleJsonRpcMessage(McpSchema.JSONRPCMessage message, HttpClassi } String sessionId = request.headers().first(HttpHeaders.MCP_SESSION_ID).orElse(""); McpStreamableServerSession session = this.sessions.get(sessionId); - logger.info("[POST] Receiving message from session {}: {}", sessionId, requestBody); + logger.info("[POST] Receiving message from session. [sessionId={}, requestBody={}]", sessionId, requestBody); if (message instanceof McpSchema.JSONRPCResponse jsonrpcResponse) { handleJsonRpcResponse(jsonrpcResponse, session, transportContext, response); @@ -590,12 +600,12 @@ public void onEmittedData(TextEvent data) { @Override public void onCompleted() { - logger.info("[SSE] Completed SSE emitting for session: {}", sessionId); + logger.info("[SSE] Completed SSE emitting. [sessionId={}]", sessionId); } @Override public void onFailed(Exception e) { - logger.warn("[SSE] SSE failed for session: {}, cause: {}", sessionId, e.getMessage()); + logger.warn("[SSE] SSE failed. [sessionId={}, cause={}]", sessionId, e.getMessage()); } }); @@ -607,7 +617,7 @@ public void onFailed(Exception e) { .contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext)) .block(); } catch (Exception e) { - logger.error("Failed to handle request stream: {}", e.getMessage(), e); + logger.error("[POST] Failed to handle request stream. [error={}]", e.getMessage(), e); emitter.fail(e); } }); @@ -643,7 +653,7 @@ private class FitStreamableMcpSessionTransport implements McpStreamableServerTra this.sessionId = sessionId; this.emitter = emitter; this.response = response; - logger.info("[SSE] Building SSE emitter for session: {} ", sessionId); + logger.info("[SSE] Building SSE emitter. [sessionId={}]", sessionId); } /** @@ -669,20 +679,21 @@ public Mono sendMessage(McpSchema.JSONRPCMessage message) { public Mono sendMessage(McpSchema.JSONRPCMessage message, String messageId) { return Mono.fromRunnable(() -> { if (this.closed) { - logger.info("Attempted to send message to closed session: {}", this.sessionId); + logger.info("[SSE] Attempted to send message to closed session. [sessionId={}]", this.sessionId); return; } this.lock.lock(); try { if (this.closed) { - logger.info("Session {} was closed during message send attempt", this.sessionId); + logger.info("[SSE] Session was closed during message send attempt. [sessionId={}]", + this.sessionId); return; } // Check if connection is still active before sending if (!this.response.isActive()) { - logger.warn("[SSE] Connection inactive detected while sending message for session: {}", + logger.warn("[SSE] Connection inactive detected while sending message. [sessionId={}]", this.sessionId); this.close(); return; @@ -693,13 +704,18 @@ public Mono sendMessage(McpSchema.JSONRPCMessage message, String messageId TextEvent.custom().id(this.sessionId).event(Event.MESSAGE.code()).data(jsonText).build(); this.emitter.emit(textEvent); - logger.info("[SSE] Sending message to session {}: {}", this.sessionId, jsonText); + logger.info("[SSE] Sending message to session. [sessionId={}, jsonText={}]", + this.sessionId, + jsonText); } catch (Exception e) { - logger.error("Failed to send message to session {}: {}", this.sessionId, e.getMessage(), e); + logger.error("[SSE] Failed to send message to session. [sessionId={}, error={}]", + this.sessionId, + e.getMessage(), + e); try { this.emitter.fail(e); } catch (Exception errorException) { - logger.error("Failed to send error to SSE builder for session {}: {}", + logger.error("[SSE] Failed to send error to SSE builder. [sessionId={}, error={}]", this.sessionId, errorException.getMessage(), errorException); @@ -741,16 +757,18 @@ public void close() { this.lock.lock(); try { if (this.closed) { - logger.info("Session transport {} already closed", this.sessionId); + logger.info("[SSE] Session transport already closed. [sessionId={}]", this.sessionId); return; } this.closed = true; this.emitter.complete(); - logger.info("[SSE] Closed SSE builder successfully for session {}", sessionId); + logger.info("[SSE] Closed SSE builder successfully. [sessionId={}]", sessionId); } catch (Exception e) { - logger.warn("Failed to complete SSE builder for session {}: {}", sessionId, e.getMessage()); + logger.warn("[SSE] Failed to complete SSE builder. [sessionId={}, error={}]", + sessionId, + e.getMessage()); } finally { this.lock.unlock(); } diff --git a/framework/fel/java/plugins/tool-mcp-test/src/main/java/modelengine/fel/tool/mcp/test/TestController.java b/framework/fel/java/plugins/tool-mcp-test/src/main/java/modelengine/fel/tool/mcp/test/TestController.java index fc765f10..a8d1e59b 100644 --- a/framework/fel/java/plugins/tool-mcp-test/src/main/java/modelengine/fel/tool/mcp/test/TestController.java +++ b/framework/fel/java/plugins/tool-mcp-test/src/main/java/modelengine/fel/tool/mcp/test/TestController.java @@ -48,7 +48,7 @@ public TestController(McpClientFactory mcpClientFactory) { } /** - * Initializes the MCP client with default settings (default logging, no elicitation). + * Initializes the MCP client . * This method creates an instance using the provided factory and initializes it. * * @param baseUri The base URI of the MCP server. @@ -63,55 +63,6 @@ public String initialize(@RequestQuery(name = "baseUri") String baseUri, return "Initialized"; } - /** - * Initializes the MCP client with custom logging consumer but without elicitation. - * This demonstrates using a custom logging handler. - * - * @param baseUri The base URI of the MCP server. - * @param sseEndpoint The SSE endpoint of the MCP server. - * @return A string indicating that the initialization was successful. - */ - @PostMapping(path = "/initialize-with-log") - public String initializeWithCustomLogging(@RequestQuery(name = "baseUri") String baseUri, - @RequestQuery(name = "sseEndpoint") String sseEndpoint) { - this.client = this.mcpClientFactory.create(baseUri, sseEndpoint, this::loggingConsumer); - this.client.initialize(); - return "Initialized with custom logging"; - } - - /** - * Initializes the MCP client with elicitation capability. - * This demonstrates enabling elicitation with default logging. - * - * @param baseUri The base URI of the MCP server. - * @param sseEndpoint The SSE endpoint of the MCP server. - * @return A string indicating that the initialization was successful. - */ - @PostMapping(path = "/initialize-with-elicitation") - public String initializeWithElicitation(@RequestQuery(name = "baseUri") String baseUri, - @RequestQuery(name = "sseEndpoint") String sseEndpoint) { - this.client = this.mcpClientFactory.create(baseUri, sseEndpoint, this::elicitationHandler); - this.client.initialize(); - return "Initialized with elicitation"; - } - - /** - * Initializes the MCP client with both custom logging and elicitation. - * This demonstrates full customization of the client. - * - * @param baseUri The base URI of the MCP server. - * @param sseEndpoint The SSE endpoint of the MCP server. - * @return A string indicating that the initialization was successful. - */ - @PostMapping(path = "/initialize-full") - public String initializeFullCustom(@RequestQuery(name = "baseUri") String baseUri, - @RequestQuery(name = "sseEndpoint") String sseEndpoint) { - this.client = - this.mcpClientFactory.create(baseUri, sseEndpoint, this::loggingConsumer, this::elicitationHandler); - this.client.initialize(); - return "Initialized with full customization"; - } - /** * Closes the MCP client and releases any resources associated with it. * This method ensures that the MCP client is properly closed and resources are released. diff --git a/framework/fel/java/services/tool-mcp-client-service/src/main/java/modelengine/fel/tool/mcp/client/McpClientFactory.java b/framework/fel/java/services/tool-mcp-client-service/src/main/java/modelengine/fel/tool/mcp/client/McpClientFactory.java index 8f0a87d4..d52c639d 100644 --- a/framework/fel/java/services/tool-mcp-client-service/src/main/java/modelengine/fel/tool/mcp/client/McpClientFactory.java +++ b/framework/fel/java/services/tool-mcp-client-service/src/main/java/modelengine/fel/tool/mcp/client/McpClientFactory.java @@ -6,11 +6,6 @@ package modelengine.fel.tool.mcp.client; -import io.modelcontextprotocol.spec.McpSchema; - -import java.util.function.Consumer; -import java.util.function.Function; - /** * Indicates the factory of {@link McpClient}. *

@@ -21,45 +16,11 @@ */ public interface McpClientFactory { /** - * Creates a {@link McpClient} instance with default logging consumer but without elicitation ability. + * Creates a {@link McpClient} instance. * * @param baseUri The base URI of the MCP server. * @param sseEndpoint The SSE endpoint of the MCP server. - * @return The connected {@link McpClient} instance with default logging consumer but without elicitation ability. + * @return The connected {@link McpClient} instance. */ McpClient create(String baseUri, String sseEndpoint); - - /** - * Creates a {@link McpClient} instance with custom logging consumer but without elicitation ability. - * - * @param baseUri The base URI of the MCP server. - * @param sseEndpoint The SSE endpoint of the MCP server. - * @param loggingConsumer The consumer to handle logging messages from the MCP server. - * @return The connected {@link McpClient} instance with custom logging consumer but without elicitation ability. - */ - McpClient create(String baseUri, String sseEndpoint, - Consumer loggingConsumer); - - /** - * Creates a {@link McpClient} instance with default logging consumer and elicitation ability. - * - * @param baseUri The base URI of the MCP server. - * @param sseEndpoint The SSE endpoint of the MCP server. - * @param elicitationHandler The function to handle elicitation requests from the MCP server. - * @return The connected {@link McpClient} instance with default logging consumer and elicitation ability. - */ - McpClient create(String baseUri, String sseEndpoint, - Function elicitationHandler); - - /** - * Creates a {@link McpClient} instance with custom message handlers and elicitation ability. - * - * @param baseUri The base URI of the MCP server. - * @param sseEndpoint The SSE endpoint of the MCP server. - * @param loggingConsumer The consumer to handle logging messages from the MCP server. - * @param elicitationHandler The function to handle elicitation requests from the MCP server. - * @return The connected {@link McpClient} instance with custom message handlers and elicitation ability. - */ - McpClient create(String baseUri, String sseEndpoint, Consumer loggingConsumer, - Function elicitationHandler); } \ No newline at end of file From 9c03bb171c7a8ae4e240f3ae75a9b874d4326eb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=8F=AF=E6=AC=A3?= <3200105739@zju.edu.cn> Date: Sun, 9 Nov 2025 16:00:53 +0800 Subject: [PATCH 13/16] =?UTF-8?q?=E6=81=A2=E5=A4=8Dtestcontroller?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../fel/tool/mcp/test/TestController.java | 32 ++----------------- 1 file changed, 3 insertions(+), 29 deletions(-) diff --git a/framework/fel/java/plugins/tool-mcp-test/src/main/java/modelengine/fel/tool/mcp/test/TestController.java b/framework/fel/java/plugins/tool-mcp-test/src/main/java/modelengine/fel/tool/mcp/test/TestController.java index a8d1e59b..ead53548 100644 --- a/framework/fel/java/plugins/tool-mcp-test/src/main/java/modelengine/fel/tool/mcp/test/TestController.java +++ b/framework/fel/java/plugins/tool-mcp-test/src/main/java/modelengine/fel/tool/mcp/test/TestController.java @@ -6,7 +6,6 @@ package modelengine.fel.tool.mcp.test; -import io.modelcontextprotocol.spec.McpSchema; import modelengine.fel.tool.mcp.client.McpClient; import modelengine.fel.tool.mcp.client.McpClientFactory; import modelengine.fel.tool.mcp.entity.Tool; @@ -19,7 +18,6 @@ import modelengine.fitframework.log.Logger; import java.io.IOException; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -48,12 +46,12 @@ public TestController(McpClientFactory mcpClientFactory) { } /** - * Initializes the MCP client . - * This method creates an instance using the provided factory and initializes it. + * Initializes the MCP client by creating an instance using the provided factory and initializing it. + * This method sets up the connection to the MCP server and prepares it for further interactions. * * @param baseUri The base URI of the MCP server. * @param sseEndpoint The SSE endpoint of the MCP server. - * @return A map with clientId and status message. + * @return A string indicating that the initialization was successful. */ @PostMapping(path = "/initialize") public String initialize(@RequestQuery(name = "baseUri") String baseUri, @@ -102,28 +100,4 @@ public List toolsList() { public Object toolsCall(@RequestQuery(name = "name") String name, @RequestBody Map jsonArgs) { return this.client.callTool(name, jsonArgs); } - - /** - * Custom logging consumer for MCP client. - * - * @param notification The logging message notification from MCP server. - */ - private void loggingConsumer(McpSchema.LoggingMessageNotification notification) { - log.info("Custom logging handler received message. [level={}, data={}]", - notification.level(), - notification.data()); - } - - /** - * Elicitation handler for MCP client. - * - * @param request The elicitation request from MCP server. - * @return The elicitation result with action and user data. - */ - private McpSchema.ElicitResult elicitationHandler(McpSchema.ElicitRequest request) { - log.info("Elicitation request received. [message={}, schema={}]", request.message(), request.requestedSchema()); - Map userData = new HashMap<>(); - userData.put("response", "Auto-accepted by test controller"); - return new McpSchema.ElicitResult(McpSchema.ElicitResult.Action.ACCEPT, userData); - } } \ No newline at end of file From 89c4443013bfe4cfa1d2353ab37b648e5c563244 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=8F=AF=E6=AC=A3?= <3200105739@zju.edu.cn> Date: Sun, 9 Nov 2025 16:01:48 +0800 Subject: [PATCH 14/16] =?UTF-8?q?=E5=88=A0=E9=99=A4testcontroller=E7=9A=84?= =?UTF-8?q?log=E4=BE=9D=E8=B5=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/modelengine/fel/tool/mcp/test/TestController.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/framework/fel/java/plugins/tool-mcp-test/src/main/java/modelengine/fel/tool/mcp/test/TestController.java b/framework/fel/java/plugins/tool-mcp-test/src/main/java/modelengine/fel/tool/mcp/test/TestController.java index ead53548..110dc3cb 100644 --- a/framework/fel/java/plugins/tool-mcp-test/src/main/java/modelengine/fel/tool/mcp/test/TestController.java +++ b/framework/fel/java/plugins/tool-mcp-test/src/main/java/modelengine/fel/tool/mcp/test/TestController.java @@ -15,7 +15,6 @@ import modelengine.fit.http.annotation.RequestMapping; import modelengine.fit.http.annotation.RequestQuery; import modelengine.fitframework.annotation.Component; -import modelengine.fitframework.log.Logger; import java.io.IOException; import java.util.List; @@ -31,8 +30,6 @@ @Component @RequestMapping(path = "/mcp-test") public class TestController { - private static final Logger log = Logger.get(TestController.class); - private final McpClientFactory mcpClientFactory; private McpClient client; From a5438f6981c1a84fead934de1810e47145a14ab9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=8F=AF=E6=AC=A3?= <3200105739@zju.edu.cn> Date: Sun, 9 Nov 2025 16:02:44 +0800 Subject: [PATCH 15/16] =?UTF-8?q?=E5=88=A0=E9=99=A4service=E7=9A=84sdk?= =?UTF-8?q?=E4=BE=9D=E8=B5=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- framework/fel/java/services/tool-mcp-client-service/pom.xml | 5 ----- 1 file changed, 5 deletions(-) diff --git a/framework/fel/java/services/tool-mcp-client-service/pom.xml b/framework/fel/java/services/tool-mcp-client-service/pom.xml index a2f0cb0a..8a2fb8f1 100644 --- a/framework/fel/java/services/tool-mcp-client-service/pom.xml +++ b/framework/fel/java/services/tool-mcp-client-service/pom.xml @@ -33,11 +33,6 @@ org.fitframework.fel tool-mcp-common - - io.modelcontextprotocol.sdk - mcp - 0.15.0 - From 122cb0efbc2d633ba074002ad38e563d1d6368e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=8F=AF=E6=AC=A3?= <2218887102@qq.com> Date: Mon, 10 Nov 2025 11:19:41 +0800 Subject: [PATCH 16/16] =?UTF-8?q?=E5=A3=B0=E6=98=8E=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E4=B8=8D=E5=AE=89=E5=85=A8=EF=BC=8C=E4=BB=A5=E5=8F=8A=E5=8A=A0?= =?UTF-8?q?this.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../support/DefaultMcpStreamableClient.java | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java index 9546c0f0..44bca847 100644 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpStreamableClient.java @@ -31,6 +31,9 @@ /** * A default implementation of the MCP client that uses the MCP SDK's streamable HTTP transport. * + *

Not thread-safe: This class is not thread-safe. External synchronization is required + * if instances are to be accessed by multiple threads concurrently. + * * @author 黄可欣 * @since 2025-11-03 */ @@ -83,7 +86,7 @@ public String getClientId() { */ @Override public void initialize() { - ensureNotClosed(); + this.ensureNotClosed(); this.mcpSyncClient.initialize(); this.initialized = true; log.info("MCP client initialized successfully. [clientId={}]", this.clientId); @@ -98,7 +101,7 @@ public void initialize() { */ @Override public List getTools() { - ensureReady(); + this.ensureReady(); try { McpSchema.ListToolsResult result = this.mcpSyncClient.listTools(); if (result == null || result.tools() == null) { @@ -133,7 +136,7 @@ public List getTools() { */ @Override public Object callTool(String name, Map arguments) { - ensureReady(); + this.ensureReady(); try { log.info("Calling tool. [clientId={}, name={}, arguments={}]", this.clientId, name, arguments); McpSchema.CallToolResult result = @@ -144,7 +147,7 @@ public Object callTool(String name, Map arguments) { throw new IllegalStateException(StringUtils.format("Failed to call tool: result is null. [name={0}]", name)); } - return processToolResult(result, name); + return this.processToolResult(result, name); } catch (Exception e) { log.error("Failed to call tool. [clientId={}, name={}, error={}]", this.clientId, name, e.getMessage()); throw new IllegalStateException(StringUtils.format("Failed to call tool. [name={0}, error={1}]", @@ -166,7 +169,7 @@ public Object callTool(String name, Map arguments) { */ private Object processToolResult(McpSchema.CallToolResult result, String name) { if (result.isError() != null && result.isError()) { - String errorDetails = extractErrorDetails(result.content()); + String errorDetails = this.extractErrorDetails(result.content()); log.error("Tool returned an error. [clientId={}, name={}, details={}]", this.clientId, name, errorDetails); throw new IllegalStateException(StringUtils.format("Tool returned an error. [name={0}, details={1}]", name, @@ -200,11 +203,14 @@ private Object processToolResult(McpSchema.CallToolResult result, String name) { /** * Closes the MCP client connection and releases associated resources. * + *

Note: This method is not thread-safe. Callers should ensure that this method + * is not invoked concurrently from multiple threads. + * * @throws IOException if an I/O error occurs during the close operation. */ @Override public void close() throws IOException { - ensureNotClosed(); + this.ensureNotClosed(); this.closed = true; this.mcpSyncClient.closeGracefully(); log.info("MCP client closed. [clientId={}]", this.clientId); @@ -256,7 +262,7 @@ private void ensureNotClosed() { * @throws IllegalStateException if the client is closed or not initialized. */ private void ensureReady() { - ensureNotClosed(); + this.ensureNotClosed(); if (!this.initialized) { throw new IllegalStateException(StringUtils.format("MCP client is not initialized. [clientId={0}]", this.clientId));