Python Model Shape Upload to Wallaroo with the Wallaroo SDK Tutorial
This tutorial can be downloaded as part of the Wallaroo Tutorials repository.
Python Model Upload to Wallaroo
Python scripts can be deployed to Wallaroo as Python Models. These are treated like other models, and are used for:
- ML Models: Models written entirely in Python script.
- Data Formatting: Typically preprocessing or postprocessing modules that shape incoming data into what an ML model expects, or that receive the data output by an ML model and change it into a form other processes accept.
Models are added to Wallaroo pipelines as pipeline steps, with the data from the previous step submitted to the next one. Python steps require the entry method wallaroo_json. These methods should be structured to receive and send pandas DataFrames as the inputs and outputs.
This allows inference requests to a Wallaroo pipeline to receive pandas DataFrames or Apache Arrow tables, and return the same for consistent results.
This tutorial will:
- Create a Wallaroo workspace and pipeline.
- Upload the sample Python model and ONNX model.
- Demonstrate the outputs of the ONNX model to an inference request.
- Demonstrate the functionality of the Python model in reshaping data after an inference request.
- Use both the ONNX model and the Python model together as pipeline steps to perform an inference request and export the data for use.
Prerequisites
- A Wallaroo instance, version 2023.2.1 or above.
Tutorial Steps
Import Libraries
We’ll start with importing the libraries we need for the tutorial. The main libraries used are:
- wallaroo: To connect with the Wallaroo instance and perform the MLOps commands.
- pyarrow: Used for formatting the data.
- pandas: Used for pandas DataFrame tables.
import wallaroo
from wallaroo.object import EntityNotFoundError
from wallaroo.framework import Framework
from wallaroo.deployment_config import DeploymentConfigBuilder
import datetime
import pandas as pd
import pyarrow as pa
Connect to the Wallaroo Instance through the User Interface
The next step is to connect to Wallaroo through the Wallaroo client. The Python library is included in the Wallaroo install and available through the Jupyter Hub interface provided with your Wallaroo environment.
This is accomplished using the wallaroo.Client() command, which provides a URL to grant the SDK permission to your specific Wallaroo environment. When displayed, enter the URL into a browser and confirm permissions. Store the connection into a variable that can be referenced later.
If logging into the Wallaroo instance through the internal JupyterHub service, use wl = wallaroo.Client(). For more information on Wallaroo Client settings, see the Client Connection guide.
# Login through local Wallaroo instance
wl = wallaroo.Client()
Set Variables
We’ll set the names of our workspace, pipeline, models, and files. Workspace names must be unique across the Wallaroo instance. To prevent collisions with other users’ workspaces, a randomly generated 4-character suffix can be added to the workspace name. This tutorial hard codes the workspace name so that it functions in the same workspace each time it’s run.
workspace_name = 'python-demo'
pipeline_name = 'python-step-demo-pipeline'
onnx_model_name = 'house-price-sample'
onnx_model_file_name = './models/house_price_keras.onnx'
python_model_name = 'python-step'
python_model_file_name = './models/step.zip'
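The random suffix mentioned above is not part of the hard-coded names. A minimal sketch of one way to generate it with the Python standard library follows; the helper random_suffix and the variable unique_workspace_name are hypothetical names used only for illustration.

```python
import random
import string

def random_suffix(length: int = 4) -> str:
    """Return `length` random lowercase ASCII letters (hypothetical helper)."""
    return "".join(random.choice(string.ascii_lowercase) for _ in range(length))

# Hypothetical variant of the workspace name with a random suffix appended.
suffix = random_suffix()
unique_workspace_name = f"python-demo-{suffix}"
print(unique_workspace_name)
```

A new suffix is produced on every run, so each run would target a different workspace. That is why the tutorial keeps the fixed name.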
Create a New Workspace
For our tutorial, we’ll create the workspace, set it as the current workspace, then build the pipeline we’ll add our models to.
Create New Workspace References
- Wallaroo SDK Essentials Guide: Workspace Management
- Wallaroo SDK Essentials Guide: Pipeline Management
workspace = wl.get_workspace(name=workspace_name, create_if_not_exist=True)
wl.set_current_workspace(workspace)
pipeline = wl.build_pipeline(pipeline_name)
pipeline
name | python-step-demo-pipeline |
---|---|
created | 2024-07-25 19:56:50.572311+00:00 |
last_updated | 2024-07-25 20:05:44.952816+00:00 |
deployed | (none) |
workspace_id | 30 |
workspace_name | python-demo |
arch | None |
accel | None |
tags | |
versions | 9738f3df-3cd4-474a-b402-b9dc0349cd68, b47d1d07-ab29-48aa-9931-bc1ad0c2c5bd |
steps | |
published | False |
Model Descriptions
We have two models we’ll be using.
- ./models/house_price_keras.onnx: An ML model trained to forecast house prices based on inputs. This forecast is stored in the column dense_2.
- ./models/step.py: A Python script that accepts the data from the house price model, and reformats the output. We’ll be using it as a post-processing step.
The Python step contains the method wallaroo_json, the entry point used by Wallaroo when the script is deployed as a pipeline step. Our sample script has the following:
import pandas as pd

# take a dataframe output of the house price model, and reformat the `dense_2`
# column as `output`
def wallaroo_json(data: pd.DataFrame):
    print(data)
    return [{"output": [data["dense_2"].to_list()[0][0]]}]
As the comments describe, the function takes the DataFrame output of the house price model and returns the first element of the list in column dense_2 under the field output.
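The function can be exercised locally with pandas before it is uploaded. The sketch below repeats the sample logic without the debugging print and calls it on two hypothetical stand-ins for the house price model output: one with a list in each dense_2 cell, which the indexing expects, and one with a bare float in each cell, which it does not.

```python
import pandas as pd

def wallaroo_json(data: pd.DataFrame):
    # Same logic as the sample script, without the debugging print.
    return [{"output": [data["dense_2"].to_list()[0][0]]}]

# Stand-in for the house price model output: one row, one list per cell.
model_output = pd.DataFrame({"dense_2": [[12.886651]]})
result = wallaroo_json(model_output)
print(result)

# A scalar-valued column does not fit: indexing into a float raises TypeError.
scalar_output = pd.DataFrame({"dense_2": [12.886651]})
try:
    wallaroo_json(scalar_output)
    scalar_failed = False
except TypeError:
    scalar_failed = True
print(scalar_failed)
```

This only checks the Python logic. How Wallaroo handles a mismatched input in a deployed pipeline is not covered here.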
Upload Models
Both of these models will be uploaded to our current workspace using the method upload_model(name, path, framework).configure(framework, input_schema, output_schema).
- For ./models/house_price_keras.onnx, we will specify it as Framework.ONNX. We do not need to specify the input and output schemas.
- For ./models/step.py, we will set the input and output schemas in the required pyarrow.lib.Schema format.
Upload Model References
- Wallaroo SDK Essentials Guide: Model Uploads and Registrations: ONNX
- Wallaroo SDK Essentials Guide: Model Uploads and Registrations: Python Models
house_price_model = (wl.upload_model(onnx_model_name,
                                     onnx_model_file_name,
                                     framework=Framework.ONNX)
                     .configure('onnx',
                                tensor_fields=["tensor"]
                                )
                     )
Pipeline Steps
With our models uploaded, we’ll perform different configurations of the pipeline steps.
First we’ll add just the house price model to the pipeline, deploy it, and submit a sample inference.
# used to restrict the resources needed for this demonstration
deployment_config = DeploymentConfigBuilder() \
    .cpus(0.25).memory('1Gi') \
    .build()
# clear the pipeline if this tutorial was run before
pipeline.undeploy()
pipeline.clear()
name | python-step-demo-pipeline |
---|---|
created | 2024-07-25 19:56:50.572311+00:00 |
last_updated | 2024-07-25 20:05:44.952816+00:00 |
deployed | (none) |
workspace_id | 30 |
workspace_name | python-demo |
arch | None |
accel | None |
tags | |
versions | 9738f3df-3cd4-474a-b402-b9dc0349cd68, b47d1d07-ab29-48aa-9931-bc1ad0c2c5bd |
steps | |
published | False |
pipeline.add_model_step(house_price_model).deploy(deployment_config=deployment_config)
pipeline.status()
{'status': 'Running',
'details': [],
'engines': [{'ip': '10.28.1.2',
'name': 'engine-554b7dfd75-4j4mj',
'status': 'Running',
'reason': None,
'details': [],
'pipeline_statuses': {'pipelines': [{'id': 'python-step-demo-pipeline',
'status': 'Running',
'version': '915af6d2-f164-4b67-a1e6-acc39b2facc4'}]},
'model_statuses': {'models': [{'name': 'house-price-sample',
'sha': '809c9f9a3016e5ab2190900d5fcfa476ee7411aa7a9ac5d4041d1cbe874cf8b9',
'status': 'Running',
'version': '53f2f613-c5a0-40f0-ae20-b852548c7878'}]}}],
'engine_lbs': [{'ip': '10.28.1.3',
'name': 'engine-lb-6b59985857-9t5xs',
'status': 'Running',
'reason': None,
'details': []}],
'sidekicks': []}
## sample inference data
data = pd.DataFrame.from_dict({"tensor": [[0.6878518042239091,
0.17607340208535074,
-0.8695140830357148,
0.34638762962802144,
-0.0916270832672289,
-0.022063226781124278,
-0.13969884765926363,
1.002792335666138,
-0.3067449033633758,
0.9272000630461978,
0.28326687982544635,
0.35935375728372815,
-0.682562654045523,
0.532642794275658,
-0.22705189652659302,
0.5743846356405602,
-0.18805086358065454
]]})
results = pipeline.infer(data)
display(results)
 | time | in.tensor | out.dense_2 | anomaly.count |
---|---|---|---|---|
0 | 2024-07-25 20:08:05.694 | [0.6878518042, 0.1760734021, -0.869514083, 0.3... | [12.886651] | 0 |
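Each cell of out.dense_2 holds a list, so reading the prediction takes two index steps. A minimal sketch follows, using a hand-built DataFrame as a stand-in for the object returned by pipeline.infer. The values are copied from the output above, with the input tensor truncated to three elements.

```python
import pandas as pd

# Stand-in for the inference result shown above (input tensor truncated).
results = pd.DataFrame({
    "in.tensor": [[0.6878518042, 0.1760734021, -0.869514083]],
    "out.dense_2": [[12.886651]],
    "anomaly.count": [0],
})

# Row 0 of out.dense_2 is a list; element 0 of that list is the forecast.
prediction = results.loc[0, "out.dense_2"][0]
print(prediction)
```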
Inference with Pipeline Step
Our inference result was returned in the out.dense_2 column. We’ll clear the pipeline, then add only the Python postprocessing step we’ve created as the pipeline step. For our inference request, we’ll submit just the output of the house price model. Our result should be the first element in the array, returned in the out.output column.
input_schema = pa.schema([
    pa.field('dense_2', pa.list_(pa.float32()))
])
output_schema = pa.schema([
    pa.field('output', pa.list_(pa.float32()))
])
step = (wl.upload_model(python_model_name,
                        python_model_file_name,
                        framework=Framework.PYTHON,
                        input_schema=input_schema,
                        output_schema=output_schema
                        )
        )
Waiting for model loading - this will take up to 10.0min.
Model is pending loading to a container runtime..
Model is attempting loading to a container runtime..successful
Ready
pipeline.undeploy()
pipeline.clear()
pipeline.add_model_step(step)
pipeline.deploy(deployment_config=deployment_config)
pipeline.status()
{'status': 'Starting',
'details': [],
'engines': [{'ip': '10.28.1.8',
'name': 'engine-5d57d4849d-wbngz',
'status': 'Running',
'reason': None,
'details': [],
'pipeline_statuses': {'pipelines': [{'id': 'python-step-demo-pipeline',
'status': 'Running',
'version': 'ee274120-db36-4610-bbf7-8a35888c7e22'}]},
'model_statuses': {'models': [{'name': 'python-step',
'sha': 'a89a5a6172f085630b1f893d980b8390bdcc2fb081408905bacf8bc1c78f00d6',
'status': 'Running',
'version': 'a1597a4f-59e4-44e8-8904-8871feaa1176'}]}}],
'engine_lbs': [{'ip': '10.28.1.7',
'name': 'engine-lb-6b59985857-rsk8n',
'status': 'Running',
'reason': None,
'details': []}],
'sidekicks': [{'ip': '10.28.1.9',
'name': 'engine-sidekick-python-step-41-cb85dc7d6-266qh',
'status': 'Running',
'reason': None,
'details': ['containers with unready status: [engine-sidekick-python-step-41]',
'containers with unready status: [engine-sidekick-python-step-41]'],
'statuses': None}]}
data = pd.DataFrame.from_dict({"dense_2": [12.886651]})
display(data)
python_result = pipeline.infer(data)
display(python_result)
 | dense_2 |
---|---|
0 | 12.886651 |
 | time | in.dense_2 | out.output | anomaly.count |
---|---|---|---|---|
0 | 2024-07-25 20:15:27.361 | [] | [] | 0 |
Putting Both Models Together
Now we’ll do one last pipeline deployment with 2 steps:
- First the house price model that outputs the inference result into dense_2.
- Second the python step so it will accept the output of the house price model, and reshape it into output.
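The data flow between the two steps can be sketched locally, outside Wallaroo. In the example below, fake_house_price_model is a hypothetical stand-in that returns a fixed prediction instead of running the ONNX model, and the loop only imitates how the output of one step becomes the input of the next.

```python
import pandas as pd

def fake_house_price_model(data: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical stand-in for the ONNX model: a fixed prediction per row."""
    return pd.DataFrame({"dense_2": [[12.886651] for _ in range(len(data))]})

def wallaroo_json(data: pd.DataFrame):
    # Same logic as the sample Python step, without the debugging print.
    return [{"output": [data["dense_2"].to_list()[0][0]]}]

# Steps run in order; each receives the previous step's output.
steps = [fake_house_price_model, wallaroo_json]

payload = pd.DataFrame({"tensor": [[0.68, 0.17, -0.86]]})  # truncated sample input
for step_fn in steps:
    payload = step_fn(payload)

print(payload)
```

The order of the list matters. With the steps reversed, the Python step would receive a tensor column and fail to find dense_2.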
inference_start = datetime.datetime.now()
pipeline.clear()
pipeline.add_model_step(house_price_model)
pipeline.add_model_step(step)
name | python-step-demo-pipeline |
---|---|
created | 2024-07-25 19:56:50.572311+00:00 |
last_updated | 2024-07-25 20:09:15.747015+00:00 |
deployed | True |
workspace_id | 30 |
workspace_name | python-demo |
arch | x86 |
accel | none |
tags | |
versions | ee274120-db36-4610-bbf7-8a35888c7e22, 915af6d2-f164-4b67-a1e6-acc39b2facc4, 9738f3df-3cd4-474a-b402-b9dc0349cd68, b47d1d07-ab29-48aa-9931-bc1ad0c2c5bd |
steps | python-step |
published | False |
pipeline.undeploy()
pipeline.deploy(deployment_config=deployment_config)
name | python-step-demo-pipeline |
---|---|
created | 2024-07-25 19:56:50.572311+00:00 |
last_updated | 2024-07-25 20:16:09.829183+00:00 |
deployed | True |
workspace_id | 30 |
workspace_name | python-demo |
arch | x86 |
accel | none |
tags | |
versions | 7fe9ffde-6c49-4599-a4ab-6b6f7a6aa5ac, ee274120-db36-4610-bbf7-8a35888c7e22, 915af6d2-f164-4b67-a1e6-acc39b2facc4, 9738f3df-3cd4-474a-b402-b9dc0349cd68, b47d1d07-ab29-48aa-9931-bc1ad0c2c5bd |
steps | house-price-sample |
published | False |
data = pd.DataFrame.from_dict({"tensor": [[0.6878518042239091,
0.17607340208535074,
-0.8695140830357148,
0.34638762962802144,
-0.0916270832672289,
-0.022063226781124278,
-0.13969884765926363,
1.002792335666138,
-0.3067449033633758,
0.9272000630461978,
0.28326687982544635,
0.35935375728372815,
-0.682562654045523,
0.532642794275658,
-0.22705189652659302,
0.5743846356405602,
-0.18805086358065454
]]})
results = pipeline.infer(data)
display(results)
 | time | in.tensor | out.output | anomaly.count |
---|---|---|---|---|
0 | 2024-07-25 20:17:36.499 | [0.6878518042, 0.1760734021, -0.869514083, 0.3... | [12.886651] | 0 |
Pipeline Logs
As the data was exported by the pipeline step as a pandas DataFrame, it will be reflected in the pipeline logs. We’ll retrieve the most recent log from our most recent inference.
inference_end = datetime.datetime.now()
pipeline.logs(start_datetime=inference_start, end_datetime=inference_end)
 | time | in.tensor | out.output | anomaly.count |
---|---|---|---|---|
0 | 2024-04-09 16:03:53.059 | [0.6878518042, 0.1760734021, -0.869514083, 0.3... | [12.886651] | 0 |
Undeploy the Pipeline
With our tutorial complete, we’ll undeploy the pipeline and return the resources back to the cluster.
This process demonstrated how to structure a postprocessing Python script as a Wallaroo pipeline step. The same approach can be used for preprocessing or postprocessing, Python-based models, and other use cases.
pipeline.undeploy()
name | python-step-demo-pipeline |
---|---|
created | 2024-07-25 19:56:50.572311+00:00 |
last_updated | 2024-07-25 20:16:09.829183+00:00 |
deployed | False |
workspace_id | 30 |
workspace_name | python-demo |
arch | x86 |
accel | none |
tags | |
versions | 7fe9ffde-6c49-4599-a4ab-6b6f7a6aa5ac, ee274120-db36-4610-bbf7-8a35888c7e22, 915af6d2-f164-4b67-a1e6-acc39b2facc4, 9738f3df-3cd4-474a-b402-b9dc0349cd68, b47d1d07-ab29-48aa-9931-bc1ad0c2c5bd |
steps | house-price-sample |
published | False |