Wallaroo SDK Essentials Guide: Model Uploads and Registrations: Arbitrary Python

How to upload and use Arbitrary Python (BYOP) models with Wallaroo

Arbitrary Python or BYOP (Bring Your Own Predict) allows organizations to use Python scripts and supporting libraries as their own model. Similar to using a Python step, arbitrary python is an even more robust and flexible tool for working with ML Models in Wallaroo pipelines.

| Parameter | Description |
|---|---|
| Web Site | https://www.python.org/ |
| Supported Libraries | python==3.8 |
| Framework | Framework.CUSTOM aka custom |

Arbitrary Python models, also known as Bring Your Own Predict (BYOP), allow for custom model inference methods with supporting scripts and artifacts. These are used with pre-trained models (PyTorch, TensorFlow, etc.) along with their supporting artifacts such as other Python modules, scripts, model files, etc.

Contrast this with Wallaroo Python models, aka "Python steps", which are standalone Python scripts that use the Python libraries. These are commonly used for data formatting such as the pre- and post-processing steps, and are also appropriate for simple models (such as ARIMA Statsmodels). A Wallaroo Python model can be composed of one or more Python scripts that match the Wallaroo requirements.

Arbitrary Python File Requirements

Arbitrary Python (BYOP) models are uploaded to Wallaroo via a ZIP file with the following components:

| Artifact | Type | Description |
|---|---|---|
| Python scripts aka .py files with classes that extend mac.inference.Inference and mac.inference.creation.InferenceBuilder | Python Script | Extend the classes mac.inference.Inference and mac.inference.creation.InferenceBuilder. These are included with the Wallaroo SDK. Further details are in Arbitrary Python Script Requirements. Note that there are no naming requirements for the classes that extend mac.inference.Inference and mac.inference.creation.InferenceBuilder: any qualified class name is sufficient as long as these two classes are extended as defined below. |
| requirements.txt | Python requirements file | This sets the Python libraries used for the arbitrary python model. These libraries should be targeted for Python 3.8 compliance. These requirements and the versions of libraries should be exactly the same between creating the model and deploying it in Wallaroo. This ensures that the script and methods will function exactly the same as during the model creation process. |
| Other artifacts | Files | Other models, files, and other artifacts used in support of this model. |

For example, if the arbitrary python model will be known as vgg_clustering, the contents may be in the following structure, with vgg_clustering as the storage directory:

vgg_clustering\
    feature_extractor.h5
    kmeans.pkl
    custom_inference.py
    requirements.txt

Note the inclusion of the custom_inference.py file. This file name is not required - any Python script or scripts that extend the classes listed above are sufficient. This Python script could have been named vgg_custom_model.py or any other name as long as it includes the extension of the classes listed above.

The sample arbitrary python model file is created with the command zip -r vgg_clustering.zip vgg_clustering/.

Wallaroo Arbitrary Python uses the Wallaroo SDK mac module, included in the Wallaroo SDK 2023.2.1 and above. See the Wallaroo SDK Install Guides for instructions on installing the Wallaroo SDK.

Arbitrary Python Script Requirements

The entry point of the arbitrary python model is any python script that extends the following classes. These are included with the Wallaroo SDK. The required methods that must be overridden are specified in each section below.

  • The mac.inference.Inference interface serves model inferences based on submitted input. Its purpose is to serve inferences for any supported arbitrary model framework (e.g. scikit, keras etc.).

    classDiagram
        class Inference {
            <<Abstract>>
            +model Optional[Any]
            +expected_model_types()* Set
            +predict(input_data: InferenceData)*  InferenceData
            -raise_error_if_model_is_not_assigned() None
            -raise_error_if_model_is_wrong_type() None
        }
  • mac.inference.creation.InferenceBuilder builds a concrete Inference, i.e. instantiates an Inference object, loads the appropriate model, and assigns the model to the Inference object.

    classDiagram
        class InferenceBuilder {
            +create(config InferenceConfig) * Inference
            -inference()* Any
        }

mac.inference.Inference

mac.inference.Inference Objects

| Object | Type | Description |
|---|---|---|
| model (Required) | [Any] | One or more objects that match the expected_model_types. This can be an ML Model (for inference use), a string (for data conversion), etc. See Arbitrary Python Examples for examples. |

mac.inference.Inference Methods

| Method | Returns | Description |
|---|---|---|
| expected_model_types (Required) | Set | Returns a Set of models expected for the inference as defined by the developer. Typically this is a set of one. Wallaroo checks the expected model types to verify that the model submitted through the InferenceBuilder method matches what this Inference class expects. |
| _predict(input_data: mac.types.InferenceData) (Required) | mac.types.InferenceData | The entry point for the Wallaroo inference. The input and output parameters are defined when the model is uploaded. Input (mac.types.InferenceData): a Dictionary of numpy arrays derived from the input_schema specified when the model is uploaded, defined in PyArrow.Schema format. Output (mac.types.InferenceData): a Dictionary of numpy arrays as defined by the output parameters, defined in PyArrow.Schema format. The InferenceDataValidationError exception is raised when the input data does not match mac.types.InferenceData. |
| raise_error_if_model_is_not_assigned | N/A | Raises an error when a model is not assigned to the Inference. |
| raise_error_if_model_is_wrong_type | N/A | Raises an error when the model does not match the expected_model_types. |

For example, the expected_model_types can be defined for a KMeans model as follows.

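from typing import Any, Set

import mac.inference
from sklearn.cluster import KMeans


class SampleClass(mac.inference.Inference):
    @property
    def expected_model_types(self) -> Set[Any]:
        # Only KMeans instances may be assigned as this Inference's model.
        return {KMeans}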

mac.inference.creation.InferenceBuilder

InferenceBuilder builds a concrete Inference, i.e. instantiates an Inference object, loads the appropriate model and assigns the model to the Inference.

classDiagram
    class InferenceBuilder {
        +create(config InferenceConfig) * Inference
        -inference()* Any
    }

Each model that is included requires its own InferenceBuilder. The InferenceBuilder loads one model, then assigns it to the Inference object it creates. The Inference class checks the type of that model against its expected_model_types Set.
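
The relationship between the builder, the Inference object, and the type check can be sketched without the mac module. The classes below are illustrative stand-ins, not the Wallaroo implementation: SketchInference, FakeKMeans, ClusteringInference and ClusteringBuilder are hypothetical names, and the real base classes may perform the check differently.

```python
from abc import ABC, abstractmethod
from typing import Any, Optional, Set


class SketchInference(ABC):
    """Stand-in for mac.inference.Inference (illustration only)."""

    def __init__(self) -> None:
        self._model: Optional[Any] = None

    @property
    @abstractmethod
    def expected_model_types(self) -> Set[Any]:
        """Types that an assigned model is allowed to have."""

    @property
    def model(self) -> Optional[Any]:
        return self._model

    @model.setter
    def model(self, model: Any) -> None:
        # Reject anything that is not an instance of an expected type.
        if not isinstance(model, tuple(self.expected_model_types)):
            raise TypeError(f"unexpected model type: {type(model).__name__}")
        self._model = model


class FakeKMeans:
    """Hypothetical placeholder for a trained sklearn KMeans model."""


class ClusteringInference(SketchInference):
    @property
    def expected_model_types(self) -> Set[Any]:
        return {FakeKMeans}


class ClusteringBuilder:
    """Stand-in for an InferenceBuilder subclass: loads one model and assigns it."""

    def create(self) -> ClusteringInference:
        inference = ClusteringInference()
        # A real builder would load the model from config.model_path here.
        inference.model = FakeKMeans()
        return inference
```

In this sketch, assigning a string to a ClusteringInference raises TypeError, which mirrors the role that raise_error_if_model_is_wrong_type plays in the real class.
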

mac.inference.creation.InferenceBuilder Methods

| Method | Returns | Description |
|---|---|---|
| create(config: mac.config.inference.CustomInferenceConfig) (Required) | The custom Inference instance. | Creates an Inference subclass, then assigns a model and attributes. The CustomInferenceConfig is used to retrieve config.model_path, which is a pathlib.Path object pointing to the folder where the model artifacts are saved. Every artifact loaded must be relative to config.model_path. This is set when the arbitrary python .zip file is uploaded and the environment for running it in Wallaroo is set. For example, the artifact vgg_clustering/feature_extractor.h5 is loaded with config.model_path / "feature_extractor.h5". The model loaded must match an existing module. For our example, this is from sklearn.cluster import KMeans, and this must match the Inference expected_model_types. |
| inference | The custom Inference instance. | Returns the instantiated custom Inference object created from the create method. |
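
As a small illustration of resolving artifacts relative to config.model_path, the sketch below uses a SimpleNamespace as a hypothetical stand-in for CustomInferenceConfig; only the model_path attribute is modeled, and the folder path is an assumed example value.

```python
import pathlib
from types import SimpleNamespace

# Hypothetical stand-in for CustomInferenceConfig; the path is an example value.
config = SimpleNamespace(model_path=pathlib.Path("/models/vgg_clustering"))

# pathlib's "/" operator joins path segments, so every artifact path
# stays relative to the folder that Wallaroo unpacked.
feature_extractor_path = config.model_path / "feature_extractor.h5"
kmeans_path = config.model_path / "kmeans.pkl"

print(feature_extractor_path.name)
print(kmeans_path.name)
```
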

Arbitrary Python Runtime

Arbitrary Python models always run in the containerized model runtime.

Arbitrary Python Inputs

Arbitrary Python inputs are defined during model upload in Apache Arrow Schema format with the following conditions:

  • By default, data inputs are optional unless they are specified with nullable=False.
  • The arbitrary Python code must be aware of the optional and required fields and how to manage those inputs.
  • Specific Data Types conditions:
    • Scalar: Scalar values can be Null.
    • Lists: Lists must either be empty [] or an array of Null values, for example [None], but cannot be passed as Null outside of an array.
  • By default, columns with only the None or Null value are inferred by PyArrow as a NullArray, which is an array with all values of Null. In these situations, the schema must be specified.

Arbitrary Python Inputs Example

The following code sample demonstrates managing optional inputs.

The arbitrary Python code has three inputs:

  • input_1: A required List of floats.
  • input_2: An optional List of floats.
  • multiply_factor: An optional scalar integer.

The following demonstrates setting the input and output schemas when uploading the sample code to Wallaroo.

import wallaroo
import pyarrow as pa
input_schema = pa.schema([
    pa.field('input_1', pa.list_(pa.float32()), nullable=False), # fields are optional by default unless `nullable` is set to `False`
    pa.field('input_2', pa.list_(pa.float32())),
    pa.field('multiply_factor', pa.int32()),
])

output_schema = pa.schema([
    pa.field('output', pa.list_(pa.float32())),
])

The following demonstrates different valid inputs based on the input schemas. These fields are submitted either as a pandas DataFrame or an Apache Arrow table when submitted for inference requests.

Note that each time the data is translated to an Apache Arrow table, the input schema is specified so the accurate data types are assigned to each column, even when the column values are Null or None.

The following input has all fields and values translated into an Apache Arrow table, then submitted as an inference request to a pipeline with our sample BYOP model.

input_1 = [[1., 2.], [3., 4.]]
input_2 = [[5., 6.], [7., 8.]]
multiply_factor = [2, 3]
arrow_table = pa.table({"input_1": input_1, "input_2": input_2, "multiply_factor": multiply_factor}, schema=input_schema)
display(arrow_table)

pipeline.infer(arrow_table)

pyarrow.Table
time: timestamp[ms]
in.input_1: list<item: float> not null
  child 0, item: float
in.input_2: list<item: float> not null
  child 0, item: float
in.multiply_factor: int32 not null
out.output: list<item: double> not null
  child 0, item: double
anomaly.count: uint32 not null
----
time: [[2024-04-30 09:12:01.445,2024-04-30 09:12:01.445]]
in.input_1: [[[1,2],[3,4]]]
in.input_2: [[[5,6],[7,8]]]
in.multiply_factor: [[2,3]]
out.output: [[[12,16],[30,36]]]
anomaly.count: [[0,0]]

In the following example input_2 has two empty lists, stored into a pandas DataFrame and submitted for the inference request.

import pandas as pd

dataframe = pd.DataFrame({'input_1': [[1., 2.], [3., 4.]], 'input_2': [[], []], 'multiply_factor': [2, 3]})
display(dataframe)

      input_1 input_2  multiply_factor
0  [1.0, 2.0]      []                2
1  [3.0, 4.0]      []                3

For the following example, input_2 contains empty lists and multiply_factor is set to None. This is stored in an Apache Arrow table for the inference request.

input_1 = [[1., 2.], [3., 4.]]
input_2 = [[], []]
multiply_factor = [None, None]
arrow_table = pa.table({"input_1": input_1, "input_2": input_2, "multiply_factor": multiply_factor}, schema=input_schema)
display(arrow_table)

pyarrow.Table
input_1: list<item: float> not null
  child 0, item: float
input_2: list<item: float>
  child 0, item: float
multiply_factor: int32
----
input_1: [[[1,2],[3,4]]]
input_2: [[[],[]]]
multiply_factor: [[null,null]]

pipeline.infer(arrow_table)

pyarrow.Table
time: timestamp[ms]
in.input_1: list<item: float> not null
  child 0, item: float
in.input_2: list<item: float> not null
  child 0, item: float
in.multiply_factor: int32 not null
out.output: list<item: double> not null
  child 0, item: double
anomaly.count: uint32 not null
----
time: [[2024-04-30 09:07:42.467,2024-04-30 09:07:42.467]]
in.input_1: [[[1,2],[3,4]]]
in.input_2: [[[],[]]]
in.multiply_factor: [[null,null]]
out.output: [[[1,2],[3,4]]]
anomaly.count: [[0,0]]
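
The sample outputs above are consistent with a model that adds input_2 to input_1 when input_2 is not empty, then multiplies by multiply_factor when it is not Null. The sketch below is inferred from those outputs only; it is not the sample BYOP code, and combine_optional_inputs is a hypothetical helper showing how the body of a _predict method might guard optional fields. It handles only the two cases shown here (empty lists and Null scalars) and assumes all output rows have the same length.

```python
import numpy as np


def combine_optional_inputs(input_data):
    """Add input_2 to input_1 when present, then scale by multiply_factor when set."""
    input_1 = input_data["input_1"]              # required field
    input_2 = input_data.get("input_2")          # optional list column
    factors = input_data.get("multiply_factor")  # optional scalar column

    rows = []
    for i in range(len(input_1)):
        row = np.asarray(input_1[i], dtype=np.float64)

        extra = None if input_2 is None else input_2[i]
        if extra is not None and len(extra) > 0:  # skip empty lists
            row = row + np.asarray(extra, dtype=np.float64)

        factor = None if factors is None else factors[i]
        if factor is not None and not np.isnan(factor):  # skip Null values
            row = row * factor

        rows.append(row)

    return {"output": np.array(rows)}
```

Inside a real Inference subclass, the same checks would sit in _predict, with input_data arriving as the dictionary of numpy arrays described earlier.
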

Upload Arbitrary Python Model

Arbitrary Python models are uploaded to Wallaroo through the Wallaroo Client upload_model method.

Upload Arbitrary Python Model Parameters

The following parameters are required for Arbitrary Python models. Note that while some fields are optional for the upload_model method, they are required for properly uploading an Arbitrary Python model to Wallaroo.

| Parameter | Type | Description |
|---|---|---|
| name | string (Required) | The name of the model. Model names are unique per workspace. Models that are uploaded with the same name are assigned as a new version of the model. |
| path | string (Required) | The path to the model file being uploaded. |
| framework | string (Required) | Set as Framework.CUSTOM. |
| input_schema | pyarrow.lib.Schema (Required) | The input schema in Apache Arrow schema format. |
| output_schema | pyarrow.lib.Schema (Required) | The output schema in Apache Arrow schema format. |
| convert_wait | bool (Optional) (Default: True) | True: Waits in the script for the model conversion completion. False: Proceeds with the script without waiting for the model conversion process to display complete. |
| arch | wallaroo.engine_config.Architecture | The architecture the model is deployed to. If a model is intended for deployment to an ARM architecture, it must be specified during this step. Values include: X86 (Default): x86 based architectures. ARM: ARM based architectures. |

Once the upload process starts, the model is containerized by the Wallaroo instance. This process may take up to 10 minutes.

Upload Arbitrary Python Model Return

upload_model returns a wallaroo.model_version.ModelVersion object with the following fields.

| Field | Type | Description |
|---|---|---|
| name | String | The name of the model. |
| version | String | The model version as a unique UUID. |
| file_name | String | The file name of the model as stored in Wallaroo. |
| SHA | String | The hash value of the model file. |
| Status | String | The status of the model. |
| image_path | String | The image used to deploy the model in the Wallaroo engine. |
| last_update_time | DateTime | When the model was last updated. |

Arbitrary Python Examples

The following are examples of use cases for BYOP models.

Upload Arbitrary Python Model Example

The following example uploads an Arbitrary Python VGG16 Clustering ML Model to a Wallaroo instance.

Arbitrary Python Script Example

The following is an example script that fulfills the requirements for a Wallaroo Arbitrary Python Model, and would be saved as custom_inference.py.

"""This module features an example implementation of a custom Inference and its
corresponding InferenceBuilder."""

import pathlib
import pickle
from typing import Any, Set

import tensorflow as tf
from mac.config.inference import CustomInferenceConfig
from mac.inference import Inference
from mac.inference.creation import InferenceBuilder
from mac.types import InferenceData
from sklearn.cluster import KMeans


class ImageClustering(Inference):
    """Inference class for image clustering, that uses
    a pre-trained VGG16 model on cifar10 as a feature extractor
    and performs clustering on a trained KMeans model.

    Attributes:
        - feature_extractor: The embedding model we will use
        as a feature extractor (i.e. a trained VGG16).
        - expected_model_types: A set of model instance types that are expected by this inference.
        - model: The model on which the inference is calculated.
    """

    def __init__(self, feature_extractor: tf.keras.Model):
        self.feature_extractor = feature_extractor
        super().__init__()

    @property
    def expected_model_types(self) -> Set[Any]:
        return {KMeans}

    @Inference.model.setter  # type: ignore
    def model(self, model) -> None:
        """Sets the model on which the inference is calculated.

        :param model: A model instance on which the inference is calculated.

        :raises TypeError: If the model is not an instance of expected_model_types
            (i.e. KMeans).
        """
        self._raise_error_if_model_is_wrong_type(model) # this will make sure an error will be raised if the model is of wrong type
        self._model = model

    def _predict(self, input_data: InferenceData) -> InferenceData:
        """Calculates the inference on the given input data.
        This is the core function that each subclass needs to implement
        in order to calculate the inference.

        :param input_data: The input data on which the inference is calculated.
        It is of type InferenceData, meaning it comes as a dictionary of numpy
        arrays.

        :raises InferenceDataValidationError: If the input data is not valid.
        Ideally, every subclass should raise this error if the input data is not valid.

        :return: The output of the model, that is a dictionary of numpy arrays.
        """

        # input_data maps to the input_schema we have defined
        # with PyArrow, coming as a dictionary of numpy arrays
        inputs = input_data["images"]

        # Forward inputs to the models
        embeddings = self.feature_extractor(inputs)
        predictions = self.model.predict(embeddings.numpy())

        # Return predictions as dictionary of numpy arrays
        return {"predictions": predictions}


class ImageClusteringBuilder(InferenceBuilder):
    """InferenceBuilder subclass for ImageClustering, that loads
    a pre-trained VGG16 model on cifar10 as a feature extractor
    and a trained KMeans model, and creates an ImageClustering object."""

    @property
    def inference(self) -> ImageClustering:
        return ImageClustering

    def create(self, config: CustomInferenceConfig) -> ImageClustering:
        """Creates an Inference subclass and assigns a model and additionally
        needed attributes to it.

        :param config: Custom inference configuration. In particular, we're
        interested in `config.model_path` that is a pathlib.Path object
        pointing to the folder where the model artifacts are saved.
        Every artifact we need to load from this folder has to be
        relative to `config.model_path`.

        :return: A custom Inference instance.
        """
        feature_extractor = self._load_feature_extractor(
            config.model_path / "feature_extractor.h5"
        )
        inference = self.inference(feature_extractor)
        model = self._load_model(config.model_path / "kmeans.pkl")
        inference.model = model

        return inference

    def _load_feature_extractor(
        self, file_path: pathlib.Path
    ) -> tf.keras.Model:
        return tf.keras.models.load_model(file_path)

    def _load_model(self, file_path: pathlib.Path) -> KMeans:
        with open(file_path.as_posix(), "rb") as fp:
            model = pickle.load(fp)
        return model

The following is the requirements.txt file that would be included in the arbitrary python ZIP file. It is highly recommended to pin the same libraries and versions that were used to create the model.

tensorflow==2.8.0
scikit-learn==1.2.2

Upload Arbitrary Python Example

The following example demonstrates uploading the arbitrary python model as vgg_clustering.zip with the following input and output schemas defined.

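import pyarrow as pa
import wallaroo
from wallaroo.framework import Framework

# Connect to the Wallaroo instance; connection arguments depend on the installation.
wl = wallaroo.Client()

input_schema = pa.schema([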
    pa.field('images', pa.list_(
        pa.list_(
            pa.list_(
                pa.int64(),
                list_size=3
            ),
            list_size=32
        ),
        list_size=32
    )),
])

output_schema = pa.schema([
    pa.field('predictions', pa.int64()),
])

model = wl.upload_model(
    'vgg16-clustering',
    'vgg_clustering.zip',  # the ZIP file created earlier with zip -r
    framework=Framework.CUSTOM,
    input_schema=input_schema,
    output_schema=output_schema,
    convert_wait=True
)

Waiting for model loading - this will take up to 10.0min.
Model is pending loading to a container runtime..
Model is attempting loading to a container runtime.......................successful

Ready

Data Shaping

Pre and post processing of data is typically performed with Python models, which use the default Wallaroo Python libraries to shape data for models.

When Python libraries are required for data shaping that are not part of the standard included Wallaroo Python libraries, BYOP models are used to import those additional libraries.

The following example uses the requirements.txt file below to add additional libraries for image conversion. In this example, there is no ML Model that is part of the BYOP model. The ImageResize class extends mac.inference.Inference to perform the data conversion.

tensorflow==2.8.0
pillow>=10.0.0

The following code accepts data from either a pandas DataFrame or an Apache Arrow table where the data is in the data column, and reformats that data to be in the image column.

"""This module features an example implementation of a custom Inference and its
corresponding InferenceBuilder."""

import pathlib
import pickle
from typing import Any, Set
import base64
import numpy as np
from PIL import Image
import logging

from mac.config.inference import CustomInferenceConfig
from mac.inference import Inference
from mac.inference.creation import InferenceBuilder
from mac.types import InferenceData

class ImageResize(Inference):
    """Inference class for image resizing.
    """

    def __init__(self):
        self.model = "conversion-sample"
        super().__init__()

    @property
    def expected_model_types(self) -> Set[Any]:
        return {str}

    @Inference.model.setter  # type: ignore
    def model(self, model) -> None:
        # Hazard: this has to be here because the ABC has the getter
        self._model = "conversion-sample"

    def _predict(self, input_data: InferenceData) -> InferenceData:
        # input_data maps to the input_schema we have defined
        # with PyArrow, coming as a dictionary of numpy arrays
        img = input_data["data"]
        logging.debug(f"In Python  {type(img)}")

        
        res = {"image": img} # sets the `image` field to the incoming data's ['data'] field.
        logging.debug(f"Returning results")
        return res

class ImageResizeBuilder(InferenceBuilder):
    """InferenceBuilder subclass for ImageResize."""

    @property
    def inference(self) -> ImageResize:
        return ImageResize

    def create(self, config: CustomInferenceConfig) -> ImageResize:
        """Creates an Inference subclass and assigns a model and additionally
        needed attributes to it.

        :param config: Custom inference configuration. In particular, we're
        interested in `config.model_path` that is a pathlib.Path object
        pointing to the folder where the model artifacts are saved.
        Every artifact we need to load from this folder has to be
        relative to `config.model_path`.

        :return: A custom Inference instance.
        """
        x = self.inference()
        x.model = "conversion-sample"
        return x

The BYOP model is uploaded to Wallaroo using framework=wallaroo.framework.Framework.CUSTOM as a parameter in the upload_model() method, then added to a pipeline as a pipeline step. Here the BYOP model formats the data before submitting it to the actual computer vision model.

# for the BYOP data reshaper model
input_schema = pa.schema([pa.field("data", pa.list_(pa.float32(), list_size=921600))])
output_schema = pa.schema([pa.field("image", pa.list_(pa.float32(), list_size=921600))])

resize = wl.upload_model("resize", "./resize-arrow.zip", framework=wallaroo.framework.Framework.CUSTOM,
                         input_schema = input_schema, output_schema = output_schema, convert_wait=True)

# for the CV model

input_schema = pa.schema([pa.field("data", pa.list_(pa.float32(), list_size=921600))])
output_schema = pa.schema([pa.field("image", pa.list_(pa.float32(), list_size=921600))])

model = wl.upload_model('mobilenet', "./model/mobilenet.pt.onnx",
                        framework = wallaroo.framework.Framework.ONNX)

# set the engine config
dc = wallaroo.DeploymentConfigBuilder() \
    .cpus(4)\
    .memory("4Gi")\
    .build()

pipeline = wl.build_pipeline('resize-pipeline')
pipeline.add_model_step(resize)
pipeline.add_model_step(model)

# deploy the pipeline
pipeline.deploy(deployment_config = dc)