"""Structured evidence for task synthesis and validation.""" from __future__ import annotations from dataclasses import dataclass, field from typing import Any @dataclass(slots=True) class ToolEvidence: tool_name: str tool_call_id: str | None content: str event_payload: dict[str, Any] = field(default_factory=dict) url: str | None = None title: str | None = None created_at: str | None = None def to_dict(self) -> dict[str, Any]: return { "tool_name": self.tool_name, "tool_call_id": self.tool_call_id, "content": self.content, "event_payload": dict(self.event_payload), "url": self.url, "title": self.title, "created_at": self.created_at, } @dataclass(slots=True) class RunEvidence: run_id: str session_id: str output_text: str finish_reason: str transcript: list[dict[str, Any]] = field(default_factory=list) tool_results: list[ToolEvidence] = field(default_factory=list) warnings: list[str] = field(default_factory=list) def to_dict(self) -> dict[str, Any]: return { "run_id": self.run_id, "session_id": self.session_id, "output_text": self.output_text, "finish_reason": self.finish_reason, "transcript": list(self.transcript), "tool_results": [item.to_dict() for item in self.tool_results], "warnings": list(self.warnings), } @dataclass(slots=True) class TaskEvidencePacket: task_id: str attempt_index: int main_run: RunEvidence | None team_runs: list[RunEvidence] = field(default_factory=list) team_node_results: list[Any] = field(default_factory=list) final_output: str = "" def to_dict(self) -> dict[str, Any]: return { "task_id": self.task_id, "attempt_index": self.attempt_index, "main_run": self.main_run.to_dict() if self.main_run else None, "team_runs": [item.to_dict() for item in self.team_runs], "team_node_results": [ item.to_dict() if hasattr(item, "to_dict") else dict(item) for item in self.team_node_results ], "final_output": self.final_output, } class EvidenceBuilder: def __init__(self, session_manager: Any) -> None: self.session_manager = session_manager def build_run_evidence( self, session_id: str, run_id: str, output_text: str, finish_reason: str, ) -> RunEvidence: events = self.session_manager.get_run_event_records(session_id, run_id) transcript: list[dict[str, Any]] = [] tool_results: list[ToolEvidence] = [] warnings: list[str] = [] for event in events: payload = dict(event.event_payload or {}) transcript.append( { "role": event.role, "event_type": event.event_type, "content": event.content, "tool_name": event.tool_name, "tool_call_id": event.tool_call_id, "finish_reason": event.finish_reason, "event_payload": payload, } ) if event.event_type == "tool_result_recorded": tool_results.append( ToolEvidence( tool_name=event.tool_name or "tool", tool_call_id=event.tool_call_id, content=event.content or "", event_payload=payload, url=_optional_str(payload.get("url")), title=_optional_str(payload.get("title")), created_at=_optional_str(payload.get("created_at")), ) ) if finish_reason and finish_reason != "stop": warnings.append(f"finish_reason={finish_reason}") return RunEvidence( run_id=run_id, session_id=session_id, output_text=output_text, finish_reason=finish_reason, transcript=transcript, tool_results=tool_results, warnings=warnings, ) def render_task_evidence(packet: TaskEvidencePacket) -> str: sections = [ f"Task evidence packet: task_id={packet.task_id} attempt={packet.attempt_index}", f"Final output:\n{packet.final_output}", ] if packet.main_run is not None: sections.append("Main run evidence:\n" + render_run_evidence(packet.main_run)) if packet.team_runs: sections.append( "Team run evidence:\n" + "\n\n".join(render_run_evidence(item) for item in packet.team_runs) ) if packet.team_node_results: lines = [] for item in packet.team_node_results: lines.append( f"- {getattr(item, 'node_id', '')}: success={getattr(item, 'success', False)} " f"finish_reason={getattr(item, 'finish_reason', '')} error={getattr(item, 'error', '') or ''}" ) sections.append("Team node results:\n" + "\n".join(lines)) return "\n\n".join(section for section in sections if section.strip()) def render_run_evidence(evidence: RunEvidence) -> str: lines = [ f"run_id={evidence.run_id}", f"session_id={evidence.session_id}", f"finish_reason={evidence.finish_reason}", ] if evidence.output_text: lines.append(f"output:\n{evidence.output_text}") if evidence.warnings: lines.append("warnings:\n" + "\n".join(f"- {item}" for item in evidence.warnings)) if evidence.tool_results: lines.append( "tool_results:\n" + "\n\n".join(_render_tool_evidence(item) for item in evidence.tool_results) ) return "\n".join(lines) def _render_tool_evidence(item: ToolEvidence) -> str: header = f"- tool={item.tool_name} call_id={item.tool_call_id or ''}" metadata = [] if item.url: metadata.append(f"url={item.url}") if item.title: metadata.append(f"title={item.title}") if item.created_at: metadata.append(f"created_at={item.created_at}") return "\n".join([header, *metadata, item.content]) def _optional_str(value: Any) -> str | None: return str(value) if value is not None else None