# Source code for polyzymd.analyses.orchestrator

"""Orchestrator for running analyses through the plugin system.

The orchestrator owns the boring-but-critical plumbing:

- **Replicate iteration** with error handling and minimum-replicate checks.
- **Dependency ordering** via topological sort.
- **Condition filtering** (delegates to each analysis's ``filter_conditions``).
- **Context construction** — builds the right context objects and passes
  them to the analysis plugin.

The orchestrator does NOT own:

- Science code (that lives in each ``Analysis`` subclass).
- CLI wiring (that lives in ``cli/``).
- Configuration parsing (that lives in ``config/``).

Usage
-----
::

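    from polyzymd.analyses.discovery import get_analysis
    from polyzymd.analyses.orchestrator import run_analysis, run_comparison

    # Entry points take an Analysis plugin instance, not a name string
    rmsf = get_analysis("rmsf")()

    # Run a single analysis for one condition
    run_analysis(rmsf, condition, settings, equilibration="10ns")

    # Run full comparison pipeline
    run_comparison(rmsf, comparison_config)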
"""

from __future__ import annotations

import logging
import shutil
from pathlib import Path
from typing import TYPE_CHECKING, Any, Sequence

from polyzymd.analyses._framework.lifecycle import AnalysisLifecycle, _resolve_settings
from polyzymd.analyses.base import Condition
from polyzymd.analyses.exceptions import (
    AnalysisError,
    DependencyError,
    PluginContractError,
)

if TYPE_CHECKING:
    from pydantic import BaseModel

    from polyzymd.analyses.base import Analysis
    from polyzymd.analyses.mda import MDABackendPolicy
    from polyzymd.config.comparison import ComparisonConfig

# Module logger. It uses the shared "polyzymd.analyses" name rather than
# ``__name__``, so records from this module are emitted under that logger.
logger = logging.getLogger("polyzymd.analyses")

# ``_print_execution_summary`` warns about potentially slow local runs when the
# total number of replicate tasks is strictly greater than this value.
_MANY_TASKS_THRESHOLD = 10


def run_replicate_once(
    analysis: Analysis,
    condition: Condition,
    settings: BaseModel,
    equilibration: str,
    output_dir: Path,
    replicate: int,
    recompute: bool,
    backend_policy: "MDABackendPolicy | None" = None,
) -> Any:
    """Compute a single replicate and persist its canonical output.

    Thin wrapper: builds an ``AnalysisLifecycle`` around *analysis* and
    forwards every argument to its ``run_replicate_once`` method.

    Parameters
    ----------
    analysis : Analysis
        Plugin instance whose compute stage is executed.
    condition : Condition
        Condition the replicate belongs to.
    settings : BaseModel
        Already-resolved analysis settings.
    equilibration : str
        Equilibration time string.
    output_dir : Path
        Directory of this replicate's run (for example ``run_1``).
    replicate : int
        Replicate number.
    recompute : bool
        If ``True``, force recomputation.
    backend_policy : MDABackendPolicy or None, optional
        MDAnalysis internal backend policy for MDA job-backed analyses.

    Returns
    -------
    Any
        The per-replicate result returned by the plugin.
    """
    lifecycle = AnalysisLifecycle(analysis)
    # Everything except the backend policy is forwarded positionally.
    return lifecycle.run_replicate_once(
        condition,
        settings,
        equilibration,
        output_dir,
        replicate,
        recompute,
        backend_policy=backend_policy,
    )
def aggregate_condition_from_disk(
    analysis: Analysis,
    condition: Condition,
    settings: BaseModel,
    equilibration: str,
    output_dir: Path,
    replicates: Sequence[int],
    recompute: bool = False,
) -> Any:
    """Aggregate a condition from replicate results already saved on disk.

    Delegates to ``AnalysisLifecycle(analysis).aggregate_condition_from_disk``.

    Parameters
    ----------
    analysis : Analysis
        Plugin instance that performs the aggregation.
    condition : Condition
        Condition whose replicates are combined.
    settings : BaseModel
        Already-resolved analysis settings.
    equilibration : str
        Equilibration time string.
    output_dir : Path
        The condition's analysis output directory.
    replicates : Sequence[int]
        Replicate numbers whose saved results should be loaded.
    recompute : bool, optional
        If ``True``, regenerate aggregate outputs even if they already exist.

    Returns
    -------
    Any
        The aggregated result produced by ``analysis.aggregate()``.

    Raises
    ------
    ValueError
        If fewer than ``analysis.min_replicates`` replicate results are
        available.
    """
    lifecycle = AnalysisLifecycle(analysis)
    return lifecycle.aggregate_condition_from_disk(
        condition,
        settings,
        equilibration,
        output_dir,
        replicates,
        recompute=recompute,
    )
# --------------------------------------------------------------------------- # Single-condition analysis # ---------------------------------------------------------------------------
def run_analysis(
    analysis: Analysis,
    condition: Condition,
    settings: Any,
    equilibration: str = "0ns",
    output_dir: Path | None = None,
    recompute: bool = False,
    backend_policy: "MDABackendPolicy | None" = None,
) -> Any:
    """Compute and aggregate one analysis for a single condition.

    Delegates the whole compute + aggregate sequence to
    ``AnalysisLifecycle(analysis).run_analysis``.

    Parameters
    ----------
    analysis : Analysis
        The analysis plugin instance.
    condition : Condition
        The condition to analyse.
    settings : Any
        Analysis-specific settings (documented elsewhere as a pydantic
        ``BaseModel``).
    equilibration : str, optional
        Equilibration time string such as ``"10ns"``; defaults to ``"0ns"``.
    output_dir : Path | None, optional
        Output directory. ``None`` lets the lifecycle resolve it from the
        condition configuration.
    recompute : bool, optional
        If ``True``, ignore cached results and recompute.
    backend_policy : MDABackendPolicy or None, optional
        MDAnalysis internal backend policy for MDA job-backed analyses.

    Returns
    -------
    Any
        The aggregated result.
    """
    lifecycle = AnalysisLifecycle(analysis)
    return lifecycle.run_analysis(
        condition,
        settings,
        equilibration=equilibration,
        output_dir=output_dir,
        recompute=recompute,
        backend_policy=backend_policy,
    )
def prepare_comparison_run(
    analysis: Analysis,
    config: ComparisonConfig,
    equilibration: str | None,
) -> dict[str, Any]:
    """Resolve the shared state a comparison needs before compute/aggregate/compare.

    Parameters
    ----------
    analysis : Analysis
        Analysis plugin instance.
    config : ComparisonConfig
        Comparison configuration.
    equilibration : str | None
        Optional equilibration override; ``None`` means no override.

    Returns
    -------
    dict[str, Any]
        Prepared comparison state, including the filtered conditions.
    """
    # The lifecycle resolves plugin settings through this module's resolver.
    lifecycle = AnalysisLifecycle(analysis, settings_resolver=_resolve_settings)
    return lifecycle.prepare_comparison_run(config, equilibration)
def _print_execution_summary(
    analysis: Analysis,
    conditions: list[Condition],
    settings: BaseModel,
    equilibration: str,
) -> None:
    """Log an execution summary and warn about potentially slow local runs.

    This helper only emits log messages; it does not change how the analysis
    is executed.

    Parameters
    ----------
    analysis : Analysis
        Analysis about to run. Its ``name`` appears in the summary, and its
        optional ``execution_cost_hint`` attribute (treated as ``"medium"``
        when absent) marks it as expensive when equal to ``"high"``.
    conditions : list[Condition]
        Conditions that will be processed; their ``replicates`` are counted.
    settings : BaseModel
        Accepted so the signature fits the ``execution_summary`` hook passed
        by ``run_comparison``; not used here.
    equilibration : str
        Equilibration time string shown in the summary.
    """
    del settings  # unused; kept only for the hook signature

    total_replicates = sum(len(condition.replicates) for condition in conditions)
    # Separate the analysis name from the condition count; the previous
    # "%s%d" rendered e.g. "rmsf3 conditions".
    logger.info(
        "%s\n"
        " %s: %d conditions × %d total replicate tasks\n"
        " Mode: sequential (local)\n"
        " Equilibration: %s\n"
        "%s",
        "=" * 60,
        analysis.name,
        len(conditions),
        total_replicates,
        equilibration,
        "=" * 60,
    )

    is_expensive = getattr(analysis, "execution_cost_hint", "medium") == "high"
    many_tasks = total_replicates > _MANY_TASKS_THRESHOLD
    if not (is_expensive or many_tasks):
        return

    # Tailor the suggestion to whether a SLURM submit command is on PATH.
    if shutil.which("sbatch") is not None:
        logger.warning(
            "This analysis may take a long time to run locally\n"
            "Consider submitting to SLURM for parallel execution:\n"
            " polyzymd compare submit %s",
            analysis.name,
        )
    else:
        logger.warning(
            "This analysis may take a long time to run locally\n"
            "If you have access to an HPC cluster with SLURM, consider:\n"
            " polyzymd compare submit %s",
            analysis.name,
        )
def finalize_comparison_from_disk(
    analysis: Analysis,
    config: ComparisonConfig,
    analysis_dirs: dict[str, Path],
    aggregated_results: dict[str, Any],
    results_dir: Path,
    figures_dir: Path,
    settings: BaseModel,
    effective_control: str | None,
    prepared_state: dict[str, Any] | None = None,
    allow_partial: bool = False,
    recompute: bool = False,
) -> dict[str, Any]:
    """Run compare and plot using already-aggregated condition results.

    All arguments are forwarded by keyword to
    ``AnalysisLifecycle.finalize_comparison_from_disk``.

    Parameters
    ----------
    analysis : Analysis
        Analysis plugin instance.
    config : ComparisonConfig
        Comparison configuration.
    analysis_dirs : dict[str, Path]
        Mapping of condition label to analysis directory.
    aggregated_results : dict[str, Any]
        Mapping of condition label to aggregated result objects.
    results_dir : Path
        Directory for comparison result JSON.
    figures_dir : Path
        Output directory for generated figures.
    settings : BaseModel
        Resolved analysis settings.
    effective_control : str | None
        Control condition label if available in successful conditions.
    prepared_state : dict[str, Any] | None, optional
        Previously prepared comparison state, forwarded unchanged to the
        lifecycle. Presumably the dict returned by ``prepare_comparison_run``;
        how ``None`` is handled is up to the lifecycle (TODO confirm).
    allow_partial : bool, optional
        If ``True``, proceed with dropped conditions. If ``False``, fail when
        any configured condition lacks aggregated results.
    recompute : bool, optional
        Force regeneration of comparison and plot outputs.

    Returns
    -------
    dict[str, Any]
        Dictionary with ``comparison``, ``comparison_path``, and ``plots``.
    """
    return AnalysisLifecycle(
        analysis, settings_resolver=_resolve_settings
    ).finalize_comparison_from_disk(
        config=config,
        analysis_dirs=analysis_dirs,
        aggregated_results=aggregated_results,
        results_dir=results_dir,
        figures_dir=figures_dir,
        settings=settings,
        effective_control=effective_control,
        prepared_state=prepared_state,
        allow_partial=allow_partial,
        recompute=recompute,
    )
# ---------------------------------------------------------------------------
# Dependency ordering
# ---------------------------------------------------------------------------


def _topological_sort(analyses: list[Analysis]) -> list[Analysis]:
    """Order analyses so dependencies come first.

    Depth-first post-order traversal over declared ``dependencies``.
    Dependency names that match no analysis in *analyses* are skipped here;
    the callers in this module run ``_validate_dependencies`` beforehand.

    Parameters
    ----------
    analyses : list[Analysis]
        Analyses to sort. When several share a name, the last one wins and
        appears once in the output.

    Returns
    -------
    list[Analysis]
        Analyses in dependency order.

    Raises
    ------
    ValueError
        If there is a circular dependency.
    """
    by_name = {item.name: item for item in analyses}
    done: set[str] = set()
    in_progress: set[str] = set()
    ordered: list[Analysis] = []

    def _visit(node_name: str) -> None:
        if node_name in done:
            return
        if node_name in in_progress:
            raise ValueError(f"Circular dependency detected involving {node_name!r}")

        in_progress.add(node_name)
        node = by_name.get(node_name)
        if node is not None:
            for dependency in node.dependencies:
                _visit(dependency)
        in_progress.discard(node_name)
        done.add(node_name)

        # ``done`` guarantees each name reaches this point at most once, so no
        # list-membership scan is needed. Unknown names never enter the output.
        if node is not None:
            ordered.append(node)

    for item in analyses:
        _visit(item.name)
    return ordered
def order_analyses_for_execution(
    analysis_names: Sequence[str],
    satisfied: set[str] | None = None,
) -> list[str]:
    """Return canonical analysis names sorted so dependencies run first.

    Each requested name is resolved to its plugin class. Names resolving to
    the same canonical name are collapsed (first occurrence wins). The
    resulting set is validated and then topologically sorted.

    Parameters
    ----------
    analysis_names : Sequence[str]
        Analysis names to order.
    satisfied : set[str] | None, optional
        Dependency names already satisfied outside this run list, such as
        excluded analyses with completed results on disk.

    Returns
    -------
    list[str]
        Canonical analysis names in dependency-safe execution order.

    Raises
    ------
    KeyError
        If an analysis name cannot be resolved.
    DependencyError
        If declared dependencies are missing from the requested set.
    ValueError
        If a dependency cycle is detected.
    """
    from polyzymd.analyses.discovery import get_analysis

    instances: list[Analysis] = []
    canonical_seen: set[str] = set()
    for requested_name in analysis_names:
        plugin_cls = get_analysis(requested_name)
        canonical = plugin_cls.name
        if canonical in canonical_seen:
            continue
        canonical_seen.add(canonical)
        instances.append(plugin_cls())

    _validate_dependencies(instances, satisfied=satisfied)
    return [plugin.name for plugin in _topological_sort(instances)]
# --------------------------------------------------------------------------- # Full comparison pipeline # ---------------------------------------------------------------------------
def run_comparison(
    analysis: Analysis,
    config: "ComparisonConfig",
    recompute: bool = False,
    equilibration: str | None = None,
) -> dict[str, Any]:
    """Run the full comparison pipeline for one analysis type.

    The pipeline, driven by ``AnalysisLifecycle``:

    1. Build ``Condition`` objects from ``ComparisonConfig``.
    2. Filter conditions via ``analysis.filter_conditions()``.
    3. For each condition: compute replicates + aggregate.
    4. Run ``analysis.compare()``.
    5. Run ``analysis.plot()``.

    This module's functions are injected as the lifecycle's hooks.

    Parameters
    ----------
    analysis : Analysis
        The analysis plugin instance.
    config : ComparisonConfig
        Comparison configuration.
    recompute : bool, optional
        Force recomputation.
    equilibration : str or None, optional
        Equilibration override; ``None`` means
        ``config.defaults.equilibration_time`` is used.

    Returns
    -------
    dict[str, Any]
        Dictionary with ``"aggregated"``, ``"comparison"``, ``"plots"`` keys.
    """
    lifecycle = AnalysisLifecycle(
        analysis,
        settings_resolver=_resolve_settings,
        prepare_comparison_run=prepare_comparison_run,
        run_analysis=run_analysis,
        finalize_comparison_from_disk=finalize_comparison_from_disk,
        execution_summary=_print_execution_summary,
    )
    return lifecycle.run_comparison(
        config,
        recompute=recompute,
        equilibration=equilibration,
    )
# --------------------------------------------------------------------------- # Multi-analysis comparison runner # ---------------------------------------------------------------------------
def run_all_comparisons(
    config: "ComparisonConfig",
    analysis_names: list[str] | None = None,
    recompute: bool = False,
    equilibration: str | None = None,
) -> dict[str, dict[str, Any]]:
    """Run comparisons for multiple (or all enabled) analysis types.

    Analyses are run in dependency order. Names that cannot be resolved to a
    plugin are skipped with a warning. A comparison that fails with
    ``AnalysisError``, ``ValueError`` or ``OSError`` is recorded as
    ``{"error": <message>}``, and the remaining analyses still run.

    Parameters
    ----------
    config : ComparisonConfig
        Comparison configuration.
    analysis_names : list[str] | None
        Analysis names to run. ``None`` = run all enabled in config.
    recompute : bool
        Force recomputation.
    equilibration : str or None
        Override equilibration time. If ``None``, uses
        ``config.defaults.equilibration_time``.

    Returns
    -------
    dict[str, dict[str, Any]]
        Mapping ``analysis_name -> run_comparison() result`` (or an
        ``{"error": ...}`` entry for failed analyses).

    Raises
    ------
    DependencyError
        If a declared dependency is not discoverable or not scheduled; this is
        checked before any comparison runs.
    PluginContractError
        Propagated unchanged from any comparison.
    """
    from polyzymd.analyses.discovery import get_analysis

    if analysis_names is None:
        # Use whatever is enabled in the unified plugin config
        analysis_names = _get_enabled_from_config(config)

    analyses: list[Analysis] = []
    for name in analysis_names:
        # Only the lookup may legitimately raise KeyError for an unknown name;
        # a KeyError from a plugin's constructor must not be misreported.
        try:
            analysis_cls = get_analysis(name)
        except KeyError:
            logger.warning("Unknown analysis type %r — skipping.", name)
            continue
        analyses.append(analysis_cls())

    # Validate declared dependencies are discoverable and scheduled
    _validate_dependencies(analyses)
    sorted_analyses = _topological_sort(analyses)

    separator = "=" * 60
    results: dict[str, dict[str, Any]] = {}
    for analysis in sorted_analyses:
        logger.info("%s", separator)
        logger.info("Running %s comparison", analysis.name)
        logger.info("%s", separator)
        try:
            results[analysis.name] = run_comparison(
                analysis, config, recompute, equilibration=equilibration
            )
        except PluginContractError:
            # Listed first so contract violations always propagate, even if
            # PluginContractError subclasses one of the types caught below.
            raise
        except (AnalysisError, ValueError, OSError) as e:
            # OSError already covers FileNotFoundError.
            logger.error("%s comparison failed: %s", analysis.name, e)
            results[analysis.name] = {"error": str(e)}
    return results
def run_plot_only(
    analysis: Analysis,
    config: "ComparisonConfig",
    equilibration: str | None = None,
) -> tuple[list[Path], list[tuple[str, str]]]:
    """Regenerate plots for one analysis type without recomputing anything.

    Path resolution and context construction are the same as in
    ``run_comparison()``, but compute, aggregate and compare are skipped. The
    aggregated and comparison results must already exist on disk.

    Parameters
    ----------
    analysis : Analysis
        The analysis plugin instance.
    config : ComparisonConfig
        Comparison configuration.
    equilibration : str | None, optional
        Equilibration time override.

    Returns
    -------
    tuple[list[Path], list[tuple[str, str]]]
        ``(generated_paths, failures)``, where each failure is an
        ``(analysis_name, error_message)`` pair.
    """
    lifecycle = AnalysisLifecycle(analysis, settings_resolver=_resolve_settings)
    return lifecycle.run_plot_only(config, equilibration=equilibration)
def run_all_plots(
    config: "ComparisonConfig",
    analysis_names: list[str] | None = None,
    equilibration: str | None = None,
) -> tuple[list[Path], list[tuple[str, str]]]:
    """Run the plot-only step for every selected (or enabled) analysis.

    Unknown analysis names do not raise. Each one is reported as a failure
    entry and the remaining analyses are still plotted.

    Parameters
    ----------
    config : ComparisonConfig
        Comparison configuration.
    analysis_names : list[str] | None, optional
        Analyses to plot; ``None`` selects every analysis enabled in *config*.
    equilibration : str | None, optional
        Equilibration override; ``None`` means
        ``config.defaults.equilibration_time`` is used.

    Returns
    -------
    tuple[list[Path], list[tuple[str, str]]]
        ``(generated_paths, failures)`` accumulated over all analyses, where
        each failure is an ``(analysis_name, error_message)`` pair.
    """
    from polyzymd.analyses.discovery import get_analysis

    names = (
        analysis_names
        if analysis_names is not None
        else _get_enabled_from_config(config)
    )

    generated_paths: list[Path] = []
    failed: list[tuple[str, str]] = []
    for name in names:
        try:
            plugin_cls = get_analysis(name)
        except KeyError:
            failed.append((name, f"Unknown analysis type {name!r}"))
            continue

        paths, plot_failures = run_plot_only(
            plugin_cls(),
            config,
            equilibration=equilibration,
        )
        generated_paths.extend(paths)
        failed.extend(plot_failures)

    return generated_paths, failed
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------


def _validate_dependencies(analyses: list[Analysis], satisfied: set[str] | None = None) -> None:
    """Validate that declared dependencies are discoverable and scheduled.

    This catches configuration errors early — e.g. a plugin declares
    ``dependencies = ("contacts",)`` but ``contacts`` isn't in the run list
    or doesn't exist.

    Parameters
    ----------
    analyses : list[Analysis]
        Analyses scheduled for this execution pass.
    satisfied : set[str] | None, optional
        Dependency names considered already satisfied outside the scheduled
        analyses (for example, excluded analyses whose results exist on disk).

    Raises
    ------
    DependencyError
        On the first dependency that is not a discoverable plugin, or that is
        neither scheduled nor listed in ``satisfied``.
    """
    from polyzymd.analyses.discovery import list_all_names

    # Materialize once as a set: O(1) membership per dependency, and safe
    # even if the discovery helper returns a one-shot iterable.
    known = set(list_all_names())
    resolved_satisfied = satisfied or set()
    scheduled = {a.name for a in analyses}

    for a in analyses:
        for dep in a.dependencies:
            if dep not in known:
                raise DependencyError(
                    f"{a.name}: declared dependency {dep!r} is not a discoverable analysis plugin"
                )
            if dep not in scheduled and dep not in resolved_satisfied:
                raise DependencyError(
                    f"{a.name}: declared dependency {dep!r} is not in the current run list"
                )


def _get_enabled_from_config(config: "ComparisonConfig") -> list[str]:
    """Get list of enabled analysis names from unified plugin config.

    Returns an empty list when *config* has no ``plugins`` attribute, when it
    is ``None``, or when it has no ``get_enabled_plugins`` method.
    """
    plugins = getattr(config, "plugins", None)
    if plugins is None or not hasattr(plugins, "get_enabled_plugins"):
        return []
    return plugins.get_enabled_plugins()