import importlib
import json
import os
import sys
import zipfile
import warnings
from packaging import version
import h5py
from astroNN.config import custom_model_path_reader, _astroNN_MODEL_NAME
from astroNN.models.apogee_models import (
ApogeeBCNN,
ApogeeCVAE,
ApogeeCNN,
ApogeeBCNNCensored,
ApogeeDR14GaiaDR2BCNN,
ApogeeKeplerEchelle,
ApogeeBCNNaux,
ApokascEncoderDecoder,
StarNet2017,
)
from astroNN.models.misc_models import Cifar10CNN, MNIST_BCNN
from astroNN.nn.losses import losses_lookup
from astroNN.nn.utilities import Normalizer
from astroNN.shared.dict_tools import dict_list_to_dict_np, list_to_dict
import keras
# Names exported by ``from <this module> import *``: the folder loader defined
# below plus the model classes imported at the top of the file.
# NOTE(review): ``Galaxy10CNN`` is defined in this module but is not listed
# here -- confirm whether that omission is intentional.
__all__ = [
"load_folder",
"ApogeeBCNN",
"ApogeeCVAE",
"ApogeeCNN",
"ApogeeBCNNCensored",
"ApogeeDR14GaiaDR2BCNN",
"ApogeeKeplerEchelle",
"ApogeeBCNNaux",
"ApokascEncoderDecoder",
"StarNet2017",
"Cifar10CNN",
"MNIST_BCNN",
]
# Short alias for the keras optimizers namespace; ``load_folder`` calls
# ``optimizers.deserialize`` on the saved optimizer configuration.
optimizers = keras.optimizers
# Alias for the keras Sequential model class (not referenced in the code
# visible in this file section).
Sequential = keras.models.Sequential
def Galaxy10CNN():
    """
    NAME:
        Galaxy10CNN
    PURPOSE:
        create a Cifar10CNN instance that is re-labelled as a Galaxy10 network
    INPUT:
    OUTPUT:
        (instance): a Cifar10CNN instance whose ``_model_identifier`` is "Galaxy10CNN" and whose
        ``targetname`` is the list of ``galaxy10cls_lookup(i)`` for i = 0..9
    HISTORY:
        2018-Feb-09 - Written - Henry Leung (University of Toronto)
        2018-Apr-02 - Update - Henry Leung (University of Toronto)
    """
    # imported at call time, exactly as the original implementation did
    from astroNN.datasets.galaxy10 import galaxy10cls_lookup
    from astroNN.models.misc_models import Cifar10CNN

    net = Cifar10CNN()
    net._model_identifier = "Galaxy10CNN"
    # one target name per Galaxy10 class index
    net.targetname = [galaxy10cls_lookup(class_idx) for class_idx in range(10)]
    return net
def convert_custom_objects(obj):
    """Recursively rebuild a nested list/dict structure.

    Based on Keras Source Code

    # Arguments
        obj: object, dict, or list.

    # Returns
        A structure with the same layout as `obj`: every list is rebuilt
        as a new list and every dict as a new dict (keys kept as-is),
        recursively. Anything that is neither a list nor a dict is
        returned unchanged; this implementation performs no custom-object
        substitution on those leaves.
    """
    if isinstance(obj, dict):
        return {name: convert_custom_objects(item) for name, item in obj.items()}
    if isinstance(obj, list):
        return [convert_custom_objects(item) for item in obj]
    # leaf value: hand back the very same object
    return obj
# ("[docs]" link marker left over from the rendered documentation page; not part of the module code)
def load_folder(folder=None):
    """
    To load astroNN model object from folder

    :param folder: [optional] you should provide folder name if outside folder, do not specify when you are inside the folder
    :type folder: str
    :return: astroNN Neural Network instance
    :rtype: astroNN.nn.NeuralNetMaster.NeuralNetMaster
    :raises IOError: if ``folder`` is given but does not exist
    :raises FileNotFoundError: if the folder has no ``astroNN_model_parameter.json``
    :raises TypeError: if the model identifier stored in the parameter file cannot be resolved to a model class
    :raises LookupError: if the saved loss cannot be looked up
    :History: 2017-Dec-29 - Written - Henry Leung (University of Toronto)
    """
    # local import: keeps this function self-contained without touching the file-level imports
    import contextlib

    currentdir = os.getcwd()
    if folder is not None:
        fullfilepath = os.path.abspath(os.path.join(currentdir, folder))
    else:
        fullfilepath = os.path.abspath(currentdir)

    # default filename, fall back to the legacy .h5 file if the .keras file is absent
    model_weights_filename = _astroNN_MODEL_NAME
    legacy_h5_format = False
    if not os.path.exists(os.path.join(fullfilepath, model_weights_filename)):
        model_weights_filename = model_weights_filename.replace(".keras", ".h5")
        # plain True instead of ``~False`` (== -1, and deprecated on bool)
        legacy_h5_format = True

    parameter = _read_model_parameter(folder, fullfilepath)
    identifier = parameter["id"]

    astronn_model_obj = _instantiate_model(identifier, fullfilepath)
    astronn_model_obj.currentdir = currentdir
    astronn_model_obj.fullfilepath = fullfilepath
    astronn_model_obj.folder_name = (
        folder if folder is not None else os.path.basename(os.path.normpath(currentdir))
    )
    _restore_hyperparameters(astronn_model_obj, parameter)

    weights_path = os.path.join(fullfilepath, model_weights_filename)
    # ExitStack guarantees the zip archive, the archive member and the h5 file
    # are all closed again, even if anything below raises.
    with contextlib.ExitStack() as stack:
        if legacy_h5_format:
            archive = None
            h5_obj = weights_path
        else:
            # treat .keras as zip file
            archive = stack.enter_context(zipfile.ZipFile(weights_path, "r"))
            h5_obj = stack.enter_context(archive.open("model.weights.h5"))
        f = stack.enter_context(h5py.File(h5_obj, mode="r"))

        if legacy_h5_format:
            saved = _read_legacy_h5_config(f)
        else:
            # TODO: temporary fix for optimizer weights loading by printing it first so no weird errors
            # (only done for the .keras layout; the original also did this for
            # legacy files, which are not expected to have an "optimizer" group)
            print(f["optimizer"]["vars"])
            saved = _read_keras3_config(archive)

        input_names = saved["input_names"]
        output_names = saved["output_names"]
        optimizer = optimizers.deserialize(saved["optimizer_config"])

        astronn_model_obj.input_mean = list_to_dict(
            input_names, dict_list_to_dict_np(parameter["input_mean"])
        )
        astronn_model_obj.labels_mean = list_to_dict(
            output_names, dict_list_to_dict_np(parameter["labels_mean"])
        )
        astronn_model_obj.input_std = list_to_dict(
            input_names, dict_list_to_dict_np(parameter["input_std"])
        )
        astronn_model_obj.labels_std = list_to_dict(
            output_names, dict_list_to_dict_np(parameter["labels_std"])
        )

        # compile the model
        astronn_model_obj.compile(
            optimizer=optimizer,
            loss=_lookup_losses(saved["losses_raw"]),
            metrics=_lookup_metrics(saved["metrics_raw"]),
            weighted_metrics=None,
            loss_weights=saved["loss_weights"],
        )

        # set weights
        astronn_model_obj.keras_model.load_weights(weights_path)

        # Build train function (to get weight updates), need to consider Sequential model too
        astronn_model_obj.keras_model.make_train_function()

        # TODO: maybe fix loading optimizer weights from legacy h5py file
        if version.parse(saved["keras_version"]) < version.parse("3.0.0"):
            warnings.warn(
                "This model is trained with Keras/TF Keras < v3.0 with incompatable optimizer, astroNN has switched to use Tensorflow/PyTorch with Keras v3.0+."
            )
        elif "optimizer" in f:
            # must happen while the h5 file is still open
            astronn_model_obj.keras_model.optimizer.load_own_variables(
                f["optimizer"]["vars"]
            )
        else:
            # e.g. a legacy .h5 file written by Keras 3: no "optimizer" group to read from
            warnings.warn(
                "No saved optimizer state found in the model file, the optimizer starts from a fresh state."
            )

    # create normalizer and set correct mean and std
    astronn_model_obj.input_normalizer = Normalizer(
        mode=astronn_model_obj.input_norm_mode
    )
    astronn_model_obj.labels_normalizer = Normalizer(
        mode=astronn_model_obj.labels_norm_mode
    )
    astronn_model_obj.input_normalizer.mean_labels = astronn_model_obj.input_mean
    astronn_model_obj.input_normalizer.std_labels = astronn_model_obj.input_std
    astronn_model_obj.labels_normalizer.mean_labels = astronn_model_obj.labels_mean
    astronn_model_obj.labels_normalizer.std_labels = astronn_model_obj.labels_std

    print("========================================================")
    print(f"Loaded astroNN model, model type: {astronn_model_obj.name} -> {identifier}")
    print("========================================================")
    return astronn_model_obj


def _read_model_parameter(folder, fullfilepath):
    """Load ``astroNN_model_parameter.json`` from the resolved model folder.

    The file is always read from ``fullfilepath`` (the same folder the weights
    are read from), so parameters and weights can never come from two
    different directories.

    :raises IOError: if ``folder`` was given but the resolved folder does not exist
    :raises FileNotFoundError: if the parameter file is missing
    """
    if folder is not None and not os.path.exists(fullfilepath):
        raise IOError("Folder not exists: " + str(fullfilepath))
    parameter_path = os.path.join(fullfilepath, "astroNN_model_parameter.json")
    if not os.path.isfile(parameter_path):
        raise FileNotFoundError("Are you sure this is an astroNN generated folder?")
    with open(parameter_path) as f:
        return json.load(f)


def _instantiate_model(identifier, fullfilepath):
    """Create an un-configured model instance for the saved model identifier.

    Lookup order: the special-cased "Galaxy10CNN", then ``astroNN.models``,
    then custom model files (paths from ``custom_model_path_reader()`` followed
    by every ``.py`` file in the model folder; the last match wins).

    :raises TypeError: if no class named ``identifier`` can be found
    """
    # need to point to the actual neural network if non-trivial location
    if identifier == "Galaxy10CNN":
        return Galaxy10CNN()

    # else try to import it from standard way; a missing name surfaces as
    # AttributeError from getattr, not as ImportError
    try:
        model_class = getattr(importlib.import_module("astroNN.models"), identifier)
    except (ImportError, AttributeError):
        model_class = None
    if model_class is not None:
        return model_class()

    # try to load custom model from CUSTOM_MODEL_PATH if none are working
    custom_model_paths = custom_model_path_reader()
    # try the current folder and see if there is any .py on top of CUSTOM_MODEL_PATH
    list_py_files = [
        os.path.join(fullfilepath, name)
        for name in os.listdir(fullfilepath)
        if name.endswith(".py")
    ]

    model_obj = None
    for path_list in (custom_model_paths, list_py_files):
        if path_list is None:
            continue
        for path in path_list:
            head, tail = os.path.split(path)
            sys.path.insert(0, head)
            # splitext removes only the extension; str.strip(".py") would also
            # eat leading/trailing 'p', 'y' and '.' characters of the module name
            module_name = os.path.splitext(tail)[0]
            try:
                model_obj = getattr(
                    importlib.import_module(module_name), str(identifier)
                )()
            except (AttributeError, ImportError):
                # this file does not provide the model (or cannot be imported); try the next one
                continue

    if model_obj is None:
        print("\n")
        raise TypeError(f"Unknown model identifier -> {identifier}!")
    return model_obj


def _restore_hyperparameters(model, parameter):
    """Copy the saved hyper-parameters from the parameter dict onto ``model``.

    Required keys raise ``KeyError`` when missing; optional keys are skipped
    when absent (``input_names`` / ``output_names`` get defaults instead).
    """

    def copy_optional(pairs):
        # set model.<attr> = parameter[<key>] for every key that was saved
        for attr, key in pairs:
            if key in parameter:
                setattr(model, attr, parameter[key])

    # Must have parameter
    model._input_shape = parameter["input"]
    model._labels_shape = parameter["labels"]
    if not isinstance(model._input_shape, dict):
        model._input_shape = {"input": model._input_shape}
    if not isinstance(model._labels_shape, dict):
        model._labels_shape = {"output": model._labels_shape}
    model.num_hidden = parameter["hidden"]
    model.input_norm_mode = parameter["input_norm_mode"]
    model.labels_norm_mode = parameter["labels_norm_mode"]
    model.batch_size = parameter["batch_size"]
    model.targetname = parameter["targetname"]
    model.val_size = parameter["valsize"]

    # Conditional parameter depends on neural net architecture
    copy_optional((("num_filters", "filternum"), ("filter_len", "filterlen")))

    pool_length = parameter.get("pool_length")
    if pool_length is not None:
        if isinstance(pool_length, int):
            model.pool_length = pool_length
        else:  # multi-dimensional case
            try:
                model.pool_length = list(pool_length)
            except TypeError:
                # not iterable: leave the model default untouched
                pass

    if "latent" in parameter:
        # need to convert to int because of keras do not want array or list
        model.latent_dim = int(parameter["latent"])

    copy_optional((("task", "task"), ("dropout_rate", "dropout_rate")))

    if "inv_tau" in parameter:
        # if inverse model precision exists, so does length_scale
        model.inv_model_precision = parameter["inv_tau"]
        if "length_scale" in parameter:
            model.length_scale = parameter["length_scale"]

    copy_optional((("l1", "l1"), ("l2", "l2"), ("maxnorm", "maxnorm")))

    model.input_names = parameter.get("input_names", ["input"])
    model.output_names = parameter.get("output_names", ["output"])

    copy_optional(
        (
            ("_last_layer_activation", "last_layer_activation"),
            ("activation", "activation"),
            ("aux_length", "aux_length"),
        )
    )


def _read_legacy_h5_config(f):
    """Extract the compile/model configuration from the attributes of a legacy .h5 file."""
    training_config = json.loads(f.attrs["training_config"])
    model_config = json.loads(f.attrs["model_config"])

    keras_version = f.attrs["keras_version"]
    if isinstance(keras_version, bytes):
        # some h5py versions hand string attributes back as bytes
        keras_version = keras_version.decode("utf8")

    return {
        "optimizer_config": training_config["optimizer_config"],
        # Recover loss functions and metrics.
        "losses_raw": convert_custom_objects(training_config["loss"]),
        "loss_weights": training_config["loss_weights"],
        "metrics_raw": convert_custom_objects(training_config["metrics"]),
        # input/output names
        "input_names": [lay[0] for lay in model_config["config"]["input_layers"]],
        "output_names": [lay[0] for lay in model_config["config"]["output_layers"]],
        # for older models, they have -tf prefix like 2.1.6-tf which cannot be parsed by version
        "keras_version": keras_version.replace("-tf", ""),
    }


def _read_keras3_config(archive):
    """Extract the compile/model configuration from an opened .keras (zip) archive."""
    saved_config = json.loads(archive.read("config.json"))
    compile_config = saved_config["compile_config"]

    # Recover loss functions and metrics.
    loss_config = compile_config["loss"]
    if isinstance(loss_config, dict):
        losses_raw = convert_custom_objects(loss_config["config"])
    else:
        losses_raw = convert_custom_objects(loss_config)

    metrics_config = compile_config["metrics"]
    if isinstance(metrics_config, dict):
        if "config" in metrics_config:
            metrics_raw = convert_custom_objects(metrics_config["config"])
        else:
            metrics_raw = [
                convert_custom_objects(entry[0]["config"])
                for entry in metrics_config.values()
            ]
    elif isinstance(metrics_config, list):
        metrics_raw = [convert_custom_objects(_mc["config"]) for _mc in metrics_config]
    else:
        metrics_raw = convert_custom_objects(metrics_config)

    model_config = saved_config["config"]
    return {
        "optimizer_config": compile_config["optimizer"],
        "losses_raw": losses_raw,
        "loss_weights": compile_config["loss_weights"],
        "metrics_raw": metrics_raw,
        # input/output names
        "input_names": [lay[0] for lay in model_config["input_layers"]],
        "output_names": [lay[0] for lay in model_config["output_layers"]],
        "keras_version": json.loads(archive.read("metadata.json"))["keras_version"],
    }


def _lookup_losses(losses_raw):
    """Turn the saved loss description into loss object(s) via ``losses_lookup``; None if nothing was saved."""
    if not losses_raw:
        return None
    try:
        # mapping of name -> loss; anything else raises TypeError on the indexing
        return [losses_lookup(losses_raw[_loss]) for _loss in losses_raw]
    except TypeError:
        return losses_lookup(losses_raw)
    except ValueError as err:
        raise LookupError("Cant lookup loss") from err


def _lookup_metrics(metrics_raw):
    """Turn the saved metrics description into metric object(s), falling back to the raw value."""
    # its weird that keras needs -> metrics[metric][0] instead of metrics[metric] likes losses
    try:
        try:
            return [
                losses_lookup(_metric["config"]["fn"]) for _metric in metrics_raw[0]
            ]
        except TypeError:
            return [losses_lookup(metrics_raw[0])]
    except Exception:
        # best effort: hand the raw description to keras unchanged
        return metrics_raw