Automated Vector Database Enrichment in Wallaroo

This tutorial and the assets can be downloaded as part of the Wallaroo Tutorials repository.

RAG LLMs: Automated Vector Database Enrichment in Wallaroo

The following demonstrates using a BAAI (Beijing Academy of Artificial Intelligence) General Embedding (BGE) model to update embeddings in a vector database. This process uses Wallaroo features to:

  • Deploy the BGE model for embedding computation.
  • Create a Wallaroo Data Connector to connect to a vector database.
  • Use Wallaroo Inference Automations to batch process documents on a regular basis to update embeddings in the vector database.

These embeddings are stored in a vector database and used to generate context for RAG LLMs: text matching the query embedding is supplied to the RAG LLM to focus its responses and reduce hallucinations.
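The retrieval step this enables can be sketched with cosine similarity over embeddings. The following is a minimal numpy sketch; the vectors and query are illustrative toy values, not real BGE outputs:

```python
import numpy as np

def cosine_similarity(a, b):
    # cosine similarity between two embedding vectors
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# toy document embeddings and a query embedding (illustrative only)
doc_embeddings = {
    "train robbery plot": np.array([0.9, 0.1, 0.0]),
    "romance plot":       np.array([0.0, 0.2, 0.9]),
}
query = np.array([0.8, 0.2, 0.1])

# the vector database returns the closest document as context for the RAG LLM
best = max(doc_embeddings, key=lambda k: cosine_similarity(doc_embeddings[k], query))
print(best)  # train robbery plot
```

In production the same nearest-neighbor search is performed by the vector database's index rather than a linear scan.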

For this example, the MongoDB Atlas Vector Database is used as the representative vector database.

For access to these sample models and for a demonstration, contact your Wallaroo support representative.

Library Import

For this tutorial, import the pymongo package. This is used later to interact with the MongoDB Atlas Vector Database.

!pip install pymongo
Collecting pymongo
  Using cached pymongo-4.7.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (668 kB)
Collecting dnspython<3.0.0,>=1.16.0
  Using cached dnspython-2.6.1-py3-none-any.whl (307 kB)
Installing collected packages: dnspython, pymongo
Successfully installed dnspython-2.6.1 pymongo-4.7.3

Next we import the libraries used for this tutorial into the notebook.

import json
import os
import pymongo

import wallaroo
from wallaroo.pipeline import Pipeline
from wallaroo.deployment_config import DeploymentConfigBuilder
from wallaroo.framework import Framework
from wallaroo.engine_config import Architecture

import pyarrow as pa
import numpy as np
import pandas as pd

import zipfile
import time

Upload and Deploy BGE Model

This process shows how to upload the sample BGE model to Wallaroo and perform sample inferences through it that generate the embeddings.

Connect to the Wallaroo Instance

This step sets a connection to Wallaroo through the Wallaroo client. The Python library is included in the Wallaroo install and available through the Jupyter Hub interface provided with your Wallaroo environment.

This is accomplished using the wallaroo.Client() command, which provides a URL to grant the SDK permission to your specific Wallaroo environment. When displayed, enter the URL into a browser and confirm permissions. Store the connection into a variable that can be referenced later.

If logging into the Wallaroo instance through the internal JupyterHub service, use wl = wallaroo.Client(). For more information on Wallaroo Client settings, see the Client Connection guide.

wl = wallaroo.Client(request_timeout=480)
workspace = wl.get_workspace("embedding-computation", create_if_not_exist=True)
_ = wl.set_current_workspace(workspace)

Upload BGE Model

Before uploading the BGE model, we define the input and output schemas in Apache PyArrow Schema format.

input_schema = pa.schema([
    pa.field('text', pa.string())
])
output_schema = pa.schema([
    pa.field('embedding', 
        pa.list_(
            pa.float64(), list_size=768
        ),
    )
])

The BGE model is a Hugging Face model in a Wallaroo BYOP framework in the file byop_bge_base2.zip. We upload it to Wallaroo via the wallaroo.client.Client.upload_model method, providing the following parameters:

  • The name to assign to the BGE model.
  • The file path to upload the model.
  • The Framework set to wallaroo.framework.Framework.CUSTOM for our Hugging Face model encapsulated in the BYOP framework.
  • The input and output schemas.

For more information, see the Wallaroo Model Upload guide.

model = wl.upload_model('byop-bge-base-v2', 
    'byop_bge_base2.zip',
    framework=Framework.CUSTOM,
    input_schema=input_schema,
    output_schema=output_schema,
)
model
Waiting for model loading - this will take up to 10.0min.
Model is pending loading to a container runtime..
Model is attempting loading to a container runtime.......................................successful

Ready
Name | byop-bge-base-v2
Version | c5bb0af6-eb8a-403b-9ada-bd92d8bdcdc7
File Name | byop_bge_base2.zip
SHA | 4854c685c46258ecbbfe55cf4e516b9f4b578bd87cc14cd0a9be4775e91ced6d
Status | ready
Image Path | proxy.replicated.com/proxy/wallaroo/ghcr.io/wallaroolabs/mac-deploy:v2024.1.0-5208
Architecture | x86
Acceleration | none
Updated At | 2024-26-Jun 18:11:30

Deployment Configuration Settings

Before deploying the model, we set the deployment configuration, which determines what resources are allocated to the model, through the method wallaroo.deployment_config.DeploymentConfigBuilder. For this example, the following resources are allocated:

  • Wallaroo engine: 1 CPU, 2 Gi of memory.
  • BGE model container (sidekick): 4 CPUs, 3 Gi of memory.

For more details, see Model Deployment Configuration.

deployment_config = DeploymentConfigBuilder() \
    .cpus(1).memory('2Gi') \
    .sidekick_cpus(model, 4) \
    .sidekick_memory(model, '3Gi') \
    .build()

Deploy BGE Model

The BGE model is deployed through the following steps:

  1. Create a Wallaroo pipeline.
  2. Set the BGE model as a pipeline step.
  3. Deploy the pipeline with the wallaroo.pipeline.Pipeline.deploy(deployment_config) method. This deploys the pipeline and sets the deployment configuration.

Once deployed, the BGE model is ready for inference requests.

pipeline = wl.build_pipeline("byop-bge-pipe-base-v2")
pipeline.add_model_step(model)
pipeline.deploy(deployment_config=deployment_config)
Waiting for deployment - this will take up to 480s ....................................................................................................... ok
name | byop-bge-pipe-base-v2
created | 2024-06-26 18:12:21.032554+00:00
last_updated | 2024-06-26 18:12:21.085707+00:00
deployed | True
arch | x86
accel | none
tags |
versions | e9e8a7de-9aff-400c-a3b3-4bd280ee1923, a0e0949b-01f2-413a-8a09-8875c32f00e8
steps | byop-bge-base-v2
published | False

Sample Inference

Models deployed in Wallaroo accept either pandas DataFrames or Apache Arrow tables as inputs. For our inference example, we submit a pandas DataFrame, then see the BGE embedding results in the column out.embedding.

pipeline.infer(pd.DataFrame({"text": ["embed this sentence."]}))
  | time | in.text | out.embedding | anomaly.count
0 | 2024-06-26 20:09:20.979 | embed this sentence. | [0.02977638, -0.017274762, 0.048839126, -0.023... | 0

Vector Database Connection with Wallaroo

Wallaroo Data Connections define settings that are stored and used for connecting to different data sources. For full details, see Data Connections Management.

The following shows creating a Wallaroo Data Connection and saving the artifacts used to connect to the sample Mongo Atlas Vector database. The Data Connection is assigned to the workspace for use by other workspace team members.

connect = wl.create_connection("mongodb_atlas", 
                     "mongodb", 
                     details={"uri":"mongodb+srv://<username>:<password>@cluster0.lfnspv5.mongodb.net/?retryWrites=true&w=majority&appName=Cluster0"}
                     )

workspace.add_connection("mongodb_atlas")
Field | Value
Name | mongodb_atlas
Connection Type | mongodb
Details | *****
Created At | 2024-06-20T22:00:39.396887+00:00
Linked Workspaces | []

We test the connection by using the connection's details() method, which retrieves the stored credentials and other data, then store the movies collection in the collection variable for later use.

client = pymongo.MongoClient(connect.details()["uri"])
db = client.sample_mflix
collection = db.movies

try:
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
    print(e)
Pinged your deployment. You successfully connected to MongoDB!

Run Inference on Documents

With our collection of movie data, we scan for documents that have a plot field, then use those values to create a DataFrame. For this example, we limit our selection to 10 documents.

texts = []
for doc in collection.find({'plot':{"$exists": True}}).limit(10):
    texts.append(doc['plot'])
data = pd.DataFrame({'text': texts})
display(data)
  | text
0 | A group of bandits stage a brazen train hold-u...
1 | A greedy tycoon decides, on a whim, to corner ...
2 | Cartoon figures announce, via comic strip ball...
3 | A woman, with the aid of her police officer sw...
4 | The cartoonist, Winsor McCay, brings the Dinos...
5 | Original advertising for the film describes it...
6 | Young Pauline is left a lot of money when her ...
7 | An immigrant leaves his sweetheart in Italy to...
8 | At 10 years old, Owens becomes a ragged orphan...
9 | Christ takes on the form of a pacifist count t...

We submit an inference request with our data and get the new embedding values from each submission.

result = pipeline.infer(data, timeout=10000)
result
  | time | in.text | out.embedding | anomaly.count
0 | 2024-06-26 20:15:35.259 | A group of bandits stage a brazen train hold-u... | [-0.027950192, -0.054571882, -0.002392033, 0.0... | 0
1 | 2024-06-26 20:15:35.259 | A greedy tycoon decides, on a whim, to corner ... | [-0.071634166, -0.0073989113, -0.025931077, -0... | 0
2 | 2024-06-26 20:15:35.259 | Cartoon figures announce, via comic strip ball... | [-0.00864067, -0.020116393, 0.035886534, -0.00... | 0
3 | 2024-06-26 20:15:35.259 | A woman, with the aid of her police officer sw... | [-0.06523778, -0.09331782, -0.02681339, -0.007... | 0
4 | 2024-06-26 20:15:35.259 | The cartoonist, Winsor McCay, brings the Dinos... | [-0.07010095, -0.035720695, -0.03118671, 0.026... | 0
5 | 2024-06-26 20:15:35.259 | Original advertising for the film describes it... | [-0.02530954, 0.012174658, -0.016730076, -0.00... | 0
6 | 2024-06-26 20:15:35.259 | Young Pauline is left a lot of money when her ... | [-0.03885297, -0.018563386, 0.010222761, -0.00... | 0
7 | 2024-06-26 20:15:35.259 | An immigrant leaves his sweetheart in Italy to... | [-0.07279091, -0.050980825, 0.029236948, 0.016... | 0
8 | 2024-06-26 20:15:35.259 | At 10 years old, Owens becomes a ragged orphan... | [-0.10594661, 0.0073492057, -0.0008419599, -0.... | 0
9 | 2024-06-26 20:15:35.259 | Christ takes on the form of a pacifist count t... | [-0.04602558, -0.013552995, 0.01844381, -0.022... | 0

Inference Automation Embedding Generation

This step demonstrates using Wallaroo Inference Automation to generate the embeddings and store the results in our vector database either as a single task, or as a repeated task that scans the database and generates new embeddings on a regular schedule.

Inference Automation Script

The BGE Inference Automation contains the following items:

  • main.py: A Python script that uses the Wallaroo connection defined in the step Vector Database Connection with Wallaroo to retrieve Movie plot information, run the text through the BGE model, then upload the embeddings into the vector database.
  • requirements.txt: A list of the Python libraries required for the main.py script to execute, which includes pymongo==4.7.3.

Before executing the embedding script, the documents in the collection don't contain the plot_embedding_hf values. The following is an example of the database pre-embedding:

title | plot | plot_embedding_hf
The Great Train Robbery | A group of bandits stage a brazen train hold-u… |
A Corner in Wheat | A greedy tycoon decides, on a whim, to corner … |
Little Nemo | Cartoon figures announce, via comic strip ball… |
Traffic in Souls | A woman, with the aid of her police officer sw… |
Gertie the Dinosaur | The cartoonist, Winsor McCay, brings the Dinos… |
In the Land of the Head Hunters | Original advertising for the film describes it… |
The Perils of Pauline | Young Pauline is left a lot of money when her … |
The Italian | An immigrant leaves his sweetheart in Italy to… |
The Regeneration | At 10 years old, Owens becomes a ragged orphan… |
Civilization | Christ takes on the form of a pacifist count t… |

The following is a snippet from the main.py script showing the database connection, inferencing the text to create the embeddings, and uploading the embeddings into the vector database.

    for doc in collection.find({'plot': {"$exists": True}}):
        myquery = {'plot': doc['plot']}  # match the document by its plot text

        data = pd.DataFrame({'text': [doc['plot']]})  # wrap the plot text in a one-row pandas DataFrame
        embedding = pipeline.infer(data)['out.embedding'][0]  # infer on the plot text to create the embedding
        update = {'$set': {'plot_embedding_hf': embedding}}  # add the embedding to the document

        collection.update_one(myquery, update)  # update the vector database with the new embedding

The following is an example of database post-embedding:

title | plot | plot_embedding_hf
The Great Train Robbery | A group of bandits stage a brazen train hold-u… | [-0.027950192, -0.054571882, -0.002392033, 0.0…]
A Corner in Wheat | A greedy tycoon decides, on a whim, to corner … | [-0.071634166, -0.0073989113, -0.025931077, -0…]
Little Nemo | Cartoon figures announce, via comic strip ball… | [-0.00864067, -0.020116393, 0.035886534, -0.00…]
Traffic in Souls | A woman, with the aid of her police officer sw… | [-0.06523778, -0.09331782, -0.02681339, -0.007…]
Gertie the Dinosaur | The cartoonist, Winsor McCay, brings the Dinos… | [-0.07010095, -0.035720695, -0.03118671, 0.026…]
In the Land of the Head Hunters | Original advertising for the film describes it… | [-0.02530954, 0.012174658, -0.016730076, -0.00…]
The Perils of Pauline | Young Pauline is left a lot of money when her … | [-0.03885297, -0.018563386, 0.010222761, -0.00…]
The Italian | An immigrant leaves his sweetheart in Italy to… | [-0.07279091, -0.050980825, 0.029236948, 0.016…]
The Regeneration | At 10 years old, Owens becomes a ragged orphan… | [-0.10594661, 0.0073492057, -0.0008419599, -0.…]
Civilization | Christ takes on the form of a pacifist count t… | [-0.04602558, -0.013552995, 0.01844381, -0.022…]
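The per-document enrichment pattern in main.py can be sketched without a live database. In this sketch the embedder and collection are in-memory stand-ins, not Wallaroo or pymongo APIs:

```python
# In-memory stand-ins for the Mongo collection and the BGE pipeline (illustrative only)
docs = [
    {'title': 'The Great Train Robbery', 'plot': 'A group of bandits stage a brazen train hold-up.'},
    {'title': 'A Corner in Wheat', 'plot': 'A greedy tycoon decides, on a whim, to corner wheat.'},
    {'title': 'No Plot Entry'},  # documents without a plot are skipped
]

def fake_embed(text):
    # stand-in for pipeline.infer(...); returns a fixed-size placeholder vector
    return [float(len(text))] * 4

for doc in docs:
    if 'plot' in doc:  # mirrors the {'plot': {'$exists': True}} filter
        doc['plot_embedding_hf'] = fake_embed(doc['plot'])

enriched = [d['title'] for d in docs if 'plot_embedding_hf' in d]
print(enriched)  # ['The Great Train Robbery', 'A Corner in Wheat']
```

The real script replaces fake_embed with the deployed BGE pipeline and writes each embedding back with update_one.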

Upload the Inference Automation as Orchestration

To generate the inference automation, we zip all of the components:

  • The Python script that executes the task with the supplied arguments.
  • A requirements.txt file to set what Python libraries to use.

files_to_include = [
    'orchestration/main.py', # execution script
    'orchestration/requirements.txt' # required if you have additional package dependencies beyond what's included in wallaroo environment
]

zipfile_name = 'orchestration.zip'

with zipfile.ZipFile(zipfile_name, mode='w') as archive:
    for filename in files_to_include:
        archive.write(filename, filename.split('/')[-1])
        
# verify the contents
with zipfile.ZipFile(zipfile_name, mode='r') as archive:
    archive.printdir()
File Name                                             Modified             Size
main.py                                        2024-06-22 20:40:32         1725
requirements.txt                               2024-06-22 20:10:28           42

With the inference automation stored in our .zip file, we upload it. Once its status is ready, we can generate new tasks from the inference automation.

Next we can upload our orchestration:

orchestration = wl.upload_orchestration(name='automated-embedding-generation', path='orchestration.zip')

while orchestration.status() != 'ready':
    print(orchestration.status())
    time.sleep(15)
pending_packaging
packaging
packaging
packaging

BGE Embedding Run Once Task

With our inference automation uploaded and prepared, we can create two types of tasks:

  • Run Once Task: Parameters are passed to the inference automation, which generates a single execution of the script.
  • Run Scheduled Task: The parameters and a schedule are passed; a new task is generated from the inference automation every time the cron schedule is met.

For more details, see Inference Automation: Task Methods.

The following generates a Run Once task, specifying the Wallaroo Connection, pipeline, and workspace. The write_db parameter indicates whether to write the new embeddings to the database or only retrieve the data and run it through the embedding generation process.

task = orchestration.run_once(name = 'sample embedding generation',
                              json_args = {'connection_name': 'mongodb_atlas',
                                           'pipeline_name': 'byop-bge-pipe-base-v2',
                                           'workspace_name': 'embedding-computation',
                                           'write_db': True
                                          })
task
Field | Value
ID | cd125107-7663-40a7-a1e2-b41025288559
Name | sample embedding generation
Last Run Status | failure
Type | Temporary Run
Active | True
Schedule | -
Created At | 2024-22-Jun 20:37:49
Updated At | 2024-22-Jun 20:37:55

The following generates the Run Scheduled version of the same task, set to execute daily at 12:01 AM. This allows new embeddings to be generated as the database is updated.

task = orchestration.run_scheduled(name='sample embedding generation',
                                   schedule='1 0 * * *',
                                   json_args={'connection_name': 'mongodb_atlas',
                                              'pipeline_name': 'byop-bge-pipe-base-v2',
                                              'workspace_name': 'embedding-computation',
                                              'write_db': True
                                              })
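The cron expression '1 0 * * *' can be read field by field using standard five-field cron semantics, as this minimal sketch shows:

```python
# break the standard five-field cron expression into named fields
schedule = '1 0 * * *'
fields = dict(zip(
    ['minute', 'hour', 'day_of_month', 'month', 'day_of_week'],
    schedule.split(),
))
print(fields)
# {'minute': '1', 'hour': '0', 'day_of_month': '*', 'month': '*', 'day_of_week': '*'}
# minute 1 of hour 0, every day -> 12:01 AM daily
```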

With the tutorial complete, we undeploy the model and return the resources back to the cluster.

pipeline.undeploy()
