Compare commits
15 Commits
personal-u
...
agent_team
| Author | SHA1 | Date | |
|---|---|---|---|
| 8068d86760 | |||
| 4022db8887 | |||
| c53e221117 | |||
| b808f5cbc2 | |||
| 0adc04806c | |||
| 60605a74e0 | |||
| 3ff2e2ce11 | |||
| 0ace09b984 | |||
| c3c4df306b | |||
| 5446614828 | |||
| 2fd618da9c | |||
| 28a2627b1f | |||
| 249087e943 | |||
| 8bff282892 | |||
| 3a3e848a78 |
@ -18,8 +18,9 @@ if TYPE_CHECKING:
|
||||
class TeamGraphScheduler:
|
||||
"""Execute sequence, parallel, and DAG team graphs."""
|
||||
|
||||
def __init__(self, runner: LocalAgentRunner) -> None:
|
||||
def __init__(self, runner: LocalAgentRunner, *, max_parallel_team_nodes: int = 3) -> None:
|
||||
self.runner = runner
|
||||
self.max_parallel_team_nodes = max(1, int(max_parallel_team_nodes))
|
||||
|
||||
async def run(
|
||||
self,
|
||||
@ -96,7 +97,18 @@ class TeamGraphScheduler:
|
||||
nodes: list[ExecutionNode],
|
||||
**kwargs,
|
||||
) -> list[NodeRunResult]:
|
||||
return list(await asyncio.gather(*(self._run_node(node, dependency_outputs={}, **kwargs) for node in nodes)))
|
||||
semaphore = asyncio.Semaphore(self.max_parallel_team_nodes)
|
||||
|
||||
async def run_one(node: ExecutionNode) -> NodeRunResult:
|
||||
async with semaphore:
|
||||
return await self._run_node(
|
||||
node,
|
||||
dependency_outputs={},
|
||||
execution_mode="isolated_loop",
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
return list(await asyncio.gather(*(run_one(node) for node in nodes)))
|
||||
|
||||
async def _run_dag(
|
||||
self,
|
||||
@ -164,6 +176,7 @@ class TeamGraphScheduler:
|
||||
inherited_pinned_skill_contexts: list["SkillContext"],
|
||||
allow_candidate_generation: bool,
|
||||
dependency_outputs: dict[str, str],
|
||||
execution_mode: str = "shared_loop",
|
||||
) -> NodeRunResult:
|
||||
try:
|
||||
pinned = self._merge_pinned(inherited_pinned_skills, node.inherited_pinned_skills)
|
||||
@ -189,6 +202,7 @@ class TeamGraphScheduler:
|
||||
envelope,
|
||||
provider_bundle=node_provider_bundle,
|
||||
allow_candidate_generation=allow_candidate_generation,
|
||||
execution_mode=execution_mode,
|
||||
)
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
@ -241,7 +255,7 @@ class TeamGraphScheduler:
|
||||
failed = [item for item in results if not item.success]
|
||||
if failed:
|
||||
failure_lines = [
|
||||
f"- {item.node_id}: {item.error or item.finish_reason}"
|
||||
f"- {item.node_id}: {item.error or item.finish_reason} evidence={'yes' if item.evidence else 'no'}"
|
||||
for item in failed
|
||||
]
|
||||
summary_parts.append("Failed nodes:\n" + "\n".join(failure_lines))
|
||||
|
||||
@ -6,6 +6,7 @@ from uuid import uuid4
|
||||
|
||||
from beaver.engine import AgentLoop
|
||||
from beaver.engine.providers import ProviderBundle
|
||||
from beaver.tasks.evidence import EvidenceBuilder
|
||||
|
||||
from .models import DelegationEnvelope, NodeRunResult
|
||||
|
||||
@ -22,6 +23,7 @@ class LocalAgentRunner:
|
||||
*,
|
||||
provider_bundle: ProviderBundle | None = None,
|
||||
allow_candidate_generation: bool = False,
|
||||
execution_mode: str = "shared_loop",
|
||||
) -> NodeRunResult:
|
||||
if provider_bundle is not None and (envelope.agent.model or envelope.agent.provider_name):
|
||||
raise ValueError(
|
||||
@ -29,7 +31,14 @@ class LocalAgentRunner:
|
||||
"build a node-specific provider bundle instead."
|
||||
)
|
||||
child_session_id = self._child_session_id(envelope)
|
||||
runner = self.loop.submit_direct if self.loop.is_running else self.loop.process_direct
|
||||
target_loop = self.loop
|
||||
if execution_mode == "isolated_loop":
|
||||
target_loop = AgentLoop(profile=self.loop.profile, loader=self.loop.loader)
|
||||
runner = (
|
||||
target_loop.process_direct
|
||||
if execution_mode == "isolated_loop"
|
||||
else (self.loop.submit_direct if self.loop.is_running else self.loop.process_direct)
|
||||
)
|
||||
result = await runner(
|
||||
envelope.task,
|
||||
session_id=child_session_id,
|
||||
@ -47,6 +56,13 @@ class LocalAgentRunner:
|
||||
pinned_skill_contexts=envelope.inherited_pinned_skill_contexts,
|
||||
allow_candidate_generation=allow_candidate_generation,
|
||||
)
|
||||
loaded = target_loop.boot()
|
||||
evidence = EvidenceBuilder(loaded.session_manager).build_run_evidence(
|
||||
result.session_id,
|
||||
result.run_id,
|
||||
result.output_text,
|
||||
result.finish_reason,
|
||||
)
|
||||
success = result.finish_reason == "stop"
|
||||
return NodeRunResult(
|
||||
node_id=envelope.node_id or envelope.agent.name,
|
||||
@ -56,6 +72,7 @@ class LocalAgentRunner:
|
||||
session_id=result.session_id,
|
||||
finish_reason=result.finish_reason,
|
||||
error=None if success else (result.output_text or result.finish_reason),
|
||||
evidence=evidence,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
|
||||
@ -7,6 +7,7 @@ from typing import TYPE_CHECKING, Any, Literal
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from beaver.engine.context import SkillContext
|
||||
from beaver.tasks.evidence import RunEvidence
|
||||
|
||||
|
||||
TeamStrategy = Literal[
|
||||
@ -116,6 +117,7 @@ class NodeRunResult:
|
||||
session_id: str | None = None
|
||||
finish_reason: str = "stop"
|
||||
error: str | None = None
|
||||
evidence: "RunEvidence | None" = None
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
return {
|
||||
@ -126,6 +128,7 @@ class NodeRunResult:
|
||||
"session_id": self.session_id,
|
||||
"finish_reason": self.finish_reason,
|
||||
"error": self.error,
|
||||
"evidence": self.evidence.to_dict() if self.evidence is not None else None,
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -637,36 +637,39 @@ class AgentLoop:
|
||||
while True:
|
||||
chat_kwargs: dict[str, Any] = {
|
||||
"messages": messages,
|
||||
"tools": tool_schemas,
|
||||
"tools": tool_schemas if include_tools else None,
|
||||
"model": final_model,
|
||||
"max_tokens": resolved_max_tokens,
|
||||
"temperature": resolved_temperature,
|
||||
}
|
||||
if thinking_enabled is not None:
|
||||
chat_kwargs["thinking_enabled"] = thinking_enabled
|
||||
message_char_length = len(json.dumps(messages, ensure_ascii=False, default=str))
|
||||
tool_schema_char_length = len(json.dumps(tool_schemas, ensure_ascii=False, default=str))
|
||||
tool_names = [
|
||||
str(tool.get("function", {}).get("name") or tool.get("name") or "tool")
|
||||
for tool in (tool_schemas or [])
|
||||
if isinstance(tool, dict)
|
||||
]
|
||||
snapshot_payload = {
|
||||
"iteration": iterations,
|
||||
"provider_name": final_provider_name,
|
||||
"model": final_model,
|
||||
"message_count": len(messages),
|
||||
"tool_names": tool_names,
|
||||
"message_char_length": message_char_length,
|
||||
"tool_schema_char_length": tool_schema_char_length,
|
||||
"max_tokens": resolved_max_tokens,
|
||||
"temperature": resolved_temperature,
|
||||
"thinking_enabled": thinking_enabled,
|
||||
}
|
||||
session_manager.append_message(
|
||||
resolved_session_id,
|
||||
run_id=resolved_run_id,
|
||||
role="system",
|
||||
event_type="llm_request_snapshotted",
|
||||
event_payload={
|
||||
"iteration": iterations,
|
||||
"provider_name": final_provider_name,
|
||||
"model": final_model,
|
||||
"messages": messages,
|
||||
"tools": tool_schemas,
|
||||
"max_tokens": resolved_max_tokens,
|
||||
"temperature": resolved_temperature,
|
||||
"thinking_enabled": thinking_enabled,
|
||||
},
|
||||
content=json.dumps(
|
||||
{
|
||||
"messages": messages,
|
||||
"tools": tool_schemas,
|
||||
},
|
||||
ensure_ascii=False,
|
||||
default=str,
|
||||
),
|
||||
event_payload=snapshot_payload,
|
||||
content=json.dumps(snapshot_payload, ensure_ascii=False, default=str),
|
||||
context_visible=False,
|
||||
source=source,
|
||||
title=title,
|
||||
@ -708,8 +711,19 @@ class AgentLoop:
|
||||
break
|
||||
|
||||
if iterations >= resolved_max_tool_iterations:
|
||||
final_text = response.content or "Tool loop stopped after reaching the configured iteration limit."
|
||||
final_finish_reason = "max_tool_iterations"
|
||||
finalized = await self._finalize_after_tool_limit(
|
||||
provider=provider,
|
||||
messages=messages,
|
||||
model=final_model,
|
||||
max_tokens=resolved_max_tokens,
|
||||
temperature=resolved_temperature,
|
||||
thinking_enabled=thinking_enabled,
|
||||
)
|
||||
final_text = finalized or (
|
||||
"Tool loop stopped after reaching the configured iteration limit, "
|
||||
"and no final answer was produced."
|
||||
)
|
||||
final_finish_reason = "max_tool_iterations_finalized" if finalized else "max_tool_iterations"
|
||||
session_manager.append_message(
|
||||
resolved_session_id,
|
||||
run_id=resolved_run_id,
|
||||
@ -853,6 +867,39 @@ class AgentLoop:
|
||||
raise RuntimeError(f"Engine loader did not provide required dependency {field_name!r}")
|
||||
return value
|
||||
|
||||
@staticmethod
async def _finalize_after_tool_limit(
    *,
    provider: Any,
    messages: list[dict[str, Any]],
    model: str,
    max_tokens: int,
    temperature: float,
    thinking_enabled: bool | None,
) -> str:
    """Request one final, tool-free answer after the tool budget ran out.

    A copy of ``messages`` is extended with a system instruction telling the
    model not to call tools, and a single ``provider.chat`` request is sent
    with ``tools=None``. The caller's ``messages`` list is not mutated.

    Args:
        provider: Object exposing an awaitable ``chat(**kwargs)``.
        messages: Conversation so far.
        model: Model name forwarded to the provider.
        max_tokens: Token limit forwarded to the provider.
        temperature: Sampling temperature forwarded to the provider.
        thinking_enabled: Forwarded only when it is not ``None``.

    Returns:
        The stripped response content, or ``""`` when the response content
        is empty or ``None``.
    """
    final_messages = [
        *messages,
        {
            "role": "system",
            "content": (
                "The configured tool iteration budget is exhausted. Do not call tools. "
                "Produce the best final answer from the existing conversation and tool results. "
                "State uncertainty explicitly."
            ),
        },
    ]
    kwargs: dict[str, Any] = {
        "messages": final_messages,
        "tools": None,
        "model": model,
        "max_tokens": max_tokens,
        "temperature": temperature,
    }
    # Only pass the flag when the caller set it explicitly, so the key is
    # absent (rather than None) in the request otherwise.
    if thinking_enabled is not None:
        kwargs["thinking_enabled"] = thinking_enabled
    response = await provider.chat(**kwargs)
    return (response.content or "").strip()
|
||||
|
||||
@staticmethod
|
||||
def _load_pinned_skill_contexts(skills_loader: Any, skill_names: list[str]) -> list[SkillContext]:
|
||||
contexts: list[SkillContext] = []
|
||||
|
||||
@ -22,7 +22,16 @@ from beaver.engine import AgentLoop, AgentProfile, AgentRunResult, EngineLoader
|
||||
from beaver.engine.providers import make_provider_bundle
|
||||
from beaver.foundation.events import InboundMessage, OutboundMessage
|
||||
from beaver.foundation.models import CronJob, CronRunRecord
|
||||
from beaver.tasks import MainAgentRouter, TaskExecutionPlan, TaskRecord, ValidationResult
|
||||
from beaver.tasks import (
|
||||
EvidenceBuilder,
|
||||
MainAgentRouter,
|
||||
RunEvidence,
|
||||
TaskEvidencePacket,
|
||||
TaskExecutionPlan,
|
||||
TaskRecord,
|
||||
ValidationResult,
|
||||
render_task_evidence,
|
||||
)
|
||||
|
||||
|
||||
NOTIFICATION_SESSION_ID = "notify:default:scheduled"
|
||||
@ -715,6 +724,7 @@ class AgentService:
|
||||
)
|
||||
team_summaries: list[str] = []
|
||||
team_execution_context = ""
|
||||
team_result: TeamRunResult | None = None
|
||||
if plan.is_team:
|
||||
team_result, team_error = await self._run_team_for_task(
|
||||
plan,
|
||||
@ -725,7 +735,18 @@ class AgentService:
|
||||
)
|
||||
if team_result is not None:
|
||||
team_summaries = [self._team_summary_for_validation(team_result)]
|
||||
team_execution_context = self._team_execution_context(plan, team_result)
|
||||
team_packet = TaskEvidencePacket(
|
||||
task_id=task.task_id,
|
||||
attempt_index=attempt_index,
|
||||
main_run=None,
|
||||
team_runs=self._team_run_evidence(team_result),
|
||||
team_node_results=list(team_result.node_results),
|
||||
final_output="",
|
||||
)
|
||||
team_execution_context = self._join_context(
|
||||
self._team_execution_context(plan, team_result),
|
||||
"Rendered team evidence:\n" + render_task_evidence(team_packet),
|
||||
)
|
||||
self._append_task_observation(
|
||||
session_manager,
|
||||
task.session_id,
|
||||
@ -782,6 +803,9 @@ class AgentService:
|
||||
)
|
||||
elif team_execution_context:
|
||||
attempt_kwargs["execution_context"] = self._join_context(base_execution_context, team_execution_context)
|
||||
if plan.is_team and team_execution_context:
|
||||
attempt_kwargs["include_tools"] = False
|
||||
attempt_kwargs["max_tool_iterations"] = 0
|
||||
attempt_kwargs["skill_selection_context"] = self._build_skill_selection_context(
|
||||
task=task,
|
||||
user_message=message,
|
||||
@ -810,17 +834,39 @@ class AgentService:
|
||||
result.run_id,
|
||||
skill_names=self._skill_names_for_run(loaded, result.run_id),
|
||||
)
|
||||
evidence_packet = self._build_task_evidence_packet(
|
||||
session_manager=session_manager,
|
||||
task=task,
|
||||
attempt_index=attempt_index,
|
||||
result=result,
|
||||
team_result=team_result,
|
||||
)
|
||||
evidence_text = render_task_evidence(evidence_packet)
|
||||
validation = await validation_service.validate_task_result(
|
||||
task=task,
|
||||
user_message=message,
|
||||
final_output=result.output_text,
|
||||
evidence_packet=evidence_packet,
|
||||
evidence_text=evidence_text,
|
||||
transcript_excerpt=self._run_excerpt(session_manager, result.session_id, result.run_id),
|
||||
tool_summaries=self._tool_summaries(session_manager, result.session_id, result.run_id),
|
||||
team_summaries=team_summaries,
|
||||
provider_bundle=provider_bundle,
|
||||
)
|
||||
latest_validation = validation
|
||||
task = task_service.record_validation(task.task_id, result.run_id, validation)
|
||||
has_usable_answer = bool(result.output_text.strip()) and (
|
||||
"Tool loop stopped after reaching the configured iteration limit." not in result.output_text
|
||||
)
|
||||
task = task_service.record_validation(
|
||||
task.task_id,
|
||||
result.run_id,
|
||||
validation,
|
||||
final_attempt=(
|
||||
attempt_index == 2
|
||||
or validation.status in {"accepted", "insufficient_evidence", "validator_error"}
|
||||
),
|
||||
has_usable_answer=has_usable_answer,
|
||||
)
|
||||
run_memory_store.update_run_record(result.run_id, validation_result=validation.to_dict())
|
||||
session_manager.update_latest_assistant_event_payload(
|
||||
result.session_id,
|
||||
@ -831,6 +877,23 @@ class AgentService:
|
||||
"validation_status": "passed" if validation.accepted else "failed",
|
||||
},
|
||||
)
|
||||
validation_debug = {
|
||||
"evidence_run_ids": [
|
||||
item.run_id for item in [evidence_packet.main_run, *evidence_packet.team_runs] if item is not None
|
||||
],
|
||||
"evidence_session_ids": [
|
||||
item.session_id
|
||||
for item in [evidence_packet.main_run, *evidence_packet.team_runs]
|
||||
if item is not None
|
||||
],
|
||||
"tool_result_count": sum(
|
||||
len(item.tool_results)
|
||||
for item in [evidence_packet.main_run, *evidence_packet.team_runs]
|
||||
if item is not None
|
||||
),
|
||||
"evidence_length": len(evidence_text),
|
||||
}
|
||||
retry_scheduled = validation.status == "rejected" and attempt_index == 1
|
||||
session_manager.append_message(
|
||||
result.session_id,
|
||||
run_id=result.run_id,
|
||||
@ -840,17 +903,18 @@ class AgentService:
|
||||
"task_id": task.task_id,
|
||||
"attempt_index": attempt_index,
|
||||
"validation_result": validation.to_dict(),
|
||||
"retry_scheduled": not validation.accepted and attempt_index == 1,
|
||||
"validation_debug": validation_debug,
|
||||
"retry_scheduled": retry_scheduled,
|
||||
},
|
||||
content=validation.recommended_revision_prompt or None,
|
||||
context_visible=False,
|
||||
)
|
||||
if not validation.accepted and attempt_index == 1:
|
||||
if retry_scheduled:
|
||||
session_manager.set_run_context_visible(result.session_id, result.run_id, False)
|
||||
result.task_id = task.task_id
|
||||
result.task_status = task.status
|
||||
result.validation_result = validation.to_dict()
|
||||
if validation.accepted or attempt_index == 2:
|
||||
if not retry_scheduled:
|
||||
return result
|
||||
|
||||
if last_result is None: # pragma: no cover - defensive
|
||||
@ -1083,6 +1147,36 @@ class AgentService:
|
||||
payloads.append(payload)
|
||||
return payloads
|
||||
|
||||
@staticmethod
def _team_run_evidence(result: TeamRunResult | None) -> list[RunEvidence]:
    """Collect the evidence attached to each node of a team run.

    Returns an empty list when ``result`` is ``None``; nodes whose
    ``evidence`` is ``None`` are left out.
    """
    if result is None:
        return []
    return [node.evidence for node in result.node_results if node.evidence is not None]
|
||||
|
||||
def _build_task_evidence_packet(
    self,
    *,
    session_manager: Any,
    task: TaskRecord,
    attempt_index: int,
    result: AgentRunResult,
    team_result: TeamRunResult | None,
) -> TaskEvidencePacket:
    """Assemble the evidence packet for one task attempt.

    The main run's evidence is built from the events recorded under
    ``result.session_id`` / ``result.run_id``; team evidence and node
    results are taken from ``team_result`` when it is provided.
    """
    main_run = EvidenceBuilder(session_manager).build_run_evidence(
        result.session_id,
        result.run_id,
        result.output_text,
        result.finish_reason,
    )
    return TaskEvidencePacket(
        task_id=task.task_id,
        attempt_index=attempt_index,
        main_run=main_run,
        team_runs=self._team_run_evidence(team_result),
        # Copy the list so the packet does not alias the team result's own list.
        team_node_results=list(team_result.node_results) if team_result is not None else [],
        final_output=result.output_text,
    )
|
||||
|
||||
@staticmethod
|
||||
def _team_execution_context(plan: TaskExecutionPlan, result: TeamRunResult) -> str:
|
||||
node_lines = [
|
||||
|
||||
@ -16,10 +16,10 @@ if TYPE_CHECKING:
|
||||
class TeamService:
|
||||
"""Internal service for Beaver-native multi-agent execution."""
|
||||
|
||||
def __init__(self, loop: AgentLoop) -> None:
|
||||
def __init__(self, loop: AgentLoop, *, max_parallel_team_nodes: int = 3) -> None:
|
||||
self.loop = loop
|
||||
self.runner = LocalAgentRunner(loop)
|
||||
self.scheduler = TeamGraphScheduler(self.runner)
|
||||
self.scheduler = TeamGraphScheduler(self.runner, max_parallel_team_nodes=max_parallel_team_nodes)
|
||||
|
||||
async def run_team(
|
||||
self,
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
"""Internal task tracking for automatic Main Agent task mode."""
|
||||
|
||||
from .models import MainAgentDecision, TaskEvent, TaskRecord, ValidationResult
|
||||
from .evidence import EvidenceBuilder, RunEvidence, TaskEvidencePacket, ToolEvidence, render_task_evidence
|
||||
from .models import MainAgentDecision, TaskEvent, TaskRecord, ValidationResult, ValidationStatus
|
||||
from .planner import TaskExecutionPlan, TaskExecutionPlanner
|
||||
from .router import MainAgentRouter
|
||||
from .service import TaskService
|
||||
@ -8,15 +9,21 @@ from .skill_resolver import SkillResolutionReport, TaskSkillResolver
|
||||
from .validation import ValidationService
|
||||
|
||||
__all__ = [
|
||||
"EvidenceBuilder",
|
||||
"MainAgentDecision",
|
||||
"MainAgentRouter",
|
||||
"RunEvidence",
|
||||
"TaskEvent",
|
||||
"TaskEvidencePacket",
|
||||
"TaskExecutionPlan",
|
||||
"TaskExecutionPlanner",
|
||||
"TaskRecord",
|
||||
"TaskService",
|
||||
"SkillResolutionReport",
|
||||
"TaskSkillResolver",
|
||||
"ToolEvidence",
|
||||
"ValidationResult",
|
||||
"ValidationStatus",
|
||||
"ValidationService",
|
||||
"render_task_evidence",
|
||||
]
|
||||
|
||||
183
app-instance/backend/beaver/tasks/evidence.py
Normal file
183
app-instance/backend/beaver/tasks/evidence.py
Normal file
@ -0,0 +1,183 @@
|
||||
"""Structured evidence for task synthesis and validation."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class ToolEvidence:
|
||||
tool_name: str
|
||||
tool_call_id: str | None
|
||||
content: str
|
||||
event_payload: dict[str, Any] = field(default_factory=dict)
|
||||
url: str | None = None
|
||||
title: str | None = None
|
||||
created_at: str | None = None
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
return {
|
||||
"tool_name": self.tool_name,
|
||||
"tool_call_id": self.tool_call_id,
|
||||
"content": self.content,
|
||||
"event_payload": dict(self.event_payload),
|
||||
"url": self.url,
|
||||
"title": self.title,
|
||||
"created_at": self.created_at,
|
||||
}
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class RunEvidence:
|
||||
run_id: str
|
||||
session_id: str
|
||||
output_text: str
|
||||
finish_reason: str
|
||||
transcript: list[dict[str, Any]] = field(default_factory=list)
|
||||
tool_results: list[ToolEvidence] = field(default_factory=list)
|
||||
warnings: list[str] = field(default_factory=list)
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
return {
|
||||
"run_id": self.run_id,
|
||||
"session_id": self.session_id,
|
||||
"output_text": self.output_text,
|
||||
"finish_reason": self.finish_reason,
|
||||
"transcript": list(self.transcript),
|
||||
"tool_results": [item.to_dict() for item in self.tool_results],
|
||||
"warnings": list(self.warnings),
|
||||
}
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class TaskEvidencePacket:
|
||||
task_id: str
|
||||
attempt_index: int
|
||||
main_run: RunEvidence | None
|
||||
team_runs: list[RunEvidence] = field(default_factory=list)
|
||||
team_node_results: list[Any] = field(default_factory=list)
|
||||
final_output: str = ""
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
return {
|
||||
"task_id": self.task_id,
|
||||
"attempt_index": self.attempt_index,
|
||||
"main_run": self.main_run.to_dict() if self.main_run else None,
|
||||
"team_runs": [item.to_dict() for item in self.team_runs],
|
||||
"team_node_results": [
|
||||
item.to_dict() if hasattr(item, "to_dict") else dict(item)
|
||||
for item in self.team_node_results
|
||||
],
|
||||
"final_output": self.final_output,
|
||||
}
|
||||
|
||||
|
||||
class EvidenceBuilder:
    """Build ``RunEvidence`` objects from the events a session manager recorded."""

    def __init__(self, session_manager: Any) -> None:
        # Must expose ``get_run_event_records(session_id, run_id)``.
        self.session_manager = session_manager

    def build_run_evidence(
        self,
        session_id: str,
        run_id: str,
        output_text: str,
        finish_reason: str,
    ) -> RunEvidence:
        """Collect the transcript and tool results of one run.

        Every event returned by the session manager becomes a transcript
        entry; events whose type is ``"tool_result_recorded"`` are also
        captured as ``ToolEvidence``. A warning is added when
        ``finish_reason`` is non-empty and not ``"stop"``.
        """
        events = self.session_manager.get_run_event_records(session_id, run_id)
        transcript: list[dict[str, Any]] = []
        tool_results: list[ToolEvidence] = []
        warnings: list[str] = []
        for event in events:
            # Copy so the evidence does not alias the stored event payload.
            payload = dict(event.event_payload or {})
            transcript.append(
                {
                    "role": event.role,
                    "event_type": event.event_type,
                    "content": event.content,
                    "tool_name": event.tool_name,
                    "tool_call_id": event.tool_call_id,
                    "finish_reason": event.finish_reason,
                    "event_payload": payload,
                }
            )
            if event.event_type == "tool_result_recorded":
                tool_results.append(
                    ToolEvidence(
                        tool_name=event.tool_name or "tool",
                        tool_call_id=event.tool_call_id,
                        content=event.content or "",
                        event_payload=payload,
                        url=_optional_str(payload.get("url")),
                        title=_optional_str(payload.get("title")),
                        created_at=_optional_str(payload.get("created_at")),
                    )
                )
        # Run-level warning: added once per run, not once per event.
        if finish_reason and finish_reason != "stop":
            warnings.append(f"finish_reason={finish_reason}")
        return RunEvidence(
            run_id=run_id,
            session_id=session_id,
            output_text=output_text,
            finish_reason=finish_reason,
            transcript=transcript,
            tool_results=tool_results,
            warnings=warnings,
        )
|
||||
|
||||
|
||||
def render_task_evidence(packet: TaskEvidencePacket) -> str:
    """Render an evidence packet as plain text.

    Sections (header, final output, main run, team runs, team node results)
    are joined with blank lines; the optional sections are only emitted when
    the packet carries the corresponding data.
    """
    sections = [
        f"Task evidence packet: task_id={packet.task_id} attempt={packet.attempt_index}",
        f"Final output:\n{packet.final_output}",
    ]
    if packet.main_run is not None:
        sections.append("Main run evidence:\n" + render_run_evidence(packet.main_run))
    if packet.team_runs:
        sections.append(
            "Team run evidence:\n"
            + "\n\n".join(render_run_evidence(item) for item in packet.team_runs)
        )
    if packet.team_node_results:
        lines = []
        for item in packet.team_node_results:
            # getattr with defaults: node results are typed as Any on the packet.
            lines.append(
                f"- {getattr(item, 'node_id', '')}: success={getattr(item, 'success', False)} "
                f"finish_reason={getattr(item, 'finish_reason', '')} error={getattr(item, 'error', '') or ''}"
            )
        sections.append("Team node results:\n" + "\n".join(lines))
    return "\n\n".join(section for section in sections if section.strip())
|
||||
|
||||
|
||||
def render_run_evidence(evidence: RunEvidence) -> str:
    """Render one run's evidence as newline-separated text.

    Always emits the run id, session id and finish reason; the output,
    warnings and tool results follow only when they are non-empty.
    """
    lines = [
        f"run_id={evidence.run_id}",
        f"session_id={evidence.session_id}",
        f"finish_reason={evidence.finish_reason}",
    ]
    if evidence.output_text:
        lines.append(f"output:\n{evidence.output_text}")
    if evidence.warnings:
        lines.append("warnings:\n" + "\n".join(f"- {item}" for item in evidence.warnings))
    if evidence.tool_results:
        lines.append(
            "tool_results:\n"
            + "\n\n".join(_render_tool_evidence(item) for item in evidence.tool_results)
        )
    return "\n".join(lines)
|
||||
|
||||
|
||||
def _render_tool_evidence(item: ToolEvidence) -> str:
    """Render one tool result: a header line, any set metadata, then the content."""
    header = f"- tool={item.tool_name} call_id={item.tool_call_id or ''}"
    metadata = []
    if item.url:
        metadata.append(f"url={item.url}")
    if item.title:
        metadata.append(f"title={item.title}")
    if item.created_at:
        metadata.append(f"created_at={item.created_at}")
    return "\n".join([header, *metadata, item.content])
|
||||
|
||||
|
||||
def _optional_str(value: Any) -> str | None:
    """Return ``str(value)``, or ``None`` when ``value`` is ``None``."""
    return str(value) if value is not None else None
|
||||
@ -3,31 +3,63 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
from typing import Any, Literal
|
||||
|
||||
|
||||
TASK_OPEN_STATUSES = {"open", "running", "validating", "awaiting_feedback", "needs_revision"}
|
||||
ValidationStatus = Literal["accepted", "rejected", "insufficient_evidence", "validator_error"]
|
||||
|
||||
VALIDATION_STATUSES = {"accepted", "rejected", "insufficient_evidence", "validator_error"}
|
||||
TASK_OPEN_STATUSES = {"open", "running", "validating", "awaiting_feedback", "needs_review", "needs_revision"}
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class ValidationResult:
|
||||
passed: bool
|
||||
score: float
|
||||
status: ValidationStatus = "rejected"
|
||||
score: float = 0.0
|
||||
issues: list[str] = field(default_factory=list)
|
||||
missing_requirements: list[str] = field(default_factory=list)
|
||||
evidence_gaps: list[str] = field(default_factory=list)
|
||||
recommended_revision_prompt: str = ""
|
||||
validator: str = "heuristic"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
status: ValidationStatus | None = None,
|
||||
passed: bool | None = None,
|
||||
score: float = 0.0,
|
||||
issues: list[str] | None = None,
|
||||
missing_requirements: list[str] | None = None,
|
||||
evidence_gaps: list[str] | None = None,
|
||||
recommended_revision_prompt: str = "",
|
||||
validator: str = "heuristic",
|
||||
) -> None:
|
||||
if status is not None and status not in VALIDATION_STATUSES:
|
||||
raise ValueError(f"unknown validation status: {status}")
|
||||
self.status = status or ("accepted" if passed and score >= 0.75 else "rejected")
|
||||
self.score = max(0.0, min(1.0, float(score or 0.0)))
|
||||
self.issues = list(issues or [])
|
||||
self.missing_requirements = list(missing_requirements or [])
|
||||
self.evidence_gaps = list(evidence_gaps or [])
|
||||
self.recommended_revision_prompt = recommended_revision_prompt
|
||||
self.validator = validator
|
||||
|
||||
@property
|
||||
def passed(self) -> bool:
|
||||
return self.status == "accepted"
|
||||
|
||||
@property
|
||||
def accepted(self) -> bool:
|
||||
return self.passed and self.score >= 0.75
|
||||
return self.status == "accepted"
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
return {
|
||||
"status": self.status,
|
||||
"passed": self.passed,
|
||||
"score": self.score,
|
||||
"issues": list(self.issues),
|
||||
"missing_requirements": list(self.missing_requirements),
|
||||
"evidence_gaps": list(self.evidence_gaps),
|
||||
"recommended_revision_prompt": self.recommended_revision_prompt,
|
||||
"validator": self.validator,
|
||||
"accepted": self.accepted,
|
||||
@ -37,11 +69,17 @@ class ValidationResult:
|
||||
def from_dict(cls, payload: dict[str, Any] | None) -> "ValidationResult | None":
|
||||
if not isinstance(payload, dict):
|
||||
return None
|
||||
raw_status = payload.get("status")
|
||||
if "status" in payload and raw_status not in VALIDATION_STATUSES:
|
||||
raise ValueError(f"unknown validation status: {raw_status}")
|
||||
status: ValidationStatus | None = raw_status if "status" in payload else None
|
||||
return cls(
|
||||
passed=bool(payload.get("passed")),
|
||||
status=status,
|
||||
passed=bool(payload.get("passed")) if "status" not in payload else None,
|
||||
score=float(payload.get("score", 0.0) or 0.0),
|
||||
issues=[str(item) for item in payload.get("issues") or []],
|
||||
missing_requirements=[str(item) for item in payload.get("missing_requirements") or []],
|
||||
evidence_gaps=[str(item) for item in payload.get("evidence_gaps") or []],
|
||||
recommended_revision_prompt=str(payload.get("recommended_revision_prompt") or ""),
|
||||
validator=str(payload.get("validator") or "unknown"),
|
||||
)
|
||||
@ -73,6 +111,14 @@ class TaskRecord:
|
||||
def is_open(self) -> bool:
|
||||
return self.status in TASK_OPEN_STATUSES
|
||||
|
||||
@property
def is_execution_active(self) -> bool:
    """True while the task status is ``"running"`` or ``"validating"``."""
    return self.status in {"running", "validating"}
|
||||
|
||||
@property
def requires_user_action(self) -> bool:
    """True when the status is ``"awaiting_feedback"``, ``"needs_review"`` or ``"needs_revision"``."""
    return self.status in {"awaiting_feedback", "needs_review", "needs_revision"}
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
return {
|
||||
"task_id": self.task_id,
|
||||
|
||||
@ -77,6 +77,8 @@ class TaskService:
|
||||
payload = task.to_dict()
|
||||
payload["short_title"] = self.ensure_short_title(task).metadata.get("short_title")
|
||||
payload["is_open"] = task.is_open
|
||||
payload["is_execution_active"] = task.is_execution_active
|
||||
payload["requires_user_action"] = task.requires_user_action
|
||||
return payload
|
||||
|
||||
def ensure_short_title(self, task: TaskRecord) -> TaskRecord:
|
||||
@ -108,10 +110,30 @@ class TaskService:
|
||||
self._event(task, "run_completed", run_id=run_id, payload={"skill_names": skill_names or []})
|
||||
return task
|
||||
|
||||
def record_validation(self, task_id: str, run_id: str, validation: ValidationResult) -> TaskRecord:
|
||||
def record_validation(
|
||||
self,
|
||||
task_id: str,
|
||||
run_id: str,
|
||||
validation: ValidationResult,
|
||||
*,
|
||||
final_attempt: bool = True,
|
||||
has_usable_answer: bool = True,
|
||||
) -> TaskRecord:
|
||||
task = self._require(task_id)
|
||||
task.status = "awaiting_feedback"
|
||||
task.updated_at = self._now()
|
||||
now = self._now()
|
||||
if validation.status == "accepted":
|
||||
task.status = "awaiting_feedback"
|
||||
elif validation.status in {"insufficient_evidence", "validator_error"}:
|
||||
task.status = "needs_review"
|
||||
elif validation.status == "rejected" and not final_attempt:
|
||||
task.status = "needs_revision"
|
||||
elif validation.status == "rejected" and has_usable_answer:
|
||||
task.status = "needs_review"
|
||||
else:
|
||||
task.status = "failed"
|
||||
task.closed_at = now
|
||||
task.close_reason = "automatic validation rejected the final attempt"
|
||||
task.updated_at = now
|
||||
task.validation_result = validation.to_dict()
|
||||
self.store.upsert_task(task)
|
||||
self._event(task, "validated", run_id=run_id, payload=validation.to_dict())
|
||||
|
||||
@ -17,6 +17,8 @@ class ValidationService:
|
||||
task: TaskRecord,
|
||||
user_message: str,
|
||||
final_output: str,
|
||||
evidence_packet: Any | None = None,
|
||||
evidence_text: str = "",
|
||||
transcript_excerpt: str = "",
|
||||
tool_summaries: list[str] | None = None,
|
||||
team_summaries: list[str] | None = None,
|
||||
@ -36,19 +38,20 @@ class ValidationService:
|
||||
task=task,
|
||||
user_message=user_message,
|
||||
final_output=final_output,
|
||||
evidence_text=evidence_text,
|
||||
transcript_excerpt=transcript_excerpt,
|
||||
tool_summaries=tool_summaries or [],
|
||||
team_summaries=team_summaries or [],
|
||||
)
|
||||
except Exception as exc:
|
||||
return ValidationResult(
|
||||
passed=False,
|
||||
status="validator_error",
|
||||
score=0.0,
|
||||
issues=[f"Validator failed: {exc}"],
|
||||
missing_requirements=["A valid automatic validation result is required before accepting the task."],
|
||||
evidence_gaps=["Automatic validation failed before producing a reliable decision."],
|
||||
missing_requirements=["User review is required because automatic validation failed."],
|
||||
recommended_revision_prompt=(
|
||||
"Review the task result again because automatic validation failed, "
|
||||
"then provide a corrected final answer that explicitly satisfies the task goal."
|
||||
"Review the answer and evidence, then decide whether to revise or accept it."
|
||||
),
|
||||
validator="llm_error",
|
||||
)
|
||||
@ -62,20 +65,25 @@ class ValidationService:
|
||||
task: TaskRecord,
|
||||
user_message: str,
|
||||
final_output: str,
|
||||
evidence_text: str,
|
||||
transcript_excerpt: str,
|
||||
tool_summaries: list[str],
|
||||
team_summaries: list[str],
|
||||
) -> ValidationResult:
|
||||
legacy_context = "" if evidence_text else (
|
||||
f"Transcript excerpt:\n{transcript_excerpt}\n\n"
|
||||
f"Tool summaries:\n{json.dumps(tool_summaries, ensure_ascii=False)}\n\n"
|
||||
f"Team summaries:\n{json.dumps(team_summaries, ensure_ascii=False)}\n\n"
|
||||
)
|
||||
prompt = (
|
||||
"Validate whether the assistant output satisfies the task. "
|
||||
"Return only compact JSON with keys: passed, score, issues, "
|
||||
"missing_requirements, recommended_revision_prompt.\n\n"
|
||||
f"Task goal:\n{task.goal}\n\n"
|
||||
f"Current user request:\n{user_message}\n\n"
|
||||
f"Transcript excerpt:\n{transcript_excerpt[:2500]}\n\n"
|
||||
f"Tool summaries:\n{json.dumps(tool_summaries[:12], ensure_ascii=False)}\n\n"
|
||||
f"Team summaries:\n{json.dumps(team_summaries[:12], ensure_ascii=False)}\n\n"
|
||||
f"Assistant final output:\n{final_output[:4000]}"
|
||||
f"Evidence packet:\n{evidence_text}\n\n"
|
||||
f"{legacy_context}"
|
||||
f"Assistant final output:\n{final_output}"
|
||||
)
|
||||
response = await provider.chat(
|
||||
messages=[
|
||||
@ -88,11 +96,19 @@ class ValidationService:
|
||||
temperature=0.0,
|
||||
)
|
||||
payload = self._parse_json_object(response.content or "")
|
||||
status = payload.get("status")
|
||||
if status not in {"accepted", "rejected", "insufficient_evidence", "validator_error"}:
|
||||
status = (
|
||||
"accepted"
|
||||
if payload.get("passed") and float(payload.get("score", 0.0) or 0.0) >= 0.75
|
||||
else "rejected"
|
||||
)
|
||||
return ValidationResult(
|
||||
passed=bool(payload.get("passed")),
|
||||
status=status,
|
||||
score=max(0.0, min(1.0, float(payload.get("score", 0.0) or 0.0))),
|
||||
issues=[str(item) for item in payload.get("issues") or []],
|
||||
missing_requirements=[str(item) for item in payload.get("missing_requirements") or []],
|
||||
evidence_gaps=[str(item) for item in payload.get("evidence_gaps") or []],
|
||||
recommended_revision_prompt=str(payload.get("recommended_revision_prompt") or ""),
|
||||
validator="llm",
|
||||
)
|
||||
|
||||
@ -45,6 +45,18 @@ class RecordingProvider(LLMProvider):
|
||||
return "stub-model"
|
||||
|
||||
|
||||
class BlockingProvider(RecordingProvider):
    """Recording provider whose ``chat`` call parks until the test releases it.

    ``started`` is set as soon as ``chat`` is entered, and the canned reply is
    only produced after ``release`` has been set.
    """

    def __init__(self, content: str, started: asyncio.Event, release: asyncio.Event) -> None:
        canned_replies = [_response(content)]
        super().__init__(canned_replies)
        self.started = started
        self.release = release

    async def chat(self, *args, **kwargs) -> LLMResponse:
        """Signal entry, wait for the release event, then delegate to the parent."""
        self.started.set()
        await self.release.wait()
        reply = await super().chat(*args, **kwargs)
        return reply
|
||||
|
||||
|
||||
class StubSkillAssembler:
|
||||
def __init__(self, activated_skills: list[SkillContext] | None = None) -> None:
|
||||
self.activated_skills = list(activated_skills or [])
|
||||
@ -153,6 +165,26 @@ def test_local_agent_runner_uses_shared_loop_and_records_parent_task(tmp_path: P
|
||||
assert child_session["parent_session_id"] == "session-root"
|
||||
|
||||
|
||||
def test_team_node_preserves_evidence_when_finish_reason_is_not_stop(tmp_path: Path) -> None:
    """A node that ends with a non-"stop" finish reason is unsuccessful but keeps its evidence."""
    loop = _loop(tmp_path)
    truncated_reply = _response("partial evidence", finish_reason="max_tool_iterations")
    provider = RecordingProvider([truncated_reply])
    envelope = DelegationEnvelope(
        parent_task_id="task-parent",
        parent_session_id="session-root",
        parent_run_id="run-root",
        agent=AgentDescriptor(name="researcher", role="research"),
        task="research the requested topic",
        node_id="research",
    )
    runner = LocalAgentRunner(loop)

    result = asyncio.run(runner.run(envelope, provider_bundle=_bundle(provider)))

    assert result.success is False
    evidence = result.evidence
    assert evidence is not None
    assert evidence.output_text == "partial evidence"
    assert evidence.finish_reason == "max_tool_iterations"
|
||||
|
||||
|
||||
def test_pinned_skill_is_injected_into_delegated_run(tmp_path: Path) -> None:
|
||||
_publish_skill(
|
||||
tmp_path,
|
||||
@ -278,6 +310,57 @@ def test_team_parallel_runs_all_nodes(tmp_path: Path) -> None:
|
||||
assert [item.output_text for item in result.node_results] == ["one", "two", "three"]
|
||||
|
||||
|
||||
def test_team_parallel_starts_nodes_concurrently_with_isolated_loops(tmp_path: Path) -> None:
    """Both parallel nodes must enter their provider before either one is released."""
    loop = _loop(tmp_path)
    started_one = asyncio.Event()
    started_two = asyncio.Event()
    gate = asyncio.Event()
    provider_by_node = {
        "one": BlockingProvider("one", started_one, gate),
        "two": BlockingProvider("two", started_two, gate),
    }
    graph = ExecutionGraph(
        strategy="parallel",
        nodes=[
            ExecutionNode("one", "task one", AgentDescriptor(name="one")),
            ExecutionNode("two", "task two", AgentDescriptor(name="two")),
        ],
    )

    async def run_case():
        loop_task = asyncio.create_task(loop.run())
        # Yield once so the freshly created loop task gets a chance to start.
        await asyncio.sleep(0)
        team_task = asyncio.create_task(
            TeamService(loop).run_team(
                graph,
                parent_task_id=None,
                parent_session_id="session-root",
                parent_run_id="run-root",
                provider_bundle_factory=lambda node: _bundle(provider_by_node[node.node_id]),
            )
        )
        try:
            # Neither provider returns until the gate opens, so seeing both
            # "started" events means the nodes were in flight at the same time.
            for started in (started_one, started_two):
                await asyncio.wait_for(started.wait(), timeout=1)
            gate.set()
            return await team_task
        finally:
            # Always open the gate so a blocked provider cannot hang teardown.
            gate.set()
            if not team_task.done():
                team_task.cancel()
                try:
                    await team_task
                except asyncio.CancelledError:
                    pass
            await loop.stop()
            await loop_task

    result = asyncio.run(run_case())

    assert result.success is True
    finished_nodes = [item.node_id for item in result.node_results]
    assert finished_nodes == ["one", "two"]
|
||||
|
||||
|
||||
def test_parallel_node_factory_error_is_normalized_and_keeps_completed_runs(tmp_path: Path) -> None:
|
||||
loop = _loop(tmp_path)
|
||||
loaded = loop.boot()
|
||||
@ -438,7 +521,7 @@ def test_team_summary_lists_only_failed_nodes_when_all_nodes_fail(tmp_path: Path
|
||||
)
|
||||
|
||||
assert result.success is False
|
||||
assert result.summary == "Failed nodes:\n- one: one down\n- two: two down"
|
||||
assert result.summary == "Failed nodes:\n- one: one down evidence=no\n- two: two down evidence=no"
|
||||
|
||||
|
||||
def test_graph_structure_errors_still_raise(tmp_path: Path) -> None:
|
||||
|
||||
@ -608,6 +608,12 @@ def test_agent_loop_records_max_tool_iterations_as_failed_skill_effect(tmp_path:
|
||||
provider_name="stub",
|
||||
model="stub-model",
|
||||
),
|
||||
LLMResponse(
|
||||
content="Based on the available tool result, the container likely failed during startup.",
|
||||
finish_reason="stop",
|
||||
provider_name="stub",
|
||||
model="stub-model",
|
||||
),
|
||||
]
|
||||
),
|
||||
)
|
||||
@ -621,7 +627,29 @@ def test_agent_loop_records_max_tool_iterations_as_failed_skill_effect(tmp_path:
|
||||
)
|
||||
loaded = loop.boot()
|
||||
|
||||
assert result.finish_reason == "max_tool_iterations"
|
||||
assert result.finish_reason == "max_tool_iterations_finalized"
|
||||
assert "Based on the available tool result" in result.output_text
|
||||
assert "Tool loop stopped" not in result.output_text
|
||||
effect_records = loaded.run_memory_store.list_skill_effects("docker-debug", version="v0007")
|
||||
assert effect_records[-1].run_id == result.run_id
|
||||
assert effect_records[-1].success is False
|
||||
|
||||
|
||||
def test_llm_request_snapshot_defaults_to_compact_payload(tmp_path: Path) -> None:
    """The default request snapshot records counts and names, not full messages or tools."""
    loader = EngineLoader(workspace=tmp_path, skill_assembler=StubSkillAssembler([]))
    loop = AgentLoop(loader=loader)
    runtime = SimpleNamespace(model="stub-model", provider_name="stub")
    reply = LLMResponse(content="done", finish_reason="stop", provider_name="stub", model="stub-model")
    bundle = ProviderBundle(main_runtime=runtime, main_provider=StubProvider([reply]))

    result = asyncio.run(loop.process_direct("hello", provider_bundle=bundle))
    loaded = loop.boot()
    events = loaded.session_manager.get_run_event_records(result.session_id, result.run_id)
    snapshot = next(record for record in events if record.event_type == "llm_request_snapshotted")

    for summary_key in ("message_count", "tool_names"):
        assert summary_key in snapshot.event_payload
    for raw_key in ("messages", "tools"):
        assert raw_key not in snapshot.event_payload
|
||||
|
||||
91
app-instance/backend/tests/unit/test_task_evidence.py
Normal file
91
app-instance/backend/tests/unit/test_task_evidence.py
Normal file
@ -0,0 +1,91 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
from beaver.engine.session.manager import SessionManager
|
||||
from beaver.tasks.evidence import EvidenceBuilder, RunEvidence, TaskEvidencePacket, ToolEvidence, render_task_evidence
|
||||
|
||||
|
||||
def test_evidence_builder_preserves_full_tool_result(tmp_path: Path) -> None:
    """A long tool result is kept verbatim in run evidence and shows up in the rendering."""
    session_manager = SessionManager(tmp_path)
    session_id = "session-1"
    run_id = "run-1"
    # The marker sits after 700 filler characters, so any prefix truncation would drop it.
    long_content = f"prefix {'x' * 700} MAN 3 FT 2 NFO"
    final_answer = "Manchester United won 3-2."

    session_manager.ensure_session(session_id, source="test")
    recorded_messages = [
        {"role": "user", "event_type": "user_message_added", "content": "score?"},
        {
            "role": "tool",
            "event_type": "tool_result_recorded",
            "event_payload": {"success": True, "url": "https://example.test/match"},
            "content": long_content,
            "tool_name": "web_fetch",
            "tool_call_id": "call-1",
        },
        {
            "role": "system",
            "event_type": "run_completed",
            "event_payload": {"finish_reason": "stop"},
            "content": final_answer,
            "finish_reason": "stop",
            "context_visible": False,
        },
    ]
    for message in recorded_messages:
        session_manager.append_message(session_id, run_id=run_id, **message)

    builder = EvidenceBuilder(session_manager)
    evidence = builder.build_run_evidence(session_id, run_id, final_answer, "stop")
    packet = TaskEvidencePacket(
        task_id="task-1",
        attempt_index=1,
        main_run=evidence,
        team_runs=[],
        team_node_results=[],
        final_output=final_answer,
    )
    rendered = render_task_evidence(packet)

    assert evidence.tool_results[0].content == long_content
    assert "MAN 3 FT 2 NFO" in rendered
    assert "https://example.test/match" in rendered
|
||||
|
||||
|
||||
def test_render_task_evidence_includes_failed_team_run_tool_results() -> None:
    """Rendering keeps tool results and warnings from a team run that hit the iteration limit."""
    recorded_at = "2026-05-22T12:00:00Z"
    fetched = ToolEvidence(
        tool_name="web_fetch",
        tool_call_id="call-team",
        content="Recovered partial source content.",
        event_payload={"success": True, "created_at": "2026-05-22T12:00:00Z"},
        created_at=recorded_at,
    )
    run = RunEvidence(
        run_id="run-team",
        session_id="session-team",
        output_text="Tool loop stopped.",
        finish_reason="max_tool_iterations",
        transcript=[],
        tool_results=[fetched],
        warnings=["finish_reason=max_tool_iterations"],
    )
    packet = TaskEvidencePacket(
        task_id="task-1",
        attempt_index=2,
        main_run=None,
        team_runs=[run],
        team_node_results=[],
        final_output="partial answer",
    )

    rendered = render_task_evidence(packet)

    expected_fragments = (
        "finish_reason=max_tool_iterations",
        "partial answer",
        "Recovered partial source content.",
        "created_at=2026-05-22T12:00:00Z",
    )
    for fragment in expected_fragments:
        assert fragment in rendered
|
||||
@ -13,14 +13,14 @@ from beaver.engine.providers.base import LLMProvider, LLMResponse
|
||||
from beaver.engine.providers.factory import ProviderBundle
|
||||
from beaver.services.agent_service import AgentService
|
||||
from beaver.skills.assembler import SkillAssemblyResult
|
||||
from beaver.tasks import TaskExecutionPlan, TaskService, ValidationResult, ValidationService
|
||||
from beaver.tasks import TaskExecutionPlan, TaskRecord, TaskService, ValidationResult, ValidationService
|
||||
|
||||
|
||||
class StubProvider(LLMProvider):
|
||||
def __init__(self, responses: list[LLMResponse]) -> None:
|
||||
super().__init__()
|
||||
self._responses = list(responses)
|
||||
self.calls: list[list[dict]] = []
|
||||
self.calls: list[dict[str, object]] = []
|
||||
|
||||
async def chat(
|
||||
self,
|
||||
@ -30,7 +30,7 @@ class StubProvider(LLMProvider):
|
||||
max_tokens: int = 4096,
|
||||
temperature: float = 0.7,
|
||||
) -> LLMResponse:
|
||||
self.calls.append(messages)
|
||||
self.calls.append({"messages": messages, "tools": tools, "model": model})
|
||||
if not self._responses:
|
||||
raise AssertionError("No stubbed provider responses left")
|
||||
return self._responses.pop(0)
|
||||
@ -42,8 +42,10 @@ class StubProvider(LLMProvider):
|
||||
class StubValidationService:
    """Validation stand-in that replays queued results and records each call's kwargs."""

    def __init__(self, results: list[ValidationResult]) -> None:
        # Copy so popping results never mutates the caller's list.
        self.results = list(results)
        self.calls: list[dict] = []

    async def validate_task_result(self, **kwargs) -> ValidationResult:
        """Record the keyword arguments and return the next queued result.

        Raises:
            AssertionError: when no queued results remain.
        """
        self.calls.append(kwargs)
        if self.results:
            return self.results.pop(0)
        raise AssertionError("No stubbed validation results left")
|
||||
@ -153,6 +155,21 @@ def _main_only_bundle(*responses: str) -> ProviderBundle:
|
||||
)
|
||||
|
||||
|
||||
def _task_record(status: str) -> TaskRecord:
    """Build a fixed ``TaskRecord`` fixture whose only varying field is ``status``."""
    summary = "test task"
    timestamp = "2026-05-22T00:00:00+00:00"
    record = TaskRecord(
        task_id="task-1",
        session_id="session-1",
        description=summary,
        goal=summary,
        constraints=[],
        priority=0,
        status=status,
        creator="main-agent",
        created_at=timestamp,
        updated_at=timestamp,
    )
    return record
|
||||
|
||||
|
||||
def test_simple_question_does_not_create_task(tmp_path: Path) -> None:
|
||||
service = AgentService(
|
||||
loader=EngineLoader(
|
||||
@ -393,6 +410,81 @@ def test_explicit_revision_feedback_then_input_reruns_without_duplicate_feedback
|
||||
assert task.feedback[0]["comment"] == "准备补充穿衣建议"
|
||||
|
||||
|
||||
def test_validation_result_status_drives_accepted_and_passed() -> None:
    """Only the "accepted" status yields passed/accepted, even when the score is high."""
    expected_by_status = {
        "accepted": True,
        "insufficient_evidence": False,
        "rejected": False,
    }
    verdicts = {
        status: ValidationResult(status=status, score=0.9, validator="test")
        for status in expected_by_status
    }

    for status, expected in expected_by_status.items():
        verdict = verdicts[status]
        assert verdict.passed is expected
        assert verdict.accepted is expected
|
||||
|
||||
|
||||
def test_validation_result_from_legacy_payload_maps_to_status() -> None:
    """Payloads without a status field are mapped to a status from passed and score."""
    legacy_cases = [
        ({"passed": True, "score": 0.9, "validator": "legacy"}, "accepted"),
        ({"passed": True, "score": 0.7, "validator": "legacy"}, "rejected"),
        ({"passed": False, "score": 0.2, "validator": "legacy"}, "rejected"),
    ]
    restored_results = [ValidationResult.from_dict(payload) for payload, _ in legacy_cases]

    for restored, (_, expected_status) in zip(restored_results, legacy_cases):
        assert restored is not None
        assert restored.status == expected_status
|
||||
|
||||
|
||||
def test_validation_result_rejects_unknown_status() -> None:
    """Constructing a result with a status outside the known set raises ValueError."""
    expects_rejection = pytest.raises(ValueError, match="unknown validation status")
    with expects_rejection:
        ValidationResult(status="pending", score=0.9, validator="test")  # type: ignore[arg-type]
|
||||
|
||||
|
||||
def test_validation_result_from_dict_rejects_unknown_explicit_status() -> None:
    """An explicit unknown status in a payload raises even when passed and score look good."""
    payload = {"status": "pending", "passed": True, "score": 0.9}
    expects_rejection = pytest.raises(ValueError, match="unknown validation status")
    with expects_rejection:
        ValidationResult.from_dict(payload)
|
||||
|
||||
|
||||
def test_validation_result_evidence_gaps_round_trip() -> None:
    """``evidence_gaps`` and ``status`` survive a to_dict / from_dict round trip."""
    expected_gaps = ["missing command output", "missing file reference"]
    validation = ValidationResult(
        status="insufficient_evidence",
        score=0.4,
        # Pass a copy so the expectation cannot be altered through the instance.
        evidence_gaps=list(expected_gaps),
        validator="test",
    )

    serialized = validation.to_dict()
    restored = ValidationResult.from_dict(serialized)

    assert restored is not None
    assert restored.status == "insufficient_evidence"
    assert restored.evidence_gaps == expected_gaps
    assert restored.to_dict()["evidence_gaps"] == expected_gaps
|
||||
|
||||
|
||||
def test_task_record_status_helpers_distinguish_review_and_failed() -> None:
    """``needs_review`` is open and waiting on the user; ``failed`` is neither."""
    # status -> (is_open, is_execution_active, requires_user_action)
    expected_flags = {
        "needs_review": (True, False, True),
        "failed": (False, False, False),
    }
    records = {status: _task_record(status) for status in expected_flags}

    for status, (is_open, is_active, needs_user) in expected_flags.items():
        record = records[status]
        assert record.is_open is is_open
        assert record.is_execution_active is is_active
        assert record.requires_user_action is needs_user
|
||||
|
||||
|
||||
def test_task_service_api_payload_emits_status_helpers(tmp_path: Path) -> None:
    """The API payload exposes the three status helper flags of the task record."""
    service = TaskService(tmp_path)
    review_task = _task_record("needs_review")

    payload = service.to_api_dict(review_task)

    expected_flags = {
        "is_open": True,
        "is_execution_active": False,
        "requires_user_action": True,
    }
    for key, expected in expected_flags.items():
        assert payload[key] is expected
|
||||
|
||||
|
||||
def test_validation_failure_retries_once(tmp_path: Path) -> None:
|
||||
service = AgentService(
|
||||
loader=EngineLoader(
|
||||
@ -616,10 +708,45 @@ def test_task_mode_team_plan_runs_subagent_then_main_synthesis(tmp_path: Path) -
|
||||
assert result.run_id == task.run_ids[-1]
|
||||
assert any(event.event_type == "task_execution_planned" for event in events)
|
||||
assert any(event.event_type == "task_team_run_completed" for event in events)
|
||||
assert "sub-agent evidence" in main_provider.calls[0][0]["content"]
|
||||
assert "sub-agent evidence" in main_provider.calls[0]["messages"][0]["content"]
|
||||
assert "sub-agent evidence" != result.output_text
|
||||
|
||||
|
||||
def test_task_mode_team_synthesis_runs_without_tools_and_receives_evidence(tmp_path: Path) -> None:
    """Team-backed synthesis is called with no tools and is given the sub-agent output."""

    def single_reply_provider(text: str) -> StubProvider:
        # Each stub provider answers exactly one chat call with ``text``.
        reply = LLMResponse(content=text, finish_reason="stop", provider_name="stub", model="stub-model")
        return StubProvider([reply])

    main_provider = single_reply_provider("final synthesized answer")
    sub_provider = single_reply_provider("sub-agent evidence")
    accepted = ValidationResult(status="accepted", score=0.9, validator="test")
    validation = StubValidationService([accepted])
    loader = EngineLoader(
        workspace=tmp_path,
        task_execution_planner=StubTaskExecutionPlanner([_team_plan()]),
        validation_service=validation,
    )
    service = AgentService(loader=loader)

    result = asyncio.run(
        service.process_direct(
            "implement team-backed workflow",
            session_id="web:team-no-tools",
            provider_bundle=_provider_bundle(main_provider),
            team_provider_bundle_factory=lambda node: _provider_bundle(sub_provider),
        )
    )

    synthesis_call = main_provider.calls[0]
    assert result.output_text == "final synthesized answer"
    assert synthesis_call["tools"] is None
    assert "sub-agent evidence" in synthesis_call["messages"][0]["content"]
    assert "Task evidence packet" in validation.calls[0]["evidence_text"]
|
||||
|
||||
|
||||
def test_task_mode_team_failure_still_uses_main_synthesis(tmp_path: Path) -> None:
|
||||
main_provider = StubProvider(
|
||||
[
|
||||
@ -647,9 +774,48 @@ def test_task_mode_team_failure_still_uses_main_synthesis(tmp_path: Path) -> Non
|
||||
|
||||
assert result.output_text == "fallback synthesized answer"
|
||||
assert any(event.event_type == "task_team_run_failed" for event in events)
|
||||
assert "sub-agent unavailable" in main_provider.calls[0][0]["content"]
|
||||
assert "same class of tools fails repeatedly" in main_provider.calls[0][0]["content"]
|
||||
assert "user-visible fallback answer" in main_provider.calls[0][0]["content"]
|
||||
assert "sub-agent unavailable" in main_provider.calls[0]["messages"][0]["content"]
|
||||
assert "same class of tools fails repeatedly" in main_provider.calls[0]["messages"][0]["content"]
|
||||
assert "user-visible fallback answer" in main_provider.calls[0]["messages"][0]["content"]
|
||||
|
||||
|
||||
def test_insufficient_evidence_moves_task_to_needs_review(tmp_path: Path) -> None:
    """An ``insufficient_evidence`` verdict parks the task in ``needs_review`` without a retry."""
    planner = _single_planner()
    verdict = ValidationResult(
        status="insufficient_evidence",
        score=0.4,
        evidence_gaps=["source missing"],
        validator="test",
    )
    loader = EngineLoader(
        workspace=tmp_path,
        task_execution_planner=planner,
        validation_service=StubValidationService([verdict]),
    )
    service = AgentService(loader=loader)

    result = asyncio.run(
        service.process_direct(
            "answer with uncertain evidence",
            session_id="web:needs-review",
            provider_bundle=_bundle("possible answer"),
        )
    )
    loaded = service.create_loop().boot()
    task = loaded.task_service.get_task(result.task_id)
    events = loaded.session_manager.get_run_event_records(result.session_id, result.run_id)
    validation_event = next(
        record for record in events if record.event_type == "task_validation_snapshotted"
    )
    snapshot = validation_event.event_payload

    assert task is not None
    assert task.status == "needs_review"
    assert task.requires_user_action is True
    assert task.is_execution_active is False
    assert snapshot["validation_result"]["status"] == "insufficient_evidence"
    assert snapshot["retry_scheduled"] is False
    assert snapshot["validation_debug"]["tool_result_count"] >= 0
|
||||
|
||||
|
||||
def test_task_mode_team_retry_hides_first_synthesis_run(tmp_path: Path) -> None:
|
||||
@ -763,5 +929,6 @@ def test_llm_validator_parse_failure_is_not_accepted(tmp_path: Path) -> None:
|
||||
)
|
||||
|
||||
assert validation.accepted is False
|
||||
assert validation.status == "validator_error"
|
||||
assert validation.validator == "llm_error"
|
||||
assert validation.issues
|
||||
|
||||
1528
docs/superpowers/plans/2026-05-22-task-evidence-validation.md
Normal file
1528
docs/superpowers/plans/2026-05-22-task-evidence-validation.md
Normal file
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,265 @@
|
||||
# Task Evidence and Validation Redesign
|
||||
|
||||
Date: 2026-05-22
|
||||
|
||||
## Context
|
||||
|
||||
Two recent task runs exposed the same underlying weakness from different angles:
|
||||
|
||||
- The agent can use complete tool results, but validation only receives truncated excerpts. A key fact can be present in the run log yet absent from validation context, causing a false rejection.
|
||||
- Team execution can gather useful evidence in sub-agent runs, but that evidence is not reliably carried into final synthesis or validation. Failed team nodes are especially lossy.
|
||||
- Team graphs marked `parallel` are currently scheduled through a shared single-consumer `AgentLoop`, so production execution can be effectively serial.
|
||||
- Final synthesis after a team run still has full tools available, so it can repeat searches instead of synthesizing from team evidence.
|
||||
- `max_tool_iterations` stops the tool loop with a placeholder message instead of forcing a final answer from already gathered evidence.
|
||||
- Validation failures enter an open-looking state, which makes the UI feel like the task never completed.
|
||||
|
||||
The selected approach is a medium refactor: keep the existing `AgentService`, `TeamService`, and `AgentLoop` structure, but add a structured evidence pipeline, clearer validation semantics, finite team concurrency, no-tools synthesis after team runs, and explicit task states.
|
||||
|
||||
## Goals
|
||||
|
||||
- Preserve complete run evidence for synthesis and validation.
|
||||
- Stop using fixed truncation for validation inputs.
|
||||
- Distinguish "answer is contradicted" from "validator lacks enough evidence".
|
||||
- Let user feedback be the final business judgment after an answer is shown.
|
||||
- Make `parallel` team execution actually concurrent within a bounded limit.
|
||||
- Prevent final synthesis from repeating team tool work by default.
|
||||
- Produce a useful final answer when tool iteration limits are reached.
|
||||
- Add enough debug metadata to diagnose validation decisions without reconstructing SQLite logs by hand.
|
||||
|
||||
## Non-Goals
|
||||
|
||||
- Rewriting the whole execution runtime.
|
||||
- Introducing a distributed worker pool.
|
||||
- Building a generic evidence bus for every future subsystem.
|
||||
- Solving all provider rate-limit and storage concurrency concerns beyond the bounded local concurrency needed for team parallel nodes.
|
||||
|
||||
## Validation Semantics and Task States
|
||||
|
||||
Automatic validation becomes advisory evidence assessment, not the final user satisfaction signal.
|
||||
|
||||
Validation results should include:
|
||||
|
||||
```python
|
||||
status: Literal["accepted", "rejected", "insufficient_evidence", "validator_error"]
|
||||
passed: bool
|
||||
score: float
|
||||
issues: list[str]
|
||||
missing_requirements: list[str]
|
||||
evidence_gaps: list[str]
|
||||
recommended_revision_prompt: str
|
||||
```
|
||||
|
||||
`status` is the business decision field. `passed` is a compatibility boolean derived from `status`, not an independent source of truth. The mapping is:
|
||||
|
||||
- `status == "accepted"` -> `passed=True`
|
||||
- `status in {"rejected", "insufficient_evidence", "validator_error"}` -> `passed=False`
|
||||
|
||||
Task mode, retry, and status transition logic must branch on `status`. New code treats `status == "accepted"` as the acceptance condition. Existing compatibility paths may continue to interpret acceptance as `passed and score >= 0.75` until they are migrated, but new logic should not derive status from `passed` or infer failure from `passed=False` alone.
|
||||
|
||||
Rules:
|
||||
|
||||
- `accepted`: the final answer is supported by available evidence and satisfies the task. The task enters `awaiting_feedback`.
|
||||
- `insufficient_evidence`: the validator cannot confirm the answer from available evidence. It must not claim fabrication or contradiction. The task enters `needs_review`.
|
||||
- `validator_error`: the validator failed to produce a reliable decision. The task enters `needs_review`.
|
||||
- `rejected`: the evidence clearly contradicts the answer, or the answer clearly misses the task. A rejection on a non-final attempt moves the task to `needs_revision` and can trigger a retry. A rejection on the final attempt enters `failed` only when there is no usable answer; otherwise it enters `needs_review`.
|
||||
|
||||
Task statuses:
|
||||
|
||||
- `open`: task exists but has not started.
|
||||
- `running`: execution is active.
|
||||
- `validating`: final answer exists and automatic validation is running.
|
||||
- `awaiting_feedback`: answer is available and automatic validation accepted it.
|
||||
- `needs_review`: answer is available, but automatic validation could not confirm it, hit a validator error, or rejected the final attempt while a usable answer still exists.
|
||||
- `needs_revision`: user requested revision, or automatic validation rejected an attempt that can still be retried.
|
||||
- `failed`: execution ended without a usable answer.
|
||||
- `closed`: user marked the answer satisfied.
|
||||
- `abandoned`: user abandoned the task.
|
||||
|
||||
`needs_review` remains an open status for the active task API, but the UI should distinguish it from `running`. `failed`, `closed`, and `abandoned` are terminal.
|
||||
|
||||
Open status does not mean auto-runnable. The backend should split status semantics:
|
||||
|
||||
- `is_open`: the task can still receive user feedback or revision.
|
||||
- `is_execution_active`: the backend is currently running or validating work.
|
||||
- `requires_user_action`: the task has stopped automatic execution and needs user input.
|
||||
|
||||
`needs_review` should have `is_open=True`, `is_execution_active=False`, and `requires_user_action=True`. Schedulers, automatic retry loops, and active-task polling must not treat `needs_review` as a reason to continue execution. It should appear in the active task API only so the user can review, mark satisfied, revise, or abandon.
|
||||
|
||||
User feedback is authoritative:
|
||||
|
||||
- `satisfied` closes the task.
|
||||
- `revise` moves the task to `needs_revision`.
|
||||
- `abandon` moves the task to `abandoned`.
|
||||
|
||||
## Evidence Models
|
||||
|
||||
Add structured evidence models in the task or coordinator layer.
|
||||
|
||||
```python
@dataclass(slots=True)
class ToolEvidence:
    tool_name: str
    tool_call_id: str | None
    content: str
    event_payload: dict[str, Any]
    url: str | None = None
    title: str | None = None
    created_at: str | None = None


@dataclass(slots=True)
class RunEvidence:
    run_id: str
    session_id: str
    output_text: str
    finish_reason: str
    transcript: list[dict[str, Any]]
    tool_results: list[ToolEvidence]
    warnings: list[str]


@dataclass(slots=True)
class TaskEvidencePacket:
    task_id: str
    attempt_index: int
    main_run: RunEvidence | None
    team_runs: list[RunEvidence]
    team_node_results: list[NodeRunResult]
    final_output: str
```
|
||||
|
||||
`llm_request_snapshotted` events are debug material, not task evidence. They may be referenced in validation debug metadata, but validation should primarily consume transcript, tool results, team node outputs, and final output.
|
||||
|
||||
## Evidence Data Flow
|
||||
|
||||
1. `AgentLoop` continues to write session events as it does now.
|
||||
2. After a run completes, an evidence builder reads `session_manager.get_run_event_records(session_id, run_id)` and creates `RunEvidence`.
|
||||
3. `LocalAgentRunner.run()` attaches `RunEvidence` to `NodeRunResult`.
|
||||
4. `NodeRunResult` gains `evidence: RunEvidence | None`.
|
||||
5. `TeamRunResult` carries node evidence through `node_results`; it may also expose a convenience `run_evidence` list.
|
||||
6. `AgentService._run_task_mode()` builds a `TaskEvidencePacket` after team execution and final synthesis.
|
||||
7. Final synthesis receives a rendered evidence context built from the same packet.
|
||||
8. `ValidationService.validate_task_result()` receives the same packet and renders it into the validation prompt without fixed truncation.
|
||||
|
||||
Failed or partial nodes must still preserve evidence. A node with `finish_reason="max_tool_iterations"` can be unsuccessful while still carrying useful tool results.
|
||||
|
||||
## Final Synthesis Behavior
|
||||
|
||||
For team-backed task plans, final synthesis defaults to no tools:
|
||||
|
||||
```python
|
||||
include_tools = False
|
||||
max_tool_iterations = 0
|
||||
```
|
||||
|
||||
The synthesis prompt should instruct the main agent to:
|
||||
|
||||
- use team evidence as the source of truth;
|
||||
- avoid repeating failed or completed tool calls;
|
||||
- answer with available evidence;
|
||||
- clearly state missing or uncertain information.
|
||||
|
||||
The default is no-tools synthesis. The planner may explicitly allow a synthesis tool budget, but it should be kept small, such as `max_tool_iterations=1`.
|
||||
|
||||
## Tool Iteration Finalization
|
||||
|
||||
When a run reaches `max_tool_iterations` and the model still requests tools, the loop should not return `Tool loop stopped...` as the final user-visible answer.
|
||||
|
||||
Instead, the loop performs one no-tools finalization call:
|
||||
|
||||
- use the accumulated messages and tool results;
|
||||
- call the provider with `tools=None`;
|
||||
- add an instruction that the tool budget is exhausted and the model must answer from existing evidence;
|
||||
- mark the finish reason as `max_tool_iterations_finalized` or another explicit value other than `stop`;
|
||||
- return the finalization text as `output_text`.
|
||||
|
||||
If finalization itself fails or returns empty content, only then use a clear fallback message explaining that the run could not produce a usable answer.
|
||||
|
||||
## Limited Parallel Team Execution
|
||||
|
||||
`parallel` team nodes should run concurrently without rewriting the runtime.
|
||||
|
||||
Design:
|
||||
|
||||
- Keep sequence and DAG behavior on the shared loop where appropriate.
|
||||
- For `parallel` graph batches, run nodes through isolated `AgentLoop` instances.
|
||||
- Each isolated loop uses the same workspace and service configuration so session and run records remain queryable from the same stores.
|
||||
- Add `max_parallel_team_nodes`, default `3`.
|
||||
- Use an `asyncio.Semaphore` in the scheduler to bound concurrent nodes.
|
||||
- Return `TeamRunResult.node_results` in graph node order, not completion order.
|
||||
|
||||
The implementation should verify that the shared stores are safe under concurrent access. If the current store is not safe for concurrent async writes, add a narrow lock around the session/task/run store writes used by these parallel runs.
|
||||
|
||||
## Validation Prompt
|
||||
|
||||
The validation prompt should consume the full rendered evidence packet, without `[:2500]`, `[:500]`, or `[:12]` fixed caps.
|
||||
|
||||
Required validator instructions:
|
||||
|
||||
- Return only JSON with the validation fields.
|
||||
- If evidence is incomplete, return `insufficient_evidence`.
|
||||
- Only return `rejected` for clear contradiction or clear task failure.
|
||||
- Do not infer fabrication from missing evidence.
|
||||
- Do not claim a source lacks a fact unless the rendered evidence proves that absence.
|
||||
- Treat user feedback as the final business judgment outside automatic validation.
|
||||
|
||||
The validator should still be strict about answer quality when evidence is sufficient.
|
||||
|
||||
## Validation Debug Metadata
|
||||
|
||||
Each `task_validation_snapshotted` event should record:
|
||||
|
||||
- validation result;
|
||||
- validation status;
|
||||
- attempt index;
|
||||
- evidence run ids;
|
||||
- evidence session ids;
|
||||
- tool result count;
|
||||
- evidence character length;
|
||||
- validator raw response;
|
||||
- rendered validation input or prompt, unless a debug setting disables full prompt storage.
|
||||
|
||||
This makes future investigations direct: inspect the exact input the validator saw before interpreting its decision.
|
||||
|
||||
## Log Snapshot Size
|
||||
|
||||
`llm_request_snapshotted` currently stores complete messages and complete tool schemas in both payload and content. That makes logs large and slows inspection.
|
||||
|
||||
Default behavior should change to store a compact payload:
|
||||
|
||||
- iteration;
|
||||
- provider name and model;
|
||||
- message count;
|
||||
- tool names;
|
||||
- message character length;
|
||||
- tool schema character length;
|
||||
- max tokens, temperature, thinking flag.
|
||||
|
||||
Full request snapshots should be controlled by a debug config flag. This does not reduce validation evidence because evidence comes from transcript and tool result events.
|
||||
|
||||
## Testing Plan
|
||||
|
||||
Add or update focused unit tests:
|
||||
|
||||
1. Validation evidence is not fixed-truncated. A fact after the first 500 characters of a tool result still appears in the validator input.
|
||||
2. Missing evidence returns `insufficient_evidence` and moves the task to `needs_review`, not `failed`.
|
||||
3. A team node that ends with `max_tool_iterations` preserves tool evidence in `NodeRunResult.evidence`.
|
||||
4. Team final synthesis defaults to `tools=None` and receives rendered team evidence.
|
||||
5. Parallel team nodes start concurrently under a bounded semaphore and results remain in graph order.
|
||||
6. Tool loop finalization produces a user-visible answer instead of the placeholder stop message.
|
||||
7. Status transitions cover `accepted -> awaiting_feedback`, `insufficient_evidence -> needs_review`, `validator_error -> needs_review`, and terminal `failed`.
|
||||
8. Validation debug events include evidence metadata and validator raw response.
|
||||
|
||||
## Migration Notes
|
||||
|
||||
To reduce risk, implement in layers:
|
||||
|
||||
1. Add evidence models and builders without changing behavior.
|
||||
2. Attach evidence to team node results.
|
||||
3. Switch final synthesis for team plans to no-tools evidence synthesis.
|
||||
4. Switch validation to evidence packets and new statuses.
|
||||
5. Add no-tools finalization for tool iteration limits.
|
||||
6. Add limited isolated-loop parallel execution.
|
||||
7. Slim `llm_request_snapshotted` behind a debug flag.
|
||||
|
||||
This order keeps each change testable and lets the old transcript-summary path remain as a temporary fallback while evidence packets are introduced.
|
||||
Reference in New Issue
Block a user