|
import pinecone |
|
from colorama import Fore, Style |
|
|
|
from autogpt.logger import logger |
|
from autogpt.memory.base import MemoryProviderSingleton, get_ada_embedding |
|
|
|
|
|
class PineconeMemory(MemoryProviderSingleton):
    """Memory provider that keeps text and its embedding in a Pinecone index.

    Every stored item is embedded with ``get_ada_embedding`` and upserted under
    a sequential string id; the original text is kept in the ``raw_text``
    metadata field so it can be returned by queries.
    """

    def __init__(self, cfg) -> None:
        """
        Initialise the Pinecone client and open (or create) the ``auto-gpt`` index.

        :param cfg: Configuration object; its ``pinecone_api_key`` and
            ``pinecone_region`` attributes are read.
        :raises SystemExit: With exit code 1 when ``pinecone.whoami()`` raises.
        """
        pinecone_api_key = cfg.pinecone_api_key
        pinecone_region = cfg.pinecone_region
        pinecone.init(api_key=pinecone_api_key, environment=pinecone_region)

        # Presumably the length of the vectors produced by get_ada_embedding
        # -- TODO confirm against the embedding model in use.
        dimension = 1536
        metric = "cosine"
        pod_type = "p1"
        table_name = "auto-gpt"

        # Id of the next vector to insert. It restarts at 0 on every
        # construction.
        # NOTE(review): if the index already holds vectors, new upserts will
        # presumably overwrite the ones with matching ids -- confirm intended.
        self.vec_num = 0

        # Probe the connection up front so that bad credentials produce a
        # readable message instead of a failure on first use.
        try:
            pinecone.whoami()
        except Exception as e:
            logger.typewriter_log(
                "FAILED TO CONNECT TO PINECONE",
                Fore.RED,
                Style.BRIGHT + str(e) + Style.RESET_ALL,
            )
            logger.double_check(
                "Please ensure you have setup and configured Pinecone properly for use. "
                + f"You can check out {Fore.CYAN + Style.BRIGHT}https://github.com/Torantulino/Auto-GPT#-pinecone-api-key-setup{Style.RESET_ALL} to ensure you've set up everything correctly."
            )
            # Raise SystemExit directly rather than calling the ``exit``
            # builtin: that helper is injected by the ``site`` module for
            # interactive sessions and is not guaranteed to exist.
            raise SystemExit(1)

        if table_name not in pinecone.list_indexes():
            pinecone.create_index(
                table_name, dimension=dimension, metric=metric, pod_type=pod_type
            )
        self.index = pinecone.Index(table_name)

    def add(self, data) -> str:
        """
        Embed ``data`` and upsert it into the index under the next id.

        :param data: The data to store; it is embedded and also saved verbatim
            as the ``raw_text`` metadata of the vector.
        :return: A message naming the id used and echoing the data.
        """
        vector = get_ada_embedding(data)
        # Ids are the stringified running counter.
        self.index.upsert([(str(self.vec_num), vector, {"raw_text": data})])
        _text = f"Inserting data into memory at index: {self.vec_num}:\n data: {data}"
        self.vec_num += 1
        return _text

    def get(self, data) -> list:
        """
        Return the single stored text closest to ``data``.

        :param data: The data to compare to.
        :return: The result of ``get_relevant(data, 1)`` (a list).
        """
        return self.get_relevant(data, 1)

    def clear(self) -> str:
        """
        Request deletion of every vector in the index.

        ``self.vec_num`` is left untouched by this call.

        :return: The string ``"Obliviated"``.
        """
        # NOTE(review): the keyword is spelled ``deleteAll`` here; confirm it
        # matches the installed pinecone client's ``Index.delete`` signature.
        self.index.delete(deleteAll=True)
        return "Obliviated"

    def get_relevant(self, data, num_relevant: int = 5) -> list:
        """
        Returns all the data in the memory that is relevant to the given data.

        :param data: The data to compare to.
        :param num_relevant: The number of relevant data to return. Defaults to 5
        :return: The ``raw_text`` of each match as a string, ordered by
            ascending match score (the highest-scoring match comes last).
        """
        query_embedding = get_ada_embedding(data)
        results = self.index.query(
            query_embedding, top_k=num_relevant, include_metadata=True
        )
        # NOTE(review): ascending sort puts the best-scoring match at the end
        # of the list -- confirm this is the order callers expect.
        sorted_results = sorted(results.matches, key=lambda x: x.score)
        return [str(item["metadata"]["raw_text"]) for item in sorted_results]

    def get_stats(self):
        """
        Return the statistics of the underlying index.

        :return: Whatever ``self.index.describe_index_stats()`` returns.
        """
        return self.index.describe_index_stats()
|
|