Compare commits

...

1 Commits

Author SHA1 Message Date
01792e5211 feat: bridge_server livekit 2026-04-27 10:39:21 +08:00
6 changed files with 852 additions and 55 deletions

BIN
main/bridge_debug.wav Normal file

Binary file not shown.

276
main/bridge_server.py Normal file
View File

@ -0,0 +1,276 @@
import asyncio
import websockets
import os
import sys
import httpx
import json
import time
import queue
import threading
from typing import Any, Optional
from livekit import rtc
from livekit.rtc import AudioSource, AudioFrame
from websockets.exceptions import ConnectionClosedError
import http.server
import multipart
from urllib.parse import parse_qs
# 配置信息
# TOKEN_URL = "http://10.6.80.130:8000/v1/token"
# LIVEKIT_WS_URL = "ws://10.6.80.130:8000/"
# ROOM = "vera-room"
# IDENTITY = "vera-1"
# TOKEN_URL = "https://omnichat.bwgdi.com/v1/token"
TOKEN_URL = "http://10.6.80.130:8000/getToken"
LIVEKIT_WS_URL = "wss://test-b2zm4kva.livekit.cloud"
# LIVEKIT_WS_URL = "wss://rtc.bwgdi.com/"
ROOM = "test-livekit-room2"
IDENTITY = "uv-livekit-hardcoded"
import uuid
# IDENTITY = f"uv-{uuid.uuid4().hex[:6]}"
CONNECT_TIMEOUT_SECONDS = 10.0
WS_PORT = 8080
SAMPLE_RATE = 16000
async def fetch_token() -> str:
async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client:
response = await client.get(
TOKEN_URL,
params={"room": ROOM, "identity": IDENTITY, "agent_name": "my-agent"},
)
response.raise_for_status()
payload: dict[str, Any] = response.json()
token = payload.get("token")
if not isinstance(token, str) or not token:
raise ValueError(f"token response missing token field: {payload}")
print(f"[token] room={payload.get('room')} identity={payload.get('identity')}")
print(f"[token] jwt_prefix={token[:16]}... len={len(token)}")
print(f"[token] jwt_prefix={token}")
return token
class ESP32LiveKitBridge:
def __init__(self):
self.room = rtc.Room()
# 创建一个音频源,用于将 ESP32 的声音推送到 LiveKit
# 注意:采样率需与 ESP32 发送的一致,通常是 16000 或 24000
self.mic_source = AudioSource(sample_rate=SAMPLE_RATE, num_channels=1)
self.esp_ws = None # 保存 WebSocket 连接
self.audio_queue = queue.Queue()
self.wav_writer_thread: Optional[threading.Thread] = None
self.stop_event = threading.Event()
def _wav_writer_loop(self):
import wave
print("启动音频保存线程...")
try:
with wave.open("bridge_debug.wav", "wb") as wav_file:
wav_file.setnchannels(1)
wav_file.setsampwidth(2) # 16-bit
wav_file.setframerate(SAMPLE_RATE)
while not self.stop_event.is_set() or not self.audio_queue.empty():
try:
# 使用 timeout 避免永久阻塞,以便检查 stop_event
pcm_bytes = self.audio_queue.get(timeout=0.5)
wav_file.writeframes(pcm_bytes)
except queue.Empty:
continue
except Exception as e:
print(f"音频保存线程错误: {e}")
finally:
print("音频保存线程退出")
async def start(self):
@self.room.on("connection_state_changed")
def on_connection_state_changed(state: int) -> None:
print(f"[livekit] state={rtc.ConnectionState.Name(state)}")
# 1. 获取 Token 并连接 LiveKit
print(f"[config] livekit_ws_url={LIVEKIT_WS_URL}")
print(f"[config] token_url={TOKEN_URL}")
print(f"[config] room={ROOM} identity={IDENTITY}")
token = await fetch_token()
await asyncio.wait_for(
self.room.connect(
LIVEKIT_WS_URL,
token,
options=rtc.RoomOptions(connect_timeout=CONNECT_TIMEOUT_SECONDS),
),
timeout=CONNECT_TIMEOUT_SECONDS + 2.0,
)
print(f"已连接到 LiveKit 房间: {self.room.name}")
print(f"[livekit] local_identity={self.room.local_participant.identity}")
print(f"[livekit] local_sid={self.room.local_participant.sid}")
# 2. 发布麦克风轨道 (ESP32 -> LiveKit)
track = rtc.LocalAudioTrack.create_audio_track("esp32-mic", self.mic_source)
options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE)
await self.room.local_participant.publish_track(track, options)
# 3. 监听房间内的音频 (LiveKit -> ESP32)
# 当房间里有其他人(比如 AI Agent说话时触发此回调
@self.room.on("track_subscribed")
def on_track_subscribed(track, publication, participant):
if track.kind == rtc.TrackKind.KIND_AUDIO:
print(f"收到音频流: {participant.identity}")
# 启动一个任务来接收音频流并转发给 ESP32明确指定采样率为 16000Hz 以进行自动重采样
asyncio.create_task(self.forward_audio_to_esp32(rtc.AudioStream(track, sample_rate=SAMPLE_RATE, num_channels=1)))
self.agent_ready = asyncio.Event()
@self.room.on("participant_connected")
def on_participant_connected(p):
print(f"👤 participant joined: {p.identity}")
if "agent" in p.identity:
self.agent_ready.set()
print("等待 agent 加入...")
try:
await asyncio.wait_for(self.agent_ready.wait(), timeout=10)
print("✅ agent 已加入")
except asyncio.TimeoutError:
print("⚠️ agent 未加入(后续可能收不到音频)")
async def close(self):
"""优雅关闭所有连接和资源"""
self.stop_event.set()
if self.room:
await self.room.disconnect()
async def forward_audio_to_esp32(self, audio_stream):
"""从 LiveKit 接收音频,通过 WebSocket 发回给 ESP32"""
import opuslib
import json
# 创建下行 Opus 编码器
encoder = opuslib.Encoder(SAMPLE_RATE, 1, 'voip')
# 1. 告知 ESP32 开始说话,切换 UI 到“说话中”并准备解码
if self.esp_ws:
await self.esp_ws.send(json.dumps({"type": "tts", "state": "start"}))
try:
async for event in audio_stream:
if self.esp_ws:
try:
# AudioStream 迭代产生的是 AudioFrameEvent需要从中提取 frame
frame = event.frame
# 将 PCM 编码为 Opus 才能发给 ESP32
pcm_data = frame.data.tobytes()
# 使用当前帧的实际采样数进行编码
opus_packet = encoder.encode(pcm_data, frame.samples_per_channel)
await self.esp_ws.send(opus_packet)
except Exception as e:
print(f"发送回 ESP32 失败: {e}")
finally:
# 2. 音频流结束,告知 ESP32 停止说话,切换回聆听或闲置状态
if self.esp_ws:
await self.esp_ws.send(json.dumps({"type": "tts", "state": "stop"}))
async def handle_websocket(self, websocket):
"""处理来自 ESP32 的 WebSocket 连接"""
self.esp_ws = websocket
print("ESP32 已连接")
opus_decoder = None
try:
# 发送 hello 告诉 ESP32 握手成功
hello_msg = {
"type": "hello",
"transport": "websocket",
"audio_params": {
"format": "opus", # 明确要求 ESP32 发送 Opus
"sample_rate": SAMPLE_RATE,
"channels": 1,
"frame_duration": 60
}
}
import json
await websocket.send(json.dumps(hello_msg))
async for message in websocket:
# 接收 ESP32 的数据 -> 推送到 LiveKit
if isinstance(message, bytes):
# 判断如果消息长度极其短并且不是合理的音频流可能是ping包等
if len(message) < 4:
print(f"收到过短的字节消息 ({len(message)} bytes),跳过")
continue
# ESP32 默认使用 websocket_protocol version=1 (见 websocket_protocol.cc)
# 这个版本下,没有 4 字节的 header接收到的就是原生的 Opus 数据帧。
# 直接丢给 opuslib 解码即可。
audio_data = message
print(f"收到音频包长度: {len(message)}")
if audio_data:
try:
# Create Opus decoder if not exists
if opus_decoder is None:
import opuslib
print(f"初始化 Opus 解码器: {SAMPLE_RATE}Hz, mono")
opus_decoder = opuslib.Decoder(SAMPLE_RATE, 1)
# 启动音频保存线程
self.stop_event.clear()
thread = threading.Thread(target=self._wav_writer_loop, daemon=True)
self.wav_writer_thread = thread
thread.start()
# Decode Opus packet.
# Frame size for 60ms is SAMPLE_RATE * 0.06
frame_size = int(SAMPLE_RATE * 0.06)
pcm_bytes = opus_decoder.decode(audio_data, frame_size)
# 将音频数据放入队列由后台线程保存
self.audio_queue.put(pcm_bytes)
num_samples = len(pcm_bytes) // 2
if num_samples > 0:
frame = AudioFrame.create(sample_rate=SAMPLE_RATE, num_channels=1, samples_per_channel=num_samples)
# Use memoryview to safely copy bytes into the frame data
memoryview(frame.data).cast('B')[:] = pcm_bytes
# 将 capture_frame 放入当前事件循环的任务中
await self.mic_source.capture_frame(frame)
except Exception as e:
print(f"Opus audio decode error ({len(message)} bytes): {e}")
elif isinstance(message, str):
import json
try:
data = json.loads(message)
print(f"收到 ESP32 JSON 消息: {data}")
except json.JSONDecodeError:
print(f"收到未知的字符消息: {message}")
except ConnectionClosedError as e:
print(f"ESP32 异常断开: {e}")
except Exception as e:
print(f"WebSocket 其他错误: {e}")
finally:
print("ESP32 断开连接")
self.esp_ws = None
if hasattr(self, "wav_writer_thread") and self.wav_writer_thread:
self.stop_event.set()
# 我们不一定需要 join因为是 daemon=True
# 但这里设置 stop_event 会让线程在完成队列后退出
async def main():
bridge = ESP32LiveKitBridge()
try:
await bridge.start()
# 启动 WebSocket 服务器
async with websockets.serve(bridge.handle_websocket, "0.0.0.0", WS_PORT):
print(f"WebSocket 服务器运行在端口 {WS_PORT},等待 ESP32 连接...")
await asyncio.Future() # 保持运行
finally:
await bridge.close()
if __name__ == "__main__":
try:
asyncio.run(main())
except Exception as exc:
print(f"[error] {exc}", file=sys.stderr)
sys.exit(1)

276
main/bridge_server_bak.py Normal file
View File

@ -0,0 +1,276 @@
import asyncio
import websockets
import os
import sys
import httpx
import json
import time
import queue
import threading
from typing import Any, Optional
from livekit import rtc
from livekit.rtc import AudioSource, AudioFrame
from websockets.exceptions import ConnectionClosedError
import http.server
import multipart
from urllib.parse import parse_qs
# 配置信息
# TOKEN_URL = "http://10.6.80.130:8000/v1/token"
# LIVEKIT_WS_URL = "ws://10.6.80.130:8000/"
# ROOM = "vera-room"
# IDENTITY = "vera-1"
# TOKEN_URL = "https://omnichat.bwgdi.com/v1/token"
TOKEN_URL = "http://10.6.80.130:8000/getToken"
LIVEKIT_WS_URL = "wss://test-b2zm4kva.livekit.cloud"
# LIVEKIT_WS_URL = "wss://rtc.bwgdi.com/"
ROOM = "test-livekit-room2"
IDENTITY = "uv-livekit-hardcoded"
import uuid
# IDENTITY = f"uv-{uuid.uuid4().hex[:6]}"
CONNECT_TIMEOUT_SECONDS = 10.0
WS_PORT = 8080
SAMPLE_RATE = 16000
async def fetch_token() -> str:
async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client:
response = await client.get(
TOKEN_URL,
params={"room": ROOM, "identity": IDENTITY, "agent_name": "my-agent"},
)
response.raise_for_status()
payload: dict[str, Any] = response.json()
token = payload.get("token")
if not isinstance(token, str) or not token:
raise ValueError(f"token response missing token field: {payload}")
print(f"[token] room={payload.get('room')} identity={payload.get('identity')}")
print(f"[token] jwt_prefix={token[:16]}... len={len(token)}")
print(f"[token] jwt_prefix={token}")
return token
class ESP32LiveKitBridge:
def __init__(self):
self.room = rtc.Room()
# 创建一个音频源,用于将 ESP32 的声音推送到 LiveKit
# 注意:采样率需与 ESP32 发送的一致,通常是 16000 或 24000
self.mic_source = AudioSource(sample_rate=SAMPLE_RATE, num_channels=1)
self.esp_ws = None # 保存 WebSocket 连接
self.audio_queue = queue.Queue()
self.wav_writer_thread: Optional[threading.Thread] = None
self.stop_event = threading.Event()
def _wav_writer_loop(self):
import wave
print("启动音频保存线程...")
try:
with wave.open("bridge_debug.wav", "wb") as wav_file:
wav_file.setnchannels(1)
wav_file.setsampwidth(2) # 16-bit
wav_file.setframerate(SAMPLE_RATE)
while not self.stop_event.is_set() or not self.audio_queue.empty():
try:
# 使用 timeout 避免永久阻塞,以便检查 stop_event
pcm_bytes = self.audio_queue.get(timeout=0.5)
wav_file.writeframes(pcm_bytes)
except queue.Empty:
continue
except Exception as e:
print(f"音频保存线程错误: {e}")
finally:
print("音频保存线程退出")
async def start(self):
@self.room.on("connection_state_changed")
def on_connection_state_changed(state: int) -> None:
print(f"[livekit] state={rtc.ConnectionState.Name(state)}")
# 1. 获取 Token 并连接 LiveKit
print(f"[config] livekit_ws_url={LIVEKIT_WS_URL}")
print(f"[config] token_url={TOKEN_URL}")
print(f"[config] room={ROOM} identity={IDENTITY}")
token = await fetch_token()
await asyncio.wait_for(
self.room.connect(
LIVEKIT_WS_URL,
token,
options=rtc.RoomOptions(connect_timeout=CONNECT_TIMEOUT_SECONDS),
),
timeout=CONNECT_TIMEOUT_SECONDS + 2.0,
)
print(f"已连接到 LiveKit 房间: {self.room.name}")
print(f"[livekit] local_identity={self.room.local_participant.identity}")
print(f"[livekit] local_sid={self.room.local_participant.sid}")
# 2. 发布麦克风轨道 (ESP32 -> LiveKit)
track = rtc.LocalAudioTrack.create_audio_track("esp32-mic", self.mic_source)
options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE)
await self.room.local_participant.publish_track(track, options)
# 3. 监听房间内的音频 (LiveKit -> ESP32)
# 当房间里有其他人(比如 AI Agent说话时触发此回调
@self.room.on("track_subscribed")
def on_track_subscribed(track, publication, participant):
if track.kind == rtc.TrackKind.KIND_AUDIO:
print(f"收到音频流: {participant.identity}")
# 启动一个任务来接收音频流并转发给 ESP32明确指定采样率为 16000Hz 以进行自动重采样
asyncio.create_task(self.forward_audio_to_esp32(rtc.AudioStream(track, sample_rate=SAMPLE_RATE, num_channels=1)))
self.agent_ready = asyncio.Event()
@self.room.on("participant_connected")
def on_participant_connected(p):
print(f"👤 participant joined: {p.identity}")
if "agent" in p.identity:
self.agent_ready.set()
print("等待 agent 加入...")
try:
await asyncio.wait_for(self.agent_ready.wait(), timeout=10)
print("✅ agent 已加入")
except asyncio.TimeoutError:
print("⚠️ agent 未加入(后续可能收不到音频)")
async def close(self):
"""优雅关闭所有连接和资源"""
self.stop_event.set()
if self.room:
await self.room.disconnect()
async def forward_audio_to_esp32(self, audio_stream):
"""从 LiveKit 接收音频,通过 WebSocket 发回给 ESP32"""
import opuslib
import json
# 创建下行 Opus 编码器
encoder = opuslib.Encoder(SAMPLE_RATE, 1, 'voip')
# 1. 告知 ESP32 开始说话,切换 UI 到“说话中”并准备解码
if self.esp_ws:
await self.esp_ws.send(json.dumps({"type": "tts", "state": "start"}))
try:
async for event in audio_stream:
if self.esp_ws:
try:
# AudioStream 迭代产生的是 AudioFrameEvent需要从中提取 frame
frame = event.frame
# 将 PCM 编码为 Opus 才能发给 ESP32
pcm_data = frame.data.tobytes()
# 使用当前帧的实际采样数进行编码
opus_packet = encoder.encode(pcm_data, frame.samples_per_channel)
await self.esp_ws.send(opus_packet)
except Exception as e:
print(f"发送回 ESP32 失败: {e}")
finally:
# 2. 音频流结束,告知 ESP32 停止说话,切换回聆听或闲置状态
if self.esp_ws:
await self.esp_ws.send(json.dumps({"type": "tts", "state": "stop"}))
async def handle_websocket(self, websocket):
"""处理来自 ESP32 的 WebSocket 连接"""
self.esp_ws = websocket
print("ESP32 已连接")
opus_decoder = None
try:
# 发送 hello 告诉 ESP32 握手成功
hello_msg = {
"type": "hello",
"transport": "websocket",
"audio_params": {
"format": "opus", # 明确要求 ESP32 发送 Opus
"sample_rate": SAMPLE_RATE,
"channels": 1,
"frame_duration": 60
}
}
import json
await websocket.send(json.dumps(hello_msg))
async for message in websocket:
# 接收 ESP32 的数据 -> 推送到 LiveKit
if isinstance(message, bytes):
# 判断如果消息长度极其短并且不是合理的音频流可能是ping包等
if len(message) < 4:
print(f"收到过短的字节消息 ({len(message)} bytes),跳过")
continue
# ESP32 默认使用 websocket_protocol version=1 (见 websocket_protocol.cc)
# 这个版本下,没有 4 字节的 header接收到的就是原生的 Opus 数据帧。
# 直接丢给 opuslib 解码即可。
audio_data = message
print(f"收到音频包长度: {len(message)}")
if audio_data:
try:
# Create Opus decoder if not exists
if opus_decoder is None:
import opuslib
print(f"初始化 Opus 解码器: {SAMPLE_RATE}Hz, mono")
opus_decoder = opuslib.Decoder(SAMPLE_RATE, 1)
# 启动音频保存线程
self.stop_event.clear()
thread = threading.Thread(target=self._wav_writer_loop, daemon=True)
self.wav_writer_thread = thread
thread.start()
# Decode Opus packet.
# Frame size for 60ms is SAMPLE_RATE * 0.06
frame_size = int(SAMPLE_RATE * 0.06)
pcm_bytes = opus_decoder.decode(audio_data, frame_size)
# 将音频数据放入队列由后台线程保存
self.audio_queue.put(pcm_bytes)
num_samples = len(pcm_bytes) // 2
if num_samples > 0:
frame = AudioFrame.create(sample_rate=SAMPLE_RATE, num_channels=1, samples_per_channel=num_samples)
# Use memoryview to safely copy bytes into the frame data
memoryview(frame.data).cast('B')[:] = pcm_bytes
# 将 capture_frame 放入当前事件循环的任务中
await self.mic_source.capture_frame(frame)
except Exception as e:
print(f"Opus audio decode error ({len(message)} bytes): {e}")
elif isinstance(message, str):
import json
try:
data = json.loads(message)
print(f"收到 ESP32 JSON 消息: {data}")
except json.JSONDecodeError:
print(f"收到未知的字符消息: {message}")
except ConnectionClosedError as e:
print(f"ESP32 异常断开: {e}")
except Exception as e:
print(f"WebSocket 其他错误: {e}")
finally:
print("ESP32 断开连接")
self.esp_ws = None
if hasattr(self, "wav_writer_thread") and self.wav_writer_thread:
self.stop_event.set()
# 我们不一定需要 join因为是 daemon=True
# 但这里设置 stop_event 会让线程在完成队列后退出
async def main():
bridge = ESP32LiveKitBridge()
try:
await bridge.start()
# 启动 WebSocket 服务器
async with websockets.serve(bridge.handle_websocket, "0.0.0.0", WS_PORT):
print(f"WebSocket 服务器运行在端口 {WS_PORT},等待 ESP32 连接...")
await asyncio.Future() # 保持运行
finally:
await bridge.close()
if __name__ == "__main__":
try:
asyncio.run(main())
except Exception as exc:
print(f"[error] {exc}", file=sys.stderr)
sys.exit(1)

38
main/debug_connection.py Normal file
View File

@ -0,0 +1,38 @@
import asyncio
import websockets
import ssl
import sys
async def test_connect(url):
print(f"Testing connection to: {url}")
try:
# Create a custom SSL context that ignores certificate validation errors
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
async with websockets.connect(url, ssl=ssl_context) as websocket:
print(f"SUCCESS: Connected to {url}")
await websocket.close()
return True
except Exception as e:
print(f"FAILED: Could not connect to {url}. Error: {e}")
return False
async def main():
urls_to_test = [
"wss://10.6.80.12:31581",
"ws://10.6.80.12:31581",
"wss://10.6.80.12:31581",
"ws://10.6.80.12:31581",
]
print("--- Starting LiveKit Connection Probe ---")
for url in urls_to_test:
if await test_connect(url):
print(f"\nFOUND WORKING URL: {url}")
break
print("--- Probe Finished ---")
if __name__ == "__main__":
asyncio.run(main())

View File

@ -1,30 +1,29 @@
#include "ota.h"
#include "system_info.h"
#include "settings.h"
#include "assets/lang_config.h"
#include "settings.h"
#include "system_info.h"
#include <freertos/FreeRTOS.h>
#include <freertos/task.h>
#include <cJSON.h>
#include <esp_log.h>
#include <esp_partition.h>
#include <esp_ota_ops.h>
#include <esp_app_format.h>
#include <esp_efuse.h>
#include <esp_efuse_table.h>
#include <esp_heap_caps.h>
#include <esp_log.h>
#include <esp_ota_ops.h>
#include <esp_partition.h>
#include <cJSON.h>
#include <freertos/FreeRTOS.h>
#include <freertos/task.h>
#ifdef SOC_HMAC_SUPPORTED
#include <esp_hmac.h>
#endif
#include <cstring>
#include <vector>
#include <sstream>
#include <algorithm>
#include <cstring>
#include <sstream>
#include <vector>
#define TAG "Ota"
Ota::Ota() {
#ifdef ESP_EFUSE_BLOCK_USR_DATA
// Read Serial Number from efuse user_data
@ -40,8 +39,7 @@ Ota::Ota() {
#endif
}
Ota::~Ota() {
}
Ota::~Ota() {}
std::string Ota::GetCheckVersionUrl() {
Settings settings("wifi", false);
@ -62,7 +60,8 @@ std::unique_ptr<Http> Ota::SetupHttp() {
http->SetHeader("Client-Id", board.GetUuid());
if (has_serial_number_) {
http->SetHeader("Serial-Number", serial_number_.c_str());
ESP_LOGI(TAG, "Setup HTTP, User-Agent: %s, Serial-Number: %s", user_agent.c_str(), serial_number_.c_str());
ESP_LOGI(TAG, "Setup HTTP, User-Agent: %s, Serial-Number: %s", user_agent.c_str(),
serial_number_.c_str());
}
http->SetHeader("User-Agent", user_agent);
http->SetHeader("Accept-Language", Lang::CODE);
@ -113,7 +112,7 @@ esp_err_t Ota::CheckVersion() {
// Parse the JSON response and check if the version is newer
// If it is, set has_new_version_ to true and store the new version and URL
cJSON *root = cJSON_Parse(data.c_str());
cJSON* root = cJSON_Parse(data.c_str());
if (root == NULL) {
ESP_LOGE(TAG, "Failed to parse JSON response");
return ESP_ERR_INVALID_RESPONSE;
@ -121,7 +120,7 @@ esp_err_t Ota::CheckVersion() {
has_activation_code_ = false;
has_activation_challenge_ = false;
cJSON *activation = cJSON_GetObjectItem(root, "activation");
cJSON* activation = cJSON_GetObjectItem(root, "activation");
if (cJSON_IsObject(activation)) {
cJSON* message = cJSON_GetObjectItem(activation, "message");
if (cJSON_IsString(message)) {
@ -144,11 +143,11 @@ esp_err_t Ota::CheckVersion() {
}
has_mqtt_config_ = false;
cJSON *mqtt = cJSON_GetObjectItem(root, "mqtt");
cJSON* mqtt = cJSON_GetObjectItem(root, "mqtt");
if (cJSON_IsObject(mqtt)) {
Settings settings("mqtt", true);
cJSON *item = NULL;
cJSON_ArrayForEach(item, mqtt) {
cJSON* item = NULL;
cJSON_ArrayForEach (item, mqtt) {
if (cJSON_IsString(item)) {
if (settings.GetString(item->string) != item->valuestring) {
settings.SetString(item->string, item->valuestring);
@ -159,17 +158,17 @@ esp_err_t Ota::CheckVersion() {
}
}
}
has_mqtt_config_ = true;
has_mqtt_config_ = false;
} else {
ESP_LOGI(TAG, "No mqtt section found !");
}
has_websocket_config_ = false;
cJSON *websocket = cJSON_GetObjectItem(root, "websocket");
cJSON* websocket = cJSON_GetObjectItem(root, "websocket");
if (cJSON_IsObject(websocket)) {
Settings settings("websocket", true);
cJSON *item = NULL;
cJSON_ArrayForEach(item, websocket) {
cJSON* item = NULL;
cJSON_ArrayForEach (item, websocket) {
if (cJSON_IsString(item)) {
if (settings.GetString(item->string) != item->valuestring) {
settings.SetString(item->string, item->valuestring);
@ -185,11 +184,16 @@ esp_err_t Ota::CheckVersion() {
ESP_LOGI(TAG, "No websocket section found!");
}
has_server_time_ = false;
cJSON *server_time = cJSON_GetObjectItem(root, "server_time");
// [开发调试] 强制修改 WebSocket URL 指向本地 Bridge 服务
// 请将下面的 IP 地址修改为你电脑的局域网 IP (例如 192.168.1.5)
Settings settings("websocket", true);
settings.SetString("url", "ws://10.6.80.130:8080");
has_websocket_config_ = true;
cJSON* server_time = cJSON_GetObjectItem(root, "server_time");
if (cJSON_IsObject(server_time)) {
cJSON *timestamp = cJSON_GetObjectItem(server_time, "timestamp");
cJSON *timezone_offset = cJSON_GetObjectItem(server_time, "timezone_offset");
cJSON* timestamp = cJSON_GetObjectItem(server_time, "timestamp");
cJSON* timezone_offset = cJSON_GetObjectItem(server_time, "timezone_offset");
if (cJSON_IsNumber(timestamp)) {
// 设置系统时间
@ -198,10 +202,10 @@ esp_err_t Ota::CheckVersion() {
// 如果有时区偏移,计算本地时间
if (cJSON_IsNumber(timezone_offset)) {
ts += (timezone_offset->valueint * 60 * 1000); // 转换分钟为毫秒
ts += (timezone_offset->valueint * 60 * 1000); // 转换分钟为毫秒
}
tv.tv_sec = (time_t)(ts / 1000); // 转换毫秒为秒
tv.tv_sec = (time_t)(ts / 1000); // 转换毫秒为秒
tv.tv_usec = (suseconds_t)((long long)ts % 1000) * 1000; // 剩余的毫秒转换为微秒
settimeofday(&tv, NULL);
has_server_time_ = true;
@ -211,13 +215,13 @@ esp_err_t Ota::CheckVersion() {
}
has_new_version_ = false;
cJSON *firmware = cJSON_GetObjectItem(root, "firmware");
cJSON* firmware = cJSON_GetObjectItem(root, "firmware");
if (cJSON_IsObject(firmware)) {
cJSON *version = cJSON_GetObjectItem(firmware, "version");
cJSON* version = cJSON_GetObjectItem(firmware, "version");
if (cJSON_IsString(version)) {
firmware_version_ = version->valuestring;
}
cJSON *url = cJSON_GetObjectItem(firmware, "url");
cJSON* url = cJSON_GetObjectItem(firmware, "url");
if (cJSON_IsString(url)) {
firmware_url_ = url->valuestring;
}
@ -231,7 +235,7 @@ esp_err_t Ota::CheckVersion() {
ESP_LOGI(TAG, "Current is the latest version");
}
// If the force flag is set to 1, the given version is forced to be installed
cJSON *force = cJSON_GetObjectItem(firmware, "force");
cJSON* force = cJSON_GetObjectItem(firmware, "force");
if (cJSON_IsNumber(force) && force->valueint == 1) {
has_new_version_ = true;
}
@ -264,7 +268,8 @@ void Ota::MarkCurrentVersionValid() {
}
}
bool Ota::Upgrade(const std::string& firmware_url, std::function<void(int progress, size_t speed)> callback) {
bool Ota::Upgrade(const std::string& firmware_url,
std::function<void(int progress, size_t speed)> callback) {
ESP_LOGI(TAG, "Upgrading firmware from %s", firmware_url.c_str());
esp_ota_handle_t update_handle = 0;
auto update_partition = esp_ota_get_next_update_partition(NULL);
@ -273,7 +278,8 @@ bool Ota::Upgrade(const std::string& firmware_url, std::function<void(int progre
return false;
}
ESP_LOGI(TAG, "Writing to partition %s at offset 0x%lx", update_partition->label, update_partition->address);
ESP_LOGI(TAG, "Writing to partition %s at offset 0x%lx", update_partition->label,
update_partition->address);
bool image_header_checked = false;
std::string image_header;
@ -319,7 +325,8 @@ bool Ota::Upgrade(const std::string& firmware_url, std::function<void(int progre
buffer_offset += ret;
if (esp_timer_get_time() - last_calc_time >= 1000000 || ret == 0) {
size_t progress = total_read * 100 / content_length;
ESP_LOGI(TAG, "Progress: %u%% (%u/%u), Speed: %uB/s", progress, total_read, content_length, recent_read);
ESP_LOGI(TAG, "Progress: %u%% (%u/%u), Speed: %uB/s", progress, total_read,
content_length, recent_read);
if (callback) {
callback(progress, recent_read);
}
@ -329,9 +336,14 @@ bool Ota::Upgrade(const std::string& firmware_url, std::function<void(int progre
if (!image_header_checked) {
image_header.append(buffer, buffer_offset);
if (image_header.size() >= sizeof(esp_image_header_t) + sizeof(esp_image_segment_header_t) + sizeof(esp_app_desc_t)) {
if (image_header.size() >= sizeof(esp_image_header_t) +
sizeof(esp_image_segment_header_t) +
sizeof(esp_app_desc_t)) {
esp_app_desc_t new_app_info;
memcpy(&new_app_info, image_header.data() + sizeof(esp_image_header_t) + sizeof(esp_image_segment_header_t), sizeof(esp_app_desc_t));
memcpy(&new_app_info,
image_header.data() + sizeof(esp_image_header_t) +
sizeof(esp_image_segment_header_t),
sizeof(esp_app_desc_t));
if (esp_ota_begin(update_partition, OTA_WITH_SEQUENTIAL_WRITES, &update_handle)) {
esp_ota_abort(update_handle);
@ -390,7 +402,6 @@ bool Ota::StartUpgrade(std::function<void(int progress, size_t speed)> callback)
return Upgrade(firmware_url_, callback);
}
std::vector<int> Ota::ParseVersion(const std::string& version) {
std::vector<int> versionNumbers;
std::stringstream ss(version);
@ -425,10 +436,11 @@ std::string Ota::GetActivationPayload() {
std::string hmac_hex;
#ifdef SOC_HMAC_SUPPORTED
uint8_t hmac_result[32]; // SHA-256 输出为32字节
uint8_t hmac_result[32]; // SHA-256 输出为32字节
// 使用Key0计算HMAC
esp_err_t ret = esp_hmac_calculate(HMAC_KEY0, (uint8_t*)activation_challenge_.data(), activation_challenge_.size(), hmac_result);
esp_err_t ret = esp_hmac_calculate(HMAC_KEY0, (uint8_t*)activation_challenge_.data(),
activation_challenge_.size(), hmac_result);
if (ret != ESP_OK) {
ESP_LOGE(TAG, "HMAC calculation failed: %s", esp_err_to_name(ret));
return "{}";
@ -441,7 +453,7 @@ std::string Ota::GetActivationPayload() {
}
#endif
cJSON *payload = cJSON_CreateObject();
cJSON* payload = cJSON_CreateObject();
cJSON_AddStringToObject(payload, "algorithm", "hmac-sha256");
cJSON_AddStringToObject(payload, "serial_number", serial_number_.c_str());
cJSON_AddStringToObject(payload, "challenge", activation_challenge_.c_str());
@ -483,7 +495,8 @@ esp_err_t Ota::Activate() {
return ESP_ERR_TIMEOUT;
}
if (status_code != 200) {
ESP_LOGE(TAG, "Failed to activate, code: %d, body: %s", status_code, http->ReadAll().c_str());
ESP_LOGE(TAG, "Failed to activate, code: %d, body: %s", status_code,
http->ReadAll().c_str());
return ESP_FAIL;
}

194
main/test_client_wav.py Normal file
View File

@ -0,0 +1,194 @@
import asyncio
import os
import sys
import wave
import time
from dotenv import load_dotenv
# Try to load credentials from .env.local
load_dotenv(".env.local")
from livekit import rtc
async def publish_audio_from_wav(room: rtc.Room, wav_path: str):
print(f"🎵 准备加载音频文件: {wav_path}")
if not os.path.exists(wav_path):
print(f"❌ 找不到文件 {wav_path}")
return
with wave.open(wav_path, "rb") as wf:
sample_rate = wf.getframerate()
num_channels = wf.getnchannels()
sampwidth = wf.getsampwidth()
if sampwidth != 2:
print("❌ 错误:只支持 16-bit PCM (S16LE) 编码的 WAV 文件。")
return
print(f"📊 音频信息: 采样率 {sample_rate}Hz, 通道数 {num_channels}")
source = rtc.AudioSource(sample_rate, num_channels)
track = rtc.LocalAudioTrack.create_audio_track("agent_input_audio", source)
options = rtc.TrackPublishOptions()
options.source = rtc.TrackSource.SOURCE_MICROPHONE
await room.local_participant.publish_track(track, options)
print("✅ 成功发布麦克风音频轨道,开始推流...")
# 每次读取并推送 20ms 的数据
chunk_duration_ms = 20
samples_per_chunk = int(sample_rate * (chunk_duration_ms / 1000.0))
bytes_per_chunk = samples_per_chunk * num_channels * sampwidth
while True:
data = wf.readframes(samples_per_chunk)
if not data:
break
# 根据已读取的数据计算出完整的采样数(最后一帧可能不足 20ms
frame_samples = len(data) // (num_channels * sampwidth)
# 使用 LiveKit SDK 封装 AudioFrame
try:
# LiveKit Python SDK version >= 0.15
audio_frame = rtc.AudioFrame(data, sample_rate, num_channels, frame_samples)
await source.capture_frame(audio_frame)
except TypeError:
# 若抛出 TypeError可能 SDK 版本有差异,尝试旧版本 API 或者直接 copy
frame = rtc.AudioFrame.create(sample_rate, num_channels, frame_samples)
frame.data[:] = data
await source.capture_frame(frame)
# 严格控制发送速率,避免瞬时把整个音频发过去而导致对面不识别(模拟真实发音)
await asyncio.sleep(chunk_duration_ms / 1000.0)
print("🎉 录音流推送完毕!等待 Agent 回复中...")
async def save_audio_stream(track: rtc.RemoteAudioTrack):
# 为避免旧文件冲突,加个时间戳
filename = f"agent_response_{int(time.time())}.wav"
print(f"🎙️ 正在将 Agent 的声音写入文件: {filename}")
stream = rtc.AudioStream(track)
wf = None
try:
async for event in stream:
# 接收到第一帧时初始化 WAV 格式
if wf is None:
wf = wave.open(filename, "wb")
wf.setnchannels(event.frame.num_channels)
wf.setsampwidth(2) # 16-bit
wf.setframerate(event.frame.sample_rate)
# 写入当前帧音频数据 (16-bit PCM)
wf.writeframes(bytes(event.frame.data))
except Exception as e:
print(f"音频流断开或出错: {e}")
finally:
if wf is not None:
wf.close()
print(f"💾 Agent 语音结果已成功保存到: {filename}")
async def main(room_url: str, token: str, wav_path: str):
room = rtc.Room()
agent_ready = asyncio.Event()
@room.on("connected")
def on_connected():
print("✅ 成功连接到 LiveKit 房间")
# 如果连接时房间里已经有 Agent远程参与者直接准备触发
if room.remote_participants:
agent_ready.set()
@room.on("participant_connected")
def on_participant_connected(participant: rtc.RemoteParticipant):
print(f"👋 Agent ({participant.identity}) 已加入房间")
agent_ready.set()
@room.on("data_received")
def on_data_received(data_packet: rtc.DataPacket):
identity = data_packet.participant.identity if data_packet.participant else "未知"
try:
print(f"📩 [数据接收 | {identity}]: {data_packet.data.decode('utf-8')}")
except Exception:
pass
@room.on("transcription_received")
def on_transcription_received(
segments: list[rtc.TranscriptionSegment],
participant: rtc.Participant,
track_pub: rtc.TrackPublication
):
identity = participant.identity if participant else "未知"
for segment in segments:
status = "✅ 最终结果" if segment.final else "⏳ 正在思考/中间结果"
print(f"🗣️ [{status} | {identity}]: {segment.text}")
@room.on("track_subscribed")
def on_track_subscribed(
track: rtc.Track,
publication: rtc.RemoteTrackPublication,
participant: rtc.RemoteParticipant
):
# 当 Agent 发出新的声音(音频轨道)时,我们订阅并保存
if track.kind == rtc.TrackKind.KIND_AUDIO:
asyncio.create_task(save_audio_stream(track))
print("⏳ 正在建立连接...")
await room.connect(room_url, token)
print("⏳ 等待 Agent 初始化并加入房间...")
await agent_ready.wait()
# 稍微延迟半秒钟,确保 Agent 侧面的准备(如模型加载等)一切就绪
await asyncio.sleep(0.5)
# 开始推送本地 wav 音频
asyncio.create_task(publish_audio_from_wav(room, wav_path))
try:
await asyncio.Event().wait()
except KeyboardInterrupt:
print("\n断开连接中...")
finally:
await room.disconnect()
if __name__ == "__main__":
if len(sys.argv) < 2:
print("❌ 用法: python test_client_wav.py <WAV文件路径> [LIVEKIT_URL] [LIVEKIT_TOKEN/API_KEY]")
print("说明:\n1. 必须提供 WAV 路径。")
print("2. 自动从 .env.local 读取 LIVEKIT_URL。并在没有提供 Token 时自动向 localhost:8000/getToken 请求。")
sys.exit(1)
wav_file = sys.argv[1]
url = os.getenv("LIVEKIT_URL")
token = None
if len(sys.argv) >= 4:
url = sys.argv[2]
token = sys.argv[3]
if not token:
import urllib.request
import json
import random
# 每次使用随机的测试房间,防止上一次没退出的 agent 堆积在同一个房间里导致多重回复
unique_room = f"test-room-{random.randint(1000, 9999)}"
print(f"🔄 正在通过本地服务获取 Token请求加入全新独立房间: {unique_room} ...")
try:
req = urllib.request.urlopen(f"http://localhost:8000/getToken?room={unique_room}&identity=python_tester&agent_name=my-agent")
res_body = req.read().decode('utf-8')
data = json.loads(res_body)
token = data.get("token")
print("✅ 成功获取了包含了 Agent dispatch 的临时 Token")
except Exception as e:
print(f"❌ 获取 Token 失败,错误信息: {e}")
print("若本地 token 服务未启动,请手动提供有效的测试 token")
sys.exit(1)
if not url or not token:
print("❌ 缺少 LiveKit URL 或 Token")
sys.exit(1)
asyncio.run(main(url, token, wav_file))