From 22f73a3d7c9c9d517b77e6f3a3a42361ce54f169 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 31 Oct 2025 17:53:57 +0000 Subject: [PATCH 1/4] Initial plan From fc9853e39e7bb7d8e56748071859d1ac9fc3aa1d Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 31 Oct 2025 18:05:55 +0000 Subject: [PATCH 2/4] Add 5 new statistical test plugins with comprehensive tests Co-authored-by: EdgeTypE <34396598+EdgeTypE@users.noreply.github.com> --- patternanalyzer/plugins/chi_square.py | 236 ++++++++++++++ patternanalyzer/plugins/gap_test.py | 280 +++++++++++++++++ patternanalyzer/plugins/kolmogorov_smirnov.py | 117 +++++++ patternanalyzer/plugins/permutation_test.py | 245 +++++++++++++++ patternanalyzer/plugins/poker_test.py | 297 ++++++++++++++++++ pyproject.toml | 5 + tests/test_chi_square.py | 174 ++++++++++ tests/test_gap_test.py | 176 +++++++++++ tests/test_kolmogorov_smirnov.py | 169 ++++++++++ tests/test_permutation_test.py | 222 +++++++++++++ tests/test_poker_test.py | 202 ++++++++++++ 11 files changed, 2123 insertions(+) create mode 100644 patternanalyzer/plugins/chi_square.py create mode 100644 patternanalyzer/plugins/gap_test.py create mode 100644 patternanalyzer/plugins/kolmogorov_smirnov.py create mode 100644 patternanalyzer/plugins/permutation_test.py create mode 100644 patternanalyzer/plugins/poker_test.py create mode 100644 tests/test_chi_square.py create mode 100644 tests/test_gap_test.py create mode 100644 tests/test_kolmogorov_smirnov.py create mode 100644 tests/test_permutation_test.py create mode 100644 tests/test_poker_test.py diff --git a/patternanalyzer/plugins/chi_square.py b/patternanalyzer/plugins/chi_square.py new file mode 100644 index 0000000..c8eb340 --- /dev/null +++ b/patternanalyzer/plugins/chi_square.py @@ -0,0 +1,236 @@ +"""Chi-Square test plugin for uniformity of byte distribution. 
+ +The Chi-Square test checks whether the observed byte frequency distribution +significantly deviates from the expected uniform distribution. A low p-value +indicates the data is non-random or biased. +""" + +import math +from collections import Counter +from typing import Dict, Any + +try: + from ..plugin_api import BytesView, TestResult, TestPlugin +except Exception: + from patternanalyzer.plugin_api import BytesView, TestResult, TestPlugin # type: ignore + + +class ChiSquareTest(TestPlugin): + """Chi-Square test for byte frequency uniformity.""" + + def __init__(self): + """Initialize the plugin with streaming state.""" + # Streaming accumulators + self._counter = Counter() + self._total_bytes = 0 + + def describe(self) -> str: + """Return plugin description.""" + return "Chi-Square test for uniformity of byte distribution" + + def run(self, data: BytesView, params: dict) -> TestResult: + """Run Chi-Square test in batch mode.""" + data_bytes = data.to_bytes() + n = len(data_bytes) + + if n == 0: + return TestResult( + test_name="chi_square", + passed=True, + p_value=1.0, + category="statistical", + metrics={"total_bytes": 0, "chi_square_statistic": 0.0}, + ) + + # Count frequency of each byte value + counter = Counter(data_bytes) + + # Expected frequency for uniform distribution + expected = n / 256.0 + + # Calculate chi-square statistic + chi_square = sum((count - expected) ** 2 / expected for count in counter.values()) + + # Add missing byte values (count = 0) to chi-square + observed_bytes = len(counter) + missing_bytes = 256 - observed_bytes + if missing_bytes > 0: + chi_square += missing_bytes * (expected ** 2 / expected) + + # Degrees of freedom = 256 - 1 = 255 + df = 255 + + # Calculate p-value using chi-square CDF + p_value = 1.0 - self._chi_square_cdf(chi_square, df) + + # Determine if test passed + alpha = float(params.get("alpha", 0.01)) + passed = p_value > alpha + + return TestResult( + test_name="chi_square", + passed=passed, + p_value=p_value, + 
category="statistical", + metrics={ + "total_bytes": n, + "chi_square_statistic": chi_square, + "degrees_of_freedom": df, + "unique_bytes": observed_bytes, + }, + p_values={"chi_square": p_value}, + ) + + def update(self, chunk: bytes, params: dict) -> None: + """Update internal accumulators with a chunk of raw bytes.""" + if not chunk: + return + self._counter.update(chunk) + self._total_bytes += len(chunk) + + def finalize(self, params: dict) -> TestResult: + """Finalize streaming aggregation and return TestResult.""" + n = self._total_bytes + counter = self._counter + + # Reset accumulators for possible reuse + self._counter = Counter() + self._total_bytes = 0 + + if n == 0: + return TestResult( + test_name="chi_square", + passed=True, + p_value=1.0, + category="statistical", + metrics={"total_bytes": 0, "chi_square_statistic": 0.0}, + ) + + # Expected frequency for uniform distribution + expected = n / 256.0 + + # Calculate chi-square statistic + chi_square = sum((count - expected) ** 2 / expected for count in counter.values()) + + # Add missing byte values (count = 0) to chi-square + observed_bytes = len(counter) + missing_bytes = 256 - observed_bytes + if missing_bytes > 0: + chi_square += missing_bytes * (expected ** 2 / expected) + + # Degrees of freedom = 256 - 1 = 255 + df = 255 + + # Calculate p-value using chi-square CDF + p_value = 1.0 - self._chi_square_cdf(chi_square, df) + + # Determine if test passed + alpha = float(params.get("alpha", 0.01)) + passed = p_value > alpha + + return TestResult( + test_name="chi_square", + passed=passed, + p_value=p_value, + category="statistical", + metrics={ + "total_bytes": n, + "chi_square_statistic": chi_square, + "degrees_of_freedom": df, + "unique_bytes": observed_bytes, + }, + p_values={"chi_square": p_value}, + ) + + def _chi_square_cdf(self, x: float, df: int) -> float: + """Approximate chi-square cumulative distribution function. + + Uses the relationship between chi-square and gamma distribution. 
+ For large df, uses normal approximation. + """ + if x <= 0: + return 0.0 + + if df > 100: + # Wilson-Hilferty transformation for large df + z = ((x / df) ** (1.0/3.0) - (1.0 - 2.0/(9.0*df))) / math.sqrt(2.0/(9.0*df)) + return self._normal_cdf(z) + + # Use incomplete gamma function for small to medium df + return self._gamma_cdf(x / 2.0, df / 2.0) + + def _gamma_cdf(self, x: float, k: float) -> float: + """Approximate gamma CDF using incomplete gamma function.""" + if x <= 0: + return 0.0 + + # Use series expansion for small x*k, continued fraction for large x*k + if x * k < 1.0: + # Series expansion + return self._gamma_series(x, k) + else: + # Continued fraction + return 1.0 - self._gamma_cf(x, k) + + def _gamma_series(self, x: float, k: float) -> float: + """Series expansion for lower incomplete gamma.""" + max_iter = 1000 + epsilon = 1e-10 + + result = 1.0 / k + term = result + + for n in range(1, max_iter): + term *= x / (k + n) + result += term + if abs(term) < epsilon: + break + + return result * math.exp(-x + k * math.log(x) - math.lgamma(k)) + + def _gamma_cf(self, x: float, k: float) -> float: + """Continued fraction for upper incomplete gamma.""" + max_iter = 1000 + epsilon = 1e-10 + + # Lentz's algorithm + tiny = 1e-30 + b = x + 1.0 - k + c = 1.0 / tiny + d = 1.0 / b + h = d + + for i in range(1, max_iter): + a = -i * (i - k) + b += 2.0 + d = a * d + b + if abs(d) < tiny: + d = tiny + c = b + a / c + if abs(c) < tiny: + c = tiny + d = 1.0 / d + delta = d * c + h *= delta + if abs(delta - 1.0) < epsilon: + break + + return h * math.exp(-x + k * math.log(x) - math.lgamma(k)) + + def _normal_cdf(self, x: float) -> float: + """Approximation of standard normal cumulative distribution function.""" + # Abramowitz and Stegun approximation + a1 = 0.254829592 + a2 = -0.284496736 + a3 = 1.421413741 + a4 = -1.453152027 + a5 = 1.061405429 + p = 0.3275911 + + sign = 1 if x >= 0 else -1 + x = abs(x) / math.sqrt(2.0) + + t = 1.0 / (1.0 + p * x) + y = 1.0 - (((((a5 * t 
+ a4) * t) + a3) * t + a2) * t + a1) * t * math.exp(-x * x) + + return 0.5 * (1.0 + sign * y) diff --git a/patternanalyzer/plugins/gap_test.py b/patternanalyzer/plugins/gap_test.py new file mode 100644 index 0000000..15b0668 --- /dev/null +++ b/patternanalyzer/plugins/gap_test.py @@ -0,0 +1,280 @@ +"""Gap test plugin for randomness. + +The Gap test examines the distances (gaps) between occurrences of specific +patterns in the bit sequence. For random data, the gap lengths should follow +a geometric distribution. Uses chi-square to test the fit. +""" + +import math +from collections import Counter +from typing import Dict, Any, List + +try: + from ..plugin_api import BytesView, TestResult, TestPlugin +except Exception: + from patternanalyzer.plugin_api import BytesView, TestResult, TestPlugin # type: ignore + + +class GapTest(TestPlugin): + """Gap test for randomness of bit sequences.""" + + requires = ['bits'] + + def __init__(self): + """Initialize the plugin.""" + pass + + def describe(self) -> str: + """Return plugin description.""" + return "Gap test analyzing distances between occurrences of bit patterns" + + def run(self, data: BytesView, params: dict) -> TestResult: + """Run Gap test in batch mode.""" + bits = data.bit_view() + n = len(bits) + + if n < 100: + # Need sufficient data for gap analysis + return TestResult( + test_name="gap", + passed=True, + p_value=1.0, + category="statistical", + metrics={ + "total_bits": n, + "status": "skipped_insufficient_data", + }, + ) + + # Pattern to search for (default: '1' bit) + pattern_bits = params.get("pattern", [1]) + if isinstance(pattern_bits, int): + pattern_bits = [pattern_bits] + + pattern_len = len(pattern_bits) + + if pattern_len > n // 10: + # Pattern too long for meaningful analysis + return TestResult( + test_name="gap", + passed=True, + p_value=1.0, + category="statistical", + metrics={ + "total_bits": n, + "pattern_length": pattern_len, + "status": "skipped_pattern_too_long", + }, + ) + + # Find all 
occurrences of the pattern + occurrences = [] + for i in range(n - pattern_len + 1): + if bits[i:i+pattern_len] == pattern_bits: + occurrences.append(i) + + if len(occurrences) < 10: + # Need sufficient occurrences for gap analysis + return TestResult( + test_name="gap", + passed=True, + p_value=1.0, + category="statistical", + metrics={ + "total_bits": n, + "pattern_occurrences": len(occurrences), + "status": "skipped_insufficient_occurrences", + }, + ) + + # Calculate gaps between consecutive occurrences + gaps = [] + for i in range(len(occurrences) - 1): + gap = occurrences[i + 1] - occurrences[i] - pattern_len + if gap >= 0: + gaps.append(gap) + + if len(gaps) < 5: + return TestResult( + test_name="gap", + passed=True, + p_value=1.0, + category="statistical", + metrics={ + "total_bits": n, + "gap_count": len(gaps), + "status": "skipped_insufficient_gaps", + }, + ) + + # Define gap categories (bins) + # For geometric distribution with parameter p + # P(gap = k) = (1-p)^k * p + p = len(occurrences) / (n - pattern_len + 1) # Estimate of pattern probability + + # Create bins: [0], [1], [2], ..., [max_gap-1], [max_gap, ∞) + max_individual_gap = 10 + bins = list(range(max_individual_gap + 1)) + [float('inf')] + + # Count observed gaps in each bin + observed_counts = [0] * (len(bins) - 1) + for gap in gaps: + for i in range(len(bins) - 1): + if gap < bins[i + 1]: + observed_counts[i] += 1 + break + + # Calculate expected counts using geometric distribution + total_gaps = len(gaps) + expected_counts = [] + + for i in range(len(bins) - 1): + if bins[i + 1] == float('inf'): + # Last bin: P(gap >= max_individual_gap) + prob = (1 - p) ** max_individual_gap + else: + # P(gap = k) = (1-p)^k * p + prob = ((1 - p) ** bins[i]) * p + expected_counts.append(prob * total_gaps) + + # Merge bins with low expected counts + merged_observed = [] + merged_expected = [] + current_obs = 0 + current_exp = 0.0 + + for obs, exp in zip(observed_counts, expected_counts): + current_obs += obs + 
current_exp += exp + if current_exp >= 5.0 or exp == expected_counts[-1]: + merged_observed.append(current_obs) + merged_expected.append(current_exp) + current_obs = 0 + current_exp = 0.0 + + if current_obs > 0 or current_exp > 0: + if merged_expected: + merged_observed[-1] += current_obs + merged_expected[-1] += current_exp + else: + merged_observed.append(current_obs) + merged_expected.append(current_exp) + + # Calculate chi-square statistic + chi_square = 0.0 + for obs, exp in zip(merged_observed, merged_expected): + if exp > 0: + chi_square += ((obs - exp) ** 2) / exp + + # Degrees of freedom = number of bins - 1 (minus 1 for estimated parameter) + df = max(1, len(merged_observed) - 2) + + # Calculate p-value + p_value = 1.0 - self._chi_square_cdf(chi_square, df) + + # Determine if test passed + alpha = float(params.get("alpha", 0.01)) + passed = p_value > alpha + + return TestResult( + test_name="gap", + passed=passed, + p_value=p_value, + category="statistical", + metrics={ + "total_bits": n, + "pattern_length": pattern_len, + "pattern_occurrences": len(occurrences), + "gap_count": len(gaps), + "chi_square_statistic": chi_square, + "degrees_of_freedom": df, + "mean_gap": sum(gaps) / len(gaps) if gaps else 0.0, + "min_gap": min(gaps) if gaps else 0, + "max_gap": max(gaps) if gaps else 0, + }, + p_values={"gap": p_value}, + ) + + def _chi_square_cdf(self, x: float, df: int) -> float: + """Approximate chi-square cumulative distribution function.""" + if x <= 0: + return 0.0 + + if df > 100: + # Wilson-Hilferty transformation for large df + z = ((x / df) ** (1.0/3.0) - (1.0 - 2.0/(9.0*df))) / math.sqrt(2.0/(9.0*df)) + return self._normal_cdf(z) + + # Use incomplete gamma function + return self._gamma_cdf(x / 2.0, df / 2.0) + + def _gamma_cdf(self, x: float, k: float) -> float: + """Approximate gamma CDF.""" + if x <= 0: + return 0.0 + + if x * k < 1.0: + return self._gamma_series(x, k) + else: + return 1.0 - self._gamma_cf(x, k) + + def _gamma_series(self, x: 
float, k: float) -> float: + """Series expansion for lower incomplete gamma.""" + max_iter = 1000 + epsilon = 1e-10 + + result = 1.0 / k + term = result + + for n in range(1, max_iter): + term *= x / (k + n) + result += term + if abs(term) < epsilon: + break + + return result * math.exp(-x + k * math.log(x) - math.lgamma(k)) + + def _gamma_cf(self, x: float, k: float) -> float: + """Continued fraction for upper incomplete gamma.""" + max_iter = 1000 + epsilon = 1e-10 + tiny = 1e-30 + + b = x + 1.0 - k + c = 1.0 / tiny + d = 1.0 / b + h = d + + for i in range(1, max_iter): + a = -i * (i - k) + b += 2.0 + d = a * d + b + if abs(d) < tiny: + d = tiny + c = b + a / c + if abs(c) < tiny: + c = tiny + d = 1.0 / d + delta = d * c + h *= delta + if abs(delta - 1.0) < epsilon: + break + + return h * math.exp(-x + k * math.log(x) - math.lgamma(k)) + + def _normal_cdf(self, x: float) -> float: + """Approximation of standard normal CDF.""" + a1 = 0.254829592 + a2 = -0.284496736 + a3 = 1.421413741 + a4 = -1.453152027 + a5 = 1.061405429 + p = 0.3275911 + + sign = 1 if x >= 0 else -1 + x = abs(x) / math.sqrt(2.0) + + t = 1.0 / (1.0 + p * x) + y = 1.0 - (((((a5 * t + a4) * t) + a3) * t + a2) * t + a1) * t * math.exp(-x * x) + + return 0.5 * (1.0 + sign * y) diff --git a/patternanalyzer/plugins/kolmogorov_smirnov.py b/patternanalyzer/plugins/kolmogorov_smirnov.py new file mode 100644 index 0000000..42e1538 --- /dev/null +++ b/patternanalyzer/plugins/kolmogorov_smirnov.py @@ -0,0 +1,117 @@ +"""Kolmogorov-Smirnov test plugin for uniformity. + +The Kolmogorov-Smirnov (K-S) test checks whether the cumulative distribution +of byte values matches the expected uniform distribution. It's a non-parametric +test that measures the maximum deviation between observed and expected CDFs. 
+""" + +import math +from typing import Dict, Any + +try: + from ..plugin_api import BytesView, TestResult, TestPlugin +except Exception: + from patternanalyzer.plugin_api import BytesView, TestResult, TestPlugin # type: ignore + + +class KolmogorovSmirnovTest(TestPlugin): + """Kolmogorov-Smirnov test for byte value uniformity.""" + + def describe(self) -> str: + """Return plugin description.""" + return "Kolmogorov-Smirnov test for uniformity of byte distribution" + + def run(self, data: BytesView, params: dict) -> TestResult: + """Run K-S test in batch mode.""" + data_bytes = data.to_bytes() + n = len(data_bytes) + + if n == 0: + return TestResult( + test_name="kolmogorov_smirnov", + passed=True, + p_value=1.0, + category="statistical", + metrics={"total_bytes": 0, "ks_statistic": 0.0}, + ) + + # Sort the byte values + sorted_bytes = sorted(data_bytes) + + # Calculate empirical CDF at each unique value + # Expected CDF for uniform distribution: F(x) = (x + 1) / 256 + max_deviation = 0.0 + + for i, byte_val in enumerate(sorted_bytes): + # Empirical CDF at this point + empirical_cdf = (i + 1) / n + + # Expected CDF for uniform distribution over [0, 255] + expected_cdf = (byte_val + 1) / 256.0 + + # Calculate deviation + deviation = abs(empirical_cdf - expected_cdf) + max_deviation = max(max_deviation, deviation) + + # K-S statistic + ks_statistic = max_deviation + + # Calculate p-value using K-S distribution + # For large n, use asymptotic approximation + p_value = self._ks_pvalue(ks_statistic, n) + + # Determine if test passed + alpha = float(params.get("alpha", 0.01)) + passed = p_value > alpha + + return TestResult( + test_name="kolmogorov_smirnov", + passed=passed, + p_value=p_value, + category="statistical", + metrics={ + "total_bytes": n, + "ks_statistic": ks_statistic, + "max_deviation": max_deviation, + }, + p_values={"kolmogorov_smirnov": p_value}, + ) + + def _ks_pvalue(self, d: float, n: int) -> float: + """Calculate p-value for K-S statistic using 
asymptotic formula. + + Uses the Kolmogorov distribution for large n. + """ + if d <= 0: + return 1.0 + + if n < 1: + return 1.0 + + # For small samples, use exact distribution (simplified) + if n < 35: + # Use a conservative approximation + lambda_val = (math.sqrt(n) + 0.12 + 0.11 / math.sqrt(n)) * d + else: + # Asymptotic approximation: λ = sqrt(n) * D + lambda_val = math.sqrt(n) * d + + # Calculate p-value using Kolmogorov distribution + # P(D > d) ≈ 2 * sum_{k=1}^∞ (-1)^(k-1) * exp(-2k²λ²) + # We compute a few terms for practical convergence + p_value = 0.0 + max_terms = 100 + + for k in range(1, max_terms + 1): + term = (-1) ** (k - 1) * math.exp(-2 * k * k * lambda_val * lambda_val) + p_value += term + # Stop if terms become negligible + if abs(term) < 1e-10: + break + + p_value *= 2.0 + + # Ensure p-value is in valid range + p_value = max(0.0, min(1.0, p_value)) + + return p_value diff --git a/patternanalyzer/plugins/permutation_test.py b/patternanalyzer/plugins/permutation_test.py new file mode 100644 index 0000000..305e4b8 --- /dev/null +++ b/patternanalyzer/plugins/permutation_test.py @@ -0,0 +1,245 @@ +"""Permutation test plugin for randomness. + +The Permutation test divides the bit sequence into non-overlapping blocks +and examines the different orderings (permutations) of values within each block. +For truly random data, all possible permutations should occur with roughly equal probability. 
+""" + +import math +from collections import Counter +from typing import Dict, Any + +try: + from ..plugin_api import BytesView, TestResult, TestPlugin +except Exception: + from patternanalyzer.plugin_api import BytesView, TestResult, TestPlugin # type: ignore + + +class PermutationTest(TestPlugin): + """Permutation test for randomness of byte sequences.""" + + def __init__(self): + """Initialize the plugin.""" + pass + + def describe(self) -> str: + """Return plugin description.""" + return "Permutation test analyzing ordering patterns in byte blocks" + + def run(self, data: BytesView, params: dict) -> TestResult: + """Run Permutation test in batch mode.""" + data_bytes = data.to_bytes() + n = len(data_bytes) + + # Block size (number of bytes per block) + block_size = params.get("block_size", 3) + + if block_size < 2 or block_size > 5: + # Practical limits: 2! = 2 to 5! = 120 permutations + return TestResult( + test_name="permutation", + passed=True, + p_value=1.0, + category="statistical", + metrics={ + "total_bytes": n, + "block_size": block_size, + "status": "skipped_invalid_block_size", + }, + ) + + # Number of complete blocks + num_blocks = n // block_size + + if num_blocks < 20: + # Need sufficient blocks for meaningful analysis + return TestResult( + test_name="permutation", + passed=True, + p_value=1.0, + category="statistical", + metrics={ + "total_bytes": n, + "block_size": block_size, + "num_blocks": num_blocks, + "status": "skipped_insufficient_blocks", + }, + ) + + # Count permutation patterns + permutation_counts = Counter() + + for i in range(num_blocks): + start = i * block_size + block = data_bytes[start:start + block_size] + + # Convert block to permutation pattern (rank ordering) + perm_pattern = self._to_permutation_pattern(block) + permutation_counts[perm_pattern] += 1 + + # Calculate expected number of each permutation + num_permutations = math.factorial(block_size) + expected_count = num_blocks / num_permutations + + # Calculate chi-square 
statistic + chi_square = 0.0 + for perm_id in range(num_permutations): + observed = permutation_counts.get(perm_id, 0) + chi_square += ((observed - expected_count) ** 2) / expected_count + + # Degrees of freedom = k! - 1 + df = num_permutations - 1 + + # Calculate p-value + p_value = 1.0 - self._chi_square_cdf(chi_square, df) + + # Determine if test passed + alpha = float(params.get("alpha", 0.01)) + passed = p_value > alpha + + # Calculate additional metrics + unique_permutations = len(permutation_counts) + + return TestResult( + test_name="permutation", + passed=passed, + p_value=p_value, + category="statistical", + metrics={ + "total_bytes": n, + "block_size": block_size, + "num_blocks": num_blocks, + "chi_square_statistic": chi_square, + "degrees_of_freedom": df, + "unique_permutations": unique_permutations, + "possible_permutations": num_permutations, + }, + p_values={"permutation": p_value}, + ) + + def _to_permutation_pattern(self, block: bytes) -> int: + """Convert a byte block to its permutation pattern ID. + + Maps the relative ordering of values to a unique permutation ID. + For example, [5, 2, 8] -> [1, 0, 2] (ranks) -> permutation ID + """ + # Create list of (value, original_index) pairs + indexed = [(val, idx) for idx, val in enumerate(block)] + + # Sort by value, preserving original indices for ties + indexed.sort(key=lambda x: (x[0], x[1])) + + # Create rank array (which position each element goes to) + ranks = [0] * len(block) + for rank, (val, orig_idx) in enumerate(indexed): + ranks[orig_idx] = rank + + # Convert rank permutation to a unique ID using factorial number system + return self._permutation_to_id(ranks) + + def _permutation_to_id(self, perm: list) -> int: + """Convert a permutation to a unique integer ID using Lehmer code. + + Uses the factorial number system (also called Lehmer code). 
+ """ + n = len(perm) + perm_id = 0 + + # Create a working copy + available = list(range(n)) + + for i in range(n): + # Find position of perm[i] in available + pos = available.index(perm[i]) + + # Add contribution to permutation ID + perm_id = perm_id * (n - i) + pos + + # Remove used element + available.pop(pos) + + return perm_id + + def _chi_square_cdf(self, x: float, df: int) -> float: + """Approximate chi-square cumulative distribution function.""" + if x <= 0: + return 0.0 + + if df > 100: + # Wilson-Hilferty transformation for large df + z = ((x / df) ** (1.0/3.0) - (1.0 - 2.0/(9.0*df))) / math.sqrt(2.0/(9.0*df)) + return self._normal_cdf(z) + + # Use incomplete gamma function + return self._gamma_cdf(x / 2.0, df / 2.0) + + def _gamma_cdf(self, x: float, k: float) -> float: + """Approximate gamma CDF.""" + if x <= 0: + return 0.0 + + if x * k < 1.0: + return self._gamma_series(x, k) + else: + return 1.0 - self._gamma_cf(x, k) + + def _gamma_series(self, x: float, k: float) -> float: + """Series expansion for lower incomplete gamma.""" + max_iter = 1000 + epsilon = 1e-10 + + result = 1.0 / k + term = result + + for n in range(1, max_iter): + term *= x / (k + n) + result += term + if abs(term) < epsilon: + break + + return result * math.exp(-x + k * math.log(x) - math.lgamma(k)) + + def _gamma_cf(self, x: float, k: float) -> float: + """Continued fraction for upper incomplete gamma.""" + max_iter = 1000 + epsilon = 1e-10 + tiny = 1e-30 + + b = x + 1.0 - k + c = 1.0 / tiny + d = 1.0 / b + h = d + + for i in range(1, max_iter): + a = -i * (i - k) + b += 2.0 + d = a * d + b + if abs(d) < tiny: + d = tiny + c = b + a / c + if abs(c) < tiny: + c = tiny + d = 1.0 / d + delta = d * c + h *= delta + if abs(delta - 1.0) < epsilon: + break + + return h * math.exp(-x + k * math.log(x) - math.lgamma(k)) + + def _normal_cdf(self, x: float) -> float: + """Approximation of standard normal CDF.""" + a1 = 0.254829592 + a2 = -0.284496736 + a3 = 1.421413741 + a4 = -1.453152027 + 
a5 = 1.061405429 + p = 0.3275911 + + sign = 1 if x >= 0 else -1 + x = abs(x) / math.sqrt(2.0) + + t = 1.0 / (1.0 + p * x) + y = 1.0 - (((((a5 * t + a4) * t) + a3) * t + a2) * t + a1) * t * math.exp(-x * x) + + return 0.5 * (1.0 + sign * y) diff --git a/patternanalyzer/plugins/poker_test.py b/patternanalyzer/plugins/poker_test.py new file mode 100644 index 0000000..acf4ff0 --- /dev/null +++ b/patternanalyzer/plugins/poker_test.py @@ -0,0 +1,297 @@ +"""Poker test plugin for randomness. + +The Poker test divides the bit sequence into fixed-size segments (hands) +and examines the distribution of different patterns within each hand. +It uses chi-square to compare observed pattern frequencies with expected values. +""" + +import math +from collections import Counter +from typing import Dict, Any + +try: + from ..plugin_api import BytesView, TestResult, TestPlugin +except Exception: + from patternanalyzer.plugin_api import BytesView, TestResult, TestPlugin # type: ignore + + +class PokerTest(TestPlugin): + """Poker test for randomness of bit sequences.""" + + requires = ['bits'] + + def __init__(self): + """Initialize the plugin with streaming state.""" + # Streaming accumulators + self._pattern_counts = Counter() + self._total_hands = 0 + self._hand_size = 4 # Default hand size + + def describe(self) -> str: + """Return plugin description.""" + return "Poker test analyzing pattern distribution in fixed-size bit segments" + + def run(self, data: BytesView, params: dict) -> TestResult: + """Run Poker test in batch mode.""" + bits = data.bit_view() + n = len(bits) + + # Hand size (m bits per hand) + hand_size = params.get("hand_size", 4) + + if hand_size < 2 or hand_size > 8: + # Practical limits for hand size + return TestResult( + test_name="poker", + passed=True, + p_value=1.0, + category="statistical", + metrics={ + "total_bits": n, + "hand_size": hand_size, + "status": "skipped_invalid_hand_size", + }, + ) + + # Number of complete hands + num_hands = n // hand_size + + 
if num_hands < 50: + # Need sufficient hands for meaningful analysis + return TestResult( + test_name="poker", + passed=True, + p_value=1.0, + category="statistical", + metrics={ + "total_bits": n, + "hand_size": hand_size, + "num_hands": num_hands, + "status": "skipped_insufficient_hands", + }, + ) + + # Count pattern frequencies + pattern_counts = Counter() + for i in range(num_hands): + start = i * hand_size + hand = tuple(bits[start:start + hand_size]) + pattern_counts[hand] += 1 + + # Calculate observed frequencies + num_patterns = 2 ** hand_size + expected_count = num_hands / num_patterns + + # Calculate chi-square statistic + chi_square = 0.0 + for pattern in range(num_patterns): + # Convert pattern number to tuple of bits + pattern_tuple = tuple((pattern >> i) & 1 for i in range(hand_size - 1, -1, -1)) + observed = pattern_counts.get(pattern_tuple, 0) + chi_square += ((observed - expected_count) ** 2) / expected_count + + # Degrees of freedom = 2^m - 1 + df = num_patterns - 1 + + # Calculate p-value + p_value = 1.0 - self._chi_square_cdf(chi_square, df) + + # Ensure p_value is in valid range [0, 1] + p_value = max(0.0, min(1.0, p_value)) + + # Determine if test passed + alpha = float(params.get("alpha", 0.01)) + passed = p_value > alpha + + # Calculate additional metrics + unique_patterns = len(pattern_counts) + + return TestResult( + test_name="poker", + passed=passed, + p_value=p_value, + category="statistical", + metrics={ + "total_bits": n, + "hand_size": hand_size, + "num_hands": num_hands, + "chi_square_statistic": chi_square, + "degrees_of_freedom": df, + "unique_patterns": unique_patterns, + "possible_patterns": num_patterns, + }, + p_values={"poker": p_value}, + ) + + def update(self, chunk: bytes, params: dict) -> None: + """Update internal accumulators with a chunk of raw bytes.""" + if not chunk: + return + + from ..plugin_api import BytesView + bv = BytesView(chunk) + bits = bv.bit_view() + + # Get hand size from params (or use instance 
default) + hand_size = params.get("hand_size", self._hand_size) + + # Process complete hands only + num_hands = len(bits) // hand_size + + for i in range(num_hands): + start = i * hand_size + hand = tuple(bits[start:start + hand_size]) + self._pattern_counts[hand] += 1 + + self._total_hands += num_hands + self._hand_size = hand_size + + def finalize(self, params: dict) -> TestResult: + """Finalize streaming aggregation and return TestResult.""" + num_hands = self._total_hands + hand_size = params.get("hand_size", self._hand_size) + pattern_counts = self._pattern_counts + + # Reset accumulators for possible reuse + self._pattern_counts = Counter() + self._total_hands = 0 + + if num_hands < 50: + return TestResult( + test_name="poker", + passed=True, + p_value=1.0, + category="statistical", + metrics={ + "hand_size": hand_size, + "num_hands": num_hands, + "status": "skipped_insufficient_hands", + }, + ) + + # Calculate observed frequencies + num_patterns = 2 ** hand_size + expected_count = num_hands / num_patterns + + # Calculate chi-square statistic + chi_square = 0.0 + for pattern in range(num_patterns): + # Convert pattern number to tuple of bits + pattern_tuple = tuple((pattern >> i) & 1 for i in range(hand_size - 1, -1, -1)) + observed = pattern_counts.get(pattern_tuple, 0) + chi_square += ((observed - expected_count) ** 2) / expected_count + + # Degrees of freedom = 2^m - 1 + df = num_patterns - 1 + + # Calculate p-value + p_value = 1.0 - self._chi_square_cdf(chi_square, df) + + # Ensure p_value is in valid range [0, 1] + p_value = max(0.0, min(1.0, p_value)) + + # Determine if test passed + alpha = float(params.get("alpha", 0.01)) + passed = p_value > alpha + + unique_patterns = len(pattern_counts) + + return TestResult( + test_name="poker", + passed=passed, + p_value=p_value, + category="statistical", + metrics={ + "hand_size": hand_size, + "num_hands": num_hands, + "chi_square_statistic": chi_square, + "degrees_of_freedom": df, + "unique_patterns": 
unique_patterns, + "possible_patterns": num_patterns, + }, + p_values={"poker": p_value}, + ) + + def _chi_square_cdf(self, x: float, df: int) -> float: + """Approximate chi-square cumulative distribution function.""" + if x <= 0: + return 0.0 + + if df > 100: + # Wilson-Hilferty transformation for large df + z = ((x / df) ** (1.0/3.0) - (1.0 - 2.0/(9.0*df))) / math.sqrt(2.0/(9.0*df)) + return self._normal_cdf(z) + + # Use incomplete gamma function + return self._gamma_cdf(x / 2.0, df / 2.0) + + def _gamma_cdf(self, x: float, k: float) -> float: + """Approximate gamma CDF.""" + if x <= 0: + return 0.0 + + if x * k < 1.0: + return self._gamma_series(x, k) + else: + return 1.0 - self._gamma_cf(x, k) + + def _gamma_series(self, x: float, k: float) -> float: + """Series expansion for lower incomplete gamma.""" + max_iter = 1000 + epsilon = 1e-10 + + result = 1.0 / k + term = result + + for n in range(1, max_iter): + term *= x / (k + n) + result += term + if abs(term) < epsilon: + break + + return result * math.exp(-x + k * math.log(x) - math.lgamma(k)) + + def _gamma_cf(self, x: float, k: float) -> float: + """Continued fraction for upper incomplete gamma.""" + max_iter = 1000 + epsilon = 1e-10 + tiny = 1e-30 + + b = x + 1.0 - k + c = 1.0 / tiny + d = 1.0 / b + h = d + + for i in range(1, max_iter): + a = -i * (i - k) + b += 2.0 + d = a * d + b + if abs(d) < tiny: + d = tiny + c = b + a / c + if abs(c) < tiny: + c = tiny + d = 1.0 / d + delta = d * c + h *= delta + if abs(delta - 1.0) < epsilon: + break + + return h * math.exp(-x + k * math.log(x) - math.lgamma(k)) + + def _normal_cdf(self, x: float) -> float: + """Approximation of standard normal CDF.""" + a1 = 0.254829592 + a2 = -0.284496736 + a3 = 1.421413741 + a4 = -1.453152027 + a5 = 1.061405429 + p = 0.3275911 + + sign = 1 if x >= 0 else -1 + x = abs(x) / math.sqrt(2.0) + + t = 1.0 / (1.0 + p * x) + y = 1.0 - (((((a5 * t + a4) * t) + a3) * t + a2) * t + a1) * t * math.exp(-x * x) + + return 0.5 * (1.0 + sign * y) 
diff --git a/pyproject.toml b/pyproject.toml index 2c70aac..f2ef38b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -100,6 +100,11 @@ pdf_structure = "patternanalyzer.plugins.pdf_structure:PDFStructure" ecb_detector = "patternanalyzer.plugins.ecb_detector:ECBDetector" frequency_pattern = "patternanalyzer.plugins.frequency_pattern:FrequencyPattern" known_constants_search = "patternanalyzer.plugins.known_constants_search:KnownConstantsSearch" +chi_square = "patternanalyzer.plugins.chi_square:ChiSquareTest" +kolmogorov_smirnov = "patternanalyzer.plugins.kolmogorov_smirnov:KolmogorovSmirnovTest" +gap = "patternanalyzer.plugins.gap_test:GapTest" +poker = "patternanalyzer.plugins.poker_test:PokerTest" +permutation = "patternanalyzer.plugins.permutation_test:PermutationTest" [project.urls] Homepage = "https://github.com/EdgeTypE/pattern-analyzer" diff --git a/tests/test_chi_square.py b/tests/test_chi_square.py new file mode 100644 index 0000000..b22db64 --- /dev/null +++ b/tests/test_chi_square.py @@ -0,0 +1,174 @@ +"""Tests for Chi-Square test plugin.""" + +import pytest +from patternanalyzer.plugins.chi_square import ChiSquareTest +from patternanalyzer.plugin_api import BytesView, TestResult + + +class TestChiSquareTest: + """Test cases for ChiSquareTest.""" + + def setup_method(self): + """Setup test fixtures.""" + self.plugin = ChiSquareTest() + + def test_describe(self): + """Test plugin description.""" + desc = self.plugin.describe() + assert isinstance(desc, str) + assert len(desc) > 0 + assert "chi" in desc.lower() or "square" in desc.lower() + + def test_empty_data(self): + """Test with empty data.""" + data = BytesView(b'') + params = {} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "chi_square" + assert result.passed is True + assert result.p_value == 1.0 + assert result.metrics["total_bytes"] == 0 + + def test_uniform_distribution(self): + """Test with uniformly distributed data.""" + # 
Create data with each byte value appearing roughly equally + data_bytes = bytearray() + for i in range(256): + data_bytes.extend([i] * 100) # Each byte value appears 100 times + + data = BytesView(bytes(data_bytes)) + params = {} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "chi_square" + assert result.passed is True # Should pass for uniform distribution + assert 0.0 <= result.p_value <= 1.0 + assert result.p_value > 0.1 # Should have high p-value for uniform data + assert result.metrics["total_bytes"] == 25600 + assert result.metrics["unique_bytes"] == 256 + + def test_biased_distribution(self): + """Test with highly biased data (all same byte).""" + # Create data with all zeros + data = BytesView(b'\x00' * 1000) + params = {} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "chi_square" + assert result.passed is False # Should fail for biased data + assert 0.0 <= result.p_value <= 1.0 + assert result.p_value < 0.01 # Should have very low p-value + assert result.metrics["total_bytes"] == 1000 + assert result.metrics["unique_bytes"] == 1 + + def test_moderately_biased_distribution(self): + """Test with moderately biased data.""" + # Create data with some bytes appearing more frequently + data_bytes = bytearray() + data_bytes.extend([0] * 500) # Byte 0 appears 500 times + data_bytes.extend([1] * 300) # Byte 1 appears 300 times + for i in range(2, 102): + data_bytes.extend([i] * 2) # Other bytes appear less frequently + + data = BytesView(bytes(data_bytes)) + params = {} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "chi_square" + assert 0.0 <= result.p_value <= 1.0 + assert result.metrics["total_bytes"] == 1000 + assert result.metrics["unique_bytes"] == 102 + + def test_small_sample(self): + """Test with small data sample.""" + data = 
BytesView(b'\x00\x01\x02\x03\x04') + params = {} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "chi_square" + assert 0.0 <= result.p_value <= 1.0 + assert result.metrics["total_bytes"] == 5 + + def test_streaming_matches_batch(self): + """Test that streaming mode produces same result as batch mode.""" + # Create test data + data_bytes = bytearray() + for i in range(256): + data_bytes.extend([i] * 50) + + # Batch mode + batch_plugin = ChiSquareTest() + data = BytesView(bytes(data_bytes)) + batch_result = batch_plugin.run(data, {}) + + # Streaming mode + stream_plugin = ChiSquareTest() + chunk_size = 1000 + for i in range(0, len(data_bytes), chunk_size): + chunk = bytes(data_bytes[i:i + chunk_size]) + stream_plugin.update(chunk, {}) + stream_result = stream_plugin.finalize({}) + + # Results should be identical + assert batch_result.passed == stream_result.passed + assert abs(batch_result.p_value - stream_result.p_value) < 1e-10 + assert batch_result.metrics["total_bytes"] == stream_result.metrics["total_bytes"] + assert abs(batch_result.metrics["chi_square_statistic"] - + stream_result.metrics["chi_square_statistic"]) < 1e-10 + + def test_custom_alpha(self): + """Test with custom alpha parameter.""" + # Create biased data + data = BytesView(b'\xFF' * 1000) + params = {"alpha": 0.05} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "chi_square" + assert result.passed is False + assert result.p_value < 0.05 + + def test_p_value_range(self): + """Test that p_value is always in valid range.""" + test_cases = [ + b'\x00' * 500, + b'\xFF' * 500, + bytes(range(256)) * 4, + bytes([i % 256 for i in range(1000)]), + ] + + for test_data in test_cases: + data = BytesView(test_data) + result = self.plugin.run(data, {}) + assert 0.0 <= result.p_value <= 1.0, f"Invalid p_value: {result.p_value}" + + def test_chi_square_statistic_positive(self): + 
"""Test that chi-square statistic is always non-negative.""" + test_cases = [ + b'\x00' * 100, + bytes(range(256)), + bytes([i % 10 for i in range(1000)]), + ] + + for test_data in test_cases: + data = BytesView(test_data) + result = self.plugin.run(data, {}) + assert result.metrics["chi_square_statistic"] >= 0.0 + + def test_degrees_of_freedom(self): + """Test that degrees of freedom is always 255.""" + data = BytesView(bytes(range(256)) * 10) + result = self.plugin.run(data, {}) + assert result.metrics["degrees_of_freedom"] == 255 diff --git a/tests/test_gap_test.py b/tests/test_gap_test.py new file mode 100644 index 0000000..7c54941 --- /dev/null +++ b/tests/test_gap_test.py @@ -0,0 +1,176 @@ +"""Tests for Gap test plugin.""" + +import pytest +from patternanalyzer.plugins.gap_test import GapTest +from patternanalyzer.plugin_api import BytesView, TestResult + + +class TestGapTest: + """Test cases for GapTest.""" + + def setup_method(self): + """Setup test fixtures.""" + self.plugin = GapTest() + + def test_describe(self): + """Test plugin description.""" + desc = self.plugin.describe() + assert isinstance(desc, str) + assert len(desc) > 0 + assert "gap" in desc.lower() + + def test_insufficient_data(self): + """Test with insufficient data.""" + data = BytesView(b'\x00\x01\x02') + params = {} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "gap" + assert result.passed is True + assert result.p_value == 1.0 + assert result.metrics["status"] == "skipped_insufficient_data" + + def test_random_like_data(self): + """Test with random-like data.""" + # Create data with good mix of bits + data_bytes = bytearray() + for i in range(200): + data_bytes.append(0xAA if i % 2 == 0 else 0x55) # Alternating patterns + + data = BytesView(bytes(data_bytes)) + params = {} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "gap" + assert 0.0 <= 
result.p_value <= 1.0 + assert result.metrics["total_bits"] == 1600 + + def test_pattern_with_regular_gaps(self): + """Test with pattern having regular gaps.""" + # Create pattern: 1 followed by several 0s, repeated + bits = [] + for _ in range(50): + bits.extend([1, 0, 0, 0, 0, 0, 0, 0]) # Pattern with regular gaps + + # Convert bits to bytes + data_bytes = bytearray() + for i in range(0, len(bits), 8): + byte_val = 0 + for j in range(8): + if i + j < len(bits): + byte_val |= (bits[i + j] << (7 - j)) + data_bytes.append(byte_val) + + data = BytesView(bytes(data_bytes)) + params = {} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "gap" + assert 0.0 <= result.p_value <= 1.0 + + def test_all_zeros(self): + """Test with all zeros (no pattern occurrences).""" + data = BytesView(b'\x00' * 100) + params = {} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "gap" + # Should skip due to insufficient occurrences of pattern + assert result.metrics.get("status") in [ + "skipped_insufficient_occurrences", + "skipped_insufficient_gaps", + None + ] or result.passed is not None + + def test_all_ones(self): + """Test with all ones (frequent pattern occurrences).""" + data = BytesView(b'\xFF' * 100) + params = {} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "gap" + assert 0.0 <= result.p_value <= 1.0 + + def test_custom_pattern(self): + """Test with custom bit pattern.""" + # Create test data with specific pattern + data = BytesView(b'\xAA' * 100) # 10101010 pattern + params = {"pattern": [1, 0]} # Look for "10" pattern + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "gap" + assert 0.0 <= result.p_value <= 1.0 + + def test_p_value_range(self): + """Test that p_value is always in valid range.""" + test_cases = [ + 
b'\xAA' * 100, + b'\x55' * 100, + b'\xCC' * 100, + b'\x33' * 100, + ] + + for test_data in test_cases: + data = BytesView(test_data) + result = self.plugin.run(data, {}) + assert 0.0 <= result.p_value <= 1.0, f"Invalid p_value: {result.p_value}" + + def test_gap_metrics(self): + """Test that gap metrics are calculated.""" + data = BytesView(b'\xF0' * 100) # 11110000 pattern + params = {} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "gap" + + # Check that metrics are present when test runs + if result.metrics.get("status") is None: + assert "gap_count" in result.metrics + assert "pattern_occurrences" in result.metrics + assert result.metrics["gap_count"] >= 0 + + def test_large_data(self): + """Test with larger data set.""" + # Create larger random-like data + data_bytes = bytearray() + for i in range(500): + data_bytes.append((i * 137) % 256) # Pseudo-random pattern + + data = BytesView(bytes(data_bytes)) + params = {} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "gap" + assert 0.0 <= result.p_value <= 1.0 + + def test_custom_alpha(self): + """Test with custom alpha parameter.""" + data = BytesView(b'\xAA' * 200) + params = {"alpha": 0.05} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "gap" + assert 0.0 <= result.p_value <= 1.0 + + def test_requires_bits(self): + """Test that plugin requires bits.""" + assert hasattr(self.plugin, 'requires') + assert 'bits' in self.plugin.requires diff --git a/tests/test_kolmogorov_smirnov.py b/tests/test_kolmogorov_smirnov.py new file mode 100644 index 0000000..b2d5bd0 --- /dev/null +++ b/tests/test_kolmogorov_smirnov.py @@ -0,0 +1,169 @@ +"""Tests for Kolmogorov-Smirnov test plugin.""" + +import pytest +from patternanalyzer.plugins.kolmogorov_smirnov import KolmogorovSmirnovTest +from patternanalyzer.plugin_api import 
BytesView, TestResult + + +class TestKolmogorovSmirnovTest: + """Test cases for KolmogorovSmirnovTest.""" + + def setup_method(self): + """Setup test fixtures.""" + self.plugin = KolmogorovSmirnovTest() + + def test_describe(self): + """Test plugin description.""" + desc = self.plugin.describe() + assert isinstance(desc, str) + assert len(desc) > 0 + + def test_empty_data(self): + """Test with empty data.""" + data = BytesView(b'') + params = {} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "kolmogorov_smirnov" + assert result.passed is True + assert result.p_value == 1.0 + assert result.metrics["total_bytes"] == 0 + + def test_uniform_distribution(self): + """Test with uniformly distributed data.""" + # Create data with each byte value appearing equally + data_bytes = bytearray() + for i in range(256): + data_bytes.extend([i] * 100) + + data = BytesView(bytes(data_bytes)) + params = {} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "kolmogorov_smirnov" + assert result.passed is True + assert 0.0 <= result.p_value <= 1.0 + assert result.p_value > 0.1 # Should have high p-value for uniform data + assert result.metrics["total_bytes"] == 25600 + + def test_biased_distribution(self): + """Test with highly biased data.""" + # Create data with all same byte value + data = BytesView(b'\x00' * 1000) + params = {} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "kolmogorov_smirnov" + assert result.passed is False + assert 0.0 <= result.p_value <= 1.0 + assert result.p_value < 0.01 + assert result.metrics["total_bytes"] == 1000 + + def test_skewed_distribution(self): + """Test with skewed distribution.""" + # Create data skewed towards lower byte values + data_bytes = bytearray() + for i in range(128): + data_bytes.extend([i] * 10) + for i in range(128, 256): + 
data_bytes.extend([i] * 2) + + data = BytesView(bytes(data_bytes)) + params = {} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "kolmogorov_smirnov" + assert 0.0 <= result.p_value <= 1.0 + assert result.metrics["total_bytes"] == 1536 + + def test_small_sample(self): + """Test with small data sample.""" + data = BytesView(b'\x00\x01\x02\x03\x04') + params = {} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "kolmogorov_smirnov" + assert 0.0 <= result.p_value <= 1.0 + assert result.metrics["total_bytes"] == 5 + + def test_sequential_data(self): + """Test with sequential byte values.""" + data = BytesView(bytes(range(256))) + params = {} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "kolmogorov_smirnov" + assert 0.0 <= result.p_value <= 1.0 + assert result.metrics["total_bytes"] == 256 + + def test_custom_alpha(self): + """Test with custom alpha parameter.""" + data = BytesView(b'\xFF' * 1000) + params = {"alpha": 0.05} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "kolmogorov_smirnov" + assert result.passed is False + assert result.p_value < 0.05 + + def test_ks_statistic_range(self): + """Test that K-S statistic is in valid range [0, 1].""" + test_cases = [ + b'\x00' * 500, + b'\xFF' * 500, + bytes(range(256)) * 4, + bytes([i % 256 for i in range(1000)]), + ] + + for test_data in test_cases: + data = BytesView(test_data) + result = self.plugin.run(data, {}) + assert 0.0 <= result.metrics["ks_statistic"] <= 1.0 + + def test_p_value_range(self): + """Test that p_value is always in valid range.""" + test_cases = [ + b'\x00' * 500, + b'\xFF' * 500, + bytes(range(256)) * 4, + bytes([i % 256 for i in range(1000)]), + ] + + for test_data in test_cases: + data = BytesView(test_data) + result = 
self.plugin.run(data, {}) + assert 0.0 <= result.p_value <= 1.0, f"Invalid p_value: {result.p_value}" + + def test_max_deviation_equals_statistic(self): + """Test that max_deviation equals ks_statistic.""" + data = BytesView(bytes(range(256)) * 10) + result = self.plugin.run(data, {}) + + assert result.metrics["max_deviation"] == result.metrics["ks_statistic"] + + def test_repeated_values(self): + """Test with repeated byte values.""" + # Create data with multiple occurrences of same values + data = BytesView(b'\x10\x20\x30' * 100) + params = {} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "kolmogorov_smirnov" + assert 0.0 <= result.p_value <= 1.0 + assert result.metrics["total_bytes"] == 300 diff --git a/tests/test_permutation_test.py b/tests/test_permutation_test.py new file mode 100644 index 0000000..7f5e82d --- /dev/null +++ b/tests/test_permutation_test.py @@ -0,0 +1,222 @@ +"""Tests for Permutation test plugin.""" + +import pytest +from patternanalyzer.plugins.permutation_test import PermutationTest +from patternanalyzer.plugin_api import BytesView, TestResult + + +class TestPermutationTest: + """Test cases for PermutationTest.""" + + def setup_method(self): + """Setup test fixtures.""" + self.plugin = PermutationTest() + + def test_describe(self): + """Test plugin description.""" + desc = self.plugin.describe() + assert isinstance(desc, str) + assert len(desc) > 0 + assert "permutation" in desc.lower() + + def test_insufficient_blocks(self): + """Test with insufficient data.""" + data = BytesView(b'\x00\x01\x02\x03\x04') + params = {} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "permutation" + assert result.passed is True + assert result.p_value == 1.0 + assert result.metrics["status"] == "skipped_insufficient_blocks" + + def test_uniform_permutation_distribution(self): + """Test with uniformly distributed 
permutations.""" + # Create data with good variety + data_bytes = bytearray() + for i in range(200): + data_bytes.append((i * 137) % 256) + + data = BytesView(bytes(data_bytes)) + params = {"block_size": 3} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "permutation" + assert 0.0 <= result.p_value <= 1.0 + assert result.metrics["block_size"] == 3 + + def test_biased_permutation_distribution(self): + """Test with biased permutation distribution.""" + # All zeros should have only one permutation + data = BytesView(b'\x00' * 100) + params = {"block_size": 3} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "permutation" + assert result.passed is False # Should fail for biased data + assert 0.0 <= result.p_value <= 1.0 + assert result.p_value < 0.01 + + def test_sequential_data(self): + """Test with sequential byte values.""" + # Sequential data has consistent permutation pattern + data = BytesView(bytes(range(100))) + params = {"block_size": 3} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "permutation" + assert 0.0 <= result.p_value <= 1.0 + + def test_block_size_2(self): + """Test with block size of 2.""" + data = BytesView(bytes(range(256))) + params = {"block_size": 2} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "permutation" + assert 0.0 <= result.p_value <= 1.0 + assert result.metrics["block_size"] == 2 + assert result.metrics["possible_permutations"] == 2 # 2! 
+ + def test_block_size_4(self): + """Test with block size of 4.""" + data = BytesView(bytes(range(256))) + params = {"block_size": 4} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "permutation" + assert 0.0 <= result.p_value <= 1.0 + assert result.metrics["block_size"] == 4 + assert result.metrics["possible_permutations"] == 24 # 4! + + def test_invalid_block_size_too_small(self): + """Test with invalid block size (too small).""" + data = BytesView(bytes(range(100))) + params = {"block_size": 1} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "permutation" + assert result.metrics["status"] == "skipped_invalid_block_size" + + def test_invalid_block_size_too_large(self): + """Test with invalid block size (too large).""" + data = BytesView(bytes(range(100))) + params = {"block_size": 6} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "permutation" + assert result.metrics["status"] == "skipped_invalid_block_size" + + def test_repeated_pattern(self): + """Test with repeated byte pattern.""" + data = BytesView(b'\x10\x20\x30' * 50) + params = {"block_size": 3} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "permutation" + # Should fail as all blocks have same permutation + assert result.passed is False + assert 0.0 <= result.p_value <= 1.0 + + def test_reverse_pattern(self): + """Test with reverse ordered pattern.""" + data = BytesView(b'\x30\x20\x10' * 50) + params = {"block_size": 3} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "permutation" + # Should fail as all blocks have same permutation + assert result.passed is False + assert 0.0 <= result.p_value <= 1.0 + + def test_p_value_range(self): + """Test that p_value is always in valid 
range.""" + test_cases = [ + (b'\x00' * 100, 3), + (bytes(range(100)), 3), + (b'\x10\x20\x30' * 30, 3), + (bytes([i % 50 for i in range(200)]), 4), + ] + + for test_data, block_size in test_cases: + data = BytesView(test_data) + result = self.plugin.run(data, {"block_size": block_size}) + assert 0.0 <= result.p_value <= 1.0, f"Invalid p_value: {result.p_value}" + + def test_chi_square_positive(self): + """Test that chi-square statistic is always non-negative.""" + data = BytesView(bytes(range(100))) + params = {"block_size": 3} + result = self.plugin.run(data, params) + + if "chi_square_statistic" in result.metrics: + assert result.metrics["chi_square_statistic"] >= 0.0 + + def test_custom_alpha(self): + """Test with custom alpha parameter.""" + data = BytesView(b'\x00' * 100) + params = {"block_size": 3, "alpha": 0.05} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "permutation" + assert result.passed is False + assert result.p_value < 0.05 + + def test_unique_permutations_metric(self): + """Test that unique permutations metric is calculated.""" + data = BytesView(bytes(range(100))) + params = {"block_size": 3} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert "unique_permutations" in result.metrics + assert "possible_permutations" in result.metrics + assert result.metrics["unique_permutations"] <= result.metrics["possible_permutations"] + + def test_with_ties(self): + """Test with byte blocks containing tied values.""" + # Blocks with repeated values + data = BytesView(b'\x10\x10\x20' * 50) + params = {"block_size": 3} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "permutation" + assert 0.0 <= result.p_value <= 1.0 + + def test_permutation_to_id(self): + """Test the permutation ID conversion.""" + # Test internal method + perm_id_1 = self.plugin._permutation_to_id([0, 1, 2]) + perm_id_2 
= self.plugin._permutation_to_id([2, 1, 0]) + + # Different permutations should have different IDs + assert perm_id_1 != perm_id_2 + + # Same permutation should have same ID + perm_id_3 = self.plugin._permutation_to_id([0, 1, 2]) + assert perm_id_1 == perm_id_3 diff --git a/tests/test_poker_test.py b/tests/test_poker_test.py new file mode 100644 index 0000000..61068a0 --- /dev/null +++ b/tests/test_poker_test.py @@ -0,0 +1,202 @@ +"""Tests for Poker test plugin.""" + +import pytest +from patternanalyzer.plugins.poker_test import PokerTest +from patternanalyzer.plugin_api import BytesView, TestResult + + +class TestPokerTest: + """Test cases for PokerTest.""" + + def setup_method(self): + """Setup test fixtures.""" + self.plugin = PokerTest() + + def test_describe(self): + """Test plugin description.""" + desc = self.plugin.describe() + assert isinstance(desc, str) + assert len(desc) > 0 + assert "poker" in desc.lower() + + def test_insufficient_hands(self): + """Test with insufficient data.""" + data = BytesView(b'\x00\x01\x02') + params = {} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "poker" + assert result.passed is True + assert result.p_value == 1.0 + assert result.metrics["status"] == "skipped_insufficient_hands" + + def test_uniform_pattern_distribution(self): + """Test with uniformly distributed patterns.""" + # Create data with good mix of different patterns + data_bytes = bytearray() + for i in range(200): + data_bytes.append((i * 137) % 256) # Pseudo-random + + data = BytesView(bytes(data_bytes)) + params = {"hand_size": 4} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "poker" + assert 0.0 <= result.p_value <= 1.0 + assert result.metrics["hand_size"] == 4 + + def test_biased_pattern_distribution(self): + """Test with biased pattern distribution.""" + # All zeros should have very biased pattern distribution + data = 
BytesView(b'\x00' * 100) + params = {"hand_size": 4} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "poker" + assert result.passed is False # Should fail for biased data + assert 0.0 <= result.p_value <= 1.0 + assert result.p_value < 0.01 + + def test_alternating_pattern(self): + """Test with alternating bit pattern.""" + # 0xAA = 10101010 + data = BytesView(b'\xAA' * 100) + params = {"hand_size": 4} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "poker" + assert 0.0 <= result.p_value <= 1.0 + + def test_hand_size_3(self): + """Test with hand size of 3 bits.""" + data = BytesView(b'\x55' * 100) # 01010101 pattern + params = {"hand_size": 3} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "poker" + assert 0.0 <= result.p_value <= 1.0 + assert result.metrics["hand_size"] == 3 + assert result.metrics["possible_patterns"] == 8 # 2^3 + + def test_hand_size_5(self): + """Test with hand size of 5 bits.""" + data = BytesView(bytes(range(256)) * 2) + params = {"hand_size": 5} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "poker" + assert 0.0 <= result.p_value <= 1.0 + assert result.metrics["hand_size"] == 5 + assert result.metrics["possible_patterns"] == 32 # 2^5 + + def test_invalid_hand_size_too_small(self): + """Test with invalid hand size (too small).""" + data = BytesView(b'\xAA' * 100) + params = {"hand_size": 1} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "poker" + assert result.metrics["status"] == "skipped_invalid_hand_size" + + def test_invalid_hand_size_too_large(self): + """Test with invalid hand size (too large).""" + data = BytesView(b'\xAA' * 100) + params = {"hand_size": 10} + + result = self.plugin.run(data, params) + + 
assert isinstance(result, TestResult) + assert result.test_name == "poker" + assert result.metrics["status"] == "skipped_invalid_hand_size" + + def test_streaming_matches_batch(self): + """Test that streaming mode produces same result as batch mode.""" + # Create test data + data_bytes = bytearray() + for i in range(200): + data_bytes.append((i * 73) % 256) + + params = {"hand_size": 4} + + # Batch mode + batch_plugin = PokerTest() + data = BytesView(bytes(data_bytes)) + batch_result = batch_plugin.run(data, params) + + # Streaming mode + stream_plugin = PokerTest() + chunk_size = 50 + for i in range(0, len(data_bytes), chunk_size): + chunk = bytes(data_bytes[i:i + chunk_size]) + stream_plugin.update(chunk, params) + stream_result = stream_plugin.finalize(params) + + # Results should be very similar (may have minor differences due to chunk boundaries) + assert batch_result.passed == stream_result.passed + assert abs(batch_result.p_value - stream_result.p_value) < 0.1 + + def test_p_value_range(self): + """Test that p_value is always in valid range.""" + test_cases = [ + (b'\x00' * 100, 4), + (b'\xFF' * 100, 4), + (b'\xAA' * 100, 4), + (bytes(range(256)) * 2, 3), + ] + + for test_data, hand_size in test_cases: + data = BytesView(test_data) + result = self.plugin.run(data, {"hand_size": hand_size}) + assert 0.0 <= result.p_value <= 1.0, f"Invalid p_value: {result.p_value}" + + def test_chi_square_positive(self): + """Test that chi-square statistic is always non-negative.""" + data = BytesView(bytes(range(256)) * 2) + params = {"hand_size": 4} + result = self.plugin.run(data, params) + + if "chi_square_statistic" in result.metrics: + assert result.metrics["chi_square_statistic"] >= 0.0 + + def test_custom_alpha(self): + """Test with custom alpha parameter.""" + data = BytesView(b'\x00' * 100) + params = {"hand_size": 4, "alpha": 0.05} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "poker" + assert 
result.passed is False + assert result.p_value < 0.05 + + def test_requires_bits(self): + """Test that plugin requires bits.""" + assert hasattr(self.plugin, 'requires') + assert 'bits' in self.plugin.requires + + def test_unique_patterns_metric(self): + """Test that unique patterns metric is calculated.""" + data = BytesView(bytes(range(256))) + params = {"hand_size": 4} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert "unique_patterns" in result.metrics + assert "possible_patterns" in result.metrics + assert result.metrics["unique_patterns"] <= result.metrics["possible_patterns"] From 2fa09915d56432d6b2300e160d932da39fa0982a Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 31 Oct 2025 18:14:58 +0000 Subject: [PATCH 3/4] Fix code review issues: simplify chi-square calculation and add p-value bounds check Co-authored-by: EdgeTypE <34396598+EdgeTypE@users.noreply.github.com> --- patternanalyzer/plugins/chi_square.py | 4 ++-- patternanalyzer/plugins/permutation_test.py | 3 +++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/patternanalyzer/plugins/chi_square.py b/patternanalyzer/plugins/chi_square.py index c8eb340..75cc8a7 100644 --- a/patternanalyzer/plugins/chi_square.py +++ b/patternanalyzer/plugins/chi_square.py @@ -55,7 +55,7 @@ def run(self, data: BytesView, params: dict) -> TestResult: observed_bytes = len(counter) missing_bytes = 256 - observed_bytes if missing_bytes > 0: - chi_square += missing_bytes * (expected ** 2 / expected) + chi_square += missing_bytes * expected # Degrees of freedom = 256 - 1 = 255 df = 255 @@ -116,7 +116,7 @@ def finalize(self, params: dict) -> TestResult: observed_bytes = len(counter) missing_bytes = 256 - observed_bytes if missing_bytes > 0: - chi_square += missing_bytes * (expected ** 2 / expected) + chi_square += missing_bytes * expected # Degrees of freedom = 256 - 1 = 255 df = 255 diff --git 
a/patternanalyzer/plugins/permutation_test.py b/patternanalyzer/plugins/permutation_test.py index 305e4b8..42bb35f 100644 --- a/patternanalyzer/plugins/permutation_test.py +++ b/patternanalyzer/plugins/permutation_test.py @@ -93,6 +93,9 @@ def run(self, data: BytesView, params: dict) -> TestResult: # Calculate p-value p_value = 1.0 - self._chi_square_cdf(chi_square, df) + # Ensure p_value is in valid range [0, 1] + p_value = max(0.0, min(1.0, p_value)) + # Determine if test passed alpha = float(params.get("alpha", 0.01)) passed = p_value > alpha From 81b3d9b703f3edfd3f602a508e09d49db5ff1eb5 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 31 Oct 2025 19:10:41 +0000 Subject: [PATCH 4/4] Add 45 comprehensive tests across all plugins (112 total tests) Co-authored-by: EdgeTypE <34396598+EdgeTypE@users.noreply.github.com> --- tests/test_chi_square.py | 118 +++++++++++++++++++++++++++++++ tests/test_gap_test.py | 79 +++++++++++++++++++++ tests/test_kolmogorov_smirnov.py | 88 +++++++++++++++++++++++ tests/test_permutation_test.py | 117 ++++++++++++++++++++++++++++++ tests/test_poker_test.py | 107 ++++++++++++++++++++++++++++ 5 files changed, 509 insertions(+) diff --git a/tests/test_chi_square.py b/tests/test_chi_square.py index b22db64..f709634 100644 --- a/tests/test_chi_square.py +++ b/tests/test_chi_square.py @@ -172,3 +172,121 @@ def test_degrees_of_freedom(self): data = BytesView(bytes(range(256)) * 10) result = self.plugin.run(data, {}) assert result.metrics["degrees_of_freedom"] == 255 + + def test_very_large_data(self): + """Test with very large data sample.""" + # Create 100KB of uniform data + data_bytes = bytearray() + for _ in range(400): + data_bytes.extend(bytes(range(256))) + + data = BytesView(bytes(data_bytes)) + result = self.plugin.run(data, {}) + + assert isinstance(result, TestResult) + assert result.test_name == "chi_square" + assert result.passed is True + assert 0.0 <= result.p_value 
<= 1.0 + assert result.metrics["total_bytes"] == 102400 + + def test_single_byte(self): + """Test with single byte of data.""" + data = BytesView(b'\xFF') + result = self.plugin.run(data, {}) + + assert isinstance(result, TestResult) + assert result.test_name == "chi_square" + assert 0.0 <= result.p_value <= 1.0 + assert result.metrics["total_bytes"] == 1 + + def test_two_values_only(self): + """Test with data containing only two different byte values.""" + data = BytesView(b'\x00\xFF' * 500) + result = self.plugin.run(data, {}) + + assert isinstance(result, TestResult) + assert result.test_name == "chi_square" + assert result.passed is False + assert result.metrics["unique_bytes"] == 2 + assert result.metrics["total_bytes"] == 1000 + + def test_streaming_empty_chunks(self): + """Test streaming with some empty chunks.""" + stream_plugin = ChiSquareTest() + + stream_plugin.update(b'', {}) + stream_plugin.update(bytes(range(256)), {}) + stream_plugin.update(b'', {}) + stream_plugin.update(bytes(range(256)), {}) + + result = stream_plugin.finalize({}) + + assert isinstance(result, TestResult) + assert result.metrics["total_bytes"] == 512 + + def test_streaming_single_byte_chunks(self): + """Test streaming with very small chunks.""" + stream_plugin = ChiSquareTest() + + for i in range(256): + for _ in range(10): + stream_plugin.update(bytes([i]), {}) + + result = stream_plugin.finalize({}) + + assert isinstance(result, TestResult) + assert result.passed is True + assert result.metrics["total_bytes"] == 2560 + + def test_multiple_finalize_calls(self): + """Test that multiple finalize calls reset state.""" + stream_plugin = ChiSquareTest() + + # First run + stream_plugin.update(bytes(range(256)), {}) + result1 = stream_plugin.finalize({}) + + # Second run should start fresh + stream_plugin.update(b'\x00' * 1000, {}) + result2 = stream_plugin.finalize({}) + + assert result1.metrics["total_bytes"] == 256 + assert result2.metrics["total_bytes"] == 1000 + assert 
result1.passed != result2.passed + + def test_category_field(self): + """Test that category is correctly set.""" + data = BytesView(bytes(range(100))) + result = self.plugin.run(data, {}) + assert result.category == "statistical" + + def test_p_values_dict(self): + """Test that p_values dictionary is populated.""" + data = BytesView(bytes(range(256)) * 10) + result = self.plugin.run(data, {}) + assert "chi_square" in result.p_values + assert result.p_values["chi_square"] == result.p_value + + def test_metrics_completeness(self): + """Test that all expected metrics are present.""" + data = BytesView(bytes(range(256)) * 10) + result = self.plugin.run(data, {}) + + expected_metrics = ["total_bytes", "chi_square_statistic", + "degrees_of_freedom", "unique_bytes"] + for metric in expected_metrics: + assert metric in result.metrics + + def test_near_uniform_distribution(self): + """Test with near-uniform but slightly skewed distribution.""" + data_bytes = bytearray() + for i in range(256): + count = 100 + (i % 3) # Slight variation + data_bytes.extend([i] * count) + + data = BytesView(bytes(data_bytes)) + result = self.plugin.run(data, {}) + + assert isinstance(result, TestResult) + assert result.test_name == "chi_square" + assert 0.0 <= result.p_value <= 1.0 diff --git a/tests/test_gap_test.py b/tests/test_gap_test.py index 7c54941..227c374 100644 --- a/tests/test_gap_test.py +++ b/tests/test_gap_test.py @@ -174,3 +174,82 @@ def test_requires_bits(self): """Test that plugin requires bits.""" assert hasattr(self.plugin, 'requires') assert 'bits' in self.plugin.requires + + def test_pattern_length_2(self): + """Test with a 3-bit pattern [1, 0, 1].""" + data = BytesView(b'\xAA' * 200) # 10101010 pattern + params = {"pattern": [1, 0, 1]} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "gap" + assert 0.0 <= result.p_value <= 1.0 + + def test_very_regular_gaps(self): + """Test with perfectly regular gaps (should fail).""" 
+ # Create pattern where '1' appears every 8 bits + bits = [1 if i % 8 == 0 else 0 for i in range(800)] + data_bytes = bytearray() + for i in range(0, len(bits), 8): + byte_val = 0 + for j in range(8): + if i + j < len(bits): + byte_val |= (bits[i + j] << (7 - j)) + data_bytes.append(byte_val) + + data = BytesView(bytes(data_bytes)) + result = self.plugin.run(data, {}) + + assert isinstance(result, TestResult) + assert result.test_name == "gap" + + def test_no_pattern_found(self): + """Test when pattern never occurs.""" + data = BytesView(b'\x00' * 100) # All zeros, looking for '1' + params = {"pattern": [1]} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + # Should skip due to insufficient occurrences + + def test_category_field(self): + """Test that category is correctly set.""" + data = BytesView(b'\xAA' * 200) + result = self.plugin.run(data, {}) + assert result.category == "statistical" + + def test_metrics_for_successful_run(self): + """Test metrics are populated for successful run.""" + data = BytesView(b'\xF0' * 150) + result = self.plugin.run(data, {}) + + if result.metrics.get("status") is None: + assert "gap_count" in result.metrics + assert "pattern_occurrences" in result.metrics + assert "mean_gap" in result.metrics + assert "min_gap" in result.metrics + assert "max_gap" in result.metrics + + def test_pattern_as_list(self): + """Test that pattern parameter accepts list.""" + data = BytesView(b'\xCC' * 200) # 11001100 + params = {"pattern": [1, 1, 0]} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.test_name == "gap" + + def test_very_large_data_gap(self): + """Test gap analysis with large dataset.""" + data_bytes = bytearray() + for i in range(1000): + data_bytes.append((i * 73) % 256) + + data = BytesView(bytes(data_bytes)) + result = self.plugin.run(data, {}) + + assert isinstance(result, TestResult) + assert result.metrics["total_bits"] == 8000 diff --git 
a/tests/test_kolmogorov_smirnov.py b/tests/test_kolmogorov_smirnov.py index b2d5bd0..ac547c3 100644 --- a/tests/test_kolmogorov_smirnov.py +++ b/tests/test_kolmogorov_smirnov.py @@ -167,3 +167,91 @@ def test_repeated_values(self): assert result.test_name == "kolmogorov_smirnov" assert 0.0 <= result.p_value <= 1.0 assert result.metrics["total_bytes"] == 300 + + def test_ascending_sequence(self): + """Test with strictly ascending byte sequence.""" + data = BytesView(bytes(range(256))) + result = self.plugin.run(data, {}) + + assert isinstance(result, TestResult) + assert 0.0 <= result.p_value <= 1.0 + assert result.metrics["total_bytes"] == 256 + + def test_descending_sequence(self): + """Test with strictly descending byte sequence.""" + data = BytesView(bytes(range(255, -1, -1))) + result = self.plugin.run(data, {}) + + assert isinstance(result, TestResult) + assert 0.0 <= result.p_value <= 1.0 + assert result.metrics["total_bytes"] == 256 + + def test_very_large_sample(self): + """Test K-S with very large sample.""" + data_bytes = bytearray() + for _ in range(500): + data_bytes.extend(bytes(range(256))) + + data = BytesView(bytes(data_bytes)) + result = self.plugin.run(data, {}) + + assert isinstance(result, TestResult) + assert result.metrics["total_bytes"] == 128000 + assert 0.0 <= result.p_value <= 1.0 + + def test_single_value_repeated(self): + """Test with single value repeated many times.""" + data = BytesView(b'\x80' * 1000) + result = self.plugin.run(data, {}) + + assert isinstance(result, TestResult) + assert result.passed is False + assert result.p_value < 0.01 + + def test_category_and_p_values(self): + """Test that category and p_values are set correctly.""" + data = BytesView(bytes(range(256)) * 5) + result = self.plugin.run(data, {}) + + assert result.category == "statistical" + assert "kolmogorov_smirnov" in result.p_values + assert result.p_values["kolmogorov_smirnov"] == result.p_value + + def test_ks_statistic_zero_for_perfect_uniform(self): + 
"""Test that perfect uniform distribution has very low K-S statistic.""" + data_bytes = bytearray() + for i in range(256): + data_bytes.extend([i] * 100) + + data = BytesView(bytes(data_bytes)) + result = self.plugin.run(data, {}) + + # For perfect uniform, K-S statistic should be very small + assert result.metrics["ks_statistic"] < 0.01 + + def test_two_byte_values(self): + """Test with only two different byte values.""" + data = BytesView(b'\x00\xFF' * 500) + result = self.plugin.run(data, {}) + + assert isinstance(result, TestResult) + assert result.passed is False + assert result.metrics["total_bytes"] == 1000 + + def test_metrics_completeness(self): + """Test that all expected metrics are present.""" + data = BytesView(bytes(range(100))) + result = self.plugin.run(data, {}) + + expected_metrics = ["total_bytes", "ks_statistic", "max_deviation"] + for metric in expected_metrics: + assert metric in result.metrics + + def test_lower_half_bytes_only(self): + """Test with only lower half of byte range (0-127).""" + data = BytesView(bytes(range(128)) * 10) + result = self.plugin.run(data, {}) + + assert isinstance(result, TestResult) + assert result.passed is False # Should fail as not uniform over full range + assert 0.0 <= result.p_value <= 1.0 diff --git a/tests/test_permutation_test.py b/tests/test_permutation_test.py index 7f5e82d..5f66d47 100644 --- a/tests/test_permutation_test.py +++ b/tests/test_permutation_test.py @@ -220,3 +220,120 @@ def test_permutation_to_id(self): # Same permutation should have same ID perm_id_3 = self.plugin._permutation_to_id([0, 1, 2]) assert perm_id_1 == perm_id_3 + + def test_block_size_5(self): + """Test with maximum block size.""" + data = BytesView(bytes(range(200))) + params = {"block_size": 5} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.metrics["block_size"] == 5 + assert result.metrics["possible_permutations"] == 120 # 5! 
+ + def test_all_identical_blocks(self): + """Test when all blocks are identical.""" + data = BytesView(b'\x01\x02\x03' * 100) + params = {"block_size": 3} + + result = self.plugin.run(data, params) + + assert result.passed is False + assert result.metrics["unique_permutations"] == 1 + + def test_very_large_permutation_test(self): + """Test with very large dataset.""" + data_bytes = bytearray() + for i in range(2000): + data_bytes.append((i * 137) % 256) + + data = BytesView(bytes(data_bytes)) + result = self.plugin.run(data, {"block_size": 3}) + + assert isinstance(result, TestResult) + assert result.metrics["total_bytes"] == 2000 + assert 0.0 <= result.p_value <= 1.0 + + def test_category_and_p_values(self): + """Test category and p_values dict.""" + data = BytesView(bytes(range(100))) + result = self.plugin.run(data, {"block_size": 3}) + + assert result.category == "statistical" + assert "permutation" in result.p_values + + def test_metrics_completeness(self): + """Test that all expected metrics are present.""" + data = BytesView(bytes(range(100))) + result = self.plugin.run(data, {"block_size": 3}) + + expected_metrics = ["total_bytes", "block_size", "num_blocks", + "chi_square_statistic", "degrees_of_freedom", + "unique_permutations", "possible_permutations"] + for metric in expected_metrics: + assert metric in result.metrics + + def test_alternating_high_low(self): + """Test with alternating high and low values.""" + data_bytes = bytearray() + for i in range(200): + if i % 3 == 0: + data_bytes.append(0) + elif i % 3 == 1: + data_bytes.append(255) + else: + data_bytes.append(128) + + data = BytesView(bytes(data_bytes)) + result = self.plugin.run(data, {"block_size": 3}) + + assert isinstance(result, TestResult) + assert 0.0 <= result.p_value <= 1.0 + + def test_to_permutation_pattern_correctness(self): + """Test that permutation pattern conversion is correct.""" + # Test specific known patterns + plugin = PermutationTest() + + # Ascending order + pattern1 = 
plugin._to_permutation_pattern(bytes([10, 20, 30])) + # Descending order + pattern2 = plugin._to_permutation_pattern(bytes([30, 20, 10])) + + # Should be different + assert pattern1 != pattern2 + + def test_degrees_of_freedom_calculation(self): + """Test that degrees of freedom equals k! - 1.""" + test_cases = [(2, 1), (3, 5), (4, 23), (5, 119)] + + for block_size, expected_df in test_cases: + data = BytesView(bytes(range(200))) + result = self.plugin.run(data, {"block_size": block_size}) + assert result.metrics["degrees_of_freedom"] == expected_df + + def test_with_negative_rank_differences(self): + """Test permutation detection with various orderings.""" + # Mix of ascending and descending triplets + data = BytesView(b'\x01\x02\x03\x03\x02\x01' * 50) + result = self.plugin.run(data, {"block_size": 3}) + + assert isinstance(result, TestResult) + assert result.metrics["unique_permutations"] <= 6 + + def test_chi_square_zero_for_perfect_uniform(self): + """Test that perfectly uniform permutations have low chi-square.""" + # Create data where all 6 permutations appear equally + # This is difficult to construct naturally, so we test the concept + data_bytes = bytearray() + # Create varied patterns + for i in range(100): + a, b, c = (i * 7) % 256, (i * 13) % 256, (i * 19) % 256 + data_bytes.extend([a, b, c]) + + data = BytesView(bytes(data_bytes)) + result = self.plugin.run(data, {"block_size": 3}) + + assert isinstance(result, TestResult) + assert result.metrics["chi_square_statistic"] >= 0.0 diff --git a/tests/test_poker_test.py b/tests/test_poker_test.py index 61068a0..5ec346f 100644 --- a/tests/test_poker_test.py +++ b/tests/test_poker_test.py @@ -200,3 +200,110 @@ def test_unique_patterns_metric(self): assert "unique_patterns" in result.metrics assert "possible_patterns" in result.metrics assert result.metrics["unique_patterns"] <= result.metrics["possible_patterns"] + + def test_hand_size_2(self): + """Test with minimum hand size.""" + data = 
BytesView(bytes(range(256)) * 2) + params = {"hand_size": 2} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.metrics["hand_size"] == 2 + assert result.metrics["possible_patterns"] == 4 # 2^2 + + def test_hand_size_8(self): + """Test with maximum hand size.""" + data = BytesView(bytes(range(256)) * 4) + params = {"hand_size": 8} + + result = self.plugin.run(data, params) + + assert isinstance(result, TestResult) + assert result.metrics["hand_size"] == 8 + assert result.metrics["possible_patterns"] == 256 # 2^8 + + def test_streaming_with_large_chunks(self): + """Test streaming with large chunks.""" + stream_plugin = PokerTest() + data_bytes = bytearray() + for i in range(500): + data_bytes.append((i * 137) % 256) + + # Split into 5 large chunks + chunk_size = 100 + for i in range(0, len(data_bytes), chunk_size): + chunk = bytes(data_bytes[i:i + chunk_size]) + stream_plugin.update(chunk, {"hand_size": 4}) + + result = stream_plugin.finalize({"hand_size": 4}) + + assert isinstance(result, TestResult) + assert result.metrics["num_hands"] == 1000 # 4000 bits / 4 bits per hand + + def test_all_same_pattern(self): + """Test when all hands have the same pattern.""" + data = BytesView(b'\x00' * 200) + params = {"hand_size": 4} + + result = self.plugin.run(data, params) + + assert result.passed is False + assert result.metrics["unique_patterns"] == 1 + + def test_category_and_p_values(self): + """Test category and p_values dict.""" + data = BytesView(bytes(range(256)) * 2) + params = {"hand_size": 4} + result = self.plugin.run(data, params) + + assert result.category == "statistical" + assert "poker" in result.p_values + + def test_degrees_of_freedom_calculation(self): + """Test that degrees of freedom equals 2^m - 1.""" + test_cases = [(3, 7), (4, 15), (5, 31), (6, 63)] + + for hand_size, expected_df in test_cases: + data = BytesView(bytes(range(256)) * 2) + result = self.plugin.run(data, {"hand_size": hand_size}) + 
assert result.metrics["degrees_of_freedom"] == expected_df + + def test_streaming_finalize_resets_state(self): + """Test that finalize resets state for reuse.""" + stream_plugin = PokerTest() + + # First run + stream_plugin.update(bytes(range(256)), {"hand_size": 4}) + result1 = stream_plugin.finalize({"hand_size": 4}) + + # Second run + stream_plugin.update(b'\x00' * 200, {"hand_size": 4}) + result2 = stream_plugin.finalize({"hand_size": 4}) + + # Results should be independent + assert result1.passed != result2.passed + + def test_metrics_completeness(self): + """Test that all expected metrics are present.""" + data = BytesView(bytes(range(256)) * 2) + result = self.plugin.run(data, {"hand_size": 4}) + + expected_metrics = ["total_bits", "hand_size", "num_hands", + "chi_square_statistic", "degrees_of_freedom", + "unique_patterns", "possible_patterns"] + for metric in expected_metrics: + assert metric in result.metrics + + def test_very_large_poker_test(self): + """Test poker with very large dataset.""" + data_bytes = bytearray() + for _ in range(400): + data_bytes.extend(bytes(range(256))) + + data = BytesView(bytes(data_bytes)) + result = self.plugin.run(data, {"hand_size": 4}) + + assert isinstance(result, TestResult) + assert result.metrics["total_bits"] == 819200 + assert 0.0 <= result.p_value <= 1.0