Source code for deep_bottleneck.mi_estimator.base
from typing import *
import pandas as pd
import numpy as np
from deep_bottleneck import utils
from deep_bottleneck.mi_estimator import kde
[docs]class MutualInformationEstimator:
nats2bits = 1.0 / np.log(2)
"""Nats to bits conversion factor."""
def __init__(self, discretization_range, training_data, test_data, architecture, calculate_mi_for):
self.training_data = training_data
self.test_data = test_data
self.architecture = architecture
self.calculate_mi_for = calculate_mi_for
[docs] def compute_mi(self, file_all_activations) -> pd.DataFrame:
print(f'*** Start running {self.__class__.__name__}. ***')
print(file_all_activations["2"])
print(f'len of file activations: {len(file_all_activations)}')
for i in file_all_activations: print(file_all_activations[str(i)])
labels, one_hot_labels = self._construct_dataset()
# Proportion of instances that have a certain label.
label_weights = np.mean(one_hot_labels, axis=0)
label_masks = {}
for target_class in range(self.training_data.n_classes):
label_masks[target_class] = labels == target_class
n_layers = len(self.architecture) + 1 # + 1 for output layer
epoch_numbers = [int(value) for value in file_all_activations]
epoch_numbers = sorted(epoch_numbers)
measures = self._init_dataframe(epoch_numbers=epoch_numbers, n_layers=n_layers)
for epoch in epoch_numbers:
print(f'Estimating mutual information for epoch {epoch}.')
summary = file_all_activations[str(epoch)]
for layer_index in range(n_layers):
layer_activations = summary['activations'][str(layer_index)]
mi_with_input, mi_with_label = self._compute_mi_per_epoch_and_layer(layer_activations, label_weights,
label_masks)
measures.loc[(epoch, layer_index), 'MI_XM'] = mi_with_input
measures.loc[(epoch, layer_index), 'MI_YM'] = mi_with_label
return measures
def _construct_dataset(self):
# Y is a one-hot vector, y is a label vector.
if self.calculate_mi_for == "full_dataset":
full = utils.construct_full_dataset(self.training_data, self.test_data)
labels = full.y
one_hot_labels = full.Y
elif self.calculate_mi_for == "test":
labels = self.test_data.y
one_hot_labels = self.test_data.Y
elif self.calculate_mi_for == "training":
labels = self.training_data.y
one_hot_labels = self.training_data.Y
return labels, one_hot_labels
def _init_dataframe(self, epoch_numbers, n_layers):
info_measures = ['MI_XM', 'MI_YM']
index_base_keys = [epoch_numbers, list(range(n_layers))]
index = pd.MultiIndex.from_product(index_base_keys, names=['epoch', 'layer'])
measures = pd.DataFrame(index=index, columns=info_measures)
return measures
def _compute_mi_per_epoch_and_layer(self, activations, label_weights, label_masks) -> Tuple[float, float]:
activations = np.asarray(activations)
H_of_M = self._estimate_entropy(activations)
H_of_M_given_X = self._estimate_conditional_entropy(activations)
H_of_M_given_Y = self._compute_H_of_M_given_Y(activations, label_weights, label_masks)
mi_with_input = self.nats2bits * (H_of_M - H_of_M_given_X)
mi_with_label = self.nats2bits * (H_of_M - H_of_M_given_Y)
return mi_with_input, mi_with_label
def _compute_H_of_M_given_Y(self, activations, label_weights, label_masks):
H_of_M_given_Y = 0
for label, mask in label_masks.items():
H_of_M_for_specific_y = self._estimate_entropy(activations[mask])
H_of_M_given_Y += label_weights[label] * H_of_M_for_specific_y
return H_of_M_given_Y
def _estimate_entropy(self, data: np.array) -> float:
"""
Args:
data: The data to estimate entropy for.
Returns:
The estimated entropy.
"""
raise NotImplementedError
def _estimate_conditional_entropy(self, data: np.array) -> float:
raise NotImplementedError