Files
livekit_token/server.py
2026-05-21 10:14:05 +08:00

61 lines
2.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os

import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, Query
from fastapi.responses import JSONResponse
from livekit import api
# Load variables from a .env file (if one is found) into the process
# environment so the os.getenv() lookups in the request handler can see them.
load_dotenv()
# Application object: routes are registered on it via decorators, and it is
# handed to uvicorn when this file is executed as a script.
app = FastAPI()
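# Make sure the following two environment variables are set in the environment
# where this code runs (a .env file also works, since load_dotenv() is called):
# export LIVEKIT_API_KEY="your_api_key"
# export LIVEKIT_API_SECRET="your_api_secret"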
@app.get("/getToken")
def get_token(
    room: str = Query(..., description="房间名"),
    identity: str = Query(..., description="用户唯一标识"),
    name: str = Query("", description="用户名(可选)"),
    agent_name: str = Query("", description="要调度的Agent名称可选"),
):
    """Issue a LiveKit access token that allows ``identity`` to join ``room``.

    Args:
        room: Name of the room the token grants access to (required).
        identity: Unique identifier of the participant (required).
        name: Display name attached to the token; defaults to an empty string.
        agent_name: Optional agent name. When non-empty, a room configuration
            requesting dispatch of that agent is embedded in the token.

    Returns:
        On success, a dict with the signed JWT under ``token``, the echoed
        ``room`` and ``identity``, and ``dispatched_agent`` (the agent name,
        or ``"none"`` when no agent was requested).

        When ``LIVEKIT_API_KEY`` or ``LIVEKIT_API_SECRET`` is unset or empty,
        a JSON response with HTTP status 500 and body ``{"error": ...}``.
    """
    api_key = os.getenv("LIVEKIT_API_KEY")
    api_secret = os.getenv("LIVEKIT_API_SECRET")
    if not api_key or not api_secret:
        # Missing credentials are a server-side misconfiguration. Keep the
        # original {"error": ...} body, but report it with a 5xx status so
        # clients that only check the status code do not treat it as success.
        return JSONResponse(
            status_code=500,
            content={"error": "未配置 LIVEKIT_API_KEY 或 LIVEKIT_API_SECRET 环境变量"},
        )

    # Base token: identity, display name, and permission to join the room.
    grants = api.VideoGrants(room_join=True, room=room)
    access_token = (
        api.AccessToken(api_key, api_secret)
        .with_identity(identity)
        .with_name(name)
        .with_grants(grants)
    )

    # If an agent name was supplied, embed an explicit-dispatch request for
    # that agent in the token's room configuration.
    if agent_name:
        dispatch = api.RoomAgentDispatch(
            agent_name=agent_name,
            metadata="通过Token显式调度",
        )
        access_token = access_token.with_room_config(
            api.RoomConfiguration(agents=[dispatch])
        )

    return {
        "token": access_token.to_jwt(),
        "room": room,
        "identity": identity,
        "dispatched_agent": agent_name or "none",
    }
if __name__ == "__main__":
    # The server binds to all interfaces; the printed URLs show the
    # local-machine address on the same port for convenience.
    port = 8000
    base_url = f"http://localhost:{port}"
    print(f"Starting token server on {base_url}")
    print(f"Example usage: {base_url}/getToken?room=my-room&identity=user123")
    uvicorn.run(app, host="0.0.0.0", port=port)