Wallaroo SDK Essentials Guide: Pipeline Management
Pipelines are the method of submitting data and processing that data through models. Each pipeline can have one or more steps that submit the data from the previous step to the next one. Information can be submitted to a pipeline as a file or through the pipeline’s URL.
A pipeline’s metrics can be viewed through the Wallaroo Dashboard Pipeline Details and Metrics page.
Pipeline Naming Requirements
Pipeline names map onto Kubernetes objects and must be DNS compliant. Pipeline names may contain only ASCII alphanumeric characters and dashes (-); periods (.) and underscores (_) are not allowed.
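Because pipeline names become Kubernetes object names, it can help to validate a name client-side before calling the SDK. The helper below is a hypothetical sketch, not part of the Wallaroo SDK, and assumes the stricter Kubernetes DNS-1123 label rules (lowercase only, no leading or trailing dash, at most 63 characters):

```python
import re

# Kubernetes DNS-1123 label: lowercase alphanumerics and dashes,
# must start and end with an alphanumeric character, max 63 chars.
_DNS_LABEL = re.compile(r'^[a-z0-9]([a-z0-9-]{0,61}[a-z0-9])?$')

def is_valid_pipeline_name(name: str) -> bool:
    """Return True if name is safe to use as a DNS-compliant pipeline name."""
    return bool(_DNS_LABEL.match(name))
```

For example, `is_valid_pipeline_name("imdb-pipeline")` passes, while `"imdb_pipeline"` and `"my.pipeline"` fail the check.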
Create a Pipeline
New pipelines are created in the current workspace.
NOTICE
Pipeline names are not forced to be unique. You can have 50 pipelines all named `my-pipeline`, which can cause confusion in determining which one to use.
It is recommended that organizations agree on a naming convention and select an existing pipeline to use rather than creating a new one each time.
To create a new pipeline, use the Wallaroo Client `build_pipeline("{Pipeline Name}")` command.
The following example creates a new pipeline `imdb-pipeline` through a Wallaroo Client connection `wl`:
imdb_pipeline = wl.build_pipeline("imdb-pipeline")
imdb_pipeline.status()
{'status': 'Pipeline imdb-pipeline is not deployed'}
List All Pipelines
The Wallaroo Client method `list_pipelines()` lists all pipelines in a Wallaroo instance.
The following example lists all pipelines in the `wl` Wallaroo Client connection:
wl.list_pipelines()
[{'name': 'ccfraud-pipeline', 'create_time': datetime.datetime(2022, 4, 12, 17, 55, 41, 944976, tzinfo=tzutc()), 'definition': '[]'}]
Select an Existing Pipeline
Rather than creating a new pipeline each time, an existing pipeline can be selected by using the `list_pipelines()` command and assigning one of the array members to a variable.
The following example sets the pipeline `ccfraud-pipeline` to the variable `current_pipeline`:
wl.list_pipelines()
[{'name': 'ccfraud-pipeline', 'create_time': datetime.datetime(2022, 4, 12, 17, 55, 41, 944976, tzinfo=tzutc()), 'definition': '[]'}]
current_pipeline = wl.list_pipelines()[0]
current_pipeline.status()
{'status': 'Running',
'details': None,
'engines': [{'ip': '10.244.5.4',
'name': 'engine-7fcc7df596-hvlxb',
'status': 'Running',
'reason': None,
'pipeline_statuses': {'pipelines': [{'id': 'ccfraud-pipeline',
'status': 'Running'}]},
'model_statuses': {'models': [{'name': 'ccfraud-model',
'version': '4624e8a8-1414-4408-8b40-e03da4b5cb68',
'sha': 'bc85ce596945f876256f41515c7501c399fd97ebcb9ab3dd41bf03f8937b4507',
'status': 'Running'}]}}],
'engine_lbs': [{'ip': '10.244.1.24',
'name': 'engine-lb-85846c64f8-mtq9p',
'status': 'Running',
'reason': None}]}
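Selecting by a fixed array index is fragile when the list of pipelines changes. A small hypothetical helper (the `find_pipeline` name is an assumption, not an SDK method) picks the first entry whose name matches, working with either pipeline objects that expose `name()` or plain dict entries like those shown above:

```python
def find_pipeline(pipelines, name):
    """Return the first pipeline whose name matches, or None.

    `pipelines` is the list returned by wl.list_pipelines(); each entry
    is assumed to expose its name via a name() method or a 'name' key.
    """
    for p in pipelines:
        p_name = p.name() if callable(getattr(p, "name", None)) else p["name"]
        if p_name == name:
            return p
    return None
```

Usage would then be `current_pipeline = find_pipeline(wl.list_pipelines(), "ccfraud-pipeline")`.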
Pipeline Steps
Once a pipeline has been created, or during its creation process, a pipeline step can be added. A pipeline step refers to the model that performs an inference on the data submitted to it. Each time a step is added, it is appended to the pipeline’s `models` array.
Pipeline steps are not saved until the pipeline is deployed. Until then, they are stored in local memory as a potential pipeline configuration.
Add a Step to a Pipeline
A pipeline step is added through the pipeline `add_model_step({Model})` command.
In the following example, two models uploaded to the workspace are added as pipeline steps:
imdb_pipeline.add_model_step(embedder)
imdb_pipeline.add_model_step(smodel)
imdb_pipeline.status()
{'name': 'imdb-pipeline', 'create_time': datetime.datetime(2022, 3, 30, 21, 21, 31, 127756, tzinfo=tzutc()), 'definition': "[{'ModelInference': {'models': [{'name': 'embedder-o', 'version': '1c16d21d-fe4c-4081-98bc-65fefa465f7d', 'sha': 'd083fd87fa84451904f71ab8b9adfa88580beb92ca77c046800f79780a20b7e4'}]}}, {'ModelInference': {'models': [{'name': 'smodel-o', 'version': '8d311ba3-c336-48d3-99cd-85d95baa6f19', 'sha': '3473ea8700fbf1a1a8bfb112554a0dde8aab36758030dcde94a9357a83fd5650'}]}}]"}
Replace a Pipeline Step
The model specified in a pipeline step can be replaced with the pipeline method `replace_with_model_step(index, model)`.
IMPORTANT NOTE
Pipeline steps can be replaced while a pipeline is deployed. This allows organizations to keep pipelines deployed in a production environment and hot-swap models for new versions without impacting performance or incurring inferencing downtime.
The following parameters are used for replacing a pipeline step:
| Parameter | Default Value | Purpose |
|---|---|---|
| index | null | The pipeline step to be replaced. Pipeline steps follow array numbering, where the first step is 0, etc. |
| model | null | The new model to be used in the pipeline step. |
In the following example, a deployed pipeline will have its initial model step replaced with a new one. The pipeline status is displayed after deployment and after the swap to show that the model has been replaced from `ccfraudoriginal` to `ccfraudreplacement`, each with its own version.
pipeline.deploy()
pipeline.status()
{'status': 'Running',
'details': [],
'engines': [{'ip': '10.244.2.145',
'name': 'engine-75bfd7dc9d-7p9qk',
'status': 'Running',
'reason': None,
'details': [],
'pipeline_statuses': {'pipelines': [{'id': 'hotswappipeline',
'status': 'Running'}]},
'model_statuses': {'models': [{'name': 'ccfraudoriginal',
'version': '3a03dc94-716e-46bb-84c8-91bc99ceb2c3',
'sha': 'bc85ce596945f876256f41515c7501c399fd97ebcb9ab3dd41bf03f8937b4507',
'status': 'Running'}]}}],
'engine_lbs': [{'ip': '10.244.2.144',
'name': 'engine-lb-55dcdff64c-vf74s',
'status': 'Running',
'reason': None,
'details': []}],
'sidekicks': []}
pipeline.replace_with_model_step(0, replacement_model).deploy()
pipeline.status()
{'status': 'Running',
'details': [],
'engines': [{'ip': '10.244.2.153',
'name': 'engine-96486c95d-zfchr',
'status': 'Running',
'reason': None,
'details': [],
'pipeline_statuses': {'pipelines': [{'id': 'hotswappipeline',
'status': 'Running'}]},
'model_statuses': {'models': [{'name': 'ccfraudreplacement',
'version': '714efd19-5c83-42a8-aece-24b4ba530925',
'sha': 'bc85ce596945f876256f41515c7501c399fd97ebcb9ab3dd41bf03f8937b4507',
'status': 'Running'}]}}],
'engine_lbs': [{'ip': '10.244.2.154',
'name': 'engine-lb-55dcdff64c-9np9k',
'status': 'Running',
'reason': None,
'details': []}],
'sidekicks': []}
Pre and Post Processing Steps
A pipeline step can be more than a model: pre processing and post processing steps can also be added. For example, the Demand Curve Tutorial has both pre and post processing steps added to the pipeline. The preprocessing step uses the following code:
import numpy
import pandas
import json

# add interaction terms for the model
def actual_preprocess(pdata):
    pd = pdata.copy()
    # convert boolean cust_known to 0/1
    pd.cust_known = numpy.where(pd.cust_known, 1, 0)
    # interact UnitPrice and cust_known
    pd['UnitPriceXcust_known'] = pd.UnitPrice * pd.cust_known
    return pd.loc[:, ['UnitPrice', 'cust_known', 'UnitPriceXcust_known']]

# If the data is a json string, call this wrapper instead
# Expected input: a dictionary with fields 'colnames' and 'query'
def wallaroo_json(data):
    obj = json.loads(data)
    pdata = pandas.DataFrame(obj['query'], columns=obj['colnames'])
    pprocessed = actual_preprocess(pdata)
    # return a dictionary, with the fields the model expects
    return {
        'tensor_fields': ['model_input'],
        'model_input': pprocessed.to_numpy().tolist()
    }
It is added as a Python module by uploading it as a model:
# load the preprocess module
module_pre = wl.upload_model("preprocess", "./preprocess.py").configure('python')
And then added to the pipeline as a step:
# now make a pipeline
demandcurve_pipeline = (wl.build_pipeline("demand-curve-pipeline")
                        .add_model_step(module_pre)
                        .add_model_step(demand_curve_model)
                        .add_model_step(module_post))
Remove a Pipeline Step
To remove a step from the pipeline, use the Pipeline `remove_step(index)` command, where `index` is the array index of the pipeline’s steps.
In the following example, the pipeline `imdb_pipeline` will have the step with the model `smodel-o` removed.
imdb_pipeline.status
<bound method Pipeline.status of {'name': 'imdb-pipeline', 'create_time': datetime.datetime(2022, 3, 30, 21, 21, 31, 127756, tzinfo=tzutc()), 'definition': "[{'ModelInference': {'models': [{'name': 'embedder-o', 'version': '1c16d21d-fe4c-4081-98bc-65fefa465f7d', 'sha': 'd083fd87fa84451904f71ab8b9adfa88580beb92ca77c046800f79780a20b7e4'}]}}, {'ModelInference': {'models': [{'name': 'smodel-o', 'version': '8d311ba3-c336-48d3-99cd-85d95baa6f19', 'sha': '3473ea8700fbf1a1a8bfb112554a0dde8aab36758030dcde94a9357a83fd5650'}]}}]"}>
imdb_pipeline.remove_step(1)
{'name': 'imdb-pipeline', 'create_time': datetime.datetime(2022, 3, 30, 21, 21, 31, 127756, tzinfo=tzutc()), 'definition': "[{'ModelInference': {'models': [{'name': 'embedder-o', 'version': '1c16d21d-fe4c-4081-98bc-65fefa465f7d', 'sha': 'd083fd87fa84451904f71ab8b9adfa88580beb92ca77c046800f79780a20b7e4'}]}}]"}
Clear All Pipeline Steps
The Pipeline `clear()` method removes all pipeline steps from a pipeline. Note that pipeline steps are not saved until the pipeline is deployed.
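Until deployment, the add, replace, remove, and clear operations described above only mutate an in-memory array of steps. The following stand-in class is a toy sketch (not the real SDK `Pipeline`) that illustrates those array semantics, including the chainable return values shown in the examples:

```python
class StepSketch:
    """Toy stand-in illustrating how pipeline steps behave as an array."""

    def __init__(self):
        self.steps = []

    def add_model_step(self, model):
        self.steps.append(model)       # appended to the end of the array
        return self

    def replace_with_model_step(self, index, model):
        self.steps[index] = model      # swapped in place; order preserved
        return self

    def remove_step(self, index):
        self.steps.pop(index)          # later steps shift down by one
        return self

    def clear(self):
        self.steps = []                # drop every step
        return self
```

For instance, adding two steps, replacing step 0, and then removing step 1 leaves a single-step array, mirroring the SDK examples above.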
Manage Pipeline Deployment Configuration
For full details on pipeline deployment configurations, see Wallaroo SDK Essentials Guide: Pipeline Deployment Configuration.
Deploy a Pipeline
When a pipeline step is added or removed, the pipeline must be deployed through the pipeline `deploy()` command. This allocates resources to the pipeline from the Kubernetes environment and makes it available for submitting information to perform inferences. This process typically takes 45 seconds.
Once complete, the pipeline `status()` command will show `'status':'Running'`.
Pipeline deployments can be modified to enable auto-scaling, allowing pipelines to allocate more or fewer resources based on need, by setting the pipeline’s `deployment_config` optional parameter. If this optional parameter is not passed, the deployment will defer to default values. For more information, see Manage Pipeline Deployment Configuration.
In the following example, the pipeline `imdb-pipeline`, which contains two steps, will be deployed with the default deployment configuration:
imdb_pipeline.status
<bound method Pipeline.status of {'name': 'imdb-pipeline', 'create_time': datetime.datetime(2022, 3, 30, 21, 21, 31, 127756, tzinfo=tzutc()), 'definition': "[{'ModelInference': {'models': [{'name': 'embedder-o', 'version': '1c16d21d-fe4c-4081-98bc-65fefa465f7d', 'sha': 'd083fd87fa84451904f71ab8b9adfa88580beb92ca77c046800f79780a20b7e4'}]}}, {'ModelInference': {'models': [{'name': 'smodel-o', 'version': '8d311ba3-c336-48d3-99cd-85d95baa6f19', 'sha': '3473ea8700fbf1a1a8bfb112554a0dde8aab36758030dcde94a9357a83fd5650'}]}}]"}>
imdb_pipeline.deploy()
Waiting for deployment - this will take up to 45s ...... ok
imdb_pipeline.status()
{'status': 'Running',
'details': None,
'engines': [{'ip': '10.12.1.65',
'name': 'engine-778b65459-f9mt5',
'status': 'Running',
'reason': None,
'pipeline_statuses': {'pipelines': [{'id': 'imdb-pipeline',
'status': 'Running'}]},
'model_statuses': {'models': [{'name': 'embedder-o',
'version': '1c16d21d-fe4c-4081-98bc-65fefa465f7d',
'sha': 'd083fd87fa84451904f71ab8b9adfa88580beb92ca77c046800f79780a20b7e4',
'status': 'Running'},
{'name': 'smodel-o',
'version': '8d311ba3-c336-48d3-99cd-85d95baa6f19',
'sha': '3473ea8700fbf1a1a8bfb112554a0dde8aab36758030dcde94a9357a83fd5650',
'status': 'Running'}]}}],
'engine_lbs': [{'ip': '10.12.1.66',
'name': 'engine-lb-85846c64f8-ggg2t',
'status': 'Running',
'reason': None}]}
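Since deployment is asynchronous and can take up to 45 seconds, scripts sometimes poll `status()` until it reports Running before submitting inferences. The helper below is a minimal sketch (the polling loop itself is not an SDK feature) that assumes only the `status()` dictionary shape shown above:

```python
import time

def wait_until_running(pipeline, timeout=45, interval=1, sleep=time.sleep):
    """Poll pipeline.status() until 'status' is 'Running' or timeout elapses.

    Returns True once Running is observed, False if the timeout passes first.
    The sleep function is injectable to make the helper easy to test.
    """
    waited = 0
    while waited <= timeout:
        if pipeline.status().get('status') == 'Running':
            return True
        sleep(interval)
        waited += interval
    return False
```

A typical call after `imdb_pipeline.deploy()` would be `wait_until_running(imdb_pipeline)`.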
Troubleshooting Pipeline Deployment
If you deploy more pipelines than your environment can handle, or if you deploy more pipelines than your license allows, you may see an error like the following:
LimitError: You have reached a license limit in your Wallaroo instance. In order to add additional resources, you can remove some of your existing resources. If you have any questions contact us at community@wallaroo.ai: MAX_PIPELINES_LIMIT_EXCEEDED
Undeploy any unnecessary pipelines either through the SDK or through the Wallaroo Pipeline Dashboard, then attempt to redeploy the pipeline in question again.
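One way to free resources in bulk is to undeploy every pipeline in the instance before redeploying the one you need. This is a hypothetical helper, assuming only the `list_pipelines()` client method and the per-pipeline `undeploy()` method documented in this guide:

```python
def undeploy_all(client):
    """Undeploy every pipeline returned by client.list_pipelines().

    Returns the number of pipelines successfully undeployed. Errors on
    individual pipelines (e.g. already undeployed) are skipped, not raised.
    """
    count = 0
    for pipeline in client.list_pipelines():
        try:
            pipeline.undeploy()
            count += 1
        except Exception:
            pass  # already undeployed or not deployable; move on
    return count
```

Use with care: this undeploys every pipeline in the instance, not just the unneeded ones.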
Undeploy a Pipeline
When a pipeline is not currently needed, it can be undeployed and its resources returned to the Kubernetes environment. To undeploy a pipeline, use the pipeline `undeploy()` command.
In this example, the `aloha_pipeline` will be undeployed:
aloha_pipeline.undeploy()
{'name': 'aloha-test-demo', 'create_time': datetime.datetime(2022, 3, 29, 20, 34, 3, 960957, tzinfo=tzutc()), 'definition': "[{'ModelInference': {'models': [{'name': 'aloha-2', 'version': 'a8e8abdc-c22f-416c-a13c-5fe162357430', 'sha': 'fd998cd5e4964bbbb4f8d29d245a8ac67df81b62be767afbceb96a03d1a01520'}]}}]"}
Get Pipeline Status
The pipeline `status()` command shows the current status, models, and other information on a pipeline.
The following example shows the pipeline `imdb_pipeline` status before and after it is deployed:
imdb_pipeline.status
<bound method Pipeline.status of {'name': 'imdb-pipeline', 'create_time': datetime.datetime(2022, 3, 30, 21, 21, 31, 127756, tzinfo=tzutc()), 'definition': "[{'ModelInference': {'models': [{'name': 'embedder-o', 'version': '1c16d21d-fe4c-4081-98bc-65fefa465f7d', 'sha': 'd083fd87fa84451904f71ab8b9adfa88580beb92ca77c046800f79780a20b7e4'}]}}, {'ModelInference': {'models': [{'name': 'smodel-o', 'version': '8d311ba3-c336-48d3-99cd-85d95baa6f19', 'sha': '3473ea8700fbf1a1a8bfb112554a0dde8aab36758030dcde94a9357a83fd5650'}]}}]"}>
imdb_pipeline.deploy()
Waiting for deployment - this will take up to 45s ...... ok
imdb_pipeline.status()
{'status': 'Running',
'details': None,
'engines': [{'ip': '10.12.1.65',
'name': 'engine-778b65459-f9mt5',
'status': 'Running',
'reason': None,
'pipeline_statuses': {'pipelines': [{'id': 'imdb-pipeline',
'status': 'Running'}]},
'model_statuses': {'models': [{'name': 'embedder-o',
'version': '1c16d21d-fe4c-4081-98bc-65fefa465f7d',
'sha': 'd083fd87fa84451904f71ab8b9adfa88580beb92ca77c046800f79780a20b7e4',
'status': 'Running'},
{'name': 'smodel-o',
'version': '8d311ba3-c336-48d3-99cd-85d95baa6f19',
'sha': '3473ea8700fbf1a1a8bfb112554a0dde8aab36758030dcde94a9357a83fd5650',
'status': 'Running'}]}}],
'engine_lbs': [{'ip': '10.12.1.66',
'name': 'engine-lb-85846c64f8-ggg2t',
'status': 'Running',
'reason': None}]}
Anomaly Testing
Anomaly detection allows organizations to set validation parameters. A validation is added to a pipeline to test data based on a specific expression. If the expression returns `False`, this is detected as an anomaly and added to the InferenceResult object’s `check_failures` array and the pipeline logs.
Anomaly detection consists of the following steps:
- Set a validation: Add a validation to a pipeline that, when it returns `False`, adds an entry to the `InferenceResult` object’s `check_failures` attribute with the expression that caused the failure.
- Display anomalies: Anomalies detected through a pipeline’s `validation` attribute are displayed either through the `InferenceResult` object’s `check_failures` attribute or through the pipeline’s logs.
Set A Validation
Validations are added to a pipeline through the `wallaroo.pipeline` `add_validation` method. The following parameters are required:
| Parameter | Type | Description |
|---|---|---|
| name | String (Required) | The name of the validation. |
| validation | wallaroo.checks.Expression (Required) | The validation expression that adds a result to the InferenceResult object’s check_failures attribute when the expression result is False. The validation checks the expression against both the data value and the data type. |
Validation expressions take the format `value Expression`, with the expression being in the form of a `wallaroo.checks.Expression`. For example, if the model `housing_model` is part of the pipeline steps, a validation expression may be `housing_model.outputs[0][0] < 100.0`: if the output of the `housing_model` inference is less than 100, the validation is `True` and no action is taken. For any value over 100, the validation is `False`, which triggers adding the anomaly to the `InferenceResult` object’s `check_failures` attribute.
IMPORTANT NOTE
Validations test both the expression value and the data type. For example, `100` is considered an integer data type, while `100.0` is considered a float data type.
If the data type is an integer, and the value the expression is testing against is a float, the validation check will always be triggered. Verify that the data type is properly set in the validation expression to ensure correct validation check results.
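The integer/float distinction can be illustrated outside the SDK. The toy function below is not the Wallaroo Expression API; it mimics a `value < threshold` validation that also compares data types, so any type mismatch fails the check regardless of the value, mirroring the behavior described above:

```python
def passes_validation(value, threshold):
    """Toy 'value < threshold' validation that also checks the data type.

    Returns True (no anomaly) only when the types match and the value is
    below the threshold; a type mismatch always fails, as described above.
    """
    if type(value) is not type(threshold):
        return False  # int vs float mismatch always triggers the failure
    return value < threshold
```

So `passes_validation(50.0, 100.0)` passes, `passes_validation(150.0, 100.0)` fails on the value, and `passes_validation(50, 100.0)` fails purely on the type mismatch.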
Note that multiple validations can be created to allow for the detection of multiple anomalies.
In the following example, a validation is added to the pipeline that passes for housing prices below 100 (representing $100 million) and triggers an anomaly for values above that level. When an inference triggers a validation failure, the results are displayed in the `InferenceResult` object’s `check_failures` attribute.
p = wl.build_pipeline('anomaly-housing-pipeline')
p = p.add_model_step(housing_model)
p = p.add_validation('price too high', housing_model.outputs[0][0] < 100.0)
pipeline