"""Core collection abstraction and ``open_outputs`` entry point."""
from __future__ import annotations
import glob as _glob
import re
import warnings
from pathlib import Path
from typing import Dict, Iterator, List, Mapping, Optional, Sequence, Union
import h5py
import numpy as np
from ._outputs import OutputIndex
_MPI_SUFFIX = re.compile(r"^(.+):MPI(\d{4})$")
def _default_model_label(primary_path: str) -> str:
"""Default model label for a Collection: file stem with ``:MPIxxxx`` stripped.
``/runs/fiducial.hdf5`` -> ``"fiducial"``;
``/runs/fiducial:MPI0000.hdf5`` -> ``"fiducial"``.
"""
stem = Path(primary_path).stem
m = _MPI_SUFFIX.match(stem)
return m.group(1) if m else stem
# ---------------------------------------------------------------------------
# Attribute helpers
# ---------------------------------------------------------------------------
def _decode(value) -> str:
"""Decode bytes/numpy-string HDF5 attribute values to ``str``."""
if isinstance(value, (bytes, bytearray)):
return value.decode("utf-8", errors="replace")
# numpy.bytes_ is a subclass of bytes; numpy.str_ is a subclass of str
return str(value) if not isinstance(value, str) else value
def _resolve_output_name(output: Union[str, int]) -> str:
"""Return the Output* group name for a string or 1-based integer."""
if isinstance(output, int):
return f"Output{output}"
return output
# ---------------------------------------------------------------------------
# Proxy objects – lightweight h5py-like wrappers
# ---------------------------------------------------------------------------
[docs]
class GroupProxy:
"""Read-only h5py-like proxy for an HDF5 group.
Parameters
----------
collection:
Parent :class:`Collection`.
path:
HDF5 path to the group within the file.
"""
def __init__(self, collection: "Collection", path: str) -> None:
self._collection = collection
self._path = path
def _group(self) -> h5py.Group:
return self._collection._primary[self._path]
@property
def attrs(self) -> dict:
"""Return group attributes as a plain dict."""
return {k: _decode(v) for k, v in self._group().attrs.items()}
@property
def name(self) -> str:
"""HDF5 path of this group."""
return self._path
[docs]
def keys(self) -> List[str]:
"""Return the immediate children of this group."""
return list(self._group().keys())
def __iter__(self) -> Iterator[str]:
return iter(self._group().keys())
def __contains__(self, key: str) -> bool:
return key in self._group()
def __getitem__(self, key: str) -> Union["GroupProxy", "DatasetProxy"]:
path = self._path.rstrip("/") + "/" + key.lstrip("/")
item = self._collection._primary[path]
if isinstance(item, h5py.Group):
return GroupProxy(self._collection, path)
return DatasetProxy(self._collection, path)
def __repr__(self) -> str:
return f"<GroupProxy '{self._path}'>"
[docs]
class DatasetProxy:
"""Read-only h5py-like proxy for an HDF5 dataset.
For multi-file :class:`Collection` instances, :meth:`read` concatenates
data from all files along axis 0.
Parameters
----------
collection:
Parent :class:`Collection`.
path:
HDF5 path to the dataset within the file.
"""
def __init__(self, collection: "Collection", path: str) -> None:
self._collection = collection
self._path = path
def _dataset(self) -> h5py.Dataset:
return self._collection._primary[self._path]
@property
def attrs(self) -> dict:
"""Return dataset attributes as a plain dict."""
return {k: _decode(v) for k, v in self._dataset().attrs.items()}
@property
def dtype(self):
"""NumPy dtype of the dataset."""
return self._dataset().dtype
@property
def shape(self) -> tuple:
"""Total shape; for multi-file collections axis-0 is the sum across files."""
shapes = [h[self._path].shape for h in self._collection._handles]
if not shapes:
return ()
if len(shapes) == 1:
return shapes[0]
total = sum(s[0] for s in shapes)
return (total,) + shapes[0][1:]
@property
def name(self) -> str:
"""HDF5 path of this dataset."""
return self._path
[docs]
def read(self, where=None) -> np.ndarray:
"""Read the dataset into a :class:`numpy.ndarray`.
For multi-file collections the arrays from all files are concatenated
along axis 0 before the optional *where* selection is applied.
Parameters
----------
where:
``None`` reads everything. A boolean mask or integer index array
is applied after concatenation.
"""
arrays = [h[self._path][()] for h in self._collection._handles]
arr = np.concatenate(arrays, axis=0) if len(arrays) > 1 else arrays[0]
if where is not None:
arr = arr[np.asarray(where)]
return arr
def __getitem__(self, selection) -> np.ndarray:
"""Index into the dataset of the primary file (h5py-like)."""
return self._dataset()[selection]
def __repr__(self) -> str:
return f"<DatasetProxy '{self._path}' dtype={self.dtype} shape={self.shape}>"
# ---------------------------------------------------------------------------
# Collection
# ---------------------------------------------------------------------------
[docs]
class Collection:
"""A collection of one or more Galacticus HDF5 output files.
Prefer constructing instances through :func:`open_outputs` rather than
calling this constructor directly.
Parameters
----------
files:
Paths to the HDF5 files to open (read-only).
output_root:
Name of the top-level HDF5 group that contains the ``Output*`` subgroups.
Defaults to ``"Outputs"``.
Examples
--------
>>> from dendros import open_outputs
>>> with open_outputs("galacticus.hdf5") as c:
... c.validate_completion()
... print(c.list_outputs())
... data = c.read("Output1", ["nodeData/haloMass"])
"""
def __init__(self, files: List[str], output_root: str = "Outputs") -> None:
self._files: List[str] = list(files)
self._output_root: str = output_root
self._handles: List[h5py.File] = [h5py.File(f, "r") for f in self._files]
self._outputs_index: Optional[OutputIndex] = None
# ------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------
@property
def _primary(self) -> h5py.File:
"""The first open file handle; used for metadata and structure."""
return self._handles[0]
# ------------------------------------------------------------------
# Public properties
# ------------------------------------------------------------------
@property
def files(self) -> List[str]:
"""Paths of the files in this collection (in order)."""
return list(self._files)
@property
def output_root(self) -> str:
"""Top-level HDF5 group containing the ``Output*`` groups."""
return self._output_root
# ------------------------------------------------------------------
# h5py-like interface
# ------------------------------------------------------------------
[docs]
def keys(self) -> List[str]:
"""Return the top-level group keys of the primary file."""
return list(self._primary.keys())
def __getitem__(self, key: str) -> Union[GroupProxy, DatasetProxy]:
"""Access a group or dataset by HDF5 path (h5py-like)."""
item = self._primary[key]
if isinstance(item, h5py.Group):
return GroupProxy(self, key)
return DatasetProxy(self, key)
def __contains__(self, key: str) -> bool:
return key in self._primary
def __repr__(self) -> str:
n = len(self._files)
return (
f"<Collection files={n} output_root='{self._output_root}'"
f" path='{self._files[0]}'>"
)
# ------------------------------------------------------------------
# Completion validation
# ------------------------------------------------------------------
[docs]
def validate_completion(self, mode: str = "error") -> None:
"""Check that all files report a successful completion status.
Galacticus writes a ``statusCompletion`` attribute to the root of the
HDF5 file when it finishes. This method verifies that the attribute
equals ``"complete"`` for every file in the collection.
Parameters
----------
mode:
What to do when an incomplete file is found:
* ``"error"`` (default) – raise :exc:`RuntimeError`.
* ``"warn"`` – emit a :class:`UserWarning` and continue.
* ``"ignore"`` – do nothing.
Raises
------
ValueError
If *mode* is not one of the accepted values.
RuntimeError
If *mode* is ``"error"`` and at least one file is incomplete.
"""
if mode not in ("error", "warn", "ignore"):
raise ValueError(
f"mode must be 'error', 'warn', or 'ignore'; got {mode!r}"
)
incomplete = []
for path, h in zip(self._files, self._handles):
status = h.attrs.get("statusCompletion")
if status != 0:
incomplete.append((path, status))
if not incomplete:
return
lines = ["The following files have incomplete or missing statusCompletion:"]
for path, status in incomplete:
lines.append(f" {path}: statusCompletion={status!r}")
msg = "\n".join(lines)
if mode == "error":
raise RuntimeError(msg)
elif mode == "warn":
warnings.warn(msg, UserWarning, stacklevel=2)
# ------------------------------------------------------------------
# Output discovery
# ------------------------------------------------------------------
@property
def outputs(self) -> OutputIndex:
"""An :class:`~dendros._outputs.OutputIndex` for this collection.
The index is scanned lazily on first access and then cached.
"""
if self._outputs_index is None:
self._outputs_index = OutputIndex(self)
return self._outputs_index
[docs]
def list_outputs(self, format: str = "astropy"):
"""Return a table of available outputs.
Scans all ``Output*`` groups inside ``/{output_root}/`` and extracts
``outputTime`` and ``outputExpansionFactor`` attributes. Redshift is
computed as *z = 1/a − 1*.
Parameters
----------
format:
``"astropy"`` (default) returns an :class:`astropy.table.Table`;
``"pandas"`` returns a :class:`pandas.DataFrame`;
``"tabulate"`` returns a ``str`` formatted using the ``tabulate`` library.
Returns
-------
astropy.table.Table, pandas.DataFrame, or tabulate-formatted string
"""
return self.outputs.table(format=format)
# ------------------------------------------------------------------
# Properties table
# ------------------------------------------------------------------
[docs]
def list_properties(
self, output: Union[str, int], format: str = "astropy"
):
"""Return a table of datasets available in the ``nodeData`` group.
Parameters
----------
output:
Output name (e.g. ``"Output1"``) or 1-based integer index.
format:
``"astropy"`` (default), ``"pandas"``, or ``"tabulate"``.
Returns
-------
astropy.table.Table, pandas.DataFrame, or tabulate-formatted string
"""
output_name = _resolve_output_name(output)
path = f"{self._output_root}/{output_name}/nodeData"
try:
group = self._primary[path]
except KeyError:
raise KeyError(
f"nodeData group not found at '{path}'. "
f"Check output_root='{self._output_root}' and "
f"output='{output_name}'."
) from None
rows: List[dict] = []
for name in sorted(group.keys()):
ds = group[name]
attrs = {k: _decode(v) for k, v in ds.attrs.items()}
raw_units = attrs.get("unitsInSI")
try:
units_val = float(raw_units) if raw_units not in (None, "") else 1.0
except (TypeError, ValueError):
units_val = raw_units
rows.append(
{
"name": name,
"dtype": str(ds.dtype),
"shape": str(ds.shape),
"description": attrs.get("comment", ""),
"unitsInSI": units_val,
}
)
return _make_table(rows, format=format, maxcolwidths=[None, None, None, None, 25, None])
# ------------------------------------------------------------------
# Reading datasets
# ------------------------------------------------------------------
[docs]
def read(
self,
output: Union[str, int],
datasets: Union[List[str], Dict[str, str]],
where=None,
) -> Dict[str, np.ndarray]:
"""Read one or more datasets from an output group.
For multi-file collections, arrays from all files are concatenated
along axis 0 before any selection is applied.
Parameters
----------
output:
Output name (e.g. ``"Output1"``) or 1-based integer index.
datasets:
Either a list of relative dataset paths under the output group
(e.g. ``["nodeData/haloMass"]``), in which case the same strings
are used as dict keys in the return value; or a :class:`dict`
mapping user-chosen labels to relative paths.
where:
``None`` reads all rows. A boolean mask array of length
*N_total* or an integer index array selects a subset.
Returns
-------
dict
Mapping from dataset name / label to :class:`numpy.ndarray`.
Notes
-----
The ``unitsInSI`` attribute is preserved in the raw array values
but not yet applied. Future versions will optionally return
:class:`astropy.units.Quantity` objects.
"""
output_name = _resolve_output_name(output)
output_path = f"{self._output_root}/{output_name}"
if isinstance(datasets, dict):
datasets_map: Dict[str, str] = datasets
else:
datasets_map = {ds: ds for ds in datasets}
result: Dict[str, np.ndarray] = {}
for label, rel_path in datasets_map.items():
full_path = output_path + "/" + rel_path.lstrip("/")
arrays: List[np.ndarray] = []
for h in self._handles:
try:
arrays.append(h[full_path][()])
except KeyError:
raise KeyError(
f"Dataset '{full_path}' not found in '{h.filename}'"
) from None
arr = np.concatenate(arrays, axis=0) if len(arrays) > 1 else arrays[0]
if where is not None:
arr = arr[np.asarray(where)]
result[label] = arr
return result
# ------------------------------------------------------------------
# Galaxy history
# ------------------------------------------------------------------
[docs]
def trace_history(
self,
ids,
properties: Union[List[str], Dict[str, str]],
outputs=None,
*,
id_dataset: str = "nodeData/nodeUniqueIDBranchTip",
on_duplicate_file_match: str = "error",
int_sentinel: int = -1,
) -> Dict[str, np.ndarray]:
"""Trace the history of specified galaxies across outputs.
Convenience wrapper around :func:`dendros.trace_galaxy_history`.
See that function for full parameter and return-value documentation.
"""
from ._galaxy_history import trace_galaxy_history
return trace_galaxy_history(
self,
ids,
properties,
outputs=outputs,
id_dataset=id_dataset,
on_duplicate_file_match=on_duplicate_file_match,
int_sentinel=int_sentinel,
)
# ------------------------------------------------------------------
# Analyses
# ------------------------------------------------------------------
[docs]
def list_analyses(self, format: str = "astropy"):
"""Return a table of ``function1D`` analyses in ``/analyses``.
Convenience wrapper around :func:`dendros.list_analyses`.
"""
from ._analyses import list_analyses
return list_analyses(self, format=format)
[docs]
def plot_analyses(
self,
name=None,
output_directory=None,
*,
show_target: bool = True,
figsize=(7.0, 5.0),
dpi: int = 120,
file_format: str = "pdf",
):
"""Plot ``function1D`` analyses from ``/analyses``.
Convenience wrapper around :func:`dendros.plot_analyses`.
"""
from ._analyses import plot_analyses
return plot_analyses(
self,
name=name,
output_directory=output_directory,
show_target=show_target,
figsize=figsize,
dpi=dpi,
file_format=file_format,
)
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
[docs]
def close(self) -> None:
"""Close all open HDF5 file handles."""
for h in self._handles:
try:
h.close()
except Exception:
pass
self._handles = []
def __enter__(self) -> "Collection":
return self
def __exit__(self, *args) -> None:
self.close()
# ---------------------------------------------------------------------------
# Table helper
# ---------------------------------------------------------------------------
def _make_table(rows: List[dict], format: str, **kwargs):
"""Convert a list of row dicts to the requested table format."""
if format == "astropy":
from astropy.table import Table
if not rows:
return Table()
keys = list(rows[0].keys())
data = {k: [r[k] for r in rows] for k in keys}
return Table(data)
elif format == "pandas":
try:
import pandas as pd
except ImportError as exc:
raise ImportError(
"pandas is not installed. "
"Install it with: pip install 'dendros[pandas]'"
) from exc
if not rows:
return pd.DataFrame()
return pd.DataFrame(rows)
elif format == "tabulate":
try:
import pandas as pd
except ImportError as exc:
raise ImportError(
"pandas is not installed. "
"Install it with: pip install 'dendros[pandas]'"
) from exc
try:
from tabulate import tabulate
except ImportError as exc:
raise ImportError(
"tabulate is not installed. "
"Install it with: pip install 'dendros[tabulate]'"
) from exc
if not rows:
return ""
return tabulate(pd.DataFrame(rows), headers=list(rows[0].keys()), **kwargs)
else:
raise ValueError(f"format must be 'astropy', 'pandas', or 'tabulate'; got {format!r}")
# ---------------------------------------------------------------------------
# open_outputs
# ---------------------------------------------------------------------------
[docs]
def open_outputs(
path: Union[
str,
"Path",
List[Union[str, "Path"]],
Mapping[str, Union[str, "Path", List[Union[str, "Path"]]]],
],
output_root: str = "Outputs",
) -> Union[Collection, "ModelCollection"]:
"""Open a Galacticus output collection.
Parameters
----------
path:
One of:
* A single filename – e.g. ``"galacticus.hdf5"``. If sibling
MPI-rank files (``*_MPI:????``) exist they are included
automatically.
* A glob string – e.g. ``"run*/galacticus*.hdf5"``.
* An explicit list of filenames.
* A dict ``{label: path-or-paths}`` to open several models for
side-by-side comparison. Equivalent to
:func:`open_models(path) <open_models>`; returns a
:class:`ModelCollection`.
output_root:
Top-level HDF5 group containing the ``Output*`` groups.
Defaults to ``"Outputs"``. Pass ``"Lightcone"`` for lightcone runs
or any other custom group name as needed.
Returns
-------
Collection
For the single-, glob-, or list-of-files forms (one model,
possibly MPI-split).
ModelCollection
For the dict form (one entry per model, suitable for
multi-model comparison plots).
Raises
------
FileNotFoundError
If no files are found matching *path*.
TypeError
If *path* is not a ``str``, :class:`pathlib.Path`, ``list``, or
``dict``.
Examples
--------
Open a single file::
c = open_outputs("galacticus.hdf5")
Auto-detect MPI-split files (given any one rank's file)::
c = open_outputs("galacticus_MPI:0000.hdf5")
Open via glob::
c = open_outputs("run001/galacticus*.hdf5")
Open an explicit list::
c = open_outputs(["file_a.hdf5", "file_b.hdf5"])
Open several models for comparison plots (see also :func:`open_models`)::
m = open_outputs({"Fiducial": "fid.hdf5", "Variant": "var.hdf5"})
figs = m.plot_analyses()
Lightcone mode::
c = open_outputs("lightcone.hdf5", output_root="Lightcone")
"""
if isinstance(path, Mapping):
return open_models(path, output_root=output_root)
if isinstance(path, list):
files = [str(p) for p in path]
elif isinstance(path, (str, Path)):
path = str(path)
expanded = sorted(_glob.glob(path))
if not expanded:
raise FileNotFoundError(f"No files found matching: {path!r}")
if len(expanded) == 1:
files = _auto_detect_mpi(expanded[0])
else:
files = expanded
else:
raise TypeError(
f"path must be str, Path, list, or dict; got {type(path).__name__!r}"
)
if not files:
raise FileNotFoundError("No HDF5 files found.")
return Collection(files, output_root=output_root)
[docs]
class ModelCollection(dict):
"""A ``dict`` mapping label → :class:`Collection`, one entry per model.
Returned by :func:`open_models` and accepted by
:func:`~dendros.plot_analyses` so analyses from several Galacticus runs
can be overlaid on a single figure for comparison.
Acts as a regular dict but also supports the context-manager protocol —
``__exit__`` closes every contained Collection.
"""
[docs]
def close(self) -> None:
"""Close every contained Collection.
Each close is attempted independently; if one fails the others
are still closed and a :class:`UserWarning` is emitted naming
the failing model so the problem is visible.
"""
for label, c in self.items():
try:
c.close()
except Exception as exc:
warnings.warn(
f"ModelCollection: closing model {label!r} raised "
f"{type(exc).__name__}: {exc}",
UserWarning,
stacklevel=2,
)
def __enter__(self) -> "ModelCollection":
return self
def __exit__(self, *args) -> None:
self.close()
[docs]
def list_analyses(self, format: str = "astropy"):
"""Return the union of ``function1D`` analyses across all models.
Each row carries an extra ``models`` column listing the labels of
the models that contain the analysis (sorted, comma-separated).
Per-row metadata (description, axis labels, log flags, target
presence) is taken from the first model that supplies the
analysis.
"""
from ._analyses import _analyses_root, _attr_bool, _attr_str, _discover
meta: Dict[str, dict] = {}
appears: Dict[str, list] = {}
for label, c in self.items():
for name, grp in _discover(_analyses_root(c)):
if name not in meta:
meta[name] = {
"name": name,
"description": _attr_str(grp, "description"),
"xAxisLabel": _attr_str(grp, "xAxisLabel"),
"yAxisLabel": _attr_str(grp, "yAxisLabel"),
"xAxisIsLog": _attr_bool(grp, "xAxisIsLog"),
"yAxisIsLog": _attr_bool(grp, "yAxisIsLog"),
"hasTarget": "yDatasetTarget" in grp.attrs,
}
appears[name] = []
appears[name].append(label)
rows = []
for name in sorted(meta):
row = dict(meta[name])
row["models"] = ", ".join(sorted(appears[name]))
rows.append(row)
return _make_table(rows, format=format)
[docs]
def plot_analyses(
self,
name=None,
output_directory=None,
*,
show_target: bool = True,
figsize=(7.0, 5.0),
dpi: int = 120,
file_format: str = "pdf",
):
"""Plot ``function1D`` analyses overlaid across every model.
Convenience wrapper around :func:`dendros.plot_analyses` applied
to this :class:`ModelCollection`. Labels come from this dict's
keys; the target overlay is drawn once.
"""
from ._analyses import plot_analyses
return plot_analyses(
self,
name=name,
output_directory=output_directory,
show_target=show_target,
figsize=figsize,
dpi=dpi,
file_format=file_format,
)
[docs]
def open_models(
models: Union[
Mapping[str, Union[str, "Path", List[Union[str, "Path"]]]],
Sequence[Union[str, "Path", List[Union[str, "Path"]]]],
],
output_root: str = "Outputs",
) -> ModelCollection:
"""Open several Galacticus runs as a labelled :class:`ModelCollection`.
Each entry is opened with :func:`open_outputs`, so a single filename
auto-detects its ``*_MPI:????`` peers and a list-of-files entry is
accepted as-is. The returned :class:`ModelCollection` can be passed
directly to :func:`~dendros.plot_analyses` to overlay analyses from
every model on a single figure.
Parameters
----------
models:
Either a dict ``{label: path-or-paths}`` — keys become legend
labels — or a list of ``path-or-paths``, in which case default
labels are derived from each model's primary file stem (with any
``:MPIxxxx`` suffix stripped).
output_root:
Forwarded to :func:`open_outputs`.
Returns
-------
ModelCollection
Raises
------
ValueError
If a list form produces duplicate default labels. Pass a dict
with explicit labels to disambiguate.
Examples
--------
Compare two models with explicit labels::
with open_models({"Fiducial": "fid.hdf5", "Variant": "var.hdf5"}) as m:
figs = dendros.plot_analyses(m)
Or with labels derived from filenames::
models = open_models(["fid.hdf5", "var.hdf5"])
"""
out = ModelCollection()
if isinstance(models, Mapping):
items = list(models.items())
for label, path in items:
out[label] = open_outputs(path, output_root=output_root)
return out
try:
paths = list(models)
except TypeError as exc:
raise TypeError(
"models must be a dict {label: path} or a list of paths; "
f"got {type(models).__name__!r}"
) from exc
for path in paths:
c = open_outputs(path, output_root=output_root)
label = _default_model_label(c.files[0])
if label in out:
c.close()
out.close()
raise ValueError(
f"Duplicate default label {label!r} for models opened by "
"list. Pass models as a dict {label: path} to disambiguate."
)
out[label] = c
return out
def _auto_detect_mpi(path: str) -> List[str]:
"""Return sorted MPI-rank peers if they exist; otherwise return ``[path]``."""
p = Path(path)
stem = p.stem
m = _MPI_SUFFIX.match(stem)
base = m.group(1) if m else stem
peers = sorted(p.parent.glob(f"{base}:MPI????{p.suffix}"))
if peers:
return [str(x) for x in peers]
return [path]