Wallaroo MLOps API Essentials Guide: Pipeline Log Management

How to use the Wallaroo API for Pipeline Log Management

Table of Contents

Pipeline logs are retrieved through the Wallaroo MLOps API with the following request.

  • REQUEST URL
    • v1/api/pipelines/get_logs
  • Headers
    • Accept:
      • application/json; format=pandas-records: For the logs returned as pandas DataFrame
      • application/vnd.apache.arrow.file: for the logs returned as Apache Arrow
  • PARAMETERS
    • pipeline_id (String Required): The name of the pipeline.
    • workspace_id (Integer Required): The numerical identifier of the workspace.
    • cursor (String Optional): Cursor returned with a previous page of results from a pipeline log request, used to retrieve the next page of information.
    • order (String Optional Default: Desc): The order for log inserts returned. Valid values are:
      • Asc: In chronological order of inserts.
      • Desc: In reverse chronological order of inserts.
    • page_size (Integer Optional Default: 1000.): Max records per page.
    • start_time (String Optional): The start time of the period to retrieve logs for in RFC 3339 format for DateTime. Must be combined with end_time.
    • end_time (String Optional): The end time of the period to retrieve logs for in RFC 3339 format for DateTime. Must be combined with start_time.
  • RETURNS
    • The logs are returned by default as 'application/json; format=pandas-records' format. To request the logs as Apache Arrow tables, set the submission header Accept to application/vnd.apache.arrow.file.
    • Headers:
      • x-iteration-cursor: Used to retrieve the next page of results. This is not included if x-iteration-status is All.
      • x-iteration-status: Informs whether there are more records available outside of this log request parameters.
        • All: This page includes all logs available from this request. If x-iteration-status is All, then x-iteration-cursor is not provided.
        • SchemaChange: A change in the log schema caused by actions such as pipeline version, etc.
        • RecordLimited: The number of records exceeded from the page size, more records can be requested as the next page. There may be more records available to retrieve OR the record limit was reached for this request even if no more records are available in next cursor request.
        • ByteLimited: The number of records exceeded the pipeline log limit which is around 100K.

Standard Pipeline Logs Example

# retrieve the authorization token
headers = wl.auth.auth_header()

url = f"{APIURL}/v1/api/pipelines/get_logs"

# Standard log retrieval

data = {
    'pipeline_id': main_pipeline_name,
    'workspace_id': workspace_id
}

response = requests.post(url, headers=headers, json=data)
standard_logs = pd.DataFrame.from_records(response.json())

display(standard_logs.head(5).loc[:, ["time", "in", "out"]])
timeinout
01684423875900{'tensor': [4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0]}{'variable': [718013.75]}
11684423875900{'tensor': [2.0, 2.5, 2170.0, 6361.0, 1.0, 0.0, 2.0, 3.0, 8.0, 2170.0, 0.0, 47.7109, -122.017, 2310.0, 7419.0, 6.0, 0.0, 0.0]}{'variable': [615094.56]}
21684423875900{'tensor': [3.0, 2.5, 1300.0, 812.0, 2.0, 0.0, 0.0, 3.0, 8.0, 880.0, 420.0, 47.5893, -122.317, 1300.0, 824.0, 6.0, 0.0, 0.0]}{'variable': [448627.72]}
31684423875900{'tensor': [4.0, 2.5, 2500.0, 8540.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2500.0, 0.0, 47.5759, -121.994, 2560.0, 8475.0, 24.0, 0.0, 0.0]}{'variable': [758714.2]}
41684423875900{'tensor': [3.0, 1.75, 2200.0, 11520.0, 1.0, 0.0, 0.0, 4.0, 7.0, 2200.0, 0.0, 47.7659, -122.341, 1690.0, 8038.0, 62.0, 0.0, 0.0]}{'variable': [513264.7]}

Shadow Deploy Pipeline Logs Example

# Retrieve logs from specific date/time to only get the two DataFrame input inferences in ascending format

# retrieve the authorization token
headers = wl.auth.auth_header()

url = f"{APIURL}/v1/api/pipelines/get_logs"

# Standard log retrieval

data = {
    'pipeline_id': main_pipeline_name,
    'workspace_id': workspace_id,
    'order': 'Asc',
    'start_time': f'{shadow_date_start.isoformat()}',
    'end_time': f'{shadow_date_end.isoformat()}'
}

response = requests.post(url, headers=headers, json=data)
standard_logs = pd.DataFrame.from_records(response.json())

display(standard_logs.head(5).loc[:, ["time", "out", "out_logcontrolchallenger01", "out_logcontrolchallenger02"]])
timeoutout_logcontrolchallenger01out_logcontrolchallenger02
01684427140394{'variable': [718013.75]}{'variable': [659806.0]}{'variable': [704901.9]}
11684427140394{'variable': [615094.56]}{'variable': [732883.5]}{'variable': [695994.44]}
21684427140394{'variable': [448627.72]}{'variable': [419508.84]}{'variable': [416164.8]}
31684427140394{'variable': [758714.2]}{'variable': [634028.8]}{'variable': [655277.2]}
41684427140394{'variable': [513264.7]}{'variable': [427209.44]}{'variable': [426854.66]}

A/B Testing Pipeline Logs Example

# Retrieve logs from specific date/time to only get the two DataFrame input inferences in ascending format

# retrieve the authorization token
headers = wl.auth.auth_header()

url = f"{APIURL}/v1/api/pipelines/get_logs"

# Standard log retrieval

data = {
    'pipeline_id': main_pipeline_name,
    'workspace_id': workspace_id,
    'order': 'Asc',
    'start_time': f'{ab_date_start.isoformat()}',
    'end_time': f'{ab_date_end.isoformat()}'
}

response = requests.post(url, headers=headers, json=data)
standard_logs = pd.DataFrame.from_records(response.json())

display(standard_logs.head(5).loc[:, ["time", "out"]])
timeout
01684427501820{'_model_split': ['{"name":"logcontrolchallenger02","version":"89dba25e-a11e-453d-9bcc-cddf8d6ddea0","sha":"ed6065a79d841f7e96307bb20d5ef22840f15da0b587efb51425c7ad60589d6a"}'], 'variable': [715947.75]}
11684427502196{'_model_split': ['{"name":"logcontrolchallenger01","version":"07fe7686-9bd0-4fd3-9a7c-0e933a74003c","sha":"31e92d6ccb27b041a324a7ac22cf95d9d6cc3aa7e8263a229f7c4aec4938657c"}'], 'variable': [341386.34]}
21684427503778{'_model_split': ['{"name":"logapicontrol","version":"70b76ecb-55c2-4d68-be9b-b440b11e6499","sha":"e22a0831aafd9917f3cc87a15ed267797f80e2afa12ad7d8810ca58f173b8cc6"}'], 'variable': [1039781.2]}
31684427504566{'_model_split': ['{"name":"logcontrolchallenger02","version":"89dba25e-a11e-453d-9bcc-cddf8d6ddea0","sha":"ed6065a79d841f7e96307bb20d5ef22840f15da0b587efb51425c7ad60589d6a"}'], 'variable': [411090.75]}
41684427505094{'_model_split': ['{"name":"logcontrolchallenger02","version":"89dba25e-a11e-453d-9bcc-cddf8d6ddea0","sha":"ed6065a79d841f7e96307bb20d5ef22840f15da0b587efb51425c7ad60589d6a"}'], 'variable': [296175.66]}

Pipeline Log Storage

Pipeline logs have a set allocation of storage space and data requirements.

Pipeline Log Storage Warnings

To prevent storage and performance issues, inference result data may be dropped from pipeline logs by the following standards:

  • Columns are progressively removed from the row starting with the largest input data size and working to the smallest, then the same for outputs.

For example, Computer Vision ML Models typically have large inputs and output values - a single pandas DataFrame inference request may be over 13 MB in size, and the inference results nearly as large. To prevent pipeline log storage issues, the input may be dropped from the pipeline logs, and if additional space is needed, the inference outputs would follow. The time column is preserved.

If a pipeline has dropped columns for space purposes, this will be displayed when a log request is made with the following warning, with {columns} replaced with the dropped columns.

The inference log is above the allowable limit and the following columns may have been suppressed for various rows in the logs: {columns}. To review the dropped columns for an individual inferences suppressed data, include dataset=["metadata"] in the log request.

Review Dropped Columns

To review what columns are dropped from pipeline logs for storage reasons, include the dataset metadata in the request to view the column metadata.dropped. This metadata field displays a List of any columns dropped from the pipeline logs.

For example:

# retrieve the authorization token
headers = wl.auth.auth_header()

url = f"{APIURL}/v1/api/pipelines/get_logs"

# Standard log retrieval

data = {
    'pipeline_id': main_pipeline_name,
    'workspace_id': workspace_id
}

response = requests.post(url, headers=headers, json=data)
standard_logs = pd.DataFrame.from_records(response.json())

display(len(standard_logs))
display(standard_logs.head(5).loc[:, ["time", "metadata"]])
cursor = response.headers['x-iteration-cursor']
 timemetadata
01688760035752{’last_model’: ‘{“model_name”:“logapicontrol”,“model_sha”:“e22a0831aafd9917f3cc87a15ed267797f80e2afa12ad7d8810ca58f173b8cc6”}’, ‘pipeline_version’: ‘’, ’elapsed’: [112967, 267146], ‘dropped’: []}
11688760036054{’last_model’: ‘{“model_name”:“logapicontrol”,“model_sha”:“e22a0831aafd9917f3cc87a15ed267797f80e2afa12ad7d8810ca58f173b8cc6”}’, ‘pipeline_version’: ‘’, ’elapsed’: [37127, 594183], ‘dropped’: []}
21688759857040{’last_model’: ‘{“model_name”:“logapicontrol”,“model_sha”:“e22a0831aafd9917f3cc87a15ed267797f80e2afa12ad7d8810ca58f173b8cc6”}’, ‘pipeline_version’: ‘’, ’elapsed’: [111082, 253184], ‘dropped’: []}
31688759857526{’last_model’: ‘{“model_name”:“logapicontrol”,“model_sha”:“e22a0831aafd9917f3cc87a15ed267797f80e2afa12ad7d8810ca58f173b8cc6”}’, ‘pipeline_version’: ‘’, ’elapsed’: [43962, 265740], ‘dropped’: []}

Suppressed Data Elements

Data elements that do not fit the supported data types below, such as None or Null values, are not supported in pipeline logs. When present, undefined data will be written in the place of the null value, typically zeroes. Any null list values will present an empty list.