Wallaroo MLOps API Essentials Guide: Pipeline Management

How to use the Wallaroo API for Pipeline Management

Pipeline Naming Requirements

Pipeline names map onto Kubernetes objects, and must be DNS compliant. Pipeline names must be ASCII alpha-numeric characters or dash (-) only. . and _ are not allowed.

Pipeline Management

Pipelines are managed through the Wallaroo API or the Wallaroo SDK. Pipelines are the vehicle used for deploying, serving, and monitoring ML models. For more information, see the Wallaroo Glossary.

Create Pipeline in a Workspace

Creates a new pipeline in the specified workspace.

  • Parameters
    • pipeline_id - (REQUIRED string): Name of the new pipeline.
    • workspace_id - (REQUIRED int): Numerical id of the workspace for the new pipeline.
    • definition - (REQUIRED string): Pipeline definitions, can be {} for none.

Example: Two pipelines are created in the workspace created in the step Create Workspace. One will be an empty pipeline without any models, the other will be created using the uploaded models in the Upload Model to Workspace step and no configuration details. The pipeline id, variant id, and variant version of each pipeline will be stored for later examples.

The variable example_workspace_id was created in a previous example.

# Create pipeline in a workspace
# Retrieve the token 
headers = wl.auth.auth_header()

api_request = f"{APIURL}/v1/api/pipelines/create"

data = {
  "pipeline_id": empty_pipeline_name,
  "workspace_id": example_workspace_id,
  "definition": {}
}

response = requests.post(api_request, json=data, headers=headers, verify=True).json()

empty_pipeline_id = response['pipeline_pk_id']
empty_pipeline_variant_id=response['pipeline_variant_pk_id']
example_pipeline_variant_version=['pipeline_variant_version']
response
{'pipeline_pk_id': 7,
 'pipeline_variant_pk_id': 7,
 'pipeline_variant_version': 'a6dd2cee-58d6-4d24-9e25-f531dbbb95ad'}
# Create pipeline in a workspace with models
# Retrieve the token 
headers = wl.auth.auth_header()
api_request = f"{APIURL}/v1/api/pipelines/create"

data = {
  "pipeline_id": model_pipeline_name,
  "workspace_id": example_workspace_id,
  "definition": {
      'steps': [
          {
            'ModelInference': 
              {
                  'models': 
                    [
                        {
                            'name': model_name, 
                            'version': example_model_version, 
                            'sha': example_model_sha
                        }
                    ]
              }
          }
        ]
      }
    }

response = requests.post(api_request, json=data, headers=headers, verify=True).json()
model_pipeline_id = response['pipeline_pk_id']
model_pipeline_variant_id=response['pipeline_variant_pk_id']
model_pipeline_variant_version=['pipeline_variant_version']
response
{'pipeline_pk_id': 8,
 'pipeline_variant_pk_id': 8,
 'pipeline_variant_version': '55f45c16-591e-4a16-8082-3ab6d843b484'}

Deploy a Current Pipeline Version

Deploy a an existing pipeline. Note that for any pipeline that has model steps, they must be included either in model_configs, model_ids or models.

  • Endpoint
    • /pipelines/deploy
  • Parameters
    • deploy_id (REQUIRED string): The name for the pipeline deployment.
    • engine_config (OPTIONAL string): Additional configuration options for the pipeline. These set the memory, replicas, and other settings. For example: {"cpus": 1, "replica_count": 1, "memory": "999Mi"} Available parameters include the following.
      • cpus: The number of CPUs to apply to the native runtime models in the pipeline. cpus can be a fraction of a cpu, for example "cpus": 0.25.
      • gpus: The number of GPUs to apply to the native runtime models. GPUs can only be allocated in whole numbers. Organizations should monitor how many GPUs are allocated to a pipelines to verify they have enough GPUs for all pipelines. If gpus is called, then the deployment_label must be called and match the GPU Nodepool for the Wallaroo Cluster hosting the Wallaroo instance.
      • replica_count: The number of replicas of the pipeline to deploy. This allows for multiple deployments of the same models to be deployed to increase inferences through parallelization.
      • replica_autoscale_min_max: Provides replicas to be scaled from 0 to some maximum number of replicas. This allows pipelines to spin up additional replicas as more resources are required, then spin them back down to save on resources and costs. For example: "replica_autoscale_min_max": {"maximum": 2, "minimum":0}
      • autoscale_cpu_utilization: Sets the average CPU percentage metric for when to load or unload another replica.
      • disable_autoscale: Disables autoscaling in the deployment configuration.
      • memory: Sets the amount of RAM to allocate the pipeline. The memory_spec string is in the format “{size as number}{unit value}”. The accepted unit values are:
        • KiB (for KiloBytes)
        • MiB (for MegaBytes)
        • GiB (for GigaBytes)
        • TiB (for TeraBytes)
      • lb_cpus: Sets the number or fraction of CPUs to use for the pipeline’s load balancer, for example: 0.25, 1, 1.5, etc. The units, similar to the Kubernetes CPU definitions.
      • lb_memory: Sets the amount of RAM to allocate the pipeline’s load balancer. The memory_spec string is in the format “{size as number}{unit value}”. The accepted unit values are:
        • KiB (for KiloBytes)
        • MiB (for MegaBytes)
        • GiB (for GigaBytes)
        • TiB (for TeraBytes)
      • deployment_label: Label used for Kubernetes labels.
      • arch: Determines which architecture to use. The available options are:
      • sidekick_cpus: Sets the number of CPUs to be used for the model’s sidekick container. Only affects image-based models (e.g. MLFlow models) in a deployment. The parameters are as follows:
        • model: The sidekick model to configure.
        • core_count: Sets the number or fraction of CPUs to use.
      • sidekick_arch: Determines which architecture to use. The available options are:
      • sidekick_memory: Sets the memory available to for the model’s sidekick container. Only affects image-based models (e.g. MLFlow models) in a deployment. The parameters are as follows:
        • model: The sidekick model to configure.
        • memory_spec: The amount of memory to allocated as memory unit values. The accepted unit values are:
          • KiB (for KiloBytes)
          • MiB (for MegaBytes)
          • GiB (for GigaBytes)
          • TiB (for TeraBytes)
      • sidekick_env: Environment variables submitted to the model’s sidekick container. Only affects image-based models (e.g. MLFlow models) in a deployment. These are used specifically for containerized models that have environment variables that effect their performance. This takes the following parameters:
        • model: The sidekick model to configure.
        • environment: Dictionary inputs
      • sidekick_gpus: Sets the number of GPUs to allocate for containerized runtimes. GPUs are only allocated in whole units, not as fractions. Organizations should be aware of the total number of GPUs available to the cluster, and monitor which pipeline deployment configurations have GPUs allocated to ensure they do not run out. If there are not enough GPUs to allocate to a pipeline deployment configuration, and error message will be deployed when the pipeline is deployed. If gpus is called, then the deployment_label must be called and match the GPU Nodepool for the Wallaroo Cluster hosting the Wallaroo instance. This takes the following parameters:
        • model: The sidekick model to configure.
        • core_count: The number of GPUs to allocate.
    • pipeline_version_pk_id (REQUIRED int): Pipeline version id.
    • model_configs (OPTIONAL Array int): Ids of model configs to apply.
    • model_ids (OPTIONAL Array int): Ids of models to apply to the pipeline. If passed in, model_configs will be created automatically.
    • models (OPTIONAL Array models): If the model ids are not available as a pipeline step, the models’ data can be passed to it through this method. The options below are only required if models are provided as a parameter.
      • name (REQUIRED string): Name of the uploaded model that is in the same workspace as the pipeline.
      • version (REQUIRED string): Version of the model to use.
      • sha (REQUIRED string): SHA value of the model.
    • pipeline_id (REQUIRED int): Numerical value of the pipeline to deploy.
  • Returns
    • id (int): The deployment id.

 

Examples: Both the pipeline with model created in the step Create Pipeline in a Workspace will be deployed and their deployment information saved for later examples.

# Deploy a pipeline with models

# Retrieve the token 
headers = wl.auth.auth_header()
api_request = f"{APIURL}/v1/api/pipelines/deploy"

model_deploy_id=model_pipeline_name

# example_model_deploy_id="test deployment name"

data = {
    "deploy_id": model_deploy_id,
    "pipeline_version_pk_id": model_pipeline_variant_id,
    "models": [
        {
            "name": model_name,
            "version":example_model_version,
            "sha":example_model_sha
        }
    ],
    "pipeline_id": model_pipeline_id
}

response = requests.post(api_request, json=data, headers=headers, verify=True).json()
display(response)
model_deployment_id=response['id']

# wait 45 seconds for the pipeline to complete deployment
import time
time.sleep(45)
{'id': 5}

Get Deployment Status

Returns the deployment status.

  • Parameters
    • name - (REQUIRED string): The deployment in the format {deployment_name}-{deploymnent-id}.

Example: The deployed empty and model pipelines status will be displayed.

# Retrieve the token 
headers = wl.auth.auth_header()

# Get model pipeline deployment

api_request = f"{APIURL}/v1/api/status/get_deployment"

data = {
  "name": f"{model_deploy_id}-{model_deployment_id}"
}

response = requests.post(api_request, json=data, headers=headers, verify=True).json()
response
{'status': 'Running',
 'details': [],
 'engines': [{'ip': '10.244.3.136',
   'name': 'engine-76b8f76d58-vbwqs',
   'status': 'Running',
   'reason': None,
   'details': [],
   'pipeline_statuses': {'pipelines': [{'id': 'pipelinemodels',
      'status': 'Running'}]},
   'model_statuses': {'models': [{'name': 'apimodel',
      'version': '0b989008-5f1d-453e-8085-98d97be1b722',
      'sha': 'bc85ce596945f876256f41515c7501c399fd97ebcb9ab3dd41bf03f8937b4507',
      'status': 'Running'}]}}],
 'engine_lbs': [{'ip': '10.244.4.166',
   'name': 'engine-lb-584f54c899-rdfkl',
   'status': 'Running',
   'reason': None,
   'details': []}],
 'sidekicks': []}

Get External Inference URL

The API command /admin/get_pipeline_external_url retrieves the external inference URL for a specific pipeline in a workspace.

  • Parameters
    • workspace_id (REQUIRED integer): The workspace integer id.
    • pipeline_name (REQUIRED string): The name of the deployment.

In this example, a list of the workspaces will be retrieved. Based on the setup from the Internal Pipeline Deployment URL Tutorial, the workspace matching urlworkspace will have it’s workspace id stored and used for the /admin/get_pipeline_external_url request with the pipeline urlpipeline.

The External Inference URL will be stored as a variable for the next step.

## Retrieve the pipeline's External Inference URL

# Retrieve the token 
headers = wl.auth.auth_header()

api_request = f"{APIURL}/v1/api/admin/get_pipeline_external_url"

data = {
    "workspace_id": example_workspace_id,
    "pipeline_name": model_pipeline_name
}

response = requests.post(api_request, json=data, headers=headers, verify=True).json()
print(response)
deployurl = response['url']
{'url': 'https://doc-test.api.wallarooexample.ai/v1/api/pipelines/infer/pipelinemodels-5/pipelinemodels'}

Perform Inference Through External URL

The inference can now be performed through the External Inference URL. This URL will accept the same inference data file that is used with the Wallaroo SDK, or with an Internal Inference URL as used in the Internal Pipeline Inference URL Tutorial.

Deployed pipelines have their own Inference URL that accepts HTTP POST submissions.

For connections that are external to the Kubernetes cluster hosting the Wallaroo instance, model endpoints must be enabled.

HTTP Headers

The following headers are required for connecting the the Pipeline Deployment URL:

  • Authorization: This requires the JWT token in the format 'Bearer ' + token. For example:

    Authorization: Bearer abcdefg==
    
  • Content-Type:

  • For DataFrame formatted JSON:

    Content-Type:application/json; format=pandas-records
    
  • For Arrow binary files, the Content-Type is application/vnd.apache.arrow.file.

    Content-Type:application/vnd.apache.arrow.file
    
  • IMPORTANT NOTE: Verify that the pipeline deployed has status Running before attempting an inference.

# Retrieve the token
headers = wl.auth.auth_header()

# set Content-Type type
headers['Content-Type']='application/json; format=pandas-records'

## Inference through external URL using dataframe

# retrieve the json data to submit
data = [
    {
        "tensor":[
            1.0678324729,
            0.2177810266,
            -1.7115145262,
            0.682285721,
            1.0138553067,
            -0.4335000013,
            0.7395859437,
            -0.2882839595,
            -0.447262688,
            0.5146124988,
            0.3791316964,
            0.5190619748,
            -0.4904593222,
            1.1656456469,
            -0.9776307444,
            -0.6322198963,
            -0.6891477694,
            0.1783317857,
            0.1397992467,
            -0.3554220649,
            0.4394217877,
            1.4588397512,
            -0.3886829615,
            0.4353492889,
            1.7420053483,
            -0.4434654615,
            -0.1515747891,
            -0.2668451725,
            -1.4549617756
        ]
    }
]

# submit the request via POST, import as pandas DataFrame
response = pd.DataFrame.from_records(
    requests.post(
        deployurl, 
        json=data, 
        headers=headers)
        .json()
    )

display(response)
timeinoutcheck_failuresmetadata
01684356836285{'tensor': [1.0678324729, 0.2177810266, -1.7115145262, 0.682285721, 1.0138553067, -0.4335000013, 0.7395859437, -0.2882839595, -0.447262688, 0.5146124988, 0.3791316964, 0.5190619748, -0.4904593222, 1.1656456469, -0.9776307444, -0.6322198963, -0.6891477694, 0.1783317857, 0.1397992467, -0.3554220649, 0.4394217877, 1.4588397512, -0.3886829615, 0.4353492889, 1.7420053483, -0.4434654615, -0.1515747891, -0.2668451725, -1.4549617756]}{'dense_1': [0.0014974177]}[]{'last_model': '{"model_name":"apimodel","model_sha":"bc85ce596945f876256f41515c7501c399fd97ebcb9ab3dd41bf03f8937b4507"}', 'pipeline_version': '', 'elapsed': [163502, 309804]}

Undeploy a Pipeline

Undeploys a deployed pipeline.

  • Parameters
    • pipeline_id - (REQUIRED int): The numerical id of the pipeline.
    • deployment_id - (REQUIRED int): The numerical id of the deployment.
  • Returns
    • Nothing if the call is successful.

Example: Both the empty pipeline and pipeline with models deployed in the step Deploy a Pipeline will be undeployed.

# Undeploy pipeline with models
# Retrieve the token 
headers = wl.auth.auth_header()
api_request = f"{APIURL}/v1/api/pipelines/undeploy"

data = {
    "pipeline_id": model_pipeline_id,
    "deployment_id":model_deployment_id
}

response = requests.post(api_request, json=data, headers=headers, verify=True).json()
response

Copy a Pipeline

Copies an existing pipeline into a new one in the same workspace. A new engine configuration can be set for the copied pipeline.

  • Parameters
    • name - (REQUIRED string): The name of the new pipeline.
    • workspace_id - (REQUIRED int): The numerical id of the workspace to copy the source pipeline from.
    • source_pipeline - (REQUIRED int): The numerical id of the pipeline to copy from.
    • deploy - (OPTIONAL string): Name of the deployment.
    • engine_config - (OPTIONAL string): Engine configuration options.
    • pipeline_version - (OPTIONAL string): Optional version of the copied pipeline to create.

Example: The pipeline with models created in the step Create Pipeline in a Workspace will be copied into a new one.

## Copy a pipeline

# Retrieve the token 
headers = wl.auth.auth_header()

api_request = f"{APIURL}/v1/api/pipelines/copy"

data = {
  "name": example_copied_pipeline_name,
  "workspace_id": example_workspace_id,
  "source_pipeline": model_pipeline_id
}

response = requests.post(api_request, json=data, headers=headers, verify=True).json()
response
{'pipeline_pk_id': 9,
 'pipeline_variant_pk_id': 9,
 'pipeline_version': None,
 'deployment': None}