This tutorial can be downloaded as part of the Wallaroo Tutorials repository.
Python scripts can be deployed to Wallaroo as Python Models. These are treated like other models, and are typically used for pre- and post-processing steps around other models in a pipeline.

Models are added to Wallaroo pipelines as pipeline steps, with the data from the previous step submitted to the next one. Python steps require the entry method `wallaroo_json`. These methods should be structured to receive and send pandas DataFrames as the inputs and outputs. This allows inference requests to a Wallaroo pipeline to receive pandas DataFrames or Apache Arrow tables, and return the same for consistent results.

This tutorial will:

- Upload an ONNX house price model and a Python post-processing script to a Wallaroo workspace.
- Deploy them as Wallaroo pipeline steps, separately and together.
- Perform sample inferences and review the results and logs.
We’ll start with importing the libraries we need for the tutorial. The main libraries used are:

- `pyarrow`: Used for formatting the data.
- `pandas`: Used for pandas DataFrame tables.

import wallaroo
from wallaroo.object import EntityNotFoundError
from wallaroo.framework import Framework
from wallaroo.deployment_config import DeploymentConfigBuilder
import datetime
import pandas as pd
import pyarrow as pa
The next step is to connect to Wallaroo through the Wallaroo client. The Python library is included in the Wallaroo install and available through the Jupyter Hub interface provided with your Wallaroo environment.
This is accomplished using the `wallaroo.Client()` command, which provides a URL to grant the SDK permission to your specific Wallaroo environment. When displayed, enter the URL into a browser and confirm permissions. Store the connection into a variable that can be referenced later.

If logging into the Wallaroo instance through the internal JupyterHub service, use `wl = wallaroo.Client()`. For more information on Wallaroo Client settings, see the Client Connection guide.
# Login through local Wallaroo instance
wl = wallaroo.Client()
We’ll set the names of our workspace, pipeline, models, and files. Workspace names must be unique across the Wallaroo instance. One common approach is to append a few randomly generated characters to the workspace name to prevent collisions with other users’ workspaces; here we hard code the names so the tutorial functions in the same workspace each time it’s run.
workspace_name = 'python-demo'
pipeline_name = 'python-step-demo-pipeline'
onnx_model_name = 'house-price-sample'
onnx_model_file_name = './models/house_price_keras.onnx'
python_model_name = 'python-step'
python_model_file_name = './models/step.zip'
For our tutorial, we’ll create the workspace, set it as the current workspace, then build the pipeline we’ll add our models to.
workspace = wl.get_workspace(name=workspace_name, create_if_not_exist=True)
wl.set_current_workspace(workspace)
pipeline = wl.build_pipeline(pipeline_name)
pipeline
name | python-step-demo-pipeline |
---|---|
created | 2024-07-25 19:56:50.572311+00:00 |
last_updated | 2024-07-25 20:05:44.952816+00:00 |
deployed | (none) |
workspace_id | 30 |
workspace_name | python-demo |
arch | None |
accel | None |
tags | |
versions | 9738f3df-3cd4-474a-b402-b9dc0349cd68, b47d1d07-ab29-48aa-9931-bc1ad0c2c5bd |
steps | |
published | False |
We have two models we’ll be using:

- `./models/house_price_keras.onnx`: An ML model trained to forecast house prices based on its inputs. The forecast is stored in the column `dense_2`.
- `./models/step.py`: A Python script that accepts the data from the house price model and reformats the output. We’ll be using it as a post-processing step.

The Python step contains the method `wallaroo_json` as the entry point used by Wallaroo when deployed as a pipeline step. Our sample script has the following:
import pandas as pd

# take the DataFrame output of the house price model, and reformat the
# `dense_2` column as `output`
def wallaroo_json(data: pd.DataFrame):
    print(data)
    return [{"output": [data["dense_2"].to_list()[0][0]]}]
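Because the entry method is plain Python, we can sanity-check it locally before uploading; a minimal sketch using an illustrative value:

# build a one-row DataFrame shaped like the house price model's output,
# then run the post-processing function on it directly
sample = pd.DataFrame({"dense_2": [[12.886651]]})
print(wallaroo_json(sample))  # [{'output': [12.886651]}]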
As the comment describes, all this function does is take the DataFrame output of the house price model and return the first element of the list in column `dense_2` under the new column name `output`.
Both of these models will be uploaded to our current workspace using the method `upload_model(name, path, framework).configure(framework, input_schema, output_schema)`.

- For `./models/house_price_keras.onnx`, we specify the framework as `Framework.ONNX`. We do not need to specify the input and output schemas.
- For `./models/step.py`, we set the input and output schemas in the required `pyarrow.lib.Schema` format.

house_price_model = (wl.upload_model(onnx_model_name,
onnx_model_file_name,
framework=Framework.ONNX)
.configure('onnx',
tensor_fields=["tensor"]
)
)
With our models uploaded, we’ll perform different configurations of the pipeline steps.
First we’ll add just the house price model to the pipeline, deploy it, and submit a sample inference.
# used to restrict the resources needed for this demonstration
deployment_config = DeploymentConfigBuilder() \
.cpus(0.25).memory('1Gi') \
.build()
# clear the pipeline if this tutorial was run before
pipeline.undeploy()
pipeline.clear()
name | python-step-demo-pipeline |
---|---|
created | 2024-07-25 19:56:50.572311+00:00 |
last_updated | 2024-07-25 20:05:44.952816+00:00 |
deployed | (none) |
workspace_id | 30 |
workspace_name | python-demo |
arch | None |
accel | None |
tags | |
versions | 9738f3df-3cd4-474a-b402-b9dc0349cd68, b47d1d07-ab29-48aa-9931-bc1ad0c2c5bd |
steps | |
published | False |
pipeline.add_model_step(house_price_model).deploy(deployment_config=deployment_config)
pipeline.status()
{'status': 'Running',
'details': [],
'engines': [{'ip': '10.28.1.2',
'name': 'engine-554b7dfd75-4j4mj',
'status': 'Running',
'reason': None,
'details': [],
'pipeline_statuses': {'pipelines': [{'id': 'python-step-demo-pipeline',
'status': 'Running',
'version': '915af6d2-f164-4b67-a1e6-acc39b2facc4'}]},
'model_statuses': {'models': [{'name': 'house-price-sample',
'sha': '809c9f9a3016e5ab2190900d5fcfa476ee7411aa7a9ac5d4041d1cbe874cf8b9',
'status': 'Running',
'version': '53f2f613-c5a0-40f0-ae20-b852548c7878'}]}}],
'engine_lbs': [{'ip': '10.28.1.3',
'name': 'engine-lb-6b59985857-9t5xs',
'status': 'Running',
'reason': None,
'details': []}],
'sidekicks': []}
## sample inference data
data = pd.DataFrame.from_dict({"tensor": [[0.6878518042239091,
0.17607340208535074,
-0.8695140830357148,
0.34638762962802144,
-0.0916270832672289,
-0.022063226781124278,
-0.13969884765926363,
1.002792335666138,
-0.3067449033633758,
0.9272000630461978,
0.28326687982544635,
0.35935375728372815,
-0.682562654045523,
0.532642794275658,
-0.22705189652659302,
0.5743846356405602,
-0.18805086358065454
]]})
results = pipeline.infer(data)
display(results)
time | in.tensor | out.dense_2 | anomaly.count | |
---|---|---|---|---|
0 | 2024-07-25 20:08:05.694 | [0.6878518042, 0.1760734021, -0.869514083, 0.3... | [12.886651] | 0 |
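The forecast itself can be pulled straight out of the returned DataFrame; a minimal pandas sketch based on the single-row result above:

# the ONNX model returns its forecast as a one-element list in `out.dense_2`
predicted = results["out.dense_2"][0][0]
print(predicted)  # 12.886651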
Our inference results are in the `out.dense_2` column. We’ll clear the pipeline, then add just the Python post-processing step we created as the pipeline step. For our inference request, we’ll submit the output of the house price model; our result should be the first element of the array, returned in the `out.output` column.
input_schema = pa.schema([
pa.field('dense_2', pa.list_(pa.float32()))
])
output_schema = pa.schema([
pa.field('output', pa.list_(pa.float32()))
])
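Before uploading, we can sanity-check that a sample payload converts cleanly to the declared input schema; a minimal pyarrow sketch (the sample value is illustrative):

# confirm a sample record can be represented with the declared input schema;
# pyarrow raises an error if the conversion fails
check = pa.Table.from_pandas(pd.DataFrame({"dense_2": [[12.886651]]}),
                             schema=input_schema)
print(check.schema)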
step = (wl.upload_model(python_model_name,
python_model_file_name,
framework=Framework.PYTHON,
input_schema=input_schema,
output_schema=output_schema
)
)
Waiting for model loading - this will take up to 10.0min.
Model is pending loading to a container runtime..
Model is attempting loading to a container runtime..successful
Ready
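Since the Python model runs in its own containerized runtime (the "sidekick" container visible in the deployment status below), its resources can be sized separately if needed. A hedged sketch, assuming the `sidekick_cpus` and `sidekick_memory` builder methods available in recent Wallaroo SDK versions:

# optional: a deployment configuration that also sizes the Python step's
# containerized runtime; `step` is the Python model uploaded above
deployment_config = DeploymentConfigBuilder() \
    .cpus(0.25).memory('1Gi') \
    .sidekick_cpus(step, 0.25) \
    .sidekick_memory(step, '1Gi') \
    .build()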
pipeline.undeploy()
pipeline.clear()
pipeline.add_model_step(step)
pipeline.deploy(deployment_config=deployment_config)
pipeline.status()
{'status': 'Starting',
'details': [],
'engines': [{'ip': '10.28.1.8',
'name': 'engine-5d57d4849d-wbngz',
'status': 'Running',
'reason': None,
'details': [],
'pipeline_statuses': {'pipelines': [{'id': 'python-step-demo-pipeline',
'status': 'Running',
'version': 'ee274120-db36-4610-bbf7-8a35888c7e22'}]},
'model_statuses': {'models': [{'name': 'python-step',
'sha': 'a89a5a6172f085630b1f893d980b8390bdcc2fb081408905bacf8bc1c78f00d6',
'status': 'Running',
'version': 'a1597a4f-59e4-44e8-8904-8871feaa1176'}]}}],
'engine_lbs': [{'ip': '10.28.1.7',
'name': 'engine-lb-6b59985857-rsk8n',
'status': 'Running',
'reason': None,
'details': []}],
'sidekicks': [{'ip': '10.28.1.9',
'name': 'engine-sidekick-python-step-41-cb85dc7d6-266qh',
'status': 'Running',
'reason': None,
'details': ['containers with unready status: [engine-sidekick-python-step-41]',
'containers with unready status: [engine-sidekick-python-step-41]'],
'statuses': None}]}
data = pd.DataFrame.from_dict({"dense_2": [12.886651]})
display(data)
python_result = pipeline.infer(data)
display(python_result)
dense_2 | |
---|---|
0 | 12.886651 |
time | in.dense_2 | out.output | anomaly.count | |
---|---|---|---|---|
0 | 2024-07-25 20:15:27.361 | [] | [] | 0 |
Now we’ll do one last pipeline deployment, with two steps:

- The house price model, which outputs its forecast in the column `dense_2`.
- The Python post-processing step, which reformats that value into the column `output`.

inference_start = datetime.datetime.now()
pipeline.clear()
pipeline.add_model_step(house_price_model)
pipeline.add_model_step(step)
name | python-step-demo-pipeline |
---|---|
created | 2024-07-25 19:56:50.572311+00:00 |
last_updated | 2024-07-25 20:09:15.747015+00:00 |
deployed | True |
workspace_id | 30 |
workspace_name | python-demo |
arch | x86 |
accel | none |
tags | |
versions | ee274120-db36-4610-bbf7-8a35888c7e22, 915af6d2-f164-4b67-a1e6-acc39b2facc4, 9738f3df-3cd4-474a-b402-b9dc0349cd68, b47d1d07-ab29-48aa-9931-bc1ad0c2c5bd |
steps | python-step |
published | False |
pipeline.undeploy()
pipeline.deploy(deployment_config=deployment_config)
name | python-step-demo-pipeline |
---|---|
created | 2024-07-25 19:56:50.572311+00:00 |
last_updated | 2024-07-25 20:16:09.829183+00:00 |
deployed | True |
workspace_id | 30 |
workspace_name | python-demo |
arch | x86 |
accel | none |
tags | |
versions | 7fe9ffde-6c49-4599-a4ab-6b6f7a6aa5ac, ee274120-db36-4610-bbf7-8a35888c7e22, 915af6d2-f164-4b67-a1e6-acc39b2facc4, 9738f3df-3cd4-474a-b402-b9dc0349cd68, b47d1d07-ab29-48aa-9931-bc1ad0c2c5bd |
steps | house-price-sample |
published | False |
data = pd.DataFrame.from_dict({"tensor": [[0.6878518042239091,
0.17607340208535074,
-0.8695140830357148,
0.34638762962802144,
-0.0916270832672289,
-0.022063226781124278,
-0.13969884765926363,
1.002792335666138,
-0.3067449033633758,
0.9272000630461978,
0.28326687982544635,
0.35935375728372815,
-0.682562654045523,
0.532642794275658,
-0.22705189652659302,
0.5743846356405602,
-0.18805086358065454
]]})
results = pipeline.infer(data)
display(results)
time | in.tensor | out.output | anomaly.count | |
---|---|---|---|---|
0 | 2024-07-25 20:17:36.499 | [0.6878518042, 0.1760734021, -0.869514083, 0.3... | [12.886651] | 0 |
Because the pipeline step exported its data as a pandas DataFrame, the results are reflected in the pipeline logs. We’ll retrieve the log entries from our most recent inference.
inference_end = datetime.datetime.now()
pipeline.logs(start_datetime=inference_start, end_datetime=inference_end)
time | in.tensor | out.output | anomaly.count | |
---|---|---|---|---|
0 | 2024-04-09 16:03:53.059 | [0.6878518042, 0.1760734021, -0.869514083, 0.3... | [12.886651] | 0 |
With our tutorial complete, we’ll undeploy the pipeline and return the resources back to the cluster.

This process demonstrated how to structure a post-processing Python script as a Wallaroo pipeline step. The same pattern can be used for pre-processing, Python-based models, and other use cases; a pre-processing sketch follows at the end of this tutorial.
pipeline.undeploy()
name | python-step-demo-pipeline |
---|---|
created | 2024-07-25 19:56:50.572311+00:00 |
last_updated | 2024-07-25 20:16:09.829183+00:00 |
deployed | False |
workspace_id | 30 |
workspace_name | python-demo |
arch | x86 |
accel | none |
tags | |
versions | 7fe9ffde-6c49-4599-a4ab-6b6f7a6aa5ac, ee274120-db36-4610-bbf7-8a35888c7e22, 915af6d2-f164-4b67-a1e6-acc39b2facc4, 9738f3df-3cd4-474a-b402-b9dc0349cd68, b47d1d07-ab29-48aa-9931-bc1ad0c2c5bd |
steps | house-price-sample |
published | False |
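As a closing reference, a pre-processing step follows the same `wallaroo_json` pattern as the post-processing script above. The sketch below is hypothetical: the `raw_features` column name is illustrative only, and such a step would need its own input and output schemas when uploaded:

import pandas as pd

# hypothetical pre-processing step: pass a raw input column through as the
# `tensor` field the downstream ONNX house price model expects
def wallaroo_json(data: pd.DataFrame):
    return [{"tensor": row} for row in data["raw_features"].to_list()]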