diff --git a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/McpServerController.java b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/McpServerController.java index e2b1d5c5..459a4978 100644 --- a/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/McpServerController.java +++ b/framework/fel/java/plugins/tool-mcp-server/src/main/java/modelengine/fel/tool/mcp/server/McpServerController.java @@ -88,19 +88,22 @@ public McpServerController(@Fit(alias = "json") ObjectSerializer serializer, Mcp if (MapUtils.isEmpty(this.responses)) { return; } - List toRemoved = new ArrayList<>(); + List obsoleteSessionIds = new ArrayList<>(); for (Map.Entry entry : this.responses.entrySet()) { if (entry.getValue().isActive()) { continue; } - toRemoved.add(entry.getKey()); + obsoleteSessionIds.add(entry.getKey()); } - if (CollectionUtils.isEmpty(toRemoved)) { + if (CollectionUtils.isEmpty(obsoleteSessionIds)) { return; } - toRemoved.forEach(this.responses::remove); - toRemoved.forEach(this.emitters::remove); - log.info("Channels are inactive, remove emitters and responses. [sessionIds={}]", toRemoved); + obsoleteSessionIds.forEach(this.responses::remove); + for (String obsoleteSessionId : obsoleteSessionIds) { + Emitter removed = this.emitters.remove(obsoleteSessionId); + removed.complete(); + } + log.info("Channels are inactive, remove emitters and responses. [sessionIds={}]", obsoleteSessionIds); }).build()); }