diff --git a/lambdas/id_sync/README.md b/lambdas/id_sync/README.md index 634805364..cfd2da256 100644 --- a/lambdas/id_sync/README.md +++ b/lambdas/id_sync/README.md @@ -28,8 +28,4 @@ - Code is located in the `lambdas/id_sync/src/` directory. - Unit tests are in the `lambdas/id_sync/tests/` directory. -- Use the provided Makefile and Dockerfile for building, testing, and packaging. - -## License - -This project is maintained by NHS. See [LICENSE](../LICENSE) for details. +- Use the provided Makefile and Dockerfile for building, testing, and packaging. \ No newline at end of file diff --git a/lambdas/id_sync/src/id_sync.py b/lambdas/id_sync/src/id_sync.py index 91855efef..06c8d577d 100644 --- a/lambdas/id_sync/src/id_sync.py +++ b/lambdas/id_sync/src/id_sync.py @@ -1,46 +1,50 @@ -from common.clients import logger -from common.clients import STREAM_NAME +""" +- Parses the incoming AWS event into `AwsLambdaEvent` and iterate its `records`. +- Delegate each record to `process_record` and collect `nhs_number` from each result. +- If any record has status == "error" raise `IdSyncException` with aggregated nhs_numbers. +- Any unexpected error is wrapped into `IdSyncException(message="Error processing id_sync event")`. +""" + +from typing import Any, Dict +from common.clients import logger, STREAM_NAME from common.log_decorator import logging_decorator from common.aws_lambda_event import AwsLambdaEvent from exceptions.id_sync_exception import IdSyncException from record_processor import process_record -''' -Lambda function handler for processing SQS events.Lambda for ID Sync. Fired by SQS -''' @logging_decorator(prefix="id_sync", stream_name=STREAM_NAME) -def handler(event_data, _): - +def handler(event_data: Dict[str, Any], _context) -> Dict[str, Any]: try: - logger.info("id_sync handler invoked") event = AwsLambdaEvent(event_data) - record_count = len(event.records) - if record_count > 0: - logger.info("id_sync processing event with %d records", record_count) - error_count = 0 - nhs_numbers = [] - for record in event.records: - record_result = process_record(record) - nhs_numbers.append(record_result["nhs_number"]) - if record_result["status"] == "error": - error_count += 1 - if error_count > 0: - raise IdSyncException(message=f"Processed {record_count} records with {error_count} errors", - nhs_numbers=nhs_numbers) - - else: - response = {"status": "success", - "message": f"Successfully processed {record_count} records", - "nhs_numbers": nhs_numbers} - else: - response = {"status": "success", "message": "No records found in event"} + records = event.records + + if not records: + return {"status": "success", "message": "No records found in event"} + + logger.info("id_sync processing event with %d records", len(records)) + + results = [process_record(record) for record in records] + nhs_numbers = [result["nhs_number"] for result in results] + error_count = sum(1 for result in results if result.get("status") == "error") + + if error_count: + raise IdSyncException(message=f"Processed {len(records)} records with {error_count} errors", + nhs_numbers=nhs_numbers) + + response = { + "status": "success", + "message": f"Successfully processed {len(records)} records", + "nhs_numbers": nhs_numbers + } + logger.info("id_sync handler completed: %s", response) return response + except IdSyncException as e: logger.exception(f"id_sync error: {e.message}") - raise e - except Exception as e: + raise + except Exception: msg = "Error processing id_sync event" logger.exception(msg) - raise IdSyncException(message=msg, exception=e) + raise IdSyncException(message=msg) diff --git a/lambdas/id_sync/src/ieds_db_operations.py b/lambdas/id_sync/src/ieds_db_operations.py index d5bf82f7d..cfff62697 100644 --- a/lambdas/id_sync/src/ieds_db_operations.py +++ b/lambdas/id_sync/src/ieds_db_operations.py @@ -2,9 +2,11 @@ from os_vars import get_ieds_table_name from common.aws_dynamodb import get_dynamodb_table from common.clients import logger, dynamodb_client +from utils import make_status from exceptions.id_sync_exception import IdSyncException ieds_table = None +BATCH_SIZE = 25 # DynamoDB TransactWriteItems max batch size def get_ieds_table(): @@ -16,97 +18,47 @@ def get_ieds_table(): return ieds_table -def ieds_check_exist(id: str) -> bool: - """Check if a record exists in the IEDS table for the given ID.""" - logger.info(f"Check Id exists ID: {id}") - items = get_items_from_patient_id(id, 1) - - if items or len(items) > 0: - logger.info(f"Found patient ID: {id}") - return True - return False - - -BATCH_SIZE = 25 - - -def ieds_update_patient_id(old_id: str, new_id: str) -> dict: +def ieds_update_patient_id(old_id: str, new_id: str, items_to_update: list | None = None) -> dict: """Update the patient ID in the IEDS table.""" logger.info(f"ieds_update_patient_id. Update patient ID from {old_id} to {new_id}") if not old_id or not new_id or not old_id.strip() or not new_id.strip(): - return {"status": "error", "message": "Old ID and New ID cannot be empty"} + return make_status("Old ID and New ID cannot be empty", old_id, "error") if old_id == new_id: - return {"status": "success", "message": f"No change in patient ID: {old_id}"} + return make_status(f"No change in patient ID: {old_id}", old_id) try: logger.info(f"Updating patient ID in IEDS from {old_id} to {new_id}") - new_patient_pk = f"Patient#{new_id}" - - logger.info("Getting items to update in IEDS table...") - items_to_update = get_items_from_patient_id(old_id) + if items_to_update is None: + logger.info("Getting items to update in IEDS table...") + items_to_update = get_items_from_patient_id(old_id) + else: + logger.info("Using provided items_to_update list, size=%d", len(items_to_update)) if not items_to_update: logger.warning(f"No items found to update for patient ID: {old_id}") - return { - "status": "success", - "message": f"No items found to update for patient ID: {old_id}" - } - - transact_items = [] + return make_status(f"No items found to update for patient ID: {old_id}", old_id) logger.info(f"Items to update: {len(items_to_update)}") - ieds_table_name = get_ieds_table_name() - for item in items_to_update: - transact_items.append({ - 'Update': { - 'TableName': ieds_table_name, - 'Key': { - 'PK': {'S': item['PK']}, - }, - 'UpdateExpression': 'SET PatientPK = :new_val', - 'ExpressionAttributeValues': { - ':new_val': {'S': new_patient_pk} - } - } - }) - - logger.info("Transacting items in IEDS table...") - # success tracking - all_batches_successful = True - total_batches = 0 - - # Batch transact in chunks of BATCH_SIZE - for i in range(0, len(transact_items), BATCH_SIZE): - batch = transact_items[i:i+BATCH_SIZE] - total_batches += 1 - logger.info(f"Transacting batch {total_batches} of size: {len(batch)}") - response = dynamodb_client.transact_write_items(TransactItems=batch) - logger.info("Batch update complete. Response: %s", response) + # Build transact items and execute them in batches via helpers to keep + # the top-level function easy to read and test. + transact_items = build_transact_items(old_id, new_id, items_to_update) - # Check each batch response - if response['ResponseMetadata']['HTTPStatusCode'] != 200: - all_batches_successful = False - logger.error( - f"Batch {total_batches} failed with status: {response['ResponseMetadata']['HTTPStatusCode']}") + all_batches_successful, total_batches = execute_transaction_in_batches(transact_items) # Consolidated response handling logger.info( f"All batches complete. Total batches: {total_batches}, All successful: {all_batches_successful}") if all_batches_successful: - return { - "status": "success", - "message": - f"IEDS update, patient ID: {old_id}=>{new_id}. {len(items_to_update)} updated {total_batches}." - } + return make_status( + f"IEDS update, patient ID: {old_id}=>{new_id}. {len(items_to_update)} updated {total_batches}.", + old_id, + ) else: - return { - "status": "error", - "message": f"Failed to update some batches for patient ID: {old_id}" - } + return make_status(f"Failed to update some batches for patient ID: {old_id}", old_id, "error") except Exception as e: logger.exception("Error updating patient ID") @@ -118,26 +70,133 @@ def ieds_update_patient_id(old_id: str, new_id: str) -> dict: ) -def get_items_from_patient_id(id: str, limit=BATCH_SIZE) -> list: - """Get all items for patient ID.""" - logger.info(f"Getting items for patient id: {id}") +def get_items_from_patient_id(id: str) -> list: + """Public wrapper: build PatientPK and return all matching items. + + Delegates actual paging to the internal helper `_paginate_items_for_patient_pk`. + Raises IdSyncException on error. + """ + logger.info("Getting items for patient id: %s", id) patient_pk = f"Patient#{id}" try: - response = get_ieds_table().query( - IndexName='PatientGSI', # query the GSI - KeyConditionExpression=Key('PatientPK').eq(patient_pk), - Limit=limit - ) - - if 'Items' not in response or not response['Items']: - logger.warning(f"No items found for patient PK: {patient_pk}") - return [] - - return response['Items'] + return paginate_items_for_patient_pk(patient_pk) + except IdSyncException: + raise except Exception as e: - logger.exception(f"Error querying items for patient PK: {patient_pk}") + logger.exception("Error querying items for patient PK: %s", patient_pk) raise IdSyncException( message=f"Error querying items for patient PK: {patient_pk}", nhs_numbers=[patient_pk], - exception=e + exception=e, ) + + +def paginate_items_for_patient_pk(patient_pk: str) -> list: + """Internal helper that pages through the PatientGSI and returns all items. + + Raises IdSyncException when the DynamoDB response is malformed. + """ + all_items: list = [] + last_evaluated_key = None + while True: + query_args = { + "IndexName": "PatientGSI", + "KeyConditionExpression": Key('PatientPK').eq(patient_pk), + } + if last_evaluated_key: + query_args["ExclusiveStartKey"] = last_evaluated_key + + response = get_ieds_table().query(**query_args) + + if "Items" not in response: + # Unexpected DynamoDB response shape - surface as IdSyncException + logger.exception("Unexpected DynamoDB response: missing 'Items'") + raise IdSyncException( + message="No Items in DynamoDB response", + nhs_numbers=[patient_pk], + exception=response, + ) + + items = response.get("Items", []) + all_items.extend(items) + + last_evaluated_key = response.get("LastEvaluatedKey") + if not last_evaluated_key: + break + + if not all_items: + logger.warning("No items found for patient PK: %s", patient_pk) + return [] + + return all_items + + +def extract_patient_resource_from_item(item: dict) -> dict | None: + """ + Extract a Patient resource dict from an IEDS database. + """ + patient_resource = item.get("Resource", None) + if not isinstance(patient_resource, dict): + return None + + for response in patient_resource.get("contained", []): + if isinstance(response, dict) and response.get("resourceType") == "Patient": + return response + + return None + + +def build_transact_items(old_id: str, new_id: str, items_to_update: list) -> list: + """Construct the list of TransactItems for DynamoDB TransactWriteItems. + + Each item uses a conditional expression to ensure PatientPK hasn't changed + since it was read. + """ + transact_items = [] + ieds_table_name = get_ieds_table_name() + new_patient_pk = f"Patient#{new_id}" + + for item in items_to_update: + old_patient_pk = item.get('PatientPK', f"Patient#{old_id}") + + transact_items.append({ + 'Update': { + 'TableName': ieds_table_name, + 'Key': { + 'PK': {'S': item['PK']}, + }, + 'UpdateExpression': 'SET PatientPK = :new_val', + "ConditionExpression": "PatientPK = :expected_old", + 'ExpressionAttributeValues': { + ':new_val': {'S': new_patient_pk}, + ':expected_old': {'S': old_patient_pk} + } + } + }) + + return transact_items + + +def execute_transaction_in_batches(transact_items: list) -> tuple: + """Execute transact write items in batches of BATCH_SIZE. + + Returns (all_batches_successful: bool, total_batches: int). + """ + all_batches_successful = True + total_batches = 0 + + for i in range(0, len(transact_items), BATCH_SIZE): + batch = transact_items[i:i+BATCH_SIZE] + total_batches += 1 + logger.info(f"Transacting batch {total_batches} of size: {len(batch)}") + + response = dynamodb_client.transact_write_items(TransactItems=batch) + logger.info("Batch update complete. Response: %s", response) + + # Check each batch response + if response['ResponseMetadata']['HTTPStatusCode'] != 200: + all_batches_successful = False + logger.error( + f"Batch {total_batches} failed with status: {response['ResponseMetadata']['HTTPStatusCode']}") + + return all_batches_successful, total_batches diff --git a/lambdas/id_sync/src/pds_details.py b/lambdas/id_sync/src/pds_details.py index e8fecb5a7..4729a0aec 100644 --- a/lambdas/id_sync/src/pds_details.py +++ b/lambdas/id_sync/src/pds_details.py @@ -13,6 +13,7 @@ safe_tmp_dir = tempfile.mkdtemp(dir="/tmp") # NOSONAR +# Get Patient details from external service PDS using NHS number from MNS notification def pds_get_patient_details(nhs_number: str) -> dict: try: logger.info(f"get patient details. nhs_number: {nhs_number}") @@ -34,6 +35,7 @@ def pds_get_patient_details(nhs_number: str) -> dict: raise IdSyncException(message=msg, exception=e) +# Extract Patient identifier value from PDS patient details def pds_get_patient_id(nhs_number: str) -> str: """ Get PDS patient ID from NHS number. @@ -48,7 +50,6 @@ def pds_get_patient_id(nhs_number: str) -> str: return patient_details["identifier"][0]["value"] - # ✅ Remove the IdSyncException catch since you're just re-raising except Exception as e: msg = f"Error getting PDS patient ID for {nhs_number}" logger.exception(msg) diff --git a/lambdas/id_sync/src/record_processor.py b/lambdas/id_sync/src/record_processor.py index 146095431..5b5eac7c6 100644 --- a/lambdas/id_sync/src/record_processor.py +++ b/lambdas/id_sync/src/record_processor.py @@ -1,61 +1,168 @@ -''' - record Processor -''' from common.clients import logger -from typing import Optional -from pds_details import pds_get_patient_id -from ieds_db_operations import ieds_check_exist, ieds_update_patient_id +from typing import Dict, Any +from pds_details import pds_get_patient_id, pds_get_patient_details +from ieds_db_operations import ( + ieds_update_patient_id, + extract_patient_resource_from_item, + get_items_from_patient_id, +) +from utils import make_status import json import ast -def process_record(event_record): +def process_record(event_record: Dict[str, Any]) -> Dict[str, Any]: logger.info("process_record. Processing record: %s", event_record) body_text = event_record.get('body', '') - # convert body to json + + # convert body to json (try JSON first, then fall back to Python literal) if isinstance(body_text, str): try: - # Try JSON first body = json.loads(body_text) except json.JSONDecodeError: try: - # Fall back to Python dict syntax body = ast.literal_eval(body_text) except (ValueError, SyntaxError): logger.error("Failed to parse body: %s", body_text) return {"status": "error", "message": "Invalid body format"} else: body = body_text + nhs_number = body.get("subject") logger.info("process record NHS number: %s", nhs_number) if nhs_number: return process_nhs_number(nhs_number) - else: - logger.info("No NHS number found in event record") - return {"status": "error", "message": "No NHS number found in event record"} + + logger.info("No NHS number found in event record") + return {"status": "error", "message": "No NHS number found in event record"} -def process_nhs_number(nhs_number: str) -> Optional[str]: +def process_nhs_number(nhs_number: str) -> Dict[str, Any]: # get patient details from PDS - logger.info(f"process_nhs_number. Processing NHS number: {nhs_number}") - patient_details_id = pds_get_patient_id(nhs_number) - - base_log_data = {"nhs_number": nhs_number} - if patient_details_id: - logger.info(f"process_nhs_number. Patient details ID: {patient_details_id}") - # if patient NHS != id, update patient index of vax events to new number - if patient_details_id != nhs_number and patient_details_id: - logger.info(f"process_nhs_number. Update patient ID from {nhs_number} to {patient_details_id}") - if ieds_check_exist(nhs_number): - logger.info("process_nhs_number. IEDS record found, updating patient ID") - response = ieds_update_patient_id(nhs_number, patient_details_id) - else: - logger.info("process_nhs_number. No ieds record found for: %s", nhs_number) - response = {"status": "success", "message": f"No records returned for ID: {nhs_number}"} + new_nhs_number = pds_get_patient_id(nhs_number) + + if not new_nhs_number: + return make_status("No patient ID found for NHS number", nhs_number) + + if new_nhs_number == nhs_number: + return make_status("No update required", nhs_number) + + logger.info("Update patient ID from %s to %s", nhs_number, new_nhs_number) + + try: + # Fetch PDS Patient resource and IEDS resources for the old NHS number + pds_patient_resource, ieds_resources = fetch_pds_and_ieds_resources(nhs_number) + except Exception as e: + logger.exception("process_nhs_number: failed to fetch demographic details: %s", e) + return make_status(str(e), nhs_number, "error") + + if not ieds_resources: + logger.info("No IEDS records returned for NHS number: %s", nhs_number) + return make_status(f"No records returned for ID: {nhs_number}", nhs_number) + + # Compare demographics from PDS to each IEDS item, keep only matching records + matching_records = [] + discarded_count = 0 + for detail in ieds_resources: + if demographics_match(pds_patient_resource, detail): + matching_records.append(detail) else: - response = {"status": "success", "message": "No update required"} - else: - response = {"status": "success", "message": f"No patient ID found for NHS number: {nhs_number}"} - response.update(base_log_data) + discarded_count += 1 + + if not matching_records: + logger.info("No records matched PDS demographics: %d", discarded_count) + return make_status("No records matched PDS demographics; update skipped", nhs_number) + + response = ieds_update_patient_id( + nhs_number, new_nhs_number, items_to_update=matching_records + ) + response["nhs_number"] = nhs_number + # add counts for observability + response["matched"] = len(matching_records) + response["discarded"] = discarded_count return response + + +# Function to fetch PDS Patient details and IEDS Immunisation records +def fetch_pds_and_ieds_resources(nhs_number: str): + try: + pds = pds_get_patient_details(nhs_number) + except Exception as e: + raise RuntimeError("Failed to fetch PDS details") from e + try: + ieds = get_items_from_patient_id(nhs_number) + except Exception as e: + raise RuntimeError("Failed to fetch IEDS items") from e + return pds, ieds + + +def extract_normalized_name_from_patient(patient: dict) -> str | None: + """Return a normalized 'given family' name string from a Patient resource or None.""" + if not patient: + return None + name = patient.get("name") + if not name: + return None + try: + name_entry = name[0] if isinstance(name, list) else name + given = name_entry.get("given") + given_str = None + if isinstance(given, list) and given: + given_str = given[0] + elif isinstance(given, str): + given_str = given + family = name_entry.get("family") + parts = [p for p in [given_str, family] if p] + return " ".join(parts).strip().lower() if parts else None + except Exception: + return None + + +def demographics_match(pds_details: dict, ieds_item: dict) -> bool: + """Compare PDS patient details from PDS to an IEDS item (FHIR Patient resource). + Returns True if name, birthDate and gender match (when present in both sources). + If required fields are missing or unparsable on the IEDS side the function returns False. + """ + try: + def normalize_strings(item: Any) -> str | None: + return str(item).strip().lower() if item else None + + # Retrieve patient resource from PDS + pds_name = normalize_strings(extract_normalized_name_from_patient(pds_details)) + pds_gender = normalize_strings(pds_details.get("gender")) + pds_birth = normalize_strings(pds_details.get("birthDate")) + + # Retrieve patient resource from IEDS item + patient = extract_patient_resource_from_item(ieds_item) + if not patient: + logger.debug("demographics_match: no patient resource in IEDS table item") + return False + + # normalize patient fields from IEDS + ieds_name = normalize_strings(extract_normalized_name_from_patient(patient)) + ieds_gender = normalize_strings(patient.get("gender")) + ieds_birth = normalize_strings(patient.get("birthDate")) + + # All required fields must be present + if not all([pds_name, pds_gender, pds_birth, ieds_name, ieds_gender, ieds_birth]): + logger.debug("demographics_match: missing required demographics") + return False + + # Compare fields + if pds_birth != ieds_birth: + logger.debug("demographics_match: birthDate mismatch %s != %s", pds_birth, ieds_birth) + return False + + if pds_gender != ieds_gender: + logger.debug("demographics_match: gender mismatch %s != %s", pds_gender, ieds_gender) + return False + + if pds_name != ieds_name: + logger.debug("demographics_match: name mismatch %s != %s", pds_name, ieds_name) + return False + + return True + except Exception: + logger.exception("demographics_match: comparison failed with exception") + return False diff --git a/lambdas/id_sync/src/utils.py b/lambdas/id_sync/src/utils.py new file mode 100644 index 000000000..ea9dcdef4 --- /dev/null +++ b/lambdas/id_sync/src/utils.py @@ -0,0 +1,13 @@ +from typing import Dict, Any + + +def make_status(msg: str, nhs_number: str | None = None, status: str = "success") -> Dict[str, Any]: + """Return a simple status dict used by record processing for observability. + + If `nhs_number` is None the key is omitted which keeps the output shape + compatible with callers that expect only a status/message. + """ + result = {"status": status, "message": msg} + if nhs_number is not None: + result["nhs_number"] = nhs_number + return result diff --git a/lambdas/id_sync/tests/test_ieds_db_operations.py b/lambdas/id_sync/tests/test_ieds_db_operations.py index b0a0151cf..6a9d956c5 100644 --- a/lambdas/id_sync/tests/test_ieds_db_operations.py +++ b/lambdas/id_sync/tests/test_ieds_db_operations.py @@ -312,6 +312,7 @@ def test_ieds_update_patient_id_success(self): "status": "success", "message": f"IEDS update, patient ID: {old_id}=>{new_id}. {len(mock_items)} updated 1." } + expected_result["nhs_number"] = old_id self.assertEqual(result, expected_result) # Verify get_items_from_patient_id was called @@ -342,6 +343,7 @@ def test_ieds_update_patient_id_non_200_response(self): "status": "error", "message": f"Failed to update some batches for patient ID: {old_id}" } + expected_result["nhs_number"] = old_id self.assertEqual(result, expected_result) # Verify transact_write_items was called (not update_item) @@ -364,6 +366,7 @@ def test_ieds_update_patient_id_no_items_found(self): "status": "success", "message": f"No items found to update for patient ID: {old_id}" } + expected_result["nhs_number"] = old_id self.assertEqual(result, expected_result) # Verify get_items_from_patient_id was called @@ -386,6 +389,7 @@ def test_ieds_update_patient_id_empty_old_id(self): "status": "error", "message": "Old ID and New ID cannot be empty" } + expected_result["nhs_number"] = old_id self.assertEqual(result, expected_result) # Verify no update was attempted @@ -406,6 +410,7 @@ def test_ieds_update_patient_id_empty_new_id(self): "status": "error", "message": "Old ID and New ID cannot be empty" } + expected_result["nhs_number"] = old_id self.assertEqual(result, expected_result) # Verify no update was attempted @@ -425,6 +430,7 @@ def test_ieds_update_patient_id_same_old_and_new_id(self): "status": "success", "message": f"No change in patient ID: {patient_id}" } + expected_result["nhs_number"] = patient_id self.assertEqual(result, expected_result) # Verify no update was attempted @@ -535,144 +541,69 @@ def test_get_items_from_patient_id_no_records(self): self.assertEqual(result, []) -class TestIedsCheckExists(TestIedsDbOperations): - +class TestIedsDbOperationsConditional(unittest.TestCase): def setUp(self): - """Set up test fixtures""" - super().setUp() - - # Mock get_items_from_patient_id instead of table.query - self.get_items_from_patient_id_patcher = patch('ieds_db_operations.get_items_from_patient_id') - self.mock_get_items_from_patient_id = self.get_items_from_patient_id_patcher.start() - - def tearDown(self): - """Clean up patches""" - super().tearDown() - - def test_ieds_check_exist_record_exists(self): - """Test when record exists in IEDS table""" - # Arrange - patient_id = "test-patient-123" - mock_items = [{'PK': 'Patient#test-patient-123', 'PatientPK': 'Patient#test-patient-123'}] - self.mock_get_items_from_patient_id.return_value = mock_items - - # Act - result = ieds_db_operations.ieds_check_exist(patient_id) - - # Assert - self.assertTrue(result) - - # Verify get_items_from_patient_id was called with correct parameters - self.mock_get_items_from_patient_id.assert_called_once_with(patient_id, 1) - - def test_ieds_check_exist_record_not_exists(self): - """Test when no record exists in IEDS table""" - # Arrange - patient_id = "test-patient-456" - self.mock_get_items_from_patient_id.return_value = [] - - # Act - result = ieds_db_operations.ieds_check_exist(patient_id) - - # Assert - self.assertFalse(result) - - # Verify get_items_from_patient_id was called - self.mock_get_items_from_patient_id.assert_called_once_with(patient_id, 1) - - def test_ieds_check_exist_empty_id(self): - """Test with empty patient ID""" - # Arrange - patient_id = "" - self.mock_get_items_from_patient_id.return_value = [] - - # Act - result = ieds_db_operations.ieds_check_exist(patient_id) - - # Assert - self.assertFalse(result) - - # Verify get_items_from_patient_id was called with empty ID - self.mock_get_items_from_patient_id.assert_called_once_with(patient_id, 1) - - def test_ieds_check_exist_none_id(self): - """Test with None patient ID""" - # Arrange - patient_id = None - self.mock_get_items_from_patient_id.return_value = [] - - # Act - result = ieds_db_operations.ieds_check_exist(patient_id) - - # Assert - self.assertFalse(result) - - # Verify get_items_from_patient_id was called with None ID - self.mock_get_items_from_patient_id.assert_called_once_with(patient_id, 1) - - def test_ieds_check_exist_query_exception(self): - """Test exception handling during get_items_from_patient_id""" - # Arrange - patient_id = "test-patient-error" - test_exception = Exception("DynamoDB query failed") - self.mock_get_items_from_patient_id.side_effect = test_exception - - # Act & Assert - with self.assertRaises(Exception) as context: - ieds_db_operations.ieds_check_exist(patient_id) - - self.assertEqual(str(context.exception), "DynamoDB query failed") - - # Verify get_items_from_patient_id was attempted - self.mock_get_items_from_patient_id.assert_called_once_with(patient_id, 1) - - def test_ieds_check_exist_multiple_items_found(self): - """Test when multiple items are found (should still return True)""" - # Arrange - patient_id = "test-patient-multiple" - mock_items = [ - {'PK': 'Patient#test-patient-multiple', 'PatientPK': 'Patient#test-patient-multiple'}, - {'PK': 'Patient#test-patient-multiple#record1', 'PatientPK': 'Patient#test-patient-multiple'} - ] - self.mock_get_items_from_patient_id.return_value = mock_items - - # Act - result = ieds_db_operations.ieds_check_exist(patient_id) - - # Assert - self.assertTrue(result) - - # Verify get_items_from_patient_id was called with limit=1 - self.mock_get_items_from_patient_id.assert_called_once_with(patient_id, 1) - - def test_ieds_check_exist_single_item_found(self): - """Test when exactly one item is found""" - # Arrange - patient_id = "test-patient-single" - mock_items = [{'PK': 'Patient#test-patient-single', 'PatientPK': 'Patient#test-patient-single'}] - self.mock_get_items_from_patient_id.return_value = mock_items - - # Act - result = ieds_db_operations.ieds_check_exist(patient_id) - - # Assert - self.assertTrue(result) + # Patch logger to suppress output + self.logger_patcher = patch('ieds_db_operations.logger') + self.mock_logger = self.logger_patcher.start() - # Verify get_items_from_patient_id was called - self.mock_get_items_from_patient_id.assert_called_once_with(patient_id, 1) + # Patch get_ieds_table_name and get_ieds_table + self.get_ieds_table_name_patcher = patch('ieds_db_operations.get_ieds_table_name') + self.mock_get_ieds_table_name = self.get_ieds_table_name_patcher.start() + self.mock_get_ieds_table_name.return_value = 'test-table' - def test_ieds_check_exist_limit_parameter(self): - """Test that the function passes limit=1 to get_items_from_patient_id""" - # Arrange - patient_id = "test-patient-limit" - self.mock_get_items_from_patient_id.return_value = [] + self.get_ieds_table_patcher = patch('ieds_db_operations.get_ieds_table') + self.mock_get_ieds_table = self.get_ieds_table_patcher.start() - # Act - ieds_db_operations.ieds_check_exist(patient_id) + # Patch dynamodb client + self.dynamodb_client_patcher = patch('ieds_db_operations.dynamodb_client') + self.mock_dynamodb_client = self.dynamodb_client_patcher.start() - # Assert - Verify the limit parameter is correctly passed - self.mock_get_items_from_patient_id.assert_called_once_with(patient_id, 1) + def tearDown(self): + patch.stopall() - # ✅ Remove tests that are no longer relevant: - # - test_ieds_check_exist_missing_count_field (no longer uses Count) - # - test_ieds_check_exist_count_greater_than_one (no longer uses Count) + def test_ieds_update_patient_id_empty_inputs(self): + res = ieds_db_operations.ieds_update_patient_id('', '') + self.assertEqual(res['status'], 'error') + + def test_ieds_update_patient_id_same_ids(self): + res = ieds_db_operations.ieds_update_patient_id('a', 'a') + self.assertEqual(res['status'], 'success') + + def test_ieds_update_with_items_to_update_uses_provided_list(self): + items = [{'PK': 'Patient#1'}, {'PK': 'Patient#1#r2'}] + # patch transact_write_items to return success + self.mock_dynamodb_client.transact_write_items = MagicMock( + return_value={'ResponseMetadata': {'HTTPStatusCode': 200}}) + + res = ieds_db_operations.ieds_update_patient_id('1', '2', items_to_update=items) + self.assertEqual(res['status'], 'success') + # ensure transact called at least once + self.mock_dynamodb_client.transact_write_items.assert_called() + + def test_ieds_update_batches_multiple_calls(self): + # create 60 items to force 3 batches (25,25,10) + items = [{'PK': f'Patient#old#{i}'} for i in range(60)] + called = [] + + def fake_transact(TransactItems): + called.append(len(TransactItems)) + return {'ResponseMetadata': {'HTTPStatusCode': 200}} + + self.mock_dynamodb_client.transact_write_items = MagicMock(side_effect=fake_transact) + + res = ieds_db_operations.ieds_update_patient_id('old', 'new', items_to_update=items) + self.assertEqual(res['status'], 'success') + # should have been called 3 times + self.assertEqual(len(called), 3) + self.assertEqual(called[0], 25) + self.assertEqual(called[1], 25) + self.assertEqual(called[2], 10) + + def test_ieds_update_non_200_response(self): + items = [{'PK': 'Patient#1'}] + self.mock_dynamodb_client.transact_write_items = MagicMock( + return_value={'ResponseMetadata': {'HTTPStatusCode': 500}}) + + res = ieds_db_operations.ieds_update_patient_id('1', '2', items_to_update=items) + self.assertEqual(res['status'], 'error') diff --git a/lambdas/id_sync/tests/test_record_processor.py b/lambdas/id_sync/tests/test_record_processor.py index 0d301ce25..c0b0dba81 100644 --- a/lambdas/id_sync/tests/test_record_processor.py +++ b/lambdas/id_sync/tests/test_record_processor.py @@ -7,18 +7,23 @@ class TestRecordProcessor(unittest.TestCase): def setUp(self): """Set up test fixtures and mocks""" + # Patch logger self.logger_patcher = patch('record_processor.logger') self.mock_logger = self.logger_patcher.start() + # PDS helpers self.pds_get_patient_id_patcher = patch('record_processor.pds_get_patient_id') self.mock_pds_get_patient_id = self.pds_get_patient_id_patcher.start() - self.ieds_check_exist_patcher = patch('record_processor.ieds_check_exist') - self.mock_ieds_check_exist = self.ieds_check_exist_patcher.start() + self.pds_get_patient_details_patcher = patch('record_processor.pds_get_patient_details') + self.mock_pds_get_patient_details = self.pds_get_patient_details_patcher.start() self.ieds_update_patient_id_patcher = patch('record_processor.ieds_update_patient_id') self.mock_ieds_update_patient_id = self.ieds_update_patient_id_patcher.start() + self.get_items_from_patient_id_patcher = patch('record_processor.get_items_from_patient_id') + self.mock_get_items_from_patient_id = self.get_items_from_patient_id_patcher.start() + def tearDown(self): patch.stopall() @@ -26,59 +31,192 @@ def test_process_record_success_no_update_required(self): """Test successful processing when patient ID matches""" # Arrange test_id = "54321" - with patch('record_processor.ieds_check_exist', return_value=True): - - test_record = {"body": {"subject": test_id}} - self.mock_pds_get_patient_id.return_value = test_id + # Simulate IEDS items exist + self.mock_get_items_from_patient_id.return_value = [{"Resource": {}}] + test_record = {"body": {"subject": test_id}} + self.mock_pds_get_patient_id.return_value = test_id - # Act - result = process_record(test_record) + # Act + result = process_record(test_record) - # Assert - self.assertEqual(result["nhs_number"], test_id) - self.assertEqual(result["message"], "No update required") - self.assertEqual(result["status"], "success") + # Assert + self.assertEqual(result["nhs_number"], test_id) + self.assertEqual(result["message"], "No update required") + self.assertEqual(result["status"], "success") - # Verify calls - self.mock_pds_get_patient_id.assert_called_once_with(test_id) + # Verify calls + self.mock_pds_get_patient_id.assert_called_once_with(test_id) def test_process_record_success_update_required(self): - """Test successful processing when patient ID differs""" + """Test successful processing when patient ID differs and demographics match""" # Arrange - pds_id = "pds-id-1" - nhs_number = "nhs-number-1" + pds_id = "9000000008" + nhs_number = "9000000009" test_sqs_record = {"body": {"subject": nhs_number}} self.mock_pds_get_patient_id.return_value = pds_id + + # pds_get_patient_details should return details used by demographics_match + self.mock_pds_get_patient_details.return_value = { + "name": [{"given": ["John"], "family": "Doe"}], + "gender": "male", + "birthDate": "1980-01-01", + } + + # Provide one IEDS item that will match demographics via demographics_match + matching_item = { + "Resource": { + "resourceType": "Immunization", + "contained": [ + { + "resourceType": "Patient", + "id": "Pat1", + "name": [{"given": ["John"], "family": "Doe"}], + "gender": "male", + "birthDate": "1980-01-01", + } + ] + } + } + self.mock_get_items_from_patient_id.return_value = [matching_item] + success_response = {"status": "success"} self.mock_ieds_update_patient_id.return_value = success_response + # Act result = process_record(test_sqs_record) # Assert - expected_result = success_response - self.assertEqual(result, expected_result) - - # Verify calls + self.assertEqual(result, success_response) self.mock_pds_get_patient_id.assert_called_once_with(nhs_number) + def test_process_record_demographics_mismatch_skips_update(self): + """If no IEDS item matches demographics, the update should be skipped""" + # Arrange + pds_id = "pds-1" + nhs_number = "nhs-1" + test_sqs_record = {"body": {"subject": nhs_number}} + + self.mock_pds_get_patient_id.return_value = pds_id + self.mock_pds_get_patient_details.return_value = { + "name": [{"given": ["Alice"], "family": "Smith"}], + "gender": "female", + "birthDate": "1995-05-05", + } + + # IEDS items exist but do not match demographics + non_matching_item = { + "Resource": { + "resourceType": "Immunization", + "contained": [ + { + "resourceType": "Patient", + "id": "Pat2", + "name": [{"given": ["Bob"], "family": "Jones"}], + "gender": "male", + "birthDate": "1990-01-01", + } + ] + } + } + self.mock_get_items_from_patient_id.return_value = [non_matching_item] + + # Act + result = process_record(test_sqs_record) + + # Assert + self.assertEqual(result["status"], "success") + self.assertEqual(result["message"], "No records matched PDS demographics; update skipped") + + def test_invalid_body_parsing_returns_error(self): + """When body is a malformed string, process_record should return an error""" + bad_record = {"body": "not-a-json-or-python-literal"} + result = process_record(bad_record) + self.assertEqual(result["status"], "error") + self.assertIn("Invalid body format", result["message"]) + + def test_no_subject_in_body_returns_error(self): + """When body doesn't contain a subject, return an error""" + result = process_record({"body": {"other": "value"}}) + self.assertEqual(result["status"], "error") + self.assertIn("No NHS number found", result["message"]) + + def test_pds_details_exception_aborts_update(self): + """If fetching PDS details raises, function should return error""" + nhs_number = "nhs-exc-1" + test_sqs_record = {"body": {"subject": nhs_number}} + # pds returns a different id to force update path + self.mock_pds_get_patient_id.return_value = "pds-new" + self.mock_get_items_from_patient_id.return_value = [{"Resource": {}}] + self.mock_pds_get_patient_details.side_effect = Exception("pds fail") + + result = process_record(test_sqs_record) + self.assertEqual(result["status"], "error") + self.assertIn("Failed to fetch PDS details", result["message"]) + + def test_get_items_exception_aborts_update(self): + """If fetching IEDS items raises, function should return error""" + nhs_number = "nhs-exc-2" + test_sqs_record = {"body": {"subject": nhs_number}} + self.mock_pds_get_patient_id.return_value = "pds-new" + self.mock_get_items_from_patient_id.return_value = [{"Resource": {}}] + self.mock_pds_get_patient_details.return_value = { + "name": [{"given": ["J"], "family": "K"}], + "gender": "male", "birthDate": "2000-01-01" + } + self.mock_get_items_from_patient_id.side_effect = Exception("dynamo fail") + + result = process_record(test_sqs_record) + self.assertEqual(result["status"], "error") + self.assertIn("Failed to fetch IEDS items", result["message"]) + + def test_update_called_on_match(self): + """Verify ieds_update_patient_id is called when demographics match""" + pds_id = "pds-match" + nhs_number = "nhs-match" + test_sqs_record = {"body": {"subject": nhs_number}} + self.mock_pds_get_patient_id.return_value = pds_id + self.mock_pds_get_patient_details.return_value = { + "name": [ + { + "given": ["Sarah"], + "family": "Fowley"} + ], + "gender": "male", + "birthDate": "1956-07-09" + } + item = { + "Resource": { + "resourceType": "Immunization", + "contained": [{ + "resourceType": "Patient", + "id": "PatM", + "name": [{"given": ["Sarah"], "family": "Fowley"}], + "gender": "male", "birthDate": "1956-07-09"} + ]} + } + self.mock_get_items_from_patient_id.return_value = [item] + self.mock_ieds_update_patient_id.return_value = {"status": "success"} + + result = process_record(test_sqs_record) + self.assertEqual(result["status"], "success") + self.mock_ieds_update_patient_id.assert_called_once_with(nhs_number, pds_id, items_to_update=[item]) + def test_process_record_no_records_exist(self): """Test when no records exist for the patient ID""" - # Arrange test_id = "12345" - with patch('record_processor.ieds_check_exist', return_value=False): + # Simulate no IEDS items + self.mock_get_items_from_patient_id.return_value = [] + test_record = {"body": {"subject": test_id}} - # Act - test_record = {"body": {"subject": test_id}} - result = process_record(test_record) + # Act + result = process_record(test_record) - # Assert - self.assertEqual(result["status"], "success") - self.assertEqual(result["message"], f"No records returned for ID: {test_id}") + self.assertEqual(result["message"], f"No records returned for ID: {test_id}") - # Verify PDS was not called - self.mock_pds_get_patient_id.assert_called_once() + # Verify PDS was not called + self.mock_pds_get_patient_id.assert_called_once() def test_process_record_pds_returns_none_id(self): """Test when PDS returns none """ @@ -86,11 +224,13 @@ def test_process_record_pds_returns_none_id(self): test_id = "12345a" self.mock_pds_get_patient_id.return_value = None test_record = {"body": {"subject": test_id}} + # Act & Assert result = process_record(test_record) self.assertEqual(result["status"], "success") - self.assertEqual(result["message"], f"No patient ID found for NHS number: {test_id}") - self.mock_ieds_check_exist.assert_not_called() + self.assertEqual(result["message"], "No patient ID found for NHS number") + # No IEDS lookups should have been attempted when PDS returns None + self.mock_get_items_from_patient_id.assert_not_called() self.mock_ieds_update_patient_id.assert_not_called() def test_process_record_ieds_returns_false(self): @@ -99,7 +239,9 @@ def test_process_record_ieds_returns_false(self): test_id = "12345a" pds_id = "pds-id-1" self.mock_pds_get_patient_id.return_value = pds_id - self.mock_ieds_check_exist.return_value = False + # Simulate no items returned from IEDS + self.mock_get_items_from_patient_id.return_value = [] + # Act & Assert result = process_record({"body": {"subject": test_id}}) self.assertEqual(result["status"], "success") @@ -112,9 +254,135 @@ def test_body_is_string(self): new_test_id = "nhs-number-2" self.mock_pds_get_patient_id.return_value = new_test_id + # Mock demographics so update proceeds + self.mock_pds_get_patient_details.return_value = { + "name": [{"given": ["A"], "family": "B"}], + "gender": "female", "birthDate": "1990-01-01" + } + self.mock_get_items_from_patient_id.return_value = [{ + "Resource": { + "resourceType": "Immunization", + "contained": [ + { + "resourceType": "Patient", + "id": "Pat3", + "name": [{"given": ["A"], "family": "B"}], + "gender": "female", "birthDate": "1990-01-01" + } + ] + } + }] self.mock_ieds_update_patient_id.return_value = {"status": "success"} # Act result = process_record(test_record) # Assert self.assertEqual(result["status"], "success") + + def test_process_record_birthdate_mismatch_skips_update(self): + """If birthDate differs between PDS and IEDS, update should be skipped""" + pds_id = "pds-2" + nhs_number = "nhs-2" + test_sqs_record = {"body": {"subject": nhs_number}} + + self.mock_pds_get_patient_id.return_value = pds_id + self.mock_pds_get_patient_details.return_value = { + "name": [{"given": ["John"], "family": "Doe"}], + "gender": "male", + "birthDate": "1980-01-01", + } + + # IEDS has different birthDate + item = { + "Resource": { + "resourceType": "Immunization", + "contained": [ + { + "resourceType": "Patient", + "id": "PatX", + "name": + [{ + "given": ["John"], + "family": "Doe" + }], + "gender": "male", + "birthDate": "1980-01-02" + } + ] + } + } + self.mock_get_items_from_patient_id.return_value = [item] + + result = process_record(test_sqs_record) + self.assertEqual(result["status"], "success") + self.assertEqual(result["message"], "No records matched PDS demographics; update skipped") + + def test_process_record_gender_mismatch_skips_update(self): + """If gender differs between PDS and IEDS, update should be skipped""" + pds_id = "pds-3" + nhs_number = "nhs-3" + test_sqs_record = {"body": {"subject": nhs_number}} + + self.mock_pds_get_patient_id.return_value = pds_id + self.mock_pds_get_patient_details.return_value = { + "name": [{"given": ["Alex"], "family": "Smith"}], + "gender": "female", + "birthDate": "1992-03-03", + } + + # IEDS has different gender + item = { + "Resource": { + "resourceType": "Immunization", + "contained": [ + { + "resourceType": "Patient", + "id": "PatY", + "name": [{ + "given": ["Alex"], + "family": "Smith" + }], + "gender": "male", + "birthDate": "1992-03-03" + } + ] + } + } + self.mock_get_items_from_patient_id.return_value = [item] + + result = process_record(test_sqs_record) + self.assertEqual(result["status"], "success") + self.assertEqual(result["message"], "No records matched PDS demographics; update skipped") + + def test_process_record_no_comparable_fields_skips_update(self): + """If PDS provides no comparable fields, do not update (skip)""" + pds_id = "pds-4" + nhs_number = "nhs-4" + test_sqs_record = {"body": {"subject": nhs_number}} + + self.mock_pds_get_patient_id.return_value = pds_id + # PDS returns minimal/empty details + self.mock_pds_get_patient_details.return_value = {} + + item = { + "Resource": { + "resourceType": "Immunization", + "contained": [ + { + "resourceType": "Patient", + "id": "PatZ", + "name": [{ + "given": ["Zoe"], + "family": "Lee" + }], + "gender": "female", + "birthDate": "2000-01-01" + } + ] + } + } + self.mock_get_items_from_patient_id.return_value = [item] + + result = process_record(test_sqs_record) + self.assertEqual(result["status"], "success") + self.assertEqual(result["message"], "No records matched PDS demographics; update skipped")