Anomaly Testing Tutorial

How to use Wallaroo's pipeline validation feature to detect inference anomalies.

This tutorial and the assets can be downloaded as part of the Wallaroo Tutorials repository.

Anomaly Detection

Wallaroo provides multiple methods of analytical analysis to verify that the data received and generated during an inference is accurate. This tutorial will demonstrate how to use anomaly detection to track the outputs from a sample model to verify that the model is outputting acceptable results.

Anomaly detection allows organizations to set validation parameters in a pipeline. A validation is added to a pipeline to test data based on an expression, and flag any inferences where the validation failed to the InferenceResult object and the pipeline logs.

This tutorial will follow this process in setting up a validation to a pipeline and examining the results:

  1. Create a workspace and upload the sample model.
  2. Establish a pipeline and add the model as a step.
  3. Add a validation to the pipeline.
  4. Perform inferences and display anomalies through the InferenceResult object and the pipeline log files.

This tutorial provides the following:

  • Housing model: ./models/housing.zip - a pretrained model used to determine standard home prices.
  • Test Data: ./data - sample data.

This demonstration assumes that a Wallaroo instance has been installed.

Steps

Import libraries

The first step is to import the libraries needed for this notebook.

import wallaroo
from wallaroo.object import EntityNotFoundError
import os
import json

from IPython.display import display

# used to display dataframe information without truncating
from IPython.display import display
import pandas as pd
pd.set_option('display.max_colwidth', None)

Connect to Wallaroo Instance

The following command will create a connection to the Wallaroo instance and store it in the variable wl.

# Client connection from local Wallaroo instance

wl = wallaroo.Client()

# SSO login through keycloak

# wallarooPrefix = "sparkly-apple-3026"
# wallarooSuffix = "wallaroo.community"

# wl = wallaroo.Client(api_endpoint=f"https://{wallarooPrefix}.api.{wallarooSuffix}", 
#                     auth_endpoint=f"https://{wallarooPrefix}.keycloak.{wallarooSuffix}", 
#                     auth_type="sso")
import os
# Only set the below to make the OS environment ARROW_ENABLED to TRUE.  Otherwise, leave as is.
# os.environ["ARROW_ENABLED"]="True"

if "ARROW_ENABLED" not in os.environ or os.environ["ARROW_ENABLED"].casefold() == "False".casefold():
    arrowEnabled = False
else:
    arrowEnabled = True
print(arrowEnabled)
True

Create Workspace

We will create a workspace to manage our pipeline and models. The following variables will set the name of our sample workspace then set it as the current workspace.

workspace_name = 'anomalytesting'

def get_workspace(name):
    workspace = None
    for ws in wl.list_workspaces():
        if ws.name() == name:
            workspace= ws
    if(workspace == None):
        workspace = wl.create_workspace(name)
    return workspace

workspace = get_workspace(workspace_name)

wl.set_current_workspace(workspace)
{'name': 'anomalyexamples', 'id': 139, 'archived': False, 'created_by': '138bd7e6-4dc8-4dc1-a760-c9e721ef3c37', 'created_at': '2023-03-03T19:11:35.338843+00:00', 'models': [{'name': 'anomaly-housing-model', 'versions': 1, 'owner_id': '""', 'last_update_time': datetime.datetime(2023, 3, 3, 19, 11, 35, 799799, tzinfo=tzutc()), 'created_at': datetime.datetime(2023, 3, 3, 19, 11, 35, 799799, tzinfo=tzutc())}], 'pipelines': [{'name': 'anomalyhousingpipeline', 'create_time': datetime.datetime(2023, 3, 3, 19, 11, 35, 879127, tzinfo=tzutc()), 'definition': '[]'}]}

Upload The Model

The housing model will be uploaded for use in our pipeline.

housing_model = wl.upload_model("anomaly-housing-model", "./models/housing.zip").configure("tensorflow")

Build the Pipeline and Validation

The pipeline anomaly-housing-pipeline will be created and the anomaly-housing-model added as a step. A validation will be created for outputs greater 100.0. This is interpreted as houses with a value greater than $100 million with the add_validation method. When houses greater than this value are detected, the InferenceObject will add it in the check_failures array with the message “price too high”.

Once complete, the pipeline will be deployed and ready for inferences.

p = wl.build_pipeline('anomalyhousing')
p = p.add_model_step(housing_model)
p = p.add_validation('price too high', housing_model.outputs[0][0] < 100.0)
pipeline = p.deploy()
Waiting for deployment - this will take up to 45s ................ ok

Testing

Two data points will be fed used for an inference.

The first, labeled response_normal, will not trigger an anomaly detection. The other, labeled response_trigger, will trigger the anomaly detection, which will be shown in the InferenceResult check_failures array.

Note that multiple validations can be created to allow for multiple anomalies detected.

if arrowEnabled is True:
    test_input = pd.DataFrame.from_records({"dense_16_input":{"0":[0.02675675,0.0,0.02677953,0.0,0.0010046,0.00951931,0.14795322,0.0027145,0.03550877,0.98536841,0.02988655,0.04031725,0.04298041]}})
else:
    test_input = {"dense_16_input":[[0.02675675, 0.0, 0.02677953, 0.0, 0.0010046, 0.00951931, 0.14795322, 0.0027145,  0.03550877, 0.98536841, 0.02988655, 0.04031725, 0.04298041]]}

response_normal = pipeline.infer(test_input)
display(response_normal)
time in.dense_16_input out.dense_19 check_failures
0 2023-03-03 19:19:44.569 [0.02675675, 0.0, 0.02677953, 0.0, 0.0010046, 0.00951931, 0.14795322, 0.0027145, 0.03550877, 0.98536841, 0.02988655, 0.04031725, 0.04298041] [10.349835] 0
if arrowEnabled is True:
    test_input = pd.DataFrame.from_records({"dense_16_input":{"0":[0.02675675,0.0,0.02677953,0.0,0.0010046,0.00951931,0.14795322,0.0027145,2,0.98536841,0.02988655,0.04031725,0.04298041]}})
else:
    test_input = {"dense_16_input":[[0.02675675, 0.0, 0.02677953, 0.0, 0.0010046, 0.00951931, 0.14795322, 0.0027145, 2, 0.98536841, 0.02988655, 0.04031725, 0.04298041]]}

response_trigger = pipeline.infer(test_input)
display(response_trigger)
time in.dense_16_input out.dense_19 check_failures
0 2023-03-03 19:19:44.657 [0.02675675, 0.0, 0.02677953, 0.0, 0.0010046, 0.00951931, 0.14795322, 0.0027145, 2.0, 0.98536841, 0.02988655, 0.04031725, 0.04298041] [350.4699] 1

Multiple Tests

With the initial tests run, we can run the inferences against a larger set of data and identify anomalies that appear versus the expected results. These will be displayed into a graph so we can see where the anomalies occur. In this case with the house that came in at $350 million - outside of our validation range.

Note: Because this is splitting one batch inference into 400 separate inferences for this example, it may take longer to run.

if arrowEnabled is True:
    test_data = pd.read_json('./data/test_data_anomaly_df.json', orient="records")
    responses_anomaly = pd.DataFrame()
    # For the first 400 rows, submit that row as a separate DataFrame
    # Add the results to the responses_anomaly dataframe
    for index, row in test_data.head(400).iterrows():
        responses_anomaly = responses_anomaly.append(pipeline.infer(row.to_frame('dense_16_input').reset_index()))
else:
    responses_anomaly =[]
    from data import test_data_anomaly
    for nth in range(400):
        responses_anomaly.extend(pipeline.infer(test_data_anomaly.data[nth]))
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
if arrowEnabled is True:
    houseprices = pd.DataFrame({'sell_price': responses_anomaly['out.dense_19'].apply(lambda x: x[0])})
else:
    houseprices = pd.DataFrame({'sell_price': [r.raw['outputs'][0]['Float']['data'][0] for  r in responses_anomaly]})

houseprices.hist(column='sell_price', bins=50, grid=False, figsize=(12,8))
plt.axvline(x=100, color='gray', ls='--')
_ = plt.title('Distribution of predicted home sales price')
png

How To Check For Anomalies

There are two primary methods for detecting anomalies with Wallaroo:

  • As demonstrated in the example above, from the InferenceObject check_failures array in the output of each inference to see if anything has happened.
  • The other method is to view pipeline’s logs and see what anomalies have been detected.

View Logs

Anomalies can be displayed through the pipeline logs() method. The parameter valid=False will show any validations that were flagged as False - in this case, houses that were above 100 million in value.

logs = pipeline.logs(valid=False)
display(logs)

Undeploy The Pipeline

With the example complete, we undeploy the pipeline to return the resources back to the Wallaroo instance.

pipeline.undeploy()
Waiting for undeployment - this will take up to 45s ..................................... ok
name anomalyhousingpipeline
created 2023-03-03 19:11:35.879127+00:00
last_updated 2023-03-03 19:19:27.462171+00:00
deployed False
tags
versions 649283cf-6a4c-45b5-a6a7-a7c7dada5d84, 58c17376-838f-4121-91c4-4ff6dcb85728, f05819e7-8019-4f5c-ae07-6f74c02450d0, 09f7b6e3-009f-4e0f-b93a-9225975c8fbd
steps anomaly-housing-model