From 0a50001ca18ecf803287f7263d2f9c967373be6b Mon Sep 17 00:00:00 2001 From: songyongtan <271667068@qq.com> Date: Thu, 12 Jun 2025 11:26:13 +0800 Subject: [PATCH 1/8] [waterflow] support configuring node concurrency --- .../modelengine/fel/engine/activities/AiState.java | 12 ++++++++++++ .../fit/waterflow/domain/states/State.java | 12 ++++++++++++ .../fit/waterflow/domain/stream/nodes/To.java | 14 +++++++++++++- 3 files changed, 37 insertions(+), 1 deletion(-) diff --git a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiState.java b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiState.java index 1f175ac5..3d252d18 100644 --- a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiState.java +++ b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiState.java @@ -82,6 +82,18 @@ public AiState id(String id) { return this; } + /** + * Sets the maximum concurrency level for this state's processing pipeline. + * + * @param concurrency The maximum number of concurrent operations allowed (must be positive). + * @return The current state instance for method chaining. + * @throws IllegalArgumentException If the concurrency value is zero or negative. + */ + public AiState concurrency(int concurrency) { + this.state.concurrency(concurrency); + return this; + } + /** * 获取当前节点的数据订阅者。 * diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/State.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/State.java index 6aac4b54..54b18447 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/State.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/State.java @@ -104,6 +104,18 @@ public State id(String id) { return ObjectUtils.cast(super.setId(id)); } + /** + * Sets the maximum concurrency level for this state's processing pipeline. + * + * @param concurrency The maximum number of concurrent operations allowed (must be positive). + * @return The current state instance for method chaining. + * @throws IllegalArgumentException If the concurrency value is zero or negative. + */ + public State concurrency(int concurrency) { + ObjectUtils.>cast(this.processor).setMaxConcurrency(concurrency); + return this; + } + /** * 跳转到指定节点,使用节点的唯一标识来标识一个节点。 *

diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java index d5308526..353996fd 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java @@ -100,6 +100,8 @@ public class To extends IdGenerator implements Subscriber { @Getter private final FlowLocks locks; + private int maxConcurrency = MAX_CONCURRENCY; + // 默认自动流转过滤器是按batchID批次过滤contexts private final Operators.Filter defaultAutoFilter = (contexts) -> { if (CollectionUtils.isEmpty(contexts)) { @@ -606,7 +608,7 @@ private synchronized void updateConcurrency(int newConcurrency) { * @return true-已经满负载, false-未满负载 */ public boolean isOverLimit() { - return this.curConcurrency >= MAX_CONCURRENCY; + return this.curConcurrency >= this.maxConcurrency; } /** @@ -736,6 +738,16 @@ public void emit(O data, FlowSession session) { this.listeners.values().forEach(listener -> listener.handle(data, session)); } + /** + * Sets the maximum concurrency level for this state's processing pipeline. + * + * @param concurrency The maximum number of concurrent operations allowed (must be positive). + * @throws IllegalArgumentException If the concurrency value is zero or negative. + */ + public void setMaxConcurrency(int concurrency) { + this.maxConcurrency = Validation.greaterThan(concurrency, 0, "The concurrency should greater than 0."); + } + private FlowSession getNextSession(FlowSession session) { return FlowSessionRepo.getNextToSession(this.streamId, session); } From aae4738b27ce16df799a4f03eedcaa12d4b95be5 Mon Sep 17 00:00:00 2001 From: songyongtan <271667068@qq.com> Date: Thu, 12 Jun 2025 11:31:19 +0800 Subject: [PATCH 2/8] [fel] subflows inherit ordering flag from parent flow --- .../fel/engine/operators/patterns/AbstractFlowPattern.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/patterns/AbstractFlowPattern.java b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/patterns/AbstractFlowPattern.java index 7de5954b..bccbccdb 100644 --- a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/patterns/AbstractFlowPattern.java +++ b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/operators/patterns/AbstractFlowPattern.java @@ -89,7 +89,7 @@ public FlowEmitter invoke(I data) { public Pattern sync() { return new SimplePattern<>(data -> { FlowSession require = AiFlowSession.require(); - FlowSession session = new FlowSession(true); + FlowSession session = new FlowSession(require.preserved()); Window window = session.begin(); session.copySessionState(require); ConverseLatch conversation = this.getFlow().converse(session).offer(data); @@ -116,7 +116,7 @@ public Flow origin() { */ protected static FlowSession buildFlowSession(FlowEmitter emitter) { FlowSession mainSession = AiFlowSession.require(); - FlowSession flowSession = FlowSession.newRootSession(mainSession, true); + FlowSession flowSession = FlowSession.newRootSession(mainSession, mainSession.preserved()); flowSession.setInnerState(PARENT_SESSION_ID_KEY, mainSession.getId()); ResultAction resultAction = emitter::emit; flowSession.setInnerState(RESULT_ACTION_KEY, resultAction); From 2bda8135e589d09db7d8e5cdcf6206bcc94d87de Mon Sep 17 00:00:00 2001 From: songyongtan <271667068@qq.com> Date: Thu, 12 Jun 2025 15:12:37 +0800 Subject: [PATCH 3/8] [fel] add key examples for backpressure and concurrency --- .../fel/engine/flows/Conversation.java | 3 + .../fel/engine/AiFlowCaseTest.java | 126 ++++++++++++++++++ 2 files changed, 129 insertions(+) create mode 100644 framework/fel/java/fel-flow/src/test/java/modelengine/fel/engine/AiFlowCaseTest.java diff --git a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/flows/Conversation.java b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/flows/Conversation.java index 48703e5e..25b40a98 100644 --- a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/flows/Conversation.java +++ b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/flows/Conversation.java @@ -47,6 +47,9 @@ public class Conversation { */ public Conversation(AiProcessFlow flow, FlowSession session) { this.flow = Validation.notNull(flow, "Flow cannot be null."); + if (session != null) { + session.begin(); + } this.session = (session == null) ? this.setConverseListener(new FlowSession(true)) : this.setSubConverseListener(session); this.session.begin(); diff --git a/framework/fel/java/fel-flow/src/test/java/modelengine/fel/engine/AiFlowCaseTest.java b/framework/fel/java/fel-flow/src/test/java/modelengine/fel/engine/AiFlowCaseTest.java new file mode 100644 index 00000000..f94529fb --- /dev/null +++ b/framework/fel/java/fel-flow/src/test/java/modelengine/fel/engine/AiFlowCaseTest.java @@ -0,0 +1,126 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package modelengine.fel.engine; + +import modelengine.fel.core.chat.ChatMessage; +import modelengine.fel.core.chat.ChatOption; +import modelengine.fel.core.chat.support.AiMessage; +import modelengine.fel.core.util.Tip; +import modelengine.fel.engine.flows.AiFlows; +import modelengine.fel.engine.flows.AiProcessFlow; +import modelengine.fel.engine.flows.ConverseLatch; +import modelengine.fel.engine.operators.models.ChatFlowModel; +import modelengine.fel.engine.operators.prompts.Prompts; +import modelengine.fit.waterflow.domain.context.FlowSession; +import modelengine.fit.waterflow.domain.utils.SleepUtil; +import modelengine.fitframework.flowable.Choir; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Test cases demonstrating different flow control scenarios in AI processing pipelines. + * Contains nested test classes for specific flow control mechanisms. + * + * @author 宋永坦 + * @since 2025-06-11 + */ +public class AiFlowCaseTest { + /** + * Simulates a backpressure scenario where: + *

    + *
  1. The LLM generates data (50ms per item) faster than the TTS can process it.
  2. + *
  3. TTS processing is constrained to a single thread.
  4. + *
  5. TTS processing speed is artificially slowed (100ms per item).
  6. + *
+ */ + @Nested + class BackPressureCase { + private final ChatFlowModel model = new ChatFlowModel((prompt, chatOption) -> Choir.create(emitter -> { + for (int i = 0; i < 10; i++) { + emitter.emit(new AiMessage(String.valueOf(i))); + SleepUtil.sleep(50); + } + emitter.complete(); + System.out.printf("time:%s, generate completed.\n", System.currentTimeMillis()); + }), ChatOption.custom().model("modelName").stream(true).build()); + + private final AiProcessFlow flow = AiFlows.create() + .prompt(Prompts.human("{{0}}")) + .generate(model) + .map(this::mockTTS).concurrency(1) // Limit processing to 1 concurrent thread + .close(); + + @Test + void run() { + AtomicInteger counter = new AtomicInteger(0); + long startTime = System.currentTimeMillis(); + System.out.printf("time:%s, start.\n", startTime); + ConverseLatch result = flow.converse(new FlowSession(false)).doOnConsume(answer -> { + System.out.printf("time:%s, chunk=%s\n", System.currentTimeMillis(), answer); + counter.incrementAndGet(); + }).offer(Tip.fromArray("hi")); + result.await(); + System.out.printf("time:%s, cost=%s\n", System.currentTimeMillis(), System.currentTimeMillis() - startTime); + Assertions.assertEquals(10, counter.get()); + } + + private String mockTTS(ChatMessage chunk) { + // Simulate time-consuming operation with a delay. + SleepUtil.sleep(100); + return chunk.text(); + } + } + + /** + * Demonstrates concurrent processing with balanced throughput where: + *
    + *
  1. LLM generates data at moderate pace (50ms per item)
  2. + *
  3. Downstream processing runs with 3 concurrent threads
  4. + *
  5. Processing speed is slightly slower than generation (150ms vs 50ms)
  6. + *
+ */ + @Nested + class ConcurrencyCase { + private final ChatFlowModel model = new ChatFlowModel((prompt, chatOption) -> Choir.create(emitter -> { + for (int i = 0; i < 10; i++) { + emitter.emit(new AiMessage(String.valueOf(i))); + SleepUtil.sleep(50); + } + emitter.complete(); + }), ChatOption.custom().model("modelName").stream(true).build()); + + private final AiProcessFlow flow = AiFlows.create() + .prompt(Prompts.human("{{0}}")) + .generate(model) + .map(this::mockDesensitize).concurrency(3) // Set processing to 3 concurrent thread + .close(); + + @Test + void run() { + AtomicInteger counter = new AtomicInteger(0); + long startTime = System.currentTimeMillis(); + System.out.printf("time:%s, start.\n", startTime); + ConverseLatch result = flow.converse(new FlowSession(false)).doOnConsume(answer -> { + System.out.printf("time:%s, chunk=%s\n", System.currentTimeMillis(), answer); + counter.incrementAndGet(); + }).offer(Tip.fromArray("hi")); + result.await(); + System.out.printf("time:%s, cost=%s\n", System.currentTimeMillis(), System.currentTimeMillis() - startTime); + Assertions.assertEquals(10, counter.get()); + } + + private String mockDesensitize(ChatMessage chunk) { + // Simulate slower processing at 1/3 speed of LLM generation. + SleepUtil.sleep(150); + return chunk.text().replace("3", "*"); + } + } +} From c0085b05559d7a1511269db710fe87e77928350c Mon Sep 17 00:00:00 2001 From: songyongtan <271667068@qq.com> Date: Fri, 13 Jun 2025 14:41:12 +0800 Subject: [PATCH 4/8] [waterflow] support branch syntax for 'when' and 'then' clauses --- .../flowcontext/FlowContextMessenger.java | 14 ++++ .../waterflow/domain/states/Conditions.java | 15 +++++ .../waterflow/domain/states/WhenHappen.java | 64 +++++++++++++++++++ .../domain/stream/nodes/ConditionsNode.java | 8 ++- .../fit/waterflow/domain/stream/nodes/To.java | 34 +++++++++- .../domain/stream/operators/Operators.java | 19 ++++++ .../domain/stream/reactive/Subscriber.java | 10 +++ .../domain/stream/reactive/Subscription.java | 8 +++ .../domain/stream/reactive/When.java | 14 ++++ .../fit/waterflow/domain/WaterFlowsTest.java | 40 ++++++++++++ 10 files changed, 224 insertions(+), 2 deletions(-) create mode 100644 framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/WhenHappen.java diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMessenger.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMessenger.java index 55700c29..20c22f9a 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMessenger.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/context/repo/flowcontext/FlowContextMessenger.java @@ -54,4 +54,18 @@ default void send(ProcessType type, Subscriber subscriber, List 流程实例执行时的入参数据类型,用于泛型推倒 */ void sendCallback(List> contexts); + + /** + * Directly processes a list of flow contexts through the specified subscriber. + * This method serves as a default implementation for immediate processing without + * any intermediate transformations or routing. + * + * @param The type of input data contained in the flow contexts. + * @param type The type of processing to be performed. + * @param subscriber The subscriber that will handle the processing. + * @param context List of flow contexts to be processed. + */ + default void directProcess(ProcessType type, Subscriber subscriber, List> context) { + subscriber.process(type, context); + } } diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/Conditions.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/Conditions.java index dcad8bce..5c9613c8 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/Conditions.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/Conditions.java @@ -49,6 +49,21 @@ public MatchHappen match(Operators.Whether whether, return new MatchHappen<>(whether, processor, this); } + /** + * Creates a conditional branch that executes when the specified condition is met. + * This establishes a processing path that will only be followed if the predicate + * evaluates to true for the input data. + * + * @param The output type of the branch processor. + * @param whether The condition predicate that determines branch activation. + * @param processor The transformation to apply when the condition is met. + * @return A {@link WhenHappen} instance representing the conditional relationship, + * allowing for further chaining of operations. + */ + public WhenHappen when(Operators.Whether whether, Operators.Then processor) { + return new WhenHappen<>(whether, processor, this); + } + /** * 在满足条件时跳转到指定节点。 * diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/WhenHappen.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/WhenHappen.java new file mode 100644 index 00000000..fc146afb --- /dev/null +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/WhenHappen.java @@ -0,0 +1,64 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package modelengine.fit.waterflow.domain.states; + +import modelengine.fit.waterflow.domain.enums.SpecialDisplayNode; +import modelengine.fit.waterflow.domain.flow.Flow; +import modelengine.fit.waterflow.domain.stream.operators.Operators; + +import java.util.ArrayList; +import java.util.List; + +/** + * Represent conditional branches (when clause) in a processing flow. + * + * @param The output data type of this node. + * @param The initial data type of the containing flow. + * @param The flow type used for generic type inference. + * + * @author 宋永坦 + * @since 2025-06-12 + */ +public class WhenHappen> { + private final State node; + private final List> branches = new ArrayList<>(); + + WhenHappen(Operators.Whether whether, Operators.Then processor, Conditions conditions) { + this.node = conditions.node; + this.when(whether, processor); + } + + /** + * Creates a conditional branch that executes when the specified condition is met. + * + * @param whether The condition predicate. + * @param processor The transformation logic. + * @return The current WhenHappen instance for method chaining. + */ + public WhenHappen when(Operators.Whether whether, Operators.Then processor) { + State branch = new State<>(this.node.publisher() + .map(input -> processor.process(input.getData()), whether) + .displayAs(SpecialDisplayNode.BRANCH.name()), this.node.getFlow()); + this.branches.add(branch); + return this; + } + + /** + * Provides a default processing logic and terminates the conditional node. + * + * @param processor The handler to process unmatched inputs. + * @return An {@link State} representing the join node of the conditional flow. + * @throws IllegalArgumentException if processor is null + */ + public State others(Operators.Then processor) { + this.when(null, processor); + State joinState = this.branches.get(0).just(any -> {}); + joinState.processor.displayAs(SpecialDisplayNode.OTHERS.name()); + this.branches.stream().skip(1).forEach(branch -> branch.publisher().subscribe(joinState.subscriber())); + return joinState; + } +} diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/ConditionsNode.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/ConditionsNode.java index d080cf19..669669e9 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/ConditionsNode.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/ConditionsNode.java @@ -12,6 +12,7 @@ import modelengine.fit.waterflow.domain.context.repo.flowlock.FlowLocks; import modelengine.fit.waterflow.domain.enums.FlowNodeType; import modelengine.fit.waterflow.domain.utils.UUIDUtil; +import modelengine.fitframework.util.CollectionUtils; import java.util.List; import java.util.stream.Collectors; @@ -82,7 +83,12 @@ public void offer(List> contexts) { .filter(context -> subscription.getWhether().is(context.getData())) .collect(Collectors.toList()); matched.forEach(contexts::remove); - subscription.cache(matched); + // For order-sensitive data, directly synchronously executes the next conditional branch node. + if (CollectionUtils.isNotEmpty(matched) && matched.get(0).getSession().preserved()) { + subscription.process(matched); + } else { + subscription.cache(matched); + } }); } diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java index 353996fd..5551b19a 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java @@ -140,7 +140,7 @@ public class To extends IdGenerator implements Subscriber { @Getter private ProcessMode processMode; - private Map processingSessions = new ConcurrentHashMap<>(); + private final Map processingSessions = new ConcurrentHashMap<>(); private Operators.Validator validator = (repo, to) -> repo.requestMappingContext(to.streamId, to.froms.stream().map(Identity::getId).collect(Collectors.toList()), @@ -291,6 +291,38 @@ public synchronized void accept(ProcessType type, List> contexts) this.triggerNodeProcessor(type); } + @Override + public void process(ProcessType type, List> contexts) { + Validation.isTrue(ProcessType.PROCESS.equals(type), + "Direct processing requires PROCESS type, but received: " + type); + this.directProcess(contexts); + } + + private void directProcess(List> preList) { + try { + if (CollectionUtils.isEmpty(preList)) { + return; + } + if (preList.size() == 1 && preList.get(0).getData() == null) { + this.afterProcess(preList, new ArrayList<>()); + return; + } + List> afterList = this.getProcessMode().process(this, preList); + this.afterProcess(preList, afterList); + if (CollectionUtils.isNotEmpty(afterList)) { + feedback(afterList); + this.onNext(afterList.get(0).getBatchId()); + } + afterList.forEach(context -> this.emit(context.getData(), context.getSession())); + } catch (Exception ex) { + LOG.error("Node direct process exception. [streamId={}, nodeId={}, positionId={}, traceId={}, causedBy={}]", + this.streamId, this.id, preList.get(0).getPosition(), preList.get(0).getTraceId(), + ex.getClass().getName()); + LOG.debug("Node process exception details: ", ex); + this.fail(ex, preList); + } + } + private synchronized void triggerNodeProcessor(ProcessType type) { if (type == ProcessType.PRE_PROCESS && (preProcessT == null || !preProcessRunning)) { preProcessRunning = true; diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/operators/Operators.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/operators/Operators.java index 3d1afbe4..4e8d6b7f 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/operators/Operators.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/operators/Operators.java @@ -292,5 +292,24 @@ public interface Validator { */ List> validate(FlowContextRepo repo, To to); } + + /** + * Represents a conditional node's transformation from raw material to product. + * This functional interface defines the processing operation that converts an + * input of one type to an output of potentially different type. + * + * @param The type of raw material (input) to be processed + * @param The type of product (output) to be produced + */ + @FunctionalInterface + public interface Then { + /** + * Transforms the input raw material into a processed product. + * + * @param input The raw material to be processed. + * @return The transformed product result. + */ + R process(T input); + } } diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Subscriber.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Subscriber.java index 2e1edd2c..783b485a 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Subscriber.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Subscriber.java @@ -32,6 +32,16 @@ public interface Subscriber extends StreamIdentity, Emitter> contexts); + /** + * Processes a batch of flow contexts according to the specified processing type. + * This method handles the core execution logic for the workflow engine, applying + * the appropriate operations to each context in the batch. + * + * @param type The type of processing to perform. + * @param contexts The list of flow contexts to process. + */ + void process(ProcessType type, List> contexts); + /** * 设置节点block * diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Subscription.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Subscription.java index 39da6646..d1dbd050 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Subscription.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/Subscription.java @@ -27,6 +27,14 @@ public interface Subscription extends StreamIdentity { */ void cache(List> contexts); + /** + * Immediately sends data to subscriber for processing the given flow contexts. + * This method executes synchronously and blocks until all contexts have been processed. + * + * @param contexts The list of flow contexts to process and send to subscribers。 + */ + void process(List> contexts); + /** * getWhether * diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/When.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/When.java index 4b05d99f..aaf16612 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/When.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/reactive/When.java @@ -105,4 +105,18 @@ public void cache(List> contexts) { public String getStreamId() { return this.streamId; } + + @Override + public void process(List> contexts) { + if (CollectionUtils.isEmpty(contexts)) { + return; + } + List> converted = contexts.stream() + .map(context -> context.convertData(context.getData(), context.getId()) + .setPosition(this.to.getId()) + .setStatus(FlowNodeStatus.READY)) + .collect(Collectors.toList()); + repo.updateStatus(converted, FlowNodeStatus.READY.toString(), this.to.getId()); + messenger.directProcess(this.to.isAuto() ? PROCESS : PRE_PROCESS, this.to, converted); + } } diff --git a/framework/waterflow/java/waterflow-core/src/test/java/modelengine/fit/waterflow/domain/WaterFlowsTest.java b/framework/waterflow/java/waterflow-core/src/test/java/modelengine/fit/waterflow/domain/WaterFlowsTest.java index 1ee27b92..e3c9815c 100644 --- a/framework/waterflow/java/waterflow-core/src/test/java/modelengine/fit/waterflow/domain/WaterFlowsTest.java +++ b/framework/waterflow/java/waterflow-core/src/test/java/modelengine/fit/waterflow/domain/WaterFlowsTest.java @@ -1137,5 +1137,45 @@ void try_a_complicated_map_reduce() throws InterruptedException { assertEquals("150unit", result.get(0)); assertEquals("240unit", result.get(1)); } + + @Test + void shouldExecuteRightBranchWhenUseWhenThenBranchGivenPreserved() { + List result = new ArrayList<>(); + ProcessFlow flow = Flows.create() + .conditions() + .when(i -> i >= 5, i -> i * 10) + .others(i -> i) + .reduce(Integer::sum) + .just(i -> result.add(i)) + .close(); + FlowSession session = new FlowSession(true); + Window window = session.begin(); + flow.offer(new Integer[] {1, 2, 3, 4, 5, 6}, session); + window.complete(); + + FlowsTestUtil.waitUntil(() -> !result.isEmpty(), 1000); + assertEquals(1, result.size()); + assertEquals(120, result.get(0)); + } + + @Test + void shouldExecuteRightBranchWhenUseWhenThenBranchGivenNotPreserved() { + List result = new ArrayList<>(); + ProcessFlow flow = Flows.create() + .conditions() + .when(i -> i >= 5, i -> i * 10) + .others(i -> i) + .reduce(Integer::sum) + .just(i -> result.add(i)) + .close(); + FlowSession session = new FlowSession(false); + Window window = session.begin(); + flow.offer(new Integer[] {1, 2, 3, 4, 5, 6}, session); + window.complete(); + + FlowsTestUtil.waitUntil(() -> !result.isEmpty(), 1000); + assertEquals(1, result.size()); + assertEquals(120, result.get(0)); + } } } From ab700acc1e66668039dac60ad1d82f1b2e50e7b7 Mon Sep 17 00:00:00 2001 From: songyongtan <271667068@qq.com> Date: Fri, 13 Jun 2025 15:46:32 +0800 Subject: [PATCH 5/8] [fel] demonstrate data anonymization with example --- .../fel/engine/activities/AiConditions.java | 17 ++++ .../fel/engine/activities/AiWhenHappen.java | 62 +++++++++++++++ .../fel/engine/AiFlowCaseTest.java | 77 +++++++++++++++++++ 3 files changed, 156 insertions(+) create mode 100644 framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiWhenHappen.java diff --git a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiConditions.java b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiConditions.java index 91e96659..def887f4 100644 --- a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiConditions.java +++ b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiConditions.java @@ -57,6 +57,23 @@ public AiMatchHappen match(Operators.Whether whether, node -> processor.process(new AiState<>(node, this.flow())).state), this.flow()); } + /** + * 指定条件和处理器创建条件分支。 + * + * @param whether 表示匹配条件的 {@link Operators.Whether}{@code <}{@link I}{@code >}。 + * @param processor 表示分支处理器的 {@link Operators.Then}{@code <}{@link O}{@code , }{@link D}{@code , + * }{@link I}{@code , }{@link RF}{@code , }{@link F}{@code >}。 + * @param 表示第一个条件分支指定的返回类型。 + * @return 表示条件分支的 {@link AiMatchHappen}{@code <}{@link O}{@code , }{@link D}{@code , + * }{@link I}{@code , }{@link RF}{@code , }{@link F}{@code >}。 + * @throws IllegalArgumentException 当 {@code processor} 为 {@code null} 时。 + */ + public AiWhenHappen when(Operators.Whether whether, + Operators.Then processor) { + Validation.notNull(processor, "Ai branch processor cannot be null."); + return new AiWhenHappen<>(this.conditions.when(whether, processor), this.flow()); + } + /** * 指定条件和对应的处理器创建条件跳转分支。 * diff --git a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiWhenHappen.java b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiWhenHappen.java new file mode 100644 index 00000000..04077703 --- /dev/null +++ b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiWhenHappen.java @@ -0,0 +1,62 @@ +/*--------------------------------------------------------------------------------------------- + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * This file is a part of the ModelEngine Project. + * Licensed under the MIT License. See License.txt in the project root for license information. + *--------------------------------------------------------------------------------------------*/ + +package modelengine.fel.engine.activities; + +import modelengine.fel.engine.flows.AiFlow; +import modelengine.fit.waterflow.domain.flow.Flow; +import modelengine.fit.waterflow.domain.states.WhenHappen; +import modelengine.fit.waterflow.domain.stream.operators.Operators; +import modelengine.fitframework.inspection.Validation; + +/** + * Represents a conditional branch that matches when conditions in an AI processing flow. + * This class handles the branching logic when specific conditions are met in the workflow. + * + * @param The output data type of the current node. + * @param The initial data type of the containing flow. + * @param The input parameter data type. + * @param The internal flow type, extending {@link Flow}{@code }. + * @param The AI flow type, extending {@link AiFlow}{@code }. + * + * @author 宋永坦 + * @since 2025-06-12 + */ +public class AiWhenHappen, F extends AiFlow> { + private final WhenHappen matchHappen; + + private final F flow; + + public AiWhenHappen(WhenHappen matchHappen, F flow) { + this.matchHappen = Validation.notNull(matchHappen, "WhenHappen cannot be null."); + this.flow = Validation.notNull(flow, "Flow cannot be null."); + } + + /** + * Creates a conditional branch with the specified predicate and handler. + * + * @param whether The condition predicate that determines branch activation. + * @param processor The transformation handler to execute when condition is met. + * @return A new {@link AiWhenHappen} instance representing the conditional branch. + * @throws IllegalArgumentException if processor is null. + */ + public AiWhenHappen when(Operators.Whether whether, Operators.Then processor) { + Validation.notNull(processor, "Ai branch processor cannot be null."); + return new AiWhenHappen<>(this.matchHappen.when(whether, processor), this.flow); + } + + /** + * Provides a default processing logic and terminates the conditional node. + * + * @param processor The handler to process unmatched inputs. + * @return An {@link AiState} representing the terminal node of the conditional flow. + * @throws IllegalArgumentException if processor is null + */ + public AiState others(Operators.Then processor) { + Validation.notNull(processor, "Ai branch processor cannot be null."); + return new AiState<>(this.matchHappen.others(processor), this.flow); + } +} diff --git a/framework/fel/java/fel-flow/src/test/java/modelengine/fel/engine/AiFlowCaseTest.java b/framework/fel/java/fel-flow/src/test/java/modelengine/fel/engine/AiFlowCaseTest.java index f94529fb..29b76dfc 100644 --- a/framework/fel/java/fel-flow/src/test/java/modelengine/fel/engine/AiFlowCaseTest.java +++ b/framework/fel/java/fel-flow/src/test/java/modelengine/fel/engine/AiFlowCaseTest.java @@ -16,6 +16,7 @@ import modelengine.fel.engine.operators.models.ChatFlowModel; import modelengine.fel.engine.operators.prompts.Prompts; import modelengine.fit.waterflow.domain.context.FlowSession; +import modelengine.fit.waterflow.domain.context.StateContext; import modelengine.fit.waterflow.domain.utils.SleepUtil; import modelengine.fitframework.flowable.Choir; @@ -33,6 +34,82 @@ * @since 2025-06-11 */ public class AiFlowCaseTest { + @Nested + class DesensitizeCase { + private final ChatFlowModel model = new ChatFlowModel((prompt, chatOption) -> Choir.create(emitter -> { + emitter.emit(new AiMessage("")); + for (int i = 0; i < 10; i++) { + emitter.emit(new AiMessage(String.valueOf(i))); + SleepUtil.sleep(100); + } + emitter.emit(new AiMessage("")); + for (int i = 100; i < 110; i++) { + emitter.emit(new AiMessage(String.valueOf(i))); + SleepUtil.sleep(100); + } + emitter.complete(); + }), ChatOption.custom().model("modelName").stream(true).build()); + + private final AiProcessFlow flow = AiFlows.create() + .prompt(Prompts.human("{{0}}")) + .generate(model) + .map(this::classic) + .conditions() + .when(chunk -> chunk.isThinkContent, input -> input) + .others(input -> { + this.log(input); + return input; + }) + .map(this::mockDesensitize) + .close(); + + @Test + void run() { + AtomicInteger counter = new AtomicInteger(0); + long startTime = System.currentTimeMillis(); + System.out.printf("time:%s, start.\n", startTime); + ConverseLatch result = flow.converse(new FlowSession(true)).doOnConsume(answer -> { + System.out.printf("time:%s, chunk=%s\n", System.currentTimeMillis(), answer); + counter.incrementAndGet(); + }).offer(Tip.fromArray("hi")); + result.await(); + System.out.printf("time:%s, cost=%s\n", System.currentTimeMillis(), System.currentTimeMillis() - startTime); + Assertions.assertEquals(22, counter.get()); + } + + private Chunk classic(ChatMessage message, StateContext ctx) { + if (message.text().trim().equals("")) { + ctx.setState("isThinking", true); + return new Chunk(true, message.text()); + } + if (message.text().trim().equals("")) { + ctx.setState("isThinking", false); + return new Chunk(true, message.text()); + } + if (Boolean.TRUE.equals(ctx.getState("isThinking"))) { + return new Chunk(true, message.text()); + } + return new Chunk(false, message.text()); + } + + private String mockDesensitize(Chunk chunk) { + return chunk.content.replace("3", "*"); + } + + private void log(Chunk chunk) { + System.out.println("log content:" + chunk.content); + } + + private static class Chunk { + private final boolean isThinkContent; + private final String content; + + private Chunk(boolean isThinkContent, String content) {this.isThinkContent = isThinkContent; + this.content = content; + } + } + } + /** * Simulates a backpressure scenario where: *
    From 707cbf09f22f1a12e6734d76b4a8778a30a5b19f Mon Sep 17 00:00:00 2001 From: songyongtan <271667068@qq.com> Date: Mon, 16 Jun 2025 21:46:20 +0800 Subject: [PATCH 6/8] [fel] adapter flow case --- .../fel/engine/activities/AiWhenHappen.java | 2 +- .../fel/engine/AiFlowCaseTest.java | 62 ++++++++++++------- .../waterflow/domain/states/WhenHappen.java | 2 +- .../fit/waterflow/domain/stream/nodes/To.java | 5 +- .../domain/stream/operators/Operators.java | 4 +- 5 files changed, 46 insertions(+), 29 deletions(-) diff --git a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiWhenHappen.java b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiWhenHappen.java index 04077703..82ea3805 100644 --- a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiWhenHappen.java +++ b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiWhenHappen.java @@ -53,7 +53,7 @@ public AiWhenHappen when(Operators.Whether whether, Operators * * @param processor The handler to process unmatched inputs. * @return An {@link AiState} representing the terminal node of the conditional flow. - * @throws IllegalArgumentException if processor is null + * @throws IllegalArgumentException if processor is null. */ public AiState others(Operators.Then processor) { Validation.notNull(processor, "Ai branch processor cannot be null."); diff --git a/framework/fel/java/fel-flow/src/test/java/modelengine/fel/engine/AiFlowCaseTest.java b/framework/fel/java/fel-flow/src/test/java/modelengine/fel/engine/AiFlowCaseTest.java index 29b76dfc..d43a7084 100644 --- a/framework/fel/java/fel-flow/src/test/java/modelengine/fel/engine/AiFlowCaseTest.java +++ b/framework/fel/java/fel-flow/src/test/java/modelengine/fel/engine/AiFlowCaseTest.java @@ -34,18 +34,22 @@ * @since 2025-06-11 */ public class AiFlowCaseTest { + private static final int SPEED = 1; @Nested class DesensitizeCase { private final ChatFlowModel model = new ChatFlowModel((prompt, chatOption) -> Choir.create(emitter -> { emitter.emit(new AiMessage("")); - for (int i = 0; i < 10; i++) { + int takeTime = 10 * SPEED; + SleepUtil.sleep(takeTime); + for (int i = 0; i < 48; i++) { emitter.emit(new AiMessage(String.valueOf(i))); - SleepUtil.sleep(100); + SleepUtil.sleep(takeTime); } emitter.emit(new AiMessage("")); - for (int i = 100; i < 110; i++) { + SleepUtil.sleep(takeTime); + for (int i = 100; i < 150; i++) { emitter.emit(new AiMessage(String.valueOf(i))); - SleepUtil.sleep(100); + SleepUtil.sleep(takeTime); } emitter.complete(); }), ChatOption.custom().model("modelName").stream(true).build()); @@ -60,7 +64,8 @@ class DesensitizeCase { this.log(input); return input; }) - .map(this::mockDesensitize) + .map(this::mockDesensitize1) + .map(this::mockDesensitize2) .close(); @Test @@ -74,7 +79,7 @@ void run() { }).offer(Tip.fromArray("hi")); result.await(); System.out.printf("time:%s, cost=%s\n", System.currentTimeMillis(), System.currentTimeMillis() - startTime); - Assertions.assertEquals(22, counter.get()); + Assertions.assertEquals(100, counter.get()); } private Chunk classic(ChatMessage message, StateContext ctx) { @@ -92,10 +97,16 @@ private Chunk classic(ChatMessage message, StateContext ctx) { return new Chunk(false, message.text()); } - private String mockDesensitize(Chunk chunk) { + private String mockDesensitize1(Chunk chunk) { + SleepUtil.sleep(10 * SPEED); return chunk.content.replace("3", "*"); } + private String mockDesensitize2(String chunk) { + SleepUtil.sleep(10 * SPEED); + return chunk.replace("4", "*"); + } + private void log(Chunk chunk) { System.out.println("log content:" + chunk.content); } @@ -113,17 +124,17 @@ private static class Chunk { /** * Simulates a backpressure scenario where: *
      - *
    1. The LLM generates data (50ms per item) faster than the TTS can process it.
    2. + *
    3. The LLM generates data faster than the TTS can process it.
    4. *
    5. TTS processing is constrained to a single thread.
    6. - *
    7. TTS processing speed is artificially slowed (100ms per item).
    8. + *
    9. TTS processing speed is artificially slowed.
    10. *
    */ @Nested class BackPressureCase { private final ChatFlowModel model = new ChatFlowModel((prompt, chatOption) -> Choir.create(emitter -> { - for (int i = 0; i < 10; i++) { + for (int i = 0; i < 100; i++) { emitter.emit(new AiMessage(String.valueOf(i))); - SleepUtil.sleep(50); + SleepUtil.sleep(5 * SPEED); } emitter.complete(); System.out.printf("time:%s, generate completed.\n", System.currentTimeMillis()); @@ -132,6 +143,7 @@ class BackPressureCase { private final AiProcessFlow flow = AiFlows.create() .prompt(Prompts.human("{{0}}")) .generate(model) + .map(this::mockDesensitize).concurrency(1) // Limit processing to 1 concurrent thread .map(this::mockTTS).concurrency(1) // Limit processing to 1 concurrent thread .close(); @@ -146,30 +158,36 @@ void run() { }).offer(Tip.fromArray("hi")); result.await(); System.out.printf("time:%s, cost=%s\n", System.currentTimeMillis(), System.currentTimeMillis() - startTime); - Assertions.assertEquals(10, counter.get()); + Assertions.assertEquals(100, counter.get()); + } + + private String mockDesensitize(ChatMessage chunk) { + // Simulate time-consuming operation with a delay. + SleepUtil.sleep(10 * SPEED); + return chunk.text().replace("3", "*"); } - private String mockTTS(ChatMessage chunk) { + private String mockTTS(String chunk) { // Simulate time-consuming operation with a delay. - SleepUtil.sleep(100); - return chunk.text(); + SleepUtil.sleep(10 * SPEED); + return chunk; } } /** * Demonstrates concurrent processing with balanced throughput where: *
      - *
    1. LLM generates data at moderate pace (50ms per item)
    2. - *
    3. Downstream processing runs with 3 concurrent threads
    4. - *
    5. Processing speed is slightly slower than generation (150ms vs 50ms)
    6. + *
    7. LLM generates data at moderate pace.
    8. + *
    9. Downstream processing runs with 3 concurrent threads.
    10. + *
    11. Processing speed is slightly slower than generation (3 : 1).
    12. *
    */ @Nested class ConcurrencyCase { private final ChatFlowModel model = new ChatFlowModel((prompt, chatOption) -> Choir.create(emitter -> { - for (int i = 0; i < 10; i++) { + for (int i = 0; i < 100; i++) { emitter.emit(new AiMessage(String.valueOf(i))); - SleepUtil.sleep(50); + SleepUtil.sleep(10 * SPEED); } emitter.complete(); }), ChatOption.custom().model("modelName").stream(true).build()); @@ -191,12 +209,12 @@ void run() { }).offer(Tip.fromArray("hi")); result.await(); System.out.printf("time:%s, cost=%s\n", System.currentTimeMillis(), System.currentTimeMillis() - startTime); - Assertions.assertEquals(10, counter.get()); + Assertions.assertEquals(100, counter.get()); } private String mockDesensitize(ChatMessage chunk) { // Simulate slower processing at 1/3 speed of LLM generation. - SleepUtil.sleep(150); + SleepUtil.sleep(30 * SPEED); return chunk.text().replace("3", "*"); } } diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/WhenHappen.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/WhenHappen.java index fc146afb..e7650746 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/WhenHappen.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/states/WhenHappen.java @@ -52,7 +52,7 @@ public WhenHappen when(Operators.Whether whether, Operators.Then< * * @param processor The handler to process unmatched inputs. * @return An {@link State} representing the join node of the conditional flow. - * @throws IllegalArgumentException if processor is null + * @throws IllegalArgumentException if processor is null. */ public State others(Operators.Then processor) { this.when(null, processor); diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java index 5551b19a..ca9bc14d 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java @@ -100,8 +100,6 @@ public class To extends IdGenerator implements Subscriber { @Getter private final FlowLocks locks; - private int maxConcurrency = MAX_CONCURRENCY; - // 默认自动流转过滤器是按batchID批次过滤contexts private final Operators.Filter defaultAutoFilter = (contexts) -> { if (CollectionUtils.isEmpty(contexts)) { @@ -140,7 +138,7 @@ public class To extends IdGenerator implements Subscriber { @Getter private ProcessMode processMode; - private final Map processingSessions = new ConcurrentHashMap<>(); + private Map processingSessions = new ConcurrentHashMap<>(); private Operators.Validator validator = (repo, to) -> repo.requestMappingContext(to.streamId, to.froms.stream().map(Identity::getId).collect(Collectors.toList()), @@ -162,6 +160,7 @@ public class To extends IdGenerator implements Subscriber { */ private Operators.Produce, O> produce; + private volatile int maxConcurrency = MAX_CONCURRENCY; /** * 当前并发度,已经提交的批次 */ diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/operators/Operators.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/operators/Operators.java index 4e8d6b7f..b8759c2c 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/operators/Operators.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/operators/Operators.java @@ -298,8 +298,8 @@ public interface Validator { * This functional interface defines the processing operation that converts an * input of one type to an output of potentially different type. * - * @param The type of raw material (input) to be processed - * @param The type of product (output) to be produced + * @param The type of raw material (input) to be processed. + * @param The type of product (output) to be produced. */ @FunctionalInterface public interface Then { From ed78b5a49fa04d93a2ae6f266b81ac6e9aa2ab26 Mon Sep 17 00:00:00 2001 From: songyongtan <271667068@qq.com> Date: Sat, 21 Jun 2025 14:39:01 +0800 Subject: [PATCH 7/8] [fel] clean code --- .../modelengine/fel/engine/activities/AiWhenHappen.java | 9 +++++++++ .../test/java/modelengine/fel/engine/AiFlowCaseTest.java | 5 +++++ .../fit/waterflow/domain/stream/nodes/To.java | 1 + 3 files changed, 15 insertions(+) diff --git a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiWhenHappen.java b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiWhenHappen.java index 82ea3805..88a5d836 100644 --- a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiWhenHappen.java +++ b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiWhenHappen.java @@ -30,6 +30,15 @@ public class AiWhenHappen, F extends AiFlow> private final F flow; + /** + * Creates a new AI flow matching generator that handles conditional branching. + * This constructor initializes a stateful processor for when/then style pattern matching + * within AI workflows. + * + * @param matchHappen The core matching generator that evaluates conditions + * @param flow The parent AI flow + * @throws NullPointerException If either parameter is null + */ public AiWhenHappen(WhenHappen matchHappen, F flow) { this.matchHappen = Validation.notNull(matchHappen, "WhenHappen cannot be null."); this.flow = Validation.notNull(flow, "Flow cannot be null."); diff --git a/framework/fel/java/fel-flow/src/test/java/modelengine/fel/engine/AiFlowCaseTest.java b/framework/fel/java/fel-flow/src/test/java/modelengine/fel/engine/AiFlowCaseTest.java index d43a7084..1199c811 100644 --- a/framework/fel/java/fel-flow/src/test/java/modelengine/fel/engine/AiFlowCaseTest.java +++ b/framework/fel/java/fel-flow/src/test/java/modelengine/fel/engine/AiFlowCaseTest.java @@ -21,6 +21,7 @@ import modelengine.fitframework.flowable.Choir; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -35,6 +36,7 @@ */ public class AiFlowCaseTest { private static final int SPEED = 1; + @Nested class DesensitizeCase { private final ChatFlowModel model = new ChatFlowModel((prompt, chatOption) -> Choir.create(emitter -> { @@ -69,6 +71,7 @@ class DesensitizeCase { .close(); @Test + @DisplayName("DesensitizeCase") void run() { AtomicInteger counter = new AtomicInteger(0); long startTime = System.currentTimeMillis(); @@ -148,6 +151,7 @@ class BackPressureCase { .close(); @Test + @DisplayName("BackPressureCase") void run() { AtomicInteger counter = new AtomicInteger(0); long startTime = System.currentTimeMillis(); @@ -199,6 +203,7 @@ class ConcurrencyCase { .close(); @Test + @DisplayName("ConcurrencyCase") void run() { AtomicInteger counter = new AtomicInteger(0); long startTime = System.currentTimeMillis(); diff --git a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java index ca9bc14d..692ee11f 100644 --- a/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java +++ b/framework/waterflow/java/waterflow-core/src/main/java/modelengine/fit/waterflow/domain/stream/nodes/To.java @@ -161,6 +161,7 @@ public class To extends IdGenerator implements Subscriber { private Operators.Produce, O> produce; private volatile int maxConcurrency = MAX_CONCURRENCY; + /** * 当前并发度,已经提交的批次 */ From aa9aa3ce8caf42703150664f5bea60fddc2ff873 Mon Sep 17 00:00:00 2001 From: songyongtan <271667068@qq.com> Date: Sat, 21 Jun 2025 18:09:35 +0800 Subject: [PATCH 8/8] [fel] clean code --- .../modelengine/fel/engine/activities/AiWhenHappen.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiWhenHappen.java b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiWhenHappen.java index 88a5d836..a437894c 100644 --- a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiWhenHappen.java +++ b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiWhenHappen.java @@ -35,9 +35,9 @@ public class AiWhenHappen, F extends AiFlow> * This constructor initializes a stateful processor for when/then style pattern matching * within AI workflows. * - * @param matchHappen The core matching generator that evaluates conditions - * @param flow The parent AI flow - * @throws NullPointerException If either parameter is null + * @param matchHappen The core matching generator that evaluates conditions. + * @param flow The parent AI flow. + * @throws NullPointerException If either parameter is null. */ public AiWhenHappen(WhenHappen matchHappen, F flow) { this.matchHappen = Validation.notNull(matchHappen, "WhenHappen cannot be null.");