diff --git a/.gitignore b/.gitignore index 5cea013..0c32551 100644 --- a/.gitignore +++ b/.gitignore @@ -96,3 +96,6 @@ Thumbs.db .env.local .env.*.local secrets.json + +# Demos +demo_*.py diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 085bc2a..83c458a 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -33,6 +33,80 @@ TrueEntropy harvests entropy from real-world sources and converts it into crypto --- +## Operation Modes + +TrueEntropy supports two operation modes, selectable via `configure(mode=...)`: + +### Mode Comparison + +| Aspect | DIRECT Mode | HYBRID Mode | +|--------|-------------|-------------| +| **Speed** | ~60K ops/sec | ~5M ops/sec | +| **Source** | Direct pool extraction | PRNG seeded by pool | +| **Security** | Maximum (true random) | High (periodic reseed) | +| **Use Case** | Crypto keys, wallets | Simulations, games | + +### DIRECT Mode (Default) + +Every call extracts fresh entropy directly from the pool: + +``` +trueentropy.random() ──► EntropyTap.random() ──► pool.extract(8) ──► float +``` + +``` +┌────────────────────────────────────────────────────────────────────┐ +│ DIRECT MODE │ +├────────────────────────────────────────────────────────────────────┤ +│ │ +│ random() ───► EntropyTap ───► Pool ───► 8 bytes ───► float │ +│ │ │ +│ ▲ │ +│ Harvesters │ +│ │ +└────────────────────────────────────────────────────────────────────┘ +``` + +### HYBRID Mode + +Uses a fast PRNG (Mersenne Twister) that re-seeds from the pool periodically: + +``` +trueentropy.random() ──► HybridTap.random() ──► PRNG.random() ──► float + │ + └──► (every N seconds) ──► pool.extract(32) ──► PRNG.seed() +``` + +``` +┌────────────────────────────────────────────────────────────────────┐ +│ HYBRID MODE │ +├────────────────────────────────────────────────────────────────────┤ +│ │ +│ random() ───► HybridTap ───► PRNG (Mersenne Twister) ───► float │ +│ │ │ +│ ▼ (periodic reseed) │ +│ Pool ◄────── Harvesters │ +│ │ │ +│ └──► 32 bytes ──► PRNG.seed() │ +│ │ +└────────────────────────────────────────────────────────────────────┘ +``` + +### Configuration + +```python +import trueentropy + +# DIRECT mode (default) - maximum security +trueentropy.configure(mode="DIRECT") + +# HYBRID mode - maximum performance +trueentropy.configure(mode="HYBRID", hybrid_reseed_interval=60.0) + +# Combined: Hybrid + Offline (fast, no network) +trueentropy.configure(mode="HYBRID", offline_mode=True) +``` + ## Entropy Sources ### 1. Timing Jitter (timing.py) @@ -300,7 +374,9 @@ Guarantees all N! permutations are equally probable. | Module | Purpose | |--------|---------| | `pool.py` | Accumulates and mixes entropy with SHA-256 | -| `tap.py` | Extracts entropy and converts to types | +| `tap.py` | `BaseTap` abstract class + `EntropyTap` (DIRECT mode) | +| `hybrid.py` | `HybridTap` for HYBRID mode (PRNG seeded by pool) | +| `config.py` | `TrueEntropyConfig` dataclass + `configure()` function | | `collector.py` | Background thread for automatic collection | | `health.py` | Monitors pool health (score 0-100) | | `harvesters/` | Collectors for different entropy sources | diff --git a/README.md b/README.md index 1bf5ac5..31d6b1b 100644 --- a/README.md +++ b/README.md @@ -149,6 +149,43 @@ print_health_report(get_pool()) > **Note**: Offline mode provides reduced entropy diversity. For security-critical applications, consider using all available sources when network access is available. +## Direct vs Hybrid Mode + +TrueEntropy offers two modes of operation to balance security and performance: + +| Mode | Entropy Source | Performance | Use Case | +|------|---------------|-------------|----------| +| **DIRECT** (Default) | Directly from entropy pool | Slower, blocking | Cryptographic keys, wallets, severe security | +| **HYBRID** | PRNG seeded by pool | Extremely fast | Simulations, games, UI, general purpose | + +### Using Hybrid Mode + +Hybrid mode uses the TrueEntropy pool to periodically re-seed a fast pseudo-random number generator (PRNG). This provides the best of both worlds: the speed of standard Python random numbers with the entropy quality of our harvesters. + +```python +import trueentropy + +# Configure Hybrid Mode (re-seed every 60 seconds) +trueentropy.configure(mode="HYBRID", hybrid_reseed_interval=60.0) + +# Generate numbers at max speed +# The internal PRNG is automatically re-seeded from the entropy pool +for _ in range(1000000): + val = trueentropy.random() +``` + +### Tuning Hybrid Mode + +The `hybrid_reseed_interval` should be chosen based on your `offline_mode` setting: + +* **Online (Default)**: Network harvesters take time to collect entropy (latency). Set the interval to **10.0s or higher** to allow the pool to refill between reseeds. +* **Offline (`offline_mode=True`)**: Local sources are near-instant. You can use lower intervals (e.g., **1.0s - 2.0s**) for frequent reseeding. + +> **Tip**: If `health()` reports degraded status with low entropy bits, increase your reseed interval. + + + + ## Advanced Features ### Async Support @@ -278,10 +315,14 @@ True random numbers from quantum phenomena: | Function | Description | |----------|-------------| +| `configure(...)` | Set mode (DIRECT/HYBRID), offline_mode, enable sources | +| `reset_config()` | Reset configuration to defaults | | `health()` | Returns entropy pool health status | | `start_collector(interval)` | Starts background entropy collection | | `stop_collector()` | Stops background collection | | `feed(data)` | Manually feed entropy into the pool | +| `get_tap()` | Get current tap instance (EntropyTap or HybridTap) | +| `get_pool()` | Get the global entropy pool instance | ## How It Works @@ -307,7 +348,7 @@ True random numbers from quantum phenomena: | v | | +-----------+ | | | EXTRACTOR | Secure extraction | -| | (Tap) | Depletion protection | +| | (Tap) | DIRECT or HYBRID mode | | +-----+-----+ | | v | | +---------------+--------------+ | diff --git a/src/trueentropy/__init__.py b/src/trueentropy/__init__.py index 790a91d..36e9908 100644 --- a/src/trueentropy/__init__.py +++ b/src/trueentropy/__init__.py @@ -48,7 +48,7 @@ # Type Imports (for type hints) # ----------------------------------------------------------------------------- from collections.abc import MutableSequence, Sequence -from typing import TYPE_CHECKING, Any, TypeVar +from typing import TYPE_CHECKING, Any, Literal, TypeVar if TYPE_CHECKING: pass @@ -56,15 +56,16 @@ # ----------------------------------------------------------------------------- # Internal Module Imports # ----------------------------------------------------------------------------- +import trueentropy.config as _config_module from trueentropy.config import ( TrueEntropyConfig, - configure, get_config, reset_config, ) from trueentropy.health import HealthStatus, entropy_health +from trueentropy.hybrid import HybridTap from trueentropy.pool import EntropyPool -from trueentropy.tap import EntropyTap +from trueentropy.tap import BaseTap, EntropyTap # ----------------------------------------------------------------------------- # Type Variables for Generic Functions @@ -78,12 +79,96 @@ # Users can also create their own instances if needed. _pool: EntropyPool = EntropyPool() -_tap: EntropyTap = EntropyTap(_pool) +# _tap is initialized with EntropyTap (DIRECT mode) by default +_tap: BaseTap = EntropyTap(_pool) # Flag to track if background collector is running _collector_running: bool = False +# ----------------------------------------------------------------------------- +# Configuration Helper +# ----------------------------------------------------------------------------- + + +def _update_tap() -> None: + """ + Update the global _tap instance based on current configuration. + + Switches between EntropyTap (DIRECT) and HybridTap (HYBRID). + """ + global _tap + config = get_config() + + if config.mode == "HYBRID": + # Create new HybridTap if not already one + if not isinstance(_tap, HybridTap): + _tap = HybridTap(_pool, reseed_interval=config.hybrid_reseed_interval) + else: + # Update existing HybridTap interval + _tap._reseed_interval = config.hybrid_reseed_interval + else: + # Default to DIRECT mode (EntropyTap) + if not isinstance(_tap, EntropyTap): + _tap = EntropyTap(_pool) + + +def configure( + *, + mode: Literal["DIRECT", "HYBRID"] | None = None, + hybrid_reseed_interval: float | None = None, + offline_mode: bool | None = None, + enable_timing: bool | None = None, + enable_system: bool | None = None, + enable_network: bool | None = None, + enable_external: bool | None = None, + enable_weather: bool | None = None, + enable_radioactive: bool | None = None, +) -> TrueEntropyConfig: + """ + Configure TrueEntropy globally. + + This function updates the global configuration and switches operation mode + (DIRECT vs HYBRID) if requested. + + Args: + mode: Operation mode ("DIRECT" or "HYBRID") + hybrid_reseed_interval: Seconds between re-seeds in HYBRID mode + offline_mode: If True, disables all network-dependent sources. + enable_timing: Enable/disable CPU timing harvester + enable_system: Enable/disable system state harvester + enable_network: Enable/disable network latency harvester + enable_external: Enable/disable external API harvester + enable_weather: Enable/disable weather data harvester + enable_radioactive: Enable/disable quantum randomness harvester + + Returns: + The updated global configuration + + Example: + >>> import trueentropy + >>> # Switch to Hybrid Mode (faster) + >>> trueentropy.configure(mode="HYBRID") + """ + # Call the underlying config update + cfg = _config_module.configure( + mode=mode, + hybrid_reseed_interval=hybrid_reseed_interval, + offline_mode=offline_mode, + enable_timing=enable_timing, + enable_system=enable_system, + enable_network=enable_network, + enable_external=enable_external, + enable_weather=enable_weather, + enable_radioactive=enable_radioactive, + ) + + # Update the active tap based on new config + _update_tap() + + return cfg + + # ============================================================================= # PUBLIC API - Random Value Generation # ============================================================================= @@ -93,9 +178,7 @@ def random() -> float: """ Generate a random floating-point number in the range [0.0, 1.0). - This function extracts entropy from the pool and converts it to a - uniformly distributed float. The distribution is uniform, meaning - all values in the range are equally likely. + This function uses the currently configured tap (DIRECT or HYBRID). Returns: A float value where 0.0 <= value < 1.0 @@ -182,9 +265,8 @@ def randbytes(n: int) -> bytes: """ Generate n random bytes. - This function extracts raw entropy from the pool and returns it - as a bytes object. Useful for generating cryptographic keys, - tokens, or other binary data. + This function extracts raw entropy (or PRNG bytes in Hybrid mode). + Useful for generating cryptographic keys, tokens, or other binary data. Args: n: The number of bytes to generate (must be positive) @@ -208,8 +290,7 @@ def shuffle(seq: MutableSequence[Any]) -> None: """ Shuffle a mutable sequence in-place. - Uses the Fisher-Yates shuffle algorithm with true random numbers - to ensure a uniform distribution of permutations. + Uses the Fisher-Yates shuffle algorithm. Args: seq: A mutable sequence (list) to shuffle in-place @@ -503,15 +584,15 @@ def get_pool() -> EntropyPool: return _pool -def get_tap() -> EntropyTap: +def get_tap() -> BaseTap: """ Get the global entropy tap instance. This is useful for advanced users who want to use the tap - directly or create their own tap with custom settings. + directly or inspect which implementation (BaseTap/HybridTap) is active. Returns: - The global EntropyTap instance + The global BaseTap instance (EntropyTap or HybridTap) """ return _tap @@ -557,6 +638,8 @@ def get_tap() -> EntropyTap: # Classes (for type hints and advanced usage) "EntropyPool", "EntropyTap", + "HybridTap", + "BaseTap", "HealthStatus", "TrueEntropyConfig", ] diff --git a/src/trueentropy/config.py b/src/trueentropy/config.py index 0918aa0..66fc032 100644 --- a/src/trueentropy/config.py +++ b/src/trueentropy/config.py @@ -27,7 +27,7 @@ from __future__ import annotations from dataclasses import dataclass -from typing import Any +from typing import Any, Literal # ----------------------------------------------------------------------------- # Source Metadata @@ -72,6 +72,10 @@ class TrueEntropyConfig: enable_timing: bool = True enable_system: bool = True + # Mode configuration + mode: Literal["DIRECT", "HYBRID"] = "DIRECT" + hybrid_reseed_interval: float = 60.0 + # Network-dependent sources enable_network: bool = True enable_external: bool = True @@ -80,12 +84,18 @@ class TrueEntropyConfig: def __post_init__(self) -> None: """Validate configuration after initialization.""" - # Ensure at least one source is enabled + # Ensure at least one source is enabled (if in DIRECT mode or collecting) if not any(self.enabled_sources): + # In HYBRID mode, we technically need sources too for reseeding, + # but we could rely on os.urandom initial seed if desperate. + # For now, strict validation is safer. raise ValueError( "At least one entropy source must be enabled. " "Cannot disable all harvesters." ) + if self.hybrid_reseed_interval <= 0: + raise ValueError("hybrid_reseed_interval must be positive") + # ------------------------------------------------------------------------- # Properties # ------------------------------------------------------------------------- @@ -204,6 +214,8 @@ def get_config() -> TrueEntropyConfig: def configure( *, + mode: Literal["DIRECT", "HYBRID"] | None = None, + hybrid_reseed_interval: float | None = None, offline_mode: bool | None = None, enable_timing: bool | None = None, enable_system: bool | None = None, @@ -216,10 +228,12 @@ def configure( Configure TrueEntropy globally. This function updates the global configuration used by all - entropy collection functions. You can either set offline_mode=True + entropy collection functions. You can switch modes, set offline_mode to disable all network sources, or configure individual sources. Args: + mode: Operation mode ("DIRECT" or "HYBRID") + hybrid_reseed_interval: Seconds between re-seeds in HYBRID mode offline_mode: If True, disables all network-dependent sources. If False, enables all sources. If None, ignored. enable_timing: Enable/disable CPU timing harvester @@ -234,15 +248,17 @@ def configure( Example: >>> import trueentropy + >>> # Enable hybrid mode for better performance + >>> trueentropy.configure(mode="HYBRID", hybrid_reseed_interval=120) >>> # Enable offline mode >>> trueentropy.configure(offline_mode=True) - >>> # Or disable specific sources - >>> trueentropy.configure(enable_weather=False, enable_radioactive=False) """ global _global_config # Start with current config values new_config = { + "mode": _global_config.mode, + "hybrid_reseed_interval": _global_config.hybrid_reseed_interval, "enable_timing": _global_config.enable_timing, "enable_system": _global_config.enable_system, "enable_network": _global_config.enable_network, @@ -251,6 +267,11 @@ def configure( "enable_radioactive": _global_config.enable_radioactive, } + if mode is not None: + new_config["mode"] = mode + if hybrid_reseed_interval is not None: + new_config["hybrid_reseed_interval"] = hybrid_reseed_interval + # Handle offline_mode convenience flag if offline_mode is True: # Disable all network sources diff --git a/src/trueentropy/hybrid.py b/src/trueentropy/hybrid.py new file mode 100644 index 0000000..58ae2a6 --- /dev/null +++ b/src/trueentropy/hybrid.py @@ -0,0 +1,147 @@ +# ============================================================================= +# TrueEntropy - Hybrid Entopy Tap +# ============================================================================= +# +# The Hybrid Tap provides a high-performance Pseudo-Random Number Generator (PRNG) +# that is regularly re-seeded with true entropy from the pool. +# +# This architecture combines the best of both worlds: +# 1. Performance: Uses Python's optimized Mersenne Twister (via random.Random) +# 2. Resiliency: Even if sources fail, the PRNG continues to function +# 3. Security: Regular re-seeding prevents long-term predictability +# +# ============================================================================= + +""" +Hybrid Tap - PRNG seeded by TrueEntropy pool. +""" + +from __future__ import annotations + +import random +import time +from typing import Any + +from trueentropy.pool import EntropyPool +from trueentropy.tap import BaseTap + + +class HybridTap(BaseTap): + """ + High-performance PRNG seeded by true entropy. + + This tap uses Python's standard `random.Random` for generation, + but re-seeds it periodically using entropy extracted from the + TrueEntropy pool. + + Ideally used for: + - High-volume simulations + - Games and UI effects + - Scenarios where performance < 1ms is critical + - Resilience against temporary entropy source failures + """ + + def __init__( + self, + pool: EntropyPool, + reseed_interval: float = 60.0, + reseed_on_init: bool = True, + ) -> None: + """ + Initialize the hybrid tap. + + Args: + pool: The EntropyPool to use for seeding + reseed_interval: Seconds between automatic re-seeds (default: 60) + reseed_on_init: Whether to pull seed immediately (default: True) + """ + self._pool = pool + self._reseed_interval = reseed_interval + self._last_reseed_time = 0.0 + + # Dedicated PRNG instance to avoid sharing state with global random + self._prng = random.Random() + + if reseed_on_init: + self.reseed() + + def reseed(self) -> None: + """ + Force a re-seed of the internal PRNG from the entropy pool. + + Extracts 32 bytes (256 bits) from the pool to seed the PRNG. + This operation blocks briefly while extracting from the pool. + """ + # Extract 32 bytes of high-quality entropy + seed_data = self._pool.extract(32) + + # Seed the PRNG + self._prng.seed(seed_data) + + # Update timestamp + self._last_reseed_time = time.time() + + def _check_reseed(self) -> None: + """Check if re-seed is needed and perform it if so.""" + if time.time() - self._last_reseed_time > self._reseed_interval: + self.reseed() + + # ------------------------------------------------------------------------- + # BaseTap Implementation + # ------------------------------------------------------------------------- + + def random(self) -> float: + """Generate a random float in [0.0, 1.0).""" + self._check_reseed() + return self._prng.random() + + def randint(self, a: int, b: int) -> int: + """Generate a random integer N such that a <= N <= b.""" + self._check_reseed() + return self._prng.randint(a, b) + + def randbytes(self, n: int) -> bytes: + """Generate n random bytes.""" + self._check_reseed() + # random.randbytes was added in Python 3.9 + # For compatibility, we use getrandbits if randbytes is missing (older python 3) + if hasattr(self._prng, "randbytes"): + return self._prng.randbytes(n) + else: + # Fallback for older python versions + return self._prng.getrandbits(n * 8).to_bytes(n, "little") + + # ------------------------------------------------------------------------- + # Optimized Overrides + # ------------------------------------------------------------------------- + # We override these methods because random.Random usually implements them + # in C for better performance than our generic BaseTap implementation. + + def choice(self, seq: Any) -> Any: + self._check_reseed() + return self._prng.choice(seq) + + def shuffle(self, x: Any) -> None: + self._check_reseed() + return self._prng.shuffle(x) + + def sample(self, population: Any, k: int, **kwargs) -> list[Any]: + self._check_reseed() + # Support 'counts' keyword arg in newer python versions + return self._prng.sample(population, k, **kwargs) + + def uniform(self, a: float, b: float) -> float: + self._check_reseed() + return self._prng.uniform(a, b) + + def gauss(self, mu: float = 0.0, sigma: float = 1.0) -> float: + self._check_reseed() + return self._prng.gauss(mu, sigma) + + def triangular(self, low: float = 0.0, high: float = 1.0, mode: float | None = None) -> float: + self._check_reseed() + return self._prng.triangular(low, high, mode) + + def exponential(self, lambd: float = 1.0) -> float: + self._check_reseed() + return self._prng.expovariate(lambd) diff --git a/src/trueentropy/tap.py b/src/trueentropy/tap.py index 8db1306..fd57580 100644 --- a/src/trueentropy/tap.py +++ b/src/trueentropy/tap.py @@ -23,6 +23,7 @@ from __future__ import annotations import struct +from abc import ABC, abstractmethod from collections.abc import MutableSequence, Sequence from typing import Any, TypeVar @@ -32,163 +33,36 @@ T = TypeVar("T") -class EntropyTap: +class BaseTap(ABC): """ - Extracts and formats random values from an entropy pool. - - The tap is responsible for converting raw entropy bytes into - various formats like floats, integers, and booleans. It ensures - that all generated values are uniformly distributed. + Abstract base class for entropy taps. - Example: - >>> pool = EntropyPool() - >>> tap = EntropyTap(pool) - >>> value = tap.random() - >>> print(f"Random: {value}") + Provides common utility methods and higher-level distributions + dependent on the core random primitives. """ - # ------------------------------------------------------------------------- - # Initialization - # ------------------------------------------------------------------------- - - def __init__(self, pool: EntropyPool) -> None: - """ - Initialize the tap with an entropy pool. - - Args: - pool: The EntropyPool instance to extract entropy from - """ - self._pool = pool - - # ------------------------------------------------------------------------- - # Random Value Generation - # ------------------------------------------------------------------------- - + @abstractmethod def random(self) -> float: - """ - Generate a random float in the range [0.0, 1.0). - - Uses 64 bits of entropy to generate a uniformly distributed - floating-point number. The result is always less than 1.0. - - Returns: - A float value where 0.0 <= value < 1.0 - - How it works: - 1. Extract 8 bytes (64 bits) from the pool - 2. Interpret as unsigned 64-bit integer - 3. Divide by 2^64 to get value in [0, 1) - """ - # Extract 8 bytes of entropy - raw_bytes = self._pool.extract(8) - - # Unpack as unsigned 64-bit integer (big-endian) - # We use big-endian for consistency across platforms - value = struct.unpack("!Q", raw_bytes)[0] - - # Convert to float in range [0.0, 1.0) - # 2^64 = 18446744073709551616 - return value / 18446744073709551616.0 + """Generate a random float in the range [0.0, 1.0).""" + pass + @abstractmethod def randint(self, a: int, b: int) -> int: - """ - Generate a random integer N such that a <= N <= b. - - Uses rejection sampling to ensure uniform distribution. - This avoids modulo bias that would occur with simple modulo. - - Args: - a: Lower bound (inclusive) - b: Upper bound (inclusive) - - Returns: - Random integer in [a, b] - - Raises: - ValueError: If a > b - - How it works: - 1. Calculate the range size (b - a + 1) - 2. Find the smallest number of bits needed to represent range - 3. Generate random bits and check if value < range - 4. If not, reject and try again (rejection sampling) - 5. This ensures perfectly uniform distribution - """ - if a > b: - raise ValueError(f"randint: a ({a}) must be <= b ({b})") + """Generate a random integer N such that a <= N <= b.""" + pass - if a == b: - return a # Only one possible value - - # Calculate range size - range_size = b - a + 1 - - # Find number of bits needed to represent range_size - # We need ceil(log2(range_size)) bits - bits_needed = (range_size - 1).bit_length() - bytes_needed = (bits_needed + 7) // 8 # Round up to bytes - - # Mask to extract only the bits we need - # e.g., for range_size=100, bits_needed=7, mask=0x7F (127) - mask = (1 << bits_needed) - 1 - - # Rejection sampling loop - # We keep generating random values until we get one in range - # Expected number of iterations is < 2 on average - while True: - # Extract random bytes - raw_bytes = self._pool.extract(bytes_needed) - - # Pad to 8 bytes for unpacking (big-endian) - padded = raw_bytes.rjust(8, b"\x00") - - # Unpack as unsigned 64-bit integer - value = struct.unpack("!Q", padded)[0] - - # Apply mask to get only needed bits - value = value & mask - - # Check if value is in valid range - if value < range_size: - return a + value + @abstractmethod + def randbytes(self, n: int) -> bytes: + """Generate n random bytes.""" + pass def randbool(self) -> bool: """ Generate a random boolean (True or False). - Each value has exactly 50% probability - a fair coin flip. - - Returns: - True or False with equal probability - - How it works: - 1. Extract 1 byte from the pool - 2. Check the least significant bit - 3. Return True if bit is 1, False if 0 + Default implementation uses random(). Can be overridden for efficiency. """ - # Extract 1 byte of entropy - raw_byte = self._pool.extract(1) - - # Check least significant bit - return (raw_byte[0] & 1) == 1 - - def randbytes(self, n: int) -> bytes: - """ - Generate n random bytes. - - Args: - n: Number of bytes to generate (must be positive) - - Returns: - A bytes object of length n - - Raises: - ValueError: If n is not positive - """ - if n <= 0: - raise ValueError(f"randbytes: n ({n}) must be positive") - - return self._pool.extract(n) + return self.random() < 0.5 def choice(self, seq: Sequence[T]) -> T: """ @@ -586,6 +460,165 @@ def random_password( # Generate password by choosing random characters return "".join(self.choice(chars) for _ in range(length)) + +class EntropyTap(BaseTap): + """ + Extracts and formats random values from an entropy pool. + + The tap is responsible for converting raw entropy bytes into + various formats like floats, integers, and booleans. It ensures + that all generated values are uniformly distributed. + + Example: + >>> pool = EntropyPool() + >>> tap = EntropyTap(pool) + >>> value = tap.random() + >>> print(f"Random: {value}") + """ + + # ------------------------------------------------------------------------- + # Initialization + # ------------------------------------------------------------------------- + + def __init__(self, pool: EntropyPool) -> None: + """ + Initialize the tap with an entropy pool. + + Args: + pool: The EntropyPool instance to extract entropy from + """ + self._pool = pool + + # ------------------------------------------------------------------------- + # Random Value Generation + # ------------------------------------------------------------------------- + + def random(self) -> float: + """ + Generate a random float in the range [0.0, 1.0). + + Uses 64 bits of entropy to generate a uniformly distributed + floating-point number. The result is always less than 1.0. + + Returns: + A float value where 0.0 <= value < 1.0 + + How it works: + 1. Extract 8 bytes (64 bits) from the pool + 2. Interpret as unsigned 64-bit integer + 3. Divide by 2^64 to get value in [0, 1) + """ + # Extract 8 bytes of entropy + raw_bytes = self._pool.extract(8) + + # Unpack as unsigned 64-bit integer (big-endian) + # We use big-endian for consistency across platforms + value = struct.unpack("!Q", raw_bytes)[0] + + # Convert to float in range [0.0, 1.0) + # 2^64 = 18446744073709551616 + return value / 18446744073709551616.0 + + def randint(self, a: int, b: int) -> int: + """ + Generate a random integer N such that a <= N <= b. + + Uses rejection sampling to ensure uniform distribution. + This avoids modulo bias that would occur with simple modulo. + + Args: + a: Lower bound (inclusive) + b: Upper bound (inclusive) + + Returns: + Random integer in [a, b] + + Raises: + ValueError: If a > b + + How it works: + 1. Calculate the range size (b - a + 1) + 2. Find the smallest number of bits needed to represent range + 3. Generate random bits and check if value < range + 4. If not, reject and try again (rejection sampling) + 5. This ensures perfectly uniform distribution + """ + if a > b: + raise ValueError(f"randint: a ({a}) must be <= b ({b})") + + if a == b: + return a # Only one possible value + + # Calculate range size + range_size = b - a + 1 + + # Find number of bits needed to represent range_size + # We need ceil(log2(range_size)) bits + bits_needed = (range_size - 1).bit_length() + bytes_needed = (bits_needed + 7) // 8 # Round up to bytes + + # Mask to extract only the bits we need + # e.g., for range_size=100, bits_needed=7, mask=0x7F (127) + mask = (1 << bits_needed) - 1 + + # Rejection sampling loop + # We keep generating random values until we get one in range + # Expected number of iterations is < 2 on average + while True: + # Extract random bytes + raw_bytes = self._pool.extract(bytes_needed) + + # Pad to 8 bytes for unpacking (big-endian) + padded = raw_bytes.rjust(8, b"\x00") + + # Unpack as unsigned 64-bit integer + value = struct.unpack("!Q", padded)[0] + + # Apply mask to get only needed bits + value = value & mask + + # Check if value is in valid range + if value < range_size: + return a + value + + def randbool(self) -> bool: + """ + Generate a random boolean (True or False). + + Each value has exactly 50% probability - a fair coin flip. + + Returns: + True or False with equal probability + + How it works: + 1. Extract 1 byte from the pool + 2. Check the least significant bit + 3. Return True if bit is 1, False if 0 + """ + # Extract 1 byte of entropy + raw_byte = self._pool.extract(1) + + # Check least significant bit + return (raw_byte[0] & 1) == 1 + + def randbytes(self, n: int) -> bytes: + """ + Generate n random bytes. + + Args: + n: Number of bytes to generate (must be positive) + + Returns: + A bytes object of length n + + Raises: + ValueError: If n is not positive + """ + if n <= 0: + raise ValueError(f"randbytes: n ({n}) must be positive") + + return self._pool.extract(n) + # ------------------------------------------------------------------------- # String Representation # ------------------------------------------------------------------------- diff --git a/tests/test_harvesters.py b/tests/test_harvesters.py index 6728db6..c127967 100644 --- a/tests/test_harvesters.py +++ b/tests/test_harvesters.py @@ -14,49 +14,39 @@ # # ============================================================================= -from __future__ import annotations -"""Tests for the entropy harvesters.""" +from __future__ import annotations import pytest -from trueentropy.harvesters.base import BaseHarvester, HarvestResult -from trueentropy.harvesters.timing import TimingHarvester +from trueentropy.harvesters.base import HarvestResult +from trueentropy.harvesters.external import ExternalHarvester from trueentropy.harvesters.network import NetworkHarvester +from trueentropy.harvesters.radioactive import RadioactiveHarvester from trueentropy.harvesters.system import SystemHarvester -from trueentropy.harvesters.external import ExternalHarvester +from trueentropy.harvesters.timing import TimingHarvester from trueentropy.harvesters.weather import WeatherHarvester -from trueentropy.harvesters.radioactive import RadioactiveHarvester class TestHarvestResult: """Test HarvestResult dataclass.""" - + def test_success_result(self) -> None: """Test creating a successful result.""" - result = HarvestResult( - data=b"test_data", - entropy_bits=16, - source="test", - success=True - ) - + result = HarvestResult(data=b"test_data", entropy_bits=16, source="test", success=True) + assert result.data == b"test_data" assert result.entropy_bits == 16 assert result.source == "test" assert result.success is True assert result.error is None - + def test_failure_result(self) -> None: """Test creating a failed result.""" result = HarvestResult( - data=b"", - entropy_bits=0, - source="test", - success=False, - error="Something went wrong" + data=b"", entropy_bits=0, source="test", success=False, error="Something went wrong" ) - + assert result.data == b"" assert result.entropy_bits == 0 assert result.success is False @@ -65,94 +55,94 @@ def test_failure_result(self) -> None: class TestTimingHarvester: """Test TimingHarvester.""" - + def test_name(self) -> None: """Harvester should have correct name.""" harvester = TimingHarvester() assert harvester.name == "timing" - + def test_collect_returns_result(self) -> None: """collect() should return HarvestResult.""" harvester = TimingHarvester() result = harvester.collect() - + assert isinstance(result, HarvestResult) - + def test_collect_succeeds(self) -> None: """collect() should succeed (timing is always available).""" harvester = TimingHarvester() result = harvester.collect() - + assert result.success is True assert result.data != b"" assert result.entropy_bits > 0 - + def test_collect_returns_bytes(self) -> None: """collect() should return bytes data.""" harvester = TimingHarvester() result = harvester.collect() - + assert isinstance(result.data, bytes) assert len(result.data) > 0 - + def test_num_samples_configuration(self) -> None: """Harvester should respect num_samples config.""" harvester = TimingHarvester(num_samples=32) assert harvester.num_samples == 32 - + result = harvester.collect() # 32 samples * 8 bytes per sample assert len(result.data) == 32 * 8 - + def test_different_collections_different_data(self) -> None: """Successive collections should return different data.""" harvester = TimingHarvester() - + results = [harvester.collect().data for _ in range(10)] - + # All should be unique assert len(set(results)) == 10 - + def test_safe_collect(self) -> None: """safe_collect() should never raise.""" harvester = TimingHarvester() - + # Should not raise result = harvester.safe_collect() - + assert isinstance(result, HarvestResult) class TestSystemHarvester: """Test SystemHarvester.""" - + def test_name(self) -> None: """Harvester should have correct name.""" harvester = SystemHarvester() assert harvester.name == "system" - + def test_collect_returns_result(self) -> None: """collect() should return HarvestResult.""" harvester = SystemHarvester() result = harvester.collect() - + assert isinstance(result, HarvestResult) - + def test_collect_returns_data(self) -> None: """collect() should return non-empty data.""" harvester = SystemHarvester() result = harvester.collect() - + # Should succeed if psutil is available if result.success: assert result.data != b"" assert result.entropy_bits > 0 - + def test_list_available_metrics(self) -> None: """list_available_metrics() should return list.""" harvester = SystemHarvester() metrics = harvester.list_available_metrics() - + assert isinstance(metrics, list) # At minimum, timestamp should be available assert "timestamp_ns" in metrics @@ -160,38 +150,38 @@ def test_list_available_metrics(self) -> None: class TestNetworkHarvester: """Test NetworkHarvester.""" - + def test_name(self) -> None: """Harvester should have correct name.""" harvester = NetworkHarvester() assert harvester.name == "network" - + def test_default_targets(self) -> None: """Harvester should have default targets.""" harvester = NetworkHarvester() - + assert len(harvester.targets) > 0 assert all(t.startswith("http") for t in harvester.targets) - + def test_custom_targets(self) -> None: """Harvester should accept custom targets.""" targets = ["https://example.com", "https://test.com"] harvester = NetworkHarvester(targets=targets) - + assert harvester.targets == targets - + def test_timeout_configuration(self) -> None: """Harvester should respect timeout config.""" harvester = NetworkHarvester(timeout=1.0) assert harvester.timeout == 1.0 - + def test_collect_returns_result(self) -> None: """collect() should return HarvestResult.""" harvester = NetworkHarvester() result = harvester.collect() - + assert isinstance(result, HarvestResult) - + # Note: We don't test actual network calls here to avoid # network-dependent test failures. Integration tests would # cover that. @@ -199,41 +189,38 @@ def test_collect_returns_result(self) -> None: class TestExternalHarvester: """Test ExternalHarvester.""" - + def test_name(self) -> None: """Harvester should have correct name.""" harvester = ExternalHarvester() assert harvester.name == "external" - + def test_enable_flags(self) -> None: """Harvester should respect enable flags.""" - harvester = ExternalHarvester( - enable_earthquake=False, - enable_crypto=True - ) - + harvester = ExternalHarvester(enable_earthquake=False, enable_crypto=True) + assert harvester.enable_earthquake is False assert harvester.enable_crypto is True - + def test_timeout_configuration(self) -> None: """Harvester should respect timeout config.""" harvester = ExternalHarvester(timeout=3.0) assert harvester.timeout == 3.0 - + def test_collect_returns_result(self) -> None: """collect() should return HarvestResult.""" harvester = ExternalHarvester() result = harvester.collect() - + assert isinstance(result, HarvestResult) - + # Note: We don't test actual API calls here to avoid # network-dependent test failures and rate limiting. class TestBaseHarvesterInterface: """Test that harvesters implement the interface correctly.""" - + def test_all_harvesters_have_name(self) -> None: """All harvesters should implement name property.""" harvesters = [ @@ -242,11 +229,11 @@ def test_all_harvesters_have_name(self) -> None: NetworkHarvester(), ExternalHarvester(), ] - + for h in harvesters: assert isinstance(h.name, str) assert len(h.name) > 0 - + def test_all_harvesters_have_collect(self) -> None: """All harvesters should implement collect method.""" harvesters = [ @@ -255,11 +242,11 @@ def test_all_harvesters_have_collect(self) -> None: NetworkHarvester(), ExternalHarvester(), ] - + for h in harvesters: result = h.collect() assert isinstance(result, HarvestResult) - + def test_all_harvesters_have_safe_collect(self) -> None: """All harvesters should have safe_collect from base class.""" harvesters = [ @@ -268,11 +255,11 @@ def test_all_harvesters_have_safe_collect(self) -> None: NetworkHarvester(), ExternalHarvester(), ] - + for h in harvesters: result = h.safe_collect() assert isinstance(result, HarvestResult) - + def test_repr(self) -> None: """All harvesters should have useful repr.""" harvesters = [ @@ -283,7 +270,7 @@ def test_repr(self) -> None: WeatherHarvester(), RadioactiveHarvester(), ] - + for h in harvesters: repr_str = repr(h) assert h.name in repr_str @@ -291,34 +278,34 @@ def test_repr(self) -> None: class TestWeatherHarvester: """Test WeatherHarvester.""" - + def test_name(self) -> None: """Harvester should have correct name.""" harvester = WeatherHarvester() assert harvester.name == "weather" - + def test_collect_returns_result(self) -> None: """collect() should return HarvestResult.""" harvester = WeatherHarvester() result = harvester.collect() - + assert isinstance(result, HarvestResult) - + def test_timeout_configuration(self) -> None: """Harvester should respect timeout config.""" harvester = WeatherHarvester(timeout=3.0) assert harvester.timeout == 3.0 - + def test_api_key_masking(self) -> None: """API key should be masked when accessed.""" harvester = WeatherHarvester(api_key="my_secret_api_key_12345") masked = harvester.api_key - + # Should be partially masked assert masked is not None assert "..." in masked - assert "my_secret_api_key_12345" != masked - + assert masked != "my_secret_api_key_12345" + def test_no_api_key_returns_none(self) -> None: """api_key property should return None when not set.""" harvester = WeatherHarvester() @@ -327,44 +314,43 @@ def test_no_api_key_returns_none(self) -> None: class TestRadioactiveHarvester: """Test RadioactiveHarvester.""" - + def test_name(self) -> None: """Harvester should have correct name.""" harvester = RadioactiveHarvester() assert harvester.name == "radioactive" - + def test_collect_returns_result(self) -> None: """collect() should return HarvestResult.""" harvester = RadioactiveHarvester() result = harvester.collect() - + assert isinstance(result, HarvestResult) - + def test_timeout_configuration(self) -> None: """Harvester should respect timeout config.""" harvester = RadioactiveHarvester(timeout=15.0) assert harvester.timeout == 15.0 - + def test_num_integers_configuration(self) -> None: """Harvester should respect num_integers config.""" harvester = RadioactiveHarvester(num_integers=20) assert harvester.num_integers == 20 - + def test_num_integers_validation(self) -> None: """num_integers should have valid range.""" harvester = RadioactiveHarvester() - + with pytest.raises(ValueError): harvester.num_integers = 0 - + with pytest.raises(ValueError): harvester.num_integers = 1001 - + def test_api_key_masking(self) -> None: """API key should be masked when accessed.""" harvester = RadioactiveHarvester(api_key="12345678-abcd-efgh-ijkl") masked = harvester.api_key - + assert masked is not None assert "..." in masked - diff --git a/tests/test_harvesters_live.py b/tests/test_harvesters_live.py index 553437c..0ef8f5a 100644 --- a/tests/test_harvesters_live.py +++ b/tests/test_harvesters_live.py @@ -20,19 +20,18 @@ from __future__ import annotations import time -from dataclasses import dataclass, field +from dataclasses import dataclass from typing import ClassVar import pytest -from trueentropy.harvesters.base import BaseHarvester, HarvestResult -from trueentropy.harvesters.timing import TimingHarvester +from trueentropy.harvesters.base import BaseHarvester +from trueentropy.harvesters.external import ExternalHarvester from trueentropy.harvesters.network import NetworkHarvester +from trueentropy.harvesters.radioactive import RadioactiveHarvester from trueentropy.harvesters.system import SystemHarvester -from trueentropy.harvesters.external import ExternalHarvester +from trueentropy.harvesters.timing import TimingHarvester from trueentropy.harvesters.weather import WeatherHarvester -from trueentropy.harvesters.radioactive import RadioactiveHarvester - # ----------------------------------------------------------------------------- # Performance Tracking @@ -42,7 +41,7 @@ @dataclass class HarvesterMetrics: """Metrics for a single harvester test run.""" - + name: str success: bool duration_ms: float @@ -54,14 +53,14 @@ class HarvesterMetrics: class PerformanceTracker: """Track performance metrics across all harvester tests.""" - + metrics: ClassVar[list[HarvesterMetrics]] = [] - + @classmethod def add_metric(cls, metric: HarvesterMetrics) -> None: """Add a metric to the tracker.""" cls.metrics.append(metric) - + @classmethod def print_report(cls) -> None: """Print a summary report of all metrics.""" @@ -69,27 +68,33 @@ def print_report(cls) -> None: print("=" * 75) print(" HARVESTER PERFORMANCE REPORT") print("=" * 75) - print(f"{'Harvester':<15} {'Status':<10} {'Time (ms)':<12} {'Entropy':<10} {'Bytes':<10} {'Network'}") + print( + f"{'Harvester':<15} {'Status':<10} {'Time (ms)':<12} {'Entropy':<10} {'Bytes':<10} {'Network'}" + ) print("-" * 75) - + total_time = 0 total_entropy = 0 successful = 0 - + for m in cls.metrics: status = "✓ OK" if m.success else "✗ FAIL" network = "Yes" if m.requires_network else "No" - print(f"{m.name:<15} {status:<10} {m.duration_ms:>8.2f} ms {m.entropy_bits:>6} bits {m.data_bytes:>6} B {network}") - + print( + f"{m.name:<15} {status:<10} {m.duration_ms:>8.2f} ms {m.entropy_bits:>6} bits {m.data_bytes:>6} B {network}" + ) + total_time += m.duration_ms if m.success: total_entropy += m.entropy_bits successful += 1 - + print("-" * 75) - print(f"{'TOTAL':<15} {successful}/{len(cls.metrics):<8} {total_time:>8.2f} ms {total_entropy:>6} bits") + print( + f"{'TOTAL':<15} {successful}/{len(cls.metrics):<8} {total_time:>8.2f} ms {total_entropy:>6} bits" + ) print("=" * 75) - + # Print failures if any failures = [m for m in cls.metrics if not m.success] if failures: @@ -97,7 +102,7 @@ def print_report(cls) -> None: for m in failures: print(f" - {m.name}: {m.error}") print() - + @classmethod def clear(cls) -> None: """Clear all metrics.""" @@ -107,20 +112,20 @@ def clear(cls) -> None: def measure_harvester(harvester: BaseHarvester, requires_network: bool = False) -> HarvesterMetrics: """ Measure a harvester's performance. - + Args: harvester: The harvester to measure requires_network: Whether this harvester requires network access - + Returns: HarvesterMetrics with timing and result information """ start_time = time.perf_counter() result = harvester.safe_collect() end_time = time.perf_counter() - + duration_ms = (end_time - start_time) * 1000 - + return HarvesterMetrics( name=harvester.name, success=result.success, @@ -139,60 +144,64 @@ def measure_harvester(harvester: BaseHarvester, requires_network: bool = False) class TestOfflineHarvesters: """Tests for harvesters that work offline (no network required).""" - + def test_timing_harvester_live(self) -> None: """Test TimingHarvester with real CPU timing measurements.""" harvester = TimingHarvester(num_samples=64) - + metrics = measure_harvester(harvester, requires_network=False) PerformanceTracker.add_metric(metrics) - - print(f"\n [TIMING] Duration: {metrics.duration_ms:.2f}ms, " - f"Entropy: {metrics.entropy_bits} bits, Data: {metrics.data_bytes} bytes") - + + print( + f"\n [TIMING] Duration: {metrics.duration_ms:.2f}ms, " + f"Entropy: {metrics.entropy_bits} bits, Data: {metrics.data_bytes} bytes" + ) + assert metrics.success, f"Timing harvester failed: {metrics.error}" assert metrics.entropy_bits > 0, "Should produce entropy" assert metrics.data_bytes > 0, "Should produce data" # Timing should be fast (< 100ms typically) assert metrics.duration_ms < 500, f"Timing harvester too slow: {metrics.duration_ms}ms" - + def test_system_harvester_live(self) -> None: """Test SystemHarvester with real system state collection.""" harvester = SystemHarvester() - + metrics = measure_harvester(harvester, requires_network=False) PerformanceTracker.add_metric(metrics) - - print(f"\n [SYSTEM] Duration: {metrics.duration_ms:.2f}ms, " - f"Entropy: {metrics.entropy_bits} bits, Data: {metrics.data_bytes} bytes") - + + print( + f"\n [SYSTEM] Duration: {metrics.duration_ms:.2f}ms, " + f"Entropy: {metrics.entropy_bits} bits, Data: {metrics.data_bytes} bytes" + ) + assert metrics.success, f"System harvester failed: {metrics.error}" assert metrics.entropy_bits > 0, "Should produce entropy" # System calls should be fast assert metrics.duration_ms < 1000, f"System harvester too slow: {metrics.duration_ms}ms" - + def test_timing_consistency(self) -> None: """Test that repeated timing collections produce different data.""" harvester = TimingHarvester(num_samples=32) - + results = [] for i in range(5): result = harvester.collect() results.append(result.data) time.sleep(0.01) # Small delay between samples - + # All results should be unique unique_results = set(results) assert len(unique_results) == 5, "Timing data should vary between collections" - print(f"\n [TIMING CONSISTENCY] 5 unique samples collected") - + print("\n [TIMING CONSISTENCY] 5 unique samples collected") + def test_system_metrics_available(self) -> None: """Test that system harvester can list available metrics.""" harvester = SystemHarvester() metrics = harvester.list_available_metrics() - + print(f"\n [SYSTEM METRICS] Available: {metrics}") - + assert isinstance(metrics, list) assert len(metrics) > 0, "Should have at least one metric" assert "timestamp_ns" in metrics, "Timestamp should always be available" @@ -207,74 +216,82 @@ def test_system_metrics_available(self) -> None: class TestOnlineHarvesters: """ Tests for harvesters that require network access. - + These tests make REAL network calls and may be slow or fail if network is unavailable. - + Run with: pytest -m network tests/test_harvesters_live.py -v -s """ - + def test_network_harvester_live(self) -> None: """Test NetworkHarvester with real network pings.""" harvester = NetworkHarvester(timeout=5.0) - + metrics = measure_harvester(harvester, requires_network=True) PerformanceTracker.add_metric(metrics) - - print(f"\n [NETWORK] Duration: {metrics.duration_ms:.2f}ms, " - f"Entropy: {metrics.entropy_bits} bits, Data: {metrics.data_bytes} bytes") - + + print( + f"\n [NETWORK] Duration: {metrics.duration_ms:.2f}ms, " + f"Entropy: {metrics.entropy_bits} bits, Data: {metrics.data_bytes} bytes" + ) + # Network may fail but should handle gracefully if metrics.success: assert metrics.entropy_bits > 0, "Should produce entropy on success" else: print(f" [NETWORK] Warning: Failed ({metrics.error}) - may be offline") - + def test_external_harvester_live(self) -> None: """Test ExternalHarvester with real API calls (USGS, crypto).""" harvester = ExternalHarvester(timeout=10.0) - + metrics = measure_harvester(harvester, requires_network=True) PerformanceTracker.add_metric(metrics) - - print(f"\n [EXTERNAL] Duration: {metrics.duration_ms:.2f}ms, " - f"Entropy: {metrics.entropy_bits} bits, Data: {metrics.data_bytes} bytes") - + + print( + f"\n [EXTERNAL] Duration: {metrics.duration_ms:.2f}ms, " + f"Entropy: {metrics.entropy_bits} bits, Data: {metrics.data_bytes} bytes" + ) + if metrics.success: assert metrics.entropy_bits > 0, "Should produce entropy on success" else: print(f" [EXTERNAL] Warning: Failed ({metrics.error}) - API may be unavailable") - + def test_weather_harvester_live(self) -> None: """Test WeatherHarvester with real weather API calls.""" # Using wttr.in (no API key required) harvester = WeatherHarvester(timeout=10.0) - + metrics = measure_harvester(harvester, requires_network=True) PerformanceTracker.add_metric(metrics) - - print(f"\n [WEATHER] Duration: {metrics.duration_ms:.2f}ms, " - f"Entropy: {metrics.entropy_bits} bits, Data: {metrics.data_bytes} bytes") - + + print( + f"\n [WEATHER] Duration: {metrics.duration_ms:.2f}ms, " + f"Entropy: {metrics.entropy_bits} bits, Data: {metrics.data_bytes} bytes" + ) + if metrics.success: assert metrics.entropy_bits > 0, "Should produce entropy on success" else: print(f" [WEATHER] Warning: Failed ({metrics.error}) - API may be unavailable") - + def test_radioactive_harvester_live(self) -> None: """Test RadioactiveHarvester with real quantum/random.org API calls.""" harvester = RadioactiveHarvester(timeout=15.0) - + metrics = measure_harvester(harvester, requires_network=True) PerformanceTracker.add_metric(metrics) - - print(f"\n [RADIOACTIVE] Duration: {metrics.duration_ms:.2f}ms, " - f"Entropy: {metrics.entropy_bits} bits, Data: {metrics.data_bytes} bytes") - + + print( + f"\n [RADIOACTIVE] Duration: {metrics.duration_ms:.2f}ms, " + f"Entropy: {metrics.entropy_bits} bits, Data: {metrics.data_bytes} bytes" + ) + if metrics.success: assert metrics.entropy_bits > 0, "Should produce entropy on success" # Quantum random should give full entropy - print(f" [RADIOACTIVE] True quantum randomness collected!") + print(" [RADIOACTIVE] True quantum randomness collected!") else: print(f" [RADIOACTIVE] Warning: Failed ({metrics.error}) - may be rate limited") @@ -286,21 +303,21 @@ def test_radioactive_harvester_live(self) -> None: class TestAllHarvesters: """Test all harvesters together and generate performance report.""" - + def setup_method(self) -> None: """Clear metrics before each test.""" PerformanceTracker.clear() - + def test_all_harvesters_benchmark(self) -> None: """ Benchmark ALL harvesters and print performance report. - + This is the main comprehensive test that exercises all entropy sources. """ print("\n" + "=" * 60) print(" RUNNING FULL HARVESTER BENCHMARK") print("=" * 60) - + harvesters = [ # Offline harvesters (TimingHarvester(num_samples=64), False), @@ -311,45 +328,46 @@ def test_all_harvesters_benchmark(self) -> None: (WeatherHarvester(timeout=10.0), True), (RadioactiveHarvester(timeout=15.0), True), ] - + for harvester, requires_network in harvesters: print(f" Testing {harvester.name}...", end="", flush=True) metrics = measure_harvester(harvester, requires_network) PerformanceTracker.add_metric(metrics) - + status = "✓" if metrics.success else "✗" print(f" {status} ({metrics.duration_ms:.0f}ms)") - + # Print full report PerformanceTracker.print_report() - + # At minimum, offline harvesters should work offline_metrics = [m for m in PerformanceTracker.metrics if not m.requires_network] offline_success = all(m.success for m in offline_metrics) - + assert offline_success, "Offline harvesters must always succeed" - + def test_entropy_quality(self) -> None: """Test that collected entropy has good distribution.""" harvester = TimingHarvester(num_samples=128) result = harvester.collect() - + assert result.success assert len(result.data) >= 128 - + # Check byte distribution (simple entropy check) from collections import Counter + byte_counts = Counter(result.data) - + # In truly random data, no single byte should dominate max_count = max(byte_counts.values()) total_bytes = len(result.data) max_percentage = (max_count / total_bytes) * 100 - + print(f"\n [QUALITY] Data size: {total_bytes} bytes") print(f" [QUALITY] Unique byte values: {len(byte_counts)}/256") print(f" [QUALITY] Max byte frequency: {max_percentage:.1f}%") - + # Warning if distribution looks bad if max_percentage > 10: print(f" [QUALITY] Warning: High byte frequency ({max_percentage:.1f}%)") @@ -362,44 +380,43 @@ def test_entropy_quality(self) -> None: class TestHarvesterConfiguration: """Test harvester configuration options.""" - + def test_timing_num_samples(self) -> None: """Test configurable sample count for timing harvester.""" for num_samples in [16, 32, 64, 128]: harvester = TimingHarvester(num_samples=num_samples) result = harvester.collect() - + expected_bytes = num_samples * 8 # 8 bytes per sample - assert len(result.data) == expected_bytes, \ - f"Expected {expected_bytes} bytes for {num_samples} samples" - - print(f"\n [CONFIG] TimingHarvester(num_samples={num_samples}) -> {len(result.data)} bytes") - + assert ( + len(result.data) == expected_bytes + ), f"Expected {expected_bytes} bytes for {num_samples} samples" + + print( + f"\n [CONFIG] TimingHarvester(num_samples={num_samples}) -> {len(result.data)} bytes" + ) + def test_network_custom_targets(self) -> None: """Test custom network targets configuration.""" custom_targets = ["https://google.com", "https://github.com"] harvester = NetworkHarvester(targets=custom_targets, timeout=5.0) - + assert harvester.targets == custom_targets print(f"\n [CONFIG] NetworkHarvester with custom targets: {custom_targets}") - + def test_network_timeout_configuration(self) -> None: """Test that timeout is respected.""" timeout = 2.0 harvester = NetworkHarvester(timeout=timeout) - + assert harvester.timeout == timeout print(f"\n [CONFIG] NetworkHarvester timeout: {timeout}s") - + def test_external_source_toggles(self) -> None: """Test enabling/disabling external sources.""" # Disable earthquake, enable crypto only - harvester = ExternalHarvester( - enable_earthquake=False, - enable_crypto=True, - timeout=5.0 - ) - + harvester = ExternalHarvester(enable_earthquake=False, enable_crypto=True, timeout=5.0) + assert harvester.enable_earthquake is False assert harvester.enable_crypto is True print("\n [CONFIG] ExternalHarvester with crypto only") @@ -414,7 +431,7 @@ def test_external_source_toggles(self) -> None: def final_report(request): """Print final performance report after all tests complete.""" yield - + if PerformanceTracker.metrics: print("\n\n" + "=" * 75) print(" FINAL SESSION PERFORMANCE SUMMARY") diff --git a/tests/test_hybrid.py b/tests/test_hybrid.py new file mode 100644 index 0000000..6df4519 --- /dev/null +++ b/tests/test_hybrid.py @@ -0,0 +1,136 @@ +import time +from unittest.mock import patch + +import trueentropy +from trueentropy.config import configure, reset_config +from trueentropy.hybrid import HybridTap +from trueentropy.pool import EntropyPool +from trueentropy.tap import EntropyTap + + +class TestHybridTap: + + def test_initial_reseed(self): + """Test that HybridTap reseeds on initialization.""" + pool = EntropyPool() + # Mock extract to verifying it's called + with patch.object(pool, "extract", wraps=pool.extract) as mock_extract: + tap = HybridTap(pool, reseed_on_init=True) + mock_extract.assert_called_with(32) + + # Verify tap works + assert 0.0 <= tap.random() < 1.0 + + def test_manual_reseed(self): + """Test manual reseeding.""" + pool = EntropyPool() + tap = HybridTap(pool) + + with patch.object(pool, "extract", wraps=pool.extract) as mock_extract: + tap.reseed() + mock_extract.assert_called_with(32) + + def test_auto_reseed_on_time(self): + """Test that tap reseeds after interval calls.""" + pool = EntropyPool() + interval = 0.1 + tap = HybridTap(pool, reseed_interval=interval) + + # Initial access + tap.random() + last_reseed = tap._last_reseed_time + + # Access immediately - should NOT reseed + tap.random() + assert tap._last_reseed_time == last_reseed + + # Wait for interval + time.sleep(interval + 0.05) + + # Access again - SHOULD reseed + tap.random() + assert tap._last_reseed_time > last_reseed + + def test_random_methods(self): + """Test that all random methods work and delegate to internal PRNG.""" + pool = EntropyPool() + tap = HybridTap(pool) + + assert isinstance(tap.random(), float) + assert isinstance(tap.randint(1, 10), int) + assert len(tap.randbytes(10)) == 10 + assert tap.choice([1, 2, 3]) in [1, 2, 3] + + seq = [1, 2, 3] + tap.shuffle(seq) + assert len(seq) == 3 + + sample = tap.sample([1, 2, 3, 4], 2) + assert len(sample) == 2 + assert len(set(sample)) == 2 # Unique + + def test_performance_comparison(self): + """Verify that HybridTap is faster than EntropyTap (sanity check).""" + pool = EntropyPool() + direct_tap = EntropyTap(pool) + hybrid_tap = HybridTap(pool) + + # Measure 1000 calls + start = time.perf_counter() + for _ in range(1000): + direct_tap.random() + direct_duration = time.perf_counter() - start + + start = time.perf_counter() + for _ in range(1000): + hybrid_tap.random() + hybrid_duration = time.perf_counter() - start + + # Hybrid should be significantly faster + # (Though with 1000 calls it might be noise, but typically it's 10x-100x faster) + # We assert simple faster, if it fails due to noise, we might need more samples + # but let's be conservative. + assert ( + hybrid_duration < direct_duration + ), f"Hybrid ({hybrid_duration:.5f}s) should be faster than Direct ({direct_duration:.5f}s)" + + +class TestHybridConfig: + + def setup_method(self): + """Reset config before each test.""" + reset_config() + # Reset global tap to default + configure(mode="DIRECT") + + def teardown_method(self): + reset_config() + configure(mode="DIRECT") + + def test_switch_modes(self): + """Test switching between DIRECT and HYBRID modes.""" + # Default is DIRECT + assert isinstance(trueentropy.get_tap(), EntropyTap) + + # Switch to HYBRID + trueentropy.configure(mode="HYBRID") + tap = trueentropy.get_tap() + assert isinstance(tap, HybridTap) + # Verify default interval + assert tap._reseed_interval == 60.0 + + # Switch back to DIRECT + trueentropy.configure(mode="DIRECT") + assert isinstance(trueentropy.get_tap(), EntropyTap) + + def test_hybrid_config_params(self): + """Test configuring hybrid params.""" + trueentropy.configure(mode="HYBRID", hybrid_reseed_interval=123.4) + tap = trueentropy.get_tap() + assert isinstance(tap, HybridTap) + assert tap._reseed_interval == 123.4 + + # Test update interval only + trueentropy.configure(hybrid_reseed_interval=456.7) + tap = trueentropy.get_tap() + assert tap._reseed_interval == 456.7 diff --git a/tests/test_integration.py b/tests/test_integration.py index 6a817b0..f1a05af 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -17,79 +17,77 @@ import time -import pytest - class TestPublicAPI: """Test the public API exported from trueentropy.""" - + def test_import(self) -> None: """Library should import without errors.""" import trueentropy - + assert hasattr(trueentropy, "__version__") - + def test_random(self) -> None: """trueentropy.random() should work.""" import trueentropy - + value = trueentropy.random() - + assert isinstance(value, float) assert 0.0 <= value < 1.0 - + def test_randint(self) -> None: """trueentropy.randint() should work.""" import trueentropy - + value = trueentropy.randint(1, 100) - + assert isinstance(value, int) assert 1 <= value <= 100 - + def test_randbool(self) -> None: """trueentropy.randbool() should work.""" import trueentropy - + value = trueentropy.randbool() - + assert isinstance(value, bool) - + def test_choice(self) -> None: """trueentropy.choice() should work.""" import trueentropy - + items = ["a", "b", "c"] value = trueentropy.choice(items) - + assert value in items - + def test_randbytes(self) -> None: """trueentropy.randbytes() should work.""" import trueentropy - + value = trueentropy.randbytes(32) - + assert isinstance(value, bytes) assert len(value) == 32 - + def test_shuffle(self) -> None: """trueentropy.shuffle() should work.""" import trueentropy - + items = [1, 2, 3, 4, 5] trueentropy.shuffle(items) - + assert len(items) == 5 assert set(items) == {1, 2, 3, 4, 5} - + def test_sample(self) -> None: """trueentropy.sample() should work.""" import trueentropy - + items = list(range(10)) result = trueentropy.sample(items, 5) - + assert len(result) == 5 assert len(set(result)) == 5 for item in result: @@ -98,64 +96,64 @@ def test_sample(self) -> None: class TestHealthAPI: """Test the health monitoring API.""" - + def test_health_returns_dict(self) -> None: """health() should return a dictionary.""" import trueentropy - + status = trueentropy.health() - + assert isinstance(status, dict) - + def test_health_has_required_keys(self) -> None: """health() should have all required keys.""" import trueentropy - + status = trueentropy.health() - + assert "score" in status assert "status" in status assert "entropy_bits" in status assert "recommendation" in status - + def test_health_score_in_range(self) -> None: """health score should be 0-100.""" import trueentropy - + status = trueentropy.health() - + assert 0 <= status["score"] <= 100 - + def test_health_status_valid(self) -> None: """health status should be valid value.""" import trueentropy - + status = trueentropy.health() - + assert status["status"] in ["healthy", "degraded", "critical"] class TestFeedAPI: """Test the manual feed API.""" - + def test_feed_accepts_bytes(self) -> None: """feed() should accept bytes.""" import trueentropy - + # Should not raise trueentropy.feed(b"test entropy data") - + def test_feed_affects_pool(self) -> None: """feed() should affect random output.""" import trueentropy - + # Get current health before = trueentropy.health() - + # Feed a lot of data for _ in range(10): trueentropy.feed(b"entropy" * 100) - + # Health should still be valid after = trueentropy.health() assert 0 <= after["score"] <= 100 @@ -163,100 +161,102 @@ def test_feed_affects_pool(self) -> None: class TestAdvancedAPI: """Test advanced API functions.""" - + def test_get_pool(self) -> None: """get_pool() should return EntropyPool.""" import trueentropy from trueentropy.pool import EntropyPool - + pool = trueentropy.get_pool() - + assert isinstance(pool, EntropyPool) - + def test_get_tap(self) -> None: """get_tap() should return EntropyTap.""" import trueentropy from trueentropy.tap import EntropyTap - + tap = trueentropy.get_tap() - + assert isinstance(tap, EntropyTap) class TestBackgroundCollector: """Test background collector functionality.""" - + def setup_method(self) -> None: """Reset config and stop collector before each test.""" import trueentropy + # Use offline mode for faster tests (no network harvesters blocking) trueentropy.configure(offline_mode=True) trueentropy.stop_collector() time.sleep(0.1) - + def teardown_method(self) -> None: """Ensure collector is stopped after each test.""" import trueentropy + trueentropy.stop_collector() trueentropy.reset_config() # Reset config for next test time.sleep(0.1) # Give time for thread to stop - + def test_start_and_stop_collector(self) -> None: """Collector should start and stop cleanly.""" import trueentropy from trueentropy.collector import is_collector_running - + # Ensure stopped first trueentropy.stop_collector() time.sleep(0.1) - + # Start collector (uses offline mode from setup_method) trueentropy.start_collector(interval=0.5) time.sleep(0.2) # Give time for thread to start - + # Should be running assert is_collector_running() - + # Stop collector trueentropy.stop_collector() - + # Should be stopped # Give it a moment to stop time.sleep(0.3) assert not is_collector_running() - + def test_collector_feeds_pool(self) -> None: """Collector should feed entropy into the pool.""" import trueentropy - + # Ensure stopped first trueentropy.stop_collector() time.sleep(0.1) - + pool = trueentropy.get_pool() initial_fed = pool.total_fed - + # Start collector with fast interval (uses offline mode) trueentropy.start_collector(interval=0.2) - + # Wait for some collections time.sleep(0.8) - + # Stop collector trueentropy.stop_collector() time.sleep(0.2) - + # Should have fed some data assert pool.total_fed > initial_fed class TestExports: """Test that all expected names are exported.""" - + def test_all_exports(self) -> None: """__all__ should contain expected names.""" import trueentropy - + expected = [ "__version__", "random", @@ -276,7 +276,7 @@ def test_all_exports(self) -> None: "EntropyTap", "HealthStatus", ] - + for name in expected: assert name in trueentropy.__all__ assert hasattr(trueentropy, name) @@ -284,36 +284,37 @@ def test_all_exports(self) -> None: class TestRandomQuality: """Test the quality of random output.""" - + def test_no_obvious_patterns(self) -> None: """Random output should not have obvious patterns.""" import trueentropy - + # Generate many random bytes data = trueentropy.randbytes(1000) - + # Count each byte value from collections import Counter + counts = Counter(data) - + # No single byte should appear more than ~10% of the time # (uniform distribution would be 1000/256 ≈ 4 per byte) max_count = max(counts.values()) assert max_count < 50 # Very conservative threshold - + def test_entropy_estimation(self) -> None: """Pool should track entropy correctly.""" from trueentropy.pool import EntropyPool from trueentropy.tap import EntropyTap - + # Use a fresh pool for this test to avoid state pollution pool = EntropyPool() tap = EntropyTap(pool) - + # Extract some entropy initial_bits = pool.entropy_bits tap.randbytes(64) after_bits = pool.entropy_bits - + # Entropy should have decreased assert after_bits < initial_bits diff --git a/tests/test_pool.py b/tests/test_pool.py index d705741..17fdd16 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -26,222 +26,219 @@ class TestEntropyPoolInitialization: """Test EntropyPool initialization.""" - + def test_default_initialization(self) -> None: """Pool should initialize with random seed.""" pool = EntropyPool() - + # Pool should have full entropy initially assert pool.entropy_bits == pool.POOL_SIZE * 8 assert pool.total_fed == 0 assert pool.total_extracted == 0 - + def test_custom_seed_initialization(self) -> None: """Pool should accept custom seed.""" seed = b"test_seed_1234567890" pool = EntropyPool(seed=seed) - + # Pool should still have expected size assert pool.entropy_bits == pool.POOL_SIZE * 8 - + def test_different_pools_have_different_state(self) -> None: """Two pools without seed should have different states.""" pool1 = EntropyPool() pool2 = EntropyPool() - + # Extract bytes from both - they should be different bytes1 = pool1.extract(32) bytes2 = pool2.extract(32) - + assert bytes1 != bytes2 class TestEntropyPoolFeed: """Test EntropyPool feed operation.""" - + def test_feed_updates_state(self) -> None: """Feeding data should change pool state.""" pool = EntropyPool(seed=b"fixed_seed") - + # Extract initial bytes initial = pool.extract(32) - + # Create new pool with same seed pool2 = EntropyPool(seed=b"fixed_seed") - + # Feed some data pool2.feed(b"new_entropy_data") - + # Extract bytes - should be different after_feed = pool2.extract(32) - + assert initial != after_feed - + def test_feed_updates_statistics(self) -> None: """Feeding should update total_fed counter.""" pool = EntropyPool() - + data = b"test_data_12345" pool.feed(data) - + assert pool.total_fed == len(data) - + def test_feed_updates_entropy_estimate(self) -> None: """Feeding with entropy estimate should update bits.""" pool = EntropyPool() initial_bits = pool.entropy_bits - + pool.feed(b"data", entropy_estimate=16) - + # Should be capped at max - assert pool.entropy_bits == min( - initial_bits + 16, - pool.POOL_SIZE * 8 - ) - + assert pool.entropy_bits == min(initial_bits + 16, pool.POOL_SIZE * 8) + def test_empty_feed_is_noop(self) -> None: """Feeding empty data should not change state.""" pool = EntropyPool(seed=b"fixed") initial = pool.extract(16) - + pool2 = EntropyPool(seed=b"fixed") pool2.feed(b"") after = pool2.extract(16) - + assert initial == after - + def test_feed_updates_last_feed_time(self) -> None: """Feed should update last_feed_time.""" pool = EntropyPool() initial_time = pool.last_feed_time - + time.sleep(0.01) # Small delay pool.feed(b"data") - + assert pool.last_feed_time > initial_time class TestEntropyPoolExtract: """Test EntropyPool extract operation.""" - + def test_extract_returns_correct_size(self) -> None: """Extract should return exact number of bytes requested.""" pool = EntropyPool() - + for size in [1, 16, 32, 64, 100, 256]: result = pool.extract(size) assert len(result) == size - + def test_extract_updates_statistics(self) -> None: """Extracting should update total_extracted counter.""" pool = EntropyPool() - + pool.extract(32) assert pool.total_extracted == 32 - + pool.extract(16) assert pool.total_extracted == 48 - + def test_extract_reduces_entropy_estimate(self) -> None: """Extracting should reduce entropy bits estimate.""" pool = EntropyPool() initial_bits = pool.entropy_bits - + pool.extract(32) - + assert pool.entropy_bits == initial_bits - (32 * 8) - + def test_extract_never_returns_same_bytes(self) -> None: """Successive extractions should never return same bytes.""" pool = EntropyPool() - + results = [pool.extract(32) for _ in range(100)] - + # All results should be unique assert len(set(results)) == len(results) - + def test_extract_forward_secrecy(self) -> None: """Same seed should give different results after extraction.""" # Create two pools with same seed seed = b"test_seed_for_forward_secrecy" pool1 = EntropyPool(seed=seed) pool2 = EntropyPool(seed=seed) - + # Both should give same first extraction first1 = pool1.extract(16) first2 = pool2.extract(16) assert first1 == first2 - + # But second extraction should still be same # (pool state updated deterministically) second1 = pool1.extract(16) second2 = pool2.extract(16) assert second1 == second2 - + def test_extract_invalid_size_raises(self) -> None: """Extracting zero or negative bytes should raise.""" pool = EntropyPool() - + with pytest.raises(ValueError): pool.extract(0) - + with pytest.raises(ValueError): pool.extract(-1) class TestEntropyPoolReseed: """Test EntropyPool reseed operation.""" - + def test_reseed_changes_state(self) -> None: """Reseed should change pool state.""" pool = EntropyPool(seed=b"fixed") before = pool.extract(32) - + pool2 = EntropyPool(seed=b"fixed") pool2.reseed() after = pool2.extract(32) - + assert before != after - + def test_reseed_restores_entropy(self) -> None: """Reseed should restore entropy estimate.""" pool = EntropyPool() - + # Drain some entropy for _ in range(10): pool.extract(64) - + low_bits = pool.entropy_bits - + # Reseed pool.reseed() - + # Should have full entropy again assert pool.entropy_bits > low_bits class TestEntropyPoolThreadSafety: """Test EntropyPool thread safety.""" - + def test_concurrent_feed_and_extract(self) -> None: """Pool should handle concurrent operations safely.""" pool = EntropyPool() errors: list = [] - + def feed_worker() -> None: try: for _ in range(100): pool.feed(b"concurrent_data") except Exception as e: errors.append(e) - + def extract_worker() -> None: try: for _ in range(100): pool.extract(16) except Exception as e: errors.append(e) - + # Start multiple threads threads = [ threading.Thread(target=feed_worker), @@ -249,41 +246,41 @@ def extract_worker() -> None: threading.Thread(target=extract_worker), threading.Thread(target=extract_worker), ] - + for t in threads: t.start() - + for t in threads: t.join() - + # No errors should have occurred assert len(errors) == 0 - + def test_statistics_consistency(self) -> None: """Statistics should remain consistent under concurrent access.""" pool = EntropyPool() - + def worker(feed_size: int, extract_size: int) -> None: for _ in range(50): pool.feed(b"x" * feed_size) pool.extract(extract_size) - + threads = [ threading.Thread(target=worker, args=(10, 8)), threading.Thread(target=worker, args=(20, 16)), threading.Thread(target=worker, args=(15, 12)), ] - + for t in threads: t.start() - + for t in threads: t.join() - + # Total fed should be sum of all feeds expected_fed = 50 * (10 + 20 + 15) assert pool.total_fed == expected_fed - + # Total extracted should be sum of all extracts expected_extracted = 50 * (8 + 16 + 12) assert pool.total_extracted == expected_extracted @@ -291,15 +288,15 @@ def worker(feed_size: int, extract_size: int) -> None: class TestEntropyPoolRepr: """Test EntropyPool string representation.""" - + def test_repr_contains_stats(self) -> None: """Repr should contain useful statistics.""" pool = EntropyPool() pool.feed(b"test") pool.extract(16) - + repr_str = repr(pool) - + assert "EntropyPool" in repr_str assert "entropy_bits" in repr_str assert "total_fed" in repr_str diff --git a/tests/test_tap.py b/tests/test_tap.py index 26f4474..39c8121 100644 --- a/tests/test_tap.py +++ b/tests/test_tap.py @@ -28,97 +28,97 @@ class TestEntropyTapRandom: """Test EntropyTap.random() method.""" - + def test_random_returns_float(self) -> None: """random() should return a float.""" pool = EntropyPool() tap = EntropyTap(pool) - + value = tap.random() - + assert isinstance(value, float) - + def test_random_in_range(self) -> None: """random() should return values in [0.0, 1.0).""" pool = EntropyPool() tap = EntropyTap(pool) - + for _ in range(1000): value = tap.random() assert 0.0 <= value < 1.0 - + def test_random_distribution(self) -> None: """random() should be approximately uniform.""" pool = EntropyPool() tap = EntropyTap(pool) - + # Generate samples samples = [tap.random() for _ in range(10000)] - + # Check mean is close to 0.5 mean = sum(samples) / len(samples) assert 0.45 < mean < 0.55 - + # Check variance is close to 1/12 (uniform distribution) variance = sum((x - mean) ** 2 for x in samples) / len(samples) - expected_variance = 1/12 + expected_variance = 1 / 12 assert abs(variance - expected_variance) < 0.02 class TestEntropyTapRandint: """Test EntropyTap.randint() method.""" - + def test_randint_returns_int(self) -> None: """randint() should return an integer.""" pool = EntropyPool() tap = EntropyTap(pool) - + value = tap.randint(1, 10) - + assert isinstance(value, int) - + def test_randint_in_range(self) -> None: """randint(a, b) should return values in [a, b].""" pool = EntropyPool() tap = EntropyTap(pool) - + for _ in range(1000): value = tap.randint(5, 15) assert 5 <= value <= 15 - + def test_randint_single_value(self) -> None: """randint(a, a) should always return a.""" pool = EntropyPool() tap = EntropyTap(pool) - + for _ in range(100): assert tap.randint(42, 42) == 42 - + def test_randint_negative_range(self) -> None: """randint() should work with negative numbers.""" pool = EntropyPool() tap = EntropyTap(pool) - + for _ in range(100): value = tap.randint(-10, -5) assert -10 <= value <= -5 - + def test_randint_invalid_range_raises(self) -> None: """randint(a, b) where a > b should raise ValueError.""" pool = EntropyPool() tap = EntropyTap(pool) - + with pytest.raises(ValueError): tap.randint(10, 5) - + def test_randint_distribution(self) -> None: """randint() should produce uniform distribution.""" pool = EntropyPool() tap = EntropyTap(pool) - + # Roll a die 6000 times counts = Counter(tap.randint(1, 6) for _ in range(6000)) - + # Each face should appear roughly 1000 times for face in range(1, 7): assert 800 < counts[face] < 1200 @@ -126,26 +126,26 @@ def test_randint_distribution(self) -> None: class TestEntropyTapRandbool: """Test EntropyTap.randbool() method.""" - + def test_randbool_returns_bool(self) -> None: """randbool() should return a boolean.""" pool = EntropyPool() tap = EntropyTap(pool) - + value = tap.randbool() - + assert isinstance(value, bool) - + def test_randbool_distribution(self) -> None: """randbool() should be approximately 50/50.""" pool = EntropyPool() tap = EntropyTap(pool) - + results = [tap.randbool() for _ in range(10000)] - + true_count = sum(results) false_count = len(results) - true_count - + # Should be close to 50% each assert 4500 < true_count < 5500 assert 4500 < false_count < 5500 @@ -153,75 +153,75 @@ def test_randbool_distribution(self) -> None: class TestEntropyTapRandbytes: """Test EntropyTap.randbytes() method.""" - + def test_randbytes_returns_bytes(self) -> None: """randbytes() should return bytes.""" pool = EntropyPool() tap = EntropyTap(pool) - + value = tap.randbytes(16) - + assert isinstance(value, bytes) - + def test_randbytes_correct_length(self) -> None: """randbytes(n) should return exactly n bytes.""" pool = EntropyPool() tap = EntropyTap(pool) - + for n in [1, 8, 16, 32, 64, 100, 256]: result = tap.randbytes(n) assert len(result) == n - + def test_randbytes_invalid_size_raises(self) -> None: """randbytes() with invalid size should raise.""" pool = EntropyPool() tap = EntropyTap(pool) - + with pytest.raises(ValueError): tap.randbytes(0) - + with pytest.raises(ValueError): tap.randbytes(-1) class TestEntropyTapChoice: """Test EntropyTap.choice() method.""" - + def test_choice_returns_element(self) -> None: """choice() should return an element from the sequence.""" pool = EntropyPool() tap = EntropyTap(pool) - + items = ["apple", "banana", "cherry"] - + for _ in range(100): result = tap.choice(items) assert result in items - + def test_choice_single_element(self) -> None: """choice() with single element should always return it.""" pool = EntropyPool() tap = EntropyTap(pool) - + for _ in range(100): assert tap.choice([42]) == 42 - + def test_choice_empty_raises(self) -> None: """choice() with empty sequence should raise IndexError.""" pool = EntropyPool() tap = EntropyTap(pool) - + with pytest.raises(IndexError): tap.choice([]) - + def test_choice_distribution(self) -> None: """choice() should select uniformly.""" pool = EntropyPool() tap = EntropyTap(pool) - + items = ["a", "b", "c", "d"] counts = Counter(tap.choice(items) for _ in range(4000)) - + # Each should be selected ~1000 times for item in items: assert 800 < counts[item] < 1200 @@ -229,124 +229,124 @@ def test_choice_distribution(self) -> None: class TestEntropyTapShuffle: """Test EntropyTap.shuffle() method.""" - + def test_shuffle_modifies_in_place(self) -> None: """shuffle() should modify the list in place.""" pool = EntropyPool() tap = EntropyTap(pool) - + items = [1, 2, 3, 4, 5] original = items.copy() - + tap.shuffle(items) - + # Should be same elements assert sorted(items) == sorted(original) - + def test_shuffle_changes_order(self) -> None: """shuffle() should change the order (usually).""" pool = EntropyPool() tap = EntropyTap(pool) - + items = list(range(10)) original = items.copy() - + tap.shuffle(items) - + # Very unlikely to be the same order assert items != original - + def test_shuffle_preserves_elements(self) -> None: """shuffle() should not add or remove elements.""" pool = EntropyPool() tap = EntropyTap(pool) - + items = ["a", "b", "c", "d", "e"] tap.shuffle(items) - + assert len(items) == 5 assert set(items) == {"a", "b", "c", "d", "e"} class TestEntropyTapSample: """Test EntropyTap.sample() method.""" - + def test_sample_returns_list(self) -> None: """sample() should return a list.""" pool = EntropyPool() tap = EntropyTap(pool) - + result = tap.sample([1, 2, 3, 4, 5], 3) - + assert isinstance(result, list) - + def test_sample_correct_length(self) -> None: """sample() should return k elements.""" pool = EntropyPool() tap = EntropyTap(pool) - + result = tap.sample(range(10), 5) - + assert len(result) == 5 - + def test_sample_unique_elements(self) -> None: """sample() should return unique elements.""" pool = EntropyPool() tap = EntropyTap(pool) - + result = tap.sample(range(100), 50) - + assert len(set(result)) == 50 - + def test_sample_from_sequence(self) -> None: """sample() elements should be from the sequence.""" pool = EntropyPool() tap = EntropyTap(pool) - + items = ["a", "b", "c", "d", "e"] result = tap.sample(items, 3) - + for item in result: assert item in items - + def test_sample_invalid_k_raises(self) -> None: """sample() with invalid k should raise.""" pool = EntropyPool() tap = EntropyTap(pool) - + with pytest.raises(ValueError): tap.sample([1, 2, 3], 5) # k > len - + with pytest.raises(ValueError): tap.sample([1, 2, 3], -1) # k < 0 - + def test_sample_zero(self) -> None: """sample(seq, 0) should return empty list.""" pool = EntropyPool() tap = EntropyTap(pool) - + result = tap.sample([1, 2, 3], 0) - + assert result == [] class TestEntropyTapUniform: """Test EntropyTap.uniform() method.""" - + def test_uniform_in_range(self) -> None: """uniform(a, b) should return values in [a, b].""" pool = EntropyPool() tap = EntropyTap(pool) - + for _ in range(1000): value = tap.uniform(5.0, 10.0) assert 5.0 <= value <= 10.0 - + def test_uniform_negative_range(self) -> None: """uniform() should work with negative numbers.""" pool = EntropyPool() tap = EntropyTap(pool) - + for _ in range(100): value = tap.uniform(-10.0, -5.0) assert -10.0 <= value <= -5.0 @@ -354,33 +354,33 @@ def test_uniform_negative_range(self) -> None: class TestEntropyTapGauss: """Test EntropyTap.gauss() method.""" - + def test_gauss_returns_float(self) -> None: """gauss() should return a float.""" pool = EntropyPool() tap = EntropyTap(pool) - + value = tap.gauss() - + assert isinstance(value, float) - + def test_gauss_mean(self) -> None: """gauss() should have correct mean.""" pool = EntropyPool() tap = EntropyTap(pool) - + samples = [tap.gauss(mu=5.0, sigma=1.0) for _ in range(10000)] mean = sum(samples) / len(samples) - + assert 4.8 < mean < 5.2 - + def test_gauss_standard_deviation(self) -> None: """gauss() should have correct standard deviation.""" pool = EntropyPool() tap = EntropyTap(pool) - + samples = [tap.gauss(mu=0.0, sigma=2.0) for _ in range(10000)] mean = sum(samples) / len(samples) std = math.sqrt(sum((x - mean) ** 2 for x in samples) / len(samples)) - + assert 1.8 < std < 2.2