Wallaroo SDK Essentials Guide: Pipeline Management

How to create and manage Wallaroo Pipelines through the Wallaroo SDK

Pipeline Management

Pipelines take submitted data and process it through one or more models. Each pipeline can have one or more steps, and each step passes its output to the next step. Data can be submitted to a pipeline as a file or through the pipeline’s URL.

A pipeline’s metrics can be viewed through the Wallaroo Dashboard Pipeline Details and Metrics page.

Create a Pipeline

New pipelines are created in the current workspace.

To create a new pipeline, use the Wallaroo Client build_pipeline("{Pipeline Name}") command.

The following example creates a new pipeline imdb-pipeline through a Wallaroo Client connection wl:

imdb_pipeline = wl.build_pipeline("imdb-pipeline")

imdb_pipeline.status()
{'status': 'Pipeline imdb-pipeline is not deployed'}


List All Pipelines

The Wallaroo Client method list_pipelines() lists all pipelines in a Wallaroo Instance.

The following example lists all pipelines in the wl Wallaroo Client connection:

wl.list_pipelines()

[{'name': 'ccfraud-pipeline', 'create_time': datetime.datetime(2022, 4, 12, 17, 55, 41, 944976, tzinfo=tzutc()), 'definition': '[]'}]


Select an Existing Pipeline

Rather than creating a new pipeline each time, an existing pipeline can be selected by using the list_pipelines() command and assigning one of the array members to a variable.

The following example sets the pipeline ccfraud-pipeline to the variable current_pipeline:

wl.list_pipelines()

[{'name': 'ccfraud-pipeline', 'create_time': datetime.datetime(2022, 4, 12, 17, 55, 41, 944976, tzinfo=tzutc()), 'definition': '[]'}]

current_pipeline = wl.list_pipelines()[0]

current_pipeline.status()

{'status': 'Running',
'details': None,
'engines': [{'ip': '10.244.5.4',
'name': 'engine-7fcc7df596-hvlxb',
'status': 'Running',
'reason': None,
'pipeline_statuses': {'pipelines': [{'id': 'ccfraud-pipeline',
'status': 'Running'}]},
'model_statuses': {'models': [{'name': 'ccfraud-model',
'version': '4624e8a8-1414-4408-8b40-e03da4b5cb68',
'sha': 'bc85ce596945f876256f41515c7501c399fd97ebcb9ab3dd41bf03f8937b4507',
'status': 'Running'}]}}],
'engine_lbs': [{'ip': '10.244.1.24',
'name': 'engine-lb-85846c64f8-mtq9p',
'status': 'Running',
'reason': None}]}
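Selecting by position with [0] depends on the order in which pipelines are listed. A minimal sketch of selecting by name instead is shown below. The helper find_pipeline and the stand-in objects are hypothetical, and the way a listed pipeline exposes its name is an assumption, so the accessor is passed in as a parameter rather than hard-coded.

```python
from types import SimpleNamespace


def find_pipeline(pipelines, name, get_name=lambda p: p.name):
    """Return the first pipeline whose name matches, or None if absent."""
    for pipeline in pipelines:
        if get_name(pipeline) == name:
            return pipeline
    return None


# Stand-in objects so the sketch runs without a Wallaroo connection.
listed = [SimpleNamespace(name="ccfraud-pipeline"),
          SimpleNamespace(name="imdb-pipeline")]

current_pipeline = find_pipeline(listed, "imdb-pipeline")
print(current_pipeline.name)
```

With a live connection, the list returned by wl.list_pipelines() would be passed in place of listed, with get_name adjusted to however the SDK's pipeline objects expose their name.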


Add a Step to a Pipeline

Once a pipeline has been created, or during its creation process, a pipeline step can be added. The pipeline step refers to the model that will perform an inference on the data submitted to it. Each added step is appended to the pipeline’s models array.

A pipeline step is added through the pipeline add_model_step({Model}) command.

In the following example, two models uploaded to the workspace are added as pipeline steps:

imdb_pipeline.add_model_step(embedder)
imdb_pipeline.add_model_step(smodel)

imdb_pipeline.status()

{'name': 'imdb-pipeline', 'create_time': datetime.datetime(2022, 3, 30, 21, 21, 31, 127756, tzinfo=tzutc()), 'definition': "[{'ModelInference': {'models': [{'name': 'embedder-o', 'version': '1c16d21d-fe4c-4081-98bc-65fefa465f7d', 'sha': 'd083fd87fa84451904f71ab8b9adfa88580beb92ca77c046800f79780a20b7e4'}]}}, {'ModelInference': {'models': [{'name': 'smodel-o', 'version': '8d311ba3-c336-48d3-99cd-85d95baa6f19', 'sha': '3473ea8700fbf1a1a8bfb112554a0dde8aab36758030dcde94a9357a83fd5650'}]}}]"}


Replace a Pipeline Step

The model specified in a pipeline step can be replaced with the pipeline method replace_with_model_step(index, model).

The following parameters are used for replacing a pipeline step:

| Parameter | Default Value | Purpose |
| --- | --- | --- |
| index | null | The pipeline step to be replaced. Pipeline steps follow array numbering, where the first step is 0, etc. |
| model | null | The new model to be used in the pipeline step. |

In the following example, a deployed pipeline has its initial model step replaced with a new one. The pipeline status is displayed after the initial deployment and again after the swap to show that the model has changed from ccfraudoriginal to ccfraudreplacement, each with its own version.

pipeline.deploy()

pipeline.status()

{'status': 'Running',
'details': [],
'engines': [{'ip': '10.244.2.145',
'name': 'engine-75bfd7dc9d-7p9qk',
'status': 'Running',
'reason': None,
'details': [],
'pipeline_statuses': {'pipelines': [{'id': 'hotswappipeline',
'status': 'Running'}]},
'model_statuses': {'models': [{'name': 'ccfraudoriginal',
'version': '3a03dc94-716e-46bb-84c8-91bc99ceb2c3',
'sha': 'bc85ce596945f876256f41515c7501c399fd97ebcb9ab3dd41bf03f8937b4507',
'status': 'Running'}]}}],
'engine_lbs': [{'ip': '10.244.2.144',
'name': 'engine-lb-55dcdff64c-vf74s',
'status': 'Running',
'reason': None,
'details': []}],
'sidekicks': []}

pipeline.replace_with_model_step(0, replacement_model).deploy()

pipeline.status()

{'status': 'Running',
'details': [],
'engines': [{'ip': '10.244.2.153',
'name': 'engine-96486c95d-zfchr',
'status': 'Running',
'reason': None,
'details': [],
'pipeline_statuses': {'pipelines': [{'id': 'hotswappipeline',
'status': 'Running'}]},
'model_statuses': {'models': [{'name': 'ccfraudreplacement',
'version': '714efd19-5c83-42a8-aece-24b4ba530925',
'sha': 'bc85ce596945f876256f41515c7501c399fd97ebcb9ab3dd41bf03f8937b4507',
'status': 'Running'}]}}],
'engine_lbs': [{'ip': '10.244.2.154',
'name': 'engine-lb-55dcdff64c-9np9k',
'status': 'Running',
'reason': None,
'details': []}],
'sidekicks': []}


Pre and Post Processing Steps

A pipeline step can be more than a model: it can also be a pre-processing or post-processing step. For example, the Demand Curve Tutorial adds both a pre-processing and a post-processing step to the pipeline. The preprocessing step uses the following code:

import numpy
import pandas

import json

# add interaction terms for the model
def actual_preprocess(pdata):
    pd = pdata.copy()
    # convert boolean cust_known to 0/1
    pd.cust_known = numpy.where(pd.cust_known, 1, 0)
    # interact UnitPrice and cust_known
    pd['UnitPriceXcust_known'] = pd.UnitPrice * pd.cust_known
    return pd.loc[:, ['UnitPrice', 'cust_known', 'UnitPriceXcust_known']]

# If the data is a json string, call this wrapper instead
# Expected input:
# a dictionary with fields 'colnames', 'query'

# test that the code works here
def wallaroo_json(data):
    obj = json.loads(data)
    pdata = pandas.DataFrame(obj['query'],
                             columns=obj['colnames'])
    pprocessed = actual_preprocess(pdata)

    # return a dictionary, with the fields the model expects
    return {
        'tensor_fields': ['model_input'],
        'model_input': pprocessed.to_numpy().tolist()
    }


It is added as a Python module by uploading it as a model:

# load the preprocess module
module_pre = wl.upload_model("preprocess", "./preprocess.py").configure('python')


And then added to the pipeline as a step:

# now make a pipeline
demandcurve_pipeline = (wl.build_pipeline("demand-curve-pipeline")
                        .add_model_step(module_pre)
                        .add_model_step(demand_curve_model)
                        .add_model_step(module_post))
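The wallaroo_json wrapper in the preprocessing module decodes a JSON string and reads its 'query' and 'colnames' keys. As a rough illustration of what such an input string could look like, the sketch below builds one with the standard library only. The row values are invented for illustration and are not taken from the tutorial data.

```python
import json

# Illustrative rows only: each row is [UnitPrice, cust_known].
payload = {
    "colnames": ["UnitPrice", "cust_known"],
    "query": [[12.5, True], [9.99, False]],
}
data = json.dumps(payload)

# The wrapper begins by decoding the string and reading these two keys.
obj = json.loads(data)
print(obj["colnames"])
print(obj["query"])
```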


Remove a Pipeline Step

To remove a step from the pipeline, use the Pipeline remove_step(index) command, where the index is the array index for the pipeline’s steps.

In the following example the pipeline imdb_pipeline will have the step with the model smodel-o removed.

imdb_pipeline.status

<bound method Pipeline.status of {'name': 'imdb-pipeline', 'create_time': datetime.datetime(2022, 3, 30, 21, 21, 31, 127756, tzinfo=tzutc()), 'definition': "[{'ModelInference': {'models': [{'name': 'embedder-o', 'version': '1c16d21d-fe4c-4081-98bc-65fefa465f7d', 'sha': 'd083fd87fa84451904f71ab8b9adfa88580beb92ca77c046800f79780a20b7e4'}]}}, {'ModelInference': {'models': [{'name': 'smodel-o', 'version': '8d311ba3-c336-48d3-99cd-85d95baa6f19', 'sha': '3473ea8700fbf1a1a8bfb112554a0dde8aab36758030dcde94a9357a83fd5650'}]}}]"}>

imdb_pipeline.remove_step(1)
{'name': 'imdb-pipeline', 'create_time': datetime.datetime(2022, 3, 30, 21, 21, 31, 127756, tzinfo=tzutc()), 'definition': "[{'ModelInference': {'models': [{'name': 'embedder-o', 'version': '1c16d21d-fe4c-4081-98bc-65fefa465f7d', 'sha': 'd083fd87fa84451904f71ab8b9adfa88580beb92ca77c046800f79780a20b7e4'}]}}]"}
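Because steps are addressed by array index, adding, replacing, and removing steps behave like operations on a list. The toy class below is a hypothetical stand-in that tracks only step order, to make the zero-based indexing concrete. It is not the Wallaroo Pipeline class, and the model name embedder-v2 is invented for the illustration.

```python
class ToyPipeline:
    """Hypothetical stand-in that only tracks step order."""

    def __init__(self):
        self.steps = []

    def add_model_step(self, model):
        # New steps are appended to the end of the step list.
        self.steps.append(model)
        return self

    def replace_with_model_step(self, index, model):
        # Index 0 is the first step.
        self.steps[index] = model
        return self

    def remove_step(self, index):
        # Later steps shift down by one after a removal.
        del self.steps[index]
        return self


toy = ToyPipeline().add_model_step("embedder-o").add_model_step("smodel-o")
toy.replace_with_model_step(0, "embedder-v2")
toy.remove_step(1)
print(toy.steps)
```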


Manage Pipeline Deployment Configuration

Pipelines can be deployed to allocate more or fewer resources through the DeploymentConfig object that sets the pipeline’s autoscaling parameters.

Autoscaling allows the user to define how many engines a pipeline starts with, the minimum number of engines a pipeline can use, and the maximum number of engines a pipeline can scale up to. The pipeline scales up and down based on the average CPU utilization across the engines in a given pipeline as the user’s workload increases and decreases.

This is performed through the wallaroo.DeploymentConfigBuilder, which produces a wallaroo.deployment_config.DeploymentConfig object. The DeploymentConfig is then applied to a Wallaroo pipeline when it is deployed.

The following parameters are used for auto-scaling:

| Parameter | Default Value | Purpose |
| --- | --- | --- |
| replica_count | 1 | Sets the initial number of engines for the pipeline. |
| replica_autoscale_min_max | 1 | Sets the minimum number of engines and the maximum number of engines. The maximum parameter must be set by the user. |
| autoscale_cpu_utilization | 50 | An integer representing the average CPU utilization. The default value is 50, which represents an average of 50% CPU utilization for the engines in a pipeline. |

The DeploymentConfig is then built with the build method.

In the following example, a DeploymentConfig is created and saved to the variable ccfraudDeployConfig. It sets the minimum number of engines to 2, the maximum to 5, and the average CPU utilization to 60%. It is then applied to the deployment of the pipeline ccfraudPipeline by specifying its deployment_config parameter.

ccfraudDeployConfig = (wallaroo.DeploymentConfigBuilder()
.replica_count(1)
.replica_autoscale_min_max(minimum=2, maximum=5)
.autoscale_cpu_utilization(60)
.build())

ccfraudPipeline.deploy(deployment_config=ccfraudDeployConfig)
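This guide does not spell out the exact scaling rule. As a rough mental model only, the sketch below assumes a proportional rule of the kind used by the Kubernetes Horizontal Pod Autoscaler: the engine count is scaled by the ratio of observed to target CPU utilization, then clamped to the configured minimum and maximum. The function desired_replicas is hypothetical and is not part of the Wallaroo SDK.

```python
import math


def desired_replicas(current, avg_cpu, target_cpu=50, minimum=1, maximum=1):
    """Assumed proportional rule: scale with load, then clamp to the bounds."""
    proposed = math.ceil(current * avg_cpu / target_cpu)
    return max(minimum, min(maximum, proposed))


# Settings from the example above: minimum 2, maximum 5, target 60% CPU.
print(desired_replicas(2, 90, target_cpu=60, minimum=2, maximum=5))   # scales up
print(desired_replicas(3, 20, target_cpu=60, minimum=2, maximum=5))   # held at the minimum
print(desired_replicas(4, 120, target_cpu=60, minimum=2, maximum=5))  # capped at the maximum
```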


Deploy a Pipeline

When a pipeline step is added or removed, the pipeline must be deployed through the pipeline deploy() command. This allocates resources to the pipeline from the Kubernetes environment and makes it available to receive data and perform inferences. This process typically takes 45 seconds.

Once complete, the pipeline status() command will show 'status':'Running'.

Pipeline deployments can be modified to enable auto-scaling, which allows pipelines to allocate more or fewer resources based on need, by setting the pipeline’s deployment_config optional parameter. If this optional parameter is not passed, then the deployment will defer to default values. For more information, see Manage Pipeline Deployment Configuration.

In the following example, the pipeline imdb-pipeline that contains two steps will be deployed with default deployment configuration:

imdb_pipeline.status

<bound method Pipeline.status of {'name': 'imdb-pipeline', 'create_time': datetime.datetime(2022, 3, 30, 21, 21, 31, 127756, tzinfo=tzutc()), 'definition': "[{'ModelInference': {'models': [{'name': 'embedder-o', 'version': '1c16d21d-fe4c-4081-98bc-65fefa465f7d', 'sha': 'd083fd87fa84451904f71ab8b9adfa88580beb92ca77c046800f79780a20b7e4'}]}}, {'ModelInference': {'models': [{'name': 'smodel-o', 'version': '8d311ba3-c336-48d3-99cd-85d95baa6f19', 'sha': '3473ea8700fbf1a1a8bfb112554a0dde8aab36758030dcde94a9357a83fd5650'}]}}]"}>

imdb_pipeline.deploy()
Waiting for deployment - this will take up to 45s ...... ok

imdb_pipeline.status()

{'status': 'Running',
'details': None,
'engines': [{'ip': '10.12.1.65',
'name': 'engine-778b65459-f9mt5',
'status': 'Running',
'reason': None,
'pipeline_statuses': {'pipelines': [{'id': 'imdb-pipeline',
'status': 'Running'}]},
'model_statuses': {'models': [{'name': 'embedder-o',
'version': '1c16d21d-fe4c-4081-98bc-65fefa465f7d',
'sha': 'd083fd87fa84451904f71ab8b9adfa88580beb92ca77c046800f79780a20b7e4',
'status': 'Running'},
{'name': 'smodel-o',
'version': '8d311ba3-c336-48d3-99cd-85d95baa6f19',
'sha': '3473ea8700fbf1a1a8bfb112554a0dde8aab36758030dcde94a9357a83fd5650',
'status': 'Running'}]}}],
'engine_lbs': [{'ip': '10.12.1.66',
'name': 'engine-lb-85846c64f8-ggg2t',
'status': 'Running',
'reason': None}]}
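When scripting against a deployment, it can help to reduce the status output to a single yes/no answer. The sketch below does this for a dictionary shaped like the status output above. The helper pipeline_is_running is hypothetical, and the sample dictionary is a trimmed copy of the fields shown in this guide.

```python
def pipeline_is_running(status):
    """True when the pipeline and every reported engine and model are Running."""
    if status.get("status") != "Running":
        return False
    for engine in status.get("engines") or []:
        if engine.get("status") != "Running":
            return False
        models = (engine.get("model_statuses") or {}).get("models") or []
        if any(model.get("status") != "Running" for model in models):
            return False
    return True


sample = {
    "status": "Running",
    "engines": [{
        "status": "Running",
        "model_statuses": {"models": [
            {"name": "embedder-o", "status": "Running"},
            {"name": "smodel-o", "status": "Running"},
        ]},
    }],
}
print(pipeline_is_running(sample))
print(pipeline_is_running({"status": "Pipeline imdb-pipeline is not deployed"}))
```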


Troubleshooting Pipeline Deployment

If you deploy more pipelines than your environment can handle, or if you deploy more pipelines than your license allows, you may see an error like the following:


LimitError: You have reached a license limit in your Wallaroo instance. In order to add additional resources, you can remove some of your existing resources. If you have any questions contact us at community@wallaroo.ai: MAX_PIPELINES_LIMIT_EXCEEDED


Undeploy any unnecessary pipelines through either the SDK or the Wallaroo Pipeline Dashboard, then attempt to redeploy the pipeline in question.

Undeploy a Pipeline

When a pipeline is not currently needed, it can be undeployed and its resources returned to the Kubernetes environment. To undeploy a pipeline, use the pipeline undeploy() command.

In this example, the aloha_pipeline will be undeployed:

aloha_pipeline.undeploy()

{'name': 'aloha-test-demo', 'create_time': datetime.datetime(2022, 3, 29, 20, 34, 3, 960957, tzinfo=tzutc()), 'definition': "[{'ModelInference': {'models': [{'name': 'aloha-2', 'version': 'a8e8abdc-c22f-416c-a13c-5fe162357430', 'sha': 'fd998cd5e4964bbbb4f8d29d245a8ac67df81b62be767afbceb96a03d1a01520'}]}}]"}


Get Pipeline Status

The pipeline status() command shows the current status, models, and other information on a pipeline.

The following example shows the pipeline imdb_pipeline status before and after it is deployed:

imdb_pipeline.status

<bound method Pipeline.status of {'name': 'imdb-pipeline', 'create_time': datetime.datetime(2022, 3, 30, 21, 21, 31, 127756, tzinfo=tzutc()), 'definition': "[{'ModelInference': {'models': [{'name': 'embedder-o', 'version': '1c16d21d-fe4c-4081-98bc-65fefa465f7d', 'sha': 'd083fd87fa84451904f71ab8b9adfa88580beb92ca77c046800f79780a20b7e4'}]}}, {'ModelInference': {'models': [{'name': 'smodel-o', 'version': '8d311ba3-c336-48d3-99cd-85d95baa6f19', 'sha': '3473ea8700fbf1a1a8bfb112554a0dde8aab36758030dcde94a9357a83fd5650'}]}}]"}>

imdb_pipeline.deploy()
Waiting for deployment - this will take up to 45s ...... ok

imdb_pipeline.status()

{'status': 'Running',
'details': None,
'engines': [{'ip': '10.12.1.65',
'name': 'engine-778b65459-f9mt5',
'status': 'Running',
'reason': None,
'pipeline_statuses': {'pipelines': [{'id': 'imdb-pipeline',
'status': 'Running'}]},
'model_statuses': {'models': [{'name': 'embedder-o',
'version': '1c16d21d-fe4c-4081-98bc-65fefa465f7d',
'sha': 'd083fd87fa84451904f71ab8b9adfa88580beb92ca77c046800f79780a20b7e4',
'status': 'Running'},
{'name': 'smodel-o',
'version': '8d311ba3-c336-48d3-99cd-85d95baa6f19',
'sha': '3473ea8700fbf1a1a8bfb112554a0dde8aab36758030dcde94a9357a83fd5650',
'status': 'Running'}]}}],
'engine_lbs': [{'ip': '10.12.1.66',
'name': 'engine-lb-85846c64f8-ggg2t',
'status': 'Running',
'reason': None}]}


Get Pipeline Logs

Pipelines have their own set of log files that can be retrieved and analyzed as needed with the pipeline.logs(limit=100) command. This command takes the following parameters:

| Parameter | Type | Description |
| --- | --- | --- |
| limit | Int | Limits how many log files to display. Defaults to 100. |

In a Python notebook, this typically displays only the last 3 entries to save space.

In this example, the last 50 logs for the pipeline ccfraud_pipeline are requested. Only one is shown for brevity.

ccfraud_pipeline.logs(limit=50)


Timestamp Output Input Anomalies
2022-23-Aug 16:44:56 [array([[0.00149742]])] [[1.0678324729342086, 0.21778102664937624, -1.7115145261843976, 0.6822857209662413, 1.0138553066742804, -0.43350000129006655, 0.7395859436561657, -0.28828395953577357, -0.44726268795990787, 0.5146124987725894, 0.3791316964287545, 0.5190619748123175, -0.4904593221655364, 1.1656456468728569, -0.9776307444180006, -0.6322198962519854, -0.6891477694494687, 0.17833178574255615, 0.1397992467197424, -0.35542206494183326, 0.4394217876939808, 1.4588397511627804, -0.3886829614721505, 0.4353492889350186, 1.7420053483337177, -0.4434654615252943, -0.15157478906219238, -0.26684517248765616, -1.454961775612449]] 0

Anomaly Testing

Anomaly detection allows organizations to set validation parameters. A validation is added to a pipeline to test data based on a specific expression. If the expression is returned as False, this is detected as an anomaly and added to the InferenceResult object’s check_failures array and the pipeline logs.

Anomaly detection consists of the following steps:

• Set a validation: Add a validation to a pipeline that, when returned False, adds an entry to the InferenceResult object’s check_failures attribute with the expression that caused the failure.
• Display anomalies: Anomalies detected through a Pipeline’s validation attribute are displayed either through the InferenceResult object’s check_failures attribute, or through the pipeline’s logs.

Set A Validation

Validations are added to a pipeline through the wallaroo.pipeline add_validation method. The following parameters are required:

| Parameter | Type | Description |
| --- | --- | --- |
| name | String | (Required) The name of the validation. |
| validation | wallaroo.checks.Expression | (Required) The validation expression. When the expression result is False, an entry is added to the InferenceResult object’s check_failures attribute. |

Validation expressions take the form of a comparison on a model output, expressed as a wallaroo.checks.Expression. For example, if the model housing_model is part of the pipeline steps, then a validation expression may be housing_model.outputs[0][0] < 100.0. If the output of the housing_model inference is less than 100, the validation is True and no action is taken. For any value of 100 or greater, the validation is False, which adds the anomaly to the InferenceResult object’s check_failures attribute.

Note that multiple validations can be created to detect multiple kinds of anomalies.

In the following example, a validation is added to the pipeline that passes for housing prices below 100 (representing $100 million) and triggers an anomaly for values at or above that level. When an inference is performed that triggers a validation failure, the results are displayed in the InferenceResult object’s check_failures attribute.

p = wl.build_pipeline('anomaly-housing-pipeline')
p = p.add_model_step(housing_model)
p = p.add_validation('price too high', housing_model.outputs[0][0] < 100.0)
pipeline = p.deploy()

test_input = {"dense_16_input":[[0.02675675, 0.0, 0.02677953, 0.0, 0.0010046, 0.00951931, 0.14795322, 0.0027145,  2, 0.98536841, 0.02988655, 0.04031725, 0.04298041]]}
response_trigger = pipeline.infer(test_input)
print("\n")
print(response_trigger)

[InferenceResult({'check_failures': [{'False': {'expr': 'anomaly-housing.outputs[0][0] < 100'}}],
'elapsed': 15110549,
'model_name': 'anomaly-housing',
'model_version': 'c3cf1577-6666-48d3-b85c-5d4a6e6567ea',
'original_data': {'dense_16_input': [[0.02675675,
0.0,
0.02677953,
0.0,
0.0010046,
0.00951931,
0.14795322,
0.0027145,
2,
0.98536841,
0.02988655,
0.04031725,
0.04298041]]},
'outputs': [{'Float': {'data': [350.46990966796875], 'dim': [1, 1], 'v': 1}}],
'pipeline_name': 'anomaly-housing-model',
'time': 1651257043312})]
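The validation behavior described in this section can be sketched in plain Python: each validation is a predicate on the model output, and every predicate that evaluates to False contributes an entry shaped like the check_failures items shown above. The function run_validations is hypothetical and only mimics the documented outcome. It does not reproduce how Wallaroo evaluates expressions internally.

```python
def run_validations(output_value, validations):
    """Return check_failures-style entries for every validation that is False."""
    failures = []
    for _name, predicate, expr in validations:
        if not predicate(output_value):
            failures.append({"False": {"expr": expr}})
    return failures


# One validation: (name, predicate, expression text used for reporting).
validations = [
    ("price too high", lambda value: value < 100.0,
     "housing_model.outputs[0][0] < 100"),
]

print(run_validations(42.0, validations))
print(run_validations(350.46990966796875, validations))
```

The first call returns an empty list because the value satisfies the expression. The second returns one entry because 350.47 is not below 100.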


Display Anomalies

Anomalies detected through a Pipeline’s validation attribute are displayed either through the InferenceResult object’s check_failures attribute, or through the pipeline’s logs.

To display an anomaly through the InferenceResult object, display the check_failures attribute.

In the following example, an InferenceResult where the validation failed displays the failure in the check_failures attribute:

test_input = {"dense_16_input":[[0.02675675, 0.0, 0.02677953, 0.0, 0.0010046, 0.00951931, 0.14795322, 0.0027145,  2, 0.98536841, 0.02988655, 0.04031725, 0.04298041]]}
response_trigger = pipeline.infer(test_input)
print("\n")
print(response_trigger)

[InferenceResult({'check_failures': [{'False': {'expr': 'anomaly-housing-model.outputs[0][0] < '
'100'}}],
'elapsed': 12196540,
'model_name': 'anomaly-housing-model',
'model_version': 'a3b1c29f-c827-4aad-817d-485de464d59b',
'original_data': {'dense_16_input': [[0.02675675,
0.0,
0.02677953,
0.0,
0.0010046,
0.00951931,
0.14795322,
0.0027145,
2,
0.98536841,
0.02988655,
0.04031725,
0.04298041]]},
'outputs': [{'Float': {'data': [350.46990966796875], 'dim': [1, 1], 'v': 1}}],
'pipeline_name': 'anomaly-housing-pipeline',
'shadow_data': {},
'time': 1667416852255})]


The other method is to use the pipeline.logs() method with the parameter valid=False, which isolates the logs where the validation returned False.

In this example, a set of logs where the validation returned as False will be displayed:

pipeline.logs(valid=False)

Timestamp Output Input Anomalies
2022-02-Nov 19:20:52 [array([[350.46990967]])] [[0.02675675, 0.0, 0.02677953, 0.0, 0.0010046, 0.00951931, 0.14795322, 0.0027145, 2, 0.98536841, 0.02988655, 0.04031725, 0.04298041]] 1

A/B Testing

A/B testing is a method that provides the ability to test competing ML models for performance, accuracy or other useful benchmarks. Different models are added to the same pipeline steps as follows:

• Control or Champion model: The model currently used for inferences.
• Challenger model(s): The model or set of models compared to the champion model.

A/B testing splits a portion of the inference requests between the champion model and the one or more challengers through the add_random_split method. This method splits the inferences submitted to the model through a randomly weighted step.

Each model receives inputs that are approximately proportional to the weight it is assigned. For example, with two models having weights 1 and 1, each will receive roughly equal amounts of inference inputs. If the weights were changed to 1 and 2, the models would receive roughly 33% and 67% respectively instead.

When choosing the model to use, a random number between 0.0 and 1.0 is generated. The weighted inputs are mapped to that range, and the random input is then used to select the model to use. For example, for the two-models equal-weight case, a random key of 0.4 would route to the first model, 0.6 would route to the second.
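The selection rule described above can be sketched as follows. The function pick_model is hypothetical: it normalizes the weights onto the range 0.0 to 1.0 and returns the model whose slice contains the key. How ties at a slice boundary are handled is an assumption, since the guide does not specify it.

```python
import random


def pick_model(weighted_models, key=None):
    """Map weights onto [0.0, 1.0) and return the model whose slice holds key."""
    if key is None:
        key = random.random()
    total = sum(weight for weight, _ in weighted_models)
    upper = 0.0
    for weight, model in weighted_models:
        upper += weight / total
        if key < upper:
            return model
    return weighted_models[-1][1]  # guard against rounding at the top edge


equal = [(1, "champion"), (1, "challenger")]
print(pick_model(equal, key=0.4))  # falls in the first half of the range
print(pick_model(equal, key=0.6))  # falls in the second half of the range
```

Passing a fixed key makes the choice repeatable, which mirrors the role this guide describes for the optional hash_key parameter.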

Add Random Split

A random split step can be added to a pipeline through the add_random_split method.

The following parameters are used when adding a random split step to a pipeline:

| Parameter | Type | Description |
| --- | --- | --- |
| champion_weight | Float | (Required) The weight for the champion model. |
| champion_model | Wallaroo.Model | (Required) The uploaded champion model. |
| challenger_weight | Float | (Required) The weight of the challenger model. |
| challenger_model | Wallaroo.Model | (Required) The uploaded challenger model. |
| hash_key | String | (Optional) A key used instead of a random number for model selection. This must be between 0.0 and 1.0. |

Note that multiple challenger models with different weights can be added as the random split step.

add_random_split([(champion_weight, champion_model), (challenger_weight, challenger_model),  (challenger_weight2, challenger_model2),...], hash_key)


In this example, a pipeline will be built with a 2:1 weighted ratio between the champion and a single challenger model.

pipeline = (wl.build_pipeline("randomsplitpipeline-demo")
.add_random_split([(2, control), (1, challenger)]))


The results for a series of single inferences are displayed to show the random weighted split between the two models in action:

results = []
results.append(experiment_pipeline.infer_from_file("data/data-1.json"))
results.append(experiment_pipeline.infer_from_file("data/data-1.json"))
results.append(experiment_pipeline.infer_from_file("data/data-1.json"))
results.append(experiment_pipeline.infer_from_file("data/data-1.json"))
results.append(experiment_pipeline.infer_from_file("data/data-1.json"))

for result in results:
    print(result[0].model())
    print(result[0].data())

('aloha-control', 'ff81f634-8fb4-4a62-b873-93b02eb86ab4')
[array([[0.00151959]]), array([[0.98291481]]), array([[0.01209957]]), array([[4.75912966e-05]]), array([[2.02893716e-05]]), array([[0.00031977]]), array([[0.01102928]]), array([[0.99756402]]), array([[0.01034162]]), array([[0.00803896]]), array([[0.01615506]]), array([[0.00623623]]), array([[0.00099858]]), array([[1.79337805e-26]]), array([[1.38899512e-27]])]

('aloha-control', 'ff81f634-8fb4-4a62-b873-93b02eb86ab4')
[array([[0.00151959]]), array([[0.98291481]]), array([[0.01209957]]), array([[4.75912966e-05]]), array([[2.02893716e-05]]), array([[0.00031977]]), array([[0.01102928]]), array([[0.99756402]]), array([[0.01034162]]), array([[0.00803896]]), array([[0.01615506]]), array([[0.00623623]]), array([[0.00099858]]), array([[1.79337805e-26]]), array([[1.38899512e-27]])]

('aloha-challenger', '87fdfe08-170e-4231-a0b9-543728d6fc57')
[array([[0.00151959]]), array([[0.98291481]]), array([[0.01209957]]), array([[4.75912966e-05]]), array([[2.02893716e-05]]), array([[0.00031977]]), array([[0.01102928]]), array([[0.99756402]]), array([[0.01034162]]), array([[0.00803896]]), array([[0.01615506]]), array([[0.00623623]]), array([[0.00099858]]), array([[1.79337805e-26]]), array([[1.38899512e-27]])]

('aloha-challenger', '87fdfe08-170e-4231-a0b9-543728d6fc57')
[array([[0.00151959]]), array([[0.98291481]]), array([[0.01209957]]), array([[4.75912966e-05]]), array([[2.02893716e-05]]), array([[0.00031977]]), array([[0.01102928]]), array([[0.99756402]]), array([[0.01034162]]), array([[0.00803896]]), array([[0.01615506]]), array([[0.00623623]]), array([[0.00099858]]), array([[1.79337805e-26]]), array([[1.38899512e-27]])]

('aloha-challenger', '87fdfe08-170e-4231-a0b9-543728d6fc57')
[array([[0.00151959]]), array([[0.98291481]]), array([[0.01209957]]), array([[4.75912966e-05]]), array([[2.02893716e-05]]), array([[0.00031977]]), array([[0.01102928]]), array([[0.99756402]]), array([[0.01034162]]), array([[0.00803896]]), array([[0.01615506]]), array([[0.00623623]]), array([[0.00099858]]), array([[1.79337805e-26]]), array([[1.38899512e-27]])]
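To see how requests were distributed, the model names returned with each result can be tallied. The sketch below counts the five (name, version) pairs printed above using the standard library. It works on copied values rather than live results.

```python
from collections import Counter

# Model (name, version) pairs copied from the five results shown above.
served = [
    ("aloha-control", "ff81f634-8fb4-4a62-b873-93b02eb86ab4"),
    ("aloha-control", "ff81f634-8fb4-4a62-b873-93b02eb86ab4"),
    ("aloha-challenger", "87fdfe08-170e-4231-a0b9-543728d6fc57"),
    ("aloha-challenger", "87fdfe08-170e-4231-a0b9-543728d6fc57"),
    ("aloha-challenger", "87fdfe08-170e-4231-a0b9-543728d6fc57"),
]

counts = Counter(name for name, _version in served)
for name, count in counts.items():
    print(f"{name}: {count} of {len(served)} ({count / len(served):.0%})")
```

With only five requests the observed split can differ noticeably from the configured weights. The proportions are only approximate and converge as more inferences are submitted.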


Replace With Random Split

If a pipeline already has steps as detailed in Add a Step to a Pipeline, a step can be replaced with a random split through the replace_with_random_split method.

The following parameters are used when replacing a pipeline step with a random split step:

| Parameter | Type | Description |
| --- | --- | --- |
| index | Integer | (Required) The pipeline step being replaced. |
| champion_weight | Float | (Required) The weight for the champion model. |
| champion_model | Wallaroo.Model | (Required) The uploaded champion model. |
| challenger_weight | Float | (Required) The weight of the challenger model. |
| challenger_model | Wallaroo.Model | (Required) The uploaded challenger model. |
| hash_key | String | (Optional) A key used instead of a random number for model selection. This must be between 0.0 and 1.0. |

Note that one or more challenger models can be added for the random split step:

replace_with_random_split(index, [(champion_weight, champion_model), (challenger_weight, challenger_model), (challenger_weight2, challenger_model2),...], hash_key)


A/B Testing Logs

Arrow Enabled A/B Testing Logs

For Arrow enabled environments, logs with the model names used are displayed with the Pipeline.logs method, located in the column out._model_split.

logs = experiment_pipeline.logs(limit=5)
display(logs.loc[:,['time', 'out._model_split', 'out.main']])

time out._model_split out.main
0 2023-03-03 19:08:35.653 [{"name":"aloha-control","version":"89389786-0c17-4214-938c-aa22dd28359f","sha":"fd998cd5e4964bbbb4f8d29d245a8ac67df81b62be767afbceb96a03d1a01520"}] [0.9999754]
1 2023-03-03 19:08:35.702 [{"name":"aloha-challenger","version":"3acd3835-be72-42c4-bcae-84368f416998","sha":"223d26869d24976942f53ccb40b432e8b7c39f9ffcf1f719f3929d7595bceaf3"}] [0.9999727]
2 2023-03-03 19:08:35.753 [{"name":"aloha-challenger","version":"3acd3835-be72-42c4-bcae-84368f416998","sha":"223d26869d24976942f53ccb40b432e8b7c39f9ffcf1f719f3929d7595bceaf3"}] [0.6606688]
3 2023-03-03 19:08:35.799 [{"name":"aloha-control","version":"89389786-0c17-4214-938c-aa22dd28359f","sha":"fd998cd5e4964bbbb4f8d29d245a8ac67df81b62be767afbceb96a03d1a01520"}] [0.9998954]
4 2023-03-03 19:08:35.846 [{"name":"aloha-control","version":"89389786-0c17-4214-938c-aa22dd28359f","sha":"fd998cd5e4964bbbb4f8d29d245a8ac67df81b62be767afbceb96a03d1a01520"}] [0.99999803]

Pipeline Shadow Deployments

Wallaroo provides a method of testing the same data against two different models or sets of models at the same time through shadow deployments, otherwise known as parallel deployments or A/B tests. This allows data to be submitted to a pipeline with inferences running on several different sets of models. Typically this is performed with a model that is known to provide accurate results - the champion - and a model or set of models, known as the challenger(s), that is being tested to see whether it provides more accurate or faster responses, depending on the criteria. Multiple challengers can be tested against a single champion to determine which is “better” based on the organization’s criteria.

As described in the Wallaroo blog post The What, Why, and How of Model A/B Testing:

In data science, A/B tests can also be used to choose between two models in production, by measuring which model performs better in the real world. In this formulation, the control is often an existing model that is currently in production, sometimes called the champion. The treatment is a new model being considered to replace the old one. This new model is sometimes called the challenger…. Keep in mind that in machine learning, the terms experiments and trials also often refer to the process of finding a training configuration that works best for the problem at hand (this is sometimes called hyperparameter optimization).

When a shadow deployment is created, only the inference from the champion is returned in the InferenceResult Object data, while the result data for the shadow deployments is stored in the InferenceResult Object shadow_data.

Create Shadow Deployment

Create a parallel or shadow deployment for a pipeline with the pipeline.add_shadow_deploy(champion, challengers[]) method, where the champion is a Wallaroo Model object, and challengers[] is one or more Wallaroo Model objects.

Each inference request sent to the pipeline is sent to all the models. The prediction from the champion is returned by the pipeline, while the predictions from the challengers are not part of the standard output, but are stored in the shadow_data attribute and in the logs for later comparison.

In this example, a shadow deployment is created with the champion versus two challenger models.

champion = wl.upload_model(champion_model_name, champion_model_file).configure()
model2 = wl.upload_model(shadow_model_01_name, shadow_model_01_file).configure()
model3 = wl.upload_model(shadow_model_02_name, shadow_model_02_file).configure()

pipeline.add_shadow_deploy(champion, [model2, model3])
pipeline.deploy()


name cc-shadow
created 2022-08-04 20:06:55.102203+00:00
last_updated 2022-08-04 20:37:28.785947+00:00
deployed True
tags
steps ccfraud-lstm

Arrow Enabled Shadow Deploy Outputs

For Arrow enabled Wallaroo instances the model outputs are listed by column. The output data is set by the term out, followed by the name of the model. For the default model, this is out.dense_1, while the shadow deployed models are in the format out_{model name}.variable, where {model name} is the name of the shadow deployed model.

sample_data_file = './smoke_test.df.json'
response = pipeline.infer_from_file(sample_data_file)

time in.tensor out.dense_1 check_failures out_ccfraudrf.variable out_ccfraudxgb.variable
0 2023-03-03 17:35:28.859 [1.0678324729, 0.2177810266, -1.7115145262, 0.682285721, 1.0138553067, -0.4335000013, 0.7395859437, -0.2882839595, -0.447262688, 0.5146124988, 0.3791316964, 0.5190619748, -0.4904593222, 1.1656456469, -0.9776307444, -0.6322198963, -0.6891477694, 0.1783317857, 0.1397992467, -0.3554220649, 0.4394217877, 1.4588397512, -0.3886829615, 0.4353492889, 1.7420053483, -0.4434654615, -0.1515747891, -0.2668451725, -1.4549617756] [0.0014974177] 0 [1.0] [0.0005066991]

Arrow Disabled Shadow Deploy Outputs

For Arrow disabled environments, the output is returned as a Wallaroo InferenceResult object.

When an inference is run through a pipeline with shadow deployments enabled, the champion model’s results are returned in the InferenceResult object’s data element, while the challengers’ results are returned in the InferenceResult object’s shadow_data element:

pipeline.infer_from_file(sample_data_file)

[InferenceResult({'check_failures': [],
'elapsed': 125102,
'model_name': 'ccfraud-lstm',
'model_version': '6b650c9c-e22f-4c50-97b2-7fce07f18607',
'original_data': {'tensor': [[1.0678324729342086,
0.21778102664937624,
-1.7115145261843976,
0.6822857209662413,
1.0138553066742804,
-0.43350000129006655,
0.7395859436561657,
-0.28828395953577357,
-0.44726268795990787,
0.5146124987725894,
0.3791316964287545,
0.5190619748123175,
-0.4904593221655364,
1.1656456468728569,
-0.9776307444180006,
-0.6322198962519854,
-0.6891477694494687,
0.17833178574255615,
0.1397992467197424,
-0.35542206494183326,
0.4394217876939808,
1.4588397511627804,
-0.3886829614721505,
0.4353492889350186,
1.7420053483337177,
-0.4434654615252943,
-0.15157478906219238,
-0.26684517248765616,
-1.454961775612449]]},
'outputs': [{'Float': {'data': [0.001497417688369751],
'dim': [1, 1],
'v': 1}}],
'pipeline_name': 'cc-shadow',
'shadow_data': {'ccfraud-rf': [{'Float': {'data': [1.0],
'dim': [1, 1],
'v': 1}}],
'ccfraud-xgb': [{'Float': {'data': [0.0005066990852355957],
'dim': [1, 1],
'v': 1}}]},
'time': 1659645473965})]
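
The structure printed above can be flattened into a simple side-by-side view of champion and challenger predictions. The sketch below works on a plain dictionary copied from that printed output; it is an assumption that the same keys are reachable this way, since the actual InferenceResult object may expose them through its own accessors instead. The summarize_shadow_result helper is a hypothetical name used only for illustration.

```python
def summarize_shadow_result(result):
    """Return (champion name, champion values, {challenger name: values})."""

    def flatten(outputs):
        # Each entry is keyed by its data type, e.g. {'Float': {'data': [...]}}.
        values = []
        for entry in outputs:
            for typed in entry.values():
                values.extend(typed["data"])
        return values

    champion_values = flatten(result["outputs"])
    challengers = {
        name: flatten(outputs)
        for name, outputs in result.get("shadow_data", {}).items()
    }
    return result["model_name"], champion_values, challengers


# Dictionary copied from the sample output above (input data omitted).
sample = {
    "model_name": "ccfraud-lstm",
    "outputs": [{"Float": {"data": [0.001497417688369751], "dim": [1, 1], "v": 1}}],
    "shadow_data": {
        "ccfraud-rf": [{"Float": {"data": [1.0], "dim": [1, 1], "v": 1}}],
        "ccfraud-xgb": [
            {"Float": {"data": [0.0005066990852355957], "dim": [1, 1], "v": 1}}
        ],
    },
}

name, champion_values, challengers = summarize_shadow_result(sample)
print(name, champion_values)
for challenger, values in challengers.items():
    print(challenger, values)
```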


Retrieve Shadow Deployment Logs

Arrow Enabled Shadow Deploy Logs

For Arrow enabled Wallaroo instances, the shadow deploy results are included in the output of the Pipeline.logs() method. Output columns are prefixed with the term out. For the champion (default) model, the column is out.dense_1; for each shadow deployed model, the column follows the format out_{model name}.variable, where {model name} is the name of the shadow deployed model.

logs = pipeline.logs()
display(logs)

time in.tensor out.dense_1 check_failures out_ccfraudrf.variable out_ccfraudxgb.variable
0 2023-03-03 17:35:28.859 [1.0678324729, 0.2177810266, -1.7115145262, 0.682285721, 1.0138553067, -0.4335000013, 0.7395859437, -0.2882839595, -0.447262688, 0.5146124988, 0.3791316964, 0.5190619748, -0.4904593222, 1.1656456469, -0.9776307444, -0.6322198963, -0.6891477694, 0.1783317857, 0.1397992467, -0.3554220649, 0.4394217877, 1.4588397512, -0.3886829615, 0.4353492889, 1.7420053483, -0.4434654615, -0.1515747891, -0.2668451725, -1.4549617756] [0.0014974177] 0 [1.0] [0.0005066991]

Arrow Disabled Shadow Deploy Logs

Inferences run against a pipeline that has a shadow deployment (also known as a parallel deployment) are not visible in the pipeline logs. To view the results from the challenger models of a shadow deployment, use the Pipeline.logs_shadow_deploy() method. The results are grouped by input, so the performance of multiple models can be compared on the same data.

In this example, the Shadow Deployment logs are retrieved after an inference.

logs = pipeline.logs_shadow_deploy()
logs


Input [[1.0678324729342086, 0.21778102664937624, -1.7115145261843976, 0.6822857209662413, 1.0138553066742804, -0.43350000129006655, 0.7395859436561657, -0.28828395953577357, -0.44726268795990787, 0.5146124987725894, 0.3791316964287545, 0.5190619748123175, -0.4904593221655364, 1.1656456468728569, -0.9776307444180006, -0.6322198962519854, -0.6891477694494687, 0.17833178574255615, 0.1397992467197424, -0.35542206494183326, 0.4394217876939808, 1.4588397511627804, -0.3886829614721505, 0.4353492889350186, 1.7420053483337177, -0.4434654615252943, -0.15157478906219238, -0.26684517248765616, -1.454961775612449]]

Model Type Model Name Output Timestamp Model Version Elapsed
Primary ccfraud-lstm [array([[0.00149742]])] 2022-08-04T20:37:53.965000 6b650c9c-e22f-4c50-97b2-7fce07f18607 125102
Challenger ccfraud-rf [{'Float': {'v': 1, 'dim': [1, 1], 'data': [1.0]}}]
Challenger ccfraud-xgb [{'Float': {'v': 1, 'dim': [1, 1], 'data': [0.0005066990852355957]}}]

Get Pipeline URL Endpoint

The Pipeline URL Endpoint, also called the Pipeline Deploy URL, is the address that data is submitted to for an inference. Retrieve it with the pipeline’s _deployment._url() method.

In this example, the pipeline URL endpoint for the pipeline ccfraud_pipeline is displayed:

ccfraud_pipeline._deployment._url()

'http://engine-lb.ccfraud-pipeline-1:29502/pipelines/ccfraud-pipeline'
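
Once the endpoint is known, data can be submitted to it over HTTP. The following is a minimal sketch using only the Python standard library. It rests on several assumptions that are not confirmed by this guide: that the endpoint accepts a POST request, that the body is JSON with a Content-Type of application/json, and that the payload uses the tensor key seen in the inference examples. The build_inference_request helper is a hypothetical name, and the two-value tensor is a shortened placeholder rather than a valid model input. The request is built but not sent, because the hostname in the example looks like an internal service name and may only resolve from inside the cluster.

```python
import json
import urllib.request


def build_inference_request(url, payload):
    """Build (but do not send) a POST request carrying a JSON payload."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},  # assumed content type
        method="POST",  # assumed HTTP method
    )


url = "http://engine-lb.ccfraud-pipeline-1:29502/pipelines/ccfraud-pipeline"
payload = {"tensor": [[1.0678324729, 0.2177810266]]}  # shortened placeholder input

request = build_inference_request(url, payload)
print(request.get_method(), request.full_url)

# To actually submit the request from a host that can reach the endpoint:
# with urllib.request.urlopen(request) as response:
#     print(response.read().decode("utf-8"))
```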