Source code for dendros._mcmc._params

"""Read, modify, and write Galacticus base-parameter XML files.

Implements:

* A Galacticus-style parameter-path resolver supporting ``::`` / ``/``
  separators and ``[N]`` / ``[@value='...']`` selectors.
* :func:`apply_state` — set parameter elements' ``value=`` attributes from a
  state vector.
* :func:`read_parameter_file` / :func:`write_parameter_file_to` — XML round-trip
  using the standard library's ``xml.etree``.

Notes
-----
``xml.etree.ElementTree`` strips comments and reformats whitespace.  If you
need byte-perfect round-trips, use ``lxml`` directly via :mod:`lxml.etree`
(this module's API is intentionally narrow so swapping is easy).  Since the
typical use of these files is "generate, hand to Galacticus", that loss is
acceptable.
"""
from __future__ import annotations

import re
from pathlib import Path
from typing import Iterable, List, Optional, Sequence, Tuple, Union
from xml.etree import ElementTree as ET

import numpy as np

from ._config import ModelParameter


# ---------------------------------------------------------------------------
# Path resolution
# ---------------------------------------------------------------------------


_SEGMENT = re.compile(
    r"""
    ^
    (?P<name>[A-Za-z_][A-Za-z0-9_]*)            # element name
    (?:\[                                       # optional predicate
        (?P<predicate>
            \d+                                 # 1-based index
          |
            @value\s*=\s*['"][^'"]*['"]         # attribute filter
        )
    \])?
    $
    """,
    re.VERBOSE,
)

_VALUE_PRED = re.compile(r"""@value\s*=\s*(?P<quote>['"])(?P<val>[^'"]*)(?P=quote)""")


[docs] def resolve_parameter_path(root: ET.Element, path: str) -> ET.Element: """Locate the XML element identified by a Galacticus parameter *path*. Parameters ---------- root: Root :class:`xml.etree.ElementTree.Element` to search. path: Slash- or ``::``-separated parameter path. Each segment is an element name, optionally followed by a ``[N]`` integer (1-based) instance selector or a ``[@value='x']`` attribute filter — matching the Galacticus parameter-file convention. Returns ------- xml.etree.ElementTree.Element Raises ------ KeyError If any segment does not match an element under the current node. ValueError If a path segment is malformed. """ segments = [s for s in path.replace("::", "/").strip("/").split("/") if s] if not segments: raise ValueError(f"Empty parameter path: {path!r}") current = root walked: List[str] = [] for seg in segments: m = _SEGMENT.match(seg) if m is None: raise ValueError(f"Malformed parameter-path segment {seg!r} in {path!r}") name = m.group("name") predicate = m.group("predicate") children = list(current.findall(name)) if not children: raise KeyError( f"No element {name!r} found under " f"{('/' + '/'.join(walked)) if walked else 'root'} " f"(path: {path!r})" ) if predicate is None: current = children[0] elif predicate.isdigit(): idx = int(predicate) if idx < 1 or idx > len(children): raise KeyError( f"Index [{idx}] out of range for {name!r} " f"(found {len(children)} matches; path {path!r})" ) current = children[idx - 1] else: vm = _VALUE_PRED.match(predicate) if vm is None: raise ValueError( f"Malformed predicate {predicate!r} in path {path!r}" ) wanted = vm.group("val") matches = [c for c in children if c.get("value") == wanted] if not matches: raise KeyError( f"No {name!r} element with value={wanted!r} found " f"(path: {path!r})" ) current = matches[0] walked.append(seg) return current
# --------------------------------------------------------------------------- # State application # --------------------------------------------------------------------------- def _format_value(x: float) -> str: """Format *x* as a string Galacticus will round-trip cleanly.""" return repr(float(x))
[docs] def apply_state( tree: ET.ElementTree, parameters: Sequence[ModelParameter], state: np.ndarray, *, parameter_map: Optional[Iterable[str]] = None, ) -> None: """Set ``value=`` attributes in *tree* from a state vector. Parameters ---------- tree: Parsed XML tree to modify in place. parameters: Active model parameters (the same ordering used by chain columns). state: ``(n_params,)`` state vector aligned with *parameters*. parameter_map: Optional iterable of parameter names to apply. ``None`` applies every entry of *parameters*; non-``None`` applies only those named and is the typical case for an ``independentLikelihoods`` leaf, whose base parameter file mentions only that leaf's parameters. Raises ------ KeyError If a parameter's path does not resolve in *tree*, or if a name in *parameter_map* is not among *parameters*. ValueError If *state* doesn't match the length of *parameters*. """ if state.shape != (len(parameters),): raise ValueError( f"state has shape {state.shape!r}; expected ({len(parameters)},)" ) if parameter_map is None: targets = list(range(len(parameters))) else: index_by_name = {p.name: i for i, p in enumerate(parameters)} targets = [] for name in parameter_map: if name not in index_by_name: raise KeyError( f"parameterMap entry {name!r} is not among the active " f"model parameters {list(index_by_name)!r}." ) targets.append(index_by_name[name]) root = tree.getroot() for i in targets: param = parameters[i] el = resolve_parameter_path(root, param.name) el.set("value", _format_value(state[i]))
# --------------------------------------------------------------------------- # File I/O # ---------------------------------------------------------------------------
[docs] def read_parameter_file(path: Union[str, "Path"]) -> ET.ElementTree: """Parse a Galacticus parameter XML file. Parameters ---------- path: Path to the file. Returns ------- xml.etree.ElementTree.ElementTree """ p = Path(path) if not p.exists(): raise FileNotFoundError(f"Base parameter file not found: {p}") return ET.parse(str(p))
[docs] def write_parameter_file_to(tree: ET.ElementTree, path: Union[str, "Path"]) -> Path: """Write *tree* to *path* with an XML declaration. Returns the resolved output path. """ out = Path(path) out.parent.mkdir(parents=True, exist_ok=True) tree.write(str(out), encoding="utf-8", xml_declaration=True) return out.resolve()
# --------------------------------------------------------------------------- # Convenience: emit one parameter file per likelihood leaf # ---------------------------------------------------------------------------
[docs] def emit_parameter_files( state: np.ndarray, config, out_dir: Union[str, "Path"], *, name_format: Optional[str] = None, ) -> List[Tuple[int, Path]]: """Write one Galacticus parameter file per leaf of ``config.likelihood``. Reads each leaf's ``base_parameters_file``, applies the subset of *state* selected by that leaf's ``parameter_map`` (or the full state when no map is set), and writes the modified XML into *out_dir*. Parameters ---------- state: ``(n_params,)`` state vector (in physical / model space, as stored in the chain log file — no mapper inversion is applied). config: :class:`MCMCConfig`. out_dir: Output directory (created if missing). name_format: Format string for output filenames; receives ``leaf_index`` and ``stem`` (the base file's stem). Defaults to ``"{stem}.xml"`` for a single leaf and ``"{leaf_index:02d}_{stem}.xml"`` for multiple, so per-leaf files don't collide when several leaves share a base stem. Returns ------- list of (leaf_index, written_path) One tuple per leaf, in document order. Raises ------ ValueError If ``config.likelihood`` is ``None``, or if any leaf lacks a ``base_parameters_file``. KeyError If a parameter's path does not resolve in the corresponding base file, or a ``parameter_map`` references an unknown parameter. """ state = np.asarray(state, dtype=float) if config.likelihood is None: raise ValueError( "Config has no <posteriorSampleLikelihood>; cannot emit parameter " "files." ) leaves = config.likelihood.leaves() if any(leaf.base_parameters_file is None for leaf in leaves): missing = [i for i, leaf in enumerate(leaves) if leaf.base_parameters_file is None] raise ValueError( f"Likelihood leaves {missing!r} have no <baseParametersFileName>; " "those leaves do not derive from posteriorSampleLikelihoodBaseParameters." ) out_dir = Path(out_dir) out_dir.mkdir(parents=True, exist_ok=True) out_dir_resolved = out_dir.resolve() if name_format is None: name_format = "{stem}.xml" if len(leaves) == 1 else "{leaf_index:02d}_{stem}.xml" written: List[Tuple[int, Path]] = [] for i, leaf in enumerate(leaves): tree = read_parameter_file(leaf.base_parameters_file) apply_state( tree, config.parameters, state, parameter_map=leaf.parameter_map, ) stem = Path(leaf.base_parameters_file).stem formatted = name_format.format(leaf_index=i, stem=stem) out_path = (out_dir / formatted) # Containment check: refuse to write outside out_dir, regardless of # whether name_format contained an absolute path or `..` segments. try: out_path.resolve().relative_to(out_dir_resolved) except ValueError: raise ValueError( f"name_format={name_format!r} produced path " f"{out_path} which resolves outside out_dir " f"{out_dir_resolved}. Refusing to write." ) from None write_parameter_file_to(tree, out_path) written.append((i, out_path.resolve())) return written