Commit ff29858

Author: Bob Strahan
> Add post-processing decompressor lambda with documentation updates
1 parent f38a649 commit ff29858

4 files changed: +249 -10 lines changed

docs/post-processing-lambda-hook.md

Lines changed: 46 additions & 6 deletions
@@ -15,18 +15,58 @@ The GenAIIDP solution supports an optional post-processing Lambda hook that enab
 2. **EventBridge Integration**
    - The solution automatically publishes a custom event to EventBridge
    - Event contains complete document processing details and output locations
-   - EventBridge rule matches the event pattern and triggers your Lambda function
-
-3. **Custom Processing**
-   - Your Lambda function receives the document processing event
-   - Function can access extraction results, confidence scores, and metadata
+   - EventBridge rule matches the event pattern and triggers the decompression lambda
+
+3. **Document Decompression** (New as of compression feature)
+   - An intermediate lambda function in the IDP stack receives the event
+   - If the document is compressed, it decompresses it using `idp_common.models.Document.load_document()`
+   - The decompressed document is reconstructed into the original payload format
+   - The decompression lambda then invokes your custom post-processing lambda
+
+4. **Custom Processing**
+   - Your Lambda function receives the document processing event with a **decompressed** document
+   - No need to import `idp_common` or handle decompression logic
+   - Function can directly access extraction results, confidence scores, and metadata
    - Implement custom logic for notifications, data transformation, or system integration
 
-4. **Error Handling**
+5. **Error Handling**
    - Built-in retry logic for transient failures
    - Dead letter queue for failed invocations
    - CloudWatch monitoring and alerting
 
+## Architecture
+
+```mermaid
+flowchart LR
+    SF[Step Function<br/>Workflow<br/>SUCCEEDED] --> EB[EventBridge<br/>Rule]
+    EB --> DL[Decompression<br/>Lambda<br/>in Stack]
+    DL --> |Check if<br/>compressed| DL
+    DL --> |If compressed:<br/>decompress from<br/>WorkingBucket| S3[(S3<br/>Working<br/>Bucket)]
+    DL --> |Invoke with<br/>decompressed<br/>payload| CPL[Your Custom<br/>Post-Processing<br/>Lambda]
+    DL --> |Handle<br/>errors| DLQ[Dead Letter<br/>Queue]
+
+    style DL fill:#90EE90
+    style CPL fill:#87CEEB
+    style S3 fill:#FFE4B5
+```
+
+**Key Components:**
+- **EventBridge Rule**: Triggers on StepFunction SUCCEEDED status
+- **Decompression Lambda** (in IDP stack): Handles document decompression transparently
+- **Your Custom Lambda** (external): Receives decompressed payload, no `idp_common` dependency needed
+- **S3 Working Bucket**: Temporary storage for compressed documents
+- **Dead Letter Queue**: Captures failed invocations for retry/analysis
+
+## Important Notes
+
+> **Backward Compatibility**: The decompression lambda was introduced to handle the document compression feature added in a previous release. Your custom post-processing lambda receives the document in its **original uncompressed format**, maintaining backward compatibility.
+
+> **No External Dependencies**: Your custom post-processing lambda does **not** need to import the `idp_common` package. The IDP stack's decompression lambda handles all compression/decompression internally.
+
+> **Payload Format**: The event payload structure remains unchanged from the original implementation. All document data, including sections, pages, and attributes, is provided in the standard format documented below.
+
+> **Performance**: The decompression step adds minimal latency (typically < 1 second) and is transparent to your custom lambda function.
+
 ## Configuration
 
 Configure the post-processing hook during stack deployment:
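
The documentation change above says the custom Lambda receives the event with a decompressed document and needs no `idp_common` import. A minimal sketch of such a consumer might look like the following. The function names `extract_document` and `handler` and the `sections` key are assumptions for illustration; the actual keys depend on what `Document.to_dict()` emits, which this commit does not show.

```python
import json
from typing import Any, Dict


def extract_document(event: Dict[str, Any]) -> Dict[str, Any]:
    """Return the document dict from the forwarded EventBridge event.

    Mirrors the two payload layouts named in the decompressor: the document
    at the root of the output, or wrapped in a "Result" object.
    """
    output = json.loads(event["detail"]["output"])
    if "document" in output:
        return output["document"]
    result = output.get("Result")
    if isinstance(result, dict) and "document" in result:
        return result["document"]
    # Fallback: treat the whole output as the document.
    return output


def handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """Hypothetical custom post-processor: counts sections in the document."""
    document = extract_document(event)
    # "sections" is an assumed key; adjust to the real payload format.
    sections = document.get("sections") or []
    return {"statusCode": 200, "sectionCount": len(sections)}
```

Because the decompressor invokes the custom Lambda asynchronously (`InvocationType='Event'`), the return value here is not delivered to the caller; it is mainly useful for local testing.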
Lines changed: 109 additions & 0 deletions
@@ -0,0 +1,109 @@
+# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+# SPDX-License-Identifier: MIT-0
+
+import boto3
+import json
+import os
+import logging
+from idp_common.models import Document
+from typing import Dict, Any
+
+logger = logging.getLogger()
+logger.setLevel(os.environ.get("LOG_LEVEL", "INFO"))
+
+lambda_client = boto3.client('lambda')
+
+CUSTOM_POST_PROCESSOR_ARN = os.environ['CUSTOM_POST_PROCESSOR_ARN']
+WORKING_BUCKET = os.environ['WORKING_BUCKET']
+
+
+def handler(event: Dict[str, Any], context) -> Dict[str, Any]:
+    """
+    Decompresses the document from StepFunction output and invokes custom post-processor lambda.
+
+    This lambda acts as an intermediary between EventBridge and the custom post-processing lambda,
+    handling document decompression so external lambdas don't need to import idp_common.
+
+    Args:
+        event: EventBridge event containing StepFunction execution details
+        context: Lambda context
+
+    Returns:
+        Response from custom post-processor lambda invocation
+    """
+    logger.info(f"Processing event for custom post-processor invocation")
+
+    try:
+        # Extract output data from event
+        output_data = None
+        if event.get('detail', {}).get('output'):
+            output_data = json.loads(event['detail']['output'])
+
+        if not output_data:
+            logger.error("No output data found in event")
+            raise ValueError("Missing output data in event")
+
+        # Extract document data - handle both Pattern 1 and Pattern 2/3 structures
+        document_data = None
+        if 'document' in output_data:
+            # Pattern 2/3 structure: at root level
+            document_data = output_data['document']
+            logger.info("Found document in output_data['document']")
+        elif 'Result' in output_data and 'document' in output_data.get('Result', {}):
+            # Pattern 1 structure: wrapped in Result
+            document_data = output_data['Result']['document']
+            logger.info("Found document in output_data['Result']['document']")
+        else:
+            logger.warning("Document not found in expected locations, using entire output")
+            document_data = output_data
+
+        # Check if document is compressed
+        is_compressed = isinstance(document_data, dict) and document_data.get('compressed', False)
+
+        if is_compressed:
+            logger.info(f"Document is compressed, decompressing from S3 URI: {document_data.get('s3_uri', 'N/A')}")
+
+            # Decompress document using idp_common
+            processed_doc = Document.load_document(document_data, WORKING_BUCKET, logger)
+
+            logger.info(f"Decompressed document: {processed_doc.num_pages} pages, "
+                       f"{len(processed_doc.sections)} sections")
+
+            # Reconstruct output_data with decompressed document
+            if 'document' in output_data:
+                output_data['document'] = processed_doc.to_dict()
+            elif 'Result' in output_data and 'document' in output_data.get('Result', {}):
+                output_data['Result']['document'] = processed_doc.to_dict()
+            else:
+                output_data = processed_doc.to_dict()
+
+            # Update event with decompressed payload
+            event['detail']['output'] = json.dumps(output_data)
+
+            logger.info("Document decompressed successfully")
+        else:
+            logger.info("Document is not compressed, passing through as-is")
+
+        # Invoke custom post-processor lambda with decompressed payload
+        logger.info(f"Invoking custom post-processor: {CUSTOM_POST_PROCESSOR_ARN}")
+
+        response = lambda_client.invoke(
+            FunctionName=CUSTOM_POST_PROCESSOR_ARN,
+            InvocationType='Event',  # Async invocation
+            Payload=json.dumps(event)
+        )
+
+        logger.info(f"Custom post-processor invoked successfully. StatusCode: {response['StatusCode']}")
+
+        return {
+            'statusCode': 200,
+            'body': json.dumps({
+                'message': 'Successfully invoked custom post-processor',
+                'customProcessorArn': CUSTOM_POST_PROCESSOR_ARN,
+                'documentCompressed': is_compressed
+            })
+        }
+
+    except Exception as e:
+        logger.error(f"Error processing event: {str(e)}", exc_info=True)
+        raise
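
The handler above looks up the document in the same three places twice: once to read it and once to write the decompressed version back. One way to keep those two branches in sync is to record where the document was found and reuse that path. The sketch below is a possible refactor, not part of this commit; `locate_document`, `replace_document`, and `is_compressed` are hypothetical helper names. It also tightens the `Result` check to require a dict, which is a small behavioral difference from the original `in` test.

```python
from typing import Any, Dict, Tuple

DocPath = Tuple[str, ...]


def locate_document(output_data: Dict[str, Any]) -> Tuple[Any, DocPath]:
    """Return (document_data, path); an empty path means the whole output."""
    if "document" in output_data:
        # Pattern 2/3 structure: document at root level
        return output_data["document"], ("document",)
    result = output_data.get("Result")
    if isinstance(result, dict) and "document" in result:
        # Pattern 1 structure: document wrapped in Result
        return result["document"], ("Result", "document")
    return output_data, ()


def replace_document(output_data: Dict[str, Any], path: DocPath, new_document: Any) -> Any:
    """Write new_document back at the path returned by locate_document."""
    if not path:
        return new_document
    parent = output_data
    for key in path[:-1]:
        parent = parent[key]
    parent[path[-1]] = new_document
    return output_data


def is_compressed(document_data: Any) -> bool:
    """True when the payload is a dict carrying a truthy 'compressed' flag."""
    return isinstance(document_data, dict) and bool(document_data.get("compressed", False))
```

With these helpers, the decompression branch would reduce to locating the document, calling `Document.load_document(...)` when `is_compressed` is true, and passing `processed_doc.to_dict()` to `replace_document`.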
Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
+# Post-processing decompressor lambda dependencies
+# Uses idp_common for document decompression


template.yaml

Lines changed: 92 additions & 4 deletions
@@ -3268,6 +3268,94 @@ Resources:
   ##########################################################################
   # Optional Post Processing Lambda Hook
   ##########################################################################
+
+  # Decompression Lambda - handles document decompression before invoking custom post-processor
+  PostProcessingDecompressorDLQ:
+    Type: AWS::SQS::Queue
+    Condition: ShouldEnablePostProcessingLambdaHook
+    Properties:
+      KmsMasterKeyId: !Ref CustomerManagedEncryptionKey
+      VisibilityTimeout: 30
+      MessageRetentionPeriod: 345600 # 4 days
+
+  PostProcessingDecompressorDLQPolicy:
+    Type: AWS::SQS::QueuePolicy
+    Condition: ShouldEnablePostProcessingLambdaHook
+    Properties:
+      Queues:
+        - !Ref PostProcessingDecompressorDLQ
+      PolicyDocument:
+        Version: "2012-10-17"
+        Statement:
+          - Sid: EnforceSSLOnly
+            Effect: Deny
+            Principal: "*"
+            Action: "sqs:*"
+            Resource: !GetAtt PostProcessingDecompressorDLQ.Arn
+            Condition:
+              Bool:
+                "aws:SecureTransport": false
+
+  PostProcessingDecompressor:
+    Type: AWS::Serverless::Function
+    Condition: ShouldEnablePostProcessingLambdaHook
+    Metadata:
+      cfn_nag:
+        rules_to_suppress:
+          - id: W89
+            reason: "Function does not require VPC access as it only interacts with AWS services via APIs"
+          - id: W92
+            reason: "Function does not require reserved concurrency as it scales based on demand"
+      # checkov:skip=CKV_AWS_117: "Function does not require VPC access as it only interacts with AWS services via APIs"
+      # checkov:skip=CKV_AWS_115: "Function does not require reserved concurrency as it scales based on demand"
+      # checkov:skip=CKV_AWS_173: "Environment variables do not contain sensitive data - only configuration values like feature flags and non-sensitive settings"
+    Properties:
+      PermissionsBoundary:
+        !If [
+          HasPermissionsBoundary,
+          !Ref PermissionsBoundaryArn,
+          !Ref AWS::NoValue,
+        ]
+      CodeUri: src/lambda/post_processing_decompressor/
+      Handler: index.handler
+      Runtime: python3.12
+      Timeout: 300
+      MemorySize: 512
+      Tracing: Active
+      DeadLetterQueue:
+        Type: SQS
+        TargetArn: !GetAtt PostProcessingDecompressorDLQ.Arn
+      LoggingConfig:
+        LogGroup: !Ref PostProcessingDecompressorLogGroup
+      Environment:
+        Variables:
+          LOG_LEVEL: !Ref LogLevel
+          CUSTOM_POST_PROCESSOR_ARN: !Ref PostProcessingLambdaHookFunctionArn
+          WORKING_BUCKET: !Ref WorkingBucket
+      Policies:
+        - S3CrudPolicy:
+            BucketName: !Ref WorkingBucket
+        - Statement:
+            - Effect: Allow
+              Action:
+                - lambda:InvokeFunction
+              Resource: !Ref PostProcessingLambdaHookFunctionArn
+            - Effect: Allow
+              Action:
+                - kms:Encrypt
+                - kms:Decrypt
+                - kms:ReEncrypt*
+                - kms:GenerateDataKey*
+                - kms:DescribeKey
+              Resource: !GetAtt CustomerManagedEncryptionKey.Arn
+
+  PostProcessingDecompressorLogGroup:
+    Type: AWS::Logs::LogGroup
+    Condition: ShouldEnablePostProcessingLambdaHook
+    Properties:
+      KmsKeyId: !GetAtt CustomerManagedEncryptionKey.Arn
+      RetentionInDays: !Ref LogRetentionDays
+
   PostProcessingLambdaHookRule:
     Type: AWS::Events::Rule
     Condition: ShouldEnablePostProcessingLambdaHook
@@ -3290,17 +3378,17 @@ Resources:
             - SUCCEEDED
       State: ENABLED
       Targets:
-        - Arn: !Ref PostProcessingLambdaHookFunctionArn
-          Id: PostProcessingLambdaHookFunction
+        - Arn: !GetAtt PostProcessingDecompressor.Arn
+          Id: PostProcessingDecompressor
           RetryPolicy:
             MaximumRetryAttempts: 3
 
-  PostProcessingLambdaHookPermission:
+  PostProcessingDecompressorPermission:
     Type: AWS::Lambda::Permission
     Condition: ShouldEnablePostProcessingLambdaHook
     Properties:
       Action: lambda:InvokeFunction
-      FunctionName: !Ref PostProcessingLambdaHookFunctionArn
+      FunctionName: !Ref PostProcessingDecompressor
       Principal: !Sub "events.${AWS::URLSuffix}"
       SourceArn: !GetAtt PostProcessingLambdaHookRule.Arn
 
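
With the rule now targeting `PostProcessingDecompressor`, it can help to exercise either Lambda locally with a synthetic event shaped like a Step Functions execution status change. The sketch below builds one. The helper name `build_succeeded_event`, the account ID, region, and state machine name are placeholders chosen for illustration, and only the fields the decompressor reads (`detail.output`) plus a few standard envelope fields are included.

```python
import json
from typing import Any, Dict


def build_succeeded_event(
    output: Dict[str, Any], execution_name: str = "example-execution"
) -> Dict[str, Any]:
    """Build a minimal EventBridge-style event for a SUCCEEDED execution.

    The ARNs are placeholders, not values from the stack.
    """
    prefix = "arn:aws:states:us-east-1:123456789012"
    return {
        "version": "0",
        "source": "aws.states",
        "detail-type": "Step Functions Execution Status Change",
        "detail": {
            "executionArn": f"{prefix}:execution:ExampleStateMachine:{execution_name}",
            "stateMachineArn": f"{prefix}:stateMachine:ExampleStateMachine",
            "name": execution_name,
            "status": "SUCCEEDED",
            # Step Functions delivers the execution output as a JSON string.
            "output": json.dumps(output),
        },
    }


if __name__ == "__main__":
    # An uncompressed document should pass through the decompressor unchanged.
    event = build_succeeded_event({"document": {"compressed": False}})
    print(json.dumps(event, indent=2))
```

Passing a payload without the `compressed` flag exercises the pass-through branch; a compressed payload would additionally need a matching object in the working bucket.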