feat(task): add structured run evidence

This commit is contained in:
2026-05-22 11:15:10 +08:00
parent c3c4df306b
commit 0ace09b984
3 changed files with 269 additions and 0 deletions

View File

@ -1,5 +1,6 @@
"""Internal task tracking for automatic Main Agent task mode.""" """Internal task tracking for automatic Main Agent task mode."""
from .evidence import EvidenceBuilder, RunEvidence, TaskEvidencePacket, ToolEvidence, render_task_evidence
from .models import MainAgentDecision, TaskEvent, TaskRecord, ValidationResult, ValidationStatus from .models import MainAgentDecision, TaskEvent, TaskRecord, ValidationResult, ValidationStatus
from .planner import TaskExecutionPlan, TaskExecutionPlanner from .planner import TaskExecutionPlan, TaskExecutionPlanner
from .router import MainAgentRouter from .router import MainAgentRouter
@ -8,16 +9,21 @@ from .skill_resolver import SkillResolutionReport, TaskSkillResolver
from .validation import ValidationService from .validation import ValidationService
__all__ = [ __all__ = [
"EvidenceBuilder",
"MainAgentDecision", "MainAgentDecision",
"MainAgentRouter", "MainAgentRouter",
"RunEvidence",
"TaskEvent", "TaskEvent",
"TaskEvidencePacket",
"TaskExecutionPlan", "TaskExecutionPlan",
"TaskExecutionPlanner", "TaskExecutionPlanner",
"TaskRecord", "TaskRecord",
"TaskService", "TaskService",
"SkillResolutionReport", "SkillResolutionReport",
"TaskSkillResolver", "TaskSkillResolver",
"ToolEvidence",
"ValidationResult", "ValidationResult",
"ValidationStatus", "ValidationStatus",
"ValidationService", "ValidationService",
"render_task_evidence",
] ]

View File

@ -0,0 +1,182 @@
"""Structured evidence for task synthesis and validation."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
@dataclass(slots=True)
class ToolEvidence:
tool_name: str
tool_call_id: str | None
content: str
event_payload: dict[str, Any] = field(default_factory=dict)
url: str | None = None
title: str | None = None
created_at: str | None = None
def to_dict(self) -> dict[str, Any]:
return {
"tool_name": self.tool_name,
"tool_call_id": self.tool_call_id,
"content": self.content,
"event_payload": dict(self.event_payload),
"url": self.url,
"title": self.title,
"created_at": self.created_at,
}
@dataclass(slots=True)
class RunEvidence:
run_id: str
session_id: str
output_text: str
finish_reason: str
transcript: list[dict[str, Any]] = field(default_factory=list)
tool_results: list[ToolEvidence] = field(default_factory=list)
warnings: list[str] = field(default_factory=list)
def to_dict(self) -> dict[str, Any]:
return {
"run_id": self.run_id,
"session_id": self.session_id,
"output_text": self.output_text,
"finish_reason": self.finish_reason,
"transcript": list(self.transcript),
"tool_results": [item.to_dict() for item in self.tool_results],
"warnings": list(self.warnings),
}
@dataclass(slots=True)
class TaskEvidencePacket:
task_id: str
attempt_index: int
main_run: RunEvidence | None
team_runs: list[RunEvidence] = field(default_factory=list)
team_node_results: list[Any] = field(default_factory=list)
final_output: str = ""
def to_dict(self) -> dict[str, Any]:
return {
"task_id": self.task_id,
"attempt_index": self.attempt_index,
"main_run": self.main_run.to_dict() if self.main_run else None,
"team_runs": [item.to_dict() for item in self.team_runs],
"team_node_results": [
item.to_dict() if hasattr(item, "to_dict") else dict(item)
for item in self.team_node_results
],
"final_output": self.final_output,
}
class EvidenceBuilder:
def __init__(self, session_manager: Any) -> None:
self.session_manager = session_manager
def build_run_evidence(
self,
*,
session_id: str,
run_id: str,
output_text: str,
finish_reason: str,
) -> RunEvidence:
events = self.session_manager.get_run_event_records(session_id, run_id)
transcript: list[dict[str, Any]] = []
tool_results: list[ToolEvidence] = []
warnings: list[str] = []
for event in events:
payload = dict(event.event_payload or {})
transcript.append(
{
"role": event.role,
"event_type": event.event_type,
"content": event.content,
"tool_name": event.tool_name,
"tool_call_id": event.tool_call_id,
"finish_reason": event.finish_reason,
"event_payload": payload,
}
)
if event.event_type == "tool_result_recorded":
tool_results.append(
ToolEvidence(
tool_name=event.tool_name or "tool",
tool_call_id=event.tool_call_id,
content=event.content or "",
event_payload=payload,
url=_optional_str(payload.get("url")),
title=_optional_str(payload.get("title")),
created_at=_optional_str(payload.get("created_at")),
)
)
if finish_reason and finish_reason != "stop":
warnings.append(f"finish_reason={finish_reason}")
return RunEvidence(
run_id=run_id,
session_id=session_id,
output_text=output_text,
finish_reason=finish_reason,
transcript=transcript,
tool_results=tool_results,
warnings=warnings,
)
def render_task_evidence(packet: TaskEvidencePacket) -> str:
sections = [
f"Task evidence packet: task_id={packet.task_id} attempt={packet.attempt_index}",
f"Final output:\n{packet.final_output}",
]
if packet.main_run is not None:
sections.append("Main run evidence:\n" + render_run_evidence(packet.main_run))
if packet.team_runs:
sections.append(
"Team run evidence:\n"
+ "\n\n".join(render_run_evidence(item) for item in packet.team_runs)
)
if packet.team_node_results:
lines = []
for item in packet.team_node_results:
lines.append(
f"- {getattr(item, 'node_id', '')}: success={getattr(item, 'success', False)} "
f"finish_reason={getattr(item, 'finish_reason', '')} error={getattr(item, 'error', '') or ''}"
)
sections.append("Team node results:\n" + "\n".join(lines))
return "\n\n".join(section for section in sections if section.strip())
def render_run_evidence(evidence: RunEvidence) -> str:
lines = [
f"run_id={evidence.run_id}",
f"session_id={evidence.session_id}",
f"finish_reason={evidence.finish_reason}",
]
if evidence.output_text:
lines.append(f"output:\n{evidence.output_text}")
if evidence.warnings:
lines.append("warnings:\n" + "\n".join(f"- {item}" for item in evidence.warnings))
if evidence.tool_results:
lines.append(
"tool_results:\n"
+ "\n\n".join(_render_tool_evidence(item) for item in evidence.tool_results)
)
return "\n".join(lines)
def _render_tool_evidence(item: ToolEvidence) -> str:
header = f"- tool={item.tool_name} call_id={item.tool_call_id or ''}"
metadata = []
if item.url:
metadata.append(f"url={item.url}")
if item.title:
metadata.append(f"title={item.title}")
return "\n".join([header, *metadata, item.content])
def _optional_str(value: Any) -> str | None:
return str(value) if value is not None else None

View File

@ -0,0 +1,81 @@
from __future__ import annotations
from pathlib import Path
from beaver.engine.session.manager import SessionManager
from beaver.tasks.evidence import EvidenceBuilder, RunEvidence, TaskEvidencePacket, render_task_evidence
def test_evidence_builder_preserves_full_tool_result(tmp_path: Path) -> None:
session_manager = SessionManager(tmp_path)
session_id = "session-1"
run_id = "run-1"
long_content = "prefix " + ("x" * 700) + " MAN 3 FT 2 NFO"
session_manager.ensure_session(session_id, source="test")
session_manager.append_message(session_id, run_id=run_id, role="user", event_type="user_message_added", content="score?")
session_manager.append_message(
session_id,
run_id=run_id,
role="tool",
event_type="tool_result_recorded",
event_payload={"success": True, "url": "https://example.test/match"},
content=long_content,
tool_name="web_fetch",
tool_call_id="call-1",
)
session_manager.append_message(
session_id,
run_id=run_id,
role="system",
event_type="run_completed",
event_payload={"finish_reason": "stop"},
content="Manchester United won 3-2.",
finish_reason="stop",
context_visible=False,
)
evidence = EvidenceBuilder(session_manager).build_run_evidence(
session_id=session_id,
run_id=run_id,
output_text="Manchester United won 3-2.",
finish_reason="stop",
)
rendered = render_task_evidence(
TaskEvidencePacket(
task_id="task-1",
attempt_index=1,
main_run=evidence,
team_runs=[],
team_node_results=[],
final_output="Manchester United won 3-2.",
)
)
assert evidence.tool_results[0].content == long_content
assert "MAN 3 FT 2 NFO" in rendered
assert "https://example.test/match" in rendered
def test_render_task_evidence_includes_failed_team_run_tool_results() -> None:
run = RunEvidence(
run_id="run-team",
session_id="session-team",
output_text="Tool loop stopped.",
finish_reason="max_tool_iterations",
transcript=[],
tool_results=[],
warnings=["finish_reason=max_tool_iterations"],
)
packet = TaskEvidencePacket(
task_id="task-1",
attempt_index=2,
main_run=None,
team_runs=[run],
team_node_results=[],
final_output="partial answer",
)
rendered = render_task_evidence(packet)
assert "finish_reason=max_tool_iterations" in rendered
assert "partial answer" in rendered