"""FastAPI entry point: dispatches requests to named blackboxes and serves audio files."""

import asyncio
import os
from typing import Union

from fastapi import FastAPI, Request, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, Response, StreamingResponse
from injector import Injector

from src.blackbox.blackbox_factory import BlackboxFactory

app = FastAPI()

# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive -- confirm this is intended outside of development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

injector = Injector()
blackbox_factory = injector.get(BlackboxFactory)

# Directory (relative to the working directory) that serve_audio reads from.
_AUDIO_DIR = "audio_files"

# File-name suffix -> media type for the audio formats serve_audio supports.
_AUDIO_MEDIA_TYPES = {
    ".mp3": "audio/mpeg",
    ".wav": "audio/wav",
}


@app.post("/")
@app.get("/")
async def blackbox(blackbox_name: Union[str, None] = None, request: Request = None):
    """Dispatch the request to the blackbox selected by ``blackbox_name``.

    Args:
        blackbox_name: Name used to look up the blackbox in the factory.
        request: The incoming request, forwarded unchanged to the blackbox.

    Returns:
        Whatever the blackbox's ``fast_api_handler`` returns, or a 400 JSON
        error when ``blackbox_name`` is missing/empty or the factory raises
        ``ValueError`` for it.
    """
    if not blackbox_name:
        # Response objects are returned directly; they are not awaitable.
        return JSONResponse(
            content={"error": "blackbox_name is required"},
            status_code=status.HTTP_400_BAD_REQUEST,
        )

    try:
        box = blackbox_factory.get_blackbox(blackbox_name)
    except ValueError:
        return JSONResponse(
            content={"error": "value error"},
            status_code=status.HTTP_400_BAD_REQUEST,
        )

    return await box.fast_api_handler(request)


# Earlier draft of serve_audio, kept for reference:
# @app.get("/audio/{filename}")
# async def serve_audio(filename: str):
#     import os
#     # Make sure the file exists
#     if os.path.exists(filename):
#         with open(filename, 'rb') as f:
#             audio_data = f.read()
#         if filename.endswith(".mp3"):
#             # For MP3 files, use inline so the browser plays them
#             return Response(content=audio_data, media_type="audio/mpeg", headers={"Content-Disposition": f"inline; filename={filename}"})
#         elif filename.endswith(".wav"):
#             # For WAV files, use inline so the browser plays them
#             return Response(content=audio_data, media_type="audio/wav", headers={"Content-Disposition": f"inline; filename={filename}"})
#     else:
#         return JSONResponse(content={"error": f"{filename} not found"}, status_code=status.HTTP_404_NOT_FOUND)


@app.get("/audio/audio_files/{filename}")
async def serve_audio(filename: str):
    """Return the named ``.mp3`` or ``.wav`` file from the audio directory.

    Args:
        filename: Bare file name (no directory components) inside
            ``audio_files``.

    Returns:
        An inline audio ``Response`` on success; otherwise a JSON error with
        status 404 (not a regular file / invalid name), 415 (unsupported
        suffix), 400 (request cancelled while reading) or 500 (any other
        failure while reading).
    """
    import aiofiles

    file_path = os.path.join(_AUDIO_DIR, filename)

    # Only plain file names are served: anything carrying a directory
    # component could point outside the audio directory.
    is_plain_name = os.path.basename(filename) == filename and "\\" not in filename

    # isfile (rather than exists) so directories such as ".." are reported as
    # missing instead of failing later when opened.
    if not (is_plain_name and os.path.isfile(file_path)):
        return JSONResponse(
            content={"error": f"{file_path} not found"},
            status_code=status.HTTP_404_NOT_FOUND,
        )

    # Resolve the media type before reading so unsupported files are never
    # loaded into memory.
    media_type = next(
        (
            candidate
            for suffix, candidate in _AUDIO_MEDIA_TYPES.items()
            if file_path.endswith(suffix)
        ),
        None,
    )
    if media_type is None:
        return JSONResponse(
            content={"error": "Unsupported audio format"},
            status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE,
        )

    try:
        # Read the file asynchronously with aiofiles.
        async with aiofiles.open(file_path, "rb") as f:
            audio_data = await f.read()
    except asyncio.CancelledError:
        # The task was cancelled while reading.
        # NOTE(review): asyncio normally expects CancelledError to be
        # re-raised; the original behaviour of answering with 400 is kept.
        return JSONResponse(
            content={"error": "Request was cancelled"},
            status_code=status.HTTP_400_BAD_REQUEST,
        )
    except Exception as e:
        # Any other failure is reported to the client as a 500.
        return JSONResponse(
            content={"error": str(e)},
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )

    return Response(
        content=audio_data,
        media_type=media_type,
        headers={"Content-Disposition": f"inline; filename={file_path}"},
    )


# Alternative streaming draft of serve_audio, kept for reference:
# @app.get("/audio/{filename}")
# async def serve_audio(filename: str):
#     import os
#     file_path = f"audio/{filename}"
#     # Check whether the file exists
#     if os.path.exists(file_path):
#         return StreamingResponse(open(file_path, "rb"), media_type="audio/mpeg" if filename.endswith(".mp3") else "audio/wav")
#     else:
#         return JSONResponse(content={"error": f"{filename} not found"}, status_code=404)