Skip to content

Commit e241cf4

Browse files
committed
feat: Sync lost messages on chatwoot
Runs the sync method every 30min
1 parent 4ca141b commit e241cf4

File tree

7 files changed

+129
-27
lines changed

7 files changed

+129
-27
lines changed

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
"mime": "^3.0.0",
8282
"minio": "^8.0.1",
8383
"node-cache": "^5.1.2",
84+
"node-cron": "^3.0.3",
8485
"node-windows": "^1.0.0-beta.8",
8586
"openai": "^4.52.7",
8687
"parse-bmfont-xml": "^1.1.4",
@@ -106,6 +107,7 @@
106107
"@types/json-schema": "^7.0.15",
107108
"@types/mime": "3.0.0",
108109
"@types/node": "^18.15.11",
110+
"@types/node-cron": "^3.0.11",
109111
"@types/node-windows": "^0.1.2",
110112
"@types/qrcode": "^1.5.0",
111113
"@types/qrcode-terminal": "^0.12.0",

src/api/integrations/channel/whatsapp/whatsapp.baileys.service.ts

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ import { readFileSync } from 'fs';
121121
import Long from 'long';
122122
import mime from 'mime';
123123
import NodeCache from 'node-cache';
124+
import cron from 'node-cron';
124125
import { release } from 'os';
125126
import { join } from 'path';
126127
import P from 'pino';
@@ -367,7 +368,12 @@ export class BaileysStartupService extends ChannelStartupService {
367368

368369
if (connection === 'open') {
369370
this.instance.wuid = this.client.user.id.replace(/:\d+/, '');
370-
this.instance.profilePictureUrl = (await this.profilePicture(this.instance.wuid)).profilePictureUrl;
371+
try {
372+
const profilePic = await this.profilePicture(this.instance.wuid);
373+
this.instance.profilePictureUrl = profilePic.profilePictureUrl;
374+
} catch (error) {
375+
this.instance.profilePictureUrl = null;
376+
}
371377
const formattedWuid = this.instance.wuid.split('@')[0].padEnd(30, ' ');
372378
const formattedName = this.instance.name;
373379
this.logger.info(
@@ -402,6 +408,7 @@ export class BaileysStartupService extends ChannelStartupService {
402408
status: 'open',
403409
},
404410
);
411+
this.syncChatwootLostMessages();
405412
}
406413
}
407414
}
@@ -3638,14 +3645,15 @@ export class BaileysStartupService extends ChannelStartupService {
36383645
}
36393646

36403647
private prepareMessage(message: proto.IWebMessageInfo): any {
3641-
const contentMsg = message?.message[getContentType(message.message)] as any;
3648+
const contentType = getContentType(message.message);
3649+
const contentMsg = message?.message[contentType] as any;
36423650

36433651
const messageRaw = {
36443652
key: message.key,
36453653
pushName: message.pushName,
36463654
message: { ...message.message },
36473655
contextInfo: contentMsg?.contextInfo,
3648-
messageType: getContentType(message.message) || 'unknown',
3656+
messageType: contentType || 'unknown',
36493657
messageTimestamp: message.messageTimestamp as number,
36503658
instanceId: this.instanceId,
36513659
source: getDevice(message.key.id),
@@ -3659,4 +3667,17 @@ export class BaileysStartupService extends ChannelStartupService {
36593667

36603668
return messageRaw;
36613669
}
3670+
3671+
private async syncChatwootLostMessages() {
3672+
if (this.configService.get<Chatwoot>('CHATWOOT').ENABLED && this.localChatwoot?.enabled) {
3673+
const chatwootConfig = await this.findChatwoot();
3674+
const prepare = (message: any) => this.prepareMessage(message);
3675+
this.chatwootService.syncLostMessages({ instanceName: this.instance.name }, chatwootConfig, prepare);
3676+
3677+
const task = cron.schedule('0,30 * * * *', async () => {
3678+
this.chatwootService.syncLostMessages({ instanceName: this.instance.name }, chatwootConfig, prepare);
3679+
});
3680+
task.start();
3681+
}
3682+
}
36623683
}

src/api/integrations/chatbot/chatwoot/controllers/chatwoot.controller.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ export class ChatwootController {
5454
return response;
5555
}
5656

57-
public async findChatwoot(instance: InstanceDto) {
57+
public async findChatwoot(instance: InstanceDto): Promise<ChatwootDto & { webhook_url: string }> {
5858
if (!this.configService.get<Chatwoot>('CHATWOOT').ENABLED) throw new BadRequestException('Chatwoot is disabled');
5959

6060
const result = await this.chatwootService.find(instance);

src/api/integrations/chatbot/chatwoot/services/chatwoot.service.ts

Lines changed: 71 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { PrismaRepository } from '@api/repository/repository.service';
77
import { CacheService } from '@api/services/cache.service';
88
import { WAMonitoringService } from '@api/services/monitor.service';
99
import { Events } from '@api/types/wa.types';
10-
import { Chatwoot, ConfigService, HttpServer } from '@config/env.config';
10+
import { Chatwoot, ConfigService, Database, HttpServer } from '@config/env.config';
1111
import { Logger } from '@config/logger.config';
1212
import ChatwootClient, {
1313
ChatwootAPIConfig,
@@ -24,6 +24,7 @@ import i18next from '@utils/i18n';
2424
import { sendTelemetry } from '@utils/sendTelemetry';
2525
import axios from 'axios';
2626
import { proto } from 'baileys';
27+
import dayjs from 'dayjs';
2728
import FormData from 'form-data';
2829
import Jimp from 'jimp';
2930
import Long from 'long';
@@ -53,7 +54,7 @@ export class ChatwootService {
5354

5455
private pgClient = postgresClient.getChatwootConnection();
5556

56-
private async getProvider(instance: InstanceDto) {
57+
private async getProvider(instance: InstanceDto): Promise<ChatwootModel | null> {
5758
const cacheKey = `${instance.instanceName}:getProvider`;
5859
if (await this.cache.has(cacheKey)) {
5960
const provider = (await this.cache.get(cacheKey)) as ChatwootModel;
@@ -715,7 +716,7 @@ export class ChatwootService {
715716
}
716717
}
717718

718-
public async getInbox(instance: InstanceDto) {
719+
public async getInbox(instance: InstanceDto): Promise<inbox | null> {
719720
const cacheKey = `${instance.instanceName}:getInbox`;
720721
if (await this.cache.has(cacheKey)) {
721722
return (await this.cache.get(cacheKey)) as inbox;
@@ -839,12 +840,6 @@ export class ChatwootService {
839840
return null;
840841
}
841842

842-
if (!this.configService.get<Chatwoot>('CHATWOOT').BOT_CONTACT) {
843-
this.logger.log('Chatwoot bot contact is disabled');
844-
845-
return true;
846-
}
847-
848843
const contact = await this.findContact(instance, '123456');
849844

850845
if (!contact) {
@@ -1186,10 +1181,10 @@ export class ChatwootService {
11861181

11871182
const cwBotContact = this.configService.get<Chatwoot>('CHATWOOT').BOT_CONTACT;
11881183

1189-
if (cwBotContact && chatId === '123456' && body.message_type === 'outgoing') {
1184+
if (chatId === '123456' && body.message_type === 'outgoing') {
11901185
const command = messageReceived.replace('/', '');
11911186

1192-
if (command.includes('init') || command.includes('iniciar')) {
1187+
if (cwBotContact && (command.includes('init') || command.includes('iniciar'))) {
11931188
const state = waInstance?.connectionStatus?.state;
11941189

11951190
if (state !== 'open') {
@@ -1242,7 +1237,7 @@ export class ChatwootService {
12421237
}
12431238
}
12441239

1245-
if (command === 'disconnect' || command === 'desconectar') {
1240+
if (cwBotContact && (command === 'disconnect' || command === 'desconectar')) {
12461241
const msgLogout = i18next.t('cw.inbox.disconnect', {
12471242
inboxName: body.inbox.name,
12481243
});
@@ -1532,7 +1527,7 @@ export class ChatwootService {
15321527
'audioMessage',
15331528
'videoMessage',
15341529
'stickerMessage',
1535-
'viewOnceMessageV2'
1530+
'viewOnceMessageV2',
15361531
];
15371532

15381533
const messageKeys = Object.keys(message);
@@ -1586,8 +1581,10 @@ export class ChatwootService {
15861581
liveLocationMessage: msg.liveLocationMessage,
15871582
listMessage: msg.listMessage,
15881583
listResponseMessage: msg.listResponseMessage,
1589-
viewOnceMessageV2: msg?.message?.viewOnceMessageV2?.message?.imageMessage?.url || msg?.message?.viewOnceMessageV2?.message?.videoMessage?.url || msg?.message?.viewOnceMessageV2?.message?.audioMessage?.url,
1590-
1584+
viewOnceMessageV2:
1585+
msg?.message?.viewOnceMessageV2?.message?.imageMessage?.url ||
1586+
msg?.message?.viewOnceMessageV2?.message?.videoMessage?.url ||
1587+
msg?.message?.viewOnceMessageV2?.message?.audioMessage?.url,
15911588
};
15921589

15931590
return types;
@@ -2376,4 +2373,63 @@ export class ChatwootService {
23762373
this.logger.error(`Error on update avatar in recent conversations: ${error.toString()}`);
23772374
}
23782375
}
2376+
2377+
public async syncLostMessages(
2378+
instance: InstanceDto,
2379+
chatwootConfig: ChatwootDto,
2380+
prepareMessage: (message: any) => any,
2381+
) {
2382+
if (!this.isImportHistoryAvailable()) {
2383+
return;
2384+
}
2385+
if (!this.configService.get<Database>('DATABASE').SAVE_DATA.MESSAGE_UPDATE) {
2386+
return;
2387+
}
2388+
2389+
const inbox = await this.getInbox(instance);
2390+
2391+
const sqlMessages = `select * from messages m
2392+
where account_id = ${chatwootConfig.accountId}
2393+
and inbox_id = ${inbox.id}
2394+
and created_at >= now() - interval '6h'
2395+
order by created_at desc`;
2396+
2397+
const messagesData = (await this.pgClient.query(sqlMessages))?.rows;
2398+
const ids: string[] = messagesData
2399+
.filter((message) => !!message.source_id)
2400+
.map((message) => message.source_id.replace('WAID:', ''));
2401+
2402+
const savedMessages = await this.prismaRepository.message.findMany({
2403+
where: {
2404+
Instance: { name: instance.instanceName },
2405+
messageTimestamp: { gte: dayjs().subtract(6, 'hours').unix() },
2406+
AND: ids.map((id) => ({ key: { path: ['id'], not: id } })),
2407+
},
2408+
});
2409+
2410+
const filteredMessages = savedMessages.filter(
2411+
(msg: any) => !chatwootImport.isIgnorePhoneNumber(msg.key?.remoteJid),
2412+
);
2413+
const messagesRaw: any[] = [];
2414+
for (const m of filteredMessages) {
2415+
if (!m.message || !m.key || !m.messageTimestamp) {
2416+
continue;
2417+
}
2418+
2419+
if (Long.isLong(m?.messageTimestamp)) {
2420+
m.messageTimestamp = m.messageTimestamp?.toNumber();
2421+
}
2422+
2423+
messagesRaw.push(prepareMessage(m as any));
2424+
}
2425+
2426+
this.addHistoryMessages(
2427+
instance,
2428+
messagesRaw.filter((msg) => !chatwootImport.isIgnorePhoneNumber(msg.key?.remoteJid)),
2429+
);
2430+
2431+
await chatwootImport.importHistoryMessages(instance, this, inbox, this.provider);
2432+
const waInstance = this.waMonitor.waInstances[instance.instanceName];
2433+
waInstance.clearCacheChatwoot();
2434+
}
23792435
}

src/api/integrations/chatbot/chatwoot/utils/chatwoot-import-helper.ts

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ class ChatwootImport {
5050
const actualValue = this.historyMessages.has(instance.instanceName)
5151
? this.historyMessages.get(instance.instanceName)
5252
: [];
53-
this.historyMessages.set(instance.instanceName, actualValue.concat(messagesRaw));
53+
this.historyMessages.set(instance.instanceName, [...actualValue, ...messagesRaw]);
5454
}
5555

5656
public addHistoryContacts(instance: InstanceDto, contactsRaw: Contact[]) {
@@ -169,6 +169,24 @@ class ChatwootImport {
169169
}
170170
}
171171

172+
private async getExistingSourceIds(sourceIds: string[]): Promise<Set<string>> {
173+
const existingSourceIdsSet = new Set<string>();
174+
175+
if (sourceIds.length === 0) {
176+
return existingSourceIdsSet;
177+
}
178+
179+
const query = 'SELECT source_id FROM messages WHERE source_id = ANY($1)';
180+
const pgClient = postgresClient.getChatwootConnection();
181+
const result = await pgClient.query(query, [sourceIds]);
182+
183+
for (const row of result.rows) {
184+
existingSourceIdsSet.add(row.source_id);
185+
}
186+
187+
return existingSourceIdsSet;
188+
}
189+
172190
public async importHistoryMessages(
173191
instance: InstanceDto,
174192
chatwootService: ChatwootService,
@@ -185,7 +203,7 @@ class ChatwootImport {
185203

186204
let totalMessagesImported = 0;
187205

188-
const messagesOrdered = this.historyMessages.get(instance.instanceName) || [];
206+
let messagesOrdered = this.historyMessages.get(instance.instanceName) || [];
189207
if (messagesOrdered.length === 0) {
190208
return 0;
191209
}
@@ -216,6 +234,8 @@ class ChatwootImport {
216234
});
217235
});
218236

237+
const existingSourceIds = await this.getExistingSourceIds(messagesOrdered.map((message: any) => message.key.id));
238+
messagesOrdered = messagesOrdered.filter((message: any) => !existingSourceIds.has(message.key.id));
219239
// processing messages in batch
220240
const batchSize = 4000;
221241
let messagesChunk: Message[] = this.sliceIntoChunks(messagesOrdered, batchSize);
@@ -233,8 +253,8 @@ class ChatwootImport {
233253

234254
// inserting messages in chatwoot db
235255
let sqlInsertMsg = `INSERT INTO messages
236-
(content, account_id, inbox_id, conversation_id, message_type, private, content_type,
237-
sender_type, sender_id, created_at, updated_at) VALUES `;
256+
(content, processed_message_content, account_id, inbox_id, conversation_id, message_type, private, content_type,
257+
sender_type, sender_id, source_id, created_at, updated_at) VALUES `;
238258
const bindInsertMsg = [provider.accountId, inbox.id];
239259

240260
messagesByPhoneNumber.forEach((messages: any[], phoneNumber: string) => {
@@ -269,11 +289,14 @@ class ChatwootImport {
269289
bindInsertMsg.push(message.key.fromMe ? chatwootUser.user_id : fksChatwoot.contact_id);
270290
const bindSenderId = `$${bindInsertMsg.length}`;
271291

292+
bindInsertMsg.push('WAID:' + message.key.id);
293+
const bindSourceId = `$${bindInsertMsg.length}`;
294+
272295
bindInsertMsg.push(message.messageTimestamp as number);
273296
const bindmessageTimestamp = `$${bindInsertMsg.length}`;
274297

275-
sqlInsertMsg += `(${bindContent}, $1, $2, ${bindConversationId}, ${bindMessageType}, FALSE, 0,
276-
${bindSenderType},${bindSenderId}, to_timestamp(${bindmessageTimestamp}), to_timestamp(${bindmessageTimestamp})),`;
298+
sqlInsertMsg += `(${bindContent}, ${bindContent}, $1, $2, ${bindConversationId}, ${bindMessageType}, FALSE, 0,
299+
${bindSenderType},${bindSenderId},${bindSourceId}, to_timestamp(${bindmessageTimestamp}), to_timestamp(${bindmessageTimestamp})),`;
277300
});
278301
});
279302
if (bindInsertMsg.length > 2) {

src/api/services/channel.service.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ export class ChannelStartupService {
294294
this.clearCacheChatwoot();
295295
}
296296

297-
public async findChatwoot(): Promise<ChatwootDto> {
297+
public async findChatwoot(): Promise<ChatwootDto | null> {
298298
if (!this.configService.get<Chatwoot>('CHATWOOT').ENABLED) {
299299
return null;
300300
}

src/config/error.config.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ export function onUnexpectedError() {
1515
logger.error({
1616
origin,
1717
stderr: process.stderr.fd,
18-
error,
1918
});
19+
logger.error(error);
2020
});
2121
}

0 commit comments

Comments
 (0)