285 lines
11 KiB
Python
285 lines
11 KiB
Python
"""Skills loader for agent capabilities."""
|
||
|
||
import json
|
||
import os
|
||
import re
|
||
import shutil
|
||
from pathlib import Path
|
||
|
||
# Default builtin skills directory (relative to this file)
|
||
BUILTIN_SKILLS_DIR = Path(__file__).parent.parent / "skills"
|
||
|
||
|
||
class SkillsLoader:
|
||
"""
|
||
Loader for agent skills.
|
||
|
||
Skills are markdown files (SKILL.md) that teach the agent how to use
|
||
specific tools or perform certain tasks.
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
workspace: Path,
|
||
builtin_skills_dir: Path | None = None,
|
||
extra_dirs: list[Path] | None = None,
|
||
):
|
||
self.workspace = workspace
|
||
self.workspace_skills = workspace / "skills"
|
||
self.builtin_skills = builtin_skills_dir or BUILTIN_SKILLS_DIR
|
||
if extra_dirs is None:
|
||
from nanobot.agent.plugins import PluginLoader
|
||
|
||
extra_dirs = PluginLoader(workspace).get_skill_dirs()
|
||
self.extra_dirs: list[Path] = extra_dirs
|
||
|
||
def list_skills(self, filter_unavailable: bool = True) -> list[dict[str, str]]:
|
||
"""
|
||
List all available skills.
|
||
|
||
Args:
|
||
filter_unavailable: If True, filter out skills with unmet requirements.
|
||
|
||
Returns:
|
||
List of skill info dicts with 'name', 'path', 'source'.
|
||
"""
|
||
skills = []
|
||
|
||
# Workspace skills (highest priority)
|
||
if self.workspace_skills.exists():
|
||
for skill_dir in self.workspace_skills.iterdir():
|
||
if skill_dir.is_dir():
|
||
skill_file = skill_dir / "SKILL.md"
|
||
if skill_file.exists():
|
||
skills.append({"name": skill_dir.name, "path": str(skill_file), "source": "workspace"})
|
||
|
||
# Extra skill roots (e.g. plugin-provided skills)
|
||
for extra_dir in self.extra_dirs:
|
||
if extra_dir.exists():
|
||
for skill_dir in extra_dir.iterdir():
|
||
if skill_dir.is_dir():
|
||
skill_file = skill_dir / "SKILL.md"
|
||
if skill_file.exists() and not any(s["name"] == skill_dir.name for s in skills):
|
||
skills.append({"name": skill_dir.name, "path": str(skill_file), "source": "plugin"})
|
||
|
||
# Built-in skills
|
||
if self.builtin_skills and self.builtin_skills.exists():
|
||
for skill_dir in self.builtin_skills.iterdir():
|
||
if skill_dir.is_dir():
|
||
skill_file = skill_dir / "SKILL.md"
|
||
if skill_file.exists() and not any(s["name"] == skill_dir.name for s in skills):
|
||
skills.append({"name": skill_dir.name, "path": str(skill_file), "source": "builtin"})
|
||
|
||
# Filter by requirements
|
||
if filter_unavailable:
|
||
return [s for s in skills if self._check_requirements(self._get_skill_meta(s["name"]))]
|
||
return skills
|
||
|
||
def load_skill(self, name: str) -> str | None:
|
||
"""
|
||
Load a skill by name.
|
||
|
||
Args:
|
||
name: Skill name (directory name).
|
||
|
||
Returns:
|
||
Skill content or None if not found.
|
||
"""
|
||
# Check workspace first
|
||
workspace_skill = self.workspace_skills / name / "SKILL.md"
|
||
if workspace_skill.exists():
|
||
return workspace_skill.read_text(encoding="utf-8")
|
||
|
||
# Check plugin-provided roots
|
||
for extra_dir in self.extra_dirs:
|
||
extra_skill = extra_dir / name / "SKILL.md"
|
||
if extra_skill.exists():
|
||
return extra_skill.read_text(encoding="utf-8")
|
||
|
||
# Check built-in
|
||
if self.builtin_skills:
|
||
builtin_skill = self.builtin_skills / name / "SKILL.md"
|
||
if builtin_skill.exists():
|
||
return builtin_skill.read_text(encoding="utf-8")
|
||
|
||
return None
|
||
|
||
def load_skills_for_context(self, skill_names: list[str]) -> str:
|
||
"""
|
||
Load specific skills for inclusion in agent context.
|
||
|
||
Args:
|
||
skill_names: List of skill names to load.
|
||
|
||
Returns:
|
||
Formatted skills content.
|
||
"""
|
||
parts = []
|
||
for name in skill_names:
|
||
content = self.load_skill(name)
|
||
if content:
|
||
content = self._strip_frontmatter(content)
|
||
parts.append(f"### Skill: {name}\n\n{content}")
|
||
|
||
return "\n\n---\n\n".join(parts) if parts else ""
|
||
|
||
def build_skills_summary(self) -> str:
|
||
"""
|
||
Build a summary of all skills (name, description, path, availability).
|
||
|
||
This is used for progressive loading - the agent can read the full
|
||
skill content using read_file when needed.
|
||
|
||
Returns:
|
||
XML-formatted skills summary.
|
||
"""
|
||
all_skills = self.list_skills(filter_unavailable=False)
|
||
if not all_skills:
|
||
return ""
|
||
|
||
def escape_xml(s: str) -> str:
|
||
return s.replace("&", "&").replace("<", "<").replace(">", ">")
|
||
|
||
lines = ["<skills>"]
|
||
for s in all_skills:
|
||
name = escape_xml(s["name"])
|
||
path = s["path"]
|
||
desc = escape_xml(self._get_skill_description(s["name"]))
|
||
skill_meta = self._get_skill_meta(s["name"])
|
||
available = self._check_requirements(skill_meta)
|
||
|
||
lines.append(f" <skill available=\"{str(available).lower()}\">")
|
||
lines.append(f" <name>{name}</name>")
|
||
lines.append(f" <description>{desc}</description>")
|
||
lines.append(f" <location>{path}</location>")
|
||
|
||
# Show missing requirements for unavailable skills
|
||
if not available:
|
||
missing = self._get_missing_requirements(skill_meta)
|
||
if missing:
|
||
lines.append(f" <requires>{escape_xml(missing)}</requires>")
|
||
|
||
lines.append(" </skill>")
|
||
lines.append("</skills>")
|
||
|
||
return "\n".join(lines)
|
||
|
||
def _get_missing_requirements(self, skill_meta: dict) -> str:
|
||
"""Get a description of missing requirements."""
|
||
missing = []
|
||
requires = skill_meta.get("requires", {})
|
||
for b in requires.get("bins", []):
|
||
if not shutil.which(b):
|
||
missing.append(f"CLI: {b}")
|
||
for env in requires.get("env", []):
|
||
if not os.environ.get(env):
|
||
missing.append(f"ENV: {env}")
|
||
return ", ".join(missing)
|
||
|
||
def _get_skill_description(self, name: str) -> str:
|
||
"""Get the description of a skill from its frontmatter."""
|
||
meta = self.get_skill_metadata(name)
|
||
if meta and meta.get("description"):
|
||
return meta["description"]
|
||
return name # Fallback to skill name
|
||
|
||
def _strip_frontmatter(self, content: str) -> str:
|
||
"""Remove YAML frontmatter from markdown content."""
|
||
if content.startswith("---"):
|
||
match = re.match(r"^---\n.*?\n---\n", content, re.DOTALL)
|
||
if match:
|
||
return content[match.end():].strip()
|
||
return content
|
||
|
||
def _parse_nanobot_metadata(self, raw: str) -> dict:
|
||
"""Parse skill metadata JSON from frontmatter (supports nanobot and openclaw keys)."""
|
||
try:
|
||
data = json.loads(raw)
|
||
return data.get("nanobot", data.get("openclaw", {})) if isinstance(data, dict) else {}
|
||
except (json.JSONDecodeError, TypeError):
|
||
return {}
|
||
|
||
def _check_requirements(self, skill_meta: dict) -> bool:
|
||
"""Check if skill requirements are met (bins, env vars)."""
|
||
requires = skill_meta.get("requires", {})
|
||
for b in requires.get("bins", []):
|
||
if not shutil.which(b):
|
||
return False
|
||
for env in requires.get("env", []):
|
||
if not os.environ.get(env):
|
||
return False
|
||
return True
|
||
|
||
def _get_skill_meta(self, name: str) -> dict:
|
||
"""Get nanobot metadata for a skill (cached in frontmatter)."""
|
||
meta = self.get_skill_metadata(name) or {}
|
||
return self._parse_nanobot_metadata(meta.get("metadata", ""))
|
||
|
||
def get_always_skills(self) -> list[str]:
|
||
"""Get skills marked as always=true that meet requirements."""
|
||
result = []
|
||
for s in self.list_skills(filter_unavailable=True):
|
||
meta = self.get_skill_metadata(s["name"]) or {}
|
||
skill_meta = self._parse_nanobot_metadata(meta.get("metadata", ""))
|
||
if skill_meta.get("always") or meta.get("always"):
|
||
result.append(s["name"])
|
||
return result
|
||
|
||
def get_skill_metadata(self, name: str) -> dict | None:
|
||
"""
|
||
Get metadata from a skill's frontmatter.
|
||
|
||
Args:
|
||
name: Skill name.
|
||
|
||
Returns:
|
||
Metadata dict or None.
|
||
"""
|
||
content = self.load_skill(name)
|
||
if not content:
|
||
return None
|
||
|
||
if content.startswith("---"):
|
||
match = re.match(r"^---\n(.*?)\n---", content, re.DOTALL)
|
||
if match:
|
||
# Simple YAML parsing
|
||
metadata = {}
|
||
for line in match.group(1).split("\n"):
|
||
if ":" in line:
|
||
key, value = line.split(":", 1)
|
||
metadata[key.strip()] = value.strip().strip('"\'')
|
||
return metadata
|
||
|
||
return None
|
||
|
||
def get_skill_agent_cards(self, name: str) -> list[dict]:
|
||
"""从 skill 元数据里提取 A2A agent card 声明。"""
|
||
# 技能 frontmatter 里的 metadata 是字符串形式,先复用现有解析逻辑拿到 nanobot 扩展字段。
|
||
meta = self.get_skill_metadata(name) or {}
|
||
skill_meta = self._parse_nanobot_metadata(meta.get("metadata", ""))
|
||
cards = skill_meta.get("agent_cards", [])
|
||
if not isinstance(cards, list):
|
||
return []
|
||
|
||
result = []
|
||
for idx, card in enumerate(cards):
|
||
if not isinstance(card, dict):
|
||
continue
|
||
# 复制一份,避免直接修改原 metadata 结构。
|
||
item = dict(card)
|
||
# 对缺失字段做兜底补全,保证后续 AgentRegistry 可以稳定消费。
|
||
item.setdefault("id", item.get("name") or f"{name}-agent-{idx + 1}")
|
||
item.setdefault("name", item["id"])
|
||
item.setdefault("description", meta.get("description", item["name"]))
|
||
# 额外挂回 skill_name,方便前端展示来源,也便于后续定位声明位置。
|
||
item["skill_name"] = name
|
||
result.append(item)
|
||
return result
|
||
|
||
def list_skill_agent_cards(self) -> list[dict]:
|
||
"""聚合所有可见 skill 中声明的 agent card。"""
|
||
cards = []
|
||
for skill in self.list_skills(filter_unavailable=False):
|
||
cards.extend(self.get_skill_agent_cards(skill["name"]))
|
||
return cards
|