Wallaroo ML Workload Orchestration Google BigQuery with Statsmodel Forecast Tutorial
This can be downloaded as part of the Wallaroo Tutorials repository.
Wallaroo ML Workload Orchestrations and BigQuery Connections with Statsmodel Tutorial
This tutorial provides a quick set of methods and examples regarding Wallaroo Connections and Wallaroo ML Workload Orchestration. For full details, see the Wallaroo Documentation site.
Wallaroo provides Data Connections and ML Workload Orchestrations so organizations can create and manage automated tasks that run either on demand or on a regular schedule.
Definitions
- Orchestration: A set of instructions written as a Python script with a requirements file. Orchestrations are uploaded to the Wallaroo instance as a .zip file.
- Task: An implementation of an orchestration. Tasks are run either once when requested, on a repeating schedule, or as a service.
- Connection: Connection information for a data source, defined by MLOps engineers and used by other Wallaroo users. Usually paired with orchestrations.
This tutorial will focus on using Google BigQuery as the data source for supplying the inference data to perform inferences through a Statsmodel ML model.
Tutorial Goals
The tutorial will demonstrate the following:
- Create a Wallaroo connection to retrieve information from a Google BigQuery source table.
- Create a Wallaroo connection to store inference results into a Google BigQuery destination table.
- Upload Wallaroo ML Workload Orchestration that supports BigQuery connections with the connection details.
- Run the orchestration once as a Run Once Task and verify that the inference request succeeded and the inference results were saved to the external data store.
- Schedule the orchestration as a Scheduled Task and verify that the inference request succeeded and the inference results were saved to the external data store.
Prerequisites
- An installed Wallaroo instance.
- The following Python libraries installed. These are included by default in a Wallaroo instance’s JupyterHub service.
- The following Python libraries. These are not included in a Wallaroo instance’s JupyterHub service:
  - google-cloud-bigquery: Specifically for its support for Google BigQuery.
  - google-auth: Used to authenticate for BigQuery.
  - db-dtypes: Converts the BigQuery results to an Apache Arrow table or pandas DataFrame.
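If these extra libraries are not already present in your environment, a minimal sketch of installing them from a notebook cell is below (versions are unpinned here; adjust for your environment):

# install the BigQuery support libraries used by this tutorial
!pip install google-cloud-bigquery google-auth db-dtypes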
Tutorial Resources
- Models:
  - models/bike_day_model.pkl: The statsmodel model predicts how many bikes will be rented on each of the next 7 days, based on the previous 7 days’ bike rentals, temperature, and wind speed. This model only accepts inputs of shape (7,4): 7 rows (representing the last 7 days) and 4 columns (representing the fields temp, holiday, workingday, windspeed). Additional files to support this example are:
    - infer.py: The inference script that is part of the statsmodel.
- Data:
  - data/day.csv: Data used to train the sample statsmodel example.
  - data/bike_day_eval.json: Data used for inferences. This will be transferred to a BigQuery table as shown in the demonstration.
- Resources:
  - resources/bigquery_service_account_input_key.json: Example service key to authenticate to a Google BigQuery project with the dataset and table used for the inference data.
  - resources/bigquery_service_account_output_key.json: Example service key to authenticate to a Google BigQuery project with the dataset and table used for the inference result data.
  - resources/statsmodel_forecast_inputs.sql: SQL script to create the inputs table schema, populated with the values from ./data/day.csv.
  - resources/statsmodel_forecast_outputs.sql: SQL script to create the outputs table schema.
Initial Steps
For this tutorial, we’ll create a workspace, upload our sample model, and deploy a pipeline. We’ll perform some quick sample inferences to verify that everything is working.
Load Libraries
Here we’ll import the various libraries we’ll use for the tutorial.
import wallaroo
from wallaroo.object import EntityNotFoundError, RequiredAttributeMissing
# to display dataframe tables
from IPython.display import display
# used to display dataframe information without truncating
import pandas as pd
pd.set_option('display.max_colwidth', None)
import pyarrow as pa
import time
import json
# for Big Query connections
from google.cloud import bigquery
from google.oauth2 import service_account
import db_dtypes
import string
import random
suffix= ''.join(random.choice(string.ascii_lowercase) for i in range(4))
wallaroo.__version__
'2023.2.0+dfca0605e'
Connect to the Wallaroo Instance
The first step is to connect to Wallaroo through the Wallaroo client. The Python library is included in the Wallaroo install and available through the Jupyter Hub interface provided with your Wallaroo environment.
This is accomplished using the wallaroo.Client()
command, which provides a URL to grant the SDK permission to your specific Wallaroo environment. When displayed, enter the URL into a browser and confirm permissions. Store the connection into a variable that can be referenced later.
If logging into the Wallaroo instance through the internal JupyterHub service, use wl = wallaroo.Client()
. For more information on Wallaroo Client settings, see the Client Connection guide.
# Login through local Wallaroo instance
wl = wallaroo.Client()
Variable Declaration
The following variables will be used for our BigQuery testing.
We’ll use two connections:
- bigquery_input_connection: The connection that will draw inference input data from a BigQuery table.
- bigquery_output_connection: The connection that will upload inference results into a BigQuery table.
Note that for the connection arguments, we’ll retrieve the information from the files ./resources/bigquery_service_account_input_key.json and ./resources/bigquery_service_account_output_key.json, which include the service account key file (SAK) information as well as the dataset and table used.
Field | Included in SAK |
---|---|
type | √ |
project_id | √ |
private_key_id | √ |
private_key | √ |
client_email | √ |
auth_uri | √ |
token_uri | √ |
auth_provider_x509_cert_url | √ |
client_x509_cert_url | √ |
dataset | 🚫 |
table | 🚫 |
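For illustration only, the dict returned by json.load for one of these files looks roughly like a standard Google service account key with the dataset and table fields appended. Every value below is a placeholder, not a working credential:

# illustrative contents of one of the service account key files after json.load()
# (placeholder values only - substitute your own key, dataset, and table)
example_connection_details = {
    "type": "service_account",
    "project_id": "my-gcp-project",
    "private_key_id": "<private key id>",
    "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n",
    "client_email": "my-service-account@my-gcp-project.iam.gserviceaccount.com",
    "auth_uri": "https://accounts.google.com/o/oauth2/auth",
    "token_uri": "https://oauth2.googleapis.com/token",
    "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
    "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/...",
    "dataset": "my_input_dataset",
    "table": "my_input_table"
}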
# Setting variables for later steps
workspace_name = f'bigquerystatsmodelworkspace{suffix}'
pipeline_name = f'bigquerystatsmodelpipeline{suffix}'
model_name = f'bigquerystatsmodelmodel{suffix}'
model_file_name = './models/bike_day_model.pkl'
bigquery_connection_input_name = f'bigqueryforecastinputs{suffix}'
bigquery_connection_input_type = "BIGQUERY"
bigquery_connection_input_argument = json.load(open('./resources/bigquery_service_account_input_key.json'))
bigquery_connection_output_name = f'bigqueryforecastoutputs{suffix}'
bigquery_connection_output_type = "BIGQUERY"
bigquery_connection_output_argument = json.load(open('./resources/bigquery_service_account_output_key.json'))
Helper Methods
The following helper methods are used to either create or retrieve workspaces and pipelines.
# helper methods to retrieve workspaces and pipelines
def get_workspace(name):
    workspace = None
    for ws in wl.list_workspaces():
        if ws.name() == name:
            workspace = ws
    if workspace is None:
        workspace = wl.create_workspace(name)
    return workspace

def get_pipeline(name):
    try:
        pipeline = wl.pipelines_by_name(name)[0]
    except EntityNotFoundError:
        pipeline = wl.build_pipeline(name)
    return pipeline
Create the Workspace and Pipeline
We’ll now create our workspace and pipeline for the tutorial. If this tutorial has been run previously, then this will retrieve the existing ones with the assumption they’re for use with this tutorial.
We’ll set the retrieved workspace as the current workspace in the SDK, so all commands will default to that workspace.
workspace = get_workspace(workspace_name)
wl.set_current_workspace(workspace)
pipeline = get_pipeline(pipeline_name)
Upload the Model and Deploy Pipeline
We’ll upload our model into our sample workspace, then add it as a pipeline step before deploying the pipeline so it’s ready to accept inference requests.
# Upload the model
bike_day_model = wl.upload_model(model_name, model_file_name, framework=wallaroo.framework.Framework.ONNX).configure(runtime="python")
# Add the model as a pipeline step
pipeline.add_model_step(bike_day_model)
name | bigquerystatsmodelpipelinegztp |
---|---|
created | 2023-05-23 15:19:43.097185+00:00 |
last_updated | 2023-05-23 15:19:43.097185+00:00 |
deployed | (none) |
tags | |
versions | fa33af33-3cf3-43c9-8e2a-4f0b549d84bf |
steps |
#deploy the pipeline
pipeline.deploy()
Waiting for deployment - this will take up to 45s ............. ok
name | bigquerystatsmodelpipelinegztp |
---|---|
created | 2023-05-23 15:19:43.097185+00:00 |
last_updated | 2023-05-23 15:19:43.390612+00:00 |
deployed | True |
tags | |
versions | ee07358d-b6d5-46f3-a5bc-f98baae7ddca, fa33af33-3cf3-43c9-8e2a-4f0b549d84bf |
steps | bigquerystatsmodelmodelgztp |
Sample Inferences
We’ll perform some quick sample inferences with the local file data to verify the pipeline is deployed and ready for inferences. Once done, we’ll undeploy the pipeline.
## perform inferences
results = pipeline.infer_from_file('./data/bike_day_eval.json', data_format="custom-json")
print(results)
[{'forecast': [1882.3784555157672, 2130.607915701861, 2340.84005381799, 2895.754978552066, 2163.657515565616, 1509.1792126509536, 2431.1838923957016]}]
Create Connections
We will create the data source connections via the Wallaroo client command create_connection, which takes the following parameters.
Parameter | Type | Description |
---|---|---|
name | string (Required) | The name of the connection. This must be unique - if submitting the name of an existing connection it will return an error. |
type | string (Required) | The user defined type of connection. |
details | Dict (Required) | User defined configuration details for the data connection. These can be {'username':'dataperson', 'password':'datapassword', 'port': 3339} , or {'token':'abcde123==', 'host':'example.com', 'port:1234'} , or other user defined combinations. |
- IMPORTANT NOTE: Data connection names must be unique. Attempting to create a data connection with the same name as an existing data connection will result in an error.
See the statsmodel_forecast_inputs
and statsmodel_forecast_outputs
details listed above for the table schema used for our example.
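If this notebook is re-run with the same connection names, create_connection will raise an error because of the uniqueness restriction. An optional get-or-create pattern is sketched below; it is not part of the tutorial flow, and since the exact exception type raised may vary by SDK version it simply falls back to get_connection on any error:

# optional get-or-create pattern for connections (sketch; exception type may vary by SDK version)
def get_or_create_connection(client, name, connection_type, details):
    try:
        # create_connection errors if a connection with this name already exists
        return client.create_connection(name, connection_type, details)
    except Exception:
        # fall back to the existing connection with the same name
        return client.get_connection(name=name)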
connection_input = wl.create_connection(bigquery_connection_input_name, bigquery_connection_input_type, bigquery_connection_input_argument)
connection_output = wl.create_connection(bigquery_connection_output_name, bigquery_connection_output_type, bigquery_connection_output_argument)
display(connection_input)
display(connection_output)
Field | Value |
---|---|
Name | bigqueryforecastinputsgztp |
Connection Type | BIGQUERY |
Details | ***** |
Created At | 2023-05-23T15:19:58.159691+00:00 |
Linked Workspaces | [] |
Field | Value |
---|---|
Name | bigqueryforecastoutputsgztp |
Connection Type | BIGQUERY |
Details | ***** |
Created At | 2023-05-23T15:19:58.195628+00:00 |
Linked Workspaces | [] |
Get Connection by Name
The Wallaroo client method get_connection(name)
retrieves the connection that matches the name
parameter. We’ll retrieve our connections and store them as big_query_input_connection and big_query_output_connection.
big_query_input_connection = wl.get_connection(name=bigquery_connection_input_name)
big_query_output_connection = wl.get_connection(name=bigquery_connection_output_name)
display(big_query_input_connection)
display(big_query_output_connection)
Field | Value |
---|---|
Name | bigqueryforecastinputsgztp |
Connection Type | BIGQUERY |
Details | ***** |
Created At | 2023-05-23T15:19:58.159691+00:00 |
Linked Workspaces | [] |
Field | Value |
---|---|
Name | bigqueryforecastoutputsgztp |
Connection Type | BIGQUERY |
Details | ***** |
Created At | 2023-05-23T15:19:58.195628+00:00 |
Linked Workspaces | [] |
Add Connection to Workspace
The method Workspace add_connection(connection_name)
adds a Data Connection to a workspace, and takes the following parameters.
Parameter | Type | Description |
---|---|---|
name | string (Required) | The name of the Data Connection |
We’ll add both connections to our sample workspace, then list the connections available to the workspace to confirm.
workspace.add_connection(bigquery_connection_input_name)
workspace.add_connection(bigquery_connection_output_name)
workspace.list_connections()
name | connection type | details | created at | linked workspaces |
---|---|---|---|---|
bigqueryforecastinputsgztp | BIGQUERY | ***** | 2023-05-23T15:19:58.159691+00:00 | ['bigquerystatsmodelworkspacegztp'] |
bigqueryforecastoutputsgztp | BIGQUERY | ***** | 2023-05-23T15:19:58.195628+00:00 | ['bigquerystatsmodelworkspacegztp'] |
Big Query Connection Inference Example
We can test the BigQuery connection with a simple inference to our deployed pipeline. We’ll request the data, format the table into a pandas DataFrame, then submit it for an inference request.
Create Google Credentials
From our Wallaroo connection details, we’ll create the Google credentials for our BigQuery connections.
bigquery_input_credentials = service_account.Credentials.from_service_account_info(
big_query_input_connection.details())
bigquery_output_credentials = service_account.Credentials.from_service_account_info(
big_query_output_connection.details())
Connect to Google BigQuery
We can now generate a client from our connection details, specifying the project that was included in each connection’s details.
bigqueryinputclient = bigquery.Client(
credentials=bigquery_input_credentials,
project=big_query_input_connection.details()['project_id']
)
bigqueryoutputclient = bigquery.Client(
credentials=bigquery_output_credentials,
project=big_query_output_connection.details()['project_id']
)
Query Data
Now we’ll create our query and retrieve information from our dataset and table as defined in the file bigquery_service_account_input_key.json.
We’ll grab the last 7 days of data - with every record assumed to be one day - and use that for our inference request.
inference_dataframe_input = bigqueryinputclient.query(
f"""
(select dteday, temp, holiday, workingday, windspeed
FROM {big_query_input_connection.details()['dataset']}.{big_query_input_connection.details()['table']}
ORDER BY dteday DESC LIMIT 7)
ORDER BY dteday
"""
).to_dataframe().drop(columns=['dteday'])
# convert to a dict, show the first 7 rows
display(inference_dataframe_input.to_dict())
{'temp': {0: 0.291304,
1: 0.243333,
2: 0.254167,
3: 0.253333,
4: 0.253333,
5: 0.255833,
6: 0.215833},
'holiday': {0: 1, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0},
'workingday': {0: 0, 1: 1, 2: 1, 3: 1, 4: 0, 5: 0, 6: 1},
'windspeed': {0: 0.168726,
1: 0.316546,
2: 0.350133,
3: 0.155471,
4: 0.124383,
5: 0.350754,
6: 0.154846}}
Sample Inference
With our data retrieved, we’ll perform an inference and display the results.
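Because the model only accepts a (7, 4) input, an optional sanity check on the retrieved DataFrame can catch malformed data before the inference request:

# optional sanity check: the statsmodel expects 7 rows and the 4 columns temp, holiday, workingday, windspeed
assert inference_dataframe_input.shape == (7, 4), inference_dataframe_input.shape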
#deploy the pipeline
pipeline.deploy()
ok
name | bigquerystatsmodelpipelinegztp |
---|---|
created | 2023-05-23 15:19:43.097185+00:00 |
last_updated | 2023-05-23 15:20:00.500498+00:00 |
deployed | True |
tags | |
versions | f0c22a0a-7e6a-4d49-91ba-e93daf575e6b, ee07358d-b6d5-46f3-a5bc-f98baae7ddca, fa33af33-3cf3-43c9-8e2a-4f0b549d84bf |
steps | bigquerystatsmodelmodelgztp |
results = pipeline.infer(inference_dataframe_input.to_dict())
display(results[0]['forecast'])
[1231.2556997246595,
1627.3643469089343,
1674.3769827243134,
1621.9273295873882,
1140.7465817903185,
1211.5223974364667,
1457.1896450382922]
Upload the Results
With the inference complete, we’ll upload the results back to the BigQuery output dataset.
output_table = bigqueryoutputclient.get_table(f"{big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}")
job = bigqueryoutputclient.query(
f"""
INSERT {big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}
VALUES
(current_timestamp(), "{results[0]['forecast']}")
"""
)
# Get the last insert to the output table to verify
# wait 10 seconds for the insert to finish
time.sleep(10)
task_inference_results = bigqueryoutputclient.query(
f"""
SELECT *
FROM {big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}
ORDER BY date DESC
LIMIT 5
"""
).to_dataframe()
display(task_inference_results)
 | date | forecast |
---|---|---|
0 | 2023-05-23 15:20:01.622083+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
1 | 2023-05-23 15:06:06.678170+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
2 | 2023-05-23 14:57:00.200596+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
3 | 2023-05-19 22:39:01.932839+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
4 | 2023-05-19 22:33:27.655703+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
Wallaroo ML Workload Orchestration Example
With the pipeline deployed and our connections set, we will now generate our ML Workload Orchestration. See the Wallaroo ML Workload Orchestrations guide for full details.
Orchestrations are uploaded to the Wallaroo instance as a ZIP file with the following requirements:
Parameter | Type | Description |
---|---|---|
User Code | (Required) Python script as .py files | If main.py exists, then that will be used as the task entrypoint. Otherwise, the first main.py found in any subdirectory will be used as the entrypoint. |
Python Library Requirements | (Optional) requirements.txt file in the requirements file format | A standard Python requirements.txt for any dependencies to be provided in the task environment. The Wallaroo SDK will already be present and should not be included in the requirements.txt. Multiple requirements.txt files are not allowed. |
Other artifacts | (Optional) | Other artifacts such as files, data, or code to support the orchestration. |
For our example, our orchestration will:
- Use the bigquery_remote_inference script to open connections to the input and output tables.
- Deploy the pipeline.
- Perform an inference with the input data.
- Save the inference results to the output table.
- Undeploy the pipeline.
This sample script is stored in bigquery_statsmodel_remote_inference/main.py
with a requirements.txt file listing the specific libraries for the Google BigQuery connection, and packaged into the orchestration as ./bigquery_statsmodel_remote_inference/bigquery_statsmodel_remote_inference.zip. We’ll display the steps in uploading the orchestration to the Wallaroo instance.
Note that the orchestration assumes the pipeline is already deployed.
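The packaged main.py is not reproduced here in full. The condensed sketch below shows its general shape only: the hard-coded workspace, pipeline, and connection names are assumptions standing in for the suffixed names created above, and the script in the tutorial repository remains the authoritative version. It mirrors the steps listed earlier.

# condensed sketch of bigquery_statsmodel_remote_inference/main.py (illustrative only;
# names below are assumptions - see the repository file for the real script)
import wallaroo
from google.cloud import bigquery
from google.oauth2 import service_account

wl = wallaroo.Client()  # inside the task environment, no interactive login is assumed to be required

workspace_name = "bigquerystatsmodelworkspace"      # assumed name
pipeline_name = "bigquerystatsmodelpipeline"        # assumed name
input_connection_name = "bigqueryforecastinputs"    # assumed name
output_connection_name = "bigqueryforecastoutputs"  # assumed name

workspace = [ws for ws in wl.list_workspaces() if ws.name() == workspace_name][0]
wl.set_current_workspace(workspace)
pipeline = wl.pipelines_by_name(pipeline_name)[0]

# build BigQuery clients from the stored connection details
input_details = wl.get_connection(name=input_connection_name).details()
output_details = wl.get_connection(name=output_connection_name).details()
input_client = bigquery.Client(
    credentials=service_account.Credentials.from_service_account_info(input_details),
    project=input_details['project_id'])
output_client = bigquery.Client(
    credentials=service_account.Credentials.from_service_account_info(output_details),
    project=output_details['project_id'])

# pull the last 7 days of inputs, run the inference, and store the forecast
input_frame = input_client.query(
    f"""
    (select dteday, temp, holiday, workingday, windspeed
    FROM {input_details['dataset']}.{input_details['table']}
    ORDER BY dteday DESC LIMIT 7)
    ORDER BY dteday
    """
).to_dataframe().drop(columns=['dteday'])

pipeline.deploy()
results = pipeline.infer(input_frame.to_dict())
pipeline.undeploy()

output_client.query(
    f"""
    INSERT {output_details['dataset']}.{output_details['table']}
    VALUES (current_timestamp(), "{results[0]['forecast']}")
    """
)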
Upload the Orchestration
Orchestrations are uploaded with the Wallaroo client upload_orchestration(path)
method with the following parameters.
Parameter | Type | Description |
---|---|---|
path | string (Required) | The path to the ZIP file to be uploaded. |
Once uploaded, the orchestration will be prepared and any requirements will be downloaded and installed.
For this example, the orchestration ./bigquery_remote_inference/bigquery_remote_inference.zip
will be uploaded and saved to the variable orchestration
. Then we will loop until the orchestration status is ready
.
pipeline.deploy()
ok
name | bigquerystatsmodelpipelinegztp |
---|---|
created | 2023-05-23 15:19:43.097185+00:00 |
last_updated | 2023-05-23 15:20:13.581488+00:00 |
deployed | True |
tags | |
versions | 2af13cbf-2dae-4dc9-85fa-ae06c04b6a54, f0c22a0a-7e6a-4d49-91ba-e93daf575e6b, ee07358d-b6d5-46f3-a5bc-f98baae7ddca, fa33af33-3cf3-43c9-8e2a-4f0b549d84bf |
steps | bigquerystatsmodelmodelgztp |
orchestration = wl.upload_orchestration(path="./bigquery_remote_inference/bigquery_remote_inference.zip")
while orchestration.status() != 'ready':
    print(orchestration.status())
    time.sleep(5)
pending_packaging
pending_packaging
packaging
packaging
packaging
packaging
packaging
packaging
packaging
packaging
packaging
wl.list_orchestrations()
id | name | status | filename | sha | created at | updated at |
---|---|---|---|---|---|---|
2186543f-f7f6-46b2-8334-63d73ddb0204 | None | ready | bigquery_remote_inference.zip | 66945c...e28259 | 2023-23-May 15:20:13 | 2023-23-May 15:21:06 |
Task Management Tutorial
Once an Orchestration has the status ready
, it can be run as a task. Tasks have the following run options.
Type | SDK Call | How triggered |
---|---|---|
Once | orchestration.run_once(name, json_args, timeout) | Task runs once and exits. |
Scheduled | orchestration.run_scheduled(name, schedule, timeout, json_args) | User provides schedule. Task runs and exits whenever the schedule dictates. |
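For reference, a hedged sketch of the scheduled variant is below; the task name, schedule, and timeout value are assumptions, with the schedule given in cron format. The scheduled run is demonstrated later in the tutorial.

# reference sketch: a scheduled task that runs the orchestration every five minutes
scheduled_task = orchestration.run_scheduled(
    name="bigquery statsmodel scheduled task",  # assumed name
    schedule="*/5 * * * *",                     # every 5 minutes, cron format
    timeout=600,                                # assumed value, in seconds
    json_args={}
)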
Run Task Once
We’ll start by generating a Run Once Task from our orchestration and running it.
Tasks are generated and run once with the Orchestration run_once(name, json_args, timeout)
method. Any arguments for the orchestration are passed in as a Dict
. If there are no arguments, then an empty Dict {} is passed.
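As a reference sketch (the task name here is an assumption), launching a Run Once task looks like this:

# reference sketch: start a Run Once task from the uploaded orchestration
# (this orchestration takes no arguments, so json_args is an empty Dict)
task = orchestration.run_once(name="bigquery statsmodel run once task", json_args={})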
We’ll display the last 5 rows of our BigQuery output table, then start the task that will perform the same inference we did above.
# Get the last insert to the output table to verify
task_inference_results = bigqueryoutputclient.query(
f"""
SELECT *
FROM {big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}
ORDER BY date DESC
LIMIT 5
"""
).to_dataframe()
display(task_inference_results)
 | date | forecast |
---|---|---|
0 | 2023-05-23 15:20:01.622083+ |