diff --git a/main/bridge_debug.wav b/main/bridge_debug.wav new file mode 100644 index 0000000..e67e9bf Binary files /dev/null and b/main/bridge_debug.wav differ diff --git a/main/bridge_server.py b/main/bridge_server.py new file mode 100644 index 0000000..51e2a35 --- /dev/null +++ b/main/bridge_server.py @@ -0,0 +1,276 @@ + +import asyncio +import websockets +import os +import sys +import httpx +import json +import time +import queue +import threading +from typing import Any, Optional +from livekit import rtc +from livekit.rtc import AudioSource, AudioFrame +from websockets.exceptions import ConnectionClosedError +import http.server +import multipart +from urllib.parse import parse_qs + +# 配置信息 +# TOKEN_URL = "http://10.6.80.130:8000/v1/token" +# LIVEKIT_WS_URL = "ws://10.6.80.130:8000/" +# ROOM = "vera-room" +# IDENTITY = "vera-1" +# TOKEN_URL = "https://omnichat.bwgdi.com/v1/token" +TOKEN_URL = "http://10.6.80.130:8000/getToken" +LIVEKIT_WS_URL = "wss://test-b2zm4kva.livekit.cloud" +# LIVEKIT_WS_URL = "wss://rtc.bwgdi.com/" +ROOM = "test-livekit-room2" +IDENTITY = "uv-livekit-hardcoded" +import uuid +# IDENTITY = f"uv-{uuid.uuid4().hex[:6]}" +CONNECT_TIMEOUT_SECONDS = 10.0 +WS_PORT = 8080 + +SAMPLE_RATE = 16000 + +async def fetch_token() -> str: + async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client: + response = await client.get( + TOKEN_URL, + params={"room": ROOM, "identity": IDENTITY, "agent_name": "my-agent"}, + ) + response.raise_for_status() + + payload: dict[str, Any] = response.json() + token = payload.get("token") + if not isinstance(token, str) or not token: + raise ValueError(f"token response missing token field: {payload}") + + print(f"[token] room={payload.get('room')} identity={payload.get('identity')}") + print(f"[token] jwt_prefix={token[:16]}... len={len(token)}") + print(f"[token] jwt_prefix={token}") + return token + +class ESP32LiveKitBridge: + def __init__(self): + self.room = rtc.Room() + # 创建一个音频源,用于将 ESP32 的声音推送到 LiveKit + # 注意:采样率需与 ESP32 发送的一致,通常是 16000 或 24000 + self.mic_source = AudioSource(sample_rate=SAMPLE_RATE, num_channels=1) + self.esp_ws = None # 保存 WebSocket 连接 + self.audio_queue = queue.Queue() + self.wav_writer_thread: Optional[threading.Thread] = None + self.stop_event = threading.Event() + + def _wav_writer_loop(self): + import wave + print("启动音频保存线程...") + try: + with wave.open("bridge_debug.wav", "wb") as wav_file: + wav_file.setnchannels(1) + wav_file.setsampwidth(2) # 16-bit + wav_file.setframerate(SAMPLE_RATE) + while not self.stop_event.is_set() or not self.audio_queue.empty(): + try: + # 使用 timeout 避免永久阻塞,以便检查 stop_event + pcm_bytes = self.audio_queue.get(timeout=0.5) + wav_file.writeframes(pcm_bytes) + except queue.Empty: + continue + except Exception as e: + print(f"音频保存线程错误: {e}") + finally: + print("音频保存线程退出") + + async def start(self): + @self.room.on("connection_state_changed") + def on_connection_state_changed(state: int) -> None: + print(f"[livekit] state={rtc.ConnectionState.Name(state)}") + + # 1. 获取 Token 并连接 LiveKit + print(f"[config] livekit_ws_url={LIVEKIT_WS_URL}") + print(f"[config] token_url={TOKEN_URL}") + print(f"[config] room={ROOM} identity={IDENTITY}") + token = await fetch_token() + + await asyncio.wait_for( + self.room.connect( + LIVEKIT_WS_URL, + token, + options=rtc.RoomOptions(connect_timeout=CONNECT_TIMEOUT_SECONDS), + ), + timeout=CONNECT_TIMEOUT_SECONDS + 2.0, + ) + print(f"已连接到 LiveKit 房间: {self.room.name}") + print(f"[livekit] local_identity={self.room.local_participant.identity}") + print(f"[livekit] local_sid={self.room.local_participant.sid}") + + # 2. 发布麦克风轨道 (ESP32 -> LiveKit) + track = rtc.LocalAudioTrack.create_audio_track("esp32-mic", self.mic_source) + options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE) + await self.room.local_participant.publish_track(track, options) + + # 3. 监听房间内的音频 (LiveKit -> ESP32) + # 当房间里有其他人(比如 AI Agent)说话时,触发此回调 + @self.room.on("track_subscribed") + def on_track_subscribed(track, publication, participant): + if track.kind == rtc.TrackKind.KIND_AUDIO: + print(f"收到音频流: {participant.identity}") + # 启动一个任务来接收音频流并转发给 ESP32,明确指定采样率为 16000Hz 以进行自动重采样 + asyncio.create_task(self.forward_audio_to_esp32(rtc.AudioStream(track, sample_rate=SAMPLE_RATE, num_channels=1))) + + self.agent_ready = asyncio.Event() + + @self.room.on("participant_connected") + def on_participant_connected(p): + print(f"👤 participant joined: {p.identity}") + if "agent" in p.identity: + self.agent_ready.set() + + print("等待 agent 加入...") + try: + await asyncio.wait_for(self.agent_ready.wait(), timeout=10) + print("✅ agent 已加入") + except asyncio.TimeoutError: + print("⚠️ agent 未加入(后续可能收不到音频)") + + async def close(self): + """优雅关闭所有连接和资源""" + self.stop_event.set() + if self.room: + await self.room.disconnect() + + async def forward_audio_to_esp32(self, audio_stream): + """从 LiveKit 接收音频,通过 WebSocket 发回给 ESP32""" + import opuslib + import json + # 创建下行 Opus 编码器 + encoder = opuslib.Encoder(SAMPLE_RATE, 1, 'voip') + + # 1. 告知 ESP32 开始说话,切换 UI 到“说话中”并准备解码 + if self.esp_ws: + await self.esp_ws.send(json.dumps({"type": "tts", "state": "start"})) + + try: + async for event in audio_stream: + if self.esp_ws: + try: + # AudioStream 迭代产生的是 AudioFrameEvent,需要从中提取 frame + frame = event.frame + # 将 PCM 编码为 Opus 才能发给 ESP32 + pcm_data = frame.data.tobytes() + + # 使用当前帧的实际采样数进行编码 + opus_packet = encoder.encode(pcm_data, frame.samples_per_channel) + await self.esp_ws.send(opus_packet) + except Exception as e: + print(f"发送回 ESP32 失败: {e}") + finally: + # 2. 音频流结束,告知 ESP32 停止说话,切换回聆听或闲置状态 + if self.esp_ws: + await self.esp_ws.send(json.dumps({"type": "tts", "state": "stop"})) + + async def handle_websocket(self, websocket): + """处理来自 ESP32 的 WebSocket 连接""" + self.esp_ws = websocket + print("ESP32 已连接") + opus_decoder = None + try: + # 发送 hello 告诉 ESP32 握手成功 + hello_msg = { + "type": "hello", + "transport": "websocket", + "audio_params": { + "format": "opus", # 明确要求 ESP32 发送 Opus + "sample_rate": SAMPLE_RATE, + "channels": 1, + "frame_duration": 60 + } + } + import json + await websocket.send(json.dumps(hello_msg)) + + async for message in websocket: + # 接收 ESP32 的数据 -> 推送到 LiveKit + if isinstance(message, bytes): + # 判断如果消息长度极其短并且不是合理的音频流,可能是ping包等 + if len(message) < 4: + print(f"收到过短的字节消息 ({len(message)} bytes),跳过") + continue + + # ESP32 默认使用 websocket_protocol version=1 (见 websocket_protocol.cc) + # 这个版本下,没有 4 字节的 header,接收到的就是原生的 Opus 数据帧。 + # 直接丢给 opuslib 解码即可。 + audio_data = message + print(f"收到音频包长度: {len(message)}") + if audio_data: + try: + # Create Opus decoder if not exists + if opus_decoder is None: + import opuslib + print(f"初始化 Opus 解码器: {SAMPLE_RATE}Hz, mono") + opus_decoder = opuslib.Decoder(SAMPLE_RATE, 1) + + # 启动音频保存线程 + self.stop_event.clear() + thread = threading.Thread(target=self._wav_writer_loop, daemon=True) + self.wav_writer_thread = thread + thread.start() + + # Decode Opus packet. + # Frame size for 60ms is SAMPLE_RATE * 0.06 + frame_size = int(SAMPLE_RATE * 0.06) + pcm_bytes = opus_decoder.decode(audio_data, frame_size) + + # 将音频数据放入队列由后台线程保存 + self.audio_queue.put(pcm_bytes) + + num_samples = len(pcm_bytes) // 2 + if num_samples > 0: + frame = AudioFrame.create(sample_rate=SAMPLE_RATE, num_channels=1, samples_per_channel=num_samples) + # Use memoryview to safely copy bytes into the frame data + memoryview(frame.data).cast('B')[:] = pcm_bytes + + # 将 capture_frame 放入当前事件循环的任务中 + await self.mic_source.capture_frame(frame) + except Exception as e: + print(f"Opus audio decode error ({len(message)} bytes): {e}") + elif isinstance(message, str): + import json + try: + data = json.loads(message) + print(f"收到 ESP32 JSON 消息: {data}") + except json.JSONDecodeError: + print(f"收到未知的字符消息: {message}") + except ConnectionClosedError as e: + print(f"ESP32 异常断开: {e}") + except Exception as e: + print(f"WebSocket 其他错误: {e}") + finally: + print("ESP32 断开连接") + self.esp_ws = None + if hasattr(self, "wav_writer_thread") and self.wav_writer_thread: + self.stop_event.set() + # 我们不一定需要 join,因为是 daemon=True + # 但这里设置 stop_event 会让线程在完成队列后退出 + + +async def main(): + bridge = ESP32LiveKitBridge() + try: + await bridge.start() + + # 启动 WebSocket 服务器 + async with websockets.serve(bridge.handle_websocket, "0.0.0.0", WS_PORT): + print(f"WebSocket 服务器运行在端口 {WS_PORT},等待 ESP32 连接...") + await asyncio.Future() # 保持运行 + finally: + await bridge.close() + +if __name__ == "__main__": + try: + asyncio.run(main()) + except Exception as exc: + print(f"[error] {exc}", file=sys.stderr) + sys.exit(1) \ No newline at end of file diff --git a/main/bridge_server_bak.py b/main/bridge_server_bak.py new file mode 100644 index 0000000..51e2a35 --- /dev/null +++ b/main/bridge_server_bak.py @@ -0,0 +1,276 @@ + +import asyncio +import websockets +import os +import sys +import httpx +import json +import time +import queue +import threading +from typing import Any, Optional +from livekit import rtc +from livekit.rtc import AudioSource, AudioFrame +from websockets.exceptions import ConnectionClosedError +import http.server +import multipart +from urllib.parse import parse_qs + +# 配置信息 +# TOKEN_URL = "http://10.6.80.130:8000/v1/token" +# LIVEKIT_WS_URL = "ws://10.6.80.130:8000/" +# ROOM = "vera-room" +# IDENTITY = "vera-1" +# TOKEN_URL = "https://omnichat.bwgdi.com/v1/token" +TOKEN_URL = "http://10.6.80.130:8000/getToken" +LIVEKIT_WS_URL = "wss://test-b2zm4kva.livekit.cloud" +# LIVEKIT_WS_URL = "wss://rtc.bwgdi.com/" +ROOM = "test-livekit-room2" +IDENTITY = "uv-livekit-hardcoded" +import uuid +# IDENTITY = f"uv-{uuid.uuid4().hex[:6]}" +CONNECT_TIMEOUT_SECONDS = 10.0 +WS_PORT = 8080 + +SAMPLE_RATE = 16000 + +async def fetch_token() -> str: + async with httpx.AsyncClient(timeout=15.0, follow_redirects=True) as client: + response = await client.get( + TOKEN_URL, + params={"room": ROOM, "identity": IDENTITY, "agent_name": "my-agent"}, + ) + response.raise_for_status() + + payload: dict[str, Any] = response.json() + token = payload.get("token") + if not isinstance(token, str) or not token: + raise ValueError(f"token response missing token field: {payload}") + + print(f"[token] room={payload.get('room')} identity={payload.get('identity')}") + print(f"[token] jwt_prefix={token[:16]}... len={len(token)}") + print(f"[token] jwt_prefix={token}") + return token + +class ESP32LiveKitBridge: + def __init__(self): + self.room = rtc.Room() + # 创建一个音频源,用于将 ESP32 的声音推送到 LiveKit + # 注意:采样率需与 ESP32 发送的一致,通常是 16000 或 24000 + self.mic_source = AudioSource(sample_rate=SAMPLE_RATE, num_channels=1) + self.esp_ws = None # 保存 WebSocket 连接 + self.audio_queue = queue.Queue() + self.wav_writer_thread: Optional[threading.Thread] = None + self.stop_event = threading.Event() + + def _wav_writer_loop(self): + import wave + print("启动音频保存线程...") + try: + with wave.open("bridge_debug.wav", "wb") as wav_file: + wav_file.setnchannels(1) + wav_file.setsampwidth(2) # 16-bit + wav_file.setframerate(SAMPLE_RATE) + while not self.stop_event.is_set() or not self.audio_queue.empty(): + try: + # 使用 timeout 避免永久阻塞,以便检查 stop_event + pcm_bytes = self.audio_queue.get(timeout=0.5) + wav_file.writeframes(pcm_bytes) + except queue.Empty: + continue + except Exception as e: + print(f"音频保存线程错误: {e}") + finally: + print("音频保存线程退出") + + async def start(self): + @self.room.on("connection_state_changed") + def on_connection_state_changed(state: int) -> None: + print(f"[livekit] state={rtc.ConnectionState.Name(state)}") + + # 1. 获取 Token 并连接 LiveKit + print(f"[config] livekit_ws_url={LIVEKIT_WS_URL}") + print(f"[config] token_url={TOKEN_URL}") + print(f"[config] room={ROOM} identity={IDENTITY}") + token = await fetch_token() + + await asyncio.wait_for( + self.room.connect( + LIVEKIT_WS_URL, + token, + options=rtc.RoomOptions(connect_timeout=CONNECT_TIMEOUT_SECONDS), + ), + timeout=CONNECT_TIMEOUT_SECONDS + 2.0, + ) + print(f"已连接到 LiveKit 房间: {self.room.name}") + print(f"[livekit] local_identity={self.room.local_participant.identity}") + print(f"[livekit] local_sid={self.room.local_participant.sid}") + + # 2. 发布麦克风轨道 (ESP32 -> LiveKit) + track = rtc.LocalAudioTrack.create_audio_track("esp32-mic", self.mic_source) + options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE) + await self.room.local_participant.publish_track(track, options) + + # 3. 监听房间内的音频 (LiveKit -> ESP32) + # 当房间里有其他人(比如 AI Agent)说话时,触发此回调 + @self.room.on("track_subscribed") + def on_track_subscribed(track, publication, participant): + if track.kind == rtc.TrackKind.KIND_AUDIO: + print(f"收到音频流: {participant.identity}") + # 启动一个任务来接收音频流并转发给 ESP32,明确指定采样率为 16000Hz 以进行自动重采样 + asyncio.create_task(self.forward_audio_to_esp32(rtc.AudioStream(track, sample_rate=SAMPLE_RATE, num_channels=1))) + + self.agent_ready = asyncio.Event() + + @self.room.on("participant_connected") + def on_participant_connected(p): + print(f"👤 participant joined: {p.identity}") + if "agent" in p.identity: + self.agent_ready.set() + + print("等待 agent 加入...") + try: + await asyncio.wait_for(self.agent_ready.wait(), timeout=10) + print("✅ agent 已加入") + except asyncio.TimeoutError: + print("⚠️ agent 未加入(后续可能收不到音频)") + + async def close(self): + """优雅关闭所有连接和资源""" + self.stop_event.set() + if self.room: + await self.room.disconnect() + + async def forward_audio_to_esp32(self, audio_stream): + """从 LiveKit 接收音频,通过 WebSocket 发回给 ESP32""" + import opuslib + import json + # 创建下行 Opus 编码器 + encoder = opuslib.Encoder(SAMPLE_RATE, 1, 'voip') + + # 1. 告知 ESP32 开始说话,切换 UI 到“说话中”并准备解码 + if self.esp_ws: + await self.esp_ws.send(json.dumps({"type": "tts", "state": "start"})) + + try: + async for event in audio_stream: + if self.esp_ws: + try: + # AudioStream 迭代产生的是 AudioFrameEvent,需要从中提取 frame + frame = event.frame + # 将 PCM 编码为 Opus 才能发给 ESP32 + pcm_data = frame.data.tobytes() + + # 使用当前帧的实际采样数进行编码 + opus_packet = encoder.encode(pcm_data, frame.samples_per_channel) + await self.esp_ws.send(opus_packet) + except Exception as e: + print(f"发送回 ESP32 失败: {e}") + finally: + # 2. 音频流结束,告知 ESP32 停止说话,切换回聆听或闲置状态 + if self.esp_ws: + await self.esp_ws.send(json.dumps({"type": "tts", "state": "stop"})) + + async def handle_websocket(self, websocket): + """处理来自 ESP32 的 WebSocket 连接""" + self.esp_ws = websocket + print("ESP32 已连接") + opus_decoder = None + try: + # 发送 hello 告诉 ESP32 握手成功 + hello_msg = { + "type": "hello", + "transport": "websocket", + "audio_params": { + "format": "opus", # 明确要求 ESP32 发送 Opus + "sample_rate": SAMPLE_RATE, + "channels": 1, + "frame_duration": 60 + } + } + import json + await websocket.send(json.dumps(hello_msg)) + + async for message in websocket: + # 接收 ESP32 的数据 -> 推送到 LiveKit + if isinstance(message, bytes): + # 判断如果消息长度极其短并且不是合理的音频流,可能是ping包等 + if len(message) < 4: + print(f"收到过短的字节消息 ({len(message)} bytes),跳过") + continue + + # ESP32 默认使用 websocket_protocol version=1 (见 websocket_protocol.cc) + # 这个版本下,没有 4 字节的 header,接收到的就是原生的 Opus 数据帧。 + # 直接丢给 opuslib 解码即可。 + audio_data = message + print(f"收到音频包长度: {len(message)}") + if audio_data: + try: + # Create Opus decoder if not exists + if opus_decoder is None: + import opuslib + print(f"初始化 Opus 解码器: {SAMPLE_RATE}Hz, mono") + opus_decoder = opuslib.Decoder(SAMPLE_RATE, 1) + + # 启动音频保存线程 + self.stop_event.clear() + thread = threading.Thread(target=self._wav_writer_loop, daemon=True) + self.wav_writer_thread = thread + thread.start() + + # Decode Opus packet. + # Frame size for 60ms is SAMPLE_RATE * 0.06 + frame_size = int(SAMPLE_RATE * 0.06) + pcm_bytes = opus_decoder.decode(audio_data, frame_size) + + # 将音频数据放入队列由后台线程保存 + self.audio_queue.put(pcm_bytes) + + num_samples = len(pcm_bytes) // 2 + if num_samples > 0: + frame = AudioFrame.create(sample_rate=SAMPLE_RATE, num_channels=1, samples_per_channel=num_samples) + # Use memoryview to safely copy bytes into the frame data + memoryview(frame.data).cast('B')[:] = pcm_bytes + + # 将 capture_frame 放入当前事件循环的任务中 + await self.mic_source.capture_frame(frame) + except Exception as e: + print(f"Opus audio decode error ({len(message)} bytes): {e}") + elif isinstance(message, str): + import json + try: + data = json.loads(message) + print(f"收到 ESP32 JSON 消息: {data}") + except json.JSONDecodeError: + print(f"收到未知的字符消息: {message}") + except ConnectionClosedError as e: + print(f"ESP32 异常断开: {e}") + except Exception as e: + print(f"WebSocket 其他错误: {e}") + finally: + print("ESP32 断开连接") + self.esp_ws = None + if hasattr(self, "wav_writer_thread") and self.wav_writer_thread: + self.stop_event.set() + # 我们不一定需要 join,因为是 daemon=True + # 但这里设置 stop_event 会让线程在完成队列后退出 + + +async def main(): + bridge = ESP32LiveKitBridge() + try: + await bridge.start() + + # 启动 WebSocket 服务器 + async with websockets.serve(bridge.handle_websocket, "0.0.0.0", WS_PORT): + print(f"WebSocket 服务器运行在端口 {WS_PORT},等待 ESP32 连接...") + await asyncio.Future() # 保持运行 + finally: + await bridge.close() + +if __name__ == "__main__": + try: + asyncio.run(main()) + except Exception as exc: + print(f"[error] {exc}", file=sys.stderr) + sys.exit(1) \ No newline at end of file diff --git a/main/debug_connection.py b/main/debug_connection.py new file mode 100644 index 0000000..13fe608 --- /dev/null +++ b/main/debug_connection.py @@ -0,0 +1,38 @@ +import asyncio +import websockets +import ssl +import sys + +async def test_connect(url): + print(f"Testing connection to: {url}") + try: + # Create a custom SSL context that ignores certificate validation errors + ssl_context = ssl.create_default_context() + ssl_context.check_hostname = False + ssl_context.verify_mode = ssl.CERT_NONE + + async with websockets.connect(url, ssl=ssl_context) as websocket: + print(f"SUCCESS: Connected to {url}") + await websocket.close() + return True + except Exception as e: + print(f"FAILED: Could not connect to {url}. Error: {e}") + return False + +async def main(): + urls_to_test = [ + "wss://10.6.80.12:31581", + "ws://10.6.80.12:31581", + "wss://10.6.80.12:31581", + "ws://10.6.80.12:31581", + ] + + print("--- Starting LiveKit Connection Probe ---") + for url in urls_to_test: + if await test_connect(url): + print(f"\nFOUND WORKING URL: {url}") + break + print("--- Probe Finished ---") + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/main/ota.cc b/main/ota.cc index bcc0dc5..9341127 100644 --- a/main/ota.cc +++ b/main/ota.cc @@ -1,30 +1,29 @@ #include "ota.h" -#include "system_info.h" -#include "settings.h" #include "assets/lang_config.h" +#include "settings.h" +#include "system_info.h" -#include -#include -#include -#include -#include -#include #include #include #include #include +#include +#include +#include +#include +#include +#include #ifdef SOC_HMAC_SUPPORTED #include #endif -#include -#include -#include #include +#include +#include +#include #define TAG "Ota" - Ota::Ota() { #ifdef ESP_EFUSE_BLOCK_USR_DATA // Read Serial Number from efuse user_data @@ -40,8 +39,7 @@ Ota::Ota() { #endif } -Ota::~Ota() { -} +Ota::~Ota() {} std::string Ota::GetCheckVersionUrl() { Settings settings("wifi", false); @@ -62,7 +60,8 @@ std::unique_ptr Ota::SetupHttp() { http->SetHeader("Client-Id", board.GetUuid()); if (has_serial_number_) { http->SetHeader("Serial-Number", serial_number_.c_str()); - ESP_LOGI(TAG, "Setup HTTP, User-Agent: %s, Serial-Number: %s", user_agent.c_str(), serial_number_.c_str()); + ESP_LOGI(TAG, "Setup HTTP, User-Agent: %s, Serial-Number: %s", user_agent.c_str(), + serial_number_.c_str()); } http->SetHeader("User-Agent", user_agent); http->SetHeader("Accept-Language", Lang::CODE); @@ -71,7 +70,7 @@ std::unique_ptr Ota::SetupHttp() { return http; } -/* +/* * Specification: https://ccnphfhqs21z.feishu.cn/wiki/FjW6wZmisimNBBkov6OcmfvknVd */ esp_err_t Ota::CheckVersion() { @@ -112,8 +111,8 @@ esp_err_t Ota::CheckVersion() { // Response: { "firmware": { "version": "1.0.0", "url": "http://" } } // Parse the JSON response and check if the version is newer // If it is, set has_new_version_ to true and store the new version and URL - - cJSON *root = cJSON_Parse(data.c_str()); + + cJSON* root = cJSON_Parse(data.c_str()); if (root == NULL) { ESP_LOGE(TAG, "Failed to parse JSON response"); return ESP_ERR_INVALID_RESPONSE; @@ -121,7 +120,7 @@ esp_err_t Ota::CheckVersion() { has_activation_code_ = false; has_activation_challenge_ = false; - cJSON *activation = cJSON_GetObjectItem(root, "activation"); + cJSON* activation = cJSON_GetObjectItem(root, "activation"); if (cJSON_IsObject(activation)) { cJSON* message = cJSON_GetObjectItem(activation, "message"); if (cJSON_IsString(message)) { @@ -144,11 +143,11 @@ esp_err_t Ota::CheckVersion() { } has_mqtt_config_ = false; - cJSON *mqtt = cJSON_GetObjectItem(root, "mqtt"); + cJSON* mqtt = cJSON_GetObjectItem(root, "mqtt"); if (cJSON_IsObject(mqtt)) { Settings settings("mqtt", true); - cJSON *item = NULL; - cJSON_ArrayForEach(item, mqtt) { + cJSON* item = NULL; + cJSON_ArrayForEach (item, mqtt) { if (cJSON_IsString(item)) { if (settings.GetString(item->string) != item->valuestring) { settings.SetString(item->string, item->valuestring); @@ -159,17 +158,17 @@ esp_err_t Ota::CheckVersion() { } } } - has_mqtt_config_ = true; + has_mqtt_config_ = false; } else { ESP_LOGI(TAG, "No mqtt section found !"); } has_websocket_config_ = false; - cJSON *websocket = cJSON_GetObjectItem(root, "websocket"); + cJSON* websocket = cJSON_GetObjectItem(root, "websocket"); if (cJSON_IsObject(websocket)) { Settings settings("websocket", true); - cJSON *item = NULL; - cJSON_ArrayForEach(item, websocket) { + cJSON* item = NULL; + cJSON_ArrayForEach (item, websocket) { if (cJSON_IsString(item)) { if (settings.GetString(item->string) != item->valuestring) { settings.SetString(item->string, item->valuestring); @@ -185,23 +184,28 @@ esp_err_t Ota::CheckVersion() { ESP_LOGI(TAG, "No websocket section found!"); } - has_server_time_ = false; - cJSON *server_time = cJSON_GetObjectItem(root, "server_time"); + // [开发调试] 强制修改 WebSocket URL 指向本地 Bridge 服务 + // 请将下面的 IP 地址修改为你电脑的局域网 IP (例如 192.168.1.5) + Settings settings("websocket", true); + settings.SetString("url", "ws://10.6.80.130:8080"); + has_websocket_config_ = true; + + cJSON* server_time = cJSON_GetObjectItem(root, "server_time"); if (cJSON_IsObject(server_time)) { - cJSON *timestamp = cJSON_GetObjectItem(server_time, "timestamp"); - cJSON *timezone_offset = cJSON_GetObjectItem(server_time, "timezone_offset"); - + cJSON* timestamp = cJSON_GetObjectItem(server_time, "timestamp"); + cJSON* timezone_offset = cJSON_GetObjectItem(server_time, "timezone_offset"); + if (cJSON_IsNumber(timestamp)) { // 设置系统时间 struct timeval tv; double ts = timestamp->valuedouble; - + // 如果有时区偏移,计算本地时间 if (cJSON_IsNumber(timezone_offset)) { - ts += (timezone_offset->valueint * 60 * 1000); // 转换分钟为毫秒 + ts += (timezone_offset->valueint * 60 * 1000); // 转换分钟为毫秒 } - - tv.tv_sec = (time_t)(ts / 1000); // 转换毫秒为秒 + + tv.tv_sec = (time_t)(ts / 1000); // 转换毫秒为秒 tv.tv_usec = (suseconds_t)((long long)ts % 1000) * 1000; // 剩余的毫秒转换为微秒 settimeofday(&tv, NULL); has_server_time_ = true; @@ -211,13 +215,13 @@ esp_err_t Ota::CheckVersion() { } has_new_version_ = false; - cJSON *firmware = cJSON_GetObjectItem(root, "firmware"); + cJSON* firmware = cJSON_GetObjectItem(root, "firmware"); if (cJSON_IsObject(firmware)) { - cJSON *version = cJSON_GetObjectItem(firmware, "version"); + cJSON* version = cJSON_GetObjectItem(firmware, "version"); if (cJSON_IsString(version)) { firmware_version_ = version->valuestring; } - cJSON *url = cJSON_GetObjectItem(firmware, "url"); + cJSON* url = cJSON_GetObjectItem(firmware, "url"); if (cJSON_IsString(url)) { firmware_url_ = url->valuestring; } @@ -231,7 +235,7 @@ esp_err_t Ota::CheckVersion() { ESP_LOGI(TAG, "Current is the latest version"); } // If the force flag is set to 1, the given version is forced to be installed - cJSON *force = cJSON_GetObjectItem(firmware, "force"); + cJSON* force = cJSON_GetObjectItem(firmware, "force"); if (cJSON_IsNumber(force) && force->valueint == 1) { has_new_version_ = true; } @@ -264,7 +268,8 @@ void Ota::MarkCurrentVersionValid() { } } -bool Ota::Upgrade(const std::string& firmware_url, std::function callback) { +bool Ota::Upgrade(const std::string& firmware_url, + std::function callback) { ESP_LOGI(TAG, "Upgrading firmware from %s", firmware_url.c_str()); esp_ota_handle_t update_handle = 0; auto update_partition = esp_ota_get_next_update_partition(NULL); @@ -273,7 +278,8 @@ bool Ota::Upgrade(const std::string& firmware_url, std::functionlabel, update_partition->address); + ESP_LOGI(TAG, "Writing to partition %s at offset 0x%lx", update_partition->label, + update_partition->address); bool image_header_checked = false; std::string image_header; @@ -319,7 +325,8 @@ bool Ota::Upgrade(const std::string& firmware_url, std::function= 1000000 || ret == 0) { size_t progress = total_read * 100 / content_length; - ESP_LOGI(TAG, "Progress: %u%% (%u/%u), Speed: %uB/s", progress, total_read, content_length, recent_read); + ESP_LOGI(TAG, "Progress: %u%% (%u/%u), Speed: %uB/s", progress, total_read, + content_length, recent_read); if (callback) { callback(progress, recent_read); } @@ -329,9 +336,14 @@ bool Ota::Upgrade(const std::string& firmware_url, std::function= sizeof(esp_image_header_t) + sizeof(esp_image_segment_header_t) + sizeof(esp_app_desc_t)) { + if (image_header.size() >= sizeof(esp_image_header_t) + + sizeof(esp_image_segment_header_t) + + sizeof(esp_app_desc_t)) { esp_app_desc_t new_app_info; - memcpy(&new_app_info, image_header.data() + sizeof(esp_image_header_t) + sizeof(esp_image_segment_header_t), sizeof(esp_app_desc_t)); + memcpy(&new_app_info, + image_header.data() + sizeof(esp_image_header_t) + + sizeof(esp_image_segment_header_t), + sizeof(esp_app_desc_t)); if (esp_ota_begin(update_partition, OTA_WITH_SEQUENTIAL_WRITES, &update_handle)) { esp_ota_abort(update_handle); @@ -390,23 +402,22 @@ bool Ota::StartUpgrade(std::function callback) return Upgrade(firmware_url_, callback); } - std::vector Ota::ParseVersion(const std::string& version) { std::vector versionNumbers; std::stringstream ss(version); std::string segment; - + while (std::getline(ss, segment, '.')) { versionNumbers.push_back(std::stoi(segment)); } - + return versionNumbers; } bool Ota::IsNewVersionAvailable(const std::string& currentVersion, const std::string& newVersion) { std::vector current = ParseVersion(currentVersion); std::vector newer = ParseVersion(newVersion); - + for (size_t i = 0; i < std::min(current.size(), newer.size()); ++i) { if (newer[i] > current[i]) { return true; @@ -414,7 +425,7 @@ bool Ota::IsNewVersionAvailable(const std::string& currentVersion, const std::st return false; } } - + return newer.size() > current.size(); } @@ -425,10 +436,11 @@ std::string Ota::GetActivationPayload() { std::string hmac_hex; #ifdef SOC_HMAC_SUPPORTED - uint8_t hmac_result[32]; // SHA-256 输出为32字节 - + uint8_t hmac_result[32]; // SHA-256 输出为32字节 + // 使用Key0计算HMAC - esp_err_t ret = esp_hmac_calculate(HMAC_KEY0, (uint8_t*)activation_challenge_.data(), activation_challenge_.size(), hmac_result); + esp_err_t ret = esp_hmac_calculate(HMAC_KEY0, (uint8_t*)activation_challenge_.data(), + activation_challenge_.size(), hmac_result); if (ret != ESP_OK) { ESP_LOGE(TAG, "HMAC calculation failed: %s", esp_err_to_name(ret)); return "{}"; @@ -441,7 +453,7 @@ std::string Ota::GetActivationPayload() { } #endif - cJSON *payload = cJSON_CreateObject(); + cJSON* payload = cJSON_CreateObject(); cJSON_AddStringToObject(payload, "algorithm", "hmac-sha256"); cJSON_AddStringToObject(payload, "serial_number", serial_number_.c_str()); cJSON_AddStringToObject(payload, "challenge", activation_challenge_.c_str()); @@ -477,13 +489,14 @@ esp_err_t Ota::Activate() { ESP_LOGE(TAG, "Failed to open HTTP connection"); return ESP_FAIL; } - + auto status_code = http->GetStatusCode(); if (status_code == 202) { return ESP_ERR_TIMEOUT; } if (status_code != 200) { - ESP_LOGE(TAG, "Failed to activate, code: %d, body: %s", status_code, http->ReadAll().c_str()); + ESP_LOGE(TAG, "Failed to activate, code: %d, body: %s", status_code, + http->ReadAll().c_str()); return ESP_FAIL; } diff --git a/main/test_client_wav.py b/main/test_client_wav.py new file mode 100644 index 0000000..4daaa9e --- /dev/null +++ b/main/test_client_wav.py @@ -0,0 +1,194 @@ +import asyncio +import os +import sys +import wave +import time +from dotenv import load_dotenv + +# Try to load credentials from .env.local +load_dotenv(".env.local") + +from livekit import rtc + +async def publish_audio_from_wav(room: rtc.Room, wav_path: str): + print(f"🎵 准备加载音频文件: {wav_path}") + if not os.path.exists(wav_path): + print(f"❌ 找不到文件 {wav_path}") + return + + with wave.open(wav_path, "rb") as wf: + sample_rate = wf.getframerate() + num_channels = wf.getnchannels() + sampwidth = wf.getsampwidth() + + if sampwidth != 2: + print("❌ 错误:只支持 16-bit PCM (S16LE) 编码的 WAV 文件。") + return + + print(f"📊 音频信息: 采样率 {sample_rate}Hz, 通道数 {num_channels}") + + source = rtc.AudioSource(sample_rate, num_channels) + track = rtc.LocalAudioTrack.create_audio_track("agent_input_audio", source) + options = rtc.TrackPublishOptions() + options.source = rtc.TrackSource.SOURCE_MICROPHONE + + await room.local_participant.publish_track(track, options) + print("✅ 成功发布麦克风音频轨道,开始推流...") + + # 每次读取并推送 20ms 的数据 + chunk_duration_ms = 20 + samples_per_chunk = int(sample_rate * (chunk_duration_ms / 1000.0)) + bytes_per_chunk = samples_per_chunk * num_channels * sampwidth + + while True: + data = wf.readframes(samples_per_chunk) + if not data: + break + + # 根据已读取的数据计算出完整的采样数(最后一帧可能不足 20ms) + frame_samples = len(data) // (num_channels * sampwidth) + + # 使用 LiveKit SDK 封装 AudioFrame + try: + # LiveKit Python SDK version >= 0.15 + audio_frame = rtc.AudioFrame(data, sample_rate, num_channels, frame_samples) + await source.capture_frame(audio_frame) + except TypeError: + # 若抛出 TypeError,可能 SDK 版本有差异,尝试旧版本 API 或者直接 copy + frame = rtc.AudioFrame.create(sample_rate, num_channels, frame_samples) + frame.data[:] = data + await source.capture_frame(frame) + + # 严格控制发送速率,避免瞬时把整个音频发过去而导致对面不识别(模拟真实发音) + await asyncio.sleep(chunk_duration_ms / 1000.0) + + print("🎉 录音流推送完毕!等待 Agent 回复中...") + +async def save_audio_stream(track: rtc.RemoteAudioTrack): + # 为避免旧文件冲突,加个时间戳 + filename = f"agent_response_{int(time.time())}.wav" + print(f"🎙️ 正在将 Agent 的声音写入文件: {filename}") + stream = rtc.AudioStream(track) + + wf = None + try: + async for event in stream: + # 接收到第一帧时初始化 WAV 格式 + if wf is None: + wf = wave.open(filename, "wb") + wf.setnchannels(event.frame.num_channels) + wf.setsampwidth(2) # 16-bit + wf.setframerate(event.frame.sample_rate) + + # 写入当前帧音频数据 (16-bit PCM) + wf.writeframes(bytes(event.frame.data)) + except Exception as e: + print(f"音频流断开或出错: {e}") + finally: + if wf is not None: + wf.close() + print(f"💾 Agent 语音结果已成功保存到: {filename}") + + +async def main(room_url: str, token: str, wav_path: str): + room = rtc.Room() + agent_ready = asyncio.Event() + + @room.on("connected") + def on_connected(): + print("✅ 成功连接到 LiveKit 房间") + # 如果连接时房间里已经有 Agent(远程参与者),直接准备触发 + if room.remote_participants: + agent_ready.set() + + @room.on("participant_connected") + def on_participant_connected(participant: rtc.RemoteParticipant): + print(f"👋 Agent ({participant.identity}) 已加入房间") + agent_ready.set() + + @room.on("data_received") + def on_data_received(data_packet: rtc.DataPacket): + identity = data_packet.participant.identity if data_packet.participant else "未知" + try: + print(f"📩 [数据接收 | {identity}]: {data_packet.data.decode('utf-8')}") + except Exception: + pass + + @room.on("transcription_received") + def on_transcription_received( + segments: list[rtc.TranscriptionSegment], + participant: rtc.Participant, + track_pub: rtc.TrackPublication + ): + identity = participant.identity if participant else "未知" + for segment in segments: + status = "✅ 最终结果" if segment.final else "⏳ 正在思考/中间结果" + print(f"🗣️ [{status} | {identity}]: {segment.text}") + + @room.on("track_subscribed") + def on_track_subscribed( + track: rtc.Track, + publication: rtc.RemoteTrackPublication, + participant: rtc.RemoteParticipant + ): + # 当 Agent 发出新的声音(音频轨道)时,我们订阅并保存 + if track.kind == rtc.TrackKind.KIND_AUDIO: + asyncio.create_task(save_audio_stream(track)) + + print("⏳ 正在建立连接...") + await room.connect(room_url, token) + + print("⏳ 等待 Agent 初始化并加入房间...") + await agent_ready.wait() + # 稍微延迟半秒钟,确保 Agent 侧面的准备(如模型加载等)一切就绪 + await asyncio.sleep(0.5) + + # 开始推送本地 wav 音频 + asyncio.create_task(publish_audio_from_wav(room, wav_path)) + + try: + await asyncio.Event().wait() + except KeyboardInterrupt: + print("\n断开连接中...") + finally: + await room.disconnect() + +if __name__ == "__main__": + if len(sys.argv) < 2: + print("❌ 用法: python test_client_wav.py [LIVEKIT_URL] [LIVEKIT_TOKEN/API_KEY]") + print("说明:\n1. 必须提供 WAV 路径。") + print("2. 自动从 .env.local 读取 LIVEKIT_URL。并在没有提供 Token 时自动向 localhost:8000/getToken 请求。") + sys.exit(1) + + wav_file = sys.argv[1] + + url = os.getenv("LIVEKIT_URL") + + token = None + if len(sys.argv) >= 4: + url = sys.argv[2] + token = sys.argv[3] + + if not token: + import urllib.request + import json + import random + # 每次使用随机的测试房间,防止上一次没退出的 agent 堆积在同一个房间里导致多重回复 + unique_room = f"test-room-{random.randint(1000, 9999)}" + print(f"🔄 正在通过本地服务获取 Token,请求加入全新独立房间: {unique_room} ...") + try: + req = urllib.request.urlopen(f"http://localhost:8000/getToken?room={unique_room}&identity=python_tester&agent_name=my-agent") + res_body = req.read().decode('utf-8') + data = json.loads(res_body) + token = data.get("token") + print("✅ 成功获取了包含了 Agent dispatch 的临时 Token!") + except Exception as e: + print(f"❌ 获取 Token 失败,错误信息: {e}") + print("若本地 token 服务未启动,请手动提供有效的测试 token") + sys.exit(1) + + if not url or not token: + print("❌ 缺少 LiveKit URL 或 Token") + sys.exit(1) + + asyncio.run(main(url, token, wav_file))