Wallaroo SDK Essentials Guide: Pipelines

The classes and methods for managing Wallaroo pipelines and configurations.

1 - Wallaroo SDK Essentials Guide: Pipeline Management

How to create and manage Wallaroo Pipelines through the Wallaroo SDK

Pipelines are the method of submitting data and processing that data through models. Each pipeline can have one or more steps that submit the data from the previous step to the next one. Information can be submitted to a pipeline as a file, or through the pipeline's URL.

A pipeline’s metrics can be viewed through the Wallaroo Dashboard Pipeline Details and Metrics page.

Pipeline Naming Requirements

Pipeline names map onto Kubernetes objects and must be DNS compliant: ASCII alphanumeric characters and dashes (-) only. Periods (.) and underscores (_) are not allowed.
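
A candidate name can be checked against these rules before creating the pipeline. The following is a minimal sketch using a regular expression; the valid_pipeline_name helper is illustrative and not part of the Wallaroo SDK, and it assumes lowercase names per Kubernetes DNS label conventions:

import re

def valid_pipeline_name(name: str) -> bool:
    # DNS compliant: lowercase ASCII alphanumerics and dashes only,
    # beginning and ending with an alphanumeric character
    return re.fullmatch(r"[a-z0-9](?:[a-z0-9-]*[a-z0-9])?", name) is not None

valid_pipeline_name("imdb-pipeline")   # True
valid_pipeline_name("imdb_pipeline")   # False - underscores are not allowed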

Create a Pipeline

New pipelines are created in the current workspace.

To create a new pipeline, use the Wallaroo Client build_pipeline("{Pipeline Name}") command.

The following example creates a new pipeline imdb-pipeline through a Wallaroo Client connection wl:

imdb_pipeline = wl.build_pipeline("imdb-pipeline")

imdb_pipeline.status()
{'status': 'Pipeline imdb-pipeline is not deployed'}

List All Pipelines

The Wallaroo Client method list_pipelines() lists all pipelines in a Wallaroo Instance.

The following example lists all pipelines in the wl Wallaroo Client connection:

wl.list_pipelines()

[{'name': 'ccfraud-pipeline', 'create_time': datetime.datetime(2022, 4, 12, 17, 55, 41, 944976, tzinfo=tzutc()), 'definition': '[]'}]

Select an Existing Pipeline

Rather than creating a new pipeline each time, an existing pipeline can be selected by using the list_pipelines() command and assigning one of the array members to a variable.

The following example sets the pipeline ccfraud-pipeline to the variable current_pipeline:

wl.list_pipelines()

[{'name': 'ccfraud-pipeline', 'create_time': datetime.datetime(2022, 4, 12, 17, 55, 41, 944976, tzinfo=tzutc()), 'definition': '[]'}]

current_pipeline = wl.list_pipelines()[0]

current_pipeline.status()

{'status': 'Running',
 'details': None,
 'engines': [{'ip': '10.244.5.4',
   'name': 'engine-7fcc7df596-hvlxb',
   'status': 'Running',
   'reason': None,
   'pipeline_statuses': {'pipelines': [{'id': 'ccfraud-pipeline',
      'status': 'Running'}]},
   'model_statuses': {'models': [{'name': 'ccfraud-model',
      'version': '4624e8a8-1414-4408-8b40-e03da4b5cb68',
      'sha': 'bc85ce596945f876256f41515c7501c399fd97ebcb9ab3dd41bf03f8937b4507',
      'status': 'Running'}]}}],
 'engine_lbs': [{'ip': '10.244.1.24',
   'name': 'engine-lb-85846c64f8-mtq9p',
   'status': 'Running',
   'reason': None}]}

Pipeline Steps

Once a pipeline has been created, or during its creation process, a pipeline step can be added. A pipeline step refers to the model that performs an inference on the data submitted to it. Each time a step is added, it is added to the pipeline's models array.

Pipeline steps are not saved until the pipeline is deployed. Until then, they are stored in local memory as a potential pipeline configuration.

Add a Step to a Pipeline

A pipeline step is added through the pipeline add_model_step({Model}) command.

In the following example, two models uploaded to the workspace are added as pipeline steps:

imdb_pipeline.add_model_step(embedder)
imdb_pipeline.add_model_step(smodel)

imdb_pipeline.status()

{'name': 'imdb-pipeline', 'create_time': datetime.datetime(2022, 3, 30, 21, 21, 31, 127756, tzinfo=tzutc()), 'definition': "[{'ModelInference': {'models': [{'name': 'embedder-o', 'version': '1c16d21d-fe4c-4081-98bc-65fefa465f7d', 'sha': 'd083fd87fa84451904f71ab8b9adfa88580beb92ca77c046800f79780a20b7e4'}]}}, {'ModelInference': {'models': [{'name': 'smodel-o', 'version': '8d311ba3-c336-48d3-99cd-85d95baa6f19', 'sha': '3473ea8700fbf1a1a8bfb112554a0dde8aab36758030dcde94a9357a83fd5650'}]}}]"}

Replace a Pipeline Step

The model specified in a pipeline step can be replaced with the pipeline method replace_with_model_step(index, model).

The following parameters are used for replacing a pipeline step:

Parameter | Default Value | Purpose
index | null | The pipeline step to be replaced. Pipeline steps follow array numbering, where the first step is 0, etc.
model | null | The new model to be used in the pipeline step.

In the following example, a deployed pipeline will have its initial model step replaced with a new one. The pipeline status is displayed after deployment and again after the swap to show that the model has been replaced from ccfraudoriginal to ccfraudreplacement, each with its own version.

pipeline.deploy()

pipeline.status()

{'status': 'Running',
 'details': [],
 'engines': [{'ip': '10.244.2.145',
   'name': 'engine-75bfd7dc9d-7p9qk',
   'status': 'Running',
   'reason': None,
   'details': [],
   'pipeline_statuses': {'pipelines': [{'id': 'hotswappipeline',
      'status': 'Running'}]},
   'model_statuses': {'models': [{'name': 'ccfraudoriginal',
      'version': '3a03dc94-716e-46bb-84c8-91bc99ceb2c3',
      'sha': 'bc85ce596945f876256f41515c7501c399fd97ebcb9ab3dd41bf03f8937b4507',
      'status': 'Running'}]}}],
 'engine_lbs': [{'ip': '10.244.2.144',
   'name': 'engine-lb-55dcdff64c-vf74s',
   'status': 'Running',
   'reason': None,
   'details': []}],
 'sidekicks': []}

pipeline.replace_with_model_step(0, replacement_model).deploy()

pipeline.status()

{'status': 'Running',
 'details': [],
 'engines': [{'ip': '10.244.2.153',
   'name': 'engine-96486c95d-zfchr',
   'status': 'Running',
   'reason': None,
   'details': [],
   'pipeline_statuses': {'pipelines': [{'id': 'hotswappipeline',
      'status': 'Running'}]},
   'model_statuses': {'models': [{'name': 'ccfraudreplacement',
      'version': '714efd19-5c83-42a8-aece-24b4ba530925',
      'sha': 'bc85ce596945f876256f41515c7501c399fd97ebcb9ab3dd41bf03f8937b4507',
      'status': 'Running'}]}}],
 'engine_lbs': [{'ip': '10.244.2.154',
   'name': 'engine-lb-55dcdff64c-9np9k',
   'status': 'Running',
   'reason': None,
   'details': []}],
 'sidekicks': []}

Pre and Post Processing Steps

A pipeline step can be more than a model - steps can also be pre processing and post processing steps. For example, the Demand Curve Tutorial has both pre and post processing steps added to the pipeline. The preprocessing step uses the following code:

import numpy
import pandas

import json

# add interaction terms for the model
def actual_preprocess(pdata):
    pd = pdata.copy()
    # convert boolean cust_known to 0/1
    pd.cust_known = numpy.where(pd.cust_known, 1, 0)
    # interact UnitPrice and cust_known
    pd['UnitPriceXcust_known'] = pd.UnitPrice * pd.cust_known
    return pd.loc[:, ['UnitPrice', 'cust_known', 'UnitPriceXcust_known']]

# If the data is a json string, call this wrapper instead
# Expected input:
# a dictionary with fields 'colnames', 'data'

# test that the code works here
def wallaroo_json(data):
    obj = json.loads(data)
    pdata = pandas.DataFrame(obj['query'],
                             columns=obj['colnames'])
    pprocessed = actual_preprocess(pdata)
    
    # return a dictionary, with the fields the model expects
    return {
        'tensor_fields': ['model_input'],
        'model_input': pprocessed.to_numpy().tolist()
    }

It is added as a Python module by uploading it as a model:

# load the preprocess module
module_pre = wl.upload_model("preprocess", "./preprocess.py").configure('python')

And then added to the pipeline as a step:

# now make a pipeline
demandcurve_pipeline = (wl.build_pipeline("demand-curve-pipeline")
                        .add_model_step(module_pre)
                        .add_model_step(demand_curve_model)
                        .add_model_step(module_post))

Remove a Pipeline Step

To remove a step from the pipeline, use the Pipeline remove_step(index) command, where the index is the array index for the pipeline’s steps.

In the following example, the pipeline imdb_pipeline will have the step with the model smodel-o removed.

imdb_pipeline.status

<bound method Pipeline.status of {'name': 'imdb-pipeline', 'create_time': datetime.datetime(2022, 3, 30, 21, 21, 31, 127756, tzinfo=tzutc()), 'definition': "[{'ModelInference': {'models': [{'name': 'embedder-o', 'version': '1c16d21d-fe4c-4081-98bc-65fefa465f7d', 'sha': 'd083fd87fa84451904f71ab8b9adfa88580beb92ca77c046800f79780a20b7e4'}]}}, {'ModelInference': {'models': [{'name': 'smodel-o', 'version': '8d311ba3-c336-48d3-99cd-85d95baa6f19', 'sha': '3473ea8700fbf1a1a8bfb112554a0dde8aab36758030dcde94a9357a83fd5650'}]}}]"}>

imdb_pipeline.remove_step(1)
{'name': 'imdb-pipeline', 'create_time': datetime.datetime(2022, 3, 30, 21, 21, 31, 127756, tzinfo=tzutc()), 'definition': "[{'ModelInference': {'models': [{'name': 'embedder-o', 'version': '1c16d21d-fe4c-4081-98bc-65fefa465f7d', 'sha': 'd083fd87fa84451904f71ab8b9adfa88580beb92ca77c046800f79780a20b7e4'}]}}]"}

Clear All Pipeline Steps

The Pipeline clear() method removes all pipeline steps from a pipeline. Note that pipeline steps are not saved until the pipeline is deployed.
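
For example, the steps of the imdb_pipeline from the earlier examples could be cleared and rebuilt before redeploying; a minimal sketch:

# remove all steps, then rebuild the pipeline configuration
imdb_pipeline.clear()
imdb_pipeline.add_model_step(embedder)
imdb_pipeline.add_model_step(smodel)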

Manage Pipeline Deployment Configuration

For full details on pipeline deployment configurations, see Wallaroo SDK Essentials Guide: Pipeline Deployment Configuration.

Deploy a Pipeline

When a pipeline step is added or removed, the pipeline must be deployed through the pipeline's deploy() method. This allocates resources to the pipeline from the Kubernetes environment and makes it available to submit information to for inferences. This process typically takes 45 seconds.

Once complete, the pipeline status() command will show 'status':'Running'.

Pipeline deployments can be modified to enable auto-scaling, allowing pipelines to allocate more or fewer resources based on need, by specifying the deploy() method's optional deployment_config parameter. If this optional parameter is not passed, the deployment defers to default values. For more information, see Manage Pipeline Deployment Configuration.

In the following example, the pipeline imdb-pipeline that contains two steps will be deployed with default deployment configuration:

imdb_pipeline.status

<bound method Pipeline.status of {'name': 'imdb-pipeline', 'create_time': datetime.datetime(2022, 3, 30, 21, 21, 31, 127756, tzinfo=tzutc()), 'definition': "[{'ModelInference': {'models': [{'name': 'embedder-o', 'version': '1c16d21d-fe4c-4081-98bc-65fefa465f7d', 'sha': 'd083fd87fa84451904f71ab8b9adfa88580beb92ca77c046800f79780a20b7e4'}]}}, {'ModelInference': {'models': [{'name': 'smodel-o', 'version': '8d311ba3-c336-48d3-99cd-85d95baa6f19', 'sha': '3473ea8700fbf1a1a8bfb112554a0dde8aab36758030dcde94a9357a83fd5650'}]}}]"}>

imdb_pipeline.deploy()
Waiting for deployment - this will take up to 45s ...... ok

imdb_pipeline.status()

{'status': 'Running',
 'details': None,
 'engines': [{'ip': '10.12.1.65',
   'name': 'engine-778b65459-f9mt5',
   'status': 'Running',
   'reason': None,
   'pipeline_statuses': {'pipelines': [{'id': 'imdb-pipeline',
      'status': 'Running'}]},
   'model_statuses': {'models': [{'name': 'embedder-o',
      'version': '1c16d21d-fe4c-4081-98bc-65fefa465f7d',
      'sha': 'd083fd87fa84451904f71ab8b9adfa88580beb92ca77c046800f79780a20b7e4',
      'status': 'Running'},
     {'name': 'smodel-o',
      'version': '8d311ba3-c336-48d3-99cd-85d95baa6f19',
      'sha': '3473ea8700fbf1a1a8bfb112554a0dde8aab36758030dcde94a9357a83fd5650',
      'status': 'Running'}]}}],
 'engine_lbs': [{'ip': '10.12.1.66',
   'name': 'engine-lb-85846c64f8-ggg2t',
   'status': 'Running',
   'reason': None}]}

Troubleshooting Pipeline Deployment

If you deploy more pipelines than your environment can handle, or if you deploy more pipelines than your license allows, you may see an error like the following:


LimitError: You have reached a license limit in your Wallaroo instance. In order to add additional resources, you can remove some of your existing resources. If you have any questions contact us at community@wallaroo.ai: MAX_PIPELINES_LIMIT_EXCEEDED

Undeploy any unnecessary pipelines either through the SDK or through the Wallaroo Pipeline Dashboard, then attempt to redeploy the pipeline in question again.
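
As a sketch, unused pipelines can also be undeployed in bulk through the SDK. This assumes the wl client connection from the earlier examples and undeploys every pipeline in the instance:

# free engine resources by undeploying every pipeline in the instance
for pipeline in wl.list_pipelines():
    pipeline.undeploy()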

Undeploy a Pipeline

When a pipeline is not currently needed, it can be undeployed and its resources turned back to the Kubernetes environment. To undeploy a pipeline, use the pipeline undeploy() command.

In this example, the aloha_pipeline will be undeployed:

aloha_pipeline.undeploy()

{'name': 'aloha-test-demo', 'create_time': datetime.datetime(2022, 3, 29, 20, 34, 3, 960957, tzinfo=tzutc()), 'definition': "[{'ModelInference': {'models': [{'name': 'aloha-2', 'version': 'a8e8abdc-c22f-416c-a13c-5fe162357430', 'sha': 'fd998cd5e4964bbbb4f8d29d245a8ac67df81b62be767afbceb96a03d1a01520'}]}}]"}

Get Pipeline Status

The pipeline status() command shows the current status, models, and other information on a pipeline.

The following example shows the pipeline imdb_pipeline status before and after it is deployed:

imdb_pipeline.status

<bound method Pipeline.status of {'name': 'imdb-pipeline', 'create_time': datetime.datetime(2022, 3, 30, 21, 21, 31, 127756, tzinfo=tzutc()), 'definition': "[{'ModelInference': {'models': [{'name': 'embedder-o', 'version': '1c16d21d-fe4c-4081-98bc-65fefa465f7d', 'sha': 'd083fd87fa84451904f71ab8b9adfa88580beb92ca77c046800f79780a20b7e4'}]}}, {'ModelInference': {'models': [{'name': 'smodel-o', 'version': '8d311ba3-c336-48d3-99cd-85d95baa6f19', 'sha': '3473ea8700fbf1a1a8bfb112554a0dde8aab36758030dcde94a9357a83fd5650'}]}}]"}>

imdb_pipeline.deploy()
Waiting for deployment - this will take up to 45s ...... ok

imdb_pipeline.status()

{'status': 'Running',
 'details': None,
 'engines': [{'ip': '10.12.1.65',
   'name': 'engine-778b65459-f9mt5',
   'status': 'Running',
   'reason': None,
   'pipeline_statuses': {'pipelines': [{'id': 'imdb-pipeline',
      'status': 'Running'}]},
   'model_statuses': {'models': [{'name': 'embedder-o',
      'version': '1c16d21d-fe4c-4081-98bc-65fefa465f7d',
      'sha': 'd083fd87fa84451904f71ab8b9adfa88580beb92ca77c046800f79780a20b7e4',
      'status': 'Running'},
     {'name': 'smodel-o',
      'version': '8d311ba3-c336-48d3-99cd-85d95baa6f19',
      'sha': '3473ea8700fbf1a1a8bfb112554a0dde8aab36758030dcde94a9357a83fd5650',
      'status': 'Running'}]}}],
 'engine_lbs': [{'ip': '10.12.1.66',
   'name': 'engine-lb-85846c64f8-ggg2t',
   'status': 'Running',
   'reason': None}]}

Anomaly Testing

Anomaly detection allows organizations to set validation parameters. A validation is added to a pipeline to test data against a specific expression. If the expression evaluates to False, this is detected as an anomaly and added to the InferenceResult object's check_failures array and the pipeline logs.

Anomaly detection consists of the following steps:

  • Set a validation: Add a validation to a pipeline that, when returned False, adds an entry to the InferenceResult object’s check_failures attribute with the expression that caused the failure.
  • Display anomalies: Anomalies detected through a Pipeline’s validation attribute are displayed either through the InferenceResult object’s check_failures attribute, or through the pipeline’s logs.

Set A Validation

Validations are added to a pipeline through the wallaroo.pipeline add_validation method. The following parameters are required:

Parameter | Type | Description
name | String (Required) | The name of the validation.
validation | wallaroo.checks.Expression (Required) | The validation expression that adds its result to the InferenceResult object's check_failures attribute when the expression result is False. The validation checks the expression against both the data value and the data type.

Validation expressions take the format value Expression. For example, if the model housing_model is part of the pipeline steps, then a validation expression may be housing_model.outputs[0][0] < 100.0: if the output of the housing_model inference is less than 100, then the validation is True and no action is taken. For any value of 100 or greater, the validation is False, which triggers adding the anomaly to the InferenceResult object's check_failures attribute.

Note that multiple validations can be created to allow for multiple anomaly detections.
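
For example, a second check could be chained onto a housing-price example like the one below; this sketch assumes a housing_model pipeline step and adds a hypothetical validation that also flags non-positive predictions:

p = p.add_validation('price too high', housing_model.outputs[0][0] < 100.0)
p = p.add_validation('price not positive', housing_model.outputs[0][0] > 0.0)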

In the following example, a validation is added to the pipeline that passes for housing prices below 100 (representing $100 million) and triggers an anomaly for values at or above that level. When an inference is performed that triggers a validation failure, the results are displayed in the InferenceResult object's check_failures attribute.

p = wl.build_pipeline('anomaly-housing-pipeline')
p = p.add_model_step(housing_model)
p = p.add_validation('price too high', housing_model.outputs[0][0] < 100.0)
pipeline = p.deploy()

test_input = {"dense_16_input":[[0.02675675, 0.0, 0.02677953, 0.0, 0.0010046, 0.00951931, 0.14795322, 0.0027145,  2, 0.98536841, 0.02988655, 0.04031725, 0.04298041]]}
response_trigger = pipeline.infer(test_input)
print("\n")
print(response_trigger)

[InferenceResult({'check_failures': [{'False': {'expr': 'anomaly-housing.outputs[0][0] < 100'}}],
 'elapsed': 15110549,
 'model_name': 'anomaly-housing',
 'model_version': 'c3cf1577-6666-48d3-b85c-5d4a6e6567ea',
 'original_data': {'dense_16_input': [[0.02675675,
                                       0.0,
                                       0.02677953,
                                       0.0,
                                       0.0010046,
                                       0.00951931,
                                       0.14795322,
                                       0.0027145,
                                       2,
                                       0.98536841,
                                       0.02988655,
                                       0.04031725,
                                       0.04298041]]},
 'outputs': [{'Float': {'data': [350.46990966796875], 'dim': [1, 1], 'v': 1}}],
 'pipeline_name': 'anomaly-housing-model',
 'time': 1651257043312})]

Display Anomalies

Anomalies detected through a Pipeline’s validation attribute are displayed either through the InferenceResult object’s check_failures attribute, or through the pipeline’s logs.

To display an anomaly through the InferenceResult object, display the check_failures attribute.

In the following example, an InferenceResult where the validation failed will display the failure in the check_failures attribute:

test_input = {"dense_16_input":[[0.02675675, 0.0, 0.02677953, 0.0, 0.0010046, 0.00951931, 0.14795322, 0.0027145,  2, 0.98536841, 0.02988655, 0.04031725, 0.04298041]]}
response_trigger = pipeline.infer(test_input)
print("\n")
print(response_trigger)

[InferenceResult({'check_failures': [{'False': {'expr': 'anomaly-housing-model.outputs[0][0] < '
                                       '100'}}],
 'elapsed': 12196540,
 'model_name': 'anomaly-housing-model',
 'model_version': 'a3b1c29f-c827-4aad-817d-485de464d59b',
 'original_data': {'dense_16_input': [[0.02675675,
                                       0.0,
                                       0.02677953,
                                       0.0,
                                       0.0010046,
                                       0.00951931,
                                       0.14795322,
                                       0.0027145,
                                       2,
                                       0.98536841,
                                       0.02988655,
                                       0.04031725,
                                       0.04298041]]},
 'outputs': [{'Float': {'data': [350.46990966796875], 'dim': [1, 1], 'v': 1}}],
 'pipeline_name': 'anomaly-housing-pipeline',
 'shadow_data': {},
 'time': 1667416852255})]

The other method is to use the pipeline.logs() method with the parameter valid=False, isolating the logs where the validation returned False.

In this example, a set of logs where the validation returned as False will be displayed:

pipeline.logs(valid=False)
Timestamp | Output | Input | Anomalies
2022-02-Nov 19:20:52 | [array([[350.46990967]])] | [[0.02675675, 0.0, 0.02677953, 0.0, 0.0010046, 0.00951931, 0.14795322, 0.0027145, 2, 0.98536841, 0.02988655, 0.04031725, 0.04298041]] | 1

A/B Testing

A/B testing is a method that provides the ability to test competing ML models for performance, accuracy or other useful benchmarks. Different models are added to the same pipeline steps as follows:

  • Control or Champion model: The model currently used for inferences.
  • Challenger model(s): The model or set of models compared to the champion model.

A/B testing splits a portion of the inference requests between the champion model and one or more challengers through the add_random_split method. This method splits the inferences submitted to the model through a randomly weighted step.

Each model receives inputs that are approximately proportional to the weight it is assigned. For example, with two models having weights 1 and 1, each will receive roughly equal amounts of inference inputs. If the weights were changed to 1 and 2, the models would receive roughly 33% and 66% respectively instead.

When choosing the model to use, a random number between 0.0 and 1.0 is generated. The weighted inputs are mapped to that range, and the random input is then used to select the model to use. For example, for the two-models equal-weight case, a random key of 0.4 would route to the first model, 0.6 would route to the second.
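
The selection logic can be illustrated outside of Wallaroo with a short simulation. This is a conceptual sketch of the weighted mapping described above, not the engine's actual implementation:

import random

def select_model(weighted_models, key=None):
    # weighted_models: list of (weight, model_name) tuples
    total = sum(weight for weight, _ in weighted_models)
    if key is None:
        key = random.random()  # random key between 0.0 and 1.0
    cumulative = 0.0
    for weight, name in weighted_models:
        cumulative += weight / total  # map the weights onto the 0.0-1.0 range
        if key < cumulative:
            return name
    return weighted_models[-1][1]

# two models with equal weights: a key of 0.4 routes to the first model,
# a key of 0.6 routes to the second
select_model([(1, "aloha-control"), (1, "aloha-challenger")], key=0.4)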

Add Random Split

A random split step can be added to a pipeline through the add_random_split method.

The following parameters are used when adding a random split step to a pipeline:

Parameter | Type | Description
champion_weight | Float (Required) | The weight for the champion model.
champion_model | Wallaroo.Model (Required) | The uploaded champion model.
challenger_weight | Float (Required) | The weight of the challenger model.
challenger_model | Wallaroo.Model (Required) | The uploaded challenger model.
hash_key | String (Optional) | A key used instead of a random number for model selection. This must be between 0.0 and 1.0.

Note that multiple challenger models with different weights can be added to the random split step.

add_random_split([(champion_weight, champion_model), (challenger_weight, challenger_model),  (challenger_weight2, challenger_model2),...], hash_key)

In this example, a pipeline will be built with a 2:1 weighted ratio between the champion and a single challenger model.

pipeline = (wl.build_pipeline("randomsplitpipeline-demo")
            .add_random_split([(2, control), (1, challenger)]))

The results for a series of single inference requests are displayed to show the random weighted split between the two models in action:

results = []
results.append(experiment_pipeline.infer_from_file("data/data-1.json"))
results.append(experiment_pipeline.infer_from_file("data/data-1.json"))
results.append(experiment_pipeline.infer_from_file("data/data-1.json"))
results.append(experiment_pipeline.infer_from_file("data/data-1.json"))
results.append(experiment_pipeline.infer_from_file("data/data-1.json"))

for result in results:
    print(result[0].model())
    print(result[0].data())

('aloha-control', 'ff81f634-8fb4-4a62-b873-93b02eb86ab4')
[array([[0.00151959]]), array([[0.98291481]]), array([[0.01209957]]), array([[4.75912966e-05]]), array([[2.02893716e-05]]), array([[0.00031977]]), array([[0.01102928]]), array([[0.99756402]]), array([[0.01034162]]), array([[0.00803896]]), array([[0.01615506]]), array([[0.00623623]]), array([[0.00099858]]), array([[1.79337805e-26]]), array([[1.38899512e-27]])]

('aloha-control', 'ff81f634-8fb4-4a62-b873-93b02eb86ab4')
[array([[0.00151959]]), array([[0.98291481]]), array([[0.01209957]]), array([[4.75912966e-05]]), array([[2.02893716e-05]]), array([[0.00031977]]), array([[0.01102928]]), array([[0.99756402]]), array([[0.01034162]]), array([[0.00803896]]), array([[0.01615506]]), array([[0.00623623]]), array([[0.00099858]]), array([[1.79337805e-26]]), array([[1.38899512e-27]])]

('aloha-challenger', '87fdfe08-170e-4231-a0b9-543728d6fc57')
[array([[0.00151959]]), array([[0.98291481]]), array([[0.01209957]]), array([[4.75912966e-05]]), array([[2.02893716e-05]]), array([[0.00031977]]), array([[0.01102928]]), array([[0.99756402]]), array([[0.01034162]]), array([[0.00803896]]), array([[0.01615506]]), array([[0.00623623]]), array([[0.00099858]]), array([[1.79337805e-26]]), array([[1.38899512e-27]])]

('aloha-challenger', '87fdfe08-170e-4231-a0b9-543728d6fc57')
[array([[0.00151959]]), array([[0.98291481]]), array([[0.01209957]]), array([[4.75912966e-05]]), array([[2.02893716e-05]]), array([[0.00031977]]), array([[0.01102928]]), array([[0.99756402]]), array([[0.01034162]]), array([[0.00803896]]), array([[0.01615506]]), array([[0.00623623]]), array([[0.00099858]]), array([[1.79337805e-26]]), array([[1.38899512e-27]])]

('aloha-challenger', '87fdfe08-170e-4231-a0b9-543728d6fc57')
[array([[0.00151959]]), array([[0.98291481]]), array([[0.01209957]]), array([[4.75912966e-05]]), array([[2.02893716e-05]]), array([[0.00031977]]), array([[0.01102928]]), array([[0.99756402]]), array([[0.01034162]]), array([[0.00803896]]), array([[0.01615506]]), array([[0.00623623]]), array([[0.00099858]]), array([[1.79337805e-26]]), array([[1.38899512e-27]])]

Replace With Random Split

If a pipeline already has steps as detailed in Add a Step to a Pipeline, a step can be replaced with a random split with the replace_with_random_split method.

The following parameters are used when replacing a pipeline step with a random split step:

Parameter | Type | Description
index | Integer (Required) | The pipeline step being replaced.
champion_weight | Float (Required) | The weight for the champion model.
champion_model | Wallaroo.Model (Required) | The uploaded champion model.
challenger_weight | Float (Required) | The weight of the challenger model.
challenger_model | Wallaroo.Model (Required) | The uploaded challenger model.
hash_key | String (Optional) | A key used instead of a random number for model selection. This must be between 0.0 and 1.0.

Note that one or more challenger models can be added to the random split step:

replace_with_random_split(index, [(champion_weight, champion_model), (challenger_weight, challenger_model), (challenger_weight2, challenger_model2), ...], hash_key)
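
For example, the first step of the experiment pipeline from the previous section could be replaced with a 2:1 weighted split; a minimal sketch reusing the control and challenger models:

experiment_pipeline.replace_with_random_split(0, [(2, control), (1, challenger)])
experiment_pipeline.deploy()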

A/B Testing Logs

A/B testing log entries contain the model used for each inference in the column out._model_split.

logs = experiment_pipeline.logs(limit=5)
display(logs.loc[:,['time', 'out._model_split', 'out.main']])
 | time | out._model_split | out.main
0 | 2023-03-03 19:08:35.653 | [{"name":"aloha-control","version":"89389786-0c17-4214-938c-aa22dd28359f","sha":"fd998cd5e4964bbbb4f8d29d245a8ac67df81b62be767afbceb96a03d1a01520"}] | [0.9999754]
1 | 2023-03-03 19:08:35.702 | [{"name":"aloha-challenger","version":"3acd3835-be72-42c4-bcae-84368f416998","sha":"223d26869d24976942f53ccb40b432e8b7c39f9ffcf1f719f3929d7595bceaf3"}] | [0.9999727]
2 | 2023-03-03 19:08:35.753 | [{"name":"aloha-challenger","version":"3acd3835-be72-42c4-bcae-84368f416998","sha":"223d26869d24976942f53ccb40b432e8b7c39f9ffcf1f719f3929d7595bceaf3"}] | [0.6606688]
3 | 2023-03-03 19:08:35.799 | [{"name":"aloha-control","version":"89389786-0c17-4214-938c-aa22dd28359f","sha":"fd998cd5e4964bbbb4f8d29d245a8ac67df81b62be767afbceb96a03d1a01520"}] | [0.9998954]
4 | 2023-03-03 19:08:35.846 | [{"name":"aloha-control","version":"89389786-0c17-4214-938c-aa22dd28359f","sha":"fd998cd5e4964bbbb4f8d29d245a8ac67df81b62be767afbceb96a03d1a01520"}] | [0.99999803]
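
To see how requests were distributed across the models, the out._model_split column can be tallied. This sketch assumes the logs are returned as a pandas DataFrame, with each out._model_split entry a one-element list as shown above:

import json

def model_name(split):
    # the entry may be a JSON string or an already-parsed dict
    # depending on the SDK version
    entry = split[0]
    if isinstance(entry, str):
        entry = json.loads(entry)
    return entry['name']

logs = experiment_pipeline.logs(limit=100)
print(logs['out._model_split'].apply(model_name).value_counts())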

Pipeline Shadow Deployments

Wallaroo provides a method of testing the same data against two different models or sets of models at the same time through shadow deployments, otherwise known as parallel deployments or A/B tests. This allows data to be submitted to a pipeline with inferences running on several different sets of models. Typically this is performed with a model that is known to provide accurate results - the champion - and a model or set of models being tested to see if they provide more accurate or faster responses depending on the criteria - the challenger(s). Multiple challengers can be tested against a single champion to determine which is "better" based on the organization's criteria.

As described in the Wallaroo blog post The What, Why, and How of Model A/B Testing:

In data science, A/B tests can also be used to choose between two models in production, by measuring which model performs better in the real world. In this formulation, the control is often an existing model that is currently in production, sometimes called the champion. The treatment is a new model being considered to replace the old one. This new model is sometimes called the challenger….
Keep in mind that in machine learning, the terms experiments and trials also often refer to the process of finding a training configuration that works best for the problem at hand (this is sometimes called hyperparameter optimization).

When a shadow deployment is created, only the inference from the champion is returned in the InferenceResult Object data, while the result data for the shadow deployments is stored in the InferenceResult Object shadow_data.

Create Shadow Deployment

Create a parallel or shadow deployment for a pipeline with the pipeline.add_shadow_deploy(champion, challengers[]) method, where the champion is a Wallaroo Model object, and challengers[] is one or more Wallaroo Model objects.

Each inference request sent to the pipeline is sent to all the models. The prediction from the champion is returned by the pipeline, while the predictions from the challengers are not part of the standard output, but are kept stored in the shadow_data attribute and in the logs for later comparison.

In this example, a shadow deployment is created with the champion versus two challenger models.

champion = wl.upload_model(champion_model_name, champion_model_file).configure()
model2 = wl.upload_model(shadow_model_01_name, shadow_model_01_file).configure()
model3 = wl.upload_model(shadow_model_02_name, shadow_model_02_file).configure()
   
pipeline.add_shadow_deploy(champion, [model2, model3])
pipeline.deploy()
  
name | cc-shadow
created | 2022-08-04 20:06:55.102203+00:00
last_updated | 2022-08-04 20:37:28.785947+00:00
deployed | True
tags |
steps | ccfraud-lstm

Shadow Deploy Outputs

Model outputs are listed by column based on the model's outputs. The output data is set by the term out. For the default model, outputs are listed as out.{variable_name}, while the shadow deployed models are in the format out_{model name}.{variable_name}, where {model name} is the name of the shadow deployed model.

sample_data_file = './smoke_test.df.json'
response = pipeline.infer_from_file(sample_data_file)
 | time | in.tensor | out.dense_1 | check_failures | out_ccfraudrf.variable | out_ccfraudxgb.variable
0 | 2023-03-03 17:35:28.859 | [1.0678324729, 0.2177810266, -1.7115145262, 0.682285721, 1.0138553067, -0.4335000013, 0.7395859437, -0.2882839595, -0.447262688, 0.5146124988, 0.3791316964, 0.5190619748, -0.4904593222, 1.1656456469, -0.9776307444, -0.6322198963, -0.6891477694, 0.1783317857, 0.1397992467, -0.3554220649, 0.4394217877, 1.4588397512, -0.3886829615, 0.4353492889, 1.7420053483, -0.4434654615, -0.1515747891, -0.2668451725, -1.4549617756] | [0.0014974177] | 0 | [1.0] | [0.0005066991]

Retrieve Shadow Deployment Logs

Shadow deploy results are part of the Pipeline.logs() method output. For the default model, the output is out.dense_1, while the shadow deployed models are in the format out_{model name}.variable, where {model name} is the name of the shadow deployed model.

logs = pipeline.logs()
display(logs)
 | time | in.tensor | out.dense_1 | check_failures | out_ccfraudrf.variable | out_ccfraudxgb.variable
0 | 2023-03-03 17:35:28.859 | [1.0678324729, 0.2177810266, -1.7115145262, 0.682285721, 1.0138553067, -0.4335000013, 0.7395859437, -0.2882839595, -0.447262688, 0.5146124988, 0.3791316964, 0.5190619748, -0.4904593222, 1.1656456469, -0.9776307444, -0.6322198963, -0.6891477694, 0.1783317857, 0.1397992467, -0.3554220649, 0.4394217877, 1.4588397512, -0.3886829615, 0.4353492889, 1.7420053483, -0.4434654615, -0.1515747891, -0.2668451725, -1.4549617756] | [0.0014974177] | 0 | [1.0] | [0.0005066991]

Get Pipeline URL Endpoint

The Pipeline URL Endpoint or the Pipeline Deploy URL is used to submit data to a pipeline for an inference. This is done through the pipeline's _deployment._url() method.

In this example, the pipeline URL endpoint for the pipeline ccfraud_pipeline will be displayed:

ccfraud_pipeline._deployment._url()

'http://engine-lb.ccfraud-pipeline-1:29502/pipelines/ccfraud-pipeline'
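
Data can then be submitted directly to that URL over HTTP. The following is a sketch assuming a pandas-record JSON file and a bearer token retrieved separately; the exact headers, payload format, and network reachability depend on the deployment and Wallaroo version:

import requests

url = ccfraud_pipeline._deployment._url()
headers = {
    "Authorization": f"Bearer {token}",  # assumption: auth token retrieved separately
    "Content-Type": "application/json; format=pandas-records"
}
# hypothetical input file in pandas records format
with open('./data/cc_data_1.df.json', 'rb') as data_file:
    response = requests.post(url, headers=headers, data=data_file)
print(response.json())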

2 - Wallaroo SDK Essentials Guide: Pipeline Deployment Configuration

Details on pipeline configurations and settings

Pipeline deployment configurations allow tailoring of a pipeline's resources to match an organization's and model's requirements. Pipelines may require more memory, CPU cores, or GPUs to run all of their steps efficiently. Pipeline deployment configurations also allow for multiple replicas of a model in a pipeline to provide scalability.

Create Pipeline Configuration

Setting a pipeline deployment configuration follows this process:

  1. Pipeline deployment configurations are created through the wallaroo deployment_config.DeploymentConfigBuilder() class (https://docs.wallaroo.ai/20230201/wallaroo-developer-guides/wallaroo-sdk-guides/wallaroo-sdk-reference-guide/deployment_config/#DeploymentConfigBuilder).
  2. Once the configuration options are set, the pipeline deployment configuration is finalized with the deployment_config.build() method.
  3. The pipeline deployment configuration is then applied when the pipeline is deployed.

The following example shows a pipeline deployment configuration with 1 replica, 1 cpu, and 2Gi of memory set to be allocated to the pipeline.

deployment_config = (wallaroo.DeploymentConfigBuilder()
                     .replica_count(1)
                     .cpus(1)
                     .memory("2Gi")
                     .build())

pipeline.deploy(deployment_config = deployment_config)

Pipeline resources can be configured with autoscaling. Autoscaling allows the user to define how many engines a pipeline starts with, the minimum number of engines a pipeline uses, and the maximum number of engines a pipeline can scale to. The pipeline scales up and down based on the average CPU utilization across the engines in a given pipeline as the user's workload increases and decreases.
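
For example, a deployment might scale between 0 and 3 replicas, adding or removing a replica as average CPU utilization crosses 75 percent. This sketch uses the autoscaling methods described in the tables below:

deployment_config = (wallaroo.DeploymentConfigBuilder()
                     .replica_autoscale_min_max(minimum=0, maximum=3)
                     .autoscale_cpu_utilization(75)
                     .cpus(1)
                     .memory("1Gi")
                     .build())

pipeline.deploy(deployment_config=deployment_config)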

Pipeline Resource Configurations

Pipeline deployment configurations deal with two major components:

  • Native Runtimes: Models that are deployed “as is” with the Wallaroo engine (Onnx, etc).
  • Containerized Runtimes: Models that are packaged into a container then deployed as a container with the Wallaroo engine (MLFlow, etc).

These configurations can be mixed - both native runtimes and containerized runtimes can be deployed to the same pipeline, with resources allocated to each runtime in different configurations.

The following resource configurations are available through the wallaroo.deployment_config object.

GPU and CPU Allocation

CPUs are allocated in fractions of total CPU power similar to the Kubernetes CPU definitions. cpus(0.25), cpus(1.0), etc are valid values.

GPUs can only be allocated in whole integer units from the GPU enabled nodepools. gpus(1), gpus(2), etc. are valid values, while gpus(0.25) is not.

Organizations should be aware of how many GPUs are allocated to the cluster. If all GPUs are already allocated to other pipelines, or if there are not enough GPUs to fulfill the request, the pipeline deployment will fail and return an error message.

GPU Support

Wallaroo 2023.2.1 and above supports Kubernetes nodepools with Nvidia Cuda GPUs.

See the Create GPU Nodepools for Kubernetes Clusters guide for instructions on adding GPU enabled nodepools to a Kubernetes cluster.

Native Runtime Configuration Methods

Method | Parameters | Description | Enterprise Only Feature
replica_count | (count: int) | The number of replicas of the pipeline to deploy. This allows for multiple deployments of the same models to be deployed to increase inferences through parallelization. |
replica_autoscale_min_max | (maximum: int, minimum: int = 0) | Provides replicas to be scaled from 0 to some maximum number of replicas. This allows pipelines to spin up additional replicas as more resources are required, then spin them back down to save on resources and costs. |
autoscale_cpu_utilization | (cpu_utilization_percentage: int) | Sets the average CPU percentage metric for when to load or unload another replica. |
disable_autoscale | | Disables autoscaling in the deployment configuration. |
cpus | (core_count: float) | Sets the number or fraction of CPUs to use for the pipeline, for example: 0.25, 1, 1.5, etc. The units are similar to the Kubernetes CPU definitions. |
gpus | (core_count: int) | Sets the number of GPUs to allocate for native runtimes. GPUs are only allocated in whole units, not as fractions. Organizations should be aware of the total number of GPUs available to the cluster, and monitor which pipeline deployment configurations have gpus allocated to ensure they do not run out. If there are not enough gpus to allocate to a pipeline deployment configuration, an error message will be returned when the pipeline is deployed. If gpus is called, then deployment_label must also be called and match the GPU Nodepool for the Wallaroo Cluster hosting the Wallaroo instance. |
memory | (memory_spec: str) | Sets the amount of RAM to allocate to the pipeline. The memory_spec string is in the format "{size as number}{unit value}". The accepted unit values are KiB (for KiloBytes), MiB (for MegaBytes), GiB (for GigaBytes), and TiB (for TeraBytes). The values are similar to the Kubernetes memory resource units format. |
lb_cpus | (core_count: float) | Sets the number or fraction of CPUs to use for the pipeline's load balancer, for example: 0.25, 1, 1.5, etc. The units are similar to the Kubernetes CPU definitions. |
lb_memory | (memory_spec: str) | Sets the amount of RAM to allocate to the pipeline's load balancer. The memory_spec string is in the format "{size as number}{unit value}". The accepted unit values are KiB (for KiloBytes), MiB (for MegaBytes), GiB (for GigaBytes), and TiB (for TeraBytes). The values are similar to the Kubernetes memory resource units format. |
deployment_label | | Label used for Kubernetes labels. Required if gpus are set, and must match the GPU nodepool label. |

Containerized Runtime Configuration Methods

Method | Parameters | Description | Enterprise Only Feature
sidekick_cpus | (model: wallaroo.model.Model, core_count: float) | Sets the number of CPUs to be used for the model's sidekick container. Only affects image-based models (e.g. MLFlow models) in a deployment. The parameters are: model - the sidekick model to configure; core_count - the number of CPU cores to use in this sidekick. |
sidekick_memory | (model: wallaroo.model.Model, memory_spec: str) | Sets the memory available for the model's sidekick container. Only affects image-based models (e.g. MLFlow models) in a deployment. The parameters are: model - the sidekick model to configure; memory_spec - the amount of memory to allocate as memory unit values. The accepted unit values are KiB (for KiloBytes), MiB (for MegaBytes), GiB (for GigaBytes), and TiB (for TeraBytes). The values are similar to the Kubernetes memory resource units format. |
sidekick_env | (model: wallaroo.model.Model, environment: Dict[str, str]) | Environment variables submitted to the model's sidekick container. Only affects image-based models (e.g. MLFlow models) in a deployment. These are used specifically for containerized models that have environment variables that affect their performance. |
sidekick_gpus | (model: wallaroo.model.Model, core_count: int) | Sets the number of GPUs to allocate for containerized runtimes. GPUs are only allocated in whole units, not as fractions. Organizations should be aware of the total number of GPUs available to the cluster, and monitor which pipeline deployment configurations have gpus allocated to ensure they do not run out. If there are not enough gpus to allocate to a pipeline deployment configuration, an error message will be returned when the pipeline is deployed. If called, then deployment_label must also be called and match the GPU Nodepool for the Wallaroo Cluster hosting the Wallaroo instance. |

Examples

Native Runtime Deployment

The following sets a native runtime deployment to one quarter of a CPU with 1Gi of RAM:

deployment_config = DeploymentConfigBuilder() \
    .cpus(0.25).memory('1Gi') \
    .build()

This example sets the replica count to 1, then sets the autoscale to vary between 2 and 5 replicas depending on need, with 1 CPU and 1Gi of RAM allocated per replica.

deploy_config = (wallaroo.DeploymentConfigBuilder()
                        .replica_count(1)
                        .replica_autoscale_min_max(minimum=2, maximum=5)
                        .cpus(1)
                        .memory("1Gi")
                        .build()
                    )

The following configuration allocates 1 GPU to the pipeline for native runtimes.

deployment_config = (DeploymentConfigBuilder()
                     .cpus(0.25)
                     .memory('1Gi')
                     .gpus(1)
                     .deployment_label('doc-gpu-label:true')
                     .build())

Containerized Runtime Deployment

The following configuration allocates 0.25 CPU and 1Gi RAM to the containerized runtime sm_model, and passes environment variables used for timeout settings to that runtime.

deployment_config = (DeploymentConfigBuilder()
                     .sidekick_cpus(sm_model, 0.25)
                     .sidekick_memory(sm_model, '1Gi')
                     .sidekick_env(sm_model,
                                   {"GUNICORN_CMD_ARGS":
                                    "--timeout=188 --workers=1"})
                     .build())

This example shows allocating 1 GPU to the containerized runtime model sm_model.

deployment_config = (DeploymentConfigBuilder()
                     .sidekick_gpus(sm_model, 1)
                     .deployment_label('doc-gpu-label:true')
                     .sidekick_memory(sm_model, '1Gi')
                     .build())

Mixed Environments

The following configuration allocates 1 GPU to the pipeline for native runtimes and another GPU to the containerized runtime sm_model, for a total of 2 GPUs allocated to the pipeline.

deployment_config = (DeploymentConfigBuilder()
                     .cpus(0.25)
                     .memory('1Gi')
                     .gpus(1)
                     .sidekick_gpus(sm_model, 1)
                     .deployment_label('doc-gpu-label:true')
                     .build())

3 - Wallaroo SDK Essentials Guide: Pipeline Log Management

How to create and manage Wallaroo Pipelines through the Wallaroo SDK

Pipelines have their own set of log files that are retrieved and analyzed as needed, either through:

  • The Pipeline logs method (returns either a DataFrame or an Apache Arrow table).
  • The Pipeline export_logs method (saves either a DataFrame file in JSON format, or an Apache Arrow file).

Get Pipeline Logs

Pipeline logs are retrieved through the Pipeline logs method. By default, logs are returned as a DataFrame in reverse chronological order of insertion, with the most recent records displayed first.

Pipeline logs are segmented by pipeline version. For example, if a new model step is added to a pipeline, a model is swapped out of a pipeline step, etc., this generates a new pipeline version. logs method requests return logs based on the parameters that match the pipeline version. To request logs of a specific pipeline version, specify the start_datetime and end_datetime parameters based on the pipeline version logs requested.

This command takes the following parameters.

  • limit (Int, Optional, default: 100): Limits how many log records to display. If there are more pipeline logs than are being displayed, the warning message Pipeline log record limit exceeded will be displayed. For example, if 100 log records were requested and there are a total of 1,000, the warning message will be displayed.
  • start_datetime and end_datetime (DateTime, Optional): Limits logs to all logs between the start_datetime and end_datetime DateTime parameters. These comply with the Python datetime library for formats such as:
    • datetime.datetime.now()
    • datetime.datetime(2023, 3, 28, 14, 25, 51, 660058, tzinfo=tzutc()) (March 28, 2023 14:25:51:660058 UTC time zone)
    Both parameters must be provided. Submitting a logs() request with only start_datetime or end_datetime will generate an exception. If start_datetime and end_datetime are provided, even with any other parameter, then the records are returned in chronological order, with the oldest record displayed first.
  • dataset (List[String], Optional): The datasets to be returned. The datasets available are:
    • *: Default. This translates to ["time", "in", "out", "check_failures"].
    • time: The DateTime of the inference request.
    • in: All inputs listed as in_{variable_name}.
    • out: All outputs listed as out_variable_name.
    • check_failures: Flags whether an Anomaly or Validation Check was triggered. 0 indicates no checks were triggered; 1 or greater indicates a check was triggered.
    • meta: Returns metadata. IMPORTANT NOTE: See Metadata Requests Restrictions for specifications on how this dataset can be used with other datasets.
      • Returns in the metadata.elapsed field a list of times in nanoseconds for the time to serialize the input and how long each step took.
      • Returns in the metadata.last_model field a dict with each Python step as model_name (the name of the model in the pipeline step) and model_sha (the sha hash of the model in the pipeline step).
      • Returns in the metadata.pipeline_version field the pipeline version as a UUID value.
    • metadata.elapsed: Returns in the metadata.elapsed field a list of times in nanoseconds for the time to serialize the input and how long each step took. IMPORTANT NOTE: See Metadata Requests Restrictions for specifications on how this dataset can be used with other datasets.
  • dataset_exclude (List[String], Optional): Exclude specified datasets.
  • dataset_separator (Sequence[[String], string], Optional): If set to ".", the returned dataset will be flattened.
  • arrow (Boolean, Optional, default: False): If arrow is set to True, then the logs are returned as an Apache Arrow table. If arrow=False, then the logs are returned as a pandas DataFrame.

All of the parameters can be used together, but start_datetime and end_datetime must be combined; if one is used, then so must the other. If start_datetime and end_datetime are used with any other parameter, then the log results are in chronological order of record insertion.

Log requests are limited to around 100k in size. For requests greater than 100k in size, use the Pipeline export_logs() method.

Logs include the following standard datasets:

Parameter | Type | Description
time | DateTime | The DateTime the inference request was made.
in.{variable} | | The input(s) for the inference request. Each input is listed as in.{variable_name}. For example, in.text_input, in.square_foot, in.number_of_rooms, etc.
out.{variable} | | The output(s) for the inference request, based on the ML model's outputs. Each output is listed as out.{variable_name}. For example, out.maximum_offer_price, out.minimum_asking_price, out.trade_in_value, etc.
check_failures | Int | How many validation checks were triggered by the inference. For more information, see Anomaly Testing.
out_{model_name}.{variable} | | Only returned when using Pipeline Shadow Deployments. For each model in the shadow deploy step, its output is listed in the format out_{model_name}.{variable}. For example, out_shadow_model_xgb.maximum_offer_price, out_shadow_model_xgb.minimum_asking_price, out_shadow_model_xgb.trade_in_value, etc.
out._model_split | | Only returned when using A/B Testing. Displays the model_name, model_version, and model_sha of the model used for the inference.
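
For example, a hypothetical request for the most recent 50 records with the time dataset plus metadata might look like this, mirroring the metadata example later in this guide:

logs = mainpipeline.logs(
    limit=50,
    dataset=["time", "metadata"]  # specific datasets instead of the default "*"
)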

In this example, the logs for the pipeline mainpipeline between two sample dates are retrieved. In this case, all of the time column fields are the same since the inference request was sent as a batch.

logs = mainpipeline.logs(start_datetime=date_start, end_datetime=date_end)

display(len(logs))
display(logs)

538
 | time | in.tensor | out.variable | check_failures
0 | 2023-04-24 18:09:33.970 | [4.0, 2.5, 2900.0, 5505.0, 2.0, 0.0, 0.0, 3.0, 8.0, 2900.0, 0.0, 47.6063, -122.02, 2970.0, 5251.0, 12.0, 0.0, 0.0] | [718013.75] | 0
1 | 2023-04-24 18:09:33.970 | [2.0, 2.5, 2170.0, 6361.0, 1.0, 0.0, 2.0, 3.0, 8.0, 2170.0, 0.0, 47.7109, -122.017, 2310.0, 7419.0, 6.0, 0.0, 0.0] | [615094.56] | 0
2 | 2023-04-24 18:09:33.970 | [3.0, 2.5, 1300.0, 812.0, 2.0, 0.0, 0.0, 3.0, 8.0, 880.0, 420.0, 47.5893, -122.317, 1300.0, 824.0, 6.0, 0.0, 0.0] | [448627.72] | 0
3 | 2023-04-24 18:09:33.970 | [4.0, 2.5, 2500.0, 8540.0, 2.0, 0.0, 0.0, 3.0, 9.0, 2500.0, 0.0, 47.5759, -121.994, 2560.0, 8475.0, 24.0, 0.0, 0.0] | [758714.2] | 0
4 | 2023-04-24 18:09:33.970 | [3.0, 1.75, 2200.0, 11520.0, 1.0, 0.0, 0.0, 4.0, 7.0, 2200.0, 0.0, 47.7659, -122.341, 1690.0, 8038.0, 62.0, 0.0, 0.0] | [513264.7] | 0
533 | 2023-04-24 18:09:33.970 | [3.0, 2.5, 1750.0, 7208.0, 2.0, 0.0, 0.0, 3.0, 8.0, 1750.0, 0.0, 47.4315, -122.192, 2050.0, 7524.0, 20.0, 0.0, 0.0] | [311909.6] | 0
534 | 2023-04-24 18:09:33.970 | [5.0, 1.75, 2330.0, 6450.0, 1.0, 0.0, 1.0, 3.0, 8.0, 1330.0, 1000.0, 47.4959, -122.367, 2330.0, 8258.0, 57.0, 0.0, 0.0] | [448720.28] | 0
535 | 2023-04-24 18:09:33.970 | [4.0, 3.5, 4460.0, 16271.0, 2.0, 0.0, 2.0, 3.0, 11.0, 4460.0, 0.0, 47.5862, -121.97, 4540.0, 17122.0, 13.0, 0.0, 0.0] | [1208638.0] | 0
536 | 2023-04-24 18:09:33.970 | [3.0, 2.75, 3010.0, 1842.0, 2.0, 0.0, 0.0, 3.0, 9.0, 3010.0, 0.0, 47.5836, -121.994, 2950.0, 4200.0, 3.0, 0.0, 0.0] | [795841.06] | 0
537 | 2023-04-24 18:09:33.970 | [2.0, 1.5, 1780.0, 4750.0, 1.0, 0.0, 0.0, 4.0, 7.0, 1080.0, 700.0, 47.6859, -122.395, 1690.0, 5962.0, 67.0, 0.0, 0.0] | [558463.3] | 0

538 rows × 4 columns

Metadata Requests Restrictions

The following restrictions are in place when requesting the datasets metadata or metadata.elapsed.

Standard Pipeline Steps

For the following Pipeline steps, metadata or metadata.elapsed must be requested with the * parameter. For example:

result = mainpipeline.infer(normal_input, dataset=["*", "metadata.elapsed"])

Affected pipeline steps:

  • add_model_step
  • replace_with_model_step

Testing Pipeline Steps

For the following Pipeline steps, meta or metadata.elapsed cannot be included with the * parameter. For example:

result = mainpipeline.infer(normal_input, dataset=["metadata.elapsed"])

Affected pipeline steps:

  • add_random_split
  • replace_with_random_split
  • add_shadow_deploy
  • replace_with_shadow_deploy

Export Pipeline Logs as File

The Pipeline method export_logs saves pipeline records to file, by default as pandas records in Newline Delimited JSON (NDJSON) format, or optionally as an Apache Arrow table file.

The output files are stored by default in the directory ./logs under the current working directory, with the default prefix {pipeline name}-1, {pipeline name}-2, etc.

The suffix by default is json for pandas records in Newline Delimited JSON (NDJSON) format files. Logs are segmented by pipeline version across the limit, data_size_limit, or start_datetime and end_datetime parameters.

By default, logs are exported as pandas records in NDJSON format in reverse chronological order of insertion, with the most recent log insertions displayed first.

Pipeline logs are segmented by pipeline version. For example, if a new model step is added to a pipeline, a model is swapped out of a pipeline step, etc., this generates a new pipeline version.

This command takes the following parameters.

  • directory (String, Optional, default: logs): Logs are exported to this directory under the current working directory.
  • file_prefix (String, Optional, default: the name of the pipeline): The name of the exported files. By default, this will be the name of the pipeline, and is segmented by pipeline version between the limits or the start and end period. For example: logpipeline-1.json, etc.
  • data_size_limit (String, Optional, default: 100MB): The maximum size for the exported data in bytes. Note that file size is approximate to the request; a request of 10MiB may return 10.3MB of data. The fields are in the format "{size as number} {unit value}", and can include a space, so "10 MiB" and "10MiB" are the same. The accepted unit values are KiB (for KiloBytes), MiB (for MegaBytes), GiB (for GigaBytes), and TiB (for TeraBytes).
  • limit (Int, Optional, default: 100): Limits how many log records to export. If there are more pipeline logs than are being exported, the warning message Pipeline log record limit exceeded will be displayed. For example, if 100 log records were requested and there are a total of 1,000, the warning message will be displayed.
  • start_datetime and end_datetime (DateTime, Optional): Limits logs to all logs between the start_datetime and end_datetime DateTime parameters. These comply with the Python datetime library for formats such as:
    • datetime.datetime.now()
    • datetime.datetime(2023, 3, 28, 14, 25, 51, 660058, tzinfo=tzutc()) (March 28, 2023 14:25:51:660058 UTC time zone)
    Both parameters must be provided. Submitting a request with only start_datetime or end_datetime will generate an exception. If start_datetime and end_datetime are provided, even with any other parameter, then the records are returned in chronological order, with the oldest record displayed first.
  • filename (String, Required): The file name to save the log file to. The requesting user must have write permission to the file location, and the target directory for the file must already exist. For example: if the file is set to /var/wallaroo/logs/pipeline.json, then the directory /var/wallaroo/logs must already exist. Otherwise, file names are only limited by standard file naming rules for the target environment.
  • dataset (List, Optional): The datasets to be returned. The datasets available are:
    • *: Default. This translates to ["time", "in", "out", "check_failures"].
    • time: The DateTime of the inference request.
    • in: All inputs listed as in_{variable_name}.
    • out: All outputs listed as out_variable_name.
    • check_failures: Flags whether an Anomaly or Validation Check was triggered. 0 indicates no checks were triggered; 1 or greater indicates a check was triggered.
    • meta: Returns metadata. IMPORTANT NOTE: See Metadata Requests Restrictions for specifications on how this dataset can be used with other datasets.
      • Returns in the metadata.elapsed field a list of times in nanoseconds for the time to serialize the input and how long each step took.
      • Returns in the metadata.last_model field a dict with each Python step as model_name (the name of the model in the pipeline step) and model_sha (the sha hash of the model in the pipeline step).
      • Returns in the metadata.pipeline_version field the pipeline version as a UUID value.
    • metadata.elapsed: Returns in the metadata.elapsed field a list of times in nanoseconds for the time to serialize the input and how long each step took. IMPORTANT NOTE: See Metadata Requests Restrictions for specifications on how this dataset can be used with other datasets.
  • dataset_exclude (List[String], Optional): Exclude specified datasets.
  • dataset_separator (Sequence[[String], string], Optional): If set to ".", the returned dataset will be flattened.
  • arrow (Boolean, Optional, default: False): If arrow=True, then the logs are returned as an Apache Arrow table. If arrow=False, then the logs are returned as pandas records in NDJSON format that can be imported into a pandas DataFrame.

All of the parameters can be used together, but start_datetime and end_datetime must be combined; if one is used, then so must the other. If start_datetime and end_datetime are used with any other parameter, then the log results are in chronological order of record insertion.

File sizes are limited to around 10 MB. If the requested log file is greater than 10 MB, a warning will be displayed indicating the end date of the downloaded log file so the request can be adjusted to capture the remaining log records.
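
To stay within that limit, an export can be narrowed by date range. This sketch assumes date_start and date_end DateTime values covering the desired pipeline version:

mainpipeline.export_logs(
    directory="./logs",
    file_prefix="mainpipeline",
    start_datetime=date_start,
    end_datetime=date_end
)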

In this example, the log files are saved as both Pandas DataFrame and Apache Arrow.

# Save the DataFrame version of the log file

mainpipeline.export_logs()
display(os.listdir('./logs'))

mainpipeline.export_logs(arrow=True)
display(os.listdir('./logs'))

    Warning: There are more logs available. Please set a larger limit to export more data.
    

    ['pipeline-logs-1.json']

    Warning: There are more logs available. Please set a larger limit to export more data.
    

    ['pipeline-logs-1.arrow', 'pipeline-logs-1.json']

Pipeline Log Storage

Pipeline logs have a set allocation of storage space and data requirements.

Pipeline Log Storage Warnings

To prevent storage and performance issues, inference result data may be dropped from pipeline logs by the following standards:

  • Columns are progressively removed from the row starting with the largest input data size and working to the smallest, then the same for outputs.

For example, Computer Vision ML Models typically have large inputs and output values - a single pandas DataFrame inference request may be over 13 MB in size, and the inference results nearly as large. To prevent pipeline log storage issues, the input may be dropped from the pipeline logs, and if additional space is needed, the inference outputs would follow. The time column is preserved.

If a pipeline has dropped columns for space purposes, this will be displayed when a log request is made with the following warning, with {columns} replaced with the dropped columns.

The inference log is above the allowable limit and the following columns may have been suppressed for various rows in the logs: {columns}. To review the dropped columns for an individual inference's suppressed data, include dataset=["metadata"] in the log request.

Review Dropped Columns

To review what columns are dropped from pipeline logs for storage reasons, include the dataset metadata in the request to view the column metadata.dropped. This metadata field displays a List of any columns dropped from the pipeline logs.

For example:

metadatalogs = mainpipeline.logs(dataset=["time", "metadata"])
 | time | metadata.dropped
0 | 2023-07-06 15:47:03.673 |
1 | 2023-07-06 15:47:03.673 |
2 | 2023-07-06 15:47:03.673 |
3 | 2023-07-06 15:47:03.673 |
4 | 2023-07-06 15:47:03.673 |
95 | 2023-07-06 15:47:03.673 |
96 | 2023-07-06 15:47:03.673 |
97 | 2023-07-06 15:47:03.673 |
98 | 2023-07-06 15:47:03.673 |
99 | 2023-07-06 15:47:03.673 |

Suppressed Data Elements

Data elements that do not fit the supported data types below, such as None or Null values, are not supported in pipeline logs. When present, undefined data will be written in the place of the null value, typically zeroes. Any null list values will present an empty list.