Inference

How to perform inferences through a Wallaroo deployed ML model.

Wallaroo deployed ML models process inference requests via the following methods.

  • Wallaroo SDK: The Wallaroo SDK provides methods for performing inference requests against ML models deployed through the Wallaroo Ops Center.
  • API Requests: API requests submitted to the inference endpoints of ML models deployed through Wallaroo Ops or Run Anywhere.

Inference Requests Via the Wallaroo SDK

Inferences are performed on deployed pipelines. This submits data to the pipeline, where it is processed through each of the pipeline’s steps with the output of the previous step providing the input for the next step. The final step will then output the result of all of the pipeline’s steps.

  • Inputs are sent as one of the following: pandas DataFrames or Apache Arrow tables.

Apache Arrow is the recommended input format for inferences. Wallaroo inference data is based on Apache Arrow, which on average returns faster inference results and smaller data transfers than JSON or DataFrame tables. Arrow tables also specify the data types used in their schema, ensuring that the data sent and received is exactly what is required. Using pandas DataFrames requires inferring the data type, which may lead to data type mismatch issues.
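
For instance, a minimal sketch of building an Arrow table with an explicit schema for an inference request might look like the following. The field name tensor, the float32 list type, and the values are illustrative assumptions; match them to your model's input schema.

import pyarrow as pa

# hypothetical schema: one input field named "tensor" holding lists of float32
schema = pa.schema([pa.field("tensor", pa.list_(pa.float32()))])

# build a one-row input table; the values are placeholders
input_table = pa.Table.from_pylist(
    [{"tensor": [1.0678324729, 0.2177810266, -1.7115145262]}],
    schema=schema,
)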

For a complete example of using the Wallaroo SDK for inferencing, see the Wallaroo SDK Inference Tutorial.

Run Inference through Local Variable

The pipeline infer(data, timeout, dataset, dataset_exclude, dataset_separator) method performs an inference as defined by the pipeline steps and takes the following arguments (a usage sketch follows the list):

  • data (REQUIRED): The data submitted to the pipeline for inference. The following data inputs are supported:
    • pandas.DataFrame: Data submitted as a pandas DataFrame is returned as a pandas DataFrame, with output columns based on the model's outputs.
    • Apache Arrow (Preferred): Data submitted as an Apache Arrow table is returned as an Apache Arrow table.
  • timeout (OPTIONAL): A timeout in seconds before the inference throws an exception. The default is 15 seconds per call to accommodate large, complex models. Note that for a batch inference, this is per call - with 10 inference requests, each would have a default timeout of 15 seconds.
  • dataset (OPTIONAL): The datasets to be returned. The datasets available are:
    • *: Default. This translates to ["time", "in", "out", "anomaly"].
    • time: The DateTime of the inference request.
    • in: All inputs listed as in_{variable_name}.
    • out: All outputs listed as out_{variable_name}.
    • anomaly: Flags whether an anomaly was detected. Anomalies are detected from each pipeline validation that returned True. For full details, see Wallaroo SDK Essentials Guide: Anomaly Detection. The following fields are included in this dataset.
      • count: The number of anomalies detected as an integer. Each pipeline validation that returns True adds to the number of anomalies detected.
      • {validation name}: Each pipeline validation added to the pipeline is returned as the field anomaly.{validation name}. Validations that return True indicate an anomaly detected based on the validation expression, while False indicates no anomaly found for the validation.
    • metadata: IMPORTANT NOTE: See Metadata Requests Restrictions for specifications on how to use the metadata dataset request in combination with other fields.
      • Returns in the metadata.elapsed field:
        • A list of times in nanoseconds for:
          • The time to serialize the input.
          • How long each step took.
      • Returns in the metadata.last_model field:
        • A dict with each Python step as:
          • model_name: The name of the model in the pipeline step.
          • model_sha: The sha hash of the model in the pipeline step.
      • Returns in the metadata.pipeline_version field:
        • The pipeline version as a UUID value.
      • Returns in the metadata.partition field.
    • metadata.elapsed: See Metadata Requests Restrictions for specifications on how to use meta or metadata dataset requests in combination with other fields.
      • Returns in the metadata.elapsed field:
        • A list of times in nanoseconds for:
          • The time to serialize the input.
          • How long each step took.
  • dataset_exclude (OPTIONAL): Allows users to exclude parts of the dataset.
  • dataset_separator (OPTIONAL): Allows other types of dataset separators to be used. If set to ".", the returned dataset will be flattened.
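
The following is a minimal usage sketch of these arguments; the input variable input_table and the specific argument values are illustrative assumptions, not requirements.

# hedged sketch: default datasets plus metadata.elapsed, excluding the inputs
results = pipeline.infer(
    input_table,                        # a pandas DataFrame or Apache Arrow table
    timeout=30,
    dataset=["*", "metadata.elapsed"],
    dataset_exclude=["in"],
)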

Outputs of the inference are based on the model’s outputs as out.{model_output}. The example model used in this guide has only one output - dense_1, which is listed in the out.dense_1 column. If the model has multiple outputs, they would be listed as out.output1, out.output2, etc.

pipeline.infer metadata.elapsed Format

The inference result field metadata.elapsed format changes depending on the input type.

  • pandas DataFrame: If the inference request is in pandas Record format, metadata.elapsed is returned as an int.
  • Apache Arrow: If the inference request is an Apache Arrow table, metadata.elapsed is returned as a pyarrow integer scalar (displayed as pyarrow.UInt64Scalar in the examples below); use as_py() to convert it to a Python int.

The following code is provided to convert metadata.elapsed into the same format for consistency.

import pyarrow as pa

# accumulate the parse and inference times as Python ints,
# converting the pyarrow scalars returned for Arrow inputs
if input_type == "json":
    parse_elapsed += results["metadata.elapsed"][0][0]
    inference_elapsed += results["metadata.elapsed"][0][1]
elif input_type == "arrow":
    parse_elapsed += results["metadata.elapsed"][0][0].as_py()
    inference_elapsed += results["metadata.elapsed"][0][1].as_py()
else:
    assert False, f"Bad input type {input_type}"
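
As a convenience, the conversion above can be wrapped in a small helper that returns plain Python ints regardless of input type; this helper is illustrative and not part of the Wallaroo SDK.

def elapsed_times(results):
    """Return (parse_ns, inference_ns) as Python ints for either input type."""
    parse = results["metadata.elapsed"][0][0]
    inference = results["metadata.elapsed"][0][1]
    # pyarrow scalars expose as_py(); plain ints pass through unchanged
    convert = lambda value: value.as_py() if hasattr(value, "as_py") else int(value)
    return convert(parse), convert(inference)
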
pipeline.infer metadata.elapsed Format Examples

The following demonstrates the differences in the metadata.elapsed field between a DataFrame based inference request and an Apache Arrow table inference request.

  • Apache Arrow based inference request.
result_arrow = ccfraud_pipeline.infer_from_file('./data/cc_data_10k.arrow', dataset="metadata")

# unconverted raw data from Arrow table inference

# time to parse input data

display(result_arrow["metadata.elapsed"][0][0])

# time to inference from parsed input data

display(result_arrow["metadata.elapsed"][0][1])

<pyarrow.UInt64Scalar: 1253219>

<pyarrow.UInt64Scalar: 1275320>

parse_elapsed = result_arrow["metadata.elapsed"][0][0].as_py()
display(f'Time to parse input data: {parse_elapsed}')

'Time to parse input data: 1253219'

inference_elapsed = result_arrow["metadata.elapsed"][0][1].as_py()
display(f'Time to inference from parsed data: {inference_elapsed}')

'Time to inference from parsed data: 1275320'
  • pandas DataFrame based inference request
# no conversion needed for pandas DataFrame

result_dataframe = ccfraud_pipeline.infer_from_file('./data/cc_data_10k.df.json', dataset=["*", "metadata"])

display(f"Time to parse input data: {result_dataframe['metadata.elapsed'][0][0]}")

display(f"Time to inference from parsed data: {result_dataframe['metadata.elapsed'][0][1]}")

'Time to parse input data: 51879691'

'Time to inference from parsed data: 2310435'

Run Inference Through Local Variable Example

The following example is an inference request using an Apache Arrow table. The inference result is returned as an Apache Arrow table, which is then converted into a pandas DataFrame and a Polars DataFrame, with the results filtered to rows where out.dense_1 is greater than 0.75.

result = ccfraud_pipeline.infer(ccfraud_input_1k_arrow_table)

display(result)

pyarrow.Table
time: timestamp[ms]
in.tensor: list<item: float> not null
  child 0, item: float
out.dense_1: list<inner: float not null> not null
  child 0, inner: float not null
anomaly.count: int8
----
time: [[2023-03-20 18:55:09.562,2023-03-20 18:55:09.562,2023-03-20 18:55:09.562,2023-03-20 18:55:09.562,2023-03-20 18:55:09.562,...,2023-03-20 18:55:09.562,2023-03-20 18:55:09.562,2023-03-20 18:55:09.562,2023-03-20 18:55:09.562,2023-03-20 18:55:09.562]]
in.tensor: [[[-1.0603298,2.3544967,-3.5638788,5.138735,-1.2308457,...,0.038412016,1.0993439,1.2603409,-0.14662448,-1.4463212],[-1.0603298,2.3544967,-3.5638788,5.138735,-1.2308457,...,0.038412016,1.0993439,1.2603409,-0.14662448,-1.4463212],...,[0.49511018,-0.24993694,0.4553345,0.92427504,-0.36435103,...,1.1117147,-0.566654,0.12122019,0.06676402,0.6583282],[0.61188054,0.1726081,0.43105456,0.50321484,-0.27466634,...,0.30260187,0.081211455,-0.15578508,0.017189292,-0.7236631]]]
out.dense_1: [[[0.99300325],[0.99300325],...,[0.0008533001],[0.0012498498]]]
anomaly.count: [[0,0,0,0,0,...,0,0,0,0,0]]
outputs = result.to_pandas()

# keep only rows where the model output is greater than 0.75
filter = [elt[0] > 0.75 for elt in outputs['out.dense_1']]
outputs = outputs.loc[filter]
display(outputs)
    | time                    | in.tensor | out.dense_1 | anomaly.count
0   | 2023-03-20 18:55:09.562 | [-1.0603298, 2.3544967, -3.5638788, 5.138735, -1.2308457, -0.76878244, -3.5881228, 1.8880838, -3.2789674, -3.9563255, 4.099344, -5.653918, -0.8775733, -9.131571, -0.6093538, -3.7480276, -5.0309124, -0.8748149, 1.9870535, 0.7005486, 0.9204423, -0.10414918, 0.32295644, -0.74181414, 0.038412016, 1.0993439, 1.2603409, -0.14662448, -1.4463212] | [0.99300325] | 0
1   | 2023-03-20 18:55:09.562 | [-1.0603298, 2.3544967, -3.5638788, 5.138735, -1.2308457, -0.76878244, -3.5881228, 1.8880838, -3.2789674, -3.9563255, 4.099344, -5.653918, -0.8775733, -9.131571, -0.6093538, -3.7480276, -5.0309124, -0.8748149, 1.9870535, 0.7005486, 0.9204423, -0.10414918, 0.32295644, -0.74181414, 0.038412016, 1.0993439, 1.2603409, -0.14662448, -1.4463212] | [0.99300325] | 0
2   | 2023-03-20 18:55:09.562 | [-1.0603298, 2.3544967, -3.5638788, 5.138735, -1.2308457, -0.76878244, -3.5881228, 1.8880838, -3.2789674, -3.9563255, 4.099344, -5.653918, -0.8775733, -9.131571, -0.6093538, -3.7480276, -5.0309124, -0.8748149, 1.9870535, 0.7005486, 0.9204423, -0.10414918, 0.32295644, -0.74181414, 0.038412016, 1.0993439, 1.2603409, -0.14662448, -1.4463212] | [0.99300325] | 0
3   | 2023-03-20 18:55:09.562 | [-1.0603298, 2.3544967, -3.5638788, 5.138735, -1.2308457, -0.76878244, -3.5881228, 1.8880838, -3.2789674, -3.9563255, 4.099344, -5.653918, -0.8775733, -9.131571, -0.6093538, -3.7480276, -5.0309124, -0.8748149, 1.9870535, 0.7005486, 0.9204423, -0.10414918, 0.32295644, -0.74181414, 0.038412016, 1.0993439, 1.2603409, -0.14662448, -1.4463212] | [0.99300325] | 0
161 | 2023-03-20 18:55:09.562 | [-9.716793, 9.174981, -14.450761, 8.653825, -11.039951, 0.6602411, -22.825525, -9.919395, -8.064324, -16.737926, 4.852197, -12.563343, -1.0762653, -7.524591, -3.2938414, -9.62102, -15.6501045, -7.089741, 1.7687134, 5.044906, -11.365625, 4.5987034, 4.4777045, 0.31702697, -2.2731977, 0.07944675, -10.052058, -2.024108, -1.0611985] | [1.0] | 0
941 | 2023-03-20 18:55:09.562 | [-0.50492376, 1.9348029, -3.4217603, 2.2165704, -0.6545315, -1.9004827, -1.6786858, 0.5380051, -2.7229102, -5.265194, 3.504164, -5.4661765, 0.68954825, -8.725291, 2.0267954, -5.4717045, -4.9123807, -1.6131229, 3.8021576, 1.3881834, 1.0676425, 0.28200775, -0.30759808, -0.48498034, 0.9507336, 1.5118006, 1.6385275, 1.072455, 0.7959132] | [0.9873102] | 0
import polars as pl

outputs =  pl.from_arrow(result)

display(outputs.filter(pl.col("out.dense_1").apply(lambda x: x[0]) > 0.75))
time                    | in.tensor                          | out.dense_1 | anomaly.count
datetime[ms]            | list[f32]                          | list[f32]   | i8
2023-03-20 18:55:09.562 | [-1.06033, 2.354497, … -1.446321]  | [0.993003]  | 0
2023-03-20 18:55:09.562 | [-1.06033, 2.354497, … -1.446321]  | [0.993003]  | 0
2023-03-20 18:55:09.562 | [-1.06033, 2.354497, … -1.446321]  | [0.993003]  | 0
2023-03-20 18:55:09.562 | [-1.06033, 2.354497, … -1.446321]  | [0.993003]  | 0
2023-03-20 18:55:09.562 | [-9.716793, 9.174981, … -1.061198] | [1.0]       | 0
2023-03-20 18:55:09.562 | [-0.504924, 1.934803, … 0.795913]  | [0.98731]   | 0

Metadata Requests Restrictions

The following restrictions are in place when requesting the datasets metadata or metadata.elapsed.

Standard Pipeline Steps

For the following Pipeline steps, metadata or metadata.elapsed must be requested with the * parameter. For example:

result = mainpipeline.infer(normal_input, dataset=["*", "metadata.elapsed"])

Affected pipeline steps:

  • add_model_step
  • replace_with_model_step

Testing Pipeline Steps

For the following Pipeline steps, meta or metadata.elapsed cannot be included with the * parameter. For example:

result = mainpipeline.infer(normal_input, dataset=["metadata.elapsed"])

Affected pipeline steps:

  • add_random_split
  • replace_with_random_split
  • add_shadow_deploy
  • replace_with_shadow_deploy

Numpy Arrays as Inputs

Numpy arrays can be submitted as an input by containing them within a DataFrame. In this example, the input column is tensor, but it can be whatever the model expects.

dataframedata = pd.DataFrame({"tensor":[npArray]})

This bypasses the need to convert the npArray to a List - the object itself can be embedded into the DataFrame and submitted. For this example, a DataFrame with the column tensor containing a numpy array will be submitted as an inference, and from the return only the column out.2519 will be displayed.
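
A minimal setup sketch follows; the array shape and values are placeholder assumptions, so match them to what the model expects.

import numpy as np
import pandas as pd

# hypothetical input array - replace the shape with the model's expected input
npArray = np.random.rand(1, 28, 28).astype(np.float32)

# the numpy array is embedded directly in a one-column DataFrame
dataframedata = pd.DataFrame({"tensor": [npArray]})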

infResults = pipeline.infer(dataframedata, dataset=["*", "metadata.elapsed"])
display(infResults.loc[0]["out.2519"])

[44,
 44,
 44,
 44,
 82,
 44,
 44,
 44,
 44,
 44,
 44,
 44,
 44,
 44,
 44,
 44,
 44,
 44,
 44,
 84,
 84,
 44,
 84,
 44,
 44,
 44,
 61,
 44,
 86,
 44,
 44]

Run Inference From A File

To submit a data file directly to a pipeline, use the pipeline infer_from_file(data, timeout, dataset, dataset_exclude, dataset_separator) method. This performs an inference as defined by the pipeline steps and takes the following arguments (a usage sketch follows the list):

  • data (REQUIRED): The name of the file submitted to the pipeline for inference.
    • pandas.DataFrame: Data submitted as a pandas DataFrame is returned as a pandas DataFrame, with output columns based on the model's outputs.
    • Apache Arrow (Preferred): Data submitted as an Apache Arrow table is returned as an Apache Arrow table.
  • timeout (OPTIONAL): A timeout in seconds before the inference throws an exception. The default is 15 seconds per call to accommodate large, complex models. Note that for a batch inference, this is per call - with 10 inference requests, each would have a default timeout of 15 seconds. Inferences sent in a batch rather than as individual inference requests are processed faster.
  • dataset (OPTIONAL): The datasets to be returned. The datasets available are:
    • *: Default. This translates to ["time", "in", "out", "anomaly"].
    • time: The DateTime of the inference request.
    • in: All inputs listed as in_{variable_name}.
    • out: All outputs listed as out_{variable_name}.
    • anomaly: Flags whether an anomaly was detected. Anomalies are detected from each pipeline validation that returned True. For full details, see Wallaroo SDK Essentials Guide: Anomaly Detection. The following fields are included in this dataset.
      • count: The number of anomalies detected as an integer. Each pipeline validation that returns True adds to the number of anomalies detected.
      • {validation name}: Each pipeline validation added to the pipeline is returned as the field anomaly.{validation name}. Validations that return True indicate an anomaly detected based on the validation expression, while False indicates no anomaly found for the validation.
    • meta:
      • Returns in the metadata.elapsed field:
        • A list of times in nanoseconds for:
          • The time to serialize the input.
          • How long each step took.
    • metadata.elapsed:
      • Returns in the metadata.elapsed field:
        • A list of times in nanoseconds for:
          • The time to serialize the input.
          • How long each step took.
      • Returns in the metadata.last_model field:
        • A dict with each Python step as:
          • model_name: The name of the model in the pipeline step.
          • model_sha: The sha hash of the model in the pipeline step.
  • dataset_exclude (OPTIONAL): Allows users to exclude parts of the dataset.
  • dataset_separator (OPTIONAL): Allows other types of dataset separators to be used. If set to ".", the returned dataset will be flattened.
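
A minimal usage sketch follows; the file path and argument values are illustrative assumptions.

# hedged sketch: infer from an Apache Arrow file with an extended timeout
results = pipeline.infer_from_file(
    './data/inputs.arrow',              # hypothetical file path
    timeout=60,
    dataset=["*", "metadata.elapsed"],
)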

pipeline.infer_from_file metadata.elapsed Format

The inference result field metadata.elapsed format changes depending on the input type.

  • pandas DataFrame: If the inference request is in pandas Record format, metadata.elapsed is returned as an int.
  • Apache Arrow: If the inference request is an Apache Arrow table, metadata.elapsed is returned as a pyarrow integer scalar (displayed as pyarrow.UInt64Scalar in the examples below); use as_py() to convert it to a Python int.

The following code is provided to convert metadata.elapsed into the same format for consistency.

import pyarrow as pa

# accumulate the parse and inference times as Python ints,
# converting the pyarrow scalars returned for Arrow inputs
if input_type == "json":
    parse_elapsed += results["metadata.elapsed"][0][0]
    inference_elapsed += results["metadata.elapsed"][0][1]
elif input_type == "arrow":
    parse_elapsed += results["metadata.elapsed"][0][0].as_py()
    inference_elapsed += results["metadata.elapsed"][0][1].as_py()
else:
    assert False, f"Bad input type {input_type}"
pipeline.infer_from_file metadata.elapsed Format Examples

The following demonstrates the differences in the metadata.elapsed field between a DataFrame based inference request and an Apache Arrow table inference request.

  • Apache Arrow based inference request.
result_arrow = ccfraud_pipeline.infer_from_file('./data/cc_data_10k.arrow', dataset="metadata")

# unconverted raw data from Arrow table inference

# time to parse input data

display(result_arrow["metadata.elapsed"][0][0])

# time to inference from parsed input data

display(result_arrow["metadata.elapsed"][0][1])

<pyarrow.UInt64Scalar: 1253219>

<pyarrow.UInt64Scalar: 1275320>

parse_elapsed = result_arrow["metadata.elapsed"][0][0].as_py()
display(f'Time to parse input data: {parse_elapsed}')

'Time to parse input data: 1253219'

inference_elapsed = result_arrow["metadata.elapsed"][0][1].as_py()
display(f'Time to inference from parsed data: {inference_elapsed}')

'Time to inference from parsed data: 1275320'
  • pandas DataFrame based inference request
# no conversion needed for pandas DataFrame

result_dataframe = ccfraud_pipeline.infer_from_file('./data/cc_data_10k.df.json', dataset=["*", "metadata"])

display(f"Time to parse input data: {result_dataframe['metadata.elapsed'][0][0]}")

display(f"Time to inference from parsed data: {result_dataframe['metadata.elapsed'][0][1]}")

'Time to parse input data: 51879691'

'Time to inference from parsed data: 2310435'

Run Inference From A File Example

In this example, an inference request of 50,000 records as an Apache Arrow file will be submitted to a model trained for reviewing IMDB reviews, and the first 5 results displayed.

results = imdb_pipeline.infer_from_file('./data/test_data_50K.arrow')
import polars as pl

outputs =  pl.from_arrow(results)
display(outputs.head(5))

shape: (5, 4)
time                    | in.tensor             | out.dense_1 | anomaly.count
datetime[ms]            | list[f32]             | list[f32]   | i8
2023-03-20 20:53:50.170 | [11.0, 6.0, … 0.0]    | [0.898019]  | 0
2023-03-20 20:53:50.170 | [54.0, 548.0, … 20.0] | [0.056597]  | 0
2023-03-20 20:53:50.170 | [1.0, 9259.0, … 1.0]  | [0.92608]   | 0
2023-03-20 20:53:50.170 | [10.0, 25.0, … 0.0]   | [0.926919]  | 0
2023-03-20 20:53:50.170 | [10.0, 37.0, … 0.0]   | [0.661858]  | 0

In this example, an inference will be submitted to the ccfraud_pipeline with the file smoke_test.df.json, a DataFrame formatted JSON file.

result = ccfraud_pipeline.infer_from_file('./data/smoke_test.df.json')
   | time                    | in.tensor | out.dense_1 | anomaly.count
0  | 2023-02-15 23:07:07.497 | [1.0678324729, 0.2177810266, -1.7115145262, 0.682285721, 1.0138553067, -0.4335000013, 0.7395859437, -0.2882839595, -0.447262688, 0.5146124988, 0.3791316964, 0.5190619748, -0.4904593222, 1.1656456469, -0.9776307444, -0.6322198963, -0.6891477694, 0.1783317857, 0.1397992467, -0.3554220649, 0.4394217877, 1.4588397512, -0.3886829615, 0.4353492889, 1.7420053483, -0.4434654615, -0.1515747891, -0.2668451725, -1.4549617756] | [0.0014974177] | 0

Parallel Inferences

Wallaroo pipelines allow multiple replicas of the pipeline and models to be deployed. This allows parallel inferences to increase the speed of multiple inference requests. Wallaroo does so by scaling multiple replicas of the deployed pipeline and models based on the pipeline configuration. See Pipeline Deployment Configuration.
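
For example, a deployment configuration along these lines sets multiple replicas; the resource values shown are placeholder assumptions, so size them for your models.

import wallaroo

# hedged sketch: deploy a pipeline with 4 replicas for parallel inferences
deploy_config = (wallaroo.DeploymentConfigBuilder()
                 .replica_count(4)   # number of pipeline replicas
                 .cpus(1)            # placeholder CPU allocation per replica
                 .memory("1Gi")      # placeholder memory allocation per replica
                 .build())

pipeline.deploy(deployment_config=deploy_config)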

Parallel Inference Use Cases

Parallel inferences are most useful when:

  • Inference request inputs are extremely large - for example, greater than 4 GB. Parallel inference requests allow that request to be split into more manageable sizes and submitted in one request, with each segment automatically handled as a separate inference request.
  • Inference inputs come from different data sources. This allows organizations to query data from different sources, add each query result to the list, then submit the entire list as one request and receive the results quickly.
  • Image processing, where the entire image is of extreme size and resolution, so submitting it whole requires large amounts of memory and bandwidth. The image can be divided into separate pieces, then all the pieces submitted in one request, allowing parallelization to examine each individual piece and return the results faster than analyzing the entire large image.

It is highly recommended that the data elements included in the parallel inference List are all of the same data type: for example, all of the elements of the list should be pandas DataFrames OR all Apache Arrow tables. This makes processing the returned information easier rather than trying to parse what type of data is received. A sketch of building such a list follows the tables below.

For example, the parallel inference input list should be in the format:

   | Data Type
0  | DataFrame
1  | DataFrame
2  | DataFrame
3  | DataFrame

And not:

   | Data Type
0  | DataFrame
1  | Apache Arrow
2  | DataFrame
3  | Apache Arrow
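
A short sketch of building such a homogeneous input list, assuming a hypothetical source pandas DataFrame named df:

import pandas as pd

# hedged sketch: split a source DataFrame into single-row DataFrames so that
# every element of the parallel inference list shares the same data type
dataset = [df.iloc[[i]] for i in range(len(df))]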

Parallel Inferences Method

The pipeline parallel_infer(tensor_list, timeout, num_parallel, retries) asynchronous method performs an inference as defined by the pipeline steps and takes the following arguments:

  • tensor_list (REQUIRED List): The data submitted to the pipeline for inference as a List of the supported data types:
    • pandas.DataFrame: Data submitted as a pandas DataFrame is returned as a pandas DataFrame, with output columns based on the model's outputs.
    • Apache Arrow (Preferred): Data submitted as an Apache Arrow table is returned as an Apache Arrow table.
  • timeout (OPTIONAL int): A timeout in seconds before the inference throws an exception. The default is 15 seconds per call to accommodate large, complex models. Note that for a batch inference, this is per list item - with 10 inference requests, each would have a default timeout of 15 seconds.
  • num_parallel (OPTIONAL int): The number of parallel threads used for the submission. This should be no more than four times the number of pipeline replicas.
  • retries (OPTIONAL int): The number of retries per inference request submitted.

parallel_infer is an asynchronous method that returns the Python callback list of tasks. Call parallel_infer with the await keyword to retrieve the callback results.

For example, the following will split a single pandas DataFrame into rows, turning each row into a separate DataFrame. The resulting list of tables is then submitted via parallel_infer, and the results collected together as a new List. For this example, there are 4 replicas set in the pipeline deployment configuration.

dataset = []
for index, row in test_data.head(200).iterrows():
    dataset.append(row.to_frame('text_input').reset_index())

# we have a list of 200 dataframes - run as an inference
parallel_results = await pipeline.parallel_infer(dataset, timeout=10, num_parallel=8, retries=1)

Parallel Inference Returns

The await pipeline.parallel_infer method asynchronously returns a List of inference results. Results match the input types: pandas DataFrame inputs return pandas DataFrames, and Apache Arrow inputs return Apache Arrow objects. For example, a parallel inference request with 3 DataFrame tables in the list will return a list with 3 DataFrame tables.

Inference failures are tied to the object in the List that caused the failure. For example, given a List of [dataframe1, dataframe2, dataframe3] where dataframe2 is malformed, the List returned from await pipeline.parallel_infer would be [some inference result, error inference result, some inference result]. Results are returned in the same order as the data submitted.
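
A hedged sketch for post-processing the returned List, assuming DataFrame inputs so that successful results come back as pandas DataFrames; how error results are represented is an assumption here, so adapt the check to what your pipeline actually returns.

import pandas as pd

successes, failures = [], []
for index, result in enumerate(parallel_results):
    # assumption: successful DataFrame inferences come back as pandas DataFrames
    if isinstance(result, pd.DataFrame):
        successes.append(result)
    else:
        failures.append((index, result))  # position matches the submitted input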

Output Formats

DataFrame and Arrow

Output formats are based on the input types: pandas DataFrame inputs return pandas DataFrame, and Apache Arrow inputs return Apache Arrow objects.

The default columns returned are:

  • time: The DateTime of the inference request.
  • in: The input data.
  • out: The output data. Outputs of the inference are based on the model’s outputs as out.{model_output}. The example model used in this guide has only one output - dense_1, which is listed in the out.dense_1 column. If the model has multiple outputs, they would be listed as out.{outputname1}, out.{outputname2}, etc.
  • anomaly: Whether any pipeline validation parameters were triggered. Includes the following fields.
    • anomaly.count: The number of anomalies detected from validations.
    • anomaly.{validation}: The validation that triggered the anomaly detection and whether it is True (indicating an anomaly was detected) or False. For more details, see Wallaroo SDK Essentials Guide: Anomaly Detection.

Columns returned are controlled by the dataset_exclude array parameter, which specifies which output columns to ignore. For example, if a model outputs the columns out.rambo, out.main, out.glibnar, using the parameter dataset_exclude=["out.rambo", "out.glibnar"] will exclude those columns from the output.
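
Continuing that hypothetical model, the call would look like this:

# hedged sketch: drop two of the hypothetical model's output columns
results = pipeline.infer(input_data, dataset_exclude=["out.rambo", "out.glibnar"])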

Inference Requests via API Requests

Retrieve Token

There are two methods of retrieving the JWT token used to authenticate to the Wallaroo instance’s API service:

  • Wallaroo SDK. This method requires a Wallaroo based user.
  • API Client Secret. This is the recommended method as it is user independent. It allows any valid user to make an inference request.

This tutorial uses the Wallaroo SDK method for convenience, with environment variables for a seamless login without browser validation. For more information, see the Wallaroo SDK Essentials Guide: Client Connection.

API Request Methods

All Wallaroo API endpoints follow the format:

  • https://$URLPREFIX.api.$URLSUFFIX/v1/api$COMMAND

Where $COMMAND is the specific endpoint. For example, the command to list the workspaces in the Wallaroo instance would use the above format based on these settings:

  • $URLPREFIX: smooth-moose-1617
  • $URLSUFFIX: example.wallaroo.ai
  • $COMMAND: /workspaces/list

This would create the following API endpoint:

  • https://smooth-moose-1617.api.example.wallaroo.ai/v1/api/workspaces/list
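
A small sketch of assembling the endpoint in Python from those settings:

URLPREFIX = "smooth-moose-1617"
URLSUFFIX = "example.wallaroo.ai"
COMMAND = "/workspaces/list"

# yields https://smooth-moose-1617.api.example.wallaroo.ai/v1/api/workspaces/list
endpoint = f"https://{URLPREFIX}.api.{URLSUFFIX}/v1/api{COMMAND}"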

Connect to Wallaroo

For this example, a connection to the Wallaroo SDK is used. This will be used to retrieve the JWT token for the MLOps API calls.

This example will store the user’s credentials into the file ./creds.json which contains the following:

{
    "username": "{Connecting User's Username}", 
    "password": "{Connecting User's Password}", 
    "email": "{Connecting User's Email Address}"
}

Replace the username, password, and email fields with the user account connecting to the Wallaroo instance. This allows a seamless connection to the Wallaroo instance and bypasses the standard browser based confirmation link. For more information, see the Wallaroo SDK Essentials Guide: Client Connection.

Update wallarooPrefix = "YOUR PREFIX." and wallarooSuffix = "YOUR SUFFIX" to match the Wallaroo instance used for this demonstration. Note the . is part of the prefix. If there is no prefix, then wallarooPrefix = ""

import wallaroo
from wallaroo.object import EntityNotFoundError

import pandas as pd
import os
import base64

import pyarrow as pa

import requests
from requests.auth import HTTPBasicAuth

# Used to create unique workspace and pipeline names
import string
import random

# make a random 4 character suffix
suffix = ''.join(random.choice(string.ascii_lowercase) for i in range(4))
display(suffix)

'atwc'

import json

# used to display dataframe information without truncating
from IPython.display import display
pd.set_option('display.max_colwidth', None)
# Retrieve the login credentials.
os.environ["WALLAROO_SDK_CREDENTIALS"] = './creds.json.example'

# wl = wallaroo.Client(auth_type="user_password")

# Client connection from local Wallaroo instance
wallarooPrefix = ""
wallarooSuffix = "autoscale-uat-ee.wallaroo.dev"

wl = wallaroo.Client(api_endpoint=f"https://{wallarooPrefix}api.{wallarooSuffix}", 
                    auth_endpoint=f"https://{wallarooPrefix}keycloak.{wallarooSuffix}", 
                    auth_type="user_password")
wallarooPrefix = "YOUR PREFIX."
wallarooPrefix = "YOUR SUFFIX"

wallarooPrefix = ""
wallarooSuffix = "autoscale-uat-ee.wallaroo.dev"

APIURL=f"https://{wallarooPrefix}api.{wallarooSuffix}"
APIURL
'https://api.autoscale-uat-ee.wallaroo.dev'

Retrieve the JWT Token

As mentioned earlier, there are multiple methods of authenticating to the Wallaroo instance for MLOps API calls. This tutorial uses the Wallaroo SDK method wl.auth.auth_header(), extracting the token from the response.

Reference: MLOps API Retrieve Token Through Wallaroo SDK

# Retrieve the token
headers = wl.auth.auth_header()
display(headers)
{'Authorization': 'Bearer exampleabcdefg'}

Get External Inference URL

The API command /admin/get_pipeline_external_url retrieves the external inference URL for a specific pipeline in a workspace.

  • Parameters
    • workspace_id (REQUIRED integer): The workspace integer id.
    • pipeline_name (REQUIRED string): The name of the pipeline.

In this example, a list of the workspaces will be retrieved. Based on the setup from the Internal Pipeline Deployment URL Tutorial, the workspace matching urlworkspace will have its workspace id stored and used for the /admin/get_pipeline_external_url request with the pipeline urlpipeline.

The External Inference URL is stored in a variable for the next step.

Reference: Wallaroo MLOps API Essentials Guide: Pipeline Management: Get External Inference URL

# Retrieve the token
headers = wl.auth.auth_header()

# set Content-Type type
headers['Content-Type']='application/json'

## Retrieve the pipeline's External Inference URL

apiRequest = f"{APIURL}/v1/api/admin/get_pipeline_external_url"

data = {
    "workspace_id": workspaceId,
    "pipeline_name": pipeline_name
}

response = requests.post(apiRequest, json=data, headers=headers, verify=True).json()
deployurl = response['url']
deployurl
'https://api.autoscale-uat-ee.wallaroo.dev/v1/api/pipelines/infer/vsnaapiinferenceexamplepipeline-260/vsnaapiinferenceexamplepipeline'

Perform Inference via API

HTTP Headers

The following headers are required for connecting to the Pipeline Deployment URL:

  • Authorization: This requires the JWT token in the format 'Bearer ' + token. For example:

    Authorization: Bearer abcdefg==
    
  • Content-Type:

    • For DataFrame formatted JSON:

      Content-Type:application/json; format=pandas-records
      
    • For Arrow binary files, the Content-Type is application/vnd.apache.arrow.file.

      Content-Type:application/vnd.apache.arrow.file
      
  • Accept

    • Accept: application/json; format=pandas-records: The inference result is returned as a JSON in pandas Record format.
    • Accept: application/vnd.apache.arrow.file: The inference result is returned as a binary in Apache Arrow format.

The inference can now be performed through the External Inference URL. This URL will accept the same inference data file that is used with the Wallaroo SDK, or with an Internal Inference URL as used in the Internal Pipeline Inference URL Tutorial.

For this example, the deployurl retrieved through the Get External Inference URL request is used to submit inference requests in both pandas Record and Apache Arrow formats.

Reference: Wallaroo MLOps API Essentials Guide: Pipeline Management: Perform Inference Through External URL

Perform Inference via API Example: pandas Record

The following example demonstrates performing an inference using a pandas Record format input.

# Retrieve the token
headers = wl.auth.auth_header()

# set Content-Type type
headers['Content-Type']='application/json; format=pandas-records'

## Inference through external URL using dataframe

# retrieve the json data to submit
data = [
    {
        "tensor":[
            1.0678324729,
            0.2177810266,
            -1.7115145262,
            0.682285721,
            1.0138553067,
            -0.4335000013,
            0.7395859437,
            -0.2882839595,
            -0.447262688,
            0.5146124988,
            0.3791316964,
            0.5190619748,
            -0.4904593222,
            1.1656456469,
            -0.9776307444,
            -0.6322198963,
            -0.6891477694,
            0.1783317857,
            0.1397992467,
            -0.3554220649,
            0.4394217877,
            1.4588397512,
            -0.3886829615,
            0.4353492889,
            1.7420053483,
            -0.4434654615,
            -0.1515747891,
            -0.2668451725,
            -1.4549617756
        ]
    }
]

# submit the request via POST, import as pandas DataFrame
response = pd.DataFrame.from_records(
    requests.post(
        deployurl, 
        json=data, 
        headers=headers)
        .json()
    )

display(response.loc[:,["time", "out"]])
   | time          | out
0  | 1688750664105 | {'dense_1': [0.0014974177]}

Perform Inference via API Example: Apache Arrow

The following example demonstrates performing an inference using an Apache Arrow table as the input. The response is transformed into a pandas DataFrame for easier display.

# Retrieve the token
headers = wl.auth.auth_header()

# set Content-Type type
headers['Content-Type']='application/vnd.apache.arrow.file'

# set accept as apache arrow table
headers['Accept']="application/vnd.apache.arrow.file"

# Submit arrow file
dataFile="./data/cc_data_10k.arrow"

data = open(dataFile,'rb').read()

response = requests.post(
                    deployurl, 
                    headers=headers, 
                    data=data, 
                    verify=True
                )

# Arrow table is retrieved 
with pa.ipc.open_file(response.content) as reader:
    arrow_table = reader.read_all()

# convert to pandas DataFrame and display the first 5 rows
display(arrow_table.to_pandas().head(5).loc[:,["time", "out"]])
   | time          | out
0  | 1688750664889 | {'dense_1': [0.99300325]}
1  | 1688750664889 | {'dense_1': [0.99300325]}
2  | 1688750664889 | {'dense_1': [0.99300325]}
3  | 1688750664889 | {'dense_1': [0.99300325]}
4  | 1688750664889 | {'dense_1': [0.0010916889]}