Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
"qrcode": "^1.5.4",
"qrcode-terminal": "^0.12.0",
"redis": "^4.7.0",
"rxjs": "^7.8.2",
"sharp": "^0.34.2",
"socket.io": "^4.8.1",
"socket.io-client": "^4.8.1",
Expand Down
59 changes: 59 additions & 0 deletions src/api/integrations/channel/whatsapp/baileysMessage.processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { Logger } from '@config/logger.config';
import { BaileysEventMap, MessageUpsertType, proto } from 'baileys';
import { catchError, concatMap, delay, EMPTY, from, retryWhen, Subject, Subscription, take, tap } from 'rxjs';

type MessageUpsertPayload = BaileysEventMap['messages.upsert'];
type MountProps = {
onMessageReceive: (payload: MessageUpsertPayload, settings: any) => Promise<void>;
};

export class BaileysMessageProcessor {
private processorLogs = new Logger('BaileysMessageProcessor');
private subscription?: Subscription;

protected messageSubject = new Subject<{
messages: proto.IWebMessageInfo[];
type: MessageUpsertType;
requestId?: string;
settings: any;
}>();

mount({ onMessageReceive }: MountProps) {
this.subscription = this.messageSubject
.pipe(
tap(({ messages }) => {
this.processorLogs.log(`Processing batch of ${messages.length} messages`);
}),
concatMap(({ messages, type, requestId, settings }) =>
from(onMessageReceive({ messages, type, requestId }, settings)).pipe(
retryWhen((errors) =>
errors.pipe(
tap((error) => this.processorLogs.warn(`Retrying message batch due to error: ${error.message}`)),
delay(1000), // 1 segundo de delay
take(3), // Máximo 3 tentativas
),
),
),
),
catchError((error) => {
this.processorLogs.error(`Error processing message batch: ${error}`);
return EMPTY;
}),
)
.subscribe({
error: (error) => {
this.processorLogs.error(`Message stream error: ${error}`);
},
});
}

processMessage(payload: MessageUpsertPayload, settings: any) {
const { messages, type, requestId } = payload;
this.messageSubject.next({ messages, type, requestId, settings });
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (performance): No backpressure or queue size limit is enforced on messageSubject.

This could cause memory issues if many messages are processed rapidly. Consider implementing a queue size limit or backpressure mechanism.

Suggested implementation:

  // Limit the queue to 100 messages (adjust as needed)
  protected readonly MESSAGE_QUEUE_LIMIT = 100;
  protected messageQueue: Array<{ messages: any; type: any; requestId: any; settings: any }> = [];
  protected messageSubject = new Subject<{
  processMessage(payload: MessageUpsertPayload, settings: any) {
    const { messages, type, requestId } = payload;
    if (this.messageQueue.length >= this.MESSAGE_QUEUE_LIMIT) {
      // Drop the oldest message to make room for the new one (or log/drop new, as preferred)
      this.processorLogs.warn(
        `Message queue full (${this.MESSAGE_QUEUE_LIMIT}). Dropping oldest message.`
      );
      this.messageQueue.shift();
    }
    const message = { messages, type, requestId, settings };
    this.messageQueue.push(message);
    this.messageSubject.next(message);
  }

}

onDestroy() {
this.subscription?.unsubscribe();
this.messageSubject.complete();
Comment on lines +56 to +57
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (bug_risk): Unsubscribing and completing the Subject may cause issues if processMessage is called after onDestroy.

Calling processMessage after onDestroy will throw an error due to the completed Subject. Please guard processMessage or document its usage constraints.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ import sharp from 'sharp';
import { PassThrough, Readable } from 'stream';
import { v4 } from 'uuid';

import { BaileysMessageProcessor } from './baileysMessage.processor';
import { useVoiceCallsBaileys } from './voiceCalls/useVoiceCallsBaileys';

const groupMetadataCache = new CacheService(new CacheEngine(configService, 'groups').getEngine());
Expand Down Expand Up @@ -213,6 +214,8 @@ async function getVideoDuration(input: Buffer | string | Readable): Promise<numb
}

export class BaileysStartupService extends ChannelStartupService {
private messageProcessor = new BaileysMessageProcessor();

constructor(
public readonly configService: ConfigService,
public readonly eventEmitter: EventEmitter2,
Expand All @@ -224,6 +227,9 @@ export class BaileysStartupService extends ChannelStartupService {
) {
super(configService, eventEmitter, prismaRepository, chatwootCache);
this.instance.qrcode = { count: 0 };
this.messageProcessor.mount({
onMessageReceive: this.messageHandle['messages.upsert'].bind(this), // Bind the method to the current context
});

this.authStateProvider = new AuthStateProvider(this.providerFiles);
}
Expand All @@ -243,6 +249,7 @@ export class BaileysStartupService extends ChannelStartupService {
}

public async logoutInstance() {
this.messageProcessor.onDestroy();
await this.client?.logout('Log out instance: ' + this.instanceName);

this.client?.ws?.close();
Expand Down Expand Up @@ -1653,7 +1660,9 @@ export class BaileysStartupService extends ChannelStartupService {

if (events['messages.upsert']) {
const payload = events['messages.upsert'];
this.messageHandle['messages.upsert'](payload, settings);

this.messageProcessor.processMessage(payload, settings);
// this.messageHandle['messages.upsert'](payload, settings);
}

if (events['messages.update']) {
Expand Down