Source code for dessn.framework.simulations.snana

import numpy as np
import os
import inspect
import pickle

from scipy.interpolate import interp1d
from scipy.stats import binned_statistic

from dessn.framework.simulation import Simulation
from dessn.general.pecvelcor import get_sigma_mu_pecvel
from dessn.framework.simulations.selection_effects import des_sel, lowz_sel


[docs]class SNANASimulation(Simulation): def __init__(self, num_supernova, sim_name, num_nodes=4, use_sim=False, cov_scale=1.0, global_calib=13, shift=None, type="G10", kappa=0.0, bias_cor=True, zlim=None, add_pecv=True, add_disp=False): super().__init__() self.simulation_name = sim_name self.type = type self.global_calib = global_calib this_dir = os.path.dirname(os.path.abspath(inspect.stack()[0][1])) self.data_folder = this_dir + "/snana_data/%s/" % self.simulation_name assert os.path.exists(self.data_folder), "Cannot find folder %s" % self.data_folder self.use_sim = use_sim self.num_nodes = num_nodes self.systematic_labels = None self.kappa = kappa self.num_supernova = num_supernova self.cov_scale = cov_scale self.manual_selection = None self.shift = shift self.get_systematic_names() self.num_calib = len(self.systematic_labels) self.bias_cor = bias_cor self.zlim = zlim self.add_disp = add_disp self.add_pecv = add_pecv if self.num_calib == 0: self.num_calib = 1
[docs] def get_correction(self, cov_scale=1.0): version = self.simulation_name.lower().split("_")[-1] if not version.startswith("v"): version = None if "_des" in self.simulation_name.lower(): self.logger.info("Getting DES correction for sim %s" % self.simulation_name) return des_sel(cov_scale=cov_scale, shift=self.shift, type=self.type, kappa=self.kappa, zlim=self.zlim, version=version) elif "_lowz" in self.simulation_name.lower(): self.logger.info("Getting LOWZ correction for sim %s" % self.simulation_name) return lowz_sel(cov_scale=cov_scale, shift=self.shift, type=self.type, kappa=self.kappa, zlim=self.zlim, version=version) else: raise ValueError("Cannot find des or lowz in your sim name, unsure which selection function to use!")
[docs] def get_name(self): return self.simulation_name
[docs] def get_truth_values(self): return [ ("Om", 0.3, r"$\Omega_m$"), ("Ol", 0.7, r"$\Omega_\Lambda$"), ("w", -1.0, r"$w$", True, -1.5, -0.5), ("alpha", 0.15, r"$\alpha$"), ("alpha_z", 0.0, r"$\alpha_z$"), ("beta", 3.1, r"$\beta$"), ("beta_z", 0.0, r"$\beta_z$"), ("mean_MB", -19.365, r"$\langle M_B \rangle$"), ("mean_x1", np.zeros(self.num_nodes), r"$\langle x_1^{%d} \rangle$"), ("mean_c", np.zeros(self.num_nodes), r"$\langle c^{%d} \rangle$"), ("sigma_MB", 0.1, r"$\sigma_{\rm m_B}$"), ("sigma_x1", 1.0, r"$\sigma_{x_1}$"), ("sigma_c", 0.1, r"$\sigma_c$"), ("log_sigma_MB", np.log(0.1), r"$\log\sigma_{\rm m_B}$"), ("log_sigma_x1", np.log(0.5), r"$\log\sigma_{x_1}$"), ("log_sigma_c", np.log(0.1), r"$\log\sigma_c$"), ("alpha_c", 0, r"$\alpha_c$"), ("dscale", 0, r"$\delta(0)$"), ("dratio", 0.5, r"$\delta(\infty)/\delta(0)$"), ("intrinsic_correlation", np.identity(3), r"$\rho$"), ("calibration", np.zeros(self.num_calib), r"$\delta \mathcal{Z}_%d$") ]
[docs] def get_systematic_names(self): if self.systematic_labels is None: this_dir = os.path.dirname(os.path.abspath(inspect.stack()[0][1])) filename = this_dir + "/snana_data/%s/sys_names.pkl" % self.simulation_name with open(filename, 'rb') as f: names = pickle.load(f) self.systematic_labels = names return self.systematic_labels
[docs] def get_all_supernova(self, n_sne, cosmology_index=0): self.logger.info("Getting SNANA data from %s" % self.data_folder) supernovae_files = [np.load(self.data_folder + "/" + f) for f in os.listdir(self.data_folder) if f.startswith("all")] supernovae = np.vstack(tuple(supernovae_files)) passed = supernovae > 100 mags = supernovae - 100 * passed.astype(np.int) res = { "sim_apparents": mags, "passed": passed } return res
[docs] def get_bias_cor(self, des=True): print("Getting biascor for des=%s" % des) if des: file = self.data_folder + "../DES3YR_DES_BHMEFF_AM%s/passed_0.npy" else: file = self.data_folder + "../DES3YR_LOWZ_BHMEFF_%s/passed_0.npy" models = ["C11", "G10"] bine = 30 means = [] for model in models: data = np.load(file % model) z = data[:, 1] c_obs = data[:, 8] c_true = data[:, 5] diff = c_obs - c_true mean, bine, _ = binned_statistic(z, diff, bins=bine) means.append(mean) std, _, _ = binned_statistic(z, diff, statistic=lambda x: np.std(x) / np.sqrt(len(x)), bins=bine) middle = np.array(means) binc = 0.5 * (bine[1:] + bine[:-1]) return binc, models[0], middle[0]
[docs] def get_disp(self, des=True): print("Getting dispersion for des=%s" % des) if des: file = self.data_folder + "../DES3YR_DES_BHMEFF_AM%s/passed_0.npy" else: file = self.data_folder + "../DES3YR_LOWZ_BHMEFF_%s/passed_0.npy" models = ["C11", "G10"] bine = 10 means = [] for model in models: data = np.load(file % model) z = data[:, 1] mb_obs = data[:, 6] mb_true = data[:, 3] c_obs = data[:, 8] c_true = data[:, 5] c_std = np.sqrt(data[:, 12 + 8]) rms = np.abs(c_obs - c_true) mean_rms, bine, _ = binned_statistic(z, rms, bins=bine) mean_cstd, _, _ = binned_statistic(z, c_std, bins=bine) extra = mean_rms**2 - mean_cstd**2 print(model, extra) means.append(extra) final = means[0] - means[1] final = np.array([max(0, i) for i in final]) binc = 0.5 * (bine[1:] + bine[:-1]) print("EXTRA ", np.sqrt(final), mean_rms) return binc, final
[docs] def get_passed_supernova(self, n_sne, cosmology_index=0): filename = self.data_folder + "passed_%d.npy" % cosmology_index assert os.path.exists(filename), "Cannot find file %s, do you have this realisations?" % filename supernovae = np.load(filename) self.logger.info("%s SN in %s" % (supernovae.shape[0], filename)) if self.zlim is not None: redshifts = supernovae[:, 1] self.logger.info("Enforcing zlim of %0.2f" % self.zlim) mask = redshifts < self.zlim self.logger.info("%d supernova out of %d passed the redshift cut" % (mask.sum(), supernovae.shape[0])) supernovae = supernovae[mask, :] if n_sne != -1: supernovae = supernovae[:n_sne, :] else: n_sne = supernovae.shape[0] self.logger.info("All SN requested: found %d SN" % n_sne) cids = supernovae[:, 0] redshifts = supernovae[:, 1] masses = supernovae[:, 2] s_ap = supernovae[:, 3] s_st = supernovae[:, 4] s_co = supernovae[:, 5] apparents = supernovae[:, 6] stretches = supernovae[:, 7] colours = supernovae[:, 8] # bias_mB = supernovae[:, 9] # bias_x1 = supernovae[:, 10] # bias_c = supernovae[:, 11] extra_uncert = get_sigma_mu_pecvel(redshifts) obs_mBx1c_cov, obs_mBx1c, deta_dcalibs = [], [], [] shift_amount = np.zeros(redshifts.shape) shift_deltas = np.zeros(redshifts.shape) extra_colour_add = np.zeros(redshifts.shape) if self.add_disp: disp_z, disp_ratio = self.get_disp("_DES" in self.simulation_name) extra_colour_add = interp1d(disp_z, disp_ratio, bounds_error=False, fill_value=(disp_ratio[0], disp_ratio[-1]))(redshifts) if self.bias_cor: # apparents -= bias_mB # stretches -= bias_x1 # colours -= bias_c cor_z, cor_models, cor_means = self.get_bias_cor("_DES" in self.simulation_name) # shift0 = cor_means[0] # shift_amount = interp1d(cor_z, shift0, bounds_error=False, fill_value=(shift0[0], shift0[-1]))(redshifts) # delta = cor_means[1] - cor_means[0] # shift_deltas = interp1d(cor_z, delta, bounds_error=False, fill_value=(delta[0], delta[-1]))(redshifts) shift_deltas = interp1d(cor_z, cor_means, bounds_error=False, fill_value=(cor_means[0], cor_means[-1]))(redshifts) for i, (mb, x1, c, smb, sx1, sc, eu, sa) in enumerate(zip(apparents, stretches, colours, s_ap, s_st, s_co, extra_uncert, shift_amount)): if self.use_sim: cov = np.diag(np.array([0.04, 0.1, 0.04]) ** 2) vector = np.array([smb, sx1, sc]) + np.random.multivariate_normal([0, 0, 0], cov) else: vector = np.array([mb, x1, c - sa]) cov = supernovae[i, 12:12 + 9].reshape((3, 3)) cov[2, 2] += extra_colour_add[i] if self.add_pecv: cov[0, 0] += eu**2 calib = supernovae[i, 12+9:].reshape((3, -1)) obs_mBx1c_cov.append(cov) obs_mBx1c.append(vector) deta_dcalibs.append(calib) covs = np.array(obs_mBx1c_cov) deta_dcalibs = np.array(deta_dcalibs) obs_mBx1c = np.array(obs_mBx1c) result = { "cids": cids, "n_sne": n_sne, "obs_mBx1c": obs_mBx1c, "obs_mBx1c_cov": covs, "deta_dcalib": deta_dcalibs, "redshifts": redshifts, "masses": masses, "sim_apparents": s_ap, "sim_stretches": s_st, "sim_colours": s_co, "shift_deltas": shift_deltas, "prob_ia": np.ones(n_sne) * 1.0 } return result
[docs] def get_approximate_correction(self): if self.manual_selection is None: self.manual_selection = self.get_correction(cov_scale=self.cov_scale) return self.manual_selection