# Task Evidence and Validation Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Replace truncated task validation context with structured run evidence, clearer validation statuses, bounded real team parallelism, no-tools synthesis, and explicit user-review states.

**Architecture:** Keep the existing `AgentService`, `TeamService`, and `AgentLoop` shape. Add focused evidence models/builders under `beaver.tasks`, propagate evidence through coordinator result models, and make validation/status logic consume `status` rather than interpreting `passed=False` as failure. Parallel team batches use isolated `AgentLoop` instances behind a semaphore, while sequence and DAG keep the current shared-loop behavior.

**Tech Stack:** Python dataclasses, pytest, asyncio, existing Beaver session/task/coordinator services, existing provider and tool abstractions.

---

## File Structure

- Create `app-instance/backend/beaver/tasks/evidence.py`
  - Owns `ToolEvidence`, `RunEvidence`, `TaskEvidencePacket`, `EvidenceBuilder`, and text renderers.
  - Reads `SessionManager` event records; does not call providers or tools.
- Modify `app-instance/backend/beaver/tasks/models.py`
  - Add validation `status`, `evidence_gaps`, derived `passed`, derived `accepted`, and task status helper properties.
- Modify `app-instance/backend/beaver/tasks/service.py`
  - Add validation-aware status transitions and API fields for `is_execution_active` and `requires_user_action`.
- Modify `app-instance/backend/beaver/tasks/validation.py`
  - Accept `TaskEvidencePacket`, render full evidence without fixed caps, parse new validator fields, and preserve raw response metadata.
- Modify `app-instance/backend/beaver/tasks/__init__.py`
  - Export evidence models and builder.
- Modify `app-instance/backend/beaver/coordinator/models.py`
  - Add `evidence` to `NodeRunResult` and include it in `to_dict()`.
- Modify `app-instance/backend/beaver/coordinator/local.py`
  - Build evidence after delegated runs and support isolated-loop execution mode.
- Modify `app-instance/backend/beaver/coordinator/execution/scheduler.py`
  - Preserve failed-node evidence, keep result order deterministic, and bound parallel isolated runs.
- Modify `app-instance/backend/beaver/services/team_service.py`
  - Pass parallel concurrency configuration to the scheduler.
- Modify `app-instance/backend/beaver/services/agent_service.py`
  - Build task evidence packets, make team synthesis no-tools by default, pass evidence into validation, record validation debug metadata, and branch on validation `status`.
- Modify `app-instance/backend/beaver/engine/loop.py`
  - Add no-tools finalization at the tool iteration limit and slim `llm_request_snapshotted` by default.
- Add or update tests in:
  - `app-instance/backend/tests/unit/test_task_evidence.py`
  - `app-instance/backend/tests/unit/test_task_mode_feedback.py`
  - `app-instance/backend/tests/unit/test_agent_team_v1.py`
  - `app-instance/backend/tests/unit/test_phase5_skills_runtime.py`

## Task 1: Validation Status Model and Task State Helpers

**Files:**
- Modify: `app-instance/backend/beaver/tasks/models.py`
- Modify: `app-instance/backend/beaver/tasks/service.py`
- Modify: `app-instance/backend/beaver/tasks/__init__.py`
- Test: `app-instance/backend/tests/unit/test_task_mode_feedback.py`

- [ ] **Step 1: Write failing tests for validation status semantics**

Add these tests to `app-instance/backend/tests/unit/test_task_mode_feedback.py` near the existing validation tests:

```python
def test_validation_result_status_drives_accepted_and_passed() -> None:
    accepted = ValidationResult(status="accepted", score=0.9, validator="test")
    insufficient = ValidationResult(status="insufficient_evidence", score=0.9, validator="test")
    rejected = ValidationResult(status="rejected", score=0.9, validator="test")

    assert accepted.passed is True
    assert accepted.accepted is True
    assert insufficient.passed is False
    assert insufficient.accepted is False
    assert rejected.passed is False
    assert rejected.accepted is False


def test_validation_result_from_legacy_payload_maps_to_status() -> None:
    accepted = ValidationResult.from_dict({"passed": True, "score": 0.9, "validator": "legacy"})
    rejected = ValidationResult.from_dict({"passed": False, "score": 0.2, "validator": "legacy"})

    assert accepted is not None
    assert accepted.status == "accepted"
    assert rejected is not None
    assert rejected.status == "rejected"
```

- [ ] **Step 2: Run tests to verify failure**

Run:

```bash
cd app-instance/backend
pytest tests/unit/test_task_mode_feedback.py::test_validation_result_status_drives_accepted_and_passed tests/unit/test_task_mode_feedback.py::test_validation_result_from_legacy_payload_maps_to_status -v
```

Expected: FAIL because `ValidationResult` does not accept `status` or `evidence_gaps`.
- [ ] **Step 3: Implement validation status fields**

In `app-instance/backend/beaver/tasks/models.py`, replace the `ValidationResult` dataclass with this shape:

```python
ValidationStatus = Literal["accepted", "rejected", "insufficient_evidence", "validator_error"]


@dataclass(slots=True)
class ValidationResult:
    status: ValidationStatus = "rejected"
    score: float = 0.0
    issues: list[str] = field(default_factory=list)
    missing_requirements: list[str] = field(default_factory=list)
    evidence_gaps: list[str] = field(default_factory=list)
    recommended_revision_prompt: str = ""
    validator: str = "heuristic"

    def __init__(
        self,
        *,
        status: ValidationStatus | None = None,
        passed: bool | None = None,
        score: float = 0.0,
        issues: list[str] | None = None,
        missing_requirements: list[str] | None = None,
        evidence_gaps: list[str] | None = None,
        recommended_revision_prompt: str = "",
        validator: str = "heuristic",
    ) -> None:
        self.status = status or ("accepted" if passed and score >= 0.75 else "rejected")
        self.score = max(0.0, min(1.0, float(score or 0.0)))
        self.issues = list(issues or [])
        self.missing_requirements = list(missing_requirements or [])
        self.evidence_gaps = list(evidence_gaps or [])
        self.recommended_revision_prompt = recommended_revision_prompt
        self.validator = validator

    @property
    def passed(self) -> bool:
        return self.status == "accepted"

    @property
    def accepted(self) -> bool:
        return self.status == "accepted"

    def to_dict(self) -> dict[str, Any]:
        return {
            "status": self.status,
            "passed": self.passed,
            "score": self.score,
            "issues": list(self.issues),
            "missing_requirements": list(self.missing_requirements),
            "evidence_gaps": list(self.evidence_gaps),
            "recommended_revision_prompt": self.recommended_revision_prompt,
            "validator": self.validator,
            "accepted": self.accepted,
        }

    @classmethod
    def from_dict(cls, payload: dict[str, Any] | None) -> "ValidationResult | None":
        if not isinstance(payload, dict):
            return None
        raw_status = payload.get("status")
        status: ValidationStatus | None = (
            raw_status
            if raw_status in {"accepted", "rejected", "insufficient_evidence", "validator_error"}
            else None
        )
        return cls(
            status=status,
            passed=bool(payload.get("passed")) if status is None else None,
            score=float(payload.get("score", 0.0) or 0.0),
            issues=[str(item) for item in payload.get("issues") or []],
            missing_requirements=[str(item) for item in payload.get("missing_requirements") or []],
            evidence_gaps=[str(item) for item in payload.get("evidence_gaps") or []],
            recommended_revision_prompt=str(payload.get("recommended_revision_prompt") or ""),
            validator=str(payload.get("validator") or "unknown"),
        )
```

Also import `Literal` from `typing`.

- [ ] **Step 4: Add task status helper properties**

In `TaskRecord`, add:

```python
@property
def is_execution_active(self) -> bool:
    return self.status in {"running", "validating"}

@property
def requires_user_action(self) -> bool:
    return self.status in {"awaiting_feedback", "needs_review", "needs_revision"}
```

Add `needs_review` to `TASK_OPEN_STATUSES`; keep `failed` out of the open set:

```python
TASK_OPEN_STATUSES = {"open", "running", "validating", "awaiting_feedback", "needs_review", "needs_revision"}
```

- [ ] **Step 5: Expose status helpers in API payloads**

In `TaskService.to_api_dict()`, add:

```python
payload["is_open"] = task.is_open
payload["is_execution_active"] = task.is_execution_active
payload["requires_user_action"] = task.requires_user_action
```

- [ ] **Step 6: Run status model tests**

Run:

```bash
cd app-instance/backend
pytest tests/unit/test_task_mode_feedback.py::test_validation_result_status_drives_accepted_and_passed tests/unit/test_task_mode_feedback.py::test_validation_result_from_legacy_payload_maps_to_status -v
```

Expected: PASS.
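The three flags from Steps 4 and 5 overlap in ways that are easy to misread, so a small table can help when reviewing API consumers. The sketch below is a standalone illustration, not the real `TaskRecord`: `TaskStatusView` and `describe` are hypothetical names, and it assumes that `is_open` is defined as membership in `TASK_OPEN_STATUSES`.

```python
from dataclasses import dataclass

# Mirrors the set from Step 4; `failed` is intentionally absent.
TASK_OPEN_STATUSES = {
    "open", "running", "validating",
    "awaiting_feedback", "needs_review", "needs_revision",
}


@dataclass(frozen=True)
class TaskStatusView:
    """Hypothetical stand-in for TaskRecord, reduced to its status field."""

    status: str

    @property
    def is_open(self) -> bool:
        # Assumption: the real TaskRecord.is_open checks TASK_OPEN_STATUSES.
        return self.status in TASK_OPEN_STATUSES

    @property
    def is_execution_active(self) -> bool:
        return self.status in {"running", "validating"}

    @property
    def requires_user_action(self) -> bool:
        return self.status in {"awaiting_feedback", "needs_review", "needs_revision"}


def describe(status: str) -> dict[str, bool]:
    """Return the three API flags for one status value."""
    view = TaskStatusView(status)
    return {
        "is_open": view.is_open,
        "is_execution_active": view.is_execution_active,
        "requires_user_action": view.requires_user_action,
    }


if __name__ == "__main__":
    for status in sorted(TASK_OPEN_STATUSES | {"failed"}):
        print(status, describe(status))
```

Under these assumptions a `needs_review` task is open and waiting on the user but is not executing, which is the combination the later transition test checks.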
- [ ] **Step 7: Commit**

```bash
git add app-instance/backend/beaver/tasks/models.py app-instance/backend/beaver/tasks/service.py app-instance/backend/beaver/tasks/__init__.py app-instance/backend/tests/unit/test_task_mode_feedback.py
git commit -m "feat(task): add validation status semantics"
```

## Task 2: Evidence Models, Builder, and Renderer

**Files:**
- Create: `app-instance/backend/beaver/tasks/evidence.py`
- Modify: `app-instance/backend/beaver/tasks/__init__.py`
- Test: `app-instance/backend/tests/unit/test_task_evidence.py`

- [ ] **Step 1: Write evidence builder tests**

Create `app-instance/backend/tests/unit/test_task_evidence.py`:

```python
from __future__ import annotations

from pathlib import Path

from beaver.engine.session.manager import SessionManager
from beaver.tasks.evidence import (
    EvidenceBuilder,
    RunEvidence,
    TaskEvidencePacket,
    ToolEvidence,
    render_task_evidence,
)


def test_evidence_builder_preserves_full_tool_result(tmp_path: Path) -> None:
    session_manager = SessionManager(tmp_path)
    session_id = "session-1"
    run_id = "run-1"
    # The decisive text sits after 700 filler characters, past any short excerpt.
    long_content = "prefix " + ("x" * 700) + " MAN 3 FT 2 NFO"
    session_manager.ensure_session(session_id, source="test")
    session_manager.append_message(session_id, run_id=run_id, role="user", event_type="user_message_added", content="score?")
    session_manager.append_message(
        session_id,
        run_id=run_id,
        role="tool",
        event_type="tool_result_recorded",
        event_payload={"success": True, "url": "https://example.test/match"},
        content=long_content,
        tool_name="web_fetch",
        tool_call_id="call-1",
    )
    session_manager.append_message(
        session_id,
        run_id=run_id,
        role="system",
        event_type="run_completed",
        event_payload={"finish_reason": "stop"},
        content="Manchester United won 3-2.",
        finish_reason="stop",
        context_visible=False,
    )

    evidence = EvidenceBuilder(session_manager).build_run_evidence(
        session_id=session_id,
        run_id=run_id,
        output_text="Manchester United won 3-2.",
        finish_reason="stop",
    )
    rendered = render_task_evidence(
        TaskEvidencePacket(
            task_id="task-1",
            attempt_index=1,
            main_run=evidence,
            team_runs=[],
            team_node_results=[],
            final_output="Manchester United won 3-2.",
        )
    )

    assert evidence.tool_results[0].content == long_content
    assert "MAN 3 FT 2 NFO" in rendered
    assert "https://example.test/match" in rendered


def test_render_task_evidence_includes_failed_team_run_tool_results() -> None:
    run = RunEvidence(
        run_id="run-team",
        session_id="session-team",
        output_text="Tool loop stopped.",
        finish_reason="max_tool_iterations",
        transcript=[],
        # One tool result so the test covers what its name promises.
        tool_results=[
            ToolEvidence(tool_name="web_fetch", tool_call_id="call-9", content="partial scoreboard: MAN 3")
        ],
        warnings=["finish_reason=max_tool_iterations"],
    )
    packet = TaskEvidencePacket(
        task_id="task-1",
        attempt_index=2,
        main_run=None,
        team_runs=[run],
        team_node_results=[],
        final_output="partial answer",
    )

    rendered = render_task_evidence(packet)

    assert "finish_reason=max_tool_iterations" in rendered
    assert "partial scoreboard: MAN 3" in rendered
    assert "partial answer" in rendered
```

- [ ] **Step 2: Run tests to verify failure**

Run:

```bash
cd app-instance/backend
pytest tests/unit/test_task_evidence.py -v
```

Expected: FAIL because `beaver.tasks.evidence` does not exist.
- [ ] **Step 3: Implement evidence models and builder**

Create `app-instance/backend/beaver/tasks/evidence.py`:

```python
"""Structured evidence for task synthesis and validation."""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass(slots=True)
class ToolEvidence:
    tool_name: str
    tool_call_id: str | None
    content: str
    event_payload: dict[str, Any] = field(default_factory=dict)
    url: str | None = None
    title: str | None = None
    created_at: str | None = None

    def to_dict(self) -> dict[str, Any]:
        return {
            "tool_name": self.tool_name,
            "tool_call_id": self.tool_call_id,
            "content": self.content,
            "event_payload": dict(self.event_payload),
            "url": self.url,
            "title": self.title,
            "created_at": self.created_at,
        }


@dataclass(slots=True)
class RunEvidence:
    run_id: str
    session_id: str
    output_text: str
    finish_reason: str
    transcript: list[dict[str, Any]] = field(default_factory=list)
    tool_results: list[ToolEvidence] = field(default_factory=list)
    warnings: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        return {
            "run_id": self.run_id,
            "session_id": self.session_id,
            "output_text": self.output_text,
            "finish_reason": self.finish_reason,
            "transcript": list(self.transcript),
            "tool_results": [item.to_dict() for item in self.tool_results],
            "warnings": list(self.warnings),
        }


@dataclass(slots=True)
class TaskEvidencePacket:
    task_id: str
    attempt_index: int
    main_run: RunEvidence | None
    team_runs: list[RunEvidence] = field(default_factory=list)
    team_node_results: list[Any] = field(default_factory=list)
    final_output: str = ""

    def to_dict(self) -> dict[str, Any]:
        return {
            "task_id": self.task_id,
            "attempt_index": self.attempt_index,
            "main_run": self.main_run.to_dict() if self.main_run else None,
            "team_runs": [item.to_dict() for item in self.team_runs],
            "team_node_results": [
                item.to_dict() if hasattr(item, "to_dict") else dict(item)
                for item in self.team_node_results
            ],
            "final_output": self.final_output,
        }


class EvidenceBuilder:
    def __init__(self, session_manager: Any) -> None:
        self.session_manager = session_manager

    def build_run_evidence(
        self,
        *,
        session_id: str,
        run_id: str,
        output_text: str,
        finish_reason: str,
    ) -> RunEvidence:
        events = self.session_manager.get_run_event_records(session_id, run_id)
        transcript: list[dict[str, Any]] = []
        tool_results: list[ToolEvidence] = []
        warnings: list[str] = []
        for event in events:
            payload = dict(event.event_payload or {})
            transcript.append(
                {
                    "role": event.role,
                    "event_type": event.event_type,
                    "content": event.content,
                    "tool_name": event.tool_name,
                    "tool_call_id": event.tool_call_id,
                    "finish_reason": event.finish_reason,
                    "event_payload": payload,
                }
            )
            if event.event_type == "tool_result_recorded":
                tool_results.append(
                    ToolEvidence(
                        tool_name=event.tool_name or "tool",
                        tool_call_id=event.tool_call_id,
                        content=event.content or "",
                        event_payload=payload,
                        url=_optional_str(payload.get("url")),
                        title=_optional_str(payload.get("title")),
                        created_at=_optional_str(payload.get("created_at")),
                    )
                )
        if finish_reason and finish_reason != "stop":
            warnings.append(f"finish_reason={finish_reason}")
        return RunEvidence(
            run_id=run_id,
            session_id=session_id,
            output_text=output_text,
            finish_reason=finish_reason,
            transcript=transcript,
            tool_results=tool_results,
            warnings=warnings,
        )


def render_task_evidence(packet: TaskEvidencePacket) -> str:
    sections = [
        f"Task evidence packet: task_id={packet.task_id} attempt={packet.attempt_index}",
        f"Final output:\n{packet.final_output}",
    ]
    if packet.main_run is not None:
        sections.append("Main run evidence:\n" + render_run_evidence(packet.main_run))
    if packet.team_runs:
        sections.append(
            "Team run evidence:\n"
            + "\n\n".join(render_run_evidence(item) for item in packet.team_runs)
        )
    if packet.team_node_results:
        lines = []
        for item in packet.team_node_results:
            lines.append(
                f"- {getattr(item, 'node_id', '')}: success={getattr(item, 'success', False)} "
                f"finish_reason={getattr(item, 'finish_reason', '')} error={getattr(item, 'error', '') or ''}"
            )
        sections.append("Team node results:\n" + "\n".join(lines))
    return "\n\n".join(section for section in sections if section.strip())


def render_run_evidence(evidence: RunEvidence) -> str:
    lines = [
        f"run_id={evidence.run_id}",
        f"session_id={evidence.session_id}",
        f"finish_reason={evidence.finish_reason}",
    ]
    if evidence.output_text:
        lines.append(f"output:\n{evidence.output_text}")
    if evidence.warnings:
        lines.append("warnings:\n" + "\n".join(f"- {item}" for item in evidence.warnings))
    if evidence.tool_results:
        lines.append(
            "tool_results:\n"
            + "\n\n".join(_render_tool_evidence(item) for item in evidence.tool_results)
        )
    return "\n".join(lines)


def _render_tool_evidence(item: ToolEvidence) -> str:
    header = f"- tool={item.tool_name} call_id={item.tool_call_id or ''}"
    metadata = []
    if item.url:
        metadata.append(f"url={item.url}")
    if item.title:
        metadata.append(f"title={item.title}")
    return "\n".join([header, *metadata, item.content])


def _optional_str(value: Any) -> str | None:
    return str(value) if value is not None else None
```

- [ ] **Step 4: Export evidence models**

In `app-instance/backend/beaver/tasks/__init__.py`, add:

```python
from .evidence import EvidenceBuilder, RunEvidence, TaskEvidencePacket, ToolEvidence, render_task_evidence
```

Add those names to `__all__`.

- [ ] **Step 5: Run evidence tests**

Run:

```bash
cd app-instance/backend
pytest tests/unit/test_task_evidence.py -v
```

Expected: PASS.
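The first evidence test pads the tool result with 700 filler characters so that the decisive text sits beyond any short excerpt window. The sketch below illustrates that failure mode in isolation. `capped_excerpt` and its 500-character limit are hypothetical stand-ins for a fixed cap, not the actual limit used by the existing validation context.

```python
def capped_excerpt(text: str, limit: int = 500) -> str:
    """Hypothetical fixed-cap excerpt; the 500-character limit is an assumption."""
    return text if len(text) <= limit else text[:limit] + "..."


def render_full(text: str) -> str:
    """Uncapped rendering: the validator sees the whole tool result."""
    return text


# Same shape as the fixture in test_evidence_builder_preserves_full_tool_result.
MARKER = "MAN 3 FT 2 NFO"
long_content = "prefix " + ("x" * 700) + " " + MARKER

if __name__ == "__main__":
    print("marker in capped excerpt:", MARKER in capped_excerpt(long_content))
    print("marker in full rendering:", MARKER in render_full(long_content))
```

With a cap like this the validator would never see the score line, which is the situation the uncapped renderer is meant to avoid.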
- [ ] **Step 6: Commit**

```bash
git add app-instance/backend/beaver/tasks/evidence.py app-instance/backend/beaver/tasks/__init__.py app-instance/backend/tests/unit/test_task_evidence.py
git commit -m "feat(task): add structured run evidence"
```

## Task 3: Team Evidence Propagation

**Files:**
- Modify: `app-instance/backend/beaver/coordinator/models.py`
- Modify: `app-instance/backend/beaver/coordinator/local.py`
- Modify: `app-instance/backend/beaver/coordinator/execution/scheduler.py`
- Test: `app-instance/backend/tests/unit/test_agent_team_v1.py`

- [ ] **Step 1: Write failing test for failed-node evidence**

Add to `app-instance/backend/tests/unit/test_agent_team_v1.py`:

```python
def test_team_node_preserves_evidence_when_finish_reason_is_not_stop(tmp_path: Path) -> None:
    loop = _loop(tmp_path)
    provider = RecordingProvider([_response("partial evidence", finish_reason="max_tool_iterations")])
    envelope = DelegationEnvelope(
        parent_task_id="task-parent",
        parent_session_id="session-root",
        parent_run_id="run-root",
        agent=AgentDescriptor(name="researcher", role="research"),
        task="research the requested topic",
        node_id="research",
    )

    result = asyncio.run(LocalAgentRunner(loop).run(envelope, provider_bundle=_bundle(provider)))

    assert result.success is False
    assert result.evidence is not None
    assert result.evidence.output_text == "partial evidence"
    assert result.evidence.finish_reason == "max_tool_iterations"
```

- [ ] **Step 2: Run test to verify failure**

Run:

```bash
cd app-instance/backend
pytest tests/unit/test_agent_team_v1.py::test_team_node_preserves_evidence_when_finish_reason_is_not_stop -v
```

Expected: FAIL because `NodeRunResult` has no `evidence`.
- [ ] **Step 3: Add evidence field to `NodeRunResult`**

In `app-instance/backend/beaver/coordinator/models.py`, under `TYPE_CHECKING`, import `RunEvidence`:

```python
if TYPE_CHECKING:
    from beaver.engine.context import SkillContext
    from beaver.tasks.evidence import RunEvidence
```

Update `NodeRunResult`:

```python
evidence: "RunEvidence | None" = None
```

Update `to_dict()`:

```python
"evidence": self.evidence.to_dict() if self.evidence is not None else None,
```

- [ ] **Step 4: Build evidence in delegated runs**

In `app-instance/backend/beaver/coordinator/local.py`, import `EvidenceBuilder`:

```python
from beaver.tasks.evidence import EvidenceBuilder
```

After `result = await runner(...)`, add:

```python
loaded = self.loop.boot()
evidence = EvidenceBuilder(loaded.session_manager).build_run_evidence(
    session_id=result.session_id,
    run_id=result.run_id,
    output_text=result.output_text,
    finish_reason=result.finish_reason,
)
```

Pass `evidence=evidence` into `NodeRunResult(...)`.

- [ ] **Step 5: Preserve evidence in scheduler summaries**

In `TeamGraphScheduler._summarize()`, keep `summary_parts` as-is for user-facing text, but do not filter or drop failed `node_results`; the `TeamRunResult(node_results=results, ...)` call already keeps them. Update failure lines to mention evidence:

```python
f"- {item.node_id}: {item.error or item.finish_reason} evidence={'yes' if item.evidence else 'no'}"
```

- [ ] **Step 6: Run team evidence tests**

Run:

```bash
cd app-instance/backend
pytest tests/unit/test_agent_team_v1.py::test_team_node_preserves_evidence_when_finish_reason_is_not_stop tests/unit/test_agent_team_v1.py::test_parallel_node_factory_error_is_normalized_and_keeps_completed_runs -v
```

Expected: PASS.
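The file structure asks the scheduler to bound parallel isolated runs and keep result order deterministic, but the steps in this task do not show that part. One common asyncio pattern that gives both is a semaphore around each node plus `asyncio.gather`, which returns results in argument order regardless of completion order. This is a minimal sketch of that pattern, not the scheduler's actual code: `run_node`, `run_parallel_batch`, and `max_concurrency` are hypothetical names, and the sleep stands in for an isolated `AgentLoop` run.

```python
import asyncio


async def run_node(node_id: str, delay: float) -> str:
    """Hypothetical node run; the sleep stands in for an isolated AgentLoop."""
    await asyncio.sleep(delay)
    return f"{node_id}:done"


async def run_parallel_batch(
    nodes: list[tuple[str, float]],
    max_concurrency: int,
) -> tuple[list[str], int]:
    """Run nodes with at most `max_concurrency` in flight.

    Returns the results in input order and the peak number of concurrent runs.
    """
    semaphore = asyncio.Semaphore(max_concurrency)
    in_flight = 0
    peak = 0

    async def bounded(node_id: str, delay: float) -> str:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await run_node(node_id, delay)
            finally:
                in_flight -= 1

    # gather() preserves argument order, so slower early nodes still come first.
    results = await asyncio.gather(*(bounded(n, d) for n, d in nodes))
    return list(results), peak


if __name__ == "__main__":
    batch = [("a", 0.03), ("b", 0.01), ("c", 0.02), ("d", 0.01)]
    print(asyncio.run(run_parallel_batch(batch, max_concurrency=2)))
```

If one node can raise, passing `return_exceptions=True` to `asyncio.gather` returns the exception in that node's slot instead of aborting the batch, so completed runs and their evidence can still be normalized afterwards.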
- [ ] **Step 7: Commit**

```bash
git add app-instance/backend/beaver/coordinator/models.py app-instance/backend/beaver/coordinator/local.py app-instance/backend/beaver/coordinator/execution/scheduler.py app-instance/backend/tests/unit/test_agent_team_v1.py
git commit -m "feat(team): preserve node run evidence"
```

## Task 4: Task Evidence Packet, No-Tools Team Synthesis, and Validation Input

**Files:**
- Modify: `app-instance/backend/beaver/services/agent_service.py`
- Modify: `app-instance/backend/beaver/tasks/validation.py`
- Test: `app-instance/backend/tests/unit/test_task_mode_feedback.py`

- [ ] **Step 1: Update stub provider to record tools**

In `StubProvider` inside `test_task_mode_feedback.py`, change `self.calls` and `chat()`:

```python
self.calls: list[dict[str, object]] = []
```

```python
self.calls.append({"messages": messages, "tools": tools, "model": model})
```

Update existing assertions from `main_provider.calls[0][0]["content"]` to:

```python
main_provider.calls[0]["messages"][0]["content"]
```

- [ ] **Step 2: Write failing no-tools synthesis test**

Add:

```python
def test_task_mode_team_synthesis_runs_without_tools_and_receives_evidence(tmp_path: Path) -> None:
    main_provider = StubProvider(
        [
            LLMResponse(content="final synthesized answer", finish_reason="stop", provider_name="stub", model="stub-model")
        ]
    )
    sub_provider = StubProvider(
        [
            LLMResponse(content="sub-agent evidence", finish_reason="stop", provider_name="stub", model="stub-model")
        ]
    )
    validation = StubValidationService([ValidationResult(status="accepted", score=0.9, validator="test")])
    service = AgentService(
        loader=EngineLoader(
            workspace=tmp_path,
            task_execution_planner=StubTaskExecutionPlanner([_team_plan()]),
            validation_service=validation,
        )
    )

    result = asyncio.run(
        service.process_direct(
            "implement team-backed workflow",
            session_id="web:team-no-tools",
            provider_bundle=_provider_bundle(main_provider),
            team_provider_bundle_factory=lambda node: _provider_bundle(sub_provider),
        )
    )

    assert result.output_text == "final synthesized answer"
    assert main_provider.calls[0]["tools"] is None
    assert "sub-agent evidence" in main_provider.calls[0]["messages"][0]["content"]
    assert "Task evidence packet" in validation.calls[0]["evidence_text"]
```

Update `StubValidationService` to record calls:

```python
self.calls: list[dict] = []
```

```python
self.calls.append(kwargs)
```

- [ ] **Step 3: Run no-tools synthesis test to verify failure**

Run:

```bash
cd app-instance/backend
pytest tests/unit/test_task_mode_feedback.py::test_task_mode_team_synthesis_runs_without_tools_and_receives_evidence -v
```

Expected: FAIL because final synthesis still gets tools and validation does not receive `evidence_text`.

- [ ] **Step 4: Build team evidence context in agent service**

In `agent_service.py`, import:

```python
from beaver.tasks.evidence import EvidenceBuilder, RunEvidence, TaskEvidencePacket, render_task_evidence
```

Add helper methods near `_team_execution_context()`:

```python
def _team_run_evidence(self, result: TeamRunResult | None) -> list[RunEvidence]:
    if result is None:
        return []
    # Nodes that never produced a run have no evidence and are skipped.
    return [node.evidence for node in result.node_results if node.evidence is not None]

def _build_task_evidence_packet(
    self,
    *,
    session_manager: Any,
    task: TaskRecord,
    attempt_index: int,
    result: AgentRunResult,
    team_result: TeamRunResult | None,
) -> TaskEvidencePacket:
    main_run = EvidenceBuilder(session_manager).build_run_evidence(
        session_id=result.session_id,
        run_id=result.run_id,
        output_text=result.output_text,
        finish_reason=result.finish_reason,
    )
    return TaskEvidencePacket(
        task_id=task.task_id,
        attempt_index=attempt_index,
        main_run=main_run,
        team_runs=self._team_run_evidence(team_result),
        team_node_results=list(team_result.node_results) if team_result is not None else [],
        final_output=result.output_text,
    )
```

`RunEvidence` is part of the import because `_team_run_evidence()` uses it in its return annotation.
- [ ] **Step 5: Make team synthesis no-tools by default**

Inside `_run_task_mode`, keep `team_result: TeamRunResult | None = None` before the team block. Before `runner(message, **attempt_kwargs)`, add:

```python
if plan.is_team and team_execution_context:
    attempt_kwargs["include_tools"] = False
    attempt_kwargs["max_tool_iterations"] = 0
```

Replace team context building with evidence rendering:

```python
if team_result is not None:
    team_packet = TaskEvidencePacket(
        task_id=task.task_id,
        attempt_index=attempt_index,
        main_run=None,
        team_runs=self._team_run_evidence(team_result),
        team_node_results=list(team_result.node_results),
        final_output="",
    )
    team_execution_context = self._join_context(
        self._team_execution_context(plan, team_result),
        "Rendered team evidence:\n" + render_task_evidence(team_packet),
    )
```

- [ ] **Step 6: Pass evidence packet to validation**

After the final `result` returns, build the packet and pass it to validation:

```python
evidence_packet = self._build_task_evidence_packet(
    session_manager=session_manager,
    task=task,
    attempt_index=attempt_index,
    result=result,
    team_result=team_result,
)
evidence_text = render_task_evidence(evidence_packet)
validation = await validation_service.validate_task_result(
    task=task,
    user_message=message,
    final_output=result.output_text,
    evidence_packet=evidence_packet,
    evidence_text=evidence_text,
    transcript_excerpt=self._run_excerpt(session_manager, result.session_id, result.run_id),
    tool_summaries=self._tool_summaries(session_manager, result.session_id, result.run_id),
    team_summaries=team_summaries,
    provider_bundle=provider_bundle,
)
```

- [ ] **Step 7: Update validation service signature**

In `validation.py`, add parameters:

```python
evidence_packet: Any | None = None
evidence_text: str = ""
```

Pass `evidence_text` into `_validate_with_provider()`.
In the prompt, replace the fixed-length excerpts with the rendered evidence packet:

```python
f"Evidence packet:\n{evidence_text}\n\n"
```

Keep old `transcript_excerpt`, `tool_summaries`, and `team_summaries` in the prompt only when `evidence_text` is empty:

```python
legacy_context = "" if evidence_text else (
    f"Transcript excerpt:\n{transcript_excerpt}\n\n"
    f"Tool summaries:\n{json.dumps(tool_summaries, ensure_ascii=False)}\n\n"
    f"Team summaries:\n{json.dumps(team_summaries, ensure_ascii=False)}\n\n"
)
```

- [ ] **Step 8: Run team synthesis test**

Run:

```bash
cd app-instance/backend
pytest tests/unit/test_task_mode_feedback.py::test_task_mode_team_synthesis_runs_without_tools_and_receives_evidence -v
```

Expected: PASS.

- [ ] **Step 9: Commit**

```bash
git add app-instance/backend/beaver/services/agent_service.py app-instance/backend/beaver/tasks/validation.py app-instance/backend/tests/unit/test_task_mode_feedback.py
git commit -m "feat(task): synthesize and validate from evidence"
```

## Task 5: Validation Status Transitions and Debug Metadata

**Files:**
- Modify: `app-instance/backend/beaver/tasks/service.py`
- Modify: `app-instance/backend/beaver/tasks/validation.py`
- Modify: `app-instance/backend/beaver/services/agent_service.py`
- Test: `app-instance/backend/tests/unit/test_task_mode_feedback.py`

- [ ] **Step 1: Write failing status transition tests**

Add:

```python
def test_insufficient_evidence_moves_task_to_needs_review(tmp_path: Path) -> None:
    service = AgentService(
        loader=EngineLoader(
            workspace=tmp_path,
            task_execution_planner=_single_planner(),
            validation_service=StubValidationService(
                [ValidationResult(status="insufficient_evidence", score=0.4, evidence_gaps=["source missing"], validator="test")]
            ),
        )
    )

    result = asyncio.run(
        service.process_direct(
            "answer with uncertain evidence",
            session_id="web:needs-review",
            provider_bundle=_bundle("possible answer"),
        )
    )

    loaded = service.create_loop().boot()
    task = loaded.task_service.get_task(result.task_id)
    events = loaded.session_manager.get_run_event_records(result.session_id, result.run_id)
    validation_event = next(event for event in events if event.event_type == "task_validation_snapshotted")

    assert task is not None
    assert task.status == "needs_review"
    assert task.requires_user_action is True
    assert task.is_execution_active is False
    assert validation_event.event_payload["validation_result"]["status"] == "insufficient_evidence"
    assert validation_event.event_payload["validation_debug"]["tool_result_count"] >= 0
```

- [ ] **Step 2: Run transition test to verify failure**

Run:

```bash
cd app-instance/backend
pytest tests/unit/test_task_mode_feedback.py::test_insufficient_evidence_moves_task_to_needs_review -v
```

Expected: FAIL because `record_validation()` still sets `awaiting_feedback`.

- [ ] **Step 3: Implement status transition method**

In `TaskService`, change `record_validation()` signature:

```python
def record_validation(
    self,
    task_id: str,
    run_id: str,
    validation: ValidationResult,
    *,
    final_attempt: bool = True,
    has_usable_answer: bool = True,
) -> TaskRecord:
```

Use this transition:

```python
if validation.status == "accepted":
    task.status = "awaiting_feedback"
elif validation.status in {"insufficient_evidence", "validator_error"}:
    task.status = "needs_review"
elif validation.status == "rejected" and not final_attempt:
    task.status = "needs_revision"
elif validation.status == "rejected" and has_usable_answer:
    task.status = "needs_review"
else:
    task.status = "failed"
    task.closed_at = now
    task.close_reason = "automatic validation rejected the final attempt"
```

Keep `task.validation_result = validation.to_dict()`.
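The branch order in Step 3 matters because `rejected` can land in three different task states. A pure-function version of the same decision makes the cases easy to enumerate in a table-driven test. This is only a sketch for reasoning about the transition: `next_task_status` and `CASES` are hypothetical names and are not part of `TaskService`.

```python
def next_task_status(
    validation_status: str,
    *,
    final_attempt: bool = True,
    has_usable_answer: bool = True,
) -> str:
    """Hypothetical pure version of the record_validation() transition."""
    if validation_status == "accepted":
        return "awaiting_feedback"
    if validation_status in {"insufficient_evidence", "validator_error"}:
        return "needs_review"
    if validation_status == "rejected" and not final_attempt:
        return "needs_revision"
    if validation_status == "rejected" and has_usable_answer:
        return "needs_review"
    # Rejected on the final attempt with nothing usable to show the user.
    return "failed"


# (validation_status, final_attempt, has_usable_answer, expected task status)
CASES = [
    ("accepted", True, True, "awaiting_feedback"),
    ("insufficient_evidence", True, True, "needs_review"),
    ("validator_error", True, False, "needs_review"),
    ("rejected", False, True, "needs_revision"),
    ("rejected", True, True, "needs_review"),
    ("rejected", True, False, "failed"),
]

if __name__ == "__main__":
    for status, final, usable, expected in CASES:
        actual = next_task_status(status, final_attempt=final, has_usable_answer=usable)
        print(f"{status:22} final={final!s:5} usable={usable!s:5} -> {actual} (expected {expected})")
```

Only the last row closes the task; every other outcome keeps it open for either another attempt or a user decision.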
- [ ] **Step 4: Pass final-attempt metadata from agent service**

In `_run_task_mode`, replace:

```python
task = task_service.record_validation(task.task_id, result.run_id, validation)
```

with:

```python
task = task_service.record_validation(
    task.task_id,
    result.run_id,
    validation,
    final_attempt=(
        attempt_index == 2
        or validation.status in {"accepted", "insufficient_evidence", "validator_error"}
    ),
    has_usable_answer=bool(result.output_text.strip())
    and "Tool loop stopped after reaching the configured iteration limit" not in result.output_text,
)
```

The marker string is matched without a trailing period so that it also catches the longer fallback message that Task 6 writes when finalization produces no text.

Then retry only on rejected first attempts:

```python
if validation.status == "rejected" and attempt_index == 1:
    session_manager.set_run_context_visible(result.session_id, result.run_id, False)
else:
    break
```

- [ ] **Step 5: Record validation debug metadata**

Build debug payload in `agent_service.py` before appending `task_validation_snapshotted`:

```python
validation_debug = {
    "evidence_run_ids": [
        item.run_id
        for item in [evidence_packet.main_run, *evidence_packet.team_runs]
        if item is not None
    ],
    "evidence_session_ids": [
        item.session_id
        for item in [evidence_packet.main_run, *evidence_packet.team_runs]
        if item is not None
    ],
    "tool_result_count": sum(
        len(item.tool_results)
        for item in [evidence_packet.main_run, *evidence_packet.team_runs]
        if item is not None
    ),
    "evidence_length": len(evidence_text),
}
```

Add it to the event payload:

```python
"validation_debug": validation_debug,
```

- [ ] **Step 6: Parse new validator status**

In `ValidationService._validate_with_provider()`, create result with:

```python
status = payload.get("status")
if status not in {"accepted", "rejected", "insufficient_evidence", "validator_error"}:
    status = "accepted" if payload.get("passed") and float(payload.get("score", 0.0) or 0.0) >= 0.75 else "rejected"
return ValidationResult(
    status=status,
    score=max(0.0, min(1.0, float(payload.get("score", 0.0) or 0.0))),
    issues=[str(item) for item in payload.get("issues") or []],
    missing_requirements=[str(item) for item in payload.get("missing_requirements") or []],
    evidence_gaps=[str(item) for item in payload.get("evidence_gaps") or []],
    recommended_revision_prompt=str(payload.get("recommended_revision_prompt") or ""),
    validator="llm",
)
```

For validator exceptions, return:

```python
ValidationResult(
    status="validator_error",
    score=0.0,
    issues=[f"Validator failed: {exc}"],
    evidence_gaps=["Automatic validation failed before producing a reliable decision."],
    missing_requirements=["User review is required because automatic validation failed."],
    recommended_revision_prompt="Review the answer and evidence, then decide whether to revise or accept it.",
    validator="llm_error",
)
```

- [ ] **Step 7: Run transition tests**

Run:

```bash
cd app-instance/backend
pytest tests/unit/test_task_mode_feedback.py::test_insufficient_evidence_moves_task_to_needs_review tests/unit/test_task_mode_feedback.py::test_task_mode_team_retry_hides_first_synthesis_run -v
```

Expected: PASS.
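The status fallback in Step 6 is the one place where an older validator reply, which only carries `passed` and `score`, is mapped onto the new statuses. A standalone sketch of that resolution can be handy for checking edge cases such as an unknown status or a missing score. `resolve_status` and `KNOWN_STATUSES` are hypothetical names, and the `isinstance` guard is an addition of this sketch rather than something the plan specifies.

```python
from typing import Any

KNOWN_STATUSES = {"accepted", "rejected", "insufficient_evidence", "validator_error"}


def resolve_status(payload: dict[str, Any]) -> str:
    """Hypothetical helper: pick a validation status from a validator payload."""
    status = payload.get("status")
    # Guard added in this sketch so non-string values fall through to the legacy path.
    if isinstance(status, str) and status in KNOWN_STATUSES:
        return status
    # Legacy path: derive the status from `passed` and `score`.
    score = float(payload.get("score", 0.0) or 0.0)
    return "accepted" if payload.get("passed") and score >= 0.75 else "rejected"


if __name__ == "__main__":
    samples = [
        {"status": "insufficient_evidence", "passed": True, "score": 0.9},
        {"passed": True, "score": 0.8},
        {"passed": True, "score": 0.5},
        {"status": "maybe", "passed": False, "score": 0.9},
        {"passed": True, "score": None},
    ]
    for sample in samples:
        print(sample, "->", resolve_status(sample))
```

An explicit known status always wins over `passed`, so a reply that says `insufficient_evidence` is never upgraded to `accepted` by a high score.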
- [ ] **Step 8: Commit**

```bash
git add app-instance/backend/beaver/tasks/service.py app-instance/backend/beaver/tasks/validation.py app-instance/backend/beaver/services/agent_service.py app-instance/backend/tests/unit/test_task_mode_feedback.py
git commit -m "feat(task): route validation status to review states"
```

## Task 6: Tool Iteration No-Tools Finalization

**Files:**
- Modify: `app-instance/backend/beaver/engine/loop.py`
- Test: `app-instance/backend/tests/unit/test_phase5_skills_runtime.py`

- [ ] **Step 1: Write failing finalization test**

Update `test_agent_loop_records_max_tool_iterations_as_failed_skill_effect` in `test_phase5_skills_runtime.py` so the stub provider returns a third response for finalization:

```python
LLMResponse(
    content="Based on the available tool result, the container likely failed during startup.",
    finish_reason="stop",
    provider_name="stub",
    model="stub-model",
),
```

Change the assertions:

```python
assert result.finish_reason == "max_tool_iterations_finalized"
assert "Based on the available tool result" in result.output_text
assert "Tool loop stopped" not in result.output_text
```

- [ ] **Step 2: Run test to verify failure**

Run:

```bash
cd app-instance/backend
pytest tests/unit/test_phase5_skills_runtime.py::test_agent_loop_records_max_tool_iterations_as_failed_skill_effect -v
```

Expected: FAIL because the loop returns `max_tool_iterations`.

- [ ] **Step 3: Add finalization helper**

In `AgentLoop`, add:

```python
async def _finalize_after_tool_limit(
    self,
    *,
    provider: Any,
    messages: list[dict[str, Any]],
    model: str,
    max_tokens: int,
    temperature: float,
    thinking_enabled: bool | None,
) -> str:
    """Ask the provider for one final answer with tools disabled."""
    final_messages = [
        *messages,
        {
            "role": "system",
            "content": (
                "The configured tool iteration budget is exhausted. "
                "Do not call tools. Produce the best final answer from the existing conversation "
                "and tool results. State uncertainty explicitly."
            ),
        },
    ]
    kwargs: dict[str, Any] = {
        "messages": final_messages,
        "tools": None,
        "model": model,
        "max_tokens": max_tokens,
        "temperature": temperature,
    }
    if thinking_enabled is not None:
        kwargs["thinking_enabled"] = thinking_enabled
    response = await provider.chat(**kwargs)
    return (response.content or "").strip()
```

- [ ] **Step 4: Use helper at iteration limit**

Replace the `if iterations >= resolved_max_tool_iterations:` block with:

```python
if iterations >= resolved_max_tool_iterations:
    finalized = await self._finalize_after_tool_limit(
        provider=provider,
        messages=messages,
        model=final_model,
        max_tokens=resolved_max_tokens,
        temperature=resolved_temperature,
        thinking_enabled=thinking_enabled,
    )
    # An empty finalization keeps the old finish reason so callers can still detect it.
    final_text = finalized or (
        "Tool loop stopped after reaching the configured iteration limit, "
        "and no final answer was produced."
    )
    final_finish_reason = "max_tool_iterations_finalized" if finalized else "max_tool_iterations"
    session_manager.append_message(
        resolved_session_id,
        run_id=resolved_run_id,
        role="assistant",
        event_type="assistant_message_added",
        event_payload={"task_id": task_id} if task_id else None,
        content=final_text,
        finish_reason=final_finish_reason,
        source=source,
        title=title,
        model=final_model,
        user_id=user_id,
    )
    context_builder.add_assistant_message(messages, content=final_text)
    break
```

- [ ] **Step 5: Run finalization test**

Run:

```bash
cd app-instance/backend
pytest tests/unit/test_phase5_skills_runtime.py::test_agent_loop_records_max_tool_iterations_as_failed_skill_effect -v
```

Expected: PASS.
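The two outcomes of Steps 3 and 4 (a usable final answer versus an empty one) can be sketched without the engine. This is a minimal illustration under stated assumptions: `StubProvider` and `finalize` are hypothetical stand-ins, and only the `tools=None` call, the fallback text, and the two finish reasons are taken from the steps above.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, Optional

FALLBACK_TEXT = (
    "Tool loop stopped after reaching the configured iteration limit, "
    "and no final answer was produced."
)


class StubProvider:
    """Hypothetical provider stand-in that records the kwargs of each chat call."""

    def __init__(self, content: Optional[str]) -> None:
        self.content = content
        self.calls: list[dict[str, Any]] = []

    async def chat(self, **kwargs: Any) -> SimpleNamespace:
        self.calls.append(kwargs)
        return SimpleNamespace(content=self.content)


async def finalize(provider: StubProvider, messages: list[dict[str, Any]]) -> tuple[str, str]:
    """Make one no-tools call and return (final_text, finish_reason)."""
    response = await provider.chat(messages=messages, tools=None)
    finalized = (response.content or "").strip()
    if finalized:
        return finalized, "max_tool_iterations_finalized"
    # Whitespace-only or missing content falls back to the fixed message.
    return FALLBACK_TEXT, "max_tool_iterations"


history = [{"role": "user", "content": "why did the container exit?"}]
answered = StubProvider("The container likely failed during startup.")
silent = StubProvider("   ")
answered_result = asyncio.run(finalize(answered, history))
silent_result = asyncio.run(finalize(silent, history))
```

Keeping `max_tool_iterations` for the empty case lets downstream checks distinguish a real synthesized answer from the fallback message.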
- [ ] **Step 6: Commit**

```bash
git add app-instance/backend/beaver/engine/loop.py app-instance/backend/tests/unit/test_phase5_skills_runtime.py
git commit -m "feat(engine): finalize after tool iteration limit"
```

## Task 7: Bounded Parallel Team Execution

**Files:**
- Modify: `app-instance/backend/beaver/coordinator/local.py`
- Modify: `app-instance/backend/beaver/coordinator/execution/scheduler.py`
- Modify: `app-instance/backend/beaver/services/team_service.py`
- Test: `app-instance/backend/tests/unit/test_agent_team_v1.py`

- [ ] **Step 1: Write failing concurrency test**

Add to `test_agent_team_v1.py`:

```python
class BlockingProvider(RecordingProvider):
    def __init__(self, content: str, started: asyncio.Event, release: asyncio.Event) -> None:
        super().__init__([_response(content)])
        self.started = started
        self.release = release

    async def chat(self, *args, **kwargs) -> LLMResponse:
        self.started.set()
        await self.release.wait()
        return await super().chat(*args, **kwargs)


def test_team_parallel_starts_nodes_concurrently_with_isolated_loops(tmp_path: Path) -> None:
    loop = _loop(tmp_path)
    first_started = asyncio.Event()
    second_started = asyncio.Event()
    release = asyncio.Event()
    providers = {
        "one": BlockingProvider("one", first_started, release),
        "two": BlockingProvider("two", second_started, release),
    }
    graph = ExecutionGraph(
        strategy="parallel",
        nodes=[
            ExecutionNode("one", "task one", AgentDescriptor(name="one")),
            ExecutionNode("two", "task two", AgentDescriptor(name="two")),
        ],
    )

    async def run_case():
        task = asyncio.create_task(
            TeamService(loop).run_team(
                graph,
                parent_task_id=None,
                parent_session_id="session-root",
                parent_run_id="run-root",
                provider_bundle_factory=lambda node: _bundle(providers[node.node_id]),
            )
        )
        # Both providers must be inside chat() before either is released;
        # a serialized run times out on the second wait.
        await asyncio.wait_for(first_started.wait(), timeout=1)
        await asyncio.wait_for(second_started.wait(), timeout=1)
        release.set()
        return await task

    result = asyncio.run(run_case())
    assert result.success is True
    assert [item.node_id for item in result.node_results] == ["one", "two"]
```

- [ ] **Step 2: Run test to verify failure**

Run:

```bash
cd app-instance/backend
pytest tests/unit/test_agent_team_v1.py::test_team_parallel_starts_nodes_concurrently_with_isolated_loops -v
```

Expected: FAIL or timeout because shared-loop execution serializes the providers.

- [ ] **Step 3: Add isolated execution mode to local runner**

In `LocalAgentRunner.run()`, add the parameter:

```python
execution_mode: str = "shared_loop",
```

Select the target loop and runner from it:

```python
target_loop = self.loop
if execution_mode == "isolated_loop":
    target_loop = AgentLoop(profile=self.loop.profile, loader=self.loop.loader)
runner = target_loop.process_direct if execution_mode == "isolated_loop" else (
    self.loop.submit_direct if self.loop.is_running else self.loop.process_direct
)
```

Use `target_loop.boot()` when building evidence after the run.

- [ ] **Step 4: Add scheduler concurrency limit**

In `TeamGraphScheduler.__init__()`:

```python
def __init__(self, runner: LocalAgentRunner, *, max_parallel_team_nodes: int = 3) -> None:
    self.runner = runner
    self.max_parallel_team_nodes = max(1, int(max_parallel_team_nodes))
```

Change `_run_parallel()`:

```python
semaphore = asyncio.Semaphore(self.max_parallel_team_nodes)

async def run_one(node: ExecutionNode) -> NodeRunResult:
    async with semaphore:
        return await self._run_node(
            node,
            dependency_outputs={},
            execution_mode="isolated_loop",
            **kwargs,
        )

return list(await asyncio.gather(*(run_one(node) for node in nodes)))
```

Update the `_run_node()` signature to accept `execution_mode: str = "shared_loop"` and pass it to `self.runner.run(...)`.
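The semaphore-plus-`gather` pattern from Step 4 can be checked on its own. The sketch below is a minimal standalone illustration, not the scheduler code: `run_bounded` is a hypothetical helper, and `asyncio.sleep` stands in for a delegated agent run. It shows the two properties the step relies on: at most `limit` coroutines are active at once, and results come back in input order.

```python
import asyncio


async def run_bounded(names: list[str], limit: int) -> tuple[list[str], int]:
    """Run one coroutine per name, at most `limit` at a time.

    Returns (results in input order, peak number of concurrently active runs).
    """
    semaphore = asyncio.Semaphore(max(1, limit))
    active = 0
    peak = 0

    async def run_one(name: str) -> str:
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stands in for a delegated agent run
            active -= 1
            return f"{name}:done"

    # gather() returns results in argument order, not completion order.
    results = await asyncio.gather(*(run_one(name) for name in names))
    return list(results), peak


results, peak = asyncio.run(run_bounded(["one", "two", "three", "four", "five"], limit=3))
```

Because `gather()` preserves argument order, the scheduler needs no extra sorting to keep `node_results` deterministic.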
- [ ] **Step 5: Wire limit through TeamService**

In `TeamService.__init__()`:

```python
def __init__(self, loop: AgentLoop, *, max_parallel_team_nodes: int = 3) -> None:
    self.loop = loop
    self.runner = LocalAgentRunner(loop)
    self.scheduler = TeamGraphScheduler(self.runner, max_parallel_team_nodes=max_parallel_team_nodes)
```

- [ ] **Step 6: Run parallel tests**

Run:

```bash
cd app-instance/backend
pytest tests/unit/test_agent_team_v1.py::test_team_parallel_starts_nodes_concurrently_with_isolated_loops tests/unit/test_agent_team_v1.py::test_team_parallel_runs_all_nodes -v
```

Expected: PASS.

- [ ] **Step 7: Commit**

```bash
git add app-instance/backend/beaver/coordinator/local.py app-instance/backend/beaver/coordinator/execution/scheduler.py app-instance/backend/beaver/services/team_service.py app-instance/backend/tests/unit/test_agent_team_v1.py
git commit -m "feat(team): run parallel nodes with isolated loops"
```

## Task 8: Slim LLM Request Snapshots

**Files:**
- Modify: `app-instance/backend/beaver/engine/loop.py`
- Test: `app-instance/backend/tests/unit/test_phase5_skills_runtime.py`

- [ ] **Step 1: Write failing snapshot-size test**

Add to `test_phase5_skills_runtime.py`:

```python
def test_llm_request_snapshot_defaults_to_compact_payload(tmp_path: Path) -> None:
    loop = AgentLoop(loader=EngineLoader(workspace=tmp_path, skill_assembler=StubSkillAssembler()))
    bundle = ProviderBundle(
        main_runtime=SimpleNamespace(model="stub-model", provider_name="stub"),
        main_provider=StubProvider(
            [LLMResponse(content="done", finish_reason="stop", provider_name="stub", model="stub-model")]
        ),
    )
    result = asyncio.run(loop.process_direct("hello", provider_bundle=bundle))
    loaded = loop.boot()
    events = loaded.session_manager.get_run_event_records(result.session_id, result.run_id)
    snapshot = next(event for event in events if event.event_type == "llm_request_snapshotted")
    assert "message_count" in snapshot.event_payload
    assert "tool_names" in snapshot.event_payload
    assert "messages" not in snapshot.event_payload
    assert "tools" not in snapshot.event_payload
```

- [ ] **Step 2: Run test to verify failure**

Run:

```bash
cd app-instance/backend
pytest tests/unit/test_phase5_skills_runtime.py::test_llm_request_snapshot_defaults_to_compact_payload -v
```

Expected: FAIL because snapshot payload still includes complete `messages` and `tools`.

- [ ] **Step 3: Add compact snapshot payload**

In `AgentLoop._process_direct_impl()`, before `session_manager.append_message(... event_type="llm_request_snapshotted" ...)`, add:

```python
tool_names = [
    str(tool.get("function", {}).get("name") or tool.get("name") or "tool")
    for tool in (tool_schemas or [])
    if isinstance(tool, dict)
]
snapshot_payload = {
    "iteration": iterations,
    "provider_name": final_provider_name,
    "model": final_model,
    "message_count": len(messages),
    "tool_names": tool_names,
    "message_char_length": len(json.dumps(messages, ensure_ascii=False, default=str)),
    "tool_schema_char_length": len(json.dumps(tool_schemas, ensure_ascii=False, default=str)),
    "max_tokens": resolved_max_tokens,
    "temperature": resolved_temperature,
    "thinking_enabled": thinking_enabled,
}
```

Use `snapshot_payload` as `event_payload`. Use compact `content`:

```python
content=json.dumps(snapshot_payload, ensure_ascii=False, default=str)
```

- [ ] **Step 4: Run compact snapshot test**

Run:

```bash
cd app-instance/backend
pytest tests/unit/test_phase5_skills_runtime.py::test_llm_request_snapshot_defaults_to_compact_payload -v
```

Expected: PASS.

- [ ] **Step 5: Commit**

```bash
git add app-instance/backend/beaver/engine/loop.py app-instance/backend/tests/unit/test_phase5_skills_runtime.py
git commit -m "chore(engine): compact llm request snapshots"
```

## Task 9: Full Regression and Compatibility Sweep

**Files:**
- Modify only files required by failing compatibility tests.
- Test: backend unit suite.
- [ ] **Step 1: Run focused task/team/engine tests**

Run:

```bash
cd app-instance/backend
pytest tests/unit/test_task_evidence.py tests/unit/test_task_mode_feedback.py tests/unit/test_agent_team_v1.py tests/unit/test_phase5_skills_runtime.py -v
```

Expected: PASS.

- [ ] **Step 2: Run full backend unit tests**

Run:

```bash
cd app-instance/backend
pytest tests/unit -v
```

Expected: PASS.

- [ ] **Step 3: Inspect active task API projections**

Run:

```bash
cd app-instance/backend
pytest tests/unit/test_active_task_api.py tests/unit/test_process_projection.py -v
```

Expected: PASS. If these fail because payloads lack `is_execution_active` or `requires_user_action`, update expected payloads to include the new fields and keep existing assertions.

- [ ] **Step 4: Run repository status check**

Run:

```bash
git status --short
```

Expected: only files changed by the implementation tasks are listed. If every earlier task was committed at its commit step, that means at most the uncommitted compatibility fixes from this task.

- [ ] **Step 5: Commit compatibility fixes**

If Step 2 or Step 3 required fixes, commit them:

```bash
git add app-instance/backend
git commit -m "test(task): update validation evidence regressions"
```

If no fixes were needed, skip this commit.

## Self-Review Checklist

- Spec coverage:
  - Complete evidence preservation: Tasks 2, 3, 4.
  - No fixed validation truncation: Task 4.
  - `status` over `passed`: Tasks 1, 5.
  - `needs_review` as user-action state: Tasks 1, 5.
  - No-tools team synthesis: Task 4.
  - Tool-limit finalization: Task 6.
  - Limited parallel team execution: Task 7.
  - Validation debug metadata: Task 5.
  - Compact LLM snapshots: Task 8.
- Type consistency:
  - `TaskEvidencePacket`, `RunEvidence`, and `ToolEvidence` are defined before coordinator and service code references them.
  - New validation statuses are all handled by `ValidationResult`, `TaskService.record_validation()`, and `ValidationService`.
  - `NodeRunResult.evidence` is optional, so blocked or factory-error nodes can still be represented.
- Execution order:
  - Tasks are ordered so each dependency exists before later tasks use it.
  - Every task has a focused test command and a commit point.