@@ -16,6 +16,7 @@
 import time
 from concurrent.futures import ThreadPoolExecutor, as_completed
 from datetime import datetime, timedelta, timezone
+from decimal import Decimal
 from typing import Any, Dict, List, Optional, Set, Union

 import boto3
@@ -1035,34 +1036,39 @@ def _get_cached_page_classifications(
             logger.info(f"No cache entry found for document {document.id}")
             return {}

-        # Parse cached data
+        # Parse cached data from JSON
         cached_data = response["Item"]
-        logger.debug(f"Cached data: {cached_data}")
+        logger.debug(f"Cached data keys: {list(cached_data.keys())}")
         page_classifications = {}

-        # Extract page classifications from separate page attributes
-        for attr_name, attr_value in cached_data.items():
-            if attr_name.startswith("page_"):
-                page_id = attr_name[5:]  # Remove "page_" prefix
+        # Extract page classifications from JSON attribute
+        if "page_classifications" in cached_data:
+            try:
+                page_data_list = json.loads(cached_data["page_classifications"])

-                # Extract page data from DynamoDB item
-                page_data = attr_value
-                page_classifications[page_id] = PageClassification(
-                    page_id=page_id,
-                    classification=DocumentClassification(
-                        doc_type=page_data.get("doc_type", "unclassified"),
-                        confidence=page_data.get("confidence", 1.0),
-                        metadata=page_data.get("metadata", {}),
-                    ),
-                    image_uri=page_data.get("image_uri"),
-                    text_uri=page_data.get("text_uri"),
-                    raw_text_uri=page_data.get("raw_text_uri"),
-                )
+                for page_data in page_data_list:
+                    page_id = page_data["page_id"]
+                    page_classifications[page_id] = PageClassification(
+                        page_id=page_id,
+                        classification=DocumentClassification(
+                            doc_type=page_data["classification"]["doc_type"],
+                            confidence=page_data["classification"]["confidence"],
+                            metadata=page_data["classification"]["metadata"],
+                        ),
+                        image_uri=page_data.get("image_uri"),
+                        text_uri=page_data.get("text_uri"),
+                        raw_text_uri=page_data.get("raw_text_uri"),
+                    )

-        if page_classifications:
-            logger.info(
-                f"Retrieved {len(page_classifications)} cached page classifications for document {document.id} (PK: {cache_key})"
-            )
+                if page_classifications:
+                    logger.info(
+                        f"Retrieved {len(page_classifications)} cached page classifications for document {document.id} (PK: {cache_key})"
+                    )
+
+            except json.JSONDecodeError as e:
+                logger.warning(
+                    f"Failed to parse cached page classifications JSON for document {document.id}: {e}"
+                )

         return page_classifications

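Note: a minimal sketch of the round-trip the new parsing code expects. The attribute and field names come from the hunk above; the table keys and concrete values are made up for illustration.

import json

cached_item = {
    "PK": "doccache#doc-123",  # illustrative key
    "SK": "none",
    "page_classifications": json.dumps(
        [
            {
                "page_id": "1",
                "classification": {
                    "doc_type": "invoice",
                    "confidence": 0.97,
                    "metadata": {},
                },
                "image_uri": "s3://bucket/doc-123/pages/1.jpg",
                "text_uri": None,
                "raw_text_uri": None,
            }
        ]
    ),
}

pages = json.loads(cached_item["page_classifications"])
assert pages[0]["classification"]["doc_type"] == "invoice"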
@@ -1076,7 +1082,7 @@ def _cache_successful_page_classifications(
         self, document: Document, page_classifications: List[PageClassification]
     ) -> None:
         """
-        Cache successful page classifications to DynamoDB as separate attributes.
+        Cache successful page classifications to DynamoDB as a JSON-serialized list.

         Args:
             document: Document object
@@ -1088,45 +1094,48 @@ def _cache_successful_page_classifications(
         cache_key = self._get_cache_key(document)

         try:
-            # Filter out failed classifications and prepare item structure
-            item = {
-                "PK": cache_key,
-                "SK": "none",
-                "cached_at": str(int(time.time())),
-                "document_id": document.id,
-                "workflow_execution_arn": document.workflow_execution_arn,
-                "ExpiresAfter": int(
-                    (datetime.now(timezone.utc) + timedelta(days=1)).timestamp()
-                ),
-            }
-
-            successful_count = 0
+            # Filter out failed classifications and prepare data for JSON serialization
+            successful_pages = []
             for page_result in page_classifications:
                 # Only cache if there's no error in the metadata
                 if "error" not in page_result.classification.metadata:
-                    # Store each page as a separate attribute with "page_" prefix
-                    page_attr_name = f"page_{page_result.page_id}"
-                    item[page_attr_name] = {
-                        "doc_type": page_result.classification.doc_type,
-                        "confidence": page_result.classification.confidence,
-                        "metadata": page_result.classification.metadata,
+                    page_data = {
+                        "page_id": page_result.page_id,
+                        "classification": {
+                            "doc_type": page_result.classification.doc_type,
+                            "confidence": page_result.classification.confidence,
+                            "metadata": page_result.classification.metadata,
+                        },
                         "image_uri": page_result.image_uri,
                         "text_uri": page_result.text_uri,
                         "raw_text_uri": page_result.raw_text_uri,
                     }
-                    successful_count += 1
+                    successful_pages.append(page_data)

-            if successful_count == 0:
+            if len(successful_pages) == 0:
                 logger.debug(
                     f"No successful page classifications to cache for document {document.id}"
                 )
                 return

-            # Store in DynamoDB using Table resource with separate page attributes
+            # Prepare item structure with JSON-serialized page classifications
+            item = {
+                "PK": cache_key,
+                "SK": "none",
+                "cached_at": str(int(time.time())),
+                "document_id": document.id,
+                "workflow_execution_arn": document.workflow_execution_arn,
+                "page_classifications": json.dumps(successful_pages),
+                "ExpiresAfter": int(
+                    (datetime.now(timezone.utc) + timedelta(days=1)).timestamp()
+                ),
+            }
+
+            # Store in DynamoDB using Table resource with JSON serialization
             self.cache_table.put_item(Item=item)

             logger.info(
-                f"Cached {successful_count} successful page classifications for document {document.id} (PK: {cache_key})"
+                f"Cached {len(successful_pages)} successful page classifications for document {document.id} (PK: {cache_key})"
             )

         except Exception as e:
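Design note: storing the page list as a single JSON string, rather than as a native DynamoDB map or separate attributes, sidesteps boto3's number handling: the Table resource rejects Python floats on put_item and hands numbers back as Decimal on reads, which is presumably also why the diff adds the Decimal import at the top. A short sketch of the difference; the table name here is hypothetical:

import json
from decimal import Decimal

import boto3

table = boto3.resource("dynamodb").Table("classification-cache")  # hypothetical name

# Native attribute: floats are rejected on write, so confidence would need
# Decimal, and reads return Decimal:
#   table.put_item(Item={"PK": "k", "SK": "none", "confidence": 0.97})
#   -> TypeError: Float types are not supported. Use Decimal types instead.
native_confidence = Decimal("0.97")

# JSON string attribute: plain float in, plain float out.
payload = json.dumps({"confidence": 0.97})
assert type(json.loads(payload)["confidence"]) is float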
@@ -1263,26 +1272,55 @@ def classify_document(self, document: Document) -> Document:

                             # Mark page as unclassified on error
                             if page_id in document.pages:
-                                document.pages[page_id].classification = "unclassified"
+                                document.pages[
+                                    page_id
+                                ].classification = "error (retrying)"
                                 document.pages[page_id].confidence = 0.0

-            # Store failed page exceptions in document metadata for caller to access
-            if failed_page_exceptions:
-                # Store the first encountered exception as the primary failure cause
-                first_exception = next(iter(failed_page_exceptions.values()))
-                document.metadata = document.metadata or {}
-                document.metadata["failed_page_exceptions"] = {
-                    page_id: {
-                        "exception_type": type(exc).__name__,
-                        "exception_message": str(exc),
-                        "exception_class": exc.__class__.__module__
-                        + "."
-                        + exc.__class__.__name__,
+                # Store failed page exceptions in document metadata for caller to access
+                if failed_page_exceptions:
+                    logger.info(
+                        f"Processing {len(failed_page_exceptions)} failed page exceptions for document {document.id}"
+                    )
+
+                    # Store the first encountered exception as the primary failure cause
+                    first_exception = next(iter(failed_page_exceptions.values()))
+                    document.metadata = document.metadata or {}
+                    document.metadata["failed_page_exceptions"] = {
+                        page_id: {
+                            "exception_type": type(exc).__name__,
+                            "exception_message": str(exc),
+                            "exception_class": exc.__class__.__module__
+                            + "."
+                            + exc.__class__.__name__,
+                        }
+                        for page_id, exc in failed_page_exceptions.items()
                     }
-                    for page_id, exc in failed_page_exceptions.items()
-                }
-                # Store the primary exception for easy access by caller
-                document.metadata["primary_exception"] = first_exception
+                    # Store the primary exception for easy access by caller
+                    document.metadata["primary_exception"] = first_exception
+
+                    # Cache successful page classifications (only when some pages fail - for retry scenarios)
+                    successful_results = [
+                        r
+                        for r in all_page_results
+                        if "error" not in r.classification.metadata
+                    ]
+                    if successful_results:
+                        logger.info(
+                            f"Caching {len(successful_results)} successful page classifications for document {document.id} due to {len(failed_page_exceptions)} failed pages (retry scenario)"
+                        )
+                        self._cache_successful_page_classifications(
+                            document, successful_results
+                        )
+                    else:
+                        logger.warning(
+                            f"No successful page classifications to cache for document {document.id} - all {len(failed_page_exceptions)} pages failed"
+                        )
+                else:
+                    # All pages succeeded - no need to cache since there won't be retries
+                    logger.info(
+                        f"All pages succeeded for document {document.id} - skipping cache (no retry needed)"
+                    )
             else:
                 logger.info(
                     f"All {len(cached_page_classifications)} page classifications found in cache"
@@ -1334,23 +1372,7 @@ def classify_document(self, document: Document) -> Document:
             )

         except Exception as e:
-            # Cache successful page classifications before handling exception
-            if pages_to_classify:
-                successful_results = [
-                    r
-                    for r in all_page_results
-                    if "error" not in r.classification.metadata
-                ]
-                if successful_results:
-                    self._cache_successful_page_classifications(
-                        document, successful_results
-                    )
-                else:
-                    logger.warning("No successful page classifications to cache")
-            else:
-                logger.warning("No pages to classify, nothing to cache")
-
-            error_msg = f"Error classifying document - cached partial results: {str(e)}"
+            error_msg = f"Error classifying all document pages: {str(e)}"
            document = self._update_document_status(
                document, success=False, error_message=error_msg
            )
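Taken together, the hunks implement a retry-friendly flow: on a partial failure the successful pages are cached and the per-page exceptions are recorded in document.metadata, so a caller can re-raise and let the orchestrator retry cheaply. A hypothetical caller-side sketch (the handler name and retry mechanism are assumptions, not part of this commit):

import logging

logger = logging.getLogger(__name__)


def handle_classification(service, document):
    document = service.classify_document(document)
    failed = (document.metadata or {}).get("failed_page_exceptions")
    if failed:
        # Per-page failure details recorded by the hunk above
        for page_id, info in failed.items():
            logger.warning(
                f"page {page_id} failed: {info['exception_type']}: {info['exception_message']}"
            )
        # Re-raise the primary exception so the workflow retries; on the retry,
        # _get_cached_page_classifications restores the pages that already succeeded.
        raise document.metadata["primary_exception"]
    return document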