Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 98 additions & 9 deletions machine_learning/k_nearest_neighbours.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
of the given point. In effect, the label of the given point is decided by a
majority vote.

This implementation uses the commonly used Euclidean distance metric, but other
distance metrics can also be used.
This implementation uses the Euclidean distance metric by default, and also
supports Manhattan (L1) and Minkowski (Lp) distances.

Reference: https://en.wikipedia.org/wiki/K-nearest_neighbors_algorithm
"""
Expand All @@ -16,8 +16,6 @@
from heapq import nsmallest

import numpy as np
from sklearn import datasets
from sklearn.model_selection import train_test_split


class KNN:
Expand All @@ -26,12 +24,42 @@ def __init__(
train_data: np.ndarray[float],
train_target: np.ndarray[int],
class_labels: list[str],
*,
distance_metric: str = "euclidean",
p: float = 2.0,
) -> None:
"""
Create a kNN classifier using the given training data and class labels

Parameters
----------
train_data : np.ndarray[float]
Training features.
train_target : np.ndarray[int]
Training labels as integer indices.
class_labels : list[str]
Mapping from label index to label name.
distance_metric : {"euclidean", "manhattan", "minkowski"}
Distance to use for neighbour search. Defaults to "euclidean".
p : float
Power parameter for Minkowski distance (Lp norm). Must be >= 1 when
distance_metric is "minkowski". Defaults to 2.0.
"""
self.data = zip(train_data, train_target)
# Store a reusable copy; zip() returns an iterator that would be
# exhausted after one classification otherwise.
self.data = list(zip(train_data, train_target))
self.labels = class_labels
self.distance_metric = distance_metric.lower()
self.p = float(p)

if self.distance_metric not in {"euclidean", "manhattan", "minkowski"}:
msg = (
"distance_metric must be one of {'euclidean', 'manhattan', 'minkowski'}"
)
raise ValueError(msg)
if self.distance_metric == "minkowski" and self.p < 1:
msg = "For Minkowski distance, p must be >= 1"
raise ValueError(msg)

@staticmethod
def _euclidean_distance(a: np.ndarray[float], b: np.ndarray[float]) -> float:
Expand All @@ -44,6 +72,30 @@ def _euclidean_distance(a: np.ndarray[float], b: np.ndarray[float]) -> float:
"""
return float(np.linalg.norm(a - b))

@staticmethod
def _manhattan_distance(a: np.ndarray[float], b: np.ndarray[float]) -> float:
"""
Calculate the Manhattan (L1) distance between two points
>>> KNN._manhattan_distance(np.array([0, 0]), np.array([3, 4]))
7.0
>>> KNN._manhattan_distance(np.array([1, 2, 3]), np.array([1, 8, 11]))
14.0
"""
return float(np.linalg.norm(a - b, ord=1))

@staticmethod
def _minkowski_distance(
a: np.ndarray[float], b: np.ndarray[float], p: float
) -> float:
"""
Calculate the Minkowski (Lp) distance between two points
>>> KNN._minkowski_distance(np.array([0, 0]), np.array([3, 4]), 2)
5.0
>>> KNN._minkowski_distance(np.array([0, 0]), np.array([3, 4]), 1)
7.0
"""
return float(np.linalg.norm(a - b, ord=p))

def classify(self, pred_point: np.ndarray[float], k: int = 5) -> str:
"""
Classify a given point using the kNN algorithm
Expand All @@ -56,12 +108,44 @@ def classify(self, pred_point: np.ndarray[float], k: int = 5) -> str:
>>> point = np.array([1.2, 1.2])
>>> knn.classify(point)
'A'
>>> # Manhattan distance yields the same class here
>>> knn_l1 = KNN(train_X, train_y, classes, distance_metric='manhattan')
>>> knn_l1.classify(point)
'A'
>>> # Minkowski with p=2 equals Euclidean
>>> knn_lp = KNN(train_X, train_y, classes, distance_metric='minkowski', p=2)
>>> knn_lp.classify(point)
'A'
>>> # Invalid distance metric
>>> try:
... _ = KNN(train_X, train_y, classes, distance_metric='chebyshev')
... except ValueError as e:
... 'distance_metric' in str(e)
True
>>> # Invalid Minkowski power
>>> try:
... _ = KNN(train_X, train_y, classes, distance_metric='minkowski', p=0.5)
... except ValueError as e:
... 'p must be >=' in str(e)
True
"""
# Choose the distance function once
if self.distance_metric == "euclidean":

def dist_fn(a: np.ndarray[float]) -> float:
return self._euclidean_distance(a, pred_point)
elif self.distance_metric == "manhattan":

def dist_fn(a: np.ndarray[float]) -> float:
return self._manhattan_distance(a, pred_point)
else: # minkowski
p = self.p

def dist_fn(a: np.ndarray[float]) -> float:
return self._minkowski_distance(a, pred_point, p)

# Distances of all points from the point to be classified
distances = (
(self._euclidean_distance(data_point[0], pred_point), data_point[1])
for data_point in self.data
)
distances = ((dist_fn(dp), lbl) for dp, lbl in self.data)

# Choosing k points with the shortest distances
votes = (i[1] for i in nsmallest(k, distances))
Expand All @@ -76,6 +160,11 @@ def classify(self, pred_point: np.ndarray[float], k: int = 5) -> str:

doctest.testmod()

# Optional demo using scikit-learn's iris dataset. Kept under __main__ to
# avoid making scikit-learn a hard dependency for importing this module.
from sklearn import datasets
from sklearn.model_selection import train_test_split

iris = datasets.load_iris()

X = np.array(iris["data"])
Expand Down