Wallaroo JSON Inference Data to DataFrame and Arrow Tutorials
The following guide on using inference data inputs from Wallaroo proprietary JSON to either Pandas DataFrame or Apache Arrow downloaded as part of the Wallaroo Tutorials repository.
Introduction
The following guide is to help users transition from using Wallaroo Proprietary JSON to Pandas DataFrame and Apache Arrow. The latter two formats allow data scientists to work natively with DataFrames, and when ready convert those into Arrow table files which provides greater file size efficiency and overall speed.
Using Pandas DataFrames for inference inputs requires typecasting into the Wallaroo inference engine. For models that are sensitive to data types, Arrow is the preferred format.
This guide will demonstrate the following:
- Convert Wallaroo Proprietary JSON to Pandas DataFrame: Converting from Wallaroo Proprietary JSON to Pandas DataFrame used for inferences in the Wallaroo Engine.
- Convert Wallaroo JSON File to Pandas DataFrame: Converting from Wallaroo JSON files or DataFrame JSON files to Pandas DataFrame for Wallaroo Engine inferences.
- Convert Pandas DataFrame to Arrow Table: Converting from Pandas DataFrame to Apache Arrow used for inferences in the Wallaroo Engine.
- Read Arrow File to DataFrame: How to convert from an Arrow binary file to an Apache Arrow object or DataFrame object.
- Convert Flattened Arrow Table to Multi-Dimensional Pandas DataFrame: How to convert from a one dimensional arrow table to a Multi-Dimensional Pandas DataFrame.
Prerequisites
The demonstration code assumes a Wallaroo instance with Arrow support enabled and provides the following:
ccfraud.onnx
: Sample trained ML Model trained to detect credit card fraud transactions.data/high_fraud.json
: Sample inference input formatted in the Wallaroo proprietary JSON format.
The following demonstrates how to convert Wallaroo Proprietary JSON to Pandas DataFrame. This example data and models are taken from the Wallaroo 101, which uses the CCFraud model examples.
Initialization
Connect to the Wallaroo Instance
The first step is to connect to Wallaroo through the Wallaroo client. The Python library is included in the Wallaroo install and available through the Jupyter Hub interface provided with your Wallaroo environment.
This is accomplished using the wallaroo.Client()
command, which provides a URL to grant the SDK permission to your specific Wallaroo environment. When displayed, enter the URL into a browser and confirm permissions. Store the connection into a variable that can be referenced later.
If logging into the Wallaroo instance through the internal JupyterHub service, use wl = wallaroo.Client()
. For more information on Wallaroo Client settings, see the Client Connection guide.
import wallaroo
from wallaroo.object import EntityNotFoundError
from IPython.display import display
# used to display dataframe information without truncating
from IPython.display import display
import pandas as pd
pd.set_option('display.max_colwidth', None)
# Login through local Wallaroo instance
wl = wallaroo.Client()
workspace_name = 'inferencedataexamplesworkspace'
pipeline_name = 'inferencedataexamplespipeline'
model_name = 'ccfraud'
model_file_name = './ccfraud.onnx'
# Create the workspace
workspace = wl.get_workspace(name=workspace_name, create_if_not_exist=True)
wl.set_current_workspace(workspace)
# upload the model
model = (wl.upload_model(model_name,
model_file_name,
framework=wallaroo.framework.Framework.ONNX
).configure(tensor_fields=["tensor"])
)
# Create the pipeline then deploy it
pipeline = wl.build_pipeline(pipeline_name)
pipeline.add_model_step(model).deploy()
name | inferencedataexamplespipeline |
---|---|
created | 2024-04-16 20:55:40.352813+00:00 |
last_updated | 2024-04-16 20:55:41.255402+00:00 |
deployed | True |
arch | x86 |
accel | none |
tags | |
versions | 0f7fbfca-3fae-431a-b770-21d6fe700eba, d954cb7d-5c98-4d01-9d7b-3ee5a1cd4b25 |
steps | ccfraud |
published | False |
Convert Wallaroo Proprietary JSON to Pandas DataFrame
The following demonstrates how to convert Wallaroo Proprietary JSON to Pandas DataFrame.
Load Libraries
The following libraries are used as part of the conversion process.
import pandas as pd
import pyarrow as pa
import json
import datetime
import numpy as np
Load Wallaroo Data
The Wallaroo data will be saved to a variable. This sample input when run through the trained model as an inference returns a high probability of fraud.
# Start with the single example
high_fraud_data = {
"tensor": [
[1.0678324729342086,
18.155556397512136,
-1.658955105843852,
5.2111788045436445,
2.345247064454334,
10.467083577773014,
5.0925820522419745,
12.829515363712181,
4.953677046849403,
2.3934736228338225,
23.912131817957253,
1.7599568310350207,
0.8561037518143335,
1.1656456468728567,
0.5395988813934498,
0.7784221343010385,
6.75806107274245,
3.927411847659908,
12.462178276650056,
12.307538216518655,
13.787951906620115,
1.4588397511627804,
3.681834686805714,
1.7539143660379741,
8.484355003656184,
14.6454097666836,
26.852377436250144,
2.716529237720336,
3.061195706890285]
]
}
Convert to DataFrame
The Wallaroo proprietary JSON file will now be converted into Pandas DataFrame.
high_fraud_dataframe = pd.DataFrame.from_records(high_fraud_data)
display(high_fraud_dataframe)
tensor | |
---|---|
0 | [1.0678324729342086, 18.155556397512136, -1.658955105843852, 5.2111788045436445, 2.345247064454334, 10.467083577773014, 5.0925820522419745, 12.829515363712181, 4.953677046849403, 2.3934736228338225, 23.912131817957253, 1.7599568310350207, 0.8561037518143335, 1.1656456468728567, 0.5395988813934498, 0.7784221343010385, 6.75806107274245, 3.927411847659908, 12.462178276650056, 12.307538216518655, 13.787951906620115, 1.4588397511627804, 3.681834686805714, 1.7539143660379741, 8.484355003656184, 14.6454097666836, 26.852377436250144, 2.716529237720336, 3.061195706890285] |
DataFrame for Inferences
Once converted, the DataFrame version of the data can be used for inferences in an Arrow enabled Wallaroo instance.
# Use this dataframe to infer
result = pipeline.infer(high_fraud_dataframe)
display(result)
time | in.tensor | out.dense_1 | anomaly.count | |
---|---|---|---|---|
0 | 2024-04-16 20:55:56.747 | [1.0678324729, 18.1555563975, -1.6589551058, 5.2111788045, 2.3452470645, 10.4670835778, 5.0925820522, 12.8295153637, 4.9536770468, 2.3934736228, 23.912131818, 1.759956831, 0.8561037518, 1.1656456469, 0.5395988814, 0.7784221343, 6.7580610727, 3.9274118477, 12.4621782767, 12.3075382165, 13.7879519066, 1.4588397512, 3.6818346868, 1.753914366, 8.4843550037, 14.6454097667, 26.8523774363, 2.7165292377, 3.0611957069] | [0.981199] | 0 |
Pandas JSON to Pandas DataFrame
For JSON data that is in the Pandas DataFrame format, the data can be turned into a Pandas DataFrame object through the same method. Note that the original variable is JSON, which could have come from a file, to a DataFrame object.
high_fraud_dataframe_json = [
{
"tensor":[
1.0678324729,
18.1555563975,
-1.6589551058,
5.2111788045,
2.3452470645,
10.4670835778,
5.0925820522,
12.8295153637,
4.9536770468,
2.3934736228,
23.912131818,
1.759956831,
0.8561037518,
1.1656456469,
0.5395988814,
0.7784221343,
6.7580610727,
3.9274118477,
12.4621782767,
12.3075382165,
13.7879519066,
1.4588397512,
3.6818346868,
1.753914366,
8.4843550037,
14.6454097667,
26.8523774363,
2.7165292377,
3.0611957069
]
}
]
# Infer from the JSON
high_fraud_from_dataframe_json = pd.DataFrame.from_records(high_fraud_dataframe_json)
display(high_fraud_from_dataframe_json)
tensor | |
---|---|
0 | [1.0678324729, 18.1555563975, -1.6589551058, 5.2111788045, 2.3452470645, 10.4670835778, 5.0925820522, 12.8295153637, 4.9536770468, 2.3934736228, 23.912131818, 1.759956831, 0.8561037518, 1.1656456469, 0.5395988814, 0.7784221343, 6.7580610727, 3.9274118477, 12.4621782767, 12.3075382165, 13.7879519066, 1.4588397512, 3.6818346868, 1.753914366, 8.4843550037, 14.6454097667, 26.8523774363, 2.7165292377, 3.0611957069] |
# Use this dataframe to infer
results = pipeline.infer(high_fraud_from_dataframe_json)
display(results)
time | in.tensor | out.dense_1 | anomaly.count | |
---|---|---|---|---|
0 | 2024-04-16 20:55:57.030 | [1.0678324729, 18.1555563975, -1.6589551058, 5.2111788045, 2.3452470645, 10.4670835778, 5.0925820522, 12.8295153637, 4.9536770468, 2.3934736228, 23.912131818, 1.759956831, 0.8561037518, 1.1656456469, 0.5395988814, 0.7784221343, 6.7580610727, 3.9274118477, 12.4621782767, 12.3075382165, 13.7879519066, 1.4588397512, 3.6818346868, 1.753914366, 8.4843550037, 14.6454097667, 26.8523774363, 2.7165292377, 3.0611957069] | [0.981199] | 0 |
Convert Wallaroo JSON File to Pandas DataFrame
When working with files containing Wallaroo JSON data, these can be imported from their original JSON, then converted to a Pandas DataFrame object with the pandas method read_json
.
high_fraud_filename = "./data/high_fraud.json"
high_fraud_data_from_file = pd.read_json(high_fraud_filename, orient="records")
display(high_fraud_data_from_file)
tensor | |
---|---|
0 | [1.067832472934208, 18.155556397512136, -1.658955105843852, 5.211178804543644, 2.345247064454334, 10.467083577773014, 5.092582052241974, 12.829515363712181, 4.953677046849403, 2.393473622833822, 23.912131817957253, 1.75995683103502, 0.8561037518143331, 1.165645646872856, 0.539598881393449, 0.778422134301038, 6.75806107274245, 3.9274118476599083, 12.462178276650056, 12.307538216518655, 13.787951906620115, 1.45883975116278, 3.681834686805714, 1.7539143660379741, 8.484355003656184, 14.6454097666836, 26.852377436250144, 2.7165292377203363, 3.061195706890285] |
The data can be used in an inference either with the infer
method on the DataFrame object, or directly from the file. Note that in either case, the returned object is a DataFrame.
# Use this dataframe to infer
result = pipeline.infer(high_fraud_data_from_file)
display(result)
time | in.tensor | out.dense_1 | anomaly.count | |
---|---|---|---|---|
0 | 2024-04-16 20:55:57.267 | [1.0678324729, 18.1555563975, -1.6589551058, 5.2111788045, 2.3452470645, 10.4670835778, 5.0925820522, 12.8295153637, 4.9536770468, 2.3934736228, 23.912131818, 1.759956831, 0.8561037518, 1.1656456469, 0.5395988814, 0.7784221343, 6.7580610727, 3.9274118477, 12.4621782767, 12.3075382165, 13.7879519066, 1.4588397512, 3.6818346868, 1.753914366, 8.4843550037, 14.6454097667, 26.8523774363, 2.7165292377, 3.0611957069] | [0.981199] | 0 |
# Infer from file - it is read as a Pandas DataFrame object from the DataFrame JSON
result = pipeline.infer_from_file(high_fraud_filename)
display(result)
time | in.tensor | out.dense_1 | anomaly.count | |
---|---|---|---|---|
0 | 2024-04-16 20:55:57.481 | [1.0678324729, 18.1555563975, -1.6589551058, 5.2111788045, 2.3452470645, 10.4670835778, 5.0925820522, 12.8295153637, 4.9536770468, 2.3934736228, 23.912131818, 1.759956831, 0.8561037518, 1.1656456469, 0.5395988814, 0.7784221343, 6.7580610727, 3.9274118477, 12.4621782767, 12.3075382165, 13.7879519066, 1.4588397512, 3.6818346868, 1.753914366, 8.4843550037, 14.6454097667, 26.8523774363, 2.7165292377, 3.0611957069] | [0.981199] | 0 |
Convert Pandas DataFrame to Arrow Table
The helper file convert_wallaroo_data.py
is used to convert from Pandas DataFrame to an Arrow Table with the following caveats:
Arrow requires the user to specify the exact datatypes of the array elements before passing the data to the engine. If you are aware of what data type the model expects, create a dictionary with column names as key and data type as the value and pass it as a param in place of data_type_dict
. If not, the convert_to_pa_dtype
function will try and guess the equivalent pyarrow data type and use it (this may or may not work as intended).
import convert_wallaroo_data
data_type_dict = {"tensor": pa.float32()}
pa_table = convert_wallaroo_data.convert_pandas_to_arrow(high_fraud_dataframe, data_type_dict)
pa_table
pyarrow.Table
tensor: fixed_size_list<item: float>[29]
child 0, item: float
----
tensor: [[[1.0678325,18.155556,-1.6589551,5.211179,2.345247,...,8.484355,14.64541,26.852377,2.7165291,3.0611956]]]
An inference can be done using the arrow table. The following shows the code sample and result. Note that when submitting an Arrow table to infer
, that the returned object is an Arrow table.
# use the arrow table for infer:
result = pipeline.infer(pa_table)
display(result)
pyarrow.Table
time: timestamp[ms]
in.tensor: list<item: float> not null
child 0, item: float
out.dense_1: list<inner: float not null> not null
child 0, inner: float not null
anomaly.count: uint32 not null
----
time: [[2024-04-16 20:55:57.706]]
in.tensor: [[[1.0678325,18.155556,-1.6589551,5.211179,2.345247,...,8.484355,14.64541,26.852377,2.7165291,3.0611956]]]
out.dense_1: [[[0.981199]]]
anomaly.count: [[0]]
Save Arrow Table to Arrow File
The converted Arrow table can be saved using the pyarrow
library.
arrow_file_name = "./data/high_fraud.arrow"
with pa.OSFile(arrow_file_name, 'wb') as sink:
with pa.ipc.new_file(sink, pa_table.schema) as arrow_ipc:
arrow_ipc.write(pa_table)
arrow_ipc.close()
infer_from_file
can be performed using the new arrow
file. Note again that when submitting an inference with an Arrow object, the returning value is an Arrow object.
result = pipeline.infer_from_file(arrow_file_name)
display(result)
pyarrow.Table
time: timestamp[ms]
in.tensor: list<item: float> not null
child 0, item: float
out.dense_1: list<inner: float not null> not null
child 0, inner: float not null
anomaly.count: uint32 not null
----
time: [[2024-04-16 20:55:57.939]]
in.tensor: [[[1.0678325,18.155556,-1.6589551,5.211179,2.345247,...,8.484355,14.64541,26.852377,2.7165291,3.0611956]]]
out.dense_1: [[[0.981199]]]
anomaly.count: [[0]]
Read Arrow File to DataFrame
The data can go the opposite direction - reading from an Arrow binary file, and turning the data into either an Arrow table with the Arrow read_all
method, or just the data into a DataFrame with the Arrow read_pandas
method.
with pa.ipc.open_file(arrow_file_name) as source:
table = source.read_all() # to get pyarrow table
table_df = source.read_pandas() # to get pandas dataframe
display(table)
display(table_df)
pyarrow.Table
tensor: fixed_size_list<item: float>[29]
child 0, item: float
----
tensor: [[[1.0678325,18.155556,-1.6589551,5.211179,2.345247,...,8.484355,14.64541,26.852377,2.7165291,3.0611956]]]
tensor | |
---|---|
0 | [1.0678325, 18.155556, -1.6589551, 5.211179, 2.345247, 10.467084, 5.092582, 12.829515, 4.953677, 2.3934736, 23.912132, 1.7599568, 0.8561038, 1.1656456, 0.5395989, 0.7784221, 6.758061, 3.9274118, 12.462178, 12.307538, 13.787951, 1.4588398, 3.6818347, 1.7539144, 8.484355, 14.64541, 26.852377, 2.7165291, 3.0611956] |
Convert Arrow Infer to DataFrame
When an infer result is returned as an Arrow object, it can be converted to a DataFrame for easy viewing.
result = pipeline.infer_from_file(arrow_file_name)
display(result.to_pandas())
time | in.tensor | out.dense_1 | anomaly.count | |
---|---|---|---|---|
0 | 2024-04-16 20:55:58.285 | [1.0678325, 18.155556, -1.6589551, 5.211179, 2.345247, 10.467084, 5.092582, 12.829515, 4.953677, 2.3934736, 23.912132, 1.7599568, 0.8561038, 1.1656456, 0.5395989, 0.7784221, 6.758061, 3.9274118, 12.462178, 12.307538, 13.787951, 1.4588398, 3.6818347, 1.7539144, 8.484355, 14.64541, 26.852377, 2.7165291, 3.0611956] | [0.981199] | 0 |
Convert Flattened Arrow Table to Multi-Dimensional Pandas DataFrame
Some ML models use multi-dimensional DataFrames. Currently Wallaroo supports and outputs flattened tables for inferences.
For situations where the original data was in a multi-dimensional DataFrame, the following procedure will convert the flattened Arrow table back into a desired multi-dimensional pandas DataFrame.
Here is a sample infer result data in Arrow Table format.
time_array = pa.array([datetime.datetime(2023, 2 , 22, 22, 14)])
in_tensor_array = pa.array([[1.5997242884551583,-0.72885535293112,-0.8464381472712799,-0.48041787186839674,0.8211244519635765,0.999086254697715,-1.365979802921807,0.36611200379560294,1.27093766309002,0.4895466723195178]])
out_array = pa.array([[1.8749652,-0.94025564,-1.0790397,-0.72123086,0.90895796,1.092086,-1.2834015,0.340406,1.2441622,0.57471186]])
check_failures_array = pa.array([0])
names = ["time", "in.tensor", "out.reshape", "check_failures"]
flattened_2d_table = pa.Table.from_arrays([time_array, in_tensor_array, out_array, check_failures_array], names = names)
flattened_2d_table
pyarrow.Table
time: timestamp[us]
in.tensor: list<item: double>
child 0, item: double
out.reshape: list<item: double>
child 0, item: double
check_failures: int64
----
time: [[2023-02-22 22:14:00.000000]]
in.tensor: [[[1.5997242884551583,-0.72885535293112,-0.8464381472712799,-0.48041787186839674,0.8211244519635765,0.999086254697715,-1.365979802921807,0.36611200379560294,1.27093766309002,0.4895466723195178]]]
out.reshape: [[[1.8749652,-0.94025564,-1.0790397,-0.72123086,0.90895796,1.092086,-1.2834015,0.340406,1.2441622,0.57471186]]]
check_failures: [[0]]
flattened_2d_table["out.reshape"]
<pyarrow.lib.ChunkedArray object at 0x176d93680>
[
[
[
1.8749652,
-0.94025564,
-1.0790397,
-0.72123086,
0.90895796,
1.092086,
-1.2834015,
0.340406,
1.2441622,
0.57471186
]
]
]
Verify the Shape
Let’s suppose the shape of the output that natively comes out of the model is [2,5]. We can use that to make sure the shape is correct when translating from the 1 dimensional Arrow table.
tensor_type = {"shape": [2, 5]}
output_df = flattened_2d_table.to_pandas()['out.reshape']
# numpy array, shape [N, 2, 5]
# In this case N = 1
output_list = [elt.reshape(tensor_type['shape']) for elt in output_df]
output_tensor = np.stack(output_list)
output_tensor
array([[[ 1.8749652 , -0.94025564, -1.0790397 , -0.72123086,
0.90895796],
[ 1.092086 , -1.2834015 , 0.340406 , 1.2441622 ,
0.57471186]]])
output_2d_df = pd.DataFrame(output_tensor.tolist())
output_2d_df
0 | 1 | |
---|---|---|
0 | [1.8749652, -0.94025564, -1.0790397, -0.72123086, 0.90895796] | [1.092086, -1.2834015, 0.340406, 1.2441622, 0.57471186] |
Undeploy Pipeline
The pipeline will now be undeployed to return the resources back to the Wallaroo instance.
pipeline.undeploy()
name | inferencedataexamplespipeline |
---|---|
created | 2024-04-16 20:55:40.352813+00:00 |
last_updated | 2024-04-16 20:55:41.255402+00:00 |
deployed | False |
arch | x86 |
accel | none |
tags | |
versions | 0f7fbfca-3fae-431a-b770-21d6fe700eba, d954cb7d-5c98-4d01-9d7b-3ee5a1cd4b25 |
steps | ccfraud |
published | False |