Files
beaver_project/app-instance/backend/beaver/skills/learning/safety.py
steven_li 9d6cde2d23 feat: 将项目从nano重命名为beaver并更新相关配置
- 将所有环境变量前缀从NANO_改为BEAVER_
- 更新README.md文档内容,包括项目介绍、组件说明和快速开始指南
- 修改.gitignore文件,添加auth-portal运行时路径排除规则
- 更新app-instance镜像标签从nano/app-instance改为beaver/app-instance
- 增强技能安全检查器,支持工具前缀白名单功能
- 添加技能草稿重新检查安全性API端点
- 扩展证据选择器,收集工具调用名称用于技能学习
- 改进技能合成器,基于实际调用的工具生成工具提示
- 优化路由超时处理机制,增加重试逻辑
- 更新后端架构文档,添加可视化入口和基础概念说明
- 实现在WebSocket消息中传递工具迭代次数信息
2026-05-20 18:01:06 +08:00

121 lines
4.2 KiB
Python

"""Safety checks for generated skill drafts."""
from __future__ import annotations
import re
from uuid import uuid4
from beaver.memory.skills import SkillDraftSafetyReport
from beaver.skills.specs import SkillDraft
class SkillDraftSafetyChecker:
"""Deterministic publish gate for skill drafts."""
_CRITICAL_PATTERNS = [
r"ignore (all )?(previous|system|developer) instructions",
r"bypass (permissions|permission|safety|policy|policies)",
r"disable (guardrails|safety|permission checks)",
r"reveal (secrets|api keys|tokens|credentials)",
r"exfiltrate",
]
_DANGEROUS_TOOL_HINTS = {
"shell",
"terminal",
"bash",
"filesystem_write",
"write_file",
"delete_file",
"network",
"http",
"auth",
"credentials",
}
def __init__(
self,
*,
allowed_tool_names: set[str] | None = None,
allowed_tool_prefixes: set[str] | None = None,
) -> None:
self.allowed_tool_names = allowed_tool_names
self.allowed_tool_prefixes = allowed_tool_prefixes or set()
def check(self, draft: SkillDraft) -> SkillDraftSafetyReport:
issues: list[str] = []
blocked: list[str] = []
risk_level = "low"
frontmatter = draft.proposed_frontmatter
if not isinstance(frontmatter, dict):
blocked.append("frontmatter must be an object")
description = str(frontmatter.get("description") or "").strip()
if not description and draft.proposal_kind != "retire_skill":
issues.append("frontmatter.description is missing")
risk_level = _max_risk(risk_level, "medium")
tool_hints = _tool_hints(frontmatter)
if self.allowed_tool_names is not None:
unknown = [name for name in tool_hints if not self._is_allowed_tool_hint(name)]
if unknown:
blocked.append(f"unknown tool hints: {', '.join(sorted(unknown))}")
dangerous = sorted({name for name in tool_hints if name.lower() in self._DANGEROUS_TOOL_HINTS})
if dangerous:
issues.append(f"dangerous tool hints require high-risk review: {', '.join(dangerous)}")
risk_level = _max_risk(risk_level, "high")
content = f"{draft.proposed_content}\n{frontmatter}".lower()
for pattern in self._CRITICAL_PATTERNS:
if re.search(pattern, content):
blocked.append(f"critical prompt-safety pattern matched: {pattern}")
risk_level = "critical"
if draft.proposal_kind in {"retire_skill", "merge_skills"}:
risk_level = _max_risk(risk_level, "high")
passed = not blocked and risk_level != "critical"
return SkillDraftSafetyReport(
report_id=uuid4().hex,
skill_name=draft.skill_name,
draft_id=draft.draft_id,
passed=passed,
risk_level=risk_level,
issues=issues,
blocked_reasons=blocked,
suggested_fix=_suggest_fix(blocked, issues),
created_at=_utc_now(),
)
def _is_allowed_tool_hint(self, name: str) -> bool:
if self.allowed_tool_names is not None and name in self.allowed_tool_names:
return True
return any(name.startswith(prefix) and len(name) > len(prefix) for prefix in self.allowed_tool_prefixes)
def _tool_hints(frontmatter: dict) -> list[str]:
raw = frontmatter.get("tools")
if isinstance(raw, list):
return [str(item).strip() for item in raw if str(item).strip()]
if isinstance(raw, str):
return [item.strip() for item in raw.split(",") if item.strip()]
return []
def _max_risk(left: str, right: str) -> str:
order = {"low": 0, "medium": 1, "high": 2, "critical": 3}
return left if order[left] >= order[right] else right
def _suggest_fix(blocked: list[str], issues: list[str]) -> str:
if blocked:
return "Remove blocked instructions or invalid tool hints before review."
if issues:
return "Review the flagged issues before publishing."
return ""
def _utc_now() -> str:
from datetime import datetime, timezone
return datetime.now(timezone.utc).isoformat()