1212"""
1313
1414import json
15- import logging
1615import os
1716import time
18- from dataclasses import dataclass
1917from typing import Any , Dict , List , Optional , Tuple , Union
2018
19+ from aws_lambda_powertools import Logger
20+
2121from idp_common import image , metrics , s3 , utils
22+ from idp_common .assessment .models import AssessmentResult , AssessmentTask
2223from idp_common .assessment .strands_executor import execute_assessment_tasks_parallel
2324from idp_common .config .models import IDPConfig
2425from idp_common .config .schema_constants import (
3536from idp_common .utils import check_token_limit
3637from idp_common .utils .grid_overlay import add_ruler_edges
3738
38- logger = logging .getLogger (__name__ )
39-
40-
41- @dataclass
42- class AssessmentTask :
43- """Single-field assessment task for Strands executor."""
44-
45- task_id : str
46- task_type : str # Always "attribute" - single field assessment
47-
48- # Path to field as tuple: ("address", "street") or ("items", 0, "price")
49- field_path : Tuple [Union [str , int ], ...]
50-
51- # The field name being assessed (last element of path)
52- field_name : str
53-
54- # Schema for this specific field only
55- field_schema : Dict [str , Any ]
56-
57- # Confidence threshold for this field
58- confidence_threshold : float
59-
60- # Direct reference to parent container in assessment structure (for O(1) insertion)
61- # Can be Dict for regular fields or List for array items
62- parent_assessment_dict : Union [Dict [str , Any ], List [Any ]]
63-
64-
65- @dataclass
66- class AssessmentResult :
67- """Result of a single assessment task."""
68-
69- task_id : str
70- success : bool
71- assessment_data : Dict [str , Any ]
72- confidence_alerts : List [Dict [str , Any ]]
73- error_message : Optional [str ] = None
74- processing_time : float = 0.0
75- metering : Optional [Dict [str , Any ]] = None
39+ logger = Logger (service = "assessment" , level = os .getenv ("LOG_LEVEL" , "INFO" ))
7640
7741
7842def _safe_float_conversion (value : Any , default : float = 0.0 ) -> float :
@@ -114,49 +78,13 @@ def _safe_float_conversion(value: Any, default: float = 0.0) -> float:
11478 return default
11579
11680
117- def _get_value_at_path (data : Dict [str , Any ], path : Tuple [Union [str , int ], ...]) -> Any :
118- """
119- Navigate nested data structure using tuple path.
120-
121- Args:
122- data: Nested dictionary/list structure
123- path: Tuple of keys/indices like ("address", "street") or ("items", 0, "price")
124-
125- Returns:
126- Value at the specified path, or None if path doesn't exist
127-
128- Examples:
129- >>> data = {"address": {"street": "123 Main St"}}
130- >>> _get_value_at_path(data, ("address", "street"))
131- "123 Main St"
132-
133- >>> data = {"items": [{"price": 10.99}, {"price": 20.99}]}
134- >>> _get_value_at_path(data, ("items", 0, "price"))
135- 10.99
136- """
137- current = data
138- for key in path :
139- if current is None :
140- return None
141- if isinstance (current , dict ):
142- current = current .get (key )
143- elif isinstance (current , list ):
144- if isinstance (key , int ) and 0 <= key < len (current ):
145- current = current [key ]
146- else :
147- return None
148- else :
149- return None
150- return current
151-
152-
15381class GranularAssessmentService :
15482 """Enhanced assessment service with granular, cached, and parallel processing."""
15583
15684 def __init__ (
15785 self ,
15886 region : str | None = None ,
159- config : Dict [str , Any ] | IDPConfig | None = None ,
87+ config : dict [str , Any ] | IDPConfig | None = None ,
16088 cache_table : str | None = None ,
16189 ):
16290 """
@@ -194,7 +122,9 @@ def __init__(
194122 if self .cache_table_name :
195123 import boto3
196124
197- dynamodb = boto3 .resource ("dynamodb" , region_name = self .region )
125+ dynamodb : DynamoDBServiceResource = boto3 .resource (
126+ "dynamodb" , region_name = self .region
127+ ) # pyright: ignore[reportAssignmentType]
198128 self .cache_table = dynamodb .Table (self .cache_table_name )
199129 logger .info (
200130 f"Granular assessment caching enabled using table: { self .cache_table_name } "
@@ -220,7 +150,7 @@ def __init__(
220150 f"caching={ 'enabled' if self .cache_table else 'disabled' } "
221151 )
222152
223- def _get_class_schema (self , class_label : str ) -> Dict [str , Any ]:
153+ def _get_class_schema (self , class_label : str ) -> dict [str , Any ]:
224154 """
225155 Get JSON Schema for a specific document class.
226156
@@ -238,7 +168,7 @@ def _get_class_schema(self, class_label: str) -> Dict[str, Any]:
238168 return {}
239169
240170 def _get_confidence_threshold_by_path (
241- self , properties : Dict [str , Any ], path : str , default : float = 0.9
171+ self , properties : dict [str , Any ], path : str , default : float = 0.9
242172 ) -> float :
243173 """
244174 Get confidence threshold for a property path (e.g., 'CompanyAddress.Street').
@@ -283,10 +213,10 @@ def _get_confidence_threshold_by_path(
283213
284214 def _create_assessment_tasks (
285215 self ,
286- extraction_results : Dict [str , Any ],
287- properties : Dict [str , Any ],
216+ extraction_results : dict [str , Any ],
217+ properties : dict [str , Any ],
288218 default_confidence_threshold : float ,
289- ) -> Tuple [ List [AssessmentTask ], Dict [str , Any ]]:
219+ ) -> tuple [ list [AssessmentTask ], dict [str , Any ]]:
290220 """
291221 Create assessment tasks and pre-build assessment structure.
292222
@@ -302,18 +232,18 @@ def _create_assessment_tasks(
302232
303233 Returns:
304234 Tuple of (tasks, assessment_structure)
305- - tasks: List of AssessmentTask objects
306- - assessment_structure: Dict mirroring extraction_results shape
235+ - tasks: list of AssessmentTask objects
236+ - assessment_structure: dict mirroring extraction_results shape
307237 """
308- tasks : List [AssessmentTask ] = []
309- assessment_structure : Dict [str , Any ] = {}
238+ tasks : list [AssessmentTask ] = []
239+ assessment_structure : dict [str , Any ] = {}
310240 task_counter = [0 ] # Use list for mutable counter in nested function
311241
312242 def _traverse (
313- schema_props : Dict [str , Any ],
314- extraction_data : Dict [str , Any ],
315- current_path : Tuple [ Union [ str , int ] , ...],
316- parent_dict : Dict [str , Any ],
243+ schema_props : dict [str , Any ],
244+ extraction_data : dict [str , Any ],
245+ current_path : tuple [ str | int , ...],
246+ parent_dict : dict [str , Any ],
317247 ) -> None :
318248 """
319249 Recursively traverse schema and extraction data to build tasks and structure.
@@ -334,7 +264,7 @@ def _traverse(
334264
335265 if prop_type == TYPE_OBJECT and isinstance (prop_value , dict ):
336266 # Create nested dict in assessment structure
337- nested_dict : Dict [str , Any ] = {}
267+ nested_dict : dict [str , Any ] = {}
338268 parent_dict [prop_name ] = nested_dict
339269
340270 # Recurse into nested object
@@ -343,7 +273,7 @@ def _traverse(
343273
344274 elif prop_type == TYPE_ARRAY and isinstance (prop_value , list ):
345275 # Create list in assessment structure
346- assessment_list : List [Any ] = []
276+ assessment_list : list [Any ] = []
347277 parent_dict [prop_name ] = assessment_list
348278
349279 # Process each array item
@@ -355,7 +285,7 @@ def _traverse(
355285
356286 if item_type == TYPE_OBJECT and isinstance (item_value , dict ):
357287 # Create dict for this array item
358- item_dict : Dict [str , Any ] = {}
288+ item_dict : dict [str , Any ] = {}
359289 assessment_list .append (item_dict )
360290
361291 # Recurse into array item properties
@@ -438,7 +368,7 @@ def _get_cache_key(
438368
439369 def _get_cached_assessment_tasks (
440370 self , document_id : str , workflow_execution_arn : str , section_id : str
441- ) -> Dict [str , AssessmentResult ]:
371+ ) -> dict [str , AssessmentResult ]:
442372 """
443373 Retrieve cached assessment task results for a document section.
444374
@@ -448,7 +378,7 @@ def _get_cached_assessment_tasks(
448378 section_id: Section ID
449379
450380 Returns:
451- Dictionary mapping task_id to cached AssessmentResult, empty dict if no cache
381+ dictionary mapping task_id to cached AssessmentResult, empty dict if no cache
452382 """
453383 logger .info (
454384 f"Attempting to retrieve cached assessment tasks for document { document_id } section { section_id } "
@@ -476,8 +406,6 @@ def _get_cached_assessment_tasks(
476406 # Extract task results from JSON attribute
477407 if "task_results" in cached_data :
478408 try :
479- import json
480-
481409 task_data_list = json .loads (cached_data ["task_results" ])
482410
483411 for task_data in task_data_list :
@@ -515,7 +443,7 @@ def _cache_successful_assessment_tasks(
515443 document_id : str ,
516444 workflow_execution_arn : str ,
517445 section_id : str ,
518- task_results : List [AssessmentResult ],
446+ task_results : list [AssessmentResult ],
519447 ) -> None :
520448 """
521449 Cache successful assessment task results to DynamoDB as a JSON-serialized list.
@@ -524,7 +452,7 @@ def _cache_successful_assessment_tasks(
524452 document_id: Document ID
525453 workflow_execution_arn: Workflow execution ARN
526454 section_id: Section ID
527- task_results: List of successful assessment task results
455+ task_results: list of successful assessment task results
528456 """
529457 if not self .cache_table or not task_results :
530458 return
@@ -608,16 +536,16 @@ def _is_throttling_exception(self, exception: Exception) -> bool:
608536
609537 def _aggregate_assessment_results (
610538 self ,
611- tasks : List [AssessmentTask ],
612- results : List [AssessmentResult ],
613- assessment_structure : Dict [str , Any ],
614- ) -> Tuple [ Dict [str , Any ], List [ Dict [str , Any ]], Dict [str , Any ]]:
539+ tasks : list [AssessmentTask ],
540+ results : list [AssessmentResult ],
541+ assessment_structure : dict [str , Any ],
542+ ) -> tuple [ dict [str , Any ], list [ dict [str , Any ]], dict [str , Any ]]:
615543 """
616544 Aggregate individual task results into assessment structure using direct parent insertion.
617545
618546 Args:
619- tasks: List of assessment tasks
620- results: List of assessment results
547+ tasks: list of assessment tasks
548+ results: list of assessment results
621549 assessment_structure: Pre-built assessment structure from _create_assessment_tasks
622550
623551 Returns:
@@ -717,17 +645,17 @@ def _get_text_confidence_data(self, page) -> str:
717645 return "Text Confidence Data Unavailable"
718646
719647 def _convert_bbox_to_geometry (
720- self , bbox_coords : List [float ], page_num : int
721- ) -> Dict [str , Any ]:
648+ self , bbox_coords : list [float ], page_num : int
649+ ) -> dict [str , Any ]:
722650 """
723651 Convert [x1,y1,x2,y2] coordinates to geometry format.
724652
725653 Args:
726- bbox_coords: List of 4 coordinates [x1, y1, x2, y2] in 0-1000 scale
654+ bbox_coords: list of 4 coordinates [x1, y1, x2, y2] in 0-1000 scale
727655 page_num: Page number where the bounding box appears
728656
729657 Returns:
730- Dictionary in geometry format compatible with pattern-1 UI
658+ dictionary in geometry format compatible with pattern-1 UI
731659 """
732660 if len (bbox_coords ) != 4 :
733661 raise ValueError (f"Expected 4 coordinates, got { len (bbox_coords )} " )
@@ -750,8 +678,8 @@ def _convert_bbox_to_geometry(
750678 }
751679
752680 def _process_single_assessment_geometry (
753- self , attr_assessment : Dict [str , Any ], attr_name : str = ""
754- ) -> Dict [str , Any ]:
681+ self , attr_assessment : dict [str , Any ], attr_name : str = ""
682+ ) -> dict [str , Any ]:
755683 """
756684 Process geometry data for a single assessment (with confidence key).
757685
@@ -808,8 +736,8 @@ def _process_single_assessment_geometry(
808736 return enhanced_attr
809737
810738 def _extract_geometry_from_assessment (
811- self , assessment_data : Dict [str , Any ]
812- ) -> Dict [str , Any ]:
739+ self , assessment_data : dict [str , Any ]
740+ ) -> dict [str , Any ]:
813741 """
814742 Extract geometry data from assessment response and convert to proper format.
815743 Now supports recursive processing of nested group attributes.
@@ -960,6 +888,9 @@ def process_document_section(self, document: Document, section_id: str) -> Docum
960888 # Read page images with configurable dimensions (type-safe access)
961889 target_width = self .config .assessment .image .target_width
962890 target_height = self .config .assessment .image .target_height
891+ logger .info (
892+ f"Image resize config: target_width={ target_width } , target_height={ target_height } "
893+ )
963894
964895 page_images = []
965896 for page_id in sorted_page_ids :
@@ -968,9 +899,12 @@ def process_document_section(self, document: Document, section_id: str) -> Docum
968899
969900 page = document .pages [page_id ]
970901 image_uri = page .image_uri
971- # Just pass the values directly - prepare_image handles empty strings/None
902+ # For assessment, convert to PNG for better compression with rulers/overlays
972903 image_content = image .prepare_image (
973- image_uri , target_width , target_height
904+ image_uri , target_width , target_height , output_format = "PNG"
905+ )
906+ logger .info (
907+ f"Loaded page { page_id } image as PNG: { len (image_content ):,} bytes"
974908 )
975909 page_images .append (image_content )
976910
@@ -1043,8 +977,11 @@ def process_document_section(self, document: Document, section_id: str) -> Docum
1043977
1044978 # Apply grid overlay to page images for assessment
1045979 grid_page_images = []
1046- for page_img in page_images :
980+ for idx , page_img in enumerate ( page_images ) :
1047981 grid_img = add_ruler_edges (page_img )
982+ logger .info (
983+ f"Added ruler overlay to page { idx } : { len (page_img ):,} bytes -> { len (grid_img ):,} bytes"
984+ )
1048985 grid_page_images .append (grid_img )
1049986
1050987 # Execute tasks using Strands-based parallel executor
@@ -1065,6 +1002,7 @@ def process_document_section(self, document: Document, section_id: str) -> Docum
10651002 system_prompt = self .config .assessment .system_prompt ,
10661003 temperature = self .config .assessment .temperature ,
10671004 max_tokens = self .config .assessment .max_tokens ,
1005+ document_schema = class_schema ,
10681006 max_concurrent = self .max_workers ,
10691007 )
10701008 )
@@ -1354,10 +1292,10 @@ def assess_document(self, document: Document) -> Document:
13541292 def _handle_parsing_errors (
13551293 self ,
13561294 document : Document ,
1357- failed_tasks : List [str ],
1295+ failed_tasks : list [str ],
13581296 document_text : str ,
1359- extraction_results : Dict ,
1360- ) -> Optional [ str ] :
1297+ extraction_results : dict ,
1298+ ) -> str | None :
13611299 """Handle multiple parsing errors with user-friendly messaging."""
13621300 # Check for token limit issues
13631301 token_warning = check_token_limit (
0 commit comments