Source code for astroNN.models.base_vae

import json
import os
import time
from abc import ABC

import numpy as np
from tqdm import tqdm
from tensorflow import keras as tfk
from astroNN.config import MULTIPROCESS_FLAG
from astroNN.config import _astroNN_MODEL_NAME
from astroNN.datasets import H5Loader
from astroNN.models.base_master_nn import NeuralNetMaster
from astroNN.nn.callbacks import VirutalCSVLogger
from astroNN.nn.losses import (
    mean_squared_error,
    mean_error,
    mean_absolute_error,
    mean_squared_reconstruction_error,
)
from astroNN.nn.utilities import Normalizer
from astroNN.nn.utilities.generator import GeneratorMaster
from astroNN.shared.dict_tools import dict_np_to_dict_list, list_to_dict
from astroNN.shared.warnings import deprecated, deprecated_copy_signature
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow.python.keras.engine import data_adapter
from tensorflow.python.util import nest

regularizers = tfk.regularizers
ReduceLROnPlateau = tfk.callbacks.ReduceLROnPlateau
Adam = tfk.optimizers.Adam


class CVAEDataGenerator(GeneratorMaster):
    """
    Data generator to supply data to the neural network during training

    :param batch_size: batch size
    :type batch_size: int
    :param shuffle: Whether to shuffle batches or not
    :type shuffle: bool
    :param steps_per_epoch: Number of batches to yield per epoch
    :type steps_per_epoch: int
    :param data: List of data for the NN
    :type data: list
    :param manual_reset: Whether the generator needs to be reset manually; usually handled by TensorFlow
    :type manual_reset: bool
    :param sample_weight: Sample weights (if any)
    :type sample_weight: Union([NoneType, ndarray])
    :History:
        | 2017-Dec-02 - Written - Henry Leung (University of Toronto)
        | 2019-Feb-17 - Updated - Henry Leung (University of Toronto)
    """

    def __init__(
        self,
        batch_size,
        shuffle,
        steps_per_epoch,
        data,
        manual_reset=False,
        sample_weight=None,
    ):
        super().__init__(
            batch_size=batch_size,
            shuffle=shuffle,
            steps_per_epoch=steps_per_epoch,
            data=data,
            manual_reset=manual_reset,
        )
        self.inputs = self.data[0]
        self.recon_inputs = self.data[1]
        self.sample_weight = sample_weight

        # initial idx
        self.idx_list = self._get_exploration_order(
            range(self.inputs["input"].shape[0])
        )

    def _data_generation(self, idx_list_temp):
        x = self.input_d_checking(self.inputs, idx_list_temp)
        y = self.input_d_checking(self.recon_inputs, idx_list_temp)
        if self.sample_weight is not None:
            return x, y, self.sample_weight[idx_list_temp]
        else:
            return x, y

    def __getitem__(self, index):
        return self._data_generation(
            self.idx_list[index * self.batch_size : (index + 1) * self.batch_size]
        )

    def on_epoch_end(self):
        # shuffle the list when epoch ends for the next epoch
        self.idx_list = self._get_exploration_order(
            range(self.inputs["input"].shape[0])
        )

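# A minimal usage sketch of CVAEDataGenerator (array names and shapes here are
# hypothetical, not defined in this module). The generator slices a shuffled
# index list into batches of `batch_size` and yields (x, y) pairs keyed by
# layer name, plus sample weights if given:
#
#     gen = CVAEDataGenerator(
#         batch_size=32,
#         shuffle=True,
#         steps_per_epoch=x_train.shape[0] // 32,
#         data=[{"input": x_train}, {"output": y_train}],
#     )
#     x_batch, y_batch = gen[0]  # first batch of 32 samples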

class CVAEPredDataGenerator(GeneratorMaster):
    """
    Data generator to supply data to the neural network for prediction

    :param batch_size: batch size
    :type batch_size: int
    :param shuffle: Whether to shuffle batches or not
    :type shuffle: bool
    :param steps_per_epoch: Number of batches to yield per epoch
    :type steps_per_epoch: int
    :param data: List of data for the NN
    :type data: list
    :param key_name: key name for the input data, defaults to "input"
    :type key_name: str
    :param manual_reset: Whether the generator needs to be reset manually; usually handled by TensorFlow
    :type manual_reset: bool
    :param pbar: tqdm progress bar
    :type pbar: obj
    :History:
        | 2017-Dec-02 - Written - Henry Leung (University of Toronto)
        | 2019-Feb-17 - Updated - Henry Leung (University of Toronto)
    """

    def __init__(
        self,
        batch_size,
        shuffle,
        steps_per_epoch,
        data,
        key_name="input",
        manual_reset=True,
        pbar=None,
    ):
        super().__init__(
            batch_size=batch_size,
            shuffle=shuffle,
            steps_per_epoch=steps_per_epoch,
            data=data,
            manual_reset=manual_reset,
        )
        self.inputs = self.data[0]
        self.pbar = pbar
        self.input_key_name = key_name

        # initial idx
        self.idx_list = self._get_exploration_order(
            range(self.inputs[self.input_key_name].shape[0])
        )
        self.current_idx = -1

    def _data_generation(self, idx_list_temp):
        # Generate data
        x = self.input_d_checking(self.inputs, idx_list_temp)
        return x

    def __getitem__(self, index):
        x = self._data_generation(
            self.idx_list[index * self.batch_size : (index + 1) * self.batch_size]
        )
        if self.pbar and index > self.current_idx:
            self.pbar.update(self.batch_size)
        self.current_idx = index
        return x

    def on_epoch_end(self):
        # shuffle the list when epoch ends for the next epoch
        self.idx_list = self._get_exploration_order(
            range(self.inputs[self.input_key_name].shape[0])
        )

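# A minimal usage sketch of CVAEPredDataGenerator with a tqdm progress bar
# (array names are hypothetical). Unlike CVAEDataGenerator it yields inputs
# only, and advances the bar as batches are consumed:
#
#     with tqdm(total=x_test.shape[0], unit="sample") as pbar:
#         gen = CVAEPredDataGenerator(
#             batch_size=32,
#             shuffle=False,
#             steps_per_epoch=x_test.shape[0] // 32,
#             data=[{"input": x_test}],
#             pbar=pbar,
#         )
#         x_batch = gen[0]  # inputs only, no reconstruction target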

class ConvVAEBase(NeuralNetMaster, ABC):
    """
    Top-level class for a Convolutional Variational Autoencoder

    :History: 2018-Jan-06 - Written - Henry Leung (University of Toronto)
    """

    def __init__(self):
        super().__init__()
        self.name = "Convolutional Variational Autoencoder"
        self._model_type = "CVAE"
        self.initializer = None
        self.activation = None
        self._last_layer_activation = None
        self.num_filters = None
        self.filter_len = None
        self.pool_length = None
        self.num_hidden = None
        self.reduce_lr_epsilon = None
        self.reduce_lr_min = None
        self.reduce_lr_patience = None
        self.l1 = None
        self.l2 = None
        self.maxnorm = None
        self.latent_dim = None
        self.val_size = 0.1
        self.dropout_rate = 0.0

        self.keras_vae = None
        self.keras_encoder = None
        self.keras_decoder = None
        self.loss = None

        self._input_shape = None

        self.input_norm_mode = 255
        self.labels_norm_mode = 255
        self.input_mean = None
        self.input_std = None
        self.labels_mean = None
        self.labels_std = None

    def compile(
        self,
        optimizer=None,
        loss=None,
        metrics=None,
        weighted_metrics=None,
        loss_weights=None,
        sample_weight_mode=None,
    ):
        self.keras_encoder, self.keras_decoder = self.model()
        self.keras_model = tfk.Model(
            inputs=[self.keras_encoder.inputs],
            outputs=[self.keras_decoder(self.keras_encoder.outputs[2])],
        )

        if optimizer is not None:
            self.optimizer = optimizer
        elif self.optimizer is None or self.optimizer == "adam":
            self.optimizer = Adam(
                learning_rate=self.lr,
                beta_1=self.beta_1,
                beta_2=self.beta_2,
                epsilon=self.optimizer_epsilon,
            )
        if metrics is not None:
            self.metrics = metrics
        self.loss = (
            mean_squared_reconstruction_error if not (loss and self.loss) else loss
        )
        # self.metrics = [mean_absolute_error, mean_error] if not self.metrics else self.metrics
        self.metrics = []
        self.keras_model.compile(
            loss=self.loss,
            optimizer=self.optimizer,
            metrics=self.metrics,
            weighted_metrics=weighted_metrics,
            loss_weights=loss_weights,
            sample_weight_mode=sample_weight_mode,
        )
        self.keras_model.total_loss_tracker = tfk.metrics.Mean(name="loss")
        self.keras_model.reconstruction_loss_tracker = tfk.metrics.Mean(
            name="reconstruction_loss"
        )
        self.keras_model.kl_loss_tracker = tfk.metrics.Mean(name="kl_loss")

        # inject custom training step if needed
        try:
            self.custom_train_step()
        except NotImplementedError:
            pass
        except TypeError:
            self.keras_model.train_step = self.custom_train_step

        # inject custom testing step if needed
        try:
            self.custom_test_step()
        except NotImplementedError:
            pass
        except TypeError:
            self.keras_model.test_step = self.custom_test_step

        return None
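
    # Note on the try/except blocks above (explanatory comment, not astroNN's
    # own): calling self.custom_train_step() with no argument raises TypeError
    # when the method is implemented with a `data` parameter, which signals
    # that a custom step exists and should be injected as the Keras train_step;
    # a NotImplementedError means no custom step is defined, so the stock Keras
    # training loop is kept.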

    def recompile(
        self,
        loss=None,
        weighted_metrics=None,
        loss_weights=None,
        sample_weight_mode=None,
    ):
        """
        To be used when you need to recompile an already existing model
        """
        self.keras_model.compile(
            loss=self.loss,
            optimizer=self.optimizer,
            metrics=self.metrics,
            weighted_metrics=weighted_metrics,
            loss_weights=loss_weights,
            sample_weight_mode=sample_weight_mode,
        )

    def custom_train_step(self, data):
        """
        Custom training logic

        :param data: a batch of training data as packed by the data generator
        :return: dict mapping metric names to their current values
        """
        data = data_adapter.expand_1d(data)
        x, y, sample_weight = data_adapter.unpack_x_y_sample_weight(data)
        # TODO: properly fix this
        y = y["output"]

        # Run forward pass.
        with tf.GradientTape() as tape:
            z_mean, z_log_var, z = self.keras_encoder(x, training=True)
            y_pred = self.keras_decoder(z, training=True)
            reconstruction_loss = self.loss(y, y_pred, sample_weight=sample_weight)
            kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
            kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
            total_loss = reconstruction_loss + kl_loss
        # Run backwards pass.
        grads = tape.gradient(total_loss, self.keras_model.trainable_weights)
        self.keras_model.optimizer.apply_gradients(
            zip(grads, self.keras_model.trainable_weights)
        )
        # self.keras_model.compiled_metrics.update_state(y, y_pred, sample_weight)
        self.keras_model.total_loss_tracker.update_state(total_loss)
        self.keras_model.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.keras_model.kl_loss_tracker.update_state(kl_loss)
        return_metrics = {
            "loss": self.keras_model.total_loss_tracker.result(),
            "reconstruction_loss": self.keras_model.reconstruction_loss_tracker.result(),
            "kl_loss": self.keras_model.kl_loss_tracker.result(),
        }
        # Collect metrics to return
        for metric in self.keras_model.metrics:
            result = metric.result()
            if isinstance(result, dict):
                return_metrics.update(result)
            else:
                return_metrics[metric.name] = result
        return return_metrics
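
    # For reference (a standard VAE identity, not specific to astroNN): the KL
    # term above is the closed-form divergence between the diagonal Gaussian
    # N(z_mean, exp(z_log_var)) and the standard normal prior N(0, I),
    #
    #     KL = -0.5 * sum_j (1 + log(sigma_j^2) - mu_j^2 - sigma_j^2)
    #
    # summed over latent dimensions j, then averaged over the batch.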

    def custom_test_step(self, data):
        data = data_adapter.expand_1d(data)
        x, y, sample_weight = data_adapter.unpack_x_y_sample_weight(data)
        y = y["output"]

        z_mean, z_log_var, z = self.keras_encoder(x, training=False)
        y_pred = self.keras_decoder(z, training=False)
        reconstruction_loss = self.loss(y, y_pred, sample_weight=sample_weight)
        kl_loss = -0.5 * (1 + z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
        kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
        total_loss = reconstruction_loss + kl_loss
        self.keras_model.total_loss_tracker.update_state(total_loss)
        self.keras_model.reconstruction_loss_tracker.update_state(reconstruction_loss)
        self.keras_model.kl_loss_tracker.update_state(kl_loss)
        return_metrics = {
            "loss": self.keras_model.total_loss_tracker.result(),
            "reconstruction_loss": self.keras_model.reconstruction_loss_tracker.result(),
            "kl_loss": self.keras_model.kl_loss_tracker.result(),
        }
        for metric in self.keras_model.metrics:
            result = metric.result()
            if isinstance(result, dict):
                return_metrics.update(result)
            else:
                return_metrics[metric.name] = result
        return return_metrics

    def pre_training_checklist_child(
        self, input_data, input_recon_target, sample_weight
    ):
        if self.task == "classification":
            raise RuntimeError("astroNN VAE does not support classification task")
        elif self.task == "binary_classification":
            raise RuntimeError(
                "astroNN VAE does not support binary classification task"
            )

        input_data, input_recon_target = self.pre_training_checklist_master(
            input_data, input_recon_target
        )

        if isinstance(input_data, H5Loader):
            self.targetname = input_data.target
            input_data, input_recon_target = input_data.load()

        # check if exists (existing means the model has already been trained
        # (e.g. fine-tuning), so we do not need to calculate mean/std again)
        if self.input_normalizer is None:
            self.input_normalizer = Normalizer(
                mode=self.input_norm_mode, verbose=self.verbose
            )
            self.labels_normalizer = Normalizer(
                mode=self.labels_norm_mode, verbose=self.verbose
            )

            norm_data = self.input_normalizer.normalize(input_data)
            self.input_mean, self.input_std = (
                self.input_normalizer.mean_labels,
                self.input_normalizer.std_labels,
            )
            norm_labels = self.labels_normalizer.normalize(input_recon_target)
            self.labels_mean, self.labels_std = (
                self.labels_normalizer.mean_labels,
                self.labels_normalizer.std_labels,
            )
        else:
            norm_data = self.input_normalizer.normalize(input_data, calc=False)
            norm_labels = self.labels_normalizer.normalize(
                input_recon_target, calc=False
            )

        if (
            self.keras_model is None
        ):  # only compile if there is no keras_model, e.g. fine-tuning does not require it
            self.compile()

        norm_data = self._tensor_dict_sanitize(norm_data, self.keras_model.input_names)
        norm_labels = self._tensor_dict_sanitize(
            norm_labels, self.keras_model.output_names
        )

        if self.has_val:
            self.train_idx, self.val_idx = train_test_split(
                np.arange(self.num_train + self.val_num), test_size=self.val_size
            )
        else:
            self.train_idx = np.arange(self.num_train + self.val_num)
            # just dummy, to minimize modification needed
            self.val_idx = np.arange(self.num_train + self.val_num)[:2]

        norm_data_training = {}
        norm_data_val = {}
        norm_labels_training = {}
        norm_labels_val = {}
        for name in norm_data.keys():
            norm_data_training.update({name: norm_data[name][self.train_idx]})
            norm_data_val.update({name: norm_data[name][self.val_idx]})
        for name in norm_labels.keys():
            norm_labels_training.update({name: norm_labels[name][self.train_idx]})
            norm_labels_val.update({name: norm_labels[name][self.val_idx]})
        if sample_weight is not None:
            sample_weight_training = sample_weight[self.train_idx]
            sample_weight_val = sample_weight[self.val_idx]
        else:
            sample_weight_training = None
            sample_weight_val = None

        self.training_generator = CVAEDataGenerator(
            batch_size=self.batch_size,
            shuffle=True,
            steps_per_epoch=self.num_train // self.batch_size,
            data=[norm_data_training, norm_labels_training],
            manual_reset=False,
            sample_weight=sample_weight_training,
        )
        if self.has_val:
            val_batchsize = (
                self.batch_size
                if len(self.val_idx) > self.batch_size
                else len(self.val_idx)
            )
            self.validation_generator = CVAEDataGenerator(
                batch_size=val_batchsize,
                shuffle=True,
                steps_per_epoch=max(self.val_num // self.batch_size, 1),
                data=[norm_data_val, norm_labels_val],
                manual_reset=True,
                sample_weight=sample_weight_val,
            )

        return input_data, input_recon_target
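
    # A tentative note on normalization (based on the defaults set in __init__,
    # not documented in this file): input_norm_mode=255 selects one of astroNN's
    # Normalizer modes, and normalize(..., calc=False) reuses the stored
    # mean/std rather than recomputing them, which keeps fine-tuning on new data
    # consistent with the scaling learned at original training time.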

    def fit(self, input_data, input_recon_target, sample_weight=None):
        """
        Train a Convolutional Variational Autoencoder

        :param input_data: Data to be trained with neural network
        :type input_data: ndarray
        :param input_recon_target: Data to be reconstructed
        :type input_recon_target: ndarray
        :param sample_weight: Sample weights (if any)
        :type sample_weight: Union([NoneType, ndarray])
        :return: None
        :rtype: NoneType
        :History: 2017-Dec-06 - Written - Henry Leung (University of Toronto)
        """
        # Call the checklist to create astroNN folder and save parameters
        self.pre_training_checklist_child(
            input_data, input_recon_target, sample_weight
        )

        reduce_lr = ReduceLROnPlateau(
            monitor="loss",
            factor=0.5,
            min_delta=self.reduce_lr_epsilon,
            patience=self.reduce_lr_patience,
            min_lr=self.reduce_lr_min,
            mode="min",
            verbose=self.verbose,
        )

        self.virtual_cvslogger = VirutalCSVLogger()

        self.__callbacks = [
            reduce_lr,
            self.virtual_cvslogger,
        ]  # default must have unchangeable callbacks

        if self.callbacks is not None:
            if isinstance(self.callbacks, list):
                self.__callbacks.extend(self.callbacks)
            else:
                self.__callbacks.append(self.callbacks)

        start_time = time.time()

        self.keras_model.fit(
            self.training_generator,
            validation_data=self.validation_generator,
            epochs=self.max_epochs,
            verbose=self.verbose,
            workers=os.cpu_count(),
            callbacks=self.__callbacks,
            use_multiprocessing=MULTIPROCESS_FLAG,
        )

        print(f"Completed Training, {(time.time() - start_time):.2f}s in total")

        if self.autosave is True:
            # Call the post training checklist to save parameters
            self.save()

        return None
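
    # A hypothetical end-to-end usage sketch (the subclass name and data arrays
    # are assumed for illustration, not defined in this module):
    #
    #     net = SomeConvVAESubclass()    # any concrete ConvVAEBase subclass
    #     net.max_epochs = 20
    #     net.latent_dim = 2
    #     net.fit(x_spectra, x_spectra)  # reconstruction target = input itself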

    def fit_on_batch(self, input_data, input_recon_target, sample_weight=None):
        """
        Train an Autoencoder by running a single gradient update on all of your data, suitable for fine-tuning

        :param input_data: Data to be trained with neural network
        :type input_data: ndarray
        :param input_recon_target: Data to be reconstructed
        :type input_recon_target: ndarray
        :param sample_weight: Sample weights (if any)
        :type sample_weight: Union([NoneType, ndarray])
        :return: None
        :rtype: NoneType
        :History: 2018-Aug-25 - Written - Henry Leung (University of Toronto)
        """
        input_data, input_recon_target = self.pre_training_checklist_master(
            input_data, input_recon_target
        )

        # check if exists (existing means the model has already been trained
        # (e.g. fine-tuning), so we do not need to calculate mean/std again)
        if self.input_normalizer is None:
            self.input_normalizer = Normalizer(
                mode=self.input_norm_mode, verbose=self.verbose
            )
            self.labels_normalizer = Normalizer(
                mode=self.labels_norm_mode, verbose=self.verbose
            )

            norm_data = self.input_normalizer.normalize(input_data)
            self.input_mean, self.input_std = (
                self.input_normalizer.mean_labels,
                self.input_normalizer.std_labels,
            )
            norm_labels = self.labels_normalizer.normalize(input_recon_target)
            self.labels_mean, self.labels_std = (
                self.labels_normalizer.mean_labels,
                self.labels_normalizer.std_labels,
            )
        else:
            norm_data = self.input_normalizer.normalize(input_data, calc=False)
            norm_labels = self.labels_normalizer.normalize(
                input_recon_target, calc=False
            )

        norm_data = self._tensor_dict_sanitize(norm_data, self.keras_model.input_names)
        norm_labels = self._tensor_dict_sanitize(
            norm_labels, self.keras_model.output_names
        )

        start_time = time.time()

        fit_generator = CVAEDataGenerator(
            batch_size=input_data["input"].shape[0],
            shuffle=False,
            steps_per_epoch=1,
            data=[norm_data, norm_labels],
            sample_weight=sample_weight,
        )

        scores = self.keras_model.fit(
            fit_generator,
            epochs=1,
            verbose=self.verbose,
            workers=os.cpu_count(),
            use_multiprocessing=MULTIPROCESS_FLAG,
        )

        print(
            f"Completed Training on Batch, {(time.time() - start_time):.2f}s in total"
        )

        return None

    def post_training_checklist_child(self):
        self.keras_model.save(self.fullfilepath + _astroNN_MODEL_NAME)
        print(
            _astroNN_MODEL_NAME
            + f" saved to {(self.fullfilepath + _astroNN_MODEL_NAME)}"
        )

        self.hyper_txt.write(f"Dropout Rate: {self.dropout_rate} \n")
        self.hyper_txt.flush()
        self.hyper_txt.close()

        data = {
            "id": self.__class__.__name__,
            "pool_length": self.pool_length,
            "filterlen": self.filter_len,
            "filternum": self.num_filters,
            "hidden": self.num_hidden,
            "input": self._input_shape,
            "labels": self._labels_shape,
            "task": self.task,
            "activation": self.activation,
            "input_mean": dict_np_to_dict_list(self.input_mean),
            "labels_mean": dict_np_to_dict_list(self.labels_mean),
            "input_std": dict_np_to_dict_list(self.input_std),
            "labels_std": dict_np_to_dict_list(self.labels_std),
            "valsize": self.val_size,
            "targetname": self.targetname,
            "dropout_rate": self.dropout_rate,
            "l1": self.l1,
            "l2": self.l2,
            "maxnorm": self.maxnorm,
            "input_norm_mode": self.input_normalizer.normalization_mode,
            "labels_norm_mode": self.labels_normalizer.normalization_mode,
            "batch_size": self.batch_size,
            "latent": self.latent_dim,
        }

        with open(self.fullfilepath + "/astroNN_model_parameter.json", "w") as f:
            json.dump(data, f, indent=4, sort_keys=True)

    def predict(self, input_data):
        """
        Use the neural network to do inference and get reconstructed data

        :param input_data: Data to be inferred with neural network
        :type input_data: ndarray
        :return: reconstructed data
        :rtype: ndarray
        :History: 2017-Dec-06 - Written - Henry Leung (University of Toronto)
        """
        input_data = self.pre_testing_checklist_master(input_data)

        if self.input_normalizer is not None:
            input_array = self.input_normalizer.normalize(input_data, calc=False)
        else:
            # Prevent shallow copy issue
            input_array = np.array(input_data)
            input_array -= self.input_mean
            input_array /= self.input_std

        total_test_num = input_data["input"].shape[0]  # Number of testing data

        # for number of training data smaller than batch_size
        if total_test_num < self.batch_size:
            self.batch_size = total_test_num

        # Due to the nature of how generator works, no overlapped prediction
        data_gen_shape = (total_test_num // self.batch_size) * self.batch_size
        remainder_shape = total_test_num - data_gen_shape  # Remainder from generator

        predictions = np.zeros((total_test_num, self._labels_shape["output"], 1))

        norm_data_main = {}
        norm_data_remainder = {}
        for name in input_array.keys():
            norm_data_main.update({name: input_array[name][:data_gen_shape]})
            norm_data_remainder.update({name: input_array[name][data_gen_shape:]})

        norm_data_main = self._tensor_dict_sanitize(
            norm_data_main, self.keras_model.input_names
        )
        norm_data_remainder = self._tensor_dict_sanitize(
            norm_data_remainder, self.keras_model.input_names
        )

        # Data Generator for prediction
        with tqdm(total=total_test_num, unit="sample") as pbar:
            pbar.set_description_str("Prediction progress: ")
            prediction_generator = CVAEPredDataGenerator(
                batch_size=self.batch_size,
                shuffle=False,
                steps_per_epoch=total_test_num // self.batch_size,
                data=[norm_data_main],
                pbar=pbar,
            )
            result = np.asarray(
                self.keras_model.predict(prediction_generator, verbose=0)
            )

            if remainder_shape != 0:
                remainder_generator = CVAEPredDataGenerator(
                    batch_size=remainder_shape,
                    shuffle=False,
                    steps_per_epoch=1,
                    data=[norm_data_remainder],
                )
                pbar.update(remainder_shape)
                remainder_result = np.asarray(
                    self.keras_model.predict(remainder_generator, verbose=0)
                )
                result = np.concatenate((result, remainder_result))

        predictions[:] = result

        if self.labels_normalizer is not None:
            # TODO: handle named output in the future
            predictions[:, :, 0] = self.labels_normalizer.denormalize(
                list_to_dict(self.keras_model.output_names, predictions[:, :, 0])
            )["output"]
        else:
            predictions[:, :, 0] *= self.labels_std
            predictions[:, :, 0] += self.labels_mean

        return predictions
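
    # Shape note (inferred from the code above, stated tentatively): predict()
    # returns an array of shape (num_samples, labels_shape, 1), so a plain 2D
    # reconstruction is recovered with, e.g. (hypothetical `net` and `x_test`):
    #
    #     recon = net.predict(x_test)[:, :, 0]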

    def predict_encoder(self, input_data):
        """
        Use the encoder to get the hidden layer encoding/representation

        :param input_data: Data to be inferred with neural network
        :type input_data: ndarray
        :return: hidden layer encoding/representation mean and std
        :rtype: ndarray
        :History: 2017-Dec-06 - Written - Henry Leung (University of Toronto)
        """
        input_data = self.pre_testing_checklist_master(input_data)

        if self.input_normalizer is not None:
            input_array = self.input_normalizer.normalize(input_data, calc=False)
        else:
            # Prevent shallow copy issue
            input_array = np.array(input_data)
            input_array -= self.input_mean
            input_array /= self.input_std

        total_test_num = input_data["input"].shape[0]  # Number of testing data

        # for number of training data smaller than batch_size
        if total_test_num < self.batch_size:
            self.batch_size = input_data["input"].shape[0]

        # Due to the nature of how generator works, no overlapped prediction
        data_gen_shape = (total_test_num // self.batch_size) * self.batch_size
        remainder_shape = total_test_num - data_gen_shape  # Remainder from generator

        norm_data_main = {}
        norm_data_remainder = {}
        for name in input_array.keys():
            norm_data_main.update({name: input_array[name][:data_gen_shape]})
            norm_data_remainder.update({name: input_array[name][data_gen_shape:]})

        encoding_mean = np.zeros((total_test_num, self.latent_dim))
        encoding_uncertainty = np.zeros((total_test_num, self.latent_dim))
        encoding = np.zeros((total_test_num, self.latent_dim))

        # Data Generator for prediction
        with tqdm(total=total_test_num, unit="sample") as pbar:
            pbar.set_description_str("Prediction progress: ")
            prediction_generator = CVAEPredDataGenerator(
                batch_size=self.batch_size,
                shuffle=False,
                steps_per_epoch=total_test_num // self.batch_size,
                data=[norm_data_main],
                pbar=pbar,
            )
            z_mean, z_log_var, z = np.asarray(
                self.keras_encoder.predict(prediction_generator, verbose=0)
            )
            encoding_mean[:data_gen_shape] = z_mean
            encoding_uncertainty[:data_gen_shape] = np.exp(0.5 * z_log_var)
            encoding[:data_gen_shape] = z

            if remainder_shape != 0:
                # assume it is caused by mono images, so need to expand dim by 1
                for name in input_array.keys():
                    if len(norm_data_remainder[name][0].shape) != len(
                        self._input_shape[name]
                    ):
                        norm_data_remainder.update(
                            {name: np.expand_dims(norm_data_remainder[name], axis=-1)}
                        )
                z_mean, z_log_var, z = self.keras_encoder.predict(
                    norm_data_remainder, verbose=0
                )
                pbar.update(remainder_shape)
                encoding_mean[data_gen_shape:] = z_mean
                encoding_uncertainty[data_gen_shape:] = np.exp(0.5 * z_log_var)
                encoding[data_gen_shape:] = z

        return encoding_mean, encoding_uncertainty, encoding
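
    # Usage sketch (hypothetical names): the three returned arrays all have
    # shape (num_samples, latent_dim); the uncertainty is the Gaussian standard
    # deviation recovered from the log-variance, sigma = exp(0.5 * z_log_var):
    #
    #     z_mean, z_std, z_sample = net.predict_encoder(x_test)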

    def predict_decoder(self, z):
        """
        Use the decoder to reconstruct output from latent space vectors

        :param z: Latent space vectors
        :type z: ndarray
        :return: output reconstruction
        :rtype: ndarray
        :History: 2022-Dec-08 - Written - Henry Leung (University of Toronto)
        """
        # TODO: need to do checklist
        recon = np.asarray(self.keras_decoder.predict(z, batch_size=self.batch_size))
        # total_test_num = z.shape[0]  # Number of testing data
        # prediction_generator = CVAEPredDataGenerator(
        #     batch_size=self.batch_size,
        #     shuffle=False,
        #     steps_per_epoch=total_test_num // self.batch_size,
        #     data=[{"decoder_input": z}],
        #     key_name="decoder_input",
        # )
        # recon = np.asarray(
        #     self.keras_decoder.predict(prediction_generator, verbose=0)
        # )
        recon_denorm = self.labels_normalizer.denormalize(
            list_to_dict(self.keras_decoder.output_names, recon)
        )["output"]
        return recon_denorm
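
    # Usage sketch (hypothetical names): sampling from the standard normal
    # prior and decoding generates new outputs, assuming training with the KL
    # term above has pulled the latent space toward N(0, I):
    #
    #     z = np.random.normal(size=(16, net.latent_dim))
    #     generated = net.predict_decoder(z)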

    def jacobian_latent(self, x=None, mean_output=False, mc_num=1, denormalize=False):
        """
        | Calculate the Jacobian of the latent space with respect to the input, high performance calculation update on 15 April 2018
        |
        | Please notice that the de-normalization (if True) assumes the output depends on the input data to first order,
        | in which case the equation is simply the Jacobian divided by the input scaling, usually a good approximation if you use ReLU throughout

        :param x: Input Data
        :type x: ndarray
        :param mean_output: False to get all jacobian, True to get the mean
        :type mean_output: boolean
        :param mc_num: Number of monte carlo integration
        :type mc_num: int
        :param denormalize: De-normalize Jacobian
        :type denormalize: bool
        :return: An array of Jacobian
        :rtype: ndarray
        :History:
            | 2017-Nov-20 - Written - Henry Leung (University of Toronto)
            | 2018-Apr-15 - Updated - Henry Leung (University of Toronto)
        """
        self.has_model_check()
        if x is None:
            raise ValueError("Please provide data to calculate the jacobian")
        if mc_num < 1 or isinstance(mc_num, float):
            raise ValueError("mc_num must be a positive integer")
        if self.input_normalizer is not None:
            x_data = self.input_normalizer.normalize({"input": x}, calc=False)
            x_data = x_data["input"]
        else:
            # Prevent shallow copy issue
            x_data = np.array(x)
            x_data -= self.input_mean
            x_data /= self.input_std

        _model = None
        try:
            input_tens = self.keras_model_predict.get_layer("input").input
            output_tens = self.keras_model_predict.get_layer("z_mean").output
            input_shape_expectation = self.keras_model_predict.get_layer(
                "input"
            ).input_shape
            output_shape_expectation = self.keras_model_predict.get_layer(
                "z_mean"
            ).output_shape
            _model = self.keras_encoder
        except AttributeError:
            input_tens = self.keras_model.get_layer("input").input
            output_tens = self.keras_model.get_layer("z_mean").output
            input_shape_expectation = self.keras_model.get_layer("input").input_shape
            output_shape_expectation = self.keras_model.get_layer(
                "z_mean"
            ).output_shape
            _model = self.keras_encoder
        except ValueError:
            raise ValueError(
                "astroNN expects the input layer to be named 'input' and the output layer to be named 'z_mean', "
                "but none is found."
            )

        if len(input_shape_expectation) == 1:
            input_shape_expectation = input_shape_expectation[0]

        # just in case only 1 data point is provided and mess up the shape issue
        if len(input_shape_expectation) == 3:
            x_data = np.atleast_3d(x_data)
        elif len(input_shape_expectation) == 4:
            if len(x_data.shape) < 4:
                x_data = x_data[:, :, :, np.newaxis]
        else:
            raise ValueError(
                "Input data shape does not match neural network expectation"
            )

        total_num = x_data.shape[0]

        input_dim = len(np.squeeze(np.ones(input_shape_expectation[1:])).shape)
        output_dim = len(np.squeeze(np.ones(output_shape_expectation[1:])).shape)
        if input_dim > 3 or output_dim > 3:
            raise ValueError("Unsupported data dimension")

        xtensor = tf.Variable(x_data)

        with tf.GradientTape(watch_accessed_variables=False) as tape:
            tape.watch(xtensor)
            temp = _model(xtensor)[0]

        start_time = time.time()

        jacobian = tf.squeeze(tape.batch_jacobian(temp, xtensor))

        if mean_output is True:
            jacobian_master = tf.reduce_mean(jacobian, axis=0).numpy()
        else:
            jacobian_master = jacobian.numpy()

        if denormalize:
            if self.input_std is not None:
                jacobian_master = jacobian_master / np.squeeze(self.input_std)

            if self.labels_std is not None:
                try:
                    jacobian_master = jacobian_master * self.labels_std
                except ValueError:
                    jacobian_master = jacobian_master * self.labels_std.reshape(-1, 1)

        print(
            f"Finished all gradient calculation, {(time.time() - start_time):.2f} seconds elapsed"
        )
        return jacobian_master
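
    # Shape note (tentative, following the tf.squeeze above): with
    # mean_output=False the returned Jacobian is roughly
    # (num_samples, latent_dim, input_dim), i.e. d z_mean / d x per sample;
    # mean_output=True averages over samples:
    #
    #     jac = net.jacobian_latent(x=x_test, mean_output=True)  # hypothetical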

    def evaluate(self, input_data, labels):
        """
        Evaluate neural network by provided input data and labels/reconstruction target to get back a metrics score

        :param input_data: Data to be inferred with neural network
        :type input_data: ndarray
        :param labels: labels
        :type labels: ndarray
        :return: metrics score
        :rtype: float
        :History: 2018-May-20 - Written - Henry Leung (University of Toronto)
        """
        self.has_model_check()
        input_data = {"input": input_data}
        labels = {"output": labels}
        input_data = list_to_dict(self.keras_model.input_names, input_data)
        labels = list_to_dict(self.keras_model.output_names, labels)

        # check if exists (existing means the model has already been trained
        # (e.g. fine-tuning), so we do not need to calculate mean/std again)
        if self.input_normalizer is None:
            self.input_normalizer = Normalizer(
                mode=self.input_norm_mode, verbose=self.verbose
            )
            self.labels_normalizer = Normalizer(
                mode=self.labels_norm_mode, verbose=self.verbose
            )

            norm_data = self.input_normalizer.normalize(input_data)
            self.input_mean, self.input_std = (
                self.input_normalizer.mean_labels,
                self.input_normalizer.std_labels,
            )
            norm_labels = self.labels_normalizer.normalize(labels)
            self.labels_mean, self.labels_std = (
                self.labels_normalizer.mean_labels,
                self.labels_normalizer.std_labels,
            )
        else:
            norm_data = self.input_normalizer.normalize(input_data, calc=False)
            norm_labels = self.labels_normalizer.normalize(labels, calc=False)

        norm_data = self._tensor_dict_sanitize(norm_data, self.keras_model.input_names)
        norm_labels = self._tensor_dict_sanitize(
            norm_labels, self.keras_model.output_names
        )

        total_num = input_data["input"].shape[0]
        eval_batchsize = self.batch_size if total_num > self.batch_size else total_num
        steps = total_num // self.batch_size if total_num > self.batch_size else 1

        start_time = time.time()
        print("Starting Evaluation")

        evaluate_generator = CVAEDataGenerator(
            batch_size=eval_batchsize,
            shuffle=False,
            steps_per_epoch=steps,
            data=[norm_data, norm_labels],
        )

        scores = self.keras_model.evaluate(evaluate_generator)
        if isinstance(scores, float):  # make sure scores is iterable
            scores = [scores]

        outputname = self.keras_model.output_names
        funcname = self.keras_model.metrics_names

        print(f"Completed Evaluation, {(time.time() - start_time):.2f}s elapsed")

        return list_to_dict(funcname, scores)
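
    # Usage sketch (hypothetical names): evaluate() returns a dict keyed by the
    # compiled metric names, e.g.
    #
    #     scores = net.evaluate(x_test, x_test)
    #     print(scores["loss"])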

    @deprecated_copy_signature(fit)
    def train(self, *args, **kwargs):
        return self.fit(*args, **kwargs)

    @deprecated_copy_signature(fit_on_batch)
    def train_on_batch(self, *args, **kwargs):
        return self.fit_on_batch(*args, **kwargs)

    @deprecated_copy_signature(predict)
    def test(self, *args, **kwargs):
        return self.predict(*args, **kwargs)

    @deprecated_copy_signature(predict_encoder)
    def test_encoder(self, *args, **kwargs):
        return self.predict_encoder(*args, **kwargs)