From 7020f2d67f51facdc505960aff571995ef1115dc Mon Sep 17 00:00:00 2001 From: steven_li Date: Tue, 16 Jun 2026 11:05:08 +0800 Subject: [PATCH] =?UTF-8?q?feat(agent-service):=20=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E7=9B=B4=E6=8E=A5=E6=A8=A1=E5=BC=8F=E4=B8=8B=E7=9A=84=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E5=A4=84=E7=90=86=E6=94=AF=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 当代理服务处于非运行状态时,现在会使用process_direct方法来处理入站消息, 而不是依赖submit_direct方法。这使得服务能够在两种模式下都能正确处理消息。 添加了新的DirectModeInboundService和RunningInboundService测试类来验证 不同模式下的行为,并增加了相应的集成测试用例。 --- .../backend/beaver/services/agent_service.py | 3 +- .../tests/unit/test_gateway_channels.py | 38 ++++++++++++++++++- 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/app-instance/backend/beaver/services/agent_service.py b/app-instance/backend/beaver/services/agent_service.py index 904ccc2..7fcfc98 100644 --- a/app-instance/backend/beaver/services/agent_service.py +++ b/app-instance/backend/beaver/services/agent_service.py @@ -1285,7 +1285,8 @@ class AgentService: channel_identity = inbound.channel_identity try: - result = await self.submit_direct( + runner = self.submit_direct if self.is_running else self.process_direct + result = await runner( inbound.content, session_id=inbound.session_id, source=f"gateway:{inbound.channel}", diff --git a/app-instance/backend/tests/unit/test_gateway_channels.py b/app-instance/backend/tests/unit/test_gateway_channels.py index d230581..c2cdc9c 100644 --- a/app-instance/backend/tests/unit/test_gateway_channels.py +++ b/app-instance/backend/tests/unit/test_gateway_channels.py @@ -53,6 +53,27 @@ class InvalidService: is_running = True +class DirectModeInboundService(AgentService): + @property + def is_running(self) -> bool: + return False + + async def submit_direct(self, message: str, **kwargs: Any) -> FakeResult: + raise RuntimeError("AgentLoop.submit_direct() requires an active run() loop") + + async def process_direct(self, message: str, **kwargs: Any) -> FakeResult: + return FakeResult( + session_id=kwargs.get("session_id") or "s1", + output_text=f"direct:{message}", + ) + + +class RunningInboundService(AgentService): + @property + def is_running(self) -> bool: + return True + + def test_gateway_routes_memory_channel_roundtrip(tmp_path) -> None: async def run() -> None: bus = MessageBus() @@ -197,7 +218,7 @@ def test_gateway_fails_fast_for_service_without_handle_inbound_message() -> None def test_agent_service_maps_inbound_error_to_structured_outbound() -> None: async def run() -> None: - service = AgentService() + service = RunningInboundService() async def failing_submit_direct(message: str, **kwargs: Any) -> FakeResult: raise RuntimeError("boom") @@ -217,7 +238,7 @@ def test_agent_service_maps_inbound_error_to_structured_outbound() -> None: def test_agent_service_maps_stopped_runtime_to_stopped_outbound() -> None: async def run() -> None: - service = AgentService() + service = RunningInboundService() async def stopped_submit_direct(message: str, **kwargs: Any) -> FakeResult: raise RuntimeError("AgentLoop.submit_direct() is not accepting new tasks after stop()") @@ -233,6 +254,19 @@ def test_agent_service_maps_stopped_runtime_to_stopped_outbound() -> None: asyncio.run(run()) +def test_agent_service_handles_inbound_in_direct_mode() -> None: + async def run() -> None: + service = DirectModeInboundService() + outbound = await service.handle_inbound_message( + InboundMessage(channel="memory", content="hello", session_id="s1") + ) + + assert outbound.finish_reason == "stop" + assert outbound.content == "direct:hello" + + asyncio.run(run()) + + def test_channel_manager_keeps_unknown_channel_outbound_undeliverable() -> None: async def run() -> None: bus = MessageBus()