Wallaroo MLOps API Inferencing with Pipeline Inference URL Tutorial
This tutorial and the assets can be downloaded as part of the Wallaroo Tutorials repository.
Wallaroo API Inference Tutorial
Wallaroo provides the ability to perform inferences through deployed pipelines via the Wallaroo SDK and the Wallaroo MLOps API. This tutorial demonstrates performing inferences using the Wallaroo MLOps API.
This tutorial provides the following:
- ccfraud.onnx: A pre-trained credit card fraud detection model.
- data/cc_data_1k.arrow, data/cc_data_10k.arrow: Sample testing data in Apache Arrow format with 1,000 and 10,000 records respectively.
- wallaroo-model-endpoints-api.py: A code-only version of this tutorial as a Python script.
This tutorial and sample data come from the Machine Learning Group’s demonstration on Credit Card Fraud detection.
Prerequisites
The following is required for this tutorial:
- A deployed Wallaroo instance with Model Endpoints enabled
- The following Python libraries, all imported in the code below:
  - wallaroo
  - pandas
  - pyarrow
  - requests
  - onnx
Tutorial Goals
This demonstration provides a quick tutorial on performing inferences with the Wallaroo MLOps API through a deployed pipeline’s Inference URL. The following steps will be performed:
- Connect to a Wallaroo instance using the Wallaroo SDK and environment variables. This bypasses the browser link confirmation for a seamless login, and provides a simple method of retrieving the JWT token used for Wallaroo MLOps API calls. For more information, see the Wallaroo SDK Essentials Guide: Client Connection and the Wallaroo MLOps API Essentials Guide.
- Create a workspace for our models and pipelines.
- Upload the ccfraud model.
- Create a pipeline and add the ccfraud model as a pipeline step.
- Run sample inferences with pandas DataFrame inputs and Apache Arrow inputs.
Retrieve Token
There are two methods of retrieving the JWT token used to authenticate to the Wallaroo instance’s API service:
- Wallaroo SDK. This method requires a Wallaroo-based user.
- API Client Secret. This is the recommended method as it is user independent. It allows any valid user to make an inference request.
This tutorial uses the Wallaroo SDK method for convenience, with environment variables for a seamless login without browser validation. For more information, see the Wallaroo SDK Essentials Guide: Client Connection.
API Request Methods
All Wallaroo API endpoints follow the format:
https://$WALLAROODOMAIN/v1/api$COMMAND
Where $COMMAND is the specific endpoint. For example, the command to list the workspaces in the Wallaroo instance would use the above format with these settings:
- $WALLAROODOMAIN: example.wallaroo.ai
- $COMMAND: /workspaces/list
This would create the following API endpoint:
https://example.wallaroo.ai/v1/api/workspaces/list
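The endpoint format above can be sketched as a small helper. The function name mlops_url is hypothetical and not part of the Wallaroo SDK; the cells below build the same URLs inline from wl.api_endpoint instead.

```python
def mlops_url(domain: str, command: str) -> str:
    """Build a URL of the form https://$WALLAROODOMAIN/v1/api$COMMAND (hypothetical helper)."""
    # Commands are expected to begin with '/', e.g. '/workspaces/list'
    if not command.startswith("/"):
        command = "/" + command
    # Drop a trailing slash on the domain so the path separator is not doubled
    return f"https://{domain.rstrip('/')}/v1/api{command}"


print(mlops_url("example.wallaroo.ai", "/workspaces/list"))
```

With the example settings this reproduces the workspace-list endpoint shown above.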
Connect to Wallaroo
For this example, a connection to the Wallaroo SDK is used. This will be used to retrieve the JWT token for the MLOps API calls.
import wallaroo
from wallaroo.object import EntityNotFoundError
import pandas as pd
import os
import base64
import pyarrow as pa
import requests
from requests.auth import HTTPBasicAuth
import json
# used to display dataframe information without truncating
from IPython.display import display
pd.set_option('display.max_colwidth', None)
pd.set_option('display.max_columns', None)
# Login through local Wallaroo instance
wl = wallaroo.Client()
Retrieve the JWT Token
As mentioned earlier, there are multiple methods of authenticating to the Wallaroo instance for MLOps API calls. This tutorial uses the Wallaroo SDK method wl.auth.auth_header(), which returns the authorization header containing the JWT token.
We will also use wallaroo.client.Client.api_endpoint, which provides the MLOps API URL for the rest of our requests. This saves us from having to derive it from the DNS address. See the Wallaroo Documentation Site for more details on using the MLOps API and connection methods.
Reference: MLOps API Retrieve Token Through Wallaroo SDK
display(wl.api_endpoint)
display(wl.auth.auth_header())
'http://api-lb:8080'
{'Authorization': 'Bearer eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJVLWZnXzZZcDc4Z1RxRWpsNTVUX2plc2k4c3VGTlZFdnNTQnY2WkF0MGVFIn0...'}
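The header returned by wl.auth.auth_header() carries a standard JWT, and its claims can be inspected with the standard library, which is one possible use for the base64 import above. This is a sketch: the helper names are hypothetical, not Wallaroo SDK functions, and the payload is decoded without verifying the signature, so it is suitable only for inspection (for example, checking the exp claim). The cells below call wl.auth.auth_header() again before each request, which avoids reusing an expired token.

```python
import base64
import json


def bearer_token(auth_header: dict) -> str:
    """Return the raw token from a {'Authorization': 'Bearer <token>'} header."""
    scheme, _, token = auth_header["Authorization"].partition(" ")
    if scheme != "Bearer":
        raise ValueError(f"unexpected auth scheme: {scheme!r}")
    return token


def decode_jwt_claims(token: str) -> dict:
    """Decode a JWT payload WITHOUT verifying its signature (inspection only)."""
    # A JWT is three base64url segments: header.payload.signature
    payload = token.split(".")[1]
    # JWTs omit base64 '=' padding; restore it before decoding
    payload += "=" * (-len(payload) % 4)
    return json.loads(base64.urlsafe_b64decode(payload))


# Usage against a live client (assumes `wl` from the cell above):
# claims = decode_jwt_claims(bearer_token(wl.auth.auth_header()))
# print(claims["exp"] - claims["iat"], "seconds of validity")
```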
Create Workspace
In a production environment, the Wallaroo workspace that contains the pipeline and models would be created and deployed. We will quickly recreate those steps using the MLOps API. If the workspace and pipeline have already been created through the Wallaroo SDK Inference Tutorial, then we can skip directly to Deploy Pipeline.
Workspaces are created through the MLOps API with the /v1/api/workspaces/create
command. This requires the workspace name be provided, and that the workspace not already exist in the Wallaroo instance.
Reference: MLOps API Create Workspace
# Retrieve the token
headers = wl.auth.auth_header()
# set Content-Type type
headers['Content-Type']='application/json'
# Create workspace
apiRequest = f"{wl.api_endpoint}/v1/api/workspaces/create"
workspace_name = f"apiinferenceexampleworkspace"
data = {
"workspace_name": workspace_name
}
response = requests.post(apiRequest, json=data, headers=headers, verify=True).json()
display(response)
# Stored for future examples
workspaceId = response['workspace_id']
{'workspace_id': 163}
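Each JSON request in this tutorial repeats the same pattern: fetch a fresh auth header, then set Content-Type on it. A minimal sketch of that pattern as a hypothetical helper, which returns a copy instead of mutating the dict it is given:

```python
def mlops_headers(auth_header: dict, content_type: str = "application/json") -> dict:
    """Return a new header dict with Content-Type set, leaving auth_header untouched."""
    headers = dict(auth_header)  # shallow copy so the caller's dict is not mutated
    headers["Content-Type"] = content_type
    return headers


# Usage, assuming `wl`, `requests`, and `workspace_name` from the cells above:
# headers = mlops_headers(wl.auth.auth_header())
# resp = requests.post(f"{wl.api_endpoint}/v1/api/workspaces/create",
#                      json={"workspace_name": workspace_name}, headers=headers)
# resp.raise_for_status()  # surface HTTP errors before calling resp.json()
```

Passing the content type as a parameter lets the same helper cover the pandas-records and Apache Arrow requests later on.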
Upload Model
The model is uploaded using the /v1/api/models/upload_and_convert command. This uploads an ML model to a Wallaroo workspace via POST with Content-Type: multipart/form-data and takes the following parameters:
- Parameters
  - name - (REQUIRED string): Name of the model.
  - visibility - (OPTIONAL string): The visibility of the model as either public or private.
  - workspace_id - (REQUIRED int): The numerical id of the workspace to upload the model to. Stored earlier as workspaceId.
Directly afterward, the /v1/api/models/get_by_id and /v1/api/models/list_versions commands are used to retrieve the model details needed in later steps.
Reference: Wallaroo MLOps API Essentials Guide: Model Management: Upload Model to Workspace
import onnx
model = onnx.load("./ccfraud.onnx")
output =[node.name for node in model.graph.output]
input_all = [node.name for node in model.graph.input]
input_initializer = [node.name for node in model.graph.initializer]
net_feed_input = list(set(input_all) - set(input_initializer))
print('Inputs: ', net_feed_input)
print('Outputs: ', output)
Inputs: ['dense_input']
Outputs: ['dense_1']
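The cell above keeps only graph inputs that are not initializers, because an ONNX graph may list weight initializers among its inputs. The set difference it uses does not preserve order, which can matter for models with several inputs. A minimal order-preserving sketch over plain name lists; the function name is hypothetical:

```python
from typing import List


def feed_inputs(graph_inputs: List[str], initializers: List[str]) -> List[str]:
    """Return graph input names that are not initializers, in graph order."""
    initializer_names = set(initializers)  # set for O(1) membership checks
    return [name for name in graph_inputs if name not in initializer_names]


# With the onnx model loaded above:
# feed_inputs([n.name for n in model.graph.input],
#             [n.name for n in model.graph.initializer])
```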
## upload model
# Retrieve the token
headers = wl.auth.auth_header()
apiRequest = f"{wl.api_endpoint}/v1/api/models/upload_and_convert"
framework='onnx'
model_name = f"ccfraud"
data = {
"name": model_name,
"visibility": "public",
"workspace_id": workspaceId,
"conversion": {
"framework": framework,
"python_version": "3.8",
"requirements": [],
"tensor_fields": ["tensor"]
}
}
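# open the model file in a context manager so the handle is closed after the upload
with open('./ccfraud.onnx', 'rb') as model_file:
    files = {
        "metadata": (None, json.dumps(data), "application/json"),
        'file': (model_name, model_file, "application/octet-stream")
    }
    response = requests.post(apiRequest, files=files, headers=headers).json()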
display(response)
modelId=response['insert_models']['returning'][0]['models'][0]['id']
{'insert_models': {'returning': [{'models': [{'id': 275}]}]}}
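The upload request is multipart/form-data with two parts: a JSON metadata part and the model file. As a sketch, that structure can be produced by a hypothetical helper that takes the model bytes, so no file handle is left open; the metadata fields mirror the upload cell above, including its conversion block. The headers for this request should not set Content-Type: application/json, since requests generates the multipart Content-Type (with its boundary) itself.

```python
import json


def build_upload_files(model_name, workspace_id, model_bytes,
                       framework="onnx", visibility="public"):
    """Build the `files` mapping passed to requests.post for upload_and_convert."""
    metadata = {
        "name": model_name,
        "visibility": visibility,
        "workspace_id": workspace_id,
        "conversion": {
            "framework": framework,
            "python_version": "3.8",
            "requirements": [],
            "tensor_fields": ["tensor"],
        },
    }
    return {
        # (filename, content, content type); a None filename omits it from the part
        "metadata": (None, json.dumps(metadata), "application/json"),
        "file": (model_name, model_bytes, "application/octet-stream"),
    }


# Usage, assuming `wl`, `requests`, and `workspaceId` from the cells above:
# with open("./ccfraud.onnx", "rb") as f:
#     files = build_upload_files("ccfraud", workspaceId, f.read())
# requests.post(f"{wl.api_endpoint}/v1/api/models/upload_and_convert",
#               files=files, headers=wl.auth.auth_header())
```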
# Get the model details
# Retrieve the token
headers = wl.auth.auth_header()
# set Content-Type type
headers['Content-Type']='application/json'
apiRequest = f"{wl.api_endpoint}/v1/api/models/get_by_id"
data = {
"id": modelId
}
response = requests.post(apiRequest, json=data, headers=headers, verify=True).json()
display(response)
{'id': 275,
'owner_id': '""',
'workspace_id': 163,
'name': 'ccfraud',
'updated_at': '2024-07-22T22:11:06.958587+00:00',
'created_at': '2024-07-22T22:11:06.958587+00:00',
'model_config': {'id': 323,
'runtime': 'onnx',
'tensor_fields': None,
'filter_threshold': None}}
# List the model versions
# Retrieve the token
headers = wl.auth.auth_header()
# set Content-Type type
headers['Content-Type']='application/json'
apiRequest = f"{wl.api_endpoint}/v1/api/models/list_versions"
data = {
"model_id": model_name,
"models_pk_id" : modelId
}
response = requests.post(apiRequest, json=data, headers=headers, verify=True).json()
display(response)
[{'sha': 'bc85ce596945f876256f41515c7501c399fd97ebcb9ab3dd41bf03f8937b4507',
'models_pk_id': 273,
'model_version': 'f210d271-f966-4dda-9ba6-26212c605bba',
'owner_id': 'ff775520-72b5-4f8f-a755-f3cd28b8462f',
'model_id': 'ccfraud',
'id': 273,
'file_name': 'ccfraud.onnx',
'image_path': None,
'status': 'ready'},
{'sha': 'bc85ce596945f876256f41515c7501c399fd97ebcb9ab3dd41bf03f8937b4507',
'models_pk_id': 273,
'model_version': '5ff6480c-00b1-4a47-abef-990848bab985',
'owner_id': 'ff775520-72b5-4f8f-a755-f3cd28b8462f',
'model_id': 'ccfraud',
'id': 274,
'file_name': 'ccfraud.onnx',
'image_path': None,
'status': 'ready'},
{'sha': 'bc85ce596945f876256f41515c7501c399fd97ebcb9ab3dd41bf03f8937b4507',
'models_pk_id': 275,
'model_version': 'c542fcf2-1878-461a-b346-44a2480d9eda',
'owner_id': 'ff775520-72b5-4f8f-a755-f3cd28b8462f',
'model_id': 'ccfraud',
'id': 275,
'file_name': 'ccfraud',
'image_path': None,
'status': 'pending_load_container'}]
model_version_id = response[0]['id']
model_version = response[0]['model_version']
display(model_version)
model_sha = response[0]['sha']
display(model_sha)
'f210d271-f966-4dda-9ba6-26212c605bba'
'bc85ce596945f876256f41515c7501c399fd97ebcb9ab3dd41bf03f8937b4507'
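The cell above takes response[0], which in the sample output is the earliest ccfraud version (id 273, status ready) rather than the version just uploaded (id 275, status pending_load_container). If a specific version should be pinned, a sketch like the following selects it by its id and checks its status. The helper is hypothetical, and which version to deploy remains the reader's choice.

```python
def find_model_version(versions, version_id, required_status="ready"):
    """Return the list_versions entry whose 'id' equals version_id.

    Raises LookupError if nothing matches, and ValueError if the match is not
    in required_status (pass required_status=None to skip the check).
    """
    for entry in versions:
        if entry["id"] == version_id:
            if required_status is not None and entry["status"] != required_status:
                raise ValueError(f"version {version_id} has status {entry['status']!r}")
            return entry
    raise LookupError(f"no model version with id {version_id}")


# Usage with the list_versions response above:
# entry = find_model_version(response, 273)
# model_version, model_sha = entry["model_version"], entry["sha"]
```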
For our ONNX model, we will insert a model configuration that allows inference requests to supply the model input through the field tensor.
# Insert the model configuration
# Retrieve the token
headers = wl.auth.auth_header()
# set Content-Type type
headers['Content-Type']='application/json'
apiRequest = f"{wl.api_endpoint}/v1/api/models/insert_model_config"
data = {
"model_version_id": model_version_id,
"tensor_fields": [
"tensor"
]
}
response = requests.post(apiRequest, json=data, headers=headers, verify=True).json()
display(response)
{'model_config': {'id': 324,
'model_version_id': 273,
'runtime': 'onnx',
'filter_threshold': None,
'tensor_fields': ['tensor'],
'input_schema': None,
'output_schema': None,
'batch_config': None,
'sidekick_uri': None}}
Create Pipeline
Create Pipeline in a Workspace with the /v1/api/pipelines/create command. This creates a new pipeline in the specified workspace.
- Parameters
  - pipeline_id - (REQUIRED string): Name of the new pipeline.
  - workspace_id - (REQUIRED int): Numerical id of the workspace for the new pipeline. Stored earlier as workspaceId.
  - definition - (REQUIRED string): Pipeline definitions, can be {} for none.
For our example, we are setting the pipeline steps through the definition field. This directs inference requests to the model before output.
Reference: Wallaroo MLOps API Essentials Guide: Pipeline Management: Create Pipeline in a Workspace
# Create pipeline
# Retrieve the token
headers = wl.auth.auth_header()
# set Content-Type type
headers['Content-Type']='application/json'
apiRequest = f"{wl.api_endpoint}/v1/api/pipelines/create"
pipeline_name=f"apiinferenceexamplepipeline"
data = {
"pipeline_id": pipeline_name,
"workspace_id": workspaceId,
"definition": {'steps': [{'ModelInference': {'models': [{'name': f'{model_name}', 'version': model_version, 'sha': model_sha}]}}]}
}
response = requests.post(apiRequest, json=data, headers=headers, verify=True).json()
pipeline_id = response['pipeline_pk_id']
pipeline_variant_id=response['pipeline_variant_pk_id']
pipeline_variant_version=response['pipeline_variant_version']
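The definition field above nests a single ModelInference step that names the model by name, version, and SHA. A sketch of that structure as a hypothetical helper; it only reproduces the single-model shape used in this tutorial and makes no claim about other step types:

```python
def single_model_definition(model_name, model_version, model_sha):
    """Pipeline definition with one ModelInference step, as sent to /v1/api/pipelines/create."""
    return {
        "steps": [
            {
                "ModelInference": {
                    "models": [
                        {"name": model_name, "version": model_version, "sha": model_sha}
                    ]
                }
            }
        ]
    }


# Usage with the variables captured above:
# data = {"pipeline_id": pipeline_name, "workspace_id": workspaceId,
#         "definition": single_model_definition(model_name, model_version, model_sha)}
```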
Deploy Pipeline
With the pipeline created and the model uploaded into the workspace, the pipeline can be deployed. This will allocate resources from the Kubernetes cluster hosting the Wallaroo instance and prepare the pipeline to process inference requests.
Pipelines are deployed through the MLOps API command /v1/api/pipelines/deploy
which takes the following parameters:
- Parameters
  - deploy_id (REQUIRED string): The name for the pipeline deployment.
  - engine_config (OPTIONAL string): Additional configuration options for the pipeline.
  - pipeline_version_pk_id (REQUIRED int): Pipeline version id. Captured earlier as pipeline_variant_id.
  - model_configs (OPTIONAL Array int): Ids of model configs to apply.
  - model_ids (OPTIONAL Array int): Ids of models to apply to the pipeline. If passed in, model_configs will be created automatically.
  - models (OPTIONAL Array models): If the model ids are not available as a pipeline step, the models’ data can be passed through this parameter. The options below are only required if models are provided as a parameter.
    - name (REQUIRED string): Name of the uploaded model that is in the same workspace as the pipeline. Captured earlier as the model_name variable.
    - version (REQUIRED string): Version of the model to use.
    - sha (REQUIRED string): SHA value of the model.
  - pipeline_id (REQUIRED int): Numerical value of the pipeline to deploy.
- Returns
  - id (int): The deployment id.
Reference: Wallaroo MLOps API Essentials Guide: Pipeline Management: Deploy a Pipeline
# Deploy Pipeline
# Retrieve the token
headers = wl.auth.auth_header()
# set Content-Type type
headers['Content-Type']='application/json'
apiRequest = f"{wl.api_endpoint}/v1/api/pipelines/deploy"
exampleModelDeployId=pipeline_name
data = {
"deploy_id": exampleModelDeployId,
"pipeline_version_pk_id": pipeline_variant_id,
"model_ids": [
modelId
],
"pipeline_id": pipeline_id
}
response = requests.post(apiRequest, json=data, headers=headers, verify=True).json()
display(response)
exampleModelDeploymentId=response['id']
# wait 45 seconds for the pipeline to complete deployment
import time
time.sleep(45)
{'id': 98}
Get Deployment Status
This returns the deployment status. We are waiting until the deployment has the status “Running.”
- Parameters
  - name - (REQUIRED string): The deployment in the format {deployment_name}-{deployment_id}.
Example: The status of the deployed pipeline is displayed.
# Retrieve the token
headers = wl.auth.auth_header()
# set Content-Type type
headers['Content-Type']='application/json'
# Get model pipeline deployment
api_request = f"{wl.api_endpoint}/v1/api/status/get_deployment"
data = {
"name": f"{pipeline_name}-{exampleModelDeploymentId}"
}
response = requests.post(api_request, json=data, headers=headers, verify=True).json()
response
{'status': 'Running',
'details': [],
'engines': [{'ip': '10.4.3.7',
'name': 'engine-84c8497b5f-8mz5t',
'status': 'Running',
'reason': None,
'details': [],
'pipeline_statuses': {'pipelines': [{'id': 'apiinferenceexamplepipeline',
'status': 'Running',
'version': '5393ebaf-84db-47ed-bea0-9e9b65050c9e'}]},
'model_statuses': {'models': [{'name': 'ccfraud',
'sha': 'bc85ce596945f876256f41515c7501c399fd97ebcb9ab3dd41bf03f8937b4507',
'status': 'Running',
'version': 'f210d271-f966-4dda-9ba6-26212c605bba'}]}}],
'engine_lbs': [{'ip': '10.4.3.6',
'name': 'engine-lb-75cf576f7f-npmdh',
'status': 'Running',
'reason': None,
'details': []}],
'sidekicks': []}
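The deploy cell waits a fixed 45 seconds, which may be too short or longer than necessary. A sketch that instead polls a status callable until it returns Running or a timeout elapses. The helper is hypothetical; in this tutorial the callable would POST to /v1/api/status/get_deployment as above and return response['status']. The sleep and clock functions are injectable so the loop can be exercised without real waiting.

```python
import time


def wait_for_status(get_status, target="Running", timeout=300.0, interval=5.0,
                    sleep=time.sleep, clock=time.monotonic):
    """Call get_status() until it returns target; raise TimeoutError after timeout seconds."""
    deadline = clock() + timeout
    while True:
        status = get_status()
        if status == target:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"last status {status!r} after {timeout}s")
        sleep(interval)


# Usage, assuming `wl`, `requests`, and the deployment variables from above:
# def deployment_status():
#     body = {"name": f"{pipeline_name}-{exampleModelDeploymentId}"}
#     return requests.post(f"{wl.api_endpoint}/v1/api/status/get_deployment",
#                          json=body, headers=wl.auth.auth_header()).json()["status"]
# wait_for_status(deployment_status)
```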
Get External Inference URL
The API command /admin/get_pipeline_external_url
retrieves the external inference URL for a specific pipeline in a workspace.
- Parameters
- workspace_id (REQUIRED integer): The workspace integer id.
- pipeline_name (REQUIRED string): The name of the pipeline.
In this example, the workspace id stored earlier as workspaceId and the pipeline name stored in pipeline_name are used for the /admin/get_pipeline_external_url request.
The External Inference URL is stored in the variable deployurl for the next step.
Reference: Wallaroo MLOps API Essentials Guide: Pipeline Management: Get External Inference URL
# Retrieve the token
headers = wl.auth.auth_header()
# set Content-Type type
headers['Content-Type']='application/json'
## Retrieve the pipeline's External Inference URL
apiRequest = f"{wl.api_endpoint}/v1/api/admin/get_pipeline_external_url"
data = {
"workspace_id": workspaceId,
"pipeline_name": pipeline_name
}
response = requests.post(apiRequest, json=data, headers=headers, verify=True).json()
deployurl = response['url']
deployurl
'https://autoscale-uat-gcp.wallaroo.dev/v1/api/pipelines/infer/apiinferenceexamplepipeline-98/apiinferenceexamplepipeline'
Perform Inference Through External URL
The inference can now be performed through the External Inference URL. This URL accepts the same inference data that is used with the Wallaroo SDK, or with an Internal Inference URL as used in the Internal Pipeline Inference URL Tutorial.
For this example, the deployurl retrieved through Get External Inference URL is used to submit a single inference request as a pandas-records JSON payload defined inline in the code below.
Reference: Wallaroo MLOps API Essentials Guide: Pipeline Management: Perform Inference Through External URL
# Retrieve the token
headers = wl.auth.auth_header()
# set Content-Type type
headers['Content-Type']='application/json; format=pandas-records'
## Inference through external URL using dataframe
# retrieve the json data to submit
data = [
{
"tensor":[
1.0678324729,
0.2177810266,
-1.7115145262,
0.682285721,
1.0138553067,
-0.4335000013,
0.7395859437,
-0.2882839595,
-0.447262688,
0.5146124988,
0.3791316964,
0.5190619748,
-0.4904593222,
1.1656456469,
-0.9776307444,
-0.6322198963,
-0.6891477694,
0.1783317857,
0.1397992467,
-0.3554220649,
0.4394217877,
1.4588397512,
-0.3886829615,
0.4353492889,
1.7420053483,
-0.4434654615,
-0.1515747891,
-0.2668451725,
-1.4549617756
]
}
]
# submit the request via POST, import as pandas DataFrame
response = pd.DataFrame.from_records(
requests.post(
deployurl,
json=data,
headers=headers)
.json()
)
display(response.loc[:,["time", "out"]])
 | time | out
---|---|---
0 | 1721686314102 | {'dense_1': [0.0014974177]}
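The pandas-records body above is a JSON list with one object per row, each holding the feature values under the tensor field configured earlier through insert_model_config. A sketch that builds that body from plain feature rows and checks their width; the function name is hypothetical, and the expected width of 29 is an assumption counted from the sample record.

```python
def pandas_records_payload(rows, field="tensor", width=29):
    """Wrap each feature row as {field: [...]} for 'application/json; format=pandas-records'."""
    payload = []
    for i, row in enumerate(rows):
        values = [float(v) for v in row]  # JSON numbers, coerced to float
        if width is not None and len(values) != width:
            raise ValueError(f"row {i} has {len(values)} values, expected {width}")
        payload.append({field: values})
    return payload


# Usage, assuming `deployurl` and the pandas-records headers from the cell above:
# requests.post(deployurl, json=pandas_records_payload(rows), headers=headers)
```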
# Retrieve the token
headers = wl.auth.auth_header()
# set Content-Type type
headers['Content-Type']='application/vnd.apache.arrow.file'
# set accept as apache arrow table
headers['Accept']="application/vnd.apache.arrow.file"
# Submit arrow file
dataFile="./data/cc_data_10k.arrow"
# read the Arrow file in a context manager so the handle is closed
with open(dataFile, 'rb') as arrow_file:
    data = arrow_file.read()
response = requests.post(
deployurl,
headers=headers,
data=data,
verify=True
)
# Arrow table is retrieved
with pa.ipc.open_file(response.content) as reader:
arrow_table = reader.read_all()
# convert to a pandas DataFrame and display the time and out columns of the first 5 rows
display(arrow_table.to_pandas().head(5).loc[:,["time", "out"]])
 | time | out
---|---|---
0 | 1721686314158 | {'dense_1': [0.99300325]}
1 | 1721686314158 | {'dense_1': [0.99300325]}
2 | 1721686314158 | {'dense_1': [0.99300325]}
3 | 1721686314158 | {'dense_1': [0.99300325]}
4 | 1721686314158 | {'dense_1': [0.0010916889]}
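Each row's out column holds the model output as a mapping from the output name dense_1 (the ONNX output listed earlier) to a one-element list. A sketch for turning those values into fraud flags; the helper is hypothetical, the 0.5 cut-off is an illustrative assumption rather than a threshold from this tutorial, and the function accepts any iterable of mappings, such as a pandas Series.

```python
def fraud_flags(outputs, output_name="dense_1", threshold=0.5):
    """Map each {'dense_1': [score]} output to (score, score >= threshold)."""
    flags = []
    for out in outputs:
        score = float(out[output_name][0])  # one score per row
        flags.append((score, score >= threshold))
    return flags


# Usage with the Arrow result above:
# fraud_flags(arrow_table.to_pandas()["out"])
```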
Undeploy the Pipeline
With the tutorial complete, we’ll undeploy the pipeline with /v1/api/pipelines/undeploy
and return the resources back to the Wallaroo instance.
Reference: Wallaroo MLOps API Essentials Guide: Pipeline Management: Undeploy a Pipeline
# Retrieve the token
headers = wl.auth.auth_header()
# set Content-Type type
headers['Content-Type']='application/json'
apiRequest = f"{wl.api_endpoint}/v1/api/pipelines/undeploy"
data = {
"pipeline_id": pipeline_id,
"deployment_id":exampleModelDeploymentId
}
response = requests.post(apiRequest, json=data, headers=headers, verify=True).json()
display(response)
{}
Wallaroo supports performing inferences through the SDK and through the API for each deployed pipeline. For more information on how to use Wallaroo, see the Wallaroo Documentation Site.