Source code for astroNN.nn.layers

import math

import keras
from astroNN.config import _KERAS_BACKEND, backend_framework

[docs] class KLDivergenceLayer(keras.layers.Layer): """ | Identity transform layer that adds KL divergence to the final model losses. | KL divergence used to force the latent space match the prior (in this case its unit gaussian) :return: A layer :rtype: object :History: 2018-Feb-05 - Written - Henry Leung (University of Toronto) """ def __init__(self, name=None, **kwargs): self.is_placeholder = True super().__init__(name=name, **kwargs)
[docs] def call(self, inputs, training=None): """ :Note: Equivalent to __call__() :param inputs: Tensor to be applied, concatenated tf.tensor of mean and std in latent space :type inputs: tf.Tensor :return: Tensor after applying the layer :rtype: tf.Tensor """ mu, log_var = inputs kl_batch = -0.5 * keras.ops.sum( 1 + log_var - keras.ops.square(mu) - keras.ops.exp(log_var), axis=-1 ) self.add_loss(keras.ops.mean(kl_batch), inputs=inputs) return inputs
[docs] def get_config(self): """ :return: Dictionary of configuration :rtype: dict """ config = {"None": None} base_config = super().get_config() return {**dict(base_config.items()), **config}
def compute_output_shape(self, input_shape): return input_shape
class VAESampling(keras.layers.Layer): """ Uses (z_mean, z_log_var) to sample z, the vector encoding a digit. """ def __init__(self, name=None, **kwargs): self.supports_masking = True super().__init__(name=name, **kwargs) def call(self, inputs): z_mean, z_log_var = inputs batch = keras.ops.shape(z_mean)[0] dim = keras.ops.shape(z_mean)[1] epsilon = keras.random.normal(shape=(batch, dim)) return z_mean + keras.ops.exp(0.5 * z_log_var) * epsilon
[docs] class MCDropout(keras.layers.Layer): """ Dropout Layer for Bayesian Neural Network, this layer will always on regardless the learning phase flag :param rate: Dropout Rate between 0 and 1 :type rate: float :param disable: Dropout on or off :type disable: boolean :return: A layer :rtype: object :History: 2018-Feb-05 - Written - Henry Leung (University of Toronto) """ def __init__(self, rate, disable=False, noise_shape=None, name=None, **kwargs): self.rate = rate self.disable_layer = disable self.supports_masking = True self.noise_shape = noise_shape super().__init__(name=name, **kwargs) def _get_noise_shape(self, inputs): if self.noise_shape is None: return self.noise_shape symbolic_shape = keras.ops.shape(inputs) noise_shape = [ symbolic_shape[axis] if shape is None else shape for axis, shape in enumerate(self.noise_shape) ] return tuple(noise_shape)
[docs] def call(self, inputs, training=None): """ :Note: Equivalent to __call__() :param inputs: Tensor to be applied :type inputs: tf.Tensor :return: Tensor after applying the layer :rtype: tf.Tensor """ noise_shape = self._get_noise_shape(inputs) if self.disable_layer is True: return inputs else: return keras.random.dropout(inputs, rate=self.rate, noise_shape=noise_shape)
[docs] def get_config(self): """ :return: Dictionary of configuration :rtype: dict """ config = {"rate": self.rate, "noise_shape": self.noise_shape} base_config = super().get_config() return {**dict(base_config.items()), **config}
def compute_output_shape(self, input_shape): return input_shape
[docs] class MCSpatialDropout1D(MCDropout): """ Spatial 1D version of Dropout of Dropout Layer for Bayesian Neural Network, this layer will always regardless the learning phase flag :param rate: Dropout Rate between 0 and 1 :type rate: float :param disable: Dropout on or off :type disable: boolean :return: A layer :rtype: object :History: 2018-Mar-07 - Written - Henry Leung (University of Toronto) """ def __init__(self, rate, disable=False, **kwargs): super().__init__(rate, disable, **kwargs) self.disable_layer = disable self.input_spec = keras.layers.InputSpec(ndim=3) def _get_noise_shape(self, inputs): input_shape = keras.ops.shape(inputs) return input_shape[0], 1, input_shape[2]
[docs] class MCSpatialDropout2D(MCDropout): """ Spatial 2D version of Dropout of Dropout Layer for Bayesian Neural Network, this layer will always regardless the learning phase flag :param rate: Dropout Rate between 0 and 1 :type rate: float :param disable: Dropout on or off :type disable: boolean :return: A layer :rtype: object :History: 2018-Mar-07 - Written - Henry Leung (University of Toronto) """ def __init__(self, rate, disable=False, **kwargs): super().__init__(rate, disable, **kwargs) self.disable_layer = disable self.input_spec = keras.layers.InputSpec(ndim=4) def _get_noise_shape(self, inputs): input_shape = keras.ops.shape(inputs) return input_shape[0], 1, 1, input_shape[3]
[docs] class MCGaussianDropout(keras.layers.Layer): """ Dropout Layer for Bayesian Neural Network, this layer will always on regardless the learning phase flag standard deviation sqrt(rate / (1 - rate)) :param rate: Dropout Rate between 0 and 1 :type rate: float :param disable: Dropout on or off :type disable: boolean :return: A layer :rtype: object :History: 2018-Mar-07 - Written - Henry Leung (University of Toronto) """ def __init__(self, rate, disable=False, name=None, **kwargs): self.rate = min(1.0 - keras.backend.epsilon(), max(0.0, rate)) self.disable_layer = disable self.supports_masking = True self.rate = rate super().__init__(name=name, **kwargs)
[docs] def call(self, inputs, training=None): """ :Note: Equivalent to __call__() :param inputs: Tensor to be applied :type inputs: tf.Tensor :return: Tensor after applying the layer :rtype: tf.Tensor """ stddev = math.sqrt(self.rate / (1.0 - self.rate)) if self.disable_layer is True: return inputs else: return inputs * keras.random.normal( shape=keras.ops.shape(inputs), mean=1.0, stddev=stddev )
[docs] def get_config(self): """ :return: Dictionary of configuration :rtype: dict """ config = {"rate": self.rate} base_config = super().get_config() return {**dict(base_config.items()), **config}
def compute_output_shape(self, input_shape): return input_shape
[docs] class ErrorProp(keras.layers.Layer): """ Propagate Error Layer by adding gaussian noise (mean=0, std=err) during testing phase from ``input_err`` tensor :return: A layer :rtype: object :History: 2018-Feb-05 - Written - Henry Leung (University of Toronto) """ def __init__(self, name=None, **kwargs): self.supports_masking = True super().__init__(name=name, **kwargs)
[docs] def call(self, inputs, training=None): """ :Note: Equivalent to __call__() :param inputs: a list of Tensor which [input_tensor, input_error_tensor] :type inputs: list[tf.Tensor] :return: Tensor after applying the layer :rtype: tf.Tensor """ noise = keras.random.normal(keras.ops.shape(inputs[0])) noised_inputs = inputs[0] + noise * inputs[1] output_tensor = keras.ops.where( keras.ops.equal(training, True), inputs[0], noised_inputs ) return output_tensor
[docs] def get_config(self): """ :return: Dictionary of configuration :rtype: dict """ config = {} base_config = super().get_config() return {**dict(base_config.items()), **config}
def compute_output_shape(self, input_shape): return input_shape[0]
[docs] class FastMCInference: """ Turn a model for fast Monte Carlo (Dropout, Flipout, etc) Inference on GPU :param n: Number of Monte Carlo integration :type n: int :return: A layer :rtype: object :History: | 2018-Apr-13 - Written - Henry Leung (University of Toronto) | 2021-Apr-14 - Updated - Henry Leung (University of Toronto) """ def __init__(self, n, model, **kwargs): self.n = n if isinstance(model, keras.models.Model): self.model = model else: raise TypeError( f"FastMCInference expects an instance of keras.models.Model, you gave {type(model)}" ) self.meanvar_layer = FastMCInferenceMeanVar() new_input = keras.layers.Input(shape=(self.model.input_shape[1:]), name="input") # self.mc_model = keras.models.Model( # inputs=self.model.inputs, outputs=self.model.outputs # ) self.fast_mc_layer = FastMCInferenceV2_internal(self.model, self.n) mc = self.meanvar_layer(self.fast_mc_layer(new_input)) self.transformed_model = keras.models.Model(inputs=new_input, outputs=mc)
[docs] def get_config(self): """ :return: Dictionary of configuration :rtype: dict """ config = {"n": self.n} return config
class FastMCInferenceV2_internal(keras.layers.Wrapper): def __init__(self, model, n=100, **kwargs): super().__init__(model, **kwargs) if isinstance(model, keras.Model) or isinstance(model, keras.Sequential): self.layer = model self.n = n self.arange_n = keras.ops.arange(self.n, dtype=keras.backend.floatx()) else: raise TypeError( f"FastMCInference expects keras Model, you gave {type(model)}" ) def build(self, input_shape): self.built = True def compute_output_shape(self, input_shape): layer_output_shape = self.layer.compute_output_shape(input_shape) if isinstance(layer_output_shape, list): # if it is a list of shape, then add self.n in front of each shape return [tuple([self.n] + list(shape)) for shape in layer_output_shape] elif isinstance(layer_output_shape, dict): # if it is a dict of shape, then add self.n in front of each shape return { key: tuple([self.n] + list(shape)) for key, shape in layer_output_shape.items() } else: return (self.n,) + layer_output_shape def call(self, inputs, training=None, mask=None): def loop_fn(i): return self.layer(inputs) # vectorizing operation depends on backend if _KERAS_BACKEND == "torch": outputs = backend_framework.vmap( loop_fn, randomness="different", in_dims=0 )(self.arange_n) # TODO: tensorflow vectorized_map traced operation so there is no randomness which affects e.g., dropout # elif _KERAS_BACKEND == "tensorflow": # outputs = backend_framework.vectorized_map(loop_fn, self.arange_n) else: # fallback to simple for loop outputs = [self.layer(inputs) for _ in range(self.n)] if isinstance(outputs[0], dict): outputs = { key: keras.ops.stack([output[key] for output in outputs]) for key in outputs[0].keys() } else: outputs = keras.ops.stack(outputs) return outputs # outputs can be tensor or dict of tensors
[docs] class FastMCInferenceMeanVar(keras.layers.Layer): """ Take mean and variance of the results of a TimeDistributed layer, assuming axis=1 is the timestamp axis :return: A layer :rtype: object :History: | 2018-Feb-02 - Written - Henry Leung (University of Toronto) | 2018-Apr-13 - Update - Henry Leung (University of Toronto) """ def __init__(self, name=None, **kwargs): super().__init__(name=name, **kwargs) def compute_output_shape(self, input_shape): # the first dimension is the number of MC integration, so we remove it but add 2 for mean and var if isinstance(input_shape, list): return [shape[1:] + (2,) for shape in input_shape] elif isinstance(input_shape, dict): return {key: shape[1:] + (2,) for key, shape in input_shape.items()} else: return input_shape[1:] + (2,)
[docs] def get_config(self): """ :return: Dictionary of configuration :rtype: dict """ config = {"None": None} base_config = super().get_config() return {**dict(base_config.items()), **config}
[docs] def call(self, inputs, training=None): """ :Note: Equivalent to __call__() :param inputs: Tensor to be applied :type inputs: tf.Tensor :return: Tensor after applying the layer :rtype: tf.Tensor """ if isinstance(inputs, dict): outputs = {} for key, value in inputs.items(): mean, var = keras.ops.mean(value, axis=0), keras.ops.var(value, axis=0) outputs[key] = keras.ops.stack((mean, var), axis=-1) return outputs elif isinstance(inputs, list): outputs = [] for value in inputs: mean, var = keras.ops.mean(value, axis=0), keras.ops.var(value, axis=0) outputs.append(keras.ops.stack((mean, var), axis=-1)) return outputs else: # just a tensor mean, var = keras.ops.mean(inputs, axis=0), keras.ops.var(inputs, axis=0) return keras.ops.stack((mean, var), axis=-1)
[docs] class FastMCRepeat(keras.layers.Layer): """ Prepare data to do inference, Repeats the input n times at axis=1 :param n: Number of Monte Carlo integration :type n: int :return: A layer :rtype: object :History: | 2018-Feb-02 - Written - Henry Leung (University of Toronto) | 2018-Apr-13 - Update - Henry Leung (University of Toronto) """ def __init__(self, n, name=None, **kwargs): self.n = n super().__init__(name=name, **kwargs) def compute_output_shape(self, input_shape): return (input_shape[0], self.n) + (input_shape[1:])
[docs] def call(self, inputs, training=None): """ :Note: Equivalent to __call__() :param inputs: Tensor to be applied :type inputs: tf.Tensor :return: Tensor after applying the layer which is the repeated Tensor :rtype: tf.Tensor """ expanded_inputs = keras.ops.expand_dims(inputs, 1) # we want [1, self.n, 1.....] return keras.ops.tile( expanded_inputs, keras.ops.concat( [ [1, self.n], keras.ops.ones_like(keras.ops.shape(expanded_inputs))[2:], ], axis=0, ), )
[docs] def get_config(self): """ :return: Dictionary of configuration :rtype: dict """ config = {"n": self.n} base_config = super().get_config() return {**base_config.items(), **config}
[docs] class StopGrad(keras.layers.Layer): """ Stop gradient backpropagation via this layer during training, act as an identity layer during testing by default. :return: A layer :rtype: object :History: 2018-May-23 - Written - Henry Leung (University of Toronto) """ def __init__(self, name=None, always_on=False, **kwargs): super().__init__(name=name, **kwargs) self.always_on = always_on def compute_output_shape(self, input_shape): return input_shape
[docs] def call(self, inputs, training=None): """ :Note: Equivalent to __call__() :param inputs: Tensor to be applied :type inputs: tf.Tensor :return: Tensor after applying the layer which is just the original tensor :rtype: tf.Tensor """ return keras.ops.stop_gradient(inputs)
[docs] def get_config(self): """ :return: Dictionary of configuration :rtype: dict """ config = {"None": None} base_config = super().get_config() return {**dict(base_config.items()), **config}
[docs] class BoolMask(keras.layers.Layer): """ Boolean Masking layer, please notice it is best to flatten input before using BoolMask :param mask: numpy boolean array as a mask for incoming tensor :type mask: np.ndarray :return: A layer :rtype: object :History: 2018-May-28 - Written - Henry Leung (University of Toronto) """ def __init__(self, mask, name=None, **kwargs): if sum(mask) == 0: raise ValueError("The mask is all False, which is invalid") else: self.boolmask = mask self.mask_shape = self.boolmask.sum() self.supports_masking = True super().__init__(name=name, **kwargs) def compute_output_shape(self, input_shape): if len(input_shape) < 2: raise ValueError(f"Shape {input_shape} must have rank at least 2") return input_shape[:-1] + (self.mask_shape,)
[docs] def call(self, inputs, training=None): """ :Note: Equivalent to __call__() :param inputs: Tensor to be applied :type inputs: tf.Tensor :return: Tensor after applying the layer which is just the masked tensor :rtype: tf.Tensor """ batchsize = keras.ops.shape(inputs)[0] # need to reshape because tf.keras cannot get the Tensor shape correctly from tf.boolean_mask op return keras.ops.reshape( keras.ops.take_along_axis(inputs, keras.ops.where(self.boolmask), axis=1), [batchsize, self.mask_shape], )
[docs] def get_config(self): """ :return: Dictionary of configuration :rtype: dict """ config = {"None": None} base_config = super().get_config() return {**dict(base_config.items()), **config}