Skip to content

Commit d998baa

Browse files
Merge pull request #1896 from nolramaf/feat/add-global-sqs-mode
feat/add global SQS mode with single-queue-per-event and payload size control
2 parents e75dae3 + 7486d22 commit d998baa

File tree

3 files changed

+235
-81
lines changed

3 files changed

+235
-81
lines changed

.env.example

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
SERVER_NAME=evolution
12
SERVER_TYPE=http
23
SERVER_PORT=8080
34
# Server URL - Set your application url
@@ -96,6 +97,36 @@ SQS_SECRET_ACCESS_KEY=
9697
SQS_ACCOUNT_ID=
9798
SQS_REGION=
9899

100+
SQS_GLOBAL_ENABLED=false
101+
SQS_GLOBAL_FORCE_SINGLE_QUEUE=false
102+
SQS_GLOBAL_APPLICATION_STARTUP=false
103+
SQS_GLOBAL_CALL=false
104+
SQS_GLOBAL_CHATS_DELETE=false
105+
SQS_GLOBAL_CHATS_SET=false
106+
SQS_GLOBAL_CHATS_UPDATE=false
107+
SQS_GLOBAL_CHATS_UPSERT=false
108+
SQS_GLOBAL_CONNECTION_UPDATE=false
109+
SQS_GLOBAL_CONTACTS_SET=false
110+
SQS_GLOBAL_CONTACTS_UPDATE=false
111+
SQS_GLOBAL_CONTACTS_UPSERT=false
112+
SQS_GLOBAL_GROUP_PARTICIPANTS_UPDATE=false
113+
SQS_GLOBAL_GROUPS_UPDATE=false
114+
SQS_GLOBAL_GROUPS_UPSERT=false
115+
SQS_GLOBAL_LABELS_ASSOCIATION=false
116+
SQS_GLOBAL_LABELS_EDIT=false
117+
SQS_GLOBAL_LOGOUT_INSTANCE=false
118+
SQS_GLOBAL_MESSAGES_DELETE=false
119+
SQS_GLOBAL_MESSAGES_EDITED=false
120+
SQS_GLOBAL_MESSAGES_SET=false
121+
SQS_GLOBAL_MESSAGES_UPDATE=false
122+
SQS_GLOBAL_MESSAGES_UPSERT=false
123+
SQS_GLOBAL_PRESENCE_UPDATE=false
124+
SQS_GLOBAL_QRCODE_UPDATED=false
125+
SQS_GLOBAL_REMOVE_INSTANCE=false
126+
SQS_GLOBAL_SEND_MESSAGE=false
127+
SQS_GLOBAL_TYPEBOT_CHANGE_STATUS=false
128+
SQS_GLOBAL_TYPEBOT_START=false
129+
99130
# Websocket - Environment variables
100131
WEBSOCKET_ENABLED=false
101132
WEBSOCKET_GLOBAL_EVENTS=false
Lines changed: 136 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1+
import * as s3Service from '@api/integrations/storage/s3/libs/minio.server';
12
import { PrismaRepository } from '@api/repository/repository.service';
23
import { WAMonitoringService } from '@api/services/monitor.service';
34
import { CreateQueueCommand, DeleteQueueCommand, ListQueuesCommand, SQS } from '@aws-sdk/client-sqs';
4-
import { configService, Log, Sqs } from '@config/env.config';
5+
import { configService, HttpServer, Log, S3, Sqs } from '@config/env.config';
56
import { Logger } from '@config/logger.config';
67

78
import { EmitData, EventController, EventControllerInterface } from '../event.controller';
@@ -15,27 +16,29 @@ export class SqsController extends EventController implements EventControllerInt
1516
super(prismaRepository, waMonitor, configService.get<Sqs>('SQS')?.ENABLED, 'sqs');
1617
}
1718

18-
public init(): void {
19+
public async init(): Promise<void> {
1920
if (!this.status) {
2021
return;
2122
}
2223

23-
new Promise<void>((resolve) => {
24-
const awsConfig = configService.get<Sqs>('SQS');
24+
const awsConfig = configService.get<Sqs>('SQS');
2525

26-
this.sqs = new SQS({
27-
credentials: {
28-
accessKeyId: awsConfig.ACCESS_KEY_ID,
29-
secretAccessKey: awsConfig.SECRET_ACCESS_KEY,
30-
},
26+
this.sqs = new SQS({
27+
credentials: {
28+
accessKeyId: awsConfig.ACCESS_KEY_ID,
29+
secretAccessKey: awsConfig.SECRET_ACCESS_KEY,
30+
},
3131

32-
region: awsConfig.REGION,
33-
});
32+
region: awsConfig.REGION,
33+
});
3434

35-
this.logger.info('SQS initialized');
35+
this.logger.info('SQS initialized');
3636

37-
resolve();
38-
});
37+
const sqsConfig = configService.get<Sqs>('SQS');
38+
if (this.sqs && sqsConfig.GLOBAL_ENABLED) {
39+
const sqsEvents = Object.keys(sqsConfig.EVENTS).filter((e) => sqsConfig.EVENTS[e]);
40+
await this.saveQueues(sqsConfig.GLOBAL_PREFIX_NAME, sqsEvents, true);
41+
}
3942
}
4043

4144
private set channel(sqs: SQS) {
@@ -47,7 +50,7 @@ export class SqsController extends EventController implements EventControllerInt
4750
}
4851

4952
override async set(instanceName: string, data: EventDto): Promise<any> {
50-
if (!this.status) {
53+
if (!this.status || configService.get<Sqs>('SQS').GLOBAL_ENABLED) {
5154
return;
5255
}
5356

@@ -75,6 +78,7 @@ export class SqsController extends EventController implements EventControllerInt
7578
instanceId: this.monitor.waInstances[instanceName].instanceId,
7679
},
7780
};
81+
7882
console.log('*** payload: ', payload);
7983
return this.prisma[this.name].upsert(payload);
8084
}
@@ -98,100 +102,151 @@ export class SqsController extends EventController implements EventControllerInt
98102
return;
99103
}
100104

101-
const instanceSqs = await this.get(instanceName);
102-
const sqsLocal = instanceSqs?.events;
103-
const we = event.replace(/[.-]/gm, '_').toUpperCase();
104-
105-
if (instanceSqs?.enabled) {
106-
if (this.sqs) {
107-
if (Array.isArray(sqsLocal) && sqsLocal.includes(we)) {
108-
const eventFormatted = `${event.replace('.', '_').toLowerCase()}`;
109-
const queueName = `${instanceName}_${eventFormatted}.fifo`;
110-
const sqsConfig = configService.get<Sqs>('SQS');
111-
const sqsUrl = `https://sqs.${sqsConfig.REGION}.amazonaws.com/${sqsConfig.ACCOUNT_ID}/${queueName}`;
112-
113-
const message = {
114-
event,
115-
instance: instanceName,
116-
data,
117-
server_url: serverUrl,
118-
date_time: dateTime,
119-
sender,
120-
apikey: apiKey,
121-
};
122-
123-
const params = {
124-
MessageBody: JSON.stringify(message),
125-
MessageGroupId: 'evolution',
126-
MessageDeduplicationId: `${instanceName}_${eventFormatted}_${Date.now()}`,
127-
QueueUrl: sqsUrl,
128-
};
129-
130-
this.sqs.sendMessage(params, (err) => {
131-
if (err) {
132-
this.logger.error({
133-
local: `${origin}.sendData-SQS`,
134-
message: err?.message,
135-
hostName: err?.hostname,
136-
code: err?.code,
137-
stack: err?.stack,
138-
name: err?.name,
139-
url: queueName,
140-
server_url: serverUrl,
141-
});
142-
} else {
143-
if (configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS')) {
144-
const logData = {
145-
local: `${origin}.sendData-SQS`,
146-
...message,
147-
};
148-
149-
this.logger.log(logData);
150-
}
151-
}
105+
if (this.sqs) {
106+
const serverConfig = configService.get<HttpServer>('SERVER');
107+
const sqsConfig = configService.get<Sqs>('SQS');
108+
109+
const we = event.replace(/[.-]/gm, '_').toUpperCase();
110+
111+
let sqsEvents = [];
112+
if (sqsConfig.GLOBAL_ENABLED) {
113+
sqsEvents = Object.keys(sqsConfig.EVENTS).filter((e) => sqsConfig.EVENTS[e]);
114+
} else {
115+
const instanceSqs = await this.get(instanceName);
116+
if (instanceSqs?.enabled && Array.isArray(instanceSqs?.events)) {
117+
sqsEvents = instanceSqs?.events;
118+
}
119+
}
120+
121+
if (Array.isArray(sqsEvents) && sqsEvents.includes(we)) {
122+
const prefixName = sqsConfig.GLOBAL_ENABLED ? sqsConfig.GLOBAL_PREFIX_NAME : instanceName;
123+
const eventFormatted =
124+
sqsConfig.GLOBAL_ENABLED && sqsConfig.GLOBAL_FORCE_SINGLE_QUEUE
125+
? 'singlequeue'
126+
: `${event.replace('.', '_').toLowerCase()}`;
127+
const queueName = `${prefixName}_${eventFormatted}.fifo`;
128+
const sqsUrl = `https://sqs.${sqsConfig.REGION}.amazonaws.com/${sqsConfig.ACCOUNT_ID}/${queueName}`;
129+
130+
const message = {
131+
event,
132+
instance: instanceName,
133+
dataType: 'json',
134+
data,
135+
server: serverConfig.NAME,
136+
server_url: serverUrl,
137+
date_time: dateTime,
138+
sender,
139+
apikey: apiKey,
140+
};
141+
142+
const jsonStr = JSON.stringify(message);
143+
const size = Buffer.byteLength(jsonStr, 'utf8');
144+
if (size > sqsConfig.MAX_PAYLOAD_SIZE) {
145+
if (!configService.get<S3>('S3').ENABLE) {
146+
this.logger.error(
147+
`${instanceName} - ${eventFormatted} - SQS ignored: payload (${size} bytes) exceeds SQS size limit (${sqsConfig.MAX_PAYLOAD_SIZE} bytes) and S3 storage is not enabled.`,
148+
);
149+
return;
150+
}
151+
152+
const buffer = Buffer.from(jsonStr, 'utf8');
153+
const fullName = `messages/${instanceName}_${eventFormatted}_${Date.now()}.json`;
154+
155+
await s3Service.uploadFile(fullName, buffer, size, {
156+
'Content-Type': 'application/json',
157+
'Cache-Control': 'no-store',
152158
});
159+
160+
const fileUrl = await s3Service.getObjectUrl(fullName);
161+
162+
message.data = { fileUrl };
163+
message.dataType = 's3';
153164
}
165+
166+
const messageGroupId = sqsConfig.GLOBAL_ENABLED ? `${serverConfig.NAME}-${eventFormatted}-${instanceName}` : 'evolution';
167+
const isGlobalEnabled = sqsConfig.GLOBAL_ENABLED;
168+
const params = {
169+
MessageBody: JSON.stringify(message),
170+
MessageGroupId: messageGroupId,
171+
QueueUrl: sqsUrl,
172+
...(!isGlobalEnabled && {
173+
MessageDeduplicationId: `${instanceName}_${eventFormatted}_${Date.now()}`,
174+
}),
175+
};
176+
177+
this.sqs.sendMessage(params, (err) => {
178+
if (err) {
179+
this.logger.error({
180+
local: `${origin}.sendData-SQS`,
181+
params: JSON.stringify(message),
182+
sqsUrl: sqsUrl,
183+
message: err?.message,
184+
hostName: err?.hostname,
185+
code: err?.code,
186+
stack: err?.stack,
187+
name: err?.name,
188+
url: queueName,
189+
server_url: serverUrl,
190+
});
191+
} else if (configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS')) {
192+
const logData = {
193+
local: `${origin}.sendData-SQS`,
194+
...message,
195+
};
196+
197+
this.logger.log(logData);
198+
}
199+
});
154200
}
155201
}
156202
}
157203

158-
private async saveQueues(instanceName: string, events: string[], enable: boolean) {
204+
private async saveQueues(prefixName: string, events: string[], enable: boolean) {
159205
if (enable) {
160-
const eventsFinded = await this.listQueuesByInstance(instanceName);
206+
const sqsConfig = configService.get<Sqs>('SQS');
207+
const eventsFinded = await this.listQueues(prefixName);
161208
console.log('eventsFinded', eventsFinded);
162209

163210
for (const event of events) {
164-
const normalizedEvent = event.toLowerCase();
165-
211+
const normalizedEvent =
212+
sqsConfig.GLOBAL_ENABLED && sqsConfig.GLOBAL_FORCE_SINGLE_QUEUE ? 'singlequeue' : event.toLowerCase();
166213
if (eventsFinded.includes(normalizedEvent)) {
167214
this.logger.info(`A queue para o evento "${normalizedEvent}" já existe. Ignorando criação.`);
168215
continue;
169216
}
170217

171-
const queueName = `${instanceName}_${normalizedEvent}.fifo`;
172-
218+
const queueName = `${prefixName}_${normalizedEvent}.fifo`;
173219
try {
220+
const isGlobalEnabled = sqsConfig.GLOBAL_ENABLED;
174221
const createCommand = new CreateQueueCommand({
175222
QueueName: queueName,
176223
Attributes: {
177224
FifoQueue: 'true',
225+
...(isGlobalEnabled && { ContentBasedDeduplication: 'true' }),
178226
},
179227
});
228+
180229
const data = await this.sqs.send(createCommand);
181230
this.logger.info(`Queue ${queueName} criada: ${data.QueueUrl}`);
182231
} catch (err: any) {
183232
this.logger.error(`Erro ao criar queue ${queueName}: ${err.message}`);
184233
}
234+
235+
if (sqsConfig.GLOBAL_ENABLED && sqsConfig.GLOBAL_FORCE_SINGLE_QUEUE) {
236+
break;
237+
}
185238
}
186239
}
187240
}
188241

189-
private async listQueuesByInstance(instanceName: string) {
242+
private async listQueues(prefixName: string) {
190243
let existingQueues: string[] = [];
244+
191245
try {
192246
const listCommand = new ListQueuesCommand({
193-
QueueNamePrefix: `${instanceName}_`,
247+
QueueNamePrefix: `${prefixName}_`,
194248
});
249+
195250
const listData = await this.sqs.send(listCommand);
196251
if (listData.QueueUrls && listData.QueueUrls.length > 0) {
197252
// Extrai o nome da fila a partir da URL
@@ -201,32 +256,32 @@ export class SqsController extends EventController implements EventControllerInt
201256
});
202257
}
203258
} catch (error: any) {
204-
this.logger.error(`Erro ao listar filas para a instância ${instanceName}: ${error.message}`);
259+
this.logger.error(`Erro ao listar filas para ${prefixName}: ${error.message}`);
205260
return;
206261
}
207262

208263
// Mapeia os eventos já existentes nas filas: remove o prefixo e o sufixo ".fifo"
209264
return existingQueues
210265
.map((queueName) => {
211266
// Espera-se que o nome seja `${instanceName}_${event}.fifo`
212-
if (queueName.startsWith(`${instanceName}_`) && queueName.endsWith('.fifo')) {
213-
return queueName.substring(instanceName.length + 1, queueName.length - 5).toLowerCase();
267+
if (queueName.startsWith(`${prefixName}_`) && queueName.endsWith('.fifo')) {
268+
return queueName.substring(prefixName.length + 1, queueName.length - 5).toLowerCase();
214269
}
215270
return '';
216271
})
217272
.filter((event) => event !== '');
218273
}
219274

220275
// Para uma futura feature de exclusão forçada das queues
221-
private async removeQueuesByInstance(instanceName: string) {
276+
private async removeQueuesByInstance(prefixName: string) {
222277
try {
223278
const listCommand = new ListQueuesCommand({
224-
QueueNamePrefix: `${instanceName}_`,
279+
QueueNamePrefix: `${prefixName}_`,
225280
});
226281
const listData = await this.sqs.send(listCommand);
227282

228283
if (!listData.QueueUrls || listData.QueueUrls.length === 0) {
229-
this.logger.info(`No queues found for instance ${instanceName}`);
284+
this.logger.info(`No queues found for ${prefixName}`);
230285
return;
231286
}
232287

@@ -240,7 +295,7 @@ export class SqsController extends EventController implements EventControllerInt
240295
}
241296
}
242297
} catch (err: any) {
243-
this.logger.error(`Error listing queues for instance ${instanceName}: ${err.message}`);
298+
this.logger.error(`Error listing queues for ${prefixName}: ${err.message}`);
244299
}
245300
}
246301
}

0 commit comments

Comments
 (0)