16 changes: 8 additions & 8 deletions lambdas/handlers/bulk_upload_metadata_processor_handler.py
@@ -9,7 +9,6 @@
from utils.decorators.handle_lambda_exceptions import handle_lambda_exceptions
from utils.decorators.override_error_check import override_error_check
from utils.decorators.set_audit_arg import set_request_context_for_logging
from utils.exceptions import BulkUploadMetadataException

logger = LoggingService(__name__)

@@ -25,13 +24,14 @@ def lambda_handler(event, _context):
"preFormatType", LloydGeorgePreProcessFormat.GENERAL
)
formatter_service_class = get_formatter_service(raw_pre_format_type)
practice_directory = event.get("practiceDirectory", "")
input_file_location = event.get("inputFileLocation", "")

remappings = event.get("metadataFieldRemappings", {})
metadata_formatter_service = formatter_service_class(practice_directory)
metadata_formatter_service = formatter_service_class(input_file_location)
metadata_service = BulkUploadMetadataProcessorService(
metadata_formatter_service=metadata_formatter_service,
metadata_heading_remap=remappings,
input_file_location=input_file_location,
)

if "source" in event and event.get("source") == "aws.s3":
@@ -40,14 +40,14 @@
metadata_service.handle_expedite_event(event)
return

if not practice_directory:
if not input_file_location:
logger.error(
"Failed to start metadata processing due to missing practice directory"
"Failed to start metadata processing due to missing field: inputFileLocation"
)
return

logger.info(
f"Starting metadata processing for practice directory: {practice_directory}"
f"Starting metadata processing for file location: {input_file_location}"
)

fixed_values = event.get("fixedValues", {})
@@ -56,10 +56,10 @@
validator_service.validate_fixed_values(
fixed_values, remappings)

metadata_formatter_service = formatter_service_class(practice_directory)
metadata_service = BulkUploadMetadataProcessorService(
metadata_formatter_service=metadata_formatter_service,
metadata_heading_remap=remappings,
fixed_values=fixed_values
fixed_values=fixed_values,
input_file_location=input_file_location
)
metadata_service.process_metadata()
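
For reference, a minimal sketch of a direct-invocation event for the updated handler. The field names mirror the event.get calls above; the example values (S3 key, remapping, fixed value) are assumptions, not values taken from the repository.

# Hypothetical invocation event for lambda_handler; values are illustrative only.
example_event = {
    "inputFileLocation": "ODS12345/metadata.csv",  # S3 key of the metadata file in the staging bucket (assumed)
    "preFormatType": "GENERAL",  # optional; handler falls back to LloydGeorgePreProcessFormat.GENERAL
    "metadataFieldRemappings": {"old-heading": "NEW-HEADING"},  # optional heading remap (assumed shape)
    "fixedValues": {"SCAN-DATE": "01/01/2024"},  # optional fixed column values (assumed)
}

# lambda_handler(example_event, context)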
69 changes: 33 additions & 36 deletions lambdas/services/bulk_upload_metadata_processor_service.py
@@ -10,11 +10,11 @@

import pydantic
from botocore.exceptions import ClientError

from enums.lloyd_george_pre_process_format import LloydGeorgePreProcessFormat
from enums.upload_status import UploadStatus
from enums.virus_scan_result import VirusScanResult
from models.staging_metadata import (
METADATA_FILENAME,
BulkUploadQueueMetadata,
MetadataFile,
StagingSqsMetadata,
@@ -53,10 +53,11 @@
class BulkUploadMetadataProcessorService:

def __init__(
self,
metadata_formatter_service: MetadataPreprocessorService,
metadata_heading_remap: dict,
fixed_values: dict = None,
self,
metadata_formatter_service: MetadataPreprocessorService,
metadata_heading_remap: dict,
input_file_location: str = "",
fixed_values: dict = None,
):
self.staging_bucket_name = os.getenv("STAGING_STORE_BUCKET_NAME")
self.metadata_queue_url = os.getenv("METADATA_SQS_QUEUE_URL")
@@ -71,29 +72,27 @@ def __init__(
self.fixed_values = fixed_values or {}

self.temp_download_dir = tempfile.mkdtemp()
self.practice_directory = metadata_formatter_service.practice_directory
self.file_key = (
f"{metadata_formatter_service.practice_directory}/{METADATA_FILENAME}"
if metadata_formatter_service.practice_directory
else METADATA_FILENAME
)
self.file_key = input_file_location

self.metadata_mapping_validator_service = MetadataMappingValidatorService()

self.metadata_formatter_service = metadata_formatter_service

def download_metadata_from_s3(self) -> str:
local_file_path = f"{self.temp_download_dir}/{METADATA_FILENAME}"
local_file_path = f"{self.temp_download_dir}/{self.file_key.split('/')[-1]}"

logger.info(
f"Fetching {local_file_path} from bucket {self.staging_bucket_name}"
)

self.s3_service.download_file(
s3_bucket_name=self.staging_bucket_name,
file_key=self.file_key,
download_path=local_file_path,
)
try:
self.s3_service.download_file(
s3_bucket_name=self.staging_bucket_name,
file_key=self.file_key,
download_path=local_file_path,
)
except ClientError:
raise BulkUploadMetadataException(f"Could not retrieve the following metadata file: {self.file_key}")
return local_file_path

def process_metadata(self):
@@ -109,7 +108,7 @@ def process_metadata(self):

except pydantic.ValidationError as e:
failure_msg = (
f"Failed to parse {METADATA_FILENAME} due to validation error: {str(e)}"
f"Failed to parse {self.file_key} due to validation error: {str(e)}"
)
logger.error(failure_msg, {"Result": UNSUCCESSFUL})
raise BulkUploadMetadataException(failure_msg)
@@ -121,7 +120,7 @@

except ClientError as e:
if "HeadObject" in str(e):
failure_msg = f'No metadata file could be found with the name "{METADATA_FILENAME}"'
failure_msg = f'No metadata file could be found with the name "{self.file_key}"'
else:
failure_msg = str(e)
logger.error(failure_msg, {"Result": UNSUCCESSFUL})
@@ -134,26 +133,26 @@ def csv_to_sqs_metadata(self, csv_file_path: str) -> list[StagingSqsMetadata]:
)

with open(
csv_file_path, mode="r", encoding="utf-8-sig", errors="replace"
csv_file_path, mode="r", encoding="utf-8-sig", errors="replace"
) as csv_file:
csv_reader = csv.DictReader(csv_file)
if csv_reader.fieldnames is None:
raise BulkUploadMetadataException(
f"{METADATA_FILENAME} is empty or missing headers."
f"Metdata file is empty or missing headers."
)

headers = [h.strip() for h in csv_reader.fieldnames]
records = list(csv_reader)

if not headers:
raise BulkUploadMetadataException(f"{METADATA_FILENAME} has no headers.")
raise BulkUploadMetadataException(f"{self.file_key} has no headers.")

validated_rows, rejected_rows, rejected_reasons = (
self.metadata_mapping_validator_service.validate_and_normalize_metadata(
records, self.fixed_values, self.metadata_heading_remap
)
)
if rejected_reasons:
if rejected_reasons:
for reason in rejected_reasons:
logger.warning(f"Rejected due to: {reason['REASON']}")

@@ -175,7 +174,7 @@ def csv_to_sqs_metadata(self, csv_file_path: str) -> list[StagingSqsMetadata]:
]

def process_metadata_row(
self, row: dict, patients: dict[tuple[str, str], list[BulkUploadQueueMetadata]]
self, row: dict, patients: dict[tuple[str, str], list[BulkUploadQueueMetadata]]
) -> None:
"""Validate individual file metadata and attach to patient group."""
file_metadata = MetadataFile.model_validate(row)
@@ -192,20 +191,18 @@
return

sqs_metadata = self.convert_to_sqs_metadata(file_metadata, correct_file_name)
sqs_metadata.file_path = (self.practice_directory.rstrip("/")
+ "/" +
sqs_metadata.file_path.lstrip("/"))
sqs_metadata.file_path = self.file_key.rsplit("/", 1)[0] + "/" + sqs_metadata.file_path.lstrip("/")
patients[(nhs_number, ods_code)].append(sqs_metadata)

def apply_fixed_values(self, file_metadata: MetadataFile) -> MetadataFile:

metadata_dict = file_metadata.model_dump(by_alias=True)

for field_name, fixed_value in self.fixed_values.items():
metadata_dict[field_name] = fixed_value
logger.info(
f"Applied fixed value for field '{field_name}': '{fixed_value}'")

return MetadataFile.model_validate(metadata_dict)

@staticmethod
@@ -298,10 +295,10 @@ def handle_expedite_event(self, event):
raise BulkUploadMetadataException(failure_msg)

def handle_invalid_filename(
self,
file_metadata: MetadataFile,
error: InvalidFileNameException,
nhs_number: str,
self,
file_metadata: MetadataFile,
error: InvalidFileNameException,
nhs_number: str,
) -> None:
"""Handle invalid filenames by logging and storing failure in Dynamo."""
logger.error(
@@ -316,7 +313,7 @@ def handle_invalid_filename(
)

def send_metadata_to_fifo_sqs(
self, staging_sqs_metadata_list: list[StagingSqsMetadata]
self, staging_sqs_metadata_list: list[StagingSqsMetadata]
) -> None:
"""Send validated metadata entries to SQS FIFO queue."""
for staging_sqs_metadata in staging_sqs_metadata_list:
Expand All @@ -331,7 +328,7 @@ def send_metadata_to_fifo_sqs(
logger.info("Sent bulk upload metadata to sqs queue")

def send_metadata_to_expedite_sqs(
self, staging_sqs_metadata: StagingSqsMetadata
self, staging_sqs_metadata: StagingSqsMetadata
) -> None:
"""Send validated metadata entries to SQS expedite queue."""
sqs_group_id = f"bulk_upload_{uuid.uuid4()}"
@@ -354,7 +351,7 @@ def copy_metadata_to_dated_folder(self):
self.staging_bucket_name,
f"metadata/{current_datetime}.csv",
)
self.s3_service.delete_object(self.staging_bucket_name, METADATA_FILENAME)
self.s3_service.delete_object(self.staging_bucket_name, self.file_key)

def clear_temp_storage(self):
"""Delete temporary working directory."""
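
As a quick illustration of the path handling introduced above, here is a minimal sketch of how the service derives a directory prefix and local file name from inputFileLocation; the example key and per-row file path are assumptions.

# Illustration only: path derivation used by download_metadata_from_s3 and
# process_metadata_row. The key and row file path below are hypothetical.
input_file_location = "ODS12345/metadata.csv"

local_file_name = input_file_location.split("/")[-1]      # "metadata.csv"
directory_prefix = input_file_location.rsplit("/", 1)[0]  # "ODS12345"
# Note: if the key contains no "/", rsplit returns the whole key unchanged.

row_file_path = "/1of1_Lloyd_George_Record.pdf"  # value from a CSV row (assumed)
full_file_path = directory_prefix + "/" + row_file_path.lstrip("/")
# full_file_path == "ODS12345/1of1_Lloyd_George_Record.pdf"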
@@ -28,7 +28,7 @@ def eventbridge_event_with_s3_key(key: str):
def test_metadata_processor_lambda_handler_valid_event(
set_env, context, mock_metadata_service
):
lambda_handler({"practiceDirectory": "test"}, context)
lambda_handler({"inputFileLocation": "test"}, context)

mock_metadata_service.process_metadata.assert_called_once()
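
A possible companion test for the missing-field early return could look like the sketch below, reusing the same fixtures as the test above; it assumes mock_metadata_service patches the processor service instance.

def test_metadata_processor_lambda_handler_missing_input_file_location(
    set_env, context, mock_metadata_service
):
    # Hypothetical test: without inputFileLocation the handler logs an error and
    # returns before process_metadata is called.
    lambda_handler({}, context)

    mock_metadata_service.process_metadata.assert_not_called()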
