Source code for polyzymd.compare.comparators.polymer_affinity

"""Polymer affinity score comparator.

This module implements PolymerAffinityScoreComparator, which quantifies the
total strength of polymer-protein interactions by summing per-contact free
energy contributions weighted by the number of simultaneous contacts.

**Physics**

For each (polymer_type, protein_group) pair:

::

    S_{p,g} = N_{p,g} × ΔG_sel(p,g)

where ``N_{p,g} = mean_contact_fraction × n_exposed_in_group`` and
``ΔG_sel(p,g) = -ln(contact_share / expected_share)`` (in kT).

Because contact_share / expected_share = enrichment + 1:

    ΔG_sel,rep = -ln(enrichment_rep + 1)

The total affinity score for a polymer type is:

    S_p = Σ_g S_{p,g}

The total affinity score for a condition is:

    S = Σ_p S_p

**Independence assumption**

This formulation assumes contacts are thermodynamically independent — each
contact contributes the same free energy regardless of what other contacts
exist simultaneously.  The absolute values are NOT rigorous binding free
energies.  Only the *relative differences* between polymer compositions are
meaningful as a comparative scoring metric.

**Sign convention**

| ``S < 0`` →  net favorable polymer-protein interaction
| ``S > 0`` →  net unfavorable (avoidance dominates)
| ``S = 0`` →  contacts match the surface-availability reference

**Temperature handling**

All scores are in kT (dimensionless); the temperature factor cancels in the
Boltzmann inversion ratio.  Pairwise statistics are suppressed between
conditions at different simulation temperatures because N changes.

**Design**

- Consumes cached binding preference files produced by the contacts analysis
  layer.  When cached data is missing, computes binding preference on-demand
  from per-replicate ``contacts_rep{N}.json`` files.
- Inherits ``BaseComparator`` but overrides ``compare()`` (like the BFE
  comparator) because the result type does not conform to
  ``BaseComparisonResult``.
- Uses ``AggregatedBindingPreferenceEntry`` objects (from ``bp_result.entries``)
  for the group-level data that includes ``mean_contact_fraction``.
"""

from __future__ import annotations

import logging
import math
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any, ClassVar

import numpy as np

from polyzymd import __version__
from polyzymd.analysis.core.metric_type import MetricType
from polyzymd.compare.core.base import BaseComparator
from polyzymd.compare.core.registry import ComparatorRegistry
from polyzymd.compare.results.polymer_affinity import (
    AffinityScoreConditionSummary,
    AffinityScoreEntry,
    AffinityScorePairwiseEntry,
    PolymerAffinityScoreResult,
    PolymerTypeScore,
)
from polyzymd.compare.settings import (
    PolymerAffinityScoreComparisonSettings,
    PolymerAffinityScoreSettings,
)
from polyzymd.compare.statistics import independent_ttest

if TYPE_CHECKING:
    from polyzymd.analysis.contacts.binding_preference import (
        AggregatedBindingPreferenceEntry,
        AggregatedBindingPreferenceResult,
        BindingPreferenceResult,
    )
    from polyzymd.compare.config import ComparisonConfig, ConditionConfig

logger = logging.getLogger("polyzymd.compare")

# Type alias for condition data dict
PAConditionData = dict[str, Any]


[docs] @ComparatorRegistry.register("polymer_affinity") class PolymerAffinityScoreComparator( BaseComparator[ PolymerAffinityScoreSettings, PAConditionData, AffinityScoreConditionSummary, PolymerAffinityScoreResult, ] ): """Compare polymer affinity scores across simulation conditions. Computes a composite interaction score for each (polymer_type, protein_group) pair by multiplying the mean number of simultaneous contacts by the per-contact selectivity free energy: S = N × ΔG_sel [kT] The total score is summed across all polymer types and protein groups. More negative = stronger net polymer-protein affinity. Statistical comparisons use per-replicate total scores and are only computed between conditions at the same simulation temperature. Parameters ---------- config : ComparisonConfig Comparison configuration. analysis_settings : PolymerAffinityScoreSettings Surface-exposure threshold, protein groups, etc. comparison_settings : PolymerAffinityScoreComparisonSettings, optional FDR alpha. Defaults to ``PolymerAffinityScoreComparisonSettings()``. equilibration : str, optional Equilibration time override. Notes ----- This is a MEAN_BASED metric (contact fractions are averages over frames, not fluctuation-based quantities). """ comparison_type: ClassVar[str] = "polymer_affinity"
[docs] def __init__( self, config: "ComparisonConfig", analysis_settings: PolymerAffinityScoreSettings, comparison_settings: PolymerAffinityScoreComparisonSettings | None = None, equilibration: str | None = None, ): super().__init__(config, analysis_settings, equilibration) self.comparison_settings = comparison_settings or PolymerAffinityScoreComparisonSettings()
[docs] @classmethod def comparison_type_name(cls) -> str: """Return the comparison type identifier.""" return "polymer_affinity"
@property def metric_type(self) -> MetricType: """Contact fractions and shares are mean-based metrics. Returns ------- MetricType MetricType.MEAN_BASED """ return MetricType.MEAN_BASED # ========================================================================= # Main entry point — override compare() for custom result type # =========================================================================
[docs] def compare(self, recompute: bool = False) -> PolymerAffinityScoreResult: # type: ignore[override] """Run polymer affinity score comparison across all conditions. Parameters ---------- recompute : bool, optional Ignored (affinity scores are always recomputed from cached binding preference data; the computation is fast and stateless). Returns ------- PolymerAffinityScoreResult Complete polymer affinity score comparison result. """ logger.info(f"Starting polymer affinity score comparison: {self.config.name}") logger.info(f"Conditions: {len(self.config.conditions)}") # Step 1: Load data and build summaries for each condition condition_summaries: list[AffinityScoreConditionSummary] = [] for cond in self.config.conditions: data = self._load_or_compute(cond, recompute) summary = self._build_condition_summary(cond, data) condition_summaries.append(summary) if not condition_summaries: raise ValueError("No binding preference data found for any condition.") # Step 2: Collect metadata all_polymer_types: list[str] = sorted( {e.polymer_type for s in condition_summaries for e in s.entries} ) all_protein_groups: list[str] = sorted( {e.protein_group for s in condition_summaries for e in s.entries} ) # Step 3: Temperature groups temp_groups: dict[float, list[str]] = {} for s in condition_summaries: temp_groups.setdefault(s.temperature_K, []).append(s.label) mixed_temperatures = len(temp_groups) > 1 if mixed_temperatures: logger.warning( f"Conditions span {len(temp_groups)} temperatures: " + ", ".join(f"{t}K ({len(labels)} conds)" for t, labels in temp_groups.items()) + ". Cross-temperature pairwise statistics will be suppressed." ) # Step 4: Pairwise comparisons on total scores pairwise = self._compute_pairwise(condition_summaries) # Stringify temp_groups keys for JSON compatibility temp_groups_str = {str(k): v for k, v in temp_groups.items()} # Step 5: Build result surface_threshold = self.analysis_settings.surface_exposure_threshold return PolymerAffinityScoreResult( name=self.config.name, mixed_temperatures=mixed_temperatures, temperature_groups=temp_groups_str, conditions=condition_summaries, pairwise_comparisons=pairwise, polymer_types=all_polymer_types, protein_groups=all_protein_groups, surface_exposure_threshold=surface_threshold, equilibration_time=self.equilibration, created_at=datetime.now(), polyzymd_version=__version__, )
# ========================================================================= # Abstract method implementations # ========================================================================= def _load_or_compute( self, cond: "ConditionConfig", recompute: bool, ) -> PAConditionData: """Load cached binding preference data, computing it if missing. Follows the same load-or-compute contract as every other comparator: 1. Try to load cached binding preference results from disk. 2. If no cached data exists and ``compute_binding_preference`` is True (the default), compute binding preference from per-replicate ``contacts_rep{N}.json`` files via the shared helper. 3. If compute is disabled, warn and return empty data. Parameters ---------- cond : ConditionConfig Condition to load or compute. recompute : bool If True, skip cache and always recompute. Returns ------- dict Raw binding preference data and temperature. """ from polyzymd.compare.comparators._binding_preference_helpers import ( compute_condition_binding_preference, resolve_enzyme_pdb, ) from polyzymd.config.schema import SimulationConfig logger.info(f"Loading binding preference for: {cond.label}") sim_config = SimulationConfig.from_yaml(cond.config) # Get temperature temperature_K = float(sim_config.thermodynamics.temperature) # Resolve condition-specific output directory (None in standalone mode) condition_output_dir = self._resolve_condition_output_dir(cond.label, "contacts") # Find analysis directory (contacts layer) — check condition dir first analysis_dir = self._find_contacts_analysis_dir( sim_config, cond, condition_output_dir=condition_output_dir ) # Step 1: Try to load cached binding preference (unless recompute) bp_result = None if not recompute: bp_result = self._try_load_cached_binding_preference(cond, analysis_dir) if bp_result is not None: logger.info(f" Loaded binding preference for {cond.label} at {temperature_K} K") return { "bp_result": bp_result, "temperature_K": temperature_K, "cond_label": cond.label, "config_path": str(cond.config), "analysis_dir": analysis_dir, "cond": cond, } # Step 2: Compute if enabled compute_enabled = getattr(self.analysis_settings, "compute_binding_preference", True) if compute_enabled: logger.info(f" No cached data for {cond.label}, computing binding preference...") # Resolve settings, falling back to contacts settings if needed settings = self._resolve_compute_settings() enzyme_pdb = resolve_enzyme_pdb( enzyme_pdb_setting=settings["enzyme_pdb_for_sasa"], source_path=self.config.source_path, sim_config=sim_config, ) if enzyme_pdb is None or not enzyme_pdb.exists(): logger.warning( f"Cannot compute binding preference for '{cond.label}': " f"enzyme PDB not found. Set enzyme_pdb_for_sasa in " f"polymer_affinity or contacts analysis settings." ) else: bp_result = compute_condition_binding_preference( cond=cond, sim_config=sim_config, analysis_dir=analysis_dir, enzyme_pdb=enzyme_pdb, threshold=settings["surface_exposure_threshold"], include_default_aa_groups=settings["include_default_aa_groups"], custom_protein_groups=settings["protein_groups"], protein_partitions=settings["protein_partitions"], polymer_type_selections=settings["polymer_type_selections"], ) if bp_result is not None: logger.info(f" Computed binding preference for {cond.label} at {temperature_K} K") return { "bp_result": bp_result, "temperature_K": temperature_K, "cond_label": cond.label, "config_path": str(cond.config), "analysis_dir": analysis_dir, "cond": cond, } else: logger.warning( f"Failed to compute binding preference for '{cond.label}'. " f"Ensure contacts_rep{{N}}.json files exist in {analysis_dir}." ) else: logger.warning( f"No binding preference data found for '{cond.label}'. " f"Set compute_binding_preference=true or run contacts analysis " f"with binding preference enabled first." ) return { "bp_result": None, "temperature_K": temperature_K, "cond_label": cond.label, "config_path": str(cond.config), "analysis_dir": None, "cond": cond, } def _build_condition_summary( self, cond: "ConditionConfig", data: PAConditionData, ) -> AffinityScoreConditionSummary: """Build affinity score entries for all (polymer_type, protein_group) pairs. Data flow: 1. Use ``bp_result.entries`` (``AggregatedBindingPreferenceEntry`` list) which has ``mean_contact_fraction`` and ``n_exposed_in_group`` 2. Also try to load per-replicate files for per-replicate N×ΔG_sel 3. Aggregate into per-polymer-type scores and total condition score Parameters ---------- cond : ConditionConfig Condition configuration. data : dict Raw data from ``_load_or_compute``. Returns ------- AffinityScoreConditionSummary Summary with entries, polymer type scores, and total score. """ temperature_K = data["temperature_K"] bp_result = data["bp_result"] if bp_result is None: return AffinityScoreConditionSummary( label=cond.label, config_path=str(cond.config), temperature_K=temperature_K, n_replicates=0, entries=[], polymer_types=[], protein_groups=[], ) # Try to load per-replicate data for per-replicate score computation per_rep_data = self._load_per_replicate_entries(data) # Compute affinity score entries from the aggregated result entries = self._compute_affinity_entries(bp_result, temperature_K, per_rep_data) # Aggregate into per-polymer-type scores polymer_type_scores = self._aggregate_polymer_type_scores(entries) # Compute total condition score total_score = sum(pts.total_score for pts in polymer_type_scores) total_n_contacts = sum(pts.total_n_contacts for pts in polymer_type_scores) # Total per-replicate scores (sum across all polymer types) total_per_rep = self._compute_total_per_replicate_scores(polymer_type_scores) total_unc: float | None = None if len(total_per_rep) >= 2: total_unc = float(np.std(total_per_rep, ddof=1) / np.sqrt(len(total_per_rep))) polymer_types = sorted({e.polymer_type for e in entries}) protein_groups = sorted({e.protein_group for e in entries}) n_replicates = max((e.n_replicates for e in entries), default=0) return AffinityScoreConditionSummary( label=cond.label, config_path=str(cond.config), temperature_K=temperature_K, n_replicates=n_replicates, total_score=total_score, total_score_uncertainty=total_unc, total_score_per_replicate=total_per_rep, total_n_contacts=total_n_contacts, polymer_type_scores=polymer_type_scores, entries=entries, polymer_types=polymer_types, protein_groups=protein_groups, ) def _get_replicate_values(self, summary: AffinityScoreConditionSummary) -> list[float]: """Return per-replicate total scores for this condition.""" return summary.total_score_per_replicate or [summary.total_score] def _get_mean_value(self, summary: AffinityScoreConditionSummary) -> float: """Return total affinity score.""" return summary.primary_metric_value @property def _direction_labels(self) -> tuple[str, str, str]: """More negative = stronger affinity.""" return ("stronger affinity", "unchanged", "weaker affinity") def _rank_summaries( self, summaries: list[AffinityScoreConditionSummary] ) -> list[AffinityScoreConditionSummary]: """Sort by total score (most negative = strongest affinity first).""" return sorted(summaries, key=lambda s: s.primary_metric_value) # ========================================================================= # Affinity score computation # ========================================================================= def _compute_affinity_entries( self, bp_result: Any, temperature_K: float, per_rep_data: dict[int, list[Any]] | None, ) -> list[AffinityScoreEntry]: """Compute affinity score entries from binding preference data. Uses ``AggregatedBindingPreferenceEntry`` objects from ``bp_result.entries`` which have the ``mean_contact_fraction`` and ``n_exposed_in_group`` fields needed for N_contacts. Parameters ---------- bp_result : AggregatedBindingPreferenceResult | BindingPreferenceResult Binding preference data for one condition. temperature_K : float Simulation temperature in Kelvin. per_rep_data : dict or None Mapping of replicate number to list of per-replicate ``BindingPreferenceEntry`` objects, if loaded. Returns ------- list[AffinityScoreEntry] One entry per (polymer_type, protein_group) pair. """ from polyzymd.analysis.contacts.binding_preference import ( AggregatedBindingPreferenceResult, ) if isinstance(bp_result, AggregatedBindingPreferenceResult): return self._entries_from_aggregated(bp_result, temperature_K, per_rep_data) else: return self._entries_from_single(bp_result, temperature_K) def _entries_from_aggregated( self, result: "AggregatedBindingPreferenceResult", temperature_K: float, per_rep_data: dict[int, list[Any]] | None, ) -> list[AffinityScoreEntry]: """Compute affinity entries from aggregated binding preference. Uses the group-level ``entries`` list which has ``mean_contact_fraction`` and ``n_exposed_in_group``. For per-replicate scores: - If per-replicate files were loaded, compute exact N_rep × ΔG_sel,rep - Otherwise, use mean N with per_replicate_enrichments for ΔG_sel,rep Parameters ---------- result : AggregatedBindingPreferenceResult Multi-replicate aggregated result. temperature_K : float Simulation temperature. per_rep_data : dict or None Per-replicate BindingPreferenceEntry objects by replicate number. Returns ------- list[AffinityScoreEntry] Affinity score entries. """ entries: list[AffinityScoreEntry] = [] for agg_entry in result.entries: entry = self._entry_from_agg_bp_entry(agg_entry, temperature_K, per_rep_data) if entry is not None: entries.append(entry) return entries def _entry_from_agg_bp_entry( self, agg_entry: "AggregatedBindingPreferenceEntry", temperature_K: float, per_rep_data: dict[int, list[Any]] | None, ) -> AffinityScoreEntry | None: """Convert one AggregatedBindingPreferenceEntry to an AffinityScoreEntry. Parameters ---------- agg_entry : AggregatedBindingPreferenceEntry Aggregated binding preference entry for one (polymer_type, protein_group) pair. temperature_K : float Simulation temperature. per_rep_data : dict or None Per-replicate data if available. Returns ------- AffinityScoreEntry or None Affinity score entry, or None if data is insufficient. """ mcf = agg_entry.mean_contact_fraction n_exposed = agg_entry.n_exposed_in_group cs = agg_entry.mean_contact_share es = agg_entry.expected_share polymer_type = agg_entry.polymer_type protein_group = agg_entry.protein_group # N_contacts = mean_contact_fraction × n_exposed_in_group n_contacts = mcf * n_exposed # ΔG_sel per contact = -ln(contact_share / expected_share) [kT] delta_g: float | None = None if cs > 0 and es > 0: delta_g = -math.log(cs / es) # Affinity score = N × ΔG_sel score: float | None = None if delta_g is not None: score = n_contacts * delta_g # Per-replicate scores score_per_rep: list[float] = [] if per_rep_data is not None: # Use exact per-replicate N_rep × ΔG_sel,rep score_per_rep = self._per_rep_scores_from_files( per_rep_data, polymer_type, protein_group ) elif agg_entry.per_replicate_enrichments: # Approximate: use mean N with per-replicate ΔG_sel for enrichment_rep in agg_entry.per_replicate_enrichments: ratio_rep = enrichment_rep + 1.0 if ratio_rep > 0: dg_rep = -math.log(ratio_rep) score_per_rep.append(n_contacts * dg_rep) else: score_per_rep.append(float("nan")) # Clean NaN values valid_reps = [v for v in score_per_rep if not math.isnan(v)] # Uncertainty score_unc: float | None = None if len(valid_reps) >= 2: score_unc = float(np.std(valid_reps, ddof=1) / np.sqrt(len(valid_reps))) elif score is not None and agg_entry.sem_contact_fraction is not None: # Analytical error propagation: σ(S) = √[(N·σ_ΔG_sel)² + (ΔG_sel·σ_N)²] # σ_N = sem_contact_fraction × n_exposed # σ_ΔG_sel = sem_contact_share / contact_share (relative error on ratio) sem_cs = getattr(agg_entry, "sem_contact_share", None) if delta_g is not None and sem_cs and cs > 0: sigma_n = agg_entry.sem_contact_fraction * n_exposed sigma_dg = sem_cs / cs # σ(ln(ratio)) ≈ σ_cs/cs score_unc = math.sqrt((n_contacts * sigma_dg) ** 2 + (delta_g * sigma_n) ** 2) return AffinityScoreEntry( polymer_type=polymer_type, protein_group=protein_group, partition_name="aa_class", n_contacts=n_contacts, delta_G_per_contact=delta_g, affinity_score=score, affinity_score_uncertainty=score_unc, affinity_score_per_replicate=valid_reps, mean_contact_fraction=mcf, n_exposed_in_group=n_exposed, contact_share=cs, expected_share=es, temperature_K=temperature_K, n_replicates=len(valid_reps) if valid_reps else agg_entry.n_replicates, ) def _entries_from_single( self, result: "BindingPreferenceResult", temperature_K: float, ) -> list[AffinityScoreEntry]: """Compute affinity entries from a single-replicate result. Parameters ---------- result : BindingPreferenceResult Single-replicate binding preference result. temperature_K : float Simulation temperature. Returns ------- list[AffinityScoreEntry] Affinity score entries. """ entries: list[AffinityScoreEntry] = [] for bp_entry in result.entries: mcf = bp_entry.mean_contact_fraction n_exposed = bp_entry.n_exposed_in_group cs = bp_entry.contact_share es = bp_entry.expected_share n_contacts = mcf * n_exposed delta_g: float | None = None if cs > 0 and es > 0: delta_g = -math.log(cs / es) score: float | None = None if delta_g is not None: score = n_contacts * delta_g entries.append( AffinityScoreEntry( polymer_type=bp_entry.polymer_type, protein_group=bp_entry.protein_group, partition_name="aa_class", n_contacts=n_contacts, delta_G_per_contact=delta_g, affinity_score=score, affinity_score_uncertainty=None, affinity_score_per_replicate=[score] if score is not None else [], mean_contact_fraction=mcf, n_exposed_in_group=n_exposed, contact_share=cs, expected_share=es, temperature_K=temperature_K, n_replicates=1, ) ) return entries # ========================================================================= # Per-replicate data loading # ========================================================================= def _load_per_replicate_entries( self, data: PAConditionData, ) -> dict[int, list[Any]] | None: """Load per-replicate BindingPreferenceEntry objects. Attempts to load ``binding_preference_rep{N}.json`` files from the analysis directory. Returns None if files are not available. Parameters ---------- data : dict Condition data from ``_load_or_compute``. Returns ------- dict[int, list] or None Mapping of replicate number to list of ``BindingPreferenceEntry`` objects, or None if files unavailable. """ from polyzymd.analysis.contacts.binding_preference import ( BindingPreferenceResult, ) analysis_dir = data.get("analysis_dir") cond = data.get("cond") if analysis_dir is None or cond is None: return None analysis_dir = Path(analysis_dir) per_rep: dict[int, list[Any]] = {} for rep in cond.replicates: rep_path = analysis_dir / f"binding_preference_rep{rep}.json" if rep_path.exists(): try: rep_result = BindingPreferenceResult.load(rep_path) per_rep[rep] = rep_result.entries except Exception as exc: logger.debug(f"Failed to load per-replicate BP for rep{rep}: {exc}") return per_rep if per_rep else None def _per_rep_scores_from_files( self, per_rep_data: dict[int, list[Any]], polymer_type: str, protein_group: str, ) -> list[float]: """Compute per-replicate N×ΔG_sel from loaded per-replicate entries. Parameters ---------- per_rep_data : dict Replicate number → list of BindingPreferenceEntry. polymer_type : str Polymer type to match. protein_group : str Protein group to match. Returns ------- list[float] Per-replicate affinity scores. """ scores: list[float] = [] for _rep_num, rep_entries in sorted(per_rep_data.items()): matched = False for entry in rep_entries: if entry.polymer_type == polymer_type and entry.protein_group == protein_group: mcf = entry.mean_contact_fraction n_exposed = entry.n_exposed_in_group n_rep = mcf * n_exposed cs = entry.contact_share es = entry.expected_share if cs > 0 and es > 0: dg_rep = -math.log(cs / es) scores.append(n_rep * dg_rep) else: scores.append(0.0) matched = True break if not matched: # This (polymer_type, group) pair doesn't exist in this replicate scores.append(0.0) return scores # ========================================================================= # Aggregation helpers # ========================================================================= def _aggregate_polymer_type_scores( self, entries: list[AffinityScoreEntry], ) -> list[PolymerTypeScore]: """Group entries by polymer type and compute per-type totals. Parameters ---------- entries : list[AffinityScoreEntry] All (polymer_type, protein_group) entries for one condition. Returns ------- list[PolymerTypeScore] One score per polymer type, sorted by total score ascending. """ # Group entries by polymer type by_polymer: dict[str, list[AffinityScoreEntry]] = {} for e in entries: by_polymer.setdefault(e.polymer_type, []).append(e) scores: list[PolymerTypeScore] = [] for polymer_type, group_entries in sorted(by_polymer.items()): total = sum(e.affinity_score for e in group_entries if e.affinity_score is not None) total_n = sum(e.n_contacts for e in group_entries) # Per-replicate total scores for this polymer type per_rep_totals = self._sum_per_replicate_across_groups(group_entries) total_unc: float | None = None if len(per_rep_totals) >= 2: total_unc = float(np.std(per_rep_totals, ddof=1) / np.sqrt(len(per_rep_totals))) scores.append( PolymerTypeScore( polymer_type=polymer_type, total_score=total, total_score_uncertainty=total_unc, total_score_per_replicate=per_rep_totals, total_n_contacts=total_n, group_contributions=group_entries, ) ) return sorted(scores, key=lambda s: s.total_score) def _sum_per_replicate_across_groups( self, entries: list[AffinityScoreEntry], ) -> list[float]: """Sum per-replicate scores across protein groups for one polymer type. Parameters ---------- entries : list[AffinityScoreEntry] Entries for a single polymer type. Returns ------- list[float] Per-replicate total scores (sum across groups). Empty if any entry lacks per-replicate data. """ if not entries: return [] # Check all entries have the same number of replicates rep_counts = [len(e.affinity_score_per_replicate) for e in entries] if not rep_counts or min(rep_counts) == 0: return [] n_reps = min(rep_counts) totals: list[float] = [] for rep_idx in range(n_reps): rep_total = sum( e.affinity_score_per_replicate[rep_idx] for e in entries if rep_idx < len(e.affinity_score_per_replicate) ) totals.append(rep_total) return totals def _compute_total_per_replicate_scores( self, polymer_type_scores: list[PolymerTypeScore], ) -> list[float]: """Sum per-replicate scores across all polymer types. Parameters ---------- polymer_type_scores : list[PolymerTypeScore] Per-polymer-type scores. Returns ------- list[float] Per-replicate grand total scores. """ if not polymer_type_scores: return [] rep_counts = [len(pts.total_score_per_replicate) for pts in polymer_type_scores] if not rep_counts or min(rep_counts) == 0: return [] n_reps = min(rep_counts) totals: list[float] = [] for rep_idx in range(n_reps): rep_total = sum( pts.total_score_per_replicate[rep_idx] for pts in polymer_type_scores if rep_idx < len(pts.total_score_per_replicate) ) totals.append(rep_total) return totals # ========================================================================= # Pairwise comparison # ========================================================================= def _compute_pairwise( self, summaries: list[AffinityScoreConditionSummary], ) -> list[AffinityScorePairwiseEntry]: """Compute pairwise total score comparisons with temperature grouping. Unlike the BFE comparator which compares per-(polymer, group) pairs, this comparator compares total scores between conditions. This is the natural unit for the polymer affinity score since it sums across all interactions. Parameters ---------- summaries : list[AffinityScoreConditionSummary] Condition summaries. Returns ------- list[AffinityScorePairwiseEntry] All pairwise entries (cross-temperature ones have stats suppressed). """ control = self.config.control comparisons: list[AffinityScorePairwiseEntry] = [] label_to_summary = {s.label: s for s in summaries} labels = [s.label for s in summaries] # Determine whether the control has usable data control_has_data = ( control is not None and control in label_to_summary and len(label_to_summary[control].entries) > 0 ) if control_has_data: summary_a = label_to_summary[control] # type: ignore[index] for label_b in labels: if label_b == control: continue summary_b = label_to_summary[label_b] if not summary_b.entries: continue pw = self._compare_total_scores(summary_a, summary_b) comparisons.append(pw) else: # Fall back to all-pairs among conditions with data if control is not None and control in label_to_summary: logger.info( f"Control '{control}' has no affinity data (e.g. no polymer " "contacts). Falling back to all-pairs comparison." ) valid_labels = [lbl for lbl in labels if label_to_summary[lbl].entries] for i in range(len(valid_labels)): for j in range(i + 1, len(valid_labels)): sa = label_to_summary[valid_labels[i]] sb = label_to_summary[valid_labels[j]] comparisons.append(self._compare_total_scores(sa, sb)) return comparisons def _compare_total_scores( self, summary_a: AffinityScoreConditionSummary, summary_b: AffinityScoreConditionSummary, ) -> AffinityScorePairwiseEntry: """Compare total affinity scores between two conditions. Parameters ---------- summary_a : AffinityScoreConditionSummary First condition (typically control/reference). summary_b : AffinityScoreConditionSummary Second condition. Returns ------- AffinityScorePairwiseEntry Pairwise comparison entry. """ cross_temperature = not math.isclose( summary_a.temperature_K, summary_b.temperature_K, rel_tol=1e-3 ) score_a = summary_a.total_score score_b = summary_b.total_score delta = score_b - score_a # Stats: only for same-temperature conditions with enough replicates t_stat: float | None = None p_val: float | None = None if not cross_temperature: reps_a = [v for v in summary_a.total_score_per_replicate if not math.isnan(v)] reps_b = [v for v in summary_b.total_score_per_replicate if not math.isnan(v)] if len(reps_a) >= 2 and len(reps_b) >= 2: try: ttest = independent_ttest(reps_a, reps_b) t_stat = ttest.t_statistic p_val = ttest.p_value except Exception as exc: logger.debug(f"T-test failed for {summary_a.label} vs {summary_b.label}: {exc}") return AffinityScorePairwiseEntry( condition_a=summary_a.label, condition_b=summary_b.label, temperature_a_K=summary_a.temperature_K, temperature_b_K=summary_b.temperature_K, cross_temperature=cross_temperature, score_a=score_a, score_b=score_b, delta_score=delta, t_statistic=t_stat, p_value=p_val, ) # ========================================================================= # Settings resolution (affinity settings with fallback to contacts) # ========================================================================= def _resolve_compute_settings(self) -> dict[str, Any]: """Resolve compute settings, falling back to contacts analysis settings. Affinity settings may omit fields like ``enzyme_pdb_for_sasa`` that are typically configured in the contacts analysis section. This method returns a unified dict, preferring affinity settings and falling back to contacts settings from the same comparison.yaml. Returns ------- dict Resolved settings for ``compute_condition_binding_preference()``. """ pa = self.analysis_settings # Try to get contacts settings for fallback contacts_settings = None if hasattr(self.config, "analysis_settings"): contacts_settings = self.config.analysis_settings.get("contacts") def _get(attr: str, default: Any = None) -> Any: """Get from affinity settings first, then contacts, then default.""" val = getattr(pa, attr, None) if val is not None: return val if contacts_settings is not None: val = getattr(contacts_settings, attr, None) if val is not None: return val return default return { "enzyme_pdb_for_sasa": _get("enzyme_pdb_for_sasa"), "surface_exposure_threshold": _get("surface_exposure_threshold", 0.2), "include_default_aa_groups": _get("include_default_aa_groups", True), "protein_groups": _get("protein_groups"), "protein_partitions": _get("protein_partitions"), "polymer_type_selections": _get("polymer_type_selections"), } # ========================================================================= # Cache loading helpers (mirrors BFE comparator pattern) # ========================================================================= def _find_contacts_analysis_dir( self, sim_config: Any, cond: "ConditionConfig", condition_output_dir: Path | None = None, ) -> Path: """Find the contacts analysis directory for a condition. Checks locations in order: 1. condition_output_dir (condition-specific, comparison mode) 2. sim_config.output.projects_directory / analysis / contacts / 3. cond.config parent directory / analysis / contacts / Parameters ---------- sim_config : SimulationConfig Simulation configuration. cond : ConditionConfig Condition configuration. condition_output_dir : Path, optional Condition-specific contacts output directory (from comparison mode). Checked first before falling back. Returns ------- Path Analysis directory path. """ # Check condition-specific path first (comparison mode) if condition_output_dir is not None: if condition_output_dir.exists(): return condition_output_dir # In comparison mode, do NOT fall back to the shared # projects_directory — all conditions share the same path. return condition_output_dir from polyzymd.compare.comparators._utils import find_analysis_dir return find_analysis_dir( sim_config, analysis_subdir="analysis/contacts", cond_config_path=Path(cond.config), ) def _try_load_cached_binding_preference( self, cond: "ConditionConfig", analysis_dir: Path, ) -> "AggregatedBindingPreferenceResult | BindingPreferenceResult | None": """Load cached binding preference results for a condition. Delegates to the shared helper in ``_binding_preference_helpers``. Parameters ---------- cond : ConditionConfig Condition configuration. analysis_dir : Path Contacts analysis directory. Returns ------- AggregatedBindingPreferenceResult | BindingPreferenceResult | None Loaded result, or None if not found. """ from polyzymd.compare.comparators._binding_preference_helpers import ( try_load_cached_binding_preference, ) return try_load_cached_binding_preference(cond, analysis_dir)