Wallaroo SDK Essentials Guide: Inference Management

How to use Wallaroo SDK for inferences

Table of Contents

Inference Requests Via the Wallaroo SDK

Inferences are performed on deployed pipelines. This submits data to the pipeline, where it is processed through each of the pipeline’s steps with the output of the previous step providing the input for the next step. The final step will then output the result of all of the pipeline’s steps.

  • Inputs are either sent one of the following:

Apache Arrow is the recommended method of data inputs for inferences. Wallaroo inference data is based on Apache Arrow, which will return the fastest inference results and smaller data transfer amounts on average than JSON or DataFrame tables. Arrow tables also specify the data types used in their schema, insuring that the data sent and receives are exactly what is required. Using pandas DataFrame requires inferring the data type which may lead to data type mismatch issues.

For a complete example of using the Wallaroo SDK for inferencing, see the Wallaroo SDK Inference Tutorial.

Inference

The method wallaroo.pipeline.Pipeline.infer(data, timeout, dataset, dataset_exclude, dataset_separator) method performs an inference as defined by the pipeline steps.

Inference Parameters

wallaroo.pipeline.Pipeline.infer takes the following parameters:

ParameterTypeDescription
tensorpandas.DataFrame or Apache Arrow (Preferred) (REQUIRED)The data submitted to the pipeline for inference. The following data inputs are supported:
  • pandas.DataFrame: Data submitted as a pandas DataFrame are returned as a pandas DataFrame. For models that output one column based on the models outputs.
  • Apache Arrow (Preferred): Data submitted as an Apache Arrow are returned as an Apache Arrow.
timeout(Integer OR Float) (OPTIONAL)A timeout in seconds before the inference throws an exception. The default is 15 second per call to accommodate large, complex models. Note that for a batch inference, this is per call - with 10 inference requests, each would have a default timeout of 15 seconds.
datasetList[String] (OPTIONAL)The datasets to be returned. The datasets available are:
  • *: Default.This translates to ["time", "in", "out", "anomaly"].
  • time: The DateTime of the inference request.
  • in: All inputs listed as in.{variable_name}.
  • out: All outputs listed as out.{variable_name}.
  • anomaly: Flags whether an Anomaly was detected. Anomalies are detected from each pipeline validation returned True. For full details, see Wallaroo SDK Essentials Guide: Anomaly Detection. The following fields are included in this dataset.
    • count The number of anomalies detected as an integer. Each pipeline validation the returns True adds to the number of anomalies detected.
    • {validation name}: Each pipeline validation added to the pipeline is returned as the field anomaly.{validation name}. Validations that return True indicate an anomaly detected based on the validation expression, while False indicates no anomaly found for the validation.
  • metadata: IMPORTANT NOTE: See Metadata Requests Restrictions for specifications on how to the metadata dataset requests in combination with other fields.
    • Returns in the metadata.elapsed field:
      • A list of time in nanoseconds for:
        • The time to serialize the input.
        • How long each step took.
    • Returns in the metadata.last_model field:
      • A dict with each Python step as:
        • model_name: The name of the model in the pipeline step.
        • model_sha : The sha hash of the model in the pipeline step.
      • Returns in the metadata.pipeline_version field:
        • The pipeline version as a UUID value.
    • Returns in the metadata.partition:
    • Returns in the metadata.partition:
    • metadata.elapsed: See Metadata Requests Restrictions for specifications on how to use meta or metadata dataset requests in combination with other fields.
      • Returns in the metadata.elapsed field:
        • A list of time in nanoseconds for:
          • The time to serialize the input.
          • How long each step took.
dataset_excludeList[String] (OPTIONAL)Allows users to exclude parts of the dataset.
dataset_separatorList[String] (OPTIONAL)Allows other types of dataset separators to be used. If set to “.”, the returned dataset will be flattened.

Inference Returns

  • If the tensor input is a pandas DataFrame, returns a pandas DataFrame.
  • If the tensor input is an Apache Arrow Table, returns an Apache Arrow table.

The following datasets are returned by default, modified by the dataset and dataset_exclude parameters.

FieldDescription
timeThe DateTime of the inference request.
inAll inputs listed as in.{variable_name}
outAll outputs listed as out.{variable_name}.
anomalyFlags whether an Anomaly was detected. Anomalies are detected from each pipeline validation returned True. For full details, see Wallaroo SDK Essentials Guide: Anomaly Detection. The following fields are included in this dataset.

pipeline.infer metadata.elapsed Format

The inference result field metadata.elapsed format changes depending on the input type.

  • pandas DataFrame: If the inference request is in pandas Record format, metadata.elapsed is returned as an int.
  • Apache Arrow: If the inference request is an Apache Arrow table, metadata.elapsed is returned as pyarrow.Int64.

The following code is provided to convert metadata.elapsed into the same format for consistency.

import pyarrow as pa

if input_type == "json":
            parse_elapsed += results["metadata.elapsed"][0][0]
            inference_elapsed += results["metadata.elapsed"][0][1]
        elif input_type == "arrow":
            parse_elapsed += results["metadata.elapsed"][0][0].as_py()
            inference_elapsed += results["metadata.elapsed"][0][1].as_py()
        else:
            assert False, f"Bad input type {input_type}"
pipeline.infer metadata.elapsed Format Examples

The following demonstrates the differences between the metadata.elasped field from a DataFrame based inference request vs an Apache Arrow table inference request.

  • Apache Arrow based inference request.
result_arrow = ccfraud_pipeline.infer_from_file('./data/cc_data_10k.arrow', dataset="metadata")

# unconverted raw data from Arrow table inference

# time to parse input data

display(result_arrow["metadata.elapsed"][0][0])

# time to inference from parsed input data

display(result_arrow["metadata.elapsed"][0][1])

<pyarrow.UInt64Scalar: 1253219>

<pyarrow.UInt64Scalar: 1275320>

parse_elapsed = result_arrow["metadata.elapsed"][0][0].as_py()
display(f'Time to parse input data: {parse_elapsed}')

'Time to parse input data: 1253219'

display(f'Time to inference from parsed data: {result_arrow["metadata.elapsed"][0][0]}')

'Time to inference from parsed data: 1253219'
  • pandas DataFrame based inference request
# no conversion needed for pandas DataFrame

result_dataframe = ccfraud_pipeline.infer_from_file('./data/cc_data_10k.df.json', dataset=["*", "metadata"])

result_dataframe = ccfraud_pipeline.infer_from_file('./data/cc_data_10k.df.json', dataset=["*", "metadata"])

display(f"Time to parse input data: {result_dataframe['metadata.elapsed'][0][0]}")

display(f"Time to inference from parsed data: {result_dataframe['metadata.elapsed'][0][1]}")

'Time to parse input data: 51879691'

'Time to inference from parsed data: 2310435'

Inference Example

The following example is an inference request using an Apache Arrow table. The inference result is returned as an Apache Arrow table, which is then converted into a Pandas DataFrame and a Polars DataFrame, with the results filtered based on results greater than 0.75.

result = ccfraud_pipeline.infer(ccfraud_input_1k_arrow_table)

display(result)

pyarrow.Table
time: timestamp[ms]
in.tensor: list<item: float> not null
  child 0, item: float
out.dense_1: list<inner: float not null> not null
  child 0, inner: float not null
anomaly.count: int8
----
time: [[2023-03-20 18:55:09.562,2023-03-20 18:55:09.562,2023-03-20 18:55:09.562,2023-03-20 18:55:09.562,2023-03-20 18:55:09.562,...,2023-03-20 18:55:09.562,2023-03-20 18:55:09.562,2023-03-20 18:55:09.562,2023-03-20 18:55:09.562,2023-03-20 18:55:09.562]]
in.tensor: [[[-1.0603298,2.3544967,-3.5638788,5.138735,-1.2308457,...,0.038412016,1.0993439,1.2603409,-0.14662448,-1.4463212],[-1.0603298,2.3544967,-3.5638788,5.138735,-1.2308457,...,0.038412016,1.0993439,1.2603409,-0.14662448,-1.4463212],...,[0.49511018,-0.24993694,0.4553345,0.92427504,-0.36435103,...,1.1117147,-0.566654,0.12122019,0.06676402,0.6583282],[0.61188054,0.1726081,0.43105456,0.50321484,-0.27466634,...,0.30260187,0.081211455,-0.15578508,0.017189292,-0.7236631]]]
out.dense_1: [[[0.99300325],[0.99300325],...,[0.0008533001],[0.0012498498]]]
anomaly.count: [[0,0,0,0,0,...,0,0,0,0,0]]
import pyarrow as pa

list = [0.75]

outputs =  result.to_pandas()
# display(outputs)
filter = [elt[0] > 0.75 for elt in outputs['out.dense_1']]
outputs = outputs.loc[filter]
display(outputs)
&nbsptimein.tensorout.dense_1anomaly.count
02023-03-20 18:55:09.562[-1.0603298, 2.3544967, -3.5638788, 5.138735, -1.2308457, -0.76878244, -3.5881228, 1.8880838, -3.2789674, -3.9563255, 4.099344, -5.653918, -0.8775733, -9.131571, -0.6093538, -3.7480276, -5.0309124, -0.8748149, 1.9870535, 0.7005486, 0.9204423, -0.10414918, 0.32295644, -0.74181414, 0.038412016, 1.0993439, 1.2603409, -0.14662448, -1.4463212][0.99300325]0
12023-03-20 18:55:09.562[-1.0603298, 2.3544967, -3.5638788, 5.138735, -1.2308457, -0.76878244, -3.5881228, 1.8880838, -3.2789674, -3.9563255, 4.099344, -5.653918, -0.8775733, -9.131571, -0.6093538, -3.7480276, -5.0309124, -0.8748149, 1.9870535, 0.7005486, 0.9204423, -0.10414918, 0.32295644, -0.74181414, 0.038412016, 1.0993439, 1.2603409, -0.14662448, -1.4463212][0.99300325]0
22023-03-20 18:55:09.562[-1.0603298, 2.3544967, -3.5638788, 5.138735, -1.2308457, -0.76878244, -3.5881228, 1.8880838, -3.2789674, -3.9563255, 4.099344, -5.653918, -0.8775733, -9.131571, -0.6093538, -3.7480276, -5.0309124, -0.8748149, 1.9870535, 0.7005486, 0.9204423, -0.10414918, 0.32295644, -0.74181414, 0.038412016, 1.0993439, 1.2603409, -0.14662448, -1.4463212][0.99300325]0
32023-03-20 18:55:09.562[-1.0603298, 2.3544967, -3.5638788, 5.138735, -1.2308457, -0.76878244, -3.5881228, 1.8880838, -3.2789674, -3.9563255, 4.099344, -5.653918, -0.8775733, -9.131571, -0.6093538, -3.7480276, -5.0309124, -0.8748149, 1.9870535, 0.7005486, 0.9204423, -0.10414918, 0.32295644, -0.74181414, 0.038412016, 1.0993439, 1.2603409, -0.14662448, -1.4463212][0.99300325]0
1612023-03-20 18:55:09.562[-9.716793, 9.174981, -14.450761, 8.653825, -11.039951, 0.6602411, -22.825525, -9.919395, -8.064324, -16.737926, 4.852197, -12.563343, -1.0762653, -7.524591, -3.2938414, -9.62102, -15.6501045, -7.089741, 1.7687134, 5.044906, -11.365625, 4.5987034, 4.4777045, 0.31702697, -2.2731977, 0.07944675, -10.052058, -2.024108, -1.0611985][1.0]0
9412023-03-20 18:55:09.562[-0.50492376, 1.9348029, -3.4217603, 2.2165704, -0.6545315, -1.9004827, -1.6786858, 0.5380051, -2.7229102, -5.265194, 3.504164, -5.4661765, 0.68954825, -8.725291, 2.0267954, -5.4717045, -4.9123807, -1.6131229, 3.8021576, 1.3881834, 1.0676425, 0.28200775, -0.30759808, -0.48498034, 0.9507336, 1.5118006, 1.6385275, 1.072455, 0.7959132][0.9873102]0
import polars as pl

outputs =  pl.from_arrow(result)

display(outputs.filter(pl.col("out.dense_1").apply(lambda x: x[0]) > 0.75))
timein.tensorout.dense_1anomaly.count
datetime[ms]list[f32]list[f32]i8
2023-03-20 18:55:09.562[-1.06033, 2.354497, … -1.446321][0.993003]0
2023-03-20 18:55:09.562[-1.06033, 2.354497, … -1.446321][0.993003]0
2023-03-20 18:55:09.562[-1.06033, 2.354497, … -1.446321][0.993003]0
2023-03-20 18:55:09.562[-1.06033, 2.354497, … -1.446321][0.993003]0
2023-03-20 18:55:09.562[-9.716793, 9.174981, … -1.061198][1.0]0
2023-03-20 18:55:09.562[-0.504924, 1.934803, … 0.795913][0.98731]0

Metadata Requests Restrictions

The following restrictions are in place when requesting the datasets metadata or metadata.elapsed.

Standard Pipeline Steps

For the following Pipeline steps, metadata or metadata.elapsed must be requested with the * parameter. For example:

result = mainpipeline.infer(normal_input, dataset=["*", "metadata.elapsed"])

Effected pipeline steps:

  • add_model_step
  • replace_with_model_step
Testing Pipeline Steps

For the following Pipeline steps, meta or metadata.elapsed can not be included with the * parameter. For example:

result = mainpipeline.infer(normal_input, dataset=["metadata.elapsed"])

Effected pipeline steps:

  • add_random_split
  • replace_with_random_split
  • add_shadow_deploy
  • replace_with_shadow_deploy

Numpy Arrays as Inputs

Numpy arrays can be submitted as an input by containing it within a DataFrame. In this example, the input column is tensor, but can whatever the model expects.

dataframedata = pd.DataFrame({"tensor":[npArray]})

This bypasses the need to convert the npArray to a List - the object itself can be embedded into the DataFrame table and submitted. For this example, a DataFrame with the column tensor that contains a numpy array will be submitted as an inference, and from the return only the column out.2519 will be displayed.

infResults = pipeline.infer(dataframedata, dataset=["*", "metadata.elapsed"])
display(infResults.loc[0]["out.2519"])

[44,
 44,
 44,
 44,
 82,
 44,
 44,
 44,
 44,
 44,
 44,
 44,
 44,
 44,
 44,
 44,
 44,
 44,
 44,
 84,
 84,
 44,
 84,
 44,
 44,
 44,
 61,
 44,
 86,
 44,
 44]

Inference from File

To submit a data file directly to a pipeline, use the method wallaroo.pipeline.Pipeline.infer_from_file(data, timeout, dataset, dataset_exclude, dataset_separator) method.

Inference from File Parameters

infer_from_file takes the following arguments.

ParameterTypeDescription
filenameString* (REQUIRED)The data submitted to the pipeline for inference as either a pandas Record in JSON format, or Apache Arrow Table. The following data inputs are supported:
  • pandas.DataFrame: Data submitted as a pandas DataFrame are returned as a pandas DataFrame. For models that output one column based on the models outputs.
  • Apache Arrow (Preferred): Data submitted as an Apache Arrow are returned as an Apache Arrow.
timeout(Integer or Float) (OPTIONAL)A timeout in seconds before the inference throws an exception. The default is 15 second per call to accommodate large, complex models. Note that for a batch inference, this is per call - with 10 inference requests, each would have a default timeout of 15 seconds.
datasetList[String] (OPTIONAL)The datasets to be returned. The datasets available are:
  • *: Default.This translates to ["time", "in", "out", "anomaly"].
  • time: The DateTime of the inference request.
  • in: All inputs listed as in.{variable_name}.
  • out: All outputs listed as out.{variable_name}.
  • anomaly: Flags whether an Anomaly was detected. Anomalies are detected from each pipeline validation returned True. For full details, see Wallaroo SDK Essentials Guide: Anomaly Detection. The following fields are included in this dataset.
    • count The number of anomalies detected as an integer. Each pipeline validation the returns True adds to the number of anomalies detected.
    • {validation name}: Each pipeline validation added to the pipeline is returned as the field anomaly.{validation name}. Validations that return True indicate an anomaly detected based on the validation expression, while False indicates no anomaly found for the validation.
  • metadata: IMPORTANT NOTE: See Metadata Requests Restrictions for specifications on how to the metadata dataset requests in combination with other fields.
    • Returns in the metadata.elapsed field:
      • A list of time in nanoseconds for:
        • The time to serialize the input.
        • How long each step took.
    • Returns in the metadata.last_model field:
      • A dict with each Python step as:
        • model_name: The name of the model in the pipeline step.
        • model_sha : The sha hash of the model in the pipeline step.
      • Returns in the metadata.pipeline_version field:
        • The pipeline version as a UUID value.
    • Returns in the metadata.partition:
    • Returns in the metadata.partition:
    • metadata.elapsed: See Metadata Requests Restrictions for specifications on how to use meta or metadata dataset requests in combination with other fields.
      • Returns in the metadata.elapsed field:
        • A list of time in nanoseconds for:
          • The time to serialize the input.
          • How long each step took.
dataset_excludeList[String] (OPTIONAL)Allows users to exclude parts of the dataset.
dataset_separatorList[String] (OPTIONAL)Allows other types of dataset separators to be used. If set to “.”, the returned dataset will be flattened.

Inference From File Returns

Inference Returns

  • If the tensor input is a pandas DataFrame, returns a pandas DataFrame.
  • If the tensor input is an Apache Arrow Table, returns an Apache Arrow table.

The following datasets are returned by default, modified by the dataset and dataset_exclude parameters.

FieldDescription
timeThe DateTime of the inference request.
inAll inputs listed as in.{variable_name}
outAll outputs listed as out.{variable_name}.
anomalyFlags whether an Anomaly was detected. Anomalies are detected from each pipeline validation returned True. For full details, see Wallaroo SDK Essentials Guide: Anomaly Detection. The following fields are included in this dataset.

pipeline.infer_from_file metadata.elapsed Format

The inference result field metadata.elapsed format changes depending on the input type.

  • pandas DataFrame: If the inference request is in pandas Record format, metadata.elapsed is returned as an int.
  • Apache Arrow: If the inference request is an Apache Arrow table, metadata.elapsed is returned as pyarrow.Int64.

The following code is provided to convert metadata.elapsed into the same format for consistency.

import pyarrow as pa

if input_type == "json":
            parse_elapsed += results["metadata.elapsed"][0][0]
            inference_elapsed += results["metadata.elapsed"][0][1]
        elif input_type == "arrow":
            parse_elapsed += results["metadata.elapsed"][0][0].as_py()
            inference_elapsed += results["metadata.elapsed"][0][1].as_py()
        else:
            assert False, f"Bad input type {input_type}"
pipeline.infer_from_file metadata.elapsed Format Examples

The following demonstrates the differences between the metadata.elasped field from a DataFrame based inference request vs an Apache Arrow table inference request.

  • Apache Arrow based inference request.
result_arrow = ccfraud_pipeline.infer_from_file('./data/cc_data_10k.arrow', dataset="metadata")

# unconverted raw data from Arrow table inference

# time to parse input data

display(result_arrow["metadata.elapsed"][0][0])

# time to inference from parsed input data

display(result_arrow["metadata.elapsed"][0][1])

<pyarrow.UInt64Scalar: 1253219>

<pyarrow.UInt64Scalar: 1275320>

parse_elapsed = result_arrow["metadata.elapsed"][0][0].as_py()
display(f'Time to parse input data: {parse_elapsed}')

'Time to parse input data: 1253219'

display(f'Time to inference from parsed data: {result_arrow["metadata.elapsed"][0][0]}')

'Time to inference from parsed data: 1253219'
  • pandas DataFrame based inference request
# no conversion needed for pandas DataFrame

result_dataframe = ccfraud_pipeline.infer_from_file('./data/cc_data_10k.df.json', dataset=["*", "metadata"])

result_dataframe = ccfraud_pipeline.infer_from_file('./data/cc_data_10k.df.json', dataset=["*", "metadata"])

display(f"Time to parse input data: {result_dataframe['metadata.elapsed'][0][0]}")

display(f"Time to inference from parsed data: {result_dataframe['metadata.elapsed'][0][1]}")

'Time to parse input data: 51879691'

'Time to inference from parsed data: 2310435'

Inference From File Example

In this example, an inference of 50K inferences as an Apache Arrow file will be submitted to a model trained for reviewing IMDB reviews, and the first 5 results displayed.

results = imdb_pipeline.infer_from_file('./data/test_data_50K.arrow')
import polars as pl

outputs =  pl.from_arrow(results)
display(outputs.head(5))

shape: (5, 4)
timein.tensorout.dense_1anomaly.count
datetime[ms]list[f32]list[f32]i8
2023-03-20 20:53:50.170[11.0, 6.0, … 0.0][0.898019]0
2023-03-20 20:53:50.170[54.0, 548.0, … 20.0][0.056597]0
2023-03-20 20:53:50.170[1.0, 9259.0, … 1.0][0.92608]0
2023-03-20 20:53:50.170[10.0, 25.0, … 0.0][0.926919]0
2023-03-20 20:53:50.170[10.0, 37.0, … 0.0][0.661858]0

In this example, an inference will be submitted to the ccfraud_pipeline with the file smoke_test.df.json, a DataFrame formatted JSON file.

result = ccfraud_pipeline.infer_from_file('./data/smoke_test.df.json')
 timein.tensorout.dense_1anomaly.count
02023-02-15 23:07:07.497[1.0678324729, 0.2177810266, -1.7115145262, 0.682285721, 1.0138553067, -0.4335000013, 0.7395859437, -0.2882839595, -0.447262688, 0.5146124988, 0.3791316964, 0.5190619748, -0.4904593222, 1.1656456469, -0.9776307444, -0.6322198963, -0.6891477694, 0.1783317857, 0.1397992467, -0.3554220649, 0.4394217877, 1.4588397512, -0.3886829615, 0.4353492889, 1.7420053483, -0.4434654615, -0.1515747891, -0.2668451725, -1.4549617756][0.0014974177]0

Asynchronous Inference

The method async wallaroo.pipeline.Pipeline.async_infer provides asynchronous inference requests to a deployed model. This allows organizations to submit inference requests, proceed with other tasks, then respond when the inference request is complete.

Asynchronous Inference Parameters

async_infer takes the following parameters.

ParameterTypeDescription
tensorpandas.core.frame.DataFrame, pyarrow.lib.Table (Required)Accepts either a pandas DataFrame or an Apache Arrow Table.
async_clienthttpx.AsyncClient (Required)AsyncClient to use for async inference.
timeoutInteger, Float, None (Optional Default: 15)The timeout period for an inference request in seconds.
retriesInteger, None (Optional)The number of retries from connection errors.
datasetList[String], None (Optional)The datasets to return. See DataFrame and Arrow for details on Wallaroo inference datasets.
dataset_excludeList[String], None (Optional)The datasets to exclude. See DataFrame and Arrow for details on Wallaroo inference datasets.
dataset_separatorString (Optional)Allows other types of dataset separators to be used. If set to “.”, the returned dataset will be flattened.

Asynchronous Inference Returns

async_infer returns:

  • If the tensor input is a pandas DataFrame, returns a pandas DataFrame.
  • If the tensor input is an Apache Arrow Table, returns an Apache Arrow table.

The following datasets are returned by default, modified by the dataset and dataset_exclude parameters.

FieldDescription
timeThe DateTime of the inference request.
inAll inputs listed as in.{variable_name}
outAll outputs listed as out.{variable_name}.
anomalyFlags whether an Anomaly was detected. Anomalies are detected from each pipeline validation returned True. For full details, see Wallaroo SDK Essentials Guide: Anomaly Detection. The following fields are included in this dataset.

Asynchronous Inference Example

The following example demonstrates using async_infer to perform an inference request with other tasks executed between the inference request and receiving the result.

For this example:

  • The method run_one_batch reads an Apache Arrow Table and submits it through async_infer.
  • Three files are provided and used as inputs to execute run_one_batch three times.
  • The results from each async_infer are gathered together and published as one set.
async def run_one_batch(file):
    with pa.ipc.open_file(file) as source:
        table = source.read_all() # to get pyarrow table

    # ... potentially do other processing here or request data from a feature store ...
    
    result_async_batch = await aloha_pipeline.async_infer(tensor=table,
                                                      async_client=async_client,
                                                      retries=2)
    return result_async_batch

# Process three batches in parallel. Inference on the first will start while
# the second file is read, and so on. Multiple inferences may be running at
# the same time if the engine replicas are > 1.
tasks = []
for file in ["./data/data_25k.arrow", "./data/data_1k.arrow", "./data/data_25k.arrow"]:
     tasks.append(run_one_batch(file))

# Wait for all three batches to finish.
results = await asyncio.gather(*tasks, return_exceptions=True)

for result in results:
    print(f"Number of rows for task: {result.num_rows}")
    display(result)
Number of rows for task: 24954
pyarrow.Table
time: timestamp[ms]
in.text_input: list<item: float>
  child 0, item: float
out.banjori: list<inner: float not null> not null
  child 0, inner: float not null
out.corebot: list<inner: float not null> not null
  child 0, inner: float not null
out.cryptolocker: list<inner: float not null> not null
  child 0, inner: float not null
out.dircrypt: list<inner: float not null> not null
  child 0, inner: float not null
out.gozi: list<inner: float not null> not null
  child 0, inner: float not null
out.kraken: list<inner: float not null> not null
  child 0, inner: float not null
out.locky: list<inner: float not null> not null
  child 0, inner: float not null
out.main: list<inner: float not null> not null
  child 0, inner: float not null
out.matsnu: list<inner: float not null> not null
  child 0, inner: float not null
out.pykspa: list<inner: float not null> not null
  child 0, inner: float not null
out.qakbot: list<inner: float not null> not null
  child 0, inner: float not null
out.ramdo: list<inner: float not null> not null
  child 0, inner: float not null
out.ramnit: list<inner: float not null> not null
  child 0, inner: float not null
out.simda: list<inner: float not null> not null
  child 0, inner: float not null
out.suppobox: list<inner: float not null> not null
  child 0, inner: float not null
anomaly.count: uint32 not null
----
time: [[2024-09-30 17:53:07.406,2024-09-30 17:53:07.406,2024-09-30 17:53:07.406,2024-09-30 17:53:07.406,2024-09-30 17:53:07.406,...,2024-09-30 17:53:07.406,2024-09-30 17:53:07.406,2024-09-30 17:53:07.406,2024-09-30 17:53:07.406,2024-09-30 17:53:07.406]]
in.text_input: [[[0,0,0,0,0,...,32,30,19,26,17],[0,0,0,0,0,...,18,35,18,22,23],...,[0,0,0,0,0,...,27,28,19,33,23],[0,0,0,0,0,...,24,29,14,36,13]]]
out.banjori: [[[0.0015195814],[7.447168e-18],...,[1.342218e-14],[1.3068625e-12]]]
out.corebot: [[[0.98291475],[6.7359245e-8],...,[7.969789e-9],[1.1029467e-9]]]
out.cryptolocker: [[[0.012099549],[0.17081994],...,[0.14756858],[0.014839977]]]
out.dircrypt: [[[0.000047591115],[1.3220122e-9],...,[4.813928e-9],[2.2757316e-8]]]
out.gozi: [[[0.000020289312],[1.2758657e-24],...,[1.0541016e-22],[8.438438e-15]]]
out.kraken: [[[0.00031977257],[0.22559547],...,[0.2969691],[0.30495816]]]
out.locky: [[[0.011029262],[0.3420985],...,[0.2706808],[0.11627986]]]
out.main: [[[0.997564],[0.99999994],...,[1],[0.99999803]]]
...
Number of rows for task: 1000
pyarrow.Table
time: timestamp[ms]
in.text_input: list<item: float>
  child 0, item: float
out.banjori: list<inner: float not null> not null
  child 0, inner: float not null
out.corebot: list<inner: float not null> not null
  child 0, inner: float not null
out.cryptolocker: list<inner: float not null> not null
  child 0, inner: float not null
out.dircrypt: list<inner: float not null> not null
  child 0, inner: float not null
out.gozi: list<inner: float not null> not null
  child 0, inner: float not null
out.kraken: list<inner: float not null> not null
  child 0, inner: float not null
out.locky: list<inner: float not null> not null
  child 0, inner: float not null
out.main: list<inner: float not null> not null
  child 0, inner: float not null
out.matsnu: list<inner: float not null> not null
  child 0, inner: float not null
out.pykspa: list<inner: float not null> not null
  child 0, inner: float not null
out.qakbot: list<inner: float not null> not null
  child 0, inner: float not null
out.ramdo: list<inner: float not null> not null
  child 0, inner: float not null
out.ramnit: list<inner: float not null> not null
  child 0, inner: float not null
out.simda: list<inner: float not null> not null
  child 0, inner: float not null
out.suppobox: list<inner: float not null> not null
  child 0, inner: float not null
anomaly.count: uint32 not null
----
time: [[2024-09-30 17:53:07.384,2024-09-30 17:53:07.384,2024-09-30 17:53:07.384,2024-09-30 17:53:07.384,2024-09-30 17:53:07.384,...,2024-09-30 17:53:07.384,2024-09-30 17:53:07.384,2024-09-30 17:53:07.384,2024-09-30 17:53:07.384,2024-09-30 17:53:07.384]]
in.text_input: [[[0,0,0,0,0,...,32,30,19,26,17],[0,0,0,0,0,...,29,12,36,31,12],...,[0,0,0,0,0,...,35,16,35,27,16],[0,0,0,0,0,...,24,29,14,36,13]]]
out.banjori: [[[0.0015195814],[0.00002837503],...,[0.0000056315566],[1.3068625e-12]]]
out.corebot: [[[0.98291475],[0.000012753118],...,[0.0000033642746],[1.1029468e-9]]]
out.cryptolocker: [[[0.012099549],[0.025435215],...,[0.13612255],[0.014839977]]]
out.dircrypt: [[[0.000047591115],[6.150966e-10],...,[5.6732154e-11],[2.2757316e-8]]]
out.gozi: [[[0.000020289312],[2.321774e-10],...,[2.773063e-8],[8.43847e-15]]]
out.kraken: [[[0.00031977257],[0.051351104],...,[0.0025221605],[0.30495816]]]
out.locky: [[[0.011029262],[0.022038758],...,[0.05455697],[0.116279885]]]
out.main: [[[0.997564],[0.9885122],...,[0.9998954],[0.99999803]]]
...
Number of rows for task: 24954
pyarrow.Table
time: timestamp[ms]
in.text_input: list<item: float>
  child 0, item: float
out.banjori: list<inner: float not null> not null
  child 0, inner: float not null
out.corebot: list<inner: float not null> not null
  child 0, inner: float not null
out.cryptolocker: list<inner: float not null> not null
  child 0, inner: float not null
out.dircrypt: list<inner: float not null> not null
  child 0, inner: float not null
out.gozi: list<inner: float not null> not null
  child 0, inner: float not null
out.kraken: list<inner: float not null> not null
  child 0, inner: float not null
out.locky: list<inner: float not null> not null
  child 0, inner: float not null
out.main: list<inner: float not null> not null
  child 0, inner: float not null
out.matsnu: list<inner: float not null> not null
  child 0, inner: float not null
out.pykspa: list<inner: float not null> not null
  child 0, inner: float not null
out.qakbot: list<inner: float not null> not null
  child 0, inner: float not null
out.ramdo: list<inner: float not null> not null
  child 0, inner: float not null
out.ramnit: list<inner: float not null> not null
  child 0, inner: float not null
out.simda: list<inner: float not null> not null
  child 0, inner: float not null
out.suppobox: list<inner: float not null> not null
  child 0, inner: float not null
anomaly.count: uint32 not null
----
time: [[2024-09-30 17:53:07.395,2024-09-30 17:53:07.395,2024-09-30 17:53:07.395,2024-09-30 17:53:07.395,2024-09-30 17:53:07.395,...,2024-09-30 17:53:07.395,2024-09-30 17:53:07.395,2024-09-30 17:53:07.395,2024-09-30 17:53:07.395,2024-09-30 17:53:07.395]]
in.text_input: [[[0,0,0,0,0,...,32,30,19,26,17],[0,0,0,0,0,...,18,35,18,22,23],...,[0,0,0,0,0,...,27,28,19,33,23],[0,0,0,0,0,...,24,29,14,36,13]]]
out.banjori: [[[0.0015195814],[7.447168e-18],...,[1.342218e-14],[1.3068625e-12]]]
out.corebot: [[[0.98291475],[6.7359245e-8],...,[7.969789e-9],[1.1029467e-9]]]
out.cryptolocker: [[[0.012099549],[0.17081994],...,[0.14756858],[0.014839977]]]
out.dircrypt: [[[0.000047591115],[1.3220122e-9],...,[4.813928e-9],[2.2757316e-8]]]
out.gozi: [[[0.000020289312],[1.2758657e-24],...,[1.0541016e-22],[8.438438e-15]]]
out.kraken: [[[0.00031977257],[0.22559547],...,[0.2969691],[0.30495816]]]
out.locky: [[[0.011029262],[0.3420985],...,[0.2706808],[0.11627986]]]
out.main: [[[0.997564],[0.99999994],...,[1],[0.99999803]]]
...

Parallel Inference

The await wallaroo.pipeline.Pipeline.parallel_infer(tensor, timeout, num_parallel, retries) asynchronous method performs an inference as defined by the pipeline steps.

Parallel Inference Parameters

ParameterTypeDescription
tensorpandas.DataFrame OR pyarrow.Table (Required)The data submitted to the pipeline for inference a pandas.DataFrame or Apache Arrow pyarrow.Table
timeoutInteger (Optional)A timeout in seconds before the inference throws an exception. The default is 15 second per call to accommodate large, complex models. Note that for a batch inference, this is per list item - with 10 inference requests, each would have a default timeout of 15 seconds.
num_parallelInteger (Optional)The number of parallel threads used for the submission. This should be no more than four times the number of pipeline replicas.
retriesInteger (Optional)The number of retries per inference request submitted.
datasetList(String) (Optional)The dataset of the inference result to return. By default this is set to ["*"] which returns [“time”, “in”, “out”, “anomaly”]. Other available datasets - [“metadata”]. IMPORTANT NOTE: 'time' must be included in the returned datasets for the Wallaroo SDK, or the SDK will return an error. n. For example: dataset=['time', 'out'] returned the datasets time and in, and no others.
dataset_excludeList(String) (Optional)The datasets to exclude in the inference results returned values. For example: dataset=['*'], dataset_exclude['in'] would exclude the in dataset from being returned, but include the default datasets of [’time’, ‘out’, ‘anomaly’]. IMPORTANT NOTE: 'time' can not be excluded for the Wallaroo SDK, or the SDK will return an error.

Parallel Inference Returns

wallaroo.pipeline.Pipeline.parallel_infer returns one of the following based on the tensor input parameter.

  • If a pandas DataFrame was submitted, a pandas DataFrame is returned.
  • If an Apache Arrow table was submitted, an Apache Arrow table is returned.

The following fields are returned based on the dataset and dataset_exclude parameters.

ParameterTypeDescription
timeDateTimeThe DateTime of the returned inference result.
inAnyThe input parameters. Each in dataset field correlates to the input field from the inference request. For example, the inputs ['year_built', 'last_renovated', 'number_of_bedrooms'] generates the in dataset `[‘in.year_built’, ‘in.last_renovated’, ‘in.number_of_bedrooms’]
outAnyThe output parameters. Each out dataset field correlates to the final output of the pipeline. For example, if the final output is ['house_price', 'recommended_starting_bid'], the output fields would be ['out.house_price', 'out.recommended_starting_bid].
anomalyAnyThe detected anomalies based on the validations added to the pipeline. The field anomaly.count is always included, which displays a count of all validations that returned True, which indicates a detected anomaly. Other anomalies are listed by anomaly.{validation_name}. For more information, see Wallaroo Anomaly Detection.
metaDictIncludes the following fields with the metadata dataset.
  1. metadata.elapsed:A List of time in nanoseconds for:
    1. The time to serialize the input.
    2. How long each step took.
  2. metadata.last_model: A dict with each Python step as:
    1. model_name: The name of the model in the pipeline step.
    2. model_sha : The sha hash of the model in the pipeline step.
  3. metadata.pipeline_version: The pipeline version as a UUID value.
  4. metadata.partition: The partition used to store the inference results from this pipeline. This is mainly used when adding Wallaroo Server edge deployments to a published pipeline and separating the inference results from those edge deployments.

Parallel Inference Examples

Sequential Inference Example

The first example will show taking a pandas DataFrame with 25,000 rows, splitting those into 25,000 separate DataFrames with one row each, then submitting them sequentially. This simulates a situation where the data input sizes are so large they must be broken up for more efficient transmission and inferencing.

test_data = pd.read_json("./data/data_25k.df.json")
test_list = []

for index, row in test_data.head(1000).iterrows():
    test_list.append(row.to_frame('text_input').reset_index(drop=True))
# show the first row as an example
test_list[0]
text_input
0[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 28, 16, 32, 23, 29, 32, 30, 19, 26, 17]
#
# Run the inference sequentially to establish a baseline
#
now = datetime.datetime.now()

results = []
for df in test_list:
    results.append(aloha_pipeline.infer(tensor=df, timeout=10))

total_sequential = datetime.datetime.now() - now

print(f"Elapsed = {total_sequential.total_seconds()} : {len(results)}")
Elapsed = 37.917126 : 1000
Parallel Infer with DataFrame Example

The following example shows using wallaroo.pipeline.Pipeline.parallel_infer with a pandas DataFrames. It is automatically split for parallel inferences by the Wallaroo SDK.

We then compare it to the List of pandas DataFrames submitted sequentially.

timeout_secs=30
now = datetime.datetime.now()
parallel_results = await aloha_pipeline.parallel_infer(tensor=test_data.head(1000), 
                                                       timeout=timeout_secs, 
                                                       num_parallel=2*REPLICAS, 
                                                       retries=3)

total_parallel = datetime.datetime.now() - now
print(f"Elapsed_in_parallel = {total_parallel.total_seconds()} : {len(parallel_results)}")
Elapsed_in_parallel = 7.691442 : 1000
parallel_results.head(20)
timein.text_inputout.banjoriout.corebotout.cryptolockerout.dircryptout.goziout.krakenout.lockyout.mainout.matsnuout.pykspaout.qakbotout.ramdoout.ramnitout.simdaout.suppoboxanomaly.count
02024-03-08 15:32:26.253[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 28, 16, 32, 23, 29, 32, 30, 19, 26, 17][0.0015195842][0.98291475][0.012099553][4.7591206e-05][2.0289332e-05][0.00031977228][0.011029261][0.997564][0.010341614][0.008038961][0.016155045][0.0062362333][0.0009985747][1.7933435e-26][1.3889844e-27]0
12024-03-08 15:32:26.253[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 30, 20, 19, 27, 14, 17, 24, 13, 23, 20, 18, 35, 18, 22, 23][7.447168e-18][6.735899e-08][0.17081985][1.3220147e-09][1.2758705e-24][0.22559549][0.3420985][0.99999994][0.3080186][0.1828217][3.8022332e-11][0.2062254][0.15215823][1.1701893e-30][3.1513734e-38]0
22024-03-08 15:32:26.253[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 25, 33, 25, 36, 25, 31, 14, 32, 36, 25, 12, 35, 34, 30, 28, 27, 24, 29, 27][2.8598976e-21][9.301987e-08][0.04445297][6.163758e-09][8.3497386e-23][0.48234487][0.263329][1.0][0.29800338][0.22361773][1.5238979e-06][0.32820383][0.029332481][1.1995622e-31][0.0]0
32024-03-08 15:32:26.253[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 17, 23, 22, 15, 12, 35, 34, 36, 12, 18, 24, 34, 32, 36, 12, 14, 16, 27, 22, 23][2.1387213e-15][3.8817338e-10][0.04559972][1.9090405e-07][1.3140474e-25][0.5954264][0.17374131][0.9999997][0.23151566][0.1759168][1.0876193e-09][0.21832275][0.012869264][6.1588803e-28][1.4386127e-35]0
42024-03-08 15:32:26.280[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32, 13, 14, 12, 33, 16, 23, 15, 22, 30, 28, 26, 12, 16, 32, 37, 29, 22, 28, 22, 16, 27, 32][9.4533425e-15][7.091165e-10][0.04981512][5.2914135e-09][7.413152e-19][1.5504575e-13][1.0791892e-15][0.9999989][1.5003076e-15][0.3307571][2.625885e-07][0.50362796][0.020393759][0.0][2.329197e-38]0
52024-03-08 15:32:26.293[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 17, 29, 20, 33, 13, 36, 35, 30, 21, 29, 17, 26, 19, 25, 36, 14, 23, 16, 18, 15, 21, 18, 28, 35, 19][1.7247285e-17][8.1354194e-08][0.013697129][5.6086392e-11][1.4032912e-17][0.4946911][0.11978862][0.99999994][0.19000016][0.10596694][5.524429e-06][0.24210057][0.0069435085][1.2804814e-34][9.482465e-35]0
62024-03-08 15:32:26.299[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 22, 36, 14, 12, 23, 14, 13, 20, 20, 23, 27, 36, 29, 35, 19, 33, 22, 25, 26, 32, 21][5.5500585e-18][3.3608643e-07][0.023452938][1.1318812e-10][1.0496957e-22][0.23692927][0.064456925][0.99999183][0.07306592][0.06499427][1.4302767e-08][0.11925243][0.0011031023][1.5206224e-32][0.0]0
72024-03-08 15:32:26.301[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 36, 22, 28, 23, 20, 25, 21, 20, 16, 12, 33, 21, 14, 34, 34, 32, 19, 36, 17, 29, 26, 14, 29][3.9222568e-18][1.407435e-10][0.010946895][8.202812e-11][2.454965e-24][0.42107278][0.071240015][0.9982491][0.118182994][0.08340969][1.9207886e-09][0.16958171][0.0005199058][0.0][0.0]0
82024-03-08 15:32:26.302[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 27, 30, 33, 29, 37, 24, 33, 16, 20, 24][4.0574426e-11][1.087887e-09][0.17916852][1.7313088e-06][8.697261e-18][9.197122e-16][3.8521073e-17][0.9999977][3.2654394e-17][0.32568428][6.834302e-09][0.37007827][0.44918332][0.0][2.082403e-26]0
92024-03-08 15:32:26.311[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 35, 29, 19, 35, 31, 15, 14, 21, 26, 31, 34, 27, 22][2.2576288e-09][2.0812616e-09][0.17788404][1.1887528e-08][1.078572e-11][0.041252796][0.21430437][0.9999988][0.17853741][0.13382334][0.000114089744][0.14033836][0.011299953][3.575825e-24][7.164664e-24]0
102024-03-08 15:32:26.324[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 25, 25, 19, 26, 30, 19, 29, 37, 20, 24, 23, 22, 20, 20, 12, 35, 29, 26, 16, 35, 36, 32, 23, 19][7.892642e-12][3.0390834e-07][0.015696561][5.4462657e-13][1.2192533e-22][2.9611054e-17][2.630555e-20][0.9999961][6.9846006e-20][0.28895643][1.8219538e-10][0.5132747][0.03162864][0.0][6.496134e-32]0
112024-03-08 15:32:26.324[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 35, 33, 34, 15, 23, 28, 34, 18, 33, 33][2.6560714e-16][5.9408145e-09][0.12814313][3.3345504e-08][2.2118923e-18][0.3078206][0.27681428][0.9999999][0.27904558][0.17737378][7.047457e-08][0.17205144][0.20136176][3.6787982e-29][4.919293e-33]0
122024-03-08 15:32:26.332[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 20, 28, 30, 13, 26, 28, 30, 33, 25, 30, 31, 30, 26, 34, 19, 18, 23, 18, 15][1.9262531e-07][0.00011627592][0.015093419][6.0622415e-06][2.7445957e-08][0.1944085][0.11690311][0.9999991][0.17412043][0.06493864][0.49536943][0.08959357][0.005527823][2.4333167e-38][1.3592863e-25]0
132024-03-08 15:32:26.340[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 23, 19, 17, 22, 25, 35, 29, 26, 15][1.8286044e-05][0.00021055655][0.012560264][1.669594e-12][1.2260803e-07][0.007982212][0.01670425][0.017594405][0.017098006][0.011611045][0.00011716164][0.009795011][0.010660369][3.187273e-35][6.004795e-27]0
142024-03-08 15:32:26.364[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 36, 22, 17, 34, 27, 18, 18, 30][3.6237112e-22][1.0416503e-05][0.3348774][2.1746243e-06][8.617319e-23][0.029006457][0.20757225][0.99999344][0.13615955][0.08263349][2.8077036e-09][0.056751817][0.100090414][1.0977599e-18][1.6076299e-32]0
152024-03-08 15:32:26.369[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 21, 19, 31, 31, 19, 36, 22, 12, 37, 18, 22, 31, 29][2.6339812e-11][3.014685e-10][0.04157271][2.9721992e-11][4.1457778e-19][2.8498805e-12][1.0917219e-13][0.99999815][1.5328618e-13][0.15687591][6.499695e-07][0.2797901][0.07243411][6.264585e-28][3.7361817e-33]0
162024-03-08 15:32:26.371[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21, 15, 26, 15, 14, 19, 32, 24, 33, 13, 20, 22, 32, 14, 25, 26, 35, 22, 12, 31, 23, 19, 31][2.3916345e-11][1.0221278e-06][0.0036410673][3.0198066e-10][6.5029376e-10][0.01702937][0.024708282][0.99999654][0.031047806][0.029724406][1.1598447e-05][0.053846903][6.46801e-05][1.9701536e-31][8.561327e-37]0
172024-03-08 15:32:26.370[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 25, 31, 21, 21, 26, 33, 17, 16, 23, 28, 20, 19, 29, 25, 24, 30, 20, 35, 19, 36][5.989164e-14][4.957251e-05][0.014003561][6.2121716e-13][6.8363566e-18][0.15793473][0.0400572][0.9999906][0.057762165][0.036209285][1.1137857e-06][0.05882588][0.021252671][2.852255e-32][2.9058335e-35]0
182024-03-08 15:32:26.392[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 23, 31, 14, 18, 29, 22, 19, 13, 23, 36, 16, 24, 17, 31, 35, 24, 26, 33, 37][7.04367e-15][3.8310918e-10][0.010476033][5.5391797e-13][4.2660885e-18][1.8002026e-13][3.1393156e-15][0.99999946][5.5198312e-15][0.14957657][3.9449355e-07][0.31189042][0.0042013763][0.0][3.35857e-34]0
192024-03-08 15:32:26.399[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 22, 20, 20, 31, 30, 33, 32, 23, 24][8.1578895e-05][0.005662437][0.25973395][0.00036145633][2.2012752e-13][0.022834523][0.16723366][0.9992838][0.116028585][0.06689821][9.2623617e-07][0.03539415][0.22199522][1.975508e-20][9.651789e-15]0
Parallel Infer with Apache Arrow Table

The following example shows using wallaroo.pipeline.Pipeline.parallel_infer with a pandas DataFrames. It is automatically split for parallel inferences by the Wallaroo SDK.

We then compare it to the List of pandas DataFrames submitted sequentially.

# convert the DataFrame to an Arrow Table

import pyarrow as pa
timeout_secs=30
now = datetime.datetime.now()

test_data_arrow_table = pa.Table.from_pandas(test_data.head(1000))

parallel_results = await aloha_pipeline.parallel_infer(tensor=test_data_arrow_table, 
                                                       timeout=timeout_secs, 
                                                       num_parallel=2*REPLICAS, 
                                                       retries=3)

total_parallel = datetime.datetime.now() - now
print(f"Elapsed_in_parallel = {total_parallel.total_seconds()} : {len(parallel_results)}")
Elapsed_in_parallel = 3.364765 : 1000

We can quickly see the speed benefits of using parallel infer requests over sequential requests, and even more speed when going from a pandas DataFrame to an Apache Arrow table.

Output Formats

DataFrame and Arrow

Output formats are based on the input types: pandas DataFrame inputs return pandas DataFrame, and Apache Arrow inputs return Apache Arrow objects.

The default columns returned are:

  • time: The DateTime of the inference request.
  • in: The input data.
  • out: The output data. Outputs of the inference are based on the model’s outputs as out.{model_output}. This model only has one output - dense_1, which is listed in the out.dense_1 column. If the model has multiple outputs, they would be listed as out.{outputname1}, out.{outputname2}, etc.
  • anomaly: Whether any Pipeline validation parameters were triggered. Includes the following fields.
    • anomaly.count: Any anomalies detected from validations.
    • anomaly.{validation}: The validation that triggered the anomaly detection and whether it is True (indicating an anomaly was detected) or False. For more details, see Wallaroo SDK Essentials Guide: Anomaly Detection

Columns returned are controlled by the dataset_exclude array parameter, which specifies which output columns to ignore. For example, if a model outputs the columns out.rambo, out.main, out.glibnar, using the parameter dataset_exclude=["out.rambo", "out.glibnar"] will exclude those columns from the output.