61 lines
2.0 KiB
Python
61 lines
2.0 KiB
Python
from fastapi import FastAPI, Query
|
||
from livekit import api
|
||
import os
|
||
import uvicorn
|
||
from dotenv import load_dotenv
|
||
|
||
# Load environment variables through python-dotenv before any request handler
# reads LIVEKIT_API_KEY / LIVEKIT_API_SECRET with os.getenv().
load_dotenv()
# ASGI application object; routes are registered on it with @app.get(...) and
# it is handed to uvicorn.run() when this file is executed as a script.
app = FastAPI()
|
||
|
||
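# Make sure the following two environment variables are set in the environment
# where this code runs (or are provided via a .env file read by load_dotenv()):
#   export LIVEKIT_API_KEY="your_api_key"
#   export LIVEKIT_API_SECRET="your_api_secret"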
|
||
|
||
@app.get("/getToken")
def get_token(room: str = Query(..., min_length=1, description="房间名"),
              identity: str = Query(..., min_length=1, description="用户唯一标识"),
              name: str = Query("", description="用户名(可选)"),
              agent_name: str = Query("", description="要调度的Agent名称(可选)")):
    """
    Issue a LiveKit access token that lets a participant join a room.

    Query parameters:
        room: Name of the room the token grants access to. Must be non-empty.
        identity: Unique identifier of the participant. Must be non-empty.
        name: Optional display name stored in the token.
        agent_name: Optional name of an agent to dispatch explicitly. When
            given, a room configuration requesting that agent is embedded in
            the token.

    Returns:
        A JSON object with the signed JWT under "token", the echoed "room"
        and "identity", and "dispatched_agent" (the agent name, or "none"
        when no agent was requested).

        If LIVEKIT_API_KEY or LIVEKIT_API_SECRET is not set, an HTTP 500
        response whose JSON body is {"error": "..."}.
    """
    # Imported locally so this handler does not depend on edits to the
    # file-level import block.
    from fastapi.responses import JSONResponse

    api_key = os.getenv('LIVEKIT_API_KEY')
    api_secret = os.getenv('LIVEKIT_API_SECRET')

    if not (api_key and api_secret):
        # Server misconfiguration: keep the {"error": ...} body that existing
        # clients may already parse, but report it with a 5xx status instead
        # of a misleading 200 OK.
        return JSONResponse(
            status_code=500,
            content={"error": "未配置 LIVEKIT_API_KEY 或 LIVEKIT_API_SECRET 环境变量"},
        )

    # Base token: identity, display name and the grant to join `room`.
    # NOTE(review): min_length=1 on `room`/`identity` rejects empty values
    # with a 422 up front; an empty room or identity cannot yield a usable
    # join token (the LiveKit SDK is expected to refuse it when signing --
    # confirm against the installed livekit-api version).
    access_token = (
        api.AccessToken(api_key, api_secret)
        .with_identity(identity)
        .with_name(name)
        .with_grants(api.VideoGrants(room_join=True, room=room))
    )

    # When an agent name is supplied, embed an explicit agent dispatch in the
    # token's room configuration.
    if agent_name:
        dispatch = api.RoomAgentDispatch(
            agent_name=agent_name,
            metadata="通过Token显式调度",
        )
        access_token = access_token.with_room_config(
            api.RoomConfiguration(agents=[dispatch])
        )

    return {
        "token": access_token.to_jwt(),
        "room": room,
        "identity": identity,
        "dispatched_agent": agent_name or "none",
    }
|
||
|
||
if __name__ == "__main__":
    # Startup banner: where the server listens and a sample request URL.
    print(
        "Starting token server on http://localhost:8000",
        "Example usage: http://localhost:8000/getToken?room=my-room&identity=user123",
        sep="\n",
    )
    # host="0.0.0.0" listens on every network interface, not only the
    # localhost address shown in the banner.
    uvicorn.run(app, host="0.0.0.0", port=8000)
|