Source code for master_thesis_code.bayesian_inference.bayesian_statistics

"""Hubble constant posterior evaluation.

:class:`BayesianStatistics` loads saved Cramér-Rao bounds and orchestrates the
full Hubble-constant posterior evaluation using the real GLADE galaxy catalog,
simulation-based :class:`~master_thesis_code.bayesian_inference.simulation_detection_probability.SimulationDetectionProbability`,
full Fisher-matrix covariance, and multiprocessing.

Invoked via ``main.py:evaluate()`` / ``--evaluate`` CLI flag.
Output is written to ``simulations/posteriors/`` as JSON.
"""

import csv
import json
import logging
import math
import multiprocessing as mp
import os
import time
from typing import Any

import numpy as np
import numpy.typing as npt
import pandas as pd
from scipy.integrate import dblquad, fixed_quad, quad
from scipy.stats import multivariate_normal, norm

from master_thesis_code.bayesian_inference.simulation_detection_probability import (
    SimulationDetectionProbability,
)
from master_thesis_code.constants import (
    CRAMER_RAO_BOUNDS_OUTPUT_PATH,
    INJECTION_DATA_DIR,
    PREPARED_CRAMER_RAO_BOUNDS_PATH,
    SNR_THRESHOLD,
    H,
)
from master_thesis_code.cosmological_model import LamCDMScenario, Model1CrossCheck
from master_thesis_code.datamodels.detection import (
    Detection,
    _sky_localization_uncertainty,
)
from master_thesis_code.galaxy_catalogue.glade_completeness import GladeCatalogCompleteness
from master_thesis_code.galaxy_catalogue.handler import (
    GalaxyCatalogueHandler,
    HostGalaxy,
)
from master_thesis_code.physical_relations import (
    comoving_volume_element,
    dist,
    dist_to_redshift,
    dist_vectorized,
    get_redshift_outer_bounds,
)

_LOGGER = logging.getLogger()

DEFAULT_GALAXY_Z_ERROR = 0.0015
GALAXY_LIKELIHOODS = "galaxy_likelihoods"
ADDITIONAL_GALAXIES_WITHOUT_BH_MASS = "additional_galaxies_without_bh_mass"

FRACTIONAL_LUMINOSITY_DISTANCE_ERROR_THRESHOLD = 0.10

# Fixed-quad order for D(h) precomputation
_DH_QUAD_ORDER: int = 100


[docs] def precompute_completion_denominator( h_values: list[float], detection_probability_obj: SimulationDetectionProbability, Omega_m: float, Omega_DE: float, *, quad_n: int = _DH_QUAD_ORDER, ) -> dict[float, float]: """Precompute the completion-term denominator D(h) for each h value. Gray et al. (2020), arXiv:1908.06050, Eq. A.19: Denominator integrates P_det * dVc/dz over the full detectable volume. .. math:: D(h) = \\int_{z_{\\min}}^{z_{\\max}(h)} P_{\\det}(d_L(z,h)) \\frac{dV_c}{dz\\,d\\Omega}\\, dz where ``z_max(h)`` is the redshift corresponding to the P_det grid's maximum ``d_L`` at the given h. Args: h_values: List of Hubble parameter values to evaluate. detection_probability_obj: SimulationDetectionProbability instance (must have ``get_dl_max`` and ``detection_probability_without_bh_mass_interpolated_zero_fill``). Omega_m: Matter density parameter. Omega_DE: Dark energy density parameter. quad_n: Gauss-Legendre quadrature order (default 100). Returns: Dict mapping h -> D(h) in units of Mpc^3/sr. """ D_h_table: dict[float, float] = {} for h in h_values: dl_max = detection_probability_obj.get_dl_max(h) z_max = dist_to_redshift(dl_max, h=h) z_min = 1e-6 def _denom_integrand( z: npt.NDArray[np.float64], _h: float = h, ) -> npt.NDArray[np.float64]: d_L: npt.NDArray[np.float64] = np.asarray( dist_vectorized(z, h=_h), dtype=np.float64 ) # Gpc phi = np.zeros_like(z) # marginalized; value does not matter theta = np.zeros_like(z) p_det = detection_probability_obj.detection_probability_without_bh_mass_interpolated_zero_fill( d_L, phi, theta, h=_h ) dVc: npt.NDArray[np.float64] = np.atleast_1d( np.asarray(comoving_volume_element(z, h=_h), dtype=np.float64) ) return np.asarray(p_det, dtype=np.float64) * dVc D_h: float = fixed_quad(_denom_integrand, z_min, z_max, n=quad_n)[0] D_h_table[h] = D_h _LOGGER.info( "D(h=%.4f) = %.6e [z_max=%.4f, dl_max=%.4f Gpc]", h, D_h, z_max, dl_max, ) # --- Red flag checks --- D_values = list(D_h_table.values()) if any(d <= 0 for d in D_values): _LOGGER.warning( "D(h) <= 0 for some h values: %s", {h: d for h, d in D_h_table.items() if d <= 0}, ) if len(D_values) > 1: ratio = max(D_values) / max(min(D_values), 1e-300) if ratio > 10: _LOGGER.warning("D(h) varies by %.1fx across h grid (max/min)", ratio) if max(D_values) - min(D_values) < 1e-10 * max(D_values): _LOGGER.warning("D(h) is nearly identical for all h — h-dependence may not be captured") return D_h_table
# Module-level globals used by child_process_init for multiprocessing worker state redshift_upper_integration_limit: float = 0.0 redshift_lower_integration_limit: float = 0.0 bh_mass_upper_integration_limit: float = 0.0 bh_mass_lower_integration_limit: float = 0.0 detection_probability: Any = None # Gray et al. (2020), arXiv:1908.06050, Eq. A.19: # Precomputed completion-term denominator D(h) for each h in the evaluation grid D_h_table: dict[float, float] = {} # Legacy global kept for single_host_likelihood_integration_testing() and # single_host_likelihood_grid() — not used by the optimized production path. detection_likelihood_gaussians_by_detection_index: Any = None # Pre-computed Gaussian arrays (replace frozen scipy multivariate_normal objects) means_3d: npt.NDArray[np.float64] = np.empty(0) cov_inv_3d: npt.NDArray[np.float64] = np.empty(0) log_norm_3d: npt.NDArray[np.float64] = np.empty(0) means_4d: npt.NDArray[np.float64] = np.empty(0) cov_inv_4d: npt.NDArray[np.float64] = np.empty(0) log_norm_4d: npt.NDArray[np.float64] = np.empty(0) det_index_to_slot: dict[int, int] = {} # Pre-computed conditional distribution parameters for BH mass branch sigma2_cond_arr: npt.NDArray[np.float64] = np.empty(0) proj_arr: npt.NDArray[np.float64] = np.empty(0) # Pre-extracted detection parameters (avoid pickling Detection objects per starmap call) det_d_L_arr: npt.NDArray[np.float64] = np.empty(0) det_d_L_unc_arr: npt.NDArray[np.float64] = np.empty(0) det_M_arr: npt.NDArray[np.float64] = np.empty(0) det_phi_arr: npt.NDArray[np.float64] = np.empty(0) det_theta_arr: npt.NDArray[np.float64] = np.empty(0) def _check_covariance_quality( cov: npt.NDArray[np.float64], threshold: float, ) -> tuple[float, bool]: """Check whether a covariance matrix is numerically degenerate. Computes the condition number of *cov* and returns whether it exceeds *threshold*. A high condition number indicates near-singularity that would make ``np.linalg.pinv`` and ``np.linalg.slogdet`` unreliable. Args: cov: Square covariance matrix to check. threshold: Condition-number threshold above which the matrix is considered degenerate. Returns: A tuple ``(condition_number, should_exclude)`` where *condition_number* is ``float(np.linalg.cond(cov))`` and *should_exclude* is ``True`` when ``condition_number > threshold``. """ cond = float(np.linalg.cond(cov)) return cond, cond > threshold def _mvn_pdf( x: npt.NDArray[np.float64], mean: npt.NDArray[np.float64], cov_inv: npt.NDArray[np.float64], log_norm: float, ) -> npt.NDArray[np.float64]: """Evaluate multivariate normal PDF using pre-computed inverse and log-normalization. Equivalent to ``scipy.stats.multivariate_normal.pdf()`` but avoids repeated Cholesky decompositions by using pre-computed Sigma^{-1} and log((2*pi)^{-k/2} * |Sigma|^{-1/2}). Args: x: Evaluation points, shape ``(N, k)`` or ``(k,)``. mean: Mean vector, shape ``(k,)``. cov_inv: Inverse covariance matrix, shape ``(k, k)``. log_norm: Pre-computed log-normalization constant. Returns: PDF values, shape ``(N,)``. """ diff = np.atleast_2d(x) - mean # (N, k) maha = np.sum(diff @ cov_inv * diff, axis=-1) # (N,) result: npt.NDArray[np.float64] = np.exp(log_norm - 0.5 * maha) return result
[docs] class BayesianStatistics: """Hubble constant posterior evaluation. Loads saved Cramér-Rao bounds from CSV, constructs a simulation-based :class:`SimulationDetectionProbability`, builds multivariate-normal GW likelihoods from the full Fisher-matrix covariance, and evaluates per-detection posteriors over an H₀ grid using a multiprocessing pool. Invoked via ``main.py:evaluate()`` (``--evaluate`` CLI flag). Output is written to ``simulations/posteriors/`` as JSON. """ cramer_rao_bounds: pd.DataFrame detection: Detection cosmological_model: LamCDMScenario h: float Omega_m: float Omega_DE: float w_0: float w_a: float h_values: list[float] h_values_with_bh_mass: list[float] galaxy_weights: dict[str, dict[str, list[float]]] additional_galaxies_without_bh_mass: dict[str, dict[str, list[float]]] posterior_data: dict[int, list[float]] posterior_data_with_bh_mass: dict[int | str, Any] def __init__(self) -> None: self.h_values = [] self.h_values_with_bh_mass = [] self.galaxy_weights = {} self.additional_galaxies_without_bh_mass = {} self.posterior_data = {} self.posterior_data_with_bh_mass = {} self.cramer_rao_bounds = pd.read_csv(PREPARED_CRAMER_RAO_BOUNDS_PATH) self.true_cramer_rao_bounds = pd.read_csv(CRAMER_RAO_BOUNDS_OUTPUT_PATH) _LOGGER.info(f"Loaded {len(self.cramer_rao_bounds)} detections...") self.cosmological_model = LamCDMScenario() self.h = self.cosmological_model.h.fiducial_value self.Omega_m = self.cosmological_model.Omega_m.fiducial_value self.Omega_DE = 1 - self.Omega_m self.w_0 = self.cosmological_model.w_0 self.w_a = self.cosmological_model.w_a self.catalog_only: bool = False self._diagnostic_rows: list[dict[str, object]] = []
[docs] def evaluate( self, galaxy_catalog: GalaxyCatalogueHandler, cosmological_model: Model1CrossCheck, h_value: float, num_workers: int | None = None, catalog_only: bool = False, pdet_dl_bins: int = 60, pdet_mass_bins: int = 40, fisher_cond_threshold: float = 1e16, ) -> None: self.catalog_only = catalog_only self._diagnostic_rows = [] if catalog_only: _LOGGER.info("catalog_only mode: f_i=1, L_comp=0 (skipping completion integral)") _LOGGER.info(f"Computing posteriors for h = {h_value}...") if (h_value < self.cosmological_model.h.lower_limit) or ( h_value > self.cosmological_model.h.upper_limit ): raise ValueError("Hubble constant out of bounds.") _LOGGER.debug(f"Loaded {len(self.cramer_rao_bounds)} detections...") # Filter detections: SNR threshold + relative d_L error n_before = len(self.cramer_rao_bounds) self.cramer_rao_bounds = self.cramer_rao_bounds[ self.cramer_rao_bounds["SNR"] >= SNR_THRESHOLD ] _LOGGER.info( f"SNR filter (>= {SNR_THRESHOLD}): {n_before} -> {len(self.cramer_rao_bounds)} detections" ) for index, detection in self.cramer_rao_bounds.iterrows(): detection = Detection(detection) if use_detection(detection) is False: self.cramer_rao_bounds.drop(index, inplace=True) _LOGGER.info( f"After quality filtering: {len(self.cramer_rao_bounds)} detections " f"(d_L relative error < {FRACTIONAL_LUMINOSITY_DISTANCE_ERROR_THRESHOLD})" ) # parameter limitations REDSHIFT_LOWER_LIMIT = 0.0 REDSHIFT_UPPER_LIMIT = cosmological_model.max_redshift BH_MASS_LOWER_LIMIT = cosmological_model.parameter_space.M.lower_limit BH_MASS_UPPER_LIMIT = cosmological_model.parameter_space.M.upper_limit _LOGGER.debug("Creating detection probability functions...") detection_probability = SimulationDetectionProbability( injection_data_dir=INJECTION_DATA_DIR, snr_threshold=SNR_THRESHOLD, dl_bins=pdet_dl_bins, mass_bins=pdet_mass_bins, ) _LOGGER.debug("Detection probability functions created.") # Pre-warm P_det grid cache for target h -- avoids N workers each building # the same grid independently after pool spawn detection_probability._get_or_build_grid(h_value) _LOGGER.debug("P_det grid pre-warmed for h=%.4f.", h_value) # Validate P_det grid coverage for observed events detection_probability.validate_coverage(h_value, self.cramer_rao_bounds) # Gray et al. (2020), arXiv:1908.06050, Eq. A.19: # Precompute completion-term denominator D(h) over full detectable volume. # D(h) is event-independent; compute once per h-value. _D_h_table = precompute_completion_denominator( h_values=[h_value], detection_probability_obj=detection_probability, Omega_m=self.Omega_m, Omega_DE=self.Omega_DE, ) _LOGGER.info("D(h) precomputed for %d h-value(s).", len(_D_h_table)) _LOGGER.debug("Pre-computing Gaussian arrays for GW likelihoods...") _t0 = time.perf_counter() det_indices = list(self.cramer_rao_bounds.index) n_det = len(det_indices) _det_index_to_slot: dict[int, int] = { int(idx): slot for slot, idx in enumerate(det_indices) } # Pre-allocate arrays for 3D (without BH mass) and 4D (with BH mass) Gaussians _means_3d = np.zeros((n_det, 3)) _cov_inv_3d = np.zeros((n_det, 3, 3)) _log_norm_3d = np.zeros(n_det) _means_4d = np.zeros((n_det, 4)) _cov_inv_4d = np.zeros((n_det, 4, 4)) _log_norm_4d = np.zeros(n_det) # Conditional distribution pre-computations for BH mass branch _sigma2_cond_arr = np.zeros(n_det) _proj_arr = np.zeros((n_det, 3)) # Fisher quality: condition numbers and exclusion mask _excluded_mask = np.zeros(n_det, dtype=bool) _cond_3d = np.zeros(n_det, dtype=np.float64) _cond_4d = np.zeros(n_det, dtype=np.float64) _eigen_3d: dict[int, npt.NDArray[np.float64]] = {} # flagged slots only _eigen_4d: dict[int, npt.NDArray[np.float64]] = {} # flagged slots only # Pre-extracted detection scalar parameters (avoid pickling Detection objects) _det_d_L = np.zeros(n_det) _det_d_L_unc = np.zeros(n_det) _det_M = np.zeros(n_det) _det_phi = np.zeros(n_det) _det_theta = np.zeros(n_det) for index, row in self.cramer_rao_bounds.iterrows(): det = Detection(row) slot = _det_index_to_slot[int(index)] # Store detection scalars _det_d_L[slot] = det.d_L _det_d_L_unc[slot] = det.d_L_uncertainty _det_M[slot] = det.M _det_phi[slot] = det.phi _det_theta[slot] = det.theta # Build 3D covariance (without BH mass) cov_3d = np.array( [ [ det.phi_error**2, det.theta_phi_covariance, det.d_L_phi_covariance / det.d_L, ], [ det.theta_phi_covariance, det.theta_error**2, det.d_L_theta_covariance / det.d_L, ], [ det.d_L_phi_covariance / det.d_L, det.d_L_theta_covariance / det.d_L, det.d_L_uncertainty**2 / det.d_L**2, ], ] ) # Build 4D covariance (with BH mass) cov_4d = np.array( [ [ det.phi_error**2, det.theta_phi_covariance, det.d_L_phi_covariance / det.d_L, det.M_phi_covariance / det.M, ], [ det.theta_phi_covariance, det.theta_error**2, det.d_L_theta_covariance / det.d_L, det.M_theta_covariance / det.M, ], [ det.d_L_phi_covariance / det.d_L, det.d_L_theta_covariance / det.d_L, det.d_L_uncertainty**2 / det.d_L**2, det.d_L_M_covariance / det.d_L / det.M, ], [ det.M_phi_covariance / det.M, det.M_theta_covariance / det.M, det.d_L_M_covariance / det.d_L / det.M, det.M_uncertainty**2 / det.M**2, ], ] ) # Compute condition numbers for degeneracy detection (per D-01, D-02) cond_3d, exclude_3d = _check_covariance_quality(cov_3d, fisher_cond_threshold) cond_4d, exclude_4d = _check_covariance_quality(cov_4d, fisher_cond_threshold) _cond_3d[slot] = cond_3d _cond_4d[slot] = cond_4d if exclude_3d or exclude_4d: _excluded_mask[slot] = True _eigen_3d[slot] = np.linalg.eigh(cov_3d)[0] _eigen_4d[slot] = np.linalg.eigh(cov_4d)[0] _LOGGER.warning( "Detection %d excluded: cond_3d=%.2e, cond_4d=%.2e (threshold=%.2e)", int(index), cond_3d, cond_4d, fisher_cond_threshold, ) continue # 3D Gaussian: mean, inverse, log-normalization _means_3d[slot] = [det.phi, det.theta, 1] _cov_inv_3d[slot] = np.linalg.pinv(cov_3d) _sign_3d, logdet_3d = np.linalg.slogdet(cov_3d) if _sign_3d <= 0: _excluded_mask[slot] = True _eigen_3d[slot] = np.linalg.eigh(cov_3d)[0] _eigen_4d[slot] = np.linalg.eigh(cov_4d)[0] _LOGGER.warning( "Detection %d excluded: slogdet sign_3d=%d (non-positive definite)", int(index), _sign_3d, ) continue _log_norm_3d[slot] = -0.5 * (3 * np.log(2 * np.pi) + logdet_3d) # 4D Gaussian: mean, inverse, log-normalization _means_4d[slot] = [det.phi, det.theta, 1, 1] _cov_inv_4d[slot] = np.linalg.pinv(cov_4d) _sign_4d, logdet_4d = np.linalg.slogdet(cov_4d) if _sign_4d <= 0: _excluded_mask[slot] = True _eigen_3d[slot] = np.linalg.eigh(cov_3d)[0] _eigen_4d[slot] = np.linalg.eigh(cov_4d)[0] _LOGGER.warning( "Detection %d excluded: slogdet sign_4d=%d (non-positive definite)", int(index), _sign_4d, ) continue _log_norm_4d[slot] = -0.5 * (4 * np.log(2 * np.pi) + logdet_4d) # Conditional distribution for BH mass branch # Bishop (2006) PRML Eq. 2.81-2.82 cov_obs = cov_4d[:3, :3] # = cov_3d cov_cross = cov_4d[3, :3] cov_mz = cov_4d[3, 3] cov_obs_inv = _cov_inv_3d[slot] # reuse already-computed inverse _sigma2_cond_arr[slot] = max(float(cov_mz - cov_cross @ cov_obs_inv @ cov_cross), 1e-30) _proj_arr[slot] = cov_cross @ cov_obs_inv # Log Fisher quality summary (D-11) n_flagged = int(_excluded_mask.sum()) top5_worst = sorted( [ (int(idx), float(_cond_3d[slot]), float(_cond_4d[slot])) for idx, slot in _det_index_to_slot.items() ], key=lambda t: max(t[1], t[2]), reverse=True, )[:5] _LOGGER.info( "Fisher quality: %d total, %d flagged/excluded (%.1f%%). Top-5 worst cond: %s", n_det, n_flagged, 100 * n_flagged / max(n_det, 1), [(idx, f"{c3:.2e}", f"{c4:.2e}") for idx, c3, c4 in top5_worst], ) # Store index mapping on the instance for use in p_Di completion term self._det_index_to_slot = _det_index_to_slot self._means_3d = _means_3d self._cov_inv_3d = _cov_inv_3d self._log_norm_3d = _log_norm_3d self._det_d_L = _det_d_L self._det_d_L_unc = _det_d_L_unc self._det_M = _det_M self._det_phi = _det_phi self._det_theta = _det_theta self._D_h_table = _D_h_table self._excluded_mask = _excluded_mask self._cond_3d = _cond_3d self._cond_4d = _cond_4d self._eigen_3d = _eigen_3d self._eigen_4d = _eigen_4d self._fisher_cond_threshold = fisher_cond_threshold _LOGGER.info( "Gaussian precomputation: %.2fs (%d detections)", time.perf_counter() - _t0, n_det, ) # Gray et al. (2020), arXiv:1908.06050, Eq. 9: # Completeness function f(z, h) for weighting catalog vs completion terms completeness = GladeCatalogCompleteness() self.h = h_value if num_workers is None: try: available_cpus = len(os.sched_getaffinity(0)) except AttributeError: available_cpus = os.cpu_count() or 1 num_workers = max(1, available_cpus - 2) _LOGGER.debug(f"Using {num_workers} worker(s) for multiprocessing pool.") _t0 = time.perf_counter() # forkserver with module preloading: the server imports heavy modules # once, then workers inherit them via copy-on-write — eliminates 126× # Python startup + module import on the shared cluster filesystem. # Fallback: if forkserver is unavailable, use spawn (always safe). _ctx: mp.context.BaseContext if "forkserver" in mp.get_all_start_methods(): _fs_ctx = mp.get_context("forkserver") _fs_ctx.set_forkserver_preload( [ "numpy", "scipy.interpolate", "scipy.integrate", "scipy.stats", "pandas", "master_thesis_code.bayesian_inference.simulation_detection_probability", "master_thesis_code.physical_relations", ] ) _ctx = _fs_ctx else: _ctx = mp.get_context("spawn") _LOGGER.info("Multiprocessing context: %s", _ctx.get_start_method()) with _ctx.Pool( num_workers, initializer=child_process_init, initargs=( REDSHIFT_LOWER_LIMIT, REDSHIFT_UPPER_LIMIT, BH_MASS_LOWER_LIMIT, BH_MASS_UPPER_LIMIT, detection_probability, _means_3d, _cov_inv_3d, _log_norm_3d, _means_4d, _cov_inv_4d, _log_norm_4d, _det_index_to_slot, _sigma2_cond_arr, _proj_arr, _det_d_L, _det_d_L_unc, _det_M, _det_phi, _det_theta, _D_h_table, ), ) as pool: _LOGGER.info( "Pool spawn (%d workers): %.2fs", num_workers, time.perf_counter() - _t0, ) self.p_D( galaxy_catalog=galaxy_catalog, redshift_upper_limit=REDSHIFT_UPPER_LIMIT, pool=pool, completeness=completeness, detection_probability_obj=detection_probability, ) _LOGGER.info(f"posteriors comupted for h = {self.h}") if not os.path.isdir("simulations/posteriors"): os.makedirs("simulations/posteriors") if not os.path.isdir("simulations/posteriors_with_bh_mass"): os.makedirs("simulations/posteriors_with_bh_mass") # 4-decimal precision required to distinguish Phase-50 superdense # midpoints (Δh=0.0005, e.g. 0.7205 / 0.7215) from the dense Δh=0.001 # grid (0.720 / 0.721 / 0.722). Rounding to 3 decimals collapses each # midpoint onto a neighbouring dense filename, so the second writer # silently overwrites the first. Posteriors share filenames only when # the underlying h-values agree to 4 decimals. h_label = str(np.round(self.h, 4)).replace(".", "_") with open( f"simulations/posteriors/h_{h_label}.json", "w", ) as file: data = {str(key): value for key, value in self.posterior_data.items()} json.dump(data | {"h": self.h}, file) with open( f"simulations/posteriors_with_bh_mass/h_{h_label}.json", "w", ) as file: # update existing data data = {str(key): value for key, value in self.posterior_data_with_bh_mass.items()} json.dump(data | {"h": self.h}, file) # Write per-event diagnostic CSV if self._diagnostic_rows: diagnostic_csv_path = "simulations/diagnostics/event_likelihoods.csv" self._write_diagnostic_csv(diagnostic_csv_path) # Write Fisher quality CSV (per D-12) self._write_fisher_quality_csv() # Generate Fisher quality diagnostic plot (per D-06, D-07) from master_thesis_code.plotting.fisher_plots import plot_fisher_diagnostics plot_fisher_diagnostics( cond_3d=self._cond_3d, cond_4d=self._cond_4d, excluded_mask=self._excluded_mask, eigen_3d=self._eigen_3d, eigen_4d=self._eigen_4d, det_d_L=self._det_d_L, det_M=self._det_M, det_index_to_slot=self._det_index_to_slot, threshold=self._fisher_cond_threshold, output_dir="simulations", )
def _write_fisher_quality_csv(self) -> None: """Write per-event Fisher matrix condition numbers and exclusion flags to CSV. Columns: detection_index, cond_3d, cond_4d, excluded. Written once per evaluation run to ``simulations/fisher_quality.csv``. """ rows = [ { "detection_index": int(idx), "cond_3d": float(self._cond_3d[slot]), "cond_4d": float(self._cond_4d[slot]), "excluded": bool(self._excluded_mask[slot]), } for idx, slot in self._det_index_to_slot.items() ] df = pd.DataFrame(rows) csv_path = os.path.join("simulations", "fisher_quality.csv") os.makedirs(os.path.dirname(csv_path), exist_ok=True) df.to_csv(csv_path, index=False) _LOGGER.info("Fisher quality CSV written to %s (%d rows)", csv_path, len(rows)) def _write_diagnostic_csv(self, csv_path: str) -> None: """Write per-event diagnostic rows to CSV (append mode, header on first write). Args: csv_path: Path to the output CSV file. """ if not self._diagnostic_rows: return fieldnames = [ "event_idx", "h", "f_i", "L_cat_no_bh", "L_cat_with_bh", "L_comp", "combined_no_bh", "combined_with_bh", ] os.makedirs(os.path.dirname(csv_path), exist_ok=True) write_header = not os.path.isfile(csv_path) with open(csv_path, "a", newline="") as f: writer = csv.DictWriter(f, fieldnames=fieldnames) if write_header: writer.writeheader() writer.writerows(self._diagnostic_rows) _LOGGER.info("Wrote %d diagnostic rows to %s", len(self._diagnostic_rows), csv_path)
[docs] def p_D( self, galaxy_catalog: GalaxyCatalogueHandler, redshift_upper_limit: float, pool: mp.pool.Pool, completeness: GladeCatalogCompleteness, detection_probability_obj: SimulationDetectionProbability, ) -> None: count = 0 _det_times: list[float] = [] self.posterior_data_with_bh_mass[GALAXY_LIKELIHOODS] = {} self.posterior_data_with_bh_mass[ADDITIONAL_GALAXIES_WITHOUT_BH_MASS] = {} for index, detection in self.cramer_rao_bounds.iterrows(): _t_det = time.perf_counter() slot = self._det_index_to_slot[int(index)] if self._excluded_mask[slot]: _LOGGER.debug("Skipping excluded detection %d (Fisher quality)", int(index)) continue _LOGGER.info(f"Progess: detections: {count}/{len(self.cramer_rao_bounds)}...") count += 1 try: self.posterior_data[index] except KeyError: self.posterior_data[index] = [] self.posterior_data_with_bh_mass[index] = [] self.detection = Detection(detection) z_min, z_max = get_redshift_outer_bounds( distance=self.detection.d_L, distance_error=self.detection.d_L_uncertainty, h_min=self.cosmological_model.h.lower_limit, h_max=self.cosmological_model.h.upper_limit, Omega_m_min=self.cosmological_model.Omega_m.lower_limit, Omega_m_max=self.cosmological_model.Omega_m.upper_limit, sigma_multiplier=2.0, ) z_max = min(z_max, redshift_upper_limit) possible_hosts = galaxy_catalog.get_possible_hosts_from_ball_tree( phi=self.detection.phi, theta=self.detection.theta, phi_sigma=self.detection.phi_error, theta_sigma=self.detection.theta_error, cov_theta_phi=self.detection.theta_phi_covariance, # COORD-04: 2×2 sky Fisher off-diagonal z_min=z_min, z_max=z_max, M_z=self.detection.M, M_z_sigma=self.detection.M_uncertainty, sigma_multiplier=1.5, # type: ignore[arg-type] ) if possible_hosts is None: _LOGGER.debug("no possible hosts found...") continue possible_hosts, possible_hosts_with_bh_mass = possible_hosts # type: ignore[assignment] _LOGGER.info( f"possible hosts found {len(possible_hosts)}/{len(possible_hosts_with_bh_mass)}..." ) """ if len(possible_hosts_with_bh_mass) == 0: detection_galaxy = _get_closest_possible_host( self.detection, possible_hosts ) else: detection_galaxy = _get_closest_possible_host( self.detection, possible_hosts_with_bh_mass ) self.detection.phi = detection_galaxy.phiS self.detection.theta = detection_galaxy.qS """ event_likelihood, event_likelihood_with_bh_mass = self.p_Di( possible_host_galaxies=possible_hosts, # type: ignore[arg-type] possible_host_galaxies_with_bh_mass=possible_hosts_with_bh_mass, detection_index=index, pool=pool, completeness=completeness, detection_probability_obj=detection_probability_obj, ) self.posterior_data[index].append(event_likelihood) self.posterior_data_with_bh_mass[index].append(event_likelihood_with_bh_mass) _det_time = time.perf_counter() - _t_det _det_times.append(_det_time) if count % 100 == 0 or count == len(self.cramer_rao_bounds): _LOGGER.info( "Detection %d/%d: last=%.2fs, avg=%.2fs, est_remaining=%.0fs", count, len(self.cramer_rao_bounds), _det_time, np.mean(_det_times), np.mean(_det_times) * (len(self.cramer_rao_bounds) - count), ) _LOGGER.debug( f"event likelihood: {event_likelihood}\nevent likelihood with bh mass: {event_likelihood_with_bh_mass}" )
[docs] def p_Di( self, possible_host_galaxies: list[HostGalaxy], possible_host_galaxies_with_bh_mass: list[HostGalaxy], detection_index: int, pool: mp.pool.Pool, completeness: GladeCatalogCompleteness, detection_probability_obj: SimulationDetectionProbability, ) -> tuple[float, float]: # start parallel computation _LOGGER.info(f"start parallel computation with: {pool}") start = time.time() # remove duplicates from possible_host_galaxies already covered in possible_host_galaxies_with_bh_mass hosts_with_bh_mass_set = set(possible_host_galaxies_with_bh_mass) possible_host_galaxies_reduced = [ host for host in possible_host_galaxies if host not in hosts_with_bh_mass_set ] _LOGGER.debug( f"reduced possible hosts galaxies to unique, removed {len(possible_host_galaxies) - len(possible_host_galaxies_reduced)} galaxies." ) chunksize = math.ceil(len(possible_host_galaxies_reduced) / pool._processes) # type: ignore[attr-defined] chunksize_with_bh_mass = math.ceil( len(possible_host_galaxies_with_bh_mass) / pool._processes # type: ignore[attr-defined] ) results_with_bh_mass = pool.starmap( single_host_likelihood, [ ( host.phiS, host.qS, host.z, host.z_error, host.M, host.M_error, detection_index, self.h, True, ) for host in possible_host_galaxies_with_bh_mass ], chunksize=chunksize_with_bh_mass, ) results_without_blackhole_mass = pool.starmap( single_host_likelihood, [ ( host.phiS, host.qS, host.z, host.z_error, host.M, host.M_error, detection_index, self.h, False, ) for host in possible_host_galaxies_reduced ], chunksize=chunksize, ) end = time.time() _LOGGER.info(f"parallel computing took: {end - start}s") galaxy_likelihoods = list( zip( [galaxy.catalog_index for galaxy in possible_host_galaxies_with_bh_mass], results_with_bh_mass, ) ) self.posterior_data_with_bh_mass[GALAXY_LIKELIHOODS][detection_index] = galaxy_likelihoods additional_likelihoods = list( zip( [galaxy.catalog_index for galaxy in possible_host_galaxies_reduced], results_without_blackhole_mass, ) ) self.posterior_data_with_bh_mass[ADDITIONAL_GALAXIES_WITHOUT_BH_MASS][detection_index] = ( additional_likelihoods ) # --- Catalog term (L_cat): existing galaxy-sum likelihood --- # Gray et al. (2020), arXiv:1908.06050, Eqs. 24-25 if len(results_without_blackhole_mass) == 0 and len(results_with_bh_mass) == 0: _LOGGER.warning(f"Detection {detection_index}: no catalog results found") L_cat_without_bh_mass = 0.0 L_cat_with_bh_mass = 0.0 else: # Gray et al. (2020), arXiv:1908.06050, Eq. 24-25: L_cat = (1/N) Σ_g [N_g / D_g] # under uniform 1/N galaxy prior. Note: Σ N_g / Σ D_g ≠ (1/N) Σ (N_g/D_g) when D_g # vary (e.g., N=2, N_1=1, D_1=1, N_2=1, D_2=2: old=2/3, new=3/4). See # test_l_cat_equivalence.py for the counterexample and limiting-case checks. all_results_without_bh = list(results_without_blackhole_mass) + list( results_with_bh_mass ) ratios_without_bh = [r[0] / r[1] for r in all_results_without_bh if r[1] > 0] L_cat_without_bh_mass = float(np.mean(ratios_without_bh)) if ratios_without_bh else 0.0 if len(results_with_bh_mass) > 0: # Gray et al. (2020), arXiv:1908.06050, Eq. 24-25: L_cat = (1/N) Σ_g [N_g / D_g] ratios_with_bh = [r[2] / r[3] for r in results_with_bh_mass if r[3] > 0] L_cat_with_bh_mass = float(np.mean(ratios_with_bh)) if ratios_with_bh else 0.0 else: L_cat_with_bh_mass = 0.0 # --- Completion term: Gray et al. (2020), arXiv:1908.06050, Eqs. 31-32 --- # When catalog_only=True, skip the completion integral entirely: # set f_i=1.0 (pure catalog), L_comp=0.0 if self.catalog_only: f_i = 1.0 L_comp = 0.0 else: # L_comp = integral[p_GW * P_det * dVc/dz dz] / integral[P_det * dVc/dz dz] # Uses "without BH mass" 3D Gaussian for both variants # (uncataloged host has no galaxy mass information) # Completeness at the detected redshift for the trial h # Gray et al. (2020), arXiv:1908.06050, Eq. 9: f_i evaluated at z(d_L_det, h) z_det = dist_to_redshift(self.detection.d_L, h=self.h) f_i = float(completeness.get_completeness_at_redshift(z_det, self.h)) # Integration limits: same 4-sigma range as catalog term numerator integration_limit_sigma_multiplier = 4.0 z_upper = dist_to_redshift( self.detection.d_L + integration_limit_sigma_multiplier * self.detection.d_L_uncertainty, h=self.h, ) z_lower = dist_to_redshift( self.detection.d_L - integration_limit_sigma_multiplier * self.detection.d_L_uncertainty, h=self.h, ) z_lower = max(z_lower, 1e-6) # avoid z=0 singularity in volume element FIXED_QUAD_N = 50 # Completion term numerator integrand # Gray et al. (2020), arXiv:1908.06050, Eq. 31: # p_GW(x|z, Omega_det, h) * P_det(d_L(z,h)) * dVc/dz _comp_slot = self._det_index_to_slot[detection_index] _comp_mean_3d = self._means_3d[_comp_slot] _comp_cov_inv_3d = self._cov_inv_3d[_comp_slot] _comp_log_norm_3d = float(self._log_norm_3d[_comp_slot]) _comp_det_d_L = self._det_d_L[_comp_slot] def completion_numerator_integrand( z: npt.NDArray[np.float64], ) -> npt.NDArray[np.float64]: d_L: npt.NDArray[np.float64] = np.asarray( dist_vectorized(z, h=self.h), dtype=np.float64 ) # Gpc d_L_fraction = d_L / _comp_det_d_L # dimensionless phi = np.full_like(z, self.detection.phi) theta = np.full_like(z, self.detection.theta) p_gw: npt.NDArray[np.float64] = _mvn_pdf( np.vstack([phi, theta, d_L_fraction]).T, _comp_mean_3d, _comp_cov_inv_3d, _comp_log_norm_3d, ) # Gray et al. (2020), arXiv:1908.06050, Eq. A.19: shared p_det # function for L_comp numerator and D(h) denominator (STAT-03 # symmetry, commit a70d1a2). Phase 44: NN-fill below first bin # (real injection statistic), zero above injection horizon. p_det = detection_probability_obj.detection_probability_without_bh_mass_interpolated_zero_fill( d_L, phi, theta, h=self.h ) dVc: npt.NDArray[np.float64] = np.atleast_1d( np.asarray(comoving_volume_element(z, h=self.h), dtype=np.float64) ) return p_gw * p_det * dVc comp_numerator: float = fixed_quad( completion_numerator_integrand, z_lower, z_upper, n=FIXED_QUAD_N )[0] # Gray et al. (2020), arXiv:1908.06050, Eq. 31: # D(h) = ∫ p_det · dV_c/dz dz normalizes p_galaxy ∝ p_det · dV_c # to a probability density, making L_comp = num/D the per-event # likelihood CONDITIONAL on detection (not an outer selection # correction). Tier 3 audit (2026-05-04) confirmed that combining # this with combine_log_space's old −N log D outer subtraction # double-counted D and biased MAP by +0.020 to +0.025; outer # subtraction is now disabled in posterior_combination.combine_log_space. comp_denominator: float = self._D_h_table.get(self.h, 0.0) # Grid coverage flag: warn if numerator 4-sigma window exceeds P_det grid d_L_upper = self.detection.d_L + 4.0 * self.detection.d_L_uncertainty dl_max_grid = detection_probability_obj.get_dl_max(self.h) if d_L_upper > dl_max_grid: _LOGGER.warning( "Detection %d: 4-sigma d_L upper (%.4f Gpc) exceeds P_det grid max (%.4f Gpc)", detection_index, d_L_upper, dl_max_grid, ) if comp_denominator > 0: L_comp = float(comp_numerator / comp_denominator) # Diagnostic: N_i(h)/D(h) ratio should be < 1 if L_comp > 1.0: _LOGGER.warning( "Detection %d: N_i/D(h) = %.4e > 1.0 (unexpected)", detection_index, L_comp, ) else: _LOGGER.warning(f"Detection {detection_index}: D(h) is zero, using L_cat only") L_comp = 0.0 f_i = 1.0 # fall back to catalog-only _LOGGER.debug( f"Detection {detection_index}: f_i={f_i:.4f}, " f"L_cat_no_bh={L_cat_without_bh_mass:.6e}, " f"L_cat_with_bh={L_cat_with_bh_mass:.6e}, L_comp={L_comp:.6e}" ) # --- Combination: Gray et al. (2020), arXiv:1908.06050, Eq. 9 --- # p_i = f_i * L_cat + (1 - f_i) * L_comp # L_comp uses "without BH mass" Gaussian for both variants # (uncataloged host has no galaxy mass information) combined_without_bh_mass = float(f_i * L_cat_without_bh_mass + (1 - f_i) * L_comp) combined_with_bh_mass = float(f_i * L_cat_with_bh_mass + (1 - f_i) * L_comp) # Record diagnostic row for every event self._diagnostic_rows.append( { "event_idx": detection_index, "h": self.h, "f_i": f_i, "L_cat_no_bh": L_cat_without_bh_mass, "L_cat_with_bh": L_cat_with_bh_mass, "L_comp": L_comp, "combined_no_bh": combined_without_bh_mass, "combined_with_bh": combined_with_bh_mass, } ) return (combined_without_bh_mass, combined_with_bh_mass)
[docs] def use_detection(detection: Detection) -> bool: sky_localization_uncertainty = _sky_localization_uncertainty( phi_error=detection.phi_error, theta=detection.theta, theta_error=detection.theta_error, cov_theta_phi=detection.theta_phi_covariance, ) distance_relative_error = detection.d_L_uncertainty / detection.d_L if distance_relative_error < FRACTIONAL_LUMINOSITY_DISTANCE_ERROR_THRESHOLD: return True _LOGGER.debug( f"Detection skipped: distance_relative_error {distance_relative_error} > {FRACTIONAL_LUMINOSITY_DISTANCE_ERROR_THRESHOLD}, sky_localization_uncertainty {sky_localization_uncertainty}" ) return False
[docs] def single_host_likelihood_grid( possible_host: HostGalaxy, detection: Detection, detection_index: int, h: float, evaluate_with_bh_mass: bool, ) -> list[float]: global redshift_upper_integration_limit global redshift_lower_integration_limit global bh_mass_upper_integration_limit global bh_mass_lower_integration_limit global detection_probability global detection_likelihood_gaussians_by_detection_index # find sharpest peak print(possible_host.z, possible_host.z_error) print(detection.d_L, detection.d_L_uncertainty) return []
[docs] def single_host_likelihood( host_phiS: float, host_qS: float, host_z: float, host_z_error: float, host_M: float, host_M_error: float, detection_index: int, h: float, evaluate_with_bh_mass: bool, ) -> list[float]: global redshift_upper_integration_limit global redshift_lower_integration_limit global bh_mass_upper_integration_limit global bh_mass_lower_integration_limit global detection_probability global means_3d, cov_inv_3d, log_norm_3d global means_4d, cov_inv_4d, log_norm_4d global det_index_to_slot global sigma2_cond_arr, proj_arr global det_d_L_arr, det_d_L_unc_arr, det_M_arr, det_phi_arr, det_theta_arr FIXED_QUAD_N = 50 slot = det_index_to_slot[detection_index] _det_d_L = float(det_d_L_arr[slot]) _det_d_L_unc = float(det_d_L_unc_arr[slot]) _det_M = float(det_M_arr[slot]) _mean_3d = means_3d[slot] _cov_inv_3d = cov_inv_3d[slot] _log_norm_3d = float(log_norm_3d[slot]) integration_limit_sigma_multiplier = 4.0 numerator_integration_upper_redshift_limit = dist_to_redshift( _det_d_L + integration_limit_sigma_multiplier * _det_d_L_unc, h=h ) numerator_integration_lower_redshift_limit = dist_to_redshift( _det_d_L - integration_limit_sigma_multiplier * _det_d_L_unc, h=h ) denominator_integration_upper_redshift_limit = ( host_z + integration_limit_sigma_multiplier * host_z_error ) denominator_integration_lower_redshift_limit = ( host_z - integration_limit_sigma_multiplier * host_z_error ) # construct normal distribution for redshift and mass for host galaxy galaxy_redshift_normal_distribution = norm(loc=host_z, scale=host_z_error) # Sky localization weight (phi, theta) is inside the GW likelihood Gaussian. # Verified correct by Phase 14 derivation (Sec. 2.7): the 3D/4D GW Gaussian # naturally encodes the sky position weight -- this is NOT a source of error. def numerator_integrant_without_bh_mass(z: npt.NDArray[np.float64]) -> Any: d_L = dist_vectorized(z, h=h) # fraction = d_L_model / d_L_measured; matches covariance σ²/d_L_measured² luminosity_distance_fraction = d_L / _det_d_L phi = np.full_like(z, host_phiS) theta = np.full_like(z, host_qS) # Gray et al. (2020), arXiv:1908.06050, Eq. A.19: shared p_det function # with D(h) denominator (STAT-03 symmetry, commit a70d1a2). Phase 44: # NN-fill below first bin (real injection statistic), zero above # injection horizon. p_det = detection_probability.detection_probability_without_bh_mass_interpolated_zero_fill( d_L, phi, theta, h=h ) return ( p_det * _mvn_pdf( np.vstack([phi, theta, luminosity_distance_fraction]).T, _mean_3d, _cov_inv_3d, _log_norm_3d, ) * galaxy_redshift_normal_distribution.pdf(z) ) def denominator_integrant_without_bh_mass(z: npt.NDArray[np.float64]) -> Any: d_L = dist_vectorized(z, h=h) phi = np.full_like(z, host_phiS) theta = np.full_like(z, host_qS) # Gray et al. (2020), arXiv:1908.06050, Eq. A.19: shared p_det function # with D(h) denominator (STAT-03 symmetry, commit a70d1a2). Phase 44: # NN-fill below first bin (real injection statistic), zero above # injection horizon. p_det = detection_probability.detection_probability_without_bh_mass_interpolated_zero_fill( d_L, phi, theta, h=h ) return p_det * galaxy_redshift_normal_distribution.pdf(z) ( single_host_likelihood_numerator_without_bh_mass, single_host_likelihood_numerator_without_bh_mass_error, ) = fixed_quad( numerator_integrant_without_bh_mass, numerator_integration_lower_redshift_limit, numerator_integration_upper_redshift_limit, n=FIXED_QUAD_N, ) ( single_host_likelihood_denominator_without_bh_mass, single_host_likelihood_denominator_without_bh_mass_error, ) = fixed_quad( denominator_integrant_without_bh_mass, denominator_integration_lower_redshift_limit, denominator_integration_upper_redshift_limit, n=FIXED_QUAD_N, ) # STAT-04: Per-event off-grid quadrature weight diagnostic. # Estimate the fraction of the integration window that lies outside the P_det grid. # Grid bounds are the first/last bin centres of the 1D interpolator grid. # Attribute access: detection_probability._get_or_build_grid(h)[1].grid[0] → d_L centres. _, _interp_1d = detection_probability._get_or_build_grid(h) _dl_centers = _interp_1d.grid[0] _dl_grid_min = float(_dl_centers[0]) _dl_grid_max = float(_dl_centers[-1]) # Numerator window: d_L(z_det ± 4σ) [redshift limits → d_L limits] _dl_lower_num = float( dist_vectorized(np.array([numerator_integration_lower_redshift_limit]), h=h)[0] ) _dl_upper_num = float( dist_vectorized(np.array([numerator_integration_upper_redshift_limit]), h=h)[0] ) _window_num = _dl_upper_num - _dl_lower_num if _window_num > 0.0: _below_min_num = max(0.0, min(_dl_upper_num, _dl_grid_min) - _dl_lower_num) / _window_num _above_max_num = max(0.0, _dl_upper_num - max(_dl_lower_num, _dl_grid_max)) / _window_num quadrature_weight_outside_grid_numerator = float( np.clip(_below_min_num + _above_max_num, 0.0, 1.0) ) else: quadrature_weight_outside_grid_numerator = 0.0 # Denominator window: d_L(z_gal ± 4σ_z) [redshift limits → d_L limits] _dl_lower_den = float( dist_vectorized(np.array([denominator_integration_lower_redshift_limit]), h=h)[0] ) _dl_upper_den = float( dist_vectorized(np.array([denominator_integration_upper_redshift_limit]), h=h)[0] ) _window_den = _dl_upper_den - _dl_lower_den if _window_den > 0.0: _below_min_den = max(0.0, min(_dl_upper_den, _dl_grid_min) - _dl_lower_den) / _window_den _above_max_den = max(0.0, _dl_upper_den - max(_dl_lower_den, _dl_grid_max)) / _window_den quadrature_weight_outside_grid_denominator = float( np.clip(_below_min_den + _above_max_den, 0.0, 1.0) ) else: quadrature_weight_outside_grid_denominator = 0.0 if ( quadrature_weight_outside_grid_numerator > 0.05 or quadrature_weight_outside_grid_denominator > 0.05 ): _LOGGER.warning( "Event %d: >5%% quadrature weight outside P_det grid — " "numerator=%.3f, denominator=%.3f", detection_index, quadrature_weight_outside_grid_numerator, quadrature_weight_outside_grid_denominator, ) if evaluate_with_bh_mass: galaxy_mass_normal_distribution = norm(loc=host_M, scale=host_M_error) # Pre-computed conditional distribution parameters for analytic M_z marginalization # Eqs. (14.23)-(14.28) in derivations/dark_siren_likelihood.md # Ref: Bishop (2006) PRML Eq. 2.81-2.82 (multivariate normal conditioning) _sigma2_cond = float(sigma2_cond_arr[slot]) _proj = proj_arr[slot] _mu_obs_4d = means_4d[slot] def numerator_integrant_with_bh_mass(z: npt.NDArray[np.float64]) -> Any: d_L = dist_vectorized(z, h=h) luminosity_distance_fraction = d_L / _det_d_L phi = np.full_like(z, host_phiS) theta = np.full_like(z, host_qS) # p_det evaluated at the *hypothesis*: the host candidate's # observer-frame M_z = host_M · (1+z) at integration redshift z. # This matches the rest of the integrand's hypothesis convention # (cf. `mu_gal_frac = host_M·(1+z)/_det_M` below) and the # denominator's `M_z = M·(1+z)` convention. See # docs/H0_BIAS_RESOLUTION.md §3.15 (H3 fix, Phase 47). # Ref: Mandel, Farr & Gair (2019), arXiv:1809.02063 §2 # (selection function evaluated at hypothesis parameters). p_det = detection_probability.detection_probability_with_bh_mass_interpolated( d_L, host_M * (1.0 + z), phi, theta, h=h ) # 3D marginal Gaussian: p(phi, theta, d_L_frac) # The 3D marginal is the upper-left 3x3 block of the 4D covariance gw_3d = _mvn_pdf( np.vstack([phi, theta, luminosity_distance_fraction]).T, _mean_3d, _cov_inv_3d, _log_norm_3d, ) # Conditional mean of M_z_frac given (phi_gal, theta_gal, d_L_frac) x_obs = np.vstack([phi, theta, luminosity_distance_fraction]).T # (N, 3) mu_cond = _mu_obs_4d[3] + (x_obs - _mu_obs_4d[:3]) @ _proj # (N,) # Galaxy mass in M_z_frac coordinates: M_z_frac = M_gal * (1+z) / M_z_det # Eq. (14.22) in derivations/dark_siren_likelihood.md # NOTE: (1+z) here is CORRECT -- it is the coordinate transform, not a Jacobian mu_gal_frac = host_M * (1 + z) / _det_M sigma_gal_frac = host_M_error * (1 + z) / _det_M # Analytic Gaussian product integral: # ∫ N(x; μ_cond, σ²_cond) · N(x; μ_gal, σ²_gal) dx # = N(μ_cond; μ_gal, σ²_cond + σ²_gal) # Eq. (14.31) in derivations/dark_siren_likelihood.md sigma2_sum = _sigma2_cond + sigma_gal_frac**2 mz_integral = np.exp(-0.5 * (mu_cond - mu_gal_frac) ** 2 / sigma2_sum) / np.sqrt( 2 * np.pi * sigma2_sum ) # Eq. (14.32) in derivations/dark_siren_likelihood.md # No /(1+z) factor: Jacobian absorbed by Gaussian rescaling (Eq. 14.21) return p_det * gw_3d * mz_integral * galaxy_redshift_normal_distribution.pdf(z) single_host_likelihood_numerator_with_bh_mass = fixed_quad( numerator_integrant_with_bh_mass, numerator_integration_lower_redshift_limit, numerator_integration_upper_redshift_limit, n=FIXED_QUAD_N, )[0] # Eq. (14.33) in derivations/dark_siren_likelihood.md # Denominator: p_det(d_L, M_z, phi, theta) * p_gal(z) * p_gal(M) # No GW likelihood, no mz_integral, no /(1+z) -- confirmed correct by Phase 14 def denominator_integrant_with_bh_mass_vectorized( M: npt.NDArray[np.float64], z: npt.NDArray[np.float64] ) -> Any: d_L = dist_vectorized(z, h=h) M_z = M * (1 + z) phi = np.full_like(M, host_phiS) theta = np.full_like(M, host_qS) # p_det in observer-frame M_z (consistent with grid axis and # the numerator's host_M·(1+z) hypothesis). p_det = detection_probability.detection_probability_with_bh_mass_interpolated( d_L, M_z, phi, theta, h=h ) return ( p_det * galaxy_redshift_normal_distribution.pdf(z) * galaxy_mass_normal_distribution.pdf(M) ) # MC importance sampling for 2D denominator integral over (z, M). # Proposal distribution: q(z, M) = p_gal(z) * p_gal(M). # After cancellation, weights = p_det (each in [0, 1]). # Relative MC error ~ std(p_det) / (sqrt(N) * mean(p_det)) ~ 1% for N=10000. # Numerator uses fixed_quad (1D over z, mass analytically marginalized) -- # the quadrature-vs-MC asymmetry is a numerical choice, not a physics error. N_SAMPLES = 10_000 z_samples = galaxy_redshift_normal_distribution.rvs(size=N_SAMPLES) M_samples = galaxy_mass_normal_distribution.rvs(size=N_SAMPLES) numerator_integrant_from_samples = denominator_integrant_with_bh_mass_vectorized( M_samples, z_samples ) sampling_pdf = galaxy_redshift_normal_distribution.pdf( z_samples ) * galaxy_mass_normal_distribution.pdf(M_samples) weights = numerator_integrant_from_samples / sampling_pdf single_host_likelihood_denominator_with_bh_mass = np.mean(weights) return [ single_host_likelihood_numerator_without_bh_mass, single_host_likelihood_denominator_without_bh_mass, single_host_likelihood_numerator_with_bh_mass, single_host_likelihood_denominator_with_bh_mass, quadrature_weight_outside_grid_numerator, quadrature_weight_outside_grid_denominator, ] return [ single_host_likelihood_numerator_without_bh_mass, single_host_likelihood_denominator_without_bh_mass, quadrature_weight_outside_grid_numerator, quadrature_weight_outside_grid_denominator, ]
[docs] def single_host_likelihood_integration_testing( possible_host: HostGalaxy, detection: Detection, detection_index: int, h: float, evaluate_with_bh_mass: bool, ) -> list[float]: global redshift_upper_integration_limit global redshift_lower_integration_limit global bh_mass_upper_integration_limit global bh_mass_lower_integration_limit global detection_probability global detection_likelihood_gaussians_by_detection_index ABS_ERROR = 1e-20 # construct normal distribution for redshift and mass for host galaxy galaxy_redshift_normal_distribution = norm(loc=possible_host.z, scale=possible_host.z_error) # Sky localization weight (phi, theta) is inside the GW likelihood Gaussian. # Verified correct by Phase 14 derivation (Sec. 2.7) -- not a source of error. def numerator_integrant_without_bh_mass(z: float) -> float: d_L = dist(z, h=h) luminosity_distance_fraction = d_L / detection.d_L # Gray et al. (2020), arXiv:1908.06050, Eq. A.19: shared p_det function # with D(h) denominator (STAT-03 symmetry). Phase 44 boundary convention: # NN-fill below first bin, zero above injection horizon. return float( detection_probability.detection_probability_without_bh_mass_interpolated_zero_fill( d_L, possible_host.phiS, possible_host.qS, h=h ) * detection_likelihood_gaussians_by_detection_index[detection_index][0].pdf( [possible_host.phiS, possible_host.qS, luminosity_distance_fraction] ) * galaxy_redshift_normal_distribution.pdf(z) ) def denominator_integrant_without_bh_mass(z: float) -> float: d_L = dist(z, h=h) # Gray et al. (2020), arXiv:1908.06050, Eq. A.19: shared p_det function # with D(h) denominator (STAT-03 symmetry). Phase 44 boundary convention: # NN-fill below first bin, zero above injection horizon. return float( detection_probability.detection_probability_without_bh_mass_interpolated_zero_fill( d_L, possible_host.phiS, possible_host.qS, h=h ) * galaxy_redshift_normal_distribution.pdf(z) ) ( single_host_likelihood_numerator_without_bh_mass, single_host_likelihood_numerator_without_bh_mass_error, ) = quad( numerator_integrant_without_bh_mass, redshift_lower_integration_limit, redshift_upper_integration_limit, epsabs=ABS_ERROR, ) ( single_host_likelihood_denominator_without_bh_mass, single_host_likelihood_denominator_without_bh_mass_error, ) = quad( denominator_integrant_without_bh_mass, redshift_lower_integration_limit, redshift_upper_integration_limit, epsabs=ABS_ERROR, ) print( f"Numerator without bh m:{single_host_likelihood_numerator_without_bh_mass}, error estimation: {single_host_likelihood_numerator_without_bh_mass_error}", flush=True, ) print( f"Denominator without bh m:{single_host_likelihood_denominator_without_bh_mass}, error estimation {single_host_likelihood_denominator_without_bh_mass_error}", flush=True, ) if evaluate_with_bh_mass: galaxy_mass_normal_distribution = norm(loc=possible_host.M, scale=possible_host.M_error) """ # double integral version def numerator_integrant_with_bh_mass(M: float, z: float) -> float: d_L = dist(z, h=h) M_z = M * (1 + z) luminosity_distance_fraction = d_L / detection.d_L redshifted_mass_fraction = M_z / detection.M return ( detection_probability.detection_probability_with_bh_mass_interpolated( d_L, M_z, possible_host.phiS, possible_host.qS, h=h ) * detection_likelihood_gaussians_by_detection_index[ detection_index ][1].pdf( [possible_host.phiS, possible_host.qS, luminosity_distance_fraction, redshifted_mass_fraction] ) * galaxy_redshift_normal_distribution.pdf(z) * galaxy_mass_normal_distribution.pdf(M) ) def denominator_integrant_with_bh_mass(M: float, z: float) -> float: d_L = dist(z, h=h) M_z = M * (1 + z) return ( detection_probability.detection_probability_with_bh_mass_interpolated( d_L, M_z, possible_host.phiS, possible_host.qS, h=h ) * galaxy_redshift_normal_distribution.pdf(z) * galaxy_mass_normal_distribution.pdf(M) ) start = time.time() single_host_likelihood_numerator_with_bh_mass, single_host_likelihood_numerator_without_bh_mass_error = dblquad( numerator_integrant_with_bh_mass, redshift_lower_integration_limit, redshift_upper_integration_limit, lambda z: bh_mass_lower_integration_limit, lambda z: bh_mass_upper_integration_limit, epsabs=ABS_ERROR ) single_host_likelihood_denominator_with_bh_mass, single_host_likelihood_denominator_with_bh_mass_error = dblquad( denominator_integrant_with_bh_mass, redshift_lower_integration_limit, redshift_upper_integration_limit, lambda m: bh_mass_lower_integration_limit, lambda m: bh_mass_upper_integration_limit, epsabs=ABS_ERROR ) end = time.time() print(f"Time taken for double integral: {end - start}", flush=True) print(f"Numerator with bh m:{single_host_likelihood_numerator_with_bh_mass}, error estimation: {single_host_likelihood_numerator_without_bh_mass_error}", flush=True) print(f"Denominator with bh m:{single_host_likelihood_denominator_with_bh_mass}, error estimation {single_host_likelihood_denominator_with_bh_mass_error}", flush=True) """ # Analytic marginalization over M_z_frac (same as production path) # Ref: Bishop (2006) PRML Eq. 2.81-2.82 gaussian_4d_test = detection_likelihood_gaussians_by_detection_index[detection_index][1] cov_4d_test = np.asarray(gaussian_4d_test.cov) mu_obs_4d_test = np.asarray(gaussian_4d_test.mean) cov_obs_test = cov_4d_test[:3, :3] cov_cross_test = cov_4d_test[3, :3] cov_mz_test = cov_4d_test[3, 3] cov_obs_inv_test = np.linalg.pinv(cov_obs_test) sigma2_cond_test = float(cov_mz_test - cov_cross_test @ cov_obs_inv_test @ cov_cross_test) sigma2_cond_test = max(sigma2_cond_test, 1e-30) proj_test = cov_cross_test @ cov_obs_inv_test try: gaussian_3d_marginal_test = multivariate_normal( mean=mu_obs_4d_test[:3], cov=cov_obs_test ) except np.linalg.LinAlgError: _LOGGER.warning( "Testing path: degenerate 3D covariance for detection %d — skipping", detection_index, ) return [0.0] def numerator_integrant_with_bh_mass(z: float) -> float: d_L = dist(z, h=h) luminosity_distance_fraction = d_L / detection.d_L x_obs_test = np.array( [possible_host.phiS, possible_host.qS, luminosity_distance_fraction] ) gw_3d = float(gaussian_3d_marginal_test.pdf(x_obs_test)) mu_cond = float(mu_obs_4d_test[3] + proj_test @ (x_obs_test - mu_obs_4d_test[:3])) mu_gal_frac = possible_host.M * (1 + z) / detection.M sigma_gal_frac = possible_host.M_error * (1 + z) / detection.M sigma2_sum = sigma2_cond_test + sigma_gal_frac**2 mz_integral = float( np.exp(-0.5 * (mu_cond - mu_gal_frac) ** 2 / sigma2_sum) / np.sqrt(2 * np.pi * sigma2_sum) ) # Eq. (14.32) in derivations/dark_siren_likelihood.md # No /(1+z) factor: Jacobian absorbed by Gaussian rescaling (Eq. 14.21) return float( detection_probability.detection_probability_with_bh_mass_interpolated( d_L, detection.M, possible_host.phiS, possible_host.qS, h=h ) * gw_3d * mz_integral * galaxy_redshift_normal_distribution.pdf(z) ) def denominator_integrant_with_bh_mass(M: float, z: float) -> float: d_L = dist(z, h=h) M_z = M * (1 + z) return float( detection_probability.detection_probability_with_bh_mass_interpolated( d_L, M_z, possible_host.phiS, possible_host.qS, h=h ) * galaxy_redshift_normal_distribution.pdf(z) * galaxy_mass_normal_distribution.pdf(M) ) start = time.time() ( single_host_likelihood_numerator_with_bh_mass, single_host_likelihood_numerator_with_bh_mass_error, ) = quad( numerator_integrant_with_bh_mass, redshift_lower_integration_limit, redshift_upper_integration_limit, epsabs=ABS_ERROR, ) ( single_host_likelihood_denominator_with_bh_mass, single_host_likelihood_denominator_with_bh_mass_error, ) = dblquad( denominator_integrant_with_bh_mass, galaxy_redshift_normal_distribution.mean() - 5 * galaxy_redshift_normal_distribution.std(), galaxy_redshift_normal_distribution.mean() + 5 * galaxy_redshift_normal_distribution.std(), lambda m: ( galaxy_mass_normal_distribution.mean() - 5 * galaxy_mass_normal_distribution.std() ), lambda m: ( galaxy_mass_normal_distribution.mean() + 5 * galaxy_mass_normal_distribution.std() ), epsabs=ABS_ERROR, ) end = time.time() print(f"Time taken for delta function approximation: {end - start}s", flush=True) print( f"Numerator with bh m:{single_host_likelihood_numerator_with_bh_mass}, error estimation: {single_host_likelihood_numerator_with_bh_mass_error}", flush=True, ) print( f"Denominator with bh m:{single_host_likelihood_denominator_with_bh_mass}, error estimation {single_host_likelihood_denominator_with_bh_mass_error}", flush=True, ) # monte carlo integration denominator 2D start = time.time() def denominator_integrant_with_bh_mass_vectorized( M: npt.NDArray[np.float64], z: npt.NDArray[np.float64] ) -> Any: d_L = dist_vectorized(z, h=h) M_z = M * (1 + z) phi = np.ones_like(M) * possible_host.phiS theta = np.ones_like(M) * possible_host.qS return ( detection_probability.detection_probability_with_bh_mass_interpolated( d_L, M_z, phi, theta, h=h ) * galaxy_redshift_normal_distribution.pdf(z) * galaxy_mass_normal_distribution.pdf(M) ) N_SAMPLES = 100_00 z_samples = galaxy_redshift_normal_distribution.rvs(size=N_SAMPLES) M_samples = galaxy_mass_normal_distribution.rvs(size=N_SAMPLES) numerator_integrant_from_samples = denominator_integrant_with_bh_mass_vectorized( M_samples, z_samples ) sampling_pdf = galaxy_redshift_normal_distribution.pdf( z_samples ) * galaxy_mass_normal_distribution.pdf(M_samples) weights = numerator_integrant_from_samples / sampling_pdf integral = np.mean(weights) integral_error = np.std(weights) / np.sqrt(N_SAMPLES) end = time.time() print(f"Time taken for monte carlo integration: {end - start}s", flush=True) print( f"Monte Carlo denominator integral with bh mass: {integral}, error estimation: {integral_error}", flush=True, ) print( f"Integration difference: {abs(single_host_likelihood_denominator_with_bh_mass - integral)}", flush=True, ) return [ single_host_likelihood_numerator_without_bh_mass, single_host_likelihood_denominator_without_bh_mass, single_host_likelihood_numerator_with_bh_mass, single_host_likelihood_denominator_with_bh_mass, ] return [ single_host_likelihood_numerator_without_bh_mass, single_host_likelihood_denominator_without_bh_mass, ]
[docs] def child_process_init( redshift_lower_limit: float, redshift_upper_limit: float, bh_mass_lower_limit: float, bh_mass_upper_limit: float, current_detection_probability: SimulationDetectionProbability, current_means_3d: npt.NDArray[np.float64], current_cov_inv_3d: npt.NDArray[np.float64], current_log_norm_3d: npt.NDArray[np.float64], current_means_4d: npt.NDArray[np.float64], current_cov_inv_4d: npt.NDArray[np.float64], current_log_norm_4d: npt.NDArray[np.float64], current_det_index_to_slot: dict[int, int], current_sigma2_cond_arr: npt.NDArray[np.float64], current_proj_arr: npt.NDArray[np.float64], current_det_d_L_arr: npt.NDArray[np.float64], current_det_d_L_unc_arr: npt.NDArray[np.float64], current_det_M_arr: npt.NDArray[np.float64], current_det_phi_arr: npt.NDArray[np.float64], current_det_theta_arr: npt.NDArray[np.float64], current_D_h_table: dict[float, float] | None = None, ) -> None: global redshift_upper_integration_limit global redshift_lower_integration_limit global bh_mass_upper_integration_limit global bh_mass_lower_integration_limit global detection_probability global means_3d, cov_inv_3d, log_norm_3d global means_4d, cov_inv_4d, log_norm_4d global det_index_to_slot global sigma2_cond_arr, proj_arr global det_d_L_arr, det_d_L_unc_arr, det_M_arr, det_phi_arr, det_theta_arr global D_h_table redshift_upper_integration_limit = redshift_upper_limit redshift_lower_integration_limit = redshift_lower_limit bh_mass_upper_integration_limit = bh_mass_upper_limit bh_mass_lower_integration_limit = bh_mass_lower_limit detection_probability = current_detection_probability means_3d = current_means_3d cov_inv_3d = current_cov_inv_3d log_norm_3d = current_log_norm_3d means_4d = current_means_4d cov_inv_4d = current_cov_inv_4d log_norm_4d = current_log_norm_4d det_index_to_slot = current_det_index_to_slot sigma2_cond_arr = current_sigma2_cond_arr proj_arr = current_proj_arr det_d_L_arr = current_det_d_L_arr det_d_L_unc_arr = current_det_d_L_unc_arr det_M_arr = current_det_M_arr det_phi_arr = current_det_phi_arr det_theta_arr = current_det_theta_arr if current_D_h_table is not None: D_h_table = current_D_h_table
def _get_closest_possible_host( detection: Detection, possible_hosts: list[HostGalaxy] ) -> HostGalaxy: distances = [ _distance_spherical_coordinates( phi1=detection.phi, theta1=detection.theta, phi2=host.phiS, theta2=host.qS, ) for host in possible_hosts ] return possible_hosts[int(np.argmin(distances))] def _distance_spherical_coordinates( phi1: float, theta1: float, phi2: float, theta2: float ) -> float: return float( np.arccos( np.sin(theta1) * np.sin(theta2) + np.cos(theta1) * np.cos(theta2) * np.cos(phi1 - phi2) ) )
[docs] def compute_sigma_deviation( sigma: float, sigma_error: float, h_mean: float, h_mean_error: float ) -> tuple[float, float]: sigma_dev = (h_mean - H) / sigma sigma_dev_error = float(np.sqrt((sigma_error * sigma_dev) ** 2 + (h_mean_error) ** 2) / sigma) return sigma_dev, sigma_dev_error