import math
from packaging import version
import tensorflow as tf
# from tensorflow_probability.python import distributions as tfd
# from tensorflow_probability.python.layers import util as tfp_layers_util
# from tensorflow_probability.python.layers.dense_variational import _DenseVariational as DenseVariational_Layer
# from tensorflow_probability.python.math import random_rademacher
from astroNN.config import keras_import_manager
from astroNN.nn import intpow_avx2
keras = keras_import_manager()
epsilon = keras.backend.epsilon
initializers = keras.initializers
activations = keras.activations
Layer, Wrapper, InputSpec = keras.layers.Layer, keras.layers.Wrapper, keras.layers.InputSpec
# flag for new tensorflow dropout or not
new_dropout_flag = version.parse("1.13.0rc0") <= version.parse(tf.__version__)
class KLDivergenceLayer(Layer):
"""
| Identity transform layer that adds KL divergence to the final model losses.
| KL divergence is used to force the latent space to match the prior (in this case a unit Gaussian)
:return: A layer
:rtype: object
:History: 2018-Feb-05 - Written - Henry Leung (University of Toronto)
"""
def __init__(self, name=None, **kwargs):
self.is_placeholder = True
if not name:
prefix = self.__class__.__name__
name = prefix + '_' + str(keras.backend.get_uid(prefix))
super().__init__(name=name, **kwargs)
def call(self, inputs, training=None):
"""
:Note: Equivalent to __call__()
:param inputs: a list of two tensors, the mean and the log-variance of the latent space
:type inputs: list[tf.Tensor]
:return: Tensor after applying the layer
:rtype: tf.Tensor
"""
mu, log_var = inputs
kl_batch = - .5 * tf.reduce_sum(1 + log_var - tf.square(mu) - tf.exp(log_var), axis=-1)
self.add_loss(tf.reduce_mean(kl_batch), inputs=inputs)
return inputs
def get_config(self):
"""
:return: Dictionary of configuration
:rtype: dict
"""
config = {'None': None}
base_config = super().get_config()
return {**dict(base_config.items()), **config}
def compute_output_shape(self, input_shape):
return input_shape
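# Usage sketch (illustrative only, not from the original source): in a variational autoencoder
# the layer is applied to the latent mean/log-variance pair and returns them unchanged while
# registering the KL term as a model loss. `z_mean` and `z_log_var` are assumed tensors
# produced by an encoder defined elsewhere.
#
#   z_mean, z_log_var = KLDivergenceLayer()([z_mean, z_log_var])
#   # downstream sampling and the decoder use z_mean and z_log_var as usual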
class MCDropout(Layer):
"""
Dropout Layer for Bayesian Neural Network, this layer is always on regardless of the learning phase flag
:param rate: Dropout Rate between 0 and 1
:type rate: float
:param disable: Dropout on or off
:type disable: boolean
:return: A layer
:rtype: object
:History: 2018-Feb-05 - Written - Henry Leung (University of Toronto)
"""
def __init__(self, rate, disable=False, noise_shape=None, name=None, **kwargs):
# tensorflow expects (0,1] retain prob before 1.13.0, and dropout rate after that
self.rate = min(1. - epsilon(), max(0., rate))
self.keep_prob = 1. - self.rate
self.disable_layer = disable
self.supports_masking = True
self.noise_shape = noise_shape
if not name:
prefix = self.__class__.__name__
name = prefix + '_' + str(keras.backend.get_uid(prefix))
super().__init__(name=name, **kwargs)
def _get_noise_shape(self, inputs):
if self.noise_shape is None:
return self.noise_shape
symbolic_shape = tf.shape(inputs)
noise_shape = [symbolic_shape[axis] if shape is None else shape
for axis, shape in enumerate(self.noise_shape)]
return tuple(noise_shape)
def call(self, inputs, training=None):
"""
:Note: Equivalent to __call__()
:param inputs: Tensor to be applied
:type inputs: tf.Tensor
:return: Tensor after applying the layer
:rtype: tf.Tensor
"""
noise_shape = self._get_noise_shape(inputs)
if self.disable_layer is True:
return inputs
else:
if new_dropout_flag:
return tf.nn.dropout(x=inputs,
rate=self.rate,
noise_shape=noise_shape)
else:
return tf.nn.dropout(x=inputs,
keep_prob=self.keep_prob,
noise_shape=noise_shape)
def get_config(self):
"""
:return: Dictionary of configuration
:rtype: dict
"""
config = {'rate': self.rate,
'noise_shape': self.noise_shape}
base_config = super().get_config()
return {**dict(base_config.items()), **config}
def compute_output_shape(self, input_shape):
return input_shape
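# Usage sketch (illustrative only): unlike keras.layers.Dropout, MCDropout also drops units at
# test time, so repeated predictions on the same input differ and can be used for Monte Carlo
# uncertainty estimation. `previous_tensor` is an assumed tensor from earlier layers.
#
#   x = MCDropout(rate=0.2)(previous_tensor)
#   # set disable=True to turn the layer into an identity mapping without rebuilding the model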
class MCSpatialDropout1D(MCDropout):
"""
Spatial 1D version of the Dropout Layer for Bayesian Neural Network,
this layer is always on regardless of the learning phase flag
:param rate: Dropout Rate between 0 and 1
:type rate: float
:param disable: Dropout on or off
:type disable: boolean
:return: A layer
:rtype: object
:History: 2018-Mar-07 - Written - Henry Leung (University of Toronto)
"""
def __init__(self, rate, disable=False, **kwargs):
super().__init__(rate, disable, **kwargs)
self.disable_layer = disable
self.input_spec = InputSpec(ndim=3)
def _get_noise_shape(self, inputs):
input_shape = tf.shape(inputs)
return input_shape[0], 1, input_shape[2]
class MCSpatialDropout2D(MCDropout):
"""
Spatial 2D version of the Dropout Layer for Bayesian Neural Network,
this layer is always on regardless of the learning phase flag
:param rate: Dropout Rate between 0 and 1
:type rate: float
:param disable: Dropout on or off
:type disable: boolean
:return: A layer
:rtype: object
:History: 2018-Mar-07 - Written - Henry Leung (University of Toronto)
"""
def __init__(self, rate, disable=False, **kwargs):
super().__init__(rate, disable, **kwargs)
self.disable_layer = disable
self.input_spec = InputSpec(ndim=4)
def _get_noise_shape(self, inputs):
input_shape = tf.shape(inputs)
return input_shape[0], 1, 1, input_shape[3]
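# Usage sketch (illustrative only): the spatial variants drop whole feature maps rather than
# individual elements, mirroring keras.layers.SpatialDropout1D/2D but staying active at test
# time. `conv1d_out` and `conv2d_out` are assumed outputs of Conv1D/Conv2D layers.
#
#   x = MCSpatialDropout1D(rate=0.2)(conv1d_out)   # input shape (batch, steps, channels)
#   x = MCSpatialDropout2D(rate=0.2)(conv2d_out)   # input shape (batch, rows, cols, channels)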
class MCGaussianDropout(Layer):
"""
Gaussian Dropout Layer for Bayesian Neural Network, this layer is always on regardless of the learning phase flag,
multiplicative Gaussian noise has standard deviation sqrt(rate / (1 - rate))
:param rate: Dropout Rate between 0 and 1
:type rate: float
:param disable: Dropout on or off
:type disable: boolean
:return: A layer
:rtype: object
:History: 2018-Mar-07 - Written - Henry Leung (University of Toronto)
"""
def __init__(self, rate, disable=False, name=None, **kwargs):
self.rate = min(1. - epsilon(), max(0., rate))
self.disable_layer = disable
self.supports_masking = True
if not name:
prefix = self.__class__.__name__
name = prefix + '_' + str(keras.backend.get_uid(prefix))
super().__init__(name=name, **kwargs)
def call(self, inputs, training=None):
"""
:Note: Equivalent to __call__()
:param inputs: Tensor to be applied
:type inputs: tf.Tensor
:return: Tensor after applying the layer
:rtype: tf.Tensor
"""
stddev = math.sqrt(self.rate / (1.0 - self.rate))
if self.disable_layer is True:
return inputs
else:
return inputs * tf.random_normal(shape=tf.shape(inputs), mean=1.0, stddev=stddev)
def get_config(self):
"""
:return: Dictionary of configuration
:rtype: dict
"""
config = {'rate': self.rate}
base_config = super().get_config()
return {**dict(base_config.items()), **config}
def compute_output_shape(self, input_shape):
return input_shape
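# Usage sketch (illustrative only): instead of zeroing units, the layer multiplies the input by
# Gaussian noise with mean 1 and standard deviation sqrt(rate / (1 - rate)), and it does so at
# both train and test time. `previous_tensor` is an assumed tensor from earlier layers.
#
#   x = MCGaussianDropout(rate=0.1)(previous_tensor)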
class MCConcreteDropout(Wrapper):
"""
| Monte Carlo Dropout with Continuous Relaxation Layer Wrapper. This layer will learn the dropout probability
| arXiv:1705.07832
:param layer: The layer to apply concrete dropout to
:type layer: keras.layers.Layer
:return: A layer
:rtype: object
:History: 2018-Mar-04 - Written - Henry Leung (University of Toronto)
"""
def __init__(self, layer, weight_regularizer=5e-13, dropout_regularizer=1e-4,
init_min=0.1, init_max=0.2, disable=False, **kwargs):
assert 'kernel_regularizer' not in kwargs
super().__init__(layer, **kwargs)
self.weight_regularizer = weight_regularizer
self.dropout_regularizer = dropout_regularizer
self.disable_layer = disable
self.supports_masking = True
self.p_logit = None
self.p = None
self.init_min = math.log(init_min) - math.log(1. - init_min)
self.init_max = math.log(init_max) - math.log(1. - init_max)
def build(self, input_shape=None):
self.layer.input_spec = InputSpec(shape=input_shape)
if hasattr(self.layer, 'built') and not self.layer.built:
self.layer.build(input_shape)
self.layer.built = True
# initialise p
self.p_logit = self.add_weight(name='p_logit', shape=(1,),
initializer=initializers.RandomUniform(self.init_min, self.init_max),
dtype=tf.float32, trainable=True)
self.p = tf.nn.sigmoid(self.p_logit[0])
tf.add_to_collection("LAYER_P", self.p)
# initialise regularizer / prior KL term
input_dim = tf.reduce_prod(input_shape[1:]) # we drop only last dim
weight = self.layer.kernel
kernel_regularizer = self.weight_regularizer * tf.reduce_sum(tf.square(weight)) / (1. - self.p)
dropout_regularizer = self.p * tf.log(self.p)
dropout_regularizer += (1. - self.p) * tf.log(1. - self.p)
dropout_regularizer *= self.dropout_regularizer * tf.cast(input_dim, tf.float32)
regularizer = tf.reduce_sum(kernel_regularizer + dropout_regularizer)
self.layer.add_loss(regularizer)
# Add the regularisation loss to collection.
tf.add_to_collection(tf.GraphKeys.REGULARIZATION_LOSSES, regularizer)
def compute_output_shape(self, input_shape):
return self.layer.compute_output_shape(input_shape)
def get_config(self):
"""
:return: Dictionary of configuration
:rtype: dict
"""
config = {'rate': self.p.eval(session=keras.backend.get_session()),
'weight_regularizer': self.weight_regularizer, 'dropout_regularizer': self.dropout_regularizer}
base_config = super().get_config()
return {**dict(base_config.items()), **config}
def concrete_dropout(self, x):
eps = epsilon()
unif_noise = tf.random_uniform(shape=tf.shape(x))
drop_prob = (tf.log(self.p + eps) - tf.log(1. - self.p + eps) + tf.log(unif_noise + eps) - tf.log(
1. - unif_noise + eps))
drop_prob = tf.nn.sigmoid(drop_prob / 0.1)
random_tensor = 1. - drop_prob
retain_prob = 1. - self.p
x *= random_tensor
x /= retain_prob
return x
def call(self, inputs, training=None):
"""
:Note: Equivalent to __call__()
:param inputs: Tensor to be applied
:type inputs: tf.Tensor
:return: Tensor after applying the layer
:rtype: tf.Tensor
"""
if self.disable_layer is True:
return self.layer.call(inputs)
else:
return self.layer.call(self.concrete_dropout(inputs))
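# Usage sketch (illustrative only): the wrapper takes an inner layer and learns its dropout
# probability through the `p_logit` weight while adding the regularisation terms from
# arXiv:1705.07832 to the model loss. The Dense layer below is only an assumed example inner layer.
#
#   x = MCConcreteDropout(keras.layers.Dense(128, activation='relu'))(previous_tensor)
#   # the learned dropout rate is reported via get_config()['rate'] after training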
class MCBatchNorm(Layer):
"""
Monte Carlo Batch Normalization Layer for Bayesian Neural Network
:param disable: Batch Normalization on or off
:type disable: boolean
:return: A layer
:rtype: object
:History: 2018-Apr-12 - Written - Henry Leung (University of Toronto)
"""
def __init__(self, disable=False, name=None, **kwargs):
self.disable_layer = disable
self.supports_masking = True
self.epsilon = 1e-10
self.scale = None
self.beta = None
self.mean = None
self.var = None
if not name:
prefix = self.__class__.__name__
name = prefix + '_' + str(keras.backend.get_uid(prefix))
super().__init__(name=name, **kwargs)
def call(self, inputs, training=None):
"""
:Note: Equivalent to __call__()
:param inputs: Tensor to be applied
:type inputs: tf.Tensor
:return: Tensor after applying the layer
:rtype: tf.Tensor
"""
self.scale = tf.Variable(tf.ones([inputs.shape[-1]]))
self.beta = tf.Variable(tf.zeros([inputs.shape[-1]]))
self.mean = tf.Variable(tf.zeros([inputs.shape[-1]]), trainable=False)
self.var = tf.Variable(tf.ones([inputs.shape[-1]]), trainable=False)
if training is None:
training = keras.backend.learning_phase()
batch_mean, batch_var = tf.nn.moments(inputs, [0])
in_train = tf.nn.batch_normalization(inputs, batch_mean, batch_var, self.beta, self.scale, self.epsilon)
in_test = tf.nn.batch_normalization(inputs, self.mean, self.var, self.beta, self.scale, self.epsilon)
output_tensor = tf.where(tf.equal(training, True), in_train, in_test)
output_tensor._uses_learning_phase = True
return output_tensor
def get_config(self):
"""
:return: Dictionary of configuration
:rtype: dict
"""
config = {'epsilon': self.epsilon}
base_config = super().get_config()
return {**dict(base_config.items()), **config}
def compute_output_shape(self, input_shape):
return input_shape
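# Usage sketch (illustrative only): during training the layer normalises with the current batch
# statistics; at test time it uses the stored (non-trainable) mean/variance variables instead.
# `previous_tensor` is an assumed tensor from earlier layers.
#
#   x = MCBatchNorm()(previous_tensor)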
class ErrorProp(Layer):
"""
Propagate Error Layer, does nothing during training, adds Gaussian noise during the testing phase
:param stddev: Known 1-S.D. Uncertainty in input data
:type stddev: float
:return: A layer
:rtype: object
:History: 2018-Feb-05 - Written - Henry Leung (University of Toronto)
"""
def __init__(self, stddev, name=None, **kwargs):
self.supports_masking = True
self.stddev = stddev
if not name:
prefix = self.__class__.__name__
name = prefix + '_' + str(keras.backend.get_uid(prefix))
super().__init__(name=name, **kwargs)
def call(self, inputs, training=None):
"""
:Note: Equivalent to __call__()
:param inputs: Tensor to be applied
:type inputs: tf.Tensor
:return: Tensor after applying the layer
:rtype: tf.Tensor
"""
if training is None:
training = keras.backend.learning_phase()
noised = tf.add(inputs, tf.random_normal(shape=tf.shape(inputs), mean=0., stddev=self.stddev))
output_tensor = tf.where(tf.equal(training, True), inputs, noised)
output_tensor._uses_learning_phase = True
return output_tensor
def get_config(self):
"""
:return: Dictionary of configuration
:rtype: dict
"""
config = {'stddev': self.stddev}
base_config = super().get_config()
return {**dict(base_config.items()), **config}
def compute_output_shape(self, input_shape):
return input_shape
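# Usage sketch (illustrative only): the layer is typically placed right after the input so that,
# at test time, known observational uncertainty is propagated by adding Gaussian noise to each
# Monte Carlo forward pass; during training it acts as an identity mapping. The input shape and
# stddev value below are assumed.
#
#   input_tensor = keras.layers.Input(shape=(7514,))
#   noised_input = ErrorProp(stddev=0.05)(input_tensor)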
class FastMCInference():
"""
Turn a model into a model for fast Monte Carlo (Dropout, Flipout, etc.) inference on GPU
:param n: Number of Monte Carlo integration steps
:type n: int
:return: A layer
:rtype: object
:History: 2018-Apr-13 - Written - Henry Leung (University of Toronto)
"""
def __init__(self, n, **kwargs):
self.n = n
def __call__(self, model):
"""
:param model: Keras model to be accelerated
:type model: Union[keras.Model, keras.Sequential]
:return: Accelerated Keras model
:rtype: Union[keras.Model, keras.Sequential]
"""
if isinstance(model, keras.Model) or isinstance(model, keras.Sequential):
self.model = model
else:
raise TypeError(f'FastMCInference expects keras Model, you gave {type(model)}')
new_input = keras.layers.Input(shape=(self.model.input_shape[1:]), name='input')
mc_model = keras.models.Model(inputs=self.model.inputs, outputs=self.model.outputs)
mc = FastMCInferenceMeanVar()(keras.layers.TimeDistributed(mc_model)(FastMCRepeat(self.n)(new_input)))
new_mc_model = keras.models.Model(inputs=new_input, outputs=mc)
return new_mc_model
def get_config(self):
"""
:return: Dictionary of configuration
:rtype: dict
"""
config = {'n': self.n}
return config
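# Usage sketch (illustrative only): FastMCInference wraps a trained model so that a single
# forward pass internally repeats each sample n times (via FastMCRepeat), runs the model on every
# copy with TimeDistributed, and reduces the copies to mean and variance (via
# FastMCInferenceMeanVar). `trained_model` and `x_test` are assumed to exist.
#
#   fast_model = FastMCInference(n=100)(trained_model)
#   prediction_with_uncertainty = fast_model.predict(x_test)   # stacked mean/variance output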
class FastMCInferenceMeanVar(Layer):
"""
Take mean and variance of the results of a TimeDistributed layer, assuming axis=1 is the timestamp axis
:return: A layer
:rtype: object
:History:
| 2018-Feb-02 - Written - Henry Leung (University of Toronto)
| 2018-Apr-13 - Update - Henry Leung (University of Toronto)
"""
def __init__(self, name=None, **kwargs):
if not name:
prefix = self.__class__.__name__
name = prefix + '_' + str(keras.backend.get_uid(prefix))
super().__init__(name=name, **kwargs)
def compute_output_shape(self, input_shape):
return 2, input_shape[0], input_shape[2:]
def get_config(self):
"""
:return: Dictionary of configuration
:rtype: dict
"""
config = {'None': None}
base_config = super().get_config()
return {**dict(base_config.items()), **config}
def call(self, inputs, training=None):
"""
:Note: Equivalent to __call__()
:param inputs: Tensor to be applied
:type inputs: tf.Tensor
:return: Tensor after applying the layer
:rtype: tf.Tensor
"""
# need to stack because keras can only handle one output
mean, var = tf.nn.moments(inputs, axes=1)
return tf.stack((tf.squeeze([mean]), tf.squeeze([var])), axis=-1)
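# Shape sketch (illustrative only): for an input of shape (batch, n, units) coming from the
# TimeDistributed Monte Carlo passes, tf.nn.moments over axis=1 gives mean and variance of shape
# (batch, units), which are stacked along the last axis so Keras only has to handle one output.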
class FastMCRepeat(Layer):
"""
Prepare data to do inference, repeats the input n times at axis=1
:param n: Number of Monte Carlo integration steps
:type n: int
:return: A layer
:rtype: object
:History:
| 2018-Feb-02 - Written - Henry Leung (University of Toronto)
| 2018-Apr-13 - Update - Henry Leung (University of Toronto)
"""
def __init__(self, n, name=None, **kwargs):
self.n = n
if not name:
prefix = self.__class__.__name__
name = prefix + '_' + str(keras.backend.get_uid(prefix))
super().__init__(name=name, **kwargs)
def compute_output_shape(self, input_shape):
return (input_shape[0], self.n) + (input_shape[1:])
def call(self, inputs, training=None):
"""
:Note: Equivalent to __call__()
:param inputs: Tensor to be applied
:type inputs: tf.Tensor
:return: Tensor after applying the layer which is the repeated Tensor
:rtype: tf.Tensor
"""
expanded_inputs = tf.expand_dims(inputs, 1)
# we want [1, self.n, 1.....]
return tf.tile(expanded_inputs, tf.concat([[1, self.n], tf.ones_like(tf.shape(expanded_inputs))[2:]], axis=0))
def get_config(self):
"""
:return: Dictionary of configuration
:rtype: dict
"""
config = {'n': self.n}
base_config = super().get_config()
return {**dict(base_config.items()), **config}
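# Shape sketch (illustrative only): an input of shape (batch, 7514) repeated with
# FastMCRepeat(n=100) becomes (batch, 100, 7514); the tile multiples are [1, n, 1, ...] so every
# axis except the new axis=1 is left untouched.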
class StopGrad(Layer):
"""
Stop gradient backpropagation via this layer during training, acts as an identity layer during testing by default.
:param always_on: Default is False, which means the layer is on during train time and off during test time. Set to True to enable it in every situation
:type always_on: bool
:return: A layer
:rtype: object
:History: 2018-May-23 - Written - Henry Leung (University of Toronto)
"""
def __init__(self, name=None, always_on=False, **kwargs):
if not name:
prefix = self.__class__.__name__
name = prefix + '_' + str(keras.backend.get_uid(prefix))
super().__init__(name=name, **kwargs)
self.always_on = always_on
def compute_output_shape(self, input_shape):
return input_shape
def call(self, inputs, training=None):
"""
:Note: Equivalent to __call__()
:param inputs: Tensor to be applied
:type inputs: tf.Tensor
:return: Tensor after applying the layer which is just the original tensor
:rtype: tf.Tensor
"""
if self.always_on:
return tf.stop_gradient(inputs)
else:
if training is None:
training = keras.backend.learning_phase()
output_tensor = tf.where(tf.equal(training, True), tf.stop_gradient(inputs), inputs)
output_tensor._uses_learning_phase = True
return output_tensor
def get_config(self):
"""
:return: Dictionary of configuration
:rtype: dict
"""
config = {'None': None}
base_config = super().get_config()
return {**dict(base_config.items()), **config}
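# Usage sketch (illustrative only): StopGrad can freeze part of a computational graph, e.g. to
# prevent gradients from flowing back into a pre-trained branch while keeping the forward pass
# identical. `frozen_branch_output` is an assumed tensor from that branch.
#
#   x = StopGrad()(frozen_branch_output)
#   x_always = StopGrad(always_on=True)(frozen_branch_output)   # applies tf.stop_gradient in every phase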
class BoolMask(Layer):
"""
Boolean Masking layer
:param mask: numpy boolean array as a mask for incoming tensor
:type mask: np.ndarray
:return: A layer
:rtype: object
:History: 2018-May-28 - Written - Henry Leung (University of Toronto)
"""
def __init__(self, mask, name=None, **kwargs):
if sum(mask) == 0:
raise ValueError("The mask is all False, which is invalid")
else:
self.boolmask = mask
self.mask_shape = self.boolmask.sum()
self.supports_masking = True
if not name:
prefix = self.__class__.__name__
name = prefix + '_' + str(keras.backend.get_uid(prefix))
super().__init__(name=name, **kwargs)
def compute_output_shape(self, input_shape):
return tuple((input_shape[0], self.mask_shape))
def call(self, inputs, training=None):
"""
:Note: Equivalent to __call__()
:param inputs: Tensor to be applied
:type inputs: tf.Tensor
:return: Tensor after applying the layer which is just the masked tensor
:rtype: tf.Tensor
"""
batchsize = tf.shape(inputs)[0]
# need to reshape because tf.keras cannot get the Tensor shape correctly from tf.boolean_mask op
return tf.reshape(tf.boolean_mask(inputs, self.boolmask, axis=1), [batchsize, self.mask_shape])
def get_config(self):
"""
:return: Dictionary of configuration
:rtype: dict
"""
config = {'None': None}
base_config = super().get_config()
return {**dict(base_config.items()), **config}
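# Usage sketch (illustrative only): the mask is a fixed numpy boolean array over the feature
# axis, so the layer simply selects the True columns of every sample. The 5-feature mask below is
# an assumed example and `input_tensor` is assumed to have 5 features.
#
#   import numpy as np
#   mask = np.array([True, False, True, True, False])
#   x = BoolMask(mask)(input_tensor)   # output shape (batch, 3)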
class PolyFit(Layer):
"""
n-deg polynomial fitting layer which acts as a neural network layer to be optimized
:param deg: degree of polynomial
:type deg: int
:param output_units: number of output neurons
:type output_units: int
:param use_xbias: If True, then fitting output=P(inputs)+inputs, else fitting output=P(inputs)
:type use_xbias: bool
:param init_w: [Optional] list of initial weights if there is any, the list should be [n-degree, input_size, output_size]
:type init_w: Union[NoneType, list]
:param name: [Optional] name of the layer
:type name: Union[NoneType, str]
:param activation: [Optional] activation, default is 'linear'
:type activation: Union[NoneType, str]
:param kernel_regularizer: [Optional] kernel regularizer
:type kernel_regularizer: Union[NoneType, str]
:param kernel_constraint: [Optional] kernel constraint
:type kernel_constraint: Union[NoneType, str]
:return: A layer
:rtype: object
:History: 2018-Jul-24 - Written - Henry Leung (University of Toronto)
"""
def __init__(self,
deg=1,
output_units=1,
use_xbias=True,
init_w=None,
name=None,
activation=None,
kernel_regularizer=None,
kernel_constraint=None):
super().__init__(name=name)
self.input_spec = InputSpec(min_ndim=2)
self.deg = deg
self.output_units = output_units
self.use_bias = use_xbias
self.activation = activations.get(activation)
self.kernel_regularizer = keras.regularizers.get(kernel_regularizer)
self.kernel_constraint = keras.constraints.get(kernel_constraint)
self.init_w = init_w
if self.init_w is not None and len(self.init_w) != self.deg + 1:
raise ValueError(f"If you specify initial weight for {self.deg}-deg polynomial, "
f"you must provide {self.deg+1} weights")
def build(self, input_shape):
assert len(input_shape) >= 2
if isinstance(input_shape[-1], tf.Dimension):
self.input_dim = input_shape[-1].value
else:
self.input_dim = input_shape[-1]
self.kernel = self.add_weight(shape=(self.deg + 1, self.input_dim, self.output_units),
initializer="random_normal",
name='kernel',
regularizer=self.kernel_regularizer,
constraint=self.kernel_constraint)
if self.init_w is not None:
for k in range(self.output_units):
for j in range(self.input_dim):
for i in range(self.deg + 1):
keras.backend.set_value(self.kernel[i, j, k], self.init_w[i][j][k])
self.input_spec = InputSpec(min_ndim=2, axes={-1: self.input_dim})
self.built = True
def call(self, inputs):
"""
:Note: Equivalent to __call__()
:param inputs: Tensor to be applied
:type inputs: tf.Tensor
:return: Tensor after applying the layer which is just n-deg P(inputs)
:rtype: tf.Tensor
"""
polylist = []
output_list = []
for k in range(self.output_units):
for j in range(self.input_dim):
polylist.append([tf.multiply(intpow_avx2(inputs[:, j], i),
self.kernel[i, j, k]) for i in range(self.deg + 1)])
if self.use_bias:
polylist[j].append(inputs[:, j])
output_list.append(tf.add_n([tf.add_n(polylist[jj]) for jj in range(self.input_dim)]))
output = tf.stack(output_list, axis=-1)
if self.activation is not None:
output = self.activation(output)
return output
def compute_output_shape(self, input_shape):
return tuple((input_shape[0], self.output_units))
def get_config(self):
"""
:return: Dictionary of configuration
:rtype: dict
"""
config = {'degree': self.deg,
'use_bias': self.use_bias,
'activation': activations.serialize(self.activation),
'initial_weights': self.init_w,
'kernel_regularizer': keras.regularizers.serialize(self.kernel_regularizer),
'kernel_constraint': keras.constraints.serialize(self.kernel_constraint)}
base_config = super().get_config()
return {**dict(base_config.items()), **config}
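# Usage sketch (illustrative only): PolyFit learns the coefficients of an n-degree polynomial
# P(inputs) (plus the optional identity term when use_xbias=True) by gradient descent, so it can
# be trained like any other Keras layer. The shapes and hyperparameters below are assumed.
#
#   input_tensor = keras.layers.Input(shape=(1,))
#   output_tensor = PolyFit(deg=2, output_units=1, use_xbias=False)(input_tensor)
#   model = keras.models.Model(inputs=input_tensor, outputs=output_tensor)
#   model.compile(optimizer='adam', loss='mse')   # fits y ~ w0 + w1*x + w2*x**2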
# class BayesPolyFit(DenseVariational_Layer):
# """
# | n-deg polynomial fitting layer which acts as a bayesian neural network layer to be optimized with
# | local reparameterization gradients
# |
# | Moreover, the current implementation of this layer does not allow it to be run with Keras,
# | please modify the astroNN configuration in ~/config.ini key -> tensorflow_keras = tensorflow
#
# :param deg: degree of polynomial
# :type deg: int
# :param output_units: number of output neurons
# :type output_units: int
# :param use_xbias: If True, then fitting output=P(inputs)+inputs, else fitting output=P(inputs)
# :type use_xbias: bool
# :param init_w: [Optional] list of initial weights if there is any, the list should be [n-degree, input_size, output_size]
# :type init_w: Union[NoneType, list]
# :param name: [Optional] name of the layer
# :type name: Union[NoneType, str]
# :param activation: [Optional] activation, default is 'linear'
# :type activation: Union[NoneType, str]
# :param trainable: [Optional] trainable or not
# :type trainable: bool
# :return: A layer
# :rtype: object
# :History: 2018-Sept-08 - Written - Henry Leung (University of Toronto)
# """
#
# def __init__(self,
# deg=1,
# output_units=1,
# use_xbias=True,
# init_w=None,
# name=None,
# activation=None,
# trainable=True,
# kernel_posterior_fn=tfp_layers_util.default_mean_field_normal_fn(),
# kernel_posterior_tensor_fn=lambda d: d.sample(),
# kernel_prior_fn=tfp_layers_util.default_multivariate_normal_fn,
# kernel_divergence_fn=lambda q, p, ignore: tfd.kl_divergence(q, p)):
# if 'tf' not in keras.__version__:
# raise EnvironmentError("The current implementation of this layer does not allow it to be run with Keras, "
# "please modify the astroNN configuration in ~/config.ini key -> tensorflow_keras = tensorflow")
# super().__init__(name=name,
# units=output_units,
# trainable=trainable,
# kernel_posterior_fn=kernel_posterior_fn,
# kernel_posterior_tensor_fn=kernel_posterior_tensor_fn,
# kernel_prior_fn=kernel_prior_fn,
# kernel_divergence_fn=kernel_divergence_fn)
# self.input_spec = InputSpec(min_ndim=2)
# self.deg = deg
# self.output_units = output_units
# self.use_bias = use_xbias
# self.activation = activations.get(activation)
# self.kernel_posterior_fn = kernel_posterior_fn
# self.kernel_posterior_tensor_fn = kernel_posterior_tensor_fn
# self.kernel_prior_fn = kernel_prior_fn
# self.kernel_divergence_fn = kernel_divergence_fn
# self.init_w = init_w
#
# if self.init_w is not None and len(self.init_w) != self.deg + 1:
# raise ValueError(f"If you specify initial weight for {self.deg}-deg polynomial, "
# f"you must provide {self.deg+1} weights")
#
# def _apply_variational_kernel(self, inputs):
#
# if (not isinstance(self.kernel_posterior, tfd.Independent) or
# not isinstance(self.kernel_posterior.distribution, tfd.Normal)):
# raise TypeError(f'`DenseFlipout` requires kernel_posterior_fn` produce an instance of '
# f'`tf.distributions.Independent(tf.distributions.Normal)'
# f'`(saw: \"{self.kernel_posterior.name}\").')
# self.kernel_posterior_affine = tfd.Normal(
# loc=tf.zeros_like(self.kernel_posterior.distribution.loc),
# scale=self.kernel_posterior.distribution.scale)
# self.kernel_posterior_affine_tensor = (self.kernel_posterior_tensor_fn(self.kernel_posterior_affine))
# self.kernel_posterior_tensor = None
#
# input_shape = tf.shape(inputs)
# batch_shape = input_shape[:-1]
#
# sign_input = random_rademacher(input_shape, dtype=inputs.dtype)
# sign_output = random_rademacher(
# tf.concat([batch_shape,
# tf.expand_dims(self.units, 0)], 0),
# dtype=inputs.dtype)
# perturbed_inputs = self._ndegmul(inputs * sign_input,
# self.kernel_posterior_affine_tensor) * sign_output
#
# outputs = self._ndegmul(inputs, self.kernel_posterior.distribution.loc)
# outputs += perturbed_inputs
# return outputs
#
# def build(self, input_shape):
# assert len(input_shape) >= 2
# input_shape = tf.TensorShape(input_shape)
# in_size = input_shape.with_rank_at_least(2)[-1].value
#
# if isinstance(input_shape[-1], tf.Dimension):
# self.input_dim = input_shape[-1].value
# else:
# self.input_dim = input_shape[-1]
#
# self.kernel_posterior = self.kernel_posterior_fn(tf.float32, [self.deg + 1, self.input_dim, self.output_units],
# 'kernel_posterior',
# self.trainable, self.add_variable)
# if self.kernel_prior_fn is None:
# self.kernel_prior = None
# else:
# self.kernel_prior = self.kernel_prior_fn(tf.float32, [self.deg + 1, self.input_dim, self.output_units],
# 'kernel_prior', self.trainable, self.add_variable)
# self._built_kernel_divergence = False
#
# self.input_spec = InputSpec(min_ndim=2, axes={-1: self.input_dim})
# self.built = True
#
# def _apply_divergence(self, divergence_fn, posterior, prior,
# posterior_tensor, name):
# if divergence_fn is None or posterior is None or prior is None:
# divergence = None
# return
# divergence = tf.identity(divergence_fn(posterior, prior, posterior_tensor), name=name)
# self.add_loss(divergence)
#
# def _ndegmul(self, inputs, kernel):
# polylist = []
# output_list = []
# for k in range(self.output_units):
# for j in range(self.input_dim):
# polylist.append([tf.multiply(intpow_avx2(inputs[:, j], i),
# kernel[i, j, k]) for i in range(self.deg + 1)])
# if self.use_bias:
# polylist[j].append(inputs[:, j])
# output_list.append(tf.add_n([tf.add_n(polylist[jj]) for jj in range(self.input_dim)]))
# output = tf.stack(output_list, axis=-1)
#
# return output
#
# def call(self, inputs):
# """
# :Note: Equivalent to __call__()
# :param inputs: Tensor to be applied
# :type inputs: tf.Tensor
# :return: Tensor after applying the layer which is just n-deg P(inputs)
# :rtype: tf.Tensor
# """
# outputs = self._apply_variational_kernel(inputs)
#
# if self.activation is not None:
# outputs = self.activation(outputs)
#
# if not self._built_kernel_divergence:
# self._apply_divergence(self.kernel_divergence_fn,
# self.kernel_posterior,
# self.kernel_prior,
# self.kernel_posterior_tensor,
# name='divergence_kernel')
# self._built_kernel_divergence = True
# return outputs
#
# def get_weights_and_error(self):
# """
# :return: dictionary contains `weights` for weights and `error` for weights uncertainty
# :rtype: dict
# """
# weights = self.kernel_posterior.distribution.loc.eval(session=keras.backend.get_session())
# error = self.kernel_posterior.distribution.scale.eval(session=keras.backend.get_session())
# return {'weights': weights, 'error': error}
#
# def compute_output_shape(self, input_shape):
# return tuple((input_shape[0], self.output_units))
#
# def get_config(self):
# """
# :return: Dictionary of configuration
# :rtype: dict
# """
# config = {'degree': self.deg,
# 'use_bias': self.use_bias,
# 'activation': activations.serialize(self.activation),
# 'initial_weights': self.init_w}
# base_config = super().get_config()
# return {**dict(base_config.items()), **config}