fix: fix log loss on SSE reconnect using Last-Event-ID #4205
Merged
+217
−89
Conversation
Contributor
Hey - I've found 3 issues, and left some high-level feedback:
- In `processNewLogs`, de-duplication based only on `time` can inadvertently drop distinct log entries that share the same timestamp; consider including additional fields (e.g., data/level or a unique ID) in the equality check.
- In the SSE `onerror` handler, you set a new `retryTimer` without clearing any existing one, which can create multiple overlapping reconnection attempts; clear the existing timer before scheduling a new timeout.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `processNewLogs`, de-duplication based only on `time` can inadvertently drop distinct log entries that share the same timestamp; consider including additional fields (e.g., data/level or a unique ID) in the equality check.
- In the SSE `onerror` handler, you set a new `retryTimer` without clearing any existing one, which can create multiple overlapping reconnection attempts; clear an existing timer before scheduling a new timeout.
## Individual Comments
### Comment 1
<location> `dashboard/src/components/shared/ConsoleDisplayer.vue:115-121` </location>
<code_context>
+ };
+ },
+
+ processNewLogs(newLogs) {
+ if (!newLogs || newLogs.length === 0) return;
+
+ let hasUpdate = false;
+
+ newLogs.forEach(log => {
+ const exists = this.localLogCache.some(existing => existing.time === log.time);
+
+ if (!exists) {
</code_context>
<issue_to_address>
**issue (bug_risk):** Using only the `time` field for deduplication can drop distinct logs that share the same timestamp.
In `processNewLogs`, any new log with a `time` matching an existing entry is discarded. If your logging source can emit multiple entries with the same timestamp (e.g., second-level resolution or batched events), valid logs will be lost. Consider including an additional stable field (e.g., sequence/id, array index, or a hash of `data` + `level`) in the dedup key, or delegating uniqueness to the broker instead of deduplicating here.
</issue_to_address>
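A minimal sketch of that composite-key approach, assuming entries expose `time`, `level`, and `data` fields as the review suggests (the actual schema may differ); using a `Set` also avoids the O(n) `some()` scan per incoming log:
```js
// Sketch only: dedup on a composite key instead of `time` alone.
// The time/level/data field names are assumptions taken from the review text.
processNewLogs(newLogs) {
  if (!newLogs || newLogs.length === 0) return;

  // Build the seen-set once per batch: O(cache + batch) rather than O(cache * batch)
  const seen = new Set(
    this.localLogCache.map(l => `${l.time}|${l.level}|${l.data}`)
  );

  let hasUpdate = false;
  newLogs.forEach(log => {
    const key = `${log.time}|${log.level}|${log.data}`;
    if (!seen.has(key)) {
      seen.add(key);
      this.localLogCache.push(log);
      hasUpdate = true;
    }
  });

  if (hasUpdate) {
    // ...trigger whatever refresh/scroll logic the component already performs
  }
}
```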
### Comment 2
<location> `dashboard/src/components/shared/ConsoleDisplayer.vue:105-108` </location>
<code_context>
+ }
+ };
+
+ this.eventSource.onerror = (err) => {
+ this.eventSource.close();
+
+ this.retryTimer = setTimeout(async () => {
+ await this.fetchLogHistory();
+ this.connectSSE();
</code_context>
<issue_to_address>
**suggestion (performance):** The SSE reconnect logic may cause tight retry loops and unnecessary history re-fetches on persistent errors.
Currently, every `onerror` closes the `EventSource`, waits 1s, then reconnects and calls `fetchLogHistory()`. During extended outages this will repeatedly re-fetch the full history and create tight retry loops against the backend. Consider adding exponential backoff and/or a max retry count, and distinguishing transient network errors from persistent failures. Also consider avoiding `fetchLogHistory()` on every retry—only call it when `Last-Event-ID` is missing or when you detect an actual gap in events.
Suggested implementation:
```js
this.eventSource.onmessage = (event) => {
  try {
    const payload = JSON.parse(event.data);
    // Record lastEventId so that on reconnect we can decide
    // whether history needs re-fetching
    if (event.lastEventId) {
      this.lastEventId = event.lastEventId;
    }
    this.processNewLogs([payload]);
  } catch (e) {
    console.error('Failed to parse log:', e);
  }
};
```
I can only see this `onmessage` snippet; the full implementations of `connectSSE` and `onerror` are not shown. To implement the suggestion above, the following additional changes are needed in the same component:
1. Add fields to `data()` for exponential backoff and state management (names can be adjusted to match your existing conventions):
```js
data() {
  return {
    // ...
    retryAttempts: 0,
    maxRetryAttempts: 5,  // maximum retry count; adjust as needed or make configurable
    baseRetryDelay: 1000, // initial retry interval (ms)
    lastEventId: null,    // most recent SSE event ID received
    retryTimer: null,
    eventSource: null,
    // ...
  };
}
```
2. In `connectSSE()` (or wherever the `EventSource` is created), replace the existing `onerror` logic with one that applies exponential backoff and a max retry count, and reset the retry counter in `onopen`, re-fetching history only when needed:
```js
this.eventSource = new EventSource(/* ... */);

this.eventSource.onopen = () => {
  // Connection established; reset the retry counter
  this.retryAttempts = 0;
  // Only re-fetch history when there is no lastEventId,
  // to avoid a full fetch on every reconnect
  if (!this.lastEventId) {
    this.fetchLogHistory();
  }
};

this.eventSource.onerror = (err) => {
  console.error('SSE connection error:', err);
  if (this.eventSource) {
    this.eventSource.close();
    this.eventSource = null;
  }
  // Stop reconnecting once the max retry count is reached,
  // to avoid an unbounded tight loop
  if (this.retryAttempts >= this.maxRetryAttempts) {
    console.warn('SSE reached the maximum retry count; giving up on reconnection');
    return;
  }
  // Exponential backoff: 1s, 2s, 4s, 8s, ... capped at 30s
  const delay = Math.min(
    this.baseRetryDelay * Math.pow(2, this.retryAttempts),
    30000
  );
  this.retryAttempts += 1;
  if (this.retryTimer) {
    clearTimeout(this.retryTimer);
    this.retryTimer = null;
  }
  this.retryTimer = setTimeout(() => {
    // No longer call fetchLogHistory unconditionally on reconnect;
    // rely on SSE's Last-Event-ID / lastEventId to avoid losing data
    this.connectSSE();
  }, delay);
};
```
3. To further distinguish transient network failures from persistent ones, the `onerror` callback can:
- Check `err.target.readyState` (where the browser supports it), for example:
```js
if (this.eventSource && this.eventSource.readyState === EventSource.CLOSED) {
  // The remote end closed the connection; treat it as a persistent error
}
```
- Or, based on your backend's conventions, avoid immediate retries for certain error types or HTTP status codes, stop entirely, and surface the failure to the user.
4. On component teardown (`beforeUnmount`), in addition to the existing `close` and `clearTimeout`, make sure the retry counter is reset as well:
```js
beforeUnmount() {
  if (this.eventSource) {
    this.eventSource.close();
    this.eventSource = null;
  }
  if (this.retryTimer) {
    clearTimeout(this.retryTimer);
    this.retryTimer = null;
  }
  this.retryAttempts = 0;
}
```
With these adjustments:
- `onerror` no longer produces a tight, unbounded retry loop;
- exponential backoff reduces pressure on the backend;
- `fetchLogHistory()` is only called when there is no `Last-Event-ID`, avoiding repeated full-history fetches during extended outages;
- tracking `lastEventId` lays the groundwork for future event-ID-based gap recovery (fetching history only when a gap is detected).
</issue_to_address>
### Comment 3
<location> `astrbot/dashboard/routes/log.py:25` </location>
<code_context>
)
async def log(self):
+ last_event_id = request.headers.get("Last-Event-ID")
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting helper functions for SSE formatting and cache replay to simplify `stream()` and reduce duplication.
You can reduce the added complexity in `stream()` by extracting two small helpers: one for SSE formatting and one for cache replay. This removes the nested `try` and duplicates while keeping behavior unchanged.
### 1. Share SSE formatting
```python
def _format_log_sse(log: dict, ts: float) -> str:
    payload = {
        "type": "log",
        **log,
    }
    return f"id: {ts}\ndata: {json.dumps(payload, ensure_ascii=False)}\n\n"
```
Use it in both the replay and live streaming paths:
```python
# in replay loop
yield _format_log_sse(log_item, log_ts)
# in live loop
current_ts = message.get("time", time.time())
yield _format_log_sse(message, current_ts)
```
### 2. Extract cache replay logic
Move the `Last-Event-ID` handling out of the main `try` in `stream()`:
```python
async def _replay_cached_logs(log_broker: LogBroker, last_event_id: str | None):
    if not last_event_id:
        return
    try:
        last_ts = float(last_event_id)
    except ValueError:
        return
    try:
        cached_logs = list(log_broker.log_cache)
        for log_item in cached_logs:
            log_ts = float(log_item.get("time", 0))
            if log_ts > last_ts:
                yield _format_log_sse(log_item, log_ts)
    except Exception as e:
        logger.error(f"Log SSE 历史错误: {e}")
```
Then `stream()` becomes more linear and focused on registration and live streaming:
```python
async def log(self):
    last_event_id = request.headers.get("Last-Event-ID")

    async def stream():
        queue = None
        try:
            async for event in _replay_cached_logs(self.log_broker, last_event_id):
                yield event
            queue = self.log_broker.register()
            while True:
                message = await queue.get()
                current_ts = message.get("time", time.time())
                yield _format_log_sse(message, current_ts)
        except asyncio.CancelledError:
            pass
        except BaseException as e:
            logger.error(f"Log SSE 连接错误: {e}")
        finally:
            if queue:
                self.log_broker.unregister(queue)
```
This keeps all existing functionality (including error logging and SSE IDs) but flattens the control flow and removes duplication.
</issue_to_address>

Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
Soulter
approved these changes
Dec 26, 2025
This PR addresses the issue of log loss when the real-time log stream (SSE) is interrupted. Currently, if the SSE connection is dropped due to network instability or other reasons, any logs generated during the disconnection period are permanently lost, and the client has no standard way to request the missing logs upon reconnection. This PR implements the standard SSE `Last-Event-ID` mechanism to ensure zero log loss by allowing clients to resume the log stream from the last received event.

Modifications / 改动点

Modified `astrbot/dashboard/routes/log.py`:
- Added an `id` field (using the timestamp) to the SSE data stream.
- Read the `Last-Event-ID` header from the request.
- Check `LogBroker.log_cache` upon reconnection and replay logs that occurred after the timestamp provided in `Last-Event-ID`.

Modified `dashboard/src/components/shared/ConsoleDisplayer.vue`:
- The component now owns the `EventSource` connection directly instead of relying on the passive store state.
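For reference, an event on the wire then looks roughly like this (values illustrative; the shape matches the `_format_log_sse` helper sketched in the review above):
```
id: 1735201000.123
data: {"type": "log", "time": 1735201000.123, "level": "INFO", "data": "..."}

```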
Screenshots or Test Results / 运行截图或测试结果

I verified the changes via command-line tools to ensure protocol compliance and data integrity.
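One way to exercise the resume behavior from the command line is sketched below; the host, port, and path are placeholders, not the project's confirmed route:
```bash
# Resume the stream from a given timestamp; -N disables buffering so SSE
# events print as they arrive. The URL below is a placeholder.
curl -N \
  -H "Accept: text/event-stream" \
  -H "Last-Event-ID: 1735201000.123" \
  "http://localhost:6185/api/log"
```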
Checklist / 检查清单
- I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in `requirements.txt` and `pyproject.toml`.

Summary by Sourcery
Implement reliable, resumable real-time log streaming to avoid log loss during SSE interruptions.

Bug Fixes:
- Resume the log stream from the last received event via the SSE `Last-Event-ID` mechanism, so logs produced while the connection is down are no longer lost.

Enhancements:
- Reworked the ConsoleDisplayer component to own the `EventSource` connection, reconnect automatically on errors, and merge historical logs with live logs, with deduplication and a bounded local cache.