Compare commits

...

2 Commits

Author SHA1 Message Date
66d318774a fix: server error to disconnect 2026-06-17 15:19:31 +08:00
60df0fe196 feat: add tools to normal agent 2026-06-12 14:23:41 +08:00

View File

@ -1,6 +1,5 @@
import asyncio import asyncio
import base64 import base64
import contextlib
import json import json
import os import os
import re import re
@ -12,7 +11,7 @@ import traceback
import uuid import uuid
from dataclasses import dataclass, field from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from typing import Any, Optional from typing import Any, Coroutine, Optional
import httpx import httpx
import opuslib import opuslib
@ -67,6 +66,7 @@ TTS_START_CONSECUTIVE_AUDIBLE_FRAMES = int(os.getenv("TTS_START_CONSECUTIVE_AUDI
TTS_INTERRUPT_SILENCE_FRAMES = 3 TTS_INTERRUPT_SILENCE_FRAMES = 3
INTERRUPT_TOPIC = "lk.interrupt" INTERRUPT_TOPIC = "lk.interrupt"
VISION_FRAME_TOPIC = "vision.frame" VISION_FRAME_TOPIC = "vision.frame"
MCP_TOPIC = "mcp"
AGENT_STATE_ATTRIBUTE = "lk.agent.state" AGENT_STATE_ATTRIBUTE = "lk.agent.state"
TTS_DISPLAY_SENTENCE_BREAKS = "。!?!?;" TTS_DISPLAY_SENTENCE_BREAKS = "。!?!?;"
TTS_DISPLAY_SCROLL_WIDTH = int(os.getenv("TTS_DISPLAY_SCROLL_WIDTH", "18")) TTS_DISPLAY_SCROLL_WIDTH = int(os.getenv("TTS_DISPLAY_SCROLL_WIDTH", "18"))
@ -122,6 +122,9 @@ class DeviceSession:
last_interrupt_time: float = 0.0 last_interrupt_time: float = 0.0
last_uplink_audible_time: float = 0.0 last_uplink_audible_time: float = 0.0
agent_dispatch_task: Optional[asyncio.Task] = None agent_dispatch_task: Optional[asyncio.Task] = None
room_connect_task: Optional[asyncio.Task] = None
background_tasks: set[asyncio.Task[Any]] = field(default_factory=set)
forwarding_track_participants: dict[str, str] = field(default_factory=dict)
closed: bool = False closed: bool = False
captured_frame_count: int = 0 captured_frame_count: int = 0
first_capture_log_time: float = 0.0 first_capture_log_time: float = 0.0
@ -208,11 +211,15 @@ class ESP32LiveKitBridge:
session: DeviceSession, session: DeviceSession,
task: asyncio.Task[Any], task: asyncio.Task[Any],
) -> None: ) -> None:
if session.room_connect_task is task:
session.room_connect_task = None
if task.cancelled(): if task.cancelled():
return return
try: try:
task.result() task.result()
except Exception as exc: except Exception as exc:
if session.closed:
return
self._log_exception( self._log_exception(
f"LiveKit 房间连接后台任务失败: room={session.room_name}", f"LiveKit 房间连接后台任务失败: room={session.room_name}",
exc, exc,
@ -221,6 +228,48 @@ class ESP32LiveKitBridge:
if websocket is not None: if websocket is not None:
asyncio.create_task(websocket.close(code=1011, reason="livekit connect failed")) asyncio.create_task(websocket.close(code=1011, reason="livekit connect failed"))
def _create_session_task(
self,
session: DeviceSession,
coroutine: Coroutine[Any, Any, Any],
description: str,
) -> Optional[asyncio.Task[Any]]:
if session.closed:
coroutine.close()
return None
task = asyncio.create_task(coroutine)
session.background_tasks.add(task)
task.add_done_callback(
lambda done_task: self._handle_session_task_done(
session,
done_task,
description,
)
)
return task
def _handle_session_task_done(
self,
session: DeviceSession,
task: asyncio.Task[Any],
description: str,
) -> None:
session.background_tasks.discard(task)
if task.cancelled():
return
try:
task.result()
except Exception as exc:
if not session.closed:
self._log_exception(f"{description} 失败: room={session.room_name}", exc)
async def _disconnect_room_quietly(self, session: DeviceSession, reason: str) -> None:
try:
await session.room.disconnect()
except Exception as exc:
print(f"断开 LiveKit 房间失败: room={session.room_name} reason={reason} error={exc}")
async def _capture_mic_frame( async def _capture_mic_frame(
self, self,
session: DeviceSession, session: DeviceSession,
@ -522,6 +571,8 @@ class ESP32LiveKitBridge:
return False return False
async def _send_agent_interrupt(self, session: DeviceSession, reason: str) -> None: async def _send_agent_interrupt(self, session: DeviceSession, reason: str) -> None:
if session.closed:
return
payload = { payload = {
"type": "interrupt", "type": "interrupt",
"topic": INTERRUPT_TOPIC, "topic": INTERRUPT_TOPIC,
@ -552,6 +603,8 @@ class ESP32LiveKitBridge:
return path return path
async def _publish_vision_frame(self, session: DeviceSession, message: dict[str, Any]) -> None: async def _publish_vision_frame(self, session: DeviceSession, message: dict[str, Any]) -> None:
if session.closed:
return
image = message.get("image") image = message.get("image")
if not isinstance(image, str) or not image: if not isinstance(image, str) or not image:
print("收到 vision frame但 image 字段为空") print("收到 vision frame但 image 字段为空")
@ -601,7 +654,81 @@ class ESP32LiveKitBridge:
if last_error is not None: if last_error is not None:
print(f"发送 vision frame 失败publish_data 签名不兼容: {last_error}") print(f"发送 vision frame 失败publish_data 签名不兼容: {last_error}")
async def _publish_mcp_message(self, session: DeviceSession, message: dict[str, Any]) -> None:
if session.closed:
return
payload = message.get("payload")
if not isinstance(payload, dict):
print(f"收到 ESP32 MCP 消息但缺少 payload: {message}")
return
participant = getattr(session.room, "local_participant", None)
if participant is None:
print("跳过发送 MCP 消息local participant 尚未就绪")
return
outbound = {
"type": "mcp",
"topic": MCP_TOPIC,
"room": session.room_name,
"identity": session.identity,
"device_id": session.device_id,
"payload": payload,
}
data = json.dumps(outbound, ensure_ascii=False).encode("utf-8")
agent_identities = self._get_agent_identities(session)
kwargs: dict[str, Any] = {}
if agent_identities:
kwargs["destination_identities"] = agent_identities
last_error: Optional[Exception] = None
for attempt in ({"topic": MCP_TOPIC, **kwargs}, kwargs):
try:
await participant.publish_data(data, **attempt)
print(
f"已发送 MCP 响应: id={payload.get('id')} "
f"targets={agent_identities or 'broadcast'}"
)
return
except TypeError as exc:
last_error = exc
except Exception as exc:
print(f"发送 MCP 响应失败: {exc}")
return
if last_error is not None:
print(f"发送 MCP 响应失败publish_data 签名不兼容: {last_error}")
async def _forward_mcp_to_device(
self,
session: DeviceSession,
payload: dict[str, Any],
*,
source_identity: str,
) -> None:
if session.closed:
return
if session.websocket is None:
print("跳过 MCP 请求ESP32 尚未连接")
return
await session.websocket.send(
json.dumps(
{
"type": "mcp",
"payload": payload,
},
ensure_ascii=False,
)
)
print(
f"已转发 MCP 请求到 ESP32: id={payload.get('id')} "
f"method={payload.get('method')} source={source_identity}"
)
async def _send_tts_state(self, session: DeviceSession, state: str) -> None: async def _send_tts_state(self, session: DeviceSession, state: str) -> None:
if session.closed:
return
if session.websocket is None: if session.websocket is None:
print(f"跳过 tts {state}ESP32 尚未连接") print(f"跳过 tts {state}ESP32 尚未连接")
return return
@ -609,6 +736,8 @@ class ESP32LiveKitBridge:
print(f"已发送 tts {state}: device={session.device_id}") print(f"已发送 tts {state}: device={session.device_id}")
async def _send_emotion(self, session: DeviceSession, emotion: str) -> None: async def _send_emotion(self, session: DeviceSession, emotion: str) -> None:
if session.closed:
return
if session.websocket is None: if session.websocket is None:
print(f"跳过 emotion {emotion}ESP32 尚未连接") print(f"跳过 emotion {emotion}ESP32 尚未连接")
return return
@ -634,6 +763,8 @@ class ESP32LiveKitBridge:
await self._send_emotion(session, emotion) await self._send_emotion(session, emotion)
async def _send_tts_text(self, session: DeviceSession, text: str, final: bool) -> None: async def _send_tts_text(self, session: DeviceSession, text: str, final: bool) -> None:
if session.closed:
return
if session.websocket is None: if session.websocket is None:
return return
raw_text = text raw_text = text
@ -664,12 +795,18 @@ class ESP32LiveKitBridge:
if len(text) <= TTS_DISPLAY_SCROLL_WIDTH: if len(text) <= TTS_DISPLAY_SCROLL_WIDTH:
self._cancel_tts_display_task(session) self._cancel_tts_display_task(session)
asyncio.create_task(self._send_tts_text(session, text, final)) self._create_session_task(
session,
self._send_tts_text(session, text, final),
"发送 TTS 字幕",
)
return return
if session.tts_display_task is None or session.tts_display_task.done(): if session.tts_display_task is None or session.tts_display_task.done():
session.tts_display_task = asyncio.create_task( session.tts_display_task = self._create_session_task(
self._scroll_tts_display_text(session, session.tts_stream_id) session,
self._scroll_tts_display_text(session, session.tts_stream_id),
"滚动 TTS 字幕",
) )
async def _scroll_tts_display_text(self, session: DeviceSession, stream_id: int) -> None: async def _scroll_tts_display_text(self, session: DeviceSession, stream_id: int) -> None:
@ -788,7 +925,11 @@ class ESP32LiveKitBridge:
f"[agent-state] room={session.room_name} identity={participant.identity} state={state}" f"[agent-state] room={session.room_name} identity={participant.identity} state={state}"
) )
if state == "thinking": if state == "thinking":
asyncio.create_task(self._start_thinking(session)) self._create_session_task(
session,
self._start_thinking(session),
"处理 agent thinking 状态",
)
async def _stop_tts(self, session: DeviceSession) -> None: async def _stop_tts(self, session: DeviceSession) -> None:
if not session.tts_active and not session.tts_thinking: if not session.tts_active and not session.tts_thinking:
@ -829,14 +970,20 @@ class ESP32LiveKitBridge:
session.tts_suppressed_until = now + TTS_INTERRUPT_SUPPRESS_SECONDS session.tts_suppressed_until = now + TTS_INTERRUPT_SUPPRESS_SECONDS
session.tts_waiting_for_user_audio_after_interrupt = True session.tts_waiting_for_user_audio_after_interrupt = True
await self._force_stop_tts(session, reason) await self._force_stop_tts(session, reason)
asyncio.create_task(self._send_agent_interrupt(session, reason)) self._create_session_task(
session,
self._send_agent_interrupt(session, reason),
"发送 agent interrupt",
)
def _reset_tts_idle_timer(self, session: DeviceSession) -> None: def _reset_tts_idle_timer(self, session: DeviceSession) -> None:
session.tts_last_audible_at = time.monotonic() session.tts_last_audible_at = time.monotonic()
if session.tts_idle_task is not None: if session.tts_idle_task is not None:
session.tts_idle_task.cancel() session.tts_idle_task.cancel()
session.tts_idle_task = asyncio.create_task( session.tts_idle_task = self._create_session_task(
self._tts_idle_watchdog(session, session.tts_stream_id) session,
self._tts_idle_watchdog(session, session.tts_stream_id),
"TTS idle watchdog",
) )
async def _tts_idle_watchdog(self, session: DeviceSession, stream_id: int) -> None: async def _tts_idle_watchdog(self, session: DeviceSession, stream_id: int) -> None:
@ -890,6 +1037,8 @@ class ESP32LiveKitBridge:
participant: rtc.RemoteParticipant, participant: rtc.RemoteParticipant,
source: str, source: str,
) -> None: ) -> None:
if session.closed or session.websocket is None:
return
if track.kind != rtc.TrackKind.KIND_AUDIO: if track.kind != rtc.TrackKind.KIND_AUDIO:
return return
@ -905,6 +1054,7 @@ class ESP32LiveKitBridge:
if existing_task is not None and existing_task.done(): if existing_task is not None and existing_task.done():
print(f"检测到已结束的音频转发任务,重新创建: sid={track_sid}") print(f"检测到已结束的音频转发任务,重新创建: sid={track_sid}")
session.forwarding_tracks.pop(track_sid, None) session.forwarding_tracks.pop(track_sid, None)
session.forwarding_track_participants.pop(track_sid, None)
task = asyncio.create_task( task = asyncio.create_task(
self.forward_audio_to_esp32( self.forward_audio_to_esp32(
@ -914,17 +1064,37 @@ class ESP32LiveKitBridge:
) )
) )
session.forwarding_tracks[track_sid] = task session.forwarding_tracks[track_sid] = task
session.forwarding_track_participants[track_sid] = participant.identity
print( print(
f"收到音频流: {participant.identity} sid={track_sid} " f"收到音频流: {participant.identity} sid={track_sid} "
f"source={source} room={session.room_name}" f"source={source} room={session.room_name}"
) )
def _cancel_forwarding_tracks(
self,
session: DeviceSession,
participant_identity: Optional[str] = None,
) -> list[asyncio.Task[Any]]:
cancelled: list[asyncio.Task[Any]] = []
for track_sid, task in list(session.forwarding_tracks.items()):
track_participant = session.forwarding_track_participants.get(track_sid)
if participant_identity is not None and track_participant != participant_identity:
continue
session.forwarding_tracks.pop(track_sid, None)
session.forwarding_track_participants.pop(track_sid, None)
if not task.done():
task.cancel()
cancelled.append(task)
return cancelled
def _scan_participant_audio_tracks( def _scan_participant_audio_tracks(
self, self,
session: DeviceSession, session: DeviceSession,
participant: rtc.RemoteParticipant, participant: rtc.RemoteParticipant,
source: str, source: str,
) -> None: ) -> None:
if session.closed:
return
publications = getattr(participant, "track_publications", None) or {} publications = getattr(participant, "track_publications", None) or {}
for publication in publications.values(): for publication in publications.values():
track = getattr(publication, "track", None) track = getattr(publication, "track", None)
@ -992,6 +1162,76 @@ class ESP32LiveKitBridge:
return normalized[start:].strip() or normalized return normalized[start:].strip() or normalized
async def _handle_room_connected(self, session: DeviceSession) -> None:
if session.closed:
return
print(f"✅ 成功连接到 LiveKit 房间: room={session.room_name}")
self._log_agent_participants(session, "connected")
for participant in list(session.room.remote_participants.values()):
if session.closed:
return
if self._is_agent_participant(participant, session.agent_name):
session.agent_ready.set()
self._scan_participant_audio_tracks(session, participant, "connected_scan")
self._handle_agent_state(session, participant)
async def _handle_participant_connected(
self,
session: DeviceSession,
participant: rtc.RemoteParticipant,
) -> None:
if session.closed:
return
role = "Agent" if self._is_agent_participant(participant, session.agent_name) else "Remote participant"
print(f"👋 {role} ({participant.identity}) 已加入房间: room={session.room_name}")
self._log_agent_participants(session, "participant_connected")
if self._is_agent_participant(participant, session.agent_name):
session.agent_ready.set()
self._scan_participant_audio_tracks(
session, participant, "participant_connected_scan"
)
self._handle_agent_state(session, participant)
async def _handle_participant_attributes_changed(
self,
session: DeviceSession,
changed: list[str],
participant: rtc.Participant,
) -> None:
if session.closed:
return
if AGENT_STATE_ATTRIBUTE not in changed:
return
if not isinstance(participant, rtc.RemoteParticipant):
return
if not self._is_agent_participant(participant, session.agent_name):
return
self._handle_agent_state(session, participant)
async def _handle_track_subscribed(
self,
session: DeviceSession,
track: rtc.Track,
publication: rtc.TrackPublication,
participant: rtc.RemoteParticipant,
source: str,
) -> None:
if session.closed:
return
self._maybe_forward_remote_audio(session, track, publication, participant, source)
async def _handle_track_published(
self,
session: DeviceSession,
publication: rtc.RemoteTrackPublication,
participant: rtc.RemoteParticipant,
) -> None:
if session.closed:
return
track = getattr(publication, "track", None)
if track is not None:
self._maybe_forward_remote_audio(session, track, publication, participant, "published")
def _register_room_handlers(self, session: DeviceSession) -> None: def _register_room_handlers(self, session: DeviceSession) -> None:
@session.room.on("connection_state_changed") @session.room.on("connection_state_changed")
def on_connection_state_changed(state: int) -> None: def on_connection_state_changed(state: int) -> None:
@ -1000,55 +1240,69 @@ class ESP32LiveKitBridge:
@session.room.on("connected") @session.room.on("connected")
def on_connected() -> None: def on_connected() -> None:
print(f"✅ 成功连接到 LiveKit 房间: room={session.room_name}") self._create_session_task(
self._log_agent_participants(session, "connected") session,
for participant in session.room.remote_participants.values(): self._handle_room_connected(session),
if self._is_agent_participant(participant, session.agent_name): "处理 LiveKit connected 事件",
session.agent_ready.set() )
self._scan_participant_audio_tracks(session, participant, "connected_scan")
self._handle_agent_state(session, participant)
@session.room.on("participant_connected") @session.room.on("participant_connected")
def on_participant_connected(participant: rtc.RemoteParticipant) -> None: def on_participant_connected(participant: rtc.RemoteParticipant) -> None:
role = "Agent" if self._is_agent_participant(participant, session.agent_name) else "Remote participant" self._create_session_task(
print(f"👋 {role} ({participant.identity}) 已加入房间: room={session.room_name}") session,
self._log_agent_participants(session, "participant_connected") self._handle_participant_connected(session, participant),
if self._is_agent_participant(participant, session.agent_name): "处理 LiveKit participant_connected 事件",
session.agent_ready.set() )
self._scan_participant_audio_tracks(
session, participant, "participant_connected_scan"
)
self._handle_agent_state(session, participant)
@session.room.on("participant_disconnected") @session.room.on("participant_disconnected")
def on_participant_disconnected(participant: rtc.RemoteParticipant) -> None: def on_participant_disconnected(participant: rtc.RemoteParticipant) -> None:
print(f"👋 远端参与者离开房间: room={session.room_name} identity={participant.identity}") print(f"👋 远端参与者离开房间: room={session.room_name} identity={participant.identity}")
session.forwarding_tracks = { self._cancel_forwarding_tracks(session, participant.identity)
track_sid: task
for track_sid, task in session.forwarding_tracks.items()
if not track_sid.endswith(f":{participant.identity}")
}
@session.room.on("participant_attributes_changed") @session.room.on("participant_attributes_changed")
def on_participant_attributes_changed(changed: list[str], participant: rtc.Participant) -> None: def on_participant_attributes_changed(changed: list[str], participant: rtc.Participant) -> None:
if AGENT_STATE_ATTRIBUTE not in changed: self._create_session_task(
return session,
if not isinstance(participant, rtc.RemoteParticipant): self._handle_participant_attributes_changed(session, changed, participant),
return "处理 LiveKit participant_attributes_changed 事件",
if not self._is_agent_participant(participant, session.agent_name): )
return
self._handle_agent_state(session, participant)
@session.room.on("data_received") @session.room.on("data_received")
def on_data_received(data_packet: rtc.DataPacket) -> None: def on_data_received(data_packet: rtc.DataPacket) -> None:
identity = data_packet.participant.identity if data_packet.participant else "未知" identity = data_packet.participant.identity if data_packet.participant else "未知"
packet_topic = getattr(data_packet, "topic", None)
try: try:
decoded = data_packet.data.decode("utf-8")
print( print(
f"📩 [数据接收 | room={session.room_name} | {identity}]: " f"📩 [数据接收 | room={session.room_name} | {identity}]: "
f"{data_packet.data.decode('utf-8')}" f"{decoded}"
) )
except Exception: except Exception:
pass decoded = ""
try:
payload = json.loads(decoded) if decoded else None
except Exception:
payload = None
if isinstance(payload, dict) and (
packet_topic == MCP_TOPIC
or payload.get("type") == "mcp"
or payload.get("topic") == MCP_TOPIC
):
mcp_payload = payload.get("payload")
if isinstance(mcp_payload, dict):
self._create_session_task(
session,
self._forward_mcp_to_device(
session,
mcp_payload,
source_identity=identity,
),
"转发 MCP 到 ESP32",
)
else:
print(f"收到 MCP 数据但缺少 payload: {payload}")
@session.room.on("transcription_received") @session.room.on("transcription_received")
def on_transcription_received( def on_transcription_received(
@ -1074,7 +1328,11 @@ class ESP32LiveKitBridge:
) )
if emotion and emotion != session.tts_emotion: if emotion and emotion != session.tts_emotion:
session.tts_emotion = emotion session.tts_emotion = emotion
asyncio.create_task(self._send_emotion(session, emotion)) self._create_session_task(
session,
self._send_emotion(session, emotion),
"发送 emotion",
)
display_text = self._current_tts_display_text(tts_text) display_text = self._current_tts_display_text(tts_text)
print(f"[livekit-llm] display_text={display_text!r} final={segment.final}") print(f"[livekit-llm] display_text={display_text!r} final={segment.final}")
if not display_text or display_text == session.tts_transcript_text: if not display_text or display_text == session.tts_transcript_text:
@ -1084,14 +1342,19 @@ class ESP32LiveKitBridge:
if not segment.final: if not segment.final:
continue continue
display_text = segment.text display_text = segment.text
asyncio.create_task(self._start_thinking(session)) self._create_session_task(
session,
self._start_thinking(session),
"发送 TTS thinking",
)
if session.websocket is not None: if session.websocket is not None:
ws = session.websocket ws = session.websocket
if is_agent: if is_agent:
self._update_tts_display_text(session, display_text, segment.final) self._update_tts_display_text(session, display_text, segment.final)
else: else:
asyncio.create_task( self._create_session_task(
session,
ws.send( ws.send(
json.dumps( json.dumps(
{ {
@ -1100,7 +1363,8 @@ class ESP32LiveKitBridge:
"final": segment.final, "final": segment.final,
} }
) )
) ),
"发送 STT 到 ESP32",
) )
@session.room.on("track_subscribed") @session.room.on("track_subscribed")
@ -1109,7 +1373,11 @@ class ESP32LiveKitBridge:
publication: rtc.TrackPublication, publication: rtc.TrackPublication,
participant: rtc.RemoteParticipant, participant: rtc.RemoteParticipant,
) -> None: ) -> None:
self._maybe_forward_remote_audio(session, track, publication, participant, "event") self._create_session_task(
session,
self._handle_track_subscribed(session, track, publication, participant, "event"),
"处理 LiveKit track_subscribed 事件",
)
@session.room.on("track_published") @session.room.on("track_published")
def on_track_published( def on_track_published(
@ -1121,18 +1389,25 @@ class ESP32LiveKitBridge:
# f"📡 远端音轨已发布: room={session.room_name} identity={participant.identity} " # f"📡 远端音轨已发布: room={session.room_name} identity={participant.identity} "
# f"track_sid={track_sid}" # f"track_sid={track_sid}"
# ) # )
track = getattr(publication, "track", None) self._create_session_task(
if track is not None: session,
self._maybe_forward_remote_audio(session, track, publication, participant, "published") self._handle_track_published(session, publication, participant),
"处理 LiveKit track_published 事件",
)
async def _connect_session_room(self, session: DeviceSession) -> None: async def _connect_session_room(self, session: DeviceSession) -> None:
if session.closed:
return
self._register_room_handlers(session) self._register_room_handlers(session)
connected = False
# print(f"[config] livekit_ws_url={LIVEKIT_WS_URL}") # print(f"[config] livekit_ws_url={LIVEKIT_WS_URL}")
# print(f"[config] token_url={TOKEN_URL}") # print(f"[config] token_url={TOKEN_URL}")
# print(f"[config] room={session.room_name} identity={session.identity}") # print(f"[config] room={session.room_name} identity={session.identity}")
# print(f"[config] livekit_connect_timeout={CONNECT_TIMEOUT_SECONDS}") # print(f"[config] livekit_connect_timeout={CONNECT_TIMEOUT_SECONDS}")
token = await fetch_token(session.room_name, session.identity, session.agent_name) token = await fetch_token(session.room_name, session.identity, session.agent_name)
if session.closed:
return
try: try:
await session.room.connect( await session.room.connect(
@ -1140,7 +1415,10 @@ class ESP32LiveKitBridge:
token, token,
options=rtc.RoomOptions(connect_timeout=CONNECT_TIMEOUT_SECONDS), options=rtc.RoomOptions(connect_timeout=CONNECT_TIMEOUT_SECONDS),
) )
connected = True
except Exception as exc: except Exception as exc:
if session.closed:
return
self._log_exception( self._log_exception(
f"连接 LiveKit 房间失败: room={session.room_name}", f"连接 LiveKit 房间失败: room={session.room_name}",
exc, exc,
@ -1152,6 +1430,9 @@ class ESP32LiveKitBridge:
) )
raise raise
if session.closed:
await self._disconnect_room_quietly(session, "session_closed_after_connect")
return
print(f"已连接到 LiveKit 房间: {session.room.name}") print(f"已连接到 LiveKit 房间: {session.room.name}")
# print(f"[livekit] local_identity={session.room.local_participant.identity}") # print(f"[livekit] local_identity={session.room.local_participant.identity}")
# print(f"[livekit] local_sid={session.room.local_participant.sid}") # print(f"[livekit] local_sid={session.room.local_participant.sid}")
@ -1159,6 +1440,9 @@ class ESP32LiveKitBridge:
self._log_agent_participants(session, "after_connect") self._log_agent_participants(session, "after_connect")
await self.ensure_agent_dispatched(session) await self.ensure_agent_dispatched(session)
if session.closed:
await self._disconnect_room_quietly(session, "session_closed_after_dispatch")
return
track = rtc.LocalAudioTrack.create_audio_track( track = rtc.LocalAudioTrack.create_audio_track(
f"esp32-mic-{session.device_id}", f"esp32-mic-{session.device_id}",
@ -1166,6 +1450,9 @@ class ESP32LiveKitBridge:
) )
options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE) options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE)
publication = await session.room.local_participant.publish_track(track, options) publication = await session.room.local_participant.publish_track(track, options)
if session.closed:
await self._disconnect_room_quietly(session, "session_closed_after_publish")
return
publication_sid = getattr(publication, "sid", None) publication_sid = getattr(publication, "sid", None)
track_sid = getattr(track, "sid", None) track_sid = getattr(track, "sid", None)
# print( # print(
@ -1185,6 +1472,9 @@ class ESP32LiveKitBridge:
return return
print(f"⚠️ agent 等待超时: room={session.room_name}") print(f"⚠️ agent 等待超时: room={session.room_name}")
if connected and session.closed:
await self._disconnect_room_quietly(session, "session_closed_after_agent_wait")
async def start(self) -> None: async def start(self) -> None:
print(f"[config] websocket_port={WS_PORT}") print(f"[config] websocket_port={WS_PORT}")
print(f"[config] websocket_max_queue={WS_MAX_QUEUE} websocket_max_size={WS_MAX_SIZE}") print(f"[config] websocket_max_queue={WS_MAX_QUEUE} websocket_max_size={WS_MAX_SIZE}")
@ -1230,14 +1520,47 @@ class ESP32LiveKitBridge:
session.tts_active = False session.tts_active = False
session.tts_thinking = False session.tts_thinking = False
session.tts_stream_id += 1 session.tts_stream_id += 1
cleanup_tasks: list[asyncio.Task[Any]] = []
current_task = asyncio.current_task()
room_connect_pending = (
session.room_connect_task is not None
and not session.room_connect_task.done()
)
if (
not room_connect_pending
and session.agent_dispatch_task is not None
and not session.agent_dispatch_task.done()
):
session.agent_dispatch_task.cancel()
if session.agent_dispatch_task is not current_task:
cleanup_tasks.append(session.agent_dispatch_task)
session.agent_dispatch_task = None
cleanup_tasks.extend(self._cancel_forwarding_tracks(session))
if session.tts_idle_task is not None: if session.tts_idle_task is not None:
session.tts_idle_task.cancel() session.tts_idle_task.cancel()
if session.tts_idle_task is not current_task:
cleanup_tasks.append(session.tts_idle_task)
session.tts_idle_task = None session.tts_idle_task = None
self._cancel_tts_display_task(session) if session.tts_display_task is not None:
try: session.tts_display_task.cancel()
await session.room.disconnect() if session.tts_display_task is not current_task:
except Exception as exc: cleanup_tasks.append(session.tts_display_task)
print(f"断开 LiveKit 房间失败: room={session.room_name} error={exc}") session.tts_display_task = None
for task in list(session.background_tasks):
if task is current_task or task.done():
continue
task.cancel()
cleanup_tasks.append(task)
if cleanup_tasks:
await asyncio.gather(*cleanup_tasks, return_exceptions=True)
if not room_connect_pending:
await self._disconnect_room_quietly(session, "session_close")
async def forward_audio_to_esp32( async def forward_audio_to_esp32(
self, self,
@ -1407,11 +1730,20 @@ class ESP32LiveKitBridge:
except Exception as exc: except Exception as exc:
print(f"音频流处理错误: {exc}") print(f"音频流处理错误: {exc}")
finally: finally:
close_stream = getattr(audio_stream, "aclose", None) or getattr(audio_stream, "close", None)
if close_stream is not None:
try:
result = close_stream()
if result is not None and hasattr(result, "__await__"):
await result
except Exception as exc:
print(f"关闭 LiveKit 音频流失败: {exc}")
print("🎧 TTS 音频结束") print("🎧 TTS 音频结束")
task = session.forwarding_tracks.get(track_sid) task = session.forwarding_tracks.get(track_sid)
current_task = asyncio.current_task() current_task = asyncio.current_task()
if task is current_task: if task is current_task:
session.forwarding_tracks.pop(track_sid, None) session.forwarding_tracks.pop(track_sid, None)
session.forwarding_track_participants.pop(track_sid, None)
if stream_id == session.tts_stream_id and session.tts_idle_task is not None: if stream_id == session.tts_stream_id and session.tts_idle_task is not None:
session.tts_idle_task.cancel() session.tts_idle_task.cancel()
session.tts_idle_task = None session.tts_idle_task = None
@ -1473,9 +1805,14 @@ class ESP32LiveKitBridge:
f"已发送 server hello: device={device_id} room={session.room_name} " f"已发送 server hello: device={device_id} room={session.room_name} "
f"audio={OUTPUT_SAMPLE_RATE}Hz/{OUTPUT_FRAME_DURATION_MS}ms" f"audio={OUTPUT_SAMPLE_RATE}Hz/{OUTPUT_FRAME_DURATION_MS}ms"
) )
asyncio.create_task(self._run_emotion_test_sequence(session)) self._create_session_task(
session,
self._run_emotion_test_sequence(session),
"emotion 测试序列",
)
room_connect_task = asyncio.create_task(self._connect_session_room(session)) room_connect_task = asyncio.create_task(self._connect_session_room(session))
session.room_connect_task = room_connect_task
room_connect_task.add_done_callback( room_connect_task.add_done_callback(
lambda task: self._track_room_connect_task(session, task) lambda task: self._track_room_connect_task(session, task)
) )
@ -1551,6 +1888,8 @@ class ESP32LiveKitBridge:
abort_reason = reason if isinstance(reason, str) and reason else "button_abort" abort_reason = reason if isinstance(reason, str) and reason else "button_abort"
print(f"处理 ESP32 打断请求: reason={abort_reason}") print(f"处理 ESP32 打断请求: reason={abort_reason}")
await self._abort_tts(session, abort_reason) await self._abort_tts(session, abort_reason)
elif msg_type == "mcp":
await self._publish_mcp_message(session, data)
elif msg_type == "vision" and data.get("state") == "frame": elif msg_type == "vision" and data.get("state") == "frame":
await self._publish_vision_frame(session, data) await self._publish_vision_frame(session, data)
except json.JSONDecodeError: except json.JSONDecodeError:
@ -1561,10 +1900,6 @@ class ESP32LiveKitBridge:
self._log_exception("WebSocket 其他错误", exc) self._log_exception("WebSocket 其他错误", exc)
finally: finally:
print(f"ESP32 断开连接: device={device_id} room={session.room_name}") print(f"ESP32 断开连接: device={device_id} room={session.room_name}")
if room_connect_task is not None and not room_connect_task.done():
room_connect_task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await room_connect_task
await self._close_session(session) await self._close_session(session)
self.device_sessions.pop(device_id, None) self.device_sessions.pop(device_id, None)