Inference Results
How to retrieve inference logs.
Deployed ML models provide endpoints for performing inference requests. The results of these requests are available through the Wallaroo pipeline logs.
The following guides detail how to perform inference requests through a Wallaroo deployed ML model and retrieve the logs of those inference requests.
Wallaroo deployed ML models process inference requests via the following methods.
Inferences are performed on deployed pipelines. This submits data to the pipeline, where it is processed through each of the pipeline’s steps with the output of the previous step providing the input for the next step. The final step will then output the result of all of the pipeline’s steps.
Apache Arrow is the recommended data input format for inferences. Wallaroo inference data is based on Apache Arrow, which returns the fastest inference results and, on average, smaller data transfer amounts than JSON or pandas DataFrame tables. Arrow tables also specify the data types used in their schema, ensuring that the data sent and received are exactly what is required. Using a pandas DataFrame requires inferring the data types, which may lead to data type mismatch issues.
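As a minimal sketch of that schema advantage, the following constructs an Apache Arrow table with an explicit schema for an inference input; the column name tensor and the float type are assumptions that depend on the deployed model's input schema.

import pyarrow as pa

# declare the exact types the model expects; Arrow enforces this schema,
# where a pandas DataFrame would leave the types to be inferred
input_schema = pa.schema([
    pa.field("tensor", pa.list_(pa.float32()))
])

# build a one-row table conforming to the schema
input_table = pa.Table.from_pydict(
    {"tensor": [[1.0678324729, 0.2177810266, -1.7115145262]]},
    schema=input_schema
)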
For a complete example of using the Wallaroo SDK for inferencing, see the Wallaroo SDK Inference Tutorial.
The method wallaroo.pipeline.Pipeline.infer(tensor, timeout, dataset, dataset_exclude, dataset_separator) performs an inference as defined by the pipeline steps. It takes the following parameters:
Parameter | Type | Description |
---|---|---|
tensor | pandas.DataFrame or Apache Arrow (Preferred) (REQUIRED) | The data submitted to the pipeline for inference. The following data inputs are supported: a pandas DataFrame (pandas.DataFrame) or an Apache Arrow table (pyarrow.Table). |
timeout | (Integer OR Float) (OPTIONAL) | A timeout in seconds before the inference throws an exception. The default is 15 seconds per call to accommodate large, complex models. Note that for a batch inference this is per call - with 10 inference requests, each would have a default timeout of 15 seconds. |
dataset | List[String] (OPTIONAL) | The datasets to be returned. The datasets available are: * (the default, which returns the time, in, out, and anomaly datasets) and metadata (inference processing metadata such as metadata.elapsed). |
dataset_exclude | List[String] (OPTIONAL) | Allows users to exclude parts of the dataset. |
dataset_separator | List[String] (OPTIONAL) | Allows other types of dataset separators to be used. If set to “.”, the returned dataset will be flattened. |
wallaroo.pipeline.Pipeline.infer returns one of the following, based on the tensor input:

- If the tensor input is a pandas DataFrame, it returns a pandas DataFrame.
- If the tensor input is an Apache Arrow Table, it returns an Apache Arrow table.

The following datasets are returned by default, modified by the dataset and dataset_exclude parameters.
Field | Description |
---|---|
time | The DateTime of the inference request. |
in | All inputs listed as in.{variable_name} |
out | All outputs listed as out.{variable_name} . |
anomaly | Flags whether an Anomaly was detected. Anomalies are detected from each pipeline validation that returned True. For full details, see Wallaroo SDK Essentials Guide: Anomaly Detection. The fields anomaly.count and anomaly.{validation_name} are included in this dataset. |
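As a minimal sketch, assuming a deployed pipeline pipeline and a pandas DataFrame df that matches the model's input schema, an inference call with an extended timeout and the metadata dataset added to the defaults might look like:

# returns a pandas DataFrame because the input is a pandas DataFrame;
# the metadata dataset is returned in addition to the defaults
result = pipeline.infer(df, timeout=30, dataset=["*", "metadata"])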
The inference result field metadata.elapsed format changes depending on the input type:

- For pandas DataFrame inputs, metadata.elapsed is returned as an int.
- For Apache Arrow inputs, metadata.elapsed is returned as pyarrow.Int64.

The following code is provided to convert metadata.elapsed into the same format for consistency.
# running totals for the two elapsed values; initialize before accumulating
parse_elapsed = 0
inference_elapsed = 0

if input_type == "json":
    # pandas DataFrame results return plain Python ints
    parse_elapsed += results["metadata.elapsed"][0][0]
    inference_elapsed += results["metadata.elapsed"][0][1]
elif input_type == "arrow":
    # Apache Arrow results return pyarrow scalars; convert with as_py()
    parse_elapsed += results["metadata.elapsed"][0][0].as_py()
    inference_elapsed += results["metadata.elapsed"][0][1].as_py()
else:
    assert False, f"Bad input type {input_type}"
The following demonstrates the differences between the metadata.elapsed field from a DataFrame based inference request vs an Apache Arrow table inference request.
result_arrow = ccfraud_pipeline.infer_from_file('./data/cc_data_10k.arrow', dataset="metadata")
# unconverted raw data from Arrow table inference
# time to parse input data
display(result_arrow["metadata.elapsed"][0][0])
# time to inference from parsed input data
display(result_arrow["metadata.elapsed"][0][1])
<pyarrow.UInt64Scalar: 1253219>
<pyarrow.UInt64Scalar: 1275320>
parse_elapsed = result_arrow["metadata.elapsed"][0][0].as_py()
display(f'Time to parse input data: {parse_elapsed}')
'Time to parse input data: 1253219'
display(f'Time to inference from parsed data: {result_arrow["metadata.elapsed"][0][1]}')
'Time to inference from parsed data: 1275320'
# no conversion needed for pandas DataFrame
result_dataframe = ccfraud_pipeline.infer_from_file('./data/cc_data_10k.df.json', dataset=["*", "metadata"])
display(f"Time to parse input data: {result_dataframe['metadata.elapsed'][0][0]}")
display(f"Time to inference from parsed data: {result_dataframe['metadata.elapsed'][0][1]}")
'Time to parse input data: 51879691'
'Time to inference from parsed data: 2310435'
The following example is an inference request using an Apache Arrow table. The inference result is returned as an Apache Arrow table, which is then converted into a pandas DataFrame and a Polars DataFrame, with the results filtered to rows where the output is greater than 0.75.
result = ccfraud_pipeline.infer(ccfraud_input_1k_arrow_table)
display(result)
pyarrow.Table
time: timestamp[ms]
in.tensor: list<item: float> not null
child 0, item: float
out.dense_1: list<inner: float not null> not null
child 0, inner: float not null
anomaly.count: int8
----
time: [[2023-03-20 18:55:09.562,2023-03-20 18:55:09.562,2023-03-20 18:55:09.562,2023-03-20 18:55:09.562,2023-03-20 18:55:09.562,...,2023-03-20 18:55:09.562,2023-03-20 18:55:09.562,2023-03-20 18:55:09.562,2023-03-20 18:55:09.562,2023-03-20 18:55:09.562]]
in.tensor: [[[-1.0603298,2.3544967,-3.5638788,5.138735,-1.2308457,...,0.038412016,1.0993439,1.2603409,-0.14662448,-1.4463212],[-1.0603298,2.3544967,-3.5638788,5.138735,-1.2308457,...,0.038412016,1.0993439,1.2603409,-0.14662448,-1.4463212],...,[0.49511018,-0.24993694,0.4553345,0.92427504,-0.36435103,...,1.1117147,-0.566654,0.12122019,0.06676402,0.6583282],[0.61188054,0.1726081,0.43105456,0.50321484,-0.27466634,...,0.30260187,0.081211455,-0.15578508,0.017189292,-0.7236631]]]
out.dense_1: [[[0.99300325],[0.99300325],...,[0.0008533001],[0.0012498498]]]
anomaly.count: [[0,0,0,0,0,...,0,0,0,0,0]]
outputs = result.to_pandas()

# keep only the rows where the model output exceeds 0.75
filter = [elt[0] > 0.75 for elt in outputs['out.dense_1']]
outputs = outputs.loc[filter]
display(outputs)
  | time | in.tensor | out.dense_1 | anomaly.count |
---|---|---|---|---|
0 | 2023-03-20 18:55:09.562 | [-1.0603298, 2.3544967, -3.5638788, 5.138735, -1.2308457, -0.76878244, -3.5881228, 1.8880838, -3.2789674, -3.9563255, 4.099344, -5.653918, -0.8775733, -9.131571, -0.6093538, -3.7480276, -5.0309124, -0.8748149, 1.9870535, 0.7005486, 0.9204423, -0.10414918, 0.32295644, -0.74181414, 0.038412016, 1.0993439, 1.2603409, -0.14662448, -1.4463212] | [0.99300325] | 0 |
1 | 2023-03-20 18:55:09.562 | [-1.0603298, 2.3544967, -3.5638788, 5.138735, -1.2308457, -0.76878244, -3.5881228, 1.8880838, -3.2789674, -3.9563255, 4.099344, -5.653918, -0.8775733, -9.131571, -0.6093538, -3.7480276, -5.0309124, -0.8748149, 1.9870535, 0.7005486, 0.9204423, -0.10414918, 0.32295644, -0.74181414, 0.038412016, 1.0993439, 1.2603409, -0.14662448, -1.4463212] | [0.99300325] | 0 |
2 | 2023-03-20 18:55:09.562 | [-1.0603298, 2.3544967, -3.5638788, 5.138735, -1.2308457, -0.76878244, -3.5881228, 1.8880838, -3.2789674, -3.9563255, 4.099344, -5.653918, -0.8775733, -9.131571, -0.6093538, -3.7480276, -5.0309124, -0.8748149, 1.9870535, 0.7005486, 0.9204423, -0.10414918, 0.32295644, -0.74181414, 0.038412016, 1.0993439, 1.2603409, -0.14662448, -1.4463212] | [0.99300325] | 0 |
3 | 2023-03-20 18:55:09.562 | [-1.0603298, 2.3544967, -3.5638788, 5.138735, -1.2308457, -0.76878244, -3.5881228, 1.8880838, -3.2789674, -3.9563255, 4.099344, -5.653918, -0.8775733, -9.131571, -0.6093538, -3.7480276, -5.0309124, -0.8748149, 1.9870535, 0.7005486, 0.9204423, -0.10414918, 0.32295644, -0.74181414, 0.038412016, 1.0993439, 1.2603409, -0.14662448, -1.4463212] | [0.99300325] | 0 |
161 | 2023-03-20 18:55:09.562 | [-9.716793, 9.174981, -14.450761, 8.653825, -11.039951, 0.6602411, -22.825525, -9.919395, -8.064324, -16.737926, 4.852197, -12.563343, -1.0762653, -7.524591, -3.2938414, -9.62102, -15.6501045, -7.089741, 1.7687134, 5.044906, -11.365625, 4.5987034, 4.4777045, 0.31702697, -2.2731977, 0.07944675, -10.052058, -2.024108, -1.0611985] | [1.0] | 0 |
941 | 2023-03-20 18:55:09.562 | [-0.50492376, 1.9348029, -3.4217603, 2.2165704, -0.6545315, -1.9004827, -1.6786858, 0.5380051, -2.7229102, -5.265194, 3.504164, -5.4661765, 0.68954825, -8.725291, 2.0267954, -5.4717045, -4.9123807, -1.6131229, 3.8021576, 1.3881834, 1.0676425, 0.28200775, -0.30759808, -0.48498034, 0.9507336, 1.5118006, 1.6385275, 1.072455, 0.7959132] | [0.9873102] | 0 |
import polars as pl
outputs = pl.from_arrow(result)
display(outputs.filter(pl.col("out.dense_1").apply(lambda x: x[0]) > 0.75))
time | in.tensor | out.dense_1 | anomaly.count |
---|---|---|---|
datetime[ms] | list[f32] | list[f32] | i8 |
2023-03-20 18:55:09.562 | [-1.06033, 2.354497, … -1.446321] | [0.993003] | 0 |
2023-03-20 18:55:09.562 | [-1.06033, 2.354497, … -1.446321] | [0.993003] | 0 |
2023-03-20 18:55:09.562 | [-1.06033, 2.354497, … -1.446321] | [0.993003] | 0 |
2023-03-20 18:55:09.562 | [-1.06033, 2.354497, … -1.446321] | [0.993003] | 0 |
2023-03-20 18:55:09.562 | [-9.716793, 9.174981, … -1.061198] | [1.0] | 0 |
2023-03-20 18:55:09.562 | [-0.504924, 1.934803, … 0.795913] | [0.98731] | 0 |
The following restrictions are in place when requesting the datasets metadata or metadata.elapsed.

For the following pipeline steps, metadata or metadata.elapsed must be requested with the * parameter. For example:
result = mainpipeline.infer(normal_input, dataset=["*", "metadata.elapsed"])
Affected pipeline steps:

- add_model_step
- replace_with_model_step
For the following pipeline steps, metadata or metadata.elapsed cannot be included with the * parameter. For example:
result = mainpipeline.infer(normal_input, dataset=["metadata.elapsed"])
Affected pipeline steps:

- add_random_split
- replace_with_random_split
- add_shadow_deploy
- replace_with_shadow_deploy
Numpy arrays can be submitted as an input by embedding them within a DataFrame. In this example, the input column is tensor, but it can be whatever the model expects.
dataframedata = pd.DataFrame({"tensor":[npArray]})
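A self-contained sketch of this pattern follows; the array shape, dtype, and the column name tensor are placeholders for whatever the deployed model expects.

import numpy as np
import pandas as pd

# hypothetical numpy input; shape and dtype depend on the model
npArray = np.zeros((1, 28, 28), dtype=np.float32)

# embed the array itself as a single cell of a one-row DataFrame
dataframedata = pd.DataFrame({"tensor": [npArray]})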
This bypasses the need to convert the npArray to a List - the object itself can be embedded into the DataFrame and submitted. For this example, a DataFrame with the column tensor that contains a numpy array will be submitted as an inference, and from the return only the column out.2519 will be displayed.
infResults = pipeline.infer(dataframedata, dataset=["*", "metadata.elapsed"])
display(infResults.loc[0]["out.2519"])
[44,
44,
44,
44,
82,
44,
44,
44,
44,
44,
44,
44,
44,
44,
44,
44,
44,
44,
44,
84,
84,
44,
84,
44,
44,
44,
61,
44,
86,
44,
44]
To submit a data file directly to a pipeline, use the method wallaroo.pipeline.Pipeline.infer_from_file(filename, timeout, dataset, dataset_exclude, dataset_separator). infer_from_file takes the following arguments:
Parameter | Type | Description |
---|---|---|
filename | String (REQUIRED) | The path to the data file submitted to the pipeline for inference. The following data inputs are supported: a pandas Record in JSON format (.json) or an Apache Arrow table (.arrow). |
timeout | (Integer or Float) (OPTIONAL) | A timeout in seconds before the inference throws an exception. The default is 15 seconds per call to accommodate large, complex models. Note that for a batch inference this is per call - with 10 inference requests, each would have a default timeout of 15 seconds. |
dataset | List[String] (OPTIONAL) | The datasets to be returned. The datasets available are: * (the default, which returns the time, in, out, and anomaly datasets) and metadata (inference processing metadata such as metadata.elapsed). |
dataset_exclude | List[String] (OPTIONAL) | Allows users to exclude parts of the dataset. |
dataset_separator | List[String] (OPTIONAL) | Allows other types of dataset separators to be used. If set to “.”, the returned dataset will be flattened. |
infer_from_file returns one of the following, based on the input file:

- If the input is a pandas Record in JSON format, it returns a pandas DataFrame.
- If the input is an Apache Arrow table, it returns an Apache Arrow table.

The following datasets are returned by default, modified by the dataset and dataset_exclude parameters.
Field | Description |
---|---|
time | The DateTime of the inference request. |
in | All inputs listed as in.{variable_name} |
out | All outputs listed as out.{variable_name} . |
anomaly | Flags whether an Anomaly was detected. Anomalies are detected from each pipeline validation that returned True. For full details, see Wallaroo SDK Essentials Guide: Anomaly Detection. The fields anomaly.count and anomaly.{validation_name} are included in this dataset. |
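As a minimal sketch, assuming a deployed pipeline pipeline and an Apache Arrow data file at a placeholder path, an infer_from_file call might look like:

# returns an Apache Arrow table because the input file is in Arrow format
result = pipeline.infer_from_file('./data/input.arrow',
                                  timeout=30,
                                  dataset=["*", "metadata"])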
The inference result field metadata.elapsed format changes depending on the input type:

- For pandas DataFrame inputs, metadata.elapsed is returned as an int.
- For Apache Arrow inputs, metadata.elapsed is returned as pyarrow.Int64.

The following code is provided to convert metadata.elapsed into the same format for consistency.
# running totals for the two elapsed values; initialize before accumulating
parse_elapsed = 0
inference_elapsed = 0

if input_type == "json":
    # pandas DataFrame results return plain Python ints
    parse_elapsed += results["metadata.elapsed"][0][0]
    inference_elapsed += results["metadata.elapsed"][0][1]
elif input_type == "arrow":
    # Apache Arrow results return pyarrow scalars; convert with as_py()
    parse_elapsed += results["metadata.elapsed"][0][0].as_py()
    inference_elapsed += results["metadata.elapsed"][0][1].as_py()
else:
    assert False, f"Bad input type {input_type}"
The following demonstrates the differences between the metadata.elapsed field from a DataFrame based inference request vs an Apache Arrow table inference request.
result_arrow = ccfraud_pipeline.infer_from_file('./data/cc_data_10k.arrow', dataset="metadata")
# unconverted raw data from Arrow table inference
# time to parse input data
display(result_arrow["metadata.elapsed"][0][0])
# time to inference from parsed input data
display(result_arrow["metadata.elapsed"][0][1])
<pyarrow.UInt64Scalar: 1253219>
<pyarrow.UInt64Scalar: 1275320>
parse_elapsed = result_arrow["metadata.elapsed"][0][0].as_py()
display(f'Time to parse input data: {parse_elapsed}')
'Time to parse input data: 1253219'
display(f'Time to inference from parsed data: {result_arrow["metadata.elapsed"][0][1]}')
'Time to inference from parsed data: 1275320'
# no conversion needed for pandas DataFrame
result_dataframe = ccfraud_pipeline.infer_from_file('./data/cc_data_10k.df.json', dataset=["*", "metadata"])
display(f"Time to parse input data: {result_dataframe['metadata.elapsed'][0][0]}")
display(f"Time to inference from parsed data: {result_dataframe['metadata.elapsed'][0][1]}")
'Time to parse input data: 51879691'
'Time to inference from parsed data: 2310435'
In this example, an Apache Arrow file containing 50,000 rows will be submitted for inference to a model trained for reviewing IMDB reviews, and the first 5 results displayed.
results = imdb_pipeline.infer_from_file('./data/test_data_50K.arrow')
import polars as pl
outputs = pl.from_arrow(results)
display(outputs.head(5))
shape: (5, 4)
time | in.tensor | out.dense_1 | anomaly.count |
---|---|---|---|
datetime[ms] | list[f32] | list[f32] | i8 |
2023-03-20 20:53:50.170 | [11.0, 6.0, … 0.0] | [0.898019] | 0 |
2023-03-20 20:53:50.170 | [54.0, 548.0, … 20.0] | [0.056597] | 0 |
2023-03-20 20:53:50.170 | [1.0, 9259.0, … 1.0] | [0.92608] | 0 |
2023-03-20 20:53:50.170 | [10.0, 25.0, … 0.0] | [0.926919] | 0 |
2023-03-20 20:53:50.170 | [10.0, 37.0, … 0.0] | [0.661858] | 0 |
In this example, an inference will be submitted to the ccfraud_pipeline with the file smoke_test.df.json, a DataFrame formatted JSON file.
result = ccfraud_pipeline.infer_from_file('./data/smoke_test.df.json')
 | time | in.tensor | out.dense_1 | anomaly.count |
---|---|---|---|---|
0 | 2023-02-15 23:07:07.497 | [1.0678324729, 0.2177810266, -1.7115145262, 0.682285721, 1.0138553067, -0.4335000013, 0.7395859437, -0.2882839595, -0.447262688, 0.5146124988, 0.3791316964, 0.5190619748, -0.4904593222, 1.1656456469, -0.9776307444, -0.6322198963, -0.6891477694, 0.1783317857, 0.1397992467, -0.3554220649, 0.4394217877, 1.4588397512, -0.3886829615, 0.4353492889, 1.7420053483, -0.4434654615, -0.1515747891, -0.2668451725, -1.4549617756] | [0.0014974177] | 0 |
The method async wallaroo.pipeline.Pipeline.async_infer provides asynchronous inference requests to a deployed model. This allows organizations to submit an inference request, proceed with other tasks, then respond when the inference request is complete. async_infer takes the following parameters:
Parameter | Type | Description |
---|---|---|
tensor | pandas.core.frame.DataFrame, pyarrow.lib.Table (Required) | Accepts either a pandas DataFrame or an Apache Arrow Table. |
async_client | httpx.AsyncClient (Required) | AsyncClient to use for async inference. |
timeout | Integer, Float, None (Optional Default: 15) | The timeout period for an inference request in seconds. |
retries | Integer, None (Optional) | The number of retries from connection errors. |
dataset | List[String], None (Optional) | The datasets to return. See DataFrame and Arrow for details on Wallaroo inference datasets. |
dataset_exclude | List[String], None (Optional) | The datasets to exclude. See DataFrame and Arrow for details on Wallaroo inference datasets. |
dataset_separator | String (Optional) | Allows other types of dataset separators to be used. If set to “.”, the returned dataset will be flattened. |
async_infer returns one of the following, based on the tensor input:

- If the tensor input is a pandas DataFrame, it returns a pandas DataFrame.
- If the tensor input is an Apache Arrow Table, it returns an Apache Arrow table.

The following datasets are returned by default, modified by the dataset and dataset_exclude parameters.
Field | Description |
---|---|
time | The DateTime of the inference request. |
in | All inputs listed as in.{variable_name} |
out | All outputs listed as out.{variable_name} . |
anomaly | Flags whether an Anomaly was detected. Anomalies are detected from each pipeline validation that returned True. For full details, see Wallaroo SDK Essentials Guide: Anomaly Detection. The fields anomaly.count and anomaly.{validation_name} are included in this dataset. |
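Before the full example below, the following is a minimal sketch of a single asynchronous inference, assuming a deployed pipeline pipeline; the input column name tensor is a placeholder for the model's input field.

import httpx
import pandas as pd

# hypothetical one-row input; the column name depends on the model
input_df = pd.DataFrame({"tensor": [[1.0, 2.0, 3.0]]})

async def one_async_infer(pipeline):
    # the caller owns the AsyncClient and passes it to async_infer
    async with httpx.AsyncClient() as async_client:
        result = await pipeline.async_infer(tensor=input_df,
                                            async_client=async_client,
                                            timeout=30,
                                            retries=2)
    return result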
The following example demonstrates using async_infer to perform an inference request with other tasks executed between the inference request and receiving the result.

For this example:

- run_one_batch reads an Apache Arrow Table and submits it through async_infer, as shown below.
- A loop calls run_one_batch three times, once per data file.
- The results from each async_infer call are gathered together and published as one set.
are gathered together and published as one set.async def run_one_batch(file):
with pa.ipc.open_file(file) as source:
table = source.read_all() # to get pyarrow table
# ... potentially do other processing here or request data from a feature store ...
result_async_batch = await aloha_pipeline.async_infer(tensor=table,
async_client=async_client,
retries=2)
return result_async_batch
# Process three batches in parallel. Inference on the first will start while
# the second file is read, and so on. Multiple inferences may be running at
# the same time if the engine replicas are > 1.
tasks = []
for file in ["./data/data_25k.arrow", "./data/data_1k.arrow", "./data/data_25k.arrow"]:
tasks.append(run_one_batch(file))
# Wait for all three batches to finish.
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
print(f"Number of rows for task: {result.num_rows}")
display(result)
Number of rows for task: 24954
pyarrow.Table
time: timestamp[ms]
in.text_input: list<item: float>
child 0, item: float
out.banjori: list<inner: float not null> not null
child 0, inner: float not null
out.corebot: list<inner: float not null> not null
child 0, inner: float not null
out.cryptolocker: list<inner: float not null> not null
child 0, inner: float not null
out.dircrypt: list<inner: float not null> not null
child 0, inner: float not null
out.gozi: list<inner: float not null> not null
child 0, inner: float not null
out.kraken: list<inner: float not null> not null
child 0, inner: float not null
out.locky: list<inner: float not null> not null
child 0, inner: float not null
out.main: list<inner: float not null> not null
child 0, inner: float not null
out.matsnu: list<inner: float not null> not null
child 0, inner: float not null
out.pykspa: list<inner: float not null> not null
child 0, inner: float not null
out.qakbot: list<inner: float not null> not null
child 0, inner: float not null
out.ramdo: list<inner: float not null> not null
child 0, inner: float not null
out.ramnit: list<inner: float not null> not null
child 0, inner: float not null
out.simda: list<inner: float not null> not null
child 0, inner: float not null
out.suppobox: list<inner: float not null> not null
child 0, inner: float not null
anomaly.count: uint32 not null
----
time: [[2024-09-30 17:53:07.406,2024-09-30 17:53:07.406,2024-09-30 17:53:07.406,2024-09-30 17:53:07.406,2024-09-30 17:53:07.406,...,2024-09-30 17:53:07.406,2024-09-30 17:53:07.406,2024-09-30 17:53:07.406,2024-09-30 17:53:07.406,2024-09-30 17:53:07.406]]
in.text_input: [[[0,0,0,0,0,...,32,30,19,26,17],[0,0,0,0,0,...,18,35,18,22,23],...,[0,0,0,0,0,...,27,28,19,33,23],[0,0,0,0,0,...,24,29,14,36,13]]]
out.banjori: [[[0.0015195814],[7.447168e-18],...,[1.342218e-14],[1.3068625e-12]]]
out.corebot: [[[0.98291475],[6.7359245e-8],...,[7.969789e-9],[1.1029467e-9]]]
out.cryptolocker: [[[0.012099549],[0.17081994],...,[0.14756858],[0.014839977]]]
out.dircrypt: [[[0.000047591115],[1.3220122e-9],...,[4.813928e-9],[2.2757316e-8]]]
out.gozi: [[[0.000020289312],[1.2758657e-24],...,[1.0541016e-22],[8.438438e-15]]]
out.kraken: [[[0.00031977257],[0.22559547],...,[0.2969691],[0.30495816]]]
out.locky: [[[0.011029262],[0.3420985],...,[0.2706808],[0.11627986]]]
out.main: [[[0.997564],[0.99999994],...,[1],[0.99999803]]]
...
Number of rows for task: 1000
pyarrow.Table
time: timestamp[ms]
in.text_input: list<item: float>
child 0, item: float
out.banjori: list<inner: float not null> not null
child 0, inner: float not null
out.corebot: list<inner: float not null> not null
child 0, inner: float not null
out.cryptolocker: list<inner: float not null> not null
child 0, inner: float not null
out.dircrypt: list<inner: float not null> not null
child 0, inner: float not null
out.gozi: list<inner: float not null> not null
child 0, inner: float not null
out.kraken: list<inner: float not null> not null
child 0, inner: float not null
out.locky: list<inner: float not null> not null
child 0, inner: float not null
out.main: list<inner: float not null> not null
child 0, inner: float not null
out.matsnu: list<inner: float not null> not null
child 0, inner: float not null
out.pykspa: list<inner: float not null> not null
child 0, inner: float not null
out.qakbot: list<inner: float not null> not null
child 0, inner: float not null
out.ramdo: list<inner: float not null> not null
child 0, inner: float not null
out.ramnit: list<inner: float not null> not null
child 0, inner: float not null
out.simda: list<inner: float not null> not null
child 0, inner: float not null
out.suppobox: list<inner: float not null> not null
child 0, inner: float not null
anomaly.count: uint32 not null
----
time: [[2024-09-30 17:53:07.384,2024-09-30 17:53:07.384,2024-09-30 17:53:07.384,2024-09-30 17:53:07.384,2024-09-30 17:53:07.384,...,2024-09-30 17:53:07.384,2024-09-30 17:53:07.384,2024-09-30 17:53:07.384,2024-09-30 17:53:07.384,2024-09-30 17:53:07.384]]
in.text_input: [[[0,0,0,0,0,...,32,30,19,26,17],[0,0,0,0,0,...,29,12,36,31,12],...,[0,0,0,0,0,...,35,16,35,27,16],[0,0,0,0,0,...,24,29,14,36,13]]]
out.banjori: [[[0.0015195814],[0.00002837503],...,[0.0000056315566],[1.3068625e-12]]]
out.corebot: [[[0.98291475],[0.000012753118],...,[0.0000033642746],[1.1029468e-9]]]
out.cryptolocker: [[[0.012099549],[0.025435215],...,[0.13612255],[0.014839977]]]
out.dircrypt: [[[0.000047591115],[6.150966e-10],...,[5.6732154e-11],[2.2757316e-8]]]
out.gozi: [[[0.000020289312],[2.321774e-10],...,[2.773063e-8],[8.43847e-15]]]
out.kraken: [[[0.00031977257],[0.051351104],...,[0.0025221605],[0.30495816]]]
out.locky: [[[0.011029262],[0.022038758],...,[0.05455697],[0.116279885]]]
out.main: [[[0.997564],[0.9885122],...,[0.9998954],[0.99999803]]]
...
Number of rows for task: 24954
pyarrow.Table
time: timestamp[ms]
in.text_input: list<item: float>
child 0, item: float
out.banjori: list<inner: float not null> not null
child 0, inner: float not null
out.corebot: list<inner: float not null> not null
child 0, inner: float not null
out.cryptolocker: list<inner: float not null> not null
child 0, inner: float not null
out.dircrypt: list<inner: float not null> not null
child 0, inner: float not null
out.gozi: list<inner: float not null> not null
child 0, inner: float not null
out.kraken: list<inner: float not null> not null
child 0, inner: float not null
out.locky: list<inner: float not null> not null
child 0, inner: float not null
out.main: list<inner: float not null> not null
child 0, inner: float not null
out.matsnu: list<inner: float not null> not null
child 0, inner: float not null
out.pykspa: list<inner: float not null> not null
child 0, inner: float not null
out.qakbot: list<inner: float not null> not null
child 0, inner: float not null
out.ramdo: list<inner: float not null> not null
child 0, inner: float not null
out.ramnit: list<inner: float not null> not null
child 0, inner: float not null
out.simda: list<inner: float not null> not null
child 0, inner: float not null
out.suppobox: list<inner: float not null> not null
child 0, inner: float not null
anomaly.count: uint32 not null
----
time: [[2024-09-30 17:53:07.395,2024-09-30 17:53:07.395,2024-09-30 17:53:07.395,2024-09-30 17:53:07.395,2024-09-30 17:53:07.395,...,2024-09-30 17:53:07.395,2024-09-30 17:53:07.395,2024-09-30 17:53:07.395,2024-09-30 17:53:07.395,2024-09-30 17:53:07.395]]
in.text_input: [[[0,0,0,0,0,...,32,30,19,26,17],[0,0,0,0,0,...,18,35,18,22,23],...,[0,0,0,0,0,...,27,28,19,33,23],[0,0,0,0,0,...,24,29,14,36,13]]]
out.banjori: [[[0.0015195814],[7.447168e-18],...,[1.342218e-14],[1.3068625e-12]]]
out.corebot: [[[0.98291475],[6.7359245e-8],...,[7.969789e-9],[1.1029467e-9]]]
out.cryptolocker: [[[0.012099549],[0.17081994],...,[0.14756858],[0.014839977]]]
out.dircrypt: [[[0.000047591115],[1.3220122e-9],...,[4.813928e-9],[2.2757316e-8]]]
out.gozi: [[[0.000020289312],[1.2758657e-24],...,[1.0541016e-22],[8.438438e-15]]]
out.kraken: [[[0.00031977257],[0.22559547],...,[0.2969691],[0.30495816]]]
out.locky: [[[0.011029262],[0.3420985],...,[0.2706808],[0.11627986]]]
out.main: [[[0.997564],[0.99999994],...,[1],[0.99999803]]]
...
The asynchronous method await wallaroo.pipeline.Pipeline.parallel_infer(tensor, timeout, num_parallel, retries) performs an inference as defined by the pipeline steps. It takes the following parameters:
Parameter | Type | Description |
---|---|---|
tensor | pandas.DataFrame OR pyarrow.Table (Required) | The data submitted to the pipeline for inference a pandas.DataFrame or Apache Arrow pyarrow.Table |
timeout | Integer (Optional) | A timeout in seconds before the inference throws an exception. The default is 15 second per call to accommodate large, complex models. Note that for a batch inference, this is per list item - with 10 inference requests, each would have a default timeout of 15 seconds. |
num_parallel | Integer (Optional) | The number of parallel threads used for the submission. This should be no more than four times the number of pipeline replicas. |
retries | Integer (Optional) | The number of retries per inference request submitted. |
dataset | List(String) (Optional) | The dataset of the inference result to return. By default this is set to ["*"], which returns ["time", "in", "out", "anomaly"]. Other available datasets: ["metadata"]. IMPORTANT NOTE: 'time' must be included in the returned datasets for the Wallaroo SDK, or the SDK will return an error. For example: dataset=['time', 'out'] returns the datasets time and out, and no others. |
dataset_exclude | List(String) (Optional) | The datasets to exclude from the returned inference results. For example: dataset=['*'], dataset_exclude=['in'] would exclude the in dataset from being returned, but include the default datasets of ['time', 'out', 'anomaly']. IMPORTANT NOTE: 'time' cannot be excluded for the Wallaroo SDK, or the SDK will return an error. |
wallaroo.pipeline.Pipeline.parallel_infer returns one of the following, based on the tensor input parameter:

- If the tensor input is a pandas DataFrame, it returns a pandas DataFrame.
- If the tensor input is an Apache Arrow Table, it returns an Apache Arrow table.

The following fields are returned based on the dataset and dataset_exclude parameters.
Field | Type | Description |
---|---|---|
time | DateTime | The DateTime of the returned inference result. |
in | Any | The input parameters. Each in dataset field correlates to the input field from the inference request. For example, the inputs ['year_built', 'last_renovated', 'number_of_bedrooms'] generate the in dataset fields ['in.year_built', 'in.last_renovated', 'in.number_of_bedrooms']. |
out | Any | The output parameters. Each out dataset field correlates to the final output of the pipeline. For example, if the final output is ['house_price', 'recommended_starting_bid'], the output fields would be ['out.house_price', 'out.recommended_starting_bid']. |
anomaly | Any | The detected anomalies based on the validations added to the pipeline. The field anomaly.count is always included, which displays a count of all validations that returned True , which indicates a detected anomaly. Other anomalies are listed by anomaly.{validation_name} . For more information, see Wallaroo Anomaly Detection. |
meta | Dict | Includes the fields of the metadata dataset, such as metadata.elapsed: a list containing the time to parse the input data and the time to perform the inference from the parsed data. |
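As a minimal sketch of the dataset parameters with parallel_infer, assuming a deployed pipeline pipeline and a pandas DataFrame df; note that time must remain in the returned datasets:

# split the rows across parallel requests, returning only time and out
results = await pipeline.parallel_infer(tensor=df,
                                        timeout=30,
                                        num_parallel=4,
                                        retries=2,
                                        dataset=["time", "out"])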
The first example will show taking a pandas DataFrame with 25,000 rows, splitting it into separate DataFrames with one row each, then submitting the first 1,000 sequentially. This simulates a situation where the data input sizes are so large they must be broken up for more efficient transmission and inferencing.
test_data = pd.read_json("./data/data_25k.df.json")
test_list = []
for index, row in test_data.head(1000).iterrows():
test_list.append(row.to_frame('text_input').reset_index(drop=True))
# show the first row as an example
test_list[0]
text_input | |
---|---|
0 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 28, 16, 32, 23, 29, 32, 30, 19, 26, 17] |
import datetime

#
# Run the inference sequentially to establish a baseline
#
now = datetime.datetime.now()
results = []
for df in test_list:
results.append(aloha_pipeline.infer(tensor=df, timeout=10))
total_sequential = datetime.datetime.now() - now
print(f"Elapsed = {total_sequential.total_seconds()} : {len(results)}")
Elapsed = 37.917126 : 1000
The following example shows using wallaroo.pipeline.Pipeline.parallel_infer with a pandas DataFrame, which is automatically split for parallel inferences by the Wallaroo SDK. We then compare it to the List of pandas DataFrames submitted sequentially.
timeout_secs=30
now = datetime.datetime.now()
parallel_results = await aloha_pipeline.parallel_infer(tensor=test_data.head(1000),
timeout=timeout_secs,
num_parallel=2*REPLICAS,
retries=3)
total_parallel = datetime.datetime.now() - now
print(f"Elapsed_in_parallel = {total_parallel.total_seconds()} : {len(parallel_results)}")
Elapsed_in_parallel = 7.691442 : 1000
parallel_results.head(20)
time | in.text_input | out.banjori | out.corebot | out.cryptolocker | out.dircrypt | out.gozi | out.kraken | out.locky | out.main | out.matsnu | out.pykspa | out.qakbot | out.ramdo | out.ramnit | out.simda | out.suppobox | anomaly.count | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 2024-03-08 15:32:26.253 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 28, 16, 32, 23, 29, 32, 30, 19, 26, 17] | [0.0015195842] | [0.98291475] | [0.012099553] | [4.7591206e-05] | [2.0289332e-05] | [0.00031977228] | [0.011029261] | [0.997564] | [0.010341614] | [0.008038961] | [0.016155045] | [0.0062362333] | [0.0009985747] | [1.7933435e-26] | [1.3889844e-27] | 0 |
1 | 2024-03-08 15:32:26.253 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 30, 20, 19, 27, 14, 17, 24, 13, 23, 20, 18, 35, 18, 22, 23] | [7.447168e-18] | [6.735899e-08] | [0.17081985] | [1.3220147e-09] | [1.2758705e-24] | [0.22559549] | [0.3420985] | [0.99999994] | [0.3080186] | [0.1828217] | [3.8022332e-11] | [0.2062254] | [0.15215823] | [1.1701893e-30] | [3.1513734e-38] | 0 |
2 | 2024-03-08 15:32:26.253 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 25, 33, 25, 36, 25, 31, 14, 32, 36, 25, 12, 35, 34, 30, 28, 27, 24, 29, 27] | [2.8598976e-21] | [9.301987e-08] | [0.04445297] | [6.163758e-09] | [8.3497386e-23] | [0.48234487] | [0.263329] | [1.0] | [0.29800338] | [0.22361773] | [1.5238979e-06] | [0.32820383] | [0.029332481] | [1.1995622e-31] | [0.0] | 0 |
3 | 2024-03-08 15:32:26.253 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 17, 23, 22, 15, 12, 35, 34, 36, 12, 18, 24, 34, 32, 36, 12, 14, 16, 27, 22, 23] | [2.1387213e-15] | [3.8817338e-10] | [0.04559972] | [1.9090405e-07] | [1.3140474e-25] | [0.5954264] | [0.17374131] | [0.9999997] | [0.23151566] | [0.1759168] | [1.0876193e-09] | [0.21832275] | [0.012869264] | [6.1588803e-28] | [1.4386127e-35] | 0 |
4 | 2024-03-08 15:32:26.280 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 32, 13, 14, 12, 33, 16, 23, 15, 22, 30, 28, 26, 12, 16, 32, 37, 29, 22, 28, 22, 16, 27, 32] | [9.4533425e-15] | [7.091165e-10] | [0.04981512] | [5.2914135e-09] | [7.413152e-19] | [1.5504575e-13] | [1.0791892e-15] | [0.9999989] | [1.5003076e-15] | [0.3307571] | [2.625885e-07] | [0.50362796] | [0.020393759] | [0.0] | [2.329197e-38] | 0 |
5 | 2024-03-08 15:32:26.293 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 17, 29, 20, 33, 13, 36, 35, 30, 21, 29, 17, 26, 19, 25, 36, 14, 23, 16, 18, 15, 21, 18, 28, 35, 19] | [1.7247285e-17] | [8.1354194e-08] | [0.013697129] | [5.6086392e-11] | [1.4032912e-17] | [0.4946911] | [0.11978862] | [0.99999994] | [0.19000016] | [0.10596694] | [5.524429e-06] | [0.24210057] | [0.0069435085] | [1.2804814e-34] | [9.482465e-35] | 0 |
6 | 2024-03-08 15:32:26.299 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 22, 36, 14, 12, 23, 14, 13, 20, 20, 23, 27, 36, 29, 35, 19, 33, 22, 25, 26, 32, 21] | [5.5500585e-18] | [3.3608643e-07] | [0.023452938] | [1.1318812e-10] | [1.0496957e-22] | [0.23692927] | [0.064456925] | [0.99999183] | [0.07306592] | [0.06499427] | [1.4302767e-08] | [0.11925243] | [0.0011031023] | [1.5206224e-32] | [0.0] | 0 |
7 | 2024-03-08 15:32:26.301 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 36, 22, 28, 23, 20, 25, 21, 20, 16, 12, 33, 21, 14, 34, 34, 32, 19, 36, 17, 29, 26, 14, 29] | [3.9222568e-18] | [1.407435e-10] | [0.010946895] | [8.202812e-11] | [2.454965e-24] | [0.42107278] | [0.071240015] | [0.9982491] | [0.118182994] | [0.08340969] | [1.9207886e-09] | [0.16958171] | [0.0005199058] | [0.0] | [0.0] | 0 |
8 | 2024-03-08 15:32:26.302 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 27, 30, 33, 29, 37, 24, 33, 16, 20, 24] | [4.0574426e-11] | [1.087887e-09] | [0.17916852] | [1.7313088e-06] | [8.697261e-18] | [9.197122e-16] | [3.8521073e-17] | [0.9999977] | [3.2654394e-17] | [0.32568428] | [6.834302e-09] | [0.37007827] | [0.44918332] | [0.0] | [2.082403e-26] | 0 |
9 | 2024-03-08 15:32:26.311 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 35, 29, 19, 35, 31, 15, 14, 21, 26, 31, 34, 27, 22] | [2.2576288e-09] | [2.0812616e-09] | [0.17788404] | [1.1887528e-08] | [1.078572e-11] | [0.041252796] | [0.21430437] | [0.9999988] | [0.17853741] | [0.13382334] | [0.000114089744] | [0.14033836] | [0.011299953] | [3.575825e-24] | [7.164664e-24] | 0 |
10 | 2024-03-08 15:32:26.324 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 25, 25, 19, 26, 30, 19, 29, 37, 20, 24, 23, 22, 20, 20, 12, 35, 29, 26, 16, 35, 36, 32, 23, 19] | [7.892642e-12] | [3.0390834e-07] | [0.015696561] | [5.4462657e-13] | [1.2192533e-22] | [2.9611054e-17] | [2.630555e-20] | [0.9999961] | [6.9846006e-20] | [0.28895643] | [1.8219538e-10] | [0.5132747] | [0.03162864] | [0.0] | [6.496134e-32] | 0 |
11 | 2024-03-08 15:32:26.324 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 35, 33, 34, 15, 23, 28, 34, 18, 33, 33] | [2.6560714e-16] | [5.9408145e-09] | [0.12814313] | [3.3345504e-08] | [2.2118923e-18] | [0.3078206] | [0.27681428] | [0.9999999] | [0.27904558] | [0.17737378] | [7.047457e-08] | [0.17205144] | [0.20136176] | [3.6787982e-29] | [4.919293e-33] | 0 |
12 | 2024-03-08 15:32:26.332 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 20, 28, 30, 13, 26, 28, 30, 33, 25, 30, 31, 30, 26, 34, 19, 18, 23, 18, 15] | [1.9262531e-07] | [0.00011627592] | [0.015093419] | [6.0622415e-06] | [2.7445957e-08] | [0.1944085] | [0.11690311] | [0.9999991] | [0.17412043] | [0.06493864] | [0.49536943] | [0.08959357] | [0.005527823] | [2.4333167e-38] | [1.3592863e-25] | 0 |
13 | 2024-03-08 15:32:26.340 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 23, 19, 17, 22, 25, 35, 29, 26, 15] | [1.8286044e-05] | [0.00021055655] | [0.012560264] | [1.669594e-12] | [1.2260803e-07] | [0.007982212] | [0.01670425] | [0.017594405] | [0.017098006] | [0.011611045] | [0.00011716164] | [0.009795011] | [0.010660369] | [3.187273e-35] | [6.004795e-27] | 0 |
14 | 2024-03-08 15:32:26.364 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 36, 22, 17, 34, 27, 18, 18, 30] | [3.6237112e-22] | [1.0416503e-05] | [0.3348774] | [2.1746243e-06] | [8.617319e-23] | [0.029006457] | [0.20757225] | [0.99999344] | [0.13615955] | [0.08263349] | [2.8077036e-09] | [0.056751817] | [0.100090414] | [1.0977599e-18] | [1.6076299e-32] | 0 |
15 | 2024-03-08 15:32:26.369 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 21, 19, 31, 31, 19, 36, 22, 12, 37, 18, 22, 31, 29] | [2.6339812e-11] | [3.014685e-10] | [0.04157271] | [2.9721992e-11] | [4.1457778e-19] | [2.8498805e-12] | [1.0917219e-13] | [0.99999815] | [1.5328618e-13] | [0.15687591] | [6.499695e-07] | [0.2797901] | [0.07243411] | [6.264585e-28] | [3.7361817e-33] | 0 |
16 | 2024-03-08 15:32:26.371 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21, 15, 26, 15, 14, 19, 32, 24, 33, 13, 20, 22, 32, 14, 25, 26, 35, 22, 12, 31, 23, 19, 31] | [2.3916345e-11] | [1.0221278e-06] | [0.0036410673] | [3.0198066e-10] | [6.5029376e-10] | [0.01702937] | [0.024708282] | [0.99999654] | [0.031047806] | [0.029724406] | [1.1598447e-05] | [0.053846903] | [6.46801e-05] | [1.9701536e-31] | [8.561327e-37] | 0 |
17 | 2024-03-08 15:32:26.370 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 25, 31, 21, 21, 26, 33, 17, 16, 23, 28, 20, 19, 29, 25, 24, 30, 20, 35, 19, 36] | [5.989164e-14] | [4.957251e-05] | [0.014003561] | [6.2121716e-13] | [6.8363566e-18] | [0.15793473] | [0.0400572] | [0.9999906] | [0.057762165] | [0.036209285] | [1.1137857e-06] | [0.05882588] | [0.021252671] | [2.852255e-32] | [2.9058335e-35] | 0 |
18 | 2024-03-08 15:32:26.392 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 23, 31, 14, 18, 29, 22, 19, 13, 23, 36, 16, 24, 17, 31, 35, 24, 26, 33, 37] | [7.04367e-15] | [3.8310918e-10] | [0.010476033] | [5.5391797e-13] | [4.2660885e-18] | [1.8002026e-13] | [3.1393156e-15] | [0.99999946] | [5.5198312e-15] | [0.14957657] | [3.9449355e-07] | [0.31189042] | [0.0042013763] | [0.0] | [3.35857e-34] | 0 |
19 | 2024-03-08 15:32:26.399 | [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 22, 20, 20, 31, 30, 33, 32, 23, 24] | [8.1578895e-05] | [0.005662437] | [0.25973395] | [0.00036145633] | [2.2012752e-13] | [0.022834523] | [0.16723366] | [0.9992838] | [0.116028585] | [0.06689821] | [9.2623617e-07] | [0.03539415] | [0.22199522] | [1.975508e-20] | [9.651789e-15] | 0 |
The following example shows using wallaroo.pipeline.Pipeline.parallel_infer with an Apache Arrow table. The pandas DataFrame is converted to an Arrow table, which is automatically split for parallel inferences by the Wallaroo SDK. We then compare it to the previous runs.
# convert the DataFrame to an Arrow Table
import pyarrow as pa
timeout_secs=30
now = datetime.datetime.now()
test_data_arrow_table = pa.Table.from_pandas(test_data.head(1000))
parallel_results = await aloha_pipeline.parallel_infer(tensor=test_data_arrow_table,
timeout=timeout_secs,
num_parallel=2*REPLICAS,
retries=3)
total_parallel = datetime.datetime.now() - now
print(f"Elapsed_in_parallel = {total_parallel.total_seconds()} : {len(parallel_results)}")
Elapsed_in_parallel = 3.364765 : 1000
We can quickly see the speed benefits of using parallel infer requests over sequential requests, and even more speed when going from a pandas DataFrame to an Apache Arrow table.
Output formats are based on the input types: pandas DataFrame inputs return pandas DataFrame, and Apache Arrow inputs return Apache Arrow objects.
The default columns returned are:

- out.{model_output}: This model only has one output - dense_1, which is listed in the out.dense_1 column. If the model has multiple outputs, they would be listed as out.{outputname1}, out.{outputname2}, etc.
- anomaly.count: Any anomalies detected from validations.
- anomaly.{validation}: The validation that triggered the anomaly detection and whether it is True (indicating an anomaly was detected) or False. For more details, see Wallaroo SDK Essentials Guide: Anomaly Detection.

Columns returned are controlled by the dataset_exclude array parameter, which specifies which output columns to ignore. For example, if a model outputs the columns out.rambo, out.main, out.glibnar, using the parameter dataset_exclude=["out.rambo", "out.glibnar"] will exclude those columns from the output.
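Continuing that hypothetical model, a call excluding those two output columns might look like:

# return the default datasets minus the two excluded output columns
result = pipeline.infer(df, dataset_exclude=["out.rambo", "out.glibnar"])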
There are multiple methods of retrieving the JWT token used to authenticate to the Wallaroo instance's API service. This tutorial will use the Wallaroo SDK method for convenience, with environmental variables for a seamless login without browser validation. For more information, see the Wallaroo SDK Essentials Guide: Client Connection.
All Wallaroo API endpoints follow the format:
https://$WALLAROO_DOMAIN/v1/api$COMMAND
Where $COMMAND is the specific endpoint. For example, the command to list the workspaces in the Wallaroo instance would use the above format based on these settings:
$WALLAROO_DOMAIN
: wallaroo.example.com
$COMMAND
: /workspaces/list
This would create the following API endpoint:
https://wallaroo.example.com/v1/api/workspaces/list
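As a sketch, the endpoint can be composed in Python from the two settings above:

# compose the full API endpoint from the domain and the command
wallaroo_domain = "wallaroo.example.com"
command = "/workspaces/list"
endpoint = f"https://{wallaroo_domain}/v1/api{command}"
# endpoint == 'https://wallaroo.example.com/v1/api/workspaces/list'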
For this example, a connection to the Wallaroo SDK is used. This will be used to retrieve the JWT token for the MLOps API calls.
This example will store the user’s credentials into the file ./creds.json
which contains the following:
{
"username": "{Connecting User's Username}",
"password": "{Connecting User's Password}",
"email": "{Connecting User's Email Address}"
}
Replace the username, password, and email fields with the user account connecting to the Wallaroo instance. This allows a seamless connection to the Wallaroo instance and bypasses the standard browser based confirmation link. For more information, see the Wallaroo SDK Essentials Guide: Client Connection.
Update the wallarooDomain variable to match the name of the Wallaroo domain.
import wallaroo
from wallaroo.object import EntityNotFoundError
import pandas as pd
import os
import base64
import pyarrow as pa
import requests
from requests.auth import HTTPBasicAuth
# Used to create unique workspace and pipeline names
import string
import random
import json
# used to display dataframe information without truncating
from IPython.display import display
pd.set_option('display.max_colwidth', None)
# Retrieve the login credentials.
os.environ["WALLAROO_SDK_CREDENTIALS"] = './creds.json'

# Client connection from local Wallaroo instance
wallarooDomain = "wallaroo.example.com"

wl = wallaroo.Client(api_endpoint=f"https://{wallarooDomain}",
                     auth_type="user_password")

APIURL = f"https://{wallarooDomain}/v1/api"
As mentioned earlier, there are multiple methods of authenticating to the Wallaroo instance for MLOps API calls. This tutorial will use the Wallaroo SDK method wl.auth.auth_header(), extracting the token from the response.
Reference: MLOps API Retrieve Token Through Wallaroo SDK
# Retrieve the token
headers = wl.auth.auth_header()
display(headers)
{'Authorization': 'Bearer exampleabcdefg'}
The API command /admin/get_pipeline_external_url retrieves the external inference URL for a specific pipeline in a workspace.

In this example, a list of the workspaces will be retrieved. Based on the setup from the Internal Pipeline Deployment URL Tutorial, the workspace matching urlworkspace will have its workspace id stored and used for the /admin/get_pipeline_external_url request with the pipeline urlpipeline.

The External Inference URL is stored in a variable for the next step.
Reference: Wallaroo MLOps API Essentials Guide: Pipeline Management: Get External Inference URL
# Retrieve the token
headers = wl.auth.auth_header()
# set Content-Type type
headers['Content-Type']='application/json'
## Retrieve the pipeline's External Inference URL
apiRequest = f"{APIURL}/admin/get_pipeline_external_url"
data = {
"workspace_id": workspaceId,
"pipeline_name": pipeline_name
}
response = requests.post(apiRequest, json=data, headers=headers, verify=True).json()
deployurl = response['url']
deployurl
'https://api.autoscale-uat-ee.wallaroo.dev/v1/api/pipelines/infer/vsnaapiinferenceexamplepipeline-260/vsnaapiinferenceexamplepipeline'
The following headers are required for connecting to the Pipeline Deployment URL:
Authorization: This requires the JWT token in the format 'Bearer ' + token. For example:

Authorization: Bearer abcdefg==
Content-Type:
For DataFrame formatted JSON:
Content-Type:application/json; format=pandas-records
For Arrow binary files, the Content-Type is application/vnd.apache.arrow.file:
Content-Type:application/vnd.apache.arrow.file
Accept:

- Accept: application/json; format=pandas-records: The inference result is returned as JSON in pandas Record format.
- Accept: application/vnd.apache.arrow.file: The inference result is returned as a binary in Apache Arrow format.

The inference can now be performed through the External Inference URL. This URL will accept the same inference data file that is used with the Wallaroo SDK, or with an Internal Inference URL as used in the Internal Pipeline Inference URL Tutorial.
For this example, the deployurl retrieved through the Get External Inference URL request is used to submit a single inference request through the data file data-1.json.
Reference: Wallaroo MLOps API Essentials Guide: Pipeline Management: Perform Inference Through External URL
The following example demonstrates performing an inference using a pandas Record format input.
# Retrieve the token
headers = wl.auth.auth_header()
# set Content-Type type
headers['Content-Type']='application/json; format=pandas-records'
## Inference through external URL using dataframe
# retrieve the json data to submit
data = [
{
"tensor":[
1.0678324729,
0.2177810266,
-1.7115145262,
0.682285721,
1.0138553067,
-0.4335000013,
0.7395859437,
-0.2882839595,
-0.447262688,
0.5146124988,
0.3791316964,
0.5190619748,
-0.4904593222,
1.1656456469,
-0.9776307444,
-0.6322198963,
-0.6891477694,
0.1783317857,
0.1397992467,
-0.3554220649,
0.4394217877,
1.4588397512,
-0.3886829615,
0.4353492889,
1.7420053483,
-0.4434654615,
-0.1515747891,
-0.2668451725,
-1.4549617756
]
}
]
# submit the request via POST, import as pandas DataFrame
response = pd.DataFrame.from_records(
requests.post(
deployurl,
json=data,
headers=headers)
.json()
)
display(response.loc[:,["time", "out"]])
time | out | |
---|---|---|
0 | 1688750664105 | {'dense_1': [0.0014974177]} |
The following example demonstrates performing an inference using an Apache Arrow table as the input. The response is transformed into a pandas DataFrame for easier display.
# Retrieve the token
headers = wl.auth.auth_header()
# set Content-Type type
headers['Content-Type']='application/vnd.apache.arrow.file'
# set accept as apache arrow table
headers['Accept']="application/vnd.apache.arrow.file"
# Submit arrow file
dataFile="./data/cc_data_10k.arrow"
data = open(dataFile,'rb').read()
response = requests.post(
deployurl,
headers=headers,
data=data,
verify=True
)
# Arrow table is retrieved
with pa.ipc.open_file(response.content) as reader:
arrow_table = reader.read_all()
# convert to pandas DataFrame and display the first 5 rows
display(arrow_table.to_pandas().head(5).loc[:,["time", "out"]])
time | out | |
---|---|---|
0 | 1688750664889 | {'dense_1': [0.99300325]} |
1 | 1688750664889 | {'dense_1': [0.99300325]} |
2 | 1688750664889 | {'dense_1': [0.99300325]} |
3 | 1688750664889 | {'dense_1': [0.99300325]} |
4 | 1688750664889 | {'dense_1': [0.0010916889]} |