Wallaroo ML Workload Orchestration API Tutorial

A tutorial on using the ML Workload Orchestration with the Wallaroo MLOps API

This can be downloaded as part of the Wallaroo Tutorials repository.

Wallaroo Connection and ML Workload Orchestration API Tutorial

This tutorial provides a quick set of methods and examples regarding Wallaroo Connections and Wallaroo ML Workload Orchestration. For full details, see the Wallaroo Documentation site.

Wallaroo provides Data Connections and ML Workload Orchestrations so organizations can create and manage automated tasks that run either on demand or on a regular schedule.

Definitions

  • Orchestration: A set of instructions written as a Python script with a requirements file. Orchestrations are uploaded to the Wallaroo instance as a .zip file.
  • Task: An implementation of an orchestration. Tasks are run either once when requested, on a repeating schedule, or as a service.
  • Connection: Connection information to a data source, defined by MLOps engineers and used by other Wallaroo users. Usually paired with orchestrations.

Tutorial Goals

The tutorial will demonstrate the following:

  1. Create a workspace and pipeline with a sample model.
  2. Upload a Wallaroo ML Workload Orchestration through the Wallaroo MLOps API.
  3. List available orchestrations through the Wallaroo MLOps API.
  4. Run the orchestration once as a Run Once Task through the Wallaroo MLOps API and verify that the information was saved in the pipeline logs.

Prerequisites

  • An installed Wallaroo instance.
  • The following Python libraries installed. These are included by default in a Wallaroo instance’s JupyterHub service.
    • os
    • wallaroo: The Wallaroo SDK. Included with the Wallaroo JupyterHub service by default.
    • pandas: Pandas, mainly used for Pandas DataFrame
    • pyarrow: PyArrow for Apache Arrow support

Initial Steps

For this tutorial, we’ll create a workspace, upload our sample model, and deploy a pipeline. We’ll perform some quick sample inferences to verify that everything is working.

Load Libraries

Here we’ll import the various libraries we’ll use for the tutorial.

import wallaroo
from wallaroo.object import EntityNotFoundError, RequiredAttributeMissing

# to display dataframe tables
from IPython.display import display
# used to display dataframe information without truncating
import pandas as pd
pd.set_option('display.max_colwidth', None)
import pyarrow as pa

import time
import json

import requests

Connect to the Wallaroo Instance

The first step is to connect to Wallaroo through the Wallaroo client. The Python library is included in the Wallaroo install and available through the Jupyter Hub interface provided with your Wallaroo environment.

This is accomplished using the wallaroo.Client() command, which provides a URL to grant the SDK permission to your specific Wallaroo environment. When displayed, enter the URL into a browser and confirm permissions. Store the connection into a variable that can be referenced later.

If logging into the Wallaroo instance through the internal JupyterHub service, use wl = wallaroo.Client(). If logging in externally, update the wallarooPrefix and wallarooSuffix variables with the proper DNS information. For more information on Wallaroo Client settings, see the Client Connection guide.

# Login through local Wallaroo instance

wl = wallaroo.Client()
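If logging in from outside the Wallaroo cluster, the pattern below is a hedged sketch based on other Wallaroo tutorials; the wallarooPrefix and wallarooSuffix values and the exact Client parameters (api_endpoint, auth_endpoint, auth_type) depend on your Wallaroo version and DNS configuration, so treat them as assumptions to verify against your own instance.

# Example external (SSO) login sketch - not run in this tutorial.
# Replace the prefix/suffix placeholders with your instance's DNS values.

wallarooPrefix = "YOUR PREFIX."   # include the trailing period if your DNS prefix uses one
wallarooSuffix = "YOUR SUFFIX"

wl = wallaroo.Client(api_endpoint=f"https://{wallarooPrefix}api.{wallarooSuffix}",
                     auth_endpoint=f"https://{wallarooPrefix}keycloak.{wallarooSuffix}",
                     auth_type="sso")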

API URL and Tutorial Variables

The variable wl.api_endpoint is used to specify the connection to the Wallaroo instance’s MLOps API URL, and is composed of the Wallaroo DNS prefix and suffix. For full details, see the Wallaroo API Connection Guide.

The other variables for the workspace, model, and pipeline names are set so this tutorial can be executed with custom user settings.

# Setting variables for later steps

display(wl.api_endpoint)

workspace_name = 'apiorchestrationworkspace'
pipeline_name = 'apipipeline'
model_name = 'apiorchestrationmodel'
model_file_name = './models/rf_model.onnx'
'https://doc-test.api.wallarooexample.ai'

Create the Workspace and Pipeline

We’ll now create our workspace and pipeline for the tutorial. If this tutorial has been run previously, then this will retrieve the existing ones with the assumption they’re for use with this tutorial.

We’ll set the retrieved workspace as the current workspace in the SDK, so all commands will default to that workspace.

workspace = wl.get_workspace(name=workspace_name, create_if_not_exist=True)
wl.set_current_workspace(workspace)

workspace_id = workspace.id()

pipeline = wl.build_pipeline(pipeline_name)

Upload the Model and Deploy Pipeline

We’ll upload our model into our sample workspace, then add it as a pipeline step before deploying the pipeline so it’s ready to accept inference requests.

# Upload the model

housing_model_control = (wl.upload_model(model_name, 
                                         model_file_name, 
                                         framework=wallaroo.framework.Framework.ONNX)
                                         .configure(tensor_fields=["tensor"])
                        )

# Add the model as a pipeline step

pipeline.add_model_step(housing_model_control)
name: apipipeline
created: 2024-04-17 15:24:07.339055+00:00
last_updated: 2024-04-17 16:00:10.312051+00:00
deployed: False
arch: x86
accel: none
tags:
versions: fb5e3da5-bbe5-49ef-845f-118f9ea06a87, a5dff8e6-168a-48be-a2ef-a2daf7d440de, f5d3e376-0cdc-43ca-a77b-11d92ed71033
steps: apiorchestrationmodel
published: False
#deploy the pipeline
pipeline.deploy()
name: apipipeline
created: 2024-04-17 15:24:07.339055+00:00
last_updated: 2024-04-17 16:00:16.674235+00:00
deployed: True
arch: x86
accel: none
tags:
versions: 37c27029-bf45-4f3b-b14b-65e722752ad5, fb5e3da5-bbe5-49ef-845f-118f9ea06a87, a5dff8e6-168a-48be-a2ef-a2daf7d440de, f5d3e376-0cdc-43ca-a77b-11d92ed71033
steps: apiorchestrationmodel
published: False

Wallaroo ML Workload Orchestration Example

With the pipeline deployed, we will now generate our ML Workload Orchestration. See the Wallaroo ML Workload Orchestrations guide for full details.

Orchestrations are uploaded to the Wallaroo instance as a ZIP file with the following requirements:

  • User Code (Required): Python scripts as .py files. If main.py exists, then that will be used as the task entrypoint. Otherwise, the first main.py found in any subdirectory will be used as the entrypoint.
  • Python Library Requirements (Optional): A requirements.txt file in the requirements file format, listing any dependencies to be provided in the task environment. The Wallaroo SDK will already be present and should not be included in the requirements.txt. Multiple requirements.txt files are not allowed.
  • Other artifacts (Optional): Other artifacts such as files, data, or code to support the orchestration.
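As an illustration of the packaging step, the sketch below zips a local directory containing main.py and requirements.txt into an orchestration archive using the Python standard library; the ./orchestration directory name is a placeholder for illustration, not a path from this tutorial.

# Packaging sketch: build the orchestration ZIP from a local directory.
# The directory name "./orchestration" is a placeholder for wherever main.py
# and requirements.txt are stored.
import shutil

# Creates ./api_inference_orchestration.zip from the contents of ./orchestration
shutil.make_archive("api_inference_orchestration", "zip", root_dir="./orchestration")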

For our example, our orchestration will:

  1. Retrieve the workspace and pipeline passed in through the task arguments workspace_name and pipeline_name.
  2. Deploy the pipeline, perform a sample inference, undeploy the pipeline, and record the results in the pipeline logs.

The sample script main.py and an empty requirements.txt file are packaged into the orchestration as ./api_inference_orchestration.zip. We’ll display the steps in uploading the orchestration to the Wallaroo instance.

Note that the orchestration assumes the pipeline is already created.
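The packaged script itself is not reproduced in this tutorial. Based on the task arguments and the task run logs shown later, a minimal sketch of what such a script might look like follows; the wl.task_args() call for reading the task's json arguments, the pipeline lookup via list_pipelines(), and the fallback names are assumptions for illustration, not the packaged original.

# main.py - a minimal sketch of the orchestration script (illustrative only)
import pandas as pd
import wallaroo

# Inside a task, the SDK client authenticates with the Wallaroo instance directly.
wl = wallaroo.Client()

# Read the arguments passed in the task's json field (wl.task_args() is assumed here).
arguments = wl.task_args()
workspace_name = arguments.get("workspace_name", "apiorchestrationworkspace")
pipeline_name = arguments.get("pipeline_name", "apipipeline")
print(arguments)

print(f"Getting the workspace {workspace_name}")
workspace = wl.get_workspace(name=workspace_name)
wl.set_current_workspace(workspace)

print(f"Getting the pipeline {pipeline_name}")
pipeline = [p for p in wl.list_pipelines() if p.name() == pipeline_name][0]

print("Deploying the pipeline.")
pipeline.deploy()

print("Performing sample inference.")
sample = pd.DataFrame({"tensor": [[4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0,
                                   2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0,
                                   12.0, 0.0, 0.0]]})
results = pipeline.infer(sample)
print(results)

print("Undeploying the pipeline")
pipeline.undeploy()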

API Upload the Orchestration

Orchestrations are uploaded via POST as application/octet-stream content with the following MLOps API route:

  • REQUEST
    • POST /v1/api/orchestration/upload
    • Content-Type: multipart/form-data
  • PARAMETERS
    • file: The orchestration ZIP file, uploaded with Content-Type application/octet-stream.
    • metadata: The orchestration metadata, uploaded with Content-Type application/json, specifying:
      • workspace_id: The numerical id of the workspace to upload the orchestration to.
      • name: The name of the orchestration.

Once uploaded, the orchestration is packaged and any listed requirements are downloaded and installed.

# retrieve the authorization token
headers = wl.auth.auth_header()

url = f"{wl.api_endpoint}/v1/api/orchestration/upload"

fp = open("./api_inference_orchestration.zip", "rb")

metadata = json.dumps({
        "workspace_id": workspace_id,
        "name": "apiorchestrationsample_001"
    })

response = requests.post(
    url,
    headers=headers,
    files=[
        ("file", 
            ("api_inference_orchestration.zip", fp, "application/octet-stream")
        ),
        ("metadata", 
            ("metadata", metadata, "application/json")
        )
    ],
).json()

display(response)
orchestration_id = response['id']
{'id': 'a74bb333-e4bf-495d-b500-fc5d1146ba2b'}

API List Orchestrations

A list of orchestrations is retrieved via the following POST MLOps API route:

  • REQUEST
    • POST /v1/api/orchestration/list
  • PARAMETERS
    • workspace_id: The numerical identifier of the workspace associated with the orchestration.
# retrieve the authorization token
headers = wl.auth.auth_header()

url = f"{wl.api_endpoint}/v1/api/orchestration/list"

data = {
    'workspace_id': workspace_id
}

response=requests.post(url, headers=headers, json=data)
display(response.json())
[{'id': 'a74bb333-e4bf-495d-b500-fc5d1146ba2b',
  'workspace_id': 33,
  'sha': 'd3b93c9f280734106376e684792aa8b4285d527092fe87d89c74ec804f8e169e',
  'name': 'apiorchestrationsample_001',
  'file_name': 'api_inference_orchestration.zip',
  'task_id': '82008f80-8430-4e45-a446-e1c8ecf266f1',
  'owner_id': '65124b18-8382-49af-b3c8-ada3b9df3330',
  'status': 'pending_packaging',
  'created_at': '2024-04-17T16:00:33.935691+00:00',
  'updated_at': '2024-04-17T16:00:33.935691+00:00',
  'image_path': 'proxy.replicated.com/proxy/wallaroo/ghcr.io/wallaroolabs/arbex-orch-deploy:v2024.1.0-main-4921',
  'created_on_version': '2024.1.0'}]

API Get Orchestration

A single orchestration is retrieved by its id via the following POST MLOps API route:

  • REQUEST
    • POST /v1/api/orchestration/get_by_id
  • PARAMETERS
    • id: The UUID of the orchestration being retrieved.
  • RETURNS
    • id: The ID of the orchestration in UUID format.
    • workspace_id: Numerical value of the workspace the orchestration was uploaded to.
    • sha: The SHA hash value of the orchestration.
    • file_name: The file name the orchestration was uploaded as.
    • task_id: The task id in UUID format for unpacking and preparing the orchestration.
    • owner_id: The Keycloak ID of the user that uploaded the orchestration.
    • status: The status of the orchestration. Status values are:
      • pending_packaging: The orchestration is queued for packaging.
      • packaging: Preparing the orchestration to be used as a task.
      • ready: The orchestration is ready to be deployed as a task.
# retrieve the authorization token
headers = wl.auth.auth_header()

url = f"{wl.api_endpoint}/v1/api/orchestration/get_by_id"

data = {
    'id': orchestration_id
}

# loop until status is ready
status = None

while status != 'ready':
    response=requests.post(url, headers=headers, json=data).json()
    display(response)
    status = response['status']
    time.sleep(10)

orchestration_sha = response['sha']
{'id': 'a74bb333-e4bf-495d-b500-fc5d1146ba2b',
 'name': 'apiorchestrationsample_001',
 'created_at': '2024-04-17T16:00:33.935691+00:00',
 'workspace_id': 33,
 'sha': 'd3b93c9f280734106376e684792aa8b4285d527092fe87d89c74ec804f8e169e',
 'file_name': 'api_inference_orchestration.zip',
 'task_id': '82008f80-8430-4e45-a446-e1c8ecf266f1',
 'owner_id': '65124b18-8382-49af-b3c8-ada3b9df3330',
 'status': 'pending_packaging'}

{'id': 'a74bb333-e4bf-495d-b500-fc5d1146ba2b',
 'name': 'apiorchestrationsample_001',
 'created_at': '2024-04-17T16:00:33.935691+00:00',
 'workspace_id': 33,
 'sha': 'd3b93c9f280734106376e684792aa8b4285d527092fe87d89c74ec804f8e169e',
 'file_name': 'api_inference_orchestration.zip',
 'task_id': '82008f80-8430-4e45-a446-e1c8ecf266f1',
 'owner_id': '65124b18-8382-49af-b3c8-ada3b9df3330',
 'status': 'packaging'}

{'id': 'a74bb333-e4bf-495d-b500-fc5d1146ba2b',
 'name': 'apiorchestrationsample_001',
 'created_at': '2024-04-17T16:00:33.935691+00:00',
 'workspace_id': 33,
 'sha': 'd3b93c9f280734106376e684792aa8b4285d527092fe87d89c74ec804f8e169e',
 'file_name': 'api_inference_orchestration.zip',
 'task_id': '82008f80-8430-4e45-a446-e1c8ecf266f1',
 'owner_id': '65124b18-8382-49af-b3c8-ada3b9df3330',
 'status': 'ready'}

Task Management Tutorial

Once an orchestration has the status ready, it can be run as a task. Tasks can be run once, on a schedule, or as a service; this tutorial demonstrates the first two.

  • Once: orchestration.run_once(name, json_args, timeout) in the SDK. The task runs once and exits.
  • Scheduled: orchestration.run_scheduled(name, schedule, timeout, json_args) in the SDK. The user provides a schedule; the task runs and exits each time the schedule dictates.

Run Task Once via API

We’ll start by generating a Run Once task from our orchestration. Orchestrations are started as a run once task with the following request:

  • REQUEST
    • POST /v1/api/task/run_once
  • PARAMETERS
    • name (String Required): The name to assign to the task.
    • workspace_id (Integer Required): The numerical identifier of the workspace associated with the orchestration.
    • orch_id (String Required): The orchestration ID represented by a UUID.
    • json (Dict Required): The arguments to pass to the task. If there are no arguments, then an empty set {} is passed.

# retrieve the authorization token
headers = wl.auth.auth_header()

data = {
    "name": "api run once task",
    "workspace_id": workspace_id,
    "orch_id": orchestration_id,
    "json": {
        "workspace_name": workspace_name,
        "pipeline_name": pipeline_name
    }
}

import datetime
task_start = datetime.datetime.now()

url=f"{wl.api_endpoint}/v1/api/task/run_once"

response=requests.post(url, headers=headers, json=data).json()
display(response)
task_id = response['id']
{'id': 'f2c1e08a-1c33-4d24-87d3-c64cdd5610eb'}

Task Status via API

The status of a task is retrieved through the following Wallaroo MLOps API request:

  • REQUEST
    • POST /v1/api/task/get_by_id
  • PARAMETERS
    • id: The task id in UUID format.

For this example, we will retrieve the status of the previously created task, looping until it reaches the status started.

# retrieve the authorization token
headers = wl.auth.auth_header()

url=f"{wl.api_endpoint}/v1/api/task/get_by_id"

data = {
    "id": task_id
}

status = None

while status != 'started':
    response=requests.post(url, headers=headers, json=data).json()
    display(response)
    status = response['status']
    time.sleep(10)
{'name': 'api run once task',
 'id': 'c868aa44-f7fe-4e3d-b11d-e1e6af3ec150',
 'image': 'proxy.replicated.com/proxy/wallaroo/ghcr.io/wallaroolabs/arbex-orch-deploy',
 'image_tag': 'v2023.2.0-main-3271',
 'bind_secrets': ['minio'],
 'extra_env_vars': {'MINIO_URL': 'http://minio.wallaroo.svc.cluster.local:9000',
  'ORCH_OWNER_ID': 'a6857628-b0aa-451e-a0a0-bbc1d6eea6e0',
  'ORCH_SHA': 'd3b93c9f280734106376e684792aa8b4285d527092fe87d89c74ec804f8e169e',
  'TASK_DEBUG': 'false',
  'TASK_ID': 'c868aa44-f7fe-4e3d-b11d-e1e6af3ec150'},
 'auth_init': True,
 'workspace_id': 8,
 'flavor': 'exec_orch_oneshot',
 'reap_threshold_secs': 900,
 'exec_type': 'job',
 'status': 'pending',
 'input_data': {'pipeline_name': 'apipipelinegsze',
  'workspace_name': 'apiorchestrationworkspacegsze'},
 'killed': False,
 'created_at': '2023-05-22T21:08:31.099447+00:00',
 'updated_at': '2023-05-22T21:08:31.105312+00:00',
 'last_runs': []}

{'name': 'api run once task',
 'id': 'c868aa44-f7fe-4e3d-b11d-e1e6af3ec150',
 'image': 'proxy.replicated.com/proxy/wallaroo/ghcr.io/wallaroolabs/arbex-orch-deploy',
 'image_tag': 'v2023.2.0-main-3271',
 'bind_secrets': ['minio'],
 'extra_env_vars': {'MINIO_URL': 'http://minio.wallaroo.svc.cluster.local:9000',
  'ORCH_OWNER_ID': 'a6857628-b0aa-451e-a0a0-bbc1d6eea6e0',
  'ORCH_SHA': 'd3b93c9f280734106376e684792aa8b4285d527092fe87d89c74ec804f8e169e',
  'TASK_DEBUG': 'false',
  'TASK_ID': 'c868aa44-f7fe-4e3d-b11d-e1e6af3ec150'},
 'auth_init': True,
 'workspace_id': 8,
 'flavor': 'exec_orch_oneshot',
 'reap_threshold_secs': 900,
 'exec_type': 'job',
 'status': 'started',
 'input_data': {'pipeline_name': 'apipipelinegsze',
  'workspace_name': 'apiorchestrationworkspacegsze'},
 'killed': False,
 'created_at': '2023-05-22T21:08:31.099447+00:00',
 'updated_at': '2023-05-22T21:08:36.585775+00:00',
 'last_runs': [{'run_id': '96a7f85f-e30c-40b5-9185-0dee5bd1a15e',
   'status': 'running',
   'created_at': '2023-05-22T21:08:33.112805+00:00',
   'updated_at': '2023-05-22T21:08:33.112805+00:00'}]}

Task Results

We can view the inferences from our logs and verify that new entries were added from our task. In our case, we’ll assume the task, once started, takes about a minute to run (deploy the pipeline, run the inference, undeploy the pipeline). We’ll wait briefly, then display the logs during the time period the task was running.

time.sleep(30)

task_end = datetime.datetime.now()
display(task_end)

pipeline.logs(start_datetime = task_start, end_datetime = task_end)
datetime.datetime(2024, 4, 17, 10, 3, 9, 493952)
time | in.tensor | out.variable | anomaly.count
2024-04-17 16:02:42.943 | [4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0] | [718013.7] | 0

Get Tasks by Orchestration SHA

Tasks tied to the same orchestration are retrieved through the following request.

  • REQUEST
    • POST /v1/api/task/get_tasks_by_orch_sha
  • PARAMETERS
    • sha: The orchestration’s SHA hash.
  • RETURNS
    • ids: A list of task ids in UUID format.
# retrieve the authorization token
headers = wl.auth.auth_header()

url=f"{wl.api_endpoint}/v1/api/task/get_tasks_by_orch_sha"

data = {
    "sha": orchestration_sha
}

response=requests.post(url, headers=headers, json=data).json()
display(response)
{'ids': ['f2c1e08a-1c33-4d24-87d3-c64cdd5610eb']}

Task Last Runs History

The history of a task (each execution of the task is known as a task run) is retrieved through the following request. Task runs are returned in reverse chronological order by updated_at.

  • REQUEST
    • POST /v1/api/task/list_task_runs
  • PARAMETERS
    • task_id: The id of the task in UUID format.
    • status: Filters the task run history by status. If all, returns all statuses. Status values are:
      • running: The task run has started.
      • failure: The task run failed.
      • success: The task run completed.
    • limit: The number of task runs to display.
  • RETURNS
    • A list of task runs, each with the task id, run_id, status, created_at, and updated_at fields.
# retrieve the authorization token
headers = wl.auth.auth_header()

url=f"{wl.api_endpoint}/v1/api/task/list_task_runs"

data = {
    "task_id": task_id
}

response=requests.post(url, headers=headers, json=data).json()
task_run_id = response[0]['run_id']
display(response)
[{'task': 'f2c1e08a-1c33-4d24-87d3-c64cdd5610eb',
  'run_id': '4ac28b96-005c-4ae4-a288-707c419401aa',
  'status': 'success',
  'created_at': '2024-04-17T16:02:34.45439+00:00',
  'updated_at': '2024-04-17T16:02:34.45439+00:00'}]

Get Task Run Logs

Logs for a task run are retrieved through the following process.

  • REQUEST
    • POST /v1/api/task/get_logs_for_run
  • PARAMETERS
    • id: The task run id in UUID format.
    • lines: The number of log lines to retrieve starting from the end of the log.
  • RETURNS
    • logs: Array of log entries.
# retrieve the authorization token
headers = wl.auth.auth_header()

url=f"{wl.api_endpoint}/v1/api/task/get_logs_for_run"

data = {
    "id": task_run_id
}

response=requests.post(url, headers=headers, json=data).json()
display(response)
{'logs': ["2024-04-17T16:03:19.756251122Z stdout F {'pipeline_name': 'apipipeline', 'workspace_name': 'apiorchestrationworkspace'}",
  '2024-04-17T16:03:19.756440433Z stdout F Getting the workspace apiorchestrationworkspace',
  '2024-04-17T16:03:19.756451353Z stdout F Getting the pipeline apipipeline',
  '2024-04-17T16:03:19.756458473Z stdout F Deploying the pipeline.',
  '2024-04-17T16:03:19.756463673Z stdout F Performing sample inference.',
  '2024-04-17T16:03:19.756473662Z stdout F                      time  ... anomaly.count',
  '2024-04-17T16:03:19.756486613Z stdout F ',
  '2024-04-17T16:03:19.756481193Z stdout F 0 2024-04-17 16:02:42.943  ...             0',
  '2024-04-17T16:03:19.756491122Z stdout F [1 rows x 4 columns]',
  '2024-04-17T16:03:19.756495402Z stdout F Undeploying the pipeline']}

Run Task Scheduled via API

The other method of using tasks is as a scheduled run, equivalent to the SDK Orchestration run_scheduled(name, schedule, timeout, json_args) method. This sets up a task to run on a regular schedule as defined by the schedule parameter in the cron service format. For example:

schedule={'42 * * * *'}

Runs on the 42nd minute of every hour.

The following schedule runs every day at 12 noon from February 1 to February 15, 2024, and then ends.

schedule={'0 0 12 1-15 2 2024'}
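As a quick reference for reading these expressions, the five standard cron fields are annotated in the sketch below; the second example above appends a year as an additional trailing field.

# The five standard cron fields, left to right:
#   minute (0-59), hour (0-23), day of month (1-31), month (1-12), day of week (0-6, Sunday = 0)
#
# "*/5 * * * *"  -> on every minute divisible by 5, i.e. every 5 minutes.
# This is the schedule used for the scheduled task later in this tutorial.
schedule = "*/5 * * * *"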

The Run Scheduled Task request is available at the following address:

/v1/api/task/run_scheduled

And takes the following parameters.

  • name (String Required): The name to assign to the task.
  • orch_id (String Required): The UUID orchestration ID to create the task from.
  • workspace_id (Integer Required): The numerical identifier for the workspace.
  • schedule (String Required): The schedule as a single string in cron format.
  • timeout (Integer Optional): The timeout to complete the task in seconds.
  • json (Dict Required): The arguments to pass to the task.

For our example, we will create a scheduled task to run every 5 minutes, display the inference results, then use the task kill request to keep the task from running any further.

It is recommended that tasks whose orchestrations deploy or undeploy pipelines be scheduled no less than 5 minutes apart to prevent colliding with other tasks that use the same pipeline.

# retrieve the authorization token
headers = wl.auth.auth_header()

data = {
    "name": "scheduled api task",
    "workspace_id": workspace_id,
    "orch_id": orchestration_id,
    "schedule": "*/5 * * * *",
    "json": {
        "workspace_name": workspace_name,
        "pipeline_name": pipeline_name
    }
}

import datetime
task_start = datetime.datetime.now()

url=f"{wl.api_endpoint}/v1/api/task/run_scheduled"

response=requests.post(url, headers=headers, json=data).json()
display(response)
scheduled_task_id = response['id']
{'id': '168f6d90-2861-4896-85b4-53ffa26f09fc'}
# loop until the task is started

# retrieve the authorization token
headers = wl.auth.auth_header()

url=f"{wl.api_endpoint}/v1/api/task/get_by_id"

data = {
    "id": scheduled_task_id
}

status = None

while status != 'started':
    response=requests.post(url, headers=headers, json=data).json()
    display(response)
    status = response['status']
    time.sleep(10)
{'id': '168f6d90-2861-4896-85b4-53ffa26f09fc',
 'name': 'scheduled api task',
 'run_type': 'CronJob',
 'image': 'proxy.replicated.com/proxy/wallaroo/ghcr.io/wallaroolabs/arbex-orch-deploy',
 'image_tag': 'v2024.1.0-main-4921',
 'arch': 'x86',
 'bind_secrets': ['minio'],
 'extra_env_vars': {'MINIO_URL': 'http://minio.wallaroo.svc.cluster.local:9000',
  'ORCH_ID': 'a74bb333-e4bf-495d-b500-fc5d1146ba2b',
  'ORCH_OWNER_ID': '65124b18-8382-49af-b3c8-ada3b9df3330',
  'ORCH_SHA': 'd3b93c9f280734106376e684792aa8b4285d527092fe87d89c74ec804f8e169e',
  'TASK_DEBUG': 'false',
  'TASK_ID': '168f6d90-2861-4896-85b4-53ffa26f09fc'},
 'auth_init': True,
 'workspace_id': 33,
 'schedule': '*/5 * * * *',
 'reap_threshold_secs': 900,
 'ns_prefix': None,
 'status': 'started',
 'input_data': {'pipeline_name': 'apipipeline',
  'workspace_name': 'apiorchestrationworkspace'},
 'killed': False,
 'created_at': '2024-04-17T16:03:50.202818+00:00',
 'updated_at': '2024-04-17T16:03:52.006625+00:00',
 'last_runs': []}
# retrieve the authorization token
headers = wl.auth.auth_header()

url=f"{wl.api_endpoint}/v1/api/task/list_task_runs"

data = {
    "task_id": scheduled_task_id
}

# loop until we have at least one task run
length = 0
while length == 0:
    response=requests.post(url, headers=headers, json=data).json()
    display(response)
    length = len(response)
    time.sleep(60)
task_run_id = response[0]['run_id']
[{'task': '168f6d90-2861-4896-85b4-53ffa26f09fc',
  'run_id': '085070a1-9ece-4f85-87bf-17675de64655',
  'status': 'running',
  'created_at': '2024-04-17T16:05:01.670602+00:00',
  'updated_at': '2024-04-17T16:05:01.670602+00:00'}]
# retrieve the authorization token
headers = wl.auth.auth_header()

url=f"{wl.api_endpoint}/v1/api/task/get_logs_for_run"

data = {
    "id": task_run_id
}

# retrieve the task run logs
response=requests.post(url, headers=headers, json=data).json()
display(response)
{'logs': ["2024-04-17T16:06:06.469755892Z stdout F {'pipeline_name': 'apipipeline', 'workspace_name': 'apiorchestrationworkspace'}",
  '2024-04-17T16:06:06.469876442Z stdout F Getting the workspace apiorchestrationworkspace',
  '2024-04-17T16:06:06.469886242Z stdout F Getting the pipeline apipipeline',
  '2024-04-17T16:06:06.469890692Z stdout F Deploying the pipeline.',
  '2024-04-17T16:06:06.469893762Z stdout F Performing sample inference.',
  '2024-04-17T16:06:06.469896982Z stdout F                      time  ... anomaly.count',
  '2024-04-17T16:06:06.469903512Z stdout F ',
  '2024-04-17T16:06:06.469906612Z stdout F [1 rows x 4 columns]',
  '2024-04-17T16:06:06.469900582Z stdout F 0 2024-04-17 16:05:28.132  ...             0',
  '2024-04-17T16:06:06.469910382Z stdout F Undeploying the pipeline']}

Cleanup

With the tutorial complete, we can undeploy the pipeline and return the resources back to the Wallaroo instance.

Kill Task via API

Started tasks are killed through the following request:

/v1/api/task/kill

And takes the following parameters.

  • id (String Required): The UUID of the task to kill.
# retrieve the authorization token
headers = wl.auth.auth_header()

url=f"{wl.api_endpoint}/v1/api/task/kill"

data = {
    "id": scheduled_task_id
}

response=requests.post(url, headers=headers, json=data).json()
display(response)
{'id': '168f6d90-2861-4896-85b4-53ffa26f09fc',
 'name': 'scheduled api task',
 'run_type': 'CronJob',
 'image': 'proxy.replicated.com/proxy/wallaroo/ghcr.io/wallaroolabs/arbex-orch-deploy',
 'image_tag': 'v2024.1.0-main-4921',
 'arch': 'x86',
 'bind_secrets': ['minio'],
 'extra_env_vars': {'MINIO_URL': 'http://minio.wallaroo.svc.cluster.local:9000',
  'ORCH_ID': 'a74bb333-e4bf-495d-b500-fc5d1146ba2b',
  'ORCH_OWNER_ID': '65124b18-8382-49af-b3c8-ada3b9df3330',
  'ORCH_SHA': 'd3b93c9f280734106376e684792aa8b4285d527092fe87d89c74ec804f8e169e',
  'TASK_DEBUG': 'false',
  'TASK_ID': '168f6d90-2861-4896-85b4-53ffa26f09fc'},
 'auth_init': True,
 'workspace_id': 33,
 'schedule': '*/5 * * * *',
 'reap_threshold_secs': 900,
 'ns_prefix': None,
 'status': 'pending_kill',
 'input_data': {'pipeline_name': 'apipipeline',
  'workspace_name': 'apiorchestrationworkspace'},
 'killed': False,
 'created_at': '2024-04-17T16:03:50.202818+00:00',
 'updated_at': '2024-04-17T16:03:52.006625+00:00',
 'last_runs': [{'run_id': '085070a1-9ece-4f85-87bf-17675de64655',
   'status': 'success',
   'created_at': '2024-04-17T16:05:01.670602+00:00',
   'updated_at': '2024-04-17T16:05:01.670602+00:00'}]}

Close Resources

With the tutorial complete, we’ll verify the pipeline is closed so the resources are assigned back to the Wallaroo instance.

pipeline.undeploy()
name: apipipeline
created: 2024-04-17 15:24:07.339055+00:00
last_updated: 2024-04-17 16:00:16.674235+00:00
deployed: False
arch: x86
accel: none
tags:
versions: 37c27029-bf45-4f3b-b14b-65e722752ad5, fb5e3da5-bbe5-49ef-845f-118f9ea06a87, a5dff8e6-168a-48be-a2ef-a2daf7d440de, f5d3e376-0cdc-43ca-a77b-11d92ed71033
steps: apiorchestrationmodel
published: False