"""Canonical Beaver skill authoring format.""" from __future__ import annotations import json import re from typing import Any from beaver.skills.catalog.utils import extract_required_tool_names CANONICAL_SKILL_SECTION_HEADINGS: tuple[str, ...] = ( "## Overview", "## When to Use", "## Required Tools", "## Workflow", "## Validation", "## Boundaries", "## Anti-Patterns", ) def canonical_skill_format_instructions() -> str: headings = "\n".join(f"- {heading}" for heading in CANONICAL_SKILL_SECTION_HEADINGS) return ( "Canonical Beaver SKILL.md format:\n" "1. Return a frontmatter object with `name`, `description`, and `tools`.\n" "2. `name` must be lowercase kebab-case. `description` must explain when the skill should be used.\n" "3. `tools` must be an explicit JSON array of exact runtime tool names. Use [] only if no tool is required.\n" "4. The Markdown content must start with one H1 title and include these H2 sections in this exact order:\n" f"{headings}\n" "5. Write concrete operational guidance, not a story about a past task.\n" "6. Include validation steps and anti-patterns so future runs know how to avoid false completion." ) def normalize_skill_frontmatter(frontmatter: dict[str, Any] | None, *, skill_name: str) -> dict[str, Any]: raw = dict(frontmatter or {}) name = _slug(str(raw.get("name") or skill_name)) description = str(raw.get("description") or f"Use when {name} guidance is needed.").strip() tools = _coerce_string_list(raw.get("tools")) normalized = {} for key, value in raw.items(): if key in {"name", "description", "tools"}: continue if key in {"always", "internal"} and isinstance(value, str): normalized[key] = value.strip().lower() in {"1", "true", "yes", "on"} continue normalized[key] = value return { "name": name, "description": description, "tools": tools, **normalized, } def is_canonical_skill_body(body: str) -> bool: text = body.strip() if not re.search(r"^#\s+\S", text, flags=re.MULTILINE): return False position = 0 for heading in CANONICAL_SKILL_SECTION_HEADINGS: found = text.find(heading, position) if found < 0: return False position = found + len(heading) return True def ensure_canonical_skill_body( body: str, *, title: str, description: str = "", tools: list[str] | None = None, ) -> str: if is_canonical_skill_body(body): normalized = body.strip() if tools: normalized = _replace_required_tools_section(normalized, tools) return normalized + "\n" source = _compact_source_guidance(body) overview = description or source or f"Use this skill for {title}." return canonicalize_skill_body( title=title, overview=overview, tools=list(tools or []), workflow=[ "Identify whether the user's request matches the skill's trigger conditions.", "Read the relevant source guidance below and apply only the steps that fit the current task.", "Use the required tools deliberately and keep tool output tied to the user's goal.", ], validation=[ "Verify the requested outcome with the most direct available check.", "Report any skipped step, unavailable dependency, or remaining uncertainty explicitly.", ], boundaries=[ "Do not broaden the task beyond the user's request.", "Do not use tools that are not listed or clearly available in the current runtime.", ], anti_patterns=[ "Do not summarize the skill instead of applying it.", "Do not claim completion without validation evidence.", ], source_guidance=source, ) def canonicalize_skill_body( *, title: str, overview: str, tools: list[str] | None = None, workflow: list[str] | None = None, validation: list[str] | None = None, boundaries: list[str] | None = None, anti_patterns: list[str] | None = None, when_to_use: list[str] | None = None, source_guidance: str = "", ) -> str: cleaned_title = _title(title) tool_lines = _tool_lines(tools or []) workflow_lines = _bullet_lines(workflow or ["Follow the workflow described by the current task and evidence."]) validation_lines = _bullet_lines(validation or ["Validate the result before reporting completion."]) boundary_lines = _bullet_lines(boundaries or ["Stay within the current task and workspace boundaries."]) anti_pattern_lines = _bullet_lines(anti_patterns or ["Do not skip validation."]) when_lines = _bullet_lines(when_to_use or [f"Use when the task requires {cleaned_title} guidance."]) source_section = f"\n\n### Source Guidance\n\n{source_guidance.strip()}" if source_guidance.strip() else "" return ( f"# {cleaned_title}\n\n" "## Overview\n\n" f"{overview.strip() or f'Use this skill for {cleaned_title}.'}\n\n" "## When to Use\n\n" f"{when_lines}\n\n" "## Required Tools\n\n" f"{tool_lines}\n\n" "## Workflow\n\n" f"{workflow_lines}{source_section}\n\n" "## Validation\n\n" f"{validation_lines}\n\n" "## Boundaries\n\n" f"{boundary_lines}\n\n" "## Anti-Patterns\n\n" f"{anti_pattern_lines}\n" ) def parse_skill_rewrite_json(content: str, *, skill_name: str) -> dict[str, Any] | None: cleaned = content.strip() if cleaned.startswith("```"): lines = cleaned.splitlines() if len(lines) >= 3 and lines[0].startswith("```") and lines[-1].startswith("```"): cleaned = "\n".join(lines[1:-1]).strip() try: payload = json.loads(cleaned) except json.JSONDecodeError: return None if not isinstance(payload, dict): return None frontmatter = payload.get("frontmatter") body = payload.get("content") if not isinstance(frontmatter, dict) or not isinstance(body, str): return None normalized = normalize_skill_frontmatter(frontmatter, skill_name=skill_name) normalized["tools"] = _merge_string_lists( normalized.get("tools"), extract_required_tool_names(body), ) normalized_body = ensure_canonical_skill_body( body, title=normalized["name"], description=normalized["description"], tools=normalized["tools"], ) return { "frontmatter": normalized, "content": normalized_body, "change_reason": str(payload.get("change_reason") or ""), } def _compact_source_guidance(body: str, *, max_chars: int = 20000) -> str: text = body.strip() if not text: return "" text = re.sub(r"^---\n.*?\n---\n?", "", text, flags=re.DOTALL).strip() text = re.sub(r"\n{3,}", "\n\n", text) text = re.sub(r"^(#{1,4})\s+", r"##\1 ", text, flags=re.MULTILINE) return text[:max_chars].rstrip() def _tool_lines(tools: list[str]) -> str: if not tools: return "- No dedicated tools are required." return "\n".join(f"- `{tool}`" for tool in tools) def _bullet_lines(items: list[str]) -> str: cleaned = [str(item).strip() for item in items if str(item).strip()] if not cleaned: return "- No additional guidance." return "\n".join(f"- {item}" for item in cleaned) def _coerce_string_list(value: Any) -> list[str]: if isinstance(value, list): raw_items = value elif isinstance(value, str): raw_items = value.split(",") else: raw_items = [] result: list[str] = [] for item in raw_items: cleaned = str(item).strip() if cleaned and cleaned not in result: result.append(cleaned) return result def _merge_string_lists(*values: Any) -> list[str]: result: list[str] = [] for value in values: for item in _coerce_string_list(value): if item not in result: result.append(item) return result def _replace_required_tools_section(body: str, tools: list[str]) -> str: replacement = "## Required Tools\n\n" + _tool_lines(tools) updated, count = re.subn( r"(?ms)^##\s+Required\s+Tools\s*\n.*?(?=^##\s+|\Z)", replacement + "\n\n", body.strip(), count=1, ) return updated.strip() if count else body.strip() def _slug(value: str) -> str: text = value.strip().lower() text = re.sub(r"[^a-z0-9-]+", "-", text) text = re.sub(r"-{2,}", "-", text).strip("-") return text or "generated-skill" def _title(value: str) -> str: cleaned = str(value or "").strip().replace("-", " ") return cleaned.title() if cleaned else "Generated Skill"