"""Beaver skills catalog loader.

The first version has a deliberately narrow goal:
1. Scan the skill directories
2. Read `SKILL.md`
3. Parse the front matter
4. Produce context-injectable bodies and an index

This layer is NOT responsible for:
1. Dynamically choosing which skills should be enabled for the current turn
2. Skill review / publishing
3. Automatic skill learning

Those decisions belong to the resolver or to higher-level workflows.
"""

from __future__ import annotations

from dataclasses import dataclass, field
import json
from pathlib import Path
from typing import Any

from beaver.skills.specs.storage import SkillSpecStore
from .utils import (
    check_requirements,
    escape_xml,
    get_missing_requirements,
    parse_frontmatter,
    parse_skill_metadata_blob,
    strip_frontmatter,
)


@dataclass(slots=True)
class SkillRecord:
    """Catalog-level metadata for a single skill."""

    name: str
    path: Path
    source: str
    version: str = "legacy"
    content_hash: str | None = None
    source_kind: str = "legacy"
    status: str = "active"
    tool_hints: list[str] = field(default_factory=list)
    frontmatter: dict[str, Any] = field(default_factory=dict)
    description: str = ""


class SkillsLoader:
    """Discover and read skills from the workspace, extra and builtin directories."""

    def __init__(
        self,
        workspace: str | Path,
        *,
        builtin_skills_dir: str | Path | None = None,
        extra_dirs: list[str | Path] | None = None,
        skill_store: SkillSpecStore | None = None,
    ) -> None:
        """Set up the search roots.

        Args:
            workspace: Workspace root; workspace skills live in ``<workspace>/skills``.
            builtin_skills_dir: Override for the builtin skills directory. Defaults
                to the ``builtin`` directory two levels above this file.
            extra_dirs: Additional directories scanned with source ``"plugin"``.
            skill_store: Store used for published skills. A ``SkillSpecStore`` bound
                to ``workspace`` is created when omitted.
        """
        self.workspace = Path(workspace)
        self.workspace_skills = self.workspace / "skills"
        if builtin_skills_dir is not None:
            self.builtin_skills = Path(builtin_skills_dir)
        else:
            self.builtin_skills = Path(__file__).resolve().parent.parent / "builtin"
        self.extra_dirs = [Path(item) for item in (extra_dirs or [])]
        self.skill_store = skill_store or SkillSpecStore(self.workspace)

    def list_skills(
        self,
        *,
        filter_unavailable: bool = True,
        include_internal: bool = False,
    ) -> list[SkillRecord]:
        """List the skills that are currently visible.

        Priority:
        1. workspace (published skills from the store)
        2. extra/plugin directories
        3. builtin

        When several skills share a name, only the highest-priority one is kept.

        Args:
            filter_unavailable: Drop skills whose requirements are not met.
            include_internal: Keep skills whose front matter marks them ``internal``.

        Returns:
            One record per visible skill name, in priority order; directory
            entries within one root are visited in sorted order.
        """
        found: dict[str, SkillRecord] = {}

        # list_published_skills already applies the availability filter, so the
        # records are not re-checked here (that would re-read every SKILL.md).
        for record in self.list_published_skills(filter_unavailable=filter_unavailable):
            if record.name in found:
                continue
            if not include_internal and self._record_internal(record):
                continue
            found[record.name] = record

        roots = [
            *[("plugin", path) for path in self.extra_dirs],
            ("builtin", self.builtin_skills),
        ]
        for source, root in roots:
            # is_dir() rather than exists(): a plain file here would make
            # iterdir() raise NotADirectoryError.
            if not root.is_dir():
                continue
            # iterdir() yields entries in arbitrary order; sort so the catalog
            # (and anything rendered from it) is deterministic.
            for skill_dir in sorted(root.iterdir()):
                skill_file = skill_dir / "SKILL.md"
                if not skill_dir.is_dir() or not skill_file.exists():
                    continue
                name = skill_dir.name
                if name in found:
                    continue
                frontmatter, body = parse_frontmatter(skill_file.read_text(encoding="utf-8"))
                if not include_internal and _truthy(frontmatter.get("internal")):
                    continue
                normalized_frontmatter = dict(frontmatter)
                record = SkillRecord(
                    name=name,
                    path=skill_file,
                    source=source,
                    version="legacy",
                    source_kind=source,
                    tool_hints=self._coerce_tool_names(frontmatter.get("tools")),
                    frontmatter=normalized_frontmatter,
                    description=str(frontmatter.get("description") or summarize_body(body) or name),
                )
                if filter_unavailable and not self._record_available(record):
                    continue
                found[name] = record
        return list(found.values())

    def list_published_skills(self, *, filter_unavailable: bool = True) -> list[SkillRecord]:
        """List only the formally published skills of the workspace catalog.

        Args:
            filter_unavailable: Drop skills whose requirements are not met.

        Returns:
            One record per published skill name that the store could read.
        """
        results: list[SkillRecord] = []
        for name in self.skill_store.list_published_skill_names():
            loaded = self.skill_store.read_published_skill(name)
            if loaded is None:
                continue
            # Legacy skills keep SKILL.md at the skill root; versioned skills
            # keep it under versions/<version>/.
            if loaded.version.version == "legacy":
                path = self.workspace_skills / name / "SKILL.md"
            else:
                path = self.workspace_skills / name / "versions" / loaded.version.version / "SKILL.md"
            record = SkillRecord(
                name=name,
                path=path,
                source="workspace",
                version=loaded.version.version,
                content_hash=loaded.version.content_hash,
                source_kind=str(loaded.version.provenance.get("source_kind") or "workspace"),
                status=str(loaded.version.review_state or "published"),
                tool_hints=list(loaded.version.tool_hints),
                frontmatter=dict(loaded.version.frontmatter),
                description=str(
                    loaded.version.frontmatter.get("description") or loaded.version.summary or name
                ),
            )
            if filter_unavailable and not self._record_available(record):
                continue
            results.append(record)
        return results

    def get_current_version(self, name: str) -> str | None:
        """Return the version of the visible skill ``name``, or ``None`` if unknown."""
        record = self._find_record(name)
        return record.version if record is not None else None

    def load_published_skill(self, name: str, version: str | None = None) -> str | None:
        """Return the stored content of a published skill.

        Falls back to ``load_skill(name)`` when the store has no such skill.
        """
        loaded = self.skill_store.read_published_skill(name, version=version)
        if loaded is not None:
            return loaded.content
        return self.load_skill(name)

    def load_skill(self, name: str) -> str | None:
        """Load the raw ``SKILL.md`` content of a skill by name, or ``None`` if unknown."""
        record = self._find_record(name)
        if record is None:
            return None
        return record.path.read_text(encoding="utf-8")

    def get_skill_record(self, name: str) -> SkillRecord | None:
        """Return the skill record for ``name``, or ``None`` if unknown."""
        return self._find_record(name)

    def get_skill_metadata(self, name: str) -> dict[str, Any] | None:
        """Return the front-matter metadata of a skill.

        Uses a copy of the front matter already held by the record when it is
        non-empty; otherwise parses it from the skill file. Returns ``None``
        when the skill cannot be loaded.
        """
        record = self._find_record(name)
        if record is not None and record.frontmatter:
            return dict(record.frontmatter)
        content = self.load_skill(name)
        if content is None:
            return None
        metadata, _ = parse_frontmatter(content)
        return metadata

    def get_skill_tool_hints(self, name: str) -> list[str]:
        """Return the tools a skill explicitly recommends.

        The first version trusts explicit metadata only and never guesses from
        the body:
        - `tools: read_file, search_files`
        - `tools: ["read_file", "search_files"]`
        - YAML-like list:
          tools:
            - read_file
            - search_files
        - also accepts `tools` (and `required_tools`) inside the metadata JSON blob
        """
        record = self._find_record(name)
        if record is not None and record.tool_hints:
            return list(record.tool_hints)

        frontmatter = self.get_skill_metadata(name) or {}
        meta_blob = parse_skill_metadata_blob(frontmatter.get("metadata", ""))
        names = [
            *self._coerce_tool_names(frontmatter.get("tools")),
            *self._coerce_tool_names(meta_blob.get("tools")),
            *self._coerce_tool_names(meta_blob.get("required_tools")),
        ]
        # dict.fromkeys de-duplicates while keeping first-seen order.
        return [item for item in dict.fromkeys(names) if item]

    def load_skills_for_context(self, skill_names: list[str]) -> str:
        """Load the bodies of the given skills and join them into one context block.

        Skills that cannot be loaded, or whose body is empty once the front
        matter is stripped, are skipped.
        """
        sections: list[str] = []
        for name in skill_names:
            content = self.load_published_skill(name)
            if not content:
                continue
            body = strip_frontmatter(content).strip()
            if not body:
                continue
            sections.append(f"## {name}\n\n{body}")
        return "\n\n".join(sections)

    def build_skills_summary(self) -> str:
        """Build the skills index that can be injected into the system prompt.

        The function name still says `summary`, but the current meaning is a
        lightweight skills index:
        - it only tells the model which skills exist in the system
        - it does not put skill bodies into the system prompt
        - bodies of activated skills are injected explicitly by the resolver/builder

        Returns:
            The rendered index, or ``""`` when there are no skills.
        """
        skills = self.list_skills(filter_unavailable=False)
        if not skills:
            return ""

        # NOTE(review): values below are XML-escaped but no enclosing tags are
        # emitted, and the computed availability is never rendered. It looks like
        # markup may have been lost from these literals -- confirm the intended
        # format before relying on this output. Emitted text is left unchanged.
        lines = [""]
        for record in skills:
            frontmatter = record.frontmatter or self.get_skill_metadata(record.name) or {}
            meta_blob = parse_skill_metadata_blob(frontmatter.get("metadata", ""))
            available = check_requirements(meta_blob)
            description = frontmatter.get("description") or record.description or record.name
            lines.append(" ")
            lines.append(f" {escape_xml(record.name)}")
            lines.append(f" {escape_xml(description)}")
            lines.append(f" {escape_xml(record.version)}")
            # Use the record in hand: a by-name lookup would rescan every skill
            # directory for each entry and could resolve to a different
            # (internal) record that shares the name.
            support_files = self._supporting_files_for(record)
            if support_files:
                lines.append(" ")
                for file_path in support_files[:12]:
                    lines.append(f" {escape_xml(file_path)}")
                if len(support_files) > 12:
                    lines.append(" ...additional files omitted...")
                lines.append(" ")
            if not available:
                missing = get_missing_requirements(meta_blob)
                if missing:
                    lines.append(f" {escape_xml(missing)}")
            lines.append(" ")
        lines.append("")
        return "\n".join(lines)

    def build_selection_candidates(self) -> list[dict[str, str]]:
        """Build the candidate skill summaries handed to the LLM selector.

        Deliberately minimal; each candidate carries only:
        - `name`
        - `description`
        - `version`
        - `content_hash` (empty string when the record has none)

        The selector's job is just to pick names from the candidates, not to
        read full skill bodies. The body of an activated skill is still loaded
        on demand in a later stage.
        """
        candidates: list[dict[str, str]] = []
        for record in self.list_skills(filter_unavailable=True):
            frontmatter = record.frontmatter or self.get_skill_metadata(record.name) or {}
            description = str(frontmatter.get("description") or record.description or "").strip()
            if not description:
                # No declared description: fall back to the first three body
                # lines, capped at 240 characters.
                raw_content = self.load_published_skill(record.name) or ""
                body = strip_frontmatter(raw_content).strip()
                if body:
                    description = " ".join(body.splitlines()[:3])[:240].strip()
            candidates.append(
                {
                    "name": record.name,
                    "description": description or record.name,
                    "version": record.version,
                    "content_hash": record.content_hash or "",
                }
            )
        return candidates

    def list_skill_supporting_files(self, name: str) -> list[str]:
        """List supporting files of a skill, as paths relative to its directory.

        Returns an empty list when the skill is unknown.
        """
        record = self._find_record(name)
        if record is None:
            return []
        return self._supporting_files_for(record)

    @staticmethod
    def _supporting_files_for(record: SkillRecord) -> list[str]:
        """Collect regular, non-symlink files under the record's support subdirectories."""
        skill_dir = record.path.parent
        results: list[str] = []
        for subdir in ("references", "templates", "scripts", "assets"):
            root = skill_dir / subdir
            if not root.exists():
                continue
            for file in sorted(root.rglob("*")):
                if file.is_file() and not file.is_symlink():
                    results.append(str(file.relative_to(skill_dir)))
        return results

    def view_skill(self, name: str, file_path: str | None = None) -> tuple[str, str] | None:
        """Read a skill's body or one of its supporting files.

        Returns `(display_name, content)`:
        - `display_name` tells whether the skill itself or a supporting file was read
        - `content` is the actual text

        Returns ``None`` when the skill is unknown.

        Raises:
            ValueError: The skill is unavailable, ``file_path`` points outside
                the skill directory, or the file is not UTF-8 text.
            FileNotFoundError: ``file_path`` does not name an existing file.
        """
        record = self._find_record(name)
        if record is None:
            return None
        if not self._record_available(record):
            frontmatter = record.frontmatter or self.get_skill_metadata(name) or {}
            meta_blob = parse_skill_metadata_blob(frontmatter.get("metadata", ""))
            missing = get_missing_requirements(meta_blob)
            detail = f" Missing requirements: {missing}." if missing else ""
            raise ValueError(f"Skill '{name}' is currently unavailable.{detail}")

        skill_dir = record.path.parent
        if not file_path:
            return ("SKILL.md", self._read_text_file(record.path, display_name="SKILL.md"))

        # Resolve both sides before comparing: `candidate` is absolute and
        # symlink-free, so it must be measured against the resolved directory.
        # Using the unresolved directory raised ValueError for relative or
        # symlinked workspaces even though the containment check had passed.
        skill_root = skill_dir.resolve()
        candidate = (skill_dir / file_path).resolve()
        try:
            relative = candidate.relative_to(skill_root)
        except ValueError as exc:
            raise ValueError("Requested skill file must stay within the skill directory") from exc
        if not candidate.exists() or not candidate.is_file():
            raise FileNotFoundError(f"Skill file '{file_path}' does not exist")
        display_name = str(relative)
        return (display_name, self._read_text_file(candidate, display_name=display_name))

    def get_always_skills(self) -> list[str]:
        """Return the names of available skills that are marked `always`.

        A skill qualifies when its metadata blob has a truthy ``always`` entry
        or its front matter ``always`` value is truthy according to ``_truthy``
        (the same rule used for the ``internal`` flag).
        """
        result: list[str] = []
        for record in self.list_skills(filter_unavailable=True):
            frontmatter = record.frontmatter or self.get_skill_metadata(record.name) or {}
            meta_blob = parse_skill_metadata_blob(frontmatter.get("metadata", ""))
            if meta_blob.get("always") or _truthy(frontmatter.get("always")):
                result.append(record.name)
        return result

    @staticmethod
    def _coerce_tool_names(value: Any) -> list[str]:
        """Normalise a ``tools`` declaration into a list of non-empty names.

        Accepts a comma-separated string, a JSON list string, a bracketed flow
        list whose items are unquoted or single-quoted, or a list/tuple/set.
        Any other value yields an empty list.
        """
        if value is None:
            return []
        if isinstance(value, str):
            raw = value.strip()
            if not raw:
                return []
            if raw.startswith("["):
                try:
                    parsed = json.loads(raw)
                except (ValueError, RecursionError):
                    # JSONDecodeError is a ValueError; RecursionError covers
                    # pathologically nested input.
                    parsed = None
                if isinstance(parsed, list):
                    return [str(item).strip() for item in parsed if str(item).strip()]
                # Not valid JSON (e.g. `[read_file, search_files]` or single
                # quotes): drop the brackets and surrounding quotes so they do
                # not leak into the tool names.
                inner = raw[1:-1] if raw.endswith("]") else raw[1:]
                pieces = (piece.strip().strip("\"'").strip() for piece in inner.split(","))
                return [piece for piece in pieces if piece]
            return [item.strip() for item in raw.split(",") if item.strip()]
        if isinstance(value, (list, tuple, set)):
            return [str(item).strip() for item in value if str(item).strip()]
        return []

    def _find_record(self, name: str) -> SkillRecord | None:
        """Return the record named ``name``, including unavailable and internal skills."""
        for record in self.list_skills(filter_unavailable=False, include_internal=True):
            if record.name == name:
                return record
        return None

    @staticmethod
    def _record_internal(record: SkillRecord) -> bool:
        """Tell whether the record's front matter marks the skill as internal."""
        return _truthy((record.frontmatter or {}).get("internal"))

    def _record_available(self, record: SkillRecord) -> bool:
        """Check the skill's declared requirements.

        Requirements are read from the ``metadata`` entry of the front matter in
        the skill file. When the file cannot be read, the front matter already
        held by the record is used instead, so a single missing file does not
        abort a whole catalog listing.
        """
        try:
            content = record.path.read_text(encoding="utf-8")
        except OSError:
            frontmatter = record.frontmatter or {}
        else:
            frontmatter, _ = parse_frontmatter(content)
        meta_blob = parse_skill_metadata_blob(frontmatter.get("metadata", ""))
        return check_requirements(meta_blob)

    @staticmethod
    def _read_text_file(path: Path, *, display_name: str) -> str:
        """Read ``path`` as UTF-8 text.

        Raises:
            ValueError: The file is not valid UTF-8.
        """
        try:
            return path.read_text(encoding="utf-8")
        except UnicodeDecodeError as exc:
            raise ValueError(
                f"Skill file '{display_name}' is not UTF-8 text and cannot be viewed with skill_view."
            ) from exc

    def _skill_available(self, name: str) -> bool:
        """Tell whether the named skill exists and meets its requirements."""
        record = self._find_record(name)
        if record is None:
            return False
        return self._record_available(record)


def summarize_body(body: str) -> str:
    """Join the non-blank lines among the first three lines of ``body``, capped at 240 characters."""
    cleaned = " ".join(line.strip() for line in body.splitlines()[:3] if line.strip()).strip()
    return cleaned[:240]


def _truthy(value: Any) -> bool:
    """Interpret a front-matter flag: bools as-is, else 1/true/yes/y/on (case-insensitive)."""
    if isinstance(value, bool):
        return value
    return str(value or "").strip().lower() in {"1", "true", "yes", "y", "on"}