Wallaroo MLOps API Essentials Guide: Pipeline Management

How to use the Wallaroo API for Pipeline Management

Pipeline Naming Requirements

Pipeline names map onto Kubernetes objects, and must be DNS compliant. Pipeline names must be ASCII alpha-numeric characters or dash (-) only. Periods (.) and underscores (_) are not allowed.
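As a quick check, the sketch below validates a proposed pipeline name against these rules. The regular expression is an assumption derived from the requirements above (ASCII alpha-numeric characters or dash only); it is not an official Wallaroo validator and does not cover every DNS constraint.

import re

# Hypothetical helper: checks a proposed pipeline name against the naming
# rules above. The pattern is an assumption based on this guide, not an
# official Wallaroo validation routine.
def is_valid_pipeline_name(name: str) -> bool:
    return bool(re.fullmatch(r"[a-zA-Z0-9-]+", name))

is_valid_pipeline_name("pipelinemodels")    # True
is_valid_pipeline_name("pipeline_models")   # False: underscore not allowed
is_valid_pipeline_name("pipeline.models")   # False: period not allowed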

Pipeline Management

Pipelines are managed through the Wallaroo API or the Wallaroo SDK. Pipelines are the vehicle used for deploying, serving, and monitoring ML models. For more information, see the Wallaroo Glossary.

Create Pipeline in a Workspace

Creates a new pipeline in the specified workspace.

  • Parameters
    • pipeline_id - (REQUIRED string): Name of the new pipeline.
    • workspace_id - (REQUIRED int): Numerical id of the workspace for the new pipeline.
    • definition - (REQUIRED string): Pipeline definition; can be {} for none.

Example: Two pipelines are created in the workspace created in the step Create Workspace. One is an empty pipeline without any models; the other is created from the models uploaded in the Upload Model to Workspace step, with no additional configuration details. The pipeline id, variant id, and variant version of each pipeline are stored for later examples.

The variable example_workspace_id was created in a previous example.

# Create pipeline in a workspace
# Retrieve the token 
headers = wl.auth.auth_header()

api_request = f"{APIURL}/v1/api/pipelines/create"

data = {
  "pipeline_id": empty_pipeline_name,
  "workspace_id": example_workspace_id,
  "definition": {}
}

response = requests.post(api_request, json=data, headers=headers, verify=True).json()

empty_pipeline_id = response['pipeline_pk_id']
empty_pipeline_variant_id = response['pipeline_variant_pk_id']
example_pipeline_variant_version = response['pipeline_variant_version']
response
{'pipeline_pk_id': 7,
 'pipeline_variant_pk_id': 7,
 'pipeline_variant_version': 'a6dd2cee-58d6-4d24-9e25-f531dbbb95ad'}
# Create pipeline in a workspace with models
# Retrieve the token 
headers = wl.auth.auth_header()
api_request = f"{APIURL}/v1/api/pipelines/create"

data = {
  "pipeline_id": model_pipeline_name,
  "workspace_id": example_workspace_id,
  "definition": {
      'steps': [
          {
            'ModelInference': 
              {
                  'models': 
                    [
                        {
                            'name': model_name, 
                            'version': example_model_version, 
                            'sha': example_model_sha
                        }
                    ]
              }
          }
        ]
      }
    }

response = requests.post(api_request, json=data, headers=headers, verify=True).json()
model_pipeline_id = response['pipeline_pk_id']
model_pipeline_variant_id = response['pipeline_variant_pk_id']
model_pipeline_variant_version = response['pipeline_variant_version']
response
{'pipeline_pk_id': 8,
 'pipeline_variant_pk_id': 8,
 'pipeline_variant_version': '55f45c16-591e-4a16-8082-3ab6d843b484'}

Deploy a Pipeline

Deploys an existing pipeline. Note that for any pipeline that has model steps, the models must be included in either model_configs, model_ids, or models.

  • Parameters
    • deploy_id (REQUIRED string): The name for the pipeline deployment. This must match the name of the pipeline being deployed.
    • engine_config (OPTIONAL string): Additional configuration options for the pipeline.
    • pipeline_version_pk_id (REQUIRED int): Pipeline version id.
    • model_configs (OPTIONAL Array int): Ids of model configs to apply.
    • model_ids (OPTIONAL Array int): Ids of models to apply to the pipeline. If passed in, model_configs will be created automatically.
    • models (OPTIONAL Array models): If the model ids are not available as a pipeline step, the model data can be passed through this parameter. The fields below are required only when models is provided.
      • name (REQUIRED string): Name of the uploaded model that is in the same workspace as the pipeline.
      • version (REQUIRED string): Version of the model to use.
      • sha (REQUIRED string): SHA value of the model.
    • pipeline_id (REQUIRED int): Numerical value of the pipeline to deploy.
  • Returns
    • id (int): The deployment id.

Example: The pipeline with models created in the step Create Pipeline in a Workspace will be deployed and its deployment information saved for later examples.

# Deploy a pipeline with models

# Retrieve the token 
headers = wl.auth.auth_header()
api_request = f"{APIURL}/v1/api/pipelines/deploy"

model_deploy_id=model_pipeline_name

data = {
    "deploy_id": model_deploy_id,
    "pipeline_version_pk_id": model_pipeline_variant_id,
    "models": [
        {
            "name": model_name,
            "version":example_model_version,
            "sha":example_model_sha
        }
    ],
    "pipeline_id": model_pipeline_id
}

response = requests.post(api_request, json=data, headers=headers, verify=True).json()
display(response)
model_deployment_id=response['id']

# wait 45 seconds for the pipeline to complete deployment
import time
time.sleep(45)
{'id': 5}

Get Deployment Status

Returns the deployment status.

  • Parameters
    • name - (REQUIRED string): The deployment name in the format {deployment_name}-{deployment_id}.

Example: The status of the deployed pipeline with models will be displayed.

# Retrieve the token 
headers = wl.auth.auth_header()

# Get model pipeline deployment

api_request = f"{APIURL}/v1/api/status/get_deployment"

data = {
  "name": f"{model_deploy_id}-{model_deployment_id}"
}

response = requests.post(api_request, json=data, headers=headers, verify=True).json()
response
{'status': 'Running',
 'details': [],
 'engines': [{'ip': '10.244.3.136',
   'name': 'engine-76b8f76d58-vbwqs',
   'status': 'Running',
   'reason': None,
   'details': [],
   'pipeline_statuses': {'pipelines': [{'id': 'pipelinemodels',
      'status': 'Running'}]},
   'model_statuses': {'models': [{'name': 'apimodel',
      'version': '0b989008-5f1d-453e-8085-98d97be1b722',
      'sha': 'bc85ce596945f876256f41515c7501c399fd97ebcb9ab3dd41bf03f8937b4507',
      'status': 'Running'}]}}],
 'engine_lbs': [{'ip': '10.244.4.166',
   'name': 'engine-lb-584f54c899-rdfkl',
   'status': 'Running',
   'reason': None,
   'details': []}],
 'sidekicks': []}
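Rather than waiting a fixed number of seconds after deployment (as with the time.sleep(45) call in the example above), the deployment status endpoint can be polled until the pipeline reports Running. The following is a minimal sketch of that pattern; the helper name, polling interval, and timeout are arbitrary choices for illustration, not Wallaroo requirements.

# Hypothetical helper: poll /v1/api/status/get_deployment until the
# deployment reports 'Running', or raise after a timeout.
import time
import requests

def wait_for_deployment(deploy_name, timeout=300, interval=10):
    api_request = f"{APIURL}/v1/api/status/get_deployment"
    deadline = time.time() + timeout
    while time.time() < deadline:
        # Retrieve a fresh token for each request
        headers = wl.auth.auth_header()
        status = requests.post(api_request,
                               json={"name": deploy_name},
                               headers=headers,
                               verify=True).json()
        if status.get('status') == 'Running':
            return status
        time.sleep(interval)
    raise TimeoutError(f"{deploy_name} did not reach Running within {timeout} seconds")

# Example usage with the deployment created earlier
wait_for_deployment(f"{model_deploy_id}-{model_deployment_id}")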

Get External Inference URL

The API command /admin/get_pipeline_external_url retrieves the external inference URL for a specific pipeline in a workspace.

  • Parameters
    • workspace_id (REQUIRED integer): The workspace integer id.
    • pipeline_name (REQUIRED string): The name of the pipeline.

In this example, the workspace id and pipeline name stored in the previous steps (example_workspace_id and model_pipeline_name) are used for the /admin/get_pipeline_external_url request.

The External Inference URL will be stored as a variable for the next step.

## Retrieve the pipeline's External Inference URL

# Retrieve the token 
headers = wl.auth.auth_header()

api_request = f"{APIURL}/v1/api/admin/get_pipeline_external_url"

data = {
    "workspace_id": example_workspace_id,
    "pipeline_name": model_pipeline_name
}

response = requests.post(api_request, json=data, headers=headers, verify=True).json()
print(response)
deployurl = response['url']
{'url': 'https://doc-test.api.wallarooexample.ai/v1/api/pipelines/infer/pipelinemodels-5/pipelinemodels'}

Perform Inference Through External URL

The inference can now be performed through the External Inference URL. This URL will accept the same inference data file that is used with the Wallaroo SDK, or with an Internal Inference URL as used in the Internal Pipeline Inference URL Tutorial.

Deployed pipelines have their own Inference URL that accepts HTTP POST submissions.

For connections that are external to the Kubernetes cluster hosting the Wallaroo instance, model endpoints must be enabled.

HTTP Headers

The following headers are required for connecting to the Pipeline Deployment URL:

  • Authorization: This requires the JWT token in the format 'Bearer ' + token (see the header construction sketch after this list). For example:

    Authorization: Bearer abcdefg==
    
  • Content-Type:

      • For DataFrame formatted JSON:

        Content-Type:application/json; format=pandas-records

      • For Arrow binary files:

        Content-Type:application/vnd.apache.arrow.file

  • IMPORTANT NOTE: Verify that the deployed pipeline has status Running before attempting an inference.
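The following sketch shows how these headers can be assembled by hand when not using the Wallaroo SDK's wl.auth.auth_header() helper. The token value is a placeholder; obtain a real JWT access token through your own authentication flow.

# Building the request headers manually. The token value shown is a
# placeholder only; substitute a real JWT access token.
token = "abcdefg=="

headers = {
    "Authorization": f"Bearer {token}",
    "Content-Type": "application/json; format=pandas-records"
}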

# Retrieve the token
headers = wl.auth.auth_header()

# set Content-Type type
headers['Content-Type']='application/json; format=pandas-records'

## Inference through external URL using dataframe

# define the JSON data to submit
data = [
    {
        "tensor":[
            1.0678324729,
            0.2177810266,
            -1.7115145262,
            0.682285721,
            1.0138553067,
            -0.4335000013,
            0.7395859437,
            -0.2882839595,
            -0.447262688,
            0.5146124988,
            0.3791316964,
            0.5190619748,
            -0.4904593222,
            1.1656456469,
            -0.9776307444,
            -0.6322198963,
            -0.6891477694,
            0.1783317857,
            0.1397992467,
            -0.3554220649,
            0.4394217877,
            1.4588397512,
            -0.3886829615,
            0.4353492889,
            1.7420053483,
            -0.4434654615,
            -0.1515747891,
            -0.2668451725,
            -1.4549617756
        ]
    }
]

# submit the request via POST, import as pandas DataFrame
response = pd.DataFrame.from_records(
    requests.post(
        deployurl, 
        json=data, 
        headers=headers)
        .json()
    )

display(response)
time in out check_failures metadata
0 1684356836285 {'tensor': [1.0678324729, 0.2177810266, -1.7115145262, 0.682285721, 1.0138553067, -0.4335000013, 0.7395859437, -0.2882839595, -0.447262688, 0.5146124988, 0.3791316964, 0.5190619748, -0.4904593222, 1.1656456469, -0.9776307444, -0.6322198963, -0.6891477694, 0.1783317857, 0.1397992467, -0.3554220649, 0.4394217877, 1.4588397512, -0.3886829615, 0.4353492889, 1.7420053483, -0.4434654615, -0.1515747891, -0.2668451725, -1.4549617756]} {'dense_1': [0.0014974177]} [] {'last_model': '{"model_name":"apimodel","model_sha":"bc85ce596945f876256f41515c7501c399fd97ebcb9ab3dd41bf03f8937b4507"}', 'pipeline_version': '', 'elapsed': [163502, 309804]}
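For Apache Arrow submissions the pattern is the same, except the Content-Type header changes and the request body is sent as raw Arrow bytes. The sketch below assumes a local Arrow file (the path ./data/test_data.arrow is hypothetical) and assumes the pipeline returns its results in the Arrow format; adjust both to match your own data.

## Inference through external URL using an Apache Arrow file

import pyarrow as pa
import requests

# Retrieve the token and set the Arrow Content-Type
headers = wl.auth.auth_header()
headers['Content-Type'] = 'application/vnd.apache.arrow.file'

# Read the Arrow file as raw bytes and submit it as the request body
with open('./data/test_data.arrow', 'rb') as f:
    arrow_bytes = f.read()

response = requests.post(deployurl, data=arrow_bytes, headers=headers, verify=True)

# Convert the Arrow response back into a pyarrow Table for inspection
result_table = pa.ipc.open_file(pa.py_buffer(response.content)).read_all()
display(result_table.to_pandas())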

Undeploy a Pipeline

Undeploys a deployed pipeline.

  • Parameters
    • pipeline_id - (REQUIRED int): The numerical id of the pipeline.
    • deployment_id - (REQUIRED int): The numerical id of the deployment.
  • Returns
    • Nothing if the call is successful.

Example: The pipeline with models deployed in the step Deploy a Pipeline will be undeployed.

# Undeploy pipeline with models
# Retrieve the token 
headers = wl.auth.auth_header()
api_request = f"{APIURL}/v1/api/pipelines/undeploy"

data = {
    "pipeline_id": model_pipeline_id,
    "deployment_id":model_deployment_id
}

response = requests.post(api_request, json=data, headers=headers, verify=True).json()
response

Copy a Pipeline

Copies an existing pipeline into a new one in the same workspace. A new engine configuration can be set for the copied pipeline.

  • Parameters
    • name - (REQUIRED string): The name of the new pipeline.
    • workspace_id - (REQUIRED int): The numerical id of the workspace to copy the source pipeline from.
    • source_pipeline - (REQUIRED int): The numerical id of the pipeline to copy from.
    • deploy - (OPTIONAL string): Name of the deployment.
    • engine_config - (OPTIONAL string): Engine configuration options.
    • pipeline_version - (OPTIONAL string): Optional version of the copied pipeline to create.

Example: The pipeline with models created in the step Create Pipeline in a Workspace will be copied into a new one.

## Copy a pipeline

# Retrieve the token 
headers = wl.auth.auth_header()

api_request = f"{APIURL}/v1/api/pipelines/copy"

data = {
  "name": example_copied_pipeline_name,
  "workspace_id": example_workspace_id,
  "source_pipeline": model_pipeline_id
}

response = requests.post(api_request, json=data, headers=headers, verify=True).json()
response
{'pipeline_pk_id': 9,
 'pipeline_variant_pk_id': 9,
 'pipeline_version': None,
 'deployment': None}