Source code for master_thesis_code.bayesian_inference.posterior_combination

"""Posterior combination module for merging per-event likelihoods.

Combines per-h-value posterior JSON files from the evaluation campaign into
a joint posterior over the Hubble constant, with multiple zero-handling
strategies and diagnostic reporting.

Four zero-handling strategies:
- **naive**: Replace 0.0 with ``np.finfo(float).tiny`` before log.
- **exclude**: Remove events that have any zero likelihood.
- **per-event-floor**: Replace 0.0 with ``min(nonzero) / 100`` per event.
- **physics-floor**: Per-event minimum nonzero likelihood as floor value; all-zero events excluded.
"""

from __future__ import annotations

import json
import logging
import re
from enum import StrEnum
from pathlib import Path

import numpy as np
import numpy.typing as npt

logger = logging.getLogger(__name__)


[docs] class CombinationStrategy(StrEnum): """Zero-handling strategy for posterior combination.""" NAIVE = "naive" EXCLUDE = "exclude" PER_EVENT_FLOOR = "per-event-floor" PHYSICS_FLOOR = "physics-floor"
# --------------------------------------------------------------------------- # Loading # ---------------------------------------------------------------------------
[docs] def load_posterior_jsons( posteriors_dir: Path, ) -> tuple[list[float], dict[int, dict[float, float]]]: """Load per-h-value posterior JSON files from a directory. Parameters ---------- posteriors_dir : Path Directory containing ``h_*.json`` files. Returns ------- h_values : list[float] Sorted list of h values found across all files. event_likelihoods : dict[int, dict[float, float]] Nested dict mapping ``detection_index -> {h_value: likelihood}``. Events with empty lists (missing data) are excluded. """ json_files = sorted(posteriors_dir.glob("h_*.json")) if not json_files: msg = f"No h_*.json files found in {posteriors_dir}" raise FileNotFoundError(msg) h_values_set: set[float] = set() # detection_index -> {h_value: likelihood} event_likelihoods: dict[int, dict[float, float]] = {} for path in json_files: with open(path) as f: data: dict[str, list[float] | float] = json.load(f) h_raw = data["h"] h_val = float(h_raw) # type: ignore[arg-type] h_values_set.add(h_val) for key, value in data.items(): if key == "h": continue try: detection_idx = int(key) except ValueError: continue # skip non-integer keys (e.g. "galaxy_likelihoods") # Empty list means missing event — skip if not isinstance(value, list) or len(value) == 0: continue likelihood = float(value[0]) if detection_idx not in event_likelihoods: event_likelihoods[detection_idx] = {} event_likelihoods[detection_idx][h_val] = likelihood h_values = sorted(h_values_set) logger.info( "Loaded %d JSON files: %d h-values, %d events with data", len(json_files), len(h_values), len(event_likelihoods), ) return h_values, event_likelihoods
# --------------------------------------------------------------------------- # Array construction # ---------------------------------------------------------------------------
[docs] def build_likelihood_array( h_values: list[float], event_likelihoods: dict[int, dict[float, float]], ) -> tuple[npt.NDArray[np.float64], list[int]]: """Build a 2-D likelihood array from the nested dict structure. Parameters ---------- h_values : list[float] Sorted h-values (columns). event_likelihoods : dict[int, dict[float, float]] Nested dict from :func:`load_posterior_jsons`. Returns ------- array : ndarray of shape ``(n_events, n_h_values)`` Likelihood values. ``NaN`` where an event is missing a particular h-value; ``0.0`` where the likelihood was explicitly zero. detection_indices : list[int] Sorted detection indices (row labels). """ detection_indices = sorted(event_likelihoods.keys()) n_events = len(detection_indices) n_h = len(h_values) array = np.full((n_events, n_h), np.nan, dtype=np.float64) for row, det_idx in enumerate(detection_indices): h_map = event_likelihoods[det_idx] for col, h in enumerate(h_values): if h in h_map: array[row, col] = h_map[h] logger.info("Built likelihood array: %d events x %d h-bins", n_events, n_h) return array, detection_indices
# --------------------------------------------------------------------------- # Zero-handling strategies # ---------------------------------------------------------------------------
[docs] def apply_strategy( likelihoods: npt.NDArray[np.float64], strategy: CombinationStrategy, ) -> tuple[npt.NDArray[np.float64], int]: """Apply a zero-handling strategy to the likelihood array. Parameters ---------- likelihoods : ndarray of shape ``(n_events, n_h_values)`` Raw likelihood array (may contain 0.0 entries). strategy : CombinationStrategy Which strategy to apply. Returns ------- processed : ndarray Likelihood array with zeros handled. excluded_count : int Number of events removed (0 for strategies that keep all events). """ result = likelihoods.copy() if strategy == CombinationStrategy.NAIVE: result[result == 0.0] = np.finfo(float).tiny return result, 0 if strategy == CombinationStrategy.EXCLUDE: return _exclude_zero_events(result) if strategy == CombinationStrategy.PER_EVENT_FLOOR: return _per_event_floor(result) if strategy == CombinationStrategy.PHYSICS_FLOOR: return _physics_floor(result) msg = f"Unknown strategy: {strategy}" raise ValueError(msg)
def _exclude_zero_events( likelihoods: npt.NDArray[np.float64], ) -> tuple[npt.NDArray[np.float64], int]: """Remove rows that contain any exact zero.""" has_zero = np.any(likelihoods == 0.0, axis=1) excluded = int(np.sum(has_zero)) kept = likelihoods[~has_zero] logger.info("Exclude strategy: removed %d events with zeros", excluded) return kept, excluded def _per_event_floor( likelihoods: npt.NDArray[np.float64], ) -> tuple[npt.NDArray[np.float64], int]: """Replace zeros with min(nonzero values in that row) / 100.""" result = likelihoods.copy() for i in range(result.shape[0]): row = result[i] zero_mask = row == 0.0 if not np.any(zero_mask): continue nonzero_vals = row[~zero_mask & ~np.isnan(row)] if len(nonzero_vals) == 0: # All-zero event: use tiny result[i, zero_mask] = np.finfo(float).tiny else: floor = float(np.min(nonzero_vals)) / 100.0 result[i, zero_mask] = floor return result, 0 def _physics_floor( likelihoods: npt.NDArray[np.float64], ) -> tuple[npt.NDArray[np.float64], int]: """Replace zeros with the per-event minimum nonzero likelihood. For each event (row), zeros are replaced with the smallest nonzero likelihood value in that row. Events that are entirely zero have no nonzero value to use as a floor and are excluded instead. Parameters ---------- likelihoods : ndarray of shape ``(n_events, n_h_values)`` Likelihood array (may contain 0.0 entries). Returns ------- processed : ndarray Likelihood array with zeros replaced by per-event min-nonzero floor. excluded_count : int Number of all-zero events that were excluded. """ result = likelihoods.copy() exclude_mask = np.zeros(result.shape[0], dtype=bool) for i in range(result.shape[0]): row = result[i] zero_mask = row == 0.0 if not np.any(zero_mask): continue nonzero_vals = row[~zero_mask & ~np.isnan(row)] n_zeros = int(np.sum(zero_mask)) n_bins = len(row) if len(nonzero_vals) == 0: # All-zero event: no nonzero value to derive floor from logger.warning( "Physics floor: event row %d has all-zero likelihoods, " "excluding (no nonzero value for floor)", i, ) exclude_mask[i] = True else: floor_value = float(np.min(nonzero_vals)) result[i, zero_mask] = floor_value logger.info( "Physics floor: event row %d: floored %d of %d bins with value %.6e", i, n_zeros, n_bins, floor_value, ) excluded = int(np.sum(exclude_mask)) kept = result[~exclude_mask] if excluded > 0: logger.info("Physics floor: excluded %d all-zero events", excluded) return kept, excluded # --------------------------------------------------------------------------- # Log-space combination # ---------------------------------------------------------------------------
[docs] def combine_log_space( likelihoods: npt.NDArray[np.float64], log_D_h: npt.NDArray[np.float64] | None = None, # noqa: ARG001 (kept for backward compat / diagnostics) n_events_used: int = 0, # noqa: ARG001 ) -> npt.NDArray[np.float64]: """Combine per-event likelihoods into a joint posterior using log-space. The per-event likelihoods :math:`L_i(h)` are already conditional on detection: ``L_comp = num/D(h)`` (Gray et al. 2020 Eq. 31, where ``D(h)`` plays the role of the prior normalization for ``p_galaxy ∝ p_det · dV_c``) and ``L_cat = (1/N_g) Σ_g (N_g_local/D_g_local)`` (per-galaxy detection-conditional expectation). The joint posterior is therefore just ``Π_i L_i(h)`` with no additional ``β(h)^N`` selection correction (Loredo 2004; Mandel et al. 2019, arXiv:1809.02063 §3): conditioning on the observed N_detected makes the β^N factor in the unconditional likelihood cancel against the Poisson rate prior. The ``log_D_h`` and ``n_events_used`` arguments are retained for backward compatibility with older callers, but are now **ignored** — applying ``−N · log D(h)`` here would double-count the selection correction already inside each per-event ``L_comp``. Parameters ---------- likelihoods : ndarray of shape ``(n_events, n_h_values)`` Likelihood array with zeros already handled (all values > 0). log_D_h : ndarray of shape ``(n_h_values,)`` or None Ignored. Kept for backward compatibility; pass ``None`` for new callers. Phase 43-H1 commit ``2853c32`` introduced an ``−N · log D(h)`` subtraction here that we have since shown (Tier 3 audit, 2026-05-04) over-applies the selection correction. n_events_used : int Ignored. Kept for backward compatibility. Returns ------- posterior : ndarray of shape ``(n_h_values,)`` Normalized joint posterior. """ log_likes = np.log(likelihoods) joint_log = np.sum(log_likes, axis=0) max_log = np.max(joint_log) posterior = np.exp(joint_log - max_log) posterior = posterior / np.sum(posterior) return np.asarray(posterior, dtype=np.float64)
# --------------------------------------------------------------------------- # Diagnostic report # ---------------------------------------------------------------------------
[docs] def generate_diagnostic_report( h_values: list[float], likelihoods: npt.NDArray[np.float64], detection_indices: list[int], ) -> str: """Generate a markdown diagnostic report about zero-likelihood events. Parameters ---------- h_values : list[float] Sorted h-values. likelihoods : ndarray of shape ``(n_events, n_h_values)`` Raw likelihood array (before strategy application). detection_indices : list[int] Detection indices corresponding to rows. Returns ------- report : str Markdown-formatted diagnostic report. """ n_events = likelihoods.shape[0] # Identify zero events zero_det_indices: list[int] = [] zero_h_bins: list[list[float]] = [] zero_patterns: list[str] = [] for i in range(n_events): row = likelihoods[i] bins = [h_values[j] for j in range(len(h_values)) if row[j] == 0.0] if bins: all_zero = len(bins) == len(h_values) low_h_only = all(h <= 0.70 for h in bins) and not all_zero if all_zero: pattern = "all-zeros" elif low_h_only: pattern = "low-h-only" else: pattern = "partial-zeros" zero_det_indices.append(detection_indices[i]) zero_h_bins.append(bins) zero_patterns.append(pattern) # Count NaN rows (empty events in the original data) nan_rows = int(np.sum(np.all(np.isnan(likelihoods), axis=1))) all_zero_count = sum(1 for p in zero_patterns if p == "all-zeros") # Zero distribution by h-bin zeros_per_h: dict[float, int] = {} for j, h in enumerate(h_values): zeros_per_h[h] = int(np.sum(likelihoods[:, j] == 0.0)) lines: list[str] = [] lines.append("# Zero-Likelihood Diagnostic Report\n") lines.append("## Summary\n") lines.append(f"- **Total events:** {n_events}") lines.append(f"- **Events with zeros:** {len(zero_det_indices)}") lines.append(f"- **Empty events (all NaN):** {nan_rows}") lines.append(f"- **All-zeros events:** {all_zero_count}") lines.append("") lines.append("## Zero-Event Detail\n") if zero_det_indices: lines.append("| Detection Index | Zero h-bins | Pattern |") lines.append("|---|---|---|") for det_idx, bins, pat in zip(zero_det_indices, zero_h_bins, zero_patterns): bins_str = ", ".join(f"{h:.2f}" for h in bins) lines.append(f"| {det_idx} | {bins_str} | {pat} |") else: lines.append("No events with zero likelihoods.") lines.append("") lines.append("## Zero Distribution by h-bin\n") lines.append("| h-value | Number of zeros |") lines.append("|---|---|") for h, count in sorted(zeros_per_h.items()): lines.append(f"| {h:.2f} | {count} |") lines.append("") lines.append("## Root Cause Analysis\n") lines.append( "Zero likelihoods arise when a detection event has no compatible " "host galaxy at the given Hubble constant value. " "**All-zeros** events have no viable host at any h-value, indicating " "the event lies outside the galaxy catalog coverage entirely. " "**Low-h-only** zeros occur at smaller h-values where the implied " "luminosity distance pushes the source beyond the catalog's redshift " "completeness boundary. " "**Partial-zeros** arise at coverage boundaries where the galaxy " "catalog transitions between complete and incomplete." ) lines.append("") lines.append("## Impact on Posterior\n") lines.append( "Under naive multiplication, a single zero at any h-bin drives the " "entire joint posterior to zero at that bin. With " f"{len(zero_det_indices)} zero-events, the naive posterior is dominated " "by the *absence* of catalog coverage rather than the actual " "likelihood information. Log-space combination with zero-handling " "strategies mitigates this." ) return "\n".join(lines)
# --------------------------------------------------------------------------- # Comparison table # ---------------------------------------------------------------------------
[docs] def generate_comparison_table( h_values: npt.NDArray[np.float64], likelihoods: npt.NDArray[np.float64], detection_indices: list[int], variant: str, log_D_h: npt.NDArray[np.float64] | None = None, ) -> str: """Generate a markdown comparison table for all strategies. Parameters ---------- h_values : ndarray Sorted h-values. likelihoods : ndarray of shape ``(n_events, n_h_values)`` Raw likelihood array. detection_indices : list[int] Detection indices. variant : str Name of the posterior variant (e.g. ``"posteriors"``). log_D_h : ndarray of shape ``(n_h_values,)`` or None Ignored (kept for backward compatibility). See ``combine_log_space`` docstring — D(h) enters via L_comp = num/D inside each per-event likelihood (Gray Eq. 31), not as an outer correction. Returns ------- table : str Markdown-formatted comparison table. """ n_total = likelihoods.shape[0] h_arr = np.asarray(h_values, dtype=np.float64) lines: list[str] = [] lines.append("# Posterior Combination Method Comparison\n") lines.append(f"## Variant: {variant}\n") lines.append("| Strategy | Events Used | Events Excluded | MAP h | MAP Posterior Value |") lines.append("|---|---|---|---|---|") for strat in CombinationStrategy: processed, excluded = apply_strategy(likelihoods, strat) n_used = n_total - excluded if processed.shape[0] == 0: lines.append(f"| {strat.value} | 0 | {excluded} | N/A | N/A |") continue posterior = combine_log_space(processed, log_D_h=log_D_h, n_events_used=n_used) map_idx = int(np.argmax(posterior)) map_h = float(h_arr[map_idx]) map_val = float(posterior[map_idx]) lines.append(f"| {strat.value} | {n_used} | {excluded} | {map_h:.4f} | {map_val:.6f} |") return "\n".join(lines)
# --------------------------------------------------------------------------- # Main entry point # ---------------------------------------------------------------------------
[docs] def combine_posteriors( posteriors_dir: str, strategy: str, output_dir: str, d_h_table: dict[float, float] | None = None, ) -> dict[str, object]: """Combine per-event posteriors into a joint posterior. This is the main entry point called from CLI. Parameters ---------- posteriors_dir : str Path to directory containing ``h_*.json`` files. strategy : str Zero-handling strategy name (one of the ``CombinationStrategy`` values). output_dir : str Path to write output files. d_h_table : dict[float, float] or None Pre-computed ``{h: D(h)}`` mapping (Gray et al. 2020, Eq. A.19). When ``None``, ``D(h)`` is computed automatically using :func:`~master_thesis_code.bayesian_inference.bayesian_statistics.precompute_completion_denominator`. Returns ------- result : dict Combined posterior result with keys ``h_values``, ``posterior``, ``strategy``, ``n_events_total``, ``n_events_used``, ``n_events_excluded``, ``n_events_empty``, ``map_h``, ``map_posterior``, ``variant``, ``D_h_per_h``. """ posteriors_path = Path(posteriors_dir) output_path = Path(output_dir) output_path.mkdir(parents=True, exist_ok=True) # Parse strategy strat = CombinationStrategy(strategy) effective_strategy = strat # Load and build array h_values, event_likelihoods = load_posterior_jsons(posteriors_path) likelihoods, detection_indices = build_likelihood_array(h_values, event_likelihoods) n_total = likelihoods.shape[0] # Count truly empty events (those that didn't make it into event_likelihoods # are not in the array; we count by checking the original JSONs) json_files = sorted(posteriors_path.glob("h_*.json")) all_keys: set[str] = set() for jf in json_files: with open(jf) as f: data = json.load(f) all_keys.update(k for k in data if k != "h") n_empty = len(all_keys) - n_total # Compute or validate D(h) selection-function denominator # Gray et al. (2020), arXiv:1908.06050, Eq. A.19. if d_h_table is None: # Lazy import: avoids pulling heavy scipy/pandas/astropy into this # module's top-level import at the cost of a slightly slower first call. from master_thesis_code.bayesian_inference.bayesian_statistics import ( # noqa: PLC0415 precompute_completion_denominator, ) from master_thesis_code.bayesian_inference.simulation_detection_probability import ( # noqa: PLC0415 SimulationDetectionProbability, ) from master_thesis_code.constants import ( # noqa: PLC0415 INJECTION_DATA_DIR, OMEGA_DE, OMEGA_M, SNR_THRESHOLD, ) detection_probability = SimulationDetectionProbability( injection_data_dir=INJECTION_DATA_DIR, snr_threshold=SNR_THRESHOLD, ) for h in h_values: detection_probability._get_or_build_grid(h) d_h_table = precompute_completion_denominator( h_values=h_values, detection_probability_obj=detection_probability, Omega_m=OMEGA_M, Omega_DE=OMEGA_DE, ) D_h_array = np.array([d_h_table[h] for h in h_values], dtype=np.float64) log_D_h = np.log(D_h_array) logger.info( "D(h) range: %.4e%.4e (ratio %.2fx)", float(np.min(D_h_array)), float(np.max(D_h_array)), float(np.max(D_h_array) / max(float(np.min(D_h_array)), 1e-300)), ) # Apply strategy processed, n_excluded = apply_strategy(likelihoods, effective_strategy) n_used = n_total - n_excluded # Combine with Gray Eq. A.19 selection-function correction posterior = combine_log_space(processed, log_D_h=log_D_h, n_events_used=n_used) # MAP estimate h_arr = np.array(h_values, dtype=np.float64) map_idx = int(np.argmax(posterior)) map_h = float(h_arr[map_idx]) map_posterior = float(posterior[map_idx]) variant = posteriors_path.name logger.info( "Combined %d events (excluded %d, empty %d) with strategy '%s': MAP h=%.4f", n_used, n_excluded, n_empty, effective_strategy.value, map_h, ) # Generate reports diag_report = generate_diagnostic_report(h_values, likelihoods, detection_indices) (output_path / "diagnostic_report.md").write_text(diag_report) comp_table = generate_comparison_table( h_arr, likelihoods, detection_indices, variant, log_D_h=log_D_h ) (output_path / "comparison_table.md").write_text(comp_table) # Build result result: dict[str, object] = { "h_values": [float(h) for h in h_values], "posterior": [float(p) for p in posterior], "strategy": effective_strategy.value, "n_events_total": n_total, "n_events_used": n_used, "n_events_excluded": n_excluded, "n_events_empty": n_empty, "map_h": map_h, "map_posterior": map_posterior, "variant": variant, "D_h_per_h": [float(d) for d in D_h_array], } # Write combined posterior JSON with open(output_path / "combined_posterior.json", "w") as f: json.dump(result, f, indent=2) return result
# --------------------------------------------------------------------------- # Canonical combination (raw Σ log L) — paper-grade reference implementation # --------------------------------------------------------------------------- # # The functions below are the canonical "raw Σ log L" combination used by # the bias-investigation suite (test_24_multi_truth_bias_sweep.py, # test_28_production_finegrid_analyze.py) and quoted throughout # docs/H0_BIAS_RESOLUTION.md. They are the source-of-truth combination # for all plotting code; figures must consume these helpers rather than # `combine_posteriors` (which applies a `physics-floor` zero-handling # strategy that biases the MAP relative to raw Σ log L). _H_FILENAME_RE = re.compile(r"h_(\d+)_(\d+)\.json") def _h_from_filename(path: Path) -> float: """Parse the h-value out of an ``h_<int>_<int>.json`` filename. Returns ``nan`` if the filename does not match the expected pattern. """ m = _H_FILENAME_RE.match(path.name) if m is None: return float("nan") return float(f"{m.group(1)}.{m.group(2)}")
[docs] def load_per_h_likelihoods( directory: Path, ) -> tuple[list[float], npt.NDArray[np.float64]]: """Load per-event log-likelihoods across all h-value JSONs in *directory*. Reads every ``h_*.json`` file, extracts the scalar per-event likelihood for each integer event key, drops events that have missing data at any h-value (``full_mask`` filter), and returns ``(h_values, log_L)`` where ``log_L`` has shape ``(n_events_full, n_h_values)``. This is the reference implementation used by the bias-investigation suite and is the canonical loader for all paper-grade H0 posterior figures. Parameters ---------- directory: Path to a directory containing ``h_*.json`` files (typically ``posteriors/`` or ``posteriors_with_bh_mass/``). Returns ------- h_values: Sorted list of h-values (one per JSON file). log_L: Float64 array of shape ``(n_events, n_h_values)``; entries are ``log(max(L_event_at_h, 1e-300))``. Events with NaN at any h are excluded. """ if not directory.exists(): return [], np.empty((0, 0), dtype=np.float64) files = sorted(directory.glob("h_*.json"), key=_h_from_filename) h_values: list[float] = [_h_from_filename(f) for f in files] event_indices: set[int] = set() per_h_data: list[dict[int, float]] = [] for f in files: with open(f) as fh: d = json.load(fh) per_h: dict[int, float] = {} for k, v in d.items(): try: ev = int(k) except (TypeError, ValueError): continue if isinstance(v, list): if len(v) == 0: continue val = v[0] else: val = v event_indices.add(ev) per_h[ev] = float(val) per_h_data.append(per_h) events_sorted = sorted(event_indices) log_L = np.full((len(events_sorted), len(h_values)), np.nan, dtype=np.float64) for j, per_h in enumerate(per_h_data): for i, ev in enumerate(events_sorted): if ev in per_h: log_L[i, j] = float(np.log(max(per_h[ev], 1e-300))) full_mask = ~np.isnan(log_L).any(axis=1) log_L_filtered: npt.NDArray[np.float64] = log_L[full_mask] return h_values, log_L_filtered
[docs] def parabolic_refine_map( h_grid: npt.NDArray[np.float64], log_posterior: npt.NDArray[np.float64], ) -> float: """Refine the discrete MAP via 3-point parabolic interpolation. Given a log-posterior on a discrete h-grid, fits a parabola through the discrete argmax and its two neighbours and returns the analytic vertex of the parabola. Falls back to the discrete argmax when the peak is on the grid boundary or the parabola degenerates. Parameters ---------- h_grid: Monotonically increasing h-values. log_posterior: Log-posterior values on ``h_grid``. Returns ------- float Sub-grid continuous MAP estimate. """ i = int(np.argmax(log_posterior)) if i <= 0 or i >= len(h_grid) - 1: return float(h_grid[i]) h0, h1, h2 = float(h_grid[i - 1]), float(h_grid[i]), float(h_grid[i + 1]) y0, y1, y2 = ( float(log_posterior[i - 1]), float(log_posterior[i]), float(log_posterior[i + 1]), ) denom = y0 - 2 * y1 + y2 if abs(denom) < 1e-12: return h1 return float(h1 - 0.5 * (h2 - h0) * (y2 - y0) / (2 * denom))
[docs] def compute_canonical_combined_posterior( posteriors_dir: Path, ) -> dict[str, object]: """Compute the canonical joint H0 posterior from per-h JSONs. Aggregates per-event log-likelihoods via raw ``Σ log L_i`` (no physics-floor, no outer D(h) correction — the Tier 3 fix removed the latter from ``combine_log_space``). This is the combination quoted in Phase 48 verdict JSON and throughout ``docs/H0_BIAS_RESOLUTION.md``. Parameters ---------- posteriors_dir: Directory of ``h_*.json`` files (1D or 2D variant). Returns ------- dict with keys - ``h_values``: list[float] — sorted h-grid - ``posterior``: list[float] — peak-normalised posterior - ``log_posterior``: list[float] — un-normalised Σ log L_i - ``n_events_used``: int — events surviving the full-mask filter - ``discrete_map``: float — argmax on the discrete grid - ``continuous_map``: float — parabolic-refined sub-grid MAP - ``strategy``: literal string ``"raw-sum-log"`` """ h_values, log_L = load_per_h_likelihoods(posteriors_dir) if not h_values or log_L.size == 0: return { "h_values": [], "posterior": [], "log_posterior": [], "n_events_used": 0, "discrete_map": float("nan"), "continuous_map": float("nan"), "strategy": "raw-sum-log", } h_arr = np.asarray(h_values, dtype=np.float64) log_joint = log_L.sum(axis=0) # Peak-normalise the posterior in linear space for plotting convenience. log_joint_shifted = log_joint - log_joint.max() posterior = np.exp(log_joint_shifted) discrete_argmax = int(np.argmax(log_joint)) discrete_map = float(h_arr[discrete_argmax]) continuous_map = parabolic_refine_map(h_arr, log_joint) return { "h_values": [float(h) for h in h_values], "posterior": [float(p) for p in posterior], "log_posterior": [float(p) for p in log_joint], "n_events_used": int(log_L.shape[0]), "discrete_map": discrete_map, "continuous_map": continuous_map, "strategy": "raw-sum-log", }