extensions();
/**
diff --git a/framework/fel/java/fel-core/src/main/java/modelengine/fel/core/tool/support/DefaultToolCallChunk.java b/framework/fel/java/fel-core/src/main/java/modelengine/fel/core/tool/support/DefaultToolCallChunk.java
new file mode 100644
index 00000000..f3005303
--- /dev/null
+++ b/framework/fel/java/fel-core/src/main/java/modelengine/fel/core/tool/support/DefaultToolCallChunk.java
@@ -0,0 +1,71 @@
+/*---------------------------------------------------------------------------------------------
+ * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
+ * This file is a part of the ModelEngine Project.
+ * Licensed under the MIT License. See License.txt in the project root for license information.
+ *--------------------------------------------------------------------------------------------*/
+
+package modelengine.fel.core.tool.support;
+
+import modelengine.fel.core.tool.ToolCall;
+import modelengine.fel.core.tool.ToolCallChunk;
+import modelengine.fitframework.inspection.Validation;
+import modelengine.fitframework.util.LazyLoader;
+import modelengine.fitframework.util.ObjectUtils;
+import modelengine.fitframework.util.StringUtils;
+
+/**
+ * 表示工具调用请求的实体片段默认实现。
+ *
+ * 该实现不保证流式片段聚合的线程安全,需要外部使用方保证线程安全。
+ *
+ *
+ * @author 刘信宏
+ * @since 2024-12-23
+ */
+public class DefaultToolCallChunk implements ToolCallChunk {
+ private final String id;
+ private final String name;
+ private final String arguments;
+ private final LazyLoader argumentsBuffer;
+
+ /**
+ * 使用 {@link ToolCall} 构造一个新的 {@link ToolCallChunk}。
+ *
+ * @param toolCall 表示工具调用的 {@link ToolCall}。
+ */
+ public DefaultToolCallChunk(ToolCall toolCall) {
+ Validation.notNull(toolCall, "The tool call cannot be null.");
+ this.id = Validation.notNull(toolCall.id(), "The tool call id cannot be null.");
+ this.name = toolCall.name();
+ this.arguments = toolCall.arguments();
+ this.argumentsBuffer =
+ new LazyLoader<>(() -> new StringBuilder(ObjectUtils.nullIf(this.arguments, StringUtils.EMPTY)));
+ }
+
+ @Override
+ public String id() {
+ return this.id;
+ }
+
+ @Override
+ public Integer index() {
+ // 工具调用的片段不需要index字段。
+ return null;
+ }
+
+ @Override
+ public String name() {
+ return this.name;
+ }
+
+ @Override
+ public String arguments() {
+ return this.argumentsBuffer.get().toString();
+ }
+
+ @Override
+ public void merge(ToolCall toolCall) {
+ Validation.notNull(toolCall, "The tool call cannot be null.");
+ this.argumentsBuffer.get().append(ObjectUtils.nullIf(toolCall.arguments(), StringUtils.EMPTY));
+ }
+}
diff --git a/framework/fel/java/fel-core/src/main/java/modelengine/fel/core/util/Tip.java b/framework/fel/java/fel-core/src/main/java/modelengine/fel/core/util/Tip.java
index 7a1c2d60..c5a51fa4 100644
--- a/framework/fel/java/fel-core/src/main/java/modelengine/fel/core/util/Tip.java
+++ b/framework/fel/java/fel-core/src/main/java/modelengine/fel/core/util/Tip.java
@@ -25,6 +25,11 @@ public class Tip {
private final Map values = new HashMap<>();
private int index = 0;
+ @Override
+ public String toString() {
+ return this.values.toString();
+ }
+
/**
* 从键值对创建 {@link Tip} 的实例。
*
diff --git a/framework/fel/java/fel-core/src/test/java/modelengine/fel/core/document/support/RerankDocumentProcessorTest.java b/framework/fel/java/fel-core/src/test/java/modelengine/fel/core/document/support/RerankDocumentProcessorTest.java
new file mode 100644
index 00000000..8a08a5e7
--- /dev/null
+++ b/framework/fel/java/fel-core/src/test/java/modelengine/fel/core/document/support/RerankDocumentProcessorTest.java
@@ -0,0 +1,108 @@
+/*---------------------------------------------------------------------------------------------
+ * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
+ * This file is a part of the ModelEngine Project.
+ * Licensed under the MIT License. See License.txt in the project root for license information.
+ *--------------------------------------------------------------------------------------------*/
+
+package modelengine.fel.core.document.support;
+
+import static modelengine.fel.core.document.support.TestRerankModelController.FAIL_ENDPOINT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+import modelengine.fel.core.document.Document;
+import modelengine.fel.core.document.MeasurableDocument;
+import modelengine.fit.http.client.HttpClassicClientFactory;
+import modelengine.fitframework.annotation.Fit;
+import modelengine.fitframework.exception.FitException;
+import modelengine.fitframework.test.annotation.MvcTest;
+import modelengine.fitframework.test.domain.mvc.MockMvc;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * ReRank 客户端服务测试。
+ *
+ * @author 马朝阳
+ * @since 2024-09-14
+ */
+@MvcTest(classes = TestRerankModelController.class)
+public class RerankDocumentProcessorTest {
+ private static final String[] DOCS = new String[] {"Burgers", "Carson", "Shanghai", "Beijing", "Test"};
+
+ private RerankDocumentProcessor client;
+
+ @Fit
+ private HttpClassicClientFactory httpClientFactory;
+
+ @Fit
+ private MockMvc mockMvc;
+
+ @BeforeEach
+ public void setUp() {
+ this.client = new RerankDocumentProcessor(httpClientFactory,
+ RerankOption.custom()
+ .baseUri("http://localhost:" + mockMvc.getPort())
+ .model("rerank1")
+ .query("What is the capital of the united states?")
+ .topN(3)
+ .build());
+ }
+
+ @Test
+ @DisplayName("测试 Rerank 接口调用响应成功")
+ public void testWhenCallRerankModelThenSuccess() {
+ List texts = Arrays.asList(DOCS[3], DOCS[4], DOCS[0]);
+ List scores = Arrays.asList(0.999071, 0.7867867, 0.32713068);
+ List docs = this.client.process(this.getRequest());
+ assertThat(docs).extracting(MeasurableDocument::text).isEqualTo(texts);
+ assertThat(docs).extracting(MeasurableDocument::score).isEqualTo(scores);
+ }
+
+ @Test
+ @DisplayName("测试 Rerank 接口调用响应异常")
+ public void testWhenCallRerankModelThenResponseException() {
+ RerankDocumentProcessor client1 = new RerankDocumentProcessor(httpClientFactory,
+ RerankOption.custom().baseUri("http://localhost:" + mockMvc.getPort() + FAIL_ENDPOINT).build());
+ assertThatThrownBy(() -> client1.process(this.getRequest())).isInstanceOf(FitException.class);
+ }
+
+ @Test
+ @DisplayName("测试 Rerank 接口参数为空响应异常")
+ public void testWhenCallRerankModelNullParamThenResponseException() {
+ assertThatThrownBy(() -> new RerankDocumentProcessor(this.httpClientFactory, null)).isInstanceOf(
+ IllegalArgumentException.class);
+ assertThatThrownBy(() -> new RerankDocumentProcessor(null, RerankOption.custom().build())).isInstanceOf(
+ IllegalArgumentException.class);
+ }
+
+ @Test
+ @DisplayName("测试 Rerank 接口请求参数为空响应异常")
+ public void testWhenCallRerankModelNullRequestParamThenResponseException() {
+ assertThat(this.client.process(new ArrayList<>())).isEqualTo(Collections.emptyList());
+ assertThat(this.client.process(null)).isEqualTo(Collections.emptyList());
+ }
+
+ private List getRequest() {
+ List documents = new ArrayList<>();
+ Arrays.stream(DOCS)
+ .forEach(doc -> documents.add(new MeasurableDocument(Document.custom()
+ .text(doc)
+ .metadata(new HashMap<>())
+ .build(), -1)));
+ return documents;
+ }
+
+ private String getMockReRankResponseBody() {
+ return "{\"results\":[{\"index\":3,\"relevance_score\":0.999071},{\"index\":4,\"relevance_score\":0.7867867},"
+ + "{\"index\":0,\"relevance_score\":0.32713068}]}";
+ }
+}
\ No newline at end of file
diff --git a/framework/fel/java/fel-core/src/test/java/modelengine/fel/core/document/support/RrfPostProcessorTest.java b/framework/fel/java/fel-core/src/test/java/modelengine/fel/core/document/support/RrfPostProcessorTest.java
new file mode 100644
index 00000000..1b6777c6
--- /dev/null
+++ b/framework/fel/java/fel-core/src/test/java/modelengine/fel/core/document/support/RrfPostProcessorTest.java
@@ -0,0 +1,90 @@
+/*---------------------------------------------------------------------------------------------
+ * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
+ * This file is a part of the ModelEngine Project.
+ * Licensed under the MIT License. See License.txt in the project root for license information.
+ *--------------------------------------------------------------------------------------------*/
+
+package modelengine.fel.core.document.support;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+import modelengine.fel.core.document.Document;
+import modelengine.fel.core.document.MeasurableDocument;
+import modelengine.fel.core.document.support.postprocessor.RrfPostProcessor;
+import modelengine.fel.core.document.support.postprocessor.RrfScoreStrategyEnum;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * RRF 算法测试。
+ *
+ * @author 马朝阳
+ * @since 2024-09-29
+ */
+public class RrfPostProcessorTest {
+ private static final String[] DOCS = new String[] {"A", "B", "C", "D", "E"};
+
+ @Test
+ @DisplayName("测试 RFF 算法最大值策略成功")
+ public void testWhenCallRRFMaxThenSuccess() {
+ RrfPostProcessor rrf = new RrfPostProcessor();
+ List process = rrf.process(getDocumentList());
+ assertThat(process).map(MeasurableDocument::score).containsExactly(0.94, 0.69, 0.36, 0.52, 0.32);
+ assertThat(process).map(MeasurableDocument::id).containsExactly("1", "4", "2", "5", "3");
+ }
+
+ @Test
+ @DisplayName("测试 RFF 算法平均值策略成功")
+ public void testWhenCallRRFAvgThenSuccess() {
+ RrfPostProcessor rrf = new RrfPostProcessor(RrfScoreStrategyEnum.AVG);
+ List process = rrf.process(getDocumentList());
+ assertThat(process).map(MeasurableDocument::score).containsExactly(0.84, 0.655, 0.36, 0.52, 0.32);
+ assertThat(process).map(MeasurableDocument::id).containsExactly("1", "4", "2", "5", "3");
+ }
+
+ @Test
+ @DisplayName("测试 RFF 算法倒数系数")
+ public void testWhenCallRRFFactorThenSuccess() {
+ RrfPostProcessor rrf = new RrfPostProcessor(RrfScoreStrategyEnum.AVG, 100);
+ List process = rrf.process(getDocumentList());
+ assertThat(process).map(MeasurableDocument::score).containsExactly(0.84, 0.655, 0.36, 0.52, 0.32);
+ assertThat(process).map(MeasurableDocument::id).containsExactly("1", "4", "2", "5", "3");
+ }
+
+ @Test
+ @DisplayName("测试 RFF 算法策略失败")
+ public void testWhenCallRRFArgNullThenFail() {
+ assertThatThrownBy(() -> new RrfPostProcessor(null, 60)).isInstanceOf(IllegalArgumentException.class);
+ assertThatThrownBy(() -> new RrfPostProcessor(RrfScoreStrategyEnum.AVG, -1)).isInstanceOf(
+ IllegalArgumentException.class);
+ }
+
+ private List getDocumentList() {
+ List res = new ArrayList<>();
+ res.addAll(getGroup("1", new int[] {1, 3, 4}, new double[] {0.74, 0.32, 0.69}));
+ res.addAll(getGroup("2", new int[] {1, 5}, new double[] {0.94, 0.52}));
+ res.addAll(getGroup("3", new int[] {2, 4}, new double[] {0.36, 0.62}));
+
+ return res;
+ }
+
+ private List getGroup(String groupId, int[] ids, double[] scores) {
+ List documents = new ArrayList<>();
+ int scoreId = 0;
+ for (int id : ids) {
+ documents.add(new MeasurableDocument(Document.custom()
+ .text(DOCS[id - 1])
+ .id(String.valueOf(id))
+ .metadata(new HashMap<>())
+ .build(), scores[scoreId], groupId));
+ scoreId++;
+ }
+ return documents;
+ }
+}
diff --git a/framework/fel/java/fel-core/src/test/java/modelengine/fel/core/document/support/TestRerankModelController.java b/framework/fel/java/fel-core/src/test/java/modelengine/fel/core/document/support/TestRerankModelController.java
new file mode 100644
index 00000000..74e1fd23
--- /dev/null
+++ b/framework/fel/java/fel-core/src/test/java/modelengine/fel/core/document/support/TestRerankModelController.java
@@ -0,0 +1,55 @@
+/*---------------------------------------------------------------------------------------------
+ * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
+ * This file is a part of the ModelEngine Project.
+ * Licensed under the MIT License. See License.txt in the project root for license information.
+ *--------------------------------------------------------------------------------------------*/
+
+package modelengine.fel.core.document.support;
+
+import modelengine.fit.http.annotation.PostMapping;
+import modelengine.fitframework.annotation.Component;
+import modelengine.fitframework.serialization.ObjectSerializer;
+
+/**
+ * 表示测试使用的 Rerank 接口。
+ *
+ * @author 马朝阳
+ * @since 2024-09-27
+ */
+@Component
+public class TestRerankModelController {
+ /**
+ * Rerank 接口失败调用端口。
+ */
+ public static final String FAIL_ENDPOINT = "/fail";
+
+ private final ObjectSerializer serializer;
+
+ TestRerankModelController(ObjectSerializer serializer) {
+ this.serializer = serializer;
+ }
+
+ /**
+ * 测试成功用 Rerank 接口。
+ *
+ * @return 表示流式返回结果的 {@link String}。
+ */
+ @PostMapping(RerankApi.RERANK_ENDPOINT)
+ public RerankResponse rerankSuccess() {
+ String json =
+ "{\"results\":[{\"index\":3,\"relevance_score\":0.999071},{\"index\":4,\"relevance_score\":0.7867867},"
+ + "{\"index\":0,\"relevance_score\":0.32713068}]}";
+ return this.serializer.deserialize(json, RerankResponse.class);
+ }
+
+ /**
+ * 测试用 Rerank 接口。
+ *
+ * @return 表示流式返回结果的 {@link String}。
+ */
+ @PostMapping(FAIL_ENDPOINT + RerankApi.RERANK_ENDPOINT)
+ public RerankResponse rerankFail() {
+ String json = "wrong json";
+ return this.serializer.deserialize(json, RerankResponse.class);
+ }
+}
diff --git a/framework/fel/java/fel-core/src/test/java/modelengine/fel/core/splitter/support/SimpleTokenizer.java b/framework/fel/java/fel-core/src/test/java/modelengine/fel/core/splitter/support/SimpleTokenizer.java
index b4489b55..48a29378 100644
--- a/framework/fel/java/fel-core/src/test/java/modelengine/fel/core/splitter/support/SimpleTokenizer.java
+++ b/framework/fel/java/fel-core/src/test/java/modelengine/fel/core/splitter/support/SimpleTokenizer.java
@@ -6,6 +6,8 @@
package modelengine.fel.core.splitter.support;
+import static modelengine.fitframework.inspection.Validation.notNull;
+
import modelengine.fel.core.tokenizer.Tokenizer;
import java.util.ArrayList;
@@ -36,4 +38,10 @@ public String decode(List tokens) {
}
return new String(charArray);
}
+
+ @Override
+ public int countToken(String text) {
+ notNull(text, "Text cannot be null.");
+ return text.length();
+ }
}
\ No newline at end of file
diff --git a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiStart.java b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiStart.java
index 8fc2f794..0bd7962c 100644
--- a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiStart.java
+++ b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiStart.java
@@ -12,6 +12,7 @@
import modelengine.fel.core.document.Content;
import modelengine.fel.core.document.Document;
import modelengine.fel.core.document.Measurable;
+import modelengine.fel.core.model.BlockModel;
import modelengine.fel.core.pattern.Parser;
import modelengine.fel.core.pattern.Pattern;
import modelengine.fel.core.pattern.PostProcessor;
@@ -28,7 +29,9 @@
import modelengine.fel.engine.flows.Conversation;
import modelengine.fel.engine.operators.models.FlowModel;
import modelengine.fel.engine.operators.patterns.AbstractFlowPattern;
+import modelengine.fel.engine.operators.patterns.FlowNodeSupportable;
import modelengine.fel.engine.operators.patterns.FlowPattern;
+import modelengine.fel.engine.operators.patterns.FlowSupportable;
import modelengine.fel.engine.operators.patterns.SimpleFlowPattern;
import modelengine.fel.engine.operators.prompts.PromptTemplate;
import modelengine.fel.engine.util.AiFlowSession;
@@ -46,7 +49,6 @@
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
@@ -419,17 +421,30 @@ public AiState parse(Parser parser) {
*/
public AiState delegate(Pattern pattern) {
Validation.notNull(pattern, "Pattern operator cannot be null.");
- FlowPattern flowPattern = this.castFlowPattern(pattern);
+ return this.delegate(new SimpleFlowPattern<>(pattern));
+ }
+
+ /**
+ * 将数据委托给 {@link FlowPattern}{@code <}{@link O}{@code , }{@link R}{@code >}
+ * 处理,然后自身放弃处理数据。处理后的数据会发送回该节点,作为该节点的处理结果。
+ *
+ * @param pattern 表示异步委托单元的 {@link FlowPattern}{@code <}{@link O}{@code , }{@link R}{@code >}。
+ * @param 表示委托节点的输出数据类型。
+ * @return 表示委托节点的 {@link AiState}{@code <}{@link R}{@code , }{@link D}{@code , }{@link O}{@code ,
+ * }{@link RF}{@code , }{@link F}{@code >}。
+ * @throws IllegalArgumentException 当 {@code pattern} 为 {@code null} 时。
+ */
+ public AiState delegate(FlowPattern pattern) {
+ Validation.notNull(pattern, "Pattern operator cannot be null.");
Processor orProcessor = this.publisher().flatMap(input -> {
- FlowEmitter cachedEmitter = FlowEmitter.from(flowPattern);
- AiFlowSession.applyPattern(flowPattern, input.getData(), input.getSession());
- return Flows.source(cachedEmitter);
+ FlowEmitter emitter = AiFlowSession.applyPattern(pattern, input.getData(), input.getSession());
+ return Flows.source(emitter);
}, null);
this.displayPatternProcessor(pattern, orProcessor);
return new AiState<>(new State<>(orProcessor, this.flow().origin()), this.flow());
}
- private void displayPatternProcessor(Pattern pattern, Processor processor) {
+ private void displayPatternProcessor(FlowPattern pattern, Processor processor) {
if (pattern instanceof AbstractFlowPattern) {
Flow originFlow = ObjectUtils.>cast(pattern).origin();
processor.displayAs("delegate to flow", originFlow, originFlow.start().getId());
@@ -477,13 +492,7 @@ public AiState delegate(Operators.ProcessMap operator)
*/
public AiState delegate(AiProcessFlow aiFlow) {
Validation.notNull(aiFlow, "Flow cannot be null.");
- Processor processor = this.publisher().map(input -> {
- aiFlow.converse(input.getSession()).offer(input.getData());
- return (R) null;
- }, null).displayAs("delegate to flow", aiFlow.origin(), aiFlow.origin().start().getId());
- AiState state = new AiState<>(new State<>(processor, this.flow().origin()), this.flow());
- state.offer(aiFlow);
- return state;
+ return this.delegate(new FlowSupportable<>(aiFlow));
}
/**
@@ -503,14 +512,7 @@ public AiState delegate(AiProcessFlow aiFlow) {
public AiState delegate(AiProcessFlow aiFlow, String nodeId) {
Validation.notNull(aiFlow, "Flow cannot be null.");
Validation.notBlank(nodeId, "Node id cannot be blank.");
- Processor processor = this.publisher().map(input -> {
- aiFlow.converse(input.getSession()).offer(nodeId, Collections.singletonList(input.getData()));
- return (R) null;
- }, null).displayAs("delegate to node", aiFlow.origin(), nodeId);
-
- AiState state = new AiState<>(new State<>(processor, this.flow().origin()), this.flow());
- state.offer(aiFlow);
- return state;
+ return this.delegate(new FlowNodeSupportable<>(aiFlow, nodeId));
}
/**
@@ -531,6 +533,22 @@ public final AiState prompt(PromptTemplate... templates)
}, null).displayAs("prompt"), this.flow().origin()), this.flow());
}
+ /**
+ * 生成大模型阻塞调用节点。
+ *
+ * @param model 表示模型算子实现的 {@link BlockModel}{@code <}{@link M}{@code >}。
+ * @param 表示模型节点的输入数据类型。
+ * @return 表示大模型阻塞调用节点的 {@link AiState}{@code <}{@link ChatMessage}{@code , }{@link D}{@code ,
+ * }{@link O}{@code , }{@link RF}{@code , }{@link F}{@code >}。
+ * @throws IllegalArgumentException 当 {@code model} 为 {@code null} 时。
+ */
+ public AiState generate(BlockModel model) {
+ Validation.notNull(model, "Model operator cannot be null.");
+ return new AiState<>(new State<>(this.publisher()
+ .map(input -> AiFlowSession.applyPattern(model, input.getData(), input.getSession()), null)
+ .displayAs("generate"), this.flow().origin()), this.flow());
+ }
+
/**
* 生成大模型流式调用节点。
*
diff --git a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiState.java b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiState.java
index bb697ed5..1f175ac5 100644
--- a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiState.java
+++ b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/activities/AiState.java
@@ -103,9 +103,12 @@ public Publisher publisher() {
@Override
public void register(EmitterListener handler) {
- if (handler != null) {
- this.state.register(handler);
- }
+ this.state.register(handler);
+ }
+
+ @Override
+ public void unregister(EmitterListener listener) {
+ this.state.unregister(listener);
}
@Override
diff --git a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/flows/AiFlows.java b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/flows/AiFlows.java
index fff32fc5..49ace2aa 100644
--- a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/flows/AiFlows.java
+++ b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/flows/AiFlows.java
@@ -8,6 +8,8 @@
import modelengine.fel.engine.activities.AiDataStart;
import modelengine.fel.engine.activities.AiStart;
+import modelengine.fit.waterflow.domain.context.FlowSession;
+import modelengine.fit.waterflow.domain.emitters.Emitter;
import modelengine.fit.waterflow.domain.flow.Flows;
import modelengine.fit.waterflow.domain.flow.ProcessFlow;
import modelengine.fit.waterflow.domain.states.Start;
@@ -56,4 +58,16 @@ public static AiDataStart flux(D... data) {
AiStart, AiProcessFlow> start = AiFlows.create();
return new AiDataStart<>(start, data);
}
+
+ /**
+ * 通过指定的发射源来构造一个数据前置流。
+ *
+ * @param emitter 表示数据源的 {@link Emitter}{@code <}{@link D}{@code , }{@link FlowSession}{@code >}。
+ * @param 表示数据类型。
+ * @return 表示数据前置流的 {@link AiDataStart}{@code <}{@link D}{@code , }{@link D}{@code , }{@link D}{@code >}。
+ */
+ public static AiDataStart source(Emitter emitter) {
+ AiStart, AiProcessFlow> start = AiFlows.create();
+ return new AiDataStart<>(start, emitter);
+ }
}
diff --git a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/flows/AiProcessFlow.java b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/flows/AiProcessFlow.java
index 40e5abd9..c7d132e9 100644
--- a/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/flows/AiProcessFlow.java
+++ b/framework/fel/java/fel-flow/src/main/java/modelengine/fel/engine/flows/AiProcessFlow.java
@@ -16,6 +16,9 @@
import modelengine.fit.waterflow.domain.stream.reactive.Publisher;
import modelengine.fitframework.util.ObjectUtils;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
/**
* AI 数据处理流程,在 {@link AiFlow} 的基础上增加流程间的数据流转能力,并对外提供对话语义。
*
@@ -26,6 +29,9 @@
*/
public class AiProcessFlow extends AiFlow>
implements EmitterListener, Emitter {
+ private final Map, EmitterListener