"""委派工具:分别暴露 subagent 与 agent team 两种调用接口。""" from typing import TYPE_CHECKING, Any from nanobot.agent.tools.base import Tool if TYPE_CHECKING: from nanobot.agent.delegation import DelegationManager class DelegationTool(Tool): """委派类工具的公共上下文注入逻辑。""" def __init__(self, manager: "DelegationManager"): self._manager = manager self._origin_channel = "cli" self._origin_chat_id = "direct" self._announce_via_bus = True def set_context(self, channel: str, chat_id: str, announce_via_bus: bool = True) -> None: """设置后台委派结果回传的目标会话。""" self._origin_channel = channel self._origin_chat_id = chat_id self._announce_via_bus = announce_via_bus class SpawnSubagentTool(DelegationTool): """把任务委派给单个 subagent。""" @property def name(self) -> str: return "spawn_subagent" @property def description(self) -> str: return ( "Delegate a focused task to one background subagent. " "Use this for complex or time-consuming work that can run independently. " "You only provide the task and optional required skills; downstream routing decides the concrete agent. " "The subagent will report back when done." ) @property def parameters(self) -> dict[str, Any]: return { "type": "object", "properties": { "task": { "type": "string", "description": "The task for the delegated subagent to complete", }, "label": { "type": "string", "description": "Optional short label for the task (for display)", }, "skills": { "type": "array", "items": {"type": "string"}, "description": "Optional list of skill names the delegated worker must follow", }, }, "required": ["task"], } async def execute( self, task: str, label: str | None = None, skills: list[str] | None = None, **kwargs: Any, ) -> str: """创建并启动一个 subagent 后台任务。""" return await self._manager.dispatch_subagent( task=task, label=label, skills=skills, origin_channel=self._origin_channel, origin_chat_id=self._origin_chat_id, announce_via_bus=self._announce_via_bus, ) class SpawnAgentTeamTool(DelegationTool): """启动一个 agent team 任务。""" @property def name(self) -> str: return "spawn_agent_team" @property def description(self) -> str: return ( "Start an agent team for parallel exploration. " "Use this when multiple agents should investigate the task in parallel and return a combined result. " "You only provide the task and optional required skills; downstream routing selects the concrete members." ) @property def parameters(self) -> dict[str, Any]: return { "type": "object", "properties": { "task": { "type": "string", "description": "The shared task for the agent team", }, "label": { "type": "string", "description": "Optional short label for the team task (for display)", }, "skills": { "type": "array", "items": {"type": "string"}, "description": "Optional list of skill names the team must follow", }, }, "required": ["task"], } async def execute( self, task: str, label: str | None = None, skills: list[str] | None = None, **kwargs: Any, ) -> str: """创建并启动一个 agent team 后台任务。""" return await self._manager.dispatch_agent_team( task=task, label=label, skills=skills, origin_channel=self._origin_channel, origin_chat_id=self._origin_chat_id, announce_via_bus=self._announce_via_bus, ) class NestedDelegateTool(Tool): """供 delegated worker 使用的受控下游委派工具。""" def __init__(self, manager: "DelegationManager", default_skills: list[str] | None = None): self._manager = manager self._default_skills = [str(item).strip() for item in (default_skills or []) if str(item).strip()] @property def name(self) -> str: return "delegate_task" @property def description(self) -> str: return ( "Synchronously delegate a downstream task from a delegated worker. " "Use this only when specialized help is needed. " "It can route to an A2A agent or an ephemeral local subagent, but never creates a persistent subagent." ) @property def parameters(self) -> dict[str, Any]: return { "type": "object", "properties": { "task": { "type": "string", "description": "The downstream task to delegate", }, "label": { "type": "string", "description": "Optional short label for the downstream task", }, "target": { "type": "string", "description": "Optional agent ID or name for the downstream worker", }, "strategy": { "type": "string", "enum": ["auto", "a2a", "ephemeral_subagent"], "description": "Routing strategy for downstream delegation. Default is auto.", }, "skills": { "type": "array", "items": {"type": "string"}, "description": "Optional required skills for the downstream delegate. Defaults to the current worker's required skills.", }, }, "required": ["task"], } async def execute( self, task: str, label: str | None = None, target: str | None = None, strategy: str = "auto", skills: list[str] | None = None, **kwargs: Any, ) -> str: """同步执行一次受控下游委派,并把结果返回给当前 worker。""" return await self._manager.delegate_for_subagent( task=task, label=label, target=target, strategy=strategy, skills=skills if skills is not None else list(self._default_skills), )