"""plot_utils module for plotting SEDs."""
__all__ = ['SEDPlotter']
import glob
import logging
import pickle
from random import choice
logger = logging.getLogger(__name__)
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import pandas as pd
import numpy as np
from astropy import units as u
from astropy.io import fits
from astropy.table import Table
from extinction import apply
from isochrones.interp import DFInterpolator
from matplotlib.collections import LineCollection
from matplotlib.gridspec import GridSpec
from PyAstronomy import pyasl
from scipy.optimize import curve_fit
from scipy.stats import gaussian_kde, norm
import corner
from dynesty import plotting as dyplot
from .config import (filesdir, gridsdir, modelsdir, spectra_cache)
from .isochrone import get_isochrone
from .phot_utils import *
from .sed_library import *
from .utils import *
[docs]
class SEDPlotter:
"""Artist class for all things SED.
Parameters
----------
input_files : str
Directory containing the code's output files.
out_folder : type
Directory where to put the output plots.
pdf : type
Set to True to output plots in pdf.
png : type
Set to True to output plots in png.
model : type
Set to override the SED model that's going to be plotted.
Possible values are:
- Phoenix
- BTSettl
- NextGen
- CK04 (Castelli & Kurucz 2004)
- Kurucz (Kurucz 1993)
Examples
-------
Examples should be written in doctest format, and
should illustrate how to use the function/class.
>>>
Attributes
----------
chain_out : str
Output directory for chain plot.
like_out : str
Output directory for likelihood plot.
post_out : str
Output directory for posteriors plot.
moddir : type
Directory wheere the SED models are located.
out : dict
SED fitting routine output.
engine : str
Selected fitting engine.
star : Star
The fitted Star object.
coordinator : array_like
Array coordinating fixed parameters.
fixed : array_like
Array coordinating fixed parameters.
norm : bool
norm is set to True if a normalization constant is fitted instead of
radius + distance.
grid : str
Selected model grid.
av_law : function
Exticntion law chosen for the fit.
order : array_like
Array coordinating parameter order.
interpolator : function
Interpolator function.
theta : array_like
`Best fit` parameter vector
"""
__wav_file = 'PHOENIXv2/WAVE_PHOENIX-ACES-AGSS-COND-2011.fits'
def __init__(self, input_files, out_folder, pdf=False,
model=None, settings=None, method='averaged',
save_model=False, ir_excess=False):
"""See class docstring."""
print('\nInitializing plotter.\n')
# General setup
self.pdf = pdf
png = True if not pdf else False
self.png = png
self.out_folder = out_folder
self.bma = False
self.method = method
self.save = save_model
self.irx = ir_excess
if self.irx:
model = 'btsettl'
traces = f'{out_folder}/traces'
histograms = f'{out_folder}/histograms'
self.traces_out = traces
self.hist_out = histograms
self.moddir = modelsdir
self.settings_dir = settings
# Read output files.
if input_files != 'raw':
out = pickle.load(open(input_files, 'rb'))
self.out = out
self.engine = out['engine']
self.star = out['star']
self.coordinator = out['coordinator']
self.fixed = out['fixed']
self.norm = out['norm']
if model is None:
if self.engine != 'Bayesian Model Averaging':
self.grid = out['model_grid']
else:
self.bma = True
zs = np.array([out['lnZ'][key]
for key in out['lnZ'].keys()])
keys = np.array([key for key in out['lnZ'].keys()])
grid = keys[np.argmax(zs)]
self.grid = grid
else:
self.grid = model
self.av_law = out['av_law']
# Create target folders
create_dir(out_folder)
if self.engine != 'Bayesian Model Averaging':
create_dir(traces)
create_dir(histograms)
self.star.load_grid(self.grid)
if not self.norm:
self.order = np.array(
[
'teff', 'logg', 'z',
'dist', 'rad', 'Av',
]
)
else:
self.order = np.array(
['teff', 'logg', 'z', 'norm', 'Av'])
mask = self.star.filter_mask
flxs = self.star.flux[mask]
errs = self.star.flux_er[mask]
filters = self.star.filter_names[mask]
wave = self.star.wave[mask]
if self.irx:
irx_mask = self.star.irx_filter_mask
irx_filt = self.star.filter_names[irx_mask]
irx_wave = self.star.wave[irx_mask]
for filt, flx, flx_e in zip(filters, flxs, errs):
p_ = get_noise_name(filt) + '_noise'
self.order = np.append(self.order, p_)
if self.grid.lower() == 'phoenix':
with open(gridsdir + '/Phoenixv2_DF.pkl', 'rb') as intp:
self.interpolator = DFInterpolator(pd.read_pickle(intp))
if self.grid.lower() == 'btsettl':
with open(gridsdir + '/BTSettl_DF.pkl', 'rb') as intp:
self.interpolator = DFInterpolator(pd.read_pickle(intp))
if self.grid.lower() == 'btnextgen':
with open(gridsdir + '/BTNextGen_DF.pkl', 'rb') as intp:
self.interpolator = DFInterpolator(pd.read_pickle(intp))
if self.grid.lower() == 'btcond':
with open(gridsdir + '/BTCond_DF.pkl', 'rb') as intp:
self.interpolator = DFInterpolator(pd.read_pickle(intp))
if self.grid.lower() == 'ck04':
with open(gridsdir + '/CK04_DF.pkl', 'rb') as intp:
self.interpolator = DFInterpolator(pd.read_pickle(intp))
if self.grid.lower() == 'kurucz':
with open(gridsdir + '/Kurucz_DF.pkl', 'rb') as intp:
self.interpolator = DFInterpolator(pd.read_pickle(intp))
if self.grid.lower() == 'coelho':
with open(gridsdir + '/Coelho_DF.pkl', 'rb') as intp:
self.interpolator = DFInterpolator(pd.read_pickle(intp))
# Get best fit parameters.
theta_samples = np.zeros(self.order.shape[0])
theta_average = np.zeros(self.order.shape[0])
for i, param in enumerate(self.order):
if param != 'inflation':
theta_samples[i] = out['best_fit_samples'][param]
theta_average[i] = out['best_fit_averaged'][param]
# Calculate best fit model.
model_samples = model_grid(theta_samples, filters, wave,
self.interpolator, self.norm,
self.av_law)
model_average = model_grid(theta_average, filters, wave,
self.interpolator, self.norm,
self.av_law)
if self.irx:
irx_model_sam = model_grid(theta_samples, irx_filt, irx_wave,
self.interpolator, self.norm,
self.av_law)
irx_model_avg = model_grid(theta_average, irx_filt, irx_wave,
self.interpolator, self.norm,
self.av_law)
if method == 'averaged':
self.theta = theta_average
self.model = model_average
if self.irx:
self.irx_model = irx_model_avg
elif method == 'samples':
self.theta = theta_samples
self.model = model_samples
if self.irx:
self.irx_model = irx_model_sam
# Get archival fluxes.
self.__extract_info()
else:
self.star = None
# Setup plots.
self.__read_config()
print('\nPlotter initialized.\n')
def __extract_info(self):
self.flux = []
self.flux_er = []
self.wave = []
self.bandpass = []
for i, f in zip(self.star.used_filters, self.star.flux):
if i:
self.flux.append(f)
for i, e in zip(self.star.used_filters, self.star.flux_er):
if i:
self.flux_er.append(e)
for i, w in zip(self.star.used_filters, self.star.wave):
if i:
self.wave.append(w)
for i, bp in zip(self.star.used_filters, self.star.bandpass):
if i:
self.bandpass.append(bp)
self.flux = np.array(self.flux)
self.flux_er = np.array(self.flux_er)
self.wave = np.array(self.wave)
self.bandpass = np.array(self.bandpass).T
# Do the same for IR bandpasses
self.irx_flux = []
self.irx_flux_er = []
self.irx_wave = []
self.irx_bandpass = []
for i, f in zip(self.star.irx_used_filters, self.star.flux):
if i:
self.irx_flux.append(f)
for i, e in zip(self.star.irx_used_filters, self.star.flux_er):
if i:
self.irx_flux_er.append(e)
for i, w in zip(self.star.irx_used_filters, self.star.wave):
if i:
self.irx_wave.append(w)
for i, bp in zip(self.star.irx_used_filters, self.star.bandpass):
if i:
self.irx_bandpass.append(bp)
self.irx_flux = np.array(self.irx_flux)
self.irx_flux_er = np.array(self.irx_flux_er)
self.irx_wave = np.array(self.irx_wave)
self.irx_bandpass = np.array(self.irx_bandpass).T
def _load_from_cache(self, model_key):
"""Return (wavelength, flux) from pre-computed HDF5 cache, or None."""
if not spectra_cache:
return None
import h5py
import os as _os
if not _os.path.isfile(spectra_cache):
return None
teff = self.theta[0]
logg = self.theta[1]
z = self.theta[2]
try:
with h5py.File(spectra_cache, 'r') as f:
if model_key not in f:
return None
g = f[model_key]
teffs = g['teff'][:]
loggs = g['logg'][:]
zs = g['z'][:]
t_val = np.unique(teffs)[np.argmin(np.abs(np.unique(teffs) - teff))]
l_val = np.unique(loggs)[np.argmin(np.abs(np.unique(loggs) - logg))]
z_val = np.unique(zs)[np.argmin(np.abs(np.unique(zs) - z))]
mask = (teffs == t_val) & (loggs == l_val) & (zs == z_val)
idx = np.where(mask)[0][0]
wave = g['wavelength'][:]
flux = g['flux'][idx]
return wave.astype(float), flux.astype(float)
except Exception as e:
logger.warning(f'Failed to load spectra cache for {model_key}: {e}')
return None
def _get_raw_spectrum(self, model_key, teff, logg, z):
"""Fetch broadened spectrum for given model and stellar params.
Returns (wave, flux) trimmed to the standard wavelength range and
instrumentally broadened where applicable, but NOT extincted or
normalized. Returns None on failure.
"""
orig_theta = self.theta.copy()
self.theta[0], self.theta[1], self.theta[2] = teff, logg, z
try:
cached = self._load_from_cache(model_key)
if cached is not None:
return cached
if self.moddir is None:
return None
if model_key == 'phoenix':
if not hasattr(self, '_phoenix_wave'):
self._phoenix_wave = fits.open(
self.moddir + self.__wav_file
)[0].data * u.angstrom.to(u.um)
wave = self._phoenix_wave
mask = (0.125 < wave) & (wave < 4.629296073126975)
flux = self.fetch_Phoenix()
new_w = wave[mask]
new_ww = np.linspace(new_w[0], new_w[-1], len(new_w))
brf, _ = pyasl.instrBroadGaussFast(
new_ww, flux, 1500,
edgeHandling="firstlast", fullout=True, maxsig=8
)
return new_w, brf[mask]
elif model_key in ('btsettl', 'btnextgen', 'btcond'):
fetch = {
'btsettl': self.fetch_btsettl,
'btnextgen': self.fetch_btnextgen,
'btcond': self.fetch_btcond,
}[model_key]
wave, flux = fetch()
mask = (0.125 < wave) & (wave < 4.629296073126975)
wave, flux = wave[mask], flux[mask]
new_w = np.linspace(wave[0], wave[-1], len(wave))
brf, _ = pyasl.instrBroadGaussFast(
new_w, flux, 1500,
edgeHandling="firstlast", fullout=True, maxsig=8
)
return wave, brf
elif model_key in ('ck04', 'kurucz', 'coelho'):
fetch = {
'ck04': self.fetch_ck04,
'kurucz': self.fetch_kurucz,
'coelho': self.fetch_coelho,
}[model_key]
wave, flux = fetch()
lo = 0.15 if model_key in ('kurucz', 'coelho') else 0.125
mask = (lo < wave) & (wave < 4.629296073126975)
return wave[mask], flux[mask]
except Exception as e:
logger.warning(
f'Failed to get spectrum for {model_key} '
f'(Teff={teff}, logg={logg}, z={z}): {e}'
)
return None
finally:
self.theta[:] = orig_theta
def _plot_bma_gray_spectra(self, ax, n_samples=100):
"""Draw gray sampled model spectra weighted by BMA weights."""
rng = np.random.default_rng()
Rv = 3.1
for model_name, weight in self.out['weights'].items():
if weight < 1e-4:
continue
n_model = max(1, round(weight * n_samples))
samples = self.out['originals'][model_name]
n_avail = len(samples['teff'])
indices = rng.choice(n_avail, size=n_model, replace=True)
self.star.load_grid(model_name)
# Snap samples to grid points and deduplicate
grouped = {} # (teff_grid, logg_grid, z_grid) -> [(norm, Av)]
unique_teff = np.unique(self.star.teff)
unique_logg = np.unique(self.star.logg)
unique_z = np.unique(self.star.z)
for idx in indices:
t_snap = unique_teff[
np.argmin(np.abs(unique_teff - samples['teff'][idx]))]
g_snap = unique_logg[
np.argmin(np.abs(unique_logg - samples['logg'][idx]))]
z_snap = unique_z[
np.argmin(np.abs(unique_z - samples['z'][idx]))]
if not self.norm:
rad = samples['rad'][idx]
dist = samples['dist'][idx] * u.pc.to(u.solRad)
norm_val = (rad / dist) ** 2
Av_val = samples['Av'][idx]
else:
norm_val = samples['norm'][idx]
Av_val = samples['Av'][idx]
key = (t_snap, g_snap, z_snap)
grouped.setdefault(key, []).append((norm_val, Av_val))
for (t, g, z_val), param_list in grouped.items():
result = self._get_raw_spectrum(model_name, t, g, z_val)
if result is None:
continue
wave, flux_raw = result
for norm_val, Av in param_list:
ext = self.av_law(wave * 1e4, Av, Rv)
flx = apply(ext, flux_raw.copy()) * norm_val * wave
ax.plot(wave, flx, color='gray', alpha=0.15,
lw=0.5, zorder=-1)
# Restore winning model's grid
self.star.load_grid(self.grid)
[docs]
def plot_SED_no_model(self, s=None):
"""Plot raw photometry."""
if self.star is None:
self.star = s
self.__extract_info()
# Get plot ylims.
ymin = (self.flux * self.wave).min()
ymax = (self.flux * self.wave).max()
f, ax = plt.subplots(figsize=self.figsize)
# Model plot
used_f = self.star.filter_names[self.star.filter_mask]
# Bands flagged by the blackbody QC are outlined in red so outliers
# can be eyeballed against the BB fit drawn below.
flagged = set(getattr(self.star, '_qc_bb_flagged_bands', None) or [])
# Colour each point by its effective wavelength so the SED shape reads
# directly off the plot: short wavelengths -> violet/blue, long
# wavelengths -> orange/red.
lam_log = np.log10(self.wave)
if lam_log.max() > lam_log.min():
cnorm = plt.Normalize(vmin=lam_log.min(), vmax=lam_log.max())
else:
cnorm = plt.Normalize(vmin=lam_log.min() - 1, vmax=lam_log.max() + 1)
point_colors = plt.cm.rainbow(cnorm(lam_log))
for c, w, fl, fe, bp, fi in zip(
point_colors,
self.wave, self.flux, self.flux_er,
self.bandpass, used_f):
ax.errorbar(w, fl * w,
xerr=bp, yerr=fe * w,
fmt='',
ecolor=c,
marker=None)
ax.scatter(w, fl * w,
edgecolors='black',
linewidths=0.5,
marker=self.marker,
c=[c],
s=self.scatter_size,
alpha=self.scatter_alpha, label=fi)
# QC-flagged bands get a hollow red halo drawn on top, which stays
# visible regardless of the point's wavelength colour (the colormap
# is itself red at long wavelengths).
if fi in flagged:
ax.scatter(w, fl * w,
edgecolors='red', facecolors='none',
linewidths=1.8, marker=self.marker,
s=self.scatter_size * 2.4, zorder=4)
# Blackbody fit from the photometry QC, log-scaled to the non-flagged
# bands so it overlays the data. Lets the user judge visually whether a
# red-outlined band genuinely departs from the stellar continuum.
bb_T = getattr(self.star, '_qc_bb_teff', None)
if bb_T:
h_, c_, k_ = 6.62607015e-34, 2.99792458e8, 1.380649e-23
def _planck(lam_um_arr):
lam_m = np.asarray(lam_um_arr) * 1e-6
x = h_ * c_ / (lam_m * k_ * bb_T)
return 1.0 / (lam_m ** 5 * (np.exp(np.clip(x, 0, 500)) - 1.0))
wave_um = np.asarray(self.wave)
F_lam = np.asarray(self.flux)
trusted = np.array([fi not in flagged for fi in used_f])
if trusted.sum() < 2:
trusted = np.ones(len(used_f), dtype=bool)
B_bands = _planck(wave_um)
log_scale = np.median(np.log10(F_lam[trusted])
- np.log10(B_bands[trusted]))
scale = 10.0 ** log_scale
lam_curve = np.logspace(np.log10(max(wave_um.min() * 0.5, 0.1)),
np.log10(wave_um.max() * 2.0), 400)
F_curve = _planck(lam_curve) * scale
ax.plot(lam_curve, F_curve * lam_curve,
color='0.4', lw=1.0, ls='--', zorder=1,
label=f'BB fit, T={bb_T:.0f} K')
ax.set_ylim([ymin * .8, ymax * 1.25])
ax.set_xscale('log', nonpositive='clip')
ax.set_yscale('log', nonpositive='clip')
ax.set_ylabel(r'$\lambda$F$_\lambda$ (erg cm$^{-2}$s$^{-1}$)',
fontsize=self.fontsize,
fontname=self.fontname
)
ax.legend(loc=0)
ax.tick_params(
axis='both', which='major',
labelsize=self.tick_labelsize
)
ax.tick_params(
axis='both', which='minor',
labelsize=self.tick_labelsize
)
# X-axis built like the model SED plot: log scale in micron, integer
# ticks, lower bound set by whether GALEX UV bands are present.
ax.set_xticks(np.linspace(1, 10, 10))
ax.get_xaxis().set_major_formatter(ticker.ScalarFormatter())
fnames = self.star.filter_names[self.star.filter_mask]
if 'GALEX_FUV' in fnames or 'GALEX_NUV' in fnames:
ax.set_xlim([0.125, 6])
else:
ax.set_xlim([0.25, 6])
ax.set_xlabel(r'$\lambda (\mu m)$',
fontsize=self.fontsize,
fontname=self.fontname)
for tick in ax.get_yticklabels():
tick.set_fontname(self.fontname)
if self.pdf:
plt.savefig(self.out_folder + '/SED_no_model.pdf',
bbox_inches='tight')
if self.png:
plt.savefig(self.out_folder + '/SED_no_model.png',
bbox_inches='tight')
pass
[docs]
def plot_SED(self):
"""Create the plot of the SED."""
if self.moddir is None and not spectra_cache:
logger.warning('Models directory not provided and no spectra cache set, skipping SED plot')
return
print('Plotting SED')
# Get plot ylims.
ymin = (self.flux * self.wave).min()
ymax = (self.flux * self.wave).max()
if self.irx:
ymin = (self.irx_model * self.irx_wave).min()
n_filt = self.star.used_filters.sum()
n_pars = int(len(self.theta) - n_filt)
# Get models residuals
mask = self.star.filter_mask
mags = self.star.mags[mask]
flxs = self.star.flux[mask]
errs = self.star.flux_er[mask]
filters = self.star.filter_names[mask]
wave = self.star.wave[mask]
for i, th in enumerate(self.theta[n_pars:]):
mag = mags[i]
filt = filters[i]
_, er = mag_to_flux(mag, th, filt)
self.theta[n_pars + i] = er
residuals, errors = get_residuals(
self.theta, flxs, errs, wave, filters, self.interpolator, self.norm,
self.av_law)
norm_res = residuals / errors
# Create plot layout
f = plt.figure(figsize=self.figsize)
gs = GridSpec(2, 1, height_ratios=[3, 0.75], hspace=0.05)
ax = f.add_subplot(gs[0])
ax_r = f.add_subplot(gs[1])
if self.bma:
self._plot_bma_gray_spectra(ax)
self.SED(ax)
# Model plot
ax.errorbar(self.wave, self.flux * self.wave,
xerr=self.bandpass, yerr=errors,
fmt=',', ecolor=self.error_color, zorder=0,
marker=None)
ax.scatter(self.wave, self.flux * self.wave,
edgecolors='black', marker=self.marker, c=self.marker_colors,
s=self.scatter_size, zorder=1, alpha=self.scatter_alpha)
ax.scatter(self.wave, self.model * self.wave,
marker=self.marker_model,
edgecolors=self.marker_colors_model, s=self.scatter_size,
facecolor='none', zorder=3, lw=3)
if self.irx:
ax.errorbar(self.irx_wave, self.irx_flux * self.irx_wave,
xerr=self.irx_bandpass, yerr=self.irx_flux_er,
fmt=',', ecolor=self.irx_error_color, zorder=0,
marker=None)
ax.scatter(self.irx_wave, self.irx_flux * self.irx_wave,
edgecolors='black', marker=self.marker,
c=self.marker_colors_irx, s=self.scatter_size, zorder=1,
alpha=self.scatter_alpha)
ax.scatter(self.irx_wave, self.irx_model * self.irx_wave,
marker=self.marker_model,
edgecolors=self.marker_colors_model_irx,
s=self.scatter_size, facecolor='none', zorder=3, lw=3)
# Residual plot
ax_r.axhline(y=0, lw=2, ls='--', c='k', alpha=.7)
ax_r.errorbar(self.wave, norm_res, zorder=3,
xerr=self.bandpass, yerr=np.array(self.flux_er) / errors,
fmt=',', ecolor=self.error_color, marker=None)
ax_r.scatter(self.wave, norm_res, zorder=3,
edgecolors='black', marker=self.marker,
c=self.marker_colors, s=self.scatter_size,
alpha=self.scatter_alpha)
# ax_r.scatter(self.wave, norm_res,
# marker=self.marker_model,
# edgecolors=self.marker_colors_model, s=self.scatter_size,
# facecolor='none', lw=3, zorder=10)
if self.irx:
irx_res = (self.irx_flux - self.irx_model) / self.irx_flux_er
ax_r.errorbar(self.irx_wave, np.zeros(self.irx_wave.shape[0]),
xerr=self.irx_bandpass, yerr=self.irx_flux_er,
fmt=',', ecolor=self.irx_error_color, marker=None)
ax_r.scatter(self.irx_wave, np.zeros(self.irx_wave.shape[0]),
edgecolors='black', marker=self.marker,
c=self.marker_colors_irx, s=self.scatter_size,
alpha=self.scatter_alpha)
ax_r.scatter(self.irx_wave, irx_res,
marker=self.marker_model,
edgecolors=self.marker_colors_model_irx,
s=self.scatter_size, facecolor='none', lw=3, zorder=10)
# Formatting
res_std = norm_res.std()
ax.set_ylim([ymin * 0.6, ymax * 1.5])
# ax_r.set_ylim([-5, 5])
ax_r.set_ylim([-5 * res_std, 5 * res_std])
ax.set_xscale('log', nonpositive='clip')
ax.set_yscale('log', nonpositive='clip')
ax_r.set_xscale('log', nonpositive='clip')
ax_r.set_xlabel(r'$\lambda (\mu m)$',
fontsize=self.fontsize,
fontname=self.fontname
)
ax.set_ylabel(r'$\lambda$F$_\lambda$ (erg cm$^{-2}$s$^{-1}$)',
fontsize=self.fontsize,
fontname=self.fontname
)
ax_r.set_ylabel('Residuals\n$(\\sigma)$',
fontsize=self.fontsize,
fontname=self.fontname
)
ax.tick_params(
axis='both', which='major',
labelsize=self.tick_labelsize
)
ax.tick_params(
axis='both', which='minor',
labelsize=self.tick_labelsize
)
ax_r.tick_params(
axis='both', which='major',
labelsize=self.tick_labelsize
)
xticks = np.linspace(1, 10, 10)
if self.irx:
xticks = [1, 3, 5, 10, 20, 50, 100, 250]
ax_r.set_xticks(xticks)
ax_r.get_xaxis().set_major_formatter(ticker.ScalarFormatter())
ax.set_xticks(xticks)
ax.get_xaxis().set_major_formatter(ticker.NullFormatter())
ylocmin = ticker.LinearLocator(numticks=4)
ax_r.yaxis.set_minor_locator(ylocmin)
ax_r.yaxis.set_minor_formatter(ticker.NullFormatter())
xlims1 = [0.125, 6]
xlims2 = [0.25, 6]
if self.irx:
xlims1 = [0.125, 250]
xlims2 = [0.25, 250]
if 'GALEX_FUV' in self.star.filter_names[self.star.filter_mask] or \
'GALEX_NUV' in self.star.filter_names[self.star.filter_mask]:
ax.set_xlim(xlims1)
ax_r.set_xlim(xlims1)
else:
ax.set_xlim(xlims2)
ax_r.set_xlim(xlims2)
labels = [item.get_text() for item in ax.get_xticklabels()]
empty_string_labels = [''] * len(labels)
ax.set_xticklabels(empty_string_labels)
for tick in ax.get_yticklabels():
tick.set_fontname(self.fontname)
for tick in ax_r.get_yticklabels():
tick.set_fontname(self.fontname)
for tick in ax_r.get_xticklabels():
tick.set_fontname(self.fontname)
if self.pdf:
plt.savefig(f'{self.out_folder}/SED.pdf', bbox_inches='tight')
if self.png:
plt.savefig(f'{self.out_folder}/SED.png', bbox_inches='tight')
if self.save:
data = np.vstack((self.wave, self.model * self.wave)).T
np.savetxt(f'{self.out_folder}/synthetic.dat', data, fmt='%s',
header='wavelength(mu m) wave*flux(erg cm-2 s-2)')
pass
[docs]
def SED(self, ax):
"""Plot the SED model."""
Rv = 3.1 # For extinction.
if not self.norm:
rad = self.theta[4]
dist = self.theta[3] * u.pc.to(u.solRad)
norm = (rad / dist) ** 2
Av = self.theta[5]
else:
norm = self.theta[3]
Av = self.theta[4]
# SED plot.
if self.grid == 'phoenix':
cached = self._load_from_cache('phoenix')
if cached is not None and not self.irx:
new_w, brf = cached
else:
wave = fits.open(self.moddir + self.__wav_file)[0].data
wave *= u.angstrom.to(u.um)
lower_lim = 0.125 < wave
upper_lim = wave < 4.629296073126975
if self.irx:
upper_lim = wave < wave[-1]
flux = self.fetch_Phoenix()
new_w = wave[lower_lim * upper_lim]
new_ww = np.linspace(new_w[0], new_w[-1], len(new_w))
brf, _ = pyasl.instrBroadGaussFast(
new_ww, flux, 1500,
edgeHandling="firstlast",
fullout=True, maxsig=8
)
brf = brf[lower_lim * upper_lim]
ext = self.av_law(new_w * 1e4, Av, Rv)
brf = apply(ext, brf)
flx = brf * norm * new_w
ax.plot(new_w, flx, lw=1.25, color=self.model_color,
zorder=0)
elif self.grid == 'btsettl':
cached = self._load_from_cache('btsettl')
if cached is not None and not self.irx:
wave, brf = cached
else:
wave, flux = self.fetch_btsettl()
lower_lim = 0.125 < wave
upper_lim = wave < 4.629296073126975
if self.irx:
upper_lim = wave < wave[-1]
wave = wave[lower_lim * upper_lim]
flux = flux[lower_lim * upper_lim]
new_w = np.linspace(wave[0], wave[-1], len(wave))
brf, _ = pyasl.instrBroadGaussFast(
new_w, flux, 1500,
edgeHandling="firstlast",
fullout=True, maxsig=8
)
ext = self.av_law(wave * 1e4, Av, Rv)
flx = apply(ext, brf)
flx *= wave * norm
ax.plot(wave, flx, lw=1.25, color=self.model_color, zorder=0)
elif self.grid == 'btnextgen':
cached = self._load_from_cache('btnextgen')
if cached is not None and not self.irx:
wave, brf = cached
else:
wave, flux = self.fetch_btnextgen()
lower_lim = 0.125 < wave
upper_lim = wave < 4.629296073126975
if self.irx:
upper_lim = wave < wave[-1]
wave = wave[lower_lim * upper_lim]
flux = flux[lower_lim * upper_lim]
new_w = np.linspace(wave[0], wave[-1], len(wave))
brf, _ = pyasl.instrBroadGaussFast(
new_w, flux, 1500,
edgeHandling="firstlast",
fullout=True, maxsig=8
)
ext = self.av_law(wave * 1e4, Av, Rv)
flx = apply(ext, brf)
flx *= wave * norm
ax.plot(wave, flx, lw=1.25, color=self.model_color, zorder=0)
elif self.grid == 'btcond':
cached = self._load_from_cache('btcond')
if cached is not None and not self.irx:
wave, brf = cached
else:
wave, flux = self.fetch_btcond()
lower_lim = 0.125 < wave
upper_lim = wave < 4.629296073126975
if self.irx:
upper_lim = wave < wave[-1]
wave = wave[lower_lim * upper_lim]
flux = flux[lower_lim * upper_lim]
new_w = np.linspace(wave[0], wave[-1], len(wave))
brf, _ = pyasl.instrBroadGaussFast(
new_w, flux, 1500,
edgeHandling="firstlast",
fullout=True, maxsig=8
)
ext = self.av_law(wave * 1e4, Av, Rv)
flx = apply(ext, brf)
flx *= wave * norm
ax.plot(wave, flx, lw=1.25, color=self.model_color, zorder=0)
elif self.grid == 'ck04':
cached = self._load_from_cache('ck04')
if cached is not None and not self.irx:
wave, flux = cached
else:
wave, flux = self.fetch_ck04()
lower_lim = 0.125 < wave
upper_lim = wave < 4.629296073126975
if self.irx:
upper_lim = wave < wave[-1]
wave = wave[lower_lim * upper_lim]
flux = flux[lower_lim * upper_lim]
ext = self.av_law(wave * 1e4, Av, Rv)
flux = apply(ext, flux)
flux *= wave * norm
ax.plot(wave, flux, lw=1.25, color=self.model_color, zorder=0)
elif self.grid == 'kurucz':
cached = self._load_from_cache('kurucz')
if cached is not None and not self.irx:
wave, flux = cached
else:
wave, flux = self.fetch_kurucz()
lower_lim = 0.15 < wave
upper_lim = wave < 4.629296073126975
if self.irx:
upper_lim = wave < wave[-1]
wave = wave[lower_lim * upper_lim]
flux = flux[lower_lim * upper_lim]
ext = self.av_law(wave * 1e4, Av, Rv)
flux = apply(ext, flux)
flux *= wave * norm
ax.plot(wave, flux, lw=1.25, color=self.model_color, zorder=0)
elif self.grid == 'coelho':
cached = self._load_from_cache('coelho')
if cached is not None:
wave, flux = cached
else:
wave, flux = self.fetch_coelho()
lower_lim = 0.15 < wave
upper_lim = wave < 4.629296073126975
wave = wave[lower_lim * upper_lim]
flux = flux[lower_lim * upper_lim]
ext = self.av_law(wave * 1e4, Av, Rv)
flux = apply(ext, flux)
flux *= wave * norm
ax.plot(wave, flux, lw=1.25, color=self.model_color, zorder=0)
if self.save:
data = np.vstack((wave, flux)).T
np.savetxt(f'{self.out_folder}/SED.dat', data, fmt='%s',
header='wavelength(mu m) wave*flux(erg cm-2 s-2)')
pass
[docs]
def plot_trace(self):
"""Plot SED chains."""
samples = self.out['posterior_samples']
for i, param in enumerate(self.order):
if not self.coordinator[i]:
f, ax = plt.subplots(figsize=(12, 4))
ax.step(range(len(samples[param])), samples[param],
color='k', alpha=0.8)
ax.set_ylabel(param,
fontsize=self.fontsize,
fontname=self.fontname
)
ax.set_xlabel('Steps',
fontsize=self.fontsize,
fontname=self.fontname
)
best = self.out['best_fit'][param]
# ax.axhline(np.median(samples[param]), color='red', lw=2)
ax.axhline(best, color='red', lw=2)
ax.tick_params(
axis='both', which='major',
labelsize=self.tick_labelsize
)
plt.savefig(self.chain_out + '/' + param +
'.png', bbox_inches='tight')
plt.close('all')
pass
[docs]
def plot_like(self):
"""Plot Likelihoods."""
samples = self.out['posterior_samples']
for i, param in enumerate(self.order):
if not self.coordinator[i]:
f, ax = plt.subplots(figsize=(12, 4))
ax.scatter(samples[param], samples['loglike'], alpha=0.5, s=40)
best = self.out['best_fit'][param]
# ax.axvline(np.median(samples[param]), color='red', lw=1.5)
ax.axvline(best, color='red', lw=1.5)
ax.set_ylabel('log likelihood',
fontsize=self.fontsize,
fontname=self.fontname
)
ax.set_xlabel(param,
fontsize=self.fontsize,
fontname=self.fontname
)
ax.tick_params(
axis='both', which='major',
labelsize=self.tick_labelsize
)
plt.savefig(self.like_out + '/' + param + '.png',
bbox_inches='tight')
plt.close('all')
if self.engine == 'dynesty':
fig, axes = dyplot.traceplot(
self.out['dynesty'],
truths=self.theta,
show_titles=True, trace_cmap='plasma',
)
plt.savefig(self.like_out + '/dynesty_trace.png')
pass
[docs]
def plot_post(self):
"""Plot posteriors."""
samples = self.out['posterior_samples']
for i, param in enumerate(self.order):
if not self.coordinator[i]:
f, ax = plt.subplots(figsize=(12, 4))
ax.scatter(samples[param], samples['posteriors'], alpha=0.5,
s=40)
best = self.out['best_fit'][param]
# ax.axvline(np.median(samples[param]), color='red', lw=1.5)
ax.axvline(best, color='red', lw=1.5)
ax.set_ylabel('log posterior',
fontsize=self.fontsize,
fontname=self.fontname
)
ax.set_xlabel(param,
fontsize=self.fontsize,
fontname=self.fontname
)
ax.tick_params(
axis='both', which='major',
labelsize=self.tick_labelsize
)
plt.savefig(self.post_out + '/' + param + '.png',
bbox_inches='tight')
plt.close('all')
pass
[docs]
def plot_hist(self):
pass
[docs]
def plot_bma_hist(self):
"""Plot histograms."""
print('Plotting BMA histograms.')
colors = [
'tab:blue', 'tab:orange', 'tab:green', 'tab:red', 'tab:purple',
'tab:brown'
]
models = [key for key in self.out['originals'].keys()]
for i, param in enumerate(self.order):
if 'noise' in param:
continue
if not self.coordinator[i]:
f1, ax1 = plt.subplots(figsize=(12, 6))
f2, ax2 = plt.subplots(figsize=(12, 6))
for j, m in enumerate(models):
# Get samples
samp = self.out['originals'][m][param]
# Plot sample histogram
label = m + ' prob: {:.3f}'.format(self.out['weights'][m])
# Normal
n, bins1, patches = ax1.hist(samp, alpha=.3, bins=20,
label=label, density=True,
color=colors[j])
# Weighted
n, bins2, patches = ax2.hist(
samp, alpha=.3, bins=20, label=label,
weights=[self.out['weights'][m]] * len(samp)
)
# Fit a KDE to data
kde = gaussian_kde(samp)
# Estimate amplitude of the weighted distributions
mu, sig = norm.fit(samp)
try:
bc = bins2[:-1] + np.diff(bins2)
popt, pcov = curve_fit(norm_fit, xdata=bc, ydata=n,
p0=[mu, sig, n.max()],
maxfev=50000)
except RuntimeError:
popt = (mu, sig, n.max())
xx1 = np.linspace(bins1[0], bins1[-1], 1000)
xx2 = np.linspace(bins2[0], bins2[-1], 1000)
# Plot best fit
ax1.plot(xx1, kde(xx2), lw=2, alpha=1, color=colors[j])
ax2.plot(xx1, kde(xx2) * popt[2], lw=2, alpha=1,
color=colors[j])
# The same but for the weighted samples
n, bins, patches = ax1.hist(
self.out['weighted_samples'][param], alpha=.3,
bins=20, label='Weighted sampling', density=True,
color='tab:cyan'
)
kde = gaussian_kde(self.out['weighted_samples'][param])
xx = np.linspace(bins[0], bins[-1], 300)
ax1.plot(xx, kde(xx), color='tab:cyan', lw=2, alpha=1, ls='--')
# Now ditto for the weighted average
n, bins, patches = ax1.hist(
self.out['weighted_average'][param], alpha=.3,
bins=20, label='Weighted average', density=True,
color='tab:pink'
)
kde = gaussian_kde(self.out['weighted_average'][param])
xx = np.linspace(bins[0], bins[-1], 300)
ax1.plot(xx, kde(xx), color='tab:pink', lw=2, alpha=1, ls='-.')
lab = 'fix this'
if param == 'teff':
lab = 'Teff (K)'
elif param == 'rad':
lab = r'R$_*$ (R$_\odot$)'
elif param == 'dist':
lab = 'D (pc)'
elif param == 'logg':
lab = 'Log g'
elif param == 'Av':
lab = r'A$_{\rm V}$ (mag)'
elif param == 'z':
lab = '[Fe/H]'
elif param == 'Age':
lab = 'Age (Gyr)'
elif param == 'Mass':
lab = r'Mass (M$_\odot$)'
# Normal
ax1.set_ylabel('PDF',
fontsize=self.fontsize,
fontname=self.fontname
)
# Weighted
ax2.set_ylabel('N',
fontsize=self.fontsize,
fontname=self.fontname
)
axes = [ax1, ax2]
for ax in axes:
ax.set_xlabel(lab,
fontsize=self.fontsize,
fontname=self.fontname
)
for tick in ax.get_yticklabels():
tick.set_fontname(self.fontname)
for tick in ax.get_xticklabels():
tick.set_fontname(self.fontname)
ax.tick_params(
axis='both', which='major',
labelsize=self.tick_labelsize
)
ax.legend(loc=0, prop={'size': 16})
if param == 'z':
param = 'Fe_H'
if self.png:
f1.savefig(self.hist_out + '/' + param + '.png',
bbox_inches='tight')
f2.savefig(self.hist_out + '/weighted_' + param + '.png',
bbox_inches='tight')
if self.pdf:
f1.savefig(self.hist_out + '/' + param + '.pdf',
bbox_inches='tight')
f2.savefig(self.hist_out + '/weighted_' + param + '.pdf',
bbox_inches='tight')
plt.close(f1)
plt.close(f2)
if self.bma:
mist_samp = self.out.get('mist_samples', {})
# Age hist
if 'age' in mist_samp:
f, ax = plt.subplots(figsize=(12, 4))
samp = mist_samp['age']
n, bins, patches = ax.hist(
samp, alpha=.3, bins=20, label='MIST', density=True
)
kde = gaussian_kde(samp)
xx = np.linspace(bins[0], bins[-1], 10000)
ax.plot(xx, kde(xx), color='k', lw=2, alpha=.7)
ax.set_ylabel('PDF',
fontsize=self.fontsize,
fontname=self.fontname)
ax.set_xlabel('Age',
fontsize=self.fontsize,
fontname=self.fontname)
for tick in ax.get_yticklabels():
tick.set_fontname(self.fontname)
for tick in ax.get_xticklabels():
tick.set_fontname(self.fontname)
ax.tick_params(
axis='both', which='major',
labelsize=self.tick_labelsize
)
plt.legend(loc=0)
if self.png:
plt.savefig(self.hist_out + '/age.png',
bbox_inches='tight')
if self.pdf:
plt.savefig(self.hist_out + '/age.pdf',
bbox_inches='tight')
plt.close(f)
# Mass hist
if 'iso_mass' in mist_samp:
f, ax = plt.subplots(figsize=(12, 4))
samp = mist_samp['iso_mass']
n, bins, patches = ax.hist(
samp, alpha=.3, bins=20, label='MIST', density=True
)
kde = gaussian_kde(samp)
xx = np.linspace(bins[0], bins[-1], 10000)
ax.plot(xx, kde(xx), color='k', lw=2, alpha=.7)
ax.set_ylabel('PDF',
fontsize=self.fontsize,
fontname=self.fontname)
ax.set_xlabel('Mass',
fontsize=self.fontsize,
fontname=self.fontname)
for tick in ax.get_yticklabels():
tick.set_fontname(self.fontname)
for tick in ax.get_xticklabels():
tick.set_fontname(self.fontname)
ax.tick_params(
axis='both', which='major',
labelsize=self.tick_labelsize
)
plt.legend(loc=0)
if self.png:
plt.savefig(self.hist_out + '/iso_mass.png',
bbox_inches='tight')
if self.pdf:
plt.savefig(self.hist_out + '/iso_mass.pdf',
bbox_inches='tight')
plt.close(f)
# EEP hist
if 'eep' in mist_samp:
f, ax = plt.subplots(figsize=(12, 4))
samp = mist_samp['eep']
n, bins, patches = ax.hist(
samp, alpha=.3, bins=20, label='MIST', density=True
)
kde = gaussian_kde(samp)
xx = np.linspace(bins[0], bins[-1], 10000)
ax.plot(xx, kde(xx), color='k', lw=2, alpha=.7)
ax.set_ylabel('PDF',
fontsize=self.fontsize,
fontname=self.fontname)
ax.set_xlabel('EEP',
fontsize=self.fontsize,
fontname=self.fontname)
for tick in ax.get_yticklabels():
tick.set_fontname(self.fontname)
for tick in ax.get_xticklabels():
tick.set_fontname(self.fontname)
ax.tick_params(
axis='both', which='major',
labelsize=self.tick_labelsize
)
plt.legend(loc=0)
if self.png:
plt.savefig(self.hist_out + '/EEP.png',
bbox_inches='tight')
if self.pdf:
plt.savefig(self.hist_out + '/EEP.pdf',
bbox_inches='tight')
plt.close(f)
[docs]
def plot_bma_HR(self, nsamp):
"""Plot HR diagram for the star."""
bf_key = f'best_fit_{self.method}'
if 'age' not in self.out.get(bf_key, {}):
return
print('Plotting HR diagram')
# Get necessary info from the star.
age = self.out[bf_key]['age']
feh = self.out[f'best_fit_{self.method}']['z']
teff = np.log10(self.out[f'best_fit_{self.method}']['teff'])
lum = np.log10(self.out[f'best_fit_{self.method}']['lum'])
teff_lo, teff_hi = self.out[f'uncertainties_{self.method}']['teff']
lum_lo, lum_hi = self.out[f'uncertainties_{self.method}']['lum']
teff_lo = teff_lo / (10 ** teff * np.log(10))
teff_hi = teff_hi / (10 ** teff * np.log(10))
lum_lo = lum_lo / (10 ** lum * np.log(10))
lum_hi = lum_hi / (10 ** lum * np.log(10))
ages = self.out['mist_samples']['age']
m = self.method if self.method == 'samples' else 'average'
fehs = self.out[f'weighted_{m}']['z']
if feh > 0.5:
feh = 0.5
iso_bf = get_isochrone(np.log10(age) + 9, feh)
logteff = iso_bf['logTeff'].values
loglum = iso_bf['logL'].values
mass = iso_bf['mass'].values
fig, ax = plt.subplots(figsize=self.hr_figsize)
points = np.array([logteff, loglum]).T.reshape(-1, 1, 2)
segments = np.concatenate([points[:-1], points[1:]], axis=1)
norm = plt.Normalize(mass.min(), mass.max())
lc = LineCollection(segments, cmap=self.hr_cmap, norm=norm,
linewidths=5)
lc.set_array(mass)
line = ax.add_collection(lc)
line.zorder = 1000
cbar = fig.colorbar(line, ax=ax, pad=0.01)
cbar.set_label(r'$M_\odot$',
rotation=270,
fontsize=self.fontsize,
fontname=self.fontname,
labelpad=20)
for i in range(nsamp):
a = np.log10(choice(ages)) + 9
z = choice(fehs)
if z > 0.5:
z = 0.5
iso = get_isochrone(a, z)
logt = iso['logTeff'].values
logl = iso['logL'].values
ax.plot(logt, logl, color='gray')
ax.errorbar(teff, lum, xerr=[[teff_lo], [teff_hi]],
yerr=[[lum_lo], [lum_hi]], color=self.hr_color,
zorder=1001)
ax.scatter(teff, lum, s=350, color=self.hr_color, zorder=1002,
edgecolors='k', marker=self.hr_marker)
ax.invert_xaxis()
ax.set_xlabel('logTeff',
fontsize=self.fontsize,
fontname=self.fontname)
ax.set_ylabel('logL',
fontsize=self.fontsize,
fontname=self.fontname)
ax.tick_params(
axis='both', which='major',
labelsize=self.tick_labelsize
)
for ll in cbar.ax.yaxis.get_ticklabels():
ll.set_fontsize(self.tick_labelsize)
for tick in ax.get_yticklabels():
tick.set_fontname(self.fontname)
for tick in ax.get_yticklabels():
tick.set_fontname(self.fontname)
if self.png:
plt.savefig(self.out_folder + '/HR_diagram.png',
bbox_inches='tight')
if self.pdf:
plt.savefig(self.out_folder + '/HR_diagram.pdf',
bbox_inches='tight')
[docs]
def plot_corner(self):
"""Make corner plot."""
print('Plotting corner.')
m = self.method if self.method == 'samples' else 'average'
samples = self.out[f'weighted_{m}']
all_samps = []
theta_lo = []
theta_up = []
for i, o in enumerate(self.order):
if 'noise' in o:
self.coordinator[i] = 1
theta = self.theta[self.coordinator == 0]
used_params = self.order[self.coordinator == 0]
for i, param in enumerate(self.order):
if not self.coordinator[i]:
if 'noise' in param:
continue
_, lo, up = credibility_interval(samples[param])
theta_lo.append(lo)
theta_up.append(up)
all_samps.append(samples[param])
corner_samp = np.vstack(all_samps)
titles = self.__create_titles(used_params, theta, theta_up, theta_lo)
labels = self.__create_labels(used_params)
fig = corner.corner(
corner_samp.T,
plot_contours=True,
fill_contours=False,
plot_datapoints=True,
no_fill_contours=True,
max_n_ticks=4
)
axes = np.array(fig.axes).reshape((theta.shape[0], theta.shape[0]))
for i in range(theta.shape[0]):
ax = axes[i, i]
ax.axvline(theta[i], color=self.corner_med_c,
linestyle=self.corner_med_style)
ax.axvline(theta_lo[i], color=self.corner_v_c,
linestyle=self.corner_v_style)
ax.axvline(theta_up[i], color=self.corner_v_c,
linestyle=self.corner_v_style)
t = titles[i]
ax.set_title(t, fontsize=self.corner_fontsize,
fontname=self.fontname)
for yi in range(theta.shape[0]):
for xi in range(yi):
ax = axes[yi, xi]
if xi == 0:
for tick in ax.yaxis.get_major_ticks():
tick.label1.set_fontsize(self.corner_tick_fontsize)
tick.label1.set_fontname(self.fontname)
ax.set_ylabel(
labels[yi],
labelpad=self.corner_labelpad,
fontsize=self.corner_fontsize,
fontname=self.fontname
)
if yi == theta.shape[0] - 1:
for tick in ax.xaxis.get_major_ticks():
tick.label1.set_fontsize(self.corner_tick_fontsize)
tick.label1.set_fontname(self.fontname)
ax.set_xlabel(
labels[xi],
labelpad=self.corner_labelpad,
fontsize=self.corner_fontsize,
fontname=self.fontname
)
ax.axvline(theta[xi], color=self.corner_med_c,
linestyle=self.corner_med_style)
ax.axhline(theta[yi], color=self.corner_med_c,
linestyle=self.corner_med_style)
ax.plot(theta[xi], theta[yi], self.corner_marker)
axes[-1, -1].set_xlabel(
labels[-1],
labelpad=self.corner_labelpad,
fontsize=self.corner_fontsize,
fontname=self.fontname
)
for tick in axes[-1, -1].xaxis.get_major_ticks():
tick.label1.set_fontsize(self.corner_tick_fontsize)
tick.label1.set_fontname(self.fontname)
if self.pdf:
plt.savefig(f'{self.out_folder}/CORNER.pdf',
bbox_inches='tight')
if self.png:
plt.savefig(f'{self.out_folder}/CORNER.png',
bbox_inches='tight')
pass
[docs]
def clean(self):
"""Close opened figures."""
plt.close('all')
[docs]
def fetch_Phoenix(self):
"""Fetch correct Phoenixv2 SED file.
The directory containing the Phoenix spectra must be called PHOENIXv2
Within PHOENIXv2 there should be the wavelength file called
WAVE_PHOENIX-ACES-AGSS-COND-2011.fits and several folders called
Z[-/+]X.X where X.X are the metallicities (e.g. Z-0.0, Z+1.0, etc)
"""
# Change hdd to a class variable depending on an env param.
teff = self.theta[0]
logg = self.theta[1]
z = self.theta[2]
select_teff = np.argmin((abs(teff - np.unique(self.star.teff))))
select_logg = np.argmin((abs(logg - np.unique(self.star.logg))))
select_z = np.argmin((abs(z - np.unique(self.star.z))))
sel_teff = int(np.unique(self.star.teff)[select_teff])
sel_logg = np.unique(self.star.logg)[select_logg]
sel_z = np.unique(self.star.z)[select_z]
selected_SED = self.moddir + 'PHOENIXv2/Z'
metal_add = ''
if sel_z < 0:
metal_add = str(sel_z)
if sel_z == 0:
metal_add = '-0.0'
if sel_z > 0:
metal_add = '+' + str(sel_z)
selected_SED += metal_add
selected_SED += '/lte'
selected_SED += str(sel_teff) if len(str(sel_teff)) == 5 else \
'0' + str(sel_teff)
selected_SED += '-' + str(sel_logg) + '0'
selected_SED += metal_add
selected_SED += '.PHOENIX-ACES-AGSS-COND-2011-HiRes.fits'
flux = fits.open(selected_SED)[0].data
flux *= (u.erg / u.s / u.cm ** 2 / u.cm).to(
u.erg / u.s / u.cm ** 2 / u.um)
return flux
[docs]
def fetch_btsettl(self):
"""Fetch correct BT-Settl SED file.
The directory containing the BT-Settl spectra must be called BTSettl
Within BTSettl there should be yet another directory
called AGSS2009, within BTSettl/AGSS2009 there should be the SED fits
files with the following naming convention:
lteTTT-G.G[-/+]Z.Za+0.0.BT-Settl.AGSS2009.fits
where TTT are the first 3 digits of the effective temperature if it's a
number over 10000, else it's the first 2 digit prepended by a 0.
G.G is the log g and Z.Z the metallicity.
"""
conversion = (u.erg / u.s / u.cm ** 2 / u.angstrom)
conversion = conversion.to(u.erg / u.s / u.cm ** 2 / u.um)
teff = self.theta[0]
logg = self.theta[1]
z = self.theta[2]
select_teff = np.argmin((abs(teff - np.unique(self.star.teff))))
select_logg = np.argmin((abs(logg - np.unique(self.star.logg))))
select_z = np.argmin((abs(z - np.unique(self.star.z))))
sel_teff = int(np.unique(self.star.teff)[select_teff]) // 100
sel_logg = np.unique(self.star.logg)[select_logg]
sel_z = np.unique(self.star.z)[select_z]
metal_add = ''
if sel_z < 0:
metal_add = str(sel_z)
if sel_z == 0:
metal_add = '-0.0'
if sel_z > 0:
metal_add = '+' + str(sel_z)
selected_SED = self.moddir + 'BTSettl/AGSS2009/lte'
selected_SED += str(sel_teff) if len(str(sel_teff)) == 3 else \
'0' + str(sel_teff)
selected_SED += '-' + str(sel_logg) + metal_add + 'a+*'
gl = glob.glob(selected_SED)
selected_SED = gl[0]
tab = Table(fits.open(selected_SED)[1].data)
flux = np.array(tab['FLUX'].tolist()) * conversion
wave = np.array(tab['WAVELENGTH'].tolist()) * u.angstrom.to(u.um)
return wave, flux
[docs]
def fetch_btnextgen(self):
"""Fetch correct BT-NextGen SED file.
The directory containing the BT-NextGen spectra must be called
BTNextGen. Within BTNextGen there should be yet another directory
called AGSS2009, within BTNextGen/AGSS2009 there should be the SED fits
files with the following naming convention:
lteTTT-G.G[-/+]Z.Za+0.0..BT-NextGen.AGSS2009.fits
where TTT are the first 3 digits of the effective temperature if it's a
number over 10000, else it's the first 2 digit prepended by a 0.
G.G is the log g and Z.Z the metallicity.
"""
conversion = (u.erg / u.s / u.cm ** 2 / u.angstrom)
conversion = conversion.to(u.erg / u.s / u.cm ** 2 / u.um)
teff = self.theta[0]
logg = self.theta[1]
z = self.theta[2]
select_teff = np.argmin((abs(teff - np.unique(self.star.teff))))
select_logg = np.argmin((abs(logg - np.unique(self.star.logg))))
select_z = np.argmin((abs(z - np.unique(self.star.z))))
sel_teff = int(np.unique(self.star.teff)[select_teff]) // 100
sel_logg = np.unique(self.star.logg)[select_logg]
sel_z = np.unique(self.star.z)[select_z]
metal_add = ''
if sel_z < 0:
metal_add = str(sel_z)
if sel_z == 0:
metal_add = '-0.0'
if sel_z > 0:
metal_add = '+' + str(sel_z)
selected_SED = self.moddir + 'BTNextGen/AGSS2009/lte'
selected_SED += str(sel_teff) if len(str(sel_teff)) == 3 else \
'0' + str(sel_teff)
selected_SED += '-' + str(sel_logg) + metal_add + 'a+*'
gl = glob.glob(selected_SED)
selected_SED = gl[0]
tab = Table(fits.open(selected_SED)[1].data)
flux = np.array(tab['FLUX'].tolist()) * conversion
wave = np.array(tab['WAVELENGTH'].tolist()) * u.angstrom.to(u.um)
return wave, flux
[docs]
def fetch_btcond(self):
"""Fetch correct BT-COND SED file.
The directory containing the BT-COND spectra must be called
BTCOND. Within BTCOND there should be yet another directory
called CIFIST2011, within BTCOND/CIFIST2011 there should be the SED
fits files with the following naming convention:
lteTTT-G.G[-/+]Z.Za+0.0..BT-Cond.CIFIST2011.fits
where TTT are the first 3 digits of the effective temperature if it's a
number over 10000, else it's the first 2 digit prepended by a 0.
G.G is the log g and Z.Z the metallicity.
"""
conversion = (u.erg / u.s / u.cm ** 2 / u.angstrom)
conversion = conversion.to(u.erg / u.s / u.cm ** 2 / u.um)
teff = self.theta[0]
logg = self.theta[1]
z = self.theta[2]
select_teff = np.argmin((abs(teff - np.unique(self.star.teff))))
select_logg = np.argmin((abs(logg - np.unique(self.star.logg))))
select_z = np.argmin((abs(z - np.unique(self.star.z))))
sel_teff = int(np.unique(self.star.teff)[select_teff]) // 100
sel_logg = np.unique(self.star.logg)[select_logg]
sel_z = np.unique(self.star.z)[select_z]
metal_add = ''
if sel_z < 0:
metal_add = str(sel_z)
if sel_z == 0:
metal_add = '-0.0'
if sel_z > 0:
metal_add = '+' + str(sel_z)
selected_SED = self.moddir + 'BTCond/CIFIST2011/lte'
selected_SED += str(sel_teff) if len(str(sel_teff)) == 3 else \
'0' + str(sel_teff)
selected_SED += '-' + str(sel_logg) + metal_add + 'a+*'
gl = glob.glob(selected_SED)
selected_SED = gl[0]
tab = Table(fits.open(selected_SED)[1].data)
flux = np.array(tab['FLUX'].tolist()) * conversion
wave = np.array(tab['WAVELENGTH'].tolist()) * u.angstrom.to(u.um)
return wave, flux
[docs]
def fetch_ck04(self):
"""Fetch correct Castelli-Kurucz 2004 SED file.
The directory containing the Castelli-Kurucz spectra must be called
Castelli_Kurucz. Within Castelli_Kurucz there should be a group of
directories called ck[pm]ZZ where ZZ is the metalicity without the dot.
Within each directory there are fits files named:
ck[pm]ZZ_TTTT.fits
where ZZ is metalicity as previous and TTTT is the effective
temperature.
"""
conversion = (u.erg / u.s / u.cm ** 2 / u.angstrom)
conversion = conversion.to(u.erg / u.s / u.cm ** 2 / u.um)
teff = self.theta[0]
logg = self.theta[1]
z = self.theta[2]
select_teff = np.argmin((abs(teff - np.unique(self.star.teff))))
select_logg = np.argmin((abs(logg - np.unique(self.star.logg))))
select_z = np.argmin((abs(z - np.unique(self.star.z))))
sel_teff = int(np.unique(self.star.teff)[select_teff])
sel_logg = np.unique(self.star.logg)[select_logg]
sel_z = np.unique(self.star.z)[select_z]
metal_add = ''
if sel_z < 0:
metal_add = 'm' + str(-sel_z).replace('.', '')
if sel_z == 0:
metal_add = 'p00'
if sel_z > 0:
metal_add = 'p' + str(sel_z).replace('.', '')
name = 'ck' + metal_add
lgg = 'g{:02.0f}'.format(sel_logg * 10)
selected_SED = self.moddir + 'Castelli_Kurucz/' + name + '/' + name
selected_SED += '_' + str(sel_teff) + '.fits'
tab = Table(fits.open(selected_SED)[1].data)
wave = np.array(tab['WAVELENGTH'].tolist()) * u.angstrom.to(u.um)
flux = np.array(tab[lgg].tolist()) * conversion
return wave, flux
[docs]
def fetch_kurucz(self):
"""Fetch correct Kurucz 1993 SED file.
The directory containing the Kurucz spectra must be called
Kurucz. Within Kurucz there should be a group of
directories called k[pm]ZZ where ZZ is the metalicity without the dot.
Within each directory there are fits files named:
k[pm]ZZ_TTTT.fits
where ZZ is metalicity as previous and TTTT is the effective
temperature.
"""
conversion = (u.erg / u.s / u.cm ** 2 / u.angstrom)
conversion = conversion.to(u.erg / u.s / u.cm ** 2 / u.um)
teff = self.theta[0]
logg = self.theta[1]
z = self.theta[2]
select_teff = np.argmin((abs(teff - np.unique(self.star.teff))))
select_logg = np.argmin((abs(logg - np.unique(self.star.logg))))
select_z = np.argmin((abs(z - np.unique(self.star.z))))
sel_teff = int(np.unique(self.star.teff)[select_teff])
sel_logg = np.unique(self.star.logg)[select_logg]
sel_z = np.unique(self.star.z)[select_z]
metal_add = ''
if sel_z < 0:
metal_add = 'm' + str(-sel_z).replace('.', '')
if sel_z == 0:
metal_add = 'p00'
if sel_z > 0:
metal_add = 'p' + str(sel_z).replace('.', '')
name = 'k' + metal_add
lgg = 'g{:02.0f}'.format(sel_logg * 10)
selected_SED = self.moddir + 'Kurucz/' + name + '/' + name
selected_SED += '_' + str(sel_teff) + '.fits'
tab = Table(fits.open(selected_SED)[1].data)
wave = np.array(tab['WAVELENGTH'].tolist()) * u.angstrom.to(u.um)
flux = np.array(tab[lgg].tolist()) * conversion
return wave, flux
[docs]
def fetch_coelho(self):
"""Fetch correct Coelho 2014 SED file.
The directory containing the Coelho spectra must be called
Coelho14. Within Coelho14 there should be a group of
files called t[0X]XXXX_g[+-]Y.Y_[mp]ZZp0[14]_sed.fits
where X is the temperature, Y is the logg and Z the metallicity.
"""
conversion = (u.erg / u.s / u.cm ** 2 / u.angstrom)
conversion = conversion.to(u.erg / u.s / u.cm ** 2 / u.um)
teff = self.theta[0]
logg = self.theta[1]
z = self.theta[2]
select_teff = np.argmin((abs(teff - np.unique(self.star.teff))))
select_logg = np.argmin((abs(logg - np.unique(self.star.logg))))
select_z = np.argmin((abs(z - np.unique(self.star.z))))
sel_teff = int(np.unique(self.star.teff)[select_teff])
sel_logg = np.unique(self.star.logg)[select_logg]
sel_z = np.unique(self.star.z)[select_z]
sel_teff = str(sel_teff) if sel_teff >= 1e5 else '0{}'.format(sel_teff)
selected_SED = self.moddir + 'Coelho14/t' + sel_teff + '_g'
sel_logg = '+{:.1f}'.format(sel_logg) if sel_logg > 0 else '-0.5'
selected_SED += sel_logg
if sel_z < 0:
selected_SED += '_m{:02.0f}'.format(-sel_z * 10)
else:
selected_SED += '_p{:02.0f}'.format(sel_z * 10)
selected_SED = glob.glob(selected_SED + 'p0[04]_sed.fits')
selected_SED = selected_SED[0]
hdul = fits.open(selected_SED)
head = hdul[0].header
data = hdul[0].data
flux = data * conversion
CRVAL1 = head['CRVAL1']
CDEL1 = head['CDELT1']
wave = 10 ** np.array(
[CRVAL1 + CDEL1 * i for i in range(data.shape[0])])
wave *= u.angstrom.to(u.um)
return wave, flux
def __create_titles(self, titles, theta, theta_up, theta_lo):
new_titles = np.empty(titles.shape[0], dtype=object)
for i, param in enumerate(titles):
if param == 'teff':
new_titles[i] = r'Teff ='
if param == 'logg':
new_titles[i] = r' Log g ='
if param == 'z':
new_titles[i] = r' [Fe/H] ='
if param == 'dist':
new_titles[i] = r' D ='
if param == 'rad':
new_titles[i] = r'R ='
if param == 'norm':
new_titles[i] = r' (R/D)$^2$ ='
if param == 'Av':
new_titles[i] = r'Av ='
if param == 'inflation':
new_titles[i] = r'$\sigma$ ='
if param == 'rad' or param == 'dist':
new_titles[i] += '{:.3f}'.format(theta[i])
new_titles[i] += r'$^{+' + \
'{:.3f}'.format(theta_up[i] - theta[i])
new_titles[i] += r'}_{-' + \
'{:.3f}'.format(theta[i] - theta_lo[i])
new_titles[i] += r'}$'
else:
new_titles[i] += '{:.2f}'.format(theta[i])
new_titles[i] += r'$^{+' + \
'{:.2f}'.format(theta_up[i] - theta[i])
new_titles[i] += r'}_{-' + \
'{:.2f}'.format(theta[i] - theta_lo[i])
new_titles[i] += r'}$'
return new_titles
def __create_labels(self, labels):
new_labels = np.empty(labels.shape[0], dtype=object)
for i, param in enumerate(labels):
if param == 'teff':
new_labels[i] = r'Teff (K)'
if param == 'logg':
new_labels[i] = r'Log g'
if param == 'z':
new_labels[i] = r'[Fe/H]'
if param == 'dist':
new_labels[i] = r'D (pc)'
if param == 'rad':
new_labels[i] = r'R $($R$_\odot)$'
if param == 'norm':
new_labels[i] = r'(R/D)'
if param == 'Av':
new_labels[i] = r'Av'
if param == 'inflation':
new_labels[i] = r'$\sigma$'
return new_labels
def __read_config(self):
"""Read plotter configuration file."""
if self.settings_dir is None:
settings = open(filesdir + '/plot_settings.dat', 'r')
else:
settings = open(self.settings_dir, 'r')
for line in settings.readlines():
if line[0] == '#' or line[0] == '\n':
continue
splt = line.split(' ')
attr = splt[0]
if attr == 'figsize':
vals = splt[1].split('\n')[0].split(',')
val = (int(vals[0]), int(vals[1]))
elif attr == 'hr_figsize':
vals = splt[1].split('\n')[0].split(',')
val = (int(vals[0]), int(vals[1]))
elif 'alpha' in attr:
val = float(splt[1].split('\n')[0])
else:
try:
val = int(splt[1].split('\n')[0])
except ValueError:
val = splt[1].split('\n')[0]
setattr(self, attr, val)