diff --git a/lib/internal/cluster/primary.js b/lib/internal/cluster/primary.js index af17d39de44964..6ab845c6d122aa 100644 --- a/lib/internal/cluster/primary.js +++ b/lib/internal/cluster/primary.js @@ -271,8 +271,12 @@ function queryServer(worker, message) { return; const key = `${message.address}:${message.port}:${message.addressType}:` + - `${message.fd}:${message.index}`; - let handle = handles.get(key); + `${message.fd}` + (message.port === 0 ? `:${message.index}` : ''); + const cachedHandle = handles.get(key); + let handle; + if (cachedHandle && !cachedHandle.has(worker)) { + handle = cachedHandle; + } if (handle === undefined) { let address = message.address; @@ -298,17 +302,22 @@ function queryServer(worker, message) { handle = new RoundRobinHandle(key, address, message); } - handles.set(key, handle); + if (!cachedHandle) { + handles.set(key, handle); + } } handle.data ||= message.data; // Set custom server data - handle.add(worker, (errno, reply, handle) => { + handle.add(worker, (errno, reply, serverHandle) => { + if (!errno) { + handles.set(key, handle); // Update in case it was replaced. + } const { data } = handles.get(key); - - if (errno) - handles.delete(key); // Gives other workers a chance to retry. + if (!cachedHandle && errno) { + handles.delete(key); + } send(worker, { errno, @@ -316,7 +325,7 @@ function queryServer(worker, message) { ack: message.seq, data, ...reply, - }, handle); + }, serverHandle); }); } diff --git a/lib/internal/cluster/round_robin_handle.js b/lib/internal/cluster/round_robin_handle.js index 7ba28d7f4e6a51..c2453adc8067f9 100644 --- a/lib/internal/cluster/round_robin_handle.js +++ b/lib/internal/cluster/round_robin_handle.js @@ -137,3 +137,7 @@ RoundRobinHandle.prototype.handoff = function(worker) { this.handoff(worker); }); }; + +RoundRobinHandle.prototype.has = function(worker) { + return this.all.has(worker.id); +}; diff --git a/lib/internal/cluster/shared_handle.js b/lib/internal/cluster/shared_handle.js index 88e981fdf07038..b6b9ee1ffb1af5 100644 --- a/lib/internal/cluster/shared_handle.js +++ b/lib/internal/cluster/shared_handle.js @@ -47,3 +47,7 @@ SharedHandle.prototype.remove = function(worker) { this.handle = null; return true; }; + +SharedHandle.prototype.has = function(worker) { + return this.workers.has(worker.id); +}; diff --git a/test/sequential/test-cluster-port-reuse-between-workers.js b/test/sequential/test-cluster-port-reuse-between-workers.js new file mode 100644 index 00000000000000..77d6902964e76a --- /dev/null +++ b/test/sequential/test-cluster-port-reuse-between-workers.js @@ -0,0 +1,93 @@ +'use strict'; + +const common = require('../common'); +const cluster = require('cluster'); +const assert = require('assert'); + +const acts = { + WORKER1_SERVER1_CLOSED: { cmd: 'WORKER1_SERVER1_CLOSED' }, + WORKER2_SERVER1_STARTED: { cmd: 'WORKER2_SERVER1_STARTED' }, + WORKER1_SERVER2_CLOSED: { cmd: 'WORKER1_SERVER2_CLOSED' }, +}; + +if (cluster.isMaster) { + const currentHost = '::'; + const worker1 = cluster.fork({ + WORKER_ID: 'worker1', + HOST: currentHost, + }); + let worker2; + worker1.on('error', common.mustNotCall()); + worker1.on('message', onMessage); + + function createWorker2() { + worker2 = cluster.fork({ + WORKER_ID: 'worker2', + HOST: currentHost, + }); + worker2.on('error', common.mustNotCall()); + worker2.on('message', onMessage); + } + + function onMessage(msg) { + switch (msg.cmd) { + case acts.WORKER1_SERVER1_CLOSED.cmd: + createWorker2(); + break; + case acts.WORKER2_SERVER1_STARTED.cmd: + worker1.send(acts.WORKER2_SERVER1_STARTED); + break; + case acts.WORKER1_SERVER2_CLOSED.cmd: + worker1.kill(); + worker2.kill(); + break; + default: + assert.fail(`Unexpected message ${msg.cmd}`); + } + } +} else { + const WORKER_ID = process.env.WORKER_ID; + function createServer() { + return new Promise((resolve, reject) => { + const net = require('net'); + const PORT = 8000; + const server = net + .createServer((socket) => { + socket.end( + `Handled by worker ${process.env.WORKER_ID} (${process.pid})\n` + ); + }) + .on('error', (e) => { + reject(e); + }); + + server.listen( + { + port: PORT, + host: process.env.HOST, + }, + () => resolve(server) + ); + }); + } + (async () => { + const server1 = await createServer(); + if (WORKER_ID === 'worker2') { + process.send(acts.WORKER2_SERVER1_STARTED); + } else { + await createServer().catch(common.mustCall()); + await new Promise((r) => server1.close(r)); + process.send(acts.WORKER1_SERVER1_CLOSED); + + process.on('message', async (msg) => { + if (msg.cmd === acts.WORKER2_SERVER1_STARTED.cmd) { + const server2 = await createServer(); + await new Promise((r) => server2.close(r)); + process.send(acts.WORKER1_SERVER2_CLOSED); + } else { + assert.fail(`Unexpected message ${msg.cmd}`); + } + }); + } + })().then(common.mustCall()); +}