Wallaroo MLOps API Essentials Guide: ML Workload Orchestration Management

How to use the Wallaroo API for ML Workload Orchestration Management

Table of Contents

Wallaroo provides ML Workload Orchestrations and Tasks to automate processes in a Wallaroo instance. For example:

  • Deploy a pipeline, retrieve data through a data connector, submit the data for inferences, undeploy the pipeline
  • Replace a model with a new version
  • Retrieve shadow deployed inference results and submit them to a database

Orchestration Flow

ML Workload Orchestration flow works within 3 tiers:

  • ML Workload Orchestration: User-created custom instructions that provide automated processes that follow the same steps every time without error. Orchestrations contain the instructions to be performed, uploaded as a .ZIP file with the instructions, requirements, and artifacts.
  • Task: Instructions on when to run an Orchestration as a scheduled Task. Tasks can be Run Once, where it creates a single Task Run, or Run Scheduled, where a Task Run is created on a regular schedule based on the Kubernetes cronjob specifications. If a Task is Run Scheduled, it will create a new Task Run every time the schedule parameters are met until the Task is killed.
  • Task Run: The execution of a task. Task Runs validate that business operations are successful and identify any unsuccessful runs. If the Task is Run Once, then only one Task Run is generated. If the Task is Run Scheduled, a new Task Run is created each time the schedule parameters are met, with each Task Run having its own results and logs.
(Figure: Wallaroo Components)

As an example, consider making donuts.

  • The ML Workload Orchestration is the recipe.
  • The Task is the order to make the donuts. It might be Run Once, so only one set of donuts is made, or Run Scheduled, so donuts are made every 2nd Sunday at 6 AM. If Run Scheduled, the donuts are made every time the schedule hits until the order is cancelled (aka killed).
  • The Task Runs are the donuts, each with its own receipt of creation (logs, etc).

Orchestration Requirements

Orchestrations are uploaded to the Wallaroo instance as a ZIP file with the following requirements:

  • User Code (Required): Python scripts as .py files. If main.py exists, then that will be used as the task entrypoint. Otherwise, the first main.py found in any subdirectory will be used as the entrypoint. If no main.py is found, the orchestration will not be accepted.
  • Python Library Requirements (Optional): A requirements.txt file in the standard requirements file format, listing any dependencies to be provided in the task environment. The Wallaroo SDK will already be present and should not be included in requirements.txt. Multiple requirements.txt files are not allowed.
  • Other artifacts (Optional): Other artifacts such as files, data, or code to support the orchestration.

Zip Instructions

In a terminal with the zip command, assemble artifacts as above and then create the archive. The zip command is included by default with the Wallaroo JupyterHub service.

The zip command takes the following format, with {zipfilename}.zip as the archive to create and each file thereafter as a file to add to it.

zip {zipfilename}.zip file1 file2 file3 ...

For example, the following command will add the files main.py and requirements.txt into the file hello.zip.

$ zip hello.zip main.py requirements.txt 
  adding: main.py (deflated 47%)
  adding: requirements.txt (deflated 52%)
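If the zip command is not available, Python's standard zipfile module produces an equivalent archive. A minimal sketch; the sample files here are stand-ins created for illustration, while in practice the orchestration's real main.py and requirements.txt already exist:

```python
import pathlib
import zipfile

# stand-in artifacts for illustration; in practice these files already exist
pathlib.Path("main.py").write_text("print('orchestration entrypoint')\n")
pathlib.Path("requirements.txt").write_text("requests\n")

# equivalent to: zip hello.zip main.py requirements.txt
with zipfile.ZipFile("hello.zip", "w", compression=zipfile.ZIP_DEFLATED) as zf:
    zf.write("main.py")
    zf.write("requirements.txt")
```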

Example requirements.txt file

dbt-bigquery==1.4.3
dbt-core==1.4.5
dbt-extractor==0.4.1
dbt-postgres==1.4.5
google-api-core==2.8.2
google-auth==2.11.0
google-auth-oauthlib==0.4.6
google-cloud-bigquery==3.3.2
google-cloud-bigquery-storage==2.15.0
google-cloud-core==2.3.2
google-cloud-storage==2.5.0
google-crc32c==1.5.0
google-pasta==0.2.0
google-resumable-media==2.3.3
googleapis-common-protos==1.56.4

Orchestration Recommendations

The following recommendations will make using Wallaroo orchestrations easier.

  • The version of Python used should match the same version as in the Wallaroo JupyterHub service.
  • The version of the Wallaroo SDK should match the Wallaroo instance. For a 2023.2.1 Wallaroo instance, use Wallaroo SDK version 2023.2.1.
  • Pin the versions of pip dependencies in requirements.txt.
  • The wallaroo.Client constructor auth_type argument is ignored. Using wallaroo.Client() is sufficient.
  • The following methods will assist with orchestrations:
    • wallaroo.in_task() : Returns True if the code is running within an orchestration task.
    • wallaroo.task_args(): Returns a Dict of invocation-specific arguments passed to the run_ calls.
  • Orchestrations run in the same environment as the Wallaroo JupyterHub service: the same Python library versions (unless specifically overridden through requirements.txt, which is not recommended), running in the virtualized directory /home/jovyan/.

Inference from Zero Scaled Deployments

For deployments that autoscale from 0 replicas, replica_autoscale_min_max is set with minimum=0 and replicas scale down to zero when there is no utilization for 60 minutes. When a new inference request is made, the first replica is scaled up. Once the first replica is ready, inference requests proceed as normal.

When inferencing in this scenario, including within ML Workload Orchestrations, a timeout may occur waiting for the first replica to finish spooling up. To handle situations where an autoscale deployment scales down to zero replicas, the following code example provides a way to “wake up” the pipeline with an inference request which may use mock or real data. Once the first replica is fully spooled up, inference requests proceed at full speed.

The following deploys a pipeline with 4 CPUs and 3 Gi of RAM per replica, with autoscaling set between 0 and 5 replicas.

Once deployed, we check the pipeline’s deployment status to verify it is running. If the pipeline is still scaling, the process waits 10 seconds to allow it to finish scaling. Once an inference completes successfully, the inferences proceed as normal.

import time
import wallaroo

# deployment configuration with autoscaling between 0 and 5 replicas
deployment_configuration = (
    wallaroo.DeploymentConfigBuilder()
    .autoscale_cpu_utilization(75)
    .cpus(4)
    .memory('3Gi')
    .replica_autoscale_min_max(minimum=0, maximum=5)
    .build()
)

# deployment with the deployment configuration
pipeline.deploy(deployment_configuration)

# wake the pipeline and wait until its status is `Running`
while pipeline.status()["status"] != 'Running':
    try:
        # attempt an inference to trigger scale-up from zero
        pipeline.infer(dataframe)
    except Exception:
        # the replica is still spooling up; ignore the failure and retry
        pass
    # wait 10 seconds before checking the status again
    time.sleep(10)
# when the inference passes successfully, continue with other inferences as normal
pipeline.infer(dataframe2)
pipeline.infer(dataframe3)
...

Orchestration Code Samples

The following demonstrates using the wallaroo.in_task() and wallaroo.task_args() methods within an Orchestration. The sample code uses wallaroo.in_task() to verify whether the script is running as a Wallaroo Task. If so, it gathers the wallaroo.task_args() and uses them to set the workspace and pipeline. If not, it sets the workspace and pipeline manually.

import wallaroo

# create the Wallaroo client
wl = wallaroo.Client()

# if true, get the arguments passed to the task
if wl.in_task():
  arguments = wl.task_args()
  
  # arguments is a key/value pair, set the workspace and pipeline name
  workspace_name = arguments['workspace_name']
  pipeline_name = arguments['pipeline_name']
  
# not in a Task: set the workspace and pipeline manually
else:
  workspace_name="bigqueryworkspace"
  pipeline_name="bigquerypipeline"
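Because wl.task_args() returns a plain key/value dictionary, the branch above can also be collapsed with dict.get, falling back to the manual defaults when a key is absent. A small sketch; the names are illustrative:

```python
def resolve_names(arguments):
    """Pick the workspace and pipeline names from task args, with manual defaults."""
    workspace_name = arguments.get("workspace_name", "bigqueryworkspace")
    pipeline_name = arguments.get("pipeline_name", "bigquerypipeline")
    return workspace_name, pipeline_name

# inside a task, the arguments dict would come from wl.task_args()
names = resolve_names({"workspace_name": "prod", "pipeline_name": "prodpipe"})

# outside a task, an empty dict falls back to the manual defaults
defaults = resolve_names({})
```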

ML Workload Orchestration Methods

Upload Orchestration

Uploads an orchestration to the Wallaroo workspace.

  • REQUEST PATH
    • POST multipart/form-data /v1/api/orchestration/upload
  • PARAMETERS
    • file: The file data as Content-Type application/octet-stream.
    • metadata: The metadata including the workspace_id as Content-Type application/json.
  • RETURNS
    • id (String): The id of the orchestration in UUID format.
# retrieve the authorization token
headers = wl.auth.auth_header()

url = f"{APIURL}/v1/api/orchestration/upload"

fp = open("./api_inference_orchestration.zip", "rb")

import json

metadata = {
    "workspace_id": workspace_id
}

response = requests.post(
    url,
    headers=headers,
    files=[
        ("file", ("api_inference_orchestration.zip", fp, "application/octet-stream")),
        ("metadata", ("metadata", json.dumps(metadata), "application/json")),
    ],
).json()

display(response)
orchestration_id = response['id']

{'id': 'fd823818-91cf-4d78-9ec2-f74faa1a05f3'}

List Orchestrations by Workspace

Lists the orchestrations in a Wallaroo workspace.

  • REQUEST PATH
    • POST /v1/api/orchestration/list
  • PARAMETERS
    • workspace_id (Integer Required): The numerical id of the workspace.
  • RETURNS
    • List[orchestrations]: A list of the orchestrations in the workspace.
      • id (String): The id of the orchestration in UUID format.
      • sha: The sha hash of the orchestration.
      • name (String): The name of the orchestration.
      • file_name (String): The name of the file uploaded for the orchestration.
      • task_id (String): The task id managing unpacking and installing the orchestration.
      • owner_id (String): The user ID of the user that created the orchestration.
      • created_at (String): The timestamp of when the orchestration was created.
      • updated_at (String): The timestamp of when the orchestration was updated.
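Unlike the other endpoints in this guide, no request example is shown above, so the following sketch mirrors the upload example's style. The helper only assembles the URL and body; the commented line shows where the authenticated requests.post call (using wl.auth.auth_header() and APIURL, as in the other examples) would go.

```python
def build_list_orchestrations_request(api_url, workspace_id):
    """Assemble the URL and JSON body for /v1/api/orchestration/list."""
    url = f"{api_url}/v1/api/orchestration/list"
    data = {"workspace_id": workspace_id}
    return url, data

# APIURL and workspace_id are assumed to be defined as in the upload example
url, data = build_list_orchestrations_request("https://example.com", 5)
# response = requests.post(url, headers=wl.auth.auth_header(), json=data).json()
```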

Tasks

Tasks are the implementation of an orchestration. Think of the orchestration as the instructions to follow, and the Task as the unit actually executing them.

Tasks are created at the workspace level.

Create Tasks

Tasks are created from an orchestration through the following methods.

  • /v1/api/task/run_once: Run the task once.
  • /v1/api/task/run_scheduled: Run on a schedule, repeating every time the schedule matches until the task is killed.

Run Task Once

Run Once (aka Temporary Run) tasks are created from an Orchestration with the request:

  • REQUEST PATH
    • POST /v1/api/task/run_once.
  • PARAMETERS
    • name (String Optional): The name of the task to create.
    • orch_id (String Required): The id of the orchestration to create the task from.
    • timeout (Integer Optional): The timeout period to run the task before cancelling it in seconds.
    • workspace_id (Integer Required): The numerical id of the workspace to create the task within.
  • RETURNS
    • id (String): The id of the new task in UUID format.
# retrieve the authorization token
headers = wl.auth.auth_header()

data = {
    "workspace_id": workspace_id,
    "orch_id": orchestration_id,
    "json": {}
}

url=f"{APIURL}/v1/api/task/run_once"

response=requests.post(url, headers=headers, json=data).json()
display(response)

{'id': '1cb1b550-384f-4476-8619-03e0aca71409'}

Run Task Scheduled

Run Scheduled tasks are created from an Orchestration with the request:

  • REQUEST PATH
    • POST /v1/api/task/run_scheduled.
  • PARAMETERS
    • name (String Optional): The name of the task to create.
    • orch_id (String Required): The id of the orchestration to create the task from.
    • schedule (String Required): Schedule in crontab format: minute, hour, day of month, month, day of week. For example, */1 * * * * runs every minute.
    • timeout (Integer Optional): The timeout period to run the task before cancelling it in seconds.
    • workspace_id (Integer Required): The numerical id of the workspace to create the task within.
  • RETURNS
    • id: The task id in UUID format.
# retrieve the authorization token
headers = wl.auth.auth_header()

data = {
    "workspace_id": workspace_id,
    "orch_id": orchestration_id,
    "schedule": "*/1 * * * *",
    "json": {}
}

url=f"{APIURL}/v1/api/task/run_scheduled"

response=requests.post(url, headers=headers, json=data).json()
display(response)

{'id': 'd81c6e65-3b1f-42d7-8d2f-3cfc0eb51599'}

Get Task by ID

Retrieve the task by its id.

  • REQUEST PATH
    • POST /v1/api/task/get_by_id.
  • PARAMETERS
    • id (String Required): The id of the task to retrieve.
  • RETURNS (partial list)
    • name (String|None): The name of the task.
    • id (String): The id of the task in UUID format.
    • image (String): The Docker image used to run the task.
    • image_tag (String): The Docker tag for the image used to run the task.
    • bind_secrets (List[String]): The service secrets used to run the task.
    • extra_env_vars (Dict): The additional variables used to run the task.
    • auth_init (Bool Default: True): Whether the authorization to run this task is automatically enabled. This allows the task to use Wallaroo resources.
    • status (String): The status of the task. Statuses are: pending, started, and failed.
    • workspace_id: The workspace the task is connected to.
    • killed: Whether the task has been issued the kill request.
    • created_at (String): The timestamp of when the orchestration was created.
    • updated_at (String): The timestamp of when the orchestration was updated.
    • last_runs (List[runs]): List of previous runs that display the run_id, status, and created_at.
# retrieve the authorization token
headers = wl.auth.auth_header()

url=f"{APIURL}/v1/api/task/get_by_id"

data = {
    "id": task_id
}

response=requests.post(url, headers=headers, json=data).json()
display(response)

{'name': None,
 'id': '1cb1b550-384f-4476-8619-03e0aca71409',
 'image': 'proxy.replicated.com/proxy/wallaroo/ghcr.io/wallaroolabs/arbex-orch-deploy',
 'image_tag': 'v2023.2.0-main-3228',
 'bind_secrets': ['minio'],
 'extra_env_vars': {'MINIO_URL': 'http://minio.wallaroo.svc.cluster.local:9000',
  'ORCH_OWNER_ID': '3aac9f67-3050-4502-915f-fc2f871ee350',
  'ORCH_SHA': 'd3b93c9f280734106376e684792aa8b4285d527092fe87d89c74ec804f8e169e',
  'TASK_ID': '1cb1b550-384f-4476-8619-03e0aca71409'},
 'auth_init': True,
 'workspace_id': 5,
 'flavor': 'exec_orch_oneshot',
 'reap_threshold_secs': 900,
 'exec_type': 'job',
 'status': 'pending',
 'input_data': {},
 'killed': False,
 'created_at': '2023-05-16T15:59:57.496421+00:00',
 'updated_at': '2023-05-16T15:59:57.496421+00:00',
 'last_runs': []}

Get Tasks by Orchestration SHA

Tasks tied to the same orchestration are retrieved through the following request.

  • REQUEST
    • POST /v1/api/task/get_tasks_by_orch_sha
  • PARAMETERS
    • sha: The orchestration's SHA hash.
  • RETURNS
    • ids (List[String]): A list of task ids in UUID format.
# retrieve the authorization token
headers = wl.auth.auth_header()

url=f"{APIURL}/v1/api/task/get_tasks_by_orch_sha"

data = {
    "sha": orchestration_sha
}

response=requests.post(url, headers=headers, json=data).json()
display(response)
{'ids': ['2424a9a7-2331-42f6-bd84-90643386b130',
  'c868aa44-f7fe-4e3d-b11d-e1e6af3ec150',
  'a41fe4ae-b8a4-4e1f-a45a-114df64ae2bc']}

List Tasks in Workspace

Retrieve all tasks in a workspace.

  • REQUEST PATH
    • POST /v1/api/task/list.
  • PARAMETERS
    • workspace_id (Integer Required): The numerical id of the workspace.
  • RETURNS (partial list)
    • List[tasks]: A list of the tasks with the following attributes.
      • name (String|None): The name of the task.
      • id (String): The id of the task in UUID format.
      • image (String): The Docker image used to run the task.
      • image_tag (String): The Docker tag for the image used to run the task.
      • bind_secrets (List[String]): The service secrets used to run the task.
      • extra_env_vars (Dict): The additional variables used to run the task.
      • auth_init (Bool Default: True): Whether the authorization to run this task is automatically enabled. This allows the task to use Wallaroo resources.
      • status (String): The status of the task. Statuses are: pending, started, and failed.
      • workspace_id: The workspace the task is connected to.
      • killed: Whether the task has been issued the kill request.
      • created_at (String): The timestamp of when the orchestration was created.
      • updated_at (String): The timestamp of when the orchestration was updated.
      • last_runs (List[runs]): List of previous runs that display the run_id, status, and created_at.
# retrieve the authorization token
headers = wl.auth.auth_header()

url=f"{APIURL}/v1/api/task/list"

data = {
    "workspace_id": workspace_id
}

response=requests.post(url, headers=headers, json=data).json()
display(response)

[{'name': None,
  'id': '33a2b093-b34f-46f1-9c59-7c4d7c0f3188',
  'image': 'proxy.replicated.com/proxy/wallaroo/ghcr.io/wallaroolabs/arbex-orch-deploy',
  'image_tag': 'v2023.2.0-main-3228',
  'bind_secrets': ['minio'],
  'extra_env_vars': {'MINIO_URL': 'http://minio.wallaroo.svc.cluster.local:9000',
   'ORCH_OWNER_ID': '3aac9f67-3050-4502-915f-fc2f871ee350',
   'ORCH_SHA': 'd3b93c9f280734106376e684792aa8b4285d527092fe87d89c74ec804f8e169e',
   'TASK_ID': '33a2b093-b34f-46f1-9c59-7c4d7c0f3188'},
  'auth_init': True,
  'workspace_id': 5,
  'flavor': 'exec_orch_oneshot',
  'reap_threshold_secs': 900,
  'exec_type': 'job',
  'status': 'started',
  'input_data': {},
  'killed': False,
  'created_at': '2023-05-16T14:23:56.062627+00:00',
  'updated_at': '2023-05-16T14:24:06.556855+00:00',
  'last_runs': [{'run_id': 'da1ffa75-e3c4-40bc-9bac-001f430c61d2',
    'status': 'success',
    'created_at': '2023-05-16T14:24:02.364998+00:00'}]
    }
]

Kill Task

Kills the task so it will not generate a new Task Run. Note that a Task set to Run Scheduled will generate a new Task Run each time the schedule parameters are met until the Task is killed. A Task set to Run Once will generate only one Task Run, so does not need to be killed.

  • REQUEST PATH
    • POST /v1/api/task/kill.
  • PARAMETERS
    • id (String Required): The id of the task.
  • RETURNS (partial list)
    • name (String|None): The name of the task.
    • id (String): The id of the task in UUID format.
    • image (String): The Docker image used to run the task.
    • image_tag (String): The Docker tag for the image used to run the task.
    • bind_secrets (List[String]): The service secrets used to run the task.
    • extra_env_vars (Dict): The additional variables used to run the task.
    • auth_init (Bool Default: True): Whether the authorization to run this task is automatically enabled. This allows the task to use Wallaroo resources.
    • status (String): The status of the task. Statuses are: pending, started, and failed.
    • workspace_id: The workspace the task is connected to.
    • killed: Whether the task has been issued the kill request.
    • created_at (String): The timestamp of when the orchestration was created.
    • updated_at (String): The timestamp of when the orchestration was updated.
    • last_runs (List[runs]): List of previous runs that display the run_id, status, and created_at.
# retrieve the authorization token
headers = wl.auth.auth_header()

url=f"{APIURL}/v1/api/task/kill"

data = {
    "id": scheduled_task_id
}

response=requests.post(url, headers=headers, json=data).json()
display(response)

{'name': None,
 'id': 'd81c6e65-3b1f-42d7-8d2f-3cfc0eb51599',
 'image': 'proxy.replicated.com/proxy/wallaroo/ghcr.io/wallaroolabs/arbex-orch-deploy',
 'image_tag': 'v2023.2.0-main-3228',
 'bind_secrets': ['minio'],
 'extra_env_vars': {'MINIO_URL': 'http://minio.wallaroo.svc.cluster.local:9000',
  'ORCH_OWNER_ID': '3aac9f67-3050-4502-915f-fc2f871ee350',
  'ORCH_SHA': 'd3b93c9f280734106376e684792aa8b4285d527092fe87d89c74ec804f8e169e',
  'TASK_ID': 'd81c6e65-3b1f-42d7-8d2f-3cfc0eb51599'},
 'auth_init': True,
 'workspace_id': 5,
 'schedule': '*/1 * * * *',
 'reap_threshold_secs': 900,
 'status': 'pending_kill',
 'input_data': {},
 'killed': False,
 'created_at': '2023-05-16T16:01:15.947982+00:00',
 'updated_at': '2023-05-16T16:01:16.456502+00:00',
 'last_runs': [{'run_id': '3ff90356-b7b3-4909-b5c0-963acd19f3f5',
   'status': 'success',
   'created_at': '2023-05-16T16:02:02.159514+00:00'}]}

Task Runs

Task Runs are generated from a Task. If the Task is Run Once, then only one Task Run is generated. If the Task is a Run Scheduled task, then a new Task Run will be created each time the schedule parameters are met, with each Task Run having its own results and logs.

Task Last Runs History

The history of a task is retrieved through the following request; each execution of the task is known as a task run. Task runs are returned in reverse chronological order by updated_at.

  • REQUEST
    • POST /v1/api/task/list_task_runs
  • PARAMETERS
    • task_id: The id of the task in UUID format.
    • status: Filters the task history by the status. If all, returns all statuses. Status values are:
      • running: The task has started.
      • failure: The task failed.
      • success: The task completed.
    • limit: The number of task runs to display.
  • RETURNS
    • List[task runs]: A list of task runs, each with the task id, run_id, status, created_at, and updated_at.
# retrieve the authorization token
headers = wl.auth.auth_header()

url=f"{APIURL}/v1/api/task/list_task_runs"

data = {
    "task_id": task_id
}

response=requests.post(url, headers=headers, json=data).json()
task_run_id = response[0]['run_id']
display(response)
[{'task': 'c868aa44-f7fe-4e3d-b11d-e1e6af3ec150',
  'run_id': '96a7f85f-e30c-40b5-9185-0dee5bd1a15e',
  'status': 'success',
  'created_at': '2023-05-22T21:08:33.112805+00:00',
  'updated_at': '2023-05-22T21:08:33.112805+00:00'}]

Get Task Run Logs

Logs for a task run are retrieved through the following process.

  • REQUEST
    • POST /v1/api/task/get_logs_for_run
  • PARAMETERS
    • id: The id of the task run, in UUID format, associated with the orchestration.
    • lines: The number of log lines to retrieve starting from the end of the log.
  • RETURNS
    • logs: Array of log entries.
# retrieve the authorization token
headers = wl.auth.auth_header()

url=f"{APIURL}/v1/api/task/get_logs_for_run"

data = {
    "id": task_run_id
}

response=requests.post(url, headers=headers, json=data).json()
display(response)
    {'logs': ["2023-05-22T21:09:17.683428502Z stdout F {'pipeline_name': 'apipipelinegsze', 'workspace_name': 'apiorchestrationworkspacegsze'}",
      '2023-05-22T21:09:17.683489102Z stdout F Getting the workspace apiorchestrationworkspacegsze',
      '2023-05-22T21:09:17.683497403Z stdout F Getting the pipeline apipipelinegsze',
      '2023-05-22T21:09:17.683504003Z stdout F Deploying the pipeline.',
      '2023-05-22T21:09:17.683510203Z stdout F Performing sample inference.',
      '2023-05-22T21:09:17.683516203Z stdout F                      time  ... check_failures',
      '2023-05-22T21:09:17.683521903Z stdout F 0 2023-05-22 21:08:37.779  ...              0',
      '2023-05-22T21:09:17.683527803Z stdout F ',
      '2023-05-22T21:09:17.683533603Z stdout F [1 rows x 4 columns]',
      '2023-05-22T21:09:17.683540103Z stdout F Undeploying the pipeline']}
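Each entry above carries a container-runtime prefix (a timestamp, the stream name, and a flag) before the actual message. When only the message text is wanted, that prefix can be stripped; the three-field prefix shape assumed here is based only on the sample output above:

```python
def log_message(entry):
    """Strip the '<timestamp> <stream> <flag> ' prefix from a task run log line."""
    parts = entry.split(" ", 3)
    # fall back to the raw entry if it does not match the expected prefix shape
    return parts[3] if len(parts) == 4 else entry

line = "2023-05-22T21:09:17.683504003Z stdout F Deploying the pipeline."
message = log_message(line)
```

Applied over response['logs'], this yields just the orchestration's own output.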