"""
Job submission for HPC SLURM scheduler.
This module provides utilities for submitting self-resubmitting MD
simulation jobs to SLURM. Each replicate gets a single job script
that calls ``polyzymd run-segment``, checks progress, and resubmits
itself until the simulation is complete.
.. versionchanged:: 1.1.0
Replaced the legacy daisy-chain (dependency-chain) model with
self-resubmitting jobs. The public API (``submit_daisy_chain``,
``DaisyChainConfig``, ``DaisyChainSubmitter``) is preserved for
backward compatibility but the internal behaviour is simplified.
"""
from __future__ import annotations

import getpass
import logging
import os
import re
import subprocess
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Literal, Optional, Union

from polyzymd.config.schema import SimulationConfig
from polyzymd.workflow.slurm import (
    JobContext,
    SlurmConfig,
    SlurmScriptGenerator,
    parse_replicate_range,
    validate_replicate_range,
)
# Module-level logger, named after this module's import path.
LOGGER = logging.getLogger(name=__name__)
# ---------------------------------------------------------------------------
# squeue-based duplicate detection
# ---------------------------------------------------------------------------
def check_existing_slurm_jobs(job_name: str, user: Optional[str] = None) -> List[str]:
    """Query SLURM for RUNNING or PENDING jobs that match *job_name*.

    This is a best-effort check: if ``squeue`` is unavailable (e.g. in a
    non-SLURM environment or CI), a warning is logged and an empty list is
    returned so that submission proceeds unimpeded.

    ``squeue`` lists the jobs of *all* users by default. The query is
    therefore restricted to one user; otherwise another user's job that
    happens to share the same name would be reported as a duplicate.

    Parameters
    ----------
    job_name : str
        The SLURM ``--job-name`` to search for (exact match).
    user : str or None, optional
        User whose jobs are searched. ``None`` (the default) uses the
        current login name from :func:`getpass.getuser`. An empty string
        disables the user filter and searches the jobs of all users.

    Returns
    -------
    list of str
        SLURM job IDs that are RUNNING or PENDING with the given name.
        Empty if ``squeue`` is unavailable or returns no matches.
    """
    if user is None:
        try:
            user = getpass.getuser()
        except (ImportError, KeyError, OSError):
            # No resolvable login name: degrade to an unfiltered query
            # rather than failing the (best-effort) check.
            LOGGER.warning(
                "Could not determine the current user — duplicate-job check "
                "will consider jobs from all users"
            )
            user = ""

    cmd = [
        "squeue",
        "--noheader",
        "--name",
        job_name,
        "--states",
        "RUNNING,PENDING",
        "--format",
        "%i",
    ]
    if user:
        cmd.extend(["--user", user])

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=15,
        )
    except FileNotFoundError:
        LOGGER.warning(
            "squeue not found — skipping duplicate-job check "
            "(this is expected outside of SLURM environments)"
        )
        return []
    except subprocess.TimeoutExpired:
        LOGGER.warning("squeue timed out — skipping duplicate-job check")
        return []
    except OSError as exc:
        # FileNotFoundError is an OSError subclass, so it is handled above first.
        LOGGER.warning(f"squeue failed ({exc}) — skipping duplicate-job check")
        return []

    if result.returncode != 0:
        LOGGER.warning(
            f"squeue returned exit code {result.returncode} — skipping duplicate-job check"
        )
        return []

    # One job ID per output line (``--format %i``); ignore blank lines.
    return [line.strip() for line in result.stdout.splitlines() if line.strip()]
def create_job_name(sim_config: SimulationConfig, replicate: int) -> str:
    """Create a descriptive SLURM job name for a replicate.

    Produces names like ``r1_310K_Fibronectin_SBMA-OEGMA_A75_B25``
    matching the directory naming convention. Monomer labels are sorted
    alphabetically, and each is followed by its probability as a percentage.

    Percentages are computed with :func:`round`, not :func:`int`
    truncation. Probabilities such as ``0.29`` are not exactly
    representable in binary floating point (``0.29 * 100 == 28.999...``),
    so truncation would yield ``A28`` instead of ``A29``.

    Parameters
    ----------
    sim_config : SimulationConfig
        Validated simulation configuration.
    replicate : int
        Replicate number.

    Returns
    -------
    str
        Formatted job name.
    """
    enzyme = sim_config.enzyme.name
    temp = int(sim_config.thermodynamics.temperature)

    polymer_info = ""
    if sim_config.polymers and sim_config.polymers.enabled:
        prefix = sim_config.polymers.type_prefix
        # Later monomers with a duplicate label overwrite earlier ones.
        probs = {m.label: m.probability for m in sim_config.polymers.monomers}
        composition = "_".join(
            f"{label}{round(prob * 100)}" for label, prob in sorted(probs.items())
        )
        polymer_info = f"_{prefix}_{composition}"

    return f"r{replicate}_{temp}K_{enzyme}{polymer_info}"
@dataclass
class DaisyChainConfig:
    """Configuration for job submission.

    Despite the legacy name, this now configures single self-resubmitting
    jobs (one per replicate) rather than dependency chains.

    Attributes
    ----------
    slurm_config : SlurmConfig
        SLURM job configuration.
    total_production_time_ns : float
        Total production time in nanoseconds.
    total_samples : int
        Total trajectory frames across the entire production run.
    equilibration_time_ns : float
        Equilibration time (informational only).
    replicates : list of int
        Replicate numbers to run. A private copy of the given sequence
        is stored.
    dry_run : bool
        If True, create scripts but don't submit.
    force : bool
        If True, skip the squeue duplicate-job check and submit even if
        a RUNNING/PENDING job already exists for the same replicate.
    output_script_dir : Path
        Directory for generated job scripts. A ``str`` is accepted and
        converted to :class:`~pathlib.Path`.
    config_path : str
        Path to the YAML configuration file.
    """

    slurm_config: SlurmConfig
    total_production_time_ns: float
    total_samples: int = 2500
    equilibration_time_ns: float = 0.5
    replicates: List[int] = field(default_factory=lambda: [1])
    dry_run: bool = False
    force: bool = False
    output_script_dir: Path = Path("daisy_chain_scripts")
    config_path: str = "config.yaml"

    def __post_init__(self) -> None:
        """Normalize fields that callers commonly pass in looser forms."""
        # Script saving calls ``Path.mkdir`` on this value, so a plain
        # string passed to the constructor must be converted.
        self.output_script_dir = Path(self.output_script_dir)
        # Copy so later mutation of the caller's list doesn't leak in.
        self.replicates = list(self.replicates)

    @classmethod
    def from_simulation_config(
        cls,
        sim_config: SimulationConfig,
        slurm_config: SlurmConfig,
        replicates: Union[str, List[int]] = "1",
        dry_run: bool = False,
        force: bool = False,
        output_script_dir: Union[str, Path] = "daisy_chain_scripts",
        config_path: str = "config.yaml",
    ) -> "DaisyChainConfig":
        """Create DaisyChainConfig from a SimulationConfig.

        Parameters
        ----------
        sim_config : SimulationConfig
            Simulation configuration.
        slurm_config : SlurmConfig
            SLURM configuration.
        replicates : str or list of int
            Replicate range string (e.g. ``"1-5"``) or list of ints.
        dry_run : bool
            If True, don't submit jobs.
        force : bool
            If True, skip duplicate-job check.
        output_script_dir : str or Path
            Directory for job scripts.
        config_path : str
            Path to the YAML configuration file.

        Returns
        -------
        DaisyChainConfig
            Configured instance.
        """
        # Range strings are validated, then expanded; explicit lists are copied.
        if isinstance(replicates, str):
            validate_replicate_range(replicates)
            replicate_list = parse_replicate_range(replicates)
        else:
            replicate_list = list(replicates)

        phases = sim_config.simulation_phases
        return cls(
            slurm_config=slurm_config,
            total_production_time_ns=phases.production.duration,
            total_samples=phases.production.samples,
            equilibration_time_ns=phases.total_equilibration_duration,
            replicates=replicate_list,
            dry_run=dry_run,
            force=force,
            output_script_dir=Path(output_script_dir),
            config_path=config_path,
        )
@dataclass
class SubmissionResult:
    """Outcome of submitting (or dry-running) the job for one replicate.

    Attributes
    ----------
    job_id : str
        Job ID reported by SLURM, or a placeholder when nothing was
        actually submitted (dry run).
    script_path : Path
        Location of the generated batch script.
    segment_index : int
        Legacy field from the dependency-chain model; the
        self-resubmitting model always uses 0.
    replicate : int
        Replicate number this job belongs to.
    is_dry_run : bool
        True when the script was only generated, not submitted.
    """

    job_id: str
    script_path: Path
    segment_index: int
    replicate: int
    is_dry_run: bool = field(default=False)
class DaisyChainSubmitter:
    """Handles job submission for MD simulations.

    In the self-resubmitting model, each replicate gets a single job
    script. The script calls ``polyzymd run-segment``, checks progress,
    and resubmits itself until the simulation is complete.

    Example
    -------
    >>> sim_config = SimulationConfig.from_yaml("config.yaml")
    >>> slurm_config = SlurmConfig.from_preset("aa100", email="user@example.com")
    >>> dc_config = DaisyChainConfig.from_simulation_config(
    ...     sim_config, slurm_config, replicates="1-3"
    ... )
    >>> submitter = DaisyChainSubmitter(sim_config, dc_config)
    >>> results = submitter.submit_all()
    """

    def __init__(
        self,
        sim_config: SimulationConfig,
        dc_config: DaisyChainConfig,
        pixi_env: str = "cuda-12-4",
        openff_logs: bool = False,
        skip_build: bool = False,
    ) -> None:
        """Initialize the submitter.

        Parameters
        ----------
        sim_config : SimulationConfig
            Simulation configuration.
        dc_config : DaisyChainConfig
            Submission configuration.
        pixi_env : str
            Pixi environment name (e.g. ``"cuda-12-4"``, ``"cuda-12-6"``).
        openff_logs : bool
            Enable verbose OpenFF logs in generated scripts.
        skip_build : bool
            Skip system building in generated scripts.
        """
        self._sim_config = sim_config
        self._dc_config = dc_config
        self._openff_logs = openff_logs
        self._skip_build = skip_build
        self._generator = SlurmScriptGenerator(
            dc_config.slurm_config, pixi_env, openff_logs=openff_logs, skip_build=skip_build
        )
        # Submission results keyed by replicate; each value is a
        # single-element list (shape kept from the dependency-chain model).
        self._job_chains: Dict[int, List[SubmissionResult]] = {}

    @property
    def sim_config(self) -> SimulationConfig:
        """Get the simulation configuration."""
        return self._sim_config

    @property
    def dc_config(self) -> DaisyChainConfig:
        """Get the submission configuration."""
        return self._dc_config

    @property
    def job_chains(self) -> Dict[int, List[SubmissionResult]]:
        """Get the submission results for all replicates."""
        return self._job_chains

    def _create_job_name(self, replicate: int) -> str:
        """Create a descriptive job name for a replicate.

        Delegates to the module-level :func:`create_job_name` function.

        Parameters
        ----------
        replicate : int
            Replicate number.

        Returns
        -------
        str
            Formatted job name.
        """
        return create_job_name(self._sim_config, replicate)

    def _get_scratch_dir(self, replicate: int) -> str:
        """Get the scratch directory path for a replicate.

        Parameters
        ----------
        replicate : int
            Replicate number.

        Returns
        -------
        str
            Absolute scratch directory path.
        """
        scratch_dir = self._sim_config.get_working_directory(replicate)
        return str(scratch_dir.resolve())

    def generate_job_script(self, replicate: int) -> str:
        """Generate a self-resubmitting job script for a replicate.

        Parameters
        ----------
        replicate : int
            Replicate number.

        Returns
        -------
        str
            Complete SLURM batch script content.
        """
        job_name = self._create_job_name(replicate)
        logs_subdir = self._sim_config.output.slurm_logs_subdir
        # ``%j`` is expanded by SLURM to the job ID.
        output_file = f"{logs_subdir}/{job_name}.%j.out"
        return self._generator.generate_job_script(
            config_path=self._dc_config.config_path,
            replicate=replicate,
            working_dir=self._get_scratch_dir(replicate),
            job_name=job_name,
            output_file=output_file,
        )

    def _save_script(self, content: str, filename: str) -> Path:
        """Save a script to the output directory and make it executable.

        Parameters
        ----------
        content : str
            Script content.
        filename : str
            Script filename.

        Returns
        -------
        Path
            Path to saved script.
        """
        # Path() also tolerates a plain string in the config.
        output_dir = Path(self._dc_config.output_script_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        script_path = output_dir / filename
        # Explicit UTF-8 so the written bytes don't depend on the host locale.
        script_path.write_text(content, encoding="utf-8")
        os.chmod(script_path, 0o755)
        return script_path

    @staticmethod
    def _parse_sbatch_job_id(stdout: str) -> str:
        """Extract the job ID from ``sbatch`` standard output.

        The usual output is ``Submitted batch job <id>``. On multi-cluster
        installations it may be followed by ``on cluster <name>``, in which
        case the last token is the cluster name, not the job ID.
        ``--parsable``-style ``<id>;<cluster>`` output is also accepted.

        Parameters
        ----------
        stdout : str
            Captured standard output of a successful ``sbatch`` call.

        Returns
        -------
        str
            The job ID.

        Raises
        ------
        RuntimeError
            If *stdout* contains no token at all.
        """
        match = re.search(r"Submitted batch job\s+(\d+)", stdout)
        if match:
            return match.group(1)
        tokens = stdout.split()
        if not tokens:
            raise RuntimeError(
                f"sbatch reported success but printed no job ID (stdout: {stdout!r})"
            )
        # Unrecognized format: fall back to the last token, minus any
        # ``;cluster`` suffix.
        return tokens[-1].split(";")[0]

    def _submit_job(
        self,
        script_path: Path,
        replicate: int,
    ) -> SubmissionResult:
        """Submit a job to SLURM.

        Parameters
        ----------
        script_path : Path
            Path to the job script.
        replicate : int
            Replicate number.

        Returns
        -------
        SubmissionResult
            Submission result with job information.

        Raises
        ------
        RuntimeError
            If ``sbatch`` exits non-zero or its output contains no job ID.
        """
        if self._dc_config.dry_run:
            LOGGER.info(f"[DRY RUN] Would submit {script_path}")
            return SubmissionResult(
                job_id=f"DRY_RUN_{replicate}",
                script_path=script_path,
                segment_index=0,
                replicate=replicate,
                is_dry_run=True,
            )

        # Use --export=NONE to start with clean environment, letting the
        # script's pixi shell-hook initialization work properly regardless
        # of submission context
        cmd = ["sbatch", "--export=NONE"]
        if self._dc_config.slurm_config.exclude:
            cmd.extend(["--exclude", self._dc_config.slurm_config.exclude])
        cmd.append(str(script_path))

        try:
            result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        except subprocess.CalledProcessError as e:
            LOGGER.error(f"Error submitting job: {e}")
            LOGGER.error(f"STDOUT: {e.stdout}")
            LOGGER.error(f"STDERR: {e.stderr}")
            raise RuntimeError(f"Failed to submit job: {e.stderr}") from e

        job_id = self._parse_sbatch_job_id(result.stdout)
        LOGGER.info(f"Submitted job {job_id} for replicate {replicate}")
        return SubmissionResult(
            job_id=job_id,
            script_path=script_path,
            segment_index=0,
            replicate=replicate,
            is_dry_run=False,
        )

    def submit_replicate(self, replicate: int) -> SubmissionResult:
        """Generate and submit the job for a single replicate.

        Before submitting, checks ``squeue`` for existing RUNNING/PENDING
        jobs with the same job name. If duplicates are found and
        ``force`` is not set, raises ``RuntimeError``.

        Parameters
        ----------
        replicate : int
            Replicate number.

        Returns
        -------
        SubmissionResult
            Submission result.

        Raises
        ------
        RuntimeError
            If a SLURM job is already RUNNING or PENDING for this
            replicate and ``force`` is False, or if submission fails.
        """
        job_name = self._create_job_name(replicate)

        # Best-effort duplicate guard (skipped for dry runs)
        if not self._dc_config.dry_run and not self._dc_config.force:
            existing = check_existing_slurm_jobs(job_name)
            if existing:
                ids = ", ".join(existing)
                raise RuntimeError(
                    f"Replicate {replicate} already has RUNNING/PENDING SLURM "
                    f"job(s): {ids} (job name '{job_name}'). "
                    "Use --force to submit anyway."
                )

        LOGGER.info(f"Submitting self-resubmitting job for replicate {replicate}")
        script_content = self.generate_job_script(replicate)
        filename = f"run_rep{replicate}.sh"
        script_path = self._save_script(script_content, filename)
        result = self._submit_job(script_path=script_path, replicate=replicate)

        # Store as a single-element list for backward compatibility
        self._job_chains[replicate] = [result]
        return result

    def submit_all(self) -> Dict[int, List[SubmissionResult]]:
        """Submit jobs for all replicates.

        Replicates are submitted in order. The first failure aborts the
        loop; before re-raising, the jobs already submitted are logged.

        Returns
        -------
        dict
            Mapping of replicate numbers to their submission results
            (each value is a single-element list for compatibility).

        Raises
        ------
        RuntimeError
            Propagated from :meth:`submit_replicate`.
        """
        self._print_submission_summary()
        for replicate in self._dc_config.replicates:
            try:
                self.submit_replicate(replicate)
            except RuntimeError:
                # Nothing cancels jobs already handed to sbatch, so tell the
                # user what went out before the abort.
                self._log_partial_submission(replicate)
                raise
        self._print_completion_summary()
        return self._job_chains

    def _log_partial_submission(self, failed_replicate: int) -> None:
        """Log the replicates already submitted when *failed_replicate* failed."""
        if not self._job_chains:
            return
        submitted = ", ".join(
            f"replicate {rep} -> job {results[0].job_id}"
            for rep, results in sorted(self._job_chains.items())
        )
        LOGGER.error(
            f"Submission stopped at replicate {failed_replicate}; "
            f"already submitted: {submitted}"
        )

    def _print_submission_summary(self) -> None:
        """Print a summary before submission."""
        config = self._dc_config
        num_replicates = len(config.replicates)
        LOGGER.info("\nPreparing self-resubmitting simulation jobs")
        LOGGER.info(f"  Enzyme: {self._sim_config.enzyme.name}")
        if self._sim_config.polymers and self._sim_config.polymers.enabled:
            LOGGER.info(f"  Polymer: {self._sim_config.polymers.type_prefix}")
            LOGGER.info(f"  Polymer count: {self._sim_config.polymers.count}")
        LOGGER.info(f"  Temperature: {self._sim_config.thermodynamics.temperature} K")
        LOGGER.info(f"  Total production time: {config.total_production_time_ns} ns")
        LOGGER.info(f"  Total samples: {config.total_samples}")
        LOGGER.info(f"  Replicates: {config.replicates} ({num_replicates} total)")
        LOGGER.info(f"  Jobs to submit: {num_replicates} (one per replicate, self-resubmitting)")
        LOGGER.info("")
        LOGGER.info("SLURM Configuration:")
        LOGGER.info(f"  Partition: {config.slurm_config.partition}")
        if config.slurm_config.qos:
            LOGGER.info(f"  QoS: {config.slurm_config.qos}")
        if config.slurm_config.account:
            LOGGER.info(f"  Account: {config.slurm_config.account}")
        LOGGER.info(f"  Time limit: {config.slurm_config.time_limit}")
        LOGGER.info("")
        if config.dry_run:
            LOGGER.info("*** DRY RUN MODE - Scripts will be created but not submitted ***")
            LOGGER.info("")

    def _print_completion_summary(self) -> None:
        """Print a summary after submission."""
        config = self._dc_config
        total_jobs = sum(len(chain) for chain in self._job_chains.values())
        if config.dry_run:
            LOGGER.info(f"\nDry run completed. {total_jobs} job script(s) created.")
            LOGGER.info(f"Scripts saved to: {config.output_script_dir}")
            LOGGER.info("Review the scripts and run without --dry-run to submit them.")
        else:
            LOGGER.info(f"\nAll {total_jobs} job(s) submitted successfully!")
            LOGGER.info("\nSubmitted jobs:")
            for replicate, results in sorted(self._job_chains.items()):
                job_id = results[0].job_id
                LOGGER.info(f"  Replicate {replicate}: job {job_id} (self-resubmitting)")
            LOGGER.info("\nEach job will automatically resubmit until the simulation completes.")
            LOGGER.info("Monitor progress with: squeue -u $USER")
            LOGGER.info(
                "Check simulation status with: polyzymd check-progress -c <config> -r <rep>"
            )
def submit_daisy_chain(
    config_path: Union[str, Path],
    slurm_preset: str = "aa100",
    replicates: str = "1",
    email: str = "",
    dry_run: bool = False,
    force: bool = False,
    pixi_env: str = "cuda-12-4",
    output_dir: Optional[Union[str, Path]] = None,
    scratch_dir: Optional[Union[str, Path]] = None,
    projects_dir: Optional[Union[str, Path]] = None,
    time_limit: Optional[str] = None,
    memory: Optional[str] = None,
    account: Optional[str] = None,
    gpu_type: Optional[str] = None,
    openff_logs: bool = False,
    skip_build: bool = False,
) -> Dict[int, List[SubmissionResult]]:
    """Load a YAML config and submit one self-resubmitting job per replicate.

    This backs ``polyzymd submit``. The function name is historical: no
    dependency chain is built any more; each replicate gets a single job
    that resubmits itself.

    Parameters
    ----------
    config_path : str or Path
        Simulation YAML config to load.
    slurm_preset : str
        Name of the SLURM preset (aa100, al40, blanca-shirts, bridges2, testing).
    replicates : str
        Replicate selection such as ``"1-5"`` or ``"1,3,5"``.
    email : str
        Address for SLURM job notifications.
    dry_run : bool
        Generate scripts without submitting them.
    force : bool
        Bypass the squeue duplicate-job check.
    pixi_env : str
        Pixi environment to activate (e.g. ``"cuda-12-4"``, ``"cuda-12-6"``).
    output_dir : str or Path or None
        Where to write job scripts; defaults to the config's job-script directory.
    scratch_dir : str or Path or None
        Replacement scratch directory for simulation output.
    projects_dir : str or Path or None
        Replacement projects directory for scripts/logs.
    time_limit : str or None
        Replacement SLURM time limit (``HH:MM:SS``).
    memory : str or None
        Replacement SLURM memory request (e.g. ``"4G"``).
    account : str or None
        Replacement SLURM account / allocation ID.
    gpu_type : str or None
        Replacement GPU type for presets that use the ``--gpus`` directive.
    openff_logs : bool
        Turn on verbose OpenFF logging in the generated scripts.
    skip_build : bool
        Skip system building in the generated scripts.

    Returns
    -------
    dict
        Replicate number -> list of submission results.

    Raises
    ------
    ValueError
        When the preset needs an account, none was provided, and this is
        not a dry run (a dry run only logs a warning).
    """
    sim_config = SimulationConfig.from_yaml(config_path)

    # Directory overrides are applied before the default job-script
    # directory is computed from ``sim_config.output`` below.
    if scratch_dir:
        sim_config.output.scratch_directory = Path(scratch_dir)
    if projects_dir:
        sim_config.output.projects_directory = Path(projects_dir)

    script_output_dir = (
        Path(output_dir) if output_dir else sim_config.output.get_job_scripts_directory()
    )

    slurm_config = SlurmConfig.from_preset(slurm_preset, email=email)  # type: ignore[arg-type]
    # Decide before any override: presets that ship with an empty account
    # (e.g. bridges2) don't need one at all.
    account_required = bool(slurm_config.account)

    cli_overrides = (
        ("time_limit", time_limit),
        ("memory", memory),
        ("account", account),
        ("gpu_type", gpu_type),
    )
    for attr_name, override in cli_overrides:
        if override:
            setattr(slurm_config, attr_name, override)

    # An empty account on a preset that requires one (e.g. Alpine) would
    # produce an invalid SBATCH script.
    if account_required and not slurm_config.account:
        msg = (
            f"SLURM account is required but was not set for preset '{slurm_preset}'. "
            "Pass your allocation ID with --account <id>."
        )
        if not dry_run:
            raise ValueError(msg)
        LOGGER.warning(msg)

    dc_config = DaisyChainConfig.from_simulation_config(
        sim_config=sim_config,
        slurm_config=slurm_config,
        replicates=replicates,
        dry_run=dry_run,
        force=force,
        output_script_dir=script_output_dir,
        config_path=str(Path(config_path).resolve()),
    )

    submitter = DaisyChainSubmitter(
        sim_config,
        dc_config,
        pixi_env=pixi_env,
        openff_logs=openff_logs,
        skip_build=skip_build,
    )
    return submitter.submit_all()