# -*- coding: utf-8 -*- """ Cython linker with C solver """ # Author: Remi Flamary # # License: MIT License import numpy as np cimport numpy as np cimport cython cdef extern from "EMD.h": int EMD_wrap(int n1,int n2, double *X, double *Y,double *D, double *G, double* alpha, double* beta, double *cost, int max_iter) cdef enum ProblemType: INFEASIBLE, OPTIMAL, UNBOUNDED @cython.boundscheck(False) @cython.wraparound(False) def emd_c( np.ndarray[double, ndim=1, mode="c"] a,np.ndarray[double, ndim=1, mode="c"] b,np.ndarray[double, ndim=2, mode="c"] M, int max_iter): """ Solves the Earth Movers distance problem and returns the optimal transport matrix gamm=emd(a,b,M) .. math:: \gamma = arg\min_\gamma <\gamma,M>_F s.t. \gamma 1 = a \gamma^T 1= b \gamma\geq 0 where : - M is the metric cost matrix - a and b are the sample weights Parameters ---------- a : (ns,) ndarray, float64 source histogram b : (nt,) ndarray, float64 target histogram M : (ns,nt) ndarray, float64 loss matrix max_iter : int The maximum number of iterations before stopping the optimization algorithm if it has not converged. Returns ------- gamma: (ns x nt) ndarray Optimal transportation matrix for the given parameters """ cdef int n1= M.shape[0] cdef int n2= M.shape[1] cdef double cost=0 cdef np.ndarray[double, ndim=2, mode="c"] G=np.zeros([n1, n2]) cdef np.ndarray[double, ndim=1, mode="c"] alpha=np.zeros(n1) cdef np.ndarray[double, ndim=1, mode="c"] beta=np.zeros(n2) if not len(a): a=np.ones((n1,))/n1 if not len(b): b=np.ones((n2,))/n2 # calling the function cdef int resultSolver = EMD_wrap(n1,n2, a.data, b.data, M.data, G.data, alpha.data, beta.data, &cost, max_iter) if resultSolver != OPTIMAL: if resultSolver == INFEASIBLE: print("Problem infeasible. Try to increase numItermax.") elif resultSolver == UNBOUNDED: print("Problem unbounded") return G, alpha, beta @cython.boundscheck(False) @cython.wraparound(False) def emd2_c( np.ndarray[double, ndim=1, mode="c"] a,np.ndarray[double, ndim=1, mode="c"] b,np.ndarray[double, ndim=2, mode="c"] M, int max_iter): """ Solves the Earth Movers distance problem and returns the optimal transport loss gamm=emd(a,b,M) .. math:: \gamma = arg\min_\gamma <\gamma,M>_F s.t. \gamma 1 = a \gamma^T 1= b \gamma\geq 0 where : - M is the metric cost matrix - a and b are the sample weights Parameters ---------- a : (ns,) ndarray, float64 source histogram b : (nt,) ndarray, float64 target histogram M : (ns,nt) ndarray, float64 loss matrix max_iter : int The maximum number of iterations before stopping the optimization algorithm if it has not converged. Returns ------- gamma: (ns x nt) ndarray Optimal transportation matrix for the given parameters """ cdef int n1= M.shape[0] cdef int n2= M.shape[1] cdef double cost=0 cdef np.ndarray[double, ndim=2, mode="c"] G=np.zeros([n1, n2]) cdef np.ndarray[double, ndim = 1, mode = "c"] alpha = np.zeros([n1]) cdef np.ndarray[double, ndim = 1, mode = "c"] beta = np.zeros([n2]) if not len(a): a=np.ones((n1,))/n1 if not len(b): b=np.ones((n2,))/n2 # calling the function cdef int resultSolver = EMD_wrap(n1,n2, a.data, b.data, M.data, G.data, alpha.data, beta.data, &cost, max_iter) if resultSolver != OPTIMAL: if resultSolver == INFEASIBLE: print("Problem infeasible. Try to inscrease numItermax.") elif resultSolver == UNBOUNDED: print("Problem unbounded") return cost, alpha, beta