Files
steven_li 30ab74ffb2 feat(engine): 添加MCP连接管理和工具集成功能
- 集成MCP连接管理器,支持MCP服务器连接
- 添加多种内置工具:ClarifyTool、CronTool、DelegateTool、ExecuteCodeTool、
  PatchFileTool、ProcessTool、SendMessageTool、SpawnTool、TerminalTool、
  TodoTool、WebFetchTool、WebSearchTool、WriteFileTool等
- 实现工具注册和装配功能
- 添加技能选择上下文参数
- 支持思考模式控制参数thinking_enabled

feat(coordinator): 重构任务执行计划器参数命名

- 将learning_candidate_enabled重命名为allow_candidate_generation
- 更新TeamGraphScheduler中的参数传递
- 修改LocalAgentRunner中的相关参数处理
- 更新README文档中的相应描述

refactor(context): 标准化工具调用参数格式

- 添加_json导入用于参数序列化
- 实现_provider_tool_calls方法标准化OpenAI兼容的工具调用载荷
- 修复工具调用中参数非字符串类型的序列化问题

refactor(session): 优化消息历史记录过滤逻辑

- 修改get_messages_as_conversation为基于运行状态过滤消息
- 排除未完成、失败或错误结束的运行记录
- 改进对话历史的可见性控制机制

fix(store): 修复FTS索引重建逻辑

- 添加异常处理防止FTS索引创建失败
- 实现_rebuild_fts_index方法重新构建全文搜索索引
- 优化索引触发器和表的维护流程
2026-05-14 09:43:48 +08:00

145 lines
5.6 KiB
Python

"""Synthesize ephemeral guidance for missing sub-agent skills."""
from __future__ import annotations
import json
import re
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
from uuid import uuid4
from beaver.engine.context import SkillContext
from beaver.engine.providers import ProviderBundle
from beaver.skills.specs.serialization import canonical_hash
if TYPE_CHECKING:
from beaver.tasks.models import TaskRecord
@dataclass(slots=True)
class EphemeralGuidanceResult:
guidance_id: str
guidance_name: str
skill_context: SkillContext
class EphemeralGuidanceSynthesizer:
"""Create one-run guidance for the current delegated sub-agent."""
async def synthesize(
self,
*,
task: TaskRecord,
user_message: str,
attempt_index: int,
node_id: str,
node_task: str,
skill_query: str,
required_capabilities: list[str],
provider_bundle: ProviderBundle,
) -> EphemeralGuidanceResult:
provider = provider_bundle.auxiliary_provider or provider_bundle.main_provider
runtime = provider_bundle.auxiliary_runtime or provider_bundle.main_runtime
model = getattr(runtime, "model", None)
payload = self._fallback_payload(skill_query=skill_query, node_task=node_task, capabilities=required_capabilities)
try:
response = await provider.chat(
messages=[
{
"role": "system",
"content": (
"You create concise Beaver ephemeral guidance. Return only JSON with keys: "
"guidance_name, description, content, tags."
),
},
{
"role": "user",
"content": (
"Create procedural guidance for this missing Task sub-agent capability.\n\n"
f"Task goal:\n{task.goal}\n\n"
f"Current user request:\n{user_message}\n\n"
f"Node id: {node_id}\n"
f"Node task:\n{node_task}\n\n"
f"Skill query:\n{skill_query}\n"
f"Required capabilities: {required_capabilities}\n\n"
"The content must be actionable guidance for a temporary sub-agent. "
"Do not include implementation claims, review metadata, or publish metadata."
),
},
],
tools=None,
model=model,
max_tokens=4096,
temperature=0,
)
payload = self._parse_payload(response.content or "") or payload
except Exception:
payload = payload
guidance_name = _slug(str(payload.get("guidance_name") or payload.get("skill_name") or skill_query or node_id))
guidance_id = f"eg_{uuid4().hex}"
content = str(payload.get("content") or "").strip()
if not content:
content = str(self._fallback_payload(skill_query=skill_query, node_task=node_task, capabilities=required_capabilities)["content"])
context = SkillContext(
name=f"ephemeral:{guidance_name}",
content=content,
version=f"ephemeral:{guidance_id}",
content_hash=canonical_hash(content),
activation_reason="ephemeral_guidance",
tool_hints=[],
)
return EphemeralGuidanceResult(
guidance_id=guidance_id,
guidance_name=guidance_name,
skill_context=context,
)
@staticmethod
def _parse_payload(text: str) -> dict[str, Any] | None:
cleaned = text.strip()
if cleaned.startswith("```"):
lines = cleaned.splitlines()
if len(lines) >= 3 and lines[0].startswith("```") and lines[-1].startswith("```"):
cleaned = "\n".join(lines[1:-1]).strip()
if cleaned.lower().startswith("json"):
cleaned = cleaned[4:].strip()
start = cleaned.find("{")
end = cleaned.rfind("}")
if start >= 0 and end >= start:
cleaned = cleaned[start : end + 1]
try:
payload = json.loads(cleaned)
except json.JSONDecodeError:
return None
return payload if isinstance(payload, dict) else None
@staticmethod
def _fallback_payload(*, skill_query: str, node_task: str, capabilities: list[str]) -> dict[str, Any]:
title = skill_query or node_task or "task subagent guidance"
capability_lines = "\n".join(f"- {item}" for item in capabilities) or "- Follow the node task precisely."
return {
"guidance_name": _slug(title),
"description": f"Draft guidance for {title}.",
"tags": ["generated", "task-sub-agent"],
"content": (
f"# {title}\n\n"
"Use this draft guidance only for the current delegated sub-task.\n\n"
"## Objective\n"
f"{node_task or title}\n\n"
"## Capabilities to apply\n"
f"{capability_lines}\n\n"
"## Output\n"
"Return concise evidence, decisions, and unresolved risks for the main Agent to synthesize."
),
}
def _slug(value: str) -> str:
cleaned = re.sub(r"[^a-zA-Z0-9]+", "-", value.strip().lower()).strip("-")
return cleaned[:64].strip("-") or "generated-task-subagent-guidance"
MissingSkillDraftResult = EphemeralGuidanceResult
MissingSkillSynthesizer = EphemeralGuidanceSynthesizer