Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,126 @@ def test_search_document_reference_happy_path_with_nicip_type(
}


@mock_aws
@mock_repository
def test_search_document_reference_happy_path_with_nicip_type_for_both_categories(
repository: DocumentPointerRepository,
):
doc_ref = load_document_reference("Y05868-736253002-Valid")
doc_ref.type.coding[0].code = "MAULR"
doc_ref.type.coding[0].system = "https://nicip.nhs.uk"
doc_ref.type.coding[0].display = "MRA Upper Limb Rt"
doc_ref.category[0].coding[0].code = "721981007"
doc_ref.category[0].coding[0].display = "Diagnostic Studies Report"
doc_pointer = DocumentPointer.from_document_reference(doc_ref)

repository.create(doc_pointer)

doc_ref2 = load_document_reference("Y05868-736253002-Valid")
doc_ref2.type.coding[0].code = "MAULR"
doc_ref2.type.coding[0].system = "https://nicip.nhs.uk"
doc_ref2.type.coding[0].display = "MRA Upper Limb Rt"
doc_ref2.id = "Y05868-736253002-Valid2"
doc_ref2.category[0].coding[0].code = "103693007"
doc_ref2.category[0].coding[0].display = "Diagnostic procedure"
doc_pointer2 = DocumentPointer.from_document_reference(doc_ref2)

repository.create(doc_pointer2)

event = create_test_api_gateway_event(
headers=create_headers(),
query_string_parameters={
"subject:identifier": "https://fhir.nhs.uk/Id/nhs-number|6700028191",
"type": "https://nicip.nhs.uk|MAULR",
},
)

result = handler(event, create_mock_context())
body = result.pop("body")

assert result == {
"statusCode": "200",
"headers": default_response_headers(),
"isBase64Encoded": False,
}

parsed_body = json.loads(body)
assert parsed_body == {
"resourceType": "Bundle",
"type": "searchset",
"link": [
{
"relation": "self",
"url": "https://pytest.api.service.nhs.uk/record-locator/consumer/FHIR/R4/DocumentReference?subject:identifier=https://fhir.nhs.uk/Id/nhs-number|6700028191&type=https://nicip.nhs.uk|MAULR",
}
],
"total": 2,
"entry": [
{"resource": doc_ref2.model_dump(exclude_none=True)},
{"resource": doc_ref.model_dump(exclude_none=True)},
],
}


@mock_aws
@mock_repository
def test_search_document_reference_happy_path_with_nicip_type_for_one_category(
repository: DocumentPointerRepository,
):
doc_ref = load_document_reference("Y05868-736253002-Valid")
doc_ref.type.coding[0].code = "MAULR"
doc_ref.type.coding[0].system = "https://nicip.nhs.uk"
doc_ref.type.coding[0].display = "MRA Upper Limb Rt"
doc_ref.category[0].coding[0].code = "721981007"
doc_ref.category[0].coding[0].display = "Diagnostic Studies Report"
doc_pointer = DocumentPointer.from_document_reference(doc_ref)

repository.create(doc_pointer)

doc_ref2 = load_document_reference("Y05868-736253002-Valid")
doc_ref2.type.coding[0].code = "MAULR"
doc_ref2.type.coding[0].system = "https://nicip.nhs.uk"
doc_ref2.type.coding[0].display = "MRA Upper Limb Rt"
doc_ref2.id = "Y05868-736253002-Valid2"
doc_ref2.category[0].coding[0].code = "103693007"
doc_ref2.category[0].coding[0].display = "Diagnostic procedure"
doc_pointer2 = DocumentPointer.from_document_reference(doc_ref2)

repository.create(doc_pointer2)

event = create_test_api_gateway_event(
headers=create_headers(),
query_string_parameters={
"subject:identifier": "https://fhir.nhs.uk/Id/nhs-number|6700028191",
"type": "https://nicip.nhs.uk|MAULR",
"category": "http://snomed.info/sct|721981007",
},
)

result = handler(event, create_mock_context())
body = result.pop("body")

assert result == {
"statusCode": "200",
"headers": default_response_headers(),
"isBase64Encoded": False,
}

parsed_body = json.loads(body)
assert parsed_body == {
"resourceType": "Bundle",
"type": "searchset",
"link": [
{
"relation": "self",
"url": "https://pytest.api.service.nhs.uk/record-locator/consumer/FHIR/R4/DocumentReference?subject:identifier=https://fhir.nhs.uk/Id/nhs-number|6700028191&type=https://nicip.nhs.uk|MAULR&category=http://snomed.info/sct|721981007",
}
],
"total": 1,
"entry": [{"resource": doc_ref.model_dump(exclude_none=True)}],
}


@mock_aws
@mock_repository
def test_search_document_reference_no_results(repository: DocumentPointerRepository):
Expand Down
10 changes: 8 additions & 2 deletions layer/nrlf/core/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,14 @@ def coding_value(self):
PointerTypes.SUMMARY_RECORD.value: Categories.CLINICAL_NOTE.value,
#
# Imaging
PointerTypes.MRA_UPPER_LIMB_ARTERY.value: Categories.DIAGNOSTIC_STUDIES_REPORT.value,
PointerTypes.MRI_AXILLA_BOTH.value: Categories.DIAGNOSTIC_PROCEDURE.value,
PointerTypes.MRA_UPPER_LIMB_ARTERY.value: {
Categories.DIAGNOSTIC_STUDIES_REPORT.value,
Categories.DIAGNOSTIC_PROCEDURE.value,
},
PointerTypes.MRI_AXILLA_BOTH.value: {
Categories.DIAGNOSTIC_PROCEDURE.value,
Categories.DIAGNOSTIC_STUDIES_REPORT.value,
},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Open question: is it better to use sets for everything, even those types that map to only one category? Advantage: consistency, simplifies logic as we can use set union without having to check for type. Disadvantage: This branch diverges further from the main branch, and changes can't be ported over as easily.
Can be picked up later if this work proceeds.

}

PRACTICE_SETTING_VALUE_SET_URL = (
Expand Down
106 changes: 66 additions & 40 deletions layer/nrlf/core/dynamodb/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,16 @@
RepositoryModel = TypeVar("RepositoryModel", bound=DynamoDBModel)


def _get_sk_ids_for_type(pointer_type: str) -> tuple:
def _get_sk_ids_for_type(pointer_type: str) -> tuple[str, str]:
if pointer_type not in TYPE_CATEGORIES:
raise ValueError(f"Cannot find category for pointer type: {pointer_type}")

category = TYPE_CATEGORIES[pointer_type]
categories = TYPE_CATEGORIES[pointer_type]
if isinstance(categories, str):
category = categories
else:
category = next(iter(categories))

category_system, category_code = category.split("|")
if category_system not in SYSTEM_SHORT_IDS:
raise ValueError(f"Unknown system for category: {category_system}")
Expand All @@ -33,6 +38,47 @@ def _get_sk_ids_for_type(pointer_type: str) -> tuple:
return category_id, type_id


def _get_categories_for_pointer_types(pointer_types: list[str]) -> set[str]:
"""Return all unique categories for the given pointer types."""
category_set = set()
for pointer_type in pointer_types:
if pointer_type in TYPE_CATEGORIES:
cats = TYPE_CATEGORIES[pointer_type]
if isinstance(cats, str):
category_set.add(cats)
else:
category_set.update(cats)
return category_set


def _build_filter_expressions(
pointer_types, categories, expression_names, expression_values
):
"""Build DynamoDB filter expressions for pointer_types and categories."""
filter_expressions = []
if pointer_types:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For clarity, this will ALWAYS be true, as if the searcher does not specify a a pointer type filter in their search, pointer_types is populated by the list of pointers for which they have permissions. If they have no pointer types permitted, they never get to this point, as it is caught by the decorator and 403 is returned.

expression_names["#pointer_type"] = "type"
types_filters = [
f"#pointer_type = :type_{i}" for i in range(len(pointer_types))
]
types_filter_values = {
f":type_{i}": pointer_types[i] for i in range(len(pointer_types))
}
filter_expressions.append(f"({' OR '.join(types_filters)})")
expression_values.update(types_filter_values)
if categories:
expression_names["#category"] = "category"
category_filters = [
f"#category = :category_{i}" for i in range(len(categories))
]
category_filter_values = {
f":category_{i}": categories[i] for i in range(len(categories))
}
filter_expressions.append(f"({' OR '.join(category_filters)})")
expression_values.update(category_filter_values)
return filter_expressions


class Repository(ABC, Generic[RepositoryModel]):
ITEM_TYPE: Type[RepositoryModel]

Expand Down Expand Up @@ -220,56 +266,36 @@ def search(
pointer_types: Optional[List[str]] = [],
categories: Optional[List[str]] = [],
) -> Iterator[DocumentPointer]:
""""""
logger.log(
LogReference.REPOSITORY020,
nhs_number=nhs_number,
custodian=custodian,
pointer_types=pointer_types,
categories=categories,
)

key_conditions = ["patient_key = :patient_key"]
filter_expressions = []
expression_names = {}
expression_values = {":patient_key": f"P#{nhs_number}"}

if len(pointer_types) == 1:
# Optimisation for single pointer type
category_id, type_id = _get_sk_ids_for_type(pointer_types[0])
patient_sort = f"C#{category_id}#T#{type_id}"
key_conditions.append("begins_with(patient_sort, :patient_sort)")
expression_values[":patient_sort"] = patient_sort
else:
# Handle single/multiple categories and pointer types with filter expressions
if len(categories) == 1:
split_category = categories[0].split("|")
category_id = (
SYSTEM_SHORT_IDS[split_category[0]] + "-" + split_category[1]
)
patient_sort = f"C#{category_id}"
key_conditions.append("begins_with(patient_sort, :patient_sort)")
expression_values[":patient_sort"] = patient_sort

if len(categories) > 1:
expression_names["#category"] = "category"
category_filters = [
f"#category = :category_{i}" for i in range(len(categories))
]
category_filter_values = {
f":category_{i}": categories[i] for i in range(len(categories))
}
filter_expressions.append(f"({' OR '.join(category_filters)})")
expression_values.update(category_filter_values)

expression_names["#pointer_type"] = "type"
types_filters = [
f"#pointer_type = :type_{i}" for i in range(len(pointer_types))
]
types_filter_values = {
f":type_{i}": pointer_types[i] for i in range(len(pointer_types))
}
filter_expressions.append(f"({' OR '.join(types_filters)})")
expression_values.update(types_filter_values)
# Determine which filters to apply
if pointer_types and categories:
# Use both pointer_types and categories as filters
filter_expressions = _build_filter_expressions(
pointer_types, categories, expression_names, expression_values
)
elif pointer_types and not categories:
# Get all categories for these pointer_types
all_categories = list(_get_categories_for_pointer_types(pointer_types))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Open question: Is there value to looking up all possible categories and adding filters for them, if type(s) already specified?

filter_expressions = _build_filter_expressions(
pointer_types, all_categories, expression_names, expression_values
)
elif categories and not pointer_types:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the above comment, pointer_types will always be populated (whether by a chosen filter or by the searcher's list of permitted pointers) so this will never get reached.

# Only categories provided
filter_expressions = _build_filter_expressions(
[], categories, expression_names, expression_values
)

if custodian:
logger.log(
Expand Down
20 changes: 13 additions & 7 deletions layer/nrlf/core/tests/test_type_categories.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ def test_pointer_types(pointer_type):

@pytest.mark.parametrize("category", Categories)
def test_pointer_category_has_types(category):
assert (
category.value in TYPE_CATEGORIES.values()
assert any(
category.value in cat_set for cat_set in TYPE_CATEGORIES.values()
), f"Pointer category {category.value} is not used by any type"


Expand All @@ -42,11 +42,17 @@ def test_type_category_type_is_known(type):
assert type in PointerTypes.list(), f"Unknown type {type} used in TYPE_CATEGORIES"


@pytest.mark.parametrize("category", TYPE_CATEGORIES.values())
def test_type_category_category_is_known(category):
assert (
category in Categories.list()
), f"Unknown category {category} used in TYPE_CATEGORIES"
@pytest.mark.parametrize("cat_set", TYPE_CATEGORIES.values())
def test_type_category_category_is_known(cat_set):
if isinstance(cat_set, (set, list, tuple)):
for category in cat_set:
assert (
category in Categories.list()
), f"Unknown category {category} used in TYPE_CATEGORIES"
else:
assert (
cat_set in Categories.list()
), f"Unknown category {cat_set} used in TYPE_CATEGORIES"


@pytest.mark.parametrize("category", Categories)
Expand Down
21 changes: 15 additions & 6 deletions layer/nrlf/core/tests/test_validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,11 @@ def test_validate_category_coding_display_mismatch(
matching_type_str = next(
(
type_str
for type_str in TYPE_CATEGORIES
if TYPE_CATEGORIES[type_str] == category_str
for type_str, cat_val in TYPE_CATEGORIES.items()
if (
(isinstance(cat_val, set) and category_str in cat_val)
or (isinstance(cat_val, str) and cat_val == category_str)
)
),
None,
)
Expand All @@ -417,11 +420,12 @@ def test_validate_category_coding_display_mismatch(
}

result = validator.validate(document_ref_data)

expected_issues = 2 if type_code == "https://nicip.nhs.uk|" else 1
issue_number = 1 if type_code == "https://nicip.nhs.uk|" else 0
assert result.is_valid is False
assert result.resource.id == "Y05868-99999-99999-999999"
assert len(result.issues) == 1
assert result.issues[0].model_dump(exclude_none=True) == {
assert len(result.issues) == expected_issues
assert result.issues[issue_number].model_dump(exclude_none=True) == {
"severity": "error",
"code": "business-rule",
"details": {
Expand Down Expand Up @@ -603,10 +607,15 @@ def test_validate_type_coding_display_mismatch(type_str: str, display: str):
}

# Find the category string that matches the category code to avoid that error
category_str = TYPE_CATEGORIES[type_str]
category_val = TYPE_CATEGORIES[type_str]
if isinstance(category_val, set):
category_str = next(iter(category_val))
else:
category_str = category_val
category_parts = category_str.split("|")
category_system = category_parts[0]
category_code = category_parts[1]

document_ref_data["category"][0] = {
"coding": [
{
Expand Down
Loading