From c61d8be70cb531a5c016bcb16ad925b609d21da4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=8F=AF=E6=AC=A3?= <2218887102@qq.com> Date: Mon, 29 Dec 2025 10:00:49 +0800 Subject: [PATCH 1/5] =?UTF-8?q?05=E7=A4=BA=E4=BE=8B=E4=BF=AE=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../retrieval/RetrievalExampleController.java | 40 ++++++++++++++++--- .../waterflow/domain/context/MatchWindow.java | 6 ++- 2 files changed, 39 insertions(+), 7 deletions(-) diff --git a/examples/fel-example/05-retrieval/src/main/java/modelengine/example/ai/chat/retrieval/RetrievalExampleController.java b/examples/fel-example/05-retrieval/src/main/java/modelengine/example/ai/chat/retrieval/RetrievalExampleController.java index 2444ddfd..436e52c2 100644 --- a/examples/fel-example/05-retrieval/src/main/java/modelengine/example/ai/chat/retrieval/RetrievalExampleController.java +++ b/examples/fel-example/05-retrieval/src/main/java/modelengine/example/ai/chat/retrieval/RetrievalExampleController.java @@ -41,10 +41,14 @@ import modelengine.fitframework.annotation.Component; import modelengine.fitframework.annotation.Fit; import modelengine.fitframework.annotation.Value; +import modelengine.fitframework.log.Logger; import modelengine.fitframework.serialization.ObjectSerializer; -import modelengine.fitframework.util.FileUtils; import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; import java.util.List; import java.util.stream.Collectors; @@ -57,6 +61,7 @@ @Component @RequestMapping("/ai/example") public class RetrievalExampleController { + private static final Logger log = Logger.get(RetrievalExampleController.class); private static final String REWRITE_PROMPT = "作为一个向量检索助手,你的任务是结合历史记录,为”原问题“生成”检索词“," + "生成的问题要求指向对象清晰明确,并与“原问题语言相同。\n\n" + "历史记录:\n---\n" + DEFAULT_HISTORY_KEY + "---\n原问题:{{query}}\n检索词:"; @@ -85,22 +90,27 @@ public RetrievalExampleController(ChatModel chatModel, EmbedModel embedModel, .others(node -> node.map(tip -> tip.freeze().get("query").text())) .retrieve(new DefaultVectorRetriever(vectorStore, SearchOption.custom().topK(1).build())) .synthesize(docs -> Content.from(docs.stream().map(Document::text).collect(Collectors.joining("\n\n")))) - .close(); + .close(__ -> log.info("Retrieve flow completed.")); AiProcessFlow> indexFlow = AiFlows.create() .load(new JsonFileSource(serializer, StringTemplate.create("{{question}}: {{answer}}"))) .index(vectorStore) .close(); - File file = FileUtils.file(this.getClass().getClassLoader().getResource("data.json")); + File file = extractResourceToTempFile("data.json"); notNull(file, "The data cannot be null."); - indexFlow.converse().offer(file); + indexFlow.converse() + .doOnError(e -> log.info("Index build error. [error={}]", e.getMessage(), e)) + .doOnFinally(() -> log.info("Index build successfully.")) + .offer(file); this.ragFlow = AiFlows.create() + .just(query -> log.info("RAG flow start. [query={}]", query)) .map(query -> Tip.from("query", query)) .runnableParallel(value("context", retrieveFlow), passThrough()) .prompt(Prompts.history(), Prompts.human(CHAT_PROMPT)) + .just(__ -> log.info("LLM start generation.")) .generate(chatFlowModel) - .close(); + .close(__ -> log.info("RAG flow completed.")); } /** @@ -116,4 +126,24 @@ public ChatMessage chat(@RequestParam("query") String query) { this.memory.add(aiMessage); return aiMessage; } + + /** + * 从 JAR 中提取资源到临时文件。 + * + * @param resourceName 表示资源名称的 {@link String}。 + * @return 表示临时文件的 {@link File}。 + */ + private File extractResourceToTempFile(String resourceName) { + try (InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream(resourceName)) { + if (inputStream == null) { + throw new IllegalArgumentException("Resource not found: " + resourceName); + } + File tempFile = File.createTempFile("data-", ".json"); + tempFile.deleteOnExit(); + Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + return tempFile; + } catch (IOException e) { + throw new RuntimeException("Failed to extract resource: " + resourceName, e); + } + } } \ No newline at end of file diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/MatchWindow.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/MatchWindow.java index 261ff5f5..646c9434 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/MatchWindow.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/MatchWindow.java @@ -47,12 +47,14 @@ public MatchWindow(Window source, UUID id, Object data) { * @return 返回创建的MatchWindow对象 */ public static synchronized MatchWindow from(Window source, UUID id, Object data) { - MatchWindow window = all.get(id.toString()); + // Use composite key: sessionId + UUID to prevent cross-session pollution + String cacheKey = source.getSession().getId() + ":" + id.toString(); + MatchWindow window = all.get(cacheKey); if (window == null) { window = new MatchWindow(source, id, data); FlowSession session = new FlowSession(source.getSession()); session.setWindow(window); - all.put(id.toString(), window); + all.put(cacheKey, window); } WindowToken token = window.createToken(); token.beginConsume(); From 22ee3e41a0e7bff070ddce9aa6e3b93ae98d5f53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=8F=AF=E6=AC=A3?= <2218887102@qq.com> Date: Mon, 29 Dec 2025 10:41:05 +0800 Subject: [PATCH 2/5] =?UTF-8?q?=E5=A4=8D=E5=8E=9F=E6=96=87=E4=BB=B6?= =?UTF-8?q?=E8=AF=BB=E5=8F=96=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/fel-example/05-retrieval/README.md | 2 ++ .../retrieval/RetrievalExampleController.java | 27 ++----------------- 2 files changed, 4 insertions(+), 25 deletions(-) diff --git a/examples/fel-example/05-retrieval/README.md b/examples/fel-example/05-retrieval/README.md index 63e72a4e..33692310 100644 --- a/examples/fel-example/05-retrieval/README.md +++ b/examples/fel-example/05-retrieval/README.md @@ -226,6 +226,8 @@ node0-->node1{{=}} ## 验证 +- 在IDEA中运行`DemoApplication` + - 在浏览器栏输入:`http://localhost:8080/ai/example/chat?query=请介绍一下黑神话悟空` ```json diff --git a/examples/fel-example/05-retrieval/src/main/java/modelengine/example/ai/chat/retrieval/RetrievalExampleController.java b/examples/fel-example/05-retrieval/src/main/java/modelengine/example/ai/chat/retrieval/RetrievalExampleController.java index 436e52c2..099c3f81 100644 --- a/examples/fel-example/05-retrieval/src/main/java/modelengine/example/ai/chat/retrieval/RetrievalExampleController.java +++ b/examples/fel-example/05-retrieval/src/main/java/modelengine/example/ai/chat/retrieval/RetrievalExampleController.java @@ -43,12 +43,9 @@ import modelengine.fitframework.annotation.Value; import modelengine.fitframework.log.Logger; import modelengine.fitframework.serialization.ObjectSerializer; +import modelengine.fitframework.util.FileUtils; import java.io.File; -import java.io.IOException; -import java.io.InputStream; -import java.nio.file.Files; -import java.nio.file.StandardCopyOption; import java.util.List; import java.util.stream.Collectors; @@ -96,7 +93,7 @@ public RetrievalExampleController(ChatModel chatModel, EmbedModel embedModel, .load(new JsonFileSource(serializer, StringTemplate.create("{{question}}: {{answer}}"))) .index(vectorStore) .close(); - File file = extractResourceToTempFile("data.json"); + File file = FileUtils.file(this.getClass().getClassLoader().getResource("data.json")); notNull(file, "The data cannot be null."); indexFlow.converse() .doOnError(e -> log.info("Index build error. [error={}]", e.getMessage(), e)) @@ -126,24 +123,4 @@ public ChatMessage chat(@RequestParam("query") String query) { this.memory.add(aiMessage); return aiMessage; } - - /** - * 从 JAR 中提取资源到临时文件。 - * - * @param resourceName 表示资源名称的 {@link String}。 - * @return 表示临时文件的 {@link File}。 - */ - private File extractResourceToTempFile(String resourceName) { - try (InputStream inputStream = this.getClass().getClassLoader().getResourceAsStream(resourceName)) { - if (inputStream == null) { - throw new IllegalArgumentException("Resource not found: " + resourceName); - } - File tempFile = File.createTempFile("data-", ".json"); - tempFile.deleteOnExit(); - Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING); - return tempFile; - } catch (IOException e) { - throw new RuntimeException("Failed to extract resource: " + resourceName, e); - } - } } \ No newline at end of file From 6fe7ee95780c235a30b0d6544f9734e7c47bba31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=8F=AF=E6=AC=A3?= <2218887102@qq.com> Date: Tue, 30 Dec 2025 15:01:23 +0800 Subject: [PATCH 3/5] =?UTF-8?q?=E6=B7=BB=E5=8A=A0cache=E9=87=8A=E6=94=BE?= =?UTF-8?q?=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../waterflow/domain/context/MatchWindow.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/MatchWindow.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/MatchWindow.java index 646c9434..0b5d481c 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/MatchWindow.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/MatchWindow.java @@ -82,4 +82,23 @@ public void complete() { public boolean fulfilled() { return this.from.isComplete() && this.from.isOngoing(); } + + @Override + public void tryFinish() { + super.tryFinish(); + // 当 MatchWindow 完全完成时,从缓存中移除以防止内存泄漏 + if (this.isDone()) { + cleanup(); + } + } + + /** + * 从缓存中移除当前 MatchWindow + */ + private void cleanup() { + if (this.getSession() != null) { + String cacheKey = this.getSession().getId() + ":" + this.key().toString(); + all.remove(cacheKey); + } + } } From dd8c60346d5b775f576376c2872f078d09dcc0d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=8F=AF=E6=AC=A3?= <2218887102@qq.com> Date: Tue, 30 Dec 2025 19:10:15 +0800 Subject: [PATCH 4/5] =?UTF-8?q?=E6=8A=8AMatchWindow=E6=94=BE=E5=9C=A8FlowS?= =?UTF-8?q?essionCache=E7=BB=9F=E4=B8=80=E7=AE=A1=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../waterflow/domain/context/MatchWindow.java | 38 +++++-------------- .../repo/flowsession/FlowSessionRepo.java | 24 ++++++++++++ .../waterflow/domain/states/MatchHappen.java | 4 +- 3 files changed, 36 insertions(+), 30 deletions(-) diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/MatchWindow.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/MatchWindow.java index 0b5d481c..1ea34270 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/MatchWindow.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/MatchWindow.java @@ -6,12 +6,13 @@ package modelengine.fit.waterflow.domain.context; +import modelengine.fit.waterflow.domain.context.repo.flowsession.FlowSessionRepo; + import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; /** @@ -21,8 +22,6 @@ * @since 1.0 */ public class MatchWindow extends Window { - private static final Map all = new ConcurrentHashMap<>(); - private final Set arms = new HashSet<>(); /** @@ -41,24 +40,26 @@ public MatchWindow(Window source, UUID id, Object data) { /** * 创建一个MatchWindow * + * @param flowId 流程ID * @param source 源窗口 * @param id 窗口ID * @param data 窗口数据 * @return 返回创建的MatchWindow对象 */ - public static synchronized MatchWindow from(Window source, UUID id, Object data) { - // Use composite key: sessionId + UUID to prevent cross-session pollution - String cacheKey = source.getSession().getId() + ":" + id.toString(); - MatchWindow window = all.get(cacheKey); + public static synchronized MatchWindow from(String flowId, Window source, UUID id, Object data) { + // 从 FlowSessionRepo 获取缓存 + Map cache = FlowSessionRepo.getMatchWindowCache(flowId, source.getSession()); + + MatchWindow window = cache.get(id); if (window == null) { window = new MatchWindow(source, id, data); FlowSession session = new FlowSession(source.getSession()); session.setWindow(window); - all.put(cacheKey, window); + cache.put(id, window); } WindowToken token = window.createToken(); token.beginConsume(); - List arms = all.values().stream().filter(t -> t.from == source).collect(Collectors.toList()); + List arms = cache.values().stream().filter(t -> t.from == source).collect(Collectors.toList()); for (MatchWindow a : arms) { a.setArms(arms); } @@ -82,23 +83,4 @@ public void complete() { public boolean fulfilled() { return this.from.isComplete() && this.from.isOngoing(); } - - @Override - public void tryFinish() { - super.tryFinish(); - // 当 MatchWindow 完全完成时,从缓存中移除以防止内存泄漏 - if (this.isDone()) { - cleanup(); - } - } - - /** - * 从缓存中移除当前 MatchWindow - */ - private void cleanup() { - if (this.getSession() != null) { - String cacheKey = this.getSession().getId() + ":" + this.key().toString(); - all.remove(cacheKey); - } - } } diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowsession/FlowSessionRepo.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowsession/FlowSessionRepo.java index be71e73f..1c822ce5 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowsession/FlowSessionRepo.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowsession/FlowSessionRepo.java @@ -8,6 +8,7 @@ import modelengine.fit.waterflow.domain.context.FlatMapSourceWindow; import modelengine.fit.waterflow.domain.context.FlowSession; +import modelengine.fit.waterflow.domain.context.MatchWindow; import modelengine.fit.waterflow.domain.context.Window; import modelengine.fit.waterflow.domain.context.repo.flowcontext.FlowContextRepo; import modelengine.fitframework.inspection.Validation; @@ -90,6 +91,19 @@ public static FlatMapSourceWindow getFlatMapSource(String flowId, Window window, .getFlatMapSourceWindow(window, repo); } + /** + * 获取 MatchWindow 缓存 Map,用于存储和检索 MatchWindow 实例 + * + * @param flowId The unique identifier of the flow. + * @param session The current session context. + * @return MatchWindow 缓存 Map + */ + public static Map getMatchWindowCache(String flowId, FlowSession session) { + Validation.notNull(flowId, "Flow id cannot be null."); + Validation.notNull(session, "Session cannot be null."); + return getFlowSessionCache(flowId, session).getMatchWindowCache(); + } + /** * Releases all resources associated with a specific flow session. * @@ -137,6 +151,12 @@ private static class FlowSessionCache { */ private final Map flatMapSourceWindows = new ConcurrentHashMap<>(); + /** + * 记录流程中条件匹配节点产生的窗口信息,用于将同一批数据汇聚。 + * 其中索引为 match window 的唯一标识。 + */ + private final Map matchWindows = new ConcurrentHashMap<>(); + private final Map accOrders = new ConcurrentHashMap<>(); private FlowSession getNextToSession(FlowSession session) { @@ -165,6 +185,10 @@ private FlatMapSourceWindow getFlatMapSourceWindow(Window window, FlowContextRep }); } + private Map getMatchWindowCache() { + return this.matchWindows; + } + private int getNextAccOrder(String nodeId) { return this.accOrders.compute(nodeId, (key, value) -> { if (value == null) { diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/MatchHappen.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/MatchHappen.java index cec67190..c3bc6bdc 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/MatchHappen.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/MatchHappen.java @@ -50,8 +50,8 @@ public MatchHappen match(Operators.Whether whether, Operators.BranchProcessor processor) { UUID id = UUID.randomUUID(); State branchStart = new State<>(this.node.publisher() - .just(input -> input.setSession( - MatchWindow.from(input.getWindow(), id, input.getData()).getSession()), whether) + .just(input -> input.setSession(MatchWindow.from(this.node.processor.getStreamId(), + input.getWindow(), id, input.getData()).getSession()), whether) .displayAs(SpecialDisplayNode.BRANCH.name()), this.node.getFlow()); State branch = processor.process(branchStart); this.branches.add(branch); From a444f64e77af945d06c02622f2fc49ac29123887 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BB=84=E5=8F=AF=E6=AC=A3?= <2218887102@qq.com> Date: Tue, 30 Dec 2025 20:35:21 +0800 Subject: [PATCH 5/5] =?UTF-8?q?=E6=B7=BB=E5=8A=A0FlowSessionCache=E6=96=87?= =?UTF-8?q?=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../waterflow/java/waterflow-core/README.md | 52 ++++++++++++++++++- 1 file changed, 51 insertions(+), 1 deletion(-) diff --git a/framework/waterflow/java/waterflow-core/README.md b/framework/waterflow/java/waterflow-core/README.md index 17fb5c20..ae310ad2 100644 --- a/framework/waterflow/java/waterflow-core/README.md +++ b/framework/waterflow/java/waterflow-core/README.md @@ -271,7 +271,57 @@ Flows.create() 向流中投递数据。这里需要注意,流的运行是异步的,offer返回的是这次运行的实例ID。 -## 使用限制 +## 核心机制 + +### FlowSessionCache 会话缓存管理 + +FlowSessionCache 是 waterflow 的核心资源管理机制,负责统一管理流程执行过程中产生的所有会话相关资源,确保同一批数据的正确汇聚和资源的自动释放。 + +#### 缓存结构 + +FlowSessionCache 按照 `flowId -> sessionId -> FlowSessionCache` 的层级结构组织,每个流程会话维护独立的缓存实例,内部管理以下资源: + +1. **Session 流转缓存(nextToSessions)** + - 记录每个节点向下游节点流转数据时使用的 session + - 以当前窗口的唯一标识(UUID)为索引 + - 确保同一批数据在节点间流转时使用相同的 session 进行汇聚 + +2. **Emitter 处理缓存(nextEmitterHandleSessions)** + - 专门用于处理 emitter 操作的 session 缓存 + - 为发射器操作提供独立的会话上下文 + +3. **FlatMap 窗口缓存(flatMapSourceWindows)** + - 记录 flatMap 节点产生的源窗口信息 + - 以窗口唯一标识为索引存储 `FlatMapSourceWindow` 实例 + - 用于将 flatMap 操作产生的多个输出数据与原始输入关联 + +4. **Match 窗口缓存(matchWindows)** + - 记录条件匹配节点(`conditions`)产生的窗口信息 + - 以 MatchWindow 的唯一标识为索引 + - 用于将条件分支产生的数据进行汇聚 + +5. **累加器顺序缓存(accOrders)** + - 记录每个节点的累加操作顺序编号 + - 以节点 ID 为索引,存储递增的序号 + +#### 资源管理机制 + +**自动创建与复用** +- 首次访问某个流程会话时,FlowSessionCache 自动创建并初始化 +- 相同窗口/会话标识的资源会被复用,避免重复创建 +- 通过 `FlowSessionRepo` 提供的静态方法访问各类缓存资源 + +**会话隔离** +- 不同流程(flowId)的缓存完全隔离 +- 同一流程的不同会话(sessionId)也拥有独立的缓存空间 +- 避免跨会话或跨流程的数据污染 + +**生命周期管理** +- 会话结束时调用 `FlowSessionRepo.release(flowId, session)` 自动释放所有关联资源 +- 当某个流程的所有会话都释放后,自动清理该流程的缓存映射 +- 无需在各个窗口或会话类中实现清理逻辑,避免内存泄漏 + +## 使用限制 1. 在编排流程时需要保证节点流转上没有死循环,否则处于死循环的数据将一致在这些节点上循环流转。 2. 数据流转的线程池最大是100个,每个节点最大同时处理16个批次的数据,每个批次的数据在每个节点上串行执行。超过限制的数据将排队等待执行。