summaryrefslogtreecommitdiff
path: root/test/test_unbalanced.py
blob: b39e4578f0ab1b09bfae2bc6ddecd53325d9dc4a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
"""Tests for module Unbalanced OT with entropy regularization"""

# Author: Hicham Janati <hicham.janati@inria.fr>
#
# License: MIT License

import numpy as np
import ot
import pytest


@pytest.mark.parametrize("method", ["sinkhorn"])
def test_unbalanced_convergence(method):
    # test generalized sinkhorn for unbalanced OT
    n = 100
    rng = np.random.RandomState(42)

    x = rng.randn(n, 2)
    a = ot.utils.unif(n)

    # make dists unbalanced
    b = ot.utils.unif(n) * 1.5

    M = ot.dist(x, x)
    epsilon = 1.
    alpha = 1.
    K = np.exp(- M / epsilon)

    G, log = ot.unbalanced.sinkhorn_unbalanced(a, b, M, reg=epsilon, alpha=alpha,
                                               stopThr=1e-10, method=method,
                                               log=True)

    # check fixed point equations
    fi = alpha / (alpha + epsilon)
    v_final = (b / K.T.dot(log["u"])) ** fi
    u_final = (a / K.dot(log["v"])) ** fi

    np.testing.assert_allclose(
        u_final, log["u"], atol=1e-05)
    np.testing.assert_allclose(
        v_final, log["v"], atol=1e-05)


def test_unbalanced_barycenter():
    # test generalized sinkhorn for unbalanced OT barycenter
    n = 100
    rng = np.random.RandomState(42)

    x = rng.randn(n, 2)
    A = rng.rand(n, 2)

    # make dists unbalanced
    A = A * np.array([1, 2])[None, :]
    M = ot.dist(x, x)
    epsilon = 1.
    alpha = 1.
    K = np.exp(- M / epsilon)

    q, log = ot.unbalanced.barycenter_unbalanced(A, M, reg=epsilon, alpha=alpha,
                                                 stopThr=1e-10,
                                                 log=True)

    # check fixed point equations
    fi = alpha / (alpha + epsilon)
    v_final = (q[:, None] / K.T.dot(log["u"])) ** fi
    u_final = (A / K.dot(log["v"])) ** fi

    np.testing.assert_allclose(
        u_final, log["u"], atol=1e-05)
    np.testing.assert_allclose(
        v_final, log["v"], atol=1e-05)