-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Feat/auto compress #4178
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Feat/auto compress #4178
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey - 我发现了 6 个问题,并给出了一些整体层面的反馈:
- ContextManager 会在第一条消息上打上
_needs_compression、_initial_token_count等内部标记,但之后从不清理;建议把这类“账本/元数据”从用户可见的消息负载中剥离出去(例如单独跟踪这些状态,或者在返回前移除这些 key),以避免向下游组件泄露实现细节。 - 当
model_context_limit为-1时,ContextManager.process会直接短路返回,不仅跳过了基于 token 的压缩,也跳过了诸如连续消息合并、移除未配对 tool-call 等有用的清理步骤;建议拆分这些阶段,即便禁用了基于 token 的压缩,清理和截断流程仍然可以继续执行。 - 自动从
LLM_METADATAS填充max_context_length的逻辑在update_provider和create_provider中都各写了一遍;建议提取成一个小的辅助函数以减少重复,并降低未来修改这块行为时出错的概率。
给 AI Agent 的提示词
Please address the comments from this code review:
## Overall Comments
- ContextManager marks messages with internal flags like `_needs_compression` and `_initial_token_count` on the first message and never strips them out; consider keeping such bookkeeping out of the user-visible payload (e.g., track this state separately or remove the keys before returning) to avoid leaking implementation details to downstream components.
- When `model_context_limit` is `-1`, `ContextManager.process` short-circuits and skips not only token-based compression but also useful cleanups like consecutive-message merging and unpaired tool-call removal; consider splitting these phases so that cleanup and truncation can still run even when token-based compression is disabled.
- The logic to auto-fill `max_context_length` from `LLM_METADATAS` is duplicated in both `update_provider` and `create_provider`; extracting this into a small helper function would reduce repetition and make future changes to that behavior less error-prone.
## Individual Comments
### Comment 1
<location> `astrbot/core/context_manager/context_truncator.py:51-60` </location>
<code_context>
+
+ return result
+
+ def truncate_by_count(
+ self, messages: list[dict[str, Any]], max_messages: int
+ ) -> list[dict[str, Any]]:
+ """
+ 按数量截断:只保留最近的X条消息
+
+ 规则:
+ - 保留系统消息(如果存在)
+ - 保留最近的max_messages条消息
+
+ Args:
+ messages: 原始消息列表
+ max_messages: 最大保留消息数
+
+ Returns:
+ 截断后的消息列表
+ """
+ if len(messages) <= max_messages:
+ return messages
+
+ # 分离系统消息和其他消息
+ system_msgs = [m for m in messages if m.get("role") == "system"]
+ other_msgs = [m for m in messages if m.get("role") != "system"]
+
+ # 保留最近的消息
+ kept_other = other_msgs[-(max_messages - len(system_msgs)) :]
+
+ return system_msgs + kept_other
</code_context>
<issue_to_address>
**issue (bug_risk):** truncate_by_count 在系统消息较多时可能返回超过 max_messages 的结果。
当 `len(system_msgs) >= max_messages` 时,`max_messages - len(system_msgs)` 为 0 或负数,因此 `other_msgs[-(max_messages - len(system_msgs)):]` 仍然会返回元素,最终列表长度可能超过 `max_messages`。为了遵守 `max_messages` 的约定,建议在这种情况下直接短路返回(例如 `return system_msgs[-max_messages:]`),并且仅在 `len(system_msgs) < max_messages` 时才计算 `kept_other`。
</issue_to_address>
### Comment 2
<location> `tests/agent/runners/test_todolist_injection.py:94-95` </location>
<code_context>
+ )
+
+
+class TestTodoListInjection:
+ """测试 TodoList 注入逻辑"""
+
+ def setup_method(self):
</code_context>
<issue_to_address>
**suggestion (testing):** 建议为 `TodoListAddTool` 和 `TodoListUpdateTool` 本身补充单元测试。
当前测试只覆盖了 todo 列表是如何被注入到消息流中的逻辑,并未覆盖这些工具是如何修改 `AstrAgentContext.todolist` 的行为。请补充测试以:
- 断言在列表为空和非空时的 ID 分配行为;
- 覆盖错误路径(新增时缺失/空的 `tasks`;更新时传入不存在的 `task_id`);
- 校验状态字段处理以及可选 description 更新。
这些测试可以放在类似 `tests/agent/tools/test_todolist_tool.py` 的独立模块中,用来防止工具本身的回归影响整个 todo 流程。
Suggested implementation:
```python
import pytest
from tests.agent.runners.test_todolist_injection import MockAstrAgentContext
# NOTE: Adjust the import path below to match the actual location of the tools.
from astrbot.agent.tools.todolist import TodoListAddTool, TodoListUpdateTool # type: ignore[import]
class TestTodoListAddTool:
"""TodoListAddTool 行为测试"""
def setup_method(self):
self.context = MockAstrAgentContext()
# 确保每个测试都从干净的 todolist 开始
self.context.todolist = []
self.tool = TodoListAddTool()
def _run_add_tool(self, **params):
"""
封装工具调用,便于后续统一修改调用方式。
根据实际实现,可能需要改成:
- self.tool(self.context, **params)
- self.tool.run(self.context, **params)
- self.tool.invoke(params, self.context)
等。
"""
return self.tool.run(self.context, **params)
def test_assign_ids_when_list_empty(self):
"""当 todolist 为空时,新增任务应从 ID=1 开始递增"""
result = self._run_add_tool(
tasks=[
{"description": "task 1", "status": "todo"},
{"description": "task 2", "status": "doing"},
]
)
# 校验上下文内的 todolist
assert len(self.context.todolist) == 2
assert self.context.todolist[0]["id"] == 1
assert self.context.todolist[0]["description"] == "task 1"
assert self.context.todolist[0]["status"] == "todo"
assert self.context.todolist[1]["id"] == 2
assert self.context.todolist[1]["description"] == "task 2"
assert self.context.todolist[1]["status"] == "doing"
# 如果工具有返回值,也一并校验
assert result is None or isinstance(result, dict)
def test_assign_ids_when_list_non_empty(self):
"""当 todolist 非空时,应在现有最大 ID 基础上递增"""
# 预置已有任务
self.context.todolist = [
{"id": 1, "description": "existing 1", "status": "todo"},
{"id": 3, "description": "existing 3", "status": "done"},
]
self._run_add_tool(
tasks=[
{"description": "new 1", "status": "todo"},
{"description": "new 2", "status": "todo"},
]
)
# 最大 ID 为 3,新任务应为 4,5
assert len(self.context.todolist) == 4
new_tasks = self.context.todolist[-2:]
assert new_tasks[0]["id"] == 4
assert new_tasks[0]["description"] == "new 1"
assert new_tasks[0]["status"] == "todo"
assert new_tasks[1]["id"] == 5
assert new_tasks[1]["description"] == "new 2"
assert new_tasks[1]["status"] == "todo"
def test_add_tool_error_when_tasks_missing(self):
"""缺少 tasks 参数时应走错误分支"""
with pytest.raises((ValueError, KeyError, TypeError)):
# 不传 tasks
self._run_add_tool()
def test_add_tool_error_when_tasks_empty(self):
"""tasks 为空列表时应走错误分支"""
with pytest.raises(ValueError):
self._run_add_tool(tasks=[])
class TestTodoListUpdateTool:
"""TodoListUpdateTool 行为测试"""
def setup_method(self):
self.context = MockAstrAgentContext()
self.context.todolist = [
{"id": 1, "description": "task 1", "status": "todo"},
{"id": 2, "description": "task 2", "status": "doing"},
]
self.tool = TodoListUpdateTool()
def _run_update_tool(self, **params):
"""
封装工具调用,便于后续统一修改调用方式。
根据实际实现,可能需要改成:
- self.tool(self.context, **params)
- self.tool.run(self.context, **params)
- self.tool.invoke(params, self.context)
等。
"""
return self.tool.run(self.context, **params)
def test_update_status_and_description(self):
"""可以同时更新任务状态和描述"""
result = self._run_update_tool(
task_id=1,
status="done",
description="task 1 updated",
)
task = next(t for t in self.context.todolist if t["id"] == 1)
assert task["status"] == "done"
assert task["description"] == "task 1 updated"
assert result is None or isinstance(result, dict)
def test_update_only_status_keeps_description(self):
"""仅更新状态时应保留原 description 不变"""
original_desc = self.context.todolist[1]["description"]
self._run_update_tool(
task_id=2,
status="done",
)
task = next(t for t in self.context.todolist if t["id"] == 2)
assert task["status"] == "done"
# 未传 description,应保持原值
assert task["description"] == original_desc
def test_update_only_description_keeps_status(self):
"""仅更新描述时应保留原 status 不变"""
original_status = self.context.todolist[0]["status"]
self._run_update_tool(
task_id=1,
description="only description updated",
)
task = next(t for t in self.context.todolist if t["id"] == 1)
assert task["description"] == "only description updated"
assert task["status"] == original_status
def test_update_non_existent_task_id_raises(self):
"""更新不存在的 task_id 时应走错误分支"""
with pytest.raises((ValueError, LookupError, KeyError)):
self._run_update_tool(
task_id=999,
status="done",
)
@pytest.mark.parametrize("status", ["todo", "doing", "done"])
def test_accepts_valid_status_values(self, status):
"""验证常见状态值可以被接受并写入"""
self._run_update_tool(
task_id=1,
status=status,
)
task = next(t for t in self.context.todolist if t["id"] == 1)
assert task["status"] == status
```
1. 将 `from astrbot.agent.tools.todolist import TodoListAddTool, TodoListUpdateTool` 的导入路径改成项目实际路径(例如 `astrbot.core.agent.tools.todolist` 等)。
2. 根据工具的真实接口,调整 `_run_add_tool` 和 `_run_update_tool` 中对工具的调用方式(目前假定为 `tool.run(context, **params)`)。如果工具是可调用对象或有 `invoke/__call__` 等,请相应替换。
3. 如果 `MockAstrAgentContext` 的定义不在 `tests.agent.runners.test_todolist_injection` 中,请将导入改为实际位置,或改用正式的 `AstrAgentContext` 构造测试用例。
4. 如工具在错误路径下不是通过抛异常,而是返回错误对象/状态码,请将 `pytest.raises(...)` 断言替换成对返回值的断言,以匹配实际实现。
</issue_to_address>
### Comment 3
<location> `tests/core/test_context_manager.py:193-202` </location>
<code_context>
+ def test_truncate_by_count_exceeds_limit(self):
</code_context>
<issue_to_address>
**issue (testing):** 为 `truncate_by_count` 补充 `max_messages` 小于或等于系统消息数量时的测试用例。
当前测试没有覆盖 `max_messages` 小于或等于 `system` 消息数量这一情况,在这种情况下 `max_messages - len(system_msgs)` 可能为 0 或负数,切片逻辑很容易写错。请添加一个测试:构造多个 `system` 消息,并分别将 `max_messages` 设为 0、1 以及恰好等于系统消息数量,以验证该行为。
</issue_to_address>
### Comment 4
<location> `tests/core/test_context_manager_integration.py:303-312` </location>
<code_context>
+ """测试:LLM API调用失败时的错误处理"""
+ provider = MockProvider()
+
+ from astrbot.core.context_manager.context_compressor import LLMSummaryCompressor
+
+ compressor = LLMSummaryCompressor(provider, keep_recent=2)
+
+ messages = [{"role": "user", "content": f"Message {i}"} for i in range(10)]
+
+ with patch.object(
+ provider, "text_chat", side_effect=Exception("API Error")
+ ) as mock_call:
+ result = await compressor.compress(messages)
+
+ # API失败时应该返回原始消息
+ assert result == messages
+ mock_call.assert_called_once()
+
+
+if __name__ == "__main__":
+ pytest.main([__file__, "-v", "-s"])
</code_context>
<issue_to_address>
**nitpick (testing):** LLM 摘要相关测试与 `summary_prompt.md` 中的具体文案绑定得过于紧密,建议放宽断言条件。
目前有若干测试直接断言 instruction 和 summary 中的具体中文前缀(例如 `"请基于我们完整的对话记录"`、`"历史会话摘要"`),这会导致任何对 `summary_prompt.md` 文案的修改都容易破坏测试。更稳健的做法是继续断言结构行为(例如额外的用户指令消息和系统摘要消息存在),但放宽对内容的检查(例如只要求非空、有调用并插入 summary 工具的结果),或者把对精确字符串的检查集中封装在一个辅助函数中,这样调整 prompt 文案时,不必修改大量测试用例。
</issue_to_address>
### Comment 5
<location> `showcase_features.py:193` </location>
<code_context>
+# ============ ContextManager 模拟实现 ============
+
+
+class MockContextManager:
+ """模拟 ContextManager 的核心逻辑"""
+
</code_context>
<issue_to_address>
**issue (complexity):** 建议在这个演示脚本中,用对真实生产组件的轻量封装来替代重新实现的上下文管理、todo 列表和最大步数逻辑,以避免重复业务规则。
主要的复杂度问题在于,该脚本重新实现了核心逻辑(上下文管理、token 阈值、压缩、todo 列表注入、最大步数注入),而不是复用已有的、已经过测试的组件。这会制造一套平行的心智模型,并且后续很容易产生偏差。
你可以在保留全部演示行为的前提下,通过以下方式简化:
1. **使用真实的 ContextManager 替代 `MockContextManager`,当前文件只做简单封装**
保留本文件中的打印/日志逻辑,但在 token 计数和压缩决策上复用真正的实现。
```python
# from astrbot.core.context_manager import ContextManager # adjust import path
class DemoContextManager:
"""Thin wrapper that only adds logging around the real ContextManager."""
def __init__(self, model_context_limit: int, is_agent_mode: bool = False, provider=None):
self.cm = ContextManager(
model_context_limit=model_context_limit,
is_agent_mode=is_agent_mode,
provider=provider,
)
async def process_context(self, messages: list[dict]) -> list[dict]:
total_tokens = self.cm.count_tokens(messages)
usage_rate = total_tokens / self.cm.model_context_limit
print(f" 初始Token数: {total_tokens}")
print(f" 上下文限制: {self.cm.model_context_limit}")
print(f" 使用率: {usage_rate:.2%}")
print(f" 触发阈值: {self.cm.threshold:.0%}")
# delegate real logic
result = await self.cm.process_context(messages)
print("\n【输出】压缩后的消息历史:")
print_messages(result, indent=1)
return result
```
然后在 `demo_context_manager()` 中:
```python
async def demo_context_manager():
print_separator("DEMO 1: ContextManager Workflow")
print_subsection("Agent模式(触发摘要压缩)")
print("【输入】完整消息历史:")
print_messages(LONG_MESSAGE_HISTORY, indent=1)
print("\n【处理】执行 ContextManager.process_context (AGENT 模式):")
mock_provider = MagicMock()
cm_agent = DemoContextManager(
model_context_limit=150, is_agent_mode=True, provider=mock_provider
)
result_agent = await cm_agent.process_context(LONG_MESSAGE_HISTORY)
```
这样既保留了相同的打印说明,又去掉了重复的 token 计数、阈值和压缩策略逻辑。
2. **复用真实的 todo 列表注入/格式化逻辑**
不要再维护一套 `format_todolist` + `inject_todolist_to_messages` 的实现,而是调用核心 helper,只在外面包一层用于打印的封装。
```python
# from astrbot.core.todo import inject_todolist, format_todolist # adjust to real modules
def demo_inject_todolist_to_messages(
messages: list[dict],
todolist: list[dict],
max_tool_calls: int | None = None,
current_tool_calls: int | None = None,
) -> list[dict]:
"""Thin demo wrapper adding logs around real injection."""
print("\n【输入】TodoList:")
for task in todolist:
status_icon = {"pending": "[ ]", "in_progress": "[-]", "completed": "[x]"}.get(
task["status"], "[ ]"
)
print(f" {status_icon} #{task['id']}: {task['description']}")
result = inject_todolist(
messages=messages,
todolist=todolist,
max_tool_calls=max_tool_calls,
current_tool_calls=current_tool_calls,
)
print("\n【输出】注入后的消息历史:")
print_messages(result, indent=1)
return result
```
然后在 `demo_todolist_injection()` 中,把对 `inject_todolist_to_messages` 的调用替换为 `demo_inject_todolist_to_messages`,从而展示真实注入行为。
3. **集中处理最大步数的智能注入逻辑**
`demo_max_step_smart_injection` 中的逻辑本质上就是手写的 `_smart_inject_user_message`/“最大步数注入”行为。如果生产代码中已经存在类似逻辑,建议直接封装调用,而不是在此重复规则。
```python
# from astrbot.core.agent_runner import smart_inject_user_message # adjust import
def demo_max_step_injection(messages: list[dict], max_step_message: str) -> list[dict]:
"""Use real smart-injection and only add console explanation."""
print("\n【处理】模拟工具耗尽,执行智能注入逻辑...")
result = smart_inject_user_message(messages, max_step_message)
print("\n【输出】智能注入后的消息历史:")
print_messages(result, indent=1)
return result
```
然后在 `demo_max_step_smart_injection()` 中:
```python
result_a = demo_max_step_injection(messages_with_user, max_step_message)
...
result_b = demo_max_step_injection(messages_with_tool, max_step_message)
```
4. **仅在本文件中保留演示相关的逻辑**
各种打印 helper(`print_separator`、`print_subsection`、`print_messages`)以及示例数据都可以保留在这里;关键的简化点是:**不要在这里重新表达业务规则**(token 统计、压缩、注入),而是用增加日志的方式去包装真实组件。
这些改动可以保持所有演示行为(以及可见输出)不变,同时消除平行实现,使脚本更易维护,并在核心逻辑未来变更时自动保持一致。
</issue_to_address>
### Comment 6
<location> `astrbot/core/context_manager/context_manager.py:63` </location>
<code_context>
+ if self.model_context_limit == -1:
+ return messages
+
+ # 阶段1:Token初始统计
+ messages = await self._initial_token_check(messages)
+
</code_context>
<issue_to_address>
**issue (complexity):** 建议通过在本地计算 token 使用情况并内联一些简单阶段,来简化主处理流程,避免在消息上存储内部标记并通过多个小包装方法“跳转”。
你可以保持整体流程不变,但避免通过修改 `messages[0]` 以隐藏 flag 的方式来存状态,并减少主路径上的间接调用。
### 1. 避免在消息上存 `_needs_compression` 和 `_initial_token_count`
相比把内部状态编码到第一条消息上,更推荐直接计算 token 使用情况,并通过局部变量传递。这样可以保持外部数据结构的干净,也能让 `_run_compression` 更容易理解。
重构示例(不改变行为):
```python
async def process(
self, messages: list[dict[str, Any]], max_messages_to_keep: int = 20
) -> list[dict[str, Any]]:
if self.model_context_limit == -1 or not messages:
return messages
total_tokens = self.token_counter.count_tokens(messages)
usage_rate = total_tokens / self.model_context_limit
needs_compression = usage_rate > self.threshold
if needs_compression:
messages = await self._run_compression(messages)
messages = await self._run_final_processing(messages, max_messages_to_keep)
return messages
```
然后可以简化 `_run_compression`,使其不再依赖隐藏标记:
```python
async def _run_compression(
self, messages: list[dict[str, Any]]
) -> list[dict[str, Any]]:
# Agent模式:先摘要,再判断
messages = await self._compress_by_summarization(messages)
tokens_after_summary = self.token_counter.count_tokens(messages)
if tokens_after_summary / self.model_context_limit > self.threshold:
messages = self._compress_by_halving(messages)
return messages
```
这样就可以完全去掉 `_initial_token_check`,因为它的逻辑已被内联到 `process` 中并通过局部变量保存。
### 2. 内联一些简单阶段包装,减少跳转
如果你决定保留 `_run_final_processing`,也可以视情况合并一些非常简单的阶段包装函数,使主流程在一个函数中就能一眼看懂。
例如,可以把 `_run_final_processing` 逻辑内联到 `process`(保留现有的复杂 helper 方法):
```python
async def process(
self, messages: list[dict[str, Any]], max_messages_to_keep: int = 20
) -> list[dict[str, Any]]:
if self.model_context_limit == -1 or not messages:
return messages
total_tokens = self.token_counter.count_tokens(messages)
usage_rate = total_tokens / self.model_context_limit
needs_compression = usage_rate > self.threshold
if needs_compression:
messages = await self._run_compression(messages)
# final processing steps are linear and small, so keep them visible here
messages = self._merge_consecutive_messages(messages)
messages = self._cleanup_unpaired_tool_calls(messages)
messages = self.truncator.truncate_by_count(messages, max_messages_to_keep)
return messages
```
这样可以把“token 检查 → 视情况压缩 → 末尾处理”这一概念流程集中在一个函数中,同时仍然复用现有的复杂 helper(`_merge_consecutive_messages`、`_cleanup_unpaired_tool_calls`)。
</issue_to_address>帮我变得更有用!请在每条评论上点 👍 或 👎,我会根据你的反馈改进后续评审。
Original comment in English
Hey - I've found 6 issues, and left some high level feedback:
- ContextManager marks messages with internal flags like
_needs_compressionand_initial_token_counton the first message and never strips them out; consider keeping such bookkeeping out of the user-visible payload (e.g., track this state separately or remove the keys before returning) to avoid leaking implementation details to downstream components. - When
model_context_limitis-1,ContextManager.processshort-circuits and skips not only token-based compression but also useful cleanups like consecutive-message merging and unpaired tool-call removal; consider splitting these phases so that cleanup and truncation can still run even when token-based compression is disabled. - The logic to auto-fill
max_context_lengthfromLLM_METADATASis duplicated in bothupdate_providerandcreate_provider; extracting this into a small helper function would reduce repetition and make future changes to that behavior less error-prone.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- ContextManager marks messages with internal flags like `_needs_compression` and `_initial_token_count` on the first message and never strips them out; consider keeping such bookkeeping out of the user-visible payload (e.g., track this state separately or remove the keys before returning) to avoid leaking implementation details to downstream components.
- When `model_context_limit` is `-1`, `ContextManager.process` short-circuits and skips not only token-based compression but also useful cleanups like consecutive-message merging and unpaired tool-call removal; consider splitting these phases so that cleanup and truncation can still run even when token-based compression is disabled.
- The logic to auto-fill `max_context_length` from `LLM_METADATAS` is duplicated in both `update_provider` and `create_provider`; extracting this into a small helper function would reduce repetition and make future changes to that behavior less error-prone.
## Individual Comments
### Comment 1
<location> `astrbot/core/context_manager/context_truncator.py:51-60` </location>
<code_context>
+
+ return result
+
+ def truncate_by_count(
+ self, messages: list[dict[str, Any]], max_messages: int
+ ) -> list[dict[str, Any]]:
+ """
+ 按数量截断:只保留最近的X条消息
+
+ 规则:
+ - 保留系统消息(如果存在)
+ - 保留最近的max_messages条消息
+
+ Args:
+ messages: 原始消息列表
+ max_messages: 最大保留消息数
+
+ Returns:
+ 截断后的消息列表
+ """
+ if len(messages) <= max_messages:
+ return messages
+
+ # 分离系统消息和其他消息
+ system_msgs = [m for m in messages if m.get("role") == "system"]
+ other_msgs = [m for m in messages if m.get("role") != "system"]
+
+ # 保留最近的消息
+ kept_other = other_msgs[-(max_messages - len(system_msgs)) :]
+
+ return system_msgs + kept_other
</code_context>
<issue_to_address>
**issue (bug_risk):** truncate_by_count can exceed max_messages when there are many system messages.
When `len(system_msgs) >= max_messages`, `max_messages - len(system_msgs)` is zero or negative, so `other_msgs[-(max_messages - len(system_msgs)):]` still returns elements and the final list can exceed `max_messages`. To preserve the `max_messages` contract, short-circuit in this case (e.g. `return system_msgs[-max_messages:]`) and only compute `kept_other` when `len(system_msgs) < max_messages`.
</issue_to_address>
### Comment 2
<location> `tests/agent/runners/test_todolist_injection.py:94-95` </location>
<code_context>
+ )
+
+
+class TestTodoListInjection:
+ """测试 TodoList 注入逻辑"""
+
+ def setup_method(self):
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding unit tests for `TodoListAddTool` and `TodoListUpdateTool` to cover the new tools themselves.
The current tests only cover how the todo list is injected into the message stream, not the behavior of the tools that mutate `AstrAgentContext.todolist`. Please add tests that:
- assert ID assignment when the list is empty vs. non‑empty,
- exercise error paths (`tasks` missing/empty for add; non‑existent `task_id` for update),
- validate status handling and optional description updates.
These can go in a dedicated module like `tests/agent/tools/test_todolist_tool.py` to protect the todo flow against regressions in the tools themselves.
Suggested implementation:
```python
import pytest
from tests.agent.runners.test_todolist_injection import MockAstrAgentContext
# NOTE: Adjust the import path below to match the actual location of the tools.
from astrbot.agent.tools.todolist import TodoListAddTool, TodoListUpdateTool # type: ignore[import]
class TestTodoListAddTool:
"""TodoListAddTool 行为测试"""
def setup_method(self):
self.context = MockAstrAgentContext()
# 确保每个测试都从干净的 todolist 开始
self.context.todolist = []
self.tool = TodoListAddTool()
def _run_add_tool(self, **params):
"""
封装工具调用,便于后续统一修改调用方式。
根据实际实现,可能需要改成:
- self.tool(self.context, **params)
- self.tool.run(self.context, **params)
- self.tool.invoke(params, self.context)
等。
"""
return self.tool.run(self.context, **params)
def test_assign_ids_when_list_empty(self):
"""当 todolist 为空时,新增任务应从 ID=1 开始递增"""
result = self._run_add_tool(
tasks=[
{"description": "task 1", "status": "todo"},
{"description": "task 2", "status": "doing"},
]
)
# 校验上下文内的 todolist
assert len(self.context.todolist) == 2
assert self.context.todolist[0]["id"] == 1
assert self.context.todolist[0]["description"] == "task 1"
assert self.context.todolist[0]["status"] == "todo"
assert self.context.todolist[1]["id"] == 2
assert self.context.todolist[1]["description"] == "task 2"
assert self.context.todolist[1]["status"] == "doing"
# 如果工具有返回值,也一并校验
assert result is None or isinstance(result, dict)
def test_assign_ids_when_list_non_empty(self):
"""当 todolist 非空时,应在现有最大 ID 基础上递增"""
# 预置已有任务
self.context.todolist = [
{"id": 1, "description": "existing 1", "status": "todo"},
{"id": 3, "description": "existing 3", "status": "done"},
]
self._run_add_tool(
tasks=[
{"description": "new 1", "status": "todo"},
{"description": "new 2", "status": "todo"},
]
)
# 最大 ID 为 3,新任务应为 4,5
assert len(self.context.todolist) == 4
new_tasks = self.context.todolist[-2:]
assert new_tasks[0]["id"] == 4
assert new_tasks[0]["description"] == "new 1"
assert new_tasks[0]["status"] == "todo"
assert new_tasks[1]["id"] == 5
assert new_tasks[1]["description"] == "new 2"
assert new_tasks[1]["status"] == "todo"
def test_add_tool_error_when_tasks_missing(self):
"""缺少 tasks 参数时应走错误分支"""
with pytest.raises((ValueError, KeyError, TypeError)):
# 不传 tasks
self._run_add_tool()
def test_add_tool_error_when_tasks_empty(self):
"""tasks 为空列表时应走错误分支"""
with pytest.raises(ValueError):
self._run_add_tool(tasks=[])
class TestTodoListUpdateTool:
"""TodoListUpdateTool 行为测试"""
def setup_method(self):
self.context = MockAstrAgentContext()
self.context.todolist = [
{"id": 1, "description": "task 1", "status": "todo"},
{"id": 2, "description": "task 2", "status": "doing"},
]
self.tool = TodoListUpdateTool()
def _run_update_tool(self, **params):
"""
封装工具调用,便于后续统一修改调用方式。
根据实际实现,可能需要改成:
- self.tool(self.context, **params)
- self.tool.run(self.context, **params)
- self.tool.invoke(params, self.context)
等。
"""
return self.tool.run(self.context, **params)
def test_update_status_and_description(self):
"""可以同时更新任务状态和描述"""
result = self._run_update_tool(
task_id=1,
status="done",
description="task 1 updated",
)
task = next(t for t in self.context.todolist if t["id"] == 1)
assert task["status"] == "done"
assert task["description"] == "task 1 updated"
assert result is None or isinstance(result, dict)
def test_update_only_status_keeps_description(self):
"""仅更新状态时应保留原 description 不变"""
original_desc = self.context.todolist[1]["description"]
self._run_update_tool(
task_id=2,
status="done",
)
task = next(t for t in self.context.todolist if t["id"] == 2)
assert task["status"] == "done"
# 未传 description,应保持原值
assert task["description"] == original_desc
def test_update_only_description_keeps_status(self):
"""仅更新描述时应保留原 status 不变"""
original_status = self.context.todolist[0]["status"]
self._run_update_tool(
task_id=1,
description="only description updated",
)
task = next(t for t in self.context.todolist if t["id"] == 1)
assert task["description"] == "only description updated"
assert task["status"] == original_status
def test_update_non_existent_task_id_raises(self):
"""更新不存在的 task_id 时应走错误分支"""
with pytest.raises((ValueError, LookupError, KeyError)):
self._run_update_tool(
task_id=999,
status="done",
)
@pytest.mark.parametrize("status", ["todo", "doing", "done"])
def test_accepts_valid_status_values(self, status):
"""验证常见状态值可以被接受并写入"""
self._run_update_tool(
task_id=1,
status=status,
)
task = next(t for t in self.context.todolist if t["id"] == 1)
assert task["status"] == status
```
1. 将 `from astrbot.agent.tools.todolist import TodoListAddTool, TodoListUpdateTool` 的导入路径改成项目实际路径(例如 `astrbot.core.agent.tools.todolist` 等)。
2. 根据工具的真实接口,调整 `_run_add_tool` 和 `_run_update_tool` 中对工具的调用方式(目前假定为 `tool.run(context, **params)`)。如果工具是可调用对象或有 `invoke/__call__` 等,请相应替换。
3. 如果 `MockAstrAgentContext` 的定义不在 `tests.agent.runners.test_todolist_injection` 中,请将导入改为实际位置,或改用正式的 `AstrAgentContext` 构造测试用例。
4. 如工具在错误路径下不是通过抛异常,而是返回错误对象/状态码,请将 `pytest.raises(...)` 断言替换成对返回值的断言,以匹配实际实现。
</issue_to_address>
### Comment 3
<location> `tests/core/test_context_manager.py:193-202` </location>
<code_context>
+ def test_truncate_by_count_exceeds_limit(self):
</code_context>
<issue_to_address>
**issue (testing):** Add a test for `truncate_by_count` when `max_messages` is less than or equal to the number of system messages.
Current tests don’t cover the case where `max_messages` is less than or equal to the number of `system` messages, where `max_messages - len(system_msgs)` can be zero or negative and slicing is easy to get wrong. Please add a test that creates multiple `system` messages and sets `max_messages` to 0, 1, and exactly the number of system messages to validate this behavior.
</issue_to_address>
### Comment 4
<location> `tests/core/test_context_manager_integration.py:303-312` </location>
<code_context>
+ """测试:LLM API调用失败时的错误处理"""
+ provider = MockProvider()
+
+ from astrbot.core.context_manager.context_compressor import LLMSummaryCompressor
+
+ compressor = LLMSummaryCompressor(provider, keep_recent=2)
+
+ messages = [{"role": "user", "content": f"Message {i}"} for i in range(10)]
+
+ with patch.object(
+ provider, "text_chat", side_effect=Exception("API Error")
+ ) as mock_call:
+ result = await compressor.compress(messages)
+
+ # API失败时应该返回原始消息
+ assert result == messages
+ mock_call.assert_called_once()
+
+
+if __name__ == "__main__":
+ pytest.main([__file__, "-v", "-s"])
</code_context>
<issue_to_address>
**nitpick (testing):** LLM summary tests are tightly coupled to the exact wording in `summary_prompt.md`; consider loosening assertions.
Several tests assert specific Chinese prefixes in the instruction and summary (e.g. `"请基于我们完整的对话记录"`, `"历史会话摘要"`), which makes them fragile to any wording change in `summary_prompt.md`. It’d be more robust to keep asserting the structural behavior (extra user instruction and system summary messages) while loosening the content checks (e.g., non‑empty, summary tool invoked and interpolated), or centralizing any exact-string checks behind a single helper so prompt edits don’t require touching many tests.
</issue_to_address>
### Comment 5
<location> `showcase_features.py:193` </location>
<code_context>
+# ============ ContextManager 模拟实现 ============
+
+
+class MockContextManager:
+ """模拟 ContextManager 的核心逻辑"""
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider replacing the reimplemented context, todo-list, and max-step logic in this demo with thin wrappers around the real production components to avoid duplicating business rules.
The main complexity issue is that this script re‑implements core logic (context management, token thresholds, compression, todo‑list injection, max‑step injection) instead of calling existing, tested components. That creates a parallel mental model and will drift over time.
You can keep all demo behavior but simplify by:
1. **Delegating to the real ContextManager instead of `MockContextManager`**
Keep the printing/logging in this file, but reuse the actual implementation for token counting and compression decisions.
```python
# from astrbot.core.context_manager import ContextManager # adjust import path
class DemoContextManager:
"""Thin wrapper that only adds logging around the real ContextManager."""
def __init__(self, model_context_limit: int, is_agent_mode: bool = False, provider=None):
self.cm = ContextManager(
model_context_limit=model_context_limit,
is_agent_mode=is_agent_mode,
provider=provider,
)
async def process_context(self, messages: list[dict]) -> list[dict]:
total_tokens = self.cm.count_tokens(messages)
usage_rate = total_tokens / self.cm.model_context_limit
print(f" 初始Token数: {total_tokens}")
print(f" 上下文限制: {self.cm.model_context_limit}")
print(f" 使用率: {usage_rate:.2%}")
print(f" 触发阈值: {self.cm.threshold:.0%}")
# delegate real logic
result = await self.cm.process_context(messages)
print("\n【输出】压缩后的消息历史:")
print_messages(result, indent=1)
return result
```
Then in `demo_context_manager()`:
```python
async def demo_context_manager():
print_separator("DEMO 1: ContextManager Workflow")
print_subsection("Agent模式(触发摘要压缩)")
print("【输入】完整消息历史:")
print_messages(LONG_MESSAGE_HISTORY, indent=1)
print("\n【处理】执行 ContextManager.process_context (AGENT 模式):")
mock_provider = MagicMock()
cm_agent = DemoContextManager(
model_context_limit=150, is_agent_mode=True, provider=mock_provider
)
result_agent = await cm_agent.process_context(LONG_MESSAGE_HISTORY)
```
This removes duplicated token counting, thresholds and compression strategy while keeping the same printed explanation.
2. **Reusing the real todo‑list injection/formatting logic**
Instead of `format_todolist` + `inject_todolist_to_messages` being a second implementation, call the core helpers and only wrap them with pretty printing.
```python
# from astrbot.core.todo import inject_todolist, format_todolist # adjust to real modules
def demo_inject_todolist_to_messages(
messages: list[dict],
todolist: list[dict],
max_tool_calls: int | None = None,
current_tool_calls: int | None = None,
) -> list[dict]:
"""Thin demo wrapper adding logs around real injection."""
print("\n【输入】TodoList:")
for task in todolist:
status_icon = {"pending": "[ ]", "in_progress": "[-]", "completed": "[x]"}.get(
task["status"], "[ ]"
)
print(f" {status_icon} #{task['id']}: {task['description']}")
result = inject_todolist(
messages=messages,
todolist=todolist,
max_tool_calls=max_tool_calls,
current_tool_calls=current_tool_calls,
)
print("\n【输出】注入后的消息历史:")
print_messages(result, indent=1)
return result
```
Then in `demo_todolist_injection()` replace calls to `inject_todolist_to_messages` with `demo_inject_todolist_to_messages`, so the real injection behavior is showcased.
3. **Centralizing max‑step smart injection**
The logic in `demo_max_step_smart_injection` is essentially a hand‑rolled `_smart_inject_user_message` / “max step injection”. If that exists in production code, wrap it instead of inlining the rules twice.
```python
# from astrbot.core.agent_runner import smart_inject_user_message # adjust import
def demo_max_step_injection(messages: list[dict], max_step_message: str) -> list[dict]:
"""Use real smart-injection and only add console explanation."""
print("\n【处理】模拟工具耗尽,执行智能注入逻辑...")
result = smart_inject_user_message(messages, max_step_message)
print("\n【输出】智能注入后的消息历史:")
print_messages(result, indent=1)
return result
```
Then in `demo_max_step_smart_injection()`:
```python
result_a = demo_max_step_injection(messages_with_user, max_step_message)
...
result_b = demo_max_step_injection(messages_with_tool, max_step_message)
```
4. **Keep demo‑specific concerns only in this file**
All the printing helpers (`print_separator`, `print_subsection`, `print_messages`) and example data are fine here; the key simplification is to **stop re‑expressing business rules** (token accounting, compression, injection) and instead wrap the real components with extra logging.
These changes maintain all showcased behaviors (and visible output) but collapse the parallel implementation, making the script cheaper to maintain and automatically consistent with future core logic changes.
</issue_to_address>
### Comment 6
<location> `astrbot/core/context_manager/context_manager.py:63` </location>
<code_context>
+ if self.model_context_limit == -1:
+ return messages
+
+ # 阶段1:Token初始统计
+ messages = await self._initial_token_check(messages)
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider simplifying the main processing flow by computing token usage locally and inlining trivial stages instead of storing internal flags on messages and jumping through small wrapper methods.
You can keep the flow the same but avoid mutating `messages[0]` with hidden flags and reduce indirection in the main path.
### 1. Avoid `_needs_compression` and `_initial_token_count` on messages
Instead of encoding internal state into the first message, compute the token usage and pass it around as local variables. This keeps the external data structure clean and makes `_run_compression` easier to reason about.
Example refactor (no behavior change):
```python
async def process(
self, messages: list[dict[str, Any]], max_messages_to_keep: int = 20
) -> list[dict[str, Any]]:
if self.model_context_limit == -1 or not messages:
return messages
total_tokens = self.token_counter.count_tokens(messages)
usage_rate = total_tokens / self.model_context_limit
needs_compression = usage_rate > self.threshold
if needs_compression:
messages = await self._run_compression(messages)
messages = await self._run_final_processing(messages, max_messages_to_keep)
return messages
```
Then simplify `_run_compression` so it doesn’t need to inspect hidden flags:
```python
async def _run_compression(
self, messages: list[dict[str, Any]]
) -> list[dict[str, Any]]:
# Agent模式:先摘要,再判断
messages = await self._compress_by_summarization(messages)
tokens_after_summary = self.token_counter.count_tokens(messages)
if tokens_after_summary / self.model_context_limit > self.threshold:
messages = self._compress_by_halving(messages)
return messages
```
You can then drop `_initial_token_check` entirely, since its logic is now inlined into `process` and held in local variables.
### 2. Inline trivial stage wrappers to reduce jumping
If you keep `_run_final_processing`, consider collapsing the very small stage wrappers where appropriate so the main flow is visible in one place.
For example, you can in-line `_run_final_processing` into `process` (keeping the existing helper methods):
```python
async def process(
self, messages: list[dict[str, Any]], max_messages_to_keep: int = 20
) -> list[dict[str, Any]]:
if self.model_context_limit == -1 or not messages:
return messages
total_tokens = self.token_counter.count_tokens(messages)
usage_rate = total_tokens / self.model_context_limit
needs_compression = usage_rate > self.threshold
if needs_compression:
messages = await self._run_compression(messages)
# final processing steps are linear and small, so keep them visible here
messages = self._merge_consecutive_messages(messages)
messages = self._cleanup_unpaired_tool_calls(messages)
messages = self.truncator.truncate_by_count(messages, max_messages_to_keep)
return messages
```
This keeps the conceptual flow “token check → maybe compress → finalize” in a single function, while still reusing the more complex helpers (`_merge_consecutive_messages`, `_cleanup_unpaired_tool_calls`) as-is.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| def truncate_by_count( | ||
| self, messages: list[dict[str, Any]], max_messages: int | ||
| ) -> list[dict[str, Any]]: | ||
| """ | ||
| 按数量截断:只保留最近的X条消息 | ||
|
|
||
| 规则: | ||
| - 保留系统消息(如果存在) | ||
| - 保留最近的max_messages条消息 | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (bug_risk): truncate_by_count 在系统消息较多时可能返回超过 max_messages 的结果。
当 len(system_msgs) >= max_messages 时,max_messages - len(system_msgs) 为 0 或负数,因此 other_msgs[-(max_messages - len(system_msgs)):] 仍然会返回元素,最终列表长度可能超过 max_messages。为了遵守 max_messages 的约定,建议在这种情况下直接短路返回(例如 return system_msgs[-max_messages:]),并且仅在 len(system_msgs) < max_messages 时才计算 kept_other。
Original comment in English
issue (bug_risk): truncate_by_count can exceed max_messages when there are many system messages.
When len(system_msgs) >= max_messages, max_messages - len(system_msgs) is zero or negative, so other_msgs[-(max_messages - len(system_msgs)):] still returns elements and the final list can exceed max_messages. To preserve the max_messages contract, short-circuit in this case (e.g. return system_msgs[-max_messages:]) and only compute kept_other when len(system_msgs) < max_messages.
| class TestTodoListInjection: | ||
| """测试 TodoList 注入逻辑""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion (testing): 建议为 TodoListAddTool 和 TodoListUpdateTool 本身补充单元测试。
当前测试只覆盖了 todo 列表是如何被注入到消息流中的逻辑,并未覆盖这些工具是如何修改 AstrAgentContext.todolist 的行为。请补充测试以:
- 断言在列表为空和非空时的 ID 分配行为;
- 覆盖错误路径(新增时缺失/空的
tasks;更新时传入不存在的task_id); - 校验状态字段处理以及可选 description 更新。
这些测试可以放在类似tests/agent/tools/test_todolist_tool.py的独立模块中,用来防止工具本身的回归影响整个 todo 流程。
Suggested implementation:
import pytest
from tests.agent.runners.test_todolist_injection import MockAstrAgentContext
# NOTE: Adjust the import path below to match the actual location of the tools.
from astrbot.agent.tools.todolist import TodoListAddTool, TodoListUpdateTool # type: ignore[import]
class TestTodoListAddTool:
"""TodoListAddTool 行为测试"""
def setup_method(self):
self.context = MockAstrAgentContext()
# 确保每个测试都从干净的 todolist 开始
self.context.todolist = []
self.tool = TodoListAddTool()
def _run_add_tool(self, **params):
"""
封装工具调用,便于后续统一修改调用方式。
根据实际实现,可能需要改成:
- self.tool(self.context, **params)
- self.tool.run(self.context, **params)
- self.tool.invoke(params, self.context)
等。
"""
return self.tool.run(self.context, **params)
def test_assign_ids_when_list_empty(self):
"""当 todolist 为空时,新增任务应从 ID=1 开始递增"""
result = self._run_add_tool(
tasks=[
{"description": "task 1", "status": "todo"},
{"description": "task 2", "status": "doing"},
]
)
# 校验上下文内的 todolist
assert len(self.context.todolist) == 2
assert self.context.todolist[0]["id"] == 1
assert self.context.todolist[0]["description"] == "task 1"
assert self.context.todolist[0]["status"] == "todo"
assert self.context.todolist[1]["id"] == 2
assert self.context.todolist[1]["description"] == "task 2"
assert self.context.todolist[1]["status"] == "doing"
# 如果工具有返回值,也一并校验
assert result is None or isinstance(result, dict)
def test_assign_ids_when_list_non_empty(self):
"""当 todolist 非空时,应在现有最大 ID 基础上递增"""
# 预置已有任务
self.context.todolist = [
{"id": 1, "description": "existing 1", "status": "todo"},
{"id": 3, "description": "existing 3", "status": "done"},
]
self._run_add_tool(
tasks=[
{"description": "new 1", "status": "todo"},
{"description": "new 2", "status": "todo"},
]
)
# 最大 ID 为 3,新任务应为 4,5
assert len(self.context.todolist) == 4
new_tasks = self.context.todolist[-2:]
assert new_tasks[0]["id"] == 4
assert new_tasks[0]["description"] == "new 1"
assert new_tasks[0]["status"] == "todo"
assert new_tasks[1]["id"] == 5
assert new_tasks[1]["description"] == "new 2"
assert new_tasks[1]["status"] == "todo"
def test_add_tool_error_when_tasks_missing(self):
"""缺少 tasks 参数时应走错误分支"""
with pytest.raises((ValueError, KeyError, TypeError)):
# 不传 tasks
self._run_add_tool()
def test_add_tool_error_when_tasks_empty(self):
"""tasks 为空列表时应走错误分支"""
with pytest.raises(ValueError):
self._run_add_tool(tasks=[])
class TestTodoListUpdateTool:
"""TodoListUpdateTool 行为测试"""
def setup_method(self):
self.context = MockAstrAgentContext()
self.context.todolist = [
{"id": 1, "description": "task 1", "status": "todo"},
{"id": 2, "description": "task 2", "status": "doing"},
]
self.tool = TodoListUpdateTool()
def _run_update_tool(self, **params):
"""
封装工具调用,便于后续统一修改调用方式。
根据实际实现,可能需要改成:
- self.tool(self.context, **params)
- self.tool.run(self.context, **params)
- self.tool.invoke(params, self.context)
等。
"""
return self.tool.run(self.context, **params)
def test_update_status_and_description(self):
"""可以同时更新任务状态和描述"""
result = self._run_update_tool(
task_id=1,
status="done",
description="task 1 updated",
)
task = next(t for t in self.context.todolist if t["id"] == 1)
assert task["status"] == "done"
assert task["description"] == "task 1 updated"
assert result is None or isinstance(result, dict)
def test_update_only_status_keeps_description(self):
"""仅更新状态时应保留原 description 不变"""
original_desc = self.context.todolist[1]["description"]
self._run_update_tool(
task_id=2,
status="done",
)
task = next(t for t in self.context.todolist if t["id"] == 2)
assert task["status"] == "done"
# 未传 description,应保持原值
assert task["description"] == original_desc
def test_update_only_description_keeps_status(self):
"""仅更新描述时应保留原 status 不变"""
original_status = self.context.todolist[0]["status"]
self._run_update_tool(
task_id=1,
description="only description updated",
)
task = next(t for t in self.context.todolist if t["id"] == 1)
assert task["description"] == "only description updated"
assert task["status"] == original_status
def test_update_non_existent_task_id_raises(self):
"""更新不存在的 task_id 时应走错误分支"""
with pytest.raises((ValueError, LookupError, KeyError)):
self._run_update_tool(
task_id=999,
status="done",
)
@pytest.mark.parametrize("status", ["todo", "doing", "done"])
def test_accepts_valid_status_values(self, status):
"""验证常见状态值可以被接受并写入"""
self._run_update_tool(
task_id=1,
status=status,
)
task = next(t for t in self.context.todolist if t["id"] == 1)
assert task["status"] == status- 将
from astrbot.agent.tools.todolist import TodoListAddTool, TodoListUpdateTool的导入路径改成项目实际路径(例如astrbot.core.agent.tools.todolist等)。 - 根据工具的真实接口,调整
_run_add_tool和_run_update_tool中对工具的调用方式(目前假定为tool.run(context, **params))。如果工具是可调用对象或有invoke/__call__等,请相应替换。 - 如果
MockAstrAgentContext的定义不在tests.agent.runners.test_todolist_injection中,请将导入改为实际位置,或改用正式的AstrAgentContext构造测试用例。 - 如工具在错误路径下不是通过抛异常,而是返回错误对象/状态码,请将
pytest.raises(...)断言替换成对返回值的断言,以匹配实际实现。
Original comment in English
suggestion (testing): Consider adding unit tests for TodoListAddTool and TodoListUpdateTool to cover the new tools themselves.
The current tests only cover how the todo list is injected into the message stream, not the behavior of the tools that mutate AstrAgentContext.todolist. Please add tests that:
- assert ID assignment when the list is empty vs. non‑empty,
- exercise error paths (
tasksmissing/empty for add; non‑existenttask_idfor update), - validate status handling and optional description updates.
These can go in a dedicated module liketests/agent/tools/test_todolist_tool.pyto protect the todo flow against regressions in the tools themselves.
Suggested implementation:
import pytest
from tests.agent.runners.test_todolist_injection import MockAstrAgentContext
# NOTE: Adjust the import path below to match the actual location of the tools.
from astrbot.agent.tools.todolist import TodoListAddTool, TodoListUpdateTool # type: ignore[import]
class TestTodoListAddTool:
"""TodoListAddTool 行为测试"""
def setup_method(self):
self.context = MockAstrAgentContext()
# 确保每个测试都从干净的 todolist 开始
self.context.todolist = []
self.tool = TodoListAddTool()
def _run_add_tool(self, **params):
"""
封装工具调用,便于后续统一修改调用方式。
根据实际实现,可能需要改成:
- self.tool(self.context, **params)
- self.tool.run(self.context, **params)
- self.tool.invoke(params, self.context)
等。
"""
return self.tool.run(self.context, **params)
def test_assign_ids_when_list_empty(self):
"""当 todolist 为空时,新增任务应从 ID=1 开始递增"""
result = self._run_add_tool(
tasks=[
{"description": "task 1", "status": "todo"},
{"description": "task 2", "status": "doing"},
]
)
# 校验上下文内的 todolist
assert len(self.context.todolist) == 2
assert self.context.todolist[0]["id"] == 1
assert self.context.todolist[0]["description"] == "task 1"
assert self.context.todolist[0]["status"] == "todo"
assert self.context.todolist[1]["id"] == 2
assert self.context.todolist[1]["description"] == "task 2"
assert self.context.todolist[1]["status"] == "doing"
# 如果工具有返回值,也一并校验
assert result is None or isinstance(result, dict)
def test_assign_ids_when_list_non_empty(self):
"""当 todolist 非空时,应在现有最大 ID 基础上递增"""
# 预置已有任务
self.context.todolist = [
{"id": 1, "description": "existing 1", "status": "todo"},
{"id": 3, "description": "existing 3", "status": "done"},
]
self._run_add_tool(
tasks=[
{"description": "new 1", "status": "todo"},
{"description": "new 2", "status": "todo"},
]
)
# 最大 ID 为 3,新任务应为 4,5
assert len(self.context.todolist) == 4
new_tasks = self.context.todolist[-2:]
assert new_tasks[0]["id"] == 4
assert new_tasks[0]["description"] == "new 1"
assert new_tasks[0]["status"] == "todo"
assert new_tasks[1]["id"] == 5
assert new_tasks[1]["description"] == "new 2"
assert new_tasks[1]["status"] == "todo"
def test_add_tool_error_when_tasks_missing(self):
"""缺少 tasks 参数时应走错误分支"""
with pytest.raises((ValueError, KeyError, TypeError)):
# 不传 tasks
self._run_add_tool()
def test_add_tool_error_when_tasks_empty(self):
"""tasks 为空列表时应走错误分支"""
with pytest.raises(ValueError):
self._run_add_tool(tasks=[])
class TestTodoListUpdateTool:
"""TodoListUpdateTool 行为测试"""
def setup_method(self):
self.context = MockAstrAgentContext()
self.context.todolist = [
{"id": 1, "description": "task 1", "status": "todo"},
{"id": 2, "description": "task 2", "status": "doing"},
]
self.tool = TodoListUpdateTool()
def _run_update_tool(self, **params):
"""
封装工具调用,便于后续统一修改调用方式。
根据实际实现,可能需要改成:
- self.tool(self.context, **params)
- self.tool.run(self.context, **params)
- self.tool.invoke(params, self.context)
等。
"""
return self.tool.run(self.context, **params)
def test_update_status_and_description(self):
"""可以同时更新任务状态和描述"""
result = self._run_update_tool(
task_id=1,
status="done",
description="task 1 updated",
)
task = next(t for t in self.context.todolist if t["id"] == 1)
assert task["status"] == "done"
assert task["description"] == "task 1 updated"
assert result is None or isinstance(result, dict)
def test_update_only_status_keeps_description(self):
"""仅更新状态时应保留原 description 不变"""
original_desc = self.context.todolist[1]["description"]
self._run_update_tool(
task_id=2,
status="done",
)
task = next(t for t in self.context.todolist if t["id"] == 2)
assert task["status"] == "done"
# 未传 description,应保持原值
assert task["description"] == original_desc
def test_update_only_description_keeps_status(self):
"""仅更新描述时应保留原 status 不变"""
original_status = self.context.todolist[0]["status"]
self._run_update_tool(
task_id=1,
description="only description updated",
)
task = next(t for t in self.context.todolist if t["id"] == 1)
assert task["description"] == "only description updated"
assert task["status"] == original_status
def test_update_non_existent_task_id_raises(self):
"""更新不存在的 task_id 时应走错误分支"""
with pytest.raises((ValueError, LookupError, KeyError)):
self._run_update_tool(
task_id=999,
status="done",
)
@pytest.mark.parametrize("status", ["todo", "doing", "done"])
def test_accepts_valid_status_values(self, status):
"""验证常见状态值可以被接受并写入"""
self._run_update_tool(
task_id=1,
status=status,
)
task = next(t for t in self.context.todolist if t["id"] == 1)
assert task["status"] == status- 将
from astrbot.agent.tools.todolist import TodoListAddTool, TodoListUpdateTool的导入路径改成项目实际路径(例如astrbot.core.agent.tools.todolist等)。 - 根据工具的真实接口,调整
_run_add_tool和_run_update_tool中对工具的调用方式(目前假定为tool.run(context, **params))。如果工具是可调用对象或有invoke/__call__等,请相应替换。 - 如果
MockAstrAgentContext的定义不在tests.agent.runners.test_todolist_injection中,请将导入改为实际位置,或改用正式的AstrAgentContext构造测试用例。 - 如工具在错误路径下不是通过抛异常,而是返回错误对象/状态码,请将
pytest.raises(...)断言替换成对返回值的断言,以匹配实际实现。
| def test_truncate_by_count_exceeds_limit(self): | ||
| """测试按数量截断 - 超过限制""" | ||
| messages = [ | ||
| {"role": "system", "content": "System"}, | ||
| {"role": "user", "content": "Message 1"}, | ||
| {"role": "assistant", "content": "Response 1"}, | ||
| {"role": "user", "content": "Message 2"}, | ||
| {"role": "assistant", "content": "Response 2"}, | ||
| {"role": "user", "content": "Message 3"}, | ||
| ] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (testing): 为 truncate_by_count 补充 max_messages 小于或等于系统消息数量时的测试用例。
当前测试没有覆盖 max_messages 小于或等于 system 消息数量这一情况,在这种情况下 max_messages - len(system_msgs) 可能为 0 或负数,切片逻辑很容易写错。请添加一个测试:构造多个 system 消息,并分别将 max_messages 设为 0、1 以及恰好等于系统消息数量,以验证该行为。
Original comment in English
issue (testing): Add a test for truncate_by_count when max_messages is less than or equal to the number of system messages.
Current tests don’t cover the case where max_messages is less than or equal to the number of system messages, where max_messages - len(system_msgs) can be zero or negative and slicing is easy to get wrong. Please add a test that creates multiple system messages and sets max_messages to 0, 1, and exactly the number of system messages to validate this behavior.
| from astrbot.core.context_manager.context_compressor import LLMSummaryCompressor | ||
|
|
||
| compressor = LLMSummaryCompressor(provider, keep_recent=3) | ||
|
|
||
| messages = [ | ||
| {"role": "system", "content": "System prompt"}, | ||
| {"role": "user", "content": "Q1"}, | ||
| {"role": "assistant", "content": "A1"}, | ||
| {"role": "user", "content": "Q2"}, | ||
| {"role": "assistant", "content": "A2"}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick (testing): LLM 摘要相关测试与 summary_prompt.md 中的具体文案绑定得过于紧密,建议放宽断言条件。
目前有若干测试直接断言 instruction 和 summary 中的具体中文前缀(例如 "请基于我们完整的对话记录"、"历史会话摘要"),这会导致任何对 summary_prompt.md 文案的修改都容易破坏测试。更稳健的做法是继续断言结构行为(例如额外的用户指令消息和系统摘要消息存在),但放宽对内容的检查(例如只要求非空、有调用并插入 summary 工具的结果),或者把对精确字符串的检查集中封装在一个辅助函数中,这样调整 prompt 文案时,不必修改大量测试用例。
Original comment in English
nitpick (testing): LLM summary tests are tightly coupled to the exact wording in summary_prompt.md; consider loosening assertions.
Several tests assert specific Chinese prefixes in the instruction and summary (e.g. "请基于我们完整的对话记录", "历史会话摘要"), which makes them fragile to any wording change in summary_prompt.md. It’d be more robust to keep asserting the structural behavior (extra user instruction and system summary messages) while loosening the content checks (e.g., non‑empty, summary tool invoked and interpolated), or centralizing any exact-string checks behind a single helper so prompt edits don’t require touching many tests.
showcase_features.py
Outdated
| # ============ ContextManager 模拟实现 ============ | ||
|
|
||
|
|
||
| class MockContextManager: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (complexity): 建议在这个演示脚本中,用对真实生产组件的轻量封装来替代重新实现的上下文管理、todo 列表和最大步数逻辑,以避免重复业务规则。
主要的复杂度问题在于,该脚本重新实现了核心逻辑(上下文管理、token 阈值、压缩、todo 列表注入、最大步数注入),而不是复用已有的、已经过测试的组件。这会制造一套平行的心智模型,并且后续很容易产生偏差。
你可以在保留全部演示行为的前提下,通过以下方式简化:
-
使用真实的 ContextManager 替代
MockContextManager,当前文件只做简单封装
保留本文件中的打印/日志逻辑,但在 token 计数和压缩决策上复用真正的实现。# from astrbot.core.context_manager import ContextManager # adjust import path class DemoContextManager: """Thin wrapper that only adds logging around the real ContextManager.""" def __init__(self, model_context_limit: int, is_agent_mode: bool = False, provider=None): self.cm = ContextManager( model_context_limit=model_context_limit, is_agent_mode=is_agent_mode, provider=provider, ) async def process_context(self, messages: list[dict]) -> list[dict]: total_tokens = self.cm.count_tokens(messages) usage_rate = total_tokens / self.cm.model_context_limit print(f" 初始Token数: {total_tokens}") print(f" 上下文限制: {self.cm.model_context_limit}") print(f" 使用率: {usage_rate:.2%}") print(f" 触发阈值: {self.cm.threshold:.0%}") # delegate real logic result = await self.cm.process_context(messages) print("\n【输出】压缩后的消息历史:") print_messages(result, indent=1) return result
然后在
demo_context_manager()中:async def demo_context_manager(): print_separator("DEMO 1: ContextManager Workflow") print_subsection("Agent模式(触发摘要压缩)") print("【输入】完整消息历史:") print_messages(LONG_MESSAGE_HISTORY, indent=1) print("\n【处理】执行 ContextManager.process_context (AGENT 模式):") mock_provider = MagicMock() cm_agent = DemoContextManager( model_context_limit=150, is_agent_mode=True, provider=mock_provider ) result_agent = await cm_agent.process_context(LONG_MESSAGE_HISTORY)
这样既保留了相同的打印说明,又去掉了重复的 token 计数、阈值和压缩策略逻辑。
-
复用真实的 todo 列表注入/格式化逻辑
不要再维护一套format_todolist+inject_todolist_to_messages的实现,而是调用核心 helper,只在外面包一层用于打印的封装。# from astrbot.core.todo import inject_todolist, format_todolist # adjust to real modules def demo_inject_todolist_to_messages( messages: list[dict], todolist: list[dict], max_tool_calls: int | None = None, current_tool_calls: int | None = None, ) -> list[dict]: """Thin demo wrapper adding logs around real injection.""" print("\n【输入】TodoList:") for task in todolist: status_icon = {"pending": "[ ]", "in_progress": "[-]", "completed": "[x]"}.get( task["status"], "[ ]" ) print(f" {status_icon} #{task['id']}: {task['description']}") result = inject_todolist( messages=messages, todolist=todolist, max_tool_calls=max_tool_calls, current_tool_calls=current_tool_calls, ) print("\n【输出】注入后的消息历史:") print_messages(result, indent=1) return result
然后在
demo_todolist_injection()中,把对inject_todolist_to_messages的调用替换为demo_inject_todolist_to_messages,从而展示真实注入行为。 -
集中处理最大步数的智能注入逻辑
demo_max_step_smart_injection中的逻辑本质上就是手写的_smart_inject_user_message/“最大步数注入”行为。如果生产代码中已经存在类似逻辑,建议直接封装调用,而不是在此重复规则。# from astrbot.core.agent_runner import smart_inject_user_message # adjust import def demo_max_step_injection(messages: list[dict], max_step_message: str) -> list[dict]: """Use real smart-injection and only add console explanation.""" print("\n【处理】模拟工具耗尽,执行智能注入逻辑...") result = smart_inject_user_message(messages, max_step_message) print("\n【输出】智能注入后的消息历史:") print_messages(result, indent=1) return result
然后在
demo_max_step_smart_injection()中:result_a = demo_max_step_injection(messages_with_user, max_step_message) ... result_b = demo_max_step_injection(messages_with_tool, max_step_message)
-
仅在本文件中保留演示相关的逻辑
各种打印 helper(print_separator、print_subsection、print_messages)以及示例数据都可以保留在这里;关键的简化点是:不要在这里重新表达业务规则(token 统计、压缩、注入),而是用增加日志的方式去包装真实组件。
这些改动可以保持所有演示行为(以及可见输出)不变,同时消除平行实现,使脚本更易维护,并在核心逻辑未来变更时自动保持一致。
Original comment in English
issue (complexity): Consider replacing the reimplemented context, todo-list, and max-step logic in this demo with thin wrappers around the real production components to avoid duplicating business rules.
The main complexity issue is that this script re‑implements core logic (context management, token thresholds, compression, todo‑list injection, max‑step injection) instead of calling existing, tested components. That creates a parallel mental model and will drift over time.
You can keep all demo behavior but simplify by:
-
Delegating to the real ContextManager instead of
MockContextManager
Keep the printing/logging in this file, but reuse the actual implementation for token counting and compression decisions.# from astrbot.core.context_manager import ContextManager # adjust import path class DemoContextManager: """Thin wrapper that only adds logging around the real ContextManager.""" def __init__(self, model_context_limit: int, is_agent_mode: bool = False, provider=None): self.cm = ContextManager( model_context_limit=model_context_limit, is_agent_mode=is_agent_mode, provider=provider, ) async def process_context(self, messages: list[dict]) -> list[dict]: total_tokens = self.cm.count_tokens(messages) usage_rate = total_tokens / self.cm.model_context_limit print(f" 初始Token数: {total_tokens}") print(f" 上下文限制: {self.cm.model_context_limit}") print(f" 使用率: {usage_rate:.2%}") print(f" 触发阈值: {self.cm.threshold:.0%}") # delegate real logic result = await self.cm.process_context(messages) print("\n【输出】压缩后的消息历史:") print_messages(result, indent=1) return result
Then in
demo_context_manager():async def demo_context_manager(): print_separator("DEMO 1: ContextManager Workflow") print_subsection("Agent模式(触发摘要压缩)") print("【输入】完整消息历史:") print_messages(LONG_MESSAGE_HISTORY, indent=1) print("\n【处理】执行 ContextManager.process_context (AGENT 模式):") mock_provider = MagicMock() cm_agent = DemoContextManager( model_context_limit=150, is_agent_mode=True, provider=mock_provider ) result_agent = await cm_agent.process_context(LONG_MESSAGE_HISTORY)
This removes duplicated token counting, thresholds and compression strategy while keeping the same printed explanation.
-
Reusing the real todo‑list injection/formatting logic
Instead offormat_todolist+inject_todolist_to_messagesbeing a second implementation, call the core helpers and only wrap them with pretty printing.# from astrbot.core.todo import inject_todolist, format_todolist # adjust to real modules def demo_inject_todolist_to_messages( messages: list[dict], todolist: list[dict], max_tool_calls: int | None = None, current_tool_calls: int | None = None, ) -> list[dict]: """Thin demo wrapper adding logs around real injection.""" print("\n【输入】TodoList:") for task in todolist: status_icon = {"pending": "[ ]", "in_progress": "[-]", "completed": "[x]"}.get( task["status"], "[ ]" ) print(f" {status_icon} #{task['id']}: {task['description']}") result = inject_todolist( messages=messages, todolist=todolist, max_tool_calls=max_tool_calls, current_tool_calls=current_tool_calls, ) print("\n【输出】注入后的消息历史:") print_messages(result, indent=1) return result
Then in
demo_todolist_injection()replace calls toinject_todolist_to_messageswithdemo_inject_todolist_to_messages, so the real injection behavior is showcased. -
Centralizing max‑step smart injection
The logic indemo_max_step_smart_injectionis essentially a hand‑rolled_smart_inject_user_message/ “max step injection”. If that exists in production code, wrap it instead of inlining the rules twice.# from astrbot.core.agent_runner import smart_inject_user_message # adjust import def demo_max_step_injection(messages: list[dict], max_step_message: str) -> list[dict]: """Use real smart-injection and only add console explanation.""" print("\n【处理】模拟工具耗尽,执行智能注入逻辑...") result = smart_inject_user_message(messages, max_step_message) print("\n【输出】智能注入后的消息历史:") print_messages(result, indent=1) return result
Then in
demo_max_step_smart_injection():result_a = demo_max_step_injection(messages_with_user, max_step_message) ... result_b = demo_max_step_injection(messages_with_tool, max_step_message)
-
Keep demo‑specific concerns only in this file
All the printing helpers (print_separator,print_subsection,print_messages) and example data are fine here; the key simplification is to stop re‑expressing business rules (token accounting, compression, injection) and instead wrap the real components with extra logging.
These changes maintain all showcased behaviors (and visible output) but collapse the parallel implementation, making the script cheaper to maintain and automatically consistent with future core logic changes.
|
@sourcery-ai[bot] 感谢审查!已针对以下问题进行修复: 修复 1:TodoList 工具单元测试 ✅
修复 2:测试断言放松 ✅
修复 3:showcase_features.py 重复实现 ✅
修复 4:ContextManager 内部标志污染 ✅
关于保留的问题(已解释):
|
关联
#4148
Modifications / 改动点
本 PR 包含两部分改动:
1. 原始 Commit (b6f1c5c) - Agent 上下文压缩系统
核心文件:
astrbot/core/context_manager/模块(完整的上下文管理系统)context_manager.py- 统一上下文压缩管理器context_compressor.py- LLM 智能摘要压缩器context_truncator.py- 对半砍截断器token_counter.py- Token 粗算计数器models.py- 数据模型定义summary_prompt.md- LLM 摘要提示词模板astrbot/core/agent/tools/todolist_tool.py- TodoList 内部工具(任务管理)astrbot/core/agent/runners/tool_loop_agent_runner.py- TodoList 注入和资源限制提示astrbot/core/pipeline/process_stage/method/agent_sub_stages/internal.py- 集成上下文管理器到流水线astrbot/core/config/default.py- 添加上下文管理相关配置项astrbot/core/provider/manager.py- Provider 管理器增强astrbot/core/astr_agent_context.py- 添加 todolist 字段实现功能:
2. 本次修复 - 启用 LLM 智能摘要功能
核心文件:
astrbot/core/pipeline/process_stage/method/agent_sub_stages/internal.py_process_with_context_manager()方法添加provider参数(第 366-394 行)provider实例(第 481-486 行)实现功能:
ContextManager能够调用LLMSummaryCompressor进行智能压缩测试文件:
新增
tests/core/test_context_manager_integration.py- 10 个集成测试用例修改
tests/core/test_context_manager.py- 适配新的摘要提示词新增
tests/core/TEST_SUMMARY.md- 完整测试报告这不是一个破坏性变更。
Screenshots or Test Results / 运行截图或测试结果
测试结果:
测试覆盖:
关键验证点:
Checklist / 检查清单
requirements.txt和pyproject.toml文件相应位置。Summary by Sourcery
引入一个 V2 上下文管理系统,为智能体对话提供基于 LLM 的自动摘要;将内部 TodoList 工具和智能消息注入集成进智能体循环中;并在后端和控制台中贯通提供商的上下文窗口配置。
New Features:
max_context_length,同时写入后端配置和控制台中的提供商创建界面。Enhancements:
max_context_length字段,并在源配置中持久化该字段。Tests:
Original summary in English
Summary by Sourcery
Introduce a V2 context management system with automatic LLM-based summarization for agent conversations, integrate internal TodoList tools and smart message injections into the agent loop, and wire provider context window configuration throughout the backend and dashboard.
New Features:
Enhancements:
Tests: