Files
beaver_project/app-instance/backend/beaver/skills/catalog/loader.py
steven_li 5ba5c7e4c1 feat(app-instance): 集成Beaver后端并更新配置管理
集成新的Beaver后端服务到应用实例中,替换原有的nanobot实现。

主要变更包括:
- 在Dockerfile和环境配置中添加Beaver相关路径和配置变量
- 更新工作目录结构从.nanobot到.beaver
- 实现Beaver引擎加载器,支持配置文件加载和工具组装
- 添加内置工具如ListDirectoryTool、ReadFileTool、SearchFilesTool
- 更新消息处理流程,支持通道适配器和网关模式
- 重构技能系统,支持显式工具提示和嵌入式检索
- 改进错误处理和生命周期管理

此变更使应用实例能够使用统一的Beaver后端进行AI代理运行时管理。
2026-04-27 17:37:40 +08:00

329 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Beaver skills catalog loader。
第一版目标非常明确:
1. 扫描技能目录
2. 读取 `SKILL.md`
3. 解析前置元数据
4. 生成可注入上下文的正文与索引
这层不负责:
1. 动态选择本轮应该启用哪些 skill
2. skill review / publishing
3. skill 自动学习
这些决策属于 resolver 或更高层工作流。
"""
from __future__ import annotations
from dataclasses import dataclass
import json
from pathlib import Path
from typing import Any
from .utils import (
check_requirements,
escape_xml,
get_missing_requirements,
parse_frontmatter,
parse_skill_metadata_blob,
strip_frontmatter,
)
@dataclass(slots=True)
class SkillRecord:
"""单个 skill 的目录级元数据。"""
name: str
path: Path
source: str
class SkillsLoader:
"""从 workspace/builtin 目录中发现并读取 skills。"""
def __init__(
self,
workspace: str | Path,
*,
builtin_skills_dir: str | Path | None = None,
extra_dirs: list[str | Path] | None = None,
) -> None:
self.workspace = Path(workspace)
self.workspace_skills = self.workspace / "skills"
self.builtin_skills = Path(builtin_skills_dir) if builtin_skills_dir is not None else Path(__file__).resolve().parent.parent / "builtin"
self.extra_dirs = [Path(item) for item in (extra_dirs or [])]
def list_skills(self, *, filter_unavailable: bool = True) -> list[SkillRecord]:
"""列出当前可见的 skills。
优先级:
1. workspace
2. extra/plugin 目录
3. builtin
重名 skill 只保留优先级更高的那一个。
"""
ordered_roots: list[tuple[str, Path]] = [
("workspace", self.workspace_skills),
*[("plugin", path) for path in self.extra_dirs],
("builtin", self.builtin_skills),
]
found: dict[str, SkillRecord] = {}
for source, root in ordered_roots:
if not root.exists():
continue
for skill_dir in root.iterdir():
skill_file = skill_dir / "SKILL.md"
if not skill_dir.is_dir() or not skill_file.exists():
continue
name = skill_dir.name
if name in found:
continue
record = SkillRecord(name=name, path=skill_file, source=source)
if filter_unavailable and not self._record_available(record):
continue
found[name] = record
return list(found.values())
def load_skill(self, name: str) -> str | None:
"""按名称加载 skill 原始内容。"""
record = self._find_record(name)
if record is None:
return None
return record.path.read_text(encoding="utf-8")
def get_skill_record(self, name: str) -> SkillRecord | None:
"""按名称返回 skill record。"""
return self._find_record(name)
def get_skill_metadata(self, name: str) -> dict[str, Any] | None:
"""读取 skill frontmatter 元数据。"""
content = self.load_skill(name)
if content is None:
return None
metadata, _ = parse_frontmatter(content)
return metadata
def get_skill_tool_hints(self, name: str) -> list[str]:
"""读取 skill 显式声明的推荐工具。
第一版只信任显式 metadata不从正文里猜
- `tools: read_file, search_files`
- `tools: ["read_file", "search_files"]`
- YAML-like list:
tools:
- read_file
- search_files
- 兼容 metadata JSON blob 里的 `tools`
"""
frontmatter = self.get_skill_metadata(name) or {}
meta_blob = parse_skill_metadata_blob(frontmatter.get("metadata", ""))
names = [
*self._coerce_tool_names(frontmatter.get("tools")),
*self._coerce_tool_names(meta_blob.get("tools")),
*self._coerce_tool_names(meta_blob.get("required_tools")),
]
result: list[str] = []
for item in names:
if item and item not in result:
result.append(item)
return result
def load_skills_for_context(self, skill_names: list[str]) -> str:
"""加载指定 skills 的正文,并整理成上下文块。"""
sections: list[str] = []
for name in skill_names:
content = self.load_skill(name)
if not content:
continue
body = strip_frontmatter(content).strip()
if not body:
continue
sections.append(f"## {name}\n\n{body}")
return "\n\n".join(sections)
def build_skills_summary(self) -> str:
"""构建可注入 system prompt 的 skills index。
虽然函数名还沿用 `summary`,但当前语义已经更接近 Hermes 的 skills index
- 这里只告诉模型“系统里有哪些 skill 可用”
- 不负责把 skill 正文塞进 system prompt
- 真正激活的 skill 正文由 resolver/builder 走显式消息注入
"""
skills = self.list_skills(filter_unavailable=False)
if not skills:
return ""
lines = ["<skills>"]
for record in skills:
frontmatter = self.get_skill_metadata(record.name) or {}
meta_blob = parse_skill_metadata_blob(frontmatter.get("metadata", ""))
available = check_requirements(meta_blob)
description = frontmatter.get("description") or record.name
load_hint = f'Use skill_view(name="{record.name}") to load the full skill.'
lines.append(f' <skill available="{str(available).lower()}">')
lines.append(f" <name>{escape_xml(record.name)}</name>")
lines.append(f" <description>{escape_xml(description)}</description>")
lines.append(f" <load_hint>{escape_xml(load_hint)}</load_hint>")
support_files = self.list_skill_supporting_files(record.name)
if support_files:
lines.append(" <supporting_files>")
for file_path in support_files[:12]:
lines.append(f" <file>{escape_xml(file_path)}</file>")
if len(support_files) > 12:
lines.append(" <file>...additional files omitted...</file>")
lines.append(" </supporting_files>")
if not available:
missing = get_missing_requirements(meta_blob)
if missing:
lines.append(f" <requires>{escape_xml(missing)}</requires>")
lines.append(" </skill>")
lines.append("</skills>")
return "\n".join(lines)
def build_selection_candidates(self) -> list[dict[str, str]]:
"""构建给 LLM selector 使用的候选 skill 摘要。
这里刻意保持精简,只给:
- `name`
- `description`
选择器的任务只是“从候选里挑名字”,不是直接阅读完整 skill 正文。
真正激活后的 skill 正文仍然在后续阶段按需加载。
"""
candidates: list[dict[str, str]] = []
for record in self.list_skills(filter_unavailable=True):
frontmatter = self.get_skill_metadata(record.name) or {}
description = str(frontmatter.get("description") or "").strip()
if not description:
raw_content = self.load_skill(record.name) or ""
body = strip_frontmatter(raw_content).strip()
if body:
description = " ".join(body.splitlines()[:3])[:240].strip()
candidates.append(
{
"name": record.name,
"description": description or record.name,
}
)
return candidates
def list_skill_supporting_files(self, name: str) -> list[str]:
"""列出 skill 目录下可按需查看的支持文件相对路径。"""
record = self._find_record(name)
if record is None:
return []
skill_dir = record.path.parent
results: list[str] = []
for subdir in ("references", "templates", "scripts", "assets"):
root = skill_dir / subdir
if not root.exists():
continue
for file in sorted(root.rglob("*")):
if file.is_file() and not file.is_symlink():
results.append(str(file.relative_to(skill_dir)))
return results
def view_skill(self, name: str, file_path: str | None = None) -> tuple[str, str] | None:
"""读取 skill 正文或其支持文件。
返回 `(display_name, content)`
- `display_name` 用于提示当前读取的是 skill 本体还是某个支持文件
- `content` 为实际文本内容
"""
record = self._find_record(name)
if record is None:
return None
if not self._record_available(record):
frontmatter = self.get_skill_metadata(name) or {}
meta_blob = parse_skill_metadata_blob(frontmatter.get("metadata", ""))
missing = get_missing_requirements(meta_blob)
detail = f" Missing requirements: {missing}." if missing else ""
raise ValueError(f"Skill '{name}' is currently unavailable.{detail}")
skill_dir = record.path.parent
if not file_path:
return ("SKILL.md", self._read_text_file(record.path, display_name="SKILL.md"))
candidate = (skill_dir / file_path).resolve()
try:
candidate.relative_to(skill_dir.resolve())
except ValueError as exc:
raise ValueError("Requested skill file must stay within the skill directory") from exc
if not candidate.exists() or not candidate.is_file():
raise FileNotFoundError(f"Skill file '{file_path}' does not exist")
display_name = str(candidate.relative_to(skill_dir))
return (display_name, self._read_text_file(candidate, display_name=display_name))
def get_always_skills(self) -> list[str]:
"""返回标记为 always 的可用 skill 名称。"""
result: list[str] = []
for record in self.list_skills(filter_unavailable=True):
frontmatter = self.get_skill_metadata(record.name) or {}
meta_blob = parse_skill_metadata_blob(frontmatter.get("metadata", ""))
if meta_blob.get("always") or str(frontmatter.get("always", "")).lower() == "true":
result.append(record.name)
return result
@staticmethod
def _coerce_tool_names(value: Any) -> list[str]:
if value is None:
return []
if isinstance(value, str):
raw = value.strip()
if not raw:
return []
if raw.startswith("["):
try:
parsed = json.loads(raw)
except Exception:
parsed = None
if isinstance(parsed, list):
return [str(item).strip() for item in parsed if str(item).strip()]
return [item.strip() for item in raw.split(",") if item.strip()]
if isinstance(value, (list, tuple, set)):
return [str(item).strip() for item in value if str(item).strip()]
return []
def _find_record(self, name: str) -> SkillRecord | None:
for record in self.list_skills(filter_unavailable=False):
if record.name == name:
return record
return None
def _record_available(self, record: SkillRecord) -> bool:
content = record.path.read_text(encoding="utf-8")
frontmatter, _ = parse_frontmatter(content)
meta_blob = parse_skill_metadata_blob(frontmatter.get("metadata", ""))
return check_requirements(meta_blob)
@staticmethod
def _read_text_file(path: Path, *, display_name: str) -> str:
try:
return path.read_text(encoding="utf-8")
except UnicodeDecodeError as exc:
raise ValueError(
f"Skill file '{display_name}' is not UTF-8 text and cannot be viewed with skill_view."
) from exc
def _skill_available(self, name: str) -> bool:
record = self._find_record(name)
if record is None:
return False
return self._record_available(record)