
Commit d033eb7
Bob Strahan committed: convert files to parquet
1 parent 88383ae · commit d033eb7

3 files changed: +144, -56 lines changed
Lines changed: 2 additions & 1 deletion

```diff
@@ -1 +1,2 @@
-./lib/idp_common_pkg[appsync, evaluation]
+./lib/idp_common_pkg[appsync, evaluation]
+pyarrow==17.0.0
```
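Since the Lambda now depends on a pinned pyarrow, a cheap guard is to assert the resolved version at cold start. This is a hypothetical sketch, not part of the commit:

```python
# Hypothetical cold-start check: the deployed bundle should resolve the
# same pyarrow version that the requirements file pins above.
import pyarrow

assert pyarrow.__version__ == "17.0.0", f"unexpected pyarrow {pyarrow.__version__}"
```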

src/lambda/evaluation_function/save_to_reporting.py

Lines changed: 126 additions & 43 deletions
```diff
@@ -1,28 +1,131 @@
 """
-Module for saving evaluation results to the reporting bucket in JSON format.
+Module for saving evaluation results to the reporting bucket in Parquet format.
 """
 
 import boto3
 import datetime
+import io
 import json
 import logging
 import re
-from typing import Dict, Any, List, Optional
+from typing import Dict, List, Any
+import pyarrow as pa
+import pyarrow.parquet as pq
 
 # Configure logging
 logger = logging.getLogger(__name__)
 
+def _serialize_value(value: Any) -> str:
+    """
+    Serialize complex values for Parquet storage as strings.
+
+    Args:
+        value: The value to serialize
+
+    Returns:
+        Serialized value as string, or None if input is None
+    """
+    if value is None:
+        return None
+    elif isinstance(value, str):
+        return value
+    elif isinstance(value, (int, float, bool)):
+        # Convert numeric/boolean values to strings
+        return str(value)
+    elif isinstance(value, (list, dict)):
+        # Convert complex types to JSON strings
+        return json.dumps(value)
+    else:
+        # Convert other types to string
+        return str(value)
+
+def _save_records_as_parquet(records: List[Dict], s3_bucket: str, s3_key: str, s3_client, schema: pa.Schema) -> None:
+    """
+    Save a list of records as a Parquet file to S3 with explicit schema.
+
+    Args:
+        records: List of dictionaries to save
+        s3_bucket: S3 bucket name
+        s3_key: S3 key path
+        s3_client: Boto3 S3 client
+        schema: PyArrow schema for the table
+    """
+    if not records:
+        logger.warning("No records to save")
+        return
+
+    # Create PyArrow table from records with explicit schema
+    table = pa.Table.from_pylist(records, schema=schema)
+
+    # Create in-memory buffer
+    buffer = io.BytesIO()
+
+    # Write parquet data to buffer
+    pq.write_table(table, buffer, compression='snappy')
+
+    # Upload to S3
+    buffer.seek(0)
+    s3_client.put_object(
+        Bucket=s3_bucket,
+        Key=s3_key,
+        Body=buffer.getvalue(),
+        ContentType='application/octet-stream'
+    )
+    logger.info(f"Saved {len(records)} records as Parquet to s3://{s3_bucket}/{s3_key}")
+
 
 def save_evaluation_to_reporting_bucket(document, reporting_bucket: str) -> None:
     """
-    Save evaluation results to the reporting bucket in JSON format in three tables:
+    Save evaluation results to the reporting bucket in Parquet format in three tables:
     1. Document level metrics
-    2. Section level metrics
+    2. Section level metrics
     3. Attribute level metrics
 
     Args:
         document: Document with evaluation results
         reporting_bucket: S3 bucket for reporting data
     """
+    # Define schemas for each table to ensure type compatibility
+    document_schema = pa.schema([
+        ('document_id', pa.string()),
+        ('input_key', pa.string()),
+        ('evaluation_date', pa.timestamp('ms')),
+        ('accuracy', pa.float64()),
+        ('precision', pa.float64()),
+        ('recall', pa.float64()),
+        ('f1_score', pa.float64()),
+        ('false_alarm_rate', pa.float64()),
+        ('false_discovery_rate', pa.float64()),
+        ('execution_time', pa.float64())
+    ])
+
+    section_schema = pa.schema([
+        ('document_id', pa.string()),
+        ('section_id', pa.string()),
+        ('section_type', pa.string()),
+        ('accuracy', pa.float64()),
+        ('precision', pa.float64()),
+        ('recall', pa.float64()),
+        ('f1_score', pa.float64()),
+        ('false_alarm_rate', pa.float64()),
+        ('false_discovery_rate', pa.float64()),
+        ('evaluation_date', pa.timestamp('ms'))
+    ])
+
+    attribute_schema = pa.schema([
+        ('document_id', pa.string()),
+        ('section_id', pa.string()),
+        ('section_type', pa.string()),
+        ('attribute_name', pa.string()),
+        ('expected', pa.string()),
+        ('actual', pa.string()),
+        ('matched', pa.bool_()),
+        ('score', pa.float64()),
+        ('reason', pa.string()),
+        ('evaluation_method', pa.string()),
+        ('expected_confidence', pa.string()),
+        ('actual_confidence', pa.string()),
+        ('evaluation_date', pa.timestamp('ms'))
+    ])
     logger.info(f"Writing evaluation results to ReportingBucket s3://{reporting_bucket}/evaluation_metrics/document_metrics")
     try:
         if not document.evaluation_result:
```
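Why the explicit schemas matter becomes clearer with a small round-trip. Below is a minimal, self-contained sketch (values invented, not part of the commit) of the write path that `_save_records_as_parquet` implements: `Table.from_pylist` coerces a plain Python `datetime` into the schema's `timestamp('ms')` column, which is what lets the hunks further down store `now` directly instead of `now.isoformat()`:

```python
# Minimal local sketch of the new write path: build a table from dicts
# against an explicit schema, then round-trip it through an in-memory
# Parquet buffer, as _save_records_as_parquet does before uploading.
import datetime
import io

import pyarrow as pa
import pyarrow.parquet as pq

schema = pa.schema([
    ('document_id', pa.string()),
    ('accuracy', pa.float64()),
    ('evaluation_date', pa.timestamp('ms')),
])

records = [{
    'document_id': 'doc-001',  # invented example value
    'accuracy': 0.97,
    # A plain datetime is coerced to timestamp('ms') by the schema.
    'evaluation_date': datetime.datetime(2025, 1, 15, 12, 30, 0),
}]

table = pa.Table.from_pylist(records, schema=schema)
buf = io.BytesIO()
pq.write_table(table, buf, compression='snappy')

buf.seek(0)
print(pq.read_table(buf).to_pylist())  # values come back typed, not as strings
```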
```diff
@@ -42,7 +145,7 @@ def save_evaluation_to_reporting_bucket(document, reporting_bucket: str) -> None
         document_record = {
             'document_id': document.id,
             'input_key': document.input_key,
-            'evaluation_date': now.isoformat(),
+            'evaluation_date': now,  # Use datetime object directly
             'accuracy': eval_result.overall_metrics.get('accuracy', 0.0),
             'precision': eval_result.overall_metrics.get('precision', 0.0),
             'recall': eval_result.overall_metrics.get('recall', 0.0),
```
```diff
@@ -52,15 +155,9 @@ def save_evaluation_to_reporting_bucket(document, reporting_bucket: str) -> None
             'execution_time': eval_result.execution_time,
         }
 
-        # Save document metrics in JSON Lines format
-        doc_key = f"evaluation_metrics/document_metrics/year={year}/month={month}/day={day}/document={escaped_doc_id}/results.jsonl"
-        s3_client.put_object(
-            Bucket=reporting_bucket,
-            Key=doc_key,
-            Body=json.dumps(document_record),
-            ContentType='application/x-ndjson'
-        )
-        logger.info(f"Saved document metrics to s3://{reporting_bucket}/{doc_key}")
+        # Save document metrics in Parquet format
+        doc_key = f"evaluation_metrics/document_metrics/year={year}/month={month}/day={day}/document={escaped_doc_id}/results.parquet"
+        _save_records_as_parquet([document_record], reporting_bucket, doc_key, s3_client, document_schema)
 
         # 2. Section level metrics
         section_records = []
```
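The `year`/`month`/`day` and `escaped_doc_id` values in `doc_key` are defined earlier in the function, outside this diff. A hypothetical reconstruction, shown only to illustrate the hive-style partition layout (the real sanitization logic may differ):

```python
# Hypothetical reconstruction of the partition values used in doc_key;
# the actual definitions are not visible in this diff.
import datetime
import re

now = datetime.datetime.now()
year, month, day = now.strftime('%Y'), now.strftime('%m'), now.strftime('%d')
# Partition values should avoid characters like '/' from the document id
escaped_doc_id = re.sub(r'[^a-zA-Z0-9_-]', '_', 'my/input/doc.pdf')

doc_key = (
    f"evaluation_metrics/document_metrics/"
    f"year={year}/month={month}/day={day}/document={escaped_doc_id}/results.parquet"
)
print(doc_key)
```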
```diff
@@ -85,7 +182,7 @@ def save_evaluation_to_reporting_bucket(document, reporting_bucket: str) -> None
                 'f1_score': section_result.metrics.get('f1_score', 0.0),
                 'false_alarm_rate': section_result.metrics.get('false_alarm_rate', 0.0),
                 'false_discovery_rate': section_result.metrics.get('false_discovery_rate', 0.0),
-                'evaluation_date': now.isoformat(),
+                'evaluation_date': now,  # Use datetime object directly
             }
             section_records.append(section_record)
 
```
```diff
@@ -106,48 +203,34 @@ def save_evaluation_to_reporting_bucket(document, reporting_bucket: str) -> None
                     'document_id': document.id,
                     'section_id': section_id,
                     'section_type': section_type,
-                    'attribute_name': getattr(attr, 'name', ''),
-                    'expected': getattr(attr, 'expected', ''),
-                    'actual': getattr(attr, 'actual', ''),
+                    'attribute_name': _serialize_value(getattr(attr, 'name', '')),
+                    'expected': _serialize_value(getattr(attr, 'expected', '')),
+                    'actual': _serialize_value(getattr(attr, 'actual', '')),
                     'matched': getattr(attr, 'matched', False),
                     'score': getattr(attr, 'score', 0.0),
-                    'reason': getattr(attr, 'reason', ''),
-                    'evaluation_method': getattr(attr, 'evaluation_method', ''),
-                    'expected_confidence': getattr(attr, 'expected_confidence', None),
-                    'actual_confidence': getattr(attr, 'actual_confidence', None),
-                    'evaluation_date': now.isoformat(),
+                    'reason': _serialize_value(getattr(attr, 'reason', '')),
+                    'evaluation_method': _serialize_value(getattr(attr, 'evaluation_method', '')),
+                    'expected_confidence': _serialize_value(getattr(attr, 'expected_confidence', None)),
+                    'actual_confidence': _serialize_value(getattr(attr, 'actual_confidence', None)),
+                    'evaluation_date': now,  # Use datetime object directly
                 }
                 attribute_records.append(attribute_record)
                 logger.debug(f"Added attribute record for attribute_name={getattr(attr, 'name', '')}")
 
         # Log counts
         logger.info(f"Collected {len(section_records)} section records and {len(attribute_records)} attribute records")
 
-        # Save section metrics in JSON Lines format
+        # Save section metrics in Parquet format
         if section_records:
-            section_key = f"evaluation_metrics/section_metrics/year={year}/month={month}/day={day}/document={escaped_doc_id}/results.jsonl"
-            section_lines = '\n'.join(json.dumps(record) for record in section_records)
-            s3_client.put_object(
-                Bucket=reporting_bucket,
-                Key=section_key,
-                Body=section_lines,
-                ContentType='application/x-ndjson'
-            )
-            logger.info(f"Saved {len(section_records)} section metrics to s3://{reporting_bucket}/{section_key}")
+            section_key = f"evaluation_metrics/section_metrics/year={year}/month={month}/day={day}/document={escaped_doc_id}/results.parquet"
+            _save_records_as_parquet(section_records, reporting_bucket, section_key, s3_client, section_schema)
         else:
             logger.warning("No section records to save")
 
-        # Save attribute metrics in JSON Lines format
+        # Save attribute metrics in Parquet format
         if attribute_records:
-            attr_key = f"evaluation_metrics/attribute_metrics/year={year}/month={month}/day={day}/document={escaped_doc_id}/results.jsonl"
-            attribute_lines = '\n'.join(json.dumps(record) for record in attribute_records)
-            s3_client.put_object(
-                Bucket=reporting_bucket,
-                Key=attr_key,
-                Body=attribute_lines,
-                ContentType='application/x-ndjson'
-            )
-            logger.info(f"Saved {len(attribute_records)} attribute metrics to s3://{reporting_bucket}/{attr_key}")
+            attr_key = f"evaluation_metrics/attribute_metrics/year={year}/month={month}/day={day}/document={escaped_doc_id}/results.parquet"
+            _save_records_as_parquet(attribute_records, reporting_bucket, attr_key, s3_client, attribute_schema)
         else:
             logger.warning("No attribute records to save")
 
```
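Two details of the hunk above are worth illustrating. First, attribute values can be nested lists or dicts, so `_serialize_value` normalizes them into the all-string Parquet columns; a quick demonstration with invented values, using the commit's own function body:

```python
# Illustration (invented inputs) of what _serialize_value does before the
# mixed attribute payloads hit the string-typed Parquet columns.
import json
from typing import Any

def _serialize_value(value: Any):
    if value is None:
        return None
    if isinstance(value, str):
        return value
    if isinstance(value, (int, float, bool)):
        return str(value)
    if isinstance(value, (list, dict)):
        return json.dumps(value)
    return str(value)

print(_serialize_value({'street': '100 Main St', 'zip': '98101'}))  # JSON string
print(_serialize_value(0.92))                                       # "0.92"
print(_serialize_value(None))                                       # None -> Parquet null
```

Second, because the keys use `year=`/`month=`/`day=`/`document=` segments, each metrics prefix reads back as one hive-partitioned dataset. A hedged sketch, with a local path standing in for `s3://<reporting-bucket>`:

```python
# Sketch: read all attribute-metrics Parquet files as a single
# hive-partitioned dataset. The path here is an assumption.
import pyarrow.dataset as ds

dataset = ds.dataset(
    'reporting/evaluation_metrics/attribute_metrics',  # or an S3 URI
    format='parquet',
    partitioning='hive',
)
print(dataset.to_table().to_pylist())
```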
template.yaml

Lines changed: 16 additions & 12 deletions
```diff
@@ -1267,7 +1267,7 @@ Resources:
       Description: "Table for document-level evaluation metrics"
       TableType: "EXTERNAL_TABLE"
       Parameters: {
-        "classification": "json",
+        "classification": "parquet",
         "typeOfData": "file"
       }
       StorageDescriptor:
@@ -1277,10 +1277,10 @@ Resources:
           - ShouldCreateReportingBucket
           - !Ref ReportingBucket
           - !Ref ReportingBucketName
-        InputFormat: "org.apache.hadoop.mapred.TextInputFormat"
-        OutputFormat: "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"
+        InputFormat: "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"
+        OutputFormat: "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"
         SerdeInfo:
-          SerializationLibrary: "org.apache.hadoop.hive.serde2.JsonSerDe"
+          SerializationLibrary: "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
         Columns:
           - Name: document_id
             Type: string
@@ -1323,7 +1323,7 @@ Resources:
       Description: "Table for section-level evaluation metrics"
       TableType: "EXTERNAL_TABLE"
       Parameters: {
-        "classification": "json",
+        "classification": "parquet",
         "typeOfData": "file"
       }
       StorageDescriptor:
@@ -1333,10 +1333,10 @@ Resources:
           - ShouldCreateReportingBucket
           - !Ref ReportingBucket
           - !Ref ReportingBucketName
-        InputFormat: "org.apache.hadoop.mapred.TextInputFormat"
-        OutputFormat: "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"
+        InputFormat: "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"
+        OutputFormat: "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"
         SerdeInfo:
-          SerializationLibrary: "org.apache.hadoop.hive.serde2.JsonSerDe"
+          SerializationLibrary: "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
         Columns:
           - Name: document_id
             Type: string
@@ -1379,7 +1379,7 @@ Resources:
       Description: "Table for attribute-level evaluation metrics"
       TableType: "EXTERNAL_TABLE"
       Parameters: {
-        "classification": "json",
+        "classification": "parquet",
         "typeOfData": "file"
       }
       StorageDescriptor:
@@ -1389,10 +1389,10 @@ Resources:
           - ShouldCreateReportingBucket
           - !Ref ReportingBucket
           - !Ref ReportingBucketName
-        InputFormat: "org.apache.hadoop.mapred.TextInputFormat"
-        OutputFormat: "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"
+        InputFormat: "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"
+        OutputFormat: "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"
         SerdeInfo:
-          SerializationLibrary: "org.openx.data.jsonserde.JsonSerDe"
+          SerializationLibrary: "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
         Columns:
           - Name: document_id
             Type: string
@@ -1414,6 +1414,10 @@ Resources:
             Type: string
           - Name: evaluation_method
             Type: string
+          - Name: expected_confidence
+            Type: string
+          - Name: actual_confidence
+            Type: string
           - Name: evaluation_date
             Type: timestamp
         Compressed: true
```
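After deployment, the Glue tables should advertise the Parquet classification and SerDe that this template now declares. An optional verification sketch (not part of the commit; the database and table names are assumptions):

```python
# Sanity check: confirm the deployed Glue tables advertise Parquet.
# 'idp_reporting' and the table names below are assumed, not from the diff.
import boto3

glue = boto3.client('glue')
for table_name in ('document_metrics', 'section_metrics', 'attribute_metrics'):
    table = glue.get_table(DatabaseName='idp_reporting', Name=table_name)['Table']
    print(
        table_name,
        table['Parameters'].get('classification'),
        table['StorageDescriptor']['SerdeInfo']['SerializationLibrary'],
    )
```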
