@@ -13,6 +13,7 @@ import {
1313 Database ,
1414 HttpServer ,
1515 Log ,
16+ Rabbitmq ,
1617 Sqs ,
1718 Webhook ,
1819 Websocket ,
@@ -688,6 +689,9 @@ export class ChannelStartupService {
688689 const rabbitmqLocal = this . localRabbitmq . events ;
689690 const sqsLocal = this . localSqs . events ;
690691 const serverUrl = this . configService . get < HttpServer > ( 'SERVER' ) . URL ;
692+ const rabbitmqEnabled = this . configService . get < Rabbitmq > ( 'RABBITMQ' ) . ENABLED ;
693+ const rabbitmqGlobal = this . configService . get < Rabbitmq > ( 'RABBITMQ' ) . GLOBAL_ENABLED ;
694+ const rabbitmqEvents = this . configService . get < Rabbitmq > ( 'RABBITMQ' ) . EVENTS ;
691695 const we = event . replace ( / [ . - ] / gm, '_' ) . toUpperCase ( ) ;
692696 const transformedWe = we . replace ( / _ / gm, '-' ) . toLowerCase ( ) ;
693697 const tzoffset = new Date ( ) . getTimezoneOffset ( ) * 60000 ; //offset in milliseconds
@@ -698,67 +702,134 @@ export class ChannelStartupService {
698702 const tokenStore = await this . repository . auth . find ( this . instanceName ) ;
699703 const instanceApikey = tokenStore ?. apikey || 'Apikey not found' ;
700704
701- if ( this . localRabbitmq . enabled ) {
705+ if ( rabbitmqEnabled ) {
702706 const amqp = getAMQP ( ) ;
703-
704- if ( amqp ) {
707+ if ( this . localRabbitmq . enabled && amqp ) {
705708 if ( Array . isArray ( rabbitmqLocal ) && rabbitmqLocal . includes ( we ) ) {
706709 const exchangeName = this . instanceName ?? 'evolution_exchange' ;
707710
708- // await amqp.assertExchange(exchangeName, 'topic', {
709- // durable: true,
710- // autoDelete: false,
711- // });
711+ let retry = 0 ;
712712
713- await this . assertExchangeAsync ( amqp , exchangeName , 'topic' , {
714- durable : true ,
715- autoDelete : false ,
716- } ) ;
713+ while ( retry < 3 ) {
714+ try {
715+ await amqp . assertExchange ( exchangeName , 'topic' , {
716+ durable : true ,
717+ autoDelete : false ,
718+ } ) ;
717719
718- const queueName = `${ this . instanceName } .${ event } ` ;
720+ const queueName = `${ this . instanceName } .${ event } ` ;
719721
720- await amqp . assertQueue ( queueName , {
721- durable : true ,
722- autoDelete : false ,
723- arguments : {
724- 'x-queue-type' : 'quorum' ,
725- } ,
726- } ) ;
722+ await amqp . assertQueue ( queueName , {
723+ durable : true ,
724+ autoDelete : false ,
725+ arguments : {
726+ 'x-queue-type' : 'quorum' ,
727+ } ,
728+ } ) ;
727729
728- await amqp . bindQueue ( queueName , exchangeName , event ) ;
730+ await amqp . bindQueue ( queueName , exchangeName , event ) ;
729731
730- const message = {
731- event,
732- instance : this . instance . name ,
733- data,
734- server_url : serverUrl ,
735- date_time : now ,
736- sender : this . wuid ,
737- } ;
732+ const message = {
733+ event,
734+ instance : this . instance . name ,
735+ data,
736+ server_url : serverUrl ,
737+ date_time : now ,
738+ sender : this . wuid ,
739+ } ;
738740
739- if ( expose && instanceApikey ) {
740- message [ 'apikey' ] = instanceApikey ;
741+ if ( expose && instanceApikey ) {
742+ message [ 'apikey' ] = instanceApikey ;
743+ }
744+
745+ await amqp . publish ( exchangeName , event , Buffer . from ( JSON . stringify ( message ) ) ) ;
746+
747+ if ( this . configService . get < Log > ( 'LOG' ) . LEVEL . includes ( 'WEBHOOKS' ) ) {
748+ const logData = {
749+ local : ChannelStartupService . name + '.sendData-RabbitMQ' ,
750+ event,
751+ instance : this . instance . name ,
752+ data,
753+ server_url : serverUrl ,
754+ apikey : ( expose && instanceApikey ) || null ,
755+ date_time : now ,
756+ sender : this . wuid ,
757+ } ;
758+
759+ if ( expose && instanceApikey ) {
760+ logData [ 'apikey' ] = instanceApikey ;
761+ }
762+
763+ this . logger . log ( logData ) ;
764+ }
765+ break ;
766+ } catch ( error ) {
767+ retry ++ ;
768+ }
741769 }
770+ }
771+ }
772+
773+ if ( rabbitmqGlobal && rabbitmqEvents [ we ] && amqp ) {
774+ const exchangeName = 'evolution_exchange' ;
775+
776+ let retry = 0 ;
777+
778+ while ( retry < 3 ) {
779+ try {
780+ await amqp . assertExchange ( exchangeName , 'topic' , {
781+ durable : true ,
782+ autoDelete : false ,
783+ } ) ;
784+
785+ const queueName = transformedWe ;
742786
743- await amqp . publish ( exchangeName , event , Buffer . from ( JSON . stringify ( message ) ) ) ;
787+ await amqp . assertQueue ( queueName , {
788+ durable : true ,
789+ autoDelete : false ,
790+ arguments : {
791+ 'x-queue-type' : 'quorum' ,
792+ } ,
793+ } ) ;
744794
745- if ( this . configService . get < Log > ( 'LOG' ) . LEVEL . includes ( 'WEBHOOKS' ) ) {
746- const logData = {
747- local : ChannelStartupService . name + '.sendData-RabbitMQ' ,
795+ await amqp . bindQueue ( queueName , exchangeName , event ) ;
796+
797+ const message = {
748798 event,
749799 instance : this . instance . name ,
750800 data,
751801 server_url : serverUrl ,
752- apikey : ( expose && instanceApikey ) || null ,
753802 date_time : now ,
754803 sender : this . wuid ,
755804 } ;
756805
757806 if ( expose && instanceApikey ) {
758- logData [ 'apikey' ] = instanceApikey ;
807+ message [ 'apikey' ] = instanceApikey ;
808+ }
809+ await amqp . publish ( exchangeName , event , Buffer . from ( JSON . stringify ( message ) ) ) ;
810+
811+ if ( this . configService . get < Log > ( 'LOG' ) . LEVEL . includes ( 'WEBHOOKS' ) ) {
812+ const logData = {
813+ local : ChannelStartupService . name + '.sendData-RabbitMQ-Global' ,
814+ event,
815+ instance : this . instance . name ,
816+ data,
817+ server_url : serverUrl ,
818+ apikey : ( expose && instanceApikey ) || null ,
819+ date_time : now ,
820+ sender : this . wuid ,
821+ } ;
822+
823+ if ( expose && instanceApikey ) {
824+ logData [ 'apikey' ] = instanceApikey ;
825+ }
826+
827+ this . logger . log ( logData ) ;
759828 }
760829
761- this . logger . log ( logData ) ;
830+ break ;
831+ } catch ( error ) {
832+ retry ++ ;
762833 }
763834 }
764835 }
0 commit comments