Source code for astroARIADNE.utils

"""Various utilities used throughout the code.

Here go various utilities that don't belong directly in any class,
photometry utils module nor or SED model module.
"""

__all__ = ['estimate_pdf', 'estimate_cdf', 'norm_fit', 'credibility_interval',
           'display_routine', 'display_star_fin', 'display_star_init', 'end',
           'get_noise_name', 'create_dir', 'execution_time', 'out_filler']

import os
import pickle
import random
import time
from contextlib import closing

from .config import colors

import numpy as np
from scipy.special import erf
from scipy.stats import (gaussian_kde, norm)
from termcolor import colored


[docs] def estimate_pdf(distribution): """Estimates the PDF of a distribution using a gaussian KDE. Parameters ---------- distribution: array_like The distribution. Returns ------- xx: array_like The x values of the PDF. pdf: array_like The estimated PDF. """ kde = gaussian_kde(distribution) xmin, xmax = distribution.min(), distribution.max() xx = np.linspace(xmin, xmax, 300) pdf = kde(xx) return xx, pdf
[docs] def estimate_cdf(distribution, hdr=False): """Estimate the CDF of a distribution.""" h, hx = np.histogram(distribution, density=True, bins=499) cdf = np.zeros(500) # ensure the first value of the CDF is 0 if hdr: idx = np.argsort(h)[::-1] cdf[1:] = np.cumsum(h[idx]) * np.diff(hx) else: cdf[1:] = np.cumsum(h) * np.diff(hx) return cdf
[docs] def norm_fit(x, mu, sigma, A): """Gaussian function.""" return A * norm.pdf(x, loc=mu, scale=sigma)
[docs] def credibility_interval(post, alpha=1.): """Calculate bayesian credibility interval. Parameters: ----------- post : array_like The posterior sample over which to calculate the bayesian credibility interval. alpha : float, optional Confidence level. Returns: -------- med : float Median of the posterior. low : float Lower part of the credibility interval. up : float Upper part of the credibility interval. """ z = erf(alpha / np.sqrt(2)) lower_percentile = 100 * (1 - z) / 2 upper_percentile = 100 * (1 + z) / 2 low, med, up = np.percentile( post, [lower_percentile, 50, upper_percentile] ) return med, low, up
def credibility_interval_hdr(xx, pdf, cdf, sigma=1.): """Calculate the highest density region for an empirical distribution. Reference: Hyndman, Rob J. 1996 Parameters ---------- xx: array_like The x values of the PDF (and the y values of the CDF). pdf: array_like The PDF of the distribution. cdf: array_like The CDF of the distribution. sigma: float The confidence level in sigma notation. (e.g. 1 sigma = 68%) Returns ------- best: float The value corresponding to the peak of the posterior distribution. low: float The minimum value of the HDR. high: float The maximum value of the HDR. Note: The HDR is capable of calculating more robust credible regions for multimodal distributions. It is identical to the usual probability regions of symmetric about the mean distributions. Using this then should lead to more realistic errorbars and 3-sigma intervals for multimodal outputs. """ # Get best fit value best = xx[np.argmax(pdf)] z = erf(sigma / np.sqrt(2)) # Sort the pdf in reverse order idx = np.argsort(pdf)[::-1] # Find where the CDF reaches 100*z% idx_hdr = np.where(cdf >= z)[0][0] # Isolate the HDR hdr = pdf[idx][:idx_hdr] # Get the minimum density hdr_min = hdr.min() # Get CI low = xx[pdf > hdr_min].min() high = xx[pdf > hdr_min].max() return best, low, high
[docs] def display_star_fin(star, c): """Display stellar information.""" temp, temp_e = star.temp, star.temp_e rad, rad_e = star.rad, star.rad_e plx, plx_e = star.plx, star.plx_e lum, lum_e = star.lum, star.lum_e dist, dist_e = star.dist, star.dist_e print(colored(f'\t\t\tGaia DR3 ID : {star.g_id}', c)) if star.tic: print(colored(f'\t\t\tTIC : {star.tic}', c)) if star.kic: print(colored(f'\t\t\tKIC : {star.kic}', c)) print(colored('\t\t\tGaia Effective temperature : ', c), end='') print(colored(f'{temp:.3f} +/- {temp_e:.3f}', c)) if rad is not None: print(colored('\t\t\tGaia Stellar radius : ', c), end='') print(colored(f'{rad:.3f} +/- {rad_e:.3f}', c)) if lum is not None: print(colored('\t\t\tGaia Stellar Luminosity : ', c), end='') print(colored(f'{lum:.3f} +/- {lum_e:.3f}', c)) print(colored('\t\t\tGaia Parallax : ', c), end='') print(colored(f'{plx:.3f} +/- {plx_e:.3f}', c)) print(colored('\t\t\tBailer-Jones distance : ', c), end='') print(colored(f'{dist:.3f} +/- {dist_e:.3f}', c)) print(colored('\t\t\tMaximum Av : ', c), end='') print(colored(f'{star.Av:.3f}', c)) pass
[docs] def display_star_init(star, c): """Display stellar information.""" print(colored('\n\t\t#####################################', c)) print(colored('\t\t## ARIADNE ##', c)) print(colored('\t\t#####################################', c)) print(colored(' spectrAl eneRgy dIstribution', c), end=' ') print(colored('bAyesian moDel averagiNg fittEr', c)) print(colored('\n\t\t\tAuthor : Jose Vines', c)) print(colored('\t\t\tContact : jose . vines at ug . uchile . cl', c)) print(colored('\t\t\tStar : ', c), end='') print(colored(star.starname, c)) pass
[docs] def display_routine(engine, live_points, dlogz, ndim, bound=None, sample=None, nthreads=None, dynamic=None): """Display program information. What is displayed is: Program name Program author Star selected Algorithm used (i.e. Multinest or Dynesty) Setup used (i.e. Live points, dlogz tolerance) """ c = random.choice(colors) if engine == 'multinest': engine = 'MultiNest' if engine == 'dynesty': engine = 'Dynesty' print(colored('\n\t\t*** EXECUTING MAIN FITTING ROUTINE ***', c)) print(colored('\t\t\tSelected engine : ', c), end='') print(colored(engine, c)) print(colored('\t\t\tLive points : ', c), end='') print(colored(str(live_points), c)) print(colored('\t\t\tlog Evidence tolerance : ', c), end='') print(colored(str(dlogz), c)) print(colored('\t\t\tFree parameters : ', c), end='') print(colored(str(ndim), c)) if engine == 'Dynesty' or engine == 'Bayesian Model Averaging': print(colored('\t\t\tBounding : ', c), end='') print(colored(bound, c)) print(colored('\t\t\tSampling : ', c), end='') print(colored(sample, c)) print(colored('\t\t\tN threads : ', c), end='') print(colored(nthreads, c)) if dynamic: print(colored('\t\t\tRunning the Dynamic Nested Sampler', c)) print('') pass
[docs] def end(coordinator, elapsed_time, out_folder, engine, use_norm): """Display end of run information.""" c = random.choice(colors) _T = '\t\t\t' if use_norm: order = np.array(['teff', 'logg', 'z', 'norm', 'rad', 'Av']) else: order = np.array(['teff', 'logg', 'z', 'dist', 'rad', 'Av']) if engine == 'Bayesian Model Averaging': res_dir = f'{out_folder}/BMA.pkl' else: res_dir = f'{out_folder}/{engine}_out.pkl' with closing(open(res_dir, 'rb')) as jar: out = pickle.load(jar) star = out['star'] mask = star.filter_mask for filt in star.filter_names[mask]: p_ = get_noise_name(filt) + '_noise' order = np.append(order, p_) # Display name mapping display_names = { 'teff': 'Teff (K)', 'logg': 'log(g) (dex)', 'z': '[Fe/H] (dex)', 'dist': 'Distance (pc)', 'rad': 'Radius (Rsun)', 'norm': '(R/D)^2', 'Av': 'Av (mag)', 'grav_mass': 'Grav. mass (Msun)', 'lum': 'Luminosity (Lsun)', 'AD': 'Ang. diameter (mas)', 'iso_mass': 'Iso. mass (Msun)', 'age': 'Age (Gyr)', 'eep': 'EEP', } def _fmt_row(name, med, unc=None, ci=None, fmt='g'): """Format one table row.""" col_name = f'{name:24s}' col_med = f'{med:>12.4{fmt}}' if unc is None: col_unc = f'{"fixed":>20s}' col_ci = f'{"":>20s}' else: unlo, unhi = unc lo, hi = ci col_unc = f'+{unhi:.4{fmt}} -{unlo:.4{fmt}}' col_ci = f'[{lo:.4{fmt}}, {hi:.4{fmt}}]' return f'{_T}{col_name} {col_med} {col_unc:>20s} {col_ci:>20s}' # Header print() print(colored(f'{_T}Fitting finished.', c)) print(colored(f'{_T}Best fit parameters are:', c)) print() hdr = f'{_T}{"Parameter":24s} {"Median":>12s} {chr(0x00B1) + "1" + chr(0x03c3) + " CI":>20s} {"3" + chr(0x03c3) + " CI":>20s}' print(colored(hdr, c)) print(colored(f'{_T}{"-" * 82}', c)) # Physical parameters for i, p in enumerate(order): if 'noise' in p: continue name = display_names.get(p, p) med = out['best_fit_averaged'][p] fmt = 'e' if p == 'norm' else 'g' if not coordinator[i]: unc = out['uncertainties_averaged'][p] ci = out['confidence_interval_averaged'][p] print(colored(_fmt_row(name, med, unc, ci, fmt), c)) else: print(colored(_fmt_row(name, med, fmt=fmt), c)) # Derived parameters derived = [] if not use_norm: derived.append('AD') derived.extend(['grav_mass', 'lum']) if engine == 'Bayesian Model Averaging': derived.extend(['iso_mass', 'age', 'eep']) for p in derived: if p not in out['best_fit_averaged']: continue name = display_names.get(p, p) med = out['best_fit_averaged'][p] unc = out['uncertainties_averaged'][p] ci = out['confidence_interval_averaged'][p] print(colored(_fmt_row(name, med, unc, ci), c)) # Spectral type spt = out['spectral_type'] print(colored(f'{_T}{"Spectral type":24s} {spt:>12s}', c)) # Noise parameters print() print(colored(f'{_T}{"Excess noise":24s} {"Median":>12s} {chr(0x00B1) + "1" + chr(0x03c3) + " CI":>20s} {"3" + chr(0x03c3) + " CI":>20s}', c)) print(colored(f'{_T}{"-" * 82}', c)) for i, p in enumerate(order): if 'noise' not in p: continue if 'SDSS' not in p and 'PS1' not in p: p1, p2 = p.split('_') else: p1, p2, p3 = p.split('_') p1 += '_' + p2 p2 = p3 name = f'{p1} {p2}' med = out['best_fit_averaged'][p] unc = out['uncertainties_averaged'][p] ci = out['confidence_interval_averaged'][p] print(colored(_fmt_row(name, med, unc, ci), c)) # Model weights / evidence print() if engine != 'Bayesian Model Averaging': z, z_err = out['global_lnZ'], out['global_lnZerr'] print(colored(f'{_T}log Bayesian evidence : {z:.3f} +/- {z_err:.3f}', c)) else: probs = out['weights'] lnZ = out.get('lnZ', {}) wfmt = f'{_T}{"Grid":24s} {"Probability":>12s} {"log(Z)":>12s}' print(colored(f'{_T}Model weights:', c)) print(colored(wfmt, c)) print(colored(f'{_T}{"-" * 52}', c)) for k in probs: lz = lnZ.get(k, float('nan')) print(colored(f'{_T}{k:24s} {probs[k]:>12.4f} {lz:>12.3f}', c)) print(colored(f'{_T}Elapsed time : {elapsed_time}', c))
[docs] def create_dir(path): """Create a directory.""" try: os.mkdir(path) except OSError: err_msg = f"Creation of the directory {path:s} failed. " err_msg += "It might already exist" print(colored(err_msg, 'red')) pass else: print(colored(f"Created the directory {path:s}", 'blue')) pass pass
[docs] def execution_time(start): """Calculate run execution time.""" end = time.time() - start weeks, rest0 = end // 604800, end % 604800 days, rest1 = rest0 // 86400, rest0 % 86400 hours, rest2 = rest1 // 3600, rest1 % 3600 minutes, seconds = rest2 // 60, rest2 % 60 if weeks == 0: if days == 0: if hours == 0: if minutes == 0: elapsed = f'{seconds:.2f} seconds' else: elapsed = f'{minutes:.0f} minutes' elapsed += f' and {seconds:.2f} seconds' else: elapsed = f'{hours:.0f} hours' elapsed += f', {minutes:.0f} minutes' elapsed += f' and {seconds:.2f} seconds' else: elapsed = f'{days:.0f} days' elapsed += f', {hours:.0f} hours' elapsed += f', {minutes:.0f} minutes' elapsed += f' and {seconds:.2f} seconds' else: elapsed = f'{weeks:.0f} weeks' elapsed += f', {days:.0f} days' elapsed += f', {hours:.0f} hours' elapsed += f', {minutes:.0f} minutes' elapsed += f' and {seconds:.2f} seconds' return elapsed
[docs] def get_noise_name(filt): """Retrieve parameter name for white noise.""" if filt == 'TYCHO_B_MvB': return 'BT' if filt == 'TYCHO_V_MvB': return 'VT' if filt == 'SPITZER_IRAC_36': return 'IRAC 36' if filt == 'SPITZER_IRAC_45': return 'IRAC 45' if filt == 'NGTS_I': return 'NGTS' if filt == 'WISE_RSR_W1': return 'W1' if filt == 'WISE_RSR_W2': return 'W2' if 'SDSS' in filt or 'PS1' in filt: return filt return filt.split('_')[-1]
[docs] def out_filler(samp, logdat, param, name, out, fmt='f', fixed=False, method='averaged'): """Fill up the output file.""" if method not in ['averaged', 'samples']: raise Exception('Method is wrong!') if fixed is False: try: xx, pdf = estimate_pdf(samp) cdf = estimate_cdf(samp, hdr=True) best, lo, up = credibility_interval_hdr(xx, pdf, cdf, sigma=1) _, lo3, up3 = credibility_interval_hdr(xx, pdf, cdf, sigma=3) except ValueError: wrn = f'HDR failed for parameter {param}, reverting to regular CI' wrn += ' calculation. Be sure to check the histograms afterwards' wrn += ' for diagnosis.' print(colored(wrn, 'red')) best, lo, up = credibility_interval(samp) _, lo3, up3 = credibility_interval(samp, alpha=3) out[f'best_fit_{method}'][param] = best logdat += f'{name}\t{best:.4{fmt}}\t' out[f'uncertainties_{method}'][param] = (best - lo, up - best) logdat += f'{up - best:.4{fmt}}\t{best - lo:.4{fmt}}\t' out[f'confidence_interval_{method}'][param] = (lo3, up3) logdat += f'{lo:.4{fmt}}\t{up:.4{fmt}}\n' else: out[f'best_fit_{method}'][param] = fixed out[f'uncertainties_{method}'][param] = np.nan out[f'confidence_interval_{method}'][param] = np.nan logdat += f'{name}\t{fixed:.4{fmt}}\t' logdat += '(FIXED)\n' return logdat
def get_max_from_kde(samp): """Get maximum of the given distribution.""" raise DeprecationWarning() kde = gaussian_kde(samp) xmin = samp.min() xmax = samp.max() xx = np.linspace(xmin, xmax, 1000) kde = kde(xx) best = xx[kde.argmax()] return best