Source code for master_thesis_code.bayesian_inference.evaluation_report

"""Evaluation report module for baseline extraction and comparison.

Provides tools to extract a baseline H0 posterior snapshot from an h-sweep,
compute credible intervals, and generate before/after comparison reports.

Used by Phases 31-34 to measure the effect of each fix on the H0 posterior.
"""

import datetime
import json
import logging
import subprocess
from dataclasses import dataclass, field
from pathlib import Path

import numpy as np
import pandas as pd
from scipy.integrate import cumulative_trapezoid

_LOGGER = logging.getLogger(__name__)


def _get_git_commit_safe() -> str:
    """Return current git commit hash, or 'unknown' if not available."""
    try:
        return (
            subprocess.check_output(["git", "rev-parse", "HEAD"], stderr=subprocess.DEVNULL)
            .decode()
            .strip()
        )
    except (subprocess.CalledProcessError, FileNotFoundError):
        return "unknown"


[docs] @dataclass class BaselineSnapshot: """Snapshot of H0 posterior metrics extracted from a posteriors directory. Args: map_h: MAP (maximum a posteriori) Hubble constant value. ci_lower: Lower bound of the 68% credible interval. ci_upper: Upper bound of the 68% credible interval. ci_width: Width of the 68% credible interval (ci_upper - ci_lower). bias_percent: Relative bias as (MAP - true_h) / true_h * 100. n_events: Number of detection events contributing to the posterior. h_values: Sorted list of h values in the sweep. log_posteriors: Log-posterior values at each h. per_event_summaries: Per-event diagnostic data (d_L, SNR, etc.). created_at: ISO 8601 timestamp of when this snapshot was created. git_commit: Git commit hash at time of creation. """ map_h: float ci_lower: float ci_upper: float ci_width: float bias_percent: float n_events: int h_values: list[float] = field(default_factory=list) log_posteriors: list[float] = field(default_factory=list) per_event_summaries: list[dict[str, float]] = field(default_factory=list) n_excluded_fisher: int = 0 median_cond_3d: float = 0.0 median_cond_4d: float = 0.0 created_at: str = field( default_factory=lambda: datetime.datetime.now(datetime.UTC).isoformat() + "Z" ) git_commit: str = field(default_factory=_get_git_commit_safe)
[docs] def to_json(self) -> dict[str, object]: """Serialize to a JSON-compatible dictionary. Returns: Dictionary representation suitable for json.dumps. """ return { "map_h": self.map_h, "ci_lower": self.ci_lower, "ci_upper": self.ci_upper, "ci_width": self.ci_width, "bias_percent": self.bias_percent, "n_events": self.n_events, "h_values": self.h_values, "log_posteriors": self.log_posteriors, "per_event_summaries": self.per_event_summaries, "n_excluded_fisher": self.n_excluded_fisher, "median_cond_3d": self.median_cond_3d, "median_cond_4d": self.median_cond_4d, "created_at": self.created_at, "git_commit": self.git_commit, }
[docs] @classmethod def from_json(cls, data: dict[str, object]) -> "BaselineSnapshot": """Deserialize from a JSON-compatible dictionary. Args: data: Dictionary as produced by to_json(). Returns: A new BaselineSnapshot instance. """ from typing import Any d: dict[str, Any] = data return cls( map_h=float(d["map_h"]), ci_lower=float(d["ci_lower"]), ci_upper=float(d["ci_upper"]), ci_width=float(d["ci_width"]), bias_percent=float(d["bias_percent"]), n_events=int(d["n_events"]), h_values=list(d.get("h_values", [])), log_posteriors=list(d.get("log_posteriors", [])), per_event_summaries=list(d.get("per_event_summaries", [])), n_excluded_fisher=int(d.get("n_excluded_fisher", 0)), median_cond_3d=float(d.get("median_cond_3d", 0.0)), median_cond_4d=float(d.get("median_cond_4d", 0.0)), created_at=str(d.get("created_at", "")), git_commit=str(d.get("git_commit", "unknown")), )
[docs] def load_posteriors(posteriors_dir: Path) -> list[dict[str, float]]: """Load h_*.json posterior files from a directory. Each file should contain a dict with key "h" (float) and integer-string keys mapping to [likelihood_value] arrays for each detection event. Args: posteriors_dir: Path to directory containing h_*.json files. Returns: List of dicts with keys: "h", "log_posterior", "n_detections". Sorted by h value in ascending order. """ files = sorted(posteriors_dir.glob("h_*.json")) if len(files) > 100: _LOGGER.warning( "Found %d posterior files in %s. Loading all may be slow. " "Consider pre-filtering if this is unexpected.", len(files), posteriors_dir, ) results: list[dict[str, float]] = [] for path in files: data: dict[str, object] = json.loads(path.read_text()) h = float(data["h"]) # type: ignore[arg-type] # Collect detection keys: all keys except "h" detection_keys = [k for k in data if k != "h"] n_detections = len(detection_keys) # Compute log_posterior = sum of log-likelihoods across events log_posterior = 0.0 for key in detection_keys: likelihood_list = data[key] if isinstance(likelihood_list, list) and len(likelihood_list) > 0: lk = float(likelihood_list[0]) if lk > 0: log_posterior += np.log(lk) # If likelihood is 0 or negative, skip (numerical floor) results.append( { "h": h, "log_posterior": log_posterior, "n_detections": float(n_detections), } ) results.sort(key=lambda r: r["h"]) return results
[docs] def compute_credible_interval( h_values: list[float], log_posteriors: list[float], level: float = 0.68, ) -> tuple[float, float]: """Compute a credible interval from log-posterior values. Converts log-posteriors to a normalized probability distribution, then computes the CDF and finds where it crosses (1-level)/2 and (1+level)/2. Args: h_values: Sorted list of h values. log_posteriors: Log-posterior value at each h. level: Credible interval level (default 0.68 for 68% CI). Returns: Tuple (lower, upper) bounding the credible interval. """ h_arr = np.array(h_values, dtype=np.float64) lp_arr = np.array(log_posteriors, dtype=np.float64) # Normalize: subtract max for numerical stability, then exponentiate prob = np.exp(lp_arr - lp_arr.max()) # Normalize via trapezoid integration (np.trapezoid since NumPy 2.0) norm = float(np.trapezoid(prob, x=h_arr)) if norm <= 0: norm = 1.0 prob = prob / norm # CDF via cumulative trapezoid cdf_vals = cumulative_trapezoid(prob, x=h_arr, initial=0.0) lower_target = (1.0 - level) / 2.0 upper_target = (1.0 + level) / 2.0 # Linear interpolation to find crossing points lower = float(np.interp(lower_target, cdf_vals, h_arr)) upper = float(np.interp(upper_target, cdf_vals, h_arr)) return lower, upper
[docs] def extract_baseline( posteriors_dir: Path, crb_csv_path: Path | None = None, true_h: float = 0.73, ) -> BaselineSnapshot: """Extract baseline H0 posterior metrics from a posteriors directory. .. warning:: This function does **NOT** apply the ``-N log D(h)`` selection-effect normalization from Gray et al. (2020) arXiv:1908.06050 Eq. A.19. The MAP returned here is systematically biased toward ``h_max`` in standard ΛCDM cosmology (where D(h) grows with h), because the log-posterior ``Σ_i log L_i(h)`` is a monotone-increasing function of h without the D(h) correction. Use ``--evaluate`` (``BayesianStatistics.evaluate``) for production MAP reporting. ``extract_baseline`` is **diagnostic-only** — suitable for comparing relative shifts between h-sweeps run under identical conditions, but not for absolute MAP recovery. Args: posteriors_dir: Directory containing h_*.json files from an h-sweep. crb_csv_path: Optional path to CRB CSV for per-event summaries. true_h: True Hubble constant value for bias computation. Default 0.73. Returns: BaselineSnapshot with MAP h, 68% CI, bias %, and event count. Raises: ValueError: If fewer than 3 h-value files are found (insufficient for credible interval computation per D-02). """ # Gray et al. (2020) arXiv:1908.06050 Eq. A.19: correct posterior is # log p(h) = sum_i log L_i(h) - N * log D(h). This function omits the # -N log D(h) term; MAP is therefore biased. Use --evaluate for production. _LOGGER.warning( "extract_baseline does not apply -N log D(h) normalization " "(Gray et al. 2020 arXiv:1908.06050 Eq. A.19). " "MAP may be biased toward h_max. Use --evaluate for production results." ) posteriors = load_posteriors(posteriors_dir) if len(posteriors) < 3: raise ValueError( f"Found only {len(posteriors)} h-value files in {posteriors_dir}. " "At least 3 h-value files are required to compute a credible interval. " "Run a full h-sweep before calling extract_baseline." ) h_values = [r["h"] for r in posteriors] log_posts = [r["log_posterior"] for r in posteriors] # MAP h: h value with highest log-posterior map_idx = int(np.argmax(log_posts)) map_h = h_values[map_idx] # 68% CI ci_lower, ci_upper = compute_credible_interval(h_values, log_posts, level=0.68) ci_width = ci_upper - ci_lower # Bias % bias_percent = (map_h - true_h) / true_h * 100.0 # Event count from first file n_events = int(posteriors[0]["n_detections"]) # Per-event summaries from CRB CSV per_event_summaries: list[dict[str, float]] = [] if crb_csv_path is not None and crb_csv_path.exists(): per_event_summaries = _extract_per_event_summaries(crb_csv_path) # Fisher quality metrics from CSV (written by BayesianStatistics.evaluate()) n_excluded_fisher = 0 median_cond_3d = 0.0 median_cond_4d = 0.0 fq_path = posteriors_dir.parent / "fisher_quality.csv" if fq_path.exists(): fq_df = pd.read_csv(fq_path) n_excluded_fisher = int(fq_df["excluded"].sum()) median_cond_3d = float(fq_df["cond_3d"].median()) median_cond_4d = float(fq_df["cond_4d"].median()) return BaselineSnapshot( map_h=map_h, ci_lower=ci_lower, ci_upper=ci_upper, ci_width=ci_width, bias_percent=bias_percent, n_events=n_events, h_values=h_values, log_posteriors=log_posts, per_event_summaries=per_event_summaries, n_excluded_fisher=n_excluded_fisher, median_cond_3d=median_cond_3d, median_cond_4d=median_cond_4d, created_at=datetime.datetime.now(datetime.UTC).isoformat() + "Z", git_commit=_get_git_commit_safe(), )
def _extract_per_event_summaries(crb_csv_path: Path) -> list[dict[str, float]]: """Extract per-event diagnostic data from a CRB CSV file. Args: crb_csv_path: Path to prepared_cramer_rao_bounds.csv. Returns: List of dicts with keys: d_L, SNR, sigma_d_L_over_d_L, condition_number, quality_pass. """ df = pd.read_csv(crb_csv_path) summaries: list[dict[str, float]] = [] for _, row in df.iterrows(): summary: dict[str, float] = {} for col in ["d_L", "SNR", "sigma_d_L_over_d_L", "condition_number", "quality_pass"]: if col in row.index: summary[col] = float(row[col]) summaries.append(summary) return summaries
[docs] def generate_comparison_report( baseline: BaselineSnapshot, current: BaselineSnapshot, output_dir: Path, label: str = "current", ) -> Path: """Generate a comparison report between a baseline and current posterior snapshot. Writes both a human-readable Markdown report and a machine-readable JSON sidecar. Args: baseline: The reference (pre-change) BaselineSnapshot. current: The current (post-change) BaselineSnapshot. output_dir: Directory to write output files into (created if needed). label: Label suffix for output file names (default "current"). Returns: Path to the generated Markdown report. """ output_dir.mkdir(parents=True, exist_ok=True) md_path = output_dir / f"comparison_{label}.md" json_path = output_dir / f"comparison_{label}.json" # --- Compute deltas --- delta_map_h = current.map_h - baseline.map_h delta_ci_width = current.ci_width - baseline.ci_width delta_bias_pct = current.bias_percent - baseline.bias_percent delta_n_events = current.n_events - baseline.n_events # --- JSON sidecar --- comparison_data: dict[str, object] = { "baseline": { "map_h": baseline.map_h, "ci_lower": baseline.ci_lower, "ci_upper": baseline.ci_upper, "ci_width": baseline.ci_width, "bias_pct": baseline.bias_percent, "n_events": baseline.n_events, "n_excluded_fisher": baseline.n_excluded_fisher, "median_cond_3d": baseline.median_cond_3d, "median_cond_4d": baseline.median_cond_4d, }, "current": { "map_h": current.map_h, "ci_lower": current.ci_lower, "ci_upper": current.ci_upper, "ci_width": current.ci_width, "bias_pct": current.bias_percent, "n_events": current.n_events, "n_excluded_fisher": current.n_excluded_fisher, "median_cond_3d": current.median_cond_3d, "median_cond_4d": current.median_cond_4d, }, "delta": { "map_h": delta_map_h, "ci_width": delta_ci_width, "bias_pct": delta_bias_pct, "n_events": delta_n_events, "n_excluded_fisher": current.n_excluded_fisher - baseline.n_excluded_fisher, }, } json_path.write_text(json.dumps(comparison_data, indent=2)) # --- Markdown report --- lines = [ f"# H0 Posterior Comparison: baseline vs {label}", "", f"Generated: {datetime.datetime.now(datetime.UTC).isoformat()}Z", "", "## Summary Table", "", "| Metric | Baseline | Current | Delta |", "|--------|----------|---------|-------|", f"| MAP h | {baseline.map_h:.4f} | {current.map_h:.4f} | {delta_map_h:+.4f} |", f"| CI lower | {baseline.ci_lower:.4f} | {current.ci_lower:.4f} | {current.ci_lower - baseline.ci_lower:+.4f} |", f"| CI upper | {baseline.ci_upper:.4f} | {current.ci_upper:.4f} | {current.ci_upper - baseline.ci_upper:+.4f} |", f"| CI width | {baseline.ci_width:.4f} | {current.ci_width:.4f} | {delta_ci_width:+.4f} |", f"| Bias % | {baseline.bias_percent:+.1f}% | {current.bias_percent:+.1f}% | {delta_bias_pct:+.1f}% |", f"| N events | {baseline.n_events} | {current.n_events} | {delta_n_events:+d} |", "", "## Log-Posterior by h Value", "", "| h | log P (baseline) | log P (current) |", "|---|-----------------|-----------------|", ] # Build aligned table of log-posteriors baseline_dict = dict(zip(baseline.h_values, baseline.log_posteriors)) current_dict = dict(zip(current.h_values, current.log_posteriors)) all_h = sorted(set(baseline.h_values) | set(current.h_values)) for h in all_h: b_lp = baseline_dict.get(h, float("nan")) c_lp = current_dict.get(h, float("nan")) lines.append(f"| {h:.4f} | {b_lp:.3f} | {c_lp:.3f} |") lines += [ "", "## ASCII Chart", "", "```", ] lines += _ascii_chart( baseline.h_values, baseline.log_posteriors, current.h_values, current.log_posteriors ) lines += [ "```", "", "## Verdict", "", ] # Verdict section if abs(delta_bias_pct) < 0.5: verdict = "No significant change in bias." elif delta_bias_pct < 0: verdict = f"Bias IMPROVED by {abs(delta_bias_pct):.1f} percentage points." else: verdict = f"Bias WORSENED by {delta_bias_pct:.1f} percentage points." lines.append(verdict) lines.append("") # Fisher Quality section (D-13) if baseline.n_excluded_fisher > 0 or current.n_excluded_fisher > 0: delta_excluded = current.n_excluded_fisher - baseline.n_excluded_fisher lines += [ "## Fisher Quality", "", "| Metric | Baseline | Current | Delta |", "|--------|----------|---------|-------|", f"| Events excluded (Fisher) | {baseline.n_excluded_fisher} | " f"{current.n_excluded_fisher} | {delta_excluded:+d} |", f"| Median cond (3D) | {baseline.median_cond_3d:.2e} | " f"{current.median_cond_3d:.2e} | - |", f"| Median cond (4D) | {baseline.median_cond_4d:.2e} | " f"{current.median_cond_4d:.2e} | - |", "", ] md_path.write_text("\n".join(lines)) return md_path
[docs] def generate_diagnostic_summary( diagnostic_csv_path: Path, output_dir: Path, label: str = "diagnostic", ) -> dict[str, object]: """Analyze per-event diagnostic CSV and generate explanatory summary. Reads the diagnostic CSV produced by BayesianStatistics.evaluate() and computes statistics explaining WHY the posterior bias changes: - Mean/median f_i across events (catalog completeness fraction) - L_comp contribution statistics - Fraction of events where L_comp pulls toward lower h The "L_comp pulls toward lower h" metric compares L_comp at h=0.66 vs h=0.73 per event. If L_comp(h=0.66) > L_comp(h=0.73), the completion term biases that event toward lower h. Args: diagnostic_csv_path: Path to event_likelihoods.csv from evaluate(). output_dir: Directory to write the summary report. label: Label suffix for output file name. Returns: Dict with summary statistics (also written to JSON). """ df = pd.read_csv(diagnostic_csv_path) # --- Per-event f_i statistics (use mean across h-values per event) --- event_fi = df.groupby("event_idx")["f_i"].mean() mean_f_i = float(event_fi.mean()) median_f_i = float(event_fi.median()) min_f_i = float(event_fi.min()) max_f_i = float(event_fi.max()) # --- L_comp statistics --- mean_L_comp = float(df["L_comp"].mean()) median_L_comp = float(df["L_comp"].median()) # --- L_comp weight in combination: (1-f_i)*L_comp vs f_i*L_cat --- df_normal = df[df["f_i"] < 1.0].copy() if len(df_normal) > 0: df_normal.loc[:, "L_comp_weight"] = (1 - df_normal["f_i"]) * df_normal["L_comp"] df_normal.loc[:, "L_cat_weight"] = df_normal["f_i"] * df_normal["L_cat_no_bh"] total_combined = df_normal["L_comp_weight"] + df_normal["L_cat_weight"] safe_mask = total_combined > 0 mean_L_comp_frac = ( float((df_normal.loc[safe_mask, "L_comp_weight"] / total_combined[safe_mask]).mean()) if safe_mask.any() else 0.0 ) else: mean_L_comp_frac = 0.0 # --- Fraction of events where L_comp pulls toward lower h --- # Compare L_comp at lowest h vs highest h per event # If L_comp(h_low) > L_comp(h_high), completion term prefers lower h h_values_sorted = sorted(df["h"].unique()) n_pulls_low = 0 n_events_compared = 0 h_low = 0.0 h_high = 0.0 if len(h_values_sorted) >= 2: h_low = h_values_sorted[0] h_high = h_values_sorted[-1] # Find the two h-values closest to 0.66 and 0.73 if available h_target_low = min(h_values_sorted, key=lambda x: abs(x - 0.66)) h_target_high = min(h_values_sorted, key=lambda x: abs(x - 0.73)) if h_target_low != h_target_high: h_low, h_high = h_target_low, h_target_high df_low = df[df["h"] == h_low].set_index("event_idx") df_high = df[df["h"] == h_high].set_index("event_idx") common_events = df_low.index.intersection(df_high.index) n_events_compared = len(common_events) for evt in common_events: low_val = df_low.loc[evt, "L_comp"] high_val = df_high.loc[evt, "L_comp"] # Use scalar mean when event_idx has duplicate rows (multiple detections per event) low_scalar = float(low_val.mean()) if hasattr(low_val, "mean") else float(low_val) high_scalar = float(high_val.mean()) if hasattr(high_val, "mean") else float(high_val) if low_scalar > high_scalar: n_pulls_low += 1 frac_pulls_low = float(n_pulls_low / n_events_compared) if n_events_compared > 0 else 0.0 summary: dict[str, object] = { "n_rows": len(df), "n_events": int(df["event_idx"].nunique()), "n_h_values": int(df["h"].nunique()), "mean_f_i": round(mean_f_i, 6), "median_f_i": round(median_f_i, 6), "min_f_i": round(min_f_i, 6), "max_f_i": round(max_f_i, 6), "mean_L_comp": mean_L_comp, "median_L_comp": median_L_comp, "mean_L_comp_fraction_of_combined": round(mean_L_comp_frac, 4), "frac_L_comp_pulls_low_h": round(frac_pulls_low, 4), "n_events_L_comp_pulls_low": n_pulls_low, "n_events_compared": n_events_compared, "h_low_compared": h_low if n_events_compared > 0 else None, "h_high_compared": h_high if n_events_compared > 0 else None, } # Write JSON summary output_dir.mkdir(parents=True, exist_ok=True) json_path = output_dir / f"diagnostic_summary_{label}.json" json_path.write_text(json.dumps(summary, indent=2)) # Write markdown summary md_path = output_dir / f"diagnostic_summary_{label}.md" md_lines = [ f"# Diagnostic Summary: {label}", "", f"Source: `{diagnostic_csv_path}`", f"Events: {summary['n_events']}, H-values: {summary['n_h_values']}, " f"Rows: {summary['n_rows']}", "", "## Catalog Completeness (f_i)", "", f"- Mean f_i: {summary['mean_f_i']}", f"- Median f_i: {summary['median_f_i']}", f"- Range: [{summary['min_f_i']}, {summary['max_f_i']}]", "", "## Completion Term (L_comp)", "", f"- Mean L_comp: {summary['mean_L_comp']:.6e}", f"- Median L_comp: {summary['median_L_comp']:.6e}", f"- Mean L_comp fraction of combined likelihood: " f"{summary['mean_L_comp_fraction_of_combined']:.1%}", "", "## Bias Direction", "", f"- Events where L_comp pulls toward lower h: " f"{summary['n_events_L_comp_pulls_low']}/{summary['n_events_compared']}", f"- Fraction: {summary['frac_L_comp_pulls_low_h']:.1%}", ] if n_events_compared > 0: md_lines.append( f"- Compared h={summary['h_low_compared']} vs h={summary['h_high_compared']}" ) md_lines.append("") md_path.write_text("\n".join(md_lines)) _LOGGER.info("Diagnostic summary written to %s", json_path) return summary
def _ascii_chart( h_baseline: list[float], lp_baseline: list[float], h_current: list[float], lp_current: list[float], width: int = 60, height: int = 10, ) -> list[str]: """Generate a simple ASCII chart comparing two log-posterior curves. Args: h_baseline: h values for the baseline. lp_baseline: Log-posteriors for the baseline. h_current: h values for the current run. lp_current: Log-posteriors for the current run. width: Chart width in characters. height: Chart height in lines. Returns: List of strings forming the ASCII chart. """ if not h_baseline and not h_current: return ["(no data)"] all_h = h_baseline + h_current all_lp = lp_baseline + lp_current if not all_h: return ["(no data)"] h_min = min(all_h) h_max = max(all_h) lp_min = min(all_lp) lp_max = max(all_lp) if h_max == h_min: h_max = h_min + 1.0 if lp_max == lp_min: lp_max = lp_min + 1.0 grid = [[" "] * width for _ in range(height)] def _plot(h_vals: list[float], lp_vals: list[float], char: str) -> None: for h, lp in zip(h_vals, lp_vals): xi = int((h - h_min) / (h_max - h_min) * (width - 1)) yi = int((lp - lp_min) / (lp_max - lp_min) * (height - 1)) yi = height - 1 - yi # flip y-axis xi = max(0, min(width - 1, xi)) yi = max(0, min(height - 1, yi)) grid[yi][xi] = char _plot(h_baseline, lp_baseline, "B") _plot(h_current, lp_current, "C") lines = [] for row in grid: lines.append("".join(row)) lines.append(f"{h_min:.2f}" + " " * (width - 9) + f"{h_max:.2f}") lines.append("B = baseline, C = current") return lines