Files
beaver_project/app-instance/backend/beaver/interfaces/gateway/main.py
steven_li 8a12c30141 feat(beaver): 完成Task Team功能v1实现,重构后端架构支持统一内核
新增内部Task系统,包括验证、反馈门控机制,实现自动质量验证
(通过率>=0.75)和用户反馈闭环(satisfied/revise/abandon)。

实现Agent Team v1协调器,支持sequence/parallel/dag执行策略,
sub-agent复用主AgentLoop,每个run使用独立memory snapshot。

建立Skill学习pipeline,包含draft/审核/发布/回滚完整生命周期,
通过Task验证通过且用户满意才生成学习候选。

重构目录结构,移除third_party依赖,建立统一engine内核,
所有agent共享运行时基础组件。

更新ContextBuilder清理provider消息字段,增强SkillContext版本管理,
集成TaskExecutionPlanner和TaskSkillResolver实现技能解析机制。
2026-05-08 17:14:14 +08:00

225 lines
7.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Gateway entrypoint for Beaver.
当前阶段只做最小 gateway 宿主与 channel adapter 桥接:
1. 启动时托管 `AgentService.start()`
2. 常驻消费 `MessageBus.inbound`
3. 调 `service.handle_inbound_message(...)`
4. 将结果写回 `MessageBus.outbound`
5. 如果配置了 channel adapters,则由 `ChannelManager` 分发 outbound
6. 退出时走 `AgentService.shutdown()`
"""
from __future__ import annotations
import asyncio
from collections.abc import Sequence
from contextlib import suppress
from pathlib import Path
from beaver.foundation.events import InboundMessage, MessageBus
from beaver.interfaces.channels import ChannelAdapter, ChannelManager
from beaver.services.agent_service import AgentService
def _validate_gateway_service(service: AgentService) -> None:
    """Reject service objects the gateway cannot drive.

    The only requirement checked here is that ``service`` exposes a callable
    ``handle_inbound_message`` attribute; whether it is actually a coroutine
    function is not verified.

    Raises:
        TypeError: if the attribute is missing or is not callable.
    """
    if callable(getattr(service, "handle_inbound_message", None)):
        return
    raise TypeError(
        "Gateway requires a service with an async 'handle_inbound_message(inbound)' method"
    )
async def _cleanup_owned_service(
    service: AgentService,
    *,
    timeout_seconds: float | None,
    force: bool,
) -> None:
    """Tear down a gateway-owned service without ever raising.

    Used on startup failure/cancellation paths where the caller re-raises the
    original exception afterwards, so any error raised during cleanup
    (including ``BaseException`` subclasses) is deliberately discarded.

    Args:
        service: the service to clean up.
        timeout_seconds: forwarded to ``service.shutdown`` when it is running.
        force: forwarded to ``service.shutdown`` when it is running.
    """
    with suppress(BaseException):
        if not service.is_running:
            # Never reached running mode: a plain synchronous close is enough.
            service.close()
            return
        await service.shutdown(timeout_seconds=timeout_seconds, force=force)
async def _flush_pending_inbound(bus: MessageBus, *, reason: str) -> None:
    """Turn every still-queued inbound message into an explicit outbound error.

    Drains ``bus.inbound`` without blocking so that unprocessed messages are
    answered with an error (``finish_reason="stopped"``) instead of being
    dropped silently.

    Args:
        bus: message bus whose inbound queue is drained.
        reason: text used as the ``detail`` of each generated error.
    """
    while True:
        try:
            stranded = bus.inbound.get_nowait()
        except asyncio.QueueEmpty:
            # Queue fully drained.
            return
        error_reply = AgentService.build_outbound_error(
            stranded,
            detail=reason,
            finish_reason="stopped",
        )
        await bus.publish_outbound(error_reply)
async def _await_task_shutdown(task: asyncio.Task[None], *, timeout_seconds: float = 1.0) -> None:
    """Wait for a background task to finish, cancelling it on timeout.

    Keeps shutdown from being blocked indefinitely by a task that does not
    exit on its own. ``CancelledError`` is swallowed on both the wait and the
    post-cancel await; any other exception raised by the task propagates.

    Args:
        task: the background task to wait for.
        timeout_seconds: how long to wait before cancelling the task.
    """
    try:
        await asyncio.wait_for(task, timeout=timeout_seconds)
    except asyncio.TimeoutError:
        # Did not finish in time: request cancellation and wait for it to land.
        task.cancel()
        with suppress(asyncio.CancelledError):
            await task
    except asyncio.CancelledError:
        return
async def _bridge_inbound_to_runtime(
    service: AgentService,
    bus: MessageBus,
    stop_event: asyncio.Event,
) -> None:
    """Pump inbound messages through the service and publish the results.

    Loops until ``stop_event`` is set. Each iteration waits up to 0.25 seconds
    for an inbound message so the stop flag is re-checked regularly. Once the
    flag is seen, every message still queued is flushed as an outbound error.

    If the task is cancelled while a message is being handled, an outbound
    error with ``finish_reason="cancelled"`` is published for that message and
    the cancellation is re-raised. Other exceptions from the handler propagate
    and end the loop.

    Args:
        service: object providing ``handle_inbound_message``.
        bus: source of inbound messages and sink for outbound ones.
        stop_event: set by the gateway to request the loop to stop.
    """
    while not stop_event.is_set():
        try:
            inbound = await asyncio.wait_for(bus.consume_inbound(), timeout=0.25)
        except asyncio.TimeoutError:
            # Nothing arrived within the poll window; re-check the stop flag.
            continue

        try:
            outbound = await service.handle_inbound_message(inbound)
        except asyncio.CancelledError:
            # Answer the in-flight message before letting cancellation through.
            cancelled_reply = AgentService.build_outbound_error(
                inbound,
                detail="Gateway stopped before completing the inbound message",
                finish_reason="cancelled",
            )
            await bus.publish_outbound(cancelled_reply)
            raise
        await bus.publish_outbound(outbound)

    await _flush_pending_inbound(
        bus,
        reason="Gateway stopped before processing the inbound message",
    )
async def run_gateway(
    *,
    workspace: str | Path | None = None,
    config_path: str | Path | None = None,
    service: AgentService | None = None,
    bus: MessageBus | None = None,
    channels: Sequence[ChannelAdapter] | None = None,
    channel_manager: ChannelManager | None = None,
    manage_service_lifecycle: bool | None = None,
    stop_event: asyncio.Event | None = None,
    shutdown_timeout_seconds: float | None = 5.0,
    shutdown_force: bool = True,
) -> None:
    """Run the minimal gateway host and its message bridge.

    Default ownership semantics:
    - No ``service`` passed: the gateway creates one and manages its lifecycle.
    - External ``service`` passed: it is only used, not started or shut down,
      unless ``manage_service_lifecycle`` says otherwise.
    - ``channel_manager`` and ``channels`` are mutually exclusive, so an
      injected manager is never modified implicitly.

    The coroutine returns once ``stop_event`` is set (or the caller is
    cancelled). It also stops waiting if the inbound bridge task ends on its
    own, so a crashed bridge surfaces its exception instead of leaving the
    gateway waiting with no inbound consumer.

    Args:
        workspace: forwarded to ``AgentService`` when the gateway creates it.
        config_path: forwarded to ``AgentService`` when the gateway creates it.
        service: externally created service to use instead of a new one.
        bus: message bus to use; defaults to the injected manager's bus, or a
            new ``MessageBus``.
        channels: adapters registered on a gateway-created ``ChannelManager``.
        channel_manager: pre-built manager; must share the gateway's bus.
        manage_service_lifecycle: explicit override for whether the gateway
            starts and shuts down the service.
        stop_event: event that ends the gateway; one is created if omitted.
            It is always set during shutdown.
        shutdown_timeout_seconds: forwarded to ``service.shutdown``.
        shutdown_force: forwarded to ``service.shutdown``.

    Raises:
        TypeError: the service lacks a callable ``handle_inbound_message``.
        ValueError: both ``channel_manager`` and ``channels`` were passed, or
            the injected manager does not share the gateway bus.
        RuntimeError: the service is not running after the optional start.
    """
    # Explicit None checks keep this consistent with the ownership default
    # below, which is also based on `service is None` rather than truthiness.
    if service is not None:
        attached_service = service
    else:
        attached_service = AgentService(workspace=workspace, config_path=config_path)
    _validate_gateway_service(attached_service)

    if channel_manager is not None and channels is not None:
        raise ValueError("Pass either channel_manager or channels, not both")

    if bus is not None:
        attached_bus = bus
    elif channel_manager is not None:
        attached_bus = channel_manager.bus
    else:
        attached_bus = MessageBus()

    attached_channel_manager = channel_manager
    if attached_channel_manager is not None and attached_channel_manager.bus is not attached_bus:
        raise ValueError("Injected channel_manager must share the gateway MessageBus")
    if attached_channel_manager is None and channels is not None:
        # Adapters are only ever registered on a manager the gateway created.
        attached_channel_manager = ChannelManager(attached_bus)
        for channel in channels:
            attached_channel_manager.register(channel)

    owns_service = (
        manage_service_lifecycle if manage_service_lifecycle is not None else service is None
    )
    owned_stop_event = stop_event if stop_event is not None else asyncio.Event()
    started = False
    channels_started = False

    if owns_service:
        try:
            await attached_service.start()
            started = True
        except BaseException:
            await _cleanup_owned_service(
                attached_service,
                timeout_seconds=shutdown_timeout_seconds,
                force=shutdown_force,
            )
            raise

    if not attached_service.is_running:
        raise RuntimeError(
            "Gateway requires AgentService running mode; start the injected service first "
            "or allow the gateway to manage its lifecycle."
        )

    if attached_channel_manager is not None:
        try:
            await attached_channel_manager.start()
            channels_started = True
        except BaseException:
            if owns_service and started:
                await _cleanup_owned_service(
                    attached_service,
                    timeout_seconds=shutdown_timeout_seconds,
                    force=shutdown_force,
                )
            raise

    bridge_task = asyncio.create_task(
        _bridge_inbound_to_runtime(attached_service, attached_bus, owned_stop_event)
    )
    dispatch_task: asyncio.Task[None] | None = None
    dispatch_stop_event = asyncio.Event()
    if attached_channel_manager is not None:
        dispatch_task = asyncio.create_task(
            attached_channel_manager.dispatch_outbound(dispatch_stop_event)
        )

    # Wait for the stop request OR for the bridge to end on its own. The
    # bridge only returns normally after seeing the stop event, so finishing
    # earlier means it failed; its exception is re-raised below when the task
    # is awaited during shutdown.
    stop_waiter = asyncio.create_task(owned_stop_event.wait())
    try:
        await asyncio.wait({stop_waiter, bridge_task}, return_when=asyncio.FIRST_COMPLETED)
    finally:
        stop_waiter.cancel()
        owned_stop_event.set()
        # Each step sits in its own try/finally so that a failure in one
        # (service shutdown, a crashed bridge task, ...) cannot skip the
        # remaining cleanup and leak the dispatch task or started channels.
        try:
            try:
                if owns_service and started:
                    await attached_service.shutdown(
                        timeout_seconds=shutdown_timeout_seconds,
                        force=shutdown_force,
                    )
            finally:
                await _await_task_shutdown(bridge_task)
        finally:
            try:
                if dispatch_task is not None:
                    dispatch_stop_event.set()
                    await _await_task_shutdown(dispatch_task)
            finally:
                if attached_channel_manager is not None and channels_started:
                    await attached_channel_manager.stop()
def main() -> None:
    """Synchronous gateway entry point.

    Runs ``run_gateway()`` with default arguments on a fresh event loop and
    exits quietly on ``KeyboardInterrupt`` (Ctrl+C).
    """
    with suppress(KeyboardInterrupt):
        asyncio.run(run_gateway())