# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0

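"""
GraphQL resolver for applying user edits to a processed document.

Replaces the modified sections and page classifications on the document
record in DynamoDB, clears stale extraction results from S3, and queues the
document on SQS for selective reprocessing (extraction only; OCR and
classification are skipped).
"""
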
import json
import os
import boto3
import logging
from datetime import datetime, timezone

logger = logging.getLogger()
logger.setLevel(os.environ.get('LOG_LEVEL', 'INFO'))

# Initialize AWS clients
dynamodb = boto3.resource('dynamodb')
s3_client = boto3.client('s3')
sqs_client = boto3.client('sqs')

# Environment variables
DOCUMENTS_TABLE = os.environ.get('DOCUMENTS_TABLE')
QUEUE_URL = os.environ.get('QUEUE_URL')
DATA_RETENTION_DAYS = int(os.environ.get('DATA_RETENTION_IN_DAYS', '90'))

def handler(event, context):
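    """
    Resolver entry point.

    Expects GraphQL arguments 'objectKey' (the document identifier) and
    'modifiedSections' (a list of edits, each with 'sectionId',
    'classification', 'pageIds', and optional 'isNew' / 'isDeleted' flags).
    Returns a dict with 'success', 'message', and 'processingJobId'.
    """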
    logger.info(f"Event: {json.dumps(event)}")

    try:
        # Extract arguments from the GraphQL event
        args = event.get('arguments', {})
        object_key = args.get('objectKey')
        modified_sections = args.get('modifiedSections', [])

        if not object_key:
            raise ValueError("objectKey is required")

        if not modified_sections:
            raise ValueError("modifiedSections is required")

        logger.info(f"Processing changes for document: {object_key}")
        logger.info(f"Modified sections: {json.dumps(modified_sections)}")

        # Get the document from DynamoDB
        table = dynamodb.Table(DOCUMENTS_TABLE)
        doc_pk = f"doc#{object_key}"

        response = table.get_item(Key={'PK': doc_pk, 'SK': doc_pk})
        if 'Item' not in response:
            raise ValueError(f"Document {object_key} not found")

        document = response['Item']
        logger.info(f"Found document: {document.get('ObjectKey')}")

        # Process section modifications
        updated_sections = []
        updated_pages = []
        modified_section_ids = []

        # Convert existing sections and pages to dictionaries for easier manipulation
        existing_sections = {section['Id']: section for section in document.get('Sections', [])}
        existing_pages = {page['Id']: page for page in document.get('Pages', [])}

        for modified_section in modified_sections:
            section_id = modified_section['sectionId']
            classification = modified_section['classification']
            page_ids = modified_section['pageIds']
            is_new = modified_section.get('isNew', False)
            is_deleted = modified_section.get('isDeleted', False)

            if is_deleted:
                # Remove the section (don't add to updated_sections)
                logger.info(f"Deleting section: {section_id}")
                # Also need to clear the extraction data from S3
                if section_id in existing_sections:
                    output_uri = existing_sections[section_id].get('OutputJSONUri')
                    if output_uri:
                        clear_extraction_data(output_uri)
                continue

            # Create or update section
            section = {
                'Id': section_id,
                'Class': classification,
                'PageIds': page_ids,
                'OutputJSONUri': None  # Clear extraction data for reprocessing
            }

            # If this was an existing section, preserve any confidence threshold alerts
            if section_id in existing_sections and not is_new:
                existing_section = existing_sections[section_id]
                if 'ConfidenceThresholdAlerts' in existing_section:
                    section['ConfidenceThresholdAlerts'] = existing_section['ConfidenceThresholdAlerts']

                # Clear the existing extraction data
                output_uri = existing_section.get('OutputJSONUri')
                if output_uri:
                    clear_extraction_data(output_uri)

            updated_sections.append(section)
            modified_section_ids.append(section_id)

            # Update page classifications to match section classification
            for page_id in page_ids:
                if page_id in existing_pages:
                    page = existing_pages[page_id].copy()
                    page['Class'] = classification
                    updated_pages.append(page)
                    logger.info(f"Updated page {page_id} classification to {classification}")

        # Add unchanged sections back
        modified_section_id_set = {ms['sectionId'] for ms in modified_sections}
        for section_id, section in existing_sections.items():
            if section_id not in modified_section_id_set:
                updated_sections.append(section)

        # Add unchanged pages back
        all_modified_page_ids = set()
        for modified_section in modified_sections:
            if not modified_section.get('isDeleted', False):
                all_modified_page_ids.update(modified_section['pageIds'])

        for page_id, page in existing_pages.items():
            if page_id not in all_modified_page_ids:
                updated_pages.append(page)

        # Sort sections by starting page ID (sections with no pages sort last)
        updated_sections.sort(key=lambda s: min(s.get('PageIds') or [float('inf')]))

        # Sort pages by ID
        updated_pages.sort(key=lambda p: p.get('Id', 0))

        # Update the document in DynamoDB
        current_time = datetime.now(timezone.utc).isoformat()

        update_expression = "SET #sections = :sections, #pages = :pages, #status = :status, #queued_time = :queued_time"
        expression_attribute_names = {
            '#sections': 'Sections',
            '#pages': 'Pages',
            '#status': 'ObjectStatus',
            '#queued_time': 'QueuedTime'
        }
        expression_attribute_values = {
            ':sections': updated_sections,
            ':pages': updated_pages,
            ':status': 'QUEUED',
            ':queued_time': current_time
        }

        table.update_item(
            Key={'PK': doc_pk, 'SK': doc_pk},
            UpdateExpression=update_expression,
            ExpressionAttributeNames=expression_attribute_names,
            ExpressionAttributeValues=expression_attribute_values
        )

        logger.info(f"Updated document {object_key} with {len(updated_sections)} sections and {len(updated_pages)} pages")

        # Create the document object for SQS processing
        document_for_processing = {
            'id': object_key,
            'input_bucket': document.get('InputBucket', ''),
            'input_key': object_key,
            'output_bucket': document.get('OutputBucket', ''),
            'status': 'QUEUED',
            'queued_time': current_time,
            'num_pages': len(updated_pages),
            'pages': {str(page['Id']): {
                'page_id': str(page['Id']),
                'image_uri': page.get('ImageUri'),
                'raw_text_uri': page.get('TextUri'),
                'parsed_text_uri': page.get('TextUri'),
                'text_confidence_uri': page.get('TextConfidenceUri'),
                'classification': page.get('Class'),
                'confidence': 1.0,
                'tables': [],
                'forms': {}
            } for page in updated_pages},
            'sections': [{
                'section_id': section['Id'],
                'classification': section['Class'],
                'confidence': 1.0,
                'page_ids': [str(pid) for pid in section['PageIds']],
                'extraction_result_uri': section.get('OutputJSONUri'),
                'attributes': None,
                'confidence_threshold_alerts': section.get('ConfidenceThresholdAlerts', [])
            } for section in updated_sections],
            'metering': document.get('Metering', {}),
            'metadata': {},
            'errors': []
        }

        # Send to SQS with selective processing flags
        sqs_message = {
            'document': document_for_processing,
            'processing_mode': 'selective',
            'skip_ocr': True,
            'skip_classification': True,
            'modified_sections': modified_section_ids,
            'reprocess_extraction_only': True
        }

        if QUEUE_URL:
            response = sqs_client.send_message(
                QueueUrl=QUEUE_URL,
                # default=str covers Decimal values that the DynamoDB resource API
                # returns for numeric attributes (e.g. Metering), which json.dumps
                # cannot serialize on its own
                MessageBody=json.dumps(sqs_message, default=str)
            )

            logger.info(f"Sent document to SQS queue. MessageId: {response.get('MessageId')}")
            processing_job_id = response.get('MessageId')
        else:
            logger.warning("QUEUE_URL not configured, skipping SQS message")
            processing_job_id = None

        return {
            'success': True,
            'message': f'Successfully processed changes for {len(modified_sections)} sections',
            'processingJobId': processing_job_id
        }

    except Exception as e:
        logger.exception(f"Error processing changes: {str(e)}")
        return {
            'success': False,
            'message': f'Error processing changes: {str(e)}',
            'processingJobId': None
        }

def clear_extraction_data(s3_uri):
    """Clear extraction data from S3"""
    try:
        if not s3_uri or not s3_uri.startswith('s3://'):
            return

        # Parse S3 URI
        parts = s3_uri.replace('s3://', '').split('/', 1)
        if len(parts) != 2:
            return

        bucket, key = parts

        # Delete the object
        s3_client.delete_object(Bucket=bucket, Key=key)
        logger.info(f"Cleared extraction data: {s3_uri}")

    except Exception as e:
        logger.warning(f"Failed to clear extraction data {s3_uri}: {str(e)}")
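
# Example invocation: a minimal sketch only. The event shape below assumes an
# AppSync-style direct Lambda resolver that passes GraphQL arguments under
# "arguments"; the object key, section ID, classification label, and page IDs
# are placeholders. Running it requires DOCUMENTS_TABLE and QUEUE_URL to point
# at real resources plus valid AWS credentials.
if __name__ == "__main__":
    example_event = {
        "arguments": {
            "objectKey": "example-document.pdf",  # placeholder document key
            "modifiedSections": [
                {
                    "sectionId": "section-1",     # placeholder section ID
                    "classification": "invoice",  # placeholder class label
                    "pageIds": [1, 2],
                    "isNew": False,
                    "isDeleted": False
                }
            ]
        }
    }
    print(handler(example_event, None))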