This can be downloaded as part of the Wallaroo Tutorials repository.
This tutorial provides a quick set of methods and examples regarding Wallaroo connections and AI Workload Automation from Wallaroo orchestrations. For full details, see the Wallaroo Documentation site.
Wallaroo connections and AI Workload Automation through Wallaroo orchestrations give organizations a method of creating and managing automated tasks that can be run either on demand or on a regular schedule.
The tutorial will demonstrate the following:
For this tutorial, we’ll create a workspace, upload our sample model and deploy a pipeline. We’ll perform some quick sample inferences to verify that everything is working.
Here we’ll import the various libraries we’ll use for the tutorial.
import wallaroo
from wallaroo.object import EntityNotFoundError, RequiredAttributeMissing
# to display dataframe tables
from IPython.display import display
# used to display dataframe information without truncating
import pandas as pd
pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_columns', None)
import pyarrow as pa
import time
The first step is to connect to Wallaroo through the Wallaroo client. The Python library is included in the Wallaroo install and available through the Jupyter Hub interface provided with your Wallaroo environment.
This is accomplished using the wallaroo.Client() command, which provides a URL to grant the SDK permission to your specific Wallaroo environment. When displayed, enter the URL into a browser and confirm permissions. Store the connection into a variable that can be referenced later.
If logging into the Wallaroo instance through the internal JupyterHub service, use wl = wallaroo.Client(). For more information on Wallaroo Client settings, see the Client Connection guide.
# Login through local Wallaroo instance
wl = wallaroo.Client()
# Setting variables for later steps
workspace_name = 'simpleorchestrationworkspace'
pipeline_name = 'simpleorchestrationtutorial'
model_name = 'simpleorchestrationmodel'
model_file_name = './models/rf_model.onnx'
inference_connection_name = 'external_inference_connection_sample'
inference_connection_type = "HTTP"
inference_connection_argument = {'host':'https://github.com/WallarooLabs/Wallaroo_Tutorials/raw/main/wallaroo-automate/orchestration_sdk_simple_tutorial/data/xtest-1k.arrow'}
We’ll now create our workspace and pipeline for the tutorial. If this tutorial has been run previously, then this will retrieve the existing ones with the assumption they’re for use with this tutorial.
We’ll set the retrieved workspace as the current workspace in the SDK, so all commands will default to that workspace.
workspace = wl.get_workspace(name=workspace_name, create_if_not_exist=True)
wl.set_current_workspace(workspace)
pipeline = wl.build_pipeline(pipeline_name)
We’ll upload our model into our sample workspace, then add it as a pipeline step before deploying the pipeline so it’s ready to accept inference requests.
# Upload the model
housing_model_control = (wl.upload_model(model_name,
model_file_name,
framework=wallaroo.framework.Framework.ONNX)
.configure(tensor_fields=["tensor"])
)
# Add the model as a pipeline step
pipeline.clear()
pipeline.add_model_step(housing_model_control)
name | simpleorchestrationtutorial |
---|---|
created | 2024-12-09 20:55:16.678469+00:00 |
last_updated | 2024-12-09 20:58:36.402309+00:00 |
deployed | True |
workspace_id | 11 |
workspace_name | simpleorchestrationworkspace |
arch | x86 |
accel | none |
tags | |
versions | ad484f85-d883-4b68-b570-ba9226cfd50e, 55a63232-8069-4800-80f6-1738cc1c83a8, 9c89d4a8-b42d-4381-a189-10cda5020de6, e97cd75a-72f9-486e-be03-accf47b9d111 |
steps | simpleorchestrationmodel |
published | False |
#deploy the pipeline
pipeline.deploy(wait_for_status=False)
Deployment initiated for simpleorchestrationtutorial. Please check pipeline status.
name | simpleorchestrationtutorial |
---|---|
created | 2024-12-09 20:55:16.678469+00:00 |
last_updated | 2024-12-09 20:58:40.614698+00:00 |
deployed | True |
workspace_id | 11 |
workspace_name | simpleorchestrationworkspace |
arch | x86 |
accel | none |
tags | |
versions | de61b33b-cb0d-46ae-aa24-5a51e5287d87, ad484f85-d883-4b68-b570-ba9226cfd50e, 55a63232-8069-4800-80f6-1738cc1c83a8, 9c89d4a8-b42d-4381-a189-10cda5020de6, e97cd75a-72f9-486e-be03-accf47b9d111 |
steps | simpleorchestrationmodel |
published | False |
# check the pipeline status before performing an inference
import time
while pipeline.status()['status'] != 'Running':
    time.sleep(15)
pipeline.status()
{'status': 'Running',
'details': [],
'engines': [{'ip': '10.28.0.3',
'name': 'engine-c6995ccd4-kbhd7',
'status': 'Running',
'reason': None,
'details': [],
'pipeline_statuses': {'pipelines': [{'id': 'simpleorchestrationtutorial',
'status': 'Running',
'version': 'de61b33b-cb0d-46ae-aa24-5a51e5287d87'}]},
'model_statuses': {'models': [{'model_version_id': 5,
'name': 'simpleorchestrationmodel',
'sha': 'e22a0831aafd9917f3cc87a15ed267797f80e2afa12ad7d8810ca58f173b8cc6',
'status': 'Running',
'version': 'd9ee6c33-8a50-41b6-92f6-5c1b6cdeec17'}]}}],
'engine_lbs': [{'ip': '10.28.0.4',
'name': 'engine-lb-6676794678-p8x2d',
'status': 'Running',
'reason': None,
'details': []}],
'sidekicks': []}
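The loop above waits indefinitely if the pipeline never reaches Running. As a minimal sketch, a polling helper with a timeout could look like the following. The helper name wait_for_status and its parameters are hypothetical and not part of the Wallaroo SDK; it only assumes a callable that returns the current status string.

```python
import time


def wait_for_status(get_status, target, timeout=300.0, interval=15.0, sleep=time.sleep):
    """Poll get_status() until it returns `target`, or raise after `timeout` seconds.

    Hypothetical helper, not part of the Wallaroo SDK.
    """
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status == target:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"status is {status!r}, expected {target!r} after {timeout} seconds"
            )
        sleep(interval)


# Possible use with the deployed pipeline from this tutorial:
# wait_for_status(lambda: pipeline.status()['status'], 'Running')
```

Passing the status lookup as a callable keeps the helper independent of the SDK, so the same function can wait on other status values as well.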
We will create the data source connection with the Wallaroo client command create_connection, which takes the following parameters.
Parameter | Type | Description |
---|---|---|
name | string (Required) | The name of the connection. This must be unique - if submitting the name of an existing connection it will return an error. |
type | string (Required) | The user defined type of connection. |
details | Dict (Required) | User defined configuration details for the data connection. These can be {'username':'dataperson', 'password':'datapassword', 'port': 3339} , or {'token':'abcde123==', 'host':'example.com', 'port': 1234} , or other user defined combinations. |
Creating a connection with the same name as an existing data connection will result in an error.
We’ll also create a data connection named inference_results_connection with our helper function get_connection that will either create or retrieve a connection if it already exists. From there we’ll create our connections:
houseprice_arrow_table: An Apache Arrow file stored on GitHub that will be used for our inference input.
wl.create_connection(inference_connection_name, inference_connection_type, inference_connection_argument)
Field | Value |
---|---|
Name | external_inference_connection_sample |
Connection Type | HTTP |
Details | ***** |
Created At | 2024-12-09T20:58:43.896309+00:00 |
Linked Workspaces | [] |
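Because connection names must be unique, re-running the create_connection cell fails once the connection exists. A get-or-create helper of the kind mentioned above might be sketched as follows. This is an illustration only: the function name get_or_create_connection is hypothetical, the exception raised by the SDK for a missing connection is not stated in this tutorial (so the sketch catches Exception broadly), and FakeClient is a stand-in so the example runs without a Wallaroo instance.

```python
def get_or_create_connection(client, name, connection_type, details):
    """Return the connection called `name`, creating it if it does not exist.

    Assumption: the client raises some exception when the connection is
    missing; the exact exception type is not confirmed here.
    """
    try:
        return client.get_connection(name=name)
    except Exception:
        return client.create_connection(name, connection_type, details)


class FakeClient:
    """Stand-in for the Wallaroo client, for demonstration only."""

    def __init__(self):
        self.connections = {}

    def get_connection(self, name):
        return self.connections[name]  # raises KeyError when missing

    def create_connection(self, name, connection_type, details):
        if name in self.connections:
            raise ValueError(f"connection {name!r} already exists")
        self.connections[name] = {"name": name, "type": connection_type, "details": details}
        return self.connections[name]


client = FakeClient()
first = get_or_create_connection(client, "external_inference_connection_sample", "HTTP", {"host": "example.com"})
second = get_or_create_connection(client, "external_inference_connection_sample", "HTTP", {"host": "example.com"})
```

With a real client, the same helper would be called as get_or_create_connection(wl, inference_connection_name, inference_connection_type, inference_connection_argument).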
The Wallaroo client method get_connection(name) retrieves the connection that matches the name parameter. We’ll retrieve our connection and store it as inference_source_connection.
inference_source_connection = wl.get_connection(name=inference_connection_name)
display(inference_source_connection)
Field | Value |
---|---|
Name | external_inference_connection_sample |
Connection Type | HTTP |
Details | ***** |
Created At | 2024-12-09T20:58:43.896309+00:00 |
Linked Workspaces | [] |
The Workspace method add_connection(connection_name) adds a Data Connection to a workspace, and takes the following parameters.
Parameter | Type | Description |
---|---|---|
name | string (Required) | The name of the Data Connection |
We’ll add the connection to our sample workspace, then list the connections available to the workspace to confirm.
workspace.add_connection(inference_connection_name)
workspace.list_connections()
name | connection type | details | created at | linked workspaces |
---|---|---|---|---|
external_inference_connection_sample | HTTP | ***** | 2024-12-09T20:58:43.896309+00:00 | ['simpleorchestrationworkspace'] |
With the pipeline deployed and our connections set, we generate our Wallaroo orchestration. See the AI Workload Automation guide for full details.
Orchestrations are uploaded to the Wallaroo instance as a ZIP file with the following requirements:
Parameter | Type | Description |
---|---|---|
User Code | (Required) Python script as .py files | If main.py exists, then that will be used as the task entrypoint. Otherwise, the first main.py found in any subdirectory will be used as the entrypoint. |
Python Library Requirements | (Optional) requirements.txt file in the requirements file format. | A standard Python requirements.txt for any dependencies to be provided in the task environment. The Wallaroo SDK will already be present and should not be included in the requirements.txt. Multiple requirements.txt files are not allowed. |
Other artifacts | | Other artifacts such as files, data, or code to support the orchestration. |
For our example, our orchestration will:
- Use the inference_results_connection to open an HTTP GET connection to the inference data file and use it in an inference request in the deployed pipeline.
- ... external_inference_connection.
This sample script is stored in remote_inference/main.py with an empty requirements.txt file, and packaged into the orchestration as ./remote_inference/remote_inference.zip. We’ll display the steps in uploading the orchestration to the Wallaroo instance.
Note that the orchestration assumes the pipeline is already deployed.
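The ZIP file itself can be built with any archive tool. As a rough sketch using only the Python standard library, the following packages a directory containing main.py and requirements.txt into a ZIP with paths relative to the directory root. The function name package_orchestration is hypothetical, and the check for main.py is a convenience of this sketch rather than something the SDK performs at this point.

```python
import zipfile
from pathlib import Path


def package_orchestration(source_dir, zip_path):
    """Write every file under source_dir into zip_path, using relative paths.

    Hypothetical helper: raises if no main.py is found anywhere under
    source_dir, since the task needs one as its entrypoint.
    """
    source_dir = Path(source_dir)
    zip_path = Path(zip_path)
    if not any(p.is_file() for p in source_dir.rglob("main.py")):
        raise FileNotFoundError(f"no main.py found under {source_dir}")
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for path in sorted(source_dir.rglob("*")):
            # Skip directories and the archive itself when it lives inside source_dir.
            if path.is_file() and path.resolve() != zip_path.resolve():
                archive.write(path, path.relative_to(source_dir).as_posix())
    return zip_path


# Possible use with this tutorial's layout:
# package_orchestration("./remote_inference", "./remote_inference/remote_inference.zip")
```

Skipping the archive itself matters here because the tutorial stores the ZIP inside the same directory as the script.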
Orchestrations are uploaded with the Wallaroo client upload_orchestration(path) method with the following parameters.
Parameter | Type | Description |
---|---|---|
path | string (Required) | The path to the ZIP file to be uploaded. |
Once uploaded, the deployment will be prepared and any requirements will be downloaded and installed.
For this example, the orchestration ./remote_inference/remote_inference.zip will be uploaded and saved to the variable orchestration.
We will loop until the uploaded orchestration’s status displays ready.
orchestration = wl.upload_orchestration(path="./remote_inference/remote_inference.zip")
while orchestration.status() != 'ready' and orchestration.status() != 'error':
    print(orchestration.status())
    time.sleep(5)
pending_packaging
packaging
packaging
packaging
packaging
packaging
packaging
packaging
packaging
wl.list_orchestrations()
id | name | status | filename | sha | created at | updated at | workspace id | workspace name |
---|---|---|---|---|---|---|---|---|
26de20c7-4068-4f03-9ea1-d5bb534491e4 | None | ready | remote_inference.zip | 4dd816...df4e12 | 2024-09-Dec 19:48:45 | 2024-09-Dec 19:50:04 | 10 | simpleorchestrationworkspace2 |
5182f394-bf3f-4c8d-9ccc-f03ee56cbf1a | uploadedbytesdemo | ready | inferencetest.zip | 4dd816...df4e12 | 2024-09-Dec 19:50:09 | 2024-09-Dec 19:51:08 | 10 | simpleorchestrationworkspace2 |
35eaff98-9683-4f76-bcd5-0b8f07f1651b | None | ready | remote_inference.zip | 4dd816...df4e12 | 2024-09-Dec 20:58:45 | 2024-09-Dec 20:59:31 | 11 | simpleorchestrationworkspace |
Another method to upload the orchestration is as a file object. For that, we will open the zip file as a binary, then upload it using the bytes_buffer parameter to specify the file object, and the file_name parameter to give it a new name.
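# read the ZIP file as bytes, closing the file handle afterwards
with open("./remote_inference/remote_inference.zip", "rb") as zip_handle:
    zip_bytes = zip_handle.read()
wl.upload_orchestration(bytes_buffer=zip_bytes, file_name="inferencetest.zip", name="uploadedbytesdemo")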
Field | Value |
---|---|
ID | 962b2ba9-498c-4802-a04f-22438dcd99a0 |
Name | uploadedbytesdemo |
File Name | inferencetest.zip |
SHA | 4dd816a96203936dbca3386b6549736382886c84ff5ee4f480e6fb65b3df4e12 |
Status | pending_packaging |
Created At | 2024-09-Dec 20:59:37 |
Updated At | 2024-09-Dec 20:59:37 |
Workspace ID | 11 |
Workspace Name | simpleorchestrationworkspace |
wl.list_orchestrations()
id | name | status | filename | sha | created at | updated at | workspace id | workspace name |
---|---|---|---|---|---|---|---|---|
26de20c7-4068-4f03-9ea1-d5bb534491e4 | None | ready | remote_inference.zip | 4dd816...df4e12 | 2024-09-Dec 19:48:45 | 2024-09-Dec 19:50:04 | 10 | simpleorchestrationworkspace2 |
5182f394-bf3f-4c8d-9ccc-f03ee56cbf1a | uploadedbytesdemo | ready | inferencetest.zip | 4dd816...df4e12 | 2024-09-Dec 19:50:09 | 2024-09-Dec 19:51:08 | 10 | simpleorchestrationworkspace2 |
35eaff98-9683-4f76-bcd5-0b8f07f1651b | None | ready | remote_inference.zip | 4dd816...df4e12 | 2024-09-Dec 20:58:45 | 2024-09-Dec 20:59:31 | 11 | simpleorchestrationworkspace |
962b2ba9-498c-4802-a04f-22438dcd99a0 | uploadedbytesdemo | pending_packaging | inferencetest.zip | 4dd816...df4e12 | 2024-09-Dec 20:59:37 | 2024-09-Dec 20:59:37 | 11 | simpleorchestrationworkspace |
Once an Orchestration has the status ready, it can be run as a task. Tasks have two run options.
Type | SDK Call | How triggered |
---|---|---|
Once | orchestration.run_once(name, json_args, timeout) | Task runs once and exits. |
Scheduled | orchestration.run_scheduled(name, schedule, timeout, json_args) | User provides schedule. Task runs and exits whenever the schedule dictates. |
We’ll run both task types, starting with a Run Once task generated from our orchestration.
Tasks are generated and run once with the Orchestration run_once(name, json_args, timeout) method. Any arguments for the orchestration are passed in as a Dict. If there are no arguments, then an empty Dict {} is passed.
# Example: run once
import datetime
task_start = datetime.datetime.now()
task = orchestration.run_once(name="simpletaskdemo",
json_args={"workspace_name": workspace_name,
"pipeline_name": pipeline_name,
"connection_name": inference_connection_name
})
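On the orchestration side, the script has to cope with arguments that were not supplied. The sketch below shows one way to merge the task arguments with defaults. The function name resolve_task_args is hypothetical, the default values are simply the names used in this tutorial, and how main.py actually obtains its arguments is not shown here (the comment about wl.task_args() is an assumption).

```python
# Defaults mirror the names used in this tutorial.
DEFAULT_ARGS = {
    "workspace_name": "simpleorchestrationworkspace",
    "pipeline_name": "simpleorchestrationtutorial",
    "connection_name": "external_inference_connection_sample",
}


def resolve_task_args(task_args, defaults=DEFAULT_ARGS):
    """Return a new dict where supplied task arguments override the defaults."""
    merged = dict(defaults)
    merged.update(task_args or {})
    return merged


# Inside an orchestration, task_args would come from the SDK
# (assumed here to be something like wl.task_args()); plain dicts are used below.
no_args = resolve_task_args({})
custom = resolve_task_args({"pipeline_name": "anotherpipeline"})
```

Falling back to defaults lets the same script run when the task is started with an empty {} for json_args.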
The list of tasks in the Wallaroo instance is retrieved through the Wallaroo Client list_tasks() method. This returns a list of tasks, each with the following fields.
Parameter | Type | Description |
---|---|---|
id | string | The UUID identifier for the task. |
last run status | string | The last reported status of the task. |
type | string | The type of the task. |
created at | DateTime | The date and time the task was started. |
updated at | DateTime | The date and time the task was updated. |
For this example, the status of the previously created task will be checked in a loop until it has reached the status started.
while task.status() != "started":
    display(task.status())
    time.sleep(5)
'pending'
'pending'
We can view the inferences from our logs and verify that new entries were added from our task. We can do that with the task last_runs() method to see the list of task runs executed, then show the log from the last completed task run.
task.last_runs()[0]
Field | Value |
---|---|
Task | 5b95265f-52b5-4324-9d77-8004f0d9da4e |
Pod ID | 83aa2d14-0f32-45cc-8dcf-7a2c264080a3 |
Status | success |
Created At | 2024-09-Dec 20:59:44 |
Updated At | 2024-09-Dec 20:59:44 |
task.last_runs()[0].logs()
(no logs yet)
The other method of using tasks is as a scheduled run through the Orchestration run_scheduled(name, schedule, timeout, json_args) method. This sets up a task to run on a regular schedule as defined by the schedule parameter in the cron service format. For example:
schedule='42 * * * *'
Runs on the 42nd minute of every hour.
The following schedule runs every day at 12 noon from February 1 to February 15 2024 - and then ends.
schedule='0 0 12 1-15 2 2024'
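To make the field positions concrete, here is a small sketch that splits a cron expression into named fields and checks whether a given minute matches the minute field. It assumes the standard five-field layout (minute, hour, day of month, month, day of week) used by '42 * * * *', handles only the *, */n, and single-number forms, and is an illustration rather than the scheduler Wallaroo uses. The function names are hypothetical.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def split_cron(expression):
    """Split a five-field cron expression into a dict keyed by field name."""
    parts = expression.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected {len(CRON_FIELDS)} fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


def minute_matches(minute_field, minute):
    """Check a minute (0-59) against a minute field of the form *, */n, or n."""
    if minute_field == "*":
        return True
    if minute_field.startswith("*/"):
        return minute % int(minute_field[2:]) == 0
    return minute == int(minute_field)


hourly = split_cron("42 * * * *")       # minute field is "42", all others "*"
every_five = split_cron("*/5 * * * *")  # minute field is "*/5"
```

For example, minute_matches(every_five["minute"], 10) is True while minute_matches(every_five["minute"], 12) is False, which is why a */5 schedule fires at :00, :05, :10 and so on.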
For our example, we will create a scheduled task to run every 5 minutes, display the inference results, then use the task kill command to keep the task from running any further.
It is recommended that orchestrations that have pipeline deploy or undeploy commands be spaced out no less than 5 minutes to prevent colliding with other tasks that use the same pipeline.
scheduled_task = orchestration.run_scheduled(name="simple_inference_schedule",
schedule="*/5 * * * *",
timeout=120,
json_args={"workspace_name": workspace_name,
"pipeline_name": pipeline_name,
"connection_name": inference_connection_name
})
while scheduled_task.status() != "started":
    display(scheduled_task.status())
    time.sleep(5)
#wait 420 seconds to give the scheduled event time to finish
time.sleep(420)
scheduled_task.last_runs()[0].logs()
2024-09-Dec 21:05:02 WALLAROO: Setting up task run...
2024-09-Dec 21:05:05 WALLAROO: *** If you don't see any output, Python is probably buffering stdout. ***
2024-09-Dec 21:05:05 WALLAROO: *** Consider `print('...', flush=True)` or `sys.stdout.flush()` in your orchestration code. ***
2024-09-Dec 21:05:08 Getting the workspace simpleorchestrationworkspace
2024-09-Dec 21:05:05 WALLAROO: Task environment ready. Running...
2024-09-Dec 21:05:08 Getting the pipeline simpleorchestrationtutorial
2024-09-Dec 21:05:08 Getting arrow table file
2024-09-Dec 21:05:08 Inference time. Displaying results after.
2024-09-Dec 21:05:08 1 2024-12-09 21:05:08.782 ... 0
2024-09-Dec 21:05:08 0 2024-12-09 21:05:08.782 ... 0
2024-09-Dec 21:05:08 time ... anomaly.count
2024-09-Dec 21:05:08 2 2024-12-09 21:05:08.782 ... 0
2024-09-Dec 21:05:08 3 2024-12-09 21:05:08.782 ... 0
2024-09-Dec 21:05:08 4 2024-12-09 21:05:08.782 ... 0
2024-09-Dec 21:05:08
2024-09-Dec 21:05:08 [5 rows x 4 columns]
2024-09-Dec 21:05:09 WALLAROO: Task run complete. Status: success
With our testing complete, we will kill the scheduled task so it will not run again. First we’ll show all the tasks to verify that our task is there, then issue it the kill command.
wl.list_tasks()
id | name | last run status | type | active | schedule | created at | updated at | workspace id | workspace name |
---|---|---|---|---|---|---|---|---|---|
7736e7fc-0652-45ec-9b4c-45a647d38dc5 | simple_inference_schedule | success | Scheduled Run | True | */5 * * * * | 2024-09-Dec 20:59:52 | 2024-09-Dec 20:59:52 | 11 | simpleorchestrationworkspace |
5b95265f-52b5-4324-9d77-8004f0d9da4e | simpletaskdemo | success | Temporary Run | True | - | 2024-09-Dec 20:59:39 | 2024-09-Dec 20:59:51 | 11 | simpleorchestrationworkspace |
f7aa8740-d104-4eb9-a3b8-09f79b300d9f | simpletaskdemo | success | Temporary Run | True | - | 2024-09-Dec 19:50:11 | 2024-09-Dec 19:50:44 | 10 | simpleorchestrationworkspace2 |
scheduled_task.kill()
<ArbexStatus.PENDING_KILL: 'pending_kill'>
wl.list_tasks()
id | name | last run status | type | active | schedule | created at | updated at | workspace id | workspace name |
---|---|---|---|---|---|---|---|---|---|
7736e7fc-0652-45ec-9b4c-45a647d38dc5 | simple_inference_schedule | success | Scheduled Run | True | */5 * * * * | 2024-09-Dec 20:59:52 | 2024-09-Dec 21:06:55 | 11 | simpleorchestrationworkspace |
5b95265f-52b5-4324-9d77-8004f0d9da4e | simpletaskdemo | success | Temporary Run | True | - | 2024-09-Dec 20:59:39 | 2024-09-Dec 20:59:51 | 11 | simpleorchestrationworkspace |
f7aa8740-d104-4eb9-a3b8-09f79b300d9f | simpletaskdemo | success | Temporary Run | True | - | 2024-09-Dec 19:50:11 | 2024-09-Dec 19:50:44 | 10 | simpleorchestrationworkspace2 |
With the tutorial complete, we can undeploy the pipeline and return the resources back to the Wallaroo instance.
pipeline.undeploy()
name | simpleorchestrationtutorial |
---|---|
created | 2024-12-09 20:55:16.678469+00:00 |
last_updated | 2024-12-09 20:58:40.614698+00:00 |
deployed | False |
workspace_id | 11 |
workspace_name | simpleorchestrationworkspace |
arch | x86 |
accel | none |
tags | |
versions | de61b33b-cb0d-46ae-aa24-5a51e5287d87, ad484f85-d883-4b68-b570-ba9226cfd50e, 55a63232-8069-4800-80f6-1738cc1c83a8, 9c89d4a8-b42d-4381-a189-10cda5020de6, e97cd75a-72f9-486e-be03-accf47b9d111 |
steps | simpleorchestrationmodel |
published | False |
The following examples show listing and retrieving orchestrations and tasks with additional filters.
List all orchestrations available across workspaces, then filtered by workspace id, then filtered by workspace name.
wl.list_orchestrations()
id | name | status | filename | sha | created at | updated at | workspace id | workspace name |
---|---|---|---|---|---|---|---|---|
26de20c7-4068-4f03-9ea1-d5bb534491e4 | None | ready | remote_inference.zip | 4dd816...df4e12 | 2024-09-Dec 19:48:45 | 2024-09-Dec 19:50:04 | 10 | simpleorchestrationworkspace2 |
5182f394-bf3f-4c8d-9ccc-f03ee56cbf1a | uploadedbytesdemo | ready | inferencetest.zip | 4dd816...df4e12 | 2024-09-Dec 19:50:09 | 2024-09-Dec 19:51:08 | 10 | simpleorchestrationworkspace2 |
35eaff98-9683-4f76-bcd5-0b8f07f1651b | None | ready | remote_inference.zip | 4dd816...df4e12 | 2024-09-Dec 20:58:45 | 2024-09-Dec 20:59:31 | 11 | simpleorchestrationworkspace |
962b2ba9-498c-4802-a04f-22438dcd99a0 | uploadedbytesdemo | ready | inferencetest.zip | 4dd816...df4e12 | 2024-09-Dec 20:59:37 | 2024-09-Dec 21:00:30 | 11 | simpleorchestrationworkspace |
wl.list_orchestrations(workspace_id=workspace.id())
id | name | status | filename | sha | created at | updated at | workspace id | workspace name |
---|---|---|---|---|---|---|---|---|
35eaff98-9683-4f76-bcd5-0b8f07f1651b | None | ready | remote_inference.zip | 4dd816...df4e12 | 2024-09-Dec 20:58:45 | 2024-09-Dec 20:59:31 | 11 | simpleorchestrationworkspace |
962b2ba9-498c-4802-a04f-22438dcd99a0 | uploadedbytesdemo | ready | inferencetest.zip | 4dd816...df4e12 | 2024-09-Dec 20:59:37 | 2024-09-Dec 21:00:30 | 11 | simpleorchestrationworkspace |
wl.list_orchestrations(workspace_name=workspace_name)
id | name | status | filename | sha | created at | updated at | workspace id | workspace name |
---|---|---|---|---|---|---|---|---|
35eaff98-9683-4f76-bcd5-0b8f07f1651b | None | ready | remote_inference.zip | 4dd816...df4e12 | 2024-09-Dec 20:58:45 | 2024-09-Dec 20:59:31 | 11 | simpleorchestrationworkspace |
962b2ba9-498c-4802-a04f-22438dcd99a0 | uploadedbytesdemo | ready | inferencetest.zip | 4dd816...df4e12 | 2024-09-Dec 20:59:37 | 2024-09-Dec 21:00:30 | 11 | simpleorchestrationworkspace |
List all tasks available across workspaces, then filtered by workspace id, then filtered by workspace name.
wl.list_tasks()
id | name | last run status | type | active | schedule | created at | updated at | workspace id | workspace name |
---|---|---|---|---|---|---|---|---|---|
5b95265f-52b5-4324-9d77-8004f0d9da4e | simpletaskdemo | success | Temporary Run | True | - | 2024-09-Dec 20:59:39 | 2024-09-Dec 20:59:51 | 11 | simpleorchestrationworkspace |
f7aa8740-d104-4eb9-a3b8-09f79b300d9f | simpletaskdemo | success | Temporary Run | True | - | 2024-09-Dec 19:50:11 | 2024-09-Dec 19:50:44 | 10 | simpleorchestrationworkspace2 |
wl.list_tasks(workspace_id=workspace.id())
id | name | last run status | type | active | schedule | created at | updated at | workspace id | workspace name |
---|---|---|---|---|---|---|---|---|---|
5b95265f-52b5-4324-9d77-8004f0d9da4e | simpletaskdemo | success | Temporary Run | True | - | 2024-09-Dec 20:59:39 | 2024-09-Dec 20:59:51 | 11 | simpleorchestrationworkspace |
wl.list_tasks(workspace_name=workspace_name)
id | name | last run status | type | active | schedule | created at | updated at | workspace id | workspace name |
---|---|---|---|---|---|---|---|---|---|
5b95265f-52b5-4324-9d77-8004f0d9da4e | simpletaskdemo | success | Temporary Run | True | - | 2024-09-Dec 20:59:39 | 2024-09-Dec 20:59:51 | 11 | simpleorchestrationworkspace |