From a9762cbf0cefe2c292b2cd6fa41ffc9e9314a85c Mon Sep 17 00:00:00 2001 From: luissantosjs Date: Tue, 2 Sep 2025 20:22:58 +0100 Subject: [PATCH] feat: improve RabbitMQ reliability with unlimited retries and publisher confirms --- .../event/rabbitmq/rabbitmq.controller.ts | 429 +++++++++++------- 1 file changed, 256 insertions(+), 173 deletions(-) diff --git a/src/api/integrations/event/rabbitmq/rabbitmq.controller.ts b/src/api/integrations/event/rabbitmq/rabbitmq.controller.ts index 3295b12de..d5be43248 100644 --- a/src/api/integrations/event/rabbitmq/rabbitmq.controller.ts +++ b/src/api/integrations/event/rabbitmq/rabbitmq.controller.ts @@ -9,11 +9,13 @@ import { EmitData, EventController, EventControllerInterface } from '../event.co export class RabbitmqController extends EventController implements EventControllerInterface { public amqpChannel: amqp.Channel | null = null; private amqpConnection: amqp.Connection | null = null; + private reconnectTimer: NodeJS.Timeout | null = null; + private reconnectAttempt: number = 0; + private maxReconnectDelay: number = 300000; // 5 minutos máximo + private baseReconnectDelay: number = 1000; // 1 segundo inicial + private isReconnecting: boolean = false; + private isShuttingDown: boolean = false; private readonly logger = new Logger('RabbitmqController'); - private reconnectAttempts = 0; - private maxReconnectAttempts = 10; - private reconnectDelay = 5000; // 5 seconds - private isReconnecting = false; constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) { super(prismaRepository, waMonitor, configService.get('RABBITMQ')?.ENABLED, 'rabbitmq'); @@ -24,15 +26,23 @@ export class RabbitmqController extends EventController implements EventControll return; } - await this.connect(); + return this.connect(); } private async connect(): Promise { - return new Promise((resolve, reject) => { + if (this.isReconnecting) { + return; + } + + this.isReconnecting = true; + + try { const uri = configService.get('RABBITMQ').URI; const frameMax = configService.get('RABBITMQ').FRAME_MAX; const rabbitmqExchangeName = configService.get('RABBITMQ').EXCHANGE_NAME; + this.logger.info(`Tentativa de conexão RabbitMQ #${this.reconnectAttempt + 1}...`); + const url = new URL(uri); const connectionOptions = { protocol: url.protocol.slice(0, -1), @@ -42,142 +52,204 @@ export class RabbitmqController extends EventController implements EventControll password: url.password || 'guest', vhost: url.pathname.slice(1) || '/', frameMax: frameMax, - heartbeat: 30, // Add heartbeat of 30 seconds + reconnect: true, + reconnectBackoffStrategy: 'linear', + reconnectExponentialLimit: 120000, + reconnectTimeInSeconds: 5, }; + - amqp.connect(connectionOptions, (error: Error, connection: amqp.Connection) => { - if (error) { - this.logger.error({ - local: 'RabbitmqController.connect', - message: 'Failed to connect to RabbitMQ', - error: error.message || error, + await new Promise((resolve, reject) => { + amqp.connect(connectionOptions, (error, connection) => { + if (error) { + this.logger.error(`Failed to connect to RabbitMQ (attempt #${this.reconnectAttempt + 1}): ${error.message}`); + reject(error); + return; + } + + this.amqpConnection = connection; + this.reconnectAttempt = 0; // Reset counter on successful connection + this.isReconnecting = false; + + connection.on('error', (err) => { + this.logger.error(`RabbitMQ connection error: ${err.message}`); + this.handleConnectionError(); }); - reject(error); - return; - } - // Connection event handlers - connection.on('error', (err: Error) => { - this.logger.error({ - local: 'RabbitmqController.connectionError', - message: 'RabbitMQ connection error', - error: err.message || err, + connection.on('close', () => { + this.logger.warn('RabbitMQ connection closed, attempting reconnection...'); + this.amqpConnection = null; + this.amqpChannel = null; + this.scheduleReconnect(); }); - this.handleConnectionLoss(); - }); - connection.on('close', () => { - this.logger.warn('RabbitMQ connection closed'); - this.handleConnectionLoss(); - }); + connection.createChannel((channelError, channel) => { + if (channelError) { + this.logger.error(`Failed to create RabbitMQ channel: ${channelError.message}`); + reject(channelError); + return; + } - connection.createChannel((channelError: Error, channel: amqp.Channel) => { - if (channelError) { - this.logger.error({ - local: 'RabbitmqController.createChannel', - message: 'Failed to create RabbitMQ channel', - error: channelError.message || channelError, + channel.on('error', (err) => { + this.logger.error(`RabbitMQ channel error: ${err.message}`); + this.handleChannelError(); }); - reject(channelError); - return; - } - // Channel event handlers - channel.on('error', (err: Error) => { - this.logger.error({ - local: 'RabbitmqController.channelError', - message: 'RabbitMQ channel error', - error: err.message || err, + channel.on('close', () => { + this.logger.warn('RabbitMQ channel closed'); + this.amqpChannel = null; }); - this.handleConnectionLoss(); - }); - channel.on('close', () => { - this.logger.warn('RabbitMQ channel closed'); - this.handleConnectionLoss(); - }); + const exchangeName = rabbitmqExchangeName; - const exchangeName = rabbitmqExchangeName; + channel.assertExchange(exchangeName, 'topic', { + durable: true, + autoDelete: false, + }); - channel.assertExchange(exchangeName, 'topic', { - durable: true, - autoDelete: false, - }); + // Enable publisher confirms + channel.confirmSelect(); - this.amqpConnection = connection; - this.amqpChannel = channel; - this.reconnectAttempts = 0; // Reset reconnect attempts on successful connection - this.isReconnecting = false; + this.amqpChannel = channel; - this.logger.info('AMQP initialized successfully'); + this.logger.info(`✅ RabbitMQ conectado com sucesso após ${this.reconnectAttempt > 0 ? this.reconnectAttempt + ' tentativas' : '1 tentativa'}`); - resolve(); - }); - }); - }) - .then(() => { - if (configService.get('RABBITMQ')?.GLOBAL_ENABLED) { - this.initGlobalQueues(); - } - }) - .catch((error) => { - this.logger.error({ - local: 'RabbitmqController.init', - message: 'Failed to initialize AMQP', - error: error.message || error, + resolve(); + }); }); - this.scheduleReconnect(); - throw error; }); - } - private handleConnectionLoss(): void { - if (this.isReconnecting) { - return; // Already attempting to reconnect + if (configService.get('RABBITMQ')?.GLOBAL_ENABLED) { + await this.initGlobalQueues(); + } + } catch (error) { + this.isReconnecting = false; + this.logger.error(`Error initializing RabbitMQ (attempt #${this.reconnectAttempt + 1}): ${error.message}`); + this.scheduleReconnect(); } + } - this.cleanup(); + private handleConnectionError(): void { + this.amqpConnection = null; + this.amqpChannel = null; this.scheduleReconnect(); } + private handleChannelError(): void { + this.amqpChannel = null; + + if (this.amqpConnection) { + this.amqpConnection.createChannel((channelError, channel) => { + if (channelError) { + this.logger.error(`Failed to recreate channel: ${channelError.message}`); + return; + } + + channel.on('error', (err) => { + this.logger.error(`RabbitMQ channel error: ${err.message}`); + this.handleChannelError(); + }); + + channel.on('close', () => { + this.logger.warn('RabbitMQ channel closed'); + this.amqpChannel = null; + }); + + // Enable publisher confirms on recreated channel + channel.confirmSelect(); + + this.amqpChannel = channel; + this.logger.info('RabbitMQ channel recreated successfully'); + }); + } + } + private scheduleReconnect(): void { - if (this.reconnectAttempts >= this.maxReconnectAttempts) { - this.logger.error( - `Maximum reconnect attempts (${this.maxReconnectAttempts}) reached. Stopping reconnection attempts.`, - ); + // Se está em processo de shutdown, não tentar reconectar + if (this.isShuttingDown) { + this.logger.info('🛑 Sistema em shutdown, parando tentativas de reconexão'); return; } - if (this.isReconnecting) { - return; // Already scheduled + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer); } - this.isReconnecting = true; - this.reconnectAttempts++; + this.reconnectAttempt++; + + // Backoff exponencial: 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s, até max 5min + const delay = Math.min( + this.baseReconnectDelay * Math.pow(2, this.reconnectAttempt - 1), + this.maxReconnectDelay + ); - const delay = this.reconnectDelay * Math.pow(2, Math.min(this.reconnectAttempts - 1, 5)); // Exponential backoff with max delay + this.logger.info(`⏰ Agendando reconexão RabbitMQ em ${delay/1000}s (tentativa #${this.reconnectAttempt})...`); - this.logger.info( - `Scheduling RabbitMQ reconnection attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay}ms`, - ); + this.reconnectTimer = setTimeout(() => { + // Verificar novamente se não está em shutdown + if (this.isShuttingDown) { + this.logger.info('🛑 Sistema em shutdown durante reconnect timer'); + return; + } - setTimeout(async () => { + this.logger.info(`🔄 Tentando reconectar ao RabbitMQ (tentativa #${this.reconnectAttempt})...`); + this.connect().catch((error) => { + if (this.isShuttingDown) { + this.logger.info('🛑 Sistema em shutdown, interrompendo reconexão'); + return; + } + + this.logger.error(`❌ Falha na reconexão #${this.reconnectAttempt}: ${error.message}`); + // NUNCA desistir - sempre tentar novamente (exceto se em shutdown) + this.logger.info(`💪 NUNCA desisto! Reagendando próxima tentativa...`); + this.scheduleReconnect(); + }); + }, delay); + } + + public async shutdown(): Promise { + this.logger.info('🛑 Iniciando shutdown do RabbitMQ Controller...'); + + // Marcar como em processo de shutdown para parar tentativas de reconexão + this.isShuttingDown = true; + + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer); + this.reconnectTimer = null; + this.logger.info('⏹️ Timer de reconexão cancelado'); + } + + if (this.amqpChannel) { try { - this.logger.info( - `Attempting to reconnect to RabbitMQ (attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts})`, - ); - await this.connect(); - this.logger.info('Successfully reconnected to RabbitMQ'); + await new Promise((resolve, reject) => { + this.amqpChannel.close((err) => { + if (err) reject(err); + else resolve(); + }); + }); + this.amqpChannel = null; + this.logger.info('✅ RabbitMQ channel fechado graciosamente'); } catch (error) { - this.logger.error({ - local: 'RabbitmqController.scheduleReconnect', - message: `Reconnection attempt ${this.reconnectAttempts} failed`, - error: error.message || error, + this.logger.error(`❌ Erro ao fechar RabbitMQ channel: ${error.message}`); + } + } + + if (this.amqpConnection) { + try { + await new Promise((resolve, reject) => { + this.amqpConnection.close((err) => { + if (err) reject(err); + else resolve(); + }); }); - this.isReconnecting = false; - this.scheduleReconnect(); + this.amqpConnection = null; + this.logger.info('✅ RabbitMQ connection fechada graciosamente'); + } catch (error) { + this.logger.error(`❌ Erro ao fechar RabbitMQ connection: ${error.message}`); } - }, delay); + } + + this.logger.info('✅ Shutdown do RabbitMQ Controller concluído'); } private set channel(channel: amqp.Channel) { @@ -188,17 +260,6 @@ export class RabbitmqController extends EventController implements EventControll return this.amqpChannel; } - private async ensureConnection(): Promise { - if (!this.amqpChannel) { - this.logger.warn('AMQP channel is not available, attempting to reconnect...'); - if (!this.isReconnecting) { - this.scheduleReconnect(); - } - return false; - } - return true; - } - public async emit({ instanceName, origin, @@ -218,11 +279,6 @@ export class RabbitmqController extends EventController implements EventControll return; } - if (!(await this.ensureConnection())) { - this.logger.warn(`Failed to emit event ${event} for instance ${instanceName}: No AMQP connection`); - return; - } - const instanceRabbitmq = await this.get(instanceName); const rabbitmqLocal = instanceRabbitmq?.events; const rabbitmqGlobal = configService.get('RABBITMQ').GLOBAL_ENABLED; @@ -248,8 +304,14 @@ export class RabbitmqController extends EventController implements EventControll let retry = 0; - while (retry < 3) { + while (true) { try { + if (!this.amqpChannel) { + this.logger.warn('RabbitMQ channel not available, waiting for reconnection...'); + await new Promise((resolve) => setTimeout(resolve, 1000)); + continue; + } + await this.amqpChannel.assertExchange(exchangeName, 'topic', { durable: true, autoDelete: false, @@ -269,7 +331,28 @@ export class RabbitmqController extends EventController implements EventControll await this.amqpChannel.bindQueue(queueName, exchangeName, eventName); - await this.amqpChannel.publish(exchangeName, event, Buffer.from(JSON.stringify(message))); + // Publish with confirmation + const published = this.amqpChannel.publish( + exchangeName, + event, + Buffer.from(JSON.stringify(message)), + { persistent: true } + ); + + if (!published) { + throw new Error('Message could not be published (buffer full)'); + } + + // Wait for confirmation + await new Promise((resolve, reject) => { + this.amqpChannel.waitForConfirms((err) => { + if (err) { + reject(new Error(`Message confirmation failed: ${err.message}`)); + } else { + resolve(); + } + }); + }); if (logEnabled) { const logData = { @@ -282,15 +365,14 @@ export class RabbitmqController extends EventController implements EventControll break; } catch (error) { - this.logger.error({ - local: 'RabbitmqController.emit', - message: `Error publishing local RabbitMQ message (attempt ${retry + 1}/3)`, - error: error.message || error, - }); retry++; - if (retry >= 3) { - this.handleConnectionLoss(); + this.logger.error(`RabbitMQ publish attempt ${retry} failed: ${error.message}`); + + if (!this.amqpChannel) { + this.scheduleReconnect(); } + + await new Promise((resolve) => setTimeout(resolve, Math.min(1000 * retry, 30000))); } } } @@ -301,8 +383,14 @@ export class RabbitmqController extends EventController implements EventControll let retry = 0; - while (retry < 3) { + while (true) { try { + if (!this.amqpChannel) { + this.logger.warn('RabbitMQ channel not available, waiting for reconnection...'); + await new Promise((resolve) => setTimeout(resolve, 1000)); + continue; + } + await this.amqpChannel.assertExchange(exchangeName, 'topic', { durable: true, autoDelete: false, @@ -322,7 +410,28 @@ export class RabbitmqController extends EventController implements EventControll await this.amqpChannel.bindQueue(queueName, exchangeName, event); - await this.amqpChannel.publish(exchangeName, event, Buffer.from(JSON.stringify(message))); + // Publish with confirmation + const published = this.amqpChannel.publish( + exchangeName, + event, + Buffer.from(JSON.stringify(message)), + { persistent: true } + ); + + if (!published) { + throw new Error('Global message could not be published (buffer full)'); + } + + // Wait for confirmation + await new Promise((resolve, reject) => { + this.amqpChannel.waitForConfirms((err) => { + if (err) { + reject(new Error(`Global message confirmation failed: ${err.message}`)); + } else { + resolve(); + } + }); + }); if (logEnabled) { const logData = { @@ -335,15 +444,14 @@ export class RabbitmqController extends EventController implements EventControll break; } catch (error) { - this.logger.error({ - local: 'RabbitmqController.emit', - message: `Error publishing global RabbitMQ message (attempt ${retry + 1}/3)`, - error: error.message || error, - }); retry++; - if (retry >= 3) { - this.handleConnectionLoss(); + this.logger.error(`RabbitMQ global publish attempt ${retry} failed: ${error.message}`); + + if (!this.amqpChannel) { + this.scheduleReconnect(); } + + await new Promise((resolve) => setTimeout(resolve, Math.min(1000 * retry, 30000))); } } } @@ -352,11 +460,6 @@ export class RabbitmqController extends EventController implements EventControll private async initGlobalQueues(): Promise { this.logger.info('Initializing global queues'); - if (!(await this.ensureConnection())) { - this.logger.error('Cannot initialize global queues: No AMQP connection'); - return; - } - const rabbitmqExchangeName = configService.get('RABBITMQ').EXCHANGE_NAME; const events = configService.get('RABBITMQ').EVENTS; const prefixKey = configService.get('RABBITMQ').PREFIX_KEY; @@ -366,6 +469,11 @@ export class RabbitmqController extends EventController implements EventControll return; } + if (!this.amqpChannel) { + this.logger.error('Cannot initialize global queues: RabbitMQ channel not available'); + return; + } + const eventKeys = Object.keys(events); for (const event of eventKeys) { @@ -393,37 +501,12 @@ export class RabbitmqController extends EventController implements EventControll await this.amqpChannel.bindQueue(queueName, exchangeName, event); - this.logger.info(`Global queue initialized: ${queueName}`); + this.logger.debug(`Global queue ${queueName} initialized successfully`); } catch (error) { - this.logger.error({ - local: 'RabbitmqController.initGlobalQueues', - message: `Failed to initialize global queue for event ${event}`, - error: error.message || error, - }); - this.handleConnectionLoss(); - break; + this.logger.error(`Failed to initialize global queue for event ${event}: ${error.message}`); } } - } - public async cleanup(): Promise { - try { - if (this.amqpChannel) { - await this.amqpChannel.close(); - this.amqpChannel = null; - } - if (this.amqpConnection) { - await this.amqpConnection.close(); - this.amqpConnection = null; - } - } catch (error) { - this.logger.warn({ - local: 'RabbitmqController.cleanup', - message: 'Error during cleanup', - error: error.message || error, - }); - this.amqpChannel = null; - this.amqpConnection = null; - } + this.logger.info('Global queues initialization completed'); } -} +} \ No newline at end of file