add chroma_query.py

This commit is contained in:
ACBBZ
2024-04-29 02:55:51 +00:00
parent 0b71c60082
commit 99c9cf83a4

68
src/blackbox/chroma_query.py Executable file
View File

@ -0,0 +1,68 @@
from typing import Any, Coroutine
from fastapi import Request, Response, status
from fastapi.responses import JSONResponse
from .blackbox import Blackbox
import requests
import json
from langchain.text_splitter import CharacterTextSplitter
from langchain_community.document_loaders import TextLoader, DirectoryLoader
import chromadb
from chromadb.utils import embedding_functions
# from langchain_community.embeddings.sentence_transformer import (
# SentenceTransformerEmbeddings, HuggingFaceEmbeddings
# )
class ChromaQuery(Blackbox):
def __init__(self, *args, **kwargs) -> None:
# config = read_yaml(args[0])
# load embedding model
# self.embedding_model = embedding_functions.DefaultEmbeddingFunction()
self.embedding_model = embedding_functions.SentenceTransformerEmbeddingFunction(model_name="/model/Weight/BAAI/bge-small-en-v1.5", device = "cuda")
# load chroma db
self.persistent_client = chromadb.PersistentClient(path="./data/test1")
def __call__(self, *args, **kwargs):
return self.processing(*args, **kwargs)
def valid(self, *args, **kwargs) -> bool:
data = args[0]
return isinstance(data, list)
def processing(self, question, collection_id, context: list) -> str:
# load or create collection
collection = persistent_client.get_or_create_collection(collection_id, embedding_function=embedding_model)
# query it
results = collection.query(
query_texts=[question],
n_results=3,
)
response = results["documents"] + results["ids"]
return results
async def fast_api_handler(self, request: Request) -> Response:
try:
data = await request.json()
except:
return JSONResponse(content={"error": "json parse error"}, status_code=status.HTTP_400_BAD_REQUEST)
user_question = data.get("question")
user_context = data.get("context")
user_collection_id = data.get("collection_id")
if user_question is None:
return JSONResponse(content={"error": "question is required"}, status_code=status.HTTP_400_BAD_REQUEST)
if user_collection_id is None or user_collection_id.isspace():
user_collection_id = "123"
return JSONResponse(
content={"response": self.processing(user_question, user_collection_id, user_context)},
status_code=status.HTTP_200_OK)