This tutorial and the assets can be downloaded as part of the Wallaroo Tutorials repository.
This tutorial series demonstrates how to use Wallaroo to create a Statsmodel forecasting model based on bike rentals. This tutorial series is broken down into the following:
This step uses the simulated database simdb
to gather 4 weeks of inference data, then submits the inference requests through the asynchronous Pipeline method parallel_infer
. This method accepts a List of inference inputs, submits them to the Wallaroo pipeline, and returns the results as a separate list, with each result matched to the input submitted.
The results are then compared against the actual data to see how accurate the model was.
The first step is to import the libraries that we will need.
import json
import os
import datetime
import wallaroo
from wallaroo.object import EntityNotFoundError
from wallaroo.framework import Framework
# used to display dataframe information without truncating
from IPython.display import display
import pandas as pd
import numpy as np
from resources import simdb
from resources import util
import importlib
importlib.reload(util)
pd.set_option('display.max_colwidth', None)
Connect to the Wallaroo instance and save the connection into the variable wl
.
# Login through local Wallaroo instance
wl = wallaroo.Client()
The following will set the workspace, model name, and pipeline that will be used for this example. If the workspace or pipeline already exist, they will be assigned for use in this example. If they do not exist, they will be created based on the names listed below.
Workspace names must be unique. To allow this tutorial to run in the same Wallaroo instance for multiple users, the suffix
variable is generated from a random set of 4 ASCII characters. To use the same workspace across the tutorial notebooks, hard code suffix
and verify that the workspace name created is unique across the Wallaroo instance.
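The tutorial's own suffix helper is not shown here; as a hedged sketch, a random 4-character suffix could be generated and appended like this (the cells below hard code the workspace name instead):

```python
import random
import string

# assumption: a 4-character lowercase suffix keeps workspace names unique per user
suffix = ''.join(random.choices(string.ascii_lowercase, k=4))
workspace_name = f'multiple-replica-forecast-tutorial-{suffix}'
```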
workspace_name = 'multiple-replica-forecast-tutorial'
pipeline_name = 'bikedaypipe'
model_name = 'bikedaymodel'
The workspace will be used, or created if it does not exist, along with the pipeline.
The pipeline was configured in the previous step with the forecast model set as a pipeline step, so we don’t need to reconfigure it.
workspace = wl.get_workspace(name=workspace_name, create_if_not_exist=True)
wl.set_current_workspace(workspace)
pipeline = wl.get_pipeline(pipeline_name)
pipeline
name | bikedaypipe |
---|---|
created | 2024-07-30 14:58:19.198996+00:00 |
last_updated | 2024-07-30 15:10:37.995443+00:00 |
deployed | False |
workspace_id | 12 |
workspace_name | multiple-replica-forecast-tutorial |
arch | x86 |
accel | none |
tags | |
versions | 6ec6fc90-23c5-4d4a-8eab-71d636c3b6aa, fa29e9f1-3788-4b80-8f24-c7a7d34bca4d, 211ff7fd-8b2c-4027-abad-337465a5c3d1, d9a483b9-8b85-44d2-a556-c0cabf3285ab, 23fc8432-898e-4a3d-a9ae-b50200fea111, 7298a905-3a4b-452b-80fe-7429fe9984b8 |
steps | bikedaymodel |
published | False |
We already uploaded the model in the previous notebook, so we will retrieve the most recent version of that model with wallaroo.client.Client.get_model(model_name)
and use that in our demonstrations.
The pipeline, with the forecast model already set as a pipeline step, will now be deployed. The deployment configuration allows multiple replicas of the pipeline to be deployed and spun up in the cluster. Each pipeline replica will use 0.25 CPU and 512 MiB of RAM.
# Set the deployment to allow for additional engines to run
deploy_config = (wallaroo.DeploymentConfigBuilder()
.replica_autoscale_min_max(minimum=2, maximum=5)
.cpus(0.25)
.memory("512Mi")
.build()
)
pipeline.deploy(deployment_config = deploy_config)
name | bikedaypipe |
---|---|
created | 2024-07-30 14:58:19.198996+00:00 |
last_updated | 2024-07-30 15:12:50.266127+00:00 |
deployed | True |
workspace_id | 12 |
workspace_name | multiple-replica-forecast-tutorial |
arch | x86 |
accel | none |
tags | |
versions | f0248c9e-8ce3-4038-8457-b48dff08adcf, 6ec6fc90-23c5-4d4a-8eab-71d636c3b6aa, fa29e9f1-3788-4b80-8f24-c7a7d34bca4d, 211ff7fd-8b2c-4027-abad-337465a5c3d1, d9a483b9-8b85-44d2-a556-c0cabf3285ab, 23fc8432-898e-4a3d-a9ae-b50200fea111, 7298a905-3a4b-452b-80fe-7429fe9984b8 |
steps | bikedaymodel |
published | False |
pipeline.status()
{'status': 'Running',
'details': [],
'engines': [{'ip': '10.28.1.14',
'name': 'engine-85c5f89ff5-dc2vd',
'status': 'Running',
'reason': None,
'details': [],
'pipeline_statuses': {'pipelines': [{'id': 'bikedaypipe',
'status': 'Running',
'version': 'f0248c9e-8ce3-4038-8457-b48dff08adcf'}]},
'model_statuses': {'models': [{'name': 'bikedaymodel',
'sha': '96b4b27039f697f8a36ad15481e2d318cf603995553200b553c53f87a254fb2c',
'status': 'Running',
'version': '2462b419-0477-43c5-99ad-ce4f013561c0'}]}}],
'engine_lbs': [{'ip': '10.28.1.11',
'name': 'engine-lb-6b59985857-hffrq',
'status': 'Running',
'reason': None,
'details': []}],
'sidekicks': [{'ip': '10.28.1.12',
'name': 'engine-sidekick-bikedaymodel-18-74df7478bf-d4dhw',
'status': 'Running',
'reason': None,
'details': [],
'statuses': '\n'}]}
For this example, we will forecast bike rentals by looking back one month from “today”, which is set as 2011-02-22. The data from 2011-01-23 to 2011-02-22 (the month leading up to “today”) is used to generate a forecast for bike rentals over the following week, 2011-02-23 to 2011-03-01.
# retrieve forecast schedule
first_day, analysis_days = util.get_forecast_days()
print(f'Running analysis on {first_day}')
Running analysis on 2011-02-22
# connect to SQL data base
conn = simdb.get_db_connection()
print(f'Bike rentals table: {simdb.tablename}')
# create the query and retrieve data
query = util.mk_dt_range_query(tablename=simdb.tablename, forecast_day=first_day)
print(query)
data = pd.read_sql_query(query, conn)
data.head()
Bike rentals table: bikerentals
select count from bikerentals where date > DATE(DATE('2011-02-22'), '-1 month') AND date <= DATE('2011-02-22')
count | |
---|---|
0 | 986 |
1 | 1416 |
2 | 1985 |
3 | 506 |
4 | 431 |
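The util.mk_dt_range_query helper is not shown in this notebook; based on the printed query above, a hypothetical reconstruction might look like:

```python
def mk_dt_range_query(tablename: str, forecast_day: str) -> str:
    # hypothetical reconstruction: select one month of counts ending on forecast_day
    return (f"select count from {tablename} "
            f"where date > DATE(DATE('{forecast_day}'), '-1 month') "
            f"AND date <= DATE('{forecast_day}')")
```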
pd.read_sql_query("select date, count from bikerentals where date > DATE(DATE('2011-02-22'), '-1 month') AND date <= DATE('2011-02-22') LIMIT 5", conn)
date | count | |
---|---|---|
0 | 2011-01-23 | 986 |
1 | 2011-01-24 | 1416 |
2 | 2011-01-25 | 1985 |
3 | 2011-01-26 | 506 |
4 | 2011-01-27 | 431 |
# wrap the count series in a list so the frame has a single row
# whose one cell holds the entire month of counts
df = pd.DataFrame({
    'count': [data['count']]
})
df
count | |
---|---|
0 | 0 986 1 1416 2 1985 3 506 4 431 5 1167 6 1098 7 1096 8 1501 9 1360 10 1526 11 1550 12 1708 13 1005 14 1623 15 1712 16 1530 17 1605 18 1538 19 1746 20 1472 21 1589 22 1913 23 1815 24 2115 25 2475 26 2927 27 1635 28 1812 29 1107 30 1450 Name: count, dtype: int64 |
# send data to model for forecast
results = pipeline.infer(df)
results
time | in.count | out.forecast | out.weekly_average | anomaly.count | |
---|---|---|---|---|---|
0 | 2024-07-30 15:13:19.252 | [986, 1416, 1985, 506, 431, 1167, 1098, 1096, 1501, 1360, 1526, 1550, 1708, 1005, 1623, 1712, 1530, 1605, 1538, 1746, 1472, 1589, 1913, 1815, 2115, 2475, 2927, 1635, 1812, 1107, 1450] | [1462, 1483, 1497, 1507, 1513, 1518, 1521] | 1500.1428 | 0 |
util.get_forecast_dates(first_day).to_list()
['2011-02-23',
'2011-02-24',
'2011-02-25',
'2011-02-26',
'2011-02-27',
'2011-02-28',
'2011-03-01']
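util.get_forecast_dates appears to return the seven days following the analysis date. A stdlib-only sketch (an assumption, not the tutorial's actual helper) could be:

```python
import datetime

def get_forecast_dates(first_day: str, days: int = 7):
    # the seven calendar days immediately after first_day, as ISO date strings
    start = datetime.date.fromisoformat(first_day)
    return [(start + datetime.timedelta(days=i)).isoformat() for i in range(1, days + 1)]
```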
# annotate with the appropriate dates (the next seven days)
resultframe = pd.DataFrame({
'date': util.get_forecast_dates(first_day).to_list(),
'forecast' : results['out.forecast'][0]
})
# write the new data to the db table "bikeforecast"
resultframe.to_sql('bikeforecast', conn, index=False, if_exists='append')
# display the db table
query = "select date, forecast from bikeforecast"
pd.read_sql_query(query, conn)
date | forecast | |
---|---|---|
0 | 2011-02-23 | 1462 |
1 | 2011-02-24 | 1483 |
2 | 2011-02-25 | 1497 |
3 | 2011-02-26 | 1507 |
4 | 2011-02-27 | 1513 |
5 | 2011-02-28 | 1518 |
6 | 2011-03-01 | 1521 |
Now we’ll go back, starting at the “current date” of 2011-03-01, and fetch each week’s data across the month. This will be used to submit 5 inference requests through the Pipeline parallel_infer
method.
The inference data is saved into the inference_data
List; each element in the list will be a separate inference request.
# get our list of items to run through
inference_data = []
content_type = "application/json"
days = []
for day in analysis_days:
print(f"Current date: {day}")
days.append(day)
query = util.mk_dt_range_query(tablename=simdb.tablename, forecast_day=day)
print(query)
data = pd.read_sql_query(query, conn)
inference_data.append(data.to_dict(orient='list'))
Current date: 2011-03-01
select count from bikerentals where date > DATE(DATE('2011-03-01'), '-1 month') AND date <= DATE('2011-03-01')
Current date: 2011-03-08
select count from bikerentals where date > DATE(DATE('2011-03-08'), '-1 month') AND date <= DATE('2011-03-08')
Current date: 2011-03-15
select count from bikerentals where date > DATE(DATE('2011-03-15'), '-1 month') AND date <= DATE('2011-03-15')
Current date: 2011-03-22
select count from bikerentals where date > DATE(DATE('2011-03-22'), '-1 month') AND date <= DATE('2011-03-22')
Current date: 2011-03-29
select count from bikerentals where date > DATE(DATE('2011-03-29'), '-1 month') AND date <= DATE('2011-03-29')
analysis_days
['2011-03-01', '2011-03-08', '2011-03-15', '2011-03-22', '2011-03-29']
df2 = pd.DataFrame(inference_data)
df2
count | |
---|---|
0 | [1526, 1550, 1708, 1005, 1623, 1712, 1530, 1605, 1538, 1746, 1472, 1589, 1913, 1815, 2115, 2475, 2927, 1635, 1812, 1107, 1450, 1917, 1807, 1461, 1969, 2402, 1446, 1851] |
1 | [1605, 1538, 1746, 1472, 1589, 1913, 1815, 2115, 2475, 2927, 1635, 1812, 1107, 1450, 1917, 1807, 1461, 1969, 2402, 1446, 1851, 2134, 1685, 1944, 2077, 605, 1872, 2133] |
2 | [2115, 2475, 2927, 1635, 1812, 1107, 1450, 1917, 1807, 1461, 1969, 2402, 1446, 1851, 2134, 1685, 1944, 2077, 605, 1872, 2133, 1891, 623, 1977, 2132, 2417, 2046, 2056] |
3 | [1917, 1807, 1461, 1969, 2402, 1446, 1851, 2134, 1685, 1944, 2077, 605, 1872, 2133, 1891, 623, 1977, 2132, 2417, 2046, 2056, 2192, 2744, 3239, 3117, 2471, 2077, 2703] |
4 | [2134, 1685, 1944, 2077, 605, 1872, 2133, 1891, 623, 1977, 2132, 2417, 2046, 2056, 2192, 2744, 3239, 3117, 2471, 2077, 2703, 2121, 1865, 2210, 2496, 1693, 2028, 2425] |
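Each element of inference_data is a column-oriented dict produced by to_dict(orient='list'), which maps each column name to a plain Python list. A small illustration:

```python
import pandas as pd

# to_dict(orient='list') maps each column name to a plain Python list,
# matching the per-request payload shape collected above
sample = pd.DataFrame({'count': [986, 1416, 1985]})
payload = sample.to_dict(orient='list')
# payload == {'count': [986, 1416, 1985]}
```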
The List inference_data
will now be submitted. Recall that the pipeline deployment can scale up to 5 replicas.
The asynchronous pipeline method parallel_infer(tensor_list, timeout, num_parallel, retries)
performs inferences as defined by the pipeline steps and takes the following arguments:

- tensor_list: the list of inference inputs; each element is submitted as a separate inference request.
- timeout: the maximum number of seconds to wait for an inference to complete.
- num_parallel: the number of inference requests to submit in parallel.
- retries: the number of times a failed inference request is retried.

Because parallel_infer
is asynchronous, it must be called with the await
keyword to retrieve the results.
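In a notebook, the running event loop lets you await the call directly, as in the next cell; in a plain Python script you would drive the coroutine yourself with asyncio. A minimal sketch (the pipeline object is assumed to be already deployed):

```python
import asyncio

async def run_parallel_infer(pipeline, tensor):
    # await the asynchronous method to collect the matched results
    return await pipeline.parallel_infer(tensor=tensor, timeout=20, num_parallel=16, retries=2)

# in a script: results = asyncio.run(run_parallel_infer(pipeline, df2))
```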
For more details, see the Wallaroo parallel inferences guide.
parallel_results = await pipeline.parallel_infer(tensor=df2, timeout=20, num_parallel=16, retries=2)
display(parallel_results)
time | in.count | out.forecast | out.weekly_average | anomaly.count | |
---|---|---|---|---|---|
0 | 2024-07-30 15:13:26.042 | [1526, 1550, 1708, 1005, 1623, 1712, 1530, 1605, 1538, 1746, 1472, 1589, 1913, 1815, 2115, 2475, 2927, 1635, 1812, 1107, 1450, 1917, 1807, 1461, 1969, 2402, 1446, 1851] | [1764, 1749, 1743, 1741, 1740, 1740, 1740] | 1745.2858 | 0 |
1 | 2024-07-30 15:13:26.041 | [1605, 1538, 1746, 1472, 1589, 1913, 1815, 2115, 2475, 2927, 1635, 1812, 1107, 1450, 1917, 1807, 1461, 1969, 2402, 1446, 1851, 2134, 1685, 1944, 2077, 605, 1872, 2133] | [1735, 1858, 1755, 1841, 1770, 1829, 1780] | 1795.4286 | 0 |
2 | 2024-07-30 15:13:26.042 | [2115, 2475, 2927, 1635, 1812, 1107, 1450, 1917, 1807, 1461, 1969, 2402, 1446, 1851, 2134, 1685, 1944, 2077, 605, 1872, 2133, 1891, 623, 1977, 2132, 2417, 2046, 2056] | [1878, 1851, 1858, 1856, 1857, 1856, 1856] | 1858.8572 | 0 |
3 | 2024-07-30 15:13:26.052 | [1917, 1807, 1461, 1969, 2402, 1446, 1851, 2134, 1685, 1944, 2077, 605, 1872, 2133, 1891, 623, 1977, 2132, 2417, 2046, 2056, 2192, 2744, 3239, 3117, 2471, 2077, 2703] | [2363, 2316, 2277, 2243, 2215, 2192, 2172] | 2254.0000 | 0 |
4 | 2024-07-30 15:13:26.052 | [2134, 1685, 1944, 2077, 605, 1872, 2133, 1891, 623, 1977, 2132, 2417, 2046, 2056, 2192, 2744, 3239, 3117, 2471, 2077, 2703, 2121, 1865, 2210, 2496, 1693, 2028, 2425] | [2225, 2133, 2113, 2109, 2108, 2108, 2108] | 2129.1428 | 0 |
With our results, we’ll merge the results we have into the days we were looking to analyze. Then we can upload the results into the sample database and display the results.
# merge the days and the results
days_results = list(zip(days, parallel_results['out.forecast'].values))
# upload to the database
for day_result in days_results:
# display(day_result[0])
resultframe = pd.DataFrame({
'date' : util.get_forecast_dates(day_result[0]),
'forecast' : day_result[1][0]
})
display(resultframe)
resultframe.to_sql('bikeforecast', conn, index=False, if_exists='append')
date | forecast | |
---|---|---|
0 | 2011-03-02 | 1764 |
1 | 2011-03-03 | 1764 |
2 | 2011-03-04 | 1764 |
3 | 2011-03-05 | 1764 |
4 | 2011-03-06 | 1764 |
5 | 2011-03-07 | 1764 |
6 | 2011-03-08 | 1764 |
date | forecast | |
---|---|---|
0 | 2011-03-09 | 1735 |
1 | 2011-03-10 | 1735 |
2 | 2011-03-11 | 1735 |
3 | 2011-03-12 | 1735 |
4 | 2011-03-13 | 1735 |
5 | 2011-03-14 | 1735 |
6 | 2011-03-15 | 1735 |
date | forecast | |
---|---|---|
0 | 2011-03-16 | 1878 |
1 | 2011-03-17 | 1878 |
2 | 2011-03-18 | 1878 |
3 | 2011-03-19 | 1878 |
4 | 2011-03-20 | 1878 |
5 | 2011-03-21 | 1878 |
6 | 2011-03-22 | 1878 |
date | forecast | |
---|---|---|
0 | 2011-03-23 | 2363 |
1 | 2011-03-24 | 2363 |
2 | 2011-03-25 | 2363 |
3 | 2011-03-26 | 2363 |
4 | 2011-03-27 | 2363 |
5 | 2011-03-28 | 2363 |
6 | 2011-03-29 | 2363 |
date | forecast | |
---|---|---|
0 | 2011-03-30 | 2225 |
1 | 2011-03-31 | 2225 |
2 | 2011-04-01 | 2225 |
3 | 2011-04-02 | 2225 |
4 | 2011-04-03 | 2225 |
5 | 2011-04-04 | 2225 |
6 | 2011-04-05 | 2225 |
On April 1st, we can compare the March forecasts to the actuals.
query = f'''SELECT bikeforecast.date AS date, forecast, count AS actual
FROM bikeforecast LEFT JOIN bikerentals
ON bikeforecast.date = bikerentals.date
WHERE bikeforecast.date >= DATE('2011-03-01')
AND bikeforecast.date < DATE('2011-04-01')
ORDER BY 1'''
print(query)
comparison = pd.read_sql_query(query, conn)
comparison
SELECT bikeforecast.date AS date, forecast, count AS actual
FROM bikeforecast LEFT JOIN bikerentals
ON bikeforecast.date = bikerentals.date
WHERE bikeforecast.date >= DATE('2011-03-01')
AND bikeforecast.date < DATE('2011-04-01')
ORDER BY 1
date | forecast | actual | |
---|---|---|---|
0 | 2011-03-01 | 1521 | 1851 |
1 | 2011-03-02 | 1764 | 2134 |
2 | 2011-03-03 | 1764 | 1685 |
3 | 2011-03-04 | 1764 | 1944 |
4 | 2011-03-05 | 1764 | 2077 |
5 | 2011-03-06 | 1764 | 605 |
6 | 2011-03-07 | 1764 | 1872 |
7 | 2011-03-08 | 1764 | 2133 |
8 | 2011-03-09 | 1735 | 1891 |
9 | 2011-03-10 | 1735 | 623 |
10 | 2011-03-11 | 1735 | 1977 |
11 | 2011-03-12 | 1735 | 2132 |
12 | 2011-03-13 | 1735 | 2417 |
13 | 2011-03-14 | 1735 | 2046 |
14 | 2011-03-15 | 1735 | 2056 |
15 | 2011-03-16 | 1878 | 2192 |
16 | 2011-03-17 | 1878 | 2744 |
17 | 2011-03-18 | 1878 | 3239 |
18 | 2011-03-19 | 1878 | 3117 |
19 | 2011-03-20 | 1878 | 2471 |
20 | 2011-03-21 | 1878 | 2077 |
21 | 2011-03-22 | 1878 | 2703 |
22 | 2011-03-23 | 2363 | 2121 |
23 | 2011-03-24 | 2363 | 1865 |
24 | 2011-03-25 | 2363 | 2210 |
25 | 2011-03-26 | 2363 | 2496 |
26 | 2011-03-27 | 2363 | 1693 |
27 | 2011-03-28 | 2363 | 2028 |
28 | 2011-03-29 | 2363 | 2425 |
29 | 2011-03-30 | 2225 | 1536 |
30 | 2011-03-31 | 2225 | 1685 |
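To quantify how accurate the model was, a simple error metric can be computed over the comparison frame. As a sketch using the first three rows from the table above (mean absolute percentage error; this metric is not part of the original notebook):

```python
import pandas as pd

# first three rows of the comparison table above, for illustration
comparison = pd.DataFrame({
    'forecast': [1521, 1764, 1764],
    'actual':   [1851, 2134, 1685],
})
# mean absolute percentage error across the sample rows
mape = ((comparison['forecast'] - comparison['actual']).abs()
        / comparison['actual']).mean() * 100
```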
Undeploy the pipeline and return the resources to the Wallaroo instance.
conn.close()
pipeline.undeploy()
name | bikedaypipe |
---|---|
created | 2024-07-30 14:58:19.198996+00:00 |
last_updated | 2024-07-30 15:12:50.266127+00:00 |
deployed | False |
workspace_id | 12 |
workspace_name | multiple-replica-forecast-tutorial |
arch | x86 |
accel | none |
tags | |
versions | f0248c9e-8ce3-4038-8457-b48dff08adcf, 6ec6fc90-23c5-4d4a-8eab-71d636c3b6aa, fa29e9f1-3788-4b80-8f24-c7a7d34bca4d, 211ff7fd-8b2c-4027-abad-337465a5c3d1, d9a483b9-8b85-44d2-a556-c0cabf3285ab, 23fc8432-898e-4a3d-a9ae-b50200fea111, 7298a905-3a4b-452b-80fe-7429fe9984b8 |
steps | bikedaymodel |
published | False |