Skip to content

Commit cca929e

Browse files
committed
feat/add global SQS mode with single-queue-per-event and payload size control
1 parent 9cdb897 commit cca929e

File tree

3 files changed

+221
-79
lines changed

3 files changed

+221
-79
lines changed

.env.example

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
SERVER_NAME=evolution
12
SERVER_TYPE=http
23
SERVER_PORT=8080
34
# Server URL - Set your application url
@@ -96,6 +97,35 @@ SQS_SECRET_ACCESS_KEY=
9697
SQS_ACCOUNT_ID=
9798
SQS_REGION=
9899

100+
SQS_GLOBAL_ENABLED=false
101+
SQS_GLOBAL_APPLICATION_STARTUP=false
102+
SQS_GLOBAL_CALL=false
103+
SQS_GLOBAL_CHATS_DELETE=false
104+
SQS_GLOBAL_CHATS_SET=false
105+
SQS_GLOBAL_CHATS_UPDATE=false
106+
SQS_GLOBAL_CHATS_UPSERT=false
107+
SQS_GLOBAL_CONNECTION_UPDATE=false
108+
SQS_GLOBAL_CONTACTS_SET=false
109+
SQS_GLOBAL_CONTACTS_UPDATE=false
110+
SQS_GLOBAL_CONTACTS_UPSERT=false
111+
SQS_GLOBAL_GROUP_PARTICIPANTS_UPDATE=false
112+
SQS_GLOBAL_GROUP_UPDATE=false
113+
SQS_GLOBAL_GROUPS_UPSERT=false
114+
SQS_GLOBAL_LABELS_ASSOCIATION=false
115+
SQS_GLOBAL_LABELS_EDIT=false
116+
SQS_GLOBAL_LOGOUT_INSTANCE=false
117+
SQS_GLOBAL_MESSAGES_DELETE=false
118+
SQS_GLOBAL_MESSAGES_EDITED=false
119+
SQS_GLOBAL_MESSAGES_SET=false
120+
SQS_GLOBAL_MESSAGES_UPDATE=false
121+
SQS_GLOBAL_MESSAGES_UPSERT=false
122+
SQS_GLOBAL_PRESENCE_UPDATE=false
123+
SQS_GLOBAL_QRCODE_UPDATED=false
124+
SQS_GLOBAL_REMOVE_INSTANCE=false
125+
SQS_GLOBAL_SEND_MESSAGE=false
126+
SQS_GLOBAL_TYPEBOT_CHANGE_STATUS=false
127+
SQS_GLOBAL_TYPEBOT_START=false
128+
99129
# Websocket - Environment variables
100130
WEBSOCKET_ENABLED=false
101131
WEBSOCKET_GLOBAL_EVENTS=false
Lines changed: 125 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1+
import * as s3Service from '@api/integrations/storage/s3/libs/minio.server';
12
import { PrismaRepository } from '@api/repository/repository.service';
23
import { WAMonitoringService } from '@api/services/monitor.service';
34
import { CreateQueueCommand, DeleteQueueCommand, ListQueuesCommand, SQS } from '@aws-sdk/client-sqs';
4-
import { configService, Log, Sqs } from '@config/env.config';
5+
import { configService, HttpServer, Log, S3, Sqs } from '@config/env.config';
56
import { Logger } from '@config/logger.config';
67

78
import { EmitData, EventController, EventControllerInterface } from '../event.controller';
@@ -15,27 +16,29 @@ export class SqsController extends EventController implements EventControllerInt
1516
super(prismaRepository, waMonitor, configService.get<Sqs>('SQS')?.ENABLED, 'sqs');
1617
}
1718

18-
public init(): void {
19+
public async init(): Promise<void> {
1920
if (!this.status) {
2021
return;
2122
}
2223

23-
new Promise<void>((resolve) => {
24-
const awsConfig = configService.get<Sqs>('SQS');
24+
const awsConfig = configService.get<Sqs>('SQS');
2525

26-
this.sqs = new SQS({
27-
credentials: {
28-
accessKeyId: awsConfig.ACCESS_KEY_ID,
29-
secretAccessKey: awsConfig.SECRET_ACCESS_KEY,
30-
},
26+
this.sqs = new SQS({
27+
credentials: {
28+
accessKeyId: awsConfig.ACCESS_KEY_ID,
29+
secretAccessKey: awsConfig.SECRET_ACCESS_KEY,
30+
},
3131

32-
region: awsConfig.REGION,
33-
});
32+
region: awsConfig.REGION,
33+
});
3434

35-
this.logger.info('SQS initialized');
35+
this.logger.info('SQS initialized');
3636

37-
resolve();
38-
});
37+
const sqsConfig = configService.get<Sqs>('SQS');
38+
if (this.sqs && sqsConfig.GLOBAL_ENABLED) {
39+
const sqsEvents = Object.keys(sqsConfig.EVENTS).filter((e) => sqsConfig.EVENTS[e]);
40+
await this.saveQueues(sqsConfig.GLOBAL_PREFIX_NAME, sqsEvents, true);
41+
}
3942
}
4043

4144
private set channel(sqs: SQS) {
@@ -47,7 +50,7 @@ export class SqsController extends EventController implements EventControllerInt
4750
}
4851

4952
override async set(instanceName: string, data: EventDto): Promise<any> {
50-
if (!this.status) {
53+
if (!this.status || configService.get<Sqs>('SQS').GLOBAL_ENABLED) {
5154
return;
5255
}
5356

@@ -75,6 +78,7 @@ export class SqsController extends EventController implements EventControllerInt
7578
instanceId: this.monitor.waInstances[instanceName].instanceId,
7679
},
7780
};
81+
7882
console.log('*** payload: ', payload);
7983
return this.prisma[this.name].upsert(payload);
8084
}
@@ -98,66 +102,104 @@ export class SqsController extends EventController implements EventControllerInt
98102
return;
99103
}
100104

101-
const instanceSqs = await this.get(instanceName);
102-
const sqsLocal = instanceSqs?.events;
103-
const we = event.replace(/[.-]/gm, '_').toUpperCase();
104-
105-
if (instanceSqs?.enabled) {
106-
if (this.sqs) {
107-
if (Array.isArray(sqsLocal) && sqsLocal.includes(we)) {
108-
const eventFormatted = `${event.replace('.', '_').toLowerCase()}`;
109-
const queueName = `${instanceName}_${eventFormatted}.fifo`;
110-
const sqsConfig = configService.get<Sqs>('SQS');
111-
const sqsUrl = `https://sqs.${sqsConfig.REGION}.amazonaws.com/${sqsConfig.ACCOUNT_ID}/${queueName}`;
112-
113-
const message = {
114-
event,
115-
instance: instanceName,
116-
data,
117-
server_url: serverUrl,
118-
date_time: dateTime,
119-
sender,
120-
apikey: apiKey,
121-
};
122-
123-
const params = {
124-
MessageBody: JSON.stringify(message),
125-
MessageGroupId: 'evolution',
126-
MessageDeduplicationId: `${instanceName}_${eventFormatted}_${Date.now()}`,
127-
QueueUrl: sqsUrl,
128-
};
129-
130-
this.sqs.sendMessage(params, (err) => {
131-
if (err) {
132-
this.logger.error({
133-
local: `${origin}.sendData-SQS`,
134-
message: err?.message,
135-
hostName: err?.hostname,
136-
code: err?.code,
137-
stack: err?.stack,
138-
name: err?.name,
139-
url: queueName,
140-
server_url: serverUrl,
141-
});
142-
} else {
143-
if (configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS')) {
144-
const logData = {
145-
local: `${origin}.sendData-SQS`,
146-
...message,
147-
};
148-
149-
this.logger.log(logData);
150-
}
151-
}
105+
if (this.sqs) {
106+
const sqsConfig = configService.get<Sqs>('SQS');
107+
108+
const we = event.replace(/[.-]/gm, '_').toUpperCase();
109+
110+
let sqsEvents = [];
111+
if (sqsConfig.GLOBAL_ENABLED) {
112+
sqsEvents = Object.keys(sqsConfig.EVENTS).filter((e) => sqsConfig.EVENTS[e]);
113+
} else {
114+
const instanceSqs = await this.get(instanceName);
115+
if (instanceSqs?.enabled && Array.isArray(instanceSqs?.events)) {
116+
sqsEvents = instanceSqs?.events;
117+
}
118+
}
119+
120+
if (Array.isArray(sqsEvents) && sqsEvents.includes(we)) {
121+
const eventFormatted = `${event.replace('.', '_').toLowerCase()}`;
122+
const prefixName = sqsConfig.GLOBAL_ENABLED ? sqsConfig.GLOBAL_PREFIX_NAME : instanceName;
123+
const queueName = `${prefixName}_${eventFormatted}.fifo`;
124+
125+
const sqsUrl = `https://sqs.${sqsConfig.REGION}.amazonaws.com/${sqsConfig.ACCOUNT_ID}/${queueName}`;
126+
127+
const message = {
128+
event,
129+
instance: instanceName,
130+
dataType: 'json',
131+
data,
132+
server: configService.get<HttpServer>('SERVER').NAME,
133+
server_url: serverUrl,
134+
date_time: dateTime,
135+
sender,
136+
apikey: apiKey,
137+
};
138+
139+
const jsonStr = JSON.stringify(message);
140+
const size = Buffer.byteLength(jsonStr, 'utf8');
141+
if (size > sqsConfig.MAX_PAYLOAD_SIZE) {
142+
if (!configService.get<S3>('S3').ENABLE) {
143+
this.logger.error(
144+
`${instanceName} - ${eventFormatted} - SQS ignored: payload (${size} bytes) exceeds SQS size limit (${sqsConfig.MAX_PAYLOAD_SIZE} bytes) and S3 storage is not enabled.`,
145+
);
146+
return;
147+
}
148+
149+
const buffer = Buffer.from(jsonStr, 'utf8');
150+
const fullName = `messages/${instanceName}_${eventFormatted}_${Date.now()}.json`;
151+
152+
await s3Service.uploadFile(fullName, buffer, size, {
153+
'Content-Type': 'application/json',
154+
'Cache-Control': 'no-store',
152155
});
156+
157+
const fileUrl = await s3Service.getObjectUrl(fullName);
158+
159+
message.data = { fileUrl };
160+
message.dataType = 's3';
153161
}
162+
163+
const isGlobalEnabled = configService.get<Sqs>('SQS').GLOBAL_ENABLED;
164+
const params = {
165+
MessageBody: JSON.stringify(message),
166+
MessageGroupId: 'evolution',
167+
QueueUrl: sqsUrl,
168+
...(!isGlobalEnabled && {
169+
MessageDeduplicationId: `${instanceName}_${eventFormatted}_${Date.now()}`,
170+
}),
171+
};
172+
173+
this.sqs.sendMessage(params, (err) => {
174+
if (err) {
175+
this.logger.error({
176+
local: `${origin}.sendData-SQS`,
177+
params: JSON.stringify(message),
178+
sqsUrl: sqsUrl,
179+
message: err?.message,
180+
hostName: err?.hostname,
181+
code: err?.code,
182+
stack: err?.stack,
183+
name: err?.name,
184+
url: queueName,
185+
server_url: serverUrl,
186+
});
187+
} else if (configService.get<Log>('LOG').LEVEL.includes('WEBHOOKS')) {
188+
const logData = {
189+
local: `${origin}.sendData-SQS`,
190+
...message,
191+
};
192+
193+
this.logger.log(logData);
194+
}
195+
});
154196
}
155197
}
156198
}
157199

158-
private async saveQueues(instanceName: string, events: string[], enable: boolean) {
200+
private async saveQueues(prefixName: string, events: string[], enable: boolean) {
159201
if (enable) {
160-
const eventsFinded = await this.listQueuesByInstance(instanceName);
202+
const eventsFinded = await this.listQueues(prefixName);
161203
console.log('eventsFinded', eventsFinded);
162204

163205
for (const event of events) {
@@ -168,15 +210,17 @@ export class SqsController extends EventController implements EventControllerInt
168210
continue;
169211
}
170212

171-
const queueName = `${instanceName}_${normalizedEvent}.fifo`;
172-
213+
const queueName = `${prefixName}_${normalizedEvent}.fifo`;
173214
try {
215+
const isGlobalEnabled = configService.get<Sqs>('SQS').GLOBAL_ENABLED;
174216
const createCommand = new CreateQueueCommand({
175217
QueueName: queueName,
176218
Attributes: {
177219
FifoQueue: 'true',
220+
...(isGlobalEnabled && { ContentBasedDeduplication: 'true' }),
178221
},
179222
});
223+
180224
const data = await this.sqs.send(createCommand);
181225
this.logger.info(`Queue ${queueName} criada: ${data.QueueUrl}`);
182226
} catch (err: any) {
@@ -186,12 +230,14 @@ export class SqsController extends EventController implements EventControllerInt
186230
}
187231
}
188232

189-
private async listQueuesByInstance(instanceName: string) {
233+
private async listQueues(prefixName: string) {
190234
let existingQueues: string[] = [];
235+
191236
try {
192237
const listCommand = new ListQueuesCommand({
193-
QueueNamePrefix: `${instanceName}_`,
238+
QueueNamePrefix: `${prefixName}_`,
194239
});
240+
195241
const listData = await this.sqs.send(listCommand);
196242
if (listData.QueueUrls && listData.QueueUrls.length > 0) {
197243
// Extrai o nome da fila a partir da URL
@@ -201,32 +247,32 @@ export class SqsController extends EventController implements EventControllerInt
201247
});
202248
}
203249
} catch (error: any) {
204-
this.logger.error(`Erro ao listar filas para a instância ${instanceName}: ${error.message}`);
250+
this.logger.error(`Erro ao listar filas para ${prefixName}: ${error.message}`);
205251
return;
206252
}
207253

208254
// Mapeia os eventos já existentes nas filas: remove o prefixo e o sufixo ".fifo"
209255
return existingQueues
210256
.map((queueName) => {
211257
// Espera-se que o nome seja `${instanceName}_${event}.fifo`
212-
if (queueName.startsWith(`${instanceName}_`) && queueName.endsWith('.fifo')) {
213-
return queueName.substring(instanceName.length + 1, queueName.length - 5).toLowerCase();
258+
if (queueName.startsWith(`${prefixName}_`) && queueName.endsWith('.fifo')) {
259+
return queueName.substring(prefixName.length + 1, queueName.length - 5).toLowerCase();
214260
}
215261
return '';
216262
})
217263
.filter((event) => event !== '');
218264
}
219265

220266
// Para uma futura feature de exclusão forçada das queues
221-
private async removeQueuesByInstance(instanceName: string) {
267+
private async removeQueuesByInstance(prefixName: string) {
222268
try {
223269
const listCommand = new ListQueuesCommand({
224-
QueueNamePrefix: `${instanceName}_`,
270+
QueueNamePrefix: `${prefixName}_`,
225271
});
226272
const listData = await this.sqs.send(listCommand);
227273

228274
if (!listData.QueueUrls || listData.QueueUrls.length === 0) {
229-
this.logger.info(`No queues found for instance ${instanceName}`);
275+
this.logger.info(`No queues found for ${prefixName}`);
230276
return;
231277
}
232278

@@ -240,7 +286,7 @@ export class SqsController extends EventController implements EventControllerInt
240286
}
241287
}
242288
} catch (err: any) {
243-
this.logger.error(`Error listing queues for instance ${instanceName}: ${err.message}`);
289+
this.logger.error(`Error listing queues for ${prefixName}: ${err.message}`);
244290
}
245291
}
246292
}

0 commit comments

Comments
 (0)