"""Evaluation report module for baseline extraction and comparison.
Provides tools to extract a baseline H0 posterior snapshot from an h-sweep,
compute credible intervals, and generate before/after comparison reports.
Used by Phases 31-34 to measure the effect of each fix on the H0 posterior.
"""
import datetime
import json
import logging
import subprocess
from dataclasses import dataclass, field
from pathlib import Path
import numpy as np
import pandas as pd
from scipy.integrate import cumulative_trapezoid
_LOGGER = logging.getLogger(__name__)
def _get_git_commit_safe() -> str:
"""Return current git commit hash, or 'unknown' if not available."""
try:
return (
subprocess.check_output(["git", "rev-parse", "HEAD"], stderr=subprocess.DEVNULL)
.decode()
.strip()
)
except (subprocess.CalledProcessError, FileNotFoundError):
return "unknown"
[docs]
@dataclass
class BaselineSnapshot:
"""Snapshot of H0 posterior metrics extracted from a posteriors directory.
Args:
map_h: MAP (maximum a posteriori) Hubble constant value.
ci_lower: Lower bound of the 68% credible interval.
ci_upper: Upper bound of the 68% credible interval.
ci_width: Width of the 68% credible interval (ci_upper - ci_lower).
bias_percent: Relative bias as (MAP - true_h) / true_h * 100.
n_events: Number of detection events contributing to the posterior.
h_values: Sorted list of h values in the sweep.
log_posteriors: Log-posterior values at each h.
per_event_summaries: Per-event diagnostic data (d_L, SNR, etc.).
created_at: ISO 8601 timestamp of when this snapshot was created.
git_commit: Git commit hash at time of creation.
"""
map_h: float
ci_lower: float
ci_upper: float
ci_width: float
bias_percent: float
n_events: int
h_values: list[float] = field(default_factory=list)
log_posteriors: list[float] = field(default_factory=list)
per_event_summaries: list[dict[str, float]] = field(default_factory=list)
n_excluded_fisher: int = 0
median_cond_3d: float = 0.0
median_cond_4d: float = 0.0
created_at: str = field(
default_factory=lambda: datetime.datetime.now(datetime.UTC).isoformat() + "Z"
)
git_commit: str = field(default_factory=_get_git_commit_safe)
[docs]
def to_json(self) -> dict[str, object]:
"""Serialize to a JSON-compatible dictionary.
Returns:
Dictionary representation suitable for json.dumps.
"""
return {
"map_h": self.map_h,
"ci_lower": self.ci_lower,
"ci_upper": self.ci_upper,
"ci_width": self.ci_width,
"bias_percent": self.bias_percent,
"n_events": self.n_events,
"h_values": self.h_values,
"log_posteriors": self.log_posteriors,
"per_event_summaries": self.per_event_summaries,
"n_excluded_fisher": self.n_excluded_fisher,
"median_cond_3d": self.median_cond_3d,
"median_cond_4d": self.median_cond_4d,
"created_at": self.created_at,
"git_commit": self.git_commit,
}
[docs]
@classmethod
def from_json(cls, data: dict[str, object]) -> "BaselineSnapshot":
"""Deserialize from a JSON-compatible dictionary.
Args:
data: Dictionary as produced by to_json().
Returns:
A new BaselineSnapshot instance.
"""
from typing import Any
d: dict[str, Any] = data
return cls(
map_h=float(d["map_h"]),
ci_lower=float(d["ci_lower"]),
ci_upper=float(d["ci_upper"]),
ci_width=float(d["ci_width"]),
bias_percent=float(d["bias_percent"]),
n_events=int(d["n_events"]),
h_values=list(d.get("h_values", [])),
log_posteriors=list(d.get("log_posteriors", [])),
per_event_summaries=list(d.get("per_event_summaries", [])),
n_excluded_fisher=int(d.get("n_excluded_fisher", 0)),
median_cond_3d=float(d.get("median_cond_3d", 0.0)),
median_cond_4d=float(d.get("median_cond_4d", 0.0)),
created_at=str(d.get("created_at", "")),
git_commit=str(d.get("git_commit", "unknown")),
)
[docs]
def load_posteriors(posteriors_dir: Path) -> list[dict[str, float]]:
"""Load h_*.json posterior files from a directory.
Each file should contain a dict with key "h" (float) and integer-string
keys mapping to [likelihood_value] arrays for each detection event.
Args:
posteriors_dir: Path to directory containing h_*.json files.
Returns:
List of dicts with keys: "h", "log_posterior", "n_detections".
Sorted by h value in ascending order.
"""
files = sorted(posteriors_dir.glob("h_*.json"))
if len(files) > 100:
_LOGGER.warning(
"Found %d posterior files in %s. Loading all may be slow. "
"Consider pre-filtering if this is unexpected.",
len(files),
posteriors_dir,
)
results: list[dict[str, float]] = []
for path in files:
data: dict[str, object] = json.loads(path.read_text())
h = float(data["h"]) # type: ignore[arg-type]
# Collect detection keys: all keys except "h"
detection_keys = [k for k in data if k != "h"]
n_detections = len(detection_keys)
# Compute log_posterior = sum of log-likelihoods across events
log_posterior = 0.0
for key in detection_keys:
likelihood_list = data[key]
if isinstance(likelihood_list, list) and len(likelihood_list) > 0:
lk = float(likelihood_list[0])
if lk > 0:
log_posterior += np.log(lk)
# If likelihood is 0 or negative, skip (numerical floor)
results.append(
{
"h": h,
"log_posterior": log_posterior,
"n_detections": float(n_detections),
}
)
results.sort(key=lambda r: r["h"])
return results
[docs]
def compute_credible_interval(
h_values: list[float],
log_posteriors: list[float],
level: float = 0.68,
) -> tuple[float, float]:
"""Compute a credible interval from log-posterior values.
Converts log-posteriors to a normalized probability distribution, then
computes the CDF and finds where it crosses (1-level)/2 and (1+level)/2.
Args:
h_values: Sorted list of h values.
log_posteriors: Log-posterior value at each h.
level: Credible interval level (default 0.68 for 68% CI).
Returns:
Tuple (lower, upper) bounding the credible interval.
"""
h_arr = np.array(h_values, dtype=np.float64)
lp_arr = np.array(log_posteriors, dtype=np.float64)
# Normalize: subtract max for numerical stability, then exponentiate
prob = np.exp(lp_arr - lp_arr.max())
# Normalize via trapezoid integration (np.trapezoid since NumPy 2.0)
norm = float(np.trapezoid(prob, x=h_arr))
if norm <= 0:
norm = 1.0
prob = prob / norm
# CDF via cumulative trapezoid
cdf_vals = cumulative_trapezoid(prob, x=h_arr, initial=0.0)
lower_target = (1.0 - level) / 2.0
upper_target = (1.0 + level) / 2.0
# Linear interpolation to find crossing points
lower = float(np.interp(lower_target, cdf_vals, h_arr))
upper = float(np.interp(upper_target, cdf_vals, h_arr))
return lower, upper
def _extract_per_event_summaries(crb_csv_path: Path) -> list[dict[str, float]]:
"""Extract per-event diagnostic data from a CRB CSV file.
Args:
crb_csv_path: Path to prepared_cramer_rao_bounds.csv.
Returns:
List of dicts with keys: d_L, SNR, sigma_d_L_over_d_L, condition_number,
quality_pass.
"""
df = pd.read_csv(crb_csv_path)
summaries: list[dict[str, float]] = []
for _, row in df.iterrows():
summary: dict[str, float] = {}
for col in ["d_L", "SNR", "sigma_d_L_over_d_L", "condition_number", "quality_pass"]:
if col in row.index:
summary[col] = float(row[col])
summaries.append(summary)
return summaries
[docs]
def generate_comparison_report(
baseline: BaselineSnapshot,
current: BaselineSnapshot,
output_dir: Path,
label: str = "current",
) -> Path:
"""Generate a comparison report between a baseline and current posterior snapshot.
Writes both a human-readable Markdown report and a machine-readable JSON sidecar.
Args:
baseline: The reference (pre-change) BaselineSnapshot.
current: The current (post-change) BaselineSnapshot.
output_dir: Directory to write output files into (created if needed).
label: Label suffix for output file names (default "current").
Returns:
Path to the generated Markdown report.
"""
output_dir.mkdir(parents=True, exist_ok=True)
md_path = output_dir / f"comparison_{label}.md"
json_path = output_dir / f"comparison_{label}.json"
# --- Compute deltas ---
delta_map_h = current.map_h - baseline.map_h
delta_ci_width = current.ci_width - baseline.ci_width
delta_bias_pct = current.bias_percent - baseline.bias_percent
delta_n_events = current.n_events - baseline.n_events
# --- JSON sidecar ---
comparison_data: dict[str, object] = {
"baseline": {
"map_h": baseline.map_h,
"ci_lower": baseline.ci_lower,
"ci_upper": baseline.ci_upper,
"ci_width": baseline.ci_width,
"bias_pct": baseline.bias_percent,
"n_events": baseline.n_events,
"n_excluded_fisher": baseline.n_excluded_fisher,
"median_cond_3d": baseline.median_cond_3d,
"median_cond_4d": baseline.median_cond_4d,
},
"current": {
"map_h": current.map_h,
"ci_lower": current.ci_lower,
"ci_upper": current.ci_upper,
"ci_width": current.ci_width,
"bias_pct": current.bias_percent,
"n_events": current.n_events,
"n_excluded_fisher": current.n_excluded_fisher,
"median_cond_3d": current.median_cond_3d,
"median_cond_4d": current.median_cond_4d,
},
"delta": {
"map_h": delta_map_h,
"ci_width": delta_ci_width,
"bias_pct": delta_bias_pct,
"n_events": delta_n_events,
"n_excluded_fisher": current.n_excluded_fisher - baseline.n_excluded_fisher,
},
}
json_path.write_text(json.dumps(comparison_data, indent=2))
# --- Markdown report ---
lines = [
f"# H0 Posterior Comparison: baseline vs {label}",
"",
f"Generated: {datetime.datetime.now(datetime.UTC).isoformat()}Z",
"",
"## Summary Table",
"",
"| Metric | Baseline | Current | Delta |",
"|--------|----------|---------|-------|",
f"| MAP h | {baseline.map_h:.4f} | {current.map_h:.4f} | {delta_map_h:+.4f} |",
f"| CI lower | {baseline.ci_lower:.4f} | {current.ci_lower:.4f} | {current.ci_lower - baseline.ci_lower:+.4f} |",
f"| CI upper | {baseline.ci_upper:.4f} | {current.ci_upper:.4f} | {current.ci_upper - baseline.ci_upper:+.4f} |",
f"| CI width | {baseline.ci_width:.4f} | {current.ci_width:.4f} | {delta_ci_width:+.4f} |",
f"| Bias % | {baseline.bias_percent:+.1f}% | {current.bias_percent:+.1f}% | {delta_bias_pct:+.1f}% |",
f"| N events | {baseline.n_events} | {current.n_events} | {delta_n_events:+d} |",
"",
"## Log-Posterior by h Value",
"",
"| h | log P (baseline) | log P (current) |",
"|---|-----------------|-----------------|",
]
# Build aligned table of log-posteriors
baseline_dict = dict(zip(baseline.h_values, baseline.log_posteriors))
current_dict = dict(zip(current.h_values, current.log_posteriors))
all_h = sorted(set(baseline.h_values) | set(current.h_values))
for h in all_h:
b_lp = baseline_dict.get(h, float("nan"))
c_lp = current_dict.get(h, float("nan"))
lines.append(f"| {h:.4f} | {b_lp:.3f} | {c_lp:.3f} |")
lines += [
"",
"## ASCII Chart",
"",
"```",
]
lines += _ascii_chart(
baseline.h_values, baseline.log_posteriors, current.h_values, current.log_posteriors
)
lines += [
"```",
"",
"## Verdict",
"",
]
# Verdict section
if abs(delta_bias_pct) < 0.5:
verdict = "No significant change in bias."
elif delta_bias_pct < 0:
verdict = f"Bias IMPROVED by {abs(delta_bias_pct):.1f} percentage points."
else:
verdict = f"Bias WORSENED by {delta_bias_pct:.1f} percentage points."
lines.append(verdict)
lines.append("")
# Fisher Quality section (D-13)
if baseline.n_excluded_fisher > 0 or current.n_excluded_fisher > 0:
delta_excluded = current.n_excluded_fisher - baseline.n_excluded_fisher
lines += [
"## Fisher Quality",
"",
"| Metric | Baseline | Current | Delta |",
"|--------|----------|---------|-------|",
f"| Events excluded (Fisher) | {baseline.n_excluded_fisher} | "
f"{current.n_excluded_fisher} | {delta_excluded:+d} |",
f"| Median cond (3D) | {baseline.median_cond_3d:.2e} | "
f"{current.median_cond_3d:.2e} | - |",
f"| Median cond (4D) | {baseline.median_cond_4d:.2e} | "
f"{current.median_cond_4d:.2e} | - |",
"",
]
md_path.write_text("\n".join(lines))
return md_path
[docs]
def generate_diagnostic_summary(
diagnostic_csv_path: Path,
output_dir: Path,
label: str = "diagnostic",
) -> dict[str, object]:
"""Analyze per-event diagnostic CSV and generate explanatory summary.
Reads the diagnostic CSV produced by BayesianStatistics.evaluate() and
computes statistics explaining WHY the posterior bias changes:
- Mean/median f_i across events (catalog completeness fraction)
- L_comp contribution statistics
- Fraction of events where L_comp pulls toward lower h
The "L_comp pulls toward lower h" metric compares L_comp at h=0.66 vs
h=0.73 per event. If L_comp(h=0.66) > L_comp(h=0.73), the completion
term biases that event toward lower h.
Args:
diagnostic_csv_path: Path to event_likelihoods.csv from evaluate().
output_dir: Directory to write the summary report.
label: Label suffix for output file name.
Returns:
Dict with summary statistics (also written to JSON).
"""
df = pd.read_csv(diagnostic_csv_path)
# --- Per-event f_i statistics (use mean across h-values per event) ---
event_fi = df.groupby("event_idx")["f_i"].mean()
mean_f_i = float(event_fi.mean())
median_f_i = float(event_fi.median())
min_f_i = float(event_fi.min())
max_f_i = float(event_fi.max())
# --- L_comp statistics ---
mean_L_comp = float(df["L_comp"].mean())
median_L_comp = float(df["L_comp"].median())
# --- L_comp weight in combination: (1-f_i)*L_comp vs f_i*L_cat ---
df_normal = df[df["f_i"] < 1.0].copy()
if len(df_normal) > 0:
df_normal.loc[:, "L_comp_weight"] = (1 - df_normal["f_i"]) * df_normal["L_comp"]
df_normal.loc[:, "L_cat_weight"] = df_normal["f_i"] * df_normal["L_cat_no_bh"]
total_combined = df_normal["L_comp_weight"] + df_normal["L_cat_weight"]
safe_mask = total_combined > 0
mean_L_comp_frac = (
float((df_normal.loc[safe_mask, "L_comp_weight"] / total_combined[safe_mask]).mean())
if safe_mask.any()
else 0.0
)
else:
mean_L_comp_frac = 0.0
# --- Fraction of events where L_comp pulls toward lower h ---
# Compare L_comp at lowest h vs highest h per event
# If L_comp(h_low) > L_comp(h_high), completion term prefers lower h
h_values_sorted = sorted(df["h"].unique())
n_pulls_low = 0
n_events_compared = 0
h_low = 0.0
h_high = 0.0
if len(h_values_sorted) >= 2:
h_low = h_values_sorted[0]
h_high = h_values_sorted[-1]
# Find the two h-values closest to 0.66 and 0.73 if available
h_target_low = min(h_values_sorted, key=lambda x: abs(x - 0.66))
h_target_high = min(h_values_sorted, key=lambda x: abs(x - 0.73))
if h_target_low != h_target_high:
h_low, h_high = h_target_low, h_target_high
df_low = df[df["h"] == h_low].set_index("event_idx")
df_high = df[df["h"] == h_high].set_index("event_idx")
common_events = df_low.index.intersection(df_high.index)
n_events_compared = len(common_events)
for evt in common_events:
low_val = df_low.loc[evt, "L_comp"]
high_val = df_high.loc[evt, "L_comp"]
# Use scalar mean when event_idx has duplicate rows (multiple detections per event)
low_scalar = float(low_val.mean()) if hasattr(low_val, "mean") else float(low_val)
high_scalar = float(high_val.mean()) if hasattr(high_val, "mean") else float(high_val)
if low_scalar > high_scalar:
n_pulls_low += 1
frac_pulls_low = float(n_pulls_low / n_events_compared) if n_events_compared > 0 else 0.0
summary: dict[str, object] = {
"n_rows": len(df),
"n_events": int(df["event_idx"].nunique()),
"n_h_values": int(df["h"].nunique()),
"mean_f_i": round(mean_f_i, 6),
"median_f_i": round(median_f_i, 6),
"min_f_i": round(min_f_i, 6),
"max_f_i": round(max_f_i, 6),
"mean_L_comp": mean_L_comp,
"median_L_comp": median_L_comp,
"mean_L_comp_fraction_of_combined": round(mean_L_comp_frac, 4),
"frac_L_comp_pulls_low_h": round(frac_pulls_low, 4),
"n_events_L_comp_pulls_low": n_pulls_low,
"n_events_compared": n_events_compared,
"h_low_compared": h_low if n_events_compared > 0 else None,
"h_high_compared": h_high if n_events_compared > 0 else None,
}
# Write JSON summary
output_dir.mkdir(parents=True, exist_ok=True)
json_path = output_dir / f"diagnostic_summary_{label}.json"
json_path.write_text(json.dumps(summary, indent=2))
# Write markdown summary
md_path = output_dir / f"diagnostic_summary_{label}.md"
md_lines = [
f"# Diagnostic Summary: {label}",
"",
f"Source: `{diagnostic_csv_path}`",
f"Events: {summary['n_events']}, H-values: {summary['n_h_values']}, "
f"Rows: {summary['n_rows']}",
"",
"## Catalog Completeness (f_i)",
"",
f"- Mean f_i: {summary['mean_f_i']}",
f"- Median f_i: {summary['median_f_i']}",
f"- Range: [{summary['min_f_i']}, {summary['max_f_i']}]",
"",
"## Completion Term (L_comp)",
"",
f"- Mean L_comp: {summary['mean_L_comp']:.6e}",
f"- Median L_comp: {summary['median_L_comp']:.6e}",
f"- Mean L_comp fraction of combined likelihood: "
f"{summary['mean_L_comp_fraction_of_combined']:.1%}",
"",
"## Bias Direction",
"",
f"- Events where L_comp pulls toward lower h: "
f"{summary['n_events_L_comp_pulls_low']}/{summary['n_events_compared']}",
f"- Fraction: {summary['frac_L_comp_pulls_low_h']:.1%}",
]
if n_events_compared > 0:
md_lines.append(
f"- Compared h={summary['h_low_compared']} vs h={summary['h_high_compared']}"
)
md_lines.append("")
md_path.write_text("\n".join(md_lines))
_LOGGER.info("Diagnostic summary written to %s", json_path)
return summary
def _ascii_chart(
h_baseline: list[float],
lp_baseline: list[float],
h_current: list[float],
lp_current: list[float],
width: int = 60,
height: int = 10,
) -> list[str]:
"""Generate a simple ASCII chart comparing two log-posterior curves.
Args:
h_baseline: h values for the baseline.
lp_baseline: Log-posteriors for the baseline.
h_current: h values for the current run.
lp_current: Log-posteriors for the current run.
width: Chart width in characters.
height: Chart height in lines.
Returns:
List of strings forming the ASCII chart.
"""
if not h_baseline and not h_current:
return ["(no data)"]
all_h = h_baseline + h_current
all_lp = lp_baseline + lp_current
if not all_h:
return ["(no data)"]
h_min = min(all_h)
h_max = max(all_h)
lp_min = min(all_lp)
lp_max = max(all_lp)
if h_max == h_min:
h_max = h_min + 1.0
if lp_max == lp_min:
lp_max = lp_min + 1.0
grid = [[" "] * width for _ in range(height)]
def _plot(h_vals: list[float], lp_vals: list[float], char: str) -> None:
for h, lp in zip(h_vals, lp_vals):
xi = int((h - h_min) / (h_max - h_min) * (width - 1))
yi = int((lp - lp_min) / (lp_max - lp_min) * (height - 1))
yi = height - 1 - yi # flip y-axis
xi = max(0, min(width - 1, xi))
yi = max(0, min(height - 1, yi))
grid[yi][xi] = char
_plot(h_baseline, lp_baseline, "B")
_plot(h_current, lp_current, "C")
lines = []
for row in grid:
lines.append("".join(row))
lines.append(f"{h_min:.2f}" + " " * (width - 9) + f"{h_max:.2f}")
lines.append("B = baseline, C = current")
return lines