diff --git a/lambdas/handlers/bulk_upload_metadata_processor_handler.py b/lambdas/handlers/bulk_upload_metadata_processor_handler.py
index 79fbdd4357..4df3b8bc6d 100644
--- a/lambdas/handlers/bulk_upload_metadata_processor_handler.py
+++ b/lambdas/handlers/bulk_upload_metadata_processor_handler.py
@@ -9,7 +9,6 @@
 from utils.decorators.handle_lambda_exceptions import handle_lambda_exceptions
 from utils.decorators.override_error_check import override_error_check
 from utils.decorators.set_audit_arg import set_request_context_for_logging
-from utils.exceptions import BulkUploadMetadataException

 logger = LoggingService(__name__)

@@ -25,13 +24,14 @@ def lambda_handler(event, _context):
         "preFormatType", LloydGeorgePreProcessFormat.GENERAL
     )
     formatter_service_class = get_formatter_service(raw_pre_format_type)
-    practice_directory = event.get("practiceDirectory", "")
+    input_file_location = event.get("inputFileLocation", "")
     remappings = event.get("metadataFieldRemappings", {})

-    metadata_formatter_service = formatter_service_class(practice_directory)
+    metadata_formatter_service = formatter_service_class(input_file_location)
     metadata_service = BulkUploadMetadataProcessorService(
         metadata_formatter_service=metadata_formatter_service,
         metadata_heading_remap=remappings,
+        input_file_location=input_file_location,
     )

     if "source" in event and event.get("source") == "aws.s3":
@@ -40,14 +40,14 @@ def lambda_handler(event, _context):
         metadata_service.handle_expedite_event(event)
         return

-    if not practice_directory:
+    if not input_file_location:
         logger.error(
-            "Failed to start metadata processing due to missing practice directory"
+            "Failed to start metadata processing due to missing field: inputFileLocation"
         )
         return

     logger.info(
-        f"Starting metadata processing for practice directory: {practice_directory}"
+        f"Starting metadata processing for file location: {input_file_location}"
     )

     fixed_values = event.get("fixedValues", {})
@@ -56,10 +56,10 @@ def lambda_handler(event, _context):
     validator_service.validate_fixed_values(
         fixed_values, remappings)

-    metadata_formatter_service = formatter_service_class(practice_directory)
     metadata_service = BulkUploadMetadataProcessorService(
         metadata_formatter_service=metadata_formatter_service,
         metadata_heading_remap=remappings,
-        fixed_values=fixed_values
+        fixed_values=fixed_values,
+        input_file_location=input_file_location
     )
     metadata_service.process_metadata()
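Note on the new invoke contract (not part of the patch): the handler now keys off `inputFileLocation` rather than `practiceDirectory`. A minimal sketch of a direct-invoke payload using only the fields the handler reads above; the concrete values and the string form of the format enum are illustrative assumptions, not taken from the repository.

```python
# Hypothetical invoke payload for bulk_upload_metadata_processor_handler.lambda_handler.
# Field names come from the handler diff above; every value here is made up.
example_event = {
    "preFormatType": "GENERAL",  # assumed serialisation of LloydGeorgePreProcessFormat.GENERAL
    "inputFileLocation": "input/practice_a/metadata.csv",  # S3 key of the metadata CSV in the staging bucket
    "metadataFieldRemappings": {"UPLOAD": "Upload Date"},  # optional heading remap
    "fixedValues": {"SECTION": "LG"},  # optional fixed-value overrides
}

# lambda_handler(example_event, None)  # _context is unused by the handler
```

An S3/EventBridge event (`"source": "aws.s3"`) still takes the expedite path and returns before any of the `inputFileLocation` checks run.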
diff --git a/lambdas/services/bulk_upload_metadata_processor_service.py b/lambdas/services/bulk_upload_metadata_processor_service.py
index 8d51783483..80aaa8a2b1 100644
--- a/lambdas/services/bulk_upload_metadata_processor_service.py
+++ b/lambdas/services/bulk_upload_metadata_processor_service.py
@@ -10,11 +10,11 @@
 import pydantic
 from botocore.exceptions import ClientError
+
 from enums.lloyd_george_pre_process_format import LloydGeorgePreProcessFormat
 from enums.upload_status import UploadStatus
 from enums.virus_scan_result import VirusScanResult
 from models.staging_metadata import (
-    METADATA_FILENAME,
     BulkUploadQueueMetadata,
     MetadataFile,
     StagingSqsMetadata,
@@ -53,10 +53,11 @@ class BulkUploadMetadataProcessorService:
     def __init__(
-        self,
-        metadata_formatter_service: MetadataPreprocessorService,
-        metadata_heading_remap: dict,
-        fixed_values: dict = None,
+            self,
+            metadata_formatter_service: MetadataPreprocessorService,
+            metadata_heading_remap: dict,
+            input_file_location: str = "",
+            fixed_values: dict = None,
     ):
         self.staging_bucket_name = os.getenv("STAGING_STORE_BUCKET_NAME")
         self.metadata_queue_url = os.getenv("METADATA_SQS_QUEUE_URL")
@@ -71,29 +72,27 @@ def __init__(
         self.fixed_values = fixed_values or {}
         self.temp_download_dir = tempfile.mkdtemp()
-        self.practice_directory = metadata_formatter_service.practice_directory
-        self.file_key = (
-            f"{metadata_formatter_service.practice_directory}/{METADATA_FILENAME}"
-            if metadata_formatter_service.practice_directory
-            else METADATA_FILENAME
-        )
+        self.file_key = input_file_location
         self.metadata_mapping_validator_service = MetadataMappingValidatorService()
         self.metadata_formatter_service = metadata_formatter_service

     def download_metadata_from_s3(self) -> str:
-        local_file_path = f"{self.temp_download_dir}/{METADATA_FILENAME}"
+        local_file_path = f"{self.temp_download_dir}/{self.file_key.split('/')[-1]}"
         logger.info(
             f"Fetching {local_file_path} from bucket {self.staging_bucket_name}"
         )
-        self.s3_service.download_file(
-            s3_bucket_name=self.staging_bucket_name,
-            file_key=self.file_key,
-            download_path=local_file_path,
-        )
+        try:
+            self.s3_service.download_file(
+                s3_bucket_name=self.staging_bucket_name,
+                file_key=self.file_key,
+                download_path=local_file_path,
+            )
+        except ClientError:
+            raise BulkUploadMetadataException(f"Could not retrieve the following metadata file: {self.file_key}")

         return local_file_path

     def process_metadata(self):
@@ -109,7 +108,7 @@ def process_metadata(self):
         except pydantic.ValidationError as e:
             failure_msg = (
-                f"Failed to parse {METADATA_FILENAME} due to validation error: {str(e)}"
+                f"Failed to parse {self.file_key} due to validation error: {str(e)}"
             )
             logger.error(failure_msg, {"Result": UNSUCCESSFUL})
             raise BulkUploadMetadataException(failure_msg)
@@ -121,7 +120,7 @@ def process_metadata(self):
         except ClientError as e:
             if "HeadObject" in str(e):
-                failure_msg = f'No metadata file could be found with the name "{METADATA_FILENAME}"'
+                failure_msg = f'No metadata file could be found with the name "{self.file_key}"'
             else:
                 failure_msg = str(e)
             logger.error(failure_msg, {"Result": UNSUCCESSFUL})
@@ -134,26 +133,26 @@ def csv_to_sqs_metadata(self, csv_file_path: str) -> list[StagingSqsMetadata]:
         )

         with open(
-            csv_file_path, mode="r", encoding="utf-8-sig", errors="replace"
+                csv_file_path, mode="r", encoding="utf-8-sig", errors="replace"
         ) as csv_file:
             csv_reader = csv.DictReader(csv_file)
             if csv_reader.fieldnames is None:
                 raise BulkUploadMetadataException(
-                    f"{METADATA_FILENAME} is empty or missing headers."
+                    "Metadata file is empty or missing headers."
                 )
             headers = [h.strip() for h in csv_reader.fieldnames]
             records = list(csv_reader)

         if not headers:
-            raise BulkUploadMetadataException(f"{METADATA_FILENAME} has no headers.")
+            raise BulkUploadMetadataException(f"{self.file_key} has no headers.")

         validated_rows, rejected_rows, rejected_reasons = (
             self.metadata_mapping_validator_service.validate_and_normalize_metadata(
                 records, self.fixed_values, self.metadata_heading_remap
             )
         )
-        if rejected_reasons:
+        if rejected_reasons:
             for reason in rejected_reasons:
                 logger.warning(f"Rejected due to: {reason['REASON']}")
@@ -175,7 +174,7 @@ def csv_to_sqs_metadata(self, csv_file_path: str) -> list[StagingSqsMetadata]:
     ]

     def process_metadata_row(
-        self, row: dict, patients: dict[tuple[str, str], list[BulkUploadQueueMetadata]]
+            self, row: dict, patients: dict[tuple[str, str], list[BulkUploadQueueMetadata]]
     ) -> None:
         """Validate individual file metadata and attach to patient group."""
         file_metadata = MetadataFile.model_validate(row)
@@ -192,20 +191,18 @@ def process_metadata_row(
             return

         sqs_metadata = self.convert_to_sqs_metadata(file_metadata, correct_file_name)
-        sqs_metadata.file_path = (self.practice_directory.rstrip("/")
-                                  + "/" +
-                                  sqs_metadata.file_path.lstrip("/"))
+        sqs_metadata.file_path = self.file_key.rsplit("/", 1)[0] + "/" + sqs_metadata.file_path.lstrip("/")
         patients[(nhs_number, ods_code)].append(sqs_metadata)

     def apply_fixed_values(self, file_metadata: MetadataFile) -> MetadataFile:
-
+
         metadata_dict = file_metadata.model_dump(by_alias=True)

         for field_name, fixed_value in self.fixed_values.items():
             metadata_dict[field_name] = fixed_value
             logger.info(
                 f"Applied fixed value for field '{field_name}': '{fixed_value}'")
-
+
         return MetadataFile.model_validate(metadata_dict)

     @staticmethod
@@ -298,10 +295,10 @@ def handle_expedite_event(self, event):
             raise BulkUploadMetadataException(failure_msg)

     def handle_invalid_filename(
-        self,
-        file_metadata: MetadataFile,
-        error: InvalidFileNameException,
-        nhs_number: str,
+            self,
+            file_metadata: MetadataFile,
+            error: InvalidFileNameException,
+            nhs_number: str,
     ) -> None:
         """Handle invalid filenames by logging and storing failure in Dynamo."""
         logger.error(
@@ -316,7 +313,7 @@ def handle_invalid_filename(
         )

     def send_metadata_to_fifo_sqs(
-        self, staging_sqs_metadata_list: list[StagingSqsMetadata]
+            self, staging_sqs_metadata_list: list[StagingSqsMetadata]
     ) -> None:
         """Send validated metadata entries to SQS FIFO queue."""
         for staging_sqs_metadata in staging_sqs_metadata_list:
@@ -331,7 +328,7 @@ def send_metadata_to_fifo_sqs(
         logger.info("Sent bulk upload metadata to sqs queue")

     def send_metadata_to_expedite_sqs(
-        self, staging_sqs_metadata: StagingSqsMetadata
+            self, staging_sqs_metadata: StagingSqsMetadata
     ) -> None:
         """Send validated metadata entries to SQS expedite queue."""
         sqs_group_id = f"bulk_upload_{uuid.uuid4()}"
@@ -354,7 +351,7 @@ def copy_metadata_to_dated_folder(self):
             self.staging_bucket_name,
             f"metadata/{current_datetime}.csv",
         )
-        self.s3_service.delete_object(self.staging_bucket_name, METADATA_FILENAME)
+        self.s3_service.delete_object(self.staging_bucket_name, self.file_key)

     def clear_temp_storage(self):
         """Delete temporary working directory."""
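Since the service no longer builds `file_key` from `practice_directory` and `METADATA_FILENAME`, both the temporary download name and the per-file S3 prefix are now derived from `input_file_location`. A small sketch of what those string operations yield for a hypothetical key (the key itself is made up; the expressions mirror `download_metadata_from_s3` and `process_metadata_row` above):

```python
# Hypothetical key; the two derivations below mirror the service code.
file_key = "input/practice_a/metadata.csv"

local_name = file_key.split("/")[-1]   # "metadata.csv" -> name used inside temp_download_dir
prefix = file_key.rsplit("/", 1)[0]    # "input/practice_a" -> prepended to each row's file_path

assert local_name == "metadata.csv"
assert f"{prefix}/" + "/some/path/file.pdf".lstrip("/") == "input/practice_a/some/path/file.pdf"
```

For a key without any `/` (as in the unit tests, where `input_file_location` is `"test_input_file_location"`), `rsplit("/", 1)[0]` returns the key unchanged, which is the behaviour the updated tests assert.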
diff --git a/lambdas/tests/unit/handlers/test_bulk_upload_metadata_processor_handler.py b/lambdas/tests/unit/handlers/test_bulk_upload_metadata_processor_handler.py
index 913c51166e..d5a7da797a 100644
--- a/lambdas/tests/unit/handlers/test_bulk_upload_metadata_processor_handler.py
+++ b/lambdas/tests/unit/handlers/test_bulk_upload_metadata_processor_handler.py
@@ -28,7 +28,7 @@ def eventbridge_event_with_s3_key(key: str):
 def test_metadata_processor_lambda_handler_valid_event(
     set_env, context, mock_metadata_service
 ):
-    lambda_handler({"practiceDirectory": "test"}, context)
+    lambda_handler({"inputFileLocation": "test"}, context)

     mock_metadata_service.process_metadata.assert_called_once()

diff --git a/lambdas/tests/unit/services/test_bulk_upload_metadata_processor_service.py b/lambdas/tests/unit/services/test_bulk_upload_metadata_processor_service.py
index 7f14d7271e..987f9b713e 100644
--- a/lambdas/tests/unit/services/test_bulk_upload_metadata_processor_service.py
+++ b/lambdas/tests/unit/services/test_bulk_upload_metadata_processor_service.py
@@ -1,5 +1,5 @@
-import os
 import copy
+import os
 import tempfile
 import urllib
 import urllib.parse
@@ -79,6 +79,7 @@ def test_service(mocker, set_env, mock_tempfile):
             practice_directory="test_practice_directory"
         ),
         metadata_heading_remap={},
+        input_file_location="test_input_file_location",
     )

     mocker.patch.object(service, "s3_service")
@@ -133,9 +134,9 @@ def base_metadata_file():


 def test_process_metadata_send_metadata_to_sqs_queue(
-    mocker,
-    test_service,
-    mock_download_metadata_from_s3,
+        mocker,
+        test_service,
+        mock_download_metadata_from_s3,
 ):
     fake_csv_path = "fake/path/metadata.csv"

@@ -168,32 +169,30 @@ def test_process_metadata_send_metadata_to_sqs_queue(


 def test_process_metadata_catch_and_log_error_when_fail_to_get_metadata_csv_from_s3(
-    set_env,
-    caplog,
-    mock_s3_service,
-    mock_sqs_service,
-    test_service,
+        set_env,
+        caplog,
+        mock_s3_service,
+        mock_sqs_service,
+        test_service,
 ):
     mock_s3_service.download_file.side_effect = ClientError(
         {"Error": {"Code": "403", "Message": "Forbidden"}}, "S3:HeadObject"
     )
-    expected_err_msg = 'No metadata file could be found with the name "metadata.csv"'
+    expected_err_msg = 'Could not retrieve the following metadata file: test_input_file_location'

     with pytest.raises(BulkUploadMetadataException) as e:
         test_service.process_metadata()

     assert expected_err_msg in str(e.value)
-    assert caplog.records[-1].msg == expected_err_msg
-    assert caplog.records[-1].levelname == "ERROR"

     mock_sqs_service.send_message_with_nhs_number_attr_fifo.assert_not_called()


 def test_process_metadata_raise_validation_error_when_metadata_csv_is_invalid(
-    mock_sqs_service,
-    mock_download_metadata_from_s3,
-    test_service,
-    mocker,
+        mock_sqs_service,
+        mock_download_metadata_from_s3,
+        test_service,
+        mocker,
 ):
     mock_download_metadata_from_s3.return_value = "fake/path.csv"
     mocker.patch.object(
@@ -210,16 +209,16 @@ def test_process_metadata_raise_validation_error_when_metadata_csv_is_invalid(


 def test_process_metadata_raise_validation_error_when_gp_practice_code_is_missing(
-    mock_sqs_service,
-    mock_download_metadata_from_s3,
-    test_service,
-    mocker,
+        mock_sqs_service,
+        mock_download_metadata_from_s3,
+        test_service,
+        mocker,
 ):
     mock_download_metadata_from_s3.return_value = "fake/path.csv"

     expected_error_log = (
-        "Failed to parse metadata.csv: 1 validation error for MetadataFile\n"
-        + "GP-PRACTICE-CODE\n missing GP-PRACTICE-CODE for patient 1234567890"
+            "Failed to parse metadata.csv: 1 validation error for MetadataFile\n"
+            + "GP-PRACTICE-CODE\n missing GP-PRACTICE-CODE for patient 1234567890"
     )

     mocker.patch.object(
@@ -236,7 +235,7 @@ def test_process_metadata_raise_validation_error_when_gp_practice_code_is_missin

 def test_process_metadata_raise_client_error_when_failed_to_send_message_to_sqs(
-    test_service, mocker
+        test_service, mocker
 ):
     mocker.patch.object(
         test_service, "download_metadata_from_s3", return_value="fake/path.csv"
     )
@@ -278,10 +277,10 @@ def test_process_metadata_raise_client_error_when_failed_to_send_message_to_sqs(

 def test_download_metadata_from_s3(mock_s3_service, test_service):
     result = test_service.download_metadata_from_s3()
+    expected_file_key = test_service.file_key
     expected_download_path = os.path.join(
-        test_service.temp_download_dir, METADATA_FILENAME
+        test_service.temp_download_dir, expected_file_key
     )
-    expected_file_key = f"{test_service.practice_directory}/{METADATA_FILENAME}"

     mock_s3_service.download_file.assert_called_once_with(
         s3_bucket_name=test_service.staging_bucket_name,
@@ -293,13 +292,14 @@ def test_download_metadata_from_s3(mock_s3_service, test_service):

 def test_download_metadata_from_s3_raise_error_when_failed_to_download(
-    set_env, mock_s3_service, mock_tempfile, test_service
+        set_env, mock_s3_service, mock_tempfile, test_service
 ):
     mock_s3_service.download_file.side_effect = ClientError(
         {"Error": {"Code": "500", "Message": "file not exist in bucket"}},
         "s3_get_object",
     )
-    with pytest.raises(ClientError):
+    with pytest.raises(BulkUploadMetadataException,
+                       match=r"Could not retrieve the following metadata file: test_input_file_location"):
         test_service.download_metadata_from_s3()

@@ -329,6 +329,7 @@ def bulk_upload_service(mocker, set_env, mock_tempfile):
             practice_directory="test_practice_directory"
         ),
         metadata_heading_remap={},
+        input_file_location="test_input_file_location",
     )

@@ -386,7 +387,7 @@ def test_duplicates_csv_to_sqs_metadata(mocker, bulk_upload_service):
     for metadata in expected:
         for file in metadata.files:
             file.file_path = (
-                f"test_practice_directory/{file.stored_file_name.lstrip('/')}"
+                f"test_input_file_location/{file.stored_file_name.lstrip('/')}"
             )

     assert actual == expected
@@ -417,7 +418,7 @@ def test_send_metadata_to_sqs(set_env, mocker, mock_sqs_service, test_service):


 def test_send_metadata_to_sqs_raise_error_when_fail_to_send_message(
-    set_env, mock_sqs_service, test_service
+        set_env, mock_sqs_service, test_service
 ):
     mock_sqs_service.send_message_with_nhs_number_attr_fifo.side_effect = ClientError(
         {
@@ -474,7 +475,7 @@ def test_process_metadata_row_success(mocker, test_service):

     expected_sqs_metadata = BulkUploadQueueMetadata.model_validate(
         {
-            "file_path": "test_practice_directory/some/path/file.pdf",
+            "file_path": "test_input_file_location/some/path/file.pdf",
             "nhs_number": "1234567890",
             "gp_practice_code": "Y12345",
             "scan_date": "01/01/2023",
@@ -524,6 +525,7 @@ def test_process_metadata_row_adds_to_existing_entry(mocker):
     service = BulkUploadMetadataProcessorService(
         metadata_formatter_service=preprocessor,
         metadata_heading_remap={},
+        input_file_location="test_input_file_location",
     )

     service.process_metadata_row(row, patients)
@@ -531,7 +533,7 @@
     assert len(patients[key]) == 2
     assert patients[key][0] == mock_metadata_existing
     assert isinstance(patients[key][1], BulkUploadQueueMetadata)
-    assert patients[key][1].file_path == "test_practice_directory/some/path/file2.pdf"
+    assert patients[key][1].file_path == "test_input_file_location/some/path/file2.pdf"
     assert patients[key][1].stored_file_name == "/some/path/file2.pdf"

@@ -543,7 +545,7 @@ def test_extract_patient_info(test_service, base_metadata_file):

 def test_handle_invalid_filename_writes_failed_entry_to_dynamo(
-    mocker, test_service, base_metadata_file
+        mocker, test_service, base_metadata_file
 ):
     nhs_number = "1234567890"
     error = InvalidFileNameException("Invalid filename format")
@@ -590,7 +592,7 @@ def test_convert_to_sqs_metadata(base_metadata_file):


 def test_validate_and_correct_filename_returns_happy_path(
-    mocker, test_service, base_metadata_file
+        mocker, test_service, base_metadata_file
 ):
     mocker.patch(
         "services.bulk_upload_metadata_processor_service.validate_file_name",
@@ -603,7 +605,7 @@ def test_validate_and_correct_filename_returns_happy_path(


 def test_validate_and_correct_filename_sad_path(
-    mocker, test_service, base_metadata_file
+        mocker, test_service, base_metadata_file
 ):
     mocker.patch(
         "services.bulk_upload_metadata_processor_service.validate_file_name",
@@ -740,7 +742,7 @@ def test_csv_to_sqs_metadata_happy_path(mocker, bulk_upload_service, mock_csv_co


 def test_csv_to_sqs_metadata_raises_BulkUploadMetadataException_if_no_headers(
-    mocker, bulk_upload_service
+        mocker, bulk_upload_service
 ):
     mocker.patch("builtins.open", mocker.mock_open(read_data=""))

@@ -749,7 +751,7 @@ def test_csv_to_sqs_metadata_raises_BulkUploadMetadataException_if_no_headers(


 def test_csv_to_sqs_metadata_raises_BulkUploadMetadataException_if_all_rows_rejected(
-    mocker, bulk_upload_service, mock_csv_content
+        mocker, bulk_upload_service, mock_csv_content
 ):
     mocker.patch("builtins.open", mocker.mock_open(read_data=mock_csv_content))

@@ -764,7 +766,7 @@ def test_csv_to_sqs_metadata_raises_BulkUploadMetadataException_if_all_rows_reje
     )

     with pytest.raises(
-        BulkUploadMetadataException, match="No valid metadata rows found"
+            BulkUploadMetadataException, match="No valid metadata rows found"
     ):
         bulk_upload_service.csv_to_sqs_metadata("fake/path.csv")

@@ -837,6 +839,7 @@ def mock_service_remapping_mandatory_fields(mocker, set_env, mock_tempfile):
             "USER-ID": "User ID",
             "UPLOAD": "Upload Date",
         },
+        input_file_location="test_input_file_location",
     )

     mocker.patch.object(
@@ -864,7 +867,7 @@ def mock_remap_csv_content():


 def test_remapping_mandatory_fields(
-    mocker, mock_service_remapping_mandatory_fields, mock_remap_csv_content
+        mocker, mock_service_remapping_mandatory_fields, mock_remap_csv_content
 ):
     mocker.patch("builtins.open", mocker.mock_open(read_data=mock_remap_csv_content))

@@ -876,7 +879,7 @@ def test_remapping_mandatory_fields(
         nhs_number="123456789",
         files=[
             BulkUploadQueueMetadata(
-                file_path="test_practice_directory/path/1.pdf",
+                file_path="test_input_file_location/path/1.pdf",
                 gp_practice_code="Y12345",
                 scan_date="02/01/2023",
                 stored_file_name="/path/1.pdf",
@@ -909,6 +912,7 @@ def mock_service_no_remapping(mocker, set_env, mock_tempfile):
             practice_directory="test_practice_directory"
         ),
         metadata_heading_remap={},
+        input_file_location="test_input_file_location",
     )

     mocker.patch.object(
@@ -935,7 +939,7 @@ def mock_noremap_csv_content():


 def test_no_remapping_logic(
-    mocker, mock_service_no_remapping, mock_noremap_csv_content
+        mocker, mock_service_no_remapping, mock_noremap_csv_content
 ):
     mocker.patch("builtins.open", mocker.mock_open(read_data=mock_noremap_csv_content))

@@ -946,7 +950,7 @@ def test_no_remapping_logic(
         nhs_number="123456789",
         files=[
             BulkUploadQueueMetadata(
-                file_path="test_practice_directory/path/1.pdf",
+                file_path="test_input_file_location/path/1.pdf",
                 gp_practice_code="Y12345",
                 scan_date="02/01/2023",
                 stored_file_name="/path/1.pdf",
@@ -956,6 +960,7 @@
     )
 ]

+
 @freeze_time("2025-02-03T10:00:00")
 def test_validate_expedite_file_happy_path_returns_expected_tuple(test_service):
     ods_code = "A12345"
@@ -1084,7 +1089,7 @@ def test_enforce_virus_scanner_triggers_scan_when_no_result(mocker, test_service


 def test_enforce_virus_scanner_raises_bulk_exception_on_s3_access_error(
-    mocker, test_service
+        mocker, test_service
 ):
     file_key = "expedite/folder/file.pdf"
     client_error = ClientError(
@@ -1161,9 +1166,11 @@ def test_check_file_status_logs_issue_when_not_clean(mocker, test_service, caplo
         f"Found an issue with the file {file_key}." in record.msg
         for record in caplog.records
     )
+
+
 def test_apply_fixed_values_no_fixed_values(test_service, base_metadata_file):
     result = test_service.apply_fixed_values(base_metadata_file)
-
+
     assert result == base_metadata_file

@@ -1176,9 +1183,9 @@ def test_apply_fixed_values_single_field(mocker, base_metadata_file):
         fixed_values={"SECTION": "AR"},
     )
     mocker.patch.object(service, "s3_service")
-
+
     result = service.apply_fixed_values(base_metadata_file)
-
+
     assert result.section == "AR"
     assert result.nhs_number == base_metadata_file.nhs_number
     assert result.gp_practice_code == base_metadata_file.gp_practice_code
@@ -1197,9 +1204,9 @@ def test_apply_fixed_values_multiple_fields(mocker, base_metadata_file):
         },
     )
     mocker.patch.object(service, "s3_service")
-
+
     result = service.apply_fixed_values(base_metadata_file)
-
+
     assert result.section == "AR"
     assert result.sub_section == "Mental Health"
     assert result.scan_id == "FIXED_SCAN_ID"
@@ -1209,7 +1216,7 @@ def test_apply_fixed_values_overwrites_existing_value(mocker, base_metadata_file):
     original_section = base_metadata_file.section
     assert original_section == "LG"
-
+
     service = BulkUploadMetadataProcessorService(
         metadata_formatter_service=MockMetadataPreprocessorService(
             practice_directory="test_practice_directory"
@@ -1218,9 +1225,9 @@ def test_apply_fixed_values_overwrites_existing_value(mocker, base_metadata_file)
         fixed_values={"SECTION": "AR"},
     )
     mocker.patch.object(service, "s3_service")
-
+
     result = service.apply_fixed_values(base_metadata_file)
-
+
     assert result.section == "AR"
     assert result.section != original_section

@@ -1234,9 +1241,9 @@ def test_apply_fixed_values_logs_applied_values(mocker, base_metadata_file, capl
         fixed_values={"SECTION": "AR", "SCAN-ID": "TEST_ID"},
     )
     mocker.patch.object(service, "s3_service")
-
+
     service.apply_fixed_values(base_metadata_file)
-
+
     log_messages = [record.message for record in caplog.records]
     assert any("Applied fixed value for field 'SECTION': 'AR'" in msg for msg in log_messages)
     assert any("Applied fixed value for field 'SCAN-ID': 'TEST_ID'" in msg for msg in log_messages)
@@ -1251,9 +1258,9 @@ def test_apply_fixed_values_returns_valid_metadata_fi
         fixed_values={"SECTION": "AR"},
     )
     mocker.patch.object(service, "s3_service")
-
+
     result = service.apply_fixed_values(base_metadata_file)
-
+
     assert isinstance(result, MetadataFile)
     # Ensure it can be validated again
     validated = MetadataFile.model_validate(result.model_dump(by_alias=True))
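For reference, a minimal sketch of how a caller wires the reworked constructor together after this change, mirroring the handler's usage; the format value, remappings and key are illustrative, and the formatter class is whatever `get_formatter_service` resolves (as in the handler), not something new introduced here:

```python
# Illustrative wiring only; names are those used in the handler/service above,
# and all concrete values are made up.
input_file_location = "input/practice_a/metadata.csv"

formatter_cls = get_formatter_service(LloydGeorgePreProcessFormat.GENERAL)
metadata_formatter_service = formatter_cls(input_file_location)

service = BulkUploadMetadataProcessorService(
    metadata_formatter_service=metadata_formatter_service,
    metadata_heading_remap={"UPLOAD": "Upload Date"},
    fixed_values={"SECTION": "LG"},
    input_file_location=input_file_location,
)
service.process_metadata()
```

Passing `input_file_location` explicitly, rather than reusing the formatter's `practice_directory`, is what lets `file_key` point at an arbitrary CSV key in the staging bucket.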