18 Commits

Author SHA1 Message Date
030bce8a60 feat(litellm): 添加 reasoning_content 支持并强制禁用思考模式
- 在 LiteLLMProvider 中添加 "reasoning_content" 到允许的消息键集合中
- 修改 _apply_thinking_mode 方法以强制禁用思考模式,不再基于模型名称判断
- 总是设置 enable_thinking 为 False 并添加 thinking.type: disabled 配置
- 更新相关测试用例验证新的思考模式行为

fix(web): 修复非运行状态下的直接处理逻辑

- 创建 _run_web_direct 辅助函数来处理代理服务的直接提交/处理逻辑
- 当代理服务未运行时使用 process_direct 而不是 submit_direct
- 更新 REST 和 WebSocket 接口以使用新的处理逻辑
- 添加相应的单元测试验证非运行状态下使用直接处理

test(config): 添加代理配置重载功能的测试

- 添加 test_reload_agent_config_updates_booted_loop_config 测试函数
- 验证配置文件更新后代理循环能够正确加载新配置
- 测试模型、API 基础地址和 API 密钥的更新

chore(frontend): 默认禁用前端思考模式偏好

- 将前端思考模式存储的默认值从 true 改为 false
- 确保窗口未定义时返回 false 而不是 true
- 更新本地存储缺失时的默认行为为禁用思考模式
2026-05-22 17:43:21 +08:00
c671b66043 feat(frontend): restore session progress sidebar 2026-05-22 14:34:45 +08:00
e061961a79 merge agent team evidence validation work 2026-05-22 11:51:05 +08:00
8068d86760 chore(engine): compact llm request snapshots 2026-05-22 11:40:21 +08:00
4022db8887 feat(team): run parallel nodes with isolated loops 2026-05-22 11:39:17 +08:00
c53e221117 feat(engine): finalize after tool iteration limit 2026-05-22 11:37:02 +08:00
b808f5cbc2 feat(task): route validation status to review states 2026-05-22 11:35:46 +08:00
0adc04806c feat(task): synthesize and validate from evidence 2026-05-22 11:33:39 +08:00
60605a74e0 feat(team): preserve node run evidence 2026-05-22 11:30:19 +08:00
3ff2e2ce11 fix(task): complete evidence rendering contract 2026-05-22 11:28:19 +08:00
0ace09b984 feat(task): add structured run evidence 2026-05-22 11:15:10 +08:00
c3c4df306b fix(task): reject unknown validation status payloads 2026-05-22 11:04:28 +08:00
5446614828 test(task): strengthen validation status semantics 2026-05-22 11:00:53 +08:00
2fd618da9c feat(task): add validation status semantics 2026-05-22 10:55:45 +08:00
28a2627b1f docs: plan task evidence validation implementation 2026-05-22 10:47:03 +08:00
249087e943 docs: clarify validation acceptance compatibility 2026-05-22 10:41:23 +08:00
8bff282892 docs: clarify task validation status semantics 2026-05-22 10:35:30 +08:00
3a3e848a78 docs: design task evidence validation refactor 2026-05-22 10:30:35 +08:00
30 changed files with 3911 additions and 82 deletions

View File

@ -18,8 +18,9 @@ if TYPE_CHECKING:
class TeamGraphScheduler: class TeamGraphScheduler:
"""Execute sequence, parallel, and DAG team graphs.""" """Execute sequence, parallel, and DAG team graphs."""
def __init__(self, runner: LocalAgentRunner) -> None: def __init__(self, runner: LocalAgentRunner, *, max_parallel_team_nodes: int = 3) -> None:
self.runner = runner self.runner = runner
self.max_parallel_team_nodes = max(1, int(max_parallel_team_nodes))
async def run( async def run(
self, self,
@ -96,7 +97,18 @@ class TeamGraphScheduler:
nodes: list[ExecutionNode], nodes: list[ExecutionNode],
**kwargs, **kwargs,
) -> list[NodeRunResult]: ) -> list[NodeRunResult]:
return list(await asyncio.gather(*(self._run_node(node, dependency_outputs={}, **kwargs) for node in nodes))) semaphore = asyncio.Semaphore(self.max_parallel_team_nodes)
async def run_one(node: ExecutionNode) -> NodeRunResult:
async with semaphore:
return await self._run_node(
node,
dependency_outputs={},
execution_mode="isolated_loop",
**kwargs,
)
return list(await asyncio.gather(*(run_one(node) for node in nodes)))
async def _run_dag( async def _run_dag(
self, self,
@ -164,6 +176,7 @@ class TeamGraphScheduler:
inherited_pinned_skill_contexts: list["SkillContext"], inherited_pinned_skill_contexts: list["SkillContext"],
allow_candidate_generation: bool, allow_candidate_generation: bool,
dependency_outputs: dict[str, str], dependency_outputs: dict[str, str],
execution_mode: str = "shared_loop",
) -> NodeRunResult: ) -> NodeRunResult:
try: try:
pinned = self._merge_pinned(inherited_pinned_skills, node.inherited_pinned_skills) pinned = self._merge_pinned(inherited_pinned_skills, node.inherited_pinned_skills)
@ -189,6 +202,7 @@ class TeamGraphScheduler:
envelope, envelope,
provider_bundle=node_provider_bundle, provider_bundle=node_provider_bundle,
allow_candidate_generation=allow_candidate_generation, allow_candidate_generation=allow_candidate_generation,
execution_mode=execution_mode,
) )
except asyncio.CancelledError: except asyncio.CancelledError:
raise raise
@ -241,7 +255,7 @@ class TeamGraphScheduler:
failed = [item for item in results if not item.success] failed = [item for item in results if not item.success]
if failed: if failed:
failure_lines = [ failure_lines = [
f"- {item.node_id}: {item.error or item.finish_reason}" f"- {item.node_id}: {item.error or item.finish_reason} evidence={'yes' if item.evidence else 'no'}"
for item in failed for item in failed
] ]
summary_parts.append("Failed nodes:\n" + "\n".join(failure_lines)) summary_parts.append("Failed nodes:\n" + "\n".join(failure_lines))

View File

@ -6,6 +6,7 @@ from uuid import uuid4
from beaver.engine import AgentLoop from beaver.engine import AgentLoop
from beaver.engine.providers import ProviderBundle from beaver.engine.providers import ProviderBundle
from beaver.tasks.evidence import EvidenceBuilder
from .models import DelegationEnvelope, NodeRunResult from .models import DelegationEnvelope, NodeRunResult
@ -22,6 +23,7 @@ class LocalAgentRunner:
*, *,
provider_bundle: ProviderBundle | None = None, provider_bundle: ProviderBundle | None = None,
allow_candidate_generation: bool = False, allow_candidate_generation: bool = False,
execution_mode: str = "shared_loop",
) -> NodeRunResult: ) -> NodeRunResult:
if provider_bundle is not None and (envelope.agent.model or envelope.agent.provider_name): if provider_bundle is not None and (envelope.agent.model or envelope.agent.provider_name):
raise ValueError( raise ValueError(
@ -29,7 +31,14 @@ class LocalAgentRunner:
"build a node-specific provider bundle instead." "build a node-specific provider bundle instead."
) )
child_session_id = self._child_session_id(envelope) child_session_id = self._child_session_id(envelope)
runner = self.loop.submit_direct if self.loop.is_running else self.loop.process_direct target_loop = self.loop
if execution_mode == "isolated_loop":
target_loop = AgentLoop(profile=self.loop.profile, loader=self.loop.loader)
runner = (
target_loop.process_direct
if execution_mode == "isolated_loop"
else (self.loop.submit_direct if self.loop.is_running else self.loop.process_direct)
)
result = await runner( result = await runner(
envelope.task, envelope.task,
session_id=child_session_id, session_id=child_session_id,
@ -47,6 +56,13 @@ class LocalAgentRunner:
pinned_skill_contexts=envelope.inherited_pinned_skill_contexts, pinned_skill_contexts=envelope.inherited_pinned_skill_contexts,
allow_candidate_generation=allow_candidate_generation, allow_candidate_generation=allow_candidate_generation,
) )
loaded = target_loop.boot()
evidence = EvidenceBuilder(loaded.session_manager).build_run_evidence(
result.session_id,
result.run_id,
result.output_text,
result.finish_reason,
)
success = result.finish_reason == "stop" success = result.finish_reason == "stop"
return NodeRunResult( return NodeRunResult(
node_id=envelope.node_id or envelope.agent.name, node_id=envelope.node_id or envelope.agent.name,
@ -56,6 +72,7 @@ class LocalAgentRunner:
session_id=result.session_id, session_id=result.session_id,
finish_reason=result.finish_reason, finish_reason=result.finish_reason,
error=None if success else (result.output_text or result.finish_reason), error=None if success else (result.output_text or result.finish_reason),
evidence=evidence,
) )
@staticmethod @staticmethod

View File

@ -7,6 +7,7 @@ from typing import TYPE_CHECKING, Any, Literal
if TYPE_CHECKING: if TYPE_CHECKING:
from beaver.engine.context import SkillContext from beaver.engine.context import SkillContext
from beaver.tasks.evidence import RunEvidence
TeamStrategy = Literal[ TeamStrategy = Literal[
@ -116,6 +117,7 @@ class NodeRunResult:
session_id: str | None = None session_id: str | None = None
finish_reason: str = "stop" finish_reason: str = "stop"
error: str | None = None error: str | None = None
evidence: "RunEvidence | None" = None
def to_dict(self) -> dict[str, Any]: def to_dict(self) -> dict[str, Any]:
return { return {
@ -126,6 +128,7 @@ class NodeRunResult:
"session_id": self.session_id, "session_id": self.session_id,
"finish_reason": self.finish_reason, "finish_reason": self.finish_reason,
"error": self.error, "error": self.error,
"evidence": self.evidence.to_dict() if self.evidence is not None else None,
} }

View File

@ -637,36 +637,39 @@ class AgentLoop:
while True: while True:
chat_kwargs: dict[str, Any] = { chat_kwargs: dict[str, Any] = {
"messages": messages, "messages": messages,
"tools": tool_schemas, "tools": tool_schemas if include_tools else None,
"model": final_model, "model": final_model,
"max_tokens": resolved_max_tokens, "max_tokens": resolved_max_tokens,
"temperature": resolved_temperature, "temperature": resolved_temperature,
} }
if thinking_enabled is not None: if thinking_enabled is not None:
chat_kwargs["thinking_enabled"] = thinking_enabled chat_kwargs["thinking_enabled"] = thinking_enabled
message_char_length = len(json.dumps(messages, ensure_ascii=False, default=str))
tool_schema_char_length = len(json.dumps(tool_schemas, ensure_ascii=False, default=str))
tool_names = [
str(tool.get("function", {}).get("name") or tool.get("name") or "tool")
for tool in (tool_schemas or [])
if isinstance(tool, dict)
]
snapshot_payload = {
"iteration": iterations,
"provider_name": final_provider_name,
"model": final_model,
"message_count": len(messages),
"tool_names": tool_names,
"message_char_length": message_char_length,
"tool_schema_char_length": tool_schema_char_length,
"max_tokens": resolved_max_tokens,
"temperature": resolved_temperature,
"thinking_enabled": thinking_enabled,
}
session_manager.append_message( session_manager.append_message(
resolved_session_id, resolved_session_id,
run_id=resolved_run_id, run_id=resolved_run_id,
role="system", role="system",
event_type="llm_request_snapshotted", event_type="llm_request_snapshotted",
event_payload={ event_payload=snapshot_payload,
"iteration": iterations, content=json.dumps(snapshot_payload, ensure_ascii=False, default=str),
"provider_name": final_provider_name,
"model": final_model,
"messages": messages,
"tools": tool_schemas,
"max_tokens": resolved_max_tokens,
"temperature": resolved_temperature,
"thinking_enabled": thinking_enabled,
},
content=json.dumps(
{
"messages": messages,
"tools": tool_schemas,
},
ensure_ascii=False,
default=str,
),
context_visible=False, context_visible=False,
source=source, source=source,
title=title, title=title,
@ -708,8 +711,19 @@ class AgentLoop:
break break
if iterations >= resolved_max_tool_iterations: if iterations >= resolved_max_tool_iterations:
final_text = response.content or "Tool loop stopped after reaching the configured iteration limit." finalized = await self._finalize_after_tool_limit(
final_finish_reason = "max_tool_iterations" provider=provider,
messages=messages,
model=final_model,
max_tokens=resolved_max_tokens,
temperature=resolved_temperature,
thinking_enabled=thinking_enabled,
)
final_text = finalized or (
"Tool loop stopped after reaching the configured iteration limit, "
"and no final answer was produced."
)
final_finish_reason = "max_tool_iterations_finalized" if finalized else "max_tool_iterations"
session_manager.append_message( session_manager.append_message(
resolved_session_id, resolved_session_id,
run_id=resolved_run_id, run_id=resolved_run_id,
@ -853,6 +867,39 @@ class AgentLoop:
raise RuntimeError(f"Engine loader did not provide required dependency {field_name!r}") raise RuntimeError(f"Engine loader did not provide required dependency {field_name!r}")
return value return value
@staticmethod
async def _finalize_after_tool_limit(
*,
provider: Any,
messages: list[dict[str, Any]],
model: str,
max_tokens: int,
temperature: float,
thinking_enabled: bool | None,
) -> str:
final_messages = [
*messages,
{
"role": "system",
"content": (
"The configured tool iteration budget is exhausted. Do not call tools. "
"Produce the best final answer from the existing conversation and tool results. "
"State uncertainty explicitly."
),
},
]
kwargs: dict[str, Any] = {
"messages": final_messages,
"tools": None,
"model": model,
"max_tokens": max_tokens,
"temperature": temperature,
}
if thinking_enabled is not None:
kwargs["thinking_enabled"] = thinking_enabled
response = await provider.chat(**kwargs)
return (response.content or "").strip()
@staticmethod @staticmethod
def _load_pinned_skill_contexts(skills_loader: Any, skill_names: list[str]) -> list[SkillContext]: def _load_pinned_skill_contexts(skills_loader: Any, skill_names: list[str]) -> list[SkillContext]:
contexts: list[SkillContext] = [] contexts: list[SkillContext] = []

View File

@ -23,7 +23,7 @@ except ModuleNotFoundError: # pragma: no cover
litellm = None # type: ignore[assignment] litellm = None # type: ignore[assignment]
acompletion = None # type: ignore[assignment] acompletion = None # type: ignore[assignment]
_ALLOWED_MSG_KEYS = frozenset({"role", "content", "tool_calls", "tool_call_id", "name"}) _ALLOWED_MSG_KEYS = frozenset({"role", "content", "tool_calls", "tool_call_id", "name", "reasoning_content"})
class LiteLLMProvider(LLMProvider): class LiteLLMProvider(LLMProvider):
@ -175,15 +175,11 @@ class LiteLLMProvider(LLMProvider):
kwargs["provider"] = provider_payload kwargs["provider"] = provider_payload
def _apply_thinking_mode(self, original_model: str, resolved_model: str, kwargs: dict[str, Any], enabled: bool | None) -> None: def _apply_thinking_mode(self, original_model: str, resolved_model: str, kwargs: dict[str, Any], enabled: bool | None) -> None:
if enabled is None:
return
model_key = f"{original_model} {resolved_model}".lower()
if "qwen" not in model_key:
return
extra_body = dict(kwargs.get("extra_body") or {}) extra_body = dict(kwargs.get("extra_body") or {})
chat_template_kwargs = dict(extra_body.get("chat_template_kwargs") or {}) chat_template_kwargs = dict(extra_body.get("chat_template_kwargs") or {})
chat_template_kwargs["enable_thinking"] = bool(enabled) chat_template_kwargs["enable_thinking"] = False
extra_body["chat_template_kwargs"] = chat_template_kwargs extra_body["chat_template_kwargs"] = chat_template_kwargs
extra_body["thinking"] = {"type": "disabled"}
kwargs["extra_body"] = extra_body kwargs["extra_body"] = extra_body
async def chat( async def chat(

View File

@ -1745,7 +1745,7 @@ def create_app(
} }
if payload.thinking_enabled is not None: if payload.thinking_enabled is not None:
direct_kwargs["thinking_enabled"] = payload.thinking_enabled direct_kwargs["thinking_enabled"] = payload.thinking_enabled
result = await agent_service.submit_direct(message, **direct_kwargs) result = await _run_web_direct(agent_service, message, **direct_kwargs)
except ValueError as exc: except ValueError as exc:
raise HTTPException(status_code=400, detail=str(exc)) from exc raise HTTPException(status_code=400, detail=str(exc)) from exc
except RuntimeError as exc: except RuntimeError as exc:
@ -1855,7 +1855,7 @@ def create_app(
websocket_thinking_enabled = _bool_or_none(payload.get("thinking_enabled")) websocket_thinking_enabled = _bool_or_none(payload.get("thinking_enabled"))
if websocket_thinking_enabled is not None: if websocket_thinking_enabled is not None:
direct_kwargs["thinking_enabled"] = websocket_thinking_enabled direct_kwargs["thinking_enabled"] = websocket_thinking_enabled
result = await agent_service.submit_direct(content, **direct_kwargs) result = await _run_web_direct(agent_service, content, **direct_kwargs)
except Exception as exc: except Exception as exc:
await websocket.send_json( await websocket.send_json(
{ {
@ -1940,6 +1940,12 @@ def _session_detail(session_manager: Any, session_id: str, session: dict[str, An
} }
async def _run_web_direct(agent_service: AgentService, message: str, **kwargs: Any) -> Any:
    """Send ``message`` to the agent service from a web endpoint.

    Uses ``submit_direct`` while the service reports ``is_running`` and
    ``process_direct`` otherwise; ``kwargs`` are forwarded unchanged.
    """
    handler = (
        agent_service.submit_direct
        if agent_service.is_running
        else agent_service.process_direct
    )
    return await handler(message, **kwargs)
def _create_skill_upload_draft(loaded: Any, filename: str, content: bytes) -> dict[str, Any]: def _create_skill_upload_draft(loaded: Any, filename: str, content: bytes) -> dict[str, Any]:
try: try:
archive = zipfile.ZipFile(io.BytesIO(content), "r") archive = zipfile.ZipFile(io.BytesIO(content), "r")

View File

@ -22,7 +22,16 @@ from beaver.engine import AgentLoop, AgentProfile, AgentRunResult, EngineLoader
from beaver.engine.providers import make_provider_bundle from beaver.engine.providers import make_provider_bundle
from beaver.foundation.events import InboundMessage, OutboundMessage from beaver.foundation.events import InboundMessage, OutboundMessage
from beaver.foundation.models import CronJob, CronRunRecord from beaver.foundation.models import CronJob, CronRunRecord
from beaver.tasks import MainAgentRouter, TaskExecutionPlan, TaskRecord, ValidationResult from beaver.tasks import (
EvidenceBuilder,
MainAgentRouter,
RunEvidence,
TaskEvidencePacket,
TaskExecutionPlan,
TaskRecord,
ValidationResult,
render_task_evidence,
)
NOTIFICATION_SESSION_ID = "notify:default:scheduled" NOTIFICATION_SESSION_ID = "notify:default:scheduled"
@ -715,6 +724,7 @@ class AgentService:
) )
team_summaries: list[str] = [] team_summaries: list[str] = []
team_execution_context = "" team_execution_context = ""
team_result: TeamRunResult | None = None
if plan.is_team: if plan.is_team:
team_result, team_error = await self._run_team_for_task( team_result, team_error = await self._run_team_for_task(
plan, plan,
@ -725,7 +735,18 @@ class AgentService:
) )
if team_result is not None: if team_result is not None:
team_summaries = [self._team_summary_for_validation(team_result)] team_summaries = [self._team_summary_for_validation(team_result)]
team_execution_context = self._team_execution_context(plan, team_result) team_packet = TaskEvidencePacket(
task_id=task.task_id,
attempt_index=attempt_index,
main_run=None,
team_runs=self._team_run_evidence(team_result),
team_node_results=list(team_result.node_results),
final_output="",
)
team_execution_context = self._join_context(
self._team_execution_context(plan, team_result),
"Rendered team evidence:\n" + render_task_evidence(team_packet),
)
self._append_task_observation( self._append_task_observation(
session_manager, session_manager,
task.session_id, task.session_id,
@ -782,6 +803,9 @@ class AgentService:
) )
elif team_execution_context: elif team_execution_context:
attempt_kwargs["execution_context"] = self._join_context(base_execution_context, team_execution_context) attempt_kwargs["execution_context"] = self._join_context(base_execution_context, team_execution_context)
if plan.is_team and team_execution_context:
attempt_kwargs["include_tools"] = False
attempt_kwargs["max_tool_iterations"] = 0
attempt_kwargs["skill_selection_context"] = self._build_skill_selection_context( attempt_kwargs["skill_selection_context"] = self._build_skill_selection_context(
task=task, task=task,
user_message=message, user_message=message,
@ -810,17 +834,39 @@ class AgentService:
result.run_id, result.run_id,
skill_names=self._skill_names_for_run(loaded, result.run_id), skill_names=self._skill_names_for_run(loaded, result.run_id),
) )
evidence_packet = self._build_task_evidence_packet(
session_manager=session_manager,
task=task,
attempt_index=attempt_index,
result=result,
team_result=team_result,
)
evidence_text = render_task_evidence(evidence_packet)
validation = await validation_service.validate_task_result( validation = await validation_service.validate_task_result(
task=task, task=task,
user_message=message, user_message=message,
final_output=result.output_text, final_output=result.output_text,
evidence_packet=evidence_packet,
evidence_text=evidence_text,
transcript_excerpt=self._run_excerpt(session_manager, result.session_id, result.run_id), transcript_excerpt=self._run_excerpt(session_manager, result.session_id, result.run_id),
tool_summaries=self._tool_summaries(session_manager, result.session_id, result.run_id), tool_summaries=self._tool_summaries(session_manager, result.session_id, result.run_id),
team_summaries=team_summaries, team_summaries=team_summaries,
provider_bundle=provider_bundle, provider_bundle=provider_bundle,
) )
latest_validation = validation latest_validation = validation
task = task_service.record_validation(task.task_id, result.run_id, validation) has_usable_answer = bool(result.output_text.strip()) and (
"Tool loop stopped after reaching the configured iteration limit." not in result.output_text
)
task = task_service.record_validation(
task.task_id,
result.run_id,
validation,
final_attempt=(
attempt_index == 2
or validation.status in {"accepted", "insufficient_evidence", "validator_error"}
),
has_usable_answer=has_usable_answer,
)
run_memory_store.update_run_record(result.run_id, validation_result=validation.to_dict()) run_memory_store.update_run_record(result.run_id, validation_result=validation.to_dict())
session_manager.update_latest_assistant_event_payload( session_manager.update_latest_assistant_event_payload(
result.session_id, result.session_id,
@ -831,6 +877,23 @@ class AgentService:
"validation_status": "passed" if validation.accepted else "failed", "validation_status": "passed" if validation.accepted else "failed",
}, },
) )
validation_debug = {
"evidence_run_ids": [
item.run_id for item in [evidence_packet.main_run, *evidence_packet.team_runs] if item is not None
],
"evidence_session_ids": [
item.session_id
for item in [evidence_packet.main_run, *evidence_packet.team_runs]
if item is not None
],
"tool_result_count": sum(
len(item.tool_results)
for item in [evidence_packet.main_run, *evidence_packet.team_runs]
if item is not None
),
"evidence_length": len(evidence_text),
}
retry_scheduled = validation.status == "rejected" and attempt_index == 1
session_manager.append_message( session_manager.append_message(
result.session_id, result.session_id,
run_id=result.run_id, run_id=result.run_id,
@ -840,17 +903,18 @@ class AgentService:
"task_id": task.task_id, "task_id": task.task_id,
"attempt_index": attempt_index, "attempt_index": attempt_index,
"validation_result": validation.to_dict(), "validation_result": validation.to_dict(),
"retry_scheduled": not validation.accepted and attempt_index == 1, "validation_debug": validation_debug,
"retry_scheduled": retry_scheduled,
}, },
content=validation.recommended_revision_prompt or None, content=validation.recommended_revision_prompt or None,
context_visible=False, context_visible=False,
) )
if not validation.accepted and attempt_index == 1: if retry_scheduled:
session_manager.set_run_context_visible(result.session_id, result.run_id, False) session_manager.set_run_context_visible(result.session_id, result.run_id, False)
result.task_id = task.task_id result.task_id = task.task_id
result.task_status = task.status result.task_status = task.status
result.validation_result = validation.to_dict() result.validation_result = validation.to_dict()
if validation.accepted or attempt_index == 2: if not retry_scheduled:
return result return result
if last_result is None: # pragma: no cover - defensive if last_result is None: # pragma: no cover - defensive
@ -1083,6 +1147,36 @@ class AgentService:
payloads.append(payload) payloads.append(payload)
return payloads return payloads
@staticmethod
def _team_run_evidence(result: TeamRunResult | None) -> list[RunEvidence]:
if result is None:
return []
return [node.evidence for node in result.node_results if node.evidence is not None]
def _build_task_evidence_packet(
    self,
    *,
    session_manager: Any,
    task: TaskRecord,
    attempt_index: int,
    result: AgentRunResult,
    team_result: TeamRunResult | None,
) -> TaskEvidencePacket:
    """Assemble the evidence packet for one task attempt.

    The main run's evidence is built from ``session_manager`` for the
    session/run of ``result``. Team evidence and node results are taken from
    ``team_result`` and are empty lists when it is ``None``.
    """
    builder = EvidenceBuilder(session_manager)
    main_evidence = builder.build_run_evidence(
        result.session_id,
        result.run_id,
        result.output_text,
        result.finish_reason,
    )
    team_evidence = self._team_run_evidence(team_result)
    node_results = [] if team_result is None else list(team_result.node_results)
    return TaskEvidencePacket(
        task_id=task.task_id,
        attempt_index=attempt_index,
        main_run=main_evidence,
        team_runs=team_evidence,
        team_node_results=node_results,
        final_output=result.output_text,
    )
@staticmethod @staticmethod
def _team_execution_context(plan: TaskExecutionPlan, result: TeamRunResult) -> str: def _team_execution_context(plan: TaskExecutionPlan, result: TeamRunResult) -> str:
node_lines = [ node_lines = [

View File

@ -16,10 +16,10 @@ if TYPE_CHECKING:
class TeamService: class TeamService:
"""Internal service for Beaver-native multi-agent execution.""" """Internal service for Beaver-native multi-agent execution."""
def __init__(self, loop: AgentLoop) -> None: def __init__(self, loop: AgentLoop, *, max_parallel_team_nodes: int = 3) -> None:
self.loop = loop self.loop = loop
self.runner = LocalAgentRunner(loop) self.runner = LocalAgentRunner(loop)
self.scheduler = TeamGraphScheduler(self.runner) self.scheduler = TeamGraphScheduler(self.runner, max_parallel_team_nodes=max_parallel_team_nodes)
async def run_team( async def run_team(
self, self,

View File

@ -1,6 +1,7 @@
"""Internal task tracking for automatic Main Agent task mode.""" """Internal task tracking for automatic Main Agent task mode."""
from .models import MainAgentDecision, TaskEvent, TaskRecord, ValidationResult from .evidence import EvidenceBuilder, RunEvidence, TaskEvidencePacket, ToolEvidence, render_task_evidence
from .models import MainAgentDecision, TaskEvent, TaskRecord, ValidationResult, ValidationStatus
from .planner import TaskExecutionPlan, TaskExecutionPlanner from .planner import TaskExecutionPlan, TaskExecutionPlanner
from .router import MainAgentRouter from .router import MainAgentRouter
from .service import TaskService from .service import TaskService
@ -8,15 +9,21 @@ from .skill_resolver import SkillResolutionReport, TaskSkillResolver
from .validation import ValidationService from .validation import ValidationService
__all__ = [ __all__ = [
"EvidenceBuilder",
"MainAgentDecision", "MainAgentDecision",
"MainAgentRouter", "MainAgentRouter",
"RunEvidence",
"TaskEvent", "TaskEvent",
"TaskEvidencePacket",
"TaskExecutionPlan", "TaskExecutionPlan",
"TaskExecutionPlanner", "TaskExecutionPlanner",
"TaskRecord", "TaskRecord",
"TaskService", "TaskService",
"SkillResolutionReport", "SkillResolutionReport",
"TaskSkillResolver", "TaskSkillResolver",
"ToolEvidence",
"ValidationResult", "ValidationResult",
"ValidationStatus",
"ValidationService", "ValidationService",
"render_task_evidence",
] ]

View File

@ -0,0 +1,183 @@
"""Structured evidence for task synthesis and validation."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
@dataclass(slots=True)
class ToolEvidence:
tool_name: str
tool_call_id: str | None
content: str
event_payload: dict[str, Any] = field(default_factory=dict)
url: str | None = None
title: str | None = None
created_at: str | None = None
def to_dict(self) -> dict[str, Any]:
return {
"tool_name": self.tool_name,
"tool_call_id": self.tool_call_id,
"content": self.content,
"event_payload": dict(self.event_payload),
"url": self.url,
"title": self.title,
"created_at": self.created_at,
}
@dataclass(slots=True)
class RunEvidence:
run_id: str
session_id: str
output_text: str
finish_reason: str
transcript: list[dict[str, Any]] = field(default_factory=list)
tool_results: list[ToolEvidence] = field(default_factory=list)
warnings: list[str] = field(default_factory=list)
def to_dict(self) -> dict[str, Any]:
return {
"run_id": self.run_id,
"session_id": self.session_id,
"output_text": self.output_text,
"finish_reason": self.finish_reason,
"transcript": list(self.transcript),
"tool_results": [item.to_dict() for item in self.tool_results],
"warnings": list(self.warnings),
}
@dataclass(slots=True)
class TaskEvidencePacket:
task_id: str
attempt_index: int
main_run: RunEvidence | None
team_runs: list[RunEvidence] = field(default_factory=list)
team_node_results: list[Any] = field(default_factory=list)
final_output: str = ""
def to_dict(self) -> dict[str, Any]:
return {
"task_id": self.task_id,
"attempt_index": self.attempt_index,
"main_run": self.main_run.to_dict() if self.main_run else None,
"team_runs": [item.to_dict() for item in self.team_runs],
"team_node_results": [
item.to_dict() if hasattr(item, "to_dict") else dict(item)
for item in self.team_node_results
],
"final_output": self.final_output,
}
class EvidenceBuilder:
    """Builds ``RunEvidence`` from the event records a session manager holds."""

    def __init__(self, session_manager: Any) -> None:
        self.session_manager = session_manager

    def build_run_evidence(
        self,
        session_id: str,
        run_id: str,
        output_text: str,
        finish_reason: str,
    ) -> RunEvidence:
        """Collect transcript and tool results for one run.

        Every event record returned by
        ``session_manager.get_run_event_records(session_id, run_id)`` becomes
        a transcript entry; records whose ``event_type`` is
        ``"tool_result_recorded"`` additionally become ``ToolEvidence``. A
        non-empty ``finish_reason`` other than ``"stop"`` is noted as a
        warning.
        """
        transcript: list[dict[str, Any]] = []
        tool_results: list[ToolEvidence] = []
        copied_fields = (
            "role",
            "event_type",
            "content",
            "tool_name",
            "tool_call_id",
            "finish_reason",
        )
        for record in self.session_manager.get_run_event_records(session_id, run_id):
            # One payload copy per record, shared by the transcript entry and
            # the tool evidence built from that record.
            payload = dict(record.event_payload or {})
            entry: dict[str, Any] = {name: getattr(record, name) for name in copied_fields}
            entry["event_payload"] = payload
            transcript.append(entry)
            if record.event_type != "tool_result_recorded":
                continue
            tool_results.append(
                ToolEvidence(
                    tool_name=record.tool_name or "tool",
                    tool_call_id=record.tool_call_id,
                    content=record.content or "",
                    event_payload=payload,
                    url=_optional_str(payload.get("url")),
                    title=_optional_str(payload.get("title")),
                    created_at=_optional_str(payload.get("created_at")),
                )
            )

        warnings: list[str] = []
        if finish_reason and finish_reason != "stop":
            warnings.append(f"finish_reason={finish_reason}")

        return RunEvidence(
            run_id=run_id,
            session_id=session_id,
            output_text=output_text,
            finish_reason=finish_reason,
            transcript=transcript,
            tool_results=tool_results,
            warnings=warnings,
        )
def render_task_evidence(packet: TaskEvidencePacket) -> str:
    """Render an evidence packet as plain text sections separated by blank lines.

    Sections, in order: a header with the task id and attempt index, the
    final output, the main run evidence (when present), the team run
    evidence (when any), and one summary line per team node result.

    Node results may be objects or plain dicts. ``TaskEvidencePacket.to_dict``
    accepts both, so the renderer reads fields from either form instead of
    printing blank values for dict entries.

    Returns:
        The rendered text. The "Final output" section is emitted even when
        ``packet.final_output`` is empty, because its label keeps the section
        from being blank.
    """
    sections = [
        f"Task evidence packet: task_id={packet.task_id} attempt={packet.attempt_index}",
        f"Final output:\n{packet.final_output}",
    ]
    if packet.main_run is not None:
        sections.append("Main run evidence:\n" + render_run_evidence(packet.main_run))
    if packet.team_runs:
        sections.append(
            "Team run evidence:\n"
            + "\n\n".join(render_run_evidence(item) for item in packet.team_runs)
        )
    if packet.team_node_results:
        lines = [
            f"- {_node_result_field(item, 'node_id', '')}: "
            f"success={_node_result_field(item, 'success', False)} "
            f"finish_reason={_node_result_field(item, 'finish_reason', '')} "
            f"error={_node_result_field(item, 'error', '') or ''}"
            for item in packet.team_node_results
        ]
        sections.append("Team node results:\n" + "\n".join(lines))
    return "\n\n".join(section for section in sections if section.strip())


def _node_result_field(item: Any, name: str, default: Any) -> Any:
    """Read ``name`` from a node result given as an object or as a plain dict."""
    if isinstance(item, dict):
        return item.get(name, default)
    return getattr(item, name, default)
def render_run_evidence(evidence: RunEvidence) -> str:
    """Render one run's evidence as newline-separated text.

    The run id, session id and finish reason always appear; the output,
    warnings and tool results are added only when non-empty.
    """
    rendered = [
        f"run_id={evidence.run_id}",
        f"session_id={evidence.session_id}",
        f"finish_reason={evidence.finish_reason}",
    ]
    if evidence.output_text:
        rendered.append(f"output:\n{evidence.output_text}")
    if evidence.warnings:
        bullet_list = "\n".join(f"- {warning}" for warning in evidence.warnings)
        rendered.append("warnings:\n" + bullet_list)
    if evidence.tool_results:
        tool_text = "\n\n".join(
            _render_tool_evidence(tool) for tool in evidence.tool_results
        )
        rendered.append("tool_results:\n" + tool_text)
    return "\n".join(rendered)
def _render_tool_evidence(item: ToolEvidence) -> str:
    """Render one tool result: header line, optional metadata lines, then content."""
    parts = [f"- tool={item.tool_name} call_id={item.tool_call_id or ''}"]
    # Metadata lines appear only for fields that are set and non-empty.
    for label in ("url", "title", "created_at"):
        value = getattr(item, label)
        if value:
            parts.append(f"{label}={value}")
    parts.append(item.content)
    return "\n".join(parts)
def _optional_str(value: Any) -> str | None:
    """Return ``str(value)``, passing ``None`` through unchanged."""
    if value is None:
        return None
    return str(value)

View File

@ -3,31 +3,63 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Any from typing import Any, Literal
TASK_OPEN_STATUSES = {"open", "running", "validating", "awaiting_feedback", "needs_revision"} ValidationStatus = Literal["accepted", "rejected", "insufficient_evidence", "validator_error"]
VALIDATION_STATUSES = {"accepted", "rejected", "insufficient_evidence", "validator_error"}
TASK_OPEN_STATUSES = {"open", "running", "validating", "awaiting_feedback", "needs_review", "needs_revision"}
@dataclass(slots=True) @dataclass(slots=True)
class ValidationResult: class ValidationResult:
passed: bool status: ValidationStatus = "rejected"
score: float score: float = 0.0
issues: list[str] = field(default_factory=list) issues: list[str] = field(default_factory=list)
missing_requirements: list[str] = field(default_factory=list) missing_requirements: list[str] = field(default_factory=list)
evidence_gaps: list[str] = field(default_factory=list)
recommended_revision_prompt: str = "" recommended_revision_prompt: str = ""
validator: str = "heuristic" validator: str = "heuristic"
def __init__(
    self,
    *,
    status: ValidationStatus | None = None,
    passed: bool | None = None,
    score: float = 0.0,
    issues: list[str] | None = None,
    missing_requirements: list[str] | None = None,
    evidence_gaps: list[str] | None = None,
    recommended_revision_prompt: str = "",
    validator: str = "heuristic",
) -> None:
    """Build a validation result from an explicit status or a legacy ``passed`` flag.

    Args:
        status: Explicit outcome; must be a member of ``VALIDATION_STATUSES``
            when given.
        passed: Legacy flag, consulted only when ``status`` is ``None``. The
            result is "accepted" when ``passed`` is truthy and the normalised
            score is at least 0.75, otherwise "rejected".
        score: Confidence score; coerced to ``float`` and clamped to
            ``[0.0, 1.0]``. A falsy or NaN score becomes ``0.0``.
        issues: Problems found; copied into a new list.
        missing_requirements: Unmet requirements; copied into a new list.
        evidence_gaps: Missing evidence items; copied into a new list.
        recommended_revision_prompt: Suggested prompt for a revision attempt.
        validator: Name of the validator that produced this result.

    Raises:
        ValueError: If ``status`` is given but is not a known status.
    """
    if status is not None and status not in VALIDATION_STATUSES:
        raise ValueError(f"unknown validation status: {status}")

    # Normalise the score once, up front, so the legacy threshold below and
    # the stored value can never disagree (and a ``None`` or numeric-string
    # score no longer fails the ``>=`` comparison).
    normalized_score = float(score or 0.0)
    if normalized_score != normalized_score:
        # NaN is the only float that is not equal to itself. Comparisons with
        # NaN are always false, so ``min(1.0, nan)`` would keep 1.0 and a NaN
        # score would be stored as a perfect score; treat it as 0.0 instead.
        normalized_score = 0.0
    normalized_score = max(0.0, min(1.0, normalized_score))

    if status is None:
        status = "accepted" if passed and normalized_score >= 0.75 else "rejected"

    self.status = status
    self.score = normalized_score
    self.issues = list(issues or [])
    self.missing_requirements = list(missing_requirements or [])
    self.evidence_gaps = list(evidence_gaps or [])
    self.recommended_revision_prompt = recommended_revision_prompt
    self.validator = validator
@property
def passed(self) -> bool:
    """Whether the validation status is ``"accepted"``."""
    current_status = self.status
    return current_status == "accepted"
@property @property
def accepted(self) -> bool: def accepted(self) -> bool:
return self.passed and self.score >= 0.75 return self.status == "accepted"
def to_dict(self) -> dict[str, Any]: def to_dict(self) -> dict[str, Any]:
return { return {
"status": self.status,
"passed": self.passed, "passed": self.passed,
"score": self.score, "score": self.score,
"issues": list(self.issues), "issues": list(self.issues),
"missing_requirements": list(self.missing_requirements), "missing_requirements": list(self.missing_requirements),
"evidence_gaps": list(self.evidence_gaps),
"recommended_revision_prompt": self.recommended_revision_prompt, "recommended_revision_prompt": self.recommended_revision_prompt,
"validator": self.validator, "validator": self.validator,
"accepted": self.accepted, "accepted": self.accepted,
@ -37,11 +69,17 @@ class ValidationResult:
def from_dict(cls, payload: dict[str, Any] | None) -> "ValidationResult | None": def from_dict(cls, payload: dict[str, Any] | None) -> "ValidationResult | None":
if not isinstance(payload, dict): if not isinstance(payload, dict):
return None return None
raw_status = payload.get("status")
if "status" in payload and raw_status not in VALIDATION_STATUSES:
raise ValueError(f"unknown validation status: {raw_status}")
status: ValidationStatus | None = raw_status if "status" in payload else None
return cls( return cls(
passed=bool(payload.get("passed")), status=status,
passed=bool(payload.get("passed")) if "status" not in payload else None,
score=float(payload.get("score", 0.0) or 0.0), score=float(payload.get("score", 0.0) or 0.0),
issues=[str(item) for item in payload.get("issues") or []], issues=[str(item) for item in payload.get("issues") or []],
missing_requirements=[str(item) for item in payload.get("missing_requirements") or []], missing_requirements=[str(item) for item in payload.get("missing_requirements") or []],
evidence_gaps=[str(item) for item in payload.get("evidence_gaps") or []],
recommended_revision_prompt=str(payload.get("recommended_revision_prompt") or ""), recommended_revision_prompt=str(payload.get("recommended_revision_prompt") or ""),
validator=str(payload.get("validator") or "unknown"), validator=str(payload.get("validator") or "unknown"),
) )
@ -73,6 +111,14 @@ class TaskRecord:
def is_open(self) -> bool: def is_open(self) -> bool:
return self.status in TASK_OPEN_STATUSES return self.status in TASK_OPEN_STATUSES
@property
def is_execution_active(self) -> bool:
    """Whether the task status is ``"running"`` or ``"validating"``."""
    active_statuses = {"running", "validating"}
    return self.status in active_statuses
@property
def requires_user_action(self) -> bool:
    """Whether the task status is one of the states that wait on the user.

    Those states are ``"awaiting_feedback"``, ``"needs_review"`` and
    ``"needs_revision"``.
    """
    user_action_statuses = {"awaiting_feedback", "needs_review", "needs_revision"}
    return self.status in user_action_statuses
def to_dict(self) -> dict[str, Any]: def to_dict(self) -> dict[str, Any]:
return { return {
"task_id": self.task_id, "task_id": self.task_id,

View File

@ -77,6 +77,8 @@ class TaskService:
payload = task.to_dict() payload = task.to_dict()
payload["short_title"] = self.ensure_short_title(task).metadata.get("short_title") payload["short_title"] = self.ensure_short_title(task).metadata.get("short_title")
payload["is_open"] = task.is_open payload["is_open"] = task.is_open
payload["is_execution_active"] = task.is_execution_active
payload["requires_user_action"] = task.requires_user_action
return payload return payload
def ensure_short_title(self, task: TaskRecord) -> TaskRecord: def ensure_short_title(self, task: TaskRecord) -> TaskRecord:
@ -108,10 +110,30 @@ class TaskService:
self._event(task, "run_completed", run_id=run_id, payload={"skill_names": skill_names or []}) self._event(task, "run_completed", run_id=run_id, payload={"skill_names": skill_names or []})
return task return task
def record_validation(self, task_id: str, run_id: str, validation: ValidationResult) -> TaskRecord: def record_validation(
self,
task_id: str,
run_id: str,
validation: ValidationResult,
*,
final_attempt: bool = True,
has_usable_answer: bool = True,
) -> TaskRecord:
task = self._require(task_id) task = self._require(task_id)
task.status = "awaiting_feedback" now = self._now()
task.updated_at = self._now() if validation.status == "accepted":
task.status = "awaiting_feedback"
elif validation.status in {"insufficient_evidence", "validator_error"}:
task.status = "needs_review"
elif validation.status == "rejected" and not final_attempt:
task.status = "needs_revision"
elif validation.status == "rejected" and has_usable_answer:
task.status = "needs_review"
else:
task.status = "failed"
task.closed_at = now
task.close_reason = "automatic validation rejected the final attempt"
task.updated_at = now
task.validation_result = validation.to_dict() task.validation_result = validation.to_dict()
self.store.upsert_task(task) self.store.upsert_task(task)
self._event(task, "validated", run_id=run_id, payload=validation.to_dict()) self._event(task, "validated", run_id=run_id, payload=validation.to_dict())

View File

@ -17,6 +17,8 @@ class ValidationService:
task: TaskRecord, task: TaskRecord,
user_message: str, user_message: str,
final_output: str, final_output: str,
evidence_packet: Any | None = None,
evidence_text: str = "",
transcript_excerpt: str = "", transcript_excerpt: str = "",
tool_summaries: list[str] | None = None, tool_summaries: list[str] | None = None,
team_summaries: list[str] | None = None, team_summaries: list[str] | None = None,
@ -36,19 +38,20 @@ class ValidationService:
task=task, task=task,
user_message=user_message, user_message=user_message,
final_output=final_output, final_output=final_output,
evidence_text=evidence_text,
transcript_excerpt=transcript_excerpt, transcript_excerpt=transcript_excerpt,
tool_summaries=tool_summaries or [], tool_summaries=tool_summaries or [],
team_summaries=team_summaries or [], team_summaries=team_summaries or [],
) )
except Exception as exc: except Exception as exc:
return ValidationResult( return ValidationResult(
passed=False, status="validator_error",
score=0.0, score=0.0,
issues=[f"Validator failed: {exc}"], issues=[f"Validator failed: {exc}"],
missing_requirements=["A valid automatic validation result is required before accepting the task."], evidence_gaps=["Automatic validation failed before producing a reliable decision."],
missing_requirements=["User review is required because automatic validation failed."],
recommended_revision_prompt=( recommended_revision_prompt=(
"Review the task result again because automatic validation failed, " "Review the answer and evidence, then decide whether to revise or accept it."
"then provide a corrected final answer that explicitly satisfies the task goal."
), ),
validator="llm_error", validator="llm_error",
) )
@ -62,20 +65,25 @@ class ValidationService:
task: TaskRecord, task: TaskRecord,
user_message: str, user_message: str,
final_output: str, final_output: str,
evidence_text: str,
transcript_excerpt: str, transcript_excerpt: str,
tool_summaries: list[str], tool_summaries: list[str],
team_summaries: list[str], team_summaries: list[str],
) -> ValidationResult: ) -> ValidationResult:
legacy_context = "" if evidence_text else (
f"Transcript excerpt:\n{transcript_excerpt}\n\n"
f"Tool summaries:\n{json.dumps(tool_summaries, ensure_ascii=False)}\n\n"
f"Team summaries:\n{json.dumps(team_summaries, ensure_ascii=False)}\n\n"
)
prompt = ( prompt = (
"Validate whether the assistant output satisfies the task. " "Validate whether the assistant output satisfies the task. "
"Return only compact JSON with keys: passed, score, issues, " "Return only compact JSON with keys: passed, score, issues, "
"missing_requirements, recommended_revision_prompt.\n\n" "missing_requirements, recommended_revision_prompt.\n\n"
f"Task goal:\n{task.goal}\n\n" f"Task goal:\n{task.goal}\n\n"
f"Current user request:\n{user_message}\n\n" f"Current user request:\n{user_message}\n\n"
f"Transcript excerpt:\n{transcript_excerpt[:2500]}\n\n" f"Evidence packet:\n{evidence_text}\n\n"
f"Tool summaries:\n{json.dumps(tool_summaries[:12], ensure_ascii=False)}\n\n" f"{legacy_context}"
f"Team summaries:\n{json.dumps(team_summaries[:12], ensure_ascii=False)}\n\n" f"Assistant final output:\n{final_output}"
f"Assistant final output:\n{final_output[:4000]}"
) )
response = await provider.chat( response = await provider.chat(
messages=[ messages=[
@ -88,11 +96,19 @@ class ValidationService:
temperature=0.0, temperature=0.0,
) )
payload = self._parse_json_object(response.content or "") payload = self._parse_json_object(response.content or "")
status = payload.get("status")
if status not in {"accepted", "rejected", "insufficient_evidence", "validator_error"}:
status = (
"accepted"
if payload.get("passed") and float(payload.get("score", 0.0) or 0.0) >= 0.75
else "rejected"
)
return ValidationResult( return ValidationResult(
passed=bool(payload.get("passed")), status=status,
score=max(0.0, min(1.0, float(payload.get("score", 0.0) or 0.0))), score=max(0.0, min(1.0, float(payload.get("score", 0.0) or 0.0))),
issues=[str(item) for item in payload.get("issues") or []], issues=[str(item) for item in payload.get("issues") or []],
missing_requirements=[str(item) for item in payload.get("missing_requirements") or []], missing_requirements=[str(item) for item in payload.get("missing_requirements") or []],
evidence_gaps=[str(item) for item in payload.get("evidence_gaps") or []],
recommended_revision_prompt=str(payload.get("recommended_revision_prompt") or ""), recommended_revision_prompt=str(payload.get("recommended_revision_prompt") or ""),
validator="llm", validator="llm",
) )

View File

@ -45,6 +45,18 @@ class RecordingProvider(LLMProvider):
return "stub-model" return "stub-model"
class BlockingProvider(RecordingProvider):
    """Recording provider whose ``chat`` signals ``started`` and then waits on ``release``.

    It is stubbed with a single response built from ``content``.
    """

    def __init__(self, content: str, started: asyncio.Event, release: asyncio.Event) -> None:
        stubbed_responses = [_response(content)]
        super().__init__(stubbed_responses)
        self.started = started
        self.release = release

    async def chat(self, *args, **kwargs) -> LLMResponse:
        """Set ``started``, wait for ``release``, then delegate to the parent ``chat``."""
        self.started.set()
        # Park here until the test opens the gate.
        await self.release.wait()
        reply = await super().chat(*args, **kwargs)
        return reply
class StubSkillAssembler: class StubSkillAssembler:
def __init__(self, activated_skills: list[SkillContext] | None = None) -> None: def __init__(self, activated_skills: list[SkillContext] | None = None) -> None:
self.activated_skills = list(activated_skills or []) self.activated_skills = list(activated_skills or [])
@ -153,6 +165,26 @@ def test_local_agent_runner_uses_shared_loop_and_records_parent_task(tmp_path: P
assert child_session["parent_session_id"] == "session-root" assert child_session["parent_session_id"] == "session-root"
def test_team_node_preserves_evidence_when_finish_reason_is_not_stop(tmp_path: Path) -> None:
    """A delegated run ending with ``max_tool_iterations`` fails but keeps its evidence."""
    agent_loop = _loop(tmp_path)
    stub_provider = RecordingProvider(
        [_response("partial evidence", finish_reason="max_tool_iterations")]
    )
    researcher = AgentDescriptor(name="researcher", role="research")
    delegation = DelegationEnvelope(
        parent_task_id="task-parent",
        parent_session_id="session-root",
        parent_run_id="run-root",
        agent=researcher,
        task="research the requested topic",
        node_id="research",
    )

    runner = LocalAgentRunner(agent_loop)
    outcome = asyncio.run(runner.run(delegation, provider_bundle=_bundle(stub_provider)))

    assert outcome.success is False
    preserved = outcome.evidence
    assert preserved is not None
    assert preserved.output_text == "partial evidence"
    assert preserved.finish_reason == "max_tool_iterations"
def test_pinned_skill_is_injected_into_delegated_run(tmp_path: Path) -> None: def test_pinned_skill_is_injected_into_delegated_run(tmp_path: Path) -> None:
_publish_skill( _publish_skill(
tmp_path, tmp_path,
@ -278,6 +310,57 @@ def test_team_parallel_runs_all_nodes(tmp_path: Path) -> None:
assert [item.output_text for item in result.node_results] == ["one", "two", "three"] assert [item.output_text for item in result.node_results] == ["one", "two", "three"]
# Checks that a "parallel" team run has both nodes inside their provider call
# at the same time: each BlockingProvider sets its own "started" event and then
# waits on the shared "release" event, which this test only sets after both
# "started" events have fired.
def test_team_parallel_starts_nodes_concurrently_with_isolated_loops(tmp_path: Path) -> None:
loop = _loop(tmp_path)
first_started = asyncio.Event()
second_started = asyncio.Event()
release = asyncio.Event()
# One blocking provider per node id; both share the same release gate.
providers = {
"one": BlockingProvider("one", first_started, release),
"two": BlockingProvider("two", second_started, release),
}
graph = ExecutionGraph(
strategy="parallel",
nodes=[
ExecutionNode("one", "task one", AgentDescriptor(name="one")),
ExecutionNode("two", "task two", AgentDescriptor(name="two")),
],
)
async def run_case():
loop_task = asyncio.create_task(loop.run())
# Yield once so the freshly created loop task gets a chance to start.
await asyncio.sleep(0)
task = asyncio.create_task(
TeamService(loop).run_team(
graph,
parent_task_id=None,
parent_session_id="session-root",
parent_run_id="run-root",
provider_bundle_factory=lambda node: _bundle(providers[node.node_id]),
)
)
try:
# Each wait times out after 1 second if that node never reaches its provider call.
await asyncio.wait_for(first_started.wait(), timeout=1)
await asyncio.wait_for(second_started.wait(), timeout=1)
release.set()
return await task
finally:
# Set the gate again on the way out so a provider still waiting on it is
# unblocked before the cleanup below runs.
release.set()
if not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
await loop.stop()
await loop_task
result = asyncio.run(run_case())
assert result.success is True
assert [item.node_id for item in result.node_results] == ["one", "two"]
def test_parallel_node_factory_error_is_normalized_and_keeps_completed_runs(tmp_path: Path) -> None: def test_parallel_node_factory_error_is_normalized_and_keeps_completed_runs(tmp_path: Path) -> None:
loop = _loop(tmp_path) loop = _loop(tmp_path)
loaded = loop.boot() loaded = loop.boot()
@ -438,7 +521,7 @@ def test_team_summary_lists_only_failed_nodes_when_all_nodes_fail(tmp_path: Path
) )
assert result.success is False assert result.success is False
assert result.summary == "Failed nodes:\n- one: one down\n- two: two down" assert result.summary == "Failed nodes:\n- one: one down evidence=no\n- two: two down evidence=no"
def test_graph_structure_errors_still_raise(tmp_path: Path) -> None: def test_graph_structure_errors_still_raise(tmp_path: Path) -> None:

View File

@ -4,6 +4,8 @@ from beaver.engine import AgentLoop, EngineLoader
from beaver.engine.providers import make_provider_bundle from beaver.engine.providers import make_provider_bundle
from beaver.engine.providers.litellm import LiteLLMProvider from beaver.engine.providers.litellm import LiteLLMProvider
from beaver.foundation.config import load_config from beaver.foundation.config import load_config
from beaver.interfaces.web.app import _reload_agent_config
from beaver.services.agent_service import AgentService
def test_load_config_reads_current_instance_shape(tmp_path) -> None: def test_load_config_reads_current_instance_shape(tmp_path) -> None:
@ -124,6 +126,41 @@ def test_agent_loop_config_drives_provider_bundle(tmp_path) -> None:
loop.close() loop.close()
def test_reload_agent_config_updates_booted_loop_config(tmp_path) -> None:
    """Reloading the agent config makes newly booted loops see the rewritten file.

    The config file is written with an old model and API base, a loop is
    booted, the file is rewritten with new values, and after
    ``_reload_agent_config`` a fresh loop must resolve the new provider target.
    """
    workspace = tmp_path / "workspace"
    config_path = tmp_path / "config.json"

    def write_config(model: str, api_base: str) -> None:
        # Both versions of the file differ only in the model and the API base.
        config_path.write_text(
            json.dumps(
                {
                    "agents": {"defaults": {"workspace": str(workspace), "model": model}},
                    "providers": {"openai": {"apiKey": "sk-test", "apiBase": api_base}},
                }
            ),
            encoding="utf-8",
        )

    write_config("old-model", "https://old.example.com/v1")
    service = AgentService(config_path=config_path)
    # Close the service even when an assertion or the reload fails, so a
    # failing run does not leave it open.
    try:
        loaded = service.create_loop().boot()
        assert loaded.config.default_model == "old-model"

        write_config("new-model", "https://new.example.com/v1")
        _reload_agent_config(service, config_path)

        target = service.create_loop().boot().config.resolve_provider_target()
        assert target["model"] == "new-model"
        assert target["api_base"] == "https://new.example.com/v1"
        assert target["api_key"] == "sk-test"
    finally:
        service.close()
def test_openai_compatible_qwen_config_keeps_openai_provider() -> None: def test_openai_compatible_qwen_config_keeps_openai_provider() -> None:
bundle = make_provider_bundle( bundle = make_provider_bundle(
model="qwen-plus", model="qwen-plus",

View File

@ -45,10 +45,13 @@ def test_qwen_thinking_mode_is_sent_as_chat_template_kwargs(monkeypatch: pytest.
) )
assert response.content == "可以" assert response.content == "可以"
assert captured["extra_body"] == {"chat_template_kwargs": {"enable_thinking": False}} assert captured["extra_body"] == {
"chat_template_kwargs": {"enable_thinking": False},
"thinking": {"type": "disabled"},
}
def test_non_qwen_thinking_mode_is_not_sent(monkeypatch: pytest.MonkeyPatch) -> None: def test_thinking_mode_disabled_is_sent_without_model_name_matching(monkeypatch: pytest.MonkeyPatch) -> None:
captured: dict = {} captured: dict = {}
class Message: class Message:
@ -85,7 +88,72 @@ def test_non_qwen_thinking_mode_is_not_sent(monkeypatch: pytest.MonkeyPatch) ->
) )
) )
assert "extra_body" not in captured assert captured["extra_body"] == {
"chat_template_kwargs": {"enable_thinking": False},
"thinking": {"type": "disabled"},
}
def test_litellm_provider_preserves_reasoning_content_for_tool_round_trip() -> None:
    """``_sanitize_messages`` keeps ``reasoning_content`` on an assistant tool-call message."""
    tool_call = {
        "id": "call-1",
        "type": "function",
        "function": {"name": "lookup", "arguments": "{}"},
    }
    assistant_message = {
        "role": "assistant",
        "content": "",
        "reasoning_content": "must be passed back",
        "tool_calls": [tool_call],
    }

    sanitized = LiteLLMProvider._sanitize_messages([assistant_message])

    assert sanitized[0]["reasoning_content"] == "must be passed back"
def test_thinking_mode_is_forced_disabled_even_when_requested_enabled(monkeypatch: pytest.MonkeyPatch) -> None:
    """Passing ``thinking_enabled=True`` still sends the "thinking disabled" extra body."""
    seen_kwargs: dict = {}

    # Minimal stand-ins for the completion response object returned by the fake.
    class Message:
        content = "ok"
        reasoning_content = None
        tool_calls = []

    class Choice:
        message = Message()
        finish_reason = "stop"

    class Response:
        choices = [Choice()]
        usage = None

    async def fake_acompletion(**kwargs):
        # Record what the provider passed to the completion call.
        seen_kwargs.update(kwargs)
        return Response()

    monkeypatch.setattr("beaver.engine.providers.litellm.acompletion", fake_acompletion)
    monkeypatch.setattr("beaver.engine.providers.litellm.litellm", SimpleNamespace())

    provider = LiteLLMProvider(
        api_key="sk-test",
        api_base="https://oai.example.com/v1",
        default_model="gpt-4.1-mini",
        provider_name="openai",
    )
    request_messages = [{"role": "user", "content": "reply ok"}]
    asyncio.run(
        provider.chat(request_messages, model="gpt-4.1-mini", thinking_enabled=True)
    )

    expected_extra_body = {
        "chat_template_kwargs": {"enable_thinking": False},
        "thinking": {"type": "disabled"},
    }
    assert seen_kwargs["extra_body"] == expected_extra_body
def test_litellm_provider_sanitizes_tool_call_arguments(monkeypatch: pytest.MonkeyPatch) -> None: def test_litellm_provider_sanitizes_tool_call_arguments(monkeypatch: pytest.MonkeyPatch) -> None:

View File

@ -608,6 +608,12 @@ def test_agent_loop_records_max_tool_iterations_as_failed_skill_effect(tmp_path:
provider_name="stub", provider_name="stub",
model="stub-model", model="stub-model",
), ),
LLMResponse(
content="Based on the available tool result, the container likely failed during startup.",
finish_reason="stop",
provider_name="stub",
model="stub-model",
),
] ]
), ),
) )
@ -621,7 +627,29 @@ def test_agent_loop_records_max_tool_iterations_as_failed_skill_effect(tmp_path:
) )
loaded = loop.boot() loaded = loop.boot()
assert result.finish_reason == "max_tool_iterations" assert result.finish_reason == "max_tool_iterations_finalized"
assert "Based on the available tool result" in result.output_text
assert "Tool loop stopped" not in result.output_text
effect_records = loaded.run_memory_store.list_skill_effects("docker-debug", version="v0007") effect_records = loaded.run_memory_store.list_skill_effects("docker-debug", version="v0007")
assert effect_records[-1].run_id == result.run_id assert effect_records[-1].run_id == result.run_id
assert effect_records[-1].success is False assert effect_records[-1].success is False
def test_llm_request_snapshot_defaults_to_compact_payload(tmp_path: Path) -> None:
    """The ``llm_request_snapshotted`` event has summary keys but no raw messages or tools."""
    loader = EngineLoader(workspace=tmp_path, skill_assembler=StubSkillAssembler([]))
    loop = AgentLoop(loader=loader)
    stub_reply = LLMResponse(
        content="done", finish_reason="stop", provider_name="stub", model="stub-model"
    )
    bundle = ProviderBundle(
        main_runtime=SimpleNamespace(model="stub-model", provider_name="stub"),
        main_provider=StubProvider([stub_reply]),
    )

    result = asyncio.run(loop.process_direct("hello", provider_bundle=bundle))

    loaded = loop.boot()
    events = loaded.session_manager.get_run_event_records(result.session_id, result.run_id)
    snapshot = next(event for event in events if event.event_type == "llm_request_snapshotted")
    payload = snapshot.event_payload

    for summary_key in ("message_count", "tool_names"):
        assert summary_key in payload
    for raw_key in ("messages", "tools"):
        assert raw_key not in payload

View File

@ -0,0 +1,91 @@
from __future__ import annotations
from pathlib import Path
from beaver.engine.session.manager import SessionManager
from beaver.tasks.evidence import EvidenceBuilder, RunEvidence, TaskEvidencePacket, ToolEvidence, render_task_evidence
def test_evidence_builder_preserves_full_tool_result(tmp_path: Path) -> None:
    """A tool result of more than 700 characters reaches the evidence and its rendering intact."""
    manager = SessionManager(tmp_path)
    session_id, run_id = "session-1", "run-1"
    final_answer = "Manchester United won 3-2."
    # The marker sits at the very end, so any truncation of the content would drop it.
    long_content = "prefix " + ("x" * 700) + " MAN 3 FT 2 NFO"

    manager.ensure_session(session_id, source="test")
    manager.append_message(
        session_id,
        run_id=run_id,
        role="user",
        event_type="user_message_added",
        content="score?",
    )
    manager.append_message(
        session_id,
        run_id=run_id,
        role="tool",
        event_type="tool_result_recorded",
        event_payload={"success": True, "url": "https://example.test/match"},
        content=long_content,
        tool_name="web_fetch",
        tool_call_id="call-1",
    )
    manager.append_message(
        session_id,
        run_id=run_id,
        role="system",
        event_type="run_completed",
        event_payload={"finish_reason": "stop"},
        content=final_answer,
        finish_reason="stop",
        context_visible=False,
    )

    builder = EvidenceBuilder(manager)
    evidence = builder.build_run_evidence(session_id, run_id, final_answer, "stop")
    packet = TaskEvidencePacket(
        task_id="task-1",
        attempt_index=1,
        main_run=evidence,
        team_runs=[],
        team_node_results=[],
        final_output=final_answer,
    )
    rendered = render_task_evidence(packet)

    assert evidence.tool_results[0].content == long_content
    for expected_fragment in ("MAN 3 FT 2 NFO", "https://example.test/match"):
        assert expected_fragment in rendered
def test_render_task_evidence_includes_failed_team_run_tool_results() -> None:
    """A team run that hit ``max_tool_iterations`` still has its tool results rendered."""
    recorded_at = "2026-05-22T12:00:00Z"
    recovered_tool_result = ToolEvidence(
        tool_name="web_fetch",
        tool_call_id="call-team",
        content="Recovered partial source content.",
        event_payload={"success": True, "created_at": "2026-05-22T12:00:00Z"},
        created_at=recorded_at,
    )
    team_run = RunEvidence(
        run_id="run-team",
        session_id="session-team",
        output_text="Tool loop stopped.",
        finish_reason="max_tool_iterations",
        transcript=[],
        tool_results=[recovered_tool_result],
        warnings=["finish_reason=max_tool_iterations"],
    )
    packet = TaskEvidencePacket(
        task_id="task-1",
        attempt_index=2,
        main_run=None,
        team_runs=[team_run],
        team_node_results=[],
        final_output="partial answer",
    )

    rendered = render_task_evidence(packet)

    expected_fragments = (
        "finish_reason=max_tool_iterations",
        "partial answer",
        "Recovered partial source content.",
        "created_at=2026-05-22T12:00:00Z",
    )
    for fragment in expected_fragments:
        assert fragment in rendered

View File

@ -13,14 +13,14 @@ from beaver.engine.providers.base import LLMProvider, LLMResponse
from beaver.engine.providers.factory import ProviderBundle from beaver.engine.providers.factory import ProviderBundle
from beaver.services.agent_service import AgentService from beaver.services.agent_service import AgentService
from beaver.skills.assembler import SkillAssemblyResult from beaver.skills.assembler import SkillAssemblyResult
from beaver.tasks import TaskExecutionPlan, TaskService, ValidationResult, ValidationService from beaver.tasks import TaskExecutionPlan, TaskRecord, TaskService, ValidationResult, ValidationService
class StubProvider(LLMProvider): class StubProvider(LLMProvider):
def __init__(self, responses: list[LLMResponse]) -> None: def __init__(self, responses: list[LLMResponse]) -> None:
super().__init__() super().__init__()
self._responses = list(responses) self._responses = list(responses)
self.calls: list[list[dict]] = [] self.calls: list[dict[str, object]] = []
async def chat( async def chat(
self, self,
@ -30,7 +30,7 @@ class StubProvider(LLMProvider):
max_tokens: int = 4096, max_tokens: int = 4096,
temperature: float = 0.7, temperature: float = 0.7,
) -> LLMResponse: ) -> LLMResponse:
self.calls.append(messages) self.calls.append({"messages": messages, "tools": tools, "model": model})
if not self._responses: if not self._responses:
raise AssertionError("No stubbed provider responses left") raise AssertionError("No stubbed provider responses left")
return self._responses.pop(0) return self._responses.pop(0)
@ -42,8 +42,10 @@ class StubProvider(LLMProvider):
class StubValidationService: class StubValidationService:
def __init__(self, results: list[ValidationResult]) -> None: def __init__(self, results: list[ValidationResult]) -> None:
self.results = list(results) self.results = list(results)
self.calls: list[dict] = []
async def validate_task_result(self, **kwargs) -> ValidationResult: async def validate_task_result(self, **kwargs) -> ValidationResult:
self.calls.append(kwargs)
if not self.results: if not self.results:
raise AssertionError("No stubbed validation results left") raise AssertionError("No stubbed validation results left")
return self.results.pop(0) return self.results.pop(0)
@ -153,6 +155,21 @@ def _main_only_bundle(*responses: str) -> ProviderBundle:
) )
def _task_record(status: str) -> TaskRecord:
    """Build a fixed ``TaskRecord`` fixture whose only varying field is ``status``."""
    summary = "test task"
    timestamp = "2026-05-22T00:00:00+00:00"
    record = TaskRecord(
        task_id="task-1",
        session_id="session-1",
        description=summary,
        goal=summary,
        constraints=[],
        priority=0,
        status=status,
        creator="main-agent",
        created_at=timestamp,
        updated_at=timestamp,
    )
    return record
def test_simple_question_does_not_create_task(tmp_path: Path) -> None: def test_simple_question_does_not_create_task(tmp_path: Path) -> None:
service = AgentService( service = AgentService(
loader=EngineLoader( loader=EngineLoader(
@ -393,6 +410,81 @@ def test_explicit_revision_feedback_then_input_reruns_without_duplicate_feedback
assert task.feedback[0]["comment"] == "准备补充穿衣建议" assert task.feedback[0]["comment"] == "准备补充穿衣建议"
def test_validation_result_status_drives_accepted_and_passed() -> None:
    """Only the "accepted" status yields ``passed``/``accepted``, even at a 0.9 score."""
    expected_by_status = {
        "accepted": True,
        "insufficient_evidence": False,
        "rejected": False,
    }
    for status, expected in expected_by_status.items():
        result = ValidationResult(status=status, score=0.9, validator="test")
        assert result.passed is expected
        assert result.accepted is expected
def test_validation_result_from_legacy_payload_maps_to_status() -> None:
    """Payloads without a ``status`` key derive it from ``passed`` and ``score``."""
    legacy_cases = [
        ({"passed": True, "score": 0.9, "validator": "legacy"}, "accepted"),
        ({"passed": True, "score": 0.7, "validator": "legacy"}, "rejected"),
        ({"passed": False, "score": 0.2, "validator": "legacy"}, "rejected"),
    ]
    for payload, expected_status in legacy_cases:
        restored = ValidationResult.from_dict(payload)
        assert restored is not None
        assert restored.status == expected_status
def test_validation_result_rejects_unknown_status() -> None:
    """Constructing a result with a status outside the known set raises ``ValueError``."""
    unknown_status = "pending"
    with pytest.raises(ValueError, match="unknown validation status"):
        ValidationResult(status=unknown_status, score=0.9, validator="test")  # type: ignore[arg-type]
def test_validation_result_from_dict_rejects_unknown_explicit_status() -> None:
    """An explicit unknown ``status`` in the payload raises even when ``passed`` is true."""
    payload = {"status": "pending", "passed": True, "score": 0.9}
    with pytest.raises(ValueError, match="unknown validation status"):
        ValidationResult.from_dict(payload)
def test_validation_result_evidence_gaps_round_trip() -> None:
    """``evidence_gaps`` and ``status`` survive ``to_dict`` followed by ``from_dict``."""
    gaps = ["missing command output", "missing file reference"]
    original = ValidationResult(
        status="insufficient_evidence",
        score=0.4,
        evidence_gaps=["missing command output", "missing file reference"],
        validator="test",
    )

    serialized = original.to_dict()
    restored = ValidationResult.from_dict(serialized)

    assert restored is not None
    assert restored.status == "insufficient_evidence"
    assert restored.evidence_gaps == gaps
    assert restored.to_dict()["evidence_gaps"] == gaps
def test_task_record_status_helpers_distinguish_review_and_failed() -> None:
    """A "needs_review" task is open and waits on the user; a "failed" task is neither."""
    # status -> (is_open, is_execution_active, requires_user_action)
    expected_flags = {
        "needs_review": (True, False, True),
        "failed": (False, False, False),
    }
    for status, (open_flag, active_flag, user_flag) in expected_flags.items():
        record = _task_record(status)
        assert record.is_open is open_flag
        assert record.is_execution_active is active_flag
        assert record.requires_user_action is user_flag
def test_task_service_api_payload_emits_status_helpers(tmp_path: Path) -> None:
    """``to_api_dict`` exposes the three status helper flags for a "needs_review" task."""
    task_service = TaskService(tmp_path)
    api_payload = task_service.to_api_dict(_task_record("needs_review"))

    expected_flags = {
        "is_open": True,
        "is_execution_active": False,
        "requires_user_action": True,
    }
    for key, expected in expected_flags.items():
        assert api_payload[key] is expected
def test_validation_failure_retries_once(tmp_path: Path) -> None: def test_validation_failure_retries_once(tmp_path: Path) -> None:
service = AgentService( service = AgentService(
loader=EngineLoader( loader=EngineLoader(
@ -616,10 +708,45 @@ def test_task_mode_team_plan_runs_subagent_then_main_synthesis(tmp_path: Path) -
assert result.run_id == task.run_ids[-1] assert result.run_id == task.run_ids[-1]
assert any(event.event_type == "task_execution_planned" for event in events) assert any(event.event_type == "task_execution_planned" for event in events)
assert any(event.event_type == "task_team_run_completed" for event in events) assert any(event.event_type == "task_team_run_completed" for event in events)
assert "sub-agent evidence" in main_provider.calls[0][0]["content"] assert "sub-agent evidence" in main_provider.calls[0]["messages"][0]["content"]
assert "sub-agent evidence" != result.output_text assert "sub-agent evidence" != result.output_text
def test_task_mode_team_synthesis_runs_without_tools_and_receives_evidence(tmp_path: Path) -> None:
    """Team-plan synthesis calls the main provider without tools and with the sub-agent output.

    The validation service must also receive an ``evidence_text`` that
    contains the "Task evidence packet" marker.
    """

    def single_reply_provider(text: str) -> StubProvider:
        # Each provider in this test answers exactly once with a "stop" response.
        return StubProvider(
            [LLMResponse(content=text, finish_reason="stop", provider_name="stub", model="stub-model")]
        )

    main_provider = single_reply_provider("final synthesized answer")
    sub_provider = single_reply_provider("sub-agent evidence")
    validation = StubValidationService(
        [ValidationResult(status="accepted", score=0.9, validator="test")]
    )
    loader = EngineLoader(
        workspace=tmp_path,
        task_execution_planner=StubTaskExecutionPlanner([_team_plan()]),
        validation_service=validation,
    )
    service = AgentService(loader=loader)

    result = asyncio.run(
        service.process_direct(
            "implement team-backed workflow",
            session_id="web:team-no-tools",
            provider_bundle=_provider_bundle(main_provider),
            team_provider_bundle_factory=lambda node: _provider_bundle(sub_provider),
        )
    )

    assert result.output_text == "final synthesized answer"
    first_main_call = main_provider.calls[0]
    assert first_main_call["tools"] is None
    assert "sub-agent evidence" in first_main_call["messages"][0]["content"]
    assert "Task evidence packet" in validation.calls[0]["evidence_text"]
def test_task_mode_team_failure_still_uses_main_synthesis(tmp_path: Path) -> None: def test_task_mode_team_failure_still_uses_main_synthesis(tmp_path: Path) -> None:
main_provider = StubProvider( main_provider = StubProvider(
[ [
@ -647,9 +774,48 @@ def test_task_mode_team_failure_still_uses_main_synthesis(tmp_path: Path) -> Non
assert result.output_text == "fallback synthesized answer" assert result.output_text == "fallback synthesized answer"
assert any(event.event_type == "task_team_run_failed" for event in events) assert any(event.event_type == "task_team_run_failed" for event in events)
assert "sub-agent unavailable" in main_provider.calls[0][0]["content"] assert "sub-agent unavailable" in main_provider.calls[0]["messages"][0]["content"]
assert "same class of tools fails repeatedly" in main_provider.calls[0][0]["content"] assert "same class of tools fails repeatedly" in main_provider.calls[0]["messages"][0]["content"]
assert "user-visible fallback answer" in main_provider.calls[0][0]["content"] assert "user-visible fallback answer" in main_provider.calls[0]["messages"][0]["content"]
def test_insufficient_evidence_moves_task_to_needs_review(tmp_path: Path) -> None:
    """An ``insufficient_evidence`` verdict parks the task in ``needs_review``.

    Also checks the ``task_validation_snapshotted`` event: it must record the
    verdict, show that no retry was scheduled, and expose a tool-result count
    in its debug payload.
    """
    planner = _single_planner()
    verdict = ValidationResult(
        status="insufficient_evidence",
        score=0.4,
        evidence_gaps=["source missing"],
        validator="test",
    )
    loader = EngineLoader(
        workspace=tmp_path,
        task_execution_planner=planner,
        validation_service=StubValidationService([verdict]),
    )
    service = AgentService(loader=loader)

    result = asyncio.run(
        service.process_direct(
            "answer with uncertain evidence",
            session_id="web:needs-review",
            provider_bundle=_bundle("possible answer"),
        )
    )

    # Boot a fresh loop to read back the persisted task and run events.
    loaded = service.create_loop().boot()
    task = loaded.task_service.get_task(result.task_id)
    events = loaded.session_manager.get_run_event_records(result.session_id, result.run_id)
    validation_event = next(
        event for event in events if event.event_type == "task_validation_snapshotted"
    )

    assert task is not None
    assert task.status == "needs_review"
    assert task.requires_user_action is True
    assert task.is_execution_active is False

    payload = validation_event.event_payload
    assert payload["validation_result"]["status"] == "insufficient_evidence"
    assert payload["retry_scheduled"] is False
    # Mainly guards that the debug key exists and holds a non-negative count.
    assert payload["validation_debug"]["tool_result_count"] >= 0
def test_task_mode_team_retry_hides_first_synthesis_run(tmp_path: Path) -> None: def test_task_mode_team_retry_hides_first_synthesis_run(tmp_path: Path) -> None:
@ -763,5 +929,6 @@ def test_llm_validator_parse_failure_is_not_accepted(tmp_path: Path) -> None:
) )
assert validation.accepted is False assert validation.accepted is False
assert validation.status == "validator_error"
assert validation.validator == "llm_error" assert validation.validator == "llm_error"
assert validation.issues assert validation.issues

View File

@ -30,6 +30,15 @@ class StubAgentService(AgentService):
self.fail = fail self.fail = fail
self.calls: list[dict[str, Any]] = [] self.calls: list[dict[str, Any]] = []
async def process_direct(self, message: str, **kwargs: Any) -> StubRunResult:  # type: ignore[override]
    """Record the call, then echo the message back or raise when ``self.fail`` is set.

    The recorded entry is the message merged with every keyword argument.
    A missing or falsy ``session_id`` falls back to ``"web:default"``.

    Raises:
        RuntimeError: with message ``"boom"`` when ``self.fail`` is truthy.
    """
    recorded_call: dict[str, Any] = {"message": message}
    recorded_call.update(kwargs)
    self.calls.append(recorded_call)

    if self.fail:
        raise RuntimeError("boom")

    session_id = kwargs.get("session_id") or "web:default"
    return StubRunResult(session_id=session_id, output_text=f"echo:{message}")
async def submit_direct(self, message: str, **kwargs: Any) -> StubRunResult: # type: ignore[override] async def submit_direct(self, message: str, **kwargs: Any) -> StubRunResult: # type: ignore[override]
self.calls.append({"message": message, **kwargs}) self.calls.append({"message": message, **kwargs})
if self.fail: if self.fail:
@ -40,6 +49,11 @@ class StubAgentService(AgentService):
) )
class DirectModeOnlyAgentService(StubAgentService):
    """Stub service whose ``submit_direct`` always raises.

    Tests use it to show that a caller went through ``process_direct``.
    """

    async def submit_direct(self, message: str, **kwargs: Any) -> StubRunResult:  # type: ignore[override]
        """Always raise ``RuntimeError``; arguments are ignored."""
        reason = "submit_direct should not be used when service is not running"
        raise RuntimeError(reason)
def test_websocket_ping_pong() -> None: def test_websocket_ping_pong() -> None:
app = create_app(service=StubAgentService(), manage_service_lifecycle=False) app = create_app(service=StubAgentService(), manage_service_lifecycle=False)
@ -101,6 +115,64 @@ def test_websocket_message_returns_chat_metadata_and_session_updated() -> None:
} }
def test_websocket_message_uses_direct_processing_when_loop_is_not_running() -> None:
    """A websocket chat message is served without touching ``submit_direct``.

    The service stub raises from ``submit_direct``, so receiving the echoed
    reply shows the handler took the direct-processing path.
    """
    service = DirectModeOnlyAgentService()
    app = create_app(service=service, manage_service_lifecycle=False)
    expected_call = {
        "message": "hello",
        "session_id": "web:alpha",
        "source": "websocket",
        "user_id": None,
        "title": None,
        "execution_context": None,
        "model": None,
        "provider_name": None,
        "embedding_model": None,
        "max_tool_iterations": None,
    }

    with TestClient(app) as client, client.websocket_connect("/ws/web:alpha") as websocket:
        websocket.send_json({"type": "message", "content": "hello"})
        # The first frame is the "thinking" status; the reply follows it.
        status_frame = websocket.receive_json()
        assert status_frame == {"type": "status", "status": "thinking"}
        message = websocket.receive_json()

    assert service.calls == [expected_call]
    assert message["type"] == "message"
    assert message["content"] == "echo:hello"
def test_rest_chat_uses_direct_processing_when_loop_is_not_running() -> None:
    """``POST /api/chat`` is served without touching ``submit_direct``.

    The service stub raises from ``submit_direct``, so a 200 response with the
    echoed text shows the endpoint took the direct-processing path.
    """
    service = DirectModeOnlyAgentService()
    app = create_app(service=service, manage_service_lifecycle=False)
    request_body = {"session_id": "web:alpha", "message": "hello"}
    expected_call = {
        "message": "hello",
        "session_id": "web:alpha",
        "source": "web",
        "user_id": None,
        "title": None,
        "execution_context": None,
        "model": None,
        "provider_name": None,
        "embedding_model": None,
        "temperature": None,
        "max_tokens": None,
        "max_tool_iterations": None,
        "fallback_target": None,
        "auxiliary_target": None,
        "embedding_target": None,
    }

    with TestClient(app) as client:
        response = client.post("/api/chat", json=request_body)

    assert response.status_code == 200
    assert service.calls == [expected_call]
    assert response.json()["output_text"] == "echo:hello"
def test_websocket_empty_content_returns_error_without_runtime_call() -> None: def test_websocket_empty_content_returns_error_without_runtime_call() -> None:
service = StubAgentService() service = StubAgentService()
app = create_app(service=service, manage_service_lifecycle=False) app = create_app(service=service, manage_service_lifecycle=False)

View File

@ -5,6 +5,7 @@ import React, { useCallback, useEffect, useLayoutEffect, useMemo, useRef, useSta
import { Brain, Plus, Send, Trash2, X } from 'lucide-react'; import { Brain, Plus, Send, Trash2, X } from 'lucide-react';
import { ChatWorkbench } from '@/components/chat-workbench/ChatWorkbench'; import { ChatWorkbench } from '@/components/chat-workbench/ChatWorkbench';
import { CurrentSessionProgressSidebar } from '@/components/chat-workbench/CurrentSessionProgressSidebar';
import { ScrollArea } from '@/components/ui/scroll-area'; import { ScrollArea } from '@/components/ui/scroll-area';
import { import {
archiveSession, archiveSession,
@ -18,9 +19,10 @@ import {
uploadFile, uploadFile,
wsManager, wsManager,
} from '@/lib/api'; } from '@/lib/api';
import { mergeServerWithPendingUsers } from '@/lib/chat-messages'; import { mergeServerWithPendingUsers, shouldMergePendingUsers } from '@/lib/chat-messages';
import { pickAppText } from '@/lib/i18n/core'; import { pickAppText } from '@/lib/i18n/core';
import { useAppI18n } from '@/lib/i18n/provider'; import { useAppI18n } from '@/lib/i18n/provider';
import { buildSessionProgressView } from '@/lib/session-progress';
import { useChatStore } from '@/lib/store'; import { useChatStore } from '@/lib/store';
import type { ActiveTask, ChatMessage, FileAttachment, SessionUpdatedEvent, WsEvent } from '@/types'; import type { ActiveTask, ChatMessage, FileAttachment, SessionUpdatedEvent, WsEvent } from '@/types';
@ -39,10 +41,10 @@ const THINKING_MODE_STORAGE_KEY = 'beaver_chat_thinking_enabled';
function loadThinkingModePreference(): boolean { function loadThinkingModePreference(): boolean {
if (typeof window === 'undefined') { if (typeof window === 'undefined') {
return true; return false;
} }
const stored = window.localStorage.getItem(THINKING_MODE_STORAGE_KEY); const stored = window.localStorage.getItem(THINKING_MODE_STORAGE_KEY);
return stored == null ? true : stored !== 'false'; return stored == null ? false : stored !== 'false';
} }
export default function ChatPage() { export default function ChatPage() {
@ -60,6 +62,9 @@ export default function ChatPage() {
setSessionId, setSessionId,
setMessages, setMessages,
addMessage, addMessage,
setInputDraft,
getInputDraft,
clearInputDraft,
setIsLoading, setIsLoading,
clearMessages, clearMessages,
setIsThinking, setIsThinking,
@ -68,7 +73,7 @@ export default function ChatPage() {
updateMessageFeedback, updateMessageFeedback,
} = useChatStore(); } = useChatStore();
const [input, setInput] = useState(''); const [input, setInput] = useState(() => useChatStore.getState().getInputDraft(useChatStore.getState().sessionId));
const [thinkingModeEnabled, setThinkingModeEnabled] = useState(loadThinkingModePreference); const [thinkingModeEnabled, setThinkingModeEnabled] = useState(loadThinkingModePreference);
const [pendingFiles, setPendingFiles] = useState<Array<{ file: File; id?: string; progress: number; error?: string }>>([]); const [pendingFiles, setPendingFiles] = useState<Array<{ file: File; id?: string; progress: number; error?: string }>>([]);
const [activeTask, setActiveTask] = useState<ActiveTask | null>(null); const [activeTask, setActiveTask] = useState<ActiveTask | null>(null);
@ -105,6 +110,17 @@ export default function ChatPage() {
); );
const selectedSessionRunId = selectedRunId && sessionRunIds.has(selectedRunId) ? selectedRunId : null; const selectedSessionRunId = selectedRunId && sessionRunIds.has(selectedRunId) ? selectedRunId : null;
const sessionProgressView = useMemo(
() =>
buildSessionProgressView({
sessionId,
processRuns,
processEvents,
processArtifacts,
locale,
}),
[locale, processArtifacts, processEvents, processRuns, sessionId]
);
const loadSessions = useCallback(async () => { const loadSessions = useCallback(async () => {
try { try {
@ -141,7 +157,8 @@ export default function ChatPage() {
setSessionProcess(key, process); setSessionProcess(key, process);
} }
void loadActiveTask(key); void loadActiveTask(key);
const nextMessages = waitingForReply const shouldMergePending = shouldMergePendingUsers(detail.messages, localSnapshot, waitingForReply);
const nextMessages = shouldMergePending
? mergeServerWithPendingUsers(detail.messages, localSnapshot) ? mergeServerWithPendingUsers(detail.messages, localSnapshot)
: detail.messages; : detail.messages;
setMessages(nextMessages); setMessages(nextMessages);
@ -167,6 +184,7 @@ export default function ChatPage() {
} }
setActiveTask(null); setActiveTask(null);
setRevisionTargetRunId(null); setRevisionTargetRunId(null);
setInput(useChatStore.getState().getInputDraft(sessionId));
void loadSessionMessages(sessionId); void loadSessionMessages(sessionId);
void loadActiveTask(sessionId); void loadActiveTask(sessionId);
}, [clearMessages, loadActiveTask, loadSessionMessages, sessionId, setIsLoading, setIsThinking]); }, [clearMessages, loadActiveTask, loadSessionMessages, sessionId, setIsLoading, setIsThinking]);
@ -308,6 +326,7 @@ export default function ChatPage() {
} }
setInput(''); setInput('');
clearInputDraft(sessionId);
setPendingFiles([]); setPendingFiles([]);
addMessage({ addMessage({
role: 'user', role: 'user',
@ -372,7 +391,7 @@ export default function ChatPage() {
}); });
} }
} }
}, [addMessage, input, isLoading, loadActiveTask, loadSessionMessages, loadSessions, locale, pendingFiles, revisionTargetRunId, sessionId, setIsLoading, setIsThinking, setSessionProcess, thinkingModeEnabled, updateMessageFeedback]); }, [addMessage, clearInputDraft, input, isLoading, loadActiveTask, loadSessionMessages, loadSessions, locale, pendingFiles, revisionTargetRunId, sessionId, setIsLoading, setIsThinking, setSessionProcess, thinkingModeEnabled, updateMessageFeedback]);
const handleFeedback = useCallback(async (runId: string, feedbackType: 'satisfied' | 'revise' | 'abandon', comment?: string) => { const handleFeedback = useCallback(async (runId: string, feedbackType: 'satisfied' | 'revise' | 'abandon', comment?: string) => {
updateMessageFeedback(runId, feedbackType); updateMessageFeedback(runId, feedbackType);
@ -433,6 +452,8 @@ export default function ChatPage() {
setSelectedRunId(null); setSelectedRunId(null);
setActiveTask(null); setActiveTask(null);
setRevisionTargetRunId(null); setRevisionTargetRunId(null);
clearInputDraft(id);
setInput('');
clearMessages(); clearMessages();
useChatStore.getState().resetProcessState(); useChatStore.getState().resetProcessState();
try { try {
@ -452,6 +473,8 @@ export default function ChatPage() {
setSessionId('web:default'); setSessionId('web:default');
setActiveTask(null); setActiveTask(null);
setRevisionTargetRunId(null); setRevisionTargetRunId(null);
clearInputDraft(key);
setInput(useChatStore.getState().getInputDraft('web:default'));
clearMessages(); clearMessages();
useChatStore.getState().resetProcessState(); useChatStore.getState().resetProcessState();
} }
@ -469,6 +492,7 @@ export default function ChatPage() {
setSelectedRunId(null); setSelectedRunId(null);
setActiveTask(null); setActiveTask(null);
setRevisionTargetRunId(null); setRevisionTargetRunId(null);
setInput(useChatStore.getState().getInputDraft(key));
setSessionId(key); setSessionId(key);
}; };
@ -619,7 +643,10 @@ export default function ChatPage() {
<textarea <textarea
ref={textareaRef} ref={textareaRef}
value={input} value={input}
onChange={(e) => setInput(e.target.value)} onChange={(e) => {
setInput(e.target.value);
setInputDraft(sessionId, e.target.value);
}}
onKeyDown={handleKeyDown} onKeyDown={handleKeyDown}
placeholder={ placeholder={
revisionTargetRunId revisionTargetRunId
@ -678,6 +705,8 @@ export default function ChatPage() {
</div> </div>
</div> </div>
</div> </div>
{sessionProgressView && <CurrentSessionProgressSidebar view={sessionProgressView} />}
</div> </div>
); );
} }

View File

@ -0,0 +1,324 @@
'use client';
import React from 'react';
import {
AlertCircle,
CheckCircle2,
Circle,
FileJson,
FileOutput,
FileText,
Image as ImageIcon,
Link2,
ListChecks,
Loader2,
PanelRightOpen,
X,
} from 'lucide-react';
import { ScrollArea } from '@/components/ui/scroll-area';
import { appStatusLabel } from '@/lib/i18n/common';
import { pickAppText } from '@/lib/i18n/core';
import { useAppI18n } from '@/lib/i18n/provider';
import type {
SessionProgressArtifactView,
SessionProgressStepView,
SessionProgressView,
} from '@/lib/session-progress';
import type { ProcessArtifact, ProcessRunStatus } from '@/types';
/**
 * Render a timestamp string as a locale-aware hour:minute label.
 * Input that `Date` cannot parse is returned unchanged.
 */
function formatShortTime(value: string, locale: 'zh-CN' | 'en-US') {
  const parsed = new Date(value);
  const isParseable = !Number.isNaN(parsed.getTime());
  if (!isParseable) {
    return value;
  }
  const formatter = new Intl.DateTimeFormat(locale, { hour: '2-digit', minute: '2-digit' });
  return formatter.format(parsed);
}
/**
 * Map a run status to the Tailwind text/background/border classes of its badge.
 * Statuses without a dedicated palette share the neutral fallback.
 */
function statusTone(status: ProcessRunStatus) {
  switch (status) {
    case 'done':
      return 'text-[#2F8D50] bg-[#E3F1E7] border-[#B8D9C2]';
    case 'running':
      return 'text-[#2F6FCA] bg-[#E7EEF9] border-[#B8CBE8]';
    case 'error':
      return 'text-[#8A3A2D] bg-[#F0E5E1] border-[#D9BDB4]';
    case 'cancelled':
      return 'text-[#6A5E58] bg-[#ECE8E5] border-[#D8D2CE]';
    default:
      return 'text-[#6A5E58] bg-[#F0ECE9] border-[#D8D2CE]';
  }
}
/**
 * Circular timeline marker for one step: a check for done, the 1-based step
 * number for running, an alert for error, and a hollow circle otherwise.
 */
function StepMarker({ step, index }: { step: SessionProgressStepView; index: number }) {
  switch (step.status) {
    case 'done':
      return (
        <span className="flex h-6 w-6 items-center justify-center rounded-full bg-[#2F8D50] text-white">
          <CheckCircle2 className="h-4 w-4" />
        </span>
      );
    case 'running':
      return (
        <span className="flex h-6 w-6 items-center justify-center rounded-full bg-[#2F6FCA] text-[11px] font-semibold text-white">
          {index + 1}
        </span>
      );
    case 'error':
      return (
        <span className="flex h-6 w-6 items-center justify-center rounded-full bg-[#8A3A2D] text-white">
          <AlertCircle className="h-4 w-4" />
        </span>
      );
    default:
      return (
        <span className="flex h-6 w-6 items-center justify-center rounded-full bg-[#D8D2CE] text-[#6A5E58]">
          <Circle className="h-3.5 w-3.5" />
        </span>
      );
  }
}
/**
 * Pick the icon element for an artifact type.
 * Markdown and text share one icon; unlisted types get the generic file-output icon.
 */
function artifactIcon(type: ProcessArtifact['artifact_type']) {
  switch (type) {
    case 'json':
      return <FileJson className="h-4 w-4" />;
    case 'image':
      return <ImageIcon className="h-4 w-4" />;
    case 'link':
      return <Link2 className="h-4 w-4" />;
    case 'markdown':
    case 'text':
      return <FileText className="h-4 w-4" />;
    default:
      return <FileOutput className="h-4 w-4" />;
  }
}
/**
 * Summary card at the top of the progress panel: title, status badge,
 * last-update time, progress bar, and an optional summary paragraph.
 *
 * The bar is exposed to assistive technology as a `progressbar`. When the
 * percent is unknown (`null`) `aria-valuenow` is omitted, which marks the bar
 * as indeterminate.
 */
function ProgressHeader({ view }: { view: SessionProgressView }) {
  const { locale } = useAppI18n();
  const percent = view.progress.percent;
  // Clamp only the value used for geometry and ARIA; the text still shows the
  // raw percent. A non-finite value would otherwise yield an invalid CSS width.
  const barPercent =
    percent !== null && Number.isFinite(percent) ? Math.min(100, Math.max(0, percent)) : 0;

  return (
    <section className="rounded-lg border border-[#ECE7E3] bg-white px-4 py-4 shadow-[0_8px_24px_rgba(0,0,0,0.04)]">
      <div className="flex items-start gap-3">
        <div className="mt-0.5 flex h-8 w-8 shrink-0 items-center justify-center rounded-full bg-[#E3F1E7] text-[#2F8D50]">
          <ListChecks className="h-4 w-4" />
        </div>
        <div className="min-w-0 flex-1">
          <div className="line-clamp-2 text-sm font-semibold text-foreground">{view.title}</div>
          <div className="mt-2 flex items-center gap-2">
            <span className={`rounded-full border px-2 py-0.5 text-[11px] font-medium ${statusTone(view.status)}`}>
              {appStatusLabel(view.status, locale)}
            </span>
            <span className="text-[11px] text-muted-foreground">
              {pickAppText(locale, '更新于', 'Updated')} {formatShortTime(view.updatedAt, locale)}
            </span>
          </div>
        </div>
      </div>

      <div className="mt-4">
        <div className="mb-2 flex items-center justify-between gap-3 text-xs text-muted-foreground">
          <span>{view.progress.label}</span>
          {percent !== null && <span className="font-medium text-foreground">{percent}%</span>}
        </div>
        <div
          className="h-2 overflow-hidden rounded-full bg-[#ECE8E5]"
          role="progressbar"
          aria-label={view.progress.label}
          aria-valuemin={0}
          aria-valuemax={100}
          aria-valuenow={percent === null ? undefined : barPercent}
        >
          <div
            className="h-full rounded-full bg-[#5DB56F] transition-all"
            style={{ width: `${barPercent}%` }}
          />
        </div>
      </div>

      {view.summary && (
        <p className="mt-3 line-clamp-3 text-xs leading-5 text-muted-foreground">{view.summary}</p>
      )}
    </section>
  );
}
/**
 * Vertical timeline of run steps. Each row shows a marker, the numbered
 * title, actor and update time, a status badge, an optional description,
 * and a spinner line while the step is running.
 */
function StepList({ steps }: { steps: SessionProgressStepView[] }) {
  const { locale } = useAppI18n();
  const stepCount = steps.length;
  const lastIndex = stepCount - 1;
  const heading = pickAppText(locale, '运行步骤', 'Run Steps');
  const countLabel = pickAppText(locale, `${stepCount}`, `${stepCount} steps`);

  return (
    <section className="rounded-lg border border-[#ECE7E3] bg-white px-4 py-4">
      <div className="mb-4 flex items-center justify-between">
        <h3 className="text-sm font-semibold text-foreground">{heading}</h3>
        <span className="text-xs text-muted-foreground">{countLabel}</span>
      </div>

      <div className="space-y-0">
        {steps.map((step, index) => {
          const isRunning = step.status === 'running';
          // Every row except the last draws a connector line to the next marker.
          const hasConnector = index < lastIndex;

          return (
            <div key={step.runId} className="grid grid-cols-[24px_1fr] gap-3">
              <div className="flex flex-col items-center">
                <StepMarker step={step} index={index} />
                {hasConnector && <span className="mt-2 h-full min-h-8 w-px bg-[#E6E1DE]" />}
              </div>
              <div className="pb-5">
                <div className="flex items-start justify-between gap-3">
                  <div className="min-w-0">
                    <div className="line-clamp-2 text-sm font-medium text-foreground">
                      {index + 1}. {step.title}
                    </div>
                    <div className="mt-1 text-[11px] text-muted-foreground">
                      {step.actorName} · {formatShortTime(step.updatedAt, locale)}
                    </div>
                  </div>
                  <span className={`shrink-0 rounded-full border px-2 py-0.5 text-[11px] ${statusTone(step.status)}`}>
                    {appStatusLabel(step.status, locale)}
                  </span>
                </div>
                {step.description && (
                  <p className="mt-2 line-clamp-3 text-xs leading-5 text-muted-foreground">
                    {step.description}
                  </p>
                )}
                {isRunning && (
                  <div className="mt-2 flex items-center gap-1.5 text-[11px] text-[#2F6FCA]">
                    <Loader2 className="h-3 w-3 animate-spin" />
                    <span>{pickAppText(locale, '正在处理', 'In progress')}</span>
                  </div>
                )}
              </div>
            </div>
          );
        })}
      </div>
    </section>
  );
}
/** URL schemes that can execute script or embed arbitrary documents when opened. */
const UNSAFE_ARTIFACT_URL_SCHEME = /^(?:javascript|vbscript|data):/i;

/**
 * Return `url` when it is safe to use as a link target, otherwise `undefined`.
 *
 * Artifact URLs are produced by agent runs and must be treated as untrusted.
 * Browsers ignore leading whitespace/control characters and embedded
 * tab/newline characters when parsing a scheme, so those are stripped before
 * the scheme check (e.g. "java\nscript:" is rejected too).
 */
function safeArtifactHref(url: string | undefined): string | undefined {
  if (!url) return undefined;
  const normalized = url.replace(/[\u0000-\u0020\u007f-\u009f]/g, '');
  return UNSAFE_ARTIFACT_URL_SCHEME.test(normalized) ? undefined : url;
}

/**
 * One generated artifact: icon, title, actor/type line, and a short preview.
 * Opens the artifact URL in a new tab when it has a safe URL; otherwise the
 * row renders as an anchor without `href`.
 */
function ArtifactRow({ artifact }: { artifact: SessionProgressArtifactView }) {
  const href = safeArtifactHref(artifact.url);

  return (
    <a
      href={href}
      target={href ? '_blank' : undefined}
      rel={href ? 'noreferrer' : undefined}
      className="block rounded-lg border border-[#ECE7E3] bg-[#FDFDFC] px-3 py-3 transition-colors hover:bg-[#F7F6F5]"
    >
      <div className="flex items-start gap-3">
        <div className="flex h-8 w-8 shrink-0 items-center justify-center rounded-full bg-[#ECE8E5] text-[#5F5550]">
          {artifactIcon(artifact.type)}
        </div>
        <div className="min-w-0 flex-1">
          <div className="truncate text-sm font-medium text-foreground">{artifact.title}</div>
          <div className="mt-1 text-[11px] text-muted-foreground">
            {artifact.actorName} · {artifact.typeLabel}
          </div>
          <p className="mt-2 line-clamp-2 text-xs leading-5 text-muted-foreground">{artifact.preview}</p>
        </div>
      </div>
    </a>
  );
}
/**
 * "Generated content" card: a total count, per-type summary chips (or an
 * empty-state sentence when there are none), then one row per artifact.
 */
function ArtifactSection({ view }: { view: SessionProgressView }) {
  const { locale } = useAppI18n();
  const { artifacts, artifactTypeSummaries } = view;
  const artifactCount = artifacts.length;
  const hasTypeSummaries = artifactTypeSummaries.length > 0;

  const typeChips = (
    <div className="mb-3 flex flex-wrap gap-2">
      {artifactTypeSummaries.map((summary) => (
        <span
          key={summary.type}
          className="inline-flex items-center gap-1.5 rounded-full border border-[#E6E1DE] bg-[#F7F6F5] px-2.5 py-1 text-xs text-[#4F4642]"
        >
          {artifactIcon(summary.type)}
          <span>{summary.label}</span>
          <span className="font-semibold">{summary.count}</span>
        </span>
      ))}
    </div>
  );

  const emptyState = (
    <p className="mb-3 text-xs text-muted-foreground">
      {pickAppText(locale, '暂时还没有生成内容。', 'No generated content yet.')}
    </p>
  );

  return (
    <section className="rounded-lg border border-[#ECE7E3] bg-white px-4 py-4">
      <div className="mb-3 flex items-center justify-between">
        <h3 className="text-sm font-semibold text-foreground">
          {pickAppText(locale, '生成内容', 'Generated Content')}
        </h3>
        <span className="text-xs text-muted-foreground">
          {pickAppText(locale, `${artifactCount}`, `${artifactCount} items`)}
        </span>
      </div>

      {hasTypeSummaries ? typeChips : emptyState}

      <div className="space-y-2">
        {artifacts.map((item) => (
          <ArtifactRow key={item.artifactId} artifact={item} />
        ))}
      </div>
    </section>
  );
}
/**
 * Full progress panel: a fixed header followed by a scrollable body holding
 * the progress summary, the step list, and the artifact section.
 * A close button is rendered only when `onClose` is supplied.
 */
function ProgressPanel({
  view,
  onClose,
}: {
  view: SessionProgressView;
  onClose?: () => void;
}) {
  const { locale } = useAppI18n();

  const closeButton = onClose ? (
    <button
      type="button"
      onClick={onClose}
      className="rounded-full p-2 text-muted-foreground transition-colors hover:bg-[#ECE8E5] hover:text-foreground"
      aria-label={pickAppText(locale, '关闭进度面板', 'Close progress panel')}
    >
      <X className="h-4 w-4" />
    </button>
  ) : null;

  return (
    <div className="flex h-full flex-col bg-[#FBFAF9]">
      <div className="flex h-16 shrink-0 items-center justify-between border-b border-[#E6E1DE] px-5">
        <div>
          <h2 className="text-base font-semibold text-foreground">
            {pickAppText(locale, '当前会话的运行进度', 'Current Session Progress')}
          </h2>
          <p className="text-xs text-muted-foreground">
            {pickAppText(locale, '任务列表会自动刷新', 'Task updates refresh automatically')}
          </p>
        </div>
        {closeButton}
      </div>

      <ScrollArea className="min-h-0 flex-1 px-4 py-4">
        <div className="space-y-4 pb-6">
          <ProgressHeader view={view} />
          <StepList steps={view.steps} />
          <ArtifactSection view={view} />
        </div>
      </ScrollArea>
    </div>
  );
}
/**
 * Responsive wrapper around the progress panel.
 *
 * At the `xl` breakpoint and above the panel is a fixed-width right sidebar.
 * Below it, a floating button opens the panel as a drawer over a dimmed
 * backdrop; clicking the backdrop or the panel's close button dismisses it.
 */
export function CurrentSessionProgressSidebar({ view }: { view: SessionProgressView }) {
  const { locale } = useAppI18n();
  const [isDrawerOpen, setDrawerOpen] = React.useState(false);

  const openDrawer = () => setDrawerOpen(true);
  const closeDrawer = () => setDrawerOpen(false);

  const drawer = isDrawerOpen ? (
    <div className="fixed inset-0 z-50 xl:hidden">
      <button
        type="button"
        className="absolute inset-0 bg-black/30"
        onClick={closeDrawer}
        aria-label={pickAppText(locale, '关闭进度面板', 'Close progress panel')}
      />
      <div className="absolute inset-y-0 right-0 w-[min(92vw,390px)] border-l border-[#E6E1DE] shadow-2xl">
        <ProgressPanel view={view} onClose={closeDrawer} />
      </div>
    </div>
  ) : null;

  return (
    <>
      <aside className="hidden h-full w-[380px] shrink-0 border-l border-[#E6E1DE] xl:flex">
        <ProgressPanel view={view} />
      </aside>

      <button
        type="button"
        onClick={openDrawer}
        className="fixed right-3 top-24 z-40 flex h-11 w-11 items-center justify-center rounded-full border border-[#E6E1DE] bg-white text-[#342E2B] shadow-[0_8px_22px_rgba(0,0,0,0.16)] transition-colors hover:bg-[#F7F6F5] xl:hidden"
        aria-label={pickAppText(locale, '查看当前会话运行进度', 'View current session progress')}
      >
        <PanelRightOpen className="h-5 w-5" />
      </button>

      {drawer}
    </>
  );
}

View File

@ -1,6 +1,6 @@
import { describe, expect, it } from 'vitest'; import { describe, expect, it } from 'vitest';
import { getTaskCardMessageIndexes, mergeServerWithPendingUsers } from '@/lib/chat-messages'; import { getTaskCardMessageIndexes, mergeServerWithPendingUsers, shouldMergePendingUsers } from '@/lib/chat-messages';
import type { ChatMessage } from '@/types'; import type { ChatMessage } from '@/types';
describe('chat message helpers', () => { describe('chat message helpers', () => {
@ -46,6 +46,26 @@ describe('chat message helpers', () => {
expect(mergeServerWithPendingUsers(serverMessages, localMessages)).toEqual(serverMessages); expect(mergeServerWithPendingUsers(serverMessages, localMessages)).toEqual(serverMessages);
}); });
it('merges pending user messages when local state has an unpersisted trailing user turn', () => {
const serverMessages: ChatMessage[] = [
{
role: 'assistant',
content: 'Earlier answer',
timestamp: '2026-05-21T08:00:00.000Z',
},
];
const localMessages: ChatMessage[] = [
...serverMessages,
{
role: 'user',
content: 'Do this long task',
timestamp: '2026-05-21T08:01:00.000Z',
},
];
expect(shouldMergePendingUsers(serverMessages, localMessages, false)).toBe(true);
});
it('shows a task card only on the latest assistant message for the same task', () => { it('shows a task card only on the latest assistant message for the same task', () => {
const messages: ChatMessage[] = [ const messages: ChatMessage[] = [
{ {

View File

@ -30,6 +30,42 @@ export function mergeServerWithPendingUsers(serverMessages: ChatMessage[], local
return [...serverMessages, ...pendingUsers]; return [...serverMessages, ...pendingUsers];
} }
/**
 * Decide whether local pending user messages should be merged into the
 * server transcript.
 *
 * Returns true while a reply is awaited. Otherwise returns true only when the
 * local transcript ends with a user message and at least one local user
 * message has no unconsumed server message with the same fingerprint.
 */
export function shouldMergePendingUsers(
  serverMessages: ChatMessage[],
  localMessages: ChatMessage[],
  waitingForReply: boolean
): boolean {
  if (waitingForReply) {
    return true;
  }

  const trailing = localMessages[localMessages.length - 1];
  if (!trailing || trailing.role !== 'user') {
    return false;
  }

  // Multiset of server fingerprints; each local user message consumes one match.
  const unmatched = new Map<string, number>();
  serverMessages.forEach((serverMessage) => {
    const fingerprint = messageFingerprint(serverMessage);
    unmatched.set(fingerprint, (unmatched.get(fingerprint) ?? 0) + 1);
  });

  return localMessages
    .filter((localMessage) => localMessage.role === 'user')
    .some((localMessage) => {
      const fingerprint = messageFingerprint(localMessage);
      const available = unmatched.get(fingerprint) ?? 0;
      if (available <= 0) {
        // No server counterpart left: this user turn is not persisted yet.
        return true;
      }
      unmatched.set(fingerprint, available - 1);
      return false;
    });
}
export function getTaskCardMessageIndexes(messages: ChatMessage[]): Set<number> { export function getTaskCardMessageIndexes(messages: ChatMessage[]): Set<number> {
const latestByTask = new Map<string, number>(); const latestByTask = new Map<string, number>();

View File

@ -0,0 +1,201 @@
import { describe, expect, it } from 'vitest';
import { buildSessionProgressView } from '@/lib/session-progress';
import type { ProcessArtifact, ProcessEvent, ProcessRun } from '@/types';
describe('session progress view builder', () => {
it('selects the latest active root run for the current session and builds its run tree', () => {
const processRuns: ProcessRun[] = [
{
run_id: 'old-root',
parent_run_id: null,
session_id: 'web:current',
actor_type: 'agent',
actor_id: 'main',
actor_name: 'Main Agent',
title: '旧任务',
status: 'done',
started_at: '2026-05-22T08:00:00.000Z',
finished_at: '2026-05-22T08:05:00.000Z',
},
{
run_id: 'latest-root',
parent_run_id: null,
session_id: 'web:current',
actor_type: 'agent',
actor_id: 'main',
actor_name: 'Main Agent',
title: '销售数据分析报告生成',
status: 'running',
started_at: '2026-05-22T09:00:00.000Z',
metadata: {
step_index: 3,
step_total: 5,
},
},
{
run_id: 'collect-data',
parent_run_id: 'latest-root',
session_id: 'web:current',
actor_type: 'agent',
actor_id: 'collector',
actor_name: 'Data Agent',
title: '收集销售数据',
status: 'done',
started_at: '2026-05-22T09:01:00.000Z',
finished_at: '2026-05-22T09:03:00.000Z',
summary: '已获取 Q1 销售数据',
},
{
run_id: 'clean-data',
parent_run_id: 'latest-root',
session_id: 'web:current',
actor_type: 'agent',
actor_id: 'cleaner',
actor_name: 'Cleaning Agent',
title: '数据清洗与预处理',
status: 'running',
started_at: '2026-05-22T09:04:00.000Z',
},
{
run_id: 'other-root',
parent_run_id: null,
session_id: 'web:other',
actor_type: 'agent',
actor_id: 'main',
actor_name: 'Main Agent',
title: '其他会话任务',
status: 'running',
started_at: '2026-05-22T10:00:00.000Z',
},
];
const processEvents: ProcessEvent[] = [
{
event_id: 'evt-clean',
run_id: 'clean-data',
parent_run_id: 'latest-root',
kind: 'run_progress',
actor_type: 'agent',
actor_id: 'cleaner',
actor_name: 'Cleaning Agent',
text: '清洗缺失值、异常值,统一格式',
created_at: '2026-05-22T09:05:00.000Z',
},
];
const processArtifacts: ProcessArtifact[] = [
{
artifact_id: 'artifact-json',
run_id: 'collect-data',
actor_type: 'agent',
actor_id: 'collector',
actor_name: 'Data Agent',
title: '销售数据',
artifact_type: 'json',
data: { rows: 120 },
created_at: '2026-05-22T09:03:30.000Z',
},
{
artifact_id: 'artifact-markdown',
run_id: 'clean-data',
actor_type: 'agent',
actor_id: 'cleaner',
actor_name: 'Cleaning Agent',
title: '清洗说明',
artifact_type: 'markdown',
content: '已完成数据标准化。',
created_at: '2026-05-22T09:05:30.000Z',
},
{
artifact_id: 'artifact-other-session',
run_id: 'other-root',
actor_type: 'agent',
actor_id: 'main',
title: '其他会话产物',
artifact_type: 'text',
content: '不应出现',
created_at: '2026-05-22T10:01:00.000Z',
},
];
const view = buildSessionProgressView({
sessionId: 'web:current',
processRuns,
processEvents,
processArtifacts,
locale: 'zh-CN',
});
expect(view).not.toBeNull();
expect(view?.rootRunId).toBe('latest-root');
expect(view?.title).toBe('销售数据分析报告生成');
expect(view?.progress).toMatchObject({
value: 3,
max: 5,
percent: 60,
label: '运行中3 / 5 步',
});
expect(view?.steps.map((step) => step.runId)).toEqual(['collect-data', 'clean-data', 'latest-root']);
expect(view?.steps.find((step) => step.runId === 'clean-data')?.description).toBe('清洗缺失值、异常值,统一格式');
expect(view?.artifactTypeSummaries).toEqual([
{ type: 'json', count: 1, label: 'JSON' },
{ type: 'markdown', count: 1, label: 'Markdown' },
]);
expect(view?.artifacts.map((artifact) => artifact.artifactId)).toEqual(['artifact-markdown', 'artifact-json']);
});
it('falls back to completed child run counts when no explicit progress metadata exists', () => {
const processRuns: ProcessRun[] = [
{
run_id: 'root',
parent_run_id: null,
session_id: 'web:current',
actor_type: 'agent',
actor_id: 'main',
actor_name: 'Main Agent',
title: '生成总结',
status: 'running',
started_at: '2026-05-22T09:00:00.000Z',
},
{
run_id: 'done-child',
parent_run_id: 'root',
session_id: 'web:current',
actor_type: 'agent',
actor_id: 'writer',
actor_name: 'Writer',
title: '整理结果',
status: 'done',
started_at: '2026-05-22T09:01:00.000Z',
finished_at: '2026-05-22T09:02:00.000Z',
},
{
run_id: 'running-child',
parent_run_id: 'root',
session_id: 'web:current',
actor_type: 'agent',
actor_id: 'reviewer',
actor_name: 'Reviewer',
title: '复核结果',
status: 'running',
started_at: '2026-05-22T09:03:00.000Z',
},
];
const view = buildSessionProgressView({
sessionId: 'web:current',
processRuns,
processEvents: [],
processArtifacts: [],
locale: 'zh-CN',
});
expect(view?.progress).toMatchObject({
value: 1,
max: 2,
percent: 50,
label: '已完成 1 / 2 步',
});
});
});

View File

@ -0,0 +1,392 @@
import type { ProcessArtifact, ProcessEvent, ProcessRun, ProcessRunStatus } from '@/types';
import { getCurrentAppLocale, pickAppText, type AppLocale } from '@/lib/i18n/core';
// Run statuses listed here as terminal: done, error, cancelled.
const TERMINAL_STATUSES = new Set<ProcessRunStatus>(['done', 'error', 'cancelled']);
// Run statuses listed here as active: queued, running, waiting.
const ACTIVE_STATUSES = new Set<ProcessRunStatus>(['queued', 'running', 'waiting']);
// Fixed ordering of artifact types.
// NOTE(review): presumably used to sort the per-type summaries — confirm against the builder below.
const ARTIFACT_TYPE_ORDER: ProcessArtifact['artifact_type'][] = [
  'text',
  'json',
  'file',
  'image',
  'link',
  'markdown',
];
/** Progress indicator values for a session; every numeric field may be null. */
export interface SessionProgressValueView {
  // Human-readable progress text shown next to the bar.
  label: string;
  value: number | null;
  max: number | null;
  // Rendered as "<percent>%" and used for the bar width when not null.
  percent: number | null;
}
/** View model for one step (one run) in the session progress timeline. */
export interface SessionProgressStepView {
  // Identifier of the run behind this step; used as the list key when rendered.
  runId: string;
  title: string;
  actorName: string;
  status: ProcessRunStatus;
  // Optional detail text; null when there is nothing to show.
  description: string | null;
  startedAt: string;
  updatedAt: string;
  finishedAt: string | null;
  artifactCount: number;
  // NOTE(review): presumably marks the root run of the tree — confirm in the builder.
  isRoot: boolean;
  // NOTE(review): presumably marks the step currently executing — confirm in the builder.
  isCurrent: boolean;
}
/** Display model for a single artifact produced inside the selected task tree. */
export interface SessionProgressArtifactView {
artifactId: string;
runId: string;
title: string;
type: ProcessArtifact['artifact_type'];
// Localized label for `type`.
typeLabel: string;
// Actor name, falling back to the actor id when the name is empty.
actorName: string;
// Short preview: content (whitespace-collapsed, max 120 chars), else URL, else serialized data.
preview: string;
createdAt: string;
url?: string;
}
/** Count of artifacts of one type within the selected task tree. */
export interface SessionProgressArtifactTypeSummary {
type: ProcessArtifact['artifact_type'];
// Number of artifacts of this type (only types with at least one artifact are summarized).
count: number;
// Localized label for `type`.
label: string;
}
/** Complete view model returned by buildSessionProgressView for one session. */
export interface SessionProgressView {
// Id of the root run chosen to represent the session.
rootRunId: string;
title: string;
status: ProcessRunStatus;
// Trimmed root summary, else the root run's latest event text, else null.
summary: string | null;
// Latest timestamp found anywhere in the selected run tree.
updatedAt: string;
progress: SessionProgressValueView;
// Child runs ordered by start time, with the root run placed last.
steps: SessionProgressStepView[];
// Artifacts of the run tree, newest first.
artifacts: SessionProgressArtifactView[];
artifactTypeSummaries: SessionProgressArtifactTypeSummary[];
}
/** Arguments accepted by buildSessionProgressView. */
export type BuildSessionProgressInput = {
// Session whose runs should be summarized.
sessionId: string;
// Runs, events and artifacts; entries outside the session / selected tree are filtered out.
processRuns: ProcessRun[];
processEvents: ProcessEvent[];
processArtifacts: ProcessArtifact[];
// Locale for labels; defaults to getCurrentAppLocale() when omitted.
locale?: AppLocale;
};
/**
 * Converts a timestamp string to epoch milliseconds.
 *
 * @param value Timestamp string; may be missing, null or empty.
 * @returns Epoch milliseconds, or null when the value is absent or cannot be parsed.
 */
function toTime(value?: string | null): number | null {
  if (value === undefined || value === null || value === '') {
    return null;
  }
  const millis = Date.parse(value);
  if (!Number.isFinite(millis)) {
    return null;
  }
  return millis;
}
/**
 * Picks the chronologically latest timestamp string from a list.
 *
 * Entries that are missing or unparseable are ignored. When several entries
 * share the latest instant, the first one encountered is returned.
 *
 * @param values Candidate timestamp strings.
 * @returns The original string of the latest entry, or null when none is parseable.
 */
function latestTimestamp(values: Array<string | null | undefined>): string | null {
  let selected: string | null = null;
  // Track "nothing selected yet" explicitly instead of using -1 as a sentinel,
  // so timestamps at or before the Unix epoch (time <= 0) are not discarded.
  let selectedTime: number | null = null;
  for (const value of values) {
    const time = toTime(value);
    if (time === null) continue;
    if (selectedTime !== null && time <= selectedTime) continue;
    selected = value ?? null;
    selectedTime = time;
  }
  return selected;
}
/**
 * Sort comparator that orders timestamp strings newest first.
 * Missing or unparseable timestamps are treated as epoch 0.
 */
function compareIsoDesc(a?: string | null, b?: string | null): number {
  const left = toTime(a) ?? 0;
  const right = toTime(b) ?? 0;
  return right - left;
}
/**
 * Returns the first finite numeric value found under any of the given keys.
 *
 * @param metadata Optional metadata bag to inspect.
 * @param keys Keys to try, in priority order.
 * @returns The first finite number found, or null when there is none.
 */
function firstNumber(metadata: Record<string, unknown> | undefined, keys: string[]): number | null {
  if (!metadata) {
    return null;
  }
  const matchingKey = keys.find((candidate) => {
    const entry = metadata[candidate];
    return typeof entry === 'number' && Number.isFinite(entry);
  });
  return matchingKey === undefined ? null : (metadata[matchingKey] as number);
}
/**
 * Indexes runs by their parent run id.
 *
 * @param processRuns Runs to index; runs without a parent are skipped.
 * @returns Map from parent run id to its direct children, in input order.
 */
function buildChildrenMap(processRuns: ProcessRun[]): Map<string, ProcessRun[]> {
  const byParent = new Map<string, ProcessRun[]>();
  processRuns.forEach((run) => {
    const parentId = run.parent_run_id;
    if (!parentId) {
      return;
    }
    let siblings = byParent.get(parentId);
    if (!siblings) {
      siblings = [];
      byParent.set(parentId, siblings);
    }
    siblings.push(run);
  });
  return byParent;
}
/**
 * Flattens a run and all of its descendants into a list.
 *
 * The traversal is depth-first and pre-order: a run is listed before its
 * children, and children appear in the order stored in `childrenMap`.
 * Each run id is visited at most once, so cyclic parent links terminate.
 *
 * @param rootRun Run to start from.
 * @param childrenMap Map from run id to direct children.
 * @returns The root followed by its descendants.
 */
function collectRunTree(rootRun: ProcessRun, childrenMap: Map<string, ProcessRun[]>): ProcessRun[] {
  const ordered: ProcessRun[] = [];
  const visited = new Set<string>();
  const pending: ProcessRun[] = [rootRun];
  while (pending.length > 0) {
    const node = pending.pop();
    if (!node || visited.has(node.run_id)) {
      continue;
    }
    visited.add(node.run_id);
    ordered.push(node);
    const children = childrenMap.get(node.run_id);
    if (!children) {
      continue;
    }
    // Push in reverse so the first child ends up on top of the stack.
    let position = children.length;
    while (position > 0) {
      position -= 1;
      pending.push(children[position]);
    }
  }
  return ordered;
}
/**
 * Groups items by their `run_id`.
 *
 * @param items Items carrying a `run_id`.
 * @returns Map from run id to the items of that run, in input order.
 */
function groupByRunId<T extends { run_id: string }>(items: T[]): Map<string, T[]> {
  const grouped = new Map<string, T[]>();
  items.forEach((entry) => {
    let bucket = grouped.get(entry.run_id);
    if (!bucket) {
      bucket = [];
      grouped.set(entry.run_id, bucket);
    }
    bucket.push(entry);
  });
  return grouped;
}
/**
 * Determines when a single run was last touched.
 *
 * Considers the run's own start/finish times plus the creation times of its
 * events and artifacts, and falls back to `started_at` when none is parseable.
 */
function getRunUpdatedAt(
  run: ProcessRun,
  eventsByRun: Map<string, ProcessEvent[]>,
  artifactsByRun: Map<string, ProcessArtifact[]>,
): string {
  const runId = run.run_id;
  const eventTimes = (eventsByRun.get(runId) ?? []).map((event) => event.created_at);
  const artifactTimes = (artifactsByRun.get(runId) ?? []).map((artifact) => artifact.created_at);
  const newest = latestTimestamp([run.finished_at, run.started_at, ...eventTimes, ...artifactTimes]);
  return newest ?? run.started_at;
}
/**
 * Determines when any run in a list was last touched.
 *
 * @returns The latest per-run update time; when none is parseable, the first
 *          run's `started_at`, or an empty string for an empty list.
 */
function getTreeUpdatedAt(
  runs: ProcessRun[],
  eventsByRun: Map<string, ProcessEvent[]>,
  artifactsByRun: Map<string, ProcessArtifact[]>,
): string {
  const perRun = runs.map((run) => getRunUpdatedAt(run, eventsByRun, artifactsByRun));
  const newest = latestTimestamp(perRun);
  if (newest !== null) {
    return newest;
  }
  return runs[0]?.started_at ?? '';
}
/**
 * Returns the trimmed text of the most recent event that has non-blank text.
 *
 * Events with missing or unparseable timestamps are ranked as epoch 0; among
 * events with the same time, the earliest in the input wins.
 *
 * @returns The trimmed text, or null when no event carries text.
 */
function latestEventText(events: ProcessEvent[]): string | null {
  let newestText: string | null = null;
  let newestTime = Number.NEGATIVE_INFINITY;
  for (const item of events) {
    const text = item.text?.trim();
    if (!text) {
      continue;
    }
    const time = toTime(item.created_at) ?? 0;
    // Strict comparison keeps the first event when timestamps tie.
    if (time > newestTime) {
      newestText = text;
      newestTime = time;
    }
  }
  return newestText;
}
/**
 * Converts a value/max pair into an integer percentage clamped to 0..100.
 *
 * @param value Completed units.
 * @param max Total units.
 * @returns Rounded percentage; 0 when the ratio is undefined (for example 0 / 0).
 */
function percent(value: number, max: number): number {
  const ratio = value / max;
  // 0 / 0 (or a NaN operand) would otherwise pass NaN straight through the clamp.
  if (Number.isNaN(ratio)) {
    return 0;
  }
  return Math.max(0, Math.min(100, Math.round(ratio * 100)));
}
/**
 * Derives progress from explicit step/stage counters found in metadata.
 *
 * Metadata sources are checked in order: the root run's metadata first, then
 * event metadata from newest to oldest. Within one source, `step_index` /
 * `step_total` take precedence over `stage_index` / `stage_total` (or their
 * `phase_*` aliases). A pair is only used when both numbers are present and
 * the total is positive; the value is capped at the total.
 *
 * @param rootRun Root run of the selected task.
 * @param treeEvents Events belonging to the selected run tree.
 * @param locale Locale used for the label.
 * @returns Progress figures, or null when no source carries a usable pair.
 */
function explicitProgress(
  rootRun: ProcessRun,
  treeEvents: ProcessEvent[],
  locale: AppLocale,
): SessionProgressValueView | null {
  const metadataSources = [
    rootRun.metadata,
    ...[...treeEvents]
      .sort((a, b) => compareIsoDesc(a.created_at, b.created_at))
      .map((event) => event.metadata),
  ];
  for (const metadata of metadataSources) {
    const stepValue = firstNumber(metadata, ['step_index']);
    const stepMax = firstNumber(metadata, ['step_total']);
    if (stepValue !== null && stepMax !== null && stepMax > 0) {
      const safeValue = Math.min(stepValue, stepMax);
      return {
        // The zh-CN text carries the "步" unit, matching the English "steps"
        // wording and the "阶段" unit used by the stage label below.
        label: pickAppText(locale, `运行中:${safeValue} / ${stepMax} 步`, `Running: ${safeValue} / ${stepMax} steps`),
        value: safeValue,
        max: stepMax,
        percent: percent(safeValue, stepMax),
      };
    }
    const stageValue = firstNumber(metadata, ['stage_index', 'phase_index']);
    const stageMax = firstNumber(metadata, ['stage_total', 'phase_total']);
    if (stageValue !== null && stageMax !== null && stageMax > 0) {
      const safeValue = Math.min(stageValue, stageMax);
      return {
        label: pickAppText(locale, `运行中:${safeValue} / ${stageMax} 阶段`, `Running: ${safeValue} / ${stageMax} stages`),
        value: safeValue,
        max: stageMax,
        percent: percent(safeValue, stageMax),
      };
    }
  }
  return null;
}
/**
 * Derives progress by counting finished runs when no explicit counters exist.
 *
 * Child runs (those with a parent) are counted when there are any; otherwise
 * every run in the list is counted. Only runs with status `done` count as
 * completed.
 *
 * @param taskRuns Runs of the selected task tree.
 * @param locale Locale used for the label.
 * @returns Completed/total figures, or an all-null placeholder when the list is empty.
 */
function fallbackProgress(taskRuns: ProcessRun[], locale: AppLocale): SessionProgressValueView {
  const childRuns = taskRuns.filter((run) => run.parent_run_id);
  const runsForProgress = childRuns.length > 0 ? childRuns : taskRuns;
  const totalRuns = runsForProgress.length;
  if (totalRuns === 0) {
    return {
      label: pickAppText(locale, '等待任务数据', 'Waiting for task data'),
      value: null,
      max: null,
      percent: null,
    };
  }
  const doneRuns = runsForProgress.filter((run) => run.status === 'done').length;
  return {
    // The zh-CN label ends with the "步" unit to mirror the English "steps";
    // the unit was missing, so the label read "已完成 1 / 2" instead of "已完成 1 / 2 步".
    label: pickAppText(locale, `已完成 ${doneRuns} / ${totalRuns} 步`, `Completed ${doneRuns} / ${totalRuns} steps`),
    value: doneRuns,
    max: totalRuns,
    percent: percent(doneRuns, totalRuns),
  };
}
/**
 * Maps an artifact type to its display label in the given locale.
 * Any type without a dedicated case is labelled "Markdown".
 */
function artifactTypeLabel(type: ProcessArtifact['artifact_type'], locale: AppLocale): string {
  switch (type) {
    case 'text':
      return pickAppText(locale, '文本', 'Text');
    case 'json':
      return 'JSON';
    case 'file':
      return pickAppText(locale, '文件', 'File');
    case 'image':
      return pickAppText(locale, '图片', 'Image');
    case 'link':
      return pickAppText(locale, '链接', 'Link');
    default:
      return 'Markdown';
  }
}
/**
 * Builds a short one-line preview for an artifact.
 *
 * Preference order: trimmed content with whitespace runs collapsed (first 120
 * characters), then the trimmed URL, then the JSON-serialized data (first 120
 * characters), and finally a localized "no preview" placeholder.
 */
function artifactPreview(artifact: ProcessArtifact, locale: AppLocale): string {
  const content = artifact.content?.trim();
  if (content) {
    return content.replace(/\s+/g, ' ').slice(0, 120);
  }
  const url = artifact.url?.trim();
  if (url) {
    return url;
  }
  if (artifact.data === undefined) {
    return pickAppText(locale, '暂无预览', 'No preview');
  }
  return JSON.stringify(artifact.data).slice(0, 120);
}
/**
 * Counts artifacts per type and returns one summary per type that occurs.
 *
 * Summaries follow ARTIFACT_TYPE_ORDER; types with no artifacts are left out.
 */
function buildArtifactSummaries(
  artifacts: ProcessArtifact[],
  locale: AppLocale,
): SessionProgressArtifactTypeSummary[] {
  const tally = new Map<ProcessArtifact['artifact_type'], number>();
  artifacts.forEach((artifact) => {
    const seenSoFar = tally.get(artifact.artifact_type) ?? 0;
    tally.set(artifact.artifact_type, seenSoFar + 1);
  });
  const summaries: SessionProgressArtifactTypeSummary[] = [];
  for (const type of ARTIFACT_TYPE_ORDER) {
    const count = tally.get(type);
    if (count === undefined) {
      continue;
    }
    summaries.push({ type, count, label: artifactTypeLabel(type, locale) });
  }
  return summaries;
}
/**
 * Converts artifacts into display models, newest first.
 * The input array is not mutated.
 */
function buildArtifactViews(
  artifacts: ProcessArtifact[],
  locale: AppLocale,
): SessionProgressArtifactView[] {
  const newestFirst = artifacts.slice();
  newestFirst.sort((left, right) => compareIsoDesc(left.created_at, right.created_at));
  return newestFirst.map((artifact) => {
    const kind = artifact.artifact_type;
    return {
      artifactId: artifact.artifact_id,
      runId: artifact.run_id,
      title: artifact.title,
      type: kind,
      typeLabel: artifactTypeLabel(kind, locale),
      // Fall back to the actor id when no display name is available.
      actorName: artifact.actor_name || artifact.actor_id,
      preview: artifactPreview(artifact, locale),
      createdAt: artifact.created_at,
      url: artifact.url,
    };
  });
}
/**
 * Converts the runs of a task tree into step display models.
 *
 * Non-root runs are ordered by start time (unparseable times rank as epoch 0)
 * and the root run is placed last. The input array is not mutated.
 */
function buildSteps(
  rootRun: ProcessRun,
  taskRuns: ProcessRun[],
  eventsByRun: Map<string, ProcessEvent[]>,
  artifactsByRun: Map<string, ProcessArtifact[]>,
): SessionProgressStepView[] {
  const rootId = rootRun.run_id;
  const ordered = taskRuns.slice();
  ordered.sort((left, right) => {
    // The root run always sorts after every other run.
    if (left.run_id === rootId) return 1;
    if (right.run_id === rootId) return -1;
    const leftStart = toTime(left.started_at) ?? 0;
    const rightStart = toTime(right.started_at) ?? 0;
    return leftStart - rightStart;
  });
  return ordered.map((run) => {
    const events = eventsByRun.get(run.run_id) ?? [];
    const artifacts = artifactsByRun.get(run.run_id) ?? [];
    const step: SessionProgressStepView = {
      runId: run.run_id,
      title: run.title,
      actorName: run.actor_name,
      status: run.status,
      // Prefer the latest event text; fall back to the run's own summary.
      description: latestEventText(events) || run.summary?.trim() || null,
      startedAt: run.started_at,
      updatedAt: getRunUpdatedAt(run, eventsByRun, artifactsByRun),
      finishedAt: run.finished_at ?? null,
      artifactCount: artifacts.length,
      isRoot: run.run_id === rootId,
      isCurrent: !TERMINAL_STATUSES.has(run.status),
    };
    return step;
  });
}
/**
 * Builds the progress view model for one session.
 *
 * A single root run (a run of the session without a parent) is chosen to
 * represent the session: roots with an active status win over others, and
 * ties are broken by the most recently updated run tree. The view then covers
 * that root plus its descendants that belong to the same session.
 *
 * @returns The view model, or null when the session has no root run.
 */
export function buildSessionProgressView({
  sessionId,
  processRuns,
  processEvents,
  processArtifacts,
  locale = getCurrentAppLocale(),
}: BuildSessionProgressInput): SessionProgressView | null {
  const rootRuns = processRuns.filter((run) => run.session_id === sessionId && !run.parent_run_id);
  if (rootRuns.length === 0) {
    return null;
  }

  // Child links are indexed across all runs; descendants from other sessions
  // are dropped again when each tree is collected.
  const childrenByParent = buildChildrenMap(processRuns);
  const treeCache = new Map<string, ProcessRun[]>();
  const treeForRoot = (root: ProcessRun): ProcessRun[] => {
    let tree = treeCache.get(root.run_id);
    if (!tree) {
      tree = collectRunTree(root, childrenByParent).filter(
        (run) => run.session_id === sessionId || run.run_id === root.run_id
      );
      treeCache.set(root.run_id, tree);
    }
    return tree;
  };

  const allEventsByRun = groupByRunId(processEvents);
  const allArtifactsByRun = groupByRunId(processArtifacts);
  const treeUpdatedAt = (root: ProcessRun): string =>
    getTreeUpdatedAt(treeForRoot(root), allEventsByRun, allArtifactsByRun);

  const candidates = rootRuns.slice();
  candidates.sort((left, right) => {
    const leftActive = ACTIVE_STATUSES.has(left.status);
    const rightActive = ACTIVE_STATUSES.has(right.status);
    if (leftActive !== rightActive) {
      return leftActive ? -1 : 1;
    }
    return compareIsoDesc(treeUpdatedAt(left), treeUpdatedAt(right));
  });
  const selectedRoot = candidates[0];
  if (!selectedRoot) {
    return null;
  }

  // Restrict events and artifacts to the selected tree before grouping them.
  const taskRuns = treeForRoot(selectedRoot);
  const taskRunIds = new Set(taskRuns.map((run) => run.run_id));
  const taskEvents = processEvents.filter((event) => taskRunIds.has(event.run_id));
  const taskArtifacts = processArtifacts.filter((artifact) => taskRunIds.has(artifact.run_id));
  const eventsByRun = groupByRunId(taskEvents);
  const artifactsByRun = groupByRunId(taskArtifacts);

  const rootEvents = eventsByRun.get(selectedRoot.run_id) ?? [];
  const summary = selectedRoot.summary?.trim() || latestEventText(rootEvents) || null;
  const updatedAt = getTreeUpdatedAt(taskRuns, eventsByRun, artifactsByRun);
  // Explicit step/stage counters win; otherwise count finished runs.
  const progress = explicitProgress(selectedRoot, taskEvents, locale) ?? fallbackProgress(taskRuns, locale);

  return {
    rootRunId: selectedRoot.run_id,
    title: selectedRoot.title,
    status: selectedRoot.status,
    summary,
    updatedAt,
    progress,
    steps: buildSteps(selectedRoot, taskRuns, eventsByRun, artifactsByRun),
    artifacts: buildArtifactViews(taskArtifacts, locale),
    artifactTypeSummaries: buildArtifactSummaries(taskArtifacts, locale),
  };
}

View File

@ -6,6 +6,7 @@ describe('chat store process event ingestion', () => {
beforeEach(() => { beforeEach(() => {
useChatStore.setState({ useChatStore.setState({
sessionId: 'web:alpha', sessionId: 'web:alpha',
inputDrafts: {},
processRuns: [], processRuns: [],
processEvents: [], processEvents: [],
processArtifacts: [], processArtifacts: [],
@ -18,6 +19,7 @@ describe('chat store process event ingestion', () => {
afterEach(() => { afterEach(() => {
useChatStore.setState({ useChatStore.setState({
sessionId: 'web:default', sessionId: 'web:default',
inputDrafts: {},
processRuns: [], processRuns: [],
processEvents: [], processEvents: [],
processArtifacts: [], processArtifacts: [],
@ -49,4 +51,17 @@ describe('chat store process event ingestion', () => {
}), }),
]); ]);
}); });
it('stores input drafts per session', () => {
useChatStore.getState().setInputDraft('web:alpha', 'message for alpha');
useChatStore.getState().setInputDraft('web:beta', 'message for beta');
expect(useChatStore.getState().getInputDraft('web:alpha')).toBe('message for alpha');
expect(useChatStore.getState().getInputDraft('web:beta')).toBe('message for beta');
useChatStore.getState().clearInputDraft('web:alpha');
expect(useChatStore.getState().getInputDraft('web:alpha')).toBe('');
expect(useChatStore.getState().getInputDraft('web:beta')).toBe('message for beta');
});
}); });

View File

@ -36,6 +36,7 @@ interface ChatStore {
isAuthLoading: boolean; isAuthLoading: boolean;
sessionId: string; sessionId: string;
messages: ChatMessage[]; messages: ChatMessage[];
inputDrafts: Record<string, string>;
isLoading: boolean; isLoading: boolean;
streamingContent: string; streamingContent: string;
wsStatus: WsStatus; wsStatus: WsStatus;
@ -56,6 +57,9 @@ interface ChatStore {
setSessionId: (id: string) => void; setSessionId: (id: string) => void;
setMessages: (msgs: ChatMessage[]) => void; setMessages: (msgs: ChatMessage[]) => void;
addMessage: (msg: ChatMessage) => void; addMessage: (msg: ChatMessage) => void;
setInputDraft: (sessionId: string, value: string) => void;
getInputDraft: (sessionId: string) => string;
clearInputDraft: (sessionId: string) => void;
updateMessageFeedback: ( updateMessageFeedback: (
runId: string, runId: string,
feedbackState: ChatMessage['feedback_state'], feedbackState: ChatMessage['feedback_state'],
@ -126,11 +130,12 @@ function createEventId(event: ProcessWsEvent): string {
return `${event.type}:${event.run_id}:${event.created_at}:${suffix}`; return `${event.type}:${event.run_id}:${event.created_at}:${suffix}`;
} }
export const useChatStore = create<ChatStore>((set) => ({ export const useChatStore = create<ChatStore>((set, get) => ({
user: null, user: null,
isAuthLoading: true, isAuthLoading: true,
sessionId: getInitialSessionId(), sessionId: getInitialSessionId(),
messages: [], messages: [],
inputDrafts: {},
isLoading: false, isLoading: false,
streamingContent: '', streamingContent: '',
wsStatus: 'disconnected', wsStatus: 'disconnected',
@ -155,6 +160,23 @@ export const useChatStore = create<ChatStore>((set) => ({
}, },
setMessages: (msgs) => set({ messages: msgs }), setMessages: (msgs) => set({ messages: msgs }),
addMessage: (msg) => set((s) => ({ messages: [...s.messages, msg] })), addMessage: (msg) => set((s) => ({ messages: [...s.messages, msg] })),
setInputDraft: (sessionId, value) =>
set((state) => ({
inputDrafts: {
...state.inputDrafts,
[sessionId]: value,
},
})),
getInputDraft: (sessionId) => get().inputDrafts[sessionId] ?? '',
clearInputDraft: (sessionId) =>
set((state) => {
if (!(sessionId in state.inputDrafts)) {
return {};
}
const nextDrafts = { ...state.inputDrafts };
delete nextDrafts[sessionId];
return { inputDrafts: nextDrafts };
}),
updateMessageFeedback: (runId, feedbackState, error) => updateMessageFeedback: (runId, feedbackState, error) =>
set((s) => ({ set((s) => ({
messages: s.messages.map((message) => messages: s.messages.map((message) =>

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,265 @@
# Task Evidence and Validation Redesign
Date: 2026-05-22
## Context
Two recent task runs exposed the same underlying weakness from different angles:
- The agent can use complete tool results, but validation only receives truncated excerpts. A key fact can be present in the run log yet absent from validation context, causing a false rejection.
- Team execution can gather useful evidence in sub-agent runs, but that evidence is not reliably carried into final synthesis or validation. Failed team nodes are especially lossy.
- Team graphs marked `parallel` are currently scheduled through a shared single-consumer `AgentLoop`, so production execution can be effectively serial.
- Final synthesis after a team run still has full tools available, so it can repeat searches instead of synthesizing from team evidence.
- `max_tool_iterations` stops the tool loop with a placeholder message instead of forcing a final answer from already gathered evidence.
- Validation failures enter an open-looking state, which makes the UI feel like the task never completed.
The selected approach is a medium refactor: keep the existing `AgentService`, `TeamService`, and `AgentLoop` structure, but add a structured evidence pipeline, clearer validation semantics, finite team concurrency, no-tools synthesis after team runs, and explicit task states.
## Goals
- Preserve complete run evidence for synthesis and validation.
- Stop using fixed truncation for validation inputs.
- Distinguish "answer is contradicted" from "validator lacks enough evidence".
- Let user feedback be the final business judgment after an answer is shown.
- Make `parallel` team execution actually concurrent within a bounded limit.
- Prevent final synthesis from repeating team tool work by default.
- Produce a useful final answer when tool iteration limits are reached.
- Add enough debug metadata to diagnose validation decisions without reconstructing SQLite logs by hand.
## Non-Goals
- Rewriting the whole execution runtime.
- Introducing a distributed worker pool.
- Building a generic evidence bus for every future subsystem.
- Solving all provider rate-limit and storage concurrency concerns beyond the bounded local concurrency needed for team parallel nodes.
## Validation Semantics and Task States
Automatic validation becomes advisory evidence assessment, not the final user satisfaction signal.
Validation results should include:
```python
status: Literal["accepted", "rejected", "insufficient_evidence", "validator_error"]
passed: bool
score: float
issues: list[str]
missing_requirements: list[str]
evidence_gaps: list[str]
recommended_revision_prompt: str
```
`status` is the business decision field. `passed` is a compatibility boolean derived from `status`, not an independent source of truth. The mapping is:
- `status == "accepted"` -> `passed=True`
- `status in {"rejected", "insufficient_evidence", "validator_error"}` -> `passed=False`
Task mode, retry, and status transition logic must branch on `status`. New code treats `status == "accepted"` as the acceptance condition. Existing compatibility paths may continue to interpret acceptance as `passed and score >= 0.75` until they are migrated, but new logic should not derive status from `passed` or infer failure from `passed=False` alone.
Rules:
- `accepted`: the final answer is supported by available evidence and satisfies the task. The task enters `awaiting_feedback`.
- `insufficient_evidence`: the validator cannot confirm the answer from available evidence. It must not claim fabrication or contradiction. The task enters `needs_review`.
- `validator_error`: the validator failed to produce a reliable decision. The task enters `needs_review`.
- `rejected`: the evidence clearly contradicts the answer, or the answer clearly misses the task. If the attempt can still be retried (for example, the first attempt), the task enters `needs_revision` and a retry is triggered. On the last attempt, the task enters `failed` only when there is no usable answer; otherwise it enters `needs_review`.
Task statuses:
- `open`: task exists but has not started.
- `running`: execution is active.
- `validating`: final answer exists and automatic validation is running.
- `awaiting_feedback`: answer is available and automatic validation accepted it.
- `needs_review`: answer is available, but automatic validation could not confirm it or hit a validator error.
- `needs_revision`: user requested revision, or automatic validation rejected an attempt that can still be retried.
- `failed`: execution ended without a usable answer.
- `closed`: user marked the answer satisfied.
- `abandoned`: user abandoned the task.
`needs_review` remains an open status for the active task API, but the UI should distinguish it from `running`. `failed`, `closed`, and `abandoned` are terminal.
Open status does not mean auto-runnable. The backend should split status semantics:
- `is_open`: the task can still receive user feedback or revision.
- `is_execution_active`: the backend is currently running or validating work.
- `requires_user_action`: the task has stopped automatic execution and needs user input.
`needs_review` should have `is_open=True`, `is_execution_active=False`, and `requires_user_action=True`. Schedulers, automatic retry loops, and active-task polling must not treat `needs_review` as a reason to continue execution. It should appear in the active task API only so the user can review, mark satisfied, revise, or abandon.
User feedback is authoritative:
- `satisfied` closes the task.
- `revise` moves the task to `needs_revision`.
- `abandon` moves the task to `abandoned`.
## Evidence Models
Add structured evidence models in the task or coordinator layer.
```python
@dataclass(slots=True)
class ToolEvidence:
tool_name: str
tool_call_id: str | None
content: str
event_payload: dict[str, Any]
url: str | None = None
title: str | None = None
created_at: str | None = None
@dataclass(slots=True)
class RunEvidence:
run_id: str
session_id: str
output_text: str
finish_reason: str
transcript: list[dict[str, Any]]
tool_results: list[ToolEvidence]
warnings: list[str]
@dataclass(slots=True)
class TaskEvidencePacket:
task_id: str
attempt_index: int
main_run: RunEvidence | None
team_runs: list[RunEvidence]
team_node_results: list[NodeRunResult]
final_output: str
```
`llm_request_snapshotted` events are debug material, not task evidence. They may be referenced in validation debug metadata, but validation should primarily consume transcript, tool results, team node outputs, and final output.
## Evidence Data Flow
1. `AgentLoop` continues to write session events as it does now.
2. After a run completes, an evidence builder reads `session_manager.get_run_event_records(session_id, run_id)` and creates `RunEvidence`.
3. `LocalAgentRunner.run()` attaches `RunEvidence` to `NodeRunResult`.
4. `NodeRunResult` gains `evidence: RunEvidence | None`.
5. `TeamRunResult` carries node evidence through `node_results`; it may also expose a convenience `run_evidence` list.
6. `AgentService._run_task_mode()` builds a `TaskEvidencePacket` after team execution and final synthesis.
7. Final synthesis receives a rendered evidence context built from the same packet.
8. `ValidationService.validate_task_result()` receives the same packet and renders it into the validation prompt without fixed truncation.
Failed or partial nodes must still preserve evidence. A node with `finish_reason="max_tool_iterations"` can be unsuccessful while still carrying useful tool results.
## Final Synthesis Behavior
For team-backed task plans, final synthesis defaults to no tools:
```python
include_tools = False
max_tool_iterations = 0
```
The synthesis prompt should instruct the main agent to:
- use team evidence as the source of truth;
- avoid repeating failed or completed tool calls;
- answer with available evidence;
- clearly state missing or uncertain information.
The planner may explicitly allow a small synthesis tool budget, but the default is no-tools synthesis. If allowed, the budget should be small, such as `max_tool_iterations=1`.
## Tool Iteration Finalization
When a run reaches `max_tool_iterations` and the model still requests tools, the loop should not return `Tool loop stopped...` as the final user-visible answer.
Instead, the loop performs one no-tools finalization call:
- use the accumulated messages and tool results;
- call the provider with `tools=None`;
- add an instruction that the tool budget is exhausted and the model must answer from existing evidence;
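- mark the finish reason as `max_tool_iterations_finalized`, or another explicit value that cannot be mistaken for a normal stop, so callers can tell the answer was produced after the tool budget ran out;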
- return the finalization text as `output_text`.
If finalization itself fails or returns empty content, only then use a clear fallback message explaining that the run could not produce a usable answer.
## Limited Parallel Team Execution
`parallel` team nodes should run concurrently without rewriting the runtime.
Design:
- Keep sequence and DAG behavior on the shared loop where appropriate.
- For `parallel` graph batches, run nodes through isolated `AgentLoop` instances.
- Each isolated loop uses the same workspace and service configuration so session and run records remain queryable from the same stores.
- Add `max_parallel_team_nodes`, default `3`.
- Use an `asyncio.Semaphore` in the scheduler to bound concurrent nodes.
- Return `TeamRunResult.node_results` in graph node order, not completion order.
The implementation should check shared store concurrency. If the current store is not safe for concurrent async writes, add a narrow lock around session/task/run store writes used by these parallel runs.
## Validation Prompt
The validation prompt should consume the full rendered evidence packet, without `[:2500]`, `[:500]`, or `[:12]` fixed caps.
Required validator instructions:
- Return only JSON with the validation fields.
- If evidence is incomplete, return `insufficient_evidence`.
- Only return `rejected` for clear contradiction or clear task failure.
- Do not infer fabrication from missing evidence.
- Do not claim a source lacks a fact unless the rendered evidence proves that absence.
- Treat user feedback as the final business judgment outside automatic validation.
The validator should still be strict about answer quality when evidence is sufficient.
## Validation Debug Metadata
Each `task_validation_snapshotted` event should record:
- validation result;
- validation status;
- attempt index;
- evidence run ids;
- evidence session ids;
- tool result count;
- evidence character length;
- validator raw response;
- rendered validation input or prompt, unless a debug setting disables full prompt storage.
This makes future investigations direct: inspect the exact input the validator saw before interpreting its decision.
## Log Snapshot Size
`llm_request_snapshotted` currently stores complete messages and complete tool schemas in both payload and content. That makes logs large and slows inspection.
Default behavior should change to store a compact payload:
- iteration;
- provider name and model;
- message count;
- tool names;
- message character length;
- tool schema character length;
- max tokens, temperature, thinking flag.
Full request snapshots should be controlled by a debug config flag. This does not reduce validation evidence because evidence comes from transcript and tool result events.
## Testing Plan
Add or update focused unit tests:
1. Validation evidence is not fixed-truncated. A fact after the first 500 characters of a tool result still appears in the validator input.
2. Missing evidence returns `insufficient_evidence` and moves the task to `needs_review`, not `failed`.
3. A team node that ends with `max_tool_iterations` preserves tool evidence in `NodeRunResult.evidence`.
4. Team final synthesis defaults to `tools=None` and receives rendered team evidence.
5. Parallel team nodes start concurrently under a bounded semaphore and results remain in graph order.
6. Tool loop finalization produces a user-visible answer instead of the placeholder stop message.
7. Status transitions cover `accepted -> awaiting_feedback`, `insufficient_evidence -> needs_review`, `validator_error -> needs_review`, and terminal `failed`.
8. Validation debug events include evidence metadata and validator raw response.
## Migration Notes
To reduce risk, implement in layers:
1. Add evidence models and builders without changing behavior.
2. Attach evidence to team node results.
3. Switch final synthesis for team plans to no-tools evidence synthesis.
4. Switch validation to evidence packets and new statuses.
5. Add no-tools finalization for tool iteration limits.
6. Add limited isolated-loop parallel execution.
7. Slim `llm_request_snapshotted` behind a debug flag.
This order keeps each change testable and lets the old transcript-summary path remain as a temporary fallback while evidence packets are introduced.