From 6705e05103249670682b9a8f3e4c087461bd2636 Mon Sep 17 00:00:00 2001 From: Varun Chawla Date: Sat, 14 Feb 2026 02:46:33 -0800 Subject: [PATCH] fix: clean up socket.acks entries on broadcastWithAck timeout When using emitWithAck with a timeout on broadcast operations, the ack callbacks stored in each socket's acks map were never cleaned up if the client didn't respond before the timeout fired. Over time this causes significant memory leaks, especially with many sockets and frequent broadcasts. The fix tracks which sockets received the broadcast ack and schedules cleanup of their acks map entries after the timeout period, matching the behavior already present in individual socket registerAckCallback. Fixes socketio/socket.io#4984 --- .../lib/in-memory-adapter.ts | 12 ++ packages/socket.io-adapter/test/index.ts | 114 ++++++++++++++++++ 2 files changed, 126 insertions(+) diff --git a/packages/socket.io-adapter/lib/in-memory-adapter.ts b/packages/socket.io-adapter/lib/in-memory-adapter.ts index cf178170e0..e39a9d1bf5 100644 --- a/packages/socket.io-adapter/lib/in-memory-adapter.ts +++ b/packages/socket.io-adapter/lib/in-memory-adapter.ts @@ -214,12 +214,14 @@ export class Adapter extends EventEmitter { const encodedPackets = this._encode(packet, packetOpts); let clientCount = 0; + const sockets: any[] = []; this.apply(opts, (socket) => { // track the total number of acknowledgements that are expected clientCount++; // call the ack callback for each client response socket.acks.set(packet.id, ack); + sockets.push(socket); if (typeof socket.notifyOutgoingListeners === "function") { socket.notifyOutgoingListeners(packet); @@ -229,6 +231,16 @@ export class Adapter extends EventEmitter { }); clientCountCallback(clientCount); + + // clean up socket.acks entries after the timeout to prevent memory leaks + // when clients do not respond with an acknowledgement + if (flags.timeout !== undefined) { + setTimeout(() => { + for (const socket of sockets) { + socket.acks.delete(packet.id); + } + }, flags.timeout); + } } private _encode(packet: unknown, packetOpts: Record) { diff --git a/packages/socket.io-adapter/test/index.ts b/packages/socket.io-adapter/test/index.ts index 285ae66f02..9895c0bc3b 100644 --- a/packages/socket.io-adapter/test/index.ts +++ b/packages/socket.io-adapter/test/index.ts @@ -317,6 +317,120 @@ describe("socket.io-adapter", () => { }); }); + describe("broadcastWithAck", () => { + it("should clean up socket.acks after timeout", (done) => { + const acks1 = new Map(); + const acks2 = new Map(); + let _ids = 0; + + function socket(id, acks) { + return [ + id, + { + id, + acks, + client: { + writeToEngine() {}, + }, + }, + ]; + } + + const nsp = { + _ids, + name: "/", + server: { + encoder: { + encode() { + return ["123"]; + }, + }, + }, + // @ts-ignore + sockets: new Map([socket("s1", acks1), socket("s2", acks2)]), + }; + + const adapter = new Adapter(nsp); + adapter.addAll("s1", new Set(["r1"])); + adapter.addAll("s2", new Set(["r1"])); + + const ackFn = () => {}; + + adapter.broadcastWithAck( + { type: 2, data: ["test", ackFn] }, + { + rooms: new Set(["r1"]), + except: new Set(), + flags: { timeout: 50 }, + }, + () => {}, + ackFn, + ); + + // acks should be set immediately + expect(acks1.size).to.be(1); + expect(acks2.size).to.be(1); + + // acks should be cleaned up after timeout + setTimeout(() => { + expect(acks1.size).to.be(0); + expect(acks2.size).to.be(0); + done(); + }, 100); + }); + + it("should not set cleanup timer when no timeout is specified", () => { + const acks1 = new Map(); + let _ids = 0; + + function socket(id, acks) { + return [ + id, + { + id, + acks, + client: { + writeToEngine() {}, + }, + }, + ]; + } + + const nsp = { + _ids, + name: "/", + server: { + encoder: { + encode() { + return ["123"]; + }, + }, + }, + // @ts-ignore + sockets: new Map([socket("s1", acks1)]), + }; + + const adapter = new Adapter(nsp); + adapter.addAll("s1", new Set(["r1"])); + + const ackFn = () => {}; + + adapter.broadcastWithAck( + { type: 2, data: ["test", ackFn] }, + { + rooms: new Set(["r1"]), + except: new Set(), + flags: {}, + }, + () => {}, + ackFn, + ); + + // acks should be set + expect(acks1.size).to.be(1); + }); + }); + describe("connection state recovery", () => { it("should persist and restore session", async () => { const adapter = new SessionAwareAdapter({