From 28a2627b1f4406eb8d8c43237d53cb6ab4d0a514 Mon Sep 17 00:00:00 2001 From: steven_li Date: Fri, 22 May 2026 10:47:03 +0800 Subject: [PATCH] docs: plan task evidence validation implementation --- .../2026-05-22-task-evidence-validation.md | 1528 +++++++++++++++++ 1 file changed, 1528 insertions(+) create mode 100644 docs/superpowers/plans/2026-05-22-task-evidence-validation.md diff --git a/docs/superpowers/plans/2026-05-22-task-evidence-validation.md b/docs/superpowers/plans/2026-05-22-task-evidence-validation.md new file mode 100644 index 0000000..84a90ed --- /dev/null +++ b/docs/superpowers/plans/2026-05-22-task-evidence-validation.md @@ -0,0 +1,1528 @@ +# Task Evidence and Validation Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Replace truncated task validation context with structured run evidence, clearer validation statuses, bounded real team parallelism, no-tools synthesis, and explicit user-review states. + +**Architecture:** Keep the existing `AgentService`, `TeamService`, and `AgentLoop` shape. Add focused evidence models/builders under `beaver.tasks`, propagate evidence through coordinator result models, and make validation/status logic consume `status` rather than interpreting `passed=False` as failure. Parallel team batches use isolated `AgentLoop` instances behind a semaphore, while sequence and DAG keep the current shared-loop behavior. + +**Tech Stack:** Python dataclasses, pytest, asyncio, existing Beaver session/task/coordinator services, existing provider and tool abstractions. + +--- + +## File Structure + +- Create `app-instance/backend/beaver/tasks/evidence.py` + - Owns `ToolEvidence`, `RunEvidence`, `TaskEvidencePacket`, `EvidenceBuilder`, and text renderers. 
+ - Reads `SessionManager` event records; does not call providers or tools. +- Modify `app-instance/backend/beaver/tasks/models.py` + - Add validation `status`, `evidence_gaps`, derived `passed`, derived `accepted`, and task status helper properties. +- Modify `app-instance/backend/beaver/tasks/service.py` + - Add validation-aware status transitions and API fields for `is_execution_active` and `requires_user_action`. +- Modify `app-instance/backend/beaver/tasks/validation.py` + - Accept `TaskEvidencePacket`, render full evidence without fixed caps, parse new validator fields, and preserve raw response metadata. +- Modify `app-instance/backend/beaver/tasks/__init__.py` + - Export evidence models and builder. +- Modify `app-instance/backend/beaver/coordinator/models.py` + - Add `evidence` to `NodeRunResult` and include it in `to_dict()`. +- Modify `app-instance/backend/beaver/coordinator/local.py` + - Build evidence after delegated runs and support isolated-loop execution mode. +- Modify `app-instance/backend/beaver/coordinator/execution/scheduler.py` + - Preserve failed-node evidence, keep result order deterministic, and bound parallel isolated runs. +- Modify `app-instance/backend/beaver/services/team_service.py` + - Pass parallel concurrency configuration to the scheduler. +- Modify `app-instance/backend/beaver/services/agent_service.py` + - Build task evidence packets, make team synthesis no-tools by default, pass evidence into validation, record validation debug metadata, and branch on validation `status`. +- Modify `app-instance/backend/beaver/engine/loop.py` + - Add no-tools finalization at the tool iteration limit and slim `llm_request_snapshotted` by default. 
+- Add or update tests in: + - `app-instance/backend/tests/unit/test_task_evidence.py` + - `app-instance/backend/tests/unit/test_task_mode_feedback.py` + - `app-instance/backend/tests/unit/test_agent_team_v1.py` + - `app-instance/backend/tests/unit/test_phase5_skills_runtime.py` + +## Task 1: Validation Status Model and Task State Helpers + +**Files:** +- Modify: `app-instance/backend/beaver/tasks/models.py` +- Modify: `app-instance/backend/beaver/tasks/service.py` +- Modify: `app-instance/backend/beaver/tasks/__init__.py` +- Test: `app-instance/backend/tests/unit/test_task_mode_feedback.py` + +- [ ] **Step 1: Write failing tests for validation status semantics** + +Add these tests to `app-instance/backend/tests/unit/test_task_mode_feedback.py` near the existing validation tests: + +```python +def test_validation_result_status_drives_accepted_and_passed() -> None: + accepted = ValidationResult(status="accepted", score=0.9, validator="test") + insufficient = ValidationResult(status="insufficient_evidence", score=0.9, validator="test") + rejected = ValidationResult(status="rejected", score=0.9, validator="test") + + assert accepted.passed is True + assert accepted.accepted is True + assert insufficient.passed is False + assert insufficient.accepted is False + assert rejected.passed is False + assert rejected.accepted is False + + +def test_validation_result_from_legacy_payload_maps_to_status() -> None: + accepted = ValidationResult.from_dict({"passed": True, "score": 0.9, "validator": "legacy"}) + rejected = ValidationResult.from_dict({"passed": False, "score": 0.2, "validator": "legacy"}) + + assert accepted is not None + assert accepted.status == "accepted" + assert rejected is not None + assert rejected.status == "rejected" +``` + +- [ ] **Step 2: Run tests to verify failure** + +Run: + +```bash +cd app-instance/backend +pytest tests/unit/test_task_mode_feedback.py::test_validation_result_status_drives_accepted_and_passed 
tests/unit/test_task_mode_feedback.py::test_validation_result_from_legacy_payload_maps_to_status -v +``` + +Expected: FAIL because `ValidationResult` does not accept `status` or `evidence_gaps`. + +- [ ] **Step 3: Implement validation status fields** + +In `app-instance/backend/beaver/tasks/models.py`, replace the `ValidationResult` dataclass with this shape: + +```python +ValidationStatus = Literal["accepted", "rejected", "insufficient_evidence", "validator_error"] + + +@dataclass(slots=True) +class ValidationResult: + status: ValidationStatus = "rejected" + score: float = 0.0 + issues: list[str] = field(default_factory=list) + missing_requirements: list[str] = field(default_factory=list) + evidence_gaps: list[str] = field(default_factory=list) + recommended_revision_prompt: str = "" + validator: str = "heuristic" + + def __init__( + self, + *, + status: ValidationStatus | None = None, + passed: bool | None = None, + score: float = 0.0, + issues: list[str] | None = None, + missing_requirements: list[str] | None = None, + evidence_gaps: list[str] | None = None, + recommended_revision_prompt: str = "", + validator: str = "heuristic", + ) -> None: + self.status = status or ("accepted" if passed and score >= 0.75 else "rejected") + self.score = max(0.0, min(1.0, float(score or 0.0))) + self.issues = list(issues or []) + self.missing_requirements = list(missing_requirements or []) + self.evidence_gaps = list(evidence_gaps or []) + self.recommended_revision_prompt = recommended_revision_prompt + self.validator = validator + + @property + def passed(self) -> bool: + return self.status == "accepted" + + @property + def accepted(self) -> bool: + return self.status == "accepted" + + def to_dict(self) -> dict[str, Any]: + return { + "status": self.status, + "passed": self.passed, + "score": self.score, + "issues": list(self.issues), + "missing_requirements": list(self.missing_requirements), + "evidence_gaps": list(self.evidence_gaps), + "recommended_revision_prompt": 
self.recommended_revision_prompt, + "validator": self.validator, + "accepted": self.accepted, + } + + @classmethod + def from_dict(cls, payload: dict[str, Any] | None) -> "ValidationResult | None": + if not isinstance(payload, dict): + return None + raw_status = payload.get("status") + status: ValidationStatus | None = ( + raw_status + if raw_status in {"accepted", "rejected", "insufficient_evidence", "validator_error"} + else None + ) + return cls( + status=status, + passed=bool(payload.get("passed")) if status is None else None, + score=float(payload.get("score", 0.0) or 0.0), + issues=[str(item) for item in payload.get("issues") or []], + missing_requirements=[str(item) for item in payload.get("missing_requirements") or []], + evidence_gaps=[str(item) for item in payload.get("evidence_gaps") or []], + recommended_revision_prompt=str(payload.get("recommended_revision_prompt") or ""), + validator=str(payload.get("validator") or "unknown"), + ) +``` + +Also import `Literal` from `typing`. 
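The legacy fallback inside the constructor can be looked at in isolation. The sketch below is a minimal illustration, not part of the plan: `status_from_legacy` is a hypothetical helper name, and it only mirrors the `status or (...)` expression and the 0.75 threshold shown in the constructor above.

```python
from typing import Literal, Optional

ValidationStatus = Literal["accepted", "rejected", "insufficient_evidence", "validator_error"]


def status_from_legacy(
    status: Optional[ValidationStatus],
    passed: Optional[bool],
    score: float,
) -> ValidationStatus:
    """Hypothetical helper mirroring the constructor's fallback expression."""
    if status:
        # An explicit status always wins over the legacy `passed` flag.
        return status
    # Legacy payloads count as accepted only when they passed AND scored >= 0.75.
    return "accepted" if passed and score >= 0.75 else "rejected"
```

Under this rule a legacy payload with `passed=True` but a score below 0.75 maps to `rejected`, which may matter when stored validation results are reloaded through `from_dict()`.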
+ +- [ ] **Step 4: Add task status helper properties** + +In `TaskRecord`, add: + +```python +@property +def is_execution_active(self) -> bool: + return self.status in {"running", "validating"} + +@property +def requires_user_action(self) -> bool: + return self.status in {"awaiting_feedback", "needs_review", "needs_revision"} +``` + +Add `needs_review` to `TASK_OPEN_STATUSES`; keep `failed` out of the open set: + +```python +TASK_OPEN_STATUSES = {"open", "running", "validating", "awaiting_feedback", "needs_review", "needs_revision"} +``` + +- [ ] **Step 5: Expose status helpers in API payloads** + +In `TaskService.to_api_dict()`, add: + +```python +payload["is_open"] = task.is_open +payload["is_execution_active"] = task.is_execution_active +payload["requires_user_action"] = task.requires_user_action +``` + +- [ ] **Step 6: Run status model tests** + +Run: + +```bash +cd app-instance/backend +pytest tests/unit/test_task_mode_feedback.py::test_validation_result_status_drives_accepted_and_passed tests/unit/test_task_mode_feedback.py::test_validation_result_from_legacy_payload_maps_to_status -v +``` + +Expected: PASS. 
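The helper properties from Steps 4 and 5 partition task statuses into "still executing" and "waiting on the user". A minimal sketch of that partition follows; `TaskStub` and `api_flags` are hypothetical stand-ins for `TaskRecord` and `TaskService.to_api_dict()`, and the `is_open` definition is an assumption (membership in `TASK_OPEN_STATUSES`), since the plan does not show it.

```python
from dataclasses import dataclass

# Status sets copied from the plan's helper properties and TASK_OPEN_STATUSES.
TASK_OPEN_STATUSES = {"open", "running", "validating", "awaiting_feedback", "needs_review", "needs_revision"}
EXECUTION_ACTIVE_STATUSES = {"running", "validating"}
USER_ACTION_STATUSES = {"awaiting_feedback", "needs_review", "needs_revision"}


@dataclass
class TaskStub:
    """Hypothetical stand-in for TaskRecord that carries only `status`."""

    status: str

    @property
    def is_open(self) -> bool:
        # Assumption: the real property checks membership in TASK_OPEN_STATUSES.
        return self.status in TASK_OPEN_STATUSES

    @property
    def is_execution_active(self) -> bool:
        return self.status in EXECUTION_ACTIVE_STATUSES

    @property
    def requires_user_action(self) -> bool:
        return self.status in USER_ACTION_STATUSES


def api_flags(task: TaskStub) -> dict:
    """Hypothetical subset of the API payload built in Step 5."""
    return {
        "is_open": task.is_open,
        "is_execution_active": task.is_execution_active,
        "requires_user_action": task.requires_user_action,
    }
```

With these sets, a `needs_review` task is open and requires user action but is not execution-active, while a `failed` task is none of the three.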
+ +- [ ] **Step 7: Commit** + +```bash +git add app-instance/backend/beaver/tasks/models.py app-instance/backend/beaver/tasks/service.py app-instance/backend/beaver/tasks/__init__.py app-instance/backend/tests/unit/test_task_mode_feedback.py +git commit -m "feat(task): add validation status semantics" +``` + +## Task 2: Evidence Models, Builder, and Renderer + +**Files:** +- Create: `app-instance/backend/beaver/tasks/evidence.py` +- Modify: `app-instance/backend/beaver/tasks/__init__.py` +- Test: `app-instance/backend/tests/unit/test_task_evidence.py` + +- [ ] **Step 1: Write evidence builder tests** + +Create `app-instance/backend/tests/unit/test_task_evidence.py`: + +```python +from __future__ import annotations + +from pathlib import Path + +from beaver.engine.session.manager import SessionManager +from beaver.tasks.evidence import EvidenceBuilder, RunEvidence, TaskEvidencePacket, render_task_evidence + + +def test_evidence_builder_preserves_full_tool_result(tmp_path: Path) -> None: + session_manager = SessionManager(tmp_path) + session_id = "session-1" + run_id = "run-1" + long_content = "prefix " + ("x" * 700) + " MAN 3 FT 2 NFO" + session_manager.ensure_session(session_id, source="test") + session_manager.append_message(session_id, run_id=run_id, role="user", event_type="user_message_added", content="score?") + session_manager.append_message( + session_id, + run_id=run_id, + role="tool", + event_type="tool_result_recorded", + event_payload={"success": True, "url": "https://example.test/match"}, + content=long_content, + tool_name="web_fetch", + tool_call_id="call-1", + ) + session_manager.append_message( + session_id, + run_id=run_id, + role="system", + event_type="run_completed", + event_payload={"finish_reason": "stop"}, + content="Manchester United won 3-2.", + finish_reason="stop", + context_visible=False, + ) + + evidence = EvidenceBuilder(session_manager).build_run_evidence( + session_id=session_id, + run_id=run_id, + output_text="Manchester United won 
3-2.", + finish_reason="stop", + ) + rendered = render_task_evidence( + TaskEvidencePacket( + task_id="task-1", + attempt_index=1, + main_run=evidence, + team_runs=[], + team_node_results=[], + final_output="Manchester United won 3-2.", + ) + ) + + assert evidence.tool_results[0].content == long_content + assert "MAN 3 FT 2 NFO" in rendered + assert "https://example.test/match" in rendered + + +def test_render_task_evidence_includes_failed_team_run_tool_results() -> None: + run = RunEvidence( + run_id="run-team", + session_id="session-team", + output_text="Tool loop stopped.", + finish_reason="max_tool_iterations", + transcript=[], + tool_results=[], + warnings=["finish_reason=max_tool_iterations"], + ) + packet = TaskEvidencePacket( + task_id="task-1", + attempt_index=2, + main_run=None, + team_runs=[run], + team_node_results=[], + final_output="partial answer", + ) + + rendered = render_task_evidence(packet) + + assert "finish_reason=max_tool_iterations" in rendered + assert "partial answer" in rendered +``` + +- [ ] **Step 2: Run tests to verify failure** + +Run: + +```bash +cd app-instance/backend +pytest tests/unit/test_task_evidence.py -v +``` + +Expected: FAIL because `beaver.tasks.evidence` does not exist. 
+ +- [ ] **Step 3: Implement evidence models and builder** + +Create `app-instance/backend/beaver/tasks/evidence.py`: + +```python +"""Structured evidence for task synthesis and validation.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + + +@dataclass(slots=True) +class ToolEvidence: + tool_name: str + tool_call_id: str | None + content: str + event_payload: dict[str, Any] = field(default_factory=dict) + url: str | None = None + title: str | None = None + created_at: str | None = None + + def to_dict(self) -> dict[str, Any]: + return { + "tool_name": self.tool_name, + "tool_call_id": self.tool_call_id, + "content": self.content, + "event_payload": dict(self.event_payload), + "url": self.url, + "title": self.title, + "created_at": self.created_at, + } + + +@dataclass(slots=True) +class RunEvidence: + run_id: str + session_id: str + output_text: str + finish_reason: str + transcript: list[dict[str, Any]] = field(default_factory=list) + tool_results: list[ToolEvidence] = field(default_factory=list) + warnings: list[str] = field(default_factory=list) + + def to_dict(self) -> dict[str, Any]: + return { + "run_id": self.run_id, + "session_id": self.session_id, + "output_text": self.output_text, + "finish_reason": self.finish_reason, + "transcript": list(self.transcript), + "tool_results": [item.to_dict() for item in self.tool_results], + "warnings": list(self.warnings), + } + + +@dataclass(slots=True) +class TaskEvidencePacket: + task_id: str + attempt_index: int + main_run: RunEvidence | None + team_runs: list[RunEvidence] = field(default_factory=list) + team_node_results: list[Any] = field(default_factory=list) + final_output: str = "" + + def to_dict(self) -> dict[str, Any]: + return { + "task_id": self.task_id, + "attempt_index": self.attempt_index, + "main_run": self.main_run.to_dict() if self.main_run else None, + "team_runs": [item.to_dict() for item in self.team_runs], + "team_node_results": [ + 
item.to_dict() if hasattr(item, "to_dict") else dict(item) + for item in self.team_node_results + ], + "final_output": self.final_output, + } + + +class EvidenceBuilder: + def __init__(self, session_manager: Any) -> None: + self.session_manager = session_manager + + def build_run_evidence( + self, + *, + session_id: str, + run_id: str, + output_text: str, + finish_reason: str, + ) -> RunEvidence: + events = self.session_manager.get_run_event_records(session_id, run_id) + transcript: list[dict[str, Any]] = [] + tool_results: list[ToolEvidence] = [] + warnings: list[str] = [] + for event in events: + payload = dict(event.event_payload or {}) + transcript.append( + { + "role": event.role, + "event_type": event.event_type, + "content": event.content, + "tool_name": event.tool_name, + "tool_call_id": event.tool_call_id, + "finish_reason": event.finish_reason, + "event_payload": payload, + } + ) + if event.event_type == "tool_result_recorded": + tool_results.append( + ToolEvidence( + tool_name=event.tool_name or "tool", + tool_call_id=event.tool_call_id, + content=event.content or "", + event_payload=payload, + url=_optional_str(payload.get("url")), + title=_optional_str(payload.get("title")), + created_at=_optional_str(payload.get("created_at")), + ) + ) + if finish_reason and finish_reason != "stop": + warnings.append(f"finish_reason={finish_reason}") + return RunEvidence( + run_id=run_id, + session_id=session_id, + output_text=output_text, + finish_reason=finish_reason, + transcript=transcript, + tool_results=tool_results, + warnings=warnings, + ) + + +def render_task_evidence(packet: TaskEvidencePacket) -> str: + sections = [ + f"Task evidence packet: task_id={packet.task_id} attempt={packet.attempt_index}", + f"Final output:\n{packet.final_output}", + ] + if packet.main_run is not None: + sections.append("Main run evidence:\n" + render_run_evidence(packet.main_run)) + if packet.team_runs: + sections.append( + "Team run evidence:\n" + + 
"\n\n".join(render_run_evidence(item) for item in packet.team_runs) + ) + if packet.team_node_results: + lines = [] + for item in packet.team_node_results: + lines.append( + f"- {getattr(item, 'node_id', '')}: success={getattr(item, 'success', False)} " + f"finish_reason={getattr(item, 'finish_reason', '')} error={getattr(item, 'error', '') or ''}" + ) + sections.append("Team node results:\n" + "\n".join(lines)) + return "\n\n".join(section for section in sections if section.strip()) + + +def render_run_evidence(evidence: RunEvidence) -> str: + lines = [ + f"run_id={evidence.run_id}", + f"session_id={evidence.session_id}", + f"finish_reason={evidence.finish_reason}", + ] + if evidence.output_text: + lines.append(f"output:\n{evidence.output_text}") + if evidence.warnings: + lines.append("warnings:\n" + "\n".join(f"- {item}" for item in evidence.warnings)) + if evidence.tool_results: + lines.append( + "tool_results:\n" + + "\n\n".join(_render_tool_evidence(item) for item in evidence.tool_results) + ) + return "\n".join(lines) + + +def _render_tool_evidence(item: ToolEvidence) -> str: + header = f"- tool={item.tool_name} call_id={item.tool_call_id or ''}" + metadata = [] + if item.url: + metadata.append(f"url={item.url}") + if item.title: + metadata.append(f"title={item.title}") + return "\n".join([header, *metadata, item.content]) + + +def _optional_str(value: Any) -> str | None: + return str(value) if value is not None else None +``` + +- [ ] **Step 4: Export evidence models** + +In `app-instance/backend/beaver/tasks/__init__.py`, add: + +```python +from .evidence import EvidenceBuilder, RunEvidence, TaskEvidencePacket, ToolEvidence, render_task_evidence +``` + +Add those names to `__all__`. + +- [ ] **Step 5: Run evidence tests** + +Run: + +```bash +cd app-instance/backend +pytest tests/unit/test_task_evidence.py -v +``` + +Expected: PASS. 
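To make the rendered layout concrete, the sketch below reproduces two small pieces of the module in isolation: the per-tool rendering and the warning derived from a non-`stop` finish reason. `ToolEvidenceStub`, `render_tool`, and `run_warnings` are hypothetical names used only for illustration; they are trimmed copies of the logic in `_render_tool_evidence` and `build_run_evidence`, not additional API.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ToolEvidenceStub:
    """Hypothetical, trimmed stand-in for ToolEvidence."""

    tool_name: str
    tool_call_id: Optional[str]
    content: str
    url: Optional[str] = None
    title: Optional[str] = None


def render_tool(item: ToolEvidenceStub) -> str:
    # Header line, then optional metadata lines, then the full, untruncated content.
    header = f"- tool={item.tool_name} call_id={item.tool_call_id or ''}"
    metadata = []
    if item.url:
        metadata.append(f"url={item.url}")
    if item.title:
        metadata.append(f"title={item.title}")
    return "\n".join([header, *metadata, item.content])


def run_warnings(finish_reason: str) -> List[str]:
    # Any non-empty finish reason other than "stop" is surfaced as a warning.
    if finish_reason and finish_reason != "stop":
        return [f"finish_reason={finish_reason}"]
    return []
```

Because the content is appended as-is, a long `web_fetch` result keeps its tail (for example the score line checked in the first test) instead of being cut at a fixed length.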
+ +- [ ] **Step 6: Commit** + +```bash +git add app-instance/backend/beaver/tasks/evidence.py app-instance/backend/beaver/tasks/__init__.py app-instance/backend/tests/unit/test_task_evidence.py +git commit -m "feat(task): add structured run evidence" +``` + +## Task 3: Team Evidence Propagation + +**Files:** +- Modify: `app-instance/backend/beaver/coordinator/models.py` +- Modify: `app-instance/backend/beaver/coordinator/local.py` +- Modify: `app-instance/backend/beaver/coordinator/execution/scheduler.py` +- Test: `app-instance/backend/tests/unit/test_agent_team_v1.py` + +- [ ] **Step 1: Write failing test for failed-node evidence** + +Add to `app-instance/backend/tests/unit/test_agent_team_v1.py`: + +```python +def test_team_node_preserves_evidence_when_finish_reason_is_not_stop(tmp_path: Path) -> None: + loop = _loop(tmp_path) + provider = RecordingProvider([_response("partial evidence", finish_reason="max_tool_iterations")]) + envelope = DelegationEnvelope( + parent_task_id="task-parent", + parent_session_id="session-root", + parent_run_id="run-root", + agent=AgentDescriptor(name="researcher", role="research"), + task="research the requested topic", + node_id="research", + ) + + result = asyncio.run(LocalAgentRunner(loop).run(envelope, provider_bundle=_bundle(provider))) + + assert result.success is False + assert result.evidence is not None + assert result.evidence.output_text == "partial evidence" + assert result.evidence.finish_reason == "max_tool_iterations" +``` + +- [ ] **Step 2: Run test to verify failure** + +Run: + +```bash +cd app-instance/backend +pytest tests/unit/test_agent_team_v1.py::test_team_node_preserves_evidence_when_finish_reason_is_not_stop -v +``` + +Expected: FAIL because `NodeRunResult` has no `evidence`. 
+ +- [ ] **Step 3: Add evidence field to `NodeRunResult`** + +In `app-instance/backend/beaver/coordinator/models.py`, under `TYPE_CHECKING`, import `RunEvidence`: + +```python +if TYPE_CHECKING: + from beaver.engine.context import SkillContext + from beaver.tasks.evidence import RunEvidence +``` + +Update `NodeRunResult`: + +```python +evidence: "RunEvidence | None" = None +``` + +Update `to_dict()`: + +```python +"evidence": self.evidence.to_dict() if self.evidence is not None else None, +``` + +- [ ] **Step 4: Build evidence in delegated runs** + +In `app-instance/backend/beaver/coordinator/local.py`, import `EvidenceBuilder`: + +```python +from beaver.tasks.evidence import EvidenceBuilder +``` + +After `result = await runner(...)`, add: + +```python +loaded = self.loop.boot() +evidence = EvidenceBuilder(loaded.session_manager).build_run_evidence( + session_id=result.session_id, + run_id=result.run_id, + output_text=result.output_text, + finish_reason=result.finish_reason, +) +``` + +Pass `evidence=evidence` into `NodeRunResult(...)`. + +- [ ] **Step 5: Preserve evidence in scheduler summaries** + +In `TeamGraphScheduler._summarize()`, keep `summary_parts` as-is for user-facing text, but do not filter or drop failed `node_results`; the `TeamRunResult(node_results=results, ...)` call already keeps them. Update failure lines to mention evidence: + +```python +f"- {item.node_id}: {item.error or item.finish_reason} evidence={'yes' if item.evidence else 'no'}" +``` + +- [ ] **Step 6: Run team evidence tests** + +Run: + +```bash +cd app-instance/backend +pytest tests/unit/test_agent_team_v1.py::test_team_node_preserves_evidence_when_finish_reason_is_not_stop tests/unit/test_agent_team_v1.py::test_parallel_node_factory_error_is_normalized_and_keeps_completed_runs -v +``` + +Expected: PASS. 
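The failure line from Step 5 can be checked on its own. In the sketch below, `NodeResultStub` and `failure_line` are hypothetical names; only the f-string is taken from the plan, and the stub carries just the attributes that f-string reads.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class NodeResultStub:
    """Hypothetical stand-in for NodeRunResult with only the fields used here."""

    node_id: str
    finish_reason: str
    error: Optional[str] = None
    evidence: Optional[object] = None


def failure_line(item: NodeResultStub) -> str:
    # Prefer the explicit error; fall back to the finish reason. The trailing
    # marker records whether evidence survived the failed run.
    return f"- {item.node_id}: {item.error or item.finish_reason} evidence={'yes' if item.evidence else 'no'}"
```

A node that stopped at the tool iteration limit but still produced evidence would render as `- research: max_tool_iterations evidence=yes`, which lets a reader of the summary see that partial results are available for validation.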
+ +- [ ] **Step 7: Commit** + +```bash +git add app-instance/backend/beaver/coordinator/models.py app-instance/backend/beaver/coordinator/local.py app-instance/backend/beaver/coordinator/execution/scheduler.py app-instance/backend/tests/unit/test_agent_team_v1.py +git commit -m "feat(team): preserve node run evidence" +``` + +## Task 4: Task Evidence Packet, No-Tools Team Synthesis, and Validation Input + +**Files:** +- Modify: `app-instance/backend/beaver/services/agent_service.py` +- Modify: `app-instance/backend/beaver/tasks/validation.py` +- Test: `app-instance/backend/tests/unit/test_task_mode_feedback.py` + +- [ ] **Step 1: Update stub provider to record tools** + +In `StubProvider` inside `test_task_mode_feedback.py`, change `self.calls` and `chat()`: + +```python +self.calls: list[dict[str, object]] = [] +``` + +```python +self.calls.append({"messages": messages, "tools": tools, "model": model}) +``` + +Update existing assertions from `main_provider.calls[0][0]["content"]` to: + +```python +main_provider.calls[0]["messages"][0]["content"] +``` + +- [ ] **Step 2: Write failing no-tools synthesis test** + +Add: + +```python +def test_task_mode_team_synthesis_runs_without_tools_and_receives_evidence(tmp_path: Path) -> None: + main_provider = StubProvider( + [ + LLMResponse(content="final synthesized answer", finish_reason="stop", provider_name="stub", model="stub-model") + ] + ) + sub_provider = StubProvider( + [ + LLMResponse(content="sub-agent evidence", finish_reason="stop", provider_name="stub", model="stub-model") + ] + ) + validation = StubValidationService([ValidationResult(status="accepted", score=0.9, validator="test")]) + service = AgentService( + loader=EngineLoader( + workspace=tmp_path, + task_execution_planner=StubTaskExecutionPlanner([_team_plan()]), + validation_service=validation, + ) + ) + + result = asyncio.run( + service.process_direct( + "implement team-backed workflow", + session_id="web:team-no-tools", + 
provider_bundle=_provider_bundle(main_provider), + team_provider_bundle_factory=lambda node: _provider_bundle(sub_provider), + ) + ) + + assert result.output_text == "final synthesized answer" + assert main_provider.calls[0]["tools"] is None + assert "sub-agent evidence" in main_provider.calls[0]["messages"][0]["content"] + assert "Task evidence packet" in validation.calls[0]["evidence_text"] +``` + +Update `StubValidationService` to record calls: + +```python +self.calls: list[dict] = [] +``` + +```python +self.calls.append(kwargs) +``` + +- [ ] **Step 3: Run no-tools synthesis test to verify failure** + +Run: + +```bash +cd app-instance/backend +pytest tests/unit/test_task_mode_feedback.py::test_task_mode_team_synthesis_runs_without_tools_and_receives_evidence -v +``` + +Expected: FAIL because final synthesis still gets tools and validation does not receive `evidence_text`. + +- [ ] **Step 4: Build team evidence context in agent service** + +In `agent_service.py`, import: + +```python +from beaver.tasks.evidence import EvidenceBuilder, TaskEvidencePacket, render_task_evidence +``` + +Add helper methods near `_team_execution_context()`: + +```python +def _team_run_evidence(self, result: TeamRunResult | None) -> list[RunEvidence]: + if result is None: + return [] + return [node.evidence for node in result.node_results if node.evidence is not None] + +def _build_task_evidence_packet( + self, + *, + session_manager: Any, + task: TaskRecord, + attempt_index: int, + result: AgentRunResult, + team_result: TeamRunResult | None, +) -> TaskEvidencePacket: + main_run = EvidenceBuilder(session_manager).build_run_evidence( + session_id=result.session_id, + run_id=result.run_id, + output_text=result.output_text, + finish_reason=result.finish_reason, + ) + return TaskEvidencePacket( + task_id=task.task_id, + attempt_index=attempt_index, + main_run=main_run, + team_runs=self._team_run_evidence(team_result), + team_node_results=list(team_result.node_results) if team_result is not 
None else [], + final_output=result.output_text, + ) +``` + +`_team_run_evidence` annotates its return type with `RunEvidence`, so add `RunEvidence` to the `beaver.tasks.evidence` import as well. + +- [ ] **Step 5: Make team synthesis no-tools by default** + +Inside `_run_task_mode`, keep `team_result: TeamRunResult | None = None` before the team block. Before `runner(message, **attempt_kwargs)`, add: + +```python +if plan.is_team and team_execution_context: + attempt_kwargs["include_tools"] = False + attempt_kwargs["max_tool_iterations"] = 0 +``` + +Replace team context building with evidence rendering: + +```python +if team_result is not None: + team_packet = TaskEvidencePacket( + task_id=task.task_id, + attempt_index=attempt_index, + main_run=None, + team_runs=self._team_run_evidence(team_result), + team_node_results=list(team_result.node_results), + final_output="", + ) + team_execution_context = self._join_context( + self._team_execution_context(plan, team_result), + "Rendered team evidence:\n" + render_task_evidence(team_packet), + ) +``` + +- [ ] **Step 6: Pass evidence packet to validation** + +After the final `runner(...)` call returns `result`, build the packet and pass it to validation: + +```python +evidence_packet = self._build_task_evidence_packet( + session_manager=session_manager, + task=task, + attempt_index=attempt_index, + result=result, + team_result=team_result, +) +evidence_text = render_task_evidence(evidence_packet) +validation = await validation_service.validate_task_result( + task=task, + user_message=message, + final_output=result.output_text, + evidence_packet=evidence_packet, + evidence_text=evidence_text, + transcript_excerpt=self._run_excerpt(session_manager, result.session_id, result.run_id), + tool_summaries=self._tool_summaries(session_manager, result.session_id, result.run_id), + team_summaries=team_summaries, + provider_bundle=provider_bundle, +) +``` + +- [ ] **Step 7: Update validation service signature** + +In `validation.py`, add parameters: + +```python +evidence_packet: Any | None = None 
+evidence_text: str = "" +``` + +Pass `evidence_text` into `_validate_with_provider()`. In the prompt, replace fixed excerpt emphasis with: + +```python +f"Evidence packet:\n{evidence_text}\n\n" +``` + +Keep old `transcript_excerpt`, `tool_summaries`, and `team_summaries` in the prompt only when `evidence_text` is empty: + +```python +legacy_context = "" if evidence_text else ( + f"Transcript excerpt:\n{transcript_excerpt}\n\n" + f"Tool summaries:\n{json.dumps(tool_summaries, ensure_ascii=False)}\n\n" + f"Team summaries:\n{json.dumps(team_summaries, ensure_ascii=False)}\n\n" +) +``` + +- [ ] **Step 8: Run team synthesis test** + +Run: + +```bash +cd app-instance/backend +pytest tests/unit/test_task_mode_feedback.py::test_task_mode_team_synthesis_runs_without_tools_and_receives_evidence -v +``` + +Expected: PASS. + +- [ ] **Step 9: Commit** + +```bash +git add app-instance/backend/beaver/services/agent_service.py app-instance/backend/beaver/tasks/validation.py app-instance/backend/tests/unit/test_task_mode_feedback.py +git commit -m "feat(task): synthesize and validate from evidence" +``` + +## Task 5: Validation Status Transitions and Debug Metadata + +**Files:** +- Modify: `app-instance/backend/beaver/tasks/service.py` +- Modify: `app-instance/backend/beaver/tasks/validation.py` +- Modify: `app-instance/backend/beaver/services/agent_service.py` +- Test: `app-instance/backend/tests/unit/test_task_mode_feedback.py` + +- [ ] **Step 1: Write failing status transition tests** + +Add: + +```python +def test_insufficient_evidence_moves_task_to_needs_review(tmp_path: Path) -> None: + service = AgentService( + loader=EngineLoader( + workspace=tmp_path, + task_execution_planner=_single_planner(), + validation_service=StubValidationService( + [ValidationResult(status="insufficient_evidence", score=0.4, evidence_gaps=["source missing"], validator="test")] + ), + ) + ) + + result = asyncio.run( + service.process_direct( + "answer with uncertain evidence", + 
session_id="web:needs-review", + provider_bundle=_bundle("possible answer"), + ) + ) + loaded = service.create_loop().boot() + task = loaded.task_service.get_task(result.task_id) + events = loaded.session_manager.get_run_event_records(result.session_id, result.run_id) + validation_event = next(event for event in events if event.event_type == "task_validation_snapshotted") + + assert task is not None + assert task.status == "needs_review" + assert task.requires_user_action is True + assert task.is_execution_active is False + assert validation_event.event_payload["validation_result"]["status"] == "insufficient_evidence" + assert validation_event.event_payload["validation_debug"]["tool_result_count"] >= 0 +``` + +- [ ] **Step 2: Run transition test to verify failure** + +Run: + +```bash +cd app-instance/backend +pytest tests/unit/test_task_mode_feedback.py::test_insufficient_evidence_moves_task_to_needs_review -v +``` + +Expected: FAIL because `record_validation()` still sets `awaiting_feedback`. + +- [ ] **Step 3: Implement status transition method** + +In `TaskService`, change `record_validation()` signature: + +```python +def record_validation( + self, + task_id: str, + run_id: str, + validation: ValidationResult, + *, + final_attempt: bool = True, + has_usable_answer: bool = True, +) -> TaskRecord: +``` + +Use this transition: + +```python +if validation.status == "accepted": + task.status = "awaiting_feedback" +elif validation.status in {"insufficient_evidence", "validator_error"}: + task.status = "needs_review" +elif validation.status == "rejected" and not final_attempt: + task.status = "needs_revision" +elif validation.status == "rejected" and has_usable_answer: + task.status = "needs_review" +else: + task.status = "failed" + task.closed_at = now + task.close_reason = "automatic validation rejected the final attempt" +``` + +Keep `task.validation_result = validation.to_dict()`. 
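The transition rules above can be read as a pure function from validation outcome to task status. The sketch below restates them that way; `next_task_status` is a hypothetical name, and it leaves out the `closed_at` and `close_reason` bookkeeping that the real method performs in the `failed` branch.

```python
def next_task_status(
    validation_status: str,
    *,
    final_attempt: bool = True,
    has_usable_answer: bool = True,
) -> str:
    """Hypothetical pure-function restatement of the record_validation() branches."""
    if validation_status == "accepted":
        return "awaiting_feedback"
    if validation_status in {"insufficient_evidence", "validator_error"}:
        # The validator could not decide, so a person has to review the answer.
        return "needs_review"
    if validation_status == "rejected" and not final_attempt:
        # A retry is still available.
        return "needs_revision"
    if validation_status == "rejected" and has_usable_answer:
        # Last attempt was rejected, but there is still something to show the user.
        return "needs_review"
    return "failed"
```

Read this way, `failed` is reached only when the final attempt is rejected and no usable answer exists (or the status is unrecognized); every other non-accepted outcome keeps the task open for the user.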
+
+- [ ] **Step 4: Pass final-attempt metadata from agent service**
+
+In `_run_task_mode`, replace:
+
+```python
+task = task_service.record_validation(task.task_id, result.run_id, validation)
+```
+
+with:
+
+```python
+task = task_service.record_validation(
+    task.task_id,
+    result.run_id,
+    validation,
+    final_attempt=(attempt_index == 2 or validation.status in {"accepted", "insufficient_evidence", "validator_error"}),
+    has_usable_answer=bool(result.output_text.strip())
+    and "Tool loop stopped after reaching the configured iteration limit" not in result.output_text,
+)
+```
+
+Then retry only on rejected first attempts:
+
+```python
+if validation.status == "rejected" and attempt_index == 1:
+    session_manager.set_run_context_visible(result.session_id, result.run_id, False)
+else:
+    break
+```
+
+- [ ] **Step 5: Record validation debug metadata**
+
+Build debug payload in `agent_service.py` before appending `task_validation_snapshotted`:
+
+```python
+validation_debug = {
+    "evidence_run_ids": [
+        item.run_id for item in [evidence_packet.main_run, *evidence_packet.team_runs] if item is not None
+    ],
+    "evidence_session_ids": [
+        item.session_id for item in [evidence_packet.main_run, *evidence_packet.team_runs] if item is not None
+    ],
+    "tool_result_count": sum(
+        len(item.tool_results) for item in [evidence_packet.main_run, *evidence_packet.team_runs] if item is not None
+    ),
+    "evidence_length": len(evidence_text),
+}
+```
+
+Add it to the event payload:
+
+```python
+"validation_debug": validation_debug,
+```
+
+- [ ] **Step 6: Parse new validator status**
+
+In `ValidationService._validate_with_provider()`, create result with:
+
+```python
+status = payload.get("status")
+if status not in {"accepted", "rejected", "insufficient_evidence", "validator_error"}:
+    status = "accepted" if payload.get("passed") and float(payload.get("score", 0.0) or 0.0) >= 0.75 else "rejected"
+return ValidationResult(
+    status=status,
+    score=max(0.0, min(1.0,
float(payload.get("score", 0.0) or 0.0))),
+    issues=[str(item) for item in payload.get("issues") or []],
+    missing_requirements=[str(item) for item in payload.get("missing_requirements") or []],
+    evidence_gaps=[str(item) for item in payload.get("evidence_gaps") or []],
+    recommended_revision_prompt=str(payload.get("recommended_revision_prompt") or ""),
+    validator="llm",
+)
+```
+
+For validator exceptions, return:
+
+```python
+ValidationResult(
+    status="validator_error",
+    score=0.0,
+    issues=[f"Validator failed: {exc}"],
+    evidence_gaps=["Automatic validation failed before producing a reliable decision."],
+    missing_requirements=["User review is required because automatic validation failed."],
+    recommended_revision_prompt="Review the answer and evidence, then decide whether to revise or accept it.",
+    validator="llm_error",
+)
+```
+
+- [ ] **Step 7: Run transition tests**
+
+Run:
+
+```bash
+cd app-instance/backend
+pytest tests/unit/test_task_mode_feedback.py::test_insufficient_evidence_moves_task_to_needs_review tests/unit/test_task_mode_feedback.py::test_task_mode_team_retry_hides_first_synthesis_run -v
+```
+
+Expected: PASS.
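The status fallback and score clamping from Step 6 can be exercised in isolation. This is a minimal sketch, not the planned `ValidationService` code: `resolve_status` and `coerce_score` are hypothetical names, and returning `0.0` for unparseable or NaN scores is an added assumption (the plan's inline `float(...)` would raise instead).

```python
from __future__ import annotations

import math
from typing import Any

KNOWN_STATUSES = frozenset(
    {"accepted", "rejected", "insufficient_evidence", "validator_error"}
)
ACCEPT_THRESHOLD = 0.75  # same cutoff as the legacy fallback in Step 6


def coerce_score(raw: Any) -> float:
    """Clamp a validator score into [0.0, 1.0].

    Assumption: unparseable or NaN values become 0.0 rather than raising.
    """
    try:
        score = float(raw or 0.0)
    except (TypeError, ValueError):
        return 0.0
    if math.isnan(score):
        return 0.0
    return max(0.0, min(1.0, score))


def resolve_status(payload: dict[str, Any]) -> str:
    """Prefer an explicit known status; otherwise derive one from passed/score."""
    status = payload.get("status")
    if isinstance(status, str) and status in KNOWN_STATUSES:
        return status
    legacy_pass = bool(payload.get("passed")) and (
        coerce_score(payload.get("score")) >= ACCEPT_THRESHOLD
    )
    return "accepted" if legacy_pass else "rejected"


print(resolve_status({"passed": True, "score": 0.8}))
print(resolve_status({"status": "insufficient_evidence", "score": 0.4}))
```

An explicit known status always wins over `passed`, which matches the plan's goal of consuming `status` rather than interpreting `passed=False` as failure.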
+
+- [ ] **Step 8: Commit**
+
+```bash
+git add app-instance/backend/beaver/tasks/service.py app-instance/backend/beaver/tasks/validation.py app-instance/backend/beaver/services/agent_service.py app-instance/backend/tests/unit/test_task_mode_feedback.py
+git commit -m "feat(task): route validation status to review states"
+```
+
+## Task 6: Tool Iteration No-Tools Finalization
+
+**Files:**
+- Modify: `app-instance/backend/beaver/engine/loop.py`
+- Test: `app-instance/backend/tests/unit/test_phase5_skills_runtime.py`
+
+- [ ] **Step 1: Write failing finalization test**
+
+Update `test_agent_loop_records_max_tool_iterations_as_failed_skill_effect` in `test_phase5_skills_runtime.py` so the stub provider has a third finalization response:
+
+```python
+LLMResponse(
+    content="Based on the available tool result, the container likely failed during startup.",
+    finish_reason="stop",
+    provider_name="stub",
+    model="stub-model",
+),
+```
+
+Change assertions:
+
+```python
+assert result.finish_reason == "max_tool_iterations_finalized"
+assert "Based on the available tool result" in result.output_text
+assert "Tool loop stopped" not in result.output_text
+```
+
+- [ ] **Step 2: Run test to verify failure**
+
+Run:
+
+```bash
+cd app-instance/backend
+pytest tests/unit/test_phase5_skills_runtime.py::test_agent_loop_records_max_tool_iterations_as_failed_skill_effect -v
+```
+
+Expected: FAIL because the loop returns `max_tool_iterations`.
+
+- [ ] **Step 3: Add finalization helper**
+
+In `AgentLoop`, add:
+
+```python
+async def _finalize_after_tool_limit(
+    self,
+    *,
+    provider: Any,
+    messages: list[dict[str, Any]],
+    model: str,
+    max_tokens: int,
+    temperature: float,
+    thinking_enabled: bool | None,
+) -> str:
+    final_messages = [
+        *messages,
+        {
+            "role": "system",
+            "content": (
+                "The configured tool iteration budget is exhausted. "
+                "Do not call tools. Produce the best final answer from the existing conversation "
+                "and tool results.
State uncertainty explicitly."
+            ),
+        },
+    ]
+    kwargs: dict[str, Any] = {
+        "messages": final_messages,
+        "tools": None,
+        "model": model,
+        "max_tokens": max_tokens,
+        "temperature": temperature,
+    }
+    if thinking_enabled is not None:
+        kwargs["thinking_enabled"] = thinking_enabled
+    response = await provider.chat(**kwargs)
+    return (response.content or "").strip()
+```
+
+- [ ] **Step 4: Use helper at iteration limit**
+
+Replace the `if iterations >= resolved_max_tool_iterations:` block with:
+
+```python
+if iterations >= resolved_max_tool_iterations:
+    finalized = await self._finalize_after_tool_limit(
+        provider=provider,
+        messages=messages,
+        model=final_model,
+        max_tokens=resolved_max_tokens,
+        temperature=resolved_temperature,
+        thinking_enabled=thinking_enabled,
+    )
+    final_text = finalized or "Tool loop stopped after reaching the configured iteration limit, and no final answer was produced."
+    final_finish_reason = "max_tool_iterations_finalized" if finalized else "max_tool_iterations"
+    session_manager.append_message(
+        resolved_session_id,
+        run_id=resolved_run_id,
+        role="assistant",
+        event_type="assistant_message_added",
+        event_payload={"task_id": task_id} if task_id else None,
+        content=final_text,
+        finish_reason=final_finish_reason,
+        source=source,
+        title=title,
+        model=final_model,
+        user_id=user_id,
+    )
+    context_builder.add_assistant_message(messages, content=final_text)
+    break
+```
+
+- [ ] **Step 5: Run finalization test**
+
+Run:
+
+```bash
+cd app-instance/backend
+pytest tests/unit/test_phase5_skills_runtime.py::test_agent_loop_records_max_tool_iterations_as_failed_skill_effect -v
+```
+
+Expected: PASS.
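The finalization flow from Steps 3 and 4 can be tried without the real engine. The sketch below is a simplified illustration under stated assumptions: `StubProvider` and `finalize_after_tool_limit` are hypothetical stand-ins, the provider is assumed to expose an async `chat(**kwargs)` returning an object with a `content` attribute, and model, token, and temperature arguments are omitted.

```python
from __future__ import annotations

import asyncio
from types import SimpleNamespace
from typing import Any

FALLBACK_TEXT = (
    "Tool loop stopped after reaching the configured iteration limit, "
    "and no final answer was produced."
)


class StubProvider:
    """Hypothetical provider that records the kwargs of each chat call."""

    def __init__(self, content: str | None) -> None:
        self.content = content
        self.calls: list[dict[str, Any]] = []

    async def chat(self, **kwargs: Any) -> SimpleNamespace:
        self.calls.append(kwargs)
        return SimpleNamespace(content=self.content)


async def finalize_after_tool_limit(
    provider: StubProvider, messages: list[dict[str, Any]]
) -> tuple[str, str]:
    """Make one last call with tools disabled and pick the finish reason."""
    final_messages = [
        *messages,  # copy, so the caller's history is not mutated
        {
            "role": "system",
            "content": "The configured tool iteration budget is exhausted. Do not call tools.",
        },
    ]
    response = await provider.chat(messages=final_messages, tools=None)
    finalized = (response.content or "").strip()
    if finalized:
        return finalized, "max_tool_iterations_finalized"
    return FALLBACK_TEXT, "max_tool_iterations"


history = [{"role": "user", "content": "Why did the container fail?"}]
print(asyncio.run(finalize_after_tool_limit(StubProvider("  It likely failed during startup.  "), history)))
print(asyncio.run(finalize_after_tool_limit(StubProvider(None), history)))
```

An empty or whitespace-only completion falls back to the fixed message and keeps the older `max_tool_iterations` finish reason, so downstream checks can still tell that no usable answer was produced.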
+
+- [ ] **Step 6: Commit**
+
+```bash
+git add app-instance/backend/beaver/engine/loop.py app-instance/backend/tests/unit/test_phase5_skills_runtime.py
+git commit -m "feat(engine): finalize after tool iteration limit"
+```
+
+## Task 7: Bounded Parallel Team Execution
+
+**Files:**
+- Modify: `app-instance/backend/beaver/coordinator/local.py`
+- Modify: `app-instance/backend/beaver/coordinator/execution/scheduler.py`
+- Modify: `app-instance/backend/beaver/services/team_service.py`
+- Test: `app-instance/backend/tests/unit/test_agent_team_v1.py`
+
+- [ ] **Step 1: Write failing concurrency test**
+
+Add to `test_agent_team_v1.py`:
+
+```python
+class BlockingProvider(RecordingProvider):
+    def __init__(self, content: str, started: asyncio.Event, release: asyncio.Event) -> None:
+        super().__init__([_response(content)])
+        self.started = started
+        self.release = release
+
+    async def chat(self, *args, **kwargs) -> LLMResponse:
+        self.started.set()
+        await self.release.wait()
+        return await super().chat(*args, **kwargs)
+
+
+def test_team_parallel_starts_nodes_concurrently_with_isolated_loops(tmp_path: Path) -> None:
+    loop = _loop(tmp_path)
+    first_started = asyncio.Event()
+    second_started = asyncio.Event()
+    release = asyncio.Event()
+    providers = {
+        "one": BlockingProvider("one", first_started, release),
+        "two": BlockingProvider("two", second_started, release),
+    }
+    graph = ExecutionGraph(
+        strategy="parallel",
+        nodes=[
+            ExecutionNode("one", "task one", AgentDescriptor(name="one")),
+            ExecutionNode("two", "task two", AgentDescriptor(name="two")),
+        ],
+    )
+
+    async def run_case():
+        task = asyncio.create_task(
+            TeamService(loop).run_team(
+                graph,
+                parent_task_id=None,
+                parent_session_id="session-root",
+                parent_run_id="run-root",
+                provider_bundle_factory=lambda node: _bundle(providers[node.node_id]),
+            )
+        )
+        await asyncio.wait_for(first_started.wait(), timeout=1)
+        await asyncio.wait_for(second_started.wait(), timeout=1)
+        release.set()
+        return await
task
+
+    result = asyncio.run(run_case())
+
+    assert result.success is True
+    assert [item.node_id for item in result.node_results] == ["one", "two"]
+```
+
+- [ ] **Step 2: Run test to verify failure**
+
+Run:
+
+```bash
+cd app-instance/backend
+pytest tests/unit/test_agent_team_v1.py::test_team_parallel_starts_nodes_concurrently_with_isolated_loops -v
+```
+
+Expected: FAIL or timeout because shared-loop execution serializes the providers.
+
+- [ ] **Step 3: Add isolated execution mode to local runner**
+
+In `LocalAgentRunner.run()`, add parameter:
+
+```python
+execution_mode: str = "shared_loop",
+```
+
+Use helper:
+
+```python
+target_loop = self.loop
+if execution_mode == "isolated_loop":
+    target_loop = AgentLoop(profile=self.loop.profile, loader=self.loop.loader)
+runner = target_loop.process_direct if execution_mode == "isolated_loop" else (
+    self.loop.submit_direct if self.loop.is_running else self.loop.process_direct
+)
+```
+
+Use `target_loop.boot()` when building evidence after the run.
+
+- [ ] **Step 4: Add scheduler concurrency limit**
+
+In `TeamGraphScheduler.__init__()`:
+
+```python
+def __init__(self, runner: LocalAgentRunner, *, max_parallel_team_nodes: int = 3) -> None:
+    self.runner = runner
+    self.max_parallel_team_nodes = max(1, int(max_parallel_team_nodes))
+```
+
+Change `_run_parallel()`:
+
+```python
+semaphore = asyncio.Semaphore(self.max_parallel_team_nodes)
+
+async def run_one(node: ExecutionNode) -> NodeRunResult:
+    async with semaphore:
+        return await self._run_node(
+            node,
+            dependency_outputs={},
+            execution_mode="isolated_loop",
+            **kwargs,
+        )
+
+return list(await asyncio.gather(*(run_one(node) for node in nodes)))
+```
+
+Update `_run_node()` signature to accept `execution_mode: str = "shared_loop"` and pass it to `self.runner.run(...)`.
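The semaphore-plus-`gather` pattern from Step 4 can be checked on its own with plain coroutines. This is a minimal sketch under stated assumptions: `run_bounded` is a hypothetical helper, and `asyncio.sleep` stands in for a node run. It records peak concurrency to show the limit holding, and relies on `asyncio.gather` returning results in input order regardless of completion order.

```python
from __future__ import annotations

import asyncio


async def run_bounded(delays: dict[str, float], limit: int) -> tuple[list[str], int]:
    """Run one coroutine per name with at most `limit` active at a time.

    Returns the results in input order plus the peak concurrency observed.
    """
    semaphore = asyncio.Semaphore(max(1, int(limit)))
    active = 0
    peak = 0

    async def run_one(name: str, delay: float) -> str:
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)
            try:
                await asyncio.sleep(delay)  # stand-in for a node run
            finally:
                active -= 1
        return name

    results = await asyncio.gather(
        *(run_one(name, delay) for name, delay in delays.items())
    )
    return list(results), peak


# "one" is the slowest node, yet it still comes first in the results.
order, peak = asyncio.run(
    run_bounded({"one": 0.03, "two": 0.01, "three": 0.02, "four": 0.0}, limit=2)
)
print(order, peak)
```

Because ordering comes from `gather` rather than from completion time, the `["one", "two"]` assertion in the Step 1 test does not depend on which provider is released first.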
+
+- [ ] **Step 5: Wire limit through TeamService**
+
+In `TeamService.__init__()`:
+
+```python
+def __init__(self, loop: AgentLoop, *, max_parallel_team_nodes: int = 3) -> None:
+    self.loop = loop
+    self.runner = LocalAgentRunner(loop)
+    self.scheduler = TeamGraphScheduler(self.runner, max_parallel_team_nodes=max_parallel_team_nodes)
+```
+
+- [ ] **Step 6: Run parallel tests**
+
+Run:
+
+```bash
+cd app-instance/backend
+pytest tests/unit/test_agent_team_v1.py::test_team_parallel_starts_nodes_concurrently_with_isolated_loops tests/unit/test_agent_team_v1.py::test_team_parallel_runs_all_nodes -v
+```
+
+Expected: PASS.
+
+- [ ] **Step 7: Commit**
+
+```bash
+git add app-instance/backend/beaver/coordinator/local.py app-instance/backend/beaver/coordinator/execution/scheduler.py app-instance/backend/beaver/services/team_service.py app-instance/backend/tests/unit/test_agent_team_v1.py
+git commit -m "feat(team): run parallel nodes with isolated loops"
+```
+
+## Task 8: Slim LLM Request Snapshots
+
+**Files:**
+- Modify: `app-instance/backend/beaver/engine/loop.py`
+- Test: `app-instance/backend/tests/unit/test_phase5_skills_runtime.py`
+
+- [ ] **Step 1: Write failing snapshot-size test**
+
+Add to `test_phase5_skills_runtime.py`:
+
+```python
+def test_llm_request_snapshot_defaults_to_compact_payload(tmp_path: Path) -> None:
+    loop = AgentLoop(loader=EngineLoader(workspace=tmp_path, skill_assembler=StubSkillAssembler()))
+    bundle = ProviderBundle(
+        main_runtime=SimpleNamespace(model="stub-model", provider_name="stub"),
+        main_provider=StubProvider([LLMResponse(content="done", finish_reason="stop", provider_name="stub", model="stub-model")]),
+    )
+
+    result = asyncio.run(loop.process_direct("hello", provider_bundle=bundle))
+    loaded = loop.boot()
+    events = loaded.session_manager.get_run_event_records(result.session_id, result.run_id)
+    snapshot = next(event for event in events if event.event_type == "llm_request_snapshotted")
+
+    assert "message_count" in
snapshot.event_payload
+    assert "tool_names" in snapshot.event_payload
+    assert "messages" not in snapshot.event_payload
+    assert "tools" not in snapshot.event_payload
+```
+
+- [ ] **Step 2: Run test to verify failure**
+
+Run:
+
+```bash
+cd app-instance/backend
+pytest tests/unit/test_phase5_skills_runtime.py::test_llm_request_snapshot_defaults_to_compact_payload -v
+```
+
+Expected: FAIL because snapshot payload still includes complete `messages` and `tools`.
+
+- [ ] **Step 3: Add compact snapshot payload**
+
+In `AgentLoop._process_direct_impl()`, before `session_manager.append_message(... event_type="llm_request_snapshotted" ...)`, add:
+
+```python
+tool_names = [
+    str(tool.get("function", {}).get("name") or tool.get("name") or "tool")
+    for tool in (tool_schemas or [])
+    if isinstance(tool, dict)
+]
+snapshot_payload = {
+    "iteration": iterations,
+    "provider_name": final_provider_name,
+    "model": final_model,
+    "message_count": len(messages),
+    "tool_names": tool_names,
+    "message_char_length": len(json.dumps(messages, ensure_ascii=False, default=str)),
+    "tool_schema_char_length": len(json.dumps(tool_schemas, ensure_ascii=False, default=str)),
+    "max_tokens": resolved_max_tokens,
+    "temperature": resolved_temperature,
+    "thinking_enabled": thinking_enabled,
+}
+```
+
+Use `snapshot_payload` as `event_payload`. Use compact `content`:
+
+```python
+content=json.dumps(snapshot_payload, ensure_ascii=False, default=str)
+```
+
+- [ ] **Step 4: Run compact snapshot test**
+
+Run:
+
+```bash
+cd app-instance/backend
+pytest tests/unit/test_phase5_skills_runtime.py::test_llm_request_snapshot_defaults_to_compact_payload -v
+```
+
+Expected: PASS.
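The compact payload from Step 3 is easy to verify outside the loop. The sketch below is an illustration, not the planned `AgentLoop` code: `compact_snapshot` and `tool_name` are hypothetical helpers, only the size-related fields are included, and the guard against a non-dict `function` value is an added assumption.

```python
from __future__ import annotations

import json
from typing import Any


def tool_name(tool: dict[str, Any]) -> str:
    """Read a tool name from a nested or flat schema, defaulting to "tool"."""
    function = tool.get("function")
    # Assumption: tolerate a missing or non-dict "function" entry.
    nested = function.get("name") if isinstance(function, dict) else None
    return str(nested or tool.get("name") or "tool")


def compact_snapshot(
    messages: list[dict[str, Any]], tool_schemas: list[Any] | None
) -> dict[str, Any]:
    """Summarize a request by counts and sizes instead of full payloads."""
    return {
        "message_count": len(messages),
        "tool_names": [
            tool_name(tool) for tool in (tool_schemas or []) if isinstance(tool, dict)
        ],
        "message_char_length": len(
            json.dumps(messages, ensure_ascii=False, default=str)
        ),
        "tool_schema_char_length": len(
            json.dumps(tool_schemas, ensure_ascii=False, default=str)
        ),
    }


messages = [{"role": "user", "content": "hello"}]
tools = [{"type": "function", "function": {"name": "read_file"}}, {"name": "search"}]
print(json.dumps(compact_snapshot(messages, tools), ensure_ascii=False))
```

Keeping the character lengths alongside the counts preserves a rough size signal for debugging even though the full `messages` and `tools` bodies are no longer stored.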
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add app-instance/backend/beaver/engine/loop.py app-instance/backend/tests/unit/test_phase5_skills_runtime.py
+git commit -m "chore(engine): compact llm request snapshots"
+```
+
+## Task 9: Full Regression and Compatibility Sweep
+
+**Files:**
+- Modify only files required by failing compatibility tests.
+- Test: backend unit suite.
+
+- [ ] **Step 1: Run focused task/team/engine tests**
+
+Run:
+
+```bash
+cd app-instance/backend
+pytest tests/unit/test_task_evidence.py tests/unit/test_task_mode_feedback.py tests/unit/test_agent_team_v1.py tests/unit/test_phase5_skills_runtime.py -v
+```
+
+Expected: PASS.
+
+- [ ] **Step 2: Run full backend unit tests**
+
+Run:
+
+```bash
+cd app-instance/backend
+pytest tests/unit -v
+```
+
+Expected: PASS.
+
+- [ ] **Step 3: Inspect active task API projections**
+
+Run:
+
+```bash
+cd app-instance/backend
+pytest tests/unit/test_active_task_api.py tests/unit/test_process_projection.py -v
+```
+
+Expected: PASS. If these fail because payloads lack `is_execution_active` or `requires_user_action`, update expected payloads to include the new fields and keep existing assertions.
+
+- [ ] **Step 4: Run repository status check**
+
+Run:
+
+```bash
+git status --short
+```
+
+Expected: only files changed by the implementation tasks are listed.
+
+- [ ] **Step 5: Commit compatibility fixes**
+
+If Step 2 or Step 3 required fixes, commit them:
+
+```bash
+git add app-instance/backend
+git commit -m "test(task): update validation evidence regressions"
+```
+
+If no fixes were needed, skip this commit.
+
+## Self-Review Checklist
+
+- Spec coverage:
+  - Complete evidence preservation: Tasks 2, 3, 4.
+  - No fixed validation truncation: Task 4.
+  - `status` over `passed`: Tasks 1, 5.
+  - `needs_review` as user-action state: Tasks 1, 5.
+  - No-tools team synthesis: Task 4.
+  - Tool-limit finalization: Task 6.
+  - Limited parallel team execution: Task 7.
+  - Validation debug metadata: Task 5.
+  - Compact LLM snapshots: Task 8.
+- Type consistency:
+  - `TaskEvidencePacket`, `RunEvidence`, and `ToolEvidence` are defined before coordinator and service code references them.
+  - New validation statuses are all handled by `ValidationResult`, `TaskService.record_validation()`, and `ValidationService`.
+  - `NodeRunResult.evidence` is optional, so blocked or factory-error nodes can still be represented.
+- Execution order:
+  - Tasks are ordered so each dependency exists before later tasks use it.
+  - Every task has a focused test command and a commit point.