From a41ccd50e712e0640e7cfbd868d0d26968de22a0 Mon Sep 17 00:00:00 2001 From: Jiri Vrany Date: Wed, 20 Aug 2025 13:40:04 +0200 Subject: [PATCH 1/5] more robust function for filtering and spliting flowspec rules for user + tests --- flowapp/tests/test_validators.py | 191 +++++++++++++++++++++++++++++++ flowapp/validators.py | 29 +++-- 2 files changed, 209 insertions(+), 11 deletions(-) diff --git a/flowapp/tests/test_validators.py b/flowapp/tests/test_validators.py index cab089f..614d3a5 100644 --- a/flowapp/tests/test_validators.py +++ b/flowapp/tests/test_validators.py @@ -16,6 +16,10 @@ editable_range, network_in_range, range_in_network, + filter_rules_in_network, + split_rules_for_user, + filter_rtbh_rules, + split_rtbh_rules_for_user, ) @@ -354,3 +358,190 @@ def test_network_validator_invalid(field, address, mask): form = MockForm(address, mask) with pytest.raises(ValidationError): validator(form, field) + + +# Mock rule classes for testing robust attribute handling +class MockRule: + """Mock rule with all expected attributes""" + + def __init__(self, source=None, source_mask=None, dest=None, dest_mask=None): + self.source = source + self.source_mask = source_mask + self.dest = dest + self.dest_mask = dest_mask + + +class MockRuleIncomplete: + """Mock rule with missing attributes""" + + def __init__(self, name=None): + self.name = name + # Intentionally missing source, source_mask, dest, dest_mask attributes + + +class MockRulePartial: + """Mock rule with some attributes""" + + def __init__(self, source=None): + self.source = source + # Missing source_mask, dest, dest_mask attributes + + +class MockRTBHRule: + """Mock RTBH rule with all expected attributes""" + + def __init__(self, ipv4=None, ipv4_mask=None, ipv6=None, ipv6_mask=None): + self.ipv4 = ipv4 + self.ipv4_mask = ipv4_mask + self.ipv6 = ipv6 + self.ipv6_mask = ipv6_mask + + +class MockRTBHRuleIncomplete: + """Mock RTBH rule with missing attributes""" + + def __init__(self, name=None): + self.name = name + # Intentionally missing ipv4, ipv4_mask, ipv6, ipv6_mask attributes + + +# Tests for filter_rules_in_network with robust attribute handling +def test_filter_rules_in_network_normal_rules(): + """Test filter_rules_in_network with normal rule objects""" + net_ranges = ["192.168.0.0/16", "10.0.0.0/8"] + rules = [ + MockRule("192.168.1.0", "24", "10.0.1.0", "24"), # Should match + MockRule("172.16.1.0", "24", "172.16.2.0", "24"), # Should not match + MockRule("10.1.0.0", "16", None, None), # Should match (source only) + ] + + filtered = filter_rules_in_network(net_ranges, rules) + assert len(filtered) == 2 + assert rules[0] in filtered # 192.168.x.x rule + assert rules[2] in filtered # 10.x.x.x rule + assert rules[1] not in filtered # 172.16.x.x rule + + +def test_filter_rules_in_network_missing_attributes(): + """Test filter_rules_in_network with rules missing required attributes""" + net_ranges = ["192.168.0.0/16"] + rules = [ + MockRule("192.168.1.0", "24", "10.0.1.0", "24"), # Normal rule - should match + MockRuleIncomplete("rule_without_network_attrs"), # Missing attrs - should be included + MockRulePartial("172.16.1.0"), # Partial attrs - should be included + ] + + filtered = filter_rules_in_network(net_ranges, rules) + assert len(filtered) == 3 # All rules should be included + assert all(rule in filtered for rule in rules) + + +def test_filter_rules_in_network_none_values(): + """Test filter_rules_in_network with None values in attributes""" + net_ranges = ["192.168.0.0/16"] + rules = [ + MockRule("192.168.1.0", "24", None, None), # Should match on source + MockRule(None, None, "192.168.2.0", "24"), # Should match on dest + MockRule(None, None, None, None), # Should not match + ] + + filtered = filter_rules_in_network(net_ranges, rules) + assert len(filtered) == 2 + assert rules[0] in filtered + assert rules[1] in filtered + assert rules[2] not in filtered + + +# Tests for split_rules_for_user with robust attribute handling +def test_split_rules_for_user_normal_rules(): + """Test split_rules_for_user with normal rule objects""" + net_ranges = ["192.168.0.0/16"] + rules = [ + MockRule("192.168.1.0", "24", "10.0.1.0", "24"), # Should be user rule + MockRule("172.16.1.0", "24", "172.16.2.0", "24"), # Should be rest rule + ] + + user_rules, rest_rules = split_rules_for_user(net_ranges, rules) + assert len(user_rules) == 1 + assert len(rest_rules) == 1 + assert rules[0] in user_rules + assert rules[1] in rest_rules + + +def test_split_rules_for_user_missing_attributes(): + """Test split_rules_for_user with rules missing required attributes""" + net_ranges = ["192.168.0.0/16"] + rules = [ + MockRule("192.168.1.0", "24", "10.0.1.0", "24"), # Normal rule - user rule + MockRuleIncomplete("rule_without_attrs"), # Missing attrs - should be user rule + MockRule("172.16.1.0", "24", "172.16.2.0", "24"), # Normal rule - rest rule + ] + + user_rules, rest_rules = split_rules_for_user(net_ranges, rules) + assert len(user_rules) == 2 # Normal matching rule + incomplete rule + assert len(rest_rules) == 1 + assert rules[0] in user_rules # Matching rule + assert rules[1] in user_rules # Incomplete rule treated as editable + assert rules[2] in rest_rules # Non-matching rule + + +# Tests for filter_rtbh_rules with robust attribute handling +def test_filter_rtbh_rules_normal_rules(): + """Test filter_rtbh_rules with normal RTBH rule objects""" + net_ranges = ["192.168.0.0/16", "2001:db8::/32"] + rules = [ + MockRTBHRule("192.168.1.0", "24", None, None), # Should match on IPv4 + MockRTBHRule(None, None, "2001:db8:1::", "48"), # Should match on IPv6 + MockRTBHRule("172.16.1.0", "24", "2001:db9::", "32"), # Should not match + ] + + filtered = filter_rtbh_rules(net_ranges, rules) + assert len(filtered) == 2 + assert rules[0] in filtered + assert rules[1] in filtered + assert rules[2] not in filtered + + +# Tests for split_rtbh_rules_for_user with robust attribute handling +def test_split_rtbh_rules_for_user_normal_rules(): + """Test split_rtbh_rules_for_user with normal RTBH rule objects""" + net_ranges = ["192.168.0.0/16"] + rules = [ + MockRTBHRule("192.168.1.0", "24", None, None), # Should be filtered (user) + MockRTBHRule("172.16.1.0", "24", None, None), # Should be read-only + ] + + filtered, read_only = split_rtbh_rules_for_user(net_ranges, rules) + assert len(filtered) == 1 + assert len(read_only) == 1 + assert rules[0] in filtered + assert rules[1] in read_only + + +# Edge case tests +def test_filter_functions_empty_input(): + """Test all filter functions with empty input""" + net_ranges = ["192.168.0.0/16"] + + # Empty rules list + assert filter_rules_in_network(net_ranges, []) == [] + assert split_rules_for_user(net_ranges, []) == ([], []) + assert filter_rtbh_rules(net_ranges, []) == [] + assert split_rtbh_rules_for_user(net_ranges, []) == ([], []) + + +def test_filter_functions_empty_net_ranges(): + """Test filter functions with empty network ranges""" + rules = [MockRule("192.168.1.0", "24", None, None)] + rtbh_rules = [MockRTBHRule("192.168.1.0", "24", None, None)] + + # Empty network ranges - nothing should match + assert filter_rules_in_network([], rules) == [] + user_rules, rest_rules = split_rules_for_user([], rules) + assert user_rules == [] + assert rest_rules == rules + + assert filter_rtbh_rules([], rtbh_rules) == [] + filtered, read_only = split_rtbh_rules_for_user([], rtbh_rules) + assert filtered == [] + assert read_only == rtbh_rules diff --git a/flowapp/validators.py b/flowapp/validators.py index 890bdac..ecb88d1 100644 --- a/flowapp/validators.py +++ b/flowapp/validators.py @@ -12,12 +12,15 @@ def filter_rules_in_network(net_ranges, rules): :param rules: list of rules (ipv4 or ipv6 :return: filtered list of rules """ - return [ - rule - for rule in rules - if network_in_range(rule.source, rule.source_mask, net_ranges) - or network_in_range(rule.dest, rule.dest_mask, net_ranges) - ] + try: + return [ + rule + for rule in rules + if network_in_range(rule.source, rule.source_mask, net_ranges) + or network_in_range(rule.dest, rule.dest_mask, net_ranges) + ] + except AttributeError: + return rules # If rules have no source or dest, return all rules def split_rules_for_user(net_ranges, rules): @@ -30,12 +33,16 @@ def split_rules_for_user(net_ranges, rules): user_rules = [] rest_rules = [] for rule in rules: - if network_in_range(rule.source, rule.source_mask, net_ranges) or network_in_range( - rule.dest, rule.dest_mask, net_ranges - ): + try: + if network_in_range(rule.source, rule.source_mask, net_ranges) or network_in_range( + rule.dest, rule.dest_mask, net_ranges + ): + user_rules.append(rule) + else: + rest_rules.append(rule) + except AttributeError: + # If rule has no source or dest, the split is not possible user_rules.append(rule) - else: - rest_rules.append(rule) return user_rules, rest_rules From 675ea51a25e9ebdbd79332ec0f1037f1e246930d Mon Sep 17 00:00:00 2001 From: Jiri Vrany Date: Wed, 20 Aug 2025 15:23:18 +0200 Subject: [PATCH 2/5] update filer rules action to be more robust --- flowapp/flowspec.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/flowapp/flowspec.py b/flowapp/flowspec.py index ae357ca..78cbfb6 100644 --- a/flowapp/flowspec.py +++ b/flowapp/flowspec.py @@ -89,9 +89,12 @@ def filter_rules_action(user_actions, rules): editable = [] viewonly = [] for rule in rules: - if rule.action_id in user_actions: - editable.append(rule) - else: - viewonly.append(rule) + try: + if rule.action_id in user_actions: + editable.append(rule) + else: + viewonly.append(rule) + except AttributeError: + editable.append(rule) # If rule has no action_id, treat it as editable return editable, viewonly From f04a0cd1ab345255478e67341c6c13702c1beb09 Mon Sep 17 00:00:00 2001 From: Jiri Vrany Date: Wed, 20 Aug 2025 15:52:43 +0200 Subject: [PATCH 3/5] bump version --- flowapp/__about__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flowapp/__about__.py b/flowapp/__about__.py index e34527b..d65e0a5 100755 --- a/flowapp/__about__.py +++ b/flowapp/__about__.py @@ -1 +1 @@ -__version__ = "1.1.3" +__version__ = "1.1.4" From 1f6b6e3087d4d4eb35b8982ca4b3f4a17031b293 Mon Sep 17 00:00:00 2001 From: Jiri Vrany Date: Wed, 20 Aug 2025 15:53:48 +0200 Subject: [PATCH 4/5] readme update for 1.1.4 --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 2d31192..526a2e1 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,7 @@ The REST API is documented using Swagger (OpenAPI). After installing and running ## Change Log +- 1.1.4 - minor bug fixes and code cleanup - 1.1.3 - introduced configurable footer menu for links in bottom of the default template - 1.1.2 - minor security updates (removed unused JS files), setup.py now reads dependencies from requirements.txt - 1.1.1 - Machine API Key rewrited. From 5ea4f95d473c2729eaf94908d1391ae48b84fdc8 Mon Sep 17 00:00:00 2001 From: Jiri Vrany Date: Thu, 25 Sep 2025 13:09:03 +0200 Subject: [PATCH 5/5] Update flowapp/validators.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- flowapp/validators.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/flowapp/validators.py b/flowapp/validators.py index ecb88d1..890fe5d 100644 --- a/flowapp/validators.py +++ b/flowapp/validators.py @@ -12,15 +12,15 @@ def filter_rules_in_network(net_ranges, rules): :param rules: list of rules (ipv4 or ipv6 :return: filtered list of rules """ - try: - return [ - rule - for rule in rules - if network_in_range(rule.source, rule.source_mask, net_ranges) - or network_in_range(rule.dest, rule.dest_mask, net_ranges) - ] - except AttributeError: - return rules # If rules have no source or dest, return all rules + filtered_rules = [] + for rule in rules: + try: + if network_in_range(rule.source, rule.source_mask, net_ranges) or network_in_range(rule.dest, rule.dest_mask, net_ranges): + filtered_rules.append(rule) + except AttributeError: + # If rule has no source or dest, include it (consistent with split_rules_for_user) + filtered_rules.append(rule) + return filtered_rules def split_rules_for_user(net_ranges, rules):