Source code for astroNN.models.base_bayesian_cnn

import json
import os
import time
import warnings
from abc import ABC

import numpy as np
from tqdm import tqdm
import keras
from astroNN.config import (
    _KERAS_BACKEND,
    backend_framework,
)
from astroNN.config import _astroNN_MODEL_NAME
from astroNN.models.base_master_nn import NeuralNetMaster
from astroNN.nn.callbacks import VirutalCSVLogger
from astroNN.nn.layers import FastMCInference
from astroNN.nn.losses import (
    mean_absolute_error,
    mean_error,
    zeros_loss,
)
from astroNN.nn.metrics import categorical_accuracy, binary_accuracy
from astroNN.nn.numpy import sigmoid
from astroNN.nn.utilities import Normalizer
from astroNN.nn.utilities.generator import GeneratorMaster
from astroNN.shared.dict_tools import dict_np_to_dict_list, list_to_dict

from astroNN.nn.losses import (
    bayesian_binary_crossentropy_wrapper,
)
from astroNN.nn.losses import (
    bayesian_categorical_crossentropy_wrapper,
)
from astroNN.nn.losses import mse_lin_wrapper, mse_var_wrapper
from sklearn.model_selection import train_test_split

# Module-level shortcuts to the Keras members used by this module
regularizers = keras.regularizers
ReduceLROnPlateau = keras.callbacks.ReduceLROnPlateau
Adam = keras.optimizers.Adam


class BayesianCNNDataGenerator(GeneratorMaster):
    """
    Batch generator feeding named inputs and labels to the Bayesian NN

    :param batch_size: batch size
    :type batch_size: int
    :param shuffle: Whether to shuffle batches or not
    :type shuffle: bool
    :param steps_per_epoch: number of steps per epoch, forwarded unchanged to ``GeneratorMaster``
    :type steps_per_epoch: int
    :param data: List of data to NN, ``data[0]`` holds the named inputs and ``data[1]`` the named labels
    :type data: list
    :param manual_reset: Whether need to reset the generator manually, usually it is handled by tensorflow
    :type manual_reset: bool
    :param sample_weight: Sample weights (if any)
    :type sample_weight: Union([NoneType, ndarray])
    :History:
        | 2017-Dec-02 - Written - Henry Leung (University of Toronto)
        | 2019-Feb-17 - Updated - Henry Leung (University of Toronto)
    """

    def __init__(
        self,
        batch_size,
        shuffle,
        steps_per_epoch,
        data,
        manual_reset=False,
        sample_weight=None,
    ):
        super().__init__(
            batch_size=batch_size,
            shuffle=shuffle,
            steps_per_epoch=steps_per_epoch,
            data=data,
            manual_reset=manual_reset,
        )
        self.inputs, self.labels = self.data[0], self.data[1]
        self.sample_weight = sample_weight

        # exploration order for the first epoch, sized by the "input" entry
        num_samples = self.inputs["input"].shape[0]
        self.idx_list = self._get_exploration_order(range(num_samples))

    def _data_generation(self, idx_list_temp):
        """
        Assemble one batch for the sample indices in ``idx_list_temp``

        :return: ``(x, y)``, or ``(x, y, sample_weight)`` when sample weights were supplied
        """
        batch_inputs = self.input_d_checking(self.inputs, idx_list_temp)
        if "labels_err" in batch_inputs:
            # labels error is passed to the model with singleton axes removed
            batch_inputs["labels_err"] = np.squeeze(batch_inputs["labels_err"])

        batch_labels = {
            label_name: label_values[idx_list_temp]
            for label_name, label_values in self.labels.items()
        }

        if self.sample_weight is None:
            return batch_inputs, batch_labels
        return batch_inputs, batch_labels, self.sample_weight[idx_list_temp]

    def __getitem__(self, index):
        """Return batch number ``index`` of the current exploration order"""
        start = index * self.batch_size
        stop = (index + 1) * self.batch_size
        return self._data_generation(self.idx_list[start:stop])

    def on_epoch_end(self):
        """Draw a new exploration order for the next epoch"""
        num_samples = self.inputs["input"].shape[0]
        self.idx_list = self._get_exploration_order(range(num_samples))


class BayesianCNNPredDataGenerator(GeneratorMaster):
    """
    Batch generator feeding named inputs to the Bayesian NN for prediction

    :param batch_size: batch size
    :type batch_size: int
    :param shuffle: Whether to shuffle batches or not
    :type shuffle: bool
    :param steps_per_epoch: number of steps per epoch, forwarded unchanged to ``GeneratorMaster``
    :type steps_per_epoch: int
    :param data: List of data to NN, only ``data[0]`` (the named inputs) is used
    :type data: list
    :param manual_reset: Whether need to reset the generator manually, usually it is handled by tensorflow
    :type manual_reset: bool
    :param pbar: tqdm progress bar
    :type pbar: obj
    :History:
        | 2017-Dec-02 - Written - Henry Leung (University of Toronto)
        | 2019-Feb-17 - Updated - Henry Leung (University of Toronto)
    """

    def __init__(
        self, batch_size, shuffle, steps_per_epoch, data, manual_reset=False, pbar=None
    ):
        super().__init__(
            batch_size=batch_size,
            shuffle=shuffle,
            steps_per_epoch=steps_per_epoch,
            data=data,
            manual_reset=manual_reset,
        )
        self.inputs = self.data[0]
        self.pbar = pbar

        # exploration order sized by whichever input comes first in the dict
        first_key = list(self.inputs.keys())[0]
        self.idx_list = self._get_exploration_order(
            range(self.inputs[first_key].shape[0])
        )
        # highest batch index served so far, -1 means nothing has been served yet
        self.current_idx = -1

    def _data_generation(self, idx_list_temp):
        """Return the inputs for the sample indices in ``idx_list_temp``"""
        return self.input_d_checking(self.inputs, idx_list_temp)

    def __getitem__(self, index):
        """Return batch number ``index`` and advance the progress bar (if any)"""
        start = index * self.batch_size
        batch = self._data_generation(self.idx_list[start : start + self.batch_size])
        # only advance the bar when moving forward to a later batch index
        if self.pbar and index > self.current_idx:
            self.pbar.update(self.batch_size)
        self.current_idx = index
        return batch

    def on_epoch_end(self):
        """Draw a new exploration order for the next epoch"""
        first_key = list(self.inputs.keys())[0]
        self.idx_list = self._get_exploration_order(
            range(self.inputs[first_key].shape[0])
        )


class BayesianCNNBase(NeuralNetMaster, ABC):
    """
    Top-level class for a Bayesian convolutional neural network

    :History: 2018-Jan-06 - Written - Henry Leung (University of Toronto)
    """

    def __init__(self):
        super().__init__()
        self.name = "Bayesian Convolutional Neural Network"
        self._model_type = "BCNN"
        self.initializer = None
        self.activation = None
        self._last_layer_activation = None
        self.num_filters = None
        self.filter_len = None
        self.pool_length = None
        self.num_hidden = None
        self.reduce_lr_epsilon = None
        self.reduce_lr_min = None
        self.reduce_lr_patience = None
        self.l1 = None
        self.l2 = None
        self.maxnorm = None
        self.inv_model_precision = None  # inverse model precision
        self.dropout_rate = 0.2
        self.length_scale = 3  # prior length scale
        self.mc_num = 100  # increased to 100 due to high performance VI on GPU implemented on 14 April 2018 (Henry)
        self.val_size = 0.1
        self.disable_dropout = False

        self.aux_length = 0
        self.input_norm_mode = 1
        self.labels_norm_mode = 2

        self.keras_model_predict = None

    def _normalize_data_and_labels(self, input_data, labels):
        """
        Normalize named inputs and labels, creating the normalizers on first use

        When no input normalizer exists yet, new normalizers are created and the
        resulting mean/std are stored on the instance. Otherwise the existing
        normalizers are reused without recalculating their statistics (e.g. fine-tuning).

        :param input_data: named inputs
        :type input_data: dict
        :param labels: named labels
        :type labels: dict
        :return: normalized inputs, normalized labels
        :rtype: tuple
        """
        if self.input_normalizer is None:
            self.input_normalizer = Normalizer(
                mode=self.input_norm_mode, verbose=self.verbose
            )
            self.labels_normalizer = Normalizer(
                mode=self.labels_norm_mode, verbose=self.verbose
            )

            norm_data = self.input_normalizer.normalize(input_data)
            self.input_mean, self.input_std = (
                self.input_normalizer.mean_labels,
                self.input_normalizer.std_labels,
            )
            norm_labels = self.labels_normalizer.normalize(labels)
            self.labels_mean, self.labels_std = (
                self.labels_normalizer.mean_labels,
                self.labels_normalizer.std_labels,
            )
        else:
            norm_data = self.input_normalizer.normalize(input_data, calc=False)
            norm_labels = self.labels_normalizer.normalize(labels, calc=False)
        return norm_data, norm_labels

    def pre_training_checklist_child(self, input_data, labels, sample_weight):
        """
        Normalize the data, compile the model if needed, split training/validation
        sets and build the data generators

        :param input_data: named inputs, must contain "input", "input_err" and "labels_err"
        :type input_data: dict
        :param labels: named labels, must contain "output"
        :type labels: dict
        :param sample_weight: Sample weights (if any)
        :type sample_weight: Union([NoneType, ndarray])
        :return: training inputs, validation inputs, training labels, validation labels,
            training sample weights, validation sample weights
        :rtype: tuple
        """
        input_data, labels = self.pre_training_checklist_master(input_data, labels)

        norm_data, norm_labels = self._normalize_data_and_labels(input_data, labels)

        # No need to care about Magic number as loss function looks for magic num in y_true only
        norm_data.update(
            {
                "input_err": (input_data["input_err"] / self.input_std["input"]),
                "labels_err": input_data["labels_err"] / self.labels_std["output"],
            }
        )
        norm_labels.update({"variance_output": norm_labels["output"]})

        if (
            self.keras_model is None
        ):  # only compile if there is no keras_model, e.g. fine-tuning does not required
            self.compile()

        if self.has_val:
            self.train_idx, self.val_idx = train_test_split(
                np.arange(self.num_train + self.val_num), test_size=self.val_size
            )
        else:
            self.train_idx = np.arange(self.num_train + self.val_num)
            # just dummy, to minimize modification needed
            self.val_idx = np.arange(self.num_train + self.val_num)[:2]

        norm_data_training = {
            name: values[self.train_idx] for name, values in norm_data.items()
        }
        norm_data_val = {
            name: values[self.val_idx] for name, values in norm_data.items()
        }
        norm_labels_training = {
            name: values[self.train_idx] for name, values in norm_labels.items()
        }
        norm_labels_val = {
            name: values[self.val_idx] for name, values in norm_labels.items()
        }

        if sample_weight is not None:
            sample_weight_training = sample_weight[self.train_idx]
            sample_weight_val = sample_weight[self.val_idx]
        else:
            sample_weight_training = None
            sample_weight_val = None

        self.inv_model_precision = (2 * self.num_train * self.l2) / (
            self.length_scale**2 * (1 - self.dropout_rate)
        )

        self.training_generator = BayesianCNNDataGenerator(
            batch_size=self.batch_size,
            shuffle=True,
            steps_per_epoch=self.num_train // self.batch_size,
            data=[norm_data_training, norm_labels_training],
            manual_reset=False,
            sample_weight=sample_weight_training,
        )

        if self.has_val:
            # the validation set may be smaller than one training batch
            val_batchsize = (
                self.batch_size
                if len(self.val_idx) > self.batch_size
                else len(self.val_idx)
            )
            self.validation_generator = BayesianCNNDataGenerator(
                batch_size=val_batchsize,
                shuffle=False,
                steps_per_epoch=max(self.val_num // self.batch_size, 1),
                data=[norm_data_val, norm_labels_val],
                manual_reset=True,
                sample_weight=sample_weight_val,
            )

        return (
            norm_data_training,
            norm_data_val,
            norm_labels_training,
            norm_labels_val,
            sample_weight_training,
            sample_weight_val,
        )

    def compile(
        self,
        optimizer=None,
        loss=None,
        metrics=None,
        weighted_metrics=None,
        loss_weights=None,
    ):
        """
        Build the keras model via ``self.model()`` and compile it

        The keras model is compiled with an all-zero dummy loss; the actual loss is
        evaluated inside the custom training step installed at the end of this method.

        :param optimizer: optimizer to use, falls back to Adam configured from the instance when not set
        :param loss: accepted for interface compatibility, not used here
        :param metrics: metrics to use, task-specific defaults are used when not set
        :param weighted_metrics: forwarded to ``keras_model.compile``
        :param loss_weights: accepted for interface compatibility, not used here
        :return: None
        :raises RuntimeError: if ``self.task`` is not a supported task
        """
        if optimizer is not None:
            self.optimizer = optimizer
        elif self.optimizer is None or self.optimizer == "adam":
            self.optimizer = Adam(
                learning_rate=self.lr,
                beta_1=self.beta_1,
                beta_2=self.beta_2,
                epsilon=self.optimizer_epsilon,
            )
        if metrics is not None:
            self.metrics = metrics

        if self.task == "regression":
            if self._last_layer_activation is None:
                self._last_layer_activation = "linear"
        elif self.task == "classification":
            if self._last_layer_activation is None:
                self._last_layer_activation = "softmax"
        elif self.task == "binary_classification":
            if self._last_layer_activation is None:
                self._last_layer_activation = "sigmoid"
        else:
            raise RuntimeError(
                'Only "regression", "classification" and "binary_classification" are supported'
            )

        (
            self.keras_model,
            self.keras_model_predict,
            self.output_loss,
            self.variance_loss,
        ) = self.model()

        if self.task == "regression":
            self._output_loss = lambda predictive, labelerr: mse_lin_wrapper(
                predictive, labelerr
            )
        elif self.task == "classification":
            self._output_loss = (
                lambda predictive, labelerr: bayesian_categorical_crossentropy_wrapper(
                    predictive
                )
            )
        elif self.task == "binary_classification":
            self._output_loss = (
                lambda predictive, labelerr: bayesian_binary_crossentropy_wrapper(
                    predictive
                )
            )
        else:
            raise RuntimeError(
                'Only "regression", "classification" and "binary_classification" are supported'
            )

        # all-zero loss as dummy loss
        if self.task == "regression":
            self.metrics = (
                [mean_absolute_error, mean_error] if not self.metrics else self.metrics
            )
            if isinstance(self.metrics, list):
                # assuming for each output [output1, output2], apply each metric [metric1, metric2] to it
                # such that the output1 will be evaluated by both metric and output2 will be evaluated by both metric
                self.metrics = {
                    output_name: self.metrics
                    for output_name in self.keras_model.output_names
                }
            self.keras_model.compile(
                optimizer=self.optimizer,
                loss=zeros_loss,
                metrics=self.metrics,
                weighted_metrics=weighted_metrics,
            )
        elif self.task == "classification":
            self.metrics = [categorical_accuracy] if not self.metrics else self.metrics
            self.keras_model.compile(
                optimizer=self.optimizer,
                loss=zeros_loss,
                metrics={"output": self.metrics},
                weighted_metrics=weighted_metrics,
            )
        elif self.task == "binary_classification":
            self.metrics = [binary_accuracy] if not self.metrics else self.metrics
            self.keras_model.compile(
                optimizer=self.optimizer,
                loss=zeros_loss,
                metrics={"output": self.metrics},
                weighted_metrics=weighted_metrics,
            )

        # inject custom training step if needed: probing the method without its
        # ``data`` argument raises TypeError when it is implemented, in which case
        # it is installed on the keras model; NotImplementedError means no custom step
        try:
            self.custom_train_step()
        except NotImplementedError:
            pass
        except TypeError:
            self.keras_model.train_step = self.custom_train_step

        # inject custom testing step if needed (same probing as above)
        try:
            self.custom_test_step()
        except NotImplementedError:
            pass
        except TypeError:
            self.keras_model.test_step = self.custom_test_step

        return None

    def recompile(self, weighted_metrics=None, loss_weights=None):
        """
        To be used when you need to recompile a already existing model

        :param weighted_metrics: forwarded to ``keras_model.compile``
        :param loss_weights: accepted for interface compatibility, not used here
        """
        # all-zero loss as dummy loss
        if self.task == "regression":
            self.metrics = (
                [mean_absolute_error, mean_error] if not self.metrics else self.metrics
            )
            self.keras_model.compile(
                optimizer=self.optimizer,
                loss=zeros_loss,
                metrics=self.metrics,
                weighted_metrics=weighted_metrics,
            )
        elif self.task == "classification":
            self.metrics = [categorical_accuracy] if not self.metrics else self.metrics
            self.keras_model.compile(
                optimizer=self.optimizer,
                loss=zeros_loss,
                metrics={"output": self.metrics},
                weighted_metrics=weighted_metrics,
            )
        elif self.task == "binary_classification":
            self.metrics = [binary_accuracy] if not self.metrics else self.metrics
            self.keras_model.compile(
                optimizer=self.optimizer,
                loss=zeros_loss,
                metrics={"output": self.metrics},
                weighted_metrics=weighted_metrics,
            )

    def custom_train_step(self, data):
        """
        Custom training logic

        Runs a forward pass, evaluates the task loss from the "output" and
        "variance_output" predictions together with the "labels_err" input, and
        applies one optimizer update.

        :param data: one batch as yielded by the training generator
        :return: current metrics of the keras model
        :raises RuntimeError: if the keras backend is neither tensorflow nor torch
        """
        x, y, sample_weight = keras.utils.unpack_x_y_sample_weight(data)

        if _KERAS_BACKEND == "tensorflow":
            # Run forward pass.
            with backend_framework.GradientTape() as tape:
                y_pred = self.keras_model(x, training=True)
                # TODO: deal with sample weights
                loss = self._output_loss(y_pred["variance_output"], x["labels_err"])(
                    y["output"], y_pred["output"]
                )
                self.keras_model._loss_tracker.update_state(loss)
                if self.keras_model.optimizer is not None:
                    loss = self.keras_model.optimizer.scale_loss(loss)

            gradients = tape.gradient(loss, self.keras_model.trainable_weights)
            # Update weights
            self.keras_model.optimizer.apply_gradients(
                zip(gradients, self.keras_model.trainable_weights)
            )
        elif _KERAS_BACKEND == "torch":
            self.keras_model.zero_grad()
            y_pred = self.keras_model(x, training=True)
            loss = self._output_loss(y_pred["variance_output"], x["labels_err"])(
                y["output"], y_pred["output"]
            )
            loss.sum().backward()
            trainable_weights = [v for v in self.keras_model.trainable_weights]
            gradients = [v.value.grad for v in trainable_weights]
            # Update weights
            with backend_framework.no_grad():
                self.keras_model.optimizer.apply_gradients(
                    zip(gradients, self.keras_model.trainable_weights)
                )
        else:
            raise RuntimeError(
                "Currently only tensorflow and torch backend are supported"
            )

        # Update metrics
        self.keras_model.metrics[1].update_state(y, y_pred)
        return self.keras_model.get_metrics_result()

    def custom_test_step(self, data):
        """
        Custom testing logic

        :param data: one batch as yielded by the validation generator
        :return: metric name to current metric result
        :rtype: dict
        """
        x, y, sample_weight = keras.utils.unpack_x_y_sample_weight(data)
        y_pred = self.keras_model(x, training=False)
        # NOTE(review): the loss function is built here but never applied, and no
        # metric state is updated in this step; only the current results are collected
        temploss = self._output_loss(y_pred["variance_output"], x["labels_err"])

        # Collect metrics to return
        return_metrics = {}
        for metric in self.keras_model.metrics:
            result = metric.result()
            if isinstance(result, dict):
                return_metrics.update(result)
            else:
                return_metrics[metric.name] = result
        return return_metrics

    def fit(
        self,
        input_data,
        labels,
        inputs_err=None,
        labels_err=None,
        sample_weight=None,
    ):
        """
        Train a Bayesian neural network

        :param input_data: Data to be trained with neural network
        :type input_data: ndarray
        :param labels: Labels to be trained with neural network
        :type labels: ndarray
        :param inputs_err: Error for input_data (if any), same shape with input_data.
        :type inputs_err: Union([NoneType, ndarray])
        :param labels_err: Labels error (if any)
        :type labels_err: Union([NoneType, ndarray])
        :param sample_weight: Sample weights (if any)
        :type sample_weight: Union([NoneType, ndarray])
        :return: None
        :rtype: NoneType
        :History:
            | 2018-Jan-06 - Written - Henry Leung (University of Toronto)
            | 2018-Apr-12 - Updated - Henry Leung (University of Toronto)
        """
        if inputs_err is None:
            inputs_err = np.zeros_like(input_data)

        if labels_err is None:
            labels_err = np.zeros_like(labels)

        # TODO: allow named inputs too??
        input_data = {
            "input": input_data,
            "input_err": inputs_err,
            "labels_err": labels_err,
        }
        labels = {"output": labels, "variance_output": labels}

        # Call the checklist to create astroNN folder and save parameters
        (
            norm_data_training,
            norm_data_val,
            norm_labels_training,
            norm_labels_val,
            sample_weight_training,
            sample_weight_val,
        ) = self.pre_training_checklist_child(input_data, labels, sample_weight)

        # TODO: fix the monitor name
        reduce_lr = ReduceLROnPlateau(
            monitor="val_output_mean_absolute_error",
            factor=0.5,
            min_delta=self.reduce_lr_epsilon,
            patience=self.reduce_lr_patience,
            min_lr=self.reduce_lr_min,
            mode="min",
            verbose=self.verbose,
        )

        self.virtual_cvslogger = VirutalCSVLogger()

        self.__callbacks = [
            reduce_lr,
            self.virtual_cvslogger,
        ]  # default must have unchangeable callbacks

        if self.callbacks is not None:
            if isinstance(self.callbacks, list):
                self.__callbacks.extend(self.callbacks)
            else:
                self.__callbacks.append(self.callbacks)

        start_time = time.time()

        self.history = self.keras_model.fit(
            self.training_generator,
            validation_data=self.validation_generator,
            epochs=self.max_epochs,
            verbose=self.verbose,
            callbacks=self.__callbacks,
        )

        print(f"Completed Training, {(time.time() - start_time):.{2}f}s in total")
        if self.autosave is True:
            # Call the post training checklist to save parameters
            self.save()

        return None

    def fit_on_batch(
        self, input_data, labels, inputs_err=None, labels_err=None, sample_weight=None
    ):
        """
        Train a Bayesian neural network by running a single gradient update on all of your data, suitable for fine-tuning

        :param input_data: Data to be trained with neural network
        :type input_data: ndarray
        :param labels: Labels to be trained with neural network
        :type labels: ndarray
        :param inputs_err: Error for input_data (if any), same shape with input_data.
        :type inputs_err: Union([NoneType, ndarray])
        :param labels_err: Labels error (if any)
        :type labels_err: Union([NoneType, ndarray])
        :param sample_weight: Sample weights (if any)
        :type sample_weight: Union([NoneType, ndarray])
        :return: None
        :rtype: NoneType
        :History:
            | 2018-Aug-25 - Written - Henry Leung (University of Toronto)
        """
        self.has_model_check()

        if inputs_err is None:
            inputs_err = np.zeros_like(input_data)

        if labels_err is None:
            labels_err = np.zeros_like(labels)

        input_data = {
            "input": input_data,
            "input_err": inputs_err,
            "labels_err": labels_err,
        }
        labels = {"output": labels, "variance_output": labels}

        norm_data, norm_labels = self._normalize_data_and_labels(input_data, labels)

        # No need to care about Magic number as loss function looks for magic num in y_true only
        norm_data.update(
            {
                "input_err": (input_data["input_err"] / self.input_std["input"]),
                "labels_err": input_data["labels_err"] / self.labels_std["output"],
            }
        )
        norm_labels.update({"variance_output": norm_labels["output"]})

        start_time = time.time()

        # a single batch holding every sample, hence a single step
        fit_generator = BayesianCNNDataGenerator(
            batch_size=input_data["input"].shape[0],
            shuffle=False,
            steps_per_epoch=1,
            data=[norm_data, norm_labels],
            sample_weight=sample_weight,
        )

        self.keras_model.fit(
            fit_generator,
            epochs=1,
            verbose=self.verbose,
        )

        print(
            f"Completed Training on Batch, {(time.time() - start_time):.{2}f}s in total"
        )

        return None

    def post_training_checklist_child(self):
        """
        Save the keras model, finish the hyperparameter text file and write the
        model parameters to ``astroNN_model_parameter.json`` under ``self.fullfilepath``
        """
        self.keras_model.save(self.fullfilepath + _astroNN_MODEL_NAME)
        print(
            _astroNN_MODEL_NAME
            + f" saved to {(self.fullfilepath + _astroNN_MODEL_NAME)}"
        )

        self.hyper_txt.write(f"Dropout Rate: {self.dropout_rate} \n")
        self.hyper_txt.flush()
        self.hyper_txt.close()

        data = {
            "id": self.__class__.__name__
            if self._model_identifier is None
            else self._model_identifier,
            "pool_length": self.pool_length,
            "filterlen": self.filter_len,
            "filternum": self.num_filters,
            "hidden": self.num_hidden,
            "input": self._input_shape,
            "labels": self._labels_shape,
            "task": self.task,
            "last_layer_activation": self._last_layer_activation,
            "activation": self.activation,
            "input_mean": dict_np_to_dict_list(self.input_mean),
            "inv_tau": self.inv_model_precision,
            "length_scale": self.length_scale,
            "labels_mean": dict_np_to_dict_list(self.labels_mean),
            "input_std": dict_np_to_dict_list(self.input_std),
            "labels_std": dict_np_to_dict_list(self.labels_std),
            "valsize": self.val_size,
            "targetname": self.targetname,
            "dropout_rate": self.dropout_rate,
            "l1": self.l1,
            "l2": self.l2,
            "maxnorm": self.maxnorm,
            "input_norm_mode": self.input_normalizer.normalization_mode,
            "labels_norm_mode": self.labels_normalizer.normalization_mode,
            "input_names": self.input_names,
            "output_names": self.output_names,
            "batch_size": self.batch_size,
            "aux_length": self.aux_length,
        }

        with open(self.fullfilepath + "/astroNN_model_parameter.json", "w") as f:
            json.dump(data, f, indent=4, sort_keys=True)

    def _finalize_predictions(
        self, predictions, mc_dropout_uncertainty, predictions_var
    ):
        """
        Denormalize predictions and derive the task-specific uncertainties

        :param predictions: mean prediction in normalized label space
        :type predictions: ndarray
        :param mc_dropout_uncertainty: model variance, already scaled by the labels std squared
        :type mc_dropout_uncertainty: ndarray
        :param predictions_var: predictive variance, already scaled by the labels std squared
        :type predictions_var: ndarray
        :return: predictions and a dict with "total", "model" and "predictive" uncertainty
        :rtype: tuple
        :raises AttributeError: if ``self.task`` is unknown
        """
        if self.labels_normalizer is not None:
            predictions = self.labels_normalizer.denormalize(
                list_to_dict([self.keras_model.output_names[0]], predictions)
            )
            predictions = predictions["output"]
        else:
            predictions *= self.labels_std["output"]
            predictions += self.labels_mean["output"]

        if self.task == "regression":
            # Predictive variance
            pred_var = (
                predictions_var + mc_dropout_uncertainty
            )  # epistemic plus aleatoric uncertainty
            pred_uncertainty = np.sqrt(pred_var)  # Convert back to std error

            # final correction from variance to standard deviation
            mc_dropout_uncertainty = np.sqrt(mc_dropout_uncertainty)
            predictive_uncertainty = np.sqrt(predictions_var)

        elif self.task == "classification":
            # we want entropy for classification uncertainty
            predicted_class = np.argmax(predictions, axis=1)
            mc_dropout_uncertainty = np.ones_like(predicted_class, dtype=float)
            predictive_uncertainty = np.ones_like(predicted_class, dtype=float)

            # center variance
            predictions_var -= 1.0
            for i in range(predicted_class.shape[0]):
                all_prediction = np.array(predictions[i, :])
                mc_dropout_uncertainty[i] = -np.sum(
                    all_prediction * np.log(all_prediction)
                )
                predictive_uncertainty[i] = predictions_var[i, predicted_class[i]]

            pred_uncertainty = mc_dropout_uncertainty + predictive_uncertainty
            # We only want the predicted class back
            predictions = predicted_class

        elif self.task == "binary_classification":
            # we want entropy for classification uncertainty, so need prediction in logits space
            mc_dropout_uncertainty = -np.sum(predictions * np.log(predictions), axis=0)
            # need to activate before round to int so that the prediction is always 0 or 1
            predictions = np.rint(sigmoid(predictions))
            predictive_uncertainty = predictions_var
            pred_uncertainty = mc_dropout_uncertainty + predictions_var

        else:
            raise AttributeError("Unknown Task")

        return predictions, {
            "total": pred_uncertainty,
            "model": mc_dropout_uncertainty,
            "predictive": predictive_uncertainty,
        }

    def predict(self, input_data, inputs_err=None, batch_size=None):
        """
        Test model, High performance version designed for fast variational inference on GPU

        :param input_data: Data to be inferred with neural network
        :type input_data: ndarray
        :param inputs_err: Error for input_data, same shape with input_data. It is not modified.
        :type inputs_err: Union([NoneType, ndarray])
        :param batch_size: batch size for prediction, ``self.batch_size`` is used when not set
        :type batch_size: Union([NoneType, int])
        :return: prediction and prediction uncertainty
        :raises AttributeError: if ``mc_num`` is smaller than 2 or the task is unknown
        :raises TypeError: if the Monte-Carlo model does not return a dictionary
        :History:
            | 2018-Jan-06 - Written - Henry Leung (University of Toronto)
            | 2018-Apr-12 - Updated - Henry Leung (University of Toronto)
        """
        self.has_model_check()

        if self.mc_num < 2:
            raise AttributeError("mc_num cannot be smaller than 2")

        # if no error array then just zeros
        if inputs_err is None:
            inputs_err = np.zeros_like(input_data)
        else:
            # out-of-place division: np.atleast_2d may return a view of the caller's
            # array, which an in-place division would silently overwrite
            inputs_err = np.atleast_2d(inputs_err) / self.input_std["input"]

        # TODO: better way to handle named input
        if "input_err" in [i.name for i in self.keras_model.inputs]:
            input_data = {"input": input_data, "input_err": inputs_err}
        else:
            input_data = {"input": input_data}
        input_data = self.pre_testing_checklist_master(input_data)

        if self.input_normalizer is not None:
            input_array = self.input_normalizer.normalize(input_data, calc=False)
        else:
            # input_data is a dict here: normalize its "input" entry on a copy and
            # pass any other entry through unchanged
            input_array = dict(input_data)
            input_array["input"] = (
                np.array(input_data["input"]) - self.input_mean["input"]
            ) / self.input_std["input"]

        total_test_num = input_data["input"].shape[0]  # Number of testing data

        if batch_size is None:
            batch_size = self.batch_size
        # for number of training data smaller than batch_size
        batch_size = np.min([total_test_num, batch_size])

        # Due to the nature of how generator works, no overlapped prediction
        data_gen_shape = (total_test_num // batch_size) * batch_size
        remainder_shape = total_test_num - data_gen_shape  # Remainder from generator

        norm_data_main = {
            name: values[:data_gen_shape] for name, values in input_array.items()
        }
        norm_data_remainder = {
            name: values[data_gen_shape:] for name, values in input_array.items()
        }

        # Data Generator for prediction
        with tqdm(total=total_test_num, unit="sample") as pbar:
            pbar.set_postfix({"Monte-Carlo": self.mc_num})
            pbar.set_description_str("Prediction progress: ")
            prediction_generator = BayesianCNNPredDataGenerator(
                batch_size=batch_size,
                shuffle=False,
                steps_per_epoch=data_gen_shape // batch_size,
                data=[norm_data_main],
                pbar=pbar,
            )

            new = FastMCInference(
                self.mc_num, self.keras_model_predict
            ).transformed_model

            result = new.predict(prediction_generator, verbose=0)
            if not isinstance(result, dict):
                raise TypeError("The output of the model must be a dictionary")

            if remainder_shape != 0:  # deal with remainder
                remainder_generator = BayesianCNNPredDataGenerator(
                    batch_size=remainder_shape,
                    shuffle=False,
                    steps_per_epoch=1,
                    data=[norm_data_remainder],
                )
                pbar.update(remainder_shape)
                remainder_result = new.predict(remainder_generator, verbose=0)
                # append the remainder to every named output
                for key in result.keys():
                    result[key] = np.concatenate((result[key], remainder_result[key]))

        predictions = result["output"][:, :, 0]  # mean prediction
        mc_dropout_uncertainty = result["output"][:, :, 1] * (
            self.labels_std["output"] ** 2
        )  # model uncertainty
        predictions_var = np.exp(result["variance_output"][:, :, 0]) * (
            self.labels_std["output"] ** 2
        )  # predictive uncertainty

        return self._finalize_predictions(
            predictions, mc_dropout_uncertainty, predictions_var
        )

    def predict_dataset(self, file):
        """
        Predict on an indexable dataset, reading it batch by batch

        NOTE(review): unlike ``predict``, this method treats the Monte-Carlo model
        output as a single array (first half of axis 1 is the prediction, second
        half the predictive variance) -- confirm this still matches the model output.

        :param file: dataset supporting ``len()`` and indexing by a sequence of indices
        :return: prediction and prediction uncertainty
        :raises AttributeError: if ``mc_num`` is smaller than 2 or the task is unknown
        """

        class BayesianCNNPredDataGeneratorV2(GeneratorMaster):
            """Prediction generator reading batches directly from ``file``"""

            def __init__(
                self,
                batch_size,
                shuffle,
                steps_per_epoch,
                manual_reset=False,
                pbar=None,
                nn_model=None,
                offset=0,
            ):
                super().__init__(
                    batch_size=batch_size,
                    shuffle=shuffle,
                    steps_per_epoch=steps_per_epoch,
                    data=None,
                    manual_reset=manual_reset,
                )
                self.pbar = pbar
                # position in the exploration order where batch 0 starts
                self.offset = offset

                # initial idx
                self.idx_list = self._get_exploration_order(range(len(file)))
                self.current_idx = 0
                self.nn_model = nn_model

            def _data_generation(self, idx_list_temp):
                # Generate data, reading the batch from the dataset only once
                batch = file[idx_list_temp]
                inputs = self.nn_model.input_normalizer.normalize(
                    {
                        "input": batch,
                        "input_err": np.zeros_like(batch),
                    },
                    calc=False,
                )
                x = self.input_d_checking(inputs, np.arange(len(idx_list_temp)))
                return x

            def __getitem__(self, index):
                start = self.offset + index * self.batch_size
                x = self._data_generation(
                    self.idx_list[start : start + self.batch_size]
                )
                if self.pbar:
                    self.pbar.update(self.batch_size)
                return x

            def on_epoch_end(self):
                # shuffle the list when epoch ends for the next epoch
                self.idx_list = self._get_exploration_order(range(len(file)))

        self.has_model_check()

        if self.mc_num < 2:
            raise AttributeError("mc_num cannot be smaller than 2")

        total_test_num = len(file)  # Number of testing data

        # for number of training data smaller than batch_size
        if total_test_num < self.batch_size:
            batch_size = total_test_num
        else:
            batch_size = self.batch_size

        # Due to the nature of how generator works, no overlapped prediction
        data_gen_shape = (total_test_num // batch_size) * batch_size
        remainder_shape = total_test_num - data_gen_shape  # Remainder from generator

        # Data Generator for prediction
        with tqdm(total=total_test_num, unit="sample") as pbar:
            pbar.set_postfix({"Monte-Carlo": self.mc_num})
            prediction_generator = BayesianCNNPredDataGeneratorV2(
                batch_size=batch_size,
                shuffle=False,
                steps_per_epoch=data_gen_shape // batch_size,
                pbar=pbar,
                nn_model=self,
            )

            new = FastMCInference(
                self.mc_num, self.keras_model_predict
            ).transformed_model

            result = np.asarray(new.predict(prediction_generator))

            if remainder_shape != 0:  # deal with remainder
                # start after the samples already covered by the main generator,
                # otherwise the first samples would be predicted a second time
                remainder_generator = BayesianCNNPredDataGeneratorV2(
                    batch_size=remainder_shape,
                    shuffle=False,
                    steps_per_epoch=1,
                    pbar=pbar,
                    nn_model=self,
                    offset=data_gen_shape,
                )
                remainder_result = np.asarray(new.predict(remainder_generator))
                if remainder_shape == 1:
                    remainder_result = np.expand_dims(remainder_result, axis=0)
                result = np.concatenate((result, remainder_result))

        # in case only 1 test data point, in such case we need to add a dimension
        if result.ndim < 3 and batch_size == 1:
            result = np.expand_dims(result, axis=0)

        half_first_dim = (
            result.shape[1] // 2
        )  # result.shape[1] is guarantee an even number, otherwise sth is wrong

        predictions = result[:, :half_first_dim, 0]  # mean prediction
        mc_dropout_uncertainty = result[:, :half_first_dim, 1] * (
            self.labels_std["output"] ** 2
        )  # model uncertainty
        predictions_var = np.exp(result[:, half_first_dim:, 0]) * (
            self.labels_std["output"] ** 2
        )  # predictive uncertainty

        return self._finalize_predictions(
            predictions, mc_dropout_uncertainty, predictions_var
        )

    def evaluate(
        self, input_data, labels, inputs_err=None, labels_err=None, batch_size=None
    ):
        """
        Evaluate neural network by provided input data and labels and get back a metrics score

        :param input_data: Data to be trained with neural network
        :type input_data: ndarray
        :param labels: Labels to be trained with neural network
        :type labels: ndarray
        :param inputs_err: Error for input_data (if any), same shape with input_data.
        :type inputs_err: Union([NoneType, ndarray])
        :param labels_err: Labels error (if any)
        :type labels_err: Union([NoneType, ndarray])
        :param batch_size: batch size for evaluation, ``self.batch_size`` is used when not set
        :type batch_size: Union([NoneType, int])
        :return: metrics score dictionary
        :rtype: dict
        :History: 2018-May-20 - Written - Henry Leung (University of Toronto)
        """
        self.has_model_check()

        if inputs_err is None:
            inputs_err = np.zeros_like(input_data)

        if labels_err is None:
            labels_err = np.zeros_like(labels)

        input_data = {"input": input_data}
        labels = {"output": labels}

        norm_data, norm_labels = self._normalize_data_and_labels(input_data, labels)

        # No need to care about Magic number as loss function looks for magic num in y_true only
        norm_input_err = inputs_err / self.input_std["input"]
        norm_labels_err = labels_err / self.labels_std["output"]

        if "input_err" in [i.name for i in self.keras_model.inputs]:
            norm_data.update(
                {"input_err": norm_input_err, "labels_err": norm_labels_err}
            )
        else:
            norm_data.update({"labels_err": norm_labels_err})
        norm_labels.update({"variance_output": norm_labels["output"]})

        total_num = input_data["input"].shape[0]
        if batch_size is None:
            batch_size = self.batch_size
        batch_size = np.min([total_num, batch_size])
        # number of full batches for the batch size actually used, at least one step
        steps = max(total_num // batch_size, 1)

        start_time = time.time()
        print("Starting Evaluation")

        evaluate_generator = BayesianCNNDataGenerator(
            batch_size=batch_size,
            shuffle=False,
            steps_per_epoch=steps,
            data=[norm_data, norm_labels],
        )

        scores = self.keras_model.evaluate(evaluate_generator)
        if isinstance(scores, float):  # make sure scores is iterable
            scores = [scores]
        funcname = self.keras_model.metrics_names

        print(f"Completed Evaluation, {(time.time() - start_time):.{2}f}s elapsed")

        return list_to_dict(funcname, scores)