Regular Batch Inference

The following tutorials are available from the Wallaroo Tutorials Repository.

Stage 4: Regular Batch Inference

In Stage 3: Deploy the Model in Wallaroo, the housing model created and tested in Stage 2: Training Process Automation Setup was uploaded to a Wallaroo instance and added to the pipeline housing-pipe in the workspace housepricing. This pipeline can be deployed at any point and time and used with new inferences.

For the purposes of this demo, let’s say that every month we find the newly entered and still-unsold houses and predict their sale price.

The predictions are entered into a staging table, for further inspection before being joined to the primary housing data table.

We show this as a notebook, but this can also be scripted and scheduled, using CRON or some other process.

Resources

The following resources are used as part of this tutorial:

  • data
    • data/seattle_housing_col_description.txt: Describes the columns used as part data analysis.
    • data/seattle_housing.csv: Sample data of the Seattle, Washington housing market between 2014 and 2015.
  • code
    • simdb.py: A simulated database to demonstrate sending and receiving queries.
  • models
    • housing_model_xgb.onnx: Model created in Stage 2: Training Process Automation Setup.
    • ./models/preprocess_byop.zip.: Formats the incoming data for the model.
    • ./models/postprocess_byop.zip: Formats the outgoing data for the model.

Steps

This process will use the following steps:

Connect to Wallaroo

Connect to the Wallaroo instance and set the housepricing workspace as the current workspace.

import json
import pickle
import wallaroo
import pandas as pd
import numpy as np
import pyarrow as pa
import datetime

import simdb # module for the purpose of this demo to simulate pulling data from a database

from wallaroo_client import get_workspace

# used to display dataframe information without truncating
from IPython.display import display
pd.set_option('display.max_colwidth', None)

Connect to the Wallaroo Instance

The first step is to connect to Wallaroo through the Wallaroo client. The Python library is included in the Wallaroo install and available through the Jupyter Hub interface provided with your Wallaroo environment.

This is accomplished using the wallaroo.Client() command, which provides a URL to grant the SDK permission to your specific Wallaroo environment. When displayed, enter the URL into a browser and confirm permissions. Store the connection into a variable that can be referenced later.

If logging into the Wallaroo instance through the internal JupyterHub service, use wl = wallaroo.Client(). For more information on Wallaroo Client settings, see the Client Connection guide.

# Login through local Wallaroo instance

wl = wallaroo.Client()
def get_workspace(name, client):
    workspace = None
    for ws in client.list_workspaces():
        if ws.name() == name:
            workspace= ws
    if(workspace == None):
        workspace = client.create_workspace(name)
    return workspace
workspace_name = 'housepricing'
model_name = "housepricemodel"
model_file = "./housing_model_xgb.onnx"
pipeline_name = "housing-pipe"
new_workspace = get_workspace(workspace_name, wl)
_ = wl.set_current_workspace(new_workspace)

Deploy the Pipeline

Deploy the housing-pipe workspace established in Stage 3: Deploy the Model in Wallaroo (03_deploy_model.ipynb).

pipeline = new_workspace.pipelines()[0]
pipeline
namehousing-pipe
created2024-03-13 18:19:37.008014+00:00
last_updated2024-03-13 18:19:37.077324+00:00
deployedFalse
archNone
tags
versionsbc52c554-6b82-4e3c-a3d1-f6402ee388b0, 6348f7aa-89ba-4051-ad91-1398e5cdd519
stepspreprocess-byop
publishedFalse
deploy_config = wallaroo.DeploymentConfigBuilder().replica_count(1).cpus(0.5).memory("1Gi").build()
pipeline.deploy(deployment_config=deploy_config)
Waiting for deployment - this will take up to 45s ................. ok
namehousing-pipe
created2024-03-13 18:19:37.008014+00:00
last_updated2024-03-13 19:14:39.155596+00:00
deployedTrue
archNone
tags
versions0d8029ac-dcf0-4920-b74e-5ba0d31eab13, bc52c554-6b82-4e3c-a3d1-f6402ee388b0, 6348f7aa-89ba-4051-ad91-1398e5cdd519
stepspreprocess-byop
publishedFalse

Read In New House Listings

From the data store, load the previous month’s house listing, prepare it as a DataFrame, then submit it for inferencing.

conn = simdb.simulate_db_connection()

# create the query
query = f"select * from {simdb.tablename} where date > DATE(DATE(), '-1 month') AND sale_price is NULL"
print(query)

# read in the data
# can't have null values - turn them into 0
newbatch = pd.read_sql_query(query, conn)
newbatch['sale_price'] = newbatch.sale_price.apply(lambda x: 0)
display(newbatch.shape)
display(newbatch.head(10).loc[:, ["id", "date", "list_price", "bedrooms", "bathrooms", "sqft_living", "sqft_lot"]])
select * from house_listings where date > DATE(DATE(), '-1 month') AND sale_price is NULL

(843, 22)

iddatelist_pricebedroomsbathroomssqft_livingsqft_lot
016959000602024-02-26535000.041.0016102982
114329002402024-02-23205000.031.0016108579
279609000602024-02-192900000.043.25505020100
363785001252024-02-16436000.021.0010407538
420220692002024-02-20455000.042.50221049375
594129000552024-02-20405000.031.7523906000
674247000452024-02-282050000.053.0038308480
734220592082024-02-26390000.032.50193064904
842682000552024-02-16245000.031.75174011547
928832001602024-02-14595000.042.0020202849
# query = {'query': newbatch.to_json()}

result = pipeline.infer(newbatch)
# display(result)
predicted_prices = pd.DataFrame(result['out.variable'].apply(lambda x: x[0])).rename(columns={'out.variable':'prediction'})
display(predicted_prices[0:5])
prediction
0500198.0
1270739.0
23067264.0
3378917.0
4496464.0

Send Predictions to Results Staging Table

Take the predicted prices based on the inference results so they can be joined into the house_listings table.

Once complete, undeploy the pipeline to return the resources back to the Kubernetes environment.

result_table = pd.DataFrame({
    'id': newbatch['id'],
    'saleprice_estimate': predicted_prices['prediction']
})

display(result_table)

result_table.to_sql('results_table', conn, index=False, if_exists='append')
idsaleprice_estimate
01695900060500198.0
11432900240270739.0
279609000603067264.0
36378500125378917.0
42022069200496464.0
.........
8386140100028365689.0
8393304300300577492.0
8406453550090882930.0
8413345700207537434.0
8427853420110634226.0

843 rows × 2 columns

# Display the top of the table for confirmation
pd.read_sql_query("select * from results_table limit 5", conn)
idsaleprice_estimate
01695900060500198.0
11432900240270739.0
279609000603067264.0
36378500125378917.0
42022069200496464.0
conn.close()
pipeline.undeploy()
Waiting for undeployment - this will take up to 45s .................................... ok
namehousing-pipe
created2024-03-13 18:19:37.008014+00:00
last_updated2024-03-13 19:14:39.155596+00:00
deployedFalse
archNone
tags
versions0d8029ac-dcf0-4920-b74e-5ba0d31eab13, bc52c554-6b82-4e3c-a3d1-f6402ee388b0, 6348f7aa-89ba-4051-ad91-1398e5cdd519
stepspreprocess-byop
publishedFalse

From here, organizations can automate this process. Other features could be used such as data analysis using Wallaroo assays, or other features such as shadow deployments to test champion and challenger models to find which models provide the best results.