Pipeline names map onto Kubernetes objects and must be DNS compliant. Pipeline names must be ASCII alphanumeric characters or the dash (-) only; the period (.) and underscore (_) are not allowed.
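Names can be checked before submission; the following is a minimal sketch of that rule using a regular expression (the helper name is hypothetical, not part of the Wallaroo API):
import re

# ASCII alphanumerics and dashes only, per the naming rule above (hypothetical helper)
PIPELINE_NAME_PATTERN = re.compile(r"^[a-zA-Z0-9-]+$")

def is_valid_pipeline_name(name: str) -> bool:
    """Return True if the name uses only ASCII alphanumerics and dashes."""
    return bool(PIPELINE_NAME_PATTERN.match(name))

assert is_valid_pipeline_name("sales-forecast-01")
assert not is_valid_pipeline_name("sales_forecast.v2")  # '.' and '_' are rejected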
Pipelines are managed through the Wallaroo API or the Wallaroo SDK. Pipelines are the vehicle used for deploying, serving, and monitoring ML models. For more information, see the Wallaroo Glossary.
Creates a new pipeline in the specified workspace. The definition field holds the pipeline steps in JSON format; use {} for none.
Example: Two pipelines are created in the workspace created in the step Create Workspace. One will be an empty pipeline without any models; the other will be created using the models uploaded in the Upload Model to Workspace step and no configuration details. The pipeline id, variant id, and variant version of each pipeline will be stored for later examples.
The variable example_workspace_id was created in a previous example.
# Create pipeline in a workspace
import requests

# Retrieve the token
headers = wl.auth.auth_header()
api_request = f"{APIURL}/v1/api/pipelines/create"
data = {
    "pipeline_id": empty_pipeline_name,
    "workspace_id": example_workspace_id,
    "definition": {}
}
response = requests.post(api_request, json=data, headers=headers, verify=True).json()
empty_pipeline_id = response['pipeline_pk_id']
empty_pipeline_variant_id = response['pipeline_variant_pk_id']
example_pipeline_variant_version = response['pipeline_variant_version']
response
{'pipeline_pk_id': 7,
'pipeline_variant_pk_id': 7,
'pipeline_variant_version': 'a6dd2cee-58d6-4d24-9e25-f531dbbb95ad'}
# Create pipeline in a workspace with models
# Retrieve the token
headers = wl.auth.auth_header()
api_request = f"{APIURL}/v1/api/pipelines/create"
data = {
    "pipeline_id": model_pipeline_name,
    "workspace_id": example_workspace_id,
    "definition": {
        "steps": [
            {
                "ModelInference": {
                    "models": [
                        {
                            "name": model_name,
                            "version": example_model_version,
                            "sha": example_model_sha
                        }
                    ]
                }
            }
        ]
    }
}
response = requests.post(api_request, json=data, headers=headers, verify=True).json()
model_pipeline_id = response['pipeline_pk_id']
model_pipeline_variant_id = response['pipeline_variant_pk_id']
model_pipeline_variant_version = response['pipeline_variant_version']
response
{'pipeline_pk_id': 8,
'pipeline_variant_pk_id': 8,
'pipeline_variant_version': '55f45c16-591e-4a16-8082-3ab6d843b484'}
Deploys an existing pipeline. Note that for any pipeline that has model steps, the models must be included in either model_configs, model_ids, or models.
/pipelines/deploy
{"cpus": 1, "replica_count": 1, "memory": "999Mi"}
Available parameters include the following. A combined configuration sketch follows this list.

- cpus: The number of CPUs to apply to the native runtime models in the pipeline. cpus can be a fraction of a CPU, for example "cpus": 0.25.
- gpus: The number of GPUs to apply to the native runtime models. GPUs can only be allocated in whole numbers. Organizations should monitor how many GPUs are allocated to pipelines to verify they have enough GPUs for all pipelines. If gpus is set, then deployment_label must also be set and match the GPU Nodepool for the Wallaroo Cluster hosting the Wallaroo instance.
- replica_count: The number of replicas of the pipeline to deploy. This allows multiple deployments of the same models to increase inferences through parallelization.
- replica_autoscale_min_max: Provides replicas to be scaled from 0 to some maximum number of replicas. This allows pipelines to spin up additional replicas as more resources are required, then spin them back down to save on resources and costs. For example: "replica_autoscale_min_max": {"maximum": 2, "minimum": 0}.
- autoscale_cpu_utilization: Sets the average CPU percentage metric for when to load or unload another replica.
- disable_autoscale: Disables autoscaling in the deployment configuration.
- memory: Sets the amount of RAM to allocate to the pipeline. The memory_spec string is in the format "{size as number}{unit value}". The accepted unit values are KiB (for Kilobytes), MiB (for Megabytes), GiB (for Gigabytes), and TiB (for Terabytes).
- lb_cpus: Sets the number or fraction of CPUs to use for the pipeline's load balancer, for example: 0.25, 1, 1.5, etc. The units are similar to the Kubernetes CPU definitions.
- lb_memory: Sets the amount of RAM to allocate to the pipeline's load balancer. The memory_spec string is in the format "{size as number}{unit value}". The accepted unit values are KiB, MiB, GiB, and TiB.
- deployment_label: Label used for Kubernetes labels.
- arch: Determines which architecture to use. The available options are x86 (select x86 from available nodepools) and arm (ARM based architecture such as the Ampere® Altra® Arm-based processors included with some Azure virtual machines).
- sidekick_cpus: Sets the number of CPUs to be used for the model's sidekick container. Only affects image-based models (e.g. MLFlow models) in a deployment. The parameters are: model (the sidekick model to configure) and core_count (the number or fraction of CPUs to use).
- sidekick_arch: Determines which architecture to use for the sidekick container. The available options are x86 and arm, as with arch above.
- sidekick_memory: Sets the memory available for the model's sidekick container. Only affects image-based models (e.g. MLFlow models) in a deployment. The parameters are: model (the sidekick model to configure) and memory_spec (the amount of memory to allocate as memory unit values: KiB, MiB, GiB, or TiB).
- sidekick_env: Environment variables submitted to the model's sidekick container. Only affects image-based models (e.g. MLFlow models) in a deployment. These are used specifically for containerized models that have environment variables that affect their performance. The parameters are: model (the sidekick model to configure) and environment (dictionary inputs).
- sidekick_gpus: Sets the number of GPUs to allocate for containerized runtimes. GPUs are only allocated in whole units, not as fractions. Organizations should be aware of the total number of GPUs available to the cluster, and monitor which pipeline deployment configurations have GPUs allocated to ensure they do not run out. If there are not enough GPUs to allocate to a pipeline deployment configuration, an error message will be returned when the pipeline is deployed. If sidekick_gpus is set, then deployment_label must also be set and match the GPU Nodepool for the Wallaroo Cluster hosting the Wallaroo instance. The parameters are: model (the sidekick model to configure) and core_count (the number of GPUs to allocate).

The model name, version, and sha are required when models are provided as a parameter.
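As a combined illustration, the following is a hedged sketch of an engine configuration that sets several of the options above at once. The values are arbitrary, and exactly how this dictionary nests into the /pipelines/deploy request body should be confirmed against the deploy example below.
# A hedged example engine configuration combining several of the options above.
# All values are illustrative only.
engine_config = {
    "cpus": 0.5,                      # fractional CPUs are allowed for native runtimes
    "memory": "1Gi",                  # "{size}{unit}" format; the sample above uses "999Mi"
    "replica_autoscale_min_max": {"maximum": 2, "minimum": 0},
    "autoscale_cpu_utilization": 75   # average CPU % that triggers scaling
}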
Example: The pipeline with models created in the step Create Pipeline in a Workspace will be deployed and its deployment information saved for later examples.
# Deploy a pipeline with models
import time

# Retrieve the token
headers = wl.auth.auth_header()
api_request = f"{APIURL}/v1/api/pipelines/deploy"
model_deploy_id = model_pipeline_name
# example_model_deploy_id = "test deployment name"
data = {
    "deploy_id": model_deploy_id,
    "pipeline_version_pk_id": model_pipeline_variant_id,
    "models": [
        {
            "name": model_name,
            "version": example_model_version,
            "sha": example_model_sha
        }
    ],
    "pipeline_id": model_pipeline_id
}
response = requests.post(api_request, json=data, headers=headers, verify=True).json()
display(response)
model_deployment_id = response['id']
# wait 45 seconds for the pipeline to complete deployment
time.sleep(45)
{'id': 5}
Returns the deployment status.
Example: The deployed model pipeline's status will be displayed.
# Retrieve the token
headers = wl.auth.auth_header()
# Get model pipeline deployment
api_request = f"{APIURL}/v1/api/status/get_deployment"
data = {
    "name": f"{model_deploy_id}-{model_deployment_id}"
}
response = requests.post(api_request, json=data, headers=headers, verify=True).json()
response
{'status': 'Running',
'details': [],
'engines': [{'ip': '10.244.3.136',
'name': 'engine-76b8f76d58-vbwqs',
'status': 'Running',
'reason': None,
'details': [],
'pipeline_statuses': {'pipelines': [{'id': 'pipelinemodels',
'status': 'Running'}]},
'model_statuses': {'models': [{'name': 'apimodel',
'version': '0b989008-5f1d-453e-8085-98d97be1b722',
'sha': 'bc85ce596945f876256f41515c7501c399fd97ebcb9ab3dd41bf03f8937b4507',
'status': 'Running'}]}}],
'engine_lbs': [{'ip': '10.244.4.166',
'name': 'engine-lb-584f54c899-rdfkl',
'status': 'Running',
'reason': None,
'details': []}],
'sidekicks': []}
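Rather than waiting a fixed 45 seconds after deployment, the status endpoint above can be polled until the deployment reports Running. The following is a minimal sketch; the helper name, timeout, and interval are arbitrary:
import time
import requests

def wait_for_deployment(name, timeout=300, interval=5):
    """Poll /v1/api/status/get_deployment until the status is 'Running' (hypothetical helper)."""
    api_request = f"{APIURL}/v1/api/status/get_deployment"
    deadline = time.time() + timeout
    while time.time() < deadline:
        headers = wl.auth.auth_header()  # refresh the token on each poll
        response = requests.post(api_request, json={"name": name}, headers=headers, verify=True).json()
        if response.get('status') == 'Running':
            return response
        time.sleep(interval)
    raise TimeoutError(f"Deployment {name} not Running after {timeout} seconds")

# Example: wait_for_deployment(f"{model_deploy_id}-{model_deployment_id}")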
The API command /admin/get_pipeline_external_url retrieves the external inference URL for a specific pipeline in a workspace.
In this example, the workspace id and pipeline name stored in the previous steps are used for the /admin/get_pipeline_external_url request.
The External Inference URL will be stored as a variable for the next step.
## Retrieve the pipeline's External Inference URL
# Retrieve the token
headers = wl.auth.auth_header()
api_request = f"{APIURL}/v1/api/admin/get_pipeline_external_url"
data = {
    "workspace_id": example_workspace_id,
    "pipeline_name": model_pipeline_name
}
response = requests.post(api_request, json=data, headers=headers, verify=True).json()
print(response)
deployurl = response['url']
{'url': 'https://doc-test.api.wallarooexample.ai/v1/api/pipelines/infer/pipelinemodels-5/pipelinemodels'}
The inference can now be performed through the External Inference URL. This URL will accept the same inference data file that is used with the Wallaroo SDK, or with an Internal Inference URL as used in the Internal Pipeline Inference URL Tutorial.
Deployed pipelines have their own Inference URL that accepts HTTP POST submissions.
For connections that are external to the Kubernetes cluster hosting the Wallaroo instance, model endpoints must be enabled.
The following headers are required for connecting to the Pipeline Deployment URL:
Authorization: This requires the JWT token in the format 'Bearer ' + token. For example: Authorization: Bearer abcdefg==
Content-Type: For DataFrame formatted JSON, use Content-Type: application/json; format=pandas-records. For Arrow binary files, use Content-Type: application/vnd.apache.arrow.file.
IMPORTANT NOTE: Verify that the pipeline deployed has status Running before attempting an inference.
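For clients that do not use the Wallaroo SDK, these headers can be assembled by hand. A minimal sketch, assuming a valid JWT token string has already been obtained (the token value below is a placeholder):
# assemble the required headers manually; the token value is a placeholder
token = "abcdefg=="  # replace with a real JWT from the Wallaroo instance
headers = {
    "Authorization": f"Bearer {token}",
    "Content-Type": "application/json; format=pandas-records"
}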
import pandas as pd

# Retrieve the token
headers = wl.auth.auth_header()
# set the Content-Type for DataFrame formatted JSON
headers['Content-Type'] = 'application/json; format=pandas-records'
## Inference through external URL using dataframe
# retrieve the json data to submit
data = [
    {
        "tensor": [
            1.0678324729, 0.2177810266, -1.7115145262, 0.682285721,
            1.0138553067, -0.4335000013, 0.7395859437, -0.2882839595,
            -0.447262688, 0.5146124988, 0.3791316964, 0.5190619748,
            -0.4904593222, 1.1656456469, -0.9776307444, -0.6322198963,
            -0.6891477694, 0.1783317857, 0.1397992467, -0.3554220649,
            0.4394217877, 1.4588397512, -0.3886829615, 0.4353492889,
            1.7420053483, -0.4434654615, -0.1515747891, -0.2668451725,
            -1.4549617756
        ]
    }
]
# submit the request via POST, import as pandas DataFrame
response = pd.DataFrame.from_records(
requests.post(
deployurl,
json=data,
headers=headers)
.json()
)
display(response)
|  | time | in | out | check_failures | metadata |
|---|---|---|---|---|---|
| 0 | 1684356836285 | {'tensor': [1.0678324729, 0.2177810266, -1.7115145262, 0.682285721, 1.0138553067, -0.4335000013, 0.7395859437, -0.2882839595, -0.447262688, 0.5146124988, 0.3791316964, 0.5190619748, -0.4904593222, 1.1656456469, -0.9776307444, -0.6322198963, -0.6891477694, 0.1783317857, 0.1397992467, -0.3554220649, 0.4394217877, 1.4588397512, -0.3886829615, 0.4353492889, 1.7420053483, -0.4434654615, -0.1515747891, -0.2668451725, -1.4549617756]} | {'dense_1': [0.0014974177]} | [] | {'last_model': '{"model_name":"apimodel","model_sha":"bc85ce596945f876256f41515c7501c399fd97ebcb9ab3dd41bf03f8937b4507"}', 'pipeline_version': '', 'elapsed': [163502, 309804]} |
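The same endpoint also accepts the Apache Arrow binary format described above. The following is a hedged sketch, assuming pyarrow is installed and that the pipeline accepts a single tensor column matching the DataFrame example:
import pyarrow as pa

# build an Arrow table with the same 29-element tensor used above
table = pa.Table.from_pydict({"tensor": [data[0]["tensor"]]})

# serialize the table to the Arrow IPC file format in memory
sink = pa.BufferOutputStream()
with pa.ipc.new_file(sink, table.schema) as writer:
    writer.write_table(table)

# Retrieve the token and set the Arrow Content-Type
headers = wl.auth.auth_header()
headers['Content-Type'] = 'application/vnd.apache.arrow.file'

# submit the Arrow payload via POST
response = requests.post(deployurl, data=sink.getvalue().to_pybytes(), headers=headers)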
Undeploys a deployed pipeline.
Example: The pipeline with models deployed in the step Deploy a Pipeline will be undeployed.
# Undeploy pipeline with models
# Retrieve the token
headers = wl.auth.auth_header()
api_request = f"{APIURL}/v1/api/pipelines/undeploy"
data = {
    "pipeline_id": model_pipeline_id,
    "deployment_id": model_deployment_id
}
response = requests.post(api_request, json=data, headers=headers, verify=True).json()
response
Copies an existing pipeline into a new one in the same workspace. A new engine configuration can be set for the copied pipeline.
Example: The pipeline with models created in the step Create Pipeline in a Workspace will be copied into a new one.
## Copy a pipeline
# Retrieve the token
headers = wl.auth.auth_header()
api_request = f"{APIURL}/v1/api/pipelines/copy"
data = {
    "name": example_copied_pipeline_name,
    "workspace_id": example_workspace_id,
    "source_pipeline": model_pipeline_id
}
response = requests.post(api_request, json=data, headers=headers, verify=True).json()
response
{'pipeline_pk_id': 9,
'pipeline_variant_pk_id': 9,
'pipeline_version': None,
'deployment': None}