import ast
import base64
import copy
import io
from typing import Optional, List

import requests
from fastapi import Request, Response, status
from fastapi.responses import JSONResponse
from injector import singleton, inject
from PIL import Image
from lmdeploy.serve.openai.api_client import APIClient

from .blackbox import Blackbox
from ..log.logging_time import logging_time
# from .chroma_query import ChromaQuery
from ..configuration import VLMConf


def is_base64(value) -> bool:
    """Return True when *value* is a non-empty, strictly valid base64 string.

    The string is decoded with validation enabled and re-encoded; it only
    counts as base64 when the round trip reproduces the input exactly.
    Anything that is not a ``str`` (raw ``bytes`` included) yields False, so
    callers can handle raw image bytes in a separate branch.
    """
    if not isinstance(value, str) or not value:
        return False
    try:
        # validate=True rejects characters outside the base64 alphabet
        # (e.g. the ':' and '.' of a URL) instead of silently dropping them.
        decoded = base64.b64decode(value, validate=True)
        return base64.b64encode(decoded) == value.encode()
    except ValueError:
        # binascii.Error and UnicodeEncodeError are both ValueError subclasses.
        return False


@singleton
class VLMS(Blackbox):
    """Blackbox that sends a prompt (optionally with one image) to an LMDeploy server."""

    @inject
    def __init__(self, vlm_config: VLMConf):
        """
        Initialization for endpoint url and generation config.
            - temperature (float): to modulate the next token probability
            - top_p (float): If set to float < 1, only the smallest set of most
                probable tokens with probabilities that add up to top_p or higher
                are kept for generation.
            - max_tokens (int | None): output token nums. Default to None.
            - repetition_penalty (float): The parameter for repetition penalty.
                1.0 means no penalty
            - stop (str | List[str] | None): To stop generating further
                tokens. Only accept stop words that's encoded to one token index.

        Additional arguments supported by LMDeploy:
            - top_k (int): The number of the highest probability vocabulary
                tokens to keep for top-k-filtering
            - ignore_eos (bool): indicator for ignoring eos
            - skip_special_tokens (bool): Whether or not to remove special tokens
                in the decoding. Default to be True.
        """
        self.url = vlm_config.url

        self.temperature: float = 0.7
        self.top_p: float = 1
        self.max_tokens: int | None = 512
        self.repetition_penalty: float = 1
        self.stop: str | List[str] | None = ['<|endoftext|>', '<|im_end|>']

        self.top_k: int | None = None
        self.ignore_eos: bool = False
        self.skip_special_tokens: bool = True

        # Base generation arguments; also the whitelist of keys a caller may override.
        self.settings: dict = {
            "temperature": self.temperature,
            "top_p": self.top_p,
            "max_tokens": self.max_tokens,
            "repetition_penalty": self.repetition_penalty,
            "stop": self.stop,
            "top_k": self.top_k,
            "ignore_eos": self.ignore_eos,
            "skip_special_tokens": self.skip_special_tokens,
        }

    def __call__(self, *args, **kwargs):
        """Forward the call to :meth:`processing`."""
        return self.processing(*args, **kwargs)

    def valid(self, *args, **kwargs) -> bool:
        """Return True when the first positional argument is a list."""
        return bool(args) and isinstance(args[0], list)

    def _merge_settings(self, settings: Optional[dict]) -> dict:
        """Overlay the supported keys of *settings* on a copy of the defaults.

        Unsupported keys are reported and skipped. The caller's dict is never
        mutated. When *settings* is empty or None an empty dict is returned,
        i.e. no generation arguments are forwarded at all.
        """
        if not settings:
            return {}
        merged = copy.deepcopy(self.settings)
        for key, value in settings.items():
            if key not in self.settings:
                print("Warning: '{}' is not a support argument and ignore this argment, check the arguments {}".format(key, self.settings.keys()))
                continue
            merged[key] = value
        return merged

    def _encode_image(self, images) -> Optional[str]:
        """Return *images* as a base64 string, or None when no image is given.

        Accepts a base64 string (returned unchanged), raw bytes (encoded), or
        any other string, which is treated as a URL and downloaded.
        """
        if not images:
            return None
        if is_base64(images):
            return images
        if isinstance(images, bytes):
            return str(base64.b64encode(images), 'utf-8')
        # Anything else is fetched over HTTP; the timeout keeps a dead host
        # from blocking the request forever.
        res = requests.get(images, timeout=30)
        return str(base64.b64encode(res.content), 'utf-8')

    def processing(self, prompt: str, images: str | bytes, settings: dict,
                   model_name: Optional[str] = None,
                   user_context: List[dict] = None) -> tuple[str, List[dict]]:
        """
        Args:
            prompt: a string query to the model.
            images: a base64 string of image data, raw image bytes, or an image URL.
            settings: a dictionary set by user with fields stated in __init__
            model_name: accepted for interface compatibility; it is replaced by
                the first model the server reports as available.
            user_context: a list of history conversation, should be a list of openai format.

        Return:
            (response, history): the response string and the conversation
            history (input context + this user turn + the assistant reply).
        """
        settings = self._merge_settings(settings)
        images_data = self._encode_image(images)
        # A missing history used to crash on `None + [...]`; treat it as empty.
        history = list(user_context) if user_context else []

        api_client = APIClient(self.url)
        model_name = api_client.available_models[0]

        # Reformat input into openai format to request.
        content = [{'type': 'text', 'text': prompt}]
        if images_data:
            content.append({
                'type': 'image_url',
                'image_url': {'url': f"data:image/jpeg;base64,{images_data}"},
            })
        messages = history + [{'role': 'user', 'content': content}]

        responses = ''
        # Non-streaming request: each yielded item carries a complete message.
        for item in api_client.chat_completions_v1(model=model_name,
                                                   messages=messages,
                                                   **settings):
            chunk = item["choices"][0]["message"]['content']
            print(chunk)
            responses += chunk

        user_context = messages + [{'role': 'assistant', 'content': responses}]
        return responses, user_context

    def _into_openai_format(self, context: List[list]) -> List[dict]:
        """
        Convert the data into openai format.
        context: a list of list, each element have the form [user_input, response],
        and the first one of list 'user_input' is also tuple with [,text]; [image,text] or [[imgs],text]
        #TODO: add support for multiple images
        """
        user_context = []
        for item in context:
            user_content = item[0]
            if not isinstance(user_content, list):
                # Plain text turn.
                user_content = [{'type': 'text', 'text': user_content}]
            elif len(user_content) == 1:
                # Single-element list: text only.
                user_content = [{'type': 'text', 'text': user_content[0]}]
            else:
                # [image, text]: the image is either inline base64 or passed through as a URL.
                image = user_content[0]
                if is_base64(image):
                    url = f"data:image/jpeg;base64,{image}"
                else:
                    url = image
                user_content = [
                    {'type': 'image_url', 'image_url': {'url': url}},
                    {'type': 'text', 'text': user_content[1]},
                ]
            user_context.append({'role': 'user', 'content': user_content})
            user_context.append({'role': 'assistant', 'content': item[1]})
        return user_context

    async def fast_api_handler(self, request: Request) -> Response:
        """Handle a JSON or form-data request and return the model response.

        Reads the fields ``prompt`` (required), ``img_data``, ``settings``,
        ``context`` and ``model_name``. Malformed input is answered with
        HTTP 400; on success the body is ``{"response": <text>}``.
        """
        ## TODO: add support for multiple images and support image in form-data format
        json_request = True
        try:
            # Match by prefix so "application/json; charset=utf-8" is still JSON.
            content_type = request.headers['content-type'].lower()
            if content_type.startswith('application/json'):
                data = await request.json()
            else:
                data = await request.form()
                json_request = False
        except Exception:
            return JSONResponse(content={"error": "json parse error"}, status_code=status.HTTP_400_BAD_REQUEST)

        if json_request and not isinstance(data, dict):
            return JSONResponse(content={"error": "json parse error"}, status_code=status.HTTP_400_BAD_REQUEST)

        model_name = data.get("model_name")
        prompt = data.get("prompt")
        settings = data.get('settings')
        context = data.get("context")

        if not context:
            user_context = []
        elif isinstance(context, list) and isinstance(context[0], list):
            user_context = self._into_openai_format(context)
        elif isinstance(context, list) and isinstance(context[0], dict):
            user_context = context
        else:
            return JSONResponse(content={"error": "context format error, should be in format of list or Openai_format"}, status_code=status.HTTP_400_BAD_REQUEST)

        if json_request:
            img_data = data.get("img_data")
        else:
            upload = data.get("img_data")
            if upload is None:
                img_data = None
            elif hasattr(upload, "read"):
                # Uploaded file: read its raw bytes.
                img_data = await upload.read()
            else:
                # Plain form field (e.g. a base64 string or a URL).
                img_data = upload

        # Form requests deliver settings as a string literal, JSON requests
        # as a dict already; only the former needs parsing.
        if isinstance(settings, str) and settings:
            try:
                settings = ast.literal_eval(settings)
            except (ValueError, SyntaxError):
                return JSONResponse(content={"error": "settings format error, should be a dictionary"}, status_code=status.HTTP_400_BAD_REQUEST)
        if settings and not isinstance(settings, dict):
            return JSONResponse(content={"error": "settings format error, should be a dictionary"}, status_code=status.HTTP_400_BAD_REQUEST)

        if prompt is None:
            return JSONResponse(content={'error': "Question is required"}, status_code=status.HTTP_400_BAD_REQUEST)

        if model_name is None or model_name.isspace():
            model_name = "Qwen-VL-Chat"

        response, history = self.processing(prompt, img_data, settings, model_name, user_context=user_context)
        return JSONResponse(content={"response": response}, status_code=status.HTTP_200_OK)