# Source code for polyzymd.analyses.shared.window

"""Trajectory window helpers for trajectory-backed analyses.

This module centralizes the frame-window logic shared by analysis plugins that
need to combine equilibration skipping with MDAnalysis ``run()`` slice
arguments. The helpers return a validated window that can be passed directly to
trajectory-native runners without PolyzyMD re-owning the frame loop.
"""

from __future__ import annotations

import math
from dataclasses import dataclass
from numbers import Real
from typing import TYPE_CHECKING

from polyzymd.analyses.shared.diagnostics import validate_equilibration_time
from polyzymd.analyses.shared.loader import convert_time, parse_time_string

if TYPE_CHECKING:
    from polyzymd.analyses.shared.loader import TrajectoryLoader


def _equilibration_start_frame(equilibration_ps: float, timestep_ps: float) -> int:
    """Return the first frame at or after equilibration.

    Parameters
    ----------
    equilibration_ps : float
        Equilibration time in picoseconds.
    timestep_ps : float
        Time between consecutive frames in picoseconds.

    Returns
    -------
    int
        First frame index whose timestamp is greater than or equal to the
        equilibration time.
    """

    frame_position = equilibration_ps / timestep_ps
    rounded_position = round(frame_position)
    if math.isclose(frame_position, rounded_position, rel_tol=1e-12, abs_tol=1e-12):
        return int(rounded_position)
    return int(math.floor(frame_position)) + 1


def _finite_time_or_none(value: object) -> float | None:
    """Return a finite real-valued timestamp or ``None``.

    Parameters
    ----------
    value : object
        Candidate timestamp from trajectory metadata or a test double.

    Returns
    -------
    float | None
        Finite timestamp value, or ``None`` for missing, non-real, boolean, or
        non-finite inputs.
    """

    if value is None or isinstance(value, bool) or not isinstance(value, Real):
        return None
    time_ps = float(value)
    if not math.isfinite(time_ps):
        return None
    return time_ps


@dataclass(frozen=True)
class TrajectoryWindow:
    """Validated frame window for a trajectory-backed analysis.

    Instances are immutable (``frozen=True``) records; no validation happens
    in this class itself.

    Parameters
    ----------
    start : int
        Inclusive start frame for the analysis run.
    stop : int
        Exclusive stop frame for the analysis run.
    step : int
        Frame stride passed to ``runner.run(step=...)``.
    equilibration_start : int
        Start frame implied by the equilibration time alone.
    n_frames_total : int
        Total number of frames in the trajectory.
    n_frames_selected : int
        Number of frames selected by ``start``, ``stop``, and ``step``.
    timestep_ps : float
        Trajectory timestep in picoseconds.
    equilibration_ps : float
        Equilibration time converted to picoseconds.
    equilibration : str | None, optional
        Original equilibration time string used to resolve the window.
    first_frame_time_ps : float | None, optional
        Absolute MDAnalysis timestamp of the first loaded frame in
        picoseconds, when available.
    selected_start_time_ps : float | None, optional
        Timestamp of the selected start frame in the active time reference.
    equilibration_time_reference : str, optional
        Time reference used to interpret ``equilibration``.
        ``"trajectory_timestamp"`` means absolute MDAnalysis timestamps were
        available; ``"loaded_frame_zero"`` means no usable timestamp was
        available and times are measured from the first loaded frame.
    warning_message : str | None
        Non-fatal equilibration warning generated during validation.
    """

    start: int
    stop: int
    step: int
    equilibration_start: int
    n_frames_total: int
    n_frames_selected: int
    timestep_ps: float
    equilibration_ps: float
    equilibration: str | None = None
    first_frame_time_ps: float | None = None
    selected_start_time_ps: float | None = None
    equilibration_time_reference: str = "loaded_frame_zero"
    warning_message: str | None = None

    def run_kwargs(self) -> dict[str, int]:
        """Return keyword arguments for ``MDAnalysis`` runner ``run()``.

        Returns
        -------
        dict[str, int]
            ``start``, ``stop``, and ``step`` values for ``run()``.
        """

        return {"start": self.start, "stop": self.stop, "step": self.step}
def resolve_replicate_trajectory_window(
    *,
    loader: "TrajectoryLoader",
    replicate: int,
    equilibration: str,
    n_frames_total: int,
    start: int | None = None,
    stop: int | None = None,
    step: int = 1,
    min_frames: int = 1,
    timestep_ps: float | None = None,
) -> TrajectoryWindow:
    """Resolve a validated window using loader trajectory timing metadata.

    The timestep and first-frame timestamp are read from ``loader`` and the
    actual window resolution is delegated to ``resolve_trajectory_window``.

    Parameters
    ----------
    loader : TrajectoryLoader
        Loader for the replicate being analyzed.
    replicate : int
        Replicate number.
    equilibration : str
        Equilibration time string such as ``"10ns"``.
    n_frames_total : int
        Total number of frames in the trajectory.
    start : int | None, optional
        Absolute start frame for the analysis window. When ``None``, the
        equilibration-resolved start frame is used.
    stop : int | None, optional
        Absolute exclusive stop frame. When ``None``, the full remaining
        trajectory is used.
    step : int, optional
        Frame stride, by default 1.
    min_frames : int, optional
        Minimum required number of selected frames, by default 1.
    timestep_ps : float | None, optional
        Explicit timestep override in picoseconds. When ``None``, the loader
        timestep is used.

    Returns
    -------
    TrajectoryWindow
        Validated trajectory window with materialized ``run()`` arguments.

    Raises
    ------
    ValueError
        Propagated from ``resolve_trajectory_window`` when the arguments are
        inconsistent with the trajectory.
    """

    resolved_timestep_ps = (
        float(timestep_ps)
        if timestep_ps is not None
        else float(loader.get_timestep(replicate, unit="ps"))
    )
    # Best effort: a loader (or test double) that cannot report a first-frame
    # timestamp falls back to ``None``, which the resolver handles itself.
    try:
        first_frame_time_ps = loader.get_first_frame_time(replicate, unit="ps")
    except (AttributeError, TypeError, ValueError):
        first_frame_time_ps = None
    return resolve_trajectory_window(
        equilibration=equilibration,
        n_frames_total=n_frames_total,
        timestep_ps=resolved_timestep_ps,
        first_frame_time_ps=first_frame_time_ps,
        start=start,
        stop=stop,
        step=step,
        min_frames=min_frames,
    )
def resolve_trajectory_window(
    *,
    equilibration: str,
    n_frames_total: int,
    timestep_ps: float,
    start: int | None = None,
    stop: int | None = None,
    step: int = 1,
    min_frames: int = 1,
    first_frame_time_ps: float | None = None,
) -> TrajectoryWindow:
    """Resolve and validate a trajectory frame window.

    When the first loaded frame has a finite MDAnalysis timestamp,
    ``equilibration`` is interpreted as an absolute trajectory time. The start
    frame is the first loaded frame whose timestamp is greater than or equal to
    the equilibration time. When timestamp metadata is unavailable, the
    fallback loaded-frame-relative origin is used, i.e. the equilibration time
    is measured from loaded frame 0.

    Parameters
    ----------
    equilibration : str
        Equilibration time string such as ``"10ns"``.
    n_frames_total : int
        Total number of frames in the trajectory.
    timestep_ps : float
        Time between consecutive frames in picoseconds. Must be positive and
        finite.
    start : int | None, optional
        Absolute start frame for the analysis window. When ``None``, the
        equilibration-resolved start frame is used.
    stop : int | None, optional
        Absolute exclusive stop frame. When ``None``, the trajectory end is
        used.
    step : int, optional
        Frame stride, by default 1.
    min_frames : int, optional
        Minimum required number of selected frames, by default 1.
    first_frame_time_ps : float | None, optional
        Absolute MDAnalysis timestamp of loaded frame 0 in picoseconds.
        Non-finite values are ignored and the fallback loaded-frame-relative
        behavior is used instead.

    Returns
    -------
    TrajectoryWindow
        Validated frame window.

    Raises
    ------
    ValueError
        Raised when the timestep, equilibration, or window arguments are
        inconsistent with the trajectory, including a non-positive or
        non-finite timestep.
    """

    if n_frames_total < 1:
        raise ValueError("Trajectory must contain at least one frame")
    if timestep_ps <= 0:
        raise ValueError(f"Trajectory timestep must be positive, got {timestep_ps}")
    # NaN and +inf slip past the ``<= 0`` test; reject them here instead of
    # letting them surface later as an unrelated rounding error or a
    # meaningless window.
    if not math.isfinite(timestep_ps):
        raise ValueError(f"Trajectory timestep must be finite, got {timestep_ps}")
    if step < 1:
        raise ValueError(f"step must be >= 1, got {step}")
    if min_frames < 1:
        raise ValueError(f"min_frames must be >= 1, got {min_frames}")

    eq_value, eq_unit = parse_time_string(equilibration)
    equilibration_ps = convert_time(eq_value, eq_unit, "ps")

    finite_first_frame_time_ps = _finite_time_or_none(first_frame_time_ps)
    if finite_first_frame_time_ps is None:
        # No usable timestamp: measure equilibration from loaded frame 0.
        equilibration_offset_ps = equilibration_ps
        time_reference = "loaded_frame_zero"
    else:
        # Absolute timestamps: only the part of the equilibration period that
        # lies after the first loaded frame still has to be skipped.
        equilibration_offset_ps = max(0.0, equilibration_ps - finite_first_frame_time_ps)
        time_reference = "trajectory_timestamp"

    equilibration_ns = equilibration_offset_ps / 1000.0
    trajectory_ns = (n_frames_total * timestep_ps) / 1000.0

    if finite_first_frame_time_ps is not None:
        last_frame_time_ps = finite_first_frame_time_ps + (n_frames_total - 1) * timestep_ps
        # Shared by both "no frame left" checks on the timestamp path.
        timestamp_range_error = (
            "Equilibration time "
            f"({equilibration_ps:.3f} ps) leaves no frame at or after equilibration "
            f"in loaded trajectory timestamps {finite_first_frame_time_ps:.3f} ps to "
            f"{last_frame_time_ps:.3f} ps with timestep {timestep_ps:.3f} ps"
        )
        # Equilibration ending (within float noise) on the last frame is
        # still acceptable; anything later leaves nothing to analyze.
        if equilibration_ps > last_frame_time_ps and not math.isclose(
            equilibration_ps,
            last_frame_time_ps,
            rel_tol=1e-12,
            abs_tol=1e-12,
        ):
            raise ValueError(timestamp_range_error)

    # The shared validator returns a validity flag and an optional message;
    # the message is kept as a non-fatal warning when the check passes.
    is_valid, warning_message = validate_equilibration_time(equilibration_ns, trajectory_ns)
    if not is_valid:
        raise ValueError(warning_message or "Invalid equilibration window")

    equilibration_start = _equilibration_start_frame(equilibration_offset_ps, timestep_ps)
    if equilibration_start >= n_frames_total:
        if finite_first_frame_time_ps is None:
            raise ValueError(
                "Equilibration time "
                f"({equilibration_ps / 1000.0:.3f} ns) leaves no frame at or after "
                f"equilibration in a {n_frames_total}-frame trajectory with timestep "
                f"{timestep_ps:.3f} ps"
            )
        raise ValueError(timestamp_range_error)

    resolved_start = equilibration_start if start is None else start
    resolved_stop = n_frames_total if stop is None else stop

    if resolved_start < 0:
        raise ValueError(f"start must be >= 0, got {resolved_start}")
    if resolved_start < equilibration_start:
        raise ValueError(
            f"start={resolved_start} precedes equilibration-resolved start frame "
            f"{equilibration_start}"
        )
    if resolved_start >= n_frames_total:
        raise ValueError(
            f"start={resolved_start} is outside the trajectory range [0, {n_frames_total})"
        )
    if resolved_stop <= resolved_start:
        raise ValueError(f"stop={resolved_stop} must be greater than start={resolved_start}")
    if resolved_stop > n_frames_total:
        raise ValueError(f"stop={resolved_stop} exceeds trajectory length {n_frames_total}")

    # ``range`` gives the exact number of frames a start/stop/step run visits.
    n_frames_selected = len(range(resolved_start, resolved_stop, step))
    if n_frames_selected < min_frames:
        raise ValueError(
            "Trajectory window "
            f"[{resolved_start}:{resolved_stop}:{step}] selects {n_frames_selected} frame(s), "
            f"need at least {min_frames}"
        )

    # Report the start time in whichever reference was used above.
    selected_start_time_ps = (
        finite_first_frame_time_ps + resolved_start * timestep_ps
        if finite_first_frame_time_ps is not None
        else resolved_start * timestep_ps
    )

    return TrajectoryWindow(
        start=resolved_start,
        stop=resolved_stop,
        step=step,
        equilibration_start=equilibration_start,
        n_frames_total=n_frames_total,
        n_frames_selected=n_frames_selected,
        timestep_ps=float(timestep_ps),
        equilibration_ps=float(equilibration_ps),
        equilibration=equilibration,
        first_frame_time_ps=finite_first_frame_time_ps,
        selected_start_time_ps=float(selected_start_time_ps),
        equilibration_time_reference=time_reference,
        warning_message=warning_message,
    )