Files
beaver_project/docs/superpowers/plans/2026-06-24-template-guided-team-routing.md
steven_li 520a21a027 feat(coordinator): 添加团队节点默认最大工具迭代次数配置
添加 DEFAULT_TEAM_NODE_MAX_TOOL_ITERATIONS 配置项以控制团队节点的最大工具迭代次数,
并修改 LocalAgentRunner 中的逻辑来使用此默认值当 envelope 中未指定时。

fix(runtime): 修复团队节点运行成功判断逻辑

更新运行成功判断条件,将 finish_reason 为 "max_tool_iterations_finalized" 的情况
视为运行失败,并添加对原始工具调用输出的检测,避免将其误判为成功完成。

feat(mcp): 添加团队工作流MCP工具类别支持

增加新的本地MCP工具类别 "team_workflow" 及其对应的工具创建功能,
为团队工作流提供本地工具支持。

refactor(engine): 调整AgentLoop最大工具迭代次数设置

将 AgentProfile 中的默认 max_tool_iterations 从 30 增加到 100,
同时移除 TaskExecutionPlanner 构造函数中的重复参数传递。

perf(mcp): 优化MCP连接管理避免重复连接

添加 mcp_connected 标志来跟踪MCP连接状态,确保 connect_all 只执行一次,
提高性能并避免不必要的重复连接。

refactor(skills): 移除技能团队模板相关功能

移除与技能团队模板相关的代码,包括解析、存储和处理逻辑,
简化技能记录结构和加载流程。

feat(process): 增强会话过程投影器功能

添加技能激活快照事件处理,改进团队运行完成消息显示,
并增强技能激活事件的时间戳记录功能。

refactor(tasks): 简化任务尝试编排器团队执行逻辑

移除团队执行相关代码,将所有任务统一按单步执行处理,
简化任务编排器的复杂度并提升执行效率。

fix(evidence): 修复节点证据评估中需求验证逻辑

更新节点证据评估逻辑,跳过自然语言证据需求的确定性验证,
只执行机器可读的需求验证,避免因自然语言需求导致的节点失败。
2026-06-26 16:36:29 +08:00

21 KiB
Raw Blame History

Template-Guided Team Routing Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Let a root Main Agent choose Team execution on its first provider response whenever an activated Skill supplies a valid Team template, while preserving an intentional zero-extra-round single-agent path.

Architecture: Keep ExecutionGraph, ExecutionNode, LocalAgentRunner, and run_agent_team unchanged. Add a small Main-Agent routing state inside AgentLoop: it selects the first valid activated template, adds compact first-turn guidance, classifies the first provider response as team or single, persists a structured mode event, and prevents a later mid-run Team switch after single-agent work starts. Project that event into the existing Task process stream; no frontend work is included.

Tech Stack: Python 3.12, asyncio, dataclasses, pytest, existing AgentLoop, session event store, process projector, and Team tool runtime.


File Structure

  • app-instance/backend/beaver/engine/loop.py: primary-template selection, first-turn guidance, mode classification/lock, tool-call filtering, and persistent routing event.
  • app-instance/backend/beaver/services/process_service.py: project the routing event into the existing task process stream.
  • app-instance/backend/tests/unit/test_agent_loop.py: Main-Agent prompt, first-turn Team, first-turn Single, mixed-call, and no-template regression tests.
  • app-instance/backend/tests/unit/test_process_projection.py: routing-event projection test.

No changes to Planner, Team scheduler/runtime, ToolAssembler, ToolExecutor, evidence gate, final synthesis gate, frontend, or Skill learning are required.

Task 1: Select a Primary Template and Make First-Turn Routing Explicit

Files:

  • Modify: app-instance/backend/beaver/engine/loop.py

  • Modify: app-instance/backend/tests/unit/test_agent_loop.py

  • Step 1: Add a sequenced provider and a valid template fixture to the AgentLoop test module

Add imports for SkillContext and ToolCall, then add a provider that captures the system prompt and returns supplied responses in sequence:

class SequencedProvider(LLMProvider):
    def __init__(self, responses: list[LLMResponse]) -> None:
        super().__init__()
        self.responses = list(responses)
        self.calls: list[dict[str, Any]] = []

    async def chat(self, messages: list[dict], tools: list[dict] | None = None, **_: Any) -> LLMResponse:
        self.calls.append({"messages": messages, "tools": tools})
        return self.responses.pop(0)

    def get_default_model(self) -> str:
        return "stub-model"


def _team_template_skill(name: str = "finance-report") -> SkillContext:
    return SkillContext(
        name=name,
        content="# Finance report",
        team_template={
            "version": 1,
            "strategy": "dag",
            "nodes": [{"node_id": "collect", "task": "Collect official sources"}],
        },
    )
  • Step 2: Write failing first-turn guidance and deterministic-primary tests
def test_root_task_with_template_adds_first_turn_team_routing_guidance(tmp_path) -> None:
    provider = RecordingProvider()
    loop = AgentLoop(loader=EngineLoader(workspace=tmp_path))

    asyncio.run(loop.process_direct(
        "compare financial reports",
        session_id="session",
        task_id="task-1",
        task_mode=True,
        pinned_skill_contexts=[_team_template_skill(), _team_template_skill("ignored")],
        provider_bundle=_bundle(provider),
    ))

    system_content = "\n".join(
        str(message["content"])
        for message in provider.messages_by_call[0]
        if message["role"] == "system"
    )
    assert "choose one execution path in this first response" in system_content
    assert "run_agent_team" in system_content
    assert '"skill_name":"finance-report"' in system_content
    assert "ignored" not in system_content


def test_empty_template_nodes_do_not_enable_first_turn_team_routing(tmp_path) -> None:
    provider = RecordingProvider()
    loop = AgentLoop(loader=EngineLoader(workspace=tmp_path))
    empty = SkillContext(name="empty", content="# Empty", team_template={"nodes": []})

    asyncio.run(loop.process_direct(
        "single lookup",
        session_id="session",
        task_id="task-1",
        task_mode=True,
        pinned_skill_contexts=[empty],
        provider_bundle=_bundle(provider),
    ))

    assert "choose one execution path in this first response" not in provider.system_prompts[0]

Extend RecordingProvider to retain messages_by_call and system_prompts, instead of creating a second nearly-identical fixture.

  • Step 3: Run the focused tests to verify failure

Run:

cd app-instance/backend && uv run pytest tests/unit/test_agent_loop.py -q

Expected: FAIL because no Main-Agent template selector or first-turn routing guidance exists.

  • Step 4: Add a private, immutable routing-selection value and selector in loop.py

Place this near AgentRunResult:

@dataclass(frozen=True, slots=True)
class _TeamTemplateRouting:
    skill_name: str
    template: dict[str, Any]
    ignored_skill_names: tuple[str, ...] = ()


def _select_main_agent_team_template(
    activated_skills: list[SkillContext],
) -> _TeamTemplateRouting | None:
    candidates = [
        skill
        for skill in activated_skills
        if isinstance(skill.team_template, dict)
        and isinstance(skill.team_template.get("nodes"), list)
        and bool(skill.team_template["nodes"])
    ]
    if not candidates:
        return None
    return _TeamTemplateRouting(
        skill_name=candidates[0].name,
        template=dict(candidates[0].team_template or {}),
        ignored_skill_names=tuple(skill.name for skill in candidates[1:]),
    )

This intentionally mirrors, but does not alter, TaskExecutionPlanner._select_team_template(): planner adaptation metadata and Main-Agent first-turn routing have different lifecycles. Do not move the helper into Planner or use Planner as a runtime dependency.

  • Step 5: Build compact guidance only when a root Task can actually invoke the Team tool

Replace the static-only Team section with a helper that accepts the routing value:

@staticmethod
def _team_template_routing_prompt(routing: _TeamTemplateRouting) -> str:
    template_payload = json.dumps(
        {"skill_name": routing.skill_name, "template": routing.template},
        ensure_ascii=False,
        separators=(",", ":"),
    )
    return (
        "# Task Agent Team Routing\n\n"
        "An active Skill provides this primary Team template:\n"
        f"{template_payload}\n\n"
        "Before beginning ordinary work, choose one execution path in this first response. "
        "For staged collection, extraction, validation, comparison, research, or reporting represented "
        "by this template, call `run_agent_team` now using task-only nodes derived from it. "
        "Choose single-agent execution only for a plainly one-step request, an explicit request not to "
        "delegate, or a template that does not fit the immediate request. Do not call ordinary tools "
        "before this choice. If choosing single-agent execution, call ordinary tools or answer normally "
        "without explaining the routing choice."
    )

In _process_direct_impl(), calculate the value after activated Skills are resolved. Pass it into _extra_guidance_sections() only when all are true:

is_root_task = task_mode and not parent_session_id and not str(source or "").startswith("team:")
team_tool_available = any(spec.name == AGENT_TEAM_TOOL_NAME for spec in selected_tool_specs)
routing_template = _select_main_agent_team_template(activated_skills)
routing_enabled = is_root_task and team_tool_available and routing_template is not None

Keep TASK_AGENT_TEAM_CAPABILITY_PROMPT for ordinary root Task capability exposure. Do not add guidance for empty/invalid templates, child Team nodes, non-Task runs, or when run_agent_team is absent.

  • Step 6: Run the focused tests to verify they pass

Run:

cd app-instance/backend && uv run pytest tests/unit/test_agent_loop.py -q

Expected: PASS, including existing root-Team-tool visibility coverage.

Task 2: Lock First-Turn Mode and Persist the Machine-Readable Decision

Files:

  • Modify: app-instance/backend/beaver/engine/loop.py

  • Modify: app-instance/backend/tests/unit/test_agent_loop.py

  • Step 1: Write failing Team, Single, mixed-call, and legacy behavior tests

Use ToolCall objects in a SequencedProvider; use the normal registered run_agent_team only with a tool_executor_override stub so the test checks AgentLoop routing without starting a real Team.

def test_first_turn_agent_team_call_records_team_mode_and_executes_only_team(tmp_path) -> None:
    provider = SequencedProvider([
        LLMResponse(
            content="",
            tool_calls=[
                ToolCall(id="team", name="run_agent_team", arguments={"nodes": [{"node_id": "collect", "task": "Collect"}]}),
                ToolCall(id="search", name="web_search", arguments={"query": "must not run"}),
            ],
            provider_name="stub",
            model="stub-model",
        ),
        LLMResponse(content="done", provider_name="stub", model="stub-model"),
    ])
    executor = CapturingToolExecutor()
    loop = AgentLoop(loader=EngineLoader(workspace=tmp_path))

    asyncio.run(loop.process_direct(
        "compare finance reports",
        session_id="session",
        task_id="task-1",
        task_mode=True,
        pinned_skill_contexts=[_team_template_skill()],
        provider_bundle=_bundle(provider),
        tool_executor_override=executor,
    ))

    assert [call.name for call in executor.calls] == ["run_agent_team"]
    decision = _event_payload(loop, "session", "execution_mode_selected")
    assert decision == {
        "task_id": "task-1",
        "execution_mode": "team",
        "routing_source": "main_agent_first_turn",
        "primary_template_skill": "finance-report",
        "ignored_template_skills": [],
    }


def test_first_turn_ordinary_tool_records_single_and_blocks_later_team_call(tmp_path) -> None:
    provider = SequencedProvider([
        LLMResponse(
            content="",
            tool_calls=[ToolCall(id="search", name="web_search", arguments={"query": "one step"})],
            provider_name="stub",
            model="stub-model",
        ),
        LLMResponse(
            content="",
            tool_calls=[ToolCall(id="team", name="run_agent_team", arguments={"nodes": [{"node_id": "late", "task": "Late"}]})],
            provider_name="stub",
            model="stub-model",
        ),
        LLMResponse(content="done", provider_name="stub", model="stub-model"),
    ])
    executor = CapturingToolExecutor()
    loop = AgentLoop(loader=EngineLoader(workspace=tmp_path))

    asyncio.run(loop.process_direct(
        "one-step lookup",
        session_id="session",
        task_id="task-1",
        task_mode=True,
        pinned_skill_contexts=[_team_template_skill()],
        provider_bundle=_bundle(provider),
        tool_executor_override=executor,
    ))

    assert [call.name for call in executor.calls] == ["web_search"]
    assert "run_agent_team" not in provider.tool_names_by_call[1]
    late_result = _tool_result_by_call_id(loop, "session", "team")
    assert late_result["error"] == "execution_mode_locked_single"
    assert _event_payload(loop, "session", "execution_mode_selected")["execution_mode"] == "single"

Also assert that a root Task with no template keeps run_agent_team in every provider schema, preserving legacy behavior.

  • Step 2: Run the test module to verify failure

Run:

cd app-instance/backend && uv run pytest tests/unit/test_agent_loop.py -q

Expected: FAIL because AgentLoop has no decision event, no per-run mode state, and executes mixed/later Team calls normally.

  • Step 3: Add mode state and first-response classification immediately after the provider response

Before the while True loop set:

routing_mode: str | None = None

After response = await provider.chat(**chat_kwargs) and before serializing/appending the assistant message, classify only once when routing_enabled is true:

if routing_enabled and routing_mode is None:
    tool_names = {self._tool_call_name(tool_call) for tool_call in response.tool_calls}
    routing_mode = "team" if AGENT_TEAM_TOOL_NAME in tool_names else "single"
    append_message(
        resolved_session_id,
        run_id=resolved_run_id,
        role="system",
        event_type="execution_mode_selected",
        event_payload={
            "task_id": task_id,
            "attempt_index": attempt_index,
            "execution_mode": routing_mode,
            "routing_source": "main_agent_first_turn",
            "primary_template_skill": routing_template.skill_name,
            "ignored_template_skills": list(routing_template.ignored_skill_names),
        },
        content=None,
        context_visible=False,
        source=source,
        title=title,
        model=final_model,
        user_id=user_id,
    )

Do not write this event for runs without routing_enabled. A no-tool first response selects single before the normal final-answer branch.

  • Step 4: Apply the no-mixed-mode and single-lock behavior at the call boundary

Add two private helpers:

@staticmethod
def _calls_for_execution_mode(tool_calls: list[Any], routing_mode: str | None) -> list[Any]:
    if routing_mode != "team":
        return list(tool_calls)
    return [call for call in tool_calls if AgentLoop._tool_call_name(call) == AGENT_TEAM_TOOL_NAME]


@staticmethod
def _team_locked_result(tool_call: Any) -> ToolResult:
    return ToolResult(
        success=False,
        content="Agent Team can only be selected in the first response of this Task run.",
        tool_name=AGENT_TEAM_TOOL_NAME,
        error="execution_mode_locked_single",
    )

Then use these rules in the loop:

  1. If first response selected team, serialize and execute only run_agent_team; ordinary calls from that response receive no execution.
  2. If routing_mode == "single" and the current iteration is after the first response, remove run_agent_team from chat_kwargs["tools"] before calling the provider.
  3. If a later response nevertheless emits run_agent_team, do not call the executor. Add _team_locked_result() through the same tool_result_recorded and context-builder paths as ordinary tool failures.
  4. Preserve the normal concurrent-execution decision for the remaining executable calls.

Keep original tool schemas and ToolExecutor behavior unchanged for no-template runs. Do not alter allowed_tool_names behavior or use it as a source of tools.

  • Step 5: Run focused AgentLoop tests

Run:

cd app-instance/backend && uv run pytest tests/unit/test_agent_loop.py -q

Expected: PASS. The test verifies no extra provider call is made solely for mode selection, mixed first-turn calls execute only Team, and late Team calls are rejected after Single mode.

Task 3: Project Routing Decisions into the Task Process Stream

Files:

  • Modify: app-instance/backend/beaver/services/process_service.py

  • Modify: app-instance/backend/tests/unit/test_process_projection.py

  • Step 1: Write a failing process-projection test

def test_process_projection_maps_main_agent_execution_mode_selection(tmp_path: Path) -> None:
    session = SessionManager(tmp_path)
    run_store = RunMemoryStore(tmp_path / "memory" / "runs")
    session.append_message(
        "web:test",
        run_id="main-run",
        role="system",
        event_type="execution_mode_selected",
        event_payload={
            "task_id": "task-1",
            "attempt_index": 1,
            "execution_mode": "team",
            "routing_source": "main_agent_first_turn",
            "primary_template_skill": "finance-report",
            "ignored_template_skills": ["secondary-template"],
        },
        context_visible=False,
    )

    projection = SessionProcessProjector(session, run_store).project("web:test")

    event = next(item for item in projection["events"] if item["kind"] == "execution_mode_selected")
    assert event["status"] == "done"
    assert event["metadata"]["execution_mode"] == "team"
    assert event["metadata"]["primary_template_skill"] == "finance-report"
    assert event["metadata"]["ignored_template_skills"] == ["secondary-template"]
  • Step 2: Run it to verify failure

Run:

cd app-instance/backend && uv run pytest tests/unit/test_process_projection.py -q

Expected: FAIL with StopIteration, because the projector ignores execution_mode_selected.

  • Step 3: Add a narrow event branch in SessionProcessProjector.project()

Place the branch after skill_activation_snapshotted and before Team-completion handling:

elif record.event_type == "execution_mode_selected":
    run_id = record.run_id or root_run_id
    parent_run_id = root_run_id if run_id != root_run_id else None
    mode = str(payload.get("execution_mode") or "single")
    add_event(
        event_id=_event_id(record, "execution-mode"),
        run_id=str(run_id),
        parent_run_id=parent_run_id,
        kind="execution_mode_selected",
        actor_type="system",
        actor_id="main-agent-router",
        actor_name="Main Agent",
        text="Main Agent selected Team execution." if mode == "team" else "Main Agent selected single-agent execution.",
        created_at=created_at,
        status="done",
        metadata={
            **dict(payload),
            "task_id": task_id,
            "attempt_index": attempt_index,
            "timeline_type": "execution_mode",
        },
    )

Do not add frontend rendering in this task. The projected event is enough for the existing API/process payload and future UI work.

  • Step 4: Run focused projection tests

Run:

cd app-instance/backend && uv run pytest tests/unit/test_process_projection.py -q

Expected: PASS.

Task 4: Regression Verification and Steven Docker Acceptance

Files:

  • No new production files.

  • Modify only test fixtures/assertions from Tasks 13 if a compatibility issue is exposed.

  • Step 1: Run all directly affected unit tests

Run:

cd app-instance/backend && uv run pytest \
  tests/unit/test_agent_loop.py \
  tests/unit/test_process_projection.py \
  tests/unit/test_team_node_tool_policy.py \
  tests/unit/test_task_execution_planner.py \
  tests/unit/test_task_team_synthesis_outcome.py \
  -q

Expected: PASS. Do not change tests outside this feature to accommodate unrelated Python/TestClient cleanup behavior.

  • Step 2: Verify static quality for the scoped diff

Run:

git diff --check -- \
  app-instance/backend/beaver/engine/loop.py \
  app-instance/backend/beaver/services/process_service.py \
  app-instance/backend/tests/unit/test_agent_loop.py \
  app-instance/backend/tests/unit/test_process_projection.py

Expected: no output and exit status 0.

  • Step 3: Deploy only after local tests pass and verify the real MGM/Galaxy route

Run the established Steven deployment procedure:

docker cp app-instance/backend/beaver app-instance-steven:/opt/app/backend/
docker cp app-instance/backend/pyproject.toml app-instance-steven:/opt/app/backend/pyproject.toml
docker exec app-instance-steven sh -lc 'cd /opt/app/backend && uv pip install --system --no-deps -e .'
docker restart app-instance-steven
curl -fsS http://127.0.0.1:20000/api/ping

Create a fresh MGM/Galaxy finance-report Task and inspect its session/task process events. Acceptance requires this ordering:

skill_activation_snapshotted
→ execution_mode_selected {execution_mode: team, primary_template_skill: mgm-galaxy-financial-chart-report-safe}
→ tool_call_started: run_agent_team
→ run_agent_team_debug: invoke_started
→ task_team_run_completed or task_team_run_failed

The first ordinary web_search must be emitted by a Team node, never by the root Main Agent. If the model intentionally selects Single for this known staged finance template, stop and inspect the captured first-turn system prompt/tool call before changing code.

  • Step 4: Report and stop

Report modified files, focused test outputs, Docker health, real-task event ordering, git diff --stat, and remaining model-mediated routing risk. Do not stage or commit unless the user explicitly asks.

Plan Self-Review

  • Scope coverage: primary template selection, first-turn guidance, mode selection without extra LLM round/reason text, mode lock, raw event persistence, process projection, and real MGM/Galaxy verification are covered.
  • Compatibility: no-template runs keep existing Team-tool exposure; child Team nodes still cannot see the tool; graph/runtime/tool scope/evidence/synthesis behavior is untouched.
  • Out-of-scope guard: no Planner heuristic change, no frontend, no fixed roles, no nested Team, and no new Team model appear in the implementation tasks.