- 新增 spawn_subagent 和 spawn_agent_team 工具,替代原有的 spawn 工具 - 重构 DelegationManager 以支持单个子代理和代理团队两种委派模式 - 更新系统提示词中的委派策略说明,明确使用场景和区别 - 添加技能上下文传递功能,确保委派任务遵循指定技能 - 实现代理内部的受控下游委派机制,防止无限嵌套 - 更新工具注册和上下文设置逻辑以适配新架构
1222 lines
47 KiB
Python
1222 lines
47 KiB
Python
"""统一委派管理器。
|
||
|
||
这是本次多 agent 改造的核心编排层,负责:
|
||
1. 根据接口语义选择单 subagent 或 agent team 路径;
|
||
2. 跟踪每次后台委派的运行状态,支持取消;
|
||
3. 统一发出 bus 公告和结构化 process events;
|
||
4. 在本地执行器和 A2A 客户端之间做协议桥接。
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import re
|
||
import uuid
|
||
from collections.abc import Awaitable, Callable
|
||
from dataclasses import dataclass, field
|
||
from pathlib import Path
|
||
from typing import TYPE_CHECKING, Any
|
||
|
||
from loguru import logger
|
||
|
||
from nanobot.a2a.client import A2AClient, A2AStreamEvent
|
||
from nanobot.agent.agent_registry import AgentDescriptor, AgentRegistry
|
||
from nanobot.agent.process_events import (
|
||
emit_process_event,
|
||
has_process_event_sink,
|
||
new_run_id,
|
||
process_run_context,
|
||
)
|
||
from nanobot.agent.run_result import AgentRunResult
|
||
from nanobot.bus.events import InboundMessage, OutboundMessage
|
||
from nanobot.bus.queue import MessageBus
|
||
from nanobot.providers.base import LLMProvider
|
||
|
||
if TYPE_CHECKING:
|
||
from nanobot.agent.skills import SkillsLoader
|
||
|
||
# Handler used for direct (non-bus) announcements. Positional arguments:
# (content, origin routing dict, sender_id, flag); DelegationManager passes
# ``not has_process_event_sink()`` as the flag.
DirectAnnouncementCallback = Callable[
    [str, dict[str, str], str, bool],
    Awaitable[None],
]
|
||
|
||
|
||
@dataclass
class DelegationRun:
    """State kept for one in-flight delegation, including its remote A2A sub-tasks."""

    # Handle of the background asyncio task; used for cancellation and lifecycle tracking.
    task: asyncio.Task[None]
    # Short label shown in logs and the UI.
    label: str
    # Routing of the originating conversation; results are sent back there.
    origin: dict[str, str]
    # Whether results are re-injected as a system message via the bus
    # (usually False in direct-connection mode).
    announce_via_bus: bool = field(default=True)
    # Remote agent descriptors and their A2A task ids, keyed by agent id,
    # so remote sub-tasks can be cancelled.
    remote_agents: dict[str, AgentDescriptor] = field(default_factory=dict)
    remote_task_ids: dict[str, str] = field(default_factory=dict)
|
||
|
||
|
||
class DelegationManager:
|
||
"""把任务分发到单个 subagent 或 agent team。"""
|
||
|
||
def __init__(
    self,
    provider: LLMProvider,
    workspace: Path,
    bus: MessageBus,
    registry: AgentRegistry,
    skills_loader: "SkillsLoader | None",
    local_executor: Any,
    timeout_seconds: int = 30,
    poll_interval_seconds: int = 2,
    card_cache_ttl_seconds: int = 300,
    max_parallel_agents: int = 4,
    allowed_hosts: list[str] | None = None,
    authz_config: Any | None = None,
    backend_identity: Any | None = None,
    allow_local_delegation: bool = True,
    allow_plugin_delegation: bool = True,
    allow_local_fallback: bool = True,
):
    """Store collaborators and delegation policy and create the A2A client.

    Args:
        provider: LLM provider (also used for short user-facing summaries).
        workspace: Workspace root path.
        bus: Message bus used for progress and result announcements.
        registry: Registry used to look up and suggest agents.
        skills_loader: Optional loader that renders skill text for delegated tasks.
        local_executor: Object exposing ``run_local_task``; it only executes
            local agents, orchestration stays in this class.
        timeout_seconds, poll_interval_seconds, card_cache_ttl_seconds,
        allowed_hosts, authz_config, backend_identity: Forwarded unchanged
            to ``A2AClient``.
        max_parallel_agents: Concurrency ceiling for agent teams (clamped to >= 1).
        allow_local_delegation, allow_plugin_delegation, allow_local_fallback:
            Switches that enable or disable the corresponding agent kinds.
    """
    # Injected collaborators.
    self.provider = provider
    self.workspace = workspace
    self.bus = bus
    self.registry = registry
    self.skills_loader = skills_loader
    self.local_executor = local_executor

    # Delegation policy.
    self.max_parallel_agents = max(1, max_parallel_agents)
    self.allow_local_delegation = allow_local_delegation
    self.allow_plugin_delegation = allow_plugin_delegation
    self.allow_local_fallback = allow_local_fallback

    # The A2A client only handles protocol details; routing and announcements live here.
    client_options = {
        "timeout_seconds": timeout_seconds,
        "poll_interval_seconds": poll_interval_seconds,
        "card_cache_ttl_seconds": card_cache_ttl_seconds,
        "allowed_hosts": allowed_hosts,
        "authz_config": authz_config,
        "backend_identity": backend_identity,
    }
    self.a2a_client = A2AClient(**client_options)

    # run_id -> state of every background delegation currently in flight.
    self._running_tasks: dict[str, DelegationRun] = {}
    self._direct_announcement_callback: DirectAnnouncementCallback | None = None
|
||
|
||
_PERSISTENT_SUBAGENT_PATTERNS = (
|
||
re.compile(r"\bsub[\s-]?agent\b", re.IGNORECASE),
|
||
re.compile(r"\bpersistent\b", re.IGNORECASE),
|
||
re.compile(r"\bagents\.json\b", re.IGNORECASE),
|
||
re.compile(r"\bregistry\.json\b", re.IGNORECASE),
|
||
re.compile(r"\bsubagentctl\b", re.IGNORECASE),
|
||
re.compile(r"/api/subagents", re.IGNORECASE),
|
||
re.compile(r"workspace/agents", re.IGNORECASE),
|
||
re.compile(r"子智能体"),
|
||
re.compile(r"子 agent", re.IGNORECASE),
|
||
re.compile(r"持久化"),
|
||
)
|
||
|
||
def set_direct_announcement_callback(self, callback: DirectAnnouncementCallback | None) -> None:
    """Install the local handler used for direct-mode announcements.

    Passing ``None`` removes any previously installed handler.
    """
    self._direct_announcement_callback = callback
|
||
|
||
async def delegate_for_subagent(
    self,
    task: str,
    label: str | None = None,
    target: str | None = None,
    strategy: str = "auto",
    skills: list[str] | None = None,
) -> str:
    """Synchronous downstream delegation entry point for delegated workers.

    The chosen downstream agent runs with nested delegation disabled, so a
    worker can delegate at most one level deeper.

    Returns:
        A text report of the outcome, or an ``"Error: ..."`` string when
        resolution or execution raised (errors are never propagated).
    """
    if label:
        shown = label
    else:
        shown = task[:30] + ("..." if len(task) > 30 else "")

    try:
        chosen = self._resolve_nested_delegate(task, target, strategy)
        outcome = await self._execute_descriptor(
            chosen,
            task,
            shown,
            skill_names=skills,
            allow_nested_delegation=False,
        )
    except Exception as exc:
        return f"Error: Nested delegation failed: {exc}"

    verdict = outcome.status if outcome.status != "ok" else "completed successfully"
    header = f"Nested delegation via {outcome.agent_name} ({outcome.agent_id}) {verdict}.\n\n"
    return header + f"Result:\n{outcome.summary}"
|
||
|
||
def build_nested_agents_summary(self) -> str:
    """Build the ``<downstream-agents>`` summary visible to delegated workers.

    Only agents accepted by ``_nested_descriptor_allowed`` are listed. Each
    value is XML-escaped so agent metadata (names, descriptions) cannot
    break or inject into the markup the worker reads.

    Returns:
        The markup, or ``""`` when no downstream agent is allowed.
    """
    agents = [
        agent
        for agent in self.registry.list_agents()
        if self._nested_descriptor_allowed(agent)
    ]
    if not agents:
        return ""

    def esc(value: object) -> str:
        # Previously each replace mapped a character to itself (a no-op), so
        # '<', '>' and '&' leaked into the markup unescaped. '&' must be
        # replaced first so the entities produced for '<'/'>' stay intact.
        text = "" if value is None else str(value)
        return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;")

    lines = ["<downstream-agents>"]
    for agent in agents:
        lines.append(" <agent>")
        lines.append(f" <id>{esc(agent.id)}</id>")
        lines.append(f" <name>{esc(agent.name)}</name>")
        lines.append(f" <kind>{esc(agent.kind)}</kind>")
        lines.append(f" <source>{esc(agent.source)}</source>")
        lines.append(f" <description>{esc(agent.description)}</description>")
        if agent.protocol:
            lines.append(f" <protocol>{esc(agent.protocol)}</protocol>")
        lines.append(" </agent>")
    lines.append("</downstream-agents>")
    return "\n".join(lines)
|
||
|
||
async def dispatch_subagent(
    self,
    task: str,
    label: str | None = None,
    skills: list[str] | None = None,
    origin_channel: str = "cli",
    origin_chat_id: str = "direct",
    announce_via_bus: bool = True,
) -> str:
    """Start one background subagent delegation (auto-routed).

    Returns immediately with the "started" notice produced by ``_dispatch``.
    """
    request = {
        "task": task,
        "label": label,
        "target": None,
        "targets": [],
        "strategy": "auto",
        "skills": skills or [],
        "origin_channel": origin_channel,
        "origin_chat_id": origin_chat_id,
        "announce_via_bus": announce_via_bus,
        "mode": "subagent",
    }
    return await self._dispatch(**request)
|
||
|
||
async def dispatch_agent_team(
    self,
    task: str,
    label: str | None = None,
    skills: list[str] | None = None,
    origin_channel: str = "cli",
    origin_chat_id: str = "direct",
    announce_via_bus: bool = True,
) -> str:
    """Start one background agent-team delegation (``strategy="group"``).

    Returns immediately with the "started" notice produced by ``_dispatch``.
    """
    request = {
        "task": task,
        "label": label,
        "target": None,
        "targets": [],
        "strategy": "group",
        "skills": skills or [],
        "origin_channel": origin_channel,
        "origin_chat_id": origin_chat_id,
        "announce_via_bus": announce_via_bus,
        "mode": "agent_team",
    }
    return await self._dispatch(**request)
|
||
|
||
async def _dispatch(
    self,
    task: str,
    label: str | None,
    target: str | None,
    targets: list[str],
    strategy: str,
    skills: list[str],
    origin_channel: str,
    origin_chat_id: str,
    announce_via_bus: bool,
    mode: str,
) -> str:
    """Spawn ``_run_dispatch`` as a background task and return a "started" notice.

    The run is registered in ``self._running_tasks`` under a short random id
    and removed again by a done-callback once the task finishes.
    """
    run_id = uuid.uuid4().hex[:8]  # same 8 hex chars as str(uuid4())[:8]
    short_label = label or task[:30] + ("..." if len(task) > 30 else "")
    route = {"channel": origin_channel, "chat_id": origin_chat_id}
    noun = "Agent team" if mode == "agent_team" else "Subagent"

    worker = asyncio.create_task(
        self._run_dispatch(
            run_id=run_id,
            task=task,
            label=short_label,
            target=target,
            targets=targets,
            strategy=strategy,
            skills=skills,
            origin=route,
            mode=mode,
        )
    )
    self._running_tasks[run_id] = DelegationRun(
        task=worker,
        label=short_label,
        origin=route,
        announce_via_bus=announce_via_bus,
    )

    def _forget(_finished: asyncio.Task[None]) -> None:
        self._running_tasks.pop(run_id, None)

    worker.add_done_callback(_forget)
    logger.info("{} [{}] started: {}", noun, run_id, short_label)
    return f"{noun} [{short_label}] started (id: {run_id}). I'll notify you when it completes."
|
||
|
||
def get_running_count(self) -> int:
    """Number of background delegations that are currently registered as running."""
    in_flight = self._running_tasks
    return len(in_flight)
|
||
|
||
@staticmethod
|
||
def _ui_status(status: str | None) -> str:
|
||
"""把底层状态归一化成前端更稳定的显示状态。"""
|
||
probe = (status or "").strip().lower()
|
||
if probe in {"", "ok", "done", "completed", "complete", "success"}:
|
||
return "done"
|
||
if probe in {"working", "running", "queued", "submitted", "waiting", "in_progress"}:
|
||
return "running" if probe != "waiting" else "waiting"
|
||
if probe in {"cancelled", "canceled"}:
|
||
return "cancelled"
|
||
if probe in {"failed", "error"}:
|
||
return "error"
|
||
return probe or "running"
|
||
|
||
async def _emit_agent_started(
    self,
    run_id: str,
    descriptor: AgentDescriptor,
    label: str,
    *,
    parent_run_id: str | None = None,
    task: str | None = None,
) -> None:
    """Emit the start event of a single agent run (used to draw the execution tree).

    When ``task`` is non-empty, a follow-up ``process_run_message`` carrying
    the delegated input (role ``"user"``) is emitted as well.
    """
    actor = {
        "actor_type": "agent",
        "actor_id": descriptor.id,
        "actor_name": descriptor.name,
    }
    details = {
        "kind": descriptor.kind,
        "protocol": descriptor.protocol,
        "support_group": descriptor.support_group,
        "support_streaming": descriptor.support_streaming,
        "delegated_task": task,
    }
    await emit_process_event(
        "process_run_started",
        run_id=run_id,
        parent_run_id=parent_run_id,
        **actor,
        source=descriptor.source,
        title=label,
        status="running",
        metadata=details,
    )
    if not task:
        return
    await emit_process_event(
        "process_run_message",
        run_id=run_id,
        parent_run_id=parent_run_id,
        **actor,
        message_role="user",
        text=task,
        metadata={"source": "delegation_input"},
    )
|
||
|
||
async def _emit_agent_finished(
    self,
    run_id: str,
    descriptor: AgentDescriptor,
    result: AgentRunResult,
) -> None:
    """Emit the finish event of a single agent run.

    The event status is normalized via ``_ui_status``; the untouched status
    is preserved under ``metadata["raw_status"]``.
    """
    raw = result.status
    await emit_process_event(
        "process_run_finished",
        run_id=run_id,
        actor_type="agent",
        actor_id=descriptor.id,
        actor_name=descriptor.name,
        status=self._ui_status(raw),
        summary=result.summary,
        metadata={"raw_status": raw},
    )
|
||
|
||
async def _emit_agent_cancelled(
    self,
    run_id: str,
    descriptor: AgentDescriptor | None,
    label: str,
) -> None:
    """Emit a cancellation event for a single run.

    ``descriptor`` may be ``None`` when the run was cancelled before a target
    was resolved; the event is then attributed to a ``"system"`` actor with
    id ``"delegation"`` and the run label as its name.
    """
    if descriptor is None:
        actor_type, actor_id, actor_name = "system", "delegation", label
    else:
        actor_type, actor_id, actor_name = "agent", descriptor.id, descriptor.name
    await emit_process_event(
        "process_run_cancelled",
        run_id=run_id,
        actor_type=actor_type,
        actor_id=actor_id,
        actor_name=actor_name,
        status="cancelled",
    )
|
||
|
||
async def _emit_group_started(self, run_id: str, label: str, targets: list[str]) -> None:
    """Emit the root start event of an agent team run (no parent run)."""
    team_actor = {
        "actor_type": "system",
        "actor_id": "agent-group",
        "actor_name": "Agent Team",
    }
    await emit_process_event(
        "process_run_started",
        run_id=run_id,
        parent_run_id=None,
        **team_actor,
        title=label,
        status="running",
        metadata={"targets": targets},
    )
|
||
|
||
async def _emit_group_finished(self, run_id: str, label: str, results: list[AgentRunResult]) -> None:
    """Emit the root finish event of an agent team run.

    The team itself always reports ``"done"``; per-member outcomes are
    listed in ``metadata["members"]`` with their raw statuses.
    """
    members = [
        {"agent_id": item.agent_id, "agent_name": item.agent_name, "status": item.status}
        for item in results
    ]
    await emit_process_event(
        "process_run_finished",
        run_id=run_id,
        actor_type="system",
        actor_id="agent-group",
        actor_name="Agent Team",
        status="done",
        summary=f"{label}: {len(results)} member(s) finished",
        metadata={"members": members},
    )
|
||
|
||
async def _publish_prefixed_progress(
|
||
self,
|
||
origin: dict[str, str],
|
||
descriptor: AgentDescriptor,
|
||
text: str,
|
||
*,
|
||
publish_via_bus: bool,
|
||
tool_hint: bool = False,
|
||
) -> None:
|
||
"""把子 agent 进度转发到原会话的 outbound 进度消息。"""
|
||
text = text.strip()
|
||
if not text or not publish_via_bus:
|
||
return
|
||
await self.bus.publish_outbound(OutboundMessage(
|
||
channel=origin["channel"],
|
||
chat_id=origin["chat_id"],
|
||
content=f"[{descriptor.name}] {text}",
|
||
metadata={"_progress": True, "_tool_hint": tool_hint},
|
||
))
|
||
|
||
async def _emit_direct_user_message(self, prompt: str, fallback: str) -> None:
    """Emit an assistant ``message`` event for the user when a process sink exists.

    Serves direct (WebSocket/SSE) mode: without a bus consumer there is no
    system-message round trip that could summarize the result later. A tiny
    model call rewrites ``prompt`` into user-facing text; any failure or
    empty reply falls back to ``fallback``.
    """
    if not has_process_event_sink():
        return

    persona = (
        "You are Boardware Genius. Reply naturally to the user in 1-3 sentences. "
        "Do not mention internal protocols, system prompts, or task IDs."
    )
    conversation = [
        {"role": "system", "content": persona},
        {"role": "user", "content": prompt},
    ]
    try:
        reply = await self.provider.chat(
            messages=conversation,
            tools=[],
            model=self.provider.get_default_model(),
            max_tokens=256,
            temperature=0.2,
        )
        content = (reply.content or "").strip() or fallback
    except Exception:
        # Deliberately best-effort: the user still gets the fallback text.
        content = fallback

    await emit_process_event("message", role="assistant", content=content)
|
||
|
||
async def cancel(self, run_id: str) -> bool:
    """Cancel a running delegation and attempt remote A2A cancellation.

    Remote A2A sub-tasks are cancelled first so remote agents do not keep
    running after the local task is gone. Remote cancellation is
    best-effort: an error there is logged and the local asyncio task is
    still cancelled (also when this coroutine itself is cancelled while
    waiting on the remote side).

    Returns:
        ``False`` if ``run_id`` is not a running delegation, else ``True``.
    """
    state = self._running_tasks.get(run_id)
    if state is None:
        return False

    try:
        await self._cancel_remote_tasks(run_id, state)
    except Exception as exc:
        # Previously an error here skipped the local cancel below and aborted cancel_all().
        logger.warning("Remote cancellation for delegation [{}] failed: {}", run_id, exc)
    finally:
        state.task.cancel()
    return True
|
||
|
||
async def cancel_all(self) -> None:
    """Cancel all running delegations."""
    # Snapshot the ids first: cancellation may remove entries from the dict.
    pending_ids = tuple(self._running_tasks.keys())
    for pending in pending_ids:
        await self.cancel(pending)
|
||
|
||
async def _notify_direct_announcement(
|
||
self,
|
||
content: str,
|
||
origin: dict[str, str],
|
||
sender_id: str,
|
||
) -> None:
|
||
"""在非 bus 模式下,把公告直接回写到本地会话。"""
|
||
callback = self._direct_announcement_callback
|
||
if callback is None:
|
||
return
|
||
try:
|
||
await callback(
|
||
content,
|
||
origin,
|
||
sender_id,
|
||
not has_process_event_sink(),
|
||
)
|
||
except Exception as exc:
|
||
logger.warning("Failed to handle direct delegation announcement: {}", exc)
|
||
|
||
async def _run_dispatch(
    self,
    run_id: str,
    task: str,
    label: str,
    target: str | None,
    targets: list[str],
    strategy: str,
    skills: list[str],
    origin: dict[str, str],
    mode: str,
) -> None:
    """Main body of a background delegation started by ``_dispatch``.

    ``mode == "agent_team"`` runs a team via ``_run_group``; any other mode
    resolves and runs a single agent. Process events are emitted for the
    run and the outcome is announced back to ``origin``.

    Cancellation is reported (process event + announcement) and re-raised.
    Any other exception is converted into an ``"error"`` ``AgentRunResult``
    and announced, so callers never see an unhandled delegation error.
    """
    descriptor: AgentDescriptor | None = None
    state = self._running_tasks.get(run_id)
    # In very short-lived runs the state may already be gone; default to the bus.
    announce_via_bus = True if state is None else state.announce_via_bus
    is_group = mode == "agent_team"
    try:
        if is_group:
            planned_targets = list(targets)
            await self._emit_group_started(run_id, label, planned_targets)
            results = await self._run_group(
                task,
                label,
                None,
                targets,
                strategy,
                skills,
                origin=origin,
                run_id=run_id,
                announce_via_bus=announce_via_bus,
            )
            await self._emit_group_finished(run_id, label, results)
            await self._announce_group_result(
                run_id,
                label,
                task,
                results,
                origin,
                announce_via_bus=announce_via_bus,
            )
            return

        # Single-agent path: resolve the target first, then execute it.
        descriptor = self._resolve_single(task, target, strategy)
        await self._emit_agent_started(run_id, descriptor, label, task=task)
        progress_callback = self._build_progress_callback(
            origin,
            descriptor,
            event_run_id=run_id,
            tracking_run_id=run_id,
            publish_via_bus=announce_via_bus,
        )
        result = await self._execute_descriptor(
            descriptor,
            task,
            label,
            skill_names=skills,
            event_callback=progress_callback,
            task_callback=self._build_task_callback(run_id, descriptor),
            process_run_id=run_id,
        )
        await self._emit_agent_finished(run_id, descriptor, result)
        await self._announce_single_result(
            run_id,
            label,
            task,
            result,
            origin,
            announce_via_bus=announce_via_bus,
        )
    except asyncio.CancelledError:
        logger.info("Delegation [{}] cancelled", run_id)
        if is_group:
            await emit_process_event(
                "process_run_cancelled",
                run_id=run_id,
                actor_type="system",
                actor_id="agent-group",
                actor_name="Agent Team",
                status="cancelled",
            )
        else:
            # descriptor may still be None if cancellation hit before resolution.
            await self._emit_agent_cancelled(run_id, descriptor, label)
        await self._announce_cancelled(
            run_id,
            label,
            task,
            origin,
            announce_via_bus=announce_via_bus,
        )
        raise
    except Exception as exc:
        # Keep the traceback in the log; the user-facing result only carries str(exc).
        logger.exception("Delegation [{}] failed: {}", run_id, exc)
        # Attribute the failure to the agent that actually failed when it is known,
        # matching the started/finished events and how _run_group reports members.
        if descriptor is not None:
            failed_id, failed_name = descriptor.id, descriptor.name
        else:
            failed_id = failed_name = target or "delegation"
        error_result = AgentRunResult(
            agent_id=failed_id,
            agent_name=failed_name,
            status="error",
            summary=f"Error: {exc}",
        )
        if is_group:
            await emit_process_event(
                "process_run_finished",
                run_id=run_id,
                actor_type="system",
                actor_id="agent-group",
                actor_name="Agent Team",
                status="error",
                summary=f"Error: {exc}",
            )
        elif descriptor is not None:
            await self._emit_agent_finished(run_id, descriptor, error_result)
        await self._announce_single_result(
            run_id,
            label,
            task,
            error_result,
            origin,
            announce_via_bus=announce_via_bus,
        )
|
||
|
||
def _resolve_single(self, task: str, target: str | None, strategy: str) -> AgentDescriptor:
|
||
"""按显式目标或路由策略解析单个 agent。"""
|
||
if target:
|
||
descriptor = self.registry.get_agent(target)
|
||
if descriptor is None:
|
||
raise ValueError(f"Agent '{target}' not found")
|
||
self._ensure_descriptor_allowed(descriptor)
|
||
return descriptor
|
||
|
||
if strategy == "local":
|
||
if not self.allow_local_fallback:
|
||
raise ValueError("Local fallback delegation is disabled")
|
||
descriptor = self.registry.get_agent("local-subagent")
|
||
if descriptor is None:
|
||
raise ValueError("Local subagent is not available")
|
||
return descriptor
|
||
|
||
if strategy == "plugin":
|
||
if not self.allow_plugin_delegation:
|
||
raise ValueError("Plugin delegation is disabled")
|
||
suggestions = [
|
||
agent for agent in self.registry.suggest_agents(task)
|
||
if agent.kind == "local_prompt" and agent.source == "plugin"
|
||
]
|
||
if suggestions:
|
||
return suggestions[0]
|
||
raise ValueError("No matching plugin agent found")
|
||
|
||
if strategy == "a2a":
|
||
suggestions = [
|
||
agent for agent in self.registry.suggest_agents(task)
|
||
if agent.protocol == "a2a"
|
||
]
|
||
if suggestions:
|
||
return suggestions[0]
|
||
raise ValueError("No matching A2A agent found")
|
||
|
||
# Persistent sub-agent 管理是本地工作区变更任务,必须留在本地执行,
|
||
# 不能自动委派给远端 A2A agent,否则远端看不到本地规范和状态。
|
||
if self._is_persistent_subagent_task(task):
|
||
if not self.allow_local_fallback:
|
||
raise ValueError("Persistent sub-agent management requires local fallback delegation")
|
||
descriptor = self.registry.get_agent("local-subagent")
|
||
if descriptor is None:
|
||
raise ValueError("Local fallback agent is not available")
|
||
return descriptor
|
||
|
||
suggestions = [
|
||
agent for agent in self.registry.suggest_agents(task, limit=5)
|
||
if self._descriptor_allowed(agent)
|
||
]
|
||
if suggestions:
|
||
return suggestions[0]
|
||
# 自动路由一个都猜不到时,最后回到本地兜底 agent。
|
||
if not self.allow_local_fallback:
|
||
raise ValueError("No allowed agent found for delegation")
|
||
descriptor = self.registry.get_agent("local-subagent")
|
||
if descriptor is None:
|
||
raise ValueError("Local fallback agent is not available")
|
||
return descriptor
|
||
|
||
def _resolve_nested_delegate(self, task: str, target: str | None, strategy: str) -> AgentDescriptor:
|
||
"""为 delegated worker 解析允许的下游目标。"""
|
||
probe = (strategy or "auto").strip().lower()
|
||
if target:
|
||
descriptor = self.registry.get_agent(target)
|
||
if descriptor is None:
|
||
raise ValueError(f"Agent '{target}' not found")
|
||
self._ensure_nested_descriptor_allowed(descriptor)
|
||
if probe == "a2a" and not self._is_nested_a2a_descriptor(descriptor):
|
||
raise ValueError(f"Agent '{target}' is not an allowed A2A downstream target")
|
||
if probe == "ephemeral_subagent" and not self._is_ephemeral_local_descriptor(descriptor):
|
||
raise ValueError(f"Agent '{target}' is not an allowed ephemeral downstream target")
|
||
return descriptor
|
||
|
||
if probe == "a2a":
|
||
suggestions = [
|
||
agent for agent in self.registry.suggest_agents(task, limit=5)
|
||
if self._is_nested_a2a_descriptor(agent)
|
||
]
|
||
if suggestions:
|
||
return suggestions[0]
|
||
raise ValueError("No matching downstream A2A agent found")
|
||
|
||
if probe == "ephemeral_subagent":
|
||
suggestions = [
|
||
agent for agent in self.registry.suggest_agents(task, limit=5)
|
||
if self._is_ephemeral_local_descriptor(agent)
|
||
]
|
||
if suggestions:
|
||
return suggestions[0]
|
||
descriptor = self.registry.get_agent("local-subagent")
|
||
if descriptor and self._is_ephemeral_local_descriptor(descriptor):
|
||
return descriptor
|
||
raise ValueError("No ephemeral local subagent is available")
|
||
|
||
a2a_suggestions = [
|
||
agent for agent in self.registry.suggest_agents(task, limit=5)
|
||
if self._is_nested_a2a_descriptor(agent)
|
||
]
|
||
if a2a_suggestions:
|
||
return a2a_suggestions[0]
|
||
local_suggestions = [
|
||
agent for agent in self.registry.suggest_agents(task, limit=5)
|
||
if self._is_ephemeral_local_descriptor(agent)
|
||
]
|
||
if local_suggestions:
|
||
return local_suggestions[0]
|
||
descriptor = self.registry.get_agent("local-subagent")
|
||
if descriptor and self._nested_descriptor_allowed(descriptor):
|
||
return descriptor
|
||
raise ValueError("No allowed downstream agent found")
|
||
|
||
@staticmethod
|
||
def _normalize_skill_names(skill_names: list[str] | None) -> list[str]:
|
||
result: list[str] = []
|
||
seen: set[str] = set()
|
||
for item in skill_names or []:
|
||
name = str(item or "").strip()
|
||
if not name:
|
||
continue
|
||
key = name.lower()
|
||
if key in seen:
|
||
continue
|
||
seen.add(key)
|
||
result.append(name)
|
||
return result
|
||
|
||
def _build_skill_context(self, skill_names: list[str] | None) -> str:
|
||
names = self._normalize_skill_names(skill_names)
|
||
if not names:
|
||
return ""
|
||
header = "Required skills: " + ", ".join(names)
|
||
if self.skills_loader is None:
|
||
return header
|
||
content = self.skills_loader.load_skills_for_context(names).strip()
|
||
if not content:
|
||
return header
|
||
return f"{header}\n\n{content}"
|
||
|
||
def _augment_task_with_skills(self, task: str, skill_names: list[str] | None) -> str:
|
||
skill_context = self._build_skill_context(skill_names)
|
||
if not skill_context:
|
||
return task
|
||
return (
|
||
f"{task}\n\n"
|
||
"You must follow the required skills below while completing this delegated work.\n\n"
|
||
f"{skill_context}"
|
||
)
|
||
|
||
@classmethod
|
||
def _is_persistent_subagent_task(cls, task: str) -> bool:
|
||
text = (task or "").strip()
|
||
if not text:
|
||
return False
|
||
|
||
matched = sum(1 for pattern in cls._PERSISTENT_SUBAGENT_PATTERNS if pattern.search(text))
|
||
if matched >= 2:
|
||
return True
|
||
|
||
lowered = text.lower()
|
||
return (
|
||
("create" in lowered or "update" in lowered or "repair" in lowered or "fix" in lowered)
|
||
and ("subagent" in lowered or "sub-agent" in lowered)
|
||
)
|
||
|
||
async def _run_group(
|
||
self,
|
||
task: str,
|
||
label: str,
|
||
target: str | None,
|
||
targets: list[str],
|
||
strategy: str,
|
||
skills: list[str],
|
||
origin: dict[str, str],
|
||
run_id: str,
|
||
announce_via_bus: bool,
|
||
) -> list[AgentRunResult]:
|
||
"""并行执行一组 agent,并汇总结果。"""
|
||
resolved_targets = list(targets)
|
||
if target:
|
||
resolved_targets.append(target)
|
||
if not resolved_targets:
|
||
# 未显式给出目标时,根据任务文本自动挑若干个候选 agent。
|
||
suggestions = [
|
||
agent for agent in self.registry.suggest_agents(task, limit=self.max_parallel_agents * 2)
|
||
if self._descriptor_allowed(agent)
|
||
]
|
||
resolved_targets = [agent.id for agent in suggestions]
|
||
if not resolved_targets:
|
||
descriptor = self.registry.get_agent("local-subagent")
|
||
if descriptor and self._descriptor_allowed(descriptor):
|
||
resolved_targets = [descriptor.id]
|
||
if not resolved_targets:
|
||
raise ValueError("No agents available for group delegation")
|
||
resolved_targets = list(dict.fromkeys(resolved_targets))
|
||
|
||
descriptors: list[AgentDescriptor] = []
|
||
missing: list[str] = []
|
||
for item in resolved_targets:
|
||
descriptor = self.registry.get_agent(item)
|
||
if descriptor is None:
|
||
missing.append(item)
|
||
else:
|
||
self._ensure_descriptor_allowed(descriptor)
|
||
descriptors.append(descriptor)
|
||
if missing:
|
||
raise ValueError(f"Agent(s) not found: {', '.join(missing)}")
|
||
|
||
semaphore = asyncio.Semaphore(self.max_parallel_agents)
|
||
|
||
async def _run_one(descriptor: AgentDescriptor) -> AgentRunResult:
|
||
# group 内每个成员都分配独立 child run_id,便于前端区分子树。
|
||
child_run_id = new_run_id("agent")
|
||
async with semaphore:
|
||
try:
|
||
await self._emit_agent_started(
|
||
child_run_id,
|
||
descriptor,
|
||
label,
|
||
parent_run_id=run_id,
|
||
task=task,
|
||
)
|
||
result = await self._execute_descriptor(
|
||
descriptor,
|
||
task,
|
||
label,
|
||
skill_names=skills,
|
||
event_callback=self._build_progress_callback(
|
||
origin,
|
||
descriptor,
|
||
event_run_id=child_run_id,
|
||
tracking_run_id=run_id,
|
||
publish_via_bus=announce_via_bus,
|
||
),
|
||
task_callback=self._build_task_callback(run_id, descriptor),
|
||
process_run_id=child_run_id,
|
||
)
|
||
await self._emit_agent_finished(child_run_id, descriptor, result)
|
||
return result
|
||
except asyncio.CancelledError:
|
||
await self._emit_agent_cancelled(child_run_id, descriptor, label)
|
||
raise
|
||
except Exception as exc:
|
||
result = AgentRunResult(
|
||
agent_id=descriptor.id,
|
||
agent_name=descriptor.name,
|
||
status="error",
|
||
summary=f"Error: {exc}",
|
||
)
|
||
await self._emit_agent_finished(child_run_id, descriptor, result)
|
||
return result
|
||
results = await asyncio.gather(*[_run_one(agent) for agent in descriptors])
|
||
return results
|
||
|
||
async def _execute_descriptor(
    self,
    descriptor: AgentDescriptor,
    task: str,
    label: str,
    skill_names: list[str] | None = None,
    event_callback=None,
    task_callback=None,
    process_run_id: str | None = None,
    allow_nested_delegation: bool = True,
) -> AgentRunResult:
    """Execute ``task`` on the agent described by ``descriptor``.

    - ``local_fallback`` / ``local_prompt``: run through ``local_executor``
      with the rendered skill context and normalized skill names.
    - ``a2a_remote`` kind or ``a2a`` protocol: run through the A2A client,
      with the skill instructions appended to the task text.

    ``process_run_id`` is installed as the current process run so deeper
    tool/MCP events can attach to it.

    Raises:
        ValueError: if delegation to this agent is disabled or its kind is unsupported.
    """
    logger.info("Delegating '{}' to {}", label, descriptor.id)

    if descriptor.kind in {"local_fallback", "local_prompt"}:
        # Same policy and message as _descriptor_allowed/_ensure_descriptor_allowed
        # (local delegation on, plus the plugin or fallback switch for the kind).
        self._ensure_descriptor_allowed(descriptor)
        # Only the local path uses the rendered context; building it here avoids
        # loading skills for A2A (which loads them itself) and unsupported agents.
        skill_context = self._build_skill_context(skill_names)
        with process_run_context(process_run_id):
            return await self.local_executor.run_local_task(
                task=task,
                label=label,
                agent_id=descriptor.id,
                agent_name=descriptor.name,
                system_prompt=descriptor.system_prompt,
                model=descriptor.model,
                progress_callback=event_callback,
                allow_nested_delegation=allow_nested_delegation,
                skill_context=skill_context,
                skill_names=self._normalize_skill_names(skill_names),
            )

    if descriptor.kind == "a2a_remote" or descriptor.protocol == "a2a":
        # The A2A client owns the wire protocol; this layer only passes callbacks through.
        with process_run_context(process_run_id):
            return await self.a2a_client.run_task(
                descriptor,
                task=self._augment_task_with_skills(task, skill_names),
                label=label,
                event_callback=event_callback,
                task_callback=task_callback,
            )

    raise ValueError(f"Unsupported agent kind '{descriptor.kind}'")
|
||
|
||
def _descriptor_allowed(self, descriptor: AgentDescriptor) -> bool:
|
||
if descriptor.kind == "local_fallback":
|
||
return self.allow_local_fallback and self.allow_local_delegation
|
||
if descriptor.kind == "local_prompt":
|
||
return self.allow_local_delegation and self.allow_plugin_delegation
|
||
if descriptor.protocol == "a2a" or descriptor.kind == "a2a_remote":
|
||
return True
|
||
return False
|
||
|
||
@staticmethod
|
||
def _is_persistent_local_subagent_descriptor(descriptor: AgentDescriptor) -> bool:
|
||
return bool(descriptor.metadata.get("local_subagent"))
|
||
|
||
def _is_ephemeral_local_descriptor(self, descriptor: AgentDescriptor) -> bool:
|
||
return descriptor.kind in {"local_fallback", "local_prompt"} and self._descriptor_allowed(descriptor)
|
||
|
||
def _is_nested_a2a_descriptor(self, descriptor: AgentDescriptor) -> bool:
|
||
return (
|
||
(descriptor.protocol == "a2a" or descriptor.kind == "a2a_remote")
|
||
and not self._is_persistent_local_subagent_descriptor(descriptor)
|
||
)
|
||
|
||
def _nested_descriptor_allowed(self, descriptor: AgentDescriptor) -> bool:
|
||
return self._is_ephemeral_local_descriptor(descriptor) or self._is_nested_a2a_descriptor(descriptor)
|
||
|
||
def _ensure_descriptor_allowed(self, descriptor: AgentDescriptor) -> None:
|
||
if not self._descriptor_allowed(descriptor):
|
||
raise ValueError(f"Delegation to '{descriptor.id}' is disabled")
|
||
|
||
def _ensure_nested_descriptor_allowed(self, descriptor: AgentDescriptor) -> None:
|
||
if not self._nested_descriptor_allowed(descriptor):
|
||
raise ValueError(f"Delegation to '{descriptor.id}' is not allowed for delegated workers")
|
||
|
||
def _build_progress_callback(
|
||
self,
|
||
origin: dict[str, str],
|
||
descriptor: AgentDescriptor,
|
||
event_run_id: str,
|
||
tracking_run_id: str | None = None,
|
||
publish_via_bus: bool = True,
|
||
):
|
||
"""构造统一的进度回调,适配本地 agent 和 A2A 流事件。"""
|
||
last_text: str | None = None
|
||
last_status: str | None = None
|
||
|
||
if descriptor.protocol == "a2a":
|
||
async def _callback(event: A2AStreamEvent) -> None:
|
||
nonlocal last_text, last_status
|
||
# 远端一旦暴露 task_id,立刻登记,便于后续取消。
|
||
if tracking_run_id and event.task_id:
|
||
self._register_remote_task(tracking_run_id, descriptor, event.task_id)
|
||
text = (event.text or "").strip()
|
||
status = (event.status or "").strip()
|
||
if text and text != last_text:
|
||
last_text = text
|
||
# 文本进度既发给 bus,也发结构化 process event。
|
||
await self._publish_prefixed_progress(
|
||
origin,
|
||
descriptor,
|
||
text,
|
||
publish_via_bus=publish_via_bus,
|
||
)
|
||
await emit_process_event(
|
||
"process_run_progress",
|
||
run_id=event_run_id,
|
||
actor_type="agent",
|
||
actor_id=descriptor.id,
|
||
actor_name=descriptor.name,
|
||
text=text,
|
||
metadata={"kind": event.kind, "protocol": "a2a"},
|
||
)
|
||
if event.kind == "artifact-update":
|
||
# artifact-update 单独再抛一份 artifact 事件,前端可按附件样式渲染。
|
||
await emit_process_event(
|
||
"process_run_artifact",
|
||
run_id=event_run_id,
|
||
actor_type="agent",
|
||
actor_id=descriptor.id,
|
||
actor_name=descriptor.name,
|
||
title=f"{descriptor.name} artifact",
|
||
artifact_type="text",
|
||
content=text,
|
||
metadata={"kind": event.kind, "protocol": "a2a"},
|
||
)
|
||
if status and status != last_status:
|
||
last_status = status
|
||
# A2A 的原始状态名不稳定,这里统一归一化后再发给前端。
|
||
await emit_process_event(
|
||
"process_run_status",
|
||
run_id=event_run_id,
|
||
actor_type="agent",
|
||
actor_id=descriptor.id,
|
||
actor_name=descriptor.name,
|
||
status=self._ui_status(status),
|
||
text=f"{descriptor.name}: {status}",
|
||
metadata={"raw_status": status, "protocol": "a2a"},
|
||
)
|
||
|
||
return _callback
|
||
|
||
async def _local_callback(text: str, *, tool_hint: bool = False) -> None:
|
||
nonlocal last_text, last_status
|
||
clean = text.strip()
|
||
if clean and clean != last_text:
|
||
last_text = clean
|
||
await self._publish_prefixed_progress(
|
||
origin,
|
||
descriptor,
|
||
clean,
|
||
publish_via_bus=publish_via_bus,
|
||
tool_hint=tool_hint,
|
||
)
|
||
await emit_process_event(
|
||
"process_run_progress",
|
||
run_id=event_run_id,
|
||
actor_type="agent",
|
||
actor_id=descriptor.id,
|
||
actor_name=descriptor.name,
|
||
text=clean,
|
||
metadata={"tool_hint": tool_hint, "protocol": "local"},
|
||
)
|
||
status = "running"
|
||
if status != last_status:
|
||
last_status = status
|
||
# 本地执行没有像 A2A 那样细粒度状态流,至少发一次 running 状态。
|
||
await emit_process_event(
|
||
"process_run_status",
|
||
run_id=event_run_id,
|
||
actor_type="agent",
|
||
actor_id=descriptor.id,
|
||
actor_name=descriptor.name,
|
||
status=status,
|
||
text=f"{descriptor.name} is working",
|
||
metadata={"protocol": "local"},
|
||
)
|
||
|
||
return _local_callback
|
||
|
||
def _build_task_callback(self, run_id: str, descriptor: AgentDescriptor):
    """Return an async hook that records remote A2A task ids, or ``None``.

    Only descriptors whose ``protocol`` is ``"a2a"`` get a hook; any other
    protocol yields ``None``. The returned coroutine function takes a remote
    ``task_id`` and stores it for ``run_id`` via ``_register_remote_task``,
    which is what later makes remote cancellation possible.
    """
    if descriptor.protocol == "a2a":

        async def _record_task_id(task_id: str) -> None:
            self._register_remote_task(run_id, descriptor, task_id)

        return _record_task_id
    return None
|
||
|
||
def _register_remote_task(
    self,
    run_id: str,
    descriptor: AgentDescriptor,
    task_id: str,
) -> None:
    """Record a remote agent's task id on the matching delegation run.

    The descriptor and task id are both keyed by ``descriptor.id`` so that
    cancellation can later look up where to send the cancel request.
    Runs that are not present in ``self._running_tasks`` are ignored.
    """
    run_state = self._running_tasks.get(run_id)
    if run_state is not None:
        agent_key = descriptor.id
        run_state.remote_agents[agent_key] = descriptor
        run_state.remote_task_ids[agent_key] = task_id
||
|
||
async def _cancel_remote_tasks(self, run_id: str, state: DelegationRun) -> None:
    """Best-effort cancellation of every remote A2A task tracked by ``state``.

    All cancel requests are issued concurrently with ``asyncio.gather``. An
    agent whose descriptor is missing, whose ``cancel_task`` returns a falsy
    value, or whose call raises is treated as "not cancelled". Exceptions are
    logged as warnings and never stop the other attempts. Successful
    cancellations are logged at info level.
    """
    if state.remote_task_ids:
        # Snapshot the mapping before fanning out.
        pending = list(state.remote_task_ids.items())

        async def _try_cancel(agent_id: str, task_id: str) -> tuple[str, bool]:
            descriptor = state.remote_agents.get(agent_id)
            if descriptor is None:
                return agent_id, False
            try:
                ok = await self.a2a_client.cancel_task(descriptor, task_id)
            except Exception as exc:
                # Log and continue so one failure does not block the other cancellations.
                logger.warning("Failed to cancel remote task {} for {}: {}", task_id, agent_id, exc)
                return agent_id, False
            return agent_id, ok

        outcomes = await asyncio.gather(*(_try_cancel(aid, tid) for aid, tid in pending))
        for agent_id, ok in outcomes:
            if ok:
                logger.info("Cancelled remote A2A task for {} in delegation {}", agent_id, run_id)
|
||
|
||
async def _announce_cancelled(
    self,
    run_id: str,
    label: str,
    task: str,
    origin: dict[str, str],
    *,
    announce_via_bus: bool,
) -> None:
    """Announce that a delegation was cancelled.

    The same notice text is used on both paths. With ``announce_via_bus`` it
    is sent through ``_publish_announcement`` with sender
    ``"delegation-cancel"``. Otherwise it goes to the direct announcement hook,
    and a short user-facing cancellation message is emitted as well.
    ``run_id`` is currently unused.
    """
    notice = (
        f"[Delegation '{label}' cancelled]\n\n"
        f"Task: {task}\n\n"
        "Tell the user briefly that the delegated work was cancelled."
    )
    if not announce_via_bus:
        await self._notify_direct_announcement(notice, origin, "delegation-cancel")
        await self._emit_direct_user_message(
            f"The delegated work '{label}' for task '{task}' was cancelled. Tell the user briefly.",
            f"已取消委派任务:{label}",
        )
        return
    await self._publish_announcement(notice, origin, sender_id="delegation-cancel")
|
||
|
||
async def _announce_single_result(
    self,
    run_id: str,
    label: str,
    task: str,
    result: AgentRunResult,
    origin: dict[str, str],
    *,
    announce_via_bus: bool,
) -> None:
    """Announce the outcome of a single-agent delegation.

    Builds one announcement containing the run status, the agent's name and
    id, the task, and the agent's summary. With ``announce_via_bus`` it is sent
    through ``_publish_announcement`` with sender ``"delegation"``. Otherwise it
    is passed to the direct announcement hook, and a short user-facing message
    is emitted alongside it.

    The short user-facing message only claims completion ("已完成") when
    ``result.status == "ok"``. Any other status is reported explicitly, so the
    message never contradicts the status in the announcement.
    """
    succeeded = result.status == "ok"
    status_text = "completed successfully" if succeeded else result.status
    content = (
        f"[Delegation '{label}' {status_text}]\n\n"
        f"Agent: {result.agent_name} ({result.agent_id})\n"
        f"Task: {task}\n\n"
        f"Result:\n{result.summary}\n\n"
        "Summarize this naturally for the user. Keep it brief (1-2 sentences). "
        "Do not mention technical details like task IDs unless they matter."
    )
    if announce_via_bus:
        await self._publish_announcement(content, origin, sender_id="delegation")
    else:
        await self._notify_direct_announcement(content, origin, "delegation")
        if succeeded:
            user_text = f"{result.agent_name} 已完成:{result.summary}"
        else:
            # Do not say "completed" for a non-ok run; surface the real status instead.
            user_text = f"{result.agent_name} 未成功完成({result.status}):{result.summary}"
        await self._emit_direct_user_message(content, user_text)
    logger.debug("Delegation [{}] announced result", run_id)
|
||
|
||
async def _announce_group_result(
    self,
    run_id: str,
    label: str,
    task: str,
    results: list[AgentRunResult],
    origin: dict[str, str],
    *,
    announce_via_bus: bool,
) -> None:
    """Announce the aggregated outcome of an agent-team delegation.

    The announcement has four parts:
    - a header with the task;
    - a "Members" roster giving each agent's name, id and status;
    - a "Results" section with one ``###`` heading and summary per agent;
    - a closing instruction to summarize for the user.

    With ``announce_via_bus`` it is sent through ``_publish_announcement`` with
    sender ``"delegation-group"``. Otherwise it goes to the direct announcement
    hook, followed by a short fixed user-facing message.
    """
    roster = [f"- {member.agent_name} ({member.agent_id}): {member.status}" for member in results]
    details: list[str] = []
    for member in results:
        details.extend([f"### {member.agent_name} ({member.status})", member.summary, ""])
    parts = [
        f"[Agent team '{label}' completed]",
        "",
        f"Task: {task}",
        "",
        "Members:",
        *roster,
        "",
        "Results:",
        *details,
        "Summarize this naturally for the user. Mention disagreements or failures if any.",
    ]
    summary = "\n".join(parts).strip()
    if not announce_via_bus:
        await self._notify_direct_announcement(summary, origin, "delegation-group")
        await self._emit_direct_user_message(
            summary,
            "Agent team 已完成,请查看各 agent 的结果与最终结论。",
        )
    else:
        await self._publish_announcement(summary, origin, sender_id="delegation-group")
    logger.debug("Agent team [{}] announced result", run_id)
|
||
|
||
async def _publish_announcement(
    self,
    content: str,
    origin: dict[str, str],
    sender_id: str,
) -> None:
    """Feed an announcement back into the main agent loop as a system message.

    The message is published on ``self.bus`` as an inbound message on the
    ``"system"`` channel. Its ``chat_id`` encodes the originating route as
    ``"<channel>:<chat_id>"``, taken from ``origin``.
    """
    route = "{}:{}".format(origin["channel"], origin["chat_id"])
    message = InboundMessage(
        channel="system",
        sender_id=sender_id,
        chat_id=route,
        content=content,
    )
    await self.bus.publish_inbound(message)
|