# Patch notes -- free text placed before the first "diff --git" header.
#
# Target: main/bridge_server.py, class ESP32LiveKitBridge.
#
# What the hunks do:
#   1. -62   Create `self.agent_ready` (an asyncio.Event) next to the other
#            attribute initialisations instead of inside the connect logic.
#   2. -88   Register four more room handlers:
#              "connected"              sets agent_ready when
#                                       room.remote_participants is non-empty;
#              "participant_connected"  sets agent_ready for every participant;
#              "data_received"          prints the payload decoded as UTF-8;
#              "transcription_received" prints each segment and, when
#                                       self.esp_ws is not None, sends
#                                       {"type": "stt", "text", "final"} as JSON.
#   3. -112  Drop the old in-method agent_ready creation and the old
#            "participant_connected" handler; reword the two wait messages.
#   4. -228  Build the frame with AudioFrame(pcm_bytes, SAMPLE_RATE, 1,
#            num_samples) and fall back to AudioFrame.create(...) + memoryview
#            copy when that raises TypeError.
#   5. -273  Add the missing newline at end of file.
#
# NOTE(review): points to confirm before merging
#   - The removed handler only set agent_ready when "agent" was in p.identity;
#     the new one sets it for any participant. Confirm this is intended.
#   - The task returned by asyncio.create_task(ws.send(...)) is neither stored
#     nor awaited, so a failed send is never observed by this code.
#   - on_data_received ends in `except Exception: pass`; decode failures are
#     silently dropped.
#   - The `except TypeError` fallback also covers
#     `await self.mic_source.capture_frame(frame)`, so a TypeError raised by
#     capture_frame would trigger a second capture attempt.
#
# NOTE(review): formatting
#   - Line breaks and blank lines were restored so that every hunk matches the
#     old/new counts in its @@ header. Indentation depth is inferred (the
#     nesting in the -228 hunk in particular is a guess) -- verify against the
#     target file before applying.
#   - Comments and strings inside the hunks are left untouched: context and
#     "-" lines have to match the target file, and "+" lines define the
#     post-image recorded in the index line.
diff --git a/main/bridge_server.py b/main/bridge_server.py
index 51e2a35..0ab4a44 100644
--- a/main/bridge_server.py
+++ b/main/bridge_server.py
@@ -62,6 +62,7 @@ class ESP32LiveKitBridge:
         self.audio_queue = queue.Queue()
         self.wav_writer_thread: Optional[threading.Thread] = None
         self.stop_event = threading.Event()
+        self.agent_ready = asyncio.Event()  # Moved here to be accessible earlier if needed
 
     def _wav_writer_loop(self):
         import wave
@@ -88,6 +89,45 @@ class ESP32LiveKitBridge:
         def on_connection_state_changed(state: int) -> None:
             print(f"[livekit] state={rtc.ConnectionState.Name(state)}")
 
+        @self.room.on("connected")
+        def on_connected():
+            print("✅ 成功连接到 LiveKit 房间")
+            if self.room.remote_participants:
+                self.agent_ready.set()
+
+        @self.room.on("participant_connected")
+        def on_participant_connected(p: rtc.RemoteParticipant):
+            print(f"👋 Agent ({p.identity}) 已加入房间")
+            self.agent_ready.set()
+
+        @self.room.on("data_received")
+        def on_data_received(data_packet: rtc.DataPacket):
+            identity = data_packet.participant.identity if data_packet.participant else "未知"
+            try:
+                print(f"📩 [数据接收 | {identity}]: {data_packet.data.decode('utf-8')}")
+            except Exception:
+                pass
+
+        @self.room.on("transcription_received")
+        def on_transcription_received(
+            segments: list[rtc.TranscriptionSegment],
+            participant: rtc.Participant,
+            track_pub: rtc.TrackPublication
+        ):
+            identity = participant.identity if participant else "未知"
+            for segment in segments:
+                status = "✅ 最终结果" if segment.final else "⏳ 正在思考/中间结果"
+                print(f"🗣️ [{status} | {identity}]: {segment.text}")
+                # 将识别结果实时推送到 ESP32
+                if self.esp_ws is not None:
+                    # 使用局部变量避免类型检查器报错,同时确保在创建任务时 ws 不是 None
+                    ws = self.esp_ws
+                    asyncio.create_task(ws.send(json.dumps({
+                        "type": "stt",
+                        "text": segment.text,
+                        "final": segment.final
+                    })))
+
         # 1. 获取 Token 并连接 LiveKit
         print(f"[config] livekit_ws_url={LIVEKIT_WS_URL}")
         print(f"[config] token_url={TOKEN_URL}")
@@ -112,28 +152,18 @@ class ESP32LiveKitBridge:
         await self.room.local_participant.publish_track(track, options)
 
         # 3. 监听房间内的音频 (LiveKit -> ESP32)
-        # 当房间里有其他人(比如 AI Agent)说话时,触发此回调
         @self.room.on("track_subscribed")
         def on_track_subscribed(track, publication, participant):
             if track.kind == rtc.TrackKind.KIND_AUDIO:
                 print(f"收到音频流: {participant.identity}")
-                # 启动一个任务来接收音频流并转发给 ESP32,明确指定采样率为 16000Hz 以进行自动重采样
                 asyncio.create_task(self.forward_audio_to_esp32(rtc.AudioStream(track, sample_rate=SAMPLE_RATE, num_channels=1)))
 
-        self.agent_ready = asyncio.Event()
-
-        @self.room.on("participant_connected")
-        def on_participant_connected(p):
-            print(f"👤 participant joined: {p.identity}")
-            if "agent" in p.identity:
-                self.agent_ready.set()
-
         print("等待 agent 加入...")
         try:
             await asyncio.wait_for(self.agent_ready.wait(), timeout=10)
-            print("✅ agent 已加入")
+            print("✅ agent 已就绪")
         except asyncio.TimeoutError:
-            print("⚠️ agent 未加入(后续可能收不到音频)")
+            print("⚠️ agent 等待超时(后续可能收不到音频)")
 
     async def close(self):
         """优雅关闭所有连接和资源"""
@@ -228,12 +258,15 @@ class ESP32LiveKitBridge:
                     num_samples = len(pcm_bytes) // 2
 
                     if num_samples > 0:
-                        frame = AudioFrame.create(sample_rate=SAMPLE_RATE, num_channels=1, samples_per_channel=num_samples)
-                        # Use memoryview to safely copy bytes into the frame data
-                        memoryview(frame.data).cast('B')[:] = pcm_bytes
-
-                        # 将 capture_frame 放入当前事件循环的任务中
-                        await self.mic_source.capture_frame(frame)
+                        try:
+                            # Use the more robust frame creation logic from test_client_wav.py
+                            frame = AudioFrame(pcm_bytes, SAMPLE_RATE, 1, num_samples)
+                            await self.mic_source.capture_frame(frame)
+                        except TypeError:
+                            # Fallback for different SDK versions
+                            frame = AudioFrame.create(sample_rate=SAMPLE_RATE, num_channels=1, samples_per_channel=num_samples)
+                            memoryview(frame.data).cast('B')[:] = pcm_bytes
+                            await self.mic_source.capture_frame(frame)
                 except Exception as e:
                     print(f"Opus audio decode error ({len(message)} bytes): {e}")
             elif isinstance(message, str):
@@ -273,4 +306,4 @@ if __name__ == "__main__":
         asyncio.run(main())
     except Exception as exc:
         print(f"[error] {exc}", file=sys.stderr)
-        sys.exit(1)
\ No newline at end of file
+        sys.exit(1)