From 60df0fe196f1d6c62d6cf7c2df93d64644b3c537 Mon Sep 17 00:00:00 2001 From: 0Xiao0 <511201264@qq.com> Date: Fri, 12 Jun 2026 14:23:41 +0800 Subject: [PATCH] feat: add tools to normal agent --- main/bridge_server.py | 99 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 97 insertions(+), 2 deletions(-) diff --git a/main/bridge_server.py b/main/bridge_server.py index 5746c57..fd0ada1 100644 --- a/main/bridge_server.py +++ b/main/bridge_server.py @@ -67,6 +67,7 @@ TTS_START_CONSECUTIVE_AUDIBLE_FRAMES = int(os.getenv("TTS_START_CONSECUTIVE_AUDI TTS_INTERRUPT_SILENCE_FRAMES = 3 INTERRUPT_TOPIC = "lk.interrupt" VISION_FRAME_TOPIC = "vision.frame" +MCP_TOPIC = "mcp" AGENT_STATE_ATTRIBUTE = "lk.agent.state" TTS_DISPLAY_SENTENCE_BREAKS = "。!?!?;;" TTS_DISPLAY_SCROLL_WIDTH = int(os.getenv("TTS_DISPLAY_SCROLL_WIDTH", "18")) @@ -601,6 +602,74 @@ class ESP32LiveKitBridge: if last_error is not None: print(f"发送 vision frame 失败,publish_data 签名不兼容: {last_error}") + async def _publish_mcp_message(self, session: DeviceSession, message: dict[str, Any]) -> None: + payload = message.get("payload") + if not isinstance(payload, dict): + print(f"收到 ESP32 MCP 消息但缺少 payload: {message}") + return + + participant = getattr(session.room, "local_participant", None) + if participant is None: + print("跳过发送 MCP 消息,local participant 尚未就绪") + return + + outbound = { + "type": "mcp", + "topic": MCP_TOPIC, + "room": session.room_name, + "identity": session.identity, + "device_id": session.device_id, + "payload": payload, + } + data = json.dumps(outbound, ensure_ascii=False).encode("utf-8") + agent_identities = self._get_agent_identities(session) + kwargs: dict[str, Any] = {} + if agent_identities: + kwargs["destination_identities"] = agent_identities + + last_error: Optional[Exception] = None + for attempt in ({"topic": MCP_TOPIC, **kwargs}, kwargs): + try: + await participant.publish_data(data, **attempt) + print( + f"已发送 MCP 响应: id={payload.get('id')} " + f"targets={agent_identities or 'broadcast'}" + ) + return + except TypeError as exc: + last_error = exc + except Exception as exc: + print(f"发送 MCP 响应失败: {exc}") + return + + if last_error is not None: + print(f"发送 MCP 响应失败,publish_data 签名不兼容: {last_error}") + + async def _forward_mcp_to_device( + self, + session: DeviceSession, + payload: dict[str, Any], + *, + source_identity: str, + ) -> None: + if session.websocket is None: + print("跳过 MCP 请求,ESP32 尚未连接") + return + + await session.websocket.send( + json.dumps( + { + "type": "mcp", + "payload": payload, + }, + ensure_ascii=False, + ) + ) + print( + f"已转发 MCP 请求到 ESP32: id={payload.get('id')} " + f"method={payload.get('method')} source={source_identity}" + ) + async def _send_tts_state(self, session: DeviceSession, state: str) -> None: if session.websocket is None: print(f"跳过 tts {state},ESP32 尚未连接") @@ -1042,13 +1111,37 @@ class ESP32LiveKitBridge: @session.room.on("data_received") def on_data_received(data_packet: rtc.DataPacket) -> None: identity = data_packet.participant.identity if data_packet.participant else "未知" + packet_topic = getattr(data_packet, "topic", None) try: + decoded = data_packet.data.decode("utf-8") print( f"📩 [数据接收 | room={session.room_name} | {identity}]: " - f"{data_packet.data.decode('utf-8')}" + f"{decoded}" ) except Exception: - pass + decoded = "" + + try: + payload = json.loads(decoded) if decoded else None + except Exception: + payload = None + + if isinstance(payload, dict) and ( + packet_topic == MCP_TOPIC + or payload.get("type") == "mcp" + or payload.get("topic") == MCP_TOPIC + ): + mcp_payload = payload.get("payload") + if isinstance(mcp_payload, dict): + asyncio.create_task( + self._forward_mcp_to_device( + session, + mcp_payload, + source_identity=identity, + ) + ) + else: + print(f"收到 MCP 数据但缺少 payload: {payload}") @session.room.on("transcription_received") def on_transcription_received( @@ -1551,6 +1644,8 @@ class ESP32LiveKitBridge: abort_reason = reason if isinstance(reason, str) and reason else "button_abort" print(f"处理 ESP32 打断请求: reason={abort_reason}") await self._abort_tts(session, abort_reason) + elif msg_type == "mcp": + await self._publish_mcp_message(session, data) elif msg_type == "vision" and data.get("state") == "frame": await self._publish_vision_frame(session, data) except json.JSONDecodeError: