feat: bridge_server livekit first commit

This commit is contained in:
0Xiao0
2026-04-27 15:14:32 +08:00
parent 01792e5211
commit 10125c9cd4

View File

@ -62,6 +62,7 @@ class ESP32LiveKitBridge:
self.audio_queue = queue.Queue()
self.wav_writer_thread: Optional[threading.Thread] = None
self.stop_event = threading.Event()
self.agent_ready = asyncio.Event() # Moved here to be accessible earlier if needed
def _wav_writer_loop(self):
import wave
@ -88,6 +89,45 @@ class ESP32LiveKitBridge:
def on_connection_state_changed(state: int) -> None:
print(f"[livekit] state={rtc.ConnectionState.Name(state)}")
@self.room.on("connected")
def on_connected():
print("✅ 成功连接到 LiveKit 房间")
if self.room.remote_participants:
self.agent_ready.set()
@self.room.on("participant_connected")
def on_participant_connected(p: rtc.RemoteParticipant):
print(f"👋 Agent ({p.identity}) 已加入房间")
self.agent_ready.set()
@self.room.on("data_received")
def on_data_received(data_packet: rtc.DataPacket):
identity = data_packet.participant.identity if data_packet.participant else "未知"
try:
print(f"📩 [数据接收 | {identity}]: {data_packet.data.decode('utf-8')}")
except Exception:
pass
@self.room.on("transcription_received")
def on_transcription_received(
segments: list[rtc.TranscriptionSegment],
participant: rtc.Participant,
track_pub: rtc.TrackPublication
):
identity = participant.identity if participant else "未知"
for segment in segments:
status = "✅ 最终结果" if segment.final else "⏳ 正在思考/中间结果"
print(f"🗣️ [{status} | {identity}]: {segment.text}")
# 将识别结果实时推送到 ESP32
if self.esp_ws is not None:
# 使用局部变量避免类型检查器报错,同时确保在创建任务时 ws 不是 None
ws = self.esp_ws
asyncio.create_task(ws.send(json.dumps({
"type": "stt",
"text": segment.text,
"final": segment.final
})))
# 1. 获取 Token 并连接 LiveKit
print(f"[config] livekit_ws_url={LIVEKIT_WS_URL}")
print(f"[config] token_url={TOKEN_URL}")
@ -112,28 +152,18 @@ class ESP32LiveKitBridge:
await self.room.local_participant.publish_track(track, options)
# 3. 监听房间内的音频 (LiveKit -> ESP32)
# 当房间里有其他人(比如 AI Agent说话时触发此回调
@self.room.on("track_subscribed")
def on_track_subscribed(track, publication, participant):
if track.kind == rtc.TrackKind.KIND_AUDIO:
print(f"收到音频流: {participant.identity}")
# 启动一个任务来接收音频流并转发给 ESP32明确指定采样率为 16000Hz 以进行自动重采样
asyncio.create_task(self.forward_audio_to_esp32(rtc.AudioStream(track, sample_rate=SAMPLE_RATE, num_channels=1)))
self.agent_ready = asyncio.Event()
@self.room.on("participant_connected")
def on_participant_connected(p):
print(f"👤 participant joined: {p.identity}")
if "agent" in p.identity:
self.agent_ready.set()
print("等待 agent 加入...")
try:
await asyncio.wait_for(self.agent_ready.wait(), timeout=10)
print("✅ agent 已加入")
print("✅ agent 已就绪")
except asyncio.TimeoutError:
print("⚠️ agent 未加入(后续可能收不到音频)")
print("⚠️ agent 等待超时(后续可能收不到音频)")
async def close(self):
"""优雅关闭所有连接和资源"""
@ -228,12 +258,15 @@ class ESP32LiveKitBridge:
num_samples = len(pcm_bytes) // 2
if num_samples > 0:
frame = AudioFrame.create(sample_rate=SAMPLE_RATE, num_channels=1, samples_per_channel=num_samples)
# Use memoryview to safely copy bytes into the frame data
memoryview(frame.data).cast('B')[:] = pcm_bytes
# 将 capture_frame 放入当前事件循环的任务中
await self.mic_source.capture_frame(frame)
try:
# Use the more robust frame creation logic from test_client_wav.py
frame = AudioFrame(pcm_bytes, SAMPLE_RATE, 1, num_samples)
await self.mic_source.capture_frame(frame)
except TypeError:
# Fallback for different SDK versions
frame = AudioFrame.create(sample_rate=SAMPLE_RATE, num_channels=1, samples_per_channel=num_samples)
memoryview(frame.data).cast('B')[:] = pcm_bytes
await self.mic_source.capture_frame(frame)
except Exception as e:
print(f"Opus audio decode error ({len(message)} bytes): {e}")
elif isinstance(message, str):
@ -273,4 +306,4 @@ if __name__ == "__main__":
asyncio.run(main())
except Exception as exc:
print(f"[error] {exc}", file=sys.stderr)
sys.exit(1)
sys.exit(1)