jfeng1115's picture
init commit
58d33f0
"""Wrapper around the Milvus vector database."""
from __future__ import annotations
import uuid
from typing import Any, Iterable, List, Optional, Tuple
import numpy as np
from langchain.docstore.document import Document
from langchain.embeddings.base import Embeddings
from langchain.vectorstores.base import VectorStore
from langchain.vectorstores.utils import maximal_marginal_relevance
class Milvus(VectorStore):
"""Wrapper around the Milvus vector database."""
def __init__(
self,
embedding_function: Embeddings,
connection_args: dict,
collection_name: str,
text_field: str,
):
"""Initialize wrapper around the milvus vector database.
In order to use this you need to have `pymilvus` installed and a
running Milvus instance.
See the following documentation for how to run a Milvus instance:
https://milvus.io/docs/install_standalone-docker.md
Args:
embedding_function (Embeddings): Function used to embed the text
connection_args (dict): Arguments for pymilvus connections.connect()
collection_name (str): The name of the collection to search.
text_field (str): The field in Milvus schema where the
original text is stored.
"""
try:
from pymilvus import Collection, DataType, connections
except ImportError:
raise ValueError(
"Could not import pymilvus python package. "
"Please install it with `pip install pymilvus`."
)
# Connecting to Milvus instance
if not connections.has_connection("default"):
connections.connect(**connection_args)
self.embedding_func = embedding_function
self.collection_name = collection_name
self.text_field = text_field
self.auto_id = False
self.primary_field = None
self.vector_field = None
self.fields = []
self.col = Collection(self.collection_name)
schema = self.col.schema
# Grabbing the fields for the existing collection.
for x in schema.fields:
self.fields.append(x.name)
if x.auto_id:
self.fields.remove(x.name)
if x.is_primary:
self.primary_field = x.name
if x.dtype == DataType.FLOAT_VECTOR or x.dtype == DataType.BINARY_VECTOR:
self.vector_field = x.name
# Default search params when one is not provided.
self.index_params = {
"IVF_FLAT": {"params": {"nprobe": 10}},
"IVF_SQ8": {"params": {"nprobe": 10}},
"IVF_PQ": {"params": {"nprobe": 10}},
"HNSW": {"params": {"ef": 10}},
"RHNSW_FLAT": {"params": {"ef": 10}},
"RHNSW_SQ": {"params": {"ef": 10}},
"RHNSW_PQ": {"params": {"ef": 10}},
"IVF_HNSW": {"params": {"nprobe": 10, "ef": 10}},
"ANNOY": {"params": {"search_k": 10}},
}
def add_texts(
self,
texts: Iterable[str],
metadatas: Optional[List[dict]] = None,
partition_name: Optional[str] = None,
timeout: Optional[int] = None,
**kwargs: Any,
) -> List[str]:
"""Insert text data into Milvus.
When using add_texts() it is assumed that a collecton has already
been made and indexed. If metadata is included, it is assumed that
it is ordered correctly to match the schema provided to the Collection
and that the embedding vector is the first schema field.
Args:
texts (Iterable[str]): The text being embedded and inserted.
metadatas (Optional[List[dict]], optional): The metadata that
corresponds to each insert. Defaults to None.
partition_name (str, optional): The partition of the collection
to insert data into. Defaults to None.
timeout: specified timeout.
Returns:
List[str]: The resulting keys for each inserted element.
"""
insert_dict: Any = {self.text_field: list(texts)}
try:
insert_dict[self.vector_field] = self.embedding_func.embed_documents(
list(texts)
)
except NotImplementedError:
insert_dict[self.vector_field] = [
self.embedding_func.embed_query(x) for x in texts
]
# Collect the metadata into the insert dict.
if len(self.fields) > 2 and metadatas is not None:
for d in metadatas:
for key, value in d.items():
if key in self.fields:
insert_dict.setdefault(key, []).append(value)
# Convert dict to list of lists for insertion
insert_list = [insert_dict[x] for x in self.fields]
# Insert into the collection.
res = self.col.insert(
insert_list, partition_name=partition_name, timeout=timeout
)
# Flush to make sure newly inserted is immediately searchable.
self.col.flush()
return res.primary_keys
def _worker_search(
self,
query: str,
k: int = 4,
param: Optional[dict] = None,
expr: Optional[str] = None,
partition_names: Optional[List[str]] = None,
round_decimal: int = -1,
timeout: Optional[int] = None,
**kwargs: Any,
) -> Tuple[List[float], List[Tuple[Document, Any, Any]]]:
# Load the collection into memory for searching.
self.col.load()
# Decide to use default params if not passed in.
if param is None:
index_type = self.col.indexes[0].params["index_type"]
param = self.index_params[index_type]
# Embed the query text.
data = [self.embedding_func.embed_query(query)]
# Determine result metadata fields.
output_fields = self.fields[:]
output_fields.remove(self.vector_field)
# Perform the search.
res = self.col.search(
data,
self.vector_field,
param,
k,
expr=expr,
output_fields=output_fields,
partition_names=partition_names,
round_decimal=round_decimal,
timeout=timeout,
**kwargs,
)
# Organize results.
ret = []
for result in res[0]:
meta = {x: result.entity.get(x) for x in output_fields}
ret.append(
(
Document(page_content=meta.pop(self.text_field), metadata=meta),
result.distance,
result.id,
)
)
return data[0], ret
def similarity_search_with_score(
self,
query: str,
k: int = 4,
param: Optional[dict] = None,
expr: Optional[str] = None,
partition_names: Optional[List[str]] = None,
round_decimal: int = -1,
timeout: Optional[int] = None,
**kwargs: Any,
) -> List[Tuple[Document, float]]:
"""Perform a search on a query string and return results.
Args:
query (str): The text being searched.
k (int, optional): The amount of results ot return. Defaults to 4.
param (dict, optional): The search params for the specified index.
Defaults to None.
expr (str, optional): Filtering expression. Defaults to None.
partition_names (List[str], optional): Partitions to search through.
Defaults to None.
round_decimal (int, optional): Round the resulting distance. Defaults
to -1.
timeout (int, optional): Amount to wait before timeout error. Defaults
to None.
kwargs: Collection.search() keyword arguments.
Returns:
List[float], List[Tuple[Document, any, any]]: search_embedding,
(Document, distance, primary_field) results.
"""
_, result = self._worker_search(
query, k, param, expr, partition_names, round_decimal, timeout, **kwargs
)
return [(x, y) for x, y, _ in result]
def max_marginal_relevance_search(
self,
query: str,
k: int = 4,
fetch_k: int = 20,
param: Optional[dict] = None,
expr: Optional[str] = None,
partition_names: Optional[List[str]] = None,
round_decimal: int = -1,
timeout: Optional[int] = None,
**kwargs: Any,
) -> List[Document]:
"""Perform a search and return results that are reordered by MMR.
Args:
query (str): The text being searched.
k (int, optional): How many results to give. Defaults to 4.
fetch_k (int, optional): Total results to select k from.
Defaults to 20.
param (dict, optional): The search params for the specified index.
Defaults to None.
expr (str, optional): Filtering expression. Defaults to None.
partition_names (List[str], optional): What partitions to search.
Defaults to None.
round_decimal (int, optional): Round the resulting distance. Defaults
to -1.
timeout (int, optional): Amount to wait before timeout error. Defaults
to None.
Returns:
List[Document]: Document results for search.
"""
data, res = self._worker_search(
query,
fetch_k,
param,
expr,
partition_names,
round_decimal,
timeout,
**kwargs,
)
# Extract result IDs.
ids = [x for _, _, x in res]
# Get the raw vectors from Milvus.
vectors = self.col.query(
expr=f"{self.primary_field} in {ids}",
output_fields=[self.primary_field, self.vector_field],
)
# Reorganize the results from query to match result order.
vectors = {x[self.primary_field]: x[self.vector_field] for x in vectors}
search_embedding = data
ordered_result_embeddings = [vectors[x] for x in ids]
# Get the new order of results.
new_ordering = maximal_marginal_relevance(
np.array(search_embedding), ordered_result_embeddings, k=k
)
# Reorder the values and return.
ret = []
for x in new_ordering:
if x == -1:
break
else:
ret.append(res[x][0])
return ret
def similarity_search(
self,
query: str,
k: int = 4,
param: Optional[dict] = None,
expr: Optional[str] = None,
partition_names: Optional[List[str]] = None,
round_decimal: int = -1,
timeout: Optional[int] = None,
**kwargs: Any,
) -> List[Document]:
"""Perform a similarity search against the query string.
Args:
query (str): The text to search.
k (int, optional): How many results to return. Defaults to 4.
param (dict, optional): The search params for the index type.
Defaults to None.
expr (str, optional): Filtering expression. Defaults to None.
partition_names (List[str], optional): What partitions to search.
Defaults to None.
round_decimal (int, optional): What decimal point to round to.
Defaults to -1.
timeout (int, optional): How long to wait before timeout error.
Defaults to None.
Returns:
List[Document]: Document results for search.
"""
_, docs_and_scores = self._worker_search(
query, k, param, expr, partition_names, round_decimal, timeout, **kwargs
)
return [doc for doc, _, _ in docs_and_scores]
@classmethod
def from_texts(
cls,
texts: List[str],
embedding: Embeddings,
metadatas: Optional[List[dict]] = None,
**kwargs: Any,
) -> Milvus:
"""Create a Milvus collection, indexes it with HNSW, and insert data.
Args:
texts (List[str]): Text to insert.
embedding (Embeddings): Embedding function to use.
metadatas (Optional[List[dict]], optional): Dict metatadata.
Defaults to None.
Returns:
VectorStore: The Milvus vector store.
"""
try:
from pymilvus import (
Collection,
CollectionSchema,
DataType,
FieldSchema,
connections,
)
from pymilvus.orm.types import infer_dtype_bydata
except ImportError:
raise ValueError(
"Could not import pymilvus python package. "
"Please install it with `pip install pymilvus`."
)
# Connect to Milvus instance
if not connections.has_connection("default"):
connections.connect(**kwargs.get("connection_args", {"port": 19530}))
# Determine embedding dim
embeddings = embedding.embed_query(texts[0])
dim = len(embeddings)
# Generate unique names
primary_field = "c" + str(uuid.uuid4().hex)
vector_field = "c" + str(uuid.uuid4().hex)
text_field = "c" + str(uuid.uuid4().hex)
collection_name = "c" + str(uuid.uuid4().hex)
fields = []
# Determine metadata schema
if metadatas:
# Check if all metadata keys line up
key = metadatas[0].keys()
for x in metadatas:
if key != x.keys():
raise ValueError(
"Mismatched metadata. "
"Make sure all metadata has the same keys and datatype."
)
# Create FieldSchema for each entry in singular metadata.
for key, value in metadatas[0].items():
# Infer the corresponding datatype of the metadata
dtype = infer_dtype_bydata(value)
if dtype == DataType.UNKNOWN:
raise ValueError(f"Unrecognized datatype for {key}.")
elif dtype == DataType.VARCHAR:
# Find out max length text based metadata
max_length = 0
for subvalues in metadatas:
max_length = max(max_length, len(subvalues[key]))
fields.append(
FieldSchema(key, DataType.VARCHAR, max_length=max_length + 1)
)
else:
fields.append(FieldSchema(key, dtype))
# Find out max length of texts
max_length = 0
for y in texts:
max_length = max(max_length, len(y))
# Create the text field
fields.append(
FieldSchema(text_field, DataType.VARCHAR, max_length=max_length + 1)
)
# Create the primary key field
fields.append(
FieldSchema(primary_field, DataType.INT64, is_primary=True, auto_id=True)
)
# Create the vector field
fields.append(FieldSchema(vector_field, DataType.FLOAT_VECTOR, dim=dim))
# Create the schema for the collection
schema = CollectionSchema(fields)
# Create the collection
collection = Collection(collection_name, schema)
# Index parameters for the collection
index = {
"index_type": "HNSW",
"metric_type": "L2",
"params": {"M": 8, "efConstruction": 64},
}
# Create the index
collection.create_index(vector_field, index)
# Create the VectorStore
milvus = cls(
embedding,
kwargs.get("connection_args", {"port": 19530}),
collection_name,
text_field,
)
# Add the texts.
milvus.add_texts(texts, metadatas)
return milvus