From aa952b3b4a64ef1df3e153514c1dee8afadcb053 Mon Sep 17 00:00:00 2001 From: Rafid Aslam Date: Wed, 19 Nov 2025 12:44:54 +0700 Subject: [PATCH 01/10] wip --- src/tirith/core/core.py | 5 ++++- src/tirith/providers/kubernetes/handler.py | 2 +- tests/providers/kubernetes/test_attribute.py | 13 +++++++++++++ 3 files changed, 18 insertions(+), 2 deletions(-) create mode 100644 tests/providers/kubernetes/test_attribute.py diff --git a/src/tirith/core/core.py b/src/tirith/core/core.py index f24917f5..15f76722 100644 --- a/src/tirith/core/core.py +++ b/src/tirith/core/core.py @@ -237,7 +237,10 @@ def start_policy_evaluation( if input_path.endswith(".yaml") or input_path.endswith(".yml"): # safe_load_all returns a generator, we need to convert it into a # dictionary because start_policy_evaluation_from_dict expects a dictionary - input_data = dict(yamls=list(yaml.safe_load_all(f))) + input_data = list(yaml.safe_load_all(f)) + # TODO: Handle zero length + if len(input_data) <= 1: + input_data = input_data[0] else: input_data = json.load(f) # TODO: validate input_data using the optionally available validate function in provider diff --git a/src/tirith/providers/kubernetes/handler.py b/src/tirith/providers/kubernetes/handler.py index 746b0dce..ff9f9f95 100644 --- a/src/tirith/providers/kubernetes/handler.py +++ b/src/tirith/providers/kubernetes/handler.py @@ -42,7 +42,7 @@ def get_value(provider_args: Dict, input_data: Dict, outputs: list) -> Dict: if attribute_path == "": return create_result_dict(value=ProviderError(severity_value=99), err="attribute_path must be provided") - kubernetes_resources = input_data["yamls"] + kubernetes_resources = input_data is_kind_found = False for resource in kubernetes_resources: diff --git a/tests/providers/kubernetes/test_attribute.py b/tests/providers/kubernetes/test_attribute.py new file mode 100644 index 00000000..d81dcd5b --- /dev/null +++ b/tests/providers/kubernetes/test_attribute.py @@ -0,0 +1,13 @@ +import json +import os + +from tirith.core.core import start_policy_evaluation + + +def test_get_value(): + test_dir = os.path.dirname(os.path.realpath(__file__)) + input_path = os.path.join(test_dir, "input.yml") + policy_path = os.path.join(test_dir, "policy.json") + + result = start_policy_evaluation(policy_path=policy_path, input_path=input_path) + assert result["final_result"] == False From 610772ee70dcab67d0fceec3959e0896b16d4440 Mon Sep 17 00:00:00 2001 From: Rafid Aslam Date: Wed, 19 Nov 2025 16:03:38 +0700 Subject: [PATCH 02/10] fix(json/get_value): enhance get_path_value_from_dict with wildcard support This is to fix get_value not being able to do `*.somevalue` and some other cases. --- src/tirith/providers/common.py | 136 +++++++++++++++- src/tirith/providers/json/handler.py | 28 +--- src/tirith/providers/kubernetes/handler.py | 30 +--- tests/providers/test_common.py | 174 +++++++++++++++++++++ 4 files changed, 309 insertions(+), 59 deletions(-) create mode 100644 tests/providers/test_common.py diff --git a/src/tirith/providers/common.py b/src/tirith/providers/common.py index 1f018954..c83a81e6 100644 --- a/src/tirith/providers/common.py +++ b/src/tirith/providers/common.py @@ -1,3 +1,5 @@ +import pydash + from typing import Dict @@ -5,9 +7,137 @@ def create_result_dict(value=None, meta=None, err=None) -> Dict: return dict(value=value, meta=meta, err=err) -def get_path_value_from_dict(key_path: str, input_dict: dict, get_path_value_from_dict_func): - splitted_attribute = key_path.split(".*.") - return get_path_value_from_dict_func(splitted_attribute, input_dict) +class PydashPathNotFound: + pass + + +def _get_path_value_from_dict_internal(splitted_paths, input_data, place_none_if_not_found=False): + + if not splitted_paths: + return [input_data] if input_data is not PydashPathNotFound else ([None] if place_none_if_not_found else []) + + final_data = [] + expression = splitted_paths[0] + remaining_paths = splitted_paths[1:] + + # Handle wildcard at the beginning (e.g., "*.something") + if expression == "": + if isinstance(input_data, list): + for item in input_data: + if remaining_paths: + results = _get_path_value_from_dict_internal(remaining_paths, item, place_none_if_not_found) + final_data.extend(results) + else: + final_data.append(item) + elif isinstance(input_data, dict): + for value in input_data.values(): + if remaining_paths: + results = _get_path_value_from_dict_internal(remaining_paths, value, place_none_if_not_found) + final_data.extend(results) + else: + final_data.append(value) + else: + # For primitive values with empty expression (wildcard match) + # Just return the value if no more paths to traverse + if not remaining_paths: + final_data.append(input_data) + return final_data + + # Get the value at the current path + intermediate_val = pydash.get(input_data, expression, default=PydashPathNotFound) + + if intermediate_val is PydashPathNotFound: + return [None] if place_none_if_not_found else [] + + # If there are more paths to traverse + if remaining_paths: + if isinstance(intermediate_val, list) and remaining_paths[0] == "": + # For lists with a wildcard marker, iterate over list items + # Skip the wildcard marker since iteration is implicit for lists + paths_to_apply = remaining_paths[1:] + for val in intermediate_val: + results = _get_path_value_from_dict_internal(paths_to_apply, val, place_none_if_not_found) + final_data.extend(results) + elif isinstance(intermediate_val, dict) and remaining_paths[0] == "": + # If it's a dict and next path is a wildcard, iterate over dict values + # Skip the wildcard marker and apply remaining paths to each value + for value in intermediate_val.values(): + results = _get_path_value_from_dict_internal(remaining_paths[1:], value, place_none_if_not_found) + final_data.extend(results) + else: + # For non-wildcard paths, continue traversal without iteration + results = _get_path_value_from_dict_internal(remaining_paths, intermediate_val, place_none_if_not_found) + final_data.extend(results) + else: + # This is the final path segment + final_data.append(intermediate_val) + + return final_data + + +def get_path_value_from_dict(key_path: str, input_dict: dict, place_none_if_not_found: bool = False): + """ + Retrieve values from a nested dictionary using a path expression with wildcard support. + + :param key_path: A dot-separated path to traverse the dictionary. + Use ``*.`` for wildcards to match all items at that level. + :type key_path: str + :param input_dict: The input dictionary to search through. + :type input_dict: dict + :param place_none_if_not_found: If True, returns [None] when a path is not found. + If False, returns an empty list []. Defaults to False. + :type place_none_if_not_found: bool + :return: A list of values found at the specified path. Returns empty list or [None] if path not found, + depending on place_none_if_not_found parameter. + :rtype: list + + **Examples:** + + Basic path traversal:: + + >>> data = {"user": {"name": "Alice", "age": 30}} + >>> get_path_value_from_dict("user.name", data) + ["Alice"] + + Wildcard with list items:: + + >>> data = {"users": [{"name": "Alice"}, {"name": "Bob"}]} + >>> get_path_value_from_dict("users.*.name", data) + ["Alice", "Bob"] + + Wildcard with dictionary values:: + + >>> data = {"countries": {"US": {"capital": "Washington"}, "UK": {"capital": "London"}}} + >>> get_path_value_from_dict("countries.*.capital", data) + ["Washington", "London"] + + Leading wildcard:: + + >>> data = [{"name": "Alice"}, {"name": "Bob"}] + >>> get_path_value_from_dict("*.name", data) + ["Alice", "Bob"] + + Path not found behavior:: + + >>> data = {"user": {"name": "Alice"}} + >>> get_path_value_from_dict("missing.path", data) + [] + >>> get_path_value_from_dict("missing.path", data, place_none_if_not_found=True) + [None] + """ + # Handle empty path - return the input data as is + if not key_path: + return [input_dict] + + # Split the path by dots and replace '*' with empty string to mark wildcards + # Empty strings act as markers to iterate over collections (lists or dict values) + # Example: "users.*.name" -> ["users", "", "name"] + # "*.name" -> ["", "name"] + # "numbers.*" -> ["numbers", ""] + splitted_attribute = key_path.split(".") + splitted_attribute = ["" if part == "*" else part for part in splitted_attribute] + + return _get_path_value_from_dict_internal(splitted_attribute, input_dict, place_none_if_not_found) class ProviderError: diff --git a/src/tirith/providers/json/handler.py b/src/tirith/providers/json/handler.py index ce4ced5b..0da8a143 100644 --- a/src/tirith/providers/json/handler.py +++ b/src/tirith/providers/json/handler.py @@ -1,38 +1,12 @@ -import pydash - from typing import Callable, Dict, List from ..common import create_result_dict, ProviderError, get_path_value_from_dict -class PydashPathNotFound: - pass - - -def _get_path_value_from_dict(splitted_paths, input_dict): - final_data = [] - for i, expression in enumerate(splitted_paths): - intermediate_val = pydash.get(input_dict, expression, default=PydashPathNotFound) - if isinstance(intermediate_val, list) and i < len(splitted_paths) - 1: - for val in intermediate_val: - final_attributes = _get_path_value_from_dict(splitted_paths[1:], val) - for final_attribute in final_attributes: - final_data.append(final_attribute) - elif i == len(splitted_paths) - 1 and intermediate_val is not PydashPathNotFound: - final_data.append(intermediate_val) - elif ".*" in expression: - intermediate_exp = expression.split(".*") - intermediate_data = pydash.get(input_dict, intermediate_exp[0], default=PydashPathNotFound) - if intermediate_data is not PydashPathNotFound and isinstance(intermediate_data, list): - for val in intermediate_data: - final_data.append(val) - return final_data - - def get_value(provider_args: Dict, input_data: Dict) -> List[dict]: # Must be validated first whether the provider args are valid for this op type key_path: str = provider_args["key_path"] - values = get_path_value_from_dict(key_path, input_data, _get_path_value_from_dict) + values = get_path_value_from_dict(key_path, input_data) if len(values) == 0: severity_value = 2 diff --git a/src/tirith/providers/kubernetes/handler.py b/src/tirith/providers/kubernetes/handler.py index ff9f9f95..ae2a278a 100644 --- a/src/tirith/providers/kubernetes/handler.py +++ b/src/tirith/providers/kubernetes/handler.py @@ -4,34 +4,6 @@ from ..common import create_result_dict, ProviderError, get_path_value_from_dict -class PydashPathNotFound: - pass - - -def _get_path_value_from_dict(splitted_paths, input_dict): - final_data = [] - expression = splitted_paths[0] - is_the_last_expression = len(splitted_paths) == 1 - - intermediate_val = pydash.get(input_dict, expression, default=PydashPathNotFound) - if isinstance(intermediate_val, list) and not is_the_last_expression: - for val in intermediate_val: - final_attributes = _get_path_value_from_dict(splitted_paths[1:], val) - for final_attribute in final_attributes: - final_data.append(final_attribute) - elif intermediate_val is PydashPathNotFound: - final_data.append(None) - elif is_the_last_expression: - final_data.append(intermediate_val) - elif ".*" in expression: - intermediate_exp = expression.split(".*") - intermediate_data = pydash.get(input_dict, intermediate_exp[0], default=PydashPathNotFound) - if intermediate_data is not PydashPathNotFound and isinstance(intermediate_data, list): - for val in intermediate_data: - final_data.append(val) - return final_data - - def get_value(provider_args: Dict, input_data: Dict, outputs: list) -> Dict: # Must be validated first whether the provider args are valid for this op type target_kind: str = provider_args.get("kubernetes_kind") @@ -49,7 +21,7 @@ def get_value(provider_args: Dict, input_data: Dict, outputs: list) -> Dict: if resource["kind"] != target_kind: continue is_kind_found = True - values = get_path_value_from_dict(attribute_path, resource, _get_path_value_from_dict) + values = get_path_value_from_dict(attribute_path, resource, place_none_if_not_found=True) if ".*." not in attribute_path: # If there's no * in the attribute path, the values always have 1 member values = values[0] diff --git a/tests/providers/test_common.py b/tests/providers/test_common.py new file mode 100644 index 00000000..ed22bcba --- /dev/null +++ b/tests/providers/test_common.py @@ -0,0 +1,174 @@ +import pytest +from tirith.providers.common import get_path_value_from_dict + + +# Test data for simple path access +simple_path_cases = [ + ({"user": {"name": "Alice", "age": 30}}, "user.name", ["Alice"]), + ({"level1": {"level2": {"level3": {"value": "deep"}}}}, "level1.level2.level3.value", ["deep"]), + ({"name": "Alice", "age": 30}, "name", ["Alice"]), + ({"items": ["a", "b", "c"]}, "items.0", ["a"]), + ({"settings": {"enabled": True, "visible": False}}, "settings.enabled", [True]), + ({"items": {"0": "first", "1": "second"}}, "items.0", ["first"]), + ({"user-profile": {"first-name": "Alice"}}, "user-profile.first-name", ["Alice"]), +] + +# Test data for wildcard with lists +wildcard_list_cases = [ + ({"users": [{"name": "Alice"}, {"name": "Bob"}, {"name": "Charlie"}]}, "users.*.name", ["Alice", "Bob", "Charlie"]), + ([{"name": "Alice"}, {"name": "Bob"}], "*.name", ["Alice", "Bob"]), + ({"numbers": [1, 2, 3, 4, 5]}, "numbers.*", [1, 2, 3, 4, 5]), + ({"items": [{"name": "Alice"}, {"other": "value"}, {"name": "Bob"}]}, "items.*.name", ["Alice", "Bob"]), +] + +# Test data for wildcard with dictionaries +wildcard_dict_cases = [ + ( + {"countries": {"US": {"capital": "Washington"}, "UK": {"capital": "London"}, "FR": {"capital": "Paris"}}}, + "countries.*.capital", + {"Washington", "London", "Paris"}, + ), + ({"a": {"value": 1}, "b": {"value": 2}, "c": {"value": 3}}, "*.value", {1, 2, 3}), + ({"mixed": {"a": 1, "b": "string", "c": True, "d": None}}, "mixed.*", {1, "string", True, None}), +] + +# Test data for multiple wildcards +multiple_wildcard_cases = [ + ( + { + "departments": [ + {"name": "Engineering", "employees": [{"name": "Alice"}, {"name": "Bob"}]}, + {"name": "Sales", "employees": [{"name": "Charlie"}, {"name": "Diana"}]}, + ] + }, + "departments.*.employees.*.name", + ["Alice", "Bob", "Charlie", "Diana"], + ), + ({"groups": [[{"id": 1}, {"id": 2}], [{"id": 3}, {"id": 4}]]}, "groups.*.*.id", [1, 2, 3, 4]), + ({"a": {"b": [{"c": 1}, {"c": 2}]}, "x": {"b": [{"c": 3}, {"c": 4}]}}, "*.b.*.c", {1, 2, 3, 4}), +] + +# Test data for path not found (default behavior) +path_not_found_cases = [ + ({"user": {"name": "Alice"}}, "user.age", []), + ({"level1": {"level2": {}}}, "level1.level2.level3.value", []), + ({"users": []}, "users.*.name", []), + ({"users": {}}, "users.*.name", []), +] + +# Test data for path not found with flag +path_not_found_with_flag_cases = [ + ({"user": {"name": "Alice"}}, "user.age", True, [None]), + ({"level1": {"level2": {}}}, "level1.level2.level3.value", True, [None]), +] + +# Test data for special values +special_value_cases = [ + ({"user": {"profile": None}}, "user.profile", [None]), + ({"key": "value"}, "", [{"key": "value"}]), +] + +# Test data for complex nested structures +complex_structure_cases = [ + ( + { + "organizations": [ + { + "name": "Org1", + "departments": [ + {"name": "Dept1", "teams": [{"members": [{"email": "a@test.com"}, {"email": "b@test.com"}]}]} + ], + }, + {"name": "Org2", "departments": [{"name": "Dept2", "teams": [{"members": [{"email": "c@test.com"}]}]}]}, + ] + }, + "organizations.*.departments.*.teams.*.members.*.email", + ["a@test.com", "b@test.com", "c@test.com"], + ), +] + + +@pytest.mark.parametrize("data,path,expected", simple_path_cases) +def test_simple_path_access(data, path, expected): + """Test basic path traversal without wildcards""" + result = get_path_value_from_dict(path, data) + assert result == expected + + +@pytest.mark.parametrize("data,path,expected", wildcard_list_cases) +def test_wildcard_with_list(data, path, expected): + """Test wildcard with list of dictionaries""" + result = get_path_value_from_dict(path, data) + assert result == expected + + +@pytest.mark.parametrize("data,path,expected_set", wildcard_dict_cases) +def test_wildcard_with_dict(data, path, expected_set): + """Test wildcard with dictionary values (order-independent)""" + result = get_path_value_from_dict(path, data) + assert set(result) == expected_set + + +@pytest.mark.parametrize("data,path,expected", multiple_wildcard_cases) +def test_multiple_wildcards(data, path, expected): + """Test multiple wildcards in the path""" + result = get_path_value_from_dict(path, data) + if isinstance(expected, set): + assert set(result) == expected + else: + assert result == expected + + +@pytest.mark.parametrize("data,path,expected", path_not_found_cases) +def test_path_not_found_default(data, path, expected): + """Test that non-existent path returns empty list by default""" + result = get_path_value_from_dict(path, data) + assert result == expected + + +@pytest.mark.parametrize("data,path,flag,expected", path_not_found_with_flag_cases) +def test_path_not_found_with_flag(data, path, flag, expected): + """Test that non-existent path returns [None] when flag is True""" + result = get_path_value_from_dict(path, data, place_none_if_not_found=flag) + assert result == expected + + +@pytest.mark.parametrize("data,path,expected", special_value_cases) +def test_special_values(data, path, expected): + """Test handling of special values like None and empty paths""" + result = get_path_value_from_dict(path, data) + assert result == expected + + +@pytest.mark.parametrize("data,path,expected", complex_structure_cases) +def test_complex_nested_structure(data, path, expected): + """Test complex real-world-like nested structures""" + result = get_path_value_from_dict(path, data) + assert result == expected + + +def test_nested_wildcard_with_missing_intermediate(): + """Test nested wildcard when intermediate path is missing""" + data = {"groups": [{"users": [{"name": "Alice"}]}, {"other": "data"}]} + result = get_path_value_from_dict("groups.*.users.*.name", data) + assert result == ["Alice"] + + +def test_partial_match_with_wildcard(): + """Test when some list items have the key and others don't""" + data = {"items": [{"name": "Alice"}, {"other": "value"}, {"name": "Bob"}]} + result = get_path_value_from_dict("items.*.name", data) + assert result == ["Alice", "Bob"] + + +def test_empty_containers_default(): + """Test empty containers return empty list""" + assert get_path_value_from_dict("users.*.name", {"users": []}) == [] + assert get_path_value_from_dict("users.*.name", {"users": {}}) == [] + + +def test_empty_containers_with_flag(): + """Test empty containers with place_none_if_not_found flag""" + # Empty containers don't trigger the flag since they exist but are empty + assert get_path_value_from_dict("users.*.name", {"users": []}, place_none_if_not_found=True) == [] + assert get_path_value_from_dict("users.*.name", {"users": {}}, place_none_if_not_found=True) == [] From 2ee08a6fc6166a634d20ea08718eb7371c9491a3 Mon Sep 17 00:00:00 2001 From: Rafid Aslam Date: Wed, 19 Nov 2025 16:07:50 +0700 Subject: [PATCH 03/10] update --- src/tirith/core/core.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/tirith/core/core.py b/src/tirith/core/core.py index 15f76722..27c60646 100644 --- a/src/tirith/core/core.py +++ b/src/tirith/core/core.py @@ -235,11 +235,8 @@ def start_policy_evaluation( with open(input_path) as f: if input_path.endswith(".yaml") or input_path.endswith(".yml"): - # safe_load_all returns a generator, we need to convert it into a - # dictionary because start_policy_evaluation_from_dict expects a dictionary input_data = list(yaml.safe_load_all(f)) - # TODO: Handle zero length - if len(input_data) <= 1: + if len(input_data) == 1: input_data = input_data[0] else: input_data = json.load(f) From b900d831533ede0254187d1daba2a41ad1a55a78 Mon Sep 17 00:00:00 2001 From: Rafid Aslam Date: Wed, 19 Nov 2025 16:12:00 +0700 Subject: [PATCH 04/10] update --- tests/providers/json/playbook.yml | 44 +++++++++++++++++++++++ tests/providers/json/policy_playbook.json | 20 +++++++++++ tests/providers/json/test_get_value.py | 14 ++++++-- 3 files changed, 76 insertions(+), 2 deletions(-) create mode 100644 tests/providers/json/playbook.yml create mode 100644 tests/providers/json/policy_playbook.json diff --git a/tests/providers/json/playbook.yml b/tests/providers/json/playbook.yml new file mode 100644 index 00000000..44b7841d --- /dev/null +++ b/tests/providers/json/playbook.yml @@ -0,0 +1,44 @@ +- name: Provision EC2 instance and set up MySQL + hosts: localhost + gather_facts: false + become: True + vars: + region: "your_aws_region" + instance_type: "t2.micro" + ami_id: "your_ami_id" + key_name: "your_key_name" + security_group: "your_security_group_id" + subnet_id: "your_subnet_id" + mysql_root_password: "your_mysql_root_password" + package_list: + - unauthorized-app + tasks: + - name: Create EC2 instance + amazon.aws.ec2_instance: + region: "{{ region }}" + key_name: "{{ key_name }}" + instance_type: "{{ instance_type }}" + image_id: "{{ ami_id }}" + security_group: "{{ security_group }}" + subnet_id: "{{ subnet_id }}" + assign_public_ip: true + wait: yes + count: 1 + instance_tags: + Name: "MySQLInstance" + register: ec2 + + - name: Install Unauthorized App + become: true + ansible.builtin.package: + name: "{{ package_list }}" + state: present + + - name: Set MySQL root password [using unauthorized collection] + community.mysql.mysql_user: + name: root + password: "{{ mysql_root_password }}" + host: "{{ item }}" + login_unix_socket: yes + with_items: ["localhost", "127.0.0.1", "::1"] + diff --git a/tests/providers/json/policy_playbook.json b/tests/providers/json/policy_playbook.json new file mode 100644 index 00000000..1dc04c58 --- /dev/null +++ b/tests/providers/json/policy_playbook.json @@ -0,0 +1,20 @@ +{ + "meta": { + "version": "v1", + "required_provider": "stackguardian/json" + }, + "evaluators": [ + { + "id": "check0", + "provider_args": { + "operation_type": "get_value", + "key_path": "*.vars.region" + }, + "condition": { + "type": "Equals", + "value": "your_aws_region" + } + } + ], + "eval_expression": "check0" +} diff --git a/tests/providers/json/test_get_value.py b/tests/providers/json/test_get_value.py index 7d754a37..a702d957 100644 --- a/tests/providers/json/test_get_value.py +++ b/tests/providers/json/test_get_value.py @@ -1,7 +1,7 @@ import json import os -from tirith.core.core import start_policy_evaluation_from_dict +from tirith.core.core import start_policy_evaluation, start_policy_evaluation_from_dict # TODO: Need to split this into multiple tests @@ -13,4 +13,14 @@ def test_get_value(): policy = json.load(f) result = start_policy_evaluation_from_dict(policy, input_data) - assert result["final_result"] == True + assert result["final_result"] is True + + +def test_get_value_playbook(): + """Test get_value with playbook YAML data using wildcard path""" + test_dir = os.path.dirname(os.path.realpath(__file__)) + input_path = os.path.join(test_dir, "playbook.yml") + policy_path = os.path.join(test_dir, "policy_playbook.json") + + result = start_policy_evaluation(policy_path=policy_path, input_path=input_path) + assert result["final_result"] is True From 412b06d77524ff6674ac41fb399342d1906c2892 Mon Sep 17 00:00:00 2001 From: Rafid Aslam Date: Wed, 19 Nov 2025 16:16:39 +0700 Subject: [PATCH 05/10] update --- tests/providers/test_common.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/providers/test_common.py b/tests/providers/test_common.py index ed22bcba..565e7d21 100644 --- a/tests/providers/test_common.py +++ b/tests/providers/test_common.py @@ -172,3 +172,21 @@ def test_empty_containers_with_flag(): # Empty containers don't trigger the flag since they exist but are empty assert get_path_value_from_dict("users.*.name", {"users": []}, place_none_if_not_found=True) == [] assert get_path_value_from_dict("users.*.name", {"users": {}}, place_none_if_not_found=True) == [] + + +def test_wildcard_with_no_remaining_paths(): + """Test wildcard at the end of path with no remaining paths - covers line 31-32""" + # Test with list at root level - wildcard with no further paths should return all items + data = [1, 2, 3, 4, 5] + result = get_path_value_from_dict("*", data) + assert result == [1, 2, 3, 4, 5] + + # Test with list of dicts - wildcard with no further paths should return all dict items + data2 = [{"name": "Alice"}, {"name": "Bob"}, {"name": "Charlie"}] + result2 = get_path_value_from_dict("*", data2) + assert result2 == [{"name": "Alice"}, {"name": "Bob"}, {"name": "Charlie"}] + + # Test with nested list - get list then apply wildcard with no remaining paths + data3 = {"items": [10, 20, 30]} + result3 = get_path_value_from_dict("items.*", data3) + assert result3 == [10, 20, 30] From 8e1ef6538d076c93cc0a0e84d40ce0a8f849f9ba Mon Sep 17 00:00:00 2001 From: Rafid Aslam Date: Wed, 19 Nov 2025 16:35:10 +0700 Subject: [PATCH 06/10] update --- tests/providers/test_common.py | 120 +++++++++++++++++++++++---------- 1 file changed, 86 insertions(+), 34 deletions(-) diff --git a/tests/providers/test_common.py b/tests/providers/test_common.py index 565e7d21..2b412b5c 100644 --- a/tests/providers/test_common.py +++ b/tests/providers/test_common.py @@ -147,46 +147,98 @@ def test_complex_nested_structure(data, path, expected): assert result == expected -def test_nested_wildcard_with_missing_intermediate(): - """Test nested wildcard when intermediate path is missing""" - data = {"groups": [{"users": [{"name": "Alice"}]}, {"other": "data"}]} - result = get_path_value_from_dict("groups.*.users.*.name", data) - assert result == ["Alice"] +# Test data for edge cases with wildcards +edge_case_wildcard_cases = [ + # Nested wildcard when intermediate path is missing + ({"groups": [{"users": [{"name": "Alice"}]}, {"other": "data"}]}, "groups.*.users.*.name", ["Alice"]), + # Partial match with wildcard - some list items have the key and others don't + ({"items": [{"name": "Alice"}, {"other": "value"}, {"name": "Bob"}]}, "items.*.name", ["Alice", "Bob"]), +] +# Test data for empty containers +empty_container_cases = [ + ({"users": []}, "users.*.name", False, []), + ({"users": {}}, "users.*.name", False, []), +] -def test_partial_match_with_wildcard(): - """Test when some list items have the key and others don't""" - data = {"items": [{"name": "Alice"}, {"other": "value"}, {"name": "Bob"}]} - result = get_path_value_from_dict("items.*.name", data) - assert result == ["Alice", "Bob"] +# Test data for empty containers with flag +empty_container_with_flag_cases = [ + ({"users": []}, "users.*.name", True, []), + ({"users": {}}, "users.*.name", True, []), +] -def test_empty_containers_default(): +@pytest.mark.parametrize("data,path,expected", edge_case_wildcard_cases) +def test_edge_case_wildcards(data, path, expected): + """Test edge cases with wildcards like missing intermediate paths and partial matches""" + result = get_path_value_from_dict(path, data) + assert result == expected + + +@pytest.mark.parametrize("data,path,flag,expected", empty_container_cases) +def test_empty_containers(data, path, flag, expected): """Test empty containers return empty list""" - assert get_path_value_from_dict("users.*.name", {"users": []}) == [] - assert get_path_value_from_dict("users.*.name", {"users": {}}) == [] + result = get_path_value_from_dict(path, data, place_none_if_not_found=flag) + assert result == expected -def test_empty_containers_with_flag(): +@pytest.mark.parametrize("data,path,flag,expected", empty_container_with_flag_cases) +def test_empty_containers_with_flag(data, path, flag, expected): """Test empty containers with place_none_if_not_found flag""" # Empty containers don't trigger the flag since they exist but are empty - assert get_path_value_from_dict("users.*.name", {"users": []}, place_none_if_not_found=True) == [] - assert get_path_value_from_dict("users.*.name", {"users": {}}, place_none_if_not_found=True) == [] - - -def test_wildcard_with_no_remaining_paths(): - """Test wildcard at the end of path with no remaining paths - covers line 31-32""" - # Test with list at root level - wildcard with no further paths should return all items - data = [1, 2, 3, 4, 5] - result = get_path_value_from_dict("*", data) - assert result == [1, 2, 3, 4, 5] - - # Test with list of dicts - wildcard with no further paths should return all dict items - data2 = [{"name": "Alice"}, {"name": "Bob"}, {"name": "Charlie"}] - result2 = get_path_value_from_dict("*", data2) - assert result2 == [{"name": "Alice"}, {"name": "Bob"}, {"name": "Charlie"}] - - # Test with nested list - get list then apply wildcard with no remaining paths - data3 = {"items": [10, 20, 30]} - result3 = get_path_value_from_dict("items.*", data3) - assert result3 == [10, 20, 30] + result = get_path_value_from_dict(path, data, place_none_if_not_found=flag) + assert result == expected + + +# Test data for wildcard with no remaining paths +wildcard_list_no_remaining_cases = [ + ([1, 2, 3, 4, 5], "*", [1, 2, 3, 4, 5]), + ( + [{"name": "Alice"}, {"name": "Bob"}, {"name": "Charlie"}], + "*", + [{"name": "Alice"}, {"name": "Bob"}, {"name": "Charlie"}], + ), + ({"items": [10, 20, 30]}, "items.*", [10, 20, 30]), +] + +# Test data for wildcard dict with no remaining paths +wildcard_dict_no_remaining_cases = [ + ({"a": 1, "b": 2, "c": 3}, "*", {1, 2, 3}), + ({"config": {"x": 10, "y": 20, "z": 30}}, "config.*", {10, 20, 30}), + ( + {"config": {"setting1": "value1", "setting2": "value2", "setting3": "value3"}}, + "config.*", + {"value1", "value2", "value3"}, + ), + ({"settings": {"enabled": True, "count": 42, "name": "test"}}, "settings.*", {True, 42, "test"}), + ({"items": {"x": 100, "y": "text", "z": True}}, "items.*", {100, "text", True}), +] + +# Test data for wildcard with primitive values +wildcard_primitive_cases = [ + (42, "*", [42]), + ("hello", "*", ["hello"]), + (True, "*", [True]), + (None, "*", [None]), +] + + +@pytest.mark.parametrize("data,path,expected", wildcard_list_no_remaining_cases) +def test_wildcard_list_no_remaining_paths(data, path, expected): + """Test wildcard at the end of path with list and no remaining paths - covers lines 31-32""" + result = get_path_value_from_dict(path, data) + assert result == expected + + +@pytest.mark.parametrize("data,path,expected_set", wildcard_dict_no_remaining_cases) +def test_wildcard_dict_no_remaining_paths(data, path, expected_set): + """Test wildcard at the end of path with dict and no remaining paths - covers lines 38-39""" + result = get_path_value_from_dict(path, data) + assert set(result) == expected_set + + +@pytest.mark.parametrize("data,path,expected", wildcard_primitive_cases) +def test_wildcard_primitive_no_remaining_paths(data, path, expected): + """Test wildcard applied to primitive value with no remaining paths - covers lines 42-43""" + result = get_path_value_from_dict(path, data) + assert result == expected From e6092821d6118ffe2deb7d1329d526ccae4d79f7 Mon Sep 17 00:00:00 2001 From: Rafid Aslam Date: Wed, 19 Nov 2025 16:57:26 +0700 Subject: [PATCH 07/10] update --- src/tirith/providers/common.py | 64 +++++++++++++------- src/tirith/providers/json/handler.py | 4 +- src/tirith/providers/kubernetes/handler.py | 4 +- tests/providers/kubernetes/test_attribute.py | 3 +- tests/providers/test_common.py | 30 ++++----- 5 files changed, 62 insertions(+), 43 deletions(-) diff --git a/src/tirith/providers/common.py b/src/tirith/providers/common.py index c83a81e6..5c0c54f3 100644 --- a/src/tirith/providers/common.py +++ b/src/tirith/providers/common.py @@ -1,6 +1,6 @@ import pydash -from typing import Dict +from typing import Dict, Any def create_result_dict(value=None, meta=None, err=None) -> Dict: @@ -11,7 +11,7 @@ class PydashPathNotFound: pass -def _get_path_value_from_dict_internal(splitted_paths, input_data, place_none_if_not_found=False): +def _get_path_value_from_input_internal(splitted_paths, input_data, place_none_if_not_found=False): if not splitted_paths: return [input_data] if input_data is not PydashPathNotFound else ([None] if place_none_if_not_found else []) @@ -25,14 +25,14 @@ def _get_path_value_from_dict_internal(splitted_paths, input_data, place_none_if if isinstance(input_data, list): for item in input_data: if remaining_paths: - results = _get_path_value_from_dict_internal(remaining_paths, item, place_none_if_not_found) + results = _get_path_value_from_input_internal(remaining_paths, item, place_none_if_not_found) final_data.extend(results) else: final_data.append(item) elif isinstance(input_data, dict): for value in input_data.values(): if remaining_paths: - results = _get_path_value_from_dict_internal(remaining_paths, value, place_none_if_not_found) + results = _get_path_value_from_input_internal(remaining_paths, value, place_none_if_not_found) final_data.extend(results) else: final_data.append(value) @@ -56,17 +56,17 @@ def _get_path_value_from_dict_internal(splitted_paths, input_data, place_none_if # Skip the wildcard marker since iteration is implicit for lists paths_to_apply = remaining_paths[1:] for val in intermediate_val: - results = _get_path_value_from_dict_internal(paths_to_apply, val, place_none_if_not_found) + results = _get_path_value_from_input_internal(paths_to_apply, val, place_none_if_not_found) final_data.extend(results) elif isinstance(intermediate_val, dict) and remaining_paths[0] == "": # If it's a dict and next path is a wildcard, iterate over dict values # Skip the wildcard marker and apply remaining paths to each value for value in intermediate_val.values(): - results = _get_path_value_from_dict_internal(remaining_paths[1:], value, place_none_if_not_found) + results = _get_path_value_from_input_internal(remaining_paths[1:], value, place_none_if_not_found) final_data.extend(results) else: # For non-wildcard paths, continue traversal without iteration - results = _get_path_value_from_dict_internal(remaining_paths, intermediate_val, place_none_if_not_found) + results = _get_path_value_from_input_internal(remaining_paths, intermediate_val, place_none_if_not_found) final_data.extend(results) else: # This is the final path segment @@ -75,15 +75,16 @@ def _get_path_value_from_dict_internal(splitted_paths, input_data, place_none_if return final_data -def get_path_value_from_dict(key_path: str, input_dict: dict, place_none_if_not_found: bool = False): +def get_path_value_from_input(key_path: str, input: Any, place_none_if_not_found: bool = False): """ - Retrieve values from a nested dictionary using a path expression with wildcard support. + Retrieve values from a nested data structure using a path expression with wildcard support. - :param key_path: A dot-separated path to traverse the dictionary. - Use ``*.`` for wildcards to match all items at that level. + :param key_path: A dot-separated path to traverse the data structure. + Use ``*`` for wildcard to match all items at that level. + Supports nested structures including dictionaries, lists, and primitives. :type key_path: str - :param input_dict: The input dictionary to search through. - :type input_dict: dict + :param input: The input data structure to search through (dict, list, or primitive). + :type input: Any :param place_none_if_not_found: If True, returns [None] when a path is not found. If False, returns an empty list []. Defaults to False. :type place_none_if_not_found: bool @@ -96,38 +97,57 @@ def get_path_value_from_dict(key_path: str, input_dict: dict, place_none_if_not_ Basic path traversal:: >>> data = {"user": {"name": "Alice", "age": 30}} - >>> get_path_value_from_dict("user.name", data) + >>> get_path_value_from_input("user.name", data) ["Alice"] Wildcard with list items:: >>> data = {"users": [{"name": "Alice"}, {"name": "Bob"}]} - >>> get_path_value_from_dict("users.*.name", data) + >>> get_path_value_from_input("users.*.name", data) ["Alice", "Bob"] Wildcard with dictionary values:: >>> data = {"countries": {"US": {"capital": "Washington"}, "UK": {"capital": "London"}}} - >>> get_path_value_from_dict("countries.*.capital", data) + >>> get_path_value_from_input("countries.*.capital", data) ["Washington", "London"] - Leading wildcard:: + Leading wildcard on lists:: >>> data = [{"name": "Alice"}, {"name": "Bob"}] - >>> get_path_value_from_dict("*.name", data) + >>> get_path_value_from_input("*.name", data) ["Alice", "Bob"] + Wildcard on primitives:: + + >>> get_path_value_from_input("*", 42) + [42] + >>> get_path_value_from_input("*", "hello") + ["hello"] + + Multiple wildcards:: + + >>> data = {"groups": [[{"id": 1}, {"id": 2}], [{"id": 3}]]} + >>> get_path_value_from_input("groups.*.*.id", data) + [1, 2, 3] + + Empty path returns input as-is:: + + >>> data = {"key": "value"} + >>> get_path_value_from_input("", data) + [{"key": "value"}] + Path not found behavior:: >>> data = {"user": {"name": "Alice"}} - >>> get_path_value_from_dict("missing.path", data) + >>> get_path_value_from_input("missing.path", data) [] - >>> get_path_value_from_dict("missing.path", data, place_none_if_not_found=True) + >>> get_path_value_from_input("missing.path", data, place_none_if_not_found=True) [None] """ # Handle empty path - return the input data as is if not key_path: - return [input_dict] + return [input] # Split the path by dots and replace '*' with empty string to mark wildcards # Empty strings act as markers to iterate over collections (lists or dict values) @@ -137,7 +157,7 @@ def get_path_value_from_dict(key_path: str, input_dict: dict, place_none_if_not_ splitted_attribute = key_path.split(".") splitted_attribute = ["" if part == "*" else part for part in splitted_attribute] - return _get_path_value_from_dict_internal(splitted_attribute, input_dict, place_none_if_not_found) + return _get_path_value_from_input_internal(splitted_attribute, input, place_none_if_not_found) class ProviderError: diff --git a/src/tirith/providers/json/handler.py b/src/tirith/providers/json/handler.py index 0da8a143..3d2c05c0 100644 --- a/src/tirith/providers/json/handler.py +++ b/src/tirith/providers/json/handler.py @@ -1,12 +1,12 @@ from typing import Callable, Dict, List -from ..common import create_result_dict, ProviderError, get_path_value_from_dict +from ..common import create_result_dict, ProviderError, get_path_value_from_input def get_value(provider_args: Dict, input_data: Dict) -> List[dict]: # Must be validated first whether the provider args are valid for this op type key_path: str = provider_args["key_path"] - values = get_path_value_from_dict(key_path, input_data) + values = get_path_value_from_input(key_path, input_data) if len(values) == 0: severity_value = 2 diff --git a/src/tirith/providers/kubernetes/handler.py b/src/tirith/providers/kubernetes/handler.py index ae2a278a..845f0c72 100644 --- a/src/tirith/providers/kubernetes/handler.py +++ b/src/tirith/providers/kubernetes/handler.py @@ -1,7 +1,7 @@ import pydash from typing import Callable, Dict, List -from ..common import create_result_dict, ProviderError, get_path_value_from_dict +from ..common import create_result_dict, ProviderError, get_path_value_from_input def get_value(provider_args: Dict, input_data: Dict, outputs: list) -> Dict: @@ -21,7 +21,7 @@ def get_value(provider_args: Dict, input_data: Dict, outputs: list) -> Dict: if resource["kind"] != target_kind: continue is_kind_found = True - values = get_path_value_from_dict(attribute_path, resource, place_none_if_not_found=True) + values = get_path_value_from_input(attribute_path, resource, place_none_if_not_found=True) if ".*." not in attribute_path: # If there's no * in the attribute path, the values always have 1 member values = values[0] diff --git a/tests/providers/kubernetes/test_attribute.py b/tests/providers/kubernetes/test_attribute.py index d81dcd5b..ee1d8d4b 100644 --- a/tests/providers/kubernetes/test_attribute.py +++ b/tests/providers/kubernetes/test_attribute.py @@ -1,4 +1,3 @@ -import json import os from tirith.core.core import start_policy_evaluation @@ -10,4 +9,4 @@ def test_get_value(): policy_path = os.path.join(test_dir, "policy.json") result = start_policy_evaluation(policy_path=policy_path, input_path=input_path) - assert result["final_result"] == False + assert result["final_result"] is False diff --git a/tests/providers/test_common.py b/tests/providers/test_common.py index 2b412b5c..3c65a1e6 100644 --- a/tests/providers/test_common.py +++ b/tests/providers/test_common.py @@ -1,5 +1,5 @@ import pytest -from tirith.providers.common import get_path_value_from_dict +from tirith.providers.common import get_path_value_from_input # Test data for simple path access @@ -91,28 +91,28 @@ @pytest.mark.parametrize("data,path,expected", simple_path_cases) def test_simple_path_access(data, path, expected): """Test basic path traversal without wildcards""" - result = get_path_value_from_dict(path, data) + result = get_path_value_from_input(path, data) assert result == expected @pytest.mark.parametrize("data,path,expected", wildcard_list_cases) def test_wildcard_with_list(data, path, expected): """Test wildcard with list of dictionaries""" - result = get_path_value_from_dict(path, data) + result = get_path_value_from_input(path, data) assert result == expected @pytest.mark.parametrize("data,path,expected_set", wildcard_dict_cases) def test_wildcard_with_dict(data, path, expected_set): """Test wildcard with dictionary values (order-independent)""" - result = get_path_value_from_dict(path, data) + result = get_path_value_from_input(path, data) assert set(result) == expected_set @pytest.mark.parametrize("data,path,expected", multiple_wildcard_cases) def test_multiple_wildcards(data, path, expected): """Test multiple wildcards in the path""" - result = get_path_value_from_dict(path, data) + result = get_path_value_from_input(path, data) if isinstance(expected, set): assert set(result) == expected else: @@ -122,28 +122,28 @@ def test_multiple_wildcards(data, path, expected): @pytest.mark.parametrize("data,path,expected", path_not_found_cases) def test_path_not_found_default(data, path, expected): """Test that non-existent path returns empty list by default""" - result = get_path_value_from_dict(path, data) + result = get_path_value_from_input(path, data) assert result == expected @pytest.mark.parametrize("data,path,flag,expected", path_not_found_with_flag_cases) def test_path_not_found_with_flag(data, path, flag, expected): """Test that non-existent path returns [None] when flag is True""" - result = get_path_value_from_dict(path, data, place_none_if_not_found=flag) + result = get_path_value_from_input(path, data, place_none_if_not_found=flag) assert result == expected @pytest.mark.parametrize("data,path,expected", special_value_cases) def test_special_values(data, path, expected): """Test handling of special values like None and empty paths""" - result = get_path_value_from_dict(path, data) + result = get_path_value_from_input(path, data) assert result == expected @pytest.mark.parametrize("data,path,expected", complex_structure_cases) def test_complex_nested_structure(data, path, expected): """Test complex real-world-like nested structures""" - result = get_path_value_from_dict(path, data) + result = get_path_value_from_input(path, data) assert result == expected @@ -171,14 +171,14 @@ def test_complex_nested_structure(data, path, expected): @pytest.mark.parametrize("data,path,expected", edge_case_wildcard_cases) def test_edge_case_wildcards(data, path, expected): """Test edge cases with wildcards like missing intermediate paths and partial matches""" - result = get_path_value_from_dict(path, data) + result = get_path_value_from_input(path, data) assert result == expected @pytest.mark.parametrize("data,path,flag,expected", empty_container_cases) def test_empty_containers(data, path, flag, expected): """Test empty containers return empty list""" - result = get_path_value_from_dict(path, data, place_none_if_not_found=flag) + result = get_path_value_from_input(path, data, place_none_if_not_found=flag) assert result == expected @@ -186,7 +186,7 @@ def test_empty_containers(data, path, flag, expected): def test_empty_containers_with_flag(data, path, flag, expected): """Test empty containers with place_none_if_not_found flag""" # Empty containers don't trigger the flag since they exist but are empty - result = get_path_value_from_dict(path, data, place_none_if_not_found=flag) + result = get_path_value_from_input(path, data, place_none_if_not_found=flag) assert result == expected @@ -226,19 +226,19 @@ def test_empty_containers_with_flag(data, path, flag, expected): @pytest.mark.parametrize("data,path,expected", wildcard_list_no_remaining_cases) def test_wildcard_list_no_remaining_paths(data, path, expected): """Test wildcard at the end of path with list and no remaining paths - covers lines 31-32""" - result = get_path_value_from_dict(path, data) + result = get_path_value_from_input(path, data) assert result == expected @pytest.mark.parametrize("data,path,expected_set", wildcard_dict_no_remaining_cases) def test_wildcard_dict_no_remaining_paths(data, path, expected_set): """Test wildcard at the end of path with dict and no remaining paths - covers lines 38-39""" - result = get_path_value_from_dict(path, data) + result = get_path_value_from_input(path, data) assert set(result) == expected_set @pytest.mark.parametrize("data,path,expected", wildcard_primitive_cases) def test_wildcard_primitive_no_remaining_paths(data, path, expected): """Test wildcard applied to primitive value with no remaining paths - covers lines 42-43""" - result = get_path_value_from_dict(path, data) + result = get_path_value_from_input(path, data) assert result == expected From b1685bd7fe1874d1306361480696da469be1a50e Mon Sep 17 00:00:00 2001 From: Rafid Aslam Date: Wed, 19 Nov 2025 17:55:17 +0700 Subject: [PATCH 08/10] update --- src/tirith/providers/kubernetes/handler.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/tirith/providers/kubernetes/handler.py b/src/tirith/providers/kubernetes/handler.py index 845f0c72..35cd9bdf 100644 --- a/src/tirith/providers/kubernetes/handler.py +++ b/src/tirith/providers/kubernetes/handler.py @@ -1,10 +1,10 @@ import pydash -from typing import Callable, Dict, List +from typing import Callable, Dict, List, Union from ..common import create_result_dict, ProviderError, get_path_value_from_input -def get_value(provider_args: Dict, input_data: Dict, outputs: list) -> Dict: +def get_value(provider_args: Dict, input_data: Union[Dict|List], outputs: list) -> Dict: # Must be validated first whether the provider args are valid for this op type target_kind: str = provider_args.get("kubernetes_kind") attribute_path: str = provider_args.get("attribute_path", "") @@ -22,7 +22,7 @@ def get_value(provider_args: Dict, input_data: Dict, outputs: list) -> Dict: continue is_kind_found = True values = get_path_value_from_input(attribute_path, resource, place_none_if_not_found=True) - if ".*." not in attribute_path: + if "*" not in attribute_path: # If there's no * in the attribute path, the values always have 1 member values = values[0] outputs.append(create_result_dict(value=values)) From 2f0030798cd24db5d228b4115a3258ef49110ab3 Mon Sep 17 00:00:00 2001 From: Rafid Aslam Date: Wed, 19 Nov 2025 18:00:08 +0700 Subject: [PATCH 09/10] update --- src/tirith/providers/kubernetes/handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tirith/providers/kubernetes/handler.py b/src/tirith/providers/kubernetes/handler.py index 35cd9bdf..b0f21c5c 100644 --- a/src/tirith/providers/kubernetes/handler.py +++ b/src/tirith/providers/kubernetes/handler.py @@ -4,7 +4,7 @@ from ..common import create_result_dict, ProviderError, get_path_value_from_input -def get_value(provider_args: Dict, input_data: Union[Dict|List], outputs: list) -> Dict: +def get_value(provider_args: Dict, input_data: Union[Dict, List], outputs: list) -> Dict: # Must be validated first whether the provider args are valid for this op type target_kind: str = provider_args.get("kubernetes_kind") attribute_path: str = provider_args.get("attribute_path", "") From 92d3d4419b7a0a69c5667b0e0eed9d3cf45a2364 Mon Sep 17 00:00:00 2001 From: Rafid Aslam Date: Wed, 19 Nov 2025 18:01:54 +0700 Subject: [PATCH 10/10] update --- src/tirith/providers/kubernetes/handler.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/tirith/providers/kubernetes/handler.py b/src/tirith/providers/kubernetes/handler.py index b0f21c5c..73e92d63 100644 --- a/src/tirith/providers/kubernetes/handler.py +++ b/src/tirith/providers/kubernetes/handler.py @@ -1,5 +1,3 @@ -import pydash - from typing import Callable, Dict, List, Union from ..common import create_result_dict, ProviderError, get_path_value_from_input