-
Notifications
You must be signed in to change notification settings - Fork 329
[fel] add key scenario cases #165
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
CodeCasterX
merged 8 commits into
ModelEngine-Group:3.5.x
from
loveTsong:fel-improvement-key-examples
Jun 23, 2025
Merged
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
0a50001
[waterflow] support configuring node concurrency
loveTsong aae4738
[fel] subflows inherit ordering flag from parent flow
loveTsong 2bda813
[fel] add key examples for backpressure and concurrency
loveTsong c0085b0
[waterflow] support branch syntax for 'when' and 'then' clauses
loveTsong ab700ac
[fel] demonstrate data anonymization with example
loveTsong 707cbf0
[fel] adapter flow case
loveTsong ed78b5a
[fel] clean code
loveTsong aa9aa3c
[fel] clean code
loveTsong File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
71 changes: 71 additions & 0 deletions
71
...ework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiWhenHappen.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,71 @@ | ||
| /*--------------------------------------------------------------------------------------------- | ||
| * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. | ||
| * This file is a part of the ModelEngine Project. | ||
| * Licensed under the MIT License. See License.txt in the project root for license information. | ||
| *--------------------------------------------------------------------------------------------*/ | ||
|
|
||
| package modelengine.fel.engine.activities; | ||
|
|
||
| import modelengine.fel.engine.flows.AiFlow; | ||
| import modelengine.fit.waterflow.domain.flow.Flow; | ||
| import modelengine.fit.waterflow.domain.states.WhenHappen; | ||
| import modelengine.fit.waterflow.domain.stream.operators.Operators; | ||
| import modelengine.fitframework.inspection.Validation; | ||
|
|
||
| /** | ||
| * Represents a conditional branch that matches when conditions in an AI processing flow. | ||
| * This class handles the branching logic when specific conditions are met in the workflow. | ||
| * | ||
| * @param <O> The output data type of the current node. | ||
| * @param <D> The initial data type of the containing flow. | ||
| * @param <I> The input parameter data type. | ||
| * @param <RF> The internal flow type, extending {@link Flow}{@code <D>}. | ||
| * @param <F> The AI flow type, extending {@link AiFlow}{@code <D, RF>}. | ||
| * | ||
| * @author 宋永坦 | ||
| * @since 2025-06-12 | ||
| */ | ||
| public class AiWhenHappen<O, D, I, RF extends Flow<D>, F extends AiFlow<D, RF>> { | ||
| private final WhenHappen<O, D, I, RF> matchHappen; | ||
|
|
||
| private final F flow; | ||
|
|
||
| /** | ||
| * Creates a new AI flow matching generator that handles conditional branching. | ||
| * This constructor initializes a stateful processor for when/then style pattern matching | ||
| * within AI workflows. | ||
| * | ||
| * @param matchHappen The core matching generator that evaluates conditions. | ||
| * @param flow The parent AI flow. | ||
| * @throws NullPointerException If either parameter is null. | ||
| */ | ||
| public AiWhenHappen(WhenHappen<O, D, I, RF> matchHappen, F flow) { | ||
| this.matchHappen = Validation.notNull(matchHappen, "WhenHappen cannot be null."); | ||
| this.flow = Validation.notNull(flow, "Flow cannot be null."); | ||
| } | ||
|
|
||
| /** | ||
| * Creates a conditional branch with the specified predicate and handler. | ||
| * | ||
| * @param whether The condition predicate that determines branch activation. | ||
| * @param processor The transformation handler to execute when condition is met. | ||
| * @return A new {@link AiWhenHappen} instance representing the conditional branch. | ||
| * @throws IllegalArgumentException if processor is null. | ||
| */ | ||
| public AiWhenHappen<O, D, I, RF, F> when(Operators.Whether<I> whether, Operators.Then<I, O> processor) { | ||
| Validation.notNull(processor, "Ai branch processor cannot be null."); | ||
| return new AiWhenHappen<>(this.matchHappen.when(whether, processor), this.flow); | ||
| } | ||
|
|
||
| /** | ||
| * Provides a default processing logic and terminates the conditional node. | ||
| * | ||
| * @param processor The handler to process unmatched inputs. | ||
| * @return An {@link AiState} representing the terminal node of the conditional flow. | ||
| * @throws IllegalArgumentException if processor is null. | ||
| */ | ||
| public AiState<O, D, O, RF, F> others(Operators.Then<I, O> processor) { | ||
| Validation.notNull(processor, "Ai branch processor cannot be null."); | ||
| return new AiState<>(this.matchHappen.others(processor), this.flow); | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
226 changes: 226 additions & 0 deletions
226
framework/fel/java/fel-flow/src/test/java/modelengine/fel/engine/AiFlowCaseTest.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,226 @@ | ||
| /*--------------------------------------------------------------------------------------------- | ||
| * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. | ||
| * This file is a part of the ModelEngine Project. | ||
| * Licensed under the MIT License. See License.txt in the project root for license information. | ||
| *--------------------------------------------------------------------------------------------*/ | ||
|
|
||
| package modelengine.fel.engine; | ||
|
|
||
| import modelengine.fel.core.chat.ChatMessage; | ||
| import modelengine.fel.core.chat.ChatOption; | ||
| import modelengine.fel.core.chat.support.AiMessage; | ||
| import modelengine.fel.core.util.Tip; | ||
| import modelengine.fel.engine.flows.AiFlows; | ||
| import modelengine.fel.engine.flows.AiProcessFlow; | ||
| import modelengine.fel.engine.flows.ConverseLatch; | ||
| import modelengine.fel.engine.operators.models.ChatFlowModel; | ||
| import modelengine.fel.engine.operators.prompts.Prompts; | ||
| import modelengine.fit.waterflow.domain.context.FlowSession; | ||
| import modelengine.fit.waterflow.domain.context.StateContext; | ||
| import modelengine.fit.waterflow.domain.utils.SleepUtil; | ||
| import modelengine.fitframework.flowable.Choir; | ||
|
|
||
| import org.junit.jupiter.api.Assertions; | ||
| import org.junit.jupiter.api.DisplayName; | ||
| import org.junit.jupiter.api.Nested; | ||
| import org.junit.jupiter.api.Test; | ||
|
|
||
| import java.util.concurrent.atomic.AtomicInteger; | ||
|
|
||
| /** | ||
| * Test cases demonstrating different flow control scenarios in AI processing pipelines. | ||
| * Contains nested test classes for specific flow control mechanisms. | ||
| * | ||
| * @author 宋永坦 | ||
| * @since 2025-06-11 | ||
| */ | ||
| public class AiFlowCaseTest { | ||
| private static final int SPEED = 1; | ||
|
|
||
| @Nested | ||
CodeCasterX marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| class DesensitizeCase { | ||
| private final ChatFlowModel model = new ChatFlowModel((prompt, chatOption) -> Choir.create(emitter -> { | ||
| emitter.emit(new AiMessage("<think>")); | ||
| int takeTime = 10 * SPEED; | ||
| SleepUtil.sleep(takeTime); | ||
| for (int i = 0; i < 48; i++) { | ||
| emitter.emit(new AiMessage(String.valueOf(i))); | ||
| SleepUtil.sleep(takeTime); | ||
| } | ||
| emitter.emit(new AiMessage("</think>")); | ||
| SleepUtil.sleep(takeTime); | ||
| for (int i = 100; i < 150; i++) { | ||
| emitter.emit(new AiMessage(String.valueOf(i))); | ||
| SleepUtil.sleep(takeTime); | ||
| } | ||
| emitter.complete(); | ||
| }), ChatOption.custom().model("modelName").stream(true).build()); | ||
|
|
||
| private final AiProcessFlow<Tip, String> flow = AiFlows.<Tip>create() | ||
| .prompt(Prompts.human("{{0}}")) | ||
| .generate(model) | ||
| .map(this::classic) | ||
| .conditions() | ||
| .when(chunk -> chunk.isThinkContent, input -> input) | ||
| .others(input -> { | ||
| this.log(input); | ||
| return input; | ||
| }) | ||
| .map(this::mockDesensitize1) | ||
| .map(this::mockDesensitize2) | ||
| .close(); | ||
|
|
||
| @Test | ||
| @DisplayName("DesensitizeCase") | ||
| void run() { | ||
CodeCasterX marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| AtomicInteger counter = new AtomicInteger(0); | ||
| long startTime = System.currentTimeMillis(); | ||
| System.out.printf("time:%s, start.\n", startTime); | ||
| ConverseLatch<String> result = flow.converse(new FlowSession(true)).doOnConsume(answer -> { | ||
| System.out.printf("time:%s, chunk=%s\n", System.currentTimeMillis(), answer); | ||
| counter.incrementAndGet(); | ||
| }).offer(Tip.fromArray("hi")); | ||
| result.await(); | ||
| System.out.printf("time:%s, cost=%s\n", System.currentTimeMillis(), System.currentTimeMillis() - startTime); | ||
| Assertions.assertEquals(100, counter.get()); | ||
| } | ||
|
|
||
| private Chunk classic(ChatMessage message, StateContext ctx) { | ||
| if (message.text().trim().equals("<think>")) { | ||
| ctx.setState("isThinking", true); | ||
| return new Chunk(true, message.text()); | ||
| } | ||
| if (message.text().trim().equals("</think>")) { | ||
| ctx.setState("isThinking", false); | ||
| return new Chunk(true, message.text()); | ||
| } | ||
| if (Boolean.TRUE.equals(ctx.getState("isThinking"))) { | ||
| return new Chunk(true, message.text()); | ||
| } | ||
| return new Chunk(false, message.text()); | ||
| } | ||
|
|
||
| private String mockDesensitize1(Chunk chunk) { | ||
| SleepUtil.sleep(10 * SPEED); | ||
| return chunk.content.replace("3", "*"); | ||
| } | ||
|
|
||
| private String mockDesensitize2(String chunk) { | ||
| SleepUtil.sleep(10 * SPEED); | ||
| return chunk.replace("4", "*"); | ||
| } | ||
|
|
||
| private void log(Chunk chunk) { | ||
| System.out.println("log content:" + chunk.content); | ||
| } | ||
|
|
||
| private static class Chunk { | ||
| private final boolean isThinkContent; | ||
| private final String content; | ||
|
|
||
| private Chunk(boolean isThinkContent, String content) {this.isThinkContent = isThinkContent; | ||
| this.content = content; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Simulates a backpressure scenario where: | ||
| * <ol> | ||
| * <li>The LLM generates data faster than the TTS can process it.</li> | ||
| * <li>TTS processing is constrained to a single thread.</li> | ||
| * <li>TTS processing speed is artificially slowed.</li> | ||
| * </ol> | ||
| */ | ||
| @Nested | ||
| class BackPressureCase { | ||
| private final ChatFlowModel model = new ChatFlowModel((prompt, chatOption) -> Choir.create(emitter -> { | ||
| for (int i = 0; i < 100; i++) { | ||
| emitter.emit(new AiMessage(String.valueOf(i))); | ||
| SleepUtil.sleep(5 * SPEED); | ||
| } | ||
| emitter.complete(); | ||
| System.out.printf("time:%s, generate completed.\n", System.currentTimeMillis()); | ||
| }), ChatOption.custom().model("modelName").stream(true).build()); | ||
|
|
||
| private final AiProcessFlow<Tip, String> flow = AiFlows.<Tip>create() | ||
| .prompt(Prompts.human("{{0}}")) | ||
| .generate(model) | ||
| .map(this::mockDesensitize).concurrency(1) // Limit processing to 1 concurrent thread | ||
| .map(this::mockTTS).concurrency(1) // Limit processing to 1 concurrent thread | ||
| .close(); | ||
|
|
||
| @Test | ||
| @DisplayName("BackPressureCase") | ||
| void run() { | ||
| AtomicInteger counter = new AtomicInteger(0); | ||
| long startTime = System.currentTimeMillis(); | ||
| System.out.printf("time:%s, start.\n", startTime); | ||
| ConverseLatch<String> result = flow.converse(new FlowSession(false)).doOnConsume(answer -> { | ||
| System.out.printf("time:%s, chunk=%s\n", System.currentTimeMillis(), answer); | ||
| counter.incrementAndGet(); | ||
| }).offer(Tip.fromArray("hi")); | ||
| result.await(); | ||
| System.out.printf("time:%s, cost=%s\n", System.currentTimeMillis(), System.currentTimeMillis() - startTime); | ||
| Assertions.assertEquals(100, counter.get()); | ||
| } | ||
|
|
||
| private String mockDesensitize(ChatMessage chunk) { | ||
| // Simulate time-consuming operation with a delay. | ||
| SleepUtil.sleep(10 * SPEED); | ||
| return chunk.text().replace("3", "*"); | ||
| } | ||
|
|
||
| private String mockTTS(String chunk) { | ||
| // Simulate time-consuming operation with a delay. | ||
| SleepUtil.sleep(10 * SPEED); | ||
| return chunk; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Demonstrates concurrent processing with balanced throughput where: | ||
| * <ol> | ||
| * <li>LLM generates data at moderate pace.</li> | ||
| * <li>Downstream processing runs with 3 concurrent threads.</li> | ||
| * <li>Processing speed is slightly slower than generation (3 : 1).</li> | ||
| * </ol> | ||
| */ | ||
| @Nested | ||
| class ConcurrencyCase { | ||
| private final ChatFlowModel model = new ChatFlowModel((prompt, chatOption) -> Choir.create(emitter -> { | ||
| for (int i = 0; i < 100; i++) { | ||
| emitter.emit(new AiMessage(String.valueOf(i))); | ||
| SleepUtil.sleep(10 * SPEED); | ||
| } | ||
| emitter.complete(); | ||
| }), ChatOption.custom().model("modelName").stream(true).build()); | ||
|
|
||
| private final AiProcessFlow<Tip, String> flow = AiFlows.<Tip>create() | ||
| .prompt(Prompts.human("{{0}}")) | ||
| .generate(model) | ||
| .map(this::mockDesensitize).concurrency(3) // Set processing to 3 concurrent thread | ||
| .close(); | ||
|
|
||
| @Test | ||
| @DisplayName("ConcurrencyCase") | ||
| void run() { | ||
| AtomicInteger counter = new AtomicInteger(0); | ||
| long startTime = System.currentTimeMillis(); | ||
| System.out.printf("time:%s, start.\n", startTime); | ||
| ConverseLatch<String> result = flow.converse(new FlowSession(false)).doOnConsume(answer -> { | ||
| System.out.printf("time:%s, chunk=%s\n", System.currentTimeMillis(), answer); | ||
| counter.incrementAndGet(); | ||
| }).offer(Tip.fromArray("hi")); | ||
| result.await(); | ||
| System.out.printf("time:%s, cost=%s\n", System.currentTimeMillis(), System.currentTimeMillis() - startTime); | ||
CodeCasterX marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| Assertions.assertEquals(100, counter.get()); | ||
| } | ||
|
|
||
| private String mockDesensitize(ChatMessage chunk) { | ||
| // Simulate slower processing at 1/3 speed of LLM generation. | ||
| SleepUtil.sleep(30 * SPEED); | ||
| return chunk.text().replace("3", "*"); | ||
| } | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.