diff --git a/packages/socket.io-adapter/lib/in-memory-adapter.ts b/packages/socket.io-adapter/lib/in-memory-adapter.ts index cf178170e0..e39a9d1bf5 100644 --- a/packages/socket.io-adapter/lib/in-memory-adapter.ts +++ b/packages/socket.io-adapter/lib/in-memory-adapter.ts @@ -214,12 +214,14 @@ export class Adapter extends EventEmitter { const encodedPackets = this._encode(packet, packetOpts); let clientCount = 0; + const sockets: any[] = []; this.apply(opts, (socket) => { // track the total number of acknowledgements that are expected clientCount++; // call the ack callback for each client response socket.acks.set(packet.id, ack); + sockets.push(socket); if (typeof socket.notifyOutgoingListeners === "function") { socket.notifyOutgoingListeners(packet); @@ -229,6 +231,16 @@ export class Adapter extends EventEmitter { }); clientCountCallback(clientCount); + + // clean up socket.acks entries after the timeout to prevent memory leaks + // when clients do not respond with an acknowledgement + if (flags.timeout !== undefined) { + setTimeout(() => { + for (const socket of sockets) { + socket.acks.delete(packet.id); + } + }, flags.timeout); + } } private _encode(packet: unknown, packetOpts: Record) { diff --git a/packages/socket.io-adapter/test/index.ts b/packages/socket.io-adapter/test/index.ts index 285ae66f02..9895c0bc3b 100644 --- a/packages/socket.io-adapter/test/index.ts +++ b/packages/socket.io-adapter/test/index.ts @@ -317,6 +317,120 @@ describe("socket.io-adapter", () => { }); }); + describe("broadcastWithAck", () => { + it("should clean up socket.acks after timeout", (done) => { + const acks1 = new Map(); + const acks2 = new Map(); + let _ids = 0; + + function socket(id, acks) { + return [ + id, + { + id, + acks, + client: { + writeToEngine() {}, + }, + }, + ]; + } + + const nsp = { + _ids, + name: "/", + server: { + encoder: { + encode() { + return ["123"]; + }, + }, + }, + // @ts-ignore + sockets: new Map([socket("s1", acks1), socket("s2", acks2)]), + }; + + const adapter = new Adapter(nsp); + adapter.addAll("s1", new Set(["r1"])); + adapter.addAll("s2", new Set(["r1"])); + + const ackFn = () => {}; + + adapter.broadcastWithAck( + { type: 2, data: ["test", ackFn] }, + { + rooms: new Set(["r1"]), + except: new Set(), + flags: { timeout: 50 }, + }, + () => {}, + ackFn, + ); + + // acks should be set immediately + expect(acks1.size).to.be(1); + expect(acks2.size).to.be(1); + + // acks should be cleaned up after timeout + setTimeout(() => { + expect(acks1.size).to.be(0); + expect(acks2.size).to.be(0); + done(); + }, 100); + }); + + it("should not set cleanup timer when no timeout is specified", () => { + const acks1 = new Map(); + let _ids = 0; + + function socket(id, acks) { + return [ + id, + { + id, + acks, + client: { + writeToEngine() {}, + }, + }, + ]; + } + + const nsp = { + _ids, + name: "/", + server: { + encoder: { + encode() { + return ["123"]; + }, + }, + }, + // @ts-ignore + sockets: new Map([socket("s1", acks1)]), + }; + + const adapter = new Adapter(nsp); + adapter.addAll("s1", new Set(["r1"])); + + const ackFn = () => {}; + + adapter.broadcastWithAck( + { type: 2, data: ["test", ackFn] }, + { + rooms: new Set(["r1"]), + except: new Set(), + flags: {}, + }, + () => {}, + ackFn, + ); + + // acks should be set + expect(acks1.size).to.be(1); + }); + }); + describe("connection state recovery", () => { it("should persist and restore session", async () => { const adapter = new SessionAwareAdapter({