Source code for polyzymd.analyses.shared.convergence

"""Convergence diagnostics for sliding-window timeseries analysis.

This module implements a sliding-window convergence heuristic adapted from a
collaborator notebook used for RMSD equilibration checks.
"""

from __future__ import annotations

import math
from dataclasses import dataclass

import numpy as np
from numpy.typing import ArrayLike


[docs] @dataclass class ConvergenceResult: """Container for convergence diagnostics. Attributes ---------- converged : bool Whether sustained convergence was detected. assessable : bool Whether convergence could be assessed from available data. convergence_time_ns : float | None Start time of the first sustained converged period. window_start_times_ns : list[float] Start times for each sliding window. window_mean_values : list[float] Mean signal value in each sliding window. slope_times_ns : list[float] Time points associated with slope estimates. slopes : list[float] Slopes between successive window means. window_size_ns : float Sliding window width in ns. step_size_ns : float Sliding window stride in ns. slope_threshold : float Absolute slope cutoff used for convergence. sustained_for_ns : float Required sustained duration below slope threshold. message : str Human-readable status message. """ converged: bool assessable: bool convergence_time_ns: float | None window_start_times_ns: list[float] window_mean_values: list[float] slope_times_ns: list[float] slopes: list[float] window_size_ns: float step_size_ns: float slope_threshold: float sustained_for_ns: float message: str
[docs] def find_convergence_time( time_ns: ArrayLike, values: ArrayLike, window_size_ns: float = 15.0, step_size_ns: float = 5.0, slope_threshold: float = 0.0005, sustained_for_ns: float = 15.0, ) -> ConvergenceResult: """Find sustained convergence time using a sliding-window slope heuristic. Parameters ---------- time_ns : array_like Monotonically increasing time values in ns. values : array_like Signal values sampled at ``time_ns``. window_size_ns : float, optional Width of each averaging window in ns. step_size_ns : float, optional Sliding step between successive windows in ns. slope_threshold : float, optional Absolute slope threshold for classifying a window-to-window change as converged. sustained_for_ns : float, optional Required cumulative duration below threshold before declaring convergence. Returns ------- ConvergenceResult Full convergence diagnostics, including intermediate window means and slope traces. Raises ------ ValueError Raised when inputs are invalid. """ _validate_parameters( window_size_ns=window_size_ns, step_size_ns=step_size_ns, slope_threshold=slope_threshold, sustained_for_ns=sustained_for_ns, ) time_arr = np.asarray(time_ns, dtype=np.float64) value_arr = np.asarray(values, dtype=np.float64) if not np.all(np.isfinite(time_arr)) or not np.all(np.isfinite(value_arr)): raise ValueError("time_ns and values must contain only finite values (no NaN or inf)") if time_arr.ndim != 1 or value_arr.ndim != 1: raise ValueError("time_ns and values must be 1D arrays") if len(time_arr) != len(value_arr): raise ValueError("time_ns and values must have the same length") if len(time_arr) == 0: return ConvergenceResult( converged=False, assessable=False, convergence_time_ns=None, window_start_times_ns=[], window_mean_values=[], slope_times_ns=[], slopes=[], window_size_ns=window_size_ns, step_size_ns=step_size_ns, slope_threshold=slope_threshold, sustained_for_ns=sustained_for_ns, message="Convergence not assessable: empty timeseries", ) if np.any(np.diff(time_arr) <= 0): raise ValueError("time_ns must be strictly increasing") total_duration = float(time_arr[-1] - time_arr[0]) if total_duration < window_size_ns: return ConvergenceResult( converged=False, assessable=False, convergence_time_ns=None, window_start_times_ns=[], window_mean_values=[], slope_times_ns=[], slopes=[], window_size_ns=window_size_ns, step_size_ns=step_size_ns, slope_threshold=slope_threshold, sustained_for_ns=sustained_for_ns, message=( "Convergence not assessable: trajectory shorter than window size " f"({total_duration:.3f} ns < {window_size_ns:.3f} ns)" ), ) window_means: list[float] = [] window_start_times: list[float] = [] start_time = float(time_arr[0]) while start_time + window_size_ns <= float(time_arr[-1]): end_time = start_time + window_size_ns indices = np.where((time_arr >= start_time) & (time_arr < end_time))[0] if len(indices) > 0: window_means.append(float(np.mean(value_arr[indices]))) window_start_times.append(start_time) start_time += step_size_ns if len(window_means) < 2: return ConvergenceResult( converged=False, assessable=False, convergence_time_ns=None, window_start_times_ns=window_start_times, window_mean_values=window_means, slope_times_ns=[], slopes=[], window_size_ns=window_size_ns, step_size_ns=step_size_ns, slope_threshold=slope_threshold, sustained_for_ns=sustained_for_ns, message="Convergence not assessable: insufficient sliding windows", ) sustained_duration = 0.0 convergence_start_time: float | None = None slope_times: list[float] = [] slopes: list[float] = [] for i in range(1, len(window_means)): delta_rmsd = window_means[i] - window_means[i - 1] delta_time = window_start_times[i] - window_start_times[i - 1] slope = delta_rmsd / delta_time slopes.append(float(slope)) slope_times.append(float(window_start_times[i])) if abs(slope) < slope_threshold: if sustained_duration == 0: convergence_start_time = window_start_times[i - 1] sustained_duration += delta_time else: sustained_duration = 0.0 convergence_start_time = None if sustained_duration >= sustained_for_ns: return ConvergenceResult( converged=True, assessable=True, convergence_time_ns=convergence_start_time, window_start_times_ns=window_start_times, window_mean_values=window_means, slope_times_ns=slope_times, slopes=slopes, window_size_ns=window_size_ns, step_size_ns=step_size_ns, slope_threshold=slope_threshold, sustained_for_ns=sustained_for_ns, message=f"Converged at {convergence_start_time:.3f} ns", ) return ConvergenceResult( converged=False, assessable=True, convergence_time_ns=None, window_start_times_ns=window_start_times, window_mean_values=window_means, slope_times_ns=slope_times, slopes=slopes, window_size_ns=window_size_ns, step_size_ns=step_size_ns, slope_threshold=slope_threshold, sustained_for_ns=sustained_for_ns, message="Did not converge within available trajectory window", )
def _validate_parameters( *, window_size_ns: float, step_size_ns: float, slope_threshold: float, sustained_for_ns: float, ) -> None: """Validate sliding-window convergence parameters. Parameters ---------- window_size_ns : float Window width in ns. step_size_ns : float Window stride in ns. slope_threshold : float Slope magnitude threshold. sustained_for_ns : float Required sustained converged duration. """ scalar_params = { "window_size_ns": window_size_ns, "step_size_ns": step_size_ns, "slope_threshold": slope_threshold, "sustained_for_ns": sustained_for_ns, } for param_name, param in scalar_params.items(): if not math.isfinite(param): raise ValueError(f"{param_name} must be finite, got {param}") if window_size_ns <= 0: raise ValueError("window_size_ns must be > 0") if step_size_ns <= 0: raise ValueError("step_size_ns must be > 0") if slope_threshold <= 0: raise ValueError("slope_threshold must be > 0") if sustained_for_ns <= 0: raise ValueError("sustained_for_ns must be > 0") if step_size_ns > window_size_ns: raise ValueError("step_size_ns must be <= window_size_ns")