Parallel Infer
Parallel Inference
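The asynchronous method wallaroo.pipeline.Pipeline.parallel_infer(tensor, timeout, num_parallel, retries), called with await, performs an inference as defined by the pipeline steps. The submitted data is split into multiple inference requests that are run in parallel.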
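Because the method is a coroutine, it has to be awaited. Notebook environments such as Jupyter accept a top-level await, while a plain Python script needs the call wrapped in an async function and driven by asyncio.run. The following is a minimal sketch of that pattern only: fake_parallel_infer is a hypothetical stand-in coroutine, not part of the Wallaroo SDK, and no pipeline is contacted.

```python
import asyncio


async def fake_parallel_infer(rows):
    """Hypothetical stand-in for an awaitable inference call (not the Wallaroo SDK)."""
    await asyncio.sleep(0)  # yield control, as a real network call would
    return [{"out": row * 2} for row in rows]


async def main():
    # In a script, the await has to live inside a coroutine.
    return await fake_parallel_infer([1, 2, 3])


# asyncio.run creates the event loop, runs main() to completion, and closes the loop.
results = asyncio.run(main())
print(results)
```

In a notebook cell the body of main() could be written directly, since the notebook already runs an event loop.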
Parallel Inference Parameters
| Parameter | Type | Description |
|---|---|---|
| tensor | pandas.DataFrame OR pyarrow.Table (Required) | The data submitted to the pipeline for inference, as either a pandas.DataFrame or an Apache Arrow pyarrow.Table. |
| timeout | Integer (Optional) | A timeout in seconds before the inference throws an exception. The default is 15 seconds per call to accommodate large, complex models. Note that for a batch inference, this is per list item - with 10 inference requests, each would have a default timeout of 15 seconds. |
| num_parallel | Integer (Optional) | The number of parallel threads used for the submission. This should be no more than four times the number of pipeline replicas. |
| retries | Integer (Optional) | The number of retries per inference request submitted. |
| dataset | List[String] (Optional) | The datasets of the inference result to return. By default this is set to ["*"], which returns ["time", "in", "out", "anomaly"]. Other available datasets: ["metadata"]. IMPORTANT NOTE: 'time' must be included in the returned datasets for the Wallaroo SDK, or the SDK will return an error. For example: dataset=['time', 'out'] returns the datasets time and out, and no others. |
| dataset_exclude | List[String] (Optional) | The datasets to exclude from the returned inference results. For example: dataset=['*'], dataset_exclude=['in'] excludes the in dataset but returns the remaining default datasets ['time', 'out', 'anomaly']. IMPORTANT NOTE: 'time' cannot be excluded for the Wallaroo SDK, or the SDK will return an error. |
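To make the interaction between timeout, num_parallel, and retries concrete, the sketch below models it with plain asyncio. It is an illustration under stated assumptions, not the Wallaroo SDK's implementation: submit_all and flaky_infer are hypothetical names, the requests are simulated, and the choice to retry on timeouts and connection errors is an assumption made for the example.

```python
import asyncio


async def submit_all(items, infer_one, timeout=15, num_parallel=4, retries=0):
    """Illustrative only: run infer_one over items with bounded concurrency.

    This is NOT the Wallaroo SDK implementation; it only models how
    timeout, num_parallel, and retries could interact.
    """
    semaphore = asyncio.Semaphore(num_parallel)  # at most num_parallel in flight

    async def run_one(item):
        async with semaphore:
            for attempt in range(retries + 1):
                try:
                    # The timeout applies to each request, not to the whole batch.
                    return await asyncio.wait_for(infer_one(item), timeout)
                except (asyncio.TimeoutError, ConnectionError):
                    if attempt == retries:
                        raise  # out of retries: surface the error

    # gather returns results in the same order as the inputs
    return await asyncio.gather(*(run_one(item) for item in items))


attempts = {}


async def flaky_infer(item):
    """Hypothetical request that fails once per item, then succeeds."""
    attempts[item] = attempts.get(item, 0) + 1
    await asyncio.sleep(0)
    if attempts[item] == 1:
        raise ConnectionError("simulated transient failure")
    return item * 10


outputs = asyncio.run(submit_all(range(5), flaky_infer, num_parallel=2, retries=3))
print(outputs)
```

Each simulated request fails on its first attempt and succeeds on the second, so every item is tried twice while no more than two requests are in flight at once.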
Parallel Inference Returns
wallaroo.pipeline.Pipeline.parallel_infer returns one of the following based on the tensor input parameter.
- If a pandas DataFrame was submitted, a pandas DataFrame is returned.
- If an Apache Arrow table was submitted, an Apache Arrow table is returned.
The following fields are returned based on the dataset and dataset_exclude parameters.
| Field | Type | Description |
|---|---|---|
| time | DateTime | The DateTime of the returned inference result. |
| in | Any | The input parameters. Each in dataset field correlates to the input field from the inference request. For example, the inputs ['year_built', 'last_renovated', 'number_of_bedrooms'] generate the in dataset fields ['in.year_built', 'in.last_renovated', 'in.number_of_bedrooms']. |
| out | Any | The output parameters. Each out dataset field correlates to the final output of the pipeline. For example, if the final output is ['house_price', 'recommended_starting_bid'], the output fields would be ['out.house_price', 'out.recommended_starting_bid']. |
| anomaly | Any | The detected anomalies based on the validations added to the pipeline. The field anomaly.count is always included, which displays a count of all validations that returned True, which indicates a detected anomaly. Other anomalies are listed by anomaly.{validation_name}. For more information, see Wallaroo Anomaly Detection. |
| meta | Dict | The fields included with the metadata dataset. |
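The way dataset and dataset_exclude determine the returned fields can be sketched as a simple filter over column names. This is an illustration of the rules described in the tables above, not the SDK's code: select_fields and DEFAULT_DATASETS are hypothetical names, the ValueError merely stands in for the error the SDK is documented to return, and the metadata dataset is left out of the sketch.

```python
DEFAULT_DATASETS = ["time", "in", "out", "anomaly"]  # what ["*"] expands to


def select_fields(columns, dataset=("*",), dataset_exclude=()):
    """Illustrative filter over result columns; not the SDK's implementation."""
    wanted = []
    for name in dataset:
        for expanded in (DEFAULT_DATASETS if name == "*" else [name]):
            if expanded not in wanted:
                wanted.append(expanded)
    wanted = [name for name in wanted if name not in dataset_exclude]
    if "time" not in wanted:
        # Mirrors the documented rule that 'time' must always be returned.
        raise ValueError("'time' must be included in the returned datasets")
    # A column belongs to a dataset if it is the dataset name itself
    # or starts with "<dataset>." (for example "out.house_price").
    return [c for c in columns if c.split(".", 1)[0] in wanted]


columns = ["time", "in.year_built", "in.last_renovated",
           "out.house_price", "anomaly.count"]
only_out = select_fields(columns, dataset=["time", "out"])
no_inputs = select_fields(columns, dataset=["*"], dataset_exclude=["in"])
print(only_out)
print(no_inputs)
```

The first call keeps only the time and out fields; the second keeps every default dataset except in.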
Parallel Inference Examples
Sequential Inference Example
The first example takes the first 1,000 rows of a 25,000-row pandas DataFrame, splits them into 1,000 single-row DataFrames, and submits them sequentially. This simulates a situation where the data input sizes are so large they must be broken up for more efficient transmission and inferencing.
import pandas as pd

test_data = pd.read_json("./data/data_25k.df.json")
test_list = []
# split the first 1,000 rows into single-row DataFrames
for index, row in test_data.head(1000).iterrows():
    test_list.append(row.to_frame('text_input').reset_index(drop=True))
# show the first row as an example
test_list[0]
| | text_input |
|---|---|
| 0 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 28, 16, 32, 23, 29, 32, 30, 19, 26, 17] |
#
# Run the inference sequentially to establish a baseline
#
import datetime

now = datetime.datetime.now()
results = []
for df in test_list:
    results.append(aloha_pipeline.infer(tensor=df, timeout=10))
total_sequential = datetime.datetime.now() - now
print(f"Elapsed = {total_sequential.total_seconds()} : {len(results)}")
Elapsed = 37.917126 : 1000
Parallel Infer with DataFrame Example
The following example uses wallaroo.pipeline.Pipeline.parallel_infer with a single pandas DataFrame, which the Wallaroo SDK automatically splits into parallel inference requests.
The elapsed time can then be compared with the list of pandas DataFrames submitted sequentially.
timeout_secs = 30
# REPLICAS is not defined in this section; it is assumed to hold the number of pipeline replicas
now = datetime.datetime.now()
parallel_results = await aloha_pipeline.parallel_infer(tensor=test_data.head(1000),
                                                       timeout=timeout_secs,
                                                       num_parallel=2*REPLICAS,
                                                       retries=3)
total_parallel = datetime.datetime.now() - now
print(f"Elapsed_in_parallel = {total_parallel.total_seconds()} : {len(parallel_results)}")
Elapsed_in_parallel = 7.691442 : 1000
parallel_results.head(20)
| | time | in.text_input | out.banjori | out.corebot | out.cryptolocker | out.dircrypt | out.gozi | out.kraken | out.locky | out.main | out.matsnu | out.pykspa | out.qakbot | out.ramdo | out.ramnit | out.simda | out.suppobox | anomaly.count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2024-03-08 15:32:26.253 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 28, 16, 32, 23, 29, 32, 30, 19, 26, 17] | [0.0015195842] | [0.98291475] | [0.012099553] | [4.7591206e-05] | [2.0289332e-05] | [0.00031977228] | [0.011029261] | [0.997564] | [0.010341614] | [0.008038961] | [0.016155045] | [0.0062362333] | [0.0009985747] | [1.7933435e-26] | [1.3889844e-27] | 0 |
| 1 | 2024-03-08 15:32:26.253 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 30, 20, 19, 27, 14, 17, 24, 13, 23, 20, 18, 35, 18, 22, 23] | [7.447168e-18] | [6.735899e-08] | [0.17081985] | [1.3220147e-09] | [1.2758705e-24] | [0.22559549] | [0.3420985] | [0.99999994] | [0.3080186] | [0.1828217] | [3.8022332e-11] | [0.2062254] | [0.15215823] | [1.1701893e-30] | [3.1513734e-38] | 0 |
| 2 | 2024-03-08 15:32:26.253 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 25, 33, 25, 36, 25, 31, 14, 32, 36, 25, 12, 35, 34, 30, 28, 27, 24, 29, 27] | [2.8598976e-21] | [9.301987e-08] | [0.04445297] | [6.163758e-09] | [8.3497386e-23] | [0.48234487] | [0.263329] | [1.0] | [0.29800338] | [0.22361773] | [1.5238979e-06] | [0.32820383] | [0.029332481] | [1.1995622e-31] | [0.0] | 0 |
| 3 | 2024-03-08 15:32:26.253 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 17, 23, 22, 15, 12, 35, 34, 36, 12, 18, 24, 34, 32, 36, 12, 14, 16, 27, 22, 23] | [2.1387213e-15] | [3.8817338e-10] | [0.04559972] | [1.9090405e-07] | [1.3140474e-25] | [0.5954264] | [0.17374131] | [0.9999997] | [0.23151566] | [0.1759168] | [1.0876193e-09] | [0.21832275] | [0.012869264] | [6.1588803e-28] | [1.4386127e-35] | 0 |
| 4 | 2024-03-08 15:32:26.280 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32, 13, 14, 12, 33, 16, 23, 15, 22, 30, 28, 26, 12, 16, 32, 37, 29, 22, 28, 22, 16, 27, 32] | [9.4533425e-15] | [7.091165e-10] | [0.04981512] | [5.2914135e-09] | [7.413152e-19] | [1.5504575e-13] | [1.0791892e-15] | [0.9999989] | [1.5003076e-15] | [0.3307571] | [2.625885e-07] | [0.50362796] | [0.020393759] | [0.0] | [2.329197e-38] | 0 |
| 5 | 2024-03-08 15:32:26.293 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 17, 29, 20, 33, 13, 36, 35, 30, 21, 29, 17, 26, 19, 25, 36, 14, 23, 16, 18, 15, 21, 18, 28, 35, 19] | [1.7247285e-17] | [8.1354194e-08] | [0.013697129] | [5.6086392e-11] | [1.4032912e-17] | [0.4946911] | [0.11978862] | [0.99999994] | [0.19000016] | [0.10596694] | [5.524429e-06] | [0.24210057] | [0.0069435085] | [1.2804814e-34] | [9.482465e-35] | 0 |
| 6 | 2024-03-08 15:32:26.299 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 22, 36, 14, 12, 23, 14, 13, 20, 20, 23, 27, 36, 29, 35, 19, 33, 22, 25, 26, 32, 21] | [5.5500585e-18] | [3.3608643e-07] | [0.023452938] | [1.1318812e-10] | [1.0496957e-22] | [0.23692927] | [0.064456925] | [0.99999183] | [0.07306592] | [0.06499427] | [1.4302767e-08] | [0.11925243] | [0.0011031023] | [1.5206224e-32] | [0.0] | 0 |
| 7 | 2024-03-08 15:32:26.301 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 36, 22, 28, 23, 20, 25, 21, 20, 16, 12, 33, 21, 14, 34, 34, 32, 19, 36, 17, 29, 26, 14, 29] | [3.9222568e-18] | [1.407435e-10] | [0.010946895] | [8.202812e-11] | [2.454965e-24] | [0.42107278] | [0.071240015] | [0.9982491] | [0.118182994] | [0.08340969] | [1.9207886e-09] | [0.16958171] | [0.0005199058] | [0.0] | [0.0] | 0 |
| 8 | 2024-03-08 15:32:26.302 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 27, 30, 33, 29, 37, 24, 33, 16, 20, 24] | [4.0574426e-11] | [1.087887e-09] | [0.17916852] | [1.7313088e-06] | [8.697261e-18] | [9.197122e-16] | [3.8521073e-17] | [0.9999977] | [3.2654394e-17] | [0.32568428] | [6.834302e-09] | [0.37007827] | [0.44918332] | [0.0] | [2.082403e-26] | 0 |
| 9 | 2024-03-08 15:32:26.311 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 35, 29, 19, 35, 31, 15, 14, 21, 26, 31, 34, 27, 22] | [2.2576288e-09] | [2.0812616e-09] | [0.17788404] | [1.1887528e-08] | [1.078572e-11] | [0.041252796] | [0.21430437] | [0.9999988] | [0.17853741] | [0.13382334] | [0.000114089744] | [0.14033836] | [0.011299953] | [3.575825e-24] | [7.164664e-24] | 0 |
| 10 | 2024-03-08 15:32:26.324 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 25, 25, 19, 26, 30, 19, 29, 37, 20, 24, 23, 22, 20, 20, 12, 35, 29, 26, 16, 35, 36, 32, 23, 19] | [7.892642e-12] | [3.0390834e-07] | [0.015696561] | [5.4462657e-13] | [1.2192533e-22] | [2.9611054e-17] | [2.630555e-20] | [0.9999961] | [6.9846006e-20] | [0.28895643] | [1.8219538e-10] | [0.5132747] | [0.03162864] | [0.0] | [6.496134e-32] | 0 |
| 11 | 2024-03-08 15:32:26.324 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 35, 33, 34, 15, 23, 28, 34, 18, 33, 33] | [2.6560714e-16] | [5.9408145e-09] | [0.12814313] | [3.3345504e-08] | [2.2118923e-18] | [0.3078206] | [0.27681428] | [0.9999999] | [0.27904558] | [0.17737378] | [7.047457e-08] | [0.17205144] | [0.20136176] | [3.6787982e-29] | [4.919293e-33] | 0 |
| 12 | 2024-03-08 15:32:26.332 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 20, 28, 30, 13, 26, 28, 30, 33, 25, 30, 31, 30, 26, 34, 19, 18, 23, 18, 15] | [1.9262531e-07] | [0.00011627592] | [0.015093419] | [6.0622415e-06] | [2.7445957e-08] | [0.1944085] | [0.11690311] | [0.9999991] | [0.17412043] | [0.06493864] | [0.49536943] | [0.08959357] | [0.005527823] | [2.4333167e-38] | [1.3592863e-25] | 0 |
| 13 | 2024-03-08 15:32:26.340 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 23, 19, 17, 22, 25, 35, 29, 26, 15] | [1.8286044e-05] | [0.00021055655] | [0.012560264] | [1.669594e-12] | [1.2260803e-07] | [0.007982212] | [0.01670425] | [0.017594405] | [0.017098006] | [0.011611045] | [0.00011716164] | [0.009795011] | [0.010660369] | [3.187273e-35] | [6.004795e-27] | 0 |
| 14 | 2024-03-08 15:32:26.364 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 36, 22, 17, 34, 27, 18, 18, 30] | [3.6237112e-22] | [1.0416503e-05] | [0.3348774] | [2.1746243e-06] | [8.617319e-23] | [0.029006457] | [0.20757225] | [0.99999344] | [0.13615955] | [0.08263349] | [2.8077036e-09] | [0.056751817] | [0.100090414] | [1.0977599e-18] | [1.6076299e-32] | 0 |
| 15 | 2024-03-08 15:32:26.369 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 21, 19, 31, 31, 19, 36, 22, 12, 37, 18, 22, 31, 29] | [2.6339812e-11] | [3.014685e-10] | [0.04157271] | [2.9721992e-11] | [4.1457778e-19] | [2.8498805e-12] | [1.0917219e-13] | [0.99999815] | [1.5328618e-13] | [0.15687591] | [6.499695e-07] | [0.2797901] | [0.07243411] | [6.264585e-28] | [3.7361817e-33] | 0 |
| 16 | 2024-03-08 15:32:26.371 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21, 15, 26, 15, 14, 19, 32, 24, 33, 13, 20, 22, 32, 14, 25, 26, 35, 22, 12, 31, 23, 19, 31] | [2.3916345e-11] | [1.0221278e-06] | [0.0036410673] | [3.0198066e-10] | [6.5029376e-10] | [0.01702937] | [0.024708282] | [0.99999654] | [0.031047806] | [0.029724406] | [1.1598447e-05] | [0.053846903] | [6.46801e-05] | [1.9701536e-31] | [8.561327e-37] | 0 |
| 17 | 2024-03-08 15:32:26.370 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 25, 31, 21, 21, 26, 33, 17, 16, 23, 28, 20, 19, 29, 25, 24, 30, 20, 35, 19, 36] | [5.989164e-14] | [4.957251e-05] | [0.014003561] | [6.2121716e-13] | [6.8363566e-18] | [0.15793473] | [0.0400572] | [0.9999906] | [0.057762165] | [0.036209285] | [1.1137857e-06] | [0.05882588] | [0.021252671] | [2.852255e-32] | [2.9058335e-35] | 0 |
| 18 | 2024-03-08 15:32:26.392 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 23, 31, 14, 18, 29, 22, 19, 13, 23, 36, 16, 24, 17, 31, 35, 24, 26, 33, 37] | [7.04367e-15] | [3.8310918e-10] | [0.010476033] | [5.5391797e-13] | [4.2660885e-18] | [1.8002026e-13] | [3.1393156e-15] | [0.99999946] | [5.5198312e-15] | [0.14957657] | [3.9449355e-07] | [0.31189042] | [0.0042013763] | [0.0] | [3.35857e-34] | 0 |
| 19 | 2024-03-08 15:32:26.399 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 22, 20, 20, 31, 30, 33, 32, 23, 24] | [8.1578895e-05] | [0.005662437] | [0.25973395] | [0.00036145633] | [2.2012752e-13] | [0.022834523] | [0.16723366] | [0.9992838] | [0.116028585] | [0.06689821] | [9.2623617e-07] | [0.03539415] | [0.22199522] | [1.975508e-20] | [9.651789e-15] | 0 |
Parallel Infer with Apache Arrow Table
The following example uses wallaroo.pipeline.Pipeline.parallel_infer with an Apache Arrow table converted from the same pandas DataFrame. As with the DataFrame, it is automatically split for parallel inferences by the Wallaroo SDK.
The elapsed time can then be compared with the list of pandas DataFrames submitted sequentially.
# convert the DataFrame to an Arrow Table
import pyarrow as pa
timeout_secs=30
now = datetime.datetime.now()
test_data_arrow_table = pa.Table.from_pandas(test_data.head(1000))
parallel_results = await aloha_pipeline.parallel_infer(tensor=test_data_arrow_table,
                                                       timeout=timeout_secs,
                                                       num_parallel=2*REPLICAS,
                                                       retries=3)
total_parallel = datetime.datetime.now() - now
print(f"Elapsed_in_parallel = {total_parallel.total_seconds()} : {len(parallel_results)}")
Elapsed_in_parallel = 3.364765 : 1000
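In this run, the same 1,000 inferences took about 37.9 seconds when submitted sequentially, about 7.7 seconds with parallel_infer on a pandas DataFrame, and about 3.4 seconds with parallel_infer on an Apache Arrow table. Parallel inference requests are markedly faster than sequential requests, and submitting an Apache Arrow table instead of a pandas DataFrame is faster still.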
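The relative speedups follow directly from the elapsed times printed in the examples above. The short calculation below uses those three figures as given; they come from a single run each, so the ratios are indicative rather than a benchmark.

```python
# Elapsed times in seconds, copied from the example outputs above
sequential = 37.917126
parallel_dataframe = 7.691442
parallel_arrow = 3.364765

speedup_dataframe = sequential / parallel_dataframe
speedup_arrow = sequential / parallel_arrow
arrow_vs_dataframe = parallel_dataframe / parallel_arrow

print(f"parallel DataFrame vs sequential: {speedup_dataframe:.1f}x")
print(f"parallel Arrow vs sequential:     {speedup_arrow:.1f}x")
print(f"Arrow vs DataFrame (parallel):    {arrow_vs_dataframe:.1f}x")
```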