feat(task): synthesize and validate from evidence

This commit is contained in:
2026-05-22 11:33:39 +08:00
parent 60605a74e0
commit 0adc04806c
4 changed files with 122 additions and 13 deletions

View File

@ -637,7 +637,7 @@ class AgentLoop:
while True:
chat_kwargs: dict[str, Any] = {
"messages": messages,
"tools": tool_schemas,
"tools": tool_schemas if include_tools else None,
"model": final_model,
"max_tokens": resolved_max_tokens,
"temperature": resolved_temperature,

View File

@ -22,7 +22,16 @@ from beaver.engine import AgentLoop, AgentProfile, AgentRunResult, EngineLoader
from beaver.engine.providers import make_provider_bundle
from beaver.foundation.events import InboundMessage, OutboundMessage
from beaver.foundation.models import CronJob, CronRunRecord
from beaver.tasks import MainAgentRouter, TaskExecutionPlan, TaskRecord, ValidationResult
from beaver.tasks import (
EvidenceBuilder,
MainAgentRouter,
RunEvidence,
TaskEvidencePacket,
TaskExecutionPlan,
TaskRecord,
ValidationResult,
render_task_evidence,
)
NOTIFICATION_SESSION_ID = "notify:default:scheduled"
@ -715,6 +724,7 @@ class AgentService:
)
team_summaries: list[str] = []
team_execution_context = ""
team_result: TeamRunResult | None = None
if plan.is_team:
team_result, team_error = await self._run_team_for_task(
plan,
@ -725,7 +735,18 @@ class AgentService:
)
if team_result is not None:
team_summaries = [self._team_summary_for_validation(team_result)]
team_execution_context = self._team_execution_context(plan, team_result)
team_packet = TaskEvidencePacket(
task_id=task.task_id,
attempt_index=attempt_index,
main_run=None,
team_runs=self._team_run_evidence(team_result),
team_node_results=list(team_result.node_results),
final_output="",
)
team_execution_context = self._join_context(
self._team_execution_context(plan, team_result),
"Rendered team evidence:\n" + render_task_evidence(team_packet),
)
self._append_task_observation(
session_manager,
task.session_id,
@ -782,6 +803,9 @@ class AgentService:
)
elif team_execution_context:
attempt_kwargs["execution_context"] = self._join_context(base_execution_context, team_execution_context)
if plan.is_team and team_execution_context:
attempt_kwargs["include_tools"] = False
attempt_kwargs["max_tool_iterations"] = 0
attempt_kwargs["skill_selection_context"] = self._build_skill_selection_context(
task=task,
user_message=message,
@ -810,10 +834,20 @@ class AgentService:
result.run_id,
skill_names=self._skill_names_for_run(loaded, result.run_id),
)
evidence_packet = self._build_task_evidence_packet(
session_manager=session_manager,
task=task,
attempt_index=attempt_index,
result=result,
team_result=team_result,
)
evidence_text = render_task_evidence(evidence_packet)
validation = await validation_service.validate_task_result(
task=task,
user_message=message,
final_output=result.output_text,
evidence_packet=evidence_packet,
evidence_text=evidence_text,
transcript_excerpt=self._run_excerpt(session_manager, result.session_id, result.run_id),
tool_summaries=self._tool_summaries(session_manager, result.session_id, result.run_id),
team_summaries=team_summaries,
@ -1083,6 +1117,36 @@ class AgentService:
payloads.append(payload)
return payloads
@staticmethod
def _team_run_evidence(result: TeamRunResult | None) -> list[RunEvidence]:
if result is None:
return []
return [node.evidence for node in result.node_results if node.evidence is not None]
def _build_task_evidence_packet(
self,
*,
session_manager: Any,
task: TaskRecord,
attempt_index: int,
result: AgentRunResult,
team_result: TeamRunResult | None,
) -> TaskEvidencePacket:
main_run = EvidenceBuilder(session_manager).build_run_evidence(
result.session_id,
result.run_id,
result.output_text,
result.finish_reason,
)
return TaskEvidencePacket(
task_id=task.task_id,
attempt_index=attempt_index,
main_run=main_run,
team_runs=self._team_run_evidence(team_result),
team_node_results=list(team_result.node_results) if team_result is not None else [],
final_output=result.output_text,
)
@staticmethod
def _team_execution_context(plan: TaskExecutionPlan, result: TeamRunResult) -> str:
node_lines = [

View File

@ -17,6 +17,8 @@ class ValidationService:
task: TaskRecord,
user_message: str,
final_output: str,
evidence_packet: Any | None = None,
evidence_text: str = "",
transcript_excerpt: str = "",
tool_summaries: list[str] | None = None,
team_summaries: list[str] | None = None,
@ -36,6 +38,7 @@ class ValidationService:
task=task,
user_message=user_message,
final_output=final_output,
evidence_text=evidence_text,
transcript_excerpt=transcript_excerpt,
tool_summaries=tool_summaries or [],
team_summaries=team_summaries or [],
@ -62,20 +65,25 @@ class ValidationService:
task: TaskRecord,
user_message: str,
final_output: str,
evidence_text: str,
transcript_excerpt: str,
tool_summaries: list[str],
team_summaries: list[str],
) -> ValidationResult:
legacy_context = "" if evidence_text else (
f"Transcript excerpt:\n{transcript_excerpt}\n\n"
f"Tool summaries:\n{json.dumps(tool_summaries, ensure_ascii=False)}\n\n"
f"Team summaries:\n{json.dumps(team_summaries, ensure_ascii=False)}\n\n"
)
prompt = (
"Validate whether the assistant output satisfies the task. "
"Return only compact JSON with keys: passed, score, issues, "
"missing_requirements, recommended_revision_prompt.\n\n"
f"Task goal:\n{task.goal}\n\n"
f"Current user request:\n{user_message}\n\n"
f"Transcript excerpt:\n{transcript_excerpt[:2500]}\n\n"
f"Tool summaries:\n{json.dumps(tool_summaries[:12], ensure_ascii=False)}\n\n"
f"Team summaries:\n{json.dumps(team_summaries[:12], ensure_ascii=False)}\n\n"
f"Assistant final output:\n{final_output[:4000]}"
f"Evidence packet:\n{evidence_text}\n\n"
f"{legacy_context}"
f"Assistant final output:\n{final_output}"
)
response = await provider.chat(
messages=[

View File

@ -20,7 +20,7 @@ class StubProvider(LLMProvider):
def __init__(self, responses: list[LLMResponse]) -> None:
super().__init__()
self._responses = list(responses)
self.calls: list[list[dict]] = []
self.calls: list[dict[str, object]] = []
async def chat(
self,
@ -30,7 +30,7 @@ class StubProvider(LLMProvider):
max_tokens: int = 4096,
temperature: float = 0.7,
) -> LLMResponse:
self.calls.append(messages)
self.calls.append({"messages": messages, "tools": tools, "model": model})
if not self._responses:
raise AssertionError("No stubbed provider responses left")
return self._responses.pop(0)
@ -42,8 +42,10 @@ class StubProvider(LLMProvider):
class StubValidationService:
def __init__(self, results: list[ValidationResult]) -> None:
self.results = list(results)
self.calls: list[dict] = []
async def validate_task_result(self, **kwargs) -> ValidationResult:
self.calls.append(kwargs)
if not self.results:
raise AssertionError("No stubbed validation results left")
return self.results.pop(0)
@ -706,10 +708,45 @@ def test_task_mode_team_plan_runs_subagent_then_main_synthesis(tmp_path: Path) -
assert result.run_id == task.run_ids[-1]
assert any(event.event_type == "task_execution_planned" for event in events)
assert any(event.event_type == "task_team_run_completed" for event in events)
assert "sub-agent evidence" in main_provider.calls[0][0]["content"]
assert "sub-agent evidence" in main_provider.calls[0]["messages"][0]["content"]
assert "sub-agent evidence" != result.output_text
def test_task_mode_team_synthesis_runs_without_tools_and_receives_evidence(tmp_path: Path) -> None:
main_provider = StubProvider(
[
LLMResponse(content="final synthesized answer", finish_reason="stop", provider_name="stub", model="stub-model")
]
)
sub_provider = StubProvider(
[
LLMResponse(content="sub-agent evidence", finish_reason="stop", provider_name="stub", model="stub-model")
]
)
validation = StubValidationService([ValidationResult(status="accepted", score=0.9, validator="test")])
service = AgentService(
loader=EngineLoader(
workspace=tmp_path,
task_execution_planner=StubTaskExecutionPlanner([_team_plan()]),
validation_service=validation,
)
)
result = asyncio.run(
service.process_direct(
"implement team-backed workflow",
session_id="web:team-no-tools",
provider_bundle=_provider_bundle(main_provider),
team_provider_bundle_factory=lambda node: _provider_bundle(sub_provider),
)
)
assert result.output_text == "final synthesized answer"
assert main_provider.calls[0]["tools"] is None
assert "sub-agent evidence" in main_provider.calls[0]["messages"][0]["content"]
assert "Task evidence packet" in validation.calls[0]["evidence_text"]
def test_task_mode_team_failure_still_uses_main_synthesis(tmp_path: Path) -> None:
main_provider = StubProvider(
[
@ -737,9 +774,9 @@ def test_task_mode_team_failure_still_uses_main_synthesis(tmp_path: Path) -> Non
assert result.output_text == "fallback synthesized answer"
assert any(event.event_type == "task_team_run_failed" for event in events)
assert "sub-agent unavailable" in main_provider.calls[0][0]["content"]
assert "same class of tools fails repeatedly" in main_provider.calls[0][0]["content"]
assert "user-visible fallback answer" in main_provider.calls[0][0]["content"]
assert "sub-agent unavailable" in main_provider.calls[0]["messages"][0]["content"]
assert "same class of tools fails repeatedly" in main_provider.calls[0]["messages"][0]["content"]
assert "user-visible fallback answer" in main_provider.calls[0]["messages"][0]["content"]
def test_task_mode_team_retry_hides_first_synthesis_run(tmp_path: Path) -> None: