"""Safety checks for generated skill drafts.""" from __future__ import annotations import re from uuid import uuid4 from beaver.memory.skills import SkillDraftSafetyReport from beaver.skills.specs import SkillDraft class SkillDraftSafetyChecker: """Deterministic publish gate for skill drafts.""" _CRITICAL_PATTERNS = [ r"ignore (all )?(previous|system|developer) instructions", r"bypass (permissions|permission|safety|policy|policies)", r"disable (guardrails|safety|permission checks)", r"reveal (secrets|api keys|tokens|credentials)", r"exfiltrate", ] _DANGEROUS_TOOL_HINTS = { "shell", "terminal", "bash", "filesystem_write", "write_file", "delete_file", "network", "http", "auth", "credentials", } def __init__(self, *, allowed_tool_names: set[str] | None = None) -> None: self.allowed_tool_names = allowed_tool_names def check(self, draft: SkillDraft) -> SkillDraftSafetyReport: issues: list[str] = [] blocked: list[str] = [] risk_level = "low" frontmatter = draft.proposed_frontmatter if not isinstance(frontmatter, dict): blocked.append("frontmatter must be an object") description = str(frontmatter.get("description") or "").strip() if not description and draft.proposal_kind != "retire_skill": issues.append("frontmatter.description is missing") risk_level = _max_risk(risk_level, "medium") tool_hints = _tool_hints(frontmatter) if self.allowed_tool_names is not None: unknown = [name for name in tool_hints if name not in self.allowed_tool_names] if unknown: blocked.append(f"unknown tool hints: {', '.join(sorted(unknown))}") dangerous = sorted({name for name in tool_hints if name.lower() in self._DANGEROUS_TOOL_HINTS}) if dangerous: issues.append(f"dangerous tool hints require high-risk review: {', '.join(dangerous)}") risk_level = _max_risk(risk_level, "high") content = f"{draft.proposed_content}\n{frontmatter}".lower() for pattern in self._CRITICAL_PATTERNS: if re.search(pattern, content): blocked.append(f"critical prompt-safety pattern matched: {pattern}") risk_level = "critical" if draft.proposal_kind in {"retire_skill", "merge_skills"}: risk_level = _max_risk(risk_level, "high") passed = not blocked and risk_level != "critical" return SkillDraftSafetyReport( report_id=uuid4().hex, skill_name=draft.skill_name, draft_id=draft.draft_id, passed=passed, risk_level=risk_level, issues=issues, blocked_reasons=blocked, suggested_fix=_suggest_fix(blocked, issues), created_at=_utc_now(), ) def _tool_hints(frontmatter: dict) -> list[str]: raw = frontmatter.get("tools") if isinstance(raw, list): return [str(item).strip() for item in raw if str(item).strip()] if isinstance(raw, str): return [item.strip() for item in raw.split(",") if item.strip()] return [] def _max_risk(left: str, right: str) -> str: order = {"low": 0, "medium": 1, "high": 2, "critical": 3} return left if order[left] >= order[right] else right def _suggest_fix(blocked: list[str], issues: list[str]) -> str: if blocked: return "Remove blocked instructions or invalid tool hints before review." if issues: return "Review the flagged issues before publishing." return "" def _utc_now() -> str: from datetime import datetime, timezone return datetime.now(timezone.utc).isoformat()