Demand Curve Quick Start Guide

The Demand Curve Quick Start Guide demonstrates how to use Wallaroo to chart a demand curve based on submitted data. This worksheet builds a Wallaroo pipeline with three stages: data preprocessing, a model, and data postprocessing.

The model is a “demand curve” that predicts the expected number of units of a product that will be sold to a customer as a function of unit price and facts about the customer. Such models can be used for price optimization or sales volume forecasting.
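
For intuition, a common functional form for this kind of model is a constant-elasticity (log-log) demand curve, where expected sales fall as the unit price rises. The sketch below is purely illustrative, with made-up coefficients; it is not the model contained in demand_curve_v1.onnx.

import numpy

def illustrative_demand(price, intercept=4.0, elasticity=1.5):
    # constant-elasticity form: log(units) = intercept - elasticity * log(price)
    return numpy.exp(intercept - elasticity * numpy.log(price))

illustrative_demand(4.21)  # expected units sold at a unit price of 4.21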

Data preprocessing is required to create the features used by the model. Simple postprocessing prevents nonsensical estimates (e.g. negative units sold).
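
As a rough illustration of that clamping idea (a minimal sketch only, independent of Wallaroo’s Python-step interface; it is not the actual contents of postprocess.py):

import numpy

def clamp_predictions(predictions):
    # clip negative unit-sold estimates to zero so the published
    # prediction is always physically meaningful
    return numpy.maximum(numpy.asarray(predictions), 0.0)

clamp_predictions([6.68, -0.42])  # -> array([6.68, 0.  ])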

Open a Connection to Wallaroo

The first step is to connect to Wallaroo through the Wallaroo client. The Python library is included in the Wallaroo installation and is available through the JupyterHub interface provided with your Wallaroo environment.

This is accomplished using the wallaroo.Client() command, which displays a URL used to grant the SDK permission to your specific Wallaroo environment. When the URL appears, open it in a browser and confirm the permissions. Store the connection in a variable that can be referenced later.

The other libraries shown below are used for this example.

import json
import wallaroo
import pandas
import numpy
import conversion  # local helper module for converting DataFrames into the inference input format

wl = wallaroo.Client()
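
Note that conversion is a small local helper module included with this example rather than part of the Wallaroo SDK. Judging from the original_data field in the inference results shown later, its pandas_to_dict function flattens a DataFrame into a dictionary of column names and row values, roughly like this hypothetical sketch:

import pandas

def pandas_to_dict(dataframe: pandas.DataFrame) -> dict:
    # flatten a DataFrame into the colnames/query dictionary shape
    # that the pipeline's infer() call accepts
    return {
        'colnames': list(dataframe.columns),
        'query': dataframe.values.tolist(),
    }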

Now that the Wallaroo client has been initialized, we can create a workspace named demandcurve-workspace and set it as our current workspace.

new_workspace = wl.create_workspace("demandcurve-workspace")
_ = wl.set_current_workspace(new_workspace)

Just to make sure, let’s check our current workspace. If everything is set up correctly, it will show that we’re in demandcurve-workspace.

wl.get_current_workspace()
    {'name': 'demandcurve-workspace', 'id': 3, 'archived': False, 'created_by': '45e6b641-fe57-4fb2-83d2-2c2bd201efe8', 'created_at': '2022-03-29T20:21:32.732178+00:00', 'models': [], 'pipelines': []}

With our workspace established, we’ll upload three models:

  • demand_curve_v1.onnx: Our demand curve model. We’ll store the upload configuration in demand_curve_model.
  • preprocess: Takes the data and prepares it for the demand curve model. We’ll store the upload configuration in module_pre.
  • postprocess: Takes the results from our demand curve model and prepares them for display. We’ll store the upload configuration in module_post.

Note that the order in which we upload our models isn’t important; we establish the actual flow of data from one model to the next when we set up our pipeline.

# upload the ONNX demand curve model to Wallaroo
demand_curve_model = wl.upload_model('demandcurve', "./demand_curve_v1.onnx").configure()
# upload the preprocess module as a Python model step
module_pre = wl.upload_model("preprocess", "./preprocess.py").configure('python')
# upload the postprocess module as a Python model step
module_post = wl.upload_model("postprocess", "./postprocess.py").configure('python')

With our models uploaded, we’ll create our pipeline and give it three steps:

  • First, the preprocess module module_pre prepares the incoming data.
  • Second, the prepared data is passed to our demand_curve_model.
  • Finally, module_post formats the model’s results for output.

# now make a pipeline
demandcurve_pipeline = (wl.build_pipeline("demand-curve-pipeline")
                        .add_model_step(module_pre)
                        .add_model_step(demand_curve_model)
                        .add_model_step(module_post))

And with that, let’s deploy our pipeline. Deployment usually takes about 45 seconds to complete.

demandcurve_pipeline.deploy()
    Waiting for deployment - this will take up to 45s .......... ok

    {'name': 'demand-curve-pipeline', 'create_time': datetime.datetime(2022, 3, 29, 20, 21, 33, 264321, tzinfo=tzutc()), 'definition': "[{'ModelInference': {'models': [{'name': 'preprocess', 'version': '159b9e99-edb6-4c5e-8336-63bc6000623e', 'sha': 'c328e2d5bf0adeb96f37687ab4da32cecf5f2cc789fa3a427ec0dbd2c3b8b663'}]}}, {'ModelInference': {'models': [{'name': 'demandcurve', 'version': '4f5193fc-9c18-4851-8489-42e61d095588', 'sha': '2820b42c9e778ae259918315f25afc8685ecab9967bad0a3d241e6191b414a0d'}]}}, {'ModelInference': {'models': [{'name': 'postprocess', 'version': '77ee154c-d64c-49dd-985a-96f4c2931b6e', 'sha': '4bd3109602e999a3a5013893cd2eff1a434fd9f06d6e3e681724232db6fdd40d'}]}}]"}

We can check the status of our pipeline to make sure everything was set up correctly:

demandcurve_pipeline.status()
    {'status': 'Running',
     'details': None,
     'engines': [{'ip': '10.12.1.32',
       'name': 'engine-5568595449-kdjz7',
       'status': 'Running',
       'reason': None,
       'pipeline_statuses': {'pipelines': [{'id': 'demand-curve-pipeline',
          'status': 'Running'}]},
       'model_statuses': {'models': [{'name': 'preprocess',
          'version': '159b9e99-edb6-4c5e-8336-63bc6000623e',
          'sha': 'c328e2d5bf0adeb96f37687ab4da32cecf5f2cc789fa3a427ec0dbd2c3b8b663',
          'status': 'Running'},
         {'name': 'demandcurve',
          'version': '4f5193fc-9c18-4851-8489-42e61d095588',
          'sha': '2820b42c9e778ae259918315f25afc8685ecab9967bad0a3d241e6191b414a0d',
          'status': 'Running'},
         {'name': 'postprocess',
          'version': '77ee154c-d64c-49dd-985a-96f4c2931b6e',
          'sha': '4bd3109602e999a3a5013893cd2eff1a434fd9f06d6e3e681724232db6fdd40d',
          'status': 'Running'}]}}],
     'engine_lbs': [{'ip': '10.12.1.31',
       'name': 'engine-lb-85846c64f8-t7pz4',
       'status': 'Running',
       'reason': None}]}

Everything is ready. Let’s feed our pipeline some data. We have sample purchase data prepared in the daily_purchases.csv file. We’ll start with just one row to make sure that everything is working correctly.

# read in some purchase data
purchases = pandas.read_csv('daily_purchases.csv')

# start with a one-row data frame for testing
subsamp_raw = purchases.iloc[0:1, :]
subsamp_raw

# create the input dictionary from the original one-row DataFrame
input_dict = conversion.pandas_to_dict(subsamp_raw)

result = demandcurve_pipeline.infer(input_dict)
result
    [InferenceResult({'check_failures': [],
      'elapsed': 413114,
      'model_name': 'postprocess',
      'model_version': '77ee154c-d64c-49dd-985a-96f4c2931b6e',
      'original_data': {'colnames': ['Date',
                                     'cust_known',
                                     'StockCode',
                                     'UnitPrice',
                                     'UnitsSold'],
                        'query': [['2010-12-01', False, '21928', 4.21, 1]]},
      'outputs': [{'Json': {'data': [{'original': {'outputs': [{'Double': {'data': [6.68025518653071],
                                                                           'dim': [1,
                                                                                   1],
                                                                           'v': 1}}]},
                                      'prediction': [6.68025518653071]}],
                            'dim': [1],
                            'v': 1}}],
      'pipeline_name': 'demand-curve-pipeline',
      'time': 1648585307936})]

We can see from the prediction field that the model predicts approximately 6.68 units sold for our sample data. We can isolate that value by requesting just the data output, as shown below.

result[0].data()
    [array([6.68025519])]

Bulk Inference

The initial test went perfectly. Now let’s send more data through the pipeline. We’ll draw 10 random rows from the dataset, run a single inference on all of them, and display the results.

# Let's do 10 rows at once (drawn randomly)
ix = numpy.random.choice(purchases.shape[0], size=10, replace=False)
output = demandcurve_pipeline.infer(conversion.pandas_to_dict(purchases.iloc[ix, :]))
output[0].data()
    [array([ 6.77154593,  0.        ,  6.77154593, 40.57067889,  6.77154593,
            49.73419364,  6.77154593, 33.12532316,  6.77154593, 33.12532316])]

Undeploy the Pipeline

Once we’ve finished with our demand curve demo, we’ll undeploy the pipeline and give the resources back to our Kubernetes cluster.

demandcurve_pipeline.undeploy()
    {'name': 'demand-curve-pipeline', 'create_time': datetime.datetime(2022, 3, 29, 20, 21, 33, 264321, tzinfo=tzutc()), 'definition': "[{'ModelInference': {'models': [{'name': 'preprocess', 'version': '159b9e99-edb6-4c5e-8336-63bc6000623e', 'sha': 'c328e2d5bf0adeb96f37687ab4da32cecf5f2cc789fa3a427ec0dbd2c3b8b663'}]}}, {'ModelInference': {'models': [{'name': 'demandcurve', 'version': '4f5193fc-9c18-4851-8489-42e61d095588', 'sha': '2820b42c9e778ae259918315f25afc8685ecab9967bad0a3d241e6191b414a0d'}]}}, {'ModelInference': {'models': [{'name': 'postprocess', 'version': '77ee154c-d64c-49dd-985a-96f4c2931b6e', 'sha': '4bd3109602e999a3a5013893cd2eff1a434fd9f06d6e3e681724232db6fdd40d'}]}}]"}

Thank you for being a part of this demonstration. If you have additional questions, please feel free to contact us at Wallaroo.