Source code for polyzymd.compare.comparators.contacts

"""Contacts comparator for comparing polymer-protein contacts across conditions.

This module provides the ContactsComparator class that orchestrates
contacts analysis and statistical comparison across multiple conditions.

Key features:
- Aggregate-level comparisons (coverage, mean contact fraction)
- Effect size (Cohen's d) for practical significance
- ANOVA for 3+ conditions
- Auto-exclusion of conditions without polymer (e.g., "No Polymer" controls)

The comparator inherits from BaseComparator and implements the Template Method
pattern for DRY comparison logic. Since contacts has TWO primary metrics
(coverage and mean_contact_fraction), some methods are customized.

Note:
    Per-residue pairwise comparisons have been removed. Contact data is
    mechanistic (explains WHY stability changes), not an observable. Per-residue
    contact-RMSF correlations are computed in `polyzymd compare report`.
"""

from __future__ import annotations

import logging
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any, ClassVar

import numpy as np

from polyzymd import __version__
from polyzymd.analysis.core.metric_type import MetricType
from polyzymd.compare.core.base import ANOVASummary, BaseComparator
from polyzymd.compare.core.registry import ComparatorRegistry
from polyzymd.compare.results.contacts import (
    AggregateComparisonResult,
    BindingPreferenceComparisonEntry,
    BindingPreferenceComparisonSummary,
    ContactsANOVASummary,
    ContactsComparisonResult,
    ContactsConditionSummary,
    ContactsPairwiseComparison,
)
from polyzymd.compare.settings import ContactsAnalysisSettings, ContactsComparisonSettings
from polyzymd.compare.statistics import (
    cohens_d,
    independent_ttest,
    one_way_anova,
    percent_change,
)

if TYPE_CHECKING:
    from polyzymd.analysis.contacts.aggregator import AggregatedContactResult
    from polyzymd.compare.config import ComparisonConfig, ConditionConfig

logger = logging.getLogger("polyzymd.compare")


# Type alias for condition data
ContactsConditionData = dict[str, Any]


[docs] @ComparatorRegistry.register("contacts") class ContactsComparator( BaseComparator[ ContactsAnalysisSettings, ContactsConditionData, ContactsConditionSummary, ContactsComparisonResult, ] ): """Compare polymer-protein contacts across multiple simulation conditions. This class loads contacts analysis results for each condition (computing them if necessary), then performs statistical comparisons including: - Aggregate-level comparisons (coverage, mean contact fraction) - ANOVA for 3+ conditions - Effect sizes (Cohen's d) for practical significance Parameters ---------- config : ComparisonConfig Comparison configuration defining conditions. analysis_settings : ContactsAnalysisSettings Settings defining what contacts to analyze (selections, cutoff). comparison_settings : ContactsComparisonSettings, optional Settings for how to compare (FDR alpha, effect sizes). Defaults to ContactsComparisonSettings() if not provided. equilibration : str, optional Equilibration time override (e.g., "10ns"). If None, uses config.defaults.equilibration_time. Examples -------- >>> config = ComparisonConfig.from_yaml("comparison.yaml") >>> analysis_settings = config.analysis_settings.get("contacts") >>> comparison_settings = config.comparison_settings.get("contacts") >>> comparator = ContactsComparator(config, analysis_settings, comparison_settings) >>> result = comparator.compare() >>> print(result.ranking_by_coverage) ["100% SBMA", "50/50 Mix", "100% EGMA"] Notes ----- - Higher contact fraction is considered "better" (more polymer-protein interaction) - Conditions without polymer atoms are automatically excluded - This is a MEAN_BASED metric (contact fractions are averages) """ comparison_type: ClassVar[str] = "contacts"
[docs] def __init__( self, config: "ComparisonConfig", analysis_settings: ContactsAnalysisSettings, comparison_settings: ContactsComparisonSettings | None = None, equilibration: str | None = None, ): super().__init__(config, analysis_settings, equilibration) self.comparison_settings = comparison_settings or ContactsComparisonSettings()
[docs] @classmethod def comparison_type_name(cls) -> str: """Return the comparison type identifier.""" return "contacts"
@property def metric_type(self) -> MetricType: """Contact fraction is a mean-based metric. Contact fraction is the average fraction of frames where a residue is in contact with the polymer. This is an average over frames, so the mean converges regardless of autocorrelation. However, we need to correct uncertainty using N_eff (effective sample size). Returns ------- MetricType MetricType.MEAN_BASED """ return MetricType.MEAN_BASED # ======================================================================== # Override compare() to handle contacts-specific dual-metric flow # ========================================================================
[docs] def compare(self, recompute: bool = False) -> ContactsComparisonResult: """Run comparison across all conditions. Overrides base to handle contacts-specific logic: - Dual metrics (coverage and mean_contact_fraction) - Auto-exclusion of no-polymer conditions - Custom result building Parameters ---------- recompute : bool, optional If True, force recompute even if cached results exist. Returns ------- ContactsComparisonResult Complete comparison results with statistics and rankings. """ # Store recompute flag for use by binding preference method self._recompute = recompute logger.info(f"Starting contacts comparison: {self.config.name}") logger.info(f"Conditions: {len(self.config.conditions)}") logger.info(f"Equilibration: {self.equilibration}") # Step 1: Filter conditions (exclude no-polymer) valid_conditions, excluded_conditions = self._filter_conditions() if excluded_conditions: logger.warning( f"Auto-excluding {len(excluded_conditions)} condition(s) without polymer: " f"{[c.label for c in excluded_conditions]}" ) if len(valid_conditions) < 2: raise ValueError( f"Need at least 2 conditions with polymer for contacts comparison. " f"Found {len(valid_conditions)} valid condition(s). " f"Excluded: {[c.label for c in excluded_conditions]}" ) # Step 2: Load or compute analysis for each condition condition_data: list[tuple["ConditionConfig", ContactsConditionData]] = [] for cond in valid_conditions: data = self._load_or_compute(cond, recompute) condition_data.append((cond, data)) # Step 3: Validate identical residue sets self._validate_residue_sets(condition_data) # Step 4: Build condition summaries summaries: list[ContactsConditionSummary] = [] for cond, data in condition_data: summary = self._build_condition_summary(cond, data) summaries.append(summary) # Step 5: Determine effective control effective_control = self._get_effective_control(excluded_conditions) # Step 6: Compute pairwise comparisons (custom for contacts) comparisons = self._compute_contacts_pairwise(summaries, condition_data, effective_control) # Step 7: ANOVA if 3+ conditions (custom for contacts - dual metrics) anova_results: list[ContactsANOVASummary] = [] if len(summaries) >= 3: anova_results = self._compute_contacts_anova(condition_data) # Step 8: Rankings (dual - by coverage and by contact fraction) ranked_coverage = sorted(summaries, key=lambda s: s.coverage_mean, reverse=True) ranked_contact = sorted(summaries, key=lambda s: s.mean_contact_fraction, reverse=True) # Step 9: Load or compute binding preference (if enabled or pre-existing) binding_pref_summary = self._load_or_compute_binding_preference( valid_conditions, condition_data ) # Step 10: Build result return ContactsComparisonResult( name=self.config.name, contacts_name="polymer_contacts", contacts_description=None, polymer_selection=self.analysis_settings.polymer_selection, protein_selection=self.analysis_settings.protein_selection, cutoff=self.analysis_settings.cutoff, contact_criteria="distance", fdr_alpha=self.comparison_settings.fdr_alpha, control_label=effective_control, conditions=summaries, pairwise_comparisons=comparisons, anova=anova_results, ranking_by_coverage=[s.label for s in ranked_coverage], ranking_by_contact_fraction=[s.label for s in ranked_contact], excluded_conditions=[c.label for c in excluded_conditions], binding_preference=binding_pref_summary, equilibration_time=self.equilibration, created_at=datetime.now(), polyzymd_version=__version__, )
# ======================================================================== # Abstract Method Implementations # ======================================================================== def _load_or_compute( self, cond: "ConditionConfig", recompute: bool, ) -> ContactsConditionData: """Load existing contacts results or compute them. Parameters ---------- cond : ConditionConfig Condition to analyze. recompute : bool Force recompute even if cached. Returns ------- dict Dictionary with aggregated result and per-replicate values. """ from polyzymd.analysis.contacts.aggregator import aggregate_contact_results from polyzymd.analysis.contacts.results import ContactResult from polyzymd.config.schema import SimulationConfig logger.info(f"Processing condition: {cond.label}") # Load simulation config sim_config = SimulationConfig.from_yaml(cond.config) # Resolve condition-specific output directory (None in standalone mode) condition_output_dir = self._resolve_condition_output_dir(cond.label, "contacts") # Load or compute individual replicate results results: list[ContactResult] = [] successful_reps: list[int] = [] for rep in cond.replicates: result = self._load_or_compute_replicate( sim_config, rep, recompute, cond_config_path=cond.config, condition_output_dir=condition_output_dir, ) if result is not None: results.append(result) successful_reps.append(rep) if not results: raise ValueError(f"No successful replicates for condition: {cond.label}") if len(results) < len(cond.replicates): missing = set(cond.replicates) - set(successful_reps) logger.warning( f" Missing replicates for {cond.label}: {missing} " f"(using {len(results)} of {len(cond.replicates)})" ) # Aggregate results logger.info(f" Aggregating {len(results)} replicates...") agg_result = aggregate_contact_results(results) # Persist aggregated result so downstream plotters can load it output_dir = condition_output_dir or ( sim_config.output.projects_directory / "analysis" / "contacts" ) output_dir.mkdir(parents=True, exist_ok=True) rep_range = f"{min(successful_reps)}-{max(successful_reps)}" agg_file = output_dir / f"contacts_aggregated_reps{rep_range}.json" agg_result.save(agg_file) logger.info(f" Saved aggregated contacts: {agg_file}") # Compute per-replicate values for statistical tests coverage_per_rep = self._compute_coverage_per_replicate(agg_result) contact_fraction_per_rep = self._compute_contact_fraction_per_replicate(agg_result) return { "agg_result": agg_result, "coverage_per_replicate": coverage_per_rep, "contact_fraction_per_replicate": contact_fraction_per_rep, } def _build_condition_summary( self, cond: "ConditionConfig", data: ContactsConditionData, ) -> ContactsConditionSummary: """Build a contacts condition summary from raw data. Parameters ---------- cond : ConditionConfig Condition configuration. data : dict Raw analysis data from _load_or_compute. Returns ------- ContactsConditionSummary Structured condition summary. """ agg_result = data["agg_result"] return ContactsConditionSummary( label=cond.label, config_path=str(cond.config), n_replicates=agg_result.n_replicates, n_residues=agg_result.n_residues, coverage_mean=agg_result.coverage_mean, coverage_sem=agg_result.coverage_sem, mean_contact_fraction=agg_result.mean_contact_fraction, mean_contact_fraction_sem=agg_result.mean_contact_fraction_sem, residence_time_by_polymer_type=agg_result.residence_time_by_polymer_type, ) def _get_mean_value(self, summary: ContactsConditionSummary) -> float: """Get the mean contact fraction value (primary metric).""" return summary.mean_contact_fraction @property def _direction_labels(self) -> tuple[str, str, str]: """Positive contact change = increased contact.""" return ("decreased", "unchanged", "increased") def _rank_summaries( self, summaries: list[ContactsConditionSummary] ) -> list[ContactsConditionSummary]: """Sort summaries by contact fraction (highest first = most contact).""" return sorted(summaries, key=lambda s: s.mean_contact_fraction, reverse=True) # ======================================================================== # Override _filter_conditions for polymer detection # ======================================================================== def _filter_conditions( self, ) -> tuple[list["ConditionConfig"], list["ConditionConfig"]]: """Filter conditions to only those with polymer atoms. Conditions without polymer atoms (e.g., "No Polymer" controls) are excluded from contacts analysis since there's nothing to analyze. This method first checks for cached contact results. If cached results exist, the condition is considered valid (since it must have had polymer to generate contacts). This allows analysis to proceed even when the original trajectory files are not available locally. Returns ------- tuple[list[ConditionConfig], list[ConditionConfig]] (valid_conditions, excluded_conditions) """ import MDAnalysis as mda from polyzymd.config.schema import SimulationConfig valid_conditions = [] excluded_conditions = [] for cond in self.config.conditions: # Load simulation config and check first available replicate try: sim_config = SimulationConfig.from_yaml(cond.config) # First check: do cached contact results exist? # If so, trust them - the condition must have had polymer has_cached_results = False condition_output_dir = self._resolve_condition_output_dir(cond.label, "contacts") for rep in cond.replicates: result_path = self._find_replicate_result( sim_config, rep, cond_config_path=cond.config, condition_output_dir=condition_output_dir, ) if result_path and result_path.exists(): has_cached_results = True logger.debug( f" {cond.label} rep {rep}: found cached result at {result_path}" ) break if has_cached_results: valid_conditions.append(cond) logger.debug(f" Including '{cond.label}': cached results found") continue # Fallback: check topology for polymer atoms has_polymer = False for rep in cond.replicates: run_dir = sim_config.get_working_directory(rep) topology_path = run_dir / "solvated_system.pdb" if not topology_path.exists(): continue # Load topology and check polymer selection try: universe = mda.Universe(str(topology_path)) polymer_atoms = universe.select_atoms( self.analysis_settings.polymer_selection ) if len(polymer_atoms) > 0: has_polymer = True logger.debug( f" {cond.label} rep {rep}: {len(polymer_atoms)} polymer atoms" ) break else: logger.debug( f" {cond.label} rep {rep}: 0 polymer atoms with selection " f"'{self.analysis_settings.polymer_selection}'" ) except Exception as e: logger.warning(f" Error checking {cond.label} rep {rep}: {e}") continue if has_polymer: valid_conditions.append(cond) else: excluded_conditions.append(cond) logger.info( f" Excluding '{cond.label}': no polymer atoms found with selection " f"'{self.analysis_settings.polymer_selection}'" ) except Exception as e: logger.warning(f" Error processing condition '{cond.label}': {e}") # If we can't check, include it and let the analysis fail later valid_conditions.append(cond) return valid_conditions, excluded_conditions # ======================================================================== # Contacts-Specific Helper Methods # ======================================================================== def _load_or_compute_replicate( self, sim_config: Any, replicate: int, recompute: bool, cond_config_path: Path | None = None, condition_output_dir: Path | None = None, ) -> Any | None: """Load or compute contacts for a single replicate. Parameters ---------- sim_config : SimulationConfig Simulation configuration. replicate : int Replicate number. recompute : bool Force recompute. cond_config_path : Path, optional Path to condition's config.yaml for fallback result location. condition_output_dir : Path, optional Condition-specific output directory (from comparison mode). Checked first before falling back to ``projects_directory``. Returns ------- ContactResult or None Contact result, or None if replicate doesn't exist. """ from polyzymd.analysis.contacts.results import ContactResult # Try to find existing result result_path = self._find_replicate_result( sim_config, replicate, cond_config_path=cond_config_path, condition_output_dir=condition_output_dir, ) if result_path and result_path.exists() and not recompute: logger.info(f" Loading cached result for rep {replicate}: {result_path}") result = ContactResult.load(result_path) # Check if it has per-residue statistics if not result.has_per_residue_statistics(): logger.warning( f" Cached result for rep {replicate} missing per-residue stats, recomputing..." ) return self._compute_replicate( sim_config, replicate, condition_output_dir=condition_output_dir, ) return result return self._compute_replicate( sim_config, replicate, condition_output_dir=condition_output_dir, ) def _compute_replicate( self, sim_config: Any, replicate: int, condition_output_dir: Path | None = None, ) -> Any | None: """Compute contacts analysis for a single replicate. Parameters ---------- sim_config : SimulationConfig Simulation configuration. replicate : int Replicate number. condition_output_dir : Path, optional Condition-specific output directory (from comparison mode). If provided, results are saved here instead of ``projects_directory``. Returns ------- ContactResult or None Contact result, or None if data doesn't exist. """ import MDAnalysis as mda from polyzymd.analysis.common.selectors import MDAnalysisSelector from polyzymd.analysis.contacts.calculator_parallel import ParallelContactAnalyzer from polyzymd.analysis.core.loader import TrajectoryLoader, convert_time, parse_time_string logger.info(f" Computing contacts for replicate {replicate}...") # Use TrajectoryLoader for consistent path resolution (DRY) loader = TrajectoryLoader(sim_config) try: traj_info = loader.get_trajectory_info(replicate) except FileNotFoundError as e: logger.warning(f" Skipping replicate {replicate}: trajectory data not found. {e}") return None try: # Create universe from resolved paths traj_files = [str(p) for p in traj_info.trajectory_files] universe = mda.Universe(str(traj_info.topology_file), traj_files) # Convert equilibration time to start frame eq_value, eq_unit = parse_time_string(self.equilibration) eq_time_ps = convert_time(eq_value, eq_unit, "ps") # Get timestep from trajectory try: timestep_ps = universe.trajectory.dt except (AttributeError, ValueError): timestep_ps = 1.0 start_frame = int(eq_time_ps / timestep_ps) if timestep_ps > 0 else 0 logger.info(f" Equilibration: {self.equilibration} -> frame {start_frame}") # Create selectors target_selector = MDAnalysisSelector(self.analysis_settings.protein_selection) query_selector = MDAnalysisSelector(self.analysis_settings.polymer_selection) # Create analyzer analyzer = ParallelContactAnalyzer( target_selector=target_selector, query_selector=query_selector, cutoff=self.analysis_settings.cutoff, ) # Run analysis result = analyzer.run( universe, start=start_frame, ) # Save result — use condition-specific dir if available if condition_output_dir is not None: output_dir = condition_output_dir else: output_dir = sim_config.output.projects_directory / "analysis" / "contacts" output_dir.mkdir(parents=True, exist_ok=True) output_file = output_dir / f"contacts_rep{replicate}.json" result.save(output_file) logger.info(f" Saved: {output_file}") return result except Exception as e: logger.warning(f" Skipping replicate {replicate}: analysis failed with error: {e}") return None def _find_replicate_result( self, sim_config: Any, replicate: int, cond_config_path: Path | None = None, condition_output_dir: Path | None = None, ) -> Path | None: """Find path to existing replicate contacts result. Checks multiple locations in order: 1. condition_output_dir (condition-specific, comparison mode) 2. sim_config.output.projects_directory / analysis / contacts / 3. cond_config_path.parent / analysis / contacts / (if provided) This allows cached results to be found even when the original projects_directory points to a remote/unavailable location. Parameters ---------- sim_config : SimulationConfig Simulation configuration object. replicate : int Replicate number. cond_config_path : Path, optional Path to the condition's config.yaml file. Used as fallback location for finding cached results. condition_output_dir : Path, optional Condition-specific output directory (from comparison mode). Checked first before falling back to ``projects_directory``. Returns ------- Path or None Path to the result file if found, None otherwise. """ result_filename = f"contacts_rep{replicate}.json" # Check condition-specific path first (comparison mode) if condition_output_dir is not None: cond_path = condition_output_dir / result_filename if cond_path.exists(): return cond_path # In comparison mode, do NOT fall back to the shared # projects_directory — all conditions share the same path and # the cached file would belong to whichever condition wrote it # first. Return None to trigger recomputation into the # condition-specific directory. return None # Fallback to shared helper for projects_directory and config parent # (standalone mode only) from polyzymd.compare.comparators._utils import find_replicate_result return find_replicate_result( sim_config, replicate, result_filename=result_filename, analysis_subdir="analysis/contacts", cond_config_path=cond_config_path, ) def _get_analysis_dir( self, sim_config: Any, cond_config_path: Path | None = None, condition_output_dir: Path | None = None, ) -> Path: """Get analysis directory with fallback to condition config parent. Checks multiple locations in order: 1. condition_output_dir (condition-specific, comparison mode) 2. sim_config.output.projects_directory / analysis / contacts / 3. cond_config_path.parent / analysis / contacts / (if provided and exists) This allows cached results to be found even when the original projects_directory points to a remote/unavailable location. Parameters ---------- sim_config : SimulationConfig Simulation configuration object. cond_config_path : Path, optional Path to the condition's config.yaml file. Used as fallback location for finding cached results. condition_output_dir : Path, optional Condition-specific output directory (from comparison mode). Checked first before falling back to ``projects_directory``. Returns ------- Path Analysis directory path (primary location, or fallback if it exists). """ # Check condition-specific path first (comparison mode) if condition_output_dir is not None: if condition_output_dir.exists(): return condition_output_dir from polyzymd.compare.comparators._utils import find_analysis_dir return find_analysis_dir( sim_config, analysis_subdir="analysis/contacts", cond_config_path=cond_config_path, ) def _validate_residue_sets( self, condition_data: list[tuple["ConditionConfig", ContactsConditionData]], ) -> None: """Validate that all conditions have identical residue sets. Parameters ---------- condition_data : list[tuple[ConditionConfig, dict]] Condition data to validate. Raises ------ ValueError If residue sets differ between conditions. """ if len(condition_data) < 2: return first_cond, first_data = condition_data[0] first_result = first_data["agg_result"] first_resids = {rs.protein_resid for rs in first_result.residue_stats} for cond, data in condition_data[1:]: result = data["agg_result"] resids = {rs.protein_resid for rs in result.residue_stats} if resids != first_resids: missing_in_first = resids - first_resids missing_in_other = first_resids - resids raise ValueError( f"Residue set mismatch between '{first_cond.label}' and '{cond.label}'. " f"Missing in {first_cond.label}: {sorted(missing_in_first)}, " f"Missing in {cond.label}: {sorted(missing_in_other)}. " f"All conditions must have identical protein residue sets." ) def _compute_coverage_per_replicate( self, result: "AggregatedContactResult", ) -> list[float]: """Compute coverage per replicate from residue stats. Parameters ---------- result : AggregatedContactResult Aggregated result. Returns ------- list[float] Coverage for each replicate. """ n_replicates = result.n_replicates n_residues = result.n_residues # Count residues with any contact per replicate coverages = [] for rep_idx in range(n_replicates): contacted = sum( 1 for rs in result.residue_stats if rs.contact_fraction_per_replicate[rep_idx] > 0 ) coverages.append(contacted / n_residues if n_residues > 0 else 0.0) return coverages def _compute_contact_fraction_per_replicate( self, result: "AggregatedContactResult", ) -> list[float]: """Compute mean contact fraction per replicate. Parameters ---------- result : AggregatedContactResult Aggregated result. Returns ------- list[float] Mean contact fraction for each replicate. """ n_replicates = result.n_replicates # Mean contact fraction per replicate fractions = [] for rep_idx in range(n_replicates): rep_fractions = [ rs.contact_fraction_per_replicate[rep_idx] for rs in result.residue_stats ] mean_frac = np.mean(rep_fractions) if rep_fractions else 0.0 fractions.append(float(mean_frac)) return fractions def _compute_contacts_pairwise( self, summaries: list[ContactsConditionSummary], condition_data: list[tuple["ConditionConfig", ContactsConditionData]], effective_control: str | None, ) -> list[ContactsPairwiseComparison]: """Compute pairwise statistical comparisons for contacts. Unlike other comparators, contacts compares TWO metrics (coverage and mean_contact_fraction) in each pairwise comparison. Parameters ---------- summaries : list[ContactsConditionSummary] Condition summaries. condition_data : list[tuple[ConditionConfig, dict]] Raw condition data with per-replicate values. effective_control : str or None Control condition label. Returns ------- list[ContactsPairwiseComparison] Pairwise comparison results. """ comparisons = [] # Build label -> data mapping label_to_data = {cond.label: data for cond, data in condition_data} label_to_summary = {s.label: s for s in summaries} if effective_control: # Compare all vs control control_data = label_to_data[effective_control] control_summary = label_to_summary[effective_control] for summary in summaries: if summary.label == effective_control: continue data = label_to_data[summary.label] comp = self._compare_contacts_pair( effective_control, control_summary, control_data, summary.label, summary, data, ) comparisons.append(comp) else: # Compare all pairs labels = [s.label for s in summaries] for i in range(len(labels)): for j in range(i + 1, len(labels)): label_a, label_b = labels[i], labels[j] comp = self._compare_contacts_pair( label_a, label_to_summary[label_a], label_to_data[label_a], label_b, label_to_summary[label_b], label_to_data[label_b], ) comparisons.append(comp) return comparisons def _compare_contacts_pair( self, label_a: str, summary_a: ContactsConditionSummary, data_a: ContactsConditionData, label_b: str, summary_b: ContactsConditionSummary, data_b: ContactsConditionData, ) -> ContactsPairwiseComparison: """Compare two conditions for both coverage and contact fraction. Parameters ---------- label_a : str Label of first condition. summary_a : ContactsConditionSummary Summary for first condition. data_a : dict Raw data for first condition. label_b : str Label of second condition. summary_b : ContactsConditionSummary Summary for second condition. data_b : dict Raw data for second condition. Returns ------- ContactsPairwiseComparison Comparison with both metrics. """ aggregate_comps = [] # Coverage comparison coverage_a = data_a["coverage_per_replicate"] coverage_b = data_b["coverage_per_replicate"] ttest = independent_ttest(coverage_a, coverage_b) effect = cohens_d(coverage_a, coverage_b, rmsf_mode=False) pct = percent_change(summary_a.coverage_mean, summary_b.coverage_mean) aggregate_comps.append( AggregateComparisonResult( metric="coverage", condition_a=label_a, condition_b=label_b, condition_a_mean=summary_a.coverage_mean, condition_a_sem=summary_a.coverage_sem, condition_b_mean=summary_b.coverage_mean, condition_b_sem=summary_b.coverage_sem, t_statistic=ttest.t_statistic, p_value=ttest.p_value, cohens_d=effect.cohens_d, effect_size_interpretation=effect.interpretation, significant=ttest.significant, percent_change=pct, direction=self._interpret_direction(pct), ) ) # Mean contact fraction comparison contact_a = data_a["contact_fraction_per_replicate"] contact_b = data_b["contact_fraction_per_replicate"] ttest = independent_ttest(contact_a, contact_b) effect = cohens_d(contact_a, contact_b, rmsf_mode=False) pct = percent_change(summary_a.mean_contact_fraction, summary_b.mean_contact_fraction) aggregate_comps.append( AggregateComparisonResult( metric="mean_contact_fraction", condition_a=label_a, condition_b=label_b, condition_a_mean=summary_a.mean_contact_fraction, condition_a_sem=summary_a.mean_contact_fraction_sem, condition_b_mean=summary_b.mean_contact_fraction, condition_b_sem=summary_b.mean_contact_fraction_sem, t_statistic=ttest.t_statistic, p_value=ttest.p_value, cohens_d=effect.cohens_d, effect_size_interpretation=effect.interpretation, significant=ttest.significant, percent_change=pct, direction=self._interpret_direction(pct), ) ) return ContactsPairwiseComparison( condition_a=label_a, condition_b=label_b, aggregate_comparisons=aggregate_comps, ) def _compute_contacts_anova( self, condition_data: list[tuple["ConditionConfig", ContactsConditionData]], ) -> list[ContactsANOVASummary]: """Compute one-way ANOVA for both aggregate metrics. Parameters ---------- condition_data : list[tuple[ConditionConfig, dict]] Condition data. Returns ------- list[ContactsANOVASummary] ANOVA results for coverage and mean_contact_fraction. """ results = [] # Coverage ANOVA coverage_groups = [data["coverage_per_replicate"] for _, data in condition_data] anova_coverage = one_way_anova(*coverage_groups) results.append( ContactsANOVASummary( metric="coverage", f_statistic=anova_coverage.f_statistic, p_value=anova_coverage.p_value, significant=anova_coverage.significant, ) ) # Mean contact fraction ANOVA contact_groups = [data["contact_fraction_per_replicate"] for _, data in condition_data] anova_contact = one_way_anova(*contact_groups) results.append( ContactsANOVASummary( metric="mean_contact_fraction", f_statistic=anova_contact.f_statistic, p_value=anova_contact.p_value, significant=anova_contact.significant, ) ) return results # ======================================================================== # Binding Preference Comparison # ======================================================================== def _load_or_compute_binding_preference( self, conditions: list["ConditionConfig"], condition_data: list[tuple["ConditionConfig", ContactsConditionData]], ) -> BindingPreferenceComparisonSummary | None: """Load or compute binding preference results across conditions. If compute_binding_preference is enabled in settings and data is missing (or recompute=True), computes binding preference from contacts data and surface exposure. Otherwise, attempts to load pre-existing results. Parameters ---------- conditions : list[ConditionConfig] Conditions to compare. condition_data : list[tuple[ConditionConfig, ContactsConditionData]] Already-loaded contacts data for each condition (used for compute). Returns ------- BindingPreferenceComparisonSummary or None Comparison summary, or None if no binding preference data available. """ from polyzymd.analysis.contacts.binding_preference import ( AggregatedBindingPreferenceResult, BindingPreferenceResult, aggregate_binding_preference, compute_binding_preference, resolve_protein_groups_from_surface_exposure, ) from polyzymd.analysis.contacts.results import ContactResult from polyzymd.analysis.contacts.surface_exposure import SurfaceExposureFilter from polyzymd.config.schema import SimulationConfig compute_enabled = getattr(self.analysis_settings, "compute_binding_preference", False) recompute = getattr(self, "_recompute", False) logger.info(f"Binding preference: compute_enabled={compute_enabled}, recompute={recompute}") # Collect binding preference results per condition condition_results: dict[ str, AggregatedBindingPreferenceResult | BindingPreferenceResult ] = {} surface_threshold: float | None = None # Build condition data lookup for compute path cond_data_map = {cond.label: data for cond, data in condition_data} for cond in conditions: try: sim_config = SimulationConfig.from_yaml(cond.config) # Resolve condition-specific output dir for binding preference condition_output_dir = self._resolve_condition_output_dir(cond.label, "contacts") analysis_dir = self._get_analysis_dir( sim_config, cond_config_path=cond.config, condition_output_dir=condition_output_dir, ) logger.debug(f"Binding preference for {cond.label}: analysis_dir={analysis_dir}") # If not recomputing, try to load existing results if not recompute: result = self._try_load_cached_binding_preference(cond, analysis_dir) if result is not None: condition_results[cond.label] = result if surface_threshold is None: surface_threshold = result.surface_exposure_threshold logger.debug(f" Loaded cached binding preference for {cond.label}") continue else: logger.debug(f" No cached binding preference for {cond.label}") # If compute is enabled, compute binding preference if compute_enabled: logger.info(f" Computing binding preference for {cond.label}...") computed = self._compute_condition_binding_preference( cond, sim_config, analysis_dir, cond_data_map.get(cond.label) ) if computed is not None: condition_results[cond.label] = computed if surface_threshold is None: surface_threshold = computed.surface_exposure_threshold logger.info(f" Computed binding preference for {cond.label}") continue else: logger.warning(f" Failed to compute binding preference for {cond.label}") logger.debug(f"No binding preference data for {cond.label}") except Exception as e: logger.warning(f"Could not load/compute binding preference for {cond.label}: {e}") continue if not condition_results: if compute_enabled: logger.warning( "compute_binding_preference is enabled but no results could be " "loaded or computed for any condition" ) else: logger.info( "No binding preference results found (compute_binding_preference=False)" ) return None if len(condition_results) < len(conditions): missing = [c.label for c in conditions if c.label not in condition_results] logger.warning(f"Binding preference missing for conditions: {missing}") # Build comparison summary return self._build_binding_preference_summary(condition_results, surface_threshold) def _try_load_cached_binding_preference( self, cond: "ConditionConfig", analysis_dir: Path, ) -> "AggregatedBindingPreferenceResult | BindingPreferenceResult | None": """Try to load cached binding preference results for a condition. Searches for binding preference files in order of preference: 1. binding_preference_aggregated.json 2. binding_preference_aggregated_reps*.json (glob pattern) 3. binding_preference.json (single replicate) 4. Per-replicate files (binding_preference_rep{N}.json) Parameters ---------- cond : ConditionConfig Condition to load. analysis_dir : Path Analysis directory for this condition. Returns ------- AggregatedBindingPreferenceResult | BindingPreferenceResult | None Loaded result, or None if not found. """ from polyzymd.compare.comparators._binding_preference_helpers import ( try_load_cached_binding_preference, ) return try_load_cached_binding_preference(cond, analysis_dir) def _compute_condition_binding_preference( self, cond: "ConditionConfig", sim_config: Any, analysis_dir: Path, cond_data: ContactsConditionData | None, ) -> "AggregatedBindingPreferenceResult | None": """Compute binding preference for a condition from contacts data. Delegates to the shared helper :func:`~polyzymd.compare.comparators._binding_preference_helpers.compute_condition_binding_preference`. Parameters ---------- cond : ConditionConfig Condition to compute. sim_config : SimulationConfig Simulation configuration. analysis_dir : Path Analysis directory for saving results. cond_data : ContactsConditionData or None Pre-loaded contacts data (unused — contacts are loaded from disk). Returns ------- AggregatedBindingPreferenceResult or None Computed and aggregated result, or None on failure. """ from polyzymd.compare.comparators._binding_preference_helpers import ( compute_condition_binding_preference, resolve_enzyme_pdb, ) # Get settings threshold = getattr(self.analysis_settings, "surface_exposure_threshold", 0.2) include_defaults = getattr(self.analysis_settings, "include_default_aa_groups", True) custom_groups = getattr(self.analysis_settings, "protein_groups", None) enzyme_pdb_setting = getattr(self.analysis_settings, "enzyme_pdb_for_sasa", None) polymer_type_selections = getattr(self.analysis_settings, "polymer_type_selections", None) protein_partitions = getattr(self.analysis_settings, "protein_partitions", None) # Resolve enzyme PDB path enzyme_pdb = resolve_enzyme_pdb( enzyme_pdb_setting=enzyme_pdb_setting, source_path=self.config.source_path, sim_config=sim_config, ) if enzyme_pdb is None or not enzyme_pdb.exists(): logger.warning( f"Cannot compute binding preference for {cond.label}: " f"enzyme PDB not found. Set enzyme_pdb_for_sasa in settings." ) return None return compute_condition_binding_preference( cond=cond, sim_config=sim_config, analysis_dir=analysis_dir, enzyme_pdb=enzyme_pdb, threshold=threshold, include_default_aa_groups=include_defaults, custom_protein_groups=custom_groups, protein_partitions=protein_partitions, polymer_type_selections=polymer_type_selections, ) def _find_enzyme_pdb(self, sim_config: Any) -> Path | None: """Find enzyme PDB file from simulation config. Delegates to the shared helper :func:`~polyzymd.compare.comparators._binding_preference_helpers.find_enzyme_pdb`. Parameters ---------- sim_config : SimulationConfig Simulation configuration. Returns ------- Path or None Path to enzyme PDB, or None if not found. """ from polyzymd.compare.comparators._binding_preference_helpers import find_enzyme_pdb return find_enzyme_pdb(sim_config) def _build_binding_preference_summary( self, condition_results: dict[str, Any], surface_threshold: float | None, ) -> BindingPreferenceComparisonSummary: """Build binding preference comparison summary from per-condition results. Extracts data from `binding_preference.aa_class_binding` which contains per-polymer enrichments for AA class partitions (aromatic, polar, etc.). contact_share sums to 1.0 within each partition. Parameters ---------- condition_results : dict Mapping of condition_label to binding preference result (either BindingPreferenceResult or AggregatedBindingPreferenceResult) surface_threshold : float or None SASA threshold used for surface filtering Returns ------- BindingPreferenceComparisonSummary Cross-condition comparison summary """ from polyzymd.analysis.contacts.binding_preference import ( AggregatedBindingPreferenceResult, AggregatedPartitionBindingResult, BindingPreferenceResult, PartitionBindingResult, ) # Collect all polymer types and AA classes (protein groups) across conditions all_polymer_types: set[str] = set() all_aa_classes: set[str] = set() for result in condition_results.values(): bp = None if isinstance(result, AggregatedBindingPreferenceResult): bp = result.binding_preference elif isinstance(result, BindingPreferenceResult): bp = result.binding_preference if bp is not None: all_polymer_types.update(bp.polymer_types) all_aa_classes.update(bp.aa_class_names()) polymer_types = sorted(all_polymer_types) # Use canonical AA class order canonical_order = ["aromatic", "polar", "nonpolar", "charged_positive", "charged_negative"] protein_groups = [aa for aa in canonical_order if aa in all_aa_classes] condition_labels = sorted(condition_results.keys()) # Build comparison entries for each (polymer_type, aa_class) pair entries = [] for poly_type in polymer_types: for aa_class in protein_groups: condition_values: dict[str, tuple[float, float]] = {} enrichments_for_ranking: list[tuple[str, float]] = [] per_replicate_data: dict[str, list[float]] = {} for cond_label, result in condition_results.items(): bp = None if isinstance(result, AggregatedBindingPreferenceResult): bp = result.binding_preference elif isinstance(result, BindingPreferenceResult): bp = result.binding_preference if bp is None: continue # Get AA class binding for this polymer type aa_binding = bp.aa_class_binding.get(poly_type) if aa_binding is None: continue # Get entry for this AA class if isinstance(aa_binding, AggregatedPartitionBindingResult): # Aggregated result - find entry by element name entry = None for e in aa_binding.entries: if e.partition_element == aa_class: entry = e break if entry is not None: mean_val = entry.mean_enrichment sem_val = entry.sem_enrichment if mean_val is not None: condition_values[cond_label] = (mean_val, sem_val or 0.0) enrichments_for_ranking.append((cond_label, mean_val)) if entry.per_replicate_enrichments: per_replicate_data[cond_label] = entry.per_replicate_enrichments elif isinstance(aa_binding, PartitionBindingResult): # Single replicate result entry = aa_binding.get_entry(aa_class) if entry is not None and entry.enrichment is not None: condition_values[cond_label] = (entry.enrichment, 0.0) enrichments_for_ranking.append((cond_label, entry.enrichment)) # Skip if no data for this pair if not condition_values: continue # Determine highest and lowest enrichment conditions highest_cond = None lowest_cond = None if enrichments_for_ranking: sorted_by_enrichment = sorted( enrichments_for_ranking, key=lambda x: x[1], reverse=True ) highest_cond = sorted_by_enrichment[0][0] lowest_cond = sorted_by_enrichment[-1][0] # Compute pairwise p-values using t-tests on per-replicate enrichments pairwise_p_values = self._compute_binding_pref_pairwise_pvalues(per_replicate_data) entries.append( BindingPreferenceComparisonEntry( polymer_type=poly_type, protein_group=aa_class, condition_values=condition_values, pairwise_p_values=pairwise_p_values, highest_enrichment_condition=highest_cond, lowest_enrichment_condition=lowest_cond, ) ) return BindingPreferenceComparisonSummary( entries=entries, polymer_types=polymer_types, protein_groups=protein_groups, n_conditions=len(condition_results), condition_labels=condition_labels, surface_exposure_threshold=surface_threshold, ) def _compute_binding_pref_pairwise_pvalues( self, per_replicate_data: dict[str, list[float]], ) -> dict[str, float]: """Compute pairwise t-test p-values from per-replicate enrichment data. Parameters ---------- per_replicate_data : dict[str, list[float]] Mapping of condition_label to list of enrichment values Returns ------- dict[str, float] Mapping of "condA_vs_condB" to p-value """ # Need at least 2 conditions with per-replicate data if len(per_replicate_data) < 2: return {} # Compute pairwise t-tests pairwise_p_values: dict[str, float] = {} cond_labels = sorted(per_replicate_data.keys()) for i, cond_a in enumerate(cond_labels): for cond_b in cond_labels[i + 1 :]: values_a = per_replicate_data[cond_a] values_b = per_replicate_data[cond_b] # Need at least 2 values in each group for t-test if len(values_a) < 2 or len(values_b) < 2: continue try: ttest_result = independent_ttest(values_a, values_b) key = f"{cond_a}_vs_{cond_b}" pairwise_p_values[key] = ttest_result.p_value except Exception as e: logger.warning(f"T-test failed for {cond_a} vs {cond_b}: {e}") return pairwise_p_values