Source code for deep_bottleneck.callbacks.loggingreporter

from tensorflow import keras
from tensorflow.python.keras import backend as K
import numpy as np

from collections import OrderedDict
from deep_bottleneck import utils


class LoggingReporter(keras.callbacks.Callback):

    def __init__(self, trn, tst, calculate_mi_for, batch_size, activation_fn,
                 file_all_activations, do_save_func=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.trn = trn  # Train data
        self.tst = tst  # Test data
        self.calculate_mi_for = calculate_mi_for
        self.batch_size = batch_size
        self.activation_fn = activation_fn
        self.file_all_activations = file_all_activations

        if self.calculate_mi_for == "full_dataset":
            self.full = utils.construct_full_dataset(trn, tst)

        # do_save_func(epoch) should return True if we should save on that epoch
        self.do_save_func = do_save_func
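    # A hypothetical `do_save_func` (not part of this module) could, for example, save
    # every one of the first 20 epochs and afterwards only epochs that are powers of two:
    #
    #     def save_logarithmically(epoch):
    #         return epoch < 20 or (epoch & (epoch - 1)) == 0
    #
    #     reporter = LoggingReporter(..., do_save_func=save_logarithmically)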
    def on_train_begin(self, logs={}):
        # Indexes of the layers which we keep track of. Basically, this will be any layer
        # which has a 'kernel' attribute, i.e. the "Dense" or "Dense"-like layers
        self.layerixs = []

        # Functions that return the activity of each layer
        self.layerfuncs = []

        # Weight (kernel) tensors of each layer
        self.layerweights = []
        for lndx, l in enumerate(self.model.layers):
            if utils.is_dense_like(l):
                self.layerixs.append(lndx)
                self.layerfuncs.append(K.function(self.model.inputs, [l.output]))
                self.layerweights.append(l.kernel)

        input_tensors = [self.model.inputs[0],
                         self.model.sample_weights[0],
                         self.model.targets[0],
                         K.learning_phase()]

        # Get gradients of all the relevant layers at once
        grads = self.model.optimizer.get_gradients(self.model.total_loss, self.layerweights)
        self.get_gradients = K.function(inputs=input_tensors, outputs=grads)

        # Get cross-entropy loss
        self.get_loss = K.function(inputs=input_tensors, outputs=[self.model.total_loss])
    def on_epoch_begin(self, epoch, logs={}):
        if self.do_save_func is not None and not self.do_save_func(epoch):
            # Don't log this epoch
            self._log_gradients = False
        else:
            # We will log this epoch. For each batch in this epoch, we will save the gradients
            # (in on_batch_begin) and then compute means and stds of these gradients in on_epoch_end.
            self._log_gradients = True
            self._batch_weightnorm = []
            self._batch_gradients = [[] for _ in self.model.layers[1:]]

            # Indexes of all the training data samples. These are shuffled and read in chunks of batch_size
            ixs = list(range(len(self.trn.X)))
            np.random.shuffle(ixs)
            self._batch_todo_ixs = ixs
    def on_batch_begin(self, batch, logs={}):
        if not self._log_gradients:
            # We are not keeping track of batch gradients, so do nothing
            return

        # Sample a batch
        batchsize = self.batch_size
        cur_ixs = self._batch_todo_ixs[:batchsize]

        # Advance the indexing, so the next on_batch_begin samples a different batch
        self._batch_todo_ixs = self._batch_todo_ixs[batchsize:]

        # Get gradients for this batch
        inputs = [self.trn.X[cur_ixs, :],  # Inputs
                  [1] * len(cur_ixs),      # Uniform sample weights
                  self.trn.Y[cur_ixs, :],  # Outputs
                  1                        # Training phase
                  ]
        for lndx, g in enumerate(self.get_gradients(inputs)):
            # g is the gradient for the weights of the lndx-th tracked layer
            oneDgrad = np.reshape(g, (-1, 1))  # Flatten to a column vector
            self._batch_gradients[lndx].append(oneDgrad)
    def on_epoch_end(self, epoch, logs={}):
        if self.do_save_func is not None and not self.do_save_func(epoch):
            # Don't log this epoch
            return

        # Get overall performance
        loss = {}
        for cdata, cdataname, istrain in ((self.trn, 'trn', 1), (self.tst, 'tst', 0)):
            loss[cdataname] = self.get_loss([cdata.X, [1] * len(cdata.X), cdata.Y, istrain])[0].flat[0]

        self.file_all_activations.create_group(str(epoch))
        self.file_all_activations[str(epoch)].create_dataset('weights_norm', (len(self.layerixs),))
        self.file_all_activations[str(epoch)].create_dataset('gradmean', (len(self.layerixs),))
        self.file_all_activations[str(epoch)].create_dataset('gradstd', (len(self.layerixs),))
        self.file_all_activations[str(epoch)].create_group('activations')

        for lndx, layerix in enumerate(self.layerixs):
            clayer = self.model.layers[layerix]

            self.file_all_activations[f'{epoch}/weights_norm'][lndx] = np.linalg.norm(K.get_value(clayer.kernel))

            stackedgrads = np.stack(self._batch_gradients[lndx], axis=1)
            self.file_all_activations[f'{epoch}/gradmean'][lndx] = np.linalg.norm(stackedgrads.mean(axis=1))
            self.file_all_activations[f'{epoch}/gradstd'][lndx] = np.linalg.norm(stackedgrads.std(axis=1))

            # TODO Same "if" clause is in the estimator, remove code duplication
            if self.calculate_mi_for == "full_dataset":
                self.file_all_activations[f'{epoch}/activations/'].create_dataset(
                    str(lndx), data=self.layerfuncs[lndx]([self.full.X])[0])
            elif self.calculate_mi_for == "test":
                self.file_all_activations[f'{epoch}/activations/'].create_dataset(
                    str(lndx), data=self.layerfuncs[lndx]([self.tst.X])[0])
            elif self.calculate_mi_for == "training":
                self.file_all_activations[f'{epoch}/activations/'].create_dataset(
                    str(lndx), data=self.layerfuncs[lndx]([self.trn.X])[0])
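A minimal usage sketch follows. It assumes a compiled Keras model named `model` and dataset objects `trn`/`tst` exposing `.X`/`.Y` numpy arrays exist in the surrounding experiment code, and that `file_all_activations` is an `h5py.File`, which matches how the callback indexes it above; the parameter values and the `save_logarithmically` schedule from the comment above are illustrative only, not the project's defaults.

import h5py

# Assumed to exist elsewhere in the experiment: a compiled Keras `model` and
# dataset objects `trn` / `tst` exposing `.X` and `.Y` numpy arrays.
activations_file = h5py.File('activations.h5', 'w')

reporter = LoggingReporter(trn=trn,
                           tst=tst,
                           calculate_mi_for='full_dataset',
                           batch_size=256,
                           activation_fn='tanh',  # stored but not used by this callback
                           file_all_activations=activations_file,
                           do_save_func=save_logarithmically)  # hypothetical schedule sketched above

model.fit(trn.X, trn.Y,
          batch_size=256,
          epochs=1000,
          validation_data=(tst.X, tst.Y),
          callbacks=[reporter])

activations_file.close()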