mirror of
https://github.com/BoardWare-Genius/jarvis-models.git
synced 2025-12-13 16:53:24 +00:00
87 lines
3.7 KiB
Python
87 lines
3.7 KiB
Python
from typing import Union
|
|
|
|
from fastapi import FastAPI, Request, status
|
|
from fastapi.responses import JSONResponse, Response, StreamingResponse
|
|
|
|
from src.blackbox.blackbox_factory import BlackboxFactory
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from injector import Injector
|
|
|
|
# Application object; the route decorators in this module register on it.
app = FastAPI()

# CORS policy: every origin, method and header is accepted, and
# credentialed requests are allowed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Obtain the BlackboxFactory instance from the dependency-injection container.
injector = Injector()
blackbox_factory = injector.get(BlackboxFactory)
|
|
|
|
@app.post("/")
@app.get("/")
async def blackbox(blackbox_name: Union[str, None] = None, request: Request = None):
    """Dispatch a request to the blackbox selected by ``blackbox_name``.

    Args:
        blackbox_name: Name passed to ``blackbox_factory.get_blackbox``.
        request: The incoming request, forwarded unchanged to the blackbox.

    Returns:
        A 400 JSON error when ``blackbox_name`` is missing or empty, or when
        the factory raises ``ValueError``; otherwise whatever the selected
        blackbox's ``fast_api_handler`` returns.
    """
    if not blackbox_name:
        # Response objects are not awaitable: return them directly.
        return JSONResponse(content={"error": "blackbox_name is required"}, status_code=status.HTTP_400_BAD_REQUEST)

    try:
        box = blackbox_factory.get_blackbox(blackbox_name)
    except ValueError:
        return JSONResponse(content={"error": "value error"}, status_code=status.HTTP_400_BAD_REQUEST)

    return await box.fast_api_handler(request)
|
|
|
|
# @app.get("/audio/{filename}")
# async def serve_audio(filename: str):
#     import os
#     # Make sure the file exists
#     if os.path.exists(filename):
#         with open(filename, 'rb') as f:
#             audio_data = f.read()
#         if filename.endswith(".mp3"):
#             # For MP3 files, use an inline disposition so the browser plays them
#             return Response(content=audio_data, media_type="audio/mpeg", headers={"Content-Disposition": f"inline; filename={filename}"})
#         elif filename.endswith(".wav"):
#             # For WAV files, use an inline disposition so the browser plays them
#             return Response(content=audio_data, media_type="audio/wav", headers={"Content-Disposition": f"inline; filename={filename}"})
#     else:
#         return JSONResponse(content={"error": f"{filename} not found"}, status_code=status.HTTP_404_NOT_FOUND)
|
|
|
|
@app.get("/audio/audio_files/{filename}")
async def serve_audio(filename: str):
    """Serve an MP3 or WAV file from the ``audio_files`` directory.

    Args:
        filename: File name taken from the URL path.

    Returns:
        The file content as an inline ``audio/mpeg`` or ``audio/wav``
        response. A JSON error is returned instead with status 404 when the
        name does not designate a regular file inside ``audio_files``, 415
        when the extension is neither ``.mp3`` nor ``.wav``, and 500 when
        reading the file fails.
    """
    import os

    import aiofiles

    file_path = os.path.join("audio_files", filename)

    # The name comes straight from the URL: accept only a plain file name so
    # the lookup cannot leave ``audio_files``, and require a regular file so
    # directories are reported as missing instead of failing on open.
    is_plain_name = filename not in (".", "..") and os.path.basename(filename) == filename
    if not is_plain_name or not os.path.isfile(file_path):
        return JSONResponse(content={"error": f"{file_path} not found"}, status_code=status.HTTP_404_NOT_FOUND)

    # Pick the media type from the extension before touching the file, so
    # unsupported files are never read into memory.
    if file_path.endswith(".mp3"):
        media_type = "audio/mpeg"
    elif file_path.endswith(".wav"):
        media_type = "audio/wav"
    else:
        return JSONResponse(content={"error": "Unsupported audio format"}, status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE)

    try:
        # Read the file asynchronously with aiofiles.
        async with aiofiles.open(file_path, 'rb') as f:
            audio_data = await f.read()
    except Exception as e:
        # Task cancellation is deliberately not intercepted here: it must
        # propagate to the server rather than be turned into a response.
        return JSONResponse(content={"error": str(e)}, status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)

    return Response(content=audio_data, media_type=media_type, headers={"Content-Disposition": f"inline; filename={file_path}"})
|
|
|
|
|
|
# @app.get("/audio/{filename}")
# async def serve_audio(filename: str):
#     import os
#     file_path = f"audio/{filename}"
#     # Check whether the file exists
#     if os.path.exists(file_path):
#         return StreamingResponse(open(file_path, "rb"), media_type="audio/mpeg" if filename.endswith(".mp3") else "audio/wav")
#     else:
#         return JSONResponse(content={"error": f"{filename} not found"}, status_code=404)