From 2761e785018bfc5093689f4e70fe58f0c7536737 Mon Sep 17 00:00:00 2001 From: PedroSoaresNHS Date: Tue, 10 Feb 2026 11:40:04 +0000 Subject: [PATCH 01/14] [PRMP-939] checking what report s3 content lambda is created --- .../base-lambdas-reusable-deploy-all.yml | 14 ++++ lambdas/handlers/report_s3_content_handler.py | 62 ++++++++++++++++ .../reporting/report_s3_content_service.py | 72 +++++++++++++++++++ 3 files changed, 148 insertions(+) create mode 100644 lambdas/handlers/report_s3_content_handler.py create mode 100644 lambdas/services/reporting/report_s3_content_service.py diff --git a/.github/workflows/base-lambdas-reusable-deploy-all.yml b/.github/workflows/base-lambdas-reusable-deploy-all.yml index 495c36fb4..3b6d16fad 100644 --- a/.github/workflows/base-lambdas-reusable-deploy-all.yml +++ b/.github/workflows/base-lambdas-reusable-deploy-all.yml @@ -838,3 +838,17 @@ jobs: lambda_layer_names: "core_lambda_layer" secrets: AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }} + + deploy_report_s3_content_lambda: + name: Deploy Report S3 Content Lambda + uses: ./.github/workflows/base-lambdas-reusable-deploy.yml + with: + environment: ${{ inputs.environment }} + python_version: ${{ inputs.python_version }} + build_branch: ${{ inputs.build_branch }} + sandbox: ${{ inputs.sandbox }} + lambda_handler_name: report_s3_content_handler + lambda_aws_name: reportS3ContentLambda + lambda_layer_names: "core_lambda_layer" + secrets: + AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }} diff --git a/lambdas/handlers/report_s3_content_handler.py b/lambdas/handlers/report_s3_content_handler.py new file mode 100644 index 000000000..e10f08dc8 --- /dev/null +++ b/lambdas/handlers/report_s3_content_handler.py @@ -0,0 +1,62 @@ +import os +import tempfile +from datetime import datetime, timedelta, timezone + +from repositories.reporting.reporting_dynamo_repository import ReportingDynamoRepository +from services.reporting.excel_report_generator_service import ExcelReportGenerator +from 
services.reporting.report_orchestration_service import ReportOrchestrationService +from services.reporting.report_s3_content_service import ReportS3ContentService +from utils.audit_logging_setup import LoggingService +from utils.decorators.ensure_env_var import ensure_environment_variables +from utils.decorators.handle_lambda_exceptions import handle_lambda_exceptions +from utils.decorators.override_error_check import override_error_check +from utils.decorators.set_audit_arg import set_request_context_for_logging + +logger = LoggingService(__name__) + + +# def calculate_reporting_window(): +# now = datetime.now(timezone.utc) +# today_7am = now.replace(hour=7, minute=0, second=0, microsecond=0) +# +# if now < today_7am: +# today_7am -= timedelta(days=1) +# +# yesterday_7am = today_7am - timedelta(days=1) +# +# return ( +# int(yesterday_7am.timestamp()), +# int(today_7am.timestamp()), +# ) + + +@ensure_environment_variables(names=["LLOYD_GEORGE_BUCKET_NAME"]) +@ensure_environment_variables(names=["LLOYD_GEORGE_BUCKET_NAME"]) +@ensure_environment_variables(names=["STATISTICAL_REPORTS_BUCKET"]) +@override_error_check +@handle_lambda_exceptions +@set_request_context_for_logging +def lambda_handler(event, context): + logger.info("Report S3 content lambda invoked") + lloyd_george_bucket = os.getenv("LLOYD_GEORGE_BUCKET_NAME") + bulk_staging_store = os.getenv("LLOYD_GEORGE_BUCKET_NAME") + statistic_reports_bucket = os.getenv("STATISTICAL_REPORTS_BUCKET") + + # repository = ReportingDynamoRepository(table_name) + # excel_generator = ExcelReportGenerator() + # + # service = ReportOrchestrationService( + # repository=repository, + # excel_generator=excel_generator, + # ) + + service = ReportS3ContentService() + + # window_start, window_end = calculate_reporting_window() + # tmp_dir = tempfile.mkdtemp() + + service.process_s3_content( + # window_start_ts=window_start, + # window_end_ts=window_end, + # output_dir=tmp_dir, + ) diff --git 
a/lambdas/services/reporting/report_s3_content_service.py b/lambdas/services/reporting/report_s3_content_service.py new file mode 100644 index 000000000..765167382 --- /dev/null +++ b/lambdas/services/reporting/report_s3_content_service.py @@ -0,0 +1,72 @@ +import os +import tempfile +from collections import defaultdict + +from services.base.s3_service import S3Service +from utils.audit_logging_setup import LoggingService + +logger = LoggingService(__name__) + + +class ReportS3ContentService: + def __init__( + self + # repository, + # excel_generator, + ): + self.lloyd_george_bucket = os.getenv("LLOYD_GEORGE_BUCKET_NAME") + self.bulk_staging_store = os.getenv("LLOYD_GEORGE_BUCKET_NAME") + self.statistic_reports_bucket = os.getenv("STATISTICAL_REPORTS_BUCKET") + self.s3_service = S3Service() + + def process_s3_content( + self, + # window_start_ts: int, + # window_end_ts: int, + # output_dir: str, + ): + bucket_names = [self.lloyd_george_bucket,self.bulk_staging_store] + for bucket_name in bucket_names: + all_bucket_objects = self.s3_service.list_all_objects(bucket_name) + logger.info(f"All Bucket Objects for bucket : {bucket_name}") + logger.info(f"{all_bucket_objects}") + + # records = self.repository.get_records_for_time_window( + # window_start_ts, + # window_end_ts, + # ) + # if not records: + # logger.info("No records found for reporting window") + # return + # + # records_by_ods = self.group_records_by_ods(records) + # + # for ods_code, ods_records in records_by_ods.items(): + # logger.info( + # f"Generating report for ODS ods_code = {ods_code} record_count = {len(ods_records)}" + # ) + # self.generate_ods_report(ods_code, ods_records) + # logger.info("Report orchestration completed") + + # @staticmethod + # def group_records_by_ods(records: list[dict]) -> dict[str, list[dict]]: + # grouped = defaultdict(list) + # for record in records: + # ods_code = record.get("UploaderOdsCode") or "UNKNOWN" + # grouped[ods_code].append(record) + # return grouped + # + # def 
generate_ods_report( + # self, + # ods_code: str, + # records: list[dict], + # ): + # with tempfile.NamedTemporaryFile( + # suffix=f"_{ods_code}.xlsx", + # delete=False, + # ) as tmp: + # self.excel_generator.create_report_orchestration_xlsx( + # ods_code=ods_code, + # records=records, + # output_path=tmp.name, + # ) From d90e18307166ba321bed54a057dfcd0c6a79fa12 Mon Sep 17 00:00:00 2001 From: PedroSoaresNHS Date: Tue, 10 Feb 2026 14:42:50 +0000 Subject: [PATCH 02/14] [PRMP-939] updated logic to upload the csv --- lambdas/handlers/report_s3_content_handler.py | 42 +------- lambdas/services/base/s3_service.py | 21 ++++ .../reporting/csv_report_generator_service.py | 66 ++++++++++++ .../reporting/report_s3_content_service.py | 100 ++++++++---------- .../test_report_s3_content_handler.py | 43 ++++++++ .../test_csv_report_generator_service.py | 27 +++++ .../test_report_s3_content_service.py | 34 ++++++ 7 files changed, 235 insertions(+), 98 deletions(-) create mode 100644 lambdas/services/reporting/csv_report_generator_service.py create mode 100644 lambdas/tests/unit/handlers/test_report_s3_content_handler.py create mode 100644 lambdas/tests/unit/services/reporting/test_csv_report_generator_service.py create mode 100644 lambdas/tests/unit/services/reporting/test_report_s3_content_service.py diff --git a/lambdas/handlers/report_s3_content_handler.py b/lambdas/handlers/report_s3_content_handler.py index e10f08dc8..70de3ffbc 100644 --- a/lambdas/handlers/report_s3_content_handler.py +++ b/lambdas/handlers/report_s3_content_handler.py @@ -1,10 +1,3 @@ -import os -import tempfile -from datetime import datetime, timedelta, timezone - -from repositories.reporting.reporting_dynamo_repository import ReportingDynamoRepository -from services.reporting.excel_report_generator_service import ExcelReportGenerator -from services.reporting.report_orchestration_service import ReportOrchestrationService from services.reporting.report_s3_content_service import ReportS3ContentService from 
utils.audit_logging_setup import LoggingService from utils.decorators.ensure_env_var import ensure_environment_variables @@ -14,49 +7,16 @@ logger = LoggingService(__name__) - -# def calculate_reporting_window(): -# now = datetime.now(timezone.utc) -# today_7am = now.replace(hour=7, minute=0, second=0, microsecond=0) -# -# if now < today_7am: -# today_7am -= timedelta(days=1) -# -# yesterday_7am = today_7am - timedelta(days=1) -# -# return ( -# int(yesterday_7am.timestamp()), -# int(today_7am.timestamp()), -# ) - - -@ensure_environment_variables(names=["LLOYD_GEORGE_BUCKET_NAME"]) @ensure_environment_variables(names=["LLOYD_GEORGE_BUCKET_NAME"]) +@ensure_environment_variables(names=["BULK_STAGING_BUCKET_NAME"]) @ensure_environment_variables(names=["STATISTICAL_REPORTS_BUCKET"]) @override_error_check @handle_lambda_exceptions @set_request_context_for_logging def lambda_handler(event, context): logger.info("Report S3 content lambda invoked") - lloyd_george_bucket = os.getenv("LLOYD_GEORGE_BUCKET_NAME") - bulk_staging_store = os.getenv("LLOYD_GEORGE_BUCKET_NAME") - statistic_reports_bucket = os.getenv("STATISTICAL_REPORTS_BUCKET") - - # repository = ReportingDynamoRepository(table_name) - # excel_generator = ExcelReportGenerator() - # - # service = ReportOrchestrationService( - # repository=repository, - # excel_generator=excel_generator, - # ) service = ReportS3ContentService() - # window_start, window_end = calculate_reporting_window() - # tmp_dir = tempfile.mkdtemp() - service.process_s3_content( - # window_start_ts=window_start, - # window_end_ts=window_end, - # output_dir=tmp_dir, ) diff --git a/lambdas/services/base/s3_service.py b/lambdas/services/base/s3_service.py index 3ee42b4e2..4a9bdb0a3 100644 --- a/lambdas/services/base/s3_service.py +++ b/lambdas/services/base/s3_service.py @@ -255,3 +255,24 @@ def save_or_create_file(self, source_bucket: str, file_key: str, body: bytes): return self.client.put_object( Bucket=source_bucket, Key=file_key, 
Body=BytesIO(body) ) + + def list_all_object_versions(self, bucket_name: str): + paginator = self.client.get_paginator("list_object_versions") + versions, delete_markers = [], [] + + for page in paginator.paginate(Bucket=bucket_name): + versions.extend(page.get("Versions", [])) + delete_markers.extend(page.get("DeleteMarkers", [])) + + return versions, delete_markers + + def get_object_tags_versioned(self, bucket: str, key: str, version_id: str | None): + try: + params = {"Bucket": bucket, "Key": key} + if version_id: + params["VersionId"] = version_id + response = self.client.get_object_tagging(**params) + return response.get("TagSet", []) + except ClientError: + return [] + diff --git a/lambdas/services/reporting/csv_report_generator_service.py b/lambdas/services/reporting/csv_report_generator_service.py new file mode 100644 index 000000000..683ede526 --- /dev/null +++ b/lambdas/services/reporting/csv_report_generator_service.py @@ -0,0 +1,66 @@ +import csv +from io import StringIO +from utils.audit_logging_setup import LoggingService + +logger = LoggingService(__name__) + + +class CsvReportGenerator: + def generate_s3_inventory_csv(self, bucket, versions, delete_markers) -> str: + logger.info(f"Generating S3 inventory CSV for bucket {bucket}") + + output = StringIO() + writer = csv.writer(output) + + writer.writerow( + [ + "bucket", + "key", + "entry_type", + "version_id", + "is_latest", + "last_modified", + "size", + "etag", + "storage_class", + "tags", + ] + ) + + for version in versions: + tags = version.get("Tags", []) + tag_str = ";".join(f"{t['Key']}={t['Value']}" for t in tags) + + writer.writerow( + [ + bucket, + version["Key"], + "VERSION", + version["VersionId"], + version["IsLatest"], + version["LastModified"].isoformat(), + version.get("Size"), + version.get("ETag"), + version.get("StorageClass"), + tag_str, + ] + ) + + for marker in delete_markers: + writer.writerow( + [ + bucket, + marker["Key"], + "DELETE_MARKER", + marker["VersionId"], + 
marker["IsLatest"], + marker["LastModified"].isoformat(), + None, + None, + None, + "", + ] + ) + + logger.info(f"Finished CSV generation for {bucket}") + return output.getvalue() diff --git a/lambdas/services/reporting/report_s3_content_service.py b/lambdas/services/reporting/report_s3_content_service.py index 765167382..1df6290cf 100644 --- a/lambdas/services/reporting/report_s3_content_service.py +++ b/lambdas/services/reporting/report_s3_content_service.py @@ -1,72 +1,58 @@ import os -import tempfile -from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor, as_completed +from io import BytesIO from services.base.s3_service import S3Service +from services.reporting.csv_report_generator_service import CsvReportGenerator from utils.audit_logging_setup import LoggingService logger = LoggingService(__name__) class ReportS3ContentService: - def __init__( - self - # repository, - # excel_generator, - ): + def __init__(self): self.lloyd_george_bucket = os.getenv("LLOYD_GEORGE_BUCKET_NAME") - self.bulk_staging_store = os.getenv("LLOYD_GEORGE_BUCKET_NAME") + self.bulk_staging_store = os.getenv("BULK_STAGING_BUCKET_NAME") self.statistic_reports_bucket = os.getenv("STATISTICAL_REPORTS_BUCKET") self.s3_service = S3Service() + self.csv_generator = CsvReportGenerator() + self.max_objects = 100000 #To avoid prod from melting - def process_s3_content( - self, - # window_start_ts: int, - # window_end_ts: int, - # output_dir: str, - ): - bucket_names = [self.lloyd_george_bucket,self.bulk_staging_store] - for bucket_name in bucket_names: - all_bucket_objects = self.s3_service.list_all_objects(bucket_name) - logger.info(f"All Bucket Objects for bucket : {bucket_name}") - logger.info(f"{all_bucket_objects}") + def _fetch_tags(self, bucket, version): + tags = self.s3_service.get_object_tags_versioned( + bucket, version["Key"], version["VersionId"] + ) + version["Tags"] = tags + return version - # records = self.repository.get_records_for_time_window( - 
# window_start_ts, - # window_end_ts, - # ) - # if not records: - # logger.info("No records found for reporting window") - # return - # - # records_by_ods = self.group_records_by_ods(records) - # - # for ods_code, ods_records in records_by_ods.items(): - # logger.info( - # f"Generating report for ODS ods_code = {ods_code} record_count = {len(ods_records)}" - # ) - # self.generate_ods_report(ods_code, ods_records) - # logger.info("Report orchestration completed") + def process_s3_content(self): + for bucket in [self.lloyd_george_bucket, self.bulk_staging_store]: + logger.info(f"Listing versions for bucket {bucket}") + versions, delete_markers = self.s3_service.list_all_object_versions(bucket) - # @staticmethod - # def group_records_by_ods(records: list[dict]) -> dict[str, list[dict]]: - # grouped = defaultdict(list) - # for record in records: - # ods_code = record.get("UploaderOdsCode") or "UNKNOWN" - # grouped[ods_code].append(record) - # return grouped - # - # def generate_ods_report( - # self, - # ods_code: str, - # records: list[dict], - # ): - # with tempfile.NamedTemporaryFile( - # suffix=f"_{ods_code}.xlsx", - # delete=False, - # ) as tmp: - # self.excel_generator.create_report_orchestration_xlsx( - # ods_code=ods_code, - # records=records, - # output_path=tmp.name, - # ) + if len(versions) > self.max_objects: + logger.warning(f"Limiting versions from {len(versions)} to {self.max_objects}") + versions = versions[: self.max_objects] + + logger.info(f"Fetching tags in parallel for {len(versions)} versions") + + with ThreadPoolExecutor(max_workers=20) as executor: + futures = [ + executor.submit(self._fetch_tags, bucket, v) for v in versions + ] + for _ in as_completed(futures): + pass + + logger.info(f"Generating CSV for bucket {bucket}") + csv_content = self.csv_generator.generate_s3_inventory_csv( + bucket, versions, delete_markers + ) + + logger.info(f"Uploading report for bucket {bucket}") + self.s3_service.upload_file_obj( + 
BytesIO(csv_content.encode("utf-8")), + self.statistic_reports_bucket, + f"s3-content-report/{bucket}-inventory.csv", + ) + + logger.info(f"Completed report for {bucket}") diff --git a/lambdas/tests/unit/handlers/test_report_s3_content_handler.py b/lambdas/tests/unit/handlers/test_report_s3_content_handler.py new file mode 100644 index 000000000..75976cb2f --- /dev/null +++ b/lambdas/tests/unit/handlers/test_report_s3_content_handler.py @@ -0,0 +1,43 @@ +import pytest +from types import SimpleNamespace +from lambdas.handlers.report_s3_content_handler import lambda_handler + + +@pytest.fixture(autouse=True) +def patch_env_vars(monkeypatch): + env_vars = { + "LLOYD_GEORGE_BUCKET_NAME": "bucket-a", + "STATISTICAL_REPORTS_BUCKET": "bucket-b", + "BULK_STAGING_BUCKET_NAME": "bucket-c", + } + for key, value in env_vars.items(): + monkeypatch.setenv(key, value) + + +@pytest.fixture +def lambda_context(): + return SimpleNamespace(aws_request_id="test-request-id") + + +def test_lambda_handler_invokes_service(mocker, lambda_context): + mock_service_cls = mocker.patch( + "lambdas.handlers.report_s3_content_handler.ReportS3ContentService" + ) + mock_service = mock_service_cls.return_value + + lambda_handler({}, lambda_context) + + mock_service_cls.assert_called_once() + mock_service.process_s3_content.assert_called_once() + + +def test_lambda_handler_runs_without_event_data(mocker, lambda_context): + mock_service_cls = mocker.patch( + "lambdas.handlers.report_s3_content_handler.ReportS3ContentService" + ) + mock_service = mock_service_cls.return_value + + lambda_handler({}, lambda_context) + + mock_service_cls.assert_called_once() + mock_service.process_s3_content.assert_called_once() diff --git a/lambdas/tests/unit/services/reporting/test_csv_report_generator_service.py b/lambdas/tests/unit/services/reporting/test_csv_report_generator_service.py new file mode 100644 index 000000000..fba9fd9cf --- /dev/null +++ 
b/lambdas/tests/unit/services/reporting/test_csv_report_generator_service.py @@ -0,0 +1,27 @@ +from datetime import datetime, timezone +from services.reporting.csv_report_generator_service import CsvReportGenerator + + +def test_generate_s3_inventory_csv(): + generator = CsvReportGenerator() + + versions = [ + { + "Key": "file1.txt", + "VersionId": "v1", + "IsLatest": True, + "LastModified": datetime(2024, 1, 1, tzinfo=timezone.utc), + "Size": 123, + "ETag": "etag1", + "StorageClass": "STANDARD", + "Tags": [{"Key": "autodelete", "Value": "true"}], + } + ] + + delete_markers = [] + + csv_output = generator.generate_s3_inventory_csv("bucket-a", versions, delete_markers) + + assert "bucket-a" in csv_output + assert "file1.txt" in csv_output + assert "autodelete=true" in csv_output diff --git a/lambdas/tests/unit/services/reporting/test_report_s3_content_service.py b/lambdas/tests/unit/services/reporting/test_report_s3_content_service.py new file mode 100644 index 000000000..5c8c8354f --- /dev/null +++ b/lambdas/tests/unit/services/reporting/test_report_s3_content_service.py @@ -0,0 +1,34 @@ +from unittest.mock import MagicMock +from services.reporting.report_s3_content_service import ReportS3ContentService + + +def test_process_s3_content(mocker): + service = ReportS3ContentService() + service.lloyd_george_bucket = "bucket-a" + service.bulk_staging_store = "bucket-b" + service.statistic_reports_bucket = "reports-bucket" + service.s3_service = mocker.Mock() + service.csv_generator = mocker.Mock() + + fake_versions = [ + { + "Key": "file1.txt", + "VersionId": "v1", + "IsLatest": True, + "LastModified": mocker.Mock(), + } + ] + fake_delete_markers = [] + + service.s3_service.list_all_object_versions.return_value = ( + fake_versions, + fake_delete_markers, + ) + service.s3_service.get_object_tags_versioned.return_value = [ + {"Key": "autodelete", "Value": "true"} + ] + service.csv_generator.generate_s3_inventory_csv.return_value = "csv-data" + + 
service.process_s3_content() + + assert service.s3_service.upload_file_obj.call_count == 2 From 1974a3bcea4559b3191d83410a0159fdad20c162 Mon Sep 17 00:00:00 2001 From: PedroSoaresNHS Date: Tue, 10 Feb 2026 15:09:46 +0000 Subject: [PATCH 03/14] [PRMP-939] updated name --- .github/workflows/base-lambdas-reusable-deploy-all.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/base-lambdas-reusable-deploy-all.yml b/.github/workflows/base-lambdas-reusable-deploy-all.yml index 3b6d16fad..7ae4922e0 100644 --- a/.github/workflows/base-lambdas-reusable-deploy-all.yml +++ b/.github/workflows/base-lambdas-reusable-deploy-all.yml @@ -848,7 +848,7 @@ jobs: build_branch: ${{ inputs.build_branch }} sandbox: ${{ inputs.sandbox }} lambda_handler_name: report_s3_content_handler - lambda_aws_name: reportS3ContentLambda + lambda_aws_name: ReportS3Content lambda_layer_names: "core_lambda_layer" secrets: AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }} From d99d87bfbe7fc9e6548c8d4af1225f3934afaf9e Mon Sep 17 00:00:00 2001 From: PedroSoaresNHS Date: Tue, 10 Feb 2026 16:43:00 +0000 Subject: [PATCH 04/14] [PRMP-939] adjusted ack max workers --- lambdas/services/reporting/report_s3_content_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lambdas/services/reporting/report_s3_content_service.py b/lambdas/services/reporting/report_s3_content_service.py index 1df6290cf..00008a6df 100644 --- a/lambdas/services/reporting/report_s3_content_service.py +++ b/lambdas/services/reporting/report_s3_content_service.py @@ -16,7 +16,7 @@ def __init__(self): self.statistic_reports_bucket = os.getenv("STATISTICAL_REPORTS_BUCKET") self.s3_service = S3Service() self.csv_generator = CsvReportGenerator() - self.max_objects = 100000 #To avoid prod from melting + self.max_objects = 4500000 #To avoid prod from melting def _fetch_tags(self, bucket, version): tags = self.s3_service.get_object_tags_versioned( From 886fca311f4b3d2e7165f76ed92855940f1b32fa 
Mon Sep 17 00:00:00 2001 From: PedroSoaresNHS Date: Wed, 11 Feb 2026 13:58:19 +0000 Subject: [PATCH 05/14] [PRMP-939] optimizing --- .../services/reporting/report_s3_content_service.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/lambdas/services/reporting/report_s3_content_service.py b/lambdas/services/reporting/report_s3_content_service.py index 00008a6df..a5d01b0eb 100644 --- a/lambdas/services/reporting/report_s3_content_service.py +++ b/lambdas/services/reporting/report_s3_content_service.py @@ -28,12 +28,16 @@ def _fetch_tags(self, bucket, version): def process_s3_content(self): for bucket in [self.lloyd_george_bucket, self.bulk_staging_store]: logger.info(f"Listing versions for bucket {bucket}") - versions, delete_markers = self.s3_service.list_all_object_versions(bucket) - + # versions, delete_markers = self.s3_service.list_all_object_versions(bucket) + versions = self.s3_service.list_all_objects(bucket) + delete_markers = [] if len(versions) > self.max_objects: logger.warning(f"Limiting versions from {len(versions)} to {self.max_objects}") versions = versions[: self.max_objects] - + # versions = [ + # v for v in versions + # if v["IsLatest"] + # ] logger.info(f"Fetching tags in parallel for {len(versions)} versions") with ThreadPoolExecutor(max_workers=20) as executor: From 5c572992d750071fdb8562ae68b3c4c1373e3b5e Mon Sep 17 00:00:00 2001 From: PedroSoaresNHS Date: Thu, 12 Feb 2026 10:51:03 +0000 Subject: [PATCH 06/14] [PRMP-939] trying to create ecs task --- .../base-lambdas-reusable-deploy-all.yml | 14 +++ .github/workflows/full-deploy-to-pre-prod.yml | 11 +++ .github/workflows/full-deploy-to-prod.yml | 10 +++ .github/workflows/lambdas-dev-to-main-ci.yml | 17 ++++ .github/workflows/s3-base-data-collection.yml | 90 +++++++++++++++++++ .../s3-data-collection-deploy-to-sandbox.yml | 58 ++++++++++++ lambdas/ecs/s3_data_collection/Dockerfile | 15 ++++ lambdas/handlers/report_s3_content_handler.py | 13 +++ 
.../reporting/report_s3_content_service.py | 4 +- 9 files changed, 230 insertions(+), 2 deletions(-) create mode 100644 .github/workflows/s3-base-data-collection.yml create mode 100644 .github/workflows/s3-data-collection-deploy-to-sandbox.yml create mode 100644 lambdas/ecs/s3_data_collection/Dockerfile diff --git a/.github/workflows/base-lambdas-reusable-deploy-all.yml b/.github/workflows/base-lambdas-reusable-deploy-all.yml index 7ae4922e0..59c0b3d38 100644 --- a/.github/workflows/base-lambdas-reusable-deploy-all.yml +++ b/.github/workflows/base-lambdas-reusable-deploy-all.yml @@ -434,6 +434,20 @@ jobs: secrets: AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }} + deploy_s3_data_collection_lambda: + name: Deploy S3 data collection lambda + uses: ./.github/workflows/base-lambdas-reusable-deploy.yml + with: + environment: ${{ inputs.environment}} + python_version: ${{ inputs.python_version }} + build_branch: ${{ inputs.build_branch}} + sandbox: ${{ inputs.sandbox }} + lambda_handler_name: data_collection_handler + lambda_aws_name: ReportS3Content + lambda_layer_names: "core_lambda_layer,data_lambda_layer" + secrets: + AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }} + deploy_statistical_report_lambda: name: Deploy statistical report lambda uses: ./.github/workflows/base-lambdas-reusable-deploy.yml diff --git a/.github/workflows/full-deploy-to-pre-prod.yml b/.github/workflows/full-deploy-to-pre-prod.yml index de251f2c2..d121c2be9 100644 --- a/.github/workflows/full-deploy-to-pre-prod.yml +++ b/.github/workflows/full-deploy-to-pre-prod.yml @@ -136,6 +136,17 @@ jobs: secrets: AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }} + deploy_s3_data_collection: + name: Deploy S3 Data Collection + needs: [ "tag_and_release" ] + uses: ./.github/workflows/s3-base-data-collection.yml + with: + build_branch: ${{ needs.tag_and_release.outputs.version }} + environment: pre-prod + sandbox: pre-prod + secrets: + AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }} + run_fhir_api_e2e_tests: name: 
Run FHIR API E2E Tests needs: ["deploy_all_lambdas"] diff --git a/.github/workflows/full-deploy-to-prod.yml b/.github/workflows/full-deploy-to-prod.yml index b9dae923e..f2a43132e 100644 --- a/.github/workflows/full-deploy-to-prod.yml +++ b/.github/workflows/full-deploy-to-prod.yml @@ -75,3 +75,13 @@ jobs: sandbox: prod secrets: AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }} + + deploy_s3_data_collection: + name: Deploy S3 Data Collection + uses: ./.github/workflows/s3-base-data-collection.yml + with: + build_branch: ${{ inputs.tag_version }} + environment: prod + sandbox: prod + secrets: + AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }} diff --git a/.github/workflows/lambdas-dev-to-main-ci.yml b/.github/workflows/lambdas-dev-to-main-ci.yml index f67055b17..f5b38a651 100644 --- a/.github/workflows/lambdas-dev-to-main-ci.yml +++ b/.github/workflows/lambdas-dev-to-main-ci.yml @@ -80,6 +80,18 @@ jobs: secrets: AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }} + deploy_s3_data_collection: + name: Deploy S3 Data Collection + needs: [ "run_tests" ] + uses: ./.github/workflows/s3-base-data-collection.yml + if: github.ref == 'refs/heads/main' + with: + build_branch: ${{ github.event.pull_request.head.ref }} + environment: development + sandbox: ndr-dev + secrets: + AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }} + notify-slack: name: Notify Slack on Failure runs-on: ubuntu-latest @@ -91,6 +103,7 @@ jobs: publish_all_lambda_layers, deploy_all_lambdas, deploy_data_collection, + deploy_s3_data_collection, ] if: failure() && github.event_name == 'push' && github.ref == 'refs/heads/main' steps: @@ -160,6 +173,10 @@ jobs: { "type": "mrkdwn", "text": "*deploy_data_collection:* ${{ needs.deploy_data_collection.result == 'success' && ':white_check_mark:' || ':x:' }}" + }, + { + "type": "mrkdwn", + "text": "*deploy_s3_data_collection:* ${{ needs.deploy_s3_data_collection.result == 'success' && ':white_check_mark:' || ':x:' }}" } ] }, diff --git 
a/.github/workflows/s3-base-data-collection.yml b/.github/workflows/s3-base-data-collection.yml new file mode 100644 index 000000000..d08990047 --- /dev/null +++ b/.github/workflows/s3-base-data-collection.yml @@ -0,0 +1,90 @@ +name: "Z-BASE Deploy S3 Data Collection: Build data collection image" + +run-name: "${{ github.event.inputs.build_branch }} | ${{ github.event.inputs.environment }} | ${{ github.event.inputs.sandbox }}" + +on: + workflow_call: + inputs: + build_branch: + required: true + type: string + environment: + required: true + type: string + sandbox: + required: true + type: string + secrets: + AWS_ASSUME_ROLE: + required: true + +permissions: + pull-requests: write + id-token: write # This is required for requesting the JWT + contents: read # This is required for actions/checkout + +jobs: + s3_data_collection_build_docker_image: + runs-on: ubuntu-latest + environment: ${{ inputs.environment }} + defaults: + run: + working-directory: lambdas + + steps: + - uses: actions/checkout@v6 + with: + repository: "NHSDigital/national-document-repository" + ref: ${{ inputs.build_branch }} + fetch-depth: "0" + + - name: Configure AWS Credentials + uses: aws-actions/configure-aws-credentials@v5 + with: + role-to-assume: ${{ secrets.AWS_ASSUME_ROLE }} + role-skip-session-tagging: true + aws-region: ${{ vars.AWS_REGION }} + mask-aws-account-id: true + + - name: Login to Amazon ECR + id: login-ecr + uses: aws-actions/amazon-ecr-login@v2 + + - name: Build, tag, and push image to Amazon ECR + id: build-image + env: + ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }} + ECR_REPOSITORY: ${{ inputs.sandbox }}-s3-data-collection + IMAGE_TAG: latest + IMAGE_TAG_SHA: ${{ github.sha }} + run: | + docker build -t $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG -t $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG_SHA -f ecs/s3_data_collection/Dockerfile . 
+ docker push $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG + docker push $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG_SHA + echo "image=$ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG_SHA" >> $GITHUB_OUTPUT + + # Looks like the ECS does not check for image updates, as such we need to force a new task definition to run the new image + # We will always use the "latest" image tag so we do not need to modify the task-definition + - name: Download task definition + id: download-task + run: | + aws ecs describe-task-definition --task-definition ${{ inputs.sandbox }}-task-s3-data-collection --query taskDefinition > task-definition.json + echo "revision=$(cat task-definition.json | jq .revision)" >> $GITHUB_OUTPUT + + - name: Fill in the new image ID in the Amazon ECS task definition + id: task-def + uses: aws-actions/amazon-ecs-render-task-definition@v1 + with: + task-definition: task-definition.json + container-name: ${{ inputs.sandbox }}-container-s3-data-collection + image: ${{ steps.build-image.outputs.image }} + + - name: Register new ECS task definition revision + run: | + aws ecs register-task-definition --cli-input-json file://task-definition.json + + + - name: De-register previous revision + run: | + aws ecs deregister-task-definition \ + --task-definition ${{ inputs.sandbox }}-task-s3-data-collection:${{ steps.download-task.outputs.revision }} diff --git a/.github/workflows/s3-data-collection-deploy-to-sandbox.yml b/.github/workflows/s3-data-collection-deploy-to-sandbox.yml new file mode 100644 index 000000000..20693494a --- /dev/null +++ b/.github/workflows/s3-data-collection-deploy-to-sandbox.yml @@ -0,0 +1,58 @@ +name: "SANDBOX S3 Data Collection - Publish Data Collection Image to ECR" + +run-name: "${{ github.event.inputs.build_branch }} | ${{ github.event.inputs.sandbox }} | ${{ github.event.inputs.environment }}" + +on: + workflow_dispatch: + inputs: + build_branch: + description: "Feature branch to push." 
+ required: true + type: string + default: "main" + sandbox: + description: "Which Sandbox to push to." + required: true + type: string + default: "ndr" + environment: + description: "Which Environment settings to use." + required: true + type: string + default: "development" + workflow_call: + inputs: + build_branch: + description: "Feature branch to push." + required: true + type: string + default: "main" + sandbox: + description: "Which Sandbox to push to." + required: true + type: string + default: "ndr" + environment: + description: "Which Environment settings to use." + required: true + type: string + default: "development" + secrets: + AWS_ASSUME_ROLE: + required: true + +permissions: + pull-requests: write + id-token: write # This is required for requesting the JWT + contents: read # This is required for actions/checkout + +jobs: + push_image: + name: Push Image + uses: ./.github/workflows/s3-base-data-collection.yml + with: + build_branch: ${{ inputs.build_branch }} + environment: ${{ inputs.environment }} + sandbox: ${{ inputs.sandbox }} + secrets: + AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }} diff --git a/lambdas/ecs/s3_data_collection/Dockerfile b/lambdas/ecs/s3_data_collection/Dockerfile new file mode 100644 index 000000000..81417fa22 --- /dev/null +++ b/lambdas/ecs/s3_data_collection/Dockerfile @@ -0,0 +1,15 @@ +FROM python:3.11 + +ENV PYTHONDONTWRITEBYTECODE=1 +ENV PYTHONUNBUFFERED=1 +ENV PYTHONPATH=/lambdas + +WORKDIR /lambdas +COPY ../../requirements /lambdas/requirements + +RUN pip install -r requirements/layers/requirements_core_lambda_layer.txt +RUN pip install -r requirements/layers/requirements_data_lambda_layer.txt + +COPY ../../ /lambdas + +ENTRYPOINT ["python", "-m", "handlers.report_s3_content_handler"] diff --git a/lambdas/handlers/report_s3_content_handler.py b/lambdas/handlers/report_s3_content_handler.py index 70de3ffbc..74077f7a7 100644 --- a/lambdas/handlers/report_s3_content_handler.py +++ 
b/lambdas/handlers/report_s3_content_handler.py @@ -20,3 +20,16 @@ def lambda_handler(event, context): service.process_s3_content( ) + + + +class _EcsContext: + aws_request_id = "ecs-run" + function_name = "ReportS3ContentEcsTask" + invoked_function_arn = "ecs://ReportS3ContentEcsTask" + log_group_name = "ecs" + log_stream_name = "ecs" + + +if __name__ == "__main__": + lambda_handler({}, _EcsContext()) diff --git a/lambdas/services/reporting/report_s3_content_service.py b/lambdas/services/reporting/report_s3_content_service.py index a5d01b0eb..241b23b01 100644 --- a/lambdas/services/reporting/report_s3_content_service.py +++ b/lambdas/services/reporting/report_s3_content_service.py @@ -28,8 +28,8 @@ def _fetch_tags(self, bucket, version): def process_s3_content(self): for bucket in [self.lloyd_george_bucket, self.bulk_staging_store]: logger.info(f"Listing versions for bucket {bucket}") - # versions, delete_markers = self.s3_service.list_all_object_versions(bucket) - versions = self.s3_service.list_all_objects(bucket) + versions, delete_markers = self.s3_service.list_all_object_versions(bucket) + # versions = self.s3_service.list_all_objects(bucket) delete_markers = [] if len(versions) > self.max_objects: logger.warning(f"Limiting versions from {len(versions)} to {self.max_objects}") From 362e398afc6de6f474a41219ffa1a5a50c76fa4b Mon Sep 17 00:00:00 2001 From: PedroSoaresNHS Date: Thu, 12 Feb 2026 12:55:07 +0000 Subject: [PATCH 07/14] [PRMP-939] Removed Deployment of lambda since it is an ecs task now --- .../base-lambdas-reusable-deploy-all.yml | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/.github/workflows/base-lambdas-reusable-deploy-all.yml b/.github/workflows/base-lambdas-reusable-deploy-all.yml index 59c0b3d38..c2ec863b8 100644 --- a/.github/workflows/base-lambdas-reusable-deploy-all.yml +++ b/.github/workflows/base-lambdas-reusable-deploy-all.yml @@ -853,16 +853,16 @@ jobs: secrets: AWS_ASSUME_ROLE: ${{ 
secrets.AWS_ASSUME_ROLE }} - deploy_report_s3_content_lambda: - name: Deploy Report S3 Content Lambda - uses: ./.github/workflows/base-lambdas-reusable-deploy.yml - with: - environment: ${{ inputs.environment }} - python_version: ${{ inputs.python_version }} - build_branch: ${{ inputs.build_branch }} - sandbox: ${{ inputs.sandbox }} - lambda_handler_name: report_s3_content_handler - lambda_aws_name: ReportS3Content - lambda_layer_names: "core_lambda_layer" - secrets: - AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }} +# deploy_report_s3_content_lambda: +# name: Deploy Report S3 Content Lambda +# uses: ./.github/workflows/base-lambdas-reusable-deploy.yml +# with: +# environment: ${{ inputs.environment }} +# python_version: ${{ inputs.python_version }} +# build_branch: ${{ inputs.build_branch }} +# sandbox: ${{ inputs.sandbox }} +# lambda_handler_name: report_s3_content_handler +# lambda_aws_name: ReportS3Content +# lambda_layer_names: "core_lambda_layer" +# secrets: +# AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }} From 484c6ef5b1f1d6efe8ed66ac2c47b3085d8b7706 Mon Sep 17 00:00:00 2001 From: PedroSoaresNHS Date: Thu, 12 Feb 2026 14:15:56 +0000 Subject: [PATCH 08/14] [PRMP-939] updated ecs task docker file --- .github/workflows/s3-base-data-collection.yml | 52 ++++++++++++++----- lambdas/ecs/s3_data_collection/Dockerfile | 4 +- 2 files changed, 42 insertions(+), 14 deletions(-) diff --git a/.github/workflows/s3-base-data-collection.yml b/.github/workflows/s3-base-data-collection.yml index d08990047..80b6c4c3d 100644 --- a/.github/workflows/s3-base-data-collection.yml +++ b/.github/workflows/s3-base-data-collection.yml @@ -20,8 +20,8 @@ on: permissions: pull-requests: write - id-token: write # This is required for requesting the JWT - contents: read # This is required for actions/checkout + id-token: write + contents: read jobs: s3_data_collection_build_docker_image: @@ -58,33 +58,61 @@ jobs: IMAGE_TAG: latest IMAGE_TAG_SHA: ${{ github.sha }} run: | - docker build -t 
$ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG -t $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG_SHA -f ecs/s3_data_collection/Dockerfile . + set -euo pipefail + + docker build \ + -t $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG \ + -t $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG_SHA \ + -f ecs/s3_data_collection/Dockerfile \ + . + docker push $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG docker push $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG_SHA echo "image=$ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG_SHA" >> $GITHUB_OUTPUT - # Looks like the ECS does not check for image updates, as such we need to force a new task definition to run the new image - # We will always use the "latest" image tag so we do not need to modify the task-definition - - name: Download task definition + - name: Download current task definition (raw) id: download-task run: | - aws ecs describe-task-definition --task-definition ${{ inputs.sandbox }}-task-s3-data-collection --query taskDefinition > task-definition.json - echo "revision=$(cat task-definition.json | jq .revision)" >> $GITHUB_OUTPUT + set -euo pipefail + + aws ecs describe-task-definition \ + --task-definition ${{ inputs.sandbox }}-task-s3-data-collection \ + --query taskDefinition \ + > task-definition-raw.json + + echo "revision=$(jq -r .revision task-definition-raw.json)" >> $GITHUB_OUTPUT - - name: Fill in the new image ID in the Amazon ECS task definition + - name: Render task definition with new image id: task-def uses: aws-actions/amazon-ecs-render-task-definition@v1 with: - task-definition: task-definition.json + task-definition: task-definition-raw.json container-name: ${{ inputs.sandbox }}-container-s3-data-collection image: ${{ steps.build-image.outputs.image }} + - name: Sanitize task definition JSON for registration + run: | + set -euo pipefail + + jq 'del( + .taskDefinitionArn, + .revision, + .status, + .requiresAttributes, + .compatibilities, + .registeredAt, + .registeredBy + )' \ + "${{ steps.task-def.outputs.task-definition }}" \ + > 
task-definition.json + - name: Register new ECS task definition revision run: | + set -euo pipefail aws ecs register-task-definition --cli-input-json file://task-definition.json - - - name: De-register previous revision + - name: De-register previous revision (optional) run: | + set -euo pipefail aws ecs deregister-task-definition \ --task-definition ${{ inputs.sandbox }}-task-s3-data-collection:${{ steps.download-task.outputs.revision }} diff --git a/lambdas/ecs/s3_data_collection/Dockerfile b/lambdas/ecs/s3_data_collection/Dockerfile index 81417fa22..29cf7891e 100644 --- a/lambdas/ecs/s3_data_collection/Dockerfile +++ b/lambdas/ecs/s3_data_collection/Dockerfile @@ -5,11 +5,11 @@ ENV PYTHONUNBUFFERED=1 ENV PYTHONPATH=/lambdas WORKDIR /lambdas -COPY ../../requirements /lambdas/requirements +COPY requirements /lambdas/requirements RUN pip install -r requirements/layers/requirements_core_lambda_layer.txt RUN pip install -r requirements/layers/requirements_data_lambda_layer.txt -COPY ../../ /lambdas +COPY . 
/lambdas ENTRYPOINT ["python", "-m", "handlers.report_s3_content_handler"] From 10fb3be0af844f587e17b7b8a53367c03f597b17 Mon Sep 17 00:00:00 2001 From: PedroSoaresNHS Date: Thu, 12 Feb 2026 14:39:45 +0000 Subject: [PATCH 09/14] [PRMP-939] testing --- .github/workflows/s3-base-data-collection.yml | 61 ++++++++++++++++--- 1 file changed, 52 insertions(+), 9 deletions(-) diff --git a/.github/workflows/s3-base-data-collection.yml b/.github/workflows/s3-base-data-collection.yml index 80b6c4c3d..f46f7f204 100644 --- a/.github/workflows/s3-base-data-collection.yml +++ b/.github/workflows/s3-base-data-collection.yml @@ -53,6 +53,7 @@ jobs: - name: Build, tag, and push image to Amazon ECR id: build-image env: + AWS_REGION: ${{ vars.AWS_REGION }} ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }} ECR_REPOSITORY: ${{ inputs.sandbox }}-s3-data-collection IMAGE_TAG: latest @@ -60,27 +61,58 @@ jobs: run: | set -euo pipefail + echo "AWS_REGION=$AWS_REGION" + echo "ECR_REGISTRY=$ECR_REGISTRY" + echo "ECR_REPOSITORY=$ECR_REPOSITORY" + echo "Tags: $IMAGE_TAG and $IMAGE_TAG_SHA" + echo "Dockerfile: ecs/s3_data_collection/Dockerfile" + echo "Build context: $(pwd)" + + # Build (context is lambdas/) docker build \ - -t $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG \ - -t $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG_SHA \ + -t "$ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG" \ + -t "$ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG_SHA" \ -f ecs/s3_data_collection/Dockerfile \ . 
- docker push $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG - docker push $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG_SHA + # Push both tags + docker push "$ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG" + docker push "$ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG_SHA" + + # Output the immutable tag for task definition update echo "image=$ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG_SHA" >> $GITHUB_OUTPUT + - name: Verify image exists in ECR (debug) + env: + AWS_REGION: ${{ vars.AWS_REGION }} + REPO: ${{ inputs.sandbox }}-s3-data-collection + run: | + set -euo pipefail + echo "Checking ECR images for repo: $REPO in region: $AWS_REGION" + aws ecr describe-images \ + --region "$AWS_REGION" \ + --repository-name "$REPO" \ + --query "imageDetails[].{tags:imageTags,pushedAt:imagePushedAt,digest:imageDigest}" \ + --output table + - name: Download current task definition (raw) id: download-task + env: + AWS_REGION: ${{ vars.AWS_REGION }} run: | set -euo pipefail aws ecs describe-task-definition \ - --task-definition ${{ inputs.sandbox }}-task-s3-data-collection \ + --region "$AWS_REGION" \ + --task-definition "${{ inputs.sandbox }}-task-s3-data-collection" \ --query taskDefinition \ > task-definition-raw.json - echo "revision=$(jq -r .revision task-definition-raw.json)" >> $GITHUB_OUTPUT + echo "previous_revision=$(jq -r .revision task-definition-raw.json)" >> $GITHUB_OUTPUT + echo "family=$(jq -r .family task-definition-raw.json)" >> $GITHUB_OUTPUT + + echo "Downloaded task definition:" + jq '{family,revision,executionRoleArn,taskRoleArn,containerDefinitions:[.containerDefinitions[]|{name,image}]}' task-definition-raw.json - name: Render task definition with new image id: task-def @@ -90,10 +122,11 @@ jobs: container-name: ${{ inputs.sandbox }}-container-s3-data-collection image: ${{ steps.build-image.outputs.image }} - - name: Sanitize task definition JSON for registration + - name: Sanitize rendered task definition JSON run: | set -euo pipefail + # The render action outputs a file path; 
sanitize that file for register-task-definition jq 'del( .taskDefinitionArn, .revision, @@ -106,13 +139,23 @@ jobs: "${{ steps.task-def.outputs.task-definition }}" \ > task-definition.json + echo "Sanitized task definition (family + container image):" + jq '{family,executionRoleArn,taskRoleArn,containerDefinitions:[.containerDefinitions[]|{name,image,command,entryPoint}]}' task-definition.json + - name: Register new ECS task definition revision + env: + AWS_REGION: ${{ vars.AWS_REGION }} run: | set -euo pipefail - aws ecs register-task-definition --cli-input-json file://task-definition.json + aws ecs register-task-definition \ + --region "$AWS_REGION" \ + --cli-input-json file://task-definition.json - name: De-register previous revision (optional) + env: + AWS_REGION: ${{ vars.AWS_REGION }} run: | set -euo pipefail aws ecs deregister-task-definition \ - --task-definition ${{ inputs.sandbox }}-task-s3-data-collection:${{ steps.download-task.outputs.revision }} + --region "$AWS_REGION" \ + --task-definition "${{ inputs.sandbox }}-task-s3-data-collection:${{ steps.download-task.outputs.previous_revision }}" From d8830c0a8a00831e4a8574fa679fd5986cf37b60 Mon Sep 17 00:00:00 2001 From: PedroSoaresNHS Date: Thu, 12 Feb 2026 14:49:23 +0000 Subject: [PATCH 10/14] [PRMP-939] testing --- .github/workflows/full-deploy-to-sandbox.yml | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/.github/workflows/full-deploy-to-sandbox.yml b/.github/workflows/full-deploy-to-sandbox.yml index bf5621cf2..c4311253d 100644 --- a/.github/workflows/full-deploy-to-sandbox.yml +++ b/.github/workflows/full-deploy-to-sandbox.yml @@ -166,6 +166,17 @@ jobs: secrets: AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }} + deploy_s3_data_collection: + name: Deploy S3 Data Collection + uses: ./.github/workflows/s3-base-data-collection.yml + needs: ["deploy_all_lambdas"] + with: + build_branch: ${{ inputs.build_branch }} + environment: ${{ inputs.environment }} + sandbox: ${{ inputs.sandbox }} + 
secrets: + AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }} + bulk_upload: name: "Run Bulk Upload" if: ${{ inputs.bulk_upload }} From 16e2518a6f6f012dd2141dd9c583b788f7bb9839 Mon Sep 17 00:00:00 2001 From: PedroSoaresNHS Date: Thu, 12 Feb 2026 14:57:12 +0000 Subject: [PATCH 11/14] [PRMP-939] testing --- .github/workflows/s3-base-data-collection.yml | 59 +++---------------- 1 file changed, 7 insertions(+), 52 deletions(-) diff --git a/.github/workflows/s3-base-data-collection.yml b/.github/workflows/s3-base-data-collection.yml index f46f7f204..b60eea399 100644 --- a/.github/workflows/s3-base-data-collection.yml +++ b/.github/workflows/s3-base-data-collection.yml @@ -1,7 +1,5 @@ name: "Z-BASE Deploy S3 Data Collection: Build data collection image" -run-name: "${{ github.event.inputs.build_branch }} | ${{ github.event.inputs.environment }} | ${{ github.event.inputs.sandbox }}" - on: workflow_call: inputs: @@ -53,7 +51,6 @@ jobs: - name: Build, tag, and push image to Amazon ECR id: build-image env: - AWS_REGION: ${{ vars.AWS_REGION }} ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }} ECR_REPOSITORY: ${{ inputs.sandbox }}-s3-data-collection IMAGE_TAG: latest @@ -61,72 +58,41 @@ jobs: run: | set -euo pipefail - echo "AWS_REGION=$AWS_REGION" - echo "ECR_REGISTRY=$ECR_REGISTRY" - echo "ECR_REPOSITORY=$ECR_REPOSITORY" - echo "Tags: $IMAGE_TAG and $IMAGE_TAG_SHA" - echo "Dockerfile: ecs/s3_data_collection/Dockerfile" - echo "Build context: $(pwd)" - - # Build (context is lambdas/) docker build \ -t "$ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG" \ -t "$ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG_SHA" \ -f ecs/s3_data_collection/Dockerfile \ . 
- # Push both tags docker push "$ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG" docker push "$ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG_SHA" - # Output the immutable tag for task definition update echo "image=$ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG_SHA" >> $GITHUB_OUTPUT - - name: Verify image exists in ECR (debug) - env: - AWS_REGION: ${{ vars.AWS_REGION }} - REPO: ${{ inputs.sandbox }}-s3-data-collection - run: | - set -euo pipefail - echo "Checking ECR images for repo: $REPO in region: $AWS_REGION" - aws ecr describe-images \ - --region "$AWS_REGION" \ - --repository-name "$REPO" \ - --query "imageDetails[].{tags:imageTags,pushedAt:imagePushedAt,digest:imageDigest}" \ - --output table - - - name: Download current task definition (raw) + - name: Download current task definition id: download-task - env: - AWS_REGION: ${{ vars.AWS_REGION }} run: | set -euo pipefail aws ecs describe-task-definition \ - --region "$AWS_REGION" \ - --task-definition "${{ inputs.sandbox }}-task-s3-data-collection" \ + --task-definition ${{ inputs.sandbox }}-task-s3-data-collection \ --query taskDefinition \ > task-definition-raw.json - echo "previous_revision=$(jq -r .revision task-definition-raw.json)" >> $GITHUB_OUTPUT - echo "family=$(jq -r .family task-definition-raw.json)" >> $GITHUB_OUTPUT - - echo "Downloaded task definition:" - jq '{family,revision,executionRoleArn,taskRoleArn,containerDefinitions:[.containerDefinitions[]|{name,image}]}' task-definition-raw.json + echo "revision=$(jq -r .revision task-definition-raw.json)" >> $GITHUB_OUTPUT - name: Render task definition with new image id: task-def uses: aws-actions/amazon-ecs-render-task-definition@v1 with: - task-definition: task-definition-raw.json + task-definition: lambdas/task-definition-raw.json container-name: ${{ inputs.sandbox }}-container-s3-data-collection image: ${{ steps.build-image.outputs.image }} - - name: Sanitize rendered task definition JSON + - name: Sanitize task definition JSON run: | set -euo pipefail - # The 
render action outputs a file path; sanitize that file for register-task-definition jq 'del( .taskDefinitionArn, .revision, @@ -139,23 +105,12 @@ jobs: "${{ steps.task-def.outputs.task-definition }}" \ > task-definition.json - echo "Sanitized task definition (family + container image):" - jq '{family,executionRoleArn,taskRoleArn,containerDefinitions:[.containerDefinitions[]|{name,image,command,entryPoint}]}' task-definition.json - - name: Register new ECS task definition revision - env: - AWS_REGION: ${{ vars.AWS_REGION }} run: | - set -euo pipefail aws ecs register-task-definition \ - --region "$AWS_REGION" \ --cli-input-json file://task-definition.json - - name: De-register previous revision (optional) - env: - AWS_REGION: ${{ vars.AWS_REGION }} + - name: Deregister previous revision run: | - set -euo pipefail aws ecs deregister-task-definition \ - --region "$AWS_REGION" \ - --task-definition "${{ inputs.sandbox }}-task-s3-data-collection:${{ steps.download-task.outputs.previous_revision }}" + --task-definition ${{ inputs.sandbox }}-task-s3-data-collection:${{ steps.download-task.outputs.revision }} From 66a3cdb5b23f0fec9ceed533d9fce732aeceb828 Mon Sep 17 00:00:00 2001 From: PedroSoaresNHS Date: Thu, 12 Feb 2026 15:09:12 +0000 Subject: [PATCH 12/14] [PRMP-939] testing --- .github/workflows/s3-base-data-collection.yml | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/.github/workflows/s3-base-data-collection.yml b/.github/workflows/s3-base-data-collection.yml index b60eea399..920941650 100644 --- a/.github/workflows/s3-base-data-collection.yml +++ b/.github/workflows/s3-base-data-collection.yml @@ -1,5 +1,7 @@ name: "Z-BASE Deploy S3 Data Collection: Build data collection image" +run-name: "${{ github.event.inputs.build_branch }} | ${{ github.event.inputs.environment }} | ${{ github.event.inputs.sandbox }}" + on: workflow_call: inputs: @@ -67,29 +69,29 @@ jobs: docker push "$ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG" docker push 
"$ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG_SHA" - echo "image=$ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG_SHA" >> $GITHUB_OUTPUT + echo "image=$ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG_SHA" >> "$GITHUB_OUTPUT" - - name: Download current task definition + - name: Download current task definition (raw) id: download-task run: | set -euo pipefail aws ecs describe-task-definition \ - --task-definition ${{ inputs.sandbox }}-task-s3-data-collection \ + --task-definition "${{ inputs.sandbox }}-task-s3-data-collection" \ --query taskDefinition \ > task-definition-raw.json - echo "revision=$(jq -r .revision task-definition-raw.json)" >> $GITHUB_OUTPUT + echo "revision=$(jq -r .revision task-definition-raw.json)" >> "$GITHUB_OUTPUT" - name: Render task definition with new image id: task-def uses: aws-actions/amazon-ecs-render-task-definition@v1 with: - task-definition: lambdas/task-definition-raw.json + task-definition: ./lambdas/task-definition-raw.json container-name: ${{ inputs.sandbox }}-container-s3-data-collection image: ${{ steps.build-image.outputs.image }} - - name: Sanitize task definition JSON + - name: Sanitize task definition JSON for registration run: | set -euo pipefail @@ -107,10 +109,11 @@ jobs: - name: Register new ECS task definition revision run: | - aws ecs register-task-definition \ - --cli-input-json file://task-definition.json + set -euo pipefail + aws ecs register-task-definition --cli-input-json file://task-definition.json - - name: Deregister previous revision + - name: De-register previous revision run: | + set -euo pipefail aws ecs deregister-task-definition \ - --task-definition ${{ inputs.sandbox }}-task-s3-data-collection:${{ steps.download-task.outputs.revision }} + --task-definition "${{ inputs.sandbox }}-task-s3-data-collection:${{ steps.download-task.outputs.revision }}" From d4c6d0ab4e24819dfa1008740dd7f3d3e1f4fe42 Mon Sep 17 00:00:00 2001 From: PedroSoaresNHS Date: Thu, 12 Feb 2026 15:10:45 +0000 Subject: [PATCH 13/14] [PRMP-939] testing --- 
.github/workflows/full-deploy-to-sandbox.yml | 4 +- .../reporting/report_s3_content_service.py | 274 +++++++++++++++--- 2 files changed, 233 insertions(+), 45 deletions(-) diff --git a/.github/workflows/full-deploy-to-sandbox.yml b/.github/workflows/full-deploy-to-sandbox.yml index c4311253d..16e15bd9c 100644 --- a/.github/workflows/full-deploy-to-sandbox.yml +++ b/.github/workflows/full-deploy-to-sandbox.yml @@ -167,13 +167,13 @@ jobs: AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }} deploy_s3_data_collection: - name: Deploy S3 Data Collection + name: "Deploy S3 Data Collection (ECS task image)" uses: ./.github/workflows/s3-base-data-collection.yml needs: ["deploy_all_lambdas"] with: build_branch: ${{ inputs.build_branch }} - environment: ${{ inputs.environment }} sandbox: ${{ inputs.sandbox }} + environment: ${{ inputs.environment }} secrets: AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }} diff --git a/lambdas/services/reporting/report_s3_content_service.py b/lambdas/services/reporting/report_s3_content_service.py index 241b23b01..57482ea4e 100644 --- a/lambdas/services/reporting/report_s3_content_service.py +++ b/lambdas/services/reporting/report_s3_content_service.py @@ -1,62 +1,250 @@ +import gzip +import io +import json import os -from concurrent.futures import ThreadPoolExecutor, as_completed -from io import BytesIO +from typing import Iterable, Optional from services.base.s3_service import S3Service -from services.reporting.csv_report_generator_service import CsvReportGenerator from utils.audit_logging_setup import LoggingService logger = LoggingService(__name__) class ReportS3ContentService: + """ + Reads S3 Inventory outputs (manifest + shard files) from the STATISTICAL_REPORTS_BUCKET, + then merges shards into ONE CSV per bucket. 
+ + Inventory input location (within STATISTICAL_REPORTS_BUCKET): + unrefined-reports/s3-inventory//.../manifest.json + unrefined-reports/s3-inventory//.../ + + Output (within STATISTICAL_REPORTS_BUCKET): + unrefined-reports/s3-content-report/-inventory.csv + """ + + INVENTORY_BASE_PREFIX = "unrefined-reports/s3-inventory/" + OUTPUT_PREFIX = "unrefined-reports/s3-content-report/" + + # Multipart upload tuning + MULTIPART_MIN_PART_SIZE = 8 * 1024 * 1024 # 8MB (>= 5MB required by S3) + READ_CHUNK_SIZE = 1024 * 1024 # 1MB (not used directly but kept for tuning) + def __init__(self): + self.statistical_reports_bucket = os.environ["STATISTICAL_REPORTS_BUCKET"] self.lloyd_george_bucket = os.getenv("LLOYD_GEORGE_BUCKET_NAME") self.bulk_staging_store = os.getenv("BULK_STAGING_BUCKET_NAME") - self.statistic_reports_bucket = os.getenv("STATISTICAL_REPORTS_BUCKET") self.s3_service = S3Service() - self.csv_generator = CsvReportGenerator() - self.max_objects = 4500000 #To avoid prod from melting - - def _fetch_tags(self, bucket, version): - tags = self.s3_service.get_object_tags_versioned( - bucket, version["Key"], version["VersionId"] - ) - version["Tags"] = tags - return version def process_s3_content(self): - for bucket in [self.lloyd_george_bucket, self.bulk_staging_store]: - logger.info(f"Listing versions for bucket {bucket}") - versions, delete_markers = self.s3_service.list_all_object_versions(bucket) - # versions = self.s3_service.list_all_objects(bucket) - delete_markers = [] - if len(versions) > self.max_objects: - logger.warning(f"Limiting versions from {len(versions)} to {self.max_objects}") - versions = versions[: self.max_objects] - # versions = [ - # v for v in versions - # if v["IsLatest"] - # ] - logger.info(f"Fetching tags in parallel for {len(versions)} versions") - - with ThreadPoolExecutor(max_workers=20) as executor: - futures = [ - executor.submit(self._fetch_tags, bucket, v) for v in versions - ] - for _ in as_completed(futures): - pass - - 
logger.info(f"Generating CSV for bucket {bucket}") - csv_content = self.csv_generator.generate_s3_inventory_csv( - bucket, versions, delete_markers + jobs = [ + self.lloyd_george_bucket, + self.bulk_staging_store, + ] + + for bucket_name in jobs: + if not bucket_name: + logger.warning("Bucket name env var missing; skipping job") + continue + + inventory_prefix = f"{self.INVENTORY_BASE_PREFIX}{bucket_name}/" + output_key = f"{self.OUTPUT_PREFIX}{bucket_name}-inventory.csv" + + logger.info( + f"Processing S3 Inventory merge for bucket_name={bucket_name} " + f"inventory_prefix={inventory_prefix}" + ) + + manifest_key = self._find_latest_manifest_key( + bucket=self.statistical_reports_bucket, + prefix=inventory_prefix, + ) + + if not manifest_key: + logger.warning( + f"No manifest.json found under s3://{self.statistical_reports_bucket}/{inventory_prefix} " + f"for {bucket_name}. Inventory may not have delivered yet." + ) + continue + + manifest = self._read_json( + bucket=self.statistical_reports_bucket, + key=manifest_key, + ) + + shard_keys = self._extract_shard_keys(manifest) + if not shard_keys: + logger.warning(f"Manifest {manifest_key} contained no shard keys for {bucket_name}") + continue + + header_line = self._build_header_from_manifest(manifest) + + logger.info( + f"Merging {len(shard_keys)} shards into s3://{self.statistical_reports_bucket}/{output_key} " + f"(manifest={manifest_key})" + ) + + self._multipart_upload_merged_csv( + source_bucket=self.statistical_reports_bucket, + dest_bucket=self.statistical_reports_bucket, + dest_key=output_key, + shard_keys=shard_keys, + header_line=header_line, + ) + + logger.info(f"Completed merged inventory CSV for {bucket_name}") + + # ------------------------- + # Inventory helpers + # ------------------------- + + def _find_latest_manifest_key(self, *, bucket: str, prefix: str) -> Optional[str]: + """ + Find most recently modified object ending with 'manifest.json' under a prefix. 
+ """ + paginator = self.s3_service.client.get_paginator("list_objects_v2") + latest = None + + for page in paginator.paginate(Bucket=bucket, Prefix=prefix): + for obj in page.get("Contents", []): + key = obj["Key"] + if not key.endswith("manifest.json"): + continue + if latest is None or obj["LastModified"] > latest["LastModified"]: + latest = obj + + return latest["Key"] if latest else None + + def _read_json(self, *, bucket: str, key: str) -> dict: + resp = self.s3_service.client.get_object(Bucket=bucket, Key=key) + body = resp["Body"].read() + return json.loads(body) + + def _extract_shard_keys(self, manifest: dict) -> list[str]: + """ + Typical S3 Inventory manifest: + { + "files": [ + {"key":"unrefined-reports/s3-inventory//.../data/part-00000.csv.gz", ...}, + ... + ], + "fileSchema": "Bucket, Key, VersionId, ..." + } + + We support common variants ("files"/"fileList", "key"/"Key"). + """ + files = manifest.get("files") or manifest.get("fileList") or [] + shard_keys: list[str] = [] + + for f in files: + if isinstance(f, dict): + if "key" in f: + shard_keys.append(f["key"]) + elif "Key" in f: + shard_keys.append(f["Key"]) + + return shard_keys + + def _build_header_from_manifest(self, manifest: dict) -> Optional[str]: + """ + If 'fileSchema' exists, write it as a CSV header row. + This gives you a single file that's self-describing. + """ + schema = manifest.get("fileSchema") + if not schema or not isinstance(schema, str): + return None + schema = schema.strip() + if not schema: + return None + return schema + "\n" + + # ------------------------- + # Stream shard content + # ------------------------- + + def _open_text_stream_for_object(self, *, bucket: str, key: str) -> io.TextIOBase: + """ + Returns a text stream for the object body, transparently handling .gz. 
+ """ + resp = self.s3_service.client.get_object(Bucket=bucket, Key=key) + body = resp["Body"] # StreamingBody + + if key.endswith(".gz"): + gz = gzip.GzipFile(fileobj=body) + return io.TextIOWrapper(gz, encoding="utf-8", newline="") + return io.TextIOWrapper(body, encoding="utf-8", newline="") + + # ------------------------- + # Merge + multipart upload + # ------------------------- + + def _multipart_upload_merged_csv( + self, + *, + source_bucket: str, + dest_bucket: str, + dest_key: str, + shard_keys: Iterable[str], + header_line: Optional[str], + ) -> None: + s3 = self.s3_service.client + + create = s3.create_multipart_upload(Bucket=dest_bucket, Key=dest_key) + upload_id = create["UploadId"] + parts = [] + + part_number = 1 + buf = io.BytesIO() + + def flush_part(): + nonlocal part_number + data = buf.getvalue() + if not data: + return + resp = s3.upload_part( + Bucket=dest_bucket, + Key=dest_key, + UploadId=upload_id, + PartNumber=part_number, + Body=data, ) + parts.append({"ETag": resp["ETag"], "PartNumber": part_number}) + part_number += 1 + buf.seek(0) + buf.truncate(0) + + try: + # Write header once (if we have it) + if header_line: + buf.write(header_line.encode("utf-8")) + if buf.tell() >= self.MULTIPART_MIN_PART_SIZE: + flush_part() + + # Inventory shard files generally do NOT contain a header row, + # so concatenation is correct. 
+ for shard_key in shard_keys: + logger.info(f"Reading inventory shard: s3://{source_bucket}/{shard_key}") + stream = self._open_text_stream_for_object(bucket=source_bucket, key=shard_key) + + for line in stream: + buf.write(line.encode("utf-8")) + if buf.tell() >= self.MULTIPART_MIN_PART_SIZE: + flush_part() + + # Final part + flush_part() - logger.info(f"Uploading report for bucket {bucket}") - self.s3_service.upload_file_obj( - BytesIO(csv_content.encode("utf-8")), - self.statistic_reports_bucket, - f"s3-content-report/{bucket}-inventory.csv", + s3.complete_multipart_upload( + Bucket=dest_bucket, + Key=dest_key, + UploadId=upload_id, + MultipartUpload={"Parts": parts}, ) - logger.info(f"Completed report for {bucket}") + except Exception as e: + logger.error(f"Failed merged upload to s3://{dest_bucket}/{dest_key}: {e}") + try: + s3.abort_multipart_upload(Bucket=dest_bucket, Key=dest_key, UploadId=upload_id) + except Exception: + pass + raise From e509ecb78c7a61279d06ce691c5253965ab19656 Mon Sep 17 00:00:00 2001 From: PedroSoaresNHS Date: Thu, 12 Feb 2026 15:45:01 +0000 Subject: [PATCH 14/14] [PRMP-939] testing --- .github/workflows/s3-base-data-collection.yml | 4 +- .../reporting/report_s3_content_service.py | 274 +++--------------- 2 files changed, 45 insertions(+), 233 deletions(-) diff --git a/.github/workflows/s3-base-data-collection.yml b/.github/workflows/s3-base-data-collection.yml index 920941650..4d53e7e1c 100644 --- a/.github/workflows/s3-base-data-collection.yml +++ b/.github/workflows/s3-base-data-collection.yml @@ -87,7 +87,7 @@ jobs: id: task-def uses: aws-actions/amazon-ecs-render-task-definition@v1 with: - task-definition: ./lambdas/task-definition-raw.json + task-definition: lambdas/task-definition-raw.json container-name: ${{ inputs.sandbox }}-container-s3-data-collection image: ${{ steps.build-image.outputs.image }} @@ -112,7 +112,7 @@ jobs: set -euo pipefail aws ecs register-task-definition --cli-input-json file://task-definition.json - - 
name: De-register previous revision + - name: De-register previous revision (optional) run: | set -euo pipefail aws ecs deregister-task-definition \ diff --git a/lambdas/services/reporting/report_s3_content_service.py b/lambdas/services/reporting/report_s3_content_service.py index 57482ea4e..8802e27b1 100644 --- a/lambdas/services/reporting/report_s3_content_service.py +++ b/lambdas/services/reporting/report_s3_content_service.py @@ -1,250 +1,62 @@ -import gzip -import io -import json import os -from typing import Iterable, Optional +from concurrent.futures import ThreadPoolExecutor, as_completed +from io import BytesIO from services.base.s3_service import S3Service +from services.reporting.csv_report_generator_service import CsvReportGenerator from utils.audit_logging_setup import LoggingService logger = LoggingService(__name__) class ReportS3ContentService: - """ - Reads S3 Inventory outputs (manifest + shard files) from the STATISTICAL_REPORTS_BUCKET, - then merges shards into ONE CSV per bucket. 
- - Inventory input location (within STATISTICAL_REPORTS_BUCKET): - unrefined-reports/s3-inventory//.../manifest.json - unrefined-reports/s3-inventory//.../ - - Output (within STATISTICAL_REPORTS_BUCKET): - unrefined-reports/s3-content-report/-inventory.csv - """ - - INVENTORY_BASE_PREFIX = "unrefined-reports/s3-inventory/" - OUTPUT_PREFIX = "unrefined-reports/s3-content-report/" - - # Multipart upload tuning - MULTIPART_MIN_PART_SIZE = 8 * 1024 * 1024 # 8MB (>= 5MB required by S3) - READ_CHUNK_SIZE = 1024 * 1024 # 1MB (not used directly but kept for tuning) - def __init__(self): - self.statistical_reports_bucket = os.environ["STATISTICAL_REPORTS_BUCKET"] self.lloyd_george_bucket = os.getenv("LLOYD_GEORGE_BUCKET_NAME") self.bulk_staging_store = os.getenv("BULK_STAGING_BUCKET_NAME") + self.statistic_reports_bucket = os.getenv("STATISTICAL_REPORTS_BUCKET") self.s3_service = S3Service() + self.csv_generator = CsvReportGenerator() + self.max_objects = 4500000 #To avoid prod from melting - def process_s3_content(self): - jobs = [ - self.lloyd_george_bucket, - self.bulk_staging_store, - ] - - for bucket_name in jobs: - if not bucket_name: - logger.warning("Bucket name env var missing; skipping job") - continue - - inventory_prefix = f"{self.INVENTORY_BASE_PREFIX}{bucket_name}/" - output_key = f"{self.OUTPUT_PREFIX}{bucket_name}-inventory.csv" - - logger.info( - f"Processing S3 Inventory merge for bucket_name={bucket_name} " - f"inventory_prefix={inventory_prefix}" - ) - - manifest_key = self._find_latest_manifest_key( - bucket=self.statistical_reports_bucket, - prefix=inventory_prefix, - ) - - if not manifest_key: - logger.warning( - f"No manifest.json found under s3://{self.statistical_reports_bucket}/{inventory_prefix} " - f"for {bucket_name}. Inventory may not have delivered yet." 
- ) - continue - - manifest = self._read_json( - bucket=self.statistical_reports_bucket, - key=manifest_key, - ) - - shard_keys = self._extract_shard_keys(manifest) - if not shard_keys: - logger.warning(f"Manifest {manifest_key} contained no shard keys for {bucket_name}") - continue - - header_line = self._build_header_from_manifest(manifest) - - logger.info( - f"Merging {len(shard_keys)} shards into s3://{self.statistical_reports_bucket}/{output_key} " - f"(manifest={manifest_key})" - ) - - self._multipart_upload_merged_csv( - source_bucket=self.statistical_reports_bucket, - dest_bucket=self.statistical_reports_bucket, - dest_key=output_key, - shard_keys=shard_keys, - header_line=header_line, - ) - - logger.info(f"Completed merged inventory CSV for {bucket_name}") + def _fetch_tags(self, bucket, version): + tags = self.s3_service.get_object_tags_versioned( + bucket, version["Key"], version["VersionId"] + ) + version["Tags"] = tags + return version - # ------------------------- - # Inventory helpers - # ------------------------- - - def _find_latest_manifest_key(self, *, bucket: str, prefix: str) -> Optional[str]: - """ - Find most recently modified object ending with 'manifest.json' under a prefix. - """ - paginator = self.s3_service.client.get_paginator("list_objects_v2") - latest = None - - for page in paginator.paginate(Bucket=bucket, Prefix=prefix): - for obj in page.get("Contents", []): - key = obj["Key"] - if not key.endswith("manifest.json"): - continue - if latest is None or obj["LastModified"] > latest["LastModified"]: - latest = obj - - return latest["Key"] if latest else None - - def _read_json(self, *, bucket: str, key: str) -> dict: - resp = self.s3_service.client.get_object(Bucket=bucket, Key=key) - body = resp["Body"].read() - return json.loads(body) - - def _extract_shard_keys(self, manifest: dict) -> list[str]: - """ - Typical S3 Inventory manifest: - { - "files": [ - {"key":"unrefined-reports/s3-inventory//.../data/part-00000.csv.gz", ...}, - ... 
- ], - "fileSchema": "Bucket, Key, VersionId, ..." - } - - We support common variants ("files"/"fileList", "key"/"Key"). - """ - files = manifest.get("files") or manifest.get("fileList") or [] - shard_keys: list[str] = [] - - for f in files: - if isinstance(f, dict): - if "key" in f: - shard_keys.append(f["key"]) - elif "Key" in f: - shard_keys.append(f["Key"]) - - return shard_keys - - def _build_header_from_manifest(self, manifest: dict) -> Optional[str]: - """ - If 'fileSchema' exists, write it as a CSV header row. - This gives you a single file that's self-describing. - """ - schema = manifest.get("fileSchema") - if not schema or not isinstance(schema, str): - return None - schema = schema.strip() - if not schema: - return None - return schema + "\n" - - # ------------------------- - # Stream shard content - # ------------------------- - - def _open_text_stream_for_object(self, *, bucket: str, key: str) -> io.TextIOBase: - """ - Returns a text stream for the object body, transparently handling .gz. 
- """ - resp = self.s3_service.client.get_object(Bucket=bucket, Key=key) - body = resp["Body"] # StreamingBody - - if key.endswith(".gz"): - gz = gzip.GzipFile(fileobj=body) - return io.TextIOWrapper(gz, encoding="utf-8", newline="") - return io.TextIOWrapper(body, encoding="utf-8", newline="") - - # ------------------------- - # Merge + multipart upload - # ------------------------- - - def _multipart_upload_merged_csv( - self, - *, - source_bucket: str, - dest_bucket: str, - dest_key: str, - shard_keys: Iterable[str], - header_line: Optional[str], - ) -> None: - s3 = self.s3_service.client - - create = s3.create_multipart_upload(Bucket=dest_bucket, Key=dest_key) - upload_id = create["UploadId"] - parts = [] - - part_number = 1 - buf = io.BytesIO() - - def flush_part(): - nonlocal part_number - data = buf.getvalue() - if not data: - return - resp = s3.upload_part( - Bucket=dest_bucket, - Key=dest_key, - UploadId=upload_id, - PartNumber=part_number, - Body=data, + def process_s3_content(self): + for bucket in [self.lloyd_george_bucket, self.bulk_staging_store]: + logger.info(f"Listing versions for bucket {bucket}") + versions, delete_markers = self.s3_service.list_all_object_versions(bucket) + # versions = self.s3_service.list_all_objects(bucket) + delete_markers = [] + if len(versions) > self.max_objects: + logger.warning(f"Limiting versions from {len(versions)} to {self.max_objects}") + versions = versions[: self.max_objects] + # versions = [ + # v for v in versions + # if v["IsLatest"] + # ] + logger.info(f"Fetching tags in parallel for {len(versions)} versions") + + with ThreadPoolExecutor(max_workers=20) as executor: + futures = [ + executor.submit(self._fetch_tags, bucket, v) for v in versions + ] + for _ in as_completed(futures): + pass + + logger.info(f"Generating CSV for bucket {bucket}") + csv_content = self.csv_generator.generate_s3_inventory_csv( + bucket, versions, delete_markers ) - parts.append({"ETag": resp["ETag"], "PartNumber": part_number}) - 
part_number += 1 - buf.seek(0) - buf.truncate(0) - - try: - # Write header once (if we have it) - if header_line: - buf.write(header_line.encode("utf-8")) - if buf.tell() >= self.MULTIPART_MIN_PART_SIZE: - flush_part() - - # Inventory shard files generally do NOT contain a header row, - # so concatenation is correct. - for shard_key in shard_keys: - logger.info(f"Reading inventory shard: s3://{source_bucket}/{shard_key}") - stream = self._open_text_stream_for_object(bucket=source_bucket, key=shard_key) - - for line in stream: - buf.write(line.encode("utf-8")) - if buf.tell() >= self.MULTIPART_MIN_PART_SIZE: - flush_part() - - # Final part - flush_part() - s3.complete_multipart_upload( - Bucket=dest_bucket, - Key=dest_key, - UploadId=upload_id, - MultipartUpload={"Parts": parts}, + logger.info(f"Uploading report for bucket {bucket}") + self.s3_service.upload_file_obj( + BytesIO(csv_content.encode("utf-8")), + self.statistic_reports_bucket, + f"s3-content-report/{bucket}-inventory.csv", ) - except Exception as e: - logger.error(f"Failed merged upload to s3://{dest_bucket}/{dest_key}: {e}") - try: - s3.abort_multipart_upload(Bucket=dest_bucket, Key=dest_key, UploadId=upload_id) - except Exception: - pass - raise + logger.info(f"Completed report for {bucket}") \ No newline at end of file