###############################################################################
# base_master_nn.py: top-level class for a neural network
###############################################################################
import os
import sys
import time
from abc import ABC, abstractmethod
import numpy as np
import pylab as plt
import tensorflow as tf
import astroNN
from astroNN.config import keras_import_manager, cpu_gpu_check
from astroNN.shared.custom_warnings import deprecated
from astroNN.shared.nn_tools import folder_runnum
from astroNN.config import _astroNN_MODEL_NAME
keras = keras_import_manager()
get_session, epsilon, plot_model = keras.backend.get_session, keras.backend.epsilon, keras.utils.plot_model
[docs]class NeuralNetMaster(ABC):
"""
Top-level class for an astroNN neural network
:ivar name: Full English name
:ivar _model_type: Type of model
:ivar _model_identifier: Unique model identifier, by default using class name as ID
:ivar _implementation_version: Version of the model
:ivar _python_info: Placeholder to store python version used for debugging purpose
:ivar _astronn_ver: astroNN version detected
:ivar _keras_ver: Keras version detected
:ivar _tf_ver: Tensorflow version detected
:ivar currentdir: Current directory of the terminal
:ivar folder_name: Folder name to be saved
:ivar fullfilepath: Full file path
:ivar batch_size: Batch size for training, by default 64
:ivar autosave: Boolean to flag whether autosave model or not
:ivar task: Task
:ivar lr: Learning rate
:ivar max_epochs: Maximum epochs
:ivar val_size: Validation set size in percentage
:ivar val_num: Validation set actual number
:ivar beta_1: Exponential decay rate for the 1st moment estimates for optimization algorithm
:ivar beta_2: Exponential decay rate for the 2nd moment estimates for optimization algorithm
:ivar optimizer_epsilon: A small constant for numerical stability for optimization algorithm
:ivar optimizer: Placeholder for optimizer
:ivar targetname: Full name for every output neurones
:History:
| 2017-Dec-23 - Written - Henry Leung (University of Toronto)
| 2018-Jan-05 - Updated - Henry Leung (University of Toronto)
"""
def __init__(self):
    """
    Set every attribute of the network to its default value

    Records the Python, astroNN, Keras and Tensorflow versions that were detected, the current working
    directory and the default optimizer parameters. Everything else starts as ``None`` (or the default shown
    below), and ``cpu_gpu_check()`` is called at the end.
    """
    # Identity of the model
    self.name = None
    self._model_type = None
    self._model_identifier = self.__class__.__name__  # No effect, will do when save
    self._implementation_version = None

    # Environment information, kept for debugging purpose
    self._python_info = sys.version
    self._astronn_ver = astroNN.__version__
    self._keras_ver = keras.__version__  # Even using tensorflow.keras, this line will still be fine
    # ``tf.VERSION`` only exists on Tensorflow 1.x while ``tf.__version__`` is available on every release
    self._tf_ver = tf.__version__

    # Where the model is going to be saved
    self.currentdir = os.getcwd()
    self.folder_name = None
    self.fullfilepath = None
    self.batch_size = 64
    self.autosave = False

    # Hyperparameter
    self.task = None
    self.lr = None
    self.max_epochs = None
    self.val_size = None
    self.val_num = None

    # optimizer parameter
    self.beta_1 = 0.9  # exponential decay rate for the 1st moment estimates for optimization algorithm
    self.beta_2 = 0.999  # exponential decay rate for the 2nd moment estimates for optimization algorithm
    self.optimizer_epsilon = epsilon()  # a small constant for numerical stability for optimization algorithm
    self.optimizer = None

    # Keras API
    self.verbose = 2
    self.keras_model = None
    self.keras_model_predict = None
    self.history = None
    self.metrics = None
    self.callbacks = None
    self.__callbacks = None  # for internal default callbacks usage only

    # Data normalization and data generators
    self.input_normalizer = None
    self.labels_normalizer = None
    self.training_generator = None
    self.validation_generator = None
    self.input_norm_mode = None
    self.labels_norm_mode = None
    self.input_mean = None
    self.input_std = None
    self.labels_mean = None
    self.labels_std = None

    # Shapes and bookkeeping of the data
    self._input_shape = None
    self._labels_shape = None
    self.num_train = None
    self.train_idx = None
    self.val_idx = None
    self.targetname = None

    # Logging and tensorflow handles
    self.virtual_cvslogger = None
    self.hyper_txt = None
    self.session = None
    self.graph = None

    cpu_gpu_check()
def __str__(self):
    """
    Describe the instance by its name, model type and model identifier, one item per line

    :return: Human readable description
    :rtype: str
    """
    description = (
        f"Name: {self.name}",
        f"Model Type: {self._model_type}",
        f"Model ID: {self._model_identifier}",
    )
    return "\n".join(description)
@property
def has_model(self):
    """
    Tell whether the instance currently holds a model, i.e. whether ``keras_model`` is set. Usually a model
    is created after you called train(), so the instance has no model if you did not call train()

    :return: bool
    :History: 2018-May-21 - Written - Henry Leung (University of Toronto)
    """
    return self.keras_model is not None
def has_model_check(self):
    """
    Guard used by the methods which need a model: do nothing when ``has_model`` is not False

    :raises AttributeError: if ``has_model`` is False
    """
    if self.has_model is not False:
        return
    raise AttributeError("No model found in this instance, the common problem is you did not train a model")
@abstractmethod
def train(self, *args):
    """
    Abstract method, every subclass has to provide its own implementation

    :param args: positional arguments, their meaning is defined by the subclass
    :raises NotImplementedError: always, when the base implementation is reached
    """
    raise NotImplementedError
@abstractmethod
def train_on_batch(self, *args):
    """
    Abstract method, every subclass has to provide its own implementation

    :param args: positional arguments, their meaning is defined by the subclass
    :raises NotImplementedError: always, when the base implementation is reached
    """
    raise NotImplementedError
@abstractmethod
def test(self, *args):
raise NotImplementedError
@abstractmethod
def evaluate(self, *args):
    """
    Abstract method, every subclass has to provide its own implementation

    :param args: positional arguments, their meaning is defined by the subclass
    :raises NotImplementedError: always, when the base implementation is reached
    """
    raise NotImplementedError
@abstractmethod
def model(self):
    """
    Abstract method, every subclass has to provide its own implementation

    :raises NotImplementedError: always, when the base implementation is reached
    """
    raise NotImplementedError
@abstractmethod
def post_training_checklist_child(self):
    """
    Abstract method, every subclass has to provide its own implementation. save() calls it after the
    hyperparameter file has been written

    :raises NotImplementedError: always, when the base implementation is reached
    """
    raise NotImplementedError
def pre_training_checklist_master(self, input_data, labels):
    """
    Work out the size of the training/validation split and, for a new model, the input and labels shapes

    :param input_data: Input data, zeroth dimension is the number of data
    :type input_data: ndarray
    :param labels: Labels, zeroth dimension is the number of data
    :type labels: ndarray
    :return: None, the results are stored on the instance and the split is printed
    """
    if self.val_size is None:
        self.val_size = 0
    total_num = input_data.shape[0]
    self.val_num = int(total_num * self.val_size)
    self.num_train = total_num - self.val_num

    # Shapes are only inferred for a new model, a model which already knows its input shape (fine-tuning)
    # keeps what it has. A convolutional layer is assumed to come right after the input layer, which is why a
    # trailing channel dimension is appended; for a dense network use a Flattener as the first layer.
    if self._input_shape is None:
        input_rank = input_data.ndim
        input_trailing = tuple(input_data.shape[1:])  # zeroth dim should always be number of data
        if input_rank == 1:
            self._input_shape = (1, 1,)
        elif input_rank in (2, 3):
            self._input_shape = input_trailing + (1,)
        elif input_rank == 4:
            self._input_shape = input_trailing

        labels_rank = labels.ndim
        if labels_rank == 1:
            self._labels_shape = 1
        elif labels_rank == 2:
            self._labels_shape = labels.shape[1]  # a plain integer, not a tuple
        elif labels_rank in (3, 4):
            self._labels_shape = tuple(labels.shape[1:])

    print(f'Number of Training Data: {self.num_train}, Number of Validation Data: {self.val_num}')
def pre_testing_checklist_master(self):
    """
    Placeholder, does nothing at this moment

    :return: None
    """
def post_training_checklist_master(self):
    """
    Placeholder, does nothing at this moment

    :return: None
    """
def save(self, name=None, model_plot=False):
    """
    Save the model to disk

    A new folder is always created: if the requested folder already exists, ``_2``, ``_3``, ... is appended
    until an unused name is found. ``hyperparameter.txt`` is then written into that folder and left open in
    ``self.hyper_txt`` when ``post_training_checklist_child()`` is called.

    :param name: Folder name to be saved
    :type name: string
    :param model_plot: True to plot model too
    :type model_plot: boolean
    :return: A saved folder on disk
    :raises AttributeError: if the instance has no model
    """
    self.has_model_check()

    # A name given by the caller always wins, a folder name is generated only if there is none at all
    if name is not None:
        self.folder_name = name
    elif self.folder_name is None:
        self.folder_name = folder_runnum()

    requested_path = os.path.join(self.currentdir, self.folder_name)
    if os.path.exists(requested_path):
        # never overwrite an existing folder, look for the first unused numbered name instead
        suffix = 2
        while os.path.exists(os.path.join(self.currentdir, self.folder_name + f'_{suffix}')):
            suffix += 1
        unused_name = self.folder_name + f'_{suffix}'
        print(f'To prevent your model being overwritten, your folder name changed from {self.folder_name} '
              f'to {unused_name}')
        self.folder_name = unused_name
        os.makedirs(os.path.join(self.currentdir, self.folder_name))
    else:
        os.makedirs(requested_path)

    self.fullfilepath = os.path.join(self.currentdir, self.folder_name + os.sep)
    txt_file_path = self.fullfilepath + 'hyperparameter.txt'

    if os.path.isfile(txt_file_path):
        self.hyper_txt = open(txt_file_path, 'a')
        self.hyper_txt.write("\n")
        self.hyper_txt.write("======Another Run======")
    else:
        self.hyper_txt = open(txt_file_path, 'w')

    # one "<label>: <value> " line for each piece of information
    records = (
        ("Model", self.name),
        ("Model Type", self._model_type),
        ("astroNN identifier", self._model_identifier),
        ("Python Version", self._python_info),
        ("astroNN Version", self._astronn_ver),
        ("Keras Version", self._keras_ver),
        ("Tensorflow Version", self._tf_ver),
        ("Folder Name", self.folder_name),
        ("Batch size", self.batch_size),
        ("Optimizer", self.optimizer.__class__.__name__),
        ("Maximum Epochs", self.max_epochs),
        ("Learning Rate", self.lr),
        ("Validation Size", self.val_size),
        ("Input Shape", self._input_shape),
        ("Label Shape", self._labels_shape),
        ("Number of Training Data", self.num_train),
        ("Number of Validation Data", self.val_num),
    )
    for label, value in records:
        self.hyper_txt.write(f"{label}: {value} \n")

    if model_plot is True:
        self.plot_model()

    self.post_training_checklist_child()

    if self.virtual_cvslogger is not None:  # in case you save without training, so cvslogger is None
        self.virtual_cvslogger.savefile(folder_name=self.folder_name)
def plot_model(self, name='model.png', show_shapes=True, show_layer_names=True, rankdir='TB'):
    """
    Plot model architecture with pydot and graphviz

    The plot is saved inside ``fullfilepath`` when the model has been saved to a folder, otherwise ``name``
    is used as the path directly.

    :param name: file name to be saved with extension, .png is recommended
    :type name: str
    :param show_shapes: whether show shape in model plot
    :type show_shapes: bool
    :param show_layer_names: whether to display layer names
    :type show_layer_names: bool
    :param rankdir: a string specifying the format of the plot, 'TB' for vertical or 'LR' for horizontal plot
    :type rankdir: str
    :return: No return but will save the model architecture as png to disk
    :raises AttributeError: if the instance has no model
    """
    self.has_model_check()
    print()

    if self.fullfilepath is not None:
        destination = os.path.join(self.fullfilepath, name)
    else:
        destination = name

    try:
        # the module-level keras plot_model, not this method
        plot_model(self.keras_model, show_shapes=show_shapes, to_file=destination,
                   show_layer_names=show_layer_names, rankdir=rankdir)
    except ImportError:
        # ModuleNotFoundError is a subclass of ImportError so a single clause covers both;
        # ``except ImportError or ModuleNotFoundError`` evaluates to ``except ImportError`` anyway
        print('Skipped plot_model! graphviz and pydot_ng are required to plot the model architecture')
def hessian(self, x=None, mean_output=False, mc_num=1, denormalize=False, method='exact'):
    """
    | Calculate the hessian of output to input
    |
    | Please notice that the de-normalize (if True) assumes the output depends on the input data first orderly
    | in which the hessians does not depends on input scaling and only depends on output scaling
    |
    | The hessians can be all zeros and the common cause is you did not use any activation or
    | activation that is still too linear in some sense like ReLU.

    :param x: Input Data
    :type x: ndarray
    :param mean_output: False to get all hessian, True to get the mean. Only the mean is supported at this
        moment, False is switched to True after a notice is printed
    :type mean_output: boolean
    :param mc_num: Number of monte carlo integration
    :type mc_num: int
    :param denormalize: De-normalize the Hessian, i.e. multiply it with the labels standard deviation
    :type denormalize: bool
    :param method: Either 'exact' to calculate numerical Hessian or 'approx' to approximate Hessian from Jacobian
    :type method: str
    :return: An array of Hessian
    :rtype: ndarray
    :raises ValueError: if ``method`` is unknown, ``x`` is missing, ``mc_num`` is not a positive integer, the
        'input'/'output' layers cannot be found or the data do not match the input shape of the network
    :raises AttributeError: if the instance has no model
    :History: 2018-Jun-14 - Written - Henry Leung (University of Toronto)
    """
    if not mean_output:
        print('only mean output is supported at this moment')
        mean_output = True

    if method == 'approx':
        # The arguments are forwarded explicitly; relying on locals() would silently forward any local
        # variable which happens to be defined before the call
        jacobian = self.jacobian(x=x, mean_output=mean_output, mc_num=mc_num, denormalize=denormalize)
        # outer product of every row of the Jacobian with itself
        return np.stack([np.dot(jacobian[row:row + 1].T, jacobian[row:row + 1])
                         for row in range(jacobian.shape[0])], axis=0)

    if method != 'exact':
        raise ValueError(f'Unknown method -> {method}')

    self.has_model_check()
    if x is None:
        raise ValueError('Please provide data to calculate the hessian')
    if mc_num < 1 or isinstance(mc_num, float):
        raise ValueError('mc_num must be a positive integer')

    if self.input_normalizer is not None:
        x_data = self.input_normalizer.normalize(x, calc=False)
    else:
        # Prevent shallow copy issue
        x_data = np.array(x)
        x_data -= self.input_mean
        x_data /= self.input_std

    try:
        input_tens = self.keras_model_predict.get_layer("input").input
        output_tens = self.keras_model_predict.get_layer("output").output
        input_shape_expectation = self.keras_model_predict.get_layer("input").input_shape
    except AttributeError:
        # no prediction model available, use the training model
        input_tens = self.keras_model.get_layer("input").input
        output_tens = self.keras_model.get_layer("output").output
        input_shape_expectation = self.keras_model.get_layer("input").input_shape
    except ValueError:
        raise ValueError(
            "astroNN expects input layer is named as 'input' and output layer is named as 'output', "
            "but None is found.")

    # just in case only 1 data point is provided and mess up the shape issue
    if len(input_shape_expectation) == 3:
        x_data = np.atleast_3d(x_data)
    elif len(input_shape_expectation) == 4:
        if len(x_data.shape) < 4:
            x_data = x_data[:, :, :, np.newaxis]
    else:
        raise ValueError('Input data shape do not match neural network expectation')

    total_num = x_data.shape[0]

    # one hessian graph for every output neurone
    hessians_list = [tf.hessians(output_tens[:, j], input_tens) for j in range(self._labels_shape)]
    final_stack = tf.stack(tf.squeeze(hessians_list))

    # Looping variables for tensorflow setup
    loop_start = tf.constant(0)
    mc_num_tf = tf.constant(mc_num)
    # To store final result
    mc_results = tf.TensorArray(dtype=tf.float32, infer_shape=False, size=1, dynamic_size=True)

    def body(step, results):
        results = results.write(step, final_stack)
        return step + 1, results

    _, loop = tf.while_loop(lambda step, *_: tf.less(step, mc_num_tf), body, [loop_start, mc_results])
    loops = tf.cond(tf.greater(mc_num_tf, 1), lambda: tf.reduce_mean(loop.stack(), axis=0),
                    lambda: loop.stack())

    start_time = time.time()
    # the graph is evaluated for one data point at a time
    hessians = np.concatenate(
        [get_session().run(loops, feed_dict={input_tens: x_data[idx:idx + 1], keras.backend.learning_phase(): 0})
         for idx in range(0, total_num)], axis=0)

    if np.all(hessians == 0.):  # warn user about not so linear activation like ReLU will get all zeros
        print(
            'The hessians is detected to be all zeros. The common cause is you did not use any activation or '
            'activation that is still too linear in some sense like ReLU.')

    if mean_output is True:
        hessians_master = np.mean(hessians, axis=0)
    else:
        hessians_master = np.array(hessians)

    hessians_master = np.squeeze(hessians_master)

    if denormalize:  # no need to denorm input scaling because of we assume first order dependence
        if self.labels_std is not None:
            try:
                hessians_master = hessians_master * self.labels_std
            except ValueError:
                hessians_master = hessians_master * self.labels_std.reshape(-1, 1)

    print(f'Finished hessian ({method}) calculation, {(time.time() - start_time):.{2}f} seconds elapsed')
    return hessians_master
def hessian_diag(self, x=None, mean_output=False, mc_num=1, denormalize=False):
    """
    | Calculate the diagonal part of hessian of output to input, avoids the calculation of the whole hessian and takes its diagonal
    |
    | Please notice that the de-normalize (if True) assumes the output depends on the input data first orderly
    | in which the diagonal part of the hessians does not depends on input scaling and only depends on output scaling
    |
    | The diagonal part of the hessians can be all zeros and the common cause is you did not use
    | any activation or activation that is still too linear in some sense like ReLU.

    :param x: Input Data
    :type x: ndarray
    :param mean_output: False to get all hessian, True to get the mean
    :type mean_output: boolean
    :param mc_num: Number of monte carlo integration
    :type mc_num: int
    :param denormalize: De-normalize diagonal part of Hessian
    :type denormalize: bool
    :return: An array of Hessian
    :rtype: ndarray
    :raises ValueError: if ``x`` is missing, ``mc_num`` is not a positive integer, the 'input'/'output' layers
        cannot be found or the data do not match the input shape of the network
    :raises AttributeError: if the instance has no model
    :History: 2018-Jun-13 - Written - Henry Leung (University of Toronto)
    """
    self.has_model_check()
    if x is None:
        raise ValueError('Please provide data to calculate the diagonal part of the hessian')
    if mc_num < 1 or isinstance(mc_num, float):
        raise ValueError('mc_num must be a positive integer')

    if self.input_normalizer is not None:
        x_data = self.input_normalizer.normalize(x, calc=False)
    else:
        # Prevent shallow copy issue
        x_data = np.array(x)
        x_data -= self.input_mean
        x_data /= self.input_std

    try:
        input_tens = self.keras_model_predict.get_layer("input").input
        output_tens = self.keras_model_predict.get_layer("output").output
        input_shape_expectation = self.keras_model_predict.get_layer("input").input_shape
    except AttributeError:
        # no prediction model available, use the training model
        input_tens = self.keras_model.get_layer("input").input
        output_tens = self.keras_model.get_layer("output").output
        input_shape_expectation = self.keras_model.get_layer("input").input_shape
    except ValueError:
        raise ValueError("astroNN expects input layer is named as 'input' and output layer is named as 'output', "
                         "but None is found.")

    # just in case only 1 data point is provided and mess up the shape issue
    if len(input_shape_expectation) == 3:
        x_data = np.atleast_3d(x_data)
    elif len(input_shape_expectation) == 4:
        if len(x_data.shape) < 4:
            x_data = x_data[:, :, :, np.newaxis]
    else:
        raise ValueError('Input data shape do not match neural network expectation')

    total_num = x_data.shape[0]

    # gradient of the gradient, for every output neurone
    # NOTE(review): tf.gradients sums over the elements of its first argument, so this looks like the sum of
    # the Hessian entries along one axis rather than strictly the diagonal - confirm before relying on it
    hessians_diag_list = [tf.gradients(tf.gradients(output_tens[:, j], input_tens), input_tens)
                          for j in range(self._labels_shape)]
    final_stack = tf.stack(tf.squeeze(hessians_diag_list))

    # Looping variables for tensorflow setup
    loop_start = tf.constant(0)
    mc_num_tf = tf.constant(mc_num)
    # To store final result
    mc_results = tf.TensorArray(dtype=tf.float32, infer_shape=False, size=1, dynamic_size=True)

    def body(step, results):
        results = results.write(step, final_stack)
        return step + 1, results

    _, loop = tf.while_loop(lambda step, *_: tf.less(step, mc_num_tf), body, [loop_start, mc_results])
    loops = tf.cond(tf.greater(mc_num_tf, 1), lambda: tf.reduce_mean(loop.stack(), axis=0), lambda: loop.stack())

    start_time = time.time()
    # the graph is evaluated for one data point at a time
    hessians_diag = np.concatenate(
        [get_session().run(loops, feed_dict={input_tens: x_data[idx:idx + 1], keras.backend.learning_phase(): 0})
         for idx in range(0, total_num)], axis=0)

    if np.all(hessians_diag == 0.):  # warn user about not so linear activation like ReLU will get all zeros
        print('The diagonal part of the hessians is detected to be all zeros. The common cause is you did not use '
              'any activation or activation that is still too linear in some sense like ReLU.')

    if mean_output is True:
        hessians_diag_master = np.mean(hessians_diag, axis=0)
    else:
        hessians_diag_master = np.array(hessians_diag)

    hessians_diag_master = np.squeeze(hessians_diag_master)

    if denormalize:  # no need to denorm input scaling because of we assume first order dependence
        if self.labels_std is not None:
            try:
                hessians_diag_master = hessians_diag_master * self.labels_std
            except ValueError:
                hessians_diag_master = hessians_diag_master * self.labels_std.reshape(-1, 1)

    print(f'Finished diagonal hessian calculation, {(time.time() - start_time):.{2}f} seconds elapsed')
    return hessians_diag_master
def jacobian(self, x=None, mean_output=False, mc_num=1, denormalize=False):
    """
    | Calculate jacobian of gradient of output to input high performance calculation update on 15 April 2018
    |
    | Please notice that the de-normalize (if True) assumes the output depends on the input data first orderly
    | in which the equation is simply jacobian divided the input scaling, usually a good approx. if you use ReLU all the way

    :param x: Input Data
    :type x: ndarray
    :param mean_output: False to get all jacobian, True to get the mean
    :type mean_output: boolean
    :param mc_num: Number of monte carlo integration
    :type mc_num: int
    :param denormalize: De-normalize Jacobian
    :type denormalize: bool
    :return: An array of Jacobian
    :rtype: ndarray
    :History:
        | 2017-Nov-20 - Written - Henry Leung (University of Toronto)
        | 2018-Apr-15 - Updated - Henry Leung (University of Toronto)
    """
    self.has_model_check()
    if x is None:
        raise ValueError('Please provide data to calculate the jacobian')
    if mc_num < 1 or isinstance(mc_num, float):
        raise ValueError('mc_num must be a positive integer')

    if self.input_normalizer is None:
        # work on a copy so that the array given by the caller is never modified in place
        x_data = np.array(x)
        x_data -= self.input_mean
        x_data /= self.input_std
    else:
        x_data = self.input_normalizer.normalize(x, calc=False)

    try:
        input_tens = self.keras_model_predict.get_layer("input").input
        output_tens = self.keras_model_predict.get_layer("output").output
        input_shape_expectation = self.keras_model_predict.get_layer("input").input_shape
        output_shape_expectation = self.keras_model_predict.get_layer("output").output_shape
    except AttributeError:
        # no prediction model available, use the training model
        input_tens = self.keras_model.get_layer("input").input
        output_tens = self.keras_model.get_layer("output").output
        input_shape_expectation = self.keras_model.get_layer("input").input_shape
        output_shape_expectation = self.keras_model.get_layer("output").output_shape
    except ValueError:
        raise ValueError("astroNN expects input layer is named as 'input' and output layer is named as 'output', "
                         "but None is found.")

    # a single data point can arrive with fewer dimensions than the network expects
    expected_rank = len(input_shape_expectation)
    if expected_rank == 3:
        x_data = np.atleast_3d(x_data)
    elif expected_rank == 4:
        if len(x_data.shape) < 4:
            x_data = x_data[:, :, :, np.newaxis]
    else:
        raise ValueError('Input data shape do not match neural network expectation')

    total_num = x_data.shape[0]

    # gradient of every output neurone with respect to the input
    grad_list = [tf.gradients(output_tens[:, j], input_tens) for j in range(self._labels_shape)]
    final_stack = tf.stack(tf.squeeze(grad_list))

    # tensorflow loop over the monte carlo runs, the results are collected in a TensorArray
    loop_start = tf.constant(0)
    mc_num_tf = tf.constant(mc_num)
    mc_results = tf.TensorArray(dtype=tf.float32, infer_shape=False, size=1, dynamic_size=True)

    def body(step, results):
        results = results.write(step, final_stack)
        return step + 1, results

    _, loop = tf.while_loop(lambda step, *_: tf.less(step, mc_num_tf), body, [loop_start, mc_results])
    loops = tf.cond(tf.greater(mc_num_tf, 1), lambda: tf.reduce_mean(loop.stack(), axis=0), lambda: loop.stack())
    loops = tf.reshape(loops,
                       shape=[tf.shape(input_tens)[0], *output_shape_expectation[1:], *input_shape_expectation[1:]])

    start_time = time.time()
    # the graph is evaluated for one data point at a time
    per_sample = [get_session().run(loops, feed_dict={input_tens: x_data[idx:idx + 1],
                                                      keras.backend.learning_phase(): 0})
                  for idx in range(0, total_num)]
    jacobian = np.concatenate(per_sample, axis=0)

    jacobian_master = np.mean(jacobian, axis=0) if mean_output is True else np.array(jacobian)
    jacobian_master = np.squeeze(jacobian_master)

    if denormalize:
        if self.input_std is not None:
            jacobian_master = jacobian_master / np.squeeze(self.input_std)
        if self.labels_std is not None:
            try:
                jacobian_master = jacobian_master * self.labels_std
            except ValueError:
                jacobian_master = jacobian_master * self.labels_std.reshape(-1, 1)

    print(f'Finished all gradient calculation, {(time.time() - start_time):.{2}f} seconds elapsed')
    return jacobian_master
@deprecated
def jacobian_old(self, x=None, mean_output=False, denormalize=False):
    """
    | Calculate jacobian of gradient of output to input
    |
    | Please notice that the de-normalize (if True) assumes the output depends on the input data first orderly
    | in which the equation is simply jacobian divided the input scaling

    :param x: Input Data
    :type x: ndarray
    :param mean_output: False to get all jacobian, True to get the mean
    :type mean_output: boolean
    :param denormalize: De-normalize Jacobian
    :type denormalize: bool
    :return: An array of Jacobian
    :rtype: ndarray
    :raises ValueError: if ``x`` is missing or the data do not match the input shape of the network
    :raises AttributeError: if the instance has no model
    :History: 2017-Nov-20 - Written - Henry Leung (University of Toronto)
    """
    self.has_model_check()
    if x is None:
        raise ValueError('Please provide data to calculate the jacobian')

    if self.input_normalizer is not None:
        x_data = self.input_normalizer.normalize(x, calc=False)
    else:
        # Prevent shallow copy issue
        x_data = np.array(x)
        x_data -= self.input_mean
        x_data /= self.input_std

    try:
        input_tens = self.keras_model_predict.get_layer("input").input
        output_tens = self.keras_model_predict.get_layer("output").output
        input_shape_expectation = self.keras_model_predict.get_layer("input").input_shape
    except AttributeError:
        # no prediction model available, use the training model
        input_tens = self.keras_model.get_layer("input").input
        output_tens = self.keras_model.get_layer("output").output
        input_shape_expectation = self.keras_model.get_layer("input").input_shape

    start_time = time.time()

    if len(input_shape_expectation) == 3:
        x_data = np.atleast_3d(x_data)

        grad_list = [tf.gradients(output_tens[0, j], input_tens) for j in range(self._labels_shape)]
        final_stack = tf.stack(tf.squeeze(grad_list))

        jacobian = np.ones((x_data.shape[0], self._labels_shape, x_data.shape[1]), dtype=np.float32)
        for i in range(x_data.shape[0]):
            x_in = x_data[i:i + 1]
            jacobian[i, :, :] = get_session().run(final_stack, feed_dict={input_tens: x_in,
                                                                          keras.backend.learning_phase(): 0})
    elif len(input_shape_expectation) == 4:
        # monoflag is True when the channel dimension had to be added here
        monoflag = False
        if len(x_data.shape) < 4:
            monoflag = True
            x_data = x_data[:, :, :, np.newaxis]

        jacobian = np.ones((x_data.shape[0], self._labels_shape, x_data.shape[1], x_data.shape[2], x_data.shape[3]),
                           dtype=np.float32)

        # Differentiate the same output tensor which was paired with input_tens above, as the 3-D branch
        # does; going back to self.keras_model here could mix tensors of two different models
        grad_list = [tf.gradients(output_tens[0, j], input_tens) for j in range(self._labels_shape)]
        final_stack = tf.stack(tf.squeeze(grad_list))

        for i in range(x_data.shape[0]):
            x_in = x_data[i:i + 1]
            if monoflag is False:
                jacobian[i, :, :, :, :] = get_session().run(final_stack, feed_dict={input_tens: x_in,
                                                                                    keras.backend.learning_phase(): 0})
            else:
                jacobian[i, :, :, :, 0] = get_session().run(final_stack, feed_dict={input_tens: x_in,
                                                                                    keras.backend.learning_phase(): 0})
    else:
        raise ValueError('Input data shape do not match neural network expectation')

    if mean_output is True:
        jacobian_master = np.mean(jacobian, axis=0)
    else:
        jacobian_master = np.array(jacobian)

    jacobian_master = np.squeeze(jacobian_master)

    if denormalize:
        if self.input_std is not None:
            jacobian_master = jacobian_master / self.input_std
        if self.labels_std is not None:
            try:
                jacobian_master = jacobian_master * self.labels_std
            except ValueError:
                jacobian_master = jacobian_master * self.labels_std.reshape(-1, 1)

    print(f'Finished gradient calculation, {(time.time() - start_time):.{2}f} seconds elapsed')
    return jacobian_master
def plot_dense_stats(self):
    """
    Plot dense layers weight statistics

    One normalized histogram of the kernel weights is drawn for every Dense layer of the model, labelled
    with the maximum, minimum, mean and standard deviation of the weights.

    :return: A plot
    :raises AttributeError: if the instance has no model
    :History: 2018-May-12 - Written - Henry Leung (University of Toronto)
    """
    self.has_model_check()

    denses = [layer for layer in self.keras_model.layers if isinstance(layer, keras.layers.Dense)]

    fig, ax = plt.subplots(1, figsize=(15, 10), dpi=100)
    for counter, dense in enumerate(denses):
        # get_weights() returns a list of arrays of different shapes (kernel first). Index the list
        # directly: wrapping it in np.array() builds a ragged array, which recent NumPy refuses to do
        weight_temp = dense.get_weights()[0].flatten()
        ax.hist(weight_temp, 200, density=True, range=(-2., 2.), alpha=0.7,
                label=f'Dense Layer {counter}, max: {weight_temp.max():.{2}f}, min: {weight_temp.min():.{2}f}, '
                      f'mean: {weight_temp.mean():.{2}f}, std: {weight_temp.std():.{2}f}')
    fig.suptitle(f'Dense Layers Weight Statistics of {self.folder_name}', fontsize=17)
    ax.set_xlabel('Weights', fontsize=17)
    ax.set_ylabel('Normalized Distribution', fontsize=17)
    ax.minorticks_on()
    ax.tick_params(labelsize=15, width=3, length=10, which='major')
    ax.tick_params(width=1.5, length=5, which='minor')
    ax.legend(loc='best', fontsize=15)
    fig.tight_layout(rect=[0, 0.00, 1, 0.96])
    fig.show()

    return fig
@property
def output_shape(self):
    """
    Output shape of the prediction model, or of the training model when the prediction model does not
    provide one

    :return: output shape expectation
    :rtype: tuple
    :History: 2018-May-19 - Written - Henry Leung (University of Toronto)
    """
    self.has_model_check()
    try:
        shape = self.keras_model_predict.output_shape
    except AttributeError:
        shape = self.keras_model.output_shape
    return shape
@property
def input_shape(self):
    """
    Input shape of the prediction model, or of the training model when the prediction model does not
    provide one

    :return: input shape expectation
    :rtype: tuple
    :History: 2018-May-21 - Written - Henry Leung (University of Toronto)
    """
    self.has_model_check()
    try:
        shape = self.keras_model_predict.input_shape
    except AttributeError:
        shape = self.keras_model.input_shape
    return shape
def get_weights(self):
    """
    Get all model weights, as returned by ``keras_model.get_weights()``

    :return: weights arrays
    :rtype: ndarray
    :History: 2018-May-23 - Written - Henry Leung (University of Toronto)
    """
    self.has_model_check()
    weights = self.keras_model.get_weights()
    return weights
def summary(self):
    """
    Get model summary, delegates to ``keras_model.summary()`` and hands back whatever it returns

    :return: None, just print
    :History: 2018-May-23 - Written - Henry Leung (University of Toronto)
    """
    self.has_model_check()
    result = self.keras_model.summary()
    return result
def get_config(self):
    """
    Get model configuration as a dictionary, as returned by ``keras_model.get_config()``

    :return: dict
    :History: 2018-May-23 - Written - Henry Leung (University of Toronto)
    """
    self.has_model_check()
    config = self.keras_model.get_config()
    return config
def save_weights(self, filename=_astroNN_MODEL_NAME, overwrite=True):
    """
    Save model weights as .h5

    The file goes into ``fullfilepath`` when the model has been saved to a folder, otherwise ``filename`` is
    used as the path directly. A reminder is printed first.

    :param filename: Filename of .h5 to be saved
    :type filename: str
    :param overwrite: whether to overwrite
    :type overwrite: bool
    :return: None, a .h5 file will be saved
    :History: 2018-May-23 - Written - Henry Leung (University of Toronto)
    """
    self.has_model_check()

    separator = '=========================='
    print(separator)
    print('This is a remainder that saving weights to h5, you might have difficult to '
          'load it back and cannot be used with astroNN probably')
    print(separator)

    if self.fullfilepath is None:
        return self.keras_model.save_weights(filename, overwrite=overwrite)

    destination = str(os.path.join(self.fullfilepath, filename))
    return self.keras_model.save_weights(destination, overwrite=overwrite)
@property
def uses_learning_phase(self):
    """
    To determine whether the model depends on keras learning flag, i.e. whether any output of the model
    has a true ``_uses_learning_phase`` attribute. If False, then setting learning phase will not affect
    the model

    :return: the boolean to indicate keras learning flag dependence of the model
    :rtype: bool
    :History: 2018-Jun-03 - Written - Henry Leung (University of Toronto)
    """
    self.has_model_check()
    for output_tensor in self.keras_model.outputs:
        # an output without the attribute counts as not depending on the flag
        if getattr(output_tensor, '_uses_learning_phase', False):
            return True
    return False
def get_layer(self, *args, **kwargs):
    """
    get_layer() method of tensorflow, every argument is passed to ``keras_model.get_layer()`` unchanged

    :param args: positional arguments of ``keras_model.get_layer()``
    :param kwargs: keyword arguments of ``keras_model.get_layer()``
    :return: whatever ``keras_model.get_layer()`` returns
    :raises AttributeError: if the instance has no model
    """
    # Same guard as the other methods which delegate to the model: the exception type is the one a missing
    # model raised before, but the message now tells what is wrong
    self.has_model_check()
    return self.keras_model.get_layer(*args, **kwargs)
def flush(self):
    """
    | Experimental, I don't think it works
    | Flush GPU memory from tensorflow by calling ``keras.backend.clear_session()``

    :return: None
    :History: 2018-Jun-19 - Written - Henry Leung (University of Toronto)
    """
    backend = keras.backend
    backend.clear_session()