Compare commits

..

1 Commits

Author SHA1 Message Date
f368e156f0 beaver test 2026-06-03 17:26:46 +08:00
8 changed files with 1417 additions and 0 deletions

228
beaver_terminal_client.py Normal file
View File

@ -0,0 +1,228 @@
from __future__ import annotations
import asyncio
import logging
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from uuid import uuid4
import aiohttp
from dotenv import load_dotenv
logger = logging.getLogger("beaver-terminal-client")
DEFAULT_BEAVER_WS_URL = "ws://127.0.0.1:8080/api/channels/terminal-dev/ws"
DEFAULT_TERMINAL_PEER_ID = "device-001"
DEFAULT_TERMINAL_DEVICE_NAME = "desk-terminal"
CUSTOM_ENV_PATH = Path(__file__).with_name(".env")
class BeaverTerminalError(RuntimeError):
pass
class BeaverTerminalConnectionClosed(BeaverTerminalError):
pass
@dataclass
class MessageIdGenerator:
peer_id: str
initial_counter: int = 0
instance_id: str | None = None
def __post_init__(self) -> None:
self.counter = self.initial_counter
def next_id(self) -> str:
self.counter += 1
if self.instance_id:
return f"{self.peer_id}-{self.instance_id}-{self.counter:06d}"
return f"{self.peer_id}-{self.counter:06d}"
def build_connect_frame(*, peer_id: str, device_name: str) -> dict[str, Any]:
return {
"type": "connect",
"peer_id": peer_id,
"device_name": device_name,
"capabilities": ["text"],
}
def build_message_frame(*, message_id: str, text: str) -> dict[str, Any]:
return {
"type": "message",
"message_id": message_id,
"text": text,
}
class BeaverTerminalClient:
def __init__(
self,
*,
url: str,
peer_id: str,
device_name: str,
http_session: aiohttp.ClientSession | None = None,
message_ids: MessageIdGenerator | None = None,
) -> None:
self._url = url
self._peer_id = peer_id
self._device_name = device_name
self._owned_session = http_session is None
self._http_session = http_session
self._ws: aiohttp.ClientWebSocketResponse | None = None
self._message_ids = message_ids or MessageIdGenerator(
peer_id=peer_id,
instance_id=uuid4().hex[:8],
)
self.session_id: str | None = None
async def connect(self) -> None:
await self._close_websocket()
session = self._ensure_http_session()
self._ws = await session.ws_connect(self._url)
await self._send_json(
build_connect_frame(peer_id=self._peer_id, device_name=self._device_name)
)
frame = await self._receive_json()
if frame.get("type") != "connected":
raise BeaverTerminalError(f"expected connected frame, received {frame!r}")
session_id = frame.get("session_id")
self.session_id = session_id if isinstance(session_id, str) else None
async def send_text(self, text: str) -> str:
for attempt in range(2):
if not self._websocket_is_open():
await self.connect()
message_id = self._message_ids.next_id()
message_frame = build_message_frame(message_id=message_id, text=text)
try:
await self._send_json(message_frame)
return await self._wait_for_reply(message_id)
except (aiohttp.ClientConnectionError, BeaverTerminalConnectionClosed) as exc:
if attempt == 1:
raise BeaverTerminalConnectionClosed(
"Beaver websocket closed before assistant reply"
) from exc
logger.info("Beaver websocket closed mid-turn; reconnecting with same peer_id")
await self.connect()
raise BeaverTerminalError("unreachable Beaver send state")
async def _wait_for_reply(self, message_id: str) -> str:
while True:
frame = await self._receive_json()
frame_type = frame.get("type")
if frame_type == "ack" and frame.get("message_id") == message_id:
reply = frame.get("reply")
if isinstance(reply, str):
return reply
continue
if (
frame_type == "message"
and frame.get("role") == "assistant"
and frame.get("message_id") == message_id
):
text = frame.get("text")
if frame.get("finish_reason") == "error":
raise BeaverTerminalError(text if isinstance(text, str) else "assistant turn failed")
return text if isinstance(text, str) else ""
if frame_type == "error":
error = frame.get("error")
raise BeaverTerminalError(error if isinstance(error, str) else "unknown error")
async def ping(self) -> bool:
await self._send_json({"type": "ping"})
while True:
frame = await self._receive_json()
if frame.get("type") == "pong":
return True
if frame.get("type") == "error":
error = frame.get("error")
raise BeaverTerminalError(error if isinstance(error, str) else "unknown error")
async def close(self) -> None:
await self._close_websocket()
if self._owned_session and self._http_session is not None:
await self._http_session.close()
self._http_session = None
async def _close_websocket(self) -> None:
if self._ws is not None:
await self._ws.close()
self._ws = None
def _websocket_is_open(self) -> bool:
return self._ws is not None and not self._ws.closed
def _ensure_http_session(self) -> aiohttp.ClientSession:
if self._http_session is None:
self._http_session = aiohttp.ClientSession()
return self._http_session
async def _send_json(self, frame: dict[str, Any]) -> None:
if self._ws is None:
raise BeaverTerminalError("Beaver websocket is not connected")
await self._ws.send_json(frame)
async def _receive_json(self) -> dict[str, Any]:
if self._ws is None:
raise BeaverTerminalError("Beaver websocket is not connected")
message = await self._ws.receive()
if message.type in (aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSING):
raise BeaverTerminalConnectionClosed("Beaver websocket closed")
if message.type == aiohttp.WSMsgType.ERROR:
raise BeaverTerminalConnectionClosed(
f"Beaver websocket error: {self._ws.exception()!r}"
)
if message.type != aiohttp.WSMsgType.TEXT:
raise BeaverTerminalError(f"expected Beaver text frame, received {message.type!r}")
data = message.json()
if not isinstance(data, dict):
raise BeaverTerminalError(f"expected Beaver JSON object, received {data!r}")
return data
def client_from_env() -> BeaverTerminalClient:
load_dotenv(dotenv_path=CUSTOM_ENV_PATH)
return BeaverTerminalClient(
url=os.getenv("BEAVER_WS_URL", DEFAULT_BEAVER_WS_URL),
peer_id=os.getenv("TERMINAL_PEER_ID", DEFAULT_TERMINAL_PEER_ID),
device_name=os.getenv("TERMINAL_DEVICE_NAME", DEFAULT_TERMINAL_DEVICE_NAME),
)
async def run_console() -> None:
logging.basicConfig(level=logging.INFO)
client = client_from_env()
try:
await client.connect()
logger.info("Connected to Beaver session_id=%s", client.session_id)
while True:
text = await asyncio.to_thread(input, "> ")
text = text.strip()
if not text:
continue
if text in {"quit", "exit"}:
return
try:
reply = await client.send_text(text)
except BeaverTerminalError as exc:
logger.error("Beaver turn failed: %s", exc)
continue
print(reply)
finally:
await client.close()
if __name__ == "__main__":
asyncio.run(run_console())

188
test_agent.py Normal file
View File

@ -0,0 +1,188 @@
import asyncio
import requests
import logging
from pathlib import Path
import uuid
import wave
import numpy as np
from datetime import datetime
from livekit import rtc
from livekit.rtc import AudioSource, AudioFrame, LocalAudioTrack
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger("test-agent")
TOKEN_URL = "http://localhost:8000/getToken"
WS_URL = "wss://esp32-vt80c4y6.livekit.cloud"
ROOM_NAME = "test-room20"
WAV_FILE = "2food.wav"
TEST_TIMEOUT = 30
class TestState:
def __init__(self):
self.agent_connected = False
self.tts_received = False
self.tts_count = 0
test_state = TestState()
def get_token(agent_name="my-agent"):
try:
resp = requests.get(
TOKEN_URL,
params={
"room": ROOM_NAME,
"identity": f"test-{uuid.uuid4().hex[:6]}",
"agent_name": agent_name,
},
timeout=5
)
resp.raise_for_status()
return resp.json()["token"]
except Exception as e:
logger.error(f"❌ 获取token失败: {e}")
raise
async def publish_wav(room, wav_path):
wav_path = Path(wav_path)
if not wav_path.exists():
logger.error(f"❌ WAV文件不存在: {wav_path}")
raise FileNotFoundError(f"文件不存在: {wav_path}")
logger.info(f"📂 开始上传: {wav_path}")
with wave.open(str(wav_path), "rb") as wf:
sample_rate = wf.getframerate()
num_channels = wf.getnchannels()
sample_width = wf.getsampwidth()
logger.info(f"📊 WAV信息: {sample_rate}Hz, {num_channels}ch, {sample_width*8}bit")
source = AudioSource(sample_rate, num_channels)
track = LocalAudioTrack.create_audio_track("mic", source)
await room.local_participant.publish_track(track)
logger.info("📡 已发布音轨")
frame_duration = 0.02
samples_per_frame = int(sample_rate * frame_duration)
while True:
data = wf.readframes(samples_per_frame)
if not data:
break
audio = np.frombuffer(data, dtype=np.int16)
if len(audio) == 0:
continue
samples_per_channel = len(audio) // num_channels
frame = AudioFrame(
data=data,
sample_rate=sample_rate,
num_channels=num_channels,
samples_per_channel=samples_per_channel,
)
await source.capture_frame(frame)
await asyncio.sleep(frame_duration)
logger.info("✅ WAV推流完成")
async def test_agent():
try:
logger.info("🔑 正在获取token...")
token = get_token()
logger.info("✅ Token获取成功")
room = rtc.Room()
@room.on("participant_connected")
def on_participant_connected(participant):
logger.info(f"✅ 参与者加入: {participant.identity}")
if "agent" in participant.identity.lower():
test_state.agent_connected = True
logger.info("🎉 Agent已连接")
@room.on("participant_disconnected")
def on_participant_disconnected(participant):
logger.info(f"❌ 参与者离开: {participant.identity}")
@room.on("track_subscribed")
def on_track_subscribed(track, publication, participant):
if track.kind == rtc.TrackKind.KIND_AUDIO:
test_state.tts_count += 1
logger.info(f"🎵 收到TTS音频! (第 {test_state.tts_count} 次)")
test_state.tts_received = True
logger.info(f"🔌 正在连接房间 {ROOM_NAME}...")
await room.connect(WS_URL, token)
logger.info("✅ 已连接到房间")
logger.info(f"🆔 本地参与者ID: {room.local_participant.identity}")
logger.info("⏳ 等待Agent连接...")
for i in range(10):
if test_state.agent_connected:
break
await asyncio.sleep(1)
if not test_state.agent_connected:
logger.warning("⚠️ Agent未连接")
return False
logger.info("🎙️ 正在上传测试音频...")
await publish_wav(room, WAV_FILE)
logger.info("⏳ 等待Agent响应...")
for i in range(TEST_TIMEOUT):
if test_state.tts_received:
logger.info("✅ 收到Agent TTS响应!")
break
if i % 5 == 0:
logger.info(f" 等待中... ({i+1}/{TEST_TIMEOUT}秒)")
await asyncio.sleep(1)
await asyncio.sleep(2)
logger.info("\n" + "="*60)
logger.info("✅ 测试结果")
logger.info("="*60)
logger.info(f"Agent连接: {'' if test_state.agent_connected else ''}")
logger.info(f"收到TTS响应: {'' if test_state.tts_received else ''}")
logger.info(f"TTS音频次数: {test_state.tts_count}")
logger.info("="*60)
await room.disconnect()
logger.info("✅ 已断开连接\n")
return test_state.agent_connected and test_state.tts_received
except Exception as e:
logger.error(f"❌ 测试失败: {e}", exc_info=True)
return False
async def main():
logger.info("🚀 开始测试custom_agent...\n")
success = await test_agent()
if success:
logger.info("✅ 测试成功custom_agent 正常工作")
logger.info("💡 提示: Agent内部的转录和响应日志只能在Agent自身看到")
logger.info(" 或通过 agent-starter-react 这样的客户端交互查看")
return 0
else:
logger.error("❌ 测试失败")
return 1
if __name__ == "__main__":
exit_code = asyncio.run(main())
exit(exit_code)

55
test_asr.py Normal file
View File

@ -0,0 +1,55 @@
import asyncio
import logging
import wave
from asr import BlackboxSTT
from livekit import rtc
# 设置日志级别以查看输出
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("test-asr")
async def test():
# 替换为你本地的一个音频文件路径
audio_path = "/home/verachen/Music/voice/2food.wav"
# 初始化 ASR
stt = BlackboxSTT(url="http://10.6.80.21:5003/asr-blackbox", model_name="sensevoice")
print(f"Testing ASR connectivity with file: {audio_path}")
try:
# 读取音频文件
with wave.open(audio_path, "rb") as wf:
frames = wf.readframes(wf.getnframes())
# 简单构造一个 AudioBuffer (假设是单声道 16kHz)
# 实际上 BlackboxSTT._recognize_impl 会用 combine_audio_frames(buffer).to_wav_bytes()
# 所以我们需要传递一个包含 AudioFrame 的 list
# 这里我们模拟一个 Frame
frame = rtc.AudioFrame(
data=frames,
sample_rate=wf.getframerate(),
num_channels=wf.getnchannels(),
samples_per_channel=wf.getnframes(),
)
# 调用 recognize
result = await stt.recognize(buffer=[frame])
if result.alternatives:
print("\n--- ASR Result ---")
print(f"Text: {result.alternatives[0].text}")
print("------------------\n")
else:
print("ASR returned no text.")
except FileNotFoundError:
print(f"Error: Audio file not found at {audio_path}")
except Exception as e:
print(f"An error occurred: {e}")
if __name__ == "__main__":
asyncio.run(test())

253
test_beaver_llm.py Normal file
View File

@ -0,0 +1,253 @@
import json
import aiohttp
from aiohttp import web
try:
from custom.beaver_llm import BeaverLLM, latest_user_text
except ModuleNotFoundError:
from beaver_llm import BeaverLLM, latest_user_text
from livekit.agents import ChatContext
def test_latest_user_text_uses_most_recent_user_message() -> None:
ctx = ChatContext.empty()
ctx.add_message(role="user", content="first")
ctx.add_message(role="assistant", content="ignored")
ctx.add_message(role="user", content=["second", "line"])
assert latest_user_text(ctx) == "second\nline"
async def test_beaver_llm_can_connect_before_first_message(
unused_tcp_port: int,
) -> None:
received: list[dict[str, object]] = []
async def websocket_handler(request: web.Request) -> web.WebSocketResponse:
ws = web.WebSocketResponse()
await ws.prepare(request)
async for message in ws:
assert message.type == aiohttp.WSMsgType.TEXT
frame = json.loads(message.data)
received.append(frame)
if frame["type"] == "connect":
await ws.send_json(
{
"type": "connected",
"channel_id": "terminal-dev",
"session_id": "terminal-dev:local:livekit-room",
}
)
elif frame["type"] == "message":
await ws.send_json(
{
"type": "ack",
"message_id": frame["message_id"],
"session_id": "terminal-dev:local:livekit-room",
"accepted": True,
}
)
await ws.send_json(
{
"type": "message",
"role": "assistant",
"message_id": frame["message_id"],
"run_id": "run-1",
"text": "beaver reply",
"finish_reason": "stop",
}
)
return ws
app = web.Application()
app.router.add_get("/api/channels/terminal-dev/ws", websocket_handler)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, "127.0.0.1", unused_tcp_port)
await site.start()
beaver_llm = BeaverLLM(
url=f"http://127.0.0.1:{unused_tcp_port}/api/channels/terminal-dev/ws",
peer_id="livekit-room",
device_name="livekit-custom-agent",
)
ctx = ChatContext.empty()
ctx.add_message(role="user", content="hello beaver")
try:
await beaver_llm.connect()
assert beaver_llm.session_id == "terminal-dev:local:livekit-room"
assert received == [
{
"type": "connect",
"peer_id": "livekit-room",
"device_name": "livekit-custom-agent",
"capabilities": ["text"],
}
]
collected = await beaver_llm.chat(chat_ctx=ctx).collect()
finally:
await beaver_llm.aclose()
await runner.cleanup()
assert collected.text == "beaver reply"
assert received[1]["type"] == "message"
assert received[1]["text"] == "hello beaver"
async def test_beaver_llm_connect_can_send_warmup_message(
unused_tcp_port: int,
) -> None:
received: list[dict[str, object]] = []
async def websocket_handler(request: web.Request) -> web.WebSocketResponse:
ws = web.WebSocketResponse()
await ws.prepare(request)
async for message in ws:
assert message.type == aiohttp.WSMsgType.TEXT
frame = json.loads(message.data)
received.append(frame)
if frame["type"] == "connect":
await ws.send_json(
{
"type": "connected",
"channel_id": "terminal-dev",
"session_id": "terminal-dev:local:livekit-room",
}
)
elif frame["type"] == "message":
await ws.send_json(
{
"type": "ack",
"message_id": frame["message_id"],
"session_id": "terminal-dev:local:livekit-room",
"accepted": True,
}
)
await ws.send_json(
{
"type": "message",
"role": "assistant",
"message_id": frame["message_id"],
"run_id": "run-warmup",
"text": "ready",
"finish_reason": "stop",
}
)
return ws
app = web.Application()
app.router.add_get("/api/channels/terminal-dev/ws", websocket_handler)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, "127.0.0.1", unused_tcp_port)
await site.start()
beaver_llm = BeaverLLM(
url=f"http://127.0.0.1:{unused_tcp_port}/api/channels/terminal-dev/ws",
peer_id="livekit-room",
device_name="livekit-custom-agent",
)
try:
warmup_reply = await beaver_llm.connect(warmup_text="初始化连接")
finally:
await beaver_llm.aclose()
await runner.cleanup()
assert warmup_reply == "ready"
assert received[0] == {
"type": "connect",
"peer_id": "livekit-room",
"device_name": "livekit-custom-agent",
"capabilities": ["text"],
}
assert received[1]["type"] == "message"
assert received[1]["text"] == "初始化连接"
async def test_beaver_llm_sends_latest_user_text_and_returns_reply(
unused_tcp_port: int,
) -> None:
received: list[dict[str, object]] = []
async def websocket_handler(request: web.Request) -> web.WebSocketResponse:
ws = web.WebSocketResponse()
await ws.prepare(request)
async for message in ws:
assert message.type == aiohttp.WSMsgType.TEXT
frame = json.loads(message.data)
received.append(frame)
if frame["type"] == "connect":
await ws.send_json(
{
"type": "connected",
"channel_id": "terminal-dev",
"session_id": "terminal-dev:local:livekit-room",
}
)
elif frame["type"] == "message":
await ws.send_json(
{
"type": "ack",
"message_id": frame["message_id"],
"session_id": "terminal-dev:local:livekit-room",
"accepted": True,
}
)
await ws.send_json(
{
"type": "message",
"role": "assistant",
"message_id": frame["message_id"],
"run_id": "run-1",
"text": "beaver reply",
"finish_reason": "stop",
}
)
return ws
app = web.Application()
app.router.add_get("/api/channels/terminal-dev/ws", websocket_handler)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, "127.0.0.1", unused_tcp_port)
await site.start()
beaver_llm = BeaverLLM(
url=f"http://127.0.0.1:{unused_tcp_port}/api/channels/terminal-dev/ws",
peer_id="livekit-room",
device_name="livekit-custom-agent",
)
ctx = ChatContext.empty()
ctx.add_message(role="system", content="ignored instructions")
ctx.add_message(role="user", content="hello beaver")
try:
collected = await beaver_llm.chat(chat_ctx=ctx).collect()
finally:
await beaver_llm.aclose()
await runner.cleanup()
assert collected.text == "beaver reply"
assert received[0] == {
"type": "connect",
"peer_id": "livekit-room",
"device_name": "livekit-custom-agent",
"capabilities": ["text"],
}
assert received[1]["type"] == "message"
assert received[1]["message_id"].startswith("livekit-room-")
assert received[1]["message_id"].endswith("-000001")
assert received[1]["text"] == "hello beaver"

View File

@ -0,0 +1,426 @@
import asyncio
import json
import sys
from pathlib import Path
import aiohttp
import pytest
from aiohttp import web
if __name__ == "__main__":
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
raise SystemExit(pytest.main([__file__]))
try:
from custom.beaver_terminal_client import (
BeaverTerminalClient,
BeaverTerminalError,
MessageIdGenerator,
build_connect_frame,
build_message_frame,
)
except ModuleNotFoundError:
from beaver_terminal_client import (
BeaverTerminalClient,
BeaverTerminalError,
MessageIdGenerator,
build_connect_frame,
build_message_frame,
)
def test_build_connect_frame_uses_stable_peer_id() -> None:
frame = build_connect_frame(peer_id="device-001", device_name="desk-terminal")
assert frame == {
"type": "connect",
"peer_id": "device-001",
"device_name": "desk-terminal",
"capabilities": ["text"],
}
def test_build_message_frame_uses_message_id_and_text() -> None:
frame = build_message_frame(message_id="device-001-000001", text="hello")
assert frame == {
"type": "message",
"message_id": "device-001-000001",
"text": "hello",
}
def test_message_id_generator_uses_monotonic_peer_counter() -> None:
generator = MessageIdGenerator(peer_id="device-001", initial_counter=7)
assert generator.next_id() == "device-001-000008"
assert generator.next_id() == "device-001-000009"
assert generator.counter == 9
def test_message_id_generator_can_include_instance_id() -> None:
generator = MessageIdGenerator(peer_id="device-001", instance_id="abc123ef")
assert generator.next_id() == "device-001-abc123ef-000001"
assert generator.next_id() == "device-001-abc123ef-000002"
async def test_client_connects_sends_text_and_returns_assistant_reply(
unused_tcp_port: int,
) -> None:
received: list[dict[str, object]] = []
async def websocket_handler(request: web.Request) -> web.WebSocketResponse:
ws = web.WebSocketResponse()
await ws.prepare(request)
async for message in ws:
assert message.type == aiohttp.WSMsgType.TEXT
frame = json.loads(message.data)
received.append(frame)
if frame["type"] == "connect":
await ws.send_json(
{
"type": "connected",
"channel_id": "terminal-dev",
"session_id": "terminal-dev:local:device-001",
}
)
elif frame["type"] == "message":
await ws.send_json(
{
"type": "ack",
"message_id": frame["message_id"],
"session_id": "terminal-dev:local:device-001",
"accepted": True,
}
)
await ws.send_json(
{
"type": "message",
"role": "assistant",
"message_id": frame["message_id"],
"run_id": "run-1",
"text": "assistant reply",
"finish_reason": "stop",
}
)
return ws
app = web.Application()
app.router.add_get("/api/channels/terminal-dev/ws", websocket_handler)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, "127.0.0.1", unused_tcp_port)
await site.start()
client = BeaverTerminalClient(
url=f"http://127.0.0.1:{unused_tcp_port}/api/channels/terminal-dev/ws",
peer_id="device-001",
device_name="desk-terminal",
message_ids=MessageIdGenerator(peer_id="device-001"),
)
try:
await client.connect()
reply = await client.send_text("hello")
finally:
await client.close()
await runner.cleanup()
assert client.session_id == "terminal-dev:local:device-001"
assert reply == "assistant reply"
assert received == [
{
"type": "connect",
"peer_id": "device-001",
"device_name": "desk-terminal",
"capabilities": ["text"],
},
{
"type": "message",
"message_id": "device-001-000001",
"text": "hello",
},
]
async def test_client_returns_cached_duplicate_reply(unused_tcp_port: int) -> None:
async def websocket_handler(request: web.Request) -> web.WebSocketResponse:
ws = web.WebSocketResponse()
await ws.prepare(request)
async for message in ws:
frame = json.loads(message.data)
if frame["type"] == "connect":
await ws.send_json(
{
"type": "connected",
"channel_id": "terminal-dev",
"session_id": "terminal-dev:local:device-001",
}
)
elif frame["type"] == "message":
await ws.send_json(
{
"type": "ack",
"message_id": frame["message_id"],
"session_id": "terminal-dev:local:device-001",
"accepted": False,
"duplicate": True,
"pending": False,
"reply": "cached assistant reply",
}
)
return ws
app = web.Application()
app.router.add_get("/api/channels/terminal-dev/ws", websocket_handler)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, "127.0.0.1", unused_tcp_port)
await site.start()
client = BeaverTerminalClient(
url=f"http://127.0.0.1:{unused_tcp_port}/api/channels/terminal-dev/ws",
peer_id="device-001",
device_name="desk-terminal",
message_ids=MessageIdGenerator(peer_id="device-001"),
)
try:
await client.connect()
reply = await client.send_text("hello")
finally:
await client.close()
await runner.cleanup()
assert reply == "cached assistant reply"
async def test_client_raises_on_error_frames(unused_tcp_port: int) -> None:
async def websocket_handler(request: web.Request) -> web.WebSocketResponse:
ws = web.WebSocketResponse()
await ws.prepare(request)
async for message in ws:
frame = json.loads(message.data)
if frame["type"] == "connect":
await ws.send_json(
{
"type": "connected",
"channel_id": "terminal-dev",
"session_id": "terminal-dev:local:device-001",
}
)
elif frame["type"] == "message":
await ws.send_json({"type": "error", "error": "text is required"})
return ws
app = web.Application()
app.router.add_get("/api/channels/terminal-dev/ws", websocket_handler)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, "127.0.0.1", unused_tcp_port)
await site.start()
client = BeaverTerminalClient(
url=f"http://127.0.0.1:{unused_tcp_port}/api/channels/terminal-dev/ws",
peer_id="device-001",
device_name="desk-terminal",
message_ids=MessageIdGenerator(peer_id="device-001"),
)
try:
await client.connect()
with pytest.raises(BeaverTerminalError, match="text is required"):
await client.send_text("hello")
finally:
await client.close()
await runner.cleanup()
async def test_client_treats_assistant_finish_reason_error_as_failed_turn(
unused_tcp_port: int,
) -> None:
async def websocket_handler(request: web.Request) -> web.WebSocketResponse:
ws = web.WebSocketResponse()
await ws.prepare(request)
async for message in ws:
frame = json.loads(message.data)
if frame["type"] == "connect":
await ws.send_json(
{
"type": "connected",
"channel_id": "terminal-dev",
"session_id": "terminal-dev:local:device-001",
}
)
elif frame["type"] == "message":
await ws.send_json(
{
"type": "ack",
"message_id": frame["message_id"],
"session_id": "terminal-dev:local:device-001",
"accepted": True,
}
)
await ws.send_json(
{
"type": "message",
"role": "assistant",
"message_id": frame["message_id"],
"run_id": "run-1",
"text": "failed turn",
"finish_reason": "error",
}
)
return ws
app = web.Application()
app.router.add_get("/api/channels/terminal-dev/ws", websocket_handler)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, "127.0.0.1", unused_tcp_port)
await site.start()
client = BeaverTerminalClient(
url=f"http://127.0.0.1:{unused_tcp_port}/api/channels/terminal-dev/ws",
peer_id="device-001",
device_name="desk-terminal",
message_ids=MessageIdGenerator(peer_id="device-001"),
)
try:
await client.connect()
with pytest.raises(BeaverTerminalError, match="failed turn"):
await client.send_text("hello")
finally:
await client.close()
await runner.cleanup()
async def test_client_ping_sends_ping_and_waits_for_pong(unused_tcp_port: int) -> None:
async def websocket_handler(request: web.Request) -> web.WebSocketResponse:
ws = web.WebSocketResponse()
await ws.prepare(request)
async for message in ws:
frame = json.loads(message.data)
if frame["type"] == "connect":
await ws.send_json(
{
"type": "connected",
"channel_id": "terminal-dev",
"session_id": "terminal-dev:local:device-001",
}
)
elif frame["type"] == "ping":
await ws.send_json({"type": "pong"})
return ws
app = web.Application()
app.router.add_get("/api/channels/terminal-dev/ws", websocket_handler)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, "127.0.0.1", unused_tcp_port)
await site.start()
client = BeaverTerminalClient(
url=f"http://127.0.0.1:{unused_tcp_port}/api/channels/terminal-dev/ws",
peer_id="device-001",
device_name="desk-terminal",
message_ids=MessageIdGenerator(peer_id="device-001"),
)
try:
await client.connect()
assert await client.ping()
finally:
await client.close()
await runner.cleanup()
async def test_client_reconnects_with_same_peer_id_when_socket_closes_before_send(
unused_tcp_port: int,
) -> None:
connect_peer_ids: list[str] = []
message_ids: list[str] = []
connection_count = 0
async def websocket_handler(request: web.Request) -> web.WebSocketResponse:
nonlocal connection_count
connection_count += 1
current_connection = connection_count
ws = web.WebSocketResponse()
await ws.prepare(request)
async for message in ws:
frame = json.loads(message.data)
if frame["type"] == "connect":
connect_peer_ids.append(frame["peer_id"])
await ws.send_json(
{
"type": "connected",
"channel_id": "terminal-dev",
"session_id": "terminal-dev:local:device-001",
}
)
elif frame["type"] == "message":
message_ids.append(frame["message_id"])
if current_connection == 1:
await ws.close()
continue
await ws.send_json(
{
"type": "ack",
"message_id": frame["message_id"],
"session_id": "terminal-dev:local:device-001",
"accepted": True,
}
)
await ws.send_json(
{
"type": "message",
"role": "assistant",
"message_id": frame["message_id"],
"run_id": "run-2",
"text": "reply after reconnect",
"finish_reason": "stop",
}
)
return ws
app = web.Application()
app.router.add_get("/api/channels/terminal-dev/ws", websocket_handler)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, "127.0.0.1", unused_tcp_port)
await site.start()
client = BeaverTerminalClient(
url=f"http://127.0.0.1:{unused_tcp_port}/api/channels/terminal-dev/ws",
peer_id="device-001",
device_name="desk-terminal",
message_ids=MessageIdGenerator(peer_id="device-001"),
)
try:
await client.connect()
await asyncio.sleep(0.01)
reply = await client.send_text("hello")
finally:
await client.close()
await runner.cleanup()
assert reply == "reply after reconnect"
assert connect_peer_ids == ["device-001", "device-001"]
assert message_ids == ["device-001-000001", "device-001-000002"]

130
test_livekit.py Normal file
View File

@ -0,0 +1,130 @@
import asyncio
import requests
from livekit import rtc
import wave
import numpy as np
from livekit.rtc import AudioSource, AudioFrame, LocalAudioTrack
TOKEN_URL = "http://localhost:8000/getToken"
WS_URL = "wss://esp32-vt80c4y6.livekit.cloud" # 你的 LiveKit Server 地址
ROOM_NAME = "test-room20"
import uuid
IDENTITY = f"uv-{uuid.uuid4().hex[:6]}"
# IDENTITY = "test-user0"
def get_token():
resp = requests.get(
TOKEN_URL,
params={
"room": ROOM_NAME,
"identity": IDENTITY,
"agent_name": "my-agent", # 关键!!!
},
)
data = resp.json()
return data["token"]
async def main():
token = get_token()
room = rtc.Room()
@room.on("participant_connected")
def on_participant_connected(participant):
print(f"✅ 有人加入房间: {participant.identity}")
@room.on("participant_disconnected")
def on_participant_disconnected(participant):
print(f"❌ 有人离开房间: {participant.identity}")
print("🔌 正在连接房间...")
await room.connect(WS_URL, token)
print("✅ 已连接房间:", ROOM_NAME)
print("当前房间成员:")
for p in room.remote_participants.values():
print(" -", p.identity)
@room.on("data_received")
def on_data_received(data, participant, kind, topic):
try:
msg = data.decode()
print(f"📩 来自 {participant.identity}: {msg}")
except:
print("📩 收到二进制数据")
@room.on("track_subscribed")
def on_track_subscribed(track, publication, participant):
print(f"🎧 订阅轨道: {participant.identity}")
if track.kind == rtc.TrackKind.KIND_AUDIO:
print("👉 TTS 音频来了")
# 等一下确保连接稳定
await asyncio.sleep(1)
await room.local_participant.publish_data(
b"hello",
reliable=True,
topic="chat"
)
# 上传 wav
await publish_wav(room, "2food.wav")
await room.disconnect()
async def publish_wav(room, wav_path):
print("🎵 开始上传本地 wav:", wav_path)
wf = wave.open(wav_path, "rb")
sample_rate = wf.getframerate()
num_channels = wf.getnchannels()
sample_width = wf.getsampwidth()
print(f"📊 WAV信息: {sample_rate}Hz, {num_channels}ch, {sample_width*8}bit")
# 创建音频源
source = AudioSource(sample_rate, num_channels)
# 创建本地音轨
track = LocalAudioTrack.create_audio_track("mic", source)
# 发布轨道
await room.local_participant.publish_track(track)
print("📡 已发布音轨")
frame_duration = 0.02 # 20ms
samples_per_frame = int(sample_rate * frame_duration)
while True:
data = wf.readframes(samples_per_frame)
if not data:
break
# 用于计算长度
audio = np.frombuffer(data, dtype=np.int16)
if len(audio) == 0:
continue
samples_per_channel = len(audio) // num_channels
frame = AudioFrame(
data=data, # ✅ 关键:用 bytes
sample_rate=sample_rate,
num_channels=num_channels,
samples_per_channel=samples_per_channel,
)
await source.capture_frame(frame)
await asyncio.sleep(frame_duration)
print("✅ wav 推流结束")
if __name__ == "__main__":
asyncio.run(main())

71
test_minimax.py Normal file
View File

@ -0,0 +1,71 @@
import asyncio
import os
import logging
from dotenv import load_dotenv
from livekit.agents.llm import ChatContext
from livekit.plugins import openai
# Configure logging to see what's happening
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("test-minimax")
async def test_minimax():
print("Loading .env...")
load_dotenv()
# Configuration from environment or defaults from custom_agent.py
MINIMAX_BASE_URL = os.getenv("MINIMAX_LLM_BASE_URL", "https://oai.bwgdi.com/v1")
MINIMAX_MODEL = os.getenv("MINIMAX_LLM_MODEL", "MiniMaxAI")
# Using the hardcoded key from custom_agent.py as a fallback if not in .env
API_KEY = os.getenv("MINIMAX_API_KEY", "sk-orez64WkG1NkfksB5j_hGA")
import httpx
from openai import AsyncClient as OpenAIAsyncClient
print(f"Connecting to Minimax at {MINIMAX_BASE_URL} using model {MINIMAX_MODEL}")
# Create a custom HTTP client that disables SSL verification
http_client = httpx.AsyncClient(verify=False)
# Create the OpenAI AsyncClient with the custom HTTP client
openai_client = OpenAIAsyncClient(
api_key=API_KEY,
base_url=MINIMAX_BASE_URL,
http_client=http_client,
)
llm = openai.LLM(
model=MINIMAX_MODEL,
client=openai_client,
)
print("Creating ChatContext...")
chat_ctx = ChatContext()
chat_ctx.add_message(
content="Hello! Can you introduce yourself? Please reply in Chinese.",
role="user",
)
print(f"\n--- Testing Streaming Chat ---")
print(f"Request: {chat_ctx.items[-1].content}")
print("Response: ", end="", flush=True)
try:
print("\nCalling llm.chat()...")
stream = llm.chat(chat_ctx=chat_ctx)
print("Iterating over stream...")
async for chunk in stream:
if chunk.delta and chunk.delta.content:
print(chunk.delta.content, end="", flush=True)
print("\n--- Test Completed Successfully ---")
except Exception as e:
logger.error(f"\nTest failed with error: {e}")
if __name__ == "__main__":
print("Starting...")
try:
asyncio.run(asyncio.wait_for(test_minimax(), timeout=30))
except asyncio.TimeoutError:
print("\nTest timed out after 30 seconds.")
except Exception as e:
print(f"\nAn error occurred: {e}")

66
test_voxcpm.py Normal file
View File

@ -0,0 +1,66 @@
import asyncio
import logging
import os
from tts import BlackboxTTS
logging.basicConfig(level=logging.INFO)
async def test_tts():
# Use the URL from the user's curl command
url = "http://10.6.80.21:5002/tts-blackbox"
# Check if we have a real wav file to test with
# In the earlier find_by_name, we found tests/change-sophie.wav
prompt_wav = "/home/verachen/Music/voice/2food.wav"
if not os.path.exists(prompt_wav):
prompt_wav = "/home/verachen/Music/voice/2food.wav" # fallback to the one in curl
print(f"Testing BlackboxTTS with URL: {url}")
print(f"Using prompt wav: {prompt_wav}")
blackbox_tts = BlackboxTTS(
url=url,
model_name="voxcpmtts",
prompt_wav_path=prompt_wav,
params={
"streaming": "false",
"prompt_text": "澳门有乜嘢好食嘅",
"cfg_value": "2.0",
"inference_timesteps": "10",
"do_normalize": "true",
"denoise": "true",
"retry_badcase": "true",
"retry_badcase_max_times": "3",
"retry_badcase_ratio_threshold": "6.0",
},
)
text = "你好,这是一段测试文本"
print(f"Synthesizing text: {text}")
try:
stream = blackbox_tts.synthesize(text)
audio_frame = await stream.collect()
print("Successfully synthesized audio!")
print(
f"Audio duration: {audio_frame.sample_rate * len(audio_frame.data) / (audio_frame.num_channels * 2)} samples?"
)
# Actually AudioFrame has duration or samples
print(f"Samples: {len(audio_frame.data) // 2}")
# Save to file for manual check if possible
with open("test_output.wav", "wb") as f:
# This won't be a valid WAV yet if it's just raw PCM,
# but if collect() returns combined frames, we can use to_wav_bytes()
f.write(audio_frame.to_wav_bytes())
print("Saved output to test_output.wav")
except Exception as e:
print(f"TTS test failed: {e}")
if __name__ == "__main__":
asyncio.run(test_tts())