"""Bridge an ESP32 voice device (Opus over WebSocket) to a LiveKit room.

Uplink:   ESP32 --Opus/WebSocket--> bridge --PCM--> LiveKit microphone track
Downlink: LiveKit remote audio tracks --PCM--> bridge --Opus/WebSocket--> ESP32
"""
import asyncio
import http.server
import json
import os
import queue
import sys
import threading
import time
import uuid
import wave
from typing import Any, Optional
from urllib.parse import parse_qs

import httpx
import multipart
import websockets
from livekit import rtc
from livekit.rtc import AudioFrame, AudioSource
from websockets.exceptions import ConnectionClosed, ConnectionClosedError

# Configuration
# TOKEN_URL = "http://10.6.80.130:8000/v1/token"
# LIVEKIT_WS_URL = "ws://10.6.80.130:8000/"
# ROOM = "vera-room"
# IDENTITY = "vera-1"
# TOKEN_URL = "https://omnichat.bwgdi.com/v1/token"
TOKEN_URL = "http://10.6.80.130:8000/getToken"
LIVEKIT_WS_URL = "wss://test-b2zm4kva.livekit.cloud"
# LIVEKIT_WS_URL = "wss://rtc.bwgdi.com/"
ROOM = "test-livekit-room2"
IDENTITY = "uv-livekit-hardcoded"
# IDENTITY = f"uv-{uuid.uuid4().hex[:6]}"
CONNECT_TIMEOUT_SECONDS = 10.0
WS_PORT = 8080
SAMPLE_RATE = 16000

# Opus framing shared by both directions; FRAME_DURATION_MS is the value
# advertised to the ESP32 in the hello message.
FRAME_DURATION_MS = 60
FRAME_SAMPLES = SAMPLE_RATE * FRAME_DURATION_MS // 1000
FRAME_BYTES = FRAME_SAMPLES * 2  # 16-bit mono PCM
AGENT_WAIT_SECONDS = 10
WAV_DEBUG_PATH = "bridge_debug.wav"


async def fetch_token() -> str:
    """Request a LiveKit access token from TOKEN_URL.

    Returns:
        The JWT found in the ``token`` field of the JSON response.

    Raises:
        httpx.HTTPStatusError: if the token endpoint answers with an error
            status.
        ValueError: if the response carries no non-empty string ``token``.
    """
    async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client:
        response = await client.get(
            TOKEN_URL,
            params={"room": ROOM, "identity": IDENTITY, "agent_name": "my-agent"},
        )
        response.raise_for_status()
        payload: dict[str, Any] = response.json()

    token = payload.get("token") if isinstance(payload, dict) else None
    if not isinstance(token, str) or not token:
        raise ValueError(f"token response missing token field: {payload}")

    print(f"[token] room={payload.get('room')} identity={payload.get('identity')}")
    # Log only a short prefix and the length: the full JWT is a credential
    # and must not end up in logs.
    print(f"[token] jwt_prefix={token[:16]}... len={len(token)}")
    return token


class ESP32LiveKitBridge:
    """Relays audio between one ESP32 WebSocket client and a LiveKit room."""

    def __init__(self) -> None:
        self.room = rtc.Room()
        # Audio source used to push the ESP32's microphone audio into LiveKit.
        # The sample rate must match what the ESP32 sends.
        self.mic_source = AudioSource(sample_rate=SAMPLE_RATE, num_channels=1)
        # Currently connected ESP32 WebSocket, or None when no device is attached.
        self.esp_ws: Optional[Any] = None
        # Decoded uplink PCM waiting to be written to the debug WAV file.
        self.audio_queue: "queue.Queue[bytes]" = queue.Queue()
        self.wav_writer_thread: Optional[threading.Thread] = None
        self.stop_event = threading.Event()
        # Set once a participant whose identity contains "agent" is seen.
        self.agent_ready = asyncio.Event()
        # Strong references to the downlink tasks so they are not
        # garbage-collected mid-flight and can be cancelled in close().
        self._forward_tasks: set = set()

    def _wav_writer_loop(self) -> None:
        """Thread target: drain ``audio_queue`` into the debug WAV file.

        Runs until ``stop_event`` is set and the queue has been emptied.
        """
        print("启动音频保存线程...")
        try:
            with wave.open(WAV_DEBUG_PATH, "wb") as wav_file:
                wav_file.setnchannels(1)
                wav_file.setsampwidth(2)  # 16-bit
                wav_file.setframerate(SAMPLE_RATE)
                while not self.stop_event.is_set() or not self.audio_queue.empty():
                    try:
                        # Use a timeout so stop_event is re-checked regularly.
                        pcm_bytes = self.audio_queue.get(timeout=0.5)
                    except queue.Empty:
                        continue
                    wav_file.writeframes(pcm_bytes)
        except Exception as e:
            print(f"音频保存线程错误: {e}")
        finally:
            print("音频保存线程退出")

    async def _restart_wav_writer(self) -> None:
        """Start a fresh WAV writer thread, first retiring any previous one.

        Joining the old thread prevents two writers from holding the same
        file open at once after an ESP32 reconnect.
        """
        previous = self.wav_writer_thread
        if previous is not None and previous.is_alive():
            self.stop_event.set()
            # join() blocks, so run it off the event loop.
            await asyncio.to_thread(previous.join)
        self.stop_event.clear()
        thread = threading.Thread(target=self._wav_writer_loop, daemon=True)
        self.wav_writer_thread = thread
        thread.start()

    def _register_room_handlers(self) -> None:
        """Attach the room event callbacks used by the bridge."""

        @self.room.on("connection_state_changed")
        def on_connection_state_changed(state: int) -> None:
            print(f"[livekit] state={rtc.ConnectionState.Name(state)}")

        # Fired when a remote participant's track (e.g. the AI agent's voice)
        # becomes available: start forwarding it to the ESP32.
        @self.room.on("track_subscribed")
        def on_track_subscribed(track, publication, participant) -> None:
            if track.kind != rtc.TrackKind.KIND_AUDIO:
                return
            print(f"收到音频流: {participant.identity}")
            # Ask for SAMPLE_RATE mono explicitly so the stream is resampled
            # to the format the ESP32 expects.
            stream = rtc.AudioStream(track, sample_rate=SAMPLE_RATE, num_channels=1)
            task = asyncio.create_task(self.forward_audio_to_esp32(stream))
            self._forward_tasks.add(task)
            task.add_done_callback(self._forward_tasks.discard)

        @self.room.on("participant_connected")
        def on_participant_connected(p) -> None:
            print(f"👤 participant joined: {p.identity}")
            if "agent" in p.identity:
                self.agent_ready.set()

    async def start(self) -> None:
        """Connect to LiveKit, publish the microphone track and wait for an agent.

        Raises:
            asyncio.TimeoutError: if the room connection does not complete
                in time.
            Exception: whatever ``fetch_token`` or the LiveKit SDK raises.
        """
        # Handlers are registered before connecting so that no event emitted
        # during or right after the connection is missed.
        self._register_room_handlers()

        # 1. Fetch a token and connect to LiveKit.
        print(f"[config] livekit_ws_url={LIVEKIT_WS_URL}")
        print(f"[config] token_url={TOKEN_URL}")
        print(f"[config] room={ROOM} identity={IDENTITY}")
        token = await fetch_token()
        await asyncio.wait_for(
            self.room.connect(
                LIVEKIT_WS_URL,
                token,
                options=rtc.RoomOptions(connect_timeout=CONNECT_TIMEOUT_SECONDS),
            ),
            timeout=CONNECT_TIMEOUT_SECONDS + 2.0,
        )
        print(f"已连接到 LiveKit 房间: {self.room.name}")
        print(f"[livekit] local_identity={self.room.local_participant.identity}")
        print(f"[livekit] local_sid={self.room.local_participant.sid}")

        # An agent that was already in the room does not trigger
        # participant_connected, so look at the current participants too.
        for participant in self.room.remote_participants.values():
            if "agent" in participant.identity:
                self.agent_ready.set()

        # 2. Publish the microphone track (ESP32 -> LiveKit).
        track = rtc.LocalAudioTrack.create_audio_track("esp32-mic", self.mic_source)
        options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE)
        await self.room.local_participant.publish_track(track, options)

        # 3. Wait (bounded) for the agent; audio forwarding itself is driven
        #    by the track_subscribed handler registered above.
        print("等待 agent 加入...")
        try:
            await asyncio.wait_for(self.agent_ready.wait(), timeout=AGENT_WAIT_SECONDS)
            print("✅ agent 已加入")
        except asyncio.TimeoutError:
            print("⚠️ agent 未加入(后续可能收不到音频)")

    async def close(self) -> None:
        """Gracefully release all connections and resources."""
        self.stop_event.set()

        tasks = list(self._forward_tasks)
        for task in tasks:
            task.cancel()
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)

        try:
            if self.room is not None:
                await self.room.disconnect()
        finally:
            # Let the writer finish so the WAV header is finalised; a daemon
            # thread killed at interpreter exit would leave a truncated file.
            writer = self.wav_writer_thread
            if writer is not None and writer.is_alive():
                await asyncio.to_thread(writer.join, 2.0)

    async def forward_audio_to_esp32(self, audio_stream) -> None:
        """Receive audio from LiveKit and send it to the ESP32 as Opus.

        PCM from ``audio_stream`` is re-chunked into fixed FRAME_DURATION_MS
        frames before encoding, because Opus only accepts specific frame
        durations and the hello message advertises that duration.

        Args:
            audio_stream: async iterable of LiveKit audio frame events; it is
                closed when forwarding ends.
        """
        import opuslib

        # Downlink Opus encoder.
        encoder = opuslib.Encoder(SAMPLE_RATE, 1, "voip")
        pending = bytearray()
        # WebSocket that has already been told {"type": "tts", "state": "start"}.
        announced_ws = None

        try:
            async for event in audio_stream:
                # Snapshot: self.esp_ws may change while we await below.
                ws = self.esp_ws
                if ws is None:
                    # Nobody is listening; drop buffered audio so a device
                    # connecting later does not get stale sound.
                    pending.clear()
                    announced_ws = None
                    continue
                try:
                    if ws is not announced_ws:
                        # The ESP32 may connect after this stream started, so
                        # announce playback once per connection.
                        pending.clear()
                        await ws.send(json.dumps({"type": "tts", "state": "start"}))
                        announced_ws = ws
                    # Iterating the stream yields events; the PCM is in .frame.
                    pending += event.frame.data.tobytes()
                    while len(pending) >= FRAME_BYTES:
                        chunk = bytes(pending[:FRAME_BYTES])
                        del pending[:FRAME_BYTES]
                        await ws.send(encoder.encode(chunk, FRAME_SAMPLES))
                except Exception as e:
                    print(f"发送回 ESP32 失败: {e}")

            # Stream ended normally: flush the tail, padded with silence.
            if pending and announced_ws is not None and announced_ws is self.esp_ws:
                pending.extend(bytes(FRAME_BYTES - len(pending)))
                try:
                    await announced_ws.send(encoder.encode(bytes(pending), FRAME_SAMPLES))
                except Exception as e:
                    print(f"发送回 ESP32 失败: {e}")
        finally:
            # Tell the ESP32 playback is over, but only on the connection that
            # was told it started, and never let a closed socket raise here.
            ws = self.esp_ws
            if ws is not None and ws is announced_ws:
                try:
                    await ws.send(json.dumps({"type": "tts", "state": "stop"}))
                except ConnectionClosed:
                    pass
            await audio_stream.aclose()

    async def handle_websocket(self, websocket) -> None:
        """Serve one ESP32 WebSocket connection.

        Sends the hello handshake, then decodes incoming Opus packets and
        pushes the PCM into the LiveKit microphone source. Text messages are
        parsed as JSON and logged.
        """
        self.esp_ws = websocket
        print("ESP32 已连接")
        opus_decoder = None
        try:
            # Send hello so the ESP32 knows the handshake succeeded.
            hello_msg = {
                "type": "hello",
                "transport": "websocket",
                "audio_params": {
                    "format": "opus",  # explicitly ask the ESP32 to send Opus
                    "sample_rate": SAMPLE_RATE,
                    "channels": 1,
                    "frame_duration": FRAME_DURATION_MS,
                },
            }
            await websocket.send(json.dumps(hello_msg))

            async for message in websocket:
                if isinstance(message, bytes):
                    # Very short binary messages are not plausible audio
                    # (possibly keep-alives); skip them.
                    if len(message) < 4:
                        print(f"收到过短的字节消息 ({len(message)} bytes),跳过")
                        continue
                    # Per the original note: the ESP32 defaults to
                    # websocket_protocol version=1 (see websocket_protocol.cc),
                    # which has no 4-byte header, so the payload is a raw Opus
                    # frame that can be handed straight to the decoder.
                    print(f"收到音频包长度: {len(message)}")
                    try:
                        if opus_decoder is None:
                            import opuslib

                            print(f"初始化 Opus 解码器: {SAMPLE_RATE}Hz, mono")
                            opus_decoder = opuslib.Decoder(SAMPLE_RATE, 1)
                            await self._restart_wav_writer()

                        pcm_bytes = opus_decoder.decode(message, FRAME_SAMPLES)

                        # Queue for the debug WAV only while a writer is alive,
                        # otherwise the queue would grow without bound.
                        writer = self.wav_writer_thread
                        if writer is not None and writer.is_alive():
                            self.audio_queue.put(pcm_bytes)

                        num_samples = len(pcm_bytes) // 2
                        if num_samples > 0:
                            frame = AudioFrame.create(
                                sample_rate=SAMPLE_RATE,
                                num_channels=1,
                                samples_per_channel=num_samples,
                            )
                            # Copy the PCM into the frame through a byte view.
                            memoryview(frame.data).cast("B")[:] = pcm_bytes
                            await self.mic_source.capture_frame(frame)
                    except Exception as e:
                        print(f"Opus audio decode error ({len(message)} bytes): {e}")
                elif isinstance(message, str):
                    try:
                        data = json.loads(message)
                    except json.JSONDecodeError:
                        print(f"收到未知的字符消息: {message}")
                    else:
                        print(f"收到 ESP32 JSON 消息: {data}")
        except ConnectionClosedError as e:
            print(f"ESP32 异常断开: {e}")
        except Exception as e:
            print(f"WebSocket 其他错误: {e}")
        finally:
            print("ESP32 断开连接")
            # Only clear the slot if it still refers to this connection; a
            # newer connection may already have replaced it.
            if self.esp_ws is websocket:
                self.esp_ws = None
            if self.wav_writer_thread:
                # The writer exits once the queue is drained; it is joined in
                # close() or before the next writer starts.
                self.stop_event.set()


async def main() -> None:
    """Connect the bridge to LiveKit, then serve ESP32 WebSocket clients forever."""
    bridge = ESP32LiveKitBridge()
    try:
        await bridge.start()
        # Start the WebSocket server.
        async with websockets.serve(bridge.handle_websocket, "0.0.0.0", WS_PORT):
            print(f"WebSocket 服务器运行在端口 {WS_PORT},等待 ESP32 连接...")
            await asyncio.Future()  # run until cancelled
    finally:
        await bridge.close()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C: cleanup already ran in main()'s finally block.
        print("[exit] interrupted", file=sys.stderr)
    except Exception as exc:
        print(f"[error] {exc}", file=sys.stderr)
        sys.exit(1)