
Commit b26869e

Merge branch 'fix/reprocess-starts-two-executions' into 'develop'

Fix duplicate Step Functions executions on document reprocess

See merge request genaiic-reusable-assets/engagement-artifacts/genaiic-idp-accelerator!341

2 parents: c53bbdb + b6d1dfb

4 files changed: +126 −37 lines

CHANGELOG.md

Lines changed: 6 additions & 2 deletions

```diff
@@ -24,9 +24,13 @@ SPDX-License-Identifier: MIT-0
 
 
 ### Fixed
-- Problem with setting correctly formatted WAF IPv4 CIDR range - #73
-
+- **Problem with setting correctly formatted WAF IPv4 CIDR range** - #73
 
+- **Duplicate Step Functions Executions on Document Reprocess - [GitHub Issue #66](https://github.com/aws-solutions-library-samples/accelerated-intelligent-document-processing-on-aws/issues/66)**
+  - Eliminated duplicate workflow executions when reprocessing large documents (>40MB, 500+ pages)
+  - **Root Cause**: S3 `copy_object` operations were triggering multiple "Object Created" events for large files, causing `queue_sender` to create duplicate document entries and workflow executions
+  - **Solution**: Refactored `reprocess_document_resolver` to directly create fresh Document objects and queue to SQS, completely bypassing S3 event notifications
+  - **Benefits**: Eliminates unnecessary S3 copy operations (cost savings)
 
 ## [0.3.18]
 
```
src/lambda/reprocess_document_resolver/index.py

Lines changed: 105 additions & 33 deletions

```diff
@@ -5,53 +5,125 @@
 import os
 import boto3
 import logging
-from datetime import datetime
+from datetime import datetime, timezone, timedelta
+
+# Import IDP Common modules
+from idp_common.models import Document, Status
+from idp_common.docs_service import create_document_service
 
 logger = logging.getLogger()
 logger.setLevel(os.environ.get('LOG_LEVEL', 'INFO'))
 
+# Initialize AWS clients
+sqs_client = boto3.client('sqs')
 s3_client = boto3.client('s3')
 
-def handler(event, context):
-    logger.info(f"Event: {json.dumps(event)}")
+# Initialize document service (same as queue_sender - defaults to AppSync)
+document_service = create_document_service()
 
+# Environment variables
+queue_url = os.environ.get('QUEUE_URL')
+input_bucket = os.environ.get('INPUT_BUCKET')
+output_bucket = os.environ.get('OUTPUT_BUCKET')
+retentionDays = int(os.environ.get('DATA_RETENTION_IN_DAYS', '365'))
+
+def handler(event, context):
+    logger.info(f"Reprocess resolver invoked with event: {json.dumps(event)}")
+
     try:
-        # Get the input bucket name from the environment variable
-        input_bucket = os.environ.get('INPUT_BUCKET')
+        # Validate environment variables
         if not input_bucket:
             raise Exception("INPUT_BUCKET environment variable is not set")
-
-        # Extract object keys from the arguments
+        if not output_bucket:
+            raise Exception("OUTPUT_BUCKET environment variable is not set")
+        if not queue_url:
+            raise Exception("QUEUE_URL environment variable is not set")
+
+        # Extract arguments from GraphQL event
         args = event.get('arguments', {})
         object_keys = args.get('objectKeys', [])
 
         if not object_keys:
-            return {
-                'statusCode': 400,
-                'body': 'No document keys provided'
-            }
-
-        logger.info(f"Reprocessing documents: {object_keys}")
+            logger.error("objectKeys is required but not provided")
+            return False
 
-        # Copy each object over itself to trigger the S3 event notification
-        for key in object_keys:
-            logger.info(f"Reprocessing document: {key}")
-
-            # Copy the object to itself using the copy_object API
-            s3_client.copy_object(
-                Bucket=input_bucket,
-                CopySource={'Bucket': input_bucket, 'Key': key},
-                Key=key,
-                MetadataDirective='REPLACE',
-                Metadata={
-                    'reprocessed': 'true',
-                    'reprocessed_timestamp': datetime.utcnow().isoformat()
-                }
-            )
-
-            logger.info(f"Successfully reprocessed document: {key}")
-
+        logger.info(f"Reprocessing {len(object_keys)} documents")
+
+        # Process each document
+        success_count = 0
+        for object_key in object_keys:
+            try:
+                reprocess_document(object_key)
+                success_count += 1
+            except Exception as e:
+                logger.error(f"Error reprocessing document {object_key}: {str(e)}", exc_info=True)
+                # Continue with other documents even if one fails
+
+        logger.info(f"Successfully queued {success_count}/{len(object_keys)} documents for reprocessing")
         return True
+
     except Exception as e:
-        logger.error(f"Error reprocessing documents: {str(e)}")
-        raise e
+        logger.error(f"Error in reprocess handler: {str(e)}", exc_info=True)
+        raise e
+
+def reprocess_document(object_key):
+    """
+    Reprocess a document by creating a fresh Document object and queueing it.
+    This exactly mirrors the queue_sender pattern for consistency and avoids
+    S3 copy operations that can trigger duplicate events for large files.
+    """
+    logger.info(f"Reprocessing document: {object_key}")
+
+    # Verify file exists in S3
+    try:
+        s3_client.head_object(Bucket=input_bucket, Key=object_key)
+    except Exception as e:
+        raise ValueError(f"Document {object_key} not found in S3 bucket {input_bucket}: {str(e)}")
+
+    # Create a fresh Document object (same as queue_sender does)
+    current_time = datetime.now(timezone.utc).isoformat()
+
+    document = Document(
+        id=object_key,  # Document ID is the object key
+        input_bucket=input_bucket,
+        input_key=object_key,
+        output_bucket=output_bucket,
+        status=Status.QUEUED,
+        queued_time=current_time,
+        initial_event_time=current_time,
+        pages={},
+        sections=[],
+    )
+
+    logger.info(f"Created fresh document object for reprocessing: {object_key}")
+
+    # Calculate expiry date (same as queue_sender)
+    expires_after = int((datetime.now(timezone.utc) + timedelta(days=retentionDays)).timestamp())
+
+    # Create document in DynamoDB via document service (same as queue_sender - uses AppSync by default)
+    logger.info(f"Creating document via document service: {document.input_key}")
+    created_key = document_service.create_document(document, expires_after=expires_after)
+    logger.info(f"Document created with key: {created_key}")
+
+    # Send serialized document to SQS queue (same as queue_sender)
+    doc_json = document.to_json()
+    message = {
+        'QueueUrl': queue_url,
+        'MessageBody': doc_json,
+        'MessageAttributes': {
+            'EventType': {
+                'StringValue': 'DocumentReprocessed',
+                'DataType': 'String'
+            },
+            'ObjectKey': {
+                'StringValue': object_key,
+                'DataType': 'String'
+            }
+        }
+    }
+    logger.info(f"Sending document to SQS queue: {object_key}")
+    response = sqs_client.send_message(**message)
+    logger.info(f"SQS response: {response}")
+
+    logger.info(f"Successfully reprocessed document: {object_key}")
+    return response.get('MessageId')
```
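
For context, here is a minimal local-invocation sketch (not part of the commit) showing the event shape the resolver expects. The bucket names, queue URL, and object key below are placeholders, and the module-level clients assume valid AWS credentials plus the AppSync settings read by `create_document_service()`:

```python
# Sketch only: set the environment variables the module reads at import time,
# then invoke the handler with a mock AppSync event.
import os

os.environ.setdefault("QUEUE_URL", "https://sqs.us-east-1.amazonaws.com/123456789012/DocumentQueue")  # placeholder
os.environ.setdefault("INPUT_BUCKET", "my-input-bucket")    # placeholder
os.environ.setdefault("OUTPUT_BUCKET", "my-output-bucket")  # placeholder

import index  # the reprocess_document_resolver module, assumed to be on sys.path

# AppSync passes the mutation arguments under "arguments"; the handler only
# reads arguments.objectKeys.
event = {"arguments": {"objectKeys": ["uploads/large-report.pdf"]}}  # hypothetical key

result = index.handler(event, None)
print(result)  # True when every key was queued for reprocessing
```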
Lines changed: 1 addition & 1 deletion

```diff
@@ -1 +1 @@
-boto3>=1.28.0
+./lib/idp_common_pkg[docs_service] # idp_common package with Document model and document service integration
```

template.yaml

Lines changed: 14 additions & 1 deletion

```diff
@@ -5689,12 +5689,25 @@ Resources:
         Variables:
           LOG_LEVEL: !Ref LogLevel
           INPUT_BUCKET: !Ref InputBucket
+          OUTPUT_BUCKET: !Ref OutputBucket
+          QUEUE_URL: !Ref DocumentQueue
+          APPSYNC_API_URL: !GetAtt GraphQLApi.GraphQLUrl
+          DATA_RETENTION_IN_DAYS: !Ref DataRetentionInDays
       LoggingConfig:
         LogGroup: !Ref ReprocessDocumentResolverFunctionLogGroup
       Policies:
-        - S3CrudPolicy:
+        - DynamoDBCrudPolicy:
+            TableName: !Ref TrackingTable
+        - SQSSendMessagePolicy:
+            QueueName: !GetAtt DocumentQueue.QueueName
+        - S3ReadPolicy:
             BucketName: !Ref InputBucket
         - Statement:
+            - Effect: Allow
+              Action:
+                - appsync:GraphQL
+              Resource:
+                - !Sub "${GraphQLApi.Arn}/types/Mutation/*"
            - Effect: Allow
              Action:
                - kms:Encrypt
```
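
After deployment, one way to sanity-check the fix is to confirm that a reprocess starts exactly one new Step Functions execution per document rather than two. A hedged verification sketch, with the state machine ARN as a placeholder for the stack's document-processing workflow:

```python
# Verification sketch (placeholder ARN): list executions started recently and
# confirm only one appears per reprocessed document.
import boto3
from datetime import datetime, timezone, timedelta

sfn = boto3.client("stepfunctions")
state_machine_arn = "arn:aws:states:us-east-1:123456789012:stateMachine:IDPWorkflow"  # placeholder

cutoff = datetime.now(timezone.utc) - timedelta(minutes=10)
executions = sfn.list_executions(stateMachineArn=state_machine_arn, maxResults=100)

recent = [e for e in executions["executions"] if e["startDate"] >= cutoff]
print(f"Executions started in the last 10 minutes: {len(recent)}")
for execution in recent:
    print(execution["name"], execution["status"])
```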
