This tutorial and the assets can be downloaded as part of the Wallaroo Tutorials repository.
The following tutorial demonstrates using the Wallaroo MLOps API to retrieve Wallaroo metrics data for a Hugging Face summarization model. These requests are compliant with Prometheus API endpoints.
This tutorial is split into two sections:
This tutorial assumes the following:
This part of the tutorial generates the inference results used for the rest of the tutorial.
The first step is to import the libraries required.
import json
import numpy as np
import pandas as pd
import pytz
import datetime
import requests
from requests.auth import HTTPBasicAuth
import wallaroo
import pyarrow as pa
import time
A connection to Wallaroo is established via the Wallaroo client. The Python library is included in the Wallaroo install and available through the Jupyter Hub interface provided with your Wallaroo environment.
This is accomplished using the wallaroo.Client() command, which provides a URL to grant the SDK permission to your specific Wallaroo environment. When displayed, enter the URL into a browser and confirm permissions. Store the connection in a variable that can be referenced later.
If logging into the Wallaroo instance through the internal JupyterHub service, use wl = wallaroo.Client(). For more information on Wallaroo Client settings, see the Client Connection guide.
wl = wallaroo.Client()
Next create the Wallaroo workspace and set it as the default workspace for this session - from this point on, model uploads and other commands will default to this workspace.
The workspace id is stored for further use.
workspace = wl.get_workspace(name="metric-retrieval-summarization-tutorial", create_if_not_exist=True)
wl.set_current_workspace(workspace)
{'name': 'metric-retrieval-summarization-tutorial', 'id': 1716, 'archived': False, 'created_by': '7d603858-88e0-472e-8f71-e41094afd7ec', 'created_at': '2025-08-11T16:56:50.306331+00:00', 'models': [], 'pipelines': []}
This monitoring pipeline consists of a single Hugging Face text classification step that scores text for toxicity.
The following model is used:
toxic_bert: A Hugging Face text classification model that evaluates LLM outputs and returns an array of scores including:
identity_hate
insult
obscene
severe_toxic
threat
toxic
We upload the model via the wallaroo.client.Client.upload_model method.
# upload the toxic-bert text classification model
input_schema = pa.schema([
    pa.field('inputs', pa.string()), # required
    pa.field('top_k', pa.int64()),
])
output_schema = pa.schema([
    pa.field('label', pa.list_(pa.string(), list_size=6)), # list with the same number of items as top_k; list_size can be omitted but may lead to worse performance
    pa.field('score', pa.list_(pa.float64(), list_size=6)), # list with the same number of items as top_k; list_size can be omitted but may lead to worse performance
])
framework = wallaroo.framework.Framework.HUGGING_FACE_TEXT_CLASSIFICATION
model_name = "toxic-bert-analysis"
model_file_name = './models/unitary-toxic-bert.zip'
bert_model = wl.upload_model(model_name,
                             model_file_name,
                             framework=framework,
                             input_schema=input_schema,
                             output_schema=output_schema,
                             convert_wait=False)
while bert_model.status() != "ready" and bert_model.status() != "error":
    print(bert_model.status())
    time.sleep(10)

print(bert_model.status())
pending_load_container
pending_load_container
pending_load_container
pending_load_container
attempting_load_container
attempting_load_container
attempting_load_container
ready
Models are deployed through the following process:
Create the pipeline.
Add the model as a pipeline step.
Set the deployment configuration.
Deploy the pipeline.
For more details of this process, see ML Operations: Inference
# create the pipeline
pipeline_name = "metrics-retrieval-tutorial-summarization-pipeline"
pipeline = wl.build_pipeline(pipeline_name)
# add the model as a pipeline step
pipeline.add_model_step(bert_model)
# set the deployment configuration: 1 replica with 0.25 cpus and 1 Gi RAM for the engine,
# 4 cpus and 8 Gi RAM for the model sidekick
deployment_config = wallaroo.DeploymentConfigBuilder() \
    .cpus(0.25).memory('1Gi') \
    .sidekick_cpus(bert_model, 4) \
    .sidekick_memory(bert_model, "8Gi") \
    .build()
# deploy the pipeline
pipeline.deploy(deployment_config=deployment_config, wait_for_status=False)
# saved for later steps
deploy = pipeline._deployment
Deployment initiated for metrics-retrieval-tutorial-summarization-pipeline. Please check pipeline status.
# wait until deployment is complete before continuing
import time
time.sleep(15)
while pipeline.status()['status'] != 'Running':
    time.sleep(15)
    print("Waiting for deployment.")
    pipeline.status()['status']

pipeline.status()['status']
Waiting for deployment.
Waiting for deployment.
'Running'
The following sample inferences are used to generate inference logs records. Metric retrieval works best with a longer history of inference results; feel free to rerun this section as needed to create additional records for further testing.
The following will run for one minute.
import time
timeout = time.time() + 60 # 1 minute from now
while True:
    if time.time() > timeout:
        break
    pipeline.infer_from_file("./data/sample_input.json")
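pipeline.infer_from_file submits the saved JSON input. An equivalent single inference can also be submitted as a pandas DataFrame built to the input schema defined earlier. The following is a minimal sketch; the input text and the variable names single_input and result are hypothetical, since the contents of ./data/sample_input.json are not shown here.
# a minimal sketch: a single inference submitted as a pandas DataFrame
# matching the input schema (inputs: string, top_k: int64).
# the input text is a hypothetical stand-in for ./data/sample_input.json
single_input = pd.DataFrame({
    "inputs": ["Wallaroo.AI is an AI platform that enables developers to deploy and observe models."],
    "top_k": [6],
})
result = pipeline.infer(single_input)
display(result[["out.label", "out.score"]])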
The following retrieves the inference log results for the pipeline.
pipeline.logs()
Warning: There are more logs available. Please set a larger limit or request a file using export_logs.
 | time | in.inputs | in.top_k | out.label | out.score | anomaly.count |
---|---|---|---|---|---|---|
0 | 2025-08-11 18:11:26.873 | Wallaroo.AI is an AI platform that enables dev... | 6 | [toxic, obscene, insult, identity_hate, threat... | [0.0006922021857462823, 0.00018145183275919408... | 0 |
1 | 2025-08-11 18:11:26.350 | Wallaroo.AI is an AI platform that enables dev... | 6 | [toxic, obscene, insult, identity_hate, threat... | [0.0006922021857462823, 0.00018145183275919408... | 0 |
2 | 2025-08-11 18:11:25.860 | Wallaroo.AI is an AI platform that enables dev... | 6 | [toxic, obscene, insult, identity_hate, threat... | [0.0006922021857462823, 0.00018145183275919408... | 0 |
3 | 2025-08-11 18:11:25.350 | Wallaroo.AI is an AI platform that enables dev... | 6 | [toxic, obscene, insult, identity_hate, threat... | [0.0006922021857462823, 0.00018145183275919408... | 0 |
4 | 2025-08-11 18:11:24.845 | Wallaroo.AI is an AI platform that enables dev... | 6 | [toxic, obscene, insult, identity_hate, threat... | [0.0006922021857462823, 0.00018145183275919408... | 0 |
... | ... | ... | ... | ... | ... | ... |
95 | 2025-08-11 18:10:35.855 | Wallaroo.AI is an AI platform that enables dev... | 6 | [toxic, obscene, insult, identity_hate, threat... | [0.0006922021857462823, 0.00018145183275919408... | 0 |
96 | 2025-08-11 18:10:35.372 | Wallaroo.AI is an AI platform that enables dev... | 6 | [toxic, obscene, insult, identity_hate, threat... | [0.0006922021857462823, 0.00018145183275919408... | 0 |
97 | 2025-08-11 18:10:34.862 | Wallaroo.AI is an AI platform that enables dev... | 6 | [toxic, obscene, insult, identity_hate, threat... | [0.0006922021857462823, 0.00018145183275919408... | 0 |
98 | 2025-08-11 18:10:34.370 | Wallaroo.AI is an AI platform that enables dev... | 6 | [toxic, obscene, insult, identity_hate, threat... | [0.0006922021857462823, 0.00018145183275919408... | 0 |
99 | 2025-08-11 18:10:33.828 | Wallaroo.AI is an AI platform that enables dev... | 6 | [toxic, obscene, insult, identity_hate, threat... | [0.0006922021857462823, 0.00018145183275919408... | 0 |
100 rows × 6 columns
With this part complete, we undeploy the pipeline.
pipeline.undeploy()
name | metrics-retrieval-tutorial-summarization-pipeline |
---|---|
created | 2025-08-11 17:03:20.112487+00:00 |
last_updated | 2025-08-11 17:03:20.687997+00:00 |
deployed | False |
workspace_id | 1716 |
workspace_name | metric-retrieval-summarization-tutorial |
arch | x86 |
accel | none |
tags | |
versions | bce33e24-033e-44d8-ae84-2f563e503c61, e76a23c3-9174-4c89-a613-e41b05bd462a |
steps | toxic-bert-analysis |
published | False |
The Wallaroo MLOps API allows for metrics retrieval. These are used to track metrics such as requests per second, inference rates, inference latency, engine and sidekick replica counts, and LLM token throughput, as listed in the table below.
These metrics endpoints are compliant with the Prometheus API.
pipelineID: The pipeline's name, retrieved from the Wallaroo SDK with wallaroo.pipeline.Pipeline.name(). For example:
pipeline.name()
sample-pipeline-name
deployment_id: The Kubernetes namespace for the deployment.
English Name | Parameterized Query | Example Query | Description |
---|---|---|---|
Requests per second | sum by (pipeline_name) (rate(latency_histogram_ns_count{pipeline_name="{pipelineID}"}[{step}s])) | sum by (deploy_id) (rate(latency_histogram_ns_count{deploy_id="deployment_id"}[10s])) | Number of processed requests per second to a pipeline. |
Cluster inference rate | sum by (pipeline_name) (rate(tensor_throughput_batch_count{pipeline_name="{pipelineID}"}[{step}s])) | sum by (deploy_id) (rate(tensor_throughput_batch_count{deploy_id="deployment_id"}[10s])) | Number of inferences processed per second. This notably differs from requests per second when batch inference requests are made. |
P50 inference latency | histogram_quantile(0.50, sum(rate(latency_histogram_ns_bucket{deploy_id="{deploy_id}"}[{step_interval}])) by (le)) / 1e6 | histogram_quantile(0.50, sum(rate(latency_histogram_ns_bucket{deploy_id="deployment_id"}[10s])) by (le)) / 1e6 | Histogram for P50 total inference time spent per message in an engine, includes transport to and from the sidekick in the case there is one. |
P95 inference latency | histogram_quantile(0.95, sum(rate(latency_histogram_ns_bucket{deploy_id="{deploy_id}"}[{step_interval}])) by (le)) / 1e6 | histogram_quantile(0.95, sum(rate(latency_histogram_ns_bucket{deploy_id="deployment_id"}[10s])) by (le)) / 1e6 | Histogram for P95 total inference time spent per message in an engine, includes transport to and from the sidekick in the case there is one. |
P99 inference latency | histogram_quantile(0.99, sum(rate(latency_histogram_ns_bucket{deploy_id="{deploy_id}"}[{step_interval}])) by (le)) / 1e6 | histogram_quantile(0.99, sum(rate(latency_histogram_ns_bucket{deploy_id="deployment_id"}[10s])) by (le)) / 1e6 | Histogram for P99 total inference time spent per message in an engine, includes transport to and from the sidekick in the case there is one. |
Engine replica count | count(container_memory_usage_bytes{namespace="{pipeline_namespace}", container="engine"}) or vector(0) | count(container_memory_usage_bytes{namespace="deployment_id", container="engine"}) or vector(0) | Number of engine replicas currently running in a pipeline |
Sidekick replica count | count(container_memory_usage_bytes{namespace="{pipeline_namespace}", container=~"engine-sidekick-.*"}) or vector(0) | count(container_memory_usage_bytes{namespace="deployment_id", container=~"engine-sidekick-.*"}) or vector(0) | Number of sidekick replicas currently running in a pipeline |
Output tokens per second (TPS) | sum by (kubernetes_namespace) (rate(vllm:generation_tokens_total{kubernetes_namespace="{pipeline_namespace}"}[{step_interval}])) | sum by (kubernetes_namespace) (rate(vllm:generation_tokens_total{kubernetes_namespace="deployment_id"}[10s])) | LLM output tokens per second: this is the number of tokens generated per second for a LLM deployed in Wallaroo with vLLM |
P99 Time to first token (TTFT) | histogram_quantile(0.99, sum(rate(vllm:time_to_first_token_seconds_bucket{kubernetes_namespace="{pipeline_namespace}"}[{step_interval}])) by (le)) * 1000 | histogram_quantile(0.99, sum(rate(vllm:time_to_first_token_seconds_bucket{kubernetes_namespace="deployment_id"}[10s])) by (le)) * 1000 | P99 time to first token: P99 for time to generate the first token for LLMs deployed in Wallaroo with vLLM |
P95 Time to first token (TTFT) | histogram_quantile(0.95, sum(rate(vllm:time_to_first_token_seconds_bucket{kubernetes_namespace="{pipeline_namespace}"}[{step_interval}])) by (le)) * 1000 | histogram_quantile(0.95, sum(rate(vllm:time_to_first_token_seconds_bucket{kubernetes_namespace="deployment_id"}[10s])) by (le)) * 1000 | P95 time to first token: P95 for time to generate the first token for LLMs deployed in Wallaroo with vLLM |
P50 Time to first token (TTFT) | histogram_quantile(0.50, sum(rate(vllm:time_to_first_token_seconds_bucket{kubernetes_namespace="{pipeline_namespace}"}[{step_interval}])) by (le)) * 1000 | histogram_quantile(0.50, sum(rate(vllm:time_to_first_token_seconds_bucket{kubernetes_namespace="deployment_id"}[10s])) by (le)) * 1000 | P50 time to first token: P50 for time to generate the first token for LLMs deployed in Wallaroo with vLLM |
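Each parameterized query becomes a concrete PromQL expression by substituting the pipeline identifier (or deployment namespace) and the step interval. The following is a minimal sketch using a Python f-string and the pipeline_name variable defined earlier in this tutorial; the variable name query_rps_example is hypothetical.
# a minimal sketch: substitute the pipeline name and a 10 second step
# into the parameterized requests-per-second query from the table above
query_rps_example = f'sum by (pipeline_name) (rate(latency_histogram_ns_count{{pipeline_name="{pipeline_name}"}}[10s]))'
print(query_rps_example)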
/v1/api/metrics/query (GET)
/v1/api/metrics/query (POST)
For full details, see the Wallaroo MLOps API Reference Guide.
Parameter | Type | Description |
---|---|---|
query | String | The Prometheus expression query string. |
time | String | The evaluation timestamp in either RFC3339 format or Unix timestamp. |
timeout | String | The evaluation timeout in duration format (5m for 5 minutes, etc). |
Field | Type | Description | |
---|---|---|---|
status | String | The status of the request: either success or error. | 
data | Dict | The response data. | 
data.resultType | String | The type of query result. | 
data.result | List | The query result: a list of metrics with their labels and values. | 
errorType | String | The error type if status is error. | 
error | String | The error message if status is error. | 
warnings | Array[String] | An array of warning messages. | 
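The sketch below shows how an instant query could be issued with the same authentication pattern used in the Query Range example later in this tutorial. The URL path here is an assumption that mirrors the query_range URL used in the working example below, and the variable names instant_query_url, params, and response are hypothetical; the requests-per-second expression is built inline from the pipeline_name defined earlier.
# a minimal sketch of an instant query; the path below is an assumption
# that mirrors the query_range URL used later in this tutorial
instant_query_url = f"{wl.api_endpoint}/v1/metrics/api/v1/query"
headers = wl.auth.auth_header()
params = {
    'query': f'sum by (pipeline_name) (rate(latency_histogram_ns_count{{pipeline_name="{pipeline_name}"}}[10s]))',
    'time': int(time.time()),   # evaluate at the current Unix timestamp
    'timeout': '30s'            # evaluation timeout in duration format
}
response = requests.get(instant_query_url, headers=headers, params=params)
if response.status_code == 200:
    display(response.json()['data']['result'])
else:
    print("Failed to fetch instant query data:", response.status_code, response.text)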
/v1/api/metrics/query_range (GET)
/v1/api/metrics/query_range (POST)
For full details, see the Wallaroo MLOps API Reference Guide.
Parameter | Type | Description |
---|---|---|
query | String | The Prometheus expression query string. |
start | String | The starting timestamp in either RFC3339 format or Unix timestamp, inclusive. |
end | String | The ending timestamp in either RFC3339 format or Unix timestamp. |
step | String | Query resolution step width in either duration format or as a float number of seconds. |
timeout | String | The evaluation timeout in duration format (5m for 5 minutes, etc). |
Field | Type | Description | |
---|---|---|---|
status | String | The status of the request: either success or error. | 
data | Dict | The response data. | 
resultType | String | The type of query result. For query range, always matrix. | 
result | List | The query result: a list of metrics, each with values for every step in the range. | 
errorType | String | The error type if status is error. | 
error | String | The error message if status is error. | 
warnings | Array[String] | An array of warning messages. | 
The following request shows an example of a Query Range request for requests per second. For this example, the following Wallaroo SDK methods are used:
wl.api_endpoint: Retrieves the API endpoint for the Wallaroo Ops server.
wl.auth.auth_header(): Retrieves the authentication bearer tokens.
# set prometheus requirements
pipeline_id = pipeline_name # the name of the pipeline
step = "1m" # the step of the calculation
# this will also format the timezone in the parsing section
timezone = "US/Central"
selected_timezone = pytz.timezone(timezone)
# Define the start and end times
# adjust these values based on your historical data.
data_start = selected_timezone.localize(datetime.datetime(2025, 8, 10, 9, 0, 0))
data_end = selected_timezone.localize(datetime.datetime(2025, 8, 12, 9, 59, 59))
# this is the URL to get prometheus metrics
query_url = f"{wl.api_endpoint}/v1/metrics/api/v1/query_range"
# Retrieve the token
headers = wl.auth.auth_header()
# Convert to UTC and get the Unix timestamps
start_timestamp = int(data_start.astimezone(pytz.UTC).timestamp())
end_timestamp = int(data_end.astimezone(pytz.UTC).timestamp())
query_rps = f'sum by (pipeline_name) (rate(latency_histogram_ns_count{{pipeline_name="{pipeline_id}"}}[{step}]))'
# request parameters
params_rps = {
    'query': query_rps,
    'start': start_timestamp,
    'end': end_timestamp,
    'step': step
}
response_rps = requests.get(query_url, headers=headers, params=params_rps)
if response_rps.status_code == 200:
    print("Requests Per Second Data:")
    # display(response_rps.json()['data']['result'][0]['values'])
    display(list(filter(lambda x: x[1] != '0', response_rps.json()['data']['result'][0]['values'])))
else:
    print("Failed to fetch RPS data:", response_rps.status_code, response_rps.text)
Requests Per Second Data:
[[1754932200, '1.8166157575757578'],
[1754935860, '1.0545454545454545'],
[1754935920, '0.77652']]
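The values are returned as [Unix timestamp, string value] pairs. The following is a minimal sketch of converting them into a pandas DataFrame localized to the timezone selected above; the variable names rps_values and rps_df are hypothetical.
# a minimal sketch: convert the [timestamp, value] pairs into a DataFrame
# localized to the timezone selected earlier
rps_values = response_rps.json()['data']['result'][0]['values']
rps_df = pd.DataFrame(rps_values, columns=['timestamp', 'requests_per_second'])
rps_df['timestamp'] = pd.to_datetime(rps_df['timestamp'], unit='s', utc=True).dt.tz_convert(timezone)
rps_df['requests_per_second'] = rps_df['requests_per_second'].astype(float)
display(rps_df.head())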
The following queries the cluster inference rate over the same time range.
query_inference_rate = f'sum by (pipeline_name) (rate(tensor_throughput_batch_count{{pipeline_name="{pipeline_id}"}}[{step}]))'
# inference rate request parameters
params_inference_rate = {
    'query': query_inference_rate,
    'start': start_timestamp,
    'end': end_timestamp,
    'step': step
}
response_inference_rate = requests.get(query_url, headers=headers, params=params_inference_rate)
if response_inference_rate.status_code == 200:
    print("Cluster Inference Rate Data:")
    # filter out zero values before displaying
    display(list(filter(lambda x: x[1] != '0', response_inference_rate.json()['data']['result'][0]['values'])))
else:
    print("Failed to fetch Inference Rate data:", response_inference_rate.status_code, response_inference_rate.text)
Cluster Inference Rate Data:
[[1754932200, '1.8166157575757578'],
[1754935860, '1.0545454545454545'],
[1754935920, '0.77652']]
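The same request pattern extends to the other metrics in the table above. The following is a minimal sketch for P95 inference latency, reusing the start, end, and step values from the earlier cells; it assumes the latency_histogram_ns_bucket series carries the same pipeline_name label as the count series queried above, and the variable names query_p95, params_p95, and response_p95 are hypothetical.
# a minimal sketch: P95 inference latency in milliseconds, using the
# histogram_quantile query from the metrics table above
query_p95 = f'histogram_quantile(0.95, sum(rate(latency_histogram_ns_bucket{{pipeline_name="{pipeline_id}"}}[{step}])) by (le)) / 1e6'
params_p95 = {
    'query': query_p95,
    'start': start_timestamp,
    'end': end_timestamp,
    'step': step
}
response_p95 = requests.get(query_url, headers=headers, params=params_p95)
if response_p95.status_code == 200:
    print("P95 Inference Latency (ms):")
    display(response_p95.json()['data']['result'])
else:
    print("Failed to fetch P95 latency data:", response_p95.status_code, response_p95.text)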