diff --git a/powertools-e2e-tests/handlers/largemessage-functional/pom.xml b/powertools-e2e-tests/handlers/largemessage-functional/pom.xml new file mode 100644 index 000000000..4afb633f2 --- /dev/null +++ b/powertools-e2e-tests/handlers/largemessage-functional/pom.xml @@ -0,0 +1,46 @@ + + 4.0.0 + + + software.amazon.lambda + e2e-test-handlers-parent + 2.5.0 + + + e2e-test-handler-largemessage-functional + jar + E2E test handler – Large message functional + + + + software.amazon.awssdk + dynamodb + + + software.amazon.lambda + powertools-large-messages + + + software.amazon.lambda + powertools-logging-log4j + + + software.amazon.lambda + powertools-logging + + + com.amazonaws + aws-lambda-java-events + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + + diff --git a/powertools-e2e-tests/handlers/largemessage-functional/src/main/java/software/amazon/lambda/powertools/e2e/Function.java b/powertools-e2e-tests/handlers/largemessage-functional/src/main/java/software/amazon/lambda/powertools/e2e/Function.java new file mode 100644 index 000000000..05a336500 --- /dev/null +++ b/powertools-e2e-tests/handlers/largemessage-functional/src/main/java/software/amazon/lambda/powertools/e2e/Function.java @@ -0,0 +1,85 @@ +/* + * Copyright 2023 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package software.amazon.lambda.powertools.e2e; + +import static software.amazon.lambda.powertools.logging.PowertoolsLogging.withLogging; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; + +import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.dynamodb.DynamoDbClient; +import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; +import software.amazon.awssdk.utils.BinaryUtils; +import software.amazon.awssdk.utils.Md5Utils; +import software.amazon.lambda.powertools.largemessages.LargeMessages; + +public class Function implements RequestHandler { + + private static final String TABLE_FOR_ASYNC_TESTS = System.getenv("TABLE_FOR_ASYNC_TESTS"); + private DynamoDbClient client; + + public Function() { + if (client == null) { + client = DynamoDbClient.builder() + .httpClient(UrlConnectionHttpClient.builder().build()) + .region(Region.of(System.getenv("AWS_REGION"))) + .build(); + } + } + + public SQSBatchResponse handleRequest(SQSEvent event, Context context) { + return withLogging(context, () -> { + for (SQSMessage message : event.getRecords()) { + LargeMessages.processLargeMessage(message, msg -> processRawMessage(msg, context)); + } + return SQSBatchResponse.builder().build(); + }); + } + + private Void processRawMessage(SQSMessage sqsMessage, Context context) { + String bodyMD5 = md5(sqsMessage.getBody()); + if (!sqsMessage.getMd5OfBody().equals(bodyMD5)) { + throw new SecurityException( + String.format("message digest does not match, expected %s, got %s", sqsMessage.getMd5OfBody(), + bodyMD5)); + } + + Map item = new HashMap<>(); + item.put("functionName", AttributeValue.builder().s(context.getFunctionName()).build()); + item.put("id", AttributeValue.builder().s(sqsMessage.getMessageId()).build()); + item.put("bodyMD5", AttributeValue.builder().s(bodyMD5).build()); + item.put("bodySize", + AttributeValue.builder().n(String.valueOf(sqsMessage.getBody().getBytes(StandardCharsets.UTF_8).length)) + .build()); + + client.putItem(PutItemRequest.builder().tableName(TABLE_FOR_ASYNC_TESTS).item(item).build()); + + return null; + } + + private String md5(String message) { + return BinaryUtils.toHex(Md5Utils.computeMD5Hash(message.getBytes(StandardCharsets.UTF_8))); + } +} diff --git a/powertools-e2e-tests/handlers/largemessage-functional/src/main/resources/log4j2.xml b/powertools-e2e-tests/handlers/largemessage-functional/src/main/resources/log4j2.xml new file mode 100644 index 000000000..8925f70b9 --- /dev/null +++ b/powertools-e2e-tests/handlers/largemessage-functional/src/main/resources/log4j2.xml @@ -0,0 +1,16 @@ + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/powertools-e2e-tests/handlers/pom.xml b/powertools-e2e-tests/handlers/pom.xml index 86e867548..f171422c5 100644 --- a/powertools-e2e-tests/handlers/pom.xml +++ b/powertools-e2e-tests/handlers/pom.xml @@ -27,6 +27,7 @@ batch largemessage + largemessage-functional largemessage_idempotent logging-log4j logging-logback diff --git a/powertools-e2e-tests/src/test/java/software/amazon/lambda/powertools/LargeMessageE2ET.java b/powertools-e2e-tests/src/test/java/software/amazon/lambda/powertools/LargeMessageE2ET.java index 74247ca2e..0de2dca60 100644 --- a/powertools-e2e-tests/src/test/java/software/amazon/lambda/powertools/LargeMessageE2ET.java +++ b/powertools-e2e-tests/src/test/java/software/amazon/lambda/powertools/LargeMessageE2ET.java @@ -16,9 +16,10 @@ import org.apache.commons.io.IOUtils; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +41,7 @@ import software.amazon.lambda.powertools.testutils.Infrastructure; import software.amazon.lambda.powertools.testutils.RetryUtils; +@TestInstance(TestInstance.Lifecycle.PER_CLASS) class LargeMessageE2ET { private static final Logger LOG = LoggerFactory.getLogger(LargeMessageE2ET.class); @@ -55,25 +57,34 @@ class LargeMessageE2ET { .region(region) .build(); - private static Infrastructure infrastructure; - private static String functionName; - private static String bucketName; - private static String queueUrl; - private static String tableName; + private Infrastructure infrastructure; + private String functionName; + private String bucketName; + private String queueUrl; + private String tableName; private String messageId; + private String currentPathToFunction; + + private void setupInfrastructure(String pathToFunction) { + // Do not re-deploy the same function + if (pathToFunction.equals(currentPathToFunction)) { + return; + } + + // Destroy any existing infrastructure before re-deploying + if (infrastructure != null) { + infrastructure.destroy(); + } - @BeforeAll - @Timeout(value = 5, unit = TimeUnit.MINUTES) - static void setup() { String random = UUID.randomUUID().toString().substring(0, 6); bucketName = "largemessagebucket" + random; String queueName = "largemessagequeue" + random; infrastructure = Infrastructure.builder() - .testName(LargeMessageE2ET.class.getSimpleName()) + .testName(LargeMessageE2ET.class.getSimpleName() + "-" + pathToFunction) .queue(queueName) .largeMessagesBucket(bucketName) - .pathToFunction("largemessage") + .pathToFunction(pathToFunction) .timeoutInSeconds(60) .build(); @@ -81,19 +92,24 @@ static void setup() { functionName = outputs.get(FUNCTION_NAME_OUTPUT); queueUrl = outputs.get("QueueURL"); tableName = outputs.get("TableNameForAsyncTests"); + currentPathToFunction = pathToFunction; - LOG.info("Testing '" + LargeMessageE2ET.class.getSimpleName() + "'"); + LOG.info("Testing '{}' with {}", LargeMessageE2ET.class.getSimpleName(), pathToFunction); } @AfterAll - static void tearDown() { + void cleanup() { if (infrastructure != null) { infrastructure.destroy(); } } @AfterEach - void reset() { + void tearDown() { + reset(); + } + + private void reset() { if (messageId != null) { Map itemToDelete = new HashMap<>(); itemToDelete.put("functionName", AttributeValue.builder().s(functionName).build()); @@ -103,8 +119,12 @@ void reset() { } } - @Test - void bigSQSMessageOffloadedToS3_shouldLoadFromS3() throws IOException { + @ParameterizedTest + @ValueSource(strings = { "largemessage", "largemessage-functional" }) + @Timeout(value = 5, unit = TimeUnit.MINUTES) + void bigSQSMessageOffloadedToS3_shouldLoadFromS3(String pathToFunction) throws IOException { + setupInfrastructure(pathToFunction); + // GIVEN final ExtendedClientConfiguration extendedClientConfig = new ExtendedClientConfiguration() .withPayloadSupportEnabled(s3Client, bucketName); @@ -146,8 +166,12 @@ void bigSQSMessageOffloadedToS3_shouldLoadFromS3() throws IOException { assertThat(items.get(0).get("bodyMD5").s()).isEqualTo("22bde5e7b05fa80bc7be45bdd4bc6c75"); } - @Test - void smallSQSMessage_shouldNotReadFromS3() { + @ParameterizedTest + @ValueSource(strings = { "largemessage", "largemessage-functional" }) + @Timeout(value = 5, unit = TimeUnit.MINUTES) + void smallSQSMessage_shouldNotReadFromS3(String pathToFunction) { + setupInfrastructure(pathToFunction); + // GIVEN final ExtendedClientConfiguration extendedClientConfig = new ExtendedClientConfiguration() .withPayloadSupportEnabled(s3Client, bucketName); diff --git a/powertools-large-messages/pom.xml b/powertools-large-messages/pom.xml index a405941d7..bc7bbf03a 100644 --- a/powertools-large-messages/pom.xml +++ b/powertools-large-messages/pom.xml @@ -14,11 +14,11 @@ --> + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 -A suite of utilities for AWS Lambda Functions that makes handling large messages in SQS and SNS easier. + A suite of utilities for AWS Lambda Functions that makes handling large messages in SQS and SNS easier. software.amazon.lambda @@ -41,6 +41,13 @@ software.amazon.lambda powertools-common + + software.amazon.lambda + powertools-common + ${project.version} + test-jar + test + com.amazonaws aws-lambda-java-events @@ -102,6 +109,11 @@ mockito-core test + + org.mockito + mockito-junit-jupiter + test + org.slf4j slf4j-simple diff --git a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessage.java b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessage.java index 4e556966c..eb5b368a5 100644 --- a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessage.java +++ b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessage.java @@ -20,15 +20,15 @@ import java.lang.annotation.Target; /** - *

Use this annotation to handle large messages (> 256 KB) from SQS or SNS. + *

Use this annotation to handle large messages (> 1 MB) from SQS or SNS. * When large messages are sent to an SQS Queue or SNS Topic, they are offloaded to S3 and only a reference is passed in the message/record.

* *

{@code @LargeMessage} automatically retrieves and deletes messages * which have been offloaded to S3 using the {@code amazon-sqs-java-extended-client-lib} or {@code amazon-sns-java-extended-client-lib} * client libraries.

* - *

This version of the {@code @LargeMessage} is compatible with version - * 1.1.0+ of {@code amazon-sqs-java-extended-client-lib} / {@code amazon-sns-java-extended-client-lib}.

+ *

This version of the {@code @LargeMessage} is compatible with version 1.1.0+ and 2.0.0+ + * of {@code amazon-sqs-java-extended-client-lib} / {@code amazon-sns-java-extended-client-lib}.

*
*

Put this annotation on a method where the first parameter is either a {@link com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage} or {@link com.amazonaws.services.lambda.runtime.events.SNSEvent.SNSRecord}. *
@@ -54,9 +54,11 @@ * *

* - *

Note 1: Retrieving payloads and deleting objects from S3 will increase the duration of the + *

Note 1: The message object (SQSMessage or SNSRecord) is modified in-place to avoid duplicating + * the large blob in memory. The message body will be replaced with the S3 object content.

+ *

Note 2: Retrieving payloads and deleting objects from S3 will increase the duration of the * Lambda function.

- *

Note 2: Make sure to configure your function with enough memory to be able to retrieve S3 objects.

+ *

Note 3: Make sure to configure your function with enough memory to be able to retrieve S3 objects.

*/ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) diff --git a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessages.java b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessages.java new file mode 100644 index 000000000..52675d3eb --- /dev/null +++ b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessages.java @@ -0,0 +1,156 @@ +/* + * Copyright 2023 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package software.amazon.lambda.powertools.largemessages; + +import java.util.Optional; +import java.util.function.Function; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import software.amazon.lambda.powertools.largemessages.internal.LargeMessageProcessor; +import software.amazon.lambda.powertools.largemessages.internal.LargeMessageProcessorFactory; + +/** + * Functional API for processing large messages without AspectJ. + *

+ * Use this class to handle large messages (> 1 MB) from SQS or SNS. + * When large messages are sent to an SQS Queue or SNS Topic, they are offloaded to S3 and only a reference is passed in the message/record. + *

+ * {@code LargeMessages} automatically retrieves and optionally deletes messages + * which have been offloaded to S3 using the {@code amazon-sqs-java-extended-client-lib} or {@code amazon-sns-java-extended-client-lib} + * client libraries. + *

+ * This version is compatible with version 1.1.0+ and 2.0.0+ of {@code amazon-sqs-java-extended-client-lib} / {@code amazon-sns-java-extended-client-lib}. + *

+ * SQS Example: + *

+ * public class SqsBatchHandler implements RequestHandler<SQSEvent, SQSBatchResponse> {
+ *     private final BatchMessageHandler<SQSEvent, SQSBatchResponse> handler;
+ *
+ *     public SqsBatchHandler() {
+ *         handler = new BatchMessageHandlerBuilder()
+ *                 .withSqsBatchHandler()
+ *                 .buildWithRawMessageHandler(this::processMessage);
+ *     }
+ *
+ *     @Override
+ *     public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
+ *         return handler.processBatch(sqsEvent, context);
+ *     }
+ *
+ *     private void processMessage(SQSEvent.SQSMessage sqsMessage) {
+ *         LargeMessages.processLargeMessage(sqsMessage, this::handleProcessedMessage);
+ *     }
+ *
+ *     private void handleProcessedMessage(SQSEvent.SQSMessage processedMessage) {
+ *         // processedMessage.getBody() will contain the content of the S3 Object
+ *     }
+ * }
+ * 
+ *

+ * To disable the deletion of S3 objects: + *

+ * LargeMessages.processLargeMessage(sqsMessage, this::handleProcessedMessage, false);
+ * 
+ *

+ * For multi-argument methods, use a lambda to pass additional parameters: + *

+ * public void handleRequest(SQSEvent event, Context context) {
+ *     event.getRecords().forEach(message ->
+ *         LargeMessages.processLargeMessage(message, processedMsg -> processMessage(processedMsg, context))
+ *     );
+ * }
+ *
+ * private void processMessage(SQSMessage processedMessage, Context context) {
+ *     // processedMessage.getBody() will contain the content of the S3 Object
+ * }
+ * 
+ *

+ * Note 1: The message object (SQSMessage or SNSRecord) is modified in-place to avoid duplicating + * the large blob in memory. The message body will be replaced with the S3 object content. + *

+ * Note 2: Retrieving payloads and deleting objects from S3 will increase the duration of the Lambda function. + *

+ * Note 3: Make sure to configure your function with enough memory to be able to retrieve S3 objects. + * + * @see LargeMessage + */ +public final class LargeMessages { + + private static final Logger LOG = LoggerFactory.getLogger(LargeMessages.class); + + private LargeMessages() { + // Utility class + } + + /** + * Process a large message and execute the function with the processed message. + *

+ * The S3 object will be deleted after processing (default behavior). + * To disable S3 object deletion, use {@link #processLargeMessage(Object, Function, boolean)}. + *

+ * Example usage: + *

+     * String returnValueOfFunction = LargeMessages.processLargeMessage(sqsMessage, this::handleMessage);
+     * String returnValueOfFunction = LargeMessages.processLargeMessage(sqsMessage, processedMsg -> processOrder(processedMsg, orderId, amount));
+     * 
+ * + * @param message the message to process (SQSMessage or SNSRecord) + * @param function the function to execute with the processed message + * @param the message type + * @param the return type of the function + * @return the result of the function execution + */ + public static R processLargeMessage(T message, Function function) { + return processLargeMessage(message, function, true); + } + + /** + * Process a large message and execute the function with the processed message. + *

+ * Example usage: + *

+     * String returnValueOfFunction = LargeMessages.processLargeMessage(sqsMessage, this::handleMessage, false);
+     * String returnValueOfFunction = LargeMessages.processLargeMessage(sqsMessage, processedMsg -> processOrder(processedMsg, orderId, amount), false);
+     * 
+ * + * @param message the message to process (SQSMessage or SNSRecord) + * @param function the function to execute with the processed message + * @param deleteS3Object whether to delete the S3 object after processing + * @param the message type + * @param the return type of the function + * @return the result of the function execution + */ + public static R processLargeMessage(T message, Function function, boolean deleteS3Object) { + Optional> processor = LargeMessageProcessorFactory.get(message); + + if (!processor.isPresent()) { + LOG.warn("Unsupported message type [{}], proceeding without large message processing", + message.getClass()); + return function.apply(message); + } + + try { + @SuppressWarnings("unchecked") + LargeMessageProcessor typedProcessor = (LargeMessageProcessor) processor.get(); + return typedProcessor.process(message, function::apply, deleteS3Object); + } catch (RuntimeException e) { + throw e; + } catch (Throwable t) { + throw new LargeMessageProcessingException("Failed to process large message", t); + } + } +} diff --git a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageAspect.java b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageAspect.java index 2aa81691f..0a8b93095 100644 --- a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageAspect.java +++ b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageAspect.java @@ -15,12 +15,14 @@ package software.amazon.lambda.powertools.largemessages.internal; import java.util.Optional; + import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import software.amazon.lambda.powertools.largemessages.LargeMessage; /** @@ -31,17 +33,17 @@ public class LargeMessageAspect { private static final Logger LOG = LoggerFactory.getLogger(LargeMessageAspect.class); - @SuppressWarnings({"EmptyMethod"}) + @SuppressWarnings({ "EmptyMethod" }) @Pointcut("@annotation(largeMessage)") public void callAt(LargeMessage largeMessage) { + // Pointcut method - body intentionally empty } + @SuppressWarnings("unchecked") @Around(value = "callAt(largeMessage) && execution(@LargeMessage * *.*(..))", argNames = "pjp,largeMessage") - public Object around(ProceedingJoinPoint pjp, - LargeMessage largeMessage) throws Throwable { + public Object around(ProceedingJoinPoint pjp, LargeMessage largeMessage) throws Throwable { Object[] proceedArgs = pjp.getArgs(); - // we need a message to process if (proceedArgs.length == 0) { LOG.warn("@LargeMessage annotation is placed on a method without any message to process, proceeding"); return pjp.proceed(proceedArgs); @@ -56,7 +58,8 @@ public Object around(ProceedingJoinPoint pjp, return pjp.proceed(proceedArgs); } - return largeMessageProcessor.get().process(pjp, largeMessage.deleteS3Object()); + return ((LargeMessageProcessor) largeMessageProcessor.get()).process(message, + msg -> pjp.proceed(proceedArgs), largeMessage.deleteS3Object()); } } diff --git a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageFunction.java b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageFunction.java new file mode 100644 index 000000000..1690eedac --- /dev/null +++ b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageFunction.java @@ -0,0 +1,35 @@ +/* + * Copyright 2023 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package software.amazon.lambda.powertools.largemessages.internal; + +/** + * Functional interface for large message processing. + *

+ * This interface is similar to {@link java.util.function.Function} but throws {@link Throwable} + * instead of no exceptions. This is necessary to support AspectJ's {@code ProceedingJoinPoint.proceed()} + * which throws {@code Throwable}, allowing exceptions to bubble up naturally without wrapping. + *

+ * This interface should not be exposed to user-facing APIs such as + * {@link software.amazon.lambda.powertools.largemessages.LargeMessages}. These should use plain + * {@link java.util.function.Function}. + * + * @param the input type (message type) + * @param the return type of the function + */ +@FunctionalInterface +public interface LargeMessageFunction { + @SuppressWarnings("java:S112") // Throwable is required for AspectJ compatibility + R apply(T processedMessage) throws Throwable; +} diff --git a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageProcessor.java b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageProcessor.java index 5478931f1..c41af0cea 100644 --- a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageProcessor.java +++ b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageProcessor.java @@ -17,9 +17,10 @@ import static java.lang.String.format; import java.nio.charset.StandardCharsets; -import org.aspectj.lang.ProceedingJoinPoint; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.lambda.powertools.largemessages.LargeMessageConfig; @@ -28,33 +29,44 @@ import software.amazon.payloadoffloading.S3Dao; /** - * Abstract processor for Large Messages. Handle the download from S3 and replace the actual S3 pointer with the content - * of the S3 Object leveraging the payloadoffloading library. + * Abstract processor for Large Messages. + *

+ * Handles the download from S3 and replaces the S3 pointer with the actual content + * of the S3 Object, leveraging the payloadoffloading library. * - * @param any message type that support Large Messages with S3 pointers - * ({@link com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage} and {@link com.amazonaws.services.lambda.runtime.events.SNSEvent.SNSRecord} at the moment) + * @param any message type that supports Large Messages with S3 pointers + * ({@link com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage} + * and {@link com.amazonaws.services.lambda.runtime.events.SNSEvent.SNSRecord}) */ -abstract class LargeMessageProcessor { +public abstract class LargeMessageProcessor { protected static final String RESERVED_ATTRIBUTE_NAME = "ExtendedPayloadSize"; private static final Logger LOG = LoggerFactory.getLogger(LargeMessageProcessor.class); private final S3Client s3Client = LargeMessageConfig.get().getS3Client(); private final S3BackedPayloadStore payloadStore = new S3BackedPayloadStore(new S3Dao(s3Client), "DUMMY"); - public Object process(ProceedingJoinPoint pjp, boolean deleteS3Object) throws Throwable { - Object[] proceedArgs = pjp.getArgs(); - T message = (T) proceedArgs[0]; - + /** + * Process a large message using a functional interface. + * + * @param message the message to process + * @param function the function to execute with the processed message + * @param deleteS3Object whether to delete the S3 object after processing + * @param the return type of the wrapped function + * @return the result of the function execution + * @throws Throwable if an error occurs during processing + */ + public R process(T message, LargeMessageFunction function, boolean deleteS3Object) throws Throwable { if (!isLargeMessage(message)) { LOG.warn("Not a large message, proceeding"); - return pjp.proceed(proceedArgs); + return function.apply(message); } String payloadPointer = getMessageContent(message); if (payloadPointer == null) { LOG.warn("No content in the message, proceeding"); - return pjp.proceed(proceedArgs); + return function.apply(message); } + // legacy attribute (sqs only) payloadPointer = payloadPointer.replace("com.amazon.sqs.javamessaging.MessageS3Pointer", "software.amazon.payloadoffloading.PayloadS3Pointer"); @@ -73,7 +85,7 @@ public Object process(ProceedingJoinPoint pjp, boolean deleteS3Object) throws Th updateMessageContent(message, s3ObjectContent); removeLargeMessageAttributes(message); - Object response = pjp.proceed(proceedArgs); + R result = function.apply(message); if (deleteS3Object) { if (LOG.isInfoEnabled()) { @@ -82,45 +94,45 @@ public Object process(ProceedingJoinPoint pjp, boolean deleteS3Object) throws Th deleteS3Object(payloadPointer); } - return response; + return result; } /** - * Retrieve the message id + * Retrieve the message ID. * - * @param message the message itself - * @return the id of the message (String format) + * @param message the message + * @return the message ID */ protected abstract String getMessageId(T message); /** - * Retrieve the content of the message (ex: body of an SQSMessage) + * Retrieve the content of the message (e.g., body of an SQSMessage). * - * @param message the message itself - * @return the content of the message (String format) + * @param message the message + * @return the message content */ protected abstract String getMessageContent(T message); /** - * Update the message content of the message (ex: body of an SQSMessage) + * Update the message content (e.g., body of an SQSMessage). * - * @param message the message itself - * @param messageContent the new content of the message (String format) + * @param message the message + * @param messageContent the new message content */ protected abstract void updateMessageContent(T message, String messageContent); /** - * Check if the message is actually a large message (indicator in message attributes) + * Check if the message is a large message (based on message attributes). * - * @param message the message itself - * @return true if the message is a large message + * @param message the message + * @return true if the message is a large message, false otherwise */ protected abstract boolean isLargeMessage(T message); /** - * Remove the large message indicator (in message attributes) + * Remove the large message indicator from message attributes. * - * @param message the message itself + * @param message the message */ protected abstract void removeLargeMessageAttributes(T message); diff --git a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageProcessorFactory.java b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageProcessorFactory.java index 26c33738a..06ee92968 100644 --- a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageProcessorFactory.java +++ b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageProcessorFactory.java @@ -14,14 +14,15 @@ package software.amazon.lambda.powertools.largemessages.internal; +import java.util.Optional; + import com.amazonaws.services.lambda.runtime.events.SNSEvent.SNSRecord; import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; -import java.util.Optional; -class LargeMessageProcessorFactory { +public final class LargeMessageProcessorFactory { private LargeMessageProcessorFactory() { - // not intended to be instantiated + // Utility class } public static Optional> get(Object message) { diff --git a/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/LargeMessagesTest.java b/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/LargeMessagesTest.java new file mode 100644 index 000000000..7bd3e80e2 --- /dev/null +++ b/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/LargeMessagesTest.java @@ -0,0 +1,287 @@ +/* + * Copyright 2023 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package software.amazon.lambda.powertools.largemessages; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayInputStream; +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord; +import com.amazonaws.services.lambda.runtime.events.SNSEvent; +import com.amazonaws.services.lambda.runtime.events.SNSEvent.SNS; +import com.amazonaws.services.lambda.runtime.events.SNSEvent.SNSRecord; +import com.amazonaws.services.lambda.runtime.events.SQSEvent.MessageAttribute; +import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; + +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.http.AbortableInputStream; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.S3Exception; + +@ExtendWith(MockitoExtension.class) +class LargeMessagesTest { + + private static final String BIG_MSG = "A biiiiiiiig message"; + private static final String BUCKET_NAME = "bucketname"; + private static final String BUCKET_KEY = "c71eb2ae-37e0-4265-8909-32f4153faddf"; + private static final String BIG_MESSAGE_BODY = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\", {\"s3BucketName\":\"" + + BUCKET_NAME + + "\", \"s3Key\":\"" + BUCKET_KEY + "\"}]"; + + @Mock + private S3Client s3Client; + + @BeforeEach + void init() throws NoSuchFieldException, IllegalAccessException { + // need to clean the s3Client with introspection (singleton) + Field client = LargeMessageConfig.class.getDeclaredField("s3Client"); + client.setAccessible(true); + client.set(LargeMessageConfig.get(), null); + LargeMessageConfig.init().withS3Client(s3Client); + } + + @Test + void testProcessLargeSQSMessage_shouldRetrieveFromS3AndDelete() { + // given + when(s3Client.getObject(any(GetObjectRequest.class))).thenReturn(s3ObjectWithLargeMessage()); + SQSMessage sqsMessage = sqsMessageWithBody(BIG_MESSAGE_BODY, true); + + // when + String result = LargeMessages.processLargeMessage(sqsMessage, SQSMessage::getBody); + + // then + assertThat(result).isEqualTo(BIG_MSG); + ArgumentCaptor delete = ArgumentCaptor.forClass(DeleteObjectRequest.class); + verify(s3Client).deleteObject(delete.capture()); + assertThat(delete.getValue().bucket()).isEqualTo(BUCKET_NAME); + assertThat(delete.getValue().key()).isEqualTo(BUCKET_KEY); + } + + @Test + void testProcessLargeSQSMessage_withDeleteDisabled_shouldNotDelete() { + // given + when(s3Client.getObject(any(GetObjectRequest.class))).thenReturn(s3ObjectWithLargeMessage()); + SQSMessage sqsMessage = sqsMessageWithBody(BIG_MESSAGE_BODY, true); + + // when + String result = LargeMessages.processLargeMessage(sqsMessage, SQSMessage::getBody, false); + + // then + assertThat(result).isEqualTo(BIG_MSG); + verify(s3Client, never()).deleteObject(any(DeleteObjectRequest.class)); + } + + @Test + void testProcessLargeSNSMessage_shouldRetrieveFromS3AndDelete() { + // given + when(s3Client.getObject(any(GetObjectRequest.class))).thenReturn(s3ObjectWithLargeMessage()); + SNSRecord snsRecord = snsRecordWithMessage(BIG_MESSAGE_BODY, true); + + // when + String result = LargeMessages.processLargeMessage(snsRecord, msg -> msg.getSNS().getMessage()); + + // then + assertThat(result).isEqualTo(BIG_MSG); + verify(s3Client).deleteObject(any(DeleteObjectRequest.class)); + } + + @Test + void testProcessSmallMessage_shouldNotInteractWithS3() { + // given + SQSMessage sqsMessage = sqsMessageWithBody("Small message", false); + + // when + String result = LargeMessages.processLargeMessage(sqsMessage, SQSMessage::getBody); + + // then + assertThat(result).isEqualTo("Small message"); + verifyNoInteractions(s3Client); + } + + @Test + void testProcessUnsupportedMessageType_shouldCallHandlerDirectly() { + // given + KinesisEventRecord kinesisRecord = new KinesisEventRecord(); + kinesisRecord.setEventID("kinesis-123"); + + // when + String result = LargeMessages.processLargeMessage(kinesisRecord, KinesisEventRecord::getEventID); + + // then + assertThat(result).isEqualTo("kinesis-123"); + verifyNoInteractions(s3Client); + } + + @Test + void testProcessMessageWithNullBody_shouldCallHandler() { + // given + SQSMessage sqsMessage = sqsMessageWithBody(null, true); + + // when + String result = LargeMessages.processLargeMessage(sqsMessage, SQSMessage::getBody); + + // then + assertThat(result).isNull(); + verifyNoInteractions(s3Client); + } + + @Test + void testProcessMessage_whenS3GetFails_shouldThrowException() { + // given + when(s3Client.getObject(any(GetObjectRequest.class))) + .thenThrow(S3Exception.create("Access denied", new Exception("Permission denied"))); + SQSMessage sqsMessage = sqsMessageWithBody(BIG_MESSAGE_BODY, true); + + // when / then + assertThatThrownBy(() -> LargeMessages.processLargeMessage(sqsMessage, SQSMessage::getBody)) + .isInstanceOf(LargeMessageProcessingException.class) + .hasMessageContaining("Failed processing S3 record"); + } + + @Test + void testProcessMessage_whenS3DeleteFails_shouldThrowException() { + // given + when(s3Client.getObject(any(GetObjectRequest.class))).thenReturn(s3ObjectWithLargeMessage()); + when(s3Client.deleteObject(any(DeleteObjectRequest.class))) + .thenThrow(S3Exception.create("Access denied", new Exception("Permission denied"))); + SQSMessage sqsMessage = sqsMessageWithBody(BIG_MESSAGE_BODY, true); + + // when / then + assertThatThrownBy(() -> LargeMessages.processLargeMessage(sqsMessage, SQSMessage::getBody)) + .isInstanceOf(LargeMessageProcessingException.class) + .hasMessageContaining("Failed deleting S3 record"); + } + + @Test + void testProcessMessage_whenHandlerThrowsRuntimeException_shouldPropagate() { + // given + when(s3Client.getObject(any(GetObjectRequest.class))).thenReturn(s3ObjectWithLargeMessage()); + SQSMessage sqsMessage = sqsMessageWithBody(BIG_MESSAGE_BODY, true); + + // when / then + assertThatThrownBy(() -> LargeMessages.processLargeMessage(sqsMessage, msg -> { + throw new IllegalStateException("Handler error"); + })) + .isInstanceOf(IllegalStateException.class) + .hasMessage("Handler error"); + } + + @Test + void testProcessLargeMessage_withMultiParam_shouldRetrieveFromS3AndDelete() { + // given + when(s3Client.getObject(any(GetObjectRequest.class))).thenReturn(s3ObjectWithLargeMessage()); + SQSMessage sqsMessage = sqsMessageWithBody(BIG_MESSAGE_BODY, true); + String orderId = "order-123"; + + // when + String result = LargeMessages.processLargeMessage(sqsMessage, msg -> processOrderSimple(msg, orderId)); + + // then + assertThat(result).isEqualTo("order-123-processed"); + verify(s3Client).deleteObject(any(DeleteObjectRequest.class)); + } + + @Test + void testProcessLargeMessage_withMultiParamAndDeleteDisabled_shouldNotDelete() { + // given + when(s3Client.getObject(any(GetObjectRequest.class))).thenReturn(s3ObjectWithLargeMessage()); + SQSMessage sqsMessage = sqsMessageWithBody(BIG_MESSAGE_BODY, true); + String orderId = "order-456"; + + // when + String result = LargeMessages.processLargeMessage(sqsMessage, msg -> processOrderSimple(msg, orderId), false); + + // then + assertThat(result).isEqualTo("order-456-processed"); + verify(s3Client, never()).deleteObject(any(DeleteObjectRequest.class)); + } + + @Test + void testProcessLargeMessage_shouldModifyMessageInPlace() { + // given + when(s3Client.getObject(any(GetObjectRequest.class))).thenReturn(s3ObjectWithLargeMessage()); + SQSMessage sqsMessage = sqsMessageWithBody(BIG_MESSAGE_BODY, true); + String originalBody = sqsMessage.getBody(); + + // when + LargeMessages.processLargeMessage(sqsMessage, msg -> { + assertThat(msg.getBody()).isEqualTo(BIG_MSG); + return null; + }); + + // then - verify the original message object was modified + assertThat(sqsMessage.getBody()).isEqualTo(BIG_MSG); + assertThat(sqsMessage.getBody()).isNotEqualTo(originalBody); + } + + private String processOrderSimple(SQSMessage message, String orderId) { + assertThat(message.getBody()).isEqualTo(BIG_MSG); + return orderId + "-processed"; + } + + private ResponseInputStream s3ObjectWithLargeMessage() { + return new ResponseInputStream<>(GetObjectResponse.builder().build(), + AbortableInputStream.create(new ByteArrayInputStream(BIG_MSG.getBytes()))); + } + + private SQSMessage sqsMessageWithBody(String messageBody, boolean largeMessage) { + SQSMessage sqsMessage = new SQSMessage(); + sqsMessage.setBody(messageBody); + if (messageBody != null) { + sqsMessage.setMd5OfBody("dummy-md5"); + } + + if (largeMessage) { + Map attributeMap = new HashMap<>(); + MessageAttribute payloadAttribute = new MessageAttribute(); + payloadAttribute.setStringValue("300450"); + payloadAttribute.setDataType("Number"); + attributeMap.put("ExtendedPayloadSize", payloadAttribute); + + sqsMessage.setMessageAttributes(attributeMap); + sqsMessage.setMd5OfMessageAttributes("dummy-md5"); + } + return sqsMessage; + } + + private SNSRecord snsRecordWithMessage(String messageBody, boolean largeMessage) { + SNS sns = new SNS().withMessage(messageBody); + if (largeMessage) { + sns.setMessageAttributes(Collections.singletonMap("ExtendedPayloadSize", + new SNSEvent.MessageAttribute())); + } + return new SNSRecord().withSns(sns); + } +} diff --git a/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageAspectTest.java b/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageAspectTest.java index c364a89d9..b84709ddc 100644 --- a/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageAspectTest.java +++ b/powertools-large-messages/src/test/java/software/amazon/lambda/powertools/largemessages/internal/LargeMessageAspectTest.java @@ -22,17 +22,9 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; -import static org.mockito.MockitoAnnotations.openMocks; import static software.amazon.lambda.powertools.largemessages.internal.LargeSQSMessageProcessor.calculateMessageAttributesMd5; import static software.amazon.lambda.powertools.largemessages.internal.LargeSQSMessageProcessor.calculateMessageBodyMd5; -import com.amazonaws.services.lambda.runtime.Context; -import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord; -import com.amazonaws.services.lambda.runtime.events.SNSEvent; -import com.amazonaws.services.lambda.runtime.events.SNSEvent.SNS; -import com.amazonaws.services.lambda.runtime.events.SNSEvent.SNSRecord; -import com.amazonaws.services.lambda.runtime.events.SQSEvent.MessageAttribute; -import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; import java.io.ByteArrayInputStream; import java.lang.reflect.Field; import java.nio.ByteBuffer; @@ -41,11 +33,22 @@ import java.util.HashMap; import java.util.Map; import java.util.function.Consumer; + import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord; +import com.amazonaws.services.lambda.runtime.events.SNSEvent; +import com.amazonaws.services.lambda.runtime.events.SNSEvent.SNS; +import com.amazonaws.services.lambda.runtime.events.SNSEvent.SNSRecord; +import com.amazonaws.services.lambda.runtime.events.SQSEvent.MessageAttribute; +import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; + import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.http.AbortableInputStream; import software.amazon.awssdk.services.s3.S3Client; @@ -53,11 +56,13 @@ import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.S3Exception; +import software.amazon.lambda.powertools.common.stubs.TestLambdaContext; import software.amazon.lambda.powertools.largemessages.LargeMessage; import software.amazon.lambda.powertools.largemessages.LargeMessageConfig; import software.amazon.lambda.powertools.largemessages.LargeMessageProcessingException; -public class LargeMessageAspectTest { +@ExtendWith(MockitoExtension.class) +class LargeMessageAspectTest { private static final String BIG_MSG = "A biiiiiiiig message"; private static final String BIG_MSG_MD5 = "919ebd392d8cb7161f95cb612a903d42"; @@ -65,19 +70,17 @@ public class LargeMessageAspectTest { private static final String BUCKET_NAME = "bucketname"; private static final String BUCKET_KEY = "c71eb2ae-37e0-4265-8909-32f4153faddf"; - private static final String BIG_MESSAGE_BODY = - "[\"software.amazon.payloadoffloading.PayloadS3Pointer\", {\"s3BucketName\":\"" + BUCKET_NAME + - "\", \"s3Key\":\"" + BUCKET_KEY + "\"}]"; + private static final String BIG_MESSAGE_BODY = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\", {\"s3BucketName\":\"" + + BUCKET_NAME + + "\", \"s3Key\":\"" + BUCKET_KEY + "\"}]"; @Mock private S3Client s3Client; - @Mock - private Context context; + + private final TestLambdaContext context = new TestLambdaContext(); @BeforeEach - public void init() throws NoSuchFieldException, IllegalAccessException { - openMocks(this); - setupContext(); + void init() throws NoSuchFieldException, IllegalAccessException { // need to clean the s3Client with introspection (singleton) Field client = LargeMessageConfig.class.getDeclaredField("s3Client"); client.setAccessible(true); @@ -86,13 +89,13 @@ public void init() throws NoSuchFieldException, IllegalAccessException { } @LargeMessage - private String processSQSMessage(SQSMessage sqsMessage, Context context) { + private String processSQSMessage(SQSMessage sqsMessage, TestLambdaContext context) { return sqsMessage.getBody(); } @LargeMessage private String processSQSMessageWithMd5Checks(SQSMessage transformedMessage, String initialBodyMD5, - String initialAttributesMD5) { + String initialAttributesMD5) { assertThat(transformedMessage.getMd5OfBody()).isNotEqualTo(initialBodyMD5); assertThat(transformedMessage.getMd5OfBody()).isEqualTo(BIG_MSG_MD5); @@ -108,7 +111,7 @@ private String processSNSMessageWithoutContext(SNSRecord snsRecord) { } @LargeMessage(deleteS3Object = false) - private String processSQSMessageNoDelete(SQSMessage sqsMessage, Context context) { + private String processSQSMessageNoDelete(SQSMessage sqsMessage, TestLambdaContext context) { return sqsMessage.getBody(); } @@ -122,8 +125,15 @@ private String processNoMessage() { return "Hello World"; } + @LargeMessage + private void verifyMessageObjectIsModified(SQSMessage sqsMessage) { + // This test verifies the message object itself is modified, not a copy + assertThat(sqsMessage.getBody()).isEqualTo(BIG_MSG); + assertThat(sqsMessage.getMd5OfBody()).isEqualTo(BIG_MSG_MD5); + } + @Test - public void testLargeSQSMessageWithDefaultDeletion() { + void testLargeSQSMessageWithDefaultDeletion() { // given when(s3Client.getObject(any(GetObjectRequest.class))).thenReturn(s3ObjectWithLargeMessage()); SQSMessage sqsMessage = sqsMessageWithBody(BIG_MESSAGE_BODY, true); @@ -136,8 +146,7 @@ public void testLargeSQSMessageWithDefaultDeletion() { ArgumentCaptor delete = ArgumentCaptor.forClass(DeleteObjectRequest.class); verify(s3Client).deleteObject(delete.capture()); Assertions.assertThat(delete.getValue()) - .satisfies((Consumer) deleteObjectRequest -> - { + .satisfies((Consumer) deleteObjectRequest -> { assertThat(deleteObjectRequest.bucket()) .isEqualTo(BUCKET_NAME); @@ -147,7 +156,7 @@ public void testLargeSQSMessageWithDefaultDeletion() { } @Test - public void testLargeSQSMessage_shouldChangeMd5OfBodyAndAttributes() { + void testLargeSQSMessage_shouldChangeMd5OfBodyAndAttributes() { // given when(s3Client.getObject(any(GetObjectRequest.class))).thenReturn(s3ObjectWithLargeMessage()); @@ -179,12 +188,12 @@ public void testLargeSQSMessage_shouldChangeMd5OfBodyAndAttributes() { } @Test - public void testLargeSNSMessageWithDefaultDeletion() { + void testLargeSNSMessageWithDefaultDeletion() { // given when(s3Client.getObject(any(GetObjectRequest.class))).thenReturn(s3ObjectWithLargeMessage()); SNSRecord snsRecord = snsRecordWithMessage(BIG_MESSAGE_BODY, true); - //when + // when String message = processSNSMessageWithoutContext(snsRecord); // then @@ -192,8 +201,7 @@ public void testLargeSNSMessageWithDefaultDeletion() { ArgumentCaptor delete = ArgumentCaptor.forClass(DeleteObjectRequest.class); verify(s3Client).deleteObject(delete.capture()); Assertions.assertThat(delete.getValue()) - .satisfies((Consumer) deleteObjectRequest -> - { + .satisfies((Consumer) deleteObjectRequest -> { assertThat(deleteObjectRequest.bucket()) .isEqualTo(BUCKET_NAME); @@ -203,7 +211,7 @@ public void testLargeSNSMessageWithDefaultDeletion() { } @Test - public void testLargeSQSMessageWithNoDeletion_shouldNotDelete() { + void testLargeSQSMessageWithNoDeletion_shouldNotDelete() { // given when(s3Client.getObject(any(GetObjectRequest.class))).thenReturn(s3ObjectWithLargeMessage()); SQSMessage sqsMessage = sqsMessageWithBody(BIG_MESSAGE_BODY, true); @@ -217,7 +225,7 @@ public void testLargeSQSMessageWithNoDeletion_shouldNotDelete() { } @Test - public void testKinesisMessage_shouldProceedWithoutS3() { + void testKinesisMessage_shouldProceedWithoutS3() { // given KinesisEventRecord kinesisEventRecord = new KinesisEventRecord(); kinesisEventRecord.setEventID("kinesis_id1234567890"); @@ -231,7 +239,7 @@ public void testKinesisMessage_shouldProceedWithoutS3() { } @Test - public void testNoMessage_shouldProceedWithoutS3() { + void testNoMessage_shouldProceedWithoutS3() { // when String message = processNoMessage(); @@ -241,7 +249,7 @@ public void testNoMessage_shouldProceedWithoutS3() { } @Test - public void testSmallMessage_shouldProceedWithoutS3() { + void testSmallMessage_shouldProceedWithoutS3() { // given SQSMessage sqsMessage = sqsMessageWithBody("This is small message", false); @@ -255,7 +263,7 @@ public void testSmallMessage_shouldProceedWithoutS3() { } @Test - public void testNullMessage_shouldProceedWithoutS3() { + void testNullMessage_shouldProceedWithoutS3() { // given SQSMessage sqsMessage = sqsMessageWithBody(null, true); @@ -268,7 +276,7 @@ public void testNullMessage_shouldProceedWithoutS3() { } @Test - public void testGetS3ObjectException_shouldThrowLargeMessageProcessingException() { + void testGetS3ObjectException_shouldThrowLargeMessageProcessingException() { // given when(s3Client.getObject(any(GetObjectRequest.class))).thenThrow(S3Exception.create("Permission denied", new Exception("User is not allowed to access bucket " + BUCKET_NAME))); @@ -281,7 +289,7 @@ public void testGetS3ObjectException_shouldThrowLargeMessageProcessingException( } @Test - public void testDeleteS3ObjectException_shouldThrowLargeMessageProcessingException() { + void testDeleteS3ObjectException_shouldThrowLargeMessageProcessingException() { // given when(s3Client.getObject(any(GetObjectRequest.class))).thenReturn(s3ObjectWithLargeMessage()); when(s3Client.deleteObject(any(DeleteObjectRequest.class))).thenThrow(S3Exception.create("Permission denied", @@ -294,6 +302,22 @@ public void testDeleteS3ObjectException_shouldThrowLargeMessageProcessingExcepti .hasMessage(format("Failed deleting S3 record [%s]", BIG_MESSAGE_BODY)); } + @Test + void testMessageObjectIsModifiedInPlace() { + // given + when(s3Client.getObject(any(GetObjectRequest.class))).thenReturn(s3ObjectWithLargeMessage()); + SQSMessage sqsMessage = sqsMessageWithBody(BIG_MESSAGE_BODY, true); + String originalBody = sqsMessage.getBody(); + + // when + verifyMessageObjectIsModified(sqsMessage); + + // then - verify the original message object was modified + assertThat(sqsMessage.getBody()).isEqualTo(BIG_MSG); + assertThat(sqsMessage.getBody()).isNotEqualTo(originalBody); + assertThat(sqsMessage.getMd5OfBody()).isEqualTo(BIG_MSG_MD5); + } + private ResponseInputStream s3ObjectWithLargeMessage() { return new ResponseInputStream<>(GetObjectResponse.builder().build(), AbortableInputStream.create(new ByteArrayInputStream(BIG_MSG.getBytes()))); @@ -304,7 +328,7 @@ private SQSMessage sqsMessageWithBody(String messageBody, boolean largeMessage) } private SQSMessage sqsMessageWithBody(String messageBody, boolean largeMessage, - Map optionalAttributes) { + Map optionalAttributes) { SQSMessage sqsMessage = new SQSMessage(); sqsMessage.setBody(messageBody); if (messageBody != null) { @@ -338,10 +362,4 @@ private SNSRecord snsRecordWithMessage(String messageBody, boolean largeMessage) return new SNSRecord().withSns(sns); } - private void setupContext() { - when(context.getFunctionName()).thenReturn("testFunction"); - when(context.getInvokedFunctionArn()).thenReturn("testArn"); - when(context.getFunctionVersion()).thenReturn("1"); - when(context.getMemoryLimitInMB()).thenReturn(1024); - } } diff --git a/powertools-large-messages/src/test/resources/simplelogger.properties b/powertools-large-messages/src/test/resources/simplelogger.properties new file mode 100644 index 000000000..559c22385 --- /dev/null +++ b/powertools-large-messages/src/test/resources/simplelogger.properties @@ -0,0 +1,7 @@ +org.slf4j.simpleLogger.logFile=target/large-messages-test.log +org.slf4j.simpleLogger.defaultLogLevel=warn +org.slf4j.simpleLogger.showDateTime=true +org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss:SSS +org.slf4j.simpleLogger.showThreadName=false +org.slf4j.simpleLogger.showLogName=true +org.slf4j.simpleLogger.showShortLogName=false