From dd212c4565a196690a7285a3ae8cf1297e6002fc Mon Sep 17 00:00:00 2001 From: ruman23 Date: Mon, 20 Oct 2025 16:10:50 -0500 Subject: [PATCH 1/5] Add Gaussian Mixture Model (GMM) algorithm --- machine_learning/gaussian_mixture_model.py | 158 +++++++++++++++++++++ 1 file changed, 158 insertions(+) create mode 100644 machine_learning/gaussian_mixture_model.py diff --git a/machine_learning/gaussian_mixture_model.py b/machine_learning/gaussian_mixture_model.py new file mode 100644 index 000000000000..5ecd04462b37 --- /dev/null +++ b/machine_learning/gaussian_mixture_model.py @@ -0,0 +1,158 @@ +""" +README, Author - Md Ruman Islam (mailto:ruman23.github.io) +Requirements: + - numpy + - matplotlib +Python: + - 3.8+ +Inputs: + - X : a 2D numpy array of features. + - n_components : number of Gaussian distributions (clusters) to fit. + - max_iter : maximum number of EM iterations. + - tol : convergence tolerance. +Usage: + 1. define 'n_components' value and 'X' features array + 2. initialize model: + gmm = GaussianMixture(n_components=3, max_iter=100) + 3. fit model to data: + gmm.fit(X) + 4. get cluster predictions: + labels = gmm.predict(X) + 5. visualize results: + gmm.plot_results(X) +""" + +import warnings +import numpy as np +import matplotlib.pyplot as plt +from scipy.stats import multivariate_normal + +warnings.filterwarnings("ignore") + +TAG = "GAUSSIAN-MIXTURE/ " + + +class GaussianMixture: + """ + Gaussian Mixture Model implemented using the Expectation-Maximization algorithm. + """ + + def __init__(self, n_components=2, max_iter=100, tol=1e-4, seed=None): + self.n_components = n_components + self.max_iter = max_iter + self.tol = tol + self.seed = seed + + # parameters + self.weights_ = None + self.means_ = None + self.covariances_ = None + self.log_likelihoods_ = [] + + def _initialize_parameters(self, X): + """Randomly initialize means, covariances, and mixture weights""" + rng = np.random.default_rng(self.seed) + n_samples, n_features = X.shape + + indices = rng.choice(n_samples, self.n_components, replace=False) + self.means_ = X[indices] + + self.covariances_ = np.array( + [np.cov(X, rowvar=False) for _ in range(self.n_components)] + ) + self.weights_ = np.ones(self.n_components) / self.n_components + + def _e_step(self, X): + """Compute responsibilities (posterior probabilities)""" + n_samples = X.shape[0] + responsibilities = np.zeros((n_samples, self.n_components)) + + for k in range(self.n_components): + rv = multivariate_normal(mean=self.means_[k], cov=self.covariances_[k]) + responsibilities[:, k] = self.weights_[k] * rv.pdf(X) + + # Normalize to get probabilities + responsibilities /= responsibilities.sum(axis=1, keepdims=True) + return responsibilities + + def _m_step(self, X, responsibilities): + """Update weights, means, and covariances""" + n_samples, n_features = X.shape + Nk = responsibilities.sum(axis=0) + + self.weights_ = Nk / n_samples + self.means_ = (responsibilities.T @ X) / Nk[:, np.newaxis] + + for k in range(self.n_components): + diff = X - self.means_[k] + self.covariances_[k] = (responsibilities[:, k][:, np.newaxis] * diff).T @ diff + self.covariances_[k] /= Nk[k] + # Add small regularization term for numerical stability + self.covariances_[k] += np.eye(n_features) * 1e-6 + + def _compute_log_likelihood(self, X): + """Compute total log-likelihood of the model""" + n_samples = X.shape[0] + total_pdf = np.zeros((n_samples, self.n_components)) + + for k in range(self.n_components): + rv = multivariate_normal(mean=self.means_[k], cov=self.covariances_[k]) + total_pdf[:, k] = self.weights_[k] * rv.pdf(X) + + log_likelihood = np.sum(np.log(np.sum(total_pdf, axis=1) + 1e-12)) + return log_likelihood + + def fit(self, X): + """Fit the Gaussian Mixture Model to data using the EM algorithm""" + self._initialize_parameters(X) + + prev_log_likelihood = None + + for i in range(self.max_iter): + # E-step + responsibilities = self._e_step(X) + + # M-step + self._m_step(X, responsibilities) + + # Log-likelihood + log_likelihood = self._compute_log_likelihood(X) + self.log_likelihoods_.append(log_likelihood) + + if prev_log_likelihood is not None: + if abs(log_likelihood - prev_log_likelihood) < self.tol: + print(f"{TAG}Converged at iteration {i}.") + break + prev_log_likelihood = log_likelihood + + print(f"{TAG}Training complete. Final log-likelihood: {log_likelihood:.4f}") + + def predict(self, X): + """Predict cluster assignment for each data point""" + responsibilities = self._e_step(X) + return np.argmax(responsibilities, axis=1) + + def plot_results(self, X): + """Visualize GMM clustering results (2D only)""" + if X.shape[1] != 2: + print(f"{TAG}Plotting only supported for 2D data.") + return + + labels = self.predict(X) + plt.scatter(X[:, 0], X[:, 1], c=labels, cmap="viridis", s=30) + plt.scatter(self.means_[:, 0], self.means_[:, 1], c="red", s=100, marker="x") + plt.title("Gaussian Mixture Model Clustering") + plt.xlabel("Feature 1") + plt.ylabel("Feature 2") + plt.show() + + +# Mock test +if __name__ == "__main__": + from sklearn.datasets import make_blobs + + X, _ = make_blobs(n_samples=300, centers=3, cluster_std=1.2, random_state=42) + gmm = GaussianMixture(n_components=3, max_iter=100, seed=42) + gmm.fit(X) + labels = gmm.predict(X) + gmm.plot_results(X) \ No newline at end of file From 872e5e3e6e0440d133d367c8b6060f089bdaf31c Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 20 Oct 2025 21:24:18 +0000 Subject: [PATCH 2/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- machine_learning/gaussian_mixture_model.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/machine_learning/gaussian_mixture_model.py b/machine_learning/gaussian_mixture_model.py index 5ecd04462b37..086f4aafbc98 100644 --- a/machine_learning/gaussian_mixture_model.py +++ b/machine_learning/gaussian_mixture_model.py @@ -85,7 +85,9 @@ def _m_step(self, X, responsibilities): for k in range(self.n_components): diff = X - self.means_[k] - self.covariances_[k] = (responsibilities[:, k][:, np.newaxis] * diff).T @ diff + self.covariances_[k] = ( + responsibilities[:, k][:, np.newaxis] * diff + ).T @ diff self.covariances_[k] /= Nk[k] # Add small regularization term for numerical stability self.covariances_[k] += np.eye(n_features) * 1e-6 @@ -155,4 +157,4 @@ def plot_results(self, X): gmm = GaussianMixture(n_components=3, max_iter=100, seed=42) gmm.fit(X) labels = gmm.predict(X) - gmm.plot_results(X) \ No newline at end of file + gmm.plot_results(X) From 2d6ac15ab25b150847dfc1082830d38a4efa29f3 Mon Sep 17 00:00:00 2001 From: ruman23 Date: Mon, 20 Oct 2025 17:08:47 -0500 Subject: [PATCH 3/5] Add trailing newline and finalize Gaussian Mixture Model implementation --- machine_learning/gaussian_mixture_model.py | 268 +++++++++++++++------ 1 file changed, 201 insertions(+), 67 deletions(-) diff --git a/machine_learning/gaussian_mixture_model.py b/machine_learning/gaussian_mixture_model.py index 086f4aafbc98..e600eeff9766 100644 --- a/machine_learning/gaussian_mixture_model.py +++ b/machine_learning/gaussian_mixture_model.py @@ -6,30 +6,33 @@ Python: - 3.8+ Inputs: - - X : a 2D numpy array of features. + - data : a 2D numpy array of features. - n_components : number of Gaussian distributions (clusters) to fit. - max_iter : maximum number of EM iterations. - tol : convergence tolerance. Usage: - 1. define 'n_components' value and 'X' features array + 1. define 'n_components' value and 'data' features array 2. initialize model: gmm = GaussianMixture(n_components=3, max_iter=100) 3. fit model to data: - gmm.fit(X) + gmm.fit(data) 4. get cluster predictions: - labels = gmm.predict(X) + labels = gmm.predict(data) 5. visualize results: - gmm.plot_results(X) + gmm.plot_results(data) """ import warnings -import numpy as np + import matplotlib.pyplot as plt +import numpy as np +from numpy.typing import NDArray from scipy.stats import multivariate_normal warnings.filterwarnings("ignore") TAG = "GAUSSIAN-MIXTURE/ " +FloatArray = NDArray[np.float64] class GaussianMixture: @@ -37,111 +40,240 @@ class GaussianMixture: Gaussian Mixture Model implemented using the Expectation-Maximization algorithm. """ - def __init__(self, n_components=2, max_iter=100, tol=1e-4, seed=None): - self.n_components = n_components - self.max_iter = max_iter - self.tol = tol - self.seed = seed + def __init__( + self, + n_components: int = 2, + max_iter: int = 100, + tol: float = 1e-4, + seed: int | None = None, + ) -> None: + self.n_components: int = n_components + self.max_iter: int = max_iter + self.tol: float = tol + self.seed: int | None = seed # parameters - self.weights_ = None - self.means_ = None - self.covariances_ = None - self.log_likelihoods_ = [] - - def _initialize_parameters(self, X): - """Randomly initialize means, covariances, and mixture weights""" + self.weights_: FloatArray | None = None + self.means_: FloatArray | None = None + self.covariances_: NDArray[np.float64] | None = None + self.log_likelihoods_: list[float] = [] + + def _initialize_parameters(self, data: FloatArray) -> None: + """Randomly initialize means, covariances, and mixture weights. + + Examples + -------- + >>> sample = np.array( + ... [[0.0, 0.5], [1.0, 1.5], [2.0, 2.5], [3.0, 3.5]] + ... ) + >>> model = GaussianMixture(n_components=2, seed=0) + >>> model._initialize_parameters(sample) + >>> model.means_.shape + (2, 2) + >>> bool(np.isclose(model.weights_.sum(), 1.0)) + True + """ rng = np.random.default_rng(self.seed) - n_samples, n_features = X.shape + n_samples, _ = data.shape indices = rng.choice(n_samples, self.n_components, replace=False) - self.means_ = X[indices] + self.means_ = data[indices] + identity = np.eye(data.shape[1]) * 1e-6 self.covariances_ = np.array( - [np.cov(X, rowvar=False) for _ in range(self.n_components)] + [np.cov(data, rowvar=False) + identity for _ in range(self.n_components)] ) self.weights_ = np.ones(self.n_components) / self.n_components - def _e_step(self, X): - """Compute responsibilities (posterior probabilities)""" - n_samples = X.shape[0] + def _e_step(self, data: FloatArray) -> FloatArray: + """Compute responsibilities (posterior probabilities). + + Examples + -------- + >>> sample = np.array( + ... [[0.0, 0.5], [1.0, 1.5], [2.0, 2.5], [3.0, 3.5]] + ... ) + >>> model = GaussianMixture(n_components=2, seed=0) + >>> model._initialize_parameters(sample) + >>> resp = model._e_step(sample) + >>> resp.shape + (4, 2) + >>> bool(np.allclose(resp.sum(axis=1), 1.0)) + True + """ + if self.weights_ is None or self.means_ is None or self.covariances_ is None: + raise ValueError( + "Model parameters must be initialized before running the E-step." + ) + + n_samples = data.shape[0] responsibilities = np.zeros((n_samples, self.n_components)) + weights = self.weights_ + means = self.means_ + covariances = self.covariances_ for k in range(self.n_components): - rv = multivariate_normal(mean=self.means_[k], cov=self.covariances_[k]) - responsibilities[:, k] = self.weights_[k] * rv.pdf(X) + rv = multivariate_normal( + mean=means[k], cov=covariances[k], allow_singular=True + ) + responsibilities[:, k] = weights[k] * rv.pdf(data) # Normalize to get probabilities responsibilities /= responsibilities.sum(axis=1, keepdims=True) return responsibilities - def _m_step(self, X, responsibilities): - """Update weights, means, and covariances""" - n_samples, n_features = X.shape - Nk = responsibilities.sum(axis=0) - - self.weights_ = Nk / n_samples - self.means_ = (responsibilities.T @ X) / Nk[:, np.newaxis] + def _m_step(self, data: FloatArray, responsibilities: FloatArray) -> None: + """Update weights, means, and covariances. + + Note: assumes the model parameters are already initialized. + + Examples + -------- + >>> sample = np.array( + ... [[0.0, 0.5], [1.0, 1.5], [2.0, 2.5], [3.0, 3.5]] + ... ) + >>> model = GaussianMixture(n_components=2, seed=0) + >>> model._initialize_parameters(sample) + >>> resp = model._e_step(sample) + >>> model._m_step(sample, resp) + >>> bool(np.isclose(model.weights_.sum(), 1.0)) + True + """ + n_samples, n_features = data.shape + component_counts = responsibilities.sum(axis=0) + + self.weights_ = component_counts / n_samples + self.means_ = (responsibilities.T @ data) / component_counts[:, np.newaxis] + + if self.covariances_ is None or self.means_ is None: + raise ValueError( + "Model parameters must be initialized before running the M-step." + ) + + covariances = self.covariances_ + means = self.means_ for k in range(self.n_components): - diff = X - self.means_[k] - self.covariances_[k] = ( - responsibilities[:, k][:, np.newaxis] * diff - ).T @ diff - self.covariances_[k] /= Nk[k] + diff = data - means[k] + covariances[k] = (responsibilities[:, k][:, np.newaxis] * diff).T @ diff + covariances[k] /= component_counts[k] # Add small regularization term for numerical stability - self.covariances_[k] += np.eye(n_features) * 1e-6 - - def _compute_log_likelihood(self, X): - """Compute total log-likelihood of the model""" - n_samples = X.shape[0] + covariances[k] += np.eye(n_features) * 1e-6 + + def _compute_log_likelihood(self, data: FloatArray) -> float: + """Compute total log-likelihood of the model. + + Note: assumes the model parameters are already initialized. + + Examples + -------- + >>> sample = np.array( + ... [[0.0, 0.5], [1.0, 1.5], [2.0, 2.5], [3.0, 3.5]] + ... ) + >>> model = GaussianMixture(n_components=2, seed=0) + >>> model._initialize_parameters(sample) + >>> bool(np.isfinite(model._compute_log_likelihood(sample))) + True + """ + if self.weights_ is None or self.means_ is None or self.covariances_ is None: + raise ValueError( + "Model parameters must be initialized before computing likelihood." + ) + + n_samples = data.shape[0] total_pdf = np.zeros((n_samples, self.n_components)) + weights = self.weights_ + means = self.means_ + covariances = self.covariances_ for k in range(self.n_components): - rv = multivariate_normal(mean=self.means_[k], cov=self.covariances_[k]) - total_pdf[:, k] = self.weights_[k] * rv.pdf(X) + rv = multivariate_normal( + mean=means[k], cov=covariances[k], allow_singular=True + ) + total_pdf[:, k] = weights[k] * rv.pdf(data) log_likelihood = np.sum(np.log(np.sum(total_pdf, axis=1) + 1e-12)) return log_likelihood - def fit(self, X): - """Fit the Gaussian Mixture Model to data using the EM algorithm""" - self._initialize_parameters(X) + def fit(self, data: FloatArray) -> None: + """Fit the Gaussian Mixture Model to data using the EM algorithm. + + Examples + -------- + >>> sample = np.array( + ... [[0.0, 0.5], [1.0, 1.5], [2.0, 2.5], [3.0, 3.5]] + ... ) + >>> model = GaussianMixture(n_components=2, max_iter=5, tol=1e-3, seed=0) + >>> model.fit(sample) # doctest: +ELLIPSIS + GAUSSIAN-MIXTURE/ ... + >>> len(model.log_likelihoods_) > 0 + True + """ + self._initialize_parameters(data) prev_log_likelihood = None for i in range(self.max_iter): # E-step - responsibilities = self._e_step(X) + responsibilities = self._e_step(data) # M-step - self._m_step(X, responsibilities) + self._m_step(data, responsibilities) # Log-likelihood - log_likelihood = self._compute_log_likelihood(X) + log_likelihood = self._compute_log_likelihood(data) self.log_likelihoods_.append(log_likelihood) - if prev_log_likelihood is not None: - if abs(log_likelihood - prev_log_likelihood) < self.tol: - print(f"{TAG}Converged at iteration {i}.") - break + if ( + prev_log_likelihood is not None + and abs(log_likelihood - prev_log_likelihood) < self.tol + ): + print(f"{TAG}Converged at iteration {i}.") + break prev_log_likelihood = log_likelihood print(f"{TAG}Training complete. Final log-likelihood: {log_likelihood:.4f}") - def predict(self, X): - """Predict cluster assignment for each data point""" - responsibilities = self._e_step(X) + def predict(self, data: FloatArray) -> NDArray[np.int_]: + """Predict cluster assignment for each data point. + + Note: assumes the model parameters are already initialized. + + Examples + -------- + >>> sample = np.array( + ... [[0.0, 0.5], [1.0, 1.5], [2.0, 2.5], [3.0, 3.5]] + ... ) + >>> model = GaussianMixture(n_components=2, max_iter=5, tol=1e-3, seed=0) + >>> model.fit(sample) # doctest: +ELLIPSIS + GAUSSIAN-MIXTURE/ ... + >>> labels = model.predict(sample) + >>> labels.shape + (4,) + """ + responsibilities = self._e_step(data) return np.argmax(responsibilities, axis=1) - def plot_results(self, X): - """Visualize GMM clustering results (2D only)""" - if X.shape[1] != 2: + def plot_results(self, data: FloatArray) -> None: + """Visualize GMM clustering results (2D only). + + Note: This method assumes self.means_ is initialized. + + Examples + -------- + >>> sample = np.ones((3, 3)) + >>> model = GaussianMixture() + >>> model.plot_results(sample) + GAUSSIAN-MIXTURE/ Plotting only supported for 2D data. + """ + if data.shape[1] != 2: print(f"{TAG}Plotting only supported for 2D data.") return - labels = self.predict(X) - plt.scatter(X[:, 0], X[:, 1], c=labels, cmap="viridis", s=30) + labels = self.predict(data) + if self.means_ is None: + raise ValueError("Model means must be initialized before plotting.") + plt.scatter(data[:, 0], data[:, 1], c=labels, cmap="viridis", s=30) plt.scatter(self.means_[:, 0], self.means_[:, 1], c="red", s=100, marker="x") plt.title("Gaussian Mixture Model Clustering") plt.xlabel("Feature 1") @@ -153,8 +285,10 @@ def plot_results(self, X): if __name__ == "__main__": from sklearn.datasets import make_blobs - X, _ = make_blobs(n_samples=300, centers=3, cluster_std=1.2, random_state=42) + sample_data, _ = make_blobs( + n_samples=300, centers=3, cluster_std=1.2, random_state=42 + ) gmm = GaussianMixture(n_components=3, max_iter=100, seed=42) - gmm.fit(X) - labels = gmm.predict(X) - gmm.plot_results(X) + gmm.fit(sample_data) + labels = gmm.predict(sample_data) + gmm.plot_results(sample_data) From 4f78e3ef4c1229d8b716c3514d6b9964084986c4 Mon Sep 17 00:00:00 2001 From: ruman23 Date: Mon, 20 Oct 2025 17:21:04 -0500 Subject: [PATCH 4/5] Fix Mypy type alias issue for FloatArray --- machine_learning/gaussian_mixture_model.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/machine_learning/gaussian_mixture_model.py b/machine_learning/gaussian_mixture_model.py index e600eeff9766..d52df5dcf013 100644 --- a/machine_learning/gaussian_mixture_model.py +++ b/machine_learning/gaussian_mixture_model.py @@ -23,6 +23,7 @@ """ import warnings +from typing import TypeAlias import matplotlib.pyplot as plt import numpy as np @@ -32,7 +33,7 @@ warnings.filterwarnings("ignore") TAG = "GAUSSIAN-MIXTURE/ " -FloatArray = NDArray[np.float64] +FloatArray: TypeAlias = NDArray[np.float64] class GaussianMixture: From 0f205dc62b7adad178d49c9de00990f7e91c93e3 Mon Sep 17 00:00:00 2001 From: ruman23 Date: Mon, 20 Oct 2025 17:23:44 -0500 Subject: [PATCH 5/5] Use modern 'type' syntax for FloatArray to satisfy Ruff UP040 --- machine_learning/gaussian_mixture_model.py | 24 ++++++++++++---------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/machine_learning/gaussian_mixture_model.py b/machine_learning/gaussian_mixture_model.py index d52df5dcf013..16c814efaa0f 100644 --- a/machine_learning/gaussian_mixture_model.py +++ b/machine_learning/gaussian_mixture_model.py @@ -23,7 +23,6 @@ """ import warnings -from typing import TypeAlias import matplotlib.pyplot as plt import numpy as np @@ -33,7 +32,6 @@ warnings.filterwarnings("ignore") TAG = "GAUSSIAN-MIXTURE/ " -FloatArray: TypeAlias = NDArray[np.float64] class GaussianMixture: @@ -54,12 +52,12 @@ def __init__( self.seed: int | None = seed # parameters - self.weights_: FloatArray | None = None - self.means_: FloatArray | None = None + self.weights_: NDArray[np.float64] | None = None + self.means_: NDArray[np.float64] | None = None self.covariances_: NDArray[np.float64] | None = None self.log_likelihoods_: list[float] = [] - def _initialize_parameters(self, data: FloatArray) -> None: + def _initialize_parameters(self, data: NDArray[np.float64]) -> None: """Randomly initialize means, covariances, and mixture weights. Examples @@ -86,7 +84,7 @@ def _initialize_parameters(self, data: FloatArray) -> None: ) self.weights_ = np.ones(self.n_components) / self.n_components - def _e_step(self, data: FloatArray) -> FloatArray: + def _e_step(self, data: NDArray[np.float64]) -> NDArray[np.float64]: """Compute responsibilities (posterior probabilities). Examples @@ -123,7 +121,11 @@ def _e_step(self, data: FloatArray) -> FloatArray: responsibilities /= responsibilities.sum(axis=1, keepdims=True) return responsibilities - def _m_step(self, data: FloatArray, responsibilities: FloatArray) -> None: + def _m_step( + self, + data: NDArray[np.float64], + responsibilities: NDArray[np.float64], + ) -> None: """Update weights, means, and covariances. Note: assumes the model parameters are already initialized. @@ -161,7 +163,7 @@ def _m_step(self, data: FloatArray, responsibilities: FloatArray) -> None: # Add small regularization term for numerical stability covariances[k] += np.eye(n_features) * 1e-6 - def _compute_log_likelihood(self, data: FloatArray) -> float: + def _compute_log_likelihood(self, data: NDArray[np.float64]) -> float: """Compute total log-likelihood of the model. Note: assumes the model parameters are already initialized. @@ -196,7 +198,7 @@ def _compute_log_likelihood(self, data: FloatArray) -> float: log_likelihood = np.sum(np.log(np.sum(total_pdf, axis=1) + 1e-12)) return log_likelihood - def fit(self, data: FloatArray) -> None: + def fit(self, data: NDArray[np.float64]) -> None: """Fit the Gaussian Mixture Model to data using the EM algorithm. Examples @@ -235,7 +237,7 @@ def fit(self, data: FloatArray) -> None: print(f"{TAG}Training complete. Final log-likelihood: {log_likelihood:.4f}") - def predict(self, data: FloatArray) -> NDArray[np.int_]: + def predict(self, data: NDArray[np.float64]) -> NDArray[np.int_]: """Predict cluster assignment for each data point. Note: assumes the model parameters are already initialized. @@ -255,7 +257,7 @@ def predict(self, data: FloatArray) -> NDArray[np.int_]: responsibilities = self._e_step(data) return np.argmax(responsibilities, axis=1) - def plot_results(self, data: FloatArray) -> None: + def plot_results(self, data: NDArray[np.float64]) -> None: """Visualize GMM clustering results (2D only). Note: This method assumes self.means_ is initialized.