Skip to content

Commit 0109138

Browse files
committed
fixes
1 parent 794ead7 commit 0109138

File tree

11 files changed

+544
-246
lines changed

11 files changed

+544
-246
lines changed

lib/idp_common_pkg/idp_common/assessment/granular_service.py

Lines changed: 58 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,14 @@
1212
"""
1313

1414
import json
15-
import logging
1615
import os
1716
import time
18-
from dataclasses import dataclass
1917
from typing import Any, Dict, List, Optional, Tuple, Union
2018

19+
from aws_lambda_powertools import Logger
20+
2121
from idp_common import image, metrics, s3, utils
22+
from idp_common.assessment.models import AssessmentResult, AssessmentTask
2223
from idp_common.assessment.strands_executor import execute_assessment_tasks_parallel
2324
from idp_common.config.models import IDPConfig
2425
from idp_common.config.schema_constants import (
@@ -35,44 +36,7 @@
3536
from idp_common.utils import check_token_limit
3637
from idp_common.utils.grid_overlay import add_ruler_edges
3738

38-
logger = logging.getLogger(__name__)
39-
40-
41-
@dataclass
42-
class AssessmentTask:
43-
"""Single-field assessment task for Strands executor."""
44-
45-
task_id: str
46-
task_type: str # Always "attribute" - single field assessment
47-
48-
# Path to field as tuple: ("address", "street") or ("items", 0, "price")
49-
field_path: Tuple[Union[str, int], ...]
50-
51-
# The field name being assessed (last element of path)
52-
field_name: str
53-
54-
# Schema for this specific field only
55-
field_schema: Dict[str, Any]
56-
57-
# Confidence threshold for this field
58-
confidence_threshold: float
59-
60-
# Direct reference to parent container in assessment structure (for O(1) insertion)
61-
# Can be Dict for regular fields or List for array items
62-
parent_assessment_dict: Union[Dict[str, Any], List[Any]]
63-
64-
65-
@dataclass
66-
class AssessmentResult:
67-
"""Result of a single assessment task."""
68-
69-
task_id: str
70-
success: bool
71-
assessment_data: Dict[str, Any]
72-
confidence_alerts: List[Dict[str, Any]]
73-
error_message: Optional[str] = None
74-
processing_time: float = 0.0
75-
metering: Optional[Dict[str, Any]] = None
39+
logger = Logger(service="assessment", level=os.getenv("LOG_LEVEL", "INFO"))
7640

7741

7842
def _safe_float_conversion(value: Any, default: float = 0.0) -> float:
@@ -114,49 +78,13 @@ def _safe_float_conversion(value: Any, default: float = 0.0) -> float:
11478
return default
11579

11680

117-
def _get_value_at_path(data: Dict[str, Any], path: Tuple[Union[str, int], ...]) -> Any:
118-
"""
119-
Navigate nested data structure using tuple path.
120-
121-
Args:
122-
data: Nested dictionary/list structure
123-
path: Tuple of keys/indices like ("address", "street") or ("items", 0, "price")
124-
125-
Returns:
126-
Value at the specified path, or None if path doesn't exist
127-
128-
Examples:
129-
>>> data = {"address": {"street": "123 Main St"}}
130-
>>> _get_value_at_path(data, ("address", "street"))
131-
"123 Main St"
132-
133-
>>> data = {"items": [{"price": 10.99}, {"price": 20.99}]}
134-
>>> _get_value_at_path(data, ("items", 0, "price"))
135-
10.99
136-
"""
137-
current = data
138-
for key in path:
139-
if current is None:
140-
return None
141-
if isinstance(current, dict):
142-
current = current.get(key)
143-
elif isinstance(current, list):
144-
if isinstance(key, int) and 0 <= key < len(current):
145-
current = current[key]
146-
else:
147-
return None
148-
else:
149-
return None
150-
return current
151-
152-
15381
class GranularAssessmentService:
15482
"""Enhanced assessment service with granular, cached, and parallel processing."""
15583

15684
def __init__(
15785
self,
15886
region: str | None = None,
159-
config: Dict[str, Any] | IDPConfig | None = None,
87+
config: dict[str, Any] | IDPConfig | None = None,
16088
cache_table: str | None = None,
16189
):
16290
"""
@@ -194,7 +122,9 @@ def __init__(
194122
if self.cache_table_name:
195123
import boto3
196124

197-
dynamodb = boto3.resource("dynamodb", region_name=self.region)
125+
dynamodb: DynamoDBServiceResource = boto3.resource(
126+
"dynamodb", region_name=self.region
127+
)  # pyright: ignore[reportAssignmentType]
198128
self.cache_table = dynamodb.Table(self.cache_table_name)
199129
logger.info(
200130
f"Granular assessment caching enabled using table: {self.cache_table_name}"
@@ -220,7 +150,7 @@ def __init__(
220150
f"caching={'enabled' if self.cache_table else 'disabled'}"
221151
)
222152

223-
def _get_class_schema(self, class_label: str) -> Dict[str, Any]:
153+
def _get_class_schema(self, class_label: str) -> dict[str, Any]:
224154
"""
225155
Get JSON Schema for a specific document class.
226156
@@ -238,7 +168,7 @@ def _get_class_schema(self, class_label: str) -> Dict[str, Any]:
238168
return {}
239169

240170
def _get_confidence_threshold_by_path(
241-
self, properties: Dict[str, Any], path: str, default: float = 0.9
171+
self, properties: dict[str, Any], path: str, default: float = 0.9
242172
) -> float:
243173
"""
244174
Get confidence threshold for a property path (e.g., 'CompanyAddress.Street').
@@ -283,10 +213,10 @@ def _get_confidence_threshold_by_path(
283213

284214
def _create_assessment_tasks(
285215
self,
286-
extraction_results: Dict[str, Any],
287-
properties: Dict[str, Any],
216+
extraction_results: dict[str, Any],
217+
properties: dict[str, Any],
288218
default_confidence_threshold: float,
289-
) -> Tuple[List[AssessmentTask], Dict[str, Any]]:
219+
) -> tuple[list[AssessmentTask], dict[str, Any]]:
290220
"""
291221
Create assessment tasks and pre-build assessment structure.
292222
@@ -302,18 +232,18 @@ def _create_assessment_tasks(
302232
303233
Returns:
304234
Tuple of (tasks, assessment_structure)
305-
- tasks: List of AssessmentTask objects
306-
- assessment_structure: Dict mirroring extraction_results shape
235+
- tasks: list of AssessmentTask objects
236+
- assessment_structure: dict mirroring extraction_results shape
307237
"""
308-
tasks: List[AssessmentTask] = []
309-
assessment_structure: Dict[str, Any] = {}
238+
tasks: list[AssessmentTask] = []
239+
assessment_structure: dict[str, Any] = {}
310240
task_counter = [0] # Use list for mutable counter in nested function
311241

312242
def _traverse(
313-
schema_props: Dict[str, Any],
314-
extraction_data: Dict[str, Any],
315-
current_path: Tuple[Union[str, int], ...],
316-
parent_dict: Dict[str, Any],
243+
schema_props: dict[str, Any],
244+
extraction_data: dict[str, Any],
245+
current_path: tuple[str | int, ...],
246+
parent_dict: dict[str, Any],
317247
) -> None:
318248
"""
319249
Recursively traverse schema and extraction data to build tasks and structure.
@@ -334,7 +264,7 @@ def _traverse(
334264

335265
if prop_type == TYPE_OBJECT and isinstance(prop_value, dict):
336266
# Create nested dict in assessment structure
337-
nested_dict: Dict[str, Any] = {}
267+
nested_dict: dict[str, Any] = {}
338268
parent_dict[prop_name] = nested_dict
339269

340270
# Recurse into nested object
@@ -343,7 +273,7 @@ def _traverse(
343273

344274
elif prop_type == TYPE_ARRAY and isinstance(prop_value, list):
345275
# Create list in assessment structure
346-
assessment_list: List[Any] = []
276+
assessment_list: list[Any] = []
347277
parent_dict[prop_name] = assessment_list
348278

349279
# Process each array item
@@ -355,7 +285,7 @@ def _traverse(
355285

356286
if item_type == TYPE_OBJECT and isinstance(item_value, dict):
357287
# Create dict for this array item
358-
item_dict: Dict[str, Any] = {}
288+
item_dict: dict[str, Any] = {}
359289
assessment_list.append(item_dict)
360290

361291
# Recurse into array item properties
@@ -438,7 +368,7 @@ def _get_cache_key(
438368

439369
def _get_cached_assessment_tasks(
440370
self, document_id: str, workflow_execution_arn: str, section_id: str
441-
) -> Dict[str, AssessmentResult]:
371+
) -> dict[str, AssessmentResult]:
442372
"""
443373
Retrieve cached assessment task results for a document section.
444374
@@ -448,7 +378,7 @@ def _get_cached_assessment_tasks(
448378
section_id: Section ID
449379
450380
Returns:
451-
Dictionary mapping task_id to cached AssessmentResult, empty dict if no cache
381+
dictionary mapping task_id to cached AssessmentResult, empty dict if no cache
452382
"""
453383
logger.info(
454384
f"Attempting to retrieve cached assessment tasks for document {document_id} section {section_id}"
@@ -476,8 +406,6 @@ def _get_cached_assessment_tasks(
476406
# Extract task results from JSON attribute
477407
if "task_results" in cached_data:
478408
try:
479-
import json
480-
481409
task_data_list = json.loads(cached_data["task_results"])
482410

483411
for task_data in task_data_list:
@@ -515,7 +443,7 @@ def _cache_successful_assessment_tasks(
515443
document_id: str,
516444
workflow_execution_arn: str,
517445
section_id: str,
518-
task_results: List[AssessmentResult],
446+
task_results: list[AssessmentResult],
519447
) -> None:
520448
"""
521449
Cache successful assessment task results to DynamoDB as a JSON-serialized list.
@@ -524,7 +452,7 @@ def _cache_successful_assessment_tasks(
524452
document_id: Document ID
525453
workflow_execution_arn: Workflow execution ARN
526454
section_id: Section ID
527-
task_results: List of successful assessment task results
455+
task_results: list of successful assessment task results
528456
"""
529457
if not self.cache_table or not task_results:
530458
return
@@ -608,16 +536,16 @@ def _is_throttling_exception(self, exception: Exception) -> bool:
608536

609537
def _aggregate_assessment_results(
610538
self,
611-
tasks: List[AssessmentTask],
612-
results: List[AssessmentResult],
613-
assessment_structure: Dict[str, Any],
614-
) -> Tuple[Dict[str, Any], List[Dict[str, Any]], Dict[str, Any]]:
539+
tasks: list[AssessmentTask],
540+
results: list[AssessmentResult],
541+
assessment_structure: dict[str, Any],
542+
) -> tuple[dict[str, Any], list[dict[str, Any]], dict[str, Any]]:
615543
"""
616544
Aggregate individual task results into assessment structure using direct parent insertion.
617545
618546
Args:
619-
tasks: List of assessment tasks
620-
results: List of assessment results
547+
tasks: list of assessment tasks
548+
results: list of assessment results
621549
assessment_structure: Pre-built assessment structure from _create_assessment_tasks
622550
623551
Returns:
@@ -728,17 +656,17 @@ def _get_text_confidence_data(self, page) -> str:
728656
return ""
729657

730658
def _convert_bbox_to_geometry(
731-
self, bbox_coords: List[float], page_num: int
732-
) -> Dict[str, Any]:
659+
self, bbox_coords: list[float], page_num: int
660+
) -> dict[str, Any]:
733661
"""
734662
Convert [x1,y1,x2,y2] coordinates to geometry format.
735663
736664
Args:
737-
bbox_coords: List of 4 coordinates [x1, y1, x2, y2] in 0-1000 scale
665+
bbox_coords: list of 4 coordinates [x1, y1, x2, y2] in 0-1000 scale
738666
page_num: Page number where the bounding box appears
739667
740668
Returns:
741-
Dictionary in geometry format compatible with pattern-1 UI
669+
dictionary in geometry format compatible with pattern-1 UI
742670
"""
743671
if len(bbox_coords) != 4:
744672
raise ValueError(f"Expected 4 coordinates, got {len(bbox_coords)}")
@@ -761,8 +689,8 @@ def _convert_bbox_to_geometry(
761689
}
762690

763691
def _process_single_assessment_geometry(
764-
self, attr_assessment: Dict[str, Any], attr_name: str = ""
765-
) -> Dict[str, Any]:
692+
self, attr_assessment: dict[str, Any], attr_name: str = ""
693+
) -> dict[str, Any]:
766694
"""
767695
Process geometry data for a single assessment (with confidence key).
768696
@@ -819,8 +747,8 @@ def _process_single_assessment_geometry(
819747
return enhanced_attr
820748

821749
def _extract_geometry_from_assessment(
822-
self, assessment_data: Dict[str, Any]
823-
) -> Dict[str, Any]:
750+
self, assessment_data: dict[str, Any]
751+
) -> dict[str, Any]:
824752
"""
825753
Extract geometry data from assessment response and convert to proper format.
826754
Now supports recursive processing of nested group attributes.
@@ -971,6 +899,9 @@ def process_document_section(self, document: Document, section_id: str) -> Docum
971899
# Read page images with configurable dimensions (type-safe access)
972900
target_width = self.config.assessment.image.target_width
973901
target_height = self.config.assessment.image.target_height
902+
logger.info(
903+
f"Image resize config: target_width={target_width}, target_height={target_height}"
904+
)
974905

975906
page_images = []
976907
for page_id in sorted_page_ids:
@@ -979,9 +910,12 @@ def process_document_section(self, document: Document, section_id: str) -> Docum
979910

980911
page = document.pages[page_id]
981912
image_uri = page.image_uri
982-
# Just pass the values directly - prepare_image handles empty strings/None
913+
# For assessment, convert to PNG for better compression with rulers/overlays
983914
image_content = image.prepare_image(
984-
image_uri, target_width, target_height
915+
image_uri, target_width, target_height, output_format="PNG"
916+
)
917+
logger.info(
918+
f"Loaded page {page_id} image as PNG: {len(image_content):,} bytes"
985919
)
986920
page_images.append(image_content)
987921

@@ -1054,8 +988,11 @@ def process_document_section(self, document: Document, section_id: str) -> Docum
1054988

1055989
# Apply grid overlay to page images for assessment
1056990
grid_page_images = []
1057-
for page_img in page_images:
991+
for idx, page_img in enumerate(page_images):
1058992
grid_img = add_ruler_edges(page_img)
993+
logger.info(
994+
f"Added ruler overlay to page {idx}: {len(page_img):,} bytes -> {len(grid_img):,} bytes"
995+
)
1059996
grid_page_images.append(grid_img)
1060997

1061998
# Execute tasks using Strands-based parallel executor
@@ -1076,6 +1013,7 @@ def process_document_section(self, document: Document, section_id: str) -> Docum
10761013
system_prompt=self.config.assessment.system_prompt,
10771014
temperature=self.config.assessment.temperature,
10781015
max_tokens=self.config.assessment.max_tokens,
1016+
document_schema=class_schema,
10791017
max_concurrent=self.max_workers,
10801018
)
10811019
)
@@ -1365,10 +1303,10 @@ def assess_document(self, document: Document) -> Document:
13651303
def _handle_parsing_errors(
13661304
self,
13671305
document: Document,
1368-
failed_tasks: List[str],
1306+
failed_tasks: list[str],
13691307
document_text: str,
1370-
extraction_results: Dict,
1371-
) -> Optional[str]:
1308+
extraction_results: dict,
1309+
) -> str | None:
13721310
"""Handle multiple parsing errors with user-friendly messaging."""
13731311
# Check for token limit issues
13741312
token_warning = check_token_limit(

0 commit comments

Comments
 (0)