import json
import os
import time
import warnings
from abc import ABC
import numpy as np
from tqdm import tqdm
import keras
from astroNN.config import (
_KERAS_BACKEND,
backend_framework,
)
from astroNN.config import _astroNN_MODEL_NAME
from astroNN.models.base_master_nn import NeuralNetMaster
from astroNN.nn.callbacks import VirutalCSVLogger
from astroNN.nn.layers import FastMCInference
from astroNN.nn.losses import (
mean_absolute_error,
mean_error,
zeros_loss,
)
from astroNN.nn.metrics import categorical_accuracy, binary_accuracy
from astroNN.nn.numpy import sigmoid
from astroNN.nn.utilities import Normalizer
from astroNN.nn.utilities.generator import GeneratorMaster
from astroNN.shared.dict_tools import dict_np_to_dict_list, list_to_dict
from astroNN.nn.losses import (
bayesian_binary_crossentropy_wrapper,
)
from astroNN.nn.losses import (
bayesian_categorical_crossentropy_wrapper,
)
from astroNN.nn.losses import mse_lin_wrapper, mse_var_wrapper
from sklearn.model_selection import train_test_split
regularizers = keras.regularizers
ReduceLROnPlateau = keras.callbacks.ReduceLROnPlateau
Adam = keras.optimizers.Adam
class BayesianCNNDataGenerator(GeneratorMaster):
"""
To generate data to NN
:param batch_size: batch size
:type batch_size: int
:param shuffle: Whether to shuffle batches or not
:type shuffle: bool
:param data: List of data to NN
:type data: list
:param manual_reset: Whether need to reset the generator manually, usually it is handled by tensorflow
:type manual_reset: bool
:param sample_weight: Sample weights (if any)
:type sample_weight: Union([NoneType, ndarray])
:History:
| 2017-Dec-02 - Written - Henry Leung (University of Toronto)
| 2019-Feb-17 - Updated - Henry Leung (University of Toronto)
"""
def __init__(
self,
batch_size,
shuffle,
steps_per_epoch,
data,
manual_reset=False,
sample_weight=None,
):
super().__init__(
batch_size=batch_size,
shuffle=shuffle,
steps_per_epoch=steps_per_epoch,
data=data,
manual_reset=manual_reset,
)
self.inputs = self.data[0]
self.labels = self.data[1]
self.sample_weight = sample_weight
# initial idx
self.idx_list = self._get_exploration_order(
range(self.inputs["input"].shape[0])
)
def _data_generation(self, idx_list_temp):
x = self.input_d_checking(self.inputs, idx_list_temp)
if "labels_err" in x.keys():
x.update({"labels_err": np.squeeze(x["labels_err"])})
y = {}
for name in self.labels.keys():
y.update({name: self.labels[name][idx_list_temp]})
if self.sample_weight is not None:
return x, y, self.sample_weight[idx_list_temp]
else:
return x, y
def __getitem__(self, index):
return self._data_generation(
self.idx_list[index * self.batch_size : (index + 1) * self.batch_size]
)
def on_epoch_end(self):
# shuffle the list when epoch ends for the next epoch
self.idx_list = self._get_exploration_order(
range(self.inputs["input"].shape[0])
)
class BayesianCNNPredDataGenerator(GeneratorMaster):
"""
To generate data to NN for prediction
:param batch_size: batch size
:type batch_size: int
:param shuffle: Whether to shuffle batches or not
:type shuffle: bool
:param data: List of data to NN
:type data: list
:param manual_reset: Whether need to reset the generator manually, usually it is handled by tensorflow
:type manual_reset: bool
:param pbar: tqdm progress bar
:type pbar: obj
:History:
| 2017-Dec-02 - Written - Henry Leung (University of Toronto)
| 2019-Feb-17 - Updated - Henry Leung (University of Toronto)
"""
def __init__(
self, batch_size, shuffle, steps_per_epoch, data, manual_reset=False, pbar=None
):
super().__init__(
batch_size=batch_size,
shuffle=shuffle,
steps_per_epoch=steps_per_epoch,
data=data,
manual_reset=manual_reset,
)
self.inputs = self.data[0]
self.pbar = pbar
# initial idx
self.idx_list = self._get_exploration_order(
range(self.inputs[list(self.inputs.keys())[0]].shape[0])
)
self.current_idx = -1
def _data_generation(self, idx_list_temp):
# Generate data
x = self.input_d_checking(self.inputs, idx_list_temp)
return x
def __getitem__(self, index):
x = self._data_generation(
self.idx_list[index * self.batch_size : (index + 1) * self.batch_size]
)
if self.pbar and index > self.current_idx:
self.pbar.update(self.batch_size)
self.current_idx = index
return x
def on_epoch_end(self):
# shuffle the list when epoch ends for the next epoch
self.idx_list = self._get_exploration_order(
range(self.inputs[list(self.inputs.keys())[0]].shape[0])
)
[docs]
class BayesianCNNBase(NeuralNetMaster, ABC):
"""
Top-level class for a Bayesian convolutional neural network
:History: 2018-Jan-06 - Written - Henry Leung (University of Toronto)
"""
def __init__(self):
super().__init__()
self.name = "Bayesian Convolutional Neural Network"
self._model_type = "BCNN"
self.initializer = None
self.activation = None
self._last_layer_activation = None
self.num_filters = None
self.filter_len = None
self.pool_length = None
self.num_hidden = None
self.reduce_lr_epsilon = None
self.reduce_lr_min = None
self.reduce_lr_patience = None
self.l1 = None
self.l2 = None
self.maxnorm = None
self.inv_model_precision = None # inverse model precision
self.dropout_rate = 0.2
self.length_scale = 3 # prior length scale
self.mc_num = 100 # increased to 100 due to high performance VI on GPU implemented on 14 April 2018 (Henry)
self.val_size = 0.1
self.disable_dropout = False
self.aux_length = 0
self.input_norm_mode = 1
self.labels_norm_mode = 2
self.keras_model_predict = None
def pre_training_checklist_child(self, input_data, labels, sample_weight):
input_data, labels = self.pre_training_checklist_master(input_data, labels)
# check if exists (existing means the model has already been trained (e.g. fine-tuning), so we do not need calculate mean/std again)
if self.input_normalizer is None:
self.input_normalizer = Normalizer(
mode=self.input_norm_mode, verbose=self.verbose
)
self.labels_normalizer = Normalizer(
mode=self.labels_norm_mode, verbose=self.verbose
)
norm_data = self.input_normalizer.normalize(input_data)
self.input_mean, self.input_std = (
self.input_normalizer.mean_labels,
self.input_normalizer.std_labels,
)
norm_labels = self.labels_normalizer.normalize(labels)
self.labels_mean, self.labels_std = (
self.labels_normalizer.mean_labels,
self.labels_normalizer.std_labels,
)
else:
norm_data = self.input_normalizer.normalize(input_data, calc=False)
norm_labels = self.labels_normalizer.normalize(labels, calc=False)
# No need to care about Magic number as loss function looks for magic num in y_true only
norm_data.update(
{
"input_err": (input_data["input_err"] / self.input_std["input"]),
"labels_err": input_data["labels_err"] / self.labels_std["output"],
}
)
norm_labels.update({"variance_output": norm_labels["output"]})
if (
self.keras_model is None
): # only compile if there is no keras_model, e.g. fine-tuning does not required
self.compile()
if self.has_val:
self.train_idx, self.val_idx = train_test_split(
np.arange(self.num_train + self.val_num), test_size=self.val_size
)
else:
self.train_idx = np.arange(self.num_train + self.val_num)
# just dummy, to minimize modification needed
self.val_idx = np.arange(self.num_train + self.val_num)[:2]
norm_data_training = {}
norm_data_val = {}
norm_labels_training = {}
norm_labels_val = {}
for name in norm_data.keys():
norm_data_training.update({name: norm_data[name][self.train_idx]})
norm_data_val.update({name: norm_data[name][self.val_idx]})
for name in norm_labels.keys():
norm_labels_training.update({name: norm_labels[name][self.train_idx]})
norm_labels_val.update({name: norm_labels[name][self.val_idx]})
if sample_weight is not None:
sample_weight_training = sample_weight[self.train_idx]
sample_weight_val = sample_weight[self.val_idx]
else:
sample_weight_training = None
sample_weight_val = None
self.inv_model_precision = (2 * self.num_train * self.l2) / (
self.length_scale**2 * (1 - self.dropout_rate)
)
self.training_generator = BayesianCNNDataGenerator(
batch_size=self.batch_size,
shuffle=True,
steps_per_epoch=self.num_train // self.batch_size,
data=[norm_data_training, norm_labels_training],
manual_reset=False,
sample_weight=sample_weight_training,
)
if self.has_val:
val_batchsize = (
self.batch_size
if len(self.val_idx) > self.batch_size
else len(self.val_idx)
)
self.validation_generator = BayesianCNNDataGenerator(
batch_size=val_batchsize,
shuffle=False,
steps_per_epoch=max(self.val_num // self.batch_size, 1),
data=[norm_data_val, norm_labels_val],
manual_reset=True,
sample_weight=sample_weight_val,
)
return (
norm_data_training,
norm_data_val,
norm_labels_training,
norm_labels_val,
sample_weight_training,
sample_weight_val,
)
def compile(
self,
optimizer=None,
loss=None,
metrics=None,
weighted_metrics=None,
loss_weights=None,
):
if optimizer is not None:
self.optimizer = optimizer
elif self.optimizer is None or self.optimizer == "adam":
self.optimizer = Adam(
learning_rate=self.lr,
beta_1=self.beta_1,
beta_2=self.beta_2,
epsilon=self.optimizer_epsilon,
)
if metrics is not None:
self.metrics = metrics
if self.task == "regression":
if self._last_layer_activation is None:
self._last_layer_activation = "linear"
elif self.task == "classification":
if self._last_layer_activation is None:
self._last_layer_activation = "softmax"
elif self.task == "binary_classification":
if self._last_layer_activation is None:
self._last_layer_activation = "sigmoid"
else:
raise RuntimeError(
'Only "regression", "classification" and "binary_classification" are supported'
)
(
self.keras_model,
self.keras_model_predict,
self.output_loss,
self.variance_loss,
) = self.model()
if self.task == "regression":
self._output_loss = lambda predictive, labelerr: mse_lin_wrapper(
predictive, labelerr
)
elif self.task == "classification":
self._output_loss = (
lambda predictive, labelerr: bayesian_categorical_crossentropy_wrapper(
predictive
)
)
elif self.task == "binary_classification":
self._output_loss = (
lambda predictive, labelerr: bayesian_binary_crossentropy_wrapper(
predictive
)
)
else:
raise RuntimeError(
'Only "regression", "classification" and "binary_classification" are supported'
)
# all zero losss as dummy lose
if self.task == "regression":
self.metrics = (
[mean_absolute_error, mean_error] if not self.metrics else self.metrics
)
if isinstance(self.metrics, list):
new_metrics = {}
# assuming for each output [output1, output2], apply each metric [metric1, metric2] to it
# such that the output1 will be evaluated by both metric and output2 will be evaluated by both metric
for i in self.keras_model.output_names:
new_metrics.update({i: self.metrics})
self.metrics = new_metrics
self.keras_model.compile(
optimizer=self.optimizer,
loss=zeros_loss,
metrics=self.metrics,
weighted_metrics=weighted_metrics,
)
elif self.task == "classification":
self.metrics = [categorical_accuracy] if not self.metrics else self.metrics
self.keras_model.compile(
optimizer=self.optimizer,
loss=zeros_loss,
metrics={"output": self.metrics},
weighted_metrics=weighted_metrics,
)
elif self.task == "binary_classification":
self.metrics = [binary_accuracy] if not self.metrics else self.metrics
self.keras_model.compile(
optimizer=self.optimizer,
loss=zeros_loss,
metrics={"output": self.metrics},
weighted_metrics=weighted_metrics,
)
# inject custom training step if needed
try:
self.custom_train_step()
except NotImplementedError:
pass
except TypeError:
self.keras_model.train_step = self.custom_train_step
# inject custom testing step if needed
try:
self.custom_test_step()
except NotImplementedError:
pass
except TypeError:
self.keras_model.test_step = self.custom_test_step
return None
[docs]
def recompile(self, weighted_metrics=None, loss_weights=None):
"""
To be used when you need to recompile a already existing model
"""
# all zero losss as dummy lose
if self.task == "regression":
self.metrics = (
[mean_absolute_error, mean_error] if not self.metrics else self.metrics
)
self.keras_model.compile(
optimizer=self.optimizer,
loss=zeros_loss,
metrics=self.metrics,
weighted_metrics=weighted_metrics,
)
elif self.task == "classification":
self.metrics = [categorical_accuracy] if not self.metrics else self.metrics
self.keras_model.compile(
optimizer=self.optimizer,
loss=zeros_loss,
metrics={"output": self.metrics},
weighted_metrics=weighted_metrics,
)
elif self.task == "binary_classification":
self.metrics = [binary_accuracy] if not self.metrics else self.metrics
self.keras_model.compile(
optimizer=self.optimizer,
loss=zeros_loss,
metrics={"output": self.metrics},
weighted_metrics=weighted_metrics,
)
[docs]
def custom_train_step(self, data):
"""
Custom training logic
:param data:
:return:
"""
x, y, sample_weight = keras.utils.unpack_x_y_sample_weight(data)
if _KERAS_BACKEND == "tensorflow":
# Run forward pass.
with backend_framework.GradientTape() as tape:
y_pred = self.keras_model(x, training=True)
# TODO: deal with sample weights
loss = self._output_loss(y_pred["variance_output"], x["labels_err"])(
y["output"], y_pred["output"]
)
self.keras_model._loss_tracker.update_state(loss)
if self.keras_model.optimizer is not None:
loss = self.keras_model.optimizer.scale_loss(loss)
gradients = tape.gradient(loss, self.keras_model.trainable_weights)
# Update weights
self.keras_model.optimizer.apply_gradients(
zip(gradients, self.keras_model.trainable_weights)
)
elif _KERAS_BACKEND == "torch":
self.keras_model.zero_grad()
y_pred = self.keras_model(x, training=True)
loss = self._output_loss(y_pred["variance_output"], x["labels_err"])(y["output"], y_pred["output"])
loss.sum().backward()
trainable_weights = [v for v in self.keras_model.trainable_weights]
gradients = [v.value.grad for v in trainable_weights]
# Update weights
with backend_framework.no_grad():
self.keras_model.optimizer.apply_gradients(
zip(gradients, self.keras_model.trainable_weights)
)
else:
raise RuntimeError(
"Currently only tensorflow and torch backend are supported"
)
# Update metrics
# print(self.keras_model.metrics[1]._user_metrics["output"])
# for metric in self.keras_model.metrics[1]:
# metric.update_state(y, y_pred)
self.keras_model.metrics[1].update_state(y, y_pred)
return self.keras_model.get_metrics_result()
def custom_test_step(self, data):
x, y, sample_weight = keras.utils.unpack_x_y_sample_weight(data)
y_pred = self.keras_model(x, training=False)
# Updates stateful loss metrics.
temploss = self._output_loss(y_pred["variance_output"], x["labels_err"])
# self.keras_model.compiled_loss._losses = temploss
# self.keras_model.compiled_loss._losses = nest.map_structure(
# self.keras_model.compiled_loss._get_loss_object,
# self.keras_model.compiled_loss._losses,
# )
# self.keras_model.compiled_loss._losses = nest.flatten(
# self.keras_model.compiled_loss._losses
# )
# self.keras_model.compiled_loss(
# y, y_pred, sample_weight, regularization_losses=self.keras_model.losses
# )
# self.keras_model.compiled_metrics.update_state(y, y_pred, sample_weight)
# Collect metrics to return
return_metrics = {}
for metric in self.keras_model.metrics:
result = metric.result()
if isinstance(result, dict):
return_metrics.update(result)
else:
return_metrics[metric.name] = result
return return_metrics
[docs]
def fit(
self,
input_data,
labels,
inputs_err=None,
labels_err=None,
sample_weight=None,
):
"""
Train a Bayesian neural network
:param input_data: Data to be trained with neural network
:type input_data: ndarray
:param labels: Labels to be trained with neural network
:type labels: ndarray
:param inputs_err: Error for input_data (if any), same shape with input_data.
:type inputs_err: Union([NoneType, ndarray])
:param labels_err: Labels error (if any)
:type labels_err: Union([NoneType, ndarray])
:param sample_weight: Sample weights (if any)
:type sample_weight: Union([NoneType, ndarray])
:return: None
:rtype: NoneType
:History:
| 2018-Jan-06 - Written - Henry Leung (University of Toronto)
| 2018-Apr-12 - Updated - Henry Leung (University of Toronto)
"""
if inputs_err is None:
inputs_err = np.zeros_like(input_data)
if labels_err is None:
labels_err = np.zeros_like(labels)
# TODO: allow named inputs too??
input_data = {
"input": input_data,
"input_err": inputs_err,
"labels_err": labels_err,
}
labels = {"output": labels, "variance_output": labels}
# Call the checklist to create astroNN folder and save parameters
(
norm_data_training,
norm_data_val,
norm_labels_training,
norm_labels_val,
sample_weight_training,
sample_weight_val,
) = self.pre_training_checklist_child(input_data, labels, sample_weight)
# norm_data_training['labels_err'] = norm_data_training['labels_err'].filled(MAGIC_NUMBER).astype(np.float32)
# TODO: fix the monitor name
reduce_lr = ReduceLROnPlateau(
monitor="val_output_mean_absolute_error",
factor=0.5,
min_delta=self.reduce_lr_epsilon,
patience=self.reduce_lr_patience,
min_lr=self.reduce_lr_min,
mode="min",
verbose=self.verbose,
)
self.virtual_cvslogger = VirutalCSVLogger()
self.__callbacks = [
reduce_lr,
self.virtual_cvslogger,
] # default must have unchangeable callbacks
if self.callbacks is not None:
if isinstance(self.callbacks, list):
self.__callbacks.extend(self.callbacks)
else:
self.__callbacks.append(self.callbacks)
start_time = time.time()
self.history = self.keras_model.fit(
self.training_generator,
validation_data=self.validation_generator,
epochs=self.max_epochs,
verbose=self.verbose,
callbacks=self.__callbacks,
)
print(f"Completed Training, {(time.time() - start_time):.{2}f}s in total")
if self.autosave is True:
# Call the post training checklist to save parameters
self.save()
return None
[docs]
def fit_on_batch(
self, input_data, labels, inputs_err=None, labels_err=None, sample_weight=None
):
"""
Train a Bayesian neural network by running a single gradient update on all of your data, suitable for fine-tuning
:param input_data: Data to be trained with neural network
:type input_data: ndarray
:param labels: Labels to be trained with neural network
:type labels: ndarray
:param inputs_err: Error for input_data (if any), same shape with input_data.
:type inputs_err: Union([NoneType, ndarray])
:param labels_err: Labels error (if any)
:type labels_err: Union([NoneType, ndarray])
:param sample_weight: Sample weights (if any)
:type sample_weight: Union([NoneType, ndarray])
:return: None
:rtype: NoneType
:History:
| 2018-Aug-25 - Written - Henry Leung (University of Toronto)
"""
self.has_model_check()
if inputs_err is None:
inputs_err = np.zeros_like(input_data)
if labels_err is None:
labels_err = np.zeros_like(labels)
input_data = {
"input": input_data,
"input_err": inputs_err,
"labels_err": labels_err,
}
labels = {"output": labels, "variance_output": labels}
# check if exists (existing means the model has already been trained (e.g. fine-tuning), so we do not need calculate mean/std again)
if self.input_normalizer is None:
self.input_normalizer = Normalizer(
mode=self.input_norm_mode, verbose=self.verbose
)
self.labels_normalizer = Normalizer(
mode=self.labels_norm_mode, verbose=self.verbose
)
norm_data = self.input_normalizer.normalize(input_data)
self.input_mean, self.input_std = (
self.input_normalizer.mean_labels,
self.input_normalizer.std_labels,
)
norm_labels = self.labels_normalizer.normalize(labels)
self.labels_mean, self.labels_std = (
self.labels_normalizer.mean_labels,
self.labels_normalizer.std_labels,
)
else:
norm_data = self.input_normalizer.normalize(input_data, calc=False)
norm_labels = self.labels_normalizer.normalize(labels, calc=False)
# No need to care about Magic number as loss function looks for magic num in y_true only
norm_data.update(
{
"input_err": (input_data["input_err"] / self.input_std["input"]),
"labels_err": input_data["labels_err"] / self.labels_std["output"],
}
)
norm_labels.update({"variance_output": norm_labels["output"]})
start_time = time.time()
fit_generator = BayesianCNNDataGenerator(
batch_size=input_data["input"].shape[0],
shuffle=False,
steps_per_epoch=1,
data=[norm_data, norm_labels],
sample_weight=sample_weight,
)
score = self.keras_model.fit(
fit_generator,
epochs=1,
verbose=self.verbose,
)
print(
f"Completed Training on Batch, {(time.time() - start_time):.{2}f}s in total"
)
return None
def post_training_checklist_child(self):
self.keras_model.save(self.fullfilepath + _astroNN_MODEL_NAME)
print(
_astroNN_MODEL_NAME
+ f" saved to {(self.fullfilepath + _astroNN_MODEL_NAME)}"
)
self.hyper_txt.write(f"Dropout Rate: {self.dropout_rate} \n")
self.hyper_txt.flush()
self.hyper_txt.close()
data = {
"id": self.__class__.__name__
if self._model_identifier is None
else self._model_identifier,
"pool_length": self.pool_length,
"filterlen": self.filter_len,
"filternum": self.num_filters,
"hidden": self.num_hidden,
"input": self._input_shape,
"labels": self._labels_shape,
"task": self.task,
"last_layer_activation": self._last_layer_activation,
"activation": self.activation,
"input_mean": dict_np_to_dict_list(self.input_mean),
"inv_tau": self.inv_model_precision,
"length_scale": self.length_scale,
"labels_mean": dict_np_to_dict_list(self.labels_mean),
"input_std": dict_np_to_dict_list(self.input_std),
"labels_std": dict_np_to_dict_list(self.labels_std),
"valsize": self.val_size,
"targetname": self.targetname,
"dropout_rate": self.dropout_rate,
"l1": self.l1,
"l2": self.l2,
"maxnorm": self.maxnorm,
"input_norm_mode": self.input_normalizer.normalization_mode,
"labels_norm_mode": self.labels_normalizer.normalization_mode,
"input_names": self.input_names,
"output_names": self.output_names,
"batch_size": self.batch_size,
"aux_length": self.aux_length,
}
with open(self.fullfilepath + "/astroNN_model_parameter.json", "w") as f:
json.dump(data, f, indent=4, sort_keys=True)
[docs]
def predict(self, input_data, inputs_err=None, batch_size=None):
"""
Test model, High performance version designed for fast variational inference on GPU
:param input_data: Data to be inferred with neural network
:type input_data: ndarray
:param inputs_err: Error for input_data, same shape with input_data.
:type inputs_err: Union([NoneType, ndarray])
:return: prediction and prediction uncertainty
:History:
| 2018-Jan-06 - Written - Henry Leung (University of Toronto)
| 2018-Apr-12 - Updated - Henry Leung (University of Toronto)
"""
self.has_model_check()
if self.mc_num < 2:
raise AttributeError("mc_num cannot be smaller than 2")
# if no error array then just zeros
if inputs_err is None:
inputs_err = np.zeros_like(input_data)
else:
inputs_err = np.atleast_2d(inputs_err)
inputs_err /= self.input_std["input"]
# TODO: better way to handle named input
if "input_err" in [i.name for i in self.keras_model.inputs]:
input_data = {"input": input_data, "input_err": inputs_err}
else:
input_data = {"input": input_data}
input_data = self.pre_testing_checklist_master(input_data)
if self.input_normalizer is not None:
input_array = self.input_normalizer.normalize(input_data, calc=False)
else:
# Prevent shallow copy issue
input_array = np.array(input_data)
input_array -= self.input_mean["input"]
input_array /= self.input_std["input"]
total_test_num = input_data["input"].shape[0] # Number of testing data
if batch_size is None:
batch_size = self.batch_size
# for number of training data smaller than batch_size
batch_size = np.min([total_test_num, batch_size])
# Due to the nature of how generator works, no overlapped prediction
data_gen_shape = (total_test_num // batch_size) * batch_size
remainder_shape = total_test_num - data_gen_shape # Remainder from generator
norm_data_main = {}
norm_data_remainder = {}
for name in input_array.keys():
norm_data_main.update({name: input_array[name][:data_gen_shape]})
norm_data_remainder.update({name: input_array[name][data_gen_shape:]})
# Data Generator for prediction
with tqdm(total=total_test_num, unit="sample") as pbar:
pbar.set_postfix({"Monte-Carlo": self.mc_num})
pbar.set_description_str("Prediction progress: ")
prediction_generator = BayesianCNNPredDataGenerator(
batch_size=batch_size,
shuffle=False,
steps_per_epoch=data_gen_shape // batch_size,
data=[norm_data_main],
pbar=pbar,
)
new = FastMCInference(self.mc_num, self.keras_model_predict).transformed_model
result = new.predict(prediction_generator, verbose=0)
if not isinstance(result, dict):
raise TypeError("The output of the model must be a dictionary")
if remainder_shape != 0: # deal with remainder
remainder_generator = BayesianCNNPredDataGenerator(
batch_size=remainder_shape,
shuffle=False,
steps_per_epoch=1,
data=[norm_data_remainder],
)
pbar.update(remainder_shape)
remainder_result = new.predict(remainder_generator, verbose=0)
# if it is a dict, then concatenate the values of the dict
if isinstance(result, dict):
for key in result.keys():
result[key] = np.concatenate(
(result[key], remainder_result[key])
)
# if remainder_shape == 1:
# remainder_result = np.expand_dims(remainder_result, axis=0)
# result = np.concatenate((result, remainder_result))
# # in case only 1 test data point, in such case we need to add a dimension
# if result.ndim < 3 and batch_size == 1:
# result = np.expand_dims(result, axis=0)
predictions = result["output"][:, :, 0] # mean prediction
mc_dropout_uncertainty = result["output"][:, :, 1] * (
self.labels_std["output"] ** 2
) # model uncertainty
predictions_var = np.exp(result["variance_output"][:, :, 0]) * (
self.labels_std["output"] ** 2
) # predictive uncertainty
if self.labels_normalizer is not None:
predictions = self.labels_normalizer.denormalize(
list_to_dict([self.keras_model.output_names[0]], predictions)
)
predictions = predictions["output"]
else:
predictions *= self.labels_std["output"]
predictions += self.labels_mean["output"]
if self.task == "regression":
# Predictive variance
pred_var = (
predictions_var + mc_dropout_uncertainty
) # epistemic plus aleatoric uncertainty
pred_uncertainty = np.sqrt(pred_var) # Convert back to std error
# final correction from variance to standard derivation
mc_dropout_uncertainty = np.sqrt(mc_dropout_uncertainty)
predictive_uncertainty = np.sqrt(predictions_var)
elif self.task == "classification":
# we want entropy for classification uncertainty
predicted_class = np.argmax(predictions, axis=1)
mc_dropout_uncertainty = np.ones_like(predicted_class, dtype=float)
predictive_uncertainty = np.ones_like(predicted_class, dtype=float)
# center variance
predictions_var -= 1.0
for i in range(predicted_class.shape[0]):
all_prediction = np.array(predictions[i, :])
mc_dropout_uncertainty[i] = -np.sum(
all_prediction * np.log(all_prediction)
)
predictive_uncertainty[i] = predictions_var[i, predicted_class[i]]
pred_uncertainty = mc_dropout_uncertainty + predictive_uncertainty
# We only want the predicted class back
predictions = predicted_class
elif self.task == "binary_classification":
# we want entropy for classification uncertainty, so need prediction in logits space
mc_dropout_uncertainty = -np.sum(predictions * np.log(predictions), axis=0)
# need to activate before round to int so that the prediction is always 0 or 1
predictions = np.rint(sigmoid(predictions))
predictive_uncertainty = predictions_var
pred_uncertainty = mc_dropout_uncertainty + predictions_var
else:
raise AttributeError("Unknown Task")
return predictions, {
"total": pred_uncertainty,
"model": mc_dropout_uncertainty,
"predictive": predictive_uncertainty,
}
def predict_dataset(self, file):
class BayesianCNNPredDataGeneratorV2(GeneratorMaster):
def __init__(
self,
batch_size,
shuffle,
steps_per_epoch,
manual_reset=False,
pbar=None,
nn_model=None,
):
super().__init__(
batch_size=batch_size,
shuffle=shuffle,
steps_per_epoch=steps_per_epoch,
data=None,
manual_reset=manual_reset,
)
self.pbar = pbar
# initial idx
self.idx_list = self._get_exploration_order(range(len(file)))
self.current_idx = 0
self.nn_model = nn_model
def _data_generation(self, idx_list_temp):
# Generate data
inputs = self.nn_model.input_normalizer.normalize(
{
"input": file[idx_list_temp],
"input_err": np.zeros_like(file[idx_list_temp]),
},
calc=False,
)
x = self.input_d_checking(inputs, np.arange(len(idx_list_temp)))
return x
def __getitem__(self, index):
x = self._data_generation(
self.idx_list[
index * self.batch_size : (index + 1) * self.batch_size
]
)
if self.pbar:
self.pbar.update(self.batch_size)
return x
def on_epoch_end(self):
# shuffle the list when epoch ends for the next epoch
self.idx_list = self._get_exploration_order(range(len(file)))
self.has_model_check()
if self.mc_num < 2:
raise AttributeError("mc_num cannot be smaller than 2")
total_test_num = len(file) # Number of testing data
# for number of training data smaller than batch_size
if total_test_num < self.batch_size:
batch_size = total_test_num
else:
batch_size = self.batch_size
# Due to the nature of how generator works, no overlapped prediction
data_gen_shape = (total_test_num // batch_size) * batch_size
remainder_shape = total_test_num - data_gen_shape # Remainder from generator
# Data Generator for prediction
with tqdm(total=total_test_num, unit="sample") as pbar:
pbar.set_postfix({"Monte-Carlo": self.mc_num})
prediction_generator = BayesianCNNPredDataGeneratorV2(
batch_size=batch_size,
shuffle=False,
steps_per_epoch=data_gen_shape // batch_size,
pbar=pbar,
nn_model=self,
)
new = FastMCInference(self.mc_num, self.keras_model_predict).transformed_model
result = np.asarray(new.predict(prediction_generator))
if remainder_shape != 0: # deal with remainder
remainder_generator = BayesianCNNPredDataGeneratorV2(
batch_size=remainder_shape,
shuffle=False,
steps_per_epoch=1,
pbar=pbar,
nn_model=self,
)
remainder_result = np.asarray(new.predict(remainder_generator))
if remainder_shape == 1:
remainder_result = np.expand_dims(remainder_result, axis=0)
result = np.concatenate((result, remainder_result))
# in case only 1 test data point, in such case we need to add a dimension
if result.ndim < 3 and batch_size == 1:
result = np.expand_dims(result, axis=0)
half_first_dim = (
result.shape[1] // 2
) # result.shape[1] is guarantee an even number, otherwise sth is wrong
predictions = result[:, :half_first_dim, 0] # mean prediction
mc_dropout_uncertainty = result[:, :half_first_dim, 1] * (
self.labels_std["output"] ** 2
) # model uncertainty
predictions_var = np.exp(result[:, half_first_dim:, 0]) * (
self.labels_std["output"] ** 2
) # predictive uncertainty
if self.labels_normalizer is not None:
predictions = self.labels_normalizer.denormalize(
list_to_dict([self.keras_model.output_names[0]], predictions)
)
predictions = predictions["output"]
else:
predictions *= self.labels_std["output"]
predictions += self.labels_mean["output"]
if self.task == "regression":
# Predictive variance
pred_var = (
predictions_var + mc_dropout_uncertainty
) # epistemic plus aleatoric uncertainty
pred_uncertainty = np.sqrt(pred_var) # Convert back to std error
# final correction from variance to standard derivation
mc_dropout_uncertainty = np.sqrt(mc_dropout_uncertainty)
predictive_uncertainty = np.sqrt(predictions_var)
elif self.task == "classification":
# we want entropy for classification uncertainty
predicted_class = np.argmax(predictions, axis=1)
mc_dropout_uncertainty = np.ones_like(predicted_class, dtype=float)
predictive_uncertainty = np.ones_like(predicted_class, dtype=float)
# center variance
predictions_var -= 1.0
for i in range(predicted_class.shape[0]):
all_prediction = np.array(predictions[i, :])
mc_dropout_uncertainty[i] = -np.sum(
all_prediction * np.log(all_prediction)
)
predictive_uncertainty[i] = predictions_var[i, predicted_class[i]]
pred_uncertainty = mc_dropout_uncertainty + predictive_uncertainty
# We only want the predicted class back
predictions = predicted_class
elif self.task == "binary_classification":
# we want entropy for classification uncertainty, so need prediction in logits space
mc_dropout_uncertainty = -np.sum(predictions * np.log(predictions), axis=0)
# need to activate before round to int so that the prediction is always 0 or 1
predictions = np.rint(sigmoid(predictions))
predictive_uncertainty = predictions_var
pred_uncertainty = mc_dropout_uncertainty + predictions_var
else:
raise AttributeError("Unknown Task")
return predictions, {
"total": pred_uncertainty,
"model": mc_dropout_uncertainty,
"predictive": predictive_uncertainty,
}
[docs]
def evaluate(
self, input_data, labels, inputs_err=None, labels_err=None, batch_size=None
):
"""
Evaluate neural network by provided input data and labels and get back a metrics score
:param input_data: Data to be trained with neural network
:type input_data: ndarray
:param labels: Labels to be trained with neural network
:type labels: ndarray
:param inputs_err: Error for input_data (if any), same shape with input_data.
:type inputs_err: Union([NoneType, ndarray])
:param labels_err: Labels error (if any)
:type labels_err: Union([NoneType, ndarray])
:return: metrics score dictionary
:rtype: dict
:History: 2018-May-20 - Written - Henry Leung (University of Toronto)
"""
self.has_model_check()
if inputs_err is None:
inputs_err = np.zeros_like(input_data)
if labels_err is None:
labels_err = np.zeros_like(labels)
input_data = {"input": input_data}
labels = {"output": labels}
# check if exists (existing means the model has already been trained (e.g. fine-tuning), so we do not need calculate mean/std again)
if self.input_normalizer is None:
self.input_normalizer = Normalizer(
mode=self.input_norm_mode, verbose=self.verbose
)
self.labels_normalizer = Normalizer(
mode=self.labels_norm_mode, verbose=self.verbose
)
norm_data = self.input_normalizer.normalize(input_data)
self.input_mean, self.input_std = (
self.input_normalizer.mean_labels,
self.input_normalizer.std_labels,
)
norm_labels = self.labels_normalizer.normalize(labels)
self.labels_mean, self.labels_std = (
self.labels_normalizer.mean_labels,
self.labels_normalizer.std_labels,
)
else:
norm_data = self.input_normalizer.normalize(input_data, calc=False)
norm_labels = self.labels_normalizer.normalize(labels, calc=False)
# No need to care about Magic number as loss function looks for magic num in y_true only
norm_input_err = inputs_err / self.input_std["input"]
norm_labels_err = labels_err / self.labels_std["output"]
if "input_err" in [i.name for i in self.keras_model.inputs]:
norm_data.update(
{"input_err": norm_input_err, "labels_err": norm_labels_err}
)
else:
norm_data.update({"labels_err": norm_labels_err})
norm_labels.update({"variance_output": norm_labels["output"]})
total_num = input_data["input"].shape[0]
if batch_size is None:
batch_size = self.batch_size
batch_size = np.min([total_num, batch_size])
steps = total_num // self.batch_size if total_num > self.batch_size else 1
start_time = time.time()
print("Starting Evaluation")
evaluate_generator = BayesianCNNDataGenerator(
batch_size=batch_size,
shuffle=False,
steps_per_epoch=steps,
data=[norm_data, norm_labels],
)
scores = self.keras_model.evaluate(evaluate_generator)
if isinstance(scores, float): # make sure scores is iterable
scores = list(str(scores))
outputname = self.keras_model.output_names
funcname = self.keras_model.metrics_names
print(f"Completed Evaluation, {(time.time() - start_time):.{2}f}s elapsed")
return list_to_dict(funcname, scores)