Source code for dendros._mcmc._projection

"""Projection pursuit: PCA on prior-variance-rescaled posterior samples.

The smallest eigenvalues of the rescaled-sample covariance correspond to the
linear combinations of parameters most tightened by the data relative to the
prior — those are the "best constrained" directions.
"""
from __future__ import annotations

from dataclasses import dataclass
from typing import List, Optional, Sequence, Tuple

import numpy as np

from ._chains import ChainSet
from ._config import ModelParameter, PriorSpec


# ---------------------------------------------------------------------------
# Prior-variance helpers
# ---------------------------------------------------------------------------


def _prior_variance(prior: Optional[PriorSpec], parameter_name: str) -> float:
    """Return the variance of *prior*.

    Currently supports:

    * ``uniform`` with ``limitLower`` and ``limitUpper`` → ``(b - a)^2 / 12``.
    * ``normal`` with ``variance`` (and no truncation) → ``variance``.

    Other prior kinds, or normal priors carrying ``limitLower`` / ``limitUpper``
    (truncated), raise :class:`NotImplementedError`.
    """
    if prior is None:
        raise NotImplementedError(
            f"projection_pursuit requires a prior on every active parameter; "
            f"parameter {parameter_name!r} has none."
        )
    kind = prior.kind
    p = prior.params
    if kind == "uniform":
        lo = p.get("limitLower")
        hi = p.get("limitUpper")
        if lo is None or hi is None:
            raise NotImplementedError(
                f"Uniform prior on {parameter_name!r} is missing "
                "limitLower / limitUpper."
            )
        return (float(hi) - float(lo)) ** 2 / 12.0
    if kind == "normal":
        var = p.get("variance")
        if var is None:
            raise NotImplementedError(
                f"Normal prior on {parameter_name!r} is missing 'variance'."
            )
        if "limitLower" in p or "limitUpper" in p:
            raise NotImplementedError(
                f"Truncated-normal prior on {parameter_name!r} is not yet "
                "supported by projection_pursuit; please raise an issue if "
                "you need this."
            )
        return float(var)
    raise NotImplementedError(
        f"projection_pursuit does not yet support prior kind {kind!r} "
        f"(on parameter {parameter_name!r})."
    )


def _apply_mapper(values: np.ndarray, mapper: str, parameter_name: str) -> np.ndarray:
    """Apply *mapper* to *values* (sampler-space transform).

    Currently supports:

    * ``identity`` — pass through.

    Other mappers raise :class:`NotImplementedError`; please raise an issue if
    a particular mapper is needed.
    """
    if mapper == "identity":
        return values
    raise NotImplementedError(
        f"projection_pursuit does not yet support operatorUnaryMapper "
        f"value={mapper!r} (on parameter {parameter_name!r}).  Currently "
        "only 'identity' is implemented."
    )


# ---------------------------------------------------------------------------
# Result dataclass
# ---------------------------------------------------------------------------


[docs] @dataclass(frozen=True) class ProjectionPursuitResult: """Result of :func:`projection_pursuit`. Attributes ---------- eigenvalues: ``(n_params,)`` ascending eigenvalues of the rescaled-sample covariance matrix. Smaller is "better constrained". eigenvectors: ``(n_params, n_params)`` matrix whose ``[:, k]`` column is the eigenvector for ``eigenvalues[k]``, expressed in *rescaled-mapped* space (i.e. in the same coordinates used for the eigendecomposition). parameter_names: Parameter names along axis 0. parameter_labels: :attr:`ModelParameter.display_label` strings parallel to :attr:`parameter_names`. prior_sigma: ``(n_params,)`` square root of the prior variance for each parameter, the rescaling that was applied before eigendecomposition. Methods ------- direction: Components of one eigenvector that exceed a contribution threshold. latex_summary: LaTeX-rendered summary line for a chosen direction. """ eigenvalues: np.ndarray eigenvectors: np.ndarray parameter_names: Tuple[str, ...] parameter_labels: Tuple[str, ...] prior_sigma: np.ndarray
[docs] def direction( self, index: int, *, contribution_minimum: float = 0.05, ) -> List[Tuple[str, float]]: """Significant components of eigenvector *index*. Parameters ---------- index: Eigenvector index (0 = best constrained). contribution_minimum: Drop components whose squared loading is below this threshold. Returns ------- list of (label, loading) pairs Sorted by descending absolute loading. """ v = self.eigenvectors[:, index] out = [ (self.parameter_labels[i], float(v[i])) for i in range(v.size) if v[i] ** 2 >= contribution_minimum ] out.sort(key=lambda t: abs(t[1]), reverse=True) return out
[docs] def latex_summary( self, index: int, *, contribution_minimum: float = 0.05, precision: int = 3, ) -> str: """LaTeX-formatted summary of eigenvector *index*'s significant components.""" parts = [] for label, val in self.direction(index, contribution_minimum=contribution_minimum): sign = "+" if val >= 0 and parts else ("-" if val < 0 else "") parts.append(f"{sign} {abs(val):.{precision}f}\\,{label}".strip()) return " ".join(parts) if parts else ""
# --------------------------------------------------------------------------- # Main entry point # ---------------------------------------------------------------------------
[docs] def projection_pursuit( chains: ChainSet, *, post_burn: Optional[int] = None, drop_chains: Sequence[int] = (), ) -> ProjectionPursuitResult: """Find the linear combinations of parameters best constrained by the data. Each post-burn parameter column is mapped via its ``operatorUnaryMapper``, normalised by ``sqrt(prior variance)``, and mean-centred. The covariance matrix of the resulting samples is eigendecomposed, and the eigenvalues/eigenvectors are returned sorted by ascending eigenvalue — so ``eigenvectors[:, 0]`` is the linear combination most tightly constrained relative to the prior. Parameters ---------- chains: :class:`ChainSet`. post_burn: Number of leading rows to skip per chain. ``None`` triggers automatic detection. drop_chains: ``chain_index`` values to exclude. Returns ------- ProjectionPursuitResult Raises ------ NotImplementedError If any active parameter uses a prior or mapper not yet supported by :func:`projection_pursuit` (currently uniform/normal priors and ``identity`` mapper only). ValueError If the post-burn pool is empty or contains fewer than two rows. """ from ._convergence import _resolve_post_burn burn = _resolve_post_burn(chains, post_burn) drop = set(int(i) for i in drop_chains) parameters: Tuple[ModelParameter, ...] = chains.config.parameters n_params = len(parameters) # Pre-compute prior-sigma vector and mapper kinds. prior_sigma = np.empty(n_params, dtype=float) for i, p in enumerate(parameters): var = _prior_variance(p.prior, p.name) if var <= 0: raise ValueError( f"Prior variance on parameter {p.name!r} is non-positive." ) prior_sigma[i] = np.sqrt(var) # Concatenate post-burn samples across chains (with mappers applied). parts: List[np.ndarray] = [] for c in chains: if c.chain_index in drop: continue if c.n_steps <= burn: continue seg = c.state[burn:].copy() for i, p in enumerate(parameters): seg[:, i] = _apply_mapper(seg[:, i], p.mapper, p.name) parts.append(seg) if not parts: raise ValueError( "No post-burn samples available — every chain was dropped or " "shorter than post_burn." ) samples = np.concatenate(parts, axis=0) if samples.shape[0] < 2: raise ValueError( f"Need at least two post-burn samples for projection_pursuit; " f"got {samples.shape[0]}." ) # Rescale by prior sigma and centre. rescaled = samples / prior_sigma[np.newaxis, :] rescaled -= rescaled.mean(axis=0, keepdims=True) # Symmetric covariance → eigh. cov = rescaled.T @ rescaled / (rescaled.shape[0] - 1) cov = 0.5 * (cov + cov.T) # symmetrise to suppress numerical asymmetry eigvals, eigvecs = np.linalg.eigh(cov) # eigh returns ascending eigenvalues already; double-check / be explicit. order = np.argsort(eigvals) eigvals = eigvals[order] eigvecs = eigvecs[:, order] return ProjectionPursuitResult( eigenvalues=eigvals, eigenvectors=eigvecs, parameter_names=chains.config.parameter_names, parameter_labels=tuple(p.display_label for p in parameters), prior_sigma=prior_sigma, )