50 KiB
Task Evidence and Validation Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (
- [ ]) syntax for tracking.
Goal: Replace truncated task validation context with structured run evidence, clearer validation statuses, bounded real team parallelism, no-tools synthesis, and explicit user-review states.
Architecture: Keep the existing AgentService, TeamService, and AgentLoop shape. Add focused evidence models/builders under beaver.tasks, propagate evidence through coordinator result models, and make validation/status logic consume status rather than interpreting passed=False as failure. Parallel team batches use isolated AgentLoop instances behind a semaphore, while sequence and DAG keep the current shared-loop behavior.
Tech Stack: Python dataclasses, pytest, asyncio, existing Beaver session/task/coordinator services, existing provider and tool abstractions.
File Structure
- Create
app-instance/backend/beaver/tasks/evidence.py- Owns
ToolEvidence,RunEvidence,TaskEvidencePacket,EvidenceBuilder, and text renderers. - Reads
SessionManagerevent records; does not call providers or tools.
- Owns
- Modify
app-instance/backend/beaver/tasks/models.py- Add validation
status,evidence_gaps, derivedpassed, derivedaccepted, and task status helper properties.
- Add validation
- Modify
app-instance/backend/beaver/tasks/service.py- Add validation-aware status transitions and API fields for
is_execution_activeandrequires_user_action.
- Add validation-aware status transitions and API fields for
- Modify
app-instance/backend/beaver/tasks/validation.py- Accept
TaskEvidencePacket, render full evidence without fixed caps, parse new validator fields, and preserve raw response metadata.
- Accept
- Modify
app-instance/backend/beaver/tasks/__init__.py- Export evidence models and builder.
- Modify
app-instance/backend/beaver/coordinator/models.py- Add
evidencetoNodeRunResultand include it into_dict().
- Add
- Modify
app-instance/backend/beaver/coordinator/local.py- Build evidence after delegated runs and support isolated-loop execution mode.
- Modify
app-instance/backend/beaver/coordinator/execution/scheduler.py- Preserve failed-node evidence, keep result order deterministic, and bound parallel isolated runs.
- Modify
app-instance/backend/beaver/services/team_service.py- Pass parallel concurrency configuration to the scheduler.
- Modify
app-instance/backend/beaver/services/agent_service.py- Build task evidence packets, make team synthesis no-tools by default, pass evidence into validation, record validation debug metadata, and branch on validation
status.
- Build task evidence packets, make team synthesis no-tools by default, pass evidence into validation, record validation debug metadata, and branch on validation
- Modify
app-instance/backend/beaver/engine/loop.py- Add no-tools finalization at the tool iteration limit and slim
llm_request_snapshottedby default.
- Add no-tools finalization at the tool iteration limit and slim
- Add or update tests in:
app-instance/backend/tests/unit/test_task_evidence.pyapp-instance/backend/tests/unit/test_task_mode_feedback.pyapp-instance/backend/tests/unit/test_agent_team_v1.pyapp-instance/backend/tests/unit/test_phase5_skills_runtime.py
Task 1: Validation Status Model and Task State Helpers
Files:
-
Modify:
app-instance/backend/beaver/tasks/models.py -
Modify:
app-instance/backend/beaver/tasks/service.py -
Modify:
app-instance/backend/beaver/tasks/__init__.py -
Test:
app-instance/backend/tests/unit/test_task_mode_feedback.py -
Step 1: Write failing tests for validation status semantics
Add these tests to app-instance/backend/tests/unit/test_task_mode_feedback.py near the existing validation tests:
def test_validation_result_status_drives_accepted_and_passed() -> None:
accepted = ValidationResult(status="accepted", score=0.9, validator="test")
insufficient = ValidationResult(status="insufficient_evidence", score=0.9, validator="test")
rejected = ValidationResult(status="rejected", score=0.9, validator="test")
assert accepted.passed is True
assert accepted.accepted is True
assert insufficient.passed is False
assert insufficient.accepted is False
assert rejected.passed is False
assert rejected.accepted is False
def test_validation_result_from_legacy_payload_maps_to_status() -> None:
accepted = ValidationResult.from_dict({"passed": True, "score": 0.9, "validator": "legacy"})
rejected = ValidationResult.from_dict({"passed": False, "score": 0.2, "validator": "legacy"})
assert accepted is not None
assert accepted.status == "accepted"
assert rejected is not None
assert rejected.status == "rejected"
- Step 2: Run tests to verify failure
Run:
cd app-instance/backend
pytest tests/unit/test_task_mode_feedback.py::test_validation_result_status_drives_accepted_and_passed tests/unit/test_task_mode_feedback.py::test_validation_result_from_legacy_payload_maps_to_status -v
Expected: FAIL because ValidationResult does not accept status or evidence_gaps.
- Step 3: Implement validation status fields
In app-instance/backend/beaver/tasks/models.py, replace the ValidationResult dataclass with this shape:
ValidationStatus = Literal["accepted", "rejected", "insufficient_evidence", "validator_error"]
@dataclass(slots=True)
class ValidationResult:
status: ValidationStatus = "rejected"
score: float = 0.0
issues: list[str] = field(default_factory=list)
missing_requirements: list[str] = field(default_factory=list)
evidence_gaps: list[str] = field(default_factory=list)
recommended_revision_prompt: str = ""
validator: str = "heuristic"
def __init__(
self,
*,
status: ValidationStatus | None = None,
passed: bool | None = None,
score: float = 0.0,
issues: list[str] | None = None,
missing_requirements: list[str] | None = None,
evidence_gaps: list[str] | None = None,
recommended_revision_prompt: str = "",
validator: str = "heuristic",
) -> None:
self.status = status or ("accepted" if passed and score >= 0.75 else "rejected")
self.score = max(0.0, min(1.0, float(score or 0.0)))
self.issues = list(issues or [])
self.missing_requirements = list(missing_requirements or [])
self.evidence_gaps = list(evidence_gaps or [])
self.recommended_revision_prompt = recommended_revision_prompt
self.validator = validator
@property
def passed(self) -> bool:
return self.status == "accepted"
@property
def accepted(self) -> bool:
return self.status == "accepted"
def to_dict(self) -> dict[str, Any]:
return {
"status": self.status,
"passed": self.passed,
"score": self.score,
"issues": list(self.issues),
"missing_requirements": list(self.missing_requirements),
"evidence_gaps": list(self.evidence_gaps),
"recommended_revision_prompt": self.recommended_revision_prompt,
"validator": self.validator,
"accepted": self.accepted,
}
@classmethod
def from_dict(cls, payload: dict[str, Any] | None) -> "ValidationResult | None":
if not isinstance(payload, dict):
return None
raw_status = payload.get("status")
status: ValidationStatus | None = (
raw_status
if raw_status in {"accepted", "rejected", "insufficient_evidence", "validator_error"}
else None
)
return cls(
status=status,
passed=bool(payload.get("passed")) if status is None else None,
score=float(payload.get("score", 0.0) or 0.0),
issues=[str(item) for item in payload.get("issues") or []],
missing_requirements=[str(item) for item in payload.get("missing_requirements") or []],
evidence_gaps=[str(item) for item in payload.get("evidence_gaps") or []],
recommended_revision_prompt=str(payload.get("recommended_revision_prompt") or ""),
validator=str(payload.get("validator") or "unknown"),
)
Also import Literal from typing.
- Step 4: Add task status helper properties
In TaskRecord, add:
@property
def is_execution_active(self) -> bool:
return self.status in {"running", "validating"}
@property
def requires_user_action(self) -> bool:
return self.status in {"awaiting_feedback", "needs_review", "needs_revision"}
Add needs_review to TASK_OPEN_STATUSES; keep failed out of the open set:
TASK_OPEN_STATUSES = {"open", "running", "validating", "awaiting_feedback", "needs_review", "needs_revision"}
- Step 5: Expose status helpers in API payloads
In TaskService.to_api_dict(), add:
payload["is_open"] = task.is_open
payload["is_execution_active"] = task.is_execution_active
payload["requires_user_action"] = task.requires_user_action
- Step 6: Run status model tests
Run:
cd app-instance/backend
pytest tests/unit/test_task_mode_feedback.py::test_validation_result_status_drives_accepted_and_passed tests/unit/test_task_mode_feedback.py::test_validation_result_from_legacy_payload_maps_to_status -v
Expected: PASS.
- Step 7: Commit
git add app-instance/backend/beaver/tasks/models.py app-instance/backend/beaver/tasks/service.py app-instance/backend/beaver/tasks/__init__.py app-instance/backend/tests/unit/test_task_mode_feedback.py
git commit -m "feat(task): add validation status semantics"
Task 2: Evidence Models, Builder, and Renderer
Files:
-
Create:
app-instance/backend/beaver/tasks/evidence.py -
Modify:
app-instance/backend/beaver/tasks/__init__.py -
Test:
app-instance/backend/tests/unit/test_task_evidence.py -
Step 1: Write evidence builder tests
Create app-instance/backend/tests/unit/test_task_evidence.py:
from __future__ import annotations
from pathlib import Path
from beaver.engine.session.manager import SessionManager
from beaver.tasks.evidence import EvidenceBuilder, RunEvidence, TaskEvidencePacket, render_task_evidence
def test_evidence_builder_preserves_full_tool_result(tmp_path: Path) -> None:
session_manager = SessionManager(tmp_path)
session_id = "session-1"
run_id = "run-1"
long_content = "prefix " + ("x" * 700) + " MAN 3 FT 2 NFO"
session_manager.ensure_session(session_id, source="test")
session_manager.append_message(session_id, run_id=run_id, role="user", event_type="user_message_added", content="score?")
session_manager.append_message(
session_id,
run_id=run_id,
role="tool",
event_type="tool_result_recorded",
event_payload={"success": True, "url": "https://example.test/match"},
content=long_content,
tool_name="web_fetch",
tool_call_id="call-1",
)
session_manager.append_message(
session_id,
run_id=run_id,
role="system",
event_type="run_completed",
event_payload={"finish_reason": "stop"},
content="Manchester United won 3-2.",
finish_reason="stop",
context_visible=False,
)
evidence = EvidenceBuilder(session_manager).build_run_evidence(
session_id=session_id,
run_id=run_id,
output_text="Manchester United won 3-2.",
finish_reason="stop",
)
rendered = render_task_evidence(
TaskEvidencePacket(
task_id="task-1",
attempt_index=1,
main_run=evidence,
team_runs=[],
team_node_results=[],
final_output="Manchester United won 3-2.",
)
)
assert evidence.tool_results[0].content == long_content
assert "MAN 3 FT 2 NFO" in rendered
assert "https://example.test/match" in rendered
def test_render_task_evidence_includes_failed_team_run_tool_results() -> None:
run = RunEvidence(
run_id="run-team",
session_id="session-team",
output_text="Tool loop stopped.",
finish_reason="max_tool_iterations",
transcript=[],
tool_results=[],
warnings=["finish_reason=max_tool_iterations"],
)
packet = TaskEvidencePacket(
task_id="task-1",
attempt_index=2,
main_run=None,
team_runs=[run],
team_node_results=[],
final_output="partial answer",
)
rendered = render_task_evidence(packet)
assert "finish_reason=max_tool_iterations" in rendered
assert "partial answer" in rendered
- Step 2: Run tests to verify failure
Run:
cd app-instance/backend
pytest tests/unit/test_task_evidence.py -v
Expected: FAIL because beaver.tasks.evidence does not exist.
- Step 3: Implement evidence models and builder
Create app-instance/backend/beaver/tasks/evidence.py:
"""Structured evidence for task synthesis and validation."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
@dataclass(slots=True)
class ToolEvidence:
tool_name: str
tool_call_id: str | None
content: str
event_payload: dict[str, Any] = field(default_factory=dict)
url: str | None = None
title: str | None = None
created_at: str | None = None
def to_dict(self) -> dict[str, Any]:
return {
"tool_name": self.tool_name,
"tool_call_id": self.tool_call_id,
"content": self.content,
"event_payload": dict(self.event_payload),
"url": self.url,
"title": self.title,
"created_at": self.created_at,
}
@dataclass(slots=True)
class RunEvidence:
run_id: str
session_id: str
output_text: str
finish_reason: str
transcript: list[dict[str, Any]] = field(default_factory=list)
tool_results: list[ToolEvidence] = field(default_factory=list)
warnings: list[str] = field(default_factory=list)
def to_dict(self) -> dict[str, Any]:
return {
"run_id": self.run_id,
"session_id": self.session_id,
"output_text": self.output_text,
"finish_reason": self.finish_reason,
"transcript": list(self.transcript),
"tool_results": [item.to_dict() for item in self.tool_results],
"warnings": list(self.warnings),
}
@dataclass(slots=True)
class TaskEvidencePacket:
task_id: str
attempt_index: int
main_run: RunEvidence | None
team_runs: list[RunEvidence] = field(default_factory=list)
team_node_results: list[Any] = field(default_factory=list)
final_output: str = ""
def to_dict(self) -> dict[str, Any]:
return {
"task_id": self.task_id,
"attempt_index": self.attempt_index,
"main_run": self.main_run.to_dict() if self.main_run else None,
"team_runs": [item.to_dict() for item in self.team_runs],
"team_node_results": [
item.to_dict() if hasattr(item, "to_dict") else dict(item)
for item in self.team_node_results
],
"final_output": self.final_output,
}
class EvidenceBuilder:
def __init__(self, session_manager: Any) -> None:
self.session_manager = session_manager
def build_run_evidence(
self,
*,
session_id: str,
run_id: str,
output_text: str,
finish_reason: str,
) -> RunEvidence:
events = self.session_manager.get_run_event_records(session_id, run_id)
transcript: list[dict[str, Any]] = []
tool_results: list[ToolEvidence] = []
warnings: list[str] = []
for event in events:
payload = dict(event.event_payload or {})
transcript.append(
{
"role": event.role,
"event_type": event.event_type,
"content": event.content,
"tool_name": event.tool_name,
"tool_call_id": event.tool_call_id,
"finish_reason": event.finish_reason,
"event_payload": payload,
}
)
if event.event_type == "tool_result_recorded":
tool_results.append(
ToolEvidence(
tool_name=event.tool_name or "tool",
tool_call_id=event.tool_call_id,
content=event.content or "",
event_payload=payload,
url=_optional_str(payload.get("url")),
title=_optional_str(payload.get("title")),
created_at=_optional_str(payload.get("created_at")),
)
)
if finish_reason and finish_reason != "stop":
warnings.append(f"finish_reason={finish_reason}")
return RunEvidence(
run_id=run_id,
session_id=session_id,
output_text=output_text,
finish_reason=finish_reason,
transcript=transcript,
tool_results=tool_results,
warnings=warnings,
)
def render_task_evidence(packet: TaskEvidencePacket) -> str:
sections = [
f"Task evidence packet: task_id={packet.task_id} attempt={packet.attempt_index}",
f"Final output:\n{packet.final_output}",
]
if packet.main_run is not None:
sections.append("Main run evidence:\n" + render_run_evidence(packet.main_run))
if packet.team_runs:
sections.append(
"Team run evidence:\n"
+ "\n\n".join(render_run_evidence(item) for item in packet.team_runs)
)
if packet.team_node_results:
lines = []
for item in packet.team_node_results:
lines.append(
f"- {getattr(item, 'node_id', '')}: success={getattr(item, 'success', False)} "
f"finish_reason={getattr(item, 'finish_reason', '')} error={getattr(item, 'error', '') or ''}"
)
sections.append("Team node results:\n" + "\n".join(lines))
return "\n\n".join(section for section in sections if section.strip())
def render_run_evidence(evidence: RunEvidence) -> str:
lines = [
f"run_id={evidence.run_id}",
f"session_id={evidence.session_id}",
f"finish_reason={evidence.finish_reason}",
]
if evidence.output_text:
lines.append(f"output:\n{evidence.output_text}")
if evidence.warnings:
lines.append("warnings:\n" + "\n".join(f"- {item}" for item in evidence.warnings))
if evidence.tool_results:
lines.append(
"tool_results:\n"
+ "\n\n".join(_render_tool_evidence(item) for item in evidence.tool_results)
)
return "\n".join(lines)
def _render_tool_evidence(item: ToolEvidence) -> str:
header = f"- tool={item.tool_name} call_id={item.tool_call_id or ''}"
metadata = []
if item.url:
metadata.append(f"url={item.url}")
if item.title:
metadata.append(f"title={item.title}")
return "\n".join([header, *metadata, item.content])
def _optional_str(value: Any) -> str | None:
return str(value) if value is not None else None
- Step 4: Export evidence models
In app-instance/backend/beaver/tasks/__init__.py, add:
from .evidence import EvidenceBuilder, RunEvidence, TaskEvidencePacket, ToolEvidence, render_task_evidence
Add those names to __all__.
- Step 5: Run evidence tests
Run:
cd app-instance/backend
pytest tests/unit/test_task_evidence.py -v
Expected: PASS.
- Step 6: Commit
git add app-instance/backend/beaver/tasks/evidence.py app-instance/backend/beaver/tasks/__init__.py app-instance/backend/tests/unit/test_task_evidence.py
git commit -m "feat(task): add structured run evidence"
Task 3: Team Evidence Propagation
Files:
-
Modify:
app-instance/backend/beaver/coordinator/models.py -
Modify:
app-instance/backend/beaver/coordinator/local.py -
Modify:
app-instance/backend/beaver/coordinator/execution/scheduler.py -
Test:
app-instance/backend/tests/unit/test_agent_team_v1.py -
Step 1: Write failing test for failed-node evidence
Add to app-instance/backend/tests/unit/test_agent_team_v1.py:
def test_team_node_preserves_evidence_when_finish_reason_is_not_stop(tmp_path: Path) -> None:
loop = _loop(tmp_path)
provider = RecordingProvider([_response("partial evidence", finish_reason="max_tool_iterations")])
envelope = DelegationEnvelope(
parent_task_id="task-parent",
parent_session_id="session-root",
parent_run_id="run-root",
agent=AgentDescriptor(name="researcher", role="research"),
task="research the requested topic",
node_id="research",
)
result = asyncio.run(LocalAgentRunner(loop).run(envelope, provider_bundle=_bundle(provider)))
assert result.success is False
assert result.evidence is not None
assert result.evidence.output_text == "partial evidence"
assert result.evidence.finish_reason == "max_tool_iterations"
- Step 2: Run test to verify failure
Run:
cd app-instance/backend
pytest tests/unit/test_agent_team_v1.py::test_team_node_preserves_evidence_when_finish_reason_is_not_stop -v
Expected: FAIL because NodeRunResult has no evidence.
- Step 3: Add evidence field to
NodeRunResult
In app-instance/backend/beaver/coordinator/models.py, under TYPE_CHECKING, import RunEvidence:
if TYPE_CHECKING:
from beaver.engine.context import SkillContext
from beaver.tasks.evidence import RunEvidence
Update NodeRunResult:
evidence: "RunEvidence | None" = None
Update to_dict():
"evidence": self.evidence.to_dict() if self.evidence is not None else None,
- Step 4: Build evidence in delegated runs
In app-instance/backend/beaver/coordinator/local.py, import EvidenceBuilder:
from beaver.tasks.evidence import EvidenceBuilder
After result = await runner(...), add:
loaded = self.loop.boot()
evidence = EvidenceBuilder(loaded.session_manager).build_run_evidence(
session_id=result.session_id,
run_id=result.run_id,
output_text=result.output_text,
finish_reason=result.finish_reason,
)
Pass evidence=evidence into NodeRunResult(...).
- Step 5: Preserve evidence in scheduler summaries
In TeamGraphScheduler._summarize(), keep summary_parts as-is for user-facing text, but do not filter or drop failed node_results; the TeamRunResult(node_results=results, ...) call already keeps them. Update failure lines to mention evidence:
f"- {item.node_id}: {item.error or item.finish_reason} evidence={'yes' if item.evidence else 'no'}"
- Step 6: Run team evidence tests
Run:
cd app-instance/backend
pytest tests/unit/test_agent_team_v1.py::test_team_node_preserves_evidence_when_finish_reason_is_not_stop tests/unit/test_agent_team_v1.py::test_parallel_node_factory_error_is_normalized_and_keeps_completed_runs -v
Expected: PASS.
- Step 7: Commit
git add app-instance/backend/beaver/coordinator/models.py app-instance/backend/beaver/coordinator/local.py app-instance/backend/beaver/coordinator/execution/scheduler.py app-instance/backend/tests/unit/test_agent_team_v1.py
git commit -m "feat(team): preserve node run evidence"
Task 4: Task Evidence Packet, No-Tools Team Synthesis, and Validation Input
Files:
-
Modify:
app-instance/backend/beaver/services/agent_service.py -
Modify:
app-instance/backend/beaver/tasks/validation.py -
Test:
app-instance/backend/tests/unit/test_task_mode_feedback.py -
Step 1: Update stub provider to record tools
In StubProvider inside test_task_mode_feedback.py, change self.calls and chat():
self.calls: list[dict[str, object]] = []
self.calls.append({"messages": messages, "tools": tools, "model": model})
Update existing assertions from main_provider.calls[0][0]["content"] to:
main_provider.calls[0]["messages"][0]["content"]
- Step 2: Write failing no-tools synthesis test
Add:
def test_task_mode_team_synthesis_runs_without_tools_and_receives_evidence(tmp_path: Path) -> None:
main_provider = StubProvider(
[
LLMResponse(content="final synthesized answer", finish_reason="stop", provider_name="stub", model="stub-model")
]
)
sub_provider = StubProvider(
[
LLMResponse(content="sub-agent evidence", finish_reason="stop", provider_name="stub", model="stub-model")
]
)
validation = StubValidationService([ValidationResult(status="accepted", score=0.9, validator="test")])
service = AgentService(
loader=EngineLoader(
workspace=tmp_path,
task_execution_planner=StubTaskExecutionPlanner([_team_plan()]),
validation_service=validation,
)
)
result = asyncio.run(
service.process_direct(
"implement team-backed workflow",
session_id="web:team-no-tools",
provider_bundle=_provider_bundle(main_provider),
team_provider_bundle_factory=lambda node: _provider_bundle(sub_provider),
)
)
assert result.output_text == "final synthesized answer"
assert main_provider.calls[0]["tools"] is None
assert "sub-agent evidence" in main_provider.calls[0]["messages"][0]["content"]
assert "Task evidence packet" in validation.calls[0]["evidence_text"]
Update StubValidationService to record calls:
self.calls: list[dict] = []
self.calls.append(kwargs)
- Step 3: Run no-tools synthesis test to verify failure
Run:
cd app-instance/backend
pytest tests/unit/test_task_mode_feedback.py::test_task_mode_team_synthesis_runs_without_tools_and_receives_evidence -v
Expected: FAIL because final synthesis still gets tools and validation does not receive evidence_text.
- Step 4: Build team evidence context in agent service
In agent_service.py, import:
from beaver.tasks.evidence import EvidenceBuilder, TaskEvidencePacket, render_task_evidence
Add helper methods near _team_execution_context():
def _team_run_evidence(self, result: TeamRunResult | None) -> list[RunEvidence]:
if result is None:
return []
return [node.evidence for node in result.node_results if node.evidence is not None]
def _build_task_evidence_packet(
self,
*,
session_manager: Any,
task: TaskRecord,
attempt_index: int,
result: AgentRunResult,
team_result: TeamRunResult | None,
) -> TaskEvidencePacket:
main_run = EvidenceBuilder(session_manager).build_run_evidence(
session_id=result.session_id,
run_id=result.run_id,
output_text=result.output_text,
finish_reason=result.finish_reason,
)
return TaskEvidencePacket(
task_id=task.task_id,
attempt_index=attempt_index,
main_run=main_run,
team_runs=self._team_run_evidence(team_result),
team_node_results=list(team_result.node_results) if team_result is not None else [],
final_output=result.output_text,
)
Keep imports type-safe by importing RunEvidence if the helper uses it in annotations.
- Step 5: Make team synthesis no-tools by default
Inside _run_task_mode, keep team_result: TeamRunResult | None = None before the team block. Before runner(message, **attempt_kwargs), add:
if plan.is_team and team_execution_context:
attempt_kwargs["include_tools"] = False
attempt_kwargs["max_tool_iterations"] = 0
Replace team context building with evidence rendering:
if team_result is not None:
team_packet = TaskEvidencePacket(
task_id=task.task_id,
attempt_index=attempt_index,
main_run=None,
team_runs=self._team_run_evidence(team_result),
team_node_results=list(team_result.node_results),
final_output="",
)
team_execution_context = self._join_context(
self._team_execution_context(plan, team_result),
"Rendered team evidence:\n" + render_task_evidence(team_packet),
)
- Step 6: Pass evidence packet to validation
After the final result returns, build the packet and pass it to validation:
evidence_packet = self._build_task_evidence_packet(
session_manager=session_manager,
task=task,
attempt_index=attempt_index,
result=result,
team_result=team_result,
)
evidence_text = render_task_evidence(evidence_packet)
validation = await validation_service.validate_task_result(
task=task,
user_message=message,
final_output=result.output_text,
evidence_packet=evidence_packet,
evidence_text=evidence_text,
transcript_excerpt=self._run_excerpt(session_manager, result.session_id, result.run_id),
tool_summaries=self._tool_summaries(session_manager, result.session_id, result.run_id),
team_summaries=team_summaries,
provider_bundle=provider_bundle,
)
- Step 7: Update validation service signature
In validation.py, add parameters:
evidence_packet: Any | None = None
evidence_text: str = ""
Pass evidence_text into _validate_with_provider(). In the prompt, replace fixed excerpt emphasis with:
f"Evidence packet:\n{evidence_text}\n\n"
Keep old transcript_excerpt, tool_summaries, and team_summaries in the prompt only when evidence_text is empty:
legacy_context = "" if evidence_text else (
f"Transcript excerpt:\n{transcript_excerpt}\n\n"
f"Tool summaries:\n{json.dumps(tool_summaries, ensure_ascii=False)}\n\n"
f"Team summaries:\n{json.dumps(team_summaries, ensure_ascii=False)}\n\n"
)
- Step 8: Run team synthesis test
Run:
cd app-instance/backend
pytest tests/unit/test_task_mode_feedback.py::test_task_mode_team_synthesis_runs_without_tools_and_receives_evidence -v
Expected: PASS.
- Step 9: Commit
git add app-instance/backend/beaver/services/agent_service.py app-instance/backend/beaver/tasks/validation.py app-instance/backend/tests/unit/test_task_mode_feedback.py
git commit -m "feat(task): synthesize and validate from evidence"
Task 5: Validation Status Transitions and Debug Metadata
Files:
-
Modify:
app-instance/backend/beaver/tasks/service.py -
Modify:
app-instance/backend/beaver/tasks/validation.py -
Modify:
app-instance/backend/beaver/services/agent_service.py -
Test:
app-instance/backend/tests/unit/test_task_mode_feedback.py -
Step 1: Write failing status transition tests
Add:
def test_insufficient_evidence_moves_task_to_needs_review(tmp_path: Path) -> None:
service = AgentService(
loader=EngineLoader(
workspace=tmp_path,
task_execution_planner=_single_planner(),
validation_service=StubValidationService(
[ValidationResult(status="insufficient_evidence", score=0.4, evidence_gaps=["source missing"], validator="test")]
),
)
)
result = asyncio.run(
service.process_direct(
"answer with uncertain evidence",
session_id="web:needs-review",
provider_bundle=_bundle("possible answer"),
)
)
loaded = service.create_loop().boot()
task = loaded.task_service.get_task(result.task_id)
events = loaded.session_manager.get_run_event_records(result.session_id, result.run_id)
validation_event = next(event for event in events if event.event_type == "task_validation_snapshotted")
assert task is not None
assert task.status == "needs_review"
assert task.requires_user_action is True
assert task.is_execution_active is False
assert validation_event.event_payload["validation_result"]["status"] == "insufficient_evidence"
assert validation_event.event_payload["validation_debug"]["tool_result_count"] >= 0
- Step 2: Run transition test to verify failure
Run:
cd app-instance/backend
pytest tests/unit/test_task_mode_feedback.py::test_insufficient_evidence_moves_task_to_needs_review -v
Expected: FAIL because record_validation() still sets awaiting_feedback.
- Step 3: Implement status transition method
In TaskService, change record_validation() signature:
def record_validation(
self,
task_id: str,
run_id: str,
validation: ValidationResult,
*,
final_attempt: bool = True,
has_usable_answer: bool = True,
) -> TaskRecord:
Use this transition:
if validation.status == "accepted":
task.status = "awaiting_feedback"
elif validation.status in {"insufficient_evidence", "validator_error"}:
task.status = "needs_review"
elif validation.status == "rejected" and not final_attempt:
task.status = "needs_revision"
elif validation.status == "rejected" and has_usable_answer:
task.status = "needs_review"
else:
task.status = "failed"
task.closed_at = now
task.close_reason = "automatic validation rejected the final attempt"
Keep task.validation_result = validation.to_dict().
- Step 4: Pass final-attempt metadata from agent service
In _run_task_mode, replace:
task = task_service.record_validation(task.task_id, result.run_id, validation)
with:
task = task_service.record_validation(
task.task_id,
result.run_id,
validation,
final_attempt=(attempt_index == 2 or validation.status in {"accepted", "insufficient_evidence", "validator_error"}),
has_usable_answer=bool(result.output_text.strip())
and "Tool loop stopped after reaching the configured iteration limit." not in result.output_text,
)
Then retry only on rejected first attempts:
if validation.status == "rejected" and attempt_index == 1:
session_manager.set_run_context_visible(result.session_id, result.run_id, False)
else:
break
- Step 5: Record validation debug metadata
Build debug payload in agent_service.py before appending task_validation_snapshotted:
validation_debug = {
"evidence_run_ids": [
item.run_id for item in [evidence_packet.main_run, *evidence_packet.team_runs] if item is not None
],
"evidence_session_ids": [
item.session_id for item in [evidence_packet.main_run, *evidence_packet.team_runs] if item is not None
],
"tool_result_count": sum(
len(item.tool_results) for item in [evidence_packet.main_run, *evidence_packet.team_runs] if item is not None
),
"evidence_length": len(evidence_text),
}
Add it to the event payload:
"validation_debug": validation_debug,
- Step 6: Parse new validator status
In ValidationService._validate_with_provider(), create result with:
status = payload.get("status")
if status not in {"accepted", "rejected", "insufficient_evidence", "validator_error"}:
status = "accepted" if payload.get("passed") and float(payload.get("score", 0.0) or 0.0) >= 0.75 else "rejected"
return ValidationResult(
status=status,
score=max(0.0, min(1.0, float(payload.get("score", 0.0) or 0.0))),
issues=[str(item) for item in payload.get("issues") or []],
missing_requirements=[str(item) for item in payload.get("missing_requirements") or []],
evidence_gaps=[str(item) for item in payload.get("evidence_gaps") or []],
recommended_revision_prompt=str(payload.get("recommended_revision_prompt") or ""),
validator="llm",
)
For validator exceptions, return:
ValidationResult(
status="validator_error",
score=0.0,
issues=[f"Validator failed: {exc}"],
evidence_gaps=["Automatic validation failed before producing a reliable decision."],
missing_requirements=["User review is required because automatic validation failed."],
recommended_revision_prompt="Review the answer and evidence, then decide whether to revise or accept it.",
validator="llm_error",
)
- Step 7: Run transition tests
Run:
cd app-instance/backend
pytest tests/unit/test_task_mode_feedback.py::test_insufficient_evidence_moves_task_to_needs_review tests/unit/test_task_mode_feedback.py::test_task_mode_team_retry_hides_first_synthesis_run -v
Expected: PASS.
- Step 8: Commit
git add app-instance/backend/beaver/tasks/service.py app-instance/backend/beaver/tasks/validation.py app-instance/backend/beaver/services/agent_service.py app-instance/backend/tests/unit/test_task_mode_feedback.py
git commit -m "feat(task): route validation status to review states"
Task 6: Tool Iteration No-Tools Finalization
Files:
-
Modify:
app-instance/backend/beaver/engine/loop.py -
Test:
app-instance/backend/tests/unit/test_phase5_skills_runtime.py -
Step 1: Write failing finalization test
Update test_agent_loop_records_max_tool_iterations_as_failed_skill_effect in test_phase5_skills_runtime.py so the stub provider has a third finalization response:
LLMResponse(
content="Based on the available tool result, the container likely failed during startup.",
finish_reason="stop",
provider_name="stub",
model="stub-model",
),
Change assertions:
assert result.finish_reason == "max_tool_iterations_finalized"
assert "Based on the available tool result" in result.output_text
assert "Tool loop stopped" not in result.output_text
- Step 2: Run test to verify failure
Run:
cd app-instance/backend
pytest tests/unit/test_phase5_skills_runtime.py::test_agent_loop_records_max_tool_iterations_as_failed_skill_effect -v
Expected: FAIL because the loop returns max_tool_iterations.
- Step 3: Add finalization helper
In AgentLoop, add:
async def _finalize_after_tool_limit(
self,
*,
provider: Any,
messages: list[dict[str, Any]],
model: str,
max_tokens: int,
temperature: float,
thinking_enabled: bool | None,
) -> str:
final_messages = [
*messages,
{
"role": "system",
"content": (
"The configured tool iteration budget is exhausted. "
"Do not call tools. Produce the best final answer from the existing conversation "
"and tool results. State uncertainty explicitly."
),
},
]
kwargs: dict[str, Any] = {
"messages": final_messages,
"tools": None,
"model": model,
"max_tokens": max_tokens,
"temperature": temperature,
}
if thinking_enabled is not None:
kwargs["thinking_enabled"] = thinking_enabled
response = await provider.chat(**kwargs)
return (response.content or "").strip()
- Step 4: Use helper at iteration limit
Replace the if iterations >= resolved_max_tool_iterations: block with:
if iterations >= resolved_max_tool_iterations:
finalized = await self._finalize_after_tool_limit(
provider=provider,
messages=messages,
model=final_model,
max_tokens=resolved_max_tokens,
temperature=resolved_temperature,
thinking_enabled=thinking_enabled,
)
final_text = finalized or "Tool loop stopped after reaching the configured iteration limit, and no final answer was produced."
final_finish_reason = "max_tool_iterations_finalized" if finalized else "max_tool_iterations"
session_manager.append_message(
resolved_session_id,
run_id=resolved_run_id,
role="assistant",
event_type="assistant_message_added",
event_payload={"task_id": task_id} if task_id else None,
content=final_text,
finish_reason=final_finish_reason,
source=source,
title=title,
model=final_model,
user_id=user_id,
)
context_builder.add_assistant_message(messages, content=final_text)
break
- Step 5: Run finalization test
Run:
cd app-instance/backend
pytest tests/unit/test_phase5_skills_runtime.py::test_agent_loop_records_max_tool_iterations_as_failed_skill_effect -v
Expected: PASS.
- Step 6: Commit
git add app-instance/backend/beaver/engine/loop.py app-instance/backend/tests/unit/test_phase5_skills_runtime.py
git commit -m "feat(engine): finalize after tool iteration limit"
Task 7: Bounded Parallel Team Execution
Files:
-
Modify:
app-instance/backend/beaver/coordinator/local.py -
Modify:
app-instance/backend/beaver/coordinator/execution/scheduler.py -
Modify:
app-instance/backend/beaver/services/team_service.py -
Test:
app-instance/backend/tests/unit/test_agent_team_v1.py -
Step 1: Write failing concurrency test
Add to test_agent_team_v1.py:
class BlockingProvider(RecordingProvider):
def __init__(self, content: str, started: asyncio.Event, release: asyncio.Event) -> None:
super().__init__([_response(content)])
self.started = started
self.release = release
async def chat(self, *args, **kwargs) -> LLMResponse:
self.started.set()
await self.release.wait()
return await super().chat(*args, **kwargs)
def test_team_parallel_starts_nodes_concurrently_with_isolated_loops(tmp_path: Path) -> None:
loop = _loop(tmp_path)
first_started = asyncio.Event()
second_started = asyncio.Event()
release = asyncio.Event()
providers = {
"one": BlockingProvider("one", first_started, release),
"two": BlockingProvider("two", second_started, release),
}
graph = ExecutionGraph(
strategy="parallel",
nodes=[
ExecutionNode("one", "task one", AgentDescriptor(name="one")),
ExecutionNode("two", "task two", AgentDescriptor(name="two")),
],
)
async def run_case():
task = asyncio.create_task(
TeamService(loop).run_team(
graph,
parent_task_id=None,
parent_session_id="session-root",
parent_run_id="run-root",
provider_bundle_factory=lambda node: _bundle(providers[node.node_id]),
)
)
await asyncio.wait_for(first_started.wait(), timeout=1)
await asyncio.wait_for(second_started.wait(), timeout=1)
release.set()
return await task
result = asyncio.run(run_case())
assert result.success is True
assert [item.node_id for item in result.node_results] == ["one", "two"]
- Step 2: Run test to verify failure
Run:
cd app-instance/backend
pytest tests/unit/test_agent_team_v1.py::test_team_parallel_starts_nodes_concurrently_with_isolated_loops -v
Expected: FAIL or timeout because shared-loop execution serializes the providers.
- Step 3: Add isolated execution mode to local runner
In LocalAgentRunner.run(), add parameter:
execution_mode: str = "shared_loop",
Use helper:
target_loop = self.loop
if execution_mode == "isolated_loop":
target_loop = AgentLoop(profile=self.loop.profile, loader=self.loop.loader)
runner = target_loop.process_direct if execution_mode == "isolated_loop" else (
self.loop.submit_direct if self.loop.is_running else self.loop.process_direct
)
Use target_loop.boot() when building evidence after the run.
- Step 4: Add scheduler concurrency limit
In TeamGraphScheduler.__init__():
def __init__(self, runner: LocalAgentRunner, *, max_parallel_team_nodes: int = 3) -> None:
self.runner = runner
self.max_parallel_team_nodes = max(1, int(max_parallel_team_nodes))
Change _run_parallel():
semaphore = asyncio.Semaphore(self.max_parallel_team_nodes)
async def run_one(node: ExecutionNode) -> NodeRunResult:
async with semaphore:
return await self._run_node(
node,
dependency_outputs={},
execution_mode="isolated_loop",
**kwargs,
)
return list(await asyncio.gather(*(run_one(node) for node in nodes)))
Update _run_node() signature to accept execution_mode: str = "shared_loop" and pass it to self.runner.run(...).
- Step 5: Wire limit through TeamService
In TeamService.__init__():
def __init__(self, loop: AgentLoop, *, max_parallel_team_nodes: int = 3) -> None:
self.loop = loop
self.runner = LocalAgentRunner(loop)
self.scheduler = TeamGraphScheduler(self.runner, max_parallel_team_nodes=max_parallel_team_nodes)
- Step 6: Run parallel tests
Run:
cd app-instance/backend
pytest tests/unit/test_agent_team_v1.py::test_team_parallel_starts_nodes_concurrently_with_isolated_loops tests/unit/test_agent_team_v1.py::test_team_parallel_runs_all_nodes -v
Expected: PASS.
- Step 7: Commit
git add app-instance/backend/beaver/coordinator/local.py app-instance/backend/beaver/coordinator/execution/scheduler.py app-instance/backend/beaver/services/team_service.py app-instance/backend/tests/unit/test_agent_team_v1.py
git commit -m "feat(team): run parallel nodes with isolated loops"
Task 8: Slim LLM Request Snapshots
Files:
-
Modify:
app-instance/backend/beaver/engine/loop.py -
Test:
app-instance/backend/tests/unit/test_phase5_skills_runtime.py -
Step 1: Write failing snapshot-size test
Add to test_phase5_skills_runtime.py:
def test_llm_request_snapshot_defaults_to_compact_payload(tmp_path: Path) -> None:
loop = AgentLoop(loader=EngineLoader(workspace=tmp_path, skill_assembler=StubSkillAssembler()))
bundle = ProviderBundle(
main_runtime=SimpleNamespace(model="stub-model", provider_name="stub"),
main_provider=StubProvider([LLMResponse(content="done", finish_reason="stop", provider_name="stub", model="stub-model")]),
)
result = asyncio.run(loop.process_direct("hello", provider_bundle=bundle))
loaded = loop.boot()
events = loaded.session_manager.get_run_event_records(result.session_id, result.run_id)
snapshot = next(event for event in events if event.event_type == "llm_request_snapshotted")
assert "message_count" in snapshot.event_payload
assert "tool_names" in snapshot.event_payload
assert "messages" not in snapshot.event_payload
assert "tools" not in snapshot.event_payload
- Step 2: Run test to verify failure
Run:
cd app-instance/backend
pytest tests/unit/test_phase5_skills_runtime.py::test_llm_request_snapshot_defaults_to_compact_payload -v
Expected: FAIL because snapshot payload still includes complete messages and tools.
- Step 3: Add compact snapshot payload
In AgentLoop._process_direct_impl(), before session_manager.append_message(... event_type="llm_request_snapshotted" ...), add:
tool_names = [
str(tool.get("function", {}).get("name") or tool.get("name") or "tool")
for tool in (tool_schemas or [])
if isinstance(tool, dict)
]
snapshot_payload = {
"iteration": iterations,
"provider_name": final_provider_name,
"model": final_model,
"message_count": len(messages),
"tool_names": tool_names,
"message_char_length": len(json.dumps(messages, ensure_ascii=False, default=str)),
"tool_schema_char_length": len(json.dumps(tool_schemas, ensure_ascii=False, default=str)),
"max_tokens": resolved_max_tokens,
"temperature": resolved_temperature,
"thinking_enabled": thinking_enabled,
}
Use snapshot_payload as event_payload. Use compact content:
content=json.dumps(snapshot_payload, ensure_ascii=False, default=str)
- Step 4: Run compact snapshot test
Run:
cd app-instance/backend
pytest tests/unit/test_phase5_skills_runtime.py::test_llm_request_snapshot_defaults_to_compact_payload -v
Expected: PASS.
- Step 5: Commit
git add app-instance/backend/beaver/engine/loop.py app-instance/backend/tests/unit/test_phase5_skills_runtime.py
git commit -m "chore(engine): compact llm request snapshots"
Task 9: Full Regression and Compatibility Sweep
Files:
-
Modify only files required by failing compatibility tests.
-
Test: backend unit suite.
-
Step 1: Run focused task/team/engine tests
Run:
cd app-instance/backend
pytest tests/unit/test_task_evidence.py tests/unit/test_task_mode_feedback.py tests/unit/test_agent_team_v1.py tests/unit/test_phase5_skills_runtime.py -v
Expected: PASS.
- Step 2: Run full backend unit tests
Run:
cd app-instance/backend
pytest tests/unit -v
Expected: PASS.
- Step 3: Inspect active task API projections
Run:
cd app-instance/backend
pytest tests/unit/test_active_task_api.py tests/unit/test_process_projection.py -v
Expected: PASS. If these fail because payloads lack is_execution_active or requires_user_action, update expected payloads to include the new fields and keep existing assertions.
- Step 4: Run repository status check
Run:
git status --short
Expected: only files changed by the implementation tasks are listed.
- Step 5: Commit compatibility fixes
If Step 2 or Step 3 required fixes, commit them:
git add app-instance/backend
git commit -m "test(task): update validation evidence regressions"
If no fixes were needed, skip this commit.
Self-Review Checklist
- Spec coverage:
- Complete evidence preservation: Tasks 2, 3, 4.
- No fixed validation truncation: Task 4.
statusoverpassed: Tasks 1, 5.needs_reviewas user-action state: Tasks 1, 5.- No-tools team synthesis: Task 4.
- Tool-limit finalization: Task 6.
- Limited parallel team execution: Task 7.
- Validation debug metadata: Task 5.
- Compact LLM snapshots: Task 8.
- Type consistency:
TaskEvidencePacket,RunEvidence, andToolEvidenceare defined before coordinator and service code references them.- New validation statuses are all handled by
ValidationResult,TaskService.record_validation(), andValidationService. NodeRunResult.evidenceis optional, so blocked or factory-error nodes can still be represented.
- Execution order:
- Tasks are ordered so each dependency exists before later tasks use it.
- Every task has a focused test command and a commit point.