1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
import ot
from sklearn.preprocessing import normalize
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import euclidean_distances
from sklearn.externals.joblib import Parallel, delayed
from sklearn.utils import check_array
from sklearn.metrics.scorer import check_scoring
from pathos.multiprocessing import ProcessingPool as Pool
from sklearn.metrics import euclidean_distances
import numpy as np
class Wasserstein_Retriever(KNeighborsClassifier):
    """
    Nearest-neighbors classifier/retriever over distributions, using the
    Wasserstein (optimal transport) distance as metric.

    Every input row is treated as a bag-of-words histogram over the rows of
    `W_embed`. Source and target histograms are l_1 normalized before the
    distance is computed, and the transport cost between two words is the
    Euclidean distance between their embeddings.

    Inputs must be scipy sparse matrices: rows are accessed through their CSR
    `.indices`, so dense arrays are not supported.
    """

    def __init__(self, W_embed, n_neighbors=1, n_jobs=1, verbose=False, sinkhorn=False, sinkhorn_reg=0.1):
        """
        Initialization of the class.

        Arguments
        ---------
        W_embed: embeddings of the words, np.array; row k is the embedding of
            feature (column) k of the bag-of-words matrices.
        n_neighbors: number of neighbors used by `predict`.
        n_jobs: number of worker processes used to compute the distances.
        verbose: True/False (stored on the instance; not used by this class).
        sinkhorn: if True, use the entropy-regularized `ot.sinkhorn2` solver
            (stabilized variant, at most 50 iterations) instead of the exact
            `ot.emd2` solver.
        sinkhorn_reg: regularization value passed to `ot.sinkhorn2`.
        """
        self.sinkhorn = sinkhorn
        self.sinkhorn_reg = sinkhorn_reg
        self.W_embed = W_embed
        self.verbose = verbose
        # Distances are always computed by this class, so the underlying
        # k-NN machinery is configured for a precomputed distance matrix.
        super(Wasserstein_Retriever, self).__init__(
            n_neighbors=n_neighbors, n_jobs=n_jobs, metric='precomputed', algorithm='brute')

    @staticmethod
    def _as_distributions(X):
        """Validate X (copying it) and l_1 normalize each row in the copy."""
        X = check_array(X, accept_sparse='csr', copy=True)
        return normalize(X, norm='l1', copy=False)

    def _wmd(self, i, row, X_train):
        """
        Wasserstein distance between training row `i` and the test `row`.

        Arguments
        ---------
        i: index of the training document in `X_train`.
        row: single-row sparse matrix holding the test document.
        X_train: sparse matrix of training documents.

        Returns
        -------
        The transport cost between the two normalized histograms.
        """
        # Restrict the problem to the words present in either document so the
        # ground-cost matrix stays small.
        union_idx = np.union1d(X_train[i].indices, row.indices)
        W_dist = euclidean_distances(self.W_embed[union_idx])
        bow_i = X_train[i, union_idx].toarray().ravel()
        bow_j = row[:, union_idx].toarray().ravel()
        if self.sinkhorn:
            return ot.sinkhorn2(bow_i, bow_j, W_dist, self.sinkhorn_reg,
                                numItermax=50, method='sinkhorn_stabilized')[0]
        return ot.emd2(bow_i, bow_j, W_dist)

    def _wmd_row(self, row, X_train=None):
        """
        Distances from one test `row` to every row of `X_train`.

        Arguments
        ---------
        row: single-row sparse matrix holding the test document.
        X_train: sparse matrix of training documents; defaults to the data
            stored by `fit`.

        Returns
        -------
        List with one distance per training row.
        """
        if X_train is None:
            X_train = self._fit_X
        return [self._wmd(i, row, X_train) for i in range(X_train.shape[0])]

    def _pairwise_wmd(self, X_test, X_train=None):
        """
        Pairwise Wasserstein distances between test and training documents.

        Arguments
        ---------
        X_test: sparse matrix of (already normalized) test documents.
        X_train: sparse matrix of training documents; defaults to the data
            stored by `fit`.

        Returns
        -------
        np.array of shape (n_samples_test, n_samples_train).
        """
        from functools import partial

        if X_train is None:
            X_train = self._fit_X
        # Bind X_train to the worker function so an explicitly supplied
        # training matrix is actually the one the distances are computed
        # against.
        wmd_row = partial(self._wmd_row, X_train=X_train)
        pool = Pool(nodes=self.n_jobs)  # Parallelization of the calculation of the distances
        dist = pool.map(wmd_row, X_test)
        return np.array(dist)

    def fit(self, X, y):
        """
        Store the l_1 normalized training documents and their labels.

        Arguments
        ---------
        X: sparse matrix of training documents, one row per document.
        y: labels of the training documents.

        Returns
        -------
        The result of the parent class `fit`.
        """
        return super(Wasserstein_Retriever, self).fit(self._as_distributions(X), y)

    def predict(self, X):
        """
        Predict the label of each document in X from its nearest neighbors.

        Arguments
        ---------
        X: sparse matrix of test documents, one row per document.

        Returns
        -------
        Predicted labels, as returned by `KNeighborsClassifier.predict`.
        """
        dist = self._pairwise_wmd(self._as_distributions(X))
        # KNeighborsClassifier.predict looks up neighbors via
        # self.kneighbors(...). On this class that is the override below,
        # which expects raw documents and would run the Wasserstein
        # computation a second time on the distance matrix. Run the vote on a
        # plain KNeighborsClassifier sharing this estimator's parameters and
        # fitted state, so the precomputed distances are consumed directly.
        voter = KNeighborsClassifier()
        voter.__dict__.update(self.__dict__)
        return voter.predict(dist)

    def kneighbors(self, X, n_neighbors=1):
        """
        Retrieve the nearest training documents of each document in X.

        Arguments
        ---------
        X: sparse matrix of query documents, one row per document.
        n_neighbors: number of neighbors to return per query. Defaults to 1,
            independently of the `n_neighbors` given to the constructor.

        Returns
        -------
        The result of the parent class `kneighbors` applied to the
        Wasserstein distance matrix.
        """
        dist = self._pairwise_wmd(self._as_distributions(X))
        return super(Wasserstein_Retriever, self).kneighbors(dist, n_neighbors)
|