Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions powertools-e2e-tests/handlers/largemessage-functional/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>software.amazon.lambda</groupId>
<artifactId>e2e-test-handlers-parent</artifactId>
<version>2.5.0</version>
</parent>

<artifactId>e2e-test-handler-largemessage-functional</artifactId>
<packaging>jar</packaging>
<name>E2E test handler – Large message functional</name>

<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>dynamodb</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.lambda</groupId>
<artifactId>powertools-large-messages</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.lambda</groupId>
<artifactId>powertools-logging-log4j</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.lambda</groupId>
<artifactId>powertools-logging</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-events</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2023 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package software.amazon.lambda.powertools.e2e;

import static software.amazon.lambda.powertools.logging.PowertoolsLogging.withLogging;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;

import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.utils.BinaryUtils;
import software.amazon.awssdk.utils.Md5Utils;
import software.amazon.lambda.powertools.largemessages.LargeMessages;

public class Function implements RequestHandler<SQSEvent, SQSBatchResponse> {

private static final String TABLE_FOR_ASYNC_TESTS = System.getenv("TABLE_FOR_ASYNC_TESTS");
private DynamoDbClient client;

public Function() {
if (client == null) {
client = DynamoDbClient.builder()
.httpClient(UrlConnectionHttpClient.builder().build())
.region(Region.of(System.getenv("AWS_REGION")))
.build();
}
}

public SQSBatchResponse handleRequest(SQSEvent event, Context context) {
return withLogging(context, () -> {
for (SQSMessage message : event.getRecords()) {
LargeMessages.processLargeMessage(message, msg -> processRawMessage(msg, context));
}
return SQSBatchResponse.builder().build();
});
}

private Void processRawMessage(SQSMessage sqsMessage, Context context) {
String bodyMD5 = md5(sqsMessage.getBody());
if (!sqsMessage.getMd5OfBody().equals(bodyMD5)) {
throw new SecurityException(
String.format("message digest does not match, expected %s, got %s", sqsMessage.getMd5OfBody(),
bodyMD5));
}

Map<String, AttributeValue> item = new HashMap<>();
item.put("functionName", AttributeValue.builder().s(context.getFunctionName()).build());
item.put("id", AttributeValue.builder().s(sqsMessage.getMessageId()).build());
item.put("bodyMD5", AttributeValue.builder().s(bodyMD5).build());
item.put("bodySize",
AttributeValue.builder().n(String.valueOf(sqsMessage.getBody().getBytes(StandardCharsets.UTF_8).length))
.build());

client.putItem(PutItemRequest.builder().tableName(TABLE_FOR_ASYNC_TESTS).item(item).build());

return null;
}

private String md5(String message) {
return BinaryUtils.toHex(Md5Utils.computeMD5Hash(message.getBytes(StandardCharsets.UTF_8)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
<Appenders>
<Console name="JsonAppender" target="SYSTEM_OUT">
<JsonTemplateLayout eventTemplateUri="classpath:LambdaJsonLayout.json" />
</Console>
</Appenders>
<Loggers>
<Root level="INFO">
<AppenderRef ref="JsonAppender"/>
</Root>
<Logger name="JsonLogger" level="INFO" additivity="false">
<AppenderRef ref="JsonAppender"/>
</Logger>
</Loggers>
</Configuration>
1 change: 1 addition & 0 deletions powertools-e2e-tests/handlers/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
<modules>
<module>batch</module>
<module>largemessage</module>
<module>largemessage-functional</module>
<module>largemessage_idempotent</module>
<module>logging-log4j</module>
<module>logging-logback</module>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,6 +41,7 @@
import software.amazon.lambda.powertools.testutils.Infrastructure;
import software.amazon.lambda.powertools.testutils.RetryUtils;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class LargeMessageE2ET {

private static final Logger LOG = LoggerFactory.getLogger(LargeMessageE2ET.class);
Expand All @@ -55,45 +57,59 @@ class LargeMessageE2ET {
.region(region)
.build();

private static Infrastructure infrastructure;
private static String functionName;
private static String bucketName;
private static String queueUrl;
private static String tableName;
private Infrastructure infrastructure;
private String functionName;
private String bucketName;
private String queueUrl;
private String tableName;
private String messageId;
private String currentPathToFunction;

private void setupInfrastructure(String pathToFunction) {
// Do not re-deploy the same function
if (pathToFunction.equals(currentPathToFunction)) {
return;
}

// Destroy any existing infrastructure before re-deploying
if (infrastructure != null) {
infrastructure.destroy();
}

@BeforeAll
@Timeout(value = 5, unit = TimeUnit.MINUTES)
static void setup() {
String random = UUID.randomUUID().toString().substring(0, 6);
bucketName = "largemessagebucket" + random;
String queueName = "largemessagequeue" + random;

infrastructure = Infrastructure.builder()
.testName(LargeMessageE2ET.class.getSimpleName())
.testName(LargeMessageE2ET.class.getSimpleName() + "-" + pathToFunction)
.queue(queueName)
.largeMessagesBucket(bucketName)
.pathToFunction("largemessage")
.pathToFunction(pathToFunction)
.timeoutInSeconds(60)
.build();

Map<String, String> outputs = infrastructure.deploy();
functionName = outputs.get(FUNCTION_NAME_OUTPUT);
queueUrl = outputs.get("QueueURL");
tableName = outputs.get("TableNameForAsyncTests");
currentPathToFunction = pathToFunction;

LOG.info("Testing '" + LargeMessageE2ET.class.getSimpleName() + "'");
LOG.info("Testing '{}' with {}", LargeMessageE2ET.class.getSimpleName(), pathToFunction);
}

@AfterAll
static void tearDown() {
void cleanup() {
if (infrastructure != null) {
infrastructure.destroy();
}
}

@AfterEach
void reset() {
void tearDown() {
reset();
}

private void reset() {
if (messageId != null) {
Map<String, AttributeValue> itemToDelete = new HashMap<>();
itemToDelete.put("functionName", AttributeValue.builder().s(functionName).build());
Expand All @@ -103,8 +119,12 @@ void reset() {
}
}

@Test
void bigSQSMessageOffloadedToS3_shouldLoadFromS3() throws IOException {
@ParameterizedTest
@ValueSource(strings = { "largemessage", "largemessage-functional" })
@Timeout(value = 5, unit = TimeUnit.MINUTES)
void bigSQSMessageOffloadedToS3_shouldLoadFromS3(String pathToFunction) throws IOException {
setupInfrastructure(pathToFunction);

// GIVEN
final ExtendedClientConfiguration extendedClientConfig = new ExtendedClientConfiguration()
.withPayloadSupportEnabled(s3Client, bucketName);
Expand Down Expand Up @@ -146,8 +166,12 @@ void bigSQSMessageOffloadedToS3_shouldLoadFromS3() throws IOException {
assertThat(items.get(0).get("bodyMD5").s()).isEqualTo("22bde5e7b05fa80bc7be45bdd4bc6c75");
}

@Test
void smallSQSMessage_shouldNotReadFromS3() {
@ParameterizedTest
@ValueSource(strings = { "largemessage", "largemessage-functional" })
@Timeout(value = 5, unit = TimeUnit.MINUTES)
void smallSQSMessage_shouldNotReadFromS3(String pathToFunction) {
setupInfrastructure(pathToFunction);

// GIVEN
final ExtendedClientConfiguration extendedClientConfig = new ExtendedClientConfiguration()
.withPayloadSupportEnabled(s3Client, bucketName);
Expand Down
18 changes: 15 additions & 3 deletions powertools-large-messages/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<description>A suite of utilities for AWS Lambda Functions that makes handling large messages in SQS and SNS easier.</description>
<description>A suite of utilities for AWS Lambda Functions that makes handling large messages in SQS and SNS easier.</description>

<parent>
<groupId>software.amazon.lambda</groupId>
Expand All @@ -41,6 +41,13 @@
<groupId>software.amazon.lambda</groupId>
<artifactId>powertools-common</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.lambda</groupId>
<artifactId>powertools-common</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-events</artifactId>
Expand Down Expand Up @@ -102,6 +109,11 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@
import java.lang.annotation.Target;

/**
* <p>Use this annotation to handle large messages (> 256 KB) from SQS or SNS.
* <p>Use this annotation to handle large messages (> 1 MB) from SQS or SNS.
* When large messages are sent to an SQS Queue or SNS Topic, they are offloaded to S3 and only a reference is passed in the message/record.</p>
*
* <p>{@code @LargeMessage} automatically retrieves and deletes messages
* which have been offloaded to S3 using the {@code amazon-sqs-java-extended-client-lib} or {@code amazon-sns-java-extended-client-lib}
* client libraries.</p>
*
* <p>This version of the {@code @LargeMessage} is compatible with version
* 1.1.0+ of {@code amazon-sqs-java-extended-client-lib} / {@code amazon-sns-java-extended-client-lib}.</p>
* <p>This version of the {@code @LargeMessage} is compatible with version 1.1.0+ and 2.0.0+
* of {@code amazon-sqs-java-extended-client-lib} / {@code amazon-sns-java-extended-client-lib}.</p>
* <br/>
* <p>Put this annotation on a method where the first parameter is either a {@link com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage} or {@link com.amazonaws.services.lambda.runtime.events.SNSEvent.SNSRecord}.
* <br/>
Expand All @@ -54,9 +54,11 @@
* </pre>
* </p>
*
* <p><b>Note 1</b>: Retrieving payloads and deleting objects from S3 will increase the duration of the
* <p><b>Note 1</b>: The message object (SQSMessage or SNSRecord) is modified in-place to avoid duplicating
* the large blob in memory. The message body will be replaced with the S3 object content.</p>
* <p><b>Note 2</b>: Retrieving payloads and deleting objects from S3 will increase the duration of the
* Lambda function.</p>
* <p><b>Note 2</b>: Make sure to configure your function with enough memory to be able to retrieve S3 objects.</p>
* <p><b>Note 3</b>: Make sure to configure your function with enough memory to be able to retrieve S3 objects.</p>
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
Expand Down
Loading
Loading