ML Workload Orchestration Tutorials
The following tutorials show how to use the Wallaroo ML Workload Orchestration feature with different data connections and situations.
- 1: Wallaroo ML Workload Orchestration API Tutorial
- 2: Wallaroo Connection API with Google BigQuery Tutorial
- 3: Wallaroo ML Workload Orchestration Simple Tutorial
- 4: Wallaroo ML Workload Orchestration Google BigQuery with House Price Model Tutorial
- 5: Wallaroo ML Workload Orchestration Google BigQuery with Statsmodel Forecast Tutorial
- 6: Wallaroo ML Workload Orchestration Comprehensive Tutorial
1 - Wallaroo ML Workload Orchestration API Tutorial
This can be downloaded as part of the Wallaroo Tutorials repository.
Wallaroo Connection and ML Workload Orchestration API Tutorial
This tutorial provides a quick set of methods and examples regarding Wallaroo Connections and Wallaroo ML Workload Orchestration. For full details, see the Wallaroo Documentation site.
Wallaroo provides Data Connections and ML Workload Orchestrations to give organizations a method of creating and managing automated tasks that can either be run on demand or on a regular schedule.
Definitions
- Orchestration: A set of instructions written as a python script with a requirements library. Orchestrations are uploaded to the Wallaroo instance as a .zip file.
- Task: An implementation of an orchestration. Tasks are run either once when requested, on a repeating schedule, or as a service.
- Connection: Definitions set by MLOps engineers that are used by other Wallaroo users for connection information to a data source. Usually paired with orchestrations.
Tutorial Goals
The tutorial will demonstrate the following:
- Create a workspace and pipeline with a sample model.
- Upload Wallaroo ML Workload Orchestration through the Wallaroo MLOps API.
- List available orchestrations through the Wallaroo MLOps API.
- Run the orchestration once as a Run Once Task through the Wallaroo MLOps API and verify that the information was saved to the pipeline logs.
Prerequisites
- An installed Wallaroo instance.
- The following Python libraries installed. These are included by default in a Wallaroo instance’s JupyterHub service.
Initial Steps
For this tutorial, we’ll create a workspace, upload our sample model and deploy a pipeline. We’ll perform some quick sample inferences to verify that everything is working.
Load Libraries
Here we’ll import the various libraries we’ll use for the tutorial.
import wallaroo
from wallaroo.object import EntityNotFoundError, RequiredAttributeMissing
# to display dataframe tables
from IPython.display import display
# used to display dataframe information without truncating
import pandas as pd
pd.set_option('display.max_colwidth', None)
import pyarrow as pa
import time
import requests
# Used to create unique workspace and pipeline names
import string
import random
# make a random 4 character suffix
suffix= ''.join(random.choice(string.ascii_lowercase) for i in range(4))
display(suffix)
'gsze'
Connect to the Wallaroo Instance
The first step is to connect to Wallaroo through the Wallaroo client. The Python library is included in the Wallaroo install and available through the Jupyter Hub interface provided with your Wallaroo environment.
This is accomplished using the wallaroo.Client()
command, which provides a URL to grant the SDK permission to your specific Wallaroo environment. When displayed, enter the URL into a browser and confirm permissions. Store the connection into a variable that can be referenced later.
If logging into the Wallaroo instance through the internal JupyterHub service, use wl = wallaroo.Client()
. If logging in externally, update the wallarooPrefix
and wallarooSuffix
variables with the proper DNS information. For more information on Wallaroo Client settings, see the Client Connection guide.
# Login through local Wallaroo instance
wl = wallaroo.Client()
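If the notebook is running outside of the Wallaroo cluster, the client is instead pointed at the instance's API and authentication endpoints. The commented sketch below is a minimal illustration assuming the api_endpoint, auth_endpoint, and auth_type parameters described in the Client Connection guide; the DNS values are placeholders for your own prefix and suffix.
# Example sketch: logging in from outside the Wallaroo cluster (placeholder DNS values)
# wallarooPrefix = "YOUR PREFIX."   # note the trailing period when a prefix is used
# wallarooSuffix = "YOUR SUFFIX"
# wl = wallaroo.Client(api_endpoint=f"https://{wallarooPrefix}api.{wallarooSuffix}",
#                      auth_endpoint=f"https://{wallarooPrefix}keycloak.{wallarooSuffix}",
#                      auth_type="sso")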
API URL
The variable APIURL
is used to specify the connection to the Wallaroo instance’s MLOps API URL, and is composed of the Wallaroo DNS prefix and suffix. For full details, see the Wallaroo API Connection Guide
.
The variables wallarooPrefix
and wallarooSuffix
will be used to derive the API URL. For example, if the Wallaroo Prefix is doc-test.
and the URL is example.com
, then the MLOps API URL would be doc-test.api.example.com/v1/api/{request}
.
Note the .
is part of the prefix. If there is no prefix, then wallarooPrefix = ""
Set the Wallaroo Prefix and Suffix in the code segment below based on your Wallaroo instance.
# Setting variables for later steps
wallarooPrefix = "YOUR PREFIX."
wallarooSuffix = "YOUR SUFFIX"
APIURL = f"https://{wallarooPrefix}api.{wallarooSuffix}"
workspace_name = f'apiorchestrationworkspace{suffix}'
pipeline_name = f'apipipeline{suffix}'
model_name = f'apiorchestrationmodel{suffix}'
model_file_name = './models/rf_model.onnx'
Helper Methods
The following helper methods are used to either create or get workspaces, pipelines, and connections.
# helper methods to retrieve workspaces and pipelines
def get_workspace(name):
workspace = None
for ws in wl.list_workspaces():
if ws.name() == name:
workspace= ws
if(workspace == None):
workspace = wl.create_workspace(name)
return workspace
def get_pipeline(name):
try:
pipeline = wl.pipelines_by_name(name)[0]
except EntityNotFoundError:
pipeline = wl.build_pipeline(name)
return pipeline
def get_connection(name, connection_type, connection_arguments):
try:
connection = wl.get_connection(name)
except RequiredAttributeMissing:
connection =wl.create_connection(name,
connection_type,
connection_arguments)
return connection
Create the Workspace and Pipeline
We’ll now create our workspace and pipeline for the tutorial. If this tutorial has been run previously, then this will retrieve the existing ones with the assumption they’re for use with this tutorial.
We’ll set the retrieved workspace as the current workspace in the SDK, so all commands will default to that workspace.
workspace = get_workspace(workspace_name)
wl.set_current_workspace(workspace)
workspace_id = workspace.id()
pipeline = get_pipeline(pipeline_name)
Upload the Model and Deploy Pipeline
We’ll upload our model into our sample workspace, then add it as a pipeline step before deploying the pipeline so it’s ready to accept inference requests.
# Upload the model
housing_model_control = (wl.upload_model(model_name,
model_file_name,
framework=wallaroo.framework.Framework.ONNX)
.configure(tensor_fields=["tensor"])
)
# Add the model as a pipeline step
pipeline.add_model_step(housing_model_control)
name | apipipelinegsze |
---|---|
created | 2023-05-22 20:48:30.700499+00:00 |
last_updated | 2023-05-22 20:48:30.700499+00:00 |
deployed | (none) |
tags | |
versions | 101b252a-623c-4185-a24d-ec00593dda79 |
steps |
#deploy the pipeline
pipeline.deploy()
Waiting for deployment - this will take up to 45s ......... ok
name | apipipelinegsze |
---|---|
created | 2023-05-22 20:48:30.700499+00:00 |
last_updated | 2023-05-22 20:48:31.357336+00:00 |
deployed | True |
tags | |
versions | aac61b5a-e4f4-4ea3-9347-6482c330b5f5, 101b252a-623c-4185-a24d-ec00593dda79 |
steps | apiorchestrationmodelgsze |
Wallaroo ML Workload Orchestration Example
With the pipeline deployed and our connections set, we will now generate our ML Workload Orchestration. See the Wallaroo ML Workload Orchestrations guide for full details.
Orchestrations are uploaded to the Wallaroo instance as a ZIP file with the following requirements:
Parameter | Type | Description |
---|---|---|
User Code | (Required) Python script as .py files | If main.py exists in the root directory, it is used as the task entrypoint. Otherwise, the first main.py found in any subdirectory is used as the entrypoint. |
Python Library Requirements | (Optional) requirements.txt file in the requirements file format | A standard Python requirements.txt for any dependencies to be provided in the task environment. The Wallaroo SDK will already be present and should not be included in the requirements.txt. Multiple requirements.txt files are not allowed. |
Other artifacts | (Optional) | Other artifacts such as files, data, or code to support the orchestration. |
For our example, our orchestration will:
- Use the
inference_results_connection
to open an HTTP GET connection to the inference data file and use it in an inference request in the deployed pipeline. - Submit the inference results to the location specified in the
external_inference_connection
.
This sample script is stored in main.py
with an empty requirements.txt
file, and packaged into the orchestration as ./api_inference_orchestration.zip
. We’ll display the steps in uploading the orchestration to the Wallaroo instance.
Note that the orchestration assumes the pipeline is already deployed.
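The packaged Python script itself is not reproduced in this notebook. Based on the task run logs shown later in this tutorial, it follows the general pattern sketched below. This is an illustration only: the task_args() call for reading the task's json arguments and the local sample input file are assumptions rather than the packaged script's exact contents.
# Illustrative sketch of an orchestration entrypoint (not the exact packaged script)
import wallaroo
import pandas as pd

wl = wallaroo.Client()

# read the arguments passed to the task, e.g. {"workspace_name": ..., "pipeline_name": ...}
# (assumes the SDK's task_args() helper is available inside the task environment)
arguments = wl.task_args()
print(arguments)

workspace_name = arguments['workspace_name']
pipeline_name = arguments['pipeline_name']

print(f"Getting the workspace {workspace_name}")
workspace = [ws for ws in wl.list_workspaces() if ws.name() == workspace_name][0]
wl.set_current_workspace(workspace)

print(f"Getting the pipeline {pipeline_name}")
pipeline = wl.pipelines_by_name(pipeline_name)[0]

print("Deploying the pipeline.")
pipeline.deploy()

print("Performing sample inference.")
# hypothetical input source; the packaged script retrieves its own inference data
dataframe = pd.read_json('./data/sample_input.df.json')
results = pipeline.infer(dataframe)
print(results)

print("Undeploying the pipeline")
pipeline.undeploy()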
API Upload the Orchestration
Orchestrations are uploaded via POST as an application/octet-stream
with the MLOps API route:
- REQUEST
  - POST /v1/api/orchestration/upload
  - Content-Type: multipart/form-data
- PARAMETERS
  - file: The file uploaded with Content-Type application/octet-stream.
  - metadata: Included with Content-Type application/json, specifying:
    - workspace_id: The numerical id of the workspace to upload the orchestration to.
    - name: The name of the orchestration.
Once uploaded, the orchestration will be prepared and any requirements will be downloaded and installed.
# retrieve the authorization token
headers = wl.auth.auth_header()
url = f"{APIURL}/v1/api/orchestration/upload"
fp = open("./api_inference_orchestration.zip", "rb")
metadata = f'{{"workspace_id": {workspace_id},"name": "apiorchestrationsample"}}'
response = requests.post(
url,
headers=headers,
files=[
("file",
("api_inference_orchestration.zip", fp, "application/octet-stream")
),
("metadata",
("metadata", metadata, "application/json")
)
],
).json()
display(response)
orchestration_id = response['id']
{'id': 'b951f7b8-0690-4004-86bf-cc9802359313'}
API List Orchestrations
A list of orchestrations is retrieved via the POST MLOps API route:
- REQUEST
  - POST /v1/api/orchestration/list
- PARAMETERS
  - workspace_id: The numerical identifier of the workspace associated with the orchestration.
# retrieve the authorization token
headers = wl.auth.auth_header()
url = f"{APIURL}/v1/api/orchestration/list"
data = {
'workspace_id': workspace_id
}
response=requests.post(url, headers=headers, json=data)
display(response.json())
[{'id': 'b951f7b8-0690-4004-86bf-cc9802359313',
'workspace_id': 8,
'sha': 'd3b93c9f280734106376e684792aa8b4285d527092fe87d89c74ec804f8e169e',
'name': 'apiorchestrationsample',
'file_name': 'api_inference_orchestration.zip',
'task_id': 'c9622cf8-cbe5-4e3c-b64c-dabd6c5b7fef',
'owner_id': 'a6857628-b0aa-451e-a0a0-bbc1d6eea6e0',
'status': 'pending_packaging',
'created_at': '2023-05-22T20:48:41.233482+00:00',
'updated_at': '2023-05-22T20:48:41.233482+00:00'}]
API Get Orchestration
An orchestration's details are retrieved via the POST MLOps API route:
- REQUEST
  - POST /v1/api/orchestration/get_by_id
- PARAMETERS
  - id: The UUID of the orchestration being retrieved.
- RETURNS
- id: The ID of the orchestration in UUID format.
- workspace_id: Numerical value of the workspace the orchestration was uploaded to.
- sha: The SHA hash value of the orchestration.
- file_name: The file name the orchestration was uploaded as.
- task_id: The task id in UUID format for unpacking and preparing the orchestration.
- owner_id: The Keycloak ID of the user that uploaded the orchestration.
- status: The status of the orchestration. Status values are:
  - packaging: Preparing the orchestration to be used as a task.
  - ready: The orchestration is ready to be deployed as a task.
# retrieve the authorization token
headers = wl.auth.auth_header()
url = f"{APIURL}/v1/api/orchestration/get_by_id"
data = {
'id': orchestration_id
}
# loop until status is ready
status = None
while status != 'ready':
response=requests.post(url, headers=headers, json=data).json()
display(response)
status = response['status']
time.sleep(10)
orchestration_sha = response['sha']
{'id': 'b951f7b8-0690-4004-86bf-cc9802359313',
'workspace_id': 8,
'sha': 'd3b93c9f280734106376e684792aa8b4285d527092fe87d89c74ec804f8e169e',
'file_name': 'api_inference_orchestration.zip',
'task_id': 'c9622cf8-cbe5-4e3c-b64c-dabd6c5b7fef',
'owner_id': 'a6857628-b0aa-451e-a0a0-bbc1d6eea6e0',
'status': 'packaging'}
{'id': 'b951f7b8-0690-4004-86bf-cc9802359313',
'workspace_id': 8,
'sha': 'd3b93c9f280734106376e684792aa8b4285d527092fe87d89c74ec804f8e169e',
'file_name': 'api_inference_orchestration.zip',
'task_id': 'c9622cf8-cbe5-4e3c-b64c-dabd6c5b7fef',
'owner_id': 'a6857628-b0aa-451e-a0a0-bbc1d6eea6e0',
'status': 'ready'}
Task Management Tutorial
Once an Orchestration has the status ready
, it can be run as a task. Tasks have the following run options (a brief SDK sketch follows the table below).
Type | SDK Call | How triggered |
---|---|---|
Once | orchestration.run_once(name, json_args, timeout) | Task runs once and exits. |
Scheduled | orchestration.run_scheduled(name, schedule, timeout, json_args) | User provides the schedule. Task runs and exits whenever the schedule dictates. |
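For reference, the SDK calls named in the table take the shape sketched below, where orchestration is an orchestration object returned by the SDK (for example from wl.list_orchestrations()). This is a sketch using the argument names from the table; the remainder of this tutorial performs the same operations through the MLOps API instead.
# Sketch of the SDK task run options listed above (this tutorial uses the MLOps API instead)
task = orchestration.run_once(name="sdk run once task",
                              json_args={"workspace_name": workspace_name,
                                         "pipeline_name": pipeline_name},
                              timeout=600)

scheduled_task = orchestration.run_scheduled(name="sdk scheduled task",
                                             schedule="*/5 * * * *",
                                             timeout=600,
                                             json_args={"workspace_name": workspace_name,
                                                        "pipeline_name": pipeline_name})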
Run Task Once via API
We’ll generate and run a Run Once Task from our orchestration. Orchestrations are started as a run once task with the following request:
- REQUEST
  - POST /v1/api/task/run_once
- PARAMETERS
- name (String Required): The name to assign to the task.
- workspace_id (Integer Required): The numerical identifier of the workspace associated with the orchestration.
- orch_id (String Required): The orchestration ID represented by a UUID.
- json (Dict Required): The parameters to pass to the task.
Tasks are generated and run once with the Orchestration run_once
method. Any arguments for the orchestration are passed in as a Dict
. If there are no arguments, then an empty Dict {}
is passed.
# retrieve the authorization token
headers = wl.auth.auth_header()
data = {
"name": "api run once task",
"workspace_id": workspace_id,
"orch_id": orchestration_id,
"json": {
"workspace_name": workspace_name,
"pipeline_name": pipeline_name
}
}
import datetime
task_start = datetime.datetime.now()
url=f"{APIURL}/v1/api/task/run_once"
response=requests.post(url, headers=headers, json=data).json()
display(response)
task_id = response['id']
{'id': 'c868aa44-f7fe-4e3d-b11d-e1e6af3ec150'}
Task Status via API
The status of a task is retrieved through the Wallaroo MLOps API request:
- REQUEST
  - POST /v1/api/task/get_by_id
- PARAMETERS
  - id: The UUID of the task being retrieved.
For this example, the status of the previously created task will be generated, then looped until it has reached status started
.
# retrieve the authorization token
headers = wl.auth.auth_header()
url=f"{APIURL}/v1/api/task/get_by_id"
data = {
"id": task_id
}
status = None
while status != 'started':
response=requests.post(url, headers=headers, json=data).json()
display(response)
status = response['status']
time.sleep(10)
{'name': 'api run once task',
'id': 'c868aa44-f7fe-4e3d-b11d-e1e6af3ec150',
'image': 'proxy.replicated.com/proxy/wallaroo/ghcr.io/wallaroolabs/arbex-orch-deploy',
'image_tag': 'v2023.2.0-main-3271',
'bind_secrets': ['minio'],
'extra_env_vars': {'MINIO_URL': 'http://minio.wallaroo.svc.cluster.local:9000',
'ORCH_OWNER_ID': 'a6857628-b0aa-451e-a0a0-bbc1d6eea6e0',
'ORCH_SHA': 'd3b93c9f280734106376e684792aa8b4285d527092fe87d89c74ec804f8e169e',
'TASK_DEBUG': 'false',
'TASK_ID': 'c868aa44-f7fe-4e3d-b11d-e1e6af3ec150'},
'auth_init': True,
'workspace_id': 8,
'flavor': 'exec_orch_oneshot',
'reap_threshold_secs': 900,
'exec_type': 'job',
'status': 'pending',
'input_data': {'pipeline_name': 'apipipelinegsze',
'workspace_name': 'apiorchestrationworkspacegsze'},
'killed': False,
'created_at': '2023-05-22T21:08:31.099447+00:00',
'updated_at': '2023-05-22T21:08:31.105312+00:00',
'last_runs': []}
{'name': 'api run once task',
'id': 'c868aa44-f7fe-4e3d-b11d-e1e6af3ec150',
'image': 'proxy.replicated.com/proxy/wallaroo/ghcr.io/wallaroolabs/arbex-orch-deploy',
'image_tag': 'v2023.2.0-main-3271',
'bind_secrets': ['minio'],
'extra_env_vars': {'MINIO_URL': 'http://minio.wallaroo.svc.cluster.local:9000',
'ORCH_OWNER_ID': 'a6857628-b0aa-451e-a0a0-bbc1d6eea6e0',
'ORCH_SHA': 'd3b93c9f280734106376e684792aa8b4285d527092fe87d89c74ec804f8e169e',
'TASK_DEBUG': 'false',
'TASK_ID': 'c868aa44-f7fe-4e3d-b11d-e1e6af3ec150'},
'auth_init': True,
'workspace_id': 8,
'flavor': 'exec_orch_oneshot',
'reap_threshold_secs': 900,
'exec_type': 'job',
'status': 'started',
'input_data': {'pipeline_name': 'apipipelinegsze',
'workspace_name': 'apiorchestrationworkspacegsze'},
'killed': False,
'created_at': '2023-05-22T21:08:31.099447+00:00',
'updated_at': '2023-05-22T21:08:36.585775+00:00',
'last_runs': [{'run_id': '96a7f85f-e30c-40b5-9185-0dee5bd1a15e',
'status': 'running',
'created_at': '2023-05-22T21:08:33.112805+00:00',
'updated_at': '2023-05-22T21:08:33.112805+00:00'}]}
Task Results
We can view the inferences from our logs and verify that new entries were added from our task. In our case, we’ll assume the task once started takes about 1 minute to run (deploy the pipeline, run the inference, undeploy the pipeline). We’ll wait a short period, then display the logs during the time period the task was running.
time.sleep(30)
task_end = datetime.datetime.now()
display(task_end)
pipeline.logs(start_datetime = task_start, end_datetime = task_end)
datetime.datetime(2023, 5, 22, 21, 9, 32, 447321)
time | in.tensor | out.variable | check_failures | |
---|---|---|---|---|
0 | 2023-05-22 21:08:37.779 | [4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0] | [718013.7] | 0 |
Get Tasks by Orchestration SHA
Tasks tied to the same orchestration are retrieved through the following request.
- REQUEST
  - POST /v1/api/task/get_tasks_by_orch_sha
- PARAMETERS
  - sha: The orchestration's SHA hash.
- RETURNS
  - ids: List[string] List of tasks by UUID.
# retrieve the authorization token
headers = wl.auth.auth_header()
url=f"{APIURL}/v1/api/task/get_tasks_by_orch_sha"
data = {
"sha": orchestration_sha
}
response=requests.post(url, headers=headers, json=data).json()
display(response)
{'ids': ['2424a9a7-2331-42f6-bd84-90643386b130',
'c868aa44-f7fe-4e3d-b11d-e1e6af3ec150',
'a41fe4ae-b8a4-4e1f-a45a-114df64ae2bc']}
Task Last Runs History
The history of a task, where each deployment of the task is known as a task run, is retrieved through the following request (the SDK equivalent is the Task last_runs
method). Task runs are listed in reverse chronological order by updated_at
.
- REQUEST
  - POST /v1/api/task/list_task_runs
- PARAMETERS
  - task_id: The UUID identifier of the task.
- status: Filters the task history by status. If all, returns all statuses. Status values are:
  - running: The task has started.
  - failure: The task failed.
  - success: The task completed.
- limit: The number of task runs to display.
- RETURNS
  - A list of task runs, each with its task id, run_id, status, and created/updated timestamps.
# retrieve the authorization token
headers = wl.auth.auth_header()
url=f"{APIURL}/v1/api/task/list_task_runs"
data = {
"task_id": task_id
}
response=requests.post(url, headers=headers, json=data).json()
task_run_id = response[0]['run_id']
display(response)
[{'task': 'c868aa44-f7fe-4e3d-b11d-e1e6af3ec150',
'run_id': '96a7f85f-e30c-40b5-9185-0dee5bd1a15e',
'status': 'success',
'created_at': '2023-05-22T21:08:33.112805+00:00',
'updated_at': '2023-05-22T21:08:33.112805+00:00'}]
Get Task Run Logs
Logs for a task run are retrieved through the following request.
- REQUEST
  - POST /v1/api/task/get_logs_for_run
- PARAMETERS
  - id: The UUID of the task run associated with the orchestration.
- lines: The number of log lines to retrieve starting from the end of the log.
- RETURNS
- logs: Array of log entries.
# retrieve the authorization token
headers = wl.auth.auth_header()
url=f"{APIURL}/v1/api/task/get_logs_for_run"
data = {
"id": task_run_id
}
response=requests.post(url, headers=headers, json=data).json()
display(response)
{'logs': ["2023-05-22T21:09:17.683428502Z stdout F {'pipeline_name': 'apipipelinegsze', 'workspace_name': 'apiorchestrationworkspacegsze'}",
'2023-05-22T21:09:17.683489102Z stdout F Getting the workspace apiorchestrationworkspacegsze',
'2023-05-22T21:09:17.683497403Z stdout F Getting the pipeline apipipelinegsze',
'2023-05-22T21:09:17.683504003Z stdout F Deploying the pipeline.',
'2023-05-22T21:09:17.683510203Z stdout F Performing sample inference.',
'2023-05-22T21:09:17.683516203Z stdout F time ... check_failures',
'2023-05-22T21:09:17.683521903Z stdout F 0 2023-05-22 21:08:37.779 ... 0',
'2023-05-22T21:09:17.683527803Z stdout F ',
'2023-05-22T21:09:17.683533603Z stdout F [1 rows x 4 columns]',
'2023-05-22T21:09:17.683540103Z stdout F Undeploying the pipeline']}
Run Task Scheduled via API
The other method of using tasks is as a scheduled run through the Orchestration run_scheduled(name, schedule, timeout, json_args)
. This sets up a task to run on a regular schedule as defined by the schedule
parameter in the cron
service format. For example:
schedule = "42 * * * *"
Runs on the 42nd minute of every hour.
The following schedule runs every day at 12 noon from February 1 to February 15 2024 - and then ends.
schedule = "0 0 12 1-15 2 2024"
The Run Scheduled Task request is available at the following address:
/v1/api/task/run_scheduled
And takes the following parameters.
- name (String Required): The name to assign to the task.
- orch_id (String Required): The UUID orchestration ID to create the task from.
- workspace_id (Integer Required): The numerical identifier for the workspace.
- schedule (String Required): The schedule as a single string in cron
format.
- timeout (Integer Optional): The timeout to complete the task in seconds.
- json (Dict Required): The arguments to pass to the task.
For our example, we will create a scheduled task to run every 5 minutes, display the inference results, then use the task kill
request to keep the task from running any further.
It is recommended that orchestrations that have pipeline deploy or undeploy commands be spaced no less than 5 minutes apart to prevent colliding with other tasks that use the same pipeline.
# retrieve the authorization token
headers = wl.auth.auth_header()
data = {
"name": "scheduled api task",
"workspace_id": workspace_id,
"orch_id": orchestration_id,
"schedule": "*/5 * * * *",
"json": {
"workspace_name": workspace_name,
"pipeline_name": pipeline_name
}
}
import datetime
task_start = datetime.datetime.now()
url=f"{APIURL}/v1/api/task/run_scheduled"
response=requests.post(url, headers=headers, json=data).json()
display(response)
scheduled_task_id = response['id']
{'id': '87c2c7c0-e17e-4c00-9407-bfc44d632910'}
# loop until the task is started
# retrieve the authorization token
headers = wl.auth.auth_header()
url=f"{APIURL}/v1/api/task/get_by_id"
data = {
"id": scheduled_task_id
}
status = None
while status != 'started':
response=requests.post(url, headers=headers, json=data).json()
display(response)
status = response['status']
time.sleep(10)
{'name': 'scheduled api task',
'id': '87c2c7c0-e17e-4c00-9407-bfc44d632910',
'image': 'proxy.replicated.com/proxy/wallaroo/ghcr.io/wallaroolabs/arbex-orch-deploy',
'image_tag': 'v2023.2.0-main-3271',
'bind_secrets': ['minio'],
'extra_env_vars': {'MINIO_URL': 'http://minio.wallaroo.svc.cluster.local:9000',
'ORCH_OWNER_ID': 'a6857628-b0aa-451e-a0a0-bbc1d6eea6e0',
'ORCH_SHA': 'd3b93c9f280734106376e684792aa8b4285d527092fe87d89c74ec804f8e169e',
'TASK_DEBUG': 'false',
'TASK_ID': '87c2c7c0-e17e-4c00-9407-bfc44d632910'},
'auth_init': True,
'workspace_id': 8,
'schedule': '*/5 * * * *',
'reap_threshold_secs': 900,
'status': 'started',
'input_data': {'pipeline_name': 'apipipelinegsze',
'workspace_name': 'apiorchestrationworkspacegsze'},
'killed': False,
'created_at': '2023-05-22T21:10:22.43957+00:00',
'updated_at': '2023-05-22T21:10:22.871615+00:00',
'last_runs': []}
# retrieve the authorization token
headers = wl.auth.auth_header()
url=f"{APIURL}/v1/api/task/list_task_runs"
data = {
"task_id": scheduled_task_id
}
# loop until we have a single run task
length = 0
while length == 0:
response=requests.post(url, headers=headers, json=data).json()
display(response)
length = len(response)
time.sleep(10)
task_run_id = response[0]['run_id']
[{'task': '87c2c7c0-e17e-4c00-9407-bfc44d632910',
'run_id': 'ad3d427a-3643-4f10-9b94-116688b32355',
'status': 'running',
'created_at': '2023-05-22T21:15:02.148274+00:00',
'updated_at': '2023-05-22T21:15:02.148274+00:00'}]
# retrieve the authorization token
headers = wl.auth.auth_header()
url=f"{APIURL}/v1/api/task/get_logs_for_run"
data = {
"id": task_run_id
}
# loop until we get a log
response=requests.post(url, headers=headers, json=data).json()
display(response)
{'logs': ["2023-05-22T21:15:55.548754679Z stdout F {'pipeline_name': 'apipipelinegsze', 'workspace_name': 'apiorchestrationworkspacegsze'}",
'2023-05-22T21:15:55.54880928Z stdout F Getting the workspace apiorchestrationworkspacegsze',
'2023-05-22T21:15:55.54881708Z stdout F Getting the pipeline apipipelinegsze',
'2023-05-22T21:15:55.54882328Z stdout F Deploying the pipeline.',
'2023-05-22T21:15:55.54883088Z stdout F Performing sample inference.',
'2023-05-22T21:15:55.54883628Z stdout F time ... check_failures',
'2023-05-22T21:15:55.54884248Z stdout F 0 2023-05-22 21:15:17.420 ... 0',
'2023-05-22T21:15:55.54884788Z stdout F ',
'2023-05-22T21:15:55.54885348Z stdout F [1 rows x 4 columns]',
'2023-05-22T21:15:55.54885938Z stdout F Undeploying the pipeline']}
Cleanup
With the tutorial complete, we can undeploy the pipeline and return the resources back to the Wallaroo instance.
Kill Task via API
Started tasks are killed through the following request:
/v1/api/task/kill
And takes the following parameters.
- id (String Required): The UUID of the task to kill.
# retrieve the authorization token
headers = wl.auth.auth_header()
url=f"{APIURL}/v1/api/task/kill"
data = {
"id": scheduled_task_id
}
response=requests.post(url, headers=headers, json=data).json()
display(response)
{'name': 'scheduled api task',
'id': '87c2c7c0-e17e-4c00-9407-bfc44d632910',
'image': 'proxy.replicated.com/proxy/wallaroo/ghcr.io/wallaroolabs/arbex-orch-deploy',
'image_tag': 'v2023.2.0-main-3271',
'bind_secrets': ['minio'],
'extra_env_vars': {'MINIO_URL': 'http://minio.wallaroo.svc.cluster.local:9000',
'ORCH_OWNER_ID': 'a6857628-b0aa-451e-a0a0-bbc1d6eea6e0',
'ORCH_SHA': 'd3b93c9f280734106376e684792aa8b4285d527092fe87d89c74ec804f8e169e',
'TASK_DEBUG': 'false',
'TASK_ID': '87c2c7c0-e17e-4c00-9407-bfc44d632910'},
'auth_init': True,
'workspace_id': 8,
'schedule': '*/5 * * * *',
'reap_threshold_secs': 900,
'status': 'pending_kill',
'input_data': {'pipeline_name': 'apipipelinegsze',
'workspace_name': 'apiorchestrationworkspacegsze'},
'killed': False,
'created_at': '2023-05-22T21:10:22.43957+00:00',
'updated_at': '2023-05-22T21:10:22.871615+00:00',
'last_runs': [{'run_id': 'ad3d427a-3643-4f10-9b94-116688b32355',
'status': 'success',
'created_at': '2023-05-22T21:15:02.148274+00:00',
'updated_at': '2023-05-22T21:15:02.148274+00:00'}]}
Close Resources
With the tutorial complete, we’ll verify the pipeline is closed so the resources are assigned back to the Wallaroo instance.
pipeline.undeploy()
ok
name | apipipelinegsze |
---|---|
created | 2023-05-22 20:48:30.700499+00:00 |
last_updated | 2023-05-22 20:48:31.357336+00:00 |
deployed | False |
tags | |
versions | aac61b5a-e4f4-4ea3-9347-6482c330b5f5, 101b252a-623c-4185-a24d-ec00593dda79 |
steps | apiorchestrationmodelgsze |
2 - Wallaroo Connection API with Google BigQuery Tutorial
This can be downloaded as part of the Wallaroo Tutorials repository.
Wallaroo Connection and ML Workload Orchestration with BigQuery House Price Model Tutorial
This tutorial provides a quick set of methods and examples regarding Wallaroo Connections. For full details, see the Wallaroo Documentation site.
Wallaroo provides Data Connections to give organizations a method of creating and managing automated tasks that can either be run on demand or on a regular schedule.
Definitions
- Orchestration: A set of instructions written as a python script with a requirements library. Orchestrations are uploaded to the Wallaroo instance as a .zip file.
- Task: An implementation of an orchestration. Tasks are run either once when requested, on a repeating schedule, or as a service.
- Connection: Definitions set by MLOps engineers that are used by other Wallaroo users for connection information to a data source. Usually paired with orchestrations.
This tutorial will focus on using Google BigQuery as the data source.
Tutorial Goals
The tutorial will demonstrate the following:
- Create a Wallaroo connection for retrieving information from a Google BigQuery source table.
- Create a Wallaroo connection to store inference results into a Google BigQuery destination table.
- Upload Wallaroo ML Workload Orchestration that supports BigQuery connections with the connection details.
- Run the orchestration once as a Run Once Task and verify that the inference request succeeded and the inference results were saved to the external data store.
- Schedule the orchestration as a Scheduled Task and verify that the inference request succeeded and the inference results were saved to the external data store.
Prerequisites
- An installed Wallaroo instance.
- The following Python libraries installed. These are included by default in a Wallaroo instance’s JupyterHub service.
- The following Python libraries. These are not included in a Wallaroo instance’s JupyterHub service.
  - google-cloud-bigquery: Specifically for its support for Google BigQuery.
  - google-auth: Used to authenticate for BigQuery.
  - db-dtypes: Converts the BigQuery results to an Apache Arrow table or pandas DataFrame.
Tutorial Resources
- Models:
  - models/rf_model.onnx: A model that predicts house price values.
- Data:
  - data/xtest-1.df.json and data/xtest-1k.df.json: DataFrame JSON inference inputs with 1 input and 1,000 inputs.
  - data/xtest-1k.arrow: Apache Arrow inference inputs with 1 input and 1,000 inputs.
  - Sample inference inputs in CSV that can be imported into Google BigQuery:
    - data/xtest-1k.df.json: Random sample housing prices.
    - data/smallinputs.df.json: Sample housing prices that return results lower than $1.5 million.
    - data/biginputs.df.json: Sample housing prices that return results higher than $1.5 million.
- SQL queries to create the inputs/outputs tables with schema:
  - ./resources/create_inputs_table.sql: Inputs table with schema.
  - ./resources/create_outputs_table.sql: Outputs table with schema.
  - ./resources/housrpricesga_inputs.avro: Avro container of inputs table.
Initial Steps
For this tutorial, we’ll create a workspace, upload our sample model and deploy a pipeline. We’ll perform some quick sample inferences to verify that everything is working.
Load Libraries
Here we’ll import the various libraries we’ll use for the tutorial.
import wallaroo
from wallaroo.object import EntityNotFoundError, RequiredAttributeMissing
# to display dataframe tables
from IPython.display import display
# used to display dataframe information without truncating
import pandas as pd
pd.set_option('display.max_colwidth', None)
import pyarrow as pa
import time
import json
# for Big Query connections
from google.cloud import bigquery
from google.oauth2 import service_account
import db_dtypes
import requests
# used for unique connection names
import string
import random
suffix= ''.join(random.choice(string.ascii_lowercase) for i in range(4))
wallaroo.__version__
'2023.2.0rc3'
Connect to the Wallaroo Instance
The first step is to connect to Wallaroo through the Wallaroo client. The Python library is included in the Wallaroo install and available through the Jupyter Hub interface provided with your Wallaroo environment.
This is accomplished using the wallaroo.Client()
command, which provides a URL to grant the SDK permission to your specific Wallaroo environment. When displayed, enter the URL into a browser and confirm permissions. Store the connection into a variable that can be referenced later.
If logging into the Wallaroo instance through the internal JupyterHub service, use wl = wallaroo.Client()
. If logging in externally, update the wallarooPrefix
and wallarooSuffix
variables with the proper DNS information. For more information on Wallaroo Client settings, see the Client Connection guide.
# Login through local Wallaroo instance
wl = wallaroo.Client()
API URL
The variable APIURL
is used to specify the connection to the Wallaroo instance’s MLOps API URL, and is composed of the Wallaroo DNS prefix and suffix. For full details, see the Wallaroo API Connection Guide
.
The variables wallarooPrefix
and wallarooSuffix
will be used to derive the API URL. For example, if the Wallaroo Prefix is doc-test.
and the url is example.com
, then the MLOps API URL would be doc-test.api.example.com/v1/api/{request}
.
Note the .
is part of the prefix. If there is no prefix, then wallarooPrefix = ""
Set the Wallaroo Prefix and Suffix in the code segment below based on your Wallaroo instance.
Variable Declaration
The following variables will be used for our big query testing.
We’ll use two connections:
- bigquery_input_connection: The connection that will draw inference input data from a BigQuery table.
- bigquery_output_connection: The connection that will upload inference results into a BigQuery table.
Note that for the connection arguments, we’ll retrieve the information from the files ./bigquery_service_account_input_key.json
and ./bigquery_service_account_output_key.json
that include the service account key file (SAK) information, as well as the dataset and table used. An illustrative key file layout follows the table below.
Field | Included in SAK |
---|---|
type | √ |
project_id | √ |
private_key_id | √ |
private_key | √ |
client_email | √ |
auth_uri | √ |
token_uri | √ |
auth_provider_x509_cert_url | √ |
client_x509_cert_url | √ |
dataset | 🚫 |
table | 🚫 |
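As an illustration only, each key file combines a standard Google service account key with the dataset and table entries listed above. The sketch below shows the dictionary returned by json.load for such a file; every value is a placeholder.
# Illustrative layout of a service account key file after json.load (placeholder values only)
example_connection_argument = {
    "type": "service_account",
    "project_id": "your-gcp-project",
    "private_key_id": "your-private-key-id",
    "private_key": "-----BEGIN PRIVATE KEY-----\nYOUR-KEY\n-----END PRIVATE KEY-----\n",
    "client_email": "your-service-account@your-gcp-project.iam.gserviceaccount.com",
    "auth_uri": "https://accounts.google.com/o/oauth2/auth",
    "token_uri": "https://oauth2.googleapis.com/token",
    "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
    "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/your-service-account",
    "dataset": "your-bigquery-dataset",
    "table": "your-bigquery-table"
}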
# Setting variables for later steps
wallarooPrefix = "YOUR PREFIX."
wallarooSuffix = "YOUR SUFFIX"
APIURL = f"https://{wallarooPrefix}api.{wallarooSuffix}"
# Setting variables for later steps
workspace_name = 'bigqueryapiworkspace'
pipeline_name = 'bigqueryapipipeline'
model_name = 'bigqueryapimodel'
model_file_name = './models/rf_model.onnx'
bigquery_connection_input_name = f"bigqueryhouseapiinput{suffix}"
bigquery_connection_input_type = "BIGQUERY"
bigquery_connection_input_argument = json.load(open('./bigquery_service_account_input_key.json'))
bigquery_connection_output_name = f"bigqueryhouseapioutputs{suffix}"
bigquery_connection_output_type = "BIGQUERY"
bigquery_connection_output_argument = json.load(open('./bigquery_service_account_output_key.json'))
Helper Methods
The following helper methods are used to either create or get workspaces, pipelines, and connections.
# helper methods to retrieve workspaces and pipelines
def get_workspace(name):
workspace = None
for ws in wl.list_workspaces():
if ws.name() == name:
workspace= ws
if(workspace == None):
workspace = wl.create_workspace(name)
return workspace
def get_pipeline(name):
try:
pipeline = wl.pipelines_by_name(name)[0]
except EntityNotFoundError:
pipeline = wl.build_pipeline(name)
return pipeline
def get_connection(name, connection_type, connection_arguments):
try:
connection = wl.get_connection(name)
except RequiredAttributeMissing:
connection =wl.create_connection(name,
connection_type,
connection_arguments)
return connection
Create the Workspace and Pipeline
We’ll now create our workspace and pipeline for the tutorial. If this tutorial has been run previously, then this will retrieve the existing ones with the assumption they’re for use with this tutorial.
We’ll set the retrieved workspace as the current workspace in the SDK, so all commands will default to that workspace.
workspace = get_workspace(workspace_name)
wl.set_current_workspace(workspace)
workspace_id = workspace.id()
pipeline = get_pipeline(pipeline_name)
Upload the Model and Deploy Pipeline
We’ll upload our model into our sample workspace, then add it as a pipeline step before deploying the pipeline so it’s ready to accept inference requests.
# Upload the model
housing_model_control = (wl.upload_model(model_name,
model_file_name,
framework=wallaroo.framework.Framework.ONNX)
.configure(tensor_fields=["tensor"])
)
# Add the model as a pipeline step
pipeline.add_model_step(housing_model_control)
name | bigqueryapipipeline |
---|---|
created | 2023-05-17 16:06:45.400005+00:00 |
last_updated | 2023-05-17 16:06:48.150211+00:00 |
deployed | True |
tags | |
versions | 5e78de4f-51f1-43ce-8c2c-d724a05f856a, d2f03a9c-dfbb-4a03-a014-e70aa80902e3 |
steps | bigqueryapimodel |
#deploy the pipeline
pipeline.deploy()
name | bigqueryapipipeline |
---|---|
created | 2023-05-17 16:06:45.400005+00:00 |
last_updated | 2023-05-17 16:11:23.317215+00:00 |
deployed | True |
tags | |
versions | 153a11e5-7968-450c-aef5-d1be17c6b173, 5e78de4f-51f1-43ce-8c2c-d724a05f856a, d2f03a9c-dfbb-4a03-a014-e70aa80902e3 |
steps | bigqueryapimodel |
Connection Management via the Wallaroo MLOps API
The following steps will demonstrate using the Wallaroo MLOps API to:
- Create the BigQuery connections
- Add the connections to the targeted workspace
- Use the connections for inference requests and uploading the results to a BigQuery dataset table.
Create Connections via API
We will create the data source connection via the Wallaroo api request:
/v1/api/connections/create
This takes the following parameters:
name (String Required): The name of the connection.
type (String Required): The user defined type of connection.
details (String Required): User defined configuration details for the data connection. These can be
{'username':'dataperson', 'password':'datapassword', 'port': 3339}
, or {'token':'abcde123==', 'host':'example.com', 'port:1234'}
, or other user defined combinations.
IMPORTANT NOTE: Data connection names must be unique. Attempting to create a data connection with the same name
as an existing data connection will result in an error.
# retrieve the authorization token
headers = wl.auth.auth_header()
url = f"{APIURL}/v1/api/connections/create"
# input connection
data = {
'name': bigquery_connection_input_name,
'type' : bigquery_connection_input_type,
'details': bigquery_connection_input_argument
}
response=requests.post(url, headers=headers, json=data).json()
display(response)
# saved for later steps
connection_input_id = response['id']
{'id': '839779d7-d4b3-4e1c-953a-2f5ef55f1bb2'}
# retrieve the authorization token
headers = wl.auth.auth_header()
url = f"{APIURL}/v1/api/connections/create"
# output connection
data = {
'name': bigquery_connection_output_name,
'type' : bigquery_connection_output_type,
'details': bigquery_connection_output_argument
}
response=requests.post(url, headers=headers, json=data).json()
display(response)
# saved for later steps
connection_output_id = response['id']
{'id': 'ef0c62e7-e59f-4c43-bdff-05ed0976cffa'}
Add Connections to Workspace via API
The connections will be added to the sample workspace with the MLOps API request:
/v1/api/connections/add_to_workspace
This takes the following parameters:
- workspace_id (Integer Required): The numerical identifier of the workspace.
- connection_id (String Required): The UUID connection ID.
# retrieve the authorization token
headers = wl.auth.auth_header()
url = f"{APIURL}/v1/api/connections/add_to_workspace"
data = {
'workspace_id': workspace_id,
'connection_id': connection_input_id
}
response=requests.post(url, headers=headers, json=data)
display(response.json())
data = {
'workspace_id': workspace_id,
'connection_id': connection_output_id
}
response=requests.post(url, headers=headers, json=data)
display(response.json())
{'id': '3877da9d-e450-4b9a-85dd-7f68127b1313'}
{‘id’: ‘15f247ab-833f-468a-b7c5-9b50831052e6’}
Connect to Google BigQuery
With our connections set, we’ll now use them for an inference request through the following steps:
- Retrieve the input data from a BigQuery request from the input connection details.
- Perform the inference.
- Upload the inference results into another BigQuery table from the output connection details.
Create Google Credentials
From our BigQuery request, we’ll create the credentials for our BigQuery connection.
We will use the MLOps API call:
/v1/api/connections/get
to retrieve the connection. This request takes the following parameters:
- name (String Required): The name of the connection.
# get the connection input details
# retrieve the authorization token
headers = wl.auth.auth_header()
url = f"{APIURL}/v1/api/connections/get"
data = {
'name': bigquery_connection_input_name
}
connection_input_details=requests.post(url, headers=headers, json=data).json()['details']
# get the connection output details
# retrieve the authorization token
headers = wl.auth.auth_header()
url = f"{APIURL}/v1/api/connections/get"
data = {
'name': bigquery_connection_output_name
}
connection_output_details=requests.post(url, headers=headers, json=data).json()['details']
# Set the bigquery credentials
bigquery_input_credentials = service_account.Credentials.from_service_account_info(
connection_input_details)
bigquery_output_credentials = service_account.Credentials.from_service_account_info(
connection_output_details)
Connect to Google BigQuery
We can now generate a client from our connection details, specifying the project that was included in the big_query_connection
details.
bigqueryinputclient = bigquery.Client(
credentials=bigquery_input_credentials,
project=connection_input_details['project_id']
)
bigqueryoutputclient = bigquery.Client(
credentials=bigquery_output_credentials,
project=connection_output_details['project_id']
)
Query Data
Now we’ll create our query and retrieve information from our dataset and table as defined in the file bigquery_service_account_input_key.json
. The table is expected to be in the format of the file ./data/xtest-1k.df.json
.
inference_dataframe_input = bigqueryinputclient.query(
f"""
SELECT tensor
FROM {connection_input_details['dataset']}.{connection_input_details['table']}"""
).to_dataframe()
inference_dataframe_input.head(5)
tensor | |
---|---|
0 | [4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0] |
1 | [2.0, 2.5, 2170.0, 6361.0, 1.0, 0.0, 2.0, 3.0, 8.0, 2170.0, 0.0, 47.7109, -122.017, 2310.0, 7419.0, 6.0, 0.0, 0.0] |
2 | [3.0, 2.5, 1300.0, 812.0, 2.0, 0.0, 0.0, 3.0, 8.0, 880.0, 420.0, 47.5893, -122.317, 1300.0, 824.0, 6.0, 0.0, 0.0] |
3 | [4.0, 2.5, 2500.0, 8540.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2500.0, 0.0, 47.5759, -121.994, 2560.0, 8475.0, 24.0, 0.0, 0.0] |
4 | [3.0, 1.75, 2200.0, 11520.0, 1.0, 0.0, 0.0, 4.0, 7.0, 2200.0, 0.0, 47.7659, -122.341, 1690.0, 8038.0, 62.0, 0.0, 0.0] |
Sample Inference
With our data retrieved, we’ll perform an inference and display the results.
result = pipeline.infer(inference_dataframe_input)
display(result.head(5))
time | in.tensor | out.variable | check_failures | |
---|---|---|---|---|
0 | 2023-05-17 16:12:53.970 | [4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0] | [718013.75] | 0 |
1 | 2023-05-17 16:12:53.970 | [2.0, 2.5, 2170.0, 6361.0, 1.0, 0.0, 2.0, 3.0, 8.0, 2170.0, 0.0, 47.7109, -122.017, 2310.0, 7419.0, 6.0, 0.0, 0.0] | [615094.56] | 0 |
2 | 2023-05-17 16:12:53.970 | [3.0, 2.5, 1300.0, 812.0, 2.0, 0.0, 0.0, 3.0, 8.0, 880.0, 420.0, 47.5893, -122.317, 1300.0, 824.0, 6.0, 0.0, 0.0] | [448627.72] | 0 |
3 | 2023-05-17 16:12:53.970 | [4.0, 2.5, 2500.0, 8540.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2500.0, 0.0, 47.5759, -121.994, 2560.0, 8475.0, 24.0, 0.0, 0.0] | [758714.2] | 0 |
4 | 2023-05-17 16:12:53.970 | [3.0, 1.75, 2200.0, 11520.0, 1.0, 0.0, 0.0, 4.0, 7.0, 2200.0, 0.0, 47.7659, -122.341, 1690.0, 8038.0, 62.0, 0.0, 0.0] | [513264.7] | 0 |
Upload the Results
With the query complete, we’ll upload the results back to the BigQuery dataset.
output_table = bigqueryoutputclient.get_table(f"{connection_output_details['dataset']}.{connection_output_details['table']}")
bigqueryoutputclient.insert_rows_from_dataframe(
output_table,
dataframe=result.rename(columns={"in.tensor":"in_tensor", "out.variable":"out_variable"})
)
[[], []]
Verify the Upload
We can verify the upload by requesting the last few rows of the output table.
task_inference_results = bigqueryoutputclient.query(
f"""
SELECT *
FROM {connection_output_details['dataset']}.{connection_output_details['table']}
ORDER BY time DESC
LIMIT 5
"""
).to_dataframe()
display(task_inference_results)
time | in_tensor | out_variable | check_failures | |
---|---|---|---|---|
0 | 2023-05-17 16:12:53.970 | [4.0, 2.5, 2500.0, 8540.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2500.0, 0.0, 47.5759, -121.994, 2560.0, 8475.0, 24.0, 0.0, 0.0] | [758714.2] | 0 |
1 | 2023-05-17 16:12:53.970 | [4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0] | [718013.75] | 0 |
2 | 2023-05-17 16:12:53.970 | [2.0, 2.5, 2170.0, 6361.0, 1.0, 0.0, 2.0, 3.0, 8.0, 2170.0, 0.0, 47.7109, -122.017, 2310.0, 7419.0, 6.0, 0.0, 0.0] | [615094.56] | 0 |
3 | 2023-05-17 16:12:53.970 | [3.0, 1.75, 2200.0, 11520.0, 1.0, 0.0, 0.0, 4.0, 7.0, 2200.0, 0.0, 47.7659, -122.341, 1690.0, 8038.0, 62.0, 0.0, 0.0] | [513264.7] | 0 |
4 | 2023-05-17 16:12:53.970 | [3.0, 2.5, 1300.0, 812.0, 2.0, 0.0, 0.0, 3.0, 8.0, 880.0, 420.0, 47.5893, -122.317, 1300.0, 824.0, 6.0, 0.0, 0.0] | [448627.72] | 0 |
Cleanup
With the tutorial complete, we can undeploy the pipeline and return the resources back to the Wallaroo instance.
pipeline.undeploy()
name | bigqueryapipipeline |
---|---|
created | 2023-05-17 16:06:45.400005+00:00 |
last_updated | 2023-05-17 16:11:23.317215+00:00 |
deployed | False |
tags | |
versions | 153a11e5-7968-450c-aef5-d1be17c6b173, 5e78de4f-51f1-43ce-8c2c-d724a05f856a, d2f03a9c-dfbb-4a03-a014-e70aa80902e3 |
steps | bigqueryapimodel |
3 - Wallaroo ML Workload Orchestration Simple Tutorial
This can be downloaded as part of the Wallaroo Tutorials repository.
Wallaroo Connection and ML Workload Orchestration Simple Tutorial
This tutorial provides a quick set of methods and examples regarding Wallaroo Connections and Wallaroo ML Workload Orchestration. For full details, see the Wallaroo Documentation site.
Wallaroo provides Data Connections and ML Workload Orchestrations to give organizations a method of creating and managing automated tasks that can either be run on demand or on a regular schedule.
Definitions
- Orchestration: A set of instructions written as a python script with a requirements library. Orchestrations are uploaded to the Wallaroo instance as a .zip file.
- Task: An implementation of an orchestration. Tasks are run either once when requested, on a repeating schedule, or as a service.
- Connection: Definitions set by MLOps engineers that are used by other Wallaroo users for connection information to a data source. Usually paired with orchestrations.
Tutorial Goals
The tutorial will demonstrate the following:
- Create a Wallaroo connection for retrieving information from an external source.
- Upload Wallaroo ML Workload Orchestration.
- Run the orchestration once as a Run Once Task and verify that the information was saved to the pipeline logs.
- Schedule the orchestration as a Scheduled Task and verify that the information was saved to the pipeline logs.
Prerequisites
- An installed Wallaroo instance.
- The following Python libraries installed. These are included by default in a Wallaroo instance’s JupyterHub service.
Initial Steps
For this tutorial, we’ll create a workspace, upload our sample model and deploy a pipeline. We’ll perform some quick sample inferences to verify that everything is working.
Load Libraries
Here we’ll import the various libraries we’ll use for the tutorial.
import wallaroo
from wallaroo.object import EntityNotFoundError, RequiredAttributeMissing
# to display dataframe tables
from IPython.display import display
# used to display dataframe information without truncating
import pandas as pd
pd.set_option('display.max_colwidth', None)
import pyarrow as pa
import time
# Used to create unique workspace and pipeline names
import string
import random
# make a random 4 character suffix
suffix= ''.join(random.choice(string.ascii_lowercase) for i in range(4))
display(suffix)
'dtzw'
Connect to the Wallaroo Instance
The first step is to connect to Wallaroo through the Wallaroo client. The Python library is included in the Wallaroo install and available through the Jupyter Hub interface provided with your Wallaroo environment.
This is accomplished using the wallaroo.Client()
command, which provides a URL to grant the SDK permission to your specific Wallaroo environment. When displayed, enter the URL into a browser and confirm permissions. Store the connection into a variable that can be referenced later.
If logging into the Wallaroo instance through the internal JupyterHub service, use wl = wallaroo.Client()
. For more information on Wallaroo Client settings, see the Client Connection guide.
# Login through local Wallaroo instance
wl = wallaroo.Client()
# Setting variables for later steps
workspace_name = f'simpleorchestrationworkspace{suffix}'
pipeline_name = f'simpleorchestrationpipeline{suffix}'
model_name = f'simpleorchestrationmodel{suffix}'
model_file_name = './models/rf_model.onnx'
inference_connection_name = f'external_inference_connection{suffix}'
inference_connection_type = "HTTP"
inference_connection_argument = {'host':'https://github.com/WallarooLabs/Wallaroo_Tutorials/raw/main/wallaroo-testing-tutorials/houseprice-saga/data/xtest-1k.arrow'}
Helper Methods
The following helper methods are used to either create or get workspaces, pipelines, and connections.
# helper methods to retrieve workspaces and pipelines
def get_workspace(name):
workspace = None
for ws in wl.list_workspaces():
if ws.name() == name:
workspace= ws
if(workspace == None):
workspace = wl.create_workspace(name)
return workspace
def get_pipeline(name):
try:
pipeline = wl.pipelines_by_name(name)[0]
except EntityNotFoundError:
pipeline = wl.build_pipeline(name)
return pipeline
Create the Workspace and Pipeline
We’ll now create our workspace and pipeline for the tutorial. If this tutorial has been run previously, then this will retrieve the existing ones with the assumption they’re for use with this tutorial.
We’ll set the retrieved workspace as the current workspace in the SDK, so all commands will default to that workspace.
workspace = get_workspace(workspace_name)
wl.set_current_workspace(workspace)
pipeline = get_pipeline(pipeline_name)
Upload the Model and Deploy Pipeline
We’ll upload our model into our sample workspace, then add it as a pipeline step before deploying the pipeline so it’s ready to accept inference requests.
# Upload the model
housing_model_control = (wl.upload_model(model_name,
model_file_name,
framework=wallaroo.framework.Framework.ONNX)
.configure(tensor_fields=["tensor"])
)
# Add the model as a pipeline step
pipeline.add_model_step(housing_model_control)
name | simpleorchestrationpipelinedtzw |
---|---|
created | 2023-05-23 15:26:00.268667+00:00 |
last_updated | 2023-05-23 15:26:00.268667+00:00 |
deployed | (none) |
tags | |
versions | 9a5afba3-c664-4d57-8c08-fc072d3f549c |
steps |
#deploy the pipeline
pipeline.deploy()
Waiting for deployment - this will take up to 45s ....................... ok
name | simpleorchestrationpipelinedtzw |
---|---|
created | 2023-05-23 15:26:00.268667+00:00 |
last_updated | 2023-05-23 15:26:00.568944+00:00 |
deployed | True |
tags | |
versions | 24f5d5e9-59fe-4440-9e08-78c0003226df, 9a5afba3-c664-4d57-8c08-fc072d3f549c |
steps | simpleorchestrationmodeldtzw |
Create Connections
We will create the data source connection via the Wallaroo client command create_connection
.
Connections are created with the Wallaroo client command create_connection
with the following parameters.
Parameter | Type | Description |
---|---|---|
name | string (Required) | The name of the connection. This must be unique - if submitting the name of an existing connection it will return an error. |
type | string (Required) | The user defined type of connection. |
details | Dict (Required) | User defined configuration details for the data connection. These can be {'username':'dataperson', 'password':'datapassword', 'port': 3339} , or {'token':'abcde123==', 'host':'example.com', 'port:1234'} , or other user defined combinations. |
- IMPORTANT NOTE: Data connection names must be unique. Attempting to create a data connection with the same
name
as an existing data connection will result in an error.
We’ll create our data connection external_inference_connection
with the Wallaroo client command create_connection
. From there we’ll create our connection:
houseprice_arrow_table
: An Apache Arrow file stored on GitHub that will be used for our inference input.
wl.create_connection(inference_connection_name, inference_connection_type, inference_connection_argument)
Field | Value |
---|---|
Name | external_inference_connectiondtzw |
Connection Type | HTTP |
Details | ***** |
Created At | 2023-05-23T15:26:24.613152+00:00 |
Linked Workspaces | [] |
Get Connection by Name
The Wallaroo client method get_connection(name)
retrieves the connection that matches the name
parameter. We’ll retrieve our connection and store it as inference_source_connection
.
inference_source_connection = wl.get_connection(name=inference_connection_name)
display(inference_source_connection)
Field | Value |
---|---|
Name | external_inference_connectiondtzw |
Connection Type | HTTP |
Details | ***** |
Created At | 2023-05-23T15:26:24.613152+00:00 |
Linked Workspaces | [] |
Add Connection to Workspace
The method Workspace add_connection(connection_name)
adds a Data Connection to a workspace, and takes the following parameters.
Parameter | Type | Description |
---|---|---|
name | string (Required) | The name of the Data Connection |
We’ll add both connections to our sample workspace, then list the connections available to the workspace to confirm.
workspace.add_connection(inference_connection_name)
workspace.list_connections()
name | connection type | details | created at | linked workspaces |
---|---|---|---|---|
external_inference_connectiondtzw | HTTP | ***** | 2023-05-23T15:26:24.613152+00:00 | ['simpleorchestrationworkspacedtzw'] |
Wallaroo ML Workload Orchestration Example
With the pipeline deployed and our connections set, we will now generate our ML Workload Orchestration. See the Wallaroo ML Workload Orchestrations guide for full details.
Orchestrations are uploaded to the Wallaroo instance as a ZIP file with the following requirements:
Parameter | Type | Description |
---|---|---|
User Code | (Required) Python script as .py files | If main.py exists, then that will be used as the task entrypoint. Otherwise, the first main.py found in any subdirectory will be used as the entrypoint. |
Python Library Requirements | (Optional) requirements.txt file in the requirements file format | A standard Python requirements.txt for any dependencies to be provided in the task environment. The Wallaroo SDK will already be present and should not be included in the requirements.txt. Multiple requirements.txt files are not allowed. |
Other artifacts | (Optional) | Other artifacts such as files, data, or code to support the orchestration. |
For our example, our orchestration will:
- Use the inference_results_connection to open an HTTP GET connection to the inference data file and use it in an inference request in the deployed pipeline.
- Submit the inference results to the location specified in the external_inference_connection.
This sample script is stored in remote_inference/main.py
with an empty requirements.txt
file, and packaged into the orchestration as ./remote_inference/remote_inference.zip
. We’ll display the steps in uploading the orchestration to the Wallaroo instance.
Note that the orchestration handles deploying and undeploying the pipeline as part of the task; a rough sketch of such a script follows.
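The packaged script in the tutorial repository is the authoritative version; the following is only an illustrative sketch. It assumes the task arguments are read with wl.task_args() (verify that call against your SDK version) and that the connection details contain a host entry pointing at the Apache Arrow file.
import wallaroo
import pyarrow as pa
import requests

wl = wallaroo.Client()

# json_args supplied by run_once / run_scheduled (wl.task_args() is an assumption - check your SDK version)
args = wl.task_args()
workspace_name = args["workspace_name"]
pipeline_name = args["pipeline_name"]
connection_name = args["connection_name"]

# switch to the workspace and retrieve the pipeline, then deploy it for the task
workspace = [ws for ws in wl.list_workspaces() if ws.name() == workspace_name][0]
wl.set_current_workspace(workspace)
pipeline = wl.pipelines_by_name(pipeline_name)[0]
pipeline.deploy()

# fetch the Apache Arrow file referenced by the connection details and run the inference
connection = wl.get_connection(name=connection_name)
response = requests.get(connection.details()["host"])
arrow_table = pa.ipc.open_file(pa.BufferReader(response.content)).read_all()
results = pipeline.infer(arrow_table)

# submitting the results to the external location is omitted in this sketch
pipeline.undeploy()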
Upload the Orchestration
Orchestrations are uploaded with the Wallaroo client upload_orchestration(path)
method with the following parameters.
Parameter | Type | Description |
---|---|---|
path | string (Required) | The path to the ZIP file to be uploaded. |
Once uploaded, the deployment will be prepared and any requirements will be downloaded and installed.
For this example, the orchestration ./remote_inference/remote_inference.zip
will be uploaded and saved to the variable orchestration
.
Orchestration Status
We will loop until the uploaded orchestration’s status
displays ready
.
orchestration = wl.upload_orchestration(path="./remote_inference/remote_inference.zip")
while orchestration.status() != 'ready':
    print(orchestration.status())
    time.sleep(5)
pending_packaging
pending_packaging
packaging
packaging
packaging
packaging
packaging
packaging
packaging
packaging
wl.list_orchestrations()
id | name | status | filename | sha | created at | updated at |
---|---|---|---|---|---|---|
701c991a-a716-4bca-aa69-afd957ff189e | None | ready | remote_inference.zip | b4593d...d5a86f | 2023-23-May 15:26:24 | 2023-23-May 15:27:13 |
Upload Orchestration via File Object
Another method to upload the orchestration is as a file object. For that, we will open the zip file as a binary, then upload it using the bytes_buffer
parameter to specify the file object, and the file_name
to give it a new name.
with open("./remote_inference/remote_inference.zip", "rb") as file_handle:
    zip_bytes = file_handle.read()

wl.upload_orchestration(bytes_buffer=zip_bytes, file_name="inferencetest.zip", name="uploadedbytesdemo")
Field | Value |
---|---|
ID | 7683f0f9-13fa-4257-840a-8e1cf8b12089 |
Name | uploadedbytesdemo |
File Name | inferencetest.zip |
SHA | b4593d6084e07e9ad1b57367258ca425d7f290540ab4378b8cba168b91d5a86f |
Status | pending_packaging |
Created At | 2023-23-May 15:27:15 |
Updated At | 2023-23-May 15:27:15 |
wl.list_orchestrations()
id | name | status | filename | sha | created at | updated at |
---|---|---|---|---|---|---|
701c991a-a716-4bca-aa69-afd957ff189e | None | ready | remote_inference.zip | b4593d...d5a86f | 2023-23-May 15:26:24 | 2023-23-May 15:27:13 |
7683f0f9-13fa-4257-840a-8e1cf8b12089 | uploadedbytesdemo | pending_packaging | inferencetest.zip | b4593d...d5a86f | 2023-23-May 15:27:15 | 2023-23-May 15:27:15 |
Task Management Tutorial
Once an Orchestration has the status ready
, it can be run as a task. Tasks have the following run options.
Type | SDK Call | How triggered |
---|---|---|
Once | orchestration.run_once(name, json_args, timeout) | Task runs once and exits. |
Scheduled | orchestration.run_scheduled(name, schedule, timeout, json_args) | User provides schedule. Task runs and exits whenever the schedule dictates. |
Run Task Once
We’ll start with a Run Once task, generating it from our orchestration.
Tasks are generated and run once with the Orchestration run_once(name, json_args, timeout)
method. Any arguments for the orchestration are passed in as a Dict
. If there are no arguments, then an empty set {}
is passed.
# Example: run once
import datetime
task_start = datetime.datetime.now()
task = orchestration.run_once(name="simpletaskdemo",
json_args={"workspace_name": workspace_name,
"pipeline_name": pipeline_name,
"connection_name": inference_connection_name
})
Task Status
The list of tasks in the Wallaroo instance is retrieved through the Wallaroo Client list_tasks() method. This returns a list with the following fields.
Parameter | Type | Description |
---|---|---|
id | string | The UUID identifier for the task. |
last run status | string | The last reported status of the task. |
type | string | The type of the task. |
created at | DateTime | The date and time the task was started. |
updated at | DateTime | The date and time the task was updated. |
For this example, we will check the status of the previously created task, looping until it reaches the status started.
while task.status() != "started":
    display(task.status())
    time.sleep(5)
'pending'
'pending'
'pending'
Task Results
We can view the inferences from our logs and verify that new entries were added from our task. We can do that with the pipeline logs() method.
In our case, we’ll assume the task once started takes about 1 minute to run (deploy the pipeline, run the inference, undeploy the pipeline). We’ll add in a wait of 1 minute, then display the logs during the time period the task was running.
time.sleep(60)
task_end = datetime.datetime.now()
display(task_end)
pipeline.logs(start_datetime = task_start, end_datetime = task_end)
datetime.datetime(2023, 5, 23, 15, 28, 30, 718361)
Warning: Pipeline log size limit exceeded. Please request logs using export_logs
time | in.tensor | out.variable | check_failures | |
---|---|---|---|---|
0 | 2023-05-23 15:27:28.001 | [4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0] | [718013.75] | 0 |
1 | 2023-05-23 15:27:28.001 | [2.0, 2.5, 2170.0, 6361.0, 1.0, 0.0, 2.0, 3.0, 8.0, 2170.0, 0.0, 47.7109, -122.017, 2310.0, 7419.0, 6.0, 0.0, 0.0] | [615094.56] | 0 |
2 | 2023-05-23 15:27:28.001 | [3.0, 2.5, 1300.0, 812.0, 2.0, 0.0, 0.0, 3.0, 8.0, 880.0, 420.0, 47.5893, -122.317, 1300.0, 824.0, 6.0, 0.0, 0.0] | [448627.72] | 0 |
3 | 2023-05-23 15:27:28.001 | [4.0, 2.5, 2500.0, 8540.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2500.0, 0.0, 47.5759, -121.994, 2560.0, 8475.0, 24.0, 0.0, 0.0] | [758714.2] | 0 |
4 | 2023-05-23 15:27:28.001 | [3.0, 1.75, 2200.0, 11520.0, 1.0, 0.0, 0.0, 4.0, 7.0, 2200.0, 0.0, 47.7659, -122.341, 1690.0, 8038.0, 62.0, 0.0, 0.0] | [513264.7] | 0 |
... | ... | ... | ... | ... |
487 | 2023-05-23 15:27:28.001 | [3.0, 1.5, 1030.0, 8414.0, 1.0, 0.0, 0.0, 4.0, 7.0, 1030.0, 0.0, 47.7654, -122.297, 1750.0, 8414.0, 47.0, 0.0, 0.0] | [340764.53] | 0 |
488 | 2023-05-23 15:27:28.001 | [4.0, 2.75, 2450.0, 15002.0, 1.0, 0.0, 0.0, 5.0, 9.0, 2450.0, 0.0, 47.4268, -122.343, 2650.0, 15055.0, 40.0, 0.0, 0.0] | [508746.75] | 0 |
489 | 2023-05-23 15:27:28.001 | [2.0, 1.0, 1010.0, 4000.0, 1.0, 0.0, 0.0, 3.0, 6.0, 1010.0, 0.0, 47.5536, -122.267, 1040.0, 4000.0, 103.0, 0.0, 0.0] | [435628.56] | 0 |
490 | 2023-05-23 15:27:28.001 | [3.0, 2.5, 1330.0, 1200.0, 3.0, 0.0, 0.0, 3.0, 7.0, 1330.0, 0.0, 47.7034, -122.344, 1330.0, 1206.0, 12.0, 0.0, 0.0] | [342604.4] | 0 |
491 | 2023-05-23 15:27:28.001 | [2.0, 1.75, 2770.0, 19700.0, 2.0, 0.0, 0.0, 3.0, 8.0, 1780.0, 990.0, 47.7581, -122.365, 2360.0, 9700.0, 31.0, 0.0, 0.0] | [536371.25] | 0 |
492 rows × 4 columns
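Note the warning at the top of the log output: the requested window exceeded the pipeline log size limit, and the SDK suggests export_logs for the full set. A minimal sketch of exporting the same window to local files is below; the directory and file_prefix parameter names are assumptions, so confirm the exact signature against the SDK reference for your release.
# export the complete log window to local files instead of an in-memory DataFrame
# (parameter names are assumptions - confirm against the SDK reference)
pipeline.export_logs(
    directory="./logs",
    file_prefix="simple_orchestration_task",
    start_datetime=task_start,
    end_datetime=task_end
)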
Scheduled Run Task Example
The other method of using tasks is as a scheduled run through the Orchestration run_scheduled(name, schedule, timeout, json_args). This sets up a task to run on a regular schedule as defined by the schedule parameter in the cron service format. For example:
schedule="42 * * * *"
Runs on the 42nd minute of every hour.
The following schedule runs every day at 12 noon from February 1 to February 15 2024, and then ends:
schedule="0 0 12 1-15 2 2024"
For our example, we will create a scheduled task to run every 5 minutes, display the inference results, then use the task’s kill() method to keep the task from running any further.
It is recommended that orchestrations that have pipeline deploy or undeploy commands be scheduled no less than 5 minutes apart to prevent colliding with other tasks that use the same pipeline.
scheduled_task_start = datetime.datetime.now()
scheduled_task = orchestration.run_scheduled(name="simple_inference_schedule",
schedule="*/5 * * * *",
timeout=120,
json_args={"workspace_name": workspace_name,
"pipeline_name": pipeline_name,
"connection_name": inference_connection_name
})
while scheduled_task.status() != "started":
    display(scheduled_task.status())
    time.sleep(5)
'pending'
#wait 420 seconds to give the scheduled event time to finish
time.sleep(420)
scheduled_task_end = datetime.datetime.now()
pipeline.logs(start_datetime = scheduled_task_start, end_datetime = scheduled_task_end)
Warning: Pipeline log size limit exceeded. Please request logs using export_logs
time | in.tensor | out.variable | check_failures | |
---|---|---|---|---|
0 | 2023-05-23 15:30:08.633 | [4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0] | [718013.75] | 0 |
1 | 2023-05-23 15:30:08.633 | [2.0, 2.5, 2170.0, 6361.0, 1.0, 0.0, 2.0, 3.0, 8.0, 2170.0, 0.0, 47.7109, -122.017, 2310.0, 7419.0, 6.0, 0.0, 0.0] | [615094.56] | 0 |
2 | 2023-05-23 15:30:08.633 | [3.0, 2.5, 1300.0, 812.0, 2.0, 0.0, 0.0, 3.0, 8.0, 880.0, 420.0, 47.5893, -122.317, 1300.0, 824.0, 6.0, 0.0, 0.0] | [448627.72] | 0 |
3 | 2023-05-23 15:30:08.633 | [4.0, 2.5, 2500.0, 8540.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2500.0, 0.0, 47.5759, -121.994, 2560.0, 8475.0, 24.0, 0.0, 0.0] | [758714.2] | 0 |
4 | 2023-05-23 15:30:08.633 | [3.0, 1.75, 2200.0, 11520.0, 1.0, 0.0, 0.0, 4.0, 7.0, 2200.0, 0.0, 47.7659, -122.341, 1690.0, 8038.0, 62.0, 0.0, 0.0] | [513264.7] | 0 |
... | ... | ... | ... | ... |
487 | 2023-05-23 15:30:08.633 | [3.0, 1.5, 1030.0, 8414.0, 1.0, 0.0, 0.0, 4.0, 7.0, 1030.0, 0.0, 47.7654, -122.297, 1750.0, 8414.0, 47.0, 0.0, 0.0] | [340764.53] | 0 |
488 | 2023-05-23 15:30:08.633 | [4.0, 2.75, 2450.0, 15002.0, 1.0, 0.0, 0.0, 5.0, 9.0, 2450.0, 0.0, 47.4268, -122.343, 2650.0, 15055.0, 40.0, 0.0, 0.0] | [508746.75] | 0 |
489 | 2023-05-23 15:30:08.633 | [2.0, 1.0, 1010.0, 4000.0, 1.0, 0.0, 0.0, 3.0, 6.0, 1010.0, 0.0, 47.5536, -122.267, 1040.0, 4000.0, 103.0, 0.0, 0.0] | [435628.56] | 0 |
490 | 2023-05-23 15:30:08.633 | [3.0, 2.5, 1330.0, 1200.0, 3.0, 0.0, 0.0, 3.0, 7.0, 1330.0, 0.0, 47.7034, -122.344, 1330.0, 1206.0, 12.0, 0.0, 0.0] | [342604.4] | 0 |
491 | 2023-05-23 15:30:08.633 | [2.0, 1.75, 2770.0, 19700.0, 2.0, 0.0, 0.0, 3.0, 8.0, 1780.0, 990.0, 47.7581, -122.365, 2360.0, 9700.0, 31.0, 0.0, 0.0] | [536371.25] | 0 |
492 rows × 4 columns
Kill Task
With our testing complete, we will kill the scheduled task so it will not run again. First we’ll show all the tasks to verify that our task is there, then issue it the kill command.
wl.list_tasks()
id | name | last run status | type | active | schedule | created at | updated at |
---|---|---|---|---|---|---|---|
d0b6a83c-a3a1-41f0-98c9-92422d2544c4 | simple_inference_schedule | success | Scheduled Run | True | */5 * * * * | 2023-23-May 15:28:30 | 2023-23-May 15:28:31 |
74046e01-68ab-42a0-bcd2-3493cbc66576 | simpletaskdemo | success | Temporary Run | True | - | 2023-23-May 15:27:15 | 2023-23-May 15:27:26 |
scheduled_task.kill()
<ArbexStatus.PENDING_KILL: 'pending_kill'>
wl.list_tasks()
id | name | last run status | type | active | schedule | created at | updated at |
---|---|---|---|---|---|---|---|
74046e01-68ab-42a0-bcd2-3493cbc66576 | simpletaskdemo | success | Temporary Run | True | - | 2023-23-May 15:27:15 | 2023-23-May 15:27:26 |
Cleanup
With the tutorial complete, we can undeploy the pipeline and return the resources to the Wallaroo instance.
pipeline.undeploy()
Waiting for undeployment - this will take up to 45s ..................................... ok
name | simpleorchestrationpipelinedtzw |
---|---|
created | 2023-05-23 15:26:00.268667+00:00 |
last_updated | 2023-05-23 15:26:00.568944+00:00 |
deployed | False |
tags | |
versions | 24f5d5e9-59fe-4440-9e08-78c0003226df, 9a5afba3-c664-4d57-8c08-fc072d3f549c |
steps | simpleorchestrationmodeldtzw |
4 - Wallaroo ML Workload Orchestration Google BigQuery with House Price Model Tutorial
This can be downloaded as part of the Wallaroo Tutorials repository.
Wallaroo ML Workload Orchestration House Price Model Tutorial
This tutorial provides a quick set of methods and examples regarding Wallaroo Connections and Wallaroo ML Workload Orchestration. For full details, see the Wallaroo Documentation site.
Wallaroo provides Data Connections and ML Workload Orchestrations to provide organizations with a method of creating and managing automated tasks that can either be run on demand or a regular schedule.
Definitions
- Orchestration: A set of instructions written as a python script with a requirements library. Orchestrations are uploaded to the Wallaroo instance as a .zip file.
- Task: An implementation of an orchestration. Tasks are run either once when requested, on a repeating schedule, or as a service.
- Connection: Definitions set by MLOps engineers that are used by other Wallaroo users for connection information to a data source. Usually paired with orchestrations.
This tutorial will focus on using Google BigQuery as the data source.
Tutorial Goals
The tutorial will demonstrate the following:
- Create a Wallaroo connection to retrieve information from a Google BigQuery source table.
- Create a Wallaroo connection to store inference results into a Google BigQuery destination table.
- Upload Wallaroo ML Workload Orchestration that supports BigQuery connections with the connection details.
- Run the orchestration once as a Run Once Task and verify that the inference request succeeded and the inference results were saved to the external data store.
- Schedule the orchestration as a Scheduled Task and verify that the inference request succeeded and the inference results were saved to the external data store.
Prerequisites
- An installed Wallaroo instance.
- The following Python libraries installed. These are included by default in a Wallaroo instance’s JupyterHub service.
- The following Python libraries. These are not included in a Wallaroo instance’s JupyterHub service; one way to install them is shown after this list.
  - google-cloud-bigquery: Specifically for its support for Google BigQuery.
  - google-auth: Used to authenticate for BigQuery.
  - db-dtypes: Converts the BigQuery results to Apache Arrow table or pandas DataFrame.
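Since these libraries are not preinstalled in the JupyterHub service, they need to be added to the notebook environment before running the tutorial. One possible approach from a notebook cell (versions are left unpinned here; pin them for production environments):
# install the BigQuery support libraries into the current notebook environment
!pip install google-cloud-bigquery google-auth db-dtypes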
Tutorial Resources
- Models:
  - models/rf_model.onnx: A model that predicts house price values.
- Data:
  - data/xtest-1.df.json and data/xtest-1k.df.json: DataFrame JSON inference inputs with 1 input and 1,000 inputs.
  - data/xtest-1k.arrow: Apache Arrow inference inputs with 1 input and 1,000 inputs.
  - Sample inference inputs in CSV that can be imported into Google BigQuery:
    - data/xtest-1k.df.json: Random sample housing prices.
    - data/smallinputs.df.json: Sample housing prices that return results lower than $1.5 million.
    - data/biginputs.df.json: Sample housing prices that return results higher than $1.5 million.
- SQL queries to create the inputs/outputs tables with schema:
  - ./resources/create_inputs_table.sql: Inputs table with schema.
  - ./resources/create_outputs_table.sql: Outputs table with schema.
  - ./resources/housrpricesga_inputs.avro: Avro container of inputs table.
Initial Steps
For this tutorial, we’ll create a workspace, upload our sample model and deploy a pipeline. We’ll perform some quick sample inferences to verify that everything is working.
Load Libraries
Here we’ll import the various libraries we’ll use for the tutorial.
import wallaroo
from wallaroo.object import EntityNotFoundError, RequiredAttributeMissing
# to display dataframe tables
from IPython.display import display
# used to display dataframe information without truncating
import pandas as pd
pd.set_option('display.max_colwidth', None)
import pyarrow as pa
import time
import json
# for Big Query connections
from google.cloud import bigquery
from google.oauth2 import service_account
import db_dtypes
# used for unique connection names
import string
import random
suffix= ''.join(random.choice(string.ascii_lowercase) for i in range(4))
wallaroo.__version__
'2023.2.0+dfca0605e'
Connect to the Wallaroo Instance
The first step is to connect to Wallaroo through the Wallaroo client. The Python library is included in the Wallaroo install and available through the Jupyter Hub interface provided with your Wallaroo environment.
This is accomplished using the wallaroo.Client()
command, which provides a URL to grant the SDK permission to your specific Wallaroo environment. When displayed, enter the URL into a browser and confirm permissions. Store the connection into a variable that can be referenced later.
If logging into the Wallaroo instance through the internal JupyterHub service, use wl = wallaroo.Client()
. For more information on Wallaroo Client settings, see the Client Connection guide.
# Login through local Wallaroo instance
wl = wallaroo.Client()
Variable Declaration
The following variables will be used for our big query testing.
We’ll use two connections:
- bigquery_input_connection: The connection that will draw inference input data from a BigQuery table.
- bigquery_output_connection: The connection that will upload inference results into a BigQuery table.
Note that for the connection arguments, we’ll retrieve the information from the files ./bigquery_service_account_input_key.json and ./bigquery_service_account_output_key.json that include the service account key (SAK) information, as well as the dataset and table used. An illustrative example of the file structure is shown after the table below.
Field | Included in SAK |
---|---|
type | √ |
project_id | √ |
private_key_id | √ |
private_key | √ |
client_email | √ |
auth_uri | √ |
token_uri | √ |
auth_provider_x509_cert_url | √ |
client_x509_cert_url | √ |
dataset | 🚫 |
table | 🚫 |
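As an illustration only, the dict returned by json.load for one of these key files would look roughly like the following. Every value below is a placeholder; the dataset and table entries are the two fields added by hand on top of the Google-generated service account key.
# hypothetical contents of ./bigquery_service_account_input_key.json (placeholder values only)
{
    "type": "service_account",
    "project_id": "my-gcp-project",
    "private_key_id": "0123456789abcdef",
    "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n",
    "client_email": "wallaroo-sa@my-gcp-project.iam.gserviceaccount.com",
    "auth_uri": "https://accounts.google.com/o/oauth2/auth",
    "token_uri": "https://oauth2.googleapis.com/token",
    "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
    "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/wallaroo-sa%40my-gcp-project.iam.gserviceaccount.com",
    "dataset": "my_input_dataset",
    "table": "my_input_table"
}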
# Setting variables for later steps
workspace_name = f'bigqueryworkspace{suffix}'
pipeline_name = f'bigquerypipeline{suffix}'
model_name = f'bigquerymodel{suffix}'
model_file_name = './models/rf_model.onnx'
bigquery_connection_input_name = f'bigqueryhouseinputs{suffix}'
bigquery_connection_input_type = "BIGQUERY"
bigquery_connection_input_argument = json.load(open("./bigquery_service_account_input_key.json"))
bigquery_connection_output_name = f'bigqueryhouseoutputs{suffix}'
bigquery_connection_output_type = "BIGQUERY"
bigquery_connection_output_argument = json.load(open("./bigquery_service_account_output_key.json"))
Helper Methods
The following helper methods are used to either create or get workspaces, pipelines, and connections.
# helper methods to retrieve workspaces and pipelines
def get_workspace(name):
    workspace = None
    for ws in wl.list_workspaces():
        if ws.name() == name:
            workspace = ws
    if workspace is None:
        workspace = wl.create_workspace(name)
    return workspace

def get_pipeline(name):
    try:
        pipeline = wl.pipelines_by_name(name)[0]
    except EntityNotFoundError:
        pipeline = wl.build_pipeline(name)
    return pipeline
Create the Workspace and Pipeline
We’ll now create our workspace and pipeline for the tutorial. If this tutorial has been run previously, then this will retrieve the existing ones with the assumption they’re for use with this tutorial.
We’ll set the retrieved workspace as the current workspace in the SDK, so all commands will default to that workspace.
workspace = get_workspace(workspace_name)
wl.set_current_workspace(workspace)
pipeline = get_pipeline(pipeline_name)
Upload the Model and Deploy Pipeline
We’ll upload our model into our sample workspace, then add it as a pipeline step before deploying the pipeline so it’s ready to accept inference requests.
# Upload the model
housing_model_control = (wl.upload_model(model_name,
model_file_name,
framework=wallaroo.framework.Framework.ONNX)
.configure(tensor_fields=["tensor"])
)
# Add the model as a pipeline step
pipeline.add_model_step(housing_model_control)
name | bigquerypipelinechbp |
---|---|
created | 2023-05-23 15:17:30.123708+00:00 |
last_updated | 2023-05-23 15:17:30.123708+00:00 |
deployed | (none) |
tags | |
versions | fefa4278-04e0-4d1a-8d5b-9cc9c5275832 |
steps |
#deploy the pipeline to set the pipeline steps
pipeline.deploy()
Waiting for deployment - this will take up to 45s ............ ok
name | bigquerypipelinechbp |
---|---|
created | 2023-05-23 15:17:30.123708+00:00 |
last_updated | 2023-05-23 15:17:30.428928+00:00 |
deployed | True |
tags | |
versions | 3dd0653a-12b0-4298-b91b-1e0b712716c5, fefa4278-04e0-4d1a-8d5b-9cc9c5275832 |
steps | bigquerymodelchbp |
Create Connections
We will create the data source connection via the Wallaroo client command create_connection
.
Connections are created with the Wallaroo client command create_connection
with the following parameters.
Parameter | Type | Description |
---|---|---|
name | string (Required) | The name of the connection. This must be unique - if submitting the name of an existing connection it will return an error. |
type | string (Required) | The user defined type of connection. |
details | Dict (Required) | User defined configuration details for the data connection. These can be {'username':'dataperson', 'password':'datapassword', 'port': 3339} , or {'token':'abcde123==', 'host':'example.com', 'port': 1234} , or other user defined combinations. |
- IMPORTANT NOTE: Data connection names must be unique. Attempting to create a data connection with the same name as an existing data connection will result in an error.
connection_input = wl.create_connection(bigquery_connection_input_name, bigquery_connection_input_type, bigquery_connection_input_argument)
connection_output = wl.create_connection(bigquery_connection_output_name, bigquery_connection_output_type, bigquery_connection_output_argument)
wl.list_connections()
name | connection type | details | created at | linked workspaces |
---|---|---|---|---|
bigqueryhouseinputs | BIGQUERY | ***** | 2023-05-23T14:35:26.896064+00:00 | ['bigqueryworkspace'] |
bigqueryhouseoutputs | BIGQUERY | ***** | 2023-05-23T14:35:26.932685+00:00 | ['bigqueryworkspace'] |
bigqueryhouseinputs-jcw | BIGQUERY | ***** | 2023-05-23T14:37:22.103147+00:00 | ['bigqueryworkspace-jcw'] |
bigqueryhouseoutputs-jcw | BIGQUERY | ***** | 2023-05-23T14:37:22.141179+00:00 | ['bigqueryworkspace-jcw'] |
bigqueryhouseinputs{suffix} | BIGQUERY | ***** | 2023-05-23T15:05:29.850628+00:00 | ['bigqueryworkspacekbcy'] |
bigqueryhouseoutputs{suffix} | BIGQUERY | ***** | 2023-05-23T15:05:30.298941+00:00 | ['bigqueryworkspacekbcy'] |
bigqueryforecastinputsrklr | BIGQUERY | ***** | 2023-05-23T15:05:58.206726+00:00 | ['bigquerystatsmodelworkspacerklr'] |
bigqueryforecastoutputsrklr | BIGQUERY | ***** | 2023-05-23T15:05:58.673528+00:00 | ['bigquerystatsmodelworkspacerklr'] |
Get Connection by Name
The Wallaroo client method get_connection(name)
retrieves the connection that matches the name
parameter. We’ll retrieve our connection and store it as inference_source_connection
.
big_query_input_connection = wl.get_connection(name=bigquery_connection_input_name)
big_query_output_connection = wl.get_connection(name=bigquery_connection_output_name)
display(big_query_input_connection)
display(big_query_output_connection)
Field | Value |
---|---|
Name | bigqueryhouseinputs{suffix} |
Connection Type | BIGQUERY |
Details | ***** |
Created At | 2023-05-23T15:05:29.850628+00:00 |
Linked Workspaces | ['bigqueryworkspacekbcy'] |
Field | Value |
---|---|
Name | bigqueryhouseoutputs{suffix} |
Connection Type | BIGQUERY |
Details | ***** |
Created At | 2023-05-23T15:05:30.298941+00:00 |
Linked Workspaces | ['bigqueryworkspacekbcy'] |
Add Connection to Workspace
The method Workspace add_connection(connection_name)
adds a Data Connection to a workspace, and takes the following parameters.
Parameter | Type | Description |
---|---|---|
name | string (Required) | The name of the Data Connection |
We’ll add both connections to our sample workspace, then list the connections available to the workspace to confirm.
workspace.add_connection(bigquery_connection_input_name)
workspace.add_connection(bigquery_connection_output_name)
workspace.list_connections()
name | connection type | details | created at | linked workspaces |
---|---|---|---|---|
bigqueryhouseinputs{suffix} | BIGQUERY | ***** | 2023-05-23T15:05:29.850628+00:00 | ['bigqueryworkspacekbcy', 'bigqueryworkspacechbp'] |
bigqueryhouseoutputs{suffix} | BIGQUERY | ***** | 2023-05-23T15:05:30.298941+00:00 | ['bigqueryworkspacekbcy', 'bigqueryworkspacechbp'] |
Big Query Connection Inference Example
We can test the BigQuery connection with a simple inference to our deployed pipeline. We’ll request the data, format the table into a pandas DataFrame, then submit it for an inference request.
Create Google Credentials
From our BigQuery request, we’ll create the credentials for our BigQuery connection.
bigquery_input_credentials = service_account.Credentials.from_service_account_info(
big_query_input_connection.details())
bigquery_output_credentials = service_account.Credentials.from_service_account_info(
big_query_output_connection.details())
Connect to Google BigQuery
We can now generate a client from our connection details, specifying the project that was included in the big_query_connection
details.
bigqueryinputclient = bigquery.Client(
credentials=bigquery_input_credentials,
project=big_query_input_connection.details()['project_id']
)
bigqueryoutputclient = bigquery.Client(
credentials=bigquery_output_credentials,
project=big_query_output_connection.details()['project_id']
)
Query Data
Now we’ll create our query and retrieve information from our dataset and table as defined in the file bigquery_service_account_key.json
. The table is expected to be in the format of the file ./data/xtest-1k.df.json
.
inference_dataframe_input = bigqueryinputclient.query(
f"""
SELECT tensor
FROM {big_query_input_connection.details()['dataset']}.{big_query_input_connection.details()['table']}"""
).to_dataframe()
inference_dataframe_input.head(5)
tensor | |
---|---|
0 | [4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0] |
1 | [2.0, 2.5, 2170.0, 6361.0, 1.0, 0.0, 2.0, 3.0, 8.0, 2170.0, 0.0, 47.7109, -122.017, 2310.0, 7419.0, 6.0, 0.0, 0.0] |
2 | [3.0, 2.5, 1300.0, 812.0, 2.0, 0.0, 0.0, 3.0, 8.0, 880.0, 420.0, 47.5893, -122.317, 1300.0, 824.0, 6.0, 0.0, 0.0] |
3 | [4.0, 2.5, 2500.0, 8540.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2500.0, 0.0, 47.5759, -121.994, 2560.0, 8475.0, 24.0, 0.0, 0.0] |
4 | [3.0, 1.75, 2200.0, 11520.0, 1.0, 0.0, 0.0, 4.0, 7.0, 2200.0, 0.0, 47.7659, -122.341, 1690.0, 8038.0, 62.0, 0.0, 0.0] |
Sample Inference
With our data retrieved, we’ll perform an inference and display the results.
result = pipeline.infer(inference_dataframe_input)
display(result.head(5))
time | in.tensor | out.variable | check_failures | |
---|---|---|---|---|
0 | 2023-05-23 15:17:47.498 | [4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0] | [718013.75] | 0 |
1 | 2023-05-23 15:17:47.498 | [2.0, 2.5, 2170.0, 6361.0, 1.0, 0.0, 2.0, 3.0, 8.0, 2170.0, 0.0, 47.7109, -122.017, 2310.0, 7419.0, 6.0, 0.0, 0.0] | [615094.56] | 0 |
2 | 2023-05-23 15:17:47.498 | [3.0, 2.5, 1300.0, 812.0, 2.0, 0.0, 0.0, 3.0, 8.0, 880.0, 420.0, 47.5893, -122.317, 1300.0, 824.0, 6.0, 0.0, 0.0] | [448627.72] | 0 |
3 | 2023-05-23 15:17:47.498 | [4.0, 2.5, 2500.0, 8540.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2500.0, 0.0, 47.5759, -121.994, 2560.0, 8475.0, 24.0, 0.0, 0.0] | [758714.2] | 0 |
4 | 2023-05-23 15:17:47.498 | [3.0, 1.75, 2200.0, 11520.0, 1.0, 0.0, 0.0, 4.0, 7.0, 2200.0, 0.0, 47.7659, -122.341, 1690.0, 8038.0, 62.0, 0.0, 0.0] | [513264.7] | 0 |
Upload the Results
With the query complete, we’ll upload the results back to the BigQuery dataset. The insert_rows_from_dataframe call returns a list of insert errors for each chunk of rows; the empty lists in the output below indicate that all rows were inserted successfully.
output_table = bigqueryoutputclient.get_table(f"{big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}")
bigqueryoutputclient.insert_rows_from_dataframe(
output_table,
dataframe=result.rename(columns={"in.tensor":"in_tensor", "out.variable":"out_variable"})
)
[[], []]
Wallaroo ML Workload Orchestration Example
With the pipeline deployed and our connections set, we will now generate our ML Workload Orchestration. See the Wallaroo ML Workload Orchestrations guide for full details.
Orchestrations are uploaded to the Wallaroo instance as a ZIP file with the following requirements:
Parameter | Type | Description |
---|---|---|
User Code | (Required) Python script as .py files | If main.py exists, then that will be used as the task entrypoint. Otherwise, the first main.py found in any subdirectory will be used as the entrypoint. |
Python Library Requirements | (Optional) requirements.txt file in the requirements file format | A standard Python requirements.txt for any dependencies to be provided in the task environment. The Wallaroo SDK will already be present and should not be included in the requirements.txt. Multiple requirements.txt files are not allowed. |
Other artifacts | (Optional) | Other artifacts such as files, data, or code to support the orchestration. |
For our example, our orchestration will:
- Use the bigquery_remote_inference to open a connection to the input and output tables.
- Deploy the pipeline.
- Perform an inference with the input data.
- Save the inference results to the output table.
- Undeploy the pipeline.
This sample script is stored in bigquery_remote_inference/main.py with a requirements.txt file listing the specific libraries for the Google BigQuery connection, and packaged into the orchestration as ./bigquery_remote_inference/bigquery_remote_inference.zip. We’ll display the steps in uploading the orchestration to the Wallaroo instance.
Note that the orchestration handles deploying and undeploying the pipeline as part of the task; a rough sketch of the script follows.
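The packaged script in the tutorial repository is the authoritative version; the following is only an illustrative sketch built from the BigQuery calls shown in the cells above. The workspace, pipeline, and connection names are placeholders.
import wallaroo
from google.cloud import bigquery
from google.oauth2 import service_account

wl = wallaroo.Client()

# retrieve the connections registered earlier (names are placeholders)
input_connection = wl.get_connection(name="bigqueryhouseinputs")
output_connection = wl.get_connection(name="bigqueryhouseoutputs")

# deploy the pipeline for the duration of the task (name is a placeholder)
pipeline = wl.pipelines_by_name("bigquerypipeline")[0]
pipeline.deploy()

# build BigQuery clients from the connection details
input_credentials = service_account.Credentials.from_service_account_info(input_connection.details())
output_credentials = service_account.Credentials.from_service_account_info(output_connection.details())
input_client = bigquery.Client(credentials=input_credentials, project=input_connection.details()["project_id"])
output_client = bigquery.Client(credentials=output_credentials, project=output_connection.details()["project_id"])

# query the inputs, run the inference, and write the results back
input_frame = input_client.query(
    f"SELECT tensor FROM {input_connection.details()['dataset']}.{input_connection.details()['table']}"
).to_dataframe()
results = pipeline.infer(input_frame)
output_table = output_client.get_table(
    f"{output_connection.details()['dataset']}.{output_connection.details()['table']}"
)
output_client.insert_rows_from_dataframe(
    output_table,
    dataframe=results.rename(columns={"in.tensor": "in_tensor", "out.variable": "out_variable"})
)

# undeploy the pipeline when the task is finished
pipeline.undeploy()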
Upload the Orchestration
Orchestrations are uploaded with the Wallaroo client upload_orchestration(path)
method with the following parameters.
Parameter | Type | Description |
---|---|---|
path | string (Required) | The path to the ZIP file to be uploaded. |
Once uploaded, the deployment will be prepared and any requirements will be downloaded and installed.
For this example, the orchestration ./bigquery_remote_inference/bigquery_remote_inference.zip
will be uploaded and saved to the variable orchestration
. Then we will loop until the uploaded orchestration’s status
displays ready
.
orchestration = wl.upload_orchestration(path="./bigquery_remote_inference/bigquery_remote_inference.zip")
while orchestration.status() != 'ready':
    print(orchestration.status())
    time.sleep(5)
pending_packaging
pending_packaging
packaging
packaging
packaging
packaging
packaging
packaging
packaging
packaging
packaging
wl.list_orchestrations()
id | name | status | filename | sha | created at | updated at |
---|---|---|---|---|---|---|
78ea6222-c102-4eb1-9705-166233a12257 | None | ready | bigquery_remote_inference.zip | 582f33...a6957c | 2023-23-May 15:17:48 | 2023-23-May 15:18:39 |
Task Management Tutorial
Once an Orchestration has the status ready
, it can be run as a task. Tasks have the following run options.
Type | SDK Call | How triggered |
---|---|---|
Once | orchestration.run_once(name, json_args, timeout) | Task runs once and exits. |
Scheduled | orchestration.run_scheduled(name, schedule, timeout, json_args) | User provides schedule. Task runs and exits whenever the schedule dictates. |
Run Task Once
We’ll start with a Run Once task, generating it from our orchestration.
Tasks are generated and run once with the Orchestration run_once(name, json_args, timeout)
method. Any arguments for the orchestration are passed in as a Dict
. If there are no arguments, then an empty set {}
is passed.
We’ll display the last 5 rows of our BigQuery output table, then start the task that will perform the same inference we did above.
task_inference_results = bigqueryoutputclient.query(
f"""
SELECT *
FROM {big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}
ORDER BY time DESC
LIMIT 5
"""
).to_dataframe()
display(task_inference_results)
time | in_tensor | out_variable | check_failures | |
---|---|---|---|---|
0 | 2023-05-23 15:17:47.498 | [4.0, 2.5, 2500.0, 8540.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2500.0, 0.0, 47.5759, -121.994, 2560.0, 8475.0, 24.0, 0.0, 0.0] | [758714.2] | 0 |
1 | 2023-05-23 15:17:47.498 | [4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0] | [718013.75] | 0 |
2 | 2023-05-23 15:17:47.498 | [2.0, 2.5, 2170.0, 6361.0, 1.0, 0.0, 2.0, 3.0, 8.0, 2170.0, 0.0, 47.7109, -122.017, 2310.0, 7419.0, 6.0, 0.0, 0.0] | [615094.56] | 0 |
3 | 2023-05-23 15:17:47.498 | [3.0, 1.75, 2200.0, 11520.0, 1.0, 0.0, 0.0, 4.0, 7.0, 2200.0, 0.0, 47.7659, -122.341, 1690.0, 8038.0, 62.0, 0.0, 0.0] | [513264.7] | 0 |
4 | 2023-05-23 15:17:47.498 | [3.0, 2.5, 1300.0, 812.0, 2.0, 0.0, 0.0, 3.0, 8.0, 880.0, 420.0, 47.5893, -122.317, 1300.0, 824.0, 6.0, 0.0, 0.0] | [448627.72] | 0 |
# Example: run once
import datetime
task_start = datetime.datetime.now()
task = orchestration.run_once(name="big query single run", json_args={})
task
Field | Value |
---|---|
ID | 59b20af0-4f2e-4371-9175-e0b69a94fb91 |
Name | big query single run |
Last Run Status | unknown |
Type | Temporary Run |
Active | True |
Schedule | - |
Created At | 2023-23-May 15:18:46 |
Updated At | 2023-23-May 15:18:46 |
Task Status
The list of tasks in the Wallaroo instance is retrieved through the Wallaroo Client list_tasks() method. This returns a list with the following fields.
Parameter | Type | Description |
---|---|---|
id | string | The UUID identifier for the task. |
last run status | string | The last reported status of the task. |
type | string | The type of the task. |
schedule | string | The schedule for the task. If a run once task, the schedule will be - . |
created at | DateTime | The date and time the task was started. |
updated at | DateTime | The date and time the task was updated. |
For this example, we will check the status of the previously created task, looping until it reaches the status started.
while task.status() != "started":
    display(task.status())
    time.sleep(5)
'pending'
'pending'
wl.list_tasks()
id | name | last run status | type | active | schedule | created at | updated at |
---|---|---|---|---|---|---|---|
59b20af0-4f2e-4371-9175-e0b69a94fb91 | big query single run | failure | Temporary Run | True | - | 2023-23-May 15:18:46 | 2023-23-May 15:18:51 |
Task Results
We can view the inferences from our logs and verify that new entries were added from our task. We’ll query the last 5 rows of our inference output table after a wait of 60 seconds.
time.sleep(60)
task_inference_results = bigqueryoutputclient.query(
f"""
SELECT *
FROM {big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}
ORDER BY time DESC LIMIT 5"""
).to_dataframe()
display(task_inference_results)
time | in_tensor | out_variable | check_failures | |
---|---|---|---|---|
0 | 2023-05-23 15:17:47.498 | [4.0, 2.5, 2500.0, 8540.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2500.0, 0.0, 47.5759, -121.994, 2560.0, 8475.0, 24.0, 0.0, 0.0] | [758714.2] | 0 |
1 | 2023-05-23 15:17:47.498 | [4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0] | [718013.75] | 0 |
2 | 2023-05-23 15:17:47.498 | [2.0, 2.5, 2170.0, 6361.0, 1.0, 0.0, 2.0, 3.0, 8.0, 2170.0, 0.0, 47.7109, -122.017, 2310.0, 7419.0, 6.0, 0.0, 0.0] | [615094.56] | 0 |
3 | 2023-05-23 15:17:47.498 | [3.0, 1.75, 2200.0, 11520.0, 1.0, 0.0, 0.0, 4.0, 7.0, 2200.0, 0.0, 47.7659, -122.341, 1690.0, 8038.0, 62.0, 0.0, 0.0] | [513264.7] | 0 |
4 | 2023-05-23 15:17:47.498 | [3.0, 2.5, 1300.0, 812.0, 2.0, 0.0, 0.0, 3.0, 8.0, 880.0, 420.0, 47.5893, -122.317, 1300.0, 824.0, 6.0, 0.0, 0.0] | [448627.72] | 0 |
Scheduled Run Task Example
The other method of using tasks is as a scheduled run through the Orchestration run_scheduled(name, schedule, timeout, json_args). This sets up a task to run on a regular schedule as defined by the schedule parameter in the cron service format. For example:
schedule="42 * * * *"
Runs on the 42nd minute of every hour.
The following schedule runs every day at 12 noon from February 1 to February 15 2024, and then ends:
schedule="0 0 12 1-15 2 2024"
For our example, we will create a scheduled task to run every 5 minutes, display the inference results, then use the task’s kill() method to keep the task from running any further.
It is recommended that orchestrations that have pipeline deploy or undeploy commands be scheduled no less than 5 minutes apart to prevent colliding with other tasks that use the same pipeline.
task_inference_results = bigqueryoutputclient.query(
f"""
SELECT *
FROM {big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}
ORDER BY time DESC LIMIT 5"""
).to_dataframe()
display(task_inference_results.tail(5))
scheduled_task = orchestration.run_scheduled(name="simple_inference_schedule", schedule="*/5 * * * *", timeout=120, json_args={})
time | in_tensor | out_variable | check_failures | |
---|---|---|---|---|
0 | 2023-05-23 15:17:47.498 | [4.0, 2.5, 2500.0, 8540.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2500.0, 0.0, 47.5759, -121.994, 2560.0, 8475.0, 24.0, 0.0, 0.0] | [758714.2] | 0 |
1 | 2023-05-23 15:17:47.498 | [4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0] | [718013.75] | 0 |
2 | 2023-05-23 15:17:47.498 | [2.0, 2.5, 2170.0, 6361.0, 1.0, 0.0, 2.0, 3.0, 8.0, 2170.0, 0.0, 47.7109, -122.017, 2310.0, 7419.0, 6.0, 0.0, 0.0] | [615094.56] | 0 |
3 | 2023-05-23 15:17:47.498 | [3.0, 1.75, 2200.0, 11520.0, 1.0, 0.0, 0.0, 4.0, 7.0, 2200.0, 0.0, 47.7659, -122.341, 1690.0, 8038.0, 62.0, 0.0, 0.0] | [513264.7] | 0 |
4 | 2023-05-23 15:17:47.498 | [3.0, 2.5, 1300.0, 812.0, 2.0, 0.0, 0.0, 3.0, 8.0, 880.0, 420.0, 47.5893, -122.317, 1300.0, 824.0, 6.0, 0.0, 0.0] | [448627.72] | 0 |
while scheduled_task.status() != "started":
    display(scheduled_task.status())
    time.sleep(5)
'pending'
#wait 420 seconds to give the scheduled event time to finish
time.sleep(420)
task_inference_results = bigqueryoutputclient.query(
f"""
SELECT *
FROM {big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}
ORDER BY time DESC LIMIT 5"""
).to_dataframe()
display(task_inference_results.tail(5))
time | in_tensor | out_variable | check_failures | |
---|---|---|---|---|
0 | 2023-05-23 15:17:47.498 | [4.0, 2.5, 2500.0, 8540.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2500.0, 0.0, 47.5759, -121.994, 2560.0, 8475.0, 24.0, 0.0, 0.0] | [758714.2] | 0 |
1 | 2023-05-23 15:17:47.498 | [4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0] | [718013.75] | 0 |
2 | 2023-05-23 15:17:47.498 | [2.0, 2.5, 2170.0, 6361.0, 1.0, 0.0, 2.0, 3.0, 8.0, 2170.0, 0.0, 47.7109, -122.017, 2310.0, 7419.0, 6.0, 0.0, 0.0] | [615094.56] | 0 |
3 | 2023-05-23 15:17:47.498 | [3.0, 1.75, 2200.0, 11520.0, 1.0, 0.0, 0.0, 4.0, 7.0, 2200.0, 0.0, 47.7659, -122.341, 1690.0, 8038.0, 62.0, 0.0, 0.0] | [513264.7] | 0 |
4 | 2023-05-23 15:17:47.498 | [3.0, 2.5, 1300.0, 812.0, 2.0, 0.0, 0.0, 3.0, 8.0, 880.0, 420.0, 47.5893, -122.317, 1300.0, 824.0, 6.0, 0.0, 0.0] | [448627.72] | 0 |
Kill Task
With our testing complete, we will kill the scheduled task so it will not run again. First we’ll show all the tasks to verify that our task is there, then issue it the kill command.
scheduled_task.kill()
<ArbexStatus.PENDING_KILL: 'pending_kill'>
Cleanup
With the tutorial complete, we can undeploy the pipeline and return the resources to the Wallaroo instance.
pipeline.undeploy()
Waiting for undeployment - this will take up to 45s ..................................... ok
name | bigquerypipelinechbp |
---|---|
created | 2023-05-23 15:17:30.123708+00:00 |
last_updated | 2023-05-23 15:17:30.428928+00:00 |
deployed | False |
tags | |
versions | 3dd0653a-12b0-4298-b91b-1e0b712716c5, fefa4278-04e0-4d1a-8d5b-9cc9c5275832 |
steps | bigquerymodelchbp |
5 - Wallaroo ML Workload Orchestration Google BigQuery with Statsmodel Forecast Tutorial
This can be downloaded as part of the Wallaroo Tutorials repository.
Wallaroo ML Workload Orchestrations and BigQuery Connections with Statsmodel Tutorial
This tutorial provides a quick set of methods and examples regarding Wallaroo Connections and Wallaroo ML Workload Orchestration. For full details, see the Wallaroo Documentation site.
Wallaroo provides Data Connections and ML Workload Orchestrations to provide organizations with a method of creating and managing automated tasks that can either be run on demand or a regular schedule.
Definitions
- Orchestration: A set of instructions written as a python script with a requirements library. Orchestrations are uploaded to the Wallaroo instance as a .zip file.
- Task: An implementation of an orchestration. Tasks are run either once when requested, on a repeating schedule, or as a service.
- Connection: Definitions set by MLOps engineers that are used by other Wallaroo users for connection information to a data source. Usually paired with orchestrations.
This tutorial will focus on using Google BigQuery as the data source for supplying the inference data to perform inferences through a Statsmodel ML model.
Tutorial Goals
The tutorial will demonstrate the following:
- Create a Wallaroo connection to retrieve information from a Google BigQuery source table.
- Create a Wallaroo connection to store inference results into a Google BigQuery destination table.
- Upload Wallaroo ML Workload Orchestration that supports BigQuery connections with the connection details.
- Run the orchestration once as a Run Once Task and verify that the inference request succeeded and the inference results were saved to the external data store.
- Schedule the orchestration as a Scheduled Task and verify that the inference request succeeded and the inference results were saved to the external data store.
Prerequisites
- An installed Wallaroo instance.
- The following Python libraries installed. These are included by default in a Wallaroo instance’s JupyterHub service.
- The following Python libraries. These are not included in a Wallaroo instance’s JupyterHub service.
  - google-cloud-bigquery: Specifically for its support for Google BigQuery.
  - google-auth: Used to authenticate for BigQuery.
  - db-dtypes: Converts the BigQuery results to Apache Arrow table or pandas DataFrame.
Tutorial Resources
- Models:
  - models/bike_day_model.pkl: The statsmodel model predicts how many bikes will be rented on each of the next 7 days, based on the previous 7 days’ bike rentals, temperature, and wind speed. This model only accepts shapes (7,4) - 7 rows (representing the last 7 days) and 4 columns (representing the fields temp, holiday, workingday, windspeed). Additional files to support this example are:
    - infer.py: The inference script that is part of the statsmodel.
- Data:
  - data/day.csv: Data used to train the sample statsmodel example.
  - data/bike_day_eval.json: Data used for inferences. This will be transferred to a BigQuery table as shown in the demonstration.
- Resources:
  - resources/bigquery_service_account_input_key.json: Example service key to authenticate to a Google BigQuery project with the dataset and table used for the inference data.
  - resources/bigquery_service_account_output_key.json: Example service key to authenticate to a Google BigQuery project with the dataset and table used for the inference result data.
  - resources/statsmodel_forecast_inputs.sql: SQL script to create the inputs table schema, populated with the values from ./data/day.csv.
  - resources/statsmodel_forecast_outputs.sql: SQL script to create the outputs table schema.
Initial Steps
For this tutorial, we’ll create a workspace, upload our sample model and deploy a pipeline. We’ll perform some quick sample inferences to verify that everything is working.
Load Libraries
Here we’ll import the various libraries we’ll use for the tutorial.
import wallaroo
from wallaroo.object import EntityNotFoundError, RequiredAttributeMissing
# to display dataframe tables
from IPython.display import display
# used to display dataframe information without truncating
import pandas as pd
pd.set_option('display.max_colwidth', None)
import pyarrow as pa
import time
import json
# for Big Query connections
from google.cloud import bigquery
from google.oauth2 import service_account
import db_dtypes
import string
import random
suffix= ''.join(random.choice(string.ascii_lowercase) for i in range(4))
wallaroo.__version__
'2023.2.0+dfca0605e'
Connect to the Wallaroo Instance
The first step is to connect to Wallaroo through the Wallaroo client. The Python library is included in the Wallaroo install and available through the Jupyter Hub interface provided with your Wallaroo environment.
This is accomplished using the wallaroo.Client()
command, which provides a URL to grant the SDK permission to your specific Wallaroo environment. When displayed, enter the URL into a browser and confirm permissions. Store the connection into a variable that can be referenced later.
If logging into the Wallaroo instance through the internal JupyterHub service, use wl = wallaroo.Client()
. For more information on Wallaroo Client settings, see the Client Connection guide.
# Login through local Wallaroo instance
wl = wallaroo.Client()
Variable Declaration
The following variables will be used for our big query testing.
We’ll use two connections:
- bigquery_input_connection: The connection that will draw inference input data from a BigQuery table.
- bigquery_output_connection: The connection that will upload inference results into a BigQuery table.
Note that for the connection arguments, we’ll retrieve the information from the files ./bigquery_service_account_input_key.json and ./bigquery_service_account_output_key.json that include the service account key (SAK) information, as well as the dataset and table used.
Field | Included in SAK |
---|---|
type | √ |
project_id | √ |
private_key_id | √ |
private_key | √ |
client_email | √ |
auth_uri | √ |
token_uri | √ |
auth_provider_x509_cert_url | √ |
client_x509_cert_url | √ |
dataset | 🚫 |
table | 🚫 |
# Setting variables for later steps
workspace_name = f'bigquerystatsmodelworkspace{suffix}'
pipeline_name = f'bigquerystatsmodelpipeline{suffix}'
model_name = f'bigquerystatsmodelmodel{suffix}'
model_file_name = './models/bike_day_model.pkl'
bigquery_connection_input_name = f'bigqueryforecastinputs{suffix}'
bigquery_connection_input_type = "BIGQUERY"
bigquery_connection_input_argument = json.load(open('./resources/bigquery_service_account_input_key.json'))
bigquery_connection_output_name = f'bigqueryforecastoutputs{suffix}'
bigquery_connection_output_type = "BIGQUERY"
bigquery_connection_output_argument = json.load(open('./resources/bigquery_service_account_output_key.json'))
Helper Methods
The following helper methods are used to either create or get workspaces, pipelines, and connections.
# helper methods to retrieve workspaces and pipelines
def get_workspace(name):
    workspace = None
    for ws in wl.list_workspaces():
        if ws.name() == name:
            workspace = ws
    if workspace is None:
        workspace = wl.create_workspace(name)
    return workspace

def get_pipeline(name):
    try:
        pipeline = wl.pipelines_by_name(name)[0]
    except EntityNotFoundError:
        pipeline = wl.build_pipeline(name)
    return pipeline
Create the Workspace and Pipeline
We’ll now create our workspace and pipeline for the tutorial. If this tutorial has been run previously, then this will retrieve the existing ones with the assumption they’re for use with this tutorial.
We’ll set the retrieved workspace as the current workspace in the SDK, so all commands will default to that workspace.
workspace = get_workspace(workspace_name)
wl.set_current_workspace(workspace)
pipeline = get_pipeline(pipeline_name)
Upload the Model and Deploy Pipeline
We’ll upload our model into our sample workspace, then add it as a pipeline step before deploying the pipeline so it’s ready to accept inference requests.
# Upload the model
bike_day_model = (wl.upload_model(model_name,
model_file_name,
framework=wallaroo.framework.Framework.ONNX)
.configure(runtime="python", tensor_fields=["tensor"])
)
# Add the model as a pipeline step
pipeline.add_model_step(bike_day_model)
name | bigquerystatsmodelpipelinegztp |
---|---|
created | 2023-05-23 15:19:43.097185+00:00 |
last_updated | 2023-05-23 15:19:43.097185+00:00 |
deployed | (none) |
tags | |
versions | fa33af33-3cf3-43c9-8e2a-4f0b549d84bf |
steps |
#deploy the pipeline
pipeline.deploy()
Waiting for deployment - this will take up to 45s ............. ok
name | bigquerystatsmodelpipelinegztp |
---|---|
created | 2023-05-23 15:19:43.097185+00:00 |
last_updated | 2023-05-23 15:19:43.390612+00:00 |
deployed | True |
tags | |
versions | ee07358d-b6d5-46f3-a5bc-f98baae7ddca, fa33af33-3cf3-43c9-8e2a-4f0b549d84bf |
steps | bigquerystatsmodelmodelgztp |
Sample Inferences
We’ll perform some quick sample inferences with the local file data to verify the pipeline deployed and is ready for inferences. Once done, we’ll undeploy the pipeline.
## perform inferences
results = pipeline.infer_from_file('./data/bike_day_eval.json', data_format="custom-json")
print(results)
[{'forecast': [1882.3784555157672, 2130.607915701861, 2340.84005381799, 2895.754978552066, 2163.657515565616, 1509.1792126509536, 2431.1838923957016]}]
Create Connections
We will create the data source connection via the Wallaroo client command create_connection
.
Connections are created with the Wallaroo client command create_connection
with the following parameters.
Parameter | Type | Description |
---|---|---|
name | string (Required) | The name of the connection. This must be unique - if submitting the name of an existing connection it will return an error. |
type | string (Required) | The user defined type of connection. |
details | Dict (Required) | User defined configuration details for the data connection. These can be {'username':'dataperson', 'password':'datapassword', 'port': 3339} , or {'token':'abcde123==', 'host':'example.com', 'port': 1234} , or other user defined combinations. |
- IMPORTANT NOTE: Data connection names must be unique. Attempting to create a data connection with the same name as an existing data connection will result in an error.
See the statsmodel_forecast_inputs
and statsmodel_forecast_outputs
details listed above for the table schema used for our example.
connection_input = wl.create_connection(bigquery_connection_input_name, bigquery_connection_input_type, bigquery_connection_input_argument)
connection_output = wl.create_connection(bigquery_connection_output_name, bigquery_connection_output_type, bigquery_connection_output_argument)
display(connection_input)
display(connection_output)
Field | Value |
---|---|
Name | bigqueryforecastinputsgztp |
Connection Type | BIGQUERY |
Details | ***** |
Created At | 2023-05-23T15:19:58.159691+00:00 |
Linked Workspaces | [] |
Field | Value |
---|---|
Name | bigqueryforecastoutputsgztp |
Connection Type | BIGQUERY |
Details | ***** |
Created At | 2023-05-23T15:19:58.195628+00:00 |
Linked Workspaces | [] |
Get Connection by Name
The Wallaroo client method get_connection(name)
retrieves the connection that matches the name
parameter. We’ll retrieve our connection and store it as inference_source_connection
.
big_query_input_connection = wl.get_connection(name=bigquery_connection_input_name)
big_query_output_connection = wl.get_connection(name=bigquery_connection_output_name)
display(big_query_input_connection)
display(big_query_output_connection)
Field | Value |
---|---|
Name | bigqueryforecastinputsgztp |
Connection Type | BIGQUERY |
Details | ***** |
Created At | 2023-05-23T15:19:58.159691+00:00 |
Linked Workspaces | [] |
Field | Value |
---|---|
Name | bigqueryforecastoutputsgztp |
Connection Type | BIGQUERY |
Details | ***** |
Created At | 2023-05-23T15:19:58.195628+00:00 |
Linked Workspaces | [] |
Add Connection to Workspace
The method Workspace add_connection(connection_name)
adds a Data Connection to a workspace, and takes the following parameters.
Parameter | Type | Description |
---|---|---|
name | string (Required) | The name of the Data Connection |
We’ll add both connections to our sample workspace, then list the connections available to the workspace to confirm.
workspace.add_connection(bigquery_connection_input_name)
workspace.add_connection(bigquery_connection_output_name)
workspace.list_connections()
name | connection type | details | created at | linked workspaces |
---|---|---|---|---|
bigqueryforecastinputsgztp | BIGQUERY | ***** | 2023-05-23T15:19:58.159691+00:00 | ['bigquerystatsmodelworkspacegztp'] |
bigqueryforecastoutputsgztp | BIGQUERY | ***** | 2023-05-23T15:19:58.195628+00:00 | ['bigquerystatsmodelworkspacegztp'] |
Big Query Connection Inference Example
We can test the BigQuery connection with a simple inference to our deployed pipeline. We’ll request the data, format the table into a pandas DataFrame, then submit it for an inference request.
Create Google Credentials
From our BigQuery request, we’ll create the credentials for our BigQuery connection.
bigquery_input_credentials = service_account.Credentials.from_service_account_info(
big_query_input_connection.details())
bigquery_output_credentials = service_account.Credentials.from_service_account_info(
big_query_output_connection.details())
Connect to Google BigQuery
We can now generate a client from our connection details, specifying the project that was included in the big_query_connection
details.
bigqueryinputclient = bigquery.Client(
credentials=bigquery_input_credentials,
project=big_query_input_connection.details()['project_id']
)
bigqueryoutputclient = bigquery.Client(
credentials=bigquery_output_credentials,
project=big_query_output_connection.details()['project_id']
)
Query Data
Now we’ll create our query and retrieve information from our dataset and table as defined in the file bigquery_service_account_key.json
.
We’ll grab the last 7 days of data - with every record assumed to be one day - and use that for our inference request.
inference_dataframe_input = bigqueryinputclient.query(
f"""
(select dteday, temp, holiday, workingday, windspeed
FROM {big_query_input_connection.details()['dataset']}.{big_query_input_connection.details()['table']}
ORDER BY dteday DESC LIMIT 7)
ORDER BY dteday
"""
).to_dataframe().drop(columns=['dteday'])
# convert to a dict, show the first 7 rows
display(inference_dataframe_input.to_dict())
{'temp': {0: 0.291304,
1: 0.243333,
2: 0.254167,
3: 0.253333,
4: 0.253333,
5: 0.255833,
6: 0.215833},
'holiday': {0: 1, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0},
'workingday': {0: 0, 1: 1, 2: 1, 3: 1, 4: 0, 5: 0, 6: 1},
'windspeed': {0: 0.168726,
1: 0.316546,
2: 0.350133,
3: 0.155471,
4: 0.124383,
5: 0.350754,
6: 0.154846}}
Sample Inference
With our data retrieved, we’ll perform an inference and display the results.
#deploy the pipeline
pipeline.deploy()
ok
name | bigquerystatsmodelpipelinegztp |
---|---|
created | 2023-05-23 15:19:43.097185+00:00 |
last_updated | 2023-05-23 15:20:00.500498+00:00 |
deployed | True |
tags | |
versions | f0c22a0a-7e6a-4d49-91ba-e93daf575e6b, ee07358d-b6d5-46f3-a5bc-f98baae7ddca, fa33af33-3cf3-43c9-8e2a-4f0b549d84bf |
steps | bigquerystatsmodelmodelgztp |
results = pipeline.infer(inference_dataframe_input.to_dict())
display(results[0]['forecast'])
[1231.2556997246595,
1627.3643469089343,
1674.3769827243134,
1621.9273295873882,
1140.7465817903185,
1211.5223974364667,
1457.1896450382922]
Upload the Results
With the query complete, we’ll upload the results back to the BigQuery dataset.
output_table = bigqueryoutputclient.get_table(f"{big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}")
job = bigqueryoutputclient.query(
f"""
INSERT {big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}
VALUES
(current_timestamp(), "{results[0]['forecast']}")
"""
)
# Get the last insert to the output table to verify
# wait 10 seconds for the insert to finish
time.sleep(10)
task_inference_results = bigqueryoutputclient.query(
f"""
SELECT *
FROM {big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}
ORDER BY date DESC
LIMIT 5
"""
).to_dataframe()
display(task_inference_results)
date | forecast | |
---|---|---|
0 | 2023-05-23 15:20:01.622083+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
1 | 2023-05-23 15:06:06.678170+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
2 | 2023-05-23 14:57:00.200596+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
3 | 2023-05-19 22:39:01.932839+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
4 | 2023-05-19 22:33:27.655703+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
Wallaroo ML Workload Orchestration Example
With the pipeline deployed and our connections set, we will now generate our ML Workload Orchestration. See the Wallaroo ML Workload Orchestrations guide for full details.
Orchestrations are uploaded to the Wallaroo instance as a ZIP file with the following requirements:
Parameter | Type | Description |
---|---|---|
User Code | (Required) Python script as .py files | If main.py exists, then that will be used as the task entrypoint. Otherwise, the first main.py found in any subdirectory will be used as the entrypoint. |
Python Library Requirements | (Optional) requirements.txt file in the requirements file format. A standard Python requirements.txt for any dependencies to be provided in the task environment. The Wallaroo SDK will already be present and should not be included in the requirements.txt. Multiple requirements.txt files are not allowed. | |
Other artifacts | Other artifacts such as files, data, or code to support the orchestration. |
For our example, our orchestration will:
- Use the bigquery_remote_inference to open a connection to the input and output tables.
- Deploy the pipeline.
- Perform an inference with the input data.
- Save the inference results to the output table.
- Undeploy the pipeline.
This sample script is stored in bigquery_statsmodel_remote_inference/main.py
with a requirements.txt
file listing the specific libraries for the Google BigQuery connection, and packaged into the orchestration as ./bigquery_statsmodel_remote_inference/bigquery_statsmodel_remote_inference.zip
. We’ll display the steps in uploading the orchestration to the Wallaroo instance.
Note that the orchestration assumes the pipeline is already deployed.
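As a hedged sketch of how the archive might be assembled (assuming main.py and requirements.txt already exist in ./bigquery_statsmodel_remote_inference/), Python’s standard zipfile module can package the artifacts with each file at the top level of the archive:
import zipfile
from pathlib import Path
# assumed location of the orchestration artifacts
src_dir = Path("./bigquery_statsmodel_remote_inference")
zip_path = src_dir / "bigquery_statsmodel_remote_inference.zip"
with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
    for artifact in ["main.py", "requirements.txt"]:
        # arcname keeps each file at the top level of the archive
        zf.write(src_dir / artifact, arcname=artifact)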
Upload the Orchestration
Orchestrations are uploaded with the Wallaroo client upload_orchestration(path)
method with the following parameters.
Parameter | Type | Description |
---|---|---|
path | string (Required) | The path to the ZIP file to be uploaded. |
Once uploaded, the deployment will be prepared and any requirements will be downloaded and installed.
For this example, the orchestration ./bigquery_remote_inference/bigquery_remote_inference.zip
will be uploaded and saved to the variable orchestration
. Then we will loop until the orchestration status is ready
.
pipeline.deploy()
ok
name | bigquerystatsmodelpipelinegztp |
---|---|
created | 2023-05-23 15:19:43.097185+00:00 |
last_updated | 2023-05-23 15:20:13.581488+00:00 |
deployed | True |
tags | |
versions | 2af13cbf-2dae-4dc9-85fa-ae06c04b6a54, f0c22a0a-7e6a-4d49-91ba-e93daf575e6b, ee07358d-b6d5-46f3-a5bc-f98baae7ddca, fa33af33-3cf3-43c9-8e2a-4f0b549d84bf |
steps | bigquerystatsmodelmodelgztp |
orchestration = wl.upload_orchestration(path="./bigquery_remote_inference/bigquery_remote_inference.zip")
while orchestration.status() != 'ready':
print(orchestration.status())
time.sleep(5)
pending_packaging
pending_packaging
packaging
packaging
packaging
packaging
packaging
packaging
packaging
packaging
packaging
wl.list_orchestrations()
id | name | status | filename | sha | created at | updated at |
---|---|---|---|---|---|---|
2186543f-f7f6-46b2-8334-63d73ddb0204 | None | ready | bigquery_remote_inference.zip | 66945c...e28259 | 2023-23-May 15:20:13 | 2023-23-May 15:21:06 |
Task Management Tutorial
Once an Orchestration has the status ready
, it can be run as a task. Tasks have the following run options.
Type | SDK Call | How triggered |
---|---|---|
Once | orchestration.run_once(name, json_args, timeout) | Task runs once and exits. |
Scheduled | orchestration.run_scheduled(name, schedule, timeout, json_args) | User provides schedule. Task runs and exits whenever the schedule dictates. |
Run Task Once
We’ll generate a Run Once Task from our orchestration and run it.
Tasks are generated and run once with the Orchestration run_once(name, json_args, timeout)
method. Any arguments for the orchestration are passed in as a Dict
. If there are no arguments, then an empty Dict {}
is passed.
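As a hedged illustration (the task name below is just an example), orchestration arguments and an explicit timeout can be supplied together in the same call:
# run the orchestration once with an argument and an explicit timeout
task_example = orchestration.run_once(
    name="big query statsmodel run once with timeout",  # illustrative name
    json_args={"pipeline_name": "bigquerystatsmodelpipeline"},
    timeout=120
)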
We’ll display the last 5 rows of our BigQuery output table, then start the task that will perform the same inference we did above.
# Get the last insert to the output table to verify
task_inference_results = bigqueryoutputclient.query(
f"""
SELECT *
FROM {big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}
ORDER BY date DESC
LIMIT 5
"""
).to_dataframe()
display(task_inference_results)
date | forecast | |
---|---|---|
0 | 2023-05-23 15:20:01.622083+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
1 | 2023-05-23 15:06:06.678170+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
2 | 2023-05-23 14:57:00.200596+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
3 | 2023-05-19 22:39:01.932839+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
4 | 2023-05-19 22:33:27.655703+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
# Example: run once
task = orchestration.run_once(name="big query statsmodel run once", json_args={})
task
Field | Value |
---|---|
ID | edf924ae-5a17-4596-adbe-b888b7102d33 |
Name | big query statsmodel run once |
Last Run Status | unknown |
Type | Temporary Run |
Active | True |
Schedule | - |
Created At | 2023-23-May 15:21:11 |
Updated At | 2023-23-May 15:21:11 |
Task Status
The list of tasks in the Wallaroo instance is retrieved through the Wallaroo Client list_tasks()
method. This returns a list of the following.
Parameter | Type | Description |
---|---|---|
id | string | The UUID identifier for the task. |
last run status | string | The last reported status of the task. Values are:
|
type | string | The type of the task. Values are:
|
created at | DateTime | The date and time the task was started. |
updated at | DateTime | The date and time the task was updated. |
For this example, the status of the previously created task will be displayed, then polled in a loop until it reaches the status started
.
while task.status() != "started":
display(task.status())
time.sleep(5)
'pending'
'pending'
wl.list_tasks()
id | name | last run status | type | active | schedule | created at | updated at |
---|---|---|---|---|---|---|---|
edf924ae-5a17-4596-adbe-b888b7102d33 | big query statsmodel run once | failure | Temporary Run | True | - | 2023-23-May 15:21:11 | 2023-23-May 15:21:16 |
Task Results
We can view the inferences from our logs and verify that new entries were added from our task. We’ll query the last 5 rows of our inference output table after a wait of 30 seconds.
time.sleep(30)
# Get the last insert to the output table to verify
task_inference_results = bigqueryoutputclient.query(
f"""
SELECT *
FROM {big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}
ORDER BY date DESC
LIMIT 5
"""
).to_dataframe()
display(task_inference_results)
date | forecast | |
---|---|---|
0 | 2023-05-23 15:20:01.622083+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
1 | 2023-05-23 15:06:06.678170+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
2 | 2023-05-23 14:57:00.200596+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
3 | 2023-05-19 22:39:01.932839+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
4 | 2023-05-19 22:33:27.655703+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
Scheduled Run Task Example
The other method of using tasks is as a scheduled run through the Orchestration run_scheduled(name, schedule, timeout, json_args)
. This sets up a task to run on a regular schedule as defined by the schedule
parameter in the cron
service format. For example:
schedule="42 * * * *"
Runs on the 42nd minute of every hour.
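As a hedged sketch (the task name is illustrative), a task on that hourly schedule could be created, and later removed when no longer needed, like this:
# run the orchestration at the 42nd minute of every hour
hourly_task = orchestration.run_scheduled(
    name="hourly statsmodel inference",  # illustrative name
    schedule="42 * * * *",
    timeout=120,
    json_args={}
)
# ...and remove the schedule once it is no longer needed
hourly_task.kill()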
The following schedule runs every day at 12 noon from February 1 to February 15 2024 - and then ends.
schedule="0 0 12 1-15 2 2024"
For our example, we will create a scheduled task to run every 1 minute, display the inference results, then use the Task kill()
method to keep the task from running any further.
task_inference_results = bigqueryoutputclient.query(
f"""
SELECT *
FROM {big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}
ORDER BY date DESC
LIMIT 5
"""
).to_dataframe()
display(task_inference_results)
scheduled_task = orchestration.run_scheduled(name="simple_statsmodel_inference_schedule", schedule="*/1 * * * *", timeout=120, json_args={})
date | forecast | |
---|---|---|
0 | 2023-05-23 15:20:01.622083+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
1 | 2023-05-23 15:06:06.678170+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
2 | 2023-05-23 14:57:00.200596+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
3 | 2023-05-19 22:39:01.932839+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
4 | 2023-05-19 22:33:27.655703+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
while scheduled_task.status() != "started":
display(scheduled_task.status())
time.sleep(5)
'pending'
#wait 60 seconds to give the scheduled task time to finish
time.sleep(60)
task_inference_results = bigqueryoutputclient.query(
f"""
SELECT *
FROM {big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}
ORDER BY date DESC
LIMIT 5
"""
).to_dataframe()
display(task_inference_results)
date | forecast | |
---|---|---|
0 | 2023-05-23 15:20:01.622083+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
1 | 2023-05-23 15:06:06.678170+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
2 | 2023-05-23 14:57:00.200596+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
3 | 2023-05-19 22:39:01.932839+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
4 | 2023-05-19 22:33:27.655703+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
Kill Task
With our testing complete, we will issue the scheduled task the kill command so it will not run again.
scheduled_task.kill()
<ArbexStatus.PENDING_KILL: 'pending_kill'>
Running Task with Custom Parameters
Right now, our task assumes the workspace, pipeline, and connections all have the names we defined above. For this example, we’ll set up a new pipeline with the same pipeline step, but name it bigquerystatsmodelpipeline02
.
When we create our task, we’ll add that pipeline name as an argument to our task. Within our orchestration’s main.py
there is a code block that takes in the task arguments, then sets the pipeline name:
arguments = wl.task_args()
if "pipeline_name" in arguments:
pipeline_name = arguments['pipeline_name']
else:
pipeline_name="bigquerystatsmodelpipeline"
We’ll pass along our new pipeline name as { "pipeline_name": "bigquerystatsmodelpipeline02" }
and track the task progress as before.
newpipeline_name = 'bigquerystatsmodelpipeline02'
pipeline02 = get_pipeline(newpipeline_name)
# add the model as the pipeline step
pipeline02.add_model_step(bike_day_model)
name | bigquerystatsmodelpipeline02 |
---|---|
created | 2023-05-23 15:23:02.136077+00:00 |
last_updated | 2023-05-23 15:23:02.136077+00:00 |
deployed | (none) |
tags | |
versions | b2c56497-7a50-434a-8973-6d91af80f143 |
steps |
# required to set the pipeline steps
pipeline02.deploy()
Waiting for deployment - this will take up to 45s ........ ok
name | bigquerystatsmodelpipeline02 |
---|---|
created | 2023-05-23 15:23:02.136077+00:00 |
last_updated | 2023-05-23 15:24:48.381545+00:00 |
deployed | True |
tags | |
versions | 9dc1f8c5-e9ab-48e7-9b54-290fed5bb28c, 1b44f058-9ab6-44dd-8de6-c52acfdf4fc5, b2c56497-7a50-434a-8973-6d91af80f143 |
steps | bigquerystatsmodelmodelgztp |
# Get the last insert to the output table to verify
task_inference_results = bigqueryoutputclient.query(
f"""
SELECT *
FROM {big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}
ORDER BY date DESC
LIMIT 5
"""
).to_dataframe()
display(task_inference_results)
# Generate the run once task with the new parameter
task = orchestration.run_once(name="parameter sample", json_args={ "pipeline_name": newpipeline_name })
display(task)
# wait for the task to run
while task.status() != "started":
display(task.status())
time.sleep(5)
date | forecast | |
---|---|---|
0 | 2023-05-23 15:20:01.622083+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
1 | 2023-05-23 15:06:06.678170+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
2 | 2023-05-23 14:57:00.200596+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
3 | 2023-05-19 22:39:01.932839+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
4 | 2023-05-19 22:33:27.655703+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
Field | Value |
---|---|
ID | 2fb3cba9-d515-44da-bd9e-59661b4372ef |
Name | parameter sample |
Last Run Status | unknown |
Type | Temporary Run |
Active | True |
Schedule | - |
Created At | 2023-23-May 15:24:59 |
Updated At | 2023-23-May 15:24:59 |
'pending'
'pending'
# wait 30 seconds then display the results
time.sleep(30)
task_inference_results = bigqueryoutputclient.query(
f"""
SELECT *
FROM {big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}
ORDER BY date DESC
LIMIT 5
"""
).to_dataframe()
display(task_inference_results)
date | forecast | |
---|---|---|
0 | 2023-05-23 15:20:01.622083+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
1 | 2023-05-23 15:06:06.678170+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
2 | 2023-05-23 14:57:00.200596+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
3 | 2023-05-19 22:39:01.932839+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
4 | 2023-05-19 22:33:27.655703+00:00 | [1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922] |
Conclusion
With that, our tutorial is over. Please feel free to use this tutorial code in your own Wallaroo-related projects. Our last step will be to undeploy our pipelines and return the resources to the Wallaroo instance.
pipeline.undeploy()
pipeline02.undeploy()
ok
Waiting for undeployment - this will take up to 45s ..................................... ok
name | bigquerystatsmodelpipeline02 |
---|---|
created | 2023-05-23 15:23:02.136077+00:00 |
last_updated | 2023-05-23 15:24:48.381545+00:00 |
deployed | False |
tags | |
versions | 9dc1f8c5-e9ab-48e7-9b54-290fed5bb28c, 1b44f058-9ab6-44dd-8de6-c52acfdf4fc5, b2c56497-7a50-434a-8973-6d91af80f143 |
steps | bigquerystatsmodelmodelgztp |
6 - Wallaroo ML Workload Orchestration Comprehensive Tutorial
This can be downloaded as part of the Wallaroo Tutorials repository.
ML Workload Orchestration Comprehensive Tutorial
This tutorial provides a complete set of methods and examples regarding Wallaroo Connections and Wallaroo ML Workload Orchestration.
Wallaroo provides data connections, orchestrations, and tasks to provide organizations with a method of creating and managing automated tasks that can either be run on demand, on a regular schedule, or as a service so they respond to requests.
Object | Description |
---|---|
Orchestration | A set of instructions written as a python script with a requirements library. Orchestrations are uploaded to the Wallaroo instance |
Task | An implementation of an orchestration. Tasks are run either once when requested, on a repeating schedule, or as a service. |
Connection | Definitions set by MLOps engineers that are used by other Wallaroo users for connection information to a data source. Usually paired with orchestrations. |
A typical flow in the orchestration, task and connection life cycle is:
- (Optional) A connection is defined with information such as username, connection URL, tokens, etc.
- One or more connections are applied to a workspace for users to implement in their code or orchestrations.
- An orchestration is created to perform some set instructions. For example:
- Deploy a pipeline, request data from an external service, store the results in an external database, then undeploy the pipeline.
- Download an ML model, then replace a current pipeline step with the new version.
- Collect log files from a deployed pipeline once every hour and submit them to Kafka or another service.
- A task is created that specifies the orchestration to perform and the schedule:
- Run once.
- Run on a schedule (based on cron-like settings).
- Run as a service to be run whenever requested.
- Once the use for a task is complete, it is killed and its schedule or service removed.
Tutorial Goals
The tutorial will demonstrate the following:
- Create a simple connection to retrieve an Apache Arrow table file from a GitHub registry.
- Create an orchestration that retrieves the Apache Arrow table file from the location defined by the connection, deploy a pipeline, perform an inference, then undeploys the pipeline.
- Implement the orchestration as a task that runs every minute.
- Display the logs from the pipeline after 5 minutes to verify the task is running.
Tutorial Required Libraries
The following libraries are required for this tutorial, and included by default in a Wallaroo instance’s JupyterHub service.
IMPORTANT NOTE: These libraries are already installed in the Wallaroo JupyterHub service. Do not uninstall and reinstall the Wallaroo SDK with the command below.
wallaroo: The Wallaroo SDK.
pandas: The pandas data analysis library.
pyarrow: The Apache Arrow Python library.
The specific versions used are set in the file ./resources/requirements.txt
. Supported libraries are automatically installed with the pip
or conda
commands. For example, from the root of this tutorials folder:
pip install -r ./resources/requirements.txt
Initialization
The first step is to connect to a Wallaroo instance. We’ll load the libraries and set our client connection settings
Workspace, Model and Pipeline Setup
For this tutorial, we’ll create a workspace, upload our sample model and deploy a pipeline. We’ll perform some quick sample inferences to verify that everything is working.
import wallaroo
from wallaroo.object import EntityNotFoundError
# to display dataframe tables
from IPython.display import display
# used to display dataframe information without truncating
import pandas as pd
pd.set_option('display.max_colwidth', None)
import pyarrow as pa
import requests
# Used to create unique workspace and pipeline names
import string
import random
# make a random 4 character suffix
suffix= ''.join(random.choice(string.ascii_lowercase) for i in range(4))
display(suffix)
'tgiq'
Connect to the Wallaroo Instance
The first step is to connect to Wallaroo through the Wallaroo client. The Python library is included in the Wallaroo install and available through the Jupyter Hub interface provided with your Wallaroo environment.
This is accomplished using the wallaroo.Client()
command, which provides a URL to grant the SDK permission to your specific Wallaroo environment. When displayed, enter the URL into a browser and confirm permissions. Store the connection into a variable that can be referenced later.
If logging into the Wallaroo instance through the internal JupyterHub service, use wl = wallaroo.Client()
. For more information on Wallaroo Client settings, see the Client Connection guide.
# Login through local Wallaroo instance
wl = wallaroo.Client()
# Setting variables for later steps
workspace_name = f'orchestrationworkspace{suffix}'
pipeline_name = f'orchestrationpipeline{suffix}'
model_name = f'orchestrationmodel{suffix}'
model_file_name = './models/rf_model.onnx'
connection_name = f'houseprice_arrow_table{suffix}'
Helper Methods
The following helper methods are used to either create or get workspaces and pipelines.
# helper methods to retrieve workspaces and pipelines
def get_workspace(name):
workspace = None
for ws in wl.list_workspaces():
if ws.name() == name:
workspace= ws
if(workspace == None):
workspace = wl.create_workspace(name)
return workspace
def get_pipeline(name):
try:
pipeline = wl.pipelines_by_name(name)[0]
except EntityNotFoundError:
pipeline = wl.build_pipeline(name)
return pipeline
Create the Workspace and Pipeline
We’ll now create our workspace and pipeline for the tutorial. If this tutorial has been run previously, then this will retrieve the existing ones with the assumption they’re for use with this tutorial.
We’ll set the retrieved workspace as the current workspace in the SDK, so all commands will default to that workspace.
workspace = get_workspace(workspace_name)
wl.set_current_workspace(workspace)
pipeline = get_pipeline(pipeline_name)
Upload the Model and Deploy Pipeline
We’ll upload our model into our sample workspace, then add it as a pipeline step before deploying the pipeline so it’s ready to accept inference requests.
# Upload the model
housing_model_control = (wl.upload_model(model_name,
model_file_name,
framework=wallaroo.framework.Framework.ONNX)
.configure(tensor_fields=["tensor"])
)
# Add the model as a pipeline step
pipeline.add_model_step(housing_model_control)
name | orchestrationpipelinetgiq |
---|---|
created | 2023-05-22 19:54:06.933674+00:00 |
last_updated | 2023-05-22 19:54:06.933674+00:00 |
deployed | (none) |
tags | |
versions | ed5bf4b1-1d5d-4ff9-8a23-2c1e44a8e672 |
steps |
#deploy the pipeline
pipeline.deploy()
Waiting for deployment - this will take up to 45s ........................ ok
name | orchestrationpipelinetgiq |
---|---|
created | 2023-05-22 19:54:06.933674+00:00 |
last_updated | 2023-05-22 19:54:08.008312+00:00 |
deployed | True |
tags | |
versions | a7336408-20ef-4b65-8167-c2f80c968a21, ed5bf4b1-1d5d-4ff9-8a23-2c1e44a8e672 |
steps | orchestrationmodeltgiq |
Sample Inferences
We’ll perform some quick sample inferences using an Apache Arrow table as the input. Once that’s finished, we’ll undeploy the pipeline and return the resources back to the Wallaroo instance.
# sample inferences
batch_inferences = pipeline.infer_from_file('./data/xtest-1k.arrow')
large_inference_result = batch_inferences.to_pandas()
display(large_inference_result.head(20))
time | in.tensor | out.variable | check_failures | |
---|---|---|---|---|
0 | 2023-05-22 19:54:33.671 | [4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0] | [718013.75] | 0 |
1 | 2023-05-22 19:54:33.671 | [2.0, 2.5, 2170.0, 6361.0, 1.0, 0.0, 2.0, 3.0, 8.0, 2170.0, 0.0, 47.7109, -122.017, 2310.0, 7419.0, 6.0, 0.0, 0.0] | [615094.56] | 0 |
2 | 2023-05-22 19:54:33.671 | [3.0, 2.5, 1300.0, 812.0, 2.0, 0.0, 0.0, 3.0, 8.0, 880.0, 420.0, 47.5893, -122.317, 1300.0, 824.0, 6.0, 0.0, 0.0] | [448627.72] | 0 |
3 | 2023-05-22 19:54:33.671 | [4.0, 2.5, 2500.0, 8540.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2500.0, 0.0, 47.5759, -121.994, 2560.0, 8475.0, 24.0, 0.0, 0.0] | [758714.2] | 0 |
4 | 2023-05-22 19:54:33.671 | [3.0, 1.75, 2200.0, 11520.0, 1.0, 0.0, 0.0, 4.0, 7.0, 2200.0, 0.0, 47.7659, -122.341, 1690.0, 8038.0, 62.0, 0.0, 0.0] | [513264.7] | 0 |
5 | 2023-05-22 19:54:33.671 | [3.0, 2.0, 2140.0, 4923.0, 1.0, 0.0, 0.0, 4.0, 8.0, 1070.0, 1070.0, 47.6902, -122.339, 1470.0, 4923.0, 86.0, 0.0, 0.0] | [668288.0] | 0 |
6 | 2023-05-22 19:54:33.671 | [4.0, 3.5, 3590.0, 5334.0, 2.0, 0.0, 2.0, 3.0, 9.0, 3140.0, 450.0, 47.6763, -122.267, 2100.0, 6250.0, 9.0, 0.0, 0.0] | [1004846.5] | 0 |
7 | 2023-05-22 19:54:33.671 | [3.0, 2.0, 1280.0, 960.0, 2.0, 0.0, 0.0, 3.0, 9.0, 1040.0, 240.0, 47.602, -122.311, 1280.0, 1173.0, 0.0, 0.0, 0.0] | [684577.2] | 0 |
8 | 2023-05-22 19:54:33.671 | [4.0, 2.5, 2820.0, 15000.0, 2.0, 0.0, 0.0, 4.0, 9.0, 2820.0, 0.0, 47.7255, -122.101, 2440.0, 15000.0, 29.0, 0.0, 0.0] | [727898.1] | 0 |
9 | 2023-05-22 19:54:33.671 | [3.0, 2.25, 1790.0, 11393.0, 1.0, 0.0, 0.0, 3.0, 8.0, 1790.0, 0.0, 47.6297, -122.099, 2290.0, 11894.0, 36.0, 0.0, 0.0] | [559631.1] | 0 |
10 | 2023-05-22 19:54:33.671 | [3.0, 1.5, 1010.0, 7683.0, 1.5, 0.0, 0.0, 5.0, 7.0, 1010.0, 0.0, 47.72, -122.318, 1550.0, 7271.0, 61.0, 0.0, 0.0] | [340764.53] | 0 |
11 | 2023-05-22 19:54:33.671 | [3.0, 2.0, 1270.0, 1323.0, 3.0, 0.0, 0.0, 3.0, 8.0, 1270.0, 0.0, 47.6934, -122.342, 1330.0, 1323.0, 8.0, 0.0, 0.0] | [442168.06] | 0 |
12 | 2023-05-22 19:54:33.671 | [4.0, 1.75, 2070.0, 9120.0, 1.0, 0.0, 0.0, 4.0, 7.0, 1250.0, 820.0, 47.6045, -122.123, 1650.0, 8400.0, 57.0, 0.0, 0.0] | [630865.6] | 0 |
13 | 2023-05-22 19:54:33.671 | [4.0, 1.0, 1620.0, 4080.0, 1.5, 0.0, 0.0, 3.0, 7.0, 1620.0, 0.0, 47.6696, -122.324, 1760.0, 4080.0, 91.0, 0.0, 0.0] | [559631.1] | 0 |
14 | 2023-05-22 19:54:33.671 | [4.0, 3.25, 3990.0, 9786.0, 2.0, 0.0, 0.0, 3.0, 9.0, 3990.0, 0.0, 47.6784, -122.026, 3920.0, 8200.0, 10.0, 0.0, 0.0] | [909441.1] | 0 |
15 | 2023-05-22 19:54:33.671 | [4.0, 2.0, 1780.0, 19843.0, 1.0, 0.0, 0.0, 3.0, 7.0, 1780.0, 0.0, 47.4414, -122.154, 2210.0, 13500.0, 52.0, 0.0, 0.0] | [313096.0] | 0 |
16 | 2023-05-22 19:54:33.671 | [4.0, 2.5, 2130.0, 6003.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2130.0, 0.0, 47.4518, -122.12, 1940.0, 4529.0, 11.0, 0.0, 0.0] | [404040.8] | 0 |
17 | 2023-05-22 19:54:33.671 | [3.0, 1.75, 1660.0, 10440.0, 1.0, 0.0, 0.0, 3.0, 7.0, 1040.0, 620.0, 47.4448, -121.77, 1240.0, 10380.0, 36.0, 0.0, 0.0] | [292859.5] | 0 |
18 | 2023-05-22 19:54:33.671 | [3.0, 2.5, 2110.0, 4118.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2110.0, 0.0, 47.3878, -122.153, 2110.0, 4044.0, 25.0, 0.0, 0.0] | [338357.88] | 0 |
19 | 2023-05-22 19:54:33.671 | [4.0, 2.25, 2200.0, 11250.0, 1.5, 0.0, 0.0, 5.0, 7.0, 1300.0, 900.0, 47.6845, -122.201, 2320.0, 10814.0, 94.0, 0.0, 0.0] | [682284.6] | 0 |
Create Wallaroo Connection
Connections are created at the Wallaroo instance level, typically by a MLOps or DevOps engineer, then applied to a workspace.
For this section:
- We will create a sample connection that just has a URL to the same Arrow table file we used in the previous step.
- We’ll apply the data connection to the workspace above.
- For a quick demonstration, we’ll use the connection to retrieve the Arrow table file and use it for a quick sample inference.
Create Connection
Connections are created with the Wallaroo client command create_connection
with the following parameters.
Parameter | Type | Description |
---|---|---|
name | string (Required) | The name of the connection. This must be unique - if submitting the name of an existing connection it will return an error. |
type | string (Required) | The user defined type of connection. |
details | Dict (Required) | User defined configuration details for the data connection. These can be {'username':'dataperson', 'password':'datapassword', 'port': 3339} , or {'token':'abcde123==', 'host':'example.com', 'port': 1234} , or other user defined combinations. |
We’ll create the connection named houseprice_arrow_table
, set it to the type HTTPFILE
, and provide the details as 'host':'https://github.com/WallarooLabs/Wallaroo_Tutorials/raw/main/wallaroo-testing-tutorials/houseprice-saga/data/xtest-1k.arrow'
- the location for our sample Arrow table inference input.
wl.create_connection(connection_name,
"HTTPFILE",
{'host':'https://github.com/WallarooLabs/Wallaroo_Tutorials/raw/main/wallaroo-testing-tutorials/houseprice-saga/data/xtest-1k.arrow'}
)
Field | Value |
---|---|
Name | houseprice_arrow_tabletgiq |
Connection Type | HTTPFILE |
Details | ***** |
Created At | 2023-05-22T19:54:33.723860+00:00 |
Linked Workspaces | [] |
List Data Connections
The Wallaroo Client list_connections()
method lists all connections for the Wallaroo instance.
wl.list_connections()
name | connection type | details | created at | linked workspaces |
---|---|---|---|---|
houseprice_arrow_tabletgiq | HTTPFILE | ***** | 2023-05-22T19:54:33.723860+00:00 | [] |
Add Connection to Workspace
The method Workspace add_connection(connection_name)
adds a Data Connection to a workspace, and takes the following parameters.
Parameter | Type | Description |
---|---|---|
name | string (Required) | The name of the Data Connection |
We’ll add this connection to our sample workspace.
workspace.add_connection(connection_name)
Get Connection
Connections are retrieved by the Wallaroo Client get_connection(name)
method.
connection = wl.get_connection(connection_name)
Connection Details
The Connection method details()
retrieves the connection details
as a dict
.
display(connection.details())
{'host': 'https://github.com/WallarooLabs/Wallaroo_Tutorials/raw/main/wallaroo-testing-tutorials/houseprice-saga/data/xtest-1k.arrow'}
Using a Connection Example
For this example, the connection will be used to retrieve the Apache Arrow file it references, turn it into an Apache Arrow table, then use that table for a sample inference.
# Deploy the pipeline
pipeline.deploy()
# Retrieve the file
# set accept as apache arrow table
headers = {
'Accept': 'application/vnd.apache.arrow.file'
}
response = requests.get(
connection.details()['host'],
headers=headers
)
# Arrow table is retrieved
with pa.ipc.open_file(response.content) as reader:
arrow_table = reader.read_all()
results = pipeline.infer(arrow_table)
result_table = results.to_pandas()
display(result_table.head(20))
ok
time | in.tensor | out.variable | check_failures | |
---|---|---|---|---|
0 | 2023-05-22 19:54:34.320 | [4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0] | [718013.75] | 0 |
1 | 2023-05-22 19:54:34.320 | [2.0, 2.5, 2170.0, 6361.0, 1.0, 0.0, 2.0, 3.0, 8.0, 2170.0, 0.0, 47.7109, -122.017, 2310.0, 7419.0, 6.0, 0.0, 0.0] | [615094.56] | 0 |
2 | 2023-05-22 19:54:34.320 | [3.0, 2.5, 1300.0, 812.0, 2.0, 0.0, 0.0, 3.0, 8.0, 880.0, 420.0, 47.5893, -122.317, 1300.0, 824.0, 6.0, 0.0, 0.0] | [448627.72] | 0 |
3 | 2023-05-22 19:54:34.320 | [4.0, 2.5, 2500.0, 8540.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2500.0, 0.0, 47.5759, -121.994, 2560.0, 8475.0, 24.0, 0.0, 0.0] | [758714.2] | 0 |
4 | 2023-05-22 19:54:34.320 | [3.0, 1.75, 2200.0, 11520.0, 1.0, 0.0, 0.0, 4.0, 7.0, 2200.0, 0.0, 47.7659, -122.341, 1690.0, 8038.0, 62.0, 0.0, 0.0] | [513264.7] | 0 |
5 | 2023-05-22 19:54:34.320 | [3.0, 2.0, 2140.0, 4923.0, 1.0, 0.0, 0.0, 4.0, 8.0, 1070.0, 1070.0, 47.6902, -122.339, 1470.0, 4923.0, 86.0, 0.0, 0.0] | [668288.0] | 0 |
6 | 2023-05-22 19:54:34.320 | [4.0, 3.5, 3590.0, 5334.0, 2.0, 0.0, 2.0, 3.0, 9.0, 3140.0, 450.0, 47.6763, -122.267, 2100.0, 6250.0, 9.0, 0.0, 0.0] | [1004846.5] | 0 |
7 | 2023-05-22 19:54:34.320 | [3.0, 2.0, 1280.0, 960.0, 2.0, 0.0, 0.0, 3.0, 9.0, 1040.0, 240.0, 47.602, -122.311, 1280.0, 1173.0, 0.0, 0.0, 0.0] | [684577.2] | 0 |
8 | 2023-05-22 19:54:34.320 | [4.0, 2.5, 2820.0, 15000.0, 2.0, 0.0, 0.0, 4.0, 9.0, 2820.0, 0.0, 47.7255, -122.101, 2440.0, 15000.0, 29.0, 0.0, 0.0] | [727898.1] | 0 |
9 | 2023-05-22 19:54:34.320 | [3.0, 2.25, 1790.0, 11393.0, 1.0, 0.0, 0.0, 3.0, 8.0, 1790.0, 0.0, 47.6297, -122.099, 2290.0, 11894.0, 36.0, 0.0, 0.0] | [559631.1] | 0 |
10 | 2023-05-22 19:54:34.320 | [3.0, 1.5, 1010.0, 7683.0, 1.5, 0.0, 0.0, 5.0, 7.0, 1010.0, 0.0, 47.72, -122.318, 1550.0, 7271.0, 61.0, 0.0, 0.0] | [340764.53] | 0 |
11 | 2023-05-22 19:54:34.320 | [3.0, 2.0, 1270.0, 1323.0, 3.0, 0.0, 0.0, 3.0, 8.0, 1270.0, 0.0, 47.6934, -122.342, 1330.0, 1323.0, 8.0, 0.0, 0.0] | [442168.06] | 0 |
12 | 2023-05-22 19:54:34.320 | [4.0, 1.75, 2070.0, 9120.0, 1.0, 0.0, 0.0, 4.0, 7.0, 1250.0, 820.0, 47.6045, -122.123, 1650.0, 8400.0, 57.0, 0.0, 0.0] | [630865.6] | 0 |
13 | 2023-05-22 19:54:34.320 | [4.0, 1.0, 1620.0, 4080.0, 1.5, 0.0, 0.0, 3.0, 7.0, 1620.0, 0.0, 47.6696, -122.324, 1760.0, 4080.0, 91.0, 0.0, 0.0] | [559631.1] | 0 |
14 | 2023-05-22 19:54:34.320 | [4.0, 3.25, 3990.0, 9786.0, 2.0, 0.0, 0.0, 3.0, 9.0, 3990.0, 0.0, 47.6784, -122.026, 3920.0, 8200.0, 10.0, 0.0, 0.0] | [909441.1] | 0 |
15 | 2023-05-22 19:54:34.320 | [4.0, 2.0, 1780.0, 19843.0, 1.0, 0.0, 0.0, 3.0, 7.0, 1780.0, 0.0, 47.4414, -122.154, 2210.0, 13500.0, 52.0, 0.0, 0.0] | [313096.0] | 0 |
16 | 2023-05-22 19:54:34.320 | [4.0, 2.5, 2130.0, 6003.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2130.0, 0.0, 47.4518, -122.12, 1940.0, 4529.0, 11.0, 0.0, 0.0] | [404040.8] | 0 |
17 | 2023-05-22 19:54:34.320 | [3.0, 1.75, 1660.0, 10440.0, 1.0, 0.0, 0.0, 3.0, 7.0, 1040.0, 620.0, 47.4448, -121.77, 1240.0, 10380.0, 36.0, 0.0, 0.0] | [292859.5] | 0 |
18 | 2023-05-22 19:54:34.320 | [3.0, 2.5, 2110.0, 4118.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2110.0, 0.0, 47.3878, -122.153, 2110.0, 4044.0, 25.0, 0.0, 0.0] | [338357.88] | 0 |
19 | 2023-05-22 19:54:34.320 | [4.0, 2.25, 2200.0, 11250.0, 1.5, 0.0, 0.0, 5.0, 7.0, 1300.0, 900.0, 47.6845, -122.201, 2320.0, 10814.0, 94.0, 0.0, 0.0] | [682284.6] | 0 |
Remove Connection from Workspace
The Workspace method remove_connection(connection_name)
removes the connection from the workspace, but does not delete the connection from the Wallaroo instance. This method takes the following parameters.
Parameter | Type | Description |
---|---|---|
name | String (Required) | The name of the connection to be removed |
The previous connection will be removed from the workspace, then the workspace connections displayed to verify it has been removed.
workspace.remove_connection(connection_name)
display(workspace.list_connections())
(no connections)
Delete Connection
The Connection method delete_connection()
removes the connection from the Wallaroo instance, and all attachments in workspaces they were connected to.
connection.delete_connection()
wl.list_connections()
(no connections)
Orchestration Tutorial
The next series of examples will build on what we just did. So far we have:
- Deployed a pipeline, performed sample inferences with a local Apache Arrow file, displayed the results, then undeployed the pipeline.
- Deployed a pipeline, used a Wallaroo connection’s details to retrieve a remote Apache Arrow file, performed inferences and displayed the results, then undeployed the pipeline.
For the orchestration tutorial, we’ll do the same thing only package it into a separate python script and upload it to the Wallaroo instance, then create a task from that orchestration and perform our sample inferences again.
Orchestration Requirements
Orchestrations are uploaded to the Wallaroo instance as a ZIP file with the following requirements:
- The ZIP file should not contain any directories - only files at the top level.
Parameter | Type | Description |
---|---|---|
User Code | (Required) Python script as .py files | Python scripts for the orchestration to run. If the file main.py exists, that will be the entrypoint. Otherwise, if only one .py exists, then that will be the entrypoint. |
Python Library Requirements | (Required) requirements.txt file in the requirements file format. This is in the root of the zip file, and there can only be one requirements.txt file for the orchestration. | |
Other artifacts | Other artifacts such as files, data, or code to support the orchestration. |
Zip Instructions
In a terminal with the zip
command, assemble artifacts as above and then create the archive. The zip
command is included by default with the Wallaroo JupyterHub service.
zip
commands take the following format, with {zipfilename}.zip
as the zip file to save the artifacts to, and each file thereafter as the files to add to the archive.
zip {zipfilename}.zip file1 file2 file3 ...
For example, the following command will add the files main.py
and requirements.txt
into the file hello.zip
.
$ zip hello.zip main.py requirements.txt
adding: main.py (deflated 47%)
adding: requirements.txt (deflated 52%)
Orchestration Recommendations
The following recommendations will make using Wallaroo orchestrations easier:
- The version of Python used should match the same version as in the Wallaroo JupyterHub service.
- The same version of the Wallaroo SDK should match the server. For a 2023.2 Wallaroo instance, use the Wallaroo SDK version 2023.2.
- Specify the version of pip dependencies.
- The wallaroo.Client constructor auth_type argument is ignored. Using wallaroo.Client() is sufficient.
- The following methods will assist with orchestrations (see the sketch after this list):
  - wallaroo.in_task(): Returns True if the code is running within an Orchestrator task.
  - wallaroo.task_args(): Returns a Dict of invocation-specific arguments passed to the run_ calls.
- Use print commands so outputs are saved to the task’s log files.
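The following is a minimal main.py sketch of those recommendations; it assumes wallaroo.in_task() and the task argument handling behave as described above, and the pipeline name is purely illustrative.
import wallaroo
wl = wallaroo.Client()  # auth_type is ignored inside a task
if wallaroo.in_task():
    # running inside an Orchestrator task: read the invocation arguments
    arguments = wl.task_args()
    pipeline_name = arguments.get("pipeline_name", "orchestrationpipeline")
else:
    # running interactively (for example, in JupyterHub) for testing
    pipeline_name = "orchestrationpipeline"
# print output is saved to the task's log files
print(f"Target pipeline: {pipeline_name}")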
Example requirements.txt file
dbt-bigquery==1.4.3
dbt-core==1.4.5
dbt-extractor==0.4.1
dbt-postgres==1.4.5
google-api-core==2.8.2
google-auth==2.11.0
google-auth-oauthlib==0.4.6
google-cloud-bigquery==3.3.2
google-cloud-bigquery-storage==2.15.0
google-cloud-core==2.3.2
google-cloud-storage==2.5.0
google-crc32c==1.5.0
google-pasta==0.2.0
google-resumable-media==2.3.3
googleapis-common-protos==1.56.4
Sample Orchestrator
The following orchestrator artifacts are in the directory ./remote_inference
and includes the file main.py
with the following code:
import wallaroo
from wallaroo.object import EntityNotFoundError
import pandas as pd
import pyarrow as pa
import requests
wl = wallaroo.Client()
# Setting variables for later steps
# get the arguments
arguments = wl.task_args()
if "workspace_name" in arguments:
workspace_name = arguments['workspace_name']
else:
workspace_name="orchestrationworkspace"
if "pipeline_name" in arguments:
pipeline_name = arguments['pipeline_name']
else:
pipeline_name="orchestrationpipeline"
if "connection_name" in arguments:
connection_name = arguments['connection_name']
else:
connection_name = "houseprice_arrow_table"
# helper methods to retrieve workspaces and pipelines
def get_workspace(name):
workspace = None
for ws in wl.list_workspaces():
if ws.name() == name:
workspace= ws
if(workspace == None):
workspace = wl.create_workspace(name)
return workspace
def get_pipeline(name):
try:
pipeline = wl.pipelines_by_name(name)[0]
except EntityNotFoundError:
pipeline = wl.build_pipeline(name)
return pipeline
print(f"Getting the workspace {workspace_name}")
workspace = get_workspace(workspace_name)
wl.set_current_workspace(workspace)
print(f"Getting the pipeline {pipeline_name}")
pipeline = get_pipeline(pipeline_name)
pipeline.deploy()
# Get the connection - assuming it will be the only one
inference_source_connection = wl.get_connection(name=connection_name)
print(f"Getting arrow table file")
# Retrieve the file
# set accept as apache arrow table
headers = {
'Accept': 'application/vnd.apache.arrow.file'
}
response = requests.get(
inference_source_connection.details()['host'],
headers=headers
)
# Arrow table is retrieved
with pa.ipc.open_file(response.content) as reader:
arrow_table = reader.read_all()
print("Inference time. Displaying results after.")
# Perform the inference
result = pipeline.infer(arrow_table)
print(result)
pipeline.undeploy()
This is saved to the file ./remote_inference/remote_inference.zip
.
Preparing the Wallaroo Instance
To prepare the Wallaroo instance, we’ll once again create the Wallaroo connection houseprice_arrow_table
and apply it to the workspace.
wl.create_connection(connection_name,
"HTTPFILE",
{'host':'https://github.com/WallarooLabs/Wallaroo_Tutorials/raw/main/wallaroo-testing-tutorials/houseprice-saga/data/xtest-1k.arrow'}
)
workspace.add_connection(connection_name)
Upload the Orchestration
Orchestrations are uploaded with the Wallaroo client upload_orchestration(path)
method with the following parameters.
Parameter | Type | Description |
---|---|---|
path | string (Required) | The path to the ZIP file to be uploaded. |
Once uploaded, the deployment will be prepared and any requirements will be downloaded and installed.
For this example, the orchestration ./remote_inference/remote_inference.zip
will be uploaded and saved to the variable orchestration
.
orchestration = wl.upload_orchestration(name="comprehensive sample", path="./remote_inference/remote_inference.zip")
Orchestration Status
The Orchestration method status()
displays the current status of the uploaded orchestration.
Status | Description |
---|---|
pending_packaging | The orchestration is uploaded, but packaging hasn’t started yet. |
packaging | The orchestration is being packaged for use with the Wallaroo instance. |
ready | The orchestration is ready for use. |
For this example, the status of the orchestration will be displayed then looped until it has reached status ready
.
import time
while orchestration.status() != 'ready':
print(orchestration.status())
time.sleep(5)
pending_packaging
pending_packaging
packaging
packaging
packaging
packaging
packaging
packaging
packaging
packaging
packaging
List Orchestrations
Orchestrations are listed with the Wallaroo Client list_orchestrations()
method, which returns a list of available orchestrations.
wl.list_orchestrations()
id | name | status | filename | sha | created at | updated at |
---|---|---|---|---|---|---|
0f90e606-09f8-409b-a306-cb04ec4c011a | comprehensive sample | ready | remote_inference.zip | b88e93...2396fb | 2023-22-May 19:55:15 | 2023-22-May 19:56:09 |
Task Management Tutorial
Once an Orchestration has the status ready
, it can be run as a task. Tasks have the following run options.
Type | SDK Call | How triggered | Purpose |
---|---|---|---|
Once | orchestration.run_once(name, json_args, timeout) | Task runs once and exits. | Single batch, experimentation |
Scheduled | orchestration.run_scheduled() | User provides schedule. Task runs and exits whenever the schedule dictates. | Recurrent batch ETL. |
Task Run Once
Tasks are generated and run once with the Orchestration run_once(name, json_args, timeout)
method. Any arguments for the orchestration are passed in as a Dict
. If there are no arguments, then an empty Dict {}
is passed.
For our example, we will pass the workspace, pipeline, and connection into our task.
# Example: run once
import datetime
task_start = datetime.datetime.now()
task = orchestration.run_once(name="house price run once 2", json_args={"workspace_name": workspace_name,
"pipeline_name":pipeline_name,
"connection_name": connection_name
}
)
task
Field | Value |
---|---|
ID | f0e27d6a-6a98-4d26-b240-266f08560c48 |
Name | house price run once 2 |
Last Run Status | unknown |
Type | Temporary Run |
Active | True |
Schedule | - |
Created At | 2023-22-May 19:58:32 |
Updated At | 2023-22-May 19:58:32 |
taskfail.last_runs()
task id | pod id | status | created at | updated at |
---|---|---|---|---|
5ee51c78-a1c6-41e4-86a6-77110ce26161 | 844902e0-5ff3-4c15-b497-e173aa3ce0d5 | running | 2023-22-May 20:15:08 | 2023-22-May 20:15:08 |
List Tasks
The list of tasks in the Wallaroo instance is retrieved through the Wallaroo Client list_tasks()
method that accepts the following parameters.
Parameter | Type | Description |
---|---|---|
killed | Boolean (Optional Default: False ) | Returns tasks depending on whether they have been issued the kill command. False returns all tasks whether killed or not. True only returns killed tasks. |
This returns a list of the following in reverse chronological order by updated at
.
Parameter | Type | Description |
---|---|---|
id | string | The UUID identifier for the task. |
last run status | string | The last reported status the task. Values are:
|
type | string | The type of the task. Values are:
|
active | Boolean | True : The task is scheduled or running. False : The task has completed or has been issued the kill command. |
schedule | string | The cron style schedule for the task. If the task is not a scheduled one, then the schedule will be - . |
created at | DateTime | The date and time the task was started. |
updated at | DateTime | The date and time the task was updated. |
wl.list_tasks()
id | name | last run status | type | active | schedule | created at | updated at |
---|---|---|---|---|---|---|---|
f0e27d6a-6a98-4d26-b240-266f08560c48 | house price run once 2 | running | Temporary Run | True | - | 2023-22-May 19:58:32 | 2023-22-May 19:58:38 |
36509ef8-98da-42a0-913f-e6e929dedb15 | house price run once | success | Temporary Run | True | - | 2023-22-May 19:56:37 | 2023-22-May 19:56:48 |
Task Status
The status of the task is returned with the Task status()
method. Tasks can have the following statuses.
pending
: The task has not been started or is being prepared.started
: The task has started to execute.
while task.status() != "started":
display(task.status())
time.sleep(5)
Task Last Runs History
The history of a task, where each deployment of the task is known as a task run, is retrieved with the Task last_runs
method that takes the following arguments.
Parameter | Type | Description |
---|---|---|
status | String (Optional, Default: all) | Filters the task history by the status . If all , returns all statuses. Status values are:
|
limit | Integer (Optional) | Limits the number of task runs returned. |
This returns the following in reverse chronological order by updated at
.
Parameter | Type | Description |
---|---|---|
task id | string | Task id in UUID format. |
pod id | string | Pod id in UUID format. |
status | string | Status of the task. Status values are:
|
created at | DateTime | Date and time the task was created at. |
updated at | DateTime | Date and time the task was updated. |
task.last_runs()
task id | pod id | status | created at | updated at |
---|---|---|---|---|
f0e27d6a-6a98-4d26-b240-266f08560c48 | 7d9d73d5-df11-44ed-90c1-db0e64c7f9b8 | success | 2023-22-May 19:58:35 | 2023-22-May 19:58:35 |
Task Run Logs
The output of a task is displayed with the Task Run logs()
method that takes the following parameters.
Parameter | Type | Description |
---|---|---|
limit | Integer (Optional) | Limits the lines returned from the task run log. The limit parameter is based on the log tail - starting from the last line of the log file, then working up until the limit of lines is reached. This is useful for viewing final outputs, exceptions, etc. |
The Task Run logs()
returns the log entries as a string list, with each entry as an item in the list.
- IMPORTANT NOTE: It may take around a minute for task run logs to be integrated into the Wallaroo log database.
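For example, a hedged sketch (assuming the task’s most recent run has completed and its log entries are available) that limits the output to the final lines of the log:
# display only the last 5 lines of the most recent task run's log
recent_run = task.last_runs()[0]
display(recent_run.logs(limit=5))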
# give time for the task to complete and the log files entered
time.sleep(60)
recent_run = task.last_runs()[0]
display(recent_run.logs())
2023-22-May 19:59:29 Getting the workspace orchestrationworkspacetgiq
2023-22-May 19:59:29 Getting the pipeline orchestrationpipelinetgiq
2023-22-May 19:59:29 Getting arrow table file
2023-22-May 19:59:29 Inference time. Displaying results after.
2023-22-May 19:59:29 pyarrow.Table
2023-22-May 19:59:29 time: timestamp[ms]
2023-22-May 19:59:29 in.tensor: list not null
2023-22-May 19:59:29 child 0, item: float
2023-22-May 19:59:29 out.variable: list not null
2023-22-May 19:59:29 check_failures: int8
2023-22-May 19:59:29 child 0, inner: float not null
2023-22-May 19:59:29 ----
2023-22-May 19:59:29 time: [[2023-05-22 19:58:49.767,2023-05-22 19:58:49.767,2023-05-22 19:58:49.767,2023-05-22 19:58:49.767,2023-05-22 19:58:49.767,...,2023-05-22 19:58:49.767,2023-05-22 19:58:49.767,2023-05-22 19:58:49.767,2023-05-22 19:58:49.767,2023-05-22 19:58:49.767]]
2023-22-May 19:59:29 in.tensor: [[[4,2.5,2900,5505,2,...,2970,5251,12,0,0],[2,2.5,2170,6361,1,...,2310,7419,6,0,0],...,[3,1.75,2910,37461,1,...,2520,18295,47,0,0],[3,2,2005,7000,1,...,1750,4500,34,0,0]]]
2023-22-May 19:59:29 check_failures: [[0,0,0,0,0,...,0,0,0,0,0]]
2023-22-May 19:59:29 out.variable: [[[718013.75],[615094.56],...,[706823.56],[581003]]]
Failed Task Logs
We can create a task that fails and show it in the last_runs
list, then retrieve the logs to display why it failed.
# Example: run once
import datetime
task_start = datetime.datetime.now()
taskfail = orchestration.run_once(name="house price run once 2", json_args={"workspace_name": "bob",
"pipeline_name":"does not exist",
"connection_name": connection_name
}
)
while taskfail.status() != "started":
display(taskfail.status())
time.sleep(5)
Field | Value |
---|---|
ID | 54229bd2-c388-4196-9ce2-f76503a27f99 |
Name | house price run once 2 |
Last Run Status | unknown |
Type | Temporary Run |
Active | True |
Schedule | - |
Created At | 2023-22-May 20:17:16 |
Updated At | 2023-22-May 20:17:16 |
# time.sleep(60)
taskfail.last_runs()
task id | pod id | status | created at | updated at |
---|---|---|---|---|
54229bd2-c388-4196-9ce2-f76503a27f99 | 79d7fe3e-ac8c-4f9c-8288-c9c207fb0a5e | failure | 2023-22-May 20:17:18 | 2023-22-May 20:17:18 |
# time.sleep(60)
taskfaillogs = taskfail.last_runs()[0].logs()
display(taskfaillogs)
2023-22-May 20:17:22 Getting the workspace bob
2023-22-May 20:17:22 File "/home/jovyan/main.py", line 43, in get_pipeline
2023-22-May 20:17:22 Traceback (most recent call last):
2023-22-May 20:17:22 Getting the pipeline does not exist
2023-22-May 20:17:22 pipeline = wl.pipelines_by_name(name)[0]
2023-22-May 20:17:22 File "/home/jovyan/venv/lib/python3.9/site-packages/wallaroo/client.py", line 1064, in pipelines_by_name
2023-22-May 20:17:22 raise EntityNotFoundError("Pipeline", {"pipeline_name": pipeline_name})
2023-22-May 20:17:22 wallaroo.object.EntityNotFoundError: Pipeline not found: {'pipeline_name': 'does not exist'}
2023-22-May 20:17:22
2023-22-May 20:17:22 During handling of the above exception, another exception occurred:
2023-22-May 20:17:22 Traceback (most recent call last):
2023-22-May 20:17:22
2023-22-May 20:17:22 File "/home/jovyan/main.py", line 54, in
2023-22-May 20:17:22 pipeline = get_pipeline(pipeline_name)
2023-22-May 20:17:22 pipeline = wl.build_pipeline(name)
2023-22-May 20:17:22 File "/home/jovyan/main.py", line 45, in get_pipeline
2023-22-May 20:17:22 File "/home/jovyan/venv/lib/python3.9/site-packages/wallaroo/client.py", line 1102, in build_pipeline
2023-22-May 20:17:22 require_dns_compliance(pipeline_name)
2023-22-May 20:17:22 File "/home/jovyan/venv/lib/python3.9/site-packages/wallaroo/checks.py", line 274, in require_dns_compliance
2023-22-May 20:17:22 wallaroo.object.InvalidNameError: Name 'does not exist is invalid: must be DNS-compatible (ASCII alpha-numeric plus dash (-))
2023-22-May 20:17:22 raise InvalidNameError(
Task Results
We can view the inferences from our logs and verify that new entries were added from our task. In our case, we’ll assume the task once started takes about 1 minute to run (deploy the pipeline, run the inference, undeploy the pipeline). We’ll add in a wait of 1 minute, then display the logs during the time period the task was running.
task_end = datetime.datetime.now()
display(task_end)
pipeline.logs(start_datetime = task_start, end_datetime = task_end)
datetime.datetime(2023, 5, 22, 20, 1, 25, 418564)
Warning: Pipeline log size limit exceeded. Please request logs using export_logs
time | in.tensor | out.variable | check_failures | |
---|---|---|---|---|
0 | 2023-05-22 19:58:49.767 | [4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0] | [718013.75] | 0 |
1 | 2023-05-22 19:58:49.767 | [2.0, 2.5, 2170.0, 6361.0, 1.0, 0.0, 2.0, 3.0, 8.0, 2170.0, 0.0, 47.7109, -122.017, 2310.0, 7419.0, 6.0, 0.0, 0.0] | [615094.56] | 0 |
2 | 2023-05-22 19:58:49.767 | [3.0, 2.5, 1300.0, 812.0, 2.0, 0.0, 0.0, 3.0, 8.0, 880.0, 420.0, 47.5893, -122.317, 1300.0, 824.0, 6.0, 0.0, 0.0] | [448627.72] | 0 |
3 | 2023-05-22 19:58:49.767 | [4.0, 2.5, 2500.0, 8540.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2500.0, 0.0, 47.5759, -121.994, 2560.0, 8475.0, 24.0, 0.0, 0.0] | [758714.2] | 0 |
4 | 2023-05-22 19:58:49.767 | [3.0, 1.75, 2200.0, 11520.0, 1.0, 0.0, 0.0, 4.0, 7.0, 2200.0, 0.0, 47.7659, -122.341, 1690.0, 8038.0, 62.0, 0.0, 0.0] | [513264.7] | 0 |
... | ... | ... | ... | ... |
501 | 2023-05-22 19:58:49.767 | [3.0, 2.5, 1570.0, 1433.0, 3.0, 0.0, 0.0, 3.0, 8.0, 1570.0, 0.0, 47.6858, -122.336, 1570.0, 2652.0, 4.0, 0.0, 0.0] | [557391.25] | 0 |
502 | 2023-05-22 19:58:49.767 | [3.0, 2.5, 2390.0, 15669.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2390.0, 0.0, 47.7446, -122.193, 2640.0, 12500.0, 24.0, 0.0, 0.0] | [741973.6] | 0 |
503 | 2023-05-22 19:58:49.767 | [3.0, 0.75, 920.0, 20412.0, 1.0, 1.0, 2.0, 5.0, 6.0, 920.0, 0.0, 47.4781, -122.49, 1162.0, 54705.0, 64.0, 0.0, 0.0] | [338418.8] | 0 |
504 | 2023-05-22 19:58:49.767 | [4.0, 2.5, 2800.0, 246114.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2800.0, 0.0, 47.6586, -121.962, 2750.0, 60351.0, 15.0, 0.0, 0.0] | [765468.75] | 0 |
505 | 2023-05-22 19:58:49.767 | [2.0, 1.0, 1120.0, 9912.0, 1.0, 0.0, 0.0, 4.0, 6.0, 1120.0, 0.0, 47.3735, -122.43, 1540.0, 9750.0, 34.0, 0.0, 0.0] | [309800.75] | 0 |
506 rows × 4 columns
Scheduled Tasks
Scheduled tasks are run with the Orchestration run_scheduled
method. We’ll set it up to run every 5 minutes, then check the results.
It is recommended that tasks for orchestrations that have pipeline deploy or undeploy commands be spaced no less than 5 minutes apart to prevent colliding with other tasks that use the same pipeline.
task_start = datetime.datetime.now()
schedule = "*/5 * * * *"
task_scheduled = orchestration.run_scheduled(name="schedule example",
timeout=600,
schedule=schedule,
json_args={"workspace_name": workspace_name,
"pipeline_name": pipeline_name,
"connection_name": connection_name
})
while task_scheduled.status() != "started":
display(task_scheduled.status())
time.sleep(5)
task_scheduled
'pending'
Field | Value |
---|---|
ID | 4af57c61-dfa9-43eb-944e-559135495df4 |
Name | schedule example |
Last Run Status | unknown |
Type | Scheduled Run |
Active | True |
Schedule | */5 * * * * |
Created At | 2023-22-May 20:08:25 |
Updated At | 2023-22-May 20:08:25 |
time.sleep(420)
recent_run = task_scheduled.last_runs()[0]
display(recent_run.logs())
2023-22-May 20:11:02 Getting the workspace orchestrationworkspacetgiq
2023-22-May 20:11:02 Getting the pipeline orchestrationpipelinetgiq
2023-22-May 20:11:02 Inference time. Displaying results after.
2023-22-May 20:11:02 Getting arrow table file
2023-22-May 20:11:02 pyarrow.Table
2023-22-May 20:11:02 time: timestamp[ms]
2023-22-May 20:11:02 in.tensor: list not null
2023-22-May 20:11:02 child 0, item: float
2023-22-May 20:11:02 out.variable: list not null
2023-22-May 20:11:02 child 0, inner: float not null
2023-22-May 20:11:02 check_failures: int8
2023-22-May 20:11:02 ----
2023-22-May 20:11:02 time: [[2023-05-22 20:10:23.271,2023-05-22 20:10:23.271,2023-05-22 20:10:23.271,2023-05-22 20:10:23.271,2023-05-22 20:10:23.271,...,2023-05-22 20:10:23.271,2023-05-22 20:10:23.271,2023-05-22 20:10:23.271,2023-05-22 20:10:23.271,2023-05-22 20:10:23.271]]
2023-22-May 20:11:02 in.tensor: [[[4,2.5,2900,5505,2,...,2970,5251,12,0,0],[2,2.5,2170,6361,1,...,2310,7419,6,0,0],...,[3,1.75,2910,37461,1,...,2520,18295,47,0,0],[3,2,2005,7000,1,...,1750,4500,34,0,0]]]
2023-22-May 20:11:02 check_failures: [[0,0,0,0,0,...,0,0,0,0,0]]
2023-22-May 20:11:02 out.variable: [[[718013.75],[615094.56],...,[706823.56],[581003]]]
Kill a Task
Killing a task removes the schedule or removes it from a service. Tasks are killed with the Task kill()
method, and returns a message with the status of the kill procedure.
If necessary, all tasks can be killed through the following script.
- IMPORTANT NOTE: This command will kill all running tasks - scheduled or otherwise. Only use this if required.
# Kill all tasks
for t in wl.list_tasks(): t.kill()
When listed with the Wallaroo client list_tasks(killed=True)
, the field active
displays tasks that are killed (False
) or either completed running or still scheduled to run (True
).
task_scheduled.kill()
<ArbexStatus.PENDING_KILL: 'pending_kill'>
wl.list_tasks(killed=True)
id | name | last run status | type | active | schedule | created at | updated at |
---|---|---|---|---|---|---|---|
4af57c61-dfa9-43eb-944e-559135495df4 | schedule example | success | Scheduled Run | False | */5 * * * * | 2023-22-May 20:08:25 | 2023-22-May 20:13:12 |
dc185e24-cf89-4a97-b6f0-33fc3d67da72 | schedule example | unknown | Scheduled Run | False | */5 * * * * | 2023-22-May 20:05:47 | 2023-22-May 20:06:22 |
f0e27d6a-6a98-4d26-b240-266f08560c48 | house price run once 2 | success | Temporary Run | True | - | 2023-22-May 19:58:32 | 2023-22-May 19:58:38 |
36509ef8-98da-42a0-913f-e6e929dedb15 | house price run once | success | Temporary Run | True | - | 2023-22-May 19:56:37 | 2023-22-May 19:56:48 |
Cleaning Up
With the tutorial complete we will undeploy the pipeline and ensure the resources are returned back to the Wallaroo instance.
pipeline.undeploy()