1 - Statsmodel Forecast with Wallaroo Features: Deploy and Test Infer
Deploy the sample Statsmodel and perform sample inferences.
This tutorial and the assets can be downloaded as part of the Wallaroo Tutorials repository.
Statsmodel Forecast with Wallaroo Features: Deploy and Test Infer
This tutorial series demonstrates how to use Wallaroo to create a Statsmodel forecasting model based on bike rentals. This tutorial series is broken down into the following:
- Create and Train the Model: This first notebook shows how the model is trained from existing data.
- Deploy and Sample Inference: With the model developed, we will deploy it into Wallaroo and perform a sample inference.
- Parallel Infer: A sample of multiple weeks of data will be retrieved and submitted as an asynchronous parallel inference. The results will be collected and uploaded to a sample database.
- External Connection: A sample data connection to Google BigQuery to retrieve input data and store the results in a table.
- ML Workload Orchestration: Take all of the previous steps and automate the request into a single Wallaroo ML Workload Orchestration.
In the previous step “Statsmodel Forecast with Wallaroo Features: Model Creation”, the statsmodel was trained and saved to the Python file `forecast.py`. This file will now be uploaded to a Wallaroo instance as a Python model, then used for sample inferences.
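As context, `forecast.py` itself is built in the Model Creation notebook. The following is a hedged sketch only: the `wallaroo_json` entry-point name follows the Python model convention of Wallaroo releases of this era, and the input handling and model order shown here are illustrative assumptions, not the actual file.

# illustrative sketch of the general shape of a Wallaroo Python model file
import json
import pandas as pd
from statsmodels.tsa.arima.model import ARIMA

def wallaroo_json(data):
    # parse the submitted rental counts (assumed input shape: {"cnt": [...]})
    obs = pd.Series(json.loads(data)['cnt'])
    # fit a statsmodel ARIMA and forecast the next 7 days (order is illustrative)
    model = ARIMA(obs, order=(1, 0, 1)).fit()
    forecast = model.forecast(steps=7).round().astype(int).tolist()
    return [{'forecast': forecast}]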
Prerequisites
- A Wallaroo instance version 2023.2.1 or greater.
Tutorial Steps
Import Libraries
The first step is to import the libraries that we will need.
import json
import os
import datetime
import wallaroo
from wallaroo.object import EntityNotFoundError
from wallaroo.framework import Framework
# used to display dataframe information without truncating
from IPython.display import display
import pandas as pd
pd.set_option('display.max_colwidth', None)
display(wallaroo.__version__)

'2023.2.1rc2'
Initialize connection
Start a connection to the Wallaroo instance and save it into the variable `wl`.
# Login through local Wallaroo instance
wl = wallaroo.Client()
Set Configurations
The following will set the workspace, model name, and pipeline that will be used for this example. If the workspace or pipeline already exist, then they will be assigned for use in this example. If they do not exist, they will be created based on the names listed below.
Workspace names must be unique. To allow this tutorial to run in the same Wallaroo instance for multiple users, the `suffix` variable is generated from a random set of 4 ASCII characters. To use the same workspace each time, hard code `suffix` and verify the workspace name created is unique across the Wallaroo instance.
# used for unique connection names
import string
import random
suffix= ''.join(random.choice(string.ascii_lowercase) for i in range(4))
workspace_name = f'multiple-replica-forecast-tutorial-{suffix}'
pipeline_name = 'bikedaypipe'
model_name = 'bikedaymodel'
Set the Workspace and Pipeline
The workspace will be either used or created if it does not exist, along with the pipeline.
def get_workspace(name):
    workspace = None
    for ws in wl.list_workspaces():
        if ws.name() == name:
            workspace = ws
    if workspace is None:
        workspace = wl.create_workspace(name)
    return workspace

def get_pipeline(name):
    try:
        pipeline = wl.pipelines_by_name(name)[0]
    except EntityNotFoundError:
        pipeline = wl.build_pipeline(name)
    return pipeline
workspace = get_workspace(workspace_name)
wl.set_current_workspace(workspace)
pipeline = get_pipeline(pipeline_name)
Upload Model
The Python model created in “Forecast and Parallel Infer with Statsmodel: Model Creation” will now be uploaded. Note that the Framework and runtime are set to `python`.
model_file_name = 'forecast.py'
bike_day_model = wl.upload_model(model_name, model_file_name, Framework.PYTHON).configure(runtime="python")
Deploy the Pipeline
We will now add the uploaded model as a step for the pipeline, then deploy it. The pipeline configuration will allow for multiple replicas of the pipeline to be deployed and spooled up in the cluster. Each pipeline replica will use 0.25 CPUs and 512 MiB of RAM.
# Set the deployment to allow for additional engines to run
deploy_config = (wallaroo.DeploymentConfigBuilder()
                 .replica_count(1)
                 .replica_autoscale_min_max(minimum=2, maximum=5)
                 .cpus(0.25)
                 .memory("512Mi")
                 .build()
                )
pipeline.add_model_step(bike_day_model).deploy(deployment_config = deploy_config)
name | bikedaypipe
---|---
created | 2023-07-14 15:50:50.014326+00:00
last_updated | 2023-07-14 15:50:52.029628+00:00
deployed | True
tags |
versions | 7aae4653-9e9f-468c-b266-4433be652313, 48983f9b-7c43-41fe-9688-df72a6aa55e9
steps | bikedaymodel
Run Inference
Run a test inference to verify the pipeline is operational from the sample test data stored in `./data/testdata_dict.json`.
inferencedata = json.load(open("./data/testdata_dict.json"))
results = pipeline.infer(inferencedata)
display(results)
[{'forecast': [1764, 1749, 1743, 1741, 1740, 1740, 1740]}]
Undeploy the Pipeline
Undeploy the pipeline and return the resources back to the Wallaroo instance.

pipeline.undeploy()
name | bikedaypipe
---|---
created | 2023-07-14 15:50:50.014326+00:00
last_updated | 2023-07-14 15:50:52.029628+00:00
deployed | False
tags |
versions | 7aae4653-9e9f-468c-b266-4433be652313, 48983f9b-7c43-41fe-9688-df72a6aa55e9
steps | bikedaymodel
2 - Statsmodel Forecast with Wallaroo Features: Parallel Inference
Performing parallel inferences against the Statsmodel bike rentals model.
This tutorial and the assets can be downloaded as part of the Wallaroo Tutorials repository.
Statsmodel Forecast with Wallaroo Features: Parallel Inference
This tutorial series demonstrates how to use Wallaroo to create a Statsmodel forecasting model based on bike rentals. This tutorial series is broken down into the following:
- Create and Train the Model: This first notebook shows how the model is trained from existing data.
- Deploy and Sample Inference: With the model developed, we will deploy it into Wallaroo and perform a sample inference.
- Parallel Infer: A sample of multiple weeks of data will be retrieved and submitted as an asynchronous parallel inference. The results will be collected and uploaded to a sample database.
- External Connection: A sample data connection to Google BigQuery to retrieve input data and store the results in a table.
- ML Workload Orchestration: Take all of the previous steps and automate the request into a single Wallaroo ML Workload Orchestration.
This step will use the simulated database `simdb` to gather 4 weeks of inference data, then submit the inference request through the asynchronous Pipeline method `parallel_infer`. This method receives a List of inference data, submits it to the Wallaroo pipeline, then returns the results as a list with each inference result matched to the input that produced it.
The results are then compared against the actual data to see if the model was accurate.
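For orientation, the submission has the following shape; the full call appears later in this notebook with these same argument values, after the `inference_data` list has been built.

# asynchronous batch submission; each list element is one inference request
parallel_results = await pipeline.parallel_infer(tensor_list=inference_data,
                                                 timeout=20,
                                                 num_parallel=16,
                                                 retries=2)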
Prerequisites
- A Wallaroo instance version 2023.2.1 or greater.
Parallel Infer Steps
Import Libraries
The first step is to import the libraries that we will need.
import json
import os
import datetime
import wallaroo
from wallaroo.object import EntityNotFoundError
from wallaroo.framework import Framework
# used to display dataframe information without truncating
from IPython.display import display
import pandas as pd
import numpy as np
from resources import simdb
from resources import util
pd.set_option('display.max_colwidth', None)
display(wallaroo.__version__)
'2023.2.1rc2'
Initialize connection
Start a connection to the Wallaroo instance and save it into the variable `wl`.
# Login through local Wallaroo instance
wl = wallaroo.Client()
Set Configurations
The following will set the workspace, model name, and pipeline that will be used for this example. If the workspace or pipeline already exist, then they will be assigned for use in this example. If they do not exist, they will be created based on the names listed below.
Workspace names must be unique. To allow this tutorial to run in the same Wallaroo instance for multiple users, the `suffix` variable is generated from a random set of 4 ASCII characters. To use the same workspace across the tutorial notebooks, hard code `suffix` and verify the workspace name created is unique across the Wallaroo instance.
# used for unique connection names
import string
import random
suffix= ''.join(random.choice(string.ascii_lowercase) for i in range(4))
workspace_name = f'multiple-replica-forecast-tutorial-{suffix}'
pipeline_name = 'bikedaypipe'
model_name = 'bikedaymodel'
Set the Workspace and Pipeline
The workspace will be either used or created if it does not exist, along with the pipeline.
def get_workspace(name):
    workspace = None
    for ws in wl.list_workspaces():
        if ws.name() == name:
            workspace = ws
    if workspace is None:
        workspace = wl.create_workspace(name)
    return workspace

def get_pipeline(name):
    try:
        pipeline = wl.pipelines_by_name(name)[0]
    except EntityNotFoundError:
        pipeline = wl.build_pipeline(name)
    return pipeline
workspace = get_workspace(workspace_name)
wl.set_current_workspace(workspace)
pipeline = get_pipeline(pipeline_name)
Upload Model

The Python model created in “Forecast and Parallel Infer with Statsmodel: Model Creation” will now be uploaded. Note that the Framework and runtime are set to `python`.

model_file_name = 'forecast.py'

bike_day_model = wl.upload_model(model_name, model_file_name, Framework.PYTHON).configure(runtime="python")
pipeline.add_model_step(bike_day_model)
name | bikedaypipe
---|---
created | 2023-07-14 15:50:50.014326+00:00
last_updated | 2023-07-14 15:50:52.029628+00:00
deployed | False
tags |
versions | 7aae4653-9e9f-468c-b266-4433be652313, 48983f9b-7c43-41fe-9688-df72a6aa55e9
steps | bikedaymodel
Deploy the Pipeline
We will now add the uploaded model as a step for the pipeline, then deploy it. The pipeline configuration will allow for multiple replicas of the pipeline to be deployed and spooled up in the cluster. Each pipeline replica will use 0.25 CPUs and 512 MiB of RAM.
# Set the deployment to allow for additional engines to run
deploy_config = (wallaroo.DeploymentConfigBuilder()
                 .replica_count(1)
                 .replica_autoscale_min_max(minimum=2, maximum=5)
                 .cpus(0.25)
                 .memory("512Mi")
                 .build()
                )
pipeline.deploy(deployment_config = deploy_config)
name | bikedaypipe
---|---
created | 2023-07-14 15:53:07.284131+00:00
last_updated | 2023-07-14 15:56:07.413409+00:00
deployed | True
tags |
versions | 9c67dd93-014c-4cc9-9b44-549829e613ad, 258dafaf-c272-4bda-881b-5998a4a9be26
steps | bikedaymodel
Run Inference
For this example, we will forecast bike rentals by looking back one month from “today”, which is set to 2011-02-22. The data from 2011-01-23 through 2011-02-22 (the month leading up to “today”) is used to generate a forecast of bike rentals over the next week from “today”: 2011-02-23 through 2011-03-01.
# retrieve forecast schedule
first_day, analysis_days = util.get_forecast_days()
print(f'Running analysis on {first_day}')
Running analysis on 2011-02-22
# connect to the SQL database
conn = simdb.get_db_connection()
print(f'Bike rentals table: {simdb.tablename}')
# create the query and retrieve data
query = util.mk_dt_range_query(tablename=simdb.tablename, forecast_day=first_day)
print(query)
data = pd.read_sql_query(query, conn)
data.head()
Bike rentals table: bikerentals
select cnt from bikerentals where date > DATE(DATE('2011-02-22'), '-1 month') AND date <= DATE('2011-02-22')
 | cnt
---|---
0 | 986
1 | 1416
2 | 1985
3 | 506
4 | 431
pd.read_sql_query("select date, cnt from bikerentals where date > DATE(DATE('2011-02-22'), '-1 month') AND date <= DATE('2011-02-22') LIMIT 5", conn)
 | date | cnt
---|---|---
0 | 2011-01-23 | 986
1 | 2011-01-24 | 1416
2 | 2011-01-25 | 1985
3 | 2011-01-26 | 506
4 | 2011-01-27 | 431
# send data to model for forecast
results = pipeline.infer(data.to_dict(orient='list'))[0]
results
{'forecast': [1462, 1483, 1497, 1507, 1513, 1518, 1521]}
# annotate with the appropriate dates (the next seven days)
resultframe = pd.DataFrame({
    'date' : util.get_forecast_dates(first_day),
    'forecast' : results['forecast']
})
# write the new data to the db table "bikeforecast"
resultframe.to_sql('bikeforecast', conn, index=False, if_exists='append')
# display the db table
query = "select date, forecast from bikeforecast"
pd.read_sql_query(query, conn)
 | date | forecast
---|---|---
0 | 2011-02-23 | 1462
1 | 2011-02-24 | 1483
2 | 2011-02-25 | 1497
3 | 2011-02-26 | 1507
4 | 2011-02-27 | 1513
5 | 2011-02-28 | 1518
6 | 2011-03-01 | 1521
Four Weeks of Inference Data
Now we’ll go back, starting at the “current date” of 2011-03-01, and fetch each week’s data across the month. This will be used to submit 5 inference requests through the Pipeline `parallel_infer` method.

The inference data is saved into the `inference_data` List; each element in the list will be a separate inference request.
# get our list of items to run through
inference_data = []
content_type = "application/json"
days = []
for day in analysis_days:
    print(f"Current date: {day}")
    days.append(day)
    query = util.mk_dt_range_query(tablename=simdb.tablename, forecast_day=day)
    print(query)
    data = pd.read_sql_query(query, conn)
    inference_data.append(data.to_dict(orient='list'))
Current date: 2011-03-01
select cnt from bikerentals where date > DATE(DATE('2011-03-01'), '-1 month') AND date <= DATE('2011-03-01')
Current date: 2011-03-08
select cnt from bikerentals where date > DATE(DATE('2011-03-08'), '-1 month') AND date <= DATE('2011-03-08')
Current date: 2011-03-15
select cnt from bikerentals where date > DATE(DATE('2011-03-15'), '-1 month') AND date <= DATE('2011-03-15')
Current date: 2011-03-22
select cnt from bikerentals where date > DATE(DATE('2011-03-22'), '-1 month') AND date <= DATE('2011-03-22')
Current date: 2011-03-29
select cnt from bikerentals where date > DATE(DATE('2011-03-29'), '-1 month') AND date <= DATE('2011-03-29')
Parallel Inference Request
The List `inference_data` will be submitted. Recall that the pipeline deployment can spool up to 5 replicas.
The pipeline `parallel_infer(tensor_list, timeout, num_parallel, retries)` asynchronous method performs an inference as defined by the pipeline steps and takes the following arguments:
- tensor_list (REQUIRED List): The data submitted to the pipeline for inference as a List of the supported data types:
- pandas.DataFrame: Data submitted as a pandas DataFrame is returned as a pandas DataFrame, with output columns based on the model's outputs.
- Apache Arrow (Preferred): Data submitted as an Apache Arrow Table is returned as an Apache Arrow Table.
- timeout (OPTIONAL int): A timeout in seconds before the inference throws an exception. The default is 15 seconds per call to accommodate large, complex models. Note that for a batch inference, this is per list item: with 10 inference requests, each would have a default timeout of 15 seconds.
- num_parallel (OPTIONAL int): The number of parallel threads used for the submission. This should be no more than four times the number of pipeline replicas.
- retries (OPTIONAL int): The number of retries per inference request submitted.
`parallel_infer` is an asynchronous method that returns the list of inference results; it must be invoked with the `await` keyword to retrieve those results.
For more details, see the Wallaroo parallel inferences guide.
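Top-level `await`, as used below, works in Jupyter notebooks. In a plain Python script, wrap the call in an async function and hand it to `asyncio.run`, as the ML Workload Orchestration's `main.py` does later in this series. A minimal sketch:

import asyncio

async def run_batch():
    # awaitable batch inference; returns one result list per submitted item
    return await pipeline.parallel_infer(tensor_list=inference_data, timeout=20, num_parallel=16, retries=2)

# in a script (not needed in a notebook):
# parallel_results = asyncio.run(run_batch())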
parallel_results = await pipeline.parallel_infer(tensor_list=inference_data, timeout=20, num_parallel=16, retries=2)
display(parallel_results)
[[{'forecast': [1764, 1749, 1743, 1741, 1740, 1740, 1740]}],
[{'forecast': [1735, 1858, 1755, 1841, 1770, 1829, 1780]}],
[{'forecast': [1878, 1851, 1858, 1856, 1857, 1856, 1856]}],
[{'forecast': [2363, 2316, 2277, 2243, 2215, 2192, 2172]}],
[{'forecast': [2225, 2133, 2113, 2109, 2108, 2108, 2108]}]]
Upload into Database
With our results, we’ll merge the results we have into the days we were looking to analyze. Then we can upload the results into the sample database and display the results.
# merge the days and the results
days_results = list(zip(days, parallel_results))
# upload to the database
for day_result in days_results:
    resultframe = pd.DataFrame({
        'date' : util.get_forecast_dates(day_result[0]),
        'forecast' : day_result[1][0]['forecast']
    })
    resultframe.to_sql('bikeforecast', conn, index=False, if_exists='append')
On April 1st, we can compare the March forecasts to the actuals:
query = f'''SELECT bikeforecast.date AS date, forecast, cnt AS actual
FROM bikeforecast LEFT JOIN bikerentals
ON bikeforecast.date = bikerentals.date
WHERE bikeforecast.date >= DATE('2011-03-01')
AND bikeforecast.date < DATE('2011-04-01')
ORDER BY 1'''
print(query)
comparison = pd.read_sql_query(query, conn)
comparison
SELECT bikeforecast.date AS date, forecast, cnt AS actual
FROM bikeforecast LEFT JOIN bikerentals
ON bikeforecast.date = bikerentals.date
WHERE bikeforecast.date >= DATE('2011-03-01')
AND bikeforecast.date < DATE('2011-04-01')
ORDER BY 1
 | date | forecast | actual
---|---|---|---
0 | 2011-03-02 | 1764 | 2134
1 | 2011-03-03 | 1749 | 1685
2 | 2011-03-04 | 1743 | 1944
3 | 2011-03-05 | 1741 | 2077
4 | 2011-03-06 | 1740 | 605
5 | 2011-03-07 | 1740 | 1872
6 | 2011-03-08 | 1740 | 2133
7 | 2011-03-09 | 1735 | 1891
8 | 2011-03-10 | 1858 | 623
9 | 2011-03-11 | 1755 | 1977
10 | 2011-03-12 | 1841 | 2132
11 | 2011-03-13 | 1770 | 2417
12 | 2011-03-14 | 1829 | 2046
13 | 2011-03-15 | 1780 | 2056
14 | 2011-03-16 | 1878 | 2192
15 | 2011-03-17 | 1851 | 2744
16 | 2011-03-18 | 1858 | 3239
17 | 2011-03-19 | 1856 | 3117
18 | 2011-03-20 | 1857 | 2471
19 | 2011-03-21 | 1856 | 2077
20 | 2011-03-22 | 1856 | 2703
21 | 2011-03-23 | 2363 | 2121
22 | 2011-03-24 | 2316 | 1865
23 | 2011-03-25 | 2277 | 2210
24 | 2011-03-26 | 2243 | 2496
25 | 2011-03-27 | 2215 | 1693
26 | 2011-03-28 | 2192 | 2028
27 | 2011-03-29 | 2172 | 2425
28 | 2011-03-30 | 2225 | 1536
29 | 2011-03-31 | 2133 | 1685
Undeploy the Pipeline
Undeploy the pipeline and return the resources back to the Wallaroo instance.
conn.close()
pipeline.undeploy()
name | bikedaypipe
---|---
created | 2023-07-14 15:53:07.284131+00:00
last_updated | 2023-07-14 15:56:07.413409+00:00
deployed | False
tags |
versions | 9c67dd93-014c-4cc9-9b44-549829e613ad, 258dafaf-c272-4bda-881b-5998a4a9be26
steps | bikedaymodel
3 - Statsmodel Forecast with Wallaroo Features: Data Connection
Using an external data connection for inference inputs and results with the bike rental prediction Statsmodel model.
This tutorial and the assets can be downloaded as part of the Wallaroo Tutorials repository.
Statsmodel Forecast with Wallaroo Features: Data Connection
This tutorial series demonstrates how to use Wallaroo to create a Statsmodel forecasting model based on bike rentals. This tutorial series is broken down into the following:
- Create and Train the Model: This first notebook shows how the model is trained from existing data.
- Deploy and Sample Inference: With the model developed, we will deploy it into Wallaroo and perform a sample inference.
- Parallel Infer: A sample of multiple weeks of data will be retrieved and submitted as an asynchronous parallel inference. The results will be collected and uploaded to a sample database.
- External Connection: A sample data connection to Google BigQuery to retrieve input data and store the results in a table.
- ML Workload Orchestration: Take all of the previous steps and automate the request into a single Wallaroo ML Workload Orchestration.
For this step, we will use a Google BigQuery dataset to retrieve the inference information, predict the next month of sales, then store those predictions into another table. This will use the Wallaroo Connection feature to create a Connection, assign it to our workspace, then perform our inferences by using the Connection details to connect to the BigQuery dataset and tables.
Prerequisites
- A Wallaroo instance version 2023.2.1 or greater.
- Install the libraries from `./resources/requirements.txt`, which include the following:
- google-cloud-bigquery==3.10.0
- google-auth==2.17.3
- db-dtypes==1.1.1
Statsmodel Forecast Connection Steps
Import Libraries
The first step is to import the libraries that we will need.
import json
import os
import datetime
import wallaroo
from wallaroo.object import EntityNotFoundError
from wallaroo.framework import Framework
# used to display dataframe information without truncating
from IPython.display import display
import pandas as pd
import numpy as np
from resources import simdb
from resources import util
pd.set_option('display.max_colwidth', None)
# for Big Query connections
from google.cloud import bigquery
from google.oauth2 import service_account
import db_dtypes
import time
display(wallaroo.__version__)
'2023.3.0+65834aca6'
Initialize connection
Start a connection to the Wallaroo instance and save it into the variable `wl`.
# Login through local Wallaroo instance
wl = wallaroo.Client()
Set Configurations
The following will set the workspace, model name, and pipeline that will be used for this example. If the workspace or pipeline already exist, then they will be assigned for use in this example. If they do not exist, they will be created based on the names listed below.
Workspace names must be unique. To allow this tutorial to run in the same Wallaroo instance for multiple users, the `suffix` variable is generated from a random set of 4 ASCII characters. To use the same workspace across the tutorial notebooks, hard code `suffix` and verify the workspace name created is unique across the Wallaroo instance.
# used for unique connection names
import string
import random
suffix= ''.join(random.choice(string.ascii_lowercase) for i in range(4))
workspace_name = f'multiple-replica-forecast-tutorial-{suffix}'
pipeline_name = 'bikedaypipe'
model_name = 'bikedaymodel'
Set the Workspace and Pipeline
The workspace will be either used or created if it does not exist, along with the pipeline.
def get_workspace(name):
    workspace = None
    for ws in wl.list_workspaces():
        if ws.name() == name:
            workspace = ws
    if workspace is None:
        workspace = wl.create_workspace(name)
    return workspace

def get_pipeline(name):
    try:
        pipeline = wl.pipelines_by_name(name)[0]
    except EntityNotFoundError:
        pipeline = wl.build_pipeline(name)
    return pipeline
workspace = get_workspace(workspace_name)
wl.set_current_workspace(workspace)
pipeline = get_pipeline(pipeline_name)
Upload Model
The Python model created in “Forecast and Parallel Infer with Statsmodel: Model Creation” will now be uploaded. Note that the Framework and runtime are set to `python`.
model_file_name = 'forecast.py'
bike_day_model = wl.upload_model(model_name, model_file_name, Framework.PYTHON).configure(runtime="python")
pipeline.add_model_step(bike_day_model)
name | bikedaypipe
---|---
created | 2023-06-28 20:11:58.734248+00:00
last_updated | 2023-06-29 21:10:19.250680+00:00
deployed | True
tags |
versions | 93b113a2-f31a-4e05-883e-66a3d1fa10fb, 7d687c43-a833-4585-b607-7085eff16e9d, 504bb140-d9e2-4964-8f82-27b1d234f7f2, db1a14ad-c40c-41ac-82db-0cdd372172f3, 01d60d1c-7834-4d1f-b9a8-8ad569e114b6, a165cbbb-84d9-42e7-99ec-aa8e244aeb55, 0fefef8b-105e-4a6e-9193-d2e6d61248a1
steps | bikedaymodel
Deploy the Pipeline
We will now add the uploaded model as a step for the pipeline, then deploy it. The pipeline configuration will allow for multiple replicas of the pipeline to be deployed and spooled up in the cluster. Each pipeline replica will use 0.25 CPUs and 512 MiB of RAM.
# Set the deployment to allow for additional engines to run
deploy_config = (wallaroo.DeploymentConfigBuilder()
                 .replica_count(4)
                 .cpus(0.25)
                 .memory("512Mi")
                 .build()
                )
pipeline.deploy(deployment_config = deploy_config)
ok
name | bikedaypipe
---|---
created | 2023-06-28 20:11:58.734248+00:00
last_updated | 2023-06-29 21:12:00.676013+00:00
deployed | True
tags |
versions | f5051ddf-1111-49e6-b914-f8d24f1f6a8a, 93b113a2-f31a-4e05-883e-66a3d1fa10fb, 7d687c43-a833-4585-b607-7085eff16e9d, 504bb140-d9e2-4964-8f82-27b1d234f7f2, db1a14ad-c40c-41ac-82db-0cdd372172f3, 01d60d1c-7834-4d1f-b9a8-8ad569e114b6, a165cbbb-84d9-42e7-99ec-aa8e244aeb55, 0fefef8b-105e-4a6e-9193-d2e6d61248a1
steps | bikedaymodel
Create the Connection
We have already demonstrated through the other notebooks in this series that we can use the statsmodel forecast model to perform an inference through a simulated database. Now we’ll create a Wallaroo connection that will store the credentials to a Google BigQuery database containing the information we’re looking for.
The details of the connection are stored in the file `./resources/bigquery_service_account_statsmodel.json`, which includes the service account key (SAK) information as well as the dataset and table used. The details on how to generate the table and data for the sample `bike_rentals` table are stored in the file `./resources/create_bike_rentals.table`, with the data used stored in `./resources/bike_rentals.csv`.
Wallaroo connections are created through the Wallaroo Client `create_connection(name, type, details)` method. See the Wallaroo SDK Essentials Guide: Data Connections Management guide for full details.
Along with the credentials are three other important fields:

- `dataset`: The BigQuery dataset from the project specified in the service account credentials file.
- `input_table`: The table used for inference inputs.
- `output_table`: The table used to store results.
Note that `create_connection` will return an error if a connection with the same name already exists. Verify that the connection does not already exist in the Wallaroo instance for proper functioning of this tutorial, or wrap creation in a small `get_connection` helper that retrieves an existing connection by name, as sketched below.
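A minimal sketch of such a helper, assuming `wl.get_connection` raises `EntityNotFoundError` when the connection does not exist (that error type is an assumption; this notebook itself calls `create_connection` directly):

def get_connection(name, connection_type, connection_arguments):
    try:
        # hedged assumption: a missing connection raises EntityNotFoundError
        return wl.get_connection(name)
    except EntityNotFoundError:
        # otherwise create the connection with the supplied type and details
        return wl.create_connection(name, connection_type, connection_arguments)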
forecast_connection_input_name = f'statsmodel-bike-rentals-{suffix}'
forecast_connection_input_type = "BIGQUERY"
forecast_connection_input_argument = json.load(open('./resources/bigquery_service_account_statsmodel.json'))
statsmodel_connection = wl.create_connection(forecast_connection_input_name,
                                             forecast_connection_input_type,
                                             forecast_connection_input_argument)
display(statsmodel_connection)
Field | Value
---|---
Name | statsmodel-bike-rentals-jch
Connection Type | BIGQUERY
Details | *****
Created At | 2023-06-29T19:55:17.866728+00:00
Linked Workspaces | ['multiple-replica-forecast-tutorial-jch']
Add Connection to Workspace
We’ll now add the connection to our workspace so it can be retrieved by other workspace users. The method Workspace `add_connection(connection_name)` adds a Data Connection to a workspace.
workspace.add_connection(forecast_connection_input_name)
Retrieve Connection from Workspace
To simulate a data scientist’s procedural flow, we’ll now retrieve the connection from the workspace.
The method Workspace `list_connections()` displays a list of connections attached to the workspace. By default the details field is obfuscated. Specific connections are retrieved by specifying their position in the returned list.
forecast_connection = workspace.list_connections()[0]
display(forecast_connection)
Field | Value
---|---
Name | statsmodel-bike-rentals-jch
Connection Type | BIGQUERY
Details | *****
Created At | 2023-06-29T19:55:17.866728+00:00
Linked Workspaces | ['multiple-replica-forecast-tutorial-jch']
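Alternatively, a connection can be retrieved by name with the Wallaroo client `get_connection` method, which the orchestration's `main.py` uses later in this series:

# fetch the connection by name rather than by list position
forecast_connection = wl.get_connection(forecast_connection_input_name)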
Run Inference from BigQuery Table
We’ll now retrieve sample data through the Wallaroo connection and perform a sample inference. The connection details are retrieved through the Connection `details()` method.
The process is:
- Create the BigQuery credentials.
- Connect to the BigQuery dataset.
- Retrieve the inference data.
bigquery_statsmodel_credentials = service_account.Credentials.from_service_account_info(
    forecast_connection.details())

bigquery_statsmodel_client = bigquery.Client(
    credentials=bigquery_statsmodel_credentials,
    project=forecast_connection.details()['project_id']
)

inference_inputs = bigquery_statsmodel_client.query(
    f"""
    select dteday as date, cnt FROM {forecast_connection.details()['dataset']}.{forecast_connection.details()['input_table']}
    where dteday > DATE_SUB(DATE('2011-02-22'),
    INTERVAL 1 month) AND dteday <= DATE('2011-02-22')
    ORDER BY dteday
    LIMIT 5
    """
).to_dataframe().apply({"date":str, "cnt":int}).to_dict(orient='list')
# the original table sends back the date schema as a date, not text. We'll convert it here.
# inference_inputs = inference_inputs.apply({"date":str, "cnt":int})
display(inference_inputs)
{'date': ['2011-01-23',
'2011-01-24',
'2011-01-25',
'2011-01-26',
'2011-01-27'],
'cnt': [986, 1416, 1985, 506, 431]}
With the data retrieved, we’ll perform an inference through it and display the result.
results = pipeline.infer(inference_inputs)
results
[{'forecast': [1177, 1023, 1082, 1060, 1068, 1065, 1066]}]
Four Weeks of Inference Data
Now we’ll start at the “current date” of the next month in 2011, fetch the previous month of data up to that date, then use it to predict what sales will be over the next 7 days.
The inference data is saved into the `inference_data` List; each element in the list will be a separate inference request.
# Start by getting the current month - we'll always assume we're in 2011 to match the data store
month = datetime.datetime.now().month
month = 5  # hard code the month so the example is reproducible
start_date = f"{month+1}-1-2011"
display(start_date)
'6-1-2011'
def get_forecast_days(firstdate):
    days = [i*7 for i in [-1, 0, 1, 2, 3, 4]]
    deltadays = pd.to_timedelta(pd.Series(days), unit='D')
    analysis_days = (pd.to_datetime(firstdate) + deltadays).dt.date
    analysis_days = [str(day) for day in analysis_days]
    seed_day = analysis_days.pop(0)
    return analysis_days
forecast_dates = get_forecast_days(start_date)
display(forecast_dates)
['2011-06-01', '2011-06-08', '2011-06-15', '2011-06-22', '2011-06-29']
# get our list of items to run through
inference_data = []
days = []

# get the days from the start date to the end date
def get_forecast_dates(forecast_day: str, nforecast=7):
    days = [i for i in range(nforecast)]
    deltadays = pd.to_timedelta(pd.Series(days), unit='D')
    last_day = pd.to_datetime(forecast_day)
    dates = last_day + deltadays
    datestr = dates.dt.date.astype(str)
    return datestr
# used to generate our queries
def mk_dt_range_query(*, tablename: str, forecast_day: str) -> str:
    assert isinstance(tablename, str)
    assert isinstance(forecast_day, str)
    query = f"""
        select cnt from {tablename} where
        dteday >= DATE_SUB(DATE('{forecast_day}'), INTERVAL 1 month)
        AND dteday < DATE('{forecast_day}')
        ORDER BY dteday
        """
    return query
for day in forecast_dates:
    print(f"Current date: {day}")
    day_range = get_forecast_dates(day)
    days.append({"date": day_range})
    query = mk_dt_range_query(tablename=f"{forecast_connection.details()['dataset']}.{forecast_connection.details()['input_table']}", forecast_day=day)
    print(query)
    data = bigquery_statsmodel_client.query(query).to_dataframe().apply({"cnt":int}).to_dict(orient='list')
    # add the date into the list
    inference_data.append(data)
Current date: 2011-06-01
select cnt from release_testing_2023_2.bike_rentals where
dteday >= DATE_SUB(DATE('2011-06-01'), INTERVAL 1 month)
AND dteday < DATE('2011-06-01')
ORDER BY dteday
Current date: 2011-06-08
select cnt from release_testing_2023_2.bike_rentals where
dteday >= DATE_SUB(DATE('2011-06-08'), INTERVAL 1 month)
AND dteday < DATE('2011-06-08')
ORDER BY dteday
Current date: 2011-06-15
select cnt from release_testing_2023_2.bike_rentals where
dteday >= DATE_SUB(DATE('2011-06-15'), INTERVAL 1 month)
AND dteday < DATE('2011-06-15')
ORDER BY dteday
Current date: 2011-06-22
select cnt from release_testing_2023_2.bike_rentals where
dteday >= DATE_SUB(DATE('2011-06-22'), INTERVAL 1 month)
AND dteday < DATE('2011-06-22')
ORDER BY dteday
Current date: 2011-06-29
select cnt from release_testing_2023_2.bike_rentals where
dteday >= DATE_SUB(DATE('2011-06-29'), INTERVAL 1 month)
AND dteday < DATE('2011-06-29')
ORDER BY dteday
parallel_results = await pipeline.parallel_infer(tensor_list=inference_data, timeout=20, num_parallel=16, retries=2)
display(parallel_results)
[[{'forecast': [4373, 4385, 4379, 4382, 4380, 4381, 4380]}],
[{'forecast': [4666, 4582, 4560, 4555, 4553, 4553, 4552]}],
[{'forecast': [4683, 4634, 4625, 4623, 4622, 4622, 4622]}],
[{'forecast': [4732, 4637, 4648, 4646, 4647, 4647, 4647]}],
[{'forecast': [4692, 4698, 4699, 4699, 4699, 4699, 4699]}]]
days_results = list(zip(days, parallel_results))

# merge our parallel results into the predicted date sales
# results_table = pd.DataFrame(list(zip(days, parallel_results)),
#                              columns=["date", "forecast"])
results_table = pd.DataFrame(columns=["date", "forecast"])

# display(days_results)
for date in days_results:
    # display(date)
    new_days = date[0]['date'].tolist()
    new_forecast = date[1][0]['forecast']
    new_results = list(zip(new_days, new_forecast))
    results_table = results_table.append(pd.DataFrame(list(zip(new_days, new_forecast)), columns=['date','forecast']))
Based on all of the predictions, here are the results for the next month.
 | date | forecast
---|---|---
0 | 2011-06-01 | 4373
1 | 2011-06-02 | 4385
2 | 2011-06-03 | 4379
3 | 2011-06-04 | 4382
4 | 2011-06-05 | 4380
5 | 2011-06-06 | 4381
6 | 2011-06-07 | 4380
0 | 2011-06-08 | 4666
1 | 2011-06-09 | 4582
2 | 2011-06-10 | 4560
3 | 2011-06-11 | 4555
4 | 2011-06-12 | 4553
5 | 2011-06-13 | 4553
6 | 2011-06-14 | 4552
0 | 2011-06-15 | 4683
1 | 2011-06-16 | 4634
2 | 2011-06-17 | 4625
3 | 2011-06-18 | 4623
4 | 2011-06-19 | 4622
5 | 2011-06-20 | 4622
6 | 2011-06-21 | 4622
0 | 2011-06-22 | 4732
1 | 2011-06-23 | 4637
2 | 2011-06-24 | 4648
3 | 2011-06-25 | 4646
4 | 2011-06-26 | 4647
5 | 2011-06-27 | 4647
6 | 2011-06-28 | 4647
0 | 2011-06-29 | 4692
1 | 2011-06-30 | 4698
2 | 2011-07-01 | 4699
3 | 2011-07-02 | 4699
4 | 2011-07-03 | 4699
5 | 2011-07-04 | 4699
6 | 2011-07-05 | 4699
Upload into Database

With our results, we’ll upload them into the table listed in our connection as the `results_table`. To save time, we’ll upload the dataframe directly with the Google BigQuery `insert_rows_from_dataframe` method.
output_table = bigquery_statsmodel_client.get_table(f"{forecast_connection.details()['dataset']}.{forecast_connection.details()['results_table']}")
bigquery_statsmodel_client.insert_rows_from_dataframe(
    output_table,
    dataframe=results_table
)
[[]]
We’ll grab the last 5 results from our results table to verify the data was inserted.
# Get the last insert to the output table to verify
# wait 10 seconds for the insert to finish
time.sleep(10)
task_inference_results = bigquery_statsmodel_client.query(
    f"""
    SELECT *
    FROM {forecast_connection.details()['dataset']}.{forecast_connection.details()['results_table']}
    ORDER BY date DESC
    LIMIT 5
    """
).to_dataframe()

display(task_inference_results)
 | date | forecast
---|---|---
0 | 2011-07-05 | 4699
1 | 2011-07-05 | 4699
2 | 2011-07-04 | 4699
3 | 2011-07-04 | 4699
4 | 2011-07-03 | 4699
Undeploy the Pipeline
Undeploy the pipeline and return the resources back to the Wallaroo instance.

pipeline.undeploy()

Waiting for undeployment - this will take up to 45s ..................................... ok
name | bikedaypipe
---|---
created | 2023-06-28 20:11:58.734248+00:00
last_updated | 2023-06-29 21:12:00.676013+00:00
deployed | False
tags |
versions | f5051ddf-1111-49e6-b914-f8d24f1f6a8a, 93b113a2-f31a-4e05-883e-66a3d1fa10fb, 7d687c43-a833-4585-b607-7085eff16e9d, 504bb140-d9e2-4964-8f82-27b1d234f7f2, db1a14ad-c40c-41ac-82db-0cdd372172f3, 01d60d1c-7834-4d1f-b9a8-8ad569e114b6, a165cbbb-84d9-42e7-99ec-aa8e244aeb55, 0fefef8b-105e-4a6e-9193-d2e6d61248a1
steps | bikedaymodel
4 - Statsmodel Forecast with Wallaroo Features: ML Workload Orchestration
Automating the bike rental Statsmodel forecasting model.
This tutorial and the assets can be downloaded as part of the Wallaroo Tutorials repository.
Statsmodel Forecast with Wallaroo Features: ML Workload Orchestration
This tutorial series demonstrates how to use Wallaroo to create a Statsmodel forecasting model based on bike rentals. This tutorial series is broken down into the following:
- Create and Train the Model: This first notebook shows how the model is trained from existing data.
- Deploy and Sample Inference: With the model developed, we will deploy it into Wallaroo and perform a sample inference.
- Parallel Infer: A sample of multiple weeks of data will be retrieved and submitted as an asynchronous parallel inference. The results will be collected and uploaded to a sample database.
- External Connection: A sample data connection to Google BigQuery to retrieve input data and store the results in a table.
- ML Workload Orchestration: Take all of the previous steps and automate the request into a single Wallaroo ML Workload Orchestration.
This step will expand upon using the Connection and create a ML Workload Orchestration that automates requesting the inference data, submitting it in parallel, and storing the results into a database table.
Prerequisites
- A Wallaroo instance version 2023.2.1 or greater.
- Install the libraries from `./resources/requirements.txt`, which include the following:
- google-cloud-bigquery==3.10.0
- google-auth==2.17.3
- db-dtypes==1.1.1
Orchestrations, Tasks, and Task Runs
We’ve detailed how Wallaroo Connections work. Now we’ll use Orchestrations, Tasks, and Task Runs.
Item | Description
---|---
Orchestration | ML Workload orchestration allows data scientists and ML Engineers to automate and scale production ML workflows in Wallaroo to ensure a tight feedback loop and continuous tuning of models from training to production. Wallaroo platform users (data scientists or ML Engineers) have the ability to deploy, automate, and scale recurring batch production ML workloads that can ingest data from predefined data sources to run inferences in Wallaroo, chain pipelines, and send inference results to predefined destinations to analyze model insights and assess business outcomes.
Task | An implementation of an Orchestration. Tasks can be either Run Once (they run once and, upon completion, stop) or Run Scheduled (the task runs whenever a specific cron-like schedule is reached; scheduled tasks will run until the kill command is issued).
Task Run | The execution of a task. Run Once tasks will have only one Task Run. A Run Scheduled task will have a Task Run for every time the schedule parameter is met. Task Runs have their own log files that can be examined to track progress and results.
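For comparison with the Run Once task created below, a Run Scheduled task is created from an uploaded orchestration with the SDK's `run_scheduled` method. A hedged sketch, with illustrative schedule and argument values:

# sketch: run the orchestration every Sunday at midnight until the task is killed
task_scheduled = orchestration.run_scheduled(
    name="statsmodel weekly run",
    schedule="0 0 * * 0",          # cron-like schedule string
    timeout=120,                   # timeout in seconds (illustrative)
    json_args={"workspace_name": workspace_name,
               "pipeline_name": pipeline_name,
               "bigquery_connection_input_name": connection_name})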
Statsmodel Forecast Connection Steps
Import Libraries
The first step is to import the libraries that we will need.
import json
import os
import datetime
import wallaroo
from wallaroo.object import EntityNotFoundError
from wallaroo.framework import Framework
# used to display dataframe information without truncating
from IPython.display import display
import pandas as pd
import numpy as np
from resources import simdb
from resources import util
pd.set_option('display.max_colwidth', None)
# for Big Query connections
from google.cloud import bigquery
from google.oauth2 import service_account
import db_dtypes
import time
display(wallaroo.__version__)
'2023.3.0+785595cda'
Initialize connection
Start a connection to the Wallaroo instance and save it into the variable `wl`.
# Login through local Wallaroo instance
wl = wallaroo.Client()
Set Configurations
The following will set the workspace, model name, and pipeline that will be used for this example. If the workspace or pipeline already exist, then they will be assigned for use in this example. If they do not exist, they will be created based on the names listed below.
Workspace names must be unique. To allow this tutorial to run in the same Wallaroo instance for multiple users, the `suffix` variable is generated from a random set of 4 ASCII characters. To use the same workspace across the tutorial notebooks, hard code `suffix` and verify the workspace name created is unique across the Wallaroo instance.
# used for unique connection names
import string
import random
suffix= ''.join(random.choice(string.ascii_lowercase) for i in range(4))
workspace_name = f'multiple-replica-forecast-tutorial-{suffix}'
pipeline_name = 'bikedaypipe'
connection_name = f'statsmodel-bike-rentals-{suffix}'
Set the Workspace and Pipeline
The workspace will be either used or created if it does not exist, along with the pipeline.
def get_workspace(name):
    workspace = None
    for ws in wl.list_workspaces():
        if ws.name() == name:
            workspace = ws
    if workspace is None:
        workspace = wl.create_workspace(name)
    return workspace

def get_pipeline(name):
    try:
        pipeline = wl.pipelines_by_name(name)[0]
    except EntityNotFoundError:
        pipeline = wl.build_pipeline(name)
    return pipeline
workspace = get_workspace(workspace_name)
wl.set_current_workspace(workspace)
pipeline = get_pipeline(pipeline_name)
Deploy Pipeline
The pipeline is already set with the model. For our demo, we’ll verify that it’s deployed.
# Set the deployment to allow for additional engines to run
deploy_config = (wallaroo.DeploymentConfigBuilder()
                 .replica_count(4)
                 .cpus(0.25)
                 .memory("512Mi")
                 .build()
                )
pipeline.deploy(deployment_config = deploy_config)
Waiting for deployment - this will take up to 45s .................... ok
name | bikedaypipe
---|---
created | 2023-06-30 15:42:56.781150+00:00
last_updated | 2023-06-30 15:45:23.267621+00:00
deployed | True
tags |
versions | 6552b04e-d074-4773-982b-a2885ce6f9bf, b884c20c-c491-46ec-b438-74384a963acc, 4e8d2a88-1a41-482c-831d-f057a48e18c1
steps | bikedaymodel
BigQuery Sample Orchestration
The orchestration that will automate this process is `./resources/forecast-bigquery-orchestration.zip`. The files used are stored in the directory `forecast-bigquery-orchestration`, created with the command:

zip -r forecast-bigquery-orchestration.zip main.py requirements.txt

This contains the following:
- `requirements.txt`: The Python requirements file that specifies the following libraries:
  - google-cloud-bigquery==3.10.0
  - google-auth==2.17.3
  - db-dtypes==1.1.1
- `main.py`: The entry file that takes the previous statsmodel BigQuery connection and the statsmodel Forecast model and uses them to predict the next month’s sales based on the previous month’s performance. The details are listed below. Since we are using the async `parallel_infer`, we’ll use the `asyncio` library to run our sample `main` method.
import json
import os
import datetime
import asyncio
import wallaroo
from wallaroo.object import EntityNotFoundError
from wallaroo.framework import Framework

import pandas as pd
import numpy as np

pd.set_option('display.max_colwidth', None)

# for Big Query connections
from google.cloud import bigquery
from google.oauth2 import service_account
import db_dtypes

import time

async def main():
    wl = wallaroo.Client()

    # get the arguments
    arguments = wl.task_args()

    if "workspace_name" in arguments:
        workspace_name = arguments['workspace_name']
    else:
        workspace_name = "multiple-replica-forecast-tutorial"

    if "pipeline_name" in arguments:
        pipeline_name = arguments['pipeline_name']
    else:
        pipeline_name = "bikedaypipe"

    if "bigquery_connection_input_name" in arguments:
        bigquery_connection_name = arguments['bigquery_connection_input_name']
    else:
        bigquery_connection_name = "statsmodel-bike-rentals"

    print(bigquery_connection_name)

    def get_workspace(name):
        workspace = None
        for ws in wl.list_workspaces():
            if ws.name() == name:
                workspace = ws
        return workspace

    def get_pipeline(name):
        try:
            pipeline = wl.pipelines_by_name(name)[0]
        except EntityNotFoundError:
            print(f"Pipeline not found:{name}")
        return pipeline

    print(f"BigQuery Connection: {bigquery_connection_name}")
    forecast_connection = wl.get_connection(bigquery_connection_name)

    print(f"Workspace: {workspace_name}")
    workspace = get_workspace(workspace_name)
    wl.set_current_workspace(workspace)
    print(workspace)

    # the pipeline is assumed to be deployed
    print(f"Pipeline: {pipeline_name}")
    pipeline = get_pipeline(pipeline_name)
    print(pipeline)

    print("Getting date and input query.")

    bigquery_statsmodel_credentials = service_account.Credentials.from_service_account_info(
        forecast_connection.details())

    bigquery_statsmodel_client = bigquery.Client(
        credentials=bigquery_statsmodel_credentials,
        project=forecast_connection.details()['project_id']
    )

    print("Get the current month and retrieve next month's forecasts")
    month = datetime.datetime.now().month
    start_date = f"{month+1}-1-2011"
    print(f"Start date: {start_date}")

    def get_forecast_days(firstdate):
        days = [i*7 for i in [-1, 0, 1, 2, 3, 4]]
        deltadays = pd.to_timedelta(pd.Series(days), unit='D')
        analysis_days = (pd.to_datetime(firstdate) + deltadays).dt.date
        analysis_days = [str(day) for day in analysis_days]
        seed_day = analysis_days.pop(0)
        return analysis_days

    forecast_dates = get_forecast_days(start_date)
    print(f"Forecast dates: {forecast_dates}")

    # get our list of items to run through
    inference_data = []
    days = []

    # get the days from the start date to the end date
    def get_forecast_dates(forecast_day: str, nforecast=7):
        days = [i for i in range(nforecast)]
        deltadays = pd.to_timedelta(pd.Series(days), unit='D')
        last_day = pd.to_datetime(forecast_day)
        dates = last_day + deltadays
        datestr = dates.dt.date.astype(str)
        return datestr

    # used to generate our queries
    def mk_dt_range_query(*, tablename: str, forecast_day: str) -> str:
        assert isinstance(tablename, str)
        assert isinstance(forecast_day, str)
        query = f"""
            select cnt from {tablename} where
            dteday >= DATE_SUB(DATE('{forecast_day}'), INTERVAL 1 month)
            AND dteday < DATE('{forecast_day}')
            ORDER BY dteday
            """
        return query

    for day in forecast_dates:
        print(f"Current date: {day}")
        day_range = get_forecast_dates(day)
        days.append({"date": day_range})
        query = mk_dt_range_query(tablename=f"{forecast_connection.details()['dataset']}.{forecast_connection.details()['input_table']}", forecast_day=day)
        print(query)
        data = bigquery_statsmodel_client.query(query).to_dataframe().apply({"cnt":int}).to_dict(orient='list')
        # add the date into the list
        inference_data.append(data)

    print(inference_data)

    parallel_results = await pipeline.parallel_infer(tensor_list=inference_data, timeout=20, num_parallel=16, retries=2)

    days_results = list(zip(days, parallel_results))
    print(days_results)

    # merge our parallel results into the predicted date sales
    results_table = pd.DataFrame(columns=["date", "forecast"])

    # match the dates to predictions
    # display(days_results)
    for date in days_results:
        # display(date)
        new_days = date[0]['date'].tolist()
        new_forecast = date[1][0]['forecast']
        new_results = list(zip(new_days, new_forecast))
        results_table = results_table.append(pd.DataFrame(list(zip(new_days, new_forecast)), columns=['date','forecast']))

    print("Uploading results to results table.")
    output_table = bigquery_statsmodel_client.get_table(f"{forecast_connection.details()['dataset']}.{forecast_connection.details()['results_table']}")

    bigquery_statsmodel_client.insert_rows_from_dataframe(
        output_table,
        dataframe=results_table
    )

asyncio.run(main())
This orchestration allows a user to specify the workspace, pipeline, and data connection. As long as they all match the previous conditions, then the orchestration will run successfully.
Upload the Orchestration
Orchestrations are uploaded with the Wallaroo client `upload_orchestration(path)` method with the following parameters.

Parameter | Type | Description
---|---|---
path | string (Required) | The path to the ZIP file to be uploaded.
Once uploaded, the deployment will be prepared and any requirements will be downloaded and installed.

For this example, the orchestration `./resources/forecast-bigquery-orchestration.zip` will be uploaded and saved to the variable `orchestration`. Then we will loop until the uploaded orchestration’s `status` displays `ready`.
orchestration = wl.upload_orchestration(name="statsmodel-orchestration", path="./resources/forecast-bigquery-orchestration.zip")
while orchestration.status() != 'ready':
    print(orchestration.status())
    time.sleep(5)
pending_packaging
pending_packaging
packaging
packaging
packaging
packaging
packaging
packaging
packaging
packaging
packaging
id | name | status | filename | sha | created at | updated at |
---|---|---|---|---|---|---
8211497d-292a-4145-b28b-f6364e12544e | statsmodel-orchestration | packaging | forecast-bigquery-orchestration.zip | 44f591...1fa8d6 | 2023-30-Jun 15:45:48 | 2023-30-Jun 15:45:58 |
f8f31494-41c4-4336-bfd6-5b3b1607dedc | statsmodel-orchestration | ready | forecast-bigquery-orchestration.zip | 27ad14...306ad1 | 2023-30-Jun 15:51:08 | 2023-30-Jun 15:51:57 |
fd776f89-ea63-45e9-b8d6-a749074fd579 | statsmodel-orchestration | ready | forecast-bigquery-orchestration.zip | bd6a0e...3a6a09 | 2023-30-Jun 16:45:50 | 2023-30-Jun 16:46:39 |
8200995b-3e33-49f4-ac4f-98ea2b1330db | statsmodel-orchestration | ready | forecast-bigquery-orchestration.zip | 8d0c2f...a3c89f | 2023-30-Jun 15:54:14 | 2023-30-Jun 15:55:07 |
5449a104-abc5-423d-a973-31a3cfdf8b55 | statsmodel-orchestration | ready | forecast-bigquery-orchestration.zip | e00646...45d2a7 | 2023-30-Jun 16:12:39 | 2023-30-Jun 16:13:29 |
9fd1e58c-942d-495b-b3bd-d51f5c03b5ed | statsmodel-orchestration | ready | forecast-bigquery-orchestration.zip | bd6a0e...3a6a09 | 2023-30-Jun 16:48:53 | 2023-30-Jun 16:49:44 |
73f2e90a-13ab-4182-bde1-0fe55c4446cf | statsmodel-orchestration | ready | forecast-bigquery-orchestration.zip | f78c26...f494d9 | 2023-30-Jun 16:27:37 | 2023-30-Jun 16:28:31 |
64b085c7-5317-4152-81c3-c0c77b4f683b | statsmodel-orchestration | ready | forecast-bigquery-orchestration.zip | 37257f...4b4547 | 2023-30-Jun 16:39:49 | 2023-30-Jun 16:40:38 |
4a3a73ab-014c-4aa4-9896-44c313d80daa | statsmodel-orchestration | ready | forecast-bigquery-orchestration.zip | 23bf29...17b780 | 2023-30-Jun 16:52:45 | 2023-30-Jun 16:53:38 |
b4ef4449-9afe-4fba-aaa0-b7fd49687443 | statsmodel-orchestration | ready | forecast-bigquery-orchestration.zip | d4f02b...0e6c5d | 2023-30-Jun 16:42:29 | 2023-30-Jun 16:43:26 |
Create the Task
The orchestration is now ready to be implemented as a Wallaroo Task. We’ll just run it once as an example. This Orchestration assumes that the pipeline is deployed, and the Task accepts the arguments:
- workspace_name
- pipeline_name
- bigquery_connection_name
We’ll supply the workspace, pipeline, and connection created in previous steps and stored in the initial variables above. Verify these match the existing workspace, pipeline, and connection used in the previous notebooks in this series.
Tasks are generated and run once with the Orchestration `run_once(name, json_args, timeout)` method. Any arguments for the orchestration are passed in as a `Dict`. If there are no arguments, then an empty set `{}` is passed.
task = orchestration.run_once(name="statsmodel single run", json_args={"workspace_name":workspace_name, "pipeline_name": pipeline_name, "bigquery_connection_input_name":connection_name})
Monitor Run with Task Status
We’ll monitor the run first with its status.

For this example, the status of the previously created task will be checked in a loop until it has reached status `started`.
while task.status() != "started":
    display(task.status())
    time.sleep(5)
'pending'
'pending'
'statsmodel-bike-rentals-jch'
List Tasks
We’ll use the Wallaroo client `list_tasks` method to view the tasks currently running.

wl.list_tasks()
id | name | last run status | type | active | schedule | created at | updated at |
---|---|---|---|---|---|---|---
c7279e5e-e162-42f8-90ce-b7c0c0bb30f8 | statsmodel single run | running | Temporary Run | True | - | 2023-30-Jun 16:53:41 | 2023-30-Jun 16:53:47 |
a47dbca0-e568-44d3-9715-1fed0f17b9a7 | statsmodel single run | failure | Temporary Run | True | - | 2023-30-Jun 16:49:44 | 2023-30-Jun 16:49:54 |
15c80ad0-537f-4e6a-84c6-6c2f35b5f441 | statsmodel single run | failure | Temporary Run | True | - | 2023-30-Jun 16:46:41 | 2023-30-Jun 16:46:51 |
d0935da6-480a-420d-a70c-570160b0b6b3 | statsmodel single run | failure | Temporary Run | True | - | 2023-30-Jun 16:44:50 | 2023-30-Jun 16:44:56 |
e510e8c5-048b-43b1-9524-974934a9e4f5 | statsmodel single run | failure | Temporary Run | True | - | 2023-30-Jun 16:43:30 | 2023-30-Jun 16:43:35 |
0f62befb-c788-4779-bcfb-0595e3ca6f24 | statsmodel single run | failure | Temporary Run | True | - | 2023-30-Jun 16:40:39 | 2023-30-Jun 16:40:50 |
f00c6a97-32f9-4124-bf86-34a0068c1314 | statsmodel single run | failure | Temporary Run | True | - | 2023-30-Jun 16:28:32 | 2023-30-Jun 16:28:38 |
10c8af33-8ff4-4aae-b08d-89665bcb0481 | statsmodel single run | failure | Temporary Run | True | - | 2023-30-Jun 16:13:30 | 2023-30-Jun 16:13:35 |
9ae4e6e6-3849-4039-acfe-6810699edef8 | statsmodel single run | failure | Temporary Run | True | - | 2023-30-Jun 16:00:05 | 2023-30-Jun 16:00:15 |
Display Task Run Results
The Task Run is the implementation of the task: the actual running of the script and its results. Tasks that are Run Once will only have one Task Run, while a Task set to Run Scheduled will have a Task Run for each time the task is executed. Each Task Run has its own set of logs and results that are monitored through the Task Run `logs()` method.

We’ll wait 30 seconds, then retrieve the task run for our generated task and start checking the logs for our task run. It may take longer than 30 seconds to launch the task, so be prepared to run the `.logs()` method again to view the logs.
#wait 30 seconds for the task to finish
time.sleep(30)
statsmodel_task_run = task.last_runs()[0]
statsmodel_task_run.logs()
2023-30-Jun 16:53:57 statsmodel-bike-rentals-jch
2023-30-Jun 16:53:57 BigQuery Connection: statsmodel-bike-rentals-jch
2023-30-Jun 16:53:57 Workspace: multiple-replica-forecast-tutorial-jch
2023-30-Jun 16:53:57 {'name': 'multiple-replica-forecast-tutorial-jch', 'id': 7, 'archived': False, 'created_by': '34b86cac-021e-4cf0-aa30-40da7db5a77f', 'created_at': '2023-06-30T15:42:56.551195+00:00', 'models': [{'name': 'bikedaymodel', 'versions': 1, 'owner_id': '""', 'last_update_time': datetime.datetime(2023, 6, 30, 15, 42, 56, 979723, tzinfo=tzutc()), 'created_at': datetime.datetime(2023, 6, 30, 15, 42, 56, 979723, tzinfo=tzutc())}], 'pipelines': [{'name': 'bikedaypipe', 'create_time': datetime.datetime(2023, 6, 30, 15, 42, 56, 781150, tzinfo=tzutc()), 'definition': '[]'}]}
2023-30-Jun 16:53:57 Pipeline: bikedaypipe
2023-30-Jun 16:53:57 {'name': 'bikedaypipe', 'create_time': datetime.datetime(2023, 6, 30, 15, 42, 56, 781150, tzinfo=tzutc()), 'definition': '[]'}
2023-30-Jun 16:53:57 Getting date and input query.
2023-30-Jun 16:53:57 Get the current month and retrieve next month's forecasts
2023-30-Jun 16:53:57 Start date: 7-1-2011
2023-30-Jun 16:53:57 Forecast dates: ['2011-07-01', '2011-07-08', '2011-07-15', '2011-07-22', '2011-07-29']
2023-30-Jun 16:53:57 Current date: 2011-07-01
2023-30-Jun 16:53:57
2023-30-Jun 16:53:57 select cnt from release_testing_2023_2.bike_rentals where
2023-30-Jun 16:53:57 dteday >= DATE_SUB(DATE('2011-07-01'), INTERVAL 1 month)
2023-30-Jun 16:53:57 AND dteday < DATE('2011-07-01')
2023-30-Jun 16:53:57 ORDER BY dteday
2023-30-Jun 16:53:57 Current date: 2011-07-08
2023-30-Jun 16:53:57
2023-30-Jun 16:53:57
2023-30-Jun 16:53:57 select cnt from release_testing_2023_2.bike_rentals where
2023-30-Jun 16:53:57 dteday >= DATE_SUB(DATE('2011-07-08'), INTERVAL 1 month)
2023-30-Jun 16:53:57 ORDER BY dteday
2023-30-Jun 16:53:57 AND dteday < DATE('2011-07-08')
2023-30-Jun 16:53:57
2023-30-Jun 16:53:57 Current date: 2011-07-15
2023-30-Jun 16:53:57
2023-30-Jun 16:53:57 select cnt from release_testing_2023_2.bike_rentals where
2023-30-Jun 16:53:57 dteday >= DATE_SUB(DATE('2011-07-15'), INTERVAL 1 month)
2023-30-Jun 16:53:57 ORDER BY dteday
2023-30-Jun 16:53:57 AND dteday < DATE('2011-07-15')
2023-30-Jun 16:53:57
2023-30-Jun 16:53:57 Current date: 2011-07-22
2023-30-Jun 16:53:57
2023-30-Jun 16:53:57 select cnt from release_testing_2023_2.bike_rentals where
2023-30-Jun 16:53:57 dteday >= DATE_SUB(DATE('2011-07-22'), INTERVAL 1 month)
2023-30-Jun 16:53:57 AND dteday < DATE('2011-07-22')
2023-30-Jun 16:53:57 ORDER BY dteday
2023-30-Jun 16:53:57
2023-30-Jun 16:53:57 Current date: 2011-07-29
2023-30-Jun 16:53:57 select cnt from release_testing_2023_2.bike_rentals where
2023-30-Jun 16:53:57
2023-30-Jun 16:53:57 dteday >= DATE_SUB(DATE('2011-07-29'), INTERVAL 1 month)
2023-30-Jun 16:53:57 ORDER BY dteday
2023-30-Jun 16:53:57 AND dteday < DATE('2011-07-29')
2023-30-Jun 16:53:57
2023-30-Jun 16:53:57 [({'date': 0 2011-07-01
2023-30-Jun 16:53:57 [{'cnt': [3974, 4968, 5312, 5342, 4906, 4548, 4833, 4401, 3915, 4586, 4966, 4460, 5020, 4891, 5180, 3767, 4844, 5119, 4744, 4010, 4835, 4507, 4790, 4991, 5202, 5305, 4708, 4648, 5225, 5515]}, {'cnt': [4401, 3915, 4586, 4966, 4460, 5020, 4891, 5180, 3767, 4844, 5119, 4744, 4010, 4835, 4507, 4790, 4991, 5202, 5305, 4708, 4648, 5225, 5515, 5362, 5119, 4649, 6043, 4665, 4629, 4592]}, {'cnt': [5180, 3767, 4844, 5119, 4744, 4010, 4835, 4507, 4790, 4991, 5202, 5305, 4708, 4648, 5225, 5515, 5362, 5119, 4649, 6043, 4665, 4629, 4592, 4040, 5336, 4881, 4086, 4258, 4342, 5084]}, {'cnt': [4507, 4790, 4991, 5202, 5305, 4708, 4648, 5225, 5515, 5362, 5119, 4649, 6043, 4665, 4629, 4592, 4040, 5336, 4881, 4086, 4258, 4342, 5084, 5538, 5923, 5302, 4458, 4541, 4332, 3784]}, {'cnt': [5225, 5515, 5362, 5119, 4649, 6043, 4665, 4629, 4592, 4040, 5336, 4881, 4086, 4258, 4342, 5084, 5538, 5923, 5302, 4458, 4541, 4332, 3784, 3387, 3285, 3606, 3840, 4590, 4656, 4390]}]
2023-30-Jun 16:53:57 1 2011-07-02
2023-30-Jun 16:53:57 2 2011-07-03
2023-30-Jun 16:53:57 3 2011-07-04
2023-30-Jun 16:53:57 4 2011-07-05
2023-30-Jun 16:53:57 5 2011-07-06
2023-30-Jun 16:53:57 6 2011-07-07
2023-30-Jun 16:53:57 dtype: object}, [{'forecast': [4894, 4767, 4786, 4783, 4783, 4783, 4783]}]), ({'date': 0 2011-07-08
2023-30-Jun 16:53:57 2 2011-07-10
2023-30-Jun 16:53:57 1 2011-07-09
2023-30-Jun 16:53:57 4 2011-07-12
2023-30-Jun 16:53:57 3 2011-07-11
2023-30-Jun 16:53:57 5 2011-07-13
2023-30-Jun 16:53:57 6 2011-07-14
2023-30-Jun 16:53:57 dtype: object}, [{'forecast': [4842, 4839, 4836, 4833, 4831, 4830, 4828]}]), ({'date': 0 2011-07-15
2023-30-Jun 16:53:57 1 2011-07-16
2023-30-Jun 16:53:57 2 2011-07-17
2023-30-Jun 16:53:57 3 2011-07-18
2023-30-Jun 16:53:57 4 2011-07-19
2023-30-Jun 16:53:57 5 2011-07-20
2023-30-Jun 16:53:57 6 2011-07-21
2023-30-Jun 16:53:57 dtype: object}, [{'forecast': [4895, 4759, 4873, 4777, 4858, 4789, 4848]}]), ({'date': 0 2011-07-22
2023-30-Jun 16:53:57 1 2011-07-23
2023-30-Jun 16:53:57 2 2011-07-24
2023-30-Jun 16:53:57 3 2011-07-25
2023-30-Jun 16:53:57 5 2011-07-27
2023-30-Jun 16:53:57 4 2011-07-26
2023-30-Jun 16:53:57 6 2011-07-28
2023-30-Jun 16:53:57 dtype: object}, [{'forecast': [4559, 4953, 4829, 4868, 4856, 4860, 4858]}]), ({'date': 0 2011-07-29
2023-30-Jun 16:53:57 1 2011-07-30
2023-30-Jun 16:53:57 3 2011-08-01
2023-30-Jun 16:53:57 2 2011-07-31
2023-30-Jun 16:53:57 5 2011-08-03
2023-30-Jun 16:53:57 4 2011-08-02
2023-30-Jun 16:53:57 6 2011-08-04
2023-30-Jun 16:53:57 dtype: object}, [{'forecast': [4490, 4549, 4586, 4610, 4624, 4634, 4640]}])]
2023-30-Jun 16:53:57 Uploading results to results table.
Undeploy the Pipeline
Undeploy the pipeline and return the resources back to the Wallaroo instance.

pipeline.undeploy()

Waiting for undeployment - this will take up to 45s ..................................... ok
name | bikedaypipe
---|---
created | 2023-06-30 15:42:56.781150+00:00
last_updated | 2023-06-30 15:45:23.267621+00:00
deployed | False
tags |
versions | 6552b04e-d074-4773-982b-a2885ce6f9bf, b884c20c-c491-46ec-b438-74384a963acc, 4e8d2a88-1a41-482c-831d-f057a48e18c1
steps | bikedaymodel