Source code for astroNN.apogee.chips

# ---------------------------------------------------------#
#   astroNN.apogee.chips: tools for dealing with apogee camera chips
# ---------------------------------------------------------#

import os
import warnings

import numpy as np

import astroNN
import astroNN.data
from astroNN.apogee.apogee_shared import apogee_default_dr


def chips_pix_info(dr=None):
    """
    To return chips info according to dr

    :param dr: data release
    :type dr: Union(int, NoneType)
    :return:
        | The starting and ending pixels location of APOGEE camera chips in the original 8575 pixels spectra
        |   - list[0] refers to the location where blue chips starts
        |   - list[1] refers to the location where blue chips ends
        |   - list[2] refers to the location where green chips starts
        |   - list[3] refers to the location where green chips ends
        |   - list[4] refers to the location where red chips starts
        |   - list[5] refers to the location where red chips ends
        |   - list[6] refers to the total number of pixels after deleting gap
    :rtype: list
    :raises ValueError: if the resolved data release is not within DR11 to DR17
    :History:
        | 2017-Nov-27 - Written - Henry Leung (University of Toronto)
        | 2017-Dec-16 - Updated - Henry Leung (University of Toronto)
    """
    dr = apogee_default_dr(dr=dr)

    if dr == 11 or dr == 12:
        blue_start = 322
        blue_end = 3242
        green_start = 3648
        green_end = 6048
        red_start = 6412
        red_end = 8306
        total_pixel = 7214
    elif 13 <= dr <= 17:
        blue_start = 246
        blue_end = 3274
        green_start = 3585
        green_end = 6080
        red_start = 6344
        red_end = 8335
        total_pixel = 7514
    else:
        # The message must match the range accepted by the branches above.
        raise ValueError("Only DR11 to DR17 are supported")

    return [
        blue_start,
        blue_end,
        green_start,
        green_end,
        red_start,
        red_end,
        total_pixel,
    ]
def gap_delete(spectra, dr=None):
    """
    Remove the gaps between the APOGEE CCDs from full 8575 pixels spectra.
    Spectra that already have the gap-deleted length are returned unchanged
    (apart from being promoted to 2D).

    :param spectra: The original 8575 pixels spectrum/spectra
    :type spectra: ndarray
    :param dr: data release
    :type dr: Union(int, NoneType)
    :return: Gap deleted spectrum/spectra, always at least 2D
    :rtype: ndarray
    :raises EnvironmentError: if the number of pixels is neither 8575 nor the gap-deleted length
    :History:
        | 2017-Oct-26 - Written - Henry Leung (University of Toronto)
        | 2017-Dec-16 - Updated - Henry Leung (University of Toronto)
    """
    dr = apogee_default_dr(dr=dr)
    spectra = np.atleast_2d(spectra)
    info = chips_pix_info(dr=dr)
    n_pixels = spectra.shape[1]

    if n_pixels not in (8575, info[6]):
        raise EnvironmentError("Are you sure you are giving astroNN APOGEE spectra?")

    # Already gap-deleted: nothing to remove.
    if n_pixels == info[6]:
        return spectra

    blue_lo, blue_hi, green_lo, green_hi, red_lo, red_hi = info[:6]
    keep = np.r_[blue_lo:blue_hi, green_lo:green_hi, red_lo:red_hi]
    return spectra[:, keep]
def wavelength_solution(dr=None):
    """
    To return the wavelength solution, apStarWavegrid was provided by Jo Bovy's apogee tools (Toronto)

    :param dr: data release
    :type dr: Union(int, NoneType)
    :return:
        | lambda_blue, lambda_green, lambda_red which are 3 wavelength solution array
        |   - lambda_blue refers to the wavelength solution for each pixel in blue chips
        |   - lambda_green refers to the wavelength solution for each pixel in green chips
        |   - lambda_red refers to the wavelength solution for each pixel in red chips
    :rtype: tuple[ndarray, ndarray, ndarray]
    :History:
        | 2017-Nov-20 - Written - Henry Leung (University of Toronto)
        | 2017-Dec-16 - Updated - Henry Leung (University of Toronto)
    """
    dr = apogee_default_dr(dr=dr)
    info = chips_pix_info(dr=dr)

    # Log10-spaced wavelength grid over the full 8575 pixels spectrum.
    log_start = 4.179
    log_stop = 4.179 + 8575 * 6.0 * 10.0**-6.0
    log_step = 6.0 * 10.0**-6.0
    apstar_wavegrid = 10.0 ** np.arange(log_start, log_stop, log_step)

    blue_lo, blue_hi, green_lo, green_hi, red_lo, red_hi = info[:6]
    return (
        apstar_wavegrid[blue_lo:blue_hi],
        apstar_wavegrid[green_lo:green_hi],
        apstar_wavegrid[red_lo:red_hi],
    )
def chips_split(spectra, dr=None):
    """
    Split APOGEE spectra into the blue, green and red chips. If full 8575
    pixels spectra are given, the gaps are removed first and a warning is issued.

    :param spectra: APOGEE spectrum/spectra
    :type spectra: ndarray
    :param dr: data release
    :type dr: Union(int, NoneType)
    :return: 3 ndarrays which are spectra_blue, spectra_green, spectra_red
    :rtype: tuple[ndarray, ndarray, ndarray]
    :raises EnvironmentError: if the number of pixels is neither 8575 nor the gap-deleted length
    :History:
        | 2017-Nov-20 - Written - Henry Leung (University of Toronto)
        | 2017-Dec-17 - Updated - Henry Leung (University of Toronto)
    """
    dr = apogee_default_dr(dr=dr)
    info = chips_pix_info(dr=dr)

    # Width of each chip in the gap-deleted layout.
    n_blue = info[1] - info[0]
    n_green = info[3] - info[2]
    n_red = info[5] - info[4]

    spectra = np.atleast_2d(spectra)
    n_pixels = spectra.shape[1]

    if n_pixels == 8575:
        spectra = gap_delete(spectra, dr=dr)
        warnings.warn(
            "Raw spectra with gaps between detectors, gaps are removed automatically"
        )
    elif n_pixels != info[6]:
        raise EnvironmentError("Are you sure you are giving me APOGEE spectra?")

    green_stop = n_blue + n_green
    red_stop = n_blue + n_green + n_red

    return (
        spectra[:, 0:n_blue],
        spectra[:, n_blue:green_stop],
        spectra[:, green_stop:red_stop],
    )
def bitmask_boolean(bitmask, target_bit):
    """
    Turn bitmask to boolean with provided bitmask array and target bit to mask

    :param bitmask: bitmask
    :type bitmask: ndarray
    :param target_bit: target bit(s) to mask
    :type target_bit: Union(int, list[int], ndarray[int])
    :return: boolean array (at least 2D), True where at least one of the target bits is set
        (i.e. the pixel is masked), False otherwise
    :rtype: ndarray[bool]
    :History: 2018-Feb-03 - Written - Henry Leung (University of Toronto)
    """
    bit_positions = np.atleast_1d(np.asarray(target_bit)).ravel()
    # Combine with bitwise OR rather than a sum: summing would carry into an
    # unrelated bit if the same bit position is listed more than once.
    combined_mask = np.bitwise_or.reduce(2**bit_positions)

    bitmask = np.atleast_2d(bitmask)
    return (bitmask & combined_mask) != 0
def bitmask_decompositor(bit):
    """
    To decompose a bitmask value into the individual bit positions that are set

    :param bit: bitmask value
    :type bit: int
    :return: sorted array of the bit positions set in ``bit``, or None if ``bit`` is 0
    :rtype: Union(ndarray[int], NoneType)
    :raises ValueError: if ``bit`` is negative
    :History: 2018-Feb-03 - Written - Henry Leung (University of Toronto)
    """
    bitmask_num = int(bit)
    if bitmask_num < 0:
        raise ValueError(
            f"Your number ({bit}) is not valid, this value must not from a bitmask"
        )
    if bitmask_num == 0:
        print("0 corresponds to good pixel, thus this bit cannot be decomposed")
        return None

    # Inspect the bits with exact integer arithmetic; a floating point log2
    # can round up for large values and point at a bit that is not set.
    decomposited_bits = [
        position
        for position in range(bitmask_num.bit_length())
        if (bitmask_num >> position) & 1
    ]
    return np.array(decomposited_bits)
def continuum(spectra, spectra_err, cont_mask, deg=2):
    """
    Fit Chebyshev polynomials to the flux values in the continuum mask by chips.
    The resulting continuum will have the same shape as `spectra`.

    :param spectra: spectra
    :type spectra: ndarray
    :param spectra_err: spectra uncertainty, same shape as spectra
    :type spectra_err: ndarray
    :param cont_mask: continuum mask
    :type cont_mask: ndarray[bool]
    :param deg: The degree of Chebyshev polynomial to use in each region, default is 2 which works the best so far
    :type deg: int
    :return: normalized spectra, normalized spectra uncertainty (new arrays, the inputs are not modified)
    :rtype: ndarray, ndarray
    :History:
        | 2017-Dec-04 - Written - Henry Leung (University of Toronto)
        | 2017-Dec-16 - Update - Henry Leung (University of Toronto)
        | 2018-Mar-21 - Update - Henry Leung (University of Toronto)
    """
    # np.array copies, so the caller's arrays are never modified in place.
    spectra = np.atleast_2d(np.array(spectra))
    spectra_err = np.atleast_2d(np.array(spectra_err))

    # The normalized values are written back into these arrays; promote
    # integer/boolean input to float so the results are not truncated.
    if not np.issubdtype(spectra.dtype, np.inexact):
        spectra = spectra.astype(float)
    if not np.issubdtype(spectra_err.dtype, np.inexact):
        spectra_err = spectra_err.astype(float)

    # NOTE(review): numpy documents `w` as weighting the unsquared residual
    # (1/sigma for inverse-variance weighting); 1/sigma**2 is passed here and
    # kept as-is to preserve existing results.
    flux_ivars = 1 / (np.square(spectra_err) + 1e-8)  # for numerical stability
    pix_element = np.arange(spectra.shape[1])  # pixel index of every pixel
    cont_pixels = pix_element[cont_mask]  # pixel index of continuum pixels

    for counter, (spectrum, spectrum_err, flux_ivar) in enumerate(
        zip(spectra, spectra_err, flux_ivars)
    ):
        # Ignore continuum pixels whose flux is NaN when fitting.
        no_nan_mask = ~np.isnan(spectrum[cont_mask])
        fit = np.polynomial.chebyshev.Chebyshev.fit(
            x=cont_pixels[no_nan_mask],
            y=spectrum[cont_mask][no_nan_mask],
            w=flux_ivar[cont_mask][no_nan_mask],
            deg=deg,
        )
        # Evaluate the fitted continuum once and use it for both arrays.
        fitted_continuum = fit(pix_element)
        spectra[counter] = spectrum / fitted_continuum
        spectra_err[counter] = spectrum_err / fitted_continuum

    return spectra, spectra_err
def apogee_continuum(
    spectra,
    spectra_err,
    cont_mask=None,
    deg=2,
    dr=None,
    bitmask=None,
    target_bit=None,
    mask_value=1.0,
):
    """
    It is designed only for apogee spectra by fitting Chebyshev polynomials to the flux values in the continuum mask
    by chips. The resulting continuum will have the same shape as the gap-deleted `spectra`.

    :param spectra: spectra
    :type spectra: ndarray
    :param spectra_err: spectra uncertainty, same shape as spectra
    :type spectra_err: ndarray
    :param cont_mask: continuum mask, if None the ``dr{dr}_contmask.npy`` file from the astroNN data folder is loaded
    :type cont_mask: ndarray[bool]
    :param deg: The degree of Chebyshev polynomial to use in each region, default is 2 which works the best so far
    :type deg: int
    :param dr: apogee dr
    :type dr: int
    :param bitmask: bitmask array of the spectra, same shape as spectra
    :type bitmask: ndarray
    :param target_bit: a list of bit to be masked, if None ``[0, 1, 2, 3, 4, 5, 6, 7, 12]`` is used
    :type target_bit: Union(int, list[int], ndarray[int])
    :param mask_value: if a pixel is determined to be a bad pixel, this value will be used to replace that pixel flux
    :type mask_value: Union(int, float)
    :return: normalized spectra, normalized spectra uncertainty
    :rtype: ndarray, ndarray
    :History: 2018-Mar-21 - Written - Henry Leung (University of Toronto)
    """
    dr = apogee_default_dr(dr=dr)

    spectra = gap_delete(spectra, dr=dr)
    flux_errs = gap_delete(spectra_err, dr=dr)

    spectra_blue, spectra_green, spectra_red = chips_split(spectra, dr=dr)
    yerrs_blue, yerrs_green, yerrs_red = chips_split(flux_errs, dr=dr)

    if cont_mask is None:
        maskpath = os.path.join(astroNN.data.datapath(), f"dr{dr}_contmask.npy")
        cont_mask = np.load(maskpath)

    con_mask_blue, con_mask_green, con_mask_red = chips_split(cont_mask, dr=dr)
    # chips_split always returns 2D arrays; the mask is a single row.
    con_mask_blue, con_mask_green, con_mask_red = (
        con_mask_blue[0],
        con_mask_green[0],
        con_mask_red[0],
    )

    # Continuum chips by chips
    blue_spectra, blue_spectra_err = continuum(
        spectra_blue, yerrs_blue, cont_mask=con_mask_blue, deg=deg
    )
    green_spectra, green_spectra_err = continuum(
        spectra_green, yerrs_green, cont_mask=con_mask_green, deg=deg
    )
    red_spectra, red_spectra_err = continuum(
        spectra_red, yerrs_red, cont_mask=con_mask_red, deg=deg
    )

    normalized_spectra = np.concatenate(
        (blue_spectra, green_spectra, red_spectra), axis=1
    )
    normalized_spectra_err = np.concatenate(
        (blue_spectra_err, green_spectra_err, red_spectra_err), axis=1
    )

    # Each mask is computed once, BEFORE the flux array is overwritten, so
    # that the very same pixels are also reset in the uncertainty array.

    # set negative flux and its error as 0
    negative_flux = normalized_spectra < 0.0
    normalized_spectra[negative_flux] = 0.0
    normalized_spectra_err[negative_flux] = 0.0

    # set inf and nan flux as mask_value and its error as 0
    non_finite_flux = np.isinf(normalized_spectra) | np.isnan(normalized_spectra)
    normalized_spectra[non_finite_flux] = mask_value
    normalized_spectra_err[non_finite_flux] = 0.0

    if bitmask is not None:
        bitmask = gap_delete(bitmask, dr=dr)
        if target_bit is None:
            target_bit = [0, 1, 2, 3, 4, 5, 6, 7, 12]

        # True where any of the target bits is set, i.e. a bad pixel.
        mask = bitmask_boolean(bitmask, target_bit)
        normalized_spectra[mask] = mask_value
        normalized_spectra_err[mask] = mask_value

    return normalized_spectra, normalized_spectra_err
def aspcap_mask(elem, dr=None):
    """
    | To load ASPCAP elements window masks
    | DR14 Elements: ``'C', 'CI', 'N', 'O', 'Na', 'Mg', 'Al', 'Si', 'P', 'S', 'K', 'Ca', 'TI', 'TiII', 'V', 'Cr', 'Mn',
      'Fe', 'Co', 'Ni', 'Cu', 'Ge', 'Ce', 'Rb', 'Y', 'Nd'``

    :param elem: element name (case-insensitive), ``'c1'`` and ``'ti2'`` are accepted as aliases
        of ``'CI'`` and ``'TiII'``
    :type elem: str
    :param dr: apogee dr
    :type dr: int
    :return: mask, or None if the element is not supported
    :rtype: Union(ndarray[bool], NoneType)
    :raises ValueError: if the resolved data release is not within DR14 to DR17
    :History: 2018-Mar-24 - Written - Henry Leung (University of Toronto)
    """
    if elem.lower() == "c1":
        elem = "CI"
    elif elem.lower() == "ti2":
        elem = "TiII"

    dr = apogee_default_dr(dr=dr)

    if 14 <= dr <= 17:
        aspcap_code = "l31c"
        elem_list = [
            "C",
            "CI",
            "N",
            "O",
            "Na",
            "Mg",
            "Al",
            "Si",
            "P",
            "S",
            "K",
            "Ca",
            "TI",
            "TiII",
            "V",
            "Cr",
            "Mn",
            "Fe",
            "Co",
            "Ni",
            "Cu",
            "Ge",
            "Ce",
            "Rb",
            "Y",
            "Nd",
        ]
    else:
        # The message must match the range accepted by the branch above.
        raise ValueError("Only DR14-DR17 is supported currently")

    # Resolve the element before touching the disk so an unsupported element
    # does not cost a file load.
    try:
        # turn everything to lowercase to avoid case-related issue
        index = [x.lower() for x in elem_list].index(elem.lower())
    except ValueError:
        # nicely handle if element not found
        print(
            f"Element not found, the only elements for dr{dr} supported are {elem_list}"
        )
        return None

    masks = np.load(
        os.path.join(astroNN.data.datapath(), f"aspcap_{aspcap_code}_masks.npy")
    )

    # The element's position in elem_list is the bit tested in the packed mask array.
    return (masks & 2**index) != 0