"""LLM-driven skill assembler. 这层现在不再自己做规则打分,而是分两步把: 1. task description 2. embedding 召回后的候选 skill 摘要 3. 粗选候选的完整 skill 正文 交给一个模型来决定本轮要激活哪些 skill。 当前目标非常克制: - 主 agent 不拿 skill_view,也不动态探索技能库 - SkillAssembler 可以在系统侧内部读取候选 skill 正文 - 输出只要 skill 名称 - 没有命中就返回空 skills """ from __future__ import annotations import asyncio from dataclasses import dataclass, field import json from typing import Any from beaver.engine.context import SkillContext from beaver.engine.providers.base import LLMProvider from beaver.engine.providers.runtime import ProviderRuntime from beaver.skills.catalog.loader import SkillsLoader from beaver.skills.catalog.utils import strip_frontmatter from .embedding_retriever import SkillEmbeddingRetriever @dataclass(slots=True) class SkillAssemblyResult: """一次装配后真正要注入当前 run 的 skills。""" activated_skills: list[SkillContext] = field(default_factory=list) llm_interactions: list[dict[str, Any]] = field(default_factory=list) class SkillAssembler: """用 LLM 根据 task description 选择当前 run 的 skills。""" def __init__( self, loader: SkillsLoader, retriever: SkillEmbeddingRetriever | None = None, *, max_detailed_candidates: int = 5, max_candidate_content_chars: int = 6000, ) -> None: self.loader = loader self.retriever = retriever or SkillEmbeddingRetriever() self.max_detailed_candidates = max(1, max_detailed_candidates) self.max_candidate_content_chars = max(1000, max_candidate_content_chars) async def assemble( self, *, task_description: str, provider: LLMProvider, model: str, embedding_runtime: ProviderRuntime | None = None, thinking_enabled: bool | None = None, top_k: int = 12, ) -> SkillAssemblyResult: candidates = self.loader.build_selection_candidates() if not candidates: return SkillAssemblyResult() candidates = await self.retriever.retrieve( query=task_description, candidates=candidates, top_k=top_k, api_key=embedding_runtime.api_key if embedding_runtime is not None else None, api_base=embedding_runtime.api_base if embedding_runtime is not None else None, model=embedding_runtime.model if embedding_runtime is not None else None, extra_headers=embedding_runtime.extra_headers if embedding_runtime is not None else None, timeout_seconds=( embedding_runtime.request_timeout_seconds if embedding_runtime is not None else None ), fallback_top_k=None, ) if not candidates: return SkillAssemblyResult() llm_interactions: list[dict[str, Any]] = [] if len(candidates) <= self.max_detailed_candidates: shortlisted_names = [item["name"] for item in candidates] else: shortlisted_names = await self._select_skill_names( task_description=task_description, candidates=candidates, provider=provider, model=model, thinking_enabled=thinking_enabled, max_selected=self.max_detailed_candidates, selection_stage="shortlist", llm_interactions=llm_interactions, ) if not shortlisted_names: return SkillAssemblyResult(llm_interactions=llm_interactions) detailed_candidates = self._build_detailed_candidates( candidates=candidates, selected_names=shortlisted_names, ) selected_names = await self._select_skill_names( task_description=task_description, candidates=detailed_candidates, provider=provider, model=model, thinking_enabled=thinking_enabled, selection_stage="final", llm_interactions=llm_interactions, ) if not selected_names: return SkillAssemblyResult(llm_interactions=llm_interactions) activated_skills: list[SkillContext] = [] for name in selected_names: record = self.loader.get_skill_record(name) raw_content = self.loader.load_published_skill(name) content = strip_frontmatter(raw_content).strip() if raw_content else "" if not content: continue activated_skills.append( SkillContext( name=name, content=content, version=record.version if record is not None else "legacy", content_hash=record.content_hash or "" if record is not None else "", activation_reason="llm_selected", tool_hints=list(record.tool_hints) if record is not None else [], ) ) return SkillAssemblyResult(activated_skills=activated_skills, llm_interactions=llm_interactions) async def _select_skill_names( self, *, task_description: str, candidates: list[dict[str, str]], provider: LLMProvider, model: str, thinking_enabled: bool | None = None, max_selected: int | None = None, selection_stage: str = "final", llm_interactions: list[dict[str, Any]] | None = None, timeout_seconds: float = 8.0, ) -> list[str]: candidate_summary = self._render_candidates(candidates) candidate_names = {item["name"] for item in candidates} selection_instruction = ( f"Return at most {max_selected} names for detailed inspection. " if max_selected is not None else "Return the final skill names to activate. " ) messages = [ { "role": "system", "content": ( "You select Beaver skills for a single run. " "Given a task description and candidate skill information, " "return only a JSON array of skill names to activate. " "Do not invent names. If nothing matches, return []. " f"Selection stage: {selection_stage}. {selection_instruction}" ), }, { "role": "user", "content": ( f"Task description:\n{task_description}\n\n" f"Candidate skills:\n{candidate_summary}\n\n" "Return only JSON, for example: [\"skill-a\", \"skill-b\"]" ), }, ] chat_kwargs: dict[str, Any] = { "messages": messages, "tools": None, "model": model, "max_tokens": 256, "temperature": 0, } if thinking_enabled is not None: chat_kwargs["thinking_enabled"] = thinking_enabled try: response = await asyncio.wait_for(provider.chat(**chat_kwargs), timeout=timeout_seconds) except Exception: return [] if llm_interactions is not None: llm_interactions.append( { "stage": selection_stage, "model": model, "messages": messages, "response": { "content": response.content, "finish_reason": response.finish_reason, "provider_name": response.provider_name, "model": response.model, "usage": response.usage, }, } ) if response.finish_reason == "error" or not response.content: return [] parsed = self._parse_selected_names(response.content) if not parsed: return [] # 只保留当前候选集中真实存在的 skill 名称,并维持模型输出顺序。 filtered: list[str] = [] for name in parsed: if name in candidate_names and name not in filtered: filtered.append(name) return filtered[:max_selected] if max_selected is not None else filtered @staticmethod def _render_candidates(candidates: list[dict[str, str]]) -> str: lines: list[str] = [] for item in candidates: content = item.get("content") if content: lines.append( f"## {item['name']}\n" f"Description: {item['description']}\n" f"Skill content:\n{content}" ) else: lines.append(f"- {item['name']}: {item['description']}") return "\n".join(lines) def _build_detailed_candidates( self, *, candidates: list[dict[str, str]], selected_names: list[str], ) -> list[dict[str, str]]: by_name = {item["name"]: item for item in candidates} detailed: list[dict[str, str]] = [] for name in selected_names: candidate = by_name.get(name) if candidate is None: continue raw_content = self.loader.load_published_skill(name) content = strip_frontmatter(raw_content).strip() if raw_content else "" if len(content) > self.max_candidate_content_chars: content = content[: self.max_candidate_content_chars].rstrip() + "\n...[truncated]" detailed.append({**candidate, "content": content}) return detailed @staticmethod def _parse_selected_names(content: str) -> list[str]: cleaned = content.strip() if cleaned.startswith("```"): lines = cleaned.splitlines() if len(lines) >= 3 and lines[0].startswith("```") and lines[-1].startswith("```"): cleaned = "\n".join(lines[1:-1]).strip() try: payload: Any = json.loads(cleaned) except json.JSONDecodeError: return [] if isinstance(payload, dict): for key in ("skills", "selected_skills", "activated_skills", "selected"): value = payload.get(key) if isinstance(value, list): payload = value break if not isinstance(payload, list): return [] return [item.strip() for item in payload if isinstance(item, str) and item.strip()]