summaryrefslogtreecommitdiff
path: root/src/python/gudhi/point_cloud/timedelay.py
blob: 5292e7524bdbbd07d833a9b5ee639302f4c1b4d9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# This file is part of the Gudhi Library - https://gudhi.inria.fr/ - which is released under MIT.
# See file LICENSE or go to https://gudhi.inria.fr/licensing/ for full license details.
# Author(s):       Martin Royer, Yuichi Ike, Masatoshi Takenouchi
#
# Copyright (C) 2020 Inria, Copyright (C) 2020 Fujitsu Laboratories Ltd.
# Modification(s):
#   - YYYY/MM Author: Description of the modification

import numpy as np


class TimeDelayEmbedding:
    """Point cloud transformation class. Embeds time-series data in the R^d according to
    `Takens' Embedding Theorem <https://en.wikipedia.org/wiki/Takens%27s_theorem>`_ and obtains the
    coordinates of each point.

    Parameters
    ----------
    dim : int, optional (default=3)
        `d` of R^d to be embedded. For a vector-valued time-series, `dim` must be a
        multiple of the number of coordinates of each sample.
    delay : int, optional (default=1)
        Time-Delay embedding: number of time steps between two consecutive samples
        that are concatenated into the same embedded point.
    skip : int, optional (default=1)
        How often to skip embedded points: number of time steps between the first
        samples of two consecutive embedded points.

    Example
    -------

    Given delay=3 and skip=2, a point cloud which is obtained by embedding
    a scalar time-series into R^3 is as follows::

        time-series = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        point cloud = [[1, 4, 7],
                       [3, 6, 9]]

    Given delay=1 and skip=1, a point cloud which is obtained by embedding
    a 2D vector time-series data into R^4 is as follows::

        time-series = [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
        point cloud = [[0, 1, 2, 3],
                       [2, 3, 4, 5],
                       [4, 5, 6, 7],
                       [6, 7, 8, 9]]
    """

    def __init__(self, dim=3, delay=1, skip=1):
        self._dim = dim
        self._delay = delay
        self._skip = skip

    def __call__(self, ts):
        """Transform method for single time-series data.

        Parameters
        ----------
        ts : Iterable[float] or Iterable[Iterable[float]]
            A single time-series data, with scalar or vector values.

        Returns
        -------
        point cloud : n x dim numpy arrays
            Makes point cloud from a single time-series data. `n` is 0 when the
            time-series is too short to contain a single embedded point.

        Raises
        ------
        ValueError
            If the time-series is vector-valued and `dim` is not a multiple of the
            number of coordinates of each sample.
        """
        # asarray avoids a needless copy; the fancy indexing in _transform
        # always produces a new array, so the input is never aliased.
        return self._transform(np.asarray(ts))

    def fit(self, ts, y=None):
        """Do nothing and return the instance unchanged.

        Both arguments are ignored; the embedding has nothing to learn from data.
        """
        return self

    def _transform(self, ts):
        """Guts of transform method.

        Parameters
        ----------
        ts : numpy array
            A single time-series, indexed by time along its first axis.

        Returns
        -------
        numpy array with one row per embedded point. Each row is the concatenation
        of `repeat` samples taken `delay` time steps apart.
        """
        if ts.ndim == 1:
            repeat = self._dim
        else:
            n_coords = ts.shape[1]
            # Explicit exception rather than `assert`: assertions are removed
            # under `python -O`, which would silently yield a wrong dimension.
            if self._dim % n_coords != 0:
                raise ValueError(
                    "dim ({}) must be a multiple of the number of coordinates "
                    "of each sample ({})".format(self._dim, n_coords)
                )
            repeat = self._dim // n_coords
        # Number of values in one output row. Computed explicitly because
        # reshape(..., -1) cannot infer a dimension when there are no rows.
        width = repeat * int(np.prod(ts.shape[1:], dtype=int))
        # Exclusive upper bound on the start index of a window that still fits.
        end = len(ts) - self._delay * (repeat - 1)
        starts = np.arange(0, end, self._skip)
        offsets = np.arange(0, repeat * self._delay, self._delay)
        # starts[i] + offsets[j] is the time index of the j-th sample of point i.
        return ts[np.add.outer(starts, offsets)].reshape(len(starts), width)

    def transform(self, ts):
        """Transform method for multiple time-series data.

        Parameters
        ----------
        ts : Iterable[Iterable[float]] or Iterable[Iterable[Iterable[float]]]
            Multiple time-series data, with scalar or vector values.

        Returns
        -------
        point clouds : list of n x dim numpy arrays
            Makes point cloud from each time-series data. A time-series that is
            too short to contain a single embedded point gives an empty
            (0 x dim) array.

        Raises
        ------
        ValueError
            If a time-series is vector-valued and `dim` is not a multiple of the
            number of coordinates of each sample.
        """
        return [self._transform(np.asarray(s)) for s in ts]