diff --git a/data_structures/advanced_trie.py b/data_structures/advanced_trie.py new file mode 100644 index 000000000000..dadde3763ceb --- /dev/null +++ b/data_structures/advanced_trie.py @@ -0,0 +1,556 @@ +""" +Advanced Trie (Prefix Tree) implementation. + +A Trie is a tree-like data structure that stores strings in a way that +allows for efficient prefix-based operations. This implementation includes: +- Basic insert, search, and delete operations +- Prefix search and autocomplete +- Longest common prefix +- Pattern matching with wildcards + +Time Complexity: + - Insert: O(m) where m is the length of the string + - Search: O(m) where m is the length of the string + - Delete: O(m) where m is the length of the string + - Prefix search: O(m + k) where m is prefix length, k is number of results +Space Complexity: O(ALPHABET_SIZE * N * M) where N is number of strings, M is avg length + +Reference: https://en.wikipedia.org/wiki/Trie +""" + +from typing import Optional, Any + + +class TrieNode: + """Node in the Trie data structure.""" + + def __init__(self): + self.children: dict[str, TrieNode] = {} + self.is_end_of_word: bool = False + self.word_count: int = 0 # Number of words ending at this node + self.prefix_count: int = 0 # Number of words with this prefix + self.data: Any = None # Additional data associated with the word + + +class Trie: + """ + Advanced Trie implementation with comprehensive functionality. + + Attributes: + root: Root node of the trie + size: Number of words in the trie + """ + + def __init__(self): + """Initialize an empty Trie.""" + self.root = TrieNode() + self.size = 0 + + def insert(self, word: str, data: Any = None) -> None: + """ + Insert a word into the trie. 
+ + Args: + word: Word to insert + data: Additional data to associate with the word + + Examples: + >>> trie = Trie() + >>> trie.insert("hello") + >>> trie.search("hello") + True + """ + if not word: + return + + node = self.root + for char in word: + if char not in node.children: + node.children[char] = TrieNode() + node = node.children[char] + node.prefix_count += 1 + + if not node.is_end_of_word: + self.size += 1 + node.is_end_of_word = True + + node.word_count += 1 + node.data = data + + def search(self, word: str) -> bool: + """ + Search for a word in the trie. + + Args: + word: Word to search for + + Returns: + True if word exists, False otherwise + + Examples: + >>> trie = Trie() + >>> trie.insert("hello") + >>> trie.search("hello") + True + >>> trie.search("world") + False + """ + node = self._find_node(word) + return node is not None and node.is_end_of_word + + def starts_with(self, prefix: str) -> bool: + """ + Check if any word in the trie starts with the given prefix. + + Args: + prefix: Prefix to check + + Returns: + True if any word starts with prefix, False otherwise + + Examples: + >>> trie = Trie() + >>> trie.insert("hello") + >>> trie.starts_with("hel") + True + >>> trie.starts_with("xyz") + False + """ + node = self._find_node(prefix) + return node is not None + + def delete(self, word: str) -> bool: + """ + Delete a word from the trie. 
+ + Args: + word: Word to delete + + Returns: + True if word was deleted, False if word didn't exist + + Examples: + >>> trie = Trie() + >>> trie.insert("hello") + >>> trie.delete("hello") + True + >>> trie.search("hello") + False + """ + if not word: + return False + + # First check if word exists + if not self.search(word): + return False + + # Delete the word + self._delete_helper(self.root, word, 0) + self.size -= 1 + return True + + def _delete_helper(self, node: TrieNode, word: str, index: int) -> bool: + """Helper method for deletion.""" + if index == len(word): + if node.is_end_of_word: + node.is_end_of_word = False + node.word_count -= 1 + return node.word_count == 0 + return False + + char = word[index] + if char not in node.children: + return False + + should_delete_child = self._delete_helper(node.children[char], word, index + 1) + + if should_delete_child: + del node.children[char] + + node.prefix_count -= 1 + return len(node.children) == 0 and not node.is_end_of_word + + def _find_node(self, word: str) -> TrieNode | None: + """ + Find the node corresponding to the given word. + + Args: + word: Word to find node for + + Returns: + TrieNode if found, None otherwise + + Examples: + >>> trie = Trie() + >>> trie.insert("hello") + >>> node = trie._find_node("hello") + >>> node is not None + True + >>> node.is_end_of_word + True + >>> trie._find_node("world") is None + True + """ + node = self.root + for char in word: + if char not in node.children: + return None + node = node.children[char] + return node + + def get_all_words_with_prefix(self, prefix: str) -> list[str]: + """ + Get all words that start with the given prefix. 
+ + Args: + prefix: Prefix to search for + + Returns: + List of words starting with the prefix + + Examples: + >>> trie = Trie() + >>> trie.insert("hello") + >>> trie.insert("help") + >>> trie.insert("world") + >>> trie.get_all_words_with_prefix("hel") + ['hello', 'help'] + """ + node = self._find_node(prefix) + if node is None: + return [] + + words = [] + self._collect_words(node, prefix, words) + return words + + def _collect_words( + self, node: TrieNode, current_word: str, words: list[str] + ) -> None: + """ + Collect all words from a given node. + + Args: + node: Current trie node + current_word: Current word being built + words: List to collect words into + + Examples: + >>> trie = Trie() + >>> trie.insert("hello") + >>> trie.insert("help") + >>> words = [] + >>> trie._collect_words(trie._find_node("hel"), "hel", words) + >>> sorted(words) + ['hello', 'help'] + """ + if node.is_end_of_word: + words.append(current_word) + + for char, child_node in node.children.items(): + self._collect_words(child_node, current_word + char, words) + + def autocomplete(self, prefix: str, max_results: int = 10) -> list[str]: + """ + Get autocomplete suggestions for the given prefix. + + Args: + prefix: Prefix to autocomplete + max_results: Maximum number of results to return + + Returns: + List of autocomplete suggestions + + Examples: + >>> trie = Trie() + >>> trie.insert("hello") + >>> trie.insert("help") + >>> trie.insert("world") + >>> trie.autocomplete("hel", 5) + ['hello', 'help'] + """ + words = self.get_all_words_with_prefix(prefix) + return words[:max_results] + + def longest_common_prefix(self) -> str: + """ + Find the longest common prefix of all words in the trie. 
+ + Returns: + Longest common prefix + + Examples: + >>> trie = Trie() + >>> trie.insert("hello") + >>> trie.insert("help") + >>> trie.insert("helicopter") + >>> trie.longest_common_prefix() + 'hel' + """ + if self.size == 0: + return "" + + prefix = "" + node = self.root + + while len(node.children) == 1 and not node.is_end_of_word: + char = next(iter(node.children.keys())) + prefix += char + node = node.children[char] + + return prefix + + def get_word_count(self, word: str) -> int: + """ + Get the count of how many times a word was inserted. + + Args: + word: Word to get count for + + Returns: + Number of times the word was inserted + + Examples: + >>> trie = Trie() + >>> trie.insert("hello") + >>> trie.insert("hello") + >>> trie.get_word_count("hello") + 2 + """ + node = self._find_node(word) + return node.word_count if node and node.is_end_of_word else 0 + + def get_prefix_count(self, prefix: str) -> int: + """ + Get the count of words that start with the given prefix. + + Args: + prefix: Prefix to count + + Returns: + Number of words starting with the prefix + + Examples: + >>> trie = Trie() + >>> trie.insert("hello") + >>> trie.insert("help") + >>> trie.get_prefix_count("hel") + 2 + """ + node = self._find_node(prefix) + return node.prefix_count if node else 0 + + def pattern_search(self, pattern: str) -> list[str]: + """ + Search for words matching a pattern with wildcards. + Supports '*' for any character and '?' for single character. + + Args: + pattern: Pattern to match (supports * and ? 
wildcards) + + Returns: + List of words matching the pattern + + Examples: + >>> trie = Trie() + >>> trie.insert("hello") + >>> trie.insert("help") + >>> trie.insert("world") + >>> trie.pattern_search("hel*") + ['hello', 'help'] + """ + words = [] + self._pattern_search_helper(self.root, "", pattern, words) + return words + + def _pattern_search_helper( + self, node: TrieNode, current_word: str, pattern: str, words: list[str] + ) -> None: + """ + Helper method for pattern search. + + Args: + node: Current trie node + current_word: Current word being built + pattern: Pattern to match + words: List to collect matching words + + Examples: + >>> trie = Trie() + >>> trie.insert("hello") + >>> trie.insert("help") + >>> words = [] + >>> trie._pattern_search_helper(trie.root, "", "hel*", words) + >>> sorted(words) + ['hello', 'help'] + """ + if not pattern: + if node.is_end_of_word: + words.append(current_word) + return + + char = pattern[0] + remaining_pattern = pattern[1:] + + if char == "*": + # Match zero or more characters + self._pattern_search_helper(node, current_word, remaining_pattern, words) + for child_char, child_node in node.children.items(): + self._pattern_search_helper( + child_node, current_word + child_char, pattern, words + ) + elif char == "?": + # Match any single character + for child_char, child_node in node.children.items(): + self._pattern_search_helper( + child_node, current_word + child_char, remaining_pattern, words + ) + elif char in node.children: + # Match exact character + self._pattern_search_helper( + node.children[char], current_word + char, remaining_pattern, words + ) + + def get_all_words(self) -> list[str]: + """ + Get all words in the trie. + + Returns: + List of all words + + Examples: + >>> trie = Trie() + >>> trie.insert("hello") + >>> trie.insert("world") + >>> trie.get_all_words() + ['hello', 'world'] + """ + return self.get_all_words_with_prefix("") + + def clear(self) -> None: + """ + Clear all words from the trie. 
+ + Examples: + >>> trie = Trie() + >>> trie.insert("hello") + >>> trie.insert("world") + >>> len(trie) + 2 + >>> trie.clear() + >>> len(trie) + 0 + >>> trie.search("hello") + False + """ + self.root = TrieNode() + self.size = 0 + + def __len__(self) -> int: + """Return the number of words in the trie.""" + return self.size + + def __contains__(self, word: str) -> bool: + """Support 'in' operator.""" + return self.search(word) + + def __repr__(self) -> str: + """String representation of the trie.""" + return f"Trie(size={self.size})" + + +class CompressedTrie(Trie): + """ + Compressed Trie implementation for memory efficiency. + + Reduces memory usage by compressing chains of single-child nodes. + """ + + def __init__(self): + super().__init__() + self.compressed = True + + def _compress(self) -> None: + """Compress the trie by merging single-child chains.""" + self._compress_helper(self.root) + + def _compress_helper(self, node: TrieNode) -> None: + """Helper method for compression.""" + if len(node.children) == 1 and not node.is_end_of_word: + _child_char, child_node = next(iter(node.children.items())) + # Merge single child + node.children = child_node.children + node.is_end_of_word = child_node.is_end_of_word + node.word_count = child_node.word_count + node.prefix_count = child_node.prefix_count + node.data = child_node.data + + for child_node in node.children.values(): + self._compress_helper(child_node) + + +if __name__ == "__main__": + # Example usage + print("Trie Example") + print("=" * 50) + + # Create Trie + trie = Trie() + + # Insert words + words = ["hello", "help", "world", "word", "helicopter", "hero", "her"] + for word in words: + trie.insert(word) + print(f"Inserted: {word}") + + print(f"\nTrie size: {len(trie)}") + + # Search operations + print("\nSearch operations:") + search_words = ["hello", "help", "xyz", "world"] + for word in search_words: + result = trie.search(word) + print(f"'{word}': {'Found' if result else 'Not found'}") + + # Prefix 
operations + print("\nPrefix operations:") + prefixes = ["hel", "wor", "xyz"] + for prefix in prefixes: + has_prefix = trie.starts_with(prefix) + words_with_prefix = trie.get_all_words_with_prefix(prefix) + print(f"Prefix '{prefix}': {has_prefix}, Words: {words_with_prefix}") + + # Autocomplete + print("\nAutocomplete:") + autocomplete_prefixes = ["hel", "wor"] + for prefix in autocomplete_prefixes: + suggestions = trie.autocomplete(prefix, 3) + print(f"'{prefix}' -> {suggestions}") + + # Longest common prefix + print(f"\nLongest common prefix: '{trie.longest_common_prefix()}'") + + # Pattern search + print("\nPattern search:") + patterns = ["hel*", "wor?", "h*"] + for pattern in patterns: + matches = trie.pattern_search(pattern) + print(f"Pattern '{pattern}': {matches}") + + # Word counts + print("\nWord counts:") + trie.insert("hello") # Insert again + print(f"'hello' count: {trie.get_word_count('hello')}") + print(f"'hel' prefix count: {trie.get_prefix_count('hel')}") + + # Delete operation + print("\nDelete operation:") + print(f"Before delete - 'help' exists: {trie.search('help')}") + trie.delete("help") + print(f"After delete - 'help' exists: {trie.search('help')}") + print(f"After delete - 'hel' prefix count: {trie.get_prefix_count('hel')}") + + # All words + print(f"\nAll words in trie: {trie.get_all_words()}") + + print("\nTrie implementation completed successfully!") diff --git a/data_structures/bloom_filter.py b/data_structures/bloom_filter.py new file mode 100644 index 000000000000..5d5419aa330f --- /dev/null +++ b/data_structures/bloom_filter.py @@ -0,0 +1,351 @@ +""" +Bloom Filter implementation. + +A Bloom filter is a space-efficient probabilistic data structure that is designed +to test whether an element is a member of a set. It can have false positives +but never false negatives. 
+ +Time Complexity: + - Insert: O(k) where k is the number of hash functions + - Lookup: O(k) where k is the number of hash functions +Space Complexity: O(m) where m is the size of the bit array + +Reference: https://en.wikipedia.org/wiki/Bloom_filter +""" + +import hashlib +import math +from typing import Union + + +class BloomFilter: + """ + Bloom Filter implementation with configurable false positive rate. + + Attributes: + bit_array: List of bits representing the filter + hash_functions: Number of hash functions to use + size: Size of the bit array + count: Number of elements added to the filter + """ + + def __init__(self, expected_items: int, false_positive_rate: float = 0.01): + """ + Initialize Bloom Filter. + + Args: + expected_items: Expected number of items to be stored + false_positive_rate: Desired false positive rate (0.0 to 1.0) + + Examples: + >>> bf = BloomFilter(1000, 0.01) + >>> bf.size > 0 + True + >>> bf.hash_functions > 0 + True + """ + self.expected_items = expected_items + self.false_positive_rate = false_positive_rate + + # Calculate optimal size and number of hash functions + self.size = self._calculate_size(expected_items, false_positive_rate) + self.hash_functions = self._calculate_hash_functions(self.size, expected_items) + + # Initialize bit array + self.bit_array = [False] * self.size + self.count = 0 + + def _calculate_size(self, n: int, p: float) -> int: + """Calculate optimal size of bit array.""" + if p <= 0 or p >= 1: + raise ValueError("False positive rate must be between 0 and 1") + + # m = -(n * ln(p)) / (ln(2)^2) + size = -(n * math.log(p)) / (math.log(2) ** 2) + return math.ceil(size) + + def _calculate_hash_functions(self, m: int, n: int) -> int: + """Calculate optimal number of hash functions.""" + # k = (m/n) * ln(2) + k = (m / n) * math.log(2) + return math.ceil(k) + + def _hash(self, item: str | bytes, seed: int) -> int: + """ + Generate hash value for an item with given seed. 
+ + Args: + item: Item to hash + seed: Seed for hash function + + Returns: + Hash value + """ + if isinstance(item, str): + item = item.encode("utf-8") + + # Use different hash algorithms for different seeds + hash_algorithms = [ + hashlib.md5, + hashlib.sha1, + hashlib.sha256, + hashlib.sha512, + hashlib.blake2b, + hashlib.blake2s, + ] + + algorithm = hash_algorithms[seed % len(hash_algorithms)] + hash_obj = algorithm(item) + hash_obj.update(str(seed).encode("utf-8")) + + return int(hash_obj.hexdigest(), 16) % self.size + + def add(self, item: str | bytes) -> None: + """ + Add an item to the Bloom Filter. + + Args: + item: Item to add to the filter + + Examples: + >>> bf = BloomFilter(100, 0.01) + >>> bf.add("hello") + >>> bf.contains("hello") + True + """ + for i in range(self.hash_functions): + index = self._hash(item, i) + self.bit_array[index] = True + + self.count += 1 + + def contains(self, item: str | bytes) -> bool: + """ + Check if an item might be in the Bloom Filter. + + Args: + item: Item to check + + Returns: + True if item might be in the filter (no false negatives), + False if item is definitely not in the filter + + Examples: + >>> bf = BloomFilter(100, 0.01) + >>> bf.add("hello") + >>> bf.contains("hello") + True + >>> bf.contains("world") # Might be False or True (false positive) + False + """ + for i in range(self.hash_functions): + index = self._hash(item, i) + if not self.bit_array[index]: + return False + + return True + + def get_false_positive_rate(self) -> float: + """ + Calculate current false positive rate. + + Returns: + Current false positive rate + + Examples: + >>> bf = BloomFilter(100, 0.01) + >>> bf.add("test") + >>> rate = bf.get_false_positive_rate() + >>> 0 <= rate <= 1 + True + """ + if self.count == 0: + return 0.0 + + # (1 - e^(-k*n/m))^k + k = self.hash_functions + n = self.count + m = self.size + + return (1 - math.exp(-k * n / m)) ** k + + def get_load_factor(self) -> float: + """ + Get current load factor of the filter. 
+ + Returns: + Load factor (number of items / expected items) + + Examples: + >>> bf = BloomFilter(100, 0.01) + >>> bf.add("test") + >>> bf.get_load_factor() > 0 + True + """ + return self.count / self.expected_items + + def clear(self) -> None: + """Clear all items from the Bloom Filter.""" + self.bit_array = [False] * self.size + self.count = 0 + + def __len__(self) -> int: + """Return the number of items added to the filter.""" + return self.count + + def __contains__(self, item: str | bytes) -> bool: + """Support 'in' operator.""" + return self.contains(item) + + def __repr__(self) -> str: + """String representation of the Bloom Filter.""" + return ( + f"BloomFilter(size={self.size}, hash_functions={self.hash_functions}, " + f"items={self.count}, load_factor={self.get_load_factor():.3f})" + ) + + +class CountingBloomFilter: + """ + Counting Bloom Filter that supports deletion. + + Uses counters instead of bits to allow for element removal. + """ + + def __init__(self, expected_items: int, false_positive_rate: float = 0.01): + """ + Initialize Counting Bloom Filter. 
+ + Args: + expected_items: Expected number of items to be stored + false_positive_rate: Desired false positive rate (0.0 to 1.0) + """ + self.expected_items = expected_items + self.false_positive_rate = false_positive_rate + + # Calculate optimal size and number of hash functions + self.size = self._calculate_size(expected_items, false_positive_rate) + self.hash_functions = self._calculate_hash_functions(self.size, expected_items) + + # Initialize counter array + self.counters = [0] * self.size + self.count = 0 + + def _calculate_size(self, n: int, p: float) -> int: + """Calculate optimal size of counter array.""" + if p <= 0 or p >= 1: + raise ValueError("False positive rate must be between 0 and 1") + + size = -(n * math.log(p)) / (math.log(2) ** 2) + return math.ceil(size) + + def _calculate_hash_functions(self, m: int, n: int) -> int: + """Calculate optimal number of hash functions.""" + k = (m / n) * math.log(2) + return math.ceil(k) + + def _hash(self, item: str | bytes, seed: int) -> int: + """Generate hash value for an item with given seed.""" + if isinstance(item, str): + item = item.encode("utf-8") + + hash_algorithms = [ + hashlib.md5, + hashlib.sha1, + hashlib.sha256, + hashlib.sha512, + hashlib.blake2b, + hashlib.blake2s, + ] + + algorithm = hash_algorithms[seed % len(hash_algorithms)] + hash_obj = algorithm(item) + hash_obj.update(str(seed).encode("utf-8")) + + return int(hash_obj.hexdigest(), 16) % self.size + + def add(self, item: str | bytes) -> None: + """Add an item to the Counting Bloom Filter.""" + for i in range(self.hash_functions): + index = self._hash(item, i) + self.counters[index] += 1 + + self.count += 1 + + def remove(self, item: Union[str, bytes]) -> bool: + """ + Remove an item from the Counting Bloom Filter. 
+ + Args: + item: Item to remove + + Returns: + True if item was removed, False if item was not in the filter + """ + if not self.contains(item): + return False + + for i in range(self.hash_functions): + index = self._hash(item, i) + self.counters[index] -= 1 + + self.count -= 1 + return True + + def contains(self, item: str | bytes) -> bool: + """Check if an item might be in the Counting Bloom Filter.""" + for i in range(self.hash_functions): + index = self._hash(item, i) + if self.counters[index] == 0: + return False + + return True + + +if __name__ == "__main__": + # Example usage + print("Bloom Filter Example") + print("=" * 50) + + # Create Bloom Filter + bf = BloomFilter(expected_items=1000, false_positive_rate=0.01) + + # Add some items + items_to_add = ["apple", "banana", "cherry", "date", "elderberry"] + for item in items_to_add: + bf.add(item) + print(f"Added: {item}") + + print("\nBloom Filter Info:") + print(f"Size: {bf.size}") + print(f"Hash Functions: {bf.hash_functions}") + print(f"Items Added: {len(bf)}") + print(f"Load Factor: {bf.get_load_factor():.3f}") + print(f"False Positive Rate: {bf.get_false_positive_rate():.3f}") + + # Test contains + test_items = ["apple", "banana", "grape", "kiwi", "mango"] + print("\nTesting items:") + for item in test_items: + result = bf.contains(item) + print(f"'{item}': {'Found' if result else 'Not found'}") + + # Counting Bloom Filter example + print("\nCounting Bloom Filter Example") + print("=" * 50) + + cbf = CountingBloomFilter(expected_items=100, false_positive_rate=0.05) + + # Add items + for item in ["test1", "test2", "test3"]: + cbf.add(item) + print(f"Added: {item}") + + # Test removal + print(f"\nRemoving 'test2': {cbf.remove('test2')}") + print(f"Contains 'test2': {cbf.contains('test2')}") + print(f"Contains 'test1': {cbf.contains('test1')}") + + print("\nCounting Bloom Filter Info:") + print(f"Items: {len(cbf)}") + print(f"Load Factor: {cbf.get_load_factor():.3f}") diff --git 
# data_structures/fenwick_tree.py
"""
Fenwick Tree (Binary Indexed Tree) implementation.

A Fenwick Tree is a data structure that can efficiently update elements
and calculate prefix sums in a table of numbers. It supports two main
operations:
1. Update an element at a given index
2. Query the sum of elements from index 1 to a given index

Time Complexity:
    - Update: O(log n)
    - Query: O(log n)
Space Complexity: O(n)

Reference: https://en.wikipedia.org/wiki/Fenwick_tree
"""

from typing import Optional


class FenwickTree:
    """
    Fenwick Tree implementation for efficient range sum queries and updates.

    Attributes:
        tree: Internal array representing the Fenwick Tree
        size: Size of the tree
    """

    def __init__(self, size: int):
        """
        Initialize Fenwick Tree with given size.

        Args:
            size: Size of the tree (1-indexed)

        Examples:
            >>> ft = FenwickTree(10)
            >>> ft.size
            10
        """
        self.size = size
        self.tree = [0] * (size + 1)  # 1-indexed

    def update(self, index: int, delta: int) -> None:
        """
        Update element at given index by adding delta.

        Args:
            index: 1-indexed position to update
            delta: Value to add to the element

        Raises:
            ValueError: If index is outside [1, size]

        Examples:
            >>> ft = FenwickTree(5)
            >>> ft.update(1, 5)
            >>> ft.query(1)
            5
        """
        if index < 1 or index > self.size:
            msg = f"Index {index} out of range [1, {self.size}]"
            raise ValueError(msg)

        while index <= self.size:
            self.tree[index] += delta
            index += index & (-index)  # Add least significant bit

    def query(self, index: int) -> int:
        """
        Query sum from index 1 to given index (inclusive).

        Index 0 denotes the empty prefix and yields 0; range queries that
        start at position 1 rely on it.

        Args:
            index: 1-indexed position to query up to (0 for the empty prefix)

        Returns:
            Sum of elements from index 1 to index

        Raises:
            ValueError: If index is outside [0, size]

        Examples:
            >>> ft = FenwickTree(5)
            >>> ft.update(1, 3)
            >>> ft.update(2, 4)
            >>> ft.query(2)
            7
            >>> ft.query(0)
            0
        """
        if index < 0 or index > self.size:
            msg = f"Index {index} out of range [0, {self.size}]"
            raise ValueError(msg)

        result = 0
        while index > 0:
            result += self.tree[index]
            index -= index & (-index)  # Remove least significant bit

        return result

    def range_query(self, left: int, right: int) -> int:
        """
        Query sum from left to right (inclusive).

        Args:
            left: 1-indexed left boundary
            right: 1-indexed right boundary

        Returns:
            Sum of elements from left to right

        Raises:
            ValueError: If the range is empty or not within [1, size]

        Examples:
            >>> ft = FenwickTree(5)
            >>> ft.update(1, 1)
            >>> ft.update(2, 2)
            >>> ft.update(3, 3)
            >>> ft.range_query(2, 3)
            5
            >>> ft.range_query(1, 3)
            6
        """
        if left < 1 or right > self.size or left > right:
            msg = f"Invalid range [{left}, {right}]"
            raise ValueError(msg)

        # prefix(right) - prefix(left - 1); for left == 1 the second term is
        # the empty prefix.
        return self.query(right) - self.query(left - 1)

    def get(self, index: int) -> int:
        """
        Get value at given index.

        Args:
            index: 1-indexed position

        Returns:
            Value at the given index

        Raises:
            ValueError: If index is outside [1, size]

        Examples:
            >>> ft = FenwickTree(5)
            >>> ft.update(1, 5)
            >>> ft.get(1)
            5
        """
        return self.range_query(index, index)

    def set_value(self, index: int, value: int) -> None:
        """
        Set value at given index.

        Args:
            index: 1-indexed position
            value: New value to set

        Raises:
            ValueError: If index is outside [1, size]

        Examples:
            >>> ft = FenwickTree(5)
            >>> ft.set_value(1, 10)
            >>> ft.get(1)
            10
        """
        # The tree stores sums, so overwrite by applying the difference.
        self.update(index, value - self.get(index))

    def clear(self) -> None:
        """Clear all values in the tree."""
        self.tree = [0] * (self.size + 1)

    def __len__(self) -> int:
        """Return the size of the tree."""
        return self.size

    def __repr__(self) -> str:
        """String representation of the Fenwick Tree."""
        return f"FenwickTree(size={self.size})"


class FenwickTree2D:
    """
    2D Fenwick Tree for 2D range sum queries and updates.

    Supports:
    - Update element at (row, col)
    - Query sum from (1, 1) to (row, col)
    - Query sum in rectangle from (r1, c1) to (r2, c2)
    """

    def __init__(self, rows: int, cols: int):
        """
        Initialize 2D Fenwick Tree.

        Args:
            rows: Number of rows
            cols: Number of columns
        """
        self.rows = rows
        self.cols = cols
        self.tree = [[0] * (cols + 1) for _ in range(rows + 1)]

    def update(self, row: int, col: int, delta: int) -> None:
        """
        Update element at (row, col) by adding delta.

        Args:
            row: Row index (1-indexed)
            col: Column index (1-indexed)
            delta: Value to add

        Raises:
            ValueError: If the position is outside the grid
        """
        if row < 1 or row > self.rows or col < 1 or col > self.cols:
            msg = f"Position ({row}, {col}) out of range"
            raise ValueError(msg)

        i = row
        while i <= self.rows:
            j = col
            while j <= self.cols:
                self.tree[i][j] += delta
                j += j & (-j)
            i += i & (-i)

    def query(self, row: int, col: int) -> int:
        """
        Query sum from (1, 1) to (row, col).

        A row or column of 0 denotes an empty rectangle and yields 0;
        rectangle queries touching the first row or column rely on it.

        Args:
            row: Row index (1-indexed, 0 for an empty prefix)
            col: Column index (1-indexed, 0 for an empty prefix)

        Returns:
            Sum from (1, 1) to (row, col)

        Raises:
            ValueError: If row is outside [0, rows] or col outside [0, cols]

        Examples:
            >>> ft = FenwickTree2D(2, 2)
            >>> ft.update(1, 1, 4)
            >>> ft.update(2, 2, 6)
            >>> ft.query(2, 2)
            10
            >>> ft.query(0, 2)
            0
        """
        if row < 0 or row > self.rows or col < 0 or col > self.cols:
            msg = f"Position ({row}, {col}) out of range"
            raise ValueError(msg)

        result = 0
        i = row
        while i > 0:
            j = col
            while j > 0:
                result += self.tree[i][j]
                j -= j & (-j)
            i -= i & (-i)

        return result

    def range_query(self, r1: int, c1: int, r2: int, c2: int) -> int:
        """
        Query sum in rectangle from (r1, c1) to (r2, c2).

        Args:
            r1: Top-left row (1-indexed)
            c1: Top-left column (1-indexed)
            r2: Bottom-right row (1-indexed)
            c2: Bottom-right column (1-indexed)

        Returns:
            Sum in the rectangle

        Raises:
            ValueError: If the rectangle is empty or not within the grid

        Examples:
            >>> ft = FenwickTree2D(2, 2)
            >>> ft.update(1, 1, 4)
            >>> ft.update(2, 2, 6)
            >>> ft.range_query(1, 1, 2, 2)
            10
            >>> ft.range_query(2, 2, 2, 2)
            6
        """
        if (
            r1 < 1
            or c1 < 1
            or r2 > self.rows
            or c2 > self.cols
            or r1 > r2
            or c1 > c2
        ):
            msg = f"Invalid range ({r1}, {c1}) to ({r2}, {c2})"
            raise ValueError(msg)

        # Inclusion-exclusion over the four prefix rectangles.
        return (
            self.query(r2, c2)
            - self.query(r1 - 1, c2)
            - self.query(r2, c1 - 1)
            + self.query(r1 - 1, c1 - 1)
        )


if __name__ == "__main__":
    # Example usage
    print("Fenwick Tree Example")
    print("=" * 50)

    # Create Fenwick Tree
    ft = FenwickTree(10)

    # Add some values
    values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    for i, val in enumerate(values, 1):
        ft.update(i, val)
        print(f"Added {val} at position {i}")

    print("\nPrefix sums:")
    for i in range(1, 11):
        prefix_sum = ft.query(i)
        print(f"Sum from 1 to {i}: {prefix_sum}")

    print("\nRange queries:")
    ranges = [(2, 5), (1, 10), (3, 7)]
    for left, right in ranges:
        range_sum = ft.range_query(left, right)
        print(f"Sum from {left} to {right}: {range_sum}")

    print("\nIndividual values:")
    for i in range(1, 11):
        value = ft.get(i)
        print(f"Value at position {i}: {value}")

    # Test update
    print("\nUpdating position 3 by adding 5:")
    ft.update(3, 5)
    print(f"New value at position 3: {ft.get(3)}")
    print(f"New prefix sum from 1 to 3: {ft.query(3)}")

    # 2D Fenwick Tree example
    print("\n2D Fenwick Tree Example")
    print("=" * 50)

    ft2d = FenwickTree2D(3, 3)

    # Add some values
    matrix_values = [
        (1, 1, 1),
        (1, 2, 2),
        (1, 3, 3),
        (2, 1, 4),
        (2, 2, 5),
        (2, 3, 6),
        (3, 1, 7),
        (3, 2, 8),
        (3, 3, 9),
    ]

    for row, col, val in matrix_values:
        ft2d.update(row, col, val)
        print(f"Added {val} at position ({row}, {col})")

    print("\n2D Range queries:")
    queries = [
        (1, 1, 2, 2),  # Top-left 2x2
        (2, 2, 3, 3),  # Bottom-right 2x2
        (1, 1, 3, 3),  # Entire matrix
    ]

    for r1, c1, r2, c2 in queries:
        result = ft2d.range_query(r1, c1, r2, c2)
        print(f"Sum from ({r1}, {c1}) to ({r2}, {c2}): {result}")

    print("\nFenwick Tree implementation completed successfully!")
+ + Args: + data: Input array + operation: Function to combine two values (default: addition) + default_value: Default value for empty ranges + + Examples: + >>> st = SegmentTree([1, 2, 3, 4, 5]) + >>> st.query(0, 4) + 15 + """ + self.data = data.copy() + self.size = len(data) + self.operation = operation or (lambda x, y: x + y) + self.default_value = default_value + + # Calculate tree size (next power of 2) + self.tree_size = 2 * (2 ** math.ceil(math.log2(self.size))) - 1 + self.tree = [self.default_value] * self.tree_size + + self._build(0, 0, self.size - 1) + + def _build(self, node: int, start: int, end: int) -> None: + """ + Build the segment tree recursively. + + Args: + node: Current node index + start: Start of current segment + end: End of current segment + + Examples: + >>> st = SegmentTree([1, 2, 3, 4, 5]) + >>> st._build(0, 0, 4) # Builds the entire tree + >>> st.query(0, 4) + 15 + """ + if start == end: + self.tree[node] = self.data[start] + else: + mid = (start + end) // 2 + left_child = 2 * node + 1 + right_child = 2 * node + 2 + + self._build(left_child, start, mid) + self._build(right_child, mid + 1, end) + + self.tree[node] = self.operation( + self.tree[left_child], self.tree[right_child] + ) + + def query(self, left: int, right: int) -> int: + """ + Query range from left to right (0-indexed). + + Args: + left: Left boundary (inclusive) + right: Right boundary (inclusive) + + Returns: + Result of the operation over the range + + Examples: + >>> st = SegmentTree([1, 2, 3, 4, 5]) + >>> st.query(1, 3) + 9 + >>> st.query(0, 4) + 15 + >>> st.query(2, 2) + 3 + """ + if left < 0 or right >= self.size or left > right: + msg = f"Invalid range [{left}, {right}]" + raise ValueError(msg) + + return self._query(0, 0, self.size - 1, left, right) + + def _query(self, node: int, start: int, end: int, left: int, right: int) -> int: + """ + Internal query method. 
+ + Args: + node: Current node index + start: Start of current segment + end: End of current segment + left: Left boundary of query + right: Right boundary of query + + Returns: + Result of the operation over the range + + Examples: + >>> st = SegmentTree([1, 2, 3, 4, 5]) + >>> st._query(0, 0, 4, 1, 3) + 9 + """ + if right < start or left > end: + return self.default_value + + if left <= start and end <= right: + return self.tree[node] + + mid = (start + end) // 2 + left_child = 2 * node + 1 + right_child = 2 * node + 2 + + left_result = self._query(left_child, start, mid, left, right) + right_result = self._query(right_child, mid + 1, end, left, right) + + return self.operation(left_result, right_result) + + def update(self, index: int, value: int) -> None: + """ + Update element at given index. + + Args: + index: Index to update (0-indexed) + value: New value + + Examples: + >>> st = SegmentTree([1, 2, 3, 4, 5]) + >>> st.update(2, 10) + >>> st.query(2, 2) + 10 + """ + if index < 0 or index >= self.size: + msg = f"Index {index} out of range" + raise ValueError(msg) + + self.data[index] = value + self._update(0, 0, self.size - 1, index, value) + + def _update(self, node: int, start: int, end: int, index: int, value: int) -> None: + """Internal update method.""" + if start == end: + self.tree[node] = value + else: + mid = (start + end) // 2 + left_child = 2 * node + 1 + right_child = 2 * node + 2 + + if index <= mid: + self._update(left_child, start, mid, index, value) + else: + self._update(right_child, mid + 1, end, index, value) + + self.tree[node] = self.operation( + self.tree[left_child], self.tree[right_child] + ) + + +class LazySegmentTree: + """ + Segment Tree with lazy propagation for range updates. 
+ + Supports: + - Range queries + - Range updates + - Lazy propagation for efficient range updates + """ + + def __init__( + self, + data: list[int], + operation: Callable[[int, int], int] | None = None, + default_value: int = 0, + ): + """ + Initialize Lazy Segment Tree. + + Args: + data: Input array + operation: Function to combine two values (default: addition) + default_value: Default value for empty ranges + """ + self.data = data.copy() + self.size = len(data) + self.operation = operation or (lambda x, y: x + y) + self.default_value = default_value + + # Calculate tree size + self.tree_size = 2 * (2 ** math.ceil(math.log2(self.size))) - 1 + self.tree = [self.default_value] * self.tree_size + self.lazy = [0] * self.tree_size + + self._build(0, 0, self.size - 1) + + def _build(self, node: int, start: int, end: int) -> None: + """Build the segment tree recursively.""" + if start == end: + self.tree[node] = self.data[start] + else: + mid = (start + end) // 2 + left_child = 2 * node + 1 + right_child = 2 * node + 2 + + self._build(left_child, start, mid) + self._build(right_child, mid + 1, end) + + self.tree[node] = self.operation( + self.tree[left_child], self.tree[right_child] + ) + + def _push_lazy(self, node: int, start: int, end: int) -> None: + """ + Push lazy updates to children. + + Args: + node: Current node index + start: Start of current segment + end: End of current segment + + Examples: + >>> lst = LazySegmentTree([1, 2, 3, 4, 5]) + >>> lst.lazy[0] = 2 + >>> lst._push_lazy(0, 0, 4) + >>> lst.lazy[0] + 0 + """ + if self.lazy[node] != 0: + self.tree[node] += self.lazy[node] * (end - start + 1) + + if start != end: + left_child = 2 * node + 1 + right_child = 2 * node + 2 + self.lazy[left_child] += self.lazy[node] + self.lazy[right_child] += self.lazy[node] + + self.lazy[node] = 0 + + def range_update(self, left: int, right: int, delta: int) -> None: + """ + Update range from left to right by adding delta. 
+ + Args: + left: Left boundary (0-indexed) + right: Right boundary (0-indexed) + delta: Value to add to the range + + Examples: + >>> lst = LazySegmentTree([1, 2, 3, 4, 5]) + >>> lst.range_update(1, 3, 2) + >>> lst.query(1, 3) + 11 + """ + if left < 0 or right >= self.size or left > right: + msg = f"Invalid range [{left}, {right}]" + raise ValueError(msg) + + self._range_update(0, 0, self.size - 1, left, right, delta) + + def _range_update( + self, node: int, start: int, end: int, left: int, right: int, delta: int + ) -> None: + """ + Internal range update method with lazy propagation. + + Args: + node: Current node index + start: Start of current segment + end: End of current segment + left: Left boundary of update + right: Right boundary of update + delta: Value to add to the range + + Examples: + >>> lst = LazySegmentTree([1, 2, 3, 4, 5]) + >>> lst._range_update(0, 0, 4, 1, 3, 2) + >>> lst.query(1, 3) + 11 + """ + self._push_lazy(node, start, end) + + if right < start or left > end: + return + + if left <= start and end <= right: + self.lazy[node] += delta + self._push_lazy(node, start, end) + return + + mid = (start + end) // 2 + left_child = 2 * node + 1 + right_child = 2 * node + 2 + + self._range_update(left_child, start, mid, left, right, delta) + self._range_update(right_child, mid + 1, end, left, right, delta) + + self._push_lazy(left_child, start, mid) + self._push_lazy(right_child, mid + 1, end) + + self.tree[node] = self.operation(self.tree[left_child], self.tree[right_child]) + + def query(self, left: int, right: int) -> int: + """ + Query range from left to right. 
+ + Args: + left: Left boundary (0-indexed) + right: Right boundary (0-indexed) + + Returns: + Result of the operation over the range + + Examples: + >>> lst = LazySegmentTree([1, 2, 3, 4, 5]) + >>> lst.query(1, 3) + 9 + >>> lst.query(0, 4) + 15 + """ + if left < 0 or right >= self.size or left > right: + msg = f"Invalid range [{left}, {right}]" + raise ValueError(msg) + + return self._query(0, 0, self.size - 1, left, right) + + def _query(self, node: int, start: int, end: int, left: int, right: int) -> int: + """ + Internal query method. + + Args: + node: Current node index + start: Start of current segment + end: End of current segment + left: Left boundary of query + right: Right boundary of query + + Returns: + Result of the operation over the range + + Examples: + >>> lst = LazySegmentTree([1, 2, 3, 4, 5]) + >>> lst._query(0, 0, 4, 1, 3) + 9 + """ + self._push_lazy(node, start, end) + + if right < start or left > end: + return self.default_value + + if left <= start and end <= right: + return self.tree[node] + + mid = (start + end) // 2 + left_child = 2 * node + 1 + right_child = 2 * node + 2 + + left_result = self._query(left_child, start, mid, left, right) + right_result = self._query(right_child, mid + 1, end, left, right) + + return self.operation(left_result, right_result) + + +class MinSegmentTree(SegmentTree): + """Segment Tree for range minimum queries.""" + + def __init__(self, data: list[int]) -> None: + super().__init__(data, min, float("inf")) + + +class MaxSegmentTree(SegmentTree): + """Segment Tree for range maximum queries.""" + + def __init__(self, data: list[int]) -> None: + super().__init__(data, max, float("-inf")) + + +if __name__ == "__main__": + # Example usage + print("Segment Tree Example") + print("=" * 50) + + # Create Segment Tree + data = [1, 3, 5, 7, 9, 11] + st = SegmentTree(data) + + print(f"Original data: {data}") + print(f"Tree size: {st.tree_size}") + + # Range sum queries + print("\nRange sum queries:") + queries = [(0, 2), 
(1, 4), (0, 5), (2, 3)] + for left, right in queries: + result = st.query(left, right) + print(f"Sum from {left} to {right}: {result}") + + # Update element + print("\nUpdating element at index 2 to 10:") + st.update(2, 10) + print(f"New data: {st.data}") + print(f"Sum from 0 to 2: {st.query(0, 2)}") + + # Min Segment Tree + print("\nMin Segment Tree Example") + print("=" * 50) + + min_st = MinSegmentTree([3, 1, 4, 1, 5, 9, 2, 6]) + print(f"Original data: {min_st.data}") + + min_queries = [(0, 3), (2, 5), (0, 7)] + for left, right in min_queries: + result = min_st.query(left, right) + print(f"Min from {left} to {right}: {result}") + + # Lazy Segment Tree + print("\nLazy Segment Tree Example") + print("=" * 50) + + lazy_st = LazySegmentTree([1, 2, 3, 4, 5]) + print(f"Original data: {lazy_st.data}") + + # Range update + print("\nAdding 2 to range [1, 3]:") + lazy_st.range_update(1, 3, 2) + + # Query after update + print(f"Sum from 0 to 4: {lazy_st.query(0, 4)}") + print(f"Sum from 1 to 3: {lazy_st.query(1, 3)}") + + # Another range update + print("\nAdding 1 to range [0, 2]:") + lazy_st.range_update(0, 2, 1) + print(f"Sum from 0 to 4: {lazy_st.query(0, 4)}") + + print("\nSegment Tree implementation completed successfully!") diff --git a/maths/fft_cooley_tukey.py b/maths/fft_cooley_tukey.py new file mode 100644 index 000000000000..18f308d6367b --- /dev/null +++ b/maths/fft_cooley_tukey.py @@ -0,0 +1,209 @@ +""" +Fast Fourier Transform (FFT) using Cooley-Tukey algorithm. + +The Fast Fourier Transform is an efficient algorithm to compute the Discrete Fourier +Transform (DFT) and its inverse. The Cooley-Tukey algorithm is a divide-and-conquer +algorithm that recursively breaks down a DFT of any composite size N = N1*N2 into +many smaller DFTs of sizes N1 and N2. 
"""
Fast Fourier Transform (FFT) using Cooley-Tukey algorithm.

The Fast Fourier Transform is an efficient algorithm to compute the Discrete Fourier
Transform (DFT) and its inverse. The Cooley-Tukey algorithm is a divide-and-conquer
algorithm that recursively breaks down a DFT of any composite size N = N1*N2 into
many smaller DFTs of sizes N1 and N2.

This module implements the radix-2 variant: inputs whose length is not a power of
two are zero-padded up to the next power of two before being transformed.

Time Complexity: O(N log N)
Space Complexity: O(N)

Reference: https://en.wikipedia.org/wiki/Cooley%E2%80%93Tukey_FFT_algorithm
"""

import math
from typing import Optional


def _fft_radix2(samples: list[complex]) -> list[complex]:
    """
    Recursive radix-2 decimation-in-time FFT.

    Args:
        samples: Complex samples; the length must be a power of two (>= 1)

    Returns:
        DFT of ``samples``, same length as the input

    Examples:
        >>> _fft_radix2([1 + 0j, 1 + 0j])
        [(2+0j), 0j]
    """
    n = len(samples)
    if n == 1:
        return samples

    even = _fft_radix2(samples[0::2])
    odd = _fft_radix2(samples[1::2])

    # Butterfly: bin k and bin k + n/2 share one twiddled odd term.
    lower_half = []
    upper_half = []
    for k, (even_k, odd_k) in enumerate(zip(even, odd)):
        angle = -2 * math.pi * k / n
        twiddled = odd_k * complex(math.cos(angle), math.sin(angle))
        lower_half.append(even_k + twiddled)
        upper_half.append(even_k - twiddled)

    return lower_half + upper_half


def fft_cooley_tukey(signal: list[complex]) -> list[complex]:
    """
    Compute the Fast Fourier Transform using Cooley-Tukey algorithm.

    The input is never modified and the returned list is always a new list of
    ``complex`` values. If ``len(signal)`` is not a power of two the signal is
    zero-padded, so the result can be longer than the input.

    Args:
        signal: Input signal as a list of complex (or real) numbers

    Returns:
        FFT of the (possibly zero-padded) input signal

    Examples:
        >>> signal = [1, 2, 3, 4]
        >>> fft_result = fft_cooley_tukey([complex(x) for x in signal])
        >>> len(fft_result)
        4

        >>> signal = [0, 1, 0, -1]
        >>> fft_result = fft_cooley_tukey([complex(x) for x in signal])
        >>> abs(fft_result[1]) > 1.9  # Should be close to 2
        True

        >>> len(fft_cooley_tukey([1, 2, 3]))  # padded to the next power of 2
        4
        >>> fft_cooley_tukey([])
        []
    """
    samples = [complex(x) for x in signal]
    n = len(samples)

    if n <= 1:
        return samples

    # Pad once here; every recursive sub-problem is then a power of two too.
    if n & (n - 1) != 0:
        next_power_of_2 = 1 << (n - 1).bit_length()
        samples.extend([0j] * (next_power_of_2 - n))

    return _fft_radix2(samples)


def ifft_cooley_tukey(fft_signal: list[complex]) -> list[complex]:
    """
    Compute the Inverse Fast Fourier Transform using Cooley-Tukey algorithm.

    Uses the identity ``ifft(X) = conj(fft(conj(X))) / N``. ``N`` is the length
    actually transformed: if ``fft_signal`` is not a power of two long, the
    forward transform zero-pads it and the result is normalized by the padded
    length.

    Args:
        fft_signal: FFT signal as a list of complex numbers

    Returns:
        Inverse FFT of the input signal

    Examples:
        >>> signal = [1, 2, 3, 4]
        >>> fft_result = fft_cooley_tukey([complex(x) for x in signal])
        >>> ifft_result = ifft_cooley_tukey(fft_result)
        >>> all(abs(ifft_result[i] - signal[i]) < 1e-10 for i in range(len(signal)))
        True

        >>> [round(x.real, 10) for x in ifft_cooley_tukey([4, 0, 0])]
        [1.0, 1.0, 1.0, 1.0]
    """
    conjugated = [complex(x).conjugate() for x in fft_signal]
    transformed = fft_cooley_tukey(conjugated)

    # Normalize by the transformed length, not len(fft_signal): the forward
    # FFT may have zero-padded the input.
    n = len(transformed)
    return [x.conjugate() / n for x in transformed]


def fft_magnitude_phase(fft_result: list[complex]) -> tuple[list[float], list[float]]:
    """
    Extract magnitude and phase from FFT result.

    Args:
        fft_result: FFT result as a list of complex numbers

    Returns:
        Tuple of (magnitudes, phases); phases are in radians in [-pi, pi]

    Examples:
        >>> signal = [1, 0, -1, 0]
        >>> fft_result = fft_cooley_tukey([complex(x) for x in signal])
        >>> magnitudes, phases = fft_magnitude_phase(fft_result)
        >>> len(magnitudes) == len(phases)
        True
        >>> fft_magnitude_phase([3 + 4j])
        ([5.0], [0.9272952180016122])
    """
    magnitudes = [abs(x) for x in fft_result]
    phases = [math.atan2(x.imag, x.real) for x in fft_result]

    return magnitudes, phases


def fft_frequency_bins(sample_rate: float, n_samples: int) -> list[float]:
    """
    Generate frequency bins for FFT result.

    Bin ``i`` is ``i * sample_rate / n_samples``. Bins above the Nyquist
    frequency are reported as positive values, not wrapped to negative ones.
    ``n_samples`` should be the length of the FFT output, which is larger than
    the signal length when the signal was zero-padded.

    Args:
        sample_rate: Sampling rate in Hz
        n_samples: Number of FFT points

    Returns:
        List of frequency values in Hz (empty if ``n_samples`` <= 0)

    Examples:
        >>> bins = fft_frequency_bins(1000, 8)
        >>> len(bins)
        8
        >>> bins[0]
        0.0
        >>> fft_frequency_bins(1000, 0)
        []
    """
    return [i * sample_rate / n_samples for i in range(n_samples)]


def _demo() -> None:
    """Plot a two-tone test signal, its spectrum and its reconstruction."""
    import matplotlib.pyplot as plt

    sample_rate = 1000
    duration = 1
    t = [i / sample_rate for i in range(int(sample_rate * duration))]

    # Signal with multiple frequencies
    signal = [
        math.sin(2 * math.pi * 50 * x) + 0.5 * math.sin(2 * math.pi * 120 * x)
        for x in t
    ]

    fft_result = fft_cooley_tukey([complex(x) for x in signal])
    magnitudes, phases = fft_magnitude_phase(fft_result)

    # The FFT zero-pads 1000 samples to 1024 points, so bins must be derived
    # from the spectrum length to line up with the magnitudes.
    frequencies = fft_frequency_bins(sample_rate, len(fft_result))

    # Only the first half of the spectrum is unique for a real signal.
    half = len(fft_result) // 2
    half_frequencies = frequencies[:half]
    half_magnitudes = magnitudes[:half]
    half_phases = phases[:half]

    plt.figure(figsize=(12, 8))

    plt.subplot(2, 2, 1)
    plt.plot(t[:100], signal[:100])
    plt.title("Original Signal (first 100 samples)")
    plt.xlabel("Time (s)")
    plt.ylabel("Amplitude")

    plt.subplot(2, 2, 2)
    plt.plot(half_frequencies, half_magnitudes)
    plt.title("FFT Magnitude Spectrum")
    plt.xlabel("Frequency (Hz)")
    plt.ylabel("Magnitude")

    plt.subplot(2, 2, 3)
    plt.plot(half_frequencies, half_phases)
    plt.title("FFT Phase Spectrum")
    plt.xlabel("Frequency (Hz)")
    plt.ylabel("Phase (radians)")

    # Test inverse FFT
    ifft_result = ifft_cooley_tukey(fft_result)
    reconstructed = [x.real for x in ifft_result]

    plt.subplot(2, 2, 4)
    plt.plot(t[:100], reconstructed[:100])
    plt.title("Reconstructed Signal (first 100 samples)")
    plt.xlabel("Time (s)")
    plt.ylabel("Amplitude")

    plt.tight_layout()
    plt.show()

    print("FFT implementation completed successfully!")

    # Compute the threshold once instead of once per bin.
    threshold = max(half_magnitudes) * 0.1
    peak_frequencies = [
        f for f, m in zip(half_frequencies, half_magnitudes) if m > threshold
    ]
    print(f"Peak frequencies detected: {peak_frequencies}")


if __name__ == "__main__":
    _demo()