Files
beaver_project/app-instance/backend/beaver/services/agent_service.py
steven_li a27560102b feat(task): 添加任务修订功能和超时处理机制
添加了 `revise_task` 路由动作类型,允许用户修改、纠正或重新执行最新活动任务结果。
实现了工具失败指导原则,防止相同类别工具重复失败。
为任务规划器添加了超时处理机制,避免长时间等待。

BREAKING CHANGE: 任务路由逻辑已更新,新增 `revise_task` 动作类型。

fix(api): 修复任务详情API返回完整流程投影

修复了任务详情API端点,现在会包含过滤后的流程运行、事件和工件信息,
并确保时间戳字段正确序列化。

refactor(engine): 优化任务技能解析器摘要节点处理

改进了任务技能解析器对摘要节点的处理逻辑,对于仅依赖文本生成功能的摘要节
点不再分配具体技能,直接使用依赖项输出进行汇总。

test: 增加任务修订和超时处理测试用例

添加了测试用例验证任务修订输入记录反馈、超时回退到单模式以及
摘要节点技能解析等新功能。
2026-05-21 16:40:44 +08:00

1265 lines
50 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Application service for agent entry.
这层的职责是把“接口层如何调用 AgentLoop”统一收口。
接口层以后不应该各自做这些事情:
1. 自己 new `AgentLoop`
2. 自己决定何时 `boot()`
3. 自己处理 direct run 的同步/异步包装
统一放在 `AgentService` 后CLI / Web / Gateway 才能共享同一条运行主链。
"""
from __future__ import annotations
import asyncio
from pathlib import Path
from typing import Any
from uuid import uuid4
from beaver.coordinator.models import ExecutionNode, TeamRunResult
from beaver.engine import AgentLoop, AgentProfile, AgentRunResult, EngineLoader
from beaver.engine.providers import make_provider_bundle
from beaver.foundation.events import InboundMessage, OutboundMessage
from beaver.foundation.models import CronJob, CronRunRecord
from beaver.tasks import MainAgentRouter, TaskExecutionPlan, TaskRecord, ValidationResult
NOTIFICATION_SESSION_ID = "notify:default:scheduled"
class AgentService:
"""面向 interfaces 的统一 agent 运行入口。
这里明确区分两种调用模式:
1. direct mode
- 不启动后台运行循环
- 直接调用 `process_direct()` / `run_direct()`
2. running mode
- 先 `await start()`
- 之后所有外部任务都必须走 `submit_direct()`
- 不允许再直接调用 `process_direct()`
"""
def __init__(
self,
*,
workspace: str | Path | None = None,
config_path: str | Path | None = None,
profile: AgentProfile | None = None,
loader: EngineLoader | None = None,
) -> None:
self.profile = profile or AgentProfile()
self.loader = loader or EngineLoader(workspace=workspace, config_path=config_path)
self._loop: AgentLoop | None = None
self._run_task: asyncio.Task[None] | None = None
self._main_agent_router = MainAgentRouter()
self._runtime_services: dict[str, Any] = {}
def create_loop(self) -> AgentLoop:
"""创建并缓存当前 service 使用的 AgentLoop。"""
if self._loop is None:
self._loop = AgentLoop(profile=self.profile, loader=self.loader)
self._loop.runtime_services.update(self._runtime_services)
self._loop.boot()
return self._loop
def register_runtime_service(self, name: str, service: Any) -> None:
"""Expose process-level services to tools during agent runs."""
self._runtime_services[name] = service
if self._loop is not None:
self._loop.runtime_services[name] = service
@property
def has_loop(self) -> bool:
"""当前 service 是否已经创建过 loop。"""
return self._loop is not None
@property
def is_running(self) -> bool:
"""当前 service 是否处于 running mode。"""
return self._run_task is not None and not self._run_task.done()
def close(self) -> None:
"""关闭当前 service 持有的 runtime。"""
if self._run_task is not None and not self._run_task.done():
raise RuntimeError("AgentService.close() requires stop() before closing a running loop")
self._run_task = None
if self._loop is None:
return
try:
self._loop.close()
finally:
self._loop = None
async def start(self) -> None:
"""启动后台运行循环,进入 running mode。
进入 running mode 后:
- 外部任务必须通过 `submit_direct()` 提交
- `process_direct()` 不再允许直接调用
"""
if self._run_task is not None and not self._run_task.done():
return
loop = self.create_loop()
self._run_task = asyncio.create_task(loop.run())
while not loop.is_running:
if self._run_task.done():
await self._run_task
break
await asyncio.sleep(0)
async def _stop_impl(
self,
*,
timeout_seconds: float | None = None,
force: bool = False,
) -> None:
"""内部停止实现,支持 graceful timeout 和可选 force cancel。"""
if self._run_task is None:
return
run_task = self._run_task
loop = self.create_loop()
try:
await loop.stop()
if timeout_seconds is None:
await run_task
else:
try:
await asyncio.wait_for(asyncio.shield(run_task), timeout=timeout_seconds)
except asyncio.TimeoutError as exc:
if force:
run_task.cancel()
try:
await run_task
except asyncio.CancelledError:
pass
else:
raise TimeoutError(
f"AgentService.stop() timed out after {timeout_seconds} seconds while draining queued tasks"
) from exc
finally:
if run_task.done():
self._run_task = None
async def stop(
self,
*,
timeout_seconds: float | None = None,
force: bool = False,
) -> None:
"""停止后台运行循环并等待退出。
参数:
- `timeout_seconds`: graceful drain 的最长等待时间;`None` 表示一直等
- `force`: 超时后是否 cancel 掉运行循环 task
"""
await self._stop_impl(timeout_seconds=timeout_seconds, force=force)
async def shutdown(
self,
*,
timeout_seconds: float | None = None,
force: bool = False,
) -> None:
"""先停运行循环,再释放 runtime。"""
await self._stop_impl(timeout_seconds=timeout_seconds, force=force)
self.close()
async def process_direct(
self,
message: str,
**kwargs: Any,
) -> AgentRunResult:
"""异步 direct run 入口。
仅在 direct mode 下可用。
如果 service 已经 `start()` 进入 running mode
调用方必须改用 `submit_direct()`,不能绕过运行队列直接执行。
"""
if self._run_task is not None and not self._run_task.done():
raise RuntimeError(
"AgentService.process_direct() is unavailable while the service is running; "
"use 'await AgentService.submit_direct(...)' after start()."
)
loop = self.create_loop()
return await self._process_with_main_agent(message, runner=loop.process_direct, kwargs=kwargs)
async def submit_direct(
self,
message: str,
**kwargs: Any,
) -> AgentRunResult:
"""向 running mode 下的 loop 提交 direct task。
这是 `start()` 之后唯一合法的外部任务入口。
"""
loop = self.create_loop()
return await self._process_with_main_agent(message, runner=loop.submit_direct, kwargs=kwargs)
async def run_scheduled_task(
self,
message: str,
*,
session_id: str,
cron_job_id: str,
cron_job_name: str,
scheduled_run_id: str | None = None,
requires_followup: bool = False,
) -> AgentRunResult:
"""Run a cron trigger as a normal internal Task.
Scheduled jobs are product-level Tasks, not hidden one-off agent turns.
This entry bypasses the main-agent classifier and forces Task mode so
every trigger produces a TaskRecord, validation, feedback state, and a
run_id that the scheduled-task history can link to.
"""
loaded = self.create_loop().boot()
task_service = self._require_loaded(loaded, "task_service")
loop = self.create_loop()
task = task_service.create_task(
session_id=session_id,
description=message,
creator="cron",
metadata={
"source": "scheduled_cron",
"cron_job_id": cron_job_id,
"cron_job_name": cron_job_name,
"scheduled_run_id": scheduled_run_id,
"user_engaged": False,
"requires_followup": requires_followup,
},
)
execution_context = (
"This turn was triggered automatically by a scheduled task.\n\n"
f"Cron Job ID: {cron_job_id}\n"
f"Cron Job Name: {cron_job_name}\n"
f"Scheduled Run ID: {scheduled_run_id or 'unknown'}\n"
"Run it as a normal Beaver Task. Do not ask the user for confirmation; "
"execute the task and report the concrete outcome."
)
runner = loop.submit_direct if self.is_running else loop.process_direct
result = await self._run_task_mode(
message,
runner=runner,
task=task,
kwargs={
"session_id": session_id,
"source": "cron",
"user_id": "cron",
"title": cron_job_name,
"execution_context": execution_context,
},
)
loaded = self.create_loop().boot()
session_manager = self._require_loaded(loaded, "session_manager")
session_manager.update_latest_assistant_event_payload(
result.session_id,
result.run_id,
{
"message_type": "scheduled_reply",
"scheduled_job_id": job.id,
"scheduled_run_id": run.scheduled_run_id,
"cron_job_name": job.name,
"mode": "notification",
},
)
return result
async def run_scheduled_notification(
self,
message: str,
*,
session_id: str = NOTIFICATION_SESSION_ID,
cron_job_id: str,
cron_job_name: str,
scheduled_run_id: str,
) -> AgentRunResult:
"""Run a cron trigger as a notification result, not as an active Task."""
loop = self.create_loop()
loaded = loop.boot()
session_manager = self._require_loaded(loaded, "session_manager")
runner = loop.submit_direct if self.is_running else loop.process_direct
execution_context = (
"This turn was triggered automatically by a scheduled notification.\n\n"
f"Cron Job ID: {cron_job_id}\n"
f"Cron Job Name: {cron_job_name}\n"
f"Scheduled Run ID: {scheduled_run_id}\n"
"Generate the notification content directly for the user. Do not ask for confirmation."
)
result = await runner(
message,
session_id=session_id,
source="notification",
user_id="cron",
title=cron_job_name,
execution_context=execution_context,
)
session_manager.update_latest_assistant_event_payload(
result.session_id,
result.run_id,
{
"message_type": "scheduled_result",
"scheduled_job_id": cron_job_id,
"scheduled_run_id": scheduled_run_id,
"cron_job_name": cron_job_name,
"mode": "notification",
},
)
return result
def engage_scheduled_run(
self,
*,
job: CronJob,
run: CronRunRecord,
intent: str = "revise_once",
thinking_enabled: bool | None = None,
) -> TaskRecord:
"""Create or mark the Task that lets the user work on a scheduled result."""
loaded = self.create_loop().boot()
task_service = self._require_loaded(loaded, "task_service")
if run.task_id:
existing = task_service.get_task(run.task_id)
if existing is not None:
existing.metadata["user_engaged"] = True
existing.metadata["engage_intent"] = intent
task_service.store.upsert_task(existing)
return existing
task = task_service.create_task(
session_id=run.notification_session_id or NOTIFICATION_SESSION_ID,
description=f"修改定时通知:{job.name}",
creator="cron",
metadata={
"source": "scheduled_run",
"cron_job_id": job.id,
"cron_job_name": job.name,
"scheduled_run_id": run.scheduled_run_id,
"scheduled_output": run.output,
"user_engaged": True,
"engage_intent": intent,
},
)
return task
async def submit_scheduled_reply(
self,
message: str,
*,
job: CronJob,
run: CronRunRecord,
intent: str = "revise_once",
) -> AgentRunResult:
task = self.engage_scheduled_run(job=job, run=run, intent=intent)
loop = self.create_loop()
runner = loop.submit_direct if self.is_running else loop.process_direct
execution_context = (
"The user is replying to a scheduled notification result.\n\n"
f"Cron Job ID: {job.id}\n"
f"Cron Job Name: {job.name}\n"
f"Scheduled Run ID: {run.scheduled_run_id}\n"
f"Engagement intent: {intent}\n"
f"Original scheduled instruction: {job.payload.message}\n"
f"Original notification output:\n{run.output or ''}\n\n"
"Handle this as a Task continuation. If the intent is update_future, explain the durable change "
"that should apply to future notifications."
)
return await self._run_task_mode(
message,
runner=runner,
task=task,
kwargs={
"session_id": task.session_id,
"source": "notification",
"user_id": "web",
"title": job.name,
"execution_context": execution_context,
"thinking_enabled": thinking_enabled,
},
)
async def submit_feedback(
self,
*,
session_id: str,
run_id: str,
feedback_type: str,
comment: str | None = None,
) -> dict[str, Any]:
"""Record chat feedback for the internal task linked to a run."""
loaded = self.create_loop().boot()
task_service = self._require_loaded(loaded, "task_service")
task = task_service.get_task_by_run_id(run_id)
if task is None or task.session_id != session_id:
raise ValueError(f"No internal task found for run_id={run_id!r}")
normalized = feedback_type.strip().lower()
if normalized not in {"satisfied", "revise", "abandon"}:
raise ValueError("feedback_type must be one of: satisfied, revise, abandon")
already_recorded = any(
item.get("run_id") == run_id and item.get("feedback_type") == normalized
for item in task.feedback
)
conflicting_feedback = next(
(
item
for item in task.feedback
if item.get("run_id") == run_id and item.get("feedback_type") != normalized
),
None,
)
if conflicting_feedback is not None:
raise ValueError(
f"Feedback for run_id={run_id!r} was already recorded as "
f"{conflicting_feedback.get('feedback_type')!r}"
)
if task.status in {"closed", "abandoned"} and not already_recorded:
raise ValueError(f"Task {task.task_id} is already finalized as {task.status!r}")
updated = task if already_recorded else task_service.add_feedback(
task.task_id,
feedback_type=normalized,
comment=comment,
run_id=run_id,
)
session_manager = self._require_loaded(loaded, "session_manager")
session_manager.update_latest_assistant_event_payload(
session_id,
run_id,
{
"task_id": updated.task_id,
"task_status": updated.status,
"feedback_state": normalized,
},
)
if not already_recorded:
session_manager.append_message(
session_id,
run_id=run_id,
role="system",
event_type="task_feedback_recorded",
event_payload={
"task_id": task.task_id,
"feedback_type": normalized,
"comment": comment,
"task_status": updated.status,
},
content=comment,
context_visible=False,
)
generated_candidates = []
validation = ValidationResult.from_dict(updated.validation_result)
if not already_recorded:
run_memory_store = self._require_loaded(loaded, "run_memory_store")
feedback_payload = {
"feedback_type": normalized,
"comment": comment or "",
"task_status": updated.status,
}
run_memory_store.update_run_record(
run_id,
success=normalized == "satisfied",
feedback=feedback_payload,
)
run_memory_store.update_skill_effects_for_run(
run_id,
success=normalized == "satisfied",
feedback_score=self._feedback_score_for_learning(normalized, validation),
notes=(comment or normalized).strip(),
)
skill_learning_service = self._require_loaded(loaded, "skill_learning_service")
skill_learning_service.rescore_skill_versions()
if already_recorded:
generated_candidates = []
elif normalized == "satisfied" and validation is not None and validation.accepted:
generated_candidates = [
item.to_dict()
for item in skill_learning_service.build_learning_candidates_for_task(
updated.task_id,
trigger_run_id=run_id,
)
]
elif normalized == "abandon":
session_manager.append_message(
session_id,
run_id=run_id,
role="system",
event_type="task_failure_evidence_recorded",
event_payload={
"task_id": updated.task_id,
"feedback_type": normalized,
"comment": comment or "",
"task_status": updated.status,
"durable_memory_written": False,
},
content=(comment or "Task abandoned; retained as run/session failure evidence."),
context_visible=False,
)
return {
"session_id": session_id,
"run_id": run_id,
"task_id": updated.task_id,
"task_status": updated.status,
"feedback_type": normalized,
"learning_candidates": generated_candidates,
}
async def _process_with_main_agent(
self,
message: str,
*,
runner: Any,
kwargs: dict[str, Any],
) -> AgentRunResult:
loaded = self.create_loop().boot()
task_service = self._require_loaded(loaded, "task_service")
session_manager = self._require_loaded(loaded, "session_manager")
session_id = kwargs.get("session_id") or uuid4().hex
kwargs = dict(kwargs)
kwargs["session_id"] = session_id
provider_bundle = kwargs.get("provider_bundle") or self._make_provider_bundle_for_task(loaded, kwargs)
kwargs["provider_bundle"] = provider_bundle
router_provider = provider_bundle.auxiliary_provider or provider_bundle.main_provider
router_runtime = provider_bundle.auxiliary_runtime or provider_bundle.main_runtime
active_task = task_service.get_latest_open_task(session_id)
decision = await self._main_agent_router.classify(
message,
active_task=active_task,
provider=router_provider,
model=getattr(router_runtime, "model", None),
recent_messages=session_manager.get_messages_as_conversation(session_id),
intent_skill=self._load_intent_agent_skill(loaded),
thinking_enabled=kwargs.get("thinking_enabled"),
)
kwargs["intent_agent_decision"] = self._intent_decision_payload(
decision,
active_task=active_task,
)
if active_task is not None and decision.short_title and not active_task.metadata.get("short_title"):
active_task.metadata["short_title"] = decision.short_title
task_service.store.upsert_task(active_task)
if active_task is not None and decision.closes_task:
task_service.close_task(active_task.task_id, reason=decision.reason)
return await runner(message, **kwargs)
if active_task is not None and decision.abandons_task:
task_service.abandon_task(active_task.task_id, reason=decision.reason)
return await runner(message, **kwargs)
if not decision.is_task:
kwargs["include_skill_assembly"] = False
kwargs["include_tools"] = False
return await runner(message, **kwargs)
task = (
task_service.create_task(
session_id=session_id,
description=message,
metadata={
"router_reason": decision.reason,
**({"short_title": decision.short_title} if decision.short_title else {}),
},
)
if active_task is None or decision.starts_new_task
else active_task
)
if active_task is not None and decision.action == "revise_task" and task.task_id == active_task.task_id:
task = self._record_revision_feedback_for_task(
loaded,
task=task,
session_id=session_id,
comment=message,
)
return await self._run_task_mode(message, runner=runner, kwargs=kwargs, task=task)
def _record_revision_feedback_for_task(
self,
loaded: Any,
*,
task: TaskRecord,
session_id: str,
comment: str,
) -> TaskRecord:
"""Mark the latest feedback-eligible run as revised before continuing a task."""
if task.status not in {"awaiting_feedback", "needs_revision"}:
return task
run_id = next((item for item in reversed(task.run_ids) if item), None)
if not run_id:
return task
existing = next((item for item in task.feedback if item.get("run_id") == run_id), None)
if existing is not None:
if existing.get("feedback_type") != "revise":
return task
updated = task
already_recorded = True
else:
task_service = self._require_loaded(loaded, "task_service")
updated = task_service.add_feedback(
task.task_id,
feedback_type="revise",
comment=comment,
run_id=run_id,
)
already_recorded = False
session_manager = self._require_loaded(loaded, "session_manager")
session_manager.update_latest_assistant_event_payload(
session_id,
run_id,
{
"task_id": updated.task_id,
"task_status": updated.status,
"feedback_state": "revise",
},
)
if already_recorded:
return updated
session_manager.append_message(
session_id,
run_id=run_id,
role="system",
event_type="task_feedback_recorded",
event_payload={
"task_id": updated.task_id,
"feedback_type": "revise",
"comment": comment,
"task_status": updated.status,
"auto_recorded": True,
},
content=comment,
context_visible=False,
)
validation = ValidationResult.from_dict(updated.validation_result)
run_memory_store = self._require_loaded(loaded, "run_memory_store")
run_memory_store.update_run_record(
run_id,
success=False,
feedback={
"feedback_type": "revise",
"comment": comment,
"task_status": updated.status,
},
)
run_memory_store.update_skill_effects_for_run(
run_id,
success=False,
feedback_score=self._feedback_score_for_learning("revise", validation),
notes=comment.strip() or "revise",
)
skill_learning_service = self._require_loaded(loaded, "skill_learning_service")
skill_learning_service.rescore_skill_versions()
return updated
async def _run_task_mode(
self,
message: str,
*,
runner: Any,
kwargs: dict[str, Any],
task: TaskRecord,
) -> AgentRunResult:
loaded = self.create_loop().boot()
task_service = self._require_loaded(loaded, "task_service")
validation_service = self._require_loaded(loaded, "validation_service")
task_execution_planner = self._require_loaded(loaded, "task_execution_planner")
session_manager = self._require_loaded(loaded, "session_manager")
run_memory_store = self._require_loaded(loaded, "run_memory_store")
last_result: AgentRunResult | None = None
latest_validation: ValidationResult | None = None
base_execution_context = kwargs.get("execution_context")
provider_bundle = kwargs.get("provider_bundle") or self._make_provider_bundle_for_task(loaded, kwargs)
kwargs = dict(kwargs)
team_provider_bundle_factory = kwargs.pop("team_provider_bundle_factory", None)
kwargs["provider_bundle"] = provider_bundle
for attempt_index in (1, 2):
task_service.start_run(task.task_id, user_message=message, attempt_index=attempt_index)
plan = await task_execution_planner.plan(
task=task,
user_message=message,
attempt_index=attempt_index,
latest_validation=latest_validation,
provider_bundle=provider_bundle,
)
self._append_task_observation(
session_manager,
task.session_id,
event_type="task_execution_planned",
payload={
"task_id": task.task_id,
"attempt_index": attempt_index,
**plan.to_event_payload(),
},
)
team_summaries: list[str] = []
team_execution_context = ""
if plan.is_team:
team_result, team_error = await self._run_team_for_task(
plan,
task=task,
parent_session_id=kwargs["session_id"],
provider_bundle_factory=team_provider_bundle_factory
or self._build_team_provider_bundle_factory(loaded, kwargs),
)
if team_result is not None:
team_summaries = [self._team_summary_for_validation(team_result)]
team_execution_context = self._team_execution_context(plan, team_result)
self._append_task_observation(
session_manager,
task.session_id,
event_type="task_team_run_completed" if team_result.success else "task_team_run_failed",
payload={
"task_id": task.task_id,
"attempt_index": attempt_index,
"plan_mode": plan.mode,
"strategy": plan.graph.strategy if plan.graph else None,
"node_ids": [node.node_id for node in plan.graph.nodes] if plan.graph else [],
"team_run_ids": team_result.run_ids,
"team_success": team_result.success,
"node_results": self._team_node_results_for_event(plan, team_result),
"reason": plan.reason,
"error": None if team_result.success else "one or more team nodes failed",
},
)
else:
team_summaries = [f"Team execution failed: {team_error}"]
team_execution_context = self._failed_team_execution_context(plan, team_error or "unknown error")
self._append_task_observation(
session_manager,
task.session_id,
event_type="task_team_run_failed",
payload={
"task_id": task.task_id,
"attempt_index": attempt_index,
"plan_mode": plan.mode,
"strategy": plan.graph.strategy if plan.graph else None,
"node_ids": [node.node_id for node in plan.graph.nodes] if plan.graph else [],
"team_run_ids": [],
"team_success": False,
"reason": plan.reason,
"error": team_error,
},
)
attempt_kwargs = dict(kwargs)
attempt_kwargs.update(
{
"task_id": task.task_id,
"task_mode": True,
"attempt_index": attempt_index,
"allow_candidate_generation": False,
}
)
if attempt_index == 2 and latest_validation is not None:
revision_context = latest_validation.recommended_revision_prompt.strip()
if revision_context:
attempt_kwargs["execution_context"] = self._join_context(
base_execution_context,
f"Task validation revision request:\n{revision_context}",
team_execution_context,
)
elif team_execution_context:
attempt_kwargs["execution_context"] = self._join_context(base_execution_context, team_execution_context)
attempt_kwargs["skill_selection_context"] = self._build_skill_selection_context(
task=task,
user_message=message,
attempt_index=attempt_index,
latest_validation=latest_validation,
plan=plan,
team_summaries=team_summaries,
)
result = await runner(message, **attempt_kwargs)
last_result = result
self._append_task_observation(
session_manager,
task.session_id,
event_type="task_synthesis_completed",
payload={
"task_id": task.task_id,
"attempt_index": attempt_index,
"main_run_id": result.run_id,
"plan_mode": plan.mode,
"strategy": plan.graph.strategy if plan.graph else None,
},
)
task = task_service.append_run(
task.task_id,
result.run_id,
skill_names=self._skill_names_for_run(loaded, result.run_id),
)
validation = await validation_service.validate_task_result(
task=task,
user_message=message,
final_output=result.output_text,
transcript_excerpt=self._run_excerpt(session_manager, result.session_id, result.run_id),
tool_summaries=self._tool_summaries(session_manager, result.session_id, result.run_id),
team_summaries=team_summaries,
provider_bundle=provider_bundle,
)
latest_validation = validation
task = task_service.record_validation(task.task_id, result.run_id, validation)
run_memory_store.update_run_record(result.run_id, validation_result=validation.to_dict())
session_manager.update_latest_assistant_event_payload(
result.session_id,
result.run_id,
{
"task_id": task.task_id,
"task_status": task.status,
"validation_status": "passed" if validation.accepted else "failed",
},
)
session_manager.append_message(
result.session_id,
run_id=result.run_id,
role="system",
event_type="task_validation_snapshotted",
event_payload={
"task_id": task.task_id,
"attempt_index": attempt_index,
"validation_result": validation.to_dict(),
"retry_scheduled": not validation.accepted and attempt_index == 1,
},
content=validation.recommended_revision_prompt or None,
context_visible=False,
)
if not validation.accepted and attempt_index == 1:
session_manager.set_run_context_visible(result.session_id, result.run_id, False)
result.task_id = task.task_id
result.task_status = task.status
result.validation_result = validation.to_dict()
if validation.accepted or attempt_index == 2:
return result
if last_result is None: # pragma: no cover - defensive
raise RuntimeError("Task mode did not produce a run result")
return last_result
async def _run_team_for_task(
self,
plan: TaskExecutionPlan,
*,
task: TaskRecord,
parent_session_id: str,
provider_bundle_factory: Any,
) -> tuple[TeamRunResult | None, str | None]:
if plan.graph is None:
return None, "team plan did not include an execution graph"
try:
from beaver.services.team_service import TeamService
result = await TeamService(self.create_loop()).run_team(
plan.graph,
parent_task_id=task.task_id,
parent_session_id=parent_session_id,
parent_run_id=None,
provider_bundle_factory=provider_bundle_factory,
allow_candidate_generation=False,
)
return result, None
except Exception as exc:
return None, str(exc)
@staticmethod
def _require_loaded(loaded: Any, field_name: str) -> Any:
value = getattr(loaded, field_name)
if value is None:
raise RuntimeError(f"Engine loader did not provide required dependency {field_name!r}")
return value
@staticmethod
def _load_intent_agent_skill(loaded: Any) -> str | None:
skills_loader = getattr(loaded, "skills_loader", None)
if skills_loader is None:
return None
return skills_loader.load_skill("intent-agent-router")
@staticmethod
def _intent_decision_payload(decision: Any, *, active_task: TaskRecord | None) -> dict[str, Any]:
action = decision.action or ("create_task" if decision.is_task and active_task is None else decision.mode)
return {
"agent": "intent_agent",
"choice": action,
"mode": "task" if decision.is_task else "simple",
"reason": decision.reason,
"active_task_id": active_task.task_id if active_task is not None else None,
"starts_new_task": bool(decision.starts_new_task or (decision.is_task and active_task is None)),
"closes_task": bool(decision.closes_task),
"abandons_task": bool(decision.abandons_task),
"short_title": decision.short_title,
}
@staticmethod
def _skill_names_for_run(loaded: Any, run_id: str) -> list[str]:
store = getattr(loaded, "run_memory_store", None)
if store is None:
return []
for record in store.list_runs():
if record.run_id == run_id:
return [receipt.skill_name for receipt in record.activated_skills]
return []
@staticmethod
def _feedback_score_for_learning(feedback_type: str, validation: ValidationResult | None) -> float:
if feedback_type == "satisfied":
if validation is not None:
return max(0.0, min(1.0, float(validation.score)))
return 1.0
if feedback_type == "revise":
return 0.5
return 0.0
@staticmethod
def _build_skill_selection_context(
*,
task: TaskRecord,
user_message: str,
attempt_index: int,
latest_validation: ValidationResult | None = None,
plan: TaskExecutionPlan | None = None,
team_summaries: list[str] | None = None,
) -> str:
phase = f"attempt_{attempt_index}"
if latest_validation is not None:
phase = f"revision_attempt_{attempt_index}"
elif plan is not None and plan.is_team:
phase = f"team_synthesis_attempt_{attempt_index}"
sections = [
f"Task goal:\n{task.goal or task.description}",
f"Task description:\n{task.description}",
f"Current user request:\n{user_message}",
f"Execution phase:\n{phase}",
f"Task status:\n{task.status}",
]
if task.constraints:
sections.append("Known constraints:\n" + "\n".join(f"- {item}" for item in task.constraints))
if task.skill_names:
sections.append(
"Previously activated skills (reuse bias, not pinned):\n"
+ "\n".join(f"- {item}" for item in task.skill_names)
)
else:
sections.append("Previously activated skills:\nNone")
if latest_validation is not None:
validation_lines = [
f"accepted: {latest_validation.accepted}",
f"score: {latest_validation.score}",
]
if latest_validation.issues:
validation_lines.append("issues:\n" + "\n".join(f"- {item}" for item in latest_validation.issues))
if latest_validation.missing_requirements:
validation_lines.append(
"missing requirements:\n"
+ "\n".join(f"- {item}" for item in latest_validation.missing_requirements)
)
if latest_validation.recommended_revision_prompt:
validation_lines.append(
"recommended revision:\n"
+ latest_validation.recommended_revision_prompt
)
sections.append("Validation feedback:\n" + "\n".join(validation_lines))
if plan is not None:
plan_lines = [
f"mode: {plan.mode}",
f"reason: {plan.reason}",
]
if plan.final_synthesis_instruction:
plan_lines.append(f"final synthesis instruction: {plan.final_synthesis_instruction}")
if plan.graph is not None:
plan_lines.append(f"strategy: {plan.graph.strategy}")
plan_lines.append(
"nodes:\n"
+ "\n".join(
f"- {node.node_id}: {node.task}"
for node in plan.graph.nodes
)
)
sections.append("Execution plan:\n" + "\n".join(plan_lines))
if team_summaries:
sections.append("Team execution summaries:\n" + "\n\n".join(team_summaries)[:2400])
sections.append(
"Skill selection instruction:\n"
"Prefer reusing previously activated skills when they still match the Task. "
"Select new skills only if the current request, revision, or execution plan needs a different capability. "
"If no published skill matches, return [] and let the run continue without skills."
)
return "\n\n".join(section for section in sections if section.strip())
@staticmethod
def _run_excerpt(session_manager: Any, session_id: str, run_id: str) -> str:
lines = []
for event in session_manager.get_run_event_records(session_id, run_id):
if event.context_visible and event.content:
lines.append(f"{event.role}: {event.content.strip()}")
return "\n".join(lines[:12])[:2400]
@staticmethod
def _tool_summaries(session_manager: Any, session_id: str, run_id: str) -> list[str]:
summaries = []
for event in session_manager.get_run_event_records(session_id, run_id):
if event.event_type != "tool_result_recorded":
continue
text = (event.content or "").strip()
if text:
summaries.append(f"{event.tool_name or 'tool'}: {text[:500]}")
return summaries[:12]
@staticmethod
def _append_task_observation(
session_manager: Any,
session_id: str,
*,
event_type: str,
payload: dict[str, Any],
) -> None:
session_manager.append_message(
session_id,
role="system",
event_type=event_type,
event_payload=payload,
content=payload.get("reason") or payload.get("error"),
context_visible=False,
)
@staticmethod
def _join_context(*parts: str | None) -> str:
return "\n\n".join(part.strip() for part in parts if part and part.strip())
@staticmethod
def _team_summary_for_validation(result: TeamRunResult) -> str:
lines = [
f"success={result.success}",
f"task_id={result.task_id or ''}",
"summary:",
result.summary,
"nodes:",
]
for node in result.node_results:
lines.append(
f"- {node.node_id}: success={node.success} finish_reason={node.finish_reason} "
f"error={node.error or ''} output={node.output_text[:500]}"
)
return "\n".join(lines)
@staticmethod
def _team_node_results_for_event(plan: TaskExecutionPlan, result: TeamRunResult) -> list[dict[str, Any]]:
nodes = {node.node_id: node for node in plan.graph.nodes} if plan.graph else {}
payloads: list[dict[str, Any]] = []
for item in result.node_results:
payload = item.to_dict()
node = nodes.get(item.node_id)
if node is not None:
payload["selected_skill_names"] = list(node.inherited_pinned_skills)
payload["ephemeral_skill_names"] = [
skill.name for skill in node.inherited_pinned_skill_contexts
]
payload["skill_query"] = node.agent.metadata.get("skill_query")
payload["ephemeral_guidance_id"] = node.agent.metadata.get("ephemeral_guidance_id")
payload["ephemeral_guidance_name"] = node.agent.metadata.get("ephemeral_guidance_name")
payload["ephemeral_used"] = bool(node.inherited_pinned_skill_contexts)
payloads.append(payload)
return payloads
@staticmethod
def _team_execution_context(plan: TaskExecutionPlan, result: TeamRunResult) -> str:
node_lines = [
(
f"- {node.node_id}: success={node.success}, finish_reason={node.finish_reason}, "
f"run_id={node.run_id or ''}, error={node.error or ''}\n{node.output_text}"
)
for node in result.node_results
]
return "\n\n".join(
item
for item in [
"Task team execution result:",
f"Planner reason: {plan.reason}",
f"Strategy: {plan.graph.strategy if plan.graph else ''}",
f"Team success: {result.success}",
f"Team summary:\n{result.summary}",
"Node results:\n" + "\n\n".join(node_lines),
(
"Final synthesis instruction:\n" + plan.final_synthesis_instruction
if plan.final_synthesis_instruction
else None
),
(
"Use successful team outputs as internal evidence. If one or more nodes failed, "
"do not blindly repeat failed tool calls. Produce a user-visible fallback answer "
"with available evidence and clearly state any missing or uncertain data."
),
]
if item
)
@staticmethod
def _failed_team_execution_context(plan: TaskExecutionPlan, error: str) -> str:
return "\n\n".join(
[
"Task team execution failed before final synthesis.",
f"Planner reason: {plan.reason}",
f"Strategy: {plan.graph.strategy if plan.graph else ''}",
f"Error: {error}",
(
"Proceed as the main agent. Do not blindly repeat failed tool calls; "
"produce a user-visible fallback answer with available evidence and clearly "
"state any missing or uncertain data."
),
]
)
def _build_team_provider_bundle_factory(self, loaded: Any, kwargs: dict[str, Any]) -> Any:
def factory(node: ExecutionNode) -> Any:
node_kwargs = dict(kwargs)
node_kwargs.pop("provider_bundle", None)
if node.agent.model:
node_kwargs["model"] = node.agent.model
if node.agent.provider_name:
node_kwargs["provider_name"] = node.agent.provider_name
return self._make_provider_bundle_for_task(loaded, node_kwargs)
return factory
def _make_provider_bundle_for_task(self, loaded: Any, kwargs: dict[str, Any]) -> Any:
config = loaded.config
configured_provider = config.resolve_provider_target(
model=kwargs.get("model"),
provider_name=kwargs.get("provider_name"),
)
resolved_model = configured_provider.get("model") or self.profile.default_model
resolved_provider_name = configured_provider.get("provider_name") or kwargs.get("provider_name")
return make_provider_bundle(
model=resolved_model,
provider_name=resolved_provider_name,
api_key=kwargs.get("api_key") or configured_provider.get("api_key"),
api_base=kwargs.get("api_base") or configured_provider.get("api_base"),
request_timeout_seconds=configured_provider.get("request_timeout_seconds"),
extra_headers=kwargs.get("extra_headers") or configured_provider.get("extra_headers"),
routing=kwargs.get("routing"),
fallback_target=kwargs.get("fallback_target"),
auxiliary_target=kwargs.get("auxiliary_target"),
embedding_target=kwargs.get("embedding_target") or config.resolve_embedding_target(),
embedding_model=kwargs.get("embedding_model") or config.default_embedding_model,
)
async def handle_inbound_message(self, inbound: InboundMessage) -> OutboundMessage:
"""把 bus inbound 映射成标准 runtime 调用,并返回结构化 outbound。"""
try:
result = await self.submit_direct(
inbound.content,
session_id=inbound.session_id,
source=f"gateway:{inbound.channel}",
user_id=inbound.user_id,
title=inbound.title,
execution_context=inbound.execution_context,
model=inbound.model,
provider_name=inbound.provider_name,
embedding_model=inbound.embedding_model,
)
except Exception as exc:
return self.build_outbound_error(
inbound,
detail=str(exc),
finish_reason=self._classify_inbound_failure(exc),
)
return self.build_outbound_message(inbound, result)
@staticmethod
def _classify_inbound_failure(exc: Exception) -> str:
"""把 runtime 异常收口为更稳定的 bus finish reason。"""
if isinstance(exc, RuntimeError):
detail = str(exc)
if (
"requires an active run() loop" in detail
or "not accepting new tasks after stop()" in detail
):
return "stopped"
return "error"
@staticmethod
def build_outbound_message(inbound: InboundMessage, result: AgentRunResult) -> OutboundMessage:
"""把一次 runtime 正常结果转成 bus outbound。"""
return OutboundMessage(
message_id=inbound.message_id,
channel=inbound.channel,
session_id=result.session_id,
run_id=result.run_id,
content=result.output_text,
finish_reason=result.finish_reason,
provider_name=result.provider_name,
model=result.model,
usage=dict(result.usage),
metadata={
"inbound_metadata": dict(inbound.metadata),
"task_id": getattr(result, "task_id", None),
"task_status": getattr(result, "task_status", None),
"validation_result": getattr(result, "validation_result", None),
},
)
@staticmethod
def build_outbound_error(
inbound: InboundMessage,
*,
detail: str,
finish_reason: str = "error",
) -> OutboundMessage:
"""把 inbound 处理失败转换成结构化 outbound 错误消息。"""
return OutboundMessage(
message_id=inbound.message_id,
channel=inbound.channel,
session_id=inbound.session_id,
content=detail,
finish_reason=finish_reason,
metadata={"error": detail, "inbound_metadata": dict(inbound.metadata)},
)
def run_direct(
self,
message: str,
**kwargs: Any,
) -> AgentRunResult:
"""同步 direct run 包装。
主要给当前 CLI 或简单脚本使用。真正的长期方向仍然是让 interfaces
在 direct mode 下直接走 `await process_direct(...)`。
"""
try:
asyncio.get_running_loop()
except RuntimeError:
pass
else:
raise RuntimeError(
"AgentService.run_direct() cannot be used inside an active event loop; "
"use 'await AgentService.process_direct(...)' instead."
)
return asyncio.run(self.process_direct(message, **kwargs))