Statsmodel Forecast with Wallaroo Features: Parallel Infer and Database Simulation

Performing database-driven inferences against the Statsmodel bike rentals model.

This tutorial and the assets can be downloaded as part of the Wallaroo Tutorials repository.

Statsmodel Forecast with Wallaroo Features: Parallel Inference and Simulated Database Inferences

This tutorial series demonstrates how to use Wallaroo to create a Statsmodel forecasting model based on bike rentals. This tutorial series is broken down into the following:

  • Create and Train the Model: This first notebook shows how the model is trained from existing data.
  • Deploy and Sample Inference: With the model developed, we will deploy it into Wallaroo and perform a sample inference.
  • Parallel Infer: A sample of multiple weeks of data will be retrieved and submitted as an asynchronous parallel inference. The results will be collected and uploaded to a sample database.
  • External Connection: A sample data connection to Google BigQuery to retrieve input data and store the results in a table.
  • ML Workload Orchestration: Take all of the previous steps and automate the request into a single Wallaroo ML Workload Orchestration.

This step uses the simulated database simdb to gather 4 weeks of inference data, then submits the inference requests through the asynchronous Pipeline method parallel_infer. The method receives a List of inference data, submits it to the Wallaroo pipeline, then returns the results as a separate list with each inference matched to the input submitted.

The results are then compared against the actual data to see if the model was accurate.

Prerequisites

  • A Wallaroo instance version 2023.2.1 or greater.

Parallel Infer Steps

Import Libraries

The first step is to import the libraries that we will need.

import json
import os
import datetime

import wallaroo
from wallaroo.object import EntityNotFoundError
from wallaroo.framework import Framework

# used to display dataframe information without truncating
from IPython.display import display
import pandas as pd
import numpy as np

from resources import simdb
from resources import util

import importlib
importlib.reload(util)

pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_columns', None)

Initialize connection

Start a connection to the Wallaroo instance and save the connection into the variable wl.

# Login through local Wallaroo instance

wl = wallaroo.Client()

Set Configurations

The following sets the workspace, model name, and pipeline used for this example. If the workspace or pipeline already exists, it will be assigned for use in this example. If it does not exist, it will be created based on the names listed below.

Workspace names must be unique. To allow this tutorial to run in the same Wallaroo instance for multiple users, the suffix variable is generated from a random set of 4 ASCII characters. To use the same workspace across the tutorial notebooks, hard code suffix and verify that the workspace name created is unique across the Wallaroo instance.

workspace_name = f'multiple-replica-forecast-tutorial'
pipeline_name = 'bikedaypipe'
model_name = 'bikedaymodel'
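
The configuration above hard codes the workspace name without a suffix. A minimal sketch of how a random four-character suffix could be generated and appended, assuming lowercase ASCII letters are acceptable in a workspace name. make_suffix and suffixed_workspace_name are hypothetical names used for illustration and are not part of the Wallaroo SDK:

```python
import random
import string

def make_suffix(length=4):
    """Return a random suffix of lowercase ASCII letters (hypothetical helper)."""
    return ''.join(random.choice(string.ascii_lowercase) for _ in range(length))

suffix = make_suffix()
# Assumption: appending the suffix is enough to keep the name unique per user
suffixed_workspace_name = f'multiple-replica-forecast-tutorial-{suffix}'
print(suffixed_workspace_name)
```

To share one workspace across the tutorial notebooks, replace the call to make_suffix() with a fixed string.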

Set the Workspace and Pipeline

The workspace will be either used or created if it does not exist, along with the pipeline.

The pipeline was configured in the previous step with the forecast model set as a pipeline step, so we don’t need to reconfigure it.

workspace = wl.get_workspace(name=workspace_name, create_if_not_exist=True)

wl.set_current_workspace(workspace)

pipeline = wl.get_pipeline(pipeline_name)
pipeline
name            bikedaypipe
created         2024-07-30 14:58:19.198996+00:00
last_updated    2024-07-30 15:10:37.995443+00:00
deployed        False
workspace_id    12
workspace_name  multiple-replica-forecast-tutorial
arch            x86
accel           none
tags
versions        6ec6fc90-23c5-4d4a-8eab-71d636c3b6aa, fa29e9f1-3788-4b80-8f24-c7a7d34bca4d, 211ff7fd-8b2c-4027-abad-337465a5c3d1, d9a483b9-8b85-44d2-a556-c0cabf3285ab, 23fc8432-898e-4a3d-a9ae-b50200fea111, 7298a905-3a4b-452b-80fe-7429fe9984b8
steps           bikedaymodel
published       False

We already uploaded the model in the previous notebook, so we will retrieve the most recent version of that model with wallaroo.client.Client.get_model(model_name) and use that in our demonstrations.

Deploy the Pipeline

We will now deploy the pipeline. The model was already set as a pipeline step in the previous notebook, so only the deployment configuration is defined here. The configuration allows multiple replicas of the pipeline to be deployed and spun up in the cluster, autoscaling between 2 and 5 replicas. Each pipeline replica will use 0.25 CPU and 512 Mi of RAM.

# Set the deployment to allow for additional engines to run
deploy_config = (wallaroo.DeploymentConfigBuilder()
                        .replica_count(1)
                        .replica_autoscale_min_max(minimum=2, maximum=5)
                        .cpus(0.25)
                        .memory("512Mi")
                        .build()
                    )

pipeline.deploy(deployment_config = deploy_config)
name            bikedaypipe
created         2024-07-30 14:58:19.198996+00:00
last_updated    2024-07-30 15:12:50.266127+00:00
deployed        True
workspace_id    12
workspace_name  multiple-replica-forecast-tutorial
arch            x86
accel           none
tags
versions        f0248c9e-8ce3-4038-8457-b48dff08adcf, 6ec6fc90-23c5-4d4a-8eab-71d636c3b6aa, fa29e9f1-3788-4b80-8f24-c7a7d34bca4d, 211ff7fd-8b2c-4027-abad-337465a5c3d1, d9a483b9-8b85-44d2-a556-c0cabf3285ab, 23fc8432-898e-4a3d-a9ae-b50200fea111, 7298a905-3a4b-452b-80fe-7429fe9984b8
steps           bikedaymodel
published       False
pipeline.status()
{'status': 'Running',
 'details': [],
 'engines': [{'ip': '10.28.1.14',
   'name': 'engine-85c5f89ff5-dc2vd',
   'status': 'Running',
   'reason': None,
   'details': [],
   'pipeline_statuses': {'pipelines': [{'id': 'bikedaypipe',
      'status': 'Running',
      'version': 'f0248c9e-8ce3-4038-8457-b48dff08adcf'}]},
   'model_statuses': {'models': [{'name': 'bikedaymodel',
      'sha': '96b4b27039f697f8a36ad15481e2d318cf603995553200b553c53f87a254fb2c',
      'status': 'Running',
      'version': '2462b419-0477-43c5-99ad-ce4f013561c0'}]}}],
 'engine_lbs': [{'ip': '10.28.1.11',
   'name': 'engine-lb-6b59985857-hffrq',
   'status': 'Running',
   'reason': None,
   'details': []}],
 'sidekicks': [{'ip': '10.28.1.12',
   'name': 'engine-sidekick-bikedaymodel-18-74df7478bf-d4dhw',
   'status': 'Running',
   'reason': None,
   'details': [],
   'statuses': '\n'}]}

Run Inference

For this example, we will forecast bike rentals by looking back one month from “today”, which is set to 2011-02-22. The query retrieves the data from 2011-01-23 through 2011-02-22 (the first 5 days, 2011-01-23 to 2011-01-27, are displayed below). This data is used to generate a forecast of bike rentals over the week following “today”, from 2011-02-23 to 2011-03-01.

# retrieve forecast schedule
first_day, analysis_days = util.get_forecast_days()

print(f'Running analysis on {first_day}')
Running analysis on 2011-02-22
# connect to the SQL database
conn = simdb.get_db_connection()
print(f'Bike rentals table: {simdb.tablename}')

# create the query and retrieve data
query = util.mk_dt_range_query(tablename=simdb.tablename, forecast_day=first_day)
print(query)
data = pd.read_sql_query(query, conn)
data.head()
Bike rentals table: bikerentals
select count from bikerentals where date > DATE(DATE('2011-02-22'), '-1 month') AND date <= DATE('2011-02-22')
   count
0    986
1   1416
2   1985
3    506
4    431
pd.read_sql_query("select date, count from bikerentals where date > DATE(DATE('2011-02-22'), '-1 month') AND date <= DATE('2011-02-22') LIMIT 5", conn)
         date  count
0  2011-01-23    986
1  2011-01-24   1416
2  2011-01-25   1985
3  2011-01-26    506
4  2011-01-27    431
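
The date-range query above uses an exclusive lower bound (date > one month before the forecast day) and an inclusive upper bound (date <= the forecast day). A minimal sketch of that behavior against an in-memory SQLite table that holds only the five rows displayed above. The range_query helper and the conn_demo connection are illustrative assumptions and are not the tutorial's util.mk_dt_range_query or simdb implementation:

```python
import sqlite3

# In-memory table seeded with the five rows shown above
conn_demo = sqlite3.connect(':memory:')
conn_demo.execute('CREATE TABLE bikerentals (date TEXT, count INTEGER)')
rows = [
    ('2011-01-23', 986),
    ('2011-01-24', 1416),
    ('2011-01-25', 1985),
    ('2011-01-26', 506),
    ('2011-01-27', 431),
]
conn_demo.executemany('INSERT INTO bikerentals VALUES (?, ?)', rows)

def range_query(tablename, forecast_day):
    """Build a one-month look-back query (hypothetical helper)."""
    return (f"select date, count from {tablename} "
            f"where date > DATE(DATE('{forecast_day}'), '-1 month') "
            f"AND date <= DATE('{forecast_day}') order by date")

# One month before 2011-02-25 is 2011-01-25, which the strict '>' excludes
selected = conn_demo.execute(range_query('bikerentals', '2011-02-25')).fetchall()
print(selected)
conn_demo.close()
```

With a forecast day of 2011-02-25, only the rows dated after 2011-01-25 are returned, so the look-back window never includes the day exactly one month earlier.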
df = pd.DataFrame({
    'count': [data['count']]
})
df
   count
0  0 986 1 1416 2 1985 3 506 4 431 5 1167 6 1098 7 1096 8 1501 9 1360 10 1526 11 1550 12 1708 13 1005 14 1623 15 1712 16 1530 17 1605 18 1538 19 1746 20 1472 21 1589 22 1913 23 1815 24 2115 25 2475 26 2927 27 1635 28 1812 29 1107 30 1450 Name: count, dtype: int64
# send data to model for forecast

results = pipeline.infer(df)
results
   time                     in.count  out.forecast  out.weekly_average  anomaly.count
0  2024-07-30 15:13:19.252  [986, 1416, 1985, 506, 431, 1167, 1098, 1096, 1501, 1360, 1526, 1550, 1708, 1005, 1623, 1712, 1530, 1605, 1538, 1746, 1472, 1589, 1913, 1815, 2115, 2475, 2927, 1635, 1812, 1107, 1450]  [1462, 1483, 1497, 1507, 1513, 1518, 1521]  1500.1428  0
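
The out.weekly_average value appears to be the mean of the seven out.forecast values. This is an inference from the output above, not a documented guarantee. A quick check using only the standard library:

```python
from statistics import mean

# out.forecast values copied from the inference result above
forecast = [1462, 1483, 1497, 1507, 1513, 1518, 1521]

weekly_average = mean(forecast)
print(weekly_average)
```

The computed mean agrees with the reported 1500.1428 to about three decimal places. The small difference is consistent with the pipeline returning a lower-precision float.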
util.get_forecast_dates(first_day).to_list()
['2011-02-23',
 '2011-02-24',
 '2011-02-25',
 '2011-02-26',
 '2011-02-27',
 '2011-02-28',
 '2011-03-01']
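
The forecast dates listed above are the seven days that follow first_day. A minimal sketch of that date arithmetic using only the standard library. forecast_dates is a hypothetical helper and is not the tutorial's util.get_forecast_dates, whose implementation is not shown here:

```python
from datetime import date, timedelta

def forecast_dates(first_day, horizon=7):
    """Return the `horizon` ISO dates that follow first_day (hypothetical helper)."""
    start = date.fromisoformat(first_day)
    return [(start + timedelta(days=n)).isoformat() for n in range(1, horizon + 1)]

dates = forecast_dates('2011-02-22')
print(dates)
```

Because February 2011 has 28 days, the seventh forecast date falls on 2011-03-01.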
# annotate with the appropriate dates (the next seven days)
resultframe = pd.DataFrame({
     'date': util.get_forecast_dates(first_day).to_list(),
     'forecast' : results['out.forecast'][0]
})

# write the new data to the db table "bikeforecast"
resultframe.to_sql('bikeforecast', conn, index=False, if_exists='append')

# display the db table
query = "select date, forecast from bikeforecast"
pd.read_sql_query(query, conn)
         date  forecast
0  2011-02-23      1462
1  2011-02-24      1483
2  2011-02-25      1497
3  2011-02-26      1507
4  2011-02-27      1513
5  2011-02-28      1518
6  2011-03-01      1521

Four Weeks of Inference Data

Now we’ll go back, starting at the “current date” of 2011-03-01, and fetch each week’s data across the month. This will be used to submit 5 inference requests through the Pipeline parallel_infer method.

The inference data is saved into the inference_data List - each element in the list will be a separate inference request.

# get our list of items to run through

inference_data = []

content_type = "application/json"

days = []

for day in analysis_days:
    print(f"Current date: {day}")
    days.append(day)
    query = util.mk_dt_range_query(tablename=simdb.tablename, forecast_day=day)
    print(query)
    data = pd.read_sql_query(query, conn)
    inference_data.append(data.to_dict(orient='list'))
Current date: 2011-03-01
select count from bikerentals where date > DATE(DATE('2011-03-01'), '-1 month') AND date <= DATE('2011-03-01')
Current date: 2011-03-08
select count from bikerentals where date > DATE(DATE('2011-03-08'), '-1 month') AND date <= DATE('2011-03-08')
Current date: 2011-03-15
select count from bikerentals where date > DATE(DATE('2011-03-15'), '-1 month') AND date <= DATE('2011-03-15')
Current date: 2011-03-22
select count from bikerentals where date > DATE(DATE('2011-03-22'), '-1 month') AND date <= DATE('2011-03-22')
Current date: 2011-03-29
select count from bikerentals where date > DATE(DATE('2011-03-29'), '-1 month') AND date <= DATE('2011-03-29')
analysis_days
['2011-03-01', '2011-03-08', '2011-03-15', '2011-03-22', '2011-03-29']
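
The analysis days above are spaced one week apart, starting from 2011-03-01. A minimal sketch of generating such a schedule with the standard library. weekly_analysis_days is a hypothetical helper and is not the tutorial's util.get_forecast_days:

```python
from datetime import date, timedelta

def weekly_analysis_days(start_day, weeks=5):
    """Return `weeks` ISO dates spaced seven days apart (hypothetical helper)."""
    start = date.fromisoformat(start_day)
    return [(start + timedelta(weeks=n)).isoformat() for n in range(weeks)]

demo_days = weekly_analysis_days('2011-03-01')
print(demo_days)
```

Each of these dates becomes the forecast_day of one look-back query, so five dates yield five inference requests.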
df2 = pd.DataFrame(inference_data)
df2
   count
0  [1526, 1550, 1708, 1005, 1623, 1712, 1530, 1605, 1538, 1746, 1472, 1589, 1913, 1815, 2115, 2475, 2927, 1635, 1812, 1107, 1450, 1917, 1807, 1461, 1969, 2402, 1446, 1851]
1  [1605, 1538, 1746, 1472, 1589, 1913, 1815, 2115, 2475, 2927, 1635, 1812, 1107, 1450, 1917, 1807, 1461, 1969, 2402, 1446, 1851, 2134, 1685, 1944, 2077, 605, 1872, 2133]
2  [2115, 2475, 2927, 1635, 1812, 1107, 1450, 1917, 1807, 1461, 1969, 2402, 1446, 1851, 2134, 1685, 1944, 2077, 605, 1872, 2133, 1891, 623, 1977, 2132, 2417, 2046, 2056]
3  [1917, 1807, 1461, 1969, 2402, 1446, 1851, 2134, 1685, 1944, 2077, 605, 1872, 2133, 1891, 623, 1977, 2132, 2417, 2046, 2056, 2192, 2744, 3239, 3117, 2471, 2077, 2703]
4  [2134, 1685, 1944, 2077, 605, 1872, 2133, 1891, 623, 1977, 2132, 2417, 2046, 2056, 2192, 2744, 3239, 3117, 2471, 2077, 2703, 2121, 1865, 2210, 2496, 1693, 2028, 2425]

Parallel Inference Request

The inference data collected above will now be submitted. Recall that the pipeline deployment can spin up to 5 replicas.

The pipeline parallel_infer(tensor, timeout, num_parallel, retries) method performs an inference as defined by the pipeline steps and takes the following arguments:

  • tensor (REQUIRED List): The data submitted to the pipeline for inference as a List of the supported data types:
    • pandas.DataFrame: Data submitted as a pandas DataFrame are returned as a pandas DataFrame. For models that output one column based on the models outputs.
    • Apache Arrow (Preferred): Data submitted as an Apache Arrow table is returned as an Apache Arrow table.
  • timeout (OPTIONAL int): A timeout in seconds before the inference throws an exception. The default is 15 seconds per call to accommodate large, complex models. Note that for a batch inference, this is per list item: with 10 inference requests, each would have a default timeout of 15 seconds.
  • num_parallel (OPTIONAL int): The number of parallel threads used for the submission. This should be no more than four times the number of pipeline replicas.
  • retries (OPTIONAL int): The number of retries per inference request submitted.

parallel_infer is an asynchronous method that returns the Python callback list of tasks. Call parallel_infer with the await keyword to retrieve the callback results.

For more details, see the Wallaroo parallel inferences guide.
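
The arguments described above (a per-item timeout, a cap on parallel submissions, and a retry count) can be illustrated with plain asyncio. This is only a sketch of the general pattern and is not how Wallaroo implements parallel_infer. fake_infer is a hypothetical stand-in that sums its input instead of calling a pipeline, and bounded_parallel is an illustrative name:

```python
import asyncio

async def fake_infer(item):
    """Stand-in for one inference call (hypothetical; no network access)."""
    await asyncio.sleep(0.01)
    return {'input': item, 'total': sum(item['count'])}

async def bounded_parallel(items, num_parallel=4, timeout=15, retries=2):
    """Run fake_infer over items with bounded concurrency, preserving input order."""
    limiter = asyncio.Semaphore(num_parallel)

    async def run_one(item):
        async with limiter:
            for attempt in range(retries + 1):
                try:
                    # The timeout applies to each item, not to the whole batch
                    return await asyncio.wait_for(fake_infer(item), timeout=timeout)
                except asyncio.TimeoutError:
                    if attempt == retries:
                        raise

    # gather returns results in the same order as the submitted items
    return await asyncio.gather(*(run_one(item) for item in items))

demo_items = [{'count': [986, 1416, 1985]}, {'count': [506, 431]}]
demo_results = asyncio.run(bounded_parallel(demo_items, num_parallel=2))
print(demo_results)
```

Inside a notebook, where an event loop is already running, use await bounded_parallel(demo_items) in place of asyncio.run(...). The same reason explains why parallel_infer is called with await.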

parallel_results = await pipeline.parallel_infer(tensor=df2, timeout=20, num_parallel=16, retries=2)

display(parallel_results)
   time                     in.count  out.forecast  out.weekly_average  anomaly.count
0  2024-07-30 15:13:26.042  [1526, 1550, 1708, 1005, 1623, 1712, 1530, 1605, 1538, 1746, 1472, 1589, 1913, 1815, 2115, 2475, 2927, 1635, 1812, 1107, 1450, 1917, 1807, 1461, 1969, 2402, 1446, 1851]  [1764, 1749, 1743, 1741, 1740, 1740, 1740]  1745.2858  0
1  2024-07-30 15:13:26.041  [1605, 1538, 1746, 1472, 1589, 1913, 1815, 2115, 2475, 2927, 1635, 1812, 1107, 1450, 1917, 1807, 1461, 1969, 2402, 1446, 1851, 2134, 1685, 1944, 2077, 605, 1872, 2133]  [1735, 1858, 1755, 1841, 1770, 1829, 1780]  1795.4286  0
2  2024-07-30 15:13:26.042  [2115, 2475, 2927, 1635, 1812, 1107, 1450, 1917, 1807, 1461, 1969, 2402, 1446, 1851, 2134, 1685, 1944, 2077, 605, 1872, 2133, 1891, 623, 1977, 2132, 2417, 2046, 2056]  [1878, 1851, 1858, 1856, 1857, 1856, 1856]  1858.8572  0
3  2024-07-30 15:13:26.052  [1917, 1807, 1461, 1969, 2402, 1446, 1851, 2134, 1685, 1944, 2077, 605, 1872, 2133, 1891, 623, 1977, 2132, 2417, 2046, 2056, 2192, 2744, 3239, 3117, 2471, 2077, 2703]  [2363, 2316, 2277, 2243, 2215, 2192, 2172]  2254.0000  0
4  2024-07-30 15:13:26.052  [2134, 1685, 1944, 2077, 605, 1872, 2133, 1891, 623, 1977, 2132, 2417, 2046, 2056, 2192, 2744, 3239, 3117, 2471, 2077, 2703, 2121, 1865, 2210, 2496, 1693, 2028, 2425]  [2225, 2133, 2113, 2109, 2108, 2108, 2108]  2129.1428  0

Upload into Database

With our results, we’ll merge the results we have into the days we were looking to analyze. Then we can upload the results into the sample database and display the results.

# merge the days and the results

days_results = list(zip(days, parallel_results['out.forecast'].values))
# upload to the database
for day_result in days_results:
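    # day_result[0] is the analysis day; day_result[1] is that day's list of 7 forecast values.
    # NOTE: day_result[1][0] below selects only the first forecast value, which pandas
    # broadcasts to all 7 dates (see the repeated values in the output).
    # Use day_result[1] instead to store the full seven-day forecast.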
    resultframe = pd.DataFrame({
        'date' : util.get_forecast_dates(day_result[0]),
        'forecast' : day_result[1][0]
    })
    display(resultframe)
    resultframe.to_sql('bikeforecast', conn, index=False, if_exists='append')
         date  forecast
0  2011-03-02      1764
1  2011-03-03      1764
2  2011-03-04      1764
3  2011-03-05      1764
4  2011-03-06      1764
5  2011-03-07      1764
6  2011-03-08      1764

         date  forecast
0  2011-03-09      1735
1  2011-03-10      1735
2  2011-03-11      1735
3  2011-03-12      1735
4  2011-03-13      1735
5  2011-03-14      1735
6  2011-03-15      1735

         date  forecast
0  2011-03-16      1878
1  2011-03-17      1878
2  2011-03-18      1878
3  2011-03-19      1878
4  2011-03-20      1878
5  2011-03-21      1878
6  2011-03-22      1878

         date  forecast
0  2011-03-23      2363
1  2011-03-24      2363
2  2011-03-25      2363
3  2011-03-26      2363
4  2011-03-27      2363
5  2011-03-28      2363
6  2011-03-29      2363

         date  forecast
0  2011-03-30      2225
1  2011-03-31      2225
2  2011-04-01      2225
3  2011-04-02      2225
4  2011-04-03      2225
5  2011-04-04      2225
6  2011-04-05      2225
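
In the output above, every date within a week carries the same value, because day_result[1][0] is a single number that pandas broadcasts across the seven dates. A minimal sketch of pairing each forecast date with its own value instead, using only the standard library and the first out.forecast row from the parallel inference results. The in-memory table and the variable names are illustrative assumptions:

```python
import sqlite3
from datetime import date, timedelta

analysis_day = '2011-03-01'
# out.forecast for the first parallel inference result
forecast = [1764, 1749, 1743, 1741, 1740, 1740, 1740]

# Pair the n-th forecast value with the n-th day after the analysis day
start = date.fromisoformat(analysis_day)
forecast_rows = [((start + timedelta(days=n + 1)).isoformat(), value)
                 for n, value in enumerate(forecast)]

demo_db = sqlite3.connect(':memory:')
demo_db.execute('CREATE TABLE bikeforecast (date TEXT, forecast INTEGER)')
demo_db.executemany('INSERT INTO bikeforecast VALUES (?, ?)', forecast_rows)
stored = demo_db.execute(
    'SELECT date, forecast FROM bikeforecast ORDER BY date').fetchall()
print(stored)
demo_db.close()
```

In the notebook's pandas code, the equivalent change is to pass the whole list (day_result[1]) as the forecast column, so that each of the seven rows receives a distinct value.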

On April 1st, we can compare March forecasts to actuals

query = f'''SELECT bikeforecast.date AS date, forecast, count AS actual
            FROM bikeforecast LEFT JOIN bikerentals
            ON bikeforecast.date = bikerentals.date
            WHERE bikeforecast.date >= DATE('2011-03-01')
            AND bikeforecast.date <  DATE('2011-04-01')
            ORDER BY 1'''

print(query)

comparison = pd.read_sql_query(query, conn)
comparison
SELECT bikeforecast.date AS date, forecast, count AS actual
            FROM bikeforecast LEFT JOIN bikerentals
            ON bikeforecast.date = bikerentals.date
            WHERE bikeforecast.date >= DATE('2011-03-01')
            AND bikeforecast.date <  DATE('2011-04-01')
            ORDER BY 1
          date  forecast  actual
0   2011-03-01      1521    1851
1   2011-03-02      1764    2134
2   2011-03-03      1764    1685
3   2011-03-04      1764    1944
4   2011-03-05      1764    2077
5   2011-03-06      1764     605
6   2011-03-07      1764    1872
7   2011-03-08      1764    2133
8   2011-03-09      1735    1891
9   2011-03-10      1735     623
10  2011-03-11      1735    1977
11  2011-03-12      1735    2132
12  2011-03-13      1735    2417
13  2011-03-14      1735    2046
14  2011-03-15      1735    2056
15  2011-03-16      1878    2192
16  2011-03-17      1878    2744
17  2011-03-18      1878    3239
18  2011-03-19      1878    3117
19  2011-03-20      1878    2471
20  2011-03-21      1878    2077
21  2011-03-22      1878    2703
22  2011-03-23      2363    2121
23  2011-03-24      2363    1865
24  2011-03-25      2363    2210
25  2011-03-26      2363    2496
26  2011-03-27      2363    1693
27  2011-03-28      2363    2028
28  2011-03-29      2363    2425
29  2011-03-30      2225    1536
30  2011-03-31      2225    1685
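
The tutorial does not name an accuracy metric for this comparison. Mean absolute error (MAE) is one common choice and is used here as an assumption. A minimal sketch over the rows for 2011-03-02 through 2011-03-08, copied from the comparison table above:

```python
# (date, forecast, actual) rows copied from the comparison table
week = [
    ('2011-03-02', 1764, 2134),
    ('2011-03-03', 1764, 1685),
    ('2011-03-04', 1764, 1944),
    ('2011-03-05', 1764, 2077),
    ('2011-03-06', 1764, 605),
    ('2011-03-07', 1764, 1872),
    ('2011-03-08', 1764, 2133),
]

# Absolute difference between forecast and actual for each day
abs_errors = [abs(forecast - actual) for _, forecast, actual in week]
mae = sum(abs_errors) / len(abs_errors)
print(f'MAE: {mae:.1f}')
```

The largest single error comes from 2011-03-06, when actual rentals dropped to 605. An outlier like this has a strong effect on an average computed over only seven days.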

Undeploy the Pipeline

Undeploy the pipeline and return the resources back to the Wallaroo instance.

conn.close()
pipeline.undeploy()
name            bikedaypipe
created         2024-07-30 14:58:19.198996+00:00
last_updated    2024-07-30 15:12:50.266127+00:00
deployed        False
workspace_id    12
workspace_name  multiple-replica-forecast-tutorial
arch            x86
accel           none
tags
versions        f0248c9e-8ce3-4038-8457-b48dff08adcf, 6ec6fc90-23c5-4d4a-8eab-71d636c3b6aa, fa29e9f1-3788-4b80-8f24-c7a7d34bca4d, 211ff7fd-8b2c-4027-abad-337465a5c3d1, d9a483b9-8b85-44d2-a556-c0cabf3285ab, 23fc8432-898e-4a3d-a9ae-b50200fea111, 7298a905-3a4b-452b-80fe-7429fe9984b8
steps           bikedaymodel
published       False