Skip to content

Commit 097aa04

Browse files
committed
fix: add TimeoutManager
1 parent 87955c0 commit 097aa04

File tree

1 file changed

+62
-37
lines changed

1 file changed

+62
-37
lines changed

src/common/sessionStore.ts

Lines changed: 62 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,54 @@ import { StreamableHTTPServerTransport } from "@modelcontextprotocol/sdk/server/
22
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
33
import logger, { LogId, McpLogger } from "./logger.js";
44

5+
class TimeoutManager {
6+
private timeoutId: NodeJS.Timeout | undefined;
7+
public onerror?: (error: unknown) => void;
8+
9+
constructor(
10+
private readonly callback: () => Promise<void> | void,
11+
private readonly timeoutMS: number
12+
) {
13+
if (timeoutMS <= 0) {
14+
throw new Error("timeoutMS must be greater than 0");
15+
}
16+
this.reset();
17+
}
18+
19+
clear() {
20+
if (this.timeoutId) {
21+
clearTimeout(this.timeoutId);
22+
this.timeoutId = undefined;
23+
}
24+
}
25+
26+
private async runCallback() {
27+
if (this.callback) {
28+
try {
29+
await this.callback();
30+
} catch (error: unknown) {
31+
this.onerror?.(error);
32+
}
33+
}
34+
}
35+
36+
reset() {
37+
this.clear();
38+
this.timeoutId = setTimeout(() => {
39+
void this.runCallback().finally(() => {
40+
this.timeoutId = undefined;
41+
});
42+
}, this.timeoutMS);
43+
}
44+
}
45+
546
export class SessionStore {
647
private sessions: {
748
[sessionId: string]: {
849
mcpServer: McpServer;
950
transport: StreamableHTTPServerTransport;
10-
abortController: AbortController;
11-
abortTimeoutId: NodeJS.Timeout;
12-
notificationTimeoutId: NodeJS.Timeout;
51+
abortTimeout: TimeoutManager;
52+
notificationTimeout: TimeoutManager;
1353
};
1454
} = {};
1555

@@ -39,21 +79,9 @@ export class SessionStore {
3979
return;
4080
}
4181

42-
if (session.abortTimeoutId) {
43-
clearTimeout(session.abortTimeoutId);
44-
}
45-
const abortTimeoutId = setTimeout(() => {
46-
session.abortController.abort();
47-
}, this.idleTimeoutMS);
48-
session.abortTimeoutId = abortTimeoutId;
82+
session.abortTimeout.reset();
4983

50-
if (session.notificationTimeoutId) {
51-
clearTimeout(session.notificationTimeoutId);
52-
}
53-
const notificationTimeoutId = setTimeout(() => {
54-
this.sendNotification(sessionId);
55-
}, this.notificationTimeoutMS);
56-
session.notificationTimeoutId = notificationTimeoutId;
84+
session.notificationTimeout.reset();
5785
}
5886

5987
private sendNotification(sessionId: string): void {
@@ -73,33 +101,31 @@ export class SessionStore {
73101
if (this.sessions[sessionId]) {
74102
throw new Error(`Session ${sessionId} already exists`);
75103
}
76-
const abortController = new AbortController();
77-
const abortTimeoutId = setTimeout(() => {
78-
abortController.abort();
79-
}, this.idleTimeoutMS);
80-
const notificationTimeoutId = setTimeout(() => {
81-
this.sendNotification(sessionId);
82-
}, this.notificationTimeoutMS);
83-
this.sessions[sessionId] = { mcpServer, transport, abortController, abortTimeoutId, notificationTimeoutId };
84-
abortController.signal.onabort = async () => {
104+
const abortTimeout = new TimeoutManager(async () => {
105+
const logger = new McpLogger(mcpServer);
106+
logger.info(
107+
LogId.streamableHttpTransportSessionCloseNotification,
108+
"sessionStore",
109+
"Session closed due to inactivity"
110+
);
111+
85112
await this.closeSession(sessionId);
86-
};
113+
}, this.idleTimeoutMS);
114+
const notificationTimeout = new TimeoutManager(
115+
() => this.sendNotification(sessionId),
116+
this.notificationTimeoutMS
117+
);
118+
this.sessions[sessionId] = { mcpServer, transport, abortTimeout, notificationTimeout };
87119
}
88120

89121
async closeSession(sessionId: string, closeTransport: boolean = true): Promise<void> {
90122
if (!this.sessions[sessionId]) {
91123
throw new Error(`Session ${sessionId} not found`);
92124
}
93-
clearTimeout(this.sessions[sessionId].abortTimeoutId);
94-
clearTimeout(this.sessions[sessionId].notificationTimeoutId);
125+
this.sessions[sessionId].abortTimeout.clear();
126+
this.sessions[sessionId].notificationTimeout.clear();
95127
if (closeTransport) {
96128
try {
97-
const logger = new McpLogger(this.sessions[sessionId].mcpServer);
98-
logger.info(
99-
LogId.streamableHttpTransportSessionCloseNotification,
100-
"sessionStore",
101-
"Session closed, please reconnect"
102-
);
103129
await this.sessions[sessionId].transport.close();
104130
} catch (error) {
105131
logger.error(
@@ -113,7 +139,6 @@ export class SessionStore {
113139
}
114140

115141
async closeAllSessions(): Promise<void> {
116-
await Promise.all(Object.values(this.sessions).map((session) => session.abortController.abort()));
117-
this.sessions = {};
142+
await Promise.all(Object.keys(this.sessions).map((sessionId) => this.closeSession(sessionId)));
118143
}
119144
}

0 commit comments

Comments
 (0)