from io import BytesIO
from typing import Any, Coroutine
from fastapi import Request, Response, status
from fastapi.responses import JSONResponse
from funasr import AutoModel
from funasr.utils.postprocess_utils import rich_transcription_postprocess
from .blackbox import Blackbox
from injector import singleton, inject
import tempfile
import os
from ..configuration import SenseVoiceConf
from ..log.logging_time import logging_time
import logging

logger = logging.getLogger(__name__)


@singleton
class ASR(Blackbox):
    """Speech-to-text blackbox backed by the SenseVoice model (via funasr).

    In ``local`` mode the model is loaded in-process with ``AutoModel``;
    in any other mode only the remote service URL is recorded and no model
    is loaded (remote inference is not implemented in this class).
    """

    # Configuration values copied from SenseVoiceConf at init time.
    mode: str
    url: str
    speed: int
    device: str
    language: str
    speaker: str

    @logging_time(logger=logger)
    def model_init(self, sensevoice_config: SenseVoiceConf) -> None:
        """Copy settings from *sensevoice_config* and, in 'local' mode, load the model.

        Side effect: sets CUDA_VISIBLE_DEVICES for the whole process.
        """
        # NOTE(review): hard-coded paths — consider moving these into SenseVoiceConf.
        model_dir = "/home/gpu/Workspace/Models/SenseVoice/SenseVoiceSmall"
        self.speed = sensevoice_config.speed
        self.device = sensevoice_config.device  # was assigned twice in the original
        self.language = sensevoice_config.language
        self.speaker = sensevoice_config.speaker
        self.url = ''
        self.mode = sensevoice_config.mode
        self.asr = None
        self.speaker_ids = None
        os.environ['CUDA_VISIBLE_DEVICES'] = str(sensevoice_config.device)
        if self.mode == 'local':
            self.asr = AutoModel(
                model=model_dir,
                trust_remote_code=True,
                remote_code="/home/gpu/Workspace/SenseVoice/model.py",
                vad_model="fsmn-vad",
                vad_kwargs={"max_single_segment_time": 30000},
                device="cuda:0",
            )
        else:
            self.url = sensevoice_config.url
        # Use the module logger with lazy %-formatting (original used the
        # root logger via logging.info with string concatenation).
        logger.info('#### Initializing SenseVoiceASR Service in cuda:%s mode...',
                    sensevoice_config.device)

    @inject
    def __init__(self, sensevoice_config: SenseVoiceConf) -> None:
        """Injected constructor: delegates all setup to :meth:`model_init`."""
        self.model_init(sensevoice_config)

    def __call__(self, *args, **kwargs) -> Coroutine[Any, Any, Any]:
        """Make the instance callable; returns the (unawaited) processing coroutine."""
        return self.processing(*args, **kwargs)

    async def processing(self, *args, **kwargs):
        """Transcribe raw WAV bytes (``args[0]``) and return post-processed text.

        Returns:
            The transcription string, or None when the transcription is empty.

        Raises:
            ValueError: if no local model is loaded (non-'local' mode) — the
                FastAPI handler already maps this to an HTTP 400 response.
        """
        data = args[0]
        if self.asr is None:
            # Remote mode never loads a model; fail with a clear, handled
            # error instead of an AttributeError on None.
            raise ValueError("ASR model is not initialized (mode=%r)" % self.mode)
        # Write the audio to a temporary .wav file for funasr to consume.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_audio_file:
            temp_audio_file.write(data)
            temp_audio_path = temp_audio_file.name
        try:
            res = self.asr.generate(
                input=temp_audio_path,
                cache={},
                language="auto",  # or "zh", "en", "yue", "ja", "ko", "nospeech"
                use_itn=True,
                batch_size_s=60,
                merge_vad=True,  # merge_length_s=15,
            )
            results = rich_transcription_postprocess(res[0]["text"])
        finally:
            # Always delete the temp file, even when inference fails
            # (the original leaked it on error).
            os.remove(temp_audio_path)
        return results or None

    def valid(self, data: Any) -> bool:
        """Return True when *data* is raw bytes — the only input type accepted."""
        return isinstance(data, bytes)

    async def fast_api_handler(self, request: Request) -> Response:
        """FastAPI endpoint: read the 'audio' form field and return its transcription.

        Responds 400 when the field is missing or processing raises ValueError,
        otherwise 200 with ``{"text": <transcription>}``.
        """
        data = (await request.form()).get("audio")
        if data is None:
            return JSONResponse(content={"error": "data is required"},
                                status_code=status.HTTP_400_BAD_REQUEST)
        d = await data.read()
        try:
            txt = await self.processing(d)
        except ValueError as e:
            return JSONResponse(content={"error": str(e)},
                                status_code=status.HTTP_400_BAD_REQUEST)
        return JSONResponse(content={"text": txt}, status_code=status.HTTP_200_OK)