Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
f7c5872
VED-755: refactor handler
Akol125 Sep 12, 2025
2f9d5e4
VED-755: refactor handler2
Akol125 Sep 12, 2025
bc9f4bc
VED-755: Refactor existing id_sync codebase
Akol125 Sep 12, 2025
13610c5
VED-755: refactoring record_processor
Akol125 Sep 12, 2025
bac82fe
resolve lint
Akol125 Sep 15, 2025
516ac9a
resolve lint2
Akol125 Sep 15, 2025
cd02066
add logic for demographics match and integrate with nhs number proces…
Akol125 Sep 15, 2025
b36d8ed
add unit test for demographics test
Akol125 Sep 15, 2025
001fda1
refactor all functions
Akol125 Sep 15, 2025
973c8a2
VED-755: Adding demographics logic
Akol125 Sep 16, 2025
7df9358
resolve failing test
Akol125 Sep 16, 2025
0644072
VED-755: Resolve lint issues
Akol125 Sep 16, 2025
4bb330a
VED-755: remove duplication
Akol125 Sep 16, 2025
af29a78
VED-755: remove duplication2
Akol125 Sep 16, 2025
876485f
VED-755: improve coverage
Akol125 Sep 16, 2025
f637a93
VED-755: refactor process_nhs_number
Akol125 Sep 16, 2025
31858cc
VED-755: added pagination, remove ieds_check_exist, modify test
Akol125 Sep 17, 2025
2ffffb7
refactor pagination
Akol125 Sep 17, 2025
9da9a17
VED-755: update records whose vaccination match
Akol125 Sep 17, 2025
d1cd6a7
Merge branch 'master' into VED-755-Handling-NHS-Confusions
Akol125 Sep 17, 2025
9c446b5
refactoring record processor.py
Akol125 Sep 18, 2025
a0381b7
Merge remote branch into VED-755-Handling-NHS-Confusions
Akol125 Sep 18, 2025
1ae026a
reduce cognitive complexity and use utils and constants
Akol125 Sep 18, 2025
8523b24
fix lint errors
Akol125 Sep 18, 2025
1941dba
VED-765: review based on changes
Akol125 Sep 18, 2025
c930608
VED-755: code review changes
Akol125 Sep 18, 2025
5dd0764
VED-755: code review
Akol125 Sep 18, 2025
e1461b4
Merge branch 'master' into VED-755-Handling-NHS-Confusions
Akol125 Sep 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions lambdas/id_sync/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,4 @@

- Code is located in the `lambdas/id_sync/src/` directory.
- Unit tests are in the `lambdas/id_sync/tests/` directory.
- Use the provided Makefile and Dockerfile for building, testing, and packaging.

## License

This project is maintained by NHS. See [LICENSE](../LICENSE) for details.
- Use the provided Makefile and Dockerfile for building, testing, and packaging.
66 changes: 35 additions & 31 deletions lambdas/id_sync/src/id_sync.py
Original file line number Diff line number Diff line change
@@ -1,46 +1,50 @@
from common.clients import logger
from common.clients import STREAM_NAME
"""
- Parses the incoming AWS event into `AwsLambdaEvent` and iterate its `records`.
- Delegate each record to `process_record` and collect `nhs_number` from each result.
- If any record has status == "error" raise `IdSyncException` with aggregated nhs_numbers.
- Any unexpected error is wrapped into `IdSyncException(message="Error processing id_sync event")`.
"""

from typing import Any, Dict
from common.clients import logger, STREAM_NAME
from common.log_decorator import logging_decorator
from common.aws_lambda_event import AwsLambdaEvent
from exceptions.id_sync_exception import IdSyncException
from record_processor import process_record
'''
Lambda function handler for processing SQS events.Lambda for ID Sync. Fired by SQS
'''


@logging_decorator(prefix="id_sync", stream_name=STREAM_NAME)
def handler(event_data, _):

def handler(event_data: Dict[str, Any], _context) -> Dict[str, Any]:
try:
logger.info("id_sync handler invoked")
event = AwsLambdaEvent(event_data)
record_count = len(event.records)
if record_count > 0:
logger.info("id_sync processing event with %d records", record_count)
error_count = 0
nhs_numbers = []
for record in event.records:
record_result = process_record(record)
nhs_numbers.append(record_result["nhs_number"])
if record_result["status"] == "error":
error_count += 1
if error_count > 0:
raise IdSyncException(message=f"Processed {record_count} records with {error_count} errors",
nhs_numbers=nhs_numbers)

else:
response = {"status": "success",
"message": f"Successfully processed {record_count} records",
"nhs_numbers": nhs_numbers}
else:
response = {"status": "success", "message": "No records found in event"}
records = event.records

if not records:
return {"status": "success", "message": "No records found in event"}

logger.info("id_sync processing event with %d records", len(records))

results = [process_record(record) for record in records]
nhs_numbers = [result["nhs_number"] for result in results]
error_count = sum(1 for result in results if result.get("status") == "error")

if error_count:
raise IdSyncException(message=f"Processed {len(records)} records with {error_count} errors",
nhs_numbers=nhs_numbers)

response = {
"status": "success",
"message": f"Successfully processed {len(records)} records",
"nhs_numbers": nhs_numbers
}

logger.info("id_sync handler completed: %s", response)
return response

except IdSyncException as e:
logger.exception(f"id_sync error: {e.message}")
raise e
except Exception as e:
raise
except Exception:
msg = "Error processing id_sync event"
logger.exception(msg)
raise IdSyncException(message=msg, exception=e)
raise IdSyncException(message=msg)
227 changes: 143 additions & 84 deletions lambdas/id_sync/src/ieds_db_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
from os_vars import get_ieds_table_name
from common.aws_dynamodb import get_dynamodb_table
from common.clients import logger, dynamodb_client
from utils import make_status
from exceptions.id_sync_exception import IdSyncException

ieds_table = None
BATCH_SIZE = 25 # DynamoDB TransactWriteItems max batch size


def get_ieds_table():
Expand All @@ -16,97 +18,47 @@ def get_ieds_table():
return ieds_table


def ieds_check_exist(id: str) -> bool:
"""Check if a record exists in the IEDS table for the given ID."""
logger.info(f"Check Id exists ID: {id}")
items = get_items_from_patient_id(id, 1)

if items or len(items) > 0:
logger.info(f"Found patient ID: {id}")
return True
return False


BATCH_SIZE = 25


def ieds_update_patient_id(old_id: str, new_id: str) -> dict:
def ieds_update_patient_id(old_id: str, new_id: str, items_to_update: list | None = None) -> dict:
"""Update the patient ID in the IEDS table."""
logger.info(f"ieds_update_patient_id. Update patient ID from {old_id} to {new_id}")
if not old_id or not new_id or not old_id.strip() or not new_id.strip():
return {"status": "error", "message": "Old ID and New ID cannot be empty"}
return make_status("Old ID and New ID cannot be empty", old_id, "error")

if old_id == new_id:
return {"status": "success", "message": f"No change in patient ID: {old_id}"}
return make_status(f"No change in patient ID: {old_id}", old_id)

try:
logger.info(f"Updating patient ID in IEDS from {old_id} to {new_id}")

new_patient_pk = f"Patient#{new_id}"

logger.info("Getting items to update in IEDS table...")
items_to_update = get_items_from_patient_id(old_id)
if items_to_update is None:
logger.info("Getting items to update in IEDS table...")
items_to_update = get_items_from_patient_id(old_id)
else:
logger.info("Using provided items_to_update list, size=%d", len(items_to_update))

if not items_to_update:
logger.warning(f"No items found to update for patient ID: {old_id}")
return {
"status": "success",
"message": f"No items found to update for patient ID: {old_id}"
}

transact_items = []
return make_status(f"No items found to update for patient ID: {old_id}", old_id)

logger.info(f"Items to update: {len(items_to_update)}")
ieds_table_name = get_ieds_table_name()
for item in items_to_update:
transact_items.append({
'Update': {
'TableName': ieds_table_name,
'Key': {
'PK': {'S': item['PK']},
},
'UpdateExpression': 'SET PatientPK = :new_val',
'ExpressionAttributeValues': {
':new_val': {'S': new_patient_pk}
}
}
})

logger.info("Transacting items in IEDS table...")
# success tracking
all_batches_successful = True
total_batches = 0

# Batch transact in chunks of BATCH_SIZE
for i in range(0, len(transact_items), BATCH_SIZE):
batch = transact_items[i:i+BATCH_SIZE]
total_batches += 1
logger.info(f"Transacting batch {total_batches} of size: {len(batch)}")

response = dynamodb_client.transact_write_items(TransactItems=batch)
logger.info("Batch update complete. Response: %s", response)
# Build transact items and execute them in batches via helpers to keep
# the top-level function easy to read and test.
transact_items = build_transact_items(old_id, new_id, items_to_update)

# Check each batch response
if response['ResponseMetadata']['HTTPStatusCode'] != 200:
all_batches_successful = False
logger.error(
f"Batch {total_batches} failed with status: {response['ResponseMetadata']['HTTPStatusCode']}")
all_batches_successful, total_batches = execute_transaction_in_batches(transact_items)

# Consolidated response handling
logger.info(
f"All batches complete. Total batches: {total_batches}, All successful: {all_batches_successful}")

if all_batches_successful:
return {
"status": "success",
"message":
f"IEDS update, patient ID: {old_id}=>{new_id}. {len(items_to_update)} updated {total_batches}."
}
return make_status(
f"IEDS update, patient ID: {old_id}=>{new_id}. {len(items_to_update)} updated {total_batches}.",
old_id,
)
else:
return {
"status": "error",
"message": f"Failed to update some batches for patient ID: {old_id}"
}
return make_status(f"Failed to update some batches for patient ID: {old_id}", old_id, "error")

except Exception as e:
logger.exception("Error updating patient ID")
Expand All @@ -118,26 +70,133 @@ def ieds_update_patient_id(old_id: str, new_id: str) -> dict:
)


def get_items_from_patient_id(id: str, limit=BATCH_SIZE) -> list:
"""Get all items for patient ID."""
logger.info(f"Getting items for patient id: {id}")
def get_items_from_patient_id(id: str) -> list:
"""Public wrapper: build PatientPK and return all matching items.

Delegates actual paging to the internal helper `_paginate_items_for_patient_pk`.
Raises IdSyncException on error.
"""
logger.info("Getting items for patient id: %s", id)
patient_pk = f"Patient#{id}"
try:
response = get_ieds_table().query(
IndexName='PatientGSI', # query the GSI
KeyConditionExpression=Key('PatientPK').eq(patient_pk),
Limit=limit
)

if 'Items' not in response or not response['Items']:
logger.warning(f"No items found for patient PK: {patient_pk}")
return []

return response['Items']
return paginate_items_for_patient_pk(patient_pk)
except IdSyncException:
raise
except Exception as e:
logger.exception(f"Error querying items for patient PK: {patient_pk}")
logger.exception("Error querying items for patient PK: %s", patient_pk)
raise IdSyncException(
message=f"Error querying items for patient PK: {patient_pk}",
nhs_numbers=[patient_pk],
exception=e
exception=e,
)


def paginate_items_for_patient_pk(patient_pk: str) -> list:
"""Internal helper that pages through the PatientGSI and returns all items.

Raises IdSyncException when the DynamoDB response is malformed.
"""
all_items: list = []
last_evaluated_key = None
while True:
query_args = {
"IndexName": "PatientGSI",
"KeyConditionExpression": Key('PatientPK').eq(patient_pk),
}
if last_evaluated_key:
query_args["ExclusiveStartKey"] = last_evaluated_key

response = get_ieds_table().query(**query_args)

if "Items" not in response:
# Unexpected DynamoDB response shape - surface as IdSyncException
logger.exception("Unexpected DynamoDB response: missing 'Items'")
raise IdSyncException(
message="No Items in DynamoDB response",
nhs_numbers=[patient_pk],
exception=response,
)

items = response.get("Items", [])
all_items.extend(items)

last_evaluated_key = response.get("LastEvaluatedKey")
if not last_evaluated_key:
break

if not all_items:
logger.warning("No items found for patient PK: %s", patient_pk)
return []

return all_items


def extract_patient_resource_from_item(item: dict) -> dict | None:
"""
Extract a Patient resource dict from an IEDS database.
"""
patient_resource = item.get("Resource", None)
if not isinstance(patient_resource, dict):
return None

for response in patient_resource.get("contained", []):
if isinstance(response, dict) and response.get("resourceType") == "Patient":
return response

return None


def build_transact_items(old_id: str, new_id: str, items_to_update: list) -> list:
"""Construct the list of TransactItems for DynamoDB TransactWriteItems.

Each item uses a conditional expression to ensure PatientPK hasn't changed
since it was read.
"""
transact_items = []
ieds_table_name = get_ieds_table_name()
new_patient_pk = f"Patient#{new_id}"

for item in items_to_update:
old_patient_pk = item.get('PatientPK', f"Patient#{old_id}")

transact_items.append({
'Update': {
'TableName': ieds_table_name,
'Key': {
'PK': {'S': item['PK']},
},
'UpdateExpression': 'SET PatientPK = :new_val',
"ConditionExpression": "PatientPK = :expected_old",
'ExpressionAttributeValues': {
':new_val': {'S': new_patient_pk},
':expected_old': {'S': old_patient_pk}
}
}
})

return transact_items


def execute_transaction_in_batches(transact_items: list) -> tuple:
"""Execute transact write items in batches of BATCH_SIZE.

Returns (all_batches_successful: bool, total_batches: int).
"""
all_batches_successful = True
total_batches = 0

for i in range(0, len(transact_items), BATCH_SIZE):
batch = transact_items[i:i+BATCH_SIZE]
total_batches += 1
logger.info(f"Transacting batch {total_batches} of size: {len(batch)}")

response = dynamodb_client.transact_write_items(TransactItems=batch)
logger.info("Batch update complete. Response: %s", response)

# Check each batch response
if response['ResponseMetadata']['HTTPStatusCode'] != 200:
all_batches_successful = False
logger.error(
f"Batch {total_batches} failed with status: {response['ResponseMetadata']['HTTPStatusCode']}")

return all_batches_successful, total_batches
3 changes: 2 additions & 1 deletion lambdas/id_sync/src/pds_details.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
safe_tmp_dir = tempfile.mkdtemp(dir="/tmp") # NOSONAR


# Get Patient details from external service PDS using NHS number from MNS notification
def pds_get_patient_details(nhs_number: str) -> dict:
try:
logger.info(f"get patient details. nhs_number: {nhs_number}")
Expand All @@ -34,6 +35,7 @@ def pds_get_patient_details(nhs_number: str) -> dict:
raise IdSyncException(message=msg, exception=e)


# Extract Patient identifier value from PDS patient details
def pds_get_patient_id(nhs_number: str) -> str:
"""
Get PDS patient ID from NHS number.
Expand All @@ -48,7 +50,6 @@ def pds_get_patient_id(nhs_number: str) -> str:

return patient_details["identifier"][0]["value"]

# ✅ Remove the IdSyncException catch since you're just re-raising
except Exception as e:
msg = f"Error getting PDS patient ID for {nhs_number}"
logger.exception(msg)
Expand Down
Loading
Loading