"""File-backed storage for Beaver skill lifecycle artifacts.""" from __future__ import annotations from dataclasses import dataclass import json import os from pathlib import Path import shutil from typing import Any from beaver.plugins.hashing import hash_plugin_skill_tree from beaver.plugins.transaction import PluginSkillTransaction from beaver.skills.catalog.utils import parse_frontmatter from .models import SkillDraft, SkillReviewRecord, SkillSpec, SkillUpstreamSnapshot, SkillVersion from .serialization import canonical_hash, json_dumps, normalize_frontmatter, summarize_skill_content @dataclass(slots=True) class LoadedSkillVersion: version: SkillVersion content: str @dataclass(slots=True) class LoadedSkillUpstreamSnapshot: snapshot: SkillUpstreamSnapshot content: str root: Path class SkillSpecStore: """Manage structured skill lifecycle state inside the workspace.""" def __init__(self, workspace: str | Path) -> None: self.workspace = Path(workspace) self.root = self.workspace / "skills" self.index_dir = self.root / "_index" self.root.mkdir(parents=True, exist_ok=True) self.index_dir.mkdir(parents=True, exist_ok=True) def list_published_skill_names(self) -> list[str]: names: list[str] = [] for child in self._iter_skill_dirs(): if not self._has_published_representation(child): continue spec = self.get_skill_spec(child.name) if spec is not None and spec.status != "active": continue names.append(child.name) return names def list_skill_specs(self) -> list[SkillSpec]: specs: list[SkillSpec] = [] for name in self.list_skill_names(): spec = self.get_skill_spec(name) if spec is not None: specs.append(spec) return specs def list_skill_names(self) -> list[str]: return [child.name for child in self._iter_skill_dirs()] def get_skill_spec(self, name: str) -> SkillSpec | None: directory = self._skill_dir(name) path = directory / "skill.json" if path.exists(): return SkillSpec.from_dict(self._read_json(path)) if not self._has_published_representation(directory): return None legacy = self.read_published_skill(name) if legacy is None: return None return SkillSpec( name=name, display_name=name, description=str(legacy.version.frontmatter.get("description") or name), created_at=legacy.version.created_at, updated_at=legacy.version.created_at, current_version=legacy.version.version, status="active", tags=[], owners=[], source_kind="legacy", lineage=[], ) def write_skill_spec(self, spec: SkillSpec) -> None: directory = self._skill_dir(spec.name) directory.mkdir(parents=True, exist_ok=True) self._write_json(directory / "skill.json", spec.to_dict()) def get_current_version(self, name: str) -> str | None: directory = self._skill_dir(name) current_path = directory / "current.json" if current_path.exists(): return str(self._read_json(current_path).get("current_version") or "") or None if (directory / "SKILL.md").exists(): return "legacy" versions_dir = directory / "versions" if versions_dir.exists(): versions = [child.name for child in sorted(versions_dir.iterdir()) if child.is_dir()] if versions: return versions[-1] spec = self.get_skill_spec(name) if spec is not None and spec.current_version: return spec.current_version return None def set_current_version(self, name: str, version: str) -> None: directory = self._skill_dir(name) directory.mkdir(parents=True, exist_ok=True) self._write_json(directory / "current.json", {"current_version": version}) spec = self.get_skill_spec(name) if spec is not None: spec.current_version = version self.write_skill_spec(spec) def list_versions(self, name: str) -> list[str]: directory = self._skill_dir(name) / "versions" if not directory.exists(): current = self.get_current_version(name) return [current] if current else [] versions: list[str] = [] for child in sorted(directory.iterdir()): if child.is_dir(): versions.append(child.name) return versions def read_published_skill(self, name: str, version: str | None = None) -> LoadedSkillVersion | None: requested_version = version or self.get_current_version(name) if requested_version is None: return None directory = self._skill_dir(name) if requested_version == "legacy": skill_file = directory / "SKILL.md" if not skill_file.exists(): return None content = skill_file.read_text(encoding="utf-8") frontmatter, body = parse_frontmatter(content) normalized_frontmatter = normalize_frontmatter(frontmatter) tool_hints = self._extract_tool_hints(normalized_frontmatter) loaded = SkillVersion( skill_name=name, version="legacy", content_hash=canonical_hash(content), summary_hash=canonical_hash(body), created_at="legacy", created_by="legacy", change_reason="legacy_import", review_state="published", frontmatter=normalized_frontmatter, summary=summarize_skill_content(body), tool_hints=tool_hints, provenance={"source_kind": "legacy"}, ) return LoadedSkillVersion(version=loaded, content=content) version_dir = directory / "versions" / requested_version version_file = version_dir / "version.json" skill_file = version_dir / "SKILL.md" if not version_file.exists() or not skill_file.exists(): return None payload = self._read_json(version_file) loaded = SkillVersion.from_dict(payload) content = skill_file.read_text(encoding="utf-8") if not loaded.tree_hash: loaded.tree_hash = hash_plugin_skill_tree(version_dir).skill_tree_hash return LoadedSkillVersion(version=loaded, content=content) def write_skill_version(self, version: SkillVersion, content: str) -> None: version_dir = self._skill_dir(version.skill_name) / "versions" / version.version version_dir.mkdir(parents=True, exist_ok=True) self._write_text(version_dir / "SKILL.md", content) version.tree_hash = hash_plugin_skill_tree(version_dir).skill_tree_hash self._write_json(version_dir / "version.json", version.to_dict()) def stage_upstream_snapshot( self, transaction: PluginSkillTransaction, *, skill_name: str, source_kind: str, source_id: str, source_version: str, source_path: str, source_root: str | Path, ) -> SkillUpstreamSnapshot: source = Path(source_root) digest = hash_plugin_skill_tree(source) staged_root = transaction.stage_upstream_snapshot(skill_name, source_id, digest.skill_tree_hash) self._copy_regular_tree(source, staged_root) content = (staged_root / "SKILL.md").read_text(encoding="utf-8") frontmatter, _body = parse_frontmatter(content) snapshot = SkillUpstreamSnapshot( skill_name=skill_name, source_kind=source_kind, source_id=source_id, source_version=source_version, source_path=source_path, skill_content_hash=digest.skill_content_hash, skill_tree_hash=digest.skill_tree_hash, created_at=_utc_now(), frontmatter=normalize_frontmatter(frontmatter), staged_root=staged_root, ) self._write_json(staged_root / "upstream.json", snapshot.to_dict()) return snapshot def promote_upstream_snapshot( self, transaction: PluginSkillTransaction, snapshot: SkillUpstreamSnapshot, ) -> None: staged_root = Path(snapshot.staged_root) if snapshot.staged_root is not None else None final_root = self._upstream_snapshot_dir(snapshot.skill_name, snapshot.source_id, snapshot.skill_tree_hash) if final_root.exists(): return if staged_root is None or not staged_root.exists(): raise ValueError("Staged upstream snapshot is missing") transaction.promote_directory(staged_root, final_root) def read_upstream_snapshot( self, skill_name: str, source_id: str, skill_tree_hash: str, ) -> LoadedSkillUpstreamSnapshot | None: root = self._upstream_snapshot_dir(skill_name, source_id, skill_tree_hash) metadata = root / "upstream.json" skill_file = root / "SKILL.md" if not metadata.exists() or not skill_file.exists(): return None snapshot = SkillUpstreamSnapshot.from_dict(self._read_json(metadata)) return LoadedSkillUpstreamSnapshot( snapshot=snapshot, content=skill_file.read_text(encoding="utf-8"), root=root, ) def list_drafts(self, skill_name: str | None = None) -> list[SkillDraft]: results: list[SkillDraft] = [] names = [skill_name] if skill_name else self.list_skill_names() for name in names: if not name: continue drafts_dir = self._skill_dir(name) / "drafts" if not drafts_dir.exists(): continue for path in sorted(drafts_dir.glob("draft-*.json")): results.append(SkillDraft.from_dict(self._read_json(path))) return results def read_draft(self, skill_name: str, draft_id: str) -> SkillDraft | None: path = self._skill_dir(skill_name) / "drafts" / f"draft-{draft_id}.json" if not path.exists(): return None return SkillDraft.from_dict(self._read_json(path)) def write_draft(self, draft: SkillDraft) -> None: drafts_dir = self._skill_dir(draft.skill_name) / "drafts" drafts_dir.mkdir(parents=True, exist_ok=True) self._write_json(drafts_dir / f"draft-{draft.draft_id}.json", draft.to_dict()) def delete_draft(self, skill_name: str, draft_id: str) -> bool: path = self._skill_dir(skill_name) / "drafts" / f"draft-{draft_id}.json" if not path.exists(): return False path.unlink() return True def list_reviews(self, skill_name: str, draft_id: str | None = None) -> list[SkillReviewRecord]: reviews_dir = self._skill_dir(skill_name) / "reviews" if not reviews_dir.exists(): return [] results: list[SkillReviewRecord] = [] for path in sorted(reviews_dir.glob("review-*.json")): record = SkillReviewRecord.from_dict(self._read_json(path)) if draft_id and record.draft_id != draft_id: continue results.append(record) return results def write_review(self, review: SkillReviewRecord) -> None: reviews_dir = self._skill_dir(review.skill_name) / "reviews" reviews_dir.mkdir(parents=True, exist_ok=True) self._write_json(reviews_dir / f"review-{review.review_id}.json", review.to_dict()) def delete_reviews_for_draft(self, skill_name: str, draft_id: str) -> int: reviews_dir = self._skill_dir(skill_name) / "reviews" if not reviews_dir.exists(): return 0 deleted = 0 for path in sorted(reviews_dir.glob("review-*.json")): record = SkillReviewRecord.from_dict(self._read_json(path)) if record.draft_id != draft_id: continue path.unlink() deleted += 1 return deleted def update_index(self, index_name: str, values: list[str]) -> None: self._write_json(self.index_dir / f"{index_name}.json", {"items": list(dict.fromkeys(values))}) def read_index(self, index_name: str) -> list[str]: path = self.index_dir / f"{index_name}.json" if not path.exists(): return [] payload = self._read_json(path) if not isinstance(payload, dict): return [] items = payload.get("items") if not isinstance(items, list): return [] return [str(item) for item in items if str(item).strip()] def archive_current_version(self, skill_name: str, version: str) -> None: version_dir = self._skill_dir(skill_name) / "versions" / version if not version_dir.exists(): return archive_dir = self._skill_dir(skill_name) / "archive" / version archive_dir.parent.mkdir(parents=True, exist_ok=True) if archive_dir.exists(): return version_dir.rename(archive_dir) def _has_published_representation(self, directory: Path) -> bool: return ( (directory / "SKILL.md").exists() or (directory / "current.json").exists() or (directory / "versions").exists() ) def _skill_dir(self, name: str) -> Path: return self.root / name def _upstream_snapshot_dir(self, skill_name: str, source_id: str, skill_tree_hash: str) -> Path: return self._skill_dir(skill_name) / "upstreams" / source_id / skill_tree_hash def _iter_skill_dirs(self) -> list[Path]: return [ child for child in sorted(self.root.iterdir()) if child.is_dir() and not child.name.startswith("_") ] @staticmethod def _extract_tool_hints(frontmatter: dict[str, Any]) -> list[str]: raw = frontmatter.get("tools") if isinstance(raw, list): return [str(item).strip() for item in raw if str(item).strip()] if isinstance(raw, str): return [item.strip() for item in raw.split(",") if item.strip()] return [] @staticmethod def _read_json(path: Path) -> dict[str, Any]: payload = json.loads(path.read_text(encoding="utf-8")) if not isinstance(payload, dict): raise ValueError(f"Expected JSON object in {path}") return payload @staticmethod def _write_json(path: Path, payload: dict[str, Any]) -> None: path.parent.mkdir(parents=True, exist_ok=True) tmp_path = path.with_name(f"{path.name}.tmp") with tmp_path.open("w", encoding="utf-8") as handle: handle.write(json_dumps(payload) + "\n") handle.flush() os.fsync(handle.fileno()) os.replace(tmp_path, path) @staticmethod def _write_text(path: Path, content: str) -> None: path.parent.mkdir(parents=True, exist_ok=True) path.write_text(content, encoding="utf-8") @staticmethod def _copy_regular_tree(source_root: Path, target_root: Path) -> None: source_root = Path(source_root) target_root = Path(target_root) for source in sorted(source_root.rglob("*"), key=lambda item: item.relative_to(source_root).as_posix()): relative = source.relative_to(source_root) if any(part in {"", ".", ".."} for part in relative.parts): raise ValueError(f"Invalid path in skill tree: {relative.as_posix()}") if source.is_symlink(): raise ValueError(f"Skill tree contains a symlink: {relative.as_posix()}") target = target_root / relative if not target.resolve().is_relative_to(target_root.resolve()): raise ValueError(f"Skill tree copy target escapes root: {relative.as_posix()}") if source.is_dir(): target.mkdir(parents=True, exist_ok=True) continue if not source.is_file(): raise ValueError(f"Skill tree contains a non-regular file: {relative.as_posix()}") target.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(source, target) def _utc_now() -> str: from datetime import datetime, timezone return datetime.now(timezone.utc).isoformat()