# This file is part of the Gudhi Library - https://gudhi.inria.fr/ - which is released under MIT. # See file LICENSE or go to https://gudhi.inria.fr/licensing/ for full license details. # Author(s): Marc Glisse # # Copyright (C) 2020 Inria # # Modification(s): # - YYYY/MM Author: Description of the modification import numpy class KNearestNeighbors: """ Class wrapping several implementations for computing the k nearest neighbors in a point set. """ def __init__(self, k, return_index=True, return_distance=False, metric="euclidean", **kwargs): """ Args: k (int): number of neighbors (possibly including the point itself). return_index (bool): if True, return the index of each neighbor. return_distance (bool): if True, return the distance to each neighbor. implementation (str): choice of the library that does the real work. * 'keops' for a brute-force, CUDA implementation through pykeops. Useful when the dimension becomes large (10+) but the number of points remains low (less than a million). Only "minkowski" and its aliases are supported. * 'ckdtree' for scipy's cKDTree. Only "minkowski" and its aliases are supported. * 'sklearn' for scikit-learn's NearestNeighbors. Note that this provides in particular an option algorithm="brute". * 'hnsw' for hnswlib.Index. It can be very fast but does not provide guarantees. Only supports "euclidean" for now. * None will try to select a sensible one (scipy if possible, scikit-learn otherwise). metric (str): see `sklearn.neighbors.NearestNeighbors`. eps (float): relative error when computing nearest neighbors with the cKDTree. p (float): norm L^p on input points (including numpy.inf) if metric is "minkowski". Defaults to 2. n_jobs (int): number of jobs to schedule for parallel processing of nearest neighbors on the CPU. If -1 is given all processors are used. Default: 1. sort_results (bool): if True, then distances and indices of each point are sorted on return, so that the first column contains the closest points. Otherwise, neighbors are returned in an arbitrary order. Defaults to True. enable_autodiff (bool): if the input is a torch.tensor, jax.numpy.array or similar, this instructs the function to compute distances in a way that works with automatic differentiation. This is experimental and not supported for all implementations. kwargs: additional parameters are forwarded to the backends. """ self.k = k self.return_index = return_index self.return_distance = return_distance self.metric = metric self.params = kwargs # canonicalize if metric == "euclidean": self.params["p"] = 2 self.metric = "minkowski" elif metric == "manhattan": self.params["p"] = 1 self.metric = "minkowski" elif metric == "chebyshev": self.params["p"] = numpy.inf self.metric = "minkowski" elif metric == "minkowski": self.params["p"] = kwargs.get("p", 2) if self.params.get("implementation") in {"keops", "ckdtree"}: assert self.metric == "minkowski" if self.params.get("implementation") == "hnsw": assert self.metric == "minkowski" and self.params["p"] == 2 if not self.params.get("implementation"): if self.metric == "minkowski": self.params["implementation"] = "ckdtree" else: self.params["implementation"] = "sklearn" def fit_transform(self, X, y=None): return self.fit(X).transform(X) def fit(self, X, y=None): """ Args: X (numpy.array): coordinates for reference points. """ self.ref_points = X if self.params["implementation"] == "ckdtree": # sklearn could handle this, but it is much slower from scipy.spatial import cKDTree self.kdtree = cKDTree(X) if self.params["implementation"] == "sklearn" and self.metric != "precomputed": # FIXME: sklearn badly handles "precomputed" from sklearn.neighbors import NearestNeighbors nargs = { k: v for k, v in self.params.items() if k in {"p", "n_jobs", "metric_params", "algorithm", "leaf_size"} } self.nn = NearestNeighbors(self.k, metric=self.metric, **nargs) self.nn.fit(X) if self.params["implementation"] == "hnsw": import hnswlib self.graph = hnswlib.Index("l2", len(X[0])) # Actually returns squared distances self.graph.init_index( len(X), **{k: v for k, v in self.params.items() if k in {"ef_construction", "M", "random_seed"}} ) n = self.params.get("num_threads") if n is None: n = self.params.get("n_jobs", 1) self.params["num_threads"] = n self.graph.add_items(X, num_threads=n) return self def transform(self, X): """ Args: X (numpy.array): coordinates for query points, or distance matrix if metric is "precomputed". """ metric = self.metric k = self.k if metric == "precomputed": # scikit-learn could handle that, but they insist on calling fit() with an unused square array, which is too unnatural. if self.return_index: n_jobs = self.params.get("n_jobs", 1) # Supposedly numpy can be compiled with OpenMP and handle this, but nobody does that?! if n_jobs == 1: neighbors = numpy.argpartition(X, k - 1)[:, 0:k] if self.params.get("sort_results", True): X = numpy.take_along_axis(X, neighbors, axis=-1) ngb_order = numpy.argsort(X, axis=-1) neighbors = numpy.take_along_axis(neighbors, ngb_order, axis=-1) else: ngb_order = neighbors if self.return_distance: distances = numpy.take_along_axis(X, ngb_order, axis=-1) return neighbors, distances else: return neighbors else: from joblib import Parallel, delayed, effective_n_jobs from sklearn.utils import gen_even_slices slices = gen_even_slices(len(X), effective_n_jobs(-1)) parallel = Parallel(backend="threading", n_jobs=-1) if self.params.get("sort_results", True): def func(M): neighbors = numpy.argpartition(M, k - 1)[:, 0:k] Y = numpy.take_along_axis(M, neighbors, axis=-1) ngb_order = numpy.argsort(Y, axis=-1) return numpy.take_along_axis(neighbors, ngb_order, axis=-1) else: def func(M): return numpy.argpartition(M, k - 1)[:, 0:k] neighbors = numpy.concatenate(parallel(delayed(func)(X[s]) for s in slices)) if self.return_distance: distances = numpy.take_along_axis(X, neighbors, axis=-1) return neighbors, distances else: return neighbors if self.return_distance: n_jobs = self.params.get("n_jobs", 1) if n_jobs == 1: distances = numpy.partition(X, k - 1)[:, 0:k] if self.params.get("sort_results"): # partition is not guaranteed to sort the lower half, although it often does distances.sort(axis=-1) else: from joblib import Parallel, delayed, effective_n_jobs from sklearn.utils import gen_even_slices if self.params.get("sort_results"): def func(M): # Not partitioning in place, because we should not modify the user's array? r = numpy.partition(M, k - 1)[:, 0:k] r.sort(axis=-1) return r else: func = lambda M: numpy.partition(M, k - 1)[:, 0:k] slices = gen_even_slices(len(X), effective_n_jobs(-1)) parallel = Parallel(backend="threading", n_jobs=-1) distances = numpy.concatenate(parallel(delayed(func)(X[s]) for s in slices)) return distances return None if self.params["implementation"] == "hnsw": ef = self.params.get("ef") if ef is not None: self.graph.set_ef(ef) neighbors, distances = self.graph.knn_query(X, k, num_threads=self.params["num_threads"]) # The k nearest neighbors are always sorted. I couldn't find it in the doc, but the code calls searchKnn, # which returns a priority_queue, and then fills the return array backwards with top/pop on the queue. if self.return_index: if self.return_distance: return neighbors, numpy.sqrt(distances) else: return neighbors if self.return_distance: return numpy.sqrt(distances) return None if self.params["implementation"] == "keops": import torch from pykeops.torch import LazyTensor import eagerpy as ep # 'float64' is slow except on super expensive GPUs. Allow it with some param? queries = X X = ep.astensor(X) XX = torch.as_tensor(X.numpy(), dtype=torch.float32) if queries is self.ref_points: Y = X YY = XX else: Y = ep.astensor(self.ref_points) YY = torch.as_tensor(Y.numpy(), dtype=torch.float32) p = self.params["p"] if p == numpy.inf: # Requires pykeops 1.4 or later mat = (LazyTensor(XX[:, None, :]) - LazyTensor(YY[None, :, :])).abs().max(-1) elif p == 2: # Any even integer? mat = ((LazyTensor(XX[:, None, :]) - LazyTensor(YY[None, :, :])) ** p).sum(-1) else: mat = ((LazyTensor(XX[:, None, :]) - LazyTensor(YY[None, :, :])).abs() ** p).sum(-1) # pykeops does not support autodiff for kmin yet :-( if self.params.get("enable_autodiff", False) and self.return_distance: # Compute the indices of the neighbors, and recompute the relevant distances autodiff-friendly. # Another strategy would be to compute the whole distance matrix with torch.cdist # and use neighbors as indices into it. neighbors = ep.astensor(mat.argKmin(k, dim=1)).numpy() neighbor_pts = Y[neighbors] diff = neighbor_pts - X[:, None, :] if p == numpy.inf: distances = diff.abs().max(-1) elif p == 2: distances = (diff ** 2).sum(-1) ** 0.5 else: distances = (diff.abs() ** p).sum(-1) ** (1.0 / p) if self.return_index: return neighbors.raw, distances.raw else: return distances.raw if self.return_index: if self.return_distance: distances, neighbors = mat.Kmin_argKmin(k, dim=1) if p != numpy.inf: distances = distances ** (1.0 / p) return neighbors, distances else: neighbors = mat.argKmin(k, dim=1) return neighbors if self.return_distance: distances = mat.Kmin(k, dim=1) if p != numpy.inf: distances = distances ** (1.0 / p) return distances return None if self.params["implementation"] == "ckdtree": qargs = {key: val for key, val in self.params.items() if key in {"p", "eps", "n_jobs"}} distances, neighbors = self.kdtree.query(X, k=self.k, **qargs) if self.return_index: if self.return_distance: return neighbors, distances else: return neighbors if self.return_distance: return distances return None assert self.params["implementation"] == "sklearn" if self.return_distance: distances, neighbors = self.nn.kneighbors(X, return_distance=True) if self.return_index: return neighbors, distances else: return distances if self.return_index: neighbors = self.nn.kneighbors(X, return_distance=False) return neighbors return None