From e80259d48cc1ba27ed4a3063407714e35cfb921c Mon Sep 17 00:00:00 2001 From: loveTsong <271667068@qq.com> Date: Fri, 24 Oct 2025 10:54:43 +0800 Subject: [PATCH 01/14] =?UTF-8?q?[waterflow]=20=E5=90=88=E5=B9=B6=EF=BC=9A?= =?UTF-8?q?=E9=80=82=E9=85=8D=E4=B8=BB=E8=A6=81=E7=9A=84=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=E6=95=B4=E5=90=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../domain/context/CompleteContext.java | 2 +- .../waterflow/domain/context/FlowContext.java | 27 ++++-- .../fit/waterflow/domain/context/Window.java | 2 +- .../flowcontext/FlowContextMemoMessenger.java | 2 +- .../repo/flowcontext/FlowContextMemoRepo.java | 29 +++++++ .../flowcontext/FlowContextMessenger.java | 3 +- .../repo/flowcontext/FlowContextRepo.java | 83 ++++++++++++++++++ .../waterflow/domain/enums/FlowNodeStage.java | 25 ++++++ .../domain/enums/FlowNodeStatus.java | 7 +- .../domain/enums/FlowTraceStatus.java | 5 ++ .../stream/callbacks/PreSendCallbackInfo.java | 53 ++++++++++++ .../domain/stream/nodes/BlockToken.java | 6 +- .../domain/stream/nodes/ConditionsNode.java | 41 ++++++--- .../waterflow/domain/stream/nodes/From.java | 15 ++-- .../waterflow/domain/stream/nodes/Node.java | 11 ++- .../domain/stream/nodes/ParallelNode.java | 6 +- .../domain/stream/nodes/Retryable.java | 19 ++++- .../fit/waterflow/domain/stream/nodes/To.java | 85 +++++++++++++++++-- .../domain/stream/reactive/Publisher.java | 5 +- .../domain/stream/reactive/Subscriber.java | 26 +++++- .../fit/waterflow/domain/WaterFlowsTest.java | 64 +++++++------- 21 files changed, 438 insertions(+), 78 deletions(-) create mode 100644 framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeStage.java create mode 100644 framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/callbacks/PreSendCallbackInfo.java diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/CompleteContext.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/CompleteContext.java index 7d28698f..3e026b82 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/CompleteContext.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/CompleteContext.java @@ -27,7 +27,7 @@ public class CompleteContext extends FlowContext { */ public CompleteContext(FlowContext context, String position) { super(context.getStreamId(), context.getRootId(), null, context.getTraceId(), position, - context.getParallel(), context.getParallelMode(), context.getSession()); + context.getParallel(), context.getParallelMode(), context.getSession(), context.getCreateAt()); this.batchId = context.getBatchId(); this.setIndex(Constants.NOT_PRESERVED_INDEX); } diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowContext.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowContext.java index 8184bc5b..c817e4a3 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowContext.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowContext.java @@ -142,6 +142,13 @@ public class FlowContext extends IdGenerator implements StateContext { @Setter private Integer index; + /** + * 当前context接下来要走到位置:可以是连线或者节点id + */ + @Setter + @Getter + private String nextPositionId; + /** * 创建一个 {@link FlowContext} 实例。 * @@ -154,7 +161,7 @@ public class FlowContext extends IdGenerator implements StateContext { */ public FlowContext(String streamId, String rootId, T data, Set traceId, String position, FlowSession session) { - this(streamId, rootId, data, traceId, position, "", "", session); + this(streamId, rootId, data, traceId, position, "", "", session, LocalDateTime.now()); } /** @@ -167,10 +174,11 @@ public FlowContext(String streamId, String rootId, T data, Set traceId, * @param position 表示上下文当前所处的位置的 {@link String}。 * @param parallel 表示并行节点唯一标识的 {@link String}。 * @param parallelMode 表示并行模式的 {@link String}。 + * @param createAt 表示创建时间的 {@link LocalDateTime}。 * @param session 表示上下文会话信息的 {@link FlowSession}。 */ public FlowContext(String streamId, String rootId, T data, Set traceId, String position, String parallel, - String parallelMode, FlowSession session) { + String parallelMode, FlowSession session, LocalDateTime createAt) { this.streamId = streamId; this.rootId = rootId; this.data = data; @@ -179,7 +187,7 @@ public FlowContext(String streamId, String rootId, T data, Set traceId, this.position = position; this.parallel = parallel; this.parallelMode = parallelMode; - this.createAt = LocalDateTime.now(); + this.createAt = createAt; this.session = session; this.index = this.createIndex(); // 0起始,说明保序 } @@ -266,9 +274,10 @@ public FlowContext toBatch(String toBatchId) { * @param 表示返回值类型的泛型参数。 * @param data 表示处理后数据的 {@link R}。 * @param position 表示处理后所处的节点的 {@link String}。 + * @param createAt 表示创建时间的 {@link LocalDateTime}。 * @return 表示新的上下文的 {@link FlowContext}{@code <}{@link R}{@code >}。 */ - public FlowContext generate(R data, String position) { + public FlowContext generate(R data, String position, LocalDateTime createAt) { FlowContext context = new FlowContext<>(this.streamId, this.rootId, data, @@ -276,7 +285,8 @@ public FlowContext generate(R data, String position) { this.position, this.parallel, this.parallelMode, - this.session); + this.session, + createAt); context.position = position; context.previous = this.id; context.batchId = this.batchId; @@ -293,7 +303,9 @@ public FlowContext generate(R data, String position) { * @return 表示新的上下文的 {@link List}{@code <}{@link FlowContext}{@code <}{@link R}{@code >}{@code >}。 */ public List> generate(List dataList, String position) { - return dataList.stream().map(data -> this.generate(data, position)).collect(Collectors.toList()); + return dataList.stream() + .map(data -> this.generate(data, position, LocalDateTime.now())) + .collect(Collectors.toList()); } /** @@ -312,7 +324,8 @@ public FlowContext convertData(R data, String id) { this.position, this.parallel, this.parallelMode, - this.session); + this.session, + LocalDateTime.now()); context.previous = this.previous; context.status = this.status; context.id = id; diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/Window.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/Window.java index e9fe3a10..cfa660be 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/Window.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/Window.java @@ -255,7 +255,7 @@ private void fire() { cs.add(completeContext); List contexts = node.getProcessMode().process(node, cs); if (node instanceof Processor) { - ((Processor) node).offer(contexts); + ((Processor) node).offer(contexts, __ -> {}); } completeContext = null; } diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMemoMessenger.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMemoMessenger.java index 69cad9b3..1548e5d5 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMemoMessenger.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMemoMessenger.java @@ -39,7 +39,7 @@ public void send(String nodeId, List> contexts) { * @param 流程实例执行时的入参数据类型,用于泛型推倒 */ @Override - public void sendCallback(List> contexts) { + public void sendCallback(Object callback, List> contexts) { LOG.warn("FlowEngine memo messenger does not support sending events."); } } diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMemoRepo.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMemoRepo.java index 9d428e46..49524d43 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMemoRepo.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMemoRepo.java @@ -10,6 +10,7 @@ import modelengine.fit.waterflow.domain.context.FlowTrace; import modelengine.fit.waterflow.domain.enums.FlowNodeStatus; import modelengine.fit.waterflow.domain.stream.operators.Operators; +import modelengine.fitframework.util.ObjectUtils; import java.util.ArrayList; import java.util.Deque; @@ -74,6 +75,11 @@ public List> getContextsByPosition(String streamId, String po .filter(context -> context.getStatus().toString().equals(status))); } + @Override + public List> findWithoutFlowDataByTraceId(String traceId) { + throw new IllegalStateException("Not support"); + } + @Override public List> getContextsByTrace(String traceId) { return query(stream -> stream @@ -95,6 +101,21 @@ public synchronized void save(List> contexts) { }); } + @Override + public void updateFlowDataAndToBatch(List> contexts) { + save(contexts); + } + + @Override + public synchronized void updateFlowData(Map flowDataList) { + flowDataList.forEach((contextId, data) -> { + FlowContext flowContext = ObjectUtils.cast(this.contextsMap.get(contextId)); + if (flowContext != null) { + flowContext.setData(data); + } + }); + } + @Override public void updateToSent(List> contexts) { save(contexts); @@ -125,6 +146,14 @@ public List> getByIds(List ids) { return ids.stream().map(i -> (FlowContext) contextsMap.get(i)).collect(Collectors.toList()); } + @Override + public List> getByToBatch(List toBatchIds) { + return query(stream -> stream + .filter(context -> context.getStatus().equals(FlowNodeStatus.PENDING)) + .filter(FlowContext::isSent) + .filter(context -> toBatchIds.contains(context.getToBatch()))); + } + @Override public List> requestMappingContext(String streamId, List subscriptions, Map sessions) { diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMessenger.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMessenger.java index 20c22f9a..eec0762b 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMessenger.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMessenger.java @@ -50,10 +50,11 @@ default void send(ProcessType type, Subscriber subscriber, List 流程实例执行时的入参数据类型,用于泛型推倒 */ - void sendCallback(List> contexts); + void sendCallback(Object callback, List> contexts); /** * Directly processes a list of flow contexts through the specified subscriber. diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextRepo.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextRepo.java index aab19d6c..1ad5d43f 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextRepo.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextRepo.java @@ -47,6 +47,15 @@ public interface FlowContextRepo { */ List> getContextsByPosition(String streamId, String posId, String batchId, String status); + /** + * 根据traceId查询所有的context对象 + * + * @param 泛型类型,表示上下文的数据类型 + * @param traceId traceId + * @return 上下文列表 + */ + List> findWithoutFlowDataByTraceId(String traceId); + /** * 根据traceId获取上下文 * @@ -64,6 +73,22 @@ public interface FlowContextRepo { */ void save(List> contexts); + /** + * 批量更新context的上下文数据flowData字段 + * + * @param 泛型类型,表示上下文的数据类型 + * @param contexts contexts + */ + void updateFlowDataAndToBatch(List> contexts); + + /** + * 批量更新上下文数据 + * + * @param 泛型类型,表示上下文的数据类型 + * @param flowDataList 数据列表(contextId, T) + */ + void updateFlowData(Map flowDataList); + /** * 批量更新context的内容,不更新status和position * @@ -109,6 +134,15 @@ default void update(List> contexts) { */ List> getByIds(List ids); + /** + * 根据toBatch查找FlowContext + * + * @param 泛型类型,表示上下文的数据类型 + * @param toBatchIds 上下文toBatch + * @return 上下文列表 + */ + List> getByToBatch(List toBatchIds); + /** * 查找和指定一批ID对应的状态为PENDING且SENT了的流程上下文 * @@ -154,6 +188,17 @@ default List> findByStreamId(String metaId, String version) { throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "findByStreamId"); } + /** + * 查找流程对应版本正在运行的上下文 + * + * @param metaId metaId 流程metaId标识 + * @param version 流程对应版本 + * @return 对应所有上下文 + */ + default Integer findRunningContextCountByMetaId(String metaId, String version) { + throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "findRunningContextCountByMetaId"); + } + /** * 查找流程对应版本正在运行的上下文 * @@ -241,5 +286,43 @@ default boolean isTracesTerminate(List traceIds) { * @param contexts 上下文信息 */ void updateIndex(List> contexts); + + /** + * deleteByContextIds + * + * @param contextIds contextIds + */ + default void deleteByContextIds(List contextIds) { + throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "deleteByContextIds"); + } + + /** + * 根据transId获取stream id + * + * @param flowTransId trans id + * @return stream id + */ + default String getStreamIdByTransId(String flowTransId) { + throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "getStreamIdByTransId"); + } + + /** + * 根据transId获取traceId + * + * @param transId transId + * @return traceId + */ + default List getTraceByTransId(String transId) { + throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "getTraceByTransId"); + } + + /** + * 根据transId删除上下文 + * + * @param transId trans id + */ + default void deleteByTransId(String transId) { + throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "deleteByTransId"); + } } diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeStage.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeStage.java new file mode 100644 index 00000000..54cd4563 --- /dev/null +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeStage.java @@ -0,0 +1,25 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package modelengine.fit.waterflow.domain.enums; + +/** + * 流程节点的阶段信息 + * + * @author songyongtan + * @since 2024/10/17 + */ +public enum FlowNodeStage { + /** + * 节点调用前 + */ + BEFORE, + + /** + * 节点调用后 + */ + AFTER; +} diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeStatus.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeStatus.java index 5a637cee..37f4a06b 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeStatus.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeStatus.java @@ -47,5 +47,10 @@ public enum FlowNodeStatus { /** * 节点处理过程中发生错误 */ - ERROR + ERROR, + + /** + * 可重试 + */ + RETRYABLE; } \ No newline at end of file diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowTraceStatus.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowTraceStatus.java index b6e23ee6..4ce182b6 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowTraceStatus.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowTraceStatus.java @@ -39,6 +39,11 @@ public enum FlowTraceStatus { */ ERROR, + /** + * 部分失败 + */ + PARTIAL_ERROR, + /** * 已终止 */ diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/callbacks/PreSendCallbackInfo.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/callbacks/PreSendCallbackInfo.java new file mode 100644 index 00000000..fa4f428d --- /dev/null +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/callbacks/PreSendCallbackInfo.java @@ -0,0 +1,53 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package modelengine.fit.waterflow.domain.stream.callbacks; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import modelengine.fit.waterflow.domain.context.FlowContext; +import modelengine.fit.waterflow.domain.stream.reactive.Subscription; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * 数据向下一个节点发送前触发回调的入参 + * + * @author 杨祥宇 + * @since 2024/10/21 + */ +@Getter +@Setter +@AllArgsConstructor +public class PreSendCallbackInfo { + /** + * 命中的context列表 + */ + private Map, List>> matchedContexts; + + /** + * 未命中的context列表 + */ + + private List> unMatchedContexts; + + /** + * 获取命中和未命中的所有context列表 + * + * @return context列表 + */ + public List> getAllContext() { + List> mergeContexts = this.getMatchedContexts().values() + .stream() + .flatMap(List::stream) + .collect(Collectors.toList()); + mergeContexts.addAll(this.getUnMatchedContexts()); + return mergeContexts; + } +} diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/BlockToken.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/BlockToken.java index 8a415281..4748cca2 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/BlockToken.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/BlockToken.java @@ -44,7 +44,7 @@ public synchronized void resume() { } } List> cloned = verified.stream() - .map(context -> context.generate(context.getData(), context.getPosition()) + .map(context -> context.generate(context.getData(), context.getPosition(), context.getCreateAt()) .batchId(context.getBatchId())) .collect(Collectors.toList()); this.publisher.getFlowContextRepo().save(cloned); @@ -52,7 +52,9 @@ public synchronized void resume() { .collect(Collectors.groupingBy(item -> item.getSession().getId(), LinkedHashMap::new, Collectors.toList())) .values() - .forEach(this.publisher::offer); + .forEach(contexts -> { + this.publisher.offer(contexts, __ -> {}); + }); } /** diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/ConditionsNode.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/ConditionsNode.java index 669669e9..a8c2a806 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/ConditionsNode.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/ConditionsNode.java @@ -11,10 +11,16 @@ import modelengine.fit.waterflow.domain.context.repo.flowcontext.FlowContextRepo; import modelengine.fit.waterflow.domain.context.repo.flowlock.FlowLocks; import modelengine.fit.waterflow.domain.enums.FlowNodeType; +import modelengine.fit.waterflow.domain.stream.callbacks.PreSendCallbackInfo; +import modelengine.fit.waterflow.domain.stream.operators.Operators; +import modelengine.fit.waterflow.domain.stream.reactive.Subscription; import modelengine.fit.waterflow.domain.utils.UUIDUtil; import modelengine.fitframework.util.CollectionUtils; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; +import java.util.function.Consumer; import java.util.stream.Collectors; /** @@ -30,12 +36,17 @@ public class ConditionsNode extends Node { * 1->1处理节点 * * @param streamId stream流程ID + * @param processor 对应处理器 * @param repo 上下文持久化repo,默认在内存 * @param messenger 上下文事件发送器,默认在内存 * @param locks 流程锁 */ - public ConditionsNode(String streamId, FlowContextRepo repo, FlowContextMessenger messenger, FlowLocks locks) { - super(streamId, FlowContext::getData, repo, messenger, locks, () -> initFrom(streamId, repo, messenger, locks)); + public ConditionsNode(String streamId, Operators.Just> processor, FlowContextRepo repo, + FlowContextMessenger messenger, FlowLocks locks) { + super(streamId, i -> { + processor.process(i); + return i.getData(); + }, repo, messenger, locks, () -> initFrom(streamId, repo, messenger, locks)); super.id = "condition:" + UUIDUtil.uuid(); } @@ -44,14 +55,15 @@ public ConditionsNode(String streamId, FlowContextRepo repo, FlowContextMessenge * * @param streamId stream流程ID * @param nodeId stream流程节点ID + * @param processor 对应处理器 * @param repo 上下文持久化repo,默认在内存 * @param messenger 上下文事件发送器,默认在内存 * @param locks 流程锁 * @param nodeType 节点类型 */ - public ConditionsNode(String streamId, String nodeId, FlowContextRepo repo, FlowContextMessenger messenger, - FlowLocks locks, FlowNodeType nodeType) { - this(streamId, repo, messenger, locks); + public ConditionsNode(String streamId, String nodeId, Operators.Just> processor, + FlowContextRepo repo, FlowContextMessenger messenger, FlowLocks locks, FlowNodeType nodeType) { + this(streamId, processor, repo, messenger, locks); this.id = nodeId; this.nodeType = nodeType; } @@ -77,12 +89,21 @@ public ConditionFrom(String streamId, FlowContextRepo repo, FlowContextMessenger } @Override - public void offer(List> contexts) { + public void offer(List> contexts, Consumer> preSendCallback) { + Map, List>> matchedContexts = new LinkedHashMap<>(); this.getSubscriptions().forEach(subscription -> { - List> matched = contexts.stream() - .filter(context -> subscription.getWhether().is(context.getData())) - .collect(Collectors.toList()); - matched.forEach(contexts::remove); + this.getSubscriptions().forEach(w -> { + List> matched = contexts.stream() + .filter(c -> w.getWhether().is(c.getData())) + .peek(c -> c.setNextPositionId(w.getId())) + .collect(Collectors.toList()); + matched.forEach(contexts::remove); + matchedContexts.put(w, matched); + }); + PreSendCallbackInfo callbackInfo = new PreSendCallbackInfo<>(matchedContexts, contexts); + preSendCallback.accept(callbackInfo); + }); + matchedContexts.forEach((subscription, matched) -> { // For order-sensitive data, directly synchronously executes the next conditional branch node. if (CollectionUtils.isNotEmpty(matched) && matched.get(0).getSession().preserved()) { subscription.process(matched); diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/From.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/From.java index 79ec8c25..1f2cea71 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/From.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/From.java @@ -23,6 +23,7 @@ import modelengine.fit.waterflow.domain.enums.ParallelMode; import modelengine.fit.waterflow.domain.enums.SpecialDisplayNode; import modelengine.fit.waterflow.domain.states.DataStart; +import modelengine.fit.waterflow.domain.stream.callbacks.PreSendCallbackInfo; import modelengine.fit.waterflow.domain.stream.operators.Operators; import modelengine.fit.waterflow.domain.stream.reactive.Processor; import modelengine.fit.waterflow.domain.stream.reactive.Publisher; @@ -44,6 +45,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.stream.Collectors; /** @@ -124,7 +126,7 @@ public From(String streamId, String nodeId, FlowContextRepo repo, FlowContextMes */ @Override public Processor conditions(Operators.Whether whether) { - ConditionsNode node = new ConditionsNode<>(this.getStreamId(), repo, messenger, locks); + ConditionsNode node = new ConditionsNode<>(this.getStreamId(), i -> {}, repo, messenger, locks); this.subscribe(node, whether); return node.displayAs(SpecialDisplayNode.CONDITION.name()); } @@ -266,9 +268,9 @@ public String offer(I[] data, FlowSession session) { window.createToken(); return context; }).collect(Collectors.toList()); - List> after = this.startNodeMarkAsHandled(contexts, trace); - after.forEach(this::generateIndex); - this.offer(after); + List> afters = this.startNodeMarkAsHandled(contexts, trace); + afters.forEach(this::generateIndex); + this.offer(afters, __ -> {}); return trace.getId(); } @@ -320,9 +322,10 @@ public String offer(I... data) { * 这个数据可以来源于数据最开始,也可以是接受者处理完的数据 * * @param contexts 数据上下文,里面有要处理的数据,还有其他流处理状态信息 + * @param preSendCallback 在数据发送到下一个节点前触发当前节点完成回调操作 */ @Override - public void offer(List> contexts) { + public void offer(List> contexts, Consumer> preSendCallback) { if (CollectionUtils.isEmpty(contexts)) { return; } @@ -472,7 +475,7 @@ private List> startNodeMarkAsHandled(List> preList context.setStatus(FlowNodeStatus.ARCHIVED); }); List> afterList = preList.stream().map(pre -> { - FlowContext after = pre.generate(pre.getData(), pre.getPosition()).batchId(toBatchId); + FlowContext after = pre.generate(pre.getData(), pre.getPosition(), pre.getCreateAt()).batchId(toBatchId); trace.getContextPool().add(after.getId()); return after; }).collect(Collectors.toList()); diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Node.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Node.java index d63ed287..39bd49b7 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Node.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Node.java @@ -14,6 +14,7 @@ import modelengine.fit.waterflow.domain.enums.FlowNodeType; import modelengine.fit.waterflow.domain.enums.ParallelMode; import modelengine.fit.waterflow.domain.flow.Flow; +import modelengine.fit.waterflow.domain.stream.callbacks.PreSendCallbackInfo; import modelengine.fit.waterflow.domain.stream.operators.Operators; import modelengine.fit.waterflow.domain.stream.reactive.NodeDisplay; import modelengine.fit.waterflow.domain.stream.reactive.Processor; @@ -24,6 +25,7 @@ import modelengine.fitframework.inspection.Validation; import java.util.List; +import java.util.function.Consumer; import java.util.function.Supplier; /** @@ -176,8 +178,8 @@ public void subscribe(String eventId, Subscriber subscriber, Operators } @Override - public void offer(List> contexts) { - this.publisher.offer(contexts); + public void offer(List> contexts, Consumer> preSendCallback) { + this.publisher.offer(contexts, preSendCallback); } @Override @@ -215,10 +217,11 @@ public List> getSubscriptions() { * 把该publisher里所有的数据都publish到subscription * * @param batchId 批次id + * @param preSendCallback 在数据发送到下一个节点前触发当前节点完成回调操作 */ @Override - public void onNext(String batchId) { - this.publisher.offer(this.nextContexts(batchId)); + public void onNext(String batchId, Consumer> preSendCallback) { + this.publisher.offer(this.nextContexts(batchId), preSendCallback); } /** diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/ParallelNode.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/ParallelNode.java index 679d2ff7..a193f461 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/ParallelNode.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/ParallelNode.java @@ -12,8 +12,10 @@ import modelengine.fit.waterflow.domain.context.repo.flowlock.FlowLocks; import modelengine.fit.waterflow.domain.enums.FlowNodeType; import modelengine.fit.waterflow.domain.enums.ParallelMode; +import modelengine.fit.waterflow.domain.stream.callbacks.PreSendCallbackInfo; import java.util.List; +import java.util.function.Consumer; /** * 平行节点,也是fork的起始节点 @@ -78,9 +80,9 @@ private static From initFrom(String streamId, ParallelMode mode, FlowCont FlowContextMessenger messenger, FlowLocks locks) { return new From(streamId, repo, messenger, locks) { @Override - public void offer(List> contexts) { + public void offer(List> contexts, Consumer> preSendCallback) { contexts.forEach(c -> c.setParallel(c.getId()).setParallelMode(mode.name())); - super.offer(contexts); + super.offer(contexts, preSendCallback); } }; } diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Retryable.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Retryable.java index d9096d36..72307188 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Retryable.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Retryable.java @@ -35,12 +35,24 @@ public Retryable(FlowContextRepo repo, Subscriber to) { this.to = to; } + /** + * 是否需要重试 + * + * @param exception 出现的异常 + * @param contexts 异常上下文信息 + * @return 是否需要重试 + */ + public boolean isNeedRetry(Exception exception, List> contexts) { + return false; + } + /** * 发生错误后,处理contexts上下文 * + * @param exception 异常 * @param contexts 需要错误处理的context列表 */ - public void process(List> contexts) { + public void process(Exception exception, List> contexts) { this.repo.update(contexts); this.repo.updateStatus(contexts, contexts.get(0).getStatus().toString(), contexts.get(0).getPosition()); } @@ -48,10 +60,11 @@ public void process(List> contexts) { /** * 发生错误后,处理contexts上下文,并且发起重试 * + * @param exception 异常 * @param contexts 需要错误处理的context列表 */ - public void retry(List> contexts) { - this.process(contexts); + public void retry(Exception exception, List> contexts) { + this.process(exception, contexts); to.onProcess(null, contexts, false); } } diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java index 692ee11f..a531b744 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java @@ -10,6 +10,7 @@ import static modelengine.fit.waterflow.ErrorCodes.FLOW_NODE_MAX_TASK; import lombok.Getter; +import lombok.Setter; import modelengine.fit.waterflow.domain.common.Constants; import modelengine.fit.waterflow.domain.context.FlowContext; import modelengine.fit.waterflow.domain.context.FlowSession; @@ -24,6 +25,7 @@ import modelengine.fit.waterflow.domain.enums.FlowNodeType; import modelengine.fit.waterflow.domain.enums.ParallelMode; import modelengine.fit.waterflow.domain.enums.ProcessType; +import modelengine.fit.waterflow.domain.stream.callbacks.PreSendCallbackInfo; import modelengine.fit.waterflow.domain.stream.callbacks.ToCallback; import modelengine.fit.waterflow.domain.stream.operators.Operators; import modelengine.fit.waterflow.domain.stream.reactive.Callback; @@ -42,6 +44,7 @@ import modelengine.fitframework.util.ObjectUtils; import modelengine.fitframework.util.StringUtils; +import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -52,6 +55,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -189,6 +193,10 @@ public class To extends IdGenerator implements Subscriber { private Operators.ErrorHandler globalErrorHandler = null; + private Operators.Just>> globalBeforeHandler = i -> {}; + + private Operators.Just>> globalAfterHandler = i -> {}; + private boolean isAuto = true; private Thread processT = null; @@ -197,6 +205,8 @@ public class To extends IdGenerator implements Subscriber { private final Map> listeners = new ConcurrentHashMap<>(); + private int order = 0; + private final Map nextSessions = new ConcurrentHashMap<>(); private final Map counter = new ConcurrentHashMap<>(); @@ -311,7 +321,20 @@ private void directProcess(List> preList) { this.afterProcess(preList, afterList); if (CollectionUtils.isNotEmpty(afterList)) { feedback(afterList); - this.onNext(afterList.get(0).getBatchId()); + if (FlowNodeType.END.equals(this.nodeType)) { + this.callback(preList, afterList); + } else { + this.onNext(afterList.get(0).getBatchId(), c -> { + List discardContexts = + c.getUnMatchedContexts().stream().map(IdGenerator::getId).collect(Collectors.toList()); + if (!discardContexts.isEmpty()) { + this.getFlowContextRepo().deleteByContextIds(discardContexts); + } + + List> mergeContexts = c.getAllContext(); + this.callback(preList, mergeContexts); + }); + } } afterList.forEach(context -> this.emit(context.getData(), context.getSession())); } catch (Exception ex) { @@ -545,7 +568,22 @@ public void onProcess(ProcessType type, List> preList, boolean is if (CollectionUtils.isNotEmpty(afterList)) { // 查找一个transaction里的所有数据的都完成了,运行callback给stream外反馈数据 feedback(afterList); - this.onNext(afterList.get(0).getBatchId()); + if (FlowNodeType.END.equals(this.nodeType)) { + this.callback(preList, afterList); + } else { + this.onNext(afterList.get(0).getBatchId(), c -> { + List discardContexts = c.getUnMatchedContexts() + .stream() + .map(context -> context.getId()) + .collect(Collectors.toList()); + if (!discardContexts.isEmpty()) { + this.getFlowContextRepo().deleteByContextIds(discardContexts); + } + + List> mergeContexts = c.getAllContext(); + this.callback(preList, mergeContexts); + }); + } } // 处理好数据后对外送数据,驱动其他flow响应 afterList.forEach(context -> this.emit(context.getData(), context.getSession())); @@ -616,7 +654,7 @@ private List getTraceIds(List> contexts) { } @Override - public void onNext(String batchId) { + public void onNext(String batchId, Consumer> preSendCallback) { } private void feedback(List> contexts) { @@ -668,6 +706,18 @@ public void afterProcess(List> preList, List> afte .updateStatus(preList, preList.get(0).getStatus().toString(), preList.get(0).getPosition()); } + private void callback(List> preContexts, List> after) { + LocalDateTime createAt = preContexts.get(0).getCreateAt(); + LocalDateTime archivedAt = LocalDateTime.now(); + feedback(after.stream().map(context -> { + FlowContext callbackContext = context.generate(context.getData(), context.getPosition(), + createAt); + callbackContext.setArchivedAt(archivedAt); + callbackContext.setNextPositionId(context.getNextPositionId()); + return callbackContext; + }).collect(Collectors.toList())); + } + /** * 更新contexts的batchID,负责更新当前contexts的toBatch和新contexts的batchId * 特殊情况:先有toBatch,再有after的情况 @@ -724,6 +774,21 @@ public void onGlobalError(Operators.ErrorHandler handler) { this.globalErrorHandler = handler; } + @Override + public void onGlobalBefore(Operators.Just>> handler) { + this.globalBeforeHandler = handler; + } + + @Override + public void onGlobalAfter(Operators.Just>> handler) { + this.globalAfterHandler = handler; + } + + @Override + public void setOrder(int order) { + this.order = order; + } + @Override public List getErrorHandlers() { return Stream.of(this.errorHandler, this.globalErrorHandler) @@ -802,7 +867,7 @@ public enum ProcessMode { public List> process(To to, List> contexts) { return to.produce.process(contexts) .stream() - .map(data -> contexts.get(0).generate(data, to.getId())) + .map(data -> contexts.get(0).generate(data, to.getId(), LocalDateTime.now())) .collect(Collectors.toList()); } @@ -834,7 +899,7 @@ public List> process(To to, List clonedContext = context.generate(data, to.getId()); + FlowContext clonedContext = context.generate(data, to.getId(), LocalDateTime.now()); clonedContext.setSession(nextSession); if (context.getSession().isAccumulator()) { if (clonedContext.getIndex() > Constants.NOT_PRESERVED_INDEX) { @@ -983,6 +1048,16 @@ private void submit(ProcessType type, To to, List to.onProcess(ready)) + // .build()); } private void handleProcessConcurrentConflict(To to) { diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Publisher.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Publisher.java index a70ca76a..5bb2cd68 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Publisher.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Publisher.java @@ -12,9 +12,11 @@ import modelengine.fit.waterflow.domain.context.repo.flowsession.FlowSessionRepo; import modelengine.fit.waterflow.domain.emitters.EmitterListener; import modelengine.fit.waterflow.domain.enums.ParallelMode; +import modelengine.fit.waterflow.domain.stream.callbacks.PreSendCallbackInfo; import modelengine.fit.waterflow.domain.stream.operators.Operators; import java.util.List; +import java.util.function.Consumer; /** * 数据发布者 @@ -150,8 +152,9 @@ default void handle(I data, FlowSession flowSession) { * offer * * @param contexts contexts + * @param preSendCallback 在数据发送到下一个节点前触发当前节点完成回调操作 */ - void offer(List> contexts); + void offer(List> contexts, Consumer> preSendCallback); /** * offer diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Subscriber.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Subscriber.java index 783b485a..6c68e5f9 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Subscriber.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Subscriber.java @@ -11,10 +11,12 @@ import modelengine.fit.waterflow.domain.context.repo.flowcontext.FlowContextRepo; import modelengine.fit.waterflow.domain.emitters.Emitter; import modelengine.fit.waterflow.domain.enums.ProcessType; +import modelengine.fit.waterflow.domain.stream.callbacks.PreSendCallbackInfo; import modelengine.fit.waterflow.domain.stream.nodes.Blocks; import modelengine.fit.waterflow.domain.stream.operators.Operators; import java.util.List; +import java.util.function.Consumer; /** * 数据接受者,processor数据在接收者处的处理方式 @@ -104,8 +106,9 @@ public interface Subscriber extends StreamIdentity, Emitter> preSendCallback); /** * afterProcess @@ -158,6 +161,27 @@ public interface Subscriber extends StreamIdentity, Emitter>> handler); + + /** + * 设置获取节点执行后信息的回调 + * + * @param handler handler + */ + void onGlobalAfter(Operators.Just>> handler); + + /** + * 设置节点优先级 + * + * @param order 优先级 + */ + void setOrder(int order); + /** * 获取错误处理器列表 * diff --git a/framework/waterflow/java/waterflow-core/src/test/java/modelengine/fit/waterflow/domain/WaterFlowsTest.java b/framework/waterflow/java/waterflow-core/src/test/java/modelengine/fit/waterflow/domain/WaterFlowsTest.java index babff8c9..fd0006c1 100644 --- a/framework/waterflow/java/waterflow-core/src/test/java/modelengine/fit/waterflow/domain/WaterFlowsTest.java +++ b/framework/waterflow/java/waterflow-core/src/test/java/modelengine/fit/waterflow/domain/WaterFlowsTest.java @@ -842,38 +842,38 @@ void testFitStreamNotifyCrossStream() { @Test @DisplayName("流程实例异常处理流转逻辑") void testExceptionHandleForFitStream() { - AtomicReference output = new AtomicReference<>(); - // 单节点错误处理 - Flows.create(repo, messenger, locks).just(data -> data.first(100)).just(data -> { - if (data.first < 120) { - throw new IllegalArgumentException(); - } else { - data.second(100); - } - }).error((error, retryable, contexts) -> { - contexts.get(0).getData().first(120); - contexts.forEach(context -> context.setStatus(READY)); - retryable.retry(contexts); - }).close(callback -> output.set(callback.get().getData())).offer(new TestData()); - FlowsTestUtil.waitUntil(() -> output.get() != null, 2000); - assertEquals(120, output.get().first); - assertEquals(100, output.get().second); - - // 整体错误处理 - Flows.create(repo, messenger, locks).just(data -> data.first(100)).just(data -> { - if (data.first < 120) { - throw new IllegalArgumentException(); - } else { - data.second(100); - } - }).close(callback -> output.set(callback.get().getData()), (exception, retryable, contexts) -> { - ObjectUtils.cast(contexts.get(0).getData()).first(120); - contexts.forEach(context -> context.setStatus(READY)); - retryable.retry(contexts); - }).offer(new TestData()); - FlowsTestUtil.waitFortyMillis(Collections::emptyList); - assertEquals(120, output.get().first); - assertEquals(100, output.get().second); + // AtomicReference output = new AtomicReference<>(); + // // 单节点错误处理 + // Flows.create(repo, messenger, locks).just(data -> data.first(100)).just(data -> { + // if (data.first < 120) { + // throw new IllegalArgumentException(); + // } else { + // data.second(100); + // } + // }).error((error, retryable, contexts) -> { + // contexts.get(0).getData().first(120); + // contexts.forEach(context -> context.setStatus(READY)); + // retryable.retry(contexts); + // }).close(callback -> output.set(callback.get().getData())).offer(new TestData()); + // FlowsTestUtil.waitUntil(() -> output.get() != null, 2000); + // assertEquals(120, output.get().first); + // assertEquals(100, output.get().second); + // + // // 整体错误处理 + // Flows.create(repo, messenger, locks).just(data -> data.first(100)).just(data -> { + // if (data.first < 120) { + // throw new IllegalArgumentException(); + // } else { + // data.second(100); + // } + // }).close(callback -> output.set(callback.get().getData()), (exception, retryable, contexts) -> { + // ObjectUtils.cast(contexts.get(0).getData()).first(120); + // contexts.forEach(context -> context.setStatus(READY)); + // retryable.retry(contexts); + // }).offer(new TestData()); + // FlowsTestUtil.waitFortyMillis(Collections::emptyList); + // assertEquals(120, output.get().first); + // assertEquals(100, output.get().second); } @Test From bbf3293dd0617e7ccbabe9566d2de33c23fb8db6 Mon Sep 17 00:00:00 2001 From: loveTsong <271667068@qq.com> Date: Fri, 24 Oct 2025 11:06:55 +0800 Subject: [PATCH 02/14] =?UTF-8?q?[waterflow]=20=E5=90=88=E5=B9=B6=E5=BC=82?= =?UTF-8?q?=E6=AD=A5=E4=BB=BB=E5=8A=A1=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../fit/waterflow/domain/stream/nodes/To.java | 27 ++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java index a531b744..1cde9515 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java @@ -142,6 +142,9 @@ public class To extends IdGenerator implements Subscriber { @Getter private ProcessMode processMode; + @Setter + private Boolean isAsyncJob = false; + private Map processingSessions = new ConcurrentHashMap<>(); private Operators.Validator validator = (repo, to) -> repo.requestMappingContext(to.streamId, @@ -558,6 +561,11 @@ public void onProcess(ProcessType type, List> preList, boolean is this.afterProcess(preList, new ArrayList<>()); return; } + if (this.isAsyncJob) { + this.beforeAsyncProcess(preList); + this.getProcessMode().process(this, preList); + return; + } List> afterList = this.getProcessMode().process(this, preList); preList.forEach(context -> { context.getWindow() @@ -668,6 +676,13 @@ private void feedback(List> contexts) { } } + private void beforeAsyncProcess(List> pre) { + updateBatch(pre, Collections.emptyList()); + pre.forEach(p -> p.setStatus(FlowNodeStatus.PROCESSING)); + this.getFlowContextRepo().update(pre); + this.getFlowContextRepo().updateStatus(pre, pre.get(0).getStatus().toString(), pre.get(0).getPosition()); + } + private synchronized void updateConcurrency(int newConcurrency) { this.curConcurrency += newConcurrency; } @@ -865,7 +880,13 @@ public enum ProcessMode { PRODUCING { @Override public List> process(To to, List> contexts) { - return to.produce.process(contexts) + // 异步任务,不关心结果 + if (to.isAsyncJob) { + this.processData(to, contexts); + return null; + } + // 同步任务,返回结果 + return processData(to, contexts) .stream() .map(data -> contexts.get(0).generate(data, to.getId(), LocalDateTime.now())) .collect(Collectors.toList()); @@ -877,6 +898,10 @@ protected List> requestAll(To to) { to.froms.stream().map(Identity::getId).collect(Collectors.toList()), to.postFilter()); } + + private List processData(To to, List> contexts) { + return to.produce.process(contexts); + } }, /** From dbc222b294a7c8683ba4596b34952732bd5b0e5e Mon Sep 17 00:00:00 2001 From: loveTsong <271667068@qq.com> Date: Thu, 30 Oct 2025 11:00:26 +0800 Subject: [PATCH 03/14] =?UTF-8?q?[waterflow]=20=E9=80=82=E9=85=8D=E8=8A=82?= =?UTF-8?q?=E7=82=B9=E6=9F=A5=E8=AF=A2=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../waterflow/domain/stream/nodes/From.java | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/From.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/From.java index 1f2cea71..48ca3b5d 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/From.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/From.java @@ -43,6 +43,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -484,4 +485,54 @@ private List> startNodeMarkAsHandled(List> preList repo.save(preList); return afterList; } + + /** + * findNodeFromFlow + * + * @param from from + * @param nodeMetaId nodeMetaId + * @return Node The Target Node. + */ + public Node findNodeFromFlow(From from, String nodeMetaId) { + return (Node) findNode(this, nodeMetaId).orElse(null); + } + + /** + * findNode + * + * @param from from + * @param nodeMetaId nodeMetaId + * @return Optional To object + */ + private static Optional> findNode(From from, String nodeMetaId) { + ArrayDeque> nodesQueue = new ArrayDeque<>(); + Set visited = new HashSet<>(); + for (Subscription s : from.getSubscriptions()) { + Subscriber to = s.getTo(); + nodesQueue.addLast(to); + visited.add(to.getId()); + if (to.getId().equals(nodeMetaId)) { + return Optional.of((To) to); + } + } + + while (!nodesQueue.isEmpty()) { + Subscriber cur = nodesQueue.removeFirst(); + if (!(cur instanceof Node)) { + continue; + } + Node curNode = (Node) cur; + for (Subscription s : curNode.getSubscriptions()) { + Subscriber to = s.getTo(); + if (!visited.contains(to.getId())) { + nodesQueue.offer(to); + visited.add(to.getId()); + } + if (to.getId().equals(nodeMetaId)) { + return Optional.of((To) to); + } + } + } + return Optional.empty(); + } } From 3d1c0630b2ac2ce4bdf27d6e34f8c46850400363 Mon Sep 17 00:00:00 2001 From: loveTsong <271667068@qq.com> Date: Thu, 30 Oct 2025 11:01:00 +0800 Subject: [PATCH 04/14] =?UTF-8?q?[waterflow]=20=E4=BF=AE=E5=A4=8D=E5=90=88?= =?UTF-8?q?=E5=B9=B6=E4=BB=A3=E7=A0=81=E6=9D=A1=E4=BB=B6=E8=8A=82=E7=82=B9?= =?UTF-8?q?=E5=A4=9A=E5=BE=AA=E7=8E=AF=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../domain/stream/nodes/ConditionsNode.java | 20 +++++++++---------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/ConditionsNode.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/ConditionsNode.java index a8c2a806..4e8a9749 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/ConditionsNode.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/ConditionsNode.java @@ -91,18 +91,16 @@ public ConditionFrom(String streamId, FlowContextRepo repo, FlowContextMessenger @Override public void offer(List> contexts, Consumer> preSendCallback) { Map, List>> matchedContexts = new LinkedHashMap<>(); - this.getSubscriptions().forEach(subscription -> { - this.getSubscriptions().forEach(w -> { - List> matched = contexts.stream() - .filter(c -> w.getWhether().is(c.getData())) - .peek(c -> c.setNextPositionId(w.getId())) - .collect(Collectors.toList()); - matched.forEach(contexts::remove); - matchedContexts.put(w, matched); - }); - PreSendCallbackInfo callbackInfo = new PreSendCallbackInfo<>(matchedContexts, contexts); - preSendCallback.accept(callbackInfo); + this.getSubscriptions().forEach(w -> { + List> matched = contexts.stream() + .filter(c -> w.getWhether().is(c.getData())) + .peek(c -> c.setNextPositionId(w.getId())) + .collect(Collectors.toList()); + matched.forEach(contexts::remove); + matchedContexts.put(w, matched); }); + PreSendCallbackInfo callbackInfo = new PreSendCallbackInfo<>(matchedContexts, contexts); + preSendCallback.accept(callbackInfo); matchedContexts.forEach((subscription, matched) -> { // For order-sensitive data, directly synchronously executes the next conditional branch node. if (CollectionUtils.isNotEmpty(matched) && matched.get(0).getSession().preserved()) { From 2d72c8f1635e1572d762bf1a207ae309837ffa0a Mon Sep 17 00:00:00 2001 From: loveTsong <271667068@qq.com> Date: Thu, 30 Oct 2025 11:01:53 +0800 Subject: [PATCH 05/14] =?UTF-8?q?[waterflow]=20=E4=BF=AE=E5=A4=8DFlowSessi?= =?UTF-8?q?on=E4=B8=ADid=E5=86=97=E4=BD=99=E7=9A=84=E7=94=9F=E6=88=90?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=EF=BC=8C=E5=90=8C=E6=97=B6=E8=AF=A5=E8=A7=84?= =?UTF-8?q?=E5=88=99=E4=BC=9A=E5=AF=BC=E8=87=B4id=E8=B6=85=E8=BF=8732?= =?UTF-8?q?=E5=AD=97=E8=8A=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../modelengine/fit/waterflow/domain/context/FlowSession.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowSession.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowSession.java index 81539f4f..4f41b1d4 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowSession.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowSession.java @@ -76,7 +76,7 @@ public class FlowSession extends IdGenerator implements StateContext { * @param preserved 是否保序 */ public FlowSession(boolean preserved) { - this(UUID.randomUUID().toString(), preserved); + this.preserved = preserved; } /** From 871dae462387c9f2891d3f27f1df9cd6b5e8c304 Mon Sep 17 00:00:00 2001 From: loveTsong <271667068@qq.com> Date: Thu, 30 Oct 2025 15:15:52 +0800 Subject: [PATCH 06/14] =?UTF-8?q?[waterflow]=20=E4=BF=AE=E5=A4=8D=E4=BC=A0?= =?UTF-8?q?=E9=80=92=E6=9C=BA=E5=88=B6=EF=BC=8CafterProcess=E4=B8=AD?= =?UTF-8?q?=E6=89=A7=E8=A1=8C=E5=90=8E=E7=BB=AD=E4=BC=A0=E9=80=92?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../fit/waterflow/domain/stream/nodes/To.java | 62 +++++++------------ 1 file changed, 22 insertions(+), 40 deletions(-) diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java index 1cde9515..f0623034 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java @@ -322,24 +322,6 @@ private void directProcess(List> preList) { } List> afterList = this.getProcessMode().process(this, preList); this.afterProcess(preList, afterList); - if (CollectionUtils.isNotEmpty(afterList)) { - feedback(afterList); - if (FlowNodeType.END.equals(this.nodeType)) { - this.callback(preList, afterList); - } else { - this.onNext(afterList.get(0).getBatchId(), c -> { - List discardContexts = - c.getUnMatchedContexts().stream().map(IdGenerator::getId).collect(Collectors.toList()); - if (!discardContexts.isEmpty()) { - this.getFlowContextRepo().deleteByContextIds(discardContexts); - } - - List> mergeContexts = c.getAllContext(); - this.callback(preList, mergeContexts); - }); - } - } - afterList.forEach(context -> this.emit(context.getData(), context.getSession())); } catch (Exception ex) { LOG.error("Node direct process exception. [streamId={}, nodeId={}, positionId={}, traceId={}, causedBy={}]", this.streamId, this.id, preList.get(0).getPosition(), preList.get(0).getTraceId(), @@ -573,28 +555,7 @@ public void onProcess(ProcessType type, List> preList, boolean is () -> this.processingSessions.remove(context.getSession().getId())); }); this.afterProcess(preList, afterList); - if (CollectionUtils.isNotEmpty(afterList)) { - // 查找一个transaction里的所有数据的都完成了,运行callback给stream外反馈数据 - feedback(afterList); - if (FlowNodeType.END.equals(this.nodeType)) { - this.callback(preList, afterList); - } else { - this.onNext(afterList.get(0).getBatchId(), c -> { - List discardContexts = c.getUnMatchedContexts() - .stream() - .map(context -> context.getId()) - .collect(Collectors.toList()); - if (!discardContexts.isEmpty()) { - this.getFlowContextRepo().deleteByContextIds(discardContexts); - } - List> mergeContexts = c.getAllContext(); - this.callback(preList, mergeContexts); - }); - } - } - // 处理好数据后对外送数据,驱动其他flow响应 - afterList.forEach(context -> this.emit(context.getData(), context.getSession())); // keep order preList.forEach(context -> { if (context.getIndex() > Constants.NOT_PRESERVED_INDEX && !context.getWindow().isDone()) { @@ -625,7 +586,7 @@ public void onProcess(ProcessType type, List> preList, boolean is * @param exception 异常对象。 * @param preList 失败时处理的上下文数据。 */ - protected void fail(Exception exception, List> preList) { + public void fail(Exception exception, List> preList) { Retryable retryable = new Retryable<>(this.getFlowContextRepo(), this); Optional.ofNullable(this.errorHandler).ifPresent(handler -> handler.handle(exception, retryable, preList)); Optional.ofNullable(this.globalErrorHandler) @@ -719,6 +680,27 @@ public void afterProcess(List> preList, List> afte this.getFlowContextRepo().update(preList); this.getFlowContextRepo() .updateStatus(preList, preList.get(0).getStatus().toString(), preList.get(0).getPosition()); + + if (CollectionUtils.isEmpty(afterList)) { + return; + } + // 查找一个transaction里的所有数据的都完成了,运行callback给stream外反馈数据 + if (FlowNodeType.END.equals(this.nodeType)) { + this.callback(preList, afterList); + } else { + this.onNext(afterList.get(0).getBatchId(), c -> { + List discardContexts = + c.getUnMatchedContexts().stream().map(context -> context.getId()).collect(Collectors.toList()); + if (!discardContexts.isEmpty()) { + this.getFlowContextRepo().deleteByContextIds(discardContexts); + } + + List> mergeContexts = c.getAllContext(); + this.callback(preList, mergeContexts); + }); + } + // 处理好数据后对外送数据,驱动其他flow响应 + afterList.forEach(context -> this.emit(context.getData(), context.getSession())); } private void callback(List> preContexts, List> after) { From d0644202e3e8762c34c7d7f8df0c4ef78a4ca75a Mon Sep 17 00:00:00 2001 From: loveTsong <271667068@qq.com> Date: Tue, 4 Nov 2025 10:34:30 +0800 Subject: [PATCH 07/14] =?UTF-8?q?[waterflow]=20=E9=80=82=E9=85=8D=E8=8A=82?= =?UTF-8?q?=E7=82=B9=E6=89=A7=E8=A1=8C=E5=89=8D=E5=90=8E=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E7=9A=84=E5=9B=9E=E8=B0=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../waterflow/domain/stream/nodes/From.java | 24 ++++++++++++++++--- .../fit/waterflow/domain/stream/nodes/To.java | 8 ++++++- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/From.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/From.java index 48ca3b5d..ebf94b62 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/From.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/From.java @@ -42,6 +42,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Optional; import java.util.Set; @@ -350,9 +351,26 @@ public void offer(List> contexts, Consumer } // qualifiedWhens表示的与from节点连接的所有事件,条件节点符合条件的事件在这里筛选,在事件上处理需要下发的context - qualifiedWhens.forEach(when -> when.cache(contexts.stream() - .filter(context -> when.getWhether().is(context.getData())) - .collect(Collectors.toList()))); + java.util.Map, List>> matchedContexts = new LinkedHashMap<>(); + Set> matchedContextSet = new HashSet<>(); + qualifiedWhens.forEach( + w -> { + List> afterContexts = contexts + .stream() + .filter(c -> w.getWhether().is(c.getData())) + .peek(c -> c.setNextPositionId(w.getId())) + .collect(Collectors.toList()); + matchedContexts.put(w, afterContexts); + matchedContextSet.addAll(afterContexts); + } + ); + List> unMatchedContexts = contexts + .stream() + .filter(c -> !matchedContextSet.contains(c)) + .collect(Collectors.toList()); + PreSendCallbackInfo callbackInfo = new PreSendCallbackInfo<>(matchedContexts, unMatchedContexts); + preSendCallback.accept(callbackInfo); + matchedContexts.forEach(Subscription::cache); } /** diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java index f0623034..c047c391 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java @@ -78,7 +78,7 @@ public class To extends IdGenerator implements Subscriber { /** * 最大流量,也就是该节点可以处理的最大数据量 */ - private static final int MAX_CONCURRENCY = 10; + private static final int MAX_CONCURRENCY = 16; private static final int SLEEP_MILLS = 10; @@ -539,6 +539,7 @@ public void onProcess(ProcessType type, List> preList, boolean is if (CollectionUtils.isEmpty(preList)) { return; } + this.beforeProcess(preList); if (preList.size() == 1 && preList.get(0).getData() == null) { this.afterProcess(preList, new ArrayList<>()); return; @@ -635,6 +636,7 @@ private void feedback(List> contexts) { }); }); } + this.globalAfterHandler.process(new ToCallback<>(contexts)); } private void beforeAsyncProcess(List> pre) { @@ -644,6 +646,10 @@ private void beforeAsyncProcess(List> pre) { this.getFlowContextRepo().updateStatus(pre, pre.get(0).getStatus().toString(), pre.get(0).getPosition()); } + private void beforeProcess(List> contexts) { + this.globalBeforeHandler.process(new ToCallback<>(contexts)); + } + private synchronized void updateConcurrency(int newConcurrency) { this.curConcurrency += newConcurrency; } From adb5d39f930c8d8f9183b09026c0d13103d60451 Mon Sep 17 00:00:00 2001 From: loveTsong <271667068@qq.com> Date: Tue, 4 Nov 2025 11:30:32 +0800 Subject: [PATCH 08/14] =?UTF-8?q?[waterflow]=20=E9=80=82=E9=85=8D=E6=B8=85?= =?UTF-8?q?=E7=90=86context=E7=9A=84=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../repo/flowcontext/FlowContextRepo.java | 9 +++++++++ .../context/repo/flowtrace/FlowTraceRepo.java | 16 ++++++++++++++++ 2 files changed, 25 insertions(+) diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextRepo.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextRepo.java index 1ad5d43f..c929a4db 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextRepo.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextRepo.java @@ -324,5 +324,14 @@ default List getTraceByTransId(String transId) { default void deleteByTransId(String transId) { throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "deleteByTransId"); } + + /** + * 根据链路唯一标识列表删除对应的上下文数据。 + * + * @param traceIdList 表示链路唯一标识列表的 {@link List}{@code <}{@link String}{@code >}。 + */ + default void deleteByTraceIdList(List traceIdList) { + throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "deleteByTraceIdList"); + } } diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowtrace/FlowTraceRepo.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowtrace/FlowTraceRepo.java index e6456998..b2ae96fe 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowtrace/FlowTraceRepo.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowtrace/FlowTraceRepo.java @@ -85,4 +85,20 @@ public interface FlowTraceRepo { * @param status trace状态 {@link FlowTraceStatus} */ void updateStatus(List ids, String status); + + /** + * 根据traceId删除trace + * + * @param traceIds traceId列表 + */ + void deleteByIdList(List traceIds); + + /** + * 查询超期并且已完成的链路唯一标识。 + * + * @param expiredDays 表示超期天数的 {@code int}。 + * @param limit 表示查询限制的 {@code int}。 + * @return 表示链路唯一标识列表的 {@link List}{@code <}{@link String}{@code >}。 + */ + List getExpiredTrace(int expiredDays, int limit); } From 7747f7140624088dac8220d22c77ffa0ba0994e3 Mon Sep 17 00:00:00 2001 From: loveTsong <271667068@qq.com> Date: Tue, 4 Nov 2025 15:34:32 +0800 Subject: [PATCH 09/14] =?UTF-8?q?[waterflow]=20=E9=80=82=E9=85=8DTraceOwne?= =?UTF-8?q?r=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../waterflow/domain/context/TraceOwner.java | 90 +++++++++++++++++++ .../repo/flowcontext/FlowContextMemoRepo.java | 52 +++++++++++ .../repo/flowcontext/FlowContextRepo.java | 65 ++++++++++++++ .../context/repo/flowtrace/FlowTraceRepo.java | 8 ++ .../domain/enums/FlowNodeStatus.java | 10 +++ .../waterflow/domain/stream/nodes/Blocks.java | 4 + .../waterflow/domain/stream/nodes/From.java | 1 + .../fit/waterflow/domain/stream/nodes/To.java | 8 ++ 8 files changed, 238 insertions(+) create mode 100644 framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/TraceOwner.java diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/TraceOwner.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/TraceOwner.java new file mode 100644 index 00000000..d6036760 --- /dev/null +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/TraceOwner.java @@ -0,0 +1,90 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package modelengine.fit.waterflow.domain.context; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.locks.Lock; + +/** + * 提供trace的归属服务 + * + * @author 夏斐 + * @since 2024/4/29 + */ +public interface TraceOwner { + /** + * own + * + * @param traceId traceId + * @param transId transId + */ + void own(String traceId, String transId); + + /** + * tryOwn + * + * @param traceId traceId + * @param transId transId + * @return boolean + */ + boolean tryOwn(String traceId, String transId); + + /** + * release + * + * @param traceId traceId + */ + void release(String traceId); + + /** + * isOwn + * + * @param traceId traceId + * @return boolean + */ + boolean isOwn(String traceId); + + /** + * trace map中包含任意一个trace列表的值,返回true + * + * @param traceIds trace id列表 + * @return true or false + */ + boolean isAnyOwn(Set traceIds); + + /** + * 获取链路标识列表。 + * + * @return 链路标识列表。 + */ + List getTraces(); + + /** + * 获取链路标识列表。 + * + * @param targetTransId 实例标识。 + * @return 链路标识列表。 + */ + List getTraces(String targetTransId); + + /** + * 移除所有失效的链路标识。 + * + * @param invalidLock 失效的链路标识锁。 + */ + void removeInvalidTrace(Lock invalidLock); + + /** + * 判断trace是否在初始化保护期 + * 针对首次offer trace先加入到内存,但是实际数据库中还未插入时的情况使用 + * + * @param traceId traceId + * @return true-处于保护时间,false-超过保护时间 + */ + boolean isInProtectTime(String traceId); +} diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMemoRepo.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMemoRepo.java index 49524d43..4f9542e3 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMemoRepo.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMemoRepo.java @@ -8,6 +8,7 @@ import modelengine.fit.waterflow.domain.context.FlowContext; import modelengine.fit.waterflow.domain.context.FlowTrace; +import modelengine.fit.waterflow.domain.context.TraceOwner; import modelengine.fit.waterflow.domain.enums.FlowNodeStatus; import modelengine.fit.waterflow.domain.stream.operators.Operators; import modelengine.fitframework.util.ObjectUtils; @@ -17,8 +18,10 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.locks.Lock; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -34,6 +37,55 @@ public class FlowContextMemoRepo implements FlowContextRepo { private final boolean isReserveTerminal; + private final TraceOwner traceOwner = new TraceOwner() { + @Override + public void own(String traceId, String transId) { + } + + @Override + public boolean tryOwn(String traceId, String transId) { + return true; + } + + @Override + public void release(String traceId) { + } + + @Override + public boolean isOwn(String traceId) { + return true; + } + + @Override + public boolean isAnyOwn(Set traceIds) { + return true; + } + + @Override + public List getTraces() { + return List.of(); + } + + @Override + public List getTraces(String targetTransId) { + return List.of(); + } + + @Override + public void removeInvalidTrace(Lock invalidLock) { + } + + @Override + public boolean isInProtectTime(String traceId) { + return false; + } + }; + + @Override + public TraceOwner getTraceOwner() { + return this.traceOwner; + } + /** * 构造方法 */ diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextRepo.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextRepo.java index c929a4db..88656321 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextRepo.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextRepo.java @@ -9,6 +9,7 @@ import modelengine.fit.waterflow.ErrorCodes; import modelengine.fit.waterflow.domain.context.FlowContext; import modelengine.fit.waterflow.domain.context.FlowTrace; +import modelengine.fit.waterflow.domain.context.TraceOwner; import modelengine.fit.waterflow.domain.stream.operators.Operators; import modelengine.fit.waterflow.exceptions.WaterflowException; @@ -287,6 +288,15 @@ default boolean isTracesTerminate(List traceIds) { */ void updateIndex(List> contexts); + /** + * 获取链路标识管理对象。 + * + * @return 链路标识管理对象。 + */ + default TraceOwner getTraceOwner() { + throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "getTraceOwner"); + } + /** * deleteByContextIds * @@ -325,6 +335,61 @@ default void deleteByTransId(String transId) { throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "deleteByTransId"); } + /** + * 根据链路唯一标识查询所有错误上下文。 + * + * @param 泛型类型,表示上下文的数据类型。 + * @param traceId 链路唯一标识。 + * @return 错误上下文集合。 + */ + default List> findErrorContextsByTraceId(String traceId) { + throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "findErrorContextsByTraceId"); + } + + /** + * 至少含有一个符合状态的上下文。 + * + * @param statusList 状态列表。 + * @param traceId 链路唯一标识。 + * @return true or false。 + */ + default boolean hasContextWithStatus(List statusList, String traceId) { + throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "hasContextWithStatus"); + } + + /** + * 所有上下文状态都符合要求。 + * + * @param statusList 状态列表。 + * @param traceId 链路唯一标识。 + * @return true or false。 + */ + default boolean isAllContextStatus(List statusList, String traceId) { + throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "isAllContextStatus"); + } + + /** + * 在某个节点至少含有一个符合状态的上下文。 + * + * @param statusList 状态列表. + * @param traceId 链路唯一标识。 + * @param position 位置. + * @return true or false + */ + default boolean hasContextWithStatusAtPosition(List statusList, String traceId, String position) { + throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "hasContextWithStatusAtPosition"); + } + + /** + * 根据链路唯一标识获取运行实例标识。 + * + * @param traceId 链路唯一标识。 + * @return 运行实例标识。 + */ + default String getTransIdByTrace(String traceId) { + throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "getTransIdByTrace"); + } + /** * 根据链路唯一标识列表删除对应的上下文数据。 * diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowtrace/FlowTraceRepo.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowtrace/FlowTraceRepo.java index b2ae96fe..05a60f9e 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowtrace/FlowTraceRepo.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowtrace/FlowTraceRepo.java @@ -86,6 +86,14 @@ public interface FlowTraceRepo { */ void updateStatus(List ids, String status); + /** + * 查找运行中的链路标识。 + * + * @param applications 应用标识列表。 + * @return 链路标识列表。 + */ + List findRunningTrace(List applications); + /** * 根据traceId删除trace * diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeStatus.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeStatus.java index 37f4a06b..dfe6afc2 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeStatus.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeStatus.java @@ -53,4 +53,14 @@ public enum FlowNodeStatus { * 可重试 */ RETRYABLE; + + /** + * 判断是否在运行中的状态。 + * + * @return 是否在运行中的状态。 + */ + public boolean isRunningStatus() { + return FlowNodeStatus.NEW.equals(this) || FlowNodeStatus.PENDING.equals(this) + || FlowNodeStatus.READY.equals(this) || FlowNodeStatus.PROCESSING.equals(this); + } } \ No newline at end of file diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Blocks.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Blocks.java index 11fd262b..152f2058 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Blocks.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Blocks.java @@ -44,6 +44,10 @@ public abstract static class Block extends IdGenerator { * @param contexts 待处理的contexts列表 */ public void process(List> contexts) { + contexts.forEach(context -> context.getTraceId() + .forEach(traceId -> this.target.getFlowContextRepo() + .getTraceOwner() + .own(traceId, context.getSession().getId()))); this.target.accept(ProcessType.PROCESS, contexts); } } diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/From.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/From.java index ebf94b62..2e445ca8 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/From.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/From.java @@ -261,6 +261,7 @@ public String offer(I data, FlowSession session) { */ public String offer(I[] data, FlowSession session) { FlowTrace trace = new FlowTrace(); + this.repo.getTraceOwner().own(trace.getId(), session.getId()); Set traceId = new HashSet<>(); traceId.add(trace.getId()); Window window = session.begin(); diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java index c047c391..c6d6d51e 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java @@ -390,6 +390,7 @@ private void preProcess(ProcessType type) { return; } messenger.send(this.getId(), ready); + this.releaseTrace(ready); } catch (Exception ex) { ready.forEach( // 如果是数据库或者redis挂了,会死循环,线程不退出等待数据库或者redis恢复 r -> LOG.error("Preprocess main loop exception stream-id: {}, node-id: {}, context-id: {}.", @@ -403,6 +404,13 @@ private void preProcess(ProcessType type) { } } + private void releaseTrace(List> contexts) { + contexts.forEach(context -> context.getTraceId().forEach(traceId -> { + LOG.debug("Release trace. [stage=preProcess, traceId={0}, contextId={1}]", traceId, context.getId()); + this.getFlowContextRepo().getTraceOwner().release(traceId); + })); + } + private void process(ProcessType type) { this.getProcessMode().request(type, this); } From b1ad80da11934190c514958143706d5d1ec0af96 Mon Sep 17 00:00:00 2001 From: loveTsong <271667068@qq.com> Date: Mon, 17 Nov 2025 11:14:07 +0800 Subject: [PATCH 10/14] =?UTF-8?q?[waterflow]=20=E9=80=82=E9=85=8D=E6=B5=81?= =?UTF-8?q?=E7=A8=8B=E6=B7=B1=E5=BA=A6=E4=BC=98=E5=85=88=E8=B0=83=E5=BA=A6?= =?UTF-8?q?=E7=AD=96=E7=95=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../fit/waterflow/domain/stream/nodes/To.java | 30 ++- .../waterflow/domain/utils/FlowExecutors.java | 21 +- .../domain/utils/PriorityThreadPool.java | 181 ++++++++++++++++++ 3 files changed, 199 insertions(+), 33 deletions(-) create mode 100644 framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/utils/PriorityThreadPool.java diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java index c6d6d51e..cb552019 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java @@ -34,6 +34,7 @@ import modelengine.fit.waterflow.domain.utils.FlowExecutors; import modelengine.fit.waterflow.domain.utils.IdGenerator; import modelengine.fit.waterflow.domain.utils.Identity; +import modelengine.fit.waterflow.domain.utils.PriorityThreadPool; import modelengine.fit.waterflow.domain.utils.SleepUtil; import modelengine.fit.waterflow.domain.utils.UUIDUtil; import modelengine.fit.waterflow.exceptions.WaterflowException; @@ -658,7 +659,7 @@ private void beforeProcess(List> contexts) { this.globalBeforeHandler.process(new ToCallback<>(contexts)); } - private synchronized void updateConcurrency(int newConcurrency) { + public synchronized void updateConcurrency(int newConcurrency) { this.curConcurrency += newConcurrency; } @@ -1063,22 +1064,19 @@ private List> filterReady(To to, List void submit(ProcessType type, To to, List> ready, + public void submit(ProcessType type, To to, List> ready, FlowExecutors.ConcurrencyHolder concurrencyHolder) { - FlowExecutors.getThreadPool().execute(Task.builder().runnable(() -> { - to.onProcess(type, ready, true); - concurrencyHolder.release(); - }).buildDisposable()); - - // FlowExecutors.getThreadPool(StringUtils.join(Constant.STREAM_ID_SEPARATOR, to.streamId, to.id), - // MAX_CONCURRENCY) - // .submit(PriorityThreadPool.PriorityTask.builder() - // .priority(PriorityThreadPool.PriorityTask.PriorityInfo.builder() - // .order(to.order) - // .createTime(System.currentTimeMillis()) - // .build()) - // .runner(() -> to.onProcess(ready)) - // .build()); + FlowExecutors.getThreadPool() + .submit(PriorityThreadPool.PriorityTask.builder() + .priority(PriorityThreadPool.PriorityTask.PriorityInfo.builder() + .order(to.order) + .createTime(System.currentTimeMillis()) + .build()) + .runner(() -> { + to.onProcess(type, ready, true); + concurrencyHolder.release(); + }) + .build()); } private void handleProcessConcurrentConflict(To to) { diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/utils/FlowExecutors.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/utils/FlowExecutors.java index a5e419fb..0daae758 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/utils/FlowExecutors.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/utils/FlowExecutors.java @@ -15,6 +15,7 @@ import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.ThreadPoolExecutor.AbortPolicy; import java.util.concurrent.atomic.AtomicInteger; @@ -31,26 +32,12 @@ public final class FlowExecutors { private static final Logger LOG = Logger.get(FlowExecutors.class); - private static final ThreadPoolExecutor THREAD_POOL; + private static final PriorityThreadPool THREAD_POOL; private static AtomicInteger currentConcurrency = new AtomicInteger(0); static { - ThreadPoolExecutor newPool = ThreadPoolExecutor.custom() - .threadPoolName("flow-node-thread-pool") - .corePoolSize(CORE_THREAD_COUNT) - .maximumPoolSize(MAX_THREAD_COUNT) - .workQueueCapacity(0) - .keepAliveTime(60L, SECONDS) - .isDaemonThread(true) - .exceptionHandler((thread, throwable) -> { - LOG.error("The node pool run failed, error cause: {}, message: {}.", throwable.getCause(), - throwable.getMessage()); - LOG.debug("The node pool run failed details: ", throwable); - }) - .rejectedExecutionHandler(new AbortPolicy()) - .build(); - THREAD_POOL = newPool; + THREAD_POOL = PriorityThreadPool.build("flow-node-thread-pool", CORE_THREAD_COUNT, MAX_THREAD_COUNT); } /** @@ -58,7 +45,7 @@ public final class FlowExecutors { * * @return 线程池对象 */ - public static ThreadPoolExecutor getThreadPool() { + public static PriorityThreadPool getThreadPool() { return THREAD_POOL; } diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/utils/PriorityThreadPool.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/utils/PriorityThreadPool.java new file mode 100644 index 00000000..4d2a5959 --- /dev/null +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/utils/PriorityThreadPool.java @@ -0,0 +1,181 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package modelengine.fit.waterflow.domain.utils; + +import lombok.Data; +import modelengine.fitframework.log.Logger; +import modelengine.fitframework.thread.DefaultThreadFactory; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; + +import static java.util.concurrent.TimeUnit.SECONDS; + +/** + * 带有任务优先级的线程池。 + * + * @author 夏斐 + * @since 2024/3/6 + */ +public class PriorityThreadPool { + private static final Logger LOG = Logger.get(PriorityThreadPool.class); + + private final ExecutorService executorService; + + private PriorityThreadPool(ExecutorService executorService) { + this.executorService = executorService; + } + + /** + * 构造一个线程池。 + * + * @param key 线程池名称。 + * @param coreSize 核心线程数。 + * @param maxSize 最大线程数。 + * @return 带有优先队列的线程池。 + */ + public static PriorityThreadPool build(String key, int coreSize, int maxSize) { + Thread.UncaughtExceptionHandler uncaughtExceptionHandler = (thread, throwable) -> { + LOG.error("[node-pool-{}]: The node pool run failed, error cause: {}, message: {}.", key, + throwable.getCause(), throwable.getMessage()); + LOG.error("The node pool run failed details: ", throwable); + }; + ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( + coreSize, maxSize, 60L, SECONDS, new PriorityBlockingQueue<>(), + new DefaultThreadFactory(key, true, uncaughtExceptionHandler), + new ThreadPoolExecutor.AbortPolicy()); + return new PriorityThreadPool(threadPoolExecutor); + } + + /** + * 提交任务。 + * + * @param task 任务。 + */ + public void submit(PriorityTask task) { + this.executorService.execute(task); + } + + /** + * 关闭。 + */ + public void shutdown() { + this.executorService.shutdown(); + } + + /** + * 优先队列提交的任务。 + */ + public abstract static class PriorityTask implements Runnable, Comparable { + /** + * 任务构造器。 + * + * @return 返回一个构造器实例。 + */ + public static Builder builder() { + return new Builder(); + } + + /** + * 得到优先级。 + * + * @return 优先级。 + */ + public abstract PriorityInfo getPriority(); + + @Override + public int compareTo(PriorityTask other) { + PriorityInfo thisPriority = this.getPriority(); + PriorityInfo otherPriority = other.getPriority(); + // 从trace粒度比较,越早的trace执行优先级越高 + int traceTimeCompare = Long.compare(thisPriority.getTraceTime(), otherPriority.getTraceTime()); + if (traceTimeCompare != 0) { + return traceTimeCompare; + } + // 从节点优先级比较,越往后的节点执行优先级越高 + int orderCompare = Long.compare(otherPriority.getOrder(), thisPriority.getOrder()); + if (orderCompare != 0) { + return orderCompare; + } + // 从当前任务的创建实践比较,越早创建执行优先级越高 + return Long.compare(thisPriority.getCreateTime(), otherPriority.getCreateTime()); + } + + @Override + public String toString() { + return getPriority().toString(); + } + + /** + * 优先级信息。 + * + * @author 夏斐 + * @since 2024/3/6 + */ + @lombok.Builder + @Data + public static class PriorityInfo { + private int order; + + private long createTime; + + // 流程时间 + private long traceTime = 0L; + } + + /** + * 构造器 + */ + public static class Builder { + private PriorityInfo priority; + + private Runnable runnable; + + /** + * 设置优先级。 + * + * @param priority 优先级。 + * @return 构造器。 + */ + public Builder priority(PriorityInfo priority) { + this.priority = priority; + return this; + } + + /** + * 设置 runner。 + * + * @param runnable runner。 + * @return 构造器。 + */ + public Builder runner(Runnable runnable) { + this.runnable = runnable; + return this; + } + + /** + * 构造任务。 + * + * @return 任务。 + */ + public PriorityTask build() { + return new PriorityTask() { + @Override + public PriorityInfo getPriority() { + return Builder.this.priority; + } + + @Override + public void run() { + Builder.this.runnable.run(); + } + }; + } + } + } +} From 46adec99c2431b6a24eb984fc9e0aab7f9774d23 Mon Sep 17 00:00:00 2001 From: loveTsong <271667068@qq.com> Date: Mon, 17 Nov 2025 11:14:49 +0800 Subject: [PATCH 11/14] =?UTF-8?q?[waterflow]=20=E4=BF=AE=E6=AD=A3=E6=9D=A1?= =?UTF-8?q?=E4=BB=B6=E8=8A=82=E7=82=B9id=E8=B6=85=E9=95=BF=E7=9A=84?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../fit/waterflow/domain/stream/nodes/ConditionsNode.java | 1 - 1 file changed, 1 deletion(-) diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/ConditionsNode.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/ConditionsNode.java index 4e8a9749..cdd9d41a 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/ConditionsNode.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/ConditionsNode.java @@ -47,7 +47,6 @@ public ConditionsNode(String streamId, Operators.Just> processor, processor.process(i); return i.getData(); }, repo, messenger, locks, () -> initFrom(streamId, repo, messenger, locks)); - super.id = "condition:" + UUIDUtil.uuid(); } /** From 3cd1214bef2d9e033e0874b6e3f19adfaa1c561f Mon Sep 17 00:00:00 2001 From: loveTsong <271667068@qq.com> Date: Mon, 17 Nov 2025 11:16:39 +0800 Subject: [PATCH 12/14] =?UTF-8?q?[waterflow]=20=E5=A2=9E=E5=8A=A0=E5=88=A4?= =?UTF-8?q?=E6=96=AD=E8=8A=82=E7=82=B9=E7=BB=88=E6=80=81=E7=9A=84=E6=96=B9?= =?UTF-8?q?=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../domain/enums/FlowNodeStatus.java | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeStatus.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeStatus.java index dfe6afc2..a0f9a27b 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeStatus.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeStatus.java @@ -6,6 +6,10 @@ package modelengine.fit.waterflow.domain.enums; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + /** * 流程实例节点状态 * 状态流转顺序:NEW -> PENDING(停留在EVENT边上) -> READY(进入到节点) -> PROCESSING(开始处理) -> ARCHIVED(处理完成) @@ -54,13 +58,24 @@ public enum FlowNodeStatus { */ RETRYABLE; + private static final Set END_STATUS = new HashSet<>(Arrays.asList(ARCHIVED, ERROR, TERMINATE)); + private static final Set RUNNING_STATUS = new HashSet<>(Arrays.asList(NEW, PENDING, READY, PROCESSING)); + /** * 判断是否在运行中的状态。 * * @return 是否在运行中的状态。 */ public boolean isRunningStatus() { - return FlowNodeStatus.NEW.equals(this) || FlowNodeStatus.PENDING.equals(this) - || FlowNodeStatus.READY.equals(this) || FlowNodeStatus.PROCESSING.equals(this); + return RUNNING_STATUS.contains(this); + } + + /** + * 是否是终态。 + * + * @return 是否是终态。 + */ + public boolean isEndStatus() { + return END_STATUS.contains(this); } } \ No newline at end of file From a09869fda1c554a352fa3e622a15bf78eab5706a Mon Sep 17 00:00:00 2001 From: loveTsong <271667068@qq.com> Date: Mon, 17 Nov 2025 11:16:54 +0800 Subject: [PATCH 13/14] =?UTF-8?q?[waterflow]=20From=20=E5=A2=9E=E5=8A=A0?= =?UTF-8?q?=E8=8E=B7=E5=8F=96=E8=AE=A2=E9=98=85=E8=80=85=E7=9A=84=E6=96=B9?= =?UTF-8?q?=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../fit/waterflow/domain/stream/nodes/From.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/From.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/From.java index 2e445ca8..b831dd18 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/From.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/From.java @@ -7,6 +7,7 @@ package modelengine.fit.waterflow.domain.stream.nodes; import static modelengine.fit.waterflow.ErrorCodes.FLOW_ENGINE_INVALID_MANUAL_TASK; +import static modelengine.fit.waterflow.ErrorCodes.FLOW_ENGINE_INVALID_NODE_ID; import modelengine.fit.waterflow.domain.context.FlatMapSourceWindow; import modelengine.fit.waterflow.domain.context.FlatMapWindow; @@ -505,6 +506,17 @@ private List> startNodeMarkAsHandled(List> preList return afterList; } + /** + * 通过订阅节点Id查找订阅节点 + * + * @param nodeId 节点Id + * @return 订阅节点 + */ + public To getSubscriber(String nodeId) { + return ObjectUtils.cast(findNode(this, nodeId) + .orElseThrow(() -> new WaterflowException(FLOW_ENGINE_INVALID_NODE_ID, nodeId))); + } + /** * findNodeFromFlow * From c88a7ce1c80c0922ba73270427b4376bcc3cbd39 Mon Sep 17 00:00:00 2001 From: loveTsong <271667068@qq.com> Date: Wed, 19 Nov 2025 14:54:10 +0800 Subject: [PATCH 14/14] =?UTF-8?q?[waterflow]=20=E6=94=BE=E5=BC=80=E6=B5=8B?= =?UTF-8?q?=E8=AF=95=E7=94=A8=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../fit/waterflow/domain/WaterFlowsTest.java | 64 +++++++++---------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/framework/waterflow/java/waterflow-core/src/test/java/modelengine/fit/waterflow/domain/WaterFlowsTest.java b/framework/waterflow/java/waterflow-core/src/test/java/modelengine/fit/waterflow/domain/WaterFlowsTest.java index fd0006c1..6493f98d 100644 --- a/framework/waterflow/java/waterflow-core/src/test/java/modelengine/fit/waterflow/domain/WaterFlowsTest.java +++ b/framework/waterflow/java/waterflow-core/src/test/java/modelengine/fit/waterflow/domain/WaterFlowsTest.java @@ -842,38 +842,38 @@ void testFitStreamNotifyCrossStream() { @Test @DisplayName("流程实例异常处理流转逻辑") void testExceptionHandleForFitStream() { - // AtomicReference output = new AtomicReference<>(); - // // 单节点错误处理 - // Flows.create(repo, messenger, locks).just(data -> data.first(100)).just(data -> { - // if (data.first < 120) { - // throw new IllegalArgumentException(); - // } else { - // data.second(100); - // } - // }).error((error, retryable, contexts) -> { - // contexts.get(0).getData().first(120); - // contexts.forEach(context -> context.setStatus(READY)); - // retryable.retry(contexts); - // }).close(callback -> output.set(callback.get().getData())).offer(new TestData()); - // FlowsTestUtil.waitUntil(() -> output.get() != null, 2000); - // assertEquals(120, output.get().first); - // assertEquals(100, output.get().second); - // - // // 整体错误处理 - // Flows.create(repo, messenger, locks).just(data -> data.first(100)).just(data -> { - // if (data.first < 120) { - // throw new IllegalArgumentException(); - // } else { - // data.second(100); - // } - // }).close(callback -> output.set(callback.get().getData()), (exception, retryable, contexts) -> { - // ObjectUtils.cast(contexts.get(0).getData()).first(120); - // contexts.forEach(context -> context.setStatus(READY)); - // retryable.retry(contexts); - // }).offer(new TestData()); - // FlowsTestUtil.waitFortyMillis(Collections::emptyList); - // assertEquals(120, output.get().first); - // assertEquals(100, output.get().second); + AtomicReference output = new AtomicReference<>(); + // 单节点错误处理 + Flows.create(repo, messenger, locks).just(data -> data.first(100)).just(data -> { + if (data.first < 120) { + throw new IllegalArgumentException(); + } else { + data.second(100); + } + }).error((error, retryable, contexts) -> { + contexts.get(0).getData().first(120); + contexts.forEach(context -> context.setStatus(READY)); + retryable.retry(error, contexts); + }).close(callback -> output.set(callback.get().getData())).offer(new TestData()); + FlowsTestUtil.waitUntil(() -> output.get() != null, 2000); + assertEquals(120, output.get().first); + assertEquals(100, output.get().second); + + // 整体错误处理 + Flows.create(repo, messenger, locks).just(data -> data.first(100)).just(data -> { + if (data.first < 120) { + throw new IllegalArgumentException(); + } else { + data.second(100); + } + }).close(callback -> output.set(callback.get().getData()), (exception, retryable, contexts) -> { + ObjectUtils.cast(contexts.get(0).getData()).first(120); + contexts.forEach(context -> context.setStatus(READY)); + retryable.retry(exception, contexts); + }).offer(new TestData()); + FlowsTestUtil.waitFortyMillis(Collections::emptyList); + assertEquals(120, output.get().first); + assertEquals(100, output.get().second); } @Test