"""Persistent local sub-agent storage helpers.""" from __future__ import annotations import json import re import shutil from dataclasses import asdict, dataclass, field from importlib.resources import files as pkg_files from pathlib import Path from typing import Any from nanobot.config.schema import Config, MCPServerConfig _INVALID_ID_RE = re.compile(r"[^a-z0-9-]+") def normalize_subagent_id(value: str) -> str: normalized = _INVALID_ID_RE.sub("-", str(value or "").strip().lower()).strip("-") normalized = re.sub(r"-{2,}", "-", normalized) if not normalized: raise ValueError("Sub-agent id is required") return normalized @dataclass class SubagentSpec: id: str name: str description: str enabled: bool = True workspace: str = "" system_prompt: str = "" model: str | None = None delegation_mode: str = "remote_a2a_only" allow_mcp: bool = True tags: list[str] = field(default_factory=list) aliases: list[str] = field(default_factory=list) mcp_servers: dict[str, dict[str, Any]] = field(default_factory=dict) metadata: dict[str, Any] = field(default_factory=dict) @classmethod def from_dict(cls, payload: dict[str, Any], *, workspace_path: Path | None = None) -> "SubagentSpec": agent_id = normalize_subagent_id(payload.get("id", "")) name = str(payload.get("name") or agent_id).strip() or agent_id description = str(payload.get("description") or name).strip() or name workspace = str(payload.get("workspace") or "").strip() if not workspace and workspace_path is not None: workspace = str(workspace_path) tags = [str(item).strip() for item in payload.get("tags", []) if str(item).strip()] aliases = [str(item).strip() for item in payload.get("aliases", []) if str(item).strip()] mcp_servers = payload.get("mcp_servers", {}) if not isinstance(mcp_servers, dict): mcp_servers = {} metadata = payload.get("metadata", {}) if not isinstance(metadata, dict): metadata = {} return cls( id=agent_id, name=name, description=description, enabled=bool(payload.get("enabled", True)), workspace=workspace, system_prompt=str(payload.get("system_prompt") or "").strip(), model=(str(payload.get("model") or "").strip() or None), delegation_mode=(str(payload.get("delegation_mode") or "remote_a2a_only").strip() or "remote_a2a_only"), allow_mcp=bool(payload.get("allow_mcp", True)), tags=tags, aliases=aliases, mcp_servers=mcp_servers, metadata=metadata, ) def to_dict(self) -> dict[str, Any]: payload = asdict(self) if not self.model: payload["model"] = None return payload class LocalSubagentStore: """Persist sub-agent definitions under `/agents/_agent/`.""" def __init__(self, workspace: Path): self.workspace = workspace.expanduser().resolve() self.directory = self.workspace / "agents" def list_subagents(self) -> list[SubagentSpec]: if not self.directory.exists(): return [] result: list[SubagentSpec] = [] for child in sorted(self.directory.iterdir()): agents_json = child / "AGENTS.json" if not child.is_dir() or not agents_json.exists(): continue try: payload = json.loads(agents_json.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError, ValueError): continue if not isinstance(payload, dict): continue result.append(SubagentSpec.from_dict(payload, workspace_path=child)) return result def get_subagent(self, agent_id: str) -> SubagentSpec | None: path = self.agents_json_path(agent_id) if not path.exists(): return None try: payload = json.loads(path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError, ValueError): return None if not isinstance(payload, dict): return None return SubagentSpec.from_dict(payload, workspace_path=self.subagent_dir(agent_id)) def upsert_subagent(self, payload: dict[str, Any], config: Config) -> SubagentSpec: agent_id = normalize_subagent_id(payload.get("id", "")) workspace_path = self.subagent_dir(agent_id) spec = SubagentSpec.from_dict(payload, workspace_path=workspace_path) self._ensure_workspace(workspace_path) spec.workspace = str(workspace_path) self._sync_agents_md(workspace_path, spec) self.agents_json_path(agent_id).write_text( json.dumps(spec.to_dict(), indent=2, ensure_ascii=False) + "\n", encoding="utf-8", ) from nanobot.agent.agent_registry import WorkspaceAgentStore WorkspaceAgentStore(self.workspace).upsert_agent(self.build_registry_record(spec, config)) return spec def delete_subagent(self, agent_id: str) -> bool: agent_id = normalize_subagent_id(agent_id) target = self.subagent_dir(agent_id) if not target.exists(): return False from nanobot.agent.agent_registry import WorkspaceAgentStore WorkspaceAgentStore(self.workspace).delete_agent(agent_id) shutil.rmtree(target) return True def subagent_dir(self, agent_id: str) -> Path: return self.directory / f"{normalize_subagent_id(agent_id)}_agent" def agents_json_path(self, agent_id: str) -> Path: return self.subagent_dir(agent_id) / "AGENTS.json" def local_base_url(self, config: Config, agent_id: str) -> str: return f"http://127.0.0.1:{int(config.gateway.port)}/subagents/{normalize_subagent_id(agent_id)}" def build_registry_record(self, spec: SubagentSpec, config: Config) -> dict[str, Any]: base_url = self.local_base_url(config, spec.id) card_url = f"{base_url}/.well-known/agent-card" return { "id": spec.id, "name": spec.name, "description": spec.description, "protocol": "a2a", "base_url": base_url, "endpoint": f"{base_url}/rpc", "card_url": card_url, "enabled": spec.enabled, "tags": sorted(set(["local-subagent", *spec.tags])), "aliases": sorted(set([spec.name, *spec.aliases])), "metadata": { **spec.metadata, "workspace": spec.workspace, "managed_by": "subagent-manager", "local_subagent": True, }, "capabilities": {"streaming": False}, "support_group": False, "support_streaming": False, } @staticmethod def build_agent_card(spec: SubagentSpec, config: Config) -> dict[str, Any]: base_url = f"http://127.0.0.1:{int(config.gateway.port)}/subagents/{spec.id}" rpc_url = f"{base_url}/rpc" return { "id": spec.id, "name": spec.name, "description": spec.description, "url": rpc_url, "preferred_transport": "jsonrpc", "interfaces": [{"transport": "jsonrpc", "url": rpc_url}], "capabilities": {"streaming": False}, "tags": sorted(set(["local-subagent", *spec.tags])), "metadata": { "workspace": spec.workspace, "managed_by": "subagent-manager", }, } @staticmethod def coerce_mcp_servers(spec: SubagentSpec) -> dict[str, MCPServerConfig]: if not spec.allow_mcp: return {} result: dict[str, MCPServerConfig] = {} for name, payload in spec.mcp_servers.items(): if not isinstance(payload, dict): continue try: result[name] = MCPServerConfig.model_validate(payload) except Exception: continue return result def _ensure_workspace(self, workspace_path: Path) -> None: workspace_path.mkdir(parents=True, exist_ok=True) templates_dir = pkg_files("nanobot") / "templates" for item in templates_dir.iterdir(): if not item.name.endswith(".md") or item.name == "AGENTS.md": continue dest = workspace_path / item.name if not dest.exists(): dest.write_text(item.read_text(encoding="utf-8"), encoding="utf-8") memory_dir = workspace_path / "memory" memory_dir.mkdir(exist_ok=True) memory_template = templates_dir / "memory" / "MEMORY.md" memory_file = memory_dir / "MEMORY.md" if not memory_file.exists(): memory_file.write_text(memory_template.read_text(encoding="utf-8"), encoding="utf-8") history_file = memory_dir / "HISTORY.md" if not history_file.exists(): history_file.write_text("", encoding="utf-8") (workspace_path / "skills").mkdir(exist_ok=True) def _sync_agents_md(self, workspace_path: Path, spec: SubagentSpec) -> None: content = self._render_agents_md(spec) (workspace_path / "AGENTS.md").write_text(content, encoding="utf-8") @staticmethod def _render_agents_md(spec: SubagentSpec) -> str: prompt = spec.system_prompt.strip() or "Complete delegated tasks accurately and concisely." return f"""# {spec.name} You are {spec.name}, a persistent local sub-agent managed by Boardware Genius. ## Role {spec.description} ## System Prompt {prompt} ## Constraints - Work only inside this workspace. - Respond only to delegated tasks. - Delegate only to remote A2A agents when delegation is enabled. - Do not create or manage local sub-agents. - Do not message end users directly. """