"""Shared helper functions for the skills catalog.

This module holds the pure functions that parse and validate skill files, so
that `loader.py` does not have to take on all of the following at once:

1. Directory scanning
2. Frontmatter parsing
3. Requirements validation
4. Text trimming/formatting

With these details split out, the boundaries of the skills catalog are
clearer, and reviews, the publisher, and the runtime resolver can all reuse
the same metadata parsing rules.
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import os
|
||
import re
|
||
import shutil
|
||
from dataclasses import dataclass, field
|
||
from typing import Any
|
||
|
||
|
||
def parse_frontmatter(content: str) -> tuple[dict[str, Any], str]:
    """Parse the minimal frontmatter at the top of a Markdown file.

    Only the most common shape is supported for now:

    ```md
    ---
    key: value
    key2: value2
    list_key:
      - item
      - "quoted item"
    ---
    body...
    ```

    This is enough for the first version of the skills runtime without
    pulling in a YAML parser.

    Args:
        content: Full text of the Markdown file.

    Returns:
        A ``(metadata, body)`` tuple. Scalar values are strings with
        surrounding quotes removed; a key with an empty value followed by
        ``- item`` lines becomes a list of strings. A key with an empty value
        and no items is omitted. When no frontmatter block is recognised the
        result is ``({}, content)`` with ``content`` untouched; otherwise
        ``body`` is the text after the closing fence, stripped.
    """
    if not content.startswith("---"):
        return {}, content

    # Accept both LF and CRLF line endings and trailing blanks on the fences.
    # The lazy optional group lets an empty block ("---\n---\n") match as
    # well, instead of leaking the fences into the body.
    match = re.match(
        r"^---[ \t]*\r?\n(?:(.*?)\r?\n)??---[ \t]*\r?\n?",
        content,
        re.DOTALL,
    )
    if match is None:
        return {}, content

    metadata: dict[str, Any] = {}
    # group(1) is None when the block between the fences is empty.
    lines = (match.group(1) or "").splitlines()
    index = 0
    while index < len(lines):
        key, separator, value = lines[index].partition(":")
        index += 1
        if not separator:
            # Not a "key: value" line; ignore it.
            continue

        key = key.strip()
        value = value.strip()
        if value:
            metadata[key] = value.strip("\"'")
            continue

        # Empty value: gather the "- item" lines that follow, skipping blank
        # lines, and stop at the first line that is not a list item.
        items: list[str] = []
        while index < len(lines):
            stripped = lines[index].strip()
            if stripped and not stripped.startswith("- "):
                break
            if stripped:
                items.append(stripped[2:].strip().strip("\"'"))
            index += 1
        if items:
            metadata[key] = items

    return metadata, content[match.end():].strip()
|
||
|
||
|
||
def strip_frontmatter(content: str) -> str:
    """Drop the frontmatter and return only the skill body.

    Delegates to `parse_frontmatter` and discards the metadata half of its
    result.
    """
    return parse_frontmatter(content)[1]
|
||
|
||
|
||
@dataclass(slots=True)
class SkillTeamTemplateParseResult:
    """Result of trying to extract a team template from a skill body."""

    # Parsed template object; stays None when no template was produced.
    template: dict[str, Any] | None = None
    # Messages describing why a template was rejected; empty by default.
    warnings: list[str] = field(default_factory=list)
|
||
|
||
|
||
def extract_skill_team_template(body: str) -> SkillTeamTemplateParseResult:
    """Extract the JSON team template embedded in a skill body.

    The template is expected in a fenced code block whose info string is
    ``beaver-team-template``.

    Args:
        body: Skill body text to search.

    Returns:
        A result with ``template`` set when exactly one block is present, its
        content is valid JSON, the JSON value is an object, and its ``nodes``
        entry (if present) is a list. A result with no template and no
        warnings when the body has no such block. Otherwise a result carrying
        a single warning describing the problem.
    """
    blocks = re.findall(r"```beaver-team-template\s*\n(.*?)\n```", body, re.DOTALL)
    if not blocks:
        return SkillTeamTemplateParseResult()
    if len(blocks) > 1:
        return SkillTeamTemplateParseResult(warnings=["skill defines multiple team templates"])

    try:
        parsed = json.loads(blocks[0])
    except json.JSONDecodeError:
        return SkillTeamTemplateParseResult(warnings=["team template JSON is invalid"])

    # A missing "nodes" key is tolerated; only a non-list value is rejected.
    has_valid_shape = isinstance(parsed, dict) and isinstance(parsed.get("nodes", []), list)
    if not has_valid_shape:
        return SkillTeamTemplateParseResult(warnings=["team template must be an object with a nodes list"])
    return SkillTeamTemplateParseResult(template=parsed)
|
||
|
||
|
||
def extract_required_tool_names(body: str) -> list[str]:
    """Extract tool names from the `## Required Tools` section of a skill body.

    This is a tolerant fallback for the frontmatter `tools` field. It does not
    guess tools from arbitrary text: only the explicitly named Required Tools
    section is read, and only its bullet lines (``-`` or ``*``).

    On each bullet, backtick-quoted spans are used when present; otherwise the
    line is split on ASCII or fullwidth commas. For every item only the first
    whitespace-delimited word is kept, with surrounding quotes, colons (ASCII
    or fullwidth) and dashes removed.

    Args:
        body: Skill body text (frontmatter already removed).

    Returns:
        Tool names in order of first appearance, without duplicates. Empty
        when the body is empty or has no Required Tools section.
    """
    if not body:
        return []

    # The section runs from the heading to the next "## " heading or the end.
    match = re.search(
        r"(?ims)^##\s+Required\s+Tools\s*$\n(?P<section>.*?)(?=^##\s+|\Z)",
        body,
    )
    if match is None:
        return []

    names: list[str] = []
    for line in match.group("section").splitlines():
        stripped = line.strip()
        if not stripped.startswith(("-", "*")):
            continue

        candidate = stripped[1:].strip()
        code_matches = re.findall(r"`([^`]+)`", candidate)
        # \uff0c is the fullwidth comma; written as an escape so that Unicode
        # normalisation of the source file cannot collapse it into ",".
        raw_items = code_matches or re.split(r"[,\uff0c]", candidate)
        for raw_item in raw_items:
            name = raw_item.strip().strip("`\"' ")
            words = name.split()
            if not words:
                # Empty item, or only whitespace left after removing quotes.
                continue
            # \uff1a is the fullwidth colon (see the note on \uff0c above).
            token = words[0].strip("`\"' :\uff1a-")
            if re.fullmatch(r"[A-Za-z0-9_.:-]+", token) and token not in names:
                names.append(token)
    return names
|
||
|
||
|
||
def parse_skill_metadata_blob(raw: str) -> dict[str, Any]:
    """Parse the JSON extension config stored in the `metadata` field.

    Supports plain metadata objects and the current `openclaw` namespace: when
    the decoded object has an `openclaw` key, that nested value is returned
    instead of the outer object.

    The fields the first version mainly cares about are:
    - `always`
    - `requires`

    Args:
        raw: JSON text taken from the `metadata` field.

    Returns:
        The decoded mapping, or an empty dict when `raw` cannot be decoded,
        the decoded value is not an object, or the `openclaw` value is not an
        object.
    """
    try:
        decoded = json.loads(raw)
    except (json.JSONDecodeError, TypeError):
        return {}

    if isinstance(decoded, dict):
        payload = decoded.get("openclaw", decoded)
        if isinstance(payload, dict):
            return payload
    return {}
|
||
|
||
|
||
def check_requirements(metadata: dict[str, Any]) -> bool:
    """Check whether a skill's minimal requirements are satisfied.

    Reads ``metadata["requires"]``. Every name under ``bins`` must be found by
    `shutil.which`, and every name under ``env`` must be an environment
    variable with a non-empty value.

    Args:
        metadata: Parsed skill metadata.

    Returns:
        True when all requirements are met, or when ``requires`` is absent or
        not a mapping. False as soon as one requirement is missing.
    """
    requires = metadata.get("requires", {})
    if not isinstance(requires, dict):
        return True

    # Binaries are checked before environment variables.
    for key, lookup in (("bins", shutil.which), ("env", os.environ.get)):
        # Tolerate `null` (treated as "none required") and a bare string
        # (treated as one name rather than iterated character by character).
        names = requires.get(key) or []
        if isinstance(names, str):
            names = [names]
        if not all(lookup(str(name)) for name in names):
            return False
    return True
|
||
|
||
|
||
def get_missing_requirements(metadata: dict[str, Any]) -> str:
    """Return a short description of the requirements that are missing.

    Reads ``metadata["requires"]``. A name under ``bins`` is missing when
    `shutil.which` does not find it; a name under ``env`` is missing when the
    environment variable is unset or empty.

    Args:
        metadata: Parsed skill metadata.

    Returns:
        A comma-separated string of ``CLI: <name>`` entries followed by
        ``ENV: <name>`` entries. Empty when nothing is missing or when
        ``requires`` is absent or not a mapping.
    """
    requires = metadata.get("requires", {})
    if not isinstance(requires, dict):
        return ""

    missing: list[str] = []
    for label, key, lookup in (
        ("CLI", "bins", shutil.which),
        ("ENV", "env", os.environ.get),
    ):
        # Tolerate `null` (treated as "none required") and a bare string
        # (treated as one name rather than iterated character by character).
        names = requires.get(key) or []
        if isinstance(names, str):
            names = [names]
        missing.extend(f"{label}: {name}" for name in names if not lookup(str(name)))
    return ", ".join(missing)
|
||
|
||
|
||
def escape_xml(value: str) -> str:
    """Apply minimal XML escaping for the skills summary.

    The ampersand, less-than and greater-than characters are replaced by their
    XML entity references; quotes are left untouched.

    Args:
        value: Raw text to embed in XML character data.

    Returns:
        The escaped text.
    """
    # Local import so this function does not depend on the module's import
    # block. The stdlib helper escapes exactly the three characters above and
    # handles the ampersand first, so entities it emits are not re-escaped.
    from xml.sax.saxutils import escape

    return escape(value)
|