15 Commits

17 changed files with 2672 additions and 61 deletions

View File

@ -18,8 +18,9 @@ if TYPE_CHECKING:
class TeamGraphScheduler: class TeamGraphScheduler:
"""Execute sequence, parallel, and DAG team graphs.""" """Execute sequence, parallel, and DAG team graphs."""
def __init__(self, runner: LocalAgentRunner) -> None: def __init__(self, runner: LocalAgentRunner, *, max_parallel_team_nodes: int = 3) -> None:
self.runner = runner self.runner = runner
self.max_parallel_team_nodes = max(1, int(max_parallel_team_nodes))
async def run( async def run(
self, self,
@ -96,7 +97,18 @@ class TeamGraphScheduler:
nodes: list[ExecutionNode], nodes: list[ExecutionNode],
**kwargs, **kwargs,
) -> list[NodeRunResult]: ) -> list[NodeRunResult]:
return list(await asyncio.gather(*(self._run_node(node, dependency_outputs={}, **kwargs) for node in nodes))) semaphore = asyncio.Semaphore(self.max_parallel_team_nodes)
async def run_one(node: ExecutionNode) -> NodeRunResult:
async with semaphore:
return await self._run_node(
node,
dependency_outputs={},
execution_mode="isolated_loop",
**kwargs,
)
return list(await asyncio.gather(*(run_one(node) for node in nodes)))
async def _run_dag( async def _run_dag(
self, self,
@ -164,6 +176,7 @@ class TeamGraphScheduler:
inherited_pinned_skill_contexts: list["SkillContext"], inherited_pinned_skill_contexts: list["SkillContext"],
allow_candidate_generation: bool, allow_candidate_generation: bool,
dependency_outputs: dict[str, str], dependency_outputs: dict[str, str],
execution_mode: str = "shared_loop",
) -> NodeRunResult: ) -> NodeRunResult:
try: try:
pinned = self._merge_pinned(inherited_pinned_skills, node.inherited_pinned_skills) pinned = self._merge_pinned(inherited_pinned_skills, node.inherited_pinned_skills)
@ -189,6 +202,7 @@ class TeamGraphScheduler:
envelope, envelope,
provider_bundle=node_provider_bundle, provider_bundle=node_provider_bundle,
allow_candidate_generation=allow_candidate_generation, allow_candidate_generation=allow_candidate_generation,
execution_mode=execution_mode,
) )
except asyncio.CancelledError: except asyncio.CancelledError:
raise raise
@ -241,7 +255,7 @@ class TeamGraphScheduler:
failed = [item for item in results if not item.success] failed = [item for item in results if not item.success]
if failed: if failed:
failure_lines = [ failure_lines = [
f"- {item.node_id}: {item.error or item.finish_reason}" f"- {item.node_id}: {item.error or item.finish_reason} evidence={'yes' if item.evidence else 'no'}"
for item in failed for item in failed
] ]
summary_parts.append("Failed nodes:\n" + "\n".join(failure_lines)) summary_parts.append("Failed nodes:\n" + "\n".join(failure_lines))

View File

@ -6,6 +6,7 @@ from uuid import uuid4
from beaver.engine import AgentLoop from beaver.engine import AgentLoop
from beaver.engine.providers import ProviderBundle from beaver.engine.providers import ProviderBundle
from beaver.tasks.evidence import EvidenceBuilder
from .models import DelegationEnvelope, NodeRunResult from .models import DelegationEnvelope, NodeRunResult
@ -22,6 +23,7 @@ class LocalAgentRunner:
*, *,
provider_bundle: ProviderBundle | None = None, provider_bundle: ProviderBundle | None = None,
allow_candidate_generation: bool = False, allow_candidate_generation: bool = False,
execution_mode: str = "shared_loop",
) -> NodeRunResult: ) -> NodeRunResult:
if provider_bundle is not None and (envelope.agent.model or envelope.agent.provider_name): if provider_bundle is not None and (envelope.agent.model or envelope.agent.provider_name):
raise ValueError( raise ValueError(
@ -29,7 +31,14 @@ class LocalAgentRunner:
"build a node-specific provider bundle instead." "build a node-specific provider bundle instead."
) )
child_session_id = self._child_session_id(envelope) child_session_id = self._child_session_id(envelope)
runner = self.loop.submit_direct if self.loop.is_running else self.loop.process_direct target_loop = self.loop
if execution_mode == "isolated_loop":
target_loop = AgentLoop(profile=self.loop.profile, loader=self.loop.loader)
runner = (
target_loop.process_direct
if execution_mode == "isolated_loop"
else (self.loop.submit_direct if self.loop.is_running else self.loop.process_direct)
)
result = await runner( result = await runner(
envelope.task, envelope.task,
session_id=child_session_id, session_id=child_session_id,
@ -47,6 +56,13 @@ class LocalAgentRunner:
pinned_skill_contexts=envelope.inherited_pinned_skill_contexts, pinned_skill_contexts=envelope.inherited_pinned_skill_contexts,
allow_candidate_generation=allow_candidate_generation, allow_candidate_generation=allow_candidate_generation,
) )
loaded = target_loop.boot()
evidence = EvidenceBuilder(loaded.session_manager).build_run_evidence(
result.session_id,
result.run_id,
result.output_text,
result.finish_reason,
)
success = result.finish_reason == "stop" success = result.finish_reason == "stop"
return NodeRunResult( return NodeRunResult(
node_id=envelope.node_id or envelope.agent.name, node_id=envelope.node_id or envelope.agent.name,
@ -56,6 +72,7 @@ class LocalAgentRunner:
session_id=result.session_id, session_id=result.session_id,
finish_reason=result.finish_reason, finish_reason=result.finish_reason,
error=None if success else (result.output_text or result.finish_reason), error=None if success else (result.output_text or result.finish_reason),
evidence=evidence,
) )
@staticmethod @staticmethod

View File

@ -7,6 +7,7 @@ from typing import TYPE_CHECKING, Any, Literal
if TYPE_CHECKING: if TYPE_CHECKING:
from beaver.engine.context import SkillContext from beaver.engine.context import SkillContext
from beaver.tasks.evidence import RunEvidence
TeamStrategy = Literal[ TeamStrategy = Literal[
@ -116,6 +117,7 @@ class NodeRunResult:
session_id: str | None = None session_id: str | None = None
finish_reason: str = "stop" finish_reason: str = "stop"
error: str | None = None error: str | None = None
evidence: "RunEvidence | None" = None
def to_dict(self) -> dict[str, Any]: def to_dict(self) -> dict[str, Any]:
return { return {
@ -126,6 +128,7 @@ class NodeRunResult:
"session_id": self.session_id, "session_id": self.session_id,
"finish_reason": self.finish_reason, "finish_reason": self.finish_reason,
"error": self.error, "error": self.error,
"evidence": self.evidence.to_dict() if self.evidence is not None else None,
} }

View File

@ -637,36 +637,39 @@ class AgentLoop:
while True: while True:
chat_kwargs: dict[str, Any] = { chat_kwargs: dict[str, Any] = {
"messages": messages, "messages": messages,
"tools": tool_schemas, "tools": tool_schemas if include_tools else None,
"model": final_model, "model": final_model,
"max_tokens": resolved_max_tokens, "max_tokens": resolved_max_tokens,
"temperature": resolved_temperature, "temperature": resolved_temperature,
} }
if thinking_enabled is not None: if thinking_enabled is not None:
chat_kwargs["thinking_enabled"] = thinking_enabled chat_kwargs["thinking_enabled"] = thinking_enabled
message_char_length = len(json.dumps(messages, ensure_ascii=False, default=str))
tool_schema_char_length = len(json.dumps(tool_schemas, ensure_ascii=False, default=str))
tool_names = [
str(tool.get("function", {}).get("name") or tool.get("name") or "tool")
for tool in (tool_schemas or [])
if isinstance(tool, dict)
]
snapshot_payload = {
"iteration": iterations,
"provider_name": final_provider_name,
"model": final_model,
"message_count": len(messages),
"tool_names": tool_names,
"message_char_length": message_char_length,
"tool_schema_char_length": tool_schema_char_length,
"max_tokens": resolved_max_tokens,
"temperature": resolved_temperature,
"thinking_enabled": thinking_enabled,
}
session_manager.append_message( session_manager.append_message(
resolved_session_id, resolved_session_id,
run_id=resolved_run_id, run_id=resolved_run_id,
role="system", role="system",
event_type="llm_request_snapshotted", event_type="llm_request_snapshotted",
event_payload={ event_payload=snapshot_payload,
"iteration": iterations, content=json.dumps(snapshot_payload, ensure_ascii=False, default=str),
"provider_name": final_provider_name,
"model": final_model,
"messages": messages,
"tools": tool_schemas,
"max_tokens": resolved_max_tokens,
"temperature": resolved_temperature,
"thinking_enabled": thinking_enabled,
},
content=json.dumps(
{
"messages": messages,
"tools": tool_schemas,
},
ensure_ascii=False,
default=str,
),
context_visible=False, context_visible=False,
source=source, source=source,
title=title, title=title,
@ -708,8 +711,19 @@ class AgentLoop:
break break
if iterations >= resolved_max_tool_iterations: if iterations >= resolved_max_tool_iterations:
final_text = response.content or "Tool loop stopped after reaching the configured iteration limit." finalized = await self._finalize_after_tool_limit(
final_finish_reason = "max_tool_iterations" provider=provider,
messages=messages,
model=final_model,
max_tokens=resolved_max_tokens,
temperature=resolved_temperature,
thinking_enabled=thinking_enabled,
)
final_text = finalized or (
"Tool loop stopped after reaching the configured iteration limit, "
"and no final answer was produced."
)
final_finish_reason = "max_tool_iterations_finalized" if finalized else "max_tool_iterations"
session_manager.append_message( session_manager.append_message(
resolved_session_id, resolved_session_id,
run_id=resolved_run_id, run_id=resolved_run_id,
@ -853,6 +867,39 @@ class AgentLoop:
raise RuntimeError(f"Engine loader did not provide required dependency {field_name!r}") raise RuntimeError(f"Engine loader did not provide required dependency {field_name!r}")
return value return value
@staticmethod
async def _finalize_after_tool_limit(
*,
provider: Any,
messages: list[dict[str, Any]],
model: str,
max_tokens: int,
temperature: float,
thinking_enabled: bool | None,
) -> str:
final_messages = [
*messages,
{
"role": "system",
"content": (
"The configured tool iteration budget is exhausted. Do not call tools. "
"Produce the best final answer from the existing conversation and tool results. "
"State uncertainty explicitly."
),
},
]
kwargs: dict[str, Any] = {
"messages": final_messages,
"tools": None,
"model": model,
"max_tokens": max_tokens,
"temperature": temperature,
}
if thinking_enabled is not None:
kwargs["thinking_enabled"] = thinking_enabled
response = await provider.chat(**kwargs)
return (response.content or "").strip()
@staticmethod @staticmethod
def _load_pinned_skill_contexts(skills_loader: Any, skill_names: list[str]) -> list[SkillContext]: def _load_pinned_skill_contexts(skills_loader: Any, skill_names: list[str]) -> list[SkillContext]:
contexts: list[SkillContext] = [] contexts: list[SkillContext] = []

View File

@ -22,7 +22,16 @@ from beaver.engine import AgentLoop, AgentProfile, AgentRunResult, EngineLoader
from beaver.engine.providers import make_provider_bundle from beaver.engine.providers import make_provider_bundle
from beaver.foundation.events import InboundMessage, OutboundMessage from beaver.foundation.events import InboundMessage, OutboundMessage
from beaver.foundation.models import CronJob, CronRunRecord from beaver.foundation.models import CronJob, CronRunRecord
from beaver.tasks import MainAgentRouter, TaskExecutionPlan, TaskRecord, ValidationResult from beaver.tasks import (
EvidenceBuilder,
MainAgentRouter,
RunEvidence,
TaskEvidencePacket,
TaskExecutionPlan,
TaskRecord,
ValidationResult,
render_task_evidence,
)
NOTIFICATION_SESSION_ID = "notify:default:scheduled" NOTIFICATION_SESSION_ID = "notify:default:scheduled"
@ -715,6 +724,7 @@ class AgentService:
) )
team_summaries: list[str] = [] team_summaries: list[str] = []
team_execution_context = "" team_execution_context = ""
team_result: TeamRunResult | None = None
if plan.is_team: if plan.is_team:
team_result, team_error = await self._run_team_for_task( team_result, team_error = await self._run_team_for_task(
plan, plan,
@ -725,7 +735,18 @@ class AgentService:
) )
if team_result is not None: if team_result is not None:
team_summaries = [self._team_summary_for_validation(team_result)] team_summaries = [self._team_summary_for_validation(team_result)]
team_execution_context = self._team_execution_context(plan, team_result) team_packet = TaskEvidencePacket(
task_id=task.task_id,
attempt_index=attempt_index,
main_run=None,
team_runs=self._team_run_evidence(team_result),
team_node_results=list(team_result.node_results),
final_output="",
)
team_execution_context = self._join_context(
self._team_execution_context(plan, team_result),
"Rendered team evidence:\n" + render_task_evidence(team_packet),
)
self._append_task_observation( self._append_task_observation(
session_manager, session_manager,
task.session_id, task.session_id,
@ -782,6 +803,9 @@ class AgentService:
) )
elif team_execution_context: elif team_execution_context:
attempt_kwargs["execution_context"] = self._join_context(base_execution_context, team_execution_context) attempt_kwargs["execution_context"] = self._join_context(base_execution_context, team_execution_context)
if plan.is_team and team_execution_context:
attempt_kwargs["include_tools"] = False
attempt_kwargs["max_tool_iterations"] = 0
attempt_kwargs["skill_selection_context"] = self._build_skill_selection_context( attempt_kwargs["skill_selection_context"] = self._build_skill_selection_context(
task=task, task=task,
user_message=message, user_message=message,
@ -810,17 +834,39 @@ class AgentService:
result.run_id, result.run_id,
skill_names=self._skill_names_for_run(loaded, result.run_id), skill_names=self._skill_names_for_run(loaded, result.run_id),
) )
evidence_packet = self._build_task_evidence_packet(
session_manager=session_manager,
task=task,
attempt_index=attempt_index,
result=result,
team_result=team_result,
)
evidence_text = render_task_evidence(evidence_packet)
validation = await validation_service.validate_task_result( validation = await validation_service.validate_task_result(
task=task, task=task,
user_message=message, user_message=message,
final_output=result.output_text, final_output=result.output_text,
evidence_packet=evidence_packet,
evidence_text=evidence_text,
transcript_excerpt=self._run_excerpt(session_manager, result.session_id, result.run_id), transcript_excerpt=self._run_excerpt(session_manager, result.session_id, result.run_id),
tool_summaries=self._tool_summaries(session_manager, result.session_id, result.run_id), tool_summaries=self._tool_summaries(session_manager, result.session_id, result.run_id),
team_summaries=team_summaries, team_summaries=team_summaries,
provider_bundle=provider_bundle, provider_bundle=provider_bundle,
) )
latest_validation = validation latest_validation = validation
task = task_service.record_validation(task.task_id, result.run_id, validation) has_usable_answer = bool(result.output_text.strip()) and (
"Tool loop stopped after reaching the configured iteration limit." not in result.output_text
)
task = task_service.record_validation(
task.task_id,
result.run_id,
validation,
final_attempt=(
attempt_index == 2
or validation.status in {"accepted", "insufficient_evidence", "validator_error"}
),
has_usable_answer=has_usable_answer,
)
run_memory_store.update_run_record(result.run_id, validation_result=validation.to_dict()) run_memory_store.update_run_record(result.run_id, validation_result=validation.to_dict())
session_manager.update_latest_assistant_event_payload( session_manager.update_latest_assistant_event_payload(
result.session_id, result.session_id,
@ -831,6 +877,23 @@ class AgentService:
"validation_status": "passed" if validation.accepted else "failed", "validation_status": "passed" if validation.accepted else "failed",
}, },
) )
validation_debug = {
"evidence_run_ids": [
item.run_id for item in [evidence_packet.main_run, *evidence_packet.team_runs] if item is not None
],
"evidence_session_ids": [
item.session_id
for item in [evidence_packet.main_run, *evidence_packet.team_runs]
if item is not None
],
"tool_result_count": sum(
len(item.tool_results)
for item in [evidence_packet.main_run, *evidence_packet.team_runs]
if item is not None
),
"evidence_length": len(evidence_text),
}
retry_scheduled = validation.status == "rejected" and attempt_index == 1
session_manager.append_message( session_manager.append_message(
result.session_id, result.session_id,
run_id=result.run_id, run_id=result.run_id,
@ -840,17 +903,18 @@ class AgentService:
"task_id": task.task_id, "task_id": task.task_id,
"attempt_index": attempt_index, "attempt_index": attempt_index,
"validation_result": validation.to_dict(), "validation_result": validation.to_dict(),
"retry_scheduled": not validation.accepted and attempt_index == 1, "validation_debug": validation_debug,
"retry_scheduled": retry_scheduled,
}, },
content=validation.recommended_revision_prompt or None, content=validation.recommended_revision_prompt or None,
context_visible=False, context_visible=False,
) )
if not validation.accepted and attempt_index == 1: if retry_scheduled:
session_manager.set_run_context_visible(result.session_id, result.run_id, False) session_manager.set_run_context_visible(result.session_id, result.run_id, False)
result.task_id = task.task_id result.task_id = task.task_id
result.task_status = task.status result.task_status = task.status
result.validation_result = validation.to_dict() result.validation_result = validation.to_dict()
if validation.accepted or attempt_index == 2: if not retry_scheduled:
return result return result
if last_result is None: # pragma: no cover - defensive if last_result is None: # pragma: no cover - defensive
@ -1083,6 +1147,36 @@ class AgentService:
payloads.append(payload) payloads.append(payload)
return payloads return payloads
@staticmethod
def _team_run_evidence(result: TeamRunResult | None) -> list[RunEvidence]:
if result is None:
return []
return [node.evidence for node in result.node_results if node.evidence is not None]
def _build_task_evidence_packet(
self,
*,
session_manager: Any,
task: TaskRecord,
attempt_index: int,
result: AgentRunResult,
team_result: TeamRunResult | None,
) -> TaskEvidencePacket:
main_run = EvidenceBuilder(session_manager).build_run_evidence(
result.session_id,
result.run_id,
result.output_text,
result.finish_reason,
)
return TaskEvidencePacket(
task_id=task.task_id,
attempt_index=attempt_index,
main_run=main_run,
team_runs=self._team_run_evidence(team_result),
team_node_results=list(team_result.node_results) if team_result is not None else [],
final_output=result.output_text,
)
@staticmethod @staticmethod
def _team_execution_context(plan: TaskExecutionPlan, result: TeamRunResult) -> str: def _team_execution_context(plan: TaskExecutionPlan, result: TeamRunResult) -> str:
node_lines = [ node_lines = [

View File

@ -16,10 +16,10 @@ if TYPE_CHECKING:
class TeamService: class TeamService:
"""Internal service for Beaver-native multi-agent execution.""" """Internal service for Beaver-native multi-agent execution."""
def __init__(self, loop: AgentLoop) -> None: def __init__(self, loop: AgentLoop, *, max_parallel_team_nodes: int = 3) -> None:
self.loop = loop self.loop = loop
self.runner = LocalAgentRunner(loop) self.runner = LocalAgentRunner(loop)
self.scheduler = TeamGraphScheduler(self.runner) self.scheduler = TeamGraphScheduler(self.runner, max_parallel_team_nodes=max_parallel_team_nodes)
async def run_team( async def run_team(
self, self,

View File

@ -1,6 +1,7 @@
"""Internal task tracking for automatic Main Agent task mode.""" """Internal task tracking for automatic Main Agent task mode."""
from .models import MainAgentDecision, TaskEvent, TaskRecord, ValidationResult from .evidence import EvidenceBuilder, RunEvidence, TaskEvidencePacket, ToolEvidence, render_task_evidence
from .models import MainAgentDecision, TaskEvent, TaskRecord, ValidationResult, ValidationStatus
from .planner import TaskExecutionPlan, TaskExecutionPlanner from .planner import TaskExecutionPlan, TaskExecutionPlanner
from .router import MainAgentRouter from .router import MainAgentRouter
from .service import TaskService from .service import TaskService
@ -8,15 +9,21 @@ from .skill_resolver import SkillResolutionReport, TaskSkillResolver
from .validation import ValidationService from .validation import ValidationService
__all__ = [ __all__ = [
"EvidenceBuilder",
"MainAgentDecision", "MainAgentDecision",
"MainAgentRouter", "MainAgentRouter",
"RunEvidence",
"TaskEvent", "TaskEvent",
"TaskEvidencePacket",
"TaskExecutionPlan", "TaskExecutionPlan",
"TaskExecutionPlanner", "TaskExecutionPlanner",
"TaskRecord", "TaskRecord",
"TaskService", "TaskService",
"SkillResolutionReport", "SkillResolutionReport",
"TaskSkillResolver", "TaskSkillResolver",
"ToolEvidence",
"ValidationResult", "ValidationResult",
"ValidationStatus",
"ValidationService", "ValidationService",
"render_task_evidence",
] ]

View File

@ -0,0 +1,183 @@
"""Structured evidence for task synthesis and validation."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
@dataclass(slots=True)
class ToolEvidence:
tool_name: str
tool_call_id: str | None
content: str
event_payload: dict[str, Any] = field(default_factory=dict)
url: str | None = None
title: str | None = None
created_at: str | None = None
def to_dict(self) -> dict[str, Any]:
return {
"tool_name": self.tool_name,
"tool_call_id": self.tool_call_id,
"content": self.content,
"event_payload": dict(self.event_payload),
"url": self.url,
"title": self.title,
"created_at": self.created_at,
}
@dataclass(slots=True)
class RunEvidence:
run_id: str
session_id: str
output_text: str
finish_reason: str
transcript: list[dict[str, Any]] = field(default_factory=list)
tool_results: list[ToolEvidence] = field(default_factory=list)
warnings: list[str] = field(default_factory=list)
def to_dict(self) -> dict[str, Any]:
return {
"run_id": self.run_id,
"session_id": self.session_id,
"output_text": self.output_text,
"finish_reason": self.finish_reason,
"transcript": list(self.transcript),
"tool_results": [item.to_dict() for item in self.tool_results],
"warnings": list(self.warnings),
}
@dataclass(slots=True)
class TaskEvidencePacket:
task_id: str
attempt_index: int
main_run: RunEvidence | None
team_runs: list[RunEvidence] = field(default_factory=list)
team_node_results: list[Any] = field(default_factory=list)
final_output: str = ""
def to_dict(self) -> dict[str, Any]:
return {
"task_id": self.task_id,
"attempt_index": self.attempt_index,
"main_run": self.main_run.to_dict() if self.main_run else None,
"team_runs": [item.to_dict() for item in self.team_runs],
"team_node_results": [
item.to_dict() if hasattr(item, "to_dict") else dict(item)
for item in self.team_node_results
],
"final_output": self.final_output,
}
class EvidenceBuilder:
def __init__(self, session_manager: Any) -> None:
self.session_manager = session_manager
def build_run_evidence(
self,
session_id: str,
run_id: str,
output_text: str,
finish_reason: str,
) -> RunEvidence:
events = self.session_manager.get_run_event_records(session_id, run_id)
transcript: list[dict[str, Any]] = []
tool_results: list[ToolEvidence] = []
warnings: list[str] = []
for event in events:
payload = dict(event.event_payload or {})
transcript.append(
{
"role": event.role,
"event_type": event.event_type,
"content": event.content,
"tool_name": event.tool_name,
"tool_call_id": event.tool_call_id,
"finish_reason": event.finish_reason,
"event_payload": payload,
}
)
if event.event_type == "tool_result_recorded":
tool_results.append(
ToolEvidence(
tool_name=event.tool_name or "tool",
tool_call_id=event.tool_call_id,
content=event.content or "",
event_payload=payload,
url=_optional_str(payload.get("url")),
title=_optional_str(payload.get("title")),
created_at=_optional_str(payload.get("created_at")),
)
)
if finish_reason and finish_reason != "stop":
warnings.append(f"finish_reason={finish_reason}")
return RunEvidence(
run_id=run_id,
session_id=session_id,
output_text=output_text,
finish_reason=finish_reason,
transcript=transcript,
tool_results=tool_results,
warnings=warnings,
)
def render_task_evidence(packet: TaskEvidencePacket) -> str:
sections = [
f"Task evidence packet: task_id={packet.task_id} attempt={packet.attempt_index}",
f"Final output:\n{packet.final_output}",
]
if packet.main_run is not None:
sections.append("Main run evidence:\n" + render_run_evidence(packet.main_run))
if packet.team_runs:
sections.append(
"Team run evidence:\n"
+ "\n\n".join(render_run_evidence(item) for item in packet.team_runs)
)
if packet.team_node_results:
lines = []
for item in packet.team_node_results:
lines.append(
f"- {getattr(item, 'node_id', '')}: success={getattr(item, 'success', False)} "
f"finish_reason={getattr(item, 'finish_reason', '')} error={getattr(item, 'error', '') or ''}"
)
sections.append("Team node results:\n" + "\n".join(lines))
return "\n\n".join(section for section in sections if section.strip())
def render_run_evidence(evidence: RunEvidence) -> str:
lines = [
f"run_id={evidence.run_id}",
f"session_id={evidence.session_id}",
f"finish_reason={evidence.finish_reason}",
]
if evidence.output_text:
lines.append(f"output:\n{evidence.output_text}")
if evidence.warnings:
lines.append("warnings:\n" + "\n".join(f"- {item}" for item in evidence.warnings))
if evidence.tool_results:
lines.append(
"tool_results:\n"
+ "\n\n".join(_render_tool_evidence(item) for item in evidence.tool_results)
)
return "\n".join(lines)
def _render_tool_evidence(item: ToolEvidence) -> str:
header = f"- tool={item.tool_name} call_id={item.tool_call_id or ''}"
metadata = []
if item.url:
metadata.append(f"url={item.url}")
if item.title:
metadata.append(f"title={item.title}")
if item.created_at:
metadata.append(f"created_at={item.created_at}")
return "\n".join([header, *metadata, item.content])
def _optional_str(value: Any) -> str | None:
return str(value) if value is not None else None

View File

@ -3,31 +3,63 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Any from typing import Any, Literal
TASK_OPEN_STATUSES = {"open", "running", "validating", "awaiting_feedback", "needs_revision"} ValidationStatus = Literal["accepted", "rejected", "insufficient_evidence", "validator_error"]
VALIDATION_STATUSES = {"accepted", "rejected", "insufficient_evidence", "validator_error"}
TASK_OPEN_STATUSES = {"open", "running", "validating", "awaiting_feedback", "needs_review", "needs_revision"}
@dataclass(slots=True) @dataclass(slots=True)
class ValidationResult: class ValidationResult:
passed: bool status: ValidationStatus = "rejected"
score: float score: float = 0.0
issues: list[str] = field(default_factory=list) issues: list[str] = field(default_factory=list)
missing_requirements: list[str] = field(default_factory=list) missing_requirements: list[str] = field(default_factory=list)
evidence_gaps: list[str] = field(default_factory=list)
recommended_revision_prompt: str = "" recommended_revision_prompt: str = ""
validator: str = "heuristic" validator: str = "heuristic"
def __init__(
self,
*,
status: ValidationStatus | None = None,
passed: bool | None = None,
score: float = 0.0,
issues: list[str] | None = None,
missing_requirements: list[str] | None = None,
evidence_gaps: list[str] | None = None,
recommended_revision_prompt: str = "",
validator: str = "heuristic",
) -> None:
if status is not None and status not in VALIDATION_STATUSES:
raise ValueError(f"unknown validation status: {status}")
self.status = status or ("accepted" if passed and score >= 0.75 else "rejected")
self.score = max(0.0, min(1.0, float(score or 0.0)))
self.issues = list(issues or [])
self.missing_requirements = list(missing_requirements or [])
self.evidence_gaps = list(evidence_gaps or [])
self.recommended_revision_prompt = recommended_revision_prompt
self.validator = validator
@property
def passed(self) -> bool:
return self.status == "accepted"
@property @property
def accepted(self) -> bool: def accepted(self) -> bool:
return self.passed and self.score >= 0.75 return self.status == "accepted"
def to_dict(self) -> dict[str, Any]: def to_dict(self) -> dict[str, Any]:
return { return {
"status": self.status,
"passed": self.passed, "passed": self.passed,
"score": self.score, "score": self.score,
"issues": list(self.issues), "issues": list(self.issues),
"missing_requirements": list(self.missing_requirements), "missing_requirements": list(self.missing_requirements),
"evidence_gaps": list(self.evidence_gaps),
"recommended_revision_prompt": self.recommended_revision_prompt, "recommended_revision_prompt": self.recommended_revision_prompt,
"validator": self.validator, "validator": self.validator,
"accepted": self.accepted, "accepted": self.accepted,
@ -37,11 +69,17 @@ class ValidationResult:
def from_dict(cls, payload: dict[str, Any] | None) -> "ValidationResult | None": def from_dict(cls, payload: dict[str, Any] | None) -> "ValidationResult | None":
if not isinstance(payload, dict): if not isinstance(payload, dict):
return None return None
raw_status = payload.get("status")
if "status" in payload and raw_status not in VALIDATION_STATUSES:
raise ValueError(f"unknown validation status: {raw_status}")
status: ValidationStatus | None = raw_status if "status" in payload else None
return cls( return cls(
passed=bool(payload.get("passed")), status=status,
passed=bool(payload.get("passed")) if "status" not in payload else None,
score=float(payload.get("score", 0.0) or 0.0), score=float(payload.get("score", 0.0) or 0.0),
issues=[str(item) for item in payload.get("issues") or []], issues=[str(item) for item in payload.get("issues") or []],
missing_requirements=[str(item) for item in payload.get("missing_requirements") or []], missing_requirements=[str(item) for item in payload.get("missing_requirements") or []],
evidence_gaps=[str(item) for item in payload.get("evidence_gaps") or []],
recommended_revision_prompt=str(payload.get("recommended_revision_prompt") or ""), recommended_revision_prompt=str(payload.get("recommended_revision_prompt") or ""),
validator=str(payload.get("validator") or "unknown"), validator=str(payload.get("validator") or "unknown"),
) )
@ -73,6 +111,14 @@ class TaskRecord:
def is_open(self) -> bool: def is_open(self) -> bool:
return self.status in TASK_OPEN_STATUSES return self.status in TASK_OPEN_STATUSES
@property
def is_execution_active(self) -> bool:
return self.status in {"running", "validating"}
@property
def requires_user_action(self) -> bool:
return self.status in {"awaiting_feedback", "needs_review", "needs_revision"}
def to_dict(self) -> dict[str, Any]: def to_dict(self) -> dict[str, Any]:
return { return {
"task_id": self.task_id, "task_id": self.task_id,

View File

@ -77,6 +77,8 @@ class TaskService:
payload = task.to_dict() payload = task.to_dict()
payload["short_title"] = self.ensure_short_title(task).metadata.get("short_title") payload["short_title"] = self.ensure_short_title(task).metadata.get("short_title")
payload["is_open"] = task.is_open payload["is_open"] = task.is_open
payload["is_execution_active"] = task.is_execution_active
payload["requires_user_action"] = task.requires_user_action
return payload return payload
def ensure_short_title(self, task: TaskRecord) -> TaskRecord: def ensure_short_title(self, task: TaskRecord) -> TaskRecord:
@ -108,10 +110,30 @@ class TaskService:
self._event(task, "run_completed", run_id=run_id, payload={"skill_names": skill_names or []}) self._event(task, "run_completed", run_id=run_id, payload={"skill_names": skill_names or []})
return task return task
def record_validation(self, task_id: str, run_id: str, validation: ValidationResult) -> TaskRecord: def record_validation(
self,
task_id: str,
run_id: str,
validation: ValidationResult,
*,
final_attempt: bool = True,
has_usable_answer: bool = True,
) -> TaskRecord:
task = self._require(task_id) task = self._require(task_id)
task.status = "awaiting_feedback" now = self._now()
task.updated_at = self._now() if validation.status == "accepted":
task.status = "awaiting_feedback"
elif validation.status in {"insufficient_evidence", "validator_error"}:
task.status = "needs_review"
elif validation.status == "rejected" and not final_attempt:
task.status = "needs_revision"
elif validation.status == "rejected" and has_usable_answer:
task.status = "needs_review"
else:
task.status = "failed"
task.closed_at = now
task.close_reason = "automatic validation rejected the final attempt"
task.updated_at = now
task.validation_result = validation.to_dict() task.validation_result = validation.to_dict()
self.store.upsert_task(task) self.store.upsert_task(task)
self._event(task, "validated", run_id=run_id, payload=validation.to_dict()) self._event(task, "validated", run_id=run_id, payload=validation.to_dict())

View File

@ -17,6 +17,8 @@ class ValidationService:
task: TaskRecord, task: TaskRecord,
user_message: str, user_message: str,
final_output: str, final_output: str,
evidence_packet: Any | None = None,
evidence_text: str = "",
transcript_excerpt: str = "", transcript_excerpt: str = "",
tool_summaries: list[str] | None = None, tool_summaries: list[str] | None = None,
team_summaries: list[str] | None = None, team_summaries: list[str] | None = None,
@ -36,19 +38,20 @@ class ValidationService:
task=task, task=task,
user_message=user_message, user_message=user_message,
final_output=final_output, final_output=final_output,
evidence_text=evidence_text,
transcript_excerpt=transcript_excerpt, transcript_excerpt=transcript_excerpt,
tool_summaries=tool_summaries or [], tool_summaries=tool_summaries or [],
team_summaries=team_summaries or [], team_summaries=team_summaries or [],
) )
except Exception as exc: except Exception as exc:
return ValidationResult( return ValidationResult(
passed=False, status="validator_error",
score=0.0, score=0.0,
issues=[f"Validator failed: {exc}"], issues=[f"Validator failed: {exc}"],
missing_requirements=["A valid automatic validation result is required before accepting the task."], evidence_gaps=["Automatic validation failed before producing a reliable decision."],
missing_requirements=["User review is required because automatic validation failed."],
recommended_revision_prompt=( recommended_revision_prompt=(
"Review the task result again because automatic validation failed, " "Review the answer and evidence, then decide whether to revise or accept it."
"then provide a corrected final answer that explicitly satisfies the task goal."
), ),
validator="llm_error", validator="llm_error",
) )
@ -62,20 +65,25 @@ class ValidationService:
task: TaskRecord, task: TaskRecord,
user_message: str, user_message: str,
final_output: str, final_output: str,
evidence_text: str,
transcript_excerpt: str, transcript_excerpt: str,
tool_summaries: list[str], tool_summaries: list[str],
team_summaries: list[str], team_summaries: list[str],
) -> ValidationResult: ) -> ValidationResult:
legacy_context = "" if evidence_text else (
f"Transcript excerpt:\n{transcript_excerpt}\n\n"
f"Tool summaries:\n{json.dumps(tool_summaries, ensure_ascii=False)}\n\n"
f"Team summaries:\n{json.dumps(team_summaries, ensure_ascii=False)}\n\n"
)
prompt = ( prompt = (
"Validate whether the assistant output satisfies the task. " "Validate whether the assistant output satisfies the task. "
"Return only compact JSON with keys: passed, score, issues, " "Return only compact JSON with keys: passed, score, issues, "
"missing_requirements, recommended_revision_prompt.\n\n" "missing_requirements, recommended_revision_prompt.\n\n"
f"Task goal:\n{task.goal}\n\n" f"Task goal:\n{task.goal}\n\n"
f"Current user request:\n{user_message}\n\n" f"Current user request:\n{user_message}\n\n"
f"Transcript excerpt:\n{transcript_excerpt[:2500]}\n\n" f"Evidence packet:\n{evidence_text}\n\n"
f"Tool summaries:\n{json.dumps(tool_summaries[:12], ensure_ascii=False)}\n\n" f"{legacy_context}"
f"Team summaries:\n{json.dumps(team_summaries[:12], ensure_ascii=False)}\n\n" f"Assistant final output:\n{final_output}"
f"Assistant final output:\n{final_output[:4000]}"
) )
response = await provider.chat( response = await provider.chat(
messages=[ messages=[
@ -88,11 +96,19 @@ class ValidationService:
temperature=0.0, temperature=0.0,
) )
payload = self._parse_json_object(response.content or "") payload = self._parse_json_object(response.content or "")
status = payload.get("status")
if status not in {"accepted", "rejected", "insufficient_evidence", "validator_error"}:
status = (
"accepted"
if payload.get("passed") and float(payload.get("score", 0.0) or 0.0) >= 0.75
else "rejected"
)
return ValidationResult( return ValidationResult(
passed=bool(payload.get("passed")), status=status,
score=max(0.0, min(1.0, float(payload.get("score", 0.0) or 0.0))), score=max(0.0, min(1.0, float(payload.get("score", 0.0) or 0.0))),
issues=[str(item) for item in payload.get("issues") or []], issues=[str(item) for item in payload.get("issues") or []],
missing_requirements=[str(item) for item in payload.get("missing_requirements") or []], missing_requirements=[str(item) for item in payload.get("missing_requirements") or []],
evidence_gaps=[str(item) for item in payload.get("evidence_gaps") or []],
recommended_revision_prompt=str(payload.get("recommended_revision_prompt") or ""), recommended_revision_prompt=str(payload.get("recommended_revision_prompt") or ""),
validator="llm", validator="llm",
) )

View File

@ -45,6 +45,18 @@ class RecordingProvider(LLMProvider):
return "stub-model" return "stub-model"
class BlockingProvider(RecordingProvider):
def __init__(self, content: str, started: asyncio.Event, release: asyncio.Event) -> None:
super().__init__([_response(content)])
self.started = started
self.release = release
async def chat(self, *args, **kwargs) -> LLMResponse:
self.started.set()
await self.release.wait()
return await super().chat(*args, **kwargs)
class StubSkillAssembler: class StubSkillAssembler:
def __init__(self, activated_skills: list[SkillContext] | None = None) -> None: def __init__(self, activated_skills: list[SkillContext] | None = None) -> None:
self.activated_skills = list(activated_skills or []) self.activated_skills = list(activated_skills or [])
@ -153,6 +165,26 @@ def test_local_agent_runner_uses_shared_loop_and_records_parent_task(tmp_path: P
assert child_session["parent_session_id"] == "session-root" assert child_session["parent_session_id"] == "session-root"
def test_team_node_preserves_evidence_when_finish_reason_is_not_stop(tmp_path: Path) -> None:
loop = _loop(tmp_path)
provider = RecordingProvider([_response("partial evidence", finish_reason="max_tool_iterations")])
envelope = DelegationEnvelope(
parent_task_id="task-parent",
parent_session_id="session-root",
parent_run_id="run-root",
agent=AgentDescriptor(name="researcher", role="research"),
task="research the requested topic",
node_id="research",
)
result = asyncio.run(LocalAgentRunner(loop).run(envelope, provider_bundle=_bundle(provider)))
assert result.success is False
assert result.evidence is not None
assert result.evidence.output_text == "partial evidence"
assert result.evidence.finish_reason == "max_tool_iterations"
def test_pinned_skill_is_injected_into_delegated_run(tmp_path: Path) -> None: def test_pinned_skill_is_injected_into_delegated_run(tmp_path: Path) -> None:
_publish_skill( _publish_skill(
tmp_path, tmp_path,
@ -278,6 +310,57 @@ def test_team_parallel_runs_all_nodes(tmp_path: Path) -> None:
assert [item.output_text for item in result.node_results] == ["one", "two", "three"] assert [item.output_text for item in result.node_results] == ["one", "two", "three"]
def test_team_parallel_starts_nodes_concurrently_with_isolated_loops(tmp_path: Path) -> None:
loop = _loop(tmp_path)
first_started = asyncio.Event()
second_started = asyncio.Event()
release = asyncio.Event()
providers = {
"one": BlockingProvider("one", first_started, release),
"two": BlockingProvider("two", second_started, release),
}
graph = ExecutionGraph(
strategy="parallel",
nodes=[
ExecutionNode("one", "task one", AgentDescriptor(name="one")),
ExecutionNode("two", "task two", AgentDescriptor(name="two")),
],
)
async def run_case():
loop_task = asyncio.create_task(loop.run())
await asyncio.sleep(0)
task = asyncio.create_task(
TeamService(loop).run_team(
graph,
parent_task_id=None,
parent_session_id="session-root",
parent_run_id="run-root",
provider_bundle_factory=lambda node: _bundle(providers[node.node_id]),
)
)
try:
await asyncio.wait_for(first_started.wait(), timeout=1)
await asyncio.wait_for(second_started.wait(), timeout=1)
release.set()
return await task
finally:
release.set()
if not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
await loop.stop()
await loop_task
result = asyncio.run(run_case())
assert result.success is True
assert [item.node_id for item in result.node_results] == ["one", "two"]
def test_parallel_node_factory_error_is_normalized_and_keeps_completed_runs(tmp_path: Path) -> None: def test_parallel_node_factory_error_is_normalized_and_keeps_completed_runs(tmp_path: Path) -> None:
loop = _loop(tmp_path) loop = _loop(tmp_path)
loaded = loop.boot() loaded = loop.boot()
@ -438,7 +521,7 @@ def test_team_summary_lists_only_failed_nodes_when_all_nodes_fail(tmp_path: Path
) )
assert result.success is False assert result.success is False
assert result.summary == "Failed nodes:\n- one: one down\n- two: two down" assert result.summary == "Failed nodes:\n- one: one down evidence=no\n- two: two down evidence=no"
def test_graph_structure_errors_still_raise(tmp_path: Path) -> None: def test_graph_structure_errors_still_raise(tmp_path: Path) -> None:

View File

@ -608,6 +608,12 @@ def test_agent_loop_records_max_tool_iterations_as_failed_skill_effect(tmp_path:
provider_name="stub", provider_name="stub",
model="stub-model", model="stub-model",
), ),
LLMResponse(
content="Based on the available tool result, the container likely failed during startup.",
finish_reason="stop",
provider_name="stub",
model="stub-model",
),
] ]
), ),
) )
@ -621,7 +627,29 @@ def test_agent_loop_records_max_tool_iterations_as_failed_skill_effect(tmp_path:
) )
loaded = loop.boot() loaded = loop.boot()
assert result.finish_reason == "max_tool_iterations" assert result.finish_reason == "max_tool_iterations_finalized"
assert "Based on the available tool result" in result.output_text
assert "Tool loop stopped" not in result.output_text
effect_records = loaded.run_memory_store.list_skill_effects("docker-debug", version="v0007") effect_records = loaded.run_memory_store.list_skill_effects("docker-debug", version="v0007")
assert effect_records[-1].run_id == result.run_id assert effect_records[-1].run_id == result.run_id
assert effect_records[-1].success is False assert effect_records[-1].success is False
def test_llm_request_snapshot_defaults_to_compact_payload(tmp_path: Path) -> None:
loop = AgentLoop(loader=EngineLoader(workspace=tmp_path, skill_assembler=StubSkillAssembler([])))
bundle = ProviderBundle(
main_runtime=SimpleNamespace(model="stub-model", provider_name="stub"),
main_provider=StubProvider(
[LLMResponse(content="done", finish_reason="stop", provider_name="stub", model="stub-model")]
),
)
result = asyncio.run(loop.process_direct("hello", provider_bundle=bundle))
loaded = loop.boot()
events = loaded.session_manager.get_run_event_records(result.session_id, result.run_id)
snapshot = next(event for event in events if event.event_type == "llm_request_snapshotted")
assert "message_count" in snapshot.event_payload
assert "tool_names" in snapshot.event_payload
assert "messages" not in snapshot.event_payload
assert "tools" not in snapshot.event_payload

View File

@ -0,0 +1,91 @@
from __future__ import annotations
from pathlib import Path
from beaver.engine.session.manager import SessionManager
from beaver.tasks.evidence import EvidenceBuilder, RunEvidence, TaskEvidencePacket, ToolEvidence, render_task_evidence
def test_evidence_builder_preserves_full_tool_result(tmp_path: Path) -> None:
session_manager = SessionManager(tmp_path)
session_id = "session-1"
run_id = "run-1"
long_content = "prefix " + ("x" * 700) + " MAN 3 FT 2 NFO"
session_manager.ensure_session(session_id, source="test")
session_manager.append_message(session_id, run_id=run_id, role="user", event_type="user_message_added", content="score?")
session_manager.append_message(
session_id,
run_id=run_id,
role="tool",
event_type="tool_result_recorded",
event_payload={"success": True, "url": "https://example.test/match"},
content=long_content,
tool_name="web_fetch",
tool_call_id="call-1",
)
session_manager.append_message(
session_id,
run_id=run_id,
role="system",
event_type="run_completed",
event_payload={"finish_reason": "stop"},
content="Manchester United won 3-2.",
finish_reason="stop",
context_visible=False,
)
evidence = EvidenceBuilder(session_manager).build_run_evidence(
session_id,
run_id,
"Manchester United won 3-2.",
"stop",
)
rendered = render_task_evidence(
TaskEvidencePacket(
task_id="task-1",
attempt_index=1,
main_run=evidence,
team_runs=[],
team_node_results=[],
final_output="Manchester United won 3-2.",
)
)
assert evidence.tool_results[0].content == long_content
assert "MAN 3 FT 2 NFO" in rendered
assert "https://example.test/match" in rendered
def test_render_task_evidence_includes_failed_team_run_tool_results() -> None:
run = RunEvidence(
run_id="run-team",
session_id="session-team",
output_text="Tool loop stopped.",
finish_reason="max_tool_iterations",
transcript=[],
tool_results=[
ToolEvidence(
tool_name="web_fetch",
tool_call_id="call-team",
content="Recovered partial source content.",
event_payload={"success": True, "created_at": "2026-05-22T12:00:00Z"},
created_at="2026-05-22T12:00:00Z",
)
],
warnings=["finish_reason=max_tool_iterations"],
)
packet = TaskEvidencePacket(
task_id="task-1",
attempt_index=2,
main_run=None,
team_runs=[run],
team_node_results=[],
final_output="partial answer",
)
rendered = render_task_evidence(packet)
assert "finish_reason=max_tool_iterations" in rendered
assert "partial answer" in rendered
assert "Recovered partial source content." in rendered
assert "created_at=2026-05-22T12:00:00Z" in rendered

View File

@ -13,14 +13,14 @@ from beaver.engine.providers.base import LLMProvider, LLMResponse
from beaver.engine.providers.factory import ProviderBundle from beaver.engine.providers.factory import ProviderBundle
from beaver.services.agent_service import AgentService from beaver.services.agent_service import AgentService
from beaver.skills.assembler import SkillAssemblyResult from beaver.skills.assembler import SkillAssemblyResult
from beaver.tasks import TaskExecutionPlan, TaskService, ValidationResult, ValidationService from beaver.tasks import TaskExecutionPlan, TaskRecord, TaskService, ValidationResult, ValidationService
class StubProvider(LLMProvider): class StubProvider(LLMProvider):
def __init__(self, responses: list[LLMResponse]) -> None: def __init__(self, responses: list[LLMResponse]) -> None:
super().__init__() super().__init__()
self._responses = list(responses) self._responses = list(responses)
self.calls: list[list[dict]] = [] self.calls: list[dict[str, object]] = []
async def chat( async def chat(
self, self,
@ -30,7 +30,7 @@ class StubProvider(LLMProvider):
max_tokens: int = 4096, max_tokens: int = 4096,
temperature: float = 0.7, temperature: float = 0.7,
) -> LLMResponse: ) -> LLMResponse:
self.calls.append(messages) self.calls.append({"messages": messages, "tools": tools, "model": model})
if not self._responses: if not self._responses:
raise AssertionError("No stubbed provider responses left") raise AssertionError("No stubbed provider responses left")
return self._responses.pop(0) return self._responses.pop(0)
@ -42,8 +42,10 @@ class StubProvider(LLMProvider):
class StubValidationService: class StubValidationService:
def __init__(self, results: list[ValidationResult]) -> None: def __init__(self, results: list[ValidationResult]) -> None:
self.results = list(results) self.results = list(results)
self.calls: list[dict] = []
async def validate_task_result(self, **kwargs) -> ValidationResult: async def validate_task_result(self, **kwargs) -> ValidationResult:
self.calls.append(kwargs)
if not self.results: if not self.results:
raise AssertionError("No stubbed validation results left") raise AssertionError("No stubbed validation results left")
return self.results.pop(0) return self.results.pop(0)
@ -153,6 +155,21 @@ def _main_only_bundle(*responses: str) -> ProviderBundle:
) )
def _task_record(status: str) -> TaskRecord:
return TaskRecord(
task_id="task-1",
session_id="session-1",
description="test task",
goal="test task",
constraints=[],
priority=0,
status=status,
creator="main-agent",
created_at="2026-05-22T00:00:00+00:00",
updated_at="2026-05-22T00:00:00+00:00",
)
def test_simple_question_does_not_create_task(tmp_path: Path) -> None: def test_simple_question_does_not_create_task(tmp_path: Path) -> None:
service = AgentService( service = AgentService(
loader=EngineLoader( loader=EngineLoader(
@ -393,6 +410,81 @@ def test_explicit_revision_feedback_then_input_reruns_without_duplicate_feedback
assert task.feedback[0]["comment"] == "准备补充穿衣建议" assert task.feedback[0]["comment"] == "准备补充穿衣建议"
def test_validation_result_status_drives_accepted_and_passed() -> None:
accepted = ValidationResult(status="accepted", score=0.9, validator="test")
insufficient = ValidationResult(status="insufficient_evidence", score=0.9, validator="test")
rejected = ValidationResult(status="rejected", score=0.9, validator="test")
assert accepted.passed is True
assert accepted.accepted is True
assert insufficient.passed is False
assert insufficient.accepted is False
assert rejected.passed is False
assert rejected.accepted is False
def test_validation_result_from_legacy_payload_maps_to_status() -> None:
accepted = ValidationResult.from_dict({"passed": True, "score": 0.9, "validator": "legacy"})
low_score = ValidationResult.from_dict({"passed": True, "score": 0.7, "validator": "legacy"})
rejected = ValidationResult.from_dict({"passed": False, "score": 0.2, "validator": "legacy"})
assert accepted is not None
assert accepted.status == "accepted"
assert low_score is not None
assert low_score.status == "rejected"
assert rejected is not None
assert rejected.status == "rejected"
def test_validation_result_rejects_unknown_status() -> None:
with pytest.raises(ValueError, match="unknown validation status"):
ValidationResult(status="pending", score=0.9, validator="test") # type: ignore[arg-type]
def test_validation_result_from_dict_rejects_unknown_explicit_status() -> None:
with pytest.raises(ValueError, match="unknown validation status"):
ValidationResult.from_dict({"status": "pending", "passed": True, "score": 0.9})
def test_validation_result_evidence_gaps_round_trip() -> None:
validation = ValidationResult(
status="insufficient_evidence",
score=0.4,
evidence_gaps=["missing command output", "missing file reference"],
validator="test",
)
restored = ValidationResult.from_dict(validation.to_dict())
assert restored is not None
assert restored.status == "insufficient_evidence"
assert restored.evidence_gaps == ["missing command output", "missing file reference"]
assert restored.to_dict()["evidence_gaps"] == ["missing command output", "missing file reference"]
def test_task_record_status_helpers_distinguish_review_and_failed() -> None:
needs_review = _task_record("needs_review")
failed = _task_record("failed")
assert needs_review.is_open is True
assert needs_review.is_execution_active is False
assert needs_review.requires_user_action is True
assert failed.is_open is False
assert failed.is_execution_active is False
assert failed.requires_user_action is False
def test_task_service_api_payload_emits_status_helpers(tmp_path: Path) -> None:
service = TaskService(tmp_path)
task = _task_record("needs_review")
payload = service.to_api_dict(task)
assert payload["is_open"] is True
assert payload["is_execution_active"] is False
assert payload["requires_user_action"] is True
def test_validation_failure_retries_once(tmp_path: Path) -> None: def test_validation_failure_retries_once(tmp_path: Path) -> None:
service = AgentService( service = AgentService(
loader=EngineLoader( loader=EngineLoader(
@ -616,10 +708,45 @@ def test_task_mode_team_plan_runs_subagent_then_main_synthesis(tmp_path: Path) -
assert result.run_id == task.run_ids[-1] assert result.run_id == task.run_ids[-1]
assert any(event.event_type == "task_execution_planned" for event in events) assert any(event.event_type == "task_execution_planned" for event in events)
assert any(event.event_type == "task_team_run_completed" for event in events) assert any(event.event_type == "task_team_run_completed" for event in events)
assert "sub-agent evidence" in main_provider.calls[0][0]["content"] assert "sub-agent evidence" in main_provider.calls[0]["messages"][0]["content"]
assert "sub-agent evidence" != result.output_text assert "sub-agent evidence" != result.output_text
def test_task_mode_team_synthesis_runs_without_tools_and_receives_evidence(tmp_path: Path) -> None:
main_provider = StubProvider(
[
LLMResponse(content="final synthesized answer", finish_reason="stop", provider_name="stub", model="stub-model")
]
)
sub_provider = StubProvider(
[
LLMResponse(content="sub-agent evidence", finish_reason="stop", provider_name="stub", model="stub-model")
]
)
validation = StubValidationService([ValidationResult(status="accepted", score=0.9, validator="test")])
service = AgentService(
loader=EngineLoader(
workspace=tmp_path,
task_execution_planner=StubTaskExecutionPlanner([_team_plan()]),
validation_service=validation,
)
)
result = asyncio.run(
service.process_direct(
"implement team-backed workflow",
session_id="web:team-no-tools",
provider_bundle=_provider_bundle(main_provider),
team_provider_bundle_factory=lambda node: _provider_bundle(sub_provider),
)
)
assert result.output_text == "final synthesized answer"
assert main_provider.calls[0]["tools"] is None
assert "sub-agent evidence" in main_provider.calls[0]["messages"][0]["content"]
assert "Task evidence packet" in validation.calls[0]["evidence_text"]
def test_task_mode_team_failure_still_uses_main_synthesis(tmp_path: Path) -> None: def test_task_mode_team_failure_still_uses_main_synthesis(tmp_path: Path) -> None:
main_provider = StubProvider( main_provider = StubProvider(
[ [
@ -647,9 +774,48 @@ def test_task_mode_team_failure_still_uses_main_synthesis(tmp_path: Path) -> Non
assert result.output_text == "fallback synthesized answer" assert result.output_text == "fallback synthesized answer"
assert any(event.event_type == "task_team_run_failed" for event in events) assert any(event.event_type == "task_team_run_failed" for event in events)
assert "sub-agent unavailable" in main_provider.calls[0][0]["content"] assert "sub-agent unavailable" in main_provider.calls[0]["messages"][0]["content"]
assert "same class of tools fails repeatedly" in main_provider.calls[0][0]["content"] assert "same class of tools fails repeatedly" in main_provider.calls[0]["messages"][0]["content"]
assert "user-visible fallback answer" in main_provider.calls[0][0]["content"] assert "user-visible fallback answer" in main_provider.calls[0]["messages"][0]["content"]
def test_insufficient_evidence_moves_task_to_needs_review(tmp_path: Path) -> None:
service = AgentService(
loader=EngineLoader(
workspace=tmp_path,
task_execution_planner=_single_planner(),
validation_service=StubValidationService(
[
ValidationResult(
status="insufficient_evidence",
score=0.4,
evidence_gaps=["source missing"],
validator="test",
)
]
),
)
)
result = asyncio.run(
service.process_direct(
"answer with uncertain evidence",
session_id="web:needs-review",
provider_bundle=_bundle("possible answer"),
)
)
loaded = service.create_loop().boot()
task = loaded.task_service.get_task(result.task_id)
events = loaded.session_manager.get_run_event_records(result.session_id, result.run_id)
validation_event = next(event for event in events if event.event_type == "task_validation_snapshotted")
assert task is not None
assert task.status == "needs_review"
assert task.requires_user_action is True
assert task.is_execution_active is False
assert validation_event.event_payload["validation_result"]["status"] == "insufficient_evidence"
assert validation_event.event_payload["retry_scheduled"] is False
assert validation_event.event_payload["validation_debug"]["tool_result_count"] >= 0
def test_task_mode_team_retry_hides_first_synthesis_run(tmp_path: Path) -> None: def test_task_mode_team_retry_hides_first_synthesis_run(tmp_path: Path) -> None:
@ -763,5 +929,6 @@ def test_llm_validator_parse_failure_is_not_accepted(tmp_path: Path) -> None:
) )
assert validation.accepted is False assert validation.accepted is False
assert validation.status == "validator_error"
assert validation.validator == "llm_error" assert validation.validator == "llm_error"
assert validation.issues assert validation.issues

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,265 @@
# Task Evidence and Validation Redesign
Date: 2026-05-22
## Context
Two recent task runs exposed the same underlying weakness from different angles:
- The agent can use complete tool results, but validation only receives truncated excerpts. A key fact can be present in the run log yet absent from validation context, causing a false rejection.
- Team execution can gather useful evidence in sub-agent runs, but that evidence is not reliably carried into final synthesis or validation. Failed team nodes are especially lossy.
- Team graphs marked `parallel` are currently scheduled through a shared single-consumer `AgentLoop`, so production execution can be effectively serial.
- Final synthesis after a team run still has full tools available, so it can repeat searches instead of synthesizing from team evidence.
- `max_tool_iterations` stops the tool loop with a placeholder message instead of forcing a final answer from already gathered evidence.
- Validation failures enter an open-looking state, which makes the UI feel like the task never completed.
The selected approach is a medium refactor: keep the existing `AgentService`, `TeamService`, and `AgentLoop` structure, but add a structured evidence pipeline, clearer validation semantics, finite team concurrency, no-tools synthesis after team runs, and explicit task states.
## Goals
- Preserve complete run evidence for synthesis and validation.
- Stop using fixed truncation for validation inputs.
- Distinguish "answer is contradicted" from "validator lacks enough evidence".
- Let user feedback be the final business judgment after an answer is shown.
- Make `parallel` team execution actually concurrent within a bounded limit.
- Prevent final synthesis from repeating team tool work by default.
- Produce a useful final answer when tool iteration limits are reached.
- Add enough debug metadata to diagnose validation decisions without reconstructing SQLite logs by hand.
## Non-Goals
- Rewriting the whole execution runtime.
- Introducing a distributed worker pool.
- Building a generic evidence bus for every future subsystem.
- Solving all provider rate-limit and storage concurrency concerns beyond the bounded local concurrency needed for team parallel nodes.
## Validation Semantics and Task States
Automatic validation becomes advisory evidence assessment, not the final user satisfaction signal.
Validation results should include:
```python
status: Literal["accepted", "rejected", "insufficient_evidence", "validator_error"]
passed: bool
score: float
issues: list[str]
missing_requirements: list[str]
evidence_gaps: list[str]
recommended_revision_prompt: str
```
`status` is the business decision field. `passed` is a compatibility boolean derived from `status`, not an independent source of truth. The mapping is:
- `status == "accepted"` -> `passed=True`
- `status in {"rejected", "insufficient_evidence", "validator_error"}` -> `passed=False`
Task mode, retry, and status transition logic must branch on `status`. New code treats `status == "accepted"` as the acceptance condition. Existing compatibility paths may continue to interpret acceptance as `passed and score >= 0.75` until they are migrated, but new logic should not derive status from `passed` or infer failure from `passed=False` alone.
Rules:
- `accepted`: the final answer is supported by available evidence and satisfies the task. The task enters `awaiting_feedback`.
- `insufficient_evidence`: the validator cannot confirm the answer from available evidence. It must not claim fabrication or contradiction. The task enters `needs_review`.
- `validator_error`: the validator failed to produce a reliable decision. The task enters `needs_review`.
- `rejected`: the evidence clearly contradicts the answer, or the answer clearly misses the task. The first attempt can trigger retry. The last attempt enters `failed` only when there is no usable answer; otherwise it enters `needs_review`.
Task statuses:
- `open`: task exists but has not started.
- `running`: execution is active.
- `validating`: final answer exists and automatic validation is running.
- `awaiting_feedback`: answer is available and automatic validation accepted it.
- `needs_review`: answer is available, but automatic validation could not confirm it or hit a validator error.
- `needs_revision`: user requested revision, or automatic validation rejected an attempt that can still be retried.
- `failed`: execution ended without a usable answer.
- `closed`: user marked the answer satisfied.
- `abandoned`: user abandoned the task.
`needs_review` remains an open status for the active task API, but the UI should distinguish it from `running`. `failed`, `closed`, and `abandoned` are terminal.
Open status does not mean auto-runnable. The backend should split status semantics:
- `is_open`: the task can still receive user feedback or revision.
- `is_execution_active`: the backend is currently running or validating work.
- `requires_user_action`: the task has stopped automatic execution and needs user input.
`needs_review` should have `is_open=True`, `is_execution_active=False`, and `requires_user_action=True`. Schedulers, automatic retry loops, and active-task polling must not treat `needs_review` as a reason to continue execution. It should appear in the active task API only so the user can review, mark satisfied, revise, or abandon.
User feedback is authoritative:
- `satisfied` closes the task.
- `revise` moves the task to `needs_revision`.
- `abandon` moves the task to `abandoned`.
## Evidence Models
Add structured evidence models in the task or coordinator layer.
```python
@dataclass(slots=True)
class ToolEvidence:
tool_name: str
tool_call_id: str | None
content: str
event_payload: dict[str, Any]
url: str | None = None
title: str | None = None
created_at: str | None = None
@dataclass(slots=True)
class RunEvidence:
run_id: str
session_id: str
output_text: str
finish_reason: str
transcript: list[dict[str, Any]]
tool_results: list[ToolEvidence]
warnings: list[str]
@dataclass(slots=True)
class TaskEvidencePacket:
task_id: str
attempt_index: int
main_run: RunEvidence | None
team_runs: list[RunEvidence]
team_node_results: list[NodeRunResult]
final_output: str
```
`llm_request_snapshotted` events are debug material, not task evidence. They may be referenced in validation debug metadata, but validation should primarily consume transcript, tool results, team node outputs, and final output.
## Evidence Data Flow
1. `AgentLoop` continues to write session events as it does now.
2. After a run completes, an evidence builder reads `session_manager.get_run_event_records(session_id, run_id)` and creates `RunEvidence`.
3. `LocalAgentRunner.run()` attaches `RunEvidence` to `NodeRunResult`.
4. `NodeRunResult` gains `evidence: RunEvidence | None`.
5. `TeamRunResult` carries node evidence through `node_results`; it may also expose a convenience `run_evidence` list.
6. `AgentService._run_task_mode()` builds a `TaskEvidencePacket` after team execution and final synthesis.
7. Final synthesis receives a rendered evidence context built from the same packet.
8. `ValidationService.validate_task_result()` receives the same packet and renders it into the validation prompt without fixed truncation.
Failed or partial nodes must still preserve evidence. A node with `finish_reason="max_tool_iterations"` can be unsuccessful while still carrying useful tool results.
## Final Synthesis Behavior
For team-backed task plans, final synthesis defaults to no tools:
```python
include_tools = False
max_tool_iterations = 0
```
The synthesis prompt should instruct the main agent to:
- use team evidence as the source of truth;
- avoid repeating failed or completed tool calls;
- answer with available evidence;
- clearly state missing or uncertain information.
The planner may explicitly allow a small synthesis tool budget, but the default is no-tools synthesis. If allowed, the budget should be small, such as `max_tool_iterations=1`.
## Tool Iteration Finalization
When a run reaches `max_tool_iterations` and the model still requests tools, the loop should not return `Tool loop stopped...` as the final user-visible answer.
Instead, the loop performs one no-tools finalization call:
- use the accumulated messages and tool results;
- call the provider with `tools=None`;
- add an instruction that the tool budget is exhausted and the model must answer from existing evidence;
- mark the finish reason as `max_tool_iterations_finalized` or another explicit non-stop value;
- return the finalization text as `output_text`.
If finalization itself fails or returns empty content, only then use a clear fallback message explaining that the run could not produce a usable answer.
## Limited Parallel Team Execution
`parallel` team nodes should run concurrently without rewriting the runtime.
Design:
- Keep sequence and DAG behavior on the shared loop where appropriate.
- For `parallel` graph batches, run nodes through isolated `AgentLoop` instances.
- Each isolated loop uses the same workspace and service configuration so session and run records remain queryable from the same stores.
- Add `max_parallel_team_nodes`, default `3`.
- Use an `asyncio.Semaphore` in the scheduler to bound concurrent nodes.
- Return `TeamRunResult.node_results` in graph node order, not completion order.
The implementation should check shared store concurrency. If the current store is not safe for concurrent async writes, add a narrow lock around session/task/run store writes used by these parallel runs.
## Validation Prompt
The validation prompt should consume the full rendered evidence packet, without `[:2500]`, `[:500]`, or `[:12]` fixed caps.
Required validator instructions:
- Return only JSON with the validation fields.
- If evidence is incomplete, return `insufficient_evidence`.
- Only return `rejected` for clear contradiction or clear task failure.
- Do not infer fabrication from missing evidence.
- Do not claim a source lacks a fact unless the rendered evidence proves that absence.
- Treat user feedback as the final business judgment outside automatic validation.
The validator should still be strict about answer quality when evidence is sufficient.
## Validation Debug Metadata
Each `task_validation_snapshotted` event should record:
- validation result;
- validation status;
- attempt index;
- evidence run ids;
- evidence session ids;
- tool result count;
- evidence character length;
- validator raw response;
- rendered validation input or prompt, unless a debug setting disables full prompt storage.
This makes future investigations direct: inspect the exact input the validator saw before interpreting its decision.
## Log Snapshot Size
`llm_request_snapshotted` currently stores complete messages and complete tool schemas in both payload and content. That makes logs large and slows inspection.
Default behavior should change to store a compact payload:
- iteration;
- provider name and model;
- message count;
- tool names;
- message character length;
- tool schema character length;
- max tokens, temperature, thinking flag.
Full request snapshots should be controlled by a debug config flag. This does not reduce validation evidence because evidence comes from transcript and tool result events.
## Testing Plan
Add or update focused unit tests:
1. Validation evidence is not fixed-truncated. A fact after the first 500 characters of a tool result still appears in the validator input.
2. Missing evidence returns `insufficient_evidence` and moves the task to `needs_review`, not `failed`.
3. A team node that ends with `max_tool_iterations` preserves tool evidence in `NodeRunResult.evidence`.
4. Team final synthesis defaults to `tools=None` and receives rendered team evidence.
5. Parallel team nodes start concurrently under a bounded semaphore and results remain in graph order.
6. Tool loop finalization produces a user-visible answer instead of the placeholder stop message.
7. Status transitions cover `accepted -> awaiting_feedback`, `insufficient_evidence -> needs_review`, `validator_error -> needs_review`, and terminal `failed`.
8. Validation debug events include evidence metadata and validator raw response.
## Migration Notes
To reduce risk, implement in layers:
1. Add evidence models and builders without changing behavior.
2. Attach evidence to team node results.
3. Switch final synthesis for team plans to no-tools evidence synthesis.
4. Switch validation to evidence packets and new statuses.
5. Add no-tools finalization for tool iteration limits.
6. Add limited isolated-loop parallel execution.
7. Slim `llm_request_snapshotted` behind a debug flag.
This order keeps each change testable and lets the old transcript-summary path remain as a temporary fallback while evidence packets are introduced.