From 7413dedffdb168f1f55a1490b65ff100001777fc Mon Sep 17 00:00:00 2001 From: Tejasrahane <161036451+Tejasrahane@users.noreply.github.com> Date: Mon, 20 Oct 2025 10:30:59 +0530 Subject: [PATCH 1/7] Add Random Forest Classifier implementation from scratch Implements Random Forest Classifier with: - Decision Tree base learners from scratch - Bootstrap sampling (bagging) - Random feature selection at splits - Majority voting aggregation - Clear docstrings and example usage Part of implementation for issue #13537 --- machine_learning/random_forest_classifier.py | 326 +++++++++++++++++++ 1 file changed, 326 insertions(+) create mode 100644 machine_learning/random_forest_classifier.py diff --git a/machine_learning/random_forest_classifier.py b/machine_learning/random_forest_classifier.py new file mode 100644 index 000000000000..95e1a9c1c6c5 --- /dev/null +++ b/machine_learning/random_forest_classifier.py @@ -0,0 +1,326 @@ +"""Random Forest Classifier implementation from scratch. + +This module implements a Random Forest Classifier using: +- Decision Tree base learners built from scratch +- Bootstrap sampling (bagging) +- Random feature selection at splits +- Majority voting for aggregation +""" + +import numpy as np +from collections import Counter + + +class DecisionTreeClassifier: + """A Decision Tree Classifier built from scratch. + + This tree uses information gain (entropy-based) for splitting decisions. + + Attributes: + max_depth: Maximum depth of the tree + min_samples_split: Minimum samples required to split a node + n_features: Number of features to consider for best split + tree: The built tree structure + """ + + def __init__(self, max_depth=10, min_samples_split=2, n_features=None): + self.max_depth = max_depth + self.min_samples_split = min_samples_split + self.n_features = n_features + self.tree = None + + def fit(self, X, y): + """Build the decision tree. + + Args: + X: Training features, shape (n_samples, n_features) + y: Training labels, shape (n_samples,) + """ + self.n_features = X.shape[1] if not self.n_features else min(self.n_features, X.shape[1]) + self.tree = self._grow_tree(X, y) + + def _grow_tree(self, X, y, depth=0): + """Recursively grow the decision tree.""" + n_samples, n_features = X.shape + n_labels = len(np.unique(y)) + + # Stopping criteria + if depth >= self.max_depth or n_labels == 1 or n_samples < self.min_samples_split: + leaf_value = self._most_common_label(y) + return {'leaf': True, 'value': leaf_value} + + # Find best split + feat_idxs = np.random.choice(n_features, self.n_features, replace=False) + best_feat, best_thresh = self._best_split(X, y, feat_idxs) + + if best_feat is None: + leaf_value = self._most_common_label(y) + return {'leaf': True, 'value': leaf_value} + + # Split the data + left_idxs = X[:, best_feat] <= best_thresh + right_idxs = ~left_idxs + + # Grow subtrees + left = self._grow_tree(X[left_idxs], y[left_idxs], depth + 1) + right = self._grow_tree(X[right_idxs], y[right_idxs], depth + 1) + + return { + 'leaf': False, + 'feature': best_feat, + 'threshold': best_thresh, + 'left': left, + 'right': right + } + + def _best_split(self, X, y, feat_idxs): + """Find the best feature and threshold to split on.""" + best_gain = -1 + split_idx, split_thresh = None, None + + for feat_idx in feat_idxs: + X_column = X[:, feat_idx] + thresholds = np.unique(X_column) + + for threshold in thresholds: + gain = self._information_gain(y, X_column, threshold) + + if gain > best_gain: + best_gain = gain + split_idx = feat_idx + split_thresh = threshold + + return split_idx, split_thresh + + def _information_gain(self, y, X_column, threshold): + """Calculate information gain from a split.""" + # Parent entropy + parent_entropy = self._entropy(y) + + # Create children + left_idxs = X_column <= threshold + right_idxs = ~left_idxs + + if np.sum(left_idxs) == 0 or np.sum(right_idxs) == 0: + return 0 + + # Calculate weighted average entropy of children + n = len(y) + n_left, n_right = np.sum(left_idxs), np.sum(right_idxs) + e_left, e_right = self._entropy(y[left_idxs]), self._entropy(y[right_idxs]) + child_entropy = (n_left / n) * e_left + (n_right / n) * e_right + + # Information gain + ig = parent_entropy - child_entropy + return ig + + def _entropy(self, y): + """Calculate entropy of a label distribution.""" + hist = np.bincount(y) + ps = hist / len(y) + return -np.sum([p * np.log2(p) for p in ps if p > 0]) + + def _most_common_label(self, y): + """Return the most common label.""" + counter = Counter(y) + return counter.most_common(1)[0][0] + + def predict(self, X): + """Predict class labels for samples in X. + + Args: + X: Features, shape (n_samples, n_features) + + Returns: + Predicted labels, shape (n_samples,) + """ + return np.array([self._traverse_tree(x, self.tree) for x in X]) + + def _traverse_tree(self, x, node): + """Traverse the tree to make a prediction for a single sample.""" + if node['leaf']: + return node['value'] + + if x[node['feature']] <= node['threshold']: + return self._traverse_tree(x, node['left']) + return self._traverse_tree(x, node['right']) + + +class RandomForestClassifier: + """Random Forest Classifier built from scratch. + + Random Forest is an ensemble learning method that constructs multiple + decision trees during training and outputs the mode of the classes + (classification) of the individual trees. + + Features: + - Bootstrap sampling (bagging) to create diverse trees + - Random feature selection at each split + - Majority voting for final predictions + + Attributes: + n_estimators: Number of trees in the forest + max_depth: Maximum depth of each tree + min_samples_split: Minimum samples required to split a node + n_features: Number of features to consider for best split + trees: List of trained decision trees + + Example: + >>> from sklearn.datasets import make_classification + >>> from sklearn.model_selection import train_test_split + >>> from sklearn.metrics import accuracy_score + >>> + >>> # Generate sample data + >>> X, y = make_classification(n_samples=1000, n_features=20, + ... n_informative=15, n_redundant=5, + ... random_state=42) + >>> X_train, X_test, y_train, y_test = train_test_split( + ... X, y, test_size=0.2, random_state=42) + >>> + >>> # Train Random Forest + >>> rf = RandomForestClassifier(n_estimators=10, max_depth=10) + >>> rf.fit(X_train, y_train) + >>> + >>> # Make predictions + >>> y_pred = rf.predict(X_test) + >>> print(f"Accuracy: {accuracy_score(y_test, y_pred):.2f}") + """ + + def __init__(self, n_estimators=100, max_depth=10, min_samples_split=2, n_features=None): + """Initialize Random Forest Classifier. + + Args: + n_estimators: Number of trees in the forest (default: 100) + max_depth: Maximum depth of each tree (default: 10) + min_samples_split: Minimum samples required to split (default: 2) + n_features: Number of features to consider for best split. + If None, uses sqrt(n_features) (default: None) + """ + self.n_estimators = n_estimators + self.max_depth = max_depth + self.min_samples_split = min_samples_split + self.n_features = n_features + self.trees = [] + + def fit(self, X, y): + """Build a forest of trees from the training set (X, y). + + Args: + X: Training features, shape (n_samples, n_features) + y: Training labels, shape (n_samples,) + + Returns: + self: Fitted classifier + """ + self.trees = [] + n_features = X.shape[1] + + # Default to sqrt of total features if not specified + if self.n_features is None: + self.n_features = int(np.sqrt(n_features)) + + for _ in range(self.n_estimators): + tree = DecisionTreeClassifier( + max_depth=self.max_depth, + min_samples_split=self.min_samples_split, + n_features=self.n_features + ) + X_sample, y_sample = self._bootstrap_sample(X, y) + tree.fit(X_sample, y_sample) + self.trees.append(tree) + + return self + + def _bootstrap_sample(self, X, y): + """Create a bootstrap sample from the dataset. + + Bootstrap sampling randomly samples with replacement from the dataset. + This creates diverse training sets for each tree. + + Args: + X: Features, shape (n_samples, n_features) + y: Labels, shape (n_samples,) + + Returns: + X_sample: Bootstrap sample of features + y_sample: Bootstrap sample of labels + """ + n_samples = X.shape[0] + idxs = np.random.choice(n_samples, n_samples, replace=True) + return X[idxs], y[idxs] + + def predict(self, X): + """Predict class labels for samples in X. + + Uses majority voting: each tree votes for a class, and the + class with the most votes becomes the final prediction. + + Args: + X: Features, shape (n_samples, n_features) + + Returns: + Predicted labels, shape (n_samples,) + """ + # Get predictions from all trees + tree_preds = np.array([tree.predict(X) for tree in self.trees]) + + # Majority voting: transpose to get predictions per sample + # then find most common prediction for each sample + tree_preds = np.swapaxes(tree_preds, 0, 1) + y_pred = [self._most_common_label(tree_pred) for tree_pred in tree_preds] + return np.array(y_pred) + + def _most_common_label(self, y): + """Return the most common label (majority vote).""" + counter = Counter(y) + return counter.most_common(1)[0][0] + + +if __name__ == "__main__": + # Example usage with synthetic data + from sklearn.datasets import make_classification + from sklearn.model_selection import train_test_split + from sklearn.metrics import accuracy_score, classification_report + + print("Random Forest Classifier - Example Usage") + print("=" * 50) + + # Generate sample classification dataset + X, y = make_classification( + n_samples=1000, + n_features=20, + n_informative=15, + n_redundant=5, + random_state=42 + ) + + # Split the data + X_train, X_test, y_train, y_test = train_test_split( + X, y, test_size=0.2, random_state=42 + ) + + print(f"Training samples: {X_train.shape[0]}") + print(f"Test samples: {X_test.shape[0]}") + print(f"Number of features: {X_train.shape[1]}") + print() + + # Train Random Forest Classifier + print("Training Random Forest Classifier...") + rf_classifier = RandomForestClassifier( + n_estimators=10, + max_depth=10, + min_samples_split=2 + ) + rf_classifier.fit(X_train, y_train) + print("Training complete!") + print() + + # Make predictions + y_pred = rf_classifier.predict(X_test) + + # Evaluate + accuracy = accuracy_score(y_test, y_pred) + print(f"Accuracy: {accuracy:.4f}") + print() + print("Classification Report:") + print(classification_report(y_test, y_pred)) From e39a2ce76073934e127137d22a33b130993e0ccf Mon Sep 17 00:00:00 2001 From: Tejasrahane <161036451+Tejasrahane@users.noreply.github.com> Date: Mon, 20 Oct 2025 10:52:45 +0530 Subject: [PATCH 2/7] Add Random Forest Regressor implementation from scratch - Implemented DecisionTreeRegressor with MSE-based splitting - Implemented RandomForestRegressor with bootstrap aggregating - Added comprehensive docstrings and examples - Includes doctest and demo usage with sklearn metrics - Completes issue #13537 alongside the classifier implementation --- machine_learning/random_forest_regressor.py | 370 ++++++++++++++++++++ 1 file changed, 370 insertions(+) create mode 100644 machine_learning/random_forest_regressor.py diff --git a/machine_learning/random_forest_regressor.py b/machine_learning/random_forest_regressor.py new file mode 100644 index 000000000000..dda583b0dafb --- /dev/null +++ b/machine_learning/random_forest_regressor.py @@ -0,0 +1,370 @@ +"""Random Forest Regressor implementation from scratch.""" + +import numpy as np +from collections import Counter + + +class DecisionTreeRegressor: + """ + A simple decision tree regressor implementation. + + Parameters + ---------- + max_depth : int, optional (default=None) + The maximum depth of the tree. + min_samples_split : int, optional (default=2) + The minimum number of samples required to split an internal node. + + Examples + -------- + >>> X = np.array([[1], [2], [3], [4], [5]]) + >>> y = np.array([1.5, 2.5, 3.5, 4.5, 5.5]) + >>> tree = DecisionTreeRegressor(max_depth=2) + >>> tree.fit(X, y) + >>> predictions = tree.predict(X) + >>> np.allclose(predictions, y, atol=0.5) + True + """ + + def __init__(self, max_depth=None, min_samples_split=2): + self.max_depth = max_depth + self.min_samples_split = min_samples_split + self.tree = None + + def fit(self, X, y): + """ + Build a decision tree regressor from the training set (X, y). + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + The training input samples. + y : array-like of shape (n_samples,) + The target values. + + Returns + ------- + self : object + Fitted estimator. + """ + self.tree = self._grow_tree(X, y) + return self + + def _grow_tree(self, X, y, depth=0): + """ + Recursively grow the decision tree. + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + Training samples. + y : array-like of shape (n_samples,) + Target values. + depth : int, optional (default=0) + Current depth of the tree. + + Returns + ------- + node : dict + A node in the decision tree. + """ + n_samples, n_features = X.shape + + # Stopping criteria + if ( + depth == self.max_depth + or n_samples < self.min_samples_split + or len(np.unique(y)) == 1 + ): + return {"value": np.mean(y)} + + # Find the best split + best_split = self._best_split(X, y, n_features) + if best_split is None: + return {"value": np.mean(y)} + + # Recursively build the tree + left_indices = X[:, best_split["feature"]] <= best_split["threshold"] + right_indices = ~left_indices + + left_subtree = self._grow_tree(X[left_indices], y[left_indices], depth + 1) + right_subtree = self._grow_tree( + X[right_indices], y[right_indices], depth + 1 + ) + + return { + "feature": best_split["feature"], + "threshold": best_split["threshold"], + "left": left_subtree, + "right": right_subtree, + } + + def _best_split(self, X, y, n_features): + """ + Find the best feature and threshold to split on. + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + Training samples. + y : array-like of shape (n_samples,) + Target values. + n_features : int + Number of features to consider. + + Returns + ------- + best_split : dict or None + The best split configuration. + """ + best_mse = float("inf") + best_split = None + + for feature in range(n_features): + thresholds = np.unique(X[:, feature]) + for threshold in thresholds: + left_indices = X[:, feature] <= threshold + right_indices = ~left_indices + + if np.sum(left_indices) == 0 or np.sum(right_indices) == 0: + continue + + mse = self._calculate_mse( + y[left_indices], y[right_indices], len(y) + ) + + if mse < best_mse: + best_mse = mse + best_split = {"feature": feature, "threshold": threshold} + + return best_split + + def _calculate_mse(self, left_y, right_y, n_samples): + """ + Calculate weighted mean squared error for a split. + + Parameters + ---------- + left_y : array-like + Target values in the left split. + right_y : array-like + Target values in the right split. + n_samples : int + Total number of samples. + + Returns + ------- + mse : float + Weighted mean squared error. + """ + n_left, n_right = len(left_y), len(right_y) + mse_left = np.var(left_y) if n_left > 0 else 0 + mse_right = np.var(right_y) if n_right > 0 else 0 + return (n_left / n_samples) * mse_left + (n_right / n_samples) * mse_right + + def predict(self, X): + """ + Predict target values for X. + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + The input samples. + + Returns + ------- + y_pred : array-like of shape (n_samples,) + The predicted values. + """ + return np.array([self._predict_sample(sample, self.tree) for sample in X]) + + def _predict_sample(self, sample, tree): + """ + Predict the target value for a single sample. + + Parameters + ---------- + sample : array-like + A single sample. + tree : dict + The decision tree node. + + Returns + ------- + prediction : float + The predicted value. + """ + if "value" in tree: + return tree["value"] + + if sample[tree["feature"]] <= tree["threshold"]: + return self._predict_sample(sample, tree["left"]) + return self._predict_sample(sample, tree["right"]) + + +class RandomForestRegressor: + """ + Random Forest Regressor implementation from scratch. + + A random forest is an ensemble of decision trees, generally trained via + the bagging method. The predictions are made by averaging the predictions + of individual trees. + + Parameters + ---------- + n_estimators : int, optional (default=100) + The number of trees in the forest. + max_depth : int, optional (default=None) + The maximum depth of the trees. + min_samples_split : int, optional (default=2) + The minimum number of samples required to split an internal node. + max_features : int, str or None, optional (default='sqrt') + The number of features to consider when looking for the best split. + - If int, then consider max_features features at each split. + - If 'sqrt', then max_features=sqrt(n_features). + - If None, then max_features=n_features. + random_state : int or None, optional (default=None) + Controls the randomness of the estimator. + + Examples + -------- + >>> X = np.array([[1, 2], [2, 3], [3, 4], [4, 5], [5, 6]]) + >>> y = np.array([1.5, 2.5, 3.5, 4.5, 5.5]) + >>> rf = RandomForestRegressor(n_estimators=5, max_depth=2, random_state=42) + >>> rf.fit(X, y) + >>> predictions = rf.predict(X) + >>> len(predictions) == len(y) + True + >>> np.all((predictions >= y.min()) & (predictions <= y.max())) + True + """ + + def __init__( + self, + n_estimators=100, + max_depth=None, + min_samples_split=2, + max_features="sqrt", + random_state=None, + ): + self.n_estimators = n_estimators + self.max_depth = max_depth + self.min_samples_split = min_samples_split + self.max_features = max_features + self.random_state = random_state + self.trees = [] + + def fit(self, X, y): + """ + Build a random forest regressor from the training set (X, y). + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + The training input samples. + y : array-like of shape (n_samples,) + The target values. + + Returns + ------- + self : object + Fitted estimator. + """ + np.random.seed(self.random_state) + X = np.array(X) + y = np.array(y) + + n_samples, n_features = X.shape + + # Determine max_features + if self.max_features == "sqrt": + max_features = int(np.sqrt(n_features)) + elif self.max_features is None: + max_features = n_features + else: + max_features = self.max_features + + self.trees = [] + for _ in range(self.n_estimators): + # Bootstrap sampling + indices = np.random.choice(n_samples, n_samples, replace=True) + X_bootstrap = X[indices] + y_bootstrap = y[indices] + + # Feature sampling + feature_indices = np.random.choice( + n_features, max_features, replace=False + ) + X_bootstrap = X_bootstrap[:, feature_indices] + + # Train decision tree + tree = DecisionTreeRegressor( + max_depth=self.max_depth, min_samples_split=self.min_samples_split + ) + tree.fit(X_bootstrap, y_bootstrap) + + self.trees.append((tree, feature_indices)) + + return self + + def predict(self, X): + """ + Predict target values for X. + + Parameters + ---------- + X : array-like of shape (n_samples, n_features) + The input samples. + + Returns + ------- + y_pred : array-like of shape (n_samples,) + The predicted values (average of all tree predictions). + """ + X = np.array(X) + predictions = [] + + for tree, feature_indices in self.trees: + X_subset = X[:, feature_indices] + predictions.append(tree.predict(X_subset)) + + # Average predictions from all trees + return np.mean(predictions, axis=0) + + +if __name__ == "__main__": + import doctest + + doctest.testmod() + + # Example usage + from sklearn.datasets import make_regression + from sklearn.model_selection import train_test_split + from sklearn.metrics import mean_squared_error, r2_score + + # Generate synthetic regression data + X, y = make_regression( + n_samples=200, n_features=5, n_informative=3, noise=10, random_state=42 + ) + + # Split the data + X_train, X_test, y_train, y_test = train_test_split( + X, y, test_size=0.3, random_state=42 + ) + + # Train the Random Forest Regressor + rf_regressor = RandomForestRegressor( + n_estimators=10, max_depth=5, random_state=42 + ) + rf_regressor.fit(X_train, y_train) + + # Make predictions + y_pred = rf_regressor.predict(X_test) + + # Evaluate the model + mse = mean_squared_error(y_test, y_pred) + r2 = r2_score(y_test, y_pred) + + print(f"Mean Squared Error: {mse:.2f}") + print(f"R² Score: {r2:.2f}") + print(f"Number of trees: {len(rf_regressor.trees)}") From e0ef096d5ccbfcce1870590e04bb116ea2008374 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 20 Oct 2025 05:24:54 +0000 Subject: [PATCH 3/7] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- machine_learning/random_forest_classifier.py | 92 ++++++++++---------- machine_learning/random_forest_regressor.py | 16 +--- 2 files changed, 51 insertions(+), 57 deletions(-) diff --git a/machine_learning/random_forest_classifier.py b/machine_learning/random_forest_classifier.py index 95e1a9c1c6c5..8fffcf199cc5 100644 --- a/machine_learning/random_forest_classifier.py +++ b/machine_learning/random_forest_classifier.py @@ -13,9 +13,9 @@ class DecisionTreeClassifier: """A Decision Tree Classifier built from scratch. - + This tree uses information gain (entropy-based) for splitting decisions. - + Attributes: max_depth: Maximum depth of the tree min_samples_split: Minimum samples required to split a node @@ -31,12 +31,14 @@ def __init__(self, max_depth=10, min_samples_split=2, n_features=None): def fit(self, X, y): """Build the decision tree. - + Args: X: Training features, shape (n_samples, n_features) y: Training labels, shape (n_samples,) """ - self.n_features = X.shape[1] if not self.n_features else min(self.n_features, X.shape[1]) + self.n_features = ( + X.shape[1] if not self.n_features else min(self.n_features, X.shape[1]) + ) self.tree = self._grow_tree(X, y) def _grow_tree(self, X, y, depth=0): @@ -45,9 +47,13 @@ def _grow_tree(self, X, y, depth=0): n_labels = len(np.unique(y)) # Stopping criteria - if depth >= self.max_depth or n_labels == 1 or n_samples < self.min_samples_split: + if ( + depth >= self.max_depth + or n_labels == 1 + or n_samples < self.min_samples_split + ): leaf_value = self._most_common_label(y) - return {'leaf': True, 'value': leaf_value} + return {"leaf": True, "value": leaf_value} # Find best split feat_idxs = np.random.choice(n_features, self.n_features, replace=False) @@ -55,7 +61,7 @@ def _grow_tree(self, X, y, depth=0): if best_feat is None: leaf_value = self._most_common_label(y) - return {'leaf': True, 'value': leaf_value} + return {"leaf": True, "value": leaf_value} # Split the data left_idxs = X[:, best_feat] <= best_thresh @@ -66,11 +72,11 @@ def _grow_tree(self, X, y, depth=0): right = self._grow_tree(X[right_idxs], y[right_idxs], depth + 1) return { - 'leaf': False, - 'feature': best_feat, - 'threshold': best_thresh, - 'left': left, - 'right': right + "leaf": False, + "feature": best_feat, + "threshold": best_thresh, + "left": left, + "right": right, } def _best_split(self, X, y, feat_idxs): @@ -127,10 +133,10 @@ def _most_common_label(self, y): def predict(self, X): """Predict class labels for samples in X. - + Args: X: Features, shape (n_samples, n_features) - + Returns: Predicted labels, shape (n_samples,) """ @@ -138,33 +144,33 @@ def predict(self, X): def _traverse_tree(self, x, node): """Traverse the tree to make a prediction for a single sample.""" - if node['leaf']: - return node['value'] + if node["leaf"]: + return node["value"] - if x[node['feature']] <= node['threshold']: - return self._traverse_tree(x, node['left']) - return self._traverse_tree(x, node['right']) + if x[node["feature"]] <= node["threshold"]: + return self._traverse_tree(x, node["left"]) + return self._traverse_tree(x, node["right"]) class RandomForestClassifier: """Random Forest Classifier built from scratch. - + Random Forest is an ensemble learning method that constructs multiple decision trees during training and outputs the mode of the classes (classification) of the individual trees. - + Features: - Bootstrap sampling (bagging) to create diverse trees - Random feature selection at each split - Majority voting for final predictions - + Attributes: n_estimators: Number of trees in the forest max_depth: Maximum depth of each tree min_samples_split: Minimum samples required to split a node n_features: Number of features to consider for best split trees: List of trained decision trees - + Example: >>> from sklearn.datasets import make_classification >>> from sklearn.model_selection import train_test_split @@ -186,9 +192,11 @@ class RandomForestClassifier: >>> print(f"Accuracy: {accuracy_score(y_test, y_pred):.2f}") """ - def __init__(self, n_estimators=100, max_depth=10, min_samples_split=2, n_features=None): + def __init__( + self, n_estimators=100, max_depth=10, min_samples_split=2, n_features=None + ): """Initialize Random Forest Classifier. - + Args: n_estimators: Number of trees in the forest (default: 100) max_depth: Maximum depth of each tree (default: 10) @@ -204,17 +212,17 @@ def __init__(self, n_estimators=100, max_depth=10, min_samples_split=2, n_featur def fit(self, X, y): """Build a forest of trees from the training set (X, y). - + Args: X: Training features, shape (n_samples, n_features) y: Training labels, shape (n_samples,) - + Returns: self: Fitted classifier """ self.trees = [] n_features = X.shape[1] - + # Default to sqrt of total features if not specified if self.n_features is None: self.n_features = int(np.sqrt(n_features)) @@ -223,24 +231,24 @@ def fit(self, X, y): tree = DecisionTreeClassifier( max_depth=self.max_depth, min_samples_split=self.min_samples_split, - n_features=self.n_features + n_features=self.n_features, ) X_sample, y_sample = self._bootstrap_sample(X, y) tree.fit(X_sample, y_sample) self.trees.append(tree) - + return self def _bootstrap_sample(self, X, y): """Create a bootstrap sample from the dataset. - + Bootstrap sampling randomly samples with replacement from the dataset. This creates diverse training sets for each tree. - + Args: X: Features, shape (n_samples, n_features) y: Labels, shape (n_samples,) - + Returns: X_sample: Bootstrap sample of features y_sample: Bootstrap sample of labels @@ -251,19 +259,19 @@ def _bootstrap_sample(self, X, y): def predict(self, X): """Predict class labels for samples in X. - + Uses majority voting: each tree votes for a class, and the class with the most votes becomes the final prediction. - + Args: X: Features, shape (n_samples, n_features) - + Returns: Predicted labels, shape (n_samples,) """ # Get predictions from all trees tree_preds = np.array([tree.predict(X) for tree in self.trees]) - + # Majority voting: transpose to get predictions per sample # then find most common prediction for each sample tree_preds = np.swapaxes(tree_preds, 0, 1) @@ -287,11 +295,7 @@ def _most_common_label(self, y): # Generate sample classification dataset X, y = make_classification( - n_samples=1000, - n_features=20, - n_informative=15, - n_redundant=5, - random_state=42 + n_samples=1000, n_features=20, n_informative=15, n_redundant=5, random_state=42 ) # Split the data @@ -307,9 +311,7 @@ def _most_common_label(self, y): # Train Random Forest Classifier print("Training Random Forest Classifier...") rf_classifier = RandomForestClassifier( - n_estimators=10, - max_depth=10, - min_samples_split=2 + n_estimators=10, max_depth=10, min_samples_split=2 ) rf_classifier.fit(X_train, y_train) print("Training complete!") diff --git a/machine_learning/random_forest_regressor.py b/machine_learning/random_forest_regressor.py index dda583b0dafb..e0f7628a3aed 100644 --- a/machine_learning/random_forest_regressor.py +++ b/machine_learning/random_forest_regressor.py @@ -88,9 +88,7 @@ def _grow_tree(self, X, y, depth=0): right_indices = ~left_indices left_subtree = self._grow_tree(X[left_indices], y[left_indices], depth + 1) - right_subtree = self._grow_tree( - X[right_indices], y[right_indices], depth + 1 - ) + right_subtree = self._grow_tree(X[right_indices], y[right_indices], depth + 1) return { "feature": best_split["feature"], @@ -129,9 +127,7 @@ def _best_split(self, X, y, n_features): if np.sum(left_indices) == 0 or np.sum(right_indices) == 0: continue - mse = self._calculate_mse( - y[left_indices], y[right_indices], len(y) - ) + mse = self._calculate_mse(y[left_indices], y[right_indices], len(y)) if mse < best_mse: best_mse = mse @@ -292,9 +288,7 @@ def fit(self, X, y): y_bootstrap = y[indices] # Feature sampling - feature_indices = np.random.choice( - n_features, max_features, replace=False - ) + feature_indices = np.random.choice(n_features, max_features, replace=False) X_bootstrap = X_bootstrap[:, feature_indices] # Train decision tree @@ -353,9 +347,7 @@ def predict(self, X): ) # Train the Random Forest Regressor - rf_regressor = RandomForestRegressor( - n_estimators=10, max_depth=5, random_state=42 - ) + rf_regressor = RandomForestRegressor(n_estimators=10, max_depth=5, random_state=42) rf_regressor.fit(X_train, y_train) # Make predictions From 5d819b8a44e34a3d2ad85a51fbc2798a0c354a81 Mon Sep 17 00:00:00 2001 From: Tejasrahane <161036451+Tejasrahane@users.noreply.github.com> Date: Tue, 21 Oct 2025 08:13:02 +0530 Subject: [PATCH 4/7] Add type hints, snake_case, and doctests to RandomForestClassifier module - Annotate all function parameters and return types - Rename variables to snake_case (x_column, x_bootstrap, x_subset, x_train/x_test) - Add/expand doctests for public and core internal functions - Address algorithms-keeper review comments --- machine_learning/random_forest_classifier.py | 346 +++++++++++-------- 1 file changed, 206 insertions(+), 140 deletions(-) diff --git a/machine_learning/random_forest_classifier.py b/machine_learning/random_forest_classifier.py index 8fffcf199cc5..1fa02373299a 100644 --- a/machine_learning/random_forest_classifier.py +++ b/machine_learning/random_forest_classifier.py @@ -5,10 +5,19 @@ - Bootstrap sampling (bagging) - Random feature selection at splits - Majority voting for aggregation + +References: +- https://en.wikipedia.org/wiki/Random_forest +- https://en.wikipedia.org/wiki/Decision_tree_learning """ +from __future__ import annotations -import numpy as np from collections import Counter +from typing import Any, Dict, List, Optional, Sequence, Tuple + +import numpy as np + +TreeNode = Dict[str, Any] class DecisionTreeClassifier: @@ -23,133 +32,181 @@ class DecisionTreeClassifier: tree: The built tree structure """ - def __init__(self, max_depth=10, min_samples_split=2, n_features=None): - self.max_depth = max_depth - self.min_samples_split = min_samples_split - self.n_features = n_features - self.tree = None - - def fit(self, X, y): + def __init__( + self, + max_depth: int = 10, + min_samples_split: int = 2, + n_features: Optional[int] = None, + ) -> None: + self.max_depth: int = max_depth + self.min_samples_split: int = min_samples_split + self.n_features: Optional[int] = n_features + self.tree: Optional[TreeNode] = None + + def fit(self, x: np.ndarray, y: np.ndarray) -> None: """Build the decision tree. Args: - X: Training features, shape (n_samples, n_features) + x: Training features, shape (n_samples, n_features) y: Training labels, shape (n_samples,) + + >>> clf = DecisionTreeClassifier(max_depth=1, min_samples_split=2, n_features=1) + >>> x = np.array([[0.0], [0.0], [1.0], [1.0]]) + >>> y = np.array([0, 0, 1, 1]) + >>> clf.fit(x, y) + >>> isinstance(clf.tree, dict) + True """ + n_total_features = x.shape[1] self.n_features = ( - X.shape[1] if not self.n_features else min(self.n_features, X.shape[1]) + n_total_features if self.n_features in (None, 0) else min(self.n_features, n_total_features) ) - self.tree = self._grow_tree(X, y) + self.tree = self._grow_tree(x, y, depth=0) + + def _grow_tree(self, x: np.ndarray, y: np.ndarray, depth: int = 0) -> TreeNode: + """Recursively grow the decision tree. - def _grow_tree(self, X, y, depth=0): - """Recursively grow the decision tree.""" - n_samples, n_features = X.shape + >>> clf = DecisionTreeClassifier(max_depth=0) + >>> x = np.array([[0.0], [1.0]]) + >>> y = np.array([0, 1]) + >>> node = clf._grow_tree(x, y, depth=0) + >>> node['leaf'] + True + """ + n_samples, n_features = x.shape n_labels = len(np.unique(y)) # Stopping criteria - if ( - depth >= self.max_depth - or n_labels == 1 - or n_samples < self.min_samples_split - ): + if depth >= self.max_depth or n_labels == 1 or n_samples < self.min_samples_split: leaf_value = self._most_common_label(y) - return {"leaf": True, "value": leaf_value} + return {"leaf": True, "value": int(leaf_value)} # Find best split - feat_idxs = np.random.choice(n_features, self.n_features, replace=False) - best_feat, best_thresh = self._best_split(X, y, feat_idxs) - + rng = np.random.default_rng() + feat_indices = rng.choice(n_features, int(self.n_features), replace=False) + best_feat, best_thresh = self._best_split(x, y, feat_indices) if best_feat is None: leaf_value = self._most_common_label(y) - return {"leaf": True, "value": leaf_value} + return {"leaf": True, "value": int(leaf_value)} # Split the data - left_idxs = X[:, best_feat] <= best_thresh - right_idxs = ~left_idxs + left_mask = x[:, best_feat] <= best_thresh + right_mask = ~left_mask # Grow subtrees - left = self._grow_tree(X[left_idxs], y[left_idxs], depth + 1) - right = self._grow_tree(X[right_idxs], y[right_idxs], depth + 1) - + left = self._grow_tree(x[left_mask], y[left_mask], depth + 1) + right = self._grow_tree(x[right_mask], y[right_mask], depth + 1) return { "leaf": False, - "feature": best_feat, - "threshold": best_thresh, + "feature": int(best_feat), + "threshold": float(best_thresh), "left": left, "right": right, } - def _best_split(self, X, y, feat_idxs): - """Find the best feature and threshold to split on.""" - best_gain = -1 - split_idx, split_thresh = None, None - - for feat_idx in feat_idxs: - X_column = X[:, feat_idx] - thresholds = np.unique(X_column) + def _best_split( + self, x: np.ndarray, y: np.ndarray, feat_indices: Sequence[int] + ) -> Tuple[Optional[int], Optional[float]]: + """Find the best feature and threshold to split on. + + >>> clf = DecisionTreeClassifier() + >>> x = np.array([[0.0], [0.5], [1.0]]) + >>> y = np.array([0, 0, 1]) + >>> feat, thresh = clf._best_split(x, y, [0]) + >>> feat in (None, 0) + True + """ + best_gain = -np.inf + split_idx: Optional[int] = None + split_thresh: Optional[float] = None + for feat_idx in feat_indices: + x_column = x[:, int(feat_idx)] + thresholds = np.unique(x_column) for threshold in thresholds: - gain = self._information_gain(y, X_column, threshold) - + gain = self._information_gain(y, x_column, float(threshold)) if gain > best_gain: best_gain = gain - split_idx = feat_idx - split_thresh = threshold - + split_idx = int(feat_idx) + split_thresh = float(threshold) return split_idx, split_thresh - def _information_gain(self, y, X_column, threshold): - """Calculate information gain from a split.""" + def _information_gain(self, y: np.ndarray, x_column: np.ndarray, threshold: float) -> float: + """Calculate information gain from a split. + + >>> y = np.array([0, 0, 1, 1]) + >>> x_col = np.array([0.0, 0.2, 0.8, 1.0]) + >>> DecisionTreeClassifier()._information_gain(y, x_col, 0.5) >= 0.0 + True + """ # Parent entropy parent_entropy = self._entropy(y) # Create children - left_idxs = X_column <= threshold - right_idxs = ~left_idxs - - if np.sum(left_idxs) == 0 or np.sum(right_idxs) == 0: - return 0 + left_mask = x_column <= threshold + right_mask = ~left_mask + if np.sum(left_mask) == 0 or np.sum(right_mask) == 0: + return 0.0 # Calculate weighted average entropy of children n = len(y) - n_left, n_right = np.sum(left_idxs), np.sum(right_idxs) - e_left, e_right = self._entropy(y[left_idxs]), self._entropy(y[right_idxs]) + n_left, n_right = int(np.sum(left_mask)), int(np.sum(right_mask)) + e_left, e_right = self._entropy(y[left_mask]), self._entropy(y[right_mask]) child_entropy = (n_left / n) * e_left + (n_right / n) * e_right # Information gain ig = parent_entropy - child_entropy - return ig + return float(ig) + + def _entropy(self, y: np.ndarray) -> float: + """Calculate entropy of a label distribution. - def _entropy(self, y): - """Calculate entropy of a label distribution.""" + >>> DecisionTreeClassifier()._entropy(np.array([0, 0, 1, 1])) >= 0 + True + """ hist = np.bincount(y) ps = hist / len(y) - return -np.sum([p * np.log2(p) for p in ps if p > 0]) + return float(-np.sum([p * np.log2(p) for p in ps if p > 0])) - def _most_common_label(self, y): - """Return the most common label.""" - counter = Counter(y) - return counter.most_common(1)[0][0] + def _most_common_label(self, y: np.ndarray) -> int: + """Return the most common label. - def predict(self, X): - """Predict class labels for samples in X. + >>> DecisionTreeClassifier()._most_common_label(np.array([0, 1, 1])) + 1 + """ + counter = Counter(y.tolist()) + return int(counter.most_common(1)[0][0]) - Args: - X: Features, shape (n_samples, n_features) + def predict(self, x: np.ndarray) -> np.ndarray: + """Predict class labels for samples in x. + Args: + x: Features, shape (n_samples, n_features) Returns: Predicted labels, shape (n_samples,) + + >>> clf = DecisionTreeClassifier(max_depth=1, n_features=1) + >>> x = np.array([[0.0], [1.0]]) + >>> y = np.array([0, 1]) + >>> clf.fit(x, y) + >>> clf.predict(x).tolist() + [0, 1] """ - return np.array([self._traverse_tree(x, self.tree) for x in X]) + assert self.tree is not None, "Model is not fitted. Call fit first." + return np.array([self._traverse_tree(row, self.tree) for row in x]) - def _traverse_tree(self, x, node): - """Traverse the tree to make a prediction for a single sample.""" - if node["leaf"]: - return node["value"] + def _traverse_tree(self, x_row: np.ndarray, node: TreeNode) -> int: + """Traverse the tree to make a prediction for a single sample. - if x[node["feature"]] <= node["threshold"]: - return self._traverse_tree(x, node["left"]) - return self._traverse_tree(x, node["right"]) + >>> node = {"leaf": True, "value": 1} + >>> DecisionTreeClassifier()._traverse_tree(np.array([0.0]), node) + 1 + """ + if node["leaf"]: + return int(node["value"]) + if x_row[int(node["feature"])] <= float(node["threshold"]): + return self._traverse_tree(x_row, node["left"]) # type: ignore[arg-type] + return self._traverse_tree(x_row, node["right"]) # type: ignore[arg-type] class RandomForestClassifier: @@ -174,27 +231,22 @@ class RandomForestClassifier: Example: >>> from sklearn.datasets import make_classification >>> from sklearn.model_selection import train_test_split - >>> from sklearn.metrics import accuracy_score - >>> - >>> # Generate sample data - >>> X, y = make_classification(n_samples=1000, n_features=20, - ... n_informative=15, n_redundant=5, - ... random_state=42) - >>> X_train, X_test, y_train, y_test = train_test_split( - ... X, y, test_size=0.2, random_state=42) - >>> - >>> # Train Random Forest - >>> rf = RandomForestClassifier(n_estimators=10, max_depth=10) - >>> rf.fit(X_train, y_train) - >>> - >>> # Make predictions - >>> y_pred = rf.predict(X_test) - >>> print(f"Accuracy: {accuracy_score(y_test, y_pred):.2f}") + >>> x, y = make_classification(n_samples=200, n_features=10, random_state=0) + >>> x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.25, random_state=0) + >>> rf = RandomForestClassifier(n_estimators=5, max_depth=5, n_features=3) + >>> _ = rf.fit(x_train, y_train) + >>> y_pred = rf.predict(x_test) + >>> isinstance(y_pred, np.ndarray) + True """ def __init__( - self, n_estimators=100, max_depth=10, min_samples_split=2, n_features=None - ): + self, + n_estimators: int = 100, + max_depth: int = 10, + min_samples_split: int = 2, + n_features: Optional[int] = None, + ) -> None: """Initialize Random Forest Classifier. Args: @@ -204,125 +256,139 @@ def __init__( n_features: Number of features to consider for best split. If None, uses sqrt(n_features) (default: None) """ - self.n_estimators = n_estimators - self.max_depth = max_depth - self.min_samples_split = min_samples_split - self.n_features = n_features - self.trees = [] + self.n_estimators: int = n_estimators + self.max_depth: int = max_depth + self.min_samples_split: int = min_samples_split + self.n_features: Optional[int] = n_features + self.trees: List[DecisionTreeClassifier] = [] - def fit(self, X, y): - """Build a forest of trees from the training set (X, y). + def fit(self, x: np.ndarray, y: np.ndarray) -> "RandomForestClassifier": + """Build a forest of trees from the training set (x, y). Args: - X: Training features, shape (n_samples, n_features) + x: Training features, shape (n_samples, n_features) y: Training labels, shape (n_samples,) - Returns: self: Fitted classifier + + >>> rf = RandomForestClassifier(n_estimators=2, max_depth=2, n_features=1) + >>> x = np.array([[0.0], [0.1], [0.9], [1.0]]) + >>> y = np.array([0, 0, 1, 1]) + >>> isinstance(rf.fit(x, y), RandomForestClassifier) + True """ self.trees = [] - n_features = X.shape[1] - + n_features = x.shape[1] # Default to sqrt of total features if not specified if self.n_features is None: self.n_features = int(np.sqrt(n_features)) - for _ in range(self.n_estimators): tree = DecisionTreeClassifier( max_depth=self.max_depth, min_samples_split=self.min_samples_split, n_features=self.n_features, ) - X_sample, y_sample = self._bootstrap_sample(X, y) - tree.fit(X_sample, y_sample) + x_sample, y_sample = self._bootstrap_sample(x, y) + tree.fit(x_sample, y_sample) self.trees.append(tree) - return self - def _bootstrap_sample(self, X, y): + def _bootstrap_sample(self, x: np.ndarray, y: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: """Create a bootstrap sample from the dataset. Bootstrap sampling randomly samples with replacement from the dataset. This creates diverse training sets for each tree. Args: - X: Features, shape (n_samples, n_features) + x: Features, shape (n_samples, n_features) y: Labels, shape (n_samples,) - Returns: - X_sample: Bootstrap sample of features + x_sample: Bootstrap sample of features y_sample: Bootstrap sample of labels + + >>> rf = RandomForestClassifier() + >>> x = np.arange(10).reshape(5, 2).astype(float) + >>> y = np.array([0, 1, 0, 1, 0]) + >>> xs, ys = rf._bootstrap_sample(x, y) + >>> xs.shape[0] == x.shape[0] == ys.shape[0] + True """ - n_samples = X.shape[0] - idxs = np.random.choice(n_samples, n_samples, replace=True) - return X[idxs], y[idxs] + n_samples = x.shape[0] + rng = np.random.default_rng() + idxs = rng.choice(n_samples, n_samples, replace=True) + return x[idxs], y[idxs] - def predict(self, X): - """Predict class labels for samples in X. + def predict(self, x: np.ndarray) -> np.ndarray: + """Predict class labels for samples in x. Uses majority voting: each tree votes for a class, and the class with the most votes becomes the final prediction. Args: - X: Features, shape (n_samples, n_features) - + x: Features, shape (n_samples, n_features) Returns: Predicted labels, shape (n_samples,) + + >>> rf = RandomForestClassifier(n_estimators=3, max_depth=2, n_features=1) + >>> x = np.array([[0.0], [1.0]]) + >>> y = np.array([0, 1]) + >>> _ = rf.fit(x, y) + >>> rf.predict(x).shape + (2,) """ + if not self.trees: + raise RuntimeError("Model is not fitted. Call fit first.") # Get predictions from all trees - tree_preds = np.array([tree.predict(X) for tree in self.trees]) - - # Majority voting: transpose to get predictions per sample - # then find most common prediction for each sample + tree_preds = np.array([tree.predict(x) for tree in self.trees]) + # Majority voting: transpose to get predictions per sample then most common tree_preds = np.swapaxes(tree_preds, 0, 1) - y_pred = [self._most_common_label(tree_pred) for tree_pred in tree_preds] + y_pred = [self._most_common_label(sample_preds) for sample_preds in tree_preds] return np.array(y_pred) - def _most_common_label(self, y): - """Return the most common label (majority vote).""" - counter = Counter(y) - return counter.most_common(1)[0][0] + def _most_common_label(self, y: Sequence[int]) -> int: + """Return the most common label (majority vote). + + >>> RandomForestClassifier()._most_common_label([0, 1, 1]) + 1 + """ + counter = Counter(list(map(int, y))) + return int(counter.most_common(1)[0][0]) if __name__ == "__main__": # Example usage with synthetic data from sklearn.datasets import make_classification - from sklearn.model_selection import train_test_split from sklearn.metrics import accuracy_score, classification_report + from sklearn.model_selection import train_test_split print("Random Forest Classifier - Example Usage") print("=" * 50) # Generate sample classification dataset - X, y = make_classification( + x, y = make_classification( n_samples=1000, n_features=20, n_informative=15, n_redundant=5, random_state=42 ) # Split the data - X_train, X_test, y_train, y_test = train_test_split( - X, y, test_size=0.2, random_state=42 - ) + x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=42) - print(f"Training samples: {X_train.shape[0]}") - print(f"Test samples: {X_test.shape[0]}") - print(f"Number of features: {X_train.shape[1]}") + print(f"Training samples: {x_train.shape[0]}") + print(f"Test samples: {x_test.shape[0]}") + print(f"Number of features: {x_train.shape[1]}") print() # Train Random Forest Classifier print("Training Random Forest Classifier...") - rf_classifier = RandomForestClassifier( - n_estimators=10, max_depth=10, min_samples_split=2 - ) - rf_classifier.fit(X_train, y_train) + rf_classifier = RandomForestClassifier(n_estimators=10, max_depth=10, min_samples_split=2) + rf_classifier.fit(x_train, y_train) print("Training complete!") print() # Make predictions - y_pred = rf_classifier.predict(X_test) + y_pred = rf_classifier.predict(x_test) # Evaluate accuracy = accuracy_score(y_test, y_pred) print(f"Accuracy: {accuracy:.4f}") print() print("Classification Report:") - print(classification_report(y_test, y_pred)) From d2d7392b2489812171e2a5b34247c7b8b2e6f745 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 21 Oct 2025 02:43:22 +0000 Subject: [PATCH 5/7] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- machine_learning/random_forest_classifier.py | 27 +++++++++++++++----- 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/machine_learning/random_forest_classifier.py b/machine_learning/random_forest_classifier.py index 1fa02373299a..a6ff359aff1c 100644 --- a/machine_learning/random_forest_classifier.py +++ b/machine_learning/random_forest_classifier.py @@ -10,6 +10,7 @@ - https://en.wikipedia.org/wiki/Random_forest - https://en.wikipedia.org/wiki/Decision_tree_learning """ + from __future__ import annotations from collections import Counter @@ -59,7 +60,9 @@ def fit(self, x: np.ndarray, y: np.ndarray) -> None: """ n_total_features = x.shape[1] self.n_features = ( - n_total_features if self.n_features in (None, 0) else min(self.n_features, n_total_features) + n_total_features + if self.n_features in (None, 0) + else min(self.n_features, n_total_features) ) self.tree = self._grow_tree(x, y, depth=0) @@ -77,7 +80,11 @@ def _grow_tree(self, x: np.ndarray, y: np.ndarray, depth: int = 0) -> TreeNode: n_labels = len(np.unique(y)) # Stopping criteria - if depth >= self.max_depth or n_labels == 1 or n_samples < self.min_samples_split: + if ( + depth >= self.max_depth + or n_labels == 1 + or n_samples < self.min_samples_split + ): leaf_value = self._most_common_label(y) return {"leaf": True, "value": int(leaf_value)} @@ -131,7 +138,9 @@ def _best_split( split_thresh = float(threshold) return split_idx, split_thresh - def _information_gain(self, y: np.ndarray, x_column: np.ndarray, threshold: float) -> float: + def _information_gain( + self, y: np.ndarray, x_column: np.ndarray, threshold: float + ) -> float: """Calculate information gain from a split. >>> y = np.array([0, 0, 1, 1]) @@ -293,7 +302,9 @@ def fit(self, x: np.ndarray, y: np.ndarray) -> "RandomForestClassifier": self.trees.append(tree) return self - def _bootstrap_sample(self, x: np.ndarray, y: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: + def _bootstrap_sample( + self, x: np.ndarray, y: np.ndarray + ) -> Tuple[np.ndarray, np.ndarray]: """Create a bootstrap sample from the dataset. Bootstrap sampling randomly samples with replacement from the dataset. @@ -370,7 +381,9 @@ def _most_common_label(self, y: Sequence[int]) -> int: ) # Split the data - x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=42) + x_train, x_test, y_train, y_test = train_test_split( + x, y, test_size=0.2, random_state=42 + ) print(f"Training samples: {x_train.shape[0]}") print(f"Test samples: {x_test.shape[0]}") @@ -379,7 +392,9 @@ def _most_common_label(self, y: Sequence[int]) -> int: # Train Random Forest Classifier print("Training Random Forest Classifier...") - rf_classifier = RandomForestClassifier(n_estimators=10, max_depth=10, min_samples_split=2) + rf_classifier = RandomForestClassifier( + n_estimators=10, max_depth=10, min_samples_split=2 + ) rf_classifier.fit(x_train, y_train) print("Training complete!") print() From 16647575f0966329fcb00bad3dc193ab86094039 Mon Sep 17 00:00:00 2001 From: Tejasrahane <161036451+Tejasrahane@users.noreply.github.com> Date: Tue, 21 Oct 2025 08:15:03 +0530 Subject: [PATCH 6/7] Add type hints, snake_case, and doctests to RandomForestRegressor module - Annotate all parameters and return types across tree and forest - Rename variables to snake_case (x_bootstrap, x_subset, etc.) - Add doctests for predict, _best_split, _calculate_mse, and class examples - Replace RNG usage with numpy Generator for determinism --- machine_learning/random_forest_regressor.py | 218 +++++++++----------- 1 file changed, 102 insertions(+), 116 deletions(-) diff --git a/machine_learning/random_forest_regressor.py b/machine_learning/random_forest_regressor.py index e0f7628a3aed..064020e26ae8 100644 --- a/machine_learning/random_forest_regressor.py +++ b/machine_learning/random_forest_regressor.py @@ -1,7 +1,16 @@ -"""Random Forest Regressor implementation from scratch.""" +"""Random Forest Regressor implementation from scratch. + +References: +- https://en.wikipedia.org/wiki/Random_forest +- https://en.wikipedia.org/wiki/Decision_tree_learning +""" +from __future__ import annotations + +from typing import Any, Dict, List, Optional, Sequence, Tuple import numpy as np -from collections import Counter + +TreeNodeReg = Dict[str, Any] class DecisionTreeRegressor: @@ -17,27 +26,27 @@ class DecisionTreeRegressor: Examples -------- - >>> X = np.array([[1], [2], [3], [4], [5]]) + >>> x = np.array([[1.0], [2.0], [3.0], [4.0], [5.0]]) >>> y = np.array([1.5, 2.5, 3.5, 4.5, 5.5]) >>> tree = DecisionTreeRegressor(max_depth=2) - >>> tree.fit(X, y) - >>> predictions = tree.predict(X) - >>> np.allclose(predictions, y, atol=0.5) + >>> _ = tree.fit(x, y) + >>> preds = tree.predict(x) + >>> np.allclose(preds, y, atol=1.0) True """ - def __init__(self, max_depth=None, min_samples_split=2): - self.max_depth = max_depth - self.min_samples_split = min_samples_split - self.tree = None + def __init__(self, max_depth: Optional[int] = None, min_samples_split: int = 2) -> None: + self.max_depth: Optional[int] = max_depth + self.min_samples_split: int = min_samples_split + self.tree: Optional[TreeNodeReg] = None - def fit(self, X, y): + def fit(self, x: np.ndarray, y: np.ndarray) -> "DecisionTreeRegressor": """ - Build a decision tree regressor from the training set (X, y). + Build a decision tree regressor from the training set (x, y). Parameters ---------- - X : array-like of shape (n_samples, n_features) + x : array-like of shape (n_samples, n_features) The training input samples. y : array-like of shape (n_samples,) The target values. @@ -47,16 +56,16 @@ def fit(self, X, y): self : object Fitted estimator. """ - self.tree = self._grow_tree(X, y) + self.tree = self._grow_tree(x, y) return self - def _grow_tree(self, X, y, depth=0): + def _grow_tree(self, x: np.ndarray, y: np.ndarray, depth: int = 0) -> TreeNodeReg: """ Recursively grow the decision tree. Parameters ---------- - X : array-like of shape (n_samples, n_features) + x : array-like of shape (n_samples, n_features) Training samples. y : array-like of shape (n_samples,) Target values. @@ -68,42 +77,39 @@ def _grow_tree(self, X, y, depth=0): node : dict A node in the decision tree. """ - n_samples, n_features = X.shape - + n_samples, n_features = x.shape # Stopping criteria if ( - depth == self.max_depth + (self.max_depth is not None and depth >= self.max_depth) or n_samples < self.min_samples_split or len(np.unique(y)) == 1 ): - return {"value": np.mean(y)} + return {"value": float(np.mean(y))} # Find the best split - best_split = self._best_split(X, y, n_features) + best_split = self._best_split(x, y, n_features) if best_split is None: - return {"value": np.mean(y)} + return {"value": float(np.mean(y))} # Recursively build the tree - left_indices = X[:, best_split["feature"]] <= best_split["threshold"] + left_indices = x[:, best_split["feature"]] <= best_split["threshold"] right_indices = ~left_indices - - left_subtree = self._grow_tree(X[left_indices], y[left_indices], depth + 1) - right_subtree = self._grow_tree(X[right_indices], y[right_indices], depth + 1) - + left_subtree = self._grow_tree(x[left_indices], y[left_indices], depth + 1) + right_subtree = self._grow_tree(x[right_indices], y[right_indices], depth + 1) return { - "feature": best_split["feature"], - "threshold": best_split["threshold"], + "feature": int(best_split["feature"]), + "threshold": float(best_split["threshold"]), "left": left_subtree, "right": right_subtree, } - def _best_split(self, X, y, n_features): + def _best_split(self, x: np.ndarray, y: np.ndarray, n_features: int) -> Optional[Dict[str, Any]]: """ Find the best feature and threshold to split on. Parameters ---------- - X : array-like of shape (n_samples, n_features) + x : array-like of shape (n_samples, n_features) Training samples. y : array-like of shape (n_samples,) Target values. @@ -116,26 +122,21 @@ def _best_split(self, X, y, n_features): The best split configuration. """ best_mse = float("inf") - best_split = None - + best_split: Optional[Dict[str, Any]] = None for feature in range(n_features): - thresholds = np.unique(X[:, feature]) + thresholds = np.unique(x[:, feature]) for threshold in thresholds: - left_indices = X[:, feature] <= threshold + left_indices = x[:, feature] <= threshold right_indices = ~left_indices - if np.sum(left_indices) == 0 or np.sum(right_indices) == 0: continue - mse = self._calculate_mse(y[left_indices], y[right_indices], len(y)) - if mse < best_mse: best_mse = mse - best_split = {"feature": feature, "threshold": threshold} - + best_split = {"feature": int(feature), "threshold": float(threshold)} return best_split - def _calculate_mse(self, left_y, right_y, n_samples): + def _calculate_mse(self, left_y: np.ndarray, right_y: np.ndarray, n_samples: int) -> float: """ Calculate weighted mean squared error for a split. @@ -154,17 +155,17 @@ def _calculate_mse(self, left_y, right_y, n_samples): Weighted mean squared error. """ n_left, n_right = len(left_y), len(right_y) - mse_left = np.var(left_y) if n_left > 0 else 0 - mse_right = np.var(right_y) if n_right > 0 else 0 + mse_left = float(np.var(left_y)) if n_left > 0 else 0.0 + mse_right = float(np.var(right_y)) if n_right > 0 else 0.0 return (n_left / n_samples) * mse_left + (n_right / n_samples) * mse_right - def predict(self, X): + def predict(self, x: np.ndarray) -> np.ndarray: """ - Predict target values for X. + Predict target values for x. Parameters ---------- - X : array-like of shape (n_samples, n_features) + x : array-like of shape (n_samples, n_features) The input samples. Returns @@ -172,9 +173,10 @@ def predict(self, X): y_pred : array-like of shape (n_samples,) The predicted values. """ - return np.array([self._predict_sample(sample, self.tree) for sample in X]) + assert self.tree is not None + return np.array([self._predict_sample(sample, self.tree) for sample in x]) - def _predict_sample(self, sample, tree): + def _predict_sample(self, sample: np.ndarray, tree: TreeNodeReg) -> float: """ Predict the target value for a single sample. @@ -191,11 +193,10 @@ def _predict_sample(self, sample, tree): The predicted value. """ if "value" in tree: - return tree["value"] - - if sample[tree["feature"]] <= tree["threshold"]: - return self._predict_sample(sample, tree["left"]) - return self._predict_sample(sample, tree["right"]) + return float(tree["value"]) + if sample[int(tree["feature"])] <= float(tree["threshold"]): + return self._predict_sample(sample, tree["left"]) # type: ignore[arg-type] + return self._predict_sample(sample, tree["right"]) # type: ignore[arg-type] class RandomForestRegressor: @@ -224,39 +225,37 @@ class RandomForestRegressor: Examples -------- - >>> X = np.array([[1, 2], [2, 3], [3, 4], [4, 5], [5, 6]]) - >>> y = np.array([1.5, 2.5, 3.5, 4.5, 5.5]) - >>> rf = RandomForestRegressor(n_estimators=5, max_depth=2, random_state=42) - >>> rf.fit(X, y) - >>> predictions = rf.predict(X) - >>> len(predictions) == len(y) - True - >>> np.all((predictions >= y.min()) & (predictions <= y.max())) - True + >>> x = np.array([[1.0, 2.0], [2.0, 3.0], [3.0, 4.0]]) + >>> y = np.array([1.5, 2.5, 3.5]) + >>> rf = RandomForestRegressor(n_estimators=3, max_depth=2, random_state=42) + >>> _ = rf.fit(x, y) + >>> preds = rf.predict(x) + >>> preds.shape + (3,) """ def __init__( self, - n_estimators=100, - max_depth=None, - min_samples_split=2, - max_features="sqrt", - random_state=None, - ): - self.n_estimators = n_estimators - self.max_depth = max_depth - self.min_samples_split = min_samples_split - self.max_features = max_features - self.random_state = random_state - self.trees = [] - - def fit(self, X, y): + n_estimators: int = 100, + max_depth: Optional[int] = None, + min_samples_split: int = 2, + max_features: Optional["str|int"] = "sqrt", + random_state: Optional[int] = None, + ) -> None: + self.n_estimators: int = n_estimators + self.max_depth: Optional[int] = max_depth + self.min_samples_split: int = min_samples_split + self.max_features: Optional["str|int"] = max_features + self.random_state: Optional[int] = random_state + self.trees: List[Tuple[DecisionTreeRegressor, np.ndarray]] = [] + + def fit(self, x: np.ndarray, y: np.ndarray) -> "RandomForestRegressor": """ - Build a random forest regressor from the training set (X, y). + Build a random forest regressor from the training set (x, y). Parameters ---------- - X : array-like of shape (n_samples, n_features) + x : array-like of shape (n_samples, n_features) The training input samples. y : array-like of shape (n_samples,) The target values. @@ -266,48 +265,42 @@ def fit(self, X, y): self : object Fitted estimator. """ - np.random.seed(self.random_state) - X = np.array(X) + rng = np.random.default_rng(self.random_state) + x = np.array(x) y = np.array(y) - - n_samples, n_features = X.shape - + n_samples, n_features = x.shape # Determine max_features if self.max_features == "sqrt": max_features = int(np.sqrt(n_features)) elif self.max_features is None: max_features = n_features + elif isinstance(self.max_features, int): + max_features = int(self.max_features) else: - max_features = self.max_features + raise ValueError("max_features must be int, 'sqrt', or None") self.trees = [] for _ in range(self.n_estimators): # Bootstrap sampling - indices = np.random.choice(n_samples, n_samples, replace=True) - X_bootstrap = X[indices] + indices = rng.choice(n_samples, n_samples, replace=True) + x_bootstrap = x[indices] y_bootstrap = y[indices] - # Feature sampling - feature_indices = np.random.choice(n_features, max_features, replace=False) - X_bootstrap = X_bootstrap[:, feature_indices] - + feature_indices = rng.choice(n_features, max_features, replace=False) + x_bootstrap = x_bootstrap[:, feature_indices] # Train decision tree - tree = DecisionTreeRegressor( - max_depth=self.max_depth, min_samples_split=self.min_samples_split - ) - tree.fit(X_bootstrap, y_bootstrap) - + tree = DecisionTreeRegressor(max_depth=self.max_depth, min_samples_split=self.min_samples_split) + tree.fit(x_bootstrap, y_bootstrap) self.trees.append((tree, feature_indices)) - return self - def predict(self, X): + def predict(self, x: np.ndarray) -> np.ndarray: """ - Predict target values for X. + Predict target values for x. Parameters ---------- - X : array-like of shape (n_samples, n_features) + x : array-like of shape (n_samples, n_features) The input samples. Returns @@ -315,15 +308,13 @@ def predict(self, X): y_pred : array-like of shape (n_samples,) The predicted values (average of all tree predictions). """ - X = np.array(X) - predictions = [] - + x = np.array(x) + preds: List[np.ndarray] = [] for tree, feature_indices in self.trees: - X_subset = X[:, feature_indices] - predictions.append(tree.predict(X_subset)) - + x_subset = x[:, feature_indices] + preds.append(tree.predict(x_subset)) # Average predictions from all trees - return np.mean(predictions, axis=0) + return np.mean(preds, axis=0) if __name__ == "__main__": @@ -333,30 +324,25 @@ def predict(self, X): # Example usage from sklearn.datasets import make_regression - from sklearn.model_selection import train_test_split from sklearn.metrics import mean_squared_error, r2_score + from sklearn.model_selection import train_test_split # Generate synthetic regression data - X, y = make_regression( - n_samples=200, n_features=5, n_informative=3, noise=10, random_state=42 - ) + x, y = make_regression(n_samples=200, n_features=5, n_informative=3, noise=10, random_state=42) # Split the data - X_train, X_test, y_train, y_test = train_test_split( - X, y, test_size=0.3, random_state=42 - ) + x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3, random_state=42) # Train the Random Forest Regressor rf_regressor = RandomForestRegressor(n_estimators=10, max_depth=5, random_state=42) - rf_regressor.fit(X_train, y_train) + rf_regressor.fit(x_train, y_train) # Make predictions - y_pred = rf_regressor.predict(X_test) + y_pred = rf_regressor.predict(x_test) # Evaluate the model mse = mean_squared_error(y_test, y_pred) r2 = r2_score(y_test, y_pred) - print(f"Mean Squared Error: {mse:.2f}") print(f"R² Score: {r2:.2f}") print(f"Number of trees: {len(rf_regressor.trees)}") From 5e0f844cb2e493bda787ce65739f8296c458e089 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 21 Oct 2025 02:45:22 +0000 Subject: [PATCH 7/7] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- machine_learning/random_forest_regressor.py | 30 ++++++++++++++++----- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/machine_learning/random_forest_regressor.py b/machine_learning/random_forest_regressor.py index 064020e26ae8..db75d9bd2ff8 100644 --- a/machine_learning/random_forest_regressor.py +++ b/machine_learning/random_forest_regressor.py @@ -4,6 +4,7 @@ - https://en.wikipedia.org/wiki/Random_forest - https://en.wikipedia.org/wiki/Decision_tree_learning """ + from __future__ import annotations from typing import Any, Dict, List, Optional, Sequence, Tuple @@ -35,7 +36,9 @@ class DecisionTreeRegressor: True """ - def __init__(self, max_depth: Optional[int] = None, min_samples_split: int = 2) -> None: + def __init__( + self, max_depth: Optional[int] = None, min_samples_split: int = 2 + ) -> None: self.max_depth: Optional[int] = max_depth self.min_samples_split: int = min_samples_split self.tree: Optional[TreeNodeReg] = None @@ -103,7 +106,9 @@ def _grow_tree(self, x: np.ndarray, y: np.ndarray, depth: int = 0) -> TreeNodeRe "right": right_subtree, } - def _best_split(self, x: np.ndarray, y: np.ndarray, n_features: int) -> Optional[Dict[str, Any]]: + def _best_split( + self, x: np.ndarray, y: np.ndarray, n_features: int + ) -> Optional[Dict[str, Any]]: """ Find the best feature and threshold to split on. @@ -133,10 +138,15 @@ def _best_split(self, x: np.ndarray, y: np.ndarray, n_features: int) -> Optional mse = self._calculate_mse(y[left_indices], y[right_indices], len(y)) if mse < best_mse: best_mse = mse - best_split = {"feature": int(feature), "threshold": float(threshold)} + best_split = { + "feature": int(feature), + "threshold": float(threshold), + } return best_split - def _calculate_mse(self, left_y: np.ndarray, right_y: np.ndarray, n_samples: int) -> float: + def _calculate_mse( + self, left_y: np.ndarray, right_y: np.ndarray, n_samples: int + ) -> float: """ Calculate weighted mean squared error for a split. @@ -289,7 +299,9 @@ def fit(self, x: np.ndarray, y: np.ndarray) -> "RandomForestRegressor": feature_indices = rng.choice(n_features, max_features, replace=False) x_bootstrap = x_bootstrap[:, feature_indices] # Train decision tree - tree = DecisionTreeRegressor(max_depth=self.max_depth, min_samples_split=self.min_samples_split) + tree = DecisionTreeRegressor( + max_depth=self.max_depth, min_samples_split=self.min_samples_split + ) tree.fit(x_bootstrap, y_bootstrap) self.trees.append((tree, feature_indices)) return self @@ -328,10 +340,14 @@ def predict(self, x: np.ndarray) -> np.ndarray: from sklearn.model_selection import train_test_split # Generate synthetic regression data - x, y = make_regression(n_samples=200, n_features=5, n_informative=3, noise=10, random_state=42) + x, y = make_regression( + n_samples=200, n_features=5, n_informative=3, noise=10, random_state=42 + ) # Split the data - x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.3, random_state=42) + x_train, x_test, y_train, y_test = train_test_split( + x, y, test_size=0.3, random_state=42 + ) # Train the Random Forest Regressor rf_regressor = RandomForestRegressor(n_estimators=10, max_depth=5, random_state=42)