"""Trajectory loading utilities for PolyzyMD analysis.
This module provides config-aware trajectory loading that understands
PolyzyMD's directory structure and daisy-chain continuation patterns.
File discovery is delegated to the active simulation engine so that
both OpenMM and GROMACS directory layouts are handled transparently.
Key Features
------------
- Config-based path resolution (config.yaml is single source of truth)
- Engine-aware file discovery (OpenMM daisy-chain, GROMACS flat layout)
- Automatic detection of daisy-chain trajectory segments
- Support for both scratch and projects directories
- Lazy loading and memory-efficient iteration
"""
from __future__ import annotations
import logging
import math
import re
from dataclasses import dataclass, field
from numbers import Real
from pathlib import Path
from typing import TYPE_CHECKING, Any, Iterator, Sequence
import numpy as np
from numpy.typing import NDArray
from pydantic import ValidationError
if TYPE_CHECKING:
from MDAnalysis.core.universe import Universe
from polyzymd.config.schema import SimulationConfig
from polyzymd.engines.base import SimulationEngine, TrajectoryLayout
LOGGER = logging.getLogger(__name__)
_WARNED_GRO_TOPOLOGY_PATHS: set[Path] = set()
def _normalized_warning_path(path: Path) -> Path:
"""Return a stable key for topology warning de-duplication.
Parameters
----------
path : Path
Topology path to normalize.
Returns
-------
Path
Expanded and resolved path suitable for process-wide warning gating.
"""
expanded = Path(path).expanduser()
try:
return expanded.resolve(strict=False)
except OSError:
return expanded.absolute()
def _require_mdanalysis(feature_name: str = "trajectory analysis") -> None:
"""Raise ImportError if MDAnalysis is not available."""
try:
import MDAnalysis # noqa: F401
except ImportError:
raise ImportError(
f"MDAnalysis is required for {feature_name}.\n"
"Ensure MDAnalysis is available in the PolyzyMD pixi environment "
'(for example: pixi run -e build python -c "import MDAnalysis")'
) from None
def _require_matplotlib(feature_name: str = "plotting") -> None:
"""Raise ImportError if matplotlib is not available."""
try:
import matplotlib # noqa: F401
except ImportError:
raise ImportError(
f"matplotlib is required for {feature_name}.\n"
"Ensure matplotlib is available in the PolyzyMD pixi environment "
'(for example: pixi run -e build python -c "import matplotlib")'
) from None
def _trajectory_frame_index(trajectory: Any) -> int | None:
"""Return the current trajectory frame index when available.
Parameters
----------
trajectory : Any
MDAnalysis trajectory reader or a compatible test double.
Returns
-------
int | None
Current frame index, or ``None`` when it cannot be determined.
"""
for owner in (trajectory, getattr(trajectory, "ts", None)):
frame = getattr(owner, "frame", None)
if isinstance(frame, bool):
continue
try:
return int(frame)
except (TypeError, ValueError):
continue
return None
def _restore_trajectory_frame(trajectory: Any, frame_index: int | None) -> None:
"""Restore a trajectory reader to a previous frame when possible.
Parameters
----------
trajectory : Any
MDAnalysis trajectory reader or a compatible test double.
frame_index : int | None
Frame index captured before a metadata probe.
"""
if frame_index is None:
return
try:
trajectory[frame_index]
except (AttributeError, IndexError, TypeError, ValueError):
return
def _finite_numeric_time(value: object) -> float | None:
"""Return a finite real-valued time or ``None``.
Parameters
----------
value : object
Candidate time value from MDAnalysis metadata.
Returns
-------
float | None
Finite time value, or ``None`` when the value is missing, non-real, or
non-finite.
"""
if value is None or isinstance(value, bool) or not isinstance(value, Real):
return None
time_value = float(value)
if not math.isfinite(time_value):
return None
return time_value
def _trajectory_raw_time(trajectory: Any) -> float | None:
"""Return raw timestamp metadata from a trajectory timestep.
Parameters
----------
trajectory : Any
MDAnalysis trajectory reader or a compatible test double.
Returns
-------
float | None
Raw finite timestep time from ``ts.data['time']``, or ``None`` when it is
unavailable.
"""
ts = getattr(trajectory, "ts", None)
data = getattr(ts, "data", None)
if not isinstance(data, dict):
return None
return _finite_numeric_time(data.get("time"))
def _trajectory_time(trajectory: Any) -> float | None:
"""Return the best finite timestamp exposed by a trajectory reader.
Parameters
----------
trajectory : Any
MDAnalysis trajectory reader or a compatible test double.
Returns
-------
float | None
Raw timestep time when available, otherwise ``trajectory.time`` when it
is finite.
"""
raw_time = _trajectory_raw_time(trajectory)
if raw_time is not None:
return raw_time
return _finite_numeric_time(getattr(trajectory, "time", None))
class _TimestampPreservingTrajectory:
"""Proxy that exposes raw MDAnalysis timestep timestamps.
Some MDAnalysis multi-DCD ``ChainReader`` instances normalize
``reader.time`` to a loaded-frame-relative origin even though each timestep
keeps the absolute source timestamp in ``reader.ts.data['time']``. This proxy
preserves the reader protocol while making ``time`` return that raw timestamp
when available.
"""
def __init__(self, reader: Any) -> None:
"""Store the wrapped trajectory reader.
Parameters
----------
reader : Any
MDAnalysis trajectory reader to wrap.
"""
object.__setattr__(self, "_reader", reader)
def __len__(self) -> int:
"""Return the wrapped trajectory length."""
return len(self._reader)
def __iter__(self) -> Iterator[Any]:
"""Iterate over the wrapped trajectory reader."""
return iter(self._reader)
def __getitem__(self, item: Any) -> Any:
"""Delegate frame and slice access to the wrapped reader."""
return self._reader[item]
def __getattr__(self, name: str) -> Any:
"""Delegate unknown attributes to the wrapped reader."""
return getattr(self._reader, name)
def __setattr__(self, name: str, value: Any) -> None:
"""Delegate mutable reader attributes to the wrapped reader."""
if name == "_reader":
object.__setattr__(self, name, value)
return
setattr(self._reader, name, value)
@property
def time(self) -> float:
"""Return raw timestep time when available."""
raw_time = _trajectory_raw_time(self._reader)
if raw_time is not None:
return raw_time
return self._reader.time
def _wrap_timestamp_preserving_trajectory(trajectory: Any) -> Any:
"""Wrap trajectory readers that hide raw source timestamps.
Parameters
----------
trajectory : Any
MDAnalysis trajectory reader.
Returns
-------
Any
Original reader when no correction is needed, otherwise a proxy that
exposes raw timestamp metadata through ``time``.
"""
previous_frame = _trajectory_frame_index(trajectory)
try:
trajectory[0]
raw_time = _trajectory_raw_time(trajectory)
reported_time = _finite_numeric_time(getattr(trajectory, "time", None))
except (AttributeError, IndexError, TypeError, ValueError):
return trajectory
finally:
_restore_trajectory_frame(trajectory, previous_frame)
if raw_time is None or reported_time is None:
return trajectory
if math.isclose(raw_time, reported_time, rel_tol=1e-12, abs_tol=1e-12):
return trajectory
return _TimestampPreservingTrajectory(trajectory)
[docs]
@dataclass
class TrajectoryInfo:
"""Information about discovered trajectory files.
Attributes
----------
topology_file : Path
Path to topology file (PDB)
trajectory_files : list[Path]
List of trajectory files (DCD) in order
n_segments : int
Number of daisy-chain segments
working_directory : Path
Base working directory for this replicate
replicate : int
Replicate number
topology_format : str or None, optional
Engine-reported topology format, when available.
trajectory_format : str or None, optional
Engine-reported trajectory format, when available.
warnings : list[str]
Discovery warnings that should be preserved in downstream provenance.
"""
topology_file: Path
trajectory_files: list[Path] = field(default_factory=list)
n_segments: int = 0
working_directory: Path = field(default_factory=Path)
replicate: int = 1
topology_format: str | None = None
trajectory_format: str | None = None
warnings: list[str] = field(default_factory=list)
@property
def n_trajectory_files(self) -> int:
"""Number of trajectory files found."""
return len(self.trajectory_files)
[docs]
def validate(self) -> None:
"""Validate that all files exist."""
if not self.topology_file.exists():
raise FileNotFoundError(f"Topology not found: {self.topology_file}")
missing = [f for f in self.trajectory_files if not f.exists()]
if missing:
raise FileNotFoundError(f"Missing trajectory files: {missing}")
[docs]
class TrajectoryLoader:
"""Config-aware trajectory loader for PolyzyMD simulations.
This class handles the complexity of finding and loading trajectories
from PolyzyMD's output structure, including:
- Daisy-chain continuation segments (OpenMM)
- Flat production directories (GROMACS)
- Scratch vs projects directory resolution
- Multiple replicates
File discovery is delegated to the simulation engine resolved from the
config's ``engine`` field. The engine is created lazily on the first
call that needs it, so construction remains cheap. Engine resolution errors
propagate unless an explicit ``engine_override`` is supplied.
Parameters
----------
config : SimulationConfig
PolyzyMD simulation configuration.
engine_override : str or None, optional
Force a specific engine name (``"openmm"`` or ``"gromacs"``)
instead of reading ``config.engine``.
Examples
--------
>>> from polyzymd.config import load_config
>>> config = load_config("config.yaml")
>>> loader = TrajectoryLoader(config)
>>>
>>> # Load single replicate
>>> u = loader.load_universe(replicate=1)
>>> print(f"Loaded {len(u.trajectory)} frames")
>>>
>>> # Get trajectory info without loading
>>> info = loader.get_trajectory_info(replicate=1)
>>> print(f"Found {info.n_segments} segments")
>>>
>>> # Load multiple replicates
>>> for rep in range(1, 6):
... u = loader.load_universe(replicate=rep)
... # ... analyze
>>>
>>> # Explicit engine override for GROMACS directories
>>> loader = TrajectoryLoader(config, engine_override="gromacs")
Notes
-----
Frame indices in MDAnalysis are 0-indexed. For user-facing output,
add 1 to follow PyMOL convention (1-indexed frames).
"""
[docs]
def __init__(
self,
config: "SimulationConfig",
engine_override: str | None = None,
) -> None:
_require_mdanalysis()
self.config = config
self._engine_override = engine_override
self._engine: SimulationEngine | None = None
self._universe_cache: dict[int, "Universe"] = {}
# ------------------------------------------------------------------
# Engine delegation helpers
# ------------------------------------------------------------------
def _get_engine(self) -> "SimulationEngine":
"""Lazily create and cache the simulation engine.
Engine resolution errors from ``create_engine()`` propagate to callers
unless an explicit ``engine_override`` supplies a valid backend.
Returns
-------
SimulationEngine
Engine instance resolved from config.
"""
if self._engine is None:
from polyzymd.engines import create_engine
self._engine = create_engine(self.config, override=self._engine_override)
return self._engine
def _resolve_layout(
self,
working_dir: Path,
replicate: int | None = None,
) -> "TrajectoryLayout":
"""Resolve trajectory layout via the engine.
Parameters
----------
working_dir : Path
Replicate working directory.
replicate : int or None, optional
Replicate index. When ``None`` (e.g. from
``find_topology(working_dir)``), the replicate is inferred
from the directory name (``run_<N>``) with a fallback to 1.
Returns
-------
TrajectoryLayout
Engine-resolved file layout.
Raises
------
FileNotFoundError
If the engine cannot resolve the layout (e.g. invalid paths).
"""
if replicate is None:
replicate = self._infer_replicate(working_dir)
engine = self._get_engine()
try:
engine_dir = engine.resolve_engine_working_directory(working_dir)
except (AttributeError, TypeError):
engine_dir = working_dir
try:
layout = engine.resolve_trajectory_layout(engine_dir, replicate)
except FileNotFoundError:
raise
except (TypeError, ValueError, ValidationError) as exc:
# Invalid path-like inputs are treated as missing layouts so callers
# can use the existing discovery fallback path
raise FileNotFoundError(
f"Engine could not resolve trajectory layout in {working_dir}: {exc}"
) from exc
self._warn_gro_topology(layout)
return layout
@staticmethod
def _infer_replicate(working_dir: Path) -> int:
"""Best-effort replicate number from a ``run_<N>`` directory name.
Parameters
----------
working_dir : Path
Directory whose name may encode the replicate index.
Returns
-------
int
Parsed replicate number or ``1`` as a safe fallback.
"""
try:
dir_name = str(Path(working_dir).name)
except (TypeError, ValueError):
return 1
match = re.match(r"run_(\d+)", dir_name)
if match:
return int(match.group(1))
return 1
def _gro_topology_warning(self, layout: "TrajectoryLayout") -> str | None:
"""Build the chain-ID warning for GRO topology layouts.
GRO files do not reliably preserve chain identifiers, which can
break chain-based selections (``chainid A/B/C``) used by many
analysis plugins.
Parameters
----------
layout : TrajectoryLayout
Resolved layout from the engine.
Returns
-------
str or None
Actionable warning text when the layout uses a GRO topology,
otherwise ``None``.
"""
if layout.topology_path is None or layout.topology_format.lower() != "gro":
return None
return (
f"Using GRO topology {layout.topology_path} — GRO files may not preserve "
"chain identifiers. Chain-based selections (chainid A/B/C) used by "
"analysis plugins may be unreliable. Prefer a PDB topology when available."
)
def _warn_gro_topology(self, layout: "TrajectoryLayout") -> str | None:
"""Emit a one-time warning when the resolved topology is a GRO file.
Parameters
----------
layout : TrajectoryLayout
Resolved layout from the engine.
Returns
-------
str or None
Actionable warning text when the layout uses a GRO topology,
otherwise ``None``.
"""
warning = self._gro_topology_warning(layout)
if warning is not None and layout.topology_path is not None:
topology_key = _normalized_warning_path(layout.topology_path)
if topology_key in _WARNED_GRO_TOPOLOGY_PATHS:
return warning
_WARNED_GRO_TOPOLOGY_PATHS.add(topology_key)
LOGGER.warning(warning)
return warning
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
[docs]
def get_trajectory_info(self, replicate: int) -> TrajectoryInfo:
"""Get trajectory file information for a replicate.
Parameters
----------
replicate : int
Replicate number (1-indexed)
Returns
-------
TrajectoryInfo
Information about discovered trajectory files
Raises
------
FileNotFoundError
If working directory or required files don't exist
"""
# Get working directory from config
working_dir = self.config.get_working_directory(replicate)
if not working_dir.exists():
available = self._find_available_replicates()
raise FileNotFoundError(
self._format_missing_data_message(
"Working directory not found",
working_dir=working_dir,
replicate=replicate,
available_replicates=available,
action=(
"Run or complete the simulation for this replicate, or verify "
"the config output/scratch paths before rerunning analysis."
),
)
)
# Delegate file discovery to the simulation engine
layout = self._resolve_layout(working_dir, replicate=replicate)
if layout.topology_path is None:
raise FileNotFoundError(f"No topology file found in {working_dir}")
if not layout.trajectory_paths:
available = self._find_available_replicates()
raise FileNotFoundError(
self._format_missing_data_message(
"No production trajectory files found",
working_dir=working_dir,
replicate=replicate,
available_replicates=available,
action=(
"Run or complete the production simulation for this replicate, "
"then rerun analysis. Use --recompute only after trajectory files exist."
),
)
)
warnings = []
gro_warning = self._gro_topology_warning(layout)
if gro_warning is not None:
warnings.append(gro_warning)
return TrajectoryInfo(
topology_file=layout.topology_path,
trajectory_files=layout.trajectory_paths,
n_segments=len(layout.trajectory_paths),
working_directory=working_dir,
replicate=replicate,
topology_format=layout.topology_format,
trajectory_format=layout.trajectory_format,
warnings=warnings,
)
[docs]
def load_universe(
self,
replicate: int,
cache: bool = True,
) -> "Universe":
"""Load MDAnalysis Universe for a replicate.
Parameters
----------
replicate : int
Replicate number (1-indexed)
cache : bool, optional
If True (default), cache the Universe for reuse
Returns
-------
Universe
MDAnalysis Universe with trajectory loaded
Notes
-----
For daisy-chain trajectories, all segments are loaded as a
continuous trajectory using MDAnalysis's ChainReader.
"""
_require_mdanalysis()
import MDAnalysis as mda
if cache and replicate in self._universe_cache:
return self._universe_cache[replicate]
info = self.get_trajectory_info(replicate)
info.validate()
# Load universe - MDAnalysis handles multiple trajectory files
if len(info.trajectory_files) == 1:
u = mda.Universe(
str(info.topology_file),
str(info.trajectory_files[0]),
)
else:
# Multiple segments - use ChainReader
u = mda.Universe(
str(info.topology_file),
[str(f) for f in info.trajectory_files],
)
u.trajectory = _wrap_timestamp_preserving_trajectory(u.trajectory)
if cache:
self._universe_cache[replicate] = u
return u
[docs]
def iter_replicates(
self,
replicates: Sequence[int],
) -> Iterator[tuple[int, "Universe"]]:
"""Iterate over multiple replicates.
Parameters
----------
replicates : sequence of int
Replicate numbers to load
Yields
------
tuple of (int, Universe)
Replicate number and loaded Universe
Examples
--------
>>> for rep, u in loader.iter_replicates([1, 2, 3, 4, 5]):
... rmsf = compute_rmsf(u)
... results[rep] = rmsf
"""
for rep in replicates:
yield rep, self.load_universe(rep)
[docs]
def get_frame_times(
self,
replicate: int,
unit: str = "ns",
) -> NDArray[np.float64]:
"""Get time values for each frame.
Parameters
----------
replicate : int
Replicate number
unit : str, optional
Time unit for output. Options: "ps", "ns". Default is "ns".
Returns
-------
NDArray[np.float64]
Array of time values for each frame
"""
u = self.load_universe(replicate)
trajectory = u.trajectory
previous_frame = _trajectory_frame_index(trajectory)
try:
times_ps = []
for frame_index in range(len(trajectory)):
trajectory[frame_index]
time_ps = _trajectory_time(trajectory)
if time_ps is None:
raise ValueError(
"Trajectory timestamps are unavailable for frame-time extraction"
)
times_ps.append(time_ps)
finally:
_restore_trajectory_frame(trajectory, previous_frame)
times = np.array(times_ps, dtype=np.float64)
# Convert units (MDAnalysis uses ps internally)
if unit == "ns":
times = times / 1000.0
elif unit != "ps":
raise ValueError(f"Unknown time unit: {unit}. Use 'ps' or 'ns'.")
return times
[docs]
def get_timestep(self, replicate: int, unit: str = "ps") -> float:
"""Get the trajectory timestep (time between frames).
Parameters
----------
replicate : int
Replicate number
unit : str, optional
Time unit. Options: "ps", "ns". Default is "ps".
Returns
-------
float
Time between consecutive frames
"""
u = self.load_universe(replicate)
trajectory = u.trajectory
# Get timestep from trajectory
if len(trajectory) < 2:
raise ValueError("Need at least 2 frames to determine timestep")
previous_frame = _trajectory_frame_index(trajectory)
try:
trajectory[0]
t0 = _trajectory_time(trajectory)
trajectory[1]
t1 = _trajectory_time(trajectory)
finally:
_restore_trajectory_frame(trajectory, previous_frame)
if t0 is None or t1 is None:
raise ValueError("Trajectory timestamps are unavailable for timestep detection")
dt = t1 - t0 # in ps (MDAnalysis default)
if unit == "ns":
dt = dt / 1000.0
elif unit != "ps":
raise ValueError(f"Unknown time unit: {unit}")
return float(dt)
[docs]
def get_first_frame_time(self, replicate: int, unit: str = "ps") -> float | None:
"""Return the first loaded frame timestamp when available.
MDAnalysis reports trajectory times in picoseconds. This method probes
cached Universe metadata without changing the caller-visible current
frame when the reader exposes a restorable frame index.
Parameters
----------
replicate : int
Replicate number.
unit : str, optional
Time unit for output. Options are ``"ps"`` and ``"ns"``, by default
``"ps"``.
Returns
-------
float | None
Finite first-frame timestamp in the requested unit, or ``None`` when
the trajectory does not expose a usable timestamp.
Raises
------
ValueError
Raised when ``unit`` is not ``"ps"`` or ``"ns"``.
"""
if unit not in {"ps", "ns"}:
raise ValueError(f"Unknown time unit: {unit}")
u = self.load_universe(replicate)
trajectory = u.trajectory
previous_frame = _trajectory_frame_index(trajectory)
try:
trajectory[0]
time_ps = _trajectory_time(trajectory)
except (AttributeError, IndexError, TypeError, ValueError):
return None
finally:
_restore_trajectory_frame(trajectory, previous_frame)
if time_ps is None:
return None
if unit == "ns":
return time_ps / 1000.0
return time_ps
[docs]
def clear_cache(self) -> None:
"""Clear the Universe cache to free memory."""
self._universe_cache.clear()
def _find_available_replicates(self) -> list[int]:
"""Find available replicate numbers from existing run_* directories.
Returns
-------
list[int]
Sorted list of replicate numbers that have simulation directories
"""
discovered = self._discover_replicates_from_config()
if discovered:
return discovered
scratch_dir = self._get_scratch_directory()
if scratch_dir is None:
return []
if not scratch_dir.exists():
return []
replicates = []
for d in scratch_dir.iterdir():
if d.is_dir() and d.name.startswith("run_"):
try:
# Extract replicate number from directory name (e.g., "run_1" -> 1)
rep_num = int(d.name.split("_")[1])
replicates.append(rep_num)
except (IndexError, ValueError):
continue
return sorted(replicates)
def _discover_replicates_from_config(self) -> list[int]:
"""Discover replicate directories through SimulationConfig when available.
Returns
-------
list[int]
Sorted replicate numbers discovered by the config helper, or an
empty list when unavailable.
"""
discover = getattr(self.config, "discover_replicate_dirs", None)
if not callable(discover):
return []
try:
replicate_dirs = discover()
except (AttributeError, TypeError, OSError, ValueError):
return []
replicates: list[int] = []
for item in replicate_dirs:
if isinstance(item, tuple) and item:
try:
replicates.append(int(item[0]))
continue
except (TypeError, ValueError):
item = item[-1]
try:
path = Path(item)
except TypeError:
continue
match = re.search(r"run_?(\d+)$", path.name)
if match:
replicates.append(int(match.group(1)))
return sorted(set(replicates))
def _get_scratch_directory(self) -> Path | None:
"""Return configured scratch directory when it can be resolved.
Returns
-------
Path or None
Effective scratch directory, or ``None`` when config metadata is
incomplete.
"""
try:
scratch_dir = self.config.output.effective_scratch_directory
except AttributeError:
return None
if scratch_dir is None:
return None
try:
return Path(scratch_dir)
except TypeError:
return None
def _format_missing_data_message(
self,
headline: str,
*,
working_dir: Path,
replicate: int,
available_replicates: Sequence[int],
action: str,
) -> str:
"""Build a user-facing message for missing replicate trajectory data.
Parameters
----------
headline : str
Stable leading message used by existing tests.
working_dir : Path
Expected working directory for the replicate.
replicate : int
Requested replicate number.
available_replicates : sequence of int
Replicates discovered on disk.
action : str
Actionable hint for the user.
Returns
-------
str
Multi-line diagnostic message.
"""
scratch_dir = self._get_scratch_directory()
available = (
", ".join(str(rep) for rep in available_replicates)
if available_replicates
else "none found"
)
scratch_text = str(scratch_dir) if scratch_dir is not None else "not configured"
return (
f"{headline}: {working_dir}\n"
f"Replicate: {replicate}\n"
f"Expected working directory: {working_dir}\n"
f"Scratch directory: {scratch_text}\n"
f"Available replicates: {available}\n"
f"Action: {action}"
)
[docs]
def find_topology(self, working_dir: Path) -> Path:
"""Find topology file in working directory.
Delegates file discovery to the simulation engine. The engine
applies its own search order (e.g. PDB preference for GROMACS,
``solvated_system.pdb`` preference for OpenMM).
This method is used by several plugins that pass an explicit
``working_dir`` unrelated to the current replicate. The
replicate index is inferred from the directory name when
possible (``run_<N>``), falling back to ``1``.
Parameters
----------
working_dir : Path
Directory to search for topology files.
Returns
-------
Path
Path to the topology file.
Raises
------
FileNotFoundError
If no topology file is found.
"""
layout = self._resolve_layout(working_dir, replicate=None)
if layout.topology_path is None:
raise FileNotFoundError(f"No topology file found in {working_dir}")
return layout.topology_path
def _find_trajectories(self, working_dir: Path) -> list[Path]:
"""Find trajectory files via the simulation engine.
Parameters
----------
working_dir : Path
Working directory to search.
Returns
-------
list[Path]
Ordered trajectory file paths.
Raises
------
FileNotFoundError
If no trajectory files are found.
"""
layout = self._resolve_layout(working_dir, replicate=None)
if not layout.trajectory_paths:
replicate = self._infer_replicate(working_dir)
raise FileNotFoundError(
self._format_missing_data_message(
"No production trajectory files found",
working_dir=working_dir,
replicate=replicate,
available_replicates=self._find_available_replicates(),
action=(
"Run or complete the production simulation for this replicate, "
"then rerun analysis. Use --recompute only after trajectory files exist."
),
)
)
return layout.trajectory_paths
[docs]
def parse_time_string(time_str: str) -> tuple[float, str]:
"""Parse a time string with units into value and unit.
Parameters
----------
time_str : str
Time string like "100ns", "5000ps", "100 ns", etc.
Returns
-------
tuple of (float, str)
Numeric value and unit string
Examples
--------
>>> parse_time_string("100ns")
(100.0, "ns")
>>> parse_time_string("5000 ps")
(5000.0, "ps")
>>> parse_time_string("100") # Default to ns
(100.0, "ns")
"""
time_str = time_str.strip()
# Try to extract number and unit
match = re.match(r"^([\d.]+)\s*([a-zA-Z]*)$", time_str)
if not match:
raise ValueError(f"Cannot parse time string: {time_str}")
value = float(match.group(1))
unit = match.group(2).lower() if match.group(2) else "ns"
if unit not in ("ns", "ps", "fs"):
raise ValueError(f"Unknown time unit: {unit}. Use 'ns', 'ps', or 'fs'.")
return value, unit
[docs]
def convert_time(value: float, from_unit: str, to_unit: str) -> float:
"""Convert time between units.
Parameters
----------
value : float
Time value
from_unit : str
Source unit ("fs", "ps", "ns")
to_unit : str
Target unit ("fs", "ps", "ns")
Returns
-------
float
Converted time value
"""
# Convert to picoseconds first
to_ps = {"fs": 0.001, "ps": 1.0, "ns": 1000.0}
from_ps = {"fs": 1000.0, "ps": 1.0, "ns": 0.001}
if from_unit not in to_ps or to_unit not in from_ps:
raise ValueError(f"Unknown unit: {from_unit} or {to_unit}")
ps_value = value * to_ps[from_unit]
return ps_value * from_ps[to_unit]
[docs]
def time_to_frame(
time: float,
time_unit: str,
timestep: float,
timestep_unit: str = "ps",
) -> int:
"""Convert time to frame index.
Parameters
----------
time : float
Time value
time_unit : str
Unit of time value
timestep : float
Time between frames
timestep_unit : str
Unit of timestep (default: "ps")
Returns
-------
int
Frame index (0-indexed)
"""
# Convert both to same units
time_ps = convert_time(time, time_unit, "ps")
dt_ps = convert_time(timestep, timestep_unit, "ps")
return int(time_ps / dt_ps)