Skip to content

Commit b04ed66

Browse files
Merge pull request #451 from judsonjuniorr/feat/rabbit-per-events
RabbitMQ improvements
2 parents f5df8bc + e27e990 commit b04ed66

File tree

10 files changed

+202
-82
lines changed

10 files changed

+202
-82
lines changed

Docker/.env.example

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ REDIS_URI=redis://redis:6379
4848
REDIS_PREFIX_KEY=evdocker
4949

5050
RABBITMQ_ENABLED=false
51+
RABBITMQ_RABBITMQ_MODE=global
52+
RABBITMQ_EXCHANGE_NAME=evolution_exchange
5153
RABBITMQ_URI=amqp://guest:guest@rabbitmq:5672
5254

5355
WEBSOCKET_ENABLED=false

Dockerfile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
FROM node:20.7.0-alpine AS builder
22

3-
LABEL version="1.6.2" description="Api to control whatsapp features through http requests."
3+
LABEL version="1.7.0" description="Api to control whatsapp features through http requests."
44
LABEL maintainer="Davidson Gomes" git="https://github.com/DavidsonGomes"
55
LABEL contact="contato@agenciadgcode.com"
66

@@ -63,6 +63,8 @@ ENV REDIS_URI=redis://redis:6379
6363
ENV REDIS_PREFIX_KEY=evolution
6464

6565
ENV RABBITMQ_ENABLED=false
66+
ENV RABBITMQ_MODE=global
67+
ENV RABBITMQ_EXCHANGE_NAME=evolution_exchange
6668
ENV RABBITMQ_URI=amqp://guest:guest@rabbitmq:5672
6769

6870
ENV WEBSOCKET_ENABLED=false

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "evolution-api",
3-
"version": "1.6.2",
3+
"version": "1.7.0",
44
"description": "Rest api for communication with WhatsApp",
55
"main": "./dist/src/main.js",
66
"scripts": {
@@ -90,6 +90,7 @@
9090
"yamljs": "^0.3.0"
9191
},
9292
"devDependencies": {
93+
"@types/amqplib": "^0.10.5",
9394
"@types/compression": "^1.7.2",
9495
"@types/cors": "^2.8.13",
9596
"@types/express": "^4.17.17",

src/config/env.config.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ export type Redis = {
7171

7272
export type Rabbitmq = {
7373
ENABLED: boolean;
74+
MODE: 'isolated' | 'global' | 'single';
75+
EXCHANGE_NAME: string; // available for global and single, isolated mode will use instance name as exchange
7476
URI: string;
7577
};
7678

@@ -283,6 +285,8 @@ export class ConfigService {
283285
},
284286
RABBITMQ: {
285287
ENABLED: process.env?.RABBITMQ_ENABLED === 'true',
288+
MODE: (process.env?.RABBITMQ_MODE as Rabbitmq['MODE']) || 'isolated',
289+
EXCHANGE_NAME: process.env?.RABBITMQ_EXCHANGE_NAME || 'evolution_exchange',
286290
URI: process.env.RABBITMQ_URI || '',
287291
},
288292
SQS: {

src/dev-env.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ REDIS:
8484

8585
RABBITMQ:
8686
ENABLED: false
87+
MODE: "global"
88+
EXCHANGE_NAME: "evolution_exchange"
8789
URI: "amqp://guest:guest@localhost:5672"
8890

8991
SQS:

src/docs/swagger.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ info:
2525
</font>
2626
2727
[![Run in Postman](https://run.pstmn.io/button.svg)](https://god.gw.postman.com/run-collection/26869335-5546d063-156b-4529-915f-909dd628c090?action=collection%2Ffork&source=rip_markdown&collection-url=entityId%3D26869335-5546d063-156b-4529-915f-909dd628c090%26entityType%3Dcollection%26workspaceId%3D339a4ee7-378b-45c9-b5b8-fd2c0a9c2442)
28-
version: 1.6.2
28+
version: 1.7.0
2929
contact:
3030
name: DavidsonGomes
3131
email: contato@agenciadgcode.com

src/libs/amqp.server.ts

Lines changed: 180 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,34 @@
1-
import * as amqp from 'amqplib/callback_api';
1+
import { Channel, connect } from 'amqplib/callback_api';
22

3-
import { configService, Rabbitmq } from '../config/env.config';
3+
import { configService, HttpServer, Rabbitmq } from '../config/env.config';
44
import { Logger } from '../config/logger.config';
5+
import { Events } from '../whatsapp/types/wa.types';
56

67
const logger = new Logger('AMQP');
78

8-
let amqpChannel: amqp.Channel | null = null;
9+
const parseEvtName = (evt: string) => evt.replace(/_/g, '.').toLowerCase();
10+
11+
const globalQueues: { [key: string]: Events[] } = {
12+
contacts: [Events.CONTACTS_SET, Events.CONTACTS_UPDATE, Events.CONTACTS_UPSERT],
13+
messages: [
14+
Events.MESSAGES_DELETE,
15+
Events.MESSAGES_SET,
16+
Events.MESSAGES_UPDATE,
17+
Events.MESSAGES_UPSERT,
18+
Events.MESSAGING_HISTORY_SET,
19+
Events.SEND_MESSAGE,
20+
],
21+
chats: [Events.CHATS_DELETE, Events.CHATS_SET, Events.CHATS_UPDATE, Events.CHATS_UPSERT],
22+
groups: [Events.GROUPS_UPDATE, Events.GROUPS_UPSERT, Events.GROUP_PARTICIPANTS_UPDATE],
23+
others: [], // All other events not included in the above categories
24+
};
25+
26+
let amqpChannel: Channel | null = null;
927

1028
export const initAMQP = () => {
1129
return new Promise<void>((resolve, reject) => {
12-
const uri = configService.get<Rabbitmq>('RABBITMQ').URI;
13-
amqp.connect(uri, (error, connection) => {
30+
const rabbitConfig = configService.get<Rabbitmq>('RABBITMQ');
31+
connect(rabbitConfig.URI, (error, connection) => {
1432
if (error) {
1533
reject(error);
1634
return;
@@ -22,12 +40,9 @@ export const initAMQP = () => {
2240
return;
2341
}
2442

25-
const exchangeName = 'evolution_exchange';
26-
27-
channel.assertExchange(exchangeName, 'topic', {
43+
channel.assertExchange(rabbitConfig.EXCHANGE_NAME, 'topic', {
2844
durable: true,
2945
autoDelete: false,
30-
assert: true,
3146
});
3247

3348
amqpChannel = channel;
@@ -39,65 +54,190 @@ export const initAMQP = () => {
3954
});
4055
};
4156

42-
export const getAMQP = (): amqp.Channel | null => {
57+
export const getAMQP = (): Channel | null => {
4358
return amqpChannel;
4459
};
4560

4661
export const initQueues = (instanceName: string, events: string[]) => {
4762
if (!instanceName || !events || !events.length) return;
63+
const rabbitConfig = configService.get<Rabbitmq>('RABBITMQ');
64+
const TWO_DAYS_IN_MS = 2 * 24 * 60 * 60 * 1000;
65+
const amqp = getAMQP();
4866

49-
const queues = events.map((event) => {
50-
return `${event.replace(/_/g, '.').toLowerCase()}`;
51-
});
67+
let exchangeName = rabbitConfig.EXCHANGE_NAME;
68+
69+
const receivedEvents = events.map(parseEvtName);
70+
if (rabbitConfig.MODE === 'isolated') {
71+
exchangeName = instanceName;
5272

53-
queues.forEach((event) => {
54-
const amqp = getAMQP();
55-
const exchangeName = instanceName ?? 'evolution_exchange';
73+
receivedEvents.forEach((event) => {
74+
amqp.assertExchange(exchangeName, 'topic', {
75+
durable: true,
76+
autoDelete: false,
77+
});
78+
79+
const queueName = `${instanceName}.${event}`;
80+
amqp.assertQueue(queueName, {
81+
durable: true,
82+
autoDelete: false,
83+
messageTtl: TWO_DAYS_IN_MS,
84+
arguments: {
85+
'x-queue-type': 'quorum',
86+
},
87+
});
5688

89+
amqp.bindQueue(queueName, exchangeName, event);
90+
});
91+
} else if (rabbitConfig.MODE === 'single') {
5792
amqp.assertExchange(exchangeName, 'topic', {
5893
durable: true,
5994
autoDelete: false,
60-
assert: true,
6195
});
6296

63-
const queueName = `${instanceName}.${event}`;
64-
97+
const queueName = 'evolution';
6598
amqp.assertQueue(queueName, {
6699
durable: true,
67100
autoDelete: false,
101+
messageTtl: TWO_DAYS_IN_MS,
68102
arguments: {
69103
'x-queue-type': 'quorum',
70104
},
71105
});
72106

73-
amqp.bindQueue(queueName, exchangeName, event);
74-
});
107+
receivedEvents.forEach((event) => {
108+
amqp.bindQueue(queueName, exchangeName, event);
109+
});
110+
} else if (rabbitConfig.MODE === 'global') {
111+
const queues = Object.keys(globalQueues);
112+
113+
const addQueues = queues.filter((evt) => {
114+
if (evt === 'others') {
115+
return receivedEvents.some(
116+
(e) =>
117+
!Object.values(globalQueues)
118+
.flat()
119+
.includes(e as Events),
120+
);
121+
}
122+
return globalQueues[evt].some((e) => receivedEvents.includes(e));
123+
});
124+
125+
addQueues.forEach((event) => {
126+
amqp.assertExchange(exchangeName, 'topic', {
127+
durable: true,
128+
autoDelete: false,
129+
});
130+
131+
const queueName = event;
132+
amqp.assertQueue(queueName, {
133+
durable: true,
134+
autoDelete: false,
135+
messageTtl: TWO_DAYS_IN_MS,
136+
arguments: {
137+
'x-queue-type': 'quorum',
138+
},
139+
});
140+
141+
if (globalQueues[event].length === 0) {
142+
// Other events
143+
const otherEvents = Object.values(globalQueues).flat();
144+
for (const subEvent in Events) {
145+
const eventCode = Events[subEvent];
146+
if (otherEvents.includes(eventCode)) continue;
147+
if (!receivedEvents.includes(eventCode)) continue;
148+
amqp.bindQueue(queueName, exchangeName, eventCode);
149+
}
150+
} else {
151+
globalQueues[event].forEach((subEvent) => {
152+
amqp.bindQueue(queueName, exchangeName, subEvent);
153+
});
154+
}
155+
});
156+
} else {
157+
throw new Error('Invalid RabbitMQ mode');
158+
}
75159
};
76160

77161
export const removeQueues = (instanceName: string, events: string[]) => {
78162
if (!events || !events.length) return;
163+
const rabbitConfig = configService.get<Rabbitmq>('RABBITMQ');
164+
let exchangeName = rabbitConfig.EXCHANGE_NAME;
165+
const amqp = getAMQP();
166+
167+
const receivedEvents = events.map(parseEvtName);
168+
if (rabbitConfig.MODE === 'isolated') {
169+
exchangeName = instanceName;
170+
receivedEvents.forEach((event) => {
171+
amqp.assertExchange(exchangeName, 'topic', {
172+
durable: true,
173+
autoDelete: false,
174+
});
79175

80-
const channel = getAMQP();
176+
const queueName = `${instanceName}.${event}`;
177+
amqp.deleteQueue(queueName);
178+
});
179+
amqp.deleteExchange(instanceName);
180+
}
181+
};
81182

82-
const queues = events.map((event) => {
83-
return `${event.replace(/_/g, '.').toLowerCase()}`;
183+
interface SendEventData {
184+
instanceName: string;
185+
wuid: string;
186+
event: string;
187+
apiKey?: string;
188+
data: any;
189+
}
190+
191+
export const sendEventData = ({ data, event, wuid, apiKey, instanceName }: SendEventData) => {
192+
const rabbitConfig = configService.get<Rabbitmq>('RABBITMQ');
193+
let exchangeName = rabbitConfig.EXCHANGE_NAME;
194+
if (rabbitConfig.MODE === 'isolated') exchangeName = instanceName;
195+
196+
amqpChannel.assertExchange(exchangeName, 'topic', {
197+
durable: true,
198+
autoDelete: false,
84199
});
85-
86-
const exchangeName = instanceName ?? 'evolution_exchange';
87-
88-
queues.forEach((event) => {
89-
const amqp = getAMQP();
90-
91-
amqp.assertExchange(exchangeName, 'topic', {
92-
durable: true,
93-
autoDelete: false,
94-
assert: true,
200+
let queueName = event;
201+
if (rabbitConfig.MODE === 'single') {
202+
queueName = 'evolution';
203+
} else if (rabbitConfig.MODE === 'global') {
204+
let eventName = '';
205+
Object.keys(globalQueues).forEach((key) => {
206+
if (globalQueues[key].includes(event as Events)) {
207+
eventName = key;
208+
}
209+
if (eventName === '' && key === 'others') {
210+
eventName = key;
211+
}
95212
});
96-
97-
const queueName = `${instanceName}.${event}`;
98-
99-
amqp.deleteQueue(queueName);
213+
queueName = eventName;
214+
}
215+
amqpChannel.assertQueue(queueName, {
216+
durable: true,
217+
autoDelete: false,
218+
arguments: { 'x-queue-type': 'quorum' },
100219
});
101-
102-
channel.deleteExchange(exchangeName);
220+
amqpChannel.bindQueue(queueName, exchangeName, event);
221+
222+
const serverUrl = configService.get<HttpServer>('SERVER').URL;
223+
const tzoffset = new Date().getTimezoneOffset() * 60000; //offset in milliseconds
224+
const localISOTime = new Date(Date.now() - tzoffset).toISOString();
225+
const now = localISOTime;
226+
const message = {
227+
event,
228+
instance: instanceName,
229+
data,
230+
server_url: serverUrl,
231+
date_time: now,
232+
sender: wuid,
233+
};
234+
if (apiKey) {
235+
message['apikey'] = apiKey;
236+
}
237+
logger.log({
238+
queueName,
239+
exchangeName,
240+
event,
241+
});
242+
amqpChannel.publish(exchangeName, event, Buffer.from(JSON.stringify(message)));
103243
};

src/whatsapp/services/whatsapp.baileys.service.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1123,7 +1123,7 @@ export class BaileysStartupService extends WAStartupService {
11231123
5: 'PLAYED',
11241124
};
11251125
for await (const { key, update } of args) {
1126-
if (settings?.groups_ignore && key.remoteJid.includes('@g.us')) {
1126+
if (settings?.groups_ignore && key.remoteJid?.includes('@g.us')) {
11271127
this.logger.verbose('group ignored');
11281128
return;
11291129
}

src/whatsapp/services/whatsapp.business.service.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,6 @@ export class BusinessStartupService extends WAStartupService {
303303
if (received.contacts) pushName = received.contacts[0].profile.name;
304304

305305
if (received.messages) {
306-
console.log('received?.messages[0]', received?.messages[0]);
307306
const key = {
308307
id: received.messages[0].id,
309308
remoteJid: this.phoneNumber,
@@ -772,9 +771,6 @@ export class BusinessStartupService extends WAStartupService {
772771
}
773772
})();
774773

775-
console.log('messageSent', messageSent);
776-
console.log('message', message);
777-
778774
const messageRaw: MessageRaw = {
779775
key: { fromMe: true, id: messageSent?.messages[0]?.id, remoteJid: this.createJid(number) },
780776
//pushName: messageSent.pushName,

0 commit comments

Comments
 (0)