class H1:
def __init__(self, epsilon):
self.epsilon = epsilon
self.b = np.random.uniform(0.0, epsilon)
def perform_hash(self, x):
return np.floor((x + self.b) / self.epsilon).astype('int32')
class H2:
def __init__(self, N, cH):
self.cHN = N * cH
def hash_entry(self, sample):
native_hash = hash(tuple(sample))
result = np.mod(native_hash, self.cHN)
return result
def perform_hash(self, x):
result = map(self.hash_entry, x)
return list(result)
class EDGE:
def __init__(self):
self.N = X.shape[0]
self.cH = 4
#self.epsilon = 0.08
epsilon_est = self.N ** (-1/(2 *X.shape[1]))
self.epsilon = epsilon_est
#print(epsilon_est)
self.n_buckets = X.shape[0] * self.cH
self.h1 = H1(self.epsilon)
self.h2 = H2(self.N, self.cH)
@timeit
def _g(self, x):
result = dict()
for (x_idx, y_idx), w_ij in x.items():
result[(x_idx, y_idx)] = x[(x_idx, y_idx)] * np.log(x[(x_idx, y_idx)])
return result
@timeit
def _count_collisions(self, X, Y):
counts_i = np.zeros(self.n_buckets).astype('int32')
counts_j = np.zeros(self.n_buckets).astype('int32')
counts_ij = dict()
h1_applied_x = self.h1.perform_hash(X)
h1_applied_y = self.h1.perform_hash(Y)
h2_applied_x = self.h2.perform_hash(h1_applied_x)
h2_applied_y = self.h2.perform_hash(h1_applied_y)
for k in range(self.N):
h_x = h2_applied_x[k]
h_y = h2_applied_y[k]
counts_i[h_x] += 1
counts_j[h_y] += 1
if (h_x, h_y) in counts_ij.keys():
counts_ij[(h_x, h_y)] += 1
else:
counts_ij[(h_x, h_y)] = 1
return counts_i, counts_j, counts_ij
@timeit
def _compute_edge_weights(self, counts_i, counts_j, counts_ij):
w_i = counts_i / self.N
w_j = counts_j / self.N
edges = dict()
for (x_idx, y_idx), c_ij in counts_ij.items():
edges[(x_idx, y_idx)] = counts_i[x_idx] * counts_j[y_idx]
w_ij = dict()
for (x_idx, y_idx), c_ij in counts_ij.items():
w_ij[(x_idx, y_idx)] = counts_ij[(x_idx, y_idx)] * self.N / edges[(x_idx, y_idx)]
return w_i, w_j, w_ij
@timeit
def estimate_mi(self, X, Y):
counts_i, counts_j, counts_ij = self._count_collisions(X, Y)
w_i, w_j, w_ij = self._compute_edge_weights(counts_i, counts_j, counts_ij)
g_applied = self._g(w_ij)
# lower bound # used bins for Y
#used_bins_y = np.sum(counts_j[counts_j != 0])
#print(used_bins_y)
#U = np.ones_like(g_applied) * used_bins_y
#stacked = np.stack([g_applied, U])
#g_schlange = np.min(stacked, axis=0)
#print(len(g_applied[g_applied>0]))
MI = 0
for (i_idx, j_idx), w_ij in w_ij.items():
MI += w_i[i_idx] * w_j[j_idx] * g_applied[(i_idx, j_idx)]
return MI