Source code for astroNN.datasets.xmatch

# ---------------------------------------------------------#
#   astroNN.datasets.xmatch: matching function between catalog
# ---------------------------------------------------------#

import pathlib
import numpy as np
import h5py
from astropy.io import fits
from astropy import units as u
import astropy.coordinates as acoords


[docs] def xmatch( ra1, dec1, ra2, dec2, epoch1=2000.0, epoch2=2000.0, pmra2=None, pmdec2=None, maxdist=2, ): """ Cross-matching between arrays by RA/DEC coordiantes :param ra1: 1d array for the first catalog RA :type ra1: ndarray :param dec1: 1d array for the first catalog DEC :type dec1: ndarray :param ra2: 1d array for the second catalog RA :type ra2: ndarray :param dec2: 1d array for the second catalog DEC :type dec2: ndarray :param epoch1: Epoch for the first catalog, can be float or 1d array :type epoch1: Union([float, ndarray]) :param epoch1: Epoch for the second catalog, can be float or 1d array :type epoch1: Union([float, ndarray]) :param pmra2: RA proper motion for second catalog, only effective if `epoch1` not equals `epoch2` :type pmra2: ndarray :param pmdec2: DEC proper motion for second catalog, only effective if `epoch1` not equals `epoch2` :type pmdec2: ndarray :param maxdist: Maximium distance in arcsecond :type maxdist: float :return: numpy array of ra, dec, separation :rtype: ndarrays :History: | 2018-Jan-25 - Written - Henry Leung (University of Toronto) | 2021-Jan-29 - Updated - Henry Leung (University of Toronto) """ depoch = epoch2 - epoch1 if np.any(depoch != 0.0): # Use proper motion to get both catalogs at the same time dra = pmra2 / np.cos(dec2 / 180.0 * np.pi) / 3600000.0 * depoch ddec = pmdec2 / 3600000.0 * depoch else: dra = 0.0 ddec = 0.0 mc1 = acoords.SkyCoord(ra1, dec1, unit=(u.degree, u.degree), frame="icrs") mc2 = acoords.SkyCoord( ra2 - dra, dec2 - ddec, unit=(u.degree, u.degree), frame="icrs" ) idx, d2d, d3d = mc1.match_to_catalog_sky(mc2) # to make sure filtering out all neg ones which are untouched mindx = (d2d < maxdist * u.arcsec) & (0.0 * u.arcsec <= d2d) m1 = np.arange(len(ra1))[mindx] m2 = idx[mindx] return m1, m2, d2d[mindx]
def xmatch_cat( cat1=None, cat2=None, maxdist=2.0, ra1="ra", dec1="dec", epoch1=2000.0, ra2="ra", dec2="dec", epoch2=2000.0, pmra2="pmra", pmdec2="pmdec", field=None, ): """ Cross-matching between two catalog files by RA/DEC coordiantes :param cat1: Catalog 1, can be path to `.fits` or `.h5` or opened fits or h5 files :type cat1: str :param cat2: Catalog 2, can be path to `.fits` or `.h5` or opened fits or h5 files :type cat2: str :param maxdist: Maximium distance in arcsecond :type maxdist: float :param ra1: Field for RA in Catalog 1 :type ra1: str :param dec1: Field for DEC in Catalog 1 :type dec1: str :param ra2: Field for RA in Catalog 2 :type ra2: str :param dec2: Field for DEC in Catalog 2 :type dec2: str :type maxdist: str :param maxdist: Maximium distance in arcsecond :type maxdist: float :param field: Additional field name, if not None then cross-match objects within the same field between the two catalog :type field: str :return: numpy array of ra, dec, array, err_array :rtype: ndarrays :History: | 2021-Jan-29 - Written - Henry Leung (University of Toronto) """ if isinstance(cat1, (str, pathlib.Path)): cat1_ext = pathlib.Path(cat1).suffix if cat1_ext.lower == ".h5": cat1 = h5py.File(cat1, mode="r") elif cat1_ext.lower == ".fits" or cat1_ext.lower == ".fit": cat1 = fits.getdata(cat1) else: raise TypeError(f"Unsupported file type {cat1_ext}") if isinstance(cat2, (str, pathlib.Path)): cat2_ext = pathlib.Path(cat2).suffix if cat2_ext.lower == ".h5": cat2 = h5py.File(cat2, mode="r") elif cat2_ext.lower == ".fits" or cat2_ext.lower == ".fit": cat2 = fits.getdata(cat2) else: raise TypeError(f"Unsupported file type {cat2_ext}") depoch = epoch2 - epoch1 if field is not None: try: # check if the field actually exists in both cat1/cat2 cat1[field] cat2[field] except KeyError: # python 2/3 format string raise KeyError(f"'{field}' does not exist in both catalog") uniques = np.unique(cat1[field]) d2d = np.ones(len(cat1)) * -1.0 idx = np.zeros(len(cat1), dtype=int) for unique in uniques: # loop over the class idx_1 = np.arange(cat1[ra1].shape[0])[cat1[field] == unique] idx_2 = np.arange(cat2[ra2].shape[0])[cat2[field] == unique] idx1, idx2, sep = xmatch( cat1[ra1], cat1[dec1], cat2[ra2], cat2[dec2], epoch1=epoch1, epoch2=epoch2, pmra2=None if np.all(depoch == 0.0) else cat2[pmra2], pmdec2=None if np.all(depoch == 0.0) else cat2[pmdec2], maxdist=maxdist, ) else: idx1, idx2, sep = xmatch( cat1[ra1], cat1[dec1], cat2[ra2], cat2[dec2], epoch1=epoch1, epoch2=epoch2, pmra2=None if np.all(depoch == 0.0) else cat2[pmra2], pmdec2=None if np.all(depoch == 0.0) else cat2[pmdec2], maxdist=maxdist, ) return idx1, idx2, sep