"""Filesystem artifact store for MDAnalysis extension-layer outputs."""
from __future__ import annotations
import hashlib
from pathlib import Path, PurePosixPath
from typing import Any
from pydantic import ValidationError
from polyzymd.analyses.mda.artifacts import (
ArtifactManifest,
ArtifactSidecarRef,
ComparisonArtifact,
ConditionArtifact,
ReplicateArtifact,
validate_artifact_relative_path,
)
from polyzymd.analyses.mda.base import MDAnalysisExtensionError
# Number of bytes (1 MiB) read per iteration when streaming a file into a
# SHA-256 digest, so hashing never loads a whole file at once.
_HASH_CHUNK_SIZE = 1024 * 1024
[docs]
class ArtifactStoreError(MDAnalysisExtensionError):
    """Signal that an artifact-store operation could not be completed.

    The store raises this for rejected paths (absolute paths or paths that
    escape the store root), missing or mismatching sidecar files, and
    failures while reading, writing, or validating stored artifacts.
    """
[docs]
class ArtifactStore:
    """Store JSON artifacts and relative sidecars under one root directory.

    Every path handed to the store is interpreted relative to ``root``;
    paths that resolve outside of it are rejected with
    ``ArtifactStoreError``. Sidecar files are described by references that
    carry a store-relative path, a SHA-256 digest, and a size in bytes.

    Attributes
    ----------
    root : Path
        Absolute, resolved directory that owns all artifacts and sidecars.
    """
[docs]
def __init__(self, root: str | Path) -> None:
"""Create an artifact store rooted at ``root``.
Parameters
----------
root : str or Path
Directory that owns artifact JSON files and sidecars.
"""
self.root = Path(root).expanduser().resolve()
[docs]
def write_replicate_result(
    self,
    artifact: ReplicateArtifact,
    path: str | Path = "result.json",
) -> Path:
    """Serialize a replicate artifact to a JSON file inside the store.

    Parameters
    ----------
    artifact : ReplicateArtifact
        Replicate artifact envelope to serialize.
    path : str or Path, optional
        Store-relative destination, by default ``"result.json"``.

    Returns
    -------
    Path
        Absolute path of the JSON file that was written.
    """
    # All JSON-writing entry points share the same private writer.
    written_path = self._write_json_model(model=artifact, path=path)
    return written_path
[docs]
def read_replicate_result(self, path: str | Path = "result.json") -> ReplicateArtifact:
    """Read a replicate artifact JSON file.

    Parameters
    ----------
    path : str or Path, optional
        Store-relative JSON path, by default ``"result.json"``.

    Returns
    -------
    ReplicateArtifact
        Deserialized replicate artifact.

    Raises
    ------
    ArtifactStoreError
        If the path is rejected by the store, the file cannot be read or
        decoded as UTF-8, or its content fails ``ReplicateArtifact``
        validation.
    """
    resolved_path = self._resolve_relative_path(path)
    try:
        # The store writes artifact JSON as UTF-8, so decode explicitly
        # instead of relying on the platform's locale encoding.
        payload = resolved_path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        raise ArtifactStoreError(
            f"Failed to read replicate artifact {resolved_path}: {exc}"
        ) from exc
    try:
        return ReplicateArtifact.model_validate_json(payload)
    except ValidationError as exc:
        raise ArtifactStoreError(
            f"Failed to validate replicate artifact {resolved_path}: {exc}"
        ) from exc
[docs]
def write_condition_result(
    self,
    artifact: ConditionArtifact,
    path: str | Path = "result.json",
) -> Path:
    """Serialize a condition artifact to a JSON file inside the store.

    Parameters
    ----------
    artifact : ConditionArtifact
        Condition artifact envelope to serialize.
    path : str or Path, optional
        Store-relative destination, by default ``"result.json"``.

    Returns
    -------
    Path
        Absolute path of the JSON file that was written.
    """
    json_path = self._write_json_model(artifact, path=path)
    return json_path
[docs]
def read_condition_result(self, path: str | Path = "result.json") -> ConditionArtifact:
    """Read a condition artifact JSON file.

    Parameters
    ----------
    path : str or Path, optional
        Store-relative JSON path, by default ``"result.json"``.

    Returns
    -------
    ConditionArtifact
        Deserialized condition artifact.

    Raises
    ------
    ArtifactStoreError
        If the path is rejected by the store, the file cannot be read or
        decoded as UTF-8, or its content fails ``ConditionArtifact``
        validation.
    """
    resolved_path = self._resolve_relative_path(path)
    try:
        # The store writes artifact JSON as UTF-8, so decode explicitly
        # instead of relying on the platform's locale encoding.
        payload = resolved_path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        # Report I/O problems as read failures so they are not mistaken
        # for schema validation errors.
        raise ArtifactStoreError(
            f"Failed to read condition artifact {resolved_path}: {exc}"
        ) from exc
    try:
        return ConditionArtifact.model_validate_json(payload)
    except ValidationError as exc:
        raise ArtifactStoreError(
            f"Failed to validate condition artifact {resolved_path}: {exc}"
        ) from exc
[docs]
def write_comparison_result(
    self,
    artifact: ComparisonArtifact,
    path: str | Path = "result.json",
) -> Path:
    """Serialize a comparison artifact to a JSON file inside the store.

    Parameters
    ----------
    artifact : ComparisonArtifact
        Comparison artifact envelope to serialize.
    path : str or Path, optional
        Store-relative destination, by default ``"result.json"``.

    Returns
    -------
    Path
        Absolute path of the JSON file that was written.
    """
    destination = self._write_json_model(model=artifact, path=path)
    return destination
[docs]
def read_comparison_result(self, path: str | Path = "result.json") -> ComparisonArtifact:
    """Read a comparison artifact JSON file.

    Parameters
    ----------
    path : str or Path, optional
        Store-relative JSON path, by default ``"result.json"``.

    Returns
    -------
    ComparisonArtifact
        Deserialized comparison artifact.

    Raises
    ------
    ArtifactStoreError
        If the path is rejected by the store, the file cannot be read or
        decoded as UTF-8, or its content fails ``ComparisonArtifact``
        validation.
    """
    resolved_path = self._resolve_relative_path(path)
    try:
        # The store writes artifact JSON as UTF-8, so decode explicitly
        # instead of relying on the platform's locale encoding.
        payload = resolved_path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        # Report I/O problems as read failures so they are not mistaken
        # for schema validation errors.
        raise ArtifactStoreError(
            f"Failed to read comparison artifact {resolved_path}: {exc}"
        ) from exc
    try:
        return ComparisonArtifact.model_validate_json(payload)
    except ValidationError as exc:
        raise ArtifactStoreError(
            f"Failed to validate comparison artifact {resolved_path}: {exc}"
        ) from exc
[docs]
def write_manifest(
    self,
    manifest: ArtifactManifest,
    path: str | Path = "manifest.json",
) -> Path:
    """Serialize an artifact manifest to a JSON file inside the store.

    Parameters
    ----------
    manifest : ArtifactManifest
        Manifest to serialize.
    path : str or Path, optional
        Store-relative destination, by default ``"manifest.json"``.

    Returns
    -------
    Path
        Absolute path of the manifest file that was written.
    """
    manifest_path = self._write_json_model(model=manifest, path=path)
    return manifest_path
[docs]
def read_manifest(self, path: str | Path = "manifest.json") -> ArtifactManifest:
    """Read an artifact manifest JSON file.

    Parameters
    ----------
    path : str or Path, optional
        Store-relative JSON path, by default ``"manifest.json"``.

    Returns
    -------
    ArtifactManifest
        Deserialized manifest.

    Raises
    ------
    ArtifactStoreError
        If the path is rejected by the store, the file cannot be read or
        decoded as UTF-8, or its content fails ``ArtifactManifest``
        validation.
    """
    resolved_path = self._resolve_relative_path(path)
    try:
        # The store writes artifact JSON as UTF-8, so decode explicitly
        # instead of relying on the platform's locale encoding.
        payload = resolved_path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        raise ArtifactStoreError(
            f"Failed to read artifact manifest {resolved_path}: {exc}"
        ) from exc
    try:
        return ArtifactManifest.model_validate_json(payload)
    except ValidationError as exc:
        # Wrap schema failures like the other readers do, keeping the
        # original ValidationError available as ``__cause__``.
        raise ArtifactStoreError(
            f"Failed to validate artifact manifest {resolved_path}: {exc}"
        ) from exc
[docs]
def register_sidecar(
    self,
    path: str | Path,
    *,
    media_type: str | None = None,
    metadata: dict[str, Any] | None = None,
) -> ArtifactSidecarRef:
    """Build a reference for a sidecar file that already exists in the store.

    Parameters
    ----------
    path : str or Path
        Store-relative sidecar path.
    media_type : str or None, optional
        Optional sidecar content type.
    metadata : dict[str, Any] or None, optional
        Sidecar metadata; a shallow copy is stored in the reference.

    Returns
    -------
    ArtifactSidecarRef
        Relative sidecar reference with streamed SHA-256 and size metadata.

    Raises
    ------
    ArtifactStoreError
        If the path is rejected by the store or no file exists there.
    """
    resolved_path = self._resolve_relative_path(path)
    if not resolved_path.is_file():
        raise ArtifactStoreError(f"Sidecar does not exist: {resolved_path}")

    file_stat = resolved_path.stat()
    relative_ref = self._relative_ref_for(resolved_path)
    digest = self._sha256_file(resolved_path)
    # Copy so the reference never aliases the caller's dictionary.
    metadata_copy = dict(metadata) if metadata else {}
    return ArtifactSidecarRef(
        path=relative_ref,
        sha256=digest,
        size_bytes=file_stat.st_size,
        media_type=media_type,
        metadata=metadata_copy,
    )
[docs]
def write_npz_sidecar(
    self,
    path: str | Path,
    *,
    compressed: bool = True,
    media_type: str | None = "application/x-npz",
    metadata: dict[str, Any] | None = None,
    **arrays: Any,
) -> ArtifactSidecarRef:
    """Write NumPy arrays to an NPZ sidecar and register it.

    Parameters
    ----------
    path : str or Path
        Store-relative sidecar path. The file is written at exactly this
        path, whether or not it ends in ``.npz``.
    compressed : bool, optional
        Whether to use ``numpy.savez_compressed``, by default True.
    media_type : str or None, optional
        Sidecar media type stored in the reference.
    metadata : dict[str, Any] or None, optional
        Additional sidecar metadata.
    **arrays : Any
        Named arrays forwarded to NumPy.

    Returns
    -------
    ArtifactSidecarRef
        Registered NPZ sidecar reference.

    Raises
    ------
    ArtifactStoreError
        If the path is rejected by the store, or creating the parent
        directory or writing the file fails with an ``OSError``.
    """
    import numpy as np

    resolved_path = self._resolve_relative_path(path)
    writer = np.savez_compressed if compressed else np.savez
    try:
        resolved_path.parent.mkdir(parents=True, exist_ok=True)
        # Hand NumPy an open handle rather than the path: given a path
        # without a ".npz" suffix, NumPy appends one, so the data would
        # land at a different location than the one registered below.
        with resolved_path.open("wb") as handle:
            writer(handle, **arrays)
    except OSError as exc:
        raise ArtifactStoreError(f"Failed to write NPZ sidecar {resolved_path}: {exc}") from exc
    return self.register_sidecar(
        resolved_path.relative_to(self.root), media_type=media_type, metadata=metadata
    )
[docs]
def resolve_sidecar(self, ref: ArtifactSidecarRef | str | Path) -> Path:
    """Turn a sidecar reference into an absolute path inside the store.

    Parameters
    ----------
    ref : ArtifactSidecarRef or str or Path
        Sidecar reference, or a bare store-relative path.

    Returns
    -------
    Path
        Absolute sidecar path under the store root.
    """
    # A reference object carries its relative path; anything else is
    # treated as the relative path itself.
    if isinstance(ref, ArtifactSidecarRef):
        relative = ref.path
    else:
        relative = ref
    return self._resolve_relative_path(relative)
[docs]
def validate_sidecar(self, ref: ArtifactSidecarRef) -> Path:
    """Check that a sidecar on disk still matches its reference.

    The checks run in order: path containment, existence, size, and
    finally SHA-256, so the file is only hashed when the size already
    matches.

    Parameters
    ----------
    ref : ArtifactSidecarRef
        Sidecar reference to validate.

    Returns
    -------
    Path
        Absolute sidecar path when validation passes.

    Raises
    ------
    ArtifactStoreError
        If the file is missing, or its size or digest differs from the
        values recorded in ``ref``.
    """
    resolved_path = self.resolve_sidecar(ref)
    if not resolved_path.is_file():
        raise ArtifactStoreError(f"Missing sidecar: {resolved_path}")

    size_bytes = resolved_path.stat().st_size
    size_matches = size_bytes == ref.size_bytes
    if not size_matches:
        raise ArtifactStoreError(
            f"Sidecar size mismatch for {ref.path}: expected {ref.size_bytes}, got {size_bytes}"
        )

    actual_sha256 = self._sha256_file(resolved_path)
    digest_matches = actual_sha256 == ref.sha256
    if not digest_matches:
        raise ArtifactStoreError(
            f"Sidecar SHA-256 mismatch for {ref.path}: expected {ref.sha256}, got {actual_sha256}"
        )

    return resolved_path
[docs]
def load_npz_sidecar(self, ref: ArtifactSidecarRef) -> Any:
    """Open an NPZ sidecar after verifying it against its reference.

    Parameters
    ----------
    ref : ArtifactSidecarRef
        Sidecar reference to validate before loading.

    Returns
    -------
    Any
        Open ``numpy.load`` handle. Callers should use it as a context
        manager to close the underlying file promptly.

    Raises
    ------
    ArtifactStoreError
        If validation fails, or ``numpy.load`` raises ``OSError`` or
        ``ValueError``.
    """
    import numpy as np

    # Validation runs outside the ``try`` so its own errors pass through
    # unchanged.
    resolved_path = self.validate_sidecar(ref)
    try:
        npz_handle = np.load(resolved_path)
    except (OSError, ValueError) as exc:
        raise ArtifactStoreError(f"Failed to load NPZ sidecar {resolved_path}: {exc}") from exc
    return npz_handle
[docs]
def source_artifact_ref(self, path: str | Path = "result.json") -> dict[str, Any]:
    """Describe an artifact JSON file by relative path, digest, and size.

    Parameters
    ----------
    path : str or Path, optional
        Store-relative artifact path, by default ``"result.json"``.

    Returns
    -------
    dict[str, Any]
        Mapping with the keys ``"path"``, ``"sha256"``, and
        ``"size_bytes"`` for the artifact.

    Raises
    ------
    ArtifactStoreError
        If the path is rejected by the store or no file exists there.
    """
    resolved_path = self._resolve_relative_path(path)
    if not resolved_path.is_file():
        raise ArtifactStoreError(f"Artifact source does not exist: {resolved_path}")

    file_stat = resolved_path.stat()
    reference: dict[str, Any] = {}
    reference["path"] = self._relative_ref_for(resolved_path)
    reference["sha256"] = self._sha256_file(resolved_path)
    reference["size_bytes"] = file_stat.st_size
    return reference
def _write_json_model(self, model: Any, path: str | Path) -> Path:
    """Write a Pydantic-like model to a store-relative JSON path.

    Parameters
    ----------
    model : Any
        Object exposing ``model_dump_json``.
    path : str or Path
        Store-relative output path.

    Returns
    -------
    Path
        Absolute output path.

    Raises
    ------
    ArtifactStoreError
        If the path is rejected by the store, or creating the parent
        directory or writing the file fails with an ``OSError``.
    """
    resolved_path = self._resolve_relative_path(path)
    # Serialize before touching the filesystem so a serialization failure
    # leaves no directories behind.
    payload = model.model_dump_json(indent=2)
    try:
        # Directory creation is part of the write, so its OSError is
        # wrapped in the same way as a failed file write.
        resolved_path.parent.mkdir(parents=True, exist_ok=True)
        resolved_path.write_text(payload, encoding="utf-8")
    except OSError as exc:
        raise ArtifactStoreError(
            f"Failed to write artifact JSON {resolved_path}: {exc}"
        ) from exc
    return resolved_path
def _resolve_relative_path(self, path: str | Path) -> Path:
    """Map a store-relative path to an absolute path inside the root.

    Parameters
    ----------
    path : str or Path
        Candidate store-relative path.

    Returns
    -------
    Path
        Absolute path under the store root.

    Raises
    ------
    ArtifactStoreError
        If the normalized path is invalid or resolves outside the root.
    """
    joined = self.root / self._normalize_relative_path(path)
    # Resolve without requiring existence so paths about to be written
    # can be checked as well.
    resolved_path = joined.resolve(strict=False)
    if resolved_path.is_relative_to(self.root):
        return resolved_path
    raise ArtifactStoreError(f"Artifact path escapes store root: {path}")
@staticmethod
def _normalize_relative_path(path: str | Path) -> Path:
    """Convert a candidate path into a validated relative filesystem path.

    Parameters
    ----------
    path : str or Path
        Candidate relative path.

    Returns
    -------
    Path
        Relative filesystem path.

    Raises
    ------
    ArtifactStoreError
        If ``path`` is an absolute ``Path`` or is rejected by
        ``validate_artifact_relative_path``.
    """
    path_is_absolute = isinstance(path, Path) and path.is_absolute()
    if path_is_absolute:
        raise ArtifactStoreError(f"Artifact paths must be relative: {path}")

    # Treat backslashes as separators so the text is parsed as POSIX.
    posix_text = str(path).replace("\\", "/")
    candidate = PurePosixPath(posix_text)
    try:
        relative = validate_artifact_relative_path(str(candidate))
    except (TypeError, ValueError) as exc:
        raise ArtifactStoreError(str(exc)) from exc
    else:
        return Path(relative)
def _relative_ref_for(self, path: Path) -> str:
    """Return a validated POSIX reference for an absolute store path.

    Parameters
    ----------
    path : Path
        Absolute path that must live under the store root.

    Returns
    -------
    str
        Store-relative POSIX reference.

    Raises
    ------
    ArtifactStoreError
        If the path resolves outside the store root or the relative
        reference is rejected by ``validate_artifact_relative_path``.
    """
    resolved_path = path.resolve(strict=False)
    if not resolved_path.is_relative_to(self.root):
        raise ArtifactStoreError(f"Artifact path escapes store root: {path}")
    relative_path = resolved_path.relative_to(self.root).as_posix()
    try:
        return validate_artifact_relative_path(relative_path)
    except (TypeError, ValueError) as exc:
        # Wrap validator errors the same way the path normalizer does, so
        # callers only need to handle ArtifactStoreError.
        raise ArtifactStoreError(str(exc)) from exc
@staticmethod
def _sha256_file(path: Path) -> str:
    """Compute a file's SHA-256 digest without loading it whole.

    Parameters
    ----------
    path : Path
        File to hash.

    Returns
    -------
    str
        Lowercase SHA-256 hexadecimal digest.
    """
    hasher = hashlib.sha256()
    with path.open("rb") as stream:
        # An empty read marks end of file and ends the loop.
        while block := stream.read(_HASH_CHUNK_SIZE):
            hasher.update(block)
    return hasher.hexdigest()