Wallaroo provides AI Workload Automation in the form of Wallaroo orchestrations and tasks to automate processes in a Wallaroo instance.
AI Workload Automation works in three tiers:
Tier | Description |
---|---|
Orchestration | User-created custom instructions that provide automated processes that follow the same steps every time without error. Orchestrations contain the instructions to be performed, uploaded as a .ZIP file with the instructions, requirements, and artifacts. |
Task | Instructions on when to run an Orchestration. Tasks can be Run Once, where it creates a single Task Run, or Run Scheduled, where a Task Run is created on a regular schedule based on the Kubernetes cronjob specifications. If a Task is Run Scheduled, it creates a new Task Run every time the schedule parameters are met until the Task is killed. |
Task Run | The execution of a task. Task Runs validate that business operations are successful and identify any unsuccessful runs. If the Task is Run Once, only one Task Run is generated. If the Task is Run Scheduled, a new Task Run is created each time the schedule parameters are met, with each Task Run having its own results and logs. If the Task is Run Continuously, each Task Run continues to execute until it either ends successfully or terminates early; once the current Task Run ends, a new Task Run is generated within one minute of the previous Task Run's completion. |
Orchestrations are uploaded to the Wallaroo instance as a ZIP file with the following requirements:
Parameter | Type | Description |
---|---|---|
User Code | (Required) Python 3.10 script as .py files | If main.py exists, then that will be used as the task entrypoint. Otherwise, the first main.py found in any subdirectory will be used as the entrypoint. If no main.py is found, the orchestration will not be accepted. |
Python Library Requirements | (Optional) requirements.txt file in the requirements file format. | A standard Python requirements.txt for any dependencies to be provided in the task environment. The Wallaroo SDK will already be present and should not be included in the requirements.txt. Multiple requirements.txt files are not allowed. |
Other artifacts | (Optional) Files | Other artifacts such as files, data, or code to support the orchestration. |
In a terminal with the zip command, assemble the artifacts as above and then create the archive. The zip command is included by default with the Wallaroo JupyterHub service.
The zip command takes the following format, with {zipfilename}.zip as the zip file to save the artifacts to, and each file thereafter as a file to add to the archive.

zip {zipfilename}.zip file1 file2 file3 ...

For example, the following command adds the files main.py and requirements.txt to the file hello.zip.
$ zip hello.zip main.py requirements.txt
adding: main.py (deflated 47%)
adding: requirements.txt (deflated 52%)
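The archive can also be assembled programmatically. A minimal sketch using Python's standard zipfile module; the file contents written here are placeholders standing in for a real entrypoint and requirements list:

```python
import zipfile

# assemble an orchestration archive; main.py is the required entrypoint and
# requirements.txt is the optional dependency list (contents are placeholders)
with zipfile.ZipFile("hello.zip", "w", zipfile.ZIP_DEFLATED) as zf:
    zf.writestr("main.py", "print('orchestration entrypoint')\n")
    zf.writestr("requirements.txt", "dbt-bigquery==1.4.3\n")

# confirm the archive contains the expected artifacts
print(zipfile.ZipFile("hello.zip").namelist())
```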
The following is an example of a requirements.txt file:

dbt-bigquery==1.4.3
dbt-core==1.4.5
dbt-extractor==0.4.1
dbt-postgres==1.4.5
google-api-core==2.8.2
google-auth==2.11.0
google-auth-oauthlib==0.4.6
google-cloud-bigquery==3.3.2
google-cloud-bigquery-storage==2.15.0
google-cloud-core==2.3.2
google-cloud-storage==2.5.0
google-crc32c==1.5.0
google-pasta==0.2.0
google-resumable-media==2.3.3
googleapis-common-protos==1.56.4
The following recommendations are best practices for Wallaroo orchestrations.

* Use a requirements.txt file to declare any pip dependencies. The Wallaroo SDK is already present in the task environment and should not be included.
* Within a task, the wallaroo.Client constructor auth_type argument is ignored. Using wallaroo.Client() is sufficient.
* wallaroo.in_task(): Returns True if the code is running within an orchestration task.
* wallaroo.task_args(): Returns a Dict of invocation-specific arguments passed to the run_ calls.
* Orchestration tasks run with the packaged requirements.txt (unless overridden by a different requirements.txt setting, which is not recommended), running in the virtualized directory /home/jovyan/.

For deployments that autoscale from 0 replicas, replica_autoscale_min_max is set with minimum=0, and replicas scale down to zero when there is no utilization based on the autoscale parameters. When a new inference request is made, the first replica is scaled up. Once the first replica is ready, inference requests proceed as normal.
When inferencing in this scenario, a timeout may occur waiting for the first replica to spool up. To handle situations where an autoscale deployment scales down to zero replicas, the following code example provides a way to “wake up” the pipeline with an inference request which may use mock or real data. Once the first replica is fully spooled up, inference requests proceed at full speed.
Once deployed, we check the pipeline’s deployment status to verify it is running. If the pipeline is still scaling, the process waits 10 seconds to allow it to finish scaling before attempting the initial inference again. Once an inference completes successfully, the inferences proceed as normal.
import time

# verify the deployment has the status `Running`
while pipeline.status()["status"] != 'Running':
    try:
        # attempt the inference
        pipeline.infer(dataframe)
    except Exception:
        # if an exception is thrown, ignore it and retry
        pass
    # wait 10 seconds before attempting the inference again
    time.sleep(10)

# once an inference passes successfully, continue with other inferences as normal
pipeline.infer(dataframe2)
pipeline.infer(dataframe3)
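A variant of the loop above bounds the wait with a retry cap instead of looping indefinitely. The sketch below is self-contained: StubPipeline is a stand-in for a real deployment that becomes Running after a few status polls; it is not part of the Wallaroo SDK.

```python
import time

# StubPipeline mimics an autoscale deployment spooling up from zero replicas
class StubPipeline:
    def __init__(self, ready_after: int):
        self.polls = 0
        self.ready_after = ready_after

    def status(self):
        self.polls += 1
        state = 'Running' if self.polls >= self.ready_after else 'Starting'
        return {"status": state}

    def infer(self, data):
        if self.polls < self.ready_after:
            raise RuntimeError("replica still scaling up")
        return "ok"

pipeline = StubPipeline(ready_after=3)

for attempt in range(30):  # cap the retries instead of waiting forever
    if pipeline.status()["status"] == 'Running':
        break
    try:
        pipeline.infer("wake-up data")  # mock inference to spool up the replica
    except Exception:
        pass
    time.sleep(0.01)  # 10 seconds in production; shortened for the sketch

print(pipeline.infer("real data"))  # prints: ok
```

The retry cap turns an indefinite hang into a bounded failure, which is usually preferable inside an orchestration where a stuck task run would otherwise block the schedule.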
The following demonstrates using the wallaroo.in_task() and wallaroo.task_args() methods within an Orchestration. This sample code uses wallaroo.in_task() to verify whether the script is running as a Wallaroo Task. If True, it gathers the wallaroo.task_args() and uses them to set the workspace and pipeline. If False, it sets the workspace and pipeline manually.
import wallaroo

wl = wallaroo.Client()

# if running as a task, get the arguments passed to the task
if wl.in_task():
    arguments = wl.task_args()
    # arguments is a key/value pair; set the workspace and pipeline names
    workspace_name = arguments['workspace_name']
    pipeline_name = arguments['pipeline_name']
else:
    # not running in a task, so set the workspace and pipeline manually
    workspace_name = "bigqueryworkspace"
    pipeline_name = "bigquerypipeline"
Uploads an orchestration to the Wallaroo workspace.

/v1/api/orchestration/upload

Parameters:

* The orchestration ZIP file, uploaded as Content-Type application/octet-stream.
* The metadata specifying the workspace_id, uploaded as Content-Type application/json.
import json

# retrieve the authorization token
headers = wl.auth.auth_header()

url = f"{APIURL}/v1/api/orchestration/upload"

fp = open("./api_inference_orchestration.zip", "rb")

metadata = {
    "workspace_id": workspace_id
}

response = requests.post(
    url,
    headers=headers,
    files=[
        ("file", ("api_inference_orchestration.zip", fp, "application/octet-stream")),
        ("metadata", ("metadata", json.dumps(metadata), "application/json")),
    ],
).json()
display(response)
orchestration_id = response['id']
{'id': 'fd823818-91cf-4d78-9ec2-f74faa1a05f3'}
Lists the orchestrations in a Wallaroo workspace.

/v1/api/orchestration/list
Tasks are the implementation of an orchestration. Think of the orchestration as the instructions to follow, the Task as the order to implement those instructions based on some parameters, and the Task Run as the actual execution.
Task Runs are generated from a Task.
Tasks are created at the workspace level.
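The relationship between the three tiers can be sketched as a toy model. These classes are purely illustrative and are not part of the Wallaroo SDK:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Orchestration:
    name: str                       # the uploaded instructions (the ZIP file)

@dataclass
class Task:
    orchestration: Orchestration    # what to run
    schedule: Optional[str] = None  # when to run it; None means Run Once

@dataclass
class TaskRun:
    task: Task                      # a single execution of the task
    status: str = "running"         # running, failure, or success

# a scheduled task wrapping an orchestration, and one of its runs
task = Task(Orchestration("api_inference_orchestration"), schedule="*/1 * * * *")
print(TaskRun(task).status)
```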
Tasks are created from an orchestration through the following methods.
Task Type | Description |
---|---|
/v1/api/task/run_once | Generates only one Task Run whether it completes successfully or terminates early. |
/v1/api/task/run_scheduled | Generates a new Task Run each time the schedule parameters are met, with each Task Run having its own results and logs. New Task Runs are generated whether the previous Task Runs completed or not. |
/v1/api/task/run_continuously | Generates a new Task Run within one minute of the most recent Task Run completing, whether it ended successfully or terminated early. |
Run Once (aka Temporary Run) tasks are created from an Orchestration with the request:

/v1/api/task/run_once

Parameters:

* orch_id: The id of the orchestration to create the task from.
* workspace_id: The id of the workspace to create the task within.

# retrieve the authorization token
headers = wl.auth.auth_header()

data = {
    "workspace_id": workspace_id,
    "orch_id": orchestration_id,
    "json": {}
}

url = f"{APIURL}/v1/api/task/run_once"
response = requests.post(url, headers=headers, json=data).json()
display(response)
{'id': '1cb1b550-384f-4476-8619-03e0aca71409'}
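The json field of the request can carry invocation-specific arguments, which the orchestration reads back with wallaroo.task_args(). A sketch of such a payload; the ids and names are example values from earlier in this document:

```python
# a run_once payload passing invocation arguments through the json field;
# the orchestration retrieves them with wallaroo.task_args()
data = {
    "workspace_id": 5,
    "orch_id": "fd823818-91cf-4d78-9ec2-f74faa1a05f3",
    "json": {
        "workspace_name": "bigqueryworkspace",
        "pipeline_name": "bigquerypipeline",
    },
}
print(data["json"]["workspace_name"])
```

Inside the orchestration, wl.task_args() would then return a Dict containing the workspace_name and pipeline_name keys, matching the in_task() example earlier.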
Run Scheduled tasks are created from an Orchestration with the request:

/v1/api/task/run_scheduled

Parameters:

* orch_id: The id of the orchestration to create the task from.
* schedule: The schedule in cron format: minute, hour, day_of_month, month, day_of_week.
* workspace_id: The id of the workspace to create the task within.

Returns:

* id: The task id in UUID format.

# retrieve the authorization token
headers = wl.auth.auth_header()

data = {
    "workspace_id": workspace_id,
    "orch_id": orchestration_id,
    "schedule": "*/1 * * * *",
    "json": {}
}

url = f"{APIURL}/v1/api/task/run_scheduled"
response = requests.post(url, headers=headers, json=data).json()
display(response)
{'id': 'd81c6e65-3b1f-42d7-8d2f-3cfc0eb51599'}
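A schedule string can be sanity-checked before submitting the request. The helper below is a simplified, assumption-level validator for 5-field cron strings; it handles plain values, *, ranges, and steps, and is not part of the Wallaroo API:

```python
# minimal sanity check for a 5-field cron schedule string
# (minute, hour, day_of_month, month, day_of_week)
def is_valid_cron(schedule: str) -> bool:
    fields = schedule.split()
    if len(fields) != 5:
        return False
    ranges = [(0, 59), (0, 23), (1, 31), (1, 12), (0, 6)]
    for field, (lo, hi) in zip(fields, ranges):
        for part in field.split(","):
            part = part.split("/")[0]  # strip any step, e.g. */1 -> *
            if part == "*":
                continue
            bounds = part.split("-")   # plain value or lo-hi range
            if not all(b.isdigit() and lo <= int(b) <= hi for b in bounds):
                return False
    return True

print(is_valid_cron("*/1 * * * *"))  # the schedule used in the example above
```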
Run Continuously Tasks are created from an Orchestration with the request:

/v1/api/task/run_continuously

Parameters:

* orch_id: The id of the orchestration to create the task from.
* workspace_id: The id of the workspace to create the task within.

Returns:

* id: The task id in UUID format.

# retrieve the authorization token
headers = wl.auth.auth_header()

data = {
    "name": "sample continuous task",
    "workspace_id": workspace_id,
    "orch_id": orchestration_id,
    "json": {}
}

url = f"{APIURL}/v1/api/task/run_continuously"
response = requests.post(url, headers=headers, json=data).json()
display(response)
{'id': '3823b4e3-1029-4948-8227-f6f34a800d0c'}
Retrieve a task by its id.

/v1/api/task/get_by_id

Parameters:

* id: The id of the task to retrieve.

Returns the task's details, including:

* status: The status of the task. Values are pending, started, and failed.
* killed: Whether the task has been issued a kill request.
* last_runs: The most recent task runs, each with a run_id, status, and created_at.

# retrieve the authorization token
headers = wl.auth.auth_header()

url = f"{APIURL}/v1/api/task/get_by_id"

data = {
    "id": task_id
}

response = requests.post(url, headers=headers, json=data).json()
display(response)
{'name': None,
'id': '1cb1b550-384f-4476-8619-03e0aca71409',
'image': 'proxy.replicated.com/proxy/wallaroo/ghcr.io/wallaroolabs/arbex-orch-deploy',
'image_tag': 'v2023.2.0-main-3228',
'bind_secrets': ['minio'],
'extra_env_vars': {'MINIO_URL': 'http://minio.wallaroo.svc.cluster.local:9000',
'ORCH_OWNER_ID': '3aac9f67-3050-4502-915f-fc2f871ee350',
'ORCH_SHA': 'd3b93c9f280734106376e684792aa8b4285d527092fe87d89c74ec804f8e169e',
'TASK_ID': '1cb1b550-384f-4476-8619-03e0aca71409'},
'auth_init': True,
'workspace_id': 5,
'flavor': 'exec_orch_oneshot',
'reap_threshold_secs': 900,
'exec_type': 'job',
'status': 'pending',
'input_data': {},
'killed': False,
'created_at': '2023-05-16T15:59:57.496421+00:00',
'updated_at': '2023-05-16T15:59:57.496421+00:00',
'last_runs': []}
Tasks tied to the same orchestration are retrieved through the following request.

/v1/api/task/get_tasks_by_orch_sha

Parameters:

* sha: The sha of the orchestration.

Returns:

* ids: List[string] List of task ids in UUID format.

# retrieve the authorization token
headers = wl.auth.auth_header()

url = f"{APIURL}/v1/api/task/get_tasks_by_orch_sha"

data = {
    "sha": orchestration_sha
}

response = requests.post(url, headers=headers, json=data).json()
display(response)
{'ids': ['2424a9a7-2331-42f6-bd84-90643386b130',
'c868aa44-f7fe-4e3d-b11d-e1e6af3ec150',
'a41fe4ae-b8a4-4e1f-a45a-114df64ae2bc']}
Retrieve all tasks in a workspace.

/v1/api/task/list

Parameters:

* workspace_id: The id of the workspace.

Returns a list of tasks, each including:

* status: The status of the task. Values are pending, started, and failed.
* killed: Whether the task has been issued a kill request.
* last_runs: The most recent task runs, each with a run_id, status, and created_at.

# retrieve the authorization token
headers = wl.auth.auth_header()

url = f"{APIURL}/v1/api/task/list"

data = {
    "workspace_id": workspace_id
}

response = requests.post(url, headers=headers, json=data).json()
display(response)
[{'name': None,
'id': '33a2b093-b34f-46f1-9c59-7c4d7c0f3188',
'image': 'proxy.replicated.com/proxy/wallaroo/ghcr.io/wallaroolabs/arbex-orch-deploy',
'image_tag': 'v2023.2.0-main-3228',
'bind_secrets': ['minio'],
'extra_env_vars': {'MINIO_URL': 'http://minio.wallaroo.svc.cluster.local:9000',
'ORCH_OWNER_ID': '3aac9f67-3050-4502-915f-fc2f871ee350',
'ORCH_SHA': 'd3b93c9f280734106376e684792aa8b4285d527092fe87d89c74ec804f8e169e',
'TASK_ID': '33a2b093-b34f-46f1-9c59-7c4d7c0f3188'},
'auth_init': True,
'workspace_id': 5,
'flavor': 'exec_orch_oneshot',
'reap_threshold_secs': 900,
'exec_type': 'job',
'status': 'started',
'input_data': {},
'killed': False,
'created_at': '2023-05-16T14:23:56.062627+00:00',
'updated_at': '2023-05-16T14:24:06.556855+00:00',
'last_runs': [{'run_id': 'da1ffa75-e3c4-40bc-9bac-001f430c61d2',
'status': 'success',
'created_at': '2023-05-16T14:24:02.364998+00:00'}]
}
]
Killed tasks have the following effects. A Task set to Run Scheduled generates a new Task Run each time the schedule parameters are met until the Task is killed; a Task set to Run Once generates only one Task Run; and a Run Continuously Task only generates new Task Runs while the current Task Run either terminates or completes successfully. In all three cases, killing the Task terminates any active Task Runs and deletes their associated Task Run Logs.
/v1/api/task/kill

Parameters:

* id: The id of the task to kill.

Returns the task's details, including:

* status: The status of the task. Values are pending, started, and failed.
* killed: Whether the task has been issued a kill request.
* last_runs: The most recent task runs, each with a run_id, status, and created_at.

# retrieve the authorization token
headers = wl.auth.auth_header()

url = f"{APIURL}/v1/api/task/kill"

data = {
    "id": scheduled_task_id
}

response = requests.post(url, headers=headers, json=data).json()
display(response)
{'name': None,
'id': 'd81c6e65-3b1f-42d7-8d2f-3cfc0eb51599',
'image': 'proxy.replicated.com/proxy/wallaroo/ghcr.io/wallaroolabs/arbex-orch-deploy',
'image_tag': 'v2023.2.0-main-3228',
'bind_secrets': ['minio'],
'extra_env_vars': {'MINIO_URL': 'http://minio.wallaroo.svc.cluster.local:9000',
'ORCH_OWNER_ID': '3aac9f67-3050-4502-915f-fc2f871ee350',
'ORCH_SHA': 'd3b93c9f280734106376e684792aa8b4285d527092fe87d89c74ec804f8e169e',
'TASK_ID': 'd81c6e65-3b1f-42d7-8d2f-3cfc0eb51599'},
'auth_init': True,
'workspace_id': 5,
'schedule': '*/1 * * * *',
'reap_threshold_secs': 900,
'status': 'pending_kill',
'input_data': {},
'killed': False,
'created_at': '2023-05-16T16:01:15.947982+00:00',
'updated_at': '2023-05-16T16:01:16.456502+00:00',
'last_runs': [{'run_id': '3ff90356-b7b3-4909-b5c0-963acd19f3f5',
'status': 'success',
'created_at': '2023-05-16T16:02:02.159514+00:00'}]}
Task Runs are generated from a Task. If the Task is Run Once, then only one Task Run is generated. If the Task is a Run Scheduled task, then a new Task Run is created each time the schedule parameters are met, with each Task Run having its own results and logs. If the Task is Run Continuously, each Task Run continues to execute until it either ends successfully or terminates early; once the current Task Run ends, a new Task Run is generated within one minute of the previous Task Run's completion.
The history of a task (each execution of the task is known as a task run) is retrieved with the Task last_runs method, which takes the following arguments. It returns the task runs in reverse chronological order by updated_at.
/v1/api/task/list_task_runs

Parameters:

* task_id: The id of the task.
* status: Filters the task runs by status. If all, returns all statuses. Status values are:
  * running: The task has started.
  * failure: The task failed.
  * success: The task completed.

# retrieve the authorization token
headers = wl.auth.auth_header()

url = f"{APIURL}/v1/api/task/list_task_runs"

data = {
    "task_id": task_id
}

response = requests.post(url, headers=headers, json=data).json()
task_run_id = response[0]['run_id']
display(response)
[{'task': 'c868aa44-f7fe-4e3d-b11d-e1e6af3ec150',
'run_id': '96a7f85f-e30c-40b5-9185-0dee5bd1a15e',
'status': 'success',
'created_at': '2023-05-22T21:08:33.112805+00:00',
'updated_at': '2023-05-22T21:08:33.112805+00:00'}]
Logs for a task run are retrieved through the following process.

/v1/api/task/get_logs_for_run

# retrieve the authorization token
headers = wl.auth.auth_header()

url = f"{APIURL}/v1/api/task/get_logs_for_run"

data = {
    "id": task_run_id
}

response = requests.post(url, headers=headers, json=data).json()
display(response)
{'logs': ["2023-05-22T21:09:17.683428502Z stdout F {'pipeline_name': 'apipipelinegsze', 'workspace_name': 'apiorchestrationworkspacegsze'}",
'2023-05-22T21:09:17.683489102Z stdout F Getting the workspace apiorchestrationworkspacegsze',
'2023-05-22T21:09:17.683497403Z stdout F Getting the pipeline apipipelinegsze',
'2023-05-22T21:09:17.683504003Z stdout F Deploying the pipeline.',
'2023-05-22T21:09:17.683510203Z stdout F Performing sample inference.',
'2023-05-22T21:09:17.683516203Z stdout F time ... check_failures',
'2023-05-22T21:09:17.683521903Z stdout F 0 2023-05-22 21:08:37.779 ... 0',
'2023-05-22T21:09:17.683527803Z stdout F ',
'2023-05-22T21:09:17.683533603Z stdout F [1 rows x 4 columns]',
'2023-05-22T21:09:17.683540103Z stdout F Undeploying the pipeline']}
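Each returned log line in the sample above follows the pattern timestamp, stream name, a literal F, then the message. A small sketch that splits one line into those parts; this is an illustration of parsing that format, not a Wallaroo helper:

```python
# split a log line of the form "<timestamp> <stream> F <message>" into parts
def parse_log_line(line: str) -> dict:
    timestamp, stream, _, message = line.split(" ", 3)
    return {"timestamp": timestamp, "stream": stream, "message": message}

sample = "2023-05-22T21:09:17.683504003Z stdout F Deploying the pipeline."
print(parse_log_line(sample))
```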