Async Infer with Aloha Tutorial
This tutorial and the assets can be downloaded as part of the Wallaroo Tutorials repository.
This tutorial demonstrates using async infer through the Wallaroo SDK. The asynchronous method wallaroo.pipeline.Pipeline.async_infer submits inference requests to a deployed model without blocking the caller. This allows organizations to submit inference requests, proceed with other tasks, then respond when the inference request is complete. For more details, see Run Asynchronous Inference through Local Variable.
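The submit, continue, then collect pattern described above can be sketched with the Python standard library alone. In this minimal sketch, fake_async_infer is a hypothetical stand-in for the pipeline's async_infer method (it only sleeps and doubles its input); it is not part of the Wallaroo SDK.

```python
import asyncio


async def fake_async_infer(batch):
    # Hypothetical stand-in for an async inference call: simulate latency.
    await asyncio.sleep(0.05)
    return [x * 2 for x in batch]


async def main():
    # Submit the request; create_task schedules it without blocking.
    pending = asyncio.create_task(fake_async_infer([1, 2, 3]))
    # Proceed with other work while the request is in flight.
    other_work = sum(range(10))
    # Collect the inference result once it is needed.
    result = await pending
    return other_work, result


other_work, result = asyncio.run(main())
print(other_work, result)
```

In a notebook, where an event loop is already running, the last lines would typically become a top-level await main() instead of asyncio.run(main()).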
In this notebook we will walk through a simple pipeline deployment, then use async_infer to run inferences on a model. For this example we will be using an open source Aloha CNN LSTM model that classifies domain names as either legitimate or used for nefarious purposes such as malware distribution.
Prerequisites
- An installed Wallaroo instance.
- The following Python libraries installed:
Tutorial Goals
For our example, we will perform the following:
- Create a workspace for our work.
- Upload the Aloha model.
- Create a pipeline that can ingest our submitted data, submit it to the model, and export the results.
- Run a sample async inference through a pandas DataFrame.
- Run a sample async inference through an Apache Arrow Table.
Connect to the Wallaroo Instance
The first step is to connect to Wallaroo through the Wallaroo client. The Python library is included in the Wallaroo install and available through the Jupyter Hub interface provided with your Wallaroo environment.
This is accomplished using the wallaroo.Client() command, which provides a URL to grant the SDK permission to your specific Wallaroo environment. When displayed, enter the URL into a browser and confirm permissions. Store the connection into a variable that can be referenced later.
If logging into the Wallaroo instance through the internal JupyterHub service, use wl = wallaroo.Client(). For more information on Wallaroo Client settings, see the Client Connection guide.
import wallaroo
from wallaroo.object import EntityNotFoundError
# to display dataframe tables
from IPython.display import display
# used to display dataframe information without truncating
import pandas as pd
pd.set_option('display.max_colwidth', None)
import pyarrow as pa
import httpx
# Login through local Wallaroo instance
wl = wallaroo.Client()
Create the Workspace
We will create a workspace named alohaworkspace, then set it as the current workspace. We’ll also create our pipeline in advance as alohapipeline. The model name and the model file are specified here for use in later steps.
workspace_name = f'alohaworkspace'
pipeline_name = f'alohapipeline'
model_name = f'alohamodel'
model_file_name = './alohacnnlstm.zip'
workspace = wl.get_workspace(name=workspace_name, create_if_not_exist=True)
wl.set_current_workspace(workspace)
aloha_pipeline = wl.build_pipeline(pipeline_name)
aloha_pipeline
name | alohapipeline |
---|---|
created | 2024-09-27 15:55:29.014069+00:00 |
last_updated | 2024-09-30 17:45:06.917400+00:00 |
deployed | False |
workspace_id | 10 |
workspace_name | alohaworkspace |
arch | x86 |
accel | none |
tags | |
versions | e6c18566-3132-4ac3-b681-d300a20dd2ec, fc2695f2-65e6-4000-8526-81556bf02ab7, a71d43b9-31dc-4bd6-b3cd-8c536818935d, 11c86ec2-b7a4-4ce4-a600-394964cd66e3, 388b0cef-e26d-48ec-aceb-50c557ec9a0e, ebb3189c-eddb-423e-94b0-1ff1a9e5c3d8, c1dfccbf-2b82-418d-898c-7ac4a1d6d321, 000af8fb-c459-4dcc-873e-401a255e234b, 838c79a3-5602-41bb-ab93-fca01e41fa7a, ba1138e7-ef33-44e0-9c5a-6210fa259aa7, ea6a5027-3cbc-4a1f-bf71-1626b04f1167, 429df761-836c-4733-a1ff-5e7685ca43ab, c451bf1a-662c-4d8e-9c91-f59055756c26, 1631104a-2833-4209-80fb-4f10770504a5, 239f949a-1b9f-4e74-a026-6132bc0d0add, 118debb6-be7a-4360-89eb-954e7bde67f4, db50ad71-81ce-4592-a322-82b152f8d40d, f55651b8-8a96-4017-a223-37682f0e7047, 1fbf9314-41b3-41b5-ac05-f9563aa9c14a |
steps | alohamodel |
published | False |
We can verify that the workspace was created and set as the current default workspace with the get_current_workspace() command.
wl.get_current_workspace()
{'name': 'alohaworkspace', 'id': 10, 'archived': False, 'created_by': 'f75a8629-70ab-4100-8b37-04cf79e667ee', 'created_at': '2024-09-27T15:55:28.840886+00:00', 'models': [{'name': 'alohamodel', 'versions': 7, 'owner_id': '""', 'last_update_time': datetime.datetime(2024, 9, 27, 17, 35, 20, 903455, tzinfo=tzutc()), 'created_at': datetime.datetime(2024, 9, 27, 15, 55, 32, 12846, tzinfo=tzutc())}], 'pipelines': [{'name': 'alohapipeline', 'create_time': datetime.datetime(2024, 9, 27, 15, 55, 29, 14069, tzinfo=tzutc()), 'definition': '[]'}]}
Upload the Models
Now we will upload our models. Note that for this example we are applying the model from a .ZIP file. The Aloha model is a protobuf file that has been defined for evaluating web pages, and we will configure it to use data in the tensorflow format.
from wallaroo.framework import Framework
model = wl.upload_model(model_name,
                        model_file_name,
                        framework=Framework.TENSORFLOW
                        )
Deploy a model
Now that we have a model that we want to use we will create a deployment for it.
We will tell the deployment we are using a tensorflow model and give the deployment name and the configuration we want for the deployment.
To do this, we’ll use our pipeline to ingest the data, pass the data to our Aloha model, and give us a final output. We named the pipeline alohapipeline when we built it earlier; now we deploy it so it’s ready to receive data. The deployment process usually takes about 45 seconds.
- Note: If you receive an error that the pipeline could not be deployed because there are not enough resources, undeploy any other pipelines and deploy this one again. This command can quickly undeploy all pipelines to regain resources. We recommend not running this command in a production environment since it will cancel any running pipelines:
for p in wl.list_pipelines(): p.undeploy()
aloha_pipeline.add_model_step(model)
name | alohapipeline |
---|---|
created | 2024-09-27 15:55:29.014069+00:00 |
last_updated | 2024-09-30 17:45:06.917400+00:00 |
deployed | False |
workspace_id | 10 |
workspace_name | alohaworkspace |
arch | x86 |
accel | none |
tags | |
versions | e6c18566-3132-4ac3-b681-d300a20dd2ec, fc2695f2-65e6-4000-8526-81556bf02ab7, a71d43b9-31dc-4bd6-b3cd-8c536818935d, 11c86ec2-b7a4-4ce4-a600-394964cd66e3, 388b0cef-e26d-48ec-aceb-50c557ec9a0e, ebb3189c-eddb-423e-94b0-1ff1a9e5c3d8, c1dfccbf-2b82-418d-898c-7ac4a1d6d321, 000af8fb-c459-4dcc-873e-401a255e234b, 838c79a3-5602-41bb-ab93-fca01e41fa7a, ba1138e7-ef33-44e0-9c5a-6210fa259aa7, ea6a5027-3cbc-4a1f-bf71-1626b04f1167, 429df761-836c-4733-a1ff-5e7685ca43ab, c451bf1a-662c-4d8e-9c91-f59055756c26, 1631104a-2833-4209-80fb-4f10770504a5, 239f949a-1b9f-4e74-a026-6132bc0d0add, 118debb6-be7a-4360-89eb-954e7bde67f4, db50ad71-81ce-4592-a322-82b152f8d40d, f55651b8-8a96-4017-a223-37682f0e7047, 1fbf9314-41b3-41b5-ac05-f9563aa9c14a |
steps | alohamodel |
published | False |
aloha_pipeline.deploy(wait_for_status=False)
Deployment initiated for alohapipeline. Please check pipeline status.
name | alohapipeline |
---|---|
created | 2024-09-27 15:55:29.014069+00:00 |
last_updated | 2024-09-30 17:45:08.304572+00:00 |
deployed | True |
workspace_id | 10 |
workspace_name | alohaworkspace |
arch | x86 |
accel | none |
tags | |
versions | 23788026-3fca-4667-b29f-f1f8d3e06dd9, e6c18566-3132-4ac3-b681-d300a20dd2ec, fc2695f2-65e6-4000-8526-81556bf02ab7, a71d43b9-31dc-4bd6-b3cd-8c536818935d, 11c86ec2-b7a4-4ce4-a600-394964cd66e3, 388b0cef-e26d-48ec-aceb-50c557ec9a0e, ebb3189c-eddb-423e-94b0-1ff1a9e5c3d8, c1dfccbf-2b82-418d-898c-7ac4a1d6d321, 000af8fb-c459-4dcc-873e-401a255e234b, 838c79a3-5602-41bb-ab93-fca01e41fa7a, ba1138e7-ef33-44e0-9c5a-6210fa259aa7, ea6a5027-3cbc-4a1f-bf71-1626b04f1167, 429df761-836c-4733-a1ff-5e7685ca43ab, c451bf1a-662c-4d8e-9c91-f59055756c26, 1631104a-2833-4209-80fb-4f10770504a5, 239f949a-1b9f-4e74-a026-6132bc0d0add, 118debb6-be7a-4360-89eb-954e7bde67f4, db50ad71-81ce-4592-a322-82b152f8d40d, f55651b8-8a96-4017-a223-37682f0e7047, 1fbf9314-41b3-41b5-ac05-f9563aa9c14a |
steps | alohamodel |
published | False |
We can verify that the pipeline is running and list what models are associated with it.
import time
while aloha_pipeline.status()['status'] != 'Running':
    print(aloha_pipeline.status()['status'])
    time.sleep(15)
Starting
Error
Error
Starting
Starting
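The polling loop above runs until the status becomes Running, and the sample output shows that transient Error statuses can appear along the way. As a minimal sketch, assuming we want the wait to give up after a fixed time, the helper below adds a deadline. wait_until_running and its parameters are hypothetical names for illustration, not part of the Wallaroo SDK; the usage replays a scripted list of statuses instead of calling a real pipeline.

```python
import time


def wait_until_running(get_status, timeout_s=300.0, interval_s=15.0):
    """Poll get_status() until it returns 'Running' or timeout_s elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = get_status()
        if status == "Running":
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"pipeline still '{status}' after {timeout_s}s")
        time.sleep(interval_s)


# Scripted stand-in that replays statuses like the ones printed above.
statuses = iter(["Starting", "Error", "Error", "Starting", "Starting", "Running"])
final_status = wait_until_running(lambda: next(statuses), interval_s=0.0)
print(final_status)
```

With the tutorial's pipeline, the callable passed in would be something like lambda: aloha_pipeline.status()['status'], matching the expression used in the loop above.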
Inferences
Async Infer from Pandas DataFrame
Now that the pipeline is deployed and our Aloha model is in place, we’ll perform an async inference via the async_infer method. We’ll start with a single row Pandas DataFrame and show the results.
We’ll first define the httpx AsyncClient, which is also used in the later example.
async_client = httpx.AsyncClient()
smoke_test = pd.DataFrame.from_records(
    [
        {
            "text_input": [
                0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
                28, 16, 32, 23, 29, 32, 30, 19, 26, 17
            ]
        }
    ]
)
result_async = await aloha_pipeline.async_infer(tensor=smoke_test,
                                                async_client=async_client,
                                                retries=2)
display(result_async.loc[:, ["time","out.main"]])
 | time | out.main |
---|---|---|
0 | 2024-09-30 17:46:24.859 | [0.997564] |
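The smoke-test row above is a 50-element list whose first 40 entries are zero. As a minimal sketch of building a vector of that shape, the helper below left-pads a shorter sequence with zeros. The fixed length of 50 and the left_pad helper are assumptions drawn only from the sample row; the tutorial does not describe how the model's inputs are encoded.

```python
def left_pad(values, length=50, pad=0):
    """Left-pad values with pad up to length; reject inputs that are too long."""
    values = list(values)
    if len(values) > length:
        raise ValueError(f"expected at most {length} values, got {len(values)}")
    return [pad] * (length - len(values)) + values


# The ten non-zero values from the smoke-test row above.
text_input = left_pad([28, 16, 32, 23, 29, 32, 30, 19, 26, 17])
print(len(text_input), text_input[-3:])
```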
Async Infer From Apache Arrow Table
For this example, we’ll use three data sources of Apache Arrow tables to simulate drawing data, submitting an inference, then drawing data from another source while the asynchronous inference executes. When all async inferences are complete, we’ll publish all the results at once.
import asyncio

async def run_one_batch(file):
    with pa.ipc.open_file(file) as source:
        table = source.read_all()  # to get pyarrow table

    # ... potentially do other processing here or request data from a feature store ...

    result_async_batch = await aloha_pipeline.async_infer(tensor=table,
                                                          async_client=async_client,
                                                          retries=2)
    return result_async_batch

# Process three batches in parallel. Inference on the first will start while
# the second file is read, and so on. Multiple inferences may be running at
# the same time if the engine replicas are > 1.
tasks = []
for file in ["./data/data_25k.arrow", "./data/data_1k.arrow", "./data/data_25k.arrow"]:
    tasks.append(run_one_batch(file))

# Wait for all three batches to finish.
results = await asyncio.gather(*tasks, return_exceptions=True)

for result in results:
    print(f"Number of rows for task: {result.num_rows}")
    display(result)
Number of rows for task: 24954
pyarrow.Table
time: timestamp[ms]
in.text_input: list<item: float>
child 0, item: float
out.banjori: list<inner: float not null> not null
child 0, inner: float not null
out.corebot: list<inner: float not null> not null
child 0, inner: float not null
out.cryptolocker: list<inner: float not null> not null
child 0, inner: float not null
out.dircrypt: list<inner: float not null> not null
child 0, inner: float not null
out.gozi: list<inner: float not null> not null
child 0, inner: float not null
out.kraken: list<inner: float not null> not null
child 0, inner: float not null
out.locky: list<inner: float not null> not null
child 0, inner: float not null
out.main: list<inner: float not null> not null
child 0, inner: float not null
out.matsnu: list<inner: float not null> not null
child 0, inner: float not null
out.pykspa: list<inner: float not null> not null
child 0, inner: float not null
out.qakbot: list<inner: float not null> not null
child 0, inner: float not null
out.ramdo: list<inner: float not null> not null
child 0, inner: float not null
out.ramnit: list<inner: float not null> not null
child 0, inner: float not null
out.simda: list<inner: float not null> not null
child 0, inner: float not null
out.suppobox: list<inner: float not null> not null
child 0, inner: float not null
anomaly.count: uint32 not null
----
time: [[2024-09-30 17:53:07.406,2024-09-30 17:53:07.406,2024-09-30 17:53:07.406,2024-09-30 17:53:07.406,2024-09-30 17:53:07.406,...,2024-09-30 17:53:07.406,2024-09-30 17:53:07.406,2024-09-30 17:53:07.406,2024-09-30 17:53:07.406,2024-09-30 17:53:07.406]]
in.text_input: [[[0,0,0,0,0,...,32,30,19,26,17],[0,0,0,0,0,...,18,35,18,22,23],...,[0,0,0,0,0,...,27,28,19,33,23],[0,0,0,0,0,...,24,29,14,36,13]]]
out.banjori: [[[0.0015195814],[7.447168e-18],...,[1.342218e-14],[1.3068625e-12]]]
out.corebot: [[[0.98291475],[6.7359245e-8],...,[7.969789e-9],[1.1029467e-9]]]
out.cryptolocker: [[[0.012099549],[0.17081994],...,[0.14756858],[0.014839977]]]
out.dircrypt: [[[0.000047591115],[1.3220122e-9],...,[4.813928e-9],[2.2757316e-8]]]
out.gozi: [[[0.000020289312],[1.2758657e-24],...,[1.0541016e-22],[8.438438e-15]]]
out.kraken: [[[0.00031977257],[0.22559547],...,[0.2969691],[0.30495816]]]
out.locky: [[[0.011029262],[0.3420985],...,[0.2706808],[0.11627986]]]
out.main: [[[0.997564],[0.99999994],...,[1],[0.99999803]]]
...
Number of rows for task: 1000
pyarrow.Table
time: timestamp[ms]
in.text_input: list<item: float>
child 0, item: float
out.banjori: list<inner: float not null> not null
child 0, inner: float not null
out.corebot: list<inner: float not null> not null
child 0, inner: float not null
out.cryptolocker: list<inner: float not null> not null
child 0, inner: float not null
out.dircrypt: list<inner: float not null> not null
child 0, inner: float not null
out.gozi: list<inner: float not null> not null
child 0, inner: float not null
out.kraken: list<inner: float not null> not null
child 0, inner: float not null
out.locky: list<inner: float not null> not null
child 0, inner: float not null
out.main: list<inner: float not null> not null
child 0, inner: float not null
out.matsnu: list<inner: float not null> not null
child 0, inner: float not null
out.pykspa: list<inner: float not null> not null
child 0, inner: float not null
out.qakbot: list<inner: float not null> not null
child 0, inner: float not null
out.ramdo: list<inner: float not null> not null
child 0, inner: float not null
out.ramnit: list<inner: float not null> not null
child 0, inner: float not null
out.simda: list<inner: float not null> not null
child 0, inner: float not null
out.suppobox: list<inner: float not null> not null
child 0, inner: float not null
anomaly.count: uint32 not null
----
time: [[2024-09-30 17:53:07.384,2024-09-30 17:53:07.384,2024-09-30 17:53:07.384,2024-09-30 17:53:07.384,2024-09-30 17:53:07.384,...,2024-09-30 17:53:07.384,2024-09-30 17:53:07.384,2024-09-30 17:53:07.384,2024-09-30 17:53:07.384,2024-09-30 17:53:07.384]]
in.text_input: [[[0,0,0,0,0,...,32,30,19,26,17],[0,0,0,0,0,...,29,12,36,31,12],...,[0,0,0,0,0,...,35,16,35,27,16],[0,0,0,0,0,...,24,29,14,36,13]]]
out.banjori: [[[0.0015195814],[0.00002837503],...,[0.0000056315566],[1.3068625e-12]]]
out.corebot: [[[0.98291475],[0.000012753118],...,[0.0000033642746],[1.1029468e-9]]]
out.cryptolocker: [[[0.012099549],[0.025435215],...,[0.13612255],[0.014839977]]]
out.dircrypt: [[[0.000047591115],[6.150966e-10],...,[5.6732154e-11],[2.2757316e-8]]]
out.gozi: [[[0.000020289312],[2.321774e-10],...,[2.773063e-8],[8.43847e-15]]]
out.kraken: [[[0.00031977257],[0.051351104],...,[0.0025221605],[0.30495816]]]
out.locky: [[[0.011029262],[0.022038758],...,[0.05455697],[0.116279885]]]
out.main: [[[0.997564],[0.9885122],...,[0.9998954],[0.99999803]]]
...
Number of rows for task: 24954
pyarrow.Table
time: timestamp[ms]
in.text_input: list<item: float>
child 0, item: float
out.banjori: list<inner: float not null> not null
child 0, inner: float not null
out.corebot: list<inner: float not null> not null
child 0, inner: float not null
out.cryptolocker: list<inner: float not null> not null
child 0, inner: float not null
out.dircrypt: list<inner: float not null> not null
child 0, inner: float not null
out.gozi: list<inner: float not null> not null
child 0, inner: float not null
out.kraken: list<inner: float not null> not null
child 0, inner: float not null
out.locky: list<inner: float not null> not null
child 0, inner: float not null
out.main: list<inner: float not null> not null
child 0, inner: float not null
out.matsnu: list<inner: float not null> not null
child 0, inner: float not null
out.pykspa: list<inner: float not null> not null
child 0, inner: float not null
out.qakbot: list<inner: float not null> not null
child 0, inner: float not null
out.ramdo: list<inner: float not null> not null
child 0, inner: float not null
out.ramnit: list<inner: float not null> not null
child 0, inner: float not null
out.simda: list<inner: float not null> not null
child 0, inner: float not null
out.suppobox: list<inner: float not null> not null
child 0, inner: float not null
anomaly.count: uint32 not null
----
time: [[2024-09-30 17:53:07.395,2024-09-30 17:53:07.395,2024-09-30 17:53:07.395,2024-09-30 17:53:07.395,2024-09-30 17:53:07.395,...,2024-09-30 17:53:07.395,2024-09-30 17:53:07.395,2024-09-30 17:53:07.395,2024-09-30 17:53:07.395,2024-09-30 17:53:07.395]]
in.text_input: [[[0,0,0,0,0,...,32,30,19,26,17],[0,0,0,0,0,...,18,35,18,22,23],...,[0,0,0,0,0,...,27,28,19,33,23],[0,0,0,0,0,...,24,29,14,36,13]]]
out.banjori: [[[0.0015195814],[7.447168e-18],...,[1.342218e-14],[1.3068625e-12]]]
out.corebot: [[[0.98291475],[6.7359245e-8],...,[7.969789e-9],[1.1029467e-9]]]
out.cryptolocker: [[[0.012099549],[0.17081994],...,[0.14756858],[0.014839977]]]
out.dircrypt: [[[0.000047591115],[1.3220122e-9],...,[4.813928e-9],[2.2757316e-8]]]
out.gozi: [[[0.000020289312],[1.2758657e-24],...,[1.0541016e-22],[8.438438e-15]]]
out.kraken: [[[0.00031977257],[0.22559547],...,[0.2969691],[0.30495816]]]
out.locky: [[[0.011029262],[0.3420985],...,[0.2706808],[0.11627986]]]
out.main: [[[0.997564],[0.99999994],...,[1],[0.99999803]]]
...
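The Arrow example calls asyncio.gather with return_exceptions=True, so a failed batch comes back as an exception object in the results list rather than being raised, and reading num_rows on it would fail. The minimal sketch below separates successes from failures before using the results. The run_one_batch here is a hypothetical stand-in that fails for one input on purpose; it does not call Wallaroo.

```python
import asyncio


async def run_one_batch(name):
    # Hypothetical stand-in for the tutorial's run_one_batch: one batch fails.
    await asyncio.sleep(0.01)
    if name == "bad":
        raise RuntimeError(f"inference failed for {name}")
    return {"batch": name, "num_rows": 3}


async def main():
    names = ["a", "bad", "b"]
    # gather returns results in the same order as the awaitables passed in.
    results = await asyncio.gather(
        *(run_one_batch(n) for n in names), return_exceptions=True
    )
    succeeded, failed = [], []
    for name, result in zip(names, results):
        if isinstance(result, BaseException):
            failed.append((name, result))
        else:
            succeeded.append(result)
    return succeeded, failed


succeeded, failed = asyncio.run(main())
for result in succeeded:
    print(f"Number of rows for task: {result['num_rows']}")
for name, err in failed:
    print(f"Batch {name} failed: {err}")
```

In a notebook, asyncio.run(main()) would typically be replaced with a top-level await main().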
Undeploy Pipeline
When finished with our tests, we will undeploy the pipeline so we have the Kubernetes resources back for other tasks. Note that if the deployment variable is unchanged, aloha_pipeline.deploy() will restart the inference engine in the same configuration as before.
aloha_pipeline.undeploy()
Waiting for undeployment - this will take up to 45s .................................... ok
name | alohapipeline |
---|---|
created | 2024-09-27 15:55:29.014069+00:00 |
last_updated | 2024-09-30 17:45:08.304572+00:00 |
deployed | False |
workspace_id | 10 |
workspace_name | alohaworkspace |
arch | x86 |
accel | none |
tags | |
versions | 23788026-3fca-4667-b29f-f1f8d3e06dd9, e6c18566-3132-4ac3-b681-d300a20dd2ec, fc2695f2-65e6-4000-8526-81556bf02ab7, a71d43b9-31dc-4bd6-b3cd-8c536818935d, 11c86ec2-b7a4-4ce4-a600-394964cd66e3, 388b0cef-e26d-48ec-aceb-50c557ec9a0e, ebb3189c-eddb-423e-94b0-1ff1a9e5c3d8, c1dfccbf-2b82-418d-898c-7ac4a1d6d321, 000af8fb-c459-4dcc-873e-401a255e234b, 838c79a3-5602-41bb-ab93-fca01e41fa7a, ba1138e7-ef33-44e0-9c5a-6210fa259aa7, ea6a5027-3cbc-4a1f-bf71-1626b04f1167, 429df761-836c-4733-a1ff-5e7685ca43ab, c451bf1a-662c-4d8e-9c91-f59055756c26, 1631104a-2833-4209-80fb-4f10770504a5, 239f949a-1b9f-4e74-a026-6132bc0d0add, 118debb6-be7a-4360-89eb-954e7bde67f4, db50ad71-81ce-4592-a322-82b152f8d40d, f55651b8-8a96-4017-a223-37682f0e7047, 1fbf9314-41b3-41b5-ac05-f9563aa9c14a |
steps | alohamodel |
published | False |
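Because a deployed pipeline holds resources until it is undeployed, it can help to make the undeploy step run even when an inference step raises. The sketch below shows one way to do that with try/finally. StubPipeline and run_with_cleanup are hypothetical names used for illustration; the stub only tracks a flag and does not talk to a Wallaroo instance.

```python
class StubPipeline:
    """Hypothetical stand-in that only tracks whether it is deployed."""

    def __init__(self):
        self.deployed = False

    def deploy(self):
        self.deployed = True
        return self

    def undeploy(self):
        self.deployed = False
        return self


def run_with_cleanup(pipeline, work):
    """Deploy, run work(pipeline), and always undeploy afterwards."""
    pipeline.deploy()
    try:
        return work(pipeline)
    finally:
        pipeline.undeploy()


def failing_work(pipeline):
    raise RuntimeError("inference failed")


pipeline = StubPipeline()
try:
    run_with_cleanup(pipeline, failing_work)
except RuntimeError as err:
    print(f"work failed: {err}")
print(pipeline.deployed)
```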