diff --git a/main/bridge_server.py b/main/bridge_server.py index d189b1d..d198992 100644 --- a/main/bridge_server.py +++ b/main/bridge_server.py @@ -22,8 +22,8 @@ try: except ImportError: livekit_api = None -TOKEN_URL = "http://10.6.80.130:8000/getToken" -LIVEKIT_WS_URL = "wss://test-b2zm4kva.livekit.cloud" +TOKEN_URL = "http://172.19.0.240:8000/getToken" +LIVEKIT_WS_URL = "ws://172.19.0.240:7880" ROOM_PREFIX = "test-livekit" IDENTITY_PREFIX = "uv-livekit" AGENT_NAME = "my-agent" @@ -34,16 +34,21 @@ AGENT_DISPATCH_MODE = os.getenv("AGENT_DISPATCH_MODE", "token").lower() INPUT_SAMPLE_RATE = 16000 OUTPUT_SAMPLE_RATE = 24000 -INPUT_FRAME_DURATION_MS = 60 +INPUT_FRAME_DURATION_MS = 20 INPUT_SAMPLES_PER_OPUS_FRAME = INPUT_SAMPLE_RATE * INPUT_FRAME_DURATION_MS // 1000 +INPUT_MAX_SAMPLES_PER_OPUS_FRAME = INPUT_SAMPLE_RATE * 60 // 1000 OUTPUT_FRAME_DURATION_MS = 20 OUTPUT_SAMPLES_PER_OPUS_FRAME = OUTPUT_SAMPLE_RATE * OUTPUT_FRAME_DURATION_MS // 1000 -TTS_IDLE_TIMEOUT_SECONDS = 0.8 +TTS_IDLE_TIMEOUT_SECONDS = 0.25 TTS_SILENCE_PEAK_THRESHOLD = 96 -TTS_PRE_ROLL_MS = 200 -TTS_START_CONSECUTIVE_AUDIBLE_FRAMES = 3 +TTS_PRE_ROLL_MS = 80 +TTS_START_CONSECUTIVE_AUDIBLE_FRAMES = 1 TTS_INTERRUPT_SILENCE_FRAMES = 3 INTERRUPT_TOPIC = "lk.interrupt" +TTS_DISPLAY_SENTENCE_BREAKS = "。!?!?;;" +TTS_DISPLAY_SCROLL_WIDTH = int(os.getenv("TTS_DISPLAY_SCROLL_WIDTH", "18")) +TTS_DISPLAY_SCROLL_INTERVAL_SECONDS = float(os.getenv("TTS_DISPLAY_SCROLL_INTERVAL_SECONDS", "0.18")) +TTS_DISPLAY_SCROLL_GAP = " " @dataclass @@ -59,8 +64,11 @@ class DeviceSession: forwarding_tracks: dict[str, asyncio.Task[Any]] = field(default_factory=dict) tts_active: bool = False tts_idle_task: Optional[asyncio.Task] = None + tts_display_task: Optional[asyncio.Task] = None tts_stream_id: int = 0 tts_transcript_text: str = "" + tts_display_text: str = "" + tts_display_final: bool = False agent_dispatch_task: Optional[asyncio.Task] = None closed: bool = False captured_frame_count: int = 0 @@ -81,8 +89,8 @@ async def fetch_token(room_name: str, identity: str) -> str: if not isinstance(token, str) or not token: raise ValueError(f"token response missing token field: {payload}") - print(f"[token] room={payload.get('room')} identity={payload.get('identity')}") - print(f"[token] jwt_prefix={token[:16]}... len={len(token)}") + # print(f"[token] room={payload.get('room')} identity={payload.get('identity')}") + # print(f"[token] jwt_prefix={token[:16]}... len={len(token)}") return token @@ -123,10 +131,10 @@ class ESP32LiveKitBridge: def _log_agent_participants(self, session: DeviceSession, source: str) -> None: agent_identities = self._get_agent_identities(session) - print( - f"[agent-check] source={source} room={session.room_name} " - f"agent_count={len(agent_identities)} agents={agent_identities}" - ) + # print( + # f"[agent-check] source={source} room={session.room_name} " + # f"agent_count={len(agent_identities)} agents={agent_identities}" + # ) async def _has_existing_dispatch(self, session: DeviceSession) -> bool: if livekit_api is None: @@ -176,7 +184,7 @@ class ESP32LiveKitBridge: async def ensure_agent_dispatched(self, session: DeviceSession) -> None: if AGENT_DISPATCH_MODE != "bridge": - print(f"跳过 bridge 手动 dispatch: mode={AGENT_DISPATCH_MODE}") + # print(f"跳过 bridge 手动 dispatch: mode={AGENT_DISPATCH_MODE}") return if await self._has_existing_dispatch(session): @@ -184,11 +192,11 @@ class ESP32LiveKitBridge: for participant in session.room.remote_participants.values(): if self._is_agent_participant(participant): - print(f"Agent 已在房间中,跳过 dispatch: {participant.identity}") + # print(f"Agent 已在房间中,跳过 dispatch: {participant.identity}") return if session.agent_dispatch_task is not None and not session.agent_dispatch_task.done(): - print("Agent dispatch 正在进行中,跳过重复请求") + # print("Agent dispatch 正在进行中,跳过重复请求") return session.agent_dispatch_task = asyncio.create_task(self._dispatch_agent(session)) @@ -333,11 +341,82 @@ class ESP32LiveKitBridge: await session.websocket.send(json.dumps({"type": "tts", "state": state})) print(f"已发送 tts {state}: device={session.device_id}") + async def _send_tts_text(self, session: DeviceSession, text: str, final: bool) -> None: + if session.websocket is None: + return + await session.websocket.send( + json.dumps( + { + "type": "tts", + "state": "sentence_start", + "text": text, + "final": final, + } + ) + ) + + def _cancel_tts_display_task(self, session: DeviceSession) -> None: + if session.tts_display_task is not None: + session.tts_display_task.cancel() + session.tts_display_task = None + + def _update_tts_display_text(self, session: DeviceSession, text: str, final: bool) -> None: + session.tts_display_text = text + session.tts_display_final = final + + if len(text) <= TTS_DISPLAY_SCROLL_WIDTH: + self._cancel_tts_display_task(session) + asyncio.create_task(self._send_tts_text(session, text, final)) + return + + if session.tts_display_task is None or session.tts_display_task.done(): + session.tts_display_task = asyncio.create_task( + self._scroll_tts_display_text(session, session.tts_stream_id) + ) + + async def _scroll_tts_display_text(self, session: DeviceSession, stream_id: int) -> None: + offset = 0 + last_sent = "" + try: + while stream_id == session.tts_stream_id and session.websocket is not None: + text = session.tts_display_text + if not text: + return + + if len(text) <= TTS_DISPLAY_SCROLL_WIDTH: + if text != last_sent: + await self._send_tts_text(session, text, session.tts_display_final) + return + + scroll_text = text + TTS_DISPLAY_SCROLL_GAP + if offset >= len(scroll_text): + offset = 0 + + looped_text = scroll_text + scroll_text[:TTS_DISPLAY_SCROLL_WIDTH] + visible_text = looped_text[offset:offset + TTS_DISPLAY_SCROLL_WIDTH].rstrip() + if visible_text and visible_text != last_sent: + await self._send_tts_text( + session, + visible_text, + session.tts_display_final, + ) + last_sent = visible_text + + offset += 1 + await asyncio.sleep(TTS_DISPLAY_SCROLL_INTERVAL_SECONDS) + except asyncio.CancelledError: + pass + except Exception as exc: + print(f"TTS 字幕滚动失败: {exc}") + async def _start_tts(self, session: DeviceSession) -> None: if session.tts_active: print("跳过 tts start,当前已处于激活状态") return - session.tts_transcript_text = "" + if not session.tts_display_text: + session.tts_transcript_text = "" + session.tts_display_final = False + self._cancel_tts_display_task(session) await self._send_tts_state(session, "start") session.tts_active = True @@ -345,9 +424,12 @@ class ESP32LiveKitBridge: if not session.tts_active: print("跳过 tts stop,当前未激活") return + self._cancel_tts_display_task(session) await self._send_tts_state(session, "stop") session.tts_active = False session.tts_transcript_text = "" + session.tts_display_text = "" + session.tts_display_final = False async def _abort_tts(self, session: DeviceSession, reason: str = "client_abort") -> None: print(f"收到打断请求,停止当前 TTS: device={session.device_id} reason={reason}") @@ -355,6 +437,7 @@ class ESP32LiveKitBridge: if session.tts_idle_task is not None: session.tts_idle_task.cancel() session.tts_idle_task = None + self._cancel_tts_display_task(session) await self._send_agent_interrupt(session, reason) await self._stop_tts(session) @@ -479,10 +562,33 @@ class ESP32LiveKitBridge: return payload + def _current_tts_display_text(self, text: str) -> str: + normalized = " ".join(text.split()) + if not normalized: + return "" + + last_break = -1 + previous_break = -1 + for index, char in enumerate(normalized): + if char in TTS_DISPLAY_SENTENCE_BREAKS: + previous_break = last_break + last_break = index + + if last_break == -1: + return normalized + + if last_break == len(normalized) - 1: + start = previous_break + 1 + else: + start = last_break + 1 + + return normalized[start:].strip() or normalized + def _register_room_handlers(self, session: DeviceSession) -> None: @session.room.on("connection_state_changed") def on_connection_state_changed(state: int) -> None: - print(f"[livekit] room={session.room_name} state={rtc.ConnectionState.Name(state)}") + # print(f"[livekit] room={session.room_name} state={rtc.ConnectionState.Name(state)}") + pass @session.room.on("connected") def on_connected() -> None: @@ -535,22 +641,20 @@ class ESP32LiveKitBridge: for segment in segments: status = "✅ 最终结果" if segment.final else "⏳ 正在思考/中间结果" print(f"🗣️ [{status} | room={session.room_name} | {identity}]: {segment.text}") + if is_agent: + display_text = self._current_tts_display_text(segment.text) + if not display_text or display_text == session.tts_transcript_text: + continue + session.tts_transcript_text = display_text + else: + if not segment.final: + continue + display_text = segment.text + if session.websocket is not None: ws = session.websocket if is_agent: - session.tts_transcript_text = segment.text - asyncio.create_task( - ws.send( - json.dumps( - { - "type": "tts", - "state": "sentence_start", - "text": session.tts_transcript_text, - "final": segment.final, - } - ) - ) - ) + self._update_tts_display_text(session, display_text, segment.final) else: asyncio.create_task( ws.send( @@ -578,10 +682,10 @@ class ESP32LiveKitBridge: participant: rtc.RemoteParticipant, ) -> None: track_sid = getattr(publication, "sid", None) - print( - f"📡 远端音轨已发布: room={session.room_name} identity={participant.identity} " - f"track_sid={track_sid}" - ) + # print( + # f"📡 远端音轨已发布: room={session.room_name} identity={participant.identity} " + # f"track_sid={track_sid}" + # ) track = getattr(publication, "track", None) if track is not None: self._maybe_forward_remote_audio(session, track, publication, participant, "published") @@ -589,10 +693,10 @@ class ESP32LiveKitBridge: async def _connect_session_room(self, session: DeviceSession) -> None: self._register_room_handlers(session) - print(f"[config] livekit_ws_url={LIVEKIT_WS_URL}") - print(f"[config] token_url={TOKEN_URL}") - print(f"[config] room={session.room_name} identity={session.identity}") - print(f"[config] livekit_connect_timeout={CONNECT_TIMEOUT_SECONDS}") + # print(f"[config] livekit_ws_url={LIVEKIT_WS_URL}") + # print(f"[config] token_url={TOKEN_URL}") + # print(f"[config] room={session.room_name} identity={session.identity}") + # print(f"[config] livekit_connect_timeout={CONNECT_TIMEOUT_SECONDS}") token = await fetch_token(session.room_name, session.identity) try: @@ -614,9 +718,9 @@ class ESP32LiveKitBridge: raise print(f"已连接到 LiveKit 房间: {session.room.name}") - print(f"[livekit] local_identity={session.room.local_participant.identity}") - print(f"[livekit] local_sid={session.room.local_participant.sid}") - print(f"[livekit] remote_participants={list(session.room.remote_participants.keys())}") + # print(f"[livekit] local_identity={session.room.local_participant.identity}") + # print(f"[livekit] local_sid={session.room.local_participant.sid}") + # print(f"[livekit] remote_participants={list(session.room.remote_participants.keys())}") self._log_agent_participants(session, "after_connect") await self.ensure_agent_dispatched(session) @@ -629,16 +733,16 @@ class ESP32LiveKitBridge: publication = await session.room.local_participant.publish_track(track, options) publication_sid = getattr(publication, "sid", None) track_sid = getattr(track, "sid", None) - print( - f"已发布 ESP32 mic track: room={session.room_name} " - f"track_sid={track_sid} publication_sid={publication_sid}" - ) + # print( + # f"已发布 ESP32 mic track: room={session.room_name} " + # f"track_sid={track_sid} publication_sid={publication_sid}" + # ) self._log_agent_participants(session, "after_publish_mic") - print(f"等待 agent 加入: room={session.room_name}") + # print(f"等待 agent 加入: room={session.room_name}") try: await asyncio.wait_for(session.agent_ready.wait(), timeout=AGENT_READY_TIMEOUT_SECONDS) - print(f"✅ agent 已就绪: room={session.room_name}") + # print(f"✅ agent 已就绪: room={session.room_name}") except asyncio.TimeoutError: print(f"⚠️ agent 等待超时: room={session.room_name}") @@ -662,6 +766,7 @@ class ESP32LiveKitBridge: if session.tts_idle_task is not None: session.tts_idle_task.cancel() session.tts_idle_task = None + self._cancel_tts_display_task(session) try: await session.room.disconnect() except Exception as exc: @@ -810,8 +915,8 @@ class ESP32LiveKitBridge: ) self.device_sessions[device_id] = session - print(f"ESP32 已连接: device={device_id}") - print(f"ESP32 协议版本: {session.protocol_version}") + # print(f"ESP32 已连接: device={device_id}") + # print(f"ESP32 协议版本: {session.protocol_version}") session.tts_stream_id += 1 opus_decoder = None @@ -846,10 +951,10 @@ class ESP32LiveKitBridge: try: if opus_decoder is None: - print(f"初始化 Opus 解码器: {INPUT_SAMPLE_RATE}Hz, mono") + # print(f"初始化 Opus 解码器: {INPUT_SAMPLE_RATE}Hz, mono") opus_decoder = opuslib.Decoder(INPUT_SAMPLE_RATE, 1) - pcm_bytes = opus_decoder.decode(audio_data, INPUT_SAMPLES_PER_OPUS_FRAME) + pcm_bytes = opus_decoder.decode(audio_data, INPUT_MAX_SAMPLES_PER_OPUS_FRAME) num_samples = len(pcm_bytes) // 2 if num_samples > 0: @@ -860,11 +965,11 @@ class ESP32LiveKitBridge: or now - session.first_capture_log_time >= 5.0 ): session.first_capture_log_time = now - print( - f"[uplink] capture_frame count={session.captured_frame_count} " - f"bytes={len(pcm_bytes)} samples={num_samples} " - f"room={session.room_name}" - ) + # print( + # f"[uplink] capture_frame count={session.captured_frame_count} " + # f"bytes={len(pcm_bytes)} samples={num_samples} " + # f"room={session.room_name}" + # ) try: frame = AudioFrame(pcm_bytes, INPUT_SAMPLE_RATE, 1, num_samples) await session.mic_source.capture_frame(frame) @@ -881,7 +986,7 @@ class ESP32LiveKitBridge: elif isinstance(message, str): try: data = json.loads(message) - print(f"收到 ESP32 JSON 消息: {data}") + # print(f"收到 ESP32 JSON 消息: {data}") msg_type = data.get("type") if msg_type == "abort": reason = data.get("reason")