Files
livekit_token/test_livekit.py
2026-05-21 10:14:05 +08:00

53 lines
1.2 KiB
Python

import asyncio
import requests
from livekit import rtc
TOKEN_URL = "http://localhost:8000/getToken"
WS_URL = "ws://localhost:7880" # 你的 LiveKit Server 地址
ROOM_NAME = "test-room"
IDENTITY = "test-user"
def get_token():
resp = requests.get(
TOKEN_URL,
params={
"room": ROOM_NAME,
"identity": IDENTITY,
"agent_name": "my-agent", # 关键!!!
},
)
data = resp.json()
return data["token"]
async def main():
token = get_token()
room = rtc.Room()
@room.on("participant_connected")
def on_participant_connected(participant):
print(f"✅ 有人加入房间: {participant.identity}")
@room.on("participant_disconnected")
def on_participant_disconnected(participant):
print(f"❌ 有人离开房间: {participant.identity}")
print("🔌 正在连接房间...")
await room.connect(WS_URL, token)
print("✅ 已连接房间:", ROOM_NAME)
print("当前房间成员:")
for p in room.participants.values():
print(" -", p.identity)
# 挂住观察
await asyncio.sleep(30)
await room.disconnect()
if __name__ == "__main__":
asyncio.run(main())