"""Librarian — automated photometry and astrometry retrieval.
Queries Gaia DR3 best_neighbour tables for catalog crossmatching,
with VizieR XMatch fallback when Gaia TAP is unavailable.
Uses pyphot filter names internally (ARIADNE-compatible).
Ported from astroARIADNE with cleaner declarative structure.
"""
__all__ = ["Librarian"]
import logging
import warnings
from collections.abc import Callable, Sequence
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
import numpy as np
import numpy.ma as ma
from astropy.coordinates import SkyCoord
import astropy.units as u
from astroquery.gaia import Gaia
from astroquery.mast import Catalogs
from astroquery.vizier import Vizier as _VizierClass
from astroquery.xmatch import XMatch
from regions import CircleSkyRegion
logger = logging.getLogger(__name__)
warnings.filterwarnings("ignore", category=UserWarning, append=True)
# Local Vizier instance — never mutate the module-level singleton, which is
# shared across the whole Python process and breaks any other consumer of
# astroquery in the same interpreter.
Vizier = _VizierClass(row_limit=-1, columns=["all"], timeout=60)
Gaia.TIMEOUT = 60
XMatch.TIMEOUT = 60
Catalogs.TIMEOUT = 60
# Shared executor used by _with_timeout — avoids per-call pool spawn/teardown.
_TIMEOUT_POOL = ThreadPoolExecutor(max_workers=4, thread_name_prefix="librarian")
# Module-level dustmap caches. Constructing SFDQuery loads ~600 MB of FITS;
# repeating per-Star creation makes catalogue-scale runs unusable.
_DUSTMAP_CACHE: dict = {}
# Schlafly+11 SFD V-band coefficient: A_V = 2.742 * E(B-V).
_AV_PER_EBV_SFD = 2.742
# ── Data structures ──────────────────────────────────────────────
@dataclass(frozen=True, slots=True)
class MagSpec:
"""One photometric band within a catalog."""
col: str
err_col: str
pyphot: str
@dataclass(frozen=True, slots=True)
class CatalogDef:
"""Declarative catalog definition."""
vizier_id: str | None
bands: tuple[MagSpec, ...]
xmatch_table: str | None = None
xmatch_id_cols: tuple[str, ...] | None = None
qc: Callable | None = None
# ── Helpers ──────────────────────────────────────────────────────
_QUERY_TIMEOUT = 60 # seconds per network query
def _with_timeout(func, *args, timeout=_QUERY_TIMEOUT, **kwargs):
"""Run any callable with a hard timeout. Returns None on timeout or error."""
from concurrent.futures import TimeoutError as FuturesTimeout
future = _TIMEOUT_POOL.submit(func, *args, **kwargs)
try:
return future.result(timeout=timeout)
except FuturesTimeout:
logger.warning("Query timed out after %ds", timeout)
return None
except Exception as e:
logger.warning("Query failed: %s", e)
return None
def _tap_query(service, query, timeout=_QUERY_TIMEOUT):
"""Run a TAP async query with a hard timeout. Returns astropy Table or None."""
def _run():
job = service.launch_job_async(query)
return job.get_results()
return _with_timeout(_run, timeout=timeout)
from ._qc import (
_col,
_qc_2mass_band,
_qc_galex_band,
_qc_mag,
_qc_ps1,
_qc_sdss,
_qc_skymapper,
_qc_wise_band,
_qc_wise_extended,
)
# ── Band index maps ─────────────────────────────────────────────
_2MASS_BAND_IDX = {"2MASS_J": 0, "2MASS_H": 1, "2MASS_Ks": 2}
_WISE_BAND_IDX = {"WISE_RSR_W1": 0, "WISE_RSR_W2": 1}
# ── Catalog registry ────────────────────────────────────────────
_CATALOGS = {
"Gaia": CatalogDef(
vizier_id="I/355/gaiadr3",
bands=(
MagSpec("Gmag", "e_Gmag", "Gaia_G"),
MagSpec("BPmag", "e_BPmag", "Gaia_BP"),
MagSpec("RPmag", "e_RPmag", "Gaia_RP"),
),
),
"2MASS": CatalogDef(
vizier_id="II/246/out",
bands=(
MagSpec("Jmag", "e_Jmag", "2MASS_J"),
MagSpec("Hmag", "e_Hmag", "2MASS_H"),
MagSpec("Kmag", "e_Kmag", "2MASS_Ks"),
),
xmatch_table="tmass_psc_xsc",
xmatch_id_cols=("2MASS", "_2MASS", "_2M"),
),
"Wise": CatalogDef(
vizier_id="II/328/allwise",
bands=(
MagSpec("W1mag", "e_W1mag", "WISE_RSR_W1"),
MagSpec("W2mag", "e_W2mag", "WISE_RSR_W2"),
),
xmatch_table="allwise",
xmatch_id_cols=("AllWISE",),
),
"Pan-STARRS": CatalogDef(
vizier_id="II/349/ps1",
bands=(
MagSpec("gmag", "e_gmag", "PS1_g"),
MagSpec("rmag", "e_rmag", "PS1_r"),
MagSpec("imag", "e_imag", "PS1_i"),
MagSpec("zmag", "e_zmag", "PS1_z"),
MagSpec("ymag", "e_ymag", "PS1_y"),
),
xmatch_table="panstarrs1",
xmatch_id_cols=("objID",),
qc=_qc_ps1,
),
"SDSS": CatalogDef(
vizier_id="V/147/sdss12",
bands=(
MagSpec("umag", "e_umag", "SDSS_u"),
MagSpec("gmag", "e_gmag", "SDSS_g"),
MagSpec("rmag", "e_rmag", "SDSS_r"),
MagSpec("imag", "e_imag", "SDSS_i"),
MagSpec("zmag", "e_zmag", "SDSS_z"),
),
xmatch_table="sdssdr13",
xmatch_id_cols=("objID",),
qc=_qc_sdss,
),
"TYCHO2": CatalogDef(
vizier_id="I/259/tyc2",
bands=(
MagSpec("BTmag", "e_BTmag", "TYCHO_B_MvB"),
MagSpec("VTmag", "e_VTmag", "TYCHO_V_MvB"),
),
xmatch_table="tycho2tdsc_merge",
),
"APASS": CatalogDef(
vizier_id=None,
bands=(
MagSpec("vmag", "e_vmag", "GROUND_JOHNSON_V"),
MagSpec("bmag", "e_bmag", "GROUND_JOHNSON_B"),
MagSpec("g_mag", "e_g_mag", "SDSS_g"),
MagSpec("r_mag", "e_r_mag", "SDSS_r"),
MagSpec("i_mag", "e_i_mag", "SDSS_i"),
),
xmatch_table="apassdr9",
),
"SkyMapper": CatalogDef(
vizier_id=None,
bands=(
MagSpec("u_psf", "e_u_psf", "SkyMapper_u"),
MagSpec("v_psf", "e_v_psf", "SkyMapper_v"),
MagSpec("g_psf", "e_g_psf", "SkyMapper_g"),
MagSpec("r_psf", "e_r_psf", "SkyMapper_r"),
MagSpec("i_psf", "e_i_psf", "SkyMapper_i"),
MagSpec("z_psf", "e_z_psf", "SkyMapper_z"),
),
xmatch_table="skymapperdr2",
xmatch_id_cols=("ObjectId", "object_id"),
qc=_qc_skymapper,
),
"TESS": CatalogDef(
vizier_id=None,
bands=(MagSpec("Tmag", "e_Tmag", "TESS"),),
),
"GALEX": CatalogDef(
vizier_id="II/312/ais",
bands=(
MagSpec("FUV", "e_FUV", "GALEX_FUV"),
MagSpec("NUV", "e_NUV", "GALEX_NUV"),
),
),
"GLIMPSE": CatalogDef(
vizier_id="II/293/glimpse",
bands=(
MagSpec("_3.6mag", "e_3.6mag", "SPITZER_IRAC_36"),
MagSpec("_4.5mag", "e_4.5mag", "SPITZER_IRAC_45"),
),
xmatch_id_cols=("2MASS", "_2MASS", "_2M", "Designation"),
),
}
# Standard catalogs: VizieR cone search + crossmatched ID match
_STANDARD_CATALOGS = ("2MASS", "Wise", "Pan-STARRS", "SDSS", "TYCHO2", "GLIMPSE")
# VizieR XMatch fallback config: catalog_name -> (vizier_cat, id_cols)
_XMATCH_CONFIG = {
"2MASS": ("vizier:II/246/out", ("2MASS", "_2MASS", "_2M")),
"Wise": ("vizier:II/328/allwise", ("AllWISE",)),
"Pan-STARRS": ("vizier:II/349/ps1", ("objID",)),
"SDSS": ("vizier:V/147/sdss12", ("objID",)),
"TYCHO2": ("vizier:I/259/tyc2", None),
"APASS": ("vizier:II/336/apass9", None),
}
# Catalogs only available via VizieR cone search (not XMatch)
_CONE_ONLY_FALLBACK = {
"RAVE": ("III/283/madera", ("ObsID",)),
"SkyMapper": ("II/379", ("ObjectId", "object_id")),
}
# ── Constants ────────────────────────────────────────────────────
_PLX_ZEROPOINT = 0.037 # Lindegren+21 parallax zero-point (mas)
_PLX_SYS_ERR = 0.02 # systematic floor (mas), added in quadrature
# ── Librarian ────────────────────────────────────────────────────
[docs]
class Librarian:
"""Automated photometry and astrometry retrieval for isochrone fitting.
Uses Gaia DR3 best_neighbour tables for catalog crossmatching,
with VizieR XMatch fallback when Gaia TAP is unavailable.
Parameters
----------
ra, dec : float
Right ascension and declination in degrees.
gaia_id : int, optional
Gaia DR3 source_id (skips cone search if provided).
radius : float
Search radius in arcseconds (default 3).
ignore : sequence of str
Catalog names to skip (e.g. ["SDSS", "APASS"]).
verbose : bool
Print retrieval summary.
"""
def __init__(
self,
ra: float,
dec: float,
*,
gaia_id: int | None = None,
radius: float = 3.0,
ignore: Sequence[str] = (),
verbose: bool = True,
feh_source: str = "hypatia",
hypatia_statistic: str = "median",
hypatia_uncertainty: str = "std",
hypatia_solarnorm: str = "asplund09",
feh_floor: float = 0.10,
):
self.ra = ra
self.dec = dec
self._search_radius = radius * u.arcsec
self._coord = SkyCoord(ra=ra, dec=dec, unit="deg")
self._ignore = set(ignore)
# [Fe/H] prior configuration.
# feh_source: "hypatia" → Hypatia for [Fe/H] (survey fallback on miss),
# surveys for Teff/logg; "survey" → surveys only.
self._feh_source = feh_source
self._hypatia_statistic = hypatia_statistic
self._hypatia_uncertainty = hypatia_uncertainty
self._hypatia_solarnorm = hypatia_solarnorm
self._feh_floor = feh_floor
self._magnitudes: dict[str, tuple[float, float]] = {}
self._gaia_id: int | None = gaia_id
self._ids: dict[str, str | int | None] = {}
self._parallax: float | None = None
self._parallax_e: float | None = None
self._teff: float | None = None
self._teff_e: float | None = None
self._radius_flame: float | None = None
self._radius_flame_e: float | None = None
self._luminosity: float | None = None
self._luminosity_e: float | None = None
self._mass: float | None = None
self._mass_e: float | None = None
self._age: float | None = None
self._age_e: float | None = None
self._distance: float | None = None
self._distance_e: float | None = None
self._spectroscopic_params: dict | None = None
self._tic_id: int | None = None
self._kic_id: int | None = None
self.Av: float | None = None
self._resolve_gaia_id()
self._query_gaia_params()
self._query_crossmatches()
self._query_bailer_jones()
self._fetch_photometry()
self._impute_zero_errors()
self._query_spectroscopic_priors()
self._query_extinction()
if verbose:
self._print_summary()
# ── Gaia ID resolution ───────────────────────────────────────
def _resolve_gaia_id(self):
"""Find Gaia DR3 source_id via a Gaia TAP cone query if not provided.
Uses an ADQL cone search against gaiadr3.gaia_source ordered by angular
separation (nearest match). This is more robust than the VizieR cone
search and avoids astroquery's ``Gaia.cone_search`` whose ``radius``
argument became keyword-only in recent versions.
"""
if self._gaia_id is not None:
return
ra = self._coord.ra.deg
dec = self._coord.dec.deg
radius_deg = self._search_radius.to(u.deg).value
adql = (
"SELECT source_id, "
f"DISTANCE(POINT('ICRS', ra, dec), POINT('ICRS', {ra}, {dec})) "
"AS ang_sep FROM gaiadr3.gaia_source "
"WHERE 1 = CONTAINS(POINT('ICRS', ra, dec), "
f"CIRCLE('ICRS', {ra}, {dec}, {radius_deg})) "
"ORDER BY ang_sep ASC"
)
try:
res = _with_timeout(lambda: Gaia.launch_job(adql).get_results())
if res is None or len(res) == 0:
logger.warning("No Gaia DR3 source within search radius")
return
self._gaia_id = int(res["source_id"][0])
logger.info("Resolved Gaia DR3 ID: %d", self._gaia_id)
except Exception as e:
logger.warning("Gaia ID resolution failed: %s", e)
# ── Gaia stellar parameters ──────────────────────────────────
def _query_gaia_params(self):
"""Query Gaia DR3 for parallax, Teff, photometry, and FLAME params."""
if self._gaia_id is None:
return
main_cats = _with_timeout(
Vizier.query_constraints,
catalog="I/355/gaiadr3", Source=str(self._gaia_id),
)
if not main_cats or len(main_cats[0]) == 0:
logger.warning("Gaia DR3 source %d not found", self._gaia_id)
return
main = main_cats[0][0]
# FLAME astrophysical parameters table
ap_cats = _with_timeout(
Vizier.query_constraints,
catalog="I/355/paramp", Source=str(self._gaia_id),
)
ap = ap_cats[0][0] if ap_cats and len(ap_cats[0]) > 0 else None
# Parallax (Lindegren+21 correction)
plx = _col(main, "Plx")
plx_e = _col(main, "e_Plx")
if plx is not None:
self._parallax = float(plx) + _PLX_ZEROPOINT
if plx_e is not None:
self._parallax_e = np.sqrt(float(plx_e) ** 2 + _PLX_SYS_ERR**2)
# Gaia GSP-Phot Teff is intentionally NOT extracted as a likelihood
# prior. The photometric SED already constrains Teff via BC tables;
# injecting GSP-Phot as a separate observable double-counts the
# information and pins to the sampler-percentile width (formal σ
# ≈ 1–5 K) which is ~100x narrower than the actual systematic vs
# spectroscopy (Andrae+23 quotes ±100–200 K). Spectroscopic priors
# used by LACHESIS are [Fe/H] (and optionally logg) — Teff is
# output, not input.
# FLAME radius — 5x error inflation (ARIADNE convention to avoid
# over-constraining from crude FLAME estimates)
rad = _col(ap, "Rad-Flame") if ap is not None else None
if rad is not None:
self._radius_flame = float(rad)
lo = _col(ap, "b_Rad-Flame")
hi = _col(ap, "B_Rad-Flame")
if lo is not None and hi is not None:
self._radius_flame_e = 5 * max(
abs(float(rad) - float(lo)), abs(float(hi) - float(rad))
)
# FLAME luminosity
lum = _col(ap, "Lum-Flame") if ap is not None else None
if lum is not None:
self._luminosity = float(lum)
lo = _col(ap, "b_Lum-Flame")
hi = _col(ap, "B_Lum-Flame")
if lo is not None and hi is not None:
self._luminosity_e = max(
abs(float(lum) - float(lo)), abs(float(hi) - float(lum))
)
# FLAME mass
mass_v = _col(ap, "Mass-Flame") if ap is not None else None
if mass_v is not None:
self._mass = float(mass_v)
lo = _col(ap, "b_Mass-Flame")
hi = _col(ap, "B_Mass-Flame")
if lo is not None and hi is not None:
self._mass_e = max(
abs(float(mass_v) - float(lo)), abs(float(hi) - float(mass_v))
)
# FLAME age
age_v = _col(ap, "Age-Flame") if ap is not None else None
if age_v is not None:
self._age = float(age_v)
lo = _col(ap, "b_Age-Flame")
hi = _col(ap, "B_Age-Flame")
if lo is not None and hi is not None:
self._age_e = max(
abs(float(age_v) - float(lo)), abs(float(hi) - float(age_v))
)
# Gaia photometry
for band in _CATALOGS["Gaia"].bands:
mag_v = _col(main, band.col)
err_v = _col(main, band.err_col)
if _qc_mag(mag_v, err_v):
self._add_mag(
band.pyphot,
float(mag_v),
float(err_v) if err_v is not None else 0.01,
)
# ── Gaia TAP crossmatch ──────────────────────────────────────
def _query_crossmatches(self):
"""Query Gaia DR3 best_neighbour tables for external catalog IDs."""
if self._gaia_id is None:
return
self._ids["Gaia"] = self._gaia_id
tap_down = False
xmatch_needed = []
for cat_name, cat_def in _CATALOGS.items():
if cat_name == "Gaia":
continue
if cat_name in self._ignore:
self._ids[cat_name] = None
continue
if cat_def.xmatch_table is None:
continue
if tap_down:
xmatch_needed.append(cat_name)
continue
query = (
f"SELECT xmatch.original_ext_source_id "
f"FROM gaiadr3.{cat_def.xmatch_table}_best_neighbour AS xmatch "
f"WHERE xmatch.source_id = {int(self._gaia_id)}"
)
result = _tap_query(Gaia, query)
if result is None:
tap_down = True
xmatch_needed.append(cat_name)
self._ids[cat_name] = None
elif len(result) > 0:
self._ids[cat_name] = result[0][0]
else:
self._ids[cat_name] = None
logger.info("No %s crossmatch for Gaia %d", cat_name, self._gaia_id)
if xmatch_needed:
logger.warning(
"Gaia TAP down, falling back to VizieR XMatch for: %s", xmatch_needed
)
self._xmatch_fallback(xmatch_needed)
def _xmatch_fallback(self, catalogs_needed):
"""Recover catalog IDs via VizieR XMatch when Gaia TAP is down."""
region = CircleSkyRegion(self._coord, radius=self._search_radius)
for cat_name in catalogs_needed:
if cat_name in self._ignore:
continue
# Cone-only fallback (RAVE, SkyMapper — not on XMatch server)
if cat_name in _CONE_ONLY_FALLBACK:
viz_cat, id_cols = _CONE_ONLY_FALLBACK[cat_name]
result = _with_timeout(
Vizier.query_region,
self._coord, radius=self._search_radius, catalog=viz_cat,
)
if not result or len(result[0]) == 0:
self._ids[cat_name] = None
continue
result[0].sort("_r")
row = result[0][0]
found = next(
(_col(row, c) for c in id_cols if c in result[0].colnames),
None,
)
self._ids[cat_name] = found
continue
if cat_name not in _XMATCH_CONFIG:
continue
vizier_cat, id_cols = _XMATCH_CONFIG[cat_name]
xm = _with_timeout(
XMatch.query,
cat1="vizier:I/355/gaiadr3",
cat2=vizier_cat,
max_distance=self._search_radius,
area=region,
)
try:
if xm is None or len(xm) == 0:
self._ids[cat_name] = None
continue
xm.sort("angDist")
row = xm[0]
if "Source" in xm.colnames:
src_mask = xm["Source"] == self._gaia_id
if src_mask.sum() > 0:
row = xm[src_mask][0]
# Pattern B: APASS — extract mags directly from XMatch row
if cat_name == "APASS":
self._ids["APASS"] = "xmatch_done"
for band in _CATALOGS["APASS"].bands:
if band.pyphot in self._magnitudes:
continue
mag_v = _col(row, band.col)
err_v = _col(row, band.err_col)
if _qc_mag(mag_v, err_v):
err_f = float(err_v) if err_v is not None else 0.02
self._add_mag(band.pyphot, float(mag_v), err_f)
continue
# Tycho-2: composite TYC1-TYC2-TYC3 ID
if cat_name == "TYCHO2":
try:
tyc_id = (
f"{int(row['TYC1'])}-{int(row['TYC2'])}-{int(row['TYC3'])}"
)
self._ids["TYCHO2"] = tyc_id
except (KeyError, ValueError):
self._ids["TYCHO2"] = None
continue
# Pattern A: extract ID from first matching column
if id_cols is not None:
found = next(
(_col(row, c) for c in id_cols if c in xm.colnames),
None,
)
self._ids[cat_name] = found
else:
self._ids[cat_name] = None
except Exception as e:
logger.warning("XMatch fallback failed for %s: %s", cat_name, e)
self._ids[cat_name] = None
# ── Bailer-Jones distance ────────────────────────────────────
def _query_bailer_jones(self):
"""Query Bailer-Jones EDR3 geometric distance."""
if self._gaia_id is None:
return
res = _with_timeout(
Vizier.query_constraints,
catalog="I/352/gedr3dis", Source=str(self._gaia_id),
)
if not res or len(res[0]) == 0:
return
try:
row = res[0][0]
dist = float(row["rgeo"])
lo = dist - float(row["b_rgeo"])
hi = float(row["B_rgeo"]) - dist
self._distance = dist
self._distance_e = max(lo, hi)
except Exception as e:
logger.warning("Bailer-Jones row parse failed: %s", e)
# ── Photometry retrieval ─────────────────────────────────────
def _fetch_photometry(self):
"""Dispatch photometry retrieval for all catalogs."""
if self._gaia_id is None:
return
# Bulk VizieR cone search for standard catalogs
viz_ids = [
_CATALOGS[name].vizier_id
for name in _STANDARD_CATALOGS
if name not in self._ignore
and _CATALOGS[name].vizier_id is not None
and self._ids.get(name) is not None
]
cats = {}
if viz_ids:
result = _with_timeout(
Vizier.query_region,
self._coord, radius=self._search_radius, catalog=viz_ids,
)
if result:
cats = result
for cat_name in _STANDARD_CATALOGS:
if cat_name in self._ignore:
continue
ext_id = self._ids.get(cat_name)
if ext_id is None:
continue
self._fetch_standard(cat_name, _CATALOGS[cat_name], cats, ext_id)
if "TESS" not in self._ignore:
self._fetch_tess()
if "APASS" not in self._ignore:
self._fetch_apass()
if "SkyMapper" not in self._ignore:
self._fetch_skymapper()
if "GALEX" not in self._ignore:
self._fetch_galex()
if "MERMILLIOD" not in self._ignore:
self._fetch_mermilliod()
if "STROMGREN" not in self._ignore:
self._fetch_stromgren()
def _fetch_standard(self, cat_name, cat_def, cats, ext_id):
"""Fetch photometry from VizieR results, filtered by crossmatched ID."""
if cat_def.vizier_id is None:
return
try:
cat = cats[cat_def.vizier_id]
cat.sort("_r")
except (TypeError, KeyError):
logger.info("No VizieR data for %s", cat_name)
return
# Filter by crossmatched ID
if cat_name == "TYCHO2":
try:
tyc1, tyc2, tyc3 = str(ext_id).split("-")
mask = (
(cat["TYC1"] == int(tyc1))
& (cat["TYC2"] == int(tyc2))
& (cat["TYC3"] == int(tyc3))
)
except (ValueError, KeyError):
return
elif cat_name == "GLIMPSE":
tmass_id = self._ids.get("2MASS")
if tmass_id is None:
return
id_cols = cat_def.xmatch_id_cols or ()
col_name = next((c for c in id_cols if c in cat.colnames), None)
if col_name is None:
return
mask = cat[col_name] == tmass_id
else:
id_cols = cat_def.xmatch_id_cols or ()
col_name = next((c for c in id_cols if c in cat.colnames), None)
if col_name is None:
return
mask = cat[col_name] == ext_id
matched = cat[mask]
if len(matched) == 0:
return
row = matched[0]
# Catalog-level QC
if cat_def.qc is not None and not cat_def.qc(row):
logger.info("%s failed catalog QC", cat_name)
return
# WISE extended source check
if cat_name == "Wise" and not _qc_wise_extended(row):
logger.info("WISE source is extended, skipping")
return
for band in cat_def.bands:
if band.pyphot in self._magnitudes:
continue
# Per-band QC
if band.pyphot in _2MASS_BAND_IDX:
if not _qc_2mass_band(row, _2MASS_BAND_IDX[band.pyphot]):
continue
if band.pyphot in _WISE_BAND_IDX:
if not _qc_wise_band(row, _WISE_BAND_IDX[band.pyphot]):
continue
mag_v = _col(row, band.col)
err_v = _col(row, band.err_col)
if not _qc_mag(mag_v, err_v):
continue
err_f = float(err_v) if err_v is not None else 0.02
self._add_mag(band.pyphot, float(mag_v), err_f)
def _fetch_tess(self):
"""Query TESS TIC via MAST, crossmatched on Gaia ID."""
result = _with_timeout(
Catalogs.query_region,
self._coord, radius=self._search_radius, catalog="TIC",
)
if result is None or len(result) == 0:
return
result.sort("dstArcSec")
gaia_mask = result["GAIA"] == str(self._gaia_id)
matched = result[gaia_mask]
if len(matched) == 0:
return
row = matched[0]
if _col(row, "objType") != "STAR":
return
self._tic_id = int(row["ID"])
kic = _col(row, "KIC")
if kic is not None:
self._kic_id = int(kic)
band = _CATALOGS["TESS"].bands[0]
if band.pyphot in self._magnitudes:
return
mag_v = _col(row, band.col)
err_v = _col(row, band.err_col)
if _qc_mag(mag_v, err_v):
err_f = float(err_v) if err_v is not None else 0.01
self._add_mag(band.pyphot, float(mag_v), err_f)
def _fetch_apass(self):
"""Query APASS via Gaia TAP external.apassdr9 table."""
apass_id = self._ids.get("APASS")
if apass_id is None or apass_id == "xmatch_done":
return
query = (
"SELECT apass.vmag, apass.e_vmag, "
"apass.bmag, apass.e_bmag, "
"apass.g_mag, apass.e_g_mag, "
"apass.r_mag, apass.e_r_mag, "
"apass.i_mag, apass.e_i_mag "
"FROM external.apassdr9 AS apass "
"INNER JOIN gaiadr3.apassdr9_best_neighbour AS xmatch "
"ON apass.recno = xmatch.original_ext_source_id "
f"WHERE xmatch.source_id = {int(self._gaia_id)}"
)
result = _tap_query(Gaia, query)
if result is None or len(result) == 0:
return
row = result[0]
for band in _CATALOGS["APASS"].bands:
if band.pyphot in self._magnitudes:
continue
mag_v = _col(row, band.col)
err_v = _col(row, band.err_col)
if _qc_mag(mag_v, err_v):
err_f = float(err_v) if err_v is not None else 0.02
self._add_mag(band.pyphot, float(mag_v), err_f)
def _fetch_skymapper(self):
"""Query SkyMapper DR2 via SkyMapper TAP service."""
sm_id = self._ids.get("SkyMapper")
if sm_id is None:
return
from astroquery.utils.tap.core import TapPlus
tap = TapPlus(url="https://api.skymapper.nci.org.au/public/tap/")
query = (
"SELECT object_id, u_psf, e_u_psf, v_psf, e_v_psf, "
"g_psf, e_g_psf, r_psf, e_r_psf, i_psf, e_i_psf, "
"z_psf, e_z_psf, flags "
f"FROM dr2.master WHERE object_id = {int(sm_id)}"
)
result = _tap_query(tap, query)
if result is None or len(result) == 0:
return
row = result[0]
if not _qc_skymapper(row):
return
for band in _CATALOGS["SkyMapper"].bands:
if band.pyphot in self._magnitudes:
continue
mag_v = _col(row, band.col)
err_v = _col(row, band.err_col)
if _qc_mag(mag_v, err_v):
err_f = float(err_v) if err_v is not None else 0.02
self._add_mag(band.pyphot, float(mag_v), err_f)
def _fetch_galex(self):
"""Query GALEX via VizieR XMatch with Gaia DR3."""
region = CircleSkyRegion(self._coord, radius=self._search_radius)
xm = _with_timeout(
XMatch.query,
cat1="vizier:I/355/gaiadr3",
cat2="vizier:II/312/ais",
max_distance=self._search_radius,
area=region,
)
try:
if xm is None or len(xm) == 0:
return
xm.sort("angDist")
src_mask = xm["Source"] == self._gaia_id
if src_mask.sum() == 0:
return
row = xm[src_mask][0]
for band in _CATALOGS["GALEX"].bands:
if band.pyphot in self._magnitudes:
continue
if not _qc_galex_band(row, band.pyphot):
continue
mag_v = _col(row, band.col)
err_v = _col(row, band.err_col)
if _qc_mag(mag_v, err_v):
err_f = float(err_v) if err_v is not None else 0.02
self._add_mag(band.pyphot, float(mag_v), err_f)
except Exception as e:
logger.warning("GALEX XMatch query failed: %s", e)
def _fetch_mermilliod(self):
"""Query Mermilliod Johnson UBV via VizieR XMatch.
Color algebra: V direct, B = (B-V) + V, U = (U-B) + B.
"""
region = CircleSkyRegion(self._coord, radius=5 * u.arcmin)
xm = _with_timeout(
XMatch.query,
cat1="vizier:I/355/gaiadr3",
cat2="vizier:II/168/ubvmeans",
max_distance=3 * u.arcmin,
area=region,
)
try:
if xm is None or len(xm) == 0:
return
xm.sort("angDist")
src_mask = xm["Source"] == self._gaia_id
if src_mask.sum() == 0:
return
row = xm[src_mask][0]
v = _col(row, "Vmag")
v_e = _col(row, "e_Vmag")
if not _qc_mag(v, v_e):
return
v, v_e = float(v), float(v_e) if v_e is not None else 0.0
if v_e > 0 and "GROUND_JOHNSON_V" not in self._magnitudes:
self._add_mag("GROUND_JOHNSON_V", v, v_e)
bv = _col(row, "B-V")
bv_e = _col(row, "e_B-V")
if _qc_mag(bv, bv_e):
bv = float(bv)
bv_e = float(bv_e) if bv_e is not None else 0.0
b = bv + v
b_e = np.sqrt(v_e**2 + bv_e**2)
if b_e > 0 and "GROUND_JOHNSON_B" not in self._magnitudes:
self._add_mag("GROUND_JOHNSON_B", b, b_e)
ub = _col(row, "U-B")
ub_e = _col(row, "e_U-B")
if _qc_mag(ub, ub_e):
ub = float(ub)
ub_e = float(ub_e) if ub_e is not None else 0.0
u_mag = ub + b
u_e = np.sqrt(b_e**2 + ub_e**2)
if u_e > 0 and "GROUND_JOHNSON_U" not in self._magnitudes:
self._add_mag("GROUND_JOHNSON_U", u_mag, u_e)
except Exception as e:
logger.warning("Mermilliod XMatch query failed: %s", e)
def _fetch_stromgren(self):
"""Query Stromgren from Paunzen+15 and Hauck+98 via VizieR XMatch.
Index algebra: b=(b-y)+y, v=m1+2(b-y)+y, u=c1+2m1+3(b-y)+y.
"""
xm_configs = [
("vizier:J/A+A/580/A23/catalog", self._search_radius),
("vizier:II/215/catalog", self._search_radius),
]
region = CircleSkyRegion(self._coord, radius=self._search_radius)
for cat2, max_dist in xm_configs:
xm = _with_timeout(
XMatch.query,
cat1="vizier:I/355/gaiadr3",
cat2=cat2,
max_distance=max_dist,
area=region,
)
try:
if xm is None or len(xm) == 0:
continue
xm.sort("angDist")
src_mask = xm["Source"] == self._gaia_id
if src_mask.sum() == 0:
continue
row = xm[src_mask][0]
y = _col(row, "Vmag")
y_e = _col(row, "e_Vmag")
if not _qc_mag(y, y_e):
continue
y = float(y)
y_e = float(y_e) if y_e is not None else 0.0
by = _col(row, "b-y")
by_e = _col(row, "e_b-y")
m1 = _col(row, "m1")
m1_e = _col(row, "e_m1")
c1 = _col(row, "c1")
c1_e = _col(row, "e_c1")
if not all(
_qc_mag(x, xe) for x, xe in [(by, by_e), (m1, m1_e), (c1, c1_e)]
):
continue
by, by_e = float(by), float(by_e) if by_e is not None else 0.0
m1, m1_e = float(m1), float(m1_e) if m1_e is not None else 0.0
c1, c1_e = float(c1), float(c1_e) if c1_e is not None else 0.0
b_mag = by + y
v_mag = m1 + 2 * by + y
u_mag = c1 + 2 * m1 + 3 * by + y
b_e = np.sqrt(by_e**2 + y_e**2)
v_e = np.sqrt(m1_e**2 + 4 * by_e**2 + y_e**2)
u_e = np.sqrt(c1_e**2 + 4 * m1_e**2 + 9 * by_e**2 + y_e**2)
for name, m, e in [
("STROMGREN_u", u_mag, u_e),
("STROMGREN_v", v_mag, v_e),
("STROMGREN_b", b_mag, b_e),
("STROMGREN_y", y, y_e),
]:
if name not in self._magnitudes:
self._add_mag(name, m, e)
break # found in one catalog, skip the other
except Exception as e:
logger.warning("Stromgren XMatch failed for %s: %s", cat2, e)
# ── Spectroscopic parameters ─────────────────────────────────
def _query_spectroscopic_priors(self):
"""Assemble spectroscopic priors (Teff, logg, [Fe/H]).
Teff/logg come from the survey priority chain (first match wins):
PASTEL > APOGEE DR17 > GALAH DR3 > RAVE DR6 > LAMOST DR11.
[Fe/H] depends on ``feh_source``:
* "hypatia" (default) — the Hypatia Catalog, a multi-study
high-resolution-spectroscopy compilation whose spread gives a
realistic σ; falls back to the survey-chain [Fe/H] on a Hypatia
miss.
* "survey" — use the survey-chain [Fe/H] (legacy behaviour).
"""
base = None
for query_fn, source_name in [
(self._query_pastel, "PASTEL"),
(self._query_apogee, "APOGEE_DR17"),
(self._query_galah, "GALAH_DR3"),
(self._query_rave, "RAVE_DR6"),
(self._query_lamost, "LAMOST_DR11"),
]:
result = query_fn()
if result is not None:
result["source"] = source_name
base = result
logger.info("Spectroscopic params from %s", source_name)
break
if self._feh_source == "hypatia":
hyp = self._query_hypatia()
if hyp is not None:
if base is None:
base = {"teff": None, "teff_err": None,
"logg": None, "logg_err": None,
"source": None}
base["feh"] = hyp["feh"]
base["feh_err"] = hyp["feh_err"]
base["feh_source"] = "Hypatia"
base["feh_n_meas"] = hyp["n_meas"]
base["feh_solarnorm"] = hyp["solarnorm"]
base["feh_statistic"] = hyp["statistic"]
base["feh_uncertainty"] = hyp["uncertainty"]
logger.info("[Fe/H] from Hypatia: %.3f ± %.3f (n=%d, %s)",
hyp["feh"], hyp["feh_err"], hyp["n_meas"],
hyp["matched_name"])
self._spectroscopic_params = base
def _query_hypatia(self) -> dict | None:
"""Query the Hypatia Catalog for [Fe/H] using the configured options."""
from ._hypatia import query_hypatia_feh
names = []
tmass = self._ids.get("2MASS") if self._ids else None
if tmass:
names.append(f"2MASS J{tmass}")
return query_hypatia_feh(
self._gaia_id, names,
statistic=self._hypatia_statistic,
uncertainty=self._hypatia_uncertainty,
solarnorm=self._hypatia_solarnorm,
floor=self._feh_floor,
)
def _query_apogee(self) -> dict | None:
"""Query APOGEE DR17 via VizieR (III/286).
Crossmatches via 2MASS ID. Rejects entries with ASPCAPFLAG != 0.
Uses [M/H] as proxy for [Fe/H] (close enough for priors).
"""
tmass_id = self._ids.get("2MASS")
if tmass_id is None:
return None
try:
cat = _with_timeout(
Vizier.query_constraints,
catalog="III/286/allstars", **{"2MASS": str(tmass_id)},
)
if not cat or len(cat[0]) == 0:
return None
row = cat[0][0]
# Reject flagged entries
flag = _col(row, "ASPCAPFLAG")
if flag is not None and int(flag) != 0:
return None
teff = _col(row, "Teff")
teff_e = _col(row, "e_Teff")
logg = _col(row, "logg")
logg_e = _col(row, "e_logg")
mh = _col(row, "__M_H_")
mh_e = _col(row, "e__M_H_")
if any(v is None or not np.isfinite(float(v)) for v in (teff, logg, mh)):
return None
return {
"teff": float(teff),
"teff_err": float(teff_e) if teff_e is not None else 100.0,
"logg": float(logg),
"logg_err": float(logg_e) if logg_e is not None else 0.2,
"feh": float(mh),
"feh_err": float(mh_e) if mh_e is not None else 0.1,
}
except Exception as e:
logger.warning("APOGEE DR17 query failed: %s", e)
return None
def _query_galah(self) -> dict | None:
"""Query GALAH DR3 via VizieR (J/MNRAS/506/150).
Pre-matched to Gaia DR3 source_id. Rejects entries with quality flags.
"""
if self._gaia_id is None:
return None
try:
cat = _with_timeout(
Vizier.query_constraints,
catalog="J/MNRAS/506/150/catalog",
Source=str(self._gaia_id),
)
if not cat or len(cat[0]) == 0:
return None
row = cat[0][0]
# Quality flags: reject if flagged
flag_sp = _col(row, "flag_sp")
flag_fe_h = _col(row, "flag_fe_h")
if flag_sp is not None and int(flag_sp) != 0:
return None
if flag_fe_h is not None and int(flag_fe_h) != 0:
return None
teff = _col(row, "Teff")
teff_e = _col(row, "e_Teff")
logg = _col(row, "logg")
logg_e = _col(row, "e_logg")
feh = _col(row, "fe_h")
feh_e = _col(row, "e_fe_h")
if any(v is None or not np.isfinite(float(v)) for v in (teff, logg, feh)):
return None
return {
"teff": float(teff),
"teff_err": float(teff_e) if teff_e is not None else 100.0,
"logg": float(logg),
"logg_err": float(logg_e) if logg_e is not None else 0.2,
"feh": float(feh),
"feh_err": float(feh_e) if feh_e is not None else 0.1,
}
except Exception as e:
logger.warning("GALAH DR3 query failed: %s", e)
return None
def _query_rave(self) -> dict | None:
"""Query RAVE DR6 for spectroscopic Teff, logg, [Fe/H]."""
rave_id = self._ids.get("RAVE")
if rave_id is None:
return None
try:
cat = _with_timeout(
Vizier.query_constraints,
catalog="III/283/madera", ObsID=str(rave_id),
)
if not cat or len(cat[0]) == 0:
return None
row = cat[0][0]
qual = _col(row, "Qual")
if qual is not None and int(qual) == 1:
return None
return {
"teff": float(row["TeffmC"]),
"teff_err": float(row["e_Teffm"]),
"logg": float(row["loggmC"]),
"logg_err": float(row["e_loggm"]),
"feh": float(row["[m/H]mC"]),
"feh_err": float(row["e_[m/H]m"]),
}
except Exception as e:
logger.warning("RAVE DR6 query failed: %s", e)
return None
def _query_lamost(self) -> dict | None:
"""Query LAMOST DR9 stellar parameters via VizieR (V/162).
Crossmatches via positional cone search since LAMOST has no
pre-matched Gaia source_id. Requires SNR_g > 30.
"""
try:
cats = _with_timeout(
Vizier.query_region,
self._coord, radius=self._search_radius, catalog="V/162",
)
if cats is None:
return None
if not cats or len(cats[0]) == 0:
return None
cats[0].sort("_r")
row = cats[0][0]
# SNR quality cut
snrg = _col(row, "snrg")
if snrg is not None and float(snrg) < 30:
return None
teff = _col(row, "Teff")
teff_e = _col(row, "e_Teff")
logg = _col(row, "logg")
logg_e = _col(row, "e_logg")
feh = _col(row, "__Fe_H_")
feh_e = _col(row, "e__Fe_H_")
if any(v is None for v in (teff, logg, feh)):
return None
return {
"teff": float(teff),
"teff_err": float(teff_e) if teff_e is not None else 100.0,
"logg": float(logg),
"logg_err": float(logg_e) if logg_e is not None else 0.2,
"feh": float(feh),
"feh_err": float(feh_e) if feh_e is not None else 0.1,
}
except Exception as e:
logger.warning("LAMOST DR11 query failed: %s", e)
return None
def _query_pastel(self) -> dict | None:
"""Query PASTEL catalog via VizieR (B/pastel).
Literature compilation of spectroscopic parameters. Positional match.
Uses conservative default errors since many entries lack formal errors.
"""
try:
cats = _with_timeout(
Vizier.query_region,
self._coord, radius=self._search_radius, catalog="B/pastel/pastel",
)
if not cats or len(cats[0]) == 0:
return None
cats[0].sort("_r")
row = cats[0][0]
teff = _col(row, "Teff")
logg = _col(row, "logg")
feh = _col(row, "__Fe_H_")
if any(v is None or not np.isfinite(float(v)) for v in (teff, logg, feh)):
return None
teff_e = _col(row, "e_Teff")
logg_e = _col(row, "e_logg")
feh_e = _col(row, "e__Fe_H_")
return {
"teff": float(teff),
"teff_err": float(teff_e) if teff_e is not None else 150.0,
"logg": float(logg),
"logg_err": float(logg_e) if logg_e is not None else 0.3,
"feh": float(feh),
"feh_err": float(feh_e) if feh_e is not None else 0.15,
}
except Exception as e:
logger.warning("PASTEL query failed: %s", e)
return None
def _query_extinction(self):
"""Query max line-of-sight Av from SFD dustmap.
SFD is purely 2D (line-of-sight integrated), so the distance kwarg
is intentionally not passed. The SFDQuery instance is cached at
module scope to avoid reloading ~600 MB of FITS per Star.
"""
try:
sfd = _DUSTMAP_CACHE.get("sfd")
if sfd is None:
from dustmaps.sfd import SFDQuery
sfd = SFDQuery()
_DUSTMAP_CACHE["sfd"] = sfd
coords = SkyCoord(self.ra, self.dec, unit=(u.deg, u.deg), frame='icrs')
ebv = sfd(coords)
self.Av = float(ebv) * _AV_PER_EBV_SFD
except Exception as e:
logger.warning("SFD extinction query failed: %s", e)
self.Av = None
# ── Helpers ──────────────────────────────────────────────────
def _impute_zero_errors(self):
"""Impute errors for zero-error bands (matching ARIADNE).
Computes the maximum relative flux error from bands with valid
errors, adds 0.1, and assigns that relative error to zero-error
bands. This mirrors ARIADNE's Star.extract_info() logic where
mx_rel_er = max(flux_er / flux) + 0.1.
"""
_LN10x04 = 0.4 * np.log(10) # ≈ 0.9210
good_rel = []
for _band, (_mag, err) in self._magnitudes.items():
if err > 0:
good_rel.append(_LN10x04 * err)
if not good_rel:
return
mx_rel = max(good_rel) + 0.1
imputed = mx_rel / _LN10x04
for band, (mag, err) in list(self._magnitudes.items()):
if err <= 0:
self._magnitudes[band] = (mag, imputed)
def _add_mag(self, pyphot_name, mag, err):
"""Add magnitude. First-write-wins (catalog priority by query order)."""
if pyphot_name not in self._magnitudes:
self._magnitudes[pyphot_name] = (mag, err)
def _print_summary(self):
import random
from termcolor import colored
from ..phot_utils import get_effective_wavelength
c = random.choice(['red', 'green', 'blue', 'yellow', 'grey', 'magenta', 'cyan', 'white'])
t2 = "\t\t"
t3 = "\t\t\t"
def _filter_wavelength(b):
try:
return get_effective_wavelength(b)
except Exception:
return 99.0
# Photometry table sorted by wavelength (like ARIADNE)
bands = sorted(
self._magnitudes.keys(),
key=_filter_wavelength,
)
print(colored(f"{t2}--- Retrieved photometry ---", c))
print(colored(f"{t2}{' Filter':^20s}\t{'Magnitude':>9s}\t{'Uncertainty':>11s}", c))
print(colored(f"{t2}{'':->20s}\t{'-':->9s}\t{'-':->11s}", c))
for band in bands:
mag, err = self._magnitudes[band]
print(colored(f"{t2}{band:^20s}\t{mag: ^9.4f}\t{err: ^11.4f}", c))
# Stellar params — new color (matching ARIADNE's display_star_fin)
c = random.choice(['red', 'green', 'blue', 'yellow', 'grey', 'magenta', 'cyan', 'white'])
if self._gaia_id:
print(colored(f"{t3}Gaia DR3 ID : {self._gaia_id}", c))
if self._tic_id:
print(colored(f"{t3}TIC : {self._tic_id}", c))
if self._kic_id:
print(colored(f"{t3}KIC : {self._kic_id}", c))
if self._teff is not None:
print(colored(f"{t3}Gaia Effective temperature : ", c), end='')
print(colored(f"{self._teff:.3f} +/- {self._teff_e:.3f}", c))
if self._radius_flame is not None:
print(colored(f"{t3}Gaia Stellar radius : ", c), end='')
print(colored(f"{self._radius_flame:.3f} +/- {self._radius_flame_e:.3f}", c))
if self._luminosity is not None:
print(colored(f"{t3}Gaia Stellar Luminosity : ", c), end='')
print(colored(f"{self._luminosity:.3f} +/- {self._luminosity_e:.3f}", c))
if self._parallax is not None:
print(colored(f"{t3}Gaia Parallax : ", c), end='')
print(colored(f"{self._parallax:.3f} +/- {self._parallax_e:.3f}", c))
if self._distance is not None:
print(colored(f"{t3}Bailer-Jones distance : ", c), end='')
print(colored(f"{self._distance:.3f} +/- {self._distance_e:.3f}", c))
if hasattr(self, 'Av') and self.Av is not None:
print(colored(f"{t3}Maximum Av : ", c), end='')
print(colored(f"{self.Av:.3f}", c))
print()
print()
# ── Properties ───────────────────────────────────────────────
@property
def parallax(self) -> tuple[float, float] | None:
"""(parallax_mas, error_mas) with Lindegren+21 correction."""
if self._parallax is None:
return None
return (self._parallax, self._parallax_e)
@property
def teff(self) -> tuple[float, float] | None:
"""(Teff_K, error_K) from Gaia GSP-Phot."""
if self._teff is None:
return None
return (self._teff, self._teff_e or 100.0)
@property
def radius(self) -> tuple[float, float] | None:
"""(R_Rsun, error_Rsun) from FLAME (5x inflated error)."""
if self._radius_flame is None:
return None
return (self._radius_flame, self._radius_flame_e)
@property
def luminosity(self) -> tuple[float, float] | None:
"""(L_Lsun, error_Lsun) from FLAME."""
if self._luminosity is None:
return None
return (self._luminosity, self._luminosity_e)
@property
def mass(self) -> tuple[float, float] | None:
"""(M_Msun, error_Msun) from FLAME."""
if self._mass is None:
return None
return (self._mass, self._mass_e)
@property
def age(self) -> tuple[float, float] | None:
"""(age_Gyr, error_Gyr) from FLAME."""
if self._age is None:
return None
return (self._age, self._age_e)
@property
def distance(self) -> tuple[float, float] | None:
"""(distance_pc, error_pc) from Bailer-Jones EDR3."""
if self._distance is None:
return None
return (self._distance, self._distance_e)
@property
def magnitudes(self) -> dict[str, tuple[float, float]]:
"""{'pyphot_filter_name': (mag, error)}."""
return dict(self._magnitudes)
@property
def gaia_id(self) -> int | None:
return self._gaia_id
@property
def tic_id(self) -> int | None:
return self._tic_id
@property
def spectroscopic_params(self) -> dict | None:
"""Spectroscopic params dict with source tag, or None.
Keys: teff, teff_err, logg, logg_err, feh, feh_err, source.
"""
return dict(self._spectroscopic_params) if self._spectroscopic_params else None
@property
def rave_params(self) -> dict | None:
"""RAVE DR6 spectroscopic params, or None. Backward-compatible."""
if (
self._spectroscopic_params is not None
and self._spectroscopic_params.get("source") == "RAVE_DR6"
):
return dict(self._spectroscopic_params)
return None