Skip to content

Commit 9c4f026

Browse files
committed
test
1 parent 434f39d commit 9c4f026

File tree

2 files changed

+63
-198
lines changed

2 files changed

+63
-198
lines changed

src/libs/amqp.server.ts

Lines changed: 30 additions & 190 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,16 @@
1-
import { Channel, connect } from 'amqplib/callback_api';
1+
import * as amqp from 'amqplib/callback_api';
22

3-
import { configService, HttpServer, Rabbitmq } from '../config/env.config';
3+
import { configService, Rabbitmq } from '../config/env.config';
44
import { Logger } from '../config/logger.config';
5-
import { Events } from '../whatsapp/types/wa.types';
65

76
const logger = new Logger('AMQP');
87

9-
const parseEvtName = (evt: string) => evt.replace(/_/g, '.').toLowerCase();
10-
11-
const globalQueues: { [key: string]: Events[] } = {
12-
contacts: [Events.CONTACTS_SET, Events.CONTACTS_UPDATE, Events.CONTACTS_UPSERT],
13-
messages: [
14-
Events.MESSAGES_DELETE,
15-
Events.MESSAGES_SET,
16-
Events.MESSAGES_UPDATE,
17-
Events.MESSAGES_UPSERT,
18-
Events.MESSAGING_HISTORY_SET,
19-
Events.SEND_MESSAGE,
20-
],
21-
chats: [Events.CHATS_DELETE, Events.CHATS_SET, Events.CHATS_UPDATE, Events.CHATS_UPSERT],
22-
groups: [Events.GROUPS_UPDATE, Events.GROUPS_UPSERT, Events.GROUP_PARTICIPANTS_UPDATE],
23-
others: [], // All other events not included in the above categories
24-
};
25-
26-
let amqpChannel: Channel | null = null;
8+
let amqpChannel: amqp.Channel | null = null;
279

2810
export const initAMQP = () => {
2911
return new Promise<void>((resolve, reject) => {
30-
const rabbitConfig = configService.get<Rabbitmq>('RABBITMQ');
31-
connect(rabbitConfig.URI, (error, connection) => {
12+
const uri = configService.get<Rabbitmq>('RABBITMQ').URI;
13+
amqp.connect(uri, (error, connection) => {
3214
if (error) {
3315
reject(error);
3416
return;
@@ -40,7 +22,7 @@ export const initAMQP = () => {
4022
return;
4123
}
4224

43-
const exchangeName = rabbitConfig.EXCHANGE_NAME ?? 'evolution_exchange';
25+
const exchangeName = 'evolution_exchange';
4426

4527
channel.assertExchange(exchangeName, 'topic', {
4628
durable: true,
@@ -56,46 +38,28 @@ export const initAMQP = () => {
5638
});
5739
};
5840

59-
export const getAMQP = (): Channel | null => {
41+
export const getAMQP = (): amqp.Channel | null => {
6042
return amqpChannel;
6143
};
6244

6345
export const initQueues = (instanceName: string, events: string[]) => {
64-
if (!instanceName || !events || !events.length) return;
65-
const rabbitConfig = configService.get<Rabbitmq>('RABBITMQ');
66-
const amqp = getAMQP();
46+
if (!events || !events.length) return;
6747

68-
const rabbitMode = rabbitConfig.MODE || 'isolated';
69-
let exchangeName = rabbitConfig.EXCHANGE_NAME ?? 'evolution_exchange';
48+
const queues = events.map((event) => {
49+
return `${event.replace(/_/g, '.').toLowerCase()}`;
50+
});
7051

71-
const receivedEvents = events.map(parseEvtName);
72-
if (rabbitMode === 'isolated') {
73-
exchangeName = instanceName ?? 'evolution_exchange';
52+
queues.forEach((event) => {
53+
const amqp = getAMQP();
54+
const exchangeName = instanceName ?? 'evolution_exchange';
7455

7556
amqp.assertExchange(exchangeName, 'topic', {
7657
durable: true,
7758
autoDelete: false,
7859
});
7960

80-
receivedEvents.forEach((event) => {
81-
const queueName = `${instanceName}.${event}`;
82-
amqp.assertQueue(queueName, {
83-
durable: true,
84-
autoDelete: false,
85-
arguments: {
86-
'x-queue-type': 'quorum',
87-
},
88-
});
89-
90-
amqp.bindQueue(queueName, exchangeName, event);
91-
});
92-
} else if (rabbitMode === 'single') {
93-
amqp.assertExchange(exchangeName, 'topic', {
94-
durable: true,
95-
autoDelete: false,
96-
});
61+
const queueName = `${instanceName}.${event}`;
9762

98-
const queueName = 'evolution';
9963
amqp.assertQueue(queueName, {
10064
durable: true,
10165
autoDelete: false,
@@ -104,157 +68,33 @@ export const initQueues = (instanceName: string, events: string[]) => {
10468
},
10569
});
10670

107-
receivedEvents.forEach((event) => {
108-
amqp.bindQueue(queueName, exchangeName, event);
109-
});
110-
} else if (rabbitMode === 'global') {
111-
const queues = Object.keys(globalQueues);
112-
113-
const addQueues = queues.filter((evt) => {
114-
if (evt === 'others') {
115-
return receivedEvents.some(
116-
(e) =>
117-
!Object.values(globalQueues)
118-
.flat()
119-
.includes(e as Events),
120-
);
121-
}
122-
return globalQueues[evt].some((e) => receivedEvents.includes(e));
123-
});
124-
125-
addQueues.forEach((event) => {
126-
amqp.assertExchange(exchangeName, 'topic', {
127-
durable: true,
128-
autoDelete: false,
129-
});
130-
131-
const queueName = event;
132-
amqp.assertQueue(queueName, {
133-
durable: true,
134-
autoDelete: false,
135-
arguments: {
136-
'x-queue-type': 'quorum',
137-
},
138-
});
139-
140-
if (globalQueues[event].length === 0) {
141-
// Other events
142-
const otherEvents = Object.values(globalQueues).flat();
143-
for (const subEvent in Events) {
144-
const eventCode = Events[subEvent];
145-
if (otherEvents.includes(eventCode)) continue;
146-
if (!receivedEvents.includes(eventCode)) continue;
147-
amqp.bindQueue(queueName, exchangeName, eventCode);
148-
}
149-
} else {
150-
globalQueues[event].forEach((subEvent) => {
151-
amqp.bindQueue(queueName, exchangeName, subEvent);
152-
});
153-
}
154-
});
155-
} else {
156-
throw new Error('Invalid RabbitMQ mode');
157-
}
71+
amqp.bindQueue(queueName, exchangeName, event);
72+
});
15873
};
15974

16075
export const removeQueues = (instanceName: string, events: string[]) => {
16176
if (!events || !events.length) return;
162-
const rabbitConfig = configService.get<Rabbitmq>('RABBITMQ');
163-
164-
const rabbitMode = rabbitConfig.MODE || 'isolated';
165-
let exchangeName = rabbitConfig.EXCHANGE_NAME ?? 'evolution_exchange';
166-
const amqp = getAMQP();
167-
168-
const receivedEvents = events.map(parseEvtName);
169-
if (rabbitMode === 'isolated') {
170-
exchangeName = instanceName;
171-
receivedEvents.forEach((event) => {
172-
amqp.assertExchange(exchangeName, 'topic', {
173-
durable: true,
174-
autoDelete: false,
175-
});
17677

177-
const queueName = `${instanceName}.${event}`;
178-
amqp.deleteQueue(queueName);
179-
});
180-
amqp.deleteExchange(instanceName);
181-
}
182-
};
183-
184-
interface SendEventData {
185-
instanceName: string;
186-
wuid: string;
187-
event: string;
188-
apiKey?: string;
189-
data: any;
190-
}
78+
const channel = getAMQP();
19179

192-
export const sendEventData = ({ data, event, wuid, apiKey, instanceName }: SendEventData) => {
193-
const rabbitConfig = configService.get<Rabbitmq>('RABBITMQ');
194-
const rabbitMode = rabbitConfig.MODE || 'isolated';
195-
let exchangeName = rabbitConfig.EXCHANGE_NAME ?? 'evolution_exchange';
196-
if (rabbitMode === 'isolated') exchangeName = instanceName ?? 'evolution_exchange';
197-
198-
console.log('exchangeName: ', exchangeName);
199-
console.log('rabbitMode: ', rabbitMode);
200-
201-
amqpChannel.assertExchange(exchangeName, 'topic', {
202-
durable: true,
203-
autoDelete: false,
80+
const queues = events.map((event) => {
81+
return `${event.replace(/_/g, '.').toLowerCase()}`;
20482
});
20583

206-
let queueName = event;
84+
const exchangeName = instanceName ?? 'evolution_exchange';
20785

208-
if (rabbitMode === 'single') {
209-
queueName = 'evolution';
210-
} else if (rabbitMode === 'global') {
211-
let eventName = '';
86+
queues.forEach((event) => {
87+
const amqp = getAMQP();
21288

213-
Object.keys(globalQueues).forEach((key) => {
214-
if (globalQueues[key].includes(event as Events)) {
215-
eventName = key;
216-
}
217-
if (eventName === '' && key === 'others') {
218-
eventName = key;
219-
}
89+
amqp.assertExchange(exchangeName, 'topic', {
90+
durable: true,
91+
autoDelete: false,
22092
});
221-
queueName = eventName;
222-
} else if (rabbitMode === 'isolated') {
223-
queueName = `${instanceName}.${event}`;
224-
}
225-
226-
amqpChannel.assertQueue(queueName, {
227-
durable: true,
228-
autoDelete: false,
229-
arguments: { 'x-queue-type': 'quorum' },
230-
});
23193

232-
console.log('envia na fila: ', queueName, exchangeName, event);
94+
const queueName = `${instanceName}.${event}`;
23395

234-
amqpChannel.bindQueue(queueName, exchangeName, event);
235-
236-
const serverUrl = configService.get<HttpServer>('SERVER').URL;
237-
const tzoffset = new Date().getTimezoneOffset() * 60000; //offset in milliseconds
238-
const localISOTime = new Date(Date.now() - tzoffset).toISOString();
239-
const now = localISOTime;
240-
241-
const message = {
242-
event,
243-
instance: instanceName,
244-
data,
245-
server_url: serverUrl,
246-
date_time: now,
247-
sender: wuid,
248-
};
249-
250-
if (apiKey) {
251-
message['apikey'] = apiKey;
252-
}
253-
254-
logger.log({
255-
queueName,
256-
exchangeName,
257-
event,
96+
amqp.deleteQueue(queueName);
25897
});
259-
amqpChannel.publish(exchangeName, event, Buffer.from(JSON.stringify(message)));
98+
99+
channel.deleteExchange(exchangeName);
260100
};

src/whatsapp/services/whatsapp.service.ts

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import {
2020
import { Logger } from '../../config/logger.config';
2121
import { ROOT_DIR } from '../../config/path.config';
2222
import { NotFoundException } from '../../exceptions';
23-
import { getAMQP, removeQueues, sendEventData } from '../../libs/amqp.server';
23+
import { getAMQP, removeQueues } from '../../libs/amqp.server';
2424
import { getIO } from '../../libs/socket.server';
2525
import { getSQS, removeQueues as removeQueuesSQS } from '../../libs/sqs.server';
2626
import { ChamaaiRaw, IntegrationRaw, ProxyRaw, RabbitmqRaw, SettingsRaw, SqsRaw, TypebotRaw } from '../models';
@@ -685,15 +685,40 @@ export class WAStartupService {
685685

686686
if (amqp) {
687687
if (Array.isArray(rabbitmqLocal) && rabbitmqLocal.includes(we)) {
688-
console.log('envia na fila: ', we);
689-
sendEventData({
690-
data,
691-
event,
692-
instanceName: this.instanceName,
693-
wuid: this.wuid,
694-
apiKey: expose && instanceApikey ? instanceApikey : undefined,
688+
const exchangeName = this.instanceName ?? 'evolution_exchange';
689+
690+
amqp.assertExchange(exchangeName, 'topic', {
691+
durable: true,
692+
autoDelete: false,
693+
});
694+
695+
const queueName = `${this.instanceName}.${event}`;
696+
697+
amqp.assertQueue(queueName, {
698+
durable: true,
699+
autoDelete: false,
700+
arguments: {
701+
'x-queue-type': 'quorum',
702+
},
695703
});
696704

705+
amqp.bindQueue(queueName, exchangeName, event);
706+
707+
const message = {
708+
event,
709+
instance: this.instance.name,
710+
data,
711+
server_url: serverUrl,
712+
date_time: now,
713+
sender: this.wuid,
714+
};
715+
716+
if (expose && instanceApikey) {
717+
message['apikey'] = instanceApikey;
718+
}
719+
720+
amqp.publish(exchangeName, event, Buffer.from(JSON.stringify(message)));
721+
697722
if (this.configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS')) {
698723
const logData = {
699724
local: WAStartupService.name + '.sendData-RabbitMQ',

0 commit comments

Comments
 (0)