
Conversation

Contributor

@kawayiYokami commented Dec 24, 2025

Motivation

To give AstrBot users a more capable AI interaction experience, this PR implements DeepSeek thinking-mode tool calling. The feature lets the model run multiple rounds of reasoning and tool calls before emitting its final answer, which substantially improves its ability to solve complex problems and the accuracy of its answers. It removes the limitation of traditional tool calling, which supports only a single round of reasoning.

Modifications

Core file changes:

  • astrbot/core/provider/deepseek_thinking_manager.py - new thinking session manager
  • astrbot/core/provider/sources/openai_source.py - integrate thinking-mode support
  • astrbot/core/provider/entities.py - extend response entities with thinking state
  • astrbot/core/config/default.py - add thinking-mode configuration options
  • astrbot/core/utils/migra_helper.py - add configuration migration support
  • astrbot/core/provider/__init__.py - export the new thinking manager
  • thinking_assistant_debug.py - complete, production-usable assistant script

Core features implemented (a minimal sketch of the loop follows below):

  1. Multi-round thinking loop - supports the full think → tool call → think again → call tools again cycle
  2. Smart context management - reasoning_content is kept within the same conversation turn and cleared automatically for new questions
  3. Streaming and non-streaming support - thinking mode is fully supported in both modes
  4. Configurable - the feature is controlled by the ds_thinking_tool_call switch
  5. Full debugging support - detailed API call and response information is logged
  6. Follows the official spec - implemented strictly according to the DeepSeek official documentation

  • This is NOT a breaking change.
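For orientation, the feature list above boils down to a loop roughly like the following. This is a minimal sketch, not the actual AstrBot implementation; `thinking_loop`, `call_model`, and `run_tool` are hypothetical stand-ins.

```python
from typing import Callable

# Minimal sketch of the multi-round thinking loop described above;
# call_model and run_tool are hypothetical stand-ins, not AstrBot APIs.
def thinking_loop(
    messages: list[dict],
    call_model: Callable[[list[dict]], dict],  # one chat-completion call
    run_tool: Callable[[dict], str],           # execute a single tool call
    max_rounds: int = 5,
) -> str:
    for _ in range(max_rounds):
        reply = call_model(messages)
        tool_calls = reply.get("tool_calls")
        if not tool_calls:
            return reply.get("content", "")  # final answer ends the loop
        # Keep reasoning_content in context within the same turn so the next
        # round can build on it (it is cleared when a new question starts).
        messages.append({
            "role": "assistant",
            "reasoning_content": reply.get("reasoning_content", ""),
            "tool_calls": tool_calls,
        })
        for call in tool_calls:
            messages.append({
                "role": "tool",
                "tool_call_id": call.get("id", ""),
                "content": run_tool(call),  # feed the result back, then think again
            })
    raise RuntimeError("no final answer within max_rounds")
```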

Screenshots or Test Results

Example test log:

🤖 Question: Compute 200+300, then tell me the weather in Beijing
🧠 Thinking...
----------------------------------------
🔄 Thinking round 1
🤔 Reasoning: The user wants 200+300 computed; I need to use the addition tool...
🛠️ Executing 1 tool:
• add_numbers({'a': 200, 'b': 300})
📤 500
----------------------------------------
🔄 Thinking round 2
🤔 Reasoning: The addition result is 500; now I need the Beijing weather...
🛠️ Executing 1 tool:
• get_weather({'location': '北京', 'date': '2025-12-25'})
📤 Sunny, 8-16°C
----------------------------------------
💬 Answer: Result: 200 + 300 = 500. Beijing is sunny today, 8-16°C, good weather for going out.

Debug file output:

  • Full API request and response JSON is saved under the output/json/ directory
  • Includes complete reasoning_content, tool_calls, and content fields
  • Verifies correct execution of the thinking loop and of context management
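Concretely, dumps like those described above could be produced with a helper along these lines. This is a minimal sketch; the timestamped file-name scheme and helper name are assumptions, only the output/json/ directory comes from this PR.

```python
import json
import time
from pathlib import Path

# Hypothetical debug-dump helper; only the output/json/ directory is from the PR.
def dump_debug_json(kind: str, payload: dict) -> Path:
    out_dir = Path("output/json")
    out_dir.mkdir(parents=True, exist_ok=True)
    path = out_dir / f"{int(time.time() * 1000)}_{kind}.json"
    path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
    return path

# e.g. dump_debug_json("request", call_payload) before the API call and
# dump_debug_json("response", completion.model_dump()) after it.
```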

Checklist

- [x] 😊 If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.
- [x] 👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
- [x] 🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in requirements.txt and pyproject.toml.
- [x] 😮 My changes do not introduce malicious code.

Summary by Sourcery

Add configurable DeepSeek thinking mode with multi-turn tool-calling support to the OpenAI provider, including both streaming and non-streaming flows.

New Features:

  • Introduce DeepSeekThinkingManager and ThinkingSession to manage multi-turn reasoning and tool-call loops for DeepSeek models.
  • Enable DeepSeek models in the OpenAI provider to perform multi-round thinking→tool-call cycles in both sync and streaming query paths when configured.
  • Expose new configuration flags to enable DeepSeek thinking tool calls at provider and provider_settings levels, with automatic migration for existing DeepSeek reasoner configs.
  • Extend LLMResponse to mark thinking chunks, tool results, final answers, and error messages for richer downstream handling.
  • Add a standalone debugging script for experimenting with DeepSeek thinking mode and recording raw request/response JSON.

Enhancements:

  • Export DeepSeekThinkingManager from the provider package for reuse in other components.
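For context, enabling the feature might look roughly like this in a provider entry. Only the ds_thinking_tool_call key itself comes from this PR; the surrounding keys and values are illustrative.

```python
# Hypothetical provider entry; only the ds_thinking_tool_call flag is from this PR.
provider_config = {
    "type": "openai_chat_completion",   # illustrative
    "model": "deepseek-reasoner",
    "ds_thinking_tool_call": True,      # enable the multi-round thinking→tool-call loop
}
```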

- Add the ds_thinking_tool_call switch to the configuration file
- Only takes effect on the OpenAI provider when a DeepSeek model is detected
- Simplified implementation that keeps only the core functionality
- Add configuration migration support that automatically enables the feature for deepseek-reasoner models
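A hedged sketch of that migration idea follows; the function name and config shape are hypothetical, not the actual migra_helper API.

```python
# Illustrative migration sketch (not the real migra_helper code): existing
# deepseek-reasoner configs get the new switch enabled automatically.
def migrate_provider_config(provider: dict) -> dict:
    model = str(provider.get("model", "")).lower()
    if "deepseek-reasoner" in model:
        provider.setdefault("ds_thinking_tool_call", True)
    else:
        provider.setdefault("ds_thinking_tool_call", False)
    return provider
```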
Implement the full streaming thinking loop instead of the earlier simplified handling:

Core functionality:
- New _query_stream_deepseek_thinking method
- Implements the complete streaming thinking → tool-call loop
- Streams thinking content, tool execution results, and the final answer in real time
- Supports streaming reasoning_content back to the caller

User experience:
- Shows the model's reasoning in real time (is_thinking=True)
- Shows tool execution results in real time (is_tool_result=True)
- Clearly marks the final answer (is_final_answer=True)
- Error handling and status hints (is_error=True)

Technical implementation:
- Extends LLMResponse with thinking-mode state fields
- Stream-processes reasoning_content, tool_calls, and content
- Complete tool execution and result feedback
- Session management with automatic cleanup

Streaming and non-streaming modes now both fully support DeepSeek thinking mode!
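A hedged sketch of how a consumer might dispatch on the flags listed above. The flag names mirror this commit message; the iteration API and the completion_text attribute are assumptions, not the confirmed LLMResponse interface.

```python
from typing import Any, AsyncIterator

# Hypothetical consumer of streamed response chunks; attribute names beyond
# the four is_* flags are assumptions.
async def render_stream(chunks: AsyncIterator[Any]) -> None:
    async for chunk in chunks:
        if getattr(chunk, "is_error", False):
            print(f"❌ {chunk.completion_text}")
            break
        if getattr(chunk, "is_thinking", False):
            print(f"🤔 {chunk.completion_text}", end="", flush=True)
        elif getattr(chunk, "is_tool_result", False):
            print(f"\n📤 {chunk.completion_text}")
        elif getattr(chunk, "is_final_answer", False):
            print(f"\n💬 {chunk.completion_text}")
```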
- Replace the MD5 hash with SHA-256 to improve security
- Remove sensitive model-name information from logs
- Resolves high-priority CodeQL security issues
user_content = msg.get("content", "")
break

session_id = hashlib.sha256(user_content.encode()).hexdigest()

Check failure

Code scanning / CodeQL

Use of a broken or weak cryptographic hashing algorithm on sensitive data (High)

Sensitive data (password) is used in a hashing algorithm (SHA256) that is insecure for password hashing, since it is not a computationally expensive hash function.
@kawayiYokami
Contributor Author

This PR needs a lot more testing; it seems DeepSeek is not the only model that supports tool calls while thinking.

@kawayiYokami kawayiYokami marked this pull request as ready for review December 25, 2025 05:42
@kawayiYokami kawayiYokami marked this pull request as draft December 25, 2025 05:43
Contributor

@sourcery-ai bot left a comment

Hey - I've found 6 issues, and left some high level feedback:

  • In both _query_deepseek_thinking and _query_stream_deepseek_thinking, base_payload only carries model/messages/tools, so any other standard parameters from payloads (e.g. temperature, top_p, etc.) are silently dropped; consider building call_payload starting from the original payloads (minus non-defaults) to preserve existing behavior.
  • DeepSeekThinkingManager.get_api_messages is defined to take an initial_messages parameter but is called as get_api_messages(session_id) in openai_source.py, which will raise a TypeError; either make initial_messages optional with a default or update the call sites to pass the initial message list.
  • In the streaming thinking path, errors inside the loop both yield an LLMResponse with is_error=True and then re-raise the exception; consider choosing one mechanism (either structured error responses or exceptions) to avoid surprising consumers that may receive both an error chunk and an unexpected exception.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In both `_query_deepseek_thinking` and `_query_stream_deepseek_thinking`, `base_payload` only carries `model/messages/tools`, so any other standard parameters from `payloads` (e.g. temperature, top_p, etc.) are silently dropped; consider building `call_payload` starting from the original `payloads` (minus non-defaults) to preserve existing behavior.
- `DeepSeekThinkingManager.get_api_messages` is defined to take an `initial_messages` parameter but is called as `get_api_messages(session_id)` in `openai_source.py`, which will raise a `TypeError`; either make `initial_messages` optional with a default or update the call sites to pass the initial message list.
- In the streaming thinking path, errors inside the loop both yield an `LLMResponse` with `is_error=True` and then re-raise the exception; consider choosing one mechanism (either structured error responses or exceptions) to avoid surprising consumers that may receive both an error chunk and an unexpected exception.

## Individual Comments

### Comment 1
<location> `astrbot/core/provider/sources/openai_source.py:219` </location>
<code_context>
+        # 思考循环
+        while session.should_continue():
+            # 获取当前上下文消息
+            current_messages = self.thinking_manager.get_api_messages(session_id)
+            if not current_messages:
+                # 第一次调用,使用原始消息
</code_context>

<issue_to_address>
**issue (bug_risk):** DeepSeekThinkingManager.get_api_messages is called with missing required argument, which will raise at runtime.

`DeepSeekThinkingManager.get_api_messages` is defined as:
```python
def get_api_messages(self, session_id: str, initial_messages: list[dict]) -> list[dict]:
```
In `_query_deepseek_thinking` and `_query_stream_deepseek_thinking` it’s called without `initial_messages`:
```python
current_messages = self.thinking_manager.get_api_messages(session_id)
```
You should pass the original `messages` instead:
```python
current_messages = self.thinking_manager.get_api_messages(session_id, messages)
```
in both call sites.
</issue_to_address>

### Comment 2
<location> `astrbot/core/provider/sources/openai_source.py:189-190` </location>
<code_context>
+                omit_empty_parameter_field=omit_empty_param_field,
+            )
+
+        # 准备基础payload
+        base_payload = {
+            "model": model,
+            "messages": messages,
</code_context>

<issue_to_address>
**issue (bug_risk):** Default model parameters (e.g. temperature, max_tokens) are dropped in thinking mode because they never make it into call_payload.

In `_query_deepseek_thinking`, `base_payload` is built as:
```python
base_payload = {
    "model": model,
    "messages": messages,
    "tools": tool_list if tool_list else None,
}
```
Then you strip non-default keys from `payloads` into `extra_body`, but `call_payload` is always `base_payload.copy()` and never re-merged with `payloads`. As a result, standard params present in `payloads` and `self.default_params` (e.g. `temperature`, `max_tokens`, `top_p`) are ignored for thinking requests.

To avoid this regression vs. the non-thinking `_query` path (which passes `**payloads`), either:
- Rebuild `base_payload` from `payloads` after moving non-default keys, or
- Explicitly copy default params from `payloads` into `base_payload` before the thinking loop.

The same fix is needed in `_query_stream_deepseek_thinking` to keep streaming/non-streaming behavior consistent.
</issue_to_address>

### Comment 3
<location> `astrbot/core/provider/sources/openai_source.py:586-598` </location>
<code_context>
         if isinstance(custom_extra_body, dict):
             extra_body.update(custom_extra_body)

+        # 针对 deepseek 模型的特殊处理:思考中调用工具
+        if "deepseek" in model and self.provider_config.get(
+            "ds_thinking_tool_call", False
+        ):
+            # 为 DeepSeek 模型启用思考模式
+            if "thinking" not in extra_body:
+                extra_body["thinking"] = {"type": "enabled"}
+            logger.debug(
+                "DeepSeek model detected in stream with thinking tool call enabled"
+            )
</code_context>

<issue_to_address>
**suggestion:** The extra DeepSeek-thinking block in the generic stream path appears to be dead code given the earlier thinking-manager routing.

Because `_query_stream` already routes all `is_thinking_enabled(model, self.provider_config)` cases through `_query_stream_deepseek_thinking` and returns immediately, any DeepSeek model with `ds_thinking_tool_call=True` will never reach this later `if "deepseek" in model and self.provider_config.get("ds_thinking_tool_call", False)` block. If you do want a distinct path (e.g., DeepSeek thinking without the manager or under a different flag), the conditions need to differ; otherwise this block can be removed to simplify control flow.

```suggestion
        if isinstance(custom_extra_body, dict):
            extra_body.update(custom_extra_body)
```
</issue_to_address>

### Comment 4
<location> `astrbot/core/provider/sources/openai_source.py:267-276` </location>
<code_context>
+
+                    return final_response
+
+            except Exception as e:
+                logger.error(f"Error in thinking session {session_id}: {e}")
+                self.thinking_manager.cleanup_session(session_id)
+                raise
+
+        # 如果循环结束但没有得到答案,返回错误
</code_context>

<issue_to_address>
**suggestion (bug_risk):** In streaming-thinking error handling you both yield an error LLMResponse and then re-raise, which can surprise consumers.

In `_query_stream_deepseek_thinking`, the pattern of yielding an error `LLMResponse` and then re-raising means callers may see both an error chunk and an exception from the async generator. If they treat `is_error` as terminal, this can cause duplicated or unhandled error handling.

It would be clearer to either (a) yield the error response and not re-raise, or (b) just raise and let callers format errors. Align this with the non-streaming error behavior so the API is predictable.
</issue_to_address>
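For illustration, option (a) might look like the following sketch; ErrorChunk is a hypothetical stand-in for an LLMResponse with is_error=True, and the generator wrapping is an assumption about the surrounding code.

```python
from dataclasses import dataclass
from typing import Any, AsyncIterator

@dataclass
class ErrorChunk:
    # Stand-in for LLMResponse(is_error=True); illustrative only.
    is_error: bool
    completion_text: str

async def yield_terminal_error(inner: AsyncIterator[Any]):
    """Convert exceptions into a single terminal error chunk instead of re-raising."""
    try:
        async for chunk in inner:
            yield chunk
    except Exception as e:
        yield ErrorChunk(is_error=True, completion_text=f"thinking loop failed: {e}")
        # No re-raise: consumers can treat is_error as end-of-stream.
```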

### Comment 5
<location> `thinking_assistant_debug.py:12-17` </location>
<code_context>
+from openai import AsyncOpenAI
+
+# 配置
+API_KEY = "输入你的key"  # 填入你的 DeepSeek API Key
+MODEL = "deepseek-chat"
+BASE_URL = "https://api.deepseek.com"
</code_context>

<issue_to_address>
**🚨 suggestion (security):** Hard-coded API key placeholder is better sourced from environment/config to reduce risk and ease local use.

Even for debug helpers, defining the key in code encourages committing real secrets and makes multi-environment use harder.

Prefer reading from env/config instead, e.g.:
```python
API_KEY = os.getenv("DEEPSEEK_API_KEY")
if not API_KEY:
    print("❌ 请先设置环境变量 DEEPSEEK_API_KEY")
    return
```
This keeps secrets out of the repo and matches typical deployment practices.

```suggestion
from openai import AsyncOpenAI

# 配置
API_KEY = os.getenv("DEEPSEEK_API_KEY")
if not API_KEY:
    raise RuntimeError("❌ 请先设置环境变量 DEEPSEEK_API_KEY 用于访问 DeepSeek 接口")

MODEL = "deepseek-chat"
BASE_URL = "https://api.deepseek.com"
```
</issue_to_address>

### Comment 6
<location> `astrbot/core/provider/sources/openai_source.py:112` </location>
<code_context>
             raise Exception(f"获取模型列表失败:{e}")

     async def _query(self, payloads: dict, tools: ToolSet | None) -> LLMResponse:
+        model = payloads.get("model", "").lower()
+
</code_context>

<issue_to_address>
**issue (complexity):** Consider extracting shared helpers for tools/extra_body setup, DeepSeek session id creation, and thinking-specific initialization to remove duplication and make the DeepSeek paths easier to follow.

You can keep all the new DeepSeek functionality while cutting a lot of duplication and nesting by extracting a few small helpers that both thinking and non‑thinking paths use.

### 1. Deduplicate tool list & extra_body handling

The following logic appears in `_query`, `_query_stream`, `_query_deepseek_thinking`, `_query_stream_deepseek_thinking`:

- Compute `model = payloads["model"].lower()`
- Build tools payload from `ToolSet` with `omit_empty_param_field` for Gemini
- Split `payloads` into `payloads` vs `extra_body` using `self.default_params`
- Merge `custom_extra_body` from `provider_config`
- For DeepSeek thinking: set `extra_body["thinking"]`

You can centralize this into two helpers and reuse everywhere:

```python
def _build_tools_payload(
    self, model: str, tools: ToolSet | None
) -> list | None:
    if not tools:
        return None
    omit_empty_param_field = "gemini" in model.lower()
    return tools.get_func_desc_openai_style(
        omit_empty_parameter_field=omit_empty_param_field,
    ) or None

def _split_payload_and_extra_body(
    self, payloads: dict, *, enable_thinking: bool = False
) -> tuple[dict, dict]:
    base_payload = dict(payloads)  # shallow copy
    extra_body: dict = {}

    # move non-default params into extra_body
    for key in list(base_payload.keys()):
        if key not in self.default_params:
            extra_body[key] = base_payload.pop(key)

    # merge custom_extra_body
    custom_extra_body = self.provider_config.get("custom_extra_body", {})
    if isinstance(custom_extra_body, dict):
        extra_body.update(custom_extra_body)

    if enable_thinking:
        extra_body.setdefault("thinking", {"type": "enabled"})

    return base_payload, extra_body
```

Usage in `_query` becomes very small and makes the DeepSeek difference obvious:

```python
async def _query(self, payloads: dict, tools: ToolSet | None) -> LLMResponse:
    model = payloads.get("model", "").lower()

    if self.thinking_manager.is_thinking_enabled(model, self.provider_config):
        return await self._query_deepseek_thinking(payloads, tools)

    tool_list = self._build_tools_payload(model, tools)
    if tool_list:
        payloads["tools"] = tool_list

    payloads, extra_body = self._split_payload_and_extra_body(payloads)

    completion = await self.client.chat.completions.create(
        **payloads, stream=False, extra_body=extra_body
    )
    ...
```

And in `_query_stream_deepseek_thinking` / `_query_deepseek_thinking` you can do:

```python
tool_list = self._build_tools_payload(model, tools)
base_payload = {
    "model": model,
    "messages": messages,
}
if tool_list:
    base_payload["tools"] = tool_list

base_payload, extra_body = self._split_payload_and_extra_body(
    {**base_payload, **payloads}, enable_thinking=True
)
```

That removes the repeated `to_del` loops and custom_extra_body merging from four places.

### 2. Deduplicate session id generation

Both thinking methods repeat the same “first user message → SHA‑256” logic and import hashlib inside the function. A tiny private helper keeps them focused on orchestration:

```python
import hashlib  # top of file

def _build_thinking_session_id(self, messages: list[dict]) -> str:
    user_content = ""
    for msg in messages:
        if msg.get("role") == "user":
            user_content = msg.get("content", "") or ""
            break
    return hashlib.sha256(user_content.encode("utf-8")).hexdigest()
```

Then both methods reduce to:

```python
model = payloads.get("model", "")
messages = payloads.get("messages", []) or []

session_id = self._build_thinking_session_id(messages)
session = self.thinking_manager.start_new_session(session_id, messages)
```

(If `DeepSeekThinkingManager` prefers `user_content`, you can still pass both or adjust its signature.)

### 3. Extract thinking‑specific setup to a shared helper

`_query_deepseek_thinking` and `_query_stream_deepseek_thinking` share the same setup:

- compute `model`, `messages`
- build `session_id`, start session
- build `tool_list`
- build `base_payload`
- split payloads / extra_body with thinking enabled

You can hide that boilerplate in one helper and make both main methods mostly about the loop and streaming vs non‑stream behavior:

```python
def _prepare_deepseek_thinking(
    self, payloads: dict, tools: ToolSet | None
) -> tuple[str, Any, dict, dict, list | None]:
    model = payloads.get("model", "")
    messages = payloads.get("messages", []) or []

    session_id = self._build_thinking_session_id(messages)
    session = self.thinking_manager.start_new_session(session_id, messages)

    tool_list = self._build_tools_payload(model, tools)

    base_payload = {
        "model": model,
        "messages": messages,
    }
    if tool_list:
        base_payload["tools"] = tool_list

    # merge with original payloads so caller can override defaults if needed
    merged = {**payloads, **base_payload}
    merged, extra_body = self._split_payload_and_extra_body(
        merged, enable_thinking=True
    )

    return session_id, session, merged, extra_body, tool_list
```

Then `_query_deepseek_thinking` shrinks down:

```python
async def _query_deepseek_thinking(
    self, payloads: dict, tools: ToolSet | None
) -> LLMResponse:
    session_id, session, base_payload, extra_body, tool_list = (
        self._prepare_deepseek_thinking(payloads, tools)
    )

    while session.should_continue():
        current_messages = self.thinking_manager.get_api_messages(session_id) or base_payload["messages"]
        call_payload = dict(base_payload, messages=current_messages)

        try:
            completion = await self.client.chat.completions.create(
                **call_payload, stream=False, extra_body=extra_body
            )
            response_dict = completion.model_dump()
            should_continue, tool_calls, final_answer = (
                self.thinking_manager.process_response(session_id, response_dict)
            )
            ...
        except Exception:
            ...
```

And `_query_stream_deepseek_thinking` can reuse the same `session_id, session, base_payload, extra_body, tool_list` without re‑expanding all that logic.

These three helpers keep behavior identical but make the main methods much shorter and more obviously about “thinking orchestration” instead of request munging, which should help the next person reason about and evolve the DeepSeek flow.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.


Fixes:
- Fix default model parameters being dropped in thinking mode
- Fix the DeepSeekThinkingManager.get_api_messages argument mismatch
- Fix the dual error-handling mechanism in streaming thinking mode
- Remove dead code from the stream path
- Move the hashlib import to the top of the file

Resolves the 5 high- and medium-priority issues flagged by Sourcery AI
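For reference, making initial_messages optional could look like this minimal sketch; the sessions mapping is a hypothetical stand-in for the manager's internal state, not the real implementation.

```python
# Illustrative sketch of the optional-argument fix (not the actual manager code);
# sessions maps session_id -> accumulated message list.
def get_api_messages(
    sessions: dict[str, list[dict]],
    session_id: str,
    initial_messages: list[dict] | None = None,
) -> list[dict]:
    stored = sessions.get(session_id)
    if not stored:
        # First round of a thinking loop: fall back to the original messages.
        return list(initial_messages or [])
    return stored
```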
@kawayiYokami
Contributor Author

I'll use it myself for a few weeks first.
