diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java index 0c62e05a..e3ae2fda 100644 --- a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/DefaultMcpClient.java @@ -72,12 +72,10 @@ public class DefaultMcpClient implements McpClient { private volatile ServerSchema serverSchema; private volatile boolean initialized = false; private volatile boolean closed = false; - private final List tools = new ArrayList<>(); private final Object initializedLock = LockUtils.newSynchronizedLock(); - private final Object toolsLock = LockUtils.newSynchronizedLock(); private final Map>> responseConsumers = new ConcurrentHashMap<>(); private final Map pendingRequests = new ConcurrentHashMap<>(); - private final Map pendingResults = new ConcurrentHashMap<>(); + private final Map pendingResults = new ConcurrentHashMap<>(); private volatile Subscription subscription; private volatile ThreadPoolScheduler pingScheduler; @@ -197,10 +195,6 @@ private void initializedMcpServer(JsonRpc.Response response) { response.error()); throw new IllegalStateException(response.error().toString()); } - synchronized (this.initializedLock) { - this.initialized = true; - this.initializedLock.notifyAll(); - } this.recordServerSchema(response); HttpClassicClientRequest request = this.client.createRequest(HttpRequestMethod.POST, this.baseUri + this.messageEndpoint); @@ -225,6 +219,10 @@ private void initializedMcpServer(JsonRpc.Response response) { } catch (IOException e) { throw new IllegalStateException(e); } + synchronized (this.initializedLock) { + this.initialized = true; + this.initializedLock.notifyAll(); + } this.pingScheduler = ThreadPoolScheduler.custom() .threadPoolName("mcp-client-ping-" + this.name) .awaitTermination(3, TimeUnit.SECONDS) @@ -262,27 +260,30 @@ public List getTools() { while (this.pendingRequests.get(requestId)) { ThreadUtils.sleep(100); } - synchronized (this.toolsLock) { - return this.tools; + Result result = this.pendingResults.remove(requestId); + this.pendingRequests.remove(requestId); + if (result.isSuccess()) { + return ObjectUtils.cast(result.getContent()); + } else { + throw new IllegalStateException(result.getError()); } } private void getTools0(JsonRpc.Response response) { if (response.error() != null) { - log.error("Failed to get tools list from MCP server. [sessionId={}, response={}]", + String error = StringUtils.format("Failed to get tools list from MCP server. [sessionId={0}, response={1}]", this.sessionId, response); + this.pendingResults.put(response.id(), Result.error(error)); this.pendingRequests.put(response.id(), false); return; } Map result = cast(response.result()); List> rawTools = cast(result.get("tools")); - synchronized (this.toolsLock) { - this.tools.clear(); - this.tools.addAll(rawTools.stream() - .map(rawTool -> ObjectUtils.toCustomObject(rawTool, Tool.class)) - .toList()); - } + List tools = new ArrayList<>(rawTools.stream() + .map(rawTool -> ObjectUtils.toCustomObject(rawTool, Tool.class)) + .toList()); + this.pendingResults.put(response.id(), Result.success(tools)); this.pendingRequests.put(response.id(), false); } @@ -303,32 +304,46 @@ public Object callTool(String name, Map arguments) { while (this.pendingRequests.get(requestId)) { ThreadUtils.sleep(100); } - return this.pendingResults.get(requestId); + Result result = this.pendingResults.remove(requestId); + this.pendingRequests.remove(requestId); + if (result.isSuccess()) { + return result.getContent(); + } else { + throw new IllegalStateException(result.getError()); + } } private void callTools0(JsonRpc.Response response) { if (response.error() != null) { - log.error("Failed to call tool from MCP server. [sessionId={}, response={}]", this.sessionId, response); + String error = StringUtils.format("Failed to call tool from MCP server. [sessionId={0}, response={1}]", + this.sessionId, + response); + this.pendingResults.put(response.id(), Result.error(error)); this.pendingRequests.put(response.id(), false); return; } Map result = cast(response.result()); boolean isError = cast(result.get("isError")); if (isError) { - log.error("Failed to call tool from MCP server. [sessionId={}, result={}]", this.sessionId, result); + String error = StringUtils.format("Failed to call tool from MCP server. [sessionId={0}, result={1}]", + this.sessionId, + result); + this.pendingResults.put(response.id(), Result.error(error)); this.pendingRequests.put(response.id(), false); return; } List> rawContents = cast(result.get("content")); if (CollectionUtils.isEmpty(rawContents)) { - log.error("Failed to call tool from MCP server: no result returned. [sessionId={}, result={}]", + String error = StringUtils.format( + "Failed to call tool from MCP server: no result returned. [sessionId={0}, result={1}]", this.sessionId, result); + this.pendingResults.put(response.id(), Result.error(error)); this.pendingRequests.put(response.id(), false); return; } Map rawContent = rawContents.get(0); - this.pendingResults.put(response.id(), rawContent.get("text")); + this.pendingResults.put(response.id(), Result.success(rawContent.get("text"))); this.pendingRequests.put(response.id(), false); } diff --git a/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/Result.java b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/Result.java new file mode 100644 index 00000000..4b655024 --- /dev/null +++ b/framework/fel/java/plugins/tool-mcp-client/src/main/java/modelengine/fel/tool/mcp/client/support/Result.java @@ -0,0 +1,72 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package modelengine.fel.tool.mcp.client.support; + +/** + * 表示调用 MCP 的结果。 + * + * @author 季聿阶 + * @since 2025-08-04 + */ +public class Result { + private final boolean success; + private final Object content; + private final String error; + + private Result(boolean success, Object content, String error) { + this.success = success; + this.content = content; + this.error = error; + } + + /** + * 创建一个成功的结果。 + * + * @param content 表示成功结果的内容的 {@link Object}。 + * @return 表示成功结果的对象的 {@link Result}。 + */ + public static Result success(Object content) { + return new Result(true, content, null); + } + + /** + * 创建一个失败的结果。 + * + * @param error 表示错误结果的信息的 {@link String}。 + * @return 表示错误结果的对象的 {@link Result}。 + */ + public static Result error(String error) { + return new Result(false, null, error); + } + + /** + * 获取结果是否成功。 + * + * @return 如果结果成功,则返回 {@code true};否则返回 {@code false}。 + */ + public boolean isSuccess() { + return this.success; + } + + /** + * 获取结果内容。 + * + * @return 表示结果内容的 {@link Object}。 + */ + public Object getContent() { + return this.content; + } + + /** + * 获取结果错误信息。 + * + * @return 表示错误信息的 {@link String}。 + */ + public String getError() { + return this.error; + } +}