-
Notifications
You must be signed in to change notification settings - Fork 1
[PRMP-1057] Creation of Report Distribution Lambda #1026
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
e61d3bd
3d9a291
528efe0
7c55481
dfbff8d
f69afbf
326a6c8
4c31c97
31fc5e4
d247394
38ea089
25fda59
bdff697
21b062c
4d9b466
4ebc15e
5cecec9
8cd2335
aa1ac5c
5d2a089
e7cfb8c
68f7d3b
cb0fd89
4ba0003
5c3fa49
5035fed
0264411
8f74fe3
509f3e9
0cf7002
2948ffe
c67f895
895863f
717f32c
7377d97
52cdcf9
4dd272b
4c9ed8f
b05e171
7923346
c78120e
1f1c37f
d180ec2
f00950c
8148c1c
11327c7
ab6d112
d4b6987
5ac09fd
5e9cb3e
b0f6b71
0c98c34
6085927
a755a54
2361c30
04b1ecd
59f0531
453c9c5
68cd6d6
d8c3005
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| from enum import StrEnum | ||
|
|
||
|
|
||
| class ReportDistributionAction(StrEnum): | ||
| LIST = "list" | ||
| PROCESS_ONE = "process_one" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,60 @@ | ||
| import os | ||
| from typing import Any, Dict | ||
|
|
||
| from enums.lambda_error import LambdaError | ||
| from enums.report_distribution_action import ReportDistributionAction | ||
| from repositories.reporting.report_contact_repository import ReportContactRepository | ||
| from services.base.s3_service import S3Service | ||
| from services.email_service import EmailService | ||
| from services.reporting.report_distribution_service import ReportDistributionService | ||
| from utils.audit_logging_setup import LoggingService | ||
| from utils.decorators.ensure_env_var import ensure_environment_variables | ||
| from utils.decorators.handle_lambda_exceptions import handle_lambda_exceptions | ||
| from utils.decorators.override_error_check import override_error_check | ||
| from utils.decorators.set_audit_arg import set_request_context_for_logging | ||
| from utils.lambda_exceptions import ReportDistributionException | ||
|
|
||
| logger = LoggingService(__name__) | ||
|
|
||
|
|
||
| @ensure_environment_variables( | ||
| names=[ | ||
| "REPORT_BUCKET_NAME", | ||
| "CONTACT_TABLE_NAME", | ||
| "PRM_MAILBOX_EMAIL", | ||
| "SES_FROM_ADDRESS", | ||
| ] | ||
| ) | ||
| @override_error_check | ||
| @handle_lambda_exceptions | ||
| @set_request_context_for_logging | ||
| def lambda_handler(event, context) -> Dict[str, Any]: | ||
| action = event.get("action") | ||
| if action not in { | ||
| ReportDistributionAction.LIST, | ||
| ReportDistributionAction.PROCESS_ONE, | ||
| }: | ||
| logger.error("Invalid action. Expected 'list' or 'process_one'.") | ||
| raise ReportDistributionException(400, LambdaError.InvalidAction) | ||
|
|
||
| bucket = event.get("bucket") or os.environ["REPORT_BUCKET_NAME"] | ||
|
|
||
| service = ReportDistributionService( | ||
| bucket=bucket | ||
| ) | ||
|
|
||
| response = {"status": "ok", "bucket": bucket} | ||
|
|
||
| if action == ReportDistributionAction.LIST: | ||
| prefix = event["prefix"] | ||
| keys = service.list_xlsx_keys(prefix=prefix) | ||
| logger.info(f"List mode: returning {len(keys)} key(s) for prefix={prefix}") | ||
| response.update({"prefix": prefix, "keys": keys}) | ||
| else: | ||
| key = event["key"] | ||
| ods_code = service.extract_ods_code_from_key(key) | ||
| service.process_one_report(ods_code=ods_code, key=key) | ||
| logger.info(f"Process-one mode: processed ods={ods_code}, key={key}") | ||
| response.update({"key": key, "ods_code": ods_code}) | ||
|
|
||
| return response | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,8 +1,9 @@ | ||
| import os | ||
| import tempfile | ||
| from datetime import datetime, timedelta, timezone | ||
| from typing import Any, Dict, Tuple | ||
|
|
||
| from repositories.reporting.reporting_dynamo_repository import ReportingDynamoRepository | ||
| from services.base.s3_service import S3Service | ||
| from services.reporting.excel_report_generator_service import ExcelReportGenerator | ||
| from services.reporting.report_orchestration_service import ReportOrchestrationService | ||
| from utils.audit_logging_setup import LoggingService | ||
|
|
@@ -14,42 +15,92 @@ | |
| logger = LoggingService(__name__) | ||
|
|
||
|
|
||
| def calculate_reporting_window(): | ||
| def calculate_reporting_window() -> Tuple[int, int]: | ||
| now = datetime.now(timezone.utc) | ||
| today_7am = now.replace(hour=7, minute=0, second=0, microsecond=0) | ||
|
|
||
| if now < today_7am: | ||
| today_7am -= timedelta(days=1) | ||
|
|
||
| yesterday_7am = today_7am - timedelta(days=1) | ||
| return int(yesterday_7am.timestamp()), int(today_7am.timestamp()) | ||
|
|
||
| return ( | ||
| int(yesterday_7am.timestamp()), | ||
| int(today_7am.timestamp()), | ||
| ) | ||
|
|
||
| def get_report_date_folder() -> str: | ||
| return datetime.now(timezone.utc).strftime("%Y-%m-%d") | ||
|
|
||
|
|
||
| def build_s3_key(ods_code: str, report_date: str) -> str: | ||
| return f"Report-Orchestration/{report_date}/{ods_code}.xlsx" | ||
|
|
||
|
|
||
|
|
||
| def upload_generated_reports( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. feels like this should belong in a service/handler is doing quite a bit
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. tricky one, So if it was in the service, the report_orchestration_service would only be calling the s3 service, and as such doing no logic. That would increase nesting and make it harder to understand the workflow. |
||
| s3_service: S3Service, | ||
| bucket: str, | ||
| report_date: str, | ||
| generated_files: Dict[str, str], | ||
| ) -> list[str]: | ||
| keys: list[str] = [] | ||
| for ods_code, local_path in generated_files.items(): | ||
| key = build_s3_key(ods_code, report_date) | ||
| s3_service.upload_file_with_extra_args( | ||
| file_name=local_path, | ||
| s3_bucket_name=bucket, | ||
| file_key=key, | ||
| extra_args={"ServerSideEncryption": "aws:kms"}, | ||
| ) | ||
| keys.append(key) | ||
| logger.info(f"Uploaded report for ODS={ods_code} to s3://{bucket}/{key}") | ||
| return keys | ||
|
|
||
|
|
||
| @ensure_environment_variables(names=["BULK_UPLOAD_REPORT_TABLE_NAME"]) | ||
| def build_response(report_date: str, bucket: str, keys: list[str]) -> Dict[str, Any]: | ||
| prefix = f"Report-Orchestration/{report_date}/" | ||
| return { | ||
| "status": "ok", | ||
| "report_date": report_date, | ||
| "bucket": bucket, | ||
| "prefix": prefix, | ||
| "keys": keys, | ||
| } | ||
|
|
||
|
|
||
| @ensure_environment_variables( | ||
| names=[ | ||
| "BULK_UPLOAD_REPORT_TABLE_NAME", | ||
| "REPORT_BUCKET_NAME", | ||
| ] | ||
| ) | ||
| @override_error_check | ||
| @handle_lambda_exceptions | ||
| @set_request_context_for_logging | ||
| def lambda_handler(event, context): | ||
| def lambda_handler(event, context) -> Dict[str, Any]: | ||
| logger.info("Report orchestration lambda invoked") | ||
| table_name = os.getenv("BULK_UPLOAD_REPORT_TABLE_NAME") | ||
|
|
||
| repository = ReportingDynamoRepository(table_name) | ||
| excel_generator = ExcelReportGenerator() | ||
|
|
||
| service = ReportOrchestrationService( | ||
| repository=repository, | ||
| excel_generator=excel_generator, | ||
| report_bucket = os.environ["REPORT_BUCKET_NAME"] | ||
| orchestration_service = ReportOrchestrationService( | ||
| ) | ||
| s3_service = S3Service() | ||
|
|
||
| window_start, window_end = calculate_reporting_window() | ||
| tmp_dir = tempfile.mkdtemp() | ||
| report_date = get_report_date_folder() | ||
|
|
||
| service.process_reporting_window( | ||
| generated_files = orchestration_service.process_reporting_window( | ||
| window_start_ts=window_start, | ||
| window_end_ts=window_end, | ||
| output_dir=tmp_dir, | ||
| ) | ||
|
|
||
| if not generated_files: | ||
| logger.info("No reports generated; exiting") | ||
| return build_response(report_date=report_date, bucket=report_bucket, keys=[]) | ||
|
|
||
| keys = upload_generated_reports( | ||
| s3_service=s3_service, | ||
| bucket=report_bucket, | ||
| report_date=report_date, | ||
| generated_files=generated_files, | ||
| ) | ||
|
|
||
| logger.info(f"Generated and uploaded {len(keys)} report(s) for report_date={report_date}") | ||
| return build_response(report_date=report_date, bucket=report_bucket, keys=keys) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| import os | ||
|
|
||
| from services.base.dynamo_service import DynamoDBService | ||
|
|
||
|
|
||
| class ReportContactRepository: | ||
| def __init__(self): | ||
| self.table_name = os.environ["CONTACT_TABLE_NAME"] | ||
| self.dynamo = DynamoDBService() | ||
|
|
||
| def get_contact_email(self, ods_code: str) -> str | None: | ||
| resp = self.dynamo.get_item( | ||
| table_name=self.table_name, | ||
| key={"OdsCode": ods_code}, | ||
| ) | ||
| item = (resp or {}).get("Item") | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why (resp or {})?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it is a failsafe in case resp is None,
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it is a bit more clear - |
||
| if not item: | ||
| return None | ||
| return item.get("Email") | ||
|
Comment on lines
+11
to
+19
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. could this live within the email or distribution service?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, but it is more about separation of concerns. |
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,35 +1,55 @@ | ||
| import os | ||
| from datetime import timedelta | ||
| from typing import Dict, List | ||
|
|
||
| from boto3.dynamodb.conditions import Attr | ||
| from boto3.dynamodb.conditions import Key | ||
| from services.base.dynamo_service import DynamoDBService | ||
| from utils.audit_logging_setup import LoggingService | ||
| from utils.utilities import utc_date_string, utc_date, utc_day_start_timestamp, utc_day_end_timestamp | ||
|
|
||
| logger = LoggingService(__name__) | ||
|
|
||
|
|
||
| class ReportingDynamoRepository: | ||
| def __init__(self, table_name: str): | ||
| self.table_name = table_name | ||
| def __init__(self): | ||
| self.table_name = os.environ["BULK_UPLOAD_REPORT_TABLE_NAME"] | ||
| self.dynamo_service = DynamoDBService() | ||
|
|
||
| def get_records_for_time_window( | ||
| self, | ||
| start_timestamp: int, | ||
| end_timestamp: int, | ||
| ) -> List[Dict]: | ||
| timestamp_index_name = "TimestampIndex" | ||
| logger.info( | ||
| f"Querying reporting table for window, " | ||
| f"table_name: {self.table_name}, " | ||
| f"start_timestamp: {start_timestamp}, " | ||
| f"end_timestamp: {end_timestamp}", | ||
| "Querying reporting table via TimestampIndex for window, " | ||
| f"table_name={self.table_name}, start_timestamp={start_timestamp}, end_timestamp={end_timestamp}", | ||
| ) | ||
|
|
||
| filter_expression = Attr("Timestamp").between( | ||
| start_timestamp, | ||
| end_timestamp, | ||
| ) | ||
| start_date = utc_date(start_timestamp) | ||
| end_date = utc_date(end_timestamp) | ||
|
|
||
| return self.dynamo_service.scan_whole_table( | ||
| table_name=self.table_name, | ||
| filter_expression=filter_expression, | ||
| ) | ||
| records_for_window: List[Dict] = [] | ||
| current_date = start_date | ||
|
|
||
| while current_date <= end_date: | ||
| day_start_ts = utc_day_start_timestamp(current_date) | ||
| day_end_ts = utc_day_end_timestamp(current_date) | ||
|
|
||
| effective_start_ts = max(start_timestamp, day_start_ts) | ||
| effective_end_ts = min(end_timestamp, day_end_ts) | ||
|
|
||
| key_condition = ( | ||
| Key("Date").eq(current_date.isoformat()) | ||
| & Key("Timestamp").between(effective_start_ts, effective_end_ts) | ||
| ) | ||
|
|
||
| records_for_day = self.dynamo_service.query_by_key_condition_expression( | ||
| table_name=self.table_name, | ||
| index_name=timestamp_index_name, | ||
| key_condition_expression=key_condition, | ||
| ) | ||
|
|
||
| records_for_window.extend(records_for_day) | ||
| current_date += timedelta(days=1) | ||
|
|
||
| return records_for_window |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,2 +1,3 @@ | ||
| openpyxl==3.1.5 | ||
| reportlab==4.3.1 | ||
| reportlab==4.3.1 | ||
| pyzipper==0.3.6 | ||
NogaNHS marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Uh oh!
There was an error while loading. Please reload this page.