Async Infer with Aloha Tutorial

How to perform asynchronous inferences via async_infer


This tutorial and the assets can be downloaded as part of the Wallaroo Tutorials repository.


This tutorial demonstrates asynchronous inference through the Wallaroo SDK. The async method wallaroo.pipeline.Pipeline.async_infer submits asynchronous inference requests to a model deployed in a pipeline. Organizations can submit inference requests, continue with other tasks, and then handle each result when its request completes. For more details, see Run Asynchronous Inference through Local Variable.
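The submit-then-continue pattern described above is ordinary Python asyncio. The sketch below is not Wallaroo code. fake_infer is a hypothetical stand-in for an awaitable such as async_infer, simulated with asyncio.sleep, so you can run the control flow without a deployed pipeline.

```python
import asyncio


async def fake_infer(payload: str, delay: float = 0.05) -> str:
    """Hypothetical stand-in for an awaitable inference call such as async_infer."""
    await asyncio.sleep(delay)  # simulates waiting on the inference engine
    return f"result for {payload}"


async def main() -> list[str]:
    events = []
    # Schedule the request without waiting for it to finish.
    task = asyncio.create_task(fake_infer("batch-1"))
    events.append("request submitted")
    # Other work happens here while the request is in flight.
    events.append("doing other work")
    # Suspend until the inference completes, then use the result.
    result = await task
    events.append(result)
    return events


if __name__ == "__main__":
    for event in asyncio.run(main()):
        print(event)
```

Inside a Jupyter notebook, which already runs an event loop, you would write `await main()` instead of calling asyncio.run, which refuses to start a second loop.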

In this notebook we will walk through a simple pipeline deployment, then run inferences against the model with async_infer. For this example we will use an open source Aloha CNN LSTM model that classifies domain names as either legitimate or used for nefarious purposes such as malware distribution.

Prerequisites

  • An installed Wallaroo instance.
  • The following Python libraries installed:
    • os
    • wallaroo: The Wallaroo SDK. Included with the Wallaroo JupyterHub service by default.
    • pandas: Pandas, mainly used for Pandas DataFrame
    • pyarrow: PyArrow for Apache Arrow support
    • httpx: Provides the AsyncClient object used by async_infer.

Tutorial Goals

For our example, we will perform the following:

  • Create a workspace for our work.
  • Upload the Aloha model.
  • Create a pipeline that ingests our submitted data, passes it to the model, and returns the results.
  • Run a sample async inference with a pandas DataFrame.
  • Run sample async inferences with Apache Arrow tables.

Connect to the Wallaroo Instance

The first step is to connect to Wallaroo through the Wallaroo client. The Python library is included in the Wallaroo install and available through the JupyterHub interface provided with your Wallaroo environment.

This is accomplished using the wallaroo.Client() command, which provides a URL to grant the SDK permission to your specific Wallaroo environment. When displayed, enter the URL into a browser and confirm permissions. Store the connection into a variable that can be referenced later.

If logging into the Wallaroo instance through the internal JupyterHub service, use wl = wallaroo.Client(). For more information on Wallaroo Client settings, see the Client Connection guide.

# asyncio is needed later for asyncio.gather
import asyncio

import wallaroo
from wallaroo.object import EntityNotFoundError

# to display dataframe tables
from IPython.display import display
# used to display dataframe information without truncating
import pandas as pd
pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_columns', None)
import pyarrow as pa

# httpx provides the AsyncClient used by async_infer
import httpx
# Login through local Wallaroo instance

wl = wallaroo.Client()

Create the Workspace

We will create a workspace to work in and call it the “alohaworkspace”, then set it as current workspace environment. We’ll also create our pipeline in advance as alohapipeline. The model name and the model file will be specified for use in later steps.

workspace_name = 'alohaworkspace'
pipeline_name = 'alohapipeline'
model_name = 'alohamodel'
model_file_name = './alohacnnlstm.zip'
workspace = wl.get_workspace(name=workspace_name, create_if_not_exist=True)

wl.set_current_workspace(workspace)

aloha_pipeline = wl.build_pipeline(pipeline_name)
aloha_pipeline
name            alohapipeline
created         2024-09-27 15:55:29.014069+00:00
last_updated    2024-09-30 17:45:06.917400+00:00
deployed        False
workspace_id    10
workspace_name  alohaworkspace
arch            x86
accel           none
tags
versions        e6c18566-3132-4ac3-b681-d300a20dd2ec, fc2695f2-65e6-4000-8526-81556bf02ab7, a71d43b9-31dc-4bd6-b3cd-8c536818935d, 11c86ec2-b7a4-4ce4-a600-394964cd66e3, 388b0cef-e26d-48ec-aceb-50c557ec9a0e, ebb3189c-eddb-423e-94b0-1ff1a9e5c3d8, c1dfccbf-2b82-418d-898c-7ac4a1d6d321, 000af8fb-c459-4dcc-873e-401a255e234b, 838c79a3-5602-41bb-ab93-fca01e41fa7a, ba1138e7-ef33-44e0-9c5a-6210fa259aa7, ea6a5027-3cbc-4a1f-bf71-1626b04f1167, 429df761-836c-4733-a1ff-5e7685ca43ab, c451bf1a-662c-4d8e-9c91-f59055756c26, 1631104a-2833-4209-80fb-4f10770504a5, 239f949a-1b9f-4e74-a026-6132bc0d0add, 118debb6-be7a-4360-89eb-954e7bde67f4, db50ad71-81ce-4592-a322-82b152f8d40d, f55651b8-8a96-4017-a223-37682f0e7047, 1fbf9314-41b3-41b5-ac05-f9563aa9c14a
steps           alohamodel
published       False

We can verify that the workspace was created and is now the current default workspace with the get_current_workspace() command.

wl.get_current_workspace()
{'name': 'alohaworkspace', 'id': 10, 'archived': False, 'created_by': 'f75a8629-70ab-4100-8b37-04cf79e667ee', 'created_at': '2024-09-27T15:55:28.840886+00:00', 'models': [{'name': 'alohamodel', 'versions': 7, 'owner_id': '""', 'last_update_time': datetime.datetime(2024, 9, 27, 17, 35, 20, 903455, tzinfo=tzutc()), 'created_at': datetime.datetime(2024, 9, 27, 15, 55, 32, 12846, tzinfo=tzutc())}], 'pipelines': [{'name': 'alohapipeline', 'create_time': datetime.datetime(2024, 9, 27, 15, 55, 29, 14069, tzinfo=tzutc()), 'definition': '[]'}]}

Upload the Model

Now we will upload our model. Note that for this example the model is packaged as a .ZIP file. The Aloha model is a TensorFlow model stored as a protobuf file, so we upload it with framework=Framework.TENSORFLOW.

from wallaroo.framework import Framework

model = wl.upload_model(model_name, 
                        model_file_name,
                        framework=Framework.TENSORFLOW
                        )

Deploy a model

Now that the model is uploaded, we will deploy it.

To do this, we add the model as a step in our pipeline, alohapipeline, so that the pipeline ingests the data, passes it to the Aloha model, and returns the final output. We then deploy the pipeline so it is ready to receive data. We deploy with wait_for_status=False, so the call returns immediately and we check the status ourselves. The deployment process usually takes about 45 seconds.

  • Note: If you receive an error that the pipeline could not be deployed because there are not enough resources, undeploy any other pipelines and deploy this one again. The following command undeploys all pipelines to free resources. Do not run it in a production environment, since it stops every deployed pipeline:
for p in wl.list_pipelines(): p.undeploy()
aloha_pipeline.add_model_step(model)
name            alohapipeline
created         2024-09-27 15:55:29.014069+00:00
last_updated    2024-09-30 17:45:06.917400+00:00
deployed        False
workspace_id    10
workspace_name  alohaworkspace
arch            x86
accel           none
tags
versions        e6c18566-3132-4ac3-b681-d300a20dd2ec, fc2695f2-65e6-4000-8526-81556bf02ab7, a71d43b9-31dc-4bd6-b3cd-8c536818935d, 11c86ec2-b7a4-4ce4-a600-394964cd66e3, 388b0cef-e26d-48ec-aceb-50c557ec9a0e, ebb3189c-eddb-423e-94b0-1ff1a9e5c3d8, c1dfccbf-2b82-418d-898c-7ac4a1d6d321, 000af8fb-c459-4dcc-873e-401a255e234b, 838c79a3-5602-41bb-ab93-fca01e41fa7a, ba1138e7-ef33-44e0-9c5a-6210fa259aa7, ea6a5027-3cbc-4a1f-bf71-1626b04f1167, 429df761-836c-4733-a1ff-5e7685ca43ab, c451bf1a-662c-4d8e-9c91-f59055756c26, 1631104a-2833-4209-80fb-4f10770504a5, 239f949a-1b9f-4e74-a026-6132bc0d0add, 118debb6-be7a-4360-89eb-954e7bde67f4, db50ad71-81ce-4592-a322-82b152f8d40d, f55651b8-8a96-4017-a223-37682f0e7047, 1fbf9314-41b3-41b5-ac05-f9563aa9c14a
steps           alohamodel
published       False
aloha_pipeline.deploy(wait_for_status=False)
Deployment initiated for alohapipeline. Please check pipeline status.
name            alohapipeline
created         2024-09-27 15:55:29.014069+00:00
last_updated    2024-09-30 17:45:08.304572+00:00
deployed        True
workspace_id    10
workspace_name  alohaworkspace
arch            x86
accel           none
tags
versions        23788026-3fca-4667-b29f-f1f8d3e06dd9, e6c18566-3132-4ac3-b681-d300a20dd2ec, fc2695f2-65e6-4000-8526-81556bf02ab7, a71d43b9-31dc-4bd6-b3cd-8c536818935d, 11c86ec2-b7a4-4ce4-a600-394964cd66e3, 388b0cef-e26d-48ec-aceb-50c557ec9a0e, ebb3189c-eddb-423e-94b0-1ff1a9e5c3d8, c1dfccbf-2b82-418d-898c-7ac4a1d6d321, 000af8fb-c459-4dcc-873e-401a255e234b, 838c79a3-5602-41bb-ab93-fca01e41fa7a, ba1138e7-ef33-44e0-9c5a-6210fa259aa7, ea6a5027-3cbc-4a1f-bf71-1626b04f1167, 429df761-836c-4733-a1ff-5e7685ca43ab, c451bf1a-662c-4d8e-9c91-f59055756c26, 1631104a-2833-4209-80fb-4f10770504a5, 239f949a-1b9f-4e74-a026-6132bc0d0add, 118debb6-be7a-4360-89eb-954e7bde67f4, db50ad71-81ce-4592-a322-82b152f8d40d, f55651b8-8a96-4017-a223-37682f0e7047, 1fbf9314-41b3-41b5-ac05-f9563aa9c14a
steps           alohamodel
published       False

Because we deployed with wait_for_status=False, we poll the pipeline status until it reports Running.

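import time

# Poll every 15 seconds until the pipeline reports Running,
# printing each intermediate status.
status = aloha_pipeline.status()['status']
while status != 'Running':
    print(status)
    time.sleep(15)
    status = aloha_pipeline.status()['status']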
Starting
Error
Error
Starting
Starting
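The loop above waits indefinitely. As its output shows, the status can pass through Error while the engine starts. A bounded variant is sketched below. wait_for_status and its get_status callable are hypothetical helpers, not part of the Wallaroo SDK. With this tutorial's pipeline, get_status could be `lambda: aloha_pipeline.status()['status']`.

```python
import time
from typing import Callable


def wait_for_status(get_status: Callable[[], str],
                    target: str = "Running",
                    interval: float = 15.0,
                    timeout: float = 600.0) -> str:
    """Poll get_status() until it returns target, or raise after timeout seconds.

    Hypothetical helper: intermediate statuses (e.g. Starting, Error) are printed
    and polling continues, mirroring the loop in the tutorial.
    """
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status == target:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"status still {status!r} after {timeout} seconds")
        print(status)
        time.sleep(interval)
```

Treating Error as transient matches what the tutorial output shows during startup. A production check might instead stop after several consecutive Error results.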

Inferences

Async Infer from Pandas DataFrame

Now that the pipeline is deployed and our Aloha model is in place, we’ll perform an async inference via the async_infer method. We’ll start with a single row Pandas DataFrame and show the results.
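The smoke test row holds 50 integers: 40 leading zeros followed by 10 non-zero values. In other words, the input appears to be left-padded with zeros to a fixed length. The sketch below shows only that padding step. The length of 50 is inferred from the smoke test. The character-to-integer encoding the Aloha model expects is not described in this tutorial, so the function assumes its input is already encoded. left_pad is a hypothetical helper.

```python
def left_pad(codes: list[int], length: int = 50, pad_value: int = 0) -> list[int]:
    """Left-pad already-encoded values to a fixed length (50 assumed here).

    Hypothetical helper: raises instead of guessing how to truncate long input.
    """
    if len(codes) > length:
        raise ValueError(f"expected at most {length} values, got {len(codes)}")
    return [pad_value] * (length - len(codes)) + codes


row = left_pad([28, 16, 32, 23, 29, 32, 30, 19, 26, 17])
print(len(row), row[:3], row[-3:])  # 50 [0, 0, 0] [19, 26, 17]
```

A padded row could then be wrapped as `pd.DataFrame.from_records([{"text_input": row}])`, which has the same shape as smoke_test.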

We’ll define the httpx AsyncClient that the async_infer calls in these examples share.

async_client = httpx.AsyncClient()

# One row: 40 leading zeros followed by 10 non-zero values
smoke_test = pd.DataFrame.from_records(
    [
        {
            "text_input": [0] * 40 + [28, 16, 32, 23, 29, 32, 30, 19, 26, 17]
        }
    ]
)

result_async = await aloha_pipeline.async_infer(tensor=smoke_test,
                                          async_client=async_client,
                                          retries=2)
display(result_async.loc[:, ["time","out.main"]])
   time                     out.main
0  2024-09-30 17:46:24.859  [0.997564]

Async Infer From Apache Arrow Table

For this example, we’ll read three Apache Arrow files to simulate drawing data from a source, submitting an inference, and then drawing data from another source while the asynchronous inference executes. When all async inferences are complete, we display all the results at once.
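The notebook code for this step collects its results with asyncio.gather(..., return_exceptions=True). With that option, results come back in the order the tasks were passed, and a failed task yields its exception object in place of a result instead of propagating the error. The sketch below demonstrates that behavior with a hypothetical fake_batch coroutine standing in for run_one_batch. It also adds an asyncio.Semaphore that caps how many requests are in flight at once. Tying that cap to the number of engine replicas is an assumption, not something the tutorial configures.

```python
import asyncio


async def fake_batch(name: str, sem: asyncio.Semaphore, fail: bool = False) -> str:
    """Hypothetical stand-in for run_one_batch: simulates one inference call."""
    async with sem:  # waits here while `limit` batches are already in flight
        await asyncio.sleep(0.01)  # simulates the inference round trip
        if fail:
            raise RuntimeError(f"{name} failed")
        return f"{name} done"


async def run_all(names: list[str], limit: int = 2,
                  fail_on: frozenset[str] = frozenset()) -> list:
    # Cap concurrent requests, e.g. at the number of engine replicas (assumption).
    sem = asyncio.Semaphore(limit)
    tasks = [fake_batch(name, sem, fail=name in fail_on) for name in names]
    # Results keep the input order; exceptions are returned in place, not raised.
    return await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    for result in asyncio.run(run_all(["a", "b", "c"], fail_on=frozenset({"b"}))):
        if isinstance(result, Exception):
            print(f"task failed: {result}")
        else:
            print(result)
```

In the notebook you would `await run_all(...)` directly rather than call asyncio.run, because Jupyter already runs an event loop.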

import asyncio

async def run_one_batch(file):
    # Read the Arrow IPC file into a pyarrow Table
    with pa.ipc.open_file(file) as source:
        table = source.read_all()

    # ... potentially do other processing here or request data from a feature store ...

    result_async_batch = await aloha_pipeline.async_infer(tensor=table,
                                                          async_client=async_client,
                                                          retries=2)
    return result_async_batch

# Process three batches concurrently. Inference on the first starts while
# the second file is read, and so on. Multiple inferences may run at the
# same time if the pipeline has more than one engine replica.
tasks = []
for file in ["./data/data_25k.arrow", "./data/data_1k.arrow", "./data/data_25k.arrow"]:
    tasks.append(run_one_batch(file))

# Wait for all three batches to finish. With return_exceptions=True, a failed
# batch returns its exception instead of a Table, so check before using it.
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
    if isinstance(result, Exception):
        print(f"Task failed: {result!r}")
        continue
    print(f"Number of rows for task: {result.num_rows}")
    display(result)
Number of rows for task: 24954

pyarrow.Table
time: timestamp[ms]
in.text_input: list<item: float>
child 0, item: float
out.banjori: list<inner: float not null> not null
child 0, inner: float not null
out.corebot: list<inner: float not null> not null
child 0, inner: float not null
out.cryptolocker: list<inner: float not null> not null
child 0, inner: float not null
out.dircrypt: list<inner: float not null> not null
child 0, inner: float not null
out.gozi: list<inner: float not null> not null
child 0, inner: float not null
out.kraken: list<inner: float not null> not null
child 0, inner: float not null
out.locky: list<inner: float not null> not null
child 0, inner: float not null
out.main: list<inner: float not null> not null
child 0, inner: float not null
out.matsnu: list<inner: float not null> not null
child 0, inner: float not null
out.pykspa: list<inner: float not null> not null
child 0, inner: float not null
out.qakbot: list<inner: float not null> not null
child 0, inner: float not null
out.ramdo: list<inner: float not null> not null
child 0, inner: float not null
out.ramnit: list<inner: float not null> not null
child 0, inner: float not null
out.simda: list<inner: float not null> not null
child 0, inner: float not null
out.suppobox: list<inner: float not null> not null
child 0, inner: float not null
anomaly.count: uint32 not null

time: [[2024-09-30 17:53:07.406,2024-09-30 17:53:07.406,2024-09-30 17:53:07.406,2024-09-30 17:53:07.406,2024-09-30 17:53:07.406,…,2024-09-30 17:53:07.406,2024-09-30 17:53:07.406,2024-09-30 17:53:07.406,2024-09-30 17:53:07.406,2024-09-30 17:53:07.406]]
in.text_input: [[[0,0,0,0,0,…,32,30,19,26,17],[0,0,0,0,0,…,18,35,18,22,23],…,[0,0,0,0,0,…,27,28,19,33,23],[0,0,0,0,0,…,24,29,14,36,13]]]
out.banjori: [[[0.0015195814],[7.447168e-18],…,[1.342218e-14],[1.3068625e-12]]]
out.corebot: [[[0.98291475],[6.7359245e-8],…,[7.969789e-9],[1.1029467e-9]]]
out.cryptolocker: [[[0.012099549],[0.17081994],…,[0.14756858],[0.014839977]]]
out.dircrypt: [[[0.000047591115],[1.3220122e-9],…,[4.813928e-9],[2.2757316e-8]]]
out.gozi: [[[0.000020289312],[1.2758657e-24],…,[1.0541016e-22],[8.438438e-15]]]
out.kraken: [[[0.00031977257],[0.22559547],…,[0.2969691],[0.30495816]]]
out.locky: [[[0.011029262],[0.3420985],…,[0.2706808],[0.11627986]]]
out.main: [[[0.997564],[0.99999994],…,[1],[0.99999803]]]

Number of rows for task: 1000

pyarrow.Table
time: timestamp[ms]
in.text_input: list<item: float>
child 0, item: float
out.banjori: list<inner: float not null> not null
child 0, inner: float not null
out.corebot: list<inner: float not null> not null
child 0, inner: float not null
out.cryptolocker: list<inner: float not null> not null
child 0, inner: float not null
out.dircrypt: list<inner: float not null> not null
child 0, inner: float not null
out.gozi: list<inner: float not null> not null
child 0, inner: float not null
out.kraken: list<inner: float not null> not null
child 0, inner: float not null
out.locky: list<inner: float not null> not null
child 0, inner: float not null
out.main: list<inner: float not null> not null
child 0, inner: float not null
out.matsnu: list<inner: float not null> not null
child 0, inner: float not null
out.pykspa: list<inner: float not null> not null
child 0, inner: float not null
out.qakbot: list<inner: float not null> not null
child 0, inner: float not null
out.ramdo: list<inner: float not null> not null
child 0, inner: float not null
out.ramnit: list<inner: float not null> not null
child 0, inner: float not null
out.simda: list<inner: float not null> not null
child 0, inner: float not null
out.suppobox: list<inner: float not null> not null
child 0, inner: float not null
anomaly.count: uint32 not null

time: [[2024-09-30 17:53:07.384,2024-09-30 17:53:07.384,2024-09-30 17:53:07.384,2024-09-30 17:53:07.384,2024-09-30 17:53:07.384,…,2024-09-30 17:53:07.384,2024-09-30 17:53:07.384,2024-09-30 17:53:07.384,2024-09-30 17:53:07.384,2024-09-30 17:53:07.384]]
in.text_input: [[[0,0,0,0,0,…,32,30,19,26,17],[0,0,0,0,0,…,29,12,36,31,12],…,[0,0,0,0,0,…,35,16,35,27,16],[0,0,0,0,0,…,24,29,14,36,13]]]
out.banjori: [[[0.0015195814],[0.00002837503],…,[0.0000056315566],[1.3068625e-12]]]
out.corebot: [[[0.98291475],[0.000012753118],…,[0.0000033642746],[1.1029468e-9]]]
out.cryptolocker: [[[0.012099549],[0.025435215],…,[0.13612255],[0.014839977]]]
out.dircrypt: [[[0.000047591115],[6.150966e-10],…,[5.6732154e-11],[2.2757316e-8]]]
out.gozi: [[[0.000020289312],[2.321774e-10],…,[2.773063e-8],[8.43847e-15]]]
out.kraken: [[[0.00031977257],[0.051351104],…,[0.0025221605],[0.30495816]]]
out.locky: [[[0.011029262],[0.022038758],…,[0.05455697],[0.116279885]]]
out.main: [[[0.997564],[0.9885122],…,[0.9998954],[0.99999803]]]

Number of rows for task: 24954

pyarrow.Table
time: timestamp[ms]
in.text_input: list<item: float>
child 0, item: float
out.banjori: list<inner: float not null> not null
child 0, inner: float not null
out.corebot: list<inner: float not null> not null
child 0, inner: float not null
out.cryptolocker: list<inner: float not null> not null
child 0, inner: float not null
out.dircrypt: list<inner: float not null> not null
child 0, inner: float not null
out.gozi: list<inner: float not null> not null
child 0, inner: float not null
out.kraken: list<inner: float not null> not null
child 0, inner: float not null
out.locky: list<inner: float not null> not null
child 0, inner: float not null
out.main: list<inner: float not null> not null
child 0, inner: float not null
out.matsnu: list<inner: float not null> not null
child 0, inner: float not null
out.pykspa: list<inner: float not null> not null
child 0, inner: float not null
out.qakbot: list<inner: float not null> not null
child 0, inner: float not null
out.ramdo: list<inner: float not null> not null
child 0, inner: float not null
out.ramnit: list<inner: float not null> not null
child 0, inner: float not null
out.simda: list<inner: float not null> not null
child 0, inner: float not null
out.suppobox: list<inner: float not null> not null
child 0, inner: float not null
anomaly.count: uint32 not null

time: [[2024-09-30 17:53:07.395,2024-09-30 17:53:07.395,2024-09-30 17:53:07.395,2024-09-30 17:53:07.395,2024-09-30 17:53:07.395,…,2024-09-30 17:53:07.395,2024-09-30 17:53:07.395,2024-09-30 17:53:07.395,2024-09-30 17:53:07.395,2024-09-30 17:53:07.395]]
in.text_input: [[[0,0,0,0,0,…,32,30,19,26,17],[0,0,0,0,0,…,18,35,18,22,23],…,[0,0,0,0,0,…,27,28,19,33,23],[0,0,0,0,0,…,24,29,14,36,13]]]
out.banjori: [[[0.0015195814],[7.447168e-18],…,[1.342218e-14],[1.3068625e-12]]]
out.corebot: [[[0.98291475],[6.7359245e-8],…,[7.969789e-9],[1.1029467e-9]]]
out.cryptolocker: [[[0.012099549],[0.17081994],…,[0.14756858],[0.014839977]]]
out.dircrypt: [[[0.000047591115],[1.3220122e-9],…,[4.813928e-9],[2.2757316e-8]]]
out.gozi: [[[0.000020289312],[1.2758657e-24],…,[1.0541016e-22],[8.438438e-15]]]
out.kraken: [[[0.00031977257],[0.22559547],…,[0.2969691],[0.30495816]]]
out.locky: [[[0.011029262],[0.3420985],…,[0.2706808],[0.11627986]]]
out.main: [[[0.997564],[0.99999994],…,[1],[0.99999803]]]

Undeploy Pipeline

When finished with our tests, we will undeploy the pipeline to return its Kubernetes resources for other tasks. Note that if the deployment configuration is unchanged, calling aloha_pipeline.deploy() again restarts the inference engine with the same configuration as before.

aloha_pipeline.undeploy()
Waiting for undeployment - this will take up to 45s .................................... ok
name            alohapipeline
created         2024-09-27 15:55:29.014069+00:00
last_updated    2024-09-30 17:45:08.304572+00:00
deployed        False
workspace_id    10
workspace_name  alohaworkspace
arch            x86
accel           none
tags
versions        23788026-3fca-4667-b29f-f1f8d3e06dd9, e6c18566-3132-4ac3-b681-d300a20dd2ec, fc2695f2-65e6-4000-8526-81556bf02ab7, a71d43b9-31dc-4bd6-b3cd-8c536818935d, 11c86ec2-b7a4-4ce4-a600-394964cd66e3, 388b0cef-e26d-48ec-aceb-50c557ec9a0e, ebb3189c-eddb-423e-94b0-1ff1a9e5c3d8, c1dfccbf-2b82-418d-898c-7ac4a1d6d321, 000af8fb-c459-4dcc-873e-401a255e234b, 838c79a3-5602-41bb-ab93-fca01e41fa7a, ba1138e7-ef33-44e0-9c5a-6210fa259aa7, ea6a5027-3cbc-4a1f-bf71-1626b04f1167, 429df761-836c-4733-a1ff-5e7685ca43ab, c451bf1a-662c-4d8e-9c91-f59055756c26, 1631104a-2833-4209-80fb-4f10770504a5, 239f949a-1b9f-4e74-a026-6132bc0d0add, 118debb6-be7a-4360-89eb-954e7bde67f4, db50ad71-81ce-4592-a322-82b152f8d40d, f55651b8-8a96-4017-a223-37682f0e7047, 1fbf9314-41b3-41b5-ac05-f9563aa9c14a
steps           alohamodel
published       False