Source code for MiraTitanHMFemulator.MiraTitanHMFemulator

import numpy as np
import os
import inspect

from scipy.interpolate import RectBivariateSpline

from . import GP_matrix as GP


class Emulator:
    """The Mira-Titan Universe emulator for the halo mass function.

    Attributes
    -----------------
    param_limits : dictionary
        Lower and upper limits of the cosmological parameters.
    z_arr : array
        Redshifts of the emulator output.
    """
    # Cosmology parameters
    param_names = ['Ommh2', 'Ombh2', 'Omnuh2', 'n_s', 'h', 'sigma_8', 'w_0', 'w_b']
    param_limits = {
        'Ommh2': (.12, .155),
        'Ombh2': (.0215, .0235),
        'Omnuh2': (0, .01),
        'n_s': (.85, 1.05),
        'h': (.55, .85),
        'sigma_8': (.7, .9),
        'w_0': (-1.3, -.7),
        'w_b': (.3, 1.3),
    }
    # Emulator redshifts
    z_arr = np.array([2.02, 1.61, 1.01, 0.656, 0.434, 0.242, 0.101, 0.0])
    z_arr_asc = z_arr[::-1]



[docs] def __init__(self): """No arguments are required when initializing an `Emulator` instance. Upon initialization, a bunch of matrix operations are performed which takes a few seconds.""" data_path = os.path.join(os.path.dirname(os.path.abspath(inspect.stack()[0][1])), 'data') # Basis functions and PCA standardization parameters # They have different lengths so they are stored in separate files self.__PCA_means, self.__PCA_transform = [], [] for i in range(len(self.z_arr)): filename = os.path.join(data_path, 'PCA_mean_std_transform_%d.npy'%i) _tmp = np.load(filename) self.__PCA_means.append(_tmp[0,:]) self.__PCA_transform.append(_tmp[1:,:]) self.__GP_means = np.load(os.path.join(data_path, 'GP_params_mean.npy')) self.__GP_std = np.load(os.path.join(data_path, 'GP_params_std.npy')) self.__facs = np.load(os.path.join(data_path, 'facs.npy')) # GP input data params_design = np.load(os.path.join(data_path, 'params_design_w0wb.npy')) hyper_params = np.load(os.path.join(data_path, 'hyperparams.npy')) input_means = np.load(os.path.join(data_path, 'means.npy')) cov_mat_data = np.load(os.path.join(data_path, 'cov_n.npy')) N_PC = input_means.shape[2] self.__GPreg = [GP.GaussianProcess(params_design, input_means[z_id], cov_mat_data[z_id], hyper_params[z_id,:N_PC], hyper_params[z_id,N_PC:].reshape(N_PC,-1)) for z_id in range(len(self.z_arr))]
[docs] def predict(self, requested_cosmology, z, m, get_errors=True, N_draw=1000): """Emulate the halo mass function dn/dlnM for the desired set of cosmology parameters, redshifts, and masses. :param requested_cosmology: The set of cosmology parameters for which the mass function is requested. The parameters are `Ommh2`, `Ombh2`, `Omnuh2`, `n_s`, `h`, `sigma_8`, `w_0`, `w_a`. :type requested_cosmology: dict :param z: The redshift(s) for which the mass function is requested. :type z: float or array :param m: The mass(es) for which the mass function is requested, in units [Msun/h]. :type z: float or array :param get_errors: Whether or not to compute error estimates (faster in the latter case). Default is `True`. :type get_errors: bool, optional :param N_draw: How many sample mass functions to draw when computing the error estimate. Applies only if `get_errors` is `True`. :type N_draw: int, optional Returns ------- HMF: array_like The mass function dN/dlnM in units[(h/Mpc)^3] and with shape [len(z), len(m)]. HMF_rel_err: array_like The relative error on dN/dlnM, with shape [len(z), len(m)]. Returns 0 if `get_errors` is `False`. For requested redshifts that are between the redshifts for which the underlying emulator is defined, the weighted errors from the neighboring redshifts are added in quadrature. """ # Validate requested z and m if np.any(z<0): raise ValueError("z must be >= 0") if np.any(z>self.z_arr_asc[-1]): raise ValueError("z must be <= 2.02") if np.any(m<1e13): raise ValueError("m must be >= 1e13") if np.any(m>1e16): raise ValueError("m must be <= 1e16") z = np.atleast_1d(z) m = np.atleast_1d(m) # Do we want error estimates? if not get_errors: N_draw = 0 # Call the actual emulator emu_dict = self.predict_raw_emu(requested_cosmology, N_draw=N_draw) # Set up interpolation grids HMF_interp_input = np.log(np.nextafter(0,1)) * np.ones((len(self.z_arr_asc), 3001)) for i,emu_z in enumerate(self.z_arr_asc): HMF_interp_input[i,:len(emu_dict[emu_z]['HMF'])] = np.log(emu_dict[emu_z]['HMF']) HMF_interp = RectBivariateSpline(self.z_arr_asc, np.linspace(13, 16, 3001), HMF_interp_input, kx=1, ky=1) if get_errors: HMFerr_interp_input = np.zeros((len(self.z_arr_asc), 3001)) for i,emu_z in enumerate(self.z_arr_asc): HMFerr_interp_input[i,:len(emu_dict[emu_z]['HMF_std'])] = emu_dict[emu_z]['HMF_std'] HMFerr_interp = RectBivariateSpline(self.z_arr_asc, np.linspace(13, 16, 3001), HMFerr_interp_input, kx=1, ky=1) # Call interpolation at input z and m HMF_out = np.exp(HMF_interp(z, np.log10(m))) HMFerr_out = np.zeros((len(z), len(m))) if get_errors: # First interpolate to requested m HMFerr_at_m = HMFerr_interp(self.z_arr_asc, np.log10(m)) # Now add weighted errors in quadrature for z_id,this_z in enumerate(z): z_id_nearest = np.argsort(np.abs(self.z_arr_asc-this_z))[:2] Delta_z = (this_z - self.z_arr_asc[z_id_nearest])/np.diff(self.z_arr_asc[z_id_nearest]) HMFerr_out[z_id] = np.sqrt((HMFerr_at_m[z_id_nearest[0],:]*Delta_z[0])**2 + (HMFerr_at_m[z_id_nearest[1],:]*Delta_z[1])**2) return HMF_out, HMFerr_out
[docs] def predict_raw_emu(self, requested_cosmology, N_draw=0, return_draws=False): """Emulates the halo mass function dn/dlnM for the desired set of cosmology parameters and returns an output dictionary. This function allows the user to have more fine-grained control over the raw emulator output. :param requested_cosmology: The set of cosmology parameters for which the mass function is requested. The parameters are `Ommh2`, `Ombh2`, `Omnuh2`, `n_s`, `h`, `sigma_8`, `w_0`, `w_a`. :type requested_cosmology: dict :param N_draw: The emulator can provide error estimates for its mass function prediction. If `N_draw` is provided, the emulator also provides `N_draw` samples of the mass function and the standard deviation of those estimates. :type N_draw: int, optional :param return_draws: If True, also return the mass function draws from the emulator posterior distribution. Default is False to save memory. Applies only if `N_draw` is > 0. :type return_draws: bool, optional :returns: A dictionary containing all the emulator output. A `readme` key describes the units: The mass functions are dn/dlnM [(h/Mpc)^3]. The output is organized by redshift -- each dictionary key corresponds to one redshift. For each redshift, the output contains the following data: redshift: float The redshift of the emulator output. log10_M: array_like Decadic logarithm log10(mass [Msun/h]) for which the mass function is emulated. HMF: array_like The mass function corresponding to the mean emulated parameters. HMF_mean: array_like, optional The mass function corresponding to the mean of the mass functions drawn from the emulator posterior distribution. Only if `N_draw` is > 0. HMF_std: array_like, optional The (relative) standard deviation in the mass function draws from the emulator posterior distribution. Should be used as a relative error on the mass function. Only if `N_draw` is > 0. HMF_draws: ndarray, optional Mass function realizations drawn from the emulator posterior distribution. The return values `HMF_mean` and `HMF_std` are computed from these. Only if `N_draw` is > 0 and if `return_draws` is True. :rtype: dict """ # Validate and normalize requested cosmology requested_cosmology_normed = self.__normalize_params(requested_cosmology) output = {'Units': "log10_M is log10(Mass in [Msun/h]), HMFs are given in dn/dlnM [(h/Mpc)^3]"} log10_M_full = np.linspace(13, 16, 3001) for i,emu_z in enumerate(self.z_arr): output[emu_z] = {'redshift': emu_z, 'log10_M': log10_M_full[:len(self.__PCA_means[i])],} # Call the GP wstar, wstar_covmat = self.__GPreg[i].predict(requested_cosmology_normed) wstar_covmat*= self.__facs[i] # De-standardize to GP input PC_weight = wstar * self.__GP_std[i] + self.__GP_means[i] # PCA transform output[emu_z]['HMF'] = np.exp(np.dot(PC_weight, self.__PCA_transform[i]) + self.__PCA_means[i]) # Draw parameter realizations if N_draw>0: wstar_draws = np.random.multivariate_normal(wstar, wstar_covmat, N_draw) PC_weight_draws = wstar_draws * self.__GP_std[i] + self.__GP_means[i] HMF_draws = np.exp(np.dot(PC_weight_draws, self.__PCA_transform[i]) + self.__PCA_means[i]) # Replace infinites with nan to be able to get mean and std idx = [np.all(np.isfinite(HMF_draws[j])) for j in range(N_draw)] output[emu_z]['HMF_draws'] = HMF_draws[idx] # Compute statistics output[emu_z]['HMF_mean'] = np.mean(output[emu_z]['HMF_draws'], axis=0) output[emu_z]['HMF_std'] = np.std(output[emu_z]['HMF_draws']/output[emu_z]['HMF_mean'], axis=0) if not return_draws: del output[emu_z]['HMF_draws'] return output
def __translate_params(self, cosmo_dict): """Copy cosmology parameter variables defined without underscores to variable names with underscore, which is the default naming scheme. If both variable flavors are provided, check for consistency. :param cosmo_dict: The input set of cosmology parameters. :type cosmo_dict: dict :return: Whether duplicate variables are consistent or not. :rtype: bool """ for var_w, var_wo in zip(['w_0', 'w_a', 'n_s', 'sigma_8'], ['w0', 'wa', 'ns', 'sigma8']): if var_wo in cosmo_dict.keys(): if var_w in cosmo_dict.keys(): if not np.isclose(cosmo_dict[var_wo], cosmo_dict[var_w]): return False else: cosmo_dict[var_w] = cosmo_dict[var_wo] return True
[docs] def validate_params(self, cosmo_dict): """Validate that a given input cosmology dictionary is complete and within the bounds of the Mira-Titan Universe design. Note that this function is only there for user convenience and is not called by the emulator code (which will raise an error and explain its cause). :param cosmo_dict: The set of cosmology parameters to validate. :type cosmo_dict: dict :return: Whether the provided dictionary is valid or not. :rtype: bool """ # Translate parameter names where necessary if not self.__translate_params(cosmo_dict): return False # Validate joint limit in w_0, w_a, then add w_b for param in ['w_0', 'w_a']: if param not in cosmo_dict.keys(): return False if cosmo_dict['w_a'] > -cosmo_dict['w_0']: return False cosmo_dict['w_b'] = (-cosmo_dict['w_0'] - cosmo_dict['w_a'])**.25 # Check all keys and parameter ranges for param in self.param_limits.keys(): if param not in cosmo_dict.keys(): return False if cosmo_dict[param]<self.param_limits[param][0]: return False if cosmo_dict[param]>self.param_limits[param][1]: return False return True
def __normalize_params(self, cosmo_dict): """Check that a given input cosmology dictionary is complete and within the bounds of the Mira-Titan Universe design and return the normalized cosmological parameter array.""" # Translate parameter names where necessary if not self.__translate_params(cosmo_dict): raise ValueError("At least one variable is provided twice but with inconsistent values") # Validate joint limit in w_0, w_a, then add w_b for param in ['w_0', 'w_a']: if param not in cosmo_dict.keys(): raise KeyError("You did not provide %s"%param) if cosmo_dict['w_a'] > -cosmo_dict['w_0']: raise ValueError("w_0 + w_a must be <0. You have w_0 %.4f and w_a %.4f"%(cosmo_dict['w_0'], cosmo_dict['w_a'])) cosmo_dict['w_b'] = (-cosmo_dict['w_0'] - cosmo_dict['w_a'])**.25 # Check keys and parameter ranges for param in self.param_limits.keys(): if param not in cosmo_dict.keys(): raise KeyError("You did not provide %s"%param) if cosmo_dict[param]<self.param_limits[param][0]: raise ValueError("Parameter %s is %.4f but must be >= %.4f"%(param, cosmo_dict[param], self.param_limits[param][0])) if cosmo_dict[param]>self.param_limits[param][1]: raise ValueError("Parameter %s is %.4f but must be <= %.4f"%(param, cosmo_dict[param], self.param_limits[param][1])) # Normalize the parameters normed_p = np.empty(len(self.param_limits.keys())) for i,param in enumerate(self.param_names): normed_p[i] = (cosmo_dict[param] - self.param_limits[param][0]) / (self.param_limits[param][1] - self.param_limits[param][0]) return normed_p