kenken999's picture
fda
0f43f8a
raw
history blame
4.99 kB
import importlib
import logging
import re
from typing import Dict, List
import openai
import weaviate
from weaviate.embedded import EmbeddedOptions
# default opt out of chromadb telemetry.
from chromadb.config import Settings
from transformers import AutoTokenizer, AutoModel
import torch
import numpy
# モデル名を指定
model_name = "sentence-transformers/all-MiniLM-L6-v2"
# トークナイザーとモデルをロード
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name)
client = chromadb.Client(Settings(anonymized_telemetry=False))
def can_import(module_name):
try:
importlib.import_module(module_name)
return True
except ImportError:
return False
assert can_import("weaviate"), (
"\033[91m\033[1m"
+ "Weaviate storage requires package weaviate-client.\nInstall: pip install -r extensions/requirements.txt"
)
def create_client(
weaviate_url: str, weaviate_api_key: str, weaviate_use_embedded: bool
):
if weaviate_use_embedded:
client = weaviate.Client(embedded_options=EmbeddedOptions())
else:
auth_config = (
weaviate.auth.AuthApiKey(api_key=weaviate_api_key)
if weaviate_api_key
else None
)
client = weaviate.Client(weaviate_url, auth_client_secret=auth_config)
return client
class WeaviateResultsStorage:
schema = {
"properties": [
{"name": "result_id", "dataType": ["string"]},
{"name": "task", "dataType": ["string"]},
{"name": "result", "dataType": ["text"]},
]
}
def __init__(
self,
openai_api_key: str,
weaviate_url: str,
weaviate_api_key: str,
weaviate_use_embedded: bool,
llm_model: str,
llama_model_path: str,
results_store_name: str,
objective: str,
):
openai.api_key = openai_api_key
self.client = create_client(
weaviate_url, weaviate_api_key, weaviate_use_embedded
)
self.index_name = None
self.create_schema(results_store_name)
self.llm_model = llm_model
self.llama_model_path = llama_model_path
def create_schema(self, results_store_name: str):
valid_class_name = re.compile(r"^[A-Z][a-zA-Z0-9_]*$")
if not re.match(valid_class_name, results_store_name):
raise ValueError(
f"Invalid index name: {results_store_name}. "
"Index names must start with a capital letter and "
"contain only alphanumeric characters and underscores."
)
self.schema["class"] = results_store_name
if self.client.schema.contains(self.schema):
logging.info(
f"Index named {results_store_name} already exists. Reusing it."
)
else:
logging.info(f"Creating index named {results_store_name}")
self.client.schema.create_class(self.schema)
self.index_name = results_store_name
def add(self, task: Dict, result: Dict, result_id: int, vector: List):
enriched_result = {"data": result}
vector = self.get_embedding(enriched_result["data"])
with self.client.batch as batch:
data_object = {
"result_id": result_id,
"task": task["task_name"],
"result": result,
}
batch.add_data_object(
data_object=data_object, class_name=self.index_name, vector=vector
)
def query(self, query: str, top_results_num: int) -> List[dict]:
query_embedding = self.get_embedding(query)
results = (
self.client.query.get(self.index_name, ["task"])
.with_hybrid(query=query, alpha=0.5, vector=query_embedding)
.with_limit(top_results_num)
.do()
)
return self._extract_tasks(results)
def _extract_tasks(self, data):
task_data = data.get("data", {}).get("Get", {}).get(self.index_name, [])
return [item["task"] for item in task_data]
# Get embedding for the text
def get_embedding(self, text: str) -> list:
text = text.replace("\n", " ")
inputs = tokenizer(text, return_tensors="pt")
outputs = model(**inputs)
# [CLS]トークンの出力を取得
embeddings = outputs.last_hidden_state[:,0,:].squeeze().detach().cpu().numpy().tolist()
return embeddings
if self.llm_model.startswith("llama"):
from llama_cpp import Llama
llm_embed = Llama(
model_path=self.llama_model_path,
n_ctx=2048,
n_threads=4,
embedding=True,
use_mlock=True,
)
return llm_embed.embed(text)
return openai.Embedding.create(input=[text], model="text-embedding-ada-002")[
"data"
][0]["embedding"]