import numpy as np
from astropy.cosmology import FlatwCDM
from scipy.stats import norm, multivariate_normal, skewnorm
from dessn.framework.simulation import Simulation
[docs]class SimpleSimulation(Simulation):
def __init__(self, num_supernova, dscale=0.08, alpha_c=2, mass=True, num_nodes=4, lowz=False, min_prob_ia=0.9999999,
disable_selection=False, kappa0=0.03, kappa1=0.03):
super().__init__()
self.alpha_c = alpha_c
self.dscale = dscale
self.min_prob_ia = min_prob_ia
self.num_calib = 1
self.disable_selection = disable_selection
self.kappa0 = kappa0
self.kappa1 = kappa1
if lowz:
self.skewnorm = True
self.mb_alpha = 5.87
self.mb_mean = 13.72
self.mb_width = 1.35
self.power = 0.5,
self.max_z_gen = 0.01
self.min_z_gen = 0.0004
else:
self.skewnorm = False
self.mb_alpha = 4
self.mb_mean = 23.14
self.mb_width = 0.5
self.power = 0.3
self.max_z_gen = 1.0
self.min_z_gen = 0.008
self.mass_scale = 1.0 if mass else 0.0
self.num_nodes = num_nodes
self.num_supernova = num_supernova
[docs] def get_name(self):
return "simple"
[docs] def get_truth_values(self):
return [
("Om", 0.3, r"$\Omega_m$"),
("Ol", 0.7, r"$\Omega_\Lambda$"),
("w", -1.0, r"$w$", True, -1.5, -0.5),
("alpha", 0.14, r"$\alpha$"),
("delta_alpha", 0, r"$\delta_\alpha$"),
("beta", 3.1, r"$\beta$"),
("delta_beta", 0, r"$\delta_\beta$"),
("mean_MB", -19.365, r"$\langle M_B \rangle$"),
("outlier_MB_delta", 2, r"$\delta M_B$"),
("outlier_dispersion", np.array([1.0, 1.0, 0.7]), r"$\sigma_{\rm out}^{%d}$"),
("mean_x1", np.zeros(self.num_nodes), r"$\langle x_1^{%d} \rangle$"),
("mean_c", np.zeros(self.num_nodes), r"$\langle c^{%d} \rangle$"),
("sigma_MB", 0.1, r"$\sigma_{\rm m_B}$"),
("sigma_x1", 1.0, r"$\sigma_{x_1}$"),
("sigma_c", 0.1, r"$\sigma_c$"),
("log_sigma_MB", np.log(0.1), r"$\log\sigma_{\rm m_B}$"),
("log_sigma_x1", np.log(0.5), r"$\log\sigma_{x_1}$"),
("log_sigma_c", np.log(0.1), r"$\log\sigma_c$"),
("kappa_c0", self.kappa0, r"$\kappa_{c0}"),
("kappa_c1", self.kappa1, r"$\kappa_{c1}"),
("alpha_c", self.alpha_c, r"$\alpha_c$"),
("dscale", self.dscale, r"$\delta(0)$"),
("dratio", 0.5, r"$\delta(\infty)/\delta(0)$"),
("intrinsic_correlation", np.identity(3), r"$\rho$"),
("calibration", np.zeros(self.num_calib), r"$\delta \mathcal{Z}_%d$"),
("deltas", np.zeros(4), r"$\Delta_{%d}$")
]
[docs] def get_systematic_labels(self):
return [""] * self.num_calib
[docs] def get_all_supernova(self, n_sne, cosmology_index=0):
truth = self.get_truth_values_dict()
self.logger.info("Generating simple data for %d supernova, with skewness %d..." % (n_sne, truth["alpha_c"]))
np.random.seed(cosmology_index)
self.logger.info("Generating for cosmology index %d" % cosmology_index)
cosmology = FlatwCDM(70.0, truth["Om"])
# Unwrap some values
alpha, beta, dscale, dratio = truth["alpha"], truth["beta"], truth["dscale"], truth["dratio"]
sim_mBx1c, obs_mBx1c_cov, obs_mBx1c, deta_dcalib = [], [], [], []
redshifts_all, redshift_pre_comp_all, p_high_masses_all, mask_all, mbs_all, = [], [], [], [], []
ia_probs_all = []
sim_x1s_all, sim_cs_all = [], []
# Assume constant population.
means = np.array([truth["mean_MB"], truth["mean_x1"][0], truth["mean_c"][0]])
sigmas = np.array([truth["sigma_MB"], truth["sigma_x1"], truth["sigma_c"]])
sigmas_mat = np.dot(sigmas[:, None], sigmas[None, :])
correlations = np.dot(truth["intrinsic_correlation"], truth["intrinsic_correlation"].T)
pop_cov = correlations * sigmas_mat
probs = []
outlier_MB_delta, outlier_dispersion = truth["outlier_MB_delta"], truth["outlier_dispersion"]
nn = 2000
# Generate 1000 at a time
while True:
redshifts = (np.random.uniform(self.min_z_gen, self.max_z_gen, nn) ** self.power)
dist_mod = cosmology.distmod(redshifts).value
redshift_pre_comp = 0.9 + np.power(10, 0.95 * redshifts)
p_high_masses = np.random.uniform(low=0.0, high=1.0, size=dist_mod.size) * self.mass_scale
ia_probs = np.random.uniform(low=self.min_prob_ia, high=1.0, size=nn)
contaminations = np.random.random(nn) > ia_probs
for zz, mu, p, contamination, red in zip(redshift_pre_comp, dist_mod, p_high_masses, contaminations, redshifts):
while True:
MB, x1, c = np.random.multivariate_normal(means, pop_cov)
if np.random.random() < norm.cdf(truth["alpha_c"] * (c - truth["mean_c"][0]) / truth["sigma_c"], 0, 1):
skew_prob = norm.logcdf(truth["alpha_c"] * (c - truth["mean_c"][0]) / truth["sigma_c"], 0, 1)
break
probs.append(multivariate_normal.logpdf([MB, x1, c], mean=means, cov=pop_cov) + skew_prob)
if contamination:
MB -= outlier_MB_delta
adjust = np.random.normal(loc=0, scale=np.sqrt(outlier_dispersion**2 - sigmas**2), size=3)
MB += adjust[0]
x1 += adjust[1]
c += adjust[2]
mass_correction = dscale * (1.9 * (1 - dratio) / zz + dratio)
mb = MB + mu - alpha * x1 + beta * c - mass_correction * p
vector = np.array([mb, x1, c])
# Add intrinsic scatter to the mix
diag = np.array([0.04, 0.2, 0.03])**2
cov = np.diag(diag)
obs_mBx1c_cov.append(cov)
sim_mBx1c.append(vector)
cov2 = cov.copy()
cov2[2, 2] += (self.kappa0 + self.kappa1 * red)**2
vector += np.random.multivariate_normal([0, 0, 0], cov2)
obs_mBx1c.append(vector)
deta_dcalib.append(np.zeros((3, self.num_calib)))
redshifts_all += redshifts.tolist()
redshift_pre_comp_all += redshift_pre_comp.tolist()
p_high_masses_all += p_high_masses.tolist()
ia_probs_all += ia_probs.tolist()
mbs = np.array([o[0] for o in sim_mBx1c[-nn:]])
sim_x1 = np.array([o[1] for o in sim_mBx1c[-nn:]])
sim_c = np.array([o[2] for o in sim_mBx1c[-nn:]])
vals = np.random.uniform(size=mbs.size)
if self.disable_selection:
pdfs = np.ones(mbs.shape)
else:
if not self.skewnorm:
pdfs = 1 - norm.cdf(mbs, self.mb_mean, self.mb_width)
pdfs /= pdfs.max()
else:
pdfs = skewnorm.pdf(mbs, self.mb_alpha, self.mb_mean, self.mb_width)
pdfs /= pdfs.max()
mask = vals < pdfs
mbs_all += mbs.tolist()
sim_x1s_all += sim_x1.tolist()
sim_cs_all += sim_c.tolist()
mask_all += mask.tolist()
self.logger.debug("Have %d passed out of required %d sne, generated %d"
% (np.array(mask_all).sum(), n_sne, len(mask_all)))
if np.array(mask_all).sum() >= n_sne:
break
indexes = np.array(mask_all).cumsum()
cut_index = np.where(indexes == n_sne)[0][0] + 1
mask_all = np.array(mask_all)
self.logger.debug("Generated %d objects out of %d passed, %d percent" % (mask_all.sum(), mask_all.size, 100 * (mask_all.sum() / mask_all.size)))
return {
"n_sne": n_sne,
"obs_mBx1c": np.array(obs_mBx1c[:cut_index]),
"obs_mBx1c_cov": np.array(obs_mBx1c_cov[:cut_index]),
"deta_dcalib": np.array(deta_dcalib[:cut_index]),
"redshifts": np.array(redshifts_all[:cut_index]),
"shift_deltas": np.zeros(cut_index),
"masses": np.array(p_high_masses_all[:cut_index]),
"existing_prob": np.array(probs[:cut_index]),
"sim_apparents": np.array(mbs_all[:cut_index]),
"sim_stretches": np.array(sim_x1s_all[:cut_index]),
"sim_colours": np.array(sim_cs_all[:cut_index]),
"passed": np.array(mask_all[:cut_index]),
"prob_ia": np.array(ia_probs_all[:cut_index])
}
[docs] def get_approximate_correction(self):
if not self.skewnorm:
print("ccdf approx")
return False, np.array([self.mb_mean, self.mb_width, 0.1, 1]), 0.0001 * np.eye(4), 0
else:
print("skewnorm approx")
return True, np.array([self.mb_mean, self.mb_width, self.mb_alpha, 1.0]), 0.0001 * np.eye(4), 0
[docs] def get_systematic_names(self):
return [r"{%d}" % i for i in range(self.num_calib)]