diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/CompleteContext.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/CompleteContext.java index 7d28698f..3e026b82 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/CompleteContext.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/CompleteContext.java @@ -27,7 +27,7 @@ public class CompleteContext extends FlowContext { */ public CompleteContext(FlowContext context, String position) { super(context.getStreamId(), context.getRootId(), null, context.getTraceId(), position, - context.getParallel(), context.getParallelMode(), context.getSession()); + context.getParallel(), context.getParallelMode(), context.getSession(), context.getCreateAt()); this.batchId = context.getBatchId(); this.setIndex(Constants.NOT_PRESERVED_INDEX); } diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowContext.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowContext.java index 8184bc5b..c817e4a3 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowContext.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowContext.java @@ -142,6 +142,13 @@ public class FlowContext extends IdGenerator implements StateContext { @Setter private Integer index; + /** + * 当前context接下来要走到位置:可以是连线或者节点id + */ + @Setter + @Getter + private String nextPositionId; + /** * 创建一个 {@link FlowContext} 实例。 * @@ -154,7 +161,7 @@ public class FlowContext extends IdGenerator implements StateContext { */ public FlowContext(String streamId, String rootId, T data, Set traceId, String position, FlowSession session) { - this(streamId, 
rootId, data, traceId, position, "", "", session); + this(streamId, rootId, data, traceId, position, "", "", session, LocalDateTime.now()); } /** @@ -167,10 +174,11 @@ public FlowContext(String streamId, String rootId, T data, Set traceId, * @param position 表示上下文当前所处的位置的 {@link String}。 * @param parallel 表示并行节点唯一标识的 {@link String}。 * @param parallelMode 表示并行模式的 {@link String}。 + * @param createAt 表示创建时间的 {@link LocalDateTime}。 * @param session 表示上下文会话信息的 {@link FlowSession}。 */ public FlowContext(String streamId, String rootId, T data, Set traceId, String position, String parallel, - String parallelMode, FlowSession session) { + String parallelMode, FlowSession session, LocalDateTime createAt) { this.streamId = streamId; this.rootId = rootId; this.data = data; @@ -179,7 +187,7 @@ public FlowContext(String streamId, String rootId, T data, Set traceId, this.position = position; this.parallel = parallel; this.parallelMode = parallelMode; - this.createAt = LocalDateTime.now(); + this.createAt = createAt; this.session = session; this.index = this.createIndex(); // 0起始,说明保序 } @@ -266,9 +274,10 @@ public FlowContext toBatch(String toBatchId) { * @param 表示返回值类型的泛型参数。 * @param data 表示处理后数据的 {@link R}。 * @param position 表示处理后所处的节点的 {@link String}。 + * @param createAt 表示创建时间的 {@link LocalDateTime}。 * @return 表示新的上下文的 {@link FlowContext}{@code <}{@link R}{@code >}。 */ - public FlowContext generate(R data, String position) { + public FlowContext generate(R data, String position, LocalDateTime createAt) { FlowContext context = new FlowContext<>(this.streamId, this.rootId, data, @@ -276,7 +285,8 @@ public FlowContext generate(R data, String position) { this.position, this.parallel, this.parallelMode, - this.session); + this.session, + createAt); context.position = position; context.previous = this.id; context.batchId = this.batchId; @@ -293,7 +303,9 @@ public FlowContext generate(R data, String position) { * @return 表示新的上下文的 {@link List}{@code <}{@link FlowContext}{@code <}{@link 
R}{@code >}{@code >}。 */ public List> generate(List dataList, String position) { - return dataList.stream().map(data -> this.generate(data, position)).collect(Collectors.toList()); + return dataList.stream() + .map(data -> this.generate(data, position, LocalDateTime.now())) + .collect(Collectors.toList()); } /** @@ -312,7 +324,8 @@ public FlowContext convertData(R data, String id) { this.position, this.parallel, this.parallelMode, - this.session); + this.session, + LocalDateTime.now()); context.previous = this.previous; context.status = this.status; context.id = id; diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowSession.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowSession.java index 81539f4f..4f41b1d4 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowSession.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/FlowSession.java @@ -76,7 +76,7 @@ public class FlowSession extends IdGenerator implements StateContext { * @param preserved 是否保序 */ public FlowSession(boolean preserved) { - this(UUID.randomUUID().toString(), preserved); + this.preserved = preserved; } /** diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/TraceOwner.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/TraceOwner.java new file mode 100644 index 00000000..d6036760 --- /dev/null +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/TraceOwner.java @@ -0,0 +1,90 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. 
+ * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package modelengine.fit.waterflow.domain.context; + +import java.util.List; +import java.util.Set; +import java.util.concurrent.locks.Lock; + +/** + * Provides the trace ownership service. + * + * @author 夏斐 + * @since 2024/4/29 + */ +public interface TraceOwner { + /** + * Takes ownership of a trace (NOTE(review): inferred from the method name; confirm against implementations). + * + * @param traceId the trace id + * @param transId the trans id + */ + void own(String traceId, String transId); + + /** + * Tries to take ownership of a trace (NOTE(review): inferred from the method name; confirm against implementations). + * + * @param traceId the trace id + * @param transId the trans id + * @return presumably true when ownership was obtained; confirm against implementations + */ + boolean tryOwn(String traceId, String transId); + + /** + * Releases a trace (NOTE(review): inferred from the method name; confirm against implementations). + * + * @param traceId the trace id + */ + void release(String traceId); + + /** + * Checks whether a trace is owned (NOTE(review): inferred from the method name; confirm against implementations). + * + * @param traceId the trace id + * @return presumably true when the trace is owned; confirm against implementations + */ + boolean isOwn(String traceId); + + /** + * Returns true if the trace map contains any one of the values in the given trace list. + * + * @param traceIds the trace ids + * @return true or false + */ + boolean isAnyOwn(Set traceIds); + + /** + * Gets the list of trace ids. + * + * @return the list of trace ids. + */ + List getTraces(); + + /** + * Gets the list of trace ids. + * + * @param targetTransId the instance id. + * @return the list of trace ids. + */ + List getTraces(String targetTransId); + + /** + * Removes all invalid trace ids. + * + * @param invalidLock the lock for invalid trace ids. + */ + void removeInvalidTrace(Lock invalidLock); + + /** + * Checks whether the trace is within its initialization protection period. + * Used for the case where a trace is added to memory on its first offer but has not yet been inserted into the database. + * + * @param traceId the trace id + * @return true if within the protection time, false if the protection time has passed + */ + boolean isInProtectTime(String traceId); +} diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/Window.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/Window.java index e9fe3a10..cfa660be 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/Window.java +++ 
b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/Window.java @@ -255,7 +255,7 @@ private void fire() { cs.add(completeContext); List contexts = node.getProcessMode().process(node, cs); if (node instanceof Processor) { - ((Processor) node).offer(contexts); + ((Processor) node).offer(contexts, __ -> {}); } completeContext = null; } diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMemoMessenger.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMemoMessenger.java index 69cad9b3..1548e5d5 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMemoMessenger.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMemoMessenger.java @@ -39,7 +39,7 @@ public void send(String nodeId, List> contexts) { * @param 流程实例执行时的入参数据类型,用于泛型推倒 */ @Override - public void sendCallback(List> contexts) { + public void sendCallback(Object callback, List> contexts) { LOG.warn("FlowEngine memo messenger does not support sending events."); } } diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMemoRepo.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMemoRepo.java index 9d428e46..4f9542e3 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMemoRepo.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMemoRepo.java @@ -8,16 +8,20 @@ import modelengine.fit.waterflow.domain.context.FlowContext; import 
modelengine.fit.waterflow.domain.context.FlowTrace; +import modelengine.fit.waterflow.domain.context.TraceOwner; import modelengine.fit.waterflow.domain.enums.FlowNodeStatus; import modelengine.fit.waterflow.domain.stream.operators.Operators; +import modelengine.fitframework.util.ObjectUtils; import java.util.ArrayList; import java.util.Deque; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.locks.Lock; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -33,6 +37,55 @@ public class FlowContextMemoRepo implements FlowContextRepo { private final boolean isReserveTerminal; + private final TraceOwner traceOwner = new TraceOwner() { + @Override + public void own(String traceId, String transId) { + } + + @Override + public boolean tryOwn(String traceId, String transId) { + return true; + } + + @Override + public void release(String traceId) { + } + + @Override + public boolean isOwn(String traceId) { + return true; + } + + @Override + public boolean isAnyOwn(Set traceIds) { + return true; + } + + @Override + public List getTraces() { + return List.of(); + } + + @Override + public List getTraces(String targetTransId) { + return List.of(); + } + + @Override + public void removeInvalidTrace(Lock invalidLock) { + } + + @Override + public boolean isInProtectTime(String traceId) { + return false; + } + }; + + @Override + public TraceOwner getTraceOwner() { + return this.traceOwner; + } + /** * 构造方法 */ @@ -74,6 +127,11 @@ public List> getContextsByPosition(String streamId, String po .filter(context -> context.getStatus().toString().equals(status))); } + @Override + public List> findWithoutFlowDataByTraceId(String traceId) { + throw new IllegalStateException("Not support"); + } + @Override public List> getContextsByTrace(String traceId) { return 
query(stream -> stream @@ -95,6 +153,21 @@ public synchronized void save(List> contexts) { }); } + @Override + public void updateFlowDataAndToBatch(List> contexts) { + save(contexts); + } + + @Override + public synchronized void updateFlowData(Map flowDataList) { + flowDataList.forEach((contextId, data) -> { + FlowContext flowContext = ObjectUtils.cast(this.contextsMap.get(contextId)); + if (flowContext != null) { + flowContext.setData(data); + } + }); + } + @Override public void updateToSent(List> contexts) { save(contexts); @@ -125,6 +198,14 @@ public List> getByIds(List ids) { return ids.stream().map(i -> (FlowContext) contextsMap.get(i)).collect(Collectors.toList()); } + @Override + public List> getByToBatch(List toBatchIds) { + return query(stream -> stream + .filter(context -> context.getStatus().equals(FlowNodeStatus.PENDING)) + .filter(FlowContext::isSent) + .filter(context -> toBatchIds.contains(context.getToBatch()))); + } + @Override public List> requestMappingContext(String streamId, List subscriptions, Map sessions) { diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMessenger.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMessenger.java index 20c22f9a..eec0762b 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMessenger.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMessenger.java @@ -50,10 +50,11 @@ default void send(ProcessType type, Subscriber subscriber, List 流程实例执行时的入参数据类型,用于泛型推倒 */ - void sendCallback(List> contexts); + void sendCallback(Object callback, List> contexts); /** * Directly processes a list of flow contexts through the specified subscriber. 
diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextRepo.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextRepo.java index aab19d6c..88656321 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextRepo.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextRepo.java @@ -9,6 +9,7 @@ import modelengine.fit.waterflow.ErrorCodes; import modelengine.fit.waterflow.domain.context.FlowContext; import modelengine.fit.waterflow.domain.context.FlowTrace; +import modelengine.fit.waterflow.domain.context.TraceOwner; import modelengine.fit.waterflow.domain.stream.operators.Operators; import modelengine.fit.waterflow.exceptions.WaterflowException; @@ -47,6 +48,15 @@ public interface FlowContextRepo { */ List> getContextsByPosition(String streamId, String posId, String batchId, String status); + /** + * 根据traceId查询所有的context对象 + * + * @param 泛型类型,表示上下文的数据类型 + * @param traceId traceId + * @return 上下文列表 + */ + List> findWithoutFlowDataByTraceId(String traceId); + /** * 根据traceId获取上下文 * @@ -64,6 +74,22 @@ public interface FlowContextRepo { */ void save(List> contexts); + /** + * 批量更新context的上下文数据flowData字段 + * + * @param 泛型类型,表示上下文的数据类型 + * @param contexts contexts + */ + void updateFlowDataAndToBatch(List> contexts); + + /** + * 批量更新上下文数据 + * + * @param 泛型类型,表示上下文的数据类型 + * @param flowDataList 数据列表(contextId, T) + */ + void updateFlowData(Map flowDataList); + /** * 批量更新context的内容,不更新status和position * @@ -109,6 +135,15 @@ default void update(List> contexts) { */ List> getByIds(List ids); + /** + * 根据toBatch查找FlowContext + * + * @param 泛型类型,表示上下文的数据类型 + * @param toBatchIds 上下文toBatch + * @return 上下文列表 + */ + List> getByToBatch(List toBatchIds); + /** * 
查找和指定一批ID对应的状态为PENDING且SENT了的流程上下文 * @@ -154,6 +189,17 @@ default List> findByStreamId(String metaId, String version) { throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "findByStreamId"); } + /** + * Finds the number of running contexts for the given version of the flow. + * + * @param metaId the metaId identifier of the flow + * @param version the version of the flow + * @return the number of running contexts + */ + default Integer findRunningContextCountByMetaId(String metaId, String version) { + throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "findRunningContextCountByMetaId"); + } + /** * 查找流程对应版本正在运行的上下文 * @@ -241,5 +287,116 @@ default boolean isTracesTerminate(List traceIds) { * @param contexts 上下文信息 */ void updateIndex(List> contexts); + + /** + * Gets the trace id management object. + * + * @return the trace id management object. + */ + default TraceOwner getTraceOwner() { + throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "getTraceOwner"); + } + + /** + * Deletes by context ids (NOTE(review): inferred from the method name; this default implementation only throws). + * + * @param contextIds the context ids + */ + default void deleteByContextIds(List contextIds) { + throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "deleteByContextIds"); + } + + /** + * Gets the stream id by trans id. + * + * @param flowTransId the trans id + * @return the stream id + */ + default String getStreamIdByTransId(String flowTransId) { + throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "getStreamIdByTransId"); + } + + /** + * Gets the trace ids by trans id. + * + * @param transId the trans id + * @return the trace ids + */ + default List getTraceByTransId(String transId) { + throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "getTraceByTransId"); + } + + /** + * Deletes contexts by trans id. + * + * @param transId the trans id + */ + default void deleteByTransId(String transId) { + throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "deleteByTransId"); + } + + /** + * Queries all error contexts by the unique trace id. + * + * @param <T> the generic type, i.e. the data type of the context. + * @param traceId the unique trace id. + * @return the collection of error contexts. + */ + default List> findErrorContextsByTraceId(String 
traceId) { + throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "findErrorContextsByTraceId"); + } + + /** + * 至少含有一个符合状态的上下文。 + * + * @param statusList 状态列表。 + * @param traceId 链路唯一标识。 + * @return true or false。 + */ + default boolean hasContextWithStatus(List statusList, String traceId) { + throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "hasContextWithStatus"); + } + + /** + * 所有上下文状态都符合要求。 + * + * @param statusList 状态列表。 + * @param traceId 链路唯一标识。 + * @return true or false。 + */ + default boolean isAllContextStatus(List statusList, String traceId) { + throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "isAllContextStatus"); + } + + /** + * 在某个节点至少含有一个符合状态的上下文。 + * + * @param statusList 状态列表. + * @param traceId 链路唯一标识。 + * @param position 位置. + * @return true or false + */ + default boolean hasContextWithStatusAtPosition(List statusList, String traceId, String position) { + throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "hasContextWithStatusAtPosition"); + } + + /** + * 根据链路唯一标识获取运行实例标识。 + * + * @param traceId 链路唯一标识。 + * @return 运行实例标识。 + */ + default String getTransIdByTrace(String traceId) { + throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "getTransIdByTrace"); + } + + /** + * 根据链路唯一标识列表删除对应的上下文数据。 + * + * @param traceIdList 表示链路唯一标识列表的 {@link List}{@code <}{@link String}{@code >}。 + */ + default void deleteByTraceIdList(List traceIdList) { + throw new WaterflowException(ErrorCodes.FLOW_ENGINE_DATABASE_NOT_SUPPORT, "deleteByTraceIdList"); + } } diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowtrace/FlowTraceRepo.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowtrace/FlowTraceRepo.java index e6456998..05a60f9e 100644 --- 
a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowtrace/FlowTraceRepo.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowtrace/FlowTraceRepo.java @@ -85,4 +85,28 @@ public interface FlowTraceRepo { * @param status trace状态 {@link FlowTraceStatus} */ void updateStatus(List ids, String status); + + /** + * 查找运行中的链路标识。 + * + * @param applications 应用标识列表。 + * @return 链路标识列表。 + */ + List findRunningTrace(List applications); + + /** + * 根据traceId删除trace + * + * @param traceIds traceId列表 + */ + void deleteByIdList(List traceIds); + + /** + * 查询超期并且已完成的链路唯一标识。 + * + * @param expiredDays 表示超期天数的 {@code int}。 + * @param limit 表示查询限制的 {@code int}。 + * @return 表示链路唯一标识列表的 {@link List}{@code <}{@link String}{@code >}。 + */ + List getExpiredTrace(int expiredDays, int limit); } diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeStage.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeStage.java new file mode 100644 index 00000000..54cd4563 --- /dev/null +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeStage.java @@ -0,0 +1,25 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. 
+ *--------------------------------------------------------------------------------------------*/ + +package modelengine.fit.waterflow.domain.enums; + +/** + * Stage information of a flow node. + * + * @author songyongtan + * @since 2024/10/17 + */ +public enum FlowNodeStage { + /** + * Before the node is invoked. + */ + BEFORE, + + /** + * After the node is invoked. + */ + AFTER; +} diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeStatus.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeStatus.java index 5a637cee..a0f9a27b 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeStatus.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowNodeStatus.java @@ -6,6 +6,10 @@ package modelengine.fit.waterflow.domain.enums; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + /** * 流程实例节点状态 * 状态流转顺序:NEW -> PENDING(停留在EVENT边上) -> READY(进入到节点) -> PROCESSING(开始处理) -> ARCHIVED(处理完成) @@ -47,5 +51,31 @@ public enum FlowNodeStatus { /** * 节点处理过程中发生错误 */ - ERROR + ERROR, + + /** + * Retryable. It is in neither RUNNING_STATUS nor END_STATUS, so isRunningStatus() and isEndStatus() both return false for it. + */ + RETRYABLE; + + private static final Set END_STATUS = new HashSet<>(Arrays.asList(ARCHIVED, ERROR, TERMINATE)); + private static final Set RUNNING_STATUS = new HashSet<>(Arrays.asList(NEW, PENDING, READY, PROCESSING)); + + /** + * Checks whether this status is a running status (NEW, PENDING, READY or PROCESSING). + * + * @return true if this status is a running status. + */ + public boolean isRunningStatus() { + return RUNNING_STATUS.contains(this); + } + + /** + * Checks whether this status is an end status (ARCHIVED, ERROR or TERMINATE). + * + * @return true if this status is an end status. + */ + public boolean isEndStatus() { + return END_STATUS.contains(this); + } } \ No newline at end of file diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowTraceStatus.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowTraceStatus.java index b6e23ee6..4ce182b6 100644 --- 
a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowTraceStatus.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/enums/FlowTraceStatus.java @@ -39,6 +39,11 @@ public enum FlowTraceStatus { */ ERROR, + /** + * 部分失败 + */ + PARTIAL_ERROR, + /** * 已终止 */ diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/callbacks/PreSendCallbackInfo.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/callbacks/PreSendCallbackInfo.java new file mode 100644 index 00000000..fa4f428d --- /dev/null +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/callbacks/PreSendCallbackInfo.java @@ -0,0 +1,53 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. 
+ *--------------------------------------------------------------------------------------------*/ + +package modelengine.fit.waterflow.domain.stream.callbacks; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import modelengine.fit.waterflow.domain.context.FlowContext; +import modelengine.fit.waterflow.domain.stream.reactive.Subscription; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Argument of the callback that is triggered before data is sent to the next node. + * + * @author 杨祥宇 + * @since 2024/10/21 + */ +@Getter +@Setter +@AllArgsConstructor +public class PreSendCallbackInfo { + /** + * Matched contexts: a map from a subscription to the list of contexts that matched it. + */ + private Map, List>> matchedContexts; + + /** + * List of the contexts that were not matched. + */ + + private List> unMatchedContexts; + + /** + * Gets all contexts, matched and unmatched, merged into one newly created list (matched ones first). + * + * @return the list of contexts + */ + public List> getAllContext() { + List> mergeContexts = this.getMatchedContexts().values() + .stream() + .flatMap(List::stream) + .collect(Collectors.toList()); + mergeContexts.addAll(this.getUnMatchedContexts()); + return mergeContexts; + } +} diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/BlockToken.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/BlockToken.java index 8a415281..4748cca2 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/BlockToken.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/BlockToken.java @@ -44,7 +44,7 @@ public synchronized void resume() { } } List> cloned = verified.stream() - .map(context -> context.generate(context.getData(), context.getPosition()) + .map(context -> context.generate(context.getData(), context.getPosition(), context.getCreateAt()) .batchId(context.getBatchId())) .collect(Collectors.toList()); this.publisher.getFlowContextRepo().save(cloned); @@ -52,7 +52,9 @@ public 
synchronized void resume() { .collect(Collectors.groupingBy(item -> item.getSession().getId(), LinkedHashMap::new, Collectors.toList())) .values() - .forEach(this.publisher::offer); + .forEach(contexts -> { + this.publisher.offer(contexts, __ -> {}); + }); } /** diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Blocks.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Blocks.java index 11fd262b..152f2058 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Blocks.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Blocks.java @@ -44,6 +44,10 @@ public abstract static class Block extends IdGenerator { * @param contexts 待处理的contexts列表 */ public void process(List> contexts) { + contexts.forEach(context -> context.getTraceId() + .forEach(traceId -> this.target.getFlowContextRepo() + .getTraceOwner() + .own(traceId, context.getSession().getId()))); this.target.accept(ProcessType.PROCESS, contexts); } } diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/ConditionsNode.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/ConditionsNode.java index 669669e9..cdd9d41a 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/ConditionsNode.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/ConditionsNode.java @@ -11,10 +11,16 @@ import modelengine.fit.waterflow.domain.context.repo.flowcontext.FlowContextRepo; import modelengine.fit.waterflow.domain.context.repo.flowlock.FlowLocks; import modelengine.fit.waterflow.domain.enums.FlowNodeType; +import 
modelengine.fit.waterflow.domain.stream.callbacks.PreSendCallbackInfo; +import modelengine.fit.waterflow.domain.stream.operators.Operators; +import modelengine.fit.waterflow.domain.stream.reactive.Subscription; import modelengine.fit.waterflow.domain.utils.UUIDUtil; import modelengine.fitframework.util.CollectionUtils; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; +import java.util.function.Consumer; import java.util.stream.Collectors; /** @@ -30,13 +36,17 @@ public class ConditionsNode extends Node { * 1->1处理节点 * * @param streamId stream流程ID + * @param processor 对应处理器 * @param repo 上下文持久化repo,默认在内存 * @param messenger 上下文事件发送器,默认在内存 * @param locks 流程锁 */ - public ConditionsNode(String streamId, FlowContextRepo repo, FlowContextMessenger messenger, FlowLocks locks) { - super(streamId, FlowContext::getData, repo, messenger, locks, () -> initFrom(streamId, repo, messenger, locks)); - super.id = "condition:" + UUIDUtil.uuid(); + public ConditionsNode(String streamId, Operators.Just> processor, FlowContextRepo repo, + FlowContextMessenger messenger, FlowLocks locks) { + super(streamId, i -> { + processor.process(i); + return i.getData(); + }, repo, messenger, locks, () -> initFrom(streamId, repo, messenger, locks)); } /** @@ -44,14 +54,15 @@ public ConditionsNode(String streamId, FlowContextRepo repo, FlowContextMessenge * * @param streamId stream流程ID * @param nodeId stream流程节点ID + * @param processor 对应处理器 * @param repo 上下文持久化repo,默认在内存 * @param messenger 上下文事件发送器,默认在内存 * @param locks 流程锁 * @param nodeType 节点类型 */ - public ConditionsNode(String streamId, String nodeId, FlowContextRepo repo, FlowContextMessenger messenger, - FlowLocks locks, FlowNodeType nodeType) { - this(streamId, repo, messenger, locks); + public ConditionsNode(String streamId, String nodeId, Operators.Just> processor, + FlowContextRepo repo, FlowContextMessenger messenger, FlowLocks locks, FlowNodeType nodeType) { + this(streamId, processor, repo, messenger, locks); 
this.id = nodeId; this.nodeType = nodeType; } @@ -77,12 +88,19 @@ public ConditionFrom(String streamId, FlowContextRepo repo, FlowContextMessenger } @Override - public void offer(List> contexts) { - this.getSubscriptions().forEach(subscription -> { + public void offer(List> contexts, Consumer> preSendCallback) { + Map, List>> matchedContexts = new LinkedHashMap<>(); + this.getSubscriptions().forEach(w -> { List> matched = contexts.stream() - .filter(context -> subscription.getWhether().is(context.getData())) + .filter(c -> w.getWhether().is(c.getData())) + .peek(c -> c.setNextPositionId(w.getId())) .collect(Collectors.toList()); matched.forEach(contexts::remove); + matchedContexts.put(w, matched); + }); + PreSendCallbackInfo callbackInfo = new PreSendCallbackInfo<>(matchedContexts, contexts); + preSendCallback.accept(callbackInfo); + matchedContexts.forEach((subscription, matched) -> { // For order-sensitive data, directly synchronously executes the next conditional branch node. if (CollectionUtils.isNotEmpty(matched) && matched.get(0).getSession().preserved()) { subscription.process(matched); diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/From.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/From.java index 79ec8c25..b831dd18 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/From.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/From.java @@ -7,6 +7,7 @@ package modelengine.fit.waterflow.domain.stream.nodes; import static modelengine.fit.waterflow.ErrorCodes.FLOW_ENGINE_INVALID_MANUAL_TASK; +import static modelengine.fit.waterflow.ErrorCodes.FLOW_ENGINE_INVALID_NODE_ID; import modelengine.fit.waterflow.domain.context.FlatMapSourceWindow; import modelengine.fit.waterflow.domain.context.FlatMapWindow; @@ -23,6 +24,7 @@ 
import modelengine.fit.waterflow.domain.enums.ParallelMode; import modelengine.fit.waterflow.domain.enums.SpecialDisplayNode; import modelengine.fit.waterflow.domain.states.DataStart; +import modelengine.fit.waterflow.domain.stream.callbacks.PreSendCallbackInfo; import modelengine.fit.waterflow.domain.stream.operators.Operators; import modelengine.fit.waterflow.domain.stream.reactive.Processor; import modelengine.fit.waterflow.domain.stream.reactive.Publisher; @@ -41,9 +43,12 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.stream.Collectors; /** @@ -124,7 +129,7 @@ public From(String streamId, String nodeId, FlowContextRepo repo, FlowContextMes */ @Override public Processor conditions(Operators.Whether whether) { - ConditionsNode node = new ConditionsNode<>(this.getStreamId(), repo, messenger, locks); + ConditionsNode node = new ConditionsNode<>(this.getStreamId(), i -> {}, repo, messenger, locks); this.subscribe(node, whether); return node.displayAs(SpecialDisplayNode.CONDITION.name()); } @@ -257,6 +262,7 @@ public String offer(I data, FlowSession session) { */ public String offer(I[] data, FlowSession session) { FlowTrace trace = new FlowTrace(); + this.repo.getTraceOwner().own(trace.getId(), session.getId()); Set traceId = new HashSet<>(); traceId.add(trace.getId()); Window window = session.begin(); @@ -266,9 +272,9 @@ public String offer(I[] data, FlowSession session) { window.createToken(); return context; }).collect(Collectors.toList()); - List> after = this.startNodeMarkAsHandled(contexts, trace); - after.forEach(this::generateIndex); - this.offer(after); + List> afters = this.startNodeMarkAsHandled(contexts, trace); + afters.forEach(this::generateIndex); + this.offer(afters, __ -> {}); return trace.getId(); } @@ 
-320,9 +326,10 @@ public String offer(I... data) { * 这个数据可以来源于数据最开始,也可以是接受者处理完的数据 * * @param contexts 数据上下文,里面有要处理的数据,还有其他流处理状态信息 + * @param preSendCallback 在数据发送到下一个节点前触发当前节点完成回调操作 */ @Override - public void offer(List> contexts) { + public void offer(List> contexts, Consumer> preSendCallback) { if (CollectionUtils.isEmpty(contexts)) { return; } @@ -346,9 +353,26 @@ public void offer(List> contexts) { } // qualifiedWhens表示的与from节点连接的所有事件,条件节点符合条件的事件在这里筛选,在事件上处理需要下发的context - qualifiedWhens.forEach(when -> when.cache(contexts.stream() - .filter(context -> when.getWhether().is(context.getData())) - .collect(Collectors.toList()))); + java.util.Map, List>> matchedContexts = new LinkedHashMap<>(); + Set> matchedContextSet = new HashSet<>(); + qualifiedWhens.forEach( + w -> { + List> afterContexts = contexts + .stream() + .filter(c -> w.getWhether().is(c.getData())) + .peek(c -> c.setNextPositionId(w.getId())) + .collect(Collectors.toList()); + matchedContexts.put(w, afterContexts); + matchedContextSet.addAll(afterContexts); + } + ); + List> unMatchedContexts = contexts + .stream() + .filter(c -> !matchedContextSet.contains(c)) + .collect(Collectors.toList()); + PreSendCallbackInfo callbackInfo = new PreSendCallbackInfo<>(matchedContexts, unMatchedContexts); + preSendCallback.accept(callbackInfo); + matchedContexts.forEach(Subscription::cache); } /** @@ -472,7 +496,7 @@ private List> startNodeMarkAsHandled(List> preList context.setStatus(FlowNodeStatus.ARCHIVED); }); List> afterList = preList.stream().map(pre -> { - FlowContext after = pre.generate(pre.getData(), pre.getPosition()).batchId(toBatchId); + FlowContext after = pre.generate(pre.getData(), pre.getPosition(), pre.getCreateAt()).batchId(toBatchId); trace.getContextPool().add(after.getId()); return after; }).collect(Collectors.toList()); @@ -481,4 +505,65 @@ private List> startNodeMarkAsHandled(List> preList repo.save(preList); return afterList; } + + /** + * 通过订阅节点Id查找订阅节点 + * + * @param nodeId 节点Id + * @return 
订阅节点 + */ + public To getSubscriber(String nodeId) { + return ObjectUtils.cast(findNode(this, nodeId) + .orElseThrow(() -> new WaterflowException(FLOW_ENGINE_INVALID_NODE_ID, nodeId))); + } + + /** + * findNodeFromFlow + * + * @param from from + * @param nodeMetaId nodeMetaId + * @return Node The Target Node. + */ + public Node findNodeFromFlow(From from, String nodeMetaId) { + return (Node) findNode(this, nodeMetaId).orElse(null); + } + + /** + * findNode + * + * @param from from + * @param nodeMetaId nodeMetaId + * @return Optional To object + */ + private static Optional> findNode(From from, String nodeMetaId) { + ArrayDeque> nodesQueue = new ArrayDeque<>(); + Set visited = new HashSet<>(); + for (Subscription s : from.getSubscriptions()) { + Subscriber to = s.getTo(); + nodesQueue.addLast(to); + visited.add(to.getId()); + if (to.getId().equals(nodeMetaId)) { + return Optional.of((To) to); + } + } + + while (!nodesQueue.isEmpty()) { + Subscriber cur = nodesQueue.removeFirst(); + if (!(cur instanceof Node)) { + continue; + } + Node curNode = (Node) cur; + for (Subscription s : curNode.getSubscriptions()) { + Subscriber to = s.getTo(); + if (!visited.contains(to.getId())) { + nodesQueue.offer(to); + visited.add(to.getId()); + } + if (to.getId().equals(nodeMetaId)) { + return Optional.of((To) to); + } + } + } + return Optional.empty(); + } } diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Node.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Node.java index d63ed287..39bd49b7 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Node.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Node.java @@ -14,6 +14,7 @@ import modelengine.fit.waterflow.domain.enums.FlowNodeType; import 
modelengine.fit.waterflow.domain.enums.ParallelMode; import modelengine.fit.waterflow.domain.flow.Flow; +import modelengine.fit.waterflow.domain.stream.callbacks.PreSendCallbackInfo; import modelengine.fit.waterflow.domain.stream.operators.Operators; import modelengine.fit.waterflow.domain.stream.reactive.NodeDisplay; import modelengine.fit.waterflow.domain.stream.reactive.Processor; @@ -24,6 +25,7 @@ import modelengine.fitframework.inspection.Validation; import java.util.List; +import java.util.function.Consumer; import java.util.function.Supplier; /** @@ -176,8 +178,8 @@ public void subscribe(String eventId, Subscriber subscriber, Operators } @Override - public void offer(List> contexts) { - this.publisher.offer(contexts); + public void offer(List> contexts, Consumer> preSendCallback) { + this.publisher.offer(contexts, preSendCallback); } @Override @@ -215,10 +217,11 @@ public List> getSubscriptions() { * 把该publisher里所有的数据都publish到subscription * * @param batchId 批次id + * @param preSendCallback 在数据发送到下一个节点前触发当前节点完成回调操作 */ @Override - public void onNext(String batchId) { - this.publisher.offer(this.nextContexts(batchId)); + public void onNext(String batchId, Consumer> preSendCallback) { + this.publisher.offer(this.nextContexts(batchId), preSendCallback); } /** diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/ParallelNode.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/ParallelNode.java index 679d2ff7..a193f461 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/ParallelNode.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/ParallelNode.java @@ -12,8 +12,10 @@ import modelengine.fit.waterflow.domain.context.repo.flowlock.FlowLocks; import modelengine.fit.waterflow.domain.enums.FlowNodeType; import 
modelengine.fit.waterflow.domain.enums.ParallelMode; +import modelengine.fit.waterflow.domain.stream.callbacks.PreSendCallbackInfo; import java.util.List; +import java.util.function.Consumer; /** * 平行节点,也是fork的起始节点 @@ -78,9 +80,9 @@ private static From initFrom(String streamId, ParallelMode mode, FlowCont FlowContextMessenger messenger, FlowLocks locks) { return new From(streamId, repo, messenger, locks) { @Override - public void offer(List> contexts) { + public void offer(List> contexts, Consumer> preSendCallback) { contexts.forEach(c -> c.setParallel(c.getId()).setParallelMode(mode.name())); - super.offer(contexts); + super.offer(contexts, preSendCallback); } }; } diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Retryable.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Retryable.java index d9096d36..72307188 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Retryable.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/Retryable.java @@ -35,12 +35,24 @@ public Retryable(FlowContextRepo repo, Subscriber to) { this.to = to; } + /** + * 是否需要重试 + * + * @param exception 出现的异常 + * @param contexts 异常上下文信息 + * @return 是否需要重试 + */ + public boolean isNeedRetry(Exception exception, List> contexts) { + return false; + } + /** * 发生错误后,处理contexts上下文 * + * @param exception 异常 * @param contexts 需要错误处理的context列表 */ - public void process(List> contexts) { + public void process(Exception exception, List> contexts) { this.repo.update(contexts); this.repo.updateStatus(contexts, contexts.get(0).getStatus().toString(), contexts.get(0).getPosition()); } @@ -48,10 +60,11 @@ public void process(List> contexts) { /** * 发生错误后,处理contexts上下文,并且发起重试 * + * @param exception 异常 * @param contexts 需要错误处理的context列表 */ - public void retry(List> contexts) { - 
this.process(contexts); + public void retry(Exception exception, List> contexts) { + this.process(exception, contexts); to.onProcess(null, contexts, false); } } diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java index 692ee11f..cb552019 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java @@ -10,6 +10,7 @@ import static modelengine.fit.waterflow.ErrorCodes.FLOW_NODE_MAX_TASK; import lombok.Getter; +import lombok.Setter; import modelengine.fit.waterflow.domain.common.Constants; import modelengine.fit.waterflow.domain.context.FlowContext; import modelengine.fit.waterflow.domain.context.FlowSession; @@ -24,6 +25,7 @@ import modelengine.fit.waterflow.domain.enums.FlowNodeType; import modelengine.fit.waterflow.domain.enums.ParallelMode; import modelengine.fit.waterflow.domain.enums.ProcessType; +import modelengine.fit.waterflow.domain.stream.callbacks.PreSendCallbackInfo; import modelengine.fit.waterflow.domain.stream.callbacks.ToCallback; import modelengine.fit.waterflow.domain.stream.operators.Operators; import modelengine.fit.waterflow.domain.stream.reactive.Callback; @@ -32,6 +34,7 @@ import modelengine.fit.waterflow.domain.utils.FlowExecutors; import modelengine.fit.waterflow.domain.utils.IdGenerator; import modelengine.fit.waterflow.domain.utils.Identity; +import modelengine.fit.waterflow.domain.utils.PriorityThreadPool; import modelengine.fit.waterflow.domain.utils.SleepUtil; import modelengine.fit.waterflow.domain.utils.UUIDUtil; import modelengine.fit.waterflow.exceptions.WaterflowException; @@ -42,6 +45,7 @@ import modelengine.fitframework.util.ObjectUtils; import 
modelengine.fitframework.util.StringUtils; +import java.time.LocalDateTime; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; @@ -52,6 +56,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; +import java.util.function.Consumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -74,7 +79,7 @@ public class To extends IdGenerator implements Subscriber { /** * 最大流量,也就是该节点可以处理的最大数据量 */ - private static final int MAX_CONCURRENCY = 10; + private static final int MAX_CONCURRENCY = 16; private static final int SLEEP_MILLS = 10; @@ -138,6 +143,9 @@ public class To extends IdGenerator implements Subscriber { @Getter private ProcessMode processMode; + @Setter + private Boolean isAsyncJob = false; + private Map processingSessions = new ConcurrentHashMap<>(); private Operators.Validator validator = (repo, to) -> repo.requestMappingContext(to.streamId, @@ -189,6 +197,10 @@ public class To extends IdGenerator implements Subscriber { private Operators.ErrorHandler globalErrorHandler = null; + private Operators.Just>> globalBeforeHandler = i -> {}; + + private Operators.Just>> globalAfterHandler = i -> {}; + private boolean isAuto = true; private Thread processT = null; @@ -197,6 +209,8 @@ public class To extends IdGenerator implements Subscriber { private final Map> listeners = new ConcurrentHashMap<>(); + private int order = 0; + private final Map nextSessions = new ConcurrentHashMap<>(); private final Map counter = new ConcurrentHashMap<>(); @@ -309,11 +323,6 @@ private void directProcess(List> preList) { } List> afterList = this.getProcessMode().process(this, preList); this.afterProcess(preList, afterList); - if (CollectionUtils.isNotEmpty(afterList)) { - feedback(afterList); - this.onNext(afterList.get(0).getBatchId()); - } - afterList.forEach(context -> this.emit(context.getData(), context.getSession())); } catch (Exception ex) { LOG.error("Node direct process 
exception. [streamId={}, nodeId={}, positionId={}, traceId={}, causedBy={}]", this.streamId, this.id, preList.get(0).getPosition(), preList.get(0).getTraceId(), @@ -382,6 +391,7 @@ private void preProcess(ProcessType type) { return; } messenger.send(this.getId(), ready); + this.releaseTrace(ready); } catch (Exception ex) { ready.forEach( // 如果是数据库或者redis挂了,会死循环,线程不退出等待数据库或者redis恢复 r -> LOG.error("Preprocess main loop exception stream-id: {}, node-id: {}, context-id: {}.", @@ -395,6 +405,13 @@ private void preProcess(ProcessType type) { } } + private void releaseTrace(List> contexts) { + contexts.forEach(context -> context.getTraceId().forEach(traceId -> { + LOG.debug("Release trace. [stage=preProcess, traceId={0}, contextId={1}]", traceId, context.getId()); + this.getFlowContextRepo().getTraceOwner().release(traceId); + })); + } + private void process(ProcessType type) { this.getProcessMode().request(type, this); } @@ -531,10 +548,16 @@ public void onProcess(ProcessType type, List> preList, boolean is if (CollectionUtils.isEmpty(preList)) { return; } + this.beforeProcess(preList); if (preList.size() == 1 && preList.get(0).getData() == null) { this.afterProcess(preList, new ArrayList<>()); return; } + if (this.isAsyncJob) { + this.beforeAsyncProcess(preList); + this.getProcessMode().process(this, preList); + return; + } List> afterList = this.getProcessMode().process(this, preList); preList.forEach(context -> { context.getWindow() @@ -542,13 +565,7 @@ public void onProcess(ProcessType type, List> preList, boolean is () -> this.processingSessions.remove(context.getSession().getId())); }); this.afterProcess(preList, afterList); - if (CollectionUtils.isNotEmpty(afterList)) { - // 查找一个transaction里的所有数据的都完成了,运行callback给stream外反馈数据 - feedback(afterList); - this.onNext(afterList.get(0).getBatchId()); - } - // 处理好数据后对外送数据,驱动其他flow响应 - afterList.forEach(context -> this.emit(context.getData(), context.getSession())); + // keep order preList.forEach(context -> { if 
(context.getIndex() > Constants.NOT_PRESERVED_INDEX && !context.getWindow().isDone()) { @@ -579,7 +596,7 @@ public void onProcess(ProcessType type, List> preList, boolean is * @param exception 异常对象。 * @param preList 失败时处理的上下文数据。 */ - protected void fail(Exception exception, List> preList) { + public void fail(Exception exception, List> preList) { Retryable retryable = new Retryable<>(this.getFlowContextRepo(), this); Optional.ofNullable(this.errorHandler).ifPresent(handler -> handler.handle(exception, retryable, preList)); Optional.ofNullable(this.globalErrorHandler) @@ -616,7 +633,7 @@ private List getTraceIds(List> contexts) { } @Override - public void onNext(String batchId) { + public void onNext(String batchId, Consumer> preSendCallback) { } private void feedback(List> contexts) { @@ -628,9 +645,21 @@ private void feedback(List> contexts) { }); }); } + this.globalAfterHandler.process(new ToCallback<>(contexts)); + } + + private void beforeAsyncProcess(List> pre) { + updateBatch(pre, Collections.emptyList()); + pre.forEach(p -> p.setStatus(FlowNodeStatus.PROCESSING)); + this.getFlowContextRepo().update(pre); + this.getFlowContextRepo().updateStatus(pre, pre.get(0).getStatus().toString(), pre.get(0).getPosition()); + } + + private void beforeProcess(List> contexts) { + this.globalBeforeHandler.process(new ToCallback<>(contexts)); } - private synchronized void updateConcurrency(int newConcurrency) { + public synchronized void updateConcurrency(int newConcurrency) { this.curConcurrency += newConcurrency; } @@ -666,6 +695,39 @@ public void afterProcess(List> preList, List> afte this.getFlowContextRepo().update(preList); this.getFlowContextRepo() .updateStatus(preList, preList.get(0).getStatus().toString(), preList.get(0).getPosition()); + + if (CollectionUtils.isEmpty(afterList)) { + return; + } + // 查找一个transaction里的所有数据的都完成了,运行callback给stream外反馈数据 + if (FlowNodeType.END.equals(this.nodeType)) { + this.callback(preList, afterList); + } else { + 
this.onNext(afterList.get(0).getBatchId(), c -> { + List discardContexts = + c.getUnMatchedContexts().stream().map(context -> context.getId()).collect(Collectors.toList()); + if (!discardContexts.isEmpty()) { + this.getFlowContextRepo().deleteByContextIds(discardContexts); + } + + List> mergeContexts = c.getAllContext(); + this.callback(preList, mergeContexts); + }); + } + // 处理好数据后对外送数据,驱动其他flow响应 + afterList.forEach(context -> this.emit(context.getData(), context.getSession())); + } + + private void callback(List> preContexts, List> after) { + LocalDateTime createAt = preContexts.get(0).getCreateAt(); + LocalDateTime archivedAt = LocalDateTime.now(); + feedback(after.stream().map(context -> { + FlowContext callbackContext = context.generate(context.getData(), context.getPosition(), + createAt); + callbackContext.setArchivedAt(archivedAt); + callbackContext.setNextPositionId(context.getNextPositionId()); + return callbackContext; + }).collect(Collectors.toList())); } /** @@ -724,6 +786,21 @@ public void onGlobalError(Operators.ErrorHandler handler) { this.globalErrorHandler = handler; } + @Override + public void onGlobalBefore(Operators.Just>> handler) { + this.globalBeforeHandler = handler; + } + + @Override + public void onGlobalAfter(Operators.Just>> handler) { + this.globalAfterHandler = handler; + } + + @Override + public void setOrder(int order) { + this.order = order; + } + @Override public List getErrorHandlers() { return Stream.of(this.errorHandler, this.globalErrorHandler) @@ -800,9 +877,15 @@ public enum ProcessMode { PRODUCING { @Override public List> process(To to, List> contexts) { - return to.produce.process(contexts) + // 异步任务,不关心结果 + if (to.isAsyncJob) { + this.processData(to, contexts); + return null; + } + // 同步任务,返回结果 + return processData(to, contexts) .stream() - .map(data -> contexts.get(0).generate(data, to.getId())) + .map(data -> contexts.get(0).generate(data, to.getId(), LocalDateTime.now())) .collect(Collectors.toList()); } @@ -812,6 
+895,10 @@ protected List> requestAll(To to) { to.froms.stream().map(Identity::getId).collect(Collectors.toList()), to.postFilter()); } + + private List processData(To to, List> contexts) { + return to.produce.process(contexts); + } }, /** @@ -834,7 +921,7 @@ public List> process(To to, List clonedContext = context.generate(data, to.getId()); + FlowContext clonedContext = context.generate(data, to.getId(), LocalDateTime.now()); clonedContext.setSession(nextSession); if (context.getSession().isAccumulator()) { if (clonedContext.getIndex() > Constants.NOT_PRESERVED_INDEX) { @@ -977,12 +1064,19 @@ private List> filterReady(To to, List void submit(ProcessType type, To to, List> ready, + public void submit(ProcessType type, To to, List> ready, FlowExecutors.ConcurrencyHolder concurrencyHolder) { - FlowExecutors.getThreadPool().execute(Task.builder().runnable(() -> { - to.onProcess(type, ready, true); - concurrencyHolder.release(); - }).buildDisposable()); + FlowExecutors.getThreadPool() + .submit(PriorityThreadPool.PriorityTask.builder() + .priority(PriorityThreadPool.PriorityTask.PriorityInfo.builder() + .order(to.order) + .createTime(System.currentTimeMillis()) + .build()) + .runner(() -> { + to.onProcess(type, ready, true); + concurrencyHolder.release(); + }) + .build()); } private void handleProcessConcurrentConflict(To to) { diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Publisher.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Publisher.java index a70ca76a..5bb2cd68 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Publisher.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Publisher.java @@ -12,9 +12,11 @@ import modelengine.fit.waterflow.domain.context.repo.flowsession.FlowSessionRepo; import 
modelengine.fit.waterflow.domain.emitters.EmitterListener; import modelengine.fit.waterflow.domain.enums.ParallelMode; +import modelengine.fit.waterflow.domain.stream.callbacks.PreSendCallbackInfo; import modelengine.fit.waterflow.domain.stream.operators.Operators; import java.util.List; +import java.util.function.Consumer; /** * 数据发布者 @@ -150,8 +152,9 @@ default void handle(I data, FlowSession flowSession) { * offer * * @param contexts contexts + * @param preSendCallback 在数据发送到下一个节点前触发当前节点完成回调操作 */ - void offer(List> contexts); + void offer(List> contexts, Consumer> preSendCallback); /** * offer diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Subscriber.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Subscriber.java index 783b485a..6c68e5f9 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Subscriber.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Subscriber.java @@ -11,10 +11,12 @@ import modelengine.fit.waterflow.domain.context.repo.flowcontext.FlowContextRepo; import modelengine.fit.waterflow.domain.emitters.Emitter; import modelengine.fit.waterflow.domain.enums.ProcessType; +import modelengine.fit.waterflow.domain.stream.callbacks.PreSendCallbackInfo; import modelengine.fit.waterflow.domain.stream.nodes.Blocks; import modelengine.fit.waterflow.domain.stream.operators.Operators; import java.util.List; +import java.util.function.Consumer; /** * 数据接受者,processor数据在接收者处的处理方式 @@ -104,8 +106,9 @@ public interface Subscriber extends StreamIdentity, Emitter> preSendCallback); /** * afterProcess @@ -158,6 +161,27 @@ public interface Subscriber extends StreamIdentity, Emitter>> handler); + + /** + * 设置获取节点执行后信息的回调 + * + * @param handler handler + */ + void onGlobalAfter(Operators.Just>> handler); + + /** + * 
设置节点优先级 + * + * @param order 优先级 + */ + void setOrder(int order); + /** * 获取错误处理器列表 * diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/utils/FlowExecutors.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/utils/FlowExecutors.java index a5e419fb..0daae758 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/utils/FlowExecutors.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/utils/FlowExecutors.java @@ -15,6 +15,7 @@ import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.ThreadPoolExecutor.AbortPolicy; import java.util.concurrent.atomic.AtomicInteger; @@ -31,26 +32,12 @@ public final class FlowExecutors { private static final Logger LOG = Logger.get(FlowExecutors.class); - private static final ThreadPoolExecutor THREAD_POOL; + private static final PriorityThreadPool THREAD_POOL; private static AtomicInteger currentConcurrency = new AtomicInteger(0); static { - ThreadPoolExecutor newPool = ThreadPoolExecutor.custom() - .threadPoolName("flow-node-thread-pool") - .corePoolSize(CORE_THREAD_COUNT) - .maximumPoolSize(MAX_THREAD_COUNT) - .workQueueCapacity(0) - .keepAliveTime(60L, SECONDS) - .isDaemonThread(true) - .exceptionHandler((thread, throwable) -> { - LOG.error("The node pool run failed, error cause: {}, message: {}.", throwable.getCause(), - throwable.getMessage()); - LOG.debug("The node pool run failed details: ", throwable); - }) - .rejectedExecutionHandler(new AbortPolicy()) - .build(); - THREAD_POOL = newPool; + THREAD_POOL = PriorityThreadPool.build("flow-node-thread-pool", CORE_THREAD_COUNT, MAX_THREAD_COUNT); } /** @@ -58,7 +45,7 @@ public final class FlowExecutors { * * @return 线程池对象 */ - public static ThreadPoolExecutor 
getThreadPool() { + public static PriorityThreadPool getThreadPool() { return THREAD_POOL; } diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/utils/PriorityThreadPool.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/utils/PriorityThreadPool.java new file mode 100644 index 00000000..4d2a5959 --- /dev/null +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/utils/PriorityThreadPool.java @@ -0,0 +1,181 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package modelengine.fit.waterflow.domain.utils; + +import lombok.Data; +import modelengine.fitframework.log.Logger; +import modelengine.fitframework.thread.DefaultThreadFactory; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; + +import static java.util.concurrent.TimeUnit.SECONDS; + +/** + * 带有任务优先级的线程池。 + * + * @author 夏斐 + * @since 2024/3/6 + */ +public class PriorityThreadPool { + private static final Logger LOG = Logger.get(PriorityThreadPool.class); + + private final ExecutorService executorService; + + private PriorityThreadPool(ExecutorService executorService) { + this.executorService = executorService; + } + + /** + * 构造一个线程池。 + * + * @param key 线程池名称。 + * @param coreSize 核心线程数。 + * @param maxSize 最大线程数。 + * @return 带有优先队列的线程池。 + */ + public static PriorityThreadPool build(String key, int coreSize, int maxSize) { + Thread.UncaughtExceptionHandler uncaughtExceptionHandler = (thread, throwable) -> { + 
LOG.error("[node-pool-{}]: The node pool run failed, error cause: {}, message: {}.", key, + throwable.getCause(), throwable.getMessage()); + LOG.error("The node pool run failed details: ", throwable); + }; + ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( + coreSize, maxSize, 60L, SECONDS, new PriorityBlockingQueue<>(), + new DefaultThreadFactory(key, true, uncaughtExceptionHandler), + new ThreadPoolExecutor.AbortPolicy()); + return new PriorityThreadPool(threadPoolExecutor); + } + + /** + * 提交任务。 + * + * @param task 任务。 + */ + public void submit(PriorityTask task) { + this.executorService.execute(task); + } + + /** + * 关闭。 + */ + public void shutdown() { + this.executorService.shutdown(); + } + + /** + * 优先队列提交的任务。 + */ + public abstract static class PriorityTask implements Runnable, Comparable { + /** + * 任务构造器。 + * + * @return 返回一个构造器实例。 + */ + public static Builder builder() { + return new Builder(); + } + + /** + * 得到优先级。 + * + * @return 优先级。 + */ + public abstract PriorityInfo getPriority(); + + @Override + public int compareTo(PriorityTask other) { + PriorityInfo thisPriority = this.getPriority(); + PriorityInfo otherPriority = other.getPriority(); + // 从trace粒度比较,越早的trace执行优先级越高 + int traceTimeCompare = Long.compare(thisPriority.getTraceTime(), otherPriority.getTraceTime()); + if (traceTimeCompare != 0) { + return traceTimeCompare; + } + // 从节点优先级比较,越往后的节点执行优先级越高 + int orderCompare = Long.compare(otherPriority.getOrder(), thisPriority.getOrder()); + if (orderCompare != 0) { + return orderCompare; + } + // 从当前任务的创建时间比较,越早创建执行优先级越高 + return Long.compare(thisPriority.getCreateTime(), otherPriority.getCreateTime()); + } + + @Override + public String toString() { + return getPriority().toString(); + } + + /** + * 优先级信息。 + * + * @author 夏斐 + * @since 2024/3/6 + */ + @lombok.Builder + @Data + public static class PriorityInfo { + private int order; + + private long createTime; + + // 流程时间 + private long traceTime = 0L; + } + + /** + * 构造器 + */ +
public static class Builder { + private PriorityInfo priority; + + private Runnable runnable; + + /** + * 设置优先级。 + * + * @param priority 优先级。 + * @return 构造器。 + */ + public Builder priority(PriorityInfo priority) { + this.priority = priority; + return this; + } + + /** + * 设置 runner。 + * + * @param runnable runner。 + * @return 构造器。 + */ + public Builder runner(Runnable runnable) { + this.runnable = runnable; + return this; + } + + /** + * 构造任务。 + * + * @return 任务。 + */ + public PriorityTask build() { + return new PriorityTask() { + @Override + public PriorityInfo getPriority() { + return Builder.this.priority; + } + + @Override + public void run() { + Builder.this.runnable.run(); + } + }; + } + } + } +} diff --git a/framework/waterflow/java/waterflow-core/src/test/java/modelengine/fit/waterflow/domain/WaterFlowsTest.java b/framework/waterflow/java/waterflow-core/src/test/java/modelengine/fit/waterflow/domain/WaterFlowsTest.java index babff8c9..6493f98d 100644 --- a/framework/waterflow/java/waterflow-core/src/test/java/modelengine/fit/waterflow/domain/WaterFlowsTest.java +++ b/framework/waterflow/java/waterflow-core/src/test/java/modelengine/fit/waterflow/domain/WaterFlowsTest.java @@ -853,7 +853,7 @@ void testExceptionHandleForFitStream() { }).error((error, retryable, contexts) -> { contexts.get(0).getData().first(120); contexts.forEach(context -> context.setStatus(READY)); - retryable.retry(contexts); + retryable.retry(error, contexts); }).close(callback -> output.set(callback.get().getData())).offer(new TestData()); FlowsTestUtil.waitUntil(() -> output.get() != null, 2000); assertEquals(120, output.get().first); @@ -869,7 +869,7 @@ void testExceptionHandleForFitStream() { }).close(callback -> output.set(callback.get().getData()), (exception, retryable, contexts) -> { ObjectUtils.cast(contexts.get(0).getData()).first(120); contexts.forEach(context -> context.setStatus(READY)); - retryable.retry(contexts); + retryable.retry(exception, contexts); }).offer(new 
TestData()); FlowsTestUtil.waitFortyMillis(Collections::emptyList); assertEquals(120, output.get().first);