"""Gateway entrypoint for Beaver. 当前阶段只做最小 gateway 宿主与 channel adapter 桥接: 1. 启动时托管 `AgentService.start()` 2. 常驻消费 `MessageBus.inbound` 3. 调 `service.handle_inbound_message(...)` 4. 将结果写回 `MessageBus.outbound` 5. 如果配置了 channel adapters,则由 `ChannelManager` 分发 outbound 6. 退出时走 `AgentService.shutdown()` """ from __future__ import annotations import asyncio from collections.abc import Sequence from contextlib import suppress from pathlib import Path from beaver.foundation.events import InboundMessage, MessageBus from beaver.interfaces.channels import ChannelAdapter, ChannelManager from beaver.services.agent_service import AgentService def _validate_gateway_service(service: AgentService) -> None: """Fail fast on injected service objects that do not satisfy gateway needs.""" handler = getattr(service, "handle_inbound_message", None) if not callable(handler): raise TypeError( "Gateway requires a service with an async 'handle_inbound_message(inbound)' method" ) async def _cleanup_owned_service( service: AgentService, *, timeout_seconds: float | None, force: bool, ) -> None: """Best-effort cleanup for service startup failures or cancellations.""" with suppress(BaseException): if service.is_running: await service.shutdown(timeout_seconds=timeout_seconds, force=force) else: service.close() async def _flush_pending_inbound(bus: MessageBus, *, reason: str) -> None: """把尚未处理的 inbound 明确冲刷成 outbound 错误,而不是静默丢弃。""" while True: try: pending = bus.inbound.get_nowait() except asyncio.QueueEmpty: break await bus.publish_outbound( AgentService.build_outbound_error( pending, detail=reason, finish_reason="stopped", ) ) async def _await_task_shutdown(task: asyncio.Task[None], *, timeout_seconds: float = 1.0) -> None: """等待后台任务退出;超时则取消,避免 shutdown 被反向卡死。""" try: await asyncio.wait_for(task, timeout=timeout_seconds) except asyncio.CancelledError: pass except asyncio.TimeoutError: task.cancel() try: await task except asyncio.CancelledError: pass async def _bridge_inbound_to_runtime( service: AgentService, bus: MessageBus, stop_event: asyncio.Event, ) -> None: """Consume inbound messages, run the agent, and publish outbound results.""" while True: if stop_event.is_set(): await _flush_pending_inbound( bus, reason="Gateway stopped before processing the inbound message", ) break try: inbound = await asyncio.wait_for(bus.consume_inbound(), timeout=0.25) except asyncio.TimeoutError: continue try: outbound = await service.handle_inbound_message(inbound) except asyncio.CancelledError: await bus.publish_outbound( AgentService.build_outbound_error( inbound, detail="Gateway stopped before completing the inbound message", finish_reason="cancelled", ) ) raise else: await bus.publish_outbound(outbound) async def run_gateway( *, workspace: str | Path | None = None, config_path: str | Path | None = None, service: AgentService | None = None, bus: MessageBus | None = None, channels: Sequence[ChannelAdapter] | None = None, channel_manager: ChannelManager | None = None, manage_service_lifecycle: bool | None = None, stop_event: asyncio.Event | None = None, shutdown_timeout_seconds: float | None = 5.0, shutdown_force: bool = True, ) -> None: """运行最小 gateway 宿主层与消息桥接。 默认 ownership 语义: - 未传 `service`:gateway 自己创建并接管其 lifecycle - 传入外部 `service`:默认只使用,不自动 start/shutdown - `channel_manager` 和 `channels` 二选一,避免隐式修改外部 manager """ attached_service = service or AgentService(workspace=workspace, config_path=config_path) _validate_gateway_service(attached_service) if channel_manager is not None and channels is not None: raise ValueError("Pass either channel_manager or channels, not both") if bus is not None: attached_bus = bus elif channel_manager is not None: attached_bus = channel_manager.bus else: attached_bus = MessageBus() attached_channel_manager = channel_manager if attached_channel_manager is not None and attached_channel_manager.bus is not attached_bus: raise ValueError("Injected channel_manager must share the gateway MessageBus") if attached_channel_manager is None and channels is not None: attached_channel_manager = ChannelManager(attached_bus) if attached_channel_manager is not None and channels is not None: for channel in channels: attached_channel_manager.register(channel) owns_service = manage_service_lifecycle if manage_service_lifecycle is not None else service is None owned_stop_event = stop_event or asyncio.Event() started = False channels_started = False if owns_service: try: await attached_service.start() started = True except BaseException: await _cleanup_owned_service( attached_service, timeout_seconds=shutdown_timeout_seconds, force=shutdown_force, ) raise if not attached_service.is_running: raise RuntimeError( "Gateway requires AgentService running mode; start the injected service first " "or allow the gateway to manage its lifecycle." ) if attached_channel_manager is not None: try: await attached_channel_manager.start() channels_started = True except BaseException: if owns_service and started: await _cleanup_owned_service( attached_service, timeout_seconds=shutdown_timeout_seconds, force=shutdown_force, ) raise bridge_task = asyncio.create_task(_bridge_inbound_to_runtime(attached_service, attached_bus, owned_stop_event)) dispatch_task: asyncio.Task[None] | None = None dispatch_stop_event = asyncio.Event() if attached_channel_manager is not None: dispatch_task = asyncio.create_task(attached_channel_manager.dispatch_outbound(dispatch_stop_event)) try: await owned_stop_event.wait() finally: owned_stop_event.set() if owns_service and started: try: await attached_service.shutdown( timeout_seconds=shutdown_timeout_seconds, force=shutdown_force, ) finally: await _await_task_shutdown(bridge_task) else: await _await_task_shutdown(bridge_task) if dispatch_task is not None: dispatch_stop_event.set() await _await_task_shutdown(dispatch_task) if attached_channel_manager is not None and channels_started: await attached_channel_manager.stop() def main() -> None: """同步 gateway 入口。""" try: asyncio.run(run_gateway()) except KeyboardInterrupt: pass