Parallel Infer

Leverage parallel inference to submit multiple inference requests in a single call.

Parallel Inference

The asynchronous method wallaroo.pipeline.Pipeline.parallel_infer(tensor, timeout, num_parallel, retries) performs an inference as defined by the pipeline steps. Because it is asynchronous, it is called with await.

Parallel Inference Parameters

| Parameter | Type | Description |
|---|---|---|
| `tensor` | pandas.DataFrame OR pyarrow.Table (Required) | The data submitted to the pipeline for inference, as a pandas.DataFrame or Apache Arrow pyarrow.Table. |
| `timeout` | Integer (Optional) | A timeout in seconds before the inference throws an exception. The default is 15 seconds per call to accommodate large, complex models. Note that for a batch inference, this is per list item: with 10 inference requests, each has a default timeout of 15 seconds. |
| `num_parallel` | Integer (Optional) | The number of parallel threads used for the submission. This should be no more than four times the number of pipeline replicas. |
| `retries` | Integer (Optional) | The number of retries per inference request submitted. |
| `dataset` | List[String] (Optional) | The datasets of the inference result to return. By default this is set to `["*"]`, which returns `["time", "in", "out", "anomaly"]`. The other available dataset is `["metadata"]`. IMPORTANT NOTE: `time` must be included in the returned datasets, or the Wallaroo SDK will return an error. For example, `dataset=['time', 'out']` returns the datasets `time` and `out`, and no others. |
| `dataset_exclude` | List[String] (Optional) | The datasets to exclude from the returned inference results. For example, `dataset=['*'], dataset_exclude=['in']` excludes the `in` dataset from being returned, but includes the default datasets `['time', 'out', 'anomaly']`. IMPORTANT NOTE: `time` can not be excluded, or the Wallaroo SDK will return an error. |
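
As a quick sketch of how these parameters fit together, the following hedged example requests only the `time` and `out` datasets from a parallel inference. The names `aloha_pipeline` and `test_data` are the ones defined in the examples later in this section; the parameter values here are illustrative, not defaults.

```python
# illustrative call; assumes aloha_pipeline and test_data from the examples below
results = await aloha_pipeline.parallel_infer(
    tensor=test_data.head(100),   # pandas DataFrame input
    timeout=30,                   # seconds per inference request
    num_parallel=4,               # keep at or under 4x the pipeline replicas
    retries=2,
    dataset=["time", "out"],      # 'time' must always be included
)
```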

Parallel Inference Returns

wallaroo.pipeline.Pipeline.parallel_infer returns one of the following based on the tensor input parameter.

  • If a pandas DataFrame was submitted, a pandas DataFrame is returned.
  • If an Apache Arrow table was submitted, an Apache Arrow table is returned.

The following fields are returned based on the dataset and dataset_exclude parameters.

| Field | Type | Description |
|---|---|---|
| `time` | DateTime | The DateTime of the returned inference result. |
| `in` | Any | The input parameters. Each `in` dataset field correlates to an input field from the inference request. For example, the inputs `['year_built', 'last_renovated', 'number_of_bedrooms']` generate the `in` dataset fields `['in.year_built', 'in.last_renovated', 'in.number_of_bedrooms']`. |
| `out` | Any | The output parameters. Each `out` dataset field correlates to a final output of the pipeline. For example, if the final outputs are `['house_price', 'recommended_starting_bid']`, the output fields would be `['out.house_price', 'out.recommended_starting_bid']`. |
| `anomaly` | Any | The detected anomalies based on the validations added to the pipeline. The field `anomaly.count` is always included; it displays a count of all validations that returned True, each of which indicates a detected anomaly. Other anomalies are listed by `anomaly.{validation_name}`. For more information, see Wallaroo Anomaly Detection. |
| `meta` | Dict | Returned with the `metadata` dataset; contains the fields listed below. |

The `metadata` dataset includes the following fields:

1. `metadata.elapsed`: A list of times in nanoseconds for:
   1. The time to serialize the input.
   2. How long each step took.
2. `metadata.last_model`: A dict with each Python step as:
   1. `model_name`: The name of the model in the pipeline step.
   2. `model_sha`: The sha hash of the model in the pipeline step.
3. `metadata.pipeline_version`: The pipeline version as a UUID value.
4. `metadata.partition`: The partition used to store the inference results from this pipeline. This is mainly used when adding Wallaroo Server edge deployments to a published pipeline and separating the inference results from those edge deployments.
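
As an illustrative sketch of working with these fields, the following assumes the results came back as a pandas DataFrame requested with `dataset=["*", "metadata"]`, where the datasets arrive as flattened columns named as described above.

```python
# illustrative; assumes parallel_results is a pandas DataFrame returned with
# dataset=["*", "metadata"] — datasets arrive as flattened columns
flagged = parallel_results[parallel_results["anomaly.count"] > 0]
print(f"{len(flagged)} results triggered at least one validation")

# metadata fields, when requested, are also returned as columns
print(parallel_results.loc[0, "metadata.pipeline_version"])
```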

Parallel Inference Examples

Sequential Inference Example

The first example takes a pandas DataFrame with 25,000 rows, splits the first 1,000 rows into 1,000 separate DataFrames with one row each, then submits them sequentially. This simulates a situation where the data input sizes are so large they must be broken up for more efficient transmission and inferencing.

```python
import datetime

import pandas as pd

test_data = pd.read_json("./data/data_25k.df.json")
test_list = []

# split each of the first 1,000 rows into its own single-row DataFrame
for index, row in test_data.head(1000).iterrows():
    test_list.append(row.to_frame('text_input').reset_index(drop=True))

# show the first row as an example
test_list[0]
```

| | text_input |
|---|---|
| 0 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 28, 16, 32, 23, 29, 32, 30, 19, 26, 17] |
```python
#
# Run the inference sequentially to establish a baseline
#
now = datetime.datetime.now()

results = []
for df in test_list:
    results.append(aloha_pipeline.infer(tensor=df, timeout=10))

total_sequential = datetime.datetime.now() - now

print(f"Elapsed = {total_sequential.total_seconds()} : {len(results)}")
```

```
Elapsed = 37.917126 : 1000
```

Parallel Infer with DataFrame Example

The following example shows using wallaroo.pipeline.Pipeline.parallel_infer with a pandas DataFrame. The DataFrame is automatically split for parallel inferences by the Wallaroo SDK.

We then compare the elapsed time against the sequential submission above.

```python
timeout_secs = 30
now = datetime.datetime.now()

# REPLICAS is the number of replicas in the pipeline deployment
parallel_results = await aloha_pipeline.parallel_infer(tensor=test_data.head(1000),
                                                       timeout=timeout_secs,
                                                       num_parallel=2*REPLICAS,
                                                       retries=3)

total_parallel = datetime.datetime.now() - now
print(f"Elapsed_in_parallel = {total_parallel.total_seconds()} : {len(parallel_results)}")
```

```
Elapsed_in_parallel = 7.691442 : 1000
```

```python
parallel_results.head(20)
```

| | time | in.text_input | out.banjori | out.corebot | out.cryptolocker | out.dircrypt | out.gozi | out.kraken | out.locky | out.main | out.matsnu | out.pykspa | out.qakbot | out.ramdo | out.ramnit | out.simda | out.suppobox | anomaly.count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2024-03-08 15:32:26.253 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 28, 16, 32, 23, 29, 32, 30, 19, 26, 17] | [0.0015195842] | [0.98291475] | [0.012099553] | [4.7591206e-05] | [2.0289332e-05] | [0.00031977228] | [0.011029261] | [0.997564] | [0.010341614] | [0.008038961] | [0.016155045] | [0.0062362333] | [0.0009985747] | [1.7933435e-26] | [1.3889844e-27] | 0 |
| 1 | 2024-03-08 15:32:26.253 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 30, 20, 19, 27, 14, 17, 24, 13, 23, 20, 18, 35, 18, 22, 23] | [7.447168e-18] | [6.735899e-08] | [0.17081985] | [1.3220147e-09] | [1.2758705e-24] | [0.22559549] | [0.3420985] | [0.99999994] | [0.3080186] | [0.1828217] | [3.8022332e-11] | [0.2062254] | [0.15215823] | [1.1701893e-30] | [3.1513734e-38] | 0 |
| 2 | 2024-03-08 15:32:26.253 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 25, 33, 25, 36, 25, 31, 14, 32, 36, 25, 12, 35, 34, 30, 28, 27, 24, 29, 27] | [2.8598976e-21] | [9.301987e-08] | [0.04445297] | [6.163758e-09] | [8.3497386e-23] | [0.48234487] | [0.263329] | [1.0] | [0.29800338] | [0.22361773] | [1.5238979e-06] | [0.32820383] | [0.029332481] | [1.1995622e-31] | [0.0] | 0 |
| 3 | 2024-03-08 15:32:26.253 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 17, 23, 22, 15, 12, 35, 34, 36, 12, 18, 24, 34, 32, 36, 12, 14, 16, 27, 22, 23] | [2.1387213e-15] | [3.8817338e-10] | [0.04559972] | [1.9090405e-07] | [1.3140474e-25] | [0.5954264] | [0.17374131] | [0.9999997] | [0.23151566] | [0.1759168] | [1.0876193e-09] | [0.21832275] | [0.012869264] | [6.1588803e-28] | [1.4386127e-35] | 0 |
| 4 | 2024-03-08 15:32:26.280 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32, 13, 14, 12, 33, 16, 23, 15, 22, 30, 28, 26, 12, 16, 32, 37, 29, 22, 28, 22, 16, 27, 32] | [9.4533425e-15] | [7.091165e-10] | [0.04981512] | [5.2914135e-09] | [7.413152e-19] | [1.5504575e-13] | [1.0791892e-15] | [0.9999989] | [1.5003076e-15] | [0.3307571] | [2.625885e-07] | [0.50362796] | [0.020393759] | [0.0] | [2.329197e-38] | 0 |
| 5 | 2024-03-08 15:32:26.293 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 17, 29, 20, 33, 13, 36, 35, 30, 21, 29, 17, 26, 19, 25, 36, 14, 23, 16, 18, 15, 21, 18, 28, 35, 19] | [1.7247285e-17] | [8.1354194e-08] | [0.013697129] | [5.6086392e-11] | [1.4032912e-17] | [0.4946911] | [0.11978862] | [0.99999994] | [0.19000016] | [0.10596694] | [5.524429e-06] | [0.24210057] | [0.0069435085] | [1.2804814e-34] | [9.482465e-35] | 0 |
| 6 | 2024-03-08 15:32:26.299 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 22, 36, 14, 12, 23, 14, 13, 20, 20, 23, 27, 36, 29, 35, 19, 33, 22, 25, 26, 32, 21] | [5.5500585e-18] | [3.3608643e-07] | [0.023452938] | [1.1318812e-10] | [1.0496957e-22] | [0.23692927] | [0.064456925] | [0.99999183] | [0.07306592] | [0.06499427] | [1.4302767e-08] | [0.11925243] | [0.0011031023] | [1.5206224e-32] | [0.0] | 0 |
| 7 | 2024-03-08 15:32:26.301 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 36, 22, 28, 23, 20, 25, 21, 20, 16, 12, 33, 21, 14, 34, 34, 32, 19, 36, 17, 29, 26, 14, 29] | [3.9222568e-18] | [1.407435e-10] | [0.010946895] | [8.202812e-11] | [2.454965e-24] | [0.42107278] | [0.071240015] | [0.9982491] | [0.118182994] | [0.08340969] | [1.9207886e-09] | [0.16958171] | [0.0005199058] | [0.0] | [0.0] | 0 |
| 8 | 2024-03-08 15:32:26.302 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 27, 30, 33, 29, 37, 24, 33, 16, 20, 24] | [4.0574426e-11] | [1.087887e-09] | [0.17916852] | [1.7313088e-06] | [8.697261e-18] | [9.197122e-16] | [3.8521073e-17] | [0.9999977] | [3.2654394e-17] | [0.32568428] | [6.834302e-09] | [0.37007827] | [0.44918332] | [0.0] | [2.082403e-26] | 0 |
| 9 | 2024-03-08 15:32:26.311 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 35, 29, 19, 35, 31, 15, 14, 21, 26, 31, 34, 27, 22] | [2.2576288e-09] | [2.0812616e-09] | [0.17788404] | [1.1887528e-08] | [1.078572e-11] | [0.041252796] | [0.21430437] | [0.9999988] | [0.17853741] | [0.13382334] | [0.000114089744] | [0.14033836] | [0.011299953] | [3.575825e-24] | [7.164664e-24] | 0 |
| 10 | 2024-03-08 15:32:26.324 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 25, 25, 19, 26, 30, 19, 29, 37, 20, 24, 23, 22, 20, 20, 12, 35, 29, 26, 16, 35, 36, 32, 23, 19] | [7.892642e-12] | [3.0390834e-07] | [0.015696561] | [5.4462657e-13] | [1.2192533e-22] | [2.9611054e-17] | [2.630555e-20] | [0.9999961] | [6.9846006e-20] | [0.28895643] | [1.8219538e-10] | [0.5132747] | [0.03162864] | [0.0] | [6.496134e-32] | 0 |
| 11 | 2024-03-08 15:32:26.324 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 35, 33, 34, 15, 23, 28, 34, 18, 33, 33] | [2.6560714e-16] | [5.9408145e-09] | [0.12814313] | [3.3345504e-08] | [2.2118923e-18] | [0.3078206] | [0.27681428] | [0.9999999] | [0.27904558] | [0.17737378] | [7.047457e-08] | [0.17205144] | [0.20136176] | [3.6787982e-29] | [4.919293e-33] | 0 |
| 12 | 2024-03-08 15:32:26.332 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 20, 28, 30, 13, 26, 28, 30, 33, 25, 30, 31, 30, 26, 34, 19, 18, 23, 18, 15] | [1.9262531e-07] | [0.00011627592] | [0.015093419] | [6.0622415e-06] | [2.7445957e-08] | [0.1944085] | [0.11690311] | [0.9999991] | [0.17412043] | [0.06493864] | [0.49536943] | [0.08959357] | [0.005527823] | [2.4333167e-38] | [1.3592863e-25] | 0 |
| 13 | 2024-03-08 15:32:26.340 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 23, 19, 17, 22, 25, 35, 29, 26, 15] | [1.8286044e-05] | [0.00021055655] | [0.012560264] | [1.669594e-12] | [1.2260803e-07] | [0.007982212] | [0.01670425] | [0.017594405] | [0.017098006] | [0.011611045] | [0.00011716164] | [0.009795011] | [0.010660369] | [3.187273e-35] | [6.004795e-27] | 0 |
| 14 | 2024-03-08 15:32:26.364 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 36, 22, 17, 34, 27, 18, 18, 30] | [3.6237112e-22] | [1.0416503e-05] | [0.3348774] | [2.1746243e-06] | [8.617319e-23] | [0.029006457] | [0.20757225] | [0.99999344] | [0.13615955] | [0.08263349] | [2.8077036e-09] | [0.056751817] | [0.100090414] | [1.0977599e-18] | [1.6076299e-32] | 0 |
| 15 | 2024-03-08 15:32:26.369 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 21, 19, 31, 31, 19, 36, 22, 12, 37, 18, 22, 31, 29] | [2.6339812e-11] | [3.014685e-10] | [0.04157271] | [2.9721992e-11] | [4.1457778e-19] | [2.8498805e-12] | [1.0917219e-13] | [0.99999815] | [1.5328618e-13] | [0.15687591] | [6.499695e-07] | [0.2797901] | [0.07243411] | [6.264585e-28] | [3.7361817e-33] | 0 |
| 16 | 2024-03-08 15:32:26.371 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21, 15, 26, 15, 14, 19, 32, 24, 33, 13, 20, 22, 32, 14, 25, 26, 35, 22, 12, 31, 23, 19, 31] | [2.3916345e-11] | [1.0221278e-06] | [0.0036410673] | [3.0198066e-10] | [6.5029376e-10] | [0.01702937] | [0.024708282] | [0.99999654] | [0.031047806] | [0.029724406] | [1.1598447e-05] | [0.053846903] | [6.46801e-05] | [1.9701536e-31] | [8.561327e-37] | 0 |
| 17 | 2024-03-08 15:32:26.370 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 25, 31, 21, 21, 26, 33, 17, 16, 23, 28, 20, 19, 29, 25, 24, 30, 20, 35, 19, 36] | [5.989164e-14] | [4.957251e-05] | [0.014003561] | [6.2121716e-13] | [6.8363566e-18] | [0.15793473] | [0.0400572] | [0.9999906] | [0.057762165] | [0.036209285] | [1.1137857e-06] | [0.05882588] | [0.021252671] | [2.852255e-32] | [2.9058335e-35] | 0 |
| 18 | 2024-03-08 15:32:26.392 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 23, 31, 14, 18, 29, 22, 19, 13, 23, 36, 16, 24, 17, 31, 35, 24, 26, 33, 37] | [7.04367e-15] | [3.8310918e-10] | [0.010476033] | [5.5391797e-13] | [4.2660885e-18] | [1.8002026e-13] | [3.1393156e-15] | [0.99999946] | [5.5198312e-15] | [0.14957657] | [3.9449355e-07] | [0.31189042] | [0.0042013763] | [0.0] | [3.35857e-34] | 0 |
| 19 | 2024-03-08 15:32:26.399 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 22, 20, 20, 31, 30, 33, 32, 23, 24] | [8.1578895e-05] | [0.005662437] | [0.25973395] | [0.00036145633] | [2.2012752e-13] | [0.022834523] | [0.16723366] | [0.9992838] | [0.116028585] | [0.06689821] | [9.2623617e-07] | [0.03539415] | [0.22199522] | [1.975508e-20] | [9.651789e-15] | 0 |

Parallel Infer with Apache Arrow Table

The following example shows using wallaroo.pipeline.Pipeline.parallel_infer with an Apache Arrow table. The table is automatically split for parallel inferences by the Wallaroo SDK.

We then compare the elapsed time against the previous runs.

```python
import pyarrow as pa

timeout_secs = 30
now = datetime.datetime.now()

# convert the DataFrame to an Arrow Table
test_data_arrow_table = pa.Table.from_pandas(test_data.head(1000))

parallel_results = await aloha_pipeline.parallel_infer(tensor=test_data_arrow_table,
                                                       timeout=timeout_secs,
                                                       num_parallel=2*REPLICAS,
                                                       retries=3)

total_parallel = datetime.datetime.now() - now
print(f"Elapsed_in_parallel = {total_parallel.total_seconds()} : {len(parallel_results)}")
```

```
Elapsed_in_parallel = 3.364765 : 1000
```

We can quickly see the speed benefits of parallel inference requests over sequential requests, and a further speedup when moving from a pandas DataFrame to an Apache Arrow table.
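
As an illustrative calculation using the elapsed times recorded above (your timings will vary with hardware, model, and replica count), the relative speedups work out as follows:

```python
# illustrative: speedups computed from the timings recorded above
sequential_secs = 37.917126     # 1,000 single-row DataFrames, sequential
parallel_df_secs = 7.691442     # parallel_infer with a pandas DataFrame
parallel_arrow_secs = 3.364765  # parallel_infer with an Apache Arrow table

print(f"DataFrame parallel speedup: {sequential_secs / parallel_df_secs:.1f}x")    # ~4.9x
print(f"Arrow parallel speedup:     {sequential_secs / parallel_arrow_secs:.1f}x") # ~11.3x
```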