Skip to content

Commit 8789f9b

Browse files
Merge pull request #917 from judsonjuniorr/feat/sync-lost-messages-chatwoot
feat: Sync lost messages on chatwoot
2 parents d535bc4 + f35f8dd commit 8789f9b

File tree

7 files changed

+141
-27
lines changed

7 files changed

+141
-27
lines changed

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
"mime": "^3.0.0",
8282
"minio": "^8.0.1",
8383
"node-cache": "^5.1.2",
84+
"node-cron": "^3.0.3",
8485
"node-windows": "^1.0.0-beta.8",
8586
"openai": "^4.52.7",
8687
"parse-bmfont-xml": "^1.1.4",
@@ -106,6 +107,7 @@
106107
"@types/json-schema": "^7.0.15",
107108
"@types/mime": "3.0.0",
108109
"@types/node": "^18.15.11",
110+
"@types/node-cron": "^3.0.11",
109111
"@types/node-windows": "^0.1.2",
110112
"@types/qrcode": "^1.5.0",
111113
"@types/qrcode-terminal": "^0.12.0",

src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ import { readFileSync } from 'fs';
121121
import Long from 'long';
122122
import mime from 'mime';
123123
import NodeCache from 'node-cache';
124+
import cron from 'node-cron';
124125
import { release } from 'os';
125126
import { join } from 'path';
126127
import P from 'pino';
@@ -367,7 +368,12 @@ export class BaileysStartupService extends ChannelStartupService {
367368

368369
if (connection === 'open') {
369370
this.instance.wuid = this.client.user.id.replace(/:\d+/, '');
370-
this.instance.profilePictureUrl = (await this.profilePicture(this.instance.wuid)).profilePictureUrl;
371+
try {
372+
const profilePic = await this.profilePicture(this.instance.wuid);
373+
this.instance.profilePictureUrl = profilePic.profilePictureUrl;
374+
} catch (error) {
375+
this.instance.profilePictureUrl = null;
376+
}
371377
const formattedWuid = this.instance.wuid.split('@')[0].padEnd(30, ' ');
372378
const formattedName = this.instance.name;
373379
this.logger.info(
@@ -402,6 +408,7 @@ export class BaileysStartupService extends ChannelStartupService {
402408
status: 'open',
403409
},
404410
);
411+
this.syncChatwootLostMessages();
405412
}
406413
}
407414
}
@@ -3594,15 +3601,16 @@ export class BaileysStartupService extends ChannelStartupService {
35943601
}
35953602

35963603
private prepareMessage(message: proto.IWebMessageInfo): any {
3597-
const contentMsg = message?.message[getContentType(message.message)] as any;
3604+
const contentType = getContentType(message.message);
3605+
const contentMsg = message?.message[contentType] as any;
35983606

35993607
const messageRaw = {
36003608
key: message.key,
36013609
pushName: message.pushName,
36023610
status: message.status,
36033611
message: { ...message.message },
36043612
contextInfo: contentMsg?.contextInfo,
3605-
messageType: getContentType(message.message) || 'unknown',
3613+
messageType: contentType || 'unknown',
36063614
messageTimestamp: message.messageTimestamp as number,
36073615
instanceId: this.instanceId,
36083616
source: getDevice(message.key.id),
@@ -3622,4 +3630,17 @@ export class BaileysStartupService extends ChannelStartupService {
36223630

36233631
return messageRaw;
36243632
}
3633+
3634+
private async syncChatwootLostMessages() {
3635+
if (this.configService.get<Chatwoot>('CHATWOOT').ENABLED && this.localChatwoot?.enabled) {
3636+
const chatwootConfig = await this.findChatwoot();
3637+
const prepare = (message: any) => this.prepareMessage(message);
3638+
this.chatwootService.syncLostMessages({ instanceName: this.instance.name }, chatwootConfig, prepare);
3639+
3640+
const task = cron.schedule('0,30 * * * *', async () => {
3641+
this.chatwootService.syncLostMessages({ instanceName: this.instance.name }, chatwootConfig, prepare);
3642+
});
3643+
task.start();
3644+
}
3645+
}
36253646
}

src/api/integrations/chatbot/chatwoot/controllers/chatwoot.controller.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ export class ChatwootController {
5454
return response;
5555
}
5656

57-
public async findChatwoot(instance: InstanceDto) {
57+
public async findChatwoot(instance: InstanceDto): Promise<ChatwootDto & { webhook_url: string }> {
5858
if (!this.configService.get<Chatwoot>('CHATWOOT').ENABLED) throw new BadRequestException('Chatwoot is disabled');
5959

6060
const result = await this.chatwootService.find(instance);

src/api/integrations/chatbot/chatwoot/services/chatwoot.service.ts

Lines changed: 75 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { PrismaRepository } from '@api/repository/repository.service';
77
import { CacheService } from '@api/services/cache.service';
88
import { WAMonitoringService } from '@api/services/monitor.service';
99
import { Events } from '@api/types/wa.types';
10-
import { Chatwoot, ConfigService, HttpServer } from '@config/env.config';
10+
import { Chatwoot, ConfigService, Database, HttpServer } from '@config/env.config';
1111
import { Logger } from '@config/logger.config';
1212
import ChatwootClient, {
1313
ChatwootAPIConfig,
@@ -24,6 +24,7 @@ import i18next from '@utils/i18n';
2424
import { sendTelemetry } from '@utils/sendTelemetry';
2525
import axios from 'axios';
2626
import { proto } from 'baileys';
27+
import dayjs from 'dayjs';
2728
import FormData from 'form-data';
2829
import Jimp from 'jimp';
2930
import Long from 'long';
@@ -53,7 +54,7 @@ export class ChatwootService {
5354

5455
private pgClient = postgresClient.getChatwootConnection();
5556

56-
private async getProvider(instance: InstanceDto) {
57+
private async getProvider(instance: InstanceDto): Promise<ChatwootModel | null> {
5758
const cacheKey = `${instance.instanceName}:getProvider`;
5859
if (await this.cache.has(cacheKey)) {
5960
const provider = (await this.cache.get(cacheKey)) as ChatwootModel;
@@ -715,7 +716,7 @@ export class ChatwootService {
715716
}
716717
}
717718

718-
public async getInbox(instance: InstanceDto) {
719+
public async getInbox(instance: InstanceDto): Promise<inbox | null> {
719720
const cacheKey = `${instance.instanceName}:getInbox`;
720721
if (await this.cache.has(cacheKey)) {
721722
return (await this.cache.get(cacheKey)) as inbox;
@@ -839,12 +840,6 @@ export class ChatwootService {
839840
return null;
840841
}
841842

842-
if (!this.configService.get<Chatwoot>('CHATWOOT').BOT_CONTACT) {
843-
this.logger.log('Chatwoot bot contact is disabled');
844-
845-
return true;
846-
}
847-
848843
const contact = await this.findContact(instance, '123456');
849844

850845
if (!contact) {
@@ -1186,10 +1181,10 @@ export class ChatwootService {
11861181

11871182
const cwBotContact = this.configService.get<Chatwoot>('CHATWOOT').BOT_CONTACT;
11881183

1189-
if (cwBotContact && chatId === '123456' && body.message_type === 'outgoing') {
1184+
if (chatId === '123456' && body.message_type === 'outgoing') {
11901185
const command = messageReceived.replace('/', '');
11911186

1192-
if (command.includes('init') || command.includes('iniciar')) {
1187+
if (cwBotContact && (command.includes('init') || command.includes('iniciar'))) {
11931188
const state = waInstance?.connectionStatus?.state;
11941189

11951190
if (state !== 'open') {
@@ -1242,7 +1237,7 @@ export class ChatwootService {
12421237
}
12431238
}
12441239

1245-
if (command === 'disconnect' || command === 'desconectar') {
1240+
if (cwBotContact && (command === 'disconnect' || command === 'desconectar')) {
12461241
const msgLogout = i18next.t('cw.inbox.disconnect', {
12471242
inboxName: body.inbox.name,
12481243
});
@@ -1458,6 +1453,10 @@ export class ChatwootService {
14581453
chatwootIsRead: chatwootMessageIds.isRead,
14591454
},
14601455
});
1456+
1457+
if (this.isImportHistoryAvailable()) {
1458+
chatwootImport.updateMessageSourceID(chatwootMessageIds.messageId, key.id);
1459+
}
14611460
}
14621461

14631462
private async getMessageByKeyId(instance: InstanceDto, keyId: string): Promise<MessageModel> {
@@ -1532,7 +1531,7 @@ export class ChatwootService {
15321531
'audioMessage',
15331532
'videoMessage',
15341533
'stickerMessage',
1535-
'viewOnceMessageV2'
1534+
'viewOnceMessageV2',
15361535
];
15371536

15381537
const messageKeys = Object.keys(message);
@@ -1586,8 +1585,10 @@ export class ChatwootService {
15861585
liveLocationMessage: msg.liveLocationMessage,
15871586
listMessage: msg.listMessage,
15881587
listResponseMessage: msg.listResponseMessage,
1589-
viewOnceMessageV2: msg?.message?.viewOnceMessageV2?.message?.imageMessage?.url || msg?.message?.viewOnceMessageV2?.message?.videoMessage?.url || msg?.message?.viewOnceMessageV2?.message?.audioMessage?.url,
1590-
1588+
viewOnceMessageV2:
1589+
msg?.message?.viewOnceMessageV2?.message?.imageMessage?.url ||
1590+
msg?.message?.viewOnceMessageV2?.message?.videoMessage?.url ||
1591+
msg?.message?.viewOnceMessageV2?.message?.audioMessage?.url,
15911592
};
15921593

15931594
return types;
@@ -2376,4 +2377,63 @@ export class ChatwootService {
23762377
this.logger.error(`Error on update avatar in recent conversations: ${error.toString()}`);
23772378
}
23782379
}
2380+
2381+
public async syncLostMessages(
2382+
instance: InstanceDto,
2383+
chatwootConfig: ChatwootDto,
2384+
prepareMessage: (message: any) => any,
2385+
) {
2386+
if (!this.isImportHistoryAvailable()) {
2387+
return;
2388+
}
2389+
if (!this.configService.get<Database>('DATABASE').SAVE_DATA.MESSAGE_UPDATE) {
2390+
return;
2391+
}
2392+
2393+
const inbox = await this.getInbox(instance);
2394+
2395+
const sqlMessages = `select * from messages m
2396+
where account_id = ${chatwootConfig.accountId}
2397+
and inbox_id = ${inbox.id}
2398+
and created_at >= now() - interval '6h'
2399+
order by created_at desc`;
2400+
2401+
const messagesData = (await this.pgClient.query(sqlMessages))?.rows;
2402+
const ids: string[] = messagesData
2403+
.filter((message) => !!message.source_id)
2404+
.map((message) => message.source_id.replace('WAID:', ''));
2405+
2406+
const savedMessages = await this.prismaRepository.message.findMany({
2407+
where: {
2408+
Instance: { name: instance.instanceName },
2409+
messageTimestamp: { gte: dayjs().subtract(6, 'hours').unix() },
2410+
AND: ids.map((id) => ({ key: { path: ['id'], not: id } })),
2411+
},
2412+
});
2413+
2414+
const filteredMessages = savedMessages.filter(
2415+
(msg: any) => !chatwootImport.isIgnorePhoneNumber(msg.key?.remoteJid),
2416+
);
2417+
const messagesRaw: any[] = [];
2418+
for (const m of filteredMessages) {
2419+
if (!m.message || !m.key || !m.messageTimestamp) {
2420+
continue;
2421+
}
2422+
2423+
if (Long.isLong(m?.messageTimestamp)) {
2424+
m.messageTimestamp = m.messageTimestamp?.toNumber();
2425+
}
2426+
2427+
messagesRaw.push(prepareMessage(m as any));
2428+
}
2429+
2430+
this.addHistoryMessages(
2431+
instance,
2432+
messagesRaw.filter((msg) => !chatwootImport.isIgnorePhoneNumber(msg.key?.remoteJid)),
2433+
);
2434+
2435+
await chatwootImport.importHistoryMessages(instance, this, inbox, this.provider);
2436+
const waInstance = this.waMonitor.waInstances[instance.instanceName];
2437+
waInstance.clearCacheChatwoot();
2438+
}
23792439
}

src/api/integrations/chatbot/chatwoot/utils/chatwoot-import-helper.ts

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class ChatwootImport {
5050
const actualValue = this.historyMessages.has(instance.instanceName)
5151
? this.historyMessages.get(instance.instanceName)
5252
: [];
53-
this.historyMessages.set(instance.instanceName, actualValue.concat(messagesRaw));
53+
this.historyMessages.set(instance.instanceName, [...actualValue, ...messagesRaw]);
5454
}
5555

5656
public addHistoryContacts(instance: InstanceDto, contactsRaw: Contact[]) {
@@ -169,6 +169,24 @@ class ChatwootImport {
169169
}
170170
}
171171

172+
private async getExistingSourceIds(sourceIds: string[]): Promise<Set<string>> {
173+
const existingSourceIdsSet = new Set<string>();
174+
175+
if (sourceIds.length === 0) {
176+
return existingSourceIdsSet;
177+
}
178+
179+
const query = 'SELECT source_id FROM messages WHERE source_id = ANY($1)';
180+
const pgClient = postgresClient.getChatwootConnection();
181+
const result = await pgClient.query(query, [sourceIds]);
182+
183+
for (const row of result.rows) {
184+
existingSourceIdsSet.add(row.source_id);
185+
}
186+
187+
return existingSourceIdsSet;
188+
}
189+
172190
public async importHistoryMessages(
173191
instance: InstanceDto,
174192
chatwootService: ChatwootService,
@@ -185,7 +203,7 @@ class ChatwootImport {
185203

186204
let totalMessagesImported = 0;
187205

188-
const messagesOrdered = this.historyMessages.get(instance.instanceName) || [];
206+
let messagesOrdered = this.historyMessages.get(instance.instanceName) || [];
189207
if (messagesOrdered.length === 0) {
190208
return 0;
191209
}
@@ -216,6 +234,8 @@ class ChatwootImport {
216234
});
217235
});
218236

237+
const existingSourceIds = await this.getExistingSourceIds(messagesOrdered.map((message: any) => message.key.id));
238+
messagesOrdered = messagesOrdered.filter((message: any) => !existingSourceIds.has(message.key.id));
219239
// processing messages in batch
220240
const batchSize = 4000;
221241
let messagesChunk: Message[] = this.sliceIntoChunks(messagesOrdered, batchSize);
@@ -233,8 +253,8 @@ class ChatwootImport {
233253

234254
// inserting messages in chatwoot db
235255
let sqlInsertMsg = `INSERT INTO messages
236-
(content, account_id, inbox_id, conversation_id, message_type, private, content_type,
237-
sender_type, sender_id, created_at, updated_at) VALUES `;
256+
(content, processed_message_content, account_id, inbox_id, conversation_id, message_type, private, content_type,
257+
sender_type, sender_id, source_id, created_at, updated_at) VALUES `;
238258
const bindInsertMsg = [provider.accountId, inbox.id];
239259

240260
messagesByPhoneNumber.forEach((messages: any[], phoneNumber: string) => {
@@ -269,11 +289,14 @@ class ChatwootImport {
269289
bindInsertMsg.push(message.key.fromMe ? chatwootUser.user_id : fksChatwoot.contact_id);
270290
const bindSenderId = `$${bindInsertMsg.length}`;
271291

292+
bindInsertMsg.push('WAID:' + message.key.id);
293+
const bindSourceId = `$${bindInsertMsg.length}`;
294+
272295
bindInsertMsg.push(message.messageTimestamp as number);
273296
const bindmessageTimestamp = `$${bindInsertMsg.length}`;
274297

275-
sqlInsertMsg += `(${bindContent}, $1, $2, ${bindConversationId}, ${bindMessageType}, FALSE, 0,
276-
${bindSenderType},${bindSenderId}, to_timestamp(${bindmessageTimestamp}), to_timestamp(${bindmessageTimestamp})),`;
298+
sqlInsertMsg += `(${bindContent}, ${bindContent}, $1, $2, ${bindConversationId}, ${bindMessageType}, FALSE, 0,
299+
${bindSenderType},${bindSenderId},${bindSourceId}, to_timestamp(${bindmessageTimestamp}), to_timestamp(${bindmessageTimestamp})),`;
277300
});
278301
});
279302
if (bindInsertMsg.length > 2) {
@@ -519,6 +542,14 @@ class ChatwootImport {
519542
public isIgnorePhoneNumber(remoteJid: string) {
520543
return this.isGroup(remoteJid) || remoteJid === 'status@broadcast' || remoteJid === '0@s.whatsapp.net';
521544
}
545+
546+
public updateMessageSourceID(messageId: string | number, sourceId: string) {
547+
const pgClient = postgresClient.getChatwootConnection();
548+
549+
const sql = `UPDATE messages SET source_id = $1, status = 0, created_at = NOW(), updated_at = NOW() WHERE id = $2;`;
550+
551+
return pgClient.query(sql, [`WAID:${sourceId}`, messageId]);
552+
}
522553
}
523554

524555
export const chatwootImport = new ChatwootImport();

src/api/services/channel.service.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ export class ChannelStartupService {
294294
this.clearCacheChatwoot();
295295
}
296296

297-
public async findChatwoot(): Promise<ChatwootDto> {
297+
public async findChatwoot(): Promise<ChatwootDto | null> {
298298
if (!this.configService.get<Chatwoot>('CHATWOOT').ENABLED) {
299299
return null;
300300
}

src/config/error.config.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ export function onUnexpectedError() {
1515
logger.error({
1616
origin,
1717
stderr: process.stderr.fd,
18-
error,
1918
});
19+
logger.error(error);
2020
});
2121
}

0 commit comments

Comments
 (0)