diff --git a/src/api/integrations/event/rabbitmq/rabbitmq.controller.ts b/src/api/integrations/event/rabbitmq/rabbitmq.controller.ts index 3295b12de..60563c75b 100644 --- a/src/api/integrations/event/rabbitmq/rabbitmq.controller.ts +++ b/src/api/integrations/event/rabbitmq/rabbitmq.controller.ts @@ -9,11 +9,13 @@ import { EmitData, EventController, EventControllerInterface } from '../event.co export class RabbitmqController extends EventController implements EventControllerInterface { public amqpChannel: amqp.Channel | null = null; private amqpConnection: amqp.Connection | null = null; + private reconnectTimer: NodeJS.Timeout | null = null; + private reconnectAttempt: number = 0; + private maxReconnectDelay: number = 300000; // 5 minutos máximo + private baseReconnectDelay: number = 1000; // 1 segundo inicial + private isReconnecting: boolean = false; + private isShuttingDown: boolean = false; private readonly logger = new Logger('RabbitmqController'); - private reconnectAttempts = 0; - private maxReconnectAttempts = 10; - private reconnectDelay = 5000; // 5 seconds - private isReconnecting = false; constructor(prismaRepository: PrismaRepository, waMonitor: WAMonitoringService) { super(prismaRepository, waMonitor, configService.get('RABBITMQ')?.ENABLED, 'rabbitmq'); @@ -24,15 +26,24 @@ export class RabbitmqController extends EventController implements EventControll return; } - await this.connect(); + return this.connect(); } private async connect(): Promise { - return new Promise((resolve, reject) => { + if (this.isReconnecting) { + return; + } + + this.isReconnecting = true; + + try { const uri = configService.get('RABBITMQ').URI; const frameMax = configService.get('RABBITMQ').FRAME_MAX; + const heartbeat = configService.get('RABBITMQ').HEARTBEAT; const rabbitmqExchangeName = configService.get('RABBITMQ').EXCHANGE_NAME; + this.logger.info(`Tentativa de conexão RabbitMQ #${this.reconnectAttempt + 1}...`); + const url = new URL(uri); const connectionOptions = { protocol: url.protocol.slice(0, -1), @@ -42,142 +53,198 @@ export class RabbitmqController extends EventController implements EventControll password: url.password || 'guest', vhost: url.pathname.slice(1) || '/', frameMax: frameMax, - heartbeat: 30, // Add heartbeat of 30 seconds + heartbeat: heartbeat, + reconnect: true, + reconnectBackoffStrategy: 'linear', + reconnectExponentialLimit: 120000, + reconnectTimeInSeconds: 5, }; - amqp.connect(connectionOptions, (error: Error, connection: amqp.Connection) => { - if (error) { - this.logger.error({ - local: 'RabbitmqController.connect', - message: 'Failed to connect to RabbitMQ', - error: error.message || error, + await new Promise((resolve, reject) => { + amqp.connect(connectionOptions, (error, connection) => { + if (error) { + this.logger.error(`Failed to connect to RabbitMQ (attempt #${this.reconnectAttempt + 1}): ${error.message}`); + reject(error); + return; + } + + this.amqpConnection = connection; + this.reconnectAttempt = 0; // Reset counter on successful connection + this.isReconnecting = false; + + connection.on('error', (err) => { + this.logger.error(`RabbitMQ connection error: ${err.message}`); + this.handleConnectionError(); }); - reject(error); - return; - } - // Connection event handlers - connection.on('error', (err: Error) => { - this.logger.error({ - local: 'RabbitmqController.connectionError', - message: 'RabbitMQ connection error', - error: err.message || err, + connection.on('close', () => { + this.logger.warn('RabbitMQ connection closed, attempting reconnection...'); + this.amqpConnection = null; + this.amqpChannel = null; + this.scheduleReconnect(); }); - this.handleConnectionLoss(); - }); - connection.on('close', () => { - this.logger.warn('RabbitMQ connection closed'); - this.handleConnectionLoss(); - }); + connection.createChannel((channelError, channel) => { + if (channelError) { + this.logger.error(`Failed to create RabbitMQ channel: ${channelError.message}`); + reject(channelError); + return; + } - connection.createChannel((channelError: Error, channel: amqp.Channel) => { - if (channelError) { - this.logger.error({ - local: 'RabbitmqController.createChannel', - message: 'Failed to create RabbitMQ channel', - error: channelError.message || channelError, + channel.on('error', (err) => { + this.logger.error(`RabbitMQ channel error: ${err.message}`); + this.handleChannelError(); }); - reject(channelError); - return; - } - // Channel event handlers - channel.on('error', (err: Error) => { - this.logger.error({ - local: 'RabbitmqController.channelError', - message: 'RabbitMQ channel error', - error: err.message || err, + channel.on('close', () => { + this.logger.warn('RabbitMQ channel closed'); + this.amqpChannel = null; }); - this.handleConnectionLoss(); - }); - channel.on('close', () => { - this.logger.warn('RabbitMQ channel closed'); - this.handleConnectionLoss(); - }); + const exchangeName = rabbitmqExchangeName; - const exchangeName = rabbitmqExchangeName; - - channel.assertExchange(exchangeName, 'topic', { - durable: true, - autoDelete: false, - }); + channel.assertExchange(exchangeName, 'topic', { + durable: true, + autoDelete: false, + }); - this.amqpConnection = connection; - this.amqpChannel = channel; - this.reconnectAttempts = 0; // Reset reconnect attempts on successful connection - this.isReconnecting = false; + this.amqpChannel = channel; - this.logger.info('AMQP initialized successfully'); + this.logger.info(`✅ RabbitMQ conectado com sucesso após ${this.reconnectAttempt > 0 ? this.reconnectAttempt + ' tentativas' : '1 tentativa'}`); - resolve(); - }); - }); - }) - .then(() => { - if (configService.get('RABBITMQ')?.GLOBAL_ENABLED) { - this.initGlobalQueues(); - } - }) - .catch((error) => { - this.logger.error({ - local: 'RabbitmqController.init', - message: 'Failed to initialize AMQP', - error: error.message || error, + resolve(); + }); }); - this.scheduleReconnect(); - throw error; }); - } - private handleConnectionLoss(): void { - if (this.isReconnecting) { - return; // Already attempting to reconnect + if (configService.get('RABBITMQ')?.GLOBAL_ENABLED) { + await this.initGlobalQueues(); + } + } catch (error) { + this.isReconnecting = false; + this.logger.error(`Error initializing RabbitMQ (attempt #${this.reconnectAttempt + 1}): ${error.message}`); + this.scheduleReconnect(); } + } - this.cleanup(); + private handleConnectionError(): void { + this.amqpConnection = null; + this.amqpChannel = null; this.scheduleReconnect(); } + private handleChannelError(): void { + this.amqpChannel = null; + + if (this.amqpConnection) { + this.amqpConnection.createChannel((channelError, channel) => { + if (channelError) { + this.logger.error(`Failed to recreate channel: ${channelError.message}`); + return; + } + + channel.on('error', (err) => { + this.logger.error(`RabbitMQ channel error: ${err.message}`); + this.handleChannelError(); + }); + + channel.on('close', () => { + this.logger.warn('RabbitMQ channel closed'); + this.amqpChannel = null; + }); + + this.amqpChannel = channel; + this.logger.info('RabbitMQ channel recreated successfully'); + }); + } + } + private scheduleReconnect(): void { - if (this.reconnectAttempts >= this.maxReconnectAttempts) { - this.logger.error( - `Maximum reconnect attempts (${this.maxReconnectAttempts}) reached. Stopping reconnection attempts.`, - ); + // Se está em processo de shutdown, não tentar reconectar + if (this.isShuttingDown) { + this.logger.info('🛑 Sistema em shutdown, parando tentativas de reconexão'); return; } - if (this.isReconnecting) { - return; // Already scheduled + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer); } - this.isReconnecting = true; - this.reconnectAttempts++; + this.reconnectAttempt++; + + // Backoff exponencial: 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s, até max 5min + const delay = Math.min( + this.baseReconnectDelay * Math.pow(2, this.reconnectAttempt - 1), + this.maxReconnectDelay + ); - const delay = this.reconnectDelay * Math.pow(2, Math.min(this.reconnectAttempts - 1, 5)); // Exponential backoff with max delay + this.logger.info(`⏰ Agendando reconexão RabbitMQ em ${delay/1000}s (tentativa #${this.reconnectAttempt})...`); - this.logger.info( - `Scheduling RabbitMQ reconnection attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay}ms`, - ); + this.reconnectTimer = setTimeout(() => { + // Verificar novamente se não está em shutdown + if (this.isShuttingDown) { + this.logger.info('🛑 Sistema em shutdown durante reconnect timer'); + return; + } - setTimeout(async () => { + this.logger.info(`🔄 Tentando reconectar ao RabbitMQ (tentativa #${this.reconnectAttempt})...`); + this.connect().catch((error) => { + if (this.isShuttingDown) { + this.logger.info('🛑 Sistema em shutdown, interrompendo reconexão'); + return; + } + + this.logger.error(`❌ Falha na reconexão #${this.reconnectAttempt}: ${error.message}`); + // NUNCA desistir - sempre tentar novamente (exceto se em shutdown) + this.logger.info(`💪 NUNCA desisto! Reagendando próxima tentativa...`); + this.scheduleReconnect(); + }); + }, delay); + } + + public async shutdown(): Promise { + this.logger.info('🛑 Iniciando shutdown do RabbitMQ Controller...'); + + // Marcar como em processo de shutdown para parar tentativas de reconexão + this.isShuttingDown = true; + + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer); + this.reconnectTimer = null; + this.logger.info('⏹️ Timer de reconexão cancelado'); + } + + if (this.amqpChannel) { try { - this.logger.info( - `Attempting to reconnect to RabbitMQ (attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts})`, - ); - await this.connect(); - this.logger.info('Successfully reconnected to RabbitMQ'); + await new Promise((resolve, reject) => { + this.amqpChannel.close((err) => { + if (err) reject(err); + else resolve(); + }); + }); + this.amqpChannel = null; + this.logger.info('✅ RabbitMQ channel fechado graciosamente'); } catch (error) { - this.logger.error({ - local: 'RabbitmqController.scheduleReconnect', - message: `Reconnection attempt ${this.reconnectAttempts} failed`, - error: error.message || error, + this.logger.error(`❌ Erro ao fechar RabbitMQ channel: ${error.message}`); + } + } + + if (this.amqpConnection) { + try { + await new Promise((resolve, reject) => { + this.amqpConnection.close((err) => { + if (err) reject(err); + else resolve(); + }); }); - this.isReconnecting = false; - this.scheduleReconnect(); + this.amqpConnection = null; + this.logger.info('✅ RabbitMQ connection fechada graciosamente'); + } catch (error) { + this.logger.error(`❌ Erro ao fechar RabbitMQ connection: ${error.message}`); } - }, delay); + } + + this.logger.info('✅ Shutdown do RabbitMQ Controller concluído'); } private set channel(channel: amqp.Channel) { @@ -188,17 +255,6 @@ export class RabbitmqController extends EventController implements EventControll return this.amqpChannel; } - private async ensureConnection(): Promise { - if (!this.amqpChannel) { - this.logger.warn('AMQP channel is not available, attempting to reconnect...'); - if (!this.isReconnecting) { - this.scheduleReconnect(); - } - return false; - } - return true; - } - public async emit({ instanceName, origin, @@ -218,11 +274,6 @@ export class RabbitmqController extends EventController implements EventControll return; } - if (!(await this.ensureConnection())) { - this.logger.warn(`Failed to emit event ${event} for instance ${instanceName}: No AMQP connection`); - return; - } - const instanceRabbitmq = await this.get(instanceName); const rabbitmqLocal = instanceRabbitmq?.events; const rabbitmqGlobal = configService.get('RABBITMQ').GLOBAL_ENABLED; @@ -250,6 +301,11 @@ export class RabbitmqController extends EventController implements EventControll while (retry < 3) { try { + if (!this.amqpChannel) { + this.logger.warn('RabbitMQ channel not available, skipping message'); + break; + } + await this.amqpChannel.assertExchange(exchangeName, 'topic', { durable: true, autoDelete: false, @@ -282,14 +338,16 @@ export class RabbitmqController extends EventController implements EventControll break; } catch (error) { - this.logger.error({ - local: 'RabbitmqController.emit', - message: `Error publishing local RabbitMQ message (attempt ${retry + 1}/3)`, - error: error.message || error, - }); retry++; + this.logger.error(`RabbitMQ publish attempt ${retry} failed: ${error.message}`); + if (retry >= 3) { - this.handleConnectionLoss(); + this.logger.error('Max retry attempts reached for RabbitMQ publish'); + if (!this.amqpChannel) { + this.scheduleReconnect(); + } + } else { + await new Promise((resolve) => setTimeout(resolve, 1000 * retry)); } } } @@ -303,6 +361,11 @@ export class RabbitmqController extends EventController implements EventControll while (retry < 3) { try { + if (!this.amqpChannel) { + this.logger.warn('RabbitMQ channel not available, skipping global message'); + break; + } + await this.amqpChannel.assertExchange(exchangeName, 'topic', { durable: true, autoDelete: false, @@ -335,14 +398,16 @@ export class RabbitmqController extends EventController implements EventControll break; } catch (error) { - this.logger.error({ - local: 'RabbitmqController.emit', - message: `Error publishing global RabbitMQ message (attempt ${retry + 1}/3)`, - error: error.message || error, - }); retry++; + this.logger.error(`RabbitMQ global publish attempt ${retry} failed: ${error.message}`); + if (retry >= 3) { - this.handleConnectionLoss(); + this.logger.error('Max retry attempts reached for RabbitMQ global publish'); + if (!this.amqpChannel) { + this.scheduleReconnect(); + } + } else { + await new Promise((resolve) => setTimeout(resolve, 1000 * retry)); } } } @@ -352,11 +417,6 @@ export class RabbitmqController extends EventController implements EventControll private async initGlobalQueues(): Promise { this.logger.info('Initializing global queues'); - if (!(await this.ensureConnection())) { - this.logger.error('Cannot initialize global queues: No AMQP connection'); - return; - } - const rabbitmqExchangeName = configService.get('RABBITMQ').EXCHANGE_NAME; const events = configService.get('RABBITMQ').EVENTS; const prefixKey = configService.get('RABBITMQ').PREFIX_KEY; @@ -366,6 +426,11 @@ export class RabbitmqController extends EventController implements EventControll return; } + if (!this.amqpChannel) { + this.logger.error('Cannot initialize global queues: RabbitMQ channel not available'); + return; + } + const eventKeys = Object.keys(events); for (const event of eventKeys) { @@ -393,37 +458,12 @@ export class RabbitmqController extends EventController implements EventControll await this.amqpChannel.bindQueue(queueName, exchangeName, event); - this.logger.info(`Global queue initialized: ${queueName}`); + this.logger.debug(`Global queue ${queueName} initialized successfully`); } catch (error) { - this.logger.error({ - local: 'RabbitmqController.initGlobalQueues', - message: `Failed to initialize global queue for event ${event}`, - error: error.message || error, - }); - this.handleConnectionLoss(); - break; + this.logger.error(`Failed to initialize global queue for event ${event}: ${error.message}`); } } - } - public async cleanup(): Promise { - try { - if (this.amqpChannel) { - await this.amqpChannel.close(); - this.amqpChannel = null; - } - if (this.amqpConnection) { - await this.amqpConnection.close(); - this.amqpConnection = null; - } - } catch (error) { - this.logger.warn({ - local: 'RabbitmqController.cleanup', - message: 'Error during cleanup', - error: error.message || error, - }); - this.amqpChannel = null; - this.amqpConnection = null; - } + this.logger.info('Global queues initialization completed'); } } diff --git a/src/config/env.config.ts b/src/config/env.config.ts index c59acd382..616649458 100644 --- a/src/config/env.config.ts +++ b/src/config/env.config.ts @@ -96,6 +96,7 @@ export type Rabbitmq = { ENABLED: boolean; URI: string; FRAME_MAX: number; + HEARTBEAT: number; EXCHANGE_NAME: string; GLOBAL_ENABLED: boolean; EVENTS: EventsRabbitmq; @@ -395,6 +396,7 @@ export class ConfigService { EXCHANGE_NAME: process.env?.RABBITMQ_EXCHANGE_NAME || 'evolution_exchange', URI: process.env.RABBITMQ_URI || '', FRAME_MAX: Number.parseInt(process.env.RABBITMQ_FRAME_MAX) || 8192, + HEARTBEAT: Number.parseInt(process.env.RABBITMQ_HEARTBEAT) || 60, EVENTS: { APPLICATION_STARTUP: process.env?.RABBITMQ_EVENTS_APPLICATION_STARTUP === 'true', INSTANCE_CREATE: process.env?.RABBITMQ_EVENTS_INSTANCE_CREATE === 'true',