1 - Wallaroo ML Workload Orchestration API Tutorial

A tutorial on using the ML Workload Orchestration with the Wallaroo MLOps API

This can be downloaded as part of the Wallaroo Tutorials repository.

Wallaroo Connection and ML Workload Orchestration API Tutorial

This tutorial provides a quick set of methods and examples regarding Wallaroo Connections and Wallaroo ML Workload Orchestration. For full details, see the Wallaroo Documentation site.

Wallaroo provides Data Connections and ML Workload Orchestrations so organizations can create and manage automated tasks that run either on demand or on a regular schedule.

Definitions

  • Orchestration: A set of instructions written as a python script with a requirements library. Orchestrations are uploaded to the Wallaroo instance as a .zip file.
  • Task: An implementation of an orchestration. Tasks are run either once when requested, on a repeating schedule, or as a service.
  • Connection: Definitions set by MLOps engineers that are used by other Wallaroo users for connection information to a data source. Usually paired with orchestrations.

Tutorial Goals

The tutorial will demonstrate the following:

  1. Create a workspace and pipeline with a sample model.
  2. Upload Wallaroo ML Workload Orchestration through the Wallaroo MLOps API.
  3. List available orchestrations through the Wallaroo MLOps API.
  4. Run the orchestration once as a Run Once Task through the Wallaroo MLOps API and verify that the information was saved to the pipeline logs.

Prerequisites

  • An installed Wallaroo instance.
  • The following Python libraries installed. These are included by default in a Wallaroo instance’s JupyterHub service.
    • os
    • wallaroo: The Wallaroo SDK. Included with the Wallaroo JupyterHub service by default.
    • pandas: Pandas, mainly used for Pandas DataFrame
    • pyarrow: PyArrow for Apache Arrow support

Initial Steps

For this tutorial, we’ll create a workspace, upload our sample model and deploy a pipeline. We’ll perform some quick sample inferences to verify that everything is working.

Load Libraries

Here we’ll import the various libraries we’ll use for the tutorial.

import wallaroo
from wallaroo.object import EntityNotFoundError, RequiredAttributeMissing

# to display dataframe tables
from IPython.display import display
# used to display dataframe information without truncating
import pandas as pd
pd.set_option('display.max_colwidth', None)
import pyarrow as pa

import time

import requests

# Used to create unique workspace and pipeline names
import string
import random

# make a random 4 character suffix
suffix= ''.join(random.choice(string.ascii_lowercase) for i in range(4))
display(suffix)
'gsze'

Connect to the Wallaroo Instance

The first step is to connect to Wallaroo through the Wallaroo client. The Python library is included in the Wallaroo install and available through the Jupyter Hub interface provided with your Wallaroo environment.

This is accomplished using the wallaroo.Client() command, which provides a URL to grant the SDK permission to your specific Wallaroo environment. When displayed, enter the URL into a browser and confirm permissions. Store the connection into a variable that can be referenced later.

If logging into the Wallaroo instance through the internal JupyterHub service, use wl = wallaroo.Client(). If logging in externally, update the wallarooPrefix and wallarooSuffix variables with the proper DNS information. For more information on Wallaroo Client settings, see the Client Connection guide.

# Login through local Wallaroo instance

wl = wallaroo.Client()

API URL

The variable APIURL is used to specify the connection to the Wallaroo instance’s MLOps API URL, and is composed of the Wallaroo DNS prefix and suffix. For full details, see the Wallaroo API Connection Guide.

The variables wallarooPrefix and wallarooSuffix will be used to derive the API URL. For example, if the Wallaroo Prefix is doc-test. and the URL is example.com, then the MLOps API URL would be doc-test.api.example.com/v1/api/{request}.

Note the . is part of the prefix. If there is no prefix, then wallarooPrefix = ""

Set the Wallaroo Prefix and Suffix in the code segment below based on your Wallaroo instance.
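As an illustration of the note above, here is how APIURL resolves for hypothetical DNS values, including the no-prefix case:

# hypothetical DNS values for illustration only
wallarooPrefix = "doc-test."
wallarooSuffix = "example.com"
APIURL = f"https://{wallarooPrefix}api.{wallarooSuffix}"  # https://doc-test.api.example.com

# with no prefix, wallarooPrefix is an empty string
wallarooPrefix = ""
APIURL = f"https://{wallarooPrefix}api.{wallarooSuffix}"  # https://api.example.com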

# Setting variables for later steps

wallarooPrefix = "YOUR PREFIX."

wallarooSuffix = "YOUR SUFFIX"

APIURL = f"https://{wallarooPrefix}api.{wallarooSuffix}"

workspace_name = f'apiorchestrationworkspace{suffix}'
pipeline_name = f'apipipeline{suffix}'
model_name = f'apiorchestrationmodel{suffix}'
model_file_name = './models/rf_model.onnx'

Helper Methods

The following helper methods are used to either create or get workspaces, pipelines, and connections.

# helper methods to retrieve workspaces and pipelines

def get_workspace(name):
    workspace = None
    for ws in wl.list_workspaces():
        if ws.name() == name:
            workspace= ws
    if(workspace == None):
        workspace = wl.create_workspace(name)
    return workspace

def get_pipeline(name):
    try:
        pipeline = wl.pipelines_by_name(name)[0]
    except EntityNotFoundError:
        pipeline = wl.build_pipeline(name)
    return pipeline

def get_connection(name, connection_type, connection_arguments):
    try:
        connection = wl.get_connection(name)
    except RequiredAttributeMissing:
        connection =wl.create_connection(name, 
                  connection_type, 
                  connection_arguments)
    return connection

Create the Workspace and Pipeline

We’ll now create our workspace and pipeline for the tutorial. If this tutorial has been run previously, then this will retrieve the existing ones with the assumption they’re for use with this tutorial.

We’ll set the retrieved workspace as the current workspace in the SDK, so all commands will default to that workspace.

workspace = get_workspace(workspace_name)
wl.set_current_workspace(workspace)

workspace_id = workspace.id()

pipeline = get_pipeline(pipeline_name)

Upload the Model and Deploy Pipeline

We’ll upload our model into our sample workspace, then add it as a pipeline step before deploying the pipeline so it’s ready to accept inference requests.

# Upload the model

housing_model_control = (wl.upload_model(model_name, 
                                         model_file_name, 
                                         framework=wallaroo.framework.Framework.ONNX)
                                         .configure(tensor_fields=["tensor"])
                        )

# Add the model as a pipeline step

pipeline.add_model_step(housing_model_control)
name: apipipelinegsze
created: 2023-05-22 20:48:30.700499+00:00
last_updated: 2023-05-22 20:48:30.700499+00:00
deployed: (none)
tags:
versions: 101b252a-623c-4185-a24d-ec00593dda79
steps:
#deploy the pipeline
pipeline.deploy()
Waiting for deployment - this will take up to 45s ......... ok
name: apipipelinegsze
created: 2023-05-22 20:48:30.700499+00:00
last_updated: 2023-05-22 20:48:31.357336+00:00
deployed: True
tags:
versions: aac61b5a-e4f4-4ea3-9347-6482c330b5f5, 101b252a-623c-4185-a24d-ec00593dda79
steps: apiorchestrationmodelgsze

Wallaroo ML Workload Orchestration Example

With the pipeline deployed and our connections set, we will now generate our ML Workload Orchestration. See the Wallaroo ML Workload Orchestrations guide for full details.

Orchestrations are uploaded to the Wallaroo instance as a ZIP file with the following requirements:

  • User Code (Required): Python script as .py files. If main.py exists, then that will be used as the task entrypoint. Otherwise, the first main.py found in any subdirectory will be used as the entrypoint.
  • Python Library Requirements (Optional): A requirements.txt file in the requirements file format, listing any dependencies to be provided in the task environment. The Wallaroo SDK will already be present and should not be included in the requirements.txt. Multiple requirements.txt files are not allowed.
  • Other artifacts: Other artifacts such as files, data, or code to support the orchestration.

For our example, our orchestration will:

  1. Use the inference_results_connection to open an HTTP GET connection to the inference data file and use it in an inference request in the deployed pipeline.
  2. Submit the inference results to the location specified in the external_inference_connection.

This sample script is stored in remote_inference/main.py with an empty requirements.txt file, and packaged into the orchestration as ./remote_inference/remote_inference.zip. We’ll display the steps in uploading the orchestration to the Wallaroo instance.

Note that the orchestration assumes the pipeline is already deployed.
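The packaged script itself isn’t reproduced in this tutorial, but based on the requirements above, the orchestration ZIP contents are expected to follow a layout along these lines:

main.py           # task entrypoint
requirements.txt  # optional; empty for this example

As a rough sketch only, a main.py of this shape might look like the following. The workspace name, pipeline name, and data path below are hard-coded placeholders for illustration; the packaged script instead receives the workspace and pipeline names through the task’s json arguments, and the flow mirrors what the task run logs later in this tutorial show (get the workspace and pipeline, deploy, run a sample inference, undeploy).

import wallaroo
import pandas as pd

# inside a task, the Wallaroo client authenticates from the task environment
wl = wallaroo.Client()

# placeholders - the packaged script reads these from the task's json arguments
workspace_name = "apiorchestrationworkspace"
pipeline_name = "apipipeline"

# retrieve the workspace and pipeline
workspace = [ws for ws in wl.list_workspaces() if ws.name() == workspace_name][0]
wl.set_current_workspace(workspace)
pipeline = wl.pipelines_by_name(pipeline_name)[0]

# deploy the pipeline, perform a sample inference, then undeploy
pipeline.deploy()
dataframe = pd.read_json("./data/xtest-1k.df.json")  # hypothetical inference input
results = pipeline.infer(dataframe)
print(results.head())
pipeline.undeploy()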

API Upload the Orchestration

Orchestrations are uploaded via a POST request with Content-Type multipart/form-data to the MLOps API route:

  • REQUEST
    • POST /v1/api/orchestration/upload
    • Content-Type multipart/form-data
  • PARAMETERS
    • file: The orchestration ZIP file, uploaded with Content-Type application/octet-stream.
    • metadata: Included with Content-Type application/json with:
      • workspace_id: The numerical id of the workspace to upload the orchestration to.
      • name: The name of the orchestration.

Once uploaded, the orchestration will be prepared and any requirements will be downloaded and installed.

# retrieve the authorization token
headers = wl.auth.auth_header()

url = f"{APIURL}/v1/api/orchestration/upload"

fp = open("./api_inference_orchestration.zip", "rb")

metadata = f'{{"workspace_id": {workspace_id},"name": "apiorchestrationsample"}}'

response = requests.post(
    url,
    headers=headers,
    files=[
        ("file", 
            ("api_inference_orchestration.zip", fp, "application/octet-stream")
        ),
        ("metadata", 
            ("metadata", metadata, "application/json")
        )
    ],
).json()

display(response)
orchestration_id = response['id']
{'id': 'b951f7b8-0690-4004-86bf-cc9802359313'}

API List Orchestrations

A list of orchestrations is retrieved via the POST MLOps API route:

  • REQUEST
    • POST /v1/api/orchestration/list
  • PARAMETERS
    • workspace_id: The numerical identifier of the workspace associated with the orchestration.
# retrieve the authorization token
headers = wl.auth.auth_header()

url = f"{APIURL}/v1/api/orchestration/list"

data = {
    'workspace_id': workspace_id
}

response=requests.post(url, headers=headers, json=data)
display(response.json())
[{'id': 'b951f7b8-0690-4004-86bf-cc9802359313',
  'workspace_id': 8,
  'sha': 'd3b93c9f280734106376e684792aa8b4285d527092fe87d89c74ec804f8e169e',
  'name': 'apiorchestrationsample',
  'file_name': 'api_inference_orchestration.zip',
  'task_id': 'c9622cf8-cbe5-4e3c-b64c-dabd6c5b7fef',
  'owner_id': 'a6857628-b0aa-451e-a0a0-bbc1d6eea6e0',
  'status': 'pending_packaging',
  'created_at': '2023-05-22T20:48:41.233482+00:00',
  'updated_at': '2023-05-22T20:48:41.233482+00:00'}]

API Get Orchestration

A single orchestration is retrieved via the POST MLOps API route:

  • REQUEST
    • POST /v1/api/orchestration/get_by_id
  • PARAMETERS
    • id: The UUID of the orchestration being retrieved.
  • RETURNS
    • id: The ID of the orchestration in UUID format.
    • workspace_id: Numerical value of the workspace the orchestration was uploaded to.
    • sha: The SHA hash value of the orchestration.
    • file_name: The file name the orchestration was uploaded as.
    • task_id: The task id in UUID format for unpacking and preparing the orchestration.
    • owner_id: The Keycloak ID of the user that uploaded the orchestration.
    • status: The status of the orchestration. Status values are:
      • packaging: Preparing the orchestration to be used as a task.
      • ready: The orchestration is ready to be deployed as a task.
# retrieve the authorization token
headers = wl.auth.auth_header()

url = f"{APIURL}/v1/api/orchestration/get_by_id"

data = {
    'id': orchestration_id
}

# loop until status is ready
status = None

while status != 'ready':
    response=requests.post(url, headers=headers, json=data).json()
    display(response)
    status = response['status']
    time.sleep(10)

orchestration_sha = response['sha']
{'id': 'b951f7b8-0690-4004-86bf-cc9802359313',
 'workspace_id': 8,
 'sha': 'd3b93c9f280734106376e684792aa8b4285d527092fe87d89c74ec804f8e169e',
 'file_name': 'api_inference_orchestration.zip',
 'task_id': 'c9622cf8-cbe5-4e3c-b64c-dabd6c5b7fef',
 'owner_id': 'a6857628-b0aa-451e-a0a0-bbc1d6eea6e0',
 'status': 'packaging'}

{'id': 'b951f7b8-0690-4004-86bf-cc9802359313',
 'workspace_id': 8,
 'sha': 'd3b93c9f280734106376e684792aa8b4285d527092fe87d89c74ec804f8e169e',
 'file_name': 'api_inference_orchestration.zip',
 'task_id': 'c9622cf8-cbe5-4e3c-b64c-dabd6c5b7fef',
 'owner_id': 'a6857628-b0aa-451e-a0a0-bbc1d6eea6e0',
 'status': 'ready'}

Task Management Tutorial

Once an Orchestration has the status ready, it can be run as a task. Tasks have the following run options.

  • Once: orchestration.run_once(name, json_args, timeout). Task runs once and exits.
  • Scheduled: orchestration.run_scheduled(name, schedule, timeout, json_args). User provides the schedule; the task runs whenever the schedule dictates.
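This tutorial triggers these runs through the MLOps API requests below; for context only, a rough sketch of the SDK calls named above, assuming an orchestration object returned by upload_orchestration and using placeholder argument values:

# Run Once: the task executes a single time and exits
task = orchestration.run_once(name="api run once task", json_args={}, timeout=600)

# Scheduled: the task runs whenever the cron-style schedule dictates
scheduled_task = orchestration.run_scheduled(name="scheduled api task",
                                             schedule="*/5 * * * *",
                                             timeout=600,
                                             json_args={})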

Run Task Once via API

We’ll generate a Run Once Task from our orchestration. Orchestrations are started as a run once task with the following request:

  • REQUEST
    • POST /v1/api/task/run_once
  • PARAMETERS
    • name (String Required): The name to assign to the task.
    • workspace_id (Integer Required): The numerical identifier of the workspace associated with the orchestration.
    • orch_id (String Required): The orchestration ID represented by a UUID.
    • json (Dict Required): The parameters to pass to the task.

This is the API equivalent of the Orchestration run_once method. Any arguments for the orchestration are passed in through the json parameter as a Dict. If there are no arguments, then an empty Dict {} is passed.

# retrieve the authorization token
headers = wl.auth.auth_header()

data = {
    "name": "api run once task",
    "workspace_id": workspace_id,
    "orch_id": orchestration_id,
    "json": {
        "workspace_name": workspace_name,
        "pipeline_name": pipeline_name
    }
}

import datetime
task_start = datetime.datetime.now()

url=f"{APIURL}/v1/api/task/run_once"

response=requests.post(url, headers=headers, json=data).json()
display(response)
task_id = response['id']
{'id': 'c868aa44-f7fe-4e3d-b11d-e1e6af3ec150'}

Task Status via API

The status of a task is retrieved through the Wallaroo MLOps API request:

  • REQUEST
    • POST /v1/api/task/get_by_id
  • PARAMETERS
    • id: The task ID in UUID format.

For this example, the status of the previously created task will be retrieved and polled until it has reached the status started.

# retrieve the authorization token
headers = wl.auth.auth_header()

url=f"{APIURL}/v1/api/task/get_by_id"

data = {
    "id": task_id
}

status = None

while status != 'started':
    response=requests.post(url, headers=headers, json=data).json()
    display(response)
    status = response['status']
    time.sleep(10)
{'name': 'api run once task',
 'id': 'c868aa44-f7fe-4e3d-b11d-e1e6af3ec150',
 'image': 'proxy.replicated.com/proxy/wallaroo/ghcr.io/wallaroolabs/arbex-orch-deploy',
 'image_tag': 'v2023.2.0-main-3271',
 'bind_secrets': ['minio'],
 'extra_env_vars': {'MINIO_URL': 'http://minio.wallaroo.svc.cluster.local:9000',
  'ORCH_OWNER_ID': 'a6857628-b0aa-451e-a0a0-bbc1d6eea6e0',
  'ORCH_SHA': 'd3b93c9f280734106376e684792aa8b4285d527092fe87d89c74ec804f8e169e',
  'TASK_DEBUG': 'false',
  'TASK_ID': 'c868aa44-f7fe-4e3d-b11d-e1e6af3ec150'},
 'auth_init': True,
 'workspace_id': 8,
 'flavor': 'exec_orch_oneshot',
 'reap_threshold_secs': 900,
 'exec_type': 'job',
 'status': 'pending',
 'input_data': {'pipeline_name': 'apipipelinegsze',
  'workspace_name': 'apiorchestrationworkspacegsze'},
 'killed': False,
 'created_at': '2023-05-22T21:08:31.099447+00:00',
 'updated_at': '2023-05-22T21:08:31.105312+00:00',
 'last_runs': []}

{'name': 'api run once task',
 'id': 'c868aa44-f7fe-4e3d-b11d-e1e6af3ec150',
 'image': 'proxy.replicated.com/proxy/wallaroo/ghcr.io/wallaroolabs/arbex-orch-deploy',
 'image_tag': 'v2023.2.0-main-3271',
 'bind_secrets': ['minio'],
 'extra_env_vars': {'MINIO_URL': 'http://minio.wallaroo.svc.cluster.local:9000',
  'ORCH_OWNER_ID': 'a6857628-b0aa-451e-a0a0-bbc1d6eea6e0',
  'ORCH_SHA': 'd3b93c9f280734106376e684792aa8b4285d527092fe87d89c74ec804f8e169e',
  'TASK_DEBUG': 'false',
  'TASK_ID': 'c868aa44-f7fe-4e3d-b11d-e1e6af3ec150'},
 'auth_init': True,
 'workspace_id': 8,
 'flavor': 'exec_orch_oneshot',
 'reap_threshold_secs': 900,
 'exec_type': 'job',
 'status': 'started',
 'input_data': {'pipeline_name': 'apipipelinegsze',
  'workspace_name': 'apiorchestrationworkspacegsze'},
 'killed': False,
 'created_at': '2023-05-22T21:08:31.099447+00:00',
 'updated_at': '2023-05-22T21:08:36.585775+00:00',
 'last_runs': [{'run_id': '96a7f85f-e30c-40b5-9185-0dee5bd1a15e',
   'status': 'running',
   'created_at': '2023-05-22T21:08:33.112805+00:00',
   'updated_at': '2023-05-22T21:08:33.112805+00:00'}]}

Task Results

We can view the inferences from our logs and verify that new entries were added from our task. In our case, we’ll assume the task, once started, takes about a minute to run (deploy the pipeline, run the inference, undeploy the pipeline). We’ll add a short wait, then display the logs covering the time period the task was running.

time.sleep(30)

task_end = datetime.datetime.now()
display(task_end)

pipeline.logs(start_datetime = task_start, end_datetime = task_end)
datetime.datetime(2023, 5, 22, 21, 9, 32, 447321)
time | in.tensor | out.variable | check_failures
0 | 2023-05-22 21:08:37.779 | [4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0] | [718013.7] | 0

Get Tasks by Orchestration SHA

Tasks tied to the same orchestration are retrieved through the following request.

  • REQUEST
    • POST /v1/api/task/get_tasks_by_orch_sha
  • PARAMETERS
    • sha: The orchestration's SHA hash.
  • RETURNS
    • ids (List[String]): A list of task IDs in UUID format.
# retrieve the authorization token
headers = wl.auth.auth_header()

url=f"{APIURL}/v1/api/task/get_tasks_by_orch_sha"

data = {
    "sha": orchestration_sha
}

response=requests.post(url, headers=headers, json=data).json()
display(response)
{'ids': ['2424a9a7-2331-42f6-bd84-90643386b130',
  'c868aa44-f7fe-4e3d-b11d-e1e6af3ec150',
  'a41fe4ae-b8a4-4e1f-a45a-114df64ae2bc']}

Task Last Runs History

The history of a task (each execution of the task is known as a task run) is retrieved through the following request, which returns task runs in reverse chronological order by updated_at.

  • REQUEST
    • POST /v1/api/task/list_task_runs
  • PARAMETERS
    • task_id: The task ID in UUID format.
    • status: Filters the task history by the status. If all, returns all statuses. Status values are:
      • running: The task has started.
      • failure: The task failed.
      • success: The task completed.
    • limit: The number of task runs to display.
  • RETURNS
    • A list of task runs, each with its task ID, run_id, status, created_at, and updated_at.
# retrieve the authorization token
headers = wl.auth.auth_header()

url=f"{APIURL}/v1/api/task/list_task_runs"

data = {
    "task_id": task_id
}

response=requests.post(url, headers=headers, json=data).json()
task_run_id = response[0]['run_id']
display(response)
[{'task': 'c868aa44-f7fe-4e3d-b11d-e1e6af3ec150',
  'run_id': '96a7f85f-e30c-40b5-9185-0dee5bd1a15e',
  'status': 'success',
  'created_at': '2023-05-22T21:08:33.112805+00:00',
  'updated_at': '2023-05-22T21:08:33.112805+00:00'}]

Get Task Run Logs

Logs for a task run are retrieved through the following process.

  • REQUEST
    • POST /v1/api/task/get_logs_for_run
  • PARAMETERS
    • id: The task run ID in UUID format.
    • lines: The number of log lines to retrieve starting from the end of the log.
  • RETURNS
    • logs: Array of log entries.
# retrieve the authorization token
headers = wl.auth.auth_header()

url=f"{APIURL}/v1/api/task/get_logs_for_run"

data = {
    "id": task_run_id
}

response=requests.post(url, headers=headers, json=data).json()
display(response)
{'logs': ["2023-05-22T21:09:17.683428502Z stdout F {'pipeline_name': 'apipipelinegsze', 'workspace_name': 'apiorchestrationworkspacegsze'}",
  '2023-05-22T21:09:17.683489102Z stdout F Getting the workspace apiorchestrationworkspacegsze',
  '2023-05-22T21:09:17.683497403Z stdout F Getting the pipeline apipipelinegsze',
  '2023-05-22T21:09:17.683504003Z stdout F Deploying the pipeline.',
  '2023-05-22T21:09:17.683510203Z stdout F Performing sample inference.',
  '2023-05-22T21:09:17.683516203Z stdout F                      time  ... check_failures',
  '2023-05-22T21:09:17.683521903Z stdout F 0 2023-05-22 21:08:37.779  ...              0',
  '2023-05-22T21:09:17.683527803Z stdout F ',
  '2023-05-22T21:09:17.683533603Z stdout F [1 rows x 4 columns]',
  '2023-05-22T21:09:17.683540103Z stdout F Undeploying the pipeline']}

Run Task Scheduled via API

The other method of using tasks is as a scheduled run, the API equivalent of the Orchestration run_scheduled(name, schedule, timeout, json_args) SDK method. This sets up a task to run on a regular schedule as defined by the schedule parameter in the cron service format. For example:

schedule = "42 * * * *"

Runs on the 42nd minute of every hour.

The following schedule runs every day at 12 noon from February 1 to February 15 2024, and then ends.

schedule = "0 0 12 1-15 2 2024"

The Run Scheduled Task request is available at the following address:

/v1/api/task/run_scheduled

And takes the following parameters.

  • name (String Required): The name to assign to the task.
  • orch_id (String Required): The UUID orchestration ID to create the task from.
  • workspace_id (Integer Required): The numerical identifier for the workspace.
  • schedule (String Required): The schedule as a single string in cron format.
  • timeout (Integer Optional): The timeout to complete the task in seconds.
  • json (String Required): The arguments to pass to the task.

For our example, we will create a scheduled task to run every 5 minutes, display the inference results, then kill the task via the API so it does not run any further.

It is recommended that orchestrations that have pipeline deploy or undeploy commands be spaced out no less than 5 minutes to prevent colliding with other tasks that use the same pipeline.

# retrieve the authorization token
headers = wl.auth.auth_header()

data = {
    "name": "scheduled api task",
    "workspace_id": workspace_id,
    "orch_id": orchestration_id,
    "schedule": "*/5 * * * *",
    "json": {
        "workspace_name": workspace_name,
        "pipeline_name": pipeline_name
    }
}

import datetime
task_start = datetime.datetime.now()

url=f"{APIURL}/v1/api/task/run_scheduled"

response=requests.post(url, headers=headers, json=data).json()
display(response)
scheduled_task_id = response['id']
{'id': '87c2c7c0-e17e-4c00-9407-bfc44d632910'}
# loop until the task is started

# retrieve the authorization token
headers = wl.auth.auth_header()

url=f"{APIURL}/v1/api/task/get_by_id"

data = {
    "id": scheduled_task_id
}

status = None

while status != 'started':
    response=requests.post(url, headers=headers, json=data).json()
    display(response)
    status = response['status']
    time.sleep(10)
{'name': 'scheduled api task',
 'id': '87c2c7c0-e17e-4c00-9407-bfc44d632910',
 'image': 'proxy.replicated.com/proxy/wallaroo/ghcr.io/wallaroolabs/arbex-orch-deploy',
 'image_tag': 'v2023.2.0-main-3271',
 'bind_secrets': ['minio'],
 'extra_env_vars': {'MINIO_URL': 'http://minio.wallaroo.svc.cluster.local:9000',
  'ORCH_OWNER_ID': 'a6857628-b0aa-451e-a0a0-bbc1d6eea6e0',
  'ORCH_SHA': 'd3b93c9f280734106376e684792aa8b4285d527092fe87d89c74ec804f8e169e',
  'TASK_DEBUG': 'false',
  'TASK_ID': '87c2c7c0-e17e-4c00-9407-bfc44d632910'},
 'auth_init': True,
 'workspace_id': 8,
 'schedule': '*/5 * * * *',
 'reap_threshold_secs': 900,
 'status': 'started',
 'input_data': {'pipeline_name': 'apipipelinegsze',
  'workspace_name': 'apiorchestrationworkspacegsze'},
 'killed': False,
 'created_at': '2023-05-22T21:10:22.43957+00:00',
 'updated_at': '2023-05-22T21:10:22.871615+00:00',
 'last_runs': []}
# retrieve the authorization token
headers = wl.auth.auth_header()

url=f"{APIURL}/v1/api/task/list_task_runs"

data = {
    "task_id": scheduled_task_id
}

# loop until we have a single run task
length = 0
while length == 0:
    response=requests.post(url, headers=headers, json=data).json()
    display(response)
    length = len(response)
    time.sleep(10)
task_run_id = response[0]['run_id']
[{'task': '87c2c7c0-e17e-4c00-9407-bfc44d632910',
  'run_id': 'ad3d427a-3643-4f10-9b94-116688b32355',
  'status': 'running',
  'created_at': '2023-05-22T21:15:02.148274+00:00',
  'updated_at': '2023-05-22T21:15:02.148274+00:00'}]
# retrieve the authorization token
headers = wl.auth.auth_header()

url=f"{APIURL}/v1/api/task/get_logs_for_run"

data = {
    "id": task_run_id
}

# loop until we get a log
response=requests.post(url, headers=headers, json=data).json()
display(response)
{'logs': ["2023-05-22T21:15:55.548754679Z stdout F {'pipeline_name': 'apipipelinegsze', 'workspace_name': 'apiorchestrationworkspacegsze'}",
  '2023-05-22T21:15:55.54880928Z stdout F Getting the workspace apiorchestrationworkspacegsze',
  '2023-05-22T21:15:55.54881708Z stdout F Getting the pipeline apipipelinegsze',
  '2023-05-22T21:15:55.54882328Z stdout F Deploying the pipeline.',
  '2023-05-22T21:15:55.54883088Z stdout F Performing sample inference.',
  '2023-05-22T21:15:55.54883628Z stdout F                      time  ... check_failures',
  '2023-05-22T21:15:55.54884248Z stdout F 0 2023-05-22 21:15:17.420  ...              0',
  '2023-05-22T21:15:55.54884788Z stdout F ',
  '2023-05-22T21:15:55.54885348Z stdout F [1 rows x 4 columns]',
  '2023-05-22T21:15:55.54885938Z stdout F Undeploying the pipeline']}

Cleanup

With the tutorial complete, we can undeploy the pipeline and return the resources back to the Wallaroo instance.

Kill Task via API

Started tasks are killed through the following request:

/v1/api/task/kill

And takes the following parameters.

  • id (String Required): The UUID of the task to kill.
# retrieve the authorization token
headers = wl.auth.auth_header()

url=f"{APIURL}/v1/api/task/kill"

data = {
    "id": scheduled_task_id
}

response=requests.post(url, headers=headers, json=data).json()
display(response)
{'name': 'scheduled api task',
 'id': '87c2c7c0-e17e-4c00-9407-bfc44d632910',
 'image': 'proxy.replicated.com/proxy/wallaroo/ghcr.io/wallaroolabs/arbex-orch-deploy',
 'image_tag': 'v2023.2.0-main-3271',
 'bind_secrets': ['minio'],
 'extra_env_vars': {'MINIO_URL': 'http://minio.wallaroo.svc.cluster.local:9000',
  'ORCH_OWNER_ID': 'a6857628-b0aa-451e-a0a0-bbc1d6eea6e0',
  'ORCH_SHA': 'd3b93c9f280734106376e684792aa8b4285d527092fe87d89c74ec804f8e169e',
  'TASK_DEBUG': 'false',
  'TASK_ID': '87c2c7c0-e17e-4c00-9407-bfc44d632910'},
 'auth_init': True,
 'workspace_id': 8,
 'schedule': '*/5 * * * *',
 'reap_threshold_secs': 900,
 'status': 'pending_kill',
 'input_data': {'pipeline_name': 'apipipelinegsze',
  'workspace_name': 'apiorchestrationworkspacegsze'},
 'killed': False,
 'created_at': '2023-05-22T21:10:22.43957+00:00',
 'updated_at': '2023-05-22T21:10:22.871615+00:00',
 'last_runs': [{'run_id': 'ad3d427a-3643-4f10-9b94-116688b32355',
   'status': 'success',
   'created_at': '2023-05-22T21:15:02.148274+00:00',
   'updated_at': '2023-05-22T21:15:02.148274+00:00'}]}

Close Resources

With the tutorial complete, we’ll verify the pipeline is closed so the resources are assigned back to the Wallaroo instance.

pipeline.undeploy()
 ok
name: apipipelinegsze
created: 2023-05-22 20:48:30.700499+00:00
last_updated: 2023-05-22 20:48:31.357336+00:00
deployed: False
tags:
versions: aac61b5a-e4f4-4ea3-9347-6482c330b5f5, 101b252a-623c-4185-a24d-ec00593dda79
steps: apiorchestrationmodelgsze

2 - Wallaroo Connection API with Google BigQuery Tutorial

A tutorial on using the Wallaroo MLOps API with Wallaroo Connections for Google BigQuery connections.

This can be downloaded as part of the Wallaroo Tutorials repository.

Wallaroo Connection and ML Workload Orchestration with BigQuery House Price Model Tutorial

This tutorial provides a quick set of methods and examples regarding Wallaroo Connections. For full details, see the Wallaroo Documentation site.

Wallaroo provides Data Connections so organizations can create and manage automated tasks that run either on demand or on a regular schedule.

Definitions

  • Orchestration: A set of instructions written as a python script with a requirements library. Orchestrations are uploaded to the Wallaroo instance as a .zip file.
  • Task: An implementation of an orchestration. Tasks are run either once when requested, on a repeating schedule, or as a service.
  • Connection: Definitions set by MLOps engineers that are used by other Wallaroo users for connection information to a data source. Usually paired with orchestrations.

This tutorial will focus on using Google BigQuery as the data source.

Tutorial Goals

The tutorial will demonstrate the following:

  1. Create a Wallaroo connection for retrieving information from a Google BigQuery source table.
  2. Create a Wallaroo connection to store inference results into a Google BigQuery destination table.
  3. Upload Wallaroo ML Workload Orchestration that supports BigQuery connections with the connection details.
  4. Run the orchestration once as a Run Once Task and verify that the inference request succeeded and the inference results were saved to the external data store.
  5. Schedule the orchestration as a Scheduled Task and verify that the inference request succeeded and the inference results were saved to the external data store.

Prerequisites

  • An installed Wallaroo instance.
  • The following Python libraries installed. These are included by default in a Wallaroo instance’s JupyterHub service.
    • os
    • wallaroo: The Wallaroo SDK. Included with the Wallaroo JupyterHub service by default.
    • pandas: Pandas, mainly used for Pandas DataFrame
    • pyarrow: PyArrow for Apache Arrow support
  • The following Python libraries. These are not included in a Wallaroo instance’s JupyterHub service.
    • google.cloud.bigquery and google.oauth2 (from the google-cloud-bigquery and google-auth packages): Used for the BigQuery connections.
    • db_dtypes: Used for BigQuery DataFrame support.

Tutorial Resources

  • Models:
    • models/rf_model.onnx: A model that predicts house price values.
  • Data:
    • data/xtest-1.df.json and data/xtest-1k.df.json: DataFrame JSON inference inputs with 1 input and 1,000 inputs.
    • data/xtest-1k.arrow: Apache Arrow inference inputs with 1,000 inputs.
    • Sample inference inputs in CSV that can be imported into Google BigQuery.
      • data/xtest-1k.df.json: Random sample housing prices.
      • data/smallinputs.df.json: Sample housing prices that return results lower than $1.5 million.
      • data/biginputs.df.json: Sample housing prices that return results higher than $1.5 million.
    • SQL queries to create the inputs/outputs tables with schema.
      • ./resources/create_inputs_table.sql: Inputs table with schema.
      • ./resources/create_outputs_table.sql: Outputs table with schema.
      • ./resources/housrpricesga_inputs.avro: Avro container of inputs table.

Initial Steps

For this tutorial, we’ll create a workspace, upload our sample model and deploy a pipeline. We’ll perform some quick sample inferences to verify that everything is working.

Load Libraries

Here we’ll import the various libraries we’ll use for the tutorial.

import wallaroo
from wallaroo.object import EntityNotFoundError, RequiredAttributeMissing

# to display dataframe tables
from IPython.display import display
# used to display dataframe information without truncating
import pandas as pd
pd.set_option('display.max_colwidth', None)
import pyarrow as pa

import time
import json

# for Big Query connections
from google.cloud import bigquery
from google.oauth2 import service_account
import db_dtypes

import requests

# used for unique connection names

import string
import random

suffix= ''.join(random.choice(string.ascii_lowercase) for i in range(4))
wallaroo.__version__
'2023.2.0rc3'

Connect to the Wallaroo Instance

The first step is to connect to Wallaroo through the Wallaroo client. The Python library is included in the Wallaroo install and available through the Jupyter Hub interface provided with your Wallaroo environment.

This is accomplished using the wallaroo.Client() command, which provides a URL to grant the SDK permission to your specific Wallaroo environment. When displayed, enter the URL into a browser and confirm permissions. Store the connection into a variable that can be referenced later.

If logging into the Wallaroo instance through the internal JupyterHub service, use wl = wallaroo.Client(). If logging in externally, update the wallarooPrefix and wallarooSuffix variables with the proper DNS information. For more information on Wallaroo Client settings, see the Client Connection guide.

# Login through local Wallaroo instance

wl = wallaroo.Client()

API URL

The variable APIURL is used to specify the connection to the Wallaroo instance’s MLOps API URL, and is composed of the Wallaroo DNS prefix and suffix. For full details, see the Wallaroo API Connection Guide.

The variables wallarooPrefix and wallarooSuffix will be used to derive the API URL. For example, if the Wallaroo Prefix is doc-test. and the URL is example.com, then the MLOps API URL would be doc-test.api.example.com/v1/api/{request}.

Note the . is part of the prefix. If there is no prefix, then wallarooPrefix = ""

Set the Wallaroo Prefix and Suffix in the code segment below based on your Wallaroo instance.

Variable Declaration

The following variables will be used for our BigQuery testing.

We’ll use two connections:

  • bigquery_input_connection: The connection that will draw inference input data from a BigQuery table.
  • bigquery_output_connection: The connection that will upload inference results into a BigQuery table.

Note that for the connection arguments, we’ll retrieve the information from the files ./bigquery_service_account_input_key.json and ./bigquery_service_account_output_key.json, which include the service account key (SAK) information as well as the dataset and table used.

The service account key (SAK) file includes the following fields: type, project_id, private_key_id, private_key, client_email, auth_uri, token_uri, auth_provider_x509_cert_url, and client_x509_cert_url. The dataset and table fields are not part of the SAK and are added for this tutorial.
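For reference, once json.load reads one of these key files the connection argument is a Python dict shaped roughly like the sketch below. All values here are placeholders; the real values come from the service account key downloaded from Google Cloud, with dataset and table added for this tutorial:

bigquery_connection_input_argument = {
    # standard service account key (SAK) fields - placeholder values only
    "type": "service_account",
    "project_id": "<gcp project id>",
    "private_key_id": "<private key id>",
    "private_key": "<private key>",
    "client_email": "<service account email>",
    "auth_uri": "<auth uri>",
    "token_uri": "<token uri>",
    "auth_provider_x509_cert_url": "<auth provider cert url>",
    "client_x509_cert_url": "<client cert url>",
    # added for this tutorial - not part of the SAK itself
    "dataset": "<BigQuery dataset name>",
    "table": "<BigQuery table name>"
}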
# Setting variables for later steps

wallarooPrefix = "YOUR PREFIX."
wallarooSuffix = "YOUR SUFFIX"
APIURL = f"https://{wallarooPrefix}api.{wallarooSuffix}"

# Setting variables for later steps

workspace_name = 'bigqueryapiworkspace'
pipeline_name = 'bigqueryapipipeline'
model_name = 'bigqueryapimodel'
model_file_name = './models/rf_model.onnx'

bigquery_connection_input_name = f"bigqueryhouseapiinput{suffix}"
bigquery_connection_input_type = "BIGQUERY"
bigquery_connection_input_argument = json.load(open('./bigquery_service_account_input_key.json'))

bigquery_connection_output_name = f"bigqueryhouseapioutputs{suffix}"
bigquery_connection_output_type = "BIGQUERY"
bigquery_connection_output_argument = json.load(open('./bigquery_service_account_output_key.json'))

Helper Methods

The following helper methods are used to either create or get workspaces, pipelines, and connections.

# helper methods to retrieve workspaces and pipelines

def get_workspace(name):
    workspace = None
    for ws in wl.list_workspaces():
        if ws.name() == name:
            workspace= ws
    if(workspace == None):
        workspace = wl.create_workspace(name)
    return workspace

def get_pipeline(name):
    try:
        pipeline = wl.pipelines_by_name(name)[0]
    except EntityNotFoundError:
        pipeline = wl.build_pipeline(name)
    return pipeline

def get_connection(name, connection_type, connection_arguments):
    try:
        connection = wl.get_connection(name)
    except RequiredAttributeMissing:
        connection =wl.create_connection(name, 
                  connection_type, 
                  connection_arguments)
    return connection

Create the Workspace and Pipeline

We’ll now create our workspace and pipeline for the tutorial. If this tutorial has been run previously, then this will retrieve the existing ones with the assumption they’re for use with this tutorial.

We’ll set the retrieved workspace as the current workspace in the SDK, so all commands will default to that workspace.

workspace = get_workspace(workspace_name)
wl.set_current_workspace(workspace)

workspace_id = workspace.id()

pipeline = get_pipeline(pipeline_name)

Upload the Model and Deploy Pipeline

We’ll upload our model into our sample workspace, then add it as a pipeline step before deploying the pipeline so it’s ready to accept inference requests.

# Upload the model

housing_model_control = (wl.upload_model(model_name, 
                                         model_file_name, 
                                         framework=wallaroo.framework.Framework.ONNX)
                                         .configure(tensor_fields=["tensor"])
                        )

# Add the model as a pipeline step

pipeline.add_model_step(housing_model_control)
name: bigqueryapipipeline
created: 2023-05-17 16:06:45.400005+00:00
last_updated: 2023-05-17 16:06:48.150211+00:00
deployed: True
tags:
versions: 5e78de4f-51f1-43ce-8c2c-d724a05f856a, d2f03a9c-dfbb-4a03-a014-e70aa80902e3
steps: bigqueryapimodel
#deploy the pipeline
pipeline.deploy()
name: bigqueryapipipeline
created: 2023-05-17 16:06:45.400005+00:00
last_updated: 2023-05-17 16:11:23.317215+00:00
deployed: True
tags:
versions: 153a11e5-7968-450c-aef5-d1be17c6b173, 5e78de4f-51f1-43ce-8c2c-d724a05f856a, d2f03a9c-dfbb-4a03-a014-e70aa80902e3
steps: bigqueryapimodel

Connection Management via the Wallaroo MLOps API

The following steps will demonstrate using the Wallaroo MLOps API to:

  • Create the BigQuery connections
  • Add the connections to the targeted workspace
  • Use the connections for inference requests and uploading the results to a BigQuery dataset table.

Create Connections via API

We will create the data source connections via the Wallaroo API request:

/v1/api/connections/create

This takes the following parameters:

  • name (String Required): The name of the connection.

  • type (String Required): The user defined type of connection.

  • details (String Required): User defined configuration details for the data connection. These can be {'username':'dataperson', 'password':'datapassword', 'port': 3339}, or {'token':'abcde123==', 'host':'example.com', 'port': 1234}, or other user defined combinations.

  • IMPORTANT NOTE: Data connections names must be unique. Attempting to create a data connection with the same name as an existing data connection will result in an error.

# retrieve the authorization token
headers = wl.auth.auth_header()

url = f"{APIURL}/v1/api/connections/create"

# input connection
data = {
    'name': bigquery_connection_input_name,
    'type' : bigquery_connection_input_type,
    'details': bigquery_connection_input_argument
}

response=requests.post(url, headers=headers, json=data).json()
display(response)
# saved for later steps
connection_input_id = response['id']
{'id': '839779d7-d4b3-4e1c-953a-2f5ef55f1bb2'}
# retrieve the authorization token
headers = wl.auth.auth_header()

url = f"{APIURL}/v1/api/connections/create"

# output connection
data = {
    'name': bigquery_connection_output_name,
    'type' : bigquery_connection_output_type,
    'details': bigquery_connection_output_argument
}

response=requests.post(url, headers=headers, json=data).json()
display(response)
# saved for later steps
connection_output_id = response['id']
{'id': 'ef0c62e7-e59f-4c43-bdff-05ed0976cffa'}

Add Connections to Workspace via API

The connections will be added to the sample workspace with the MLOps API request:

/v1/api/connections/add_to_workspace

This takes the following parameters:

  • workspace_id (Integer Required): The numerical ID of the workspace.
  • connection_id (String Required): The UUID connection ID.
# retrieve the authorization token
headers = wl.auth.auth_header()

url = f"{APIURL}/v1/api/connections/add_to_workspace"

data = {
    'workspace_id': workspace_id,
    'connection_id': connection_input_id
}

response=requests.post(url, headers=headers, json=data)
display(response.json())

data = {
    'workspace_id': workspace_id,
    'connection_id': connection_output_id
}

response=requests.post(url, headers=headers, json=data)
display(response.json())
{'id': '3877da9d-e450-4b9a-85dd-7f68127b1313'}

{'id': '15f247ab-833f-468a-b7c5-9b50831052e6'}

Connect to Google BigQuery

With our connections set, we’ll now use them for an inference request through the following steps:

  1. Retrieve the input data from a BigQuery request from the input connection details.
  2. Perform the inference.
  3. Upload the inference results into another BigQuery table from the output connection details.

Create Google Credentials

From our stored Wallaroo connection details, we’ll create the credentials for our BigQuery connections.

We will use the MLOps API call:

/v1/api/connections/get

to retrieve the connection. This request takes the following parameters:

  • name (String Required): The name of the connection.
# get the connection input details

# retrieve the authorization token
headers = wl.auth.auth_header()

url = f"{APIURL}/v1/api/connections/get"

data = {
    'name': bigquery_connection_input_name
}

connection_input_details=requests.post(url, headers=headers, json=data).json()['details']
# get the connection output details

# retrieve the authorization token
headers = wl.auth.auth_header()

url = f"{APIURL}/v1/api/connections/get"

data = {
    'name': bigquery_connection_output_name
}

connection_output_details=requests.post(url, headers=headers, json=data).json()['details']
# Set the bigquery credentials

bigquery_input_credentials = service_account.Credentials.from_service_account_info(
    connection_input_details)

bigquery_output_credentials = service_account.Credentials.from_service_account_info(
    connection_output_details)

Connect to Google BigQuery

We can now generate a client from our connection details, specifying the project that was included in the connection details.

bigqueryinputclient = bigquery.Client(
    credentials=bigquery_input_credentials, 
    project=connection_input_details['project_id']
)
bigqueryoutputclient = bigquery.Client(
    credentials=bigquery_output_credentials, 
    project=connection_output_details['project_id']
)

Query Data

Now we’ll create our query and retrieve information from our dataset and table as defined in the file bigquery_service_account_input_key.json. The table is expected to be in the format of the file ./data/xtest-1k.df.json.

inference_dataframe_input = bigqueryinputclient.query(
        f"""
        SELECT tensor
        FROM {connection_input_details['dataset']}.{connection_input_details['table']}"""
    ).to_dataframe()
inference_dataframe_input.head(5)
tensor
0 | [4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0]
1 | [2.0, 2.5, 2170.0, 6361.0, 1.0, 0.0, 2.0, 3.0, 8.0, 2170.0, 0.0, 47.7109, -122.017, 2310.0, 7419.0, 6.0, 0.0, 0.0]
2 | [3.0, 2.5, 1300.0, 812.0, 2.0, 0.0, 0.0, 3.0, 8.0, 880.0, 420.0, 47.5893, -122.317, 1300.0, 824.0, 6.0, 0.0, 0.0]
3 | [4.0, 2.5, 2500.0, 8540.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2500.0, 0.0, 47.5759, -121.994, 2560.0, 8475.0, 24.0, 0.0, 0.0]
4 | [3.0, 1.75, 2200.0, 11520.0, 1.0, 0.0, 0.0, 4.0, 7.0, 2200.0, 0.0, 47.7659, -122.341, 1690.0, 8038.0, 62.0, 0.0, 0.0]

Sample Inference

With our data retrieved, we’ll perform an inference and display the results.

result = pipeline.infer(inference_dataframe_input)
display(result.head(5))
time | in.tensor | out.variable | check_failures
0 | 2023-05-17 16:12:53.970 | [4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0] | [718013.75] | 0
1 | 2023-05-17 16:12:53.970 | [2.0, 2.5, 2170.0, 6361.0, 1.0, 0.0, 2.0, 3.0, 8.0, 2170.0, 0.0, 47.7109, -122.017, 2310.0, 7419.0, 6.0, 0.0, 0.0] | [615094.56] | 0
2 | 2023-05-17 16:12:53.970 | [3.0, 2.5, 1300.0, 812.0, 2.0, 0.0, 0.0, 3.0, 8.0, 880.0, 420.0, 47.5893, -122.317, 1300.0, 824.0, 6.0, 0.0, 0.0] | [448627.72] | 0
3 | 2023-05-17 16:12:53.970 | [4.0, 2.5, 2500.0, 8540.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2500.0, 0.0, 47.5759, -121.994, 2560.0, 8475.0, 24.0, 0.0, 0.0] | [758714.2] | 0
4 | 2023-05-17 16:12:53.970 | [3.0, 1.75, 2200.0, 11520.0, 1.0, 0.0, 0.0, 4.0, 7.0, 2200.0, 0.0, 47.7659, -122.341, 1690.0, 8038.0, 62.0, 0.0, 0.0] | [513264.7] | 0

Upload the Results

With the inference complete, we’ll upload the results back to the BigQuery dataset.

output_table = bigqueryoutputclient.get_table(f"{connection_output_details['dataset']}.{connection_output_details['table']}")

bigqueryoutputclient.insert_rows_from_dataframe(
    output_table, 
    dataframe=result.rename(columns={"in.tensor":"in_tensor", "out.variable":"out_variable"})
)
[[], []]

Verify the Upload

We can verify the upload by requesting the last few rows of the output table.

task_inference_results = bigqueryoutputclient.query(
        f"""
        SELECT *
        FROM {connection_output_details['dataset']}.{connection_output_details['table']}
        ORDER BY time DESC
        LIMIT 5
        """
    ).to_dataframe()

display(task_inference_results)
time | in_tensor | out_variable | check_failures
0 | 2023-05-17 16:12:53.970 | [4.0, 2.5, 2500.0, 8540.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2500.0, 0.0, 47.5759, -121.994, 2560.0, 8475.0, 24.0, 0.0, 0.0] | [758714.2] | 0
1 | 2023-05-17 16:12:53.970 | [4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0] | [718013.75] | 0
2 | 2023-05-17 16:12:53.970 | [2.0, 2.5, 2170.0, 6361.0, 1.0, 0.0, 2.0, 3.0, 8.0, 2170.0, 0.0, 47.7109, -122.017, 2310.0, 7419.0, 6.0, 0.0, 0.0] | [615094.56] | 0
3 | 2023-05-17 16:12:53.970 | [3.0, 1.75, 2200.0, 11520.0, 1.0, 0.0, 0.0, 4.0, 7.0, 2200.0, 0.0, 47.7659, -122.341, 1690.0, 8038.0, 62.0, 0.0, 0.0] | [513264.7] | 0
4 | 2023-05-17 16:12:53.970 | [3.0, 2.5, 1300.0, 812.0, 2.0, 0.0, 0.0, 3.0, 8.0, 880.0, 420.0, 47.5893, -122.317, 1300.0, 824.0, 6.0, 0.0, 0.0] | [448627.72] | 0

Cleanup

With the tutorial complete, we can undeploy the pipeline and return the resources back to the Wallaroo instance.

pipeline.undeploy()
name: bigqueryapipipeline
created: 2023-05-17 16:06:45.400005+00:00
last_updated: 2023-05-17 16:11:23.317215+00:00
deployed: False
tags:
versions: 153a11e5-7968-450c-aef5-d1be17c6b173, 5e78de4f-51f1-43ce-8c2c-d724a05f856a, d2f03a9c-dfbb-4a03-a014-e70aa80902e3
steps: bigqueryapimodel

3 - Wallaroo ML Workload Orchestration Simple Tutorial

A tutorial on using the ML Workload Orchestration for simple inference automation.

This can be downloaded as part of the Wallaroo Tutorials repository.

Wallaroo Connection and ML Workload Orchestration Simple Tutorial

This tutorial provides a quick set of methods and examples regarding Wallaroo Connections and Wallaroo ML Workload Orchestration. For full details, see the Wallaroo Documentation site.

Wallaroo provides Data Connections and ML Workload Orchestrations so organizations can create and manage automated tasks that run either on demand or on a regular schedule.

Definitions

  • Orchestration: A set of instructions written as a python script with a requirements library. Orchestrations are uploaded to the Wallaroo instance as a .zip file.
  • Task: An implementation of an orchestration. Tasks are run either once when requested, on a repeating schedule, or as a service.
  • Connection: Definitions set by MLOps engineers that are used by other Wallaroo users for connection information to a data source. Usually paired with orchestrations.

Tutorial Goals

The tutorial will demonstrate the following:

  1. Create a Wallaroo connection for retrieving information from an external source.
  2. Upload Wallaroo ML Workload Orchestration.
  3. Run the orchestration once as a Run Once Task and verify that the information was saved to the pipeline logs.
  4. Schedule the orchestration as a Scheduled Task and verify that the information was saved to the pipeline logs.

Prerequisites

  • An installed Wallaroo instance.
  • The following Python libraries installed. These are included by default in a Wallaroo instance’s JupyterHub service.
    • os
    • wallaroo: The Wallaroo SDK. Included with the Wallaroo JupyterHub service by default.
    • pandas: Pandas, mainly used for Pandas DataFrame
    • pyarrow: PyArrow for Apache Arrow support

Initial Steps

For this tutorial, we’ll create a workspace, upload our sample model and deploy a pipeline. We’ll perform some quick sample inferences to verify that everything is working.

Load Libraries

Here we’ll import the various libraries we’ll use for the tutorial.

import wallaroo
from wallaroo.object import EntityNotFoundError, RequiredAttributeMissing

# to display dataframe tables
from IPython.display import display
# used to display dataframe information without truncating
import pandas as pd
pd.set_option('display.max_colwidth', None)
import pyarrow as pa

import time

# Used to create unique workspace and pipeline names
import string
import random

# make a random 4 character suffix
suffix= ''.join(random.choice(string.ascii_lowercase) for i in range(4))
display(suffix)
'dtzw'

Connect to the Wallaroo Instance

The first step is to connect to Wallaroo through the Wallaroo client. The Python library is included in the Wallaroo install and available through the Jupyter Hub interface provided with your Wallaroo environment.

This is accomplished using the wallaroo.Client() command, which provides a URL to grant the SDK permission to your specific Wallaroo environment. When displayed, enter the URL into a browser and confirm permissions. Store the connection into a variable that can be referenced later.

If logging into the Wallaroo instance through the internal JupyterHub service, use wl = wallaroo.Client(). For more information on Wallaroo Client settings, see the Client Connection guide.

# Login through local Wallaroo instance

wl = wallaroo.Client()
# Setting variables for later steps

workspace_name = f'simpleorchestrationworkspace{suffix}'
pipeline_name = f'simpleorchestrationpipeline{suffix}'
model_name = f'simpleorchestrationmodel{suffix}'
model_file_name = './models/rf_model.onnx'

inference_connection_name = f'external_inference_connection{suffix}'
inference_connection_type = "HTTP"
inference_connection_argument = {'host':'https://github.com/WallarooLabs/Wallaroo_Tutorials/raw/main/wallaroo-testing-tutorials/houseprice-saga/data/xtest-1k.arrow'}

Helper Methods

The following helper methods are used to either create or get workspaces, pipelines, and connections.

# helper methods to retrieve workspaces and pipelines

def get_workspace(name):
    workspace = None
    for ws in wl.list_workspaces():
        if ws.name() == name:
            workspace= ws
    if(workspace == None):
        workspace = wl.create_workspace(name)
    return workspace

def get_pipeline(name):
    try:
        pipeline = wl.pipelines_by_name(name)[0]
    except EntityNotFoundError:
        pipeline = wl.build_pipeline(name)
    return pipeline

Create the Workspace and Pipeline

We’ll now create our workspace and pipeline for the tutorial. If this tutorial has been run previously, then this will retrieve the existing ones with the assumption they’re for use with this tutorial.

We’ll set the retrieved workspace as the current workspace in the SDK, so all commands will default to that workspace.

workspace = get_workspace(workspace_name)
wl.set_current_workspace(workspace)

pipeline = get_pipeline(pipeline_name)

Upload the Model and Deploy Pipeline

We’ll upload our model into our sample workspace, then add it as a pipeline step before deploying the pipeline so it’s ready to accept inference requests.

# Upload the model

housing_model_control = (wl.upload_model(model_name, 
                                         model_file_name, 
                                         framework=wallaroo.framework.Framework.ONNX)
                                         .configure(tensor_fields=["tensor"])
                        )

# Add the model as a pipeline step

pipeline.add_model_step(housing_model_control)
name: simpleorchestrationpipelinedtzw
created: 2023-05-23 15:26:00.268667+00:00
last_updated: 2023-05-23 15:26:00.268667+00:00
deployed: (none)
tags:
versions: 9a5afba3-c664-4d57-8c08-fc072d3f549c
steps:
#deploy the pipeline
pipeline.deploy()
Waiting for deployment - this will take up to 45s ....................... ok
name: simpleorchestrationpipelinedtzw
created: 2023-05-23 15:26:00.268667+00:00
last_updated: 2023-05-23 15:26:00.568944+00:00
deployed: True
tags:
versions: 24f5d5e9-59fe-4440-9e08-78c0003226df, 9a5afba3-c664-4d57-8c08-fc072d3f549c
steps: simpleorchestrationmodeldtzw

Create Connections

We will create the data source connection via the Wallaroo client command create_connection.

Connections are created with the Wallaroo client command create_connection with the following parameters.

  • name (string, Required): The name of the connection. This must be unique - if submitting the name of an existing connection it will return an error.
  • type (string, Required): The user defined type of connection.
  • details (Dict, Required): User defined configuration details for the data connection. These can be {'username':'dataperson', 'password':'datapassword', 'port': 3339}, or {'token':'abcde123==', 'host':'example.com', 'port': 1234}, or other user defined combinations.
  • IMPORTANT NOTE: Data connection names must be unique. Attempting to create a data connection with the same name as an existing data connection will result in an error.

We’ll create a data connection for our inference input with the Wallaroo client create_connection command. From there we’ll use our connection to retrieve:

  • houseprice_arrow_table: An Apache Arrow file stored on GitHub that will be used for our inference input.
wl.create_connection(inference_connection_name, inference_connection_type, inference_connection_argument)
Field | Value
Name | external_inference_connectiondtzw
Connection Type | HTTP
Details | *****
Created At | 2023-05-23T15:26:24.613152+00:00
Linked Workspaces | []

Get Connection by Name

The Wallaroo client method get_connection(name) retrieves the connection that matches the name parameter. We’ll retrieve our connection and store it as inference_source_connection.

inference_source_connection = wl.get_connection(name=inference_connection_name)
display(inference_source_connection)
Field | Value
Name | external_inference_connectiondtzw
Connection Type | HTTP
Details | *****
Created At | 2023-05-23T15:26:24.613152+00:00
Linked Workspaces | []

Add Connection to Workspace

The method Workspace add_connection(connection_name) adds a Data Connection to a workspace, and takes the following parameters.

Parameter | Type | Description
name | string (Required) | The name of the Data Connection

We’ll add the connection to our sample workspace, then list the connections available to the workspace to confirm.

workspace.add_connection(inference_connection_name)
workspace.list_connections()
name | connection type | details | created at | linked workspaces
external_inference_connectiondtzw | HTTP | ***** | 2023-05-23T15:26:24.613152+00:00 | ['simpleorchestrationworkspacedtzw']

Wallaroo ML Workload Orchestration Example

With the pipeline deployed and our connections set, we will now generate our ML Workload Orchestration. See the Wallaroo ML Workload Orchestrations guide for full details.

Orchestrations are uploaded to the Wallaroo instance as a ZIP file with the following requirements:

Parameter | Type | Description
User Code | (Required) Python script as .py files | If main.py exists, then that will be used as the task entrypoint. Otherwise, the first main.py found in any subdirectory will be used as the entrypoint.
Python Library Requirements | (Optional) requirements.txt file in the requirements file format. | A standard Python requirements.txt for any dependencies to be provided in the task environment. The Wallaroo SDK will already be present and should not be included in the requirements.txt. Multiple requirements.txt files are not allowed.
Other artifacts | | Other artifacts such as files, data, or code to support the orchestration.

For our example, our orchestration will:

  1. Use the inference_results_connection to open an HTTP GET connection to the inference data file and use it in an inference request in the deployed pipeline.
  2. Submit the inference results to the location specified in the external_inference_connection.

This sample script is stored in remote_inference/main.py with an empty requirements.txt file, and packaged into the orchestration as ./remote_inference/remote_inference.zip. We’ll display the steps in uploading the orchestration to the Wallaroo instance.

Note that the orchestration assumes the pipeline is already deployed.
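
As a point of reference, a minimal sketch of what such a main.py might look like is shown below. This is not the packaged tutorial script: the workspace, pipeline, and connection names are hardcoded here purely for illustration, while the packaged orchestration receives them through the task's json_args.

# main.py - illustrative sketch only, not the tutorial's packaged script
import wallaroo
import pyarrow as pa
import requests

# Inside a task, the SDK client connects to the hosting Wallaroo instance
# (assumption based on the orchestration description above).
wl = wallaroo.Client()

# Hardcoded for illustration; the tutorial passes these values in through json_args.
workspace_name = "simpleorchestrationworkspace"
pipeline_name = "simpleorchestrationpipeline"
connection_name = "external_inference_connection"

# Retrieve the workspace and pipeline with the same calls used by the helper methods above
workspace = [ws for ws in wl.list_workspaces() if ws.name() == workspace_name][0]
wl.set_current_workspace(workspace)
pipeline = wl.pipelines_by_name(pipeline_name)[0]

# Use the connection details to download the Apache Arrow file, then run it
# through the already-deployed pipeline
connection = wl.get_connection(name=connection_name)
response = requests.get(connection.details()['host'])
table = pa.ipc.open_file(pa.BufferReader(response.content)).read_all()

results = pipeline.infer(table.to_pandas())
print(results)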

Upload the Orchestration

Orchestrations are uploaded with the Wallaroo client upload_orchestration(path) method with the following parameters.

Parameter | Type | Description
path | string (Required) | The path to the ZIP file to be uploaded.

Once uploaded, the deployment will be prepared and any requirements will be downloaded and installed.

For this example, the orchestration ./remote_inference/remote_inference.zip will be uploaded and saved to the variable orchestration.

Orchestration Status

We will loop until the uploaded orchestration’s status displays ready.

orchestration = wl.upload_orchestration(path="./remote_inference/remote_inference.zip")

while orchestration.status() != 'ready':
    print(orchestration.status())
    time.sleep(5)
pending_packaging
pending_packaging
packaging
packaging
packaging
packaging
packaging
packaging
packaging
packaging
wl.list_orchestrations()
id | name | status | filename | sha | created at | updated at
701c991a-a716-4bca-aa69-afd957ff189e | None | ready | remote_inference.zip | b4593d...d5a86f | 2023-23-May 15:26:24 | 2023-23-May 15:27:13

Upload Orchestration via File Object

Another method to upload the orchestration is as a file object. For that, we will open the zip file as a binary, then upload it using the bytes_buffer parameter to specify the file object, and the file_name to give it a new name.

# read the orchestration ZIP file as bytes
with open("./remote_inference/remote_inference.zip", "rb") as f:
    zip_bytes = f.read()

wl.upload_orchestration(bytes_buffer=zip_bytes, file_name="inferencetest.zip", name="uploadedbytesdemo")
Field | Value
ID | 7683f0f9-13fa-4257-840a-8e1cf8b12089
Name | uploadedbytesdemo
File Name | inferencetest.zip
SHA | b4593d6084e07e9ad1b57367258ca425d7f290540ab4378b8cba168b91d5a86f
Status | pending_packaging
Created At | 2023-23-May 15:27:15
Updated At | 2023-23-May 15:27:15
wl.list_orchestrations()
id | name | status | filename | sha | created at | updated at
701c991a-a716-4bca-aa69-afd957ff189e | None | ready | remote_inference.zip | b4593d...d5a86f | 2023-23-May 15:26:24 | 2023-23-May 15:27:13
7683f0f9-13fa-4257-840a-8e1cf8b12089 | uploadedbytesdemo | pending_packaging | inferencetest.zip | b4593d...d5a86f | 2023-23-May 15:27:15 | 2023-23-May 15:27:15

Task Management Tutorial

Once an Orchestration has the status ready, it can be run as a task. Tasks have the following run options.

Type | SDK Call | How triggered
Once | orchestration.run_once(name, json_args, timeout) | Task runs once and exits.
Scheduled | orchestration.run_scheduled(name, schedule, timeout, json_args) | User provides schedule. Task runs and exits whenever the schedule dictates.

Run Task Once

We’ll start with a Run Once task, generating it from our orchestration.

Tasks are generated and run once with the Orchestration run_once(name, json_args, timeout) method. Any arguments for the orchestration are passed in as a Dict. If there are no arguments, then an empty set {} is passed.

# Example: run once

import datetime
task_start = datetime.datetime.now()
task = orchestration.run_once(name="simpletaskdemo", 
                              json_args={"workspace_name": workspace_name, 
                                         "pipeline_name": pipeline_name,
                                         "connection_name": inference_connection_name
                                            })

Task Status

The list of tasks in the Wallaroo instance is retrieved through the Wallaroo Client list_tasks() method. This returns an array of tasks with the following fields.

Parameter | Type | Description
id | string | The UUID identifier for the task.
last run status | string | The last reported status of the task. Values are:
  • pending: The task has not been started.
  • started: The task has been scheduled to execute.
  • pending_kill: The task kill command has been issued and the task is scheduled to be stopped.
type | string | The type of the task. Values are:
  • Temporary Run: The task runs once then stops.
  • Scheduled Run: The task repeats on a cron like schedule.
created at | DateTime | The date and time the task was started.
updated at | DateTime | The date and time the task was updated.

For this example, we will check the status of the previously created task in a loop until it reaches the status started.

while task.status() != "started":
    display(task.status())
    time.sleep(5)
'pending'

'pending'

'pending'

Task Results

We can view the inferences from our logs and verify that new entries were added from our task. We can do that with the pipeline logs() method, limited to the time window when the task was running.

In our case, we’ll assume the task once started takes about 1 minute to run (deploy the pipeline, run the inference, undeploy the pipeline). We’ll add in a wait of 1 minute, then display the logs during the time period the task was running.

time.sleep(60)

task_end = datetime.datetime.now()
display(task_end)

pipeline.logs(start_datetime = task_start, end_datetime = task_end)
datetime.datetime(2023, 5, 23, 15, 28, 30, 718361)

Warning: Pipeline log size limit exceeded. Please request logs using export_logs

time | in.tensor | out.variable | check_failures
02023-05-23 15:27:28.001[4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0][718013.75]0
12023-05-23 15:27:28.001[2.0, 2.5, 2170.0, 6361.0, 1.0, 0.0, 2.0, 3.0, 8.0, 2170.0, 0.0, 47.7109, -122.017, 2310.0, 7419.0, 6.0, 0.0, 0.0][615094.56]0
22023-05-23 15:27:28.001[3.0, 2.5, 1300.0, 812.0, 2.0, 0.0, 0.0, 3.0, 8.0, 880.0, 420.0, 47.5893, -122.317, 1300.0, 824.0, 6.0, 0.0, 0.0][448627.72]0
32023-05-23 15:27:28.001[4.0, 2.5, 2500.0, 8540.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2500.0, 0.0, 47.5759, -121.994, 2560.0, 8475.0, 24.0, 0.0, 0.0][758714.2]0
42023-05-23 15:27:28.001[3.0, 1.75, 2200.0, 11520.0, 1.0, 0.0, 0.0, 4.0, 7.0, 2200.0, 0.0, 47.7659, -122.341, 1690.0, 8038.0, 62.0, 0.0, 0.0][513264.7]0
...............
4872023-05-23 15:27:28.001[3.0, 1.5, 1030.0, 8414.0, 1.0, 0.0, 0.0, 4.0, 7.0, 1030.0, 0.0, 47.7654, -122.297, 1750.0, 8414.0, 47.0, 0.0, 0.0][340764.53]0
4882023-05-23 15:27:28.001[4.0, 2.75, 2450.0, 15002.0, 1.0, 0.0, 0.0, 5.0, 9.0, 2450.0, 0.0, 47.4268, -122.343, 2650.0, 15055.0, 40.0, 0.0, 0.0][508746.75]0
4892023-05-23 15:27:28.001[2.0, 1.0, 1010.0, 4000.0, 1.0, 0.0, 0.0, 3.0, 6.0, 1010.0, 0.0, 47.5536, -122.267, 1040.0, 4000.0, 103.0, 0.0, 0.0][435628.56]0
4902023-05-23 15:27:28.001[3.0, 2.5, 1330.0, 1200.0, 3.0, 0.0, 0.0, 3.0, 7.0, 1330.0, 0.0, 47.7034, -122.344, 1330.0, 1206.0, 12.0, 0.0, 0.0][342604.4]0
4912023-05-23 15:27:28.001[2.0, 1.75, 2770.0, 19700.0, 2.0, 0.0, 0.0, 3.0, 8.0, 1780.0, 990.0, 47.7581, -122.365, 2360.0, 9700.0, 31.0, 0.0, 0.0][536371.25]0

492 rows × 4 columns
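
The Warning: Pipeline log size limit exceeded message means only part of the log window was returned. A hedged sketch of requesting the full set with export_logs is shown below; the directory and file_prefix parameters are assumptions about the SDK's export_logs signature and may differ by version, so check the Wallaroo SDK reference before relying on them.

# Sketch only: export the full set of logs for the task's time window to local files.
# Parameter names below are assumptions; verify against the Wallaroo SDK reference.
pipeline.export_logs(
    directory="./logs",            # assumed: destination directory for the exported files
    file_prefix="simpletaskdemo",  # assumed: prefix for the exported file names
    start_datetime=task_start,
    end_datetime=task_end
)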

Scheduled Run Task Example

The other method of using tasks is as a scheduled run through the Orchestration run_scheduled(name, schedule, timeout, json_args). This sets up a task to run on a regular schedule as defined by the schedule parameter in the cron service format. For example:

schedule='42 * * * *'

Runs on the 42nd minute of every hour.

The following schedule runs every day at 12 noon from February 1 to February 15 2024 - and then ends.

schedule='0 0 12 1-15 2 2024'
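
For reference, the schedule string follows the cron field order of minute, hour, day of month, month, and day of week (with optional extra fields, as in the year-bounded example above). A few illustrative values, not used by this tutorial, are shown below.

# cron field order: minute hour day-of-month month day-of-week
schedule = "*/5 * * * *"   # every 5 minutes - the value used later in this example
schedule = "0 * * * *"     # at the top of every hour
schedule = "30 2 * * 1"    # at 02:30 every Monday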

For our example, we will create a scheduled task to run every 5 minutes, display the inference results, then use the task’s kill() method to keep the task from running any further.

It is recommended that orchestrations that have pipeline deploy or undeploy commands be spaced out no less than 5 minutes to prevent colliding with other tasks that use the same pipeline.

scheduled_task_start = datetime.datetime.now()

scheduled_task = orchestration.run_scheduled(name="simple_inference_schedule", 
                                             schedule="*/5 * * * *", 
                                             timeout=120, 
                                             json_args={"workspace_name": workspace_name, 
                                                        "pipeline_name": pipeline_name,
                                                        "connection_name": inference_connection_name
                                            })
while scheduled_task.status() != "started":
    display(scheduled_task.status())
    time.sleep(5)
'pending'
#wait 420 seconds to give the scheduled event time to finish
time.sleep(420)
scheduled_task_end = datetime.datetime.now()

pipeline.logs(start_datetime = scheduled_task_start, end_datetime = scheduled_task_end)
Warning: Pipeline log size limit exceeded. Please request logs using export_logs
time | in.tensor | out.variable | check_failures
02023-05-23 15:30:08.633[4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0][718013.75]0
12023-05-23 15:30:08.633[2.0, 2.5, 2170.0, 6361.0, 1.0, 0.0, 2.0, 3.0, 8.0, 2170.0, 0.0, 47.7109, -122.017, 2310.0, 7419.0, 6.0, 0.0, 0.0][615094.56]0
22023-05-23 15:30:08.633[3.0, 2.5, 1300.0, 812.0, 2.0, 0.0, 0.0, 3.0, 8.0, 880.0, 420.0, 47.5893, -122.317, 1300.0, 824.0, 6.0, 0.0, 0.0][448627.72]0
32023-05-23 15:30:08.633[4.0, 2.5, 2500.0, 8540.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2500.0, 0.0, 47.5759, -121.994, 2560.0, 8475.0, 24.0, 0.0, 0.0][758714.2]0
42023-05-23 15:30:08.633[3.0, 1.75, 2200.0, 11520.0, 1.0, 0.0, 0.0, 4.0, 7.0, 2200.0, 0.0, 47.7659, -122.341, 1690.0, 8038.0, 62.0, 0.0, 0.0][513264.7]0
...............
4872023-05-23 15:30:08.633[3.0, 1.5, 1030.0, 8414.0, 1.0, 0.0, 0.0, 4.0, 7.0, 1030.0, 0.0, 47.7654, -122.297, 1750.0, 8414.0, 47.0, 0.0, 0.0][340764.53]0
4882023-05-23 15:30:08.633[4.0, 2.75, 2450.0, 15002.0, 1.0, 0.0, 0.0, 5.0, 9.0, 2450.0, 0.0, 47.4268, -122.343, 2650.0, 15055.0, 40.0, 0.0, 0.0][508746.75]0
4892023-05-23 15:30:08.633[2.0, 1.0, 1010.0, 4000.0, 1.0, 0.0, 0.0, 3.0, 6.0, 1010.0, 0.0, 47.5536, -122.267, 1040.0, 4000.0, 103.0, 0.0, 0.0][435628.56]0
4902023-05-23 15:30:08.633[3.0, 2.5, 1330.0, 1200.0, 3.0, 0.0, 0.0, 3.0, 7.0, 1330.0, 0.0, 47.7034, -122.344, 1330.0, 1206.0, 12.0, 0.0, 0.0][342604.4]0
4912023-05-23 15:30:08.633[2.0, 1.75, 2770.0, 19700.0, 2.0, 0.0, 0.0, 3.0, 8.0, 1780.0, 990.0, 47.7581, -122.365, 2360.0, 9700.0, 31.0, 0.0, 0.0][536371.25]0

492 rows × 4 columns

Kill Task

With our testing complete, we will kill the scheduled task so it will not run again. First we’ll show all the tasks to verify that our task is there, then issue it the kill command.

wl.list_tasks()
id | name | last run status | type | active | schedule | created at | updated at
d0b6a83c-a3a1-41f0-98c9-92422d2544c4 | simple_inference_schedule | success | Scheduled Run | True | */5 * * * * | 2023-23-May 15:28:30 | 2023-23-May 15:28:31
74046e01-68ab-42a0-bcd2-3493cbc66576 | simpletaskdemo | success | Temporary Run | True | - | 2023-23-May 15:27:15 | 2023-23-May 15:27:26
scheduled_task.kill()
<ArbexStatus.PENDING_KILL: 'pending_kill'>
wl.list_tasks()
id | name | last run status | type | active | schedule | created at | updated at
74046e01-68ab-42a0-bcd2-3493cbc66576 | simpletaskdemo | success | Temporary Run | True | - | 2023-23-May 15:27:15 | 2023-23-May 15:27:26

Cleanup

With the tutorial complete, we can undeploy the pipeline and return the resources back to the Wallaroo instance.

pipeline.undeploy()
Waiting for undeployment - this will take up to 45s ..................................... ok
name | simpleorchestrationpipelinedtzw
created | 2023-05-23 15:26:00.268667+00:00
last_updated | 2023-05-23 15:26:00.568944+00:00
deployed | False
tags |
versions | 24f5d5e9-59fe-4440-9e08-78c0003226df, 9a5afba3-c664-4d57-8c08-fc072d3f549c
steps | simpleorchestrationmodeldtzw

4 - Wallaroo ML Workload Orchestration Google BigQuery with House Price Model Tutorial

A tutorial on using the ML Workload Orchestration with BigQuery and the house price model.

This can be downloaded as part of the Wallaroo Tutorials repository.

Wallaroo ML Workload Orchestration House Price Model Tutorial

This tutorial provides a quick set of methods and examples regarding Wallaroo Connections and Wallaroo ML Workload Orchestration. For full details, see the Wallaroo Documentation site.

Wallaroo provides Data Connections and ML Workload Orchestrations to provide organizations with a method of creating and managing automated tasks that can either be run on demand or a regular schedule.

Definitions

  • Orchestration: A set of instructions written as a python script with a requirements library. Orchestrations are uploaded to the Wallaroo instance as a .zip file.
  • Task: An implementation of an orchestration. Tasks are run either once when requested, on a repeating schedule, or as a service.
  • Connection: Definitions set by MLOps engineers that are used by other Wallaroo users for connection information to a data source. Usually paired with orchestrations.

This tutorial will focus on using Google BigQuery as the data source.

Tutorial Goals

The tutorial will demonstrate the following:

  1. Create a Wallaroo connection to retrieving information from a Google BigQuery source table.
  2. Create a Wallaroo connection to store inference results into a Google BigQuery destination table.
  3. Upload Wallaroo ML Workload Orchestration that supports BigQuery connections with the connection details.
  4. Run the orchestration once as a Run Once Task and verify that the inference request succeeded and the inference results were saved to the external data store.
  5. Schedule the orchestration as a Scheduled Task and verify that the inference request succeeded and the inference results were saved to the external data store.

Prerequisites

  • An installed Wallaroo instance.
  • The following Python libraries installed. These are included by default in a Wallaroo instance’s JupyterHub service.
    • os
    • wallaroo: The Wallaroo SDK. Included with the Wallaroo JupyterHub service by default.
    • pandas: Pandas, mainly used for Pandas DataFrame
    • pyarrow: PyArrow for Apache Arrow support
  • The following Python libraries. These are not included in a Wallaroo instance’s JupyterHub service.
    • google-cloud-bigquery: The Google BigQuery client library.
    • google-auth: Used to authenticate with the service account credentials.
    • db-dtypes: Used when converting BigQuery query results into pandas DataFrames.

Tutorial Resources

  • Models:
    • models/rf_model.onnx: A model that predicts house price values.
  • Data:
    • data/xtest-1.df.json and data/xtest-1k.df.json: DataFrame JSON inference inputs with 1 input and 1,000 inputs.
    • data/xtest-1k.arrow: Apache Arrow inference inputs with 1,000 inputs.
    • Sample inference inputs in CSV that can be imported into Google BigQuery.
      • data/xtest-1k.df.json: Random sample housing prices.
      • data/smallinputs.df.json: Sample housing prices that return results lower than $1.5 million.
      • data/biginputs.df.json: Sample housing prices that return results higher than $1.5 million.
    • SQL queries to create the inputs/outputs tables with schema.
      • ./resources/create_inputs_table.sql: Inputs table with schema.
      • ./resources/create_outputs_table.sql: Outputs table with schema.
      • ./resources/housrpricesga_inputs.avro: Avro container of inputs table.

Initial Steps

For this tutorial, we’ll create a workspace, upload our sample model and deploy a pipeline. We’ll perform some quick sample inferences to verify that everything is working.

Load Libraries

Here we’ll import the various libraries we’ll use for the tutorial.

import wallaroo
from wallaroo.object import EntityNotFoundError, RequiredAttributeMissing

# to display dataframe tables
from IPython.display import display
# used to display dataframe information without truncating
import pandas as pd
pd.set_option('display.max_colwidth', None)
import pyarrow as pa

import time
import json

# for Big Query connections
from google.cloud import bigquery
from google.oauth2 import service_account
import db_dtypes

# used for unique connection names

import string
import random

suffix= ''.join(random.choice(string.ascii_lowercase) for i in range(4))
wallaroo.__version__
'2023.2.0+dfca0605e'

Connect to the Wallaroo Instance

The first step is to connect to Wallaroo through the Wallaroo client. The Python library is included in the Wallaroo install and available through the Jupyter Hub interface provided with your Wallaroo environment.

This is accomplished using the wallaroo.Client() command, which provides a URL to grant the SDK permission to your specific Wallaroo environment. When displayed, enter the URL into a browser and confirm permissions. Store the connection into a variable that can be referenced later.

If logging into the Wallaroo instance through the internal JupyterHub service, use wl = wallaroo.Client(). For more information on Wallaroo Client settings, see the Client Connection guide.

# Login through local Wallaroo instance

wl = wallaroo.Client()

Variable Declaration

The following variables will be used for our BigQuery testing.

We’ll use two connections:

  • bigquery_input_connection: The connection that will draw inference input data from a BigQuery table.
  • bigquery_output_connection: The connection that will upload inference results into a BigQuery table.

Note that for the connection arguments, we’ll retrieve the information from the files ./bigquery_service_account_input_key.json and ./bigquery_service_account_output_key.json that include the service account key file (SAK) information, as well as the dataset and table used.

Field | Included in SAK
type | ✅
project_id | ✅
private_key_id | ✅
private_key | ✅
client_email | ✅
auth_uri | ✅
token_uri | ✅
auth_provider_x509_cert_url | ✅
client_x509_cert_url | ✅
dataset | 🚫
table | 🚫
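
As a reference, each key file is a standard Google service account key JSON with the dataset and table entries added by hand. Below is a sketch of the expected structure expressed as the Python dict that json.load returns, with placeholder values only; the real files are generated from your own Google Cloud project and are not part of this repository.

# Sketch of the structure expected in ./bigquery_service_account_input_key.json
# (placeholder values only). The first fields come from the Google-generated
# service account key; "dataset" and "table" are added for the Wallaroo connection.
bigquery_connection_input_argument = {
    "type": "service_account",
    "project_id": "my-gcp-project",
    "private_key_id": "<private key id>",
    "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n",
    "client_email": "my-service-account@my-gcp-project.iam.gserviceaccount.com",
    "auth_uri": "https://accounts.google.com/o/oauth2/auth",
    "token_uri": "https://oauth2.googleapis.com/token",
    "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
    "client_x509_cert_url": "<client cert url>",
    "dataset": "my_input_dataset",   # placeholder: BigQuery dataset holding the input table
    "table": "my_input_table"        # placeholder: BigQuery input table name
}
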
# Setting variables for later steps

workspace_name = f'bigqueryworkspace{suffix}'
pipeline_name = f'bigquerypipeline{suffix}'
model_name = f'bigquerymodel{suffix}'
model_file_name = './models/rf_model.onnx'

bigquery_connection_input_name = f'bigqueryhouseinputs{suffix}'
bigquery_connection_input_type = "BIGQUERY"
bigquery_connection_input_argument = json.load(open("./bigquery_service_account_input_key.json"))

bigquery_connection_output_name = f'bigqueryhouseoutputs{suffix}'
bigquery_connection_output_type = "BIGQUERY"
bigquery_connection_output_argument = json.load(open("./bigquery_service_account_output_key.json"))

Helper Methods

The following helper methods are used to either create or get workspaces, pipelines, and connections.

# helper methods to retrieve workspaces and pipelines

def get_workspace(name):
    workspace = None
    for ws in wl.list_workspaces():
        if ws.name() == name:
            workspace = ws
    if workspace is None:
        workspace = wl.create_workspace(name)
    return workspace

def get_pipeline(name):
    try:
        pipeline = wl.pipelines_by_name(name)[0]
    except EntityNotFoundError:
        pipeline = wl.build_pipeline(name)
    return pipeline

Create the Workspace and Pipeline

We’ll now create our workspace and pipeline for the tutorial. If this tutorial has been run previously, then this will retrieve the existing ones with the assumption they’re for use with this tutorial.

We’ll set the retrieved workspace as the current workspace in the SDK, so all commands will default to that workspace.

workspace = get_workspace(workspace_name)
wl.set_current_workspace(workspace)

pipeline = get_pipeline(pipeline_name)

Upload the Model and Deploy Pipeline

We’ll upload our model into our sample workspace, then add it as a pipeline step before deploying the pipeline so it’s ready to accept inference requests.

# Upload the model

housing_model_control = (wl.upload_model(model_name, 
                                         model_file_name, 
                                         framework=wallaroo.framework.Framework.ONNX)
                                         .configure(tensor_fields=["tensor"])
                        )

# Add the model as a pipeline step

pipeline.add_model_step(housing_model_control)
name | bigquerypipelinechbp
created | 2023-05-23 15:17:30.123708+00:00
last_updated | 2023-05-23 15:17:30.123708+00:00
deployed | (none)
tags |
versions | fefa4278-04e0-4d1a-8d5b-9cc9c5275832
steps |
#deploy the pipeline to set the pipeline steps
pipeline.deploy()
Waiting for deployment - this will take up to 45s ............ ok
name | bigquerypipelinechbp
created | 2023-05-23 15:17:30.123708+00:00
last_updated | 2023-05-23 15:17:30.428928+00:00
deployed | True
tags |
versions | 3dd0653a-12b0-4298-b91b-1e0b712716c5, fefa4278-04e0-4d1a-8d5b-9cc9c5275832
steps | bigquerymodelchbp

Create Connections

We will create the data source connection via the Wallaroo client command create_connection.

Connections are created with the Wallaroo client command create_connection with the following parameters.

Parameter | Type | Description
name | string (Required) | The name of the connection. This must be unique - if submitting the name of an existing connection it will return an error.
type | string (Required) | The user defined type of connection.
details | Dict (Required) | User defined configuration details for the data connection. These can be {'username':'dataperson', 'password':'datapassword', 'port': 3339}, or {'token':'abcde123==', 'host':'example.com', 'port:1234'}, or other user defined combinations.
  • IMPORTANT NOTE: Data connection names must be unique. Attempting to create a data connection with the same name as an existing data connection will result in an error.
connection_input = wl.create_connection(bigquery_connection_input_name, bigquery_connection_input_type, bigquery_connection_input_argument)
connection_output = wl.create_connection(bigquery_connection_output_name, bigquery_connection_output_type, bigquery_connection_output_argument)
wl.list_connections()
name | connection type | details | created at | linked workspaces
bigqueryhouseinputs | BIGQUERY | ***** | 2023-05-23T14:35:26.896064+00:00 | ['bigqueryworkspace']
bigqueryhouseoutputs | BIGQUERY | ***** | 2023-05-23T14:35:26.932685+00:00 | ['bigqueryworkspace']
bigqueryhouseinputs-jcw | BIGQUERY | ***** | 2023-05-23T14:37:22.103147+00:00 | ['bigqueryworkspace-jcw']
bigqueryhouseoutputs-jcw | BIGQUERY | ***** | 2023-05-23T14:37:22.141179+00:00 | ['bigqueryworkspace-jcw']
bigqueryhouseinputs{suffix} | BIGQUERY | ***** | 2023-05-23T15:05:29.850628+00:00 | ['bigqueryworkspacekbcy']
bigqueryhouseoutputs{suffix} | BIGQUERY | ***** | 2023-05-23T15:05:30.298941+00:00 | ['bigqueryworkspacekbcy']
bigqueryforecastinputsrklr | BIGQUERY | ***** | 2023-05-23T15:05:58.206726+00:00 | ['bigquerystatsmodelworkspacerklr']
bigqueryforecastoutputsrklr | BIGQUERY | ***** | 2023-05-23T15:05:58.673528+00:00 | ['bigquerystatsmodelworkspacerklr']

Get Connection by Name

The Wallaroo client method get_connection(name) retrieves the connection that matches the name parameter. We’ll retrieve our connection and store it as inference_source_connection.

big_query_input_connection = wl.get_connection(name=bigquery_connection_input_name)
big_query_output_connection = wl.get_connection(name=bigquery_connection_output_name)
display(big_query_input_connection)
display(big_query_output_connection)
Field | Value
Name | bigqueryhouseinputs{suffix}
Connection Type | BIGQUERY
Details | *****
Created At | 2023-05-23T15:05:29.850628+00:00
Linked Workspaces | ['bigqueryworkspacekbcy']

Field | Value
Name | bigqueryhouseoutputs{suffix}
Connection Type | BIGQUERY
Details | *****
Created At | 2023-05-23T15:05:30.298941+00:00
Linked Workspaces | ['bigqueryworkspacekbcy']

Add Connection to Workspace

The method Workspace add_connection(connection_name) adds a Data Connection to a workspace, and takes the following parameters.

Parameter | Type | Description
name | string (Required) | The name of the Data Connection

We’ll add both connections to our sample workspace, then list the connections available to the workspace to confirm.

workspace.add_connection(bigquery_connection_input_name)
workspace.add_connection(bigquery_connection_output_name)

workspace.list_connections()
name | connection type | details | created at | linked workspaces
bigqueryhouseinputs{suffix} | BIGQUERY | ***** | 2023-05-23T15:05:29.850628+00:00 | ['bigqueryworkspacekbcy', 'bigqueryworkspacechbp']
bigqueryhouseoutputs{suffix} | BIGQUERY | ***** | 2023-05-23T15:05:30.298941+00:00 | ['bigqueryworkspacekbcy', 'bigqueryworkspacechbp']

Big Query Connection Inference Example

We can test the BigQuery connection with a simple inference to our deployed pipeline. We’ll request the data, format the table into a pandas DataFrame, then submit it for an inference request.

Create Google Credentials

From our BigQuery connection details, we’ll create the credentials for our BigQuery clients.

bigquery_input_credentials = service_account.Credentials.from_service_account_info(
    big_query_input_connection.details())

bigquery_output_credentials = service_account.Credentials.from_service_account_info(
    big_query_output_connection.details())

Connect to Google BigQuery

We can now generate a client from our connection details, specifying the project that was included in the big_query_connection details.

bigqueryinputclient = bigquery.Client(
    credentials=bigquery_input_credentials, 
    project=big_query_input_connection.details()['project_id']
)
bigqueryoutputclient = bigquery.Client(
    credentials=bigquery_output_credentials, 
    project=big_query_output_connection.details()['project_id']
)

Query Data

Now we’ll create our query and retrieve information from our dataset and table as defined in the file bigquery_service_account_key.json. The table is expected to be in the format of the file ./data/xtest-1k.df.json.

inference_dataframe_input = bigqueryinputclient.query(
        f"""
        SELECT tensor
        FROM {big_query_input_connection.details()['dataset']}.{big_query_input_connection.details()['table']}"""
    ).to_dataframe()
inference_dataframe_input.head(5)
tensor
0[4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0]
1[2.0, 2.5, 2170.0, 6361.0, 1.0, 0.0, 2.0, 3.0, 8.0, 2170.0, 0.0, 47.7109, -122.017, 2310.0, 7419.0, 6.0, 0.0, 0.0]
2[3.0, 2.5, 1300.0, 812.0, 2.0, 0.0, 0.0, 3.0, 8.0, 880.0, 420.0, 47.5893, -122.317, 1300.0, 824.0, 6.0, 0.0, 0.0]
3[4.0, 2.5, 2500.0, 8540.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2500.0, 0.0, 47.5759, -121.994, 2560.0, 8475.0, 24.0, 0.0, 0.0]
4[3.0, 1.75, 2200.0, 11520.0, 1.0, 0.0, 0.0, 4.0, 7.0, 2200.0, 0.0, 47.7659, -122.341, 1690.0, 8038.0, 62.0, 0.0, 0.0]

Sample Inference

With our data retrieved, we’ll perform an inference and display the results.

result = pipeline.infer(inference_dataframe_input)
display(result.head(5))
time | in.tensor | out.variable | check_failures
02023-05-23 15:17:47.498[4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0][718013.75]0
12023-05-23 15:17:47.498[2.0, 2.5, 2170.0, 6361.0, 1.0, 0.0, 2.0, 3.0, 8.0, 2170.0, 0.0, 47.7109, -122.017, 2310.0, 7419.0, 6.0, 0.0, 0.0][615094.56]0
22023-05-23 15:17:47.498[3.0, 2.5, 1300.0, 812.0, 2.0, 0.0, 0.0, 3.0, 8.0, 880.0, 420.0, 47.5893, -122.317, 1300.0, 824.0, 6.0, 0.0, 0.0][448627.72]0
32023-05-23 15:17:47.498[4.0, 2.5, 2500.0, 8540.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2500.0, 0.0, 47.5759, -121.994, 2560.0, 8475.0, 24.0, 0.0, 0.0][758714.2]0
42023-05-23 15:17:47.498[3.0, 1.75, 2200.0, 11520.0, 1.0, 0.0, 0.0, 4.0, 7.0, 2200.0, 0.0, 47.7659, -122.341, 1690.0, 8038.0, 62.0, 0.0, 0.0][513264.7]0

Upload the Results

With the query complete, we’ll upload the results back to the BigQuery dataset.

output_table = bigqueryoutputclient.get_table(f"{big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}")

bigqueryoutputclient.insert_rows_from_dataframe(
    output_table, 
    dataframe=result.rename(columns={"in.tensor":"in_tensor", "out.variable":"out_variable"})
)
[[], []]
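
The pair of empty lists returned by insert_rows_from_dataframe indicates that no rows were rejected; the BigQuery client returns one list of insert errors per chunk of rows, so any problems would appear in those lists.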

Wallaroo ML Workload Orchestration Example

With the pipeline deployed and our connections set, we will now generate our ML Workload Orchestration. See the Wallaroo ML Workload Orchestrations guide for full details.

Orchestrations are uploaded to the Wallaroo instance as a ZIP file with the following requirements:

Parameter | Type | Description
User Code | (Required) Python script as .py files | If main.py exists, then that will be used as the task entrypoint. Otherwise, the first main.py found in any subdirectory will be used as the entrypoint.
Python Library Requirements | (Optional) requirements.txt file in the requirements file format. | A standard Python requirements.txt for any dependencies to be provided in the task environment. The Wallaroo SDK will already be present and should not be included in the requirements.txt. Multiple requirements.txt files are not allowed.
Other artifacts | | Other artifacts such as files, data, or code to support the orchestration.

For our example, our orchestration will:

  1. Use the bigquery_remote_inference to open a connection to the input and output tables.
  2. Deploy the pipeline.
  3. Perform an inference with the input data.
  4. Save the inference results to the output table.
  5. Undeploy the pipeline.

This sample script is stored in bigquery_remote_inference/main.py with a requirements.txt file listing the specific libraries for the Google BigQuery connection, and packaged into the orchestration as ./bigquery_remote_inference/bigquery_remote_inference.zip. We’ll display the steps in uploading the orchestration to the Wallaroo instance.

Note that the orchestration assumes the pipeline is already deployed.
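
As a point of reference, a minimal sketch of what such a main.py might look like is shown below. This is not the packaged tutorial script: the workspace, pipeline, and connection names are hardcoded here purely for illustration, and the BigQuery calls mirror the cells earlier in this tutorial.

# main.py - illustrative sketch only, not the tutorial's packaged script
import wallaroo
from google.cloud import bigquery
from google.oauth2 import service_account
import db_dtypes  # imported as in the tutorial cells; used by to_dataframe()

wl = wallaroo.Client()

# Hardcoded for illustration; adjust to your own workspace, pipeline, and connection names.
workspace = [ws for ws in wl.list_workspaces() if ws.name() == "bigqueryworkspace"][0]
wl.set_current_workspace(workspace)
pipeline = wl.pipelines_by_name("bigquerypipeline")[0]

input_connection = wl.get_connection(name="bigqueryhouseinputs")
output_connection = wl.get_connection(name="bigqueryhouseoutputs")

# Build the BigQuery clients from the connection details, as shown earlier in this tutorial
input_client = bigquery.Client(
    credentials=service_account.Credentials.from_service_account_info(input_connection.details()),
    project=input_connection.details()['project_id'])
output_client = bigquery.Client(
    credentials=service_account.Credentials.from_service_account_info(output_connection.details()),
    project=output_connection.details()['project_id'])

pipeline.deploy()

# Query the input table, run the inference, then write the results to the output table
input_df = input_client.query(
    f"SELECT tensor FROM {input_connection.details()['dataset']}.{input_connection.details()['table']}"
).to_dataframe()
results = pipeline.infer(input_df)

output_table = output_client.get_table(
    f"{output_connection.details()['dataset']}.{output_connection.details()['table']}")
output_client.insert_rows_from_dataframe(
    output_table,
    dataframe=results.rename(columns={"in.tensor": "in_tensor", "out.variable": "out_variable"}))

pipeline.undeploy()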

Upload the Orchestration

Orchestrations are uploaded with the Wallaroo client upload_orchestration(path) method with the following parameters.

Parameter | Type | Description
path | string (Required) | The path to the ZIP file to be uploaded.

Once uploaded, the deployment will be prepared and any requirements will be downloaded and installed.

For this example, the orchestration ./bigquery_remote_inference/bigquery_remote_inference.zip will be uploaded and saved to the variable orchestration. Then we will loop until the uploaded orchestration’s status displays ready.

orchestration = wl.upload_orchestration(path="./bigquery_remote_inference/bigquery_remote_inference.zip")

while orchestration.status() != 'ready':
    print(orchestration.status())
    time.sleep(5)
pending_packaging
pending_packaging
packaging
packaging
packaging
packaging
packaging
packaging
packaging
packaging
packaging
wl.list_orchestrations()
id | name | status | filename | sha | created at | updated at
78ea6222-c102-4eb1-9705-166233a12257 | None | ready | bigquery_remote_inference.zip | 582f33...a6957c | 2023-23-May 15:17:48 | 2023-23-May 15:18:39

Task Management Tutorial

Once an Orchestration has the status ready, it can be run as a task. Tasks have the following run options.

Type | SDK Call | How triggered
Once | orchestration.run_once(name, json_args, timeout) | Task runs once and exits.
Scheduled | orchestration.run_scheduled(name, schedule, timeout, json_args) | User provides schedule. Task runs and exits whenever the schedule dictates.

Run Task Once

We’ll start with a Run Once task, generating it from our orchestration.

Tasks are generated and run once with the Orchestration run_once(name, json_args, timeout) method. Any arguments for the orchestration are passed in as a Dict. If there are no arguments, then an empty set {} is passed.

We’ll display the last 5 rows of our BigQuery output table, then start the task that will perform the same inference we did above.

task_inference_results = bigqueryoutputclient.query(
        f"""
        SELECT *
        FROM {big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}
        ORDER BY time DESC
        LIMIT 5
        """
    ).to_dataframe()

display(task_inference_results)
time | in_tensor | out_variable | check_failures
02023-05-23 15:17:47.498[4.0, 2.5, 2500.0, 8540.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2500.0, 0.0, 47.5759, -121.994, 2560.0, 8475.0, 24.0, 0.0, 0.0][758714.2]0
12023-05-23 15:17:47.498[4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0][718013.75]0
22023-05-23 15:17:47.498[2.0, 2.5, 2170.0, 6361.0, 1.0, 0.0, 2.0, 3.0, 8.0, 2170.0, 0.0, 47.7109, -122.017, 2310.0, 7419.0, 6.0, 0.0, 0.0][615094.56]0
32023-05-23 15:17:47.498[3.0, 1.75, 2200.0, 11520.0, 1.0, 0.0, 0.0, 4.0, 7.0, 2200.0, 0.0, 47.7659, -122.341, 1690.0, 8038.0, 62.0, 0.0, 0.0][513264.7]0
42023-05-23 15:17:47.498[3.0, 2.5, 1300.0, 812.0, 2.0, 0.0, 0.0, 3.0, 8.0, 880.0, 420.0, 47.5893, -122.317, 1300.0, 824.0, 6.0, 0.0, 0.0][448627.72]0
# Example: run once

import datetime
task_start = datetime.datetime.now()

task = orchestration.run_once(name="big query single run", json_args={})
task
Field | Value
ID | 59b20af0-4f2e-4371-9175-e0b69a94fb91
Name | big query single run
Last Run Status | unknown
Type | Temporary Run
Active | True
Schedule | -
Created At | 2023-23-May 15:18:46
Updated At | 2023-23-May 15:18:46

Task Status

The list of tasks in the Wallaroo instance is retrieved through the Wallaroo Client list_tasks() method. This returns an array of tasks with the following fields.

Parameter | Type | Description
id | string | The UUID identifier for the task.
last run status | string | The last reported status of the task. Values are:
  • pending: The task has not been started.
  • started: The task has been scheduled to execute.
  • pending_kill: The task kill command has been issued and the task is scheduled to be stopped.
type | string | The type of the task. Values are:
  • Temporary Run: The task runs once then stops.
  • Scheduled Run: The task repeats on a cron like schedule.
schedule | string | The schedule for the task. If a run once task, the schedule will be -.
created at | DateTime | The date and time the task was started.
updated at | DateTime | The date and time the task was updated.

For this example, we will check the status of the previously created task in a loop until it reaches the status started.

while task.status() != "started":
    display(task.status())
    time.sleep(5)
'pending'

'pending'

wl.list_tasks()
id | name | last run status | type | active | schedule | created at | updated at
59b20af0-4f2e-4371-9175-e0b69a94fb91 | big query single run | failure | Temporary Run | True | - | 2023-23-May 15:18:46 | 2023-23-May 15:18:51

Task Results

We can view the inferences from our logs and verify that new entries were added from our task. We’ll query the last 5 rows of our inference output table after a wait of 60 seconds.

time.sleep(60)

task_inference_results = bigqueryoutputclient.query(
        f"""
        SELECT *
        FROM {big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}
        ORDER BY time DESC LIMIT 5"""
    ).to_dataframe()

display(task_inference_results)
time | in_tensor | out_variable | check_failures
02023-05-23 15:17:47.498[4.0, 2.5, 2500.0, 8540.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2500.0, 0.0, 47.5759, -121.994, 2560.0, 8475.0, 24.0, 0.0, 0.0][758714.2]0
12023-05-23 15:17:47.498[4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0][718013.75]0
22023-05-23 15:17:47.498[2.0, 2.5, 2170.0, 6361.0, 1.0, 0.0, 2.0, 3.0, 8.0, 2170.0, 0.0, 47.7109, -122.017, 2310.0, 7419.0, 6.0, 0.0, 0.0][615094.56]0
32023-05-23 15:17:47.498[3.0, 1.75, 2200.0, 11520.0, 1.0, 0.0, 0.0, 4.0, 7.0, 2200.0, 0.0, 47.7659, -122.341, 1690.0, 8038.0, 62.0, 0.0, 0.0][513264.7]0
42023-05-23 15:17:47.498[3.0, 2.5, 1300.0, 812.0, 2.0, 0.0, 0.0, 3.0, 8.0, 880.0, 420.0, 47.5893, -122.317, 1300.0, 824.0, 6.0, 0.0, 0.0][448627.72]0

Scheduled Run Task Example

The other method of using tasks is as a scheduled run through the Orchestration run_scheduled(name, schedule, timeout, json_args). This sets up a task to run on a regular schedule as defined by the schedule parameter in the cron service format. For example:

schedule='42 * * * *'

Runs on the 42nd minute of every hour.

The following schedule runs every day at 12 noon from February 1 to February 15 2024 - and then ends.

schedule='0 0 12 1-15 2 2024'

For our example, we will create a scheduled task to run every 5 minutes, display the inference results, then use the task’s kill() method to keep the task from running any further.

It is recommended that orchestrations that have pipeline deploy or undeploy commands be spaced out no less than 5 minutes to prevent colliding with other tasks that use the same pipeline.

task_inference_results = bigqueryoutputclient.query(
        f"""
        SELECT *
        FROM {big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}
        ORDER BY time DESC LIMIT 5"""
    ).to_dataframe()

display(task_inference_results.tail(5))

scheduled_task = orchestration.run_scheduled(name="simple_inference_schedule", schedule="*/5 * * * *", timeout=120, json_args={})
time | in_tensor | out_variable | check_failures
02023-05-23 15:17:47.498[4.0, 2.5, 2500.0, 8540.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2500.0, 0.0, 47.5759, -121.994, 2560.0, 8475.0, 24.0, 0.0, 0.0][758714.2]0
12023-05-23 15:17:47.498[4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0][718013.75]0
22023-05-23 15:17:47.498[2.0, 2.5, 2170.0, 6361.0, 1.0, 0.0, 2.0, 3.0, 8.0, 2170.0, 0.0, 47.7109, -122.017, 2310.0, 7419.0, 6.0, 0.0, 0.0][615094.56]0
32023-05-23 15:17:47.498[3.0, 1.75, 2200.0, 11520.0, 1.0, 0.0, 0.0, 4.0, 7.0, 2200.0, 0.0, 47.7659, -122.341, 1690.0, 8038.0, 62.0, 0.0, 0.0][513264.7]0
42023-05-23 15:17:47.498[3.0, 2.5, 1300.0, 812.0, 2.0, 0.0, 0.0, 3.0, 8.0, 880.0, 420.0, 47.5893, -122.317, 1300.0, 824.0, 6.0, 0.0, 0.0][448627.72]0
while scheduled_task.status() != "started":
    display(scheduled_task.status())
    time.sleep(5)
'pending'
#wait 420 seconds to give the scheduled event time to finish
time.sleep(420)
task_inference_results = bigqueryoutputclient.query(
        f"""
        SELECT *
        FROM {big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}
        ORDER BY time DESC LIMIT 5"""
    ).to_dataframe()

display(task_inference_results.tail(5))
time | in_tensor | out_variable | check_failures
02023-05-23 15:17:47.498[4.0, 2.5, 2500.0, 8540.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2500.0, 0.0, 47.5759, -121.994, 2560.0, 8475.0, 24.0, 0.0, 0.0][758714.2]0
12023-05-23 15:17:47.498[4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0][718013.75]0
22023-05-23 15:17:47.498[2.0, 2.5, 2170.0, 6361.0, 1.0, 0.0, 2.0, 3.0, 8.0, 2170.0, 0.0, 47.7109, -122.017, 2310.0, 7419.0, 6.0, 0.0, 0.0][615094.56]0
32023-05-23 15:17:47.498[3.0, 1.75, 2200.0, 11520.0, 1.0, 0.0, 0.0, 4.0, 7.0, 2200.0, 0.0, 47.7659, -122.341, 1690.0, 8038.0, 62.0, 0.0, 0.0][513264.7]0
42023-05-23 15:17:47.498[3.0, 2.5, 1300.0, 812.0, 2.0, 0.0, 0.0, 3.0, 8.0, 880.0, 420.0, 47.5893, -122.317, 1300.0, 824.0, 6.0, 0.0, 0.0][448627.72]0

Kill Task

With our testing complete, we will kill the scheduled task so it will not run again. First we’ll show all the tasks to verify that our task is there, then issue it the kill command.

scheduled_task.kill()
<ArbexStatus.PENDING_KILL: 'pending_kill'>

Cleanup

With the tutorial complete, we can undeploy the pipeline and return the resources back to the Wallaroo instance.

pipeline.undeploy()
Waiting for undeployment - this will take up to 45s ..................................... ok
name | bigquerypipelinechbp
created | 2023-05-23 15:17:30.123708+00:00
last_updated | 2023-05-23 15:17:30.428928+00:00
deployed | False
tags |
versions | 3dd0653a-12b0-4298-b91b-1e0b712716c5, fefa4278-04e0-4d1a-8d5b-9cc9c5275832
steps | bigquerymodelchbp

5 - Wallaroo ML Workload Orchestration Google BigQuery with Statsmodel Forecast Tutorial

A tutorial on using the ML Workload Orchestration with BigQuery and the Statsmodel Forecast.

This can be downloaded as part of the Wallaroo Tutorials repository.

Wallaroo ML Workload Orchestrations and BigQuery Connections with Statsmodel Tutorial

This tutorial provides a quick set of methods and examples regarding Wallaroo Connections and Wallaroo ML Workload Orchestration. For full details, see the Wallaroo Documentation site.

Wallaroo provides Data Connections and ML Workload Orchestrations to provide organizations with a method of creating and managing automated tasks that can either be run on demand or a regular schedule.

Definitions

  • Orchestration: A set of instructions written as a python script with a requirements library. Orchestrations are uploaded to the Wallaroo instance as a .zip file.
  • Task: An implementation of an orchestration. Tasks are run either once when requested, on a repeating schedule, or as a service.
  • Connection: Definitions set by MLOps engineers that are used by other Wallaroo users for connection information to a data source. Usually paired with orchestrations.

This tutorial will focus on using Google BigQuery as the data source for supplying the inference data to perform inferences through a Statsmodel ML model.

Tutorial Goals

The tutorial will demonstrate the following:

  1. Create a Wallaroo connection to retrieving information from a Google BigQuery source table.
  2. Create a Wallaroo connection to store inference results into a Google BigQuery destination table.
  3. Upload Wallaroo ML Workload Orchestration that supports BigQuery connections with the connection details.
  4. Run the orchestration once as a Run Once Task and verify that the inference request succeeded and the inference results were saved to the external data store.
  5. Schedule the orchestration as a Scheduled Task and verify that the inference request succeeded and the inference results were saved to the external data store.

Prerequisites

  • An installed Wallaroo instance.
  • The following Python libraries installed. These are included by default in a Wallaroo instance’s JupyterHub service.
    • os
    • wallaroo: The Wallaroo SDK. Included with the Wallaroo JupyterHub service by default.
    • pandas: Pandas, mainly used for Pandas DataFrame
    • pyarrow: PyArrow for Apache Arrow support
  • The following Python libraries. These are not included in a Wallaroo instance’s JupyterHub service.
    • google-cloud-bigquery: The Google BigQuery client library.
    • google-auth: Used to authenticate with the service account credentials.
    • db-dtypes: Used when converting BigQuery query results into pandas DataFrames.

Tutorial Resources

  • Models:
    • models/bike_day_model.pkl: The statsmodel model predicts how many bikes will be rented on each of the next 7 days, based on the previous 7 days’ bike rentals, temperature, and wind speed. This model only accepts shapes (7,4) - 7 rows (representing the last 7 days) and 4 columns (representing the fields temp, holiday, workingday, windspeed). Additional files to support this example are:
    • infer.py: The inference script that is part of the statsmodel.
  • Data:
    • data/day.csv: Data used to train the sample statsmodel example.
    • data/bike_day_eval.json: Data used for inferences. This will be transferred to a BigQuery table as shown in the demonstration.
  • Resources:
    • resources/bigquery_service_account_input_key.json: Example service key to authenticate to a Google BigQuery project with the dataset and table used for the inference data.
    • resources/bigquery_service_account_output_key.json: Example service key to authenticate to a Google BigQuery project with the dataset and table used for the inference result data.
    • resources/statsmodel_forecast_inputs.sql: SQL script to create the inputs table schema, populated with the values from ./data/day.csv.
    • resources/statsmodel_forecast_outputs.sql: SQL script to create the outputs table schema.

Initial Steps

For this tutorial, we’ll create a workspace, upload our sample model and deploy a pipeline. We’ll perform some quick sample inferences to verify that everything is working.

Load Libraries

Here we’ll import the various libraries we’ll use for the tutorial.

import wallaroo
from wallaroo.object import EntityNotFoundError, RequiredAttributeMissing

# to display dataframe tables
from IPython.display import display
# used to display dataframe information without truncating
import pandas as pd
pd.set_option('display.max_colwidth', None)
import pyarrow as pa

import time
import json

# for Big Query connections
from google.cloud import bigquery
from google.oauth2 import service_account
import db_dtypes

import string
import random

suffix= ''.join(random.choice(string.ascii_lowercase) for i in range(4))
wallaroo.__version__
'2023.2.0+dfca0605e'

Connect to the Wallaroo Instance

The first step is to connect to Wallaroo through the Wallaroo client. The Python library is included in the Wallaroo install and available through the Jupyter Hub interface provided with your Wallaroo environment.

This is accomplished using the wallaroo.Client() command, which provides a URL to grant the SDK permission to your specific Wallaroo environment. When displayed, enter the URL into a browser and confirm permissions. Store the connection into a variable that can be referenced later.

If logging into the Wallaroo instance through the internal JupyterHub service, use wl = wallaroo.Client(). For more information on Wallaroo Client settings, see the Client Connection guide.

# Login through local Wallaroo instance

wl = wallaroo.Client()

Variable Declaration

The following variables will be used for our BigQuery testing.

We’ll use two connections:

  • bigquery_input_connection: The connection that will draw inference input data from a BigQuery table.
  • bigquery_output_connection: The connection that will upload inference results into a BigQuery table.

Note that for the connection arguments, we’ll retrieve the information from the files ./bigquery_service_account_input_key.json and ./bigquery_service_account_output_key.json that include the service account key file (SAK) information, as well as the dataset and table used.

Field | Included in SAK
type | ✅
project_id | ✅
private_key_id | ✅
private_key | ✅
client_email | ✅
auth_uri | ✅
token_uri | ✅
auth_provider_x509_cert_url | ✅
client_x509_cert_url | ✅
dataset | 🚫
table | 🚫
# Setting variables for later steps

workspace_name = f'bigquerystatsmodelworkspace{suffix}'
pipeline_name = f'bigquerystatsmodelpipeline{suffix}'
model_name = f'bigquerystatsmodelmodel{suffix}'
model_file_name = './models/bike_day_model.pkl'

bigquery_connection_input_name = f'bigqueryforecastinputs{suffix}'
bigquery_connection_input_type = "BIGQUERY"
bigquery_connection_input_argument = json.load(open('./resources/bigquery_service_account_input_key.json'))

bigquery_connection_output_name = f'bigqueryforecastoutputs{suffix}'
bigquery_connection_output_type = "BIGQUERY"
bigquery_connection_output_argument = json.load(open('./resources/bigquery_service_account_output_key.json'))

Helper Methods

The following helper methods are used to either create or get workspaces, pipelines, and connections.

# helper methods to retrieve workspaces and pipelines

def get_workspace(name):
    workspace = None
    for ws in wl.list_workspaces():
        if ws.name() == name:
            workspace = ws
    if workspace is None:
        workspace = wl.create_workspace(name)
    return workspace

def get_pipeline(name):
    try:
        pipeline = wl.pipelines_by_name(name)[0]
    except EntityNotFoundError:
        pipeline = wl.build_pipeline(name)
    return pipeline

Create the Workspace and Pipeline

We’ll now create our workspace and pipeline for the tutorial. If this tutorial has been run previously, then this will retrieve the existing ones with the assumption they’re for use with this tutorial.

We’ll set the retrieved workspace as the current workspace in the SDK, so all commands will default to that workspace.

workspace = get_workspace(workspace_name)
wl.set_current_workspace(workspace)

pipeline = get_pipeline(pipeline_name)

Upload the Model and Deploy Pipeline

We’ll upload our model into our sample workspace, then add it as a pipeline step before deploying the pipeline so it’s ready to accept inference requests.

# Upload the model

bike_day_model = (wl.upload_model(model_name, 
                                  model_file_name, 
                                  framework=wallaroo.framework.Framework.ONNX)
                                  .configure(runtime="python", tensor_fields=["tensor"])
                )

# Add the model as a pipeline step

pipeline.add_model_step(bike_day_model)
name | bigquerystatsmodelpipelinegztp
created | 2023-05-23 15:19:43.097185+00:00
last_updated | 2023-05-23 15:19:43.097185+00:00
deployed | (none)
tags |
versions | fa33af33-3cf3-43c9-8e2a-4f0b549d84bf
steps |
#deploy the pipeline
pipeline.deploy()
Waiting for deployment - this will take up to 45s ............. ok
name | bigquerystatsmodelpipelinegztp
created | 2023-05-23 15:19:43.097185+00:00
last_updated | 2023-05-23 15:19:43.390612+00:00
deployed | True
tags |
versions | ee07358d-b6d5-46f3-a5bc-f98baae7ddca, fa33af33-3cf3-43c9-8e2a-4f0b549d84bf
steps | bigquerystatsmodelmodelgztp

Sample Inferences

We’ll perform some quick sample inferences with the local file data to verify the pipeline deployed and is ready for inferences. Once done, we’ll undeploy the pipeline.

## perform inferences

results = pipeline.infer_from_file('./data/bike_day_eval.json', data_format="custom-json")
print(results)
[{'forecast': [1882.3784555157672, 2130.607915701861, 2340.84005381799, 2895.754978552066, 2163.657515565616, 1509.1792126509536, 2431.1838923957016]}]

Create Connections

We will create the data source connection via the Wallaroo client command create_connection.

Connections are created with the Wallaroo client command create_connection with the following parameters.

Parameter | Type | Description
name | string (Required) | The name of the connection. This must be unique - if submitting the name of an existing connection it will return an error.
type | string (Required) | The user defined type of connection.
details | Dict (Required) | User defined configuration details for the data connection. These can be {'username':'dataperson', 'password':'datapassword', 'port': 3339}, or {'token':'abcde123==', 'host':'example.com', 'port:1234'}, or other user defined combinations.
  • IMPORTANT NOTE: Data connection names must be unique. Attempting to create a data connection with the same name as an existing data connection will result in an error.

See the statsmodel_forecast_inputs and statsmodel_forecast_outputs details listed above for the table schema used for our example.

connection_input = wl.create_connection(bigquery_connection_input_name, bigquery_connection_input_type, bigquery_connection_input_argument)
connection_output = wl.create_connection(bigquery_connection_output_name, bigquery_connection_output_type, bigquery_connection_output_argument)

display(connection_input)
display(connection_output)
Field | Value
Name | bigqueryforecastinputsgztp
Connection Type | BIGQUERY
Details | *****
Created At | 2023-05-23T15:19:58.159691+00:00
Linked Workspaces | []

Field | Value
Name | bigqueryforecastoutputsgztp
Connection Type | BIGQUERY
Details | *****
Created At | 2023-05-23T15:19:58.195628+00:00
Linked Workspaces | []

Get Connection by Name

The Wallaroo client method get_connection(name) retrieves the connection that matches the name parameter. We’ll retrieve our connection and store it as inference_source_connection.

big_query_input_connection = wl.get_connection(name=bigquery_connection_input_name)
big_query_output_connection = wl.get_connection(name=bigquery_connection_output_name)
display(big_query_input_connection)
display(big_query_output_connection)
Field | Value
Name | bigqueryforecastinputsgztp
Connection Type | BIGQUERY
Details | *****
Created At | 2023-05-23T15:19:58.159691+00:00
Linked Workspaces | []

Field | Value
Name | bigqueryforecastoutputsgztp
Connection Type | BIGQUERY
Details | *****
Created At | 2023-05-23T15:19:58.195628+00:00
Linked Workspaces | []

Add Connection to Workspace

The method Workspace add_connection(connection_name) adds a Data Connection to a workspace, and takes the following parameters.

Parameter | Type | Description
name | string (Required) | The name of the Data Connection

We’ll add both connections to our sample workspace, then list the connections available to the workspace to confirm.

workspace.add_connection(bigquery_connection_input_name)
workspace.add_connection(bigquery_connection_output_name)

workspace.list_connections()
name | connection type | details | created at | linked workspaces
bigqueryforecastinputsgztp | BIGQUERY | ***** | 2023-05-23T15:19:58.159691+00:00 | ['bigquerystatsmodelworkspacegztp']
bigqueryforecastoutputsgztp | BIGQUERY | ***** | 2023-05-23T15:19:58.195628+00:00 | ['bigquerystatsmodelworkspacegztp']

Big Query Connection Inference Example

We can test the BigQuery connection with a simple inference to our deployed pipeline. We’ll request the data, format the table into a pandas DataFrame, then submit it for an inference request.

Create Google Credentials

From our BigQuery connection details, we’ll create the credentials for our BigQuery clients.

bigquery_input_credentials = service_account.Credentials.from_service_account_info(
    big_query_input_connection.details())

bigquery_output_credentials = service_account.Credentials.from_service_account_info(
    big_query_output_connection.details())

Connect to Google BigQuery

We can now generate a client from our connection details, specifying the project that was included in the big_query_connection details.

bigqueryinputclient = bigquery.Client(
    credentials=bigquery_input_credentials, 
    project=big_query_input_connection.details()['project_id']
)
bigqueryoutputclient = bigquery.Client(
    credentials=bigquery_output_credentials, 
    project=big_query_output_connection.details()['project_id']
)

Query Data

Now we’ll create our query and retrieve information from our dataset and table as defined in the file bigquery_service_account_key.json.

We’ll grab the last 7 days of data - with every record assumed to be one day - and use that for our inference request.

inference_dataframe_input = bigqueryinputclient.query(
        f"""
        (select dteday, temp, holiday, workingday, windspeed
        FROM {big_query_input_connection.details()['dataset']}.{big_query_input_connection.details()['table']}
        ORDER BY dteday DESC LIMIT 7)
        ORDER BY dteday
        """
    ).to_dataframe().drop(columns=['dteday'])
# convert to a dict, show the first 7 rows
display(inference_dataframe_input.to_dict())
{'temp': {0: 0.291304,
  1: 0.243333,
  2: 0.254167,
  3: 0.253333,
  4: 0.253333,
  5: 0.255833,
  6: 0.215833},
 'holiday': {0: 1, 1: 0, 2: 0, 3: 0, 4: 0, 5: 0, 6: 0},
 'workingday': {0: 0, 1: 1, 2: 1, 3: 1, 4: 0, 5: 0, 6: 1},
 'windspeed': {0: 0.168726,
  1: 0.316546,
  2: 0.350133,
  3: 0.155471,
  4: 0.124383,
  5: 0.350754,
  6: 0.154846}}

Sample Inference

With our data retrieved, we’ll perform an inference and display the results.

#deploy the pipeline
pipeline.deploy()
 ok
name | bigquerystatsmodelpipelinegztp
created | 2023-05-23 15:19:43.097185+00:00
last_updated | 2023-05-23 15:20:00.500498+00:00
deployed | True
tags |
versions | f0c22a0a-7e6a-4d49-91ba-e93daf575e6b, ee07358d-b6d5-46f3-a5bc-f98baae7ddca, fa33af33-3cf3-43c9-8e2a-4f0b549d84bf
steps | bigquerystatsmodelmodelgztp
results = pipeline.infer(inference_dataframe_input.to_dict())
display(results[0]['forecast'])
[1231.2556997246595,
 1627.3643469089343,
 1674.3769827243134,
 1621.9273295873882,
 1140.7465817903185,
 1211.5223974364667,
 1457.1896450382922]

Upload the Results

With the query complete, we’ll upload the results back to the BigQuery dataset.

output_table = bigqueryoutputclient.get_table(f"{big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}")

job = bigqueryoutputclient.query(
        f"""
        INSERT {big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}
        VALUES
        (current_timestamp(), "{results[0]['forecast']}")
        """
    )
# Get the last insert to the output table to verify
# wait 10 seconds for the insert to finish
time.sleep(10)
task_inference_results = bigqueryoutputclient.query(
        f"""
        SELECT *
        FROM {big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}
        ORDER BY date DESC
        LIMIT 5
        """
    ).to_dataframe()

display(task_inference_results)
dateforecast
02023-05-23 15:20:01.622083+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]
12023-05-23 15:06:06.678170+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]
22023-05-23 14:57:00.200596+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]
32023-05-19 22:39:01.932839+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]
42023-05-19 22:33:27.655703+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]

Wallaroo ML Workload Orchestration Example

With the pipeline deployed and our connections set, we will now generate our ML Workload Orchestration. See the Wallaroo ML Workload Orchestrations guide for full details.

Orchestrations are uploaded to the Wallaroo instance as a ZIP file with the following requirements:

| Parameter | Type | Description |
|---|---|---|
| User Code | (Required) Python script as .py files | If main.py exists, then that will be used as the task entrypoint. Otherwise, the first main.py found in any subdirectory will be used as the entrypoint. |
| Python Library Requirements | (Optional) requirements.txt file in the requirements file format. | A standard Python requirements.txt for any dependencies to be provided in the task environment. The Wallaroo SDK will already be present and should not be included in the requirements.txt. Multiple requirements.txt files are not allowed. |
| Other artifacts | | Other artifacts such as files, data, or code to support the orchestration. |

For our example, our orchestration will:

  1. Use the bigquery_remote_inference to open a connection to the input and output tables.
  2. Deploy the pipeline.
  3. Perform an inference with the input data.
  4. Save the inference results to the output table.
  5. Undeploy the pipeline.

This sample script is stored in bigquery_statsmodel_remote_inference/main.py with a requirements.txt file containing the specific libraries for the Google BigQuery connection, and is packaged into the orchestration as ./bigquery_statsmodel_remote_inference/bigquery_statsmodel_remote_inference.zip. We’ll display the steps in uploading the orchestration to the Wallaroo instance.

Note that the orchestration assumes the pipeline is already deployed.
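The packaged main.py itself is not reproduced in this tutorial, but a minimal sketch of the kind of script described above might look like the following. It reuses the same Wallaroo SDK and google-cloud-bigquery calls shown earlier in this tutorial; the workspace, pipeline, and connection names are illustrative placeholders rather than the tutorial's actual values.

import wallaroo
from google.cloud import bigquery
from google.oauth2 import service_account

wl = wallaroo.Client()

# Illustrative names - the packaged script may read these from task arguments instead.
workspace_name = "bigquerystatsmodelworkspace"
pipeline_name = "bigquerystatsmodelpipeline"
input_connection_name = "bigqueryforecastinputs"
output_connection_name = "bigqueryforecastoutputs"

workspace = [ws for ws in wl.list_workspaces() if ws.name() == workspace_name][0]
wl.set_current_workspace(workspace)
pipeline = wl.pipelines_by_name(pipeline_name)[0]

# 1. Open the input and output connections.
input_connection = wl.get_connection(input_connection_name)
output_connection = wl.get_connection(output_connection_name)

input_client = bigquery.Client(
    credentials=service_account.Credentials.from_service_account_info(input_connection.details()),
    project=input_connection.details()['project_id'])
output_client = bigquery.Client(
    credentials=service_account.Credentials.from_service_account_info(output_connection.details()),
    project=output_connection.details()['project_id'])

# 2. Deploy the pipeline and 3. perform an inference with the last 7 days of input data.
pipeline.deploy()
inference_dataframe_input = input_client.query(f"""
    (select dteday, temp, holiday, workingday, windspeed
    FROM {input_connection.details()['dataset']}.{input_connection.details()['table']}
    ORDER BY dteday DESC LIMIT 7)
    ORDER BY dteday
    """).to_dataframe().drop(columns=['dteday'])
results = pipeline.infer(inference_dataframe_input.to_dict())

# 4. Save the inference results to the output table, then 5. undeploy the pipeline.
output_client.query(f"""
    INSERT {output_connection.details()['dataset']}.{output_connection.details()['table']}
    VALUES (current_timestamp(), "{results[0]['forecast']}")
    """)
pipeline.undeploy()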

Upload the Orchestration

Orchestrations are uploaded with the Wallaroo client upload_orchestration(path) method with the following parameters.

| Parameter | Type | Description |
|---|---|---|
| path | string (Required) | The path to the ZIP file to be uploaded. |

Once uploaded, the orchestration will be packaged for the Wallaroo instance and any requirements will be downloaded and installed.

For this example, the orchestration ./bigquery_remote_inference/bigquery_remote_inference.zip will be uploaded and saved to the variable orchestration. Then we will loop until the orchestration status is ready.

pipeline.deploy()
 ok
namebigquerystatsmodelpipelinegztp
created2023-05-23 15:19:43.097185+00:00
last_updated2023-05-23 15:20:13.581488+00:00
deployedTrue
tags
versions2af13cbf-2dae-4dc9-85fa-ae06c04b6a54, f0c22a0a-7e6a-4d49-91ba-e93daf575e6b, ee07358d-b6d5-46f3-a5bc-f98baae7ddca, fa33af33-3cf3-43c9-8e2a-4f0b549d84bf
stepsbigquerystatsmodelmodelgztp
orchestration = wl.upload_orchestration(path="./bigquery_remote_inference/bigquery_remote_inference.zip")

while orchestration.status() != 'ready':
    print(orchestration.status())
    time.sleep(5)
pending_packaging
pending_packaging
packaging
packaging
packaging
packaging
packaging
packaging
packaging
packaging
packaging
wl.list_orchestrations()
idnamestatusfilenameshacreated atupdated at
2186543f-f7f6-46b2-8334-63d73ddb0204Nonereadybigquery_remote_inference.zip66945c...e282592023-23-May 15:20:132023-23-May 15:21:06

Task Management Tutorial

Once an Orchestration has the status ready, it can be run as a task. Tasks have the following run options.

| Type | SDK Call | How triggered |
|---|---|---|
| Once | orchestration.run_once(name, json_args, timeout) | Task runs once and exits. |
| Scheduled | orchestration.run_scheduled(name, schedule, timeout, json_args) | User provides the schedule. The task runs and exits whenever the schedule dictates. |

Run Task Once

We’ll generate a Run Once task from our orchestration and run it.

Tasks are generated and run once with the Orchestration run_once(name, json_args, timeout) method. Any arguments for the orchestration are passed in as a Dict. If there are no arguments, then an empty set {} is passed.

We’ll display the last 5 rows of our BigQuery output table, then start the task that will perform the same inference we did above.

# Get the last insert to the output table to verify

task_inference_results = bigqueryoutputclient.query(
        f"""
        SELECT *
        FROM {big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}
        ORDER BY date DESC
        LIMIT 5
        """
    ).to_dataframe()

display(task_inference_results)
dateforecast
02023-05-23 15:20:01.622083+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]
12023-05-23 15:06:06.678170+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]
22023-05-23 14:57:00.200596+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]
32023-05-19 22:39:01.932839+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]
42023-05-19 22:33:27.655703+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]
# Example: run once
task = orchestration.run_once(name="big query statsmodel run once", json_args={})
task
FieldValue
IDedf924ae-5a17-4596-adbe-b888b7102d33
Namebig query statsmodel run once
Last Run Statusunknown
TypeTemporary Run
ActiveTrue
Schedule-
Created At2023-23-May 15:21:11
Updated At2023-23-May 15:21:11

Task Status

The list of tasks in the Wallaroo instance is retrieved through the Wallaroo Client list_tasks() method. This returns an array of the following fields.

| Parameter | Type | Description |
|---|---|---|
| id | string | The UUID identifier for the task. |
| last run status | string | The last reported status of the task. Values are: pending (the task has not been started), started (the task has been scheduled to execute), and pending_kill (the task kill command has been issued and the task is scheduled to be stopped). |
| type | string | The type of the task. Values are: Temporary Run (the task runs once then stops) and Scheduled Run (the task repeats on a cron-like schedule). |
| created at | DateTime | The date and time the task was started. |
| updated at | DateTime | The date and time the task was updated. |

For this example, the status of the previously created task will be displayed, then looped until it has reached the status started.

while task.status() != "started":
    display(task.status())
    time.sleep(5)
'pending'

wl.list_tasks()
idnamelast run statustypeactiveschedulecreated atupdated at
edf924ae-5a17-4596-adbe-b888b7102d33big query statsmodel run oncefailureTemporary RunTrue-2023-23-May 15:21:112023-23-May 15:21:16

Task Results

We can view the inferences from our logs and verify that new entries were added from our task. We’ll query the last 5 rows of our inference output table after waiting 30 seconds for the task to complete.

time.sleep(30)

# Get the last insert to the output table to verify

task_inference_results = bigqueryoutputclient.query(
        f"""
        SELECT *
        FROM {big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}
        ORDER BY date DESC
        LIMIT 5
        """
    ).to_dataframe()

display(task_inference_results)
dateforecast
02023-05-23 15:20:01.622083+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]
12023-05-23 15:06:06.678170+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]
22023-05-23 14:57:00.200596+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]
32023-05-19 22:39:01.932839+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]
42023-05-19 22:33:27.655703+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]

Scheduled Run Task Example

The other method of using tasks is as a scheduled run through the Orchestration run_scheduled(name, schedule, timeout, json_args) method. This sets up a task to run on a regular schedule as defined by the schedule parameter, in the cron service format. For example:

schedule='42 * * * *'

Runs on the 42nd minute of every hour.

The following schedule runs every day at 12 noon from February 1 to February 15 2024 - and then ends.

schedule='0 0 12 1-15 2 2024'
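As a minimal sketch (assuming an orchestration has already been uploaded and saved to the variable orchestration, as shown later in this tutorial), either schedule string is simply passed as the schedule parameter of run_scheduled:

# Sketch only: schedule the orchestration for the 42nd minute of every hour.
hourly_task = orchestration.run_scheduled(
    name="hourly example",      # illustrative task name
    schedule="42 * * * *",
    timeout=120,
    json_args={}
)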

For our example, we will create a scheduled task to run every 1 minute, display the inference results, then use the Task kill() method to keep the task from running any further.

task_inference_results = bigqueryoutputclient.query(
        f"""
        SELECT *
        FROM {big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}
        ORDER BY date DESC
        LIMIT 5
        """
    ).to_dataframe()

display(task_inference_results)

scheduled_task = orchestration.run_scheduled(name="simple_statsmodel_inference_schedule", schedule="*/1 * * * *", timeout=120, json_args={})
dateforecast
02023-05-23 15:20:01.622083+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]
12023-05-23 15:06:06.678170+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]
22023-05-23 14:57:00.200596+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]
32023-05-19 22:39:01.932839+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]
42023-05-19 22:33:27.655703+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]
while scheduled_task.status() != "started":
    display(scheduled_task.status())
    time.sleep(5)
'pending'
# wait 60 seconds to give the scheduled task time to run
time.sleep(60)
task_inference_results = bigqueryoutputclient.query(
        f"""
        SELECT *
        FROM {big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}
        ORDER BY date DESC
        LIMIT 5
        """
    ).to_dataframe()

display(task_inference_results)
dateforecast
02023-05-23 15:20:01.622083+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]
12023-05-23 15:06:06.678170+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]
22023-05-23 14:57:00.200596+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]
32023-05-19 22:39:01.932839+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]
42023-05-19 22:33:27.655703+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]

Kill Task

With our testing complete, we will kill the scheduled task by issuing it the kill command so it will not run again.

scheduled_task.kill()
<ArbexStatus.PENDING_KILL: 'pending_kill'>

Running Task with Custom Parameters

Right now, our task assumes the workspace, pipeline, and connections all have the names we defined above. For this example, we’ll set up a new pipeline with the same pipeline step, but name it bigquerystatsmodelpipeline02.

When we create our task, we’ll add that pipeline name as an argument to our task. Within our orchestrations main.py there is a code block that takes in the task arguments, then sets the pipeline name:

arguments = wl.task_args()

if "pipeline_name" in arguments:
    pipeline_name = arguments['pipeline_name']
else:
    pipeline_name = "bigquerystatsmodelpipeline"

We’ll pass along our new pipeline name as { "pipeline_name": "bigquerystatsmodelpipeline02" } and track the task progress as before.

newpipeline_name = 'bigquerystatsmodelpipeline02'

pipeline02 = get_pipeline(newpipeline_name)
# add the model as the pipeline step
pipeline02.add_model_step(bike_day_model)
namebigquerystatsmodelpipeline02
created2023-05-23 15:23:02.136077+00:00
last_updated2023-05-23 15:23:02.136077+00:00
deployed(none)
tags
versionsb2c56497-7a50-434a-8973-6d91af80f143
steps
# required to set the pipeline steps
pipeline02.deploy()
Waiting for deployment - this will take up to 45s ........ ok
namebigquerystatsmodelpipeline02
created2023-05-23 15:23:02.136077+00:00
last_updated2023-05-23 15:24:48.381545+00:00
deployedTrue
tags
versions9dc1f8c5-e9ab-48e7-9b54-290fed5bb28c, 1b44f058-9ab6-44dd-8de6-c52acfdf4fc5, b2c56497-7a50-434a-8973-6d91af80f143
stepsbigquerystatsmodelmodelgztp
# Get the last insert to the output table to verify

task_inference_results = bigqueryoutputclient.query(
        f"""
        SELECT *
        FROM {big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}
        ORDER BY date DESC
        LIMIT 5
        """
    ).to_dataframe()

display(task_inference_results)

# Generate the run once task with the new parameter
task = orchestration.run_once(name="parameter sample", json_args={ "pipeline_name": newpipeline_name })
display(task)

# wait for the task to run
while task.status() != "started":
    display(task.status())
    time.sleep(5)
dateforecast
02023-05-23 15:20:01.622083+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]
12023-05-23 15:06:06.678170+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]
22023-05-23 14:57:00.200596+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]
32023-05-19 22:39:01.932839+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]
42023-05-19 22:33:27.655703+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]
FieldValue
ID2fb3cba9-d515-44da-bd9e-59661b4372ef
Nameparameter sample
Last Run Statusunknown
TypeTemporary Run
ActiveTrue
Schedule-
Created At2023-23-May 15:24:59
Updated At2023-23-May 15:24:59
'pending'

# wait 30 seconds then display the results
time.sleep(30)

task_inference_results = bigqueryoutputclient.query(
        f"""
        SELECT *
        FROM {big_query_output_connection.details()['dataset']}.{big_query_output_connection.details()['table']}
        ORDER BY date DESC
        LIMIT 5
        """
    ).to_dataframe()

display(task_inference_results)
dateforecast
02023-05-23 15:20:01.622083+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]
12023-05-23 15:06:06.678170+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]
22023-05-23 14:57:00.200596+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]
32023-05-19 22:39:01.932839+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]
42023-05-19 22:33:27.655703+00:00[1231.2556997246595, 1627.3643469089343, 1674.3769827243134, 1621.9273295873882, 1140.7465817903185, 1211.5223974364667, 1457.1896450382922]

Conclusion

With that, our tutorial is over. Please feel free to use this tutorial code in your own Wallaroo related projects. Our last task will be to undeploy our pipelines to restore the resources back to the Wallaroo instance.

pipeline.undeploy()
pipeline02.undeploy()
 ok
Waiting for undeployment - this will take up to 45s ..................................... ok
namebigquerystatsmodelpipeline02
created2023-05-23 15:23:02.136077+00:00
last_updated2023-05-23 15:24:48.381545+00:00
deployedFalse
tags
versions9dc1f8c5-e9ab-48e7-9b54-290fed5bb28c, 1b44f058-9ab6-44dd-8de6-c52acfdf4fc5, b2c56497-7a50-434a-8973-6d91af80f143
stepsbigquerystatsmodelmodelgztp

6 - Wallaroo ML Workload Orchestration Comprehensive Tutorial

A tutorial on using the ML Workload Orchestration for more examples of Wallaroo connections and ML Workload Orchestrations.

This can be downloaded as part of the Wallaroo Tutorials repository.

ML Workload Orchestration Comprehensive Tutorial

This tutorial provides a complete set of methods and examples regarding Wallaroo Connections and Wallaroo ML Workload Orchestration.

Wallaroo provides data connections, orchestrations, and tasks to provide organizations with a method of creating and managing automated tasks that can either be run on demand, on a regular schedule, or as a service so they respond to requests.

| Object | Description |
|---|---|
| Orchestration | A set of instructions written as a python script with a requirements library. Orchestrations are uploaded to the Wallaroo instance as a .zip file. |
| Task | An implementation of an orchestration. Tasks are run either once when requested, on a repeating schedule, or as a service. |
| Connection | Definitions set by MLOps engineers that are used by other Wallaroo users for connection information to a data source. Usually paired with orchestrations. |

A typical flow in the orchestration, task and connection life cycle is as follows (see the code sketch after this list):

  1. (Optional) A connection is defined with information such as username, connection URL, tokens, etc.
  2. One or more connections are applied to a workspace for users to implement in their code or orchestrations.
  3. An orchestration is created to perform some set instructions. For example:
    1. Deploy a pipeline, request data from an external service, store the results in an external database, then undeploy the pipeline.
    2. Download a ML Model then replace a current pipeline step with the new version.
    3. Collect log files from a deployed pipeline once every hour and submit it to a Kafka or other service.
  4. A task is created that specifies the orchestration to perform and the schedule:
    1. Run once.
    2. Run on a schedule (based on cron like settings).
    3. Run as a service to be run whenever requested.
  5. Once the use for a task is complete, it is killed and its schedule or service removed.
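
The following sketch walks through that life cycle using the SDK calls demonstrated in this tutorial; the connection details, file names, and cron schedule are placeholders only.

import wallaroo

wl = wallaroo.Client()

# 1. Define a connection, then 2. apply it to a workspace.
connection = wl.create_connection("sample_source",
                                  "HTTPFILE",
                                  {'host': 'https://example.com/data.arrow'})
workspace = wl.create_workspace("sample_workspace")
workspace.add_connection("sample_source")

# 3. Upload an orchestration (a .zip with main.py and requirements.txt).
orchestration = wl.upload_orchestration(path="./sample_orchestration.zip")

# 4. Create a task from the orchestration - run once or on a schedule.
once_task = orchestration.run_once(name="one shot", json_args={})
scheduled_task = orchestration.run_scheduled(name="nightly run",
                                             schedule="0 0 * * *",
                                             timeout=600,
                                             json_args={})

# 5. When the task is no longer needed, kill it to remove its schedule.
scheduled_task.kill()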

Tutorial Goals

The tutorial will demonstrate the following:

  1. Create a simple connection to retrieve an Apache Arrow table file from a GitHub registry.
  2. Create an orchestration that retrieves the Apache Arrow table file from the location defined by the connection, deploys a pipeline, performs an inference, then undeploys the pipeline.
  3. Implement the orchestration as a task that runs every minute.
  4. Display the logs from the pipeline after 5 minutes to verify the task is running.

Tutorial Required Libraries

The following libraries are required for this tutorial, and included by default in a Wallaroo instance’s JupyterHub service.

  • IMPORTANT NOTE: These libraries are already installed in the Wallaroo JupyterHub service. Do not uninstall and reinstall the Wallaroo SDK with the command below.

  • wallaroo: The Wallaroo SDK.

  • pandas: The pandas data analysis library.

  • pyarrow: The Apache Arrow Python library.

The specific versions used are set in the file ./resources/requirements.txt. Supported libraries are automatically installed with the pip or conda commands. For example, from the root of this tutorials folder:

pip install -r ./resources/requirements.txt

Initialization

The first step is to connect to a Wallaroo instance. We’ll load the libraries and set our client connection settings.

Workspace, Model and Pipeline Setup

For this tutorial, we’ll create a workspace, upload our sample model and deploy a pipeline. We’ll perform some quick sample inferences to verify that everything is working.

import wallaroo
from wallaroo.object import EntityNotFoundError

# to display dataframe tables
from IPython.display import display
# used to display dataframe information without truncating
import pandas as pd
pd.set_option('display.max_colwidth', None)
import pyarrow as pa

import requests

# Used to create unique workspace and pipeline names
import string
import random

# make a random 4 character suffix
suffix= ''.join(random.choice(string.ascii_lowercase) for i in range(4))
display(suffix)
'tgiq'

Connect to the Wallaroo Instance

The first step is to connect to Wallaroo through the Wallaroo client. The Python library is included in the Wallaroo install and available through the Jupyter Hub interface provided with your Wallaroo environment.

This is accomplished using the wallaroo.Client() command, which provides a URL to grant the SDK permission to your specific Wallaroo environment. When displayed, enter the URL into a browser and confirm permissions. Store the connection into a variable that can be referenced later.

If logging into the Wallaroo instance through the internal JupyterHub service, use wl = wallaroo.Client(). For more information on Wallaroo Client settings, see the Client Connection guide.

# Login through local Wallaroo instance

wl = wallaroo.Client()
# Setting variables for later steps

workspace_name = f'orchestrationworkspace{suffix}'
pipeline_name = f'orchestrationpipeline{suffix}'
model_name = f'orchestrationmodel{suffix}'
model_file_name = './models/rf_model.onnx'
connection_name = f'houseprice_arrow_table{suffix}'

Helper Methods

The following helper methods are used to either create or get workspaces and pipelines.

# helper methods to retrieve workspaces and pipelines

def get_workspace(name):
    workspace = None
    for ws in wl.list_workspaces():
        if ws.name() == name:
            workspace= ws
    if(workspace == None):
        workspace = wl.create_workspace(name)
    return workspace

def get_pipeline(name):
    try:
        pipeline = wl.pipelines_by_name(name)[0]
    except EntityNotFoundError:
        pipeline = wl.build_pipeline(name)
    return pipeline

Create the Workspace and Pipeline

We’ll now create our workspace and pipeline for the tutorial. If this tutorial has been run previously, then this will retrieve the existing ones with the assumption they’re for use with this tutorial.

We’ll set the retrieved workspace as the current workspace in the SDK, so all commands will default to that workspace.

workspace = get_workspace(workspace_name)
wl.set_current_workspace(workspace)

pipeline = get_pipeline(pipeline_name)

Upload the Model and Deploy Pipeline

We’ll upload our model into our sample workspace, then add it as a pipeline step before deploying the pipeline so it’s ready to accept inference requests.

# Upload the model

housing_model_control = (wl.upload_model(model_name, 
                                         model_file_name, 
                                         framework=wallaroo.framework.Framework.ONNX)
                                         .configure(tensor_fields=["tensor"])
                        )

# Add the model as a pipeline step

pipeline.add_model_step(housing_model_control)
nameorchestrationpipelinetgiq
created2023-05-22 19:54:06.933674+00:00
last_updated2023-05-22 19:54:06.933674+00:00
deployed(none)
tags
versionsed5bf4b1-1d5d-4ff9-8a23-2c1e44a8e672
steps
#deploy the pipeline
pipeline.deploy()
Waiting for deployment - this will take up to 45s ........................ ok
nameorchestrationpipelinetgiq
created2023-05-22 19:54:06.933674+00:00
last_updated2023-05-22 19:54:08.008312+00:00
deployedTrue
tags
versionsa7336408-20ef-4b65-8167-c2f80c968a21, ed5bf4b1-1d5d-4ff9-8a23-2c1e44a8e672
stepsorchestrationmodeltgiq

Sample Inferences

We’ll perform some quick sample inferences using an Apache Arrow table as the input. Once that’s finished, we’ll undeploy the pipeline and return the resources back to the Wallaroo instance.

# sample inferences

batch_inferences = pipeline.infer_from_file('./data/xtest-1k.arrow')

large_inference_result =  batch_inferences.to_pandas()
display(large_inference_result.head(20))
timein.tensorout.variablecheck_failures
02023-05-22 19:54:33.671[4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0][718013.75]0
12023-05-22 19:54:33.671[2.0, 2.5, 2170.0, 6361.0, 1.0, 0.0, 2.0, 3.0, 8.0, 2170.0, 0.0, 47.7109, -122.017, 2310.0, 7419.0, 6.0, 0.0, 0.0][615094.56]0
22023-05-22 19:54:33.671[3.0, 2.5, 1300.0, 812.0, 2.0, 0.0, 0.0, 3.0, 8.0, 880.0, 420.0, 47.5893, -122.317, 1300.0, 824.0, 6.0, 0.0, 0.0][448627.72]0
32023-05-22 19:54:33.671[4.0, 2.5, 2500.0, 8540.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2500.0, 0.0, 47.5759, -121.994, 2560.0, 8475.0, 24.0, 0.0, 0.0][758714.2]0
42023-05-22 19:54:33.671[3.0, 1.75, 2200.0, 11520.0, 1.0, 0.0, 0.0, 4.0, 7.0, 2200.0, 0.0, 47.7659, -122.341, 1690.0, 8038.0, 62.0, 0.0, 0.0][513264.7]0
52023-05-22 19:54:33.671[3.0, 2.0, 2140.0, 4923.0, 1.0, 0.0, 0.0, 4.0, 8.0, 1070.0, 1070.0, 47.6902, -122.339, 1470.0, 4923.0, 86.0, 0.0, 0.0][668288.0]0
62023-05-22 19:54:33.671[4.0, 3.5, 3590.0, 5334.0, 2.0, 0.0, 2.0, 3.0, 9.0, 3140.0, 450.0, 47.6763, -122.267, 2100.0, 6250.0, 9.0, 0.0, 0.0][1004846.5]0
72023-05-22 19:54:33.671[3.0, 2.0, 1280.0, 960.0, 2.0, 0.0, 0.0, 3.0, 9.0, 1040.0, 240.0, 47.602, -122.311, 1280.0, 1173.0, 0.0, 0.0, 0.0][684577.2]0
82023-05-22 19:54:33.671[4.0, 2.5, 2820.0, 15000.0, 2.0, 0.0, 0.0, 4.0, 9.0, 2820.0, 0.0, 47.7255, -122.101, 2440.0, 15000.0, 29.0, 0.0, 0.0][727898.1]0
92023-05-22 19:54:33.671[3.0, 2.25, 1790.0, 11393.0, 1.0, 0.0, 0.0, 3.0, 8.0, 1790.0, 0.0, 47.6297, -122.099, 2290.0, 11894.0, 36.0, 0.0, 0.0][559631.1]0
102023-05-22 19:54:33.671[3.0, 1.5, 1010.0, 7683.0, 1.5, 0.0, 0.0, 5.0, 7.0, 1010.0, 0.0, 47.72, -122.318, 1550.0, 7271.0, 61.0, 0.0, 0.0][340764.53]0
112023-05-22 19:54:33.671[3.0, 2.0, 1270.0, 1323.0, 3.0, 0.0, 0.0, 3.0, 8.0, 1270.0, 0.0, 47.6934, -122.342, 1330.0, 1323.0, 8.0, 0.0, 0.0][442168.06]0
122023-05-22 19:54:33.671[4.0, 1.75, 2070.0, 9120.0, 1.0, 0.0, 0.0, 4.0, 7.0, 1250.0, 820.0, 47.6045, -122.123, 1650.0, 8400.0, 57.0, 0.0, 0.0][630865.6]0
132023-05-22 19:54:33.671[4.0, 1.0, 1620.0, 4080.0, 1.5, 0.0, 0.0, 3.0, 7.0, 1620.0, 0.0, 47.6696, -122.324, 1760.0, 4080.0, 91.0, 0.0, 0.0][559631.1]0
142023-05-22 19:54:33.671[4.0, 3.25, 3990.0, 9786.0, 2.0, 0.0, 0.0, 3.0, 9.0, 3990.0, 0.0, 47.6784, -122.026, 3920.0, 8200.0, 10.0, 0.0, 0.0][909441.1]0
152023-05-22 19:54:33.671[4.0, 2.0, 1780.0, 19843.0, 1.0, 0.0, 0.0, 3.0, 7.0, 1780.0, 0.0, 47.4414, -122.154, 2210.0, 13500.0, 52.0, 0.0, 0.0][313096.0]0
162023-05-22 19:54:33.671[4.0, 2.5, 2130.0, 6003.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2130.0, 0.0, 47.4518, -122.12, 1940.0, 4529.0, 11.0, 0.0, 0.0][404040.8]0
172023-05-22 19:54:33.671[3.0, 1.75, 1660.0, 10440.0, 1.0, 0.0, 0.0, 3.0, 7.0, 1040.0, 620.0, 47.4448, -121.77, 1240.0, 10380.0, 36.0, 0.0, 0.0][292859.5]0
182023-05-22 19:54:33.671[3.0, 2.5, 2110.0, 4118.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2110.0, 0.0, 47.3878, -122.153, 2110.0, 4044.0, 25.0, 0.0, 0.0][338357.88]0
192023-05-22 19:54:33.671[4.0, 2.25, 2200.0, 11250.0, 1.5, 0.0, 0.0, 5.0, 7.0, 1300.0, 900.0, 47.6845, -122.201, 2320.0, 10814.0, 94.0, 0.0, 0.0][682284.6]0

Create Wallaroo Connection

Connections are created at the Wallaroo instance level, typically by an MLOps or DevOps engineer, then applied to a workspace.

For this section:

  1. We will create a sample connection that just has a URL to the same Arrow table file we used in the previous step.
  2. We’ll apply the data connection to the workspace above.
  3. For a quick demonstration, we’ll use the connection to retrieve the Arrow table file and use it for a quick sample inference.

Create Connection

Connections are created with the Wallaroo client command create_connection with the following parameters.

| Parameter | Type | Description |
|---|---|---|
| name | string (Required) | The name of the connection. This must be unique - if submitting the name of an existing connection it will return an error. |
| type | string (Required) | The user defined type of connection. |
| details | Dict (Required) | User defined configuration details for the data connection. These can be {'username':'dataperson', 'password':'datapassword', 'port': 3339}, or {'token':'abcde123==', 'host':'example.com', 'port:1234'}, or other user defined combinations. |

We’ll create the connection named houseprice_arrow_table, set it to the type HTTPFILE, and provide the details as 'host':'https://github.com/WallarooLabs/Wallaroo_Tutorials/raw/main/wallaroo-testing-tutorials/houseprice-saga/data/xtest-1k.arrow' - the location for our sample Arrow table inference input.

wl.create_connection(connection_name, 
                  "HTTPFILE", 
                  {'host':'https://github.com/WallarooLabs/Wallaroo_Tutorials/raw/main/wallaroo-testing-tutorials/houseprice-saga/data/xtest-1k.arrow'}
                  )
FieldValue
Namehouseprice_arrow_tabletgiq
Connection TypeHTTPFILE
Details*****
Created At2023-05-22T19:54:33.723860+00:00
Linked Workspaces[]

List Data Connections

The Wallaroo Client list_connections() method lists all connections for the Wallaroo instance.

wl.list_connections()
nameconnection typedetailscreated atlinked workspaces
houseprice_arrow_tabletgiqHTTPFILE*****2023-05-22T19:54:33.723860+00:00[]

Add Connection to Workspace

The method Workspace add_connection(connection_name) adds a Data Connection to a workspace, and takes the following parameters.

| Parameter | Type | Description |
|---|---|---|
| name | string (Required) | The name of the Data Connection |

We’ll add this connection to our sample workspace.

workspace.add_connection(connection_name)

Get Connection

Connections are retrieved by the Wallaroo Client get_connection(name) method.

connection = wl.get_connection(connection_name)

Connection Details

The Connection method details() retrieves the connection details as a dict.

display(connection.details())
{'host': 'https://github.com/WallarooLabs/Wallaroo_Tutorials/raw/main/wallaroo-testing-tutorials/houseprice-saga/data/xtest-1k.arrow'}

Using a Connection Example

For this example, the connection will be used to retrieve the Apache Arrow file it references, convert it into an Apache Arrow table, then use that table for a sample inference.

# Deploy the pipeline 
pipeline.deploy()

# Retrieve the file
# set accept as apache arrow table
headers = {
    'Accept': 'application/vnd.apache.arrow.file'
}

response = requests.get(
                    connection.details()['host'], 
                    headers=headers
                )

# Arrow table is retrieved 
with pa.ipc.open_file(response.content) as reader:
    arrow_table = reader.read_all()

results = pipeline.infer(arrow_table)

result_table = results.to_pandas()
display(result_table.head(20))
 ok
timein.tensorout.variablecheck_failures
02023-05-22 19:54:34.320[4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0][718013.75]0
12023-05-22 19:54:34.320[2.0, 2.5, 2170.0, 6361.0, 1.0, 0.0, 2.0, 3.0, 8.0, 2170.0, 0.0, 47.7109, -122.017, 2310.0, 7419.0, 6.0, 0.0, 0.0][615094.56]0
22023-05-22 19:54:34.320[3.0, 2.5, 1300.0, 812.0, 2.0, 0.0, 0.0, 3.0, 8.0, 880.0, 420.0, 47.5893, -122.317, 1300.0, 824.0, 6.0, 0.0, 0.0][448627.72]0
32023-05-22 19:54:34.320[4.0, 2.5, 2500.0, 8540.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2500.0, 0.0, 47.5759, -121.994, 2560.0, 8475.0, 24.0, 0.0, 0.0][758714.2]0
42023-05-22 19:54:34.320[3.0, 1.75, 2200.0, 11520.0, 1.0, 0.0, 0.0, 4.0, 7.0, 2200.0, 0.0, 47.7659, -122.341, 1690.0, 8038.0, 62.0, 0.0, 0.0][513264.7]0
52023-05-22 19:54:34.320[3.0, 2.0, 2140.0, 4923.0, 1.0, 0.0, 0.0, 4.0, 8.0, 1070.0, 1070.0, 47.6902, -122.339, 1470.0, 4923.0, 86.0, 0.0, 0.0][668288.0]0
62023-05-22 19:54:34.320[4.0, 3.5, 3590.0, 5334.0, 2.0, 0.0, 2.0, 3.0, 9.0, 3140.0, 450.0, 47.6763, -122.267, 2100.0, 6250.0, 9.0, 0.0, 0.0][1004846.5]0
72023-05-22 19:54:34.320[3.0, 2.0, 1280.0, 960.0, 2.0, 0.0, 0.0, 3.0, 9.0, 1040.0, 240.0, 47.602, -122.311, 1280.0, 1173.0, 0.0, 0.0, 0.0][684577.2]0
82023-05-22 19:54:34.320[4.0, 2.5, 2820.0, 15000.0, 2.0, 0.0, 0.0, 4.0, 9.0, 2820.0, 0.0, 47.7255, -122.101, 2440.0, 15000.0, 29.0, 0.0, 0.0][727898.1]0
92023-05-22 19:54:34.320[3.0, 2.25, 1790.0, 11393.0, 1.0, 0.0, 0.0, 3.0, 8.0, 1790.0, 0.0, 47.6297, -122.099, 2290.0, 11894.0, 36.0, 0.0, 0.0][559631.1]0
102023-05-22 19:54:34.320[3.0, 1.5, 1010.0, 7683.0, 1.5, 0.0, 0.0, 5.0, 7.0, 1010.0, 0.0, 47.72, -122.318, 1550.0, 7271.0, 61.0, 0.0, 0.0][340764.53]0
112023-05-22 19:54:34.320[3.0, 2.0, 1270.0, 1323.0, 3.0, 0.0, 0.0, 3.0, 8.0, 1270.0, 0.0, 47.6934, -122.342, 1330.0, 1323.0, 8.0, 0.0, 0.0][442168.06]0
122023-05-22 19:54:34.320[4.0, 1.75, 2070.0, 9120.0, 1.0, 0.0, 0.0, 4.0, 7.0, 1250.0, 820.0, 47.6045, -122.123, 1650.0, 8400.0, 57.0, 0.0, 0.0][630865.6]0
132023-05-22 19:54:34.320[4.0, 1.0, 1620.0, 4080.0, 1.5, 0.0, 0.0, 3.0, 7.0, 1620.0, 0.0, 47.6696, -122.324, 1760.0, 4080.0, 91.0, 0.0, 0.0][559631.1]0
142023-05-22 19:54:34.320[4.0, 3.25, 3990.0, 9786.0, 2.0, 0.0, 0.0, 3.0, 9.0, 3990.0, 0.0, 47.6784, -122.026, 3920.0, 8200.0, 10.0, 0.0, 0.0][909441.1]0
152023-05-22 19:54:34.320[4.0, 2.0, 1780.0, 19843.0, 1.0, 0.0, 0.0, 3.0, 7.0, 1780.0, 0.0, 47.4414, -122.154, 2210.0, 13500.0, 52.0, 0.0, 0.0][313096.0]0
162023-05-22 19:54:34.320[4.0, 2.5, 2130.0, 6003.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2130.0, 0.0, 47.4518, -122.12, 1940.0, 4529.0, 11.0, 0.0, 0.0][404040.8]0
172023-05-22 19:54:34.320[3.0, 1.75, 1660.0, 10440.0, 1.0, 0.0, 0.0, 3.0, 7.0, 1040.0, 620.0, 47.4448, -121.77, 1240.0, 10380.0, 36.0, 0.0, 0.0][292859.5]0
182023-05-22 19:54:34.320[3.0, 2.5, 2110.0, 4118.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2110.0, 0.0, 47.3878, -122.153, 2110.0, 4044.0, 25.0, 0.0, 0.0][338357.88]0
192023-05-22 19:54:34.320[4.0, 2.25, 2200.0, 11250.0, 1.5, 0.0, 0.0, 5.0, 7.0, 1300.0, 900.0, 47.6845, -122.201, 2320.0, 10814.0, 94.0, 0.0, 0.0][682284.6]0

Remove Connection from Workspace

The Workspace method remove_connection(connection_name) removes the connection from the workspace, but does not delete the connection from the Wallaroo instance. This method takes the following parameters.

| Parameter | Type | Description |
|---|---|---|
| name | String (Required) | The name of the connection to be removed |

The previous connection will be removed from the workspace, then the workspace connections displayed to verify it has been removed.

workspace.remove_connection(connection_name)

display(workspace.list_connections())

(no connections)

Delete Connection

The Connection method delete_connection() removes the connection from the Wallaroo instance, along with its attachments to any workspaces it was connected to.

connection.delete_connection()

wl.list_connections()

(no connections)

Orchestration Tutorial

The next series of examples will build on what we just did. So far we have:

  • Deployed a pipeline, performed sample inferences with a local Apache Arrow file, displayed the results, then undeployed the pipeline.
  • Deployed a pipeline, used a Wallaroo connection’s details to retrieve a remote Apache Arrow file, performed inferences and displayed the results, then undeployed the pipeline.

For the orchestration tutorial, we’ll do the same thing, only packaged into a separate python script that is uploaded to the Wallaroo instance. We’ll then create a task from that orchestration and perform our sample inferences again.

Orchestration Requirements

Orchestrations are uploaded to the Wallaroo instance as a ZIP file with the following requirements:

  • The ZIP file should not contain any directories - only files at the top level.
| Parameter | Type | Description |
|---|---|---|
| User Code | (Required) Python script as .py files | Python scripts for the orchestration to run. If the file main.py exists, that will be the entrypoint. Otherwise, if only one .py exists, then that will be the entrypoint. |
| Python Library Requirements | (Required) requirements.txt file in the requirements file format. | This is in the root of the zip file, and there can only be one requirements.txt file for the orchestration. |
| Other artifacts | | Other artifacts such as files, data, or code to support the orchestration. |

Zip Instructions

In a terminal with the zip command, assemble artifacts as above and then create the archive. The zip command is included by default with the Wallaroo JupyterHub service.

zip commands take the following format, with {zipfilename}.zip as the zip file to save the artifacts to, and each file thereafter as the files to add to the archive.

zip {zipfilename}.zip file1 file2 file3 ...

For example, the following command will add the files main.py and requirements.txt into the file hello.zip.

$ zip hello.zip main.py requirements.txt 
  adding: main.py (deflated 47%)
  adding: requirements.txt (deflated 52%)
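
If a terminal isn’t convenient, the same archive could also be assembled from Python with the standard library’s zipfile module (a sketch, assuming main.py and requirements.txt are in the current directory):

import zipfile

# Build hello.zip with the orchestration files at the top level of the archive.
with zipfile.ZipFile("hello.zip", "w", compression=zipfile.ZIP_DEFLATED) as archive:
    archive.write("main.py")
    archive.write("requirements.txt")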

Orchestration Recommendations

The following recommendations will make using Wallaroo orchestrations easier:

  • The version of Python used should match the same version as in the Wallaroo JupyterHub service.
  • The version of the Wallaroo SDK should match the Wallaroo instance. For a 2023.2 Wallaroo instance, use the Wallaroo SDK version 2023.2.
  • Specify the version of pip dependencies.
  • The wallaroo.Client constructor auth_type argument is ignored. Using wallaroo.Client() is sufficient.
  • The following methods will assist with orchestrations (see the sketch after this list):
    • wallaroo.in_task() : Returns True if the code is running within an Orchestrator task.
    • wallaroo.task_args(): Returns a Dict of invocation-specific arguments passed to the run_ calls.
  • Use print commands so outputs are saved to the task’s log files.
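
A minimal sketch of how wallaroo.in_task() and wl.task_args() might be used inside an orchestration’s main.py; the pipeline_name argument and its default value are illustrative only.

import wallaroo

wl = wallaroo.Client()

# wallaroo.in_task() lets the same script run both as a task and interactively.
if wallaroo.in_task():
    arguments = wl.task_args()
    # print() output is captured in the task run logs.
    print(f"Running as a task with arguments: {arguments}")
    pipeline_name = arguments.get("pipeline_name", "orchestrationpipeline")
else:
    print("Running interactively outside of a task")
    pipeline_name = "orchestrationpipeline"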

Example requirements.txt file

dbt-bigquery==1.4.3
dbt-core==1.4.5
dbt-extractor==0.4.1
dbt-postgres==1.4.5
google-api-core==2.8.2
google-auth==2.11.0
google-auth-oauthlib==0.4.6
google-cloud-bigquery==3.3.2
google-cloud-bigquery-storage==2.15.0
google-cloud-core==2.3.2
google-cloud-storage==2.5.0
google-crc32c==1.5.0
google-pasta==0.2.0
google-resumable-media==2.3.3
googleapis-common-protos==1.56.4

Sample Orchestrator

The following orchestrator artifacts are in the directory ./remote_inference and include the file main.py with the following code:

import wallaroo
from wallaroo.object import EntityNotFoundError
import pandas as pd
import pyarrow as pa
import requests

wl = wallaroo.Client()

# Setting variables for later steps

# get the arguments
arguments = wl.task_args()

if "workspace_name" in arguments:
    workspace_name = arguments['workspace_name']
else:
    workspace_name="orchestrationworkspace"

if "pipeline_name" in arguments:
    pipeline_name = arguments['pipeline_name']
else:
    pipeline_name="orchestrationpipeline"

if "connection_name" in arguments:
    connection_name = arguments['connection_name']
else:
    connection_name = "houseprice_arrow_table"

# helper methods to retrieve workspaces and pipelines

def get_workspace(name):
    workspace = None
    for ws in wl.list_workspaces():
        if ws.name() == name:
            workspace= ws
    if(workspace == None):
        workspace = wl.create_workspace(name)
    return workspace

def get_pipeline(name):
    try:
        pipeline = wl.pipelines_by_name(name)[0]
    except EntityNotFoundError:
        pipeline = wl.build_pipeline(name)
    return pipeline

print(f"Getting the workspace {workspace_name}")
workspace = get_workspace(workspace_name)
wl.set_current_workspace(workspace)

print(f"Getting the pipeline {pipeline_name}")
pipeline = get_pipeline(pipeline_name)
pipeline.deploy()
# Get the connection - assuming it will be the only one

inference_source_connection = wl.get_connection(name=connection_name)

print(f"Getting arrow table file")
# Retrieve the file
# set accept as apache arrow table
headers = {
    'Accept': 'application/vnd.apache.arrow.file'
}

response = requests.get(
                    inference_source_connection.details()['host'], 
                    headers=headers
                )

# Arrow table is retrieved 
with pa.ipc.open_file(response.content) as reader:
    arrow_table = reader.read_all()

print("Inference time.  Displaying results after.")
# Perform the inference
result = pipeline.infer(arrow_table)
print(result)

pipeline.undeploy()

This is saved to the file ./remote_inference/remote_inference.zip.

Preparing the Wallaroo Instance

To prepare the Wallaroo instance, we’ll once again create the Wallaroo connection houseprice_arrow_table and apply it to the workspace.

wl.create_connection(connection_name, 
                  "HTTPFILE", 
                  {'host':'https://github.com/WallarooLabs/Wallaroo_Tutorials/raw/main/wallaroo-testing-tutorials/houseprice-saga/data/xtest-1k.arrow'}
                  )

workspace.add_connection(connection_name)

Upload the Orchestration

Orchestrations are uploaded with the Wallaroo client upload_orchestration(path) method with the following parameters.

| Parameter | Type | Description |
|---|---|---|
| path | string (Required) | The path to the ZIP file to be uploaded. |

Once uploaded, the orchestration will be packaged for the Wallaroo instance and any requirements will be downloaded and installed.

For this example, the orchestration ./remote_inference/remote_inference.zip will be uploaded and saved to the variable orchestration.

orchestration = wl.upload_orchestration(name="comprehensive sample", path="./remote_inference/remote_inference.zip")

Orchestration Status

The Orchestration method status() displays the current status of the uploaded orchestration.

| Status | Description |
|---|---|
| pending_packaging | The orchestration is uploaded, but packaging hasn’t started yet. |
| packaging | The orchestration is being packaged for use with the Wallaroo instance. |
| ready | The orchestration is ready for use. |

For this example, the status of the orchestration will be displayed then looped until it has reached status ready.

import time

while orchestration.status() != 'ready':
    print(orchestration.status())
    time.sleep(5)
pending_packaging
pending_packaging
packaging
packaging
packaging
packaging
packaging
packaging
packaging
packaging
packaging

List Orchestrations

Orchestrations are listed with the Wallaroo Client list_orchestrations() method, which returns a list of available orchestrations.

wl.list_orchestrations()
idnamestatusfilenameshacreated atupdated at
0f90e606-09f8-409b-a306-cb04ec4c011acomprehensive samplereadyremote_inference.zipb88e93...2396fb2023-22-May 19:55:152023-22-May 19:56:09

Task Management Tutorial

Once an Orchestration has the status ready, it can be run as a task. Tasks have the following run options.

| Type | SDK Call | How triggered | Purpose |
|---|---|---|---|
| Once | orchestration.run_once(name, json_args, timeout) | Task runs once and exits. | Single batch, experimentation |
| Scheduled | orchestration.run_scheduled(name, schedule, timeout, json_args) | User provides the schedule. The task runs and exits whenever the schedule dictates. | Recurrent batch ETL |

Task Run Once

Tasks are generated and run once with the Orchestration run_once(name, json_args, timeout) method. Any arguments for the orchestration are passed in as a Dict. If there are no arguments, then an empty set {} is passed.

For our example, we will pass the workspace, pipeline, and connection into our task.

# Example: run once

import datetime
task_start = datetime.datetime.now()

task = orchestration.run_once(name="house price run once 2", json_args={"workspace_name": workspace_name, 
                                                                           "pipeline_name":pipeline_name,
                                                                           "connection_name": connection_name
                                                                           }
                            )
task
FieldValue
IDf0e27d6a-6a98-4d26-b240-266f08560c48
Namehouse price run once 2
Last Run Statusunknown
TypeTemporary Run
ActiveTrue
Schedule-
Created At2023-22-May 19:58:32
Updated At2023-22-May 19:58:32

List Tasks

The list of tasks in the Wallaroo instance is retrieved through the Wallaroo Client list_tasks() method that accepts the following parameters.

| Parameter | Type | Description |
|---|---|---|
| killed | Boolean (Optional, default: False) | Returns tasks depending on whether they have been issued the kill command. False returns all tasks whether killed or not. True only returns killed tasks. |

This returns an array of the following fields in reverse chronological order by updated at.

| Parameter | Type | Description |
|---|---|---|
| id | string | The UUID identifier for the task. |
| last run status | string | The last reported status of the task. Values are: unknown (the task has not been started or is being prepared), ready (the task is scheduled to execute), running (the task has started), failure (the task failed), and success (the task completed). |
| type | string | The type of the task. Values are: Temporary Run (the task runs once then stops), Scheduled Run (the task repeats on a cron-like schedule), and Service Run (the task runs as a service and executes when its service port is activated). |
| active | Boolean | True: The task is scheduled or running. False: The task has completed or has been issued the kill command. |
| schedule | string | The cron style schedule for the task. If the task is not a scheduled one, then the schedule will be -. |
| created at | DateTime | The date and time the task was started. |
| updated at | DateTime | The date and time the task was updated. |

wl.list_tasks()
idnamelast run statustypeactiveschedulecreated atupdated at
f0e27d6a-6a98-4d26-b240-266f08560c48house price run once 2runningTemporary RunTrue-2023-22-May 19:58:322023-22-May 19:58:38
36509ef8-98da-42a0-913f-e6e929dedb15house price run oncesuccessTemporary RunTrue-2023-22-May 19:56:372023-22-May 19:56:48

Task Status

The status of the task is returned with the Task status() method. Tasks can have the following status values.

  • pending: The task has not been started or is being prepared.
  • started: The task has started to execute.
while task.status() != "started":
    display(task.status())
    time.sleep(5)

Task Last Runs History

The history of a task (each execution of a task is known as a task run) is retrieved with the Task last_runs() method, which takes the following arguments.

| Parameter | Type | Description |
|---|---|---|
| status | String (Optional, default: all) | Filters the task history by the status. If all, returns all statuses. Status values are: running (the task has started), failure (the task failed), and success (the task completed). |
| limit | Integer (Optional) | Limits the number of task runs returned. |

This returns the following in reverse chronological order by updated at.

| Parameter | Type | Description |
|---|---|---|
| task id | string | Task id in UUID format. |
| pod id | string | Pod id in UUID format. |
| status | string | Status of the task run. Status values are: running (the task has started), failure (the task failed), and success (the task completed). |
| created at | DateTime | Date and time the task run was created. |
| updated at | DateTime | Date and time the task run was updated. |
task.last_runs()
task idpod idstatuscreated atupdated at
f0e27d6a-6a98-4d26-b240-266f08560c487d9d73d5-df11-44ed-90c1-db0e64c7f9b8success2023-22-May 19:58:352023-22-May 19:58:35

Task Run Logs

The output of a task is displayed with the Task Run logs() method that takes the following parameters.

| Parameter | Type | Description |
|---|---|---|
| limit | Integer (Optional) | Limits the lines returned from the task run log. The limit parameter is based on the log tail - starting from the last line of the log file, then working up until the limit of lines is reached. This is useful for viewing final outputs, exceptions, etc. |

The Task Run logs() returns the log entries as a string list, with each entry as an item in the list.

  • IMPORTANT NOTE: It may take around a minute for task run logs to be integrated into the Wallaroo log database.
# give time for the task to complete and the log files to be entered
time.sleep(60)
recent_run = task.last_runs()[0]
display(recent_run.logs())
2023-22-May 19:59:29 Getting the workspace orchestrationworkspacetgiq
2023-22-May 19:59:29 Getting the pipeline orchestrationpipelinetgiq
2023-22-May 19:59:29 Getting arrow table file
2023-22-May 19:59:29 Inference time.  Displaying results after.
2023-22-May 19:59:29 pyarrow.Table
2023-22-May 19:59:29 time: timestamp[ms]
2023-22-May 19:59:29 in.tensor: list not null
2023-22-May 19:59:29   child 0, item: float
2023-22-May 19:59:29 out.variable: list not null
2023-22-May 19:59:29 check_failures: int8
2023-22-May 19:59:29   child 0, inner: float not null
2023-22-May 19:59:29 ----
2023-22-May 19:59:29 time: [[2023-05-22 19:58:49.767,2023-05-22 19:58:49.767,2023-05-22 19:58:49.767,2023-05-22 19:58:49.767,2023-05-22 19:58:49.767,...,2023-05-22 19:58:49.767,2023-05-22 19:58:49.767,2023-05-22 19:58:49.767,2023-05-22 19:58:49.767,2023-05-22 19:58:49.767]]
2023-22-May 19:59:29 in.tensor: [[[4,2.5,2900,5505,2,...,2970,5251,12,0,0],[2,2.5,2170,6361,1,...,2310,7419,6,0,0],...,[3,1.75,2910,37461,1,...,2520,18295,47,0,0],[3,2,2005,7000,1,...,1750,4500,34,0,0]]]
2023-22-May 19:59:29 check_failures: [[0,0,0,0,0,...,0,0,0,0,0]]
2023-22-May 19:59:29 out.variable: [[[718013.75],[615094.56],...,[706823.56],[581003]]]

Failed Task Logs

We can create a task that fails and show it in the last_runs list, then retrieve the logs to display why it failed.

# Example: run once

import datetime
task_start = datetime.datetime.now()

taskfail = orchestration.run_once(name="house price run once 2", json_args={"workspace_name": "bob", 
                                                                           "pipeline_name":"does not exist",
                                                                           "connection_name": connection_name
                                                                           }
                            )

while taskfail.status() != "started":
    display(taskfail.status())
    time.sleep(5)
FieldValue
ID54229bd2-c388-4196-9ce2-f76503a27f99
Namehouse price run once 2
Last Run Statusunknown
TypeTemporary Run
ActiveTrue
Schedule-
Created At2023-22-May 20:17:16
Updated At2023-22-May 20:17:16
# time.sleep(60)
taskfail.last_runs()
task idpod idstatuscreated atupdated at
54229bd2-c388-4196-9ce2-f76503a27f9979d7fe3e-ac8c-4f9c-8288-c9c207fb0a5efailure2023-22-May 20:17:182023-22-May 20:17:18
# time.sleep(60)
taskfaillogs = taskfail.last_runs()[0].logs()
display(taskfaillogs)
2023-22-May 20:17:22 Getting the workspace bob
2023-22-May 20:17:22   File "/home/jovyan/main.py", line 43, in get_pipeline
2023-22-May 20:17:22 Traceback (most recent call last):
2023-22-May 20:17:22 Getting the pipeline does not exist
2023-22-May 20:17:22     pipeline = wl.pipelines_by_name(name)[0]
2023-22-May 20:17:22   File "/home/jovyan/venv/lib/python3.9/site-packages/wallaroo/client.py", line 1064, in pipelines_by_name
2023-22-May 20:17:22     raise EntityNotFoundError("Pipeline", {"pipeline_name": pipeline_name})
2023-22-May 20:17:22 wallaroo.object.EntityNotFoundError: Pipeline not found: {'pipeline_name': 'does not exist'}
2023-22-May 20:17:22 
2023-22-May 20:17:22 During handling of the above exception, another exception occurred:
2023-22-May 20:17:22 Traceback (most recent call last):
2023-22-May 20:17:22 
2023-22-May 20:17:22   File "/home/jovyan/main.py", line 54, in <module>
2023-22-May 20:17:22     pipeline = get_pipeline(pipeline_name)
2023-22-May 20:17:22     pipeline = wl.build_pipeline(name)
2023-22-May 20:17:22   File "/home/jovyan/main.py", line 45, in get_pipeline
2023-22-May 20:17:22   File "/home/jovyan/venv/lib/python3.9/site-packages/wallaroo/client.py", line 1102, in build_pipeline
2023-22-May 20:17:22     require_dns_compliance(pipeline_name)
2023-22-May 20:17:22   File "/home/jovyan/venv/lib/python3.9/site-packages/wallaroo/checks.py", line 274, in require_dns_compliance
2023-22-May 20:17:22 wallaroo.object.InvalidNameError: Name 'does not exist is invalid: must be DNS-compatible (ASCII alpha-numeric plus dash (-))
2023-22-May 20:17:22     raise InvalidNameError(

Task Results

We can view the inferences from our logs and verify that new entries were added from our task. In our case, we’ll assume the task once started takes about 1 minute to run (deploy the pipeline, run the inference, undeploy the pipeline). We’ll add in a wait of 1 minute, then display the logs during the time period the task was running.

task_end = datetime.datetime.now()
display(task_end)

pipeline.logs(start_datetime = task_start, end_datetime = task_end)
datetime.datetime(2023, 5, 22, 20, 1, 25, 418564)

Warning: Pipeline log size limit exceeded. Please request logs using export_logs

timein.tensorout.variablecheck_failures
02023-05-22 19:58:49.767[4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0][718013.75]0
12023-05-22 19:58:49.767[2.0, 2.5, 2170.0, 6361.0, 1.0, 0.0, 2.0, 3.0, 8.0, 2170.0, 0.0, 47.7109, -122.017, 2310.0, 7419.0, 6.0, 0.0, 0.0][615094.56]0
22023-05-22 19:58:49.767[3.0, 2.5, 1300.0, 812.0, 2.0, 0.0, 0.0, 3.0, 8.0, 880.0, 420.0, 47.5893, -122.317, 1300.0, 824.0, 6.0, 0.0, 0.0][448627.72]0
32023-05-22 19:58:49.767[4.0, 2.5, 2500.0, 8540.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2500.0, 0.0, 47.5759, -121.994, 2560.0, 8475.0, 24.0, 0.0, 0.0][758714.2]0
42023-05-22 19:58:49.767[3.0, 1.75, 2200.0, 11520.0, 1.0, 0.0, 0.0, 4.0, 7.0, 2200.0, 0.0, 47.7659, -122.341, 1690.0, 8038.0, 62.0, 0.0, 0.0][513264.7]0
...............
5012023-05-22 19:58:49.767[3.0, 2.5, 1570.0, 1433.0, 3.0, 0.0, 0.0, 3.0, 8.0, 1570.0, 0.0, 47.6858, -122.336, 1570.0, 2652.0, 4.0, 0.0, 0.0][557391.25]0
5022023-05-22 19:58:49.767[3.0, 2.5, 2390.0, 15669.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2390.0, 0.0, 47.7446, -122.193, 2640.0, 12500.0, 24.0, 0.0, 0.0][741973.6]0
5032023-05-22 19:58:49.767[3.0, 0.75, 920.0, 20412.0, 1.0, 1.0, 2.0, 5.0, 6.0, 920.0, 0.0, 47.4781, -122.49, 1162.0, 54705.0, 64.0, 0.0, 0.0][338418.8]0
5042023-05-22 19:58:49.767[4.0, 2.5, 2800.0, 246114.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2800.0, 0.0, 47.6586, -121.962, 2750.0, 60351.0, 15.0, 0.0, 0.0][765468.75]0
5052023-05-22 19:58:49.767[2.0, 1.0, 1120.0, 9912.0, 1.0, 0.0, 0.0, 4.0, 6.0, 1120.0, 0.0, 47.3735, -122.43, 1540.0, 9750.0, 34.0, 0.0, 0.0][309800.75]0

506 rows × 4 columns

Scheduled Tasks

Scheduled tasks are run with the Orchestration run_scheduled method. We’ll set it up to run every 5 minutes, then check the results.

It is recommended that orchestrations containing pipeline deploy or undeploy commands be scheduled no less than 5 minutes apart to prevent collisions with other tasks that use the same pipeline.
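
The schedule argument uses standard cron syntax: minute, hour, day of month, month, and day of week. The strings below are generic cron examples for illustration; only the */5 * * * * schedule is used in this tutorial.

# Illustrative cron schedule strings (field order: minute hour day-of-month month day-of-week).
every_five_minutes = "*/5 * * * *"   # every 5 minutes (used in this tutorial)
hourly_on_the_hour = "0 * * * *"     # at minute 0 of every hour
daily_at_midnight  = "0 0 * * *"     # once a day at 00:00
weekdays_at_6am    = "0 6 * * 1-5"   # 06:00 Monday through Friday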

task_start = datetime.datetime.now()
schedule = "*/5 * * * *"
task_scheduled = orchestration.run_scheduled(name="schedule example", 
                                             timeout=600, 
                                             schedule=schedule, 
                                             json_args={"workspace_name": workspace_name, 
                                                        "pipeline_name": pipeline_name,
                                                        "connection_name": connection_name
                                            })
while task_scheduled.status() != "started":
    display(task_scheduled.status())
    time.sleep(5)
task_scheduled
'pending'
| Field | Value |
|-------|-------|
| ID | 4af57c61-dfa9-43eb-944e-559135495df4 |
| Name | schedule example |
| Last Run Status | unknown |
| Type | Scheduled Run |
| Active | True |
| Schedule | */5 * * * * |
| Created At | 2023-22-May 20:08:25 |
| Updated At | 2023-22-May 20:08:25 |
time.sleep(420)
recent_run = task_scheduled.last_runs()[0]
display(recent_run.logs())
2023-22-May 20:11:02 Getting the workspace orchestrationworkspacetgiq
2023-22-May 20:11:02 Getting the pipeline orchestrationpipelinetgiq
2023-22-May 20:11:02 Inference time.  Displaying results after.
2023-22-May 20:11:02 Getting arrow table file
2023-22-May 20:11:02 pyarrow.Table
2023-22-May 20:11:02 time: timestamp[ms]
2023-22-May 20:11:02 in.tensor: list<item: float> not null
2023-22-May 20:11:02   child 0, item: float
2023-22-May 20:11:02 out.variable: list<inner: float not null> not null
2023-22-May 20:11:02   child 0, inner: float not null
2023-22-May 20:11:02 check_failures: int8
2023-22-May 20:11:02 ----
2023-22-May 20:11:02 time: [[2023-05-22 20:10:23.271,2023-05-22 20:10:23.271,2023-05-22 20:10:23.271,2023-05-22 20:10:23.271,2023-05-22 20:10:23.271,...,2023-05-22 20:10:23.271,2023-05-22 20:10:23.271,2023-05-22 20:10:23.271,2023-05-22 20:10:23.271,2023-05-22 20:10:23.271]]
2023-22-May 20:11:02 in.tensor: [[[4,2.5,2900,5505,2,...,2970,5251,12,0,0],[2,2.5,2170,6361,1,...,2310,7419,6,0,0],...,[3,1.75,2910,37461,1,...,2520,18295,47,0,0],[3,2,2005,7000,1,...,1750,4500,34,0,0]]]
2023-22-May 20:11:02 check_failures: [[0,0,0,0,0,...,0,0,0,0,0]]
2023-22-May 20:11:02 out.variable: [[[718013.75],[615094.56],...,[706823.56],[581003]]]
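
A scheduled task accumulates a run history over time. To review more than the most recent execution, last_runs() can be iterated directly; the sketch below assumes the returned list is ordered most recent first, as the [0] index above suggests.

# Review the run history of the scheduled task and display each run's logs.
# The ordering (most recent first) is an assumption based on last_runs()[0] above.
for run in task_scheduled.last_runs():
    display(run.logs())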

Kill a Task

Killing a task removes its schedule or removes it from service. Tasks are killed with the Task kill() method, which returns a message with the status of the kill procedure.

If necessary, all tasks can be killed through the following script.

  • IMPORTANT NOTE: This command will kill all running tasks - scheduled or otherwise. Only use this if required.
# Kill all tasks
for t in wl.list_tasks(): t.kill()
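
To avoid killing unrelated work, the task list can be filtered before calling kill(). The status values used in the filter below are assumptions based on the statuses shown in this tutorial ('pending' and 'started').

# Hedged variation: only kill tasks that are still pending or currently running.
# The status values checked here are assumptions drawn from this tutorial's output.
for t in wl.list_tasks():
    if t.status() in ("pending", "started"):
        t.kill()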

When tasks are listed with the Wallaroo client list_tasks(killed=True) method, the active field shows whether a task was killed (False) or either completed running or is still scheduled to run (True).

task_scheduled.kill()
<ArbexStatus.PENDING_KILL: 'pending_kill'>
wl.list_tasks(killed=True)
| id | name | last run status | type | active | schedule | created at | updated at |
|----|------|-----------------|------|--------|----------|------------|------------|
| 4af57c61-dfa9-43eb-944e-559135495df4 | schedule example | success | Scheduled Run | False | */5 * * * * | 2023-22-May 20:08:25 | 2023-22-May 20:13:12 |
| dc185e24-cf89-4a97-b6f0-33fc3d67da72 | schedule example | unknown | Scheduled Run | False | */5 * * * * | 2023-22-May 20:05:47 | 2023-22-May 20:06:22 |
| f0e27d6a-6a98-4d26-b240-266f08560c48 | house price run once 2 | success | Temporary Run | True | - | 2023-22-May 19:58:32 | 2023-22-May 19:58:38 |
| 36509ef8-98da-42a0-913f-e6e929dedb15 | house price run once | success | Temporary Run | True | - | 2023-22-May 19:56:37 | 2023-22-May 19:56:48 |

Cleaning Up

With the tutorial complete, we will undeploy the pipeline to return its resources back to the Wallaroo instance.

pipeline.undeploy()
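
To confirm the deployment resources have been released, the pipeline status can be displayed after undeploying. pipeline.status() is part of the SDK; the exact status value returned after an undeploy is not shown in this tutorial, so treat the check as illustrative.

# Optional: display the pipeline status to confirm the engines have been torn down.
# The specific status string returned after undeploy may vary by version.
display(pipeline.status())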