Wallaroo MLOps API Essentials Guide: Pipeline Management
Pipeline Naming Requirements
Pipeline names map onto Kubernetes objects, and must be DNS compliant. Pipeline names must be ASCII alpha-numeric characters or dash (-) only. .
and _
are not allowed.
Pipeline Management
Pipelines are managed through the Wallaroo API or the Wallaroo SDK. Pipelines are the vehicle used for deploying, serving, and monitoring ML models. For more information, see the Wallaroo Glossary.
Create Pipeline in a Workspace
- Endpoint:
/v1/api/pipelines/create
Creates a new pipeline in the specified workspace.
Create Pipeline in a Workspace Parameters
Field | Â | Type | Description |
---|---|---|---|
pipeline_id | Â | String (Required) | Name of the new pipeline. |
workspace_id | Â | Integer (Required) | Numerical id of the workspace for the new pipeline. |
definition | Â | String (Required) | Pipeline definitions, can be {} for none. This is where the pipeline steps are set. |
 | steps | List[steps] | The pipeline steps to add to the pipeline. |
Model Inference Pipeline Step
Pipeline steps from models follow the following schema.
{
"ModelInference": {
"models": [
{
"name": "{name of model: String}",
"version": {model version: Integer},
"sha": "{model sha: String}"
}
]
}
}
Create Pipeline in a Workspace Returns
Field | Type | Description |
---|---|---|
pipeline_pk_id | Integer | The pipeline id. |
pipeline_variant_pk_id | Integer | The pipeline version id. |
pipeline_variant_version | String | The pipeline version UUID identifier. |
Create Pipeline in a Workspace Examples
Two pipelines are created in the workspace id 10
. This assumes that the workspace is created and has models uploaded to it.
One pipeline is an empty pipeline without any models.
For the other pipeline, sample models are uploaded then added pipeline.
The pipeline id, variant id, and variant version of each pipeline will be stored for later examples.
Create empty pipeline via Requests.
# Create pipeline in a workspace
# Retrieve the token
headers = wl.auth.auth_header()
endpoint = f"{wl.api_endpoint}/v1/api/pipelines/create"
example_workspace_id = 10
data = {
"pipeline_id": "api-empty-pipeline",
"workspace_id": example_workspace_id,
"definition": {}
}
response = requests.post(endpoint,
json=data,
headers=headers,
verify=True).json()
empty_pipeline_id = response['pipeline_pk_id']
empty_pipeline_variant_id=response['pipeline_variant_pk_id']
example_pipeline_variant_version=['pipeline_variant_version']
display(json.dumps(response))
'{"pipeline_pk_id": 25, "pipeline_variant_pk_id": 26, "pipeline_variant_version": "c29a277a-10b9-48f9-a738-aafb296df8c2"}'
Create empty pipeline via curl.
curl {wl.api_endpoint}/v1/api/pipelines/create \
-H "Authorization: {wl.auth.auth_header()['Authorization']}" \
-H "Content-Type: application/json" \
--data '{{"pipeline_id": "api-empty-pipeline", "workspace_id": {example_workspace_id},"definition": {{}} }}'
{"pipeline_pk_id":25,"pipeline_variant_pk_id":27,"pipeline_variant_version":"f6241f32-85a8-4ad8-9e71-da2763717811"}
Create pipeline with model steps via Requests.
# Create pipeline in a workspace with models
# First upload a model
# Retrieve the token
headers = wl.auth.auth_header()
endpoint = f"{wl.api_endpoint}/v1/api/models/upload_and_convert"
workspace_id = 10
framework='onnx'
example_model_name = f"api-sample-model"
metadata = {
"name": example_model_name,
"visibility": "public",
"workspace_id": workspace_id,
"conversion": {
"framework": framework,
"python_version": "3.8",
"requirements": []
}
}
files = {
"metadata": (None, json.dumps(metadata), "application/json"),
'file': (example_model_name, open('./models/ccfraud.onnx', 'rb'), "application/octet-stream")
}
response = requests.post(endpoint, files=files, headers=headers).json()
example_model_id = response['insert_models']['returning'][0]['models'][0]['id']
# Second, get the model version
# Retrieve the token
headers = wl.auth.auth_header()
endpoint = f"{wl.api_endpoint}/v1/api/models/list_versions"
data = {
"model_id": example_model_name,
"models_pk_id": example_model_id
}
response = requests.post(endpoint, json=data, headers=headers, verify=True).json()
example_model_sha = response[-1]['sha']
example_model_version = response[-1]['model_version']
# Now create the pipeline with the new model
# Retrieve the token
headers = wl.auth.auth_header()
endpoint = f"{wl.api_endpoint}/v1/api/pipelines/create"
data = {
"pipeline_id": "api-pipeline-with-models",
"workspace_id": example_workspace_id,
"definition": {
'steps': [
{
'ModelInference':
{
'models':
[
{
'name': example_model_name,
'version': example_model_version,
'sha': example_model_sha
}
]
}
}
]
}
}
response = requests.post(endpoint,
json=data,
headers=headers,
verify=True).json()
display(json.dumps(response))
# saved for later steps
model_pipeline_id = response['pipeline_pk_id']
model_pipeline_variant_id=response['pipeline_variant_pk_id']
model_pipeline_variant_version=['pipeline_variant_version']
'{"pipeline_pk_id": 28, "pipeline_variant_pk_id": 29, "pipeline_variant_version": "5d0326fa-6753-4252-bb56-3b2106a8c671"}'
Create pipeline with model steps via curl.
curl {wl.api_endpoint}/v1/api/pipelines/create \
-H "Authorization: {wl.auth.auth_header()['Authorization']}" \
-H "Content-Type: application/json" \
--data '{{ \
"pipeline_id": "api-pipeline-with-models", \
"workspace_id": {example_workspace_id}, \
"definition": {{ \
"steps": [ \
{{ \
"ModelInference": \
{{ \
"models": \
[ \
{{ \
"name": "{example_model_name}", \
"version": "{example_model_version}", \
"sha": "{example_model_sha}" \
}} \
] \
}} \
}} \
] \
}} \
}}'
{
"pipeline_pk_id": 28,
"pipeline_variant_pk_id": 30,
"pipeline_variant_version": "82148e63-3950-4ec4-99f7-b9a22212bfdf"
}
GPU Support
Wallaroo 2023.2.1 and above supports Kubernetes nodepools with Nvidia Cuda GPUs.
See the Create GPU Nodepools for Kubernetes Clusters guide for instructions on adding GPU enabled nodepools to a Kubernetes cluster.
IMPORTANT NOTE
If allocating GPUs to a Wallaroo pipeline, thedeployment_label
configuration option must be used.Deploy a Pipeline
- Endpoint:
/v1/api/pipelines/deploy
Deploy a an existing pipeline. Note that for any pipeline that has model steps, they must be included either in model_configs
, model_ids
or models
.
These updates can be edited in the Wallaroo Dashboard after the initial deployment. For more details, see Deployment Configuration via the Wallaroo Dashboard.
Deploy a Pipeline Parameters
Field | Â | Type | Description |
---|---|---|---|
deploy_id | Â | String (REQUIRED) | The name for the pipeline deployment. This must match the name of the pipeline being deployed. |
engine_config | Â | String (OPTIONAL) | Additional configuration options for the pipeline. |
pipeline_version_pk_id | Â | Integer REQUIRED) | Pipeline version id. |
model_configs | Â | List[Integer] (OPTIONAL) | Ids of model configs to apply. |
model_ids | Â | List[Integer] (OPTIONAL) | Ids of models to apply to the pipeline. If passed in, model_configs will be created automatically. |
models | Â | List[models] (OPTIONAL) | If the model ids are not available as a pipeline step, the models’ data can be passed to it through this method. The options below are only required if models are provided as a parameter. |
 | name | String (REQUIRED) | Name of the uploaded model that is in the same workspace as the pipeline. |
 | version | String (REQUIRED) | Version of the model to use. |
 | sha | String (REQUIRED) | SHA value of the model. |
pipeline_id | Â | *Integer (REQUIRED) | Numerical value of the pipeline to deploy. |
Deploy a Pipeline Returns
Field | Type | Description |
---|---|---|
id | Integer | The deployment id. |
Deploy a Pipeline Returns
The pipeline with models created in the step Create Pipeline in a Workspace will be deployed and their deployment information saved for later examples.
Deploy pipeline via Requests.
# Deploy a pipeline with models
# Retrieve the token
headers = wl.auth.auth_header()
endpoint = f"{wl.api_endpoint}/v1/api/pipelines/deploy"
# verify this matches the pipeline with model created earlier
pipeline_with_models_id = "api-pipeline-with-models"
data = {
"deploy_id": pipeline_with_models_id,
"pipeline_version_pk_id": model_pipeline_variant_id,
"models": [
{
"name": example_model_name,
"version":example_model_version,
"sha":example_model_sha
}
],
"pipeline_id": model_pipeline_id
}
response = requests.post(endpoint, json=data, headers=headers, verify=True).json()
display(response)
model_deployment_id=response['id']
{'id': 14}
Deploy pipeline via curl.
curl {wl.api_endpoint}/v1/api/pipelines/deploy \
-H "Authorization: {wl.auth.auth_header()['Authorization']}" \
-H "Content-Type: application/json" \
--data '{{ \
"deploy_id": "{pipeline_with_models_id}", \
"pipeline_version_pk_id": {model_pipeline_variant_id}, \
"models": [ \
{{ \
"name": "{example_model_name}", \
"version": "{example_model_version}", \
"sha": "{example_model_sha}" \
}} \
], \
"pipeline_id": {model_pipeline_id} \
}}'
{"id":14}
Get Deployment Status
- Endpoint:
/v1/api/status/get_deployment
Get Deployment Status Parameters
Field | Type | Description |
---|---|---|
id | String (Required) | The deployment in the format {deployment_name}-{deployment-id} . |
Get Deployment Status Returns
Field | Type | Description |
---|---|---|
status | String | Status of the pipeline deployment. Values are: Running : the Deployment successfully started. Starting : The Deployment is still loading. Error : There is an error with the deployment. |
details | List[details] | The list of deployment details. |
engines | List[engines] | A list of engines deployed in the pipeline. |
engine_lbs | List[engine_lbs] | A list of engine load balancers. |
sidekicks | List[sidekicks] | A list of deployment sidekicks. These are used for Containerized Deployment Runtimes. |
Get Deployment Status Examples
The deployed pipeline with model details from the previous step is displayed.
Get deployment status via Requests.
# Retrieve the token
headers = wl.auth.auth_header()
# Get model pipeline deployment
endpoint = f"{wl.api_endpoint}/v1/api/status/get_deployment"
data = {
"name": f"{pipeline_with_models_id}-{model_deployment_id}"
}
response = requests.post(endpoint, json=data, headers=headers, verify=True).json()
response
{'status': 'Running',
'details': [],
'engines': [{'ip': '10.244.3.145',
'name': 'engine-797d8958d9-fsszh',
'status': 'Running',
'reason': None,
'details': [],
'pipeline_statuses': {'pipelines': [{'id': 'api-pipeline-with-models',
'status': 'Running'}]},
'model_statuses': {'models': [{'name': 'api-sample-model',
'version': 'bdfc8c60-b5bc-4c0e-aa87-157cd52895b6',
'sha': 'bc85ce596945f876256f41515c7501c399fd97ebcb9ab3dd41bf03f8937b4507',
'status': 'Running'}]}}],
'engine_lbs': [{'ip': '10.244.4.157',
'name': 'engine-lb-584f54c899-qfjgp',
'status': 'Running',
'reason': None,
'details': []}],
'sidekicks': []}
Get deployment status via curl.
curl {wl.api_endpoint}/v1/api/status/get_deployment \
-H "Authorization: {wl.auth.auth_header()['Authorization']}" \
-H "Content-Type: application/json" \
--data '{{ \
"name": "{pipeline_with_models_id}-{model_deployment_id}" \
}}'
{
"status": "Running",
"details": [],
"engines": [
{
"ip": "10.244.3.145",
"name": "engine-797d8958d9-fsszh",
"status": "Running",
"reason": null,
"details": [],
"pipeline_statuses": {
"pipelines": [{ "id": "api-pipeline-with-models", "status": "Running" }]
},
"model_statuses": {
"models": [
{
"name": "api-sample-model",
"version": "bdfc8c60-b5bc-4c0e-aa87-157cd52895b6",
"sha": "bc85ce596945f876256f41515c7501c399fd97ebcb9ab3dd41bf03f8937b4507",
"status": "Running"
}
]
}
}
],
"engine_lbs": [
{
"ip": "10.244.4.157",
"name": "engine-lb-584f54c899-qfjgp",
"status": "Running",
"reason": null,
"details": []
}
],
"sidekicks": []
}
Target Architecture for ARM Deployment
Model deployments inherit the architecture of the model. No additional deployment configurations are required to set the architecture.
Note that that architecture must be available in the cluster Wallaroo is deployed on. For instructions on adding ARM
nodepools to a cluster, see Create ARM Nodepools for Kubernetes Clusters.
The following example shows a model uploaded with the architecture set to arm
, and the deployment configuration inheriting that setting.
# Retrieve the token
headers = wl.auth.auth_header()
endpoint = f"{wl.api_endpoint}/v1/api/models/upload_and_convert"
workspace_id = 10
framework='onnx'
model_name = f"api-sample-model"
metadata = {
"name": model_name,
"visibility": "public",
"workspace_id": workspace_id,
"conversion": {
"framework": framework,
"python_version": "3.8",
"requirements": [],
"arch": "arm"
}
}
files = {
"metadata": (None, json.dumps(metadata), "application/json"),
'file': (model_name, open('./models/ccfraud.onnx', 'rb'), "application/octet-stream")
}
response = requests.post(endpoint, files=files, headers=headers).json()
# Create the pipeline and set the model. Note that the architecture setting is not required to deploy on ARM nodepools
# Retrieve the token
headers = wl.auth.auth_header()
endpoint = f"{wl.api_endpoint}/v1/api/pipelines/deploy"
# verify this matches the pipeline with model created earlier
pipeline_with_models_id = "api-pipeline-with-models"
data = {
"deploy_id": pipeline_with_models_id,
"pipeline_version_pk_id": model_pipeline_variant_id,
"models": [
{
"name": model_name,
"version":example_model_version,
"sha":example_model_sha
}
],
"pipeline_id": model_pipeline_id
}
response = requests.post(endpoint, json=data, headers=headers, verify=True).json()
display(response)
model_deployment_id=response['id']
## deploy the pipeline. No additional deployment configurations are required to support the architecture
# Retrieve the token
headers = wl.auth.auth_header()
endpoint = f"{wl.api_endpoint}/v1/api/pipelines/deploy"
# verify this matches the pipeline with model created earlier
pipeline_with_models_id = "api-pipeline-with-models"
data = {
"deploy_id": pipeline_with_models_id,
"pipeline_version_pk_id": model_pipeline_variant_id,
"models": [
{
"name": model_name,
"version":example_model_version,
"sha":example_model_sha
}
],
"pipeline_id": model_pipeline_id
}
response = requests.post(endpoint, json=data, headers=headers, verify=True).json()
Get External Inference URL
- Endpoint:
/v1/api/admin/get_pipeline_external_url
Retrieves the external inference URL for a specific pipeline in a workspace.
Get External Inference URL Parameters
Field | Type | Description |
---|---|---|
workspace_id | Integer (*REQUIRED) | The workspace integer id. |
pipeline_name | String (REQUIRED) | The name of the deployment. |
Get External Inference URL Returns
Field | Type | Description |
---|---|---|
url | String | The pipeline’s external inference URL. |
Get External Inference URL Examples
In this example, the pipeline’s external inference URL from the previous example is retrieved.
Get external inference url via Requests.
## Retrieve the pipeline's External Inference URL
# Retrieve the token
headers = wl.auth.auth_header()
endpoint = f"{wl.api_endpoint}/v1/api/admin/get_pipeline_external_url"
data = {
"workspace_id": example_workspace_id,
"pipeline_name": pipeline_with_models_id
}
response = requests.post(endpoint, json=data, headers=headers, verify=True).json()
print(response)
deployurl = response['url']
{'url': 'https://doc-test.wallarooexample.ai/v1/api/pipelines/infer/api-pipeline-with-models-14/api-pipeline-with-models'}
Get external inference url via Requests.
curl {wl.api_endpoint}/v1/api/admin/get_pipeline_external_url \
-H "Authorization: {wl.auth.auth_header()['Authorization']}" \
-H "Content-Type: application/json" \
--data '{{ \
"workspace_id": {example_workspace_id}, \
"pipeline_name": "{pipeline_with_models_id}" \
}}'
{"url":"https://doc-test.wallarooexample.ai/v1/api/pipelines/infer/api-pipeline-with-models-14/api-pipeline-with-models"}
Perform Inference Through External URL
The inference can now be performed through the External Inference URL. This URL will accept the same inference data file that is used with the Wallaroo SDK, or with an Internal Inference URL as used in the Internal Pipeline Inference URL Tutorial.
Deployed pipelines have their own Inference URL that accepts HTTP POST submissions.
For connections that are external to the Kubernetes cluster hosting the Wallaroo instance, model endpoints must be enabled.
HTTP Headers
The following headers are required for connecting the the Pipeline Deployment URL:
Authorization: This requires the JWT token in the format
'Bearer ' + token
. For example:Authorization: Bearer abcdefg==
Content-Type:
For DataFrame formatted JSON:
Content-Type:application/json; format=pandas-records
For Arrow binary files, the
Content-Type
isapplication/vnd.apache.arrow.file
.Content-Type:application/vnd.apache.arrow.file
IMPORTANT NOTE: Verify that the pipeline deployed has status
Running
before attempting an inference.
Perform inference via external url via Requests.
# Retrieve the token
headers = wl.auth.auth_header()
# set Content-Type type
headers['Content-Type']='application/json; format=pandas-records'
## Inference through external URL using dataframe
# retrieve the json data to submit
data = [
{
"dense_input":[
1.0678324729,
0.2177810266,
-1.7115145262,
0.682285721,
1.0138553067,
-0.4335000013,
0.7395859437,
-0.2882839595,
-0.447262688,
0.5146124988,
0.3791316964,
0.5190619748,
-0.4904593222,
1.1656456469,
-0.9776307444,
-0.6322198963,
-0.6891477694,
0.1783317857,
0.1397992467,
-0.3554220649,
0.4394217877,
1.4588397512,
-0.3886829615,
0.4353492889,
1.7420053483,
-0.4434654615,
-0.1515747891,
-0.2668451725,
-1.4549617756
]
}
]
# submit the request via POST, import as pandas DataFrame
response = pd.DataFrame.from_records(
requests.post(
deployurl,
json=data,
headers=headers)
.json()
)
display(response)
time | in | out | check_failures | metadata | |
---|---|---|---|---|---|
0 | 1701376879347 | {'dense_input': [1.0678324729, 0.2177810266, -... | {'dense_1': [0.0014974177]} | [] | {'last_model': '{"model_name":"api-sample-mode... |
Perform inference via external url via curl.
curl {deployurl} \
-H "Content-Type: application/json; format=pandas-records" \
-H "Authorization: {wl.auth.auth_header()['Authorization']}" \
--data '[ \
{{ \
"dense_input":[ \
1.0678324729, \
0.2177810266, \
-1.7115145262, \
0.682285721, \
1.0138553067, \
-0.4335000013, \
0.7395859437, \
-0.2882839595, \
-0.447262688, \
0.5146124988, \
0.3791316964, \
0.5190619748, \
-0.4904593222, \
1.1656456469, \
-0.9776307444, \
-0.6322198963, \
-0.6891477694, \
0.1783317857, \
0.1397992467, \
-0.3554220649, \
0.4394217877, \
1.4588397512, \
-0.3886829615, \
0.4353492889, \
1.7420053483, \
-0.4434654615, \
-0.1515747891, \
-0.2668451725, \
-1.4549617756 \
] \
}} \
]'
[
{
"time": 1701377058894,
"in": {
"dense_input": [
1.0678324729, 0.2177810266, -1.7115145262, 0.682285721, 1.0138553067,
-0.4335000013, 0.7395859437, -0.2882839595, -0.447262688, 0.5146124988,
0.3791316964, 0.5190619748, -0.4904593222, 1.1656456469, -0.9776307444,
-0.6322198963, -0.6891477694, 0.1783317857, 0.1397992467, -0.3554220649,
0.4394217877, 1.4588397512, -0.3886829615, 0.4353492889, 1.7420053483,
-0.4434654615, -0.1515747891, -0.2668451725, -1.4549617756
]
},
"out": { "dense_1": [0.0014974177] },
"check_failures": [],
"metadata": {
"last_model": "{\"model_name\":\"api-sample-model\",\"model_sha\":\"bc85ce596945f876256f41515c7501c399fd97ebcb9ab3dd41bf03f8937b4507\"}",
"pipeline_version": "",
"elapsed": [53101, 318104],
"dropped": [],
"partition": "engine-797d8958d9-fsszh"
}
}
]
Undeploy a Pipeline
- Endpoint:
/v1/api/pipelines/undeploy
Undeploys a deployed pipeline.
Undeploy a Pipeline Parameters
Field | Type | Description |
---|---|---|
pipeline_id | Integer (*REQUIRED) | The numerical id of the pipeline. |
deployment_id | Integer (*REQUIRED) | The numerical id of the deployment. |
Undeploy a Pipeline Returns
Nothing if the call is successful.
Undeploy a Pipeline Examples
The pipeline with models deployed is undeployed.
Undeploy the pipeline via Requests.
# Undeploy pipeline with models
# Retrieve the token
headers = wl.auth.auth_header()
endpoint = f"{wl.api_endpoint}/v1/api/pipelines/undeploy"
data = {
"pipeline_id": model_pipeline_id,
"deployment_id":model_deployment_id
}
response = requests.post(endpoint, json=data, headers=headers, verify=True).json()
display(response)
Undeploy the pipeline via curl.
curl {wl.api_endpoint}/v1/api/pipelines/undeploy \
-H "Authorization: {wl.auth.auth_header()['Authorization']}" \
-H "Content-Type: application/json" \
--data '{{ \
"pipeline_id": {model_pipeline_id}, \
"deployment_id": {model_deployment_id} \
}}'
null
Copy a Pipeline
Copies an existing pipeline into a new one in the same workspace. A new engine configuration can be set for the copied pipeline.
Copy a Pipeline Parameters
Copy a Pipeline Returns
- Parameters
- name - (REQUIRED string): The name of the new pipeline.
- workspace_id - (REQUIRED int): The numerical id of the workspace to copy the source pipeline from.
- source_pipeline - (REQUIRED int): The numerical id of the pipeline to copy from.
- deploy - (OPTIONAL string): Name of the deployment.
- engine_config - (OPTIONAL string): Engine configuration options.
- pipeline_version - (OPTIONAL string): Optional version of the copied pipeline to create.
Copy a Pipeline Examples
The pipeline with models created in the step Create Pipeline in a Workspace will be copied into a new one.
Copy a pipeline via Requests.
## Copy a pipeline
# Retrieve the token
headers = wl.auth.auth_header()
endpoint = f"{wl.api_endpoint}/v1/api/pipelines/copy"
data = {
"name": "api-copied-pipeline-requests",
"workspace_id": example_workspace_id,
"source_pipeline": model_pipeline_id
}
response = requests.post(endpoint, json=data, headers=headers, verify=True).json()
response
{'pipeline_pk_id': 9,
'pipeline_variant_pk_id': 9,
'pipeline_version': None,
'deployment': None}
Copy a pipeline via curl.
curl {wl.api_endpoint}/v1/api/pipelines/copy \
-H "Authorization: {wl.auth.auth_header()['Authorization']}" \
-H "Content-Type: application/json" \
--data '{{ \
"name": "api-copied-pipeline-curl", \
"workspace_id": {example_workspace_id}, \
"source_pipeline": {model_pipeline_id} \
}}'
{
"pipeline_pk_id": 32,
"pipeline_variant_pk_id": 32,
"pipeline_version": null,
"deployment": null
}