This tutorial and the assets can be downloaded as part of the Wallaroo Tutorials repository.
Wallaroo provides the ability to perform inferences through deployed pipelines via the Wallaroo SDK and the Wallaroo MLOps API. This tutorial demonstrates performing inferences using the Wallaroo MLOps API.
This tutorial provides the following:
ccfraud.onnx
: A pre-trained credit card fraud detection model.data/cc_data_1k.arrow
, data/cc_data_10k.arrow
: Sample testing data in Apache Arrow format with 1,000 and 10,000 records respectively.wallaroo-model-endpoints-api.py
: A code-only version of this tutorial as a Python script.This tutorial and sample data comes from the Machine Learning Group’s demonstration on Credit Card Fraud detection.
The following is required for this tutorial:
This demonstration provides a quick tutorial on performing inferences using the Wallaroo MLOps API using a deployed pipeline’s Inference URL. This following steps will be performed:
ccfraud
model.ccfraud
model as a pipeline step.There are two methods of retrieving the JWT token used to authenticate to the Wallaroo instance’s API service:
This tutorial will use the Wallaroo SDK method for convenience with environmental variables for a seamless login without browser validation. For more information, see the Wallaroo SDK Essentials Guide: Client Connection.
All Wallaroo API endpoints follow the format:
https://$WALLAROODOMAIN/v1/api$COMMAND
Where $COMMAND
is the specific endpoint. For example, for the command to list of workspaces in the Wallaroo instance would use the above format based on these settings:
$WALLAROODOMAIN
: example.wallaroo.ai
$COMMAND
: /workspaces/list
This would create the following API endpoint:
https://example.wallaroo.ai/v1/api/workspaces/list
For this example, a connection to the Wallaroo SDK is used. This will be used to retrieve the JWT token for the MLOps API calls.
import wallaroo
from wallaroo.object import EntityNotFoundError
import pandas as pd
import os
import base64
import pyarrow as pa
import requests
from requests.auth import HTTPBasicAuth
import json
# used to display dataframe information without truncating
from IPython.display import display
pd.set_option('display.max_colwidth', None)
# Login through local Wallaroo instance
wl = wallaroo.Client()
As mentioned earlier, there are multiple methods of authenticating to the Wallaroo instance for MLOps API calls. This tutorial will use the Wallaroo SDK method Wallaroo Client wl.auth.auth_header()
method, extracting the token from the response.
We will also use the wallaroo.client.Client.api_endpoint
that provides the MLOps API URL for the rest of our methods. This saved us from having to derive it from the DNS address. See the Wallaroo Documentation Site for more details on using the MLOps API and connection methods.
Reference: MLOps API Retrieve Token Through Wallaroo SDK
display(wl.api_endpoint)
display(wl.auth.auth_header())
'https://doc-test.wallarooexample.ai'
{‘Authorization’: ‘Bearer eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJVck9FV3NYUGtvcEFjSU5CYmRrWWFFTFMzSzJiMGlSd21pdWgxb3VVbWhFIn0.eyJleHAiOjE3MTMyOTk3NTAsImlhdCI6MTcxMzI5OTY5MCwiYXV0aF90aW1lIjoxNzEzMjk4Njg5LCJqdGkiOiIzZWQxOTUxOS1hNzJmLTRjYjctOWZiMy0yNmIzZDNlNjg5ZWMiLCJpc3MiOiJodHRwczovL2RvYy10ZXN0LmtleWNsb2FrLndhbGxhcm9vY29tbXVuaXR5Lm5pbmphL2F1dGgvcmVhbG1zL21hc3RlciIsImF1ZCI6WyJtYXN0ZXItcmVhbG0iLCJhY2NvdW50Il0sInN1YiI6IjY1MTI0YjE4LTgzODItNDlhZi1iM2M4LWFkYTNiOWRmMzMzMCIsInR5cCI6IkJlYXJlciIsImF6cCI6InNkay1jbGllbnQiLCJzZXNzaW9uX3N0YXRlIjoiNjk0Mzg5NTgtZWUzMS00ZTI5LTkwYWUtMTk0ZWQ3ZDRiODhlIiwiYWNyIjoiMSIsInJlYWxtX2FjY2VzcyI6eyJyb2xlcyI6WyJjcmVhdGUtcmVhbG0iLCJkZWZhdWx0LXJvbGVzLW1hc3RlciIsIm9mZmxpbmVfYWNjZXNzIiwiYWRtaW4iLCJ1bWFfYXV0aG9yaXphdGlvbiJdfSwicmVzb3VyY2VfYWNjZXNzIjp7Im1hc3Rlci1yZWFsbSI6eyJyb2xlcyI6WyJ2aWV3LWlkZW50aXR5LXByb3ZpZGVycyIsInZpZXctcmVhbG0iLCJtYW5hZ2UtaWRlbnRpdHktcHJvdmlkZXJzIiwiaW1wZXJzb25hdGlvbiIsImNyZWF0ZS1jbGllbnQiLCJtYW5hZ2UtdXNlcnMiLCJxdWVyeS1yZWFsbXMiLCJ2aWV3LWF1dGhvcml6YXRpb24iLCJxdWVyeS1jbGllbnRzIiwicXVlcnktdXNlcnMiLCJtYW5hZ2UtZXZlbnRzIiwibWFuYWdlLXJlYWxtIiwidmlldy1ldmVudHMiLCJ2aWV3LXVzZXJzIiwidmlldy1jbGllbnRzIiwibWFuYWdlLWF1dGhvcml6YXRpb24iLCJtYW5hZ2UtY2xpZW50cyIsInF1ZXJ5LWdyb3VwcyJdfSwiYWNjb3VudCI6eyJyb2xlcyI6WyJtYW5hZ2UtYWNjb3VudCIsIm1hbmFnZS1hY2NvdW50LWxpbmtzIiwidmlldy1wcm9maWxlIl19fSwic2NvcGUiOiJlbWFpbCBwcm9maWxlIG9wZW5pZCIsInNpZCI6IjY5NDM4OTU4LWVlMzEtNGUyOS05MGFlLTE5NGVkN2Q0Yjg4ZSIsImVtYWlsX3ZlcmlmaWVkIjp0cnVlLCJodHRwczovL2hhc3VyYS5pby9qd3QvY2xhaW1zIjp7IngtaGFzdXJhLXVzZXItaWQiOiI2NTEyNGIxOC04MzgyLTQ5YWYtYjNjOC1hZGEzYjlkZjMzMzAiLCJ4LWhhc3VyYS1kZWZhdWx0LXJvbGUiOiJhZG1pbiIsIngtaGFzdXJhLWFsbG93ZWQtcm9sZXMiOlsidXNlciIsImFkbWluIl0sIngtaGFzdXJhLXVzZXItZ3JvdXBzIjoie30ifSwicHJlZmVycmVkX3VzZXJuYW1lIjoiam9obi5oYW5zYXJpY2tAd2FsbGFyb28uYWkiLCJlbWFpbCI6ImpvaG4uaGFuc2FyaWNrQHdhbGxhcm9vLmFpIn0.mSqcAuCFq5sT7AIo2kA4j7EqnJ5F7NbXcPgh_HxGSaeMtxpjsL2EzkYgBP0Td-Ne58KsSvvJ3kjV6aBWXx9uGRIbfKeGi0w5CC9J0jy0jqvj6smP_Oty1gsaolqYc0vpSsD5HFzhnDOHQ09p3y1HDnBw0nRRv1Uh4oZO98h26wQnndklKzTtC8Qimvxa2MGvXrwQmBEVNJdosPFhN5ofL0-3S2cdr3L-iCuohtKuP28q9O9_bLuMOnxzv63z4rlNOTHdMZYCATyg4L_b7hu96sg8nVgZCJbbQCylwibyBj17iBKApXCe0E4MAEMe_DOa49cur6eMC2S_hF4y8J0BAg’}
In a production environment, the Wallaroo workspace that contains the pipeline and models would be created and deployed. We will quickly recreate those steps using the MLOps API. If the workspace and pipeline have already been created through the Wallaroo SDK Inference Tutorial, then we can skip directly to Deploy Pipeline.
Workspaces are created through the MLOps API with the /v1/api/workspaces/create
command. This requires the workspace name be provided, and that the workspace not already exist in the Wallaroo instance.
Reference: MLOps API Create Workspace
# Retrieve the token
headers = wl.auth.auth_header()
# set Content-Type type
headers['Content-Type']='application/json'
# Create workspace
apiRequest = f"{wl.api_endpoint}/v1/api/workspaces/create"
workspace_name = f"apiinferenceexampleworkspace"
data = {
"workspace_name": workspace_name
}
response = requests.post(apiRequest, json=data, headers=headers, verify=True).json()
display(response)
# Stored for future examples
workspaceId = response['workspace_id']
{'workspace_id': 24}
The model is uploaded using the /v1/api/models/upload_and_convert
command. This uploads a ML Model to a Wallaroo workspace via POST with Content-Type: multipart/form-data
and takes the following parameters:
public
or private
.workspaceId
.Directly after we will use the /models/list_versions
to retrieve model details used for later steps.
Reference: Wallaroo MLOps API Essentials Guide: Model Management: Upload Model to Workspace
import onnx
model = onnx.load("./ccfraud.onnx")
output =[node.name for node in model.graph.output]
input_all = [node.name for node in model.graph.input]
input_initializer = [node.name for node in model.graph.initializer]
net_feed_input = list(set(input_all) - set(input_initializer))
print('Inputs: ', net_feed_input)
print('Outputs: ', output)
Inputs: ['dense_input']
Outputs: ['dense_1']
## upload model
# Retrieve the token
headers = wl.auth.auth_header()
apiRequest = f"{wl.api_endpoint}/v1/api/models/upload_and_convert"
framework='onnx'
model_name = f"ccfraud"
data = {
"name": model_name,
"visibility": "public",
"workspace_id": workspaceId,
"conversion": {
"framework": framework,
"python_version": "3.8",
"requirements": [],
"tensor_fields": ["tensor"]
}
}
files = {
"metadata": (None, json.dumps(data), "application/json"),
'file': (model_name, open('./ccfraud.onnx', 'rb'), "application/octet-stream")
}
response = requests.post(apiRequest, files=files, headers=headers).json()
display(response)
modelId=response['insert_models']['returning'][0]['models'][0]['id']
{'insert_models': {'returning': [{'models': [{'id': 31}]}]}}
# Get the model details
# Retrieve the token
headers = wl.auth.auth_header()
# set Content-Type type
headers['Content-Type']='application/json'
apiRequest = f"{wl.api_endpoint}/v1/api/models/get_by_id"
data = {
"id": modelId
}
response = requests.post(apiRequest, json=data, headers=headers, verify=True).json()
display(response)
{'id': 31,
'owner_id': '""',
'workspace_id': 24,
'name': 'ccfraud',
'updated_at': '2024-04-16T20:35:35.021389+00:00',
'created_at': '2024-04-16T20:35:35.021389+00:00',
'model_config': {'id': 49,
'runtime': 'onnx',
'tensor_fields': None,
'filter_threshold': None}}
# Get the model details
# Retrieve the token
headers = wl.auth.auth_header()
# set Content-Type type
headers['Content-Type']='application/json'
apiRequest = f"{wl.api_endpoint}/v1/api/models/list_versions"
data = {
"model_id": model_name,
"models_pk_id" : modelId
}
response = requests.post(apiRequest, json=data, headers=headers, verify=True).json()
display(response)
[{'sha': 'bc85ce596945f876256f41515c7501c399fd97ebcb9ab3dd41bf03f8937b4507',
'models_pk_id': 31,
'model_version': 'b4227586-327a-4484-bba0-a1a33fac3baf',
'owner_id': '""',
'model_id': 'ccfraud',
'id': 31,
'file_name': 'ccfraud',
'image_path': None,
'status': 'attempting_load_container'},
{'sha': 'bc85ce596945f876256f41515c7501c399fd97ebcb9ab3dd41bf03f8937b4507',
'models_pk_id': 30,
'model_version': '3c3042bd-86cd-46ef-ac29-cfca1369d90e',
'owner_id': '""',
'model_id': 'ccfraud',
'id': 30,
'file_name': 'ccfraud.onnx',
'image_path': None,
'status': 'ready'}]
model_version_id = response[0]['id']
model_version = response[0]['model_version']
display(model_version)
model_sha = response[0]['sha']
display(model_sha)
'b4227586-327a-4484-bba0-a1a33fac3baf'
‘bc85ce596945f876256f41515c7501c399fd97ebcb9ab3dd41bf03f8937b4507’
For our ONNX model, we will insert a model configuration that will allow us to submit the field tensor
for an input.
# Get the model details
# Retrieve the token
headers = wl.auth.auth_header()
# set Content-Type type
headers['Content-Type']='application/json'
apiRequest = f"{wl.api_endpoint}/v1/api/models/insert_model_config"
data = {
"model_version_id": model_version_id,
"tensor_fields": [
"tensor"
]
}
response = requests.post(apiRequest, json=data, headers=headers, verify=True).json()
display(response)
{'model_config': {'id': 50,
'model_version_id': 31,
'runtime': 'onnx',
'filter_threshold': None,
'tensor_fields': ['tensor'],
'input_schema': None,
'output_schema': None,
'batch_config': None,
'sidekick_uri': None}}
Create Pipeline in a Workspace with the /v1/api/pipelines/create
command. This creates a new pipeline in the specified workspace.
workspaceId
.{}
for none.For our example, we are setting the pipeline steps through the definition
field. This will direct inference requests to the model before output.
Reference: Wallaroo MLOps API Essentials Guide: Pipeline Management: Create Pipeline in a Workspace
# Create pipeline
# Retrieve the token
headers = wl.auth.auth_header()
# set Content-Type type
headers['Content-Type']='application/json'
apiRequest = f"{wl.api_endpoint}/v1/api/pipelines/create"
pipeline_name=f"apiinferenceexamplepipeline"
data = {
"pipeline_id": pipeline_name,
"workspace_id": workspaceId,
"definition": {'steps': [{'ModelInference': {'models': [{'name': f'{model_name}', 'version': model_version, 'sha': model_sha}]}}]}
}
response = requests.post(apiRequest, json=data, headers=headers, verify=True).json()
pipeline_id = response['pipeline_pk_id']
pipeline_variant_id=response['pipeline_variant_pk_id']
pipeline_variant_version=['pipeline_variant_version']
With the pipeline created and the model uploaded into the workspace, the pipeline can be deployed. This will allocate resources from the Kubernetes cluster hosting the Wallaroo instance and prepare the pipeline to process inference requests.
Pipelines are deployed through the MLOps API command /v1/api/pipelines/deploy
which takes the following parameters:
pipeline_variant_id
.models
are provided as a parameter.model_name
variable.Reference: Wallaroo MLOps API Essentials Guide: Pipeline Management: Deploy a Pipeline
# Deploy Pipeline
# Retrieve the token
headers = wl.auth.auth_header()
# set Content-Type type
headers['Content-Type']='application/json'
apiRequest = f"{wl.api_endpoint}/v1/api/pipelines/deploy"
exampleModelDeployId=pipeline_name
data = {
"deploy_id": exampleModelDeployId,
"pipeline_version_pk_id": pipeline_variant_id,
"model_ids": [
modelId
],
"pipeline_id": pipeline_id
}
response = requests.post(apiRequest, json=data, headers=headers, verify=True).json()
display(response)
exampleModelDeploymentId=response['id']
# wait 45 seconds for the pipeline to complete deployment
import time
time.sleep(45)
{'id': 19}
This returns the deployment status - we’re waiting until the deployment has the status “Ready.”
Example: The deployed empty and model pipelines status will be displayed.
# Retrieve the token
headers = wl.auth.auth_header()
# set Content-Type type
headers['Content-Type']='application/json'
# Get model pipeline deployment
api_request = f"{wl.api_endpoint}/v1/api/status/get_deployment"
data = {
"name": f"{pipeline_name}-{exampleModelDeploymentId}"
}
response = requests.post(api_request, json=data, headers=headers, verify=True).json()
response
{'status': 'Running',
'details': [],
'engines': [{'ip': '10.28.0.87',
'name': 'engine-5879f6ff68-glvjh',
'status': 'Running',
'reason': None,
'details': [],
'pipeline_statuses': {'pipelines': [{'id': 'apiinferenceexamplepipeline',
'status': 'Running'}]},
'model_statuses': {'models': [{'name': 'ccfraud',
'sha': 'bc85ce596945f876256f41515c7501c399fd97ebcb9ab3dd41bf03f8937b4507',
'status': 'Running',
'version': 'b4227586-327a-4484-bba0-a1a33fac3baf'}]}}],
'engine_lbs': [{'ip': '10.28.3.224',
'name': 'engine-lb-d7cc8fc9c-zs4t8',
'status': 'Running',
'reason': None,
'details': []}],
'sidekicks': []}
The API command /admin/get_pipeline_external_url
retrieves the external inference URL for a specific pipeline in a workspace.
In this example, a list of the workspaces will be retrieved. Based on the setup from the Internal Pipeline Deployment URL Tutorial, the workspace matching urlworkspace
will have it’s workspace id stored and used for the /admin/get_pipeline_external_url
request with the pipeline urlpipeline
.
The External Inference URL will be stored as a variable for the next step.
Reference: Wallaroo MLOps API Essentials Guide: Pipeline Management: Get External Inference URL
# Retrieve the token
headers = wl.auth.auth_header()
# set Content-Type type
headers['Content-Type']='application/json'
## Retrieve the pipeline's External Inference URL
apiRequest = f"{wl.api_endpoint}/v1/api/admin/get_pipeline_external_url"
data = {
"workspace_id": workspaceId,
"pipeline_name": pipeline_name
}
response = requests.post(apiRequest, json=data, headers=headers, verify=True).json()
deployurl = response['url']
deployurl
'https://doc-test.wallarooexample.ai/v1/api/pipelines/infer/apiinferenceexamplepipeline-19/apiinferenceexamplepipeline'
The inference can now be performed through the External Inference URL. This URL will accept the same inference data file that is used with the Wallaroo SDK, or with an Internal Inference URL as used in the Internal Pipeline Inference URL Tutorial.
For this example, the externalUrl
retrieved through the Get External Inference URL is used to submit a single inference request through the data file data-1.json
.
Reference: Wallaroo MLOps API Essentials Guide: Pipeline Management: Perform Inference Through External URL
# Retrieve the token
headers = wl.auth.auth_header()
# set Content-Type type
headers['Content-Type']='application/json; format=pandas-records'
## Inference through external URL using dataframe
# retrieve the json data to submit
data = [
{
"tensor":[
1.0678324729,
0.2177810266,
-1.7115145262,
0.682285721,
1.0138553067,
-0.4335000013,
0.7395859437,
-0.2882839595,
-0.447262688,
0.5146124988,
0.3791316964,
0.5190619748,
-0.4904593222,
1.1656456469,
-0.9776307444,
-0.6322198963,
-0.6891477694,
0.1783317857,
0.1397992467,
-0.3554220649,
0.4394217877,
1.4588397512,
-0.3886829615,
0.4353492889,
1.7420053483,
-0.4434654615,
-0.1515747891,
-0.2668451725,
-1.4549617756
]
}
]
# submit the request via POST, import as pandas DataFrame
response = pd.DataFrame.from_records(
requests.post(
deployurl,
json=data,
headers=headers)
.json()
)
display(response.loc[:,["time", "out"]])
time | out | |
---|---|---|
0 | 1713300490585 | {'dense_1': [0.0014974177]} |
# Retrieve the token
headers = wl.auth.auth_header()
# set Content-Type type
headers['Content-Type']='application/vnd.apache.arrow.file'
# set accept as apache arrow table
headers['Accept']="application/vnd.apache.arrow.file"
# Submit arrow file
dataFile="./data/cc_data_10k.arrow"
data = open(dataFile,'rb').read()
response = requests.post(
deployurl,
headers=headers,
data=data,
verify=True
)
# Arrow table is retrieved
with pa.ipc.open_file(response.content) as reader:
arrow_table = reader.read_all()
# convert to Polars DataFrame and display the first 5 rows
display(arrow_table.to_pandas().head(5).loc[:,["time", "out"]])
time | out | |
---|---|---|
0 | 1713300498196 | {'dense_1': [0.99300325]} |
1 | 1713300498196 | {'dense_1': [0.99300325]} |
2 | 1713300498196 | {'dense_1': [0.99300325]} |
3 | 1713300498196 | {'dense_1': [0.99300325]} |
4 | 1713300498196 | {'dense_1': [0.0010916889]} |
With the tutorial complete, we’ll undeploy the pipeline with /v1/api/pipelines/undeploy
and return the resources back to the Wallaroo instance.
Reference: Wallaroo MLOps API Essentials Guide: Pipeline Management: Undeploy a Pipeline
# Retrieve the token
headers = wl.auth.auth_header()
# set Content-Type type
headers['Content-Type']='application/json'
apiRequest = f"{wl.api_endpoint}/v1/api/pipelines/undeploy"
data = {
"pipeline_id": pipeline_id,
"deployment_id":exampleModelDeploymentId
}
response = requests.post(apiRequest, json=data, headers=headers, verify=True).json()
display(response)
{}
Wallaroo supports the ability to perform inferences through the SDK and through the API for each deployed pipeline. For more information on how to use Wallaroo, see the Wallaroo Documentation Site for full details.