Demand Curve Quick Start Guide
This tutorial and the assets can be downloaded as part of the Wallaroo Tutorials repository.
Demand Curve Pipeline Tutorial
This worksheet demonstrates a Wallaroo pipeline with data preprocessing, a model, and data postprocessing.
The model is a “demand curve” that predicts the expected number of units of a product that will be sold to a customer as a function of unit price and facts about the customer. Such models can be used for price optimization or sales volume forecasting. This is purely a “toy” demonstration, but is useful for detailing the process of working with models and pipelines.
Data preprocessing is required to create the features used by the model. Simple postprocessing prevents nonsensical estimates (e.g. negative units sold).
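To make the role of postprocessing concrete, here is a minimal plain-Python sketch. The linear curve, its coefficients, and the function names are hypothetical and are not taken from the tutorial's model files; only the idea of replacing negative estimates with zero comes from the text above.

```python
# Hypothetical linear demand curve, for illustration only. The real model's
# parameters are stored in demand_curve_v1.onnx and are not shown in this guide.
BASE_UNITS = 40.0
PRICE_COEF = -8.0            # units lost per unit of price (assumed value)
KNOWN_CUSTOMER_BONUS = 10.0  # extra units for a known customer (assumed value)


def toy_demand_curve(unit_price, cust_known):
    """Raw estimate of units sold; can go negative at high prices."""
    bonus = KNOWN_CUSTOMER_BONUS if cust_known else 0.0
    return BASE_UNITS + PRICE_COEF * unit_price + bonus


def clamp_to_zero(units):
    """Postprocessing: negative unit counts are replaced with zero."""
    return max(0.0, units)


raw_cheap = toy_demand_curve(2.0, True)    # 40 - 16 + 10 = 34.0
raw_pricey = toy_demand_curve(6.0, False)  # 40 - 48 = -8.0
print(clamp_to_zero(raw_cheap))   # 34.0
print(clamp_to_zero(raw_pricey))  # 0.0
```

A linear curve is used here only because it can produce a negative raw estimate, which is the case the postprocessing step exists to handle.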
Prerequisites
- An installed Wallaroo instance.
- The following Python libraries installed:
  - os
  - wallaroo: The Wallaroo SDK. Included with the Wallaroo JupyterHub service by default.
import json
import wallaroo
from wallaroo.object import EntityNotFoundError
import pandas
import numpy
import conversion
import pyarrow as pa
# used to display dataframe information without truncating
from IPython.display import display
import pandas as pd
pd.set_option('display.max_colwidth', None)
# ignoring warnings for demonstration
import warnings
warnings.filterwarnings('ignore')
Connect to the Wallaroo Instance
The first step is to connect to Wallaroo through the Wallaroo client. The Python library is included in the Wallaroo install and available through the JupyterHub interface provided with your Wallaroo environment.
This is accomplished using the wallaroo.Client() command, which provides a URL to grant the SDK permission to your specific Wallaroo environment. When displayed, enter the URL into a browser and confirm permissions. Store the connection into a variable that can be referenced later.
If logging into the Wallaroo instance through the internal JupyterHub service, use wl = wallaroo.Client(). For more information on Wallaroo Client settings, see the Client Connection guide.
# Login through local Wallaroo instance
wl = wallaroo.Client()
Now that the Wallaroo client has been initialized, we can create the workspace, call it demandcurveworkspace, and set it as our current workspace. We’ll also create our pipeline so it’s ready when we add our models to it.
We’ll set some variables and methods to create our workspace, pipelines and models. Note that as of the July 2022 release of Wallaroo, workspace names must be unique. Pipelines with the same name will be created as a new version when built.
workspace_name = 'demandcurveworkspace'
pipeline_name = 'demandcurvepipeline'
model_name = 'demandcurvemodel'
model_file_name = './models/demand_curve_v1.onnx'
workspace = wl.get_workspace(name=workspace_name, create_if_not_exist=True)
wl.set_current_workspace(workspace)
demandcurve_pipeline = wl.build_pipeline(pipeline_name)
demandcurve_pipeline
name | demandcurvepipeline |
---|---|
created | 2024-07-22 21:39:54.411345+00:00 |
last_updated | 2024-07-23 14:56:35.240924+00:00 |
deployed | False |
workspace_id | 161 |
workspace_name | demandcurveworkspace |
arch | x86 |
accel | none |
tags | |
versions | edd05f9d-523c-4be6-83d3-09dc4c1ec545, 8f1db6fb-58ba-4a83-9880-c3b4cb1de48f, 915021b6-e0cc-4d47-99c2-43d04cb83398 |
steps | demandcurvemodel |
published | False |
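The get-or-create behaviour requested with create_if_not_exist=True can also be written out by hand, which shows what it amounts to conceptually. This is a sketch under assumptions: it expects a client that exposes list_workspaces() and create_workspace(), and workspace objects that report their name through name(). The two stub classes are hypothetical stand-ins so the example runs without a Wallaroo instance.

```python
def get_or_create_workspace(client, name):
    """Return the workspace called `name`, creating it if none exists."""
    for ws in client.list_workspaces():
        if ws.name() == name:
            return ws
    return client.create_workspace(name)


class _StubWorkspace:
    """Hypothetical stand-in for a workspace object."""

    def __init__(self, name):
        self._name = name

    def name(self):
        return self._name


class _StubClient:
    """Hypothetical stand-in for a connected client."""

    def __init__(self):
        self._workspaces = []

    def list_workspaces(self):
        return list(self._workspaces)

    def create_workspace(self, name):
        ws = _StubWorkspace(name)
        self._workspaces.append(ws)
        return ws


stub_client = _StubClient()
first = get_or_create_workspace(stub_client, "demandcurveworkspace")
second = get_or_create_workspace(stub_client, "demandcurveworkspace")
print(first is second)  # the second call reuses the existing workspace
```

With a real client, the function would be called as get_or_create_workspace(wl, workspace_name).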
With our workspace established, we’ll upload three models:
- ./models/preprocess_dc_byop.zip: A preprocess model step that formats the data into a tensor that the model can run inference on.
- ./models/demand_curve_v1.onnx: Our demand_curve model. We’ll store the upload configuration into demand_curve_model.
- ./models/postprocess_dc_byop.zip: A postprocess model step that will zero out any negative values and return the output variable as “prediction”.
Note that the order we upload our models isn’t important - we’ll be establishing the actual process of moving data from one model to the next when we set up our pipeline.
demand_curve_model = wl.upload_model(model_name,
model_file_name,
framework=wallaroo.framework.Framework.ONNX
).configure(tensor_fields=["tensor"])
input_schema = pa.schema([
pa.field('Date', pa.string()),
pa.field('cust_known', pa.bool_()),
pa.field('StockCode', pa.int32()),
pa.field('UnitPrice', pa.float32()),
pa.field('UnitsSold', pa.int32())
])
output_schema = pa.schema([
pa.field('tensor', pa.list_(pa.float64()))
])
preprocess_step = wl.upload_model('curve-preprocess',
'./models/preprocess_dc_byop.zip',
framework=wallaroo.framework.Framework.CUSTOM,
input_schema=input_schema,
output_schema=output_schema)
Waiting for model loading - this will take up to 10.0min.
Model is pending loading to a container runtime..
Model is attempting loading to a container runtime...successful
Ready
input_schema = pa.schema([
pa.field('variable', pa.list_(pa.float64()))
])
output_schema = pa.schema([
pa.field('prediction', pa.list_(pa.float64()))
])
postprocess_step = wl.upload_model('curve-postprocess',
'./models/postprocess_dc_byop.zip',
framework=wallaroo.framework.Framework.CUSTOM,
input_schema=input_schema,
output_schema=output_schema)
Waiting for model loading - this will take up to 10.0min.
Model is pending loading to a container runtime..
Model is attempting loading to a container runtime..successful
Ready
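The input schema for the preprocess step declares one type per field. As a rough plain-Python analogue, the sketch below checks a record against the same five field names. It only imitates the idea of a schema and is not how pyarrow or Wallaroo validate data; the EXPECTED_TYPES mapping and the schema_problems helper are hypothetical.

```python
# Plain-Python mirror of the preprocess step's input fields (illustrative only).
EXPECTED_TYPES = {
    "Date": str,
    "cust_known": bool,
    "StockCode": int,
    "UnitPrice": float,
    "UnitsSold": int,
}


def schema_problems(row):
    """Return a list of human-readable mismatches between row and schema."""
    problems = []
    for field, expected in EXPECTED_TYPES.items():
        if field not in row:
            problems.append(f"missing field: {field}")
        elif type(row[field]) is not expected:
            actual = type(row[field]).__name__
            problems.append(f"{field}: expected {expected.__name__}, got {actual}")
    return problems


good_row = {"Date": "2010-12-01", "cust_known": False,
            "StockCode": 21928, "UnitPrice": 4.21, "UnitsSold": 1}
text_code_row = dict(good_row, StockCode="85099B")

print(schema_problems(good_row))       # []
print(schema_problems(text_code_row))  # ['StockCode: expected int, got str']
```

The exact type comparison (rather than isinstance) is deliberate: in Python, bool is a subclass of int, so isinstance would let True pass as an integer.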
With our models uploaded, we’re going to create our own pipeline and give it three steps:
- The preprocess step to put the data into a tensor format.
- Then we apply the data to our demand_curve_model.
- And finally, we prepare our data for output with the postprocess_step.
# now make a pipeline
demandcurve_pipeline.undeploy()
demandcurve_pipeline.clear()
demandcurve_pipeline.add_model_step(preprocess_step)
demandcurve_pipeline.add_model_step(demand_curve_model)
demandcurve_pipeline.add_model_step(postprocess_step)
ok
name | demandcurvepipeline |
---|---|
created | 2024-07-22 21:39:54.411345+00:00 |
last_updated | 2024-07-23 14:56:35.240924+00:00 |
deployed | False |
workspace_id | 161 |
workspace_name | demandcurveworkspace |
arch | x86 |
accel | none |
tags | |
versions | edd05f9d-523c-4be6-83d3-09dc4c1ec545, 8f1db6fb-58ba-4a83-9880-c3b4cb1de48f, 915021b6-e0cc-4d47-99c2-43d04cb83398 |
steps | demandcurvemodel |
published | False |
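The three steps hand data to one another by field name: the preprocess step emits tensor, the model reads tensor and emits variable, and the postprocess step reads variable and emits prediction. The sketch below imitates that handoff with plain functions. The feature choice and the coefficients are hypothetical, since the real logic lives inside the uploaded model files.

```python
def toy_preprocess(record):
    # Hypothetical features; the real ones are defined in preprocess_dc_byop.zip.
    return {"tensor": [float(record["UnitPrice"]), float(record["cust_known"])]}


def toy_model(record):
    # Hypothetical coefficients standing in for the ONNX model.
    price, known = record["tensor"]
    return {"variable": [10.0 - 4.0 * price + 2.0 * known]}


def toy_postprocess(record):
    # Zero out negative values and rename the output field.
    return {"prediction": [max(0.0, v) for v in record["variable"]]}


def run_steps(steps, record):
    """Feed the output of each step into the next, in order."""
    for step in steps:
        record = step(record)
    return record


steps = [toy_preprocess, toy_model, toy_postprocess]
result = run_steps(steps, {"UnitPrice": 4.0, "cust_known": False})
print(result)  # {'prediction': [0.0]} because 10 - 16 is negative
```

Upload order does not matter, but step order does: running toy_model first would fail here because the raw record has no tensor field.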
And with that, let’s deploy our model pipeline. Deployment usually takes about 45 seconds to finish.
deploy_config = wallaroo.DeploymentConfigBuilder().replica_count(1).cpus(1).memory("1Gi").build()
demandcurve_pipeline.deploy(deployment_config=deploy_config)
demandcurve_pipeline.status()
{'status': 'Running',
'details': [],
'engines': [{'ip': '10.4.2.40',
'name': 'engine-5744fbddb4-9bdlt',
'status': 'Running',
'reason': None,
'details': [],
'pipeline_statuses': {'pipelines': [{'id': 'demandcurvepipeline',
'status': 'Running',
'version': 'ae33d660-608f-4d7f-8183-78b0fc7cc440'}]},
'model_statuses': {'models': [{'name': 'demandcurvemodel',
'sha': '2820b42c9e778ae259918315f25afc8685ecab9967bad0a3d241e6191b414a0d',
'status': 'Running',
'version': '9b3a8640-e2b6-4de0-9096-eac369f87a29'},
{'name': 'curve-postprocess',
'sha': 'cf4cb335761e2bd5f238bd13f70e777f1fcc1eb31837ebea9cf3eb55c8faeb2f',
'status': 'Running',
'version': 'd8b4fff0-3603-48e3-88be-004cd0fc717b'},
{'name': 'curve-preprocess',
'sha': '22d6886115cbf667cfb7dbd394730625e09d0f8a8ff853848a7edebdb3c26f01',
'status': 'Running',
'version': '096e5c3b-21d6-4887-92f6-84acdbf8153d'}]}}],
'engine_lbs': [{'ip': '10.4.2.39',
'name': 'engine-lb-75cf576f7f-xd264',
'status': 'Running',
'reason': None,
'details': []}],
'sidekicks': [{'ip': '10.4.2.38',
'name': 'engine-sidekick-curve-postprocess-279-6799b7dfd7-js744',
'status': 'Running',
'reason': None,
'details': [],
'statuses': '\n'},
{'ip': '10.4.3.3',
'name': 'engine-sidekick-curve-preprocess-278-6b6cf8fdcf-pvnv8',
'status': 'Running',
'reason': None,
'details': [],
'statuses': '\n'}]}
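The status dictionary can be checked programmatically rather than by eye. The helper below is a sketch that assumes the dictionary has the shape shown in the output above (a top-level status plus engines, engine_lbs, and sidekicks lists); the function name not_running is hypothetical, and sample_status is a trimmed copy of that output.

```python
def not_running(status):
    """List the components whose status is anything other than 'Running'."""
    problems = []
    if status.get("status") != "Running":
        problems.append("pipeline")
    for group in ("engines", "engine_lbs", "sidekicks"):
        for component in status.get(group, []):
            if component.get("status") != "Running":
                problems.append(component.get("name", group))
    return problems


# Trimmed copy of the status output above.
sample_status = {
    "status": "Running",
    "engines": [{"name": "engine-5744fbddb4-9bdlt", "status": "Running"}],
    "engine_lbs": [{"name": "engine-lb-75cf576f7f-xd264", "status": "Running"}],
    "sidekicks": [
        {"name": "engine-sidekick-curve-postprocess-279-6799b7dfd7-js744",
         "status": "Running"},
        {"name": "engine-sidekick-curve-preprocess-278-6b6cf8fdcf-pvnv8",
         "status": "Running"},
    ],
}

print(not_running(sample_status))  # [] when every component is running
```

With a live pipeline, the call would be not_running(demandcurve_pipeline.status()).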
Everything is ready. Let’s feed our pipeline some data. We have some information prepared in the daily_purchases.csv spreadsheet. We’ll start with just one row to make sure that everything is working correctly.
# read in some purchase data
purchases = pandas.read_csv('daily_purchases.csv')
# start with a one-row data frame for testing
subsamp_raw = purchases.iloc[0:1,: ]
subsamp_raw
| | Date | cust_known | StockCode | UnitPrice | UnitsSold |
---|---|---|---|---|---|
0 | 2010-12-01 | False | 21928 | 4.21 | 1 |
result = demandcurve_pipeline.infer(subsamp_raw)
display(result)
| | time | in.Date | in.StockCode | in.UnitPrice | in.UnitsSold | in.cust_known | out.prediction | anomaly.count |
---|---|---|---|---|---|---|---|---|
0 | 2024-07-23 15:01:47.076 | 2010-12-01 | None | 4.21 | 1 | False | [6.680255142999893] | 0 |
We can see from the out.prediction field that the demand curve model predicts about 6.68 units sold for our sample row. We can isolate that value by selecting just the prediction output below.
# select the prediction cell directly by row label and column name
display(result.loc[0, 'out.prediction'])
[6.680255142999893]
Bulk Inference
The initial test went perfectly. Now let’s throw some more data into our pipeline. We’ll draw 10 random rows from our spreadsheet, run an inference on them, and then display the inputs and the results.
ix = numpy.random.choice(purchases.shape[0], size=10, replace=False)
converted = conversion.pandas_to_dict(purchases.iloc[ix,: ])
test_df = pd.DataFrame(converted['query'], columns=converted['colnames'])
display(test_df)
output = demandcurve_pipeline.infer(test_df)
display(output)
| | Date | cust_known | StockCode | UnitPrice | UnitsSold |
---|---|---|---|---|---|
0 | 2011-04-27 | True | 85099B | 2.08 | 64 |
1 | 2011-11-25 | True | 23581 | 2.08 | 50 |
2 | 2011-08-09 | True | 21931 | 2.08 | 30 |
3 | 2011-09-07 | False | 22411 | 4.13 | 36 |
4 | 2011-04-19 | True | 21033 | 2.08 | 10 |
5 | 2011-09-15 | False | 22663 | 4.13 | 3 |
6 | 2011-09-11 | True | 22386 | 2.08 | 17 |
7 | 2011-01-14 | True | 22411 | 1.95 | 30 |
8 | 2011-03-23 | False | 21931 | 4.13 | 5 |
9 | 2011-07-27 | False | 85099B | 4.13 | 1 |
| | time | in.Date | in.StockCode | in.UnitPrice | in.UnitsSold | in.cust_known | out.prediction | anomaly.count |
---|---|---|---|---|---|---|---|---|
0 | 2024-07-23 15:01:51.323 | 2011-04-27 | None | 2.08 | 64 | True | [33.125327529877765] | 0 |
1 | 2024-07-23 15:01:51.323 | 2011-11-25 | None | 2.08 | 50 | True | [33.125327529877765] | 0 |
2 | 2024-07-23 15:01:51.323 | 2011-08-09 | None | 2.08 | 30 | True | [33.125327529877765] | 0 |
3 | 2024-07-23 15:01:51.323 | 2011-09-07 | None | 4.13 | 36 | False | [6.7715457962084376] | 0 |
4 | 2024-07-23 15:01:51.323 | 2011-04-19 | None | 2.08 | 10 | True | [33.125327529877765] | 0 |
5 | 2024-07-23 15:01:51.323 | 2011-09-15 | None | 4.13 | 3 | False | [6.7715457962084376] | 0 |
6 | 2024-07-23 15:01:51.323 | 2011-09-11 | None | 2.08 | 17 | True | [33.125327529877765] | 0 |
7 | 2024-07-23 15:01:51.323 | 2011-01-14 | None | 1.95 | 30 | True | [40.57067616108544] | 0 |
8 | 2024-07-23 15:01:51.323 | 2011-03-23 | None | 4.13 | 5 | False | [6.7715457962084376] | 0 |
9 | 2024-07-23 15:01:51.323 | 2011-07-27 | None | 4.13 | 1 | False | [6.7715457962084376] | 0 |
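One way to read the bulk output is to group the predictions by their inputs. The sketch below uses the UnitPrice, cust_known, and prediction values copied from the table above and collects the distinct predictions for each price and customer combination. It is plain Python and does not call the pipeline.

```python
from collections import defaultdict

# (UnitPrice, cust_known, prediction) triples copied from the output above.
rows = [
    (2.08, True, 33.125327529877765),
    (2.08, True, 33.125327529877765),
    (2.08, True, 33.125327529877765),
    (4.13, False, 6.7715457962084376),
    (2.08, True, 33.125327529877765),
    (4.13, False, 6.7715457962084376),
    (2.08, True, 33.125327529877765),
    (1.95, True, 40.57067616108544),
    (4.13, False, 6.7715457962084376),
    (4.13, False, 6.7715457962084376),
]

# Collect the set of distinct predictions seen for each input combination.
predictions_by_input = defaultdict(set)
for price, known, prediction in rows:
    predictions_by_input[(price, known)].add(prediction)

for key in sorted(predictions_by_input):
    print(key, sorted(predictions_by_input[key]))
```

In this sample, each price and customer combination maps to a single prediction, and the lowest price has the highest predicted units, which is the shape one would expect from a demand curve.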
Undeploy the Pipeline
Once we’ve finished with our demand curve demo, we’ll undeploy the pipeline and give the resources back to our Kubernetes cluster.
demandcurve_pipeline.undeploy()
Waiting for undeployment - this will take up to 45s ................................... ok
name | demandcurvepipeline |
---|---|
created | 2024-07-22 21:39:54.411345+00:00 |
last_updated | 2024-07-23 14:58:00.732488+00:00 |
deployed | False |
workspace_id | 161 |
workspace_name | demandcurveworkspace |
arch | x86 |
accel | none |
tags | |
versions | ae33d660-608f-4d7f-8183-78b0fc7cc440, edd05f9d-523c-4be6-83d3-09dc4c1ec545, 8f1db6fb-58ba-4a83-9880-c3b4cb1de48f, 915021b6-e0cc-4d47-99c2-43d04cb83398 |
steps | demandcurvemodel |
published | False |
Thank you for being a part of this demonstration. If you have additional questions, please feel free to contact us at Wallaroo.