Inferences are performed on deployed pipelines. This submits data to the pipeline, where it is processed through each of the pipeline’s steps with the output of the previous step providing the input for the next step. The final step will then output the result of all of the pipeline’s steps.
- Inputs are either sent one of the following:
- pandas.DataFrame. The return value will be a pandas.DataFrame.
- Apache Arrow (Preferred). The return value will be an Apache Arrow table.
- Python Dict. The return value will be JSON.
Apache Arrow is the recommended method of data inputs for inferences. Wallaroo inference data is based on Apache Arrow, which will return the fastest inference results and smaller data transfer amounts on average than JSON or DataFrame tables. Arrow tables also specify the data types used in their schema, insuring that the data sent and receives are exactly what is required. Using pandas DataFrame requires inferring the data type which may lead to data type mismatch issues.
For a complete example of using the Wallaroo SDK for inferencing, see the Wallaroo SDK Inference Tutorial.
Run Inference through Local Variable
The pipeline infer(data, timeout, dataset, dataset_exclude, dataset_separator)
method performs an inference as defined by the pipeline steps and takes the following arguments:
- data (REQUIRED): The data submitted to the pipeline for inference. The following data inputs are supported:
- pandas.DataFrame: Data submitted as a pandas DataFrame are returned as a pandas DataFrame. For models that output one column based on the models outputs.
- Apache Arrow (Preferred): Data submitted as an Apache Arrow are returned as an Apache Arrow.
- timeout (OPTIONAL): A timeout in seconds before the inference throws an exception. The default is 15 second per call to accommodate large, complex models. Note that for a batch inference, this is per call - with 10 inference requests, each would have a default timeout of 15 seconds.
- dataset (OPTIONAL): The datasets to be returned. The datasets available are:
*
: Default. This translates to["time", "in", "out", "check_failures"]
.time
: The DateTime of the inference request.in
: All inputs listed asin_{variable_name}
.out
: All outputs listed asout_variable_name
.check_failures
: Flags whether an Anomaly or Validation Check was triggered.0
indicates no checks were triggers, 1 or greater indicates a check was triggered.meta
: IMPORTANT NOTE: See Metadata Requests Restrictions for specifications on how to usemeta
ormetadata
dataset requests in combination with other fields.- Returns in the
metadata.elapsed
field:- A list of time in nanoseconds for:
- The time to serialize the input.
- How long each step took.
- A list of time in nanoseconds for:
- Returns in the
metadata.last_model
field:- A dict with each Python step as:
model_name
: The name of the model in the pipeline step.model_sha
: The sha hash of the model in the pipeline step.
- Returns in the
metadata.pipeline_version
field:- The pipeline version as a UUID value.
- Returns in the
metadata.elapsed
: See Metadata Requests Restrictions for specifications on how to usemeta
ormetadata
dataset requests in combination with other fields.- Returns in the
metadata.elapsed
field:- A list of time in nanoseconds for:
- The time to serialize the input.
- How long each step took.
- A list of time in nanoseconds for:
- Returns in the
- dataset_exclude (OPTIONAL): Allows users to exclude parts of the dataset.
- dataset_separator (OPTIONAL): Allows other types of dataset separators to be used. If set to “.”, the returned dataset will be flattened.
Outputs of the inference are based on the model’s outputs as out.{model_output}
. This model only has one output - dense_1
, which is listed in the out.dense_1
column. If the model has multiple outputs, they would be listed as out.output1
, out.output2
, etc.
The following example is an inference request using an Apache Arrow table. The inference result is returned as an Apache Arrow table, which is then converted into a Pandas DataFrame and a Polars DataFrame, with the results filtered based on results greater than 0.75.
result = ccfraud_pipeline.infer(ccfraud_input_1k_arrow_table)
display(result)
pyarrow.Table
time: timestamp[ms]
in.tensor: list<item: float> not null
child 0, item: float
out.dense_1: list<inner: float not null> not null
child 0, inner: float not null
check_failures: int8
----
time: [[2023-03-20 18:55:09.562,2023-03-20 18:55:09.562,2023-03-20 18:55:09.562,2023-03-20 18:55:09.562,2023-03-20 18:55:09.562,...,2023-03-20 18:55:09.562,2023-03-20 18:55:09.562,2023-03-20 18:55:09.562,2023-03-20 18:55:09.562,2023-03-20 18:55:09.562]]
in.tensor: [[[-1.0603298,2.3544967,-3.5638788,5.138735,-1.2308457,...,0.038412016,1.0993439,1.2603409,-0.14662448,-1.4463212],[-1.0603298,2.3544967,-3.5638788,5.138735,-1.2308457,...,0.038412016,1.0993439,1.2603409,-0.14662448,-1.4463212],...,[0.49511018,-0.24993694,0.4553345,0.92427504,-0.36435103,...,1.1117147,-0.566654,0.12122019,0.06676402,0.6583282],[0.61188054,0.1726081,0.43105456,0.50321484,-0.27466634,...,0.30260187,0.081211455,-0.15578508,0.017189292,-0.7236631]]]
out.dense_1: [[[0.99300325],[0.99300325],...,[0.0008533001],[0.0012498498]]]
check_failures: [[0,0,0,0,0,...,0,0,0,0,0]]
import pyarrow as pa
list = [0.75]
outputs = result.to_pandas()
# display(outputs)
filter = [elt[0] > 0.75 for elt in outputs['out.dense_1']]
outputs = outputs.loc[filter]
display(outputs)
  | time | in.tensor | out.dense_1 | check_failures |
---|---|---|---|---|
0 | 2023-03-20 18:55:09.562 | [-1.0603298, 2.3544967, -3.5638788, 5.138735, -1.2308457, -0.76878244, -3.5881228, 1.8880838, -3.2789674, -3.9563255, 4.099344, -5.653918, -0.8775733, -9.131571, -0.6093538, -3.7480276, -5.0309124, -0.8748149, 1.9870535, 0.7005486, 0.9204423, -0.10414918, 0.32295644, -0.74181414, 0.038412016, 1.0993439, 1.2603409, -0.14662448, -1.4463212] | [0.99300325] | 0 |
1 | 2023-03-20 18:55:09.562 | [-1.0603298, 2.3544967, -3.5638788, 5.138735, -1.2308457, -0.76878244, -3.5881228, 1.8880838, -3.2789674, -3.9563255, 4.099344, -5.653918, -0.8775733, -9.131571, -0.6093538, -3.7480276, -5.0309124, -0.8748149, 1.9870535, 0.7005486, 0.9204423, -0.10414918, 0.32295644, -0.74181414, 0.038412016, 1.0993439, 1.2603409, -0.14662448, -1.4463212] | [0.99300325] | 0 |
2 | 2023-03-20 18:55:09.562 | [-1.0603298, 2.3544967, -3.5638788, 5.138735, -1.2308457, -0.76878244, -3.5881228, 1.8880838, -3.2789674, -3.9563255, 4.099344, -5.653918, -0.8775733, -9.131571, -0.6093538, -3.7480276, -5.0309124, -0.8748149, 1.9870535, 0.7005486, 0.9204423, -0.10414918, 0.32295644, -0.74181414, 0.038412016, 1.0993439, 1.2603409, -0.14662448, -1.4463212] | [0.99300325] | 0 |
3 | 2023-03-20 18:55:09.562 | [-1.0603298, 2.3544967, -3.5638788, 5.138735, -1.2308457, -0.76878244, -3.5881228, 1.8880838, -3.2789674, -3.9563255, 4.099344, -5.653918, -0.8775733, -9.131571, -0.6093538, -3.7480276, -5.0309124, -0.8748149, 1.9870535, 0.7005486, 0.9204423, -0.10414918, 0.32295644, -0.74181414, 0.038412016, 1.0993439, 1.2603409, -0.14662448, -1.4463212] | [0.99300325] | 0 |
161 | 2023-03-20 18:55:09.562 | [-9.716793, 9.174981, -14.450761, 8.653825, -11.039951, 0.6602411, -22.825525, -9.919395, -8.064324, -16.737926, 4.852197, -12.563343, -1.0762653, -7.524591, -3.2938414, -9.62102, -15.6501045, -7.089741, 1.7687134, 5.044906, -11.365625, 4.5987034, 4.4777045, 0.31702697, -2.2731977, 0.07944675, -10.052058, -2.024108, -1.0611985] | [1.0] | 0 |
941 | 2023-03-20 18:55:09.562 | [-0.50492376, 1.9348029, -3.4217603, 2.2165704, -0.6545315, -1.9004827, -1.6786858, 0.5380051, -2.7229102, -5.265194, 3.504164, -5.4661765, 0.68954825, -8.725291, 2.0267954, -5.4717045, -4.9123807, -1.6131229, 3.8021576, 1.3881834, 1.0676425, 0.28200775, -0.30759808, -0.48498034, 0.9507336, 1.5118006, 1.6385275, 1.072455, 0.7959132] | [0.9873102] | 0 |
import polars as pl
outputs = pl.from_arrow(result)
display(outputs.filter(pl.col("out.dense_1").apply(lambda x: x[0]) > 0.75))
time | in.tensor | out.dense_1 | check_failures |
---|---|---|---|
datetime[ms] | list[f32] | list[f32] | i8 |
2023-03-20 18:55:09.562 | [-1.06033, 2.354497, … -1.446321] | [0.993003] | 0 |
2023-03-20 18:55:09.562 | [-1.06033, 2.354497, … -1.446321] | [0.993003] | 0 |
2023-03-20 18:55:09.562 | [-1.06033, 2.354497, … -1.446321] | [0.993003] | 0 |
2023-03-20 18:55:09.562 | [-1.06033, 2.354497, … -1.446321] | [0.993003] | 0 |
2023-03-20 18:55:09.562 | [-9.716793, 9.174981, … -1.061198] | [1.0] | 0 |
2023-03-20 18:55:09.562 | [-0.504924, 1.934803, … 0.795913] | [0.98731] | 0 |
Metadata Requests Restrictions
The following restrictions are in place when requesting the datasets metadata
or metadata.elapsed
.
Standard Pipeline Steps
For the following Pipeline steps, metadata
or metadata.elapsed
must be requested with the *
parameter. For example:
result = mainpipeline.infer(normal_input, dataset=["*", "metadata.elapsed"])
Effected pipeline steps:
add_model_step
replace_with_model_step
Testing Pipeline Steps
For the following Pipeline steps, meta
or metadata.elapsed
can not be included with the *
parameter. For example:
result = mainpipeline.infer(normal_input, dataset=["metadata.elapsed"])
Effected pipeline steps:
add_random_split
replace_with_random_split
add_shadow_deploy
replace_with_shadow_deploy
Numpy Arrays as Inputs
Numpy arrays can be submitted as an input by containing it within a DataFrame. In this example, the input column is tensor
, but can whatever the model expects.
dataframedata = pd.DataFrame({"tensor":[npArray]})
This bypasses the need to convert the npArray to a List - the object itself can be embedded into the DataFrame table and submitted. For this example, a DataFrame with the column tensor
that contains a numpy array will be submitted as an inference, and from the return only the column out.2519
will be displayed.
infResults = pipeline.infer(dataframedata, dataset=["*", "metadata.elapsed"])
display(infResults.loc[0]["out.2519"])
[44,
44,
44,
44,
82,
44,
44,
44,
44,
44,
44,
44,
44,
44,
44,
44,
44,
44,
44,
84,
84,
44,
84,
44,
44,
44,
61,
44,
86,
44,
44]
Run Inference From A File
To submit a data file directly to a pipeline, use the pipeline infer_from_file(data, timeout, dataset, dataset_exclude, dataset_separator)
method. This performs an inference as defined by the pipeline steps and takes the following arguments:
- data (REQUIRED): The name of the file submitted to the pipeline for inference.
- pandas.DataFrame: Data submitted as a pandas DataFrame are returned as a pandas DataFrame. For models that output one column based on the models outputs.
- Apache Arrow (Preferred): Data submitted as an Apache Arrow are returned as an Apache Arrow.
- [Custom JSON]: Data formatted in a custom JSON format. This requires the use of the
data_format="custom-json"
parameter. IMPORTANT NOTE: Submitting JSON as input data can have performance repercussions compared to using either pandas DataFrame or Apache Arrow as the data input.
- timeout (OPTIONAL): A timeout in seconds before the inference throws an exception. The default is 15 second per call to accommodate large, complex models. Note that for a batch inference, this is per call - with 10 inference requests, each would have a default timeout of 15 seconds. Inferences sent in a batch rather than individual inference requests are processed faster.
- dataset (OPTIONAL): The datasets to be returned. By default this is set to ["*"] which returns, [“time”, “in”, “out”, “check_failures”].
- dataset (OPTIONAL): The datasets to be returned. The datasets available are:
*
: Default. This translates to["time", "in", "out", "check_failures"]
.time
: The DateTime of the inference request.in
: All inputs listed asin_{variable_name}
.out
: All outputs listed asout_variable_name
.check_failures
: Flags whether an Anomaly or Validation Check was triggered.0
indicates no checks were triggers, 1 or greater indicates a check was triggered.meta
:- Returns in the
metadata.elapsed
field:- A list of time in nanoseconds for:
- The time to serialize the input.
- How long each step took.
- A list of time in nanoseconds for:
- Returns in the
metadata.elapsed
:- Returns in the
metadata.elapsed
field:- A list of time in nanoseconds for:
- The time to serialize the input.
- How long each step took.
- A list of time in nanoseconds for:
- Returns in the
metadata.last_model
field:- A dict with each Python step as:
model_name
: The name of the model in the pipeline step.model_sha
: The sha hash of the model in the pipeline step.
- Returns in the
- data_format: If the input is custom JSON, then this parameter must be included as
data_format="custom-json"
. - dataset_exclude (OPTIONAL): Allows users to exclude parts of the dataset.
- dataset_separator (OPTIONAL): Allows other types of dataset separators to be used. If set to “.”, the returned dataset will be flattened.
In this example, an inference of 50K inferences as an Apache Arrow file will be submitted to a model trained for reviewing IMDB reviews, and the first 5 results displayed.
results = imdb_pipeline.infer_from_file('./data/test_data_50K.arrow')
import polars as pl
outputs = pl.from_arrow(results)
display(outputs.head(5))
shape: (5, 4)
time | in.tensor | out.dense_1 | check_failures |
---|---|---|---|
datetime[ms] | list[f32] | list[f32] | i8 |
2023-03-20 20:53:50.170 | [11.0, 6.0, … 0.0] | [0.898019] | 0 |
2023-03-20 20:53:50.170 | [54.0, 548.0, … 20.0] | [0.056597] | 0 |
2023-03-20 20:53:50.170 | [1.0, 9259.0, … 1.0] | [0.92608] | 0 |
2023-03-20 20:53:50.170 | [10.0, 25.0, … 0.0] | [0.926919] | 0 |
2023-03-20 20:53:50.170 | [10.0, 37.0, … 0.0] | [0.661858] | 0 |
In this example, an inference will be submitted to the ccfraud_pipeline
with the file smoke_test.df.json
, a DataFrame formatted JSON file.
result = ccfraud_pipeline.infer_from_file('./data/smoke_test.df.json')
time | in.tensor | out.dense_1 | check_failures | |
---|---|---|---|---|
0 | 2023-02-15 23:07:07.497 | [1.0678324729, 0.2177810266, -1.7115145262, 0.682285721, 1.0138553067, -0.4335000013, 0.7395859437, -0.2882839595, -0.447262688, 0.5146124988, 0.3791316964, 0.5190619748, -0.4904593222, 1.1656456469, -0.9776307444, -0.6322198963, -0.6891477694, 0.1783317857, 0.1397992467, -0.3554220649, 0.4394217877, 1.4588397512, -0.3886829615, 0.4353492889, 1.7420053483, -0.4434654615, -0.1515747891, -0.2668451725, -1.4549617756] | [0.0014974177] | 0 |
Parallel Inferences
Wallaroo pipelines allow for multiple replicas of the pipeline and models to be deployed. This allows for parallel inferences to increase the speed of multiple inferences requests. Wallaroo does so by scaling multiple replicas of the deployed pipeline and models based on the pipeline configuration. See Pipeline Deployment Configuration.
Parallel Inference Use Cases
Parallel inferences are most useful when:
- Inference request inputs are extremely large - for example, greater than 4 GB. Parallen inference requests allow that request to be split into more manageable sizes and submitted in one request, with each segment split as a separate inference request automatically.
- Inference inputs come from different data sources. This allows organizations to query data from different sources, add each query result to the list, then submit the entire list as one request and receive the results fast.
- Image processing, where the entire image is of a extreme size and resolution where submitting the entire image requires large memory and bandwidth. The image can be resolved into separate pieces, then all the pieces submitted in one requests to allow parallelization to examine each individual piece and return the results faster than analyzing the entire large image.
It is highly recommended that the data elements included in the parallel inference List are all of the same data type. For example: all of the elements of the list should be a pandas DataFrame OR all an Apache Arrow table. This makes processing the returned information easier rather than trying to parse what type of data is received.
For example, if the parallel inference input list should be in the format:
Data Type | |
---|---|
0 | DataFrame |
1 | DataFrame |
2 | DataFrame |
3 | DataFrame |
And not:
Data Type | |
---|---|
0 | DataFrame |
1 | Apache Arrow |
2 | DataFrame |
3 | Apache Arrow |
Parallel Inferences Method
The pipeline parallel_infer(tensor_list, timeout, num_parallel, retries)
asynchronous method performs an inference as defined by the pipeline steps and takes the following arguments:
- tensor_list (REQUIRED List): The data submitted to the pipeline for inference as a List of the supported data types:
- pandas.DataFrame: Data submitted as a pandas DataFrame are returned as a pandas DataFrame. For models that output one column based on the models outputs.
- Apache Arrow (Preferred): Data submitted as an Apache Arrow are returned as an Apache Arrow.
- timeout (OPTIONAL int): A timeout in seconds before the inference throws an exception. The default is 15 second per call to accommodate large, complex models. Note that for a batch inference, this is per list item - with 10 inference requests, each would have a default timeout of 15 seconds.
- num_parallel (OPTIONAL int): The number of parallel threads used for the submission. This should be no more than four times the number of pipeline replicas.
- retries (OPTIONAL int): The number of retries per inference request submitted.
parallel_infer
is an asynchronous method that returns the Python callback list of tasks. Calling parallel_infer
should be called with the await
keyword to retrieve the callback results.
For example, the following will split a single pandas DataFrame table into rows, and submit each row as a separate DataFrame table. Once complete, each separate table is submitted via parallel_infer
, and the results collected together as a new List. For this example, there are 4 replicas set in the pipeline deployment configuration.
dataset = []
for index, row in test_data.head(200).iterrows():
dataset.append(row.to_frame('text_input').reset_index())
# we have a list of 200 dataframes - run as in inference
parallel_results = await pipeline.parallel_infer(dataset, timeout=10, num_parallel=8, retries=1)
Parallel Inference Returns
The await pipeline.parallel_infer
method asynchronously returns a List of inference results. This includes how inference requests match the input types: pandas DataFrame inputs return pandas DataFrame, and Apache Arrow inputs return Apache Arrow objects. For example: a parallel inference request with 3 DataFrame tables in the list will return a list with 3 DataFrame tables.
Inference failures are tied to the object in the List that caused the failure. For example, a List with [dataframe1, dataframe2, dataframe3]
where dataframe2
is malformed, then the List returned from await pipeline.parallel_infer
would be [some inference result, error inference result, some inference result]
. Results are returned in the same order of the data submitted.
Output Formats
DataFrame and Arrow
Output formats are based on the input types: pandas DataFrame inputs return pandas DataFrame, and Apache Arrow inputs return Apache Arrow objects.
The default columns returned are:
- time: The DateTime of the inference request.
- in: The input data.
- out: The output data. Outputs of the inference are based on the model’s outputs as
out.{model_output}
. This model only has one output -dense_1
, which is listed in theout.dense_1
column. If the model has multiple outputs, they would be listed asout.{outputname1}
,out.{outputname2}
, etc. - check_failures: Whether any Pipeline validation parameters were triggered.
Columns returned are controlled by the dataset_exclude
array parameter, which specifies which output columns to ignore. For example, if a model outputs the columns out.rambo
, out.main
, out.glibnar
, using the parameter dataset_exclude=["out.rambo", "out.glibnar"]
will exclude those columns from the output.
Custom JSON
When submitting custom JSON as an input, JSON is returned as an output based on the model’s output parameters.
Using Apache Arrow is highly encouraged over custom JSON or pandas DataFrame for the inference speed, lower data transmission, and use specific data types as defined in the Arrow table schemas.
In this example, a pipeline with a Statsmodel model accepts custom JSON inputs and returns JSON as the output.
results = pipeline.infer_from_file('bike_day_eval.json', data_format="custom-json")
display(results)
[{'forecast': [1882.378455403016,
2130.6079157429585,
2340.840053800859,
2895.754978555364,
2163.6575155637433,
1509.1792126514365,
2431.183892393437]}]