Skip to content

Conversation

@IGCrystal-A
Copy link
Contributor

@IGCrystal-A IGCrystal-A commented Dec 26, 2025

This PR addresses the issue of log loss when the real-time log stream (SSE) is interrupted. Currently, if the SSE connection is dropped due to network instability or other reasons, any logs generated during the disconnection period are permanently lost. The client has no standard way to request missing logs upon reconnection. This PR implements the standard SSE Last-Event-ID mechanism to ensure Zero Log Loss by allowing clients to resume the log stream from the last received event.

Modifications / 改动点

Modified

astrbot/dashboard/routes/log.py:

  1. ID Generation: Added the id field (using timestamp) to the SSE data stream.
  2. Header Handling: Added logic to read the Last-Event-ID header from the request.
  3. Replay Mechanism: Implemented logic to check LogBroker.log_cache upon reconnection and replay logs that occurred after the timestamp provided in Last-Event-ID.

dashboard/src/components/shared/ConsoleDisplayer.vue:

  1. Active Connection Management: Refactored the component to manage the EventSource connection directly instead of relying on the passive store state.
  2. Smart Reconnection: Implemented an error handler that automatically fetches missing history and re-establishes the SSE connection upon disconnection.
  3. De-duplication: Added logic to seamlessly merge historical logs with real-time logs to prevent data duplication or loss during network jitter.
  • This is NOT a breaking change. / 这不是一个破坏性变更。

Screenshots or Test Results / 运行截图或测试结果

I verified the changes via command-line tools to ensure protocol compliance and data integrity.


Checklist / 检查清单

  • 😊 如果 PR 中有新加入的功能,已经通过 Issue / 邮件等方式和作者讨论过。/ If there are new features added in the PR, I have discussed it with the authors through issues/emails, etc.
  • 👀 我的更改经过了良好的测试,并已在上方提供了“验证步骤”和“运行截图”。/ My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
  • 🤓 我确保没有引入新依赖库,或者引入了新依赖库的同时将其添加到了 requirements.txtpyproject.toml 文件相应位置。/ I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in requirements.txt and pyproject.toml.
  • 😮 我的更改没有引入恶意代码。/ My changes do not introduce malicious code.

Summary by Sourcery

实现可靠的、可恢复的实时日志流,以避免在 SSE 中断期间发生日志丢失。

Bug Fixes(缺陷修复):

  • 通过让客户端能够从最后接收到的事件继续恢复,防止在 SSE 日志流中断时发生日志丢失。

Enhancements(功能增强):

  • 基于日志时间戳添加 SSE 事件 ID,并在服务器端支持 Last-Event-ID,从内存缓存中重放丢失的日志。
  • 重构 ConsoleDisplayer 组件,使其拥有 EventSource 连接、在错误时自动重连,并将历史日志与实时日志合并,同时进行去重和有界本地缓存。
  • 微调控制台日志渲染样式,以略微提升可读性。
Original summary in English

Summary by Sourcery

Implement reliable, resumable real-time log streaming to avoid log loss during SSE interruptions.

Bug Fixes:

  • Prevent loss of logs when the SSE log stream is interrupted by enabling clients to resume from the last received event.

Enhancements:

  • Add SSE event IDs based on log timestamps and support Last-Event-ID on the server to replay missed logs from the in-memory cache.
  • Refactor the ConsoleDisplayer component to own the EventSource connection, automatically reconnect on errors, and merge historical and live logs with de-duplication and bounded local caching.
  • Tweak console log rendering styles for slightly improved readability.

Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - 我发现了 3 个问题,并给出了一些整体性的反馈:

  • processNewLogs 中,仅基于 time 做去重,可能会错误地丢弃具有相同时间戳但实际不同的日志条目;建议在相等性判断中加入额外字段(例如 data/level 或唯一 ID)。
  • 在 SSE 的 onerror 处理函数中,你设置了一个新的 retryTimer,但没有清除已有的定时器,这会导致出现多个重叠的重连尝试;在设置新的超时之前,先清除已有的定时器。
给 AI Agent 的提示词
Please address the comments from this code review:

## Overall Comments
- In `processNewLogs`, de-duplication based only on `time` can inadvertently drop distinct log entries that share the same timestamp; consider including additional fields (e.g., data/level or a unique ID) in the equality check.
- In the SSE `onerror` handler, you set a new `retryTimer` without clearing any existing one, which can create multiple overlapping reconnection attempts; clear an existing timer before scheduling a new timeout.

## Individual Comments

### Comment 1
<location> `dashboard/src/components/shared/ConsoleDisplayer.vue:115-121` </location>
<code_context>
+      };
+    },
+
+    processNewLogs(newLogs) {
+      if (!newLogs || newLogs.length === 0) return;
+
+      let hasUpdate = false;
+
+      newLogs.forEach(log => {
+        const exists = this.localLogCache.some(existing => existing.time === log.time);
+        
+        if (!exists) {
</code_context>

<issue_to_address>
**issue (bug_risk):** Using only the `time` field for deduplication can drop distinct logs that share the same timestamp.

In `processNewLogs`, any new log with a `time` matching an existing entry is discarded. If your logging source can emit multiple entries with the same timestamp (e.g., second-level resolution or batched events), valid logs will be lost. Consider including an additional stable field (e.g., sequence/id, array index, or a hash of `data` + `level`) in the dedup key, or delegating uniqueness to the broker instead of deduplicating here.
</issue_to_address>

### Comment 2
<location> `dashboard/src/components/shared/ConsoleDisplayer.vue:105-108` </location>
<code_context>
+        }
+      };
+
+      this.eventSource.onerror = (err) => {
+        this.eventSource.close();
+        
+        this.retryTimer = setTimeout(async () => {
+          await this.fetchLogHistory();
+          this.connectSSE();
</code_context>

<issue_to_address>
**suggestion (performance):** The SSE reconnect logic may cause tight retry loops and unnecessary history re-fetches on persistent errors.

Currently, every `onerror` closes the `EventSource`, waits 1s, then reconnects and calls `fetchLogHistory()`. During extended outages this will repeatedly re-fetch the full history and create tight retry loops against the backend. Consider adding exponential backoff and/or a max retry count, and distinguishing transient network errors from persistent failures. Also consider avoiding `fetchLogHistory()` on every retry—only call it when `Last-Event-ID` is missing or when you detect an actual gap in events.

Suggested implementation:

```
      this.eventSource.onmessage = (event) => {
        try {
          const payload = JSON.parse(event.data);

          // 记录 lastEventId,供重连时判断是否需要补拉历史
          if (event.lastEventId) {
            this.lastEventId = event.lastEventId;
          }

          this.processNewLogs([payload]);
        } catch (e) {
          console.error('解析日志失败:', e);
        }
      };

```

我只能看到 `onmessage` 这一段代码,`connectSSE``onerror` 的实现没有完整展示。为实现你评论中的建议,需要在同一组件里做以下额外修改:

1.`data()` 中新增用于指数退避和状态管理的字段(名称可按你们现有约定调整):
   ```js
   data() {
     return {
       // ...
       retryAttempts: 0,
       maxRetryAttempts: 5,          // 最大重试次数,可按需调整或做成可配置
       baseRetryDelay: 1000,         // 初始重试间隔(毫秒)
       lastEventId: null,            // 最新收到的 SSE 事件 ID
       retryTimer: null,
       eventSource: null,
       // ...
     };
   }
   ```

2.`connectSSE()`(或创建 `EventSource` 的地方)中,替换现有的 `onerror` 逻辑为带指数退避和最大重试次数的实现,并在 `onopen` 中重置重试计数、按需补拉历史:
   ```js
   this.eventSource = new EventSource(/* ... */);

   this.eventSource.onopen = () => {
     // 连接成功,重置重试计数
     this.retryAttempts = 0;

     // 仅在没有 lastEventId 的情况下补拉历史,避免每次重连都全量拉取
     if (!this.lastEventId) {
       this.fetchLogHistory();
     }
   };

   this.eventSource.onerror = (err) => {
     console.error('SSE 连接错误:', err);

     if (this.eventSource) {
       this.eventSource.close();
       this.eventSource = null;
     }

     // 如果已经超过最大重试次数,停止重连,避免无限紧凑循环
     if (this.retryAttempts >= this.maxRetryAttempts) {
       console.warn('SSE 已达到最大重试次数,停止重连');
       return;
     }

     // 指数退避:1s, 2s, 4s, 8s,最大不超过 30s
     const delay = Math.min(
       this.baseRetryDelay * Math.pow(2, this.retryAttempts),
       30000
     );
     this.retryAttempts += 1;

     if (this.retryTimer) {
       clearTimeout(this.retryTimer);
       this.retryTimer = null;
     }

     this.retryTimer = setTimeout(() => {
       // 重连时不再无条件调用 fetchLogHistory
       // 依赖 SSE 的 Last-Event-ID / lastEventId 来保证不丢数据
       this.connectSSE();
     }, delay);
   };
   ```

3. 若要进一步区分“瞬时网络故障”和“持久性失败”,可以在 `onerror` 回调中:
   - 检查 `err.target.readyState`(若浏览器支持),例如:
     ```js
     if (this.eventSource && this.eventSource.readyState === EventSource.CLOSED) {
       // 远端主动关闭,按持久性错误处理
     }
     ```
   - 或基于你的后端约定,对某些错误类型或 HTTP 返回码避免立即重试,直接停机并提示用户。

4. 在组件销毁(`beforeUnmount`)时,除了已有的 `close``clearTimeout`,确保同步清理计数:
   ```js
   beforeUnmount() {
     if (this.eventSource) {
       this.eventSource.close();
       this.eventSource = null;
     }
     if (this.retryTimer) {
       clearTimeout(this.retryTimer);
       this.retryTimer = null;
     }
     this.retryAttempts = 0;
   }
   ```

通过这些调整:
- `onerror` 不会再造成紧凑的无限重试循环;
- 使用指数退避降低对后端的压力;
- 只在没有 `Last-Event-ID` 时调用 `fetchLogHistory()`,避免在长时间故障期间频繁全量拉取历史;
- 通过 `lastEventId` 为未来基于事件 ID 的补拉逻辑(检测缺口时再拉历史)打下基础。
</issue_to_address>

### Comment 3
<location> `astrbot/dashboard/routes/log.py:25` </location>
<code_context>
         )

     async def log(self):
+        last_event_id = request.headers.get("Last-Event-ID")
+
</code_context>

<issue_to_address>
**issue (complexity):** Consider extracting helper functions for SSE formatting and cache replay to simplify `stream()` and reduce duplication.

You can reduce the added complexity in `stream()` by extracting two small helpers: one for SSE formatting and one for cache replay. This removes the nested `try` and duplicates while keeping behavior unchanged.

### 1. Share SSE formatting

```python
def _format_log_sse(log: dict, ts: float) -> str:
    payload = {
        "type": "log",
        **log,
    }
    return f"id: {ts}\ndata: {json.dumps(payload, ensure_ascii=False)}\n\n"
```

Use it in both the replay and live streaming paths:

```python
# in replay loop
yield _format_log_sse(log_item, log_ts)

# in live loop
current_ts = message.get("time", time.time())
yield _format_log_sse(message, current_ts)
```

### 2. Extract cache replay logic

Move the `Last-Event-ID` handling out of the main `try` in `stream()`:

```python
async def _replay_cached_logs(log_broker: LogBroker, last_event_id: str | None):
    if not last_event_id:
        return

    try:
        last_ts = float(last_event_id)
    except ValueError:
        return

    try:
        cached_logs = list(log_broker.log_cache)
        for log_item in cached_logs:
            log_ts = float(log_item.get("time", 0))
            if log_ts > last_ts:
                yield _format_log_sse(log_item, log_ts)
    except Exception as e:
        logger.error(f"Log SSE 历史错误: {e}")
```

Then `stream()` becomes more linear and focused on registration and live streaming:

```python
async def log(self):
    last_event_id = request.headers.get("Last-Event-ID")

    async def stream():
        queue = None
        try:
            async for event in _replay_cached_logs(self.log_broker, last_event_id):
                yield event

            queue = self.log_broker.register()
            while True:
                message = await queue.get()
                current_ts = message.get("time", time.time())
                yield _format_log_sse(message, current_ts)
        except asyncio.CancelledError:
            pass
        except BaseException as e:
            logger.error(f"Log SSE 连接错误: {e}")
        finally:
            if queue:
                self.log_broker.unregister(queue)
```

This keeps all existing functionality (including error logging and SSE IDs) but flattens the control flow and removes duplication.
</issue_to_address>

Sourcery 对开源项目免费——如果你觉得这些 Review 有帮助,欢迎分享 ✨
帮我变得更有用!请在每条评论上点 👍 或 👎,我会根据这些反馈改进后续的 Review。
Original comment in English

Hey - I've found 3 issues, and left some high level feedback:

  • In processNewLogs, de-duplication based only on time can inadvertently drop distinct log entries that share the same timestamp; consider including additional fields (e.g., data/level or a unique ID) in the equality check.
  • In the SSE onerror handler, you set a new retryTimer without clearing any existing one, which can create multiple overlapping reconnection attempts; clear an existing timer before scheduling a new timeout.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- In `processNewLogs`, de-duplication based only on `time` can inadvertently drop distinct log entries that share the same timestamp; consider including additional fields (e.g., data/level or a unique ID) in the equality check.
- In the SSE `onerror` handler, you set a new `retryTimer` without clearing any existing one, which can create multiple overlapping reconnection attempts; clear an existing timer before scheduling a new timeout.

## Individual Comments

### Comment 1
<location> `dashboard/src/components/shared/ConsoleDisplayer.vue:115-121` </location>
<code_context>
+      };
+    },
+
+    processNewLogs(newLogs) {
+      if (!newLogs || newLogs.length === 0) return;
+
+      let hasUpdate = false;
+
+      newLogs.forEach(log => {
+        const exists = this.localLogCache.some(existing => existing.time === log.time);
+        
+        if (!exists) {
</code_context>

<issue_to_address>
**issue (bug_risk):** Using only the `time` field for deduplication can drop distinct logs that share the same timestamp.

In `processNewLogs`, any new log with a `time` matching an existing entry is discarded. If your logging source can emit multiple entries with the same timestamp (e.g., second-level resolution or batched events), valid logs will be lost. Consider including an additional stable field (e.g., sequence/id, array index, or a hash of `data` + `level`) in the dedup key, or delegating uniqueness to the broker instead of deduplicating here.
</issue_to_address>

### Comment 2
<location> `dashboard/src/components/shared/ConsoleDisplayer.vue:105-108` </location>
<code_context>
+        }
+      };
+
+      this.eventSource.onerror = (err) => {
+        this.eventSource.close();
+        
+        this.retryTimer = setTimeout(async () => {
+          await this.fetchLogHistory();
+          this.connectSSE();
</code_context>

<issue_to_address>
**suggestion (performance):** The SSE reconnect logic may cause tight retry loops and unnecessary history re-fetches on persistent errors.

Currently, every `onerror` closes the `EventSource`, waits 1s, then reconnects and calls `fetchLogHistory()`. During extended outages this will repeatedly re-fetch the full history and create tight retry loops against the backend. Consider adding exponential backoff and/or a max retry count, and distinguishing transient network errors from persistent failures. Also consider avoiding `fetchLogHistory()` on every retry—only call it when `Last-Event-ID` is missing or when you detect an actual gap in events.

Suggested implementation:

```
      this.eventSource.onmessage = (event) => {
        try {
          const payload = JSON.parse(event.data);

          // 记录 lastEventId,供重连时判断是否需要补拉历史
          if (event.lastEventId) {
            this.lastEventId = event.lastEventId;
          }

          this.processNewLogs([payload]);
        } catch (e) {
          console.error('解析日志失败:', e);
        }
      };

```

我只能看到 `onmessage` 这一段代码,`connectSSE``onerror` 的实现没有完整展示。为实现你评论中的建议,需要在同一组件里做以下额外修改:

1.`data()` 中新增用于指数退避和状态管理的字段(名称可按你们现有约定调整):
   ```js
   data() {
     return {
       // ...
       retryAttempts: 0,
       maxRetryAttempts: 5,          // 最大重试次数,可按需调整或做成可配置
       baseRetryDelay: 1000,         // 初始重试间隔(毫秒)
       lastEventId: null,            // 最新收到的 SSE 事件 ID
       retryTimer: null,
       eventSource: null,
       // ...
     };
   }
   ```

2.`connectSSE()`(或创建 `EventSource` 的地方)中,替换现有的 `onerror` 逻辑为带指数退避和最大重试次数的实现,并在 `onopen` 中重置重试计数、按需补拉历史:
   ```js
   this.eventSource = new EventSource(/* ... */);

   this.eventSource.onopen = () => {
     // 连接成功,重置重试计数
     this.retryAttempts = 0;

     // 仅在没有 lastEventId 的情况下补拉历史,避免每次重连都全量拉取
     if (!this.lastEventId) {
       this.fetchLogHistory();
     }
   };

   this.eventSource.onerror = (err) => {
     console.error('SSE 连接错误:', err);

     if (this.eventSource) {
       this.eventSource.close();
       this.eventSource = null;
     }

     // 如果已经超过最大重试次数,停止重连,避免无限紧凑循环
     if (this.retryAttempts >= this.maxRetryAttempts) {
       console.warn('SSE 已达到最大重试次数,停止重连');
       return;
     }

     // 指数退避:1s, 2s, 4s, 8s,最大不超过 30s
     const delay = Math.min(
       this.baseRetryDelay * Math.pow(2, this.retryAttempts),
       30000
     );
     this.retryAttempts += 1;

     if (this.retryTimer) {
       clearTimeout(this.retryTimer);
       this.retryTimer = null;
     }

     this.retryTimer = setTimeout(() => {
       // 重连时不再无条件调用 fetchLogHistory
       // 依赖 SSE 的 Last-Event-ID / lastEventId 来保证不丢数据
       this.connectSSE();
     }, delay);
   };
   ```

3. 若要进一步区分“瞬时网络故障”和“持久性失败”,可以在 `onerror` 回调中:
   - 检查 `err.target.readyState`(若浏览器支持),例如:
     ```js
     if (this.eventSource && this.eventSource.readyState === EventSource.CLOSED) {
       // 远端主动关闭,按持久性错误处理
     }
     ```
   - 或基于你的后端约定,对某些错误类型或 HTTP 返回码避免立即重试,直接停机并提示用户。

4. 在组件销毁(`beforeUnmount`)时,除了已有的 `close``clearTimeout`,确保同步清理计数:
   ```js
   beforeUnmount() {
     if (this.eventSource) {
       this.eventSource.close();
       this.eventSource = null;
     }
     if (this.retryTimer) {
       clearTimeout(this.retryTimer);
       this.retryTimer = null;
     }
     this.retryAttempts = 0;
   }
   ```

通过这些调整:
- `onerror` 不会再造成紧凑的无限重试循环;
- 使用指数退避降低对后端的压力;
- 只在没有 `Last-Event-ID` 时调用 `fetchLogHistory()`,避免在长时间故障期间频繁全量拉取历史;
- 通过 `lastEventId` 为未来基于事件 ID 的补拉逻辑(检测缺口时再拉历史)打下基础。
</issue_to_address>

### Comment 3
<location> `astrbot/dashboard/routes/log.py:25` </location>
<code_context>
         )

     async def log(self):
+        last_event_id = request.headers.get("Last-Event-ID")
+
</code_context>

<issue_to_address>
**issue (complexity):** Consider extracting helper functions for SSE formatting and cache replay to simplify `stream()` and reduce duplication.

You can reduce the added complexity in `stream()` by extracting two small helpers: one for SSE formatting and one for cache replay. This removes the nested `try` and duplicates while keeping behavior unchanged.

### 1. Share SSE formatting

```python
def _format_log_sse(log: dict, ts: float) -> str:
    payload = {
        "type": "log",
        **log,
    }
    return f"id: {ts}\ndata: {json.dumps(payload, ensure_ascii=False)}\n\n"
```

Use it in both the replay and live streaming paths:

```python
# in replay loop
yield _format_log_sse(log_item, log_ts)

# in live loop
current_ts = message.get("time", time.time())
yield _format_log_sse(message, current_ts)
```

### 2. Extract cache replay logic

Move the `Last-Event-ID` handling out of the main `try` in `stream()`:

```python
async def _replay_cached_logs(log_broker: LogBroker, last_event_id: str | None):
    if not last_event_id:
        return

    try:
        last_ts = float(last_event_id)
    except ValueError:
        return

    try:
        cached_logs = list(log_broker.log_cache)
        for log_item in cached_logs:
            log_ts = float(log_item.get("time", 0))
            if log_ts > last_ts:
                yield _format_log_sse(log_item, log_ts)
    except Exception as e:
        logger.error(f"Log SSE 历史错误: {e}")
```

Then `stream()` becomes more linear and focused on registration and live streaming:

```python
async def log(self):
    last_event_id = request.headers.get("Last-Event-ID")

    async def stream():
        queue = None
        try:
            async for event in _replay_cached_logs(self.log_broker, last_event_id):
                yield event

            queue = self.log_broker.register()
            while True:
                message = await queue.get()
                current_ts = message.get("time", time.time())
                yield _format_log_sse(message, current_ts)
        except asyncio.CancelledError:
            pass
        except BaseException as e:
            logger.error(f"Log SSE 连接错误: {e}")
        finally:
            if queue:
                self.log_broker.unregister(queue)
```

This keeps all existing functionality (including error logging and SSE IDs) but flattens the control flow and removes duplication.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

@dosubot dosubot bot added the size:L This PR changes 100-499 lines, ignoring generated files. label Dec 26, 2025
@dosubot dosubot bot added the lgtm This PR has been approved by a maintainer label Dec 26, 2025
@Soulter Soulter changed the title Fix/log fix: fix log loss on SSE reconnect using Last-Event-ID Dec 26, 2025
@Soulter Soulter merged commit 9a5cc97 into AstrBotDevs:master Dec 26, 2025
4 of 5 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

lgtm This PR has been approved by a maintainer size:L This PR changes 100-499 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants