# Source code for astroNN.models.apogee_models

# ---------------------------------------------------------#
#   astroNN.models.apogee_models: Contain Apogee Models
# ---------------------------------------------------------#
import numpy as np
import tensorflow as tf

from astroNN.apogee import aspcap_mask
from astroNN.apogee.plotting import ASPCAP_plots
from astroNN.config import keras_import_manager
from astroNN.models.base_bayesian_cnn import BayesianCNNBase
from astroNN.models.base_cnn import CNNBase
from astroNN.models.base_vae import ConvVAEBase
from astroNN.nn.layers import MCDropout, BoolMask, StopGrad, KLDivergenceLayer
from astroNN.nn.losses import bayesian_binary_crossentropy_wrapper, bayesian_binary_crossentropy_var_wrapper
from astroNN.nn.losses import bayesian_categorical_crossentropy_wrapper, bayesian_categorical_crossentropy_var_wrapper
from astroNN.nn.losses import mse_lin_wrapper, mse_var_wrapper

keras = keras_import_manager()

Add = keras.layers.Add
Dense = keras.layers.Dense
Input = keras.layers.Input
Conv1D = keras.layers.Conv1D
Lambda = keras.layers.Lambda
Reshape = keras.layers.Reshape
Dropout = keras.layers.Dropout
Flatten = keras.layers.Flatten
Multiply = keras.layers.Multiply
Activation = keras.layers.Activation
concatenate = keras.layers.concatenate
MaxPooling1D = keras.layers.MaxPooling1D

Model = keras.models.Model
Sequential = keras.models.Sequential

regularizers = keras.regularizers
MaxNorm = keras.constraints.MaxNorm
RandomNormal = keras.initializers.RandomNormal


# noinspection PyCallingNonCallable
[docs]class ApogeeBCNN(BayesianCNNBase, ASPCAP_plots): """ Class for Bayesian convolutional neural network for stellar spectra analysis :History: 2017-Dec-21 - Written - Henry Leung (University of Toronto) """ def __init__(self, lr=0.0005, dropout_rate=0.3): super().__init__() self._implementation_version = '1.0' self.initializer = RandomNormal(mean=0.0, stddev=0.05) self.activation = 'relu' self.num_filters = [2, 4] self.filter_len = 8 self.pool_length = 4 self.num_hidden = [196, 96] self.max_epochs = 100 self.lr = lr self.reduce_lr_epsilon = 0.00005 self.reduce_lr_min = 1e-8 self.reduce_lr_patience = 2 self.l2 = 5e-9 self.dropout_rate = dropout_rate self.input_norm_mode = 3 self.task = 'regression' self.targetname = ['teff', 'logg', 'M', 'alpha', 'C', 'C1', 'N', 'O', 'Na', 'Mg', 'Al', 'Si', 'P', 'S', 'K', 'Ca', 'Ti', 'Ti2', 'V', 'Cr', 'Mn', 'Fe', 'Co', 'Ni', 'fakemag'] def model(self): input_tensor = Input(shape=self._input_shape, name='input') labels_err_tensor = Input(shape=(self._labels_shape,), name='labels_err') cnn_layer_1 = Conv1D(kernel_initializer=self.initializer, padding="same", filters=self.num_filters[0], kernel_size=self.filter_len, kernel_regularizer=regularizers.l2(self.l2))(input_tensor) activation_1 = Activation(activation=self.activation)(cnn_layer_1) dropout_1 = MCDropout(self.dropout_rate, disable=self.disable_dropout)(activation_1) cnn_layer_2 = Conv1D(kernel_initializer=self.initializer, padding="same", filters=self.num_filters[1], kernel_size=self.filter_len, kernel_regularizer=regularizers.l2(self.l2))(dropout_1) activation_2 = Activation(activation=self.activation)(cnn_layer_2) maxpool_1 = MaxPooling1D(pool_size=self.pool_length)(activation_2) flattener = Flatten()(maxpool_1) dropout_2 = MCDropout(self.dropout_rate, disable=self.disable_dropout)(flattener) layer_3 = Dense(units=self.num_hidden[0], kernel_regularizer=regularizers.l2(self.l2), kernel_initializer=self.initializer)(dropout_2) activation_3 = 
Activation(activation=self.activation)(layer_3) dropout_3 = MCDropout(self.dropout_rate, disable=self.disable_dropout)(activation_3) layer_4 = Dense(units=self.num_hidden[1], kernel_regularizer=regularizers.l2(self.l2), kernel_initializer=self.initializer)(dropout_3) activation_4 = Activation(activation=self.activation)(layer_4) output = Dense(units=self._labels_shape, activation=self._last_layer_activation, name='output')(activation_4) variance_output = Dense(units=self._labels_shape, activation='linear', name='variance_output')(activation_4) model = Model(inputs=[input_tensor, labels_err_tensor], outputs=[output, variance_output]) # new astroNN high performance dropout variational inference on GPU expects single output model_prediction = Model(inputs=[input_tensor], outputs=concatenate([output, variance_output])) if self.task == 'regression': variance_loss = mse_var_wrapper(output, labels_err_tensor) output_loss = mse_lin_wrapper(variance_output, labels_err_tensor) elif self.task == 'classification': output_loss = bayesian_categorical_crossentropy_wrapper(variance_output) variance_loss = bayesian_categorical_crossentropy_var_wrapper(output) elif self.task == 'binary_classification': output_loss = bayesian_binary_crossentropy_wrapper(variance_output) variance_loss = bayesian_binary_crossentropy_var_wrapper(output) else: raise RuntimeError('Only "regression", "classification" and "binary_classification" are supported') return model, model_prediction, output_loss, variance_loss
# noinspection PyCallingNonCallable
[docs]class ApogeeBCNNCensored(BayesianCNNBase, ASPCAP_plots): """ Class for Bayesian censored convolutional neural network for stellar spectra analysis [specifically APOGEE DR14 spectra only] Described in the paper: https://ui.adsabs.harvard.edu/#abs/2018arXiv180804428L/abstract :History: 2018-May-27 - Written - Henry Leung (University of Toronto) """ def __init__(self, lr=0.0005, dropout_rate=0.3): super().__init__() self._implementation_version = '1.0' self.initializer = RandomNormal(mean=0.0, stddev=0.05) self.activation = 'relu' self.num_filters = [2, 4] self.filter_len = 8 self.pool_length = 4 # number of neurone for [ApogeeBCNN_Dense_1, ApogeeBCNN_Dense_2, aspcap_1, aspcap_2, hidden] self.num_hidden = [192, 96, 32, 16, 2] self.max_epochs = 50 self.lr = lr self.reduce_lr_epsilon = 0.00005 self.maxnorm = .5 self.reduce_lr_min = 1e-8 self.reduce_lr_patience = 2 self.l2 = 5e-9 self.dropout_rate = dropout_rate self.input_norm_mode = 3 self.task = 'regression' self.targetname = ['teff', 'logg', 'C', 'C1', 'N', 'O', 'Na', 'Mg', 'Al', 'Si', 'P', 'S', 'K', 'Ca', 'Ti', 'Ti2', 'V', 'Cr', 'Mn', 'Fe', 'Co', 'Ni'] def model(self): input_tensor = Input(shape=self._input_shape, name='input') input_tensor_flattened = Flatten()(input_tensor) labels_err_tensor = Input(shape=(self._labels_shape,), name='labels_err') # slice spectra to censor out useless region for elements censored_c_input = BoolMask(aspcap_mask("C", dr=14), name='C_Mask')(input_tensor_flattened) censored_c1_input = BoolMask(aspcap_mask("C1", dr=14), name='C1_Mask')(input_tensor_flattened) censored_n_input = BoolMask(aspcap_mask("N", dr=14), name='N_Mask')(input_tensor_flattened) censored_o_input = BoolMask(aspcap_mask("O", dr=14), name='O_Mask')(input_tensor_flattened) censored_na_input = BoolMask(aspcap_mask("Na", dr=14), name='Na_Mask')(input_tensor_flattened) censored_mg_input = BoolMask(aspcap_mask("Mg", dr=14), name='Mg_Mask')(input_tensor_flattened) censored_al_input = BoolMask(aspcap_mask("Al", dr=14), 
name='Al_Mask')(input_tensor_flattened) censored_si_input = BoolMask(aspcap_mask("Si", dr=14), name='Si_Mask')(input_tensor_flattened) censored_p_input = BoolMask(aspcap_mask("P", dr=14), name='P_Mask')(input_tensor_flattened) censored_s_input = BoolMask(aspcap_mask("S", dr=14), name='S_Mask')(input_tensor_flattened) censored_k_input = BoolMask(aspcap_mask("K", dr=14), name='K_Mask')(input_tensor_flattened) censored_ca_input = BoolMask(aspcap_mask("Ca", dr=14), name='Ca_Mask')(input_tensor_flattened) censored_ti_input = BoolMask(aspcap_mask("Ti", dr=14), name='Ti_Mask')(input_tensor_flattened) censored_ti2_input = BoolMask(aspcap_mask("Ti2", dr=14), name='Ti2_Mask')(input_tensor_flattened) censored_v_input = BoolMask(aspcap_mask("V", dr=14), name='V_Mask')(input_tensor_flattened) censored_cr_input = BoolMask(aspcap_mask("Cr", dr=14), name='Cr_Mask')(input_tensor_flattened) censored_mn_input = BoolMask(aspcap_mask("Mn", dr=14), name='Mn_Mask')(input_tensor_flattened) censored_co_input = BoolMask(aspcap_mask("Co", dr=14), name='Co_Mask')(input_tensor_flattened) censored_ni_input = BoolMask(aspcap_mask("Ni", dr=14), name='Ni_Mask')(input_tensor_flattened) # get neurones from each elements from censored spectra c_dense = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[2] * 8, kernel_initializer=self.initializer, name='c_dense', activation=self.activation, kernel_regularizer=regularizers.l2(self.l2))(censored_c_input)) c1_dense = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[2], kernel_initializer=self.initializer, name='c1_dense', activation=self.activation, kernel_regularizer=regularizers.l2(self.l2))(censored_c1_input)) n_dense = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[2] * 8, kernel_initializer=self.initializer, name='n_dense', activation=self.activation, kernel_regularizer=regularizers.l2(self.l2))(censored_n_input)) o_dense = 
MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[2], kernel_initializer=self.initializer, name='o_dense', activation=self.activation, kernel_regularizer=regularizers.l2(self.l2))(censored_o_input)) na_dense = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[2], kernel_initializer=self.initializer, name='na_dense', activation=self.activation, kernel_regularizer=regularizers.l2(self.l2))(censored_na_input)) mg_dense = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[2], kernel_initializer=self.initializer, name='mg_dense', activation=self.activation, kernel_regularizer=regularizers.l2(self.l2))(censored_mg_input)) al_dense = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[2], kernel_initializer=self.initializer, name='al_dense', activation=self.activation, kernel_regularizer=regularizers.l2(self.l2))(censored_al_input)) si_dense = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[2], kernel_initializer=self.initializer, name='si_dense', activation=self.activation, kernel_regularizer=regularizers.l2(self.l2))(censored_si_input)) p_dense = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[2], kernel_initializer=self.initializer, name='p_dense', activation=self.activation, kernel_regularizer=regularizers.l2(self.l2))(censored_p_input)) s_dense = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[2], kernel_initializer=self.initializer, name='s_dense', activation=self.activation, kernel_regularizer=regularizers.l2(self.l2))(censored_s_input)) k_dense = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[2], kernel_initializer=self.initializer, name='k_dense', activation=self.activation, kernel_regularizer=regularizers.l2(self.l2))(censored_k_input)) ca_dense = 
MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[2], kernel_initializer=self.initializer, name='ca_dense', activation=self.activation, kernel_regularizer=regularizers.l2(self.l2))(censored_ca_input)) ti_dense = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[2], kernel_initializer=self.initializer, name='ti_dense', activation=self.activation, kernel_regularizer=regularizers.l2(self.l2))(censored_ti_input)) ti2_dense = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[2], kernel_initializer=self.initializer, name='ti2_dense', activation=self.activation, kernel_regularizer=regularizers.l2(self.l2))(censored_ti2_input)) v_dense = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[2], kernel_initializer=self.initializer, name='v_dense', activation=self.activation, kernel_regularizer=regularizers.l2(self.l2))(censored_v_input)) cr_dense = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[2], kernel_initializer=self.initializer, name='cr_dense', activation=self.activation, kernel_regularizer=regularizers.l2(self.l2))(censored_cr_input)) mn_dense = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[2], kernel_initializer=self.initializer, name='mn_dense', activation=self.activation, kernel_regularizer=regularizers.l2(self.l2))(censored_mn_input)) co_dense = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[2], kernel_initializer=self.initializer, name='co_dense', activation=self.activation, kernel_regularizer=regularizers.l2(self.l2))(censored_co_input)) ni_dense = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[2], kernel_initializer=self.initializer, name='ni_dense', activation=self.activation, kernel_regularizer=regularizers.l2(self.l2))(censored_ni_input)) # get 
neurones from each elements from censored spectra c_dense_2 = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[3] * 4, kernel_initializer=self.initializer, activation=self.activation, name='c_dense_2')(c_dense)) c1_dense_2 = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[3], kernel_initializer=self.initializer, activation=self.activation, name='c1_dense_2')(c1_dense)) n_dense_2 = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[3] * 4, kernel_initializer=self.initializer, activation=self.activation, name='n_dense_2')(n_dense)) o_dense_2 = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[3], kernel_initializer=self.initializer, activation=self.activation, name='o_dense_2')(o_dense)) na_dense_2 = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[3], kernel_initializer=self.initializer, activation=self.activation, name='na_dense_2')(na_dense)) mg_dense_2 = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[3], kernel_initializer=self.initializer, activation=self.activation, name='mg_dense_2')(mg_dense)) al_dense_2 = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[3], kernel_initializer=self.initializer, activation=self.activation, name='al_dense_2')(al_dense)) si_dense_2 = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[3], kernel_initializer=self.initializer, activation=self.activation, name='si_dense_2')(si_dense)) p_dense_2 = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[3], kernel_initializer=self.initializer, activation=self.activation, name='p_dense_2')(p_dense)) s_dense_2 = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[3], kernel_initializer=self.initializer, 
activation=self.activation, name='s_dense_2')(s_dense)) k_dense_2 = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[3], kernel_initializer=self.initializer, activation=self.activation, name='k_dense_2')(k_dense)) ca_dense_2 = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[3], kernel_initializer=self.initializer, activation=self.activation, name='ca_dense_2')(ca_dense)) ti_dense_2 = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[3], kernel_initializer=self.initializer, activation=self.activation, name='ti_dense_2')(ti_dense)) ti2_dense_2 = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[3], kernel_initializer=self.initializer, activation=self.activation, name='ti2_dense_2')(ti2_dense)) v_dense_2 = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[3], kernel_initializer=self.initializer, activation=self.activation, name='v_dense_2')(v_dense)) cr_dense_2 = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[3], kernel_initializer=self.initializer, activation=self.activation, name='cr_dense_2')(cr_dense)) mn_dense_2 = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[3], kernel_initializer=self.initializer, activation=self.activation, name='mn_dense_2')(mn_dense)) co_dense_2 = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[3], kernel_initializer=self.initializer, activation=self.activation, name='co_dense_2')(co_dense)) ni_dense_2 = MCDropout(self.dropout_rate, disable=self.disable_dropout)( Dense(units=self.num_hidden[3], kernel_initializer=self.initializer, activation=self.activation, name='ni_dense_2')(ni_dense)) # Basically the same as ApogeeBCNN structure cnn_layer_1 = Conv1D(kernel_initializer=self.initializer, padding="same", 
filters=self.num_filters[0], kernel_size=self.filter_len, kernel_regularizer=regularizers.l2(self.l2))(input_tensor) activation_1 = Activation(activation=self.activation)(cnn_layer_1) dropout_1 = MCDropout(self.dropout_rate, disable=self.disable_dropout)(activation_1) cnn_layer_2 = Conv1D(kernel_initializer=self.initializer, padding="same", filters=self.num_filters[1], kernel_size=self.filter_len, kernel_regularizer=regularizers.l2(self.l2))(dropout_1) activation_2 = Activation(activation=self.activation)(cnn_layer_2) maxpool_1 = MaxPooling1D(pool_size=self.pool_length)(activation_2) flattener = Flatten()(maxpool_1) dropout_2 = MCDropout(self.dropout_rate, disable=self.disable_dropout)(flattener) layer_3 = Dense(units=self.num_hidden[0], kernel_regularizer=regularizers.l2(self.l2), kernel_initializer=self.initializer)(dropout_2) activation_3 = Activation(activation=self.activation)(layer_3) dropout_3 = MCDropout(self.dropout_rate, disable=self.disable_dropout)(activation_3) layer_4 = Dense(units=self.num_hidden[1], kernel_regularizer=regularizers.l2(self.l2), kernel_initializer=self.initializer)(dropout_3) activation_4 = Activation(activation=self.activation)(layer_4) teff_output = Dense(units=1)(activation_4) logg_output = Dense(units=1)(activation_4) fe_output = Dense(units=1)(activation_4) old_3_output_wo_grad = StopGrad()(concatenate([teff_output, logg_output, fe_output])) teff_output_var = Dense(units=1)(activation_4) logg_output_var = Dense(units=1)(activation_4) fe_output_var = Dense(units=1)(activation_4) aux_fullspec = Dense(units=self.num_hidden[4], kernel_initializer=self.initializer, kernel_constraint=MaxNorm(self.maxnorm), name='aux_fullspec')(activation_4) fullspec_hidden = concatenate([aux_fullspec, old_3_output_wo_grad]) # get the final answer c_concat = Dense(units=1, name='c_concat')(concatenate([c_dense_2, fullspec_hidden])) c1_concat = Dense(units=1, name='c1_concat')(concatenate([c1_dense_2, fullspec_hidden])) n_concat = Dense(units=1, 
name='n_concat')(concatenate([n_dense_2, fullspec_hidden])) o_concat = Dense(units=1, name='o_concat')(concatenate([o_dense_2, fullspec_hidden])) na_concat = Dense(units=1, name='na_concat')(concatenate([na_dense_2, fullspec_hidden])) mg_concat = Dense(units=1, name='mg_concat')(concatenate([mg_dense_2, fullspec_hidden])) al_concat = Dense(units=1, name='al_concat')(concatenate([al_dense_2, fullspec_hidden])) si_concat = Dense(units=1, name='si_concat')(concatenate([si_dense_2, fullspec_hidden])) p_concat = Dense(units=1, name='p_concat')(concatenate([p_dense_2, fullspec_hidden])) s_concat = Dense(units=1, name='s_concat')(concatenate([s_dense_2, fullspec_hidden])) k_concat = Dense(units=1, name='k_concat')(concatenate([k_dense_2, fullspec_hidden])) ca_concat = Dense(units=1, name='ca_concat')(concatenate([ca_dense_2, fullspec_hidden])) ti_concat = Dense(units=1, name='ti_concat')(concatenate([ti_dense_2, fullspec_hidden])) ti2_concat = Dense(units=1, name='ti2_concat')(concatenate([ti2_dense_2, fullspec_hidden])) v_concat = Dense(units=1, name='v_concat')(concatenate([v_dense_2, fullspec_hidden])) cr_concat = Dense(units=1, name='cr_concat')(concatenate([cr_dense_2, fullspec_hidden])) mn_concat = Dense(units=1, name='mn_concat')(concatenate([mn_dense_2, fullspec_hidden])) co_concat = Dense(units=1, name='co_concat')(concatenate([co_dense_2, fullspec_hidden])) ni_concat = Dense(units=1, name='ni_concat')(concatenate([ni_dense_2, fullspec_hidden])) # get the final predictive uncertainty c_concat_var = Dense(units=1, name='c_concat_var')(concatenate([c_dense_2, fullspec_hidden])) c1_concat_var = Dense(units=1, name='c1_concat_var')(concatenate([c1_dense_2, fullspec_hidden])) n_concat_var = Dense(units=1, name='n_concat_var')(concatenate([n_dense_2, fullspec_hidden])) o_concat_var = Dense(units=1, name='o_concat_var')(concatenate([o_dense_2, fullspec_hidden])) na_concat_var = Dense(units=1, name='na_concat_var')(concatenate([na_dense_2, fullspec_hidden])) 
mg_concat_var = Dense(units=1, name='mg_concat_var')(concatenate([mg_dense_2, fullspec_hidden])) al_concat_var = Dense(units=1, name='al_concat_var')(concatenate([al_dense_2, fullspec_hidden])) si_concat_var = Dense(units=1, name='si_concat_var')(concatenate([si_dense_2, fullspec_hidden])) p_concat_var = Dense(units=1, name='p_concat_var')(concatenate([p_dense_2, fullspec_hidden])) s_concat_var = Dense(units=1, name='s_concat_var')(concatenate([s_dense_2, fullspec_hidden])) k_concat_var = Dense(units=1, name='k_concat_var')(concatenate([k_dense_2, fullspec_hidden])) ca_concat_var = Dense(units=1, name='ca_concat_var')(concatenate([ca_dense_2, fullspec_hidden])) ti_concat_var = Dense(units=1, name='ti_concat_var')(concatenate([ti_dense_2, fullspec_hidden])) ti2_concat_var = Dense(units=1, name='ti2_concat_var')(concatenate([ti2_dense_2, fullspec_hidden])) v_concat_var = Dense(units=1, name='v_concat_var')(concatenate([v_dense_2, fullspec_hidden])) cr_concat_var = Dense(units=1, name='cr_concat_var')(concatenate([cr_dense_2, fullspec_hidden])) mn_concat_var = Dense(units=1, name='mn_concat_var')(concatenate([mn_dense_2, fullspec_hidden])) co_concat_var = Dense(units=1, name='co_concat_var')(concatenate([co_dense_2, fullspec_hidden])) ni_concat_var = Dense(units=1, name='ni_concat_var')(concatenate([ni_dense_2, fullspec_hidden])) # concatenate answer output = concatenate([teff_output, logg_output, c_concat, c1_concat, n_concat, o_concat, na_concat, mg_concat, al_concat, si_concat, p_concat, s_concat, k_concat, ca_concat, ti_concat, ti2_concat, v_concat, cr_concat, mn_concat, fe_output, co_concat, ni_concat], name='output') # concatenate predictive uncertainty variance_output = concatenate([teff_output_var, logg_output_var, c_concat_var, c1_concat_var, n_concat_var, o_concat_var, na_concat_var, mg_concat_var, al_concat_var, si_concat_var, p_concat_var, s_concat_var, k_concat_var, ca_concat_var, ti_concat_var, ti2_concat_var, v_concat_var, cr_concat_var, mn_concat_var, 
fe_output_var, co_concat_var, ni_concat_var], name='variance_output') model = Model(inputs=[input_tensor, labels_err_tensor], outputs=[output, variance_output]) # new astroNN high performance dropout variational inference on GPU expects single output model_prediction = Model(inputs=input_tensor, outputs=concatenate([output, variance_output])) variance_loss = mse_var_wrapper(output, labels_err_tensor) output_loss = mse_lin_wrapper(variance_output, labels_err_tensor) return model, model_prediction, output_loss, variance_loss
[docs]class ApogeeCNN(CNNBase, ASPCAP_plots): """ Class for Convolutional Neural Network for stellar spectra analysis :History: 2017-Dec-21 - Written - Henry Leung (University of Toronto) """ def __init__(self, lr=0.005): super().__init__() self._implementation_version = '1.0' self.initializer = 'he_normal' self.activation = 'relu' self.num_filters = [2, 4] self.filter_len = 8 self.pool_length = 4 self.num_hidden = [196, 96] self.max_epochs = 100 self.lr = lr self.reduce_lr_epsilon = 0.00005 self.reduce_lr_min = 1e-8 self.reduce_lr_patience = 2 self.l2 = 1e-5 self.dropout_rate = 0.1 self.input_norm_mode = 3 self.task = 'regression' self.targetname = ['teff', 'logg', 'M', 'alpha', 'C', 'C1', 'N', 'O', 'Na', 'Mg', 'Al', 'Si', 'P', 'S', 'K', 'Ca', 'Ti', 'Ti2', 'V', 'Cr', 'Mn', 'Fe', 'Co', 'Ni', 'fakemag'] def model(self): input_tensor = Input(shape=self._input_shape, name='input') cnn_layer_1 = Conv1D(kernel_initializer=self.initializer, padding="same", filters=self.num_filters[0], kernel_size=self.filter_len, kernel_regularizer=regularizers.l2(self.l2))(input_tensor) activation_1 = Activation(activation=self.activation)(cnn_layer_1) cnn_layer_2 = Conv1D(kernel_initializer=self.initializer, padding="same", filters=self.num_filters[1], kernel_size=self.filter_len, kernel_regularizer=regularizers.l2(self.l2))(activation_1) activation_2 = Activation(activation=self.activation)(cnn_layer_2) maxpool_1 = MaxPooling1D(pool_size=self.pool_length)(activation_2) flattener = Flatten()(maxpool_1) dropout_1 = Dropout(self.dropout_rate)(flattener) layer_3 = Dense(units=self.num_hidden[0], kernel_regularizer=regularizers.l2(self.l2), kernel_initializer=self.initializer)(dropout_1) activation_3 = Activation(activation=self.activation)(layer_3) dropout_2 = Dropout(self.dropout_rate)(activation_3) layer_4 = Dense(units=self.num_hidden[1], kernel_regularizer=regularizers.l2(self.l2), kernel_initializer=self.initializer)(dropout_2) activation_4 = 
Activation(activation=self.activation)(layer_4) layer_5 = Dense(units=self._labels_shape)(activation_4) output = Activation(activation=self._last_layer_activation, name='output')(layer_5) model = Model(inputs=input_tensor, outputs=output) return model
[docs]class StarNet2017(CNNBase, ASPCAP_plots): """ To create StarNet, S. Fabbro et al. (2017) arXiv:1709.09182. astroNN implemented the exact architecture with default parameter same as StarNet paper :History: 2017-Dec-23 - Written - Henry Leung (University of Toronto) """ def __init__(self): super().__init__() self.name = 'StarNet (arXiv:1709.09182)' self._implementation_version = '1.0' self.initializer = 'he_normal' self.activation = 'relu' self.num_filters = [4, 16] self.filter_len = 8 self.pool_length = 4 self.num_hidden = [256, 128] self.max_epochs = 30 self.lr = 0.0007 self.l2 = 0. self.reduce_lr_epsilon = 0.00005 self.reduce_lr_min = 0.00008 self.reduce_lr_patience = 2 self.early_stopping_min_delta = 0.0001 self.early_stopping_patience = 4 self.input_norm_mode = 3 self.task = 'regression' self.targetname = ['teff', 'logg', 'Fe'] def model(self): input_tensor = Input(shape=self._input_shape, name='input') cnn_layer_1 = Conv1D(kernel_initializer=self.initializer, activation=self.activation, padding="same", filters=self.num_filters[0], kernel_size=self.filter_len)(input_tensor) cnn_layer_2 = Conv1D(kernel_initializer=self.initializer, activation=self.activation, padding="same", filters=self.num_filters[1], kernel_size=self.filter_len)(cnn_layer_1) maxpool_1 = MaxPooling1D(pool_size=self.pool_length)(cnn_layer_2) flattener = Flatten()(maxpool_1) layer_3 = Dense(units=self.num_hidden[0], kernel_initializer=self.initializer, activation=self.activation)( flattener) layer_4 = Dense(units=self.num_hidden[1], kernel_initializer=self.initializer, activation=self.activation)( layer_3) layer_out = Dense(units=self._labels_shape, kernel_initializer=self.initializer, activation=self._last_layer_activation, name='output')(layer_4) model = Model(inputs=input_tensor, outputs=layer_out) return model
# noinspection PyCallingNonCallable
[docs]class ApogeeCVAE(ConvVAEBase): """ Class for Convolutional Autoencoder Neural Network for stellar spectra analysis :History: 2017-Dec-21 - Written - Henry Leung (University of Toronto) """ def __init__(self): super().__init__() self._implementation_version = '1.0' self.batch_size = 64 self.initializer = 'he_normal' self.activation = 'relu' self.optimizer = None self.num_filters = [2, 4] self.filter_len = 8 self.pool_length = 4 self.num_hidden = [128, 64] self.latent_dim = 2 self.max_epochs = 100 self.lr = 0.0005 self.reduce_lr_epsilon = 0.0005 self.reduce_lr_min = 0.0000000001 self.reduce_lr_patience = 4 self.epsilon_std = 1.0 self.task = 'regression' self.keras_encoder = None self.keras_vae = None self.l1 = 1e-5 self.l2 = 1e-5 self.dropout_rate = 0.1 self._last_layer_activation = 'linear' self.targetname = 'Spectra Reconstruction' self.input_norm_mode = '2' self.labels_norm_mode = '2' def model(self): input_tensor = Input(shape=self._input_shape, name='input') cnn_layer_1 = Conv1D(kernel_initializer=self.initializer, activation=self.activation, padding="same", filters=self.num_filters[0], kernel_size=self.filter_len, kernel_regularizer=regularizers.l2(self.l2))(input_tensor) dropout_1 = Dropout(self.dropout_rate)(cnn_layer_1) cnn_layer_2 = Conv1D(kernel_initializer=self.initializer, activation=self.activation, padding="same", filters=self.num_filters[1], kernel_size=self.filter_len, kernel_regularizer=regularizers.l2(self.l2))(dropout_1) dropout_2 = Dropout(self.dropout_rate)(cnn_layer_2) maxpool_1 = MaxPooling1D(pool_size=self.pool_length)(dropout_2) flattener = Flatten()(maxpool_1) layer_4 = Dense(units=self.num_hidden[0], kernel_regularizer=regularizers.l1(self.l1), kernel_initializer=self.initializer, activation=self.activation)(flattener) dropout_3 = Dropout(self.dropout_rate)(layer_4) layer_5 = Dense(units=self.num_hidden[1], kernel_regularizer=regularizers.l1(self.l1), kernel_initializer=self.initializer, activation=self.activation)(dropout_3) 
dropout_4 = Dropout(self.dropout_rate)(layer_5) z_mu = Dense(units=self.latent_dim, activation="linear", name='mean_output', kernel_initializer=self.initializer, kernel_regularizer=regularizers.l1(self.l1))(dropout_4) z_log_var = Dense(units=self.latent_dim, activation='linear', name='sigma_output', kernel_initializer=self.initializer, kernel_regularizer=regularizers.l1(self.l1))(dropout_4) z_mu, z_log_var = KLDivergenceLayer()([z_mu, z_log_var]) z_sigma = Lambda(lambda t: tf.exp(.5 * t))(z_log_var) eps = Input( tensor=tf.random_normal(mean=0., stddev=self.epsilon_std, shape=(tf.shape(z_mu)[0], self.latent_dim))) z_eps = Multiply()([z_sigma, eps]) z = Add()([z_mu, z_eps]) decoder = Sequential(name='output') decoder.add(Dense(units=self.num_hidden[1], kernel_regularizer=regularizers.l1(self.l1), kernel_initializer=self.initializer, activation=self.activation, input_dim=self.latent_dim)) decoder.add(Dropout(self.dropout_rate)) decoder.add(Dense(units=self._input_shape[0] * self.num_filters[1], kernel_regularizer=regularizers.l2(self.l2), kernel_initializer=self.initializer, activation=self.activation)) decoder.add(Dropout(self.dropout_rate)) output_shape = (self.batch_size, self._input_shape[0], self.num_filters[1]) decoder.add(Reshape(output_shape[1:])) decoder.add(Conv1D(kernel_initializer=self.initializer, activation=self.activation, padding="same", filters=self.num_filters[1], kernel_size=self.filter_len, kernel_regularizer=regularizers.l2(self.l2))) decoder.add(Dropout(self.dropout_rate)) decoder.add(Conv1D(kernel_initializer=self.initializer, activation=self.activation, padding="same", filters=self.num_filters[0], kernel_size=self.filter_len, kernel_regularizer=regularizers.l2(self.l2))) decoder.add(Conv1D(kernel_initializer=self.initializer, activation=self._last_layer_activation, padding="same", filters=1, kernel_size=self.filter_len, name='output')) x_pred = decoder(z) vae = Model(inputs=[input_tensor, eps], outputs=x_pred) encoder = Model(input_tensor, 
z_mu) return vae, encoder, decoder
class DeNormAdd(keras.layers.Layer):
    """
    Keras layer that adds a fixed constant offset (the normalization mean) back to its input,
    i.e. de-normalizes the tensor element-wise.

    Implemented as a named Layer subclass (instead of a Lambda closure) to work around
    `TypeError: can't pickle _thread.lock objects` when saving the particular model that uses it.

    :param norm: constant offset added back to the input (de-normalization value)
    :param name: optional layer name; auto-generated from the class name when omitted
    """

    def __init__(self, norm, name=None, **kwargs):
        self.norm = norm
        self.supports_masking = True
        if not name:
            # mimic Keras' default name generation: "denormadd_<uid>"
            prefix = self.__class__.__name__
            name = prefix + '_' + str(keras.backend.get_uid(prefix))
        super().__init__(name=name, **kwargs)

    def call(self, inputs, training=None):
        # element-wise addition of the stored constant; `training` is accepted for
        # Keras API compatibility but has no effect (the layer is deterministic)
        return tf.add(inputs, self.norm)

    def get_config(self):
        # Fix: expose `norm` in the config so the layer can be re-instantiated when a
        # saved model is loaded with custom_objects; without this, deserialization
        # would call __init__ without the required `norm` argument.
        # NOTE(review): if `norm` is a numpy array it may need tolist() for JSON
        # serialization of the model config — confirm against the save path used.
        config = {'norm': self.norm}
        base_config = super().get_config()
        return {**base_config, **config}


# noinspection PyCallingNonCallable
class ApogeeDR14GaiaDR2BCNN(BayesianCNNBase):
    """
    Class for Bayesian convolutional neural network for APOGEE DR14 Gaia DR2.

    The input vector is a concatenation of [spectrum (indices 0..7513),
    extinction-corrected apparent magnitude (index 7514), Gaia auxiliary data
    (indices 7515..)]; the boolean-mask helpers below slice these pieces apart.

    :History: 2018-Nov-06 - Written - Henry Leung (University of Toronto)
    """

    def __init__(self, lr=0.001, dropout_rate=0.3):
        super().__init__()

        self._implementation_version = '1.0'
        self.initializer = RandomNormal(mean=0.0, stddev=0.05)
        self.activation = 'relu'
        self.num_filters = [2, 4]
        self.filter_len = 8
        self.pool_length = 4
        self.num_hidden = [162, 64, 32, 16]
        self.max_epochs = 100
        self.lr = lr
        self.reduce_lr_epsilon = 0.00005
        self.reduce_lr_min = 1e-8
        self.reduce_lr_patience = 2
        self.l2 = 5e-9
        self.dropout_rate = dropout_rate

        self.input_norm_mode = 3
        self.task = 'regression'
        self.targetname = ['Ks-band fakemag']

    def magmask(self):
        """Boolean mask selecting the extinction-corrected apparent magnitude (index 7514)."""
        # dtype=bool: np.bool was deprecated in NumPy 1.20 and removed in 1.24;
        # the builtin bool is the exact equivalent on every NumPy version
        magmask = np.zeros(self._input_shape[0], dtype=bool)
        magmask[7514] = True
        return magmask

    def specmask(self):
        """Boolean mask selecting the spectrum portion of the input (indices 0..7513)."""
        specmask = np.zeros(self._input_shape[0], dtype=bool)
        specmask[:7514] = True  # mask to extract the spectra (original comment was a copy-paste of magmask's)
        return specmask

    def gaia_aux_mask(self):
        """Boolean mask selecting the Gaia auxiliary data used to infer the DR2 offset (indices 7515..)."""
        gaia_aux = np.zeros(self._input_shape[0], dtype=bool)
        gaia_aux[7515:] = True  # mask to extract data for gaia offset
        return gaia_aux

    def model(self):
        """
        Build the Keras graphs.

        :return: (training model [parallax + variance outputs],
                  prediction model [de-normalized fakemag mean/variance],
                  output loss, variance loss)
        """
        input_tensor = Input(shape=self._input_shape, name='input')  # training data
        labels_err_tensor = Input(shape=(self._labels_shape,), name='labels_err')

        # extract spectra from input data and expand_dims for convolution
        spectra = Lambda(lambda x: tf.expand_dims(x, axis=-1))(BoolMask(self.specmask())(input_tensor))

        # value to denorm magnitude
        app_mag = BoolMask(self.magmask())(input_tensor)
        # tf.convert_to_tensor(self.input_mean[self.magmask()])
        denorm_mag = DeNormAdd(self.input_mean[self.magmask()])(app_mag)
        # 10^(-0.2 m): the magnitude-to-parallax conversion factor
        inv_pow_mag = Lambda(lambda mag: tf.pow(10., tf.multiply(-0.2, mag)))(denorm_mag)

        # data to infer Gaia DR2 offset
        # ========================== Offset Calibration Model ========================== #
        gaia_aux_data = BoolMask(self.gaia_aux_mask())(input_tensor)
        gaia_aux_hidden = MCDropout(self.dropout_rate, disable=self.disable_dropout)(
            Dense(units=self.num_hidden[2],
                  kernel_regularizer=regularizers.l2(self.l2),
                  kernel_initializer=self.initializer,
                  activation='tanh')(gaia_aux_data))
        gaia_aux_hidden2 = MCDropout(self.dropout_rate, disable=self.disable_dropout)(
            Dense(units=self.num_hidden[3],
                  kernel_regularizer=regularizers.l2(self.l2),
                  kernel_initializer=self.initializer,
                  activation='tanh')(gaia_aux_hidden))
        offset = Dense(units=1, kernel_initializer=self.initializer, activation='tanh',
                       name='offset_output')(gaia_aux_hidden2)
        # ========================== Offset Calibration Model ========================== #

        # good old NN takes spectra and output fakemag
        # ========================== Spectro-Luminosity Model ========================== #
        cnn_layer_1 = Conv1D(kernel_initializer=self.initializer, padding="same",
                             filters=self.num_filters[0], kernel_size=self.filter_len,
                             kernel_regularizer=regularizers.l2(self.l2))(spectra)
        activation_1 = Activation(activation=self.activation)(cnn_layer_1)
        dropout_1 = MCDropout(self.dropout_rate, disable=self.disable_dropout)(activation_1)
        cnn_layer_2 = Conv1D(kernel_initializer=self.initializer, padding="same",
                             filters=self.num_filters[1], kernel_size=self.filter_len,
                             kernel_regularizer=regularizers.l2(self.l2))(dropout_1)
        activation_2 = Activation(activation=self.activation)(cnn_layer_2)
        maxpool_1 = MaxPooling1D(pool_size=self.pool_length)(activation_2)
        flattener = Flatten()(maxpool_1)
        dropout_2 = MCDropout(self.dropout_rate, disable=self.disable_dropout)(flattener)
        layer_3 = Dense(units=self.num_hidden[0], kernel_regularizer=regularizers.l2(self.l2),
                        kernel_initializer=self.initializer)(dropout_2)
        activation_3 = Activation(activation=self.activation)(layer_3)
        dropout_3 = MCDropout(self.dropout_rate, disable=self.disable_dropout)(activation_3)
        layer_4 = Dense(units=self.num_hidden[1], kernel_regularizer=regularizers.l2(self.l2),
                        kernel_initializer=self.initializer)(dropout_3)
        activation_4 = Activation(activation=self.activation)(layer_4)
        # softplus keeps the predicted fakemag strictly positive
        fakemag_output = Dense(units=self._labels_shape, activation='softplus',
                               name='fakemag_output')(activation_4)
        # variance head predicts log-variance, hence linear activation
        fakemag_variance_output = Dense(units=self._labels_shape, activation='linear',
                                        name='fakemag_variance_output')(activation_4)
        # ========================== Spectro-Luminosity Model ========================== #

        # multiply a pre-determined de-normalization factor, such that fakemag std approx. 1 for training set
        # it does not really matter as NN will adapt to whatever value this is
        # tf.math.log replaces the TF1-only tf.log alias (removed in TF 2.x)
        _fakemag_denorm = Lambda(lambda x: tf.multiply(x, 73.85))(fakemag_output)
        _fakemag_var_denorm = Lambda(lambda x: tf.add(x, tf.math.log(73.85)))(fakemag_variance_output)
        _fakemag_parallax = Multiply()([_fakemag_denorm, inv_pow_mag])

        # output parallax = fakemag * 10^(-0.2 m) + calibration offset
        output = Add(name='output')([_fakemag_parallax, offset])

        # propagate the fakemag log-variance to parallax log-variance
        variance_output = Lambda(lambda x: tf.math.log(tf.abs(tf.multiply(x[2], tf.divide(tf.exp(x[0]), x[1])))),
                                 name='variance_output')([fakemag_variance_output, fakemag_output,
                                                          _fakemag_parallax])

        model = Model(inputs=[input_tensor, labels_err_tensor], outputs=[output, variance_output])
        # new astroNN high performance dropout variational inference on GPU expects single output
        # while training with parallax, we want testing output fakemag
        model_prediction = Model(inputs=[input_tensor],
                                 outputs=concatenate([_fakemag_denorm, _fakemag_var_denorm]))

        variance_loss = mse_var_wrapper(output, labels_err_tensor)
        output_loss = mse_lin_wrapper(variance_output, labels_err_tensor)

        return model, model_prediction, output_loss, variance_loss