"""Top-level :class:`MCMCRun` container and :func:`open_mcmc` entry point."""
from __future__ import annotations
from pathlib import Path
from typing import Iterable, Optional, Sequence, Tuple, Union
import numpy as np
from ._analysis import (
MaxResult,
PosteriorSamples,
acceptance_rate,
acceptance_rate_trace,
maximum_likelihood,
maximum_posterior,
posterior_samples,
)
from ._autocorr import autocorrelation_time, effective_sample_size
from ._chains import ChainSet, read_chains
from ._config import MCMCConfig, ModelParameter, parse_mcmc_config
from ._convergence import (
RhatResult,
convergence_step,
gelman_rubin,
geweke,
outlier_chains,
)
from ._mvn_reparam import MVNFit, multivariate_normal_fit
from ._params import (
apply_state,
emit_parameter_files,
read_parameter_file,
write_parameter_file_to,
)
from ._plots import corner_plot
from ._projection import ProjectionPursuitResult, projection_pursuit
[docs]
class MCMCRun:
"""An MCMC run, parsed from its config file and lazily backed by chain data.
Construct via :func:`open_mcmc`. The chain files are not read until
:attr:`chains` is first accessed; subsequent accesses return the cached
:class:`ChainSet`.
Parameters
----------
config:
Parsed :class:`MCMCConfig`.
"""
def __init__(self, config: MCMCConfig) -> None:
self._config = config
self._chains: Optional[ChainSet] = None
# ------------------------------------------------------------------
# Public properties
# ------------------------------------------------------------------
@property
def config(self) -> MCMCConfig:
"""The parsed :class:`MCMCConfig`."""
return self._config
@property
def parameters(self) -> Tuple[ModelParameter, ...]:
"""Active model parameters, in chain-file column order."""
return self._config.parameters
@property
def chains(self) -> ChainSet:
"""Lazily-loaded :class:`ChainSet` for this run."""
if self._chains is None:
self._chains = read_chains(self._config)
return self._chains
# ------------------------------------------------------------------
# Convergence diagnostics
# ------------------------------------------------------------------
[docs]
def gelman_rubin(
self,
*,
drop_chains: Sequence[int] = (),
step_grid: Optional[Sequence[int]] = None,
n_grid: int = 200,
min_steps: int = 10,
alpha_interval: float = 0.15,
) -> RhatResult:
"""Convenience wrapper around :func:`dendros.gelman_rubin`."""
return gelman_rubin(
self.chains,
drop_chains=drop_chains,
step_grid=step_grid,
n_grid=n_grid,
min_steps=min_steps,
alpha_interval=alpha_interval,
)
[docs]
def convergence_step(
self,
*,
threshold: float = 1.1,
sustained_for: int = 1,
drop_chains: Sequence[int] = (),
step_grid: Optional[Sequence[int]] = None,
n_grid: int = 200,
min_steps: int = 10,
) -> Optional[int]:
"""First simulation-step count at which max-Rhat is sustained below *threshold*.
Computes a Gelman-Rubin trace via :meth:`gelman_rubin` and returns the
``RhatResult.steps`` value at which convergence is first declared.
Returns ``None`` if convergence is never reached on the chosen grid.
"""
result = self.gelman_rubin(
drop_chains=drop_chains,
step_grid=step_grid,
n_grid=n_grid,
min_steps=min_steps,
)
idx = convergence_step(
result.Rhat_c_max(),
threshold=threshold,
sustained_for=sustained_for,
)
if idx is None:
return None
return int(result.steps[idx])
[docs]
def geweke(
self,
*,
first: float = 0.1,
last: float = 0.5,
) -> np.ndarray:
"""Convenience wrapper around :func:`dendros.geweke`."""
return geweke(self.chains, first=first, last=last)
[docs]
def outlier_chains(
self,
*,
alpha: float = 0.05,
max_outliers: int = 10,
parameters: Optional[Iterable[str]] = None,
) -> Tuple[int, ...]:
"""Convenience wrapper around :func:`dendros.outlier_chains`."""
return outlier_chains(
self.chains,
alpha=alpha,
max_outliers=max_outliers,
parameters=parameters,
)
# ------------------------------------------------------------------
# Mixing diagnostics
# ------------------------------------------------------------------
[docs]
def acceptance_rate(
self,
*,
post_burn: Optional[int] = None,
) -> np.ndarray:
"""Convenience wrapper around :func:`dendros.acceptance_rate`."""
return acceptance_rate(self.chains, post_burn=post_burn)
[docs]
def acceptance_rate_trace(
self,
*,
window: int = 30,
post_burn: int = 0,
):
"""Convenience wrapper around :func:`dendros.acceptance_rate_trace`."""
return acceptance_rate_trace(self.chains, window=window, post_burn=post_burn)
[docs]
def autocorrelation_time(
self,
*,
post_burn: Optional[int] = None,
c: float = 5.0,
) -> np.ndarray:
"""Convenience wrapper around :func:`dendros.autocorrelation_time`."""
return autocorrelation_time(self.chains, post_burn=post_burn, c=c)
[docs]
def effective_sample_size(
self,
*,
post_burn: Optional[int] = None,
c: float = 5.0,
) -> np.ndarray:
"""Convenience wrapper around :func:`dendros.effective_sample_size`."""
return effective_sample_size(self.chains, post_burn=post_burn, c=c)
# ------------------------------------------------------------------
# Max-posterior / sampling / projection / MVN fit
# ------------------------------------------------------------------
[docs]
def maximum_posterior(
self,
*,
drop_chains: Sequence[int] = (),
) -> MaxResult:
"""Convenience wrapper around :func:`dendros.maximum_posterior`."""
return maximum_posterior(self.chains, drop_chains=drop_chains)
[docs]
def maximum_likelihood(
self,
*,
drop_chains: Sequence[int] = (),
) -> MaxResult:
"""Convenience wrapper around :func:`dendros.maximum_likelihood`."""
return maximum_likelihood(self.chains, drop_chains=drop_chains)
[docs]
def posterior_samples(
self,
n: int,
*,
post_burn: Optional[int] = None,
drop_chains: Sequence[int] = (),
rng: Optional[np.random.Generator] = None,
replace: Optional[bool] = None,
) -> PosteriorSamples:
"""Convenience wrapper around :func:`dendros.posterior_samples`."""
return posterior_samples(
self.chains,
n,
post_burn=post_burn,
drop_chains=drop_chains,
rng=rng,
replace=replace,
)
[docs]
def projection_pursuit(
self,
*,
post_burn: Optional[int] = None,
drop_chains: Sequence[int] = (),
) -> ProjectionPursuitResult:
"""Convenience wrapper around :func:`dendros.projection_pursuit`."""
return projection_pursuit(
self.chains, post_burn=post_burn, drop_chains=drop_chains
)
[docs]
def multivariate_normal_fit(
self,
*,
post_burn: Optional[int] = None,
drop_chains: Sequence[int] = (),
) -> MVNFit:
"""Convenience wrapper around :func:`dendros.multivariate_normal_fit`."""
return multivariate_normal_fit(
self.chains, post_burn=post_burn, drop_chains=drop_chains
)
# ------------------------------------------------------------------
# Parameter-file emission
# ------------------------------------------------------------------
[docs]
def write_parameter_file(
self,
state,
out_path,
*,
likelihood_index: int = 0,
):
"""Emit a single Galacticus parameter file for one likelihood leaf.
Reads ``leaves[likelihood_index].base_parameters_file``, applies the
leaf's ``parameter_map`` (or the full state when no map is set), and
writes the result to *out_path*.
Parameters
----------
state:
``(n_params,)`` state vector in physical (model) space. Galacticus
stores chain rows in physical space, so a row from
:meth:`maximum_posterior` / :meth:`posterior_samples` can be passed
directly.
out_path:
Output path. Parent directory is created if missing.
likelihood_index:
Which leaf of the likelihood tree to use. Defaults to ``0``.
Returns
-------
pathlib.Path
Resolved output path.
"""
if self._config.likelihood is None:
raise ValueError(
"Config has no <posteriorSampleLikelihood>; cannot emit a "
"parameter file."
)
leaves = self._config.likelihood.leaves()
if likelihood_index < 0 or likelihood_index >= len(leaves):
raise IndexError(
f"likelihood_index={likelihood_index!r} out of range; "
f"this run has {len(leaves)} leaf likelihoods."
)
leaf = leaves[likelihood_index]
if leaf.base_parameters_file is None:
raise ValueError(
f"Likelihood leaf {likelihood_index!r} has no "
"<baseParametersFileName>; cannot emit a parameter file."
)
state_arr = np.asarray(state, dtype=float)
tree = read_parameter_file(leaf.base_parameters_file)
apply_state(
tree,
self._config.parameters,
state_arr,
parameter_map=leaf.parameter_map,
)
return write_parameter_file_to(tree, out_path)
[docs]
def corner_plot(
self,
*,
parameters: Optional[Iterable[str]] = None,
post_burn: Optional[int] = None,
drop_chains: Sequence[int] = (),
labels: Optional[Sequence[str]] = None,
**corner_kwargs,
):
"""Convenience wrapper around :func:`dendros.corner_plot`."""
return corner_plot(
self.chains,
parameters=parameters,
post_burn=post_burn,
drop_chains=drop_chains,
labels=labels,
**corner_kwargs,
)
[docs]
def write_parameter_files(
self,
state,
out_dir,
*,
name_format: Optional[str] = None,
):
"""Emit one parameter file per likelihood leaf into *out_dir*.
For ``independentLikelihoods`` configs each leaf has its own
``baseParametersFileName`` and ``parameterMap``; this writes one
file per leaf, with each file's filename derived from the base file's
stem.
Parameters
----------
state:
``(n_params,)`` state vector in physical space.
out_dir:
Output directory (created if missing).
name_format:
Output-filename format string accepting ``leaf_index`` and
``stem``. Defaults to ``"{stem}.xml"`` for a single leaf and
``"{leaf_index:02d}_{stem}.xml"`` for multiple.
Returns
-------
list of (leaf_index, pathlib.Path)
One entry per leaf, in document order.
"""
return emit_parameter_files(
np.asarray(state, dtype=float),
self._config,
out_dir,
name_format=name_format,
)
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
def __repr__(self) -> str:
return (
f"<MCMCRun config={self._config.config_path.name!r} "
f"n_params={len(self._config.parameters)} "
f"simulation_kind={self._config.simulation_kind!r}>"
)
def __enter__(self) -> "MCMCRun":
return self
def __exit__(self, *args) -> None:
# Nothing to release: chain files are read fully into memory.
pass
[docs]
def open_mcmc(config_path: Union[str, "Path"]) -> MCMCRun:
"""Open an MCMC run by parsing its config XML.
Parameters
----------
config_path:
Path to the Galacticus MCMC ``<parameters>`` XML file.
Returns
-------
MCMCRun
Examples
--------
>>> from dendros import open_mcmc
>>> with open_mcmc("mcmcConfig.xml") as run:
... print(run.parameters)
... chains = run.chains
"""
return MCMCRun(parse_mcmc_config(config_path))