diff --git a/powertools-e2e-tests/handlers/largemessage-functional/pom.xml b/powertools-e2e-tests/handlers/largemessage-functional/pom.xml
new file mode 100644
index 000000000..4afb633f2
--- /dev/null
+++ b/powertools-e2e-tests/handlers/largemessage-functional/pom.xml
@@ -0,0 +1,46 @@
+
+ 4.0.0
+
+
+ software.amazon.lambda
+ e2e-test-handlers-parent
+ 2.5.0
+
+
+ e2e-test-handler-largemessage-functional
+ jar
+ E2E test handler – Large message functional
+
+
+
+ software.amazon.awssdk
+ dynamodb
+
+
+ software.amazon.lambda
+ powertools-large-messages
+
+
+ software.amazon.lambda
+ powertools-logging-log4j
+
+
+ software.amazon.lambda
+ powertools-logging
+
+
+ com.amazonaws
+ aws-lambda-java-events
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-shade-plugin
+
+
+
+
diff --git a/powertools-e2e-tests/handlers/largemessage-functional/src/main/java/software/amazon/lambda/powertools/e2e/Function.java b/powertools-e2e-tests/handlers/largemessage-functional/src/main/java/software/amazon/lambda/powertools/e2e/Function.java
new file mode 100644
index 000000000..05a336500
--- /dev/null
+++ b/powertools-e2e-tests/handlers/largemessage-functional/src/main/java/software/amazon/lambda/powertools/e2e/Function.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2023 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package software.amazon.lambda.powertools.e2e;
+
+import static software.amazon.lambda.powertools.logging.PowertoolsLogging.withLogging;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.amazonaws.services.lambda.runtime.Context;
+import com.amazonaws.services.lambda.runtime.RequestHandler;
+import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
+import com.amazonaws.services.lambda.runtime.events.SQSEvent;
+import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
+
+import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
+import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
+import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
+import software.amazon.awssdk.utils.BinaryUtils;
+import software.amazon.awssdk.utils.Md5Utils;
+import software.amazon.lambda.powertools.largemessages.LargeMessages;
+
+public class Function implements RequestHandler {
+
+ private static final String TABLE_FOR_ASYNC_TESTS = System.getenv("TABLE_FOR_ASYNC_TESTS");
+ private DynamoDbClient client;
+
+ public Function() {
+ if (client == null) {
+ client = DynamoDbClient.builder()
+ .httpClient(UrlConnectionHttpClient.builder().build())
+ .region(Region.of(System.getenv("AWS_REGION")))
+ .build();
+ }
+ }
+
+ public SQSBatchResponse handleRequest(SQSEvent event, Context context) {
+ return withLogging(context, () -> {
+ for (SQSMessage message : event.getRecords()) {
+ LargeMessages.processLargeMessage(message, msg -> processRawMessage(msg, context));
+ }
+ return SQSBatchResponse.builder().build();
+ });
+ }
+
+ private Void processRawMessage(SQSMessage sqsMessage, Context context) {
+ String bodyMD5 = md5(sqsMessage.getBody());
+ if (!sqsMessage.getMd5OfBody().equals(bodyMD5)) {
+ throw new SecurityException(
+ String.format("message digest does not match, expected %s, got %s", sqsMessage.getMd5OfBody(),
+ bodyMD5));
+ }
+
+ Map item = new HashMap<>();
+ item.put("functionName", AttributeValue.builder().s(context.getFunctionName()).build());
+ item.put("id", AttributeValue.builder().s(sqsMessage.getMessageId()).build());
+ item.put("bodyMD5", AttributeValue.builder().s(bodyMD5).build());
+ item.put("bodySize",
+ AttributeValue.builder().n(String.valueOf(sqsMessage.getBody().getBytes(StandardCharsets.UTF_8).length))
+ .build());
+
+ client.putItem(PutItemRequest.builder().tableName(TABLE_FOR_ASYNC_TESTS).item(item).build());
+
+ return null;
+ }
+
+ private String md5(String message) {
+ return BinaryUtils.toHex(Md5Utils.computeMD5Hash(message.getBytes(StandardCharsets.UTF_8)));
+ }
+}
diff --git a/powertools-e2e-tests/handlers/largemessage-functional/src/main/resources/log4j2.xml b/powertools-e2e-tests/handlers/largemessage-functional/src/main/resources/log4j2.xml
new file mode 100644
index 000000000..8925f70b9
--- /dev/null
+++ b/powertools-e2e-tests/handlers/largemessage-functional/src/main/resources/log4j2.xml
@@ -0,0 +1,16 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/powertools-e2e-tests/handlers/pom.xml b/powertools-e2e-tests/handlers/pom.xml
index 86e867548..f171422c5 100644
--- a/powertools-e2e-tests/handlers/pom.xml
+++ b/powertools-e2e-tests/handlers/pom.xml
@@ -27,6 +27,7 @@
batchlargemessage
+ largemessage-functionallargemessage_idempotentlogging-log4jlogging-logback
diff --git a/powertools-e2e-tests/src/test/java/software/amazon/lambda/powertools/LargeMessageE2ET.java b/powertools-e2e-tests/src/test/java/software/amazon/lambda/powertools/LargeMessageE2ET.java
index 74247ca2e..0de2dca60 100644
--- a/powertools-e2e-tests/src/test/java/software/amazon/lambda/powertools/LargeMessageE2ET.java
+++ b/powertools-e2e-tests/src/test/java/software/amazon/lambda/powertools/LargeMessageE2ET.java
@@ -16,9 +16,10 @@
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +41,7 @@
import software.amazon.lambda.powertools.testutils.Infrastructure;
import software.amazon.lambda.powertools.testutils.RetryUtils;
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class LargeMessageE2ET {
private static final Logger LOG = LoggerFactory.getLogger(LargeMessageE2ET.class);
@@ -55,25 +57,34 @@ class LargeMessageE2ET {
.region(region)
.build();
- private static Infrastructure infrastructure;
- private static String functionName;
- private static String bucketName;
- private static String queueUrl;
- private static String tableName;
+ private Infrastructure infrastructure;
+ private String functionName;
+ private String bucketName;
+ private String queueUrl;
+ private String tableName;
private String messageId;
+ private String currentPathToFunction;
+
+ private void setupInfrastructure(String pathToFunction) {
+ // Do not re-deploy the same function
+ if (pathToFunction.equals(currentPathToFunction)) {
+ return;
+ }
+
+ // Destroy any existing infrastructure before re-deploying
+ if (infrastructure != null) {
+ infrastructure.destroy();
+ }
- @BeforeAll
- @Timeout(value = 5, unit = TimeUnit.MINUTES)
- static void setup() {
String random = UUID.randomUUID().toString().substring(0, 6);
bucketName = "largemessagebucket" + random;
String queueName = "largemessagequeue" + random;
infrastructure = Infrastructure.builder()
- .testName(LargeMessageE2ET.class.getSimpleName())
+ .testName(LargeMessageE2ET.class.getSimpleName() + "-" + pathToFunction)
.queue(queueName)
.largeMessagesBucket(bucketName)
- .pathToFunction("largemessage")
+ .pathToFunction(pathToFunction)
.timeoutInSeconds(60)
.build();
@@ -81,19 +92,24 @@ static void setup() {
functionName = outputs.get(FUNCTION_NAME_OUTPUT);
queueUrl = outputs.get("QueueURL");
tableName = outputs.get("TableNameForAsyncTests");
+ currentPathToFunction = pathToFunction;
- LOG.info("Testing '" + LargeMessageE2ET.class.getSimpleName() + "'");
+ LOG.info("Testing '{}' with {}", LargeMessageE2ET.class.getSimpleName(), pathToFunction);
}
@AfterAll
- static void tearDown() {
+ void cleanup() {
if (infrastructure != null) {
infrastructure.destroy();
}
}
@AfterEach
- void reset() {
+ void tearDown() {
+ reset();
+ }
+
+ private void reset() {
if (messageId != null) {
Map itemToDelete = new HashMap<>();
itemToDelete.put("functionName", AttributeValue.builder().s(functionName).build());
@@ -103,8 +119,12 @@ void reset() {
}
}
- @Test
- void bigSQSMessageOffloadedToS3_shouldLoadFromS3() throws IOException {
+ @ParameterizedTest
+ @ValueSource(strings = { "largemessage", "largemessage-functional" })
+ @Timeout(value = 5, unit = TimeUnit.MINUTES)
+ void bigSQSMessageOffloadedToS3_shouldLoadFromS3(String pathToFunction) throws IOException {
+ setupInfrastructure(pathToFunction);
+
// GIVEN
final ExtendedClientConfiguration extendedClientConfig = new ExtendedClientConfiguration()
.withPayloadSupportEnabled(s3Client, bucketName);
@@ -146,8 +166,12 @@ void bigSQSMessageOffloadedToS3_shouldLoadFromS3() throws IOException {
assertThat(items.get(0).get("bodyMD5").s()).isEqualTo("22bde5e7b05fa80bc7be45bdd4bc6c75");
}
- @Test
- void smallSQSMessage_shouldNotReadFromS3() {
+ @ParameterizedTest
+ @ValueSource(strings = { "largemessage", "largemessage-functional" })
+ @Timeout(value = 5, unit = TimeUnit.MINUTES)
+ void smallSQSMessage_shouldNotReadFromS3(String pathToFunction) {
+ setupInfrastructure(pathToFunction);
+
// GIVEN
final ExtendedClientConfiguration extendedClientConfig = new ExtendedClientConfiguration()
.withPayloadSupportEnabled(s3Client, bucketName);
diff --git a/powertools-large-messages/pom.xml b/powertools-large-messages/pom.xml
index a405941d7..bc7bbf03a 100644
--- a/powertools-large-messages/pom.xml
+++ b/powertools-large-messages/pom.xml
@@ -14,11 +14,11 @@
-->
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
-A suite of utilities for AWS Lambda Functions that makes handling large messages in SQS and SNS easier.
+ A suite of utilities for AWS Lambda Functions that makes handling large messages in SQS and SNS easier.software.amazon.lambda
@@ -41,6 +41,13 @@
software.amazon.lambdapowertools-common
+
+ software.amazon.lambda
+ powertools-common
+ ${project.version}
+ test-jar
+ test
+ com.amazonawsaws-lambda-java-events
@@ -102,6 +109,11 @@
mockito-coretest
+
+ org.mockito
+ mockito-junit-jupiter
+ test
+ org.slf4jslf4j-simple
diff --git a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessage.java b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessage.java
index 4e556966c..eb5b368a5 100644
--- a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessage.java
+++ b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessage.java
@@ -20,15 +20,15 @@
import java.lang.annotation.Target;
/**
- *
Use this annotation to handle large messages (> 256 KB) from SQS or SNS.
+ *
Use this annotation to handle large messages (> 1 MB) from SQS or SNS.
* When large messages are sent to an SQS Queue or SNS Topic, they are offloaded to S3 and only a reference is passed in the message/record.
*
*
{@code @LargeMessage} automatically retrieves and deletes messages
* which have been offloaded to S3 using the {@code amazon-sqs-java-extended-client-lib} or {@code amazon-sns-java-extended-client-lib}
* client libraries.
*
- *
This version of the {@code @LargeMessage} is compatible with version
- * 1.1.0+ of {@code amazon-sqs-java-extended-client-lib} / {@code amazon-sns-java-extended-client-lib}.
+ *
This version of the {@code @LargeMessage} is compatible with version 1.1.0+ and 2.0.0+
+ * of {@code amazon-sqs-java-extended-client-lib} / {@code amazon-sns-java-extended-client-lib}.
*
*
Put this annotation on a method where the first parameter is either a {@link com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage} or {@link com.amazonaws.services.lambda.runtime.events.SNSEvent.SNSRecord}.
*
@@ -54,9 +54,11 @@
*
*
*
- *
Note 1: Retrieving payloads and deleting objects from S3 will increase the duration of the
+ *
Note 1: The message object (SQSMessage or SNSRecord) is modified in-place to avoid duplicating
+ * the large blob in memory. The message body will be replaced with the S3 object content.
+ *
Note 2: Retrieving payloads and deleting objects from S3 will increase the duration of the
* Lambda function.
- *
Note 2: Make sure to configure your function with enough memory to be able to retrieve S3 objects.
+ *
Note 3: Make sure to configure your function with enough memory to be able to retrieve S3 objects.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
diff --git a/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessages.java b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessages.java
new file mode 100644
index 000000000..52675d3eb
--- /dev/null
+++ b/powertools-large-messages/src/main/java/software/amazon/lambda/powertools/largemessages/LargeMessages.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2023 Amazon.com, Inc. or its affiliates.
+ * Licensed under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package software.amazon.lambda.powertools.largemessages;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import software.amazon.lambda.powertools.largemessages.internal.LargeMessageProcessor;
+import software.amazon.lambda.powertools.largemessages.internal.LargeMessageProcessorFactory;
+
+/**
+ * Functional API for processing large messages without AspectJ.
+ *
+ * Use this class to handle large messages (> 1 MB) from SQS or SNS.
+ * When large messages are sent to an SQS Queue or SNS Topic, they are offloaded to S3 and only a reference is passed in the message/record.
+ *
+ * {@code LargeMessages} automatically retrieves and optionally deletes messages
+ * which have been offloaded to S3 using the {@code amazon-sqs-java-extended-client-lib} or {@code amazon-sns-java-extended-client-lib}
+ * client libraries.
+ *
+ * This version is compatible with version 1.1.0+ and 2.0.0+ of {@code amazon-sqs-java-extended-client-lib} / {@code amazon-sns-java-extended-client-lib}.
+ *
+ * Note 1: The message object (SQSMessage or SNSRecord) is modified in-place to avoid duplicating
+ * the large blob in memory. The message body will be replaced with the S3 object content.
+ *
+ * Note 2: Retrieving payloads and deleting objects from S3 will increase the duration of the Lambda function.
+ *
+ * Note 3: Make sure to configure your function with enough memory to be able to retrieve S3 objects.
+ *
+ * @see LargeMessage
+ */
+public final class LargeMessages {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LargeMessages.class);
+
+ private LargeMessages() {
+ // Utility class
+ }
+
+ /**
+ * Process a large message and execute the function with the processed message.
+ *
+ * The S3 object will be deleted after processing (default behavior).
+ * To disable S3 object deletion, use {@link #processLargeMessage(Object, Function, boolean)}.
+ *
+ *
+ * @param message the message to process (SQSMessage or SNSRecord)
+ * @param function the function to execute with the processed message
+ * @param the message type
+ * @param the return type of the function
+ * @return the result of the function execution
+ */
+ public static R processLargeMessage(T message, Function function) {
+ return processLargeMessage(message, function, true);
+ }
+
+ /**
+ * Process a large message and execute the function with the processed message.
+ *