Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
062e6dd
[fel] chore: merge fel codebase
loveTsong Apr 12, 2025
88e5a06
[fel] chore: update pom configuration
loveTsong May 6, 2025
eee38e7
[fel] chore: adapt build directory
loveTsong May 8, 2025
f80f926
[fel] feat: add context passing for agent interface
loveTsong May 8, 2025
afbf98d
[fel] refactor: optimize chat option validation position
loveTsong May 8, 2025
e6eef35
[fel] refactor: replace LLM emitter streaming with reduce
loveTsong May 8, 2025
11a3792
[waterflow] fix: multiple critical issues
loveTsong Mar 26, 2025
384611b
[waterflow] if data is null, the next session is useless.
loveTsong May 15, 2025
748047f
[waterflow] fix fake stream for FitBoundedEmitter
loveTsong May 15, 2025
e216a53
[waterflow] fix the wrong usage of session for the process operator
loveTsong May 15, 2025
7643946
[waterflow] refactor: adapt emitter in SimpleFlowPattern
loveTsong May 15, 2025
0e074dd
[fel] fix: enforce ordering in AbstractFlowPattern sync calls
loveTsong May 16, 2025
ef45ca7
[fel] feat: adapt delegate AiProcessFlow scenarios
loveTsong May 16, 2025
dd4c257
[waterflow] clean processingSessions
loveTsong May 16, 2025
7bc5fa4
[fel] support multi offer for the one conversation
loveTsong May 19, 2025
7876846
[waterflow] fix the acc group order leak
loveTsong May 19, 2025
a47efd3
[waterflow] ensure thread safety for emitter listeners
loveTsong May 20, 2025
cb734d2
[waterflow] replace direct completion check with callback-based windo…
loveTsong May 20, 2025
94583e3
[waterflow] support multi onDoneHandlers for window
loveTsong May 22, 2025
c61cfde
[waterflow] less lock scope for to listeners
loveTsong May 22, 2025
e0507a5
[fel] apply onDone to AbstractFlowPattern
loveTsong May 22, 2025
2a648e2
[fel] enable agent tool call
loveTsong May 23, 2025
36be8db
[fel] remove nonNull
loveTsong May 28, 2025
a6c7dd4
[fel] adapter llm stream output
loveTsong May 28, 2025
31be73f
[waterflow] fix: flatMap-reduce occasionally not terminating
loveTsong May 28, 2025
e707293
[waterflow] chore: remove unused consumeAction
loveTsong May 28, 2025
a8bed1a
[fel] refactor: unify FlowPattern handling and fix listener leaks in …
loveTsong May 28, 2025
fb02892
[fel] remove logs and more
loveTsong May 30, 2025
483b894
[fel] unique h2 version
loveTsong Jun 3, 2025
5ad00d0
[fel] adapter llm chat reasoning_content
loveTsong Jun 3, 2025
37ded1e
[fel] add null check for isLatest field
loveTsong Jun 4, 2025
f3834c8
[waterflow] fix: concurrent resource cleanup at termination nodes
loveTsong May 30, 2025
99bc30f
[waterflow] fix: memory leak in ConditionFrom
loveTsong Jun 3, 2025
06ced6b
[waterflow] remove useless dependency
loveTsong Jun 6, 2025
6a228d9
[waterflow] fix: clean flow locks
loveTsong Jun 6, 2025
dc05d1f
[fel] handle toolCalls when extracting reasoningContent
loveTsong May 30, 2025
f801b57
[fel] should not sync the code node tools code
loveTsong Jun 7, 2025
69c2892
[fel] fix: adapter JacksonObjectSerializer modification
loveTsong Jun 9, 2025
2e5ebc5
[fel] clean code
loveTsong Jun 9, 2025
788fbe2
[waterflow] clean code
loveTsong Jun 10, 2025
cbbdbaa
[waterflow] clean code
loveTsong Jun 10, 2025
59c6aaa
[waterflow] clean code
loveTsong Jun 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion framework/fel/java/fel-community/model-openai/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
<groupId>org.fitframework</groupId>
<artifactId>fit-util</artifactId>
</dependency>
<dependency>
<groupId>org.fitframework.service</groupId>
<artifactId>fit-security</artifactId>
</dependency>

<!-- FEL -->
<dependency>
Expand All @@ -53,6 +57,15 @@
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
</dependency>
<dependency>
<groupId>org.fitframework</groupId>
<artifactId>fit-test-framework</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down Expand Up @@ -90,7 +103,7 @@
<configuration>
<target>
<copy file="${project.build.directory}/${project.build.finalName}.jar"
todir="../../../../fit/java/target/plugins"/>
todir="../../../../../build/plugins"/>
</target>
</configuration>
<goals>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,51 @@
import modelengine.fel.community.model.openai.entity.embed.OpenAiEmbedding;
import modelengine.fel.community.model.openai.entity.embed.OpenAiEmbeddingRequest;
import modelengine.fel.community.model.openai.entity.embed.OpenAiEmbeddingResponse;
import modelengine.fel.community.model.openai.entity.image.OpenAiImageRequest;
import modelengine.fel.community.model.openai.entity.image.OpenAiImageResponse;
import modelengine.fel.community.model.openai.enums.ModelProcessingState;
import modelengine.fel.community.model.openai.util.HttpUtils;
import modelengine.fel.core.chat.ChatMessage;
import modelengine.fel.core.chat.ChatModel;
import modelengine.fel.core.chat.ChatOption;
import modelengine.fel.core.chat.Prompt;
import modelengine.fel.core.chat.support.AiMessage;
import modelengine.fel.core.embed.EmbedModel;
import modelengine.fel.core.embed.EmbedOption;
import modelengine.fel.core.embed.Embedding;
import modelengine.fel.core.image.ImageModel;
import modelengine.fel.core.image.ImageOption;
import modelengine.fel.core.model.http.SecureConfig;
import modelengine.fit.http.client.HttpClassicClient;
import modelengine.fit.http.client.HttpClassicClientFactory;
import modelengine.fit.http.client.HttpClassicClientRequest;
import modelengine.fit.http.client.HttpClassicClientResponse;
import modelengine.fit.http.entity.ObjectEntity;
import modelengine.fit.http.protocol.HttpRequestMethod;
import modelengine.fit.security.Decryptor;
import modelengine.fitframework.annotation.Component;
import modelengine.fitframework.annotation.Fit;
import modelengine.fitframework.conf.Config;
import modelengine.fitframework.exception.FitException;
import modelengine.fitframework.flowable.Choir;
import modelengine.fitframework.ioc.BeanContainer;
import modelengine.fitframework.ioc.BeanFactory;
import modelengine.fitframework.log.Logger;
import modelengine.fitframework.resource.UrlUtils;
import modelengine.fitframework.resource.web.Media;
import modelengine.fitframework.serialization.ObjectSerializer;
import modelengine.fitframework.util.CollectionUtils;
import modelengine.fitframework.util.LazyLoader;
import modelengine.fitframework.util.MapBuilder;
import modelengine.fitframework.util.ObjectUtils;
import modelengine.fitframework.util.StringUtils;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

/**
* 表示 openai 模型服务。
Expand All @@ -48,39 +70,61 @@
* @since 2024-08-07
*/
@Component
public class OpenAiModel implements EmbedModel, ChatModel {
public class OpenAiModel implements EmbedModel, ChatModel, ImageModel {
private static final Logger log = Logger.get(OpenAiModel.class);
private static final Map<String, Boolean> HTTPS_CONFIG_KEY_MAPS = MapBuilder.<String, Boolean>get()
.put("client.http.secure.ignore-trust", Boolean.FALSE)
.put("client.http.secure.ignore-hostname", Boolean.FALSE)
.put("client.http.secure.trust-store-file", Boolean.FALSE)
.put("client.http.secure.trust-store-password", Boolean.TRUE)
.put("client.http.secure.key-store-file", Boolean.FALSE)
.put("client.http.secure.key-store-password", Boolean.TRUE)
.build();
private static final String RESPONSE_TEMPLATE = "<think>{0}<//think>{1}";

private final HttpClassicClientFactory httpClientFactory;
private final HttpClassicClientFactory.Config config;
private final HttpClassicClientFactory.Config clientConfig;
private final String baseUrl;
private final String defaultApiKey;
private final ObjectSerializer serializer;
private final Config config;
private final Decryptor decryptor;
private final LazyLoader<HttpClassicClient> httpClient;

/**
* 创建 openai 嵌入模型服务的实例。
*
* @param httpClientFactory 表示 http 客户端工厂的 {@link HttpClassicClientFactory}。
* @param config 表示 openai http 配置的 {@link OpenAiConfig}。
* @param clientConfig 表示 openai http 配置的 {@link OpenAiConfig}。
* @param serializer 表示对象序列化器的 {@link ObjectSerializer}。
* @param config 表示配置信息的 {@link Config}。
* @param container 表示 bean 容器的 {@link BeanContainer}。
* @throws IllegalArgumentException 当 {@code httpClientFactory}、{@code config} 为 {@code null} 时。
*/
public OpenAiModel(HttpClassicClientFactory httpClientFactory, OpenAiConfig config, ObjectSerializer serializer) {
notNull(config, "The config cannot be null.");
public OpenAiModel(HttpClassicClientFactory httpClientFactory, OpenAiConfig clientConfig,
@Fit(alias = "json") ObjectSerializer serializer, Config config, BeanContainer container) {
notNull(clientConfig, "The config cannot be null.");
this.httpClientFactory = notNull(httpClientFactory, "The http client factory cannot be null.");
this.config = HttpClassicClientFactory.Config.builder()
.connectTimeout(config.getConnectTimeout())
.socketTimeout(config.getReadTimeout())
this.clientConfig = HttpClassicClientFactory.Config.builder()
.connectTimeout(clientConfig.getConnectTimeout())
.socketTimeout(clientConfig.getReadTimeout())
.build();
this.serializer = notNull(serializer, "The serializer cannot be null.");
this.baseUrl = config.getApiBase();
this.defaultApiKey = config.getApiKey();
this.baseUrl = clientConfig.getApiBase();
this.defaultApiKey = clientConfig.getApiKey();
this.httpClient = new LazyLoader<>(this::getHttpClient);
this.config = config;
this.decryptor = container.lookup(Decryptor.class)
.map(BeanFactory::<Decryptor>get)
.orElseGet(() -> encrypted -> encrypted);
}

@Override
public List<Embedding> generate(List<String> inputs, EmbedOption option) {
notEmpty(inputs, "The input cannot be empty.");
notNull(option, "The embed option cannot be null.");
notBlank(option.model(), "The embed model name cannot be null.");
HttpClassicClientRequest request = this.httpClientFactory.create(this.config)
HttpClassicClientRequest request = this.httpClient.get()
.createRequest(HttpRequestMethod.POST, UrlUtils.combine(this.baseUrl, OpenAiApi.EMBEDDING_ENDPOINT));
HttpUtils.setBearerAuth(request, StringUtils.blankIf(option.apiKey(), this.defaultApiKey));
request.jsonEntity(new OpenAiEmbeddingRequest(inputs, option.model()));
Expand All @@ -98,19 +142,61 @@ public List<Embedding> generate(List<String> inputs, EmbedOption option) {
public Choir<ChatMessage> generate(Prompt prompt, ChatOption chatOption) {
notNull(prompt, "The prompt cannot be null.");
notNull(chatOption, "The chat option cannot be null.");
HttpClassicClientRequest request = this.httpClientFactory.create(this.config)
.createRequest(HttpRequestMethod.POST, UrlUtils.combine(this.baseUrl, OpenAiApi.CHAT_ENDPOINT));
String modelSource = StringUtils.blankIf(chatOption.baseUrl(), this.baseUrl);
HttpClassicClientRequest request = this.getHttpClient(chatOption.secureConfig())
.createRequest(HttpRequestMethod.POST, UrlUtils.combine(modelSource, OpenAiApi.CHAT_ENDPOINT));
HttpUtils.setBearerAuth(request, StringUtils.blankIf(chatOption.apiKey(), this.defaultApiKey));
request.jsonEntity(new OpenAiChatCompletionRequest(prompt, chatOption));
return chatOption.stream() ? this.createChatStream(request) : this.createChatCompletion(request);
}

@Override
public List<Media> generate(String prompt, ImageOption option) {
    notNull(prompt, "The prompt cannot be null.");
    notNull(option, "The image option cannot be null.");
    // Per-request base url (if supplied in the option) wins over the globally configured one.
    String endpointBase = StringUtils.blankIf(option.baseUrl(), this.baseUrl);
    String url = UrlUtils.combine(endpointBase, OpenAiApi.IMAGE_ENDPOINT);
    HttpClassicClientRequest request = this.httpClient.get().createRequest(HttpRequestMethod.POST, url);
    // Same precedence for the api key: request option first, then the configured default.
    HttpUtils.setBearerAuth(request, StringUtils.blankIf(option.apiKey(), this.defaultApiKey));
    request.jsonEntity(new OpenAiImageRequest(option.model(), option.size(), prompt));
    try (HttpClassicClientResponse<OpenAiImageResponse> response = request.exchange(OpenAiImageResponse.class)) {
        return response.objectEntity()
                .map(entity -> entity.object().media())
                .orElseThrow(() -> new FitException("The response body is abnormal."));
    } catch (IOException e) {
        // Thrown only while closing the response (try-with-resources), not while reading it.
        throw new IllegalStateException("Failed to close response.", e);
    }
}

private Choir<ChatMessage> createChatStream(HttpClassicClientRequest request) {
AtomicReference<ModelProcessingState> modelProcessingState =
new AtomicReference<>(ModelProcessingState.INITIAL);
return request.<String>exchangeStream(String.class)
.filter(str -> !StringUtils.equals(str, "[DONE]"))
.map(str -> this.serializer.<OpenAiChatCompletionResponse>deserialize(str,
OpenAiChatCompletionResponse.class))
.map(OpenAiChatCompletionResponse::message);
.map(response -> getChatMessage(response, modelProcessingState));
}

/**
 * Adapts one streamed chunk of a reasoning-capable model into a {@link ChatMessage}.
 *
 * <p>Models that emit {@code reasoning_content} stream every reasoning chunk before the first
 * content chunk. A {@code <think>} marker is prepended to the first reasoning chunk and a
 * {@code </think>} marker to the first content chunk, tracked via the shared {@code state}.</p>
 *
 * @param response The deserialized streaming chunk.
 * @param state The mutable phase tracker shared across all chunks of one stream.
 * @return The adapted {@link ChatMessage} for this chunk.
 */
private ChatMessage getChatMessage(OpenAiChatCompletionResponse response,
        AtomicReference<ModelProcessingState> state) {
    String reasoningText = response.reasoningContent().text();
    String contentText = response.message().text();
    ModelProcessingState current = state.get();
    if (current == ModelProcessingState.INITIAL && StringUtils.isNotEmpty(reasoningText)) {
        // First reasoning chunk: open the think section.
        state.set(ModelProcessingState.THINKING);
        return new AiMessage("<think>" + reasoningText, response.message().toolCalls());
    }
    if (current == ModelProcessingState.THINKING && StringUtils.isNotEmpty(contentText)) {
        // First content chunk after reasoning: close the think section.
        state.set(ModelProcessingState.RESPONDING);
        return new AiMessage("</think>" + contentText, response.message().toolCalls());
    }
    if (current == ModelProcessingState.THINKING) {
        // Still inside the reasoning phase; pass the reasoning text through.
        return new AiMessage(reasoningText, response.message().toolCalls());
    }
    // INITIAL without reasoning, or already RESPONDING: forward the message unchanged.
    return response.message();
}

private Choir<ChatMessage> createChatCompletion(HttpClassicClientRequest request) {
Expand All @@ -119,9 +205,64 @@ private Choir<ChatMessage> createChatCompletion(HttpClassicClientRequest request
OpenAiChatCompletionResponse chatCompletionResponse = response.objectEntity()
.map(ObjectEntity::object)
.orElseThrow(() -> new FitException("The response body is abnormal."));
return Choir.just(chatCompletionResponse.message());
String finalMessage = chatCompletionResponse.message().text();
if (StringUtils.isNotBlank(chatCompletionResponse.reasoningContent().text())) {
finalMessage = StringUtils.format(RESPONSE_TEMPLATE,
chatCompletionResponse.reasoningContent().text(),
finalMessage);
}
return Choir.just(new AiMessage(finalMessage, chatCompletionResponse.message().toolCalls()));
} catch (IOException e) {
throw new FitException(e);
}
}

/**
 * Creates an http client from the application-level security configuration.
 *
 * <p>Only the keys listed in {@code HTTPS_CONFIG_KEY_MAPS} that are actually present in the
 * application config are forwarded; entries flagged {@code true} (passwords) are decrypted
 * before use.</p>
 *
 * @return The configured {@link HttpClassicClient}.
 */
private HttpClassicClient getHttpClient() {
    Map<String, Object> custom = new HashMap<>();
    for (Map.Entry<String, Boolean> entry : HTTPS_CONFIG_KEY_MAPS.entrySet()) {
        String sslKey = entry.getKey();
        if (!this.config.keys().contains(Config.canonicalizeKey(sslKey))) {
            continue;
        }
        Object value = this.config.get(sslKey, Object.class);
        if (value == null) {
            // Collectors.toMap throws NullPointerException on null values; a present key with a
            // null value is simply skipped instead of aborting client creation.
            continue;
        }
        if (entry.getValue()) {
            // Sensitive entries (store passwords) are stored encrypted; decrypt before use.
            value = this.decryptor.decrypt(ObjectUtils.cast(value));
        }
        custom.put(sslKey, value);
    }
    return this.httpClientFactory.create(HttpClassicClientFactory.Config.builder()
            .socketTimeout(this.clientConfig.socketTimeout())
            .connectTimeout(this.clientConfig.connectTimeout())
            .custom(custom)
            .build());
}

/**
 * Creates an http client honoring the per-request security configuration.
 *
 * @param secureConfig The per-request TLS settings; when {@code null}, the globally
 * configured client is used instead.
 * @return The configured {@link HttpClassicClient}.
 */
private HttpClassicClient getHttpClient(SecureConfig secureConfig) {
    if (secureConfig == null) {
        // No request-specific security settings: fall back to the global configuration.
        return this.getHttpClient();
    }
    HttpClassicClientFactory.Config requestConfig = HttpClassicClientFactory.Config.builder()
            .socketTimeout(this.clientConfig.socketTimeout())
            .connectTimeout(this.clientConfig.connectTimeout())
            .custom(this.buildHttpsConfig(secureConfig))
            .build();
    return this.httpClientFactory.create(requestConfig);
}

/**
 * Translates a {@link SecureConfig} into the client factory's custom config map.
 *
 * <p>Only non-null settings are emitted, so absent fields fall back to the client defaults.</p>
 *
 * @param secureConfig The per-request TLS settings; must not be {@code null}.
 * @return The map of {@code client.http.secure.*} entries.
 */
private Map<String, Object> buildHttpsConfig(SecureConfig secureConfig) {
    Object[][] entries = {
            {"client.http.secure.ignore-trust", secureConfig.ignoreTrust()},
            {"client.http.secure.ignore-hostname", secureConfig.ignoreHostName()},
            {"client.http.secure.trust-store-file", secureConfig.trustStoreFile()},
            {"client.http.secure.trust-store-password", secureConfig.trustStorePassword()},
            {"client.http.secure.key-store-file", secureConfig.keyStoreFile()},
            {"client.http.secure.key-store-password", secureConfig.keyStorePassword()},
    };
    Map<String, Object> httpsConfig = new HashMap<>();
    for (Object[] entry : entries) {
        if (entry[1] != null) {
            httpsConfig.put((String) entry[0], entry[1]);
        }
    }
    return httpsConfig;
}

/**
 * Puts {@code value} into {@code result} under {@code key}, ignoring {@code null} values.
 *
 * @param value The value to store; {@code null} means "not configured" and is skipped.
 * @param key The config key to store under.
 * @param result The target map; must not be {@code null}.
 */
private static void putConfigIfNotNull(Object value, String key, Map<String, Object> result) {
    if (value == null) {
        return;
    }
    result.put(key, value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ public interface OpenAiApi {
*/
String EMBEDDING_ENDPOINT = "/embeddings";

/**
 * The endpoint for image generation requests.
 */
String IMAGE_ENDPOINT = "/images/generations";

/**
* 请求头模型密钥字段。
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@

package modelengine.fel.community.model.openai.entity.chat;

import static modelengine.fitframework.util.ObjectUtils.cast;

import modelengine.fel.core.chat.ChatMessage;
import modelengine.fel.core.chat.support.AiMessage;
import modelengine.fel.core.tool.ToolCall;
Expand All @@ -16,7 +14,10 @@
import modelengine.fitframework.util.CollectionUtils;
import modelengine.fitframework.util.StringUtils;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

/**
* OpenAi API 格式的会话补全响应。
Expand All @@ -36,18 +37,37 @@ public class OpenAiChatCompletionResponse {
* @return 表示模型回复的 {@link ChatMessage}。
*/
public ChatMessage message() {
    // Reuse the shared extraction logic, reading the regular content field of the first choice.
    return extractMessage(OpenAiChatMessage::content, OpenAiChatMessage::toolCalls);
}

/**
 * Gets the model reasoning from the response.
 *
 * @return The {@link ChatMessage} carrying the {@code reasoning_content} of the first choice;
 * its text is empty when the model emitted no reasoning.
 */
public ChatMessage reasoningContent() {
    return extractMessage(OpenAiChatMessage::reasoningContent, OpenAiChatMessage::toolCalls);
}

/**
 * Extracts a {@link ChatMessage} from the first choice of the response.
 *
 * <p>The span as pasted contained both the pre- and post-refactor declarations of
 * {@code content}/{@code toolCalls}; this is the single, Optional-based version.</p>
 *
 * @param contentExtractor Reads the text field to expose (content or reasoning content).
 * @param toolCallsExtractor Reads the tool calls of the message.
 * @return The extracted message; {@code EMPTY_RESPONSE} when there is no usable choice.
 */
private ChatMessage extractMessage(
        Function<OpenAiChatMessage, Object> contentExtractor,
        Function<OpenAiChatMessage, List<ToolCall>> toolCallsExtractor) {
    if (CollectionUtils.isEmpty(choices)) {
        return EMPTY_RESPONSE;
    }
    OpenAiChatMessage openAiChatMessage = choices.get(0).message;
    if (openAiChatMessage == null) {
        return EMPTY_RESPONSE;
    }

    // Non-string content (e.g. multimodal parts) is normalized to an empty string here.
    String content = Optional.ofNullable(contentExtractor.apply(openAiChatMessage))
            .filter(obj -> obj instanceof String)
            .map(obj -> (String) obj)
            .orElse(StringUtils.EMPTY);

    List<ToolCall> toolCalls = Optional.ofNullable(toolCallsExtractor.apply(openAiChatMessage))
            .orElse(Collections.emptyList());

    return new AiMessage(content, toolCalls);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public class OpenAiChatMessage {
private String toolCallId;
@Property(name = "tool_calls")
private List<OpenAiToolCall> toolCalls;
@Property(name = "reasoning_content")
private String reasoningContent;

/**
* 将 {@link ChatMessage} 对象转换为 {@link OpenAiChatMessage} 对象。
Expand Down Expand Up @@ -79,6 +81,15 @@ public Object content() {
return this.content;
}

/**
 * Gets the model reasoning content.
 *
 * @return The reasoning content as a {@link String}; may be {@code null} when the model
 * response carries no {@code reasoning_content} field — callers should guard.
 */
public String reasoningContent() {
    return this.reasoningContent;
}

/**
* 获取消息的工具调用。
*
Expand Down
Loading