-
Notifications
You must be signed in to change notification settings - Fork 4
NRL-1114 for nicip types allow multiple categories #907
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: feature/imaging
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,11 +15,16 @@ | |
| RepositoryModel = TypeVar("RepositoryModel", bound=DynamoDBModel) | ||
|
|
||
|
|
||
| def _get_sk_ids_for_type(pointer_type: str) -> tuple: | ||
| def _get_sk_ids_for_type(pointer_type: str) -> tuple[str, str]: | ||
| if pointer_type not in TYPE_CATEGORIES: | ||
| raise ValueError(f"Cannot find category for pointer type: {pointer_type}") | ||
|
|
||
| category = TYPE_CATEGORIES[pointer_type] | ||
| categories = TYPE_CATEGORIES[pointer_type] | ||
| if isinstance(categories, str): | ||
| category = categories | ||
| else: | ||
| category = next(iter(categories)) | ||
|
|
||
| category_system, category_code = category.split("|") | ||
| if category_system not in SYSTEM_SHORT_IDS: | ||
| raise ValueError(f"Unknown system for category: {category_system}") | ||
|
|
@@ -33,6 +38,47 @@ def _get_sk_ids_for_type(pointer_type: str) -> tuple: | |
| return category_id, type_id | ||
|
|
||
|
|
||
| def _get_categories_for_pointer_types(pointer_types: list[str]) -> set[str]: | ||
| """Return all unique categories for the given pointer types.""" | ||
| category_set = set() | ||
| for pointer_type in pointer_types: | ||
| if pointer_type in TYPE_CATEGORIES: | ||
| cats = TYPE_CATEGORIES[pointer_type] | ||
| if isinstance(cats, str): | ||
| category_set.add(cats) | ||
| else: | ||
| category_set.update(cats) | ||
| return category_set | ||
|
|
||
|
|
||
| def _build_filter_expressions( | ||
| pointer_types, categories, expression_names, expression_values | ||
| ): | ||
| """Build DynamoDB filter expressions for pointer_types and categories.""" | ||
| filter_expressions = [] | ||
| if pointer_types: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For clarity, this will ALWAYS be true, as if the searcher does not specify a a pointer type filter in their search, pointer_types is populated by the list of pointers for which they have permissions. If they have no pointer types permitted, they never get to this point, as it is caught by the decorator and 403 is returned. |
||
| expression_names["#pointer_type"] = "type" | ||
| types_filters = [ | ||
| f"#pointer_type = :type_{i}" for i in range(len(pointer_types)) | ||
| ] | ||
| types_filter_values = { | ||
| f":type_{i}": pointer_types[i] for i in range(len(pointer_types)) | ||
| } | ||
| filter_expressions.append(f"({' OR '.join(types_filters)})") | ||
| expression_values.update(types_filter_values) | ||
| if categories: | ||
| expression_names["#category"] = "category" | ||
| category_filters = [ | ||
| f"#category = :category_{i}" for i in range(len(categories)) | ||
| ] | ||
| category_filter_values = { | ||
| f":category_{i}": categories[i] for i in range(len(categories)) | ||
| } | ||
| filter_expressions.append(f"({' OR '.join(category_filters)})") | ||
| expression_values.update(category_filter_values) | ||
| return filter_expressions | ||
|
|
||
|
|
||
| class Repository(ABC, Generic[RepositoryModel]): | ||
| ITEM_TYPE: Type[RepositoryModel] | ||
|
|
||
|
|
@@ -220,56 +266,36 @@ def search( | |
| pointer_types: Optional[List[str]] = [], | ||
| categories: Optional[List[str]] = [], | ||
| ) -> Iterator[DocumentPointer]: | ||
| """""" | ||
| logger.log( | ||
| LogReference.REPOSITORY020, | ||
| nhs_number=nhs_number, | ||
| custodian=custodian, | ||
| pointer_types=pointer_types, | ||
| categories=categories, | ||
| ) | ||
|
|
||
| key_conditions = ["patient_key = :patient_key"] | ||
| filter_expressions = [] | ||
| expression_names = {} | ||
| expression_values = {":patient_key": f"P#{nhs_number}"} | ||
|
|
||
| if len(pointer_types) == 1: | ||
| # Optimisation for single pointer type | ||
| category_id, type_id = _get_sk_ids_for_type(pointer_types[0]) | ||
| patient_sort = f"C#{category_id}#T#{type_id}" | ||
| key_conditions.append("begins_with(patient_sort, :patient_sort)") | ||
| expression_values[":patient_sort"] = patient_sort | ||
| else: | ||
| # Handle single/multiple categories and pointer types with filter expressions | ||
| if len(categories) == 1: | ||
| split_category = categories[0].split("|") | ||
| category_id = ( | ||
| SYSTEM_SHORT_IDS[split_category[0]] + "-" + split_category[1] | ||
| ) | ||
| patient_sort = f"C#{category_id}" | ||
| key_conditions.append("begins_with(patient_sort, :patient_sort)") | ||
| expression_values[":patient_sort"] = patient_sort | ||
|
|
||
| if len(categories) > 1: | ||
| expression_names["#category"] = "category" | ||
| category_filters = [ | ||
| f"#category = :category_{i}" for i in range(len(categories)) | ||
| ] | ||
| category_filter_values = { | ||
| f":category_{i}": categories[i] for i in range(len(categories)) | ||
| } | ||
| filter_expressions.append(f"({' OR '.join(category_filters)})") | ||
| expression_values.update(category_filter_values) | ||
|
|
||
| expression_names["#pointer_type"] = "type" | ||
| types_filters = [ | ||
| f"#pointer_type = :type_{i}" for i in range(len(pointer_types)) | ||
| ] | ||
| types_filter_values = { | ||
| f":type_{i}": pointer_types[i] for i in range(len(pointer_types)) | ||
| } | ||
| filter_expressions.append(f"({' OR '.join(types_filters)})") | ||
| expression_values.update(types_filter_values) | ||
| # Determine which filters to apply | ||
| if pointer_types and categories: | ||
| # Use both pointer_types and categories as filters | ||
| filter_expressions = _build_filter_expressions( | ||
| pointer_types, categories, expression_names, expression_values | ||
| ) | ||
| elif pointer_types and not categories: | ||
| # Get all categories for these pointer_types | ||
| all_categories = list(_get_categories_for_pointer_types(pointer_types)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Open question: Is there value to looking up all possible categories and adding filters for them, if type(s) already specified? |
||
| filter_expressions = _build_filter_expressions( | ||
| pointer_types, all_categories, expression_names, expression_values | ||
| ) | ||
| elif categories and not pointer_types: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to the above comment, pointer_types will always be populated (whether by a chosen filter or by the searcher's list of permitted pointers) so this will never get reached. |
||
| # Only categories provided | ||
| filter_expressions = _build_filter_expressions( | ||
| [], categories, expression_names, expression_values | ||
| ) | ||
|
|
||
| if custodian: | ||
| logger.log( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Open question: is it better to use sets for everything, even those types that map to only one category? Advantage: consistency, simplifies logic as we can use set union without having to check for type. Disadvantage: This branch diverges further from the main branch, and changes can't be ported over as easily.
Can be picked up later if this work proceeds.