"""Planner that prepares a minimal swarms run spec for agent-team tasks.""" from __future__ import annotations import asyncio import json from typing import Any from loguru import logger from nanobot.agent.agent_registry import AgentRegistry from nanobot.agent_team.memory import ProcedureMemory from nanobot.agent_team.swarms_adapter import load_swarms_runtime, safe_swarms_name from nanobot.agent_team.swarms_policy import SwarmsPolicy from nanobot.agent_team.target_resolver import TargetResolver from nanobot.agent_team.types import SwarmsRunSpec class SwarmsRunPlanner: """Generate `SwarmsRunSpec` without rebuilding swarms' own planner/runtime.""" def __init__( self, *, model: str | None, registry: AgentRegistry, target_resolver: TargetResolver, procedure_memory: ProcedureMemory, policy: SwarmsPolicy, ) -> None: self.model = model self.registry = registry self.target_resolver = target_resolver self.procedure_memory = procedure_memory self.policy = policy async def plan(self, *, task: str, label: str, skills: list[str]) -> SwarmsRunSpec: memory_hint = self.procedure_memory.match_procedure(task) if self._should_auto_build(task, skills, memory_hint): raw_config = await self._run_auto_swarm_builder(task, skills, memory_hint) return await self._spec_from_auto_config(task, label, skills, raw_config) target_plan = await self.target_resolver.resolve_team_targets( task=task, skills=skills, required_specialists=self._simple_required_roles(task, skills), ) return SwarmsRunSpec( task=task, label=label, skills=list(skills), swarm_type="GroupChat", agent_ids=list(target_plan.final_targets), auto_generated=False, max_loops=2, rules=self._default_rules(), metadata={ "memory_hint": memory_hint.id if memory_hint else None, "target_plan": target_plan.to_dict(), }, ) def _should_auto_build(self, task: str, skills: list[str], memory_hint: Any) -> bool: source = task or "" text = source.lower() markers = ("架构", "调研", "复杂", "多阶段", "strategy", "architecture", "research") return len(source) > 80 or memory_hint is not None or any( marker in source or marker in text for marker in markers ) async def _run_auto_swarm_builder(self, task: str, skills: list[str], memory_hint: Any) -> dict[str, Any]: try: runtime = load_swarms_runtime() builder = runtime["AutoSwarmBuilder"]( name="nanobot-auto-swarm-builder", description="Generate a safe swarms router config for nanobot", max_loops=1, model_name=self._auto_builder_model_name(), generate_router_config=True, execution_type="return-swarm-router-config", interactive=False, verbose=False, ) raw = await asyncio.to_thread( builder.run, self._auto_builder_prompt(task, skills, memory_hint), ) if isinstance(raw, dict): return raw if isinstance(raw, str): return json.loads(raw) model_dump = getattr(raw, "model_dump", None) if callable(model_dump): payload = model_dump() return payload if isinstance(payload, dict) else {} except Exception as exc: logger.warning("AutoSwarmBuilder failed; falling back to deterministic run spec: {}", exc) return {} def _auto_builder_model_name(self) -> str: model_name = str(self.model or "").strip() if not model_name: return "gpt-4.1" if "/" in model_name: return model_name return f"openai/{model_name}" def _auto_builder_prompt(self, task: str, skills: list[str], memory_hint: Any) -> str: return ( "Build a multi-agent swarm router config for nanobot.\n\n" f"User task:\n{task}\n\n" f"Required nanobot skills:\n{skills}\n\n" f"Procedure memory hint:\n{memory_hint}\n\n" "Return a valid JSON object that matches the swarm router config schema.\n\n" "Hard constraints:\n" "- Every generated role must follow the listed skills.\n" "- Do not replace, ignore, or reinterpret the listed skills.\n" "- Do not add external tools, credentials, MCP URLs, or hidden side effects.\n" "- Prefer existing nanobot registry agents; only describe missing roles." ) async def _spec_from_auto_config( self, task: str, label: str, skills: list[str], raw_config: dict[str, Any], ) -> SwarmsRunSpec: safe_config = self.policy.validate_auto_config(raw_config) target_plan = await self.target_resolver.resolve_team_targets( task=task, skills=skills, required_specialists=self._roles_from_auto_config(safe_config), ) return SwarmsRunSpec( task=task, label=label, skills=list(skills), swarm_type=str(safe_config.get("swarm_type") or "GroupChat"), agent_ids=list(target_plan.final_targets), auto_generated=bool(raw_config), max_loops=min(int(safe_config.get("max_loops") or 2), self.policy.max_loops), rearrange_flow=self._rearrange_flow(safe_config, target_plan.final_targets), rules=str(safe_config.get("rules") or self._default_rules()), raw_auto_config=safe_config, metadata={ "target_plan": target_plan.to_dict(), "auto_builder_returned_config": bool(raw_config), }, ) def _rearrange_flow(self, config: dict[str, Any], agent_ids: list[str]) -> str | None: if str(config.get("swarm_type") or "") == "AgentRearrange" and agent_ids: return " -> ".join(safe_swarms_name(agent_id) for agent_id in agent_ids) flow = config.get("rearrange_flow") or config.get("flow") if flow: return str(flow) return None def _roles_from_auto_config(self, config: dict[str, Any]) -> list[str]: roles: list[str] = [] for item in config.get("agents", []) or []: if not isinstance(item, dict): continue role = str( item.get("description") or item.get("system_prompt") or item.get("agent_name") or "" ).strip() if role: roles.append(role) return roles or ["general specialist", "synthesis analyst"] def _simple_required_roles(self, task: str, skills: list[str]) -> list[str]: if skills: return [f"{skill} specialist" for skill in skills] return ["general specialist", "synthesis analyst"] def _default_rules(self) -> str: return ( "You are running inside a nanobot agent team. Follow the provided skills, " "stay within your assigned role, and produce a concise final synthesis." )