"""Review-first skill installation helpers.""" from __future__ import annotations import json import secrets import shutil import zipfile from pathlib import Path, PurePosixPath from typing import Any from nanobot.utils.helpers import ensure_dir, get_workspace_state_path, safe_filename, timestamp def _is_relative_to(path: Path, root: Path) -> bool: try: path.relative_to(root) return True except ValueError: return False def _parse_frontmatter(content: str) -> dict[str, str]: if not content.startswith("---"): return {} end = content.find("\n---", 3) if end == -1: return {} metadata: dict[str, str] = {} for line in content[3:end].splitlines(): if ":" not in line: continue key, value = line.split(":", 1) metadata[key.strip()] = value.strip().strip("\"'") return metadata def _parse_skill_metadata(raw: str) -> dict[str, Any]: if not raw: return {} try: data = json.loads(raw) except json.JSONDecodeError: return {} if not isinstance(data, dict): return {} nested = data.get("nanobot", data.get("openclaw", {})) return nested if isinstance(nested, dict) else {} class SkillReviewManager: """Stage workspace skill installs until the user explicitly approves them.""" REVIEW_META_FILE = "review.json" ARCHIVE_FILE = "upload.zip" STAGED_DIR = "staged" def __init__(self, workspace: Path): self.workspace = workspace.expanduser().resolve() self.workspace_skills = ensure_dir(self.workspace / "skills") self.reviews_dir = ensure_dir(get_workspace_state_path(self.workspace) / "skill-reviews") def list_reviews(self) -> list[dict[str, Any]]: reviews: list[dict[str, Any]] = [] for review_dir in sorted(self.reviews_dir.iterdir(), reverse=True): if not review_dir.is_dir(): continue try: reviews.append(self._read_review(review_dir)) except FileNotFoundError: continue return reviews def get_review(self, review_id: str) -> dict[str, Any]: return self._read_review(self._review_dir(review_id)) def create_review_from_zip(self, filename: str, content: bytes) -> dict[str, Any]: review_id = secrets.token_hex(8) review_dir = ensure_dir(self._review_dir(review_id)) archive_path = review_dir / self.ARCHIVE_FILE archive_path.write_bytes(content) staged_root = ensure_dir(review_dir / self.STAGED_DIR) preview = self._extract_archive(archive_path, staged_root, filename) review = { "id": review_id, "status": "pending_review", "created_at": timestamp(), "archive_name": filename, **preview, } self._write_review(review_dir, review) return review def approve_review(self, review_id: str, overwrite: bool = False) -> dict[str, Any]: review_dir = self._review_dir(review_id) review = self._read_review(review_dir) if review.get("status") == "approved": return review skill_name = str(review.get("skill_name") or "").strip() if not skill_name: raise ValueError("Review is missing a skill_name") source_dir = review_dir / self.STAGED_DIR / skill_name if not source_dir.is_dir(): raise FileNotFoundError(f"Staged skill not found for review {review_id}") target_dir = self.workspace_skills / skill_name if target_dir.exists(): if not overwrite: raise FileExistsError( f"Skill '{skill_name}' already exists. Re-submit approval with overwrite=true." ) shutil.rmtree(target_dir) shutil.copytree(source_dir, target_dir) review["status"] = "approved" review["approved_at"] = timestamp() review["overwrite"] = overwrite review["installed_path"] = str(target_dir / "SKILL.md") self._write_review(review_dir, review) return review def discard_review(self, review_id: str) -> None: review_dir = self._review_dir(review_id) if not review_dir.exists(): raise FileNotFoundError(f"Skill review '{review_id}' not found") shutil.rmtree(review_dir) def _review_dir(self, review_id: str) -> Path: return self.reviews_dir / review_id def _read_review(self, review_dir: Path) -> dict[str, Any]: review_file = review_dir / self.REVIEW_META_FILE if not review_file.exists(): raise FileNotFoundError(f"Skill review metadata not found: {review_dir.name}") return json.loads(review_file.read_text(encoding="utf-8")) def _write_review(self, review_dir: Path, review: dict[str, Any]) -> None: review_file = review_dir / self.REVIEW_META_FILE review_file.write_text( json.dumps(review, ensure_ascii=False, indent=2), encoding="utf-8", ) def _extract_archive( self, archive_path: Path, staged_root: Path, upload_name: str, ) -> dict[str, Any]: with zipfile.ZipFile(archive_path, "r") as zf: file_infos = [info for info in zf.infolist() if not info.is_dir()] if not file_infos: raise ValueError("Zip archive is empty") skill_md_entries: list[str] = [] for info in file_infos: rel = PurePosixPath(info.filename) if rel.name != "SKILL.md": continue if len(rel.parts) not in (1, 2): raise ValueError( "SKILL.md must be at the archive root or inside a single top-level directory" ) skill_md_entries.append(info.filename) if not skill_md_entries: raise ValueError("Zip must contain a top-level SKILL.md file") skill_md_entry = skill_md_entries[0] skill_md_parts = PurePosixPath(skill_md_entry).parts top_level_dir = skill_md_parts[0] if len(skill_md_parts) == 2 else "" frontmatter = _parse_frontmatter( zf.read(skill_md_entry).decode("utf-8", errors="replace") ) if top_level_dir: skill_name = top_level_dir else: skill_name = frontmatter.get("name") or Path(upload_name).stem skill_name = safe_filename(skill_name).replace(" ", "-") if not skill_name: raise ValueError("Could not determine a safe skill name") staged_skill_dir = staged_root / skill_name staged_skill_dir.mkdir(parents=True, exist_ok=False) extracted_files: list[str] = [] for info in file_infos: raw_rel = PurePosixPath(info.filename) if "__MACOSX" in raw_rel.parts or raw_rel.name == ".DS_Store": continue if top_level_dir: if not raw_rel.parts or raw_rel.parts[0] != top_level_dir: continue rel_parts = raw_rel.parts[1:] else: rel_parts = raw_rel.parts if not rel_parts: continue if any(part in {"", ".", ".."} for part in rel_parts): raise ValueError(f"Unsafe archive entry: {info.filename}") dest = staged_skill_dir.joinpath(*rel_parts) dest.parent.mkdir(parents=True, exist_ok=True) resolved_dest = dest.resolve() if not _is_relative_to(resolved_dest, staged_skill_dir.resolve()): raise ValueError(f"Unsafe archive entry: {info.filename}") with zf.open(info) as src, open(dest, "wb") as dst: shutil.copyfileobj(src, dst) extracted_files.append(PurePosixPath(*rel_parts).as_posix()) if not (staged_skill_dir / "SKILL.md").exists(): raise ValueError("Staged skill is missing SKILL.md after extraction") skill_meta = _parse_skill_metadata(frontmatter.get("metadata", "")) target_dir = self.workspace_skills / skill_name return { "skill_name": skill_name, "declared_name": frontmatter.get("name", skill_name), "description": frontmatter.get("description", ""), "metadata": frontmatter, "requires": skill_meta.get("requires", {}), "file_count": len(extracted_files), "files": sorted(extracted_files), "target_exists": target_dir.exists(), "target_path": str(target_dir / "SKILL.md"), "staged_path": str(staged_skill_dir / "SKILL.md"), }