Inference Automation
Orchestration Requirements
Orchestrations are uploaded to the Wallaroo instance as a ZIP file with the following requirements:
Parameter | Type | Description |
---|---|---|
User Code | (Required) Python 3.10 script as .py files | If main.py exists, then that will be used as the task entrypoint. Otherwise, the first main.py found in any subdirectory will be used as the entrypoint. If no main.py is found, the orchestration will not be accepted. |
Python Library Requirements | (Optional) requirements.txt file in the requirements file format. | A standard Python requirements.txt for any dependencies to be provided in the task environment. The Wallaroo SDK will already be present and should not be included in the requirements.txt. Multiple requirements.txt files are not allowed. |
Other artifacts | | Other artifacts such as files, data, or code to support the orchestration. |
Zip Instructions
In a terminal with the `zip` command, assemble the artifacts as described above and then create the archive. The `zip` command is included by default with the Wallaroo JupyterHub service.
`zip` commands take the following format, with `{zipfilename}.zip` as the zip file to save the artifacts to, and each file thereafter as a file to add to the archive. File names are separated by spaces.
zip {zipfilename}.zip file1 file2 file3 ...
For example, the following command adds the files `main.py` and `requirements.txt` to the file `hello.zip`.
$ zip hello.zip main.py requirements.txt
adding: main.py (deflated 47%)
adding: requirements.txt (deflated 52%)
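The same archive can also be assembled from Python. The following is a minimal sketch using only the standard library `zipfile` module; the function name `build_orchestration_zip` and its checks are illustrative assumptions that mirror the requirements listed above, not part of the Wallaroo SDK.

```python
import zipfile
from pathlib import Path


def build_orchestration_zip(source_dir, zip_path):
    """Zip every file under source_dir; refuse to build without a main.py."""
    source = Path(source_dir)
    files = sorted(p for p in source.rglob("*") if p.is_file())
    names = [p.relative_to(source).as_posix() for p in files]

    # Mirror the upload requirements: one entrypoint, at most one requirements.txt.
    if not any(Path(name).name == "main.py" for name in names):
        raise ValueError("no main.py found; the orchestration would not be accepted")
    if sum(Path(name).name == "requirements.txt" for name in names) > 1:
        raise ValueError("multiple requirements.txt files are not allowed")

    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        for path, name in zip(files, names):
            archive.write(path, arcname=name)
    return names
```

Calling `build_orchestration_zip("my_orchestration", "hello.zip")` on a directory holding `main.py` and `requirements.txt` would produce an archive equivalent to the `zip` command above.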
Example requirements.txt file
dbt-bigquery==1.4.3
dbt-core==1.4.5
dbt-extractor==0.4.1
dbt-postgres==1.4.5
google-api-core==2.8.2
google-auth==2.11.0
google-auth-oauthlib==0.4.6
google-cloud-bigquery==3.3.2
google-cloud-bigquery-storage==2.15.0
google-cloud-core==2.3.2
google-cloud-storage==2.5.0
google-crc32c==1.5.0
google-pasta==0.2.0
google-resumable-media==2.3.3
googleapis-common-protos==1.56.4
Orchestration Recommendations
The following recommendations will make using Wallaroo orchestrations easier.
- The version of Python used should match the version in the Wallaroo JupyterHub service.
- The version of the Wallaroo SDK should match the server. For a 2023.2.1 Wallaroo instance, use the Wallaroo SDK version 2023.2.1.
- Specify the version of `pip` dependencies.
- The `wallaroo.Client` constructor `auth_type` argument is ignored. Using `wallaroo.Client()` is sufficient.
- The following methods will assist with orchestrations:
  - `wallaroo.in_task()`: Returns `True` if the code is running within an orchestration task.
  - `wallaroo.task_args()`: Returns a `Dict` of invocation-specific arguments passed to the `run_` calls.
- Orchestrations run the same way as code in the Wallaroo JupyterHub service: with the same versions of Python libraries (unless specifically overridden by the `requirements.txt` setting, which is not recommended) and in the virtualized directory `/home/jovyan/`.
Inference from Zero Scaled Deployments
For deployments that autoscale from 0 replicas, `replica_autoscale_min_max` is set with `minimum=0` and replicas scale down to zero when there is no utilization based on the autoscale parameters. When a new inference request is made, the first replica is scaled up. Once the first replica is ready, inference requests proceed as normal.
When inferencing in this scenario, a timeout may occur waiting for the first replica to spool up. To handle situations where an autoscale deployment scales down to zero replicas, the following code example provides a way to “wake up” the pipeline with an inference request which may use mock or real data. Once the first replica is fully spooled up, inference requests proceed at full speed.
Once deployed, we check the pipeline’s deployment status to verify it is running. If the pipeline is still scaling, the process waits 10 seconds to allow it to finish scaling before attempting the initial inference again. Once an inference completes successfully, the inferences proceed as normal.
import time

# verify deployment has the status `Running`
while pipeline.status()["status"] != 'Running':
    try:
        # attempt the inference
        pipeline.infer(dataframe)
    except Exception:
        # if an exception is thrown, ignore it and try again
        pass
    # wait 10 seconds before attempting the inference again
    time.sleep(10)
# when the inference passes successfully, continue with other inferences as normal
pipeline.infer(dataframe2)
pipeline.infer(dataframe3)
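The wake-up pattern can be wrapped in a small helper that gives up after a bounded number of attempts instead of looping indefinitely. This is a sketch under assumptions: `wake_pipeline`, `max_attempts`, and the injectable `sleep` are hypothetical names introduced here, and `pipeline` is any object with an `infer(data)` method, such as a deployed Wallaroo pipeline.

```python
import time


def wake_pipeline(pipeline, dataframe, wait_seconds=10, max_attempts=30, sleep=time.sleep):
    """Retry a warm-up inference until it succeeds, then return its result."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return pipeline.infer(dataframe)
        except Exception as error:  # for example, a timeout while the first replica starts
            last_error = error
            if attempt < max_attempts:
                sleep(wait_seconds)
    raise TimeoutError(
        f"pipeline did not respond after {max_attempts} attempts"
    ) from last_error
```

Bounding the attempts means a task using this helper fails with a clear error rather than running until its timeout if the deployment never becomes ready.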
Orchestration Code Samples
The following demonstrates using the `wallaroo.in_task()` and `wallaroo.task_args()` methods within an Orchestration. This sample code uses `wallaroo.in_task()` to verify whether or not the script is running as a Wallaroo Task. If `True`, it gathers the `wallaroo.task_args()` and uses them to set the workspace and pipeline. If `False`, it sets the pipeline and workspace manually.
import wallaroo

# create the Wallaroo client
wl = wallaroo.Client()
# if true, get the arguments passed to the task
if wl.in_task():
    arguments = wl.task_args()
    # arguments is a dict of key/value pairs; set the workspace and pipeline name
    workspace_name = arguments['workspace_name']
    pipeline_name = arguments['pipeline_name']
# False: We're not in a Task, so set the pipeline manually
else:
    workspace_name = "bigqueryworkspace"
    pipeline_name = "bigquerypipeline"
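When a task accepts several arguments, the same decision can be reduced to merging the task arguments over a set of defaults. The sketch below is illustrative only: `resolve_settings` and `DEFAULTS` are hypothetical names, and the function takes the results of `wl.in_task()` and `wl.task_args()` as plain values so that it can be exercised outside a Wallaroo instance.

```python
DEFAULTS = {
    "workspace_name": "bigqueryworkspace",
    "pipeline_name": "bigquerypipeline",
}


def resolve_settings(in_task, task_args, defaults=DEFAULTS):
    """Return the defaults, overridden by task arguments when running as a Task."""
    settings = dict(defaults)  # copy so the defaults are never mutated
    if in_task:
        settings.update(task_args or {})
    return settings
```

Inside an orchestration this might be called as `resolve_settings(wl.in_task(), wl.task_args() if wl.in_task() else {})`, so that a missing argument falls back to its default instead of raising a `KeyError`.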
Wallaroo provides ML Workload Orchestrations and Tasks to automate processes in a Wallaroo instance. For example:
- Deploy a pipeline, retrieve data through a data connector, submit the data for inferences, undeploy the pipeline
- Replace a model with a new version
- Retrieve shadow deployed inference results and submit them to a database
Orchestration Flow
ML Workload Orchestration flow works within 3 tiers:
Tier | Description |
---|---|
ML Workload Orchestration | User created custom instructions that provide automated processes that follow the same steps every time without error. Orchestrations contain the instructions to be performed, uploaded as a .ZIP file with the instructions, requirements, and artifacts. |
Task | Instructions on when to run an Orchestration as a scheduled Task. Tasks can be Run Once, where it creates a single Task Run, or Run Scheduled, where a Task Run is created on a regular schedule based on the Kubernetes cronjob specifications. If a Task is Run Scheduled, it will create a new Task Run every time the schedule parameters are met until the Task is killed. |
Task Run | The execution of a task. These validate that business operations are successful and identify any unsuccessful task runs. If the Task is Run Once, then only one Task Run is generated. If the Task is a Run Scheduled task, then a new Task Run is created each time the schedule parameters are met, with each Task Run having its own results and logs. If the Task is Run Continuously, each Task Run continues to execute until it either ends successfully or terminates early. Once the current Task Run has ended, a new Task Run is generated within one minute of the previous Task Run's completion. |
One analogy is making donuts.
- The ML Workload Orchestration is the recipe.
- The Task is the order to make the donuts. It might be Run Once, so only one set of donuts is made, or Run Scheduled, so donuts are made every 2nd Sunday at 6 AM. If Run Scheduled, the donuts are made every time the schedule hits until the order is cancelled (aka killed).
- The Task Runs are the donuts, each with its own receipt of creation (logs, etc).
Manage ML Workload Orchestrations via the Wallaroo Dashboard
Orchestrations and their Tasks and Task Runs are visible through the Wallaroo Dashboard through the following process.
IMPORTANT NOTE
If you do not see the Workloads link at the top of the Wallaroo Dashboard, check with your Wallaroo administrator to verify that ML Workloads are enabled. See the ML Workload Orchestration Configuration Guide for full details.
Task User Interface Overview
From the Wallaroo Dashboard, select the workspace where the workloads were uploaded.
From the upper right corner, select Workloads.
The list of uploaded ML Workload Orchestrations is displayed with the following:
- Search Bar (A): Filter the list by workload name.
- Status Filter (B): Filter the list by:
- Only Active: Only show Active workloads.
- Only Inactive: Only show Inactive workloads.
- Only Error: Only show workloads flagged with an Error.
- Workload (C): The assigned name of the workload.
- Status (D): Whether the workload orchestration status is Active, Inactive, or has an Error.
- Created At (E): The date the Orchestration was uploaded.
- ID (F): The unique workload orchestration ID in UUID format.
Select a workload orchestration to view Tasks generated from this workload. The Orchestration Page has the following details:
Orchestration Details:
- Orchestration Name (A): The name assigned when the workload was created from the workload orchestration.
- Orchestration ID (B): The ID of the workload in UUID format.
- Created (C): The date the workload was uploaded.
- File (D): The file the workload was uploaded from.
- File Hash (E): The hash of the workload file.
- Task and Task Runs buttons as detailed below.
Task: Each Task generated from the Orchestration is listed as either Run Once or Run Scheduled. Run Once tasks generate a single Task Run, then terminate. Run Scheduled tasks generate a Task Run each time the schedule pattern occurs. For example, a Run Scheduled task set to run every 1st of the month at 4 PM will have 12 Task Runs generated from it a year later.
- Type (F): Tasks are shown as either Run Once (a lightning bolt icon) or Run Scheduled (circular arrow icon).
- Task Name (G): The name of the task.
- Task ID (H): The ID of the task in UUID format.
- Run At (I): The last scheduled run, and for Run Scheduled tasks, the next scheduled run. The actual time of execution may depend on system load, time differences and other factors.
- Actions (J): Allows the task to be Stopped. If a Run Scheduled Task, this will stop any further Task Runs.
Task Run: Each Task generates one or more Task Runs with the following details.
Task Run (A): The name of the Task that generated the Task Run, including the symbol for Run Once (a lightning bolt icon) or Run Scheduled (circular arrow icon).
Task ID (B): The ID of the Task that generated the Task Run in UUID format.
Status (C): The status of the Task Run as either Success, Failure, or Timed_out.
Run At (D): The date and time the task previously ran, and for Run Scheduled tasks, the next scheduled run.
Task Run Logs (E): An abbreviated view of the Task Run's logs.
Create a Task
To create a task using the Workload Orchestration UI from the Orchestration Details page:
Select Create Task.
Add the following:
- Task Name: The name of the Task created from the Workload Orchestration.
- Task Type:
- Select Run immediately to generate a Run Once task.
- Select Run recurringly to generate a Run Scheduled task. For Run Scheduled tasks, select the Months, Days, and Times that the task is to run. For example, selecting only January 15 8:00 PM will only run on that date, while selecting January, March, May at 1, 7, 15, and 30 at 8:00 PM will generate a Task Run for each day that meets those conditions.
- Select Run indefinitely to generate a Run Continuously task. These launch a Task Run that executes until it either completes or is terminated, then launch another Task Run within one minute, until the Task is killed.
- Advanced Settings:
- Json arguments: Tasks accept JSON arguments that are used to modify how each task is run. See the Wallaroo SDK Essentials Guide: ML Workload Orchestration for details on creating a Workload Orchestration that accepts additional Task arguments.
- Timeout (sec): How long the Task should run before stopping. This is used to terminate tasks that may be in an infinite loop.
Manage ML Workload Orchestrations via the Wallaroo SDK
The following methods are provided for creating and listing orchestrations.
Orchestration Methods
Create Orchestration
An orchestration is created through the `wallaroo.client.upload_orchestration(path)` method with the following parameters.
For uploads, either the `path` to the .zip file is required, or `bytes_buffer` with `name` is required. `path` cannot be used with `bytes_buffer` and `name`, and vice versa.
Parameter | Type | Description |
---|---|---|
path | String (Optional) | The path to the .zip file that contains the orchestration package. Cannot be used with bytes_buffer and name . |
file_name | String (Optional) | The file name to give to the zip file when uploaded. |
bytes_buffer | [bytes] (Optional) | The .zip file object to be uploaded. Cannot be used with path . Note that if the zip file is uploaded from the bytes_buffer parameter and file_name is not included, then the file name in the Wallaroo orchestrations list will be - . |
name | String (Optional) | Sets the name of the byte uploaded zip file. |
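For the `bytes_buffer` route, the archive has to exist as bytes rather than as a file on disk. The following sketch builds such a buffer with the standard library; `zip_to_bytes` is a hypothetical helper, and the commented upload call simply reuses the parameter names from the table above rather than a verified signature.

```python
import io
import zipfile


def zip_to_bytes(files):
    """Build a zip archive in memory from a {archive_name: text} mapping."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        for name, text in files.items():
            archive.writestr(name, text)
    return buffer.getvalue()


zip_bytes = zip_to_bytes({"main.py": "print('hello from the task')\n"})

# Assumed usage, with `wl` as an existing wallaroo.Client():
# orchestration = wl.upload_orchestration(
#     bytes_buffer=zip_bytes, name="uploadedbytesdemo", file_name="inferencetest.zip"
# )
```

Supplying `file_name` alongside `bytes_buffer` avoids the `-` placeholder file name described in the table.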
List Orchestrations
All orchestrations for a Wallaroo instance are listed via the Wallaroo Client `list_orchestrations()` method. The orchestration list returned is based on the workspaces the user is a member of and the filtered parameters, in reverse chronological order.
Admin users have unrestricted access to all workspaces. For more details, see Wallaroo Enterprise User Management.
List Orchestrations Parameters
Parameter | Type | Description |
---|---|---|
workspace_id | (Int) (Optional) | The numerical identifier of the workspace to filter by. |
workspace_name | (String) (Optional) | The name of the workspace to filter by. |
List Orchestrations Returns
Returns a List of Orchestrations based on the filtered parameters. Each Orchestration includes the following fields.
Fields | Type | Description |
---|---|---|
id | String | The UUID identifier for the orchestration. |
last run status | String | The last reported status of the task. |
sha | String | The sha value of the uploaded orchestration. |
name | String | The name of the orchestration |
filename | String | The name of the uploaded orchestration file. |
created at | DateTime | The date and time the orchestration was uploaded to the Wallaroo instance. |
updated at | DateTime | The date and time a new version of the orchestration was uploaded. |
workspace_id | Int | The numerical identifier of the workspace the orchestration is associated with. |
workspace_name | String | The name of the workspace the orchestration is associated with. |
List Orchestration Errors
The errors for this method include:
- If the parameter `workspace_id` is not an integer.
- If the parameter `workspace_name` is not a String.
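These type rules can be checked on the client side before the list call is made. The sketch below is one possible pre-check; `check_workspace_filters` is a hypothetical helper, and raising `TypeError` is an assumption about how a caller might want to surface the problem, not a description of the SDK's own error.

```python
def check_workspace_filters(workspace_id=None, workspace_name=None):
    """Return keyword arguments for a list call, rejecting wrongly typed filters."""
    filters = {}
    if workspace_id is not None:
        # bool is a subclass of int in Python, so it is rejected explicitly
        if isinstance(workspace_id, bool) or not isinstance(workspace_id, int):
            raise TypeError("workspace_id must be an integer")
        filters["workspace_id"] = workspace_id
    if workspace_name is not None:
        if not isinstance(workspace_name, str):
            raise TypeError("workspace_name must be a string")
        filters["workspace_name"] = workspace_name
    return filters
```

The returned dictionary could then be unpacked into the call, for example `wl.list_orchestrations(**check_workspace_filters(workspace_id=30))`.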
List Orchestrations Examples
List all orchestrations available across workspaces, then filtered by workspace id, then filtered by workspace name.
wl.list_orchestrations()
id | name | status | filename | sha | created at | updated at | workspace id | workspace name |
---|---|---|---|---|---|---|---|---|
d9fa4b3b-d4bf-4e45-8c1a-a4318e0869c1 | None | ready | remote_inference.zip | d8419b...259f19 | 2024-16-Jul 19:01:58 | 2024-16-Jul 19:03:00 | 28 | simpleorchestrationworkspace |
b4d4254e-58fc-432a-b98c-9e805415b913 | uploadedbytesdemo | ready | inferencetest.zip | d8419b...259f19 | 2024-16-Jul 19:03:04 | 2024-16-Jul 19:03:59 | 28 | simpleorchestrationworkspace |
b6333bc8-ae2e-4574-856f-7c63338edc47 | None | ready | remote_inference.zip | d8419b...259f19 | 2024-16-Jul 19:29:50 | 2024-16-Jul 19:30:44 | 30 | simpleorchestrationworkspace2 |
wl.list_orchestrations(workspace_id=30)
id | name | status | filename | sha | created at | updated at | workspace id | workspace name |
---|---|---|---|---|---|---|---|---|
b6333bc8-ae2e-4574-856f-7c63338edc47 | None | ready | remote_inference.zip | d8419b...259f19 | 2024-16-Jul 19:29:50 | 2024-16-Jul 19:30:44 | 30 | simpleorchestrationworkspace2 |
wl.list_orchestrations(workspace_name="simpleorchestrationworkspace")
id | name | status | filename | sha | created at | updated at | workspace id | workspace name |
---|---|---|---|---|---|---|---|---|
d9fa4b3b-d4bf-4e45-8c1a-a4318e0869c1 | None | ready | remote_inference.zip | d8419b...259f19 | 2024-16-Jul 19:01:58 | 2024-16-Jul 19:03:00 | 28 | simpleorchestrationworkspace |
b4d4254e-58fc-432a-b98c-9e805415b913 | uploadedbytesdemo | ready | inferencetest.zip | d8419b...259f19 | 2024-16-Jul 19:03:04 | 2024-16-Jul 19:03:59 | 28 | simpleorchestrationworkspace |
Task Methods
Tasks are the implementation of an orchestration. Think of the orchestration as the instructions to follow, and the Task is the unit actually doing it.
Tasks are set at the workspace level.
Create Tasks
Tasks are created from an orchestration through the following methods.
Task Type | Orchestration Method | Description |
---|---|---|
Run Once | run_once | Create one Task Run and end when the Task Run is finished executing or until the timeout for the task is reached. |
Run Scheduled | run_scheduled | Based on a schedule, create a new Task Run until the Scheduled Task is terminated. Repeat every time the schedule pattern is fulfilled (every hour, every Tuesday at 2 PM, etc). Continue generating new Task Runs until the Run Scheduled Task is terminated. |
Run Continuously | run_continuously | Generate a Task Run and continue running that task. These tasks typically have a repeating loop that continues to run until the Run Continuously Task is issued the kill command. |
Tasks have the following parameters.
Parameter | Type | Description |
---|---|---|
id | String | The UUID identifier for the task. |
last run status | String | The last reported status of the task. |
type | String | The type of the task. |
active | Boolean | True : The task is scheduled or running. False : The task has completed or has been issued the kill command. |
schedule | String | The cron style schedule for the task. If the task is not a scheduled one, then the schedule will be - . |
created at | DateTime | The date and time the task was started. |
updated at | DateTime | The date and time the task was updated. |
Run Task Once
Temporary Run tasks are created from the Orchestration `run_once(name, json_args, timeout)` method with the following parameters.
Parameter | Type | Description |
---|---|---|
name | String (Required) | The designated name of the task. |
json_args | Dict (Required) | Arguments for the orchestration, such as { "dogs": 3.9, "cats": 8.1} |
timeout | int (Optional) | Timeout period in seconds. |
task = orchestration.run_once(
    name="house price run once 2",
    json_args={
        "workspace_name": workspace_name,
        "pipeline_name": pipeline_name,
        "connection_name": connection_name,
    },
)
task
Field | Value |
---|---|
ID | f0e27d6a-6a98-4d26-b240-266f08560c48 |
Name | house price run once 2 |
Last Run Status | unknown |
Type | Temporary Run |
Active | True |
Schedule | - |
Created At | 2023-22-May 19:58:32 |
Updated At | 2023-22-May 19:58:32 |
Run Task Scheduled
A task can be scheduled via the Orchestration `run_scheduled` method.
Scheduled tasks are run every time the schedule period is met. This uses the same settings as the `cron` utility.
Scheduled tasks include the following parameters.
Parameter | Type | Description |
---|---|---|
name | String (Required) | The name of the task. |
schedule | String (Required) | Schedule in the cron format of: minute, hour, day_of_month, month, day_of_week . |
timeout | int (Optional) | Timeout period in seconds. |
json_args | Dict (Required) | Arguments for the task, such as { "dogs": 3.9, "cats": 8.1} |
The schedule uses the same format as the `cron` service. For example, the following schedule:
schedule='42 * * * *'
Runs on the 42nd minute of every hour. The following schedule:
schedule='00 1 * * 0'
Indicates “At 1:00 AM on Sunday.”
For a shortcut in creating cron formatted schedules, see sites such as the Cron expression generator by Cronhub.
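To make the field positions concrete, the sketch below splits a schedule string into the five standard cron fields. It is an illustration only: `split_cron` and `CRON_FIELDS` are hypothetical names, and the function checks just the field count, not whether each value is valid.

```python
CRON_FIELDS = ("minute", "hour", "day_of_month", "month", "day_of_week")


def split_cron(schedule):
    """Map each of the five cron fields to its value in the schedule string."""
    parts = schedule.split()
    if len(parts) != len(CRON_FIELDS):
        raise ValueError(f"expected {len(CRON_FIELDS)} fields, got {len(parts)}")
    return dict(zip(CRON_FIELDS, parts))


print(split_cron("42 * * * *"))   # minute 42 of every hour
print(split_cron("00 1 * * 0"))   # 1:00 AM on Sunday (day_of_week 0)
```

Reading the two examples through this mapping shows that the first field is the minute and the second is the hour.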
task_scheduled = orchestration.run_scheduled(
    name="schedule example",
    timeout=600,
    schedule=schedule,
    json_args={
        "workspace_name": workspace_name,
        "pipeline_name": pipeline_name,
        "connection_name": connection_name,
    },
)
task_scheduled
Field | Value |
---|---|
ID | 4af57c61-dfa9-43eb-944e-559135495df4 |
Name | schedule example |
Last Run Status | unknown |
Type | Scheduled Run |
Active | True |
Schedule | */5 * * * * |
Created At | 2023-22-May 20:08:25 |
Updated At | 2023-22-May 20:08:25 |
Run Task Continuously
Run Continuously Tasks are launched via the `wallaroo.Orchestration.run_continuously` method. These tasks continue to run until the kill task command is issued. Typical use cases include:
- Polling a data store for new records and using those records for a new inference request.
- Checking inference results for anomalies on a regular basis and sending a message to interested parties.
The main difference between Run Scheduled and Run Continuously Tasks is that Run Scheduled generates a new Task Run each time a scheduled pattern is met, with each Task Run having its own Task Run Logs.
A Run Continuously Task is not bound by schedule, but on Task Run execution and completion. When a Run Continuously Task is launched:
- A new Task Run is generated.
- The Task Run will execute until either successful completion or failure (for example: an uncaught exception causes the script to terminate).
- Within one minute of the script’s completion or termination, a new Task Run is generated. Compare this with a Scheduled Task, which only generates a new Task Run based on the schedule.
- Each separate Task Run has its own logs.
- This cycle continues until the kill task command is issued to the Run Continuously Task.
This means that if a Run Continuously Task has an infinite loop that continues to run without terminating (for example: polling a database for new records every 5 minutes), only one Task Run is generated until the `kill` command is issued.
The following two example Orchestrations show how Run Continuously Tasks behave.
The following script executes and completes without any loops, and each time it completes, a new Task Run is generated with its own Task Run Logs.
import wallaroo
import pandas as pd
import pyarrow as pa
import requests
import time

# create a Wallaroo client
wl = wallaroo.Client()

# get the arguments
arguments = wl.task_args()

if "workspace_name" in arguments:
    workspace_name = arguments['workspace_name']
else:
    workspace_name = "simpleorchestrationworkspace"

if "pipeline_name" in arguments:
    pipeline_name = arguments['pipeline_name']
else:
    pipeline_name = "simpleorchestrationpipeline"

if "connection_name" in arguments:
    connection_name = arguments['connection_name']
else:
    connection_name = "external_inference_connection"

# print the parameters
print(workspace_name)
print(pipeline_name)
print(connection_name)
The following script will loop forever, generating only one Task Run, since the script never completes.
import wallaroo
import pandas as pd
import pyarrow as pa
import requests
import time

wl = wallaroo.Client()

# Setting variables for later steps
# get the arguments
arguments = wl.task_args()

if "workspace_name" in arguments:
    workspace_name = arguments['workspace_name']
else:
    workspace_name = "simpleorchestrationworkspace"

if "pipeline_name" in arguments:
    pipeline_name = arguments['pipeline_name']
else:
    pipeline_name = "simpleorchestrationpipeline"

if "connection_name" in arguments:
    connection_name = arguments['connection_name']
else:
    connection_name = "external_inference_connection"

while True:
    print(workspace_name)
    print(pipeline_name)
    print(connection_name)
    # wait 60 seconds then print again
    time.sleep(60)
A Run Continuously Task is created via the Orchestration `run_continuously` method and includes the following parameters.
Parameter | Type | Description |
---|---|---|
name | String (Required) | The name of the task. |
json_args | Dict (Required) | Arguments for the task, such as { "dogs": 3.9, "cats": 8.1} |
The following example generates a new Run Continuously Task and passes a set of arguments for the Task via the `json_args` parameter.
task_continuously = orchestration.run_continuously(
    name=sample_task_name,
    json_args={
        "workspace_name": workspace_name,
        "pipeline_name": pipeline_name,
        "connection_name": connection_name,
    },
)
display(task_continuously)
ID | b2053205-1706-4548-be35-e7dbf59cdb5e |
Name | continuoustaskdemo |
Last Run Status | running |
Type | Temporary Run |
Active | True |
Schedule | - |
Created At | 2024-24-Sep 18:01:33 |
Updated At | 2024-24-Sep 18:01:33 |
Workspace ID | 9 |
Workspace Name | continuoustutorial |
Run Continuously Tasks Best Practices
The following best practices are recommended for Run Continuously Tasks.
- For faster Task Run Log updates, flush Python buffers: Outputs from commands including `print` typically do not appear in the Task Run Logs until the Python buffers are flushed, usually when the Python script completes. This allows the script to perform at peak performance. To have outputs show up in the Task Run Logs faster for debugging or other purposes:
  - Flush the output immediately. For example, `print('sample', flush=True)`.
  - For `helm` based installations of Wallaroo, shorten the `dbcleaner.schedule` so the outputs are flushed sooner.
- Set log storage: By default, Task Run Logs are preserved for running Run Continuously Tasks for 30 days. For `helm` based installations of Wallaroo, this is adjusted through the `dbcleaner.maxAgeDays` setting.
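A polling loop that follows the flushing advice might look like the sketch below. The names `poll`, `fetch_records`, and `handle` are placeholders for whatever data source and inference step a task uses, and `max_cycles` and `sleep` are hypothetical parameters added only so the loop can be exercised outside a task.

```python
import time


def poll(fetch_records, handle, interval_seconds=300, max_cycles=None, sleep=time.sleep):
    """Call fetch_records() repeatedly and pass any non-empty result to handle().

    With max_cycles=None the loop never returns, so a Run Continuously Task
    running it stays on a single Task Run until the task is killed.
    """
    cycle = 0
    while max_cycles is None or cycle < max_cycles:
        cycle += 1
        records = fetch_records()
        # flush=True pushes the line out of Python's buffer right away
        print(f"cycle {cycle}: {len(records)} new record(s)", flush=True)
        if records:
            handle(records)
        sleep(interval_seconds)
    return cycle
```

Because every status line is flushed as it is printed, the Task Run Logs can show progress while the loop is still running rather than only after the script exits.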
Tasks with Inference from Zero Scaled Deployments
For deployments that autoscale from 0 replicas, `replica_autoscale_min_max` is set with `minimum=0` and replicas scale down to zero when there is no utilization for 60 minutes. When a new inference request is made, the first replica is scaled up. Once the first replica is ready, inference requests proceed as normal.
When inferencing in this scenario, a timeout may occur waiting for the first replica to finish spooling up. To handle situations where an autoscale deployment scales down to zero replicas, the following code example provides a way to “wake up” the pipeline with an inference request which may use mock or real data. Once the first replica is fully spooled up, inference requests proceed at full speed.
The following deploys a pipeline with 4 CPUs and 3 GB RAM per replica, with the autoscale set between `0` and `5` replicas.
Once deployed, we check the pipeline’s deployment status to verify it is running. If the pipeline is still scaling, the process waits 10 seconds to allow it to finish scaling. Once an inference completes successfully, the inferences proceed as normal.
import time

# deployment configuration with autoscaling between 0 and 5 replicas
# (parentheses allow the chained calls to span several lines)
deployment_configuration = (
    wallaroo.DeploymentConfigBuilder()
    .autoscale_cpu_utilization(75)
    .cpus(4)
    .memory('3Gi')
    .replica_autoscale_min_max(minimum=0, maximum=5)
    .build()
)
# deployment with the deployment configuration
pipeline.deploy(deployment_configuration)
# verify deployment has the status `Running`
while pipeline.status()["status"] != 'Running':
    try:
        # attempt the inference
        pipeline.infer(dataframe)
    except Exception:
        # if an exception is thrown, ignore it and try again
        pass
    # wait 10 seconds before attempting the inference again
    time.sleep(10)
# when the inference passes successfully, continue with other inferences as normal
pipeline.infer(dataframe2)
pipeline.infer(dataframe3)
...
List Tasks
The list of tasks in the Wallaroo instance is retrieved through the Wallaroo Client `list_tasks()` method. The task list returned is based on the workspaces the user is a member of and the filtered parameters, in reverse chronological order.
Admin users have unrestricted access to all workspaces. For more details, see Wallaroo Enterprise User Management.
List Tasks Parameters
Parameter | Type | Description |
---|---|---|
killed | Boolean (Optional Default: False ) | Returns tasks depending on whether they have been issued the kill command. False returns all tasks whether killed or not. True only returns killed tasks. |
workspace_id | (Int) (Optional) | The numerical identifier of the workspace to filter by. |
workspace_name | (String) (Optional) | The name of the workspace to filter by. |
List Tasks Returns
A List of tasks based on the filtered parameters with the following fields per task:
Field | Type | Description |
---|---|---|
id | String | The UUID identifier of the task. |
name | String | The name of the task. |
last run status | String | The status of the most recent run task. |
type | String | The type of task - Run Once or Run Scheduled. |
active | Boolean | Whether the task is active or killed. |
schedule | String | If a Run Scheduled task, the schedule for the task in cron format. |
created at | Datetime | The DateTime the task was created. |
updated at | Datetime | The DateTime the task was last updated. |
workspace id | Integer | The numerical id of the workspace the task is associated with. |
workspace name | String | The name of the workspace the task is associated with. |
List Tasks Errors
The errors for this method include:
- If the parameter `workspace_id` is not an integer.
- If the parameter `workspace_name` is not a String.
List Tasks Examples
Lists all tasks available across workspaces, then filtered by workspace id, then filtered by workspace name.
wl.list_tasks()
id | name | last run status | type | active | schedule | created at | updated at | workspace id | workspace name |
---|---|---|---|---|---|---|---|---|---|
e44070f4-2638-4778-9d87-b13d457181ec | simpletaskdemo | running | Temporary Run | True | - | 2024-16-Jul 19:31:38 | 2024-16-Jul 19:31:44 | 30 | simpleorchestrationworkspace2 |
5cd594fe-36fd-4db5-9000-8b090a8fa9e3 | simple_inference_schedule | running | Scheduled Run | True | */5 * * * * | 2024-16-Jul 19:18:07 | 2024-16-Jul 19:18:08 | 28 | simpleorchestrationworkspace |
2de50c93-dbe3-45af-ae9d-657540275405 | simpletaskdemo | success | Temporary Run | True | - | 2024-16-Jul 19:15:47 | 2024-16-Jul 19:17:39 | 28 | simpleorchestrationworkspace |
01e13d2e-a402-4b43-b790-ab76148bba51 | simpletaskdemo | failure | Temporary Run | True | - | 2024-16-Jul 19:03:05 | 2024-16-Jul 19:03:32 | 28 | simpleorchestrationworkspace |
wl.list_tasks(workspace_id=30)
id | name | last run status | type | active | schedule | created at | updated at | workspace id | workspace name |
---|---|---|---|---|---|---|---|---|---|
e44070f4-2638-4778-9d87-b13d457181ec | simpletaskdemo | failure | Temporary Run | True | - | 2024-16-Jul 19:31:38 | 2024-16-Jul 19:31:49 | 30 | simpleorchestrationworkspace2 |
wl.list_tasks(workspace_name="simpleorchestrationworkspace")
id | name | last run status | type | active | schedule | created at | updated at | workspace id | workspace name |
---|---|---|---|---|---|---|---|---|---|
5cd594fe-36fd-4db5-9000-8b090a8fa9e3 | simple_inference_schedule | running | Scheduled Run | True | */5 * * * * | 2024-16-Jul 19:18:07 | 2024-16-Jul 19:18:08 | 28 | simpleorchestrationworkspace |
2de50c93-dbe3-45af-ae9d-657540275405 | simpletaskdemo | success | Temporary Run | True | - | 2024-16-Jul 19:15:47 | 2024-16-Jul 19:17:39 | 28 | simpleorchestrationworkspace |
01e13d2e-a402-4b43-b790-ab76148bba51 | simpletaskdemo | failure | Temporary Run | True | - | 2024-16-Jul 19:03:05 | 2024-16-Jul 19:03:32 | 28 | simpleorchestrationworkspace |
Get Task Status
The status of a task is retrieved through the Task `status()` method and returns the following.
Parameter | Type | Description |
---|---|---|
status | String | The current status of the task. |
display(task2.status())
'started'
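Since `status()` reports a single point in time, a script that depends on a task reaching a given state can poll it. The following is a sketch under assumptions: `wait_for_status` is a hypothetical helper, the caller supplies the set of statuses to wait for, and `task` is any object with a `status()` method that returns a string.

```python
import time


def wait_for_status(task, targets, poll_seconds=5, max_polls=60, sleep=time.sleep):
    """Poll task.status() until it returns one of `targets`, then return it."""
    for attempt in range(1, max_polls + 1):
        status = task.status()
        if status in targets:
            return status
        if attempt < max_polls:
            sleep(poll_seconds)
    raise TimeoutError(f"task did not reach {sorted(targets)} after {max_polls} polls")
```

For example, `wait_for_status(task2, {"started"})` would return once the task reports the `'started'` status shown above. Pass `targets` as a set or tuple rather than a bare string, since `in` on a string performs a substring match.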
Kill a Task
Killing a task removes the schedule or removes it from a service. Tasks are killed with the Task `kill()` method, which returns a message with the status of the kill procedure.
Killed tasks have the following effects:
- Any active Task Runs are halted.
- All Task Run Logs generated from any Task Runs created by the Task are deleted.
Note that a Task set to Run Scheduled will generate a new Task Run each time the schedule parameters are met until the Task is killed. A Task set to Run Once will generate only one Task Run, and a Run Continuously Task will only generate new Task Runs if the current Task Run either terminates or completes successfully. In all three cases, any active Task Runs are terminated and their associated Task Run Logs are deleted.
task2.kill()
<ArbexStatus.PENDING_KILL: 'pending_kill'>
Task Runs
Task Runs are generated from a Task.
- Run Once Tasks generate only one Task Run, whether it completes successfully or terminates early.
- Run Scheduled Tasks generate a new Task Run each time the schedule parameters are met, with each Task Run having its own results and logs. New Task Runs are generated whether the previous Task Runs completed or not.
- Run Continuously Tasks generate a new Task Run within one minute after the most recent Task Run either completes successfully or terminates early.
Task Last Runs History
The history of a task is retrieved with the Task `last_runs` method; each execution of the task is known as a task run. The method takes the following arguments.
Parameter | Type | Description |
---|---|---|
status | String (Optional, Default: all ) | Filters the task history by the status . If all , returns all statuses. |
limit | Integer (Optional) | Limits the number of task runs returned. |
This returns the following in reverse chronological order by `updated at`.
Parameter | Type | Description |
---|---|---|
task id | String | Task id in UUID format. |
pod id | String | Pod id in UUID format. |
status | String | Status of the task. |
created at | DateTime | Date and time the task was created at. |
updated at | DateTime | Date and time the task was updated. |
task.last_runs()
task id | pod id | status | created at | updated at |
---|---|---|---|---|
f0e27d6a-6a98-4d26-b240-266f08560c48 | 7d9d73d5-df11-44ed-90c1-db0e64c7f9b8 | success | 2023-22-May 19:58:35 | 2023-22-May 19:58:35 |
Task Run Logs
The output of a task is displayed with the Task Run `logs()` method, which takes the following parameters.
Parameter | Type | Description |
---|---|---|
limit | Integer (Optional) | Limits the lines returned from the task run log. The limit parameter is based on the log tail - starting from the last line of the log file, then working up until the limit of lines is reached. This is useful for viewing final outputs, exceptions, etc. |
The Task Run `logs()` method returns the log entries as a string list, with each entry as an item in the list.
- IMPORTANT NOTE: It may take around a minute for task run logs to be integrated into the Wallaroo log database.
# give time for the task to complete and the log files entered
time.sleep(60)
recent_run = task.last_runs()[0]
display(recent_run.logs())
2023-22-May 19:59:29 Getting the pipeline orchestrationpipelinetgiq
2023-22-May 19:59:29 Getting arrow table file
2023-22-May 19:59:29 Inference time. Displaying results after.
2023-22-May 19:59:29 pyarrow.Table
2023-22-May 19:59:29 time: timestamp[ms]
2023-22-May 19:59:29 in.tensor: list<item: float> not null
2023-22-May 19:59:29 child 0, item: float
2023-22-May 19:59:29 out.variable: list<inner: float not null> not null
2023-22-May 19:59:29 anomaly.count: int8
2023-22-May 19:59:29 child 0, inner: float not null
2023-22-May 19:59:29 ----
2023-22-May 19:59:29 time: [[2023-05-22 19:58:49.767,2023-05-22 19:58:49.767,2023-05-22 19:58:49.767,2023-05-22 19:58:49.767,2023-05-22 19:58:49.767,...,2023-05-22 19:58:49.767,2023-05-22 19:58:49.767,2023-05-22 19:58:49.767,2023-05-22 19:58:49.767,2023-05-22 19:58:49.767]]
2023-22-May 19:59:29 in.tensor: [[[4,2.5,2900,5505,2,...,2970,5251,12,0,0],[2,2.5,2170,6361,1,...,2310,7419,6,0,0],...,[3,1.75,2910,37461,1,...,2520,18295,47,0,0],[3,2,2005,7000,1,...,1750,4500,34,0,0]]]
2023-22-May 19:59:29 check_failures: [[0,0,0,0,0,...,0,0,0,0,0]]
2023-22-May 19:59:29 out.variable: [[[718013.75],[615094.56],...,[706823.56],[581003]]]
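Because the logs come back as a list of strings, they can be filtered with ordinary Python. The sketch below pulls the lines that look like errors from the most recent Task Run. `error_lines` and its default keywords are hypothetical, and it assumes only what is described above: `last_runs()` returns an indexable, newest-first collection, and `logs()` accepts an optional `limit`.

```python
def error_lines(task, keywords=("Traceback", "Error", "Exception"), log_limit=None):
    """Return log lines from the most recent Task Run that contain a keyword."""
    runs = task.last_runs()
    if len(runs) == 0:
        return []  # the task has not produced a Task Run yet
    latest = runs[0]
    lines = latest.logs() if log_limit is None else latest.logs(limit=log_limit)
    return [line for line in lines if any(word in line for word in keywords)]
```

As with the example above, it may be necessary to wait about a minute after a Task Run finishes before its log lines are available.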