Introduction
Vector streaming is a new feature in EmbedAnything designed to streamline large-scale document embedding. By chunking and embedding asynchronously with Rust's concurrency, it reduces memory usage and speeds up the process. Today, I'll show how to integrate it with Weaviate's vector database for seamless image embedding and search.
In my previous article, Supercharge Your Embedding Pipeline with EmbedAnything, I talked about the idea behind EmbedAnything and how it makes it easier to create embeddings from multiple modalities. In this article, I want to introduce a new feature of EmbedAnything called vector streaming and see how it works with Weaviate’s vector database.
Overview
- Vector streaming in EmbedAnything optimizes large-scale document embedding by chunking and embedding asynchronously with Rust's concurrency.
- It solves the memory and efficiency problems of traditional embedding methods by processing chunks in parallel.
- Integration with Weaviate allows seamless embedding and search in a vector database.
- Implementing vector streaming involves creating a database adapter, initiating an embedding model, and embedding the data.
- This approach offers a more efficient, scalable, and flexible solution for large-scale document embedding.
What's the problem?
First, let us examine the current problem with creating embeddings, especially for large document collections. Current embedding frameworks use a two-step process: chunking and embedding. First, text is extracted from all files and chunks/nodes are created. These chunks are then fed into an embedding model with a specific batch size to compute the embeddings. While this happens, all the chunks and embeddings remain in system memory.
This is not a problem when there are few files and the embedding dimensions are small. But it becomes one when there are many files and you are working with large models or, worse, multi-vector embeddings: a lot of RAM is then needed to hold every chunk and embedding. Also, if the two steps run synchronously, the embedding model sits idle while the chunks are being created, even though chunking is not a computationally intensive operation. It would be more efficient to pass chunks to the embedding model as soon as they are created.
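To put rough numbers on this, the sketch below compares the memory needed to hold every embedding at once with the memory needed for a single buffer. It assumes float32 vectors (4 bytes per dimension) and ignores chunk text, metadata, and framework overhead, so real usage is higher; the chunk count, dimension, and the `embedding_bytes` helper are illustrative, not measurements from EmbedAnything.

```python
# Rough memory estimate for embeddings held in RAM.
# Assumption: each vector is stored as float32 (4 bytes per dimension);
# chunk text, metadata and framework overhead are ignored.
BYTES_PER_FLOAT32 = 4


def embedding_bytes(num_vectors: int, dim: int, vectors_per_item: int = 1) -> int:
    """Bytes needed to hold num_vectors embeddings of size dim.

    vectors_per_item > 1 models multi-vector embeddings, where each chunk
    produces several vectors (for example, one per token).
    """
    return num_vectors * vectors_per_item * dim * BYTES_PER_FLOAT32


# Illustrative numbers: one million chunks with 1024-dimensional vectors.
all_at_once = embedding_bytes(1_000_000, 1024)
one_buffer = embedding_bytes(100, 1024)  # only one buffer of 100 kept in memory

print(f"all in memory: {all_at_once / 1e9:.2f} GB")
print(f"one buffer:    {one_buffer / 1e6:.2f} MB")
```

With multi-vector embeddings the first figure is multiplied again by the number of vectors per chunk, which is why holding everything in memory stops scaling quickly.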
Our solution to the problem
The solution is to run chunking and embedding as asynchronous tasks. Using Rust's concurrency patterns and thread safety, we can spawn threads to handle them. This is done with Rust's MPSC (multi-producer, single-consumer) module, which passes messages between threads. It creates a stream of chunks that is passed to the embedding thread, which collects them in a buffer. Once the buffer is full, the thread embeds the chunks and sends the embeddings back to the main thread, which then sends them to the vector database. This way, no time is wasted waiting on a single operation and there are no bottlenecks. In addition, the system keeps only the chunks and embeddings currently in the buffer, clearing them from memory once they are moved to the vector database.
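EmbedAnything implements this pattern in Rust; the following is only a Python analogy of the same idea, not the library's code. `queue.Queue` plays the role of the MPSC channel, one thread produces chunks, another buffers and embeds them, and the main thread forwards each embedded batch to a sink. The names `chunk_documents`, `fake_embed`, `embed_stream`, and `stream_to_db` are hypothetical, and the final flush of a partially filled buffer is an assumption of this sketch.

```python
import queue
import threading
from typing import Callable, Iterable, List, Tuple

_DONE = object()  # sentinel marking the end of a stream


def chunk_documents(docs: Iterable[str], out: queue.Queue) -> None:
    """Producer: split each document into chunks and send them as soon as they exist."""
    for doc in docs:
        for chunk in doc.split(". "):  # naive sentence chunking, for illustration only
            out.put(chunk)
    out.put(_DONE)


def fake_embed(chunks: List[str]) -> List[List[float]]:
    """Hypothetical stand-in for a real (batched) embedding model."""
    return [[float(len(c)), float(c.count(" "))] for c in chunks]


def embed_stream(inp: queue.Queue, out: queue.Queue, buffer_size: int) -> None:
    """Consumer: buffer chunks, embed a full buffer, pass it on, then clear it."""
    buffer: List[str] = []
    while True:
        item = inp.get()
        if item is _DONE:
            break
        buffer.append(item)
        if len(buffer) == buffer_size:
            out.put(list(zip(buffer, fake_embed(buffer))))
            buffer = []  # drop references so the memory can be reclaimed
    if buffer:  # assumption: flush the last, partially filled buffer
        out.put(list(zip(buffer, fake_embed(buffer))))
    out.put(_DONE)


def stream_to_db(
    docs: Iterable[str],
    upsert: Callable[[List[Tuple[str, List[float]]]], None],
    buffer_size: int = 3,
) -> None:
    chunk_q: queue.Queue = queue.Queue()
    result_q: queue.Queue = queue.Queue()
    producer = threading.Thread(target=chunk_documents, args=(docs, chunk_q))
    consumer = threading.Thread(target=embed_stream, args=(chunk_q, result_q, buffer_size))
    producer.start()
    consumer.start()
    # Main thread: forward each embedded batch to the database as it arrives.
    while (batch := result_q.get()) is not _DONE:
        upsert(batch)
    producer.join()
    consumer.join()


batches = []
stream_to_db(["A cat sat. It purred. It slept", "Dogs bark. Monkeys climb"], batches.append)
print([len(b) for b in batches])
```

Chunking and embedding overlap in time, and at most one buffer of embeddings is alive before it is handed to `upsert`. With several producer threads, each would need its own end signal before the consumer stops.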
Example use case with EmbedAnything
Now, let's see this feature in action.
With EmbedAnything, streaming vectors from a file directory to the vector database is a simple three-step process.
- Create an adapter for your vector database: This is a wrapper around the database's functions that lets you create an index, convert metadata from EmbedAnything's format to the format the database requires, and insert the embeddings into the index. Adapters for the supported databases have already been created and are available here.
- Initiate an embedding model of your choice: You can choose from different local models or even cloud models. You can also set the configuration, such as the chunk size and the buffer size, which is the number of embeddings streamed at once. Ideally, the buffer should be as large as possible, but its size is limited by system RAM.
- Call the embed function from EmbedAnything: Simply pass the path of the directory to embed, the embedding model, the adapter, and the configuration.
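To make the adapter's role concrete without a running database, here is a hypothetical in-memory adapter with the same four methods as the Weaviate adapter in Step 1 (`create_index`, `convert`, `upsert`, `delete_index`). It does not subclass EmbedAnything's `Adapter`, and `FakeEmbedData` is a minimal stand-in for `EmbedData` (embedding, text, metadata), so the sketch runs on its own; treat it as an illustration of the contract, not a drop-in adapter.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class FakeEmbedData:
    """Hypothetical stand-in for EmbedAnything's EmbedData."""
    embedding: List[float]
    text: Optional[str] = None
    metadata: Dict[str, str] = field(default_factory=dict)


class InMemoryAdapter:
    """Hypothetical adapter that stores records in a dict instead of a real database."""

    def __init__(self):
        self.indexes: Dict[str, List[dict]] = {}
        self.index_name: Optional[str] = None

    def create_index(self, index_name: str) -> List[dict]:
        self.index_name = index_name
        self.indexes[index_name] = []
        return self.indexes[index_name]

    def convert(self, embeddings: List[FakeEmbedData]) -> List[dict]:
        # Map the embedding library's records to the shape this "database" stores.
        return [
            {"vector": e.embedding, "properties": {**e.metadata, "text": e.text}}
            for e in embeddings
        ]

    def upsert(self, embeddings: List[FakeEmbedData]) -> None:
        # During streaming, this would be called once per buffer of embeddings.
        self.indexes[self.index_name].extend(self.convert(embeddings))

    def delete_index(self, index_name: str) -> None:
        self.indexes.pop(index_name, None)


adapter = InMemoryAdapter()
adapter.create_index("Test_index")
adapter.upsert([FakeEmbedData([0.1, 0.2], "a cat", {"file_name": "cat.jpg"})])
adapter.upsert([FakeEmbedData([0.3, 0.4], "a dog", {"file_name": "dog.jpg"})])
print(len(adapter.indexes["Test_index"]))
```

A stub like this can also be handy for testing a pipeline locally before pointing it at a real cluster.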
In this example, we will embed a directory of images and send the embeddings to the vector database.
Step 1: Create the adapter
In EmbedAnything, adapters are built externally so that the library stays lightweight and you can choose which database you want to work with. Here is a simple adapter for Weaviate:
```python
from typing import List

import weaviate
import weaviate.classes as wvc

from embed_anything import EmbedData
from embed_anything.vectordb import Adapter


class WeaviateAdapter(Adapter):
    def __init__(self, api_key, url):
        super().__init__(api_key)
        # Connect to a Weaviate Cloud cluster with API-key authentication
        self.client = weaviate.connect_to_weaviate_cloud(
            cluster_url=url, auth_credentials=wvc.init.Auth.api_key(api_key)
        )
        if self.client.is_ready():
            print("Weaviate is ready")

    def create_index(self, index_name: str):
        # No server-side vectorizer: the vectors come from EmbedAnything
        self.index_name = index_name
        self.collection = self.client.collections.create(
            index_name, vectorizer_config=wvc.config.Configure.Vectorizer.none()
        )
        return self.collection

    def convert(self, embeddings: List[EmbedData]):
        # Turn each EmbedData into a Weaviate DataObject (properties + vector)
        data = []
        for embedding in embeddings:
            properties = embedding.metadata
            properties["text"] = embedding.text
            data.append(
                wvc.data.DataObject(properties=properties, vector=embedding.embedding)
            )
        return data

    def upsert(self, embeddings):
        # Called with each buffer of embeddings while streaming
        data = self.convert(embeddings)
        self.client.collections.get(self.index_name).data.insert_many(data)

    def delete_index(self, index_name: str):
        self.client.collections.delete(index_name)


# Start the client and index
URL = "your-weaviate-url"
API_KEY = "your-weaviate-api-key"
weaviate_adapter = WeaviateAdapter(API_KEY, URL)

index_name = "Test_index"
# Recreate the collection so each run starts from an empty index
if index_name in weaviate_adapter.client.collections.list_all():
    weaviate_adapter.delete_index(index_name)
weaviate_adapter.create_index(index_name)
```
Step 2: Create the embedding model
Here, since we are embedding images, we can use the CLIP model.
```python
import embed_anything
from embed_anything import WhichModel

# Load CLIP (ViT-B/16) from Hugging Face; it embeds images and text into one space
model = embed_anything.EmbeddingModel.from_pretrained_hf(
    WhichModel.Clip,
    model_id="openai/clip-vit-base-patch16",
)
```
Step 3: Embed the directory
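```python
data = embed_anything.embed_image_directory(
    "image_directory",  # path to the folder of images to embed
    embeder=model,  # "embeder" is the library's own parameter spelling
    adapter=weaviate_adapter,
    # Number of embeddings collected before each upsert to Weaviate
    config=embed_anything.ImageEmbedConfig(buffer_size=100),
)
```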
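With `buffer_size=100`, the embeddings reach the adapter in batches rather than all at once. Assuming full buffers are flushed as they fill and any remainder is flushed once at the end (an assumption about the flushing rule, not something stated for this API), the sizes of the `upsert` calls follow directly. The hypothetical helper below just does that arithmetic.

```python
from typing import List


def upsert_batches(num_items: int, buffer_size: int) -> List[int]:
    """Sizes of the batches sent to adapter.upsert, assuming full buffers are
    flushed as they fill and a final partial buffer is flushed at the end."""
    if buffer_size <= 0:
        raise ValueError("buffer_size must be positive")
    full, rest = divmod(num_items, buffer_size)
    return [buffer_size] * full + ([rest] if rest else [])


print(upsert_batches(250, 100))
```

A larger buffer means fewer, bigger inserts; a smaller one keeps fewer embeddings in RAM at any moment.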
Step 4: Embed the query
```python
query_vector = embed_anything.embed_query(["image of a cat"], embeder=model)[0].embedding
```
Step 5: Query the vector database
```python
response = weaviate_adapter.collection.query.near_vector(
    near_vector=query_vector,
    limit=2,
    return_metadata=wvc.query.MetadataQuery(certainty=True),
)
```
Check the response:
Output
Using the CLIP model, we vectorized the entire directory of images of cats, dogs, and monkeys. With the simple query "image of a cat," we were able to find the cat images among all the files.
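This works because CLIP embeds text and images into a shared vector space, so a text query vector can be compared directly with image vectors. The sketch below is a brute-force stand-in for what the `near_vector` query does on a collection using cosine distance (Weaviate's default): rank stored vectors by cosine distance and report certainty, which Weaviate defines for cosine as `1 - distance / 2`. The `near_vector` helper, the 3-dimensional vectors, and the file names are made up for illustration; real CLIP ViT-B/16 vectors have 512 dimensions, and Weaviate uses an approximate (HNSW) index rather than a full scan.

```python
import math
from typing import Dict, List, Tuple


def cosine_distance(a: List[float], b: List[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / norm


def near_vector(
    query: List[float], stored: Dict[str, List[float]], limit: int
) -> List[Tuple[str, float]]:
    """Hypothetical brute-force search; returns (name, certainty) pairs, best first."""
    ranked = sorted(stored.items(), key=lambda kv: cosine_distance(query, kv[1]))
    # Weaviate's certainty for cosine distance: 1 - distance / 2, in [0, 1].
    return [(name, 1.0 - cosine_distance(query, vec) / 2) for name, vec in ranked[:limit]]


# Made-up vectors standing in for CLIP image embeddings.
images = {
    "cat_1.jpg": [0.9, 0.1, 0.0],
    "cat_2.jpg": [0.8, 0.2, 0.1],
    "dog_1.jpg": [0.1, 0.9, 0.1],
    "monkey_1.jpg": [0.0, 0.2, 0.9],
}
query_vector = [1.0, 0.1, 0.0]  # stand-in for the embedding of "image of a cat"

for name, certainty in near_vector(query_vector, images, limit=2):
    print(name, round(certainty, 3))
```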
Check out the notebook with the code here.
Conclusion
I think vector streaming is one of the features that will allow many engineers to opt for a more streamlined solution without technical debt. Instead of using bulky cloud frameworks, you can use a lightweight streaming option.
Check out the GitHub repository here: EmbedAnything Repository.
Frequently Asked Questions
Q1. What is vector streaming in EmbedAnything?
Answer: Vector streaming is a feature that optimizes large-scale document embedding by using Rust's concurrency for asynchronous chunking and embedding, reducing memory usage and speeding up the process.
Q2. What problem does vector streaming solve?
Answer: It addresses the high memory usage and inefficiency of traditional embedding methods by processing chunks asynchronously, which reduces bottlenecks and optimizes resource usage.
Q3. How does vector streaming work with Weaviate?
Answer: It uses an adapter to connect EmbedAnything to the Weaviate vector database, allowing seamless data embedding and querying.
Q4. What are the steps for using vector streaming?
Answer: These are the steps:
1. Create a database adapter.
2. Initiate an embedding model.
3. Embed the directory.
4. Query the vector database.
Q5. What are the benefits of vector streaming?
Answer: It offers greater efficiency, lower memory usage, scalability, and flexibility compared to traditional embedding methods.