Retail: Drift Detection

How to use Wallaroo to detect model drift with assays

Tutorial Notebook 3: Observability - Drift Detection

In the previous notebook you learned how to add simple validation rules to a pipeline, to monitor whether outputs (or inputs) stray out of some expected range. In this notebook, you will monitor the distribution of the pipeline’s predictions to see if the model, or the environment that it runs it, has changed.

Preliminaries

In the blocks below we will preload some required libraries; we will also redefine some of the convenience functions that you saw in the previous notebooks.

After that, you should log into Wallaroo and set your working environment to the workspace that you created for this tutorial. Please refer to Notebook 1 to refresh yourself on how to log in and set your working environment to the appropriate workspace.

# preload needed libraries 

import wallaroo
from wallaroo.object import EntityNotFoundError
from wallaroo.framework import Framework

from IPython.display import display

# used to display DataFrame information without truncating
from IPython.display import display
import pandas as pd
pd.set_option('display.max_colwidth', None)

import json
import datetime
import time

# used for unique connection names

import string
import random
import pyarrow as pa

import sys
 
# setting path - only needed when running this from the `with-code` folder.
sys.path.append('../')

from CVDemoUtils import CVDemo
cvDemo = CVDemo()
cvDemo.COCO_CLASSES_PATH = "../models/coco_classes.pickle"
## convenience functions from the previous notebooks
## these functions assume your connection to wallaroo is called wl

# return the workspace called <name>, or create it if it does not exist.
# this function assumes your connection to wallaroo is called wl
def get_workspace(name):
    workspace = None
    for ws in wl.list_workspaces():
        if ws.name() == name:
            workspace= ws
    if(workspace == None):
        workspace = wl.create_workspace(name)
    return workspace

# pull a single datum from a data frame 
# and convert it to the format the model expects
def get_singleton(df, i):
    singleton = df.iloc[i,:].to_numpy().tolist()
    sdict = {'tensor': [singleton]}
    return pd.DataFrame.from_dict(sdict)

# pull a batch of data from a data frame
# and convert to the format the model expects
def get_batch(df, first=0, nrows=1):
    last = first + nrows
    batch = df.iloc[first:last, :].to_numpy().tolist()
    return pd.DataFrame.from_dict({'tensor': batch})

# Translated a column from a dataframe into a single array
# used for the Statsmodel forecast model

def get_singleton_forecast(df, field):
    singleton = pd.DataFrame({field: [df[field].values.tolist()]})
    return singleton

# Get the most recent version of a model in the workspace
# Assumes that the most recent version is the first in the list of versions.
# wl.get_current_workspace().models() returns a list of models in the current workspace

def get_model(mname):
    modellist = wl.get_current_workspace().models()
    model = [m.versions()[-1] for m in modellist if m.name() == mname]
    if len(model) <= 0:
        raise KeyError(f"model {mname} not found in this workspace")
    return model[0]

# get a pipeline by name in the workspace
def get_pipeline(pname):
    plist = wl.get_current_workspace().pipelines()
    pipeline = [p for p in plist if p.name() == pname]
    if len(pipeline) <= 0:
        raise KeyError(f"pipeline {pname} not found in this workspace")
    return pipeline[0]
## blank space to log in and go to correct workspace

## blank space to log in and go to the appropriate workspace

wl = wallaroo.Client()

import string
import random

suffix= ''.join(random.choice(string.ascii_lowercase) for i in range(4))

workspace_name = f'computer-vision-tutorial'

workspace = get_workspace(workspace_name)

wl.set_current_workspace(workspace)
{'name': 'computer-vision-tutorialjohn', 'id': 20, 'archived': False, 'created_by': '0a36fba2-ad42-441b-9a8c-bac8c68d13fa', 'created_at': '2023-08-04T19:16:04.283819+00:00', 'models': [{'name': 'mobilenet', 'versions': 4, 'owner_id': '""', 'last_update_time': datetime.datetime(2023, 8, 4, 19, 42, 22, 155273, tzinfo=tzutc()), 'created_at': datetime.datetime(2023, 8, 4, 19, 19, 46, 286247, tzinfo=tzutc())}, {'name': 'cv-post-process-drift-detection', 'versions': 3, 'owner_id': '""', 'last_update_time': datetime.datetime(2023, 8, 4, 19, 42, 23, 233883, tzinfo=tzutc()), 'created_at': datetime.datetime(2023, 8, 4, 19, 23, 10, 189958, tzinfo=tzutc())}, {'name': 'resnet', 'versions': 2, 'owner_id': '""', 'last_update_time': datetime.datetime(2023, 8, 5, 17, 25, 35, 579362, tzinfo=tzutc()), 'created_at': datetime.datetime(2023, 8, 5, 17, 20, 21, 886290, tzinfo=tzutc())}], 'pipelines': [{'name': 'cv-retail-pipeline', 'create_time': datetime.datetime(2023, 8, 4, 19, 23, 11, 179176, tzinfo=tzutc()), 'definition': '[]'}]}

Monitoring for Drift: Shift Happens.

In machine learning, you use data and known answers to train a model to make predictions for new previously unseen data. You do this with the assumption that the future unseen data will be similar to the data used during training: the future will look somewhat like the past.
But the conditions that existed when a model was created, trained and tested can change over time, due to various factors.

A good model should be robust to some amount of change in the environment; however, if the environment changes too much, your models may no longer be making the correct decisions. This situation is known as concept drift; too much drift can obsolete your models, requiring periodic retraining.

Let’s consider the example we’ve been working on: computer vision. You may notice over time that there has been a change in the average confidence of recognizing objects in an image. Perhaps the inventory has changed enough that recognizing a bottle of milk isn’t the same as recognizing a bag of milk. Perhaps tne new images are coming in at a lower quality

In Wallaroo you can monitor your housing model for signs of drift through the model monitoring and insight capability called Assays. Assays help you track changes in the environment that your model operates within, which can affect the model’s outcome. It does this by tracking the model’s predictions and/or the data coming into the model against an established baseline. If the distribution of monitored values in the current observation window differs too much from the baseline distribution, the assay will flag it. The figure below shows an example of a running scheduled assay.

Figure: A daily assay that’s been running for a month. The dots represent the difference between the distribution of values in the daily observation window, and the baseline. When that difference exceeds the specified threshold (indicated by a red dot) an alert is set.

This next set of exercises will walk you through setting up an assay to monitor the predictions of your house price model, in order to detect drift.

NOTE

An assay is a monitoring process that typically runs over an extended, ongoing period of time. For example, one might set up an assay that every day monitors the previous 24 hours’ worth of predictions and compares it to a baseline. For the purposes of these exercises, we’ll be compressing processes what normally would take hours or days into minutes.


Exercise Prep: Create some datasets for demonstrating assays

Because assays are designed to detect changes in distributions, let’s try to set up data with different distributions to test with. Take your houseprice data and create two sets: a set with lower prices, and a set with higher prices. You can split however you choose.

The idea is we will pretend that the set of lower priced houses represent the “typical” mix of houses in the housing portfolio at the time you set your baseline; you will introduce the higher priced houses later, to represent an environmental change when more expensive houses suddenly enter the market.

  • If you are using the pre-provided models to do these exercises, you can use the provided data sets of images that show high confidence versus low confidence images. This is to establish our baseline as a set of known values, so the higher prices will trigger our assay alerts.
sample_images = [
    "../data/images/input/example/coats-jackets.png",
    "../data/images/input/example/dairy_bottles.png",
    "../data/images/input/example/dairy_products.png",
    "../data/images/input/example/example_01.jpg",
    "../data/images/input/example/example_02.jpg",
    "../data/images/input/example/example_03.jpg",
    "../data/images/input/example/example_04.jpg",
    "../data/images/input/example/example_05.jpg",
    "../data/images/input/example/example_06.jpg",
    "../data/images/input/example/example_09.jpg",
    "../data/images/input/example/example_dress.jpg",
    "../data/images/input/example/example_shoe.jpg",
    "../data/images/input/example/frame5244.jpg",
    "../data/images/input/example/frame8744.jpg",
    "../data/images/input/example/product_cheeses.png",
    "../data/images/input/example/store-front.png",
    "../data/images/input/example/vegtables_poor_identification.jpg"
]

# high confidence images
baseline_images = [
    "../data/images/input/example/example_02.jpg",
    "../data/images/input/example/example_05.jpg",
    "../data/images/input/example/example_dress.jpg",
]

# low confidence images
bad_images = [
    "../data/images/input/example/coats-jackets.png",
    "../data/images/input/example/frame8744.jpg",
]
# blank spot to split or download data

import datetime
import time

startBaseline = datetime.datetime.now()
startTime = time.time()

sample_images = [
    "../data/images/input/example/coats-jackets.png",
    "../data/images/input/example/dairy_bottles.png",
    "../data/images/input/example/dairy_products.png",
    "../data/images/input/example/example_01.jpg",
    "../data/images/input/example/example_02.jpg",
    "../data/images/input/example/example_03.jpg",
    "../data/images/input/example/example_04.jpg",
    "../data/images/input/example/example_05.jpg",
    "../data/images/input/example/example_06.jpg",
    "../data/images/input/example/example_09.jpg",
    "../data/images/input/example/example_dress.jpg",
    "../data/images/input/example/example_shoe.jpg",
    "../data/images/input/example/frame5244.jpg",
    "../data/images/input/example/frame8744.jpg",
    "../data/images/input/example/product_cheeses.png",
    "../data/images/input/example/store-front.png",
    "../data/images/input/example/vegtables_poor_identification.jpg"
]

baseline_images = [
    "../data/images/input/example/example_02.jpg",
    "../data/images/input/example/example_05.jpg",
    "../data/images/input/example/example_dress.jpg",
]

bad_images = [
    "../data/images/input/example/coats-jackets.png",
    "../data/images/input/example/frame8744.jpg",
]

We will use this data to set up some “historical data” in the computer vision pipeline that you build in the assay exercises.

Setting up a baseline for the assay

In order to know whether the distribution of your model’s predictions have changed, you need a baseline to compare them to. This baseline should represent how you expect the model to behave at the time it was trained. This might be approximated by the distribution of the model’s predictions over some “typical” period of time. For example, we might collect the predictions of our model over the first few days after it’s been deployed. For these exercises, we’ll compress that to a few minutes. Currently, to set up a wallaroo assay the pipeline must have been running for some period of time, and the assumption is that this period of time is “typical”, and that the distributions of the inputs and the outputs of the model during this period of time are “typical.”

Exercise Prep: Run some inferences and set some time stamps

Here, we simulate having a pipeline that’s been running for a long enough period of time to set up an assay.

To send enough data through the pipeline to create assays, you execute something like the following code (using the appropriate names for your pipelines and models). Note that this step will take a little while, because of the sleep interval.

You will need the timestamps baseline_start, and baseline_end, for the next exercises.

# get your pipeline (in this example named "mypipeline")

pipeline = get_pipeline("mypipeline")
pipeline.deploy()

## Run some baseline data
# Where the baseline data will start
baseline_start = datetime.datetime.now()

# the number of samples we'll use for the baseline
nsample = 500

# Wait 30 seconds to set this data apart from the rest
# then send the data in batch
time.sleep(30)

# get a sample
# convert all of these into a parallel infer list, then one parallel infer to cut down time
for image in baseline_images:
    width, height = 640, 480
    dfImage, resizedImage = cvDemo.loadImageAndConvertToDataframe(image, 
                                                              width, 
                                                              height
                                                              )
    
    inference_list.append(dfImage)
parallel_results = await pipeline.parallel_infer(tensor_list=inference_list, timeout=60, num_parallel=2, retries=2)
# Set the baseline end
baseline_end = datetime.datetime.now()
mobilenet_model_name = 'mobilenet'
mobilenet_model_path = "../models/mobilenet.pt.onnx"

mobilenet_model = wl.upload_model(mobilenet_model_name, 
                                  mobilenet_model_path, 
                                  framework=Framework.ONNX).configure('onnx', 
                                                                      batch_config="single"
                                                                      )

# upload python step

field_boxes = pa.field('boxes', pa.list_(pa.list_(pa.float64(), 4)))
field_classes = pa.field('classes', pa.list_(pa.int32()))
field_confidences =  pa.field('confidences', pa.list_(pa.float64()))

# field_boxes - will have a flattened array of the 4 coordinates representing the boxes.  128 entries
# field_classes - will have 32 entries
# field_confidences - will have 32 entries
input_schema = pa.schema([field_boxes, field_classes, field_confidences])

output_schema = pa.schema([
    field_boxes,
    field_classes,
    field_confidences,
    pa.field('avg_conf', pa.list_(pa.float64()))
])

module_post_process_model = wl.upload_model("cv-post-process-drift-detection", 
                                      "../models/post-process-drift-detection-arrow.py",
                                      framework=Framework.PYTHON ).configure('python', 
                                                                             input_schema=input_schema, 
                                                                             output_schema=output_schema
                                    )
# set the pipeline step
pipeline.undeploy()
pipeline.clear()
pipeline.add_model_step(mobilenet_model)
pipeline.add_model_step(module_post_process_model)
pipeline.deploy()
pipeline.steps()
[{'ModelInference': {'models': [{'name': 'mobilenet', 'version': '484fffe8-70fe-44b9-937f-e98838bcc245', 'sha': '9044c970ee061cc47e0c77e20b05e884be37f2a20aa9c0c3ce1993dbd486a830'}]}},
 {'ModelInference': {'models': [{'name': 'cv-post-process-drift-detection', 'version': '3ae7dfc2-1b3c-44d3-9957-97c5704e3592', 'sha': 'f60c8ca55c6350d23a4e76d24cc3e5922616090686e88c875fadd6e79c403be5'}]}}]
# test inference

image = '../data/images/input/example/dairy_bottles.png'

width, height = 640, 480
dfImage, resizedImage = cvDemo.loadImageAndConvertToDataframe(image, 
                                                              width, 
                                                              height
                                                              )
results = pipeline.infer(dfImage, timeout=60)
display(results)
timein.tensorout.avg_confout.boxesout.classesout.confidencescheck_failures
02023-08-05 18:03:13.810[0.9372549057, 0.9529411793, 0.9490196109, 0.9450980425, 0.9450980425, 0.9450980425, 0.9450980425, 0.9450980425, 0.9450980425, 0.9450980425, 0.9450980425, 0.9450980425, 0.9450980425, 0.9450980425, 0.9450980425, 0.9450980425, 0.9450980425, 0.9450980425, 0.9490196109, 0.9490196109, 0.9529411793, 0.9529411793, 0.9490196109, 0.9607843161, 0.9686274529, 0.9647058845, 0.9686274529, 0.9647058845, 0.9568627477, 0.9607843161, 0.9647058845, 0.9647058845, 0.9607843161, 0.9647058845, 0.9725490212, 0.9568627477, 0.9607843161, 0.9176470637, 0.9568627477, 0.9176470637, 0.8784313798, 0.8941176534, 0.8431372643, 0.8784313798, 0.8627451062, 0.850980401, 0.9254902005, 0.8470588326, 0.9686274529, 0.8941176534, 0.8196078539, 0.850980401, 0.9294117689, 0.8666666746, 0.8784313798, 0.8666666746, 0.9647058845, 0.9764705896, 0.980392158, 0.9764705896, 0.9725490212, 0.9725490212, 0.9725490212, 0.9725490212, 0.9725490212, 0.9725490212, 0.980392158, 0.8941176534, 0.4823529422, 0.4627451003, 0.4313725531, 0.270588249, 0.2588235438, 0.2941176593, 0.3450980484, 0.3686274588, 0.4117647111, 0.4549019635, 0.4862745106, 0.5254902244, 0.5607843399, 0.6039215922, 0.6470588446, 0.6862745285, 0.721568644, 0.7450980544, 0.7490196228, 0.7882353067, 0.8666666746, 0.980392158, 0.9882352948, 0.9686274529, 0.9647058845, 0.9686274529, 0.9725490212, 0.9647058845, 0.9607843161, 0.9607843161, 0.9607843161, 0.9607843161, ...][0.2895053208114637][[0.0, 210.29010009765625, 85.26463317871094, 479.074951171875], [72.03781127929688, 197.32269287109375, 151.44223022460938, 468.4322509765625], [211.2801513671875, 184.72837829589844, 277.2192077636719, 420.4274597167969], [143.23904418945312, 203.83004760742188, 216.85546875, 448.8880920410156], [13.095015525817873, 41.91339111328125, 640.0, 480.0], [106.51464080810548, 206.14498901367188, 159.5464324951172, 463.9675598144531], [278.0636901855469, 1.521723747253418, 321.45782470703125, 93.56348419189452], [462.3183288574219, 104.16201782226562, 510.53961181640625, 224.75314331054688], [310.4559020996094, 1.395981788635254, 352.8512878417969, 94.1238250732422], [528.0485229492188, 268.4224853515625, 636.2671508789062, 475.7666015625], [220.06292724609375, 0.513851165771484, 258.3183288574219, 90.18019104003906], [552.8711547851562, 96.30235290527344, 600.7255859375, 233.53384399414065], [349.24072265625, 0.270343780517578, 404.1732482910156, 98.6802215576172], [450.8934631347656, 264.235595703125, 619.603271484375, 472.6517333984375], [261.51385498046875, 193.4335021972656, 307.17913818359375, 408.7524719238281], [509.2201843261719, 101.16539001464844, 544.1857299804688, 235.7373962402344], [592.482421875, 100.38687133789062, 633.7798461914062, 239.1343231201172], [475.5420837402344, 297.6141052246094, 551.0543823242188, 468.0154724121094], [368.81982421875, 163.61407470703125, 423.909423828125, 362.7887878417969], [120.66899871826172, 0.0, 175.9362030029297, 81.77457427978516], [72.48429107666016, 0.0, 143.5078887939453, 85.46980285644531], [271.1268615722656, 200.891845703125, 305.6260070800781, 274.5953674316406], [161.80728149414062, 0.0, 213.0830841064453, 85.42828369140625], [162.13323974609375, 0.0, 214.6081390380859, 83.81443786621094], [310.8910827636719, 190.9546813964844, 367.3292541503906, 397.2813720703125], [396.67083740234375, 166.49578857421875, 441.5286254882813, 360.07525634765625], [439.2252807617187, 256.26361083984375, 640.0, 473.165771484375], [544.5545654296875, 375.67974853515625, 636.8954467773438, 472.8069458007813], [272.79437255859375, 2.753482818603515, 306.8874206542969, 96.72763061523438], [453.7723388671875, 303.7969665527344, 524.5588989257812, 463.2135009765625], [609.8508911132812, 94.62291717529295, 635.7705078125, 211.13577270507812]][44, 44, 44, 44, 82, 44, 44, 44, 44, 44, 44, 44, 44, 44, 44, 44, 44, 44, 44, 84, 84, 44, 84, 44, 44, 44, 61, 44, 86, 44, 44][0.98649001121521, 0.9011535644531251, 0.607784628868103, 0.592232286930084, 0.5372903347015381, 0.45131680369377103, 0.43728515505790705, 0.43094053864479004, 0.40848338603973305, 0.39185276627540505, 0.35759133100509605, 0.31812658905982905, 0.26451286673545804, 0.23062895238399503, 0.204820647835731, 0.17462100088596302, 0.173138618469238, 0.159995809197425, 0.14913696050643901, 0.136640205979347, 0.133227065205574, 0.12218642979860302, 0.12130125612020401, 0.11956108361482601, 0.115278266370296, 0.09616333246231001, 0.08654832839965801, 0.078406944870948, 0.07234089076519, 0.06282090395689001, 0.052787985652685006]0

Before setting up an assay on this pipeline’s output, we may want to look at the distribution of the predictions over our selected baseline period. To do that, we’ll create an assay_builder that specifies the pipeline, the model in the pipeline, and the baseline period.. We’ll also specify that we want to look at the output of the model, which in the example code is named variable, and would appear as out.variable in the logs.

# print out one of the logs to get the name of the output variable
display(pipeline.logs(limit=1))

# get the model name directly off the pipeline (you could just hard code this, if you know the name)

model_name = pipeline.model_configs()[0].model().name()

assay_builder = ( wl.build_assay(assay_name, pipeline, model_name, 
                     baseline_start, baseline_end)
                    .add_iopath("output variable 0") ) # specify that we are looking at the first output of the output variable "variable"

where baseline_start and baseline_end are the beginning and end of the baseline periods as datetime.datetime objects.

You can then examine the distribution of variable over the baseline period:

assay_builder.baseline_histogram()

Exercise: Create an assay builder and set a baseline

Create an assay builder to monitor the output of your house price pipeline. The baseline period should be from baseline_start to baseline_end.

  • You will need to know the name of your output variable, and the name of the model in the pipeline.

Examine the baseline distribution.

## Blank space to create an assay builder and examine the baseline distribution

# now build the actual baseline

# # do an inference off each image

image_confidence = pd.DataFrame(columns=["path", "confidence"])

inference_list = []

# convert all of these into a parallel infer list, then one parallel infer to cut down time
for image in baseline_images:
    width, height = 640, 480
    dfImage, resizedImage = cvDemo.loadImageAndConvertToDataframe(image, 
                                                              width, 
                                                              height
                                                              )
    
    inference_list.append(dfImage)
    
display(inference_list)
startTime = time.time()
parallel_results = await pipeline.parallel_infer(tensor_list=inference_list, timeout=60, num_parallel=2, retries=2)
display(parallel_results)
endTime = time.time()

endBaseline = datetime.datetime.now()
[