Skip to content

Commit a213be4

Browse files
committed
feat(conversation_service): implement core session management with OTS backend
This commit introduces the Conversation Service module, which provides session state persistence capabilities using Alibaba Cloud TableStore (OTS). Key components include: SessionStore: Manages session CRUD operations and cascading deletes. OTSBackend: Encapsulates OTS SDK operations for table creation and data manipulation. Data Models: Defines core data structures for sessions, events, and states. Utilities: Provides helper functions for state serialization and timestamp generation. Documentation: Includes usage examples and design documentation. Additionally, a new dependency on tablestore has been added to pyproject.toml to support the OTS operations. Signed-off-by: 寒光 <2510399607@qq.com>
1 parent 5744dad commit a213be4

29 files changed

+13278
-11
lines changed

.gitignore

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,3 +105,12 @@ dmypy.json
105105
uv.lock
106106
coverage.json
107107
coverage.json
108+
109+
# examples
110+
examples/conversation_service_adk_example.py
111+
examples/conversation_service_adk_data.py
112+
examples/conversation_service_langchain_example.py
113+
examples/conversation_service_langchain_data.py
114+
examples/conversation_service_verify.py
115+
examples/Langchain_His_example.py
116+
examples/agent-quickstart-langchain/

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,5 +149,5 @@ mypy-check: ## 运行 mypy 类型检查
149149
.PHONY: coverage
150150
coverage: ## 运行测试并显示覆盖率报告(全量代码 + 增量代码)
151151
@echo "📊 运行覆盖率测试..."
152-
@uv run python scripts/check_coverage.py
152+
@uv run --python ${PYTHON_VERSION} --all-extras python scripts/check_coverage.py $(COVERAGE_ARGS)
153153

Lines changed: 288 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,288 @@
1+
# Conversation Service
2+
3+
为不同 Agent 开发框架提供**统一的会话状态持久化**能力,底层存储选用阿里云 TableStore(OTS,宽表模型)。
4+
5+
## 架构概览
6+
7+
采用 **统一存储 + 中心 Service + 薄 Adapter** 的三层设计:
8+
9+
```
10+
ADK Agent ──→ OTSSessionService ──┐
11+
│ ┌─────────────┐ ┌─────────┐
12+
LangChain ──→ OTSChatMessageHistory ──→│ SessionStore │───→│ OTS │
13+
│ │ (业务逻辑层) │───→│ Tables │
14+
LangGraph ──→ (LG Adapter) ─────┘ └─────────────┘ └─────────┘
15+
16+
OTSBackend
17+
(存储操作层)
18+
```
19+
20+
- **SessionStore**:核心业务层,理解 OTS 表结构,提供 Session / Event / State 的 CRUD、级联删除、三级状态合并等统一接口。
21+
- **OTSBackend**:存储操作层,封装 TableStore SDK 的底层调用。
22+
- **Adapter**:薄适配层,仅负责框架数据模型转换。
23+
24+
## 快速开始
25+
26+
### 前置条件
27+
28+
- 阿里云账号,配置 AK/SK 环境变量
29+
- AgentRun 平台上已创建 MemoryCollection(包含 OTS 实例配置)
30+
31+
### 安装
32+
33+
```bash
34+
pip install agentrun
35+
```
36+
37+
### 初始化
38+
39+
**方式一(推荐):通过 MemoryCollection 自动获取 OTS 连接信息**
40+
41+
```python
42+
from agentrun.conversation_service import SessionStore
43+
44+
# 环境变量:AGENTRUN_ACCESS_KEY_ID / AGENTRUN_ACCESS_KEY_SECRET
45+
store = SessionStore.from_memory_collection("your-memory-collection-name")
46+
47+
# 首次使用时创建表
48+
store.init_tables()
49+
```
50+
51+
`from_memory_collection()` 内部自动完成:
52+
1. 调用 AgentRun API 获取 MemoryCollection 配置
53+
2. 从中提取 OTS 的 endpoint 和 instance_name
54+
3.`Config` 读取 AK/SK 凭证
55+
4. 构建 OTSClient 和 OTSBackend
56+
57+
**方式二:手动传入 OTSClient**
58+
59+
```python
60+
import tablestore
61+
from agentrun.conversation_service import SessionStore, OTSBackend
62+
63+
ots_client = tablestore.OTSClient(
64+
endpoint, access_key_id, access_key_secret, instance_name,
65+
retry_policy=tablestore.WriteRetryPolicy(),
66+
)
67+
backend = OTSBackend(ots_client)
68+
store = SessionStore(backend)
69+
store.init_tables()
70+
```
71+
72+
### 表初始化策略
73+
74+
表和索引按用途分组创建,避免创建不必要的表:
75+
76+
| 方法 | 创建的资源 | 适用场景 |
77+
|------|-----------|---------|
78+
| `init_core_tables()` | Conversation + Event + 二级索引 | 所有框架 |
79+
| `init_state_tables()` | State + App_state + User_state | ADK 三级 State |
80+
| `init_search_index()` | 多元索引(conversation_search_index) | 需要搜索/过滤 |
81+
| `init_tables()` | 以上全部 | 快速开发 |
82+
83+
> 多元索引创建耗时较长(数秒级),建议与核心表创建分离,不阻塞核心流程。
84+
85+
## 使用示例
86+
87+
### Google ADK 集成
88+
89+
```python
90+
import asyncio
91+
from agentrun.conversation_service import SessionStore
92+
from agentrun.conversation_service.adapters import OTSSessionService
93+
from google.adk.agents import Agent
94+
from google.adk.runners import Runner
95+
96+
# 初始化
97+
store = SessionStore.from_memory_collection("my-collection")
98+
store.init_tables()
99+
session_service = OTSSessionService(session_store=store)
100+
101+
# 创建 Agent + Runner
102+
agent = Agent(name="assistant", model=my_model, instruction="...")
103+
runner = Runner(agent=agent, app_name="my_app", session_service=session_service)
104+
105+
# 对话自动持久化到 OTS
106+
async def chat():
107+
session = await session_service.create_session(
108+
app_name="my_app", user_id="user_1"
109+
)
110+
async for event in runner.run_async(
111+
user_id="user_1", session_id=session.id, new_message=content
112+
):
113+
...
114+
115+
asyncio.run(chat())
116+
```
117+
118+
### LangChain 集成
119+
120+
```python
121+
from agentrun.conversation_service import SessionStore
122+
from agentrun.conversation_service.adapters import OTSChatMessageHistory
123+
from langchain_core.messages import HumanMessage, AIMessage
124+
125+
# 初始化
126+
store = SessionStore.from_memory_collection("my-collection")
127+
store.init_core_tables()
128+
129+
# 创建消息历史(自动关联 Session)
130+
history = OTSChatMessageHistory(
131+
session_store=store,
132+
agent_id="my_agent",
133+
user_id="user_1",
134+
session_id="session_1",
135+
)
136+
137+
# 添加消息(自动持久化到 OTS)
138+
history.add_message(HumanMessage(content="你好"))
139+
history.add_message(AIMessage(content="你好!有什么可以帮你的?"))
140+
141+
# 读取历史消息
142+
for msg in history.messages:
143+
print(f"{msg.type}: {msg.content}")
144+
```
145+
146+
### 直接使用 SessionStore
147+
148+
```python
149+
from agentrun.conversation_service import SessionStore
150+
151+
store = SessionStore.from_memory_collection("my-collection")
152+
store.init_tables()
153+
154+
# Session CRUD
155+
session = store.create_session("agent_1", "user_1", "sess_1", summary="测试会话")
156+
sessions = store.list_sessions("agent_1", "user_1")
157+
158+
# Event CRUD
159+
event = store.append_event("agent_1", "user_1", "sess_1", "message", {"text": "hello"})
160+
events = store.get_events("agent_1", "user_1", "sess_1")
161+
recent = store.get_recent_events("agent_1", "user_1", "sess_1", n=10)
162+
163+
# 三级 State 管理(ADK 概念)
164+
store.update_app_state("agent_1", {"model": "qwen-max"})
165+
store.update_user_state("agent_1", "user_1", {"language": "zh-CN"})
166+
store.update_session_state("agent_1", "user_1", "sess_1", {"topic": "weather"})
167+
merged = store.get_merged_state("agent_1", "user_1", "sess_1")
168+
# merged = app_state <- user_state <- session_state(浅合并)
169+
170+
# 多元索引搜索
171+
results, total = store.search_sessions(
172+
"agent_1",
173+
summary_keyword="天气",
174+
updated_after=1700000000000000,
175+
limit=20,
176+
)
177+
178+
# 级联删除(Event → State → Session 行)
179+
store.delete_session("agent_1", "user_1", "sess_1")
180+
```
181+
182+
## API 参考
183+
184+
### SessionStore
185+
186+
核心业务层,所有方法同时提供同步和异步(`_async` 后缀)版本。
187+
188+
**工厂方法**
189+
190+
| 方法 | 说明 |
191+
|------|------|
192+
| `from_memory_collection(name, *, config, table_prefix)` | 通过 MemoryCollection 名称创建实例 |
193+
194+
**初始化**
195+
196+
| 方法 | 说明 |
197+
|------|------|
198+
| `init_tables()` | 创建所有表和索引 |
199+
| `init_core_tables()` | 创建核心表 + 二级索引 |
200+
| `init_state_tables()` | 创建三张 State 表 |
201+
| `init_search_index()` | 创建多元索引 |
202+
203+
**Session 管理**
204+
205+
| 方法 | 说明 |
206+
|------|------|
207+
| `create_session(agent_id, user_id, session_id, ...)` | 创建新会话 |
208+
| `get_session(agent_id, user_id, session_id)` | 获取单个会话 |
209+
| `list_sessions(agent_id, user_id, limit)` | 列出用户会话(按 updated_at 倒序) |
210+
| `list_all_sessions(agent_id, limit)` | 列出 agent 下所有会话 |
211+
| `search_sessions(agent_id, *, user_id, summary_keyword, ...)` | 多元索引搜索会话 |
212+
| `update_session(agent_id, user_id, session_id, *, version, ...)` | 更新会话属性(乐观锁) |
213+
| `delete_session(agent_id, user_id, session_id)` | 级联删除会话 |
214+
215+
**Event 管理**
216+
217+
| 方法 | 说明 |
218+
|------|------|
219+
| `append_event(agent_id, user_id, session_id, event_type, content)` | 追加事件 |
220+
| `get_events(agent_id, user_id, session_id)` | 获取全部事件(正序) |
221+
| `get_recent_events(agent_id, user_id, session_id, n)` | 获取最近 N 条事件 |
222+
| `delete_events(agent_id, user_id, session_id)` | 删除会话下所有事件 |
223+
224+
**State 管理**
225+
226+
| 方法 | 说明 |
227+
|------|------|
228+
| `get_session_state / update_session_state` | 会话级状态读写 |
229+
| `get_app_state / update_app_state` | 应用级状态读写 |
230+
| `get_user_state / update_user_state` | 用户级状态读写 |
231+
| `get_merged_state(agent_id, user_id, session_id)` | 三级状态浅合并 |
232+
233+
### 框架适配器
234+
235+
| 适配器 | 框架 | 基类 |
236+
|--------|------|------|
237+
| `OTSSessionService` | Google ADK | `BaseSessionService` |
238+
| `OTSChatMessageHistory` | LangChain | `BaseChatMessageHistory` |
239+
240+
### 领域模型
241+
242+
| 模型 | 说明 |
243+
|------|------|
244+
| `ConversationSession` | 会话对象(含 agent_id, user_id, session_id, summary, labels 等) |
245+
| `ConversationEvent` | 事件对象(含 seq_id 自增序号、type、content、raw_event) |
246+
| `StateData` | 状态数据对象(含 state 字典、version 乐观锁) |
247+
| `StateScope` | 状态作用域枚举:APP / USER / SESSION |
248+
249+
## OTS 表结构
250+
251+
共五张表 + 一个二级索引 + 一个多元索引:
252+
253+
| 表名 | 主键 | 用途 |
254+
|------|------|------|
255+
| `conversation` | agent_id, user_id, session_id | 会话元信息 |
256+
| `event` | agent_id, user_id, session_id, seq_id (自增) | 事件/消息流 |
257+
| `state` | agent_id, user_id, session_id | 会话级状态 |
258+
| `app_state` | agent_id | 应用级状态 |
259+
| `user_state` | agent_id, user_id | 用户级状态 |
260+
| `conversation_secondary_index` | agent_id, user_id, updated_at, session_id | 二级索引(list 热路径) |
261+
| `conversation_search_index` | 多元索引 | 全文搜索 / 标签过滤 / 组合查询 |
262+
263+
> 表名支持通过 `table_prefix` 参数添加前缀,实现多租户隔离。
264+
265+
## 示例代码
266+
267+
| 文件 | 说明 |
268+
|------|------|
269+
| [`conversation_service_adk_agent.py`](../../examples/conversation_service_adk_agent.py) | ADK Agent 完整对话示例,自动持久化到 OTS |
270+
| [`conversation_service_adk_example.py`](../../examples/conversation_service_adk_example.py) | ADK 数据读写验证(Session / Event / State) |
271+
| [`conversation_service_adk_data.py`](../../examples/conversation_service_adk_data.py) | ADK 模拟数据填充 + 多元索引搜索验证 |
272+
| [`conversation_service_langchain_example.py`](../../examples/conversation_service_langchain_example.py) | LangChain 消息历史读写验证 |
273+
| [`conversation_service_langchain_data.py`](../../examples/conversation_service_langchain_data.py) | LangChain 模拟数据填充 |
274+
| [`conversation_service_verify.py`](../../examples/conversation_service_verify.py) | 端到端 CRUD 验证脚本 |
275+
276+
## 环境变量
277+
278+
| 变量 | 说明 | 必填 |
279+
|------|------|------|
280+
| `AGENTRUN_ACCESS_KEY_ID` | 阿里云 Access Key ID | 是(使用 `from_memory_collection` 时) |
281+
| `AGENTRUN_ACCESS_KEY_SECRET` | 阿里云 Access Key Secret | 是(使用 `from_memory_collection` 时) |
282+
| `ALIBABA_CLOUD_ACCESS_KEY_ID` | 备选 AK 环境变量 | 否(AK 候选) |
283+
| `ALIBABA_CLOUD_ACCESS_KEY_SECRET` | 备选 SK 环境变量 | 否(SK 候选) |
284+
| `MEMORY_COLLECTION_NAME` | MemoryCollection 名称(示例脚本使用) ||
285+
286+
## 设计文档
287+
288+
详细的表设计、访问模式分析和分层架构说明见 [conversation_design.md](./conversation_design.md)
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
"""Conversation Service 模块。
2+
3+
为不同 Agent 开发框架提供会话状态持久化能力,
4+
持久化数据库选用阿里云 TableStore(OTS,宽表模型)。
5+
6+
使用方式::
7+
8+
# 方式一(推荐):通过 MemoryCollection 自动获取 OTS 连接信息
9+
from agentrun.conversation_service import SessionStore
10+
11+
store = SessionStore.from_memory_collection("your-memory-collection-name")
12+
store.init_tables()
13+
14+
# 方式二:手动传入 OTSClient
15+
import tablestore
16+
from agentrun.conversation_service import SessionStore, OTSBackend
17+
18+
ots_client = tablestore.OTSClient(
19+
endpoint, access_key_id, access_key_secret, instance_name,
20+
)
21+
backend = OTSBackend(ots_client)
22+
store = SessionStore(backend)
23+
store.init_tables()
24+
"""
25+
26+
from agentrun.conversation_service.model import (
27+
ConversationEvent,
28+
ConversationSession,
29+
StateData,
30+
StateScope,
31+
)
32+
from agentrun.conversation_service.ots_backend import OTSBackend
33+
from agentrun.conversation_service.session_store import SessionStore
34+
35+
__all__ = [
36+
# 核心服务
37+
"SessionStore",
38+
"OTSBackend",
39+
# 领域模型
40+
"ConversationSession",
41+
"ConversationEvent",
42+
"StateData",
43+
"StateScope",
44+
]

0 commit comments

Comments
 (0)