diff --git a/scripts/delete_pointers_by_id.py b/scripts/delete_pointers_by_id.py new file mode 100755 index 000000000..a7ffc70df --- /dev/null +++ b/scripts/delete_pointers_by_id.py @@ -0,0 +1,404 @@ +#!/usr/bin/env python +import json +import os +import tempfile +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, List + +import boto3 +import fire + +dynamodb = boto3.client("dynamodb") + + +def _load_pointers_from_file(pointers_file: str) -> list[str]: + """ + Read pointers from a file. Supports: + - JSON array of objects with an "id" field + - line-delimited plain text (one id per line) + + Returns a list of pointer id strings. Prints a warning for skipped malformed JSON entries. + """ + with open(pointers_file, "r") as file: + content = file.read().strip() + + if not content: + return [] + + if content.startswith("[") or content.startswith("{"): + return _parse_json_pointers(content, pointers_file) + + return _parse_plain_text_pointers(content) + + +def _parse_json_pointers(content: str, pointers_file: str) -> list[str]: + + try: + data = json.loads(content) + except json.JSONDecodeError as e: + raise ValueError(f"Failed to parse JSON file {pointers_file}: {e}") from e + + if not isinstance(data, list): + raise ValueError("JSON file must contain an array of objects") + + parsed_ids: list[str] = [] + skipped_count = 0 + + for item in data: + if _is_valid_pointer(item): + parsed_ids.append(item["id"].strip()) + else: + skipped_count += 1 + + if skipped_count: + print( + f"Warning: skipped {skipped_count} malformed entries in JSON file {pointers_file}" + ) + + return parsed_ids + + +def _is_valid_pointer(item: Any) -> bool: + return ( + isinstance(item, dict) + and "id" in item + and isinstance(item["id"], str) + and item["id"].strip() + ) + + +def _parse_plain_text_pointers(content: str) -> list[str]: + return [line.strip() for line in content.splitlines() if line.strip()] + + +@dataclass +class PointerDeletionContext: + pointers_to_delete: list[str] + ods_code: str + matched_pointers: list[str] + mismatched_pointers: list[str] + not_found_pointers: list[str] + pointers_deleted: list[str] + failed_deletes: list[str] + start_time: datetime + end_time: datetime + output_filename: str + + +def _build_and_write_result(ctx: PointerDeletionContext) -> Dict[str, Any]: + + result = { + "pointers_to_delete": len(ctx.pointers_to_delete), + "ods_code": ctx.ods_code, + "ods_code_matched": { + "count": len(ctx.matched_pointers), + "ids": ctx.matched_pointers, + }, + "ods_code_mismatched": { + "count": len(ctx.mismatched_pointers), + "ids": ctx.mismatched_pointers, + }, + "pointer_not_found": { + "count": len(ctx.not_found_pointers), + "ids": ctx.not_found_pointers, + }, + "deleted_pointers": { + "count": len(ctx.pointers_deleted), + "ids": ctx.pointers_deleted, + }, + "failed_deletes": {"count": len(ctx.failed_deletes), "ids": ctx.failed_deletes}, + "deletes-took-secs": timedelta.total_seconds(ctx.end_time - ctx.start_time), + "output_filename": ctx.output_filename, + } + + script_dir = os.path.dirname(os.path.abspath(__file__)) or "." + output_file_path = os.path.join(script_dir, ctx.output_filename) + + try: + _write_result_file(result, output_file_path) + except Exception as exc: + result["_output_error"] = ( + f"Failed to write result file {ctx.output_filename}: {exc}" + ) + _print_summary(result) + return result + + +def _write_result_file(result: Dict[str, Any], output_file: str) -> None: + + out_dir = os.path.dirname(os.path.abspath(output_file)) or "." + with tempfile.NamedTemporaryFile( + "w", delete=False, dir=out_dir, prefix=".tmp_delete_results_", suffix=".json" + ) as tf: + json.dump(result, tf, indent=2) + tf.flush() + os.fsync(tf.fileno()) + os.replace(tf.name, output_file) + + +def _print_summary(result: Dict[str, Any]) -> None: + + def count_from(field): + val = result.get(field) + if isinstance(val, dict): + return val.get("count", 0) + if isinstance(val, list): + return len(val) + return 0 + + print("*******************************************************") + print("Summary:") + print(f" pointers_to_delete: {result.get('pointers_to_delete')}") + print(f" ods_code_matched: {count_from('ods_code_matched')}") + print(f" ods_code_mismatched:{count_from('ods_code_mismatched')}") + print(f" pointer_not_found: {count_from('pointer_not_found')}") + print(f" deleted_pointers: {count_from('deleted_pointers')}") + print(f" failed_deletes: {count_from('failed_deletes')}") + if "deletes-took-secs" in result: + print(f" deletes-took-secs: {result.get('deletes-took-secs')}") + + if "_output_error" in result: + print(f" output_error: {result['_output_error']}") + elif "output_filename" in result: + print(f" See output file for full results: {result.get('output_filename')}") + print("*******************************************************") + + +def _check_pointers_match_ods_code( + ods_code: str, pointer_ids: List[str] +) -> tuple[List[str], List[str]]: + + matched = [] + mismatched = [] + + for pointer_id in pointer_ids: + if pointer_id.startswith(f"{ods_code}-"): + matched.append(pointer_id) + else: + mismatched.append(pointer_id) + + return matched, mismatched + + +def _batch_get_existing_pointers( + table_name: str, pointer_ids: List[str] +) -> tuple[List[str], List[str]]: + """ + Check which pointers exist using BatchGetItem (max 100 items per request). + Returns (existing_ids, not_found_ids) + """ + existing = [] + not_found = [] + + for batch_idx in range(0, len(pointer_ids), 100): + batch_ids = pointer_ids[batch_idx : batch_idx + 100] + + keys = [ + { + "pk": {"S": f"D#{pointer_id}"}, + "sk": {"S": f"D#{pointer_id}"}, + } + for pointer_id in batch_ids + ] + + response = dynamodb.batch_get_item(RequestItems={table_name: {"Keys": keys}}) + + found_ids = { + item["pk"]["S"][2:] + for item in response.get("Responses", {}).get(table_name, []) + } + + for pointer_id in batch_ids: + if pointer_id in found_ids: + existing.append(pointer_id) + else: + not_found.append(pointer_id) + + return existing, not_found + + +def _batch_delete_pointers( + table_name: str, pointer_ids: List[str] +) -> tuple[List[str], List[str]]: + """ + Delete pointers using BatchWriteItem (max 25 items per request). + """ + pointers_deleted = [] + failed_deletes_set: set[str] = set() + + for _batch_id in range(0, len(pointer_ids), 25): + batch_ptrs = pointer_ids[_batch_id : _batch_id + 25] + + batch = [ + { + "DeleteRequest": { + "Key": { + "pk": {"S": f"D#{pointer_id}"}, + "sk": {"S": f"D#{pointer_id}"}, + } + } + } + for pointer_id in batch_ptrs + ] + + result = dynamodb.batch_write_item(RequestItems={table_name: batch}) + unprocessed = result.get("UnprocessedItems", {}).get(table_name, []) + + # Collect unprocessed IDs + for item in unprocessed: + pk_val = item["DeleteRequest"]["Key"]["pk"]["S"] + failed_deletes_set.add(pk_val[2:]) # Remove "D#" + + # Only count successfully deleted items (batch size minus unprocessed) + successfully_deleted = [p for p in batch_ptrs if p not in failed_deletes_set] + pointers_deleted.extend(successfully_deleted) + + if len(pointers_deleted) % 1000 == 0 and len(pointers_deleted) > 0: + print(".", end="", flush=True) + + return pointers_deleted, sorted(failed_deletes_set) + + +def _delete_pointers_by_id( + table_name: str, + ods_code: str, + pointers_to_delete: List[str] | None = None, + pointers_file: str | None = None, +) -> None: + """ + Delete pointers from DynamoDB table. + + Can accept pointers as: + - list of strings: pointers_to_delete=["id1", "id2"] + - JSON file: pointers_file=/path/to/pointers.json (array of objects with "id" field) + - text file: pointers_file=/path/to/ids.txt (one id per line) + + Parameters: + - table_name: DynamoDB table name + - ods_code: ODS code of the organisation that the pointers belong to + - pointers_to_delete: list of pointer ids to delete + - pointers_file: path to JSON file (array of objects with "id" field) or text file (one id per line) + + Sample usage: + - Delete by list of ids: + python delete_pointers_by_id.py --table_name MyTable --ods_code ABC123 --pointers_to_delete '["ABC123-12345678910", "ABC123-109876543210"]' + - Delete by JSON file: + python delete_pointers_by_id.py --table_name MyTable --ods_code ABC123 --pointers_file /path/to/pointers.json + - Delete by text file: + python delete_pointers_by_id.py --table_name MyTable --ods_code ABC123 --pointers_file /path/to/ids.txt + """ + if pointers_to_delete is None and pointers_file is None: + raise ValueError("Provide either pointers_to_delete or pointers_file") + + if pointers_to_delete is not None and pointers_file is not None: + raise ValueError("Provide either pointers_to_delete or pointers_file, not both") + + if pointers_file: + pointers_to_delete = _load_pointers_from_file(pointers_file) + + start_time = datetime.now(tz=timezone.utc) + timestamp = start_time.strftime("%Y%m%dT%H%M%SZ") + output_filename = f"delete_results_{ods_code}_{timestamp}.json" + + if not pointers_to_delete: + end_time = datetime.now(tz=timezone.utc) + _build_and_write_result( + PointerDeletionContext( + pointers_to_delete=pointers_to_delete, + ods_code=ods_code, + matched_pointers=[], + mismatched_pointers=[], + not_found_pointers=[], + pointers_deleted=[], + failed_deletes=[], + start_time=start_time, + end_time=end_time, + output_filename=output_filename, + ) + ) + return + + print( + f"Validating {len(pointers_to_delete)} pointers against ODS code {ods_code}..." + ) + matched_pointers, mismatched_pointers = _check_pointers_match_ods_code( + ods_code, pointers_to_delete + ) + + print( + f"Validate pointer's ODS code: {len(matched_pointers)} matched, {len(mismatched_pointers)} mismatched" + ) + + if not matched_pointers: + print(f"None of the pointer IDs are a match for ODS code {ods_code}. Exiting.") + end_time = datetime.now(tz=timezone.utc) + _build_and_write_result( + PointerDeletionContext( + pointers_to_delete=pointers_to_delete, + ods_code=ods_code, + matched_pointers=matched_pointers, + mismatched_pointers=mismatched_pointers, + not_found_pointers=[], + pointers_deleted=[], + failed_deletes=[], + start_time=start_time, + end_time=end_time, + output_filename=output_filename, + ) + ) + return + + print(f"Checking existence of {len(matched_pointers)} pointers in {table_name}...") + existing_pointers, not_found_pointers = _batch_get_existing_pointers( + table_name, matched_pointers + ) + + print( + f"Found {len(existing_pointers)} existing pointers to delete, {len(not_found_pointers)} not found." + ) + + if not existing_pointers: + print("No pointers found to delete. Exiting.") + end_time = datetime.now(tz=timezone.utc) + _build_and_write_result( + PointerDeletionContext( + pointers_to_delete=pointers_to_delete, + ods_code=ods_code, + matched_pointers=matched_pointers, + mismatched_pointers=mismatched_pointers, + not_found_pointers=not_found_pointers, + pointers_deleted=[], + failed_deletes=[], + start_time=start_time, + end_time=end_time, + output_filename=output_filename, + ) + ) + return + + # Proceed with deletion using BatchWriteItem + pointers_deleted, failed_deletes = _batch_delete_pointers( + table_name, existing_pointers + ) + + end_time = datetime.now(tz=timezone.utc) + _build_and_write_result( + PointerDeletionContext( + pointers_to_delete=pointers_to_delete, + ods_code=ods_code, + matched_pointers=matched_pointers, + mismatched_pointers=mismatched_pointers, + not_found_pointers=not_found_pointers, + pointers_deleted=pointers_deleted, + failed_deletes=failed_deletes, + start_time=start_time, + end_time=end_time, + output_filename=output_filename, + ) + ) + print(" Done") + + +if __name__ == "__main__": + fire.Fire(_delete_pointers_by_id) diff --git a/scripts/tests/test_delete_pointers_by_id.py b/scripts/tests/test_delete_pointers_by_id.py new file mode 100644 index 000000000..23795f432 --- /dev/null +++ b/scripts/tests/test_delete_pointers_by_id.py @@ -0,0 +1,361 @@ +import json +from datetime import datetime, timezone +from unittest.mock import MagicMock, call, patch + +import pytest +from delete_pointers_by_id import ( + PointerDeletionContext, + _batch_delete_pointers, + _batch_get_existing_pointers, + _build_and_write_result, + _check_pointers_match_ods_code, + _delete_pointers_by_id, + _is_valid_pointer, + _load_pointers_from_file, + _parse_json_pointers, + _parse_plain_text_pointers, +) + + +class TestLoadPointersFromFile: + @patch("builtins.open", create=True) + def test_load_json_array(self, mock_open): + mock_open.return_value.__enter__.return_value.read.return_value = ( + '[{"id": "G3H9E-id1"}, {"id": "G3H9E-id2"}]' + ) + result = _load_pointers_from_file("./pointers.json") + assert result == ["G3H9E-id1", "G3H9E-id2"] + + @patch("builtins.open", create=True) + def test_load_plain_text_file(self, mock_open): + mock_open.return_value.__enter__.return_value.read.return_value = ( + "G3H9E-id1\nG3H9E-id2\nG3H9E-id3" + ) + result = _load_pointers_from_file("./ids.txt") + assert result == ["G3H9E-id1", "G3H9E-id2", "G3H9E-id3"] + + @patch("builtins.open", create=True) + def test_load_empty_file(self, mock_open): + mock_open.return_value.__enter__.return_value.read.return_value = "" + result = _load_pointers_from_file("./empty.txt") + assert result == [] + + +class TestParseJsonPointers: + def test_invalid_json(self): + content = '[{"id": "G3H9E-id1"' + with pytest.raises(ValueError, match="Failed to parse JSON file"): + _parse_json_pointers(content, "./invalid.json") + + def test_json_not_array(self): + content = '{"id": "G3H9E-id1"}' + with pytest.raises( + ValueError, match="JSON file must contain an array of objects" + ): + _parse_json_pointers(content, "./not_array.json") + + def test_valid_json_array(self): + content = '[{"id": "G3H9E-id1"}, {"id": "G3H9E-id2"}]' + result = _parse_json_pointers(content, "./pointers.json") + assert result == ["G3H9E-id1", "G3H9E-id2"] + + def test_json_with_malformed_entries(self): + content = '[{"id": "G3H9E-id1"}, {"patient_number": "123"}, {"id": ""}, {"id": "G3H9E-id2"}]' + result = _parse_json_pointers(content, "./pointers.json") + assert result == ["G3H9E-id1", "G3H9E-id2"] + + +class TestIsValidPointer: + def test_valid_pointer(self): + item = {"id": "G3H9E-id1"} + assert _is_valid_pointer(item) + + def test_missing_id_field(self): + item = {"patient_number": "123"} + assert not _is_valid_pointer(item) + + def test_empty_id_field(self): + item = {"id": ""} + assert not _is_valid_pointer(item) + + def test_non_string_id_field(self): + item = {"id": 123} + assert not _is_valid_pointer(item) + + def test_non_dict_item(self): + item = ["id", "G3H9E-id1"] + assert not _is_valid_pointer(item) + + +class TestParsePlainTextPointers: + def test_valid_plain_text(self): + content = "G3H9E-id1\nG3H9E-id2\nG3H9E-id3" + result = _parse_plain_text_pointers(content) + assert result == ["G3H9E-id1", "G3H9E-id2", "G3H9E-id3"] + + def test_plain_text_with_empty_lines(self): + content = "G3H9E-id1\n\nG3H9E-id2\n \nG3H9E-id3" + result = _parse_plain_text_pointers(content) + assert result == ["G3H9E-id1", "G3H9E-id2", "G3H9E-id3"] + + def test_empty_plain_text(self): + content = "" + result = _parse_plain_text_pointers(content) + assert result == [] + + +class TestBuildAndWriteResult: + @patch("delete_pointers_by_id._write_result_file") + @patch("delete_pointers_by_id._print_summary") + def test_build_and_write_result_success(self, mock_print, mock_write): + start_time = datetime.now(tz=timezone.utc) + end_time = datetime.now(tz=timezone.utc) + ctx = PointerDeletionContext( + pointers_to_delete=["G3H9E-1", "G3H9E-2"], + ods_code="G3H9E", + matched_pointers=["G3H9E-1", "G3H9E-2"], + mismatched_pointers=[], + not_found_pointers=[], + pointers_deleted=["G3H9E-1", "G3H9E-2"], + failed_deletes=[], + start_time=start_time, + end_time=end_time, + output_filename="delete_results_G3H9E_20231125T120000Z.json", + ) + result = _build_and_write_result(ctx) + + assert result["pointers_to_delete"] == 2 + assert result["deleted_pointers"]["count"] == 2 + assert result["ods_code"] == "G3H9E" + assert result["output_filename"] == "delete_results_G3H9E_20231125T120000Z.json" + assert "_output_error" not in result + mock_write.assert_called_once() + mock_print.assert_called_once() + + @patch("delete_pointers_by_id._write_result_file") + @patch("delete_pointers_by_id._print_summary") + def test_build_and_write_result_with_error(self, mock_print, mock_write): + mock_write.side_effect = Exception("Write failed") + start_time = datetime.now(tz=timezone.utc) + end_time = datetime.now(tz=timezone.utc) + ctx = PointerDeletionContext( + pointers_to_delete=["G3H9E-1"], + ods_code="G3H9E", + matched_pointers=["G3H9E-1"], + mismatched_pointers=[], + not_found_pointers=[], + pointers_deleted=["G3H9E-1"], + failed_deletes=[], + start_time=start_time, + end_time=end_time, + output_filename="delete_results_G3H9E_20231125T120000Z.json", + ) + result = _build_and_write_result(ctx) + + assert "_output_error" in result + assert "Write failed" in result["_output_error"] + mock_write.assert_called_once() + mock_print.assert_called_once() + + +class TestCheckPointersMatchOdsCode: + def test_all_match(self): + ods = "G3H9E" + ids = ["G3H9E-a", "G3H9E-b"] + matched, mismatched = _check_pointers_match_ods_code(ods, ids) + assert matched == ids and mismatched == [] + + def test_none_match(self): + ods = "G3H9E" + ids = ["X-a", "Y-b"] + matched, mismatched = _check_pointers_match_ods_code(ods, ids) + assert matched == [] and mismatched == ids + + def test_mixed(self): + ods = "G3H9E" + ids = ["G3H9E-a", "X-b", "G3H9E-c"] + matched, mismatched = _check_pointers_match_ods_code(ods, ids) + assert matched == ["G3H9E-a", "G3H9E-c"] + assert mismatched == ["X-b"] + + +class TestBatchGetExistingPointers: + @patch("delete_pointers_by_id.dynamodb") + def test_all_exist(self, mock_dynamodb): + table = "t" + ids = ["G3H9E-1", "G3H9E-2"] + mock_dynamodb.batch_get_item.return_value = { + "Responses": { + table: [{"pk": {"S": "D#G3H9E-1"}}, {"pk": {"S": "D#G3H9E-2"}}] + } + } + existing, not_found = _batch_get_existing_pointers(table, ids) + assert existing == ids and not_found == [] + + @patch("delete_pointers_by_id.dynamodb") + def test_none_exist(self, mock_dynamodb): + table = "t" + ids = ["G3H9E-1", "G3H9E-2"] + mock_dynamodb.batch_get_item.return_value = {"Responses": {table: []}} + existing, not_found = _batch_get_existing_pointers(table, ids) + assert existing == [] and not_found == ids + + +class TestBatchDeletePointers: + @patch("delete_pointers_by_id.dynamodb") + def test_all_deleted(self, mock_dynamodb): + table = "t" + ids = ["G3H9E-1", "G3H9E-2"] + mock_dynamodb.batch_write_item.return_value = {"UnprocessedItems": {}} + deleted, failed = _batch_delete_pointers(table, ids) + assert deleted == ids and failed == [] + + @patch("delete_pointers_by_id.dynamodb") + def test_some_unprocessed(self, mock_dynamodb): + table = "t" + ids = ["G3H9E-1", "G3H9E-2", "G3H9E-3"] + mock_dynamodb.batch_write_item.return_value = { + "UnprocessedItems": { + table: [ + { + "DeleteRequest": { + "Key": {"pk": {"S": "D#G3H9E-3"}, "sk": {"S": "D#G3H9E-3"}} + } + } + ] + } + } + deleted, failed = _batch_delete_pointers(table, ids) + assert deleted == ["G3H9E-1", "G3H9E-2"] + assert failed == ["G3H9E-3"] + + +class TestDeletePointersById: + def test_missing_params(self): + with pytest.raises( + ValueError, match="Provide either pointers_to_delete or pointers_file" + ): + _delete_pointers_by_id("t", "G3H9E") + + def test_both_params_provided(self): + with pytest.raises( + ValueError, + match="Provide either pointers_to_delete or pointers_file, not both", + ): + _delete_pointers_by_id( + "t", "G3H9E", pointers_to_delete=["a"], pointers_file="./f" + ) + + @patch("delete_pointers_by_id._build_and_write_result") + @patch("delete_pointers_by_id._check_pointers_match_ods_code") + @patch("delete_pointers_by_id._batch_get_existing_pointers") + @patch("delete_pointers_by_id._batch_delete_pointers") + def test_empty_pointers_list(self, mock_delete, mock_get, mock_check, mock_build): + _delete_pointers_by_id("t", "G3H9E", pointers_to_delete=[]) + + mock_build.assert_called_once() + + mock_check.assert_not_called() + mock_get.assert_not_called() + mock_delete.assert_not_called() + + call_args = mock_build.call_args[0][0] + assert call_args.pointers_to_delete == [] + assert call_args.matched_pointers == [] + + @patch("delete_pointers_by_id._build_and_write_result") + @patch("delete_pointers_by_id._check_pointers_match_ods_code") + @patch("delete_pointers_by_id._batch_get_existing_pointers") + @patch("delete_pointers_by_id._batch_delete_pointers") + def test_no_matched_ods_codes(self, mock_delete, mock_get, mock_check, mock_build): + mock_check.return_value = ([], ["RAT-1", "RAT-2"]) + + _delete_pointers_by_id("t", "G3H9E", pointers_to_delete=["RAT-1", "RAT-2"]) + + mock_build.assert_called_once() + mock_check.assert_called_once() + mock_get.assert_not_called() + mock_delete.assert_not_called() + + call_args = mock_build.call_args[0][0] + assert call_args.matched_pointers == [] + assert call_args.mismatched_pointers == ["RAT-1", "RAT-2"] + + @patch("delete_pointers_by_id._build_and_write_result") + @patch("delete_pointers_by_id._check_pointers_match_ods_code") + @patch("delete_pointers_by_id._batch_get_existing_pointers") + @patch("delete_pointers_by_id._batch_delete_pointers") + def test_successful_flow(self, mock_delete, mock_get, mock_check, mock_build): + ids = ["G3H9E-1", "G3H9E-2"] + mock_check.return_value = (ids, []) + mock_get.return_value = (ids, []) + mock_delete.return_value = (ids, []) + + _delete_pointers_by_id("t", "G3H9E", pointers_to_delete=ids) + + mock_build.assert_called_once() + mock_check.assert_called_once() + mock_get.assert_called_once() + mock_delete.assert_called_once() + + call_args = mock_build.call_args[0][0] + assert call_args.pointers_deleted == ids + assert call_args.failed_deletes == [] + + @patch("delete_pointers_by_id._build_and_write_result") + @patch("delete_pointers_by_id._check_pointers_match_ods_code") + @patch("delete_pointers_by_id._batch_get_existing_pointers") + @patch("delete_pointers_by_id._batch_delete_pointers") + def test_partial_with_failures(self, mock_delete, mock_get, mock_check, mock_build): + matched = ["G3H9E-1", "G3H9E-2", "G3H9E-3"] + mock_check.return_value = (matched, ["RAT-1"]) + mock_get.return_value = (matched, []) + mock_delete.return_value = (["G3H9E-1", "G3H9E-2"], ["G3H9E-3"]) + + _delete_pointers_by_id("t", "G3H9E", pointers_to_delete=matched + ["RAT-1"]) + + mock_build.assert_called_once() + + call_args = mock_build.call_args[0][0] + assert call_args.mismatched_pointers == ["RAT-1"] + assert call_args.pointers_deleted == ["G3H9E-1", "G3H9E-2"] + assert call_args.failed_deletes == ["G3H9E-3"] + + @patch("delete_pointers_by_id._build_and_write_result") + @patch("delete_pointers_by_id._check_pointers_match_ods_code") + @patch("delete_pointers_by_id._batch_get_existing_pointers") + @patch("delete_pointers_by_id._batch_delete_pointers") + def test_some_pointers_not_found( + self, mock_delete, mock_get, mock_check, mock_build + ): + matched = ["G3H9E-1", "G3H9E-2", "G3H9E-3"] + mock_check.return_value = (matched, []) + mock_get.return_value = (["G3H9E-1", "G3H9E-2"], ["G3H9E-3"]) + mock_delete.return_value = (["G3H9E-1", "G3H9E-2"], []) + + _delete_pointers_by_id("t", "G3H9E", pointers_to_delete=matched) + + mock_build.assert_called_once() + + call_args = mock_build.call_args[0][0] + assert call_args.not_found_pointers == ["G3H9E-3"] + assert call_args.pointers_deleted == ["G3H9E-1", "G3H9E-2"] + + @patch("delete_pointers_by_id._build_and_write_result") + @patch("delete_pointers_by_id._check_pointers_match_ods_code") + @patch("delete_pointers_by_id._batch_get_existing_pointers") + @patch("delete_pointers_by_id._batch_delete_pointers") + def test_no_existing_pointers(self, mock_delete, mock_get, mock_check, mock_build): + matched = ["G3H9E-1", "G3H9E-2"] + mock_check.return_value = (matched, []) + mock_get.return_value = ([], matched) + + _delete_pointers_by_id("t", "G3H9E", pointers_to_delete=matched) + + mock_build.assert_called_once() + mock_delete.assert_not_called() + + # Verify the context + call_args = mock_build.call_args[0][0] + assert call_args.not_found_pointers == matched + assert call_args.pointers_deleted == []