"""Canonical hashing for plugin skill trees.""" from __future__ import annotations import hashlib import os from pathlib import Path from .models import PluginSkillFileDigest, PluginSkillTreeDigest IGNORED_METADATA_FILENAMES = {"version.json", "upstream.json"} def hash_plugin_skill_tree(root: str | Path) -> PluginSkillTreeDigest: skill_root = Path(root) if not skill_root.is_dir(): raise ValueError(f"Plugin skill root is not a directory: {skill_root}") skill_file = skill_root / "SKILL.md" if not skill_file.is_file() or skill_file.is_symlink(): raise ValueError("Plugin skill tree must contain a regular SKILL.md") file_digests: list[PluginSkillFileDigest] = [] tree_hasher = hashlib.sha256() for path in _iter_regular_files(skill_root): relative = path.relative_to(skill_root).as_posix() data = path.read_bytes() executable = _is_executable(path) content_hash = _sha256(data) file_digests.append( PluginSkillFileDigest( path=relative, size=len(data), executable=executable, content_hash=content_hash, ) ) _update_field(tree_hasher, relative.encode("utf-8")) _update_field(tree_hasher, str(len(data)).encode("ascii")) _update_field(tree_hasher, b"1" if executable else b"0") _update_field(tree_hasher, data) skill_content = skill_file.read_text(encoding="utf-8").replace("\r\n", "\n").replace("\r", "\n") return PluginSkillTreeDigest( skill_content_hash=_sha256(skill_content.encode("utf-8")), skill_tree_hash=f"sha256:{tree_hasher.hexdigest()}", files=tuple(file_digests), ) def _iter_regular_files(root: Path) -> list[Path]: results: list[Path] = [] for path in sorted(root.rglob("*"), key=lambda item: item.relative_to(root).as_posix()): relative = path.relative_to(root) if any(part in {"", ".", ".."} for part in relative.parts): raise ValueError(f"Invalid path in plugin skill tree: {relative.as_posix()}") if path.is_symlink(): raise ValueError(f"Plugin skill tree contains a symlink: {relative.as_posix()}") if path.is_dir(): continue if not path.is_file(): raise ValueError(f"Plugin skill tree contains a non-regular file: {relative.as_posix()}") if len(relative.parts) == 1 and relative.name in IGNORED_METADATA_FILENAMES: continue results.append(path) return results def _is_executable(path: Path) -> bool: return bool(path.stat().st_mode & (os.X_OK | 0o111)) def _sha256(data: bytes) -> str: return f"sha256:{hashlib.sha256(data).hexdigest()}" def _update_field(hasher: "hashlib._Hash", data: bytes) -> None: hasher.update(len(data).to_bytes(8, "big")) hasher.update(data)