Forecasting: Retail

Wallaroo Use Case Tutorials focused on solving Retail problems with Forecasting models

1 - Retail: Upload and Deploy

How to upload and deploy a Retail Forecasting model to Wallaroo.

Statsmodel Forecast Tutorial Notebook 1: Build and Deploy a Model

For this tutorial, let’s pretend that you work for a bike rental company. You have developed a model to predict the number of rentals for the week following a given date, based on data collected in the company’s rental history database.

In this set of exercises, you will build a model to forecast bike rentals, and deploy it to Wallaroo.

Before we start, let’s load some libraries that we will need for this notebook (note that this may not be a complete list).

  • IMPORTANT NOTE: This tutorial is geared towards a Wallaroo 2023.2.1 environment.
# preload needed libraries 

import wallaroo
from wallaroo.object import EntityNotFoundError
from wallaroo.framework import Framework

# used to display DataFrame information without truncating
from IPython.display import display
import pandas as pd
pd.set_option('display.max_colwidth', None)

import json

import datetime
import time

# used for unique connection names

import string
import random

import pyarrow as pa

Exercise: Build a model

Use the house price data seattle_housing.csv in the data subdirectory to build a model to predict the sales price of homes based on the features in the data set.

At the end of the exercise, you should have a notebook and possibly other artifacts to produce a model for predicting house prices. For the purposes of the exercise, please use a framework that can be converted to ONNX, such as scikit-learn or XGBoost.

For assistance converting a model to ONNX, see the Wallaroo Model Conversion Tutorials for some examples.

NOTE

If you prefer to shortcut this step, you can use one of the pre-trained model pickle files in the models subdirectory.

## Blank space for training model, if needed

Getting Ready to deploy

Wallaroo natively supports ONNX models, Python-based models, and TensorFlow models, and supports other frameworks via containerization. For this exercise, we assume that you have a model that can be converted to the ONNX framework. The first steps to deploying in Wallaroo, then, are to convert your model to ONNX and to add some extra functions to your processing modules so Wallaroo can call them.


Exercise: Convert your Model to ONNX

Take the model that you created in the previous exercise and convert it to an ONNX format supported by Wallaroo. If you need help, see the Wallaroo ONNX conversion tips. A model in another framework can also be deployed to Wallaroo if that framework is supported by Wallaroo. See the Wallaroo SDK Essentials Guide: Model Uploads and Registrations.

At the end of this exercise, you should have your model as a standalone artifact, for example, a file called model.onnx.

NOTE

If you prefer to shortcut this exercise, you can use one of the Python models in the models directory.

# Blank space to load for converting model, if needed

Get ready to work with Wallaroo

Now that you have a model ready to go, you can log into Wallaroo and set up a workspace to organize your deployment artifacts. A Wallaroo workspace is a place to organize the deployment artifacts for a project, and to collaborate with other team members. For more information, see the Wallaroo 101.

Logging into Wallaroo via the cluster’s integrated JupyterLab is quite straightforward:

# Login through local Wallaroo instance 
wl = wallaroo.Client()

See the documentation if you are logging into Wallaroo some other way.

Once you are logged in, you can create a workspace and set it as your working environment. To make the first exercise easier, here is a convenience function to get or create a workspace:

# return the workspace called <name>, or create it if it does not exist.
# this function assumes your connection to wallaroo is called wl
def get_workspace(name):
    workspace = None
    for ws in wl.list_workspaces():
        if ws.name() == name:
            workspace= ws
    if(workspace == None):
        workspace = wl.create_workspace(name)
    return workspace

Then logging in and creating a workspace looks something like this:

# Login through local Wallaroo instance 
wl = wallaroo.Client()

Setting up the workspace may resemble this. Verify that the workspace name is unique across the Wallaroo instance.

# workspace names need to be globally unique, so add a random suffix to ensure this
# especially important if the "main" workspace name is potentially a common one

suffix= ''.join(random.choice(string.ascii_lowercase) for i in range(4))
workspace_name = "my-workspace"+suffix

workspace = get_workspace(workspace_name)

# set your current workspace to the workspace that you just created
wl.set_current_workspace(workspace)

# optionally, examine your current workspace
wl.get_current_workspace()

Exercise: Log in and create a workspace

Log into Wallaroo and create a workspace for this tutorial. Then set that new workspace as your current workspace.
Make sure you remember the name that you gave the workspace, as you will need it for later notebooks.

Notes

  • Workspace names must be globally unique, so don’t pick something too common. The “random suffix” trick in the code snippet is one way to try to generate a unique workspace name, if you suspect you are using a common name.

At the end of the exercise, you should be in a new workspace to do further work.

## Blank spot to log in

wl = wallaroo.Client()
## Blank spot to connect to the workspace

import string
import random

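suffix= ''.join(random.choice(string.ascii_lowercase) for i in range(4))
# fixed suffix used for the sample output in this tutorial; use your own value
suffix='john'

workspace_name = f'forecast-model-tutorial{suffix}'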

workspace = get_workspace(workspace_name)

wl.set_current_workspace(workspace)
{'name': 'forecast-model-tutorialjohn', 'id': 16, 'archived': False, 'created_by': '0a36fba2-ad42-441b-9a8c-bac8c68d13fa', 'created_at': '2023-08-02T15:50:52.816795+00:00', 'models': [{'name': 'forecast-control-model', 'versions': 2, 'owner_id': '""', 'last_update_time': datetime.datetime(2023, 8, 2, 18, 16, 45, 620061, tzinfo=tzutc()), 'created_at': datetime.datetime(2023, 8, 2, 15, 50, 54, 223186, tzinfo=tzutc())}, {'name': 'forecast-challenger01-model', 'versions': 2, 'owner_id': '""', 'last_update_time': datetime.datetime(2023, 8, 2, 18, 16, 46, 633644, tzinfo=tzutc()), 'created_at': datetime.datetime(2023, 8, 2, 15, 50, 55, 208179, tzinfo=tzutc())}, {'name': 'forecast-challenger02-model', 'versions': 2, 'owner_id': '""', 'last_update_time': datetime.datetime(2023, 8, 2, 18, 16, 47, 740983, tzinfo=tzutc()), 'created_at': datetime.datetime(2023, 8, 2, 15, 50, 56, 291043, tzinfo=tzutc())}], 'pipelines': [{'name': 'forecast-tutorial-pipeline', 'create_time': datetime.datetime(2023, 8, 2, 15, 50, 59, 480547, tzinfo=tzutc()), 'definition': '[]'}]}

Deploy a Simple Single-Step Pipeline

Once your model is in the ONNX format, and you have a workspace to work in, you can easily upload your model to Wallaroo’s production platform with just a few lines of code. For example, if you have a model called model.onnx, and you wish to upload it to Wallaroo with the name mymodel, then upload the model as follows (once you are in the appropriate workspace):


my_model = wl.upload_model("mymodel", "model.onnx", framework=Framework.ONNX).configure()

See Wallaroo SDK Essentials Guide: Model Uploads and Registrations: ONNX for full details.

The function upload_model() returns a handle to the uploaded model that you will continue to work with in the SDK.

Upload Python Models

If you choose to use one of our sample models, our forecast example uses an ARIMA model from the Python statsmodels library. The models are all available in the ./models directory.

To upload a Python model to Wallaroo, the following is required:

  • The name of the model: What to designate the model name once uploaded to Wallaroo.
  • Path of the Python script: This specifies the file location. For example: ./models/my-python-script.py
  • The input and output schemas: These inform Wallaroo how inference request data will be formatted, and the shape of the data going out from the model. The schemas are required to be in Apache Arrow schema format, aka pyarrow.lib.Schema. For example, if the inference inputs are a pandas DataFrame with the following shape:

                          tensor
0  [15.5, 17.2, 35.724, 0.37894]

With the following output:

{
    "forecast": [235, 135, 175],
    "average_forecast": [181.66]
}

In this case, the input and output schemas are represented as:

import pyarrow as pa

input_schema = pa.schema([
    pa.field('tensor', pa.list_(pa.float64()))
])

output_schema = pa.schema([
    pa.field('forecast', pa.list_(pa.int64())),
    pa.field('average_forecast', pa.list_(pa.float64()))
])

Python models are uploaded to Wallaroo with the upload_model method, followed by the configure method, which specifies the Python framework along with the input and output schemas. For example:

sample_model = (wl.upload_model(control_model_name, 
                                 control_model_file, 
                                 framework=Framework.PYTHON)
                                 .configure("python", 
                                 input_schema=input_schema, 
                                 output_schema=output_schema)
                )

For more information about Python models, see Wallaroo SDK Essentials Guide: Model Uploads and Registrations: Python Models.

Once the model has been uploaded, you can create a pipeline that contains the model. The pipeline is the mechanism that manages deployments. A pipeline contains a series of steps - sequential sets of models which take in the data from the preceding step, process it through the model, then return a result. Some pipelines can have just one step, while others may have multiple models with multiple steps or arranged for A/B testing. Deployed pipelines allocate resources and can then process data either through local files or through a deployment URL.

So for your model to accept inferences, you must add it to a pipeline. You can create a single step pipeline called mypipeline as follows.

# create the pipeline
my_pipeline = wl.build_pipeline("mypipeline").add_model_step(my_model)

# deploy the pipeline
my_pipeline = my_pipeline.deploy()

Deploying the pipeline means that resources from the cluster are allocated to the pipeline, and it is ready to accept inferences. You can “turn off” the pipeline with the call pipeline.undeploy(), which returns the resources back to the cluster. This is an important step: leaving pipelines deployed when they’re no longer needed takes up resources that may be needed by other pipelines or services.

See Wallaroo SDK Essentials Guide: Pipeline Management for full details.

More Hints

  • workspace = wl.get_current_workspace() gives you a handle to the current workspace
  • then workspace.models() will return a list of the models in the workspace
  • and workspace.pipelines() will return a list of the pipelines in the workspace

Exercise: Upload and deploy your model

Upload and deploy the ONNX model that you created in the previous exercise. For simplicity, do any needed pre-processing in the notebook.

At the end of the exercise, you should have a model and a deployed pipeline in your workspace.

## blank space to upload model, and create the pipeline

control_model_name = 'forecast-control-model'
control_model_file = '../models/forecast_standard.py'

# Holding on to these for later
input_schema = pa.schema([
    pa.field('count', pa.list_(pa.int64()))
])

output_schema = pa.schema([
    pa.field('forecast', pa.list_(pa.int64())),
    pa.field('weekly_average', pa.list_(pa.float64()))
])

# upload the models

bike_day_model = (wl.upload_model(control_model_name, 
                                 control_model_file, 
                                 framework=Framework.PYTHON)
                                 .configure("python", 
                                 input_schema=input_schema, 
                                 output_schema=output_schema)
                )

# create the pipeline

pipeline_name = 'forecast-tutorial-pipeline'

pipeline = wl.build_pipeline(pipeline_name).clear().add_model_step(bike_day_model).deploy()

Sending Data to your Pipeline

ONNX models generally expect their input as an array in a dictionary, keyed by input name. In Wallaroo, the default input name is “tensor”. So (outside of Wallaroo), an ONNX model that expected three numeric values as its input would expect input data similar to the below: (Note: The below examples are only notional, they aren’t intended to work with our example models.)

# one datum
singleton = {'tensor': [[1, 2, 3]] }

# two datums
two_inputs = {'tensor': [[1, 2, 3], [4, 5, 6]] }

In the Wallaroo SDK, you can send a pandas DataFrame representation of this dictionary (pandas record format) to the pipeline, via the pipeline.infer() method.

import pandas as pd

# one datum (notional example)
sdf = pd.DataFrame(singleton)
sdf
#       tensor
# 0  [1, 2, 3]

# send the datum to a pipeline for inference
# notional example - not houseprice model!
result = my_pipeline.infer(sdf)

# two datums
# Note that the value of 'tensor' must be a list, not a numpy array 
twodf = pd.DataFrame(two_inputs)
twodf
#      tensor
# 0  [1, 2, 3]
# 1  [4, 5, 6]

# send the data to a pipeline for inference
# notional example, not houseprice model!
result = my_pipeline.infer(twodf)

To send data to a pipeline via the inference URL (for example, via CURL), you need the JSON representation of these data frames.

#
# notional examples, not houseprice model!
#
sdf.to_json(orient='records')
# '[{"tensor":[1,2,3]}]'

twodf.to_json(orient='records')
# '[{"tensor":[1,2,3]},{"tensor":[4,5,6]}]'
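The same records layout can be produced without pandas, which may help when preparing a request body outside the SDK. The sketch below uses only the standard library; the helper name to_records_json is hypothetical, and the compact separators are chosen only to match the pandas output shown above.

```python
import json

def to_records_json(columns):
    """Convert a dict of equal-length columns into a JSON 'records' string."""
    names = list(columns)
    lengths = {len(columns[name]) for name in names}
    if len(lengths) > 1:
        raise ValueError("all columns must have the same number of rows")
    n_rows = lengths.pop() if lengths else 0
    # one JSON object per row, keyed by column name
    records = [{name: columns[name][i] for name in names} for i in range(n_rows)]
    return json.dumps(records, separators=(",", ":"))

# notional example, same data as the two-datum DataFrame above
two_inputs = {"tensor": [[1, 2, 3], [4, 5, 6]]}
print(to_records_json(two_inputs))
# prints [{"tensor":[1,2,3]},{"tensor":[4,5,6]}]
```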

If the JSON data is in a file, you can send it to the pipeline from within the SDK via the pipeline.infer_from_file() method.

In either case, a successful inference will return a data frame of inference results. The model inference(s) will be in the column out.<outputname>.

For more details, see Wallaroo SDK Essentials Guide: Inference Management.

Inference Input with Forecast Statsmodel

For our particular statsmodel, the input is stored in the file day.csv, which tracks bike rentals and conditions from 2011 through 2012. The sample Python ARIMA Statsmodel used in these demonstrations has the following input and output schemas.

input_schema = pa.schema([
    pa.field('count', pa.list_(pa.int64()))
])

output_schema = pa.schema([
    pa.field('forecast', pa.list_(pa.int64())),
    pa.field('weekly_average', pa.list_(pa.float64()))
])

Converting from tabular format

If your input data is in a standard tabular format (like the test_data.csv example data in the data directory), then you need to convert to pandas record format to send the data to your pipeline. See the pandas DataFrame documentation for methods on how to import a CSV file to a DataFrame.

To help with the following exercises, here are some convenience functions you might find useful for doing this conversion. These functions convert input data in standard tabular format (in a pandas DataFrame) to the pandas record format that the model expects.

get_singleton assumes that all of the information is on a single row.

get_singleton_forecast assumes that the data is composed in a single DataFrame, and then reformats it to be in a single row. Our example ARIMA forecast model expects data in the input schema detailed above. So a series of inputs such as:

   count
0    985
1    801
2   1349
3   1562
4   1600
5   1606
6   1510
7    959
8    822
9   1321

Would have to be reformatted as:

                                                       count
0  [985, 801, 1349, 1562, 1600, 1606, 1510, 959, 822, 1321]

# pull a single datum from a data frame 
# and convert it to the format the model expects
def get_singleton(df, i):
    singleton = df.iloc[i,:].to_numpy().tolist()
    sdict = {'tensor': [singleton]}
    return pd.DataFrame.from_dict(sdict)

def get_singleton_forecast(df, field):
    singleton = pd.DataFrame({field: [df[field].values.tolist()]})
    return singleton

# pull a batch of data from a data frame
# and convert to the format the model expects
def get_batch(df, first=0, nrows=1):
    last = first + nrows
    batch = df.iloc[first:last, :].to_numpy().tolist()
    return pd.DataFrame.from_dict({'tensor': batch})

Execute the following code block to see examples of what get_singleton and get_batch do.

# RUN ME!

print('''TOY data for a model that takes inputs var1, var2, var3.
The dataframe is called df.
Pretend the model is in a Wallaroo pipeline called "toypipeline"''')

df = pd.DataFrame({
    'var1': [1, 3, 5],
    'var2': [33, 88, 45],
    'var3': [6, 20, 5]
})

display(df)

# create a model input from the first row
# this is now in the format that a model would accept
singleton = get_singleton(df, 0)

print('''The command "singleton = get_singleton(df, 0)" converts
the first row of the data frame into the format that Wallaroo pipelines accept.
You could now get a prediction by: "toypipeline.infer(singleton)".
''')
display(singleton)

# create a batch of queries from the first two rows of the dataframe
batch = get_batch(df, nrows=2)

print('''The command "batch = get_batch(df, nrows=2)" converts
the first two rows of the data frame into a batch format that Wallaroo pipelines accept.
You could now get a batch prediction by: "toypipeline.infer(batch)".
''')
display(batch)

print('''The command "inference_df = get_singleton_forecast(sample_df, 'count')" converts
the "count" column of the data frame into the single-row format that the forecast model accepts.
''')

sample_df = pd.DataFrame({"count": [1526, 
                                    1550, 
                                    1708, 
                                    1005, 
                                    1623]
                        })
display(sample_df)

inference_df = get_singleton_forecast(sample_df, 'count')
display(inference_df)

TOY data for a model that takes inputs var1, var2, var3.
The dataframe is called df.
Pretend the model is in a Wallaroo pipeline called "toypipeline"

   var1  var2  var3
0     1    33     6
1     3    88    20
2     5    45     5

The command "singleton = get_singleton(df, 0)" converts
the first row of the data frame into the format that Wallaroo pipelines accept.
You could now get a prediction by: "toypipeline.infer(singleton)".

       tensor
0  [1, 33, 6]

The command "batch = get_batch(df, nrows=2)" converts
the first two rows of the data frame into a batch format that Wallaroo pipelines accept.
You could now get a batch prediction by: "toypipeline.infer(batch)".

        tensor
0   [1, 33, 6]
1  [3, 88, 20]

The command "inference_df = get_singleton_forecast(sample_df, 'count')" converts
the "count" column of the data frame into the single-row format that the forecast model accepts.

   count
0   1526
1   1550
2   1708
3   1005
4   1623

                            count
0  [1526, 1550, 1708, 1005, 1623]

Exercise: Send data to your pipeline for inference.

Create some test data from the bike rental data and send it to the pipeline that you deployed in the previous exercise.

  • If you used the pre-provided models, then you can use test_data.csv from the data directory. This can be loaded directly into your sample pandas DataFrame - check the pandas documentation for a handy function for doing that. (We mention yours because sometimes people try to use the example code above rather than their own data.)

  • Start easy, with just one datum; retrieve the inference results. You can try small batches, as well. Use the above example as a guide.

  • Examine the inference results; observe what the model prediction column is called; it should be of the form out.<outputname>.

For more hints about the different ways of sending data to the pipeline, and to see an example of the inference result format, see the “Running Inferences” section of Wallaroo 101.

At the end of the exercise, you should have a set of inference results that you got through the Wallaroo pipeline.

##  blank space to create test data, and send some data to your model

sample_count = pd.read_csv('../data/test_data.csv')
# sample_df = sample_count.loc[0:20, ['count']]
inference_df = get_singleton_forecast(sample_count.loc[0:20], 'count')
display(inference_df)

results = pipeline.infer(inference_df)
display(results)

   count
0  [985, 801, 1349, 1562, 1600, 1606, 1510, 959, 822, 1321, 1263, 1162, 1406, 1421, 1248, 1204, 1000, 683, 1650, 1927, 1543]

   time                     in.count  out.forecast  out.weekly_average  check_failures
0  2023-08-03 01:12:30.126  [985, 801, 1349, 1562, 1600, 1606, 1510, 959, 822, 1321, 1263, 1162, 1406, 1421, 1248, 1204, 1000, 683, 1650, 1927, 1543]  [1434, 1288, 1296, 1295, 1295, 1295, 1295]  [1314.0]  0
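
As a quick sanity check on the result above: in this one sample, out.weekly_average equals the arithmetic mean of the seven values in out.forecast. That the model always computes the average this way is an assumption drawn from this single output, not something the model documentation confirms. The values below are copied by hand from the sample result.

```python
# values copied from the sample inference result above
forecast = [1434, 1288, 1296, 1295, 1295, 1295, 1295]
reported_weekly_average = 1314.0

# arithmetic mean of the 7 forecast values
computed = sum(forecast) / len(forecast)
print(computed)  # prints 1314.0

# the reported average matches the mean in this sample
print(computed == reported_weekly_average)  # prints True
```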

Undeploying Your Pipeline

You should always undeploy your pipelines when you are done with them, or don’t need them for a while. This releases the resources that the pipeline is using for other processes to use. You can always redeploy the pipeline when you need it again. As a reminder, here are the commands to deploy and undeploy a pipeline:


# when the pipeline is deployed, it's ready to receive data and infer
pipeline.deploy()

# "turn off" the pipeline and release its resources
pipeline.undeploy()

If you are continuing on to the next notebook now, you can leave the pipeline deployed to keep working; but if you are taking a break, then you should undeploy.
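
One way to avoid forgetting the undeploy step is to wrap the deploy and undeploy calls in a context manager, so that resources are released even if an inference raises an error. This is only a sketch: the helper name deployed and the FakePipeline class are hypothetical, and FakePipeline exists solely so the example runs without a Wallaroo cluster. With a real pipeline, you would pass the pipeline object you built earlier.

```python
from contextlib import contextmanager

@contextmanager
def deployed(pipeline):
    """Deploy on entry and always undeploy on exit, even if an error occurs."""
    pipeline.deploy()
    try:
        yield pipeline
    finally:
        pipeline.undeploy()


class FakePipeline:
    """Hypothetical stand-in that only records which methods were called."""
    def __init__(self):
        self.calls = []

    def deploy(self):
        self.calls.append("deploy")
        return self

    def undeploy(self):
        self.calls.append("undeploy")
        return self

    def infer(self, data):
        self.calls.append("infer")
        return data


fake = FakePipeline()
with deployed(fake) as p:
    p.infer({"count": [[985, 801, 1349]]})

print(fake.calls)  # prints ['deploy', 'infer', 'undeploy']
```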

## blank space to undeploy the pipeline, if needed

pipeline.undeploy()

Congratulations!

You have now

  • Successfully trained a model
  • Converted your model and uploaded it to Wallaroo
  • Created and deployed a simple single-step pipeline
  • Successfully sent data to your pipeline for inference

In the next notebook, you will look at two different ways to evaluate your model against the real world environment.

2 - Retail: Model Experiments

How to use Wallaroo to experiment with different Retail Forecasting models.

Tutorial Notebook 2: Vetting a Model With Production Experiments

So far, we’ve discussed practices and methods for transitioning an ML model and related artifacts from development to production. However, just the act of pushing a model into production is not the only consideration. In many situations, it’s important to vet a model’s performance in the real world before fully activating it. Real world vetting can surface issues that may not have arisen during the development stage, when models are only checked using hold-out data.

In this notebook, you will learn about two kinds of production ML model validation methods: A/B testing and Shadow Deployments. A/B tests and other types of experimentation are part of the ML lifecycle. The ability to quickly experiment and test new models in the real world helps data scientists to continually learn, innovate, and improve AI-driven decision processes.

Preliminaries

In the blocks below we will preload some required libraries; we will also redefine some of the convenience functions that you saw in the previous notebook.

After that, you should log into Wallaroo and set your working environment to the workspace that you created in the previous notebook.

# preload needed libraries 

import wallaroo
from wallaroo.object import EntityNotFoundError
from wallaroo.framework import Framework

# used to display DataFrame information without truncating
from IPython.display import display
import pandas as pd
pd.set_option('display.max_colwidth', None)

import json
import datetime
import time

# used for unique connection names

import string
import random

import pyarrow as pa
## convenience functions from the previous notebook

# return the workspace called <name>, or create it if it does not exist.
# this function assumes your connection to wallaroo is called wl
def get_workspace(name):
    workspace = None
    for ws in wl.list_workspaces():
        if ws.name() == name:
            workspace= ws
    if(workspace == None):
        workspace = wl.create_workspace(name)
    return workspace

# pull a single datum from a data frame 
# and convert it to the format the model expects
def get_singleton(df, i):
    singleton = df.iloc[i,:].to_numpy().tolist()
    sdict = {'tensor': [singleton]}
    return pd.DataFrame.from_dict(sdict)

# pull a batch of data from a data frame
# and convert to the format the model expects
def get_batch(df, first=0, nrows=1):
    last = first + nrows
    batch = df.iloc[first:last, :].to_numpy().tolist()
    return pd.DataFrame.from_dict({'tensor': batch})

# Translate a column from a dataframe into a single array
# used for the Statsmodel forecast model

def get_singleton_forecast(df, field):
    singleton = pd.DataFrame({field: [df[field].values.tolist()]})
    return singleton

Pre-exercise

If needed, log into Wallaroo and go to the workspace that you created in the previous notebook. Please refer to Notebook 1 to refresh yourself on how to log in and set your working environment to the appropriate workspace.

## blank space to log in and go to the appropriate workspace

wl = wallaroo.Client()


import string
import random

suffix= ''.join(random.choice(string.ascii_lowercase) for i in range(4))
suffix='john'

workspace_name = f'forecast-model-tutorial{suffix}'

workspace = get_workspace(workspace_name)

wl.set_current_workspace(workspace)
Please log into the following URL in a web browser:
https://doc-test.keycloak.wallarooexample.ai/auth/realms/master/device?user_code=PDKG-QSIF

Login successful!

{'name': 'forecast-model-tutorialjohn', 'id': 16, 'archived': False, 'created_by': '0a36fba2-ad42-441b-9a8c-bac8c68d13fa', 'created_at': '2023-08-02T15:50:52.816795+00:00', 'models': [{'name': 'forecast-control-model', 'versions': 3, 'owner_id': '""', 'last_update_time': datetime.datetime(2023, 8, 3, 1, 11, 50, 568151, tzinfo=tzutc()), 'created_at': datetime.datetime(2023, 8, 2, 15, 50, 54, 223186, tzinfo=tzutc())}, {'name': 'forecast-challenger01-model', 'versions': 3, 'owner_id': '""', 'last_update_time': datetime.datetime(2023, 8, 3, 13, 55, 23, 119224, tzinfo=tzutc()), 'created_at': datetime.datetime(2023, 8, 2, 15, 50, 55, 208179, tzinfo=tzutc())}, {'name': 'forecast-challenger02-model', 'versions': 3, 'owner_id': '""', 'last_update_time': datetime.datetime(2023, 8, 3, 13, 55, 24, 133756, tzinfo=tzutc()), 'created_at': datetime.datetime(2023, 8, 2, 15, 50, 56, 291043, tzinfo=tzutc())}], 'pipelines': [{'name': 'forecast-tutorial-pipeline', 'create_time': datetime.datetime(2023, 8, 2, 15, 50, 59, 480547, tzinfo=tzutc()), 'definition': '[]'}]}

A/B Testing

An A/B test, also called a controlled experiment or a randomized control trial, is a statistical method of determining which of a set of variants is the best. A/B tests allow organizations and policy-makers to make smarter, data-driven decisions that are less dependent on guesswork.

In the simplest version of an A/B test, subjects are randomly assigned to either the control group (group A) or the treatment group (group B). Subjects in the treatment group receive the treatment (such as a new medicine, a special offer, or a new web page design) while the control group proceeds as normal without the treatment. Data is then collected on the outcomes and used to study the effects of the treatment.

In data science, A/B tests are often used to choose between two or more candidate models in production, by measuring which model performs best in the real world. In this formulation, the control is often an existing model that is currently in production, sometimes called the champion. The treatment is a new model being considered to replace the old one. This new model is sometimes called the challenger. In our discussion, we’ll use the terms champion and challenger, rather than control and treatment.

When data is sent to a Wallaroo A/B test pipeline for inference, each datum is randomly sent to either the champion or challenger. After enough data has been sent to collect statistics on all the models in the A/B test pipeline, then those outcomes can be analyzed to determine the difference (if any) in the performance of the champion and challenger. Usually, the purpose of an A/B test is to decide whether or not to replace the champion with the challenger.

Keep in mind that in machine learning, the terms experiments and trials also often refer to the process of finding a training configuration that works best for the problem at hand (this is sometimes called hyperparameter optimization). In this guide, we will use the term experiment to refer to the use of A/B tests to compare the performance of different models in production.


Exercise: Create some forecast models and upload them to Wallaroo

Use the forecast process from Notebook 1 to create at least one alternate forecast. You can do this by varying the modeling algorithm, the inputs, the feature engineering, or all of the above.

For the purpose of these exercises, please make sure that the predictions from the new model(s) are in the same units as the (champion) model that you created in Notebook 1. For example, if the champion model predicts log price, then the challenger models should also predict log price. If the champion model predicts price in units of $10,000, then the challenger models should also.

This applies to the Python step if using one similar to the python step provided from Notebook 1 - if the outputs of the new forecast model match the previous one, that post-processing python step can be applied to both models.

  • If you prefer to shortcut this step, you can use some of the pretrained model Python model files in the models directory
    • If the Python models are used, ensure that the proper input and output schemas are set. See the N1_deploy_a_model notebook for instructions.
  • Upload your new model(s) to Wallaroo, into your forecast workspace

At the end of this exercise, you should have at least one challenger model to compare to your champion model uploaded to your workspace.

challenger01_model_name = "forecast-challenger01-model"
challenger01_model_path = "../models/forecast_alternate01.py"

challenger02_model_name = "forecast-challenger02-model"
challenger02_model_path = "../models/forecast_alternate02.py"

# Holding on to these for later
input_schema = pa.schema([
    pa.field('count', pa.list_(pa.int64()))
])

output_schema = pa.schema([
    pa.field('forecast', pa.list_(pa.int64())),
    pa.field('weekly_average', pa.list_(pa.float64()))
])

# upload the models

challenger01_model = (wl.upload_model(challenger01_model_name, 
                                 challenger01_model_path, 
                                 framework=Framework.PYTHON)
                                 .configure("python", 
                                 input_schema=input_schema, 
                                 output_schema=output_schema)
                )

challenger02_model = (wl.upload_model(challenger02_model_name, 
                                 challenger02_model_path, 
                                 framework=Framework.PYTHON)
                                 .configure("python", 
                                 input_schema=input_schema, 
                                 output_schema=output_schema)
                )

There are a number of considerations to designing an A/B test; you can check out the article The What, Why, and How of A/B Testing for more details. In these exercises, we will concentrate on the deployment aspects. You will need a champion model and at least one challenger model. You also need to decide on a data split: for example 50-50 between the champion and challenger, or a 2:1 ratio between champion and challenger (two-thirds of the data to the champion, one-third to the challenger).
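
To get a feel for what a 2:1 split means in practice, the routing can be simulated in plain Python. This is a toy simulation of weighted random assignment only; it is not how Wallaroo implements the split, and the route helper, the fixed seed, and the sample size are all illustrative assumptions.

```python
import random
from collections import Counter

def route(weights, rng):
    """Pick one model name at random, in proportion to its weight."""
    names = list(weights)
    return rng.choices(names, weights=[weights[n] for n in names], k=1)[0]

# 2:1 ratio: about two-thirds of the data to the champion
weights = {"champion": 2, "challenger": 1}

rng = random.Random(42)  # fixed seed so the run is repeatable
n_requests = 9000
counts = Counter(route(weights, rng) for _ in range(n_requests))

for name in weights:
    share = counts[name] / n_requests
    print(f"{name}: {counts[name]} requests ({share:.1%})")
```

The observed shares will hover around two-thirds and one-third rather than matching them exactly, which is one reason an A/B test needs enough traffic before the results are compared.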

As an example of creating an A/B test deployment, suppose you have a champion model called “champion”, that you have been running in a one-step pipeline called “pipeline”. You now want to compare it to a challenger model called “challenger”. For your A/B test, you will send two-thirds of the data to the champion, and the other third to the challenger. Both models have already been uploaded.

To help you with the exercises, here are some convenience functions to retrieve models and pipelines that have been previously uploaded to your workspace (in this example, wl is your wallaroo.Client() object).

# Get the most recent version of a model.
# Assumes that the most recent version is the last in the list of versions.
# wl.get_current_workspace().models() returns a list of models in the current workspace

def get_model(mname, modellist=None):
    # look up the model list at call time, so newly uploaded models are found
    if modellist is None:
        modellist = wl.get_current_workspace().models()
    model = [m.versions()[-1] for m in modellist if m.name() == mname]
    if len(model) <= 0:
        raise KeyError(f"model {mname} not found in this workspace")
    return model[0]

# get a pipeline by name in the workspace
def get_pipeline(pname, plist=None):
    # look up the pipeline list at call time, so newly created pipelines are found
    if plist is None:
        plist = wl.get_current_workspace().pipelines()
    pipeline = [p for p in plist if p.name() == pname]
    if len(pipeline) <= 0:
        raise KeyError(f"pipeline {pname} not found in this workspace")
    return pipeline[0]
# use the space here for retrieving the pipeline

pipeline_name = 'forecast-tutorial-pipeline'

pipeline = get_pipeline(pipeline_name)

Pipelines may already have pipeline steps assigned. Pipeline steps can be removed or replaced with other steps.

The easiest way to clear all pipeline steps is with the Pipeline clear() method.

To remove one step, use the Pipeline remove_step(index) method, where index is the step number ordered from zero. For example, if a pipeline has one step, then remove_step(0) would remove that step.

To replace a pipeline step, use the Pipeline replace_with_model_step(index, model) method, where index is the step number ordered from zero, and model is the model that replaces it.

Updated pipeline steps are not saved until the pipeline is redeployed with the Pipeline deploy() method.

Reference: Wallaroo SDK Essentials Guide: Pipeline Management.

For A/B testing, a pipeline step is either added or used to replace an existing step.

To add an A/B testing step, use the Pipeline add_random_split method with the following parameters:

Parameter | Type | Description
champion_weight | Float (Required) | The weight for the champion model.
champion_model | Wallaroo.Model (Required) | The uploaded champion model.
challenger_weight | Float (Required) | The weight of the challenger model.
challenger_model | Wallaroo.Model (Required) | The uploaded challenger model.
hash_key | String (Optional) | A key used instead of a random number for model selection. This must be between 0.0 and 1.0.

Note that multiple challenger models with different weights can be added as the random split step.

In this example, a pipeline will be built with a 2:1 weighted ratio between the champion and a single challenger model.

pipeline.add_random_split([(2, control), (1, challenger)])

To replace an existing pipeline step with an A/B testing step use the Pipeline replace_with_random_split method.

Parameter | Type | Description
index | Integer (Required) | The pipeline step being replaced.
champion_weight | Float (Required) | The weight for the champion model.
champion_model | Wallaroo.Model (Required) | The uploaded champion model.
challenger_weight | Float (Required) | The weight of the challenger model.
challenger_model | Wallaroo.Model (Required) | The uploaded challenger model.
hash_key | String (Optional) | A key used instead of a random number for model selection. This must be between 0.0 and 1.0.

This example replaces the first pipeline step with a 2:1 champion to challenger ratio.

pipeline.replace_with_random_split(0, [(2, control), (1, challenger)])

In either case, the random split will randomly send inference data to one model based on the weighted ratio. As more inferences are performed, the ratio between the champion and challengers will align more and more to the ratio specified.
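To see why the observed ratio only approaches the configured one as the number of inferences grows, here is a minimal simulation in plain Python. It does not call Wallaroo: simulate_split and the model names are hypothetical, and random.choices stands in for whatever selection logic the platform actually uses.

```python
import random
from collections import Counter

def simulate_split(weights, n_inferences, seed=0):
    """Pick one model per inference, with probability proportional to its weight."""
    rng = random.Random(seed)
    names = [name for _, name in weights]
    wts = [w for w, _ in weights]
    picks = rng.choices(names, weights=wts, k=n_inferences)
    counts = Counter(picks)
    # fraction of inferences served by each model
    return {name: counts[name] / n_inferences for name in names}

# hypothetical 2:1 split between a champion and one challenger
weights = [(2, "champion"), (1, "challenger")]
for n in (10, 100, 10_000):
    print(n, simulate_split(weights, n))
```

With only a handful of inferences the observed fractions can be far from two-thirds and one-third; with thousands they should sit close to the configured weights.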

Reference: Wallaroo SDK Essentials Guide: Pipeline Management A/B Testing.

Then creating an A/B test deployment would look something like this:

First get the models used.

# retrieve handles to the most recent versions 
# of the champion and challenger models
champion = get_model("champion")
challenger = get_model("challenger")
# blank space to get the model(s)

control_model_name = 'forecast-control-model'

bike_day_model = get_model(control_model_name)

The second step is to retrieve the pipeline created in the previous notebook, then redeploy it with the A/B testing split step.

Here’s some sample code:

# get an existing single-step pipeline and undeploy it
pipeline = get_pipeline("pipeline")
pipeline.undeploy()

# clear the pipeline and add a random split
pipeline.clear()
pipeline.add_random_split([(2, champion), (1, challenger)])
pipeline.deploy()

The above code clears out all the steps of the pipeline and adds a new step with an A/B test deployment, where the incoming data is randomly sent in a 2:1 ratio to the champion and the challenger, respectively.

You can add multiple challengers to an A/B test:

pipeline.add_random_split([ (2, champion), (1, challenger01), (1, challenger02) ])

This pipeline will distribute data to the champion and the two challengers in the ratio 2:1:1 (half to the champion, a quarter to each challenger).
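The arithmetic behind a weighted split can be checked with a few lines of plain Python. This is only a sketch of the weight-to-share conversion; expected_shares is a hypothetical helper, not part of the Wallaroo SDK.

```python
from fractions import Fraction

def expected_shares(weights):
    """Convert (weight, name) pairs into each model's expected share of traffic."""
    total = sum(w for w, _ in weights)
    return {name: Fraction(w, total) for w, name in weights}

# the 2:1:1 example: integer weights, hypothetical model names
shares = expected_shares([(2, "champion"), (1, "challenger01"), (1, "challenger02")])
for name, share in shares.items():
    print(f"{name}: {share} ({float(share):.0%})")
```

Only the relative sizes of the weights matter here: under this calculation, weights of 4:2:2 would give the same shares as 2:1:1.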

You can also create an A/B test deployment from scratch:

pipeline = wl.build_pipeline("pipeline")
pipeline.add_random_split([(2, champion), (1, challenger)])

If there is a pre or post process step, those would be added normally. For example:

pipeline = wl.build_pipeline("pipeline")
pipeline.add_random_split([(2, champion), (1, challenger)])
# postprocess_python is a previously uploaded post-processing model
pipeline.add_model_step(postprocess_python)

This assumes that both the champion and challenger return the same data output schema that the post process step is expecting.


Exercise: Create an A/B test deployment of your forecast models

Use the champion and challenger models that you created in the previous exercises to create an A/B test deployment. You can either create one from scratch, or reconfigure an existing pipeline.

  • Send half the data to the champion, and distribute the rest among the challenger(s).

At the end of this exercise, you should have an A/B test deployment and be ready to compare multiple models.

# blank space to retrieve pipeline and redeploy with a/b testing step

pipeline.undeploy()
pipeline.clear()
pipeline.add_random_split([(2, bike_day_model), (1, challenger01_model), (1, challenger02_model)])
pipeline.deploy()
name | forecast-tutorial-pipeline
created | 2023-08-02 15:50:59.480547+00:00
last_updated | 2023-08-14 15:46:31.432410+00:00
deployed | True
tags |
versions | 3c5a263b-05de-4305-b121-a9a6b3749dbc, c065b69b-d622-4a7b-93e5-4fcacf62da86, b0a212e3-66b7-4599-9701-f4183045cec6, af0f9c1c-0c28-4aaa-81f5-abb470500960, 980ee03b-694e-47c7-b76b-43b3e927b281, 85af5504-f1e4-4d0d-bd9e-e46891211843, 39b82898-12b6-4a30-ab41-f06cb05c7391, d8edf8c5-07f0-455e-9f34-075b7062f56f, 170402aa-8e83-420e-bee3-51a9fca4a9d9, 14912dd4-5e3a-4314-9e3f-0ea3af3660c1, 3309619d-54b9-4499-8afd-ed7819339b64, 2af1f08c-976c-4d51-9cf6-2cc371788844, 76fbec8d-cebf-40e5-81d5-447170c4a836, c6c10a83-9b6c-449f-a5c3-63b36a3d749b, 436fe308-283f-43b0-a4f0-159c05193d97, eb9e5b9f-41d9-42dc-8e49-13ec4771abad, 4d062242-1477-40fd-bf11-835e6bd62c10, 1f3d774d-7626-4722-b4b8-7dedbaa35803, 12f73035-cf94-4e6c-b2b6-05946ab06aef, b4ec30ef-6724-467e-b42a-d54399198f32, 57e7acf8-b3f0-436b-a236-0b1d6e76ba18, 5697a317-d0e6-402b-9369-7f0e732cc1fa, 5d0cb620-f8ba-4b9d-a81b-0ba333584508, 6b14e208-1319-4bc4-927b-b76a4893d373, 0b44d911-c69e-4030-b481-84e947fe6c70, dc5605d2-bb6a-48d2-b83a-3d77b7e608af, a68819c0-7508-467e-9fc1-60cbf8aaf9e1, b908d302-ce87-4a52-8ef2-b595fac2c67e, 7b94201f-ef5b-4629-ae2f-acf894cb1fcf, dc8bf23f-b598-48c6-bb2d-c5098d264622, 3a8ebc46-6261-4977-8a60-038c99c255d7, 40ab9d3d-ee6c-4f0c-bf38-345385130285, 47792a90-bea8-432a-981f-232bf67288c8, 97b815f3-636b-4424-8be4-3d95bcf32b40, 0d2f2250-9a43-47ce-beef-32371986f798, 46c95b7f-a79e-41ee-8565-578f9c3c20e5, 1ff98a35-3468-4b70-84fc-fe71aed99a75, 73ff8fc2-ca4d-4ea1-887b-0d31190cfe36, f8188956-8b3e-4479-8b15-e8747fe915a6, 33e5cc2c-2bb2-4dc2-8a9e-c058e60f6163, 5d419693-97cc-461b-b72a-a389ab7a001b, 56c78f52-cba5-415c-913a-fee0e1863a90, a109a040-c8f2-46dc-8c0b-373ae10d4fa0, dcaec327-1358-42a7-88de-931602a42a72, debc509f-9481-464b-af7f-5c3138a9cdb4, b0d167aa-cc98-440a-8e85-1ae3f089745a, d9e69c40-c83b-48af-b6b9-caafcb85f08b, 186ffdd2-3a8f-40cc-8362-13cc20bd2f46, 535e6030-ebe5-4c79-b5cd-69b161637a99, c5c0218a-800b-4235-8767-64d18208e68a, 4559d934-33b0-4872-a788-4ef27f554482, 94d3e20b-add7-491c-aedd-4eb094a8aebf, ab4e58bf-3b75-4bf6-b6b3-f703fe61e7af, 3773f5c5-e4c5-4e46-a839-6945af15ca13, 3abf03dd-8eab-4a8d-8432-aa85a30c0eda, 5ec5e8dc-7492-498b-9652-b3733e4c87f7, 1d89287b-4eff-47ec-a7bb-8cedaac1f33f
steps | forecast-challenger01-model

The pipeline steps are displayed with the Pipeline steps() method. This is used to verify the current deployed steps in the pipeline.

  • IMPORTANT NOTE: Verify that the pipeline is deployed before checking for pipeline steps. Deploying the pipeline sets the steps into the Wallaroo system - until that happens, the steps only exist in the local system as potential steps.
# blank space to get the current pipeline steps
pipeline.steps()
[{'RandomSplit': {'hash_key': None, 'weights': [{'model': {'name': 'forecast-control-model', 'version': '4c5ade81-ae25-4200-a69e-01e24d15fac5', 'sha': '3cd2acdd1f513f46615be7aa5beac16f09903be851e91f20f6dcdead4a48faa0'}, 'weight': 2}, {'model': {'name': 'forecast-challenger01-model', 'version': 'c99efea6-70b0-4bb0-a5c6-58f50478ca34', 'sha': '5035aca1989226ec1fa16ab325ed2ca7f88de22813d41f1a343f3acbca181dc4'}, 'weight': 1}, {'model': {'name': 'forecast-challenger02-model', 'version': 'fc12d991-9d79-499f-91cb-a7332cb91af6', 'sha': '94473071d321c00670dda36c7e7f953f4ed5fd2f33c2188b3a96dace19ece71d'}, 'weight': 1}]}}]

Please note that for batch inferences, the entire batch will be sent to the same model. So in order to verify that your pipeline is distributing inferences in the proportion you specified, you will need to send your queries one datum at a time.

To help with the next exercise, here is another convenience function you might find useful.

# get the names of the inferring models
# from a dataframe of a/b test results
def get_names(resultframe):
    modelcol = resultframe['out._model_split']
    jsonstrs = [mod[0]  for mod in modelcol]
    return [json.loads(jstr)['name'] for jstr in jsonstrs]

Here’s an example of how to send a large number of queries one at a time to your pipeline with the SDK:

results = []

# get a list of result frames
for i in range(1000):
    query = get_singleton(testdata, i)
    results.append(pipeline.infer(query))

# make one data frame of all results    
allresults = pd.concat(results, ignore_index=True)

# add a column to indicate which model made the inference
allresults['modelname'] = get_names(allresults)

# get the counts of how many inferences were made by each model
allresults.modelname.value_counts()
  • NOTE: Performing 1,000 inferences sequentially may take several minutes to complete. Adjust the range for time as required.

As with the single-step pipeline, the model predictions will be in a column named out.<outputname>. In addition, there will be a column named out._model_split that contains information about the model that made a particular prediction. The get_names() convenience function above extracts the model name from the out._model_split column.
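The extraction that get_names() performs can be tried out without a deployed pipeline. The sketch below uses plain lists in place of a DataFrame column. The sample JSON strings are made up for illustration: only the name key is taken from the function above, and the version field is an assumption.

```python
import json
from collections import Counter

# Hypothetical contents of the out._model_split column: one list per row,
# holding a JSON string that describes the model that served that row.
model_split_column = [
    ['{"name": "forecast-control-model", "version": "v1"}'],
    ['{"name": "forecast-challenger01-model", "version": "v1"}'],
    ['{"name": "forecast-control-model", "version": "v1"}'],
    ['{"name": "forecast-challenger02-model", "version": "v1"}'],
]

def names_from_split(column):
    """Return the model name recorded in each row of the column."""
    return [json.loads(row[0])["name"] for row in column]

# tally how many rows each model served
counts = Counter(names_from_split(model_split_column))
total = sum(counts.values())
for name, count in counts.most_common():
    print(f"{name}: {count} of {total} ({count / total:.0%})")
```

Counting the names this way gives the same kind of tally as value_counts() on the modelname column in the example above.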


Exercise: Send some queries to your A/B test deployment

  1. Send a single datum to the A/B test pipeline you created in the previous exercise. You can use the same test data set that you created/downloaded in the previous notebook. Observe what the inference result looks like. If you send the singleton through the pipeline multiple times, you should observe that the model making the inference changes.
  2. Send a large number of queries (at least 100) one at a time to the pipeline.
  • Note that approximately half the inferences were made by the champion model.
  • The remaining inferences should be distributed as you specified.

The more queries you send, the closer the distribution should be to what you specified.

If you can align the actual rental counts from your test data to the predictions, you can also compare the accuracy of the different models.
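One possible way to make that comparison is to group the absolute errors by the model that produced each prediction. The sketch below assumes you have already paired each prediction with its actual value; mae_by_model and the numbers are hypothetical, and mean absolute error is just one reasonable choice of metric.

```python
from collections import defaultdict

def mae_by_model(records):
    """Mean absolute error per model from (model_name, predicted, actual) records."""
    errors = defaultdict(list)
    for model_name, predicted, actual in records:
        errors[model_name].append(abs(predicted - actual))
    return {name: sum(errs) / len(errs) for name, errs in errors.items()}

# made-up numbers, for illustration only
records = [
    ("champion", 1290.0, 1300.0),
    ("champion", 1310.0, 1300.0),
    ("challenger", 1000.0, 1300.0),
]
print(mae_by_model(records))
```

Because each model in an A/B test sees a different random subset of the queries, such a comparison is only meaningful once every model has served a reasonable number of them.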

Don’t forget to undeploy your pipeline after you are done, to free up resources.

# blank space to send queries to A/B test pipeline and examine the results

sample_count = pd.read_csv('../data/test_data.csv')
inference_df = get_singleton_forecast(sample_count.loc[2:22], 'count')
display(inference_df)

for i in range(10):
    results = pipeline.infer(inference_df)
    display(results.loc[:, ["time", "out.weekly_average"]])
    display(get_names(results))
  | count
0 | [1349, 1562, 1600, 1606, 1510, 959, 822, 1321, 1263, 1162, 1406, 1421, 1248, 1204, 1000, 683, 1650, 1927, 1543, 981, 986]
  | time | out.weekly_average
0 | 2023-08-14 15:47:55.828 | [1292.5714285714287]
['forecast-control-model']
  | time | out.weekly_average
0 | 2023-08-14 15:47:56.327 | [1292.5714285714287]
['forecast-control-model']
  | time | out.weekly_average
0 | 2023-08-14 15:47:56.793 | [986.0]
['forecast-challenger01-model']
  | time | out.weekly_average
0 | 2023-08-14 15:47:57.249 | [1292.5714285714287]
['forecast-control-model']
  | time | out.weekly_average
0 | 2023-08-14 15:47:57.690 | [1292.5714285714287]
['forecast-control-model']
  | time | out.weekly_average
0 | 2023-08-14 15:47:58.126 | [1292.5714285714287]
['forecast-control-model']
  | time | out.weekly_average
0 | 2023-08-14 15:47:58.582 | [1048.0]
['forecast-challenger02-model']
  | time | out.weekly_average
0 | 2023-08-14 15:47:59.027 | [1292.5714285714287]
['forecast-control-model']
  | time | out.weekly_average
0 | 2023-08-14 15:47:59.487 | [986.0]
['forecast-challenger01-model']
  | time | out.weekly_average
0 | 2023-08-14 15:47:59.938 | [1292.5714285714287]
['forecast-control-model']

Shadow Deployments

Another way to vet your new model is to set it up in a shadow deployment. With shadow deployments, all the models in the experiment pipeline get all the data, and all inferences are recorded. However, the pipeline returns only one “official” prediction: the one from the default, or champion, model.

Shadow deployments are useful for “sanity checking” a model before it goes truly live. For example, you might have built a smaller, leaner version of an existing model using knowledge distillation or other model optimization techniques, as discussed here. A shadow deployment of the new model alongside the original model can help ensure that the new model meets desired accuracy and performance requirements before it’s put into production.

As an example of creating a shadow deployment, suppose you have a champion model called “champion”, that you have been running in a one-step pipeline called “pipeline”. You now want to put a challenger model called “challenger” into a shadow deployment with the champion. Both models have already been uploaded.

Shadow deployments can be added as a pipeline step, or replace an existing pipeline step.

Shadow deployment steps are added with the add_shadow_deploy(champion, [model2, model3,...]) method, where champion is the model whose inference results are returned. The array of models listed after it are the models that also receive the inference data, with their results recorded as shadow inference results.

To replace an existing pipeline step with a shadow deployment step, use the replace_with_shadow_deploy(index, champion, [model2, model3,...]) method. The index is the step being replaced, with pipeline steps starting at 0, and champion is the model whose inference results are returned. The array of models listed after it are the models that also receive the inference data, with their results recorded as shadow inference results.

Then creating a shadow deployment from a previously created (and deployed) pipeline could look something like this:

# retrieve handles to the most recent versions 
# of the champion and challenger models
# see the A/B test section for the definition of get_model()
champion = get_model("champion")
challenger = get_model("challenger")

# get the existing pipeline and undeploy it
# see the A/B test section for the definition of get_pipeline()
pipeline = get_pipeline("pipeline")
pipeline.undeploy()

# clear the pipeline and add a shadow deploy step
pipeline.clear()
pipeline.add_shadow_deploy(champion, [challenger])
pipeline.deploy()

The above code clears the pipeline and adds a shadow deployment. The pipeline will still only return the inferences from the champion model, but it will also run the challenger model in parallel and log the inferences, so that you can compare what all the models do on the same inputs.
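The behavior described above can be mimicked with plain functions to make the idea concrete. This is a toy sketch, not Wallaroo code: shadow_infer and the lambda "models" are hypothetical stand-ins, and the column naming simply mirrors the out.<outputname> and out_<challengermodel>.<outputname> pattern this tutorial describes for shadow results.

```python
def shadow_infer(datum, champion, challengers, output_name="output"):
    """Run every model on the same datum; only the champion's result is 'official'."""
    _, champ_fn = champion
    result = {f"out.{output_name}": champ_fn(datum)}
    for name, fn in challengers:
        # shadow results are recorded alongside, under a column named after the model
        result[f"out_{name}.{output_name}"] = fn(datum)
    return result

# hypothetical stand-in "models": plain functions over a list of counts
champion = ("champion", lambda xs: sum(xs) / len(xs))
challengers = [("challenger", lambda xs: max(xs))]

print(shadow_infer([10, 20, 30], champion, challengers))
```

Every model receives the identical input, which is what lets you compare their outputs row by row.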

You can add multiple challengers to a shadow deploy:

pipeline.add_shadow_deploy(champion, [challenger01, challenger02])

You can also create a shadow deployment from scratch with a new pipeline. This example just uses two models - one champion, one challenger.

newpipeline = wl.build_pipeline("pipeline")
newpipeline.add_shadow_deploy(champion, [challenger])

Exercise: Create a forecast model shadow deployment

Use the champion and challenger models that you created in the previous exercises to create a shadow deployment. You can either create one from scratch, or reconfigure an existing pipeline.

At the end of this exercise, you should have a shadow deployment running multiple models in parallel.

# blank space to create a shadow deployment

pipeline.clear()
pipeline.add_shadow_deploy(bike_day_model, [challenger01_model, challenger02_model])
pipeline.deploy()
name | forecast-tutorial-pipeline
created | 2023-08-02 15:50:59.480547+00:00
last_updated | 2023-08-14 15:48:29.360779+00:00
deployed | True
tags |
versions | 2be416d2-bad8-4210-9fcd-aa1b3f197f64, 3c5a263b-05de-4305-b121-a9a6b3749dbc, c065b69b-d622-4a7b-93e5-4fcacf62da86, b0a212e3-66b7-4599-9701-f4183045cec6, af0f9c1c-0c28-4aaa-81f5-abb470500960, 980ee03b-694e-47c7-b76b-43b3e927b281, 85af5504-f1e4-4d0d-bd9e-e46891211843, 39b82898-12b6-4a30-ab41-f06cb05c7391, d8edf8c5-07f0-455e-9f34-075b7062f56f, 170402aa-8e83-420e-bee3-51a9fca4a9d9, 14912dd4-5e3a-4314-9e3f-0ea3af3660c1, 3309619d-54b9-4499-8afd-ed7819339b64, 2af1f08c-976c-4d51-9cf6-2cc371788844, 76fbec8d-cebf-40e5-81d5-447170c4a836, c6c10a83-9b6c-449f-a5c3-63b36a3d749b, 436fe308-283f-43b0-a4f0-159c05193d97, eb9e5b9f-41d9-42dc-8e49-13ec4771abad, 4d062242-1477-40fd-bf11-835e6bd62c10, 1f3d774d-7626-4722-b4b8-7dedbaa35803, 12f73035-cf94-4e6c-b2b6-05946ab06aef, b4ec30ef-6724-467e-b42a-d54399198f32, 57e7acf8-b3f0-436b-a236-0b1d6e76ba18, 5697a317-d0e6-402b-9369-7f0e732cc1fa, 5d0cb620-f8ba-4b9d-a81b-0ba333584508, 6b14e208-1319-4bc4-927b-b76a4893d373, 0b44d911-c69e-4030-b481-84e947fe6c70, dc5605d2-bb6a-48d2-b83a-3d77b7e608af, a68819c0-7508-467e-9fc1-60cbf8aaf9e1, b908d302-ce87-4a52-8ef2-b595fac2c67e, 7b94201f-ef5b-4629-ae2f-acf894cb1fcf, dc8bf23f-b598-48c6-bb2d-c5098d264622, 3a8ebc46-6261-4977-8a60-038c99c255d7, 40ab9d3d-ee6c-4f0c-bf38-345385130285, 47792a90-bea8-432a-981f-232bf67288c8, 97b815f3-636b-4424-8be4-3d95bcf32b40, 0d2f2250-9a43-47ce-beef-32371986f798, 46c95b7f-a79e-41ee-8565-578f9c3c20e5, 1ff98a35-3468-4b70-84fc-fe71aed99a75, 73ff8fc2-ca4d-4ea1-887b-0d31190cfe36, f8188956-8b3e-4479-8b15-e8747fe915a6, 33e5cc2c-2bb2-4dc2-8a9e-c058e60f6163, 5d419693-97cc-461b-b72a-a389ab7a001b, 56c78f52-cba5-415c-913a-fee0e1863a90, a109a040-c8f2-46dc-8c0b-373ae10d4fa0, dcaec327-1358-42a7-88de-931602a42a72, debc509f-9481-464b-af7f-5c3138a9cdb4, b0d167aa-cc98-440a-8e85-1ae3f089745a, d9e69c40-c83b-48af-b6b9-caafcb85f08b, 186ffdd2-3a8f-40cc-8362-13cc20bd2f46, 535e6030-ebe5-4c79-b5cd-69b161637a99, c5c0218a-800b-4235-8767-64d18208e68a, 4559d934-33b0-4872-a788-4ef27f554482, 94d3e20b-add7-491c-aedd-4eb094a8aebf, ab4e58bf-3b75-4bf6-b6b3-f703fe61e7af, 3773f5c5-e4c5-4e46-a839-6945af15ca13, 3abf03dd-8eab-4a8d-8432-aa85a30c0eda, 5ec5e8dc-7492-498b-9652-b3733e4c87f7, 1d89287b-4eff-47ec-a7bb-8cedaac1f33f
steps | forecast-challenger01-model

Since a shadow deployment returns multiple predictions for a single datum, its inference result will look a little different from those of an A/B test or a single-step pipeline. The next exercise will show you how to examine all the inferences from all the models.


Exercise: Examine shadow deployment inferences

Use the test data that you created in a previous exercise to send a single datum to the shadow deployment that you created in the previous exercise.

  • Observe the inference result
  • You should see a column called out.<outputname>; this is the prediction from the champion model. It is the “official” prediction from the pipeline. If you used the same champion model in the A/B test exercise above, and in the single-step pipeline from the previous notebook, you should see that the inference results from all those pipelines were also called out.<outputname>.
  • You should also see a column called out_<challengermodel>.<outputname> (or more than one, if you had multiple challengers). These are the predictions from the challenger models.

For example, if your champion model is called “champion”, your challenger model is called “challenger”, and the outputname is “output”, then you should see the “official” prediction out.output and the shadow prediction out_challenger.output.
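When a result has many columns, a regular expression over the column names is a convenient way to pull out the champion and shadow predictions together. The sketch below applies that idea to a plain dict rather than a DataFrame. The row contents are made up, the in.count and out.forecast columns are assumptions, and select_columns is a hypothetical helper.

```python
import re

def select_columns(row, pattern):
    """Keep only the entries whose column name matches the regular expression."""
    regex = re.compile(pattern)
    return {col: val for col, val in row.items() if regex.search(col)}

# hypothetical single inference result, flattened to a dict of column -> value
row = {
    "time": "2023-08-14 15:49:25.882",
    "in.count": [1600, 1606, 1510],
    "out.forecast": [1278, 1295],
    "out.weekly_average": [1260.43],
    "out_challenger.weekly_average": [1472.71],
}

# keep the timestamp plus every weekly_average column, champion and shadow alike
print(select_columns(row, r"time|out.*?weekly_average"))
```

The pattern matches both out.weekly_average and out_challenger.weekly_average because `out.*?` allows any characters between the `out` prefix and the output name.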

Save the datum and the inference result from this exercise. You will need it for the next exercise.

# blank space to send an inference and examine the result

inference_df = get_singleton_forecast(sample_count.loc[4:30], 'count')
display(inference_df)

results = pipeline.infer(inference_df)
display(results.filter(regex='time|out.*?weekly_average'))
  | count
0 | [1600, 1606, 1510, 959, 822, 1321, 1263, 1162, 1406, 1421, 1248, 1204, 1000, 683, 1650, 1927, 1543, 981, 986, 1416, 1985, 506, 431, 1167, 1098, 1096, 1501]
  | time | out.weekly_average | out_forecast-challenger01-model.weekly_average | out_forecast-challenger02-model.weekly_average
0 | 2023-08-14 15:49:25.882 | [1260.4285714285713] | [1472.7142857142858] | [1235.0]

After the Experiment: Swapping in New Models

You have seen two methods to validate models in production with test (challenger) models.
The end result of an experiment is a decision about which model becomes the new champion. Let’s say that you have been running the shadow deployment that you created in the previous exercise, and you have decided that you want to replace the model “champion” with the model “challenger”. To do this, you will clear all the steps out of the pipeline, and add only “challenger” back in.

# retrieve a handle to the challenger model
# see the A/B test section for the definition of get_model()
challenger = get_model("challenger")

# get the existing pipeline and undeploy it
# see the A/B test section for the definition of get_pipeline()
pipeline = get_pipeline("pipeline")
pipeline.undeploy()

# clear out all the steps and add the challenger in as the new champion
pipeline.clear() 
pipeline.add_model_step(challenger).deploy()

Exercise: Set a challenger model as the new active model

Pick one of your challenger models as the new champion, and reconfigure your shadow deployment back into a single-step pipeline with the new chosen model.

  • Run the test datum from the previous exercise through the reconfigured pipeline.
  • Compare the results to the results from the previous exercise.
  • Notice that the pipeline predictions are different from the old champion, and consistent with the new one.

At the end of this exercise, you should have a single step pipeline, running a new model.

# Blank space - remove all steps, then redeploy with new champion model

pipeline.undeploy()
pipeline.clear()

pipeline.add_model_step(bike_day_model)
pipeline.deploy()
display(pipeline.steps())

inference_df = get_singleton_forecast(sample_count.loc[4:30], 'count')
display(inference_df)

results = pipeline.infer(inference_df)
display(results.filter(regex='time|out.*?weekly_average'))

# swap the model

pipeline.replace_with_model_step(0, challenger02_model)
pipeline.deploy()

# gives time for the update to happen - usually milliseconds, sometimes longer.  This gives enough time for the database updates to happen
import time
time.sleep(15)

display(pipeline.steps())

results = pipeline.infer(inference_df)
display(results.filter(regex='time|out.*?weekly_average'))
[{'ModelInference': {'models': [{'name': 'forecast-control-model', 'version': '4c5ade81-ae25-4200-a69e-01e24d15fac5', 'sha': '3cd2acdd1f513f46615be7aa5beac16f09903be851e91f20f6dcdead4a48faa0'}]}}]
  | count
0 | [1600, 1606, 1510, 959, 822, 1321, 1263, 1162, 1406, 1421, 1248, 1204, 1000, 683, 1650, 1927, 1543, 981, 986, 1416, 1985, 506, 431, 1167, 1098, 1096, 1501]
  | time | out.weekly_average
0 | 2023-08-14 15:52:23.301 | [1260.4285714285713]
[{'ModelInference': {'models': [{'name': 'forecast-challenger02-model', 'version': 'fc12d991-9d79-499f-91cb-a7332cb91af6', 'sha': '94473071d321c00670dda36c7e7f953f4ed5fd2f33c2188b3a96dace19ece71d'}]}}]
  | time | out.weekly_average
0 | 2023-08-14 15:52:43.318 | [1235.0]

Congratulations!

You have now

  • successfully trained new challenger models for the bike rental forecasting problem
  • compared your models using an A/B test
  • compared your models using a shadow deployment
  • replaced your old model with a new one in the bike rental forecasting pipeline

In the next notebook, you will learn how to monitor your production pipeline for “anomalous” or out-of-range behavior.

Cleaning up.

At this point, if you are not continuing on to the next notebook, undeploy your pipeline(s) to give the resources back to the environment.

## blank space to undeploy the pipelines
pipeline.undeploy()
name | forecast-tutorial-pipeline
created | 2023-08-02 15:50:59.480547+00:00
last_updated | 2023-08-14 15:52:23.736145+00:00
deployed | False
tags |
versions | 826ffd60-0113-4101-b356-395d8aa713a1, 9897a80f-a81e-4170-9d11-9ad6800f0fa7, f3e2c7a6-7a5a-4f7d-a441-9a6945812735, 2be416d2-bad8-4210-9fcd-aa1b3f197f64, 3c5a263b-05de-4305-b121-a9a6b3749dbc, c065b69b-d622-4a7b-93e5-4fcacf62da86, b0a212e3-66b7-4599-9701-f4183045cec6, af0f9c1c-0c28-4aaa-81f5-abb470500960, 980ee03b-694e-47c7-b76b-43b3e927b281, 85af5504-f1e4-4d0d-bd9e-e46891211843, 39b82898-12b6-4a30-ab41-f06cb05c7391, d8edf8c5-07f0-455e-9f34-075b7062f56f, 170402aa-8e83-420e-bee3-51a9fca4a9d9, 14912dd4-5e3a-4314-9e3f-0ea3af3660c1, 3309619d-54b9-4499-8afd-ed7819339b64, 2af1f08c-976c-4d51-9cf6-2cc371788844, 76fbec8d-cebf-40e5-81d5-447170c4a836, c6c10a83-9b6c-449f-a5c3-63b36a3d749b, 436fe308-283f-43b0-a4f0-159c05193d97, eb9e5b9f-41d9-42dc-8e49-13ec4771abad, 4d062242-1477-40fd-bf11-835e6bd62c10, 1f3d774d-7626-4722-b4b8-7dedbaa35803, 12f73035-cf94-4e6c-b2b6-05946ab06aef, b4ec30ef-6724-467e-b42a-d54399198f32, 57e7acf8-b3f0-436b-a236-0b1d6e76ba18, 5697a317-d0e6-402b-9369-7f0e732cc1fa, 5d0cb620-f8ba-4b9d-a81b-0ba333584508, 6b14e208-1319-4bc4-927b-b76a4893d373, 0b44d911-c69e-4030-b481-84e947fe6c70, dc5605d2-bb6a-48d2-b83a-3d77b7e608af, a68819c0-7508-467e-9fc1-60cbf8aaf9e1, b908d302-ce87-4a52-8ef2-b595fac2c67e, 7b94201f-ef5b-4629-ae2f-acf894cb1fcf, dc8bf23f-b598-48c6-bb2d-c5098d264622, 3a8ebc46-6261-4977-8a60-038c99c255d7, 40ab9d3d-ee6c-4f0c-bf38-345385130285, 47792a90-bea8-432a-981f-232bf67288c8, 97b815f3-636b-4424-8be4-3d95bcf32b40, 0d2f2250-9a43-47ce-beef-32371986f798, 46c95b7f-a79e-41ee-8565-578f9c3c20e5, 1ff98a35-3468-4b70-84fc-fe71aed99a75, 73ff8fc2-ca4d-4ea1-887b-0d31190cfe36, f8188956-8b3e-4479-8b15-e8747fe915a6, 33e5cc2c-2bb2-4dc2-8a9e-c058e60f6163, 5d419693-97cc-461b-b72a-a389ab7a001b, 56c78f52-cba5-415c-913a-fee0e1863a90, a109a040-c8f2-46dc-8c0b-373ae10d4fa0, dcaec327-1358-42a7-88de-931602a42a72, debc509f-9481-464b-af7f-5c3138a9cdb4, b0d167aa-cc98-440a-8e85-1ae3f089745a, d9e69c40-c83b-48af-b6b9-caafcb85f08b, 186ffdd2-3a8f-40cc-8362-13cc20bd2f46, 535e6030-ebe5-4c79-b5cd-69b161637a99, c5c0218a-800b-4235-8767-64d18208e68a, 4559d934-33b0-4872-a788-4ef27f554482, 94d3e20b-add7-491c-aedd-4eb094a8aebf, ab4e58bf-3b75-4bf6-b6b3-f703fe61e7af, 3773f5c5-e4c5-4e46-a839-6945af15ca13, 3abf03dd-8eab-4a8d-8432-aa85a30c0eda, 5ec5e8dc-7492-498b-9652-b3733e4c87f7, 1d89287b-4eff-47ec-a7bb-8cedaac1f33f
steps | forecast-challenger01-model

3 - Retail: Drift Detection

How to use Wallaroo to detect model drift with assays

Tutorial Notebook 4: Observability Part 2 - Drift Detection

In the previous notebook you learned how to add simple validation rules to a pipeline, to monitor whether outputs (or inputs) stray out of some expected range. In this notebook, you will monitor the distribution of the pipeline’s predictions to see if the model, or the environment that it runs in, has changed.

Preliminaries

In the blocks below we will preload some required libraries; we will also redefine some of the convenience functions that you saw in the previous notebooks.

After that, you should log into Wallaroo and set your working environment to the workspace that you created for this tutorial. Please refer to Notebook 1 to refresh yourself on how to log in and set your working environment to the appropriate workspace.

# preload needed libraries 

import wallaroo
from wallaroo.object import EntityNotFoundError

from IPython.display import display

# used to display DataFrame information without truncating
import pandas as pd
pd.set_option('display.max_colwidth', None)

import json
import datetime
import time

# used for unique connection names

import string
import random
import pyarrow as pa
## convenience functions from the previous notebooks
## these functions assume your connection to wallaroo is called wl

# return the workspace called <name>, or create it if it does not exist.
# this function assumes your connection to wallaroo is called wl
def get_workspace(name):
    workspace = None
    for ws in wl.list_workspaces():
        if ws.name() == name:
            workspace = ws
    if workspace is None:
        workspace = wl.create_workspace(name)
    return workspace

# pull a single datum from a data frame 
# and convert it to the format the model expects
def get_singleton(df, i):
    singleton = df.iloc[i,:].to_numpy().tolist()
    sdict = {'tensor': [singleton]}
    return pd.DataFrame.from_dict(sdict)

# pull a batch of data from a data frame
# and convert to the format the model expects
def get_batch(df, first=0, nrows=1):
    last = first + nrows
    batch = df.iloc[first:last, :].to_numpy().tolist()
    return pd.DataFrame.from_dict({'tensor': batch})

# Translate a column from a dataframe into a single array
# used for the Statsmodel forecast model

def get_singleton_forecast(df, field):
    singleton = pd.DataFrame({field: [df[field].values.tolist()]})
    return singleton

# Get the most recent version of a model in the workspace
# Assumes that the most recent version is the last in the list of versions.
# wl.get_current_workspace().models() returns a list of models in the current workspace

def get_model(mname):
    modellist = wl.get_current_workspace().models()
    model = [m.versions()[-1] for m in modellist if m.name() == mname]
    if len(model) <= 0:
        raise KeyError(f"model {mname} not found in this workspace")
    return model[0]

# get a pipeline by name in the workspace
def get_pipeline(pname):
    plist = wl.get_current_workspace().pipelines()
    pipeline = [p for p in plist if p.name() == pname]
    if len(pipeline) <= 0:
        raise KeyError(f"pipeline {pname} not found in this workspace")
    return pipeline[0]

## blank space to log in and go to the appropriate workspace

wl = wallaroo.Client()

import string
import random

suffix= ''.join(random.choice(string.ascii_lowercase) for i in range(4))

workspace_name = f'forecast-model-tutorial'

workspace = get_workspace(workspace_name)

wl.set_current_workspace(workspace)
{'name': 'forecast-model-tutorialjohn', 'id': 16, 'archived': False, 'created_by': '0a36fba2-ad42-441b-9a8c-bac8c68d13fa', 'created_at': '2023-08-02T15:50:52.816795+00:00', 'models': [{'name': 'forecast-control-model', 'versions': 3, 'owner_id': '""', 'last_update_time': datetime.datetime(2023, 8, 3, 1, 11, 50, 568151, tzinfo=tzutc()), 'created_at': datetime.datetime(2023, 8, 2, 15, 50, 54, 223186, tzinfo=tzutc())}, {'name': 'forecast-challenger01-model', 'versions': 3, 'owner_id': '""', 'last_update_time': datetime.datetime(2023, 8, 3, 13, 55, 23, 119224, tzinfo=tzutc()), 'created_at': datetime.datetime(2023, 8, 2, 15, 50, 55, 208179, tzinfo=tzutc())}, {'name': 'forecast-challenger02-model', 'versions': 3, 'owner_id': '""', 'last_update_time': datetime.datetime(2023, 8, 3, 13, 55, 24, 133756, tzinfo=tzutc()), 'created_at': datetime.datetime(2023, 8, 2, 15, 50, 56, 291043, tzinfo=tzutc())}], 'pipelines': [{'name': 'forecast-tutorial-pipeline', 'create_time': datetime.datetime(2023, 8, 2, 15, 50, 59, 480547, tzinfo=tzutc()), 'definition': '[]'}]}

Monitoring for Drift: Shift Happens.

In machine learning, you use data and known answers to train a model to make predictions for new previously unseen data. You do this with the assumption that the future unseen data will be similar to the data used during training: the future will look somewhat like the past.
But the conditions that existed when a model was created, trained and tested can change over time, due to various factors.

A good model should be robust to some amount of change in the environment; however, if the environment changes too much, your models may no longer be making the correct decisions. This situation is known as concept drift; too much drift can make your models obsolete, requiring periodic retraining.

Let’s consider the example we’ve been working on: home sale price prediction. You may notice over time that there has been a change in the mix of properties in the listings portfolio: for example, a dramatic increase or decrease in expensive properties (or more precisely, properties that the model thinks are expensive).

Such a change could be due to many factors: a change in interest rates; the appearance or disappearance of major sources of employment; new housing developments opening up in the area. Whatever the cause, detecting such a change quickly is crucial, so that the business can react quickly in the appropriate manner, whether that means simply retraining the model on fresher data, or a pivot in business strategy.

In Wallaroo you can monitor your housing model for signs of drift through the model monitoring and insight capability called Assays. Assays help you track changes in the environment that your model operates within, which can affect the model’s outcome. They do this by tracking the model’s predictions and/or the data coming into the model against an established baseline. If the distribution of monitored values in the current observation window differs too much from the baseline distribution, the assay will flag it. The figure below shows an example of a running scheduled assay.

Figure: A daily assay that’s been running for a month. The dots represent the difference between the distribution of values in the daily observation window, and the baseline. When that difference exceeds the specified threshold (indicated by a red dot) an alert is set.
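
To make the idea of a "difference between distributions" concrete, here is a minimal sketch that scores a window against a baseline with a Population Stability Index (PSI) over quantile bins. PSI is one common choice for this kind of score; whether it matches the assay's internal computation is an assumption, and the function names, the bin count, and the sample values are all hypothetical.

```python
import math
from bisect import bisect_right

def quantile_edges(baseline, bins=5):
    """Interior bin edges so each bin holds roughly the same share of the baseline."""
    ordered = sorted(baseline)
    n = len(ordered)
    return [ordered[(n * k) // bins] for k in range(1, bins)]

def bin_fractions(values, edges, eps=1e-6):
    """Fraction of values in each bin; eps avoids log(0) for empty bins."""
    counts = [0] * (len(edges) + 1)
    for v in values:
        counts[bisect_right(edges, v)] += 1
    total = len(values)
    return [max(c / total, eps) for c in counts]

def psi(baseline, window, bins=5):
    """PSI-style distance between a baseline sample and an observation window."""
    edges = quantile_edges(baseline, bins)
    b = bin_fractions(baseline, edges)
    w = bin_fractions(window, edges)
    return sum((wi - bi) * math.log(wi / bi) for bi, wi in zip(b, w))

# hypothetical prediction values, not taken from the tutorial data
baseline = [1200 + 10 * i for i in range(50)]
similar = [1205 + 10 * i for i in range(50)]
shifted = [1600 + 10 * i for i in range(50)]

print("similar window:", psi(baseline, similar))
print("shifted window:", psi(baseline, shifted))
```

A window drawn from the same range as the baseline scores at or near zero, while a window concentrated in the top bin scores far above a threshold such as 0.1.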

This next set of exercises will walk you through setting up an assay to monitor the predictions of your house price model, in order to detect drift.

NOTE

An assay is a monitoring process that typically runs over an extended, ongoing period of time. For example, one might set up an assay that every day monitors the previous 24 hours’ worth of predictions and compares it to a baseline. For the purposes of these exercises, we’ll be compressing processes that would normally take hours or days into minutes.


Exercise Prep: Create some datasets for demonstrating assays

Because assays are designed to detect changes in distributions, let’s try to set up data with different distributions to test with. Take your house price data and create two sets: a set with lower prices, and a set with higher prices. You can split however you choose.

The idea is we will pretend that the set of lower priced houses represents the “typical” mix of houses in the housing portfolio at the time you set your baseline; you will introduce the higher priced houses later, to represent an environmental change when more expensive houses suddenly enter the market.

  • If you are using the pre-provided models to do these exercises, you can use the provided data sets lowprice.df.json and highprice.df.json. This is to establish our baseline as a set of known values, so the higher prices will trigger our assay alerts.
lowprice_data = pd.read_json('lowprice.df.json')
highprice_data = pd.read_json('highprice.df.json')

Note that the data in these files are already in the form expected by the models, so you don’t need to use the get_singleton or get_batch convenience functions to infer.

At the end of this exercise, you should have two sets of data to demonstrate assays. In the discussion below, we’ll refer to these sets as lowprice_data and highprice_data.
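
One possible way to make the split, sketched with the standard library only: cut the records at the median price. The record layout, the `price` field name, and the values are hypothetical; with a DataFrame the same idea applies to whichever column holds the sale price.

```python
import statistics

# hypothetical records; real data would come from your own data set
records = [
    {"id": i, "price": p}
    for i, p in enumerate([250_000, 310_000, 420_000, 515_000, 780_000, 1_200_000])
]

# use the median as the cut point between "low" and "high"
threshold = statistics.median(r["price"] for r in records)

lowprice = [r for r in records if r["price"] <= threshold]
highprice = [r for r in records if r["price"] > threshold]

print(f"threshold={threshold}, low={len(lowprice)}, high={len(highprice)}")
```

Any cut point works as long as the two sets have clearly different distributions; a median split simply keeps them the same size.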

# blank spot to split or download data

sample_count = pd.read_csv('../data/test_data.csv')
# sample_df = sample_count.loc[0:20, ['count']]
inference_df = get_singleton_forecast(sample_count.loc[1:30], 'count')
display(inference_df)
   count
0  [801, 1349, 1562, 1600, 1606, 1510, 959, 822, 1321, 1263, 1162, 1406, 1421, 1248, 1204, 1000, 683, 1650, 1927, 1543, 981, 986, 1416, 1985, 506, 431, 1167, 1098, 1096, 1501]

We will use this data to set up some “historical data” in the house price prediction pipeline that you build in the assay exercises.

Setting up a baseline for the assay

In order to know whether the distribution of your model’s predictions has changed, you need a baseline to compare them to. This baseline should represent how you expect the model to behave at the time it was trained. This might be approximated by the distribution of the model’s predictions over some “typical” period of time. For example, we might collect the predictions of our model over the first few days after it’s been deployed. For these exercises, we’ll compress that to a few minutes. Currently, to set up a Wallaroo assay the pipeline must have been running for some period of time, and the assumption is that this period of time is “typical”, and that the distributions of the inputs and the outputs of the model during this period of time are “typical.”

Exercise Prep: Run some inferences and set some time stamps

Here, we simulate having a pipeline that’s been running for a long enough period of time to set up an assay.

To send enough data through the pipeline to create assays, you execute something like the following code (using the appropriate names for your pipelines and models). Note that this step will take a little while, because of the sleep interval.

You will need the timestamps baseline_start and baseline_end for the next exercises.

# get your pipeline (in this example named "mypipeline")

pipeline = get_pipeline("mypipeline")
pipeline.deploy()

## Run some baseline data
# Where the baseline data will start
baseline_start = datetime.datetime.now()

# the number of samples we'll use for the baseline
nsample = 500

# Wait 30 seconds to set this data apart from the rest
# then send the data in batch
time.sleep(30)

# get a sample
lowprice_data_sample = lowprice_data.sample(nsample, replace=True).reset_index(drop=True)
pipeline.infer(lowprice_data_sample)

# Set the baseline end
baseline_end = datetime.datetime.now()
# blank space to get pipeline and set up baseline data

control_model_name = 'forecast-control-model'

bike_day_model = get_model(control_model_name)

pipeline_name = 'forecast-tutorial-pipeline'

pipeline = get_pipeline(pipeline_name)
# set the pipeline step
pipeline.undeploy()
pipeline.clear()
pipeline.add_model_step(bike_day_model)
pipeline.deploy()
pipeline.steps()
[{'ModelInference': {'models': [{'name': 'forecast-control-model', 'version': 'd9af417f-29c3-49b1-9cad-a930779825d2', 'sha': '98b5f0911f608fdf9052b1b6db95c89a2c77c4b10d8f64a6d27df846ac616eb1'}]}}]
# test inference

sample_count = pd.read_csv('../data/test_data.csv')
inference_df = get_singleton_forecast(sample_count.loc[2:22], 'count')

results = pipeline.infer(inference_df)
display(results)
   time  in.count  out.forecast  out.weekly_average  check_failures
0  2023-08-03 15:01:28.625  [1349, 1562, 1600, 1606, 1510, 959, 822, 1321, 1263, 1162, 1406, 1421, 1248, 1204, 1000, 683, 1650, 1927, 1543, 981, 986]  [1278, 1295, 1295, 1295, 1295, 1295, 1295]  [1292.5714285714287]  0

Before setting up an assay on this pipeline’s output, we may want to look at the distribution of the predictions over our selected baseline period. To do that, we’ll create an assay_builder that specifies the pipeline, the model in the pipeline, and the baseline period. We’ll also specify that we want to look at the output of the model, which in the example code is named variable, and would appear as out.variable in the logs.

# print out one of the logs to get the name of the output variable
display(pipeline.logs(limit=1))

# get the model name directly off the pipeline (you could just hard code this, if you know the name)

model_name = pipeline.model_configs()[0].model().name()

assay_builder = ( wl.build_assay(assay_name, pipeline, model_name, 
                     baseline_start, baseline_end)
                    .add_iopath("output variable 0") ) # specify that we are looking at the first output of the output variable "variable"

where baseline_start and baseline_end are the beginning and end of the baseline periods as datetime.datetime objects.

You can then examine the distribution of variable over the baseline period:

assay_builder.baseline_histogram()

Exercise: Create an assay builder and set a baseline

Create an assay builder to monitor the output of your house price pipeline. The baseline period should be from baseline_start to baseline_end.

  • You will need to know the name of your output variable, and the name of the model in the pipeline.

Examine the baseline distribution.

## Blank space to create an assay builder and examine the baseline distribution

# we'll use the first month in 2011 as the baseline.  Then we can compare the first month of 2012 next.

import datetime
baseline_start = datetime.datetime.now()

for i in range(30):
    inference_df = get_singleton_forecast(sample_count.loc[i:i+30], 'count')
    results = pipeline.infer(inference_df)

baseline_end = datetime.datetime.now()
# now build the actual baseline

import string
import random

assay_suffix= ''.join(random.choice(string.ascii_lowercase) for i in range(4))

assay_name = f'forecast assay {assay_suffix}'

assay_builder = ( wl.build_assay(assay_name, 
                                 pipeline, 
                                 control_model_name,
                                 baseline_start, 
                                 baseline_end
                                 )
                                 .add_iopath("output weekly_average 0") )
baseline_run = assay_builder.build().interactive_baseline_run()

baseline_run.baseline_stats()
        Baseline
count   29
min     1232.428571
max     1800.142857
mean    1439.492611
median  1387.571429
std     168.388389
start   2023-08-03T15:03:54.045601Z
end     2023-08-03T15:04:08.323293Z

An assay should detect if the distribution of model predictions changes from the above distribution over regularly sampled observation windows. This is called drift.

To show drift, we’ll run more data through the pipeline – first some data drawn from the same distribution as the baseline (lowprice_data). Then, we will gradually introduce more data from a different distribution (highprice_data). We should see the difference between the baseline distribution and the distribution in the observation window increase.

To set up the data, you should do something like the below. It will take a while to run, because of all the sleep intervals.

You will need the assay_window_end for a later exercise.

IMPORTANT NOTE: To generate the data for the assay, this process may take 4-5 minutes. Because the shortest period of time for an assay window is 1 minute, the intervals of inference data are spaced to fall within that time period.

# Set the start for our assay window period.
assay_window_start = datetime.datetime.now()

# Run a set of house values, spread across a "longer" period of time

# run "typical" data
for x in range(4):
    pipeline.infer(lowprice_data.sample(2*nsample, replace=True).reset_index(drop=True))
    time.sleep(25)
    
# run a mix
for x in range(3):
    pipeline.infer(lowprice_data.sample(nsample, replace=True).reset_index(drop=True))
    pipeline.infer(highprice_data.sample(nsample, replace=True).reset_index(drop=True))
    time.sleep(25)
    
# high price houses dominate the sample
for x in range(3):
    pipeline.infer(highprice_data.sample(2*nsample, replace=True).reset_index(drop=True))
    time.sleep(25)

# End our assay window period
assay_window_end = datetime.datetime.now()
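
Before spending several minutes on the timed run, the three phases can be checked offline. The sketch below mirrors the "typical", "mixed", and "high" batches with synthetic numbers and confirms that the mean shifts upward from phase to phase. The distributions, the seed, and the helper name `batch` are assumptions for illustration only; no pipeline is involved.

```python
import random
import statistics

rng = random.Random(0)  # fixed seed so the sketch is repeatable

# synthetic stand-ins for lowprice_data and highprice_data
low = [rng.gauss(300_000, 30_000) for _ in range(1000)]
high = [rng.gauss(900_000, 60_000) for _ in range(1000)]

nsample = 500

def batch(low_n, high_n):
    """Sample with replacement from each pool and combine into one batch."""
    return rng.choices(low, k=low_n) + rng.choices(high, k=high_n)

phases = {
    "typical": batch(2 * nsample, 0),
    "mixed": batch(nsample, nsample),
    "high": batch(0, 2 * nsample),
}

means = {name: statistics.mean(values) for name, values in phases.items()}
for name, value in means.items():
    print(f"{name:8s} mean = {value:,.0f}")
```

If the phase means do not separate clearly here, the real run is unlikely to show drift either, so it is worth adjusting the split first.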

Exercise Prep: Run some inferences and set some time stamps

Run more data through the pipeline, manifesting a drift, like the example above. It may take around 10 minutes depending on how you stagger the inferences.

## Blank space to run more data

# same inferences as before, but now we'll jump ahead to 2012, and get 4 months of assays
import time

assay_start = datetime.datetime.now()

for i in range(30):
    inference_df = get_singleton_forecast(sample_count.loc[i+367:i+397], 'count')
    results = pipeline.infer(inference_df)

time.sleep(60)
for i in range(30):
    inference_df = get_singleton_forecast(sample_count.loc[i+397:i+427], 'count')
    results = pipeline.infer(inference_df)

time.sleep(60)
for i in range(30):
    inference_df = get_singleton_forecast(sample_count.loc[i+427:i+457], 'count')
    results = pipeline.infer(inference_df)

assay_end = datetime.datetime.now()

Defining the Assay Parameters

Now we’re finally ready to set up an assay!

The Observation Window

Once a baseline period has been established, you must define the window of observations that will be compared to the baseline. For instance, you might want to set up an assay that runs every 12 hours, collects the previous 24 hours’ predictions and compares the distribution of predictions within that 24 hour window to the baseline. To set such a comparison up would look like this:

assay_builder.window_builder().add_width(hours=24).add_interval(hours=12)

In other words, width is the width of the observation window, and interval is how often an assay (comparison) is run. The default value of width is 24 hours; the default value of interval is to set it equal to width. The units can be specified in one of: minutes, hours, days, weeks.
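
To see how width and interval interact, the following sketch enumerates the observation windows that fit between two timestamps. It is an illustration of the concept only: the function name is hypothetical, and how Wallaroo aligns window boundaries internally is an assumption.

```python
from datetime import datetime, timedelta

def observation_windows(start, end, width, interval=None):
    """List (window_start, window_end) pairs of the given width, one every interval."""
    interval = interval or width  # interval defaults to the window width
    windows = []
    window_start = start
    while window_start + width <= end:
        windows.append((window_start, window_start + width))
        window_start += interval
    return windows

start = datetime(2023, 8, 1)
end = datetime(2023, 8, 3)

# 24 hour windows evaluated every 12 hours: consecutive windows overlap
overlapping = observation_windows(start, end, timedelta(hours=24), timedelta(hours=12))

# interval omitted: windows sit back to back
back_to_back = observation_windows(start, end, timedelta(hours=24))

for w_start, w_end in overlapping:
    print(w_start, "->", w_end)
```

With an interval shorter than the width, each prediction is counted in more than one window, which smooths the scores at the cost of running more comparisons.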

The Comparison Threshold

Given an observation window and a baseline distribution, an assay computes the distribution of predictions in the observation window. It then calculates the “difference” (or “distance”) between the observed distribution and the baseline distribution. For the assay’s default distance metric (which we will use here), a good starting threshold is 0.1. Since a different value may work best for a specific situation, you can try interactive assay runs on historical data to find a good threshold, as we do in these exercises.

To set the assay threshold for the assays to 0.1:

assay_builder.add_alert_threshold(0.1)
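
Choosing a threshold is easier with scores in hand. This sketch takes the nine scores reported by the interactive run later in this section and counts how many windows would alert at a few candidate thresholds. Treating a score strictly greater than the threshold as an alert is an assumption, as are the candidate values other than 0.1 and 0.25.

```python
# scores copied from the interactive assay results shown in this tutorial
scores = [
    4.103050, 8.876929, 8.876929, 8.876929, 8.876929,
    8.876929, 8.876929, 8.876929, 1.761933,
]

def alert_count(scores, threshold):
    """Number of windows whose score exceeds the threshold."""
    return sum(1 for s in scores if s > threshold)

for threshold in (0.1, 0.25, 2.0, 5.0):
    print(f"threshold {threshold}: {alert_count(scores, threshold)} of {len(scores)} windows alert")
```

Here every window alerts at 0.1 and 0.25, which suggests that the windows differ strongly from the baseline rather than that the threshold is too tight.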

Running an Assay on Historical Data

In this exercise, you will build an interactive assay over historical data. To do this, you need an end time (endtime).

Depending on how much history is available, the window and interval may need adjusting. If you are using the data generated in the previous steps, a window and interval as short as 1 minute may be useful.

Assuming you have an assay builder with the appropriate window parameters and threshold set, an interactive run and a look at the results would look like this:

# set the end of the interactive run
assay_builder.add_run_until(endtime)

# set the window

assay_builder.window_builder().add_width(hours=24).add_interval(hours=12)

assay_results = assay_builder.build().interactive_run()
df = assay_results.to_dataframe() # to return the results as a table
assay_results.chart_scores() # to plot the run

Exercise: Create an interactive assay

Use the assay_builder you created in the previous exercise to set up an interactive assay.

  • The assay should run every minute, on a window that is a minute wide.
  • Set the alert threshold to 0.1.
  • You can use assay_window_end (or a later timestamp) as the end of the interactive run.

Examine the assay results. Do you see any drift?

To try other ways of examining the assay results, see the “Interactive Assay Runs” section of the Model Insights tutorial.

# blank space for setting assay parameters, creating and examining an interactive assay

# now set up our interactive assay based on the window set above.

assay_builder = assay_builder.add_run_until(assay_end)

# We don't have many records at the moment, so set the width to 1 minute so it'll slice each 
# one minute interval into a window to analyze
assay_builder.window_builder().add_width(minutes=1).add_interval(minutes=1)

# Build the assay and then do an interactive run rather than waiting for the next interval
assay_config = assay_builder.build()
assay_results = assay_config.interactive_run()
# Show how many assay windows were analyzed, then show the chart
print(f"Generated {len(assay_results)} analyses")
assay_results.chart_scores()
Generated 9 analyses
# Display the results as a DataFrame - we're mainly interested in the score and whether the 
# alert threshold was triggered
display(assay_results.to_dataframe().loc[:, ["score", "start", "alert_threshold", "status"]])
   score     start                             alert_threshold  status
0  4.103050  2023-08-03T15:04:08.323293+00:00  0.25             Alert
1  8.876929  2023-08-03T15:09:08.323293+00:00  0.25             Alert
2  8.876929  2023-08-03T15:10:08.323293+00:00  0.25             Alert
3  8.876929  2023-08-03T15:11:08.323293+00:00  0.25             Alert
4  8.876929  2023-08-03T15:12:08.323293+00:00  0.25             Alert
5  8.876929  2023-08-03T15:13:08.323293+00:00  0.25             Alert
6  8.876929  2023-08-03T15:14:08.323293+00:00  0.25             Alert
7  8.876929  2023-08-03T15:15:08.323293+00:00  0.25             Alert
8  1.761933  2023-08-03T15:16:08.323293+00:00  0.25             Alert

Scheduling an Assay to run on ongoing data

(We won’t be doing an exercise here, this is for future reference).

Once you are satisfied with the parameters you have set, you can schedule an assay to run regularly.

# create a fresh assay builder with the correct parameters
assay_builder = ( wl.build_assay(assay_name, pipeline, model_name, 
                     baseline_start, baseline_end)
                    .add_iopath("output variable 0") )

# this assay runs every 24 hours on a 24 hour window
assay_builder.window_builder().add_width(hours=24)
assay_builder.add_alert_threshold(0.1)

# now schedule the assay
assay_id = assay_builder.upload()

You can use the assay id later to get the assay results.

You have now walked through setting up a basic assay and running it over historical data.

Congratulations!

In this tutorial you have

  • Deployed a single step house price prediction pipeline and sent data to it.
  • Compared two house price prediction models in an A/B test
  • Compared two house price prediction models in a shadow deployment.
  • Swapped the “winner” of the comparisons into the house price prediction pipeline.
  • Set validation rules on the pipeline.
  • Set up an assay on the pipeline to monitor for drift in its predictions.

Great job!

Cleaning up.

Now that the tutorial is complete, don’t forget to undeploy your pipeline to free up the resources.

# blank space to undeploy your pipeline

pipeline.undeploy()
name          forecast-tutorial-pipeline
created       2023-08-02 15:50:59.480547+00:00
last_updated  2023-08-03 15:01:17.534210+00:00
deployed      False
tags
versions      af0f9c1c-0c28-4aaa-81f5-abb470500960, 980ee03b-694e-47c7-b76b-43b3e927b281, 85af5504-f1e4-4d0d-bd9e-e46891211843, 39b82898-12b6-4a30-ab41-f06cb05c7391, d8edf8c5-07f0-455e-9f34-075b7062f56f, 170402aa-8e83-420e-bee3-51a9fca4a9d9, 14912dd4-5e3a-4314-9e3f-0ea3af3660c1, 3309619d-54b9-4499-8afd-ed7819339b64, 2af1f08c-976c-4d51-9cf6-2cc371788844, 76fbec8d-cebf-40e5-81d5-447170c4a836, c6c10a83-9b6c-449f-a5c3-63b36a3d749b, 436fe308-283f-43b0-a4f0-159c05193d97, eb9e5b9f-41d9-42dc-8e49-13ec4771abad, 4d062242-1477-40fd-bf11-835e6bd62c10, 1f3d774d-7626-4722-b4b8-7dedbaa35803, 12f73035-cf94-4e6c-b2b6-05946ab06aef, b4ec30ef-6724-467e-b42a-d54399198f32, 57e7acf8-b3f0-436b-a236-0b1d6e76ba18, 5697a317-d0e6-402b-9369-7f0e732cc1fa, 5d0cb620-f8ba-4b9d-a81b-0ba333584508, 6b14e208-1319-4bc4-927b-b76a4893d373, 0b44d911-c69e-4030-b481-84e947fe6c70, dc5605d2-bb6a-48d2-b83a-3d77b7e608af, a68819c0-7508-467e-9fc1-60cbf8aaf9e1, b908d302-ce87-4a52-8ef2-b595fac2c67e, 7b94201f-ef5b-4629-ae2f-acf894cb1fcf, dc8bf23f-b598-48c6-bb2d-c5098d264622, 3a8ebc46-6261-4977-8a60-038c99c255d7, 40ab9d3d-ee6c-4f0c-bf38-345385130285, 47792a90-bea8-432a-981f-232bf67288c8, 97b815f3-636b-4424-8be4-3d95bcf32b40, 0d2f2250-9a43-47ce-beef-32371986f798, 46c95b7f-a79e-41ee-8565-578f9c3c20e5, 1ff98a35-3468-4b70-84fc-fe71aed99a75, 73ff8fc2-ca4d-4ea1-887b-0d31190cfe36, f8188956-8b3e-4479-8b15-e8747fe915a6, 33e5cc2c-2bb2-4dc2-8a9e-c058e60f6163, 5d419693-97cc-461b-b72a-a389ab7a001b, 56c78f52-cba5-415c-913a-fee0e1863a90, a109a040-c8f2-46dc-8c0b-373ae10d4fa0, dcaec327-1358-42a7-88de-931602a42a72, debc509f-9481-464b-af7f-5c3138a9cdb4, b0d167aa-cc98-440a-8e85-1ae3f089745a, d9e69c40-c83b-48af-b6b9-caafcb85f08b, 186ffdd2-3a8f-40cc-8362-13cc20bd2f46, 535e6030-ebe5-4c79-b5cd-69b161637a99, c5c0218a-800b-4235-8767-64d18208e68a, 4559d934-33b0-4872-a788-4ef27f554482, 94d3e20b-add7-491c-aedd-4eb094a8aebf, ab4e58bf-3b75-4bf6-b6b3-f703fe61e7af, 3773f5c5-e4c5-4e46-a839-6945af15ca13, 3abf03dd-8eab-4a8d-8432-aa85a30c0eda, 5ec5e8dc-7492-498b-9652-b3733e4c87f7, 1d89287b-4eff-47ec-a7bb-8cedaac1f33f
steps         forecast-control-model

4 - Retail: Data Connections

How to use Wallaroo Connections to store external data configurations

Statsmodel Forecast with Wallaroo Features: Data Connection

Wallaroo Connections are definitions set by MLOps engineers that are used by other Wallaroo users for connection information to a data source.

This provides MLOps engineers a method of creating and updating connection information for data stores: databases, Kafka topics, etc. Wallaroo Connections are composed of three main parts:

  • Name: The unique name of the connection.
  • Type: A user defined string that designates the type of connection. This is used to organize connections.
  • Details: Details are a JSON object containing the information needed to make the connection. This can include data sources, authentication tokens, etc.

Wallaroo Connections are only used to store the connection information used by other processes to create and use external connections. The user still has to provide the libraries and other elements to actually make and use the connection.

The primary advantage is that Wallaroo connections allow scripts and other code to retrieve the connection details directly from their Wallaroo instance, then refer to those connection details. Users don’t need to know what those details actually are; they can refer to them in their code to make their code more flexible.
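
As a rough mental model, a connection can be pictured as a named record with a type label and a JSON-compatible details object, looked up by name at run time. The sketch below is a stand-in written with the standard library; `ConnectionRecord`, `register`, `data_url`, and the example URL are hypothetical and are not part of the Wallaroo SDK.

```python
import json
from dataclasses import dataclass, field

@dataclass
class ConnectionRecord:
    """Hypothetical stand-in for a stored connection: name, type, details."""
    name: str
    type: str
    details: dict = field(default_factory=dict)

registry = {}

def register(connection):
    """Store a connection, enforcing unique names."""
    if connection.name in registry:
        raise ValueError(f"connection {connection.name} already exists")
    registry[connection.name] = connection

def data_url(connection_name):
    """Calling code asks for the URL by connection name instead of hard coding it."""
    return registry[connection_name].details["url"]

register(ConnectionRecord("bike-rentals-csv", "HTTP", {"url": "https://example.com/test_data.csv"}))

# details must survive a round trip through JSON
stored = json.dumps(registry["bike-rentals-csv"].details)
print(stored)
print(data_url("bike-rentals-csv"))
```

If the data source moves, only the stored details change; code that calls `data_url` stays the same.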

For this step, we will use a Google BigQuery dataset to retrieve the inference information, predict the next month of sales, then store those predictions into another table. This will use the Wallaroo Connection feature to create a Connection, assign it to our workspace, then perform our inferences by using the Connection details to connect to the BigQuery dataset and tables.

Prerequisites

  • A Wallaroo instance version 2023.2.1 or greater.

Statsmodel Forecast Connection Steps

Import Libraries

The first step is to import the libraries that we will need.

import json
import os
import datetime

import wallaroo
from wallaroo.object import EntityNotFoundError
from wallaroo.framework import Framework

# used to display dataframe information without truncating
from IPython.display import display
import pandas as pd
import numpy as np

pd.set_option('display.max_colwidth', None)

import time
import pyarrow as pa
## convenience functions from the previous notebooks
## these functions assume your connection to wallaroo is called wl

# return the workspace called <name>, or create it if it does not exist.
# this function assumes your connection to wallaroo is called wl
def get_workspace(name):
    workspace = None
    for ws in wl.list_workspaces():
        if ws.name() == name:
            workspace = ws
    if workspace is None:
        workspace = wl.create_workspace(name)
    return workspace

# pull a single datum from a data frame 
# and convert it to the format the model expects
def get_singleton(df, i):
    singleton = df.iloc[i,:].to_numpy().tolist()
    sdict = {'tensor': [singleton]}
    return pd.DataFrame.from_dict(sdict)

# pull a batch of data from a data frame
# and convert to the format the model expects
def get_batch(df, first=0, nrows=1):
    last = first + nrows
    batch = df.iloc[first:last, :].to_numpy().tolist()
    return pd.DataFrame.from_dict({'tensor': batch})

# Translate a column from a dataframe into a single array
# used for the Statsmodel forecast model

def get_singleton_forecast(df, field):
    singleton = pd.DataFrame({field: [df[field].values.tolist()]})
    return singleton

# Get the most recent version of a model in the workspace
# Assumes that the most recent version is the last in the list of versions.
# wl.get_current_workspace().models() returns a list of models in the current workspace

def get_model(mname):
    modellist = wl.get_current_workspace().models()
    model = [m.versions()[-1] for m in modellist if m.name() == mname]
    if len(model) <= 0:
        raise KeyError(f"model {mname} not found in this workspace")
    return model[0]

# get a pipeline by name in the workspace
def get_pipeline(pname):
    plist = wl.get_current_workspace().pipelines()
    pipeline = [p for p in plist if p.name() == pname]
    if len(pipeline) <= 0:
        raise KeyError(f"pipeline {pname} not found in this workspace")
    return pipeline[0]

Connect to the Wallaroo Instance

The first step is to connect to Wallaroo through the Wallaroo client. The Python library is included in the Wallaroo install and available through the Jupyter Hub interface provided with your Wallaroo environment.

This is accomplished using the wallaroo.Client() command, which provides a URL to grant the SDK permission to your specific Wallaroo environment. When displayed, enter the URL into a browser and confirm permissions. Store the connection into a variable that can be referenced later.

If logging into the Wallaroo instance through the internal JupyterHub service, use wl = wallaroo.Client(). For more information on Wallaroo Client settings, see the Client Connection guide.

# Login through local Wallaroo instance

wl = wallaroo.Client()

Set Configurations

The following will set the workspace, model name, and pipeline that will be used for this example. If the workspace or pipeline already exist, then they will be assigned for use in this example. If they do not exist, they will be created based on the names listed below.

Workspace names must be unique. To allow this tutorial to run in the same Wallaroo instance for multiple users, set the suffix variable or share the workspace with other users.

# retrieve the workspace, pipeline and model

import string
import random

suffix= ''.join(random.choice(string.ascii_lowercase) for i in range(4))

workspace_name = f'forecast-model-tutorial'

workspace = get_workspace(workspace_name)

wl.set_current_workspace(workspace)

control_model_name = 'forecast-control-model'

bike_day_model = get_model(control_model_name)

pipeline_name = 'forecast-tutorial-pipeline'

pipeline = get_pipeline(pipeline_name)

Deploy the Pipeline

Let’s set the model step to our single model pipeline, and perform a sample inference with our current data.

# Set pipeline step and deploy

pipeline.undeploy()
pipeline.clear()
pipeline.add_model_step(bike_day_model)
pipeline.deploy()
pipeline.steps()
[{'ModelInference': {'models': [{'name': 'forecast-control-model', 'version': 'd9af417f-29c3-49b1-9cad-a930779825d2', 'sha': '98b5f0911f608fdf9052b1b6db95c89a2c77c4b10d8f64a6d27df846ac616eb1'}]}}]
# sample inference from previous code here

sample_count = pd.read_csv('../data/test_data.csv')
inference_df = get_singleton_forecast(sample_count.loc[2:22], 'count')

results = pipeline.infer(inference_df)
display(results)
timein.countout.forecastout.weekly_averagecheck_failures
02023-08-03 15:33:47.771[1349, 1562, 1600, 1606, 1510, 959, 822, 1321, 1263, 1162, 1406, 1421, 1248, 1204, 1000, 683, 1650, 1927, 1543, 981, 986][1278, 1295, 1295, 1295, 1295, 1295, 1295][1292.5714285714287]0

Create the Connection

For this demonstration, the connection is set to a specific file on a GitHub repository. The connection details can be anything that can be stored in JSON: connection URLs, tokens, etc.

This connection will set a URL to pull a file from GitHub, then use the file contents to perform an inference.

Wallaroo connections are created through the Wallaroo Client create_connection(name, type, details) method. See the Wallaroo SDK Essentials Guide: Data Connections Management guide for full details.

Note that connection names must be unique across the Wallaroo instance - if needed, use random characters at the end to make sure your connection doesn’t have the same name as a previously created connection.

# set the connection information for other steps
# suffix is used to create a unique data connection

forecast_connection_input_name = f'bike-rentals-csv-{suffix}'
forecast_connection_input_type = "HTTP"
forecast_connection_input_argument = { 
    "url": "https://raw.githubusercontent.com/WallarooLabs/Tutorials/jch-forecast-tutorials/Forecasting/Retail-CPG/data/test_data.csv"
    }

wl.create_connection(forecast_connection_input_name, forecast_connection_input_type, forecast_connection_input_argument)
Field              Value
Name               bike-rentals-csv-john
Connection Type    HTTP
Details            *****
Created At         2023-08-03T15:40:28.804640+00:00
Linked Workspaces  []

List Connections

Connections for the entire Wallaroo instance are listed with the Wallaroo Client list_connections() method.

# list the connections here

wl.list_connections()
name                          connection type  details  created at                        linked workspaces
statsmodel-bike-rentals-john  HTTP             *****    2023-08-02T20:38:34.662841+00:00  ['forecast-model-tutorialjohn']
bike-rentals-csv-john         HTTP             *****    2023-08-03T15:40:28.804640+00:00  []

Get Connection by Name

To retrieve a previously created connection, we can assign it to a variable with the Wallaroo Client get_connection(connection_name) method. Then we can display the connection itself. Notice that when displaying a connection, the details section is hidden; the details are retrieved with connection.details(). Here’s an example:

myconnection = wl.get_connection("My amazing connection")
display(myconnection)
display(myconnection.details())

Use that code to retrieve your new connection.

# get the connection by name

connection = wl.get_connection(forecast_connection_input_name)
connection
Field              Value
Name               bike-rentals-csv-john
Connection Type    HTTP
Details            *****
Created At         2023-08-03T15:40:28.804640+00:00
Linked Workspaces  []

Add Connection to Workspace

We’ll now add the connection to our workspace so it can be retrieved by other workspace users. The method Workspace add_connection(connection_name) adds a Data Connection to a workspace. The method Workspace list_connections() displays a list of connections attached to the workspace.

workspace.add_connection(forecast_connection_input_name)
workspace.list_connections()
name                          connection type  details  created at                        linked workspaces
statsmodel-bike-rentals-john  HTTP             *****    2023-08-02T20:38:34.662841+00:00  ['forecast-model-tutorialjohn']
bike-rentals-csv-john         HTTP             *****    2023-08-03T15:40:28.804640+00:00  ['forecast-model-tutorialjohn']

Retrieve Connection from Workspace

To simulate a data scientist’s procedural flow, we’ll now retrieve the connection from the workspace. Specific connections are retrieved by specifying their position in the returned list.

For example, if we have two connections in a workspace and we want the second one, we can assign it to a variable with workspace.list_connections()[1].
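
Position-based lookup depends on the order of the returned list, which can change as connections are added. A name-based lookup is a possible alternative, sketched here against stub objects. `StubConnection` and `find_connection` are hypothetical, and the sketch assumes connection objects expose a name() accessor in the same style as the workspace, model, and pipeline objects used in the helper functions.

```python
class StubConnection:
    """Hypothetical stand-in for a connection object with name() and details()."""
    def __init__(self, name, details):
        self._name = name
        self._details = details

    def name(self):
        return self._name

    def details(self):
        return self._details

def find_connection(connections, name):
    """Return the first connection whose name matches, or raise KeyError."""
    for connection in connections:
        if connection.name() == name:
            return connection
    raise KeyError(f"connection {name} not found in this workspace")

# stand-in for the list returned by the workspace
workspace_connections = [
    StubConnection("statsmodel-bike-rentals", {"url": "https://example.com/a.csv"}),
    StubConnection("bike-rentals-csv", {"url": "https://example.com/b.csv"}),
]

chosen = find_connection(workspace_connections, "bike-rentals-csv")
print(chosen.name(), chosen.details()["url"])
```

This follows the same pattern as the get_model and get_pipeline convenience functions, which also filter a list by name.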

Create a new variable and retrieve the connection we just assigned to the workspace.

forecast_connection = workspace.list_connections()[-1]
display(forecast_connection)
Field              Value
Name               bike-rentals-csv-john
Connection Type    HTTP
Details            *****
Created At         2023-08-03T15:40:28.804640+00:00
Linked Workspaces  ['forecast-model-tutorialjohn']

Run Inference with Connection

We’ll now retrieve sample data through the Wallaroo connection, and perform a sample inference. The connection details are retrieved through the Connection details() method. Use them to retrieve the CSV file and convert it to a DataFrame, and use it with our sample model.

Or create a new connection with your own data. Here’s some sample code for retrieving a CSV file from a URL.

import requests
from io import StringIO

response = requests.get('https://myurl.com/csvsample.csv')

csv_text = response.text
# convert literal "\n" sequences in the response into real newlines
csv_new = csv_text.replace('\\n', '\n')

csvStringIO = StringIO(csv_new)
df = pd.read_csv(csvStringIO)

display(forecast_connection.details()['url'])

import requests

response = requests.get(
                    forecast_connection.details()['url']
                )

csv_text = response.text
csv_new = csv_text.replace('\\n', '\n')

from io import StringIO
csvStringIO = StringIO(csv_new)

# print(csv_new)

sample_count = pd.read_csv(csvStringIO)
inference_df = get_singleton_forecast(sample_count.loc[2:22], 'count')

results = pipeline.infer(inference_df)
display(results)
'https://raw.githubusercontent.com/WallarooLabs/Tutorials/jch-forecast-tutorials/Forecasting/Retail-CPG/data/test_data.csv'
  | time | in.count | out.forecast | out.weekly_average | check_failures
0 | 2023-08-03 15:56:59.299 | [1349, 1562, 1600, 1606, 1510, 959, 822, 1321, 1263, 1162, 1406, 1421, 1248, 1204, 1000, 683, 1650, 1927, 1543, 981, 986] | [1278, 1295, 1295, 1295, 1295, 1295, 1295] | [1292.5714285714287] | 0
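The replace('\\n', '\n') step in the code above turns literal backslash-n sequences into real line breaks before the text is parsed. The following is a rough sketch of that idea using only the standard library csv module in place of pandas. The dteday column name and the sample values are made up for illustration; only the count column name comes from this tutorial.

```python
import csv
from io import StringIO


def parse_csv_text(csv_text):
    """Parse CSV text into a list of dicts, one per data row."""
    # Replace the two-character sequence backslash + "n" with a real newline.
    normalized = csv_text.replace('\\n', '\n')
    return list(csv.DictReader(StringIO(normalized)))


# Hypothetical payload in which line breaks arrived as literal "\n" sequences
raw_text = 'dteday,count\\n2011-01-01,985\\n2011-01-02,801'

rows = parse_csv_text(raw_text)
counts = [int(row['count']) for row in rows]
print(counts)
```

If the response already contains real newlines, the replace call leaves the text unchanged, so the same function handles both cases.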

You have now walked through creating a Wallaroo connection and using it to retrieve data for an inference.

Congratulations!

In this tutorial you have

  • Deployed a single step bike rental forecast pipeline and sent data to it.
  • Created a new Wallaroo connection
  • Assigned the connection to a workspace
  • Retrieved the connection from the workspace
  • Used the data connection to retrieve information from outside of Wallaroo, and use it for an inference

Great job!

Cleaning up.

Now that the tutorial is complete, don’t forget to undeploy your pipeline to free up the resources.

pipeline.undeploy()
name | forecast-tutorial-pipeline
created | 2023-08-02 15:50:59.480547+00:00
last_updated | 2023-08-02 20:22:05.264816+00:00
deployed | False
tags |
versions | a109a040-c8f2-46dc-8c0b-373ae10d4fa0, dcaec327-1358-42a7-88de-931602a42a72, debc509f-9481-464b-af7f-5c3138a9cdb4, b0d167aa-cc98-440a-8e85-1ae3f089745a, d9e69c40-c83b-48af-b6b9-caafcb85f08b, 186ffdd2-3a8f-40cc-8362-13cc20bd2f46, 535e6030-ebe5-4c79-b5cd-69b161637a99, c5c0218a-800b-4235-8767-64d18208e68a, 4559d934-33b0-4872-a788-4ef27f554482, 94d3e20b-add7-491c-aedd-4eb094a8aebf, ab4e58bf-3b75-4bf6-b6b3-f703fe61e7af, 3773f5c5-e4c5-4e46-a839-6945af15ca13, 3abf03dd-8eab-4a8d-8432-aa85a30c0eda, 5ec5e8dc-7492-498b-9652-b3733e4c87f7, 1d89287b-4eff-47ec-a7bb-8cedaac1f33f
steps | forecast-control-model

5 - Retail: ML Workload Orchestrations

How to use ML Workload Orchestrations to automate recurring tasks in Wallaroo

Statsmodel Forecast with Wallaroo Features: ML Workload Orchestration

Wallaroo provides Data Connections and ML Workload Orchestrations to give organizations a way to create and manage automated tasks that can be run either on demand or on a regular schedule.

Prerequisites

  • A Wallaroo instance version 2023.2.1 or greater.

Orchestrations, Tasks, and Task Runs

We’ve detailed how Wallaroo Connections work. Now we’ll use Orchestrations, Tasks, and Task Runs.

Item | Description
Orchestration | ML Workload orchestration allows data scientists and ML Engineers to automate and scale production ML workflows in Wallaroo to ensure a tight feedback loop and continuous tuning of models from training to production. Wallaroo platform users (data scientists or ML Engineers) have the ability to deploy, automate and scale recurring batch production ML workloads that can ingest data from predefined data sources to run inferences in Wallaroo, chain pipelines, and send inference results to predefined destinations to analyze model insights and assess business outcomes.
Task | An implementation of an Orchestration. Tasks are either Run Once (they run once and stop upon completion) or Run Scheduled (the task runs whenever a specific cron-like schedule is reached). Scheduled tasks run until the kill command is issued.
Task Run | The execution of a task. A Run Once task has only one Task Run. A Run Scheduled task has multiple Task Runs, one for every time the schedule parameter is met. Task Runs have their own log files that can be examined to track progress and results.

Statsmodel Forecast Connection Steps

Import Libraries

The first step is to import the libraries that we will need.

import json
import os
import datetime

import wallaroo
from wallaroo.object import EntityNotFoundError
from wallaroo.framework import Framework

# used to display dataframe information without truncating
from IPython.display import display
import pandas as pd
import numpy as np

pd.set_option('display.max_colwidth', None)

import time
display(wallaroo.__version__)
'2023.2.1'

Connect to the Wallaroo Instance

The next step is to connect to Wallaroo through the Wallaroo client. The Python library is included in the Wallaroo install and available through the JupyterHub interface provided with your Wallaroo environment.

This is accomplished using the wallaroo.Client() command, which provides a URL to grant the SDK permission to your specific Wallaroo environment. When displayed, enter the URL into a browser and confirm permissions. Store the connection into a variable that can be referenced later.

If logging into the Wallaroo instance through the internal JupyterHub service, use wl = wallaroo.Client(). For more information on Wallaroo Client settings, see the Client Connection guide.

# Login through local Wallaroo instance

wl = wallaroo.Client()

Set Configurations

The following will set the workspace, model name, and pipeline that will be used for this example. If the workspace or pipeline already exist, then they will be assigned for use in this example. If they do not exist, they will be created based on the names listed below.

Workspace names must be unique. To allow this tutorial to run in the same Wallaroo instance for multiple users, set the suffix variable or share the workspace with other users.

# used for unique connection names

import string
import random

# random four-letter suffix; as written it is not applied to the names below
suffix= ''.join(random.choice(string.ascii_lowercase) for i in range(4))

workspace_name = 'forecast-model-tutorial'

pipeline_name = 'forecast-tutorial-pipeline'

Set the Workspace and Pipeline

The workspace will be either used or created if it does not exist, along with the pipeline. The models uploaded in the Upload and Deploy tutorial are referenced in this step.

## convenience functions from the previous notebooks
## these functions assume your connection to wallaroo is called wl

# return the workspace called <name>, or create it if it does not exist.
# this function assumes your connection to wallaroo is called wl
def get_workspace(name):
    workspace = None
    for ws in wl.list_workspaces():
        if ws.name() == name:
            workspace= ws
    if workspace is None:
        workspace = wl.create_workspace(name)
    return workspace

# pull a single datum from a data frame 
# and convert it to the format the model expects
def get_singleton(df, i):
    singleton = df.iloc[i,:].to_numpy().tolist()
    sdict = {'tensor': [singleton]}
    return pd.DataFrame.from_dict(sdict)

# pull a batch of data from a data frame
# and convert to the format the model expects
def get_batch(df, first=0, nrows=1):
    last = first + nrows
    batch = df.iloc[first:last, :].to_numpy().tolist()
    return pd.DataFrame.from_dict({'tensor': batch})

# Translated a column from a dataframe into a single array
# used for the Statsmodel forecast model

def get_singleton_forecast(df, field):
    singleton = pd.DataFrame({field: [df[field].values.tolist()]})
    return singleton

# Get the most recent version of a model in the workspace
# Assumes that the most recent version is the last in the list of versions.
# wl.get_current_workspace().models() returns a list of models in the current workspace

def get_model(mname):
    modellist = wl.get_current_workspace().models()
    model = [m.versions()[-1] for m in modellist if m.name() == mname]
    if len(model) <= 0:
        raise KeyError(f"model {mname} not found in this workspace")
    return model[0]

# get a pipeline by name in the workspace
def get_pipeline(pname):
    plist = wl.get_current_workspace().pipelines()
    pipeline = [p for p in plist if p.name() == pname]
    if len(pipeline) <= 0:
        raise KeyError(f"pipeline {pname} not found in this workspace")
    return pipeline[0]

workspace = get_workspace(workspace_name)

wl.set_current_workspace(workspace)

pipeline = get_pipeline(pipeline_name)

control_model_name = 'forecast-control-model'

bike_day_model = get_model(control_model_name)

Deploy Pipeline

The pipeline is already set with the model. For our demo, we’ll clear its steps, add the model again, and deploy it so we know it is running.

# Undeploy and clear the pipeline in case it was used in other demonstrations
pipeline.undeploy()
pipeline.clear()

pipeline.add_model_step(bike_day_model)
# pipeline.add_model_step(step)
pipeline.deploy()
name | forecast-tutorial-pipeline
created | 2023-08-02 15:50:59.480547+00:00
last_updated | 2023-08-03 17:58:19.277566+00:00
deployed | True
tags |
versions | c065b69b-d622-4a7b-93e5-4fcacf62da86, b0a212e3-66b7-4599-9701-f4183045cec6, af0f9c1c-0c28-4aaa-81f5-abb470500960, 980ee03b-694e-47c7-b76b-43b3e927b281, 85af5504-f1e4-4d0d-bd9e-e46891211843, 39b82898-12b6-4a30-ab41-f06cb05c7391, d8edf8c5-07f0-455e-9f34-075b7062f56f, 170402aa-8e83-420e-bee3-51a9fca4a9d9, 14912dd4-5e3a-4314-9e3f-0ea3af3660c1, 3309619d-54b9-4499-8afd-ed7819339b64, 2af1f08c-976c-4d51-9cf6-2cc371788844, 76fbec8d-cebf-40e5-81d5-447170c4a836, c6c10a83-9b6c-449f-a5c3-63b36a3d749b, 436fe308-283f-43b0-a4f0-159c05193d97, eb9e5b9f-41d9-42dc-8e49-13ec4771abad, 4d062242-1477-40fd-bf11-835e6bd62c10, 1f3d774d-7626-4722-b4b8-7dedbaa35803, 12f73035-cf94-4e6c-b2b6-05946ab06aef, b4ec30ef-6724-467e-b42a-d54399198f32, 57e7acf8-b3f0-436b-a236-0b1d6e76ba18, 5697a317-d0e6-402b-9369-7f0e732cc1fa, 5d0cb620-f8ba-4b9d-a81b-0ba333584508, 6b14e208-1319-4bc4-927b-b76a4893d373, 0b44d911-c69e-4030-b481-84e947fe6c70, dc5605d2-bb6a-48d2-b83a-3d77b7e608af, a68819c0-7508-467e-9fc1-60cbf8aaf9e1, b908d302-ce87-4a52-8ef2-b595fac2c67e, 7b94201f-ef5b-4629-ae2f-acf894cb1fcf, dc8bf23f-b598-48c6-bb2d-c5098d264622, 3a8ebc46-6261-4977-8a60-038c99c255d7, 40ab9d3d-ee6c-4f0c-bf38-345385130285, 47792a90-bea8-432a-981f-232bf67288c8, 97b815f3-636b-4424-8be4-3d95bcf32b40, 0d2f2250-9a43-47ce-beef-32371986f798, 46c95b7f-a79e-41ee-8565-578f9c3c20e5, 1ff98a35-3468-4b70-84fc-fe71aed99a75, 73ff8fc2-ca4d-4ea1-887b-0d31190cfe36, f8188956-8b3e-4479-8b15-e8747fe915a6, 33e5cc2c-2bb2-4dc2-8a9e-c058e60f6163, 5d419693-97cc-461b-b72a-a389ab7a001b, 56c78f52-cba5-415c-913a-fee0e1863a90, a109a040-c8f2-46dc-8c0b-373ae10d4fa0, dcaec327-1358-42a7-88de-931602a42a72, debc509f-9481-464b-af7f-5c3138a9cdb4, b0d167aa-cc98-440a-8e85-1ae3f089745a, d9e69c40-c83b-48af-b6b9-caafcb85f08b, 186ffdd2-3a8f-40cc-8362-13cc20bd2f46, 535e6030-ebe5-4c79-b5cd-69b161637a99, c5c0218a-800b-4235-8767-64d18208e68a, 4559d934-33b0-4872-a788-4ef27f554482, 94d3e20b-add7-491c-aedd-4eb094a8aebf, ab4e58bf-3b75-4bf6-b6b3-f703fe61e7af, 3773f5c5-e4c5-4e46-a839-6945af15ca13, 3abf03dd-8eab-4a8d-8432-aa85a30c0eda, 5ec5e8dc-7492-498b-9652-b3733e4c87f7, 1d89287b-4eff-47ec-a7bb-8cedaac1f33f
steps | forecast-control-model

Sample Inference

Verify the pipeline is deployed properly with a sample inference.


sample_count = pd.read_csv('../data/test_data.csv')
inference_df = get_singleton_forecast(sample_count.loc[2:22], 'count')

results = pipeline.infer(inference_df)

display(results)
  | time | in.count | out.forecast | out.weekly_average | check_failures
0 | 2023-08-03 18:00:43.307 | [1349, 1562, 1600, 1606, 1510, 959, 822, 1321, 1263, 1162, 1406, 1421, 1248, 1204, 1000, 683, 1650, 1927, 1543, 981, 986] | [1278, 1295, 1295, 1295, 1295, 1295, 1295] | 1292.571429 | 0
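The numbers in this result suggest that out.weekly_average is the mean of the seven values in out.forecast. That reading is inferred from the output shown here and is not a statement about how the model computes it. A quick sanity check, with the values copied from the result above:

```python
from statistics import mean

# Values copied from the out.forecast column of the sample inference
forecast = [1278, 1295, 1295, 1295, 1295, 1295, 1295]

# Value shown in the out.weekly_average column (displayed to six decimals)
reported_weekly_average = 1292.571429

weekly_average = mean(forecast)

# Allow for the rounding applied when the result is displayed
matches = abs(weekly_average - reported_weekly_average) < 1e-6
print(weekly_average, matches)
```

A check like this can be handy inside an orchestration script, where the only feedback comes from the printed log lines.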

Forecast Sample Orchestration

The orchestration that will automate this process is ../forecast-orchestration/forecast-orchestration.zip. The files used are stored in the directory forecast-orchestration, and the ZIP file was created with the command:

zip -r forecast-orchestration.zip forecast-orchestration/

This contains the following:

  • requirements.txt: The Python requirements file that specifies the libraries used. For this example, it is empty.
  • main.py: The entry file that uses a deployed pipeline and performs an inference request against it. The results are visible in the task run’s log files.
  • data/test_data.csv: An inference input file.

The main.py script performs a workspace and pipeline retrieval, then an inference against the inference input file.

import wallaroo
from wallaroo.object import EntityNotFoundError
import pandas as pd

wl = wallaroo.Client()

# get the arguments
arguments = wl.task_args()

if "workspace_name" in arguments:
    workspace_name = arguments['workspace_name']
else:
    workspace_name="forecast-model-tutorial"

if "pipeline_name" in arguments:
    pipeline_name = arguments['pipeline_name']
else:
    pipeline_name="bikedaypipe"

def get_workspace(name):
    workspace = None
    for ws in wl.list_workspaces():
        if ws.name() == name:
            workspace= ws
    return workspace

def get_pipeline(name):
    try:
        pipeline = wl.pipelines_by_name(name)[0]
    except EntityNotFoundError:
        # report the missing pipeline, then re-raise so the task fails visibly
        print(f"Pipeline not found:{name}")
        raise
    return pipeline

def get_singleton_forecast(df, field):
    singleton = pd.DataFrame({field: [df[field].values.tolist()]})
    return singleton

print(f"Workspace: {workspace_name}")
workspace = get_workspace(workspace_name)

wl.set_current_workspace(workspace)
print(workspace)

# the pipeline is assumed to be deployed
print(f"Pipeline: {pipeline_name}")
pipeline = get_pipeline(pipeline_name)
print(pipeline)

print(pipeline.status())

sample_count = pd.read_csv('./data/test_data.csv')
inference_df = get_singleton_forecast(sample_count.loc[2:22], 'count')

results = pipeline.infer(inference_df)

print(results)

A few things to go over here. You’ll notice this is almost exactly the procedure we’ve been following so far: we get a workspace and pipeline, pull data from a CSV file, and perform an inference with the data.

This script assumes that the pipeline has already been deployed, and also includes this part:

arguments = wl.task_args()

This allows us to pass arguments into a Task created from an Orchestration, so we can specify a different workspace, pipeline, or any other arguments we construct. This allows orchestrations to be very flexible.
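The if/else blocks in main.py can be condensed into a single lookup with defaults. The following is only a sketch: a plain dict stands in for the value returned by wl.task_args(), and it assumes that value supports the dict get() method, which this tutorial does not confirm. The resolve_task_args helper is a name made up for illustration.

```python
# Defaults mirror the fallback values used in main.py
DEFAULTS = {
    "workspace_name": "forecast-model-tutorial",
    "pipeline_name": "bikedaypipe",
}


def resolve_task_args(arguments, defaults=DEFAULTS):
    """Return one setting per default key, preferring values passed to the task."""
    return {key: arguments.get(key, fallback) for key, fallback in defaults.items()}


# Plain dict standing in for the value returned by wl.task_args()
arguments = {"pipeline_name": "forecast-tutorial-pipeline"}

settings = resolve_task_args(arguments)
print(settings)
```

Keeping the defaults in one dictionary makes it easy to see which arguments a Task accepts and what happens when one is omitted.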

Also, notice that it refers to a specific file:

sample_count = pd.read_csv('./data/test_data.csv')

In the forecast-orchestration directory is the data directory with our sample CSV file. Orchestrations can include additional artifacts. We could have used a Wallaroo Connection instead, and we encourage you to try that if you want.

Upload the Orchestration

Orchestrations are uploaded with the Wallaroo client upload_orchestration(path) method with the following parameters.

Parameter | Type | Description
path | string (Required) | The path to the ZIP file to be uploaded.

Once uploaded, the deployment will be prepared and any requirements will be downloaded and installed. A typical orchestration upload looks like this:

my_orchestration = wl.upload_orchestration(path="path-to-zip-file")

Try uploading our orchestration from ../forecast-orchestration/forecast-orchestration.zip - or make your own and upload it.

Once uploaded, you can check the status with the status() method. If using the orchestration example above, that would be my_orchestration.status(). It is handy to put this in a loop that checks the status until it shows ready.

orchestration = wl.upload_orchestration(name="forecast example", path="../forecast-orchestration/forecast-orchestration.zip")

while orchestration.status() != 'ready':
    print(orchestration.status())
    time.sleep(5)
pending_packaging
packaging
packaging
packaging
packaging
packaging
packaging
packaging
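The loop above runs until the status reaches ready, which means it never exits if packaging fails. One possible variation adds a timeout. This is a sketch under stated assumptions: wait_for_status is a helper name made up for illustration, and a simulated status sequence stands in for orchestration.status() so the example runs without a Wallaroo instance.

```python
import time


def wait_for_status(get_status, target, timeout=120.0, interval=5.0):
    """Poll get_status() until it returns target; raise TimeoutError if it never does."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status == target:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"last status was {status!r}, expected {target!r}")
        time.sleep(interval)


# Simulated status sequence standing in for orchestration.status()
fake_statuses = iter(["pending_packaging", "packaging", "packaging", "ready"])

final_status = wait_for_status(lambda: next(fake_statuses), "ready", timeout=5, interval=0)
print(final_status)
```

With a real orchestration, the call might look like wait_for_status(orchestration.status, 'ready'). The same helper could also wait on a task's status.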

List Orchestrations

Orchestrations are listed with the Wallaroo Client list_orchestrations() method. An orchestration can be assigned to a variable by specifying its position in the returned list - for example: orchestration = wl.list_orchestrations()[0] would return the first orchestration on the list.

# list orchestration here

wl.list_orchestrations()
id | name | status | filename | sha | created at | updated at
fc4fd8cb-a108-404b-8ef9-8a1c9e279bb7 | statsmodel-orchestration 5 | ready | forecast-orchestration.zip | 2c1f30...0f0761 | 2023-02-Aug 20:50:47 | 2023-02-Aug 20:51:36
f8cccfd4-5ef3-49f7-a0e0-4bbccc0fc664 | statsmodel-orchestration 6 | ready | forecast-orchestration.zip | 1b675d...3a4a32 | 2023-02-Aug 21:07:51 | 2023-02-Aug 21:08:37
8a476448-06da-43b8-96a6-6f4b492973b0 | statsmodel-orchestration 6 | ready | forecast-orchestration.zip | 1b675d...3a4a32 | 2023-02-Aug 21:13:18 | 2023-02-Aug 21:14:02
db9cdef8-4171-43c2-97ae-2188c7d29b41 | statsmodel-orchestration 6 | ready | forecast-orchestration.zip | 1b675d...3a4a32 | 2023-02-Aug 21:17:33 | 2023-02-Aug 21:18:17
07bfa923-1372-4560-aa5b-c2d1e27f79bf | forecast example | ready | forecast-orchestration.zip | 1a93aa...6f73f5 | 2023-03-Aug 18:01:49 | 2023-03-Aug 18:02:33
7f4cbd97-5064-46a4-93a8-1b4f500ad7a8 | forecast example | ready | forecast-orchestration.zip | d38397...fcf803 | 2023-03-Aug 18:04:47 | 2023-03-Aug 18:05:31

# retrieve the orchestration from the list

orchestration_from_list = wl.list_orchestrations()[-1]
orchestration_from_list
Field | Value
ID | 7f4cbd97-5064-46a4-93a8-1b4f500ad7a8
Name | forecast example
File Name | forecast-orchestration.zip
SHA | d38397d19fa05339a7884cd324208515d3ef2cdc85542af31290c45176fcf803
Status | ready
Created At | 2023-03-Aug 18:04:47
Updated At | 2023-03-Aug 18:05:31

Create the Task

The orchestration is now ready to be implemented as a Wallaroo Task. We’ll just run it once as an example. This specific Orchestration that creates the Task assumes that the pipeline is deployed, and accepts the arguments:

  • workspace_name
  • pipeline_name

Tasks are either Run Once, or Run Scheduled. We create a new task from the Orchestration with either run_once(name, json_args, timeout) or with run_scheduled(name, timeout, schedule, json_args). The schedule is based on the Kubernetes cron scheduler. For example:

schedule='42 * * * *'

Runs at minute 42 of every hour and contains the answer to life, the universe, and everything.
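A standard cron expression has five fields: minute, hour, day of month, month, and day of week. A value of 42 in the first field therefore matches minute 42 of each hour. The sketch below splits an expression into named fields and expands a few simple minute forms. The helper names are made up for illustration, it is not a full cron parser, and it says nothing about extensions a particular scheduler may support.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def split_cron(expression):
    """Map a five-field cron expression to named fields."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected 5 fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


def matching_minutes(minute_field):
    """Expand a minute field of the form '*', 'N', or '*/N' into the minutes it matches."""
    if minute_field == "*":
        return list(range(60))
    if minute_field.startswith("*/"):
        return list(range(0, 60, int(minute_field[2:])))
    return [int(minute_field)]


fields = split_cron("42 * * * *")
print(fields["minute"], matching_minutes(fields["minute"]))
print(matching_minutes("*/15"))
```

The second call shows the step form: */15 matches minutes 0, 15, 30, and 45 of every hour, which is the usual way to express "every 15 minutes".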

Creating a scheduled task might be:

task_scheduled = orchestration.run_scheduled(name="schedule example", 
                                             timeout=600, 
                                             schedule=schedule, 
                                             json_args={"workspace_name": workspace_name, 
                                                        "pipeline_name": pipeline_name})

Using the uploaded orchestration, create a Run Once task using your workspace and pipeline names as the json_args.

# create the task here

task = orchestration.run_once(name="forecast single run", 
                              json_args={"workspace_name":workspace_name,
                                         "pipeline_name":pipeline_name}
                              )

Monitor Run with Task Status

We’ll first monitor the run through its status, using the Task.status() command.

Get the status of the task, and once it is started proceed to the next step. Try doing it as a while loop if you feel confident.

while task.status() != "started":
    display(task.status())
    time.sleep(5)
'pending'

List Tasks

We’ll use the Wallaroo client list_tasks method to view the tasks currently running or scheduled.

wl.list_tasks()
id | name | last run status | type | active | schedule | created at | updated at
b858db7a-fb70-4bb6-b4bb-49b48cefe504 | forecast single run | success | Temporary Run | True | - | 2023-03-Aug 18:05:36 | 2023-03-Aug 18:05:41
b1ff0ce6-f2cc-4613-91ce-b3a676165c8d | forecast single run | failure | Temporary Run | True | - | 2023-03-Aug 18:02:38 | 2023-03-Aug 18:02:43
d897ef95-911e-42ee-a874-2a7435b5ca77 | statsmodel single run finale | success | Temporary Run | True | - | 2023-02-Aug 21:18:19 | 2023-02-Aug 21:18:30
f406497a-d8c1-4b20-8fe9-d83c8102da40 | statsmodel single run finale | success | Temporary Run | True | - | 2023-02-Aug 21:08:42 | 2023-02-Aug 21:08:48
7117f780-5fc4-476a-a5d2-0654fdb6271f | statsmodel single run finale | failure | Temporary Run | True | - | 2023-02-Aug 20:55:17 | 2023-02-Aug 20:55:23
f209c52a-88e2-43e3-a614-b08a35b72a94 | statsmodel single run finale | failure | Temporary Run | True | - | 2023-02-Aug 20:52:24 | 2023-02-Aug 20:52:35

Display Task Run Results

The Task Run is the implementation of the task - the actual running of the script and its results. Tasks that are Run Once will only have one Task Run, while a Task set to Run Scheduled will have a Task Run for each time the task is executed. Each Task Run has its own set of logs and results that are monitored through the Task Run logs() method.

First, get the Task Run. The Task is the scheduled run of an Orchestration, and the Task Run is a single execution of that Task. A Run Scheduled Task keeps generating a new Task Run every time the schedule pattern is met until the Task is killed.

We retrieve the task runs with the Task last_runs() method, and assign a single Task Run to a variable by selecting it from the list with last_runs()[index]. If you only have one Task Run from a Task, then you can just set the index to 0.

Retrieve the task run for our generated task, then start checking the logs for our task run. It may take longer than 30 seconds to launch the task, so be prepared to run the .logs() method again to view the logs.

task
Field | Value
ID | b858db7a-fb70-4bb6-b4bb-49b48cefe504
Name | forecast single run
Last Run Status | success
Type | Temporary Run
Active | True
Schedule | -
Created At | 2023-03-Aug 18:05:36
Updated At | 2023-03-Aug 18:05:41

statsmodel_task_run = task.last_runs()[0]

The Task Run status is checked through its _status attribute. This lets you know whether the run failed or completed successfully. If it failed, you can still get the task run logs to find out why.

statsmodel_task_run._status
'success'

Retrieve Task Run Logs

The Task Run logs are retrieved with the Task Run logs() method, which shows the output of the run. This is why it’s useful to have print commands in your code to track what it’s doing.

statsmodel_task_run.logs()
2023-03-Aug 18:05:43 Workspace: forecast-model-tutorialjohn
2023-03-Aug 18:05:43 {'name': 'forecast-model-tutorialjohn', 'id': 16, 'archived': False, 'created_by': '0a36fba2-ad42-441b-9a8c-bac8c68d13fa', 'created_at': '2023-08-02T15:50:52.816795+00:00', 'models': [{'name': 'forecast-control-model', 'versions': 3, 'owner_id': '""', 'last_update_time': datetime.datetime(2023, 8, 3, 1, 11, 50, 568151, tzinfo=tzutc()), 'created_at': datetime.datetime(2023, 8, 2, 15, 50, 54, 223186, tzinfo=tzutc())}, {'name': 'forecast-challenger01-model', 'versions': 3, 'owner_id': '""', 'last_update_time': datetime.datetime(2023, 8, 3, 13, 55, 23, 119224, tzinfo=tzutc()), 'created_at': datetime.datetime(2023, 8, 2, 15, 50, 55, 208179, tzinfo=tzutc())}, {'name': 'forecast-challenger02-model', 'versions': 3, 'owner_id': '""', 'last_update_time': datetime.datetime(2023, 8, 3, 13, 55, 24, 133756, tzinfo=tzutc()), 'created_at': datetime.datetime(2023, 8, 2, 15, 50, 56, 291043, tzinfo=tzutc())}], 'pipelines': [{'name': 'forecast-tutorial-pipeline', 'create_time': datetime.datetime(2023, 8, 2, 15, 50, 59, 480547, tzinfo=tzutc()), 'definition': '[]'}]}
2023-03-Aug 18:05:43 Pipeline: forecast-tutorial-pipeline
2023-03-Aug 18:05:43                      time  ... check_failures
2023-03-Aug 18:05:43 {'name': 'forecast-tutorial-pipeline', 'create_time': datetime.datetime(2023, 8, 2, 15, 50, 59, 480547, tzinfo=tzutc()), 'definition': '[]'}
2023-03-Aug 18:05:43 {'status': 'Running', 'details': [], 'engines': [{'ip': '10.244.3.225', 'name': 'engine-5fc486bbf7-wklvf', 'status': 'Running', 'reason': None, 'details': [], 'pipeline_statuses': {'pipelines': [{'id': 'forecast-tutorial-pipeline', 'status': 'Running'}]}, 'model_statuses': {'models': [{'name': 'forecast-control-model', 'version': 'ffca51bd-f9c6-40cf-a36b-c6126ce98dd3', 'sha': 'dcbd11947ae1e51f5c882687a0ec2dbcf60c0b0de8e5156cb6f1d669e0a6d76b', 'status': 'Running'}]}}], 'engine_lbs': [{'ip': '10.244.4.151', 'name': 'engine-lb-584f54c899-rbdhr', 'status': 'Running', 'reason': None, 'details': []}], 'sidekicks': []}
2023-03-Aug 18:05:43 
2023-03-Aug 18:05:43 0 2023-08-03 18:05:43.232  ...              0
2023-03-Aug 18:05:43 [1 rows x 5 columns]
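Each log line above begins with a timestamp in year-day-month form (for example 2023-03-Aug 18:05:43), followed by whatever the script printed. As a rough sketch, the lines could be split into a timestamp and a message for filtering or sorting. It assumes the log text is available as a list of strings, which may not match the object that logs() actually returns, and parse_log_line is a helper name made up for illustration.

```python
from datetime import datetime

LOG_TIME_FORMAT = "%Y-%d-%b %H:%M:%S"  # e.g. 2023-03-Aug 18:05:43
TIMESTAMP_LENGTH = len("2023-03-Aug 18:05:43")


def parse_log_line(line):
    """Split one log line into (timestamp, message)."""
    stamp = datetime.strptime(line[:TIMESTAMP_LENGTH], LOG_TIME_FORMAT)
    message = line[TIMESTAMP_LENGTH:].lstrip()
    return stamp, message


# Lines copied from the task run log output above
sample_lines = [
    "2023-03-Aug 18:05:43 Workspace: forecast-model-tutorialjohn",
    "2023-03-Aug 18:05:43 Pipeline: forecast-tutorial-pipeline",
    "2023-03-Aug 18:05:43 [1 rows x 5 columns]",
]

parsed = [parse_log_line(line) for line in sample_lines]
for stamp, message in parsed:
    print(stamp.isoformat(), "|", message)
```

Parsing the month abbreviation with %b relies on an English (or default C) locale being active.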

You have now walked through uploading an ML Workload Orchestration, running it as a Task, and examining the Task Run’s logs.

Congratulations!

In this tutorial you have

  • Deployed a single step bike rental forecast pipeline and sent data to it.
  • Uploaded an ML Orchestration into Wallaroo.
  • Created a Run Once Task from the Orchestration.
  • Viewed the Task Run’s status generated from the Task.
  • Viewed the Task Run’s logs.

Great job!

Cleaning up.

Now that the tutorial is complete, don’t forget to undeploy your pipeline to free up the resources.

pipeline.undeploy()
Waiting for undeployment - this will take up to 45s .................................... ok
name | forecast-tutorial-pipeline
created | 2023-08-02 15:50:59.480547+00:00
last_updated | 2023-08-02 21:16:55.320303+00:00
deployed | False
tags |
versions | f8188956-8b3e-4479-8b15-e8747fe915a6, 33e5cc2c-2bb2-4dc2-8a9e-c058e60f6163, 5d419693-97cc-461b-b72a-a389ab7a001b, 56c78f52-cba5-415c-913a-fee0e1863a90, a109a040-c8f2-46dc-8c0b-373ae10d4fa0, dcaec327-1358-42a7-88de-931602a42a72, debc509f-9481-464b-af7f-5c3138a9cdb4, b0d167aa-cc98-440a-8e85-1ae3f089745a, d9e69c40-c83b-48af-b6b9-caafcb85f08b, 186ffdd2-3a8f-40cc-8362-13cc20bd2f46, 535e6030-ebe5-4c79-b5cd-69b161637a99, c5c0218a-800b-4235-8767-64d18208e68a, 4559d934-33b0-4872-a788-4ef27f554482, 94d3e20b-add7-491c-aedd-4eb094a8aebf, ab4e58bf-3b75-4bf6-b6b3-f703fe61e7af, 3773f5c5-e4c5-4e46-a839-6945af15ca13, 3abf03dd-8eab-4a8d-8432-aa85a30c0eda, 5ec5e8dc-7492-498b-9652-b3733e4c87f7, 1d89287b-4eff-47ec-a7bb-8cedaac1f33f
steps | forecast-control-model