From 2b4cfc18771c19643ab192ff96232ac2a8db76db Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Tue, 15 Jul 2025 12:46:32 +0200 Subject: [PATCH 01/16] improve message logging --- packages/amqp/lib/AbstractAmqpConsumer.ts | 17 +++--- packages/amqp/lib/AbstractAmqpPublisher.ts | 6 -- .../consumers/AmqpPermissionConsumer.spec.ts | 26 +++++---- .../AmqpPermissionPublisher.spec.ts | 17 ++++-- .../lib/events/DomainEventEmitter.spec.ts | 2 +- .../core/lib/queues/AbstractQueueService.ts | 57 ++++++++++++++----- packages/core/lib/types/queueOptionsTypes.ts | 6 ++ .../lib/pubsub/AbstractPubSubConsumer.ts | 12 +++- .../lib/pubsub/AbstractPubSubPublisher.ts | 10 ---- packages/sns/lib/sns/AbstractSnsPublisher.ts | 6 -- packages/sns/lib/utils/snsInitter.ts | 2 +- ...lisher.multiStorePayloadOffloading.spec.ts | 2 +- ...missionPublisher.payloadOffloading.spec.ts | 2 +- packages/sns/test/utils/snsSubscriber.spec.ts | 2 +- packages/sqs/lib/sqs/AbstractSqsConsumer.ts | 18 +++--- packages/sqs/lib/sqs/AbstractSqsPublisher.ts | 6 -- .../consumers/SqsPermissionConsumer.spec.ts | 20 ++++--- ...lisher.multiStorePayloadOffloading.spec.ts | 2 +- ...missionPublisher.payloadOffloading.spec.ts | 2 +- 19 files changed, 124 insertions(+), 91 deletions(-) diff --git a/packages/amqp/lib/AbstractAmqpConsumer.ts b/packages/amqp/lib/AbstractAmqpConsumer.ts index 43192b26..5277ede1 100644 --- a/packages/amqp/lib/AbstractAmqpConsumer.ts +++ b/packages/amqp/lib/AbstractAmqpConsumer.ts @@ -6,13 +6,13 @@ import type { ParseMessageResult, PreHandlingOutputs, Prehandler, + ProcessedMessageMetadata, QueueConsumer, QueueConsumerOptions, TransactionObservabilityManager, } from '@message-queue-toolkit/core' import { HandlerContainer, isMessageError, parseMessage } from '@message-queue-toolkit/core' import type { ChannelModel, Message } from 'amqplib' - import type { AMQPConsumerDependencies, AMQPQueueCreationConfig, @@ -157,10 +157,6 @@ export abstract class AbstractAmqpConsumer< // @ts-expect-error const uniqueTransactionKey = parsedMessage[this.messageIdField] this.transactionObservabilityManager?.start(transactionSpanId, uniqueTransactionKey) - if (this.logMessages) { - const resolvedLogMessage = this.resolveMessageLog(parsedMessage, messageType) - this.logMessage(resolvedLogMessage) - } this.internalProcessMessage(parsedMessage, messageType) .then((result) => { if (result.result === 'success') { @@ -267,9 +263,14 @@ export abstract class AbstractAmqpConsumer< return this._messageSchemaContainer.resolveSchema(message) } - protected override resolveMessageLog(message: MessagePayloadType, messageType: string): unknown { - const handler = this.handlerContainer.resolveHandler(messageType) - return handler.messageLogFormatter(message) + protected override resolveMessageLog( + processedMessageMetadata: ProcessedMessageMetadata, + ): unknown | null { + if (!processedMessageMetadata.message || !processedMessageMetadata.messageType) { + return null + } + const handler = this.handlerContainer.resolveHandler(processedMessageMetadata.messageType) + return handler.messageLogFormatter(processedMessageMetadata.message) } // eslint-disable-next-line max-params diff --git a/packages/amqp/lib/AbstractAmqpPublisher.ts b/packages/amqp/lib/AbstractAmqpPublisher.ts index 6366d5ae..e9b7aefd 100644 --- a/packages/amqp/lib/AbstractAmqpPublisher.ts +++ b/packages/amqp/lib/AbstractAmqpPublisher.ts @@ -88,12 +88,6 @@ export abstract class AbstractAmqpPublisher< return } - if (this.logMessages) { - const messageType = this.resolveMessageTypeFromMessage(message) ?? 'unknown' - const resolvedLogMessage = this.resolveMessageLog(message, messageType) - this.logMessage(resolvedLogMessage) - } - message = this.updateInternalProperties(message) try { diff --git a/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts b/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts index a15a2056..97e5dae0 100644 --- a/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts +++ b/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts @@ -95,22 +95,26 @@ describe('AmqpPermissionConsumer', () => { await newConsumer.close() - expect(logger.loggedMessages.length).toBe(6) + expect(logger.loggedMessages.length).toBe(4) expect(logger.loggedMessages).toMatchObject([ 'Propagating new connection across 0 receivers', - { - id: '1', - messageType: 'add', - }, 'timestamp not defined, adding it automatically', - expect.any(Object), { - id: '1', - messageType: 'add', - timestamp: expect.any(String), + processedMessageMetadata: expect.objectContaining({ + processingResult: { status: 'published' }, + }), }, { - processedMessageMetadata: expect.any(String), + processedMessageMetadata: expect.objectContaining({ + messageId: '1', + messageType: 'add', + processingResult: { status: 'consumed' }, + }), + message: { + id: '1', + messageType: 'add', + timestamp: expect.any(String), + }, }, ]) }) @@ -405,7 +409,7 @@ describe('AmqpPermissionConsumer', () => { await waitAndRetry(() => errorReporterSpy.mock.calls.length > 0) expect(errorReporterSpy.mock.calls).toHaveLength(1) - expect(errorReporterSpy.mock.calls[0]![0]!.error).toMatchObject({ + expect(errorReporterSpy.mock.calls[0]?.[0]?.error).toMatchObject({ message: 'Unsupported message type: bad', }) }) diff --git a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts index a7db562f..3822aa82 100644 --- a/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts +++ b/packages/amqp/test/publishers/AmqpPermissionPublisher.spec.ts @@ -47,13 +47,20 @@ describe('PermissionPublisher', () => { publisher.publish(message) await waitAndRetry(() => { - return logger.loggedMessages.length === 2 + return logger.loggedMessages.length === 3 }) - expect(logger.loggedMessages[1]).toEqual({ - id: '1', - messageType: 'add', - }) + expect(logger.loggedMessages).toMatchObject([ + 'Propagating new connection across 0 receivers', + 'timestamp not defined, adding it automatically', + { + processedMessageMetadata: expect.objectContaining({ + messageId: '1', + messageType: 'add', + processingResult: { status: 'published' }, + }), + }, + ]) }) }) diff --git a/packages/core/lib/events/DomainEventEmitter.spec.ts b/packages/core/lib/events/DomainEventEmitter.spec.ts index cb01884a..7272d8d4 100644 --- a/packages/core/lib/events/DomainEventEmitter.spec.ts +++ b/packages/core/lib/events/DomainEventEmitter.spec.ts @@ -204,7 +204,7 @@ describe('DomainEventEmitter', () => { expect(emitResult.message).toEqual({ id: expect.any(String), metadata: { - correlationId: createdEventPayload.metadata!.correlationId!, + correlationId: createdEventPayload.metadata.correlationId!, originatedFrom: 'service', producedBy: undefined, schemaVersion: '1', diff --git a/packages/core/lib/queues/AbstractQueueService.ts b/packages/core/lib/queues/AbstractQueueService.ts index 4a9e6a39..5e85fe8b 100644 --- a/packages/core/lib/queues/AbstractQueueService.ts +++ b/packages/core/lib/queues/AbstractQueueService.ts @@ -114,6 +114,10 @@ export abstract class AbstractQueueService< * Used to know the store-based message deduplication options */ protected readonly messageDeduplicationOptionsField: string + /** + * Used to know where metadata is stored - for debug logging purposes only + */ + protected readonly messageMetadataField: string protected readonly errorReporter: ErrorReporter public readonly logger: CommonLogger protected readonly messageIdField: string @@ -157,6 +161,7 @@ export abstract class AbstractQueueService< this.messageDeduplicationIdField = options.messageDeduplicationIdField ?? 'deduplicationId' this.messageDeduplicationOptionsField = options.messageDeduplicationOptionsField ?? 'deduplicationOptions' + this.messageMetadataField = options.messageMetadataField ?? 'metadata' this.creationConfig = options.creationConfig this.locatorConfig = options.locatorConfig this.deletionConfig = options.deletionConfig @@ -239,15 +244,36 @@ export abstract class AbstractQueueService< /** * Format message for logging */ - protected resolveMessageLog(message: MessagePayloadSchemas, _messageType: string): unknown { - return message + protected resolveMessageLog( + _processedMessageMetadata: ProcessedMessageMetadata, + ): unknown | null { + return null } - /** - * Log preformatted and potentially presanitized message payload - */ - protected logMessage(messageLogEntry: unknown) { - this.logger.debug(messageLogEntry) + protected logMessageProcessed( + processedMessageMetadata: ProcessedMessageMetadata, + ) { + const processedMessageMetadataLog = { + processingResult: processedMessageMetadata.processingResult, + messageId: processedMessageMetadata.messageId, + messageType: processedMessageMetadata.messageType, + queueName: processedMessageMetadata.queueName, + messageTimestamp: processedMessageMetadata.messageTimestamp, + messageDeduplicationId: processedMessageMetadata.messageDeduplicationId, + messageProcessingStartTimestamp: processedMessageMetadata.messageProcessingStartTimestamp, + messageProcessingEndTimestamp: processedMessageMetadata.messageProcessingEndTimestamp, + messageMetadata: stringValueSerializer(processedMessageMetadata.messageMetadata), + } + + const resolvedMessageLog = this.resolveMessageLog(processedMessageMetadata) + + this.logger.debug( + { + processedMessageMetadata: processedMessageMetadataLog, + ...(resolvedMessageLog ? { message: resolvedMessageLog } : {}), + }, + `Finished processing message ${processedMessageMetadata.messageId}`, + ) } protected handleError(err: unknown, context?: Record) { @@ -284,8 +310,8 @@ export abstract class AbstractQueueService< messageType, ) - const debugLoggingEnabled = this.logMessages && this.logger.isLevelEnabled('debug') - if (!debugLoggingEnabled && !this.messageMetricsManager) return + const debugMessageLoggingEnabled = this.logMessages && this.logger.isLevelEnabled('debug') + if (!debugMessageLoggingEnabled && !this.messageMetricsManager) return const processedMessageMetadata = this.resolveProcessedMessageMetadata( message, @@ -295,11 +321,8 @@ export abstract class AbstractQueueService< params.queueName, messageId, ) - if (debugLoggingEnabled) { - this.logger.debug( - { processedMessageMetadata: stringValueSerializer(processedMessageMetadata) }, - `Finished processing message ${processedMessageMetadata.messageId}`, - ) + if (debugMessageLoggingEnabled) { + this.logMessageProcessed(processedMessageMetadata) } if (this.messageMetricsManager) { this.messageMetricsManager.registerProcessedMessage(processedMessageMetadata) @@ -324,6 +347,11 @@ export abstract class AbstractQueueService< ? // @ts-ignore message[this.messageDeduplicationId] : undefined + const messageMetadata = + message && this.messageMetadataField in message + ? // @ts-ignore + message[this.messageMetadataField] + : undefined return { processingResult, @@ -335,6 +363,7 @@ export abstract class AbstractQueueService< messageDeduplicationId, messageProcessingStartTimestamp, messageProcessingEndTimestamp, + messageMetadata, } } diff --git a/packages/core/lib/types/queueOptionsTypes.ts b/packages/core/lib/types/queueOptionsTypes.ts index 9100af14..8fa23172 100644 --- a/packages/core/lib/types/queueOptionsTypes.ts +++ b/packages/core/lib/types/queueOptionsTypes.ts @@ -62,6 +62,11 @@ export type ProcessedMessageMetadata } export interface MessageMetricsManager { @@ -113,6 +118,7 @@ export type CommonQueueOptions = { messageTimestampField?: string messageDeduplicationIdField?: string messageDeduplicationOptionsField?: string + messageMetadataField?: string handlerSpy?: HandlerSpy | HandlerSpyParams | boolean logMessages?: boolean deletionConfig?: DeletionConfig diff --git a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts index 68fff02f..673e4c95 100644 --- a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts +++ b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts @@ -2,6 +2,7 @@ import { type Either, type ErrorResolver, isError } from '@lokalise/node-core' import type { MessageInvalidFormatError, MessageValidationError, + ProcessedMessageMetadata, ResolvedMessage, } from '@message-queue-toolkit/core' import { @@ -861,9 +862,14 @@ export abstract class AbstractPubSubConsumer< ) } - protected override resolveMessageLog(message: MessagePayloadType, messageType: string): unknown { - const handler = this.handlerContainer.resolveHandler(messageType) - return handler.messageLogFormatter(message) + protected override resolveMessageLog( + processedMessageMetadata: ProcessedMessageMetadata, + ): unknown { + if (!processedMessageMetadata.message || !processedMessageMetadata.messageType) { + return null + } + const handler = this.handlerContainer.resolveHandler(processedMessageMetadata.messageType) + return handler.messageLogFormatter(processedMessageMetadata.message) } protected override isDeduplicationEnabledForMessage(message: MessagePayloadType): boolean { diff --git a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts index d9484b43..5927afa0 100644 --- a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts +++ b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubPublisher.ts @@ -68,12 +68,6 @@ export abstract class AbstractPubSubPublisher const messageProcessingStartTimestamp = Date.now() const parsedMessage = messageSchemaResult.result.parse(message) - if (this.logMessages) { - const messageType = this.resolveMessageTypeFromMessage(message) ?? 'unknown' - const resolvedLogMessage = this.resolveMessageLog(message, messageType) - this.logMessage(resolvedLogMessage) - } - message = this.updateInternalProperties(message) const maybeOffloadedPayloadMessage = await this.offloadMessagePayloadIfNeeded(message, () => { // Calculate message size for PubSub @@ -163,8 +157,4 @@ export abstract class AbstractPubSubPublisher protected override resolveSchema(message: MessagePayloadType) { return this.messageSchemaContainer.resolveSchema(message) } - - protected override resolveMessageLog(message: MessagePayloadType, _messageType: string): unknown { - return message - } } diff --git a/packages/sns/lib/sns/AbstractSnsPublisher.ts b/packages/sns/lib/sns/AbstractSnsPublisher.ts index cc32d0e3..f6c14ad4 100644 --- a/packages/sns/lib/sns/AbstractSnsPublisher.ts +++ b/packages/sns/lib/sns/AbstractSnsPublisher.ts @@ -125,12 +125,6 @@ export abstract class AbstractSnsPublisher const topicName = this.locatorConfig?.topicName ?? this.creationConfig?.topic?.Name ?? 'unknown' - if (this.logMessages) { - const messageType = this.resolveMessageTypeFromMessage(message) ?? 'unknown' - const resolvedLogMessage = this.resolveMessageLog(message, messageType) - this.logMessage(resolvedLogMessage) - } - const updatedMessage = this.updateInternalProperties(message) // Resolve FIFO options from original message BEFORE offloading diff --git a/packages/sns/lib/utils/snsInitter.ts b/packages/sns/lib/utils/snsInitter.ts index 5c1c9b40..04b2dfd8 100644 --- a/packages/sns/lib/utils/snsInitter.ts +++ b/packages/sns/lib/utils/snsInitter.ts @@ -111,7 +111,7 @@ export async function initSnsSqs( queueName = splitUrl[splitUrl.length - 1]! } else { // biome-ignore lint/style/noNonNullAssertion: It's ok - queueName = creationConfig!.queue.QueueName! + queueName = creationConfig.queue.QueueName! } return { diff --git a/packages/sns/test/publishers/SnsPermissionPublisher.multiStorePayloadOffloading.spec.ts b/packages/sns/test/publishers/SnsPermissionPublisher.multiStorePayloadOffloading.spec.ts index be3badd1..8e52e995 100644 --- a/packages/sns/test/publishers/SnsPermissionPublisher.multiStorePayloadOffloading.spec.ts +++ b/packages/sns/test/publishers/SnsPermissionPublisher.multiStorePayloadOffloading.spec.ts @@ -132,7 +132,7 @@ describe('SnsPermissionPublisher - multi-store payload offloading', () => { // Check that the published message's body is a pointer to the offloaded payload expect(receivedSnsMessages.length).toBe(1) const snsMessageBodyParseResult = SNS_MESSAGE_BODY_SCHEMA.safeParse( - JSON.parse(receivedSnsMessages[0]!.Body!), + JSON.parse(receivedSnsMessages[0].Body!), ) expect(snsMessageBodyParseResult.success).toBe(true) diff --git a/packages/sns/test/publishers/SnsPermissionPublisher.payloadOffloading.spec.ts b/packages/sns/test/publishers/SnsPermissionPublisher.payloadOffloading.spec.ts index 172e2790..1154f051 100644 --- a/packages/sns/test/publishers/SnsPermissionPublisher.payloadOffloading.spec.ts +++ b/packages/sns/test/publishers/SnsPermissionPublisher.payloadOffloading.spec.ts @@ -130,7 +130,7 @@ describe('SnsPermissionPublisher - single-store payload offloading', () => { // Check that the published message's body is a pointer to the offloaded payload. expect(receivedSnsMessages.length).toBe(1) const snsMessageBodyParseResult = SNS_MESSAGE_BODY_SCHEMA.safeParse( - JSON.parse(receivedSnsMessages[0]!.Body!), + JSON.parse(receivedSnsMessages[0].Body!), ) expect(snsMessageBodyParseResult.success).toBe(true) diff --git a/packages/sns/test/utils/snsSubscriber.spec.ts b/packages/sns/test/utils/snsSubscriber.spec.ts index 3a0ca021..9c9f3574 100644 --- a/packages/sns/test/utils/snsSubscriber.spec.ts +++ b/packages/sns/test/utils/snsSubscriber.spec.ts @@ -144,7 +144,7 @@ describe('snsSubscriber', () => { const subscriptionAttributes = await getSubscriptionAttributes( snsClient, - updatedSubscription!.SubscriptionArn!, + updatedSubscription.SubscriptionArn!, ) expect(subscriptionAttributes).toEqual({ result: { diff --git a/packages/sqs/lib/sqs/AbstractSqsConsumer.ts b/packages/sqs/lib/sqs/AbstractSqsConsumer.ts index 42b7b8d7..474a06ae 100644 --- a/packages/sqs/lib/sqs/AbstractSqsConsumer.ts +++ b/packages/sqs/lib/sqs/AbstractSqsConsumer.ts @@ -5,6 +5,7 @@ import { SetQueueAttributesCommand, } from '@aws-sdk/client-sqs' import type { Either, ErrorResolver } from '@lokalise/node-core' +import type { ProcessedMessageMetadata } from '@message-queue-toolkit/core' import { type BarrierResult, type DeadLetterQueueOptions, @@ -25,7 +26,6 @@ import { import type { ConsumerOptions } from 'sqs-consumer' import { Consumer } from 'sqs-consumer' import type { ZodSchema } from 'zod/v4' - import type { SQSMessage } from '../types/MessageTypes.ts' import { hasOffloadedPayload } from '../utils/messageUtils.ts' import { deleteSqs, initSqs } from '../utils/sqsInitter.ts' @@ -384,10 +384,7 @@ export abstract class AbstractSqsConsumer< // @ts-expect-error const uniqueTransactionKey = parsedMessage[this.messageIdField] this.transactionObservabilityManager?.start(transactionSpanId, uniqueTransactionKey) - if (this.logMessages) { - const resolvedLogMessage = this.resolveMessageLog(parsedMessage, messageType) - this.logMessage(resolvedLogMessage) - } + const result: Either<'retryLater' | Error, 'success'> = await this.internalProcessMessage( parsedMessage, messageType, @@ -835,9 +832,14 @@ export abstract class AbstractSqsConsumer< ) } - protected override resolveMessageLog(message: MessagePayloadType, messageType: string): unknown { - const handler = this.handlerContainer.resolveHandler(messageType) - return handler.messageLogFormatter(message) + protected override resolveMessageLog( + processedMessageMetadata: ProcessedMessageMetadata, + ): unknown | null { + if (!processedMessageMetadata.message || !processedMessageMetadata.messageType) { + return null + } + const handler = this.handlerContainer.resolveHandler(processedMessageMetadata.messageType) + return handler.messageLogFormatter(processedMessageMetadata.message) } protected override resolveMessage(message: SQSMessage) { diff --git a/packages/sqs/lib/sqs/AbstractSqsPublisher.ts b/packages/sqs/lib/sqs/AbstractSqsPublisher.ts index 620d821c..f4ed02d4 100644 --- a/packages/sqs/lib/sqs/AbstractSqsPublisher.ts +++ b/packages/sqs/lib/sqs/AbstractSqsPublisher.ts @@ -117,12 +117,6 @@ export abstract class AbstractSqsPublisher const messageProcessingStartTimestamp = Date.now() const parsedMessage = messageSchemaResult.result.parse(message) - if (this.logMessages) { - const messageType = this.resolveMessageTypeFromMessage(message) ?? 'unknown' - const resolvedLogMessage = this.resolveMessageLog(message, messageType) - this.logMessage(resolvedLogMessage) - } - message = this.updateInternalProperties(message) // Resolve FIFO options from original message BEFORE offloading diff --git a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts index 851d764f..23d92c43 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts @@ -443,15 +443,21 @@ describe('SqsPermissionConsumer', () => { await newConsumer.handlerSpy.waitForMessageWithId('1', 'consumed') - expect(logger.loggedMessages.length).toBe(2) + expect(logger.loggedMessages.length).toBe(1) expect(logger.loggedMessages).toMatchObject([ { - id: '1', - messageType: 'add', - timestamp: expect.any(String), - }, - { - processedMessageMetadata: expect.any(String), + processedMessageMetadata: expect.objectContaining({ + messageId: '1', + messageType: 'add', + processingResult: { + status: 'consumed', + }, + }), + message: { + id: '1', + messageType: 'add', + timestamp: expect.any(String), + }, }, ]) await newConsumer.close() diff --git a/packages/sqs/test/publishers/SqsPermissionPublisher.multiStorePayloadOffloading.spec.ts b/packages/sqs/test/publishers/SqsPermissionPublisher.multiStorePayloadOffloading.spec.ts index d0c56d5f..299ef672 100644 --- a/packages/sqs/test/publishers/SqsPermissionPublisher.multiStorePayloadOffloading.spec.ts +++ b/packages/sqs/test/publishers/SqsPermissionPublisher.multiStorePayloadOffloading.spec.ts @@ -113,7 +113,7 @@ describe('SqsPermissionPublisher - multi-store payload offloading', () => { expect(receivedSqsMessages.length).toBe(1) const parsedReceivedMessageBody = JSON.parse( - receivedSqsMessages[0]!.Body!, + receivedSqsMessages[0].Body!, ) as OffloadedPayloadPointerPayload // Check that message contains new payloadRef with correct store name diff --git a/packages/sqs/test/publishers/SqsPermissionPublisher.payloadOffloading.spec.ts b/packages/sqs/test/publishers/SqsPermissionPublisher.payloadOffloading.spec.ts index cf5bd2a1..2e1c95dc 100644 --- a/packages/sqs/test/publishers/SqsPermissionPublisher.payloadOffloading.spec.ts +++ b/packages/sqs/test/publishers/SqsPermissionPublisher.payloadOffloading.spec.ts @@ -106,7 +106,7 @@ describe('SqsPermissionPublisher - payload offloading', () => { // Check that the published message's body is a pointer to the offloaded payload. expect(receivedSqsMessages.length).toBe(1) - const parsedReceivedMessageBody = JSON.parse(receivedSqsMessages[0]!.Body!) + const parsedReceivedMessageBody = JSON.parse(receivedSqsMessages[0].Body!) // Check that message contains both new payloadRef format and legacy format for backward compatibility expect(parsedReceivedMessageBody).toMatchObject({ From c60d2b1de7cadb636c05248e95b29b746f1b39d7 Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Thu, 18 Dec 2025 15:15:27 +0100 Subject: [PATCH 02/16] linting --- packages/core/lib/events/DomainEventEmitter.spec.ts | 2 +- packages/sns/lib/utils/snsInitter.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/core/lib/events/DomainEventEmitter.spec.ts b/packages/core/lib/events/DomainEventEmitter.spec.ts index 7272d8d4..cb01884a 100644 --- a/packages/core/lib/events/DomainEventEmitter.spec.ts +++ b/packages/core/lib/events/DomainEventEmitter.spec.ts @@ -204,7 +204,7 @@ describe('DomainEventEmitter', () => { expect(emitResult.message).toEqual({ id: expect.any(String), metadata: { - correlationId: createdEventPayload.metadata.correlationId!, + correlationId: createdEventPayload.metadata!.correlationId!, originatedFrom: 'service', producedBy: undefined, schemaVersion: '1', diff --git a/packages/sns/lib/utils/snsInitter.ts b/packages/sns/lib/utils/snsInitter.ts index 04b2dfd8..5c1c9b40 100644 --- a/packages/sns/lib/utils/snsInitter.ts +++ b/packages/sns/lib/utils/snsInitter.ts @@ -111,7 +111,7 @@ export async function initSnsSqs( queueName = splitUrl[splitUrl.length - 1]! } else { // biome-ignore lint/style/noNonNullAssertion: It's ok - queueName = creationConfig.queue.QueueName! + queueName = creationConfig!.queue.QueueName! } return { From da8a6064a9c38204a932dc5117e2f49deb92eb66 Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Thu, 18 Dec 2025 15:22:18 +0100 Subject: [PATCH 03/16] test fix --- packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts b/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts index 97e5dae0..7101f0d2 100644 --- a/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts +++ b/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts @@ -168,6 +168,7 @@ describe('AmqpPermissionConsumer', () => { id: '1', messageType: 'add', }), + messageMetadata: undefined, }, ]) }) From eeb03d4c4b4cf3c6666f415fa1f58643b58b844e Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Thu, 18 Dec 2025 15:37:14 +0100 Subject: [PATCH 04/16] removed accidental linting fixes --- packages/amqp/lib/AbstractAmqpConsumer.ts | 3 +++ .../core/lib/events/DomainEventEmitter.spec.ts | 2 +- packages/core/lib/queues/HandlerContainer.ts | 6 ++---- packages/core/test/queues/HandlerContainer.spec.ts | 14 -------------- .../lib/pubsub/AbstractPubSubConsumer.ts | 3 +++ packages/sns/lib/utils/snsInitter.ts | 2 +- ...onPublisher.multiStorePayloadOffloading.spec.ts | 2 +- ...nsPermissionPublisher.payloadOffloading.spec.ts | 2 +- packages/sns/test/utils/snsSubscriber.spec.ts | 2 +- packages/sqs/lib/fakes/TestSqsPublisher.spec.ts | 14 +++++++------- packages/sqs/lib/sqs/AbstractSqsConsumer.ts | 3 +++ ...qsPermissionPublisher.payloadOffloading.spec.ts | 2 +- 12 files changed, 24 insertions(+), 31 deletions(-) diff --git a/packages/amqp/lib/AbstractAmqpConsumer.ts b/packages/amqp/lib/AbstractAmqpConsumer.ts index 5277ede1..1cbb8d4f 100644 --- a/packages/amqp/lib/AbstractAmqpConsumer.ts +++ b/packages/amqp/lib/AbstractAmqpConsumer.ts @@ -270,6 +270,9 @@ export abstract class AbstractAmqpConsumer< return null } const handler = this.handlerContainer.resolveHandler(processedMessageMetadata.messageType) + if (!handler.messageLogFormatter) { + return null + } return handler.messageLogFormatter(processedMessageMetadata.message) } diff --git a/packages/core/lib/events/DomainEventEmitter.spec.ts b/packages/core/lib/events/DomainEventEmitter.spec.ts index cb01884a..9a9e6e36 100644 --- a/packages/core/lib/events/DomainEventEmitter.spec.ts +++ b/packages/core/lib/events/DomainEventEmitter.spec.ts @@ -204,7 +204,7 @@ describe('DomainEventEmitter', () => { expect(emitResult.message).toEqual({ id: expect.any(String), metadata: { - correlationId: createdEventPayload.metadata!.correlationId!, + correlationId: createdEventPayload.metadata?.correlationId!, originatedFrom: 'service', producedBy: undefined, schemaVersion: '1', diff --git a/packages/core/lib/queues/HandlerContainer.ts b/packages/core/lib/queues/HandlerContainer.ts index cbfb8c0a..9beeefc4 100644 --- a/packages/core/lib/queues/HandlerContainer.ts +++ b/packages/core/lib/queues/HandlerContainer.ts @@ -55,8 +55,6 @@ export type Prehandler void, ) => void -export const defaultLogFormatter = (message: MessagePayloadSchema) => message - export type HandlerConfigOptions< MessagePayloadSchema extends object, ExecutionContext, @@ -98,7 +96,7 @@ export class MessageHandlerConfig< PrehandlerOutput, BarrierOutput > - public readonly messageLogFormatter: LogFormatter + public readonly messageLogFormatter?: LogFormatter public readonly preHandlerBarrier?: BarrierCallback< MessagePayloadSchema, ExecutionContext, @@ -126,7 +124,7 @@ export class MessageHandlerConfig< this.definition = eventDefinition this.messageType = options?.messageType this.handler = handler - this.messageLogFormatter = options?.messageLogFormatter ?? defaultLogFormatter + this.messageLogFormatter = options?.messageLogFormatter this.preHandlerBarrier = options?.preHandlerBarrier this.preHandlers = options?.preHandlers ?? [] } diff --git a/packages/core/test/queues/HandlerContainer.spec.ts b/packages/core/test/queues/HandlerContainer.spec.ts index fad743cf..a20c2a9e 100644 --- a/packages/core/test/queues/HandlerContainer.spec.ts +++ b/packages/core/test/queues/HandlerContainer.spec.ts @@ -106,20 +106,6 @@ describe('MessageHandlerConfigBuilder', () => { expect(config.messageLogFormatter).toBe(messageLogFormatter) expect(config.preHandlers).toEqual([]) }) - - it('should use default log formatter when not provided', () => { - const handler = () => Promise.resolve({ result: 'success' as const }) - - const config = new MessageHandlerConfig(USER_MESSAGE_SCHEMA, handler) - - const testMessage: UserMessage = { - type: 'user.created', - userId: '123', - email: 'test@example.com', - } - - expect(config.messageLogFormatter(testMessage)).toEqual(testMessage) - }) }) }) diff --git a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts index 673e4c95..c2a8acda 100644 --- a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts +++ b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts @@ -869,6 +869,9 @@ export abstract class AbstractPubSubConsumer< return null } const handler = this.handlerContainer.resolveHandler(processedMessageMetadata.messageType) + if (!handler.messageLogFormatter) { + return null + } return handler.messageLogFormatter(processedMessageMetadata.message) } diff --git a/packages/sns/lib/utils/snsInitter.ts b/packages/sns/lib/utils/snsInitter.ts index 5c1c9b40..b62c03c5 100644 --- a/packages/sns/lib/utils/snsInitter.ts +++ b/packages/sns/lib/utils/snsInitter.ts @@ -111,7 +111,7 @@ export async function initSnsSqs( queueName = splitUrl[splitUrl.length - 1]! } else { // biome-ignore lint/style/noNonNullAssertion: It's ok - queueName = creationConfig!.queue.QueueName! + queueName = creationConfig?.queue.QueueName! } return { diff --git a/packages/sns/test/publishers/SnsPermissionPublisher.multiStorePayloadOffloading.spec.ts b/packages/sns/test/publishers/SnsPermissionPublisher.multiStorePayloadOffloading.spec.ts index 8e52e995..be3badd1 100644 --- a/packages/sns/test/publishers/SnsPermissionPublisher.multiStorePayloadOffloading.spec.ts +++ b/packages/sns/test/publishers/SnsPermissionPublisher.multiStorePayloadOffloading.spec.ts @@ -132,7 +132,7 @@ describe('SnsPermissionPublisher - multi-store payload offloading', () => { // Check that the published message's body is a pointer to the offloaded payload expect(receivedSnsMessages.length).toBe(1) const snsMessageBodyParseResult = SNS_MESSAGE_BODY_SCHEMA.safeParse( - JSON.parse(receivedSnsMessages[0].Body!), + JSON.parse(receivedSnsMessages[0]!.Body!), ) expect(snsMessageBodyParseResult.success).toBe(true) diff --git a/packages/sns/test/publishers/SnsPermissionPublisher.payloadOffloading.spec.ts b/packages/sns/test/publishers/SnsPermissionPublisher.payloadOffloading.spec.ts index 1154f051..172e2790 100644 --- a/packages/sns/test/publishers/SnsPermissionPublisher.payloadOffloading.spec.ts +++ b/packages/sns/test/publishers/SnsPermissionPublisher.payloadOffloading.spec.ts @@ -130,7 +130,7 @@ describe('SnsPermissionPublisher - single-store payload offloading', () => { // Check that the published message's body is a pointer to the offloaded payload. expect(receivedSnsMessages.length).toBe(1) const snsMessageBodyParseResult = SNS_MESSAGE_BODY_SCHEMA.safeParse( - JSON.parse(receivedSnsMessages[0].Body!), + JSON.parse(receivedSnsMessages[0]!.Body!), ) expect(snsMessageBodyParseResult.success).toBe(true) diff --git a/packages/sns/test/utils/snsSubscriber.spec.ts b/packages/sns/test/utils/snsSubscriber.spec.ts index 9c9f3574..3a0ca021 100644 --- a/packages/sns/test/utils/snsSubscriber.spec.ts +++ b/packages/sns/test/utils/snsSubscriber.spec.ts @@ -144,7 +144,7 @@ describe('snsSubscriber', () => { const subscriptionAttributes = await getSubscriptionAttributes( snsClient, - updatedSubscription.SubscriptionArn!, + updatedSubscription!.SubscriptionArn!, ) expect(subscriptionAttributes).toEqual({ result: { diff --git a/packages/sqs/lib/fakes/TestSqsPublisher.spec.ts b/packages/sqs/lib/fakes/TestSqsPublisher.spec.ts index 4247d28e..0c90f55f 100644 --- a/packages/sqs/lib/fakes/TestSqsPublisher.spec.ts +++ b/packages/sqs/lib/fakes/TestSqsPublisher.spec.ts @@ -64,7 +64,7 @@ describe('TestSqsPublisher', () => { consumer.stop() expect(receivedMessages).toHaveLength(1) - const body = JSON.parse(receivedMessages[0]!.Body!) + const body = JSON.parse(receivedMessages[0]?.Body!) expect(body).toEqual({ totally: 'arbitrary', data: { nested: true }, @@ -95,7 +95,7 @@ describe('TestSqsPublisher', () => { consumer.stop() expect(receivedMessages).toHaveLength(1) - const body = JSON.parse(receivedMessages[0]!.Body!) + const body = JSON.parse(receivedMessages[0]?.Body!) expect(body).toEqual({ incomplete: 'message' }) }) @@ -133,7 +133,7 @@ describe('TestSqsPublisher', () => { consumer.stop() expect(receivedMessages).toHaveLength(1) - const body = JSON.parse(receivedMessages[0]!.Body!) + const body = JSON.parse(receivedMessages[0]?.Body!) expect(body).toEqual(complexMessage) }) }) @@ -160,7 +160,7 @@ describe('TestSqsPublisher', () => { consumer.stop() expect(receivedMessages).toHaveLength(1) - const body = JSON.parse(receivedMessages[0]!.Body!) + const body = JSON.parse(receivedMessages[0]?.Body!) expect(body).toEqual({ test: 'queueName' }) }) }) @@ -193,7 +193,7 @@ describe('TestSqsPublisher', () => { sqsConsumer.stop() expect(receivedMessages).toHaveLength(1) - const body = JSON.parse(receivedMessages[0]!.Body!) + const body = JSON.parse(receivedMessages[0]?.Body!) expect(body).toEqual({ test: 'consumer' }) await consumer.close() @@ -240,7 +240,7 @@ describe('TestSqsPublisher', () => { consumer.stop() expect(receivedMessages).toHaveLength(1) - const body = JSON.parse(receivedMessages[0]!.Body!) + const body = JSON.parse(receivedMessages[0]?.Body!) expect(body).toEqual({ test: 'publisher' }) }) @@ -292,7 +292,7 @@ describe('TestSqsPublisher', () => { consumer.stop() expect(receivedMessages).toHaveLength(1) - const body = JSON.parse(receivedMessages[0]!.Body!) + const body = JSON.parse(receivedMessages[0]?.Body!) expect(body).toEqual({ test: 'fifo-message' }) }) }) diff --git a/packages/sqs/lib/sqs/AbstractSqsConsumer.ts b/packages/sqs/lib/sqs/AbstractSqsConsumer.ts index 474a06ae..d59d31c6 100644 --- a/packages/sqs/lib/sqs/AbstractSqsConsumer.ts +++ b/packages/sqs/lib/sqs/AbstractSqsConsumer.ts @@ -839,6 +839,9 @@ export abstract class AbstractSqsConsumer< return null } const handler = this.handlerContainer.resolveHandler(processedMessageMetadata.messageType) + if (!handler.messageLogFormatter) { + return null + } return handler.messageLogFormatter(processedMessageMetadata.message) } diff --git a/packages/sqs/test/publishers/SqsPermissionPublisher.payloadOffloading.spec.ts b/packages/sqs/test/publishers/SqsPermissionPublisher.payloadOffloading.spec.ts index 2e1c95dc..cf5bd2a1 100644 --- a/packages/sqs/test/publishers/SqsPermissionPublisher.payloadOffloading.spec.ts +++ b/packages/sqs/test/publishers/SqsPermissionPublisher.payloadOffloading.spec.ts @@ -106,7 +106,7 @@ describe('SqsPermissionPublisher - payload offloading', () => { // Check that the published message's body is a pointer to the offloaded payload. expect(receivedSqsMessages.length).toBe(1) - const parsedReceivedMessageBody = JSON.parse(receivedSqsMessages[0].Body!) + const parsedReceivedMessageBody = JSON.parse(receivedSqsMessages[0]!.Body!) // Check that message contains both new payloadRef format and legacy format for backward compatibility expect(parsedReceivedMessageBody).toMatchObject({ From cead1ef05a31df2ccd2db95d234fd0f2d2da56a3 Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Thu, 18 Dec 2025 15:38:08 +0100 Subject: [PATCH 05/16] removed accidental linting fixes --- packages/sqs/lib/fakes/TestSqsPublisher.spec.ts | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/packages/sqs/lib/fakes/TestSqsPublisher.spec.ts b/packages/sqs/lib/fakes/TestSqsPublisher.spec.ts index 0c90f55f..4247d28e 100644 --- a/packages/sqs/lib/fakes/TestSqsPublisher.spec.ts +++ b/packages/sqs/lib/fakes/TestSqsPublisher.spec.ts @@ -64,7 +64,7 @@ describe('TestSqsPublisher', () => { consumer.stop() expect(receivedMessages).toHaveLength(1) - const body = JSON.parse(receivedMessages[0]?.Body!) + const body = JSON.parse(receivedMessages[0]!.Body!) expect(body).toEqual({ totally: 'arbitrary', data: { nested: true }, @@ -95,7 +95,7 @@ describe('TestSqsPublisher', () => { consumer.stop() expect(receivedMessages).toHaveLength(1) - const body = JSON.parse(receivedMessages[0]?.Body!) + const body = JSON.parse(receivedMessages[0]!.Body!) expect(body).toEqual({ incomplete: 'message' }) }) @@ -133,7 +133,7 @@ describe('TestSqsPublisher', () => { consumer.stop() expect(receivedMessages).toHaveLength(1) - const body = JSON.parse(receivedMessages[0]?.Body!) + const body = JSON.parse(receivedMessages[0]!.Body!) expect(body).toEqual(complexMessage) }) }) @@ -160,7 +160,7 @@ describe('TestSqsPublisher', () => { consumer.stop() expect(receivedMessages).toHaveLength(1) - const body = JSON.parse(receivedMessages[0]?.Body!) + const body = JSON.parse(receivedMessages[0]!.Body!) expect(body).toEqual({ test: 'queueName' }) }) }) @@ -193,7 +193,7 @@ describe('TestSqsPublisher', () => { sqsConsumer.stop() expect(receivedMessages).toHaveLength(1) - const body = JSON.parse(receivedMessages[0]?.Body!) + const body = JSON.parse(receivedMessages[0]!.Body!) expect(body).toEqual({ test: 'consumer' }) await consumer.close() @@ -240,7 +240,7 @@ describe('TestSqsPublisher', () => { consumer.stop() expect(receivedMessages).toHaveLength(1) - const body = JSON.parse(receivedMessages[0]?.Body!) + const body = JSON.parse(receivedMessages[0]!.Body!) expect(body).toEqual({ test: 'publisher' }) }) @@ -292,7 +292,7 @@ describe('TestSqsPublisher', () => { consumer.stop() expect(receivedMessages).toHaveLength(1) - const body = JSON.parse(receivedMessages[0]?.Body!) + const body = JSON.parse(receivedMessages[0]!.Body!) expect(body).toEqual({ test: 'fifo-message' }) }) }) From c8c8be8c49d0a1336ab0df3e9048cdb8c21b7d12 Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Thu, 18 Dec 2025 15:38:30 +0100 Subject: [PATCH 06/16] removed accidental linting fixes --- .../SqsPermissionPublisher.multiStorePayloadOffloading.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sqs/test/publishers/SqsPermissionPublisher.multiStorePayloadOffloading.spec.ts b/packages/sqs/test/publishers/SqsPermissionPublisher.multiStorePayloadOffloading.spec.ts index 299ef672..d0c56d5f 100644 --- a/packages/sqs/test/publishers/SqsPermissionPublisher.multiStorePayloadOffloading.spec.ts +++ b/packages/sqs/test/publishers/SqsPermissionPublisher.multiStorePayloadOffloading.spec.ts @@ -113,7 +113,7 @@ describe('SqsPermissionPublisher - multi-store payload offloading', () => { expect(receivedSqsMessages.length).toBe(1) const parsedReceivedMessageBody = JSON.parse( - receivedSqsMessages[0].Body!, + receivedSqsMessages[0]!.Body!, ) as OffloadedPayloadPointerPayload // Check that message contains new payloadRef with correct store name From 6dabe52feda938fe588b3c1ffffa7f426984989f Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Thu, 18 Dec 2025 15:44:55 +0100 Subject: [PATCH 07/16] tests fix --- packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts | 5 ----- packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts | 5 ----- 2 files changed, 10 deletions(-) diff --git a/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts b/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts index 7101f0d2..232e106f 100644 --- a/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts +++ b/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts @@ -110,11 +110,6 @@ describe('AmqpPermissionConsumer', () => { messageType: 'add', processingResult: { status: 'consumed' }, }), - message: { - id: '1', - messageType: 'add', - timestamp: expect.any(String), - }, }, ]) }) diff --git a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts index 23d92c43..b25e9cac 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts @@ -453,11 +453,6 @@ describe('SqsPermissionConsumer', () => { status: 'consumed', }, }), - message: { - id: '1', - messageType: 'add', - timestamp: expect.any(String), - }, }, ]) await newConsumer.close() From 03b72207f77c25239d50f9a8b94d61edfeb91aa2 Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Thu, 18 Dec 2025 15:54:13 +0100 Subject: [PATCH 08/16] tests fix --- packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts index b25e9cac..13448db0 100644 --- a/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts +++ b/packages/sqs/test/consumers/SqsPermissionConsumer.spec.ts @@ -512,6 +512,9 @@ describe('SqsPermissionConsumer', () => { messageProcessingStartTimestamp: expect.any(Number), messageProcessingEndTimestamp: expect.any(Number), queueName: SqsPermissionConsumer.QUEUE_NAME, + messageMetadata: { + schemaVersions: '1.0.0', + }, message: expect.objectContaining({ id: '1', messageType: 'add', From 6ec3e15463dd2591cd100bf953505ffba2911668 Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Thu, 18 Dec 2025 16:03:33 +0100 Subject: [PATCH 09/16] linting --- packages/sns/lib/utils/snsInitter.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sns/lib/utils/snsInitter.ts b/packages/sns/lib/utils/snsInitter.ts index b62c03c5..5c1c9b40 100644 --- a/packages/sns/lib/utils/snsInitter.ts +++ b/packages/sns/lib/utils/snsInitter.ts @@ -111,7 +111,7 @@ export async function initSnsSqs( queueName = splitUrl[splitUrl.length - 1]! } else { // biome-ignore lint/style/noNonNullAssertion: It's ok - queueName = creationConfig?.queue.QueueName! + queueName = creationConfig!.queue.QueueName! } return { From f45eaeef69e36e29506ecccb896eedc5dbca2b2f Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Thu, 18 Dec 2025 16:08:47 +0100 Subject: [PATCH 10/16] linting --- packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts | 2 +- packages/core/lib/events/DomainEventEmitter.spec.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts b/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts index 232e106f..0ee623cc 100644 --- a/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts +++ b/packages/amqp/test/consumers/AmqpPermissionConsumer.spec.ts @@ -405,7 +405,7 @@ describe('AmqpPermissionConsumer', () => { await waitAndRetry(() => errorReporterSpy.mock.calls.length > 0) expect(errorReporterSpy.mock.calls).toHaveLength(1) - expect(errorReporterSpy.mock.calls[0]?.[0]?.error).toMatchObject({ + expect(errorReporterSpy.mock.calls[0]![0]!.error).toMatchObject({ message: 'Unsupported message type: bad', }) }) diff --git a/packages/core/lib/events/DomainEventEmitter.spec.ts b/packages/core/lib/events/DomainEventEmitter.spec.ts index 9a9e6e36..cb01884a 100644 --- a/packages/core/lib/events/DomainEventEmitter.spec.ts +++ b/packages/core/lib/events/DomainEventEmitter.spec.ts @@ -204,7 +204,7 @@ describe('DomainEventEmitter', () => { expect(emitResult.message).toEqual({ id: expect.any(String), metadata: { - correlationId: createdEventPayload.metadata?.correlationId!, + correlationId: createdEventPayload.metadata!.correlationId!, originatedFrom: 'service', producedBy: undefined, schemaVersion: '1', From 3f995f843fd0f6d1ea5a01fdac9a6d6606271ca8 Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Wed, 7 Jan 2026 14:26:50 +0100 Subject: [PATCH 11/16] minor types adjustment --- packages/core/lib/queues/AbstractQueueService.ts | 4 ++-- packages/core/lib/types/queueOptionsTypes.ts | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/core/lib/queues/AbstractQueueService.ts b/packages/core/lib/queues/AbstractQueueService.ts index 5e85fe8b..e675fb23 100644 --- a/packages/core/lib/queues/AbstractQueueService.ts +++ b/packages/core/lib/queues/AbstractQueueService.ts @@ -344,12 +344,12 @@ export abstract class AbstractQueueService< const messageType = message ? this.resolveMessageTypeFromMessage(message) : undefined const messageDeduplicationId = message && this.messageDeduplicationIdField in message - ? // @ts-ignore + ? // @ts-expect-error message[this.messageDeduplicationId] : undefined const messageMetadata = message && this.messageMetadataField in message - ? // @ts-ignore + ? // @ts-expect-error message[this.messageMetadataField] : undefined diff --git a/packages/core/lib/types/queueOptionsTypes.ts b/packages/core/lib/types/queueOptionsTypes.ts index 8fa23172..d9d8a508 100644 --- a/packages/core/lib/types/queueOptionsTypes.ts +++ b/packages/core/lib/types/queueOptionsTypes.ts @@ -66,7 +66,7 @@ export type ProcessedMessageMetadata + messageMetadata?: Record } export interface MessageMetricsManager { From bcb2775b665e1a5caa144d5e1ff77e05d4e5fc08 Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Wed, 7 Jan 2026 14:33:30 +0100 Subject: [PATCH 12/16] fixed typo in field name --- packages/core/lib/queues/AbstractQueueService.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/core/lib/queues/AbstractQueueService.ts b/packages/core/lib/queues/AbstractQueueService.ts index e675fb23..2da2573e 100644 --- a/packages/core/lib/queues/AbstractQueueService.ts +++ b/packages/core/lib/queues/AbstractQueueService.ts @@ -345,7 +345,7 @@ export abstract class AbstractQueueService< const messageDeduplicationId = message && this.messageDeduplicationIdField in message ? // @ts-expect-error - message[this.messageDeduplicationId] + message[this.messageDeduplicationIdField] : undefined const messageMetadata = message && this.messageMetadataField in message From e8642b2b8073058b0daa90bea7ddd7aec32084ff Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Wed, 7 Jan 2026 14:42:17 +0100 Subject: [PATCH 13/16] fixed return type --- packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts index c2a8acda..5be4fff8 100644 --- a/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts +++ b/packages/gcp-pubsub/lib/pubsub/AbstractPubSubConsumer.ts @@ -864,7 +864,7 @@ export abstract class AbstractPubSubConsumer< protected override resolveMessageLog( processedMessageMetadata: ProcessedMessageMetadata, - ): unknown { + ): unknown | null { if (!processedMessageMetadata.message || !processedMessageMetadata.messageType) { return null } From 19cbb8375dafcf17a99140daf28fda9e6cedab65 Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Tue, 3 Feb 2026 09:47:21 +0100 Subject: [PATCH 14/16] adjusted coverage threshold --- packages/core/vitest.config.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/core/vitest.config.ts b/packages/core/vitest.config.ts index 7ed17ad9..a1ab6c53 100644 --- a/packages/core/vitest.config.ts +++ b/packages/core/vitest.config.ts @@ -18,7 +18,7 @@ export default defineConfig({ thresholds: { lines: 42, functions: 51, - branches: 48, + branches: 47, statements: 42, }, }, From 9eec629d32ceab7eb7448aef6be676d0d058eb59 Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Tue, 3 Feb 2026 10:15:02 +0100 Subject: [PATCH 15/16] updated kafka consumer event name --- packages/kafka/lib/AbstractKafkaConsumer.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/kafka/lib/AbstractKafkaConsumer.ts b/packages/kafka/lib/AbstractKafkaConsumer.ts index cc69a009..c632669a 100644 --- a/packages/kafka/lib/AbstractKafkaConsumer.ts +++ b/packages/kafka/lib/AbstractKafkaConsumer.ts @@ -129,7 +129,7 @@ export abstract class AbstractKafkaConsumer< this.consumer.on('consumer:group:join', (_) => this.logger.debug(logDetails, 'Consumer is joining a group'), ) - this.consumer.on('consumer:rejoin', (_) => + this.consumer.on('consumer:group:rejoin', () => this.logger.debug(logDetails, 'Consumer is re-joining a group after a rebalance'), ) this.consumer.on('consumer:group:leave', (_) => From e3c504adca096b57eb152d0442b34b5e15a6299f Mon Sep 17 00:00:00 2001 From: Krzysztof Jamrog Date: Tue, 3 Feb 2026 15:55:15 +0100 Subject: [PATCH 16/16] README.md update --- README.md | 61 ++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 58 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 13690df4..c24cd50d 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,8 @@ They implement the following public methods: * `policyConfig` - SQS only - configuration for queue access policies (see [SQS Policy Configuration](#sqs-policy-configuration) for more information); * `deletionConfig` - automatic cleanup of resources; * `handlerSpy` - allow awaiting certain messages to be published (see [Handler Spies](#handler-spies) for more information); - * `logMessages` - add logs for processed messages. + * `logMessages` - add debug logs for processed messages. When enabled, logs structured metadata including message id, type, timestamps, and queue name. For privacy reasons, the full message payload is not logged by default. See [Message Logging](#message-logging) for more details. + * `messageMetadataField` - which field in the message contains metadata for logging purposes (by default it is `metadata`). * `payloadStoreConfig` - configuration for payload offloading. This option enables the external storage of large message payloads to comply with message size limitations of the queue system. For more details on setting this up, see [Payload Offloading](#payload-offloading). * `messageDeduplicationConfig` - configuration for store-based message deduplication on publisher level. For more details on setting this up, see [Publisher-level store-based-message deduplication](#publisher-level-store-based-message-deduplication). * `enablePublisherDeduplication` - enable store-based publisher-level deduplication. For more details on setting this up, see [Publisher-level store-based-message deduplication](#publisher-level-store-based-message-deduplication). @@ -110,7 +111,8 @@ Multi-schema consumers support multiple message types via handler configs. They * `consumerOverrides` – available only for SQS consumers; * `deadLetterQueue` - available only for SQS and SNS consumers (see [Dead Letter Queue](#dead-letter-queue) for more information); * `handlerSpy` - allow awaiting certain messages to be published (see [Handler Spies](#handler-spies) for more information); - * `logMessages` - add logs for processed messages. + * `logMessages` - add debug logs for processed messages. When enabled, logs structured metadata including message id, type, timestamps, and queue name. For privacy reasons, the full message payload is not logged by default. To include custom message data in logs, configure `messageLogFormatter` on your handlers. See [Message Logging](#message-logging) for more details. + * `messageMetadataField` - which field in the message contains metadata for logging purposes (by default it is `metadata`). * `payloadStoreConfig` - configuration for payload offloading. This option enables the external storage of large message payloads to comply with message size limitations of the queue system. For more details on setting this up, see [Payload Offloading](#payload-offloading). * `concurrentConsumersAmount` - configuration for specifying the number of concurrent consumers to create. Available only for SQS and SNS consumers * `messageDeduplicationConfig` - configuration for store-based message deduplication on consumer level. For more details on setting this up, see [Consumer-level store-based-message deduplication](#consumer-level-store-based-message-deduplication). @@ -173,7 +175,9 @@ export class SqsPermissionConsumer extends AbstractSqsConsumer< preHandlerBarrier: async (message) => { // do barrier check here return true - } + }, + // Optional: customize what message data is logged (see Message Logging section) + messageLogFormatter: (message) => ({ id: message.id, type: message.type }), }, ) .addConfig(PERMISSIONS_REMOVE_MESSAGE_SCHEMA, @@ -642,6 +646,56 @@ const result = await myConsumer.handlerSpy.waitForMessageWithId('1') expect(result.processingResult).toEqual({ status: 'consumed' }) ``` +## Message Logging + +When `logMessages` is enabled, processed messages are logged at the `debug` level with structured metadata. For privacy reasons, the full message payload is **not logged by default** to avoid exposing sensitive data. + +### What is logged by default + +Each log entry includes processed message metadata: +- `messageId` - unique identifier of the message +- `messageType` - type of the message +- `queueName` - name of the queue or topic +- `messageTimestamp` - when the message was originally sent +- `messageProcessingStartTimestamp` - when processing started +- `messageProcessingEndTimestamp` - when processing completed +- `messageDeduplicationId` - deduplication id (if deduplication is enabled) +- `messageMetadata` - contents of the metadata field (configurable via `messageMetadataField`) +- `processingResult` - outcome of processing (e.g., `{ status: 'consumed' }` or `{ status: 'published' }`) + +### Custom message logging with messageLogFormatter + +If you need to include additional message data in logs, you can configure a `messageLogFormatter` on your handler. This formatter receives the message and returns the data to be logged: + +```typescript +new MessageHandlerConfigBuilder() + .addConfig( + MY_MESSAGE_SCHEMA, + async (message, context) => { + // handler logic + return { result: 'success' } + }, + { + // Only log specific fields, excluding sensitive data + messageLogFormatter: (message) => ({ + id: message.id, + type: message.type, + // Exclude sensitive fields like email, password, etc. + }), + }, + ) + .build() +``` + +When a `messageLogFormatter` is provided, its output is included in the log under the `message` key alongside the processed message metadata. + +### Configuration options + +| Option | Default | Description | +|--------|---------|-------------| +| `logMessages` | `false` | Enable debug logging for processed messages | +| `messageMetadataField` | `'metadata'` | Field in the message containing metadata to include in logs | + ## Payload Offloading Payload offloading allows you to manage large message payloads by storing them in external storage, bypassing any message size restrictions imposed by queue systems. @@ -773,6 +827,7 @@ It needs to implement the following methods: - `messageProcessingStartTimestamp` - the timestamp when the processing of the message started - `messageProcessingEndTimestamp` - the timestamp when the processing of the message finished - `messageDeduplicationId` - the deduplication id of the message, in case deduplication is enabled + - `messageMetadata` - contents of the message metadata field (configurable via `messageMetadataField`) See [@message-queue-toolkit/metrics](packages/metrics/README.md) for concrete implementations