diff --git a/machine_learning/random_forest_classifier.py b/machine_learning/random_forest_classifier.py
new file mode 100644
index 000000000000..a6ff359aff1c
--- /dev/null
+++ b/machine_learning/random_forest_classifier.py
@@ -0,0 +1,409 @@
+"""Random Forest Classifier implementation from scratch.
+
+This module implements a Random Forest Classifier using:
+- Decision Tree base learners built from scratch
+- Bootstrap sampling (bagging)
+- Random feature selection at splits
+- Majority voting for aggregation
+
+References:
+- https://en.wikipedia.org/wiki/Random_forest
+- https://en.wikipedia.org/wiki/Decision_tree_learning
+"""
+
+from __future__ import annotations
+
+from collections import Counter
+from typing import Any, Dict, List, Optional, Sequence, Tuple
+
+import numpy as np
+
+TreeNode = Dict[str, Any]
+
+
+class DecisionTreeClassifier:
+    """A Decision Tree Classifier built from scratch.
+
+    This tree uses information gain (entropy-based) for splitting decisions.
+
+    Attributes:
+        max_depth: Maximum depth of the tree
+        min_samples_split: Minimum samples required to split a node
+        n_features: Number of features to consider for best split
+        tree: The built tree structure
+    """
+
+    def __init__(
+        self,
+        max_depth: int = 10,
+        min_samples_split: int = 2,
+        n_features: Optional[int] = None,
+    ) -> None:
+        self.max_depth: int = max_depth
+        self.min_samples_split: int = min_samples_split
+        self.n_features: Optional[int] = n_features
+        self.tree: Optional[TreeNode] = None
+
+    def fit(self, x: np.ndarray, y: np.ndarray) -> None:
+        """Build the decision tree.
+
+        Args:
+            x: Training features, shape (n_samples, n_features)
+            y: Training labels, shape (n_samples,)
+
+        >>> clf = DecisionTreeClassifier(
+        ...     max_depth=1, min_samples_split=2, n_features=1
+        ... )
+        >>> x = np.array([[0.0], [0.0], [1.0], [1.0]])
+        >>> y = np.array([0, 0, 1, 1])
+        >>> clf.fit(x, y)
+        >>> isinstance(clf.tree, dict)
+        True
+        """
+        n_total_features = x.shape[1]
+        self.n_features = (
+            n_total_features
+            if self.n_features in (None, 0)
+            else min(self.n_features, n_total_features)
+        )
+        self.tree = self._grow_tree(x, y, depth=0)
+
+    def _grow_tree(self, x: np.ndarray, y: np.ndarray, depth: int = 0) -> TreeNode:
+        """Recursively grow the decision tree.
+
+        >>> clf = DecisionTreeClassifier(max_depth=0)
+        >>> x = np.array([[0.0], [1.0]])
+        >>> y = np.array([0, 1])
+        >>> node = clf._grow_tree(x, y, depth=0)
+        >>> node['leaf']
+        True
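+
+        Splits maximize information gain; recursion stops at ``max_depth``,
+        at a pure node, or below ``min_samples_split``. A further sketch
+        (deterministic here because there is only one candidate feature):
+
+        >>> clf = DecisionTreeClassifier(max_depth=2, n_features=1)
+        >>> x = np.array([[0.0], [0.0], [1.0], [1.0]])
+        >>> y = np.array([0, 0, 1, 1])
+        >>> clf._grow_tree(x, y, depth=0)["leaf"]
+        False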
+        """
+        n_samples, n_features = x.shape
+        n_labels = len(np.unique(y))
+
+        # Stopping criteria
+        if (
+            depth >= self.max_depth
+            or n_labels == 1
+            or n_samples < self.min_samples_split
+        ):
+            leaf_value = self._most_common_label(y)
+            return {"leaf": True, "value": int(leaf_value)}
+
+        # Find best split
+        rng = np.random.default_rng()
+        feat_indices = rng.choice(n_features, int(self.n_features), replace=False)
+        best_feat, best_thresh = self._best_split(x, y, feat_indices)
+        if best_feat is None:
+            leaf_value = self._most_common_label(y)
+            return {"leaf": True, "value": int(leaf_value)}
+
+        # Split the data
+        left_mask = x[:, best_feat] <= best_thresh
+        right_mask = ~left_mask
+
+        # Grow subtrees
+        left = self._grow_tree(x[left_mask], y[left_mask], depth + 1)
+        right = self._grow_tree(x[right_mask], y[right_mask], depth + 1)
+        return {
+            "leaf": False,
+            "feature": int(best_feat),
+            "threshold": float(best_thresh),
+            "left": left,
+            "right": right,
+        }
+
+    def _best_split(
+        self, x: np.ndarray, y: np.ndarray, feat_indices: Sequence[int]
+    ) -> Tuple[Optional[int], Optional[float]]:
+        """Find the best feature and threshold to split on.
+
+        >>> clf = DecisionTreeClassifier()
+        >>> x = np.array([[0.0], [0.5], [1.0]])
+        >>> y = np.array([0, 0, 1])
+        >>> feat, thresh = clf._best_split(x, y, [0])
+        >>> feat in (None, 0)
+        True
+        """
+        best_gain = -np.inf
+        split_idx: Optional[int] = None
+        split_thresh: Optional[float] = None
+
+        for feat_idx in feat_indices:
+            x_column = x[:, int(feat_idx)]
+            thresholds = np.unique(x_column)
+            for threshold in thresholds:
+                gain = self._information_gain(y, x_column, float(threshold))
+                if gain > best_gain:
+                    best_gain = gain
+                    split_idx = int(feat_idx)
+                    split_thresh = float(threshold)
+        return split_idx, split_thresh
+
+    def _information_gain(
+        self, y: np.ndarray, x_column: np.ndarray, threshold: float
+    ) -> float:
+        """Calculate information gain from a split.
+
+        >>> y = np.array([0, 0, 1, 1])
+        >>> x_col = np.array([0.0, 0.2, 0.8, 1.0])
+        >>> DecisionTreeClassifier()._information_gain(y, x_col, 0.5) >= 0.0
+        True
+        """
+        # Parent entropy
+        parent_entropy = self._entropy(y)
+
+        # Create children
+        left_mask = x_column <= threshold
+        right_mask = ~left_mask
+        if np.sum(left_mask) == 0 or np.sum(right_mask) == 0:
+            return 0.0
+
+        # Calculate weighted average entropy of children
+        n = len(y)
+        n_left, n_right = int(np.sum(left_mask)), int(np.sum(right_mask))
+        e_left, e_right = self._entropy(y[left_mask]), self._entropy(y[right_mask])
+        child_entropy = (n_left / n) * e_left + (n_right / n) * e_right
+
+        # Information gain
+        ig = parent_entropy - child_entropy
+        return float(ig)
+
+    def _entropy(self, y: np.ndarray) -> float:
+        """Calculate entropy of a label distribution.
+
+        >>> DecisionTreeClassifier()._entropy(np.array([0, 0, 1, 1])) >= 0
+        True
+        """
+        hist = np.bincount(y)
+        ps = hist / len(y)
+        return float(-np.sum([p * np.log2(p) for p in ps if p > 0]))
+
+    def _most_common_label(self, y: np.ndarray) -> int:
+        """Return the most common label.
+
+        >>> DecisionTreeClassifier()._most_common_label(np.array([0, 1, 1]))
+        1
+        """
+        counter = Counter(y.tolist())
+        return int(counter.most_common(1)[0][0])
+
+    def predict(self, x: np.ndarray) -> np.ndarray:
+        """Predict class labels for samples in x.
+
+        Args:
+            x: Features, shape (n_samples, n_features)
+        Returns:
+            Predicted labels, shape (n_samples,)
+
+        >>> clf = DecisionTreeClassifier(max_depth=1, n_features=1)
+        >>> x = np.array([[0.0], [1.0]])
+        >>> y = np.array([0, 1])
+        >>> clf.fit(x, y)
+        >>> clf.predict(x).tolist()
+        [0, 1]
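+
+        Calling ``predict`` before ``fit`` fails fast via the assertion
+        below (illustrative sketch of the guard):
+
+        >>> DecisionTreeClassifier().predict(np.array([[0.0]]))
+        Traceback (most recent call last):
+            ...
+        AssertionError: Model is not fitted. Call fit first.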
+        """
+        assert self.tree is not None, "Model is not fitted. Call fit first."
+        return np.array([self._traverse_tree(row, self.tree) for row in x])
+
+    def _traverse_tree(self, x_row: np.ndarray, node: TreeNode) -> int:
+        """Traverse the tree to make a prediction for a single sample.
+
+        >>> node = {"leaf": True, "value": 1}
+        >>> DecisionTreeClassifier()._traverse_tree(np.array([0.0]), node)
+        1
+        """
+        if node["leaf"]:
+            return int(node["value"])
+        if x_row[int(node["feature"])] <= float(node["threshold"]):
+            return self._traverse_tree(x_row, node["left"])  # type: ignore[arg-type]
+        return self._traverse_tree(x_row, node["right"])  # type: ignore[arg-type]
+
+
+class RandomForestClassifier:
+    """Random Forest Classifier built from scratch.
+
+    Random Forest is an ensemble learning method that constructs multiple
+    decision trees during training and outputs the mode of the classes
+    (classification) of the individual trees.
+
+    Features:
+    - Bootstrap sampling (bagging) to create diverse trees
+    - Random feature selection at each split
+    - Majority voting for final predictions
+
+    Attributes:
+        n_estimators: Number of trees in the forest
+        max_depth: Maximum depth of each tree
+        min_samples_split: Minimum samples required to split a node
+        n_features: Number of features to consider for best split
+        trees: List of trained decision trees
+
+    Example:
+        >>> from sklearn.datasets import make_classification
+        >>> from sklearn.model_selection import train_test_split
+        >>> x, y = make_classification(n_samples=200, n_features=10, random_state=0)
+        >>> x_train, x_test, y_train, y_test = train_test_split(
+        ...     x, y, test_size=0.25, random_state=0
+        ... )
+        >>> rf = RandomForestClassifier(n_estimators=5, max_depth=5, n_features=3)
+        >>> _ = rf.fit(x_train, y_train)
+        >>> y_pred = rf.predict(x_test)
+        >>> isinstance(y_pred, np.ndarray)
+        True
+    """
+
+    def __init__(
+        self,
+        n_estimators: int = 100,
+        max_depth: int = 10,
+        min_samples_split: int = 2,
+        n_features: Optional[int] = None,
+    ) -> None:
+        """Initialize Random Forest Classifier.
+
+        Args:
+            n_estimators: Number of trees in the forest (default: 100)
+            max_depth: Maximum depth of each tree (default: 10)
+            min_samples_split: Minimum samples required to split (default: 2)
+            n_features: Number of features to consider for best split.
+                If None, uses sqrt(n_features) (default: None)
+        """
+        self.n_estimators: int = n_estimators
+        self.max_depth: int = max_depth
+        self.min_samples_split: int = min_samples_split
+        self.n_features: Optional[int] = n_features
+        self.trees: List[DecisionTreeClassifier] = []
+
+    def fit(self, x: np.ndarray, y: np.ndarray) -> "RandomForestClassifier":
+        """Build a forest of trees from the training set (x, y).
+
+        Args:
+            x: Training features, shape (n_samples, n_features)
+            y: Training labels, shape (n_samples,)
+        Returns:
+            self: Fitted classifier
+
+        >>> rf = RandomForestClassifier(n_estimators=2, max_depth=2, n_features=1)
+        >>> x = np.array([[0.0], [0.1], [0.9], [1.0]])
+        >>> y = np.array([0, 0, 1, 1])
+        >>> isinstance(rf.fit(x, y), RandomForestClassifier)
+        True
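+
+        Each tree is fit on a bootstrap sample and restricts every split
+        to a random feature subset, which decorrelates the trees
+        (continuing the sketch above):
+
+        >>> rf = RandomForestClassifier(n_estimators=3, max_depth=2, n_features=1)
+        >>> len(rf.fit(x, y).trees)
+        3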
+        """
+        self.trees = []
+        n_features = x.shape[1]
+        # Default to sqrt of total features if not specified
+        if self.n_features is None:
+            self.n_features = int(np.sqrt(n_features))
+        for _ in range(self.n_estimators):
+            tree = DecisionTreeClassifier(
+                max_depth=self.max_depth,
+                min_samples_split=self.min_samples_split,
+                n_features=self.n_features,
+            )
+            x_sample, y_sample = self._bootstrap_sample(x, y)
+            tree.fit(x_sample, y_sample)
+            self.trees.append(tree)
+        return self
+
+    def _bootstrap_sample(
+        self, x: np.ndarray, y: np.ndarray
+    ) -> Tuple[np.ndarray, np.ndarray]:
+        """Create a bootstrap sample from the dataset.
+
+        Bootstrap sampling randomly samples with replacement from the dataset.
+        This creates diverse training sets for each tree.
+
+        Args:
+            x: Features, shape (n_samples, n_features)
+            y: Labels, shape (n_samples,)
+        Returns:
+            x_sample: Bootstrap sample of features
+            y_sample: Bootstrap sample of labels
+
+        >>> rf = RandomForestClassifier()
+        >>> x = np.arange(10).reshape(5, 2).astype(float)
+        >>> y = np.array([0, 1, 0, 1, 0])
+        >>> xs, ys = rf._bootstrap_sample(x, y)
+        >>> xs.shape[0] == x.shape[0] == ys.shape[0]
+        True
+        """
+        n_samples = x.shape[0]
+        rng = np.random.default_rng()
+        idxs = rng.choice(n_samples, n_samples, replace=True)
+        return x[idxs], y[idxs]
+
+    def predict(self, x: np.ndarray) -> np.ndarray:
+        """Predict class labels for samples in x.
+
+        Uses majority voting: each tree votes for a class, and the
+        class with the most votes becomes the final prediction.
+
+        Args:
+            x: Features, shape (n_samples, n_features)
+        Returns:
+            Predicted labels, shape (n_samples,)
+
+        >>> rf = RandomForestClassifier(n_estimators=3, max_depth=2, n_features=1)
+        >>> x = np.array([[0.0], [1.0]])
+        >>> y = np.array([0, 1])
+        >>> _ = rf.fit(x, y)
+        >>> rf.predict(x).shape
+        (2,)
+        """
+        if not self.trees:
+            raise RuntimeError("Model is not fitted. Call fit first.")
+        # Get predictions from all trees
+        tree_preds = np.array([tree.predict(x) for tree in self.trees])
+        # Majority voting: transpose to get per-sample predictions, then
+        # take the most common label for each sample
+        tree_preds = np.swapaxes(tree_preds, 0, 1)
+        y_pred = [self._most_common_label(sample_preds) for sample_preds in tree_preds]
+        return np.array(y_pred)
+
+    def _most_common_label(self, y: Sequence[int]) -> int:
+        """Return the most common label (majority vote).
+
+        >>> RandomForestClassifier()._most_common_label([0, 1, 1])
+        1
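+
+        With equal counts, ``Counter.most_common`` keeps insertion order
+        (Python's sort is stable), so the label seen first wins:
+
+        >>> RandomForestClassifier()._most_common_label([1, 0, 0, 1])
+        1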
+        """
+        counter = Counter(list(map(int, y)))
+        return int(counter.most_common(1)[0][0])
+
+
+if __name__ == "__main__":
+    # Example usage with synthetic data
+    from sklearn.datasets import make_classification
+    from sklearn.metrics import accuracy_score, classification_report
+    from sklearn.model_selection import train_test_split
+
+    print("Random Forest Classifier - Example Usage")
+    print("=" * 50)
+
+    # Generate sample classification dataset
+    x, y = make_classification(
+        n_samples=1000, n_features=20, n_informative=15, n_redundant=5, random_state=42
+    )
+
+    # Split the data
+    x_train, x_test, y_train, y_test = train_test_split(
+        x, y, test_size=0.2, random_state=42
+    )
+
+    print(f"Training samples: {x_train.shape[0]}")
+    print(f"Test samples: {x_test.shape[0]}")
+    print(f"Number of features: {x_train.shape[1]}")
+    print()
+
+    # Train Random Forest Classifier
+    print("Training Random Forest Classifier...")
+    rf_classifier = RandomForestClassifier(
+        n_estimators=10, max_depth=10, min_samples_split=2
+    )
+    rf_classifier.fit(x_train, y_train)
+    print("Training complete!")
+    print()
+
+    # Make predictions
+    y_pred = rf_classifier.predict(x_test)
+
+    # Evaluate
+    accuracy = accuracy_score(y_test, y_pred)
+    print(f"Accuracy: {accuracy:.4f}")
+    print()
+    print("Classification Report:")
+    print(classification_report(y_test, y_pred))
diff --git a/machine_learning/random_forest_regressor.py b/machine_learning/random_forest_regressor.py
new file mode 100644
index 000000000000..db75d9bd2ff8
--- /dev/null
+++ b/machine_learning/random_forest_regressor.py
@@ -0,0 +1,364 @@
+"""Random Forest Regressor implementation from scratch.
+
+References:
+- https://en.wikipedia.org/wiki/Random_forest
+- https://en.wikipedia.org/wiki/Decision_tree_learning
+"""
+
+from __future__ import annotations
+
+from typing import Any, Dict, List, Optional, Tuple
+
+import numpy as np
+
+TreeNodeReg = Dict[str, Any]
+
+
+class DecisionTreeRegressor:
+    """
+    A simple decision tree regressor implementation.
+
+    Parameters
+    ----------
+    max_depth : int, optional (default=None)
+        The maximum depth of the tree.
+    min_samples_split : int, optional (default=2)
+        The minimum number of samples required to split an internal node.
+
+    Examples
+    --------
+    >>> x = np.array([[1.0], [2.0], [3.0], [4.0], [5.0]])
+    >>> y = np.array([1.5, 2.5, 3.5, 4.5, 5.5])
+    >>> tree = DecisionTreeRegressor(max_depth=2)
+    >>> _ = tree.fit(x, y)
+    >>> preds = tree.predict(x)
+    >>> np.allclose(preds, y, atol=1.0)
+    True
+    """
+
+    def __init__(
+        self, max_depth: Optional[int] = None, min_samples_split: int = 2
+    ) -> None:
+        self.max_depth: Optional[int] = max_depth
+        self.min_samples_split: int = min_samples_split
+        self.tree: Optional[TreeNodeReg] = None
+
+    def fit(self, x: np.ndarray, y: np.ndarray) -> "DecisionTreeRegressor":
+        """
+        Build a decision tree regressor from the training set (x, y).
+
+        Parameters
+        ----------
+        x : array-like of shape (n_samples, n_features)
+            The training input samples.
+        y : array-like of shape (n_samples,)
+            The target values.
+
+        Returns
+        -------
+        self : object
+            Fitted estimator.
+        """
+        self.tree = self._grow_tree(x, y)
+        return self
+
+    def _grow_tree(self, x: np.ndarray, y: np.ndarray, depth: int = 0) -> TreeNodeReg:
+        """
+        Recursively grow the decision tree.
+
+        Parameters
+        ----------
+        x : array-like of shape (n_samples, n_features)
+            Training samples.
+        y : array-like of shape (n_samples,)
+            Target values.
+        depth : int, optional (default=0)
+            Current depth of the tree.
+
+        Returns
+        -------
+        node : dict
+            A node in the decision tree.
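+
+        Examples
+        --------
+        A pure-target node immediately becomes a leaf holding the mean of
+        ``y`` (illustrative sketch):
+
+        >>> reg = DecisionTreeRegressor()
+        >>> reg._grow_tree(np.array([[0.0], [1.0]]), np.array([2.0, 2.0]))
+        {'value': 2.0}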
+        """
+        n_samples, n_features = x.shape
+        # Stopping criteria
+        if (
+            (self.max_depth is not None and depth >= self.max_depth)
+            or n_samples < self.min_samples_split
+            or len(np.unique(y)) == 1
+        ):
+            return {"value": float(np.mean(y))}
+
+        # Find the best split
+        best_split = self._best_split(x, y, n_features)
+        if best_split is None:
+            return {"value": float(np.mean(y))}
+
+        # Recursively build the tree
+        left_indices = x[:, best_split["feature"]] <= best_split["threshold"]
+        right_indices = ~left_indices
+        left_subtree = self._grow_tree(x[left_indices], y[left_indices], depth + 1)
+        right_subtree = self._grow_tree(x[right_indices], y[right_indices], depth + 1)
+        return {
+            "feature": int(best_split["feature"]),
+            "threshold": float(best_split["threshold"]),
+            "left": left_subtree,
+            "right": right_subtree,
+        }
+
+    def _best_split(
+        self, x: np.ndarray, y: np.ndarray, n_features: int
+    ) -> Optional[Dict[str, Any]]:
+        """
+        Find the best feature and threshold to split on.
+
+        Parameters
+        ----------
+        x : array-like of shape (n_samples, n_features)
+            Training samples.
+        y : array-like of shape (n_samples,)
+            Target values.
+        n_features : int
+            Number of features to consider.
+
+        Returns
+        -------
+        best_split : dict or None
+            The best split configuration.
+        """
+        best_mse = float("inf")
+        best_split: Optional[Dict[str, Any]] = None
+        for feature in range(n_features):
+            thresholds = np.unique(x[:, feature])
+            for threshold in thresholds:
+                left_indices = x[:, feature] <= threshold
+                right_indices = ~left_indices
+                if np.sum(left_indices) == 0 or np.sum(right_indices) == 0:
+                    continue
+                mse = self._calculate_mse(y[left_indices], y[right_indices], len(y))
+                if mse < best_mse:
+                    best_mse = mse
+                    best_split = {
+                        "feature": int(feature),
+                        "threshold": float(threshold),
+                    }
+        return best_split
+
+    def _calculate_mse(
+        self, left_y: np.ndarray, right_y: np.ndarray, n_samples: int
+    ) -> float:
+        """
+        Calculate the weighted mean squared error for a split.
+
+        Within each child, the squared error of predicting the child's mean
+        equals its variance, so the split score is the sample-weighted
+        average of the two variances.
+
+        Parameters
+        ----------
+        left_y : array-like
+            Target values in the left split.
+        right_y : array-like
+            Target values in the right split.
+        n_samples : int
+            Total number of samples.
+
+        Returns
+        -------
+        mse : float
+            Weighted mean squared error.
+        """
+        n_left, n_right = len(left_y), len(right_y)
+        mse_left = float(np.var(left_y)) if n_left > 0 else 0.0
+        mse_right = float(np.var(right_y)) if n_right > 0 else 0.0
+        return (n_left / n_samples) * mse_left + (n_right / n_samples) * mse_right
+
+    def predict(self, x: np.ndarray) -> np.ndarray:
+        """
+        Predict target values for x.
+
+        Parameters
+        ----------
+        x : array-like of shape (n_samples, n_features)
+            The input samples.
+
+        Returns
+        -------
+        y_pred : array-like of shape (n_samples,)
+            The predicted values.
+        """
+        assert self.tree is not None, "Model is not fitted. Call fit first."
+        return np.array([self._predict_sample(sample, self.tree) for sample in x])
+
+    def _predict_sample(self, sample: np.ndarray, tree: TreeNodeReg) -> float:
+        """
+        Predict the target value for a single sample.
+
+        Parameters
+        ----------
+        sample : array-like
+            A single sample.
+        tree : dict
+            The decision tree node.
+
+        Returns
+        -------
+        prediction : float
+            The predicted value.
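+
+        Examples
+        --------
+        A leaf node simply returns its stored mean (illustrative sketch):
+
+        >>> leaf = {"value": 3.0}
+        >>> DecisionTreeRegressor()._predict_sample(np.array([0.0]), leaf)
+        3.0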
+ """ + if "value" in tree: + return float(tree["value"]) + if sample[int(tree["feature"])] <= float(tree["threshold"]): + return self._predict_sample(sample, tree["left"]) # type: ignore[arg-type] + return self._predict_sample(sample, tree["right"]) # type: ignore[arg-type] + + +class RandomForestRegressor: + """ + Random Forest Regressor implementation from scratch. + + A random forest is an ensemble of decision trees, generally trained via + the bagging method. The predictions are made by averaging the predictions + of individual trees. + + Parameters + ---------- + n_estimators : int, optional (default=100) + The number of trees in the forest. + max_depth : int, optional (default=None) + The maximum depth of the trees. + min_samples_split : int, optional (default=2) + The minimum number of samples required to split an internal node. + max_features : int, str or None, optional (default='sqrt') + The number of features to consider when looking for the best split. + - If int, then consider max_features features at each split. + - If 'sqrt', then max_features=sqrt(n_features). + - If None, then max_features=n_features. + random_state : int or None, optional (default=None) + Controls the randomness of the estimator. + + Examples + -------- + >>> x = np.array([[1.0, 2.0], [2.0, 3.0], [3.0, 4.0]]) + >>> y = np.array([1.5, 2.5, 3.5]) + >>> rf = RandomForestRegressor(n_estimators=3, max_depth=2, random_state=42) + >>> _ = rf.fit(x, y) + >>> preds = rf.predict(x) + >>> preds.shape + (3,) + """ + + def __init__( + self, + n_estimators: int = 100, + max_depth: Optional[int] = None, + min_samples_split: int = 2, + max_features: Optional["str|int"] = "sqrt", + random_state: Optional[int] = None, + ) -> None: + self.n_estimators: int = n_estimators + self.max_depth: Optional[int] = max_depth + self.min_samples_split: int = min_samples_split + self.max_features: Optional["str|int"] = max_features + self.random_state: Optional[int] = random_state + self.trees: List[Tuple[DecisionTreeRegressor, np.ndarray]] = [] + + def fit(self, x: np.ndarray, y: np.ndarray) -> "RandomForestRegressor": + """ + Build a random forest regressor from the training set (x, y). + + Parameters + ---------- + x : array-like of shape (n_samples, n_features) + The training input samples. + y : array-like of shape (n_samples,) + The target values. + + Returns + ------- + self : object + Fitted estimator. + """ + rng = np.random.default_rng(self.random_state) + x = np.array(x) + y = np.array(y) + n_samples, n_features = x.shape + # Determine max_features + if self.max_features == "sqrt": + max_features = int(np.sqrt(n_features)) + elif self.max_features is None: + max_features = n_features + elif isinstance(self.max_features, int): + max_features = int(self.max_features) + else: + raise ValueError("max_features must be int, 'sqrt', or None") + + self.trees = [] + for _ in range(self.n_estimators): + # Bootstrap sampling + indices = rng.choice(n_samples, n_samples, replace=True) + x_bootstrap = x[indices] + y_bootstrap = y[indices] + # Feature sampling + feature_indices = rng.choice(n_features, max_features, replace=False) + x_bootstrap = x_bootstrap[:, feature_indices] + # Train decision tree + tree = DecisionTreeRegressor( + max_depth=self.max_depth, min_samples_split=self.min_samples_split + ) + tree.fit(x_bootstrap, y_bootstrap) + self.trees.append((tree, feature_indices)) + return self + + def predict(self, x: np.ndarray) -> np.ndarray: + """ + Predict target values for x. 
+        """
+        if not self.trees:
+            raise RuntimeError("Model is not fitted. Call fit first.")
+        x = np.array(x)
+        preds: List[np.ndarray] = []
+        for tree, feature_indices in self.trees:
+            x_subset = x[:, feature_indices]
+            preds.append(tree.predict(x_subset))
+        # Average predictions from all trees
+        return np.mean(preds, axis=0)
+
+
+if __name__ == "__main__":
+    import doctest
+
+    doctest.testmod()
+
+    # Example usage
+    from sklearn.datasets import make_regression
+    from sklearn.metrics import mean_squared_error, r2_score
+    from sklearn.model_selection import train_test_split
+
+    # Generate synthetic regression data
+    x, y = make_regression(
+        n_samples=200, n_features=5, n_informative=3, noise=10, random_state=42
+    )
+
+    # Split the data
+    x_train, x_test, y_train, y_test = train_test_split(
+        x, y, test_size=0.3, random_state=42
+    )
+
+    # Train the Random Forest Regressor
+    rf_regressor = RandomForestRegressor(n_estimators=10, max_depth=5, random_state=42)
+    rf_regressor.fit(x_train, y_train)
+
+    # Make predictions
+    y_pred = rf_regressor.predict(x_test)
+
+    # Evaluate the model
+    mse = mean_squared_error(y_test, y_pred)
+    r2 = r2_score(y_test, y_pred)
+    print(f"Mean Squared Error: {mse:.2f}")
+    print(f"R² Score: {r2:.2f}")
+    print(f"Number of trees: {len(rf_regressor.trees)}")
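+
+    # Rough parity check against scikit-learn's own RandomForestRegressor
+    # (an illustrative sketch, not part of the algorithm above; the names
+    # sk_rf and sk_pred are ours, and scikit-learn is already required by
+    # the demo imports above):
+    from sklearn.ensemble import RandomForestRegressor as SkRandomForestRegressor
+
+    sk_rf = SkRandomForestRegressor(n_estimators=10, max_depth=5, random_state=42)
+    sk_rf.fit(x_train, y_train)
+    sk_pred = sk_rf.predict(x_test)
+    print(f"scikit-learn R² Score: {r2_score(y_test, sk_pred):.2f}")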