"""Minimal FastAPI server that issues LiveKit access tokens."""

import os

import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, Query
from fastapi.responses import JSONResponse
from livekit import api

# Load variables from a local .env file (if present) before handling requests.
load_dotenv()

app = FastAPI()

# Make sure the two environment variables below are set in the environment
# this server runs in:
# export LIVEKIT_API_KEY="your_api_key"
# export LIVEKIT_API_SECRET="your_api_secret"


@app.get("/getToken")
def get_token(
    room: str = Query(..., description="房间名"),
    identity: str = Query(..., description="用户唯一标识"),
    name: str = Query("", description="用户名(可选)"),
    agent_name: str = Query("", description="要调度的Agent名称(可选)"),
):
    """Issue a LiveKit access token for joining a room.

    Args:
        room: Name of the room the token grants access to (required).
        identity: Unique identifier of the user (required).
        name: Display name of the user (optional, defaults to "").
        agent_name: Name of an agent to dispatch explicitly through the
            token's room configuration (optional, defaults to "").

    Returns:
        On success, a dict with the keys ``token`` (the JWT), ``room``,
        ``identity`` and ``dispatched_agent`` (``"none"`` when no agent was
        requested). When ``LIVEKIT_API_KEY`` or ``LIVEKIT_API_SECRET`` is
        missing, a JSON response with an ``error`` key and HTTP status 500.
    """
    api_key = os.getenv("LIVEKIT_API_KEY")
    api_secret = os.getenv("LIVEKIT_API_SECRET")

    if not api_key or not api_secret:
        # Missing credentials are a server-side misconfiguration: keep the
        # same JSON body but report it with a 500 instead of a 200.
        return JSONResponse(
            status_code=500,
            content={"error": "未配置 LIVEKIT_API_KEY 或 LIVEKIT_API_SECRET 环境变量"},
        )

    # Base token: identity, display name and permission to join the room.
    access_token = (
        api.AccessToken(api_key, api_secret)
        .with_identity(identity)
        .with_name(name)
        .with_grants(api.VideoGrants(room_join=True, room=room))
    )

    # When an agent name is supplied, add an explicit dispatch entry to the
    # room configuration carried by the token.
    if agent_name:
        access_token = access_token.with_room_config(
            api.RoomConfiguration(
                agents=[
                    api.RoomAgentDispatch(
                        agent_name=agent_name,
                        metadata="通过Token显式调度",
                    )
                ],
            )
        )

    return {
        "token": access_token.to_jwt(),
        "room": room,
        "identity": identity,
        "dispatched_agent": agent_name or "none",
    }


if __name__ == "__main__":
    print("Starting token server on http://localhost:8000")
    print("Example usage: http://localhost:8000/getToken?room=my-room&identity=user123")
    uvicorn.run(app, host="0.0.0.0", port=8000)