"""Star.py contains the Star class which contains the data regarding a star."""
__all__ = ['Star']
import random
import warnings
import astropy.units as u
import numpy as np
from astropy.coordinates import SkyCoord
from dustmaps.sfd import SFDQuery
from dustmaps.planck import (PlanckQuery, PlanckGNILCQuery)
from dustmaps.lenz2017 import Lenz2017Query
from dustmaps.bayestar import BayestarQuery
from termcolor import colored
from scipy.constants import h as _h, c as _c, k as _k
from .config import (gridsdir, filter_names, colors, iso_mask, iso_bands)
from .isochrone import estimate
from .librarian import Librarian
from .librarian._adapter import adapt_librarian
from .error import StarWarning
from .phot_utils import *
from .utils import (display_star_fin, display_star_init)
_GAIA_PREFIXES = ('GaiaDR2', 'GaiaDR3', 'GAIA')
def extract_from_lib(lib):
"""Extract relevant parameters from lib.
Returns
-------
[plx, plx_e, dist, dist_e, rad, rad_e, temp, temp_e, lum, lum_e]
"""
if lib is None:
return [-1] * 10
return [
lib.plx, lib.plx_e,
lib.dist, lib.dist_e,
lib.rad, lib.rad_e,
lib.temp, lib.temp_e,
lib.lum, lib.lum_e
]
[docs]
class Star:
"""Object that holds stellar magnitudes and other relevant information.
Parameters
----------
starname: str
The name of the object. If ra and dec aren't provided nor is a
list of magnitudes with associated uncertainties provided, the search
for stellar magnitudes will be done using the object's name instead.
ra: float
RA coordinate of the object in degrees.
dec: float
DEC coordinate of the object in degrees.
g_id: int, optional
The Gaia DR2 identifier.
plx: float, optional
The parallax of the star in case no internet connection is available
or if no parallax can be found on Gaia DR2.
plx_e: float, optional
The error on the parallax.
rad: float, optional
The radius of the star in case no internet connection is available
or if no radius can be found on Gaia DR2.
rad_e: float, optional
The error on the stellar radius.
temp: float, optional
The effective temperature of the star in case no internet connection
is available or if no effective temperature can be found on Gaia DR2.
temp_e: float, optional
The error on the effective temperature.
lum: float, optional
The stellar luminosity in case no internet connection
is available or if no luminosity can be found on Gaia DR2.
lum_e: float, optional
The error on the stellar luminosity.
dist: float, optional
The distance in parsec.
dist_e: float, optional
The error on the distance.
mag_dict: dictionary, optional
A dictionary with the filter names as keys (names must correspond to
those in the filter_names attribute) and with a tuple containing the
magnitude and error for that filter as the value. Provide in case no
internet connection is available.
offline: bool
If False it overrides the coordinate search entirely.
verbose: bool, optional
Set to False to suppress printed outputs.
ignore: list, optional
A list with the catalogs to ignore for whatever reason.
Attributes
----------
full_grid: ndarray
The full grid of fluxes.
teff: ndarray
The effective temperature axis of the flux grid.
logg: ndarray
The gravity axis of the flux grid
z: ndarray, float
If fixed_z is False, then z is the metallicity axis of the flux grid.
Otherwise z has the same value as fixed_z
starname: str
The name of the object.
ra: float
RA coordinate of the object.
dec: float
DEC coordinate of the object.
wave: ndarray
An array containing the wavelengths associated to the different
filters retrieved.
flux: ndarray
An array containing the fluxes of the different retrieved magnitudes.
"""
filter_names = filter_names
colors = colors
dustmaps = {
'SFD': SFDQuery,
'Lenz': Lenz2017Query,
'Planck13': PlanckQuery,
'Planck16': PlanckGNILCQuery,
'Bayestar': BayestarQuery,
}
def __init__(self, starname, ra, dec, g_id=None,
plx=None, plx_e=None, rad=None, rad_e=None,
temp=None, temp_e=None, lum=None, lum_e=None,
dist=None, dist_e=None, Av=None, Av_e=None,
offline=False, mag_dict=None, verbose=True, ignore=None,
dustmap='SFD'):
"""See class docstring."""
# MISC
self.verbose = verbose
self.offline = offline
# Star stuff
self.starname = starname
self.ra_dec_to_deg(ra, dec)
c = random.choice(self.colors)
display_star_init(self, c)
if verbose:
if plx is not None:
StarWarning('Parallax', 0).warn()
if rad is not None:
StarWarning('Radius', 0).warn()
if temp is not None:
StarWarning('Temperature', 0).warn()
if lum is not None:
StarWarning('Luminosity', 0).warn()
if mag_dict is not None:
StarWarning('Magnitudes', 0).warn()
self.get_plx = True if plx is None else False
self.get_dist = True if dist is None and plx is None else False
self.get_rad = True if rad is None else False
self.get_temp = True if temp is None else False
self.get_lum = True if lum is None else False
self.get_mags = True if mag_dict is None else False
self.get_logg = False # This is set to True after self.estimate_logg
self.g_id = g_id
# Lookup archival magnitudes, radius, temperature, luminosity
# and parallax
lookup = self.get_rad + self.get_temp + self.get_plx \
+ self.get_mags + self.get_dist
if lookup:
if not offline:
if verbose:
print(
colored('\t\t*** LOOKING UP ARCHIVAL INFORMATION ***',
c)
)
ignore_arg = ignore if ignore is not None else ()
lib = Librarian(self.ra, self.dec, gaia_id=self.g_id,
ignore=ignore_arg, verbose=self.verbose)
adapted = adapt_librarian(lib)
self.g_id = adapted.g_id
self.tic = adapted.tic
self.kic = adapted.kic
# Store spectroscopic parameters for use in priors
self.rave_params = adapted.rave_params
self.spectroscopic_params = adapted.spectroscopic_params
else:
print(
colored('\t\t*** ARCHIVAL LOOKUP OVERRIDDEN ***', c)
)
if self.get_mags:
StarWarning('', 1).__raise__()
adapted = None
self.tic = False
self.kic = False
self.rave_params = None
self.spectroscopic_params = None
# [plx, plx_e, dist, dist_e, rad, rad_e, temp, temp_e, lum, lum_e]
libouts = extract_from_lib(adapted)
if self.get_plx:
self.plx = libouts[0]
self.plx_e = libouts[1]
else:
self.plx = plx
self.plx_e = plx_e
if self.get_dist:
self.dist = libouts[2]
self.dist_e = libouts[3]
elif dist is not None:
self.dist = dist
self.dist_e = dist_e
else:
self.calculate_distance()
# In case self.dist == -999 (no BJ EDR3 distance found)
if self.dist == -999:
self.calculate_distance()
if self.get_rad:
self.rad = libouts[4]
self.rad_e = libouts[5]
else:
self.rad = rad
self.rad_e = rad_e
if self.get_temp:
self.temp = libouts[6]
self.temp_e = libouts[7]
else:
self.temp = temp
self.temp_e = temp_e
if self.get_lum:
self.lum = libouts[8]
self.lum_e = libouts[9]
else:
self.lum = lum
self.lum_e = lum_e
if self.get_mags:
# The adapter preserves the old _add_mags 1/2 distinction (2 ==
# band present but no error). The OLD Librarian.__init__ collapsed
# 2 -> 1 (`used_filters[used_filters >= 1] = 1`) before Star ever
# saw it, and all downstream code keys off `== 1`. Replicate that
# collapse here so the contract is byte-for-byte unchanged.
used_filters = adapted.used_filters.copy()
used_filters[used_filters >= 1] = 1
self.used_filters = used_filters
self.mags = adapted.mags
self.mag_errs = adapted.mag_errs
else:
filters = []
self.used_filters = np.zeros(self.filter_names.shape[0])
self.mags = np.zeros(self.filter_names.shape[0])
self.mag_errs = np.zeros(self.filter_names.shape[0])
for k in mag_dict.keys():
filt_idx = np.where(k == self.filter_names)[0]
self.used_filters[filt_idx] = 1
self.mags[filt_idx] = mag_dict[k][0]
self.mag_errs[filt_idx] = mag_dict[k][1]
filters.append(k)
self.filter_mask = np.where(self.used_filters == 1)[0]
# IRX filters
self.irx_filter_mask = np.array([])
self.irx_used_filters = np.zeros(self.filter_names.shape[0])
# Get max Av
if Av is None:
self.Av_e = None
dmap = self.dustmaps[dustmap]()
coords = SkyCoord(self.ra, self.dec, distance=self.dist,
unit=(u.deg, u.deg, u.pc), frame='icrs')
if dustmap in ['SFD', 'Lenz']:
ebv = dmap(coords)
self.Av = ebv * 2.742
elif dustmap == 'Bayestar':
ebvs = dmap(coords, mode='percentile', pct=[15, 50, 84])
if np.any(np.isnan(ebvs)):
StarWarning(None, 2).warn()
ebv = self.dustmaps['SFD']()(coords)
self.Av = ebv * 2.742
else:
mags = ebvs * 2.742 * 0.884
self.Av = mags[1]
self.Av_e = max([mags[1] - mags[0], mags[2] - mags[1]])
elif dustmap in ['Planck13', 'Planck16']:
ebv = dmap(coords)
self.Av = ebv * 3.1
else:
self.Av = Av
# Get the wavelength and fluxes of the retrieved magnitudes.
wave, flux, flux_er, bandpass = extract_info(
self.mags, self.mag_errs, self.filter_names)
self.wave = np.zeros(self.filter_names.shape[0])
self.flux = np.zeros(self.filter_names.shape[0])
self.flux_er = np.zeros(self.filter_names.shape[0])
self.bandpass = np.zeros(self.filter_names.shape[0])
for k in wave.keys():
filt_idx = np.where(k == self.filter_names)[0]
self.wave[filt_idx] = wave[k]
self.bandpass[filt_idx] = bandpass[k]
self.flux[filt_idx] = flux[k]
self.flux_er[filt_idx] = flux_er[k]
rel_er = self.flux_er[self.filter_mask] / self.flux[self.filter_mask]
mx_rel_er = rel_er.max() + 0.1
upper = self.flux_er[self.filter_mask] == 0
flx = self.flux[self.filter_mask][upper]
for i, f in zip(self.filter_mask[upper], flx):
self.flux_er[i] = mx_rel_er * f
# --- All retrieved photometry ---
c = random.choice(self.colors)
print(colored('\t\t--- Retrieved photometry ---', c))
self.print_mags(c)
# --- Photometry QC (warn only — no filters are removed) ---
self.clip_outlier_magnitudes(sigma=5.0, err_floor=0.05, warn_only=True)
# self.calculate_distance()
c = random.choice(self.colors)
display_star_fin(self, c)
def __repr__(self):
"""Repr overload."""
return self.starname
[docs]
def ra_dec_to_deg(self, ra, dec):
"""Transform ra, dec from selected unit to degrees."""
if isinstance(ra, float) and isinstance(dec, float):
self.ra = ra
self.dec = dec
return
c = SkyCoord(ra, dec, frame='icrs')
self.ra = c.ra.deg
self.dec = c.dec.deg
pass
[docs]
def load_grid(self, model):
"""Load the model grid for interpolation."""
# Grid stuff
if model.lower() == 'phoenix':
gridname = gridsdir + '/model_grid_Phoenixv2.dat'
if model.lower() == 'btsettl':
gridname = gridsdir + '/model_grid_BT_Settl.dat'
if model.lower() == 'btnextgen':
gridname = gridsdir + '/model_grid_BT_NextGen.dat'
if model.lower() == 'btcond':
gridname = gridsdir + '/model_grid_BT_Cond.dat'
if model.lower() == 'ck04':
gridname = gridsdir + '/model_grid_CK04.dat'
if model.lower() == 'kurucz':
gridname = gridsdir + '/model_grid_Kurucz.dat'
if model.lower() == 'coelho':
gridname = gridsdir + '/model_grid_Coelho.dat'
if model.lower() == 'bosz':
gridname = gridsdir + '/model_grid_BOSZ.dat'
if model.lower() == 'sphinx':
gridname = gridsdir + '/model_grid_SPHINX.dat'
if model.lower() == 'tlusty':
gridname = gridsdir + '/model_grid_TLUSTY.dat'
self.full_grid = np.loadtxt(gridname)
self.teff = self.full_grid[:, 0]
self.logg = self.full_grid[:, 1]
self.z = self.full_grid[:, 2]
if self.verbose:
print('Grid ' + model + ' loaded.')
[docs]
def calculate_distance(self):
"""Calculate distance using parallax in solar radii."""
if self.plx == -1:
self.dist = -1
self.dist_e = -1
return
dist = 1 / (0.001 * self.plx)
dist_e = dist * self.plx_e / self.plx
self.dist = dist
self.dist_e = dist_e
[docs]
def print_mags(self, c=None):
"""Pretty print of magnitudes and errors."""
master, headers = self.__prepare_mags()
if c is not None:
print(
colored('\t\t{:^20s}\t{:^9s}\t{:^11s}'.format(*headers), c)
)
print(colored(
'\t\t--------------------\t---------\t-----------', c)
)
for i in range(master.shape[0]):
printer = '\t\t{:^20s}\t{: ^9.4f}\t{: ^11.4f}'
print(colored(printer.format(*master[i]), c))
else:
print('\t\t{:^20s}\t{:^9s}\t{:^11s}'.format(*headers))
print('\t\t--------------------\t---------\t-----------')
for i in range(master.shape[0]):
printer = '\t\t{:^20s}\t{: ^9.4f}\t{: ^11.4f}'
print(printer.format(*master[i]))
print('')
[docs]
def save_mags(self, out):
"""Save the used magnitudes in a file."""
master, headers = self.__prepare_mags()
fmt = '%s %2.4f %2.4f'
np.savetxt(out + 'mags.dat', master, header=' '.join(headers),
delimiter=' ', fmt=fmt)
def __prepare_mags(self):
"""Prepare mags for either printing or saving in a file."""
mags = self.mags[np.append(self.filter_mask,
self.irx_filter_mask).astype(int)]
ers = self.mag_errs[np.append(self.filter_mask,
self.irx_filter_mask).astype(int)]
filt = self.filter_names[np.append(self.filter_mask,
self.irx_filter_mask).astype(int)]
master = np.zeros(
mags.size,
dtype=[
('var1', 'U16'),
('var2', float),
('var3', float)
])
master['var1'] = filt
master['var2'] = mags
master['var3'] = ers
headers = ['Filter', 'Magnitude', 'Uncertainty']
return master, headers
[docs]
def estimate_logg(self, out='.'):
"""Estimate logg values from MIST isochrones."""
self.get_logg = True
c = random.choice(self.colors)
params = dict() # params for isochrones.
if self.temp is not None and self.temp_e != 0:
params['Teff'] = (self.temp, self.temp_e)
if self.lum is not None and self.lum != 0:
params['LogL'] = (np.log10(self.lum),
np.log10(self.lum_e))
if self.get_rad and self.rad is not None and self.rad != 0:
params['radius'] = (self.rad, self.rad_e)
params['parallax'] = (self.plx, self.plx_e)
mags = self.mags[iso_mask == 1]
mags_e = self.mag_errs[iso_mask == 1]
used_bands = []
for m, e, b in zip(mags, mags_e, iso_bands):
if m != 0:
params[b] = (m, e)
used_bands.append(b)
if self.verbose:
print(
colored(
'\t\t*** ESTIMATING LOGG USING MIST ISOCHRONES ***', c
)
)
logg_est = estimate(used_bands, params, logg=True, out_folder=out)
if logg_est is not None:
self.logg = logg_est[0]
self.logg_e = logg_est[1]
print(colored('\t\t\tEstimated log g : ', c), end='')
print(
colored(
'{:.3f} +/- {:.3f}'.format(self.logg, self.logg_e), c)
)
[docs]
def add_mag(self, mag, err, filt):
"""Add an individual photometry point to the SED."""
mask = self.filter_names == filt
self.mags[mask] = mag
self.mag_errs[mask] = err
if filt not in self.filter_names[-5:]:
self.used_filters[mask] = 1
self.filter_mask = np.where(self.used_filters == 1)[0]
else:
self.irx_used_filters[mask] = 1
self.irx_filter_mask = np.where(self.irx_used_filters == 1)[0]
self.__reload_fluxes()
print(colored(f'\t\t\tAdded {filt} {mag} +/- {err}!!', 'yellow'))
pass
[docs]
def remove_mag(self, filt):
"""Remove an individual photometry point."""
mask = self.filter_names == filt
self.mags[mask] = 0
self.mag_errs[mask] = 0
self.used_filters[mask] = 0
self.filter_mask = np.where(self.used_filters == 1)[0]
self.__reload_fluxes()
print(colored(f'\t\t\tRemoved {filt}!!', 'yellow'))
pass
[docs]
def clip_outlier_magnitudes(self, sigma=5.0, err_floor=0.05, warn_only=False):
"""Iterative SED outlier rejection using synthetic photometry.
Uses per-filter zero-point fluxes (Vega or AB as appropriate)
so that magnitudes across different photometric systems are
compared on a consistent scale.
Seeds the trusted set from Gaia anchors (BP/G/RP), then iteratively
promotes the best-fitting untrusted filter if its blackbody residual
is within *sigma*. Filters never promoted are removed from the SED.
Falls back to iterative worst-outlier removal when no Gaia filters
are present.
Parameters
----------
sigma : float
Rejection threshold in units of max(photometric_error, err_floor).
err_floor : float
Minimum assumed error (mag) to avoid over-rejecting precise points.
Returns
-------
list[str]
Names of removed filters.
"""
# Persisted for the blackbody diagnostic overlay in
# SEDPlotter.plot_SED_no_model. ``_qc_bb_teff`` is the best-fit
# blackbody temperature on the trusted set; ``_qc_bb_flagged_bands``
# lists the bands flagged as outliers (whether or not removed).
self._qc_bb_teff = None
self._qc_bb_flagged_bands = []
mask = self.filter_mask
if len(mask) < 4:
return []
filt_names = self.filter_names[mask]
mags = self.mags[mask].copy()
errs = self.mag_errs[mask].copy()
try:
lam_m = np.array([get_effective_wavelength(f) for f in filt_names]) * 1e-6
except Exception:
return []
# Pre-compute per-filter zero-point fluxes (Vega or AB)
f0_arr = np.empty(len(filt_names))
for i, band in enumerate(filt_names):
if 'PS1_' in band or 'SDSS_' in band or 'GALEX_' in band:
leff = get_effective_wavelength(band)
f0_arr[i] = convert_f_nu_to_f_lambda(3.631e-20, leff)
else:
f0_arr[i] = get_zero_flux(band)
def _synth_mags(T):
x = _h * _c / (lam_m * _k * np.maximum(T, 1.0))
B = 1.0 / (lam_m ** 5 * (np.exp(np.clip(x, 0, 500)) - 1.0))
return -2.5 * np.log10(np.maximum(B / f0_arr, 1e-300))
def _residuals(fit_idx):
if len(fit_idx) < 2:
return np.full(len(mags), np.inf), np.nan
best_resid = np.full(len(mags), np.inf)
best_rss = np.inf
best_T = np.nan
for T in np.linspace(2000, 60000, 300):
model_mag = _synth_mags(T)
offset = np.mean(mags[fit_idx] - model_mag[fit_idx])
resid = np.abs(mags - (model_mag + offset))
rss = np.sum(resid[fit_idx] ** 2)
if rss < best_rss:
best_rss = rss
best_resid = resid
best_T = T
return best_resid, best_T
scale = np.maximum(errs, err_floor)
gaia_mask = np.array([any(f.startswith(p) for p in _GAIA_PREFIXES)
for f in filt_names])
if np.any(gaia_mask):
trusted = gaia_mask.copy()
while True:
untrusted_idx = np.where(~trusted)[0]
if len(untrusted_idx) == 0:
break
resid, _ = _residuals(np.where(trusted)[0])
z = resid / scale
best = untrusted_idx[np.argmin(z[untrusted_idx])]
if z[best] < sigma:
trusted[best] = True
else:
break
outlier_idx = np.where(~trusted)[0]
else:
active = np.ones(len(filt_names), dtype=bool)
while True:
active_idx = np.where(active)[0]
if len(active_idx) < 4:
break
resid, _ = _residuals(active_idx)
z = resid / scale
worst = active_idx[np.argmax(z[active_idx])]
if z[worst] >= sigma:
active[worst] = False
else:
break
outlier_idx = np.where(~active)[0]
# Final fit on the trusted set, captured for the diagnostic overlay.
trusted_idx = np.setdiff1d(np.arange(len(filt_names)), outlier_idx)
_, best_T = _residuals(trusted_idx)
self._qc_bb_teff = float(best_T) if np.isfinite(best_T) else None
self._qc_bb_flagged_bands = [str(filt_names[i]) for i in outlier_idx]
removed = []
for i in outlier_idx:
filt = filt_names[i]
if warn_only:
warnings.warn(
f"Filter '{filt}' looks like a poor-quality measurement "
f"(blackbody residual > {sigma}σ). It has NOT been removed "
f"because you supplied it directly.",
UserWarning, stacklevel=3
)
print(colored(f'\t\t\tWARNING: {filt} is potentially problematic '
f'(>{sigma}σ from blackbody fit) — not removed', 'yellow'))
else:
self.remove_mag(filt)
removed.append(filt)
return removed
def __reload_fluxes(self):
# Get the wavelength and fluxes of the retrieved magnitudes.
wave, flux, flux_er, bandpass = extract_info(
self.mags, self.mag_errs, self.filter_names)
self.wave = np.zeros(self.filter_names.shape[0])
self.flux = np.zeros(self.filter_names.shape[0])
self.flux_er = np.zeros(self.filter_names.shape[0])
self.bandpass = np.zeros(self.filter_names.shape[0])
for k in wave.keys():
filt_idx = np.where(k == self.filter_names)[0]
self.wave[filt_idx] = wave[k]
self.bandpass[filt_idx] = bandpass[k]
self.flux[filt_idx] = flux[k]
self.flux_er[filt_idx] = flux_er[k]
rel_er = self.flux_er[self.filter_mask] / self.flux[self.filter_mask]
mx_rel_er = rel_er.max() + 0.1
upper = self.flux_er[self.filter_mask] == 0
flx = self.flux[self.filter_mask][upper]
for i, f in zip(self.filter_mask[upper], flx):
self.flux_er[i] = mx_rel_er * f