###############################################################################
# nn_base.py: top-level class for a neural network
###############################################################################
import os
import sys
import time
import warnings
import pathlib
from abc import ABC, abstractmethod
import numpy as np
import matplotlib.pyplot as plt
import keras
import astroNN
from astroNN.config import _astroNN_MODEL_NAME, cpu_gpu_reader
from astroNN.shared.nn_tools import cpu_fallback
from astroNN.shared.nn_tools import folder_runnum
from astroNN.config import (
_KERAS_BACKEND,
backend_framework,
)
epsilon, plot_model = keras.backend.epsilon, keras.utils.plot_model
class NeuralNetBase(ABC):
"""
Top-level class for an astroNN neural network
:ivar name: Full English name
:ivar _model_type: Type of model
:ivar _model_identifier: Unique model identifier, by default using class name as ID
:ivar _implementation_version: Version of the model
    :ivar _python_info: Placeholder to store the Python version used, for debugging purposes
:ivar _astronn_ver: astroNN version detected
:ivar _keras_ver: Keras version detected
:ivar _tf_ver: Tensorflow version detected
:ivar currentdir: Current directory of the terminal
:ivar folder_name: Folder name to be saved
:ivar fullfilepath: Full file path
:ivar batch_size: Batch size for training, by default 64
:ivar autosave: Boolean to flag whether autosave model or not
:ivar task: Task
:ivar lr: Learning rate
:ivar max_epochs: Maximum epochs
    :ivar val_size: Validation set size as a fraction of the total data
    :ivar val_num: Validation set actual number of data points
:ivar beta_1: Exponential decay rate for the 1st moment estimates for optimization algorithm
:ivar beta_2: Exponential decay rate for the 2nd moment estimates for optimization algorithm
:ivar optimizer_epsilon: A small constant for numerical stability for optimization algorithm
:ivar optimizer: Placeholder for optimizer
    :ivar targetname: Full name for every output neuron
:History:
| 2017-Dec-23 - Written - Henry Leung (University of Toronto)
| 2018-Jan-05 - Updated - Henry Leung (University of Toronto)
"""
def __init__(self):
self.name = None
self._model_type = None
        self._model_identifier = self.__class__.__name__  # recorded here; actually used when the model is saved
self._implementation_version = None
self._python_info = sys.version
self._astronn_ver = astroNN.__version__
self._keras_ver = keras.__version__
        self._tf_ver = keras.__version__  # kept for backward compatibility; mirrors the Keras version
self.currentdir = os.getcwd()
self.folder_name = None
self.fullfilepath = None
self.batch_size = 64
self.autosave = False
# Hyperparameter
self.task = None
self.lr = None
self.max_epochs = None
self.val_size = None
        self.has_val = False  # whether validation is performed; set True when val_size > 0
self.val_num = None
# optimizer parameter
self.beta_1 = 0.9 # exponential decay rate for the 1st moment estimates for optimization algorithm
self.beta_2 = 0.999 # exponential decay rate for the 2nd moment estimates for optimization algorithm
self.optimizer_epsilon = (
epsilon()
) # a small constant for numerical stability for optimization algorithm
self.optimizer = None
# Keras API
self.verbose = 2
self.keras_model = None
self.keras_model_predict = None
self.history = None
self.metrics = None
self.callbacks = None
self.__callbacks = None # for internal default callbacks usage only
self._output_loss = None
self.input_normalizer = None
self.labels_normalizer = None
self.training_generator = None
self.validation_generator = None
self.input_norm_mode = None
self.labels_norm_mode = None
self.input_mean = None
self.input_std = None
self.labels_mean = None
self.labels_std = None
self.input_names = None
self.output_names = None
self._input_shape = None
self._labels_shape = None
self.num_train = None
self.train_idx = None
self.val_idx = None
self.targetname = None
self.virtual_cvslogger = None
self.hyper_txt = None
fallback_cpu = cpu_gpu_reader()
if fallback_cpu is True:
cpu_fallback()
def __str__(self):
return f"Name: {self.name}\nModel Type: {self._model_type}\nModel ID: {self._model_identifier}"
@property
def has_model(self):
"""
        Get whether the instance has a model; usually a model is created after you call fit(), so the
        instance has no model if fit() has not been called
:return: bool
:History: 2018-May-21 - Written - Henry Leung (University of Toronto)
"""
if self.keras_model is None:
return False
else:
return True
def has_model_check(self):
if self.has_model is False:
raise AttributeError(
"No model found in this instance, the common problem is you did not train a model"
)
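    # Usage sketch (illustrative only; ``net`` stands for any trained astroNN model subclass):
    #
    #     if net.has_model:
    #         net.summary()
    #     else:
    #         net.fit(x_train, y_train)  # hypothetical training data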
def custom_train_step(self, *args):
raise NotImplementedError
def custom_test_step(self, *args):
raise NotImplementedError
@abstractmethod
def fit(self, *args):
raise NotImplementedError
@abstractmethod
def fit_on_batch(self, *args):
raise NotImplementedError
@abstractmethod
def predict(self, *args):
raise NotImplementedError
@abstractmethod
def evaluate(self, *args):
raise NotImplementedError
@abstractmethod
def model(self):
raise NotImplementedError
@abstractmethod
def post_training_checklist_child(self):
raise NotImplementedError
def _tensor_dict_sanitize(self, tensor_dict, names_list):
"""
Remove extra tensors
:param tensor_dict: Dictionary of array or tensors
:type tensor_dict: dict
:param names_list: List of names
:type names_list: list
:return: Sanitized dict
"""
for tensor_name in [n for n in tensor_dict.keys() if n not in names_list]:
tensor_dict.pop(tensor_name)
return tensor_dict
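    # Minimal sketch of the sanitizer's behaviour (the names here are made up):
    #
    #     d = {"input": x, "input_err": x_err, "unused": y}
    #     d = self._tensor_dict_sanitize(d, ["input", "input_err"])
    #     # d is now {"input": x, "input_err": x_err}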
def pre_training_checklist_master(self, input_data, labels):
# handle named inputs/outputs first
try:
self.input_names = list(input_data.keys())
# if input_data is a dict, cast all values to float32
input_data = {name: input_data[name].astype(np.float32) for name in self.input_names}
except AttributeError:
self.input_names = ["input"] # default input name in all astroNN models
input_data = {"input": input_data.astype(np.float32)}
try:
self.output_names = list(labels.keys())
# if labels is a dict, cast all values to float32
labels = {name: labels[name].astype(np.float32) for name in self.output_names}
except AttributeError:
self.output_names = ["output"] # default input name in all astroNN models
labels = {"output": labels.astype(np.float32)}
        # assert all named inputs have the same number of data points
        # TODO: add detail error msg, add test
        if not all(
            input_data[self.input_names[0]].shape[0] == input_data[name].shape[0]
            for name in self.input_names
        ):
            raise IndexError("all inputs should contain the same number of data points")
        if not all(
            labels[self.output_names[0]].shape[0] == labels[name].shape[0]
            for name in self.output_names
        ):
            raise IndexError("all outputs should contain the same number of data points")
        if self.val_size is None:
            self.val_size = 0
        self.val_num = int(input_data[self.input_names[0]].shape[0] * self.val_size)
        self.num_train = input_data[self.input_names[0]].shape[0] - self.val_num
        self.has_val = self.val_num > 0
        # Infer the input shapes, assuming a convolutional layer immediately follows the input layer
        # only required for a new model, no need for fine-tuning
        # for a dense network, use a Flatten layer as the first layer to flatten the input
        if self._input_shape is None:
            self._input_shape = {}
            for name in self.input_names:
                data_ndim = input_data[name].ndim
                if data_ndim == 1:
                    self._input_shape[name] = (1, 1)
                elif data_ndim == 2:
                    self._input_shape[name] = (input_data[name].shape[1], 1)
                elif data_ndim == 3:
                    self._input_shape[name] = (
                        input_data[name].shape[1],
                        input_data[name].shape[2],
                        1,
                    )
                elif data_ndim == 4:
                    self._input_shape[name] = (
                        input_data[name].shape[1],
                        input_data[name].shape[2],
                        input_data[name].shape[3],
                    )
        # zeroth dim should always be the number of data points
        self._labels_shape = {}
        for name in self.output_names:
            data_ndim = labels[name].ndim
            if data_ndim == 1:
                self._labels_shape[name] = 1
            elif data_ndim == 2:
                self._labels_shape[name] = labels[name].shape[1]
            elif data_ndim == 3:
                self._labels_shape[name] = (labels[name].shape[1], labels[name].shape[2])
            elif data_ndim == 4:
                self._labels_shape[name] = (
                    labels[name].shape[1],
                    labels[name].shape[2],
                    labels[name].shape[3],
                )
print(
f"Number of Training Data: {self.num_train}, Number of Validation Data: {self.val_num}"
)
return input_data, labels
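    # Illustrative sketch (hypothetical shapes): plain arrays and name->array dicts are
    # both accepted and normalized to dicts of float32 arrays, e.g.
    #
    #     x = np.random.rand(100, 7514)  # 100 spectra of hypothetical length 7514
    #     y = np.random.rand(100, 3)     # 3 labels per spectrum
    #     x_dict, y_dict = self.pre_training_checklist_master(x, y)
    #     # x_dict == {"input": ...} and y_dict == {"output": ...};
    #     # with val_size=0.1 this gives val_num=10 and num_train=90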
def pre_testing_checklist_master(self, input_data):
if not isinstance(input_data, dict):
input_data = {self.input_names[0]: np.atleast_2d(input_data)}
else:
for name in input_data.keys():
input_data.update({name: np.atleast_2d(input_data[name])})
return input_data
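    # Sketch: a single 1D input is promoted to a batch of one, so (assuming the default
    # input name) self.pre_testing_checklist_master(np.zeros(7514))["input"].shape == (1, 7514)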
def post_training_checklist_master(self):
pass
def save(self, name=None, model_plot=False):
"""
Save the model to disk
:param name: Folder name/path to be saved
:type name: string or path
:param model_plot: True to plot model too
:type model_plot: boolean
:return: A saved folder on disk
"""
self.has_model_check()
# Only generate a folder automatically if no name provided
if self.folder_name is None and name is None:
self.folder_name = folder_runnum()
elif name is not None:
self.folder_name = name
self.folder_name = pathlib.Path(self.folder_name).absolute()
        # if a folder name is provided, create the directory; if it already exists, append a suffix to avoid overwriting
if not self.folder_name.exists():
os.makedirs(self.folder_name)
else:
i_back = 2
while True:
if not self.folder_name.with_name(
self.folder_name.stem + f"_{i_back}"
).exists():
break
i_back += 1
new_folder_name_temp = self.folder_name.with_name(
self.folder_name.stem + f"_{i_back}"
)
warnings.warn(
f"To prevent your model being overwritten, your folder name changed from {self.folder_name} "
f"to {new_folder_name_temp}",
UserWarning,
)
self.folder_name = new_folder_name_temp
os.makedirs(self.folder_name)
        self.fullfilepath = str(self.folder_name) + os.sep
txt_file_path = pathlib.Path.joinpath(self.folder_name, "hyperparameter.txt")
if os.path.isfile(txt_file_path):
self.hyper_txt = open(txt_file_path, "a")
self.hyper_txt.write("\n")
self.hyper_txt.write("======Another Run======")
else:
self.hyper_txt = open(txt_file_path, "w")
self.hyper_txt.write(f"Model: {self.name} \n")
self.hyper_txt.write(f"Model Type: {self._model_type} \n")
self.hyper_txt.write(f"astroNN identifier: {self._model_identifier} \n")
self.hyper_txt.write(f"Python Version: {self._python_info} \n")
self.hyper_txt.write(f"astroNN Version: {self._astronn_ver} \n")
self.hyper_txt.write(f"Keras Version: {self._keras_ver} \n")
self.hyper_txt.write(f"Tensorflow Version: {self._tf_ver} \n")
self.hyper_txt.write(f"Folder Name: {self.folder_name.name} \n")
self.hyper_txt.write(f"Batch size: {self.batch_size} \n")
self.hyper_txt.write(f"Optimizer: {self.optimizer.__class__.__name__} \n")
self.hyper_txt.write(f"Maximum Epochs: {self.max_epochs} \n")
self.hyper_txt.write(f"Learning Rate: {self.lr} \n")
self.hyper_txt.write(f"Validation Size: {self.val_size} \n")
self.hyper_txt.write(f"Input Shape: {self._input_shape} \n")
self.hyper_txt.write(f"Label Shape: {self._labels_shape} \n")
self.hyper_txt.write(f"Number of Training Data: {self.num_train} \n")
self.hyper_txt.write(f"Number of Validation Data: {self.val_num} \n")
if model_plot is True:
self.plot_model()
self.post_training_checklist_child()
if (
self.virtual_cvslogger is not None
): # in case you save without training, so cvslogger is None
self.virtual_cvslogger.savefile(folder_name=self.folder_name)
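    # Usage sketch (assuming ``net`` is a trained astroNN model instance):
    #
    #     net.save()                        # auto-generates a run folder via folder_runnum()
    #     net.save(name="my_model_folder")  # or save to a chosen folder; a "_2", "_3", ...
    #                                       # suffix is appended if the folder already exists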
def plot_model(
self, name="model.png", show_shapes=True, show_layer_names=True, rankdir="TB"
):
"""
Plot model architecture with pydot and graphviz
:param name: file name to be saved with extension, .png is recommended
:type name: str
:param show_shapes: whether show shape in model plot
:type show_shapes: bool
:param show_layer_names: whether to display layer names
:type show_layer_names: bool
        :param rankdir: a string specifying the orientation of the plot, 'TB' for vertical or 'LR' for horizontal
        :type rankdir: str
:return: No return but will save the model architecture as png to disk
"""
self.has_model_check()
try:
if self.fullfilepath is not None:
plot_model(
self.keras_model,
show_shapes=show_shapes,
to_file=os.path.join(self.fullfilepath, name),
show_layer_names=show_layer_names,
rankdir=rankdir,
)
else:
plot_model(
self.keras_model,
show_shapes=show_shapes,
to_file=name,
show_layer_names=show_layer_names,
rankdir=rankdir,
)
        except (ImportError, ModuleNotFoundError):
            warnings.warn(
                "Skipped plot_model! graphviz and pydot_ng are required to plot the model architecture",
                UserWarning,
            )
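    # Usage sketch: ``net.plot_model(name="arch.png", rankdir="LR")`` would write a
    # horizontal architecture diagram, provided graphviz and pydot are installed.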
def jacobian(self, x=None, mean_output=False, mc_num=1, denormalize=False):
"""
        | Calculate the Jacobian, i.e. the gradients of the outputs with respect to the inputs (high-performance calculation updated on 15 April 2018)
        |
        | Please notice that the de-normalization (if True) assumes the output depends on the input to first order,
        | in which case the equation is simply the Jacobian divided by the input scaling, usually a good approximation if you use ReLU all the way
:param x: Input Data
:type x: ndarray
:param mean_output: False to get all jacobian, True to get the mean
:type mean_output: boolean
        :param mc_num: Number of Monte Carlo integrations
:type mc_num: int
:param denormalize: De-normalize Jacobian
:type denormalize: bool
:return: An array of Jacobian
:rtype: ndarray
:History:
| 2017-Nov-20 - Written - Henry Leung (University of Toronto)
| 2018-Apr-15 - Updated - Henry Leung (University of Toronto)
"""
self.has_model_check()
if x is None:
raise ValueError("Please provide data to calculate the jacobian")
if mc_num < 1 or isinstance(mc_num, float):
raise ValueError("mc_num must be a positive integer")
if self.input_normalizer is not None:
x_data = self.input_normalizer.normalize({"input": x}, calc=False)
x_data = x_data["input"]
else:
# Prevent shallow copy issue
x_data = np.array(x)
x_data -= self.input_mean
x_data /= self.input_std
_model = None
try:
input_shape_expectation = self.keras_model_predict.get_layer(
"input"
).output.shape
output_shape_expectation = self.keras_model_predict.get_layer(
"output"
).output.shape
_model = self.keras_model_predict
except AttributeError:
input_shape_expectation = self.keras_model.input_shape
output_shape_expectation = self.keras_model.get_layer("output").output.shape
_model = self.keras_model
except ValueError:
            raise ValueError(
                "astroNN expects the input layer to be named 'input' and the output layer to be named 'output', "
                "but none is found."
            )
if len(input_shape_expectation) == 1:
input_shape_expectation = input_shape_expectation[0]
# just in case only 1 data point is provided and mess up the shape issue
# if len(input_shape_expectation) == 2:
# x_data = np.atleast_3d(x_data)
# elif len(input_shape_expectation) == 4:
# if len(x_data.shape) < 4:
# x_data = x_data[:, :, :, np.newaxis]
# else:
# raise ValueError(f"Input data shape {x_data.shape} do not match neural network expectation {len(input_shape_expectation)}-d")
total_num = x_data.shape[0]
# input_dim = len(np.squeeze(np.ones(input_shape_expectation[1:])).shape)
# output_dim = len(np.squeeze(np.ones(output_shape_expectation[1:])).shape)
# if input_dim > 3 or output_dim > 3:
# raise ValueError("Unsupported data dimension")
start_time = time.time()
if _KERAS_BACKEND == "tensorflow":
xtensor = backend_framework.Variable(x_data)
with backend_framework.GradientTape(watch_accessed_variables=False) as tape:
tape.watch(xtensor)
temp = _model(xtensor)
if isinstance(temp, dict):
temp = temp["output"]
jacobian = tape.batch_jacobian(temp, xtensor)
elif _KERAS_BACKEND == "torch":
# add new axis for vmap
xtensor = backend_framework.tensor(x_data, requires_grad=True)[:, None, ...]
jacobian = backend_framework.vmap(backend_framework.func.jacrev(_model), randomness="different")(xtensor)
else:
raise ValueError("Only Tensorflow and PyTorch backend is supported")
if isinstance(jacobian, dict):
jacobian = jacobian["output"]
jacobian = keras.ops.squeeze(jacobian)
if mean_output is True:
jacobian_master = keras.ops.convert_to_numpy(
keras.ops.mean(jacobian, axis=0)
)
else:
jacobian_master = keras.ops.convert_to_numpy(jacobian)
if denormalize:
if self.input_std is not None:
jacobian_master = jacobian_master / np.squeeze(self.input_std)
if self.labels_std is not None:
try:
jacobian_master = jacobian_master * self.labels_std
except ValueError:
jacobian_master = jacobian_master * self.labels_std.reshape(-1, 1)
        print(
            f"Finished all gradient calculation, {(time.time() - start_time):.2f} seconds elapsed"
        )
return jacobian_master
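    # Usage sketch (hypothetical data): for 10 spectra of length 7514 and 3 output labels,
    # the returned Jacobian would have shape (10, 3, 7514), or (3, 7514) with mean_output=True:
    #
    #     jac = net.jacobian(x_test, mean_output=True, denormalize=True)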
def plot_dense_stats(self):
"""
Plot dense layers weight statistics
:return: A plot
:History: 2018-May-12 - Written - Henry Leung (University of Toronto)
"""
self.has_model_check()
dense_list = []
for counter, layer in enumerate(self.keras_model.layers):
if isinstance(layer, keras.layers.Dense):
dense_list.append(counter)
denses = np.array(self.keras_model.layers)[dense_list]
fig, ax = plt.subplots(1, figsize=(15, 10), dpi=100)
for counter, dense in enumerate(denses):
weight_temp = np.array(dense.get_weights()[0].flatten())
ax.hist(
weight_temp,
200,
density=True,
range=(-2.0, 2.0),
alpha=0.7,
label=f"Dense Layer {counter}, max: {weight_temp.max():.{2}f}, min: {weight_temp.min():.{2}f}, "
f"mean: {weight_temp.mean():.{2}f}, std: {weight_temp.std():.{2}f}",
)
fig.suptitle(
f"Dense Layers Weight Statistics of {self.folder_name}", fontsize=17
)
ax.set_xlabel("Weights", fontsize=17)
ax.set_ylabel("Normalized Distribution", fontsize=17)
ax.minorticks_on()
ax.tick_params(labelsize=15, width=3, length=10, which="major")
ax.tick_params(width=1.5, length=5, which="minor")
ax.legend(loc="best", fontsize=15)
fig.tight_layout(rect=[0, 0.00, 1, 0.96])
fig.show()
return fig
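    # Usage sketch: ``fig = net.plot_dense_stats()`` displays the histograms and returns
    # the matplotlib figure, so it can also be saved with ``fig.savefig(...)``.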
def get_weights(self):
"""
Get all model weights
:return: weights arrays
:rtype: ndarray
:History: 2018-May-23 - Written - Henry Leung (University of Toronto)
"""
self.has_model_check()
return self.keras_model.get_weights()
def summary(self):
"""
Get model summary
:return: None, just print
:History: 2018-May-23 - Written - Henry Leung (University of Toronto)
"""
self.has_model_check()
return self.keras_model.summary()
def get_config(self):
"""
Get model configuration as a dictionary
:return: dict
:History: 2018-May-23 - Written - Henry Leung (University of Toronto)
"""
self.has_model_check()
return self.keras_model.get_config()
def save_weights(self, filename=_astroNN_MODEL_NAME, overwrite=True):
"""
Save model weights as .h5
:param filename: Filename of .h5 to be saved
:type filename: str
:param overwrite: whether to overwrite
:type overwrite: bool
:return: None, a .h5 file will be saved
:History: 2018-May-23 - Written - Henry Leung (University of Toronto)
"""
self.has_model_check()
print("==========================")
print(
"This is a remainder that saving weights to h5, you might have difficult to "
"load it back and cannot be used with astroNN probably"
)
print("==========================")
if self.fullfilepath is not None:
return self.keras_model.save_weights(
str(os.path.join(self.fullfilepath, filename)), overwrite=overwrite
)
else:
return self.keras_model.save_weights(filename, overwrite=overwrite)
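    # Usage sketch: ``net.save_weights()`` writes only the raw Keras weights (no astroNN
    # metadata); ``net.save()`` is the supported route for models to be reloaded with astroNN.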
def get_layer(self, *args, **kwargs):
"""
get_layer() method of Keras
"""
return self.keras_model.get_layer(*args, **kwargs)
def transfer_weights(self, model, exclusion_output=False):
"""
Transfer weight of a model to current model if possible
        # TODO: remove layers after a successful transfer so they won't get mixed up?
:param model: astroNN model
:type model: astroNN.model.NeuralNetBase or keras.models.Model
:param exclusion_output: whether to exclude output in the transfer or not
:type exclusion_output: bool
:return: bool
:History: 2022-Mar-06 - Written - Henry Leung (University of Toronto)
"""
if hasattr(
model, "keras_model"
): # check if its an astroNN model or keras model
model = model.keras_model
counter = 0 # count number of weights transferred
transferred = [] # keep track of transferred layer names
total_parameters_A = self.keras_model.count_params()
total_parameters_B = model.count_params()
current_bottom_idx = 0 # current bottom layer we are checking to prevent incorrect transfer of convolution layer weights
for new_l in self.keras_model.layers:
for idx, l in enumerate(model.layers[current_bottom_idx:]):
if "input" not in l.name and "input" not in new_l.name: # no need to do
try:
if ("output" not in l.name or not exclusion_output) and len(
new_l.get_weights()
) != 0:
new_l.set_weights(l.get_weights())
new_l.trainable = False
for i in l.get_weights():
counter += len(keras.ops.reshape(i, [-1]))
transferred.append(l.name)
current_bottom_idx += idx
break
except ValueError:
pass
if counter == 0:
            warnings.warn(
                "None of the layers' weights were successfully transferred due to shape incompatibility in all layers."
            )
else:
self.recompile()
print(f"Successfully transferred: {transferred}")
print(
f"Transferred {counter} of {total_parameters_B} weights ({100*counter/total_parameters_B:.2f}%) to a new model with {total_parameters_A} weights."
)
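    # Usage sketch (hypothetical models): transfer everything except the output layer from a
    # previously trained network, then fine-tune on new data:
    #
    #     new_net.transfer_weights(old_net, exclusion_output=True)
    #     new_net.fit(x_new, y_new)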