beaver_project/docs/superpowers/plans/2026-05-22-task-evidence-validation.md

# Task Evidence and Validation Implementation Plan

> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Replace truncated task validation context with structured run evidence, clearer validation statuses, bounded real team parallelism, no-tools synthesis, and explicit user-review states.

**Architecture:** Keep the existing `AgentService`, `TeamService`, and `AgentLoop` shape. Add focused evidence models/builders under `beaver.tasks`, propagate evidence through coordinator result models, and make validation/status logic consume `status` rather than interpreting `passed=False` as failure. Parallel team batches use isolated `AgentLoop` instances behind a semaphore, while sequence and DAG keep the current shared-loop behavior.

**Tech Stack:** Python dataclasses, pytest, asyncio, existing Beaver session/task/coordinator services, existing provider and tool abstractions.

---
## File Structure

- Create `app-instance/backend/beaver/tasks/evidence.py`
  - Owns `ToolEvidence`, `RunEvidence`, `TaskEvidencePacket`, `EvidenceBuilder`, and text renderers.
  - Reads `SessionManager` event records; does not call providers or tools.
- Modify `app-instance/backend/beaver/tasks/models.py`
  - Add validation `status`, `evidence_gaps`, derived `passed`, derived `accepted`, and task status helper properties.
- Modify `app-instance/backend/beaver/tasks/service.py`
  - Add validation-aware status transitions and API fields for `is_execution_active` and `requires_user_action`.
- Modify `app-instance/backend/beaver/tasks/validation.py`
  - Accept `TaskEvidencePacket`, render full evidence without fixed caps, parse new validator fields, and preserve raw response metadata.
- Modify `app-instance/backend/beaver/tasks/__init__.py`
  - Export evidence models and builder.
- Modify `app-instance/backend/beaver/coordinator/models.py`
  - Add `evidence` to `NodeRunResult` and include it in `to_dict()`.
- Modify `app-instance/backend/beaver/coordinator/local.py`
  - Build evidence after delegated runs and support isolated-loop execution mode.
- Modify `app-instance/backend/beaver/coordinator/execution/scheduler.py`
  - Preserve failed-node evidence, keep result order deterministic, and bound parallel isolated runs.
- Modify `app-instance/backend/beaver/services/team_service.py`
  - Pass parallel concurrency configuration to the scheduler.
- Modify `app-instance/backend/beaver/services/agent_service.py`
  - Build task evidence packets, make team synthesis no-tools by default, pass evidence into validation, record validation debug metadata, and branch on validation `status`.
- Modify `app-instance/backend/beaver/engine/loop.py`
  - Add no-tools finalization at the tool iteration limit and slim `llm_request_snapshotted` by default.
- Add or update tests in:
  - `app-instance/backend/tests/unit/test_task_evidence.py`
  - `app-instance/backend/tests/unit/test_task_mode_feedback.py`
  - `app-instance/backend/tests/unit/test_agent_team_v1.py`
  - `app-instance/backend/tests/unit/test_phase5_skills_runtime.py`
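
The scheduler and team-service entries above call for bounded parallel isolated runs with deterministic result order. A minimal sketch of that pattern, assuming an `asyncio.Semaphore` guards each run and `asyncio.gather` is used to keep results in submission order. The names `run_parallel_batch`, `run_node`, and `max_concurrency` are hypothetical and are not the scheduler's actual API:

```python
import asyncio
from collections.abc import Awaitable, Callable


async def run_parallel_batch(
    node_ids: list[str],
    run_node: Callable[[str], Awaitable[str]],  # hypothetical per-node runner
    max_concurrency: int = 2,  # hypothetical configuration value
) -> list[str]:
    """Run nodes concurrently, at most `max_concurrency` at a time."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def bounded(node_id: str) -> str:
        # Each node waits for a free slot before it starts.
        async with semaphore:
            return await run_node(node_id)

    # gather() returns results in the order the awaitables were passed in,
    # regardless of which node finishes first.
    return await asyncio.gather(*(bounded(node_id) for node_id in node_ids))
```

In the real scheduler each `run_node` call would presumably create its own isolated `AgentLoop`; the sketch only illustrates the bounding and ordering behavior.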
## Task 1: Validation Status Model and Task State Helpers
**Files:**
- Modify: `app-instance/backend/beaver/tasks/models.py`
- Modify: `app-instance/backend/beaver/tasks/service.py`
- Modify: `app-instance/backend/beaver/tasks/__init__.py`
- Test: `app-instance/backend/tests/unit/test_task_mode_feedback.py`
- [ ] **Step 1: Write failing tests for validation status semantics**
Add these tests to `app-instance/backend/tests/unit/test_task_mode_feedback.py` near the existing validation tests:
```python
def test_validation_result_status_drives_accepted_and_passed() -> None:
    accepted = ValidationResult(status="accepted", score=0.9, validator="test")
    insufficient = ValidationResult(status="insufficient_evidence", score=0.9, validator="test")
    rejected = ValidationResult(status="rejected", score=0.9, validator="test")

    assert accepted.passed is True
    assert accepted.accepted is True
    assert insufficient.passed is False
    assert insufficient.accepted is False
    assert rejected.passed is False
    assert rejected.accepted is False


def test_validation_result_from_legacy_payload_maps_to_status() -> None:
    accepted = ValidationResult.from_dict({"passed": True, "score": 0.9, "validator": "legacy"})
    rejected = ValidationResult.from_dict({"passed": False, "score": 0.2, "validator": "legacy"})

    assert accepted is not None
    assert accepted.status == "accepted"
    assert rejected is not None
    assert rejected.status == "rejected"
```
- [ ] **Step 2: Run tests to verify failure**
Run:
```bash
cd app-instance/backend
pytest tests/unit/test_task_mode_feedback.py::test_validation_result_status_drives_accepted_and_passed tests/unit/test_task_mode_feedback.py::test_validation_result_from_legacy_payload_maps_to_status -v
```
Expected: FAIL because `ValidationResult` does not yet accept a `status` argument or expose a `status` attribute.
- [ ] **Step 3: Implement validation status fields**
In `app-instance/backend/beaver/tasks/models.py`, replace the `ValidationResult` dataclass with this shape:
```python
ValidationStatus = Literal["accepted", "rejected", "insufficient_evidence", "validator_error"]


@dataclass(slots=True)
class ValidationResult:
    status: ValidationStatus = "rejected"
    score: float = 0.0
    issues: list[str] = field(default_factory=list)
    missing_requirements: list[str] = field(default_factory=list)
    evidence_gaps: list[str] = field(default_factory=list)
    recommended_revision_prompt: str = ""
    validator: str = "heuristic"

    def __init__(
        self,
        *,
        status: ValidationStatus | None = None,
        passed: bool | None = None,
        score: float = 0.0,
        issues: list[str] | None = None,
        missing_requirements: list[str] | None = None,
        evidence_gaps: list[str] | None = None,
        recommended_revision_prompt: str = "",
        validator: str = "heuristic",
    ) -> None:
        self.status = status or ("accepted" if passed and score >= 0.75 else "rejected")
        self.score = max(0.0, min(1.0, float(score or 0.0)))
        self.issues = list(issues or [])
        self.missing_requirements = list(missing_requirements or [])
        self.evidence_gaps = list(evidence_gaps or [])
        self.recommended_revision_prompt = recommended_revision_prompt
        self.validator = validator

    @property
    def passed(self) -> bool:
        return self.status == "accepted"

    @property
    def accepted(self) -> bool:
        return self.status == "accepted"

    def to_dict(self) -> dict[str, Any]:
        return {
            "status": self.status,
            "passed": self.passed,
            "score": self.score,
            "issues": list(self.issues),
            "missing_requirements": list(self.missing_requirements),
            "evidence_gaps": list(self.evidence_gaps),
            "recommended_revision_prompt": self.recommended_revision_prompt,
            "validator": self.validator,
            "accepted": self.accepted,
        }

    @classmethod
    def from_dict(cls, payload: dict[str, Any] | None) -> "ValidationResult | None":
        if not isinstance(payload, dict):
            return None
        raw_status = payload.get("status")
        status: ValidationStatus | None = (
            raw_status
            if raw_status in {"accepted", "rejected", "insufficient_evidence", "validator_error"}
            else None
        )
        return cls(
            status=status,
            passed=bool(payload.get("passed")) if status is None else None,
            score=float(payload.get("score", 0.0) or 0.0),
            issues=[str(item) for item in payload.get("issues") or []],
            missing_requirements=[str(item) for item in payload.get("missing_requirements") or []],
            evidence_gaps=[str(item) for item in payload.get("evidence_gaps") or []],
            recommended_revision_prompt=str(payload.get("recommended_revision_prompt") or ""),
            validator=str(payload.get("validator") or "unknown"),
        )
```
Also import `Literal` from `typing`.
- [ ] **Step 4: Add task status helper properties**
In `TaskRecord`, add:
```python
    @property
    def is_execution_active(self) -> bool:
        return self.status in {"running", "validating"}

    @property
    def requires_user_action(self) -> bool:
        return self.status in {"awaiting_feedback", "needs_review", "needs_revision"}
```
Add `needs_review` to `TASK_OPEN_STATUSES`; keep `failed` out of the open set:
```python
TASK_OPEN_STATUSES = {"open", "running", "validating", "awaiting_feedback", "needs_review", "needs_revision"}
```
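To make the distinction concrete: the two helpers split the open statuses into disjoint groups, and `failed` belongs to neither. A minimal sketch, using a hypothetical `TaskStub` in place of `TaskRecord` and assuming `is_open` is defined as membership in `TASK_OPEN_STATUSES`:

```python
from dataclasses import dataclass

TASK_OPEN_STATUSES = {"open", "running", "validating", "awaiting_feedback", "needs_review", "needs_revision"}


@dataclass
class TaskStub:  # hypothetical stand-in for TaskRecord
    status: str

    @property
    def is_open(self) -> bool:  # assumed definition, not taken from the codebase
        return self.status in TASK_OPEN_STATUSES

    @property
    def is_execution_active(self) -> bool:
        return self.status in {"running", "validating"}

    @property
    def requires_user_action(self) -> bool:
        return self.status in {"awaiting_feedback", "needs_review", "needs_revision"}
```

Under these definitions a `needs_review` task is open and waiting on the user but not executing, while a `failed` task is neither open nor active.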
- [ ] **Step 5: Expose status helpers in API payloads**
In `TaskService.to_api_dict()`, add:
```python
payload["is_open"] = task.is_open
payload["is_execution_active"] = task.is_execution_active
payload["requires_user_action"] = task.requires_user_action
```
- [ ] **Step 6: Run status model tests**
Run:
```bash
cd app-instance/backend
pytest tests/unit/test_task_mode_feedback.py::test_validation_result_status_drives_accepted_and_passed tests/unit/test_task_mode_feedback.py::test_validation_result_from_legacy_payload_maps_to_status -v
```
Expected: PASS.
- [ ] **Step 7: Commit**
```bash
git add app-instance/backend/beaver/tasks/models.py app-instance/backend/beaver/tasks/service.py app-instance/backend/beaver/tasks/__init__.py app-instance/backend/tests/unit/test_task_mode_feedback.py
git commit -m "feat(task): add validation status semantics"
```
## Task 2: Evidence Models, Builder, and Renderer
**Files:**
- Create: `app-instance/backend/beaver/tasks/evidence.py`
- Modify: `app-instance/backend/beaver/tasks/__init__.py`
- Test: `app-instance/backend/tests/unit/test_task_evidence.py`
- [ ] **Step 1: Write evidence builder tests**
Create `app-instance/backend/tests/unit/test_task_evidence.py`:
```python
from __future__ import annotations

from pathlib import Path

from beaver.engine.session.manager import SessionManager
from beaver.tasks.evidence import EvidenceBuilder, RunEvidence, TaskEvidencePacket, render_task_evidence


def test_evidence_builder_preserves_full_tool_result(tmp_path: Path) -> None:
    session_manager = SessionManager(tmp_path)
    session_id = "session-1"
    run_id = "run-1"
    long_content = "prefix " + ("x" * 700) + " MAN 3 FT 2 NFO"
    session_manager.ensure_session(session_id, source="test")
    session_manager.append_message(session_id, run_id=run_id, role="user", event_type="user_message_added", content="score?")
    session_manager.append_message(
        session_id,
        run_id=run_id,
        role="tool",
        event_type="tool_result_recorded",
        event_payload={"success": True, "url": "https://example.test/match"},
        content=long_content,
        tool_name="web_fetch",
        tool_call_id="call-1",
    )
    session_manager.append_message(
        session_id,
        run_id=run_id,
        role="system",
        event_type="run_completed",
        event_payload={"finish_reason": "stop"},
        content="Manchester United won 3-2.",
        finish_reason="stop",
        context_visible=False,
    )

    evidence = EvidenceBuilder(session_manager).build_run_evidence(
        session_id=session_id,
        run_id=run_id,
        output_text="Manchester United won 3-2.",
        finish_reason="stop",
    )
    rendered = render_task_evidence(
        TaskEvidencePacket(
            task_id="task-1",
            attempt_index=1,
            main_run=evidence,
            team_runs=[],
            team_node_results=[],
            final_output="Manchester United won 3-2.",
        )
    )

    assert evidence.tool_results[0].content == long_content
    assert "MAN 3 FT 2 NFO" in rendered
    assert "https://example.test/match" in rendered


def test_render_task_evidence_includes_failed_team_run_tool_results() -> None:
    run = RunEvidence(
        run_id="run-team",
        session_id="session-team",
        output_text="Tool loop stopped.",
        finish_reason="max_tool_iterations",
        transcript=[],
        tool_results=[],
        warnings=["finish_reason=max_tool_iterations"],
    )
    packet = TaskEvidencePacket(
        task_id="task-1",
        attempt_index=2,
        main_run=None,
        team_runs=[run],
        team_node_results=[],
        final_output="partial answer",
    )

    rendered = render_task_evidence(packet)

    assert "finish_reason=max_tool_iterations" in rendered
    assert "partial answer" in rendered
```
- [ ] **Step 2: Run tests to verify failure**
Run:
```bash
cd app-instance/backend
pytest tests/unit/test_task_evidence.py -v
```
Expected: FAIL because `beaver.tasks.evidence` does not exist.
- [ ] **Step 3: Implement evidence models and builder**
Create `app-instance/backend/beaver/tasks/evidence.py`:
```python
"""Structured evidence for task synthesis and validation."""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass(slots=True)
class ToolEvidence:
    tool_name: str
    tool_call_id: str | None
    content: str
    event_payload: dict[str, Any] = field(default_factory=dict)
    url: str | None = None
    title: str | None = None
    created_at: str | None = None

    def to_dict(self) -> dict[str, Any]:
        return {
            "tool_name": self.tool_name,
            "tool_call_id": self.tool_call_id,
            "content": self.content,
            "event_payload": dict(self.event_payload),
            "url": self.url,
            "title": self.title,
            "created_at": self.created_at,
        }


@dataclass(slots=True)
class RunEvidence:
    run_id: str
    session_id: str
    output_text: str
    finish_reason: str
    transcript: list[dict[str, Any]] = field(default_factory=list)
    tool_results: list[ToolEvidence] = field(default_factory=list)
    warnings: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        return {
            "run_id": self.run_id,
            "session_id": self.session_id,
            "output_text": self.output_text,
            "finish_reason": self.finish_reason,
            "transcript": list(self.transcript),
            "tool_results": [item.to_dict() for item in self.tool_results],
            "warnings": list(self.warnings),
        }


@dataclass(slots=True)
class TaskEvidencePacket:
    task_id: str
    attempt_index: int
    main_run: RunEvidence | None
    team_runs: list[RunEvidence] = field(default_factory=list)
    team_node_results: list[Any] = field(default_factory=list)
    final_output: str = ""

    def to_dict(self) -> dict[str, Any]:
        return {
            "task_id": self.task_id,
            "attempt_index": self.attempt_index,
            "main_run": self.main_run.to_dict() if self.main_run else None,
            "team_runs": [item.to_dict() for item in self.team_runs],
            "team_node_results": [
                item.to_dict() if hasattr(item, "to_dict") else dict(item)
                for item in self.team_node_results
            ],
            "final_output": self.final_output,
        }


class EvidenceBuilder:
    def __init__(self, session_manager: Any) -> None:
        self.session_manager = session_manager

    def build_run_evidence(
        self,
        *,
        session_id: str,
        run_id: str,
        output_text: str,
        finish_reason: str,
    ) -> RunEvidence:
        events = self.session_manager.get_run_event_records(session_id, run_id)
        transcript: list[dict[str, Any]] = []
        tool_results: list[ToolEvidence] = []
        warnings: list[str] = []
        for event in events:
            payload = dict(event.event_payload or {})
            transcript.append(
                {
                    "role": event.role,
                    "event_type": event.event_type,
                    "content": event.content,
                    "tool_name": event.tool_name,
                    "tool_call_id": event.tool_call_id,
                    "finish_reason": event.finish_reason,
                    "event_payload": payload,
                }
            )
            if event.event_type == "tool_result_recorded":
                tool_results.append(
                    ToolEvidence(
                        tool_name=event.tool_name or "tool",
                        tool_call_id=event.tool_call_id,
                        content=event.content or "",
                        event_payload=payload,
                        url=_optional_str(payload.get("url")),
                        title=_optional_str(payload.get("title")),
                        created_at=_optional_str(payload.get("created_at")),
                    )
                )
        if finish_reason and finish_reason != "stop":
            warnings.append(f"finish_reason={finish_reason}")
        return RunEvidence(
            run_id=run_id,
            session_id=session_id,
            output_text=output_text,
            finish_reason=finish_reason,
            transcript=transcript,
            tool_results=tool_results,
            warnings=warnings,
        )


def render_task_evidence(packet: TaskEvidencePacket) -> str:
    sections = [
        f"Task evidence packet: task_id={packet.task_id} attempt={packet.attempt_index}",
        f"Final output:\n{packet.final_output}",
    ]
    if packet.main_run is not None:
        sections.append("Main run evidence:\n" + render_run_evidence(packet.main_run))
    if packet.team_runs:
        sections.append(
            "Team run evidence:\n"
            + "\n\n".join(render_run_evidence(item) for item in packet.team_runs)
        )
    if packet.team_node_results:
        lines = []
        for item in packet.team_node_results:
            lines.append(
                f"- {getattr(item, 'node_id', '')}: success={getattr(item, 'success', False)} "
                f"finish_reason={getattr(item, 'finish_reason', '')} error={getattr(item, 'error', '') or ''}"
            )
        sections.append("Team node results:\n" + "\n".join(lines))
    return "\n\n".join(section for section in sections if section.strip())


def render_run_evidence(evidence: RunEvidence) -> str:
    lines = [
        f"run_id={evidence.run_id}",
        f"session_id={evidence.session_id}",
        f"finish_reason={evidence.finish_reason}",
    ]
    if evidence.output_text:
        lines.append(f"output:\n{evidence.output_text}")
    if evidence.warnings:
        lines.append("warnings:\n" + "\n".join(f"- {item}" for item in evidence.warnings))
    if evidence.tool_results:
        lines.append(
            "tool_results:\n"
            + "\n\n".join(_render_tool_evidence(item) for item in evidence.tool_results)
        )
    return "\n".join(lines)


def _render_tool_evidence(item: ToolEvidence) -> str:
    header = f"- tool={item.tool_name} call_id={item.tool_call_id or ''}"
    metadata = []
    if item.url:
        metadata.append(f"url={item.url}")
    if item.title:
        metadata.append(f"title={item.title}")
    return "\n".join([header, *metadata, item.content])


def _optional_str(value: Any) -> str | None:
    return str(value) if value is not None else None
```
- [ ] **Step 4: Export evidence models**
In `app-instance/backend/beaver/tasks/__init__.py`, add:
```python
from .evidence import EvidenceBuilder, RunEvidence, TaskEvidencePacket, ToolEvidence, render_task_evidence
```
Add those names to `__all__`.
- [ ] **Step 5: Run evidence tests**
Run:
```bash
cd app-instance/backend
pytest tests/unit/test_task_evidence.py -v
```
Expected: PASS.
- [ ] **Step 6: Commit**
```bash
git add app-instance/backend/beaver/tasks/evidence.py app-instance/backend/beaver/tasks/__init__.py app-instance/backend/tests/unit/test_task_evidence.py
git commit -m "feat(task): add structured run evidence"
```
## Task 3: Team Evidence Propagation
**Files:**
- Modify: `app-instance/backend/beaver/coordinator/models.py`
- Modify: `app-instance/backend/beaver/coordinator/local.py`
- Modify: `app-instance/backend/beaver/coordinator/execution/scheduler.py`
- Test: `app-instance/backend/tests/unit/test_agent_team_v1.py`
- [ ] **Step 1: Write failing test for failed-node evidence**
Add to `app-instance/backend/tests/unit/test_agent_team_v1.py`:
```python
def test_team_node_preserves_evidence_when_finish_reason_is_not_stop(tmp_path: Path) -> None:
    loop = _loop(tmp_path)
    provider = RecordingProvider([_response("partial evidence", finish_reason="max_tool_iterations")])
    envelope = DelegationEnvelope(
        parent_task_id="task-parent",
        parent_session_id="session-root",
        parent_run_id="run-root",
        agent=AgentDescriptor(name="researcher", role="research"),
        task="research the requested topic",
        node_id="research",
    )

    result = asyncio.run(LocalAgentRunner(loop).run(envelope, provider_bundle=_bundle(provider)))

    assert result.success is False
    assert result.evidence is not None
    assert result.evidence.output_text == "partial evidence"
    assert result.evidence.finish_reason == "max_tool_iterations"
```
- [ ] **Step 2: Run test to verify failure**
Run:
```bash
cd app-instance/backend
pytest tests/unit/test_agent_team_v1.py::test_team_node_preserves_evidence_when_finish_reason_is_not_stop -v
```
Expected: FAIL because `NodeRunResult` has no `evidence`.
- [ ] **Step 3: Add evidence field to `NodeRunResult`**
In `app-instance/backend/beaver/coordinator/models.py`, under `TYPE_CHECKING`, import `RunEvidence`:
```python
if TYPE_CHECKING:
    from beaver.engine.context import SkillContext
    from beaver.tasks.evidence import RunEvidence
```
Update `NodeRunResult`:
```python
evidence: "RunEvidence | None" = None
```
Update `to_dict()`:
```python
"evidence": self.evidence.to_dict() if self.evidence is not None else None,
```
- [ ] **Step 4: Build evidence in delegated runs**
In `app-instance/backend/beaver/coordinator/local.py`, import `EvidenceBuilder`:
```python
from beaver.tasks.evidence import EvidenceBuilder
```
After `result = await runner(...)`, add:
```python
loaded = self.loop.boot()
evidence = EvidenceBuilder(loaded.session_manager).build_run_evidence(
    session_id=result.session_id,
    run_id=result.run_id,
    output_text=result.output_text,
    finish_reason=result.finish_reason,
)
```
Pass `evidence=evidence` into `NodeRunResult(...)`.
- [ ] **Step 5: Preserve evidence in scheduler summaries**
In `TeamGraphScheduler._summarize()`, keep `summary_parts` as-is for user-facing text, but do not filter or drop failed `node_results`; the `TeamRunResult(node_results=results, ...)` call already keeps them. Update failure lines to mention evidence:
```python
f"- {item.node_id}: {item.error or item.finish_reason} evidence={'yes' if item.evidence else 'no'}"
```
- [ ] **Step 6: Run team evidence tests**
Run:
```bash
cd app-instance/backend
pytest tests/unit/test_agent_team_v1.py::test_team_node_preserves_evidence_when_finish_reason_is_not_stop tests/unit/test_agent_team_v1.py::test_parallel_node_factory_error_is_normalized_and_keeps_completed_runs -v
```
Expected: PASS.
- [ ] **Step 7: Commit**
```bash
git add app-instance/backend/beaver/coordinator/models.py app-instance/backend/beaver/coordinator/local.py app-instance/backend/beaver/coordinator/execution/scheduler.py app-instance/backend/tests/unit/test_agent_team_v1.py
git commit -m "feat(team): preserve node run evidence"
```
## Task 4: Task Evidence Packet, No-Tools Team Synthesis, and Validation Input
**Files:**
- Modify: `app-instance/backend/beaver/services/agent_service.py`
- Modify: `app-instance/backend/beaver/tasks/validation.py`
- Test: `app-instance/backend/tests/unit/test_task_mode_feedback.py`
- [ ] **Step 1: Update stub provider to record tools**
In `StubProvider` inside `test_task_mode_feedback.py`, change `self.calls` and `chat()`:
```python
self.calls: list[dict[str, object]] = []
```
```python
self.calls.append({"messages": messages, "tools": tools, "model": model})
```
Update existing assertions from `main_provider.calls[0][0]["content"]` to:
```python
main_provider.calls[0]["messages"][0]["content"]
```
- [ ] **Step 2: Write failing no-tools synthesis test**
Add:
```python
def test_task_mode_team_synthesis_runs_without_tools_and_receives_evidence(tmp_path: Path) -> None:
    main_provider = StubProvider(
        [
            LLMResponse(content="final synthesized answer", finish_reason="stop", provider_name="stub", model="stub-model")
        ]
    )
    sub_provider = StubProvider(
        [
            LLMResponse(content="sub-agent evidence", finish_reason="stop", provider_name="stub", model="stub-model")
        ]
    )
    validation = StubValidationService([ValidationResult(status="accepted", score=0.9, validator="test")])
    service = AgentService(
        loader=EngineLoader(
            workspace=tmp_path,
            task_execution_planner=StubTaskExecutionPlanner([_team_plan()]),
            validation_service=validation,
        )
    )

    result = asyncio.run(
        service.process_direct(
            "implement team-backed workflow",
            session_id="web:team-no-tools",
            provider_bundle=_provider_bundle(main_provider),
            team_provider_bundle_factory=lambda node: _provider_bundle(sub_provider),
        )
    )

    assert result.output_text == "final synthesized answer"
    assert main_provider.calls[0]["tools"] is None
    assert "sub-agent evidence" in main_provider.calls[0]["messages"][0]["content"]
    assert "Task evidence packet" in validation.calls[0]["evidence_text"]
```
Update `StubValidationService` to record calls:
```python
self.calls: list[dict] = []
```
```python
self.calls.append(kwargs)
```
- [ ] **Step 3: Run no-tools synthesis test to verify failure**
Run:
```bash
cd app-instance/backend
pytest tests/unit/test_task_mode_feedback.py::test_task_mode_team_synthesis_runs_without_tools_and_receives_evidence -v
```
Expected: FAIL because final synthesis still gets tools and validation does not receive `evidence_text`.
- [ ] **Step 4: Build team evidence context in agent service**
In `agent_service.py`, import:
```python
from beaver.tasks.evidence import EvidenceBuilder, RunEvidence, TaskEvidencePacket, render_task_evidence
```
Add helper methods near `_team_execution_context()`:
```python
    def _team_run_evidence(self, result: TeamRunResult | None) -> list[RunEvidence]:
        if result is None:
            return []
        return [node.evidence for node in result.node_results if node.evidence is not None]

    def _build_task_evidence_packet(
        self,
        *,
        session_manager: Any,
        task: TaskRecord,
        attempt_index: int,
        result: AgentRunResult,
        team_result: TeamRunResult | None,
    ) -> TaskEvidencePacket:
        main_run = EvidenceBuilder(session_manager).build_run_evidence(
            session_id=result.session_id,
            run_id=result.run_id,
            output_text=result.output_text,
            finish_reason=result.finish_reason,
        )
        return TaskEvidencePacket(
            task_id=task.task_id,
            attempt_index=attempt_index,
            main_run=main_run,
            team_runs=self._team_run_evidence(team_result),
            team_node_results=list(team_result.node_results) if team_result is not None else [],
            final_output=result.output_text,
        )
```
`_team_run_evidence()` uses `RunEvidence` in its return annotation, so the import must include `RunEvidence` to keep the annotations resolvable.
- [ ] **Step 5: Make team synthesis no-tools by default**
Inside `_run_task_mode`, keep `team_result: TeamRunResult | None = None` before the team block. Before `runner(message, **attempt_kwargs)`, add:
```python
if plan.is_team and team_execution_context:
    attempt_kwargs["include_tools"] = False
    attempt_kwargs["max_tool_iterations"] = 0
```
Replace team context building with evidence rendering:
```python
if team_result is not None:
    team_packet = TaskEvidencePacket(
        task_id=task.task_id,
        attempt_index=attempt_index,
        main_run=None,
        team_runs=self._team_run_evidence(team_result),
        team_node_results=list(team_result.node_results),
        final_output="",
    )
    team_execution_context = self._join_context(
        self._team_execution_context(plan, team_result),
        "Rendered team evidence:\n" + render_task_evidence(team_packet),
    )
```
- [ ] **Step 6: Pass evidence packet to validation**
After the final `result` returns, build the packet and pass it to validation:
```python
evidence_packet = self._build_task_evidence_packet(
    session_manager=session_manager,
    task=task,
    attempt_index=attempt_index,
    result=result,
    team_result=team_result,
)
evidence_text = render_task_evidence(evidence_packet)
validation = await validation_service.validate_task_result(
    task=task,
    user_message=message,
    final_output=result.output_text,
    evidence_packet=evidence_packet,
    evidence_text=evidence_text,
    transcript_excerpt=self._run_excerpt(session_manager, result.session_id, result.run_id),
    tool_summaries=self._tool_summaries(session_manager, result.session_id, result.run_id),
    team_summaries=team_summaries,
    provider_bundle=provider_bundle,
)
```
- [ ] **Step 7: Update validation service signature**
In `validation.py`, add parameters:
```python
evidence_packet: Any | None = None
evidence_text: str = ""
```
Pass `evidence_text` into `_validate_with_provider()`. In the prompt, replace fixed excerpt emphasis with:
```python
f"Evidence packet:\n{evidence_text}\n\n"
```
Keep old `transcript_excerpt`, `tool_summaries`, and `team_summaries` in the prompt only when `evidence_text` is empty:
```python
legacy_context = "" if evidence_text else (
    f"Transcript excerpt:\n{transcript_excerpt}\n\n"
    f"Tool summaries:\n{json.dumps(tool_summaries, ensure_ascii=False)}\n\n"
    f"Team summaries:\n{json.dumps(team_summaries, ensure_ascii=False)}\n\n"
)
```
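The precedence rule in this step (use the rendered evidence packet when it exists, fall back to the legacy excerpts only when it is empty) can be sketched as a single helper. This is an illustrative, simplified version: `build_validation_context` is a hypothetical name, and the real prompt in `_validate_with_provider()` contains additional sections that are not shown here.

```python
from __future__ import annotations

import json


def build_validation_context(
    evidence_text: str,
    transcript_excerpt: str = "",
    tool_summaries: list[dict] | None = None,
    team_summaries: list[dict] | None = None,
) -> str:
    """Hypothetical helper: prefer the evidence packet, else legacy excerpts."""
    if evidence_text:
        # Full evidence is available, so the truncated legacy fields are omitted.
        return f"Evidence packet:\n{evidence_text}\n\n"
    return (
        f"Transcript excerpt:\n{transcript_excerpt}\n\n"
        f"Tool summaries:\n{json.dumps(tool_summaries or [], ensure_ascii=False)}\n\n"
        f"Team summaries:\n{json.dumps(team_summaries or [], ensure_ascii=False)}\n\n"
    )
```

Keeping the fallback means callers that have not been migrated to evidence packets still give the validator some context.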
- [ ] **Step 8: Run team synthesis test**
Run:
```bash
cd app-instance/backend
pytest tests/unit/test_task_mode_feedback.py::test_task_mode_team_synthesis_runs_without_tools_and_receives_evidence -v
```
Expected: PASS.
- [ ] **Step 9: Commit**
```bash
git add app-instance/backend/beaver/services/agent_service.py app-instance/backend/beaver/tasks/validation.py app-instance/backend/tests/unit/test_task_mode_feedback.py
git commit -m "feat(task): synthesize and validate from evidence"
```
## Task 5: Validation Status Transitions and Debug Metadata
**Files:**
- Modify: `app-instance/backend/beaver/tasks/service.py`
- Modify: `app-instance/backend/beaver/tasks/validation.py`
- Modify: `app-instance/backend/beaver/services/agent_service.py`
- Test: `app-instance/backend/tests/unit/test_task_mode_feedback.py`
- [ ] **Step 1: Write failing status transition tests**
Add:
```python
def test_insufficient_evidence_moves_task_to_needs_review(tmp_path: Path) -> None:
    service = AgentService(
        loader=EngineLoader(
            workspace=tmp_path,
            task_execution_planner=_single_planner(),
            validation_service=StubValidationService(
                [ValidationResult(status="insufficient_evidence", score=0.4, evidence_gaps=["source missing"], validator="test")]
            ),
        )
    )

    result = asyncio.run(
        service.process_direct(
            "answer with uncertain evidence",
            session_id="web:needs-review",
            provider_bundle=_bundle("possible answer"),
        )
    )

    loaded = service.create_loop().boot()
    task = loaded.task_service.get_task(result.task_id)
    events = loaded.session_manager.get_run_event_records(result.session_id, result.run_id)
    validation_event = next(event for event in events if event.event_type == "task_validation_snapshotted")

    assert task is not None
    assert task.status == "needs_review"
    assert task.requires_user_action is True
    assert task.is_execution_active is False
    assert validation_event.event_payload["validation_result"]["status"] == "insufficient_evidence"
    assert validation_event.event_payload["validation_debug"]["tool_result_count"] >= 0
```
- [ ] **Step 2: Run transition test to verify failure**
Run:
```bash
cd app-instance/backend
pytest tests/unit/test_task_mode_feedback.py::test_insufficient_evidence_moves_task_to_needs_review -v
```
Expected: FAIL because `record_validation()` still sets `awaiting_feedback`.
- [ ] **Step 3: Implement status transition method**
In `TaskService`, change `record_validation()` signature:
```python
    def record_validation(
        self,
        task_id: str,
        run_id: str,
        validation: ValidationResult,
        *,
        final_attempt: bool = True,
        has_usable_answer: bool = True,
    ) -> TaskRecord:
```
Use this transition:
```python
if validation.status == "accepted":
    task.status = "awaiting_feedback"
elif validation.status in {"insufficient_evidence", "validator_error"}:
    task.status = "needs_review"
elif validation.status == "rejected" and not final_attempt:
    task.status = "needs_revision"
elif validation.status == "rejected" and has_usable_answer:
    task.status = "needs_review"
else:
    task.status = "failed"
    task.closed_at = now
    task.close_reason = "automatic validation rejected the final attempt"
```
Keep `task.validation_result = validation.to_dict()`.
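
The transition above is easier to check as a pure function from validation outcome to task status. A minimal sketch under that framing; `next_task_status` is a hypothetical helper that mirrors the branch order shown in this step and leaves out the `closed_at` and `close_reason` side effects:

```python
def next_task_status(
    status: str,
    *,
    final_attempt: bool = True,
    has_usable_answer: bool = True,
) -> str:
    """Hypothetical helper mirroring the record_validation() branches."""
    if status == "accepted":
        return "awaiting_feedback"
    if status in {"insufficient_evidence", "validator_error"}:
        # The validator could not decide, so a human has to.
        return "needs_review"
    if status == "rejected" and not final_attempt:
        return "needs_revision"
    if status == "rejected" and has_usable_answer:
        return "needs_review"
    # Rejected on the final attempt with nothing usable to show the user.
    return "failed"
```

For example, a rejection on the final attempt maps to `needs_review` when there is a usable answer and to `failed` when there is not, so only the latter case closes the task automatically.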
- [ ] **Step 4: Pass final-attempt metadata from agent service**
In `_run_task_mode`, replace:
```python
task = task_service.record_validation(task.task_id, result.run_id, validation)
```
with:
```python
task = task_service.record_validation(
    task.task_id,
    result.run_id,
    validation,
    final_attempt=(
        attempt_index == 2
        or validation.status in {"accepted", "insufficient_evidence", "validator_error"}
    ),
    has_usable_answer=bool(result.output_text.strip())
    and "Tool loop stopped after reaching the configured iteration limit." not in result.output_text,
)
```
Then retry only on rejected first attempts:
```python
if validation.status == "rejected" and attempt_index == 1:
    session_manager.set_run_context_visible(result.session_id, result.run_id, False)
else:
    break
```
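The retry rule can be sketched in isolation: only a `rejected` first attempt triggers a second attempt, and every other outcome ends the loop. This is a simplified illustration, assuming a two-attempt limit as implied by `attempt_index == 2` above; `run_attempts` and `validate` are hypothetical names, and hiding the first run from context is reduced to a comment.

```python
from collections.abc import Callable


def run_attempts(
    validate: Callable[[int], str],  # hypothetical: returns a validation status
    max_attempts: int = 2,
) -> list[tuple[int, str]]:
    """Return (attempt_index, status) pairs for the attempts that ran."""
    history: list[tuple[int, str]] = []
    for attempt_index in range(1, max_attempts + 1):
        status = validate(attempt_index)
        history.append((attempt_index, status))
        if status == "rejected" and attempt_index == 1:
            # The plan hides the first run from context here before retrying.
            continue
        break
    return history
```

With this shape, `insufficient_evidence` and `validator_error` never trigger an automatic retry; they go straight to user review.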
- [ ] **Step 5: Record validation debug metadata**
Build debug payload in `agent_service.py` before appending `task_validation_snapshotted`:
```python
validation_debug = {
    "evidence_run_ids": [
        item.run_id for item in [evidence_packet.main_run, *evidence_packet.team_runs] if item is not None
    ],
    "evidence_session_ids": [
        item.session_id for item in [evidence_packet.main_run, *evidence_packet.team_runs] if item is not None
    ],
    "tool_result_count": sum(
        len(item.tool_results) for item in [evidence_packet.main_run, *evidence_packet.team_runs] if item is not None
    ),
    "evidence_length": len(evidence_text),
}
```
Add it to the event payload:
```python
"validation_debug": validation_debug,
```
- [ ] **Step 6: Parse new validator status**
In `ValidationService._validate_with_provider()`, create result with:
```python
status = payload.get("status")
if status not in {"accepted", "rejected", "insufficient_evidence", "validator_error"}:
    status = (
        "accepted"
        if payload.get("passed") and float(payload.get("score", 0.0) or 0.0) >= 0.75
        else "rejected"
    )
return ValidationResult(
    status=status,
    score=max(0.0, min(1.0, float(payload.get("score", 0.0) or 0.0))),
    issues=[str(item) for item in payload.get("issues") or []],
    missing_requirements=[str(item) for item in payload.get("missing_requirements") or []],
    evidence_gaps=[str(item) for item in payload.get("evidence_gaps") or []],
    recommended_revision_prompt=str(payload.get("recommended_revision_prompt") or ""),
    validator="llm",
)
```
For validator exceptions, return:
```python
ValidationResult(
    status="validator_error",
    score=0.0,
    issues=[f"Validator failed: {exc}"],
    evidence_gaps=["Automatic validation failed before producing a reliable decision."],
    missing_requirements=["User review is required because automatic validation failed."],
    recommended_revision_prompt="Review the answer and evidence, then decide whether to revise or accept it.",
    validator="llm_error",
)
```
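The status fallback in Step 6 can be exercised on its own. The sketch below is illustrative: `resolve_status` and `clamp_score` are hypothetical helper names, and treating a non-numeric `score` as `0.0` is an assumption added here, not something the plan specifies.

```python
VALID_STATUSES = {"accepted", "rejected", "insufficient_evidence", "validator_error"}
ACCEPT_THRESHOLD = 0.75  # same threshold as the legacy passed/score check above


def clamp_score(raw) -> float:
    """Coerce a validator score into [0.0, 1.0]."""
    try:
        value = float(raw or 0.0)
    except (TypeError, ValueError):
        value = 0.0  # assumption: unparseable scores count as zero
    return max(0.0, min(1.0, value))


def resolve_status(payload: dict) -> str:
    status = payload.get("status")
    if isinstance(status, str) and status in VALID_STATUSES:
        return status
    # Unknown or missing status: fall back to the legacy passed/score pair.
    legacy_pass = bool(payload.get("passed")) and clamp_score(payload.get("score")) >= ACCEPT_THRESHOLD
    return "accepted" if legacy_pass else "rejected"
```

With this shape, a payload such as `{"passed": True, "score": 0.5}` resolves to `"rejected"`, while an explicit `"insufficient_evidence"` is passed through unchanged.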
- [ ] **Step 7: Run transition tests**
Run:
```bash
cd app-instance/backend
pytest tests/unit/test_task_mode_feedback.py::test_insufficient_evidence_moves_task_to_needs_review tests/unit/test_task_mode_feedback.py::test_task_mode_team_retry_hides_first_synthesis_run -v
```
Expected: PASS.
- [ ] **Step 8: Commit**
```bash
git add app-instance/backend/beaver/tasks/service.py app-instance/backend/beaver/tasks/validation.py app-instance/backend/beaver/services/agent_service.py app-instance/backend/tests/unit/test_task_mode_feedback.py
git commit -m "feat(task): route validation status to review states"
```
## Task 6: Tool Iteration No-Tools Finalization
**Files:**
- Modify: `app-instance/backend/beaver/engine/loop.py`
- Test: `app-instance/backend/tests/unit/test_phase5_skills_runtime.py`
- [ ] **Step 1: Write failing finalization test**
Update `test_agent_loop_records_max_tool_iterations_as_failed_skill_effect` in `test_phase5_skills_runtime.py` so the stub provider has a third finalization response:
```python
LLMResponse(
    content="Based on the available tool result, the container likely failed during startup.",
    finish_reason="stop",
    provider_name="stub",
    model="stub-model",
),
```
Change assertions:
```python
assert result.finish_reason == "max_tool_iterations_finalized"
assert "Based on the available tool result" in result.output_text
assert "Tool loop stopped" not in result.output_text
```
- [ ] **Step 2: Run test to verify failure**
Run:
```bash
cd app-instance/backend
pytest tests/unit/test_phase5_skills_runtime.py::test_agent_loop_records_max_tool_iterations_as_failed_skill_effect -v
```
Expected: FAIL because the loop returns `max_tool_iterations`.
- [ ] **Step 3: Add finalization helper**
In `AgentLoop`, add:
```python
async def _finalize_after_tool_limit(
    self,
    *,
    provider: Any,
    messages: list[dict[str, Any]],
    model: str,
    max_tokens: int,
    temperature: float,
    thinking_enabled: bool | None,
) -> str:
    # Append a final instruction so the model answers from what it already has.
    final_messages = [
        *messages,
        {
            "role": "system",
            "content": (
                "The configured tool iteration budget is exhausted. "
                "Do not call tools. Produce the best final answer from the existing conversation "
                "and tool results. State uncertainty explicitly."
            ),
        },
    ]
    kwargs: dict[str, Any] = {
        "messages": final_messages,
        "tools": None,  # no tool schemas, so the provider cannot request another call
        "model": model,
        "max_tokens": max_tokens,
        "temperature": temperature,
    }
    if thinking_enabled is not None:
        kwargs["thinking_enabled"] = thinking_enabled
    response = await provider.chat(**kwargs)
    return (response.content or "").strip()
```
- [ ] **Step 4: Use helper at iteration limit**
Replace the `if iterations >= resolved_max_tool_iterations:` block with:
```python
if iterations >= resolved_max_tool_iterations:
    finalized = await self._finalize_after_tool_limit(
        provider=provider,
        messages=messages,
        model=final_model,
        max_tokens=resolved_max_tokens,
        temperature=resolved_temperature,
        thinking_enabled=thinking_enabled,
    )
    # Fall back to the fixed limit message only when finalization returns no text.
    final_text = finalized or "Tool loop stopped after reaching the configured iteration limit, and no final answer was produced."
    final_finish_reason = "max_tool_iterations_finalized" if finalized else "max_tool_iterations"
    session_manager.append_message(
        resolved_session_id,
        run_id=resolved_run_id,
        role="assistant",
        event_type="assistant_message_added",
        event_payload={"task_id": task_id} if task_id else None,
        content=final_text,
        finish_reason=final_finish_reason,
        source=source,
        title=title,
        model=final_model,
        user_id=user_id,
    )
    context_builder.add_assistant_message(messages, content=final_text)
    break
```
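The finalization path can be sketched outside `AgentLoop` to show the two possible outcomes. This is an illustration under stated assumptions: `StubProvider`, `StubResponse`, and `finalize` are hypothetical names, and the only provider behavior assumed is an async `chat(...)` that returns an object with a `content` attribute, as in the helper above.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional

FALLBACK_TEXT = (
    "Tool loop stopped after reaching the configured iteration limit, "
    "and no final answer was produced."
)


@dataclass
class StubResponse:
    content: Optional[str]


class StubProvider:
    """Hypothetical provider that records the `tools` argument of each call."""

    def __init__(self, content: Optional[str]) -> None:
        self.content = content
        self.seen_tools: list = []

    async def chat(self, *, messages, tools, **_ignored) -> StubResponse:
        self.seen_tools.append(tools)
        return StubResponse(self.content)


async def finalize(provider, messages) -> tuple[str, str]:
    # Ask for a final answer with tools disabled, mirroring the helper above.
    response = await provider.chat(messages=messages, tools=None)
    finalized = (response.content or "").strip()
    if finalized:
        return finalized, "max_tool_iterations_finalized"
    return FALLBACK_TEXT, "max_tool_iterations"
```

Running `asyncio.run(finalize(StubProvider("  done  "), []))` yields the stripped answer with the `max_tool_iterations_finalized` reason; an empty or `None` response yields the fallback text with `max_tool_iterations`.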
- [ ] **Step 5: Run finalization test**
Run:
```bash
cd app-instance/backend
pytest tests/unit/test_phase5_skills_runtime.py::test_agent_loop_records_max_tool_iterations_as_failed_skill_effect -v
```
Expected: PASS.
- [ ] **Step 6: Commit**
```bash
git add app-instance/backend/beaver/engine/loop.py app-instance/backend/tests/unit/test_phase5_skills_runtime.py
git commit -m "feat(engine): finalize after tool iteration limit"
```
## Task 7: Bounded Parallel Team Execution
**Files:**
- Modify: `app-instance/backend/beaver/coordinator/local.py`
- Modify: `app-instance/backend/beaver/coordinator/execution/scheduler.py`
- Modify: `app-instance/backend/beaver/services/team_service.py`
- Test: `app-instance/backend/tests/unit/test_agent_team_v1.py`
- [ ] **Step 1: Write failing concurrency test**
Add to `test_agent_team_v1.py`:
```python
class BlockingProvider(RecordingProvider):
    def __init__(self, content: str, started: asyncio.Event, release: asyncio.Event) -> None:
        super().__init__([_response(content)])
        self.started = started
        self.release = release

    async def chat(self, *args, **kwargs) -> LLMResponse:
        # Signal that this node has started, then block until the test releases it.
        self.started.set()
        await self.release.wait()
        return await super().chat(*args, **kwargs)


def test_team_parallel_starts_nodes_concurrently_with_isolated_loops(tmp_path: Path) -> None:
    loop = _loop(tmp_path)
    first_started = asyncio.Event()
    second_started = asyncio.Event()
    release = asyncio.Event()
    providers = {
        "one": BlockingProvider("one", first_started, release),
        "two": BlockingProvider("two", second_started, release),
    }
    graph = ExecutionGraph(
        strategy="parallel",
        nodes=[
            ExecutionNode("one", "task one", AgentDescriptor(name="one")),
            ExecutionNode("two", "task two", AgentDescriptor(name="two")),
        ],
    )

    async def run_case():
        task = asyncio.create_task(
            TeamService(loop).run_team(
                graph,
                parent_task_id=None,
                parent_session_id="session-root",
                parent_run_id="run-root",
                provider_bundle_factory=lambda node: _bundle(providers[node.node_id]),
            )
        )
        # Both providers must start before either is released; a serialized
        # scheduler would time out on the second wait.
        await asyncio.wait_for(first_started.wait(), timeout=1)
        await asyncio.wait_for(second_started.wait(), timeout=1)
        release.set()
        return await task

    result = asyncio.run(run_case())
    assert result.success is True
    assert [item.node_id for item in result.node_results] == ["one", "two"]
```
- [ ] **Step 2: Run test to verify failure**
Run:
```bash
cd app-instance/backend
pytest tests/unit/test_agent_team_v1.py::test_team_parallel_starts_nodes_concurrently_with_isolated_loops -v
```
Expected: FAIL or timeout because shared-loop execution serializes the providers.
- [ ] **Step 3: Add isolated execution mode to local runner**
In `LocalAgentRunner.run()`, add parameter:
```python
execution_mode: str = "shared_loop",
```
Select the target loop and runner from the execution mode:
```python
target_loop = self.loop
if execution_mode == "isolated_loop":
    # Parallel batches get their own AgentLoop instance.
    target_loop = AgentLoop(profile=self.loop.profile, loader=self.loop.loader)
    runner = target_loop.process_direct
elif self.loop.is_running:
    runner = self.loop.submit_direct
else:
    runner = self.loop.process_direct
```
Use `target_loop.boot()` when building evidence after the run.
- [ ] **Step 4: Add scheduler concurrency limit**
In `TeamGraphScheduler.__init__()`:
```python
def __init__(self, runner: LocalAgentRunner, *, max_parallel_team_nodes: int = 3) -> None:
    self.runner = runner
    # Clamp to at least one so a zero or negative setting cannot stall the batch.
    self.max_parallel_team_nodes = max(1, int(max_parallel_team_nodes))
```
Change `_run_parallel()`:
```python
semaphore = asyncio.Semaphore(self.max_parallel_team_nodes)

async def run_one(node: ExecutionNode) -> NodeRunResult:
    async with semaphore:
        return await self._run_node(
            node,
            dependency_outputs={},
            execution_mode="isolated_loop",
            **kwargs,
        )

# gather() returns results in input order, regardless of completion order.
return list(await asyncio.gather(*(run_one(node) for node in nodes)))
```
Update `_run_node()` signature to accept `execution_mode: str = "shared_loop"` and pass it to `self.runner.run(...)`.
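The semaphore-plus-`gather` pattern used in `_run_parallel()` can be demonstrated with plain coroutines. The sketch below is standalone and illustrative: `run_bounded` is a hypothetical name, and the sleep stands in for a node run; it tracks peak concurrency only to show that the limit holds.

```python
import asyncio


async def run_bounded(names: list[str], limit: int) -> tuple[list[str], int]:
    semaphore = asyncio.Semaphore(max(1, int(limit)))
    active = 0
    peak = 0

    async def run_one(name: str) -> str:
        nonlocal active, peak
        async with semaphore:
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # placeholder for the real node work
            active -= 1
            return name.upper()

    # gather() preserves input order even when tasks finish out of order.
    results = await asyncio.gather(*(run_one(name) for name in names))
    return list(results), peak
```

Calling `asyncio.run(run_bounded(["a", "b", "c", "d", "e"], 2))` returns the results in input order, and the recorded peak never exceeds the limit of 2.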
- [ ] **Step 5: Wire limit through TeamService**
In `TeamService.__init__()`:
```python
def __init__(self, loop: AgentLoop, *, max_parallel_team_nodes: int = 3) -> None:
    self.loop = loop
    self.runner = LocalAgentRunner(loop)
    self.scheduler = TeamGraphScheduler(self.runner, max_parallel_team_nodes=max_parallel_team_nodes)
```
- [ ] **Step 6: Run parallel tests**
Run:
```bash
cd app-instance/backend
pytest tests/unit/test_agent_team_v1.py::test_team_parallel_starts_nodes_concurrently_with_isolated_loops tests/unit/test_agent_team_v1.py::test_team_parallel_runs_all_nodes -v
```
Expected: PASS.
- [ ] **Step 7: Commit**
```bash
git add app-instance/backend/beaver/coordinator/local.py app-instance/backend/beaver/coordinator/execution/scheduler.py app-instance/backend/beaver/services/team_service.py app-instance/backend/tests/unit/test_agent_team_v1.py
git commit -m "feat(team): run parallel nodes with isolated loops"
```
## Task 8: Slim LLM Request Snapshots
**Files:**
- Modify: `app-instance/backend/beaver/engine/loop.py`
- Test: `app-instance/backend/tests/unit/test_phase5_skills_runtime.py`
- [ ] **Step 1: Write failing snapshot-size test**
Add to `test_phase5_skills_runtime.py`:
```python
def test_llm_request_snapshot_defaults_to_compact_payload(tmp_path: Path) -> None:
    loop = AgentLoop(loader=EngineLoader(workspace=tmp_path, skill_assembler=StubSkillAssembler()))
    bundle = ProviderBundle(
        main_runtime=SimpleNamespace(model="stub-model", provider_name="stub"),
        main_provider=StubProvider([LLMResponse(content="done", finish_reason="stop", provider_name="stub", model="stub-model")]),
    )
    result = asyncio.run(loop.process_direct("hello", provider_bundle=bundle))
    loaded = loop.boot()
    events = loaded.session_manager.get_run_event_records(result.session_id, result.run_id)
    snapshot = next(event for event in events if event.event_type == "llm_request_snapshotted")
    # The compact payload keeps counts and names but drops the full bodies.
    assert "message_count" in snapshot.event_payload
    assert "tool_names" in snapshot.event_payload
    assert "messages" not in snapshot.event_payload
    assert "tools" not in snapshot.event_payload
```
- [ ] **Step 2: Run test to verify failure**
Run:
```bash
cd app-instance/backend
pytest tests/unit/test_phase5_skills_runtime.py::test_llm_request_snapshot_defaults_to_compact_payload -v
```
Expected: FAIL because snapshot payload still includes complete `messages` and `tools`.
- [ ] **Step 3: Add compact snapshot payload**
In `AgentLoop._process_direct_impl()`, before `session_manager.append_message(... event_type="llm_request_snapshotted" ...)`, add:
```python
tool_names = [
    # `or {}` also covers schemas where "function" is present but None.
    str((tool.get("function") or {}).get("name") or tool.get("name") or "tool")
    for tool in (tool_schemas or [])
    if isinstance(tool, dict)
]
snapshot_payload = {
    "iteration": iterations,
    "provider_name": final_provider_name,
    "model": final_model,
    "message_count": len(messages),
    "tool_names": tool_names,
    # Record sizes instead of bodies so the snapshot stays small.
    "message_char_length": len(json.dumps(messages, ensure_ascii=False, default=str)),
    "tool_schema_char_length": len(json.dumps(tool_schemas, ensure_ascii=False, default=str)),
    "max_tokens": resolved_max_tokens,
    "temperature": resolved_temperature,
    "thinking_enabled": thinking_enabled,
}
```
Use `snapshot_payload` as `event_payload`. Use compact `content`:
```python
content=json.dumps(snapshot_payload, ensure_ascii=False, default=str)
```
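As a rough standalone illustration of the compact payload, the sketch below summarises messages and tool schemas without storing them. `compact_snapshot` and `tool_name` are hypothetical names, and the schema shapes (an OpenAI-style `{"function": {"name": ...}}` entry or a flat `{"name": ...}` entry) are assumptions inferred from the name lookup in Step 3.

```python
import json


def tool_name(tool: dict) -> str:
    """Return the schema's name, trying the nested then the flat layout."""
    function = tool.get("function")
    if isinstance(function, dict) and function.get("name"):
        return str(function["name"])
    return str(tool.get("name") or "tool")


def compact_snapshot(messages: list, tool_schemas, **settings) -> dict:
    schemas = tool_schemas or []
    return {
        "message_count": len(messages),
        "tool_names": [tool_name(tool) for tool in schemas if isinstance(tool, dict)],
        # Sizes are recorded in characters of the serialized JSON, not in tokens.
        "message_char_length": len(json.dumps(messages, ensure_ascii=False, default=str)),
        "tool_schema_char_length": len(json.dumps(schemas, ensure_ascii=False, default=str)),
        **settings,
    }
```

The resulting dictionary has no `messages` or `tools` keys, which is the property the Step 1 test asserts on.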
- [ ] **Step 4: Run compact snapshot test**
Run:
```bash
cd app-instance/backend
pytest tests/unit/test_phase5_skills_runtime.py::test_llm_request_snapshot_defaults_to_compact_payload -v
```
Expected: PASS.
- [ ] **Step 5: Commit**
```bash
git add app-instance/backend/beaver/engine/loop.py app-instance/backend/tests/unit/test_phase5_skills_runtime.py
git commit -m "chore(engine): compact llm request snapshots"
```
## Task 9: Full Regression and Compatibility Sweep
**Files:**
- Modify only files required by failing compatibility tests.
- Test: backend unit suite.
- [ ] **Step 1: Run focused task/team/engine tests**
Run:
```bash
cd app-instance/backend
pytest tests/unit/test_task_evidence.py tests/unit/test_task_mode_feedback.py tests/unit/test_agent_team_v1.py tests/unit/test_phase5_skills_runtime.py -v
```
Expected: PASS.
- [ ] **Step 2: Run full backend unit tests**
Run:
```bash
cd app-instance/backend
pytest tests/unit -v
```
Expected: PASS.
- [ ] **Step 3: Inspect active task API projections**
Run:
```bash
cd app-instance/backend
pytest tests/unit/test_active_task_api.py tests/unit/test_process_projection.py -v
```
Expected: PASS. If these fail because payloads lack `is_execution_active` or `requires_user_action`, update expected payloads to include the new fields and keep existing assertions.
- [ ] **Step 4: Run repository status check**
Run:
```bash
git status --short
```
Expected: only files changed by the implementation tasks are listed.
- [ ] **Step 5: Commit compatibility fixes**
If Step 1, Step 2, or Step 3 required fixes, commit them:
```bash
git add app-instance/backend
git commit -m "test(task): update validation evidence regressions"
```
If no fixes were needed, skip this commit.
## Self-Review Checklist
- Spec coverage:
  - Complete evidence preservation: Tasks 2, 3, 4.
  - No fixed validation truncation: Task 4.
  - `status` over `passed`: Tasks 1, 5.
  - `needs_review` as user-action state: Tasks 1, 5.
  - No-tools team synthesis: Task 4.
  - Tool-limit finalization: Task 6.
  - Limited parallel team execution: Task 7.
  - Validation debug metadata: Task 5.
  - Compact LLM snapshots: Task 8.
- Type consistency:
  - `TaskEvidencePacket`, `RunEvidence`, and `ToolEvidence` are defined before coordinator and service code references them.
  - New validation statuses are all handled by `ValidationResult`, `TaskService.record_validation()`, and `ValidationService`.
  - `NodeRunResult.evidence` is optional, so blocked or factory-error nodes can still be represented.
- Execution order:
  - Tasks are ordered so each dependency exists before later tasks use it.
  - Every task has a focused test command and a commit point.