author     martinroyer <martinpierreroyer@gmail.com>  2020-06-08 15:56:34 +0200
committer  martinroyer <martinpierreroyer@gmail.com>  2020-06-08 15:56:34 +0200
commit     9c45abcdf165519c58d59556dea74fd9f27c8396 (patch)
tree       344432ac89c2b3b04edddf2d8d6695c59938c985 /src/python/gudhi
parent     434cc66fbf832ca492e5f0710c8afc67d681637a (diff)
ATOL introduction as finite vectorisation method
Diffstat (limited to 'src/python/gudhi')
-rw-r--r--  src/python/gudhi/representations/vector_methods.py  128
1 file changed, 125 insertions, 3 deletions
diff --git a/src/python/gudhi/representations/vector_methods.py b/src/python/gudhi/representations/vector_methods.py
index 46fee086..df66ffc3 100644
--- a/src/python/gudhi/representations/vector_methods.py
+++ b/src/python/gudhi/representations/vector_methods.py
@@ -1,16 +1,17 @@
# This file is part of the Gudhi Library - https://gudhi.inria.fr/ - which is released under MIT.
# See file LICENSE or go to https://gudhi.inria.fr/licensing/ for full license details.
-# Author(s): Mathieu Carrière
+# Author(s): Mathieu Carrière, Martin Royer
#
-# Copyright (C) 2018-2019 Inria
+# Copyright (C) 2018-2020 Inria
#
# Modification(s):
-# - YYYY/MM Author: Description of the modification
+# - 2020/06 Martin: ATOL integration
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import MinMaxScaler, MaxAbsScaler
from sklearn.neighbors import DistanceMetric
+from sklearn.metrics import pairwise
from .preprocessing import DiagramScaler, BirthPersistenceTransform
@@ -574,3 +575,124 @@ class ComplexPolynomial(BaseEstimator, TransformerMixin):
            numpy array with shape (**threshold**): output complex vector of coefficients.
        """
        return self.fit_transform([diag])[0,:]
+
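+# Note: each contrast function below maps a measure (an n x d array) and k centers to an (n x k) array of proximities.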
+def _lapl_contrast(measure, centers, inertias, eps=1e-8):
+ """contrast function for vectorising `measure` in ATOL"""
+ return np.exp(-np.sqrt(pairwise.pairwise_distances(measure, Y=centers) / (inertias + eps)))
+
+def _gaus_contrast(measure, centers, inertias, eps=1e-8):
+ """contrast function for vectorising `measure` in ATOL"""
+ return np.exp(-pairwise.pairwise_distances(measure, Y=centers) / (inertias + eps))
+
+def _indicator_contrast(diags, centers, inertias, eps=1e-8):
+ """contrast function for vectorising `measure` in ATOL"""
+    pair_dist = pairwise.pairwise_distances(diags, Y=centers)
+    flat_circ = (pair_dist < (inertias+eps)).astype(int)
+    robe_curve = np.positive((2-pair_dist/(inertias+eps))*((inertias+eps) < pair_dist).astype(int))
+    return flat_circ + robe_curve
+
+def _cloud_weighting(measure):
+ """automatic uniform weighting with mass 1 for `measure` in ATOL"""
+ return np.ones(shape=measure.shape[0])
+
+def _iidproba_weighting(measure):
+ """automatic uniform weighting with mass 1/N for `measure` in ATOL"""
+ return np.ones(shape=measure.shape[0]) / measure.shape[0]
+
+class Atol(BaseEstimator, TransformerMixin):
+ """
+ This class allows to vectorise measures (e.g. point clouds, persistence diagrams, etc) after a quantisation step.
+
+ ATOL paper: https://arxiv.org/abs/1909.13472
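+
+    A minimal usage sketch (scikit-learn's KMeans is used here only as an illustrative quantiser; any object with a
+    compatible `fit`, `cluster_centers_` and `n_clusters` would do):
+
+    >>> import numpy as np
+    >>> from sklearn.cluster import KMeans
+    >>> a = np.array([[1., 2.], [3., 4.], [5., 6.]])
+    >>> b = np.array([[2., 1.], [4., 3.]])
+    >>> atol_vectoriser = Atol(quantiser=KMeans(n_clusters=2, random_state=202006))
+    >>> vectors = atol_vectoriser.fit(X=[a, b]).transform(X=[a, b])
+    >>> vectors.shape
+    (2, 2)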
+ """
+    def __init__(self, quantiser, weighting_method="cloud", contrast="gaus"):
+        """
+        Constructor for the Atol measure vectorisation class.
+
+        Parameters:
+            quantiser (Object): object with a `fit` method (sklearn API consistent) and `cluster_centers_` and
+                `n_clusters` attributes, e.g. sklearn's MiniBatchKMeans. This object will be fitted by the
+                function `fit`.
+            weighting_method (string): method for automatically weighting the measure points,
+                to choose from {"cloud", "iidproba"}
+                (default: "cloud", i.e. the measure is seen as a point cloud).
+                This will have no impact if weights are provided along with the measures in both `fit` and `transform`.
+            contrast (string): contrast function for evaluating the proximity of a measure with respect to centers,
+                to choose from {"gaus", "lapl", "indi"}
+                (default: "gaus", the Gaussian contrast function, see page 3 in the ATOL paper).
+        """
+        self.quantiser = quantiser
+        self.contrast = {
+            "gaus": _gaus_contrast,
+            "lapl": _lapl_contrast,
+            "indi": _indicator_contrast,
+        }.get(contrast, _gaus_contrast)
+        self.centers = np.ones(shape=(self.quantiser.n_clusters, 2))*np.inf
+        self.inertias = np.full(self.quantiser.n_clusters, np.nan)
+        self.weighting_method = {
+            "cloud"   : _cloud_weighting,
+            "iidproba": _iidproba_weighting,
+        }.get(weighting_method, _cloud_weighting)
+
+    def fit(self, X, y=None, sample_weight=None):
+        """
+        Calibration step: fit centers to the sample measures and derive inertias between centers.
+
+        Parameters:
+            X (list of N x d numpy arrays): input measures in R^d from which to learn center locations and inertias
+                (measures can have different N).
+            y: Ignored, present for API consistency by convention.
+            sample_weight (list of numpy arrays): weights for each measure point in X, optional.
+                If None, the object's weighting_method will be used.
+
+        Returns:
+            self
+        """
+        if not hasattr(self.quantiser, 'fit'):
+            raise TypeError("quantiser %s has no `fit` attribute." % (self.quantiser))
+        if len(X) < self.quantiser.n_clusters:
+            # in case there are not enough observations for fitting the quantiser, we add random points in [0, 1]^d
+            # @Martin: perhaps this behaviour is to be externalised and a warning should be raised instead
+            random_points = np.random.rand(self.quantiser.n_clusters-len(X), X[0].shape[1])
+            X.append(random_points)
+        if sample_weight is None:
+            sample_weight = np.concatenate([self.weighting_method(measure) for measure in X])
+
+        measures_concat = np.concatenate(X)
+        self.quantiser.fit(X=measures_concat, sample_weight=sample_weight)
+        self.centers = self.quantiser.cluster_centers_
+        labels = np.argmin(pairwise.pairwise_distances(measures_concat, Y=self.centers), axis=1)
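+        # inertia heuristic: half the distance from each center to its nearest other center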
+        dist_centers = pairwise.pairwise_distances(self.centers)
+        np.fill_diagonal(dist_centers, np.inf)
+        self.inertias = np.min(dist_centers, axis=0)/2
+        return self
+
+    def __call__(self, measure, sample_weight=None):
+        """
+        Apply measure vectorisation on a single measure.
+
+        Parameters:
+            measure (n x d numpy array): input measure in R^d.
+            sample_weight (numpy array of length n): weights for each point of the measure, optional.
+                If None, the object's weighting_method will be used.
+
+        Returns:
+            numpy array in R^self.quantiser.n_clusters.
+        """
+        if sample_weight is None:
+            sample_weight = self.weighting_method(measure)
+        return np.sum(sample_weight * self.contrast(measure, self.centers, self.inertias.T).T, axis=1)
+
+    def transform(self, X, sample_weight=None):
+        """
+        Apply measure vectorisation on a list of measures.
+
+        Parameters:
+            X (list of N x d numpy arrays): input measures in R^d to vectorise
+                (measures can have different N).
+            sample_weight (list of numpy arrays): weights for each measure point in X, optional.
+                If None, the object's weighting_method will be used.
+
+        Returns:
+            numpy array with shape (number of measures) x (self.quantiser.n_clusters).
+        """
+        if sample_weight is None:
+            sample_weight = [self.weighting_method(measure) for measure in X]
+        return np.stack([self(measure, sample_weight=weight) for measure, weight in zip(X, sample_weight)])