diff --git a/packages/socket.io-adapter/lib/cluster-adapter.ts b/packages/socket.io-adapter/lib/cluster-adapter.ts index c53a48b4be..775a9fb296 100644 --- a/packages/socket.io-adapter/lib/cluster-adapter.ts +++ b/packages/socket.io-adapter/lib/cluster-adapter.ts @@ -482,7 +482,7 @@ export abstract class ClusterAdapter extends Adapter { opts: BroadcastOptions, clientCountCallback: (clientCount: number) => void, ack: (...args: any[]) => void, - ) { + ): { cleanup: () => void } { const onlyLocal = opts?.flags?.local; if (!onlyLocal) { const requestId = randomId(); @@ -508,7 +508,7 @@ export abstract class ClusterAdapter extends Adapter { }, opts.flags!.timeout); } - super.broadcastWithAck(packet, opts, clientCountCallback, ack); + return super.broadcastWithAck(packet, opts, clientCountCallback, ack); } override async addSockets(opts: BroadcastOptions, rooms: Room[]) { diff --git a/packages/socket.io-adapter/lib/in-memory-adapter.ts b/packages/socket.io-adapter/lib/in-memory-adapter.ts index cf178170e0..9bef59b336 100644 --- a/packages/socket.io-adapter/lib/in-memory-adapter.ts +++ b/packages/socket.io-adapter/lib/in-memory-adapter.ts @@ -199,7 +199,7 @@ export class Adapter extends EventEmitter { opts: BroadcastOptions, clientCountCallback: (clientCount: number) => void, ack: (...args: any[]) => void, - ) { + ): { cleanup: () => void } { const flags = opts.flags || {}; const packetOpts = { preEncoded: true, @@ -214,12 +214,15 @@ export class Adapter extends EventEmitter { const encodedPackets = this._encode(packet, packetOpts); let clientCount = 0; + const sentToSockets: any[] = []; this.apply(opts, (socket) => { // track the total number of acknowledgements that are expected clientCount++; // call the ack callback for each client response socket.acks.set(packet.id, ack); + // track sockets for cleanup on timeout + sentToSockets.push(socket); if (typeof socket.notifyOutgoingListeners === "function") { socket.notifyOutgoingListeners(packet); @@ -229,6 +232,20 @@ export class Adapter extends EventEmitter { }); clientCountCallback(clientCount); + + const packetId = packet.id; + return { + cleanup: () => { + // remove pending acks from all sockets on timeout + for (const socket of sentToSockets) { + if (socket.acks) { + socket.acks.delete(packetId); + } + } + // clear array to release socket references + sentToSockets.length = 0; + }, + }; } private _encode(packet: unknown, packetOpts: Record) { diff --git a/packages/socket.io/lib/broadcast-operator.ts b/packages/socket.io/lib/broadcast-operator.ts index 6a229daef6..37058b037a 100644 --- a/packages/socket.io/lib/broadcast-operator.ts +++ b/packages/socket.io/lib/broadcast-operator.ts @@ -232,9 +232,14 @@ export class BroadcastOperator const ack = data.pop() as (...args: any[]) => void; let timedOut = false; let responses: any[] = []; + let cleanup: (() => void) | undefined; const timer = setTimeout(() => { timedOut = true; + // cleanup pending acks to prevent memory leak + if (cleanup) { + cleanup(); + } ack.apply(this, [ new Error("operation has timed out"), this.flags.expectSingleResponse ? null : responses, @@ -259,7 +264,7 @@ export class BroadcastOperator } }; - this.adapter.broadcastWithAck( + const result = this.adapter.broadcastWithAck( packet, { rooms: this.rooms, @@ -279,6 +284,11 @@ export class BroadcastOperator }, ); + // store cleanup function for timeout handler + if (result && typeof result.cleanup === "function") { + cleanup = result.cleanup; + } + this.adapter.serverCount().then((serverCount) => { expectedServerCount = serverCount; checkCompleteness(); diff --git a/packages/socket.io/test/socket-timeout.ts b/packages/socket.io/test/socket-timeout.ts index 62f36807b8..92a26aeda6 100644 --- a/packages/socket.io/test/socket-timeout.ts +++ b/packages/socket.io/test/socket-timeout.ts @@ -84,4 +84,75 @@ describe("timeout", () => { success(done, io, client); }); }); + + it("should cleanup pending acks on broadcast timeout (memory leak fix)", (done) => { + const io = new Server(0); + const client = createClient(io, "/"); + + // Client does not acknowledge the event (simulates timeout scenario) + client.on("test-event", () => { + // intentionally not calling the callback + }); + + io.on("connection", async (socket) => { + socket.join("test-room"); + + // Get initial acks count (cast to any to access private property in test) + const initialAcksSize = (socket as any).acks.size; + + try { + await io.timeout(50).to("test-room").emitWithAck("test-event", "data"); + expect().fail("should have timed out"); + } catch (err) { + expect(err).to.be.an(Error); + + // After timeout, acks should be cleaned up (no memory leak) + // Wait a bit for cleanup to complete + setTimeout(() => { + expect((socket as any).acks.size).to.be(initialAcksSize); + success(done, io, client); + }, 10); + } + }); + }); + + it("should cleanup pending acks on broadcast timeout with multiple clients", (done) => { + const io = new Server(0); + const client1 = createClient(io, "/"); + const client2 = createClient(io, "/"); + + let connectedSockets: any[] = []; + + // Clients do not acknowledge + client1.on("test-event", () => {}); + client2.on("test-event", () => {}); + + io.on("connection", (socket) => { + socket.join("test-room"); + connectedSockets.push(socket); + + if (connectedSockets.length === 2) { + runTest(); + } + }); + + async function runTest() { + const initialAcksSizes = connectedSockets.map((s) => s.acks.size); + + try { + await io.timeout(50).to("test-room").emitWithAck("test-event", "data"); + expect().fail("should have timed out"); + } catch (err) { + expect(err).to.be.an(Error); + + setTimeout(() => { + // All sockets should have their acks cleaned up + connectedSockets.forEach((socket, i) => { + expect(socket.acks.size).to.be(initialAcksSizes[i]); + }); + success(done, io, client1, client2); + }, 10); + } + } + }); });