Source code for master_thesis_code.datamodels.galaxy

"""Galaxy and GalaxyCatalog datamodels for Bayesian H₀ inference."""

from dataclasses import dataclass, field
from statistics import NormalDist
from typing import Any

import emcee
import numpy as np
import numpy.typing as npt
from scipy.integrate import cumulative_trapezoid
from scipy.interpolate import CubicSpline
from scipy.stats import rv_continuous, truncnorm

from master_thesis_code.constants import (
    FRACTIONAL_BLACK_HOLE_MASS_CATALOG_ERROR,
    GALAXY_REDSHIFT_ERROR_COEFFICIENT,
    LUMINOSITY_DISTANCE_THRESHOLD_GPC,
    OMEGA_M,
    TRUE_HUBBLE_CONSTANT,
)
from master_thesis_code.constants import (
    OMEGA_DE as OMEGA_LAMBDA,
)
from master_thesis_code.constants import (
    SPEED_OF_LIGHT_KM_S as SPEED_OF_LIGHT,
)
from master_thesis_code.physical_relations import dist


[docs] @dataclass(unsafe_hash=True) class Galaxy: """A galaxy in the catalog with a central massive black hole. Attributes: redshift: Spectroscopic redshift :math:`z` (dimensionless). central_black_hole_mass: Central MBH mass in solar masses :math:`M_\\odot`. right_ascension: Ecliptic azimuthal angle in radians :math:`[0, 2\\pi)`. declination: Ecliptic polar angle in radians :math:`[0, \\pi]`. """ redshift: float # dimensionless central_black_hole_mass: float # M_sun (massive black hole mass) right_ascension: float # rad, ecliptic azimuthal angle declination: float # rad, ecliptic polar angle
[docs] @classmethod def with_random_skylocalization( cls, redshift: float, central_black_hole_mass: float, rng: np.random.Generator | None = None, ) -> "Galaxy": if rng is None: rng = np.random.default_rng() # get spherically uniform distributed sky localization right_ascension = float(rng.uniform(0, 2 * np.pi)) declination = float(np.arccos(rng.uniform(-1, 1))) return cls( redshift=redshift, central_black_hole_mass=central_black_hole_mass, right_ascension=right_ascension, declination=declination, )
@property def redshift_uncertainty(self) -> float: return min(GALAXY_REDSHIFT_ERROR_COEFFICIENT * (1 + self.redshift) ** 3, 0.015) @property def central_black_hole_mass_uncertainty(self) -> float: return FRACTIONAL_BLACK_HOLE_MASS_CATALOG_ERROR * self.central_black_hole_mass
[docs] @dataclass class GalaxyCatalog: """Synthetic galaxy catalog for Bayesian H₀ inference. Holds a list of :class:`Galaxy` objects and provides redshift/mass probability distributions used by :class:`~master_thesis_code.bayesian_inference.bayesian_inference.BayesianInference`. Attributes: lower_mass_limit: Minimum central BH mass in :math:`M_\\odot`. upper_mass_limit: Maximum central BH mass in :math:`M_\\odot`. redshift_lower_limit: Minimum redshift in the catalog. redshift_upper_limit: Maximum redshift in the catalog. catalog: List of :class:`Galaxy` instances. """ _use_truncnorm: bool _use_comoving_volume: bool lower_mass_limit: float = 10 ** (4) upper_mass_limit: float = 10 ** (7) redshift_lower_limit: float = 0.00001 redshift_upper_limit: float = 0.55 catalog: list[Galaxy] = field(default_factory=list) galaxy_distribution: list[NormalDist | rv_continuous] = field(default_factory=list) galaxy_mass_distribution: list[NormalDist | rv_continuous] = field(default_factory=list) def __init__( self, use_truncnorm: bool = True, use_comoving_volume: bool = True, h0: float = TRUE_HUBBLE_CONSTANT, ): self._use_truncnorm = use_truncnorm self._use_comoving_volume = use_comoving_volume self._h0 = h0 self.catalog = [] self.galaxy_distribution = [] self.galaxy_mass_distribution = [] self._comoving_volume_element_spline = self._build_comoving_volume_element_spline(h0) @staticmethod def _build_comoving_volume_element_spline(h0: float = TRUE_HUBBLE_CONSTANT) -> CubicSpline: """Precompute the comoving volume element dV_c/dz on a fine redshift grid. The integral I(z) = integral_0^z dz'/E(z') is computed once via cumulative_trapezoid so subsequent calls to comoving_volume_element() are O(log n) spline interpolation. """ _z_grid = np.linspace(0, 10.0, 4000) e_z = np.sqrt(OMEGA_M * (1 + _z_grid) ** 3 + OMEGA_LAMBDA) integrand = 1.0 / e_z cumulative_integral = np.concatenate([[0.0], cumulative_trapezoid(integrand, _z_grid)]) # Eq. 27 in Hogg (1999), arXiv:astro-ph/9905116 cv_grid = 4 * np.pi * (SPEED_OF_LIGHT / h0) ** 3 * cumulative_integral**2 / e_z return CubicSpline(_z_grid, cv_grid)
[docs] def comoving_volume_element(self, redshift: float) -> float: """Comoving volume element dV_c/dz at the given redshift.""" return float(self._comoving_volume_element_spline(redshift))
[docs] def log_comoving_volume_element(self, redshift: float) -> float: try: redshift = redshift[0] # type: ignore[index] except TypeError: pass if redshift < self.redshift_lower_limit or redshift > self.redshift_upper_limit: return float(-np.inf) return float(np.log(self.comoving_volume_element(redshift)))
[docs] def get_samples_from_comoving_volume_element( self, number_of_samples: int, rng: np.random.Generator | None = None ) -> np.ndarray: if rng is None: rng = np.random.default_rng() # use emcee to sample the comoving volume distribution ndim = 1 nwalkers = 5 p0 = rng.uniform(self.redshift_lower_limit, self.redshift_upper_limit, (nwalkers, ndim)) sampler = emcee.EnsembleSampler(nwalkers, ndim, self.log_comoving_volume_element) # burn-in p0, _, _ = sampler.run_mcmc(p0, 1000) sampler.reset() sampler.run_mcmc(p0, int(number_of_samples / nwalkers) + 1) samples = np.array(sampler.get_chain(flat=True)).flatten() # return required number of samples by using the last n samples return samples[-number_of_samples:]
[docs] def create_random_catalog( self, number_of_galaxies: int, rng: np.random.Generator | None = None ) -> None: if rng is None: rng = np.random.default_rng() # draw mass from uniform in log space print( f"Creating random galaxy catalog with {number_of_galaxies} galaxies in the redshift range ({self.redshift_lower_limit}, {self.redshift_upper_limit}) / ({dist(self.redshift_lower_limit)}, {dist(self.redshift_upper_limit)}) Gpc." ) if self._use_comoving_volume: redshift_samples = self.get_samples_from_comoving_volume_element( number_of_galaxies, rng=rng ) assert len(redshift_samples) == number_of_galaxies for redshift in redshift_samples: self.catalog.append( Galaxy.with_random_skylocalization( redshift=redshift, central_black_hole_mass=10 ** float( rng.uniform( np.log10(self.lower_mass_limit), np.log10(self.upper_mass_limit), ) ), rng=rng, ) ) else: for i in range(number_of_galaxies): self.catalog.append( Galaxy.with_random_skylocalization( redshift=float( rng.uniform(self.redshift_lower_limit, self.redshift_upper_limit) ), central_black_hole_mass=10 ** float( rng.uniform( np.log10(self.lower_mass_limit), np.log10(self.upper_mass_limit), ) ), rng=rng, ) ) self.setup_galaxy_distribution() self.setup_galaxy_mass_distribution()
[docs] def remove_all_galaxies(self) -> None: self.catalog = [] self.galaxy_distribution = [] self.galaxy_mass_distribution = []
[docs] def add_random_galaxy(self, rng: np.random.Generator | None = None) -> None: if rng is None: rng = np.random.default_rng() galaxy = Galaxy.with_random_skylocalization( redshift=float(rng.uniform(self.redshift_lower_limit, self.redshift_upper_limit)), central_black_hole_mass=float( rng.uniform(self.lower_mass_limit, self.upper_mass_limit) ), rng=rng, ) self.catalog.append(galaxy) self.append_galaxy_to_galaxy_distribution(galaxy) self.append_galaxy_to_galaxy_mass_distribution(galaxy)
[docs] def add_host_galaxy(self, rng: np.random.Generator | None = None) -> Galaxy: if rng is None: rng = np.random.default_rng() galaxy = Galaxy.with_random_skylocalization( redshift=float(rng.uniform(self.redshift_lower_limit, self.redshift_upper_limit)), central_black_hole_mass=float( rng.uniform(self.lower_mass_limit, self.upper_mass_limit) ), rng=rng, ) self.catalog.append(galaxy) self.append_galaxy_to_galaxy_distribution(galaxy) self.append_galaxy_to_galaxy_mass_distribution(galaxy) return galaxy
[docs] def setup_galaxy_distribution(self) -> None: if not self._use_truncnorm: self.galaxy_distribution = [ NormalDist(mu=galaxy.redshift, sigma=galaxy.redshift_uncertainty) for galaxy in self.catalog ] else: self.galaxy_distribution = [ truncnorm( a=(self.redshift_lower_limit - galaxy.redshift) / galaxy.redshift_uncertainty, b=(self.redshift_upper_limit - galaxy.redshift) / galaxy.redshift_uncertainty, loc=galaxy.redshift, scale=galaxy.redshift_uncertainty, ) for galaxy in self.catalog ]
[docs] def append_galaxy_to_galaxy_mass_distribution(self, galaxy: Galaxy) -> None: if not self._use_truncnorm: self.galaxy_mass_distribution.append( NormalDist( mu=galaxy.central_black_hole_mass, sigma=FRACTIONAL_BLACK_HOLE_MASS_CATALOG_ERROR * galaxy.central_black_hole_mass, ) ) else: self.galaxy_mass_distribution.append( truncnorm( a=(self.lower_mass_limit - galaxy.central_black_hole_mass) / galaxy.central_black_hole_mass_uncertainty, b=(self.upper_mass_limit - galaxy.central_black_hole_mass) / galaxy.central_black_hole_mass_uncertainty, loc=galaxy.central_black_hole_mass, scale=galaxy.central_black_hole_mass_uncertainty, ) )
[docs] def append_galaxy_to_galaxy_distribution(self, galaxy: Galaxy) -> None: if not self._use_truncnorm: self.galaxy_distribution.append( NormalDist(galaxy.redshift, galaxy.redshift_uncertainty) ) else: self.galaxy_distribution.append( truncnorm( a=(self.redshift_lower_limit - galaxy.redshift) / galaxy.redshift_uncertainty, b=(self.redshift_upper_limit - galaxy.redshift) / galaxy.redshift_uncertainty, loc=galaxy.redshift, scale=galaxy.redshift_uncertainty, ) )
[docs] def evaluate_galaxy_distribution(self, redshift: float) -> npt.NDArray[np.float64]: redshift_uncertainty = min(GALAXY_REDSHIFT_ERROR_COEFFICIENT * (1 + redshift) ** 3, 0.015) p_background: float = 1.0 if self._use_comoving_volume: p_background = self.comoving_volume_element(redshift) normal_dist = NormalDist(mu=redshift, sigma=redshift_uncertainty) if self._use_truncnorm: normalization = ( normal_dist.cdf(self.redshift_upper_limit) - normal_dist.cdf(self.redshift_lower_limit) ) * normal_dist.stdev return np.array( [ normal_dist.pdf(galaxy.redshift) / normalization # Adjust for background for galaxy in self.catalog ] ) / len(self.catalog) return np.array( [ normal_dist.pdf(galaxy.redshift) # Adjust for background for galaxy in self.catalog ] ) / len(self.catalog)
[docs] def setup_galaxy_mass_distribution(self) -> None: if not self._use_truncnorm: self.galaxy_mass_distribution = [ NormalDist( mu=galaxy.central_black_hole_mass, # sigma(M) = f * M -- fractional uncertainty, consistent with append_galaxy_to_galaxy_mass_distribution sigma=FRACTIONAL_BLACK_HOLE_MASS_CATALOG_ERROR * galaxy.central_black_hole_mass, ) for galaxy in self.catalog ] else: self.galaxy_mass_distribution = [ truncnorm( a=(self.lower_mass_limit - galaxy.central_black_hole_mass) / galaxy.central_black_hole_mass_uncertainty, b=(self.upper_mass_limit - galaxy.central_black_hole_mass) / galaxy.central_black_hole_mass_uncertainty, loc=galaxy.central_black_hole_mass, scale=galaxy.central_black_hole_mass_uncertainty, ) for galaxy in self.catalog ]
[docs] def evaluate_galaxy_mass_distribution(self, mass: float) -> npt.NDArray[np.float64]: # use truncated normal distribution if self._use_truncnorm: # scipy.stats.truncnorm.pdf() is already normalized over its support return np.array( [distribution.pdf(mass) for distribution in self.galaxy_mass_distribution] ) / len(self.catalog) # without truncnorm return np.array( [distribution.pdf(mass) for distribution in self.galaxy_mass_distribution] ) / len(self.catalog)
[docs] def get_possible_host_galaxies(self) -> list[Galaxy]: return [ galaxy for galaxy in self.catalog if dist(galaxy.redshift, TRUE_HUBBLE_CONSTANT) <= LUMINOSITY_DISTANCE_THRESHOLD_GPC ]
[docs] def get_unique_host_galaxies_from_catalog( self, number_of_host_galaxies: int, rng: np.random.Generator | None = None ) -> list[Galaxy]: if rng is None: rng = np.random.default_rng() if len(self.get_possible_host_galaxies()) < number_of_host_galaxies: print("Not enough possible host galaxies in catalog.") return [] possible: list[Any] = self.get_possible_host_galaxies() indices = rng.choice(len(possible), number_of_host_galaxies, replace=False) return [possible[i] for i in indices]
[docs] def add_unique_host_galaxies_from_catalog( self, number_of_host_galaxies_to_add: int, used_host_galaxies: list[Galaxy], rng: np.random.Generator | None = None, ) -> list[Galaxy]: if rng is None: rng = np.random.default_rng() if ( len(self.get_possible_host_galaxies()) - len(used_host_galaxies) < number_of_host_galaxies_to_add ): print("Not enough possible host galaxies in catalog.") return used_host_galaxies filtered: list[Any] = [ galaxy for galaxy in self.get_possible_host_galaxies() if galaxy not in used_host_galaxies ] indices = rng.choice(len(filtered), number_of_host_galaxies_to_add, replace=False) new_host_galaxies = [filtered[i] for i in indices] used_host_galaxies.extend(new_host_galaxies) return used_host_galaxies