"""Catalog-agnostic raytracing module
"""
import os
from abc import ABC, abstractmethod
import numpy as np
import pandas as pd
import n2j.trainval_data.utils.raytracing_utils as ru
__all__ = ['BaseRaytracer']
[docs]class BaseRaytracer(ABC):
"""Base class for raytracer tools. Not to be instantiated on its own.
Child classes inherit catalog-agnostic methods from this class.
"""
def __init__(self,
in_dir: str = '.',
out_dir: str = '.',
debug: bool = False,):
"""
Parameters
----------
out_dir : str
destination folder
test : bool
test mode
"""
self.in_dir = in_dir
self.out_dir = out_dir
if not os.path.exists(self.out_dir):
os.makedirs(self.out_dir, exist_ok=True)
self.debug = debug
# Paths for output data files
self._define_paths()
# Optionally overridden
self.halo_satisfies = []
[docs] def _define_paths(self):
"""Define output file paths and path formats
"""
self.sightlines_path = os.path.join(self.out_dir, 'sightlines.npy')
self.Y_path = os.path.join(self.out_dir, 'Y.csv')
self.halo_path_fmt = os.path.join(self.out_dir,
'halos_los={0:07d}_id={1:012d}.npy')
self.k_map_fmt = os.path.join(self.out_dir,
'k_map_los={0:07d}.npy')
self.g1_map_fmt = os.path.join(self.out_dir,
'g1_map_los={0:07d}.npy')
self.g2_map_fmt = os.path.join(self.out_dir,
'g2_map_los={0:07d}.npy')
self.k_samples_fmt = os.path.join(self.out_dir,
'k_samples_los={0:07d}.npy')
self.k_samples_map_fmt = os.path.join(self.out_dir,
'k_map_los={0:07d}_sample={1:04d}.npy')
self.uncalib_path = os.path.join(self.out_dir, 'uncalib.csv')
# out_dir of a separate run that had n_kappa_samples > 0
# that can be used to estimate the mapping between weighted sum of halo
# masses and mean of kappa samples.
@abstractmethod
[docs] def get_pointings_iterator(self):
return NotImplementedError
@abstractmethod
[docs] def get_halos_iterator(self):
return NotImplementedError
[docs] def apply_calibration(self):
"""Subtract off the extra mass added when we raytraced through
parameterized halos
"""
Y = pd.DataFrame(np.load(self.sightlines_path),
columns=self.pointings_cols) # init with pointings df
uncalib = pd.read_csv(self.uncalib_path,
index_col=None,
).drop_duplicates('idx').sort_values(by=['idx'])
mean_kappas = self.get_mean_kappas()
# To the WL quantities, add our raytracing and subtract mean mass
final_kappa = uncalib['kappa'].values + Y['kappa'].values - mean_kappas
final_gamma1 = uncalib['gamma1'].values + Y['gamma1'].values
final_gamma2 = uncalib['gamma2'].values + Y['gamma2'].values
# Also log the mean kappa contribution of halos, for fitting later
Y['mean_kappa'] = mean_kappas
Y['final_kappa'] = final_kappa
Y['final_gamma1'] = final_gamma1
Y['final_gamma2'] = final_gamma2
# Update the Y df
Y[self.Y_cols].to_csv(self.Y_path, index=None)
[docs] def get_mean_kappas(self):
"""Get the mean kappa contribution of LOS halos
"""
if self.n_kappa_samples > 0:
return self.load_mean_kappas_from_file()
else:
return self.estimate_mean_kappas()
[docs] def load_mean_kappas_from_file(self):
"""Load the mean kappa contribution of LOS halos from the saved files
from explicit computation
"""
pointings = pd.DataFrame(np.load(self.sightlines_path),
columns=self.pointings_cols)
n_sightlines = pointings.shape[0]
mean_kappas = np.empty(n_sightlines)
# Compute mean kappa of halos in each sightline
for los_i in range(n_sightlines):
samples = np.load(self.k_samples_fmt.format(los_i))
samples = samples[samples < 0.5] # remove overdense outliers
mean_kappas[los_i] = np.mean(samples)
return mean_kappas
[docs] def estimate_mean_kappas(self):
"""Estimate the mean kappas from a set of "training" pairs generated in
another run
"""
# Fit a model using the kappa_sampling run as the training data
uncalib_train_path = os.path.join(self.kappa_sampling_dir, 'uncalib.csv')
X_train = pd.read_csv(uncalib_train_path,
usecols=['idx', 'weighted_mass_sum'],
index_col=None,
).drop_duplicates('idx').sort_values(by=['idx'])
X_train = X_train['weighted_mass_sum'].values.reshape(-1, 1)
Y_train_path = os.path.join(self.kappa_sampling_dir, 'Y.csv')
Y_train = pd.read_csv(Y_train_path,
usecols=['mean_kappa'],
index_col=None,
).values
fit_model = ru.fit_gp(X_train, Y_train)
# Predict on this run
X_test = pd.read_csv(self.uncalib_path,
usecols=['idx', 'weighted_mass_sum'],
index_col=None,
).drop_duplicates('idx').sort_values(by=['idx'])
X_test = X_test['weighted_mass_sum'].values.reshape(-1, 1)
mean_kappas = ru.approx_mean_kappa(fit_model, X_test, seed=self.seed)
if self.debug:
import matplotlib.pyplot as plt
plt.scatter(X_train, Y_train, color='b', label='train')
plt.scatter(X_test, mean_kappas, color='r', label='test')
plt.legend()
plt.show()
plt.savefig('mean_kappa_interp_hp={:d}.png'.format(self.healpix))
return mean_kappas