Using Jupyter Notebooks in Production

How to go from Jupyter Notebooks to Production Systems

The following tutorials are available from the Wallaroo Tutorials Repository.

The following tutorials provide an example of an organization moving from experimentation to deployment in production, using Jupyter Notebooks as the basis for code research and use. For this example, we assume two main actors performing the following tasks.

Number | Notebook Sample | Task | Actor | Description
01 | 01_explore_and_train.ipynb | Data Exploration and Model Selection | Data Scientist | The data scientist evaluates the data and determines the best model to use to solve the proposed problems.
02 | 02_automated_training_process.ipynb | Training Process Automation Setup | Data Scientist | The data scientist has selected the model and tested how to train it. In this phase, the data scientist tests automating the training process based on a data store.
03 | 03_deploy_model.ipynb | Deploy the Model in Wallaroo | MLOps Engineer | The MLOps engineer takes the trained model and deploys a Wallaroo pipeline with it to perform inferences by feeding it data from a data store.
04 | 04_regular_batch_inferences.ipynb | Regular Batch Inference | MLOps Engineer | With the pipeline deployed, regular inferences can be made and the results reported to a data store.

Each Jupyter Notebook is arranged to demonstrate each step of the process.

Resources

The following resources are provided as part of this tutorial:

  • data
    • data/seattle_housing_col_description.txt: Describes the columns used as part of the data analysis.
    • data/seattle_housing.csv: Sample data of the Seattle, Washington housing market between 2014 and 2015.
  • code
    • preprocess.py: Formats the incoming data for the model.
    • simdb.py: A simulated database to demonstrate sending and receiving queries.
    • wallaroo_client.py: Additional methods used with the Wallaroo instance to create workspaces, etc.

1 - Data Exploration And Model Selection

Stage 1: Data Exploration And Model Selection

When starting a project, the data scientist focuses on exploration and experimentation, rather than turning the process into an immediate production system. This notebook presents a simplified view of this stage.

Resources

The following resources are used as part of this tutorial:

  • data
    • data/seattle_housing_col_description.txt: Describes the columns used as part of the data analysis.
    • data/seattle_housing.csv: Sample data of the Seattle, Washington housing market between 2014 and 2015.
  • code
    • postprocess.py: Formats the data after inference by the model is complete.
    • preprocess.py: Formats the incoming data for the model.
    • simdb.py: A simulated database to demonstrate sending and receiving queries.
    • wallaroo_client.py: Additional methods used with the Wallaroo instance to create workspaces, etc.

Steps

The following steps are part of this process:

Import Libraries

First we’ll import the libraries we’ll be using to evaluate the data and test different models.

import numpy as np
import pandas as pd

import sklearn
import sklearn.ensemble

import xgboost as xgb

import seaborn
import matplotlib
import matplotlib.pyplot as plt

import simdb # module for the purpose of this demo to simulate pulling data from a database

matplotlib.rcParams["figure.figsize"] = (12,6)

# ignoring warnings for demonstration
import warnings
warnings.filterwarnings('ignore')

Retrieve Training Data

For training, we will use the data on all houses sold in this market within the last two years. As a reminder, this data is pulled from a simulated database as an example of how to pull from an existing data store.

Only a few columns will be shown for display purposes.

conn = simdb.simulate_db_connection()
tablename = simdb.tablename

query = f"select * from {tablename} where date > DATE(DATE(), '-24 month') AND sale_price is not NULL"
print(query)
# read in the data
housing_data = pd.read_sql_query(query, conn)

conn.close()
housing_data.loc[:, ["id", "date", "list_price", "bedrooms", "bathrooms", "sqft_living", "sqft_lot"]]

select * from house_listings where date > DATE(DATE(), '-24 month') AND sale_price is not NULL

               id        date  list_price  bedrooms  bathrooms  sqft_living  sqft_lot
0      7129300520  2022-10-05    221900.0         3       1.00         1180      5650
1      6414100192  2022-12-01    538000.0         3       2.25         2570      7242
2      5631500400  2023-02-17    180000.0         2       1.00          770     10000
3      2487200875  2022-12-01    604000.0         4       3.00         1960      5000
4      1954400510  2023-02-10    510000.0         3       2.00         1680      8080
...           ...         ...         ...       ...        ...          ...       ...
20518   263000018  2022-05-13    360000.0         3       2.50         1530      1131
20519  6600060120  2023-02-15    400000.0         4       2.50         2310      5813
20520  1523300141  2022-06-15    402101.0         2       0.75         1020      1350
20521   291310100  2023-01-08    400000.0         3       2.50         1600      2388
20522  1523300157  2022-10-07    325000.0         2       0.75         1020      1076

20523 rows × 7 columns
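
The simdb module itself is not reproduced in this tutorial. As a rough sketch of how the date-window filter behaves, assuming a SQLite backend and a hypothetical three-column version of the house_listings table, the query keeps only listings that are both recent and sold:

```python
import sqlite3
from datetime import date, timedelta

# Hypothetical stand-in for simdb: an in-memory SQLite table with only
# the columns the filter needs. The real house_listings table has more.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE house_listings (id INTEGER, date TEXT, sale_price REAL)")

today = date.today()
rows = [
    (1, (today - timedelta(days=30)).isoformat(), 221900.0),    # recent, sold
    (2, (today - timedelta(days=1100)).isoformat(), 538000.0),  # sold about 3 years ago
    (3, (today - timedelta(days=30)).isoformat(), None),        # recent, not yet sold
]
conn.executemany("INSERT INTO house_listings VALUES (?, ?, ?)", rows)

query = (
    "select id from house_listings "
    "where date > DATE(DATE(), '-24 month') AND sale_price is not NULL"
)
recent_sold_ids = [row[0] for row in conn.execute(query)]
conn.close()

print(recent_sold_ids)  # only the recent, sold listing remains
```

Because the dates are stored as ISO-formatted text, the comparison against the computed cutoff date is a plain string comparison.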

Data transformations

To improve relative error performance, we will predict on log10 of the sale price.

housing_data['logprice'] = np.log10(housing_data.sale_price)
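
To see why a log target helps with relative error, note that a fixed error in log10 space corresponds to the same percentage error at every price level. The following illustration uses made-up prices and an assumed log-space error of 0.05; the helper name pct_error_for is hypothetical:

```python
import math

def pct_error_for(actual, log_error):
    """Percent error in price units caused by a given error in log10 units."""
    predicted = 10 ** (math.log10(actual) + log_error)
    return 100 * abs(predicted - actual) / actual

log_error = 0.05  # assumed prediction error in log10 units (illustrative)

for actual in (100_000.0, 1_000_000.0):
    print(f"actual={actual:,.0f}  pct_err={pct_error_for(actual, log_error):.2f}%")

# Both prices show the same percentage error, because the error factor
# 10 ** log_error does not depend on the price itself.
```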

From the data, we will create the following features to evaluate:

  • house_age: How old the house is.
  • renovated: Whether the house has been renovated or not.
  • yrs_since_reno: If the house has been renovated, the number of years between when it was built and when it was renovated.

import datetime

thisyear = datetime.datetime.now().year

housing_data['house_age'] = thisyear - housing_data['yr_built']
housing_data['renovated'] =  np.where((housing_data['yr_renovated'] > 0), 1, 0) 
housing_data['yrs_since_reno'] =  np.where(housing_data['renovated'], housing_data['yr_renovated'] - housing_data['yr_built'], 0)

housing_data.loc[:, ['yr_built', 'yr_renovated', 'house_age', 'renovated', 'yrs_since_reno']]

       yr_built  yr_renovated  house_age  renovated  yrs_since_reno
0          1955             0         68          0               0
1          1951          1991         72          1              40
2          1933             0         90          0               0
3          1965             0         58          0               0
4          1987             0         36          0               0
...         ...           ...        ...        ...             ...
20518      2009             0         14          0               0
20519      2014             0          9          0               0
20520      2009             0         14          0               0
20521      2004             0         19          0               0
20522      2008             0         15          0               0

20523 rows × 5 columns
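
The same derivation can be checked by hand for a single row. The helper below is a hypothetical plain-Python restatement of the notebook code, with the current year passed in explicitly (2023 is assumed here to match the displayed output):

```python
def derive_features(yr_built, yr_renovated, this_year):
    """Mirror the three derived columns for a single house."""
    house_age = this_year - yr_built
    renovated = 1 if yr_renovated > 0 else 0
    # Same arithmetic as the notebook: years from construction to renovation.
    yrs_since_reno = yr_renovated - yr_built if renovated else 0
    return {"house_age": house_age, "renovated": renovated, "yrs_since_reno": yrs_since_reno}

# Rows 0 and 1 of the output above.
print(derive_features(1955, 0, 2023))
print(derive_features(1951, 1991, 2023))
```

Passing the year as an argument, rather than reading the system clock, keeps the feature values reproducible when the code is rerun later.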

Now we pick variables and split training data into training and holdout (test).

vars = ['bedrooms', 'bathrooms', 'sqft_living', 'sqft_lot', 'floors', 'waterfront', 'view',
'condition', 'grade', 'sqft_above', 'sqft_basement', 'lat', 'long', 'sqft_living15', 'sqft_lot15', 'house_age', 'renovated', 'yrs_since_reno']

outcome = 'logprice'

runif = np.random.default_rng(2206222).uniform(0, 1, housing_data.shape[0])
gp = np.where(runif < 0.2, 'test', 'training')

hd_train = housing_data.loc[gp=='training', :].reset_index(drop=True, inplace=False)
hd_test = housing_data.loc[gp=='test', :].reset_index(drop=True, inplace=False)

# split the training into training and val for xgboost
runif = np.random.default_rng(123).uniform(0, 1, hd_train.shape[0])
xgb_gp = np.where(runif < 0.2, 'val', 'train')
# for xgboost, further split into train and val
train_features = np.array(hd_train.loc[xgb_gp=='train', vars])
train_labels = np.array(hd_train.loc[xgb_gp=='train', outcome])

val_features = np.array(hd_train.loc[xgb_gp=='val', vars])
val_labels = np.array(hd_train.loc[xgb_gp=='val', outcome])
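
The split relies on seeded uniform draws, so the same rows land in the same group on every run. A minimal sketch of the idea follows, using the standard library's random module rather than NumPy's default_rng. The helper name assign_groups is hypothetical, and the two generators do not produce the same draws for a given seed:

```python
import random

def assign_groups(n_rows, seed, holdout_fraction=0.2, labels=("test", "training")):
    """Label each row by comparing a seeded uniform draw with the holdout fraction."""
    rng = random.Random(seed)
    return [labels[0] if rng.random() < holdout_fraction else labels[1] for _ in range(n_rows)]

first = assign_groups(10_000, seed=2206222)
second = assign_groups(10_000, seed=2206222)

print(first == second)                    # same seed, same assignment
print(first.count("test") / len(first))   # close to the requested 20%
```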

Postprocessing

Since we are fitting a model to predict log10 price, we need to convert predictions back into price units. We also want to round to the nearest dollar.

def postprocess(log10price):
    return np.rint(np.power(10, log10price))
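
A quick round trip shows what the function does to a single value. This sketch uses the standard library's math module instead of NumPy, so it handles one scalar at a time; the name postprocess_scalar is hypothetical:

```python
import math

def postprocess_scalar(log10price):
    """Convert a log10 price back to dollars, rounded to the nearest dollar."""
    return float(round(10 ** log10price))

price = 221900.0
log_price = math.log10(price)

print(log_price)                      # roughly 5.35
print(postprocess_scalar(log_price))  # back to the original price
```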

Model testing

For the purposes of this demo, let’s say that we require a mean absolute percent error (MAPE) of 15% or less, and that we want to try a few models to decide which one to use.

Hyperparameters could also be tuned at this stage; for brevity, we’ll omit that in this demo.
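
For reference, the two metrics used in the evaluations that follow can be written out directly. This is a plain-Python sketch with made-up numbers; the notebook computes the same quantities with pandas and NumPy:

```python
import math

def mape(pred, actual):
    """Mean absolute percent error, in percent."""
    return sum(100 * abs(p - a) / a for p, a in zip(pred, actual)) / len(actual)

def rmse(pred, actual):
    """Root mean squared error, in the units of the target."""
    return math.sqrt(sum((p - a) ** 2 for p, a in zip(pred, actual)) / len(actual))

pred = [110_000.0, 90_000.0]      # illustrative predictions
actual = [100_000.0, 100_000.0]   # illustrative sale prices

print(mape(pred, actual))   # 10.0
print(rmse(pred, actual))   # 10000.0
```

MAPE is scale-free, which is why it suits a requirement stated as a percentage, while RMSE is dominated by errors on the most expensive houses.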

XGBoost

First we will test an XGBoost model.


xgb_model = xgb.XGBRegressor(
    objective = 'reg:squarederror', 
    max_depth=5, 
    base_score = np.mean(hd_train[outcome])
    )

xgb_model.fit( 
    train_features,
    train_labels,
    eval_set=[(train_features, train_labels), (val_features, val_labels)],
    verbose=False,
    early_stopping_rounds=35
)
XGBRegressor(base_score=5.666446833601829, booster='gbtree', callbacks=None,
             colsample_bylevel=1, colsample_bynode=1, colsample_bytree=1,
             early_stopping_rounds=None, enable_categorical=False,
             eval_metric=None, gamma=0, gpu_id=-1, grow_policy='depthwise',
             importance_type=None, interaction_constraints='',
             learning_rate=0.300000012, max_bin=256, max_cat_to_onehot=4,
             max_delta_step=0, max_depth=5, max_leaves=0, min_child_weight=1,
             missing=nan, monotone_constraints='()', n_estimators=100, n_jobs=0,
             num_parallel_tree=1, predictor='auto', random_state=0, reg_alpha=0,
             reg_lambda=1, ...)
print(xgb_model.best_score)
print(xgb_model.best_iteration)
print(xgb_model.best_ntree_limit)
0.07793614689092423
99
100

XGBoost Evaluate on holdout

With the sample model created, we will test it against the holdout data. Note that we are calling the postprocess function on the data.

test_features = np.array(hd_test.loc[:, vars])
test_labels = np.array(hd_test.loc[:, outcome])

pframe = pd.DataFrame({
    'pred' : postprocess(xgb_model.predict(test_features)),
    'actual' : postprocess(test_labels)
})

ax = seaborn.scatterplot(
    data=pframe,
    x='pred',
    y='actual',
    alpha=0.2
)
matplotlib.pyplot.plot(pframe.pred, pframe.pred, color='DarkGreen')
matplotlib.pyplot.title("test")
plt.show()
pframe['se'] = (pframe.pred - pframe.actual)**2

pframe['pct_err'] = 100*np.abs(pframe.pred - pframe.actual)/pframe.actual
pframe.describe()

               pred        actual            se      pct_err
count  4.094000e+03  4.094000e+03  4.094000e+03  4094.000000
mean   5.340824e+05  5.396937e+05  1.657722e+10    12.857674
std    3.413714e+05  3.761666e+05  1.276017e+11    13.512028
min    1.216140e+05  8.200000e+04  1.000000e+00     0.000500
25%    3.167628e+05  3.200000e+05  3.245312e+08     4.252492
50%    4.568700e+05  4.500000e+05  1.602001e+09     9.101485
75%    6.310372e+05  6.355250e+05  6.575385e+09    17.041227
max    5.126706e+06  7.700000e+06  6.637466e+12   252.097895

rmse = np.sqrt(np.mean(pframe.se))
mape = np.mean(pframe.pct_err)

print(f'rmse = {rmse}, mape = {mape}')
rmse = 128752.54982046234, mape = 12.857674005250548

Random Forest

The next model to test is Random Forest.

model_rf = sklearn.ensemble.RandomForestRegressor(n_estimators=100, max_depth=5, n_jobs=2, max_samples=0.8)

train_features = np.array(hd_train.loc[:, vars])
train_labels = np.array(hd_train.loc[:, outcome])

model_rf.fit(train_features, train_labels)
RandomForestRegressor(max_depth=5, max_samples=0.8, n_jobs=2)

Random Forest Evaluate on holdout

With the Random Forest sample model created, now we can test it against the holdout data.

pframe = pd.DataFrame({
    'pred' : postprocess(model_rf.predict(test_features)),
    'actual' : postprocess(test_labels)
})

ax = seaborn.scatterplot(
    data=pframe,
    x='pred',
    y='actual',
    alpha=0.2
)
matplotlib.pyplot.plot(pframe.pred, pframe.pred, color='DarkGreen')
matplotlib.pyplot.title("random forest")
plt.show()
pframe['se'] = (pframe.pred - pframe.actual)**2

pframe['pct_err'] = 100*np.abs(pframe.pred - pframe.actual)/pframe.actual
pframe.describe()

               pred        actual            se      pct_err
count  4.094000e+03  4.094000e+03  4.094000e+03  4094.000000
mean   5.194535e+05  5.396937e+05  3.875433e+10    18.188652
std    2.797001e+05  3.761666e+05  4.054895e+11    17.634478
min    2.039200e+05  8.200000e+04  1.444000e+03     0.014729
25%    3.291252e+05  3.200000e+05  6.686879e+08     6.156760
50%    4.621880e+05  4.500000e+05  3.321332e+09    13.148593
75%    5.851052e+05  6.355250e+05  1.367023e+10    24.630187
max    2.888692e+06  7.700000e+06  2.314868e+13   175.444819

rmse = np.sqrt(np.mean(pframe.se))
mape = np.mean(pframe.pct_err)

print(f'rmse = {rmse}, mape = {mape}')
rmse = 196861.19318381665, mape = 18.188652142429135

Final Decision

At this stage, we decide to go with the xgboost model, with the variables/settings above.
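
The decision rule can be stated explicitly: keep the candidates whose holdout MAPE meets the 15% requirement, then take the one with the lowest error. A small sketch using the MAPE values reported above (the dictionary and variable names are illustrative):

```python
MAPE_THRESHOLD = 15.0  # requirement stated for this demo, in percent

# Holdout MAPE for each candidate, rounded from the results above.
holdout_mape = {"xgboost": 12.86, "random_forest": 18.19}

qualifying = {name: err for name, err in holdout_mape.items() if err <= MAPE_THRESHOLD}
selected = min(qualifying, key=qualifying.get) if qualifying else None

print(selected)  # xgboost
```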

With this stage complete, we can move on to Stage 2: Training Process Automation Setup.

2 - From Jupyter to Production

Stage 2: Training Process Automation Setup

Now that we have decided on the type and structure of the model from Stage 1: Data Exploration And Model Selection, this notebook modularizes the various steps of the process in a structure that is compatible with production and with Wallaroo.

We have pulled the preprocessing and postprocessing steps out of the training notebook into individual scripts that can also be used when the model is deployed.

Assuming no changes are made to the structure of the model, this notebook, or a script based on this notebook, can then be scheduled to run on a regular basis, to refresh the model with more recent training data. We’d expect to run this notebook in conjunction with the Stage 3 notebook, 03_deploy_model.ipynb. For clarity in this demo, we have split the training/upload task into two notebooks, 02_automated_training_process.ipynb and 03_deploy_model.ipynb.

Resources

The following resources are used as part of this tutorial:

  • data
    • data/seattle_housing_col_description.txt: Describes the columns used as part of the data analysis.
    • data/seattle_housing.csv: Sample data of the Seattle, Washington housing market between 2014 and 2015.
  • code
    • postprocess.py: Formats the data after inference by the model is complete.
    • preprocess.py: Formats the incoming data for the model.
    • simdb.py: A simulated database to demonstrate sending and receiving queries.
    • wallaroo_client.py: Additional methods used with the Wallaroo instance to create workspaces, etc.

Steps

The following steps are part of this process:

Retrieve Training Data

Note that this connection is simulated to demonstrate how data would be retrieved from an existing data store. For training, we will use the data on all houses sold in this market within the last two years.

import numpy as np
import pandas as pd

import sklearn

import xgboost as xgb

import seaborn
import matplotlib
import matplotlib.pyplot as plt

import pickle

import simdb # module for the purpose of this demo to simulate pulling data from a database

from preprocess import create_features  # our custom preprocessing
from postprocess import postprocess    # our custom postprocessing

matplotlib.rcParams["figure.figsize"] = (12,6)

# ignoring warnings for demonstration
import warnings
warnings.filterwarnings('ignore')
conn = simdb.simulate_db_connection()
tablename = simdb.tablename

query = f"select * from {tablename} where date > DATE(DATE(), '-24 month') AND sale_price is not NULL"
print(query)
# read in the data
housing_data = pd.read_sql_query(query, conn)

conn.close()
housing_data.loc[:, ["id", "date", "list_price", "bedrooms", "bathrooms", "sqft_living", "sqft_lot"]]

select * from house_listings where date > DATE(DATE(), '-24 month') AND sale_price is not NULL

               id        date  list_price  bedrooms  bathrooms  sqft_living  sqft_lot
0      7129300520  2022-10-05    221900.0         3       1.00         1180      5650
1      6414100192  2022-12-01    538000.0         3       2.25         2570      7242
2      5631500400  2023-02-17    180000.0         2       1.00          770     10000
3      2487200875  2022-12-01    604000.0         4       3.00         1960      5000
4      1954400510  2023-02-10    510000.0         3       2.00         1680      8080
...           ...         ...         ...       ...        ...          ...       ...
20518   263000018  2022-05-13    360000.0         3       2.50         1530      1131
20519  6600060120  2023-02-15    400000.0         4       2.50         2310      5813
20520  1523300141  2022-06-15    402101.0         2       0.75         1020      1350
20521   291310100  2023-01-08    400000.0         3       2.50         1600      2388
20522  1523300157  2022-10-07    325000.0         2       0.75         1020      1076

20523 rows × 7 columns

Data transformations

To improve relative error performance, we will predict on log10 of the sale price.

# the target is the sale price, as in Stage 1
housing_data['logprice'] = np.log10(housing_data.sale_price)
# split data into training and test
outcome = 'logprice'

runif = np.random.default_rng(2206222).uniform(0, 1, housing_data.shape[0])
gp = np.where(runif < 0.2, 'test', 'training')

hd_train = housing_data.loc[gp=='training', :].reset_index(drop=True, inplace=False)
hd_test = housing_data.loc[gp=='test', :].reset_index(drop=True, inplace=False)

# split the training into training and val for xgboost
runif = np.random.default_rng(123).uniform(0, 1, hd_train.shape[0])
xgb_gp = np.where(runif < 0.2, 'val', 'train')
# for xgboost
train_features = hd_train.loc[xgb_gp=='train', :].reset_index(drop=True, inplace=False)
train_features = np.array(create_features(train_features))
train_labels = np.array(hd_train.loc[xgb_gp=='train', outcome])

val_features = hd_train.loc[xgb_gp=='val', :].reset_index(drop=True, inplace=False)
val_features = np.array(create_features(val_features))
val_labels = np.array(hd_train.loc[xgb_gp=='val', outcome])

print(f'train_features: {train_features.shape}, train_labels: {len(train_labels)}')
print(f'val_features: {val_features.shape}, val_labels: {len(val_labels)}')
train_features: (13129, 18), train_labels: 13129
val_features: (3300, 18), val_labels: 3300

Generate and Test the Model

Based on the experimentation and testing performed in Stage 1: Data Exploration And Model Selection, XGBoost was selected as the ML model and the variables for training were selected. The model will be generated and tested against sample data.


xgb_model = xgb.XGBRegressor(
    objective = 'reg:squarederror', 
    max_depth=5, 
    base_score = np.mean(hd_train[outcome])
    )

xgb_model.fit( 
    train_features,
    train_labels,
    eval_set=[(train_features, train_labels), (val_features, val_labels)],
    verbose=False,
    early_stopping_rounds=35
)
XGBRegressor(base_score=5.666446833601829, booster='gbtree', callbacks=None,
             colsample_bylevel=1, colsample_bynode=1, colsample_bytree=1,
             early_stopping_rounds=None, enable_categorical=False,
             eval_metric=None, gamma=0, gpu_id=-1, grow_policy='depthwise',
             importance_type=None, interaction_constraints='',
             learning_rate=0.300000012, max_bin=256, max_cat_to_onehot=4,
             max_delta_step=0, max_depth=5, max_leaves=0, min_child_weight=1,
             missing=nan, monotone_constraints='()', n_estimators=100, n_jobs=0,
             num_parallel_tree=1, predictor='auto', random_state=0, reg_alpha=0,
             reg_lambda=1, ...)
print(xgb_model.best_score)
print(xgb_model.best_iteration)
print(xgb_model.best_ntree_limit)
0.07793614689092423
99
100
test_features = np.array(create_features(hd_test.copy()))
test_labels = np.array(hd_test.loc[:, outcome])

pframe = pd.DataFrame({
    'pred' : postprocess(xgb_model.predict(test_features)),
    'actual' : postprocess(test_labels)
})

ax = seaborn.scatterplot(
    data=pframe,
    x='pred',
    y='actual',
    alpha=0.2
)
matplotlib.pyplot.plot(pframe.pred, pframe.pred, color='DarkGreen')
matplotlib.pyplot.title("test")
plt.show()
pframe['se'] = (pframe.pred - pframe.actual)**2

pframe['pct_err'] = 100*np.abs(pframe.pred - pframe.actual)/pframe.actual
pframe.describe()

               pred        actual            se      pct_err
count  4.094000e+03  4.094000e+03  4.094000e+03  4094.000000
mean   5.340824e+05  5.396937e+05  1.657722e+10    12.857674
std    3.413714e+05  3.761666e+05  1.276017e+11    13.512028
min    1.216140e+05  8.200000e+04  1.000000e+00     0.000500
25%    3.167628e+05  3.200000e+05  3.245312e+08     4.252492
50%    4.568700e+05  4.500000e+05  1.602001e+09     9.101485
75%    6.310372e+05  6.355250e+05  6.575385e+09    17.041227
max    5.126706e+06  7.700000e+06  6.637466e+12   252.097895

rmse = np.sqrt(np.mean(pframe.se))
mape = np.mean(pframe.pct_err)

print(f'rmse = {rmse}, mape = {mape}')
rmse = 128752.54982046234, mape = 12.857674005250548

Convert the Model to ONNX

This step converts the model to ONNX for easy import into Wallaroo.

import onnx
from onnxmltools.convert import convert_xgboost

from skl2onnx.common.data_types import FloatTensorType, DoubleTensorType

import preprocess

# set the number of columns
ncols = len(preprocess._vars)

# derive the opset value

from onnx.defs import onnx_opset_version
from onnxconverter_common.onnx_ex import DEFAULT_OPSET_NUMBER
TARGET_OPSET = min(DEFAULT_OPSET_NUMBER, onnx_opset_version())
# Convert the model to onnx

onnx_model_converted = convert_xgboost(xgb_model, 'tree-based classifier',
                             [('input', FloatTensorType([None, ncols]))],
                             target_opset=TARGET_OPSET)

# Save the model

onnx.save_model(onnx_model_converted, "housing_model_xgb.onnx")

With the model trained and ready, we can now go to Stage 3: Deploy the Model in Wallaroo.

3 - Deploy the Model in Wallaroo

Stage 3: Deploy the Model in Wallaroo

In this stage, we upload the trained model and the processing steps to Wallaroo, then set up and deploy the inference pipeline.

Once deployed we can feed the newest batch of data to the pipeline, do the inferences and write the results to a results table.

For clarity in this demo, we have split the training/upload task into two notebooks:

  • 02_automated_training_process.ipynb: Train the ML model and convert it to ONNX.
  • 03_deploy_model.ipynb: Upload the model to Wallaroo and deploy into a pipeline.

Assuming no changes are made to the structure of the model, these two notebooks, or a script based on them, can then be scheduled to run on a regular basis, to refresh the model with more recent training data and update the inference pipeline.

This notebook is expected to run within the Wallaroo instance’s Jupyter Hub service to provide access to all required Wallaroo libraries and functionality.

Resources

The following resources are used as part of this tutorial:

  • data
    • data/seattle_housing_col_description.txt: Describes the columns used as part of the data analysis.
    • data/seattle_housing.csv: Sample data of the Seattle, Washington housing market between 2014 and 2015.
  • code
    • postprocess.py: Formats the data after inference by the model is complete.
    • simdb.py: A simulated database to demonstrate sending and receiving queries.
    • wallaroo_client.py: Additional methods used with the Wallaroo instance to create workspaces, etc.
  • models
    • housing_model_xgb.onnx: Model created in Stage 2: Training Process Automation Setup.

Steps

The process of uploading the model to Wallaroo follows these steps:

Connect to Wallaroo

First we import the required libraries to connect to the Wallaroo instance, then connect to the Wallaroo instance.

import json
import pickle
import pandas as pd
import numpy as np
import pyarrow as pa

import simdb # module for the purpose of this demo to simulate pulling data from a database

# from wallaroo.ModelConversion import ConvertXGBoostArgs, ModelConversionSource, ModelConversionInputType
import wallaroo
from wallaroo.object import EntityNotFoundError

# used to display dataframe information without truncating
from IPython.display import display
pd.set_option('display.max_colwidth', None)

import datetime

Connect to the Wallaroo Instance

The first step is to connect to Wallaroo through the Wallaroo client. The Python library is included in the Wallaroo install and available through the Jupyter Hub interface provided with your Wallaroo environment.

This is accomplished using the wallaroo.Client() command, which provides a URL to grant the SDK permission to your specific Wallaroo environment. When displayed, enter the URL into a browser and confirm permissions. Store the connection into a variable that can be referenced later.

If logging into the Wallaroo instance through the internal JupyterHub service, use wl = wallaroo.Client(). For more information on Wallaroo Client settings, see the Client Connection guide.

# Login through local Wallaroo instance

wl = wallaroo.Client()

def get_workspace(name):
    """Return the workspace with the given name, creating it if it does not exist."""
    workspace = None
    for ws in wl.list_workspaces():
        if ws.name() == name:
            workspace = ws
    if workspace is None:
        workspace = wl.create_workspace(name)
    return workspace

def get_pipeline(name):
    """Return the pipeline with the given name, building it if it does not exist."""
    try:
        pipeline = wl.pipelines_by_name(name)[0]
    except EntityNotFoundError:
        pipeline = wl.build_pipeline(name)
    return pipeline

workspace_name = 'housepricing'
model_name = "housepricemodel"
model_file = "./housing_model_xgb.onnx"
pipeline_name = "housing-pipe"
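
Both helpers follow a get-or-create pattern, which makes the notebook safe to rerun. The sketch below shows the pattern in isolation with a hypothetical in-memory registry; it does not use or imitate the Wallaroo SDK:

```python
class Registry:
    """Hypothetical stand-in for a service that stores named resources."""

    def __init__(self):
        self._items = {}
        self.created_count = 0

    def find(self, name):
        return self._items.get(name)

    def create(self, name):
        self.created_count += 1
        self._items[name] = {"name": name}
        return self._items[name]

def get_or_create(registry, name):
    """Reuse the named resource if it exists, otherwise create it."""
    existing = registry.find(name)
    return existing if existing is not None else registry.create(name)

registry = Registry()
first = get_or_create(registry, "housepricing")
second = get_or_create(registry, "housepricing")

print(first is second)          # True: the second call reuses the first result
print(registry.created_count)   # 1: nothing was created twice
```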

The workspace housepricing will either be created or, if already existing, used and set to the current workspace.

new_workspace = get_workspace(workspace_name)
new_workspace
{'name': 'housepricing', 'id': 16, 'archived': False, 'created_by': 'aa707604-ec80-495a-a9a1-87774c8086d5', 'created_at': '2023-09-12T17:35:46.994384+00:00', 'models': [{'name': 'housepricemodel', 'versions': 1, 'owner_id': '""', 'last_update_time': datetime.datetime(2023, 9, 12, 17, 35, 49, 181499, tzinfo=tzutc()), 'created_at': datetime.datetime(2023, 9, 12, 17, 35, 49, 181499, tzinfo=tzutc())}, {'name': 'preprocess', 'versions': 1, 'owner_id': '""', 'last_update_time': datetime.datetime(2023, 9, 12, 17, 35, 50, 150472, tzinfo=tzutc()), 'created_at': datetime.datetime(2023, 9, 12, 17, 35, 50, 150472, tzinfo=tzutc())}, {'name': 'postprocess', 'versions': 1, 'owner_id': '""', 'last_update_time': datetime.datetime(2023, 9, 12, 17, 35, 51, 80789, tzinfo=tzutc()), 'created_at': datetime.datetime(2023, 9, 12, 17, 35, 51, 80789, tzinfo=tzutc())}], 'pipelines': [{'name': 'housing-pipe', 'create_time': datetime.datetime(2023, 9, 12, 17, 35, 52, 273091, tzinfo=tzutc()), 'definition': '[]'}]}
_ = wl.set_current_workspace(new_workspace)

Upload The Model

With the connection set and workspace prepared, upload the model created in 02_automated_training_process.ipynb into the current workspace.

hpmodel = wl.upload_model(model_name, model_file, framework=wallaroo.framework.Framework.ONNX).configure()

Upload the Processing Modules

Upload the preprocess.py and postprocess.py modules as models to be added to the pipeline.

# load the preprocess module

preprocess_input_schema = pa.schema([
    pa.field('id', pa.int64()),
    pa.field('date', pa.string()),
    pa.field('list_price', pa.float64()),
    pa.field('bedrooms', pa.int64()),
    pa.field('bathrooms', pa.float64()),
    pa.field('sqft_living', pa.int64()),
    pa.field('sqft_lot', pa.int64()),
    pa.field('floors', pa.float64()),
    pa.field('waterfront', pa.int64()),
    pa.field('view', pa.int64()),
    pa.field('condition', pa.int64()),
    pa.field('grade', pa.int64()),
    pa.field('sqft_above', pa.int64()),
    pa.field('sqft_basement', pa.int64()),
    pa.field('yr_built', pa.int64()),
    pa.field('yr_renovated', pa.int64()),
    pa.field('zipcode', pa.int64()),
    pa.field('lat', pa.float64()),
    pa.field('long', pa.float64()),
    pa.field('sqft_living15', pa.int64()),
    pa.field('sqft_lot15', pa.int64()),
    pa.field('sale_price', pa.float64())
])

preprocess_output_schema = pa.schema([
    pa.field('tensor', pa.list_(pa.float32()))
])

module_pre = (wl.upload_model("preprocess", 
                              "./preprocess.py", 
                              framework=wallaroo.framework.Framework.PYTHON)
                              .configure('python',
                                         input_schema=preprocess_input_schema,
                                         output_schema=preprocess_output_schema)
                )
# load the postprocess module

input_schema = pa.schema([
    pa.field('variable', pa.list_(pa.float64()))
])

output_schema = pa.schema([
    pa.field('variable', pa.list_(pa.float64()))
])

module_post = (wl.upload_model("postprocess", 
                              "./postprocess.py", 
                              framework=wallaroo.framework.Framework.PYTHON)
                              .configure('python',
                                         input_schema=input_schema,
                                         output_schema=output_schema)
                )

Create and Deploy the Pipeline

Create the pipeline with the preprocess module, housing model, and postprocess module as pipeline steps, then deploy the new pipeline.

pipeline = get_pipeline(pipeline_name)
# clear if the tutorial was run before
pipeline.clear()

pipeline.add_model_step(module_pre)
pipeline.add_model_step(hpmodel)
pipeline.add_model_step(module_post)

pipeline.deploy()

name          housing-pipe
created       2023-09-12 17:35:52.273091+00:00
last_updated  2023-09-12 17:40:44.630596+00:00
deployed      True
tags
versions      05d941bb-6547-4608-be5d-4515388d205c, d957ce8d-9d70-477e-bc03-d58b70cd047a, ba8a411e-9318-4ba5-95f5-22c22be8c064, ab42a8de-3551-4551-bc36-9a71d323f81c
steps         preprocess
published     False
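
Conceptually, the deployed pipeline passes each request through its steps in order. The following sketch mimics that flow with ordinary functions; the step bodies are placeholders (in particular, fake_model returns a constant), and none of this reflects how Wallaroo executes steps internally:

```python
import math

def preprocess(record):
    """Placeholder: turn a raw record into a feature vector."""
    return [float(record["bedrooms"]), float(record["sqft_living"])]

def fake_model(features):
    """Placeholder for the ONNX model: always predicts log10 of 250,000."""
    return math.log10(250_000.0)

def postprocess(log10price):
    """Convert the log10 prediction back to dollars."""
    return float(round(10 ** log10price))

def run_steps(steps, value):
    """Feed the output of each step into the next one."""
    for step in steps:
        value = step(value)
    return value

result = run_steps([preprocess, fake_model, postprocess], {"bedrooms": 3, "sqft_living": 1180})
print(result)  # 250000.0
```

The order matters: the model expects the feature vector produced by preprocessing, and postprocessing expects the model's log10 output.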

Test the Pipeline

We will use a single row queried from the simulated house_listings table and run an inference on it. When successful, we will undeploy the pipeline to return its resources to the Kubernetes environment.

conn = simdb.simulate_db_connection()

# create the query
query = f"select * from {simdb.tablename} limit 1"
print(query)

# read in the data
singleton = pd.read_sql_query(query, conn)
conn.close()

display(singleton.loc[:, ["id", "date", "list_price", "bedrooms", "bathrooms", "sqft_living", "sqft_lot"]])

select * from house_listings limit 1

           id        date  list_price  bedrooms  bathrooms  sqft_living  sqft_lot
0  7129300520  2023-01-29    221900.0         3        1.0         1180      5650

result = pipeline.infer(singleton)
display(result.loc[:, ['time', 'out.variable']])

                     time  out.variable
0 2023-09-12 17:41:00.319    [224852.0]

When finished, we undeploy the pipeline to return the resources back to the environment.

pipeline.undeploy()

name          housing-pipe
created       2023-09-12 17:35:52.273091+00:00
last_updated  2023-09-12 17:40:44.630596+00:00
deployed      False
tags
versions      05d941bb-6547-4608-be5d-4515388d205c, d957ce8d-9d70-477e-bc03-d58b70cd047a, ba8a411e-9318-4ba5-95f5-22c22be8c064, ab42a8de-3551-4551-bc36-9a71d323f81c
steps         preprocess
published     False

With this stage complete, we can proceed to Stage 4: Regular Batch Inference.

4 - Regular Batch Inference

Stage 4: Regular Batch Inference

In Stage 3: Deploy the Model in Wallaroo, the housing model created and tested in Stage 2: Training Process Automation Setup was uploaded to a Wallaroo instance and added to the pipeline housing-pipe in the workspace housepricing. This pipeline can be deployed at any point in time and used for new inferences.

For the purposes of this demo, let’s say that every month we find the newly entered and still-unsold houses and predict their sale price.

The predictions are entered into a staging table, for further inspection before being joined to the primary housing data table.

We show this as a notebook, but it can also be scripted and scheduled with cron or another scheduler.
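As a rough illustration of what a scripted version of this notebook could look like, the sketch below wraps the deploy, infer, stage, and undeploy steps in one function and uses try/finally so the pipeline is always undeployed. The function name run_batch and the FakePipeline stand-in are hypothetical and exist only so the sketch runs without a Wallaroo instance; in a real script the pipeline object would come from the Wallaroo client as shown later in this stage.

```python
import sqlite3

import pandas as pd


class FakePipeline:
    """Hypothetical stand-in for a Wallaroo pipeline (not part of the SDK)."""

    def __init__(self):
        self.deployed = False

    def deploy(self):
        self.deployed = True

    def undeploy(self):
        self.deployed = False

    def infer(self, df):
        # mimic the 'out.variable' column of single-element lists
        return pd.DataFrame({"out.variable": [[float(p)] for p in df["list_price"]]})


def run_batch(pipeline, conn, query, results_table="results_table"):
    """Deploy, infer on the rows returned by `query`, stage results, undeploy."""
    pipeline.deploy()
    try:
        batch = pd.read_sql_query(query, conn)
        if batch.empty:
            return 0  # nothing new to score on this run
        batch["sale_price"] = 0  # the notebook zeroes this column to avoid nulls
        result = pipeline.infer(batch)
        staged = pd.DataFrame({
            "id": batch["id"],
            "saleprice_estimate": result["out.variable"].apply(lambda x: x[0]),
        })
        staged.to_sql(results_table, conn, index=False, if_exists="append")
        return len(staged)
    finally:
        # always release the resources, even if inference fails
        pipeline.undeploy()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    listings = pd.DataFrame({"id": [1, 2], "list_price": [100.0, 200.0]})
    listings.to_sql("house_listings", conn, index=False)
    print(run_batch(FakePipeline(), conn, "select * from house_listings"))
    conn.close()
```

A scheduler would then only need to invoke the script; the scheduling itself is outside the scope of this sketch.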

Resources

The following resources are used as part of this tutorial:

  • data
    • data/seattle_housing_col_description.txt: Describes the columns used as part of the data analysis.
    • data/seattle_housing.csv: Sample data of the Seattle, Washington housing market between 2014 and 2015.
  • code
    • postprocess.py: Formats the data after inference by the model is complete.
    • simdb.py: A simulated database to demonstrate sending and receiving queries.
    • wallaroo_client.py: Additional methods used with the Wallaroo instance to create workspaces, etc.
  • models
    • housing_model_xgb.onnx: Model created in Stage 2: Training Process Automation Setup.

Steps

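This process will use the following steps:

  • Connect to Wallaroo
  • Deploy the Pipeline
  • Read In New House Listings
  • Send Predictions to Results Staging Table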

Connect to Wallaroo

Connect to the Wallaroo instance and set the housepricing workspace as the current workspace.

import json
import pickle
import wallaroo
import pandas as pd
import numpy as np
import pyarrow as pa
import datetime

import simdb # module for the purpose of this demo to simulate pulling data from a database

from wallaroo_client import get_workspace

# used to display dataframe information without truncating
from IPython.display import display
pd.set_option('display.max_colwidth', None)

Connect to the Wallaroo Instance

The first step is to connect to Wallaroo through the Wallaroo client. The Python library is included in the Wallaroo install and available through the Jupyter Hub interface provided with your Wallaroo environment.

This is accomplished using the wallaroo.Client() command, which provides a URL to grant the SDK permission to your specific Wallaroo environment. When the URL is displayed, open it in a browser and confirm permissions. Store the connection in a variable that can be referenced later.

If logging into the Wallaroo instance through the internal JupyterHub service, use wl = wallaroo.Client(). For more information on Wallaroo Client settings, see the Client Connection guide.

# Login through local Wallaroo instance

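wl = wallaroo.Client()

# needed by get_pipeline below
from wallaroo.object import EntityNotFoundError

# return the workspace with the given name, creating it if it does not exist
# (this local definition shadows the get_workspace imported from wallaroo_client)
def get_workspace(name):
    workspace = None
    for ws in wl.list_workspaces():
        if ws.name() == name:
            workspace = ws
    if workspace is None:
        workspace = wl.create_workspace(name)
    return workspace

# return the pipeline with the given name, building it if it does not exist
def get_pipeline(name):
    try:
        pipeline = wl.pipelines_by_name(name)[0]
    except EntityNotFoundError:
        pipeline = wl.build_pipeline(name)
    return pipeline
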
workspace_name = 'housepricing'
model_name = "housepricemodel"
model_file = "./housing_model_xgb.onnx"
pipeline_name = "housing-pipe"
new_workspace = get_workspace(workspace_name)
_ = wl.set_current_workspace(new_workspace)

Deploy the Pipeline

Deploy the housing-pipe pipeline established in Stage 3: Deploy the Model in Wallaroo (03_deploy_model.ipynb).

pipeline = get_pipeline(pipeline_name)
pipeline.deploy()

name          housing-pipe
created       2023-09-12 17:35:52.273091+00:00
last_updated  2023-09-12 17:37:27.074611+00:00
deployed      True
tags
versions      d957ce8d-9d70-477e-bc03-d58b70cd047a, ba8a411e-9318-4ba5-95f5-22c22be8c064, ab42a8de-3551-4551-bc36-9a71d323f81c
steps         preprocess
published     False

Read In New House Listings

From the data store, load the unsold house listings from the past month, prepare them as a DataFrame, then submit them for inferencing.

conn = simdb.simulate_db_connection()

# create the query
query = f"select * from {simdb.tablename} where date > DATE(DATE(), '-1 month') AND sale_price is NULL"
print(query)

# read in the data
newbatch = pd.read_sql_query(query, conn)
# the pipeline can't accept null values, so set the unknown sale_price to 0
newbatch['sale_price'] = 0
display(newbatch.shape)
display(newbatch.head(10).loc[:, ["id", "date", "list_price", "bedrooms", "bathrooms", "sqft_living", "sqft_lot"]])

select * from house_listings where date > DATE(DATE(), '-1 month') AND sale_price is NULL

(1090, 22)

   id          date        list_price  bedrooms  bathrooms  sqft_living  sqft_lot
0  9215400105  2023-08-14  450000.0    3         1.75       1250         5963
1  1695900060  2023-08-27  535000.0    4         1.00       1610         2982
2  9545240070  2023-08-14  660500.0    4         2.25       2010         9603
3  1432900240  2023-08-24  205000.0    3         1.00       1610         8579
4  6131600075  2023-08-13  225000.0    3         1.00       1300         8316
5  1400300055  2023-08-14  425000.0    2         1.00       770          5040
6  7960900060  2023-08-20  2900000.0   4         3.25       5050         20100
7  6378500125  2023-08-17  436000.0    2         1.00       1040         7538
8  2022069200  2023-08-21  455000.0    4         2.50       2210         49375
9  9412900055  2023-08-21  405000.0    3         1.75       2390         6000

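The date filter in the query above relies on SQLite-style date functions; that simdb is backed by SQLite is an assumption based on this syntax. The self-contained sketch below uses an in-memory SQLite table (with a reduced, hypothetical three-column schema) to show which rows the filter keeps: only listings dated within the past month that have no sale price.

```python
import sqlite3
from datetime import date, timedelta

conn = sqlite3.connect(":memory:")
conn.execute("create table house_listings (id integer, date text, sale_price real)")

today = date.today()
rows = [
    (1, (today - timedelta(days=5)).isoformat(), None),      # recent, unsold: kept
    (2, (today - timedelta(days=5)).isoformat(), 300000.0),  # recent, sold: skipped
    (3, (today - timedelta(days=90)).isoformat(), None),     # old, unsold: skipped
]
conn.executemany("insert into house_listings values (?, ?, ?)", rows)

# same WHERE clause as the notebook query
query = (
    "select id from house_listings "
    "where date > DATE(DATE(), '-1 month') AND sale_price is NULL"
)
selected = [row[0] for row in conn.execute(query)]
print(selected)  # [1]
conn.close()
```

The comparison works on text because the dates are stored in ISO format (YYYY-MM-DD), which sorts in date order.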
# query = {'query': newbatch.to_json()}

result = pipeline.infer(newbatch)
# display(result)
predicted_prices = pd.DataFrame(result['out.variable'].apply(lambda x: x[0])).rename(columns={'out.variable':'prediction'})
display(predicted_prices[0:5])

   prediction
0  508255.0
1  500198.0
2  539598.0
3  270739.0
4  191304.0
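Each entry in the out.variable column is a single-element list, which is why the code above takes element 0 of every row. The small standalone sketch below reproduces that unwrapping on a hand-built DataFrame that holds the first three predictions shown above; it does not call Wallaroo.

```python
import pandas as pd

# hand-built stand-in for the inference result: one list per row
result = pd.DataFrame({"out.variable": [[508255.0], [500198.0], [539598.0]]})

predicted_prices = (
    result["out.variable"]
    .apply(lambda x: x[0])   # unwrap the single-element list
    .rename("prediction")    # give the column a readable name
    .to_frame()
)
print(predicted_prices["prediction"].tolist())  # [508255.0, 500198.0, 539598.0]
```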

Send Predictions to Results Staging Table

Write the predicted prices from the inference results to the staging table results_table, so they can later be inspected and joined into the house_listings table.

Once complete, undeploy the pipeline to return the resources back to the Kubernetes environment.

result_table = pd.DataFrame({
    'id': newbatch['id'],
    'saleprice_estimate': predicted_prices['prediction']
})

display(result_table)

result_table.to_sql('results_table', conn, index=False, if_exists='append')

      id          saleprice_estimate
0     9215400105  508255.0
1     1695900060  500198.0
2     9545240070  539598.0
3     1432900240  270739.0
4     6131600075  191304.0
...   ...         ...
1085  3304300300  577492.0
1086  6453550090  882930.0
1087  1760650820  271484.0
1088  3345700207  537434.0
1089  7853420110  634226.0

1090 rows × 2 columns

# Display the top of the table for confirmation
pd.read_sql_query("select * from results_table limit 5", conn)

   id          saleprice_estimate
0  9215400105  508255.0
1  1695900060  500198.0
2  9545240070  539598.0
3  1432900240  270739.0
4  6131600075  191304.0

conn.close()
pipeline.undeploy()

name          housing-pipe
created       2023-09-12 17:35:52.273091+00:00
last_updated  2023-09-12 17:37:27.074611+00:00
deployed      False
tags
versions      d957ce8d-9d70-477e-bc03-d58b70cd047a, ba8a411e-9318-4ba5-95f5-22c22be8c064, ab42a8de-3551-4551-bc36-9a71d323f81c
steps         preprocess
published     False
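The staged estimates are meant to be inspected and then joined to the primary listings table. One way that join might look is sketched below with an in-memory SQLite database; the two-row tables are reduced, hypothetical versions of house_listings and results_table, filled with values that appear in the outputs above.

```python
import sqlite3

import pandas as pd

conn = sqlite3.connect(":memory:")

# reduced stand-ins for the real tables
pd.DataFrame({
    "id": [9215400105, 1695900060],
    "list_price": [450000.0, 535000.0],
}).to_sql("house_listings", conn, index=False)
pd.DataFrame({
    "id": [9215400105, 1695900060],
    "saleprice_estimate": [508255.0, 500198.0],
}).to_sql("results_table", conn, index=False)

# join each listing to its staged estimate on the shared id column
joined = pd.read_sql_query(
    """
    select h.id, h.list_price, r.saleprice_estimate
    from house_listings h
    join results_table r on r.id = h.id
    order by h.id
    """,
    conn,
)
conn.close()
print(joined)
```

Because the notebook writes to results_table with if_exists='append', running it more than once adds repeated rows for the same id, so a production join would likely need to deduplicate or keep only the latest estimate per listing.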

From here, organizations can automate this process. Other Wallaroo features can build on it, such as assays for data analysis, or shadow deployments that compare champion and challenger models to find which provides the best results.