Retail: Drift Detection
Tutorial Notebook 3: Observability - Drift Detection
In the previous notebook you learned how to add simple validation rules to a pipeline, to monitor whether outputs (or inputs) stray out of some expected range. In this notebook, you will monitor the distribution of the pipeline’s predictions to see if the model, or the environment that it runs in, has changed.
Preliminaries
In the blocks below we will preload some required libraries; we will also redefine some of the convenience functions that you saw in the previous notebooks.
After that, you should log into Wallaroo and set your working environment to the workspace that you created for this tutorial. Please refer to Notebook 1 to refresh yourself on how to log in and set your working environment to the appropriate workspace.
# preload needed libraries
import wallaroo
from wallaroo.object import EntityNotFoundError
from wallaroo.framework import Framework
# used to display DataFrame information without truncating
from IPython.display import display
import pandas as pd
pd.set_option('display.max_colwidth', None)
import json
import datetime
import time
# used for unique connection names
import string
import random
import pyarrow as pa
import sys
# setting path - only needed when running this from the `with-code` folder.
sys.path.append('../')
from CVDemoUtils import CVDemo
cvDemo = CVDemo()
cvDemo.COCO_CLASSES_PATH = "../models/coco_classes.pickle"
## convenience functions from the previous notebooks
## these functions assume your connection to wallaroo is called wl
# return the workspace called <name>, or create it if it does not exist.
# this function assumes your connection to wallaroo is called wl
def get_workspace(name):
    workspace = None
    for ws in wl.list_workspaces():
        if ws.name() == name:
            workspace = ws
    if workspace is None:
        workspace = wl.create_workspace(name)
    return workspace
# pull a single datum from a data frame
# and convert it to the format the model expects
def get_singleton(df, i):
    singleton = df.iloc[i,:].to_numpy().tolist()
    sdict = {'tensor': [singleton]}
    return pd.DataFrame.from_dict(sdict)
# pull a batch of data from a data frame
# and convert to the format the model expects
def get_batch(df, first=0, nrows=1):
    last = first + nrows
    batch = df.iloc[first:last, :].to_numpy().tolist()
    return pd.DataFrame.from_dict({'tensor': batch})
# Translate a column from a dataframe into a single array
# used for the Statsmodel forecast model
def get_singleton_forecast(df, field):
    singleton = pd.DataFrame({field: [df[field].values.tolist()]})
    return singleton
# Get the most recent version of a model in the workspace
# Assumes that the most recent version is the last in the list of versions.
# wl.get_current_workspace().models() returns a list of models in the current workspace
def get_model(mname):
    modellist = wl.get_current_workspace().models()
    model = [m.versions()[-1] for m in modellist if m.name() == mname]
    if len(model) <= 0:
        raise KeyError(f"model {mname} not found in this workspace")
    return model[0]
# get a pipeline by name in the workspace
def get_pipeline(pname):
    plist = wl.get_current_workspace().pipelines()
    pipeline = [p for p in plist if p.name() == pname]
    if len(pipeline) <= 0:
        raise KeyError(f"pipeline {pname} not found in this workspace")
    return pipeline[0]
## blank space to log in and go to the appropriate workspace
wl = wallaroo.Client()
# optional: a random suffix you can append to the workspace name to keep it unique
suffix = ''.join(random.choice(string.ascii_lowercase) for i in range(4))
workspace_name = 'computer-vision-tutorial'
workspace = get_workspace(workspace_name)
wl.set_current_workspace(workspace)
{'name': 'computer-vision-tutorialjohn', 'id': 20, 'archived': False, 'created_by': '0a36fba2-ad42-441b-9a8c-bac8c68d13fa', 'created_at': '2023-08-04T19:16:04.283819+00:00', 'models': [{'name': 'mobilenet', 'versions': 4, 'owner_id': '""', 'last_update_time': datetime.datetime(2023, 8, 4, 19, 42, 22, 155273, tzinfo=tzutc()), 'created_at': datetime.datetime(2023, 8, 4, 19, 19, 46, 286247, tzinfo=tzutc())}, {'name': 'cv-post-process-drift-detection', 'versions': 3, 'owner_id': '""', 'last_update_time': datetime.datetime(2023, 8, 4, 19, 42, 23, 233883, tzinfo=tzutc()), 'created_at': datetime.datetime(2023, 8, 4, 19, 23, 10, 189958, tzinfo=tzutc())}, {'name': 'resnet', 'versions': 2, 'owner_id': '""', 'last_update_time': datetime.datetime(2023, 8, 5, 17, 25, 35, 579362, tzinfo=tzutc()), 'created_at': datetime.datetime(2023, 8, 5, 17, 20, 21, 886290, tzinfo=tzutc())}], 'pipelines': [{'name': 'cv-retail-pipeline', 'create_time': datetime.datetime(2023, 8, 4, 19, 23, 11, 179176, tzinfo=tzutc()), 'definition': '[]'}]}
Monitoring for Drift: Shift Happens.
In machine learning, you use data and known answers to train a model to make predictions on new, previously unseen data. You do this with the assumption that the future unseen data will be similar to the data used during training: the future will look somewhat like the past.
But the conditions that existed when a model was created, trained and tested can change over time, due to various factors.
A good model should be robust to some amount of change in the environment; however, if the environment changes too much, your models may no longer be making the correct decisions. This situation is known as concept drift; too much drift can obsolete your models, requiring periodic retraining.
Let’s consider the example we’ve been working on: computer vision. You may notice over time that the average confidence with which objects in an image are recognized has changed. Perhaps the inventory has changed enough that recognizing a bottle of milk isn’t the same as recognizing a bag of milk. Perhaps the new images are coming in at a lower quality.
In Wallaroo you can monitor your computer vision model for signs of drift through the model monitoring and insight capability called Assays. Assays help you track changes in the environment that your model operates within, which can affect the model’s outcome. They do this by tracking the model’s predictions and/or the data coming into the model against an established baseline. If the distribution of monitored values in the current observation window differs too much from the baseline distribution, the assay will flag it. The figure below shows an example of a running scheduled assay.
Figure: A daily assay that’s been running for a month. The dots represent the difference between the distribution of values in the daily observation window and the baseline. When that difference exceeds the specified threshold (indicated by a red dot), an alert is raised.
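Wallaroo performs this comparison for you. The underlying idea can still be sketched independently. The Population Stability Index (PSI) is one common way to score how far a binned distribution has moved from a baseline. It sums (window - baseline) * ln(window / baseline) over the bins. The bin proportions and the ALERT_THRESHOLD value below are made up for illustration, and they are not claimed to be Wallaroo defaults.

```python
import numpy as np

# Hypothetical alert threshold, chosen only for this example.
ALERT_THRESHOLD = 0.1

def psi(baseline_props, window_props, eps=1e-6):
    """Population Stability Index between two binned distributions.

    Both arguments are per-bin proportions computed over the same bin edges.
    eps keeps empty bins from producing log(0) or division by zero.
    """
    b = np.clip(np.asarray(baseline_props, dtype=float), eps, None)
    w = np.clip(np.asarray(window_props, dtype=float), eps, None)
    return float(np.sum((w - b) * np.log(w / b)))

p_baseline = [0.2, 0.2, 0.2, 0.2, 0.2]
p_same = [0.2, 0.2, 0.2, 0.2, 0.2]
p_shifted = [0.05, 0.1, 0.15, 0.3, 0.4]

print(psi(p_baseline, p_same))     # 0.0: identical distributions
print(psi(p_baseline, p_shifted))  # larger score: the mass has moved
alert = psi(p_baseline, p_shifted) > ALERT_THRESHOLD
print("alert" if alert else "ok")
```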
This next set of exercises will walk you through setting up an assay to monitor the predictions of your computer vision model, in order to detect drift.
NOTE
An assay is a monitoring process that typically runs over an extended, ongoing period of time. For example, one might set up an assay that every day monitors the previous 24 hours’ worth of predictions and compares it to a baseline. For the purposes of these exercises, we’ll be compressing processes that would normally take hours or days into minutes.
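A schedule like the one in this note can be described by three values: a start time, a window width, and an interval between windows. The helper below, `assay_windows`, is hypothetical and not part of the Wallaroo SDK. It sketches that arithmetic for both a daily schedule and the compressed, minute-scale schedule used in these exercises.

```python
from datetime import datetime, timedelta

def assay_windows(start, width, interval, count):
    """Return (window_start, window_end) pairs for a scheduled assay.

    start    -- when the first window begins
    width    -- how much data each window covers (e.g. 24 hours)
    interval -- how far apart consecutive windows start
    count    -- how many windows to generate
    """
    return [(start + i * interval, start + i * interval + width) for i in range(count)]

# Production-style schedule: each day, look at the previous 24 hours.
daily = assay_windows(datetime(2023, 8, 1), timedelta(hours=24), timedelta(days=1), 3)

# Tutorial-style schedule: the same idea compressed into one-minute windows.
compressed = assay_windows(datetime(2023, 8, 1), timedelta(minutes=1), timedelta(minutes=1), 3)

for window_start, window_end in compressed:
    print(window_start, "->", window_end)
```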
Exercise Prep: Create some datasets for demonstrating assays
Because assays are designed to detect changes in distributions, let’s try to set up data with different distributions to test with. Take your image data and create two sets: a set of images that the model recognizes with high confidence, and a set that it recognizes with low confidence. You can split however you choose.
The idea is that we will pretend the high confidence images represent the “typical” images the model sees at the time you set your baseline; you will introduce the low confidence images later, to represent an environmental change such as new inventory or lower quality images.
- If you are using the pre-provided models to do these exercises, you can use the provided data sets of images that show high confidence versus low confidence images. These establish our baseline as a set of known values, so that the low confidence images will trigger our assay alerts.
sample_images = [
"../data/images/input/example/coats-jackets.png",
"../data/images/input/example/dairy_bottles.png",
"../data/images/input/example/dairy_products.png",
"../data/images/input/example/example_01.jpg",
"../data/images/input/example/example_02.jpg",
"../data/images/input/example/example_03.jpg",
"../data/images/input/example/example_04.jpg",
"../data/images/input/example/example_05.jpg",
"../data/images/input/example/example_06.jpg",
"../data/images/input/example/example_09.jpg",
"../data/images/input/example/example_dress.jpg",
"../data/images/input/example/example_shoe.jpg",
"../data/images/input/example/frame5244.jpg",
"../data/images/input/example/frame8744.jpg",
"../data/images/input/example/product_cheeses.png",
"../data/images/input/example/store-front.png",
"../data/images/input/example/vegtables_poor_identification.jpg"
]
# high confidence images
baseline_images = [
"../data/images/input/example/example_02.jpg",
"../data/images/input/example/example_05.jpg",
"../data/images/input/example/example_dress.jpg",
]
# low confidence images
bad_images = [
"../data/images/input/example/coats-jackets.png",
"../data/images/input/example/frame8744.jpg",
]
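The image lists above were picked by hand. To split your own images programmatically, one option is to score each image first and then split on a confidence threshold. This is a sketch: `split_by_confidence` is a hypothetical helper, the scores are made up, and the 0.5 threshold is arbitrary.

```python
import pandas as pd

def split_by_confidence(df, threshold, column="avg_conf"):
    """Split rows into a baseline set (score >= threshold) and a drift set (score < threshold)."""
    high = df[df[column] >= threshold]
    low = df[df[column] < threshold]
    return high, low

# Made-up average confidences; in practice these would come from running
# each image through the pipeline and reading its out.avg_conf value.
scores = pd.DataFrame({
    "path": ["a.jpg", "b.jpg", "c.png", "d.jpg"],
    "avg_conf": [0.62, 0.55, 0.21, 0.18],
})
baseline_set, drift_set = split_by_confidence(scores, threshold=0.5)
print(baseline_set["path"].tolist())  # ['a.jpg', 'b.jpg']
print(drift_set["path"].tolist())     # ['c.png', 'd.jpg']
```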
# blank spot to split or download data
startBaseline = datetime.datetime.now()
startTime = time.time()
We will use this data to set up some “historical data” in the computer vision pipeline that you build in the assay exercises.
Setting up a baseline for the assay
In order to know whether the distribution of your model’s predictions has changed, you need a baseline to compare them to. This baseline should represent how you expect the model to behave at the time it was trained. This might be approximated by the distribution of the model’s predictions over some “typical” period of time. For example, we might collect the predictions of our model over the first few days after it’s been deployed. For these exercises, we’ll compress that to a few minutes. Currently, to set up a Wallaroo assay the pipeline must have been running for some period of time, and the assumption is that this period, and the distributions of the model’s inputs and outputs during it, are “typical.”
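To make the idea of a baseline distribution concrete, here is a numpy sketch. It summarizes baseline prediction values as quantile bins (the bin edges plus the share of values in each bin), then bins later values against the same edges. Wallaroo builds its own baseline summary when you create an assay. The five quantile bins, the helper names, and the synthetic normal data here are assumptions for illustration only.

```python
import numpy as np

def summarize_baseline(values, num_bins=5):
    """Summarize baseline values as quantile bin edges plus per-bin proportions.

    Quantile edges put roughly equal numbers of baseline values in each bin,
    so the baseline proportions start out close to uniform.
    """
    values = np.asarray(values, dtype=float)
    edges = np.quantile(values, np.linspace(0, 1, num_bins + 1))
    counts, _ = np.histogram(values, bins=edges)
    return edges, counts / counts.sum()

def bin_proportions(values, edges):
    """Share of new values in each baseline bin; out-of-range values are clipped into the end bins."""
    clipped = np.clip(np.asarray(values, dtype=float), edges[0], edges[-1])
    counts, _ = np.histogram(clipped, bins=edges)
    return counts / counts.sum()

rng = np.random.default_rng(0)
baseline_conf = rng.normal(0.6, 0.05, size=500)  # synthetic "typical" avg_conf values
later_conf = rng.normal(0.3, 0.05, size=500)     # synthetic lower-confidence values

edges, baseline_props = summarize_baseline(baseline_conf)
later_props = bin_proportions(later_conf, edges)
print(baseline_props)  # about 0.2 in each bin
print(later_props)     # nearly all of the mass lands in the lowest bin
```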
Exercise Prep: Run some inferences and set some time stamps
Here, we simulate having a pipeline that’s been running for a long enough period of time to set up an assay.
To send enough data through the pipeline to create assays, you can execute something like the following code (using the appropriate names for your pipelines and models). Note that this step will take a little while, because of the sleep interval.
You will need the timestamps baseline_start and baseline_end for the next exercises.
# get your pipeline (in this example named "mypipeline")
pipeline = get_pipeline("mypipeline")
pipeline.deploy()
## Run some baseline data
# Where the baseline data will start
baseline_start = datetime.datetime.now()
# the number of samples we'll use for the baseline
# (not used when sending the fixed list of baseline_images below)
nsample = 500
# Wait 30 seconds to set this data apart from the rest
# then send the data in batch
time.sleep(30)
# convert all of the baseline images into one list of inputs,
# then run a single parallel inference to cut down time
inference_list = []
for image in baseline_images:
    width, height = 640, 480
    dfImage, resizedImage = cvDemo.loadImageAndConvertToDataframe(image,
                                                                  width,
                                                                  height
                                                                  )
    inference_list.append(dfImage)
parallel_results = await pipeline.parallel_infer(tensor_list=inference_list, timeout=60, num_parallel=2, retries=2)
# Set the baseline end
baseline_end = datetime.datetime.now()
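The cell above relies on pipeline.parallel_infer to fan the batch out. To illustrate what parameters like num_parallel and retries typically control, here is a generic asyncio sketch of bounded-concurrency requests with retries. It is not Wallaroo’s implementation. `run_bounded` and `fake_infer` are hypothetical, and `fake_infer` stands in for a single inference call.

```python
import asyncio

async def run_bounded(items, call, num_parallel=2, retries=2):
    """Run `call(item)` for every item with at most `num_parallel` calls in flight.

    Each item is attempted up to 1 + `retries` times. Results keep input order;
    an item that still fails after the last retry yields its exception object.
    """
    semaphore = asyncio.Semaphore(num_parallel)

    async def one(item):
        async with semaphore:
            for attempt in range(retries + 1):
                try:
                    return await call(item)
                except Exception as exc:  # any failure triggers a retry
                    if attempt == retries:
                        return exc

    return await asyncio.gather(*(one(item) for item in items))

# Hypothetical stand-in for one inference request: "flaky" fails on its first try.
attempts = {}

async def fake_infer(item):
    attempts[item] = attempts.get(item, 0) + 1
    await asyncio.sleep(0.01)
    if item == "flaky" and attempts[item] == 1:
        raise RuntimeError("transient error")
    return f"result for {item}"

demo_results = asyncio.run(run_bounded(["a", "flaky", "b"], fake_infer))
print(demo_results)
```

In a Jupyter notebook, where an event loop is already running, call `await run_bounded(...)` directly instead of `asyncio.run(...)`, the same way this notebook awaits pipeline.parallel_infer.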
mobilenet_model_name = 'mobilenet'
mobilenet_model_path = "../models/mobilenet.pt.onnx"
mobilenet_model = wl.upload_model(mobilenet_model_name,
mobilenet_model_path,
framework=Framework.ONNX).configure('onnx',
batch_config="single"
)
# upload python step
field_boxes = pa.field('boxes', pa.list_(pa.list_(pa.float64(), 4)))
field_classes = pa.field('classes', pa.list_(pa.int32()))
field_confidences = pa.field('confidences', pa.list_(pa.float64()))
# field_boxes - a list of boxes, each a list of the 4 coordinates of one box (128 values for 32 boxes)
# field_classes - will have 32 entries
# field_confidences - will have 32 entries
input_schema = pa.schema([field_boxes, field_classes, field_confidences])
output_schema = pa.schema([
field_boxes,
field_classes,
field_confidences,
pa.field('avg_conf', pa.list_(pa.float64()))
])
module_post_process_model = wl.upload_model("cv-post-process-drift-detection",
"../models/post-process-drift-detection-arrow.py",
framework=Framework.PYTHON ).configure('python',
input_schema=input_schema,
output_schema=output_schema
)
# set the pipeline step
pipeline.undeploy()
pipeline.clear()
pipeline.add_model_step(mobilenet_model)
pipeline.add_model_step(module_post_process_model)
pipeline.deploy()
pipeline.steps()
[{'ModelInference': {'models': [{'name': 'mobilenet', 'version': '484fffe8-70fe-44b9-937f-e98838bcc245', 'sha': '9044c970ee061cc47e0c77e20b05e884be37f2a20aa9c0c3ce1993dbd486a830'}]}},
{'ModelInference': {'models': [{'name': 'cv-post-process-drift-detection', 'version': '3ae7dfc2-1b3c-44d3-9957-97c5704e3592', 'sha': 'f60c8ca55c6350d23a4e76d24cc3e5922616090686e88c875fadd6e79c403be5'}]}}]
# test inference
image = '../data/images/input/example/dairy_bottles.png'
width, height = 640, 480
dfImage, resizedImage = cvDemo.loadImageAndConvertToDataframe(image,
width,
height
)
results = pipeline.infer(dfImage, timeout=60)
display(results)
| | time | in.tensor | out.avg_conf | out.boxes | out.classes | out.confidences | check_failures |
|---|---|---|---|---|---|---|---|
| 0 | 2023-08-05 18:03:13.810 | [0.9372549057, 0.9529411793, 0.9490196109, 0.9450980425, 0.9450980425, 0.9450980425, 0.9450980425, 0.9450980425, 0.9450980425, 0.9450980425, 0.9450980425, 0.9450980425, 0.9450980425, 0.9450980425, 0.9450980425, 0.9450980425, 0.9450980425, 0.9450980425, 0.9490196109, 0.9490196109, 0.9529411793, 0.9529411793, 0.9490196109, 0.9607843161, 0.9686274529, 0.9647058845, 0.9686274529, 0.9647058845, 0.9568627477, 0.9607843161, 0.9647058845, 0.9647058845, 0.9607843161, 0.9647058845, 0.9725490212, 0.9568627477, 0.9607843161, 0.9176470637, 0.9568627477, 0.9176470637, 0.8784313798, 0.8941176534, 0.8431372643, 0.8784313798, 0.8627451062, 0.850980401, 0.9254902005, 0.8470588326, 0.9686274529, 0.8941176534, 0.8196078539, 0.850980401, 0.9294117689, 0.8666666746, 0.8784313798, 0.8666666746, 0.9647058845, 0.9764705896, 0.980392158, 0.9764705896, 0.9725490212, 0.9725490212, 0.9725490212, 0.9725490212, 0.9725490212, 0.9725490212, 0.980392158, 0.8941176534, 0.4823529422, 0.4627451003, 0.4313725531, 0.270588249, 0.2588235438, 0.2941176593, 0.3450980484, 0.3686274588, 0.4117647111, 0.4549019635, 0.4862745106, 0.5254902244, 0.5607843399, 0.6039215922, 0.6470588446, 0.6862745285, 0.721568644, 0.7450980544, 0.7490196228, 0.7882353067, 0.8666666746, 0.980392158, 0.9882352948, 0.9686274529, 0.9647058845, 0.9686274529, 0.9725490212, 0.9647058845, 0.9607843161, 0.9607843161, 0.9607843161, 0.9607843161, ...] | [0.2895053208114637] | [[0.0, 210.29010009765625, 85.26463317871094, 479.074951171875], [72.03781127929688, 197.32269287109375, 151.44223022460938, 468.4322509765625], [211.2801513671875, 184.72837829589844, 277.2192077636719, 420.4274597167969], [143.23904418945312, 203.83004760742188, 216.85546875, 448.8880920410156], [13.095015525817873, 41.91339111328125, 640.0, 480.0], [106.51464080810548, 206.14498901367188, 159.5464324951172, 463.9675598144531], [278.0636901855469, 1.521723747253418, 321.45782470703125, 93.56348419189452], [462.3183288574219, 104.16201782226562, 510.53961181640625, 224.75314331054688], [310.4559020996094, 1.395981788635254, 352.8512878417969, 94.1238250732422], [528.0485229492188, 268.4224853515625, 636.2671508789062, 475.7666015625], [220.06292724609375, 0.513851165771484, 258.3183288574219, 90.18019104003906], [552.8711547851562, 96.30235290527344, 600.7255859375, 233.53384399414065], [349.24072265625, 0.270343780517578, 404.1732482910156, 98.6802215576172], [450.8934631347656, 264.235595703125, 619.603271484375, 472.6517333984375], [261.51385498046875, 193.4335021972656, 307.17913818359375, 408.7524719238281], [509.2201843261719, 101.16539001464844, 544.1857299804688, 235.7373962402344], [592.482421875, 100.38687133789062, 633.7798461914062, 239.1343231201172], [475.5420837402344, 297.6141052246094, 551.0543823242188, 468.0154724121094], [368.81982421875, 163.61407470703125, 423.909423828125, 362.7887878417969], [120.66899871826172, 0.0, 175.9362030029297, 81.77457427978516], [72.48429107666016, 0.0, 143.5078887939453, 85.46980285644531], [271.1268615722656, 200.891845703125, 305.6260070800781, 274.5953674316406], [161.80728149414062, 0.0, 213.0830841064453, 85.42828369140625], [162.13323974609375, 0.0, 214.6081390380859, 83.81443786621094], [310.8910827636719, 190.9546813964844, 367.3292541503906, 397.2813720703125], [396.67083740234375, 166.49578857421875, 441.5286254882813, 360.07525634765625], [439.2252807617187, 256.26361083984375, 640.0, 473.165771484375], [544.5545654296875, 375.67974853515625, 636.8954467773438, 472.8069458007813], [272.79437255859375, 2.753482818603515, 306.8874206542969, 96.72763061523438], [453.7723388671875, 303.7969665527344, 524.5588989257812, 463.2135009765625], [609.8508911132812, 94.62291717529295, 635.7705078125, 211.13577270507812]] | [44, 44, 44, 44, 82, 44, 44, 44, 44, 44, 44, 44, 44, 44, 44, 44, 44, 44, 44, 84, 84, 44, 84, 44, 44, 44, 61, 44, 86, 44, 44] | [0.98649001121521, 0.9011535644531251, 0.607784628868103, 0.592232286930084, 0.5372903347015381, 0.45131680369377103, 0.43728515505790705, 0.43094053864479004, 0.40848338603973305, 0.39185276627540505, 0.35759133100509605, 0.31812658905982905, 0.26451286673545804, 0.23062895238399503, 0.204820647835731, 0.17462100088596302, 0.173138618469238, 0.159995809197425, 0.14913696050643901, 0.136640205979347, 0.133227065205574, 0.12218642979860302, 0.12130125612020401, 0.11956108361482601, 0.115278266370296, 0.09616333246231001, 0.08654832839965801, 0.078406944870948, 0.07234089076519, 0.06282090395689001, 0.052787985652685006] | 0 |
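pipeline.infer returns a pandas DataFrame. Some of its cells hold lists (out.avg_conf is a one-element list, for example), which makes them awkward to compare across rows. The sketch below unwraps the avg_conf value and pairs each detected class with its confidence. The DataFrame is a hand-built stand-in with the same column names as the output above, and its values are made up. `top_detections` is a hypothetical helper.

```python
import pandas as pd

# Stand-in for a pipeline.infer() result: same column names, made-up values.
sample_results = pd.DataFrame({
    "out.avg_conf": [[0.29], [0.61]],
    "out.classes": [[44, 44, 82], [1, 3]],
    "out.confidences": [[0.98, 0.90, 0.54], [0.70, 0.52]],
})

# Each avg_conf cell holds a one-element list; pull out the scalar.
sample_results["avg_conf"] = sample_results["out.avg_conf"].str[0]

def top_detections(row, k=2):
    """Return the k (class, confidence) pairs with the highest confidence in one row."""
    pairs = zip(row["out.classes"], row["out.confidences"])
    return sorted(pairs, key=lambda p: p[1], reverse=True)[:k]

sample_results["top"] = sample_results.apply(top_detections, axis=1)
print(sample_results[["avg_conf", "top"]])
```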
Before setting up an assay on this pipeline’s output, we may want to look at the distribution of the predictions over our selected baseline period. To do that, we’ll create an assay_builder that specifies the pipeline, the model in the pipeline, and the baseline period. We’ll also specify that we want to look at the output of the model, which in the example code is named variable and would appear as out.variable in the logs.
# print out one of the logs to get the name of the output variable
display(pipeline.logs(limit=1))
# get the model name directly off the pipeline (you could just hard code this, if you know the name)
model_name = pipeline.model_configs()[0].model().name()
# choose a name for the assay
assay_name = "example assay"
assay_builder = ( wl.build_assay(assay_name, pipeline, model_name,
                                 baseline_start, baseline_end)
                  .add_iopath("output variable 0") ) # specify that we are looking at the first output of the output variable "variable"
where baseline_start and baseline_end are the beginning and end of the baseline period as datetime.datetime objects.
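The iopath string passed to add_iopath above names a direction, a field, and an index into that field. The helper below, `iopath_to_log_column`, is hypothetical and not part of the Wallaroo SDK. It spells out how such a string lines up with the log column names described above, so you can find the matching column in pipeline.logs() output. The mapping of "output" to "out." and "input" to "in." is an assumption based on the column names shown in this notebook.

```python
def iopath_to_log_column(iopath):
    """Translate an iopath like 'output avg_conf 0' into (log column, element index).

    Assumes 'output <name>' is logged as 'out.<name>' and 'input <name>' as 'in.<name>'.
    """
    direction, name, index = iopath.split()
    prefix = {"input": "in", "output": "out"}[direction]
    return f"{prefix}.{name}", int(index)

print(iopath_to_log_column("output variable 0"))  # ('out.variable', 0)
print(iopath_to_log_column("output avg_conf 0"))  # ('out.avg_conf', 0)
```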
You can then examine the distribution of variable over the baseline period:
assay_builder.baseline_histogram()
Exercise: Create an assay builder and set a baseline
Create an assay builder to monitor the output of your computer vision pipeline. The baseline period should be from baseline_start to baseline_end.
- You will need to know the name of your output variable, and the name of the model in the pipeline.
Examine the baseline distribution.
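The assay builder selects the baseline by time for you. Before you build the assay, you may want to check which inferences fall inside the baseline period. A pandas sketch like the following filters a log DataFrame to the half-open window [start, end). It makes several assumptions: the logs have a time column, as the inference output above does; the timestamps are naive and in the same timezone as your start and end values (adjust if yours are timezone-aware); and `logs_in_window` and the sample values are hypothetical.

```python
import datetime
import pandas as pd

def logs_in_window(logs, start, end, time_col="time"):
    """Return the rows whose timestamp falls in the half-open window [start, end)."""
    times = pd.to_datetime(logs[time_col])
    return logs[(times >= start) & (times < end)]

# Hand-built stand-in for pipeline logs: same column names, made-up values.
demo_logs = pd.DataFrame({
    "time": pd.to_datetime(["2023-08-05 18:00:00", "2023-08-05 18:01:30", "2023-08-05 18:03:00"]),
    "out.avg_conf": [[0.62], [0.58], [0.21]],
})
window_start = datetime.datetime(2023, 8, 5, 18, 0, 0)
window_end = datetime.datetime(2023, 8, 5, 18, 2, 0)
in_window = logs_in_window(demo_logs, window_start, window_end)
print(in_window["out.avg_conf"].str[0].tolist())  # [0.62, 0.58]
```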
## Blank space to create an assay builder and examine the baseline distribution
# now build the actual baseline
# do an inference off each image
image_confidence = pd.DataFrame(columns=["path", "confidence"])
inference_list = []
# convert all of these into a parallel infer list, then one parallel infer to cut down time
for image in baseline_images:
    width, height = 640, 480
    dfImage, resizedImage = cvDemo.loadImageAndConvertToDataframe(image,
                                                                  width,
                                                                  height
                                                                  )
    inference_list.append(dfImage)
display(inference_list)
startTime = time.time()
parallel_results = await pipeline.parallel_infer(tensor_list=inference_list, timeout=60, num_parallel=2, retries=2)
display(parallel_results)
endTime = time.time()
endBaseline = datetime.datetime.now()
[