Wallaroo SDK Essentials Guide: Model Uploads and Registrations: Arbitrary Python

How to upload and use Arbitrary Python (BYOP) models with Wallaroo

Arbitrary Python, or BYOP (Bring Your Own Predict), allows organizations to package Python scripts and supporting libraries as their own model. Similar to a Python step, arbitrary Python is an even more robust and flexible tool for working with ML models in Wallaroo pipelines.

| Parameter | Description |
|---|---|
| Web Site | https://www.python.org/ |
| Supported Libraries | python==3.8 |
| Framework | Framework.CUSTOM aka custom |
| Runtime | Containerized aka flight |

Arbitrary Python models, also known as Bring Your Own Predict (BYOP), allow for custom model deployments with supporting scripts and artifacts. These are used with pre-trained models (PyTorch, Tensorflow, etc.) along with whatever supporting artifacts they require. Supporting artifacts can include other Python modules, model files, etc. These are zipped with all scripts, artifacts, and a requirements.txt file that indicates what other Python libraries need to be imported beyond those included in the typical Wallaroo platform.

Contrast this with Wallaroo Python models, aka “Python steps”. These are standalone Python scripts that use the Python libraries natively supported by the Wallaroo platform. These are used for either simple model deployment (such as ARIMA Statsmodels) or data formatting such as postprocessing steps. A Wallaroo Python model is composed of one Python script that matches the Wallaroo requirements.

Arbitrary Python File Requirements

Arbitrary Python (BYOP) models are uploaded to Wallaroo via a ZIP file with the following components:

| Artifact | Type | Description |
|---|---|---|
| Python scripts aka .py files with classes that extend mac.inference.Inference and mac.inference.creation.InferenceBuilder | Python Script | Extend the classes mac.inference.Inference and mac.inference.creation.InferenceBuilder. These are included with the Wallaroo SDK. Further details are in Arbitrary Python Script Requirements. Note that there are no specific naming requirements for the classes that extend mac.inference.Inference and mac.inference.creation.InferenceBuilder - any qualified class name is sufficient as long as these two classes are extended as defined below. |
| requirements.txt | Python requirements file | This sets the Python libraries used for the arbitrary python model. These libraries should be targeted for Python 3.8 compliance. These requirements and the versions of libraries should be exactly the same between creating the model and deploying it in Wallaroo. This ensures that the script and methods will function exactly the same as during the model creation process. |
| Other artifacts | Files | Other models, files, and other artifacts used in support of this model. |

For example, if the arbitrary python model will be known as vgg_clustering, the contents may be in the following structure, with vgg_clustering as the storage directory:

vgg_clustering\
    feature_extractor.h5
    kmeans.pkl
    custom_inference.py
    requirements.txt

Note the inclusion of the custom_inference.py file. This file name is not required - any Python script or scripts that extend the classes listed above are sufficient. This Python script could have been named vgg_custom_model.py or any other name as long as it extends the classes listed above.

The sample arbitrary python model file is created with the command zip -r vgg_clustering.zip vgg_clustering/.

Wallaroo Arbitrary Python uses the Wallaroo SDK mac module, included in the Wallaroo SDK 2023.2.1 and above. See the Wallaroo SDK Install Guides for instructions on installing the Wallaroo SDK.

Arbitrary Python Script Requirements

The entry point of the arbitrary python model is any python script that extends the following classes. These are included with the Wallaroo SDK. The required methods that must be overridden are specified in each section below.

  • mac.inference.Inference interface serves model inferences based on submitted input data. Its purpose is to serve inferences for any supported arbitrary model framework (e.g. scikit-learn, keras, etc.).

    classDiagram
        class Inference {
            <<Abstract>>
            +model Optional[Any]
            +expected_model_types()* Set
            +predict(input_data: InferenceData)*  InferenceData
            -raise_error_if_model_is_not_assigned() None
            -raise_error_if_model_is_wrong_type() None
        }
  • mac.inference.creation.InferenceBuilder builds a concrete Inference, i.e. instantiates an Inference object, loads the appropriate model, and assigns the model to the Inference object.

    classDiagram
        class InferenceBuilder {
            +create(config InferenceConfig) * Inference
            -inference()* Any
        }

mac.inference.Inference

mac.inference.Inference Objects
| Object | Type | Description |
|---|---|---|
| model (Required) | [Any] | One or more objects that match the expected_model_types. This can be an ML Model (for inference use), a string (for data conversion), etc. See Arbitrary Python Examples for examples. |
mac.inference.Inference Methods
| Method | Returns | Description |
|---|---|---|
| expected_model_types (Required) | Set | Returns a Set of the model types expected for the inference as defined by the developer. Typically this is a set of one. Wallaroo checks the expected model types to verify that the model submitted through the InferenceBuilder method matches what this Inference class expects. |
| _predict (input_data: mac.types.InferenceData) (Required) | mac.types.InferenceData | The entry point for the Wallaroo inference, with input and output parameters defined when the model is uploaded. The input InferenceData is a dictionary of numpy arrays derived from the input_schema defined in PyArrow.Schema format when the model is uploaded. The output is a dictionary of numpy arrays as defined by the output_schema, also in PyArrow.Schema format. The InferenceDataValidationError exception is raised when the input data does not match mac.types.InferenceData. |
| raise_error_if_model_is_not_assigned | N/A | Raises an error when a model is not assigned to the Inference. |
| raise_error_if_model_is_wrong_type | N/A | Raises an error when the model does not match the expected_model_types. |

For example, the expected_model_types can be defined for a KMeans model:

from typing import Any, Set

from sklearn.cluster import KMeans

import mac.inference


class SampleClass(mac.inference.Inference):
    @property
    def expected_model_types(self) -> Set[Any]:
        return {KMeans}

mac.inference.creation.InferenceBuilder

InferenceBuilder builds a concrete Inference, i.e. instantiates an Inference object, loads the appropriate model and assigns the model to the Inference.

classDiagram
    class InferenceBuilder {
        +create(config InferenceConfig) * Inference
        -inference()* Any
    }

Each model that is included requires its own InferenceBuilder. The InferenceBuilder loads one model, then submits it to the Inference class when created. The Inference class checks the submitted model's type against its expected_model_types Set.

mac.inference.creation.InferenceBuilder Methods
| Method | Returns | Description |
|---|---|---|
| create(config mac.config.inference.CustomInferenceConfig) (Required) | The custom Inference instance. | Creates an Inference subclass, then assigns a model and attributes. The CustomInferenceConfig is used to retrieve the config.model_path, which is a pathlib.Path object pointing to the folder where the model artifacts are saved. Every artifact loaded must be relative to config.model_path. This is set when the arbitrary python .zip file is uploaded and the environment for running it in Wallaroo is set. For example: the artifact vgg_clustering\feature_extractor.h5 would be loaded with config.model_path / "feature_extractor.h5". The model loaded must match an existing module. For our example, this is from sklearn.cluster import KMeans, and this must match the Inference expected_model_types. |
| inference | Custom Inference instance. | Returns the instantiated custom Inference object created from the create method. |
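
The following is a minimal sketch of an InferenceBuilder, continuing the SampleClass example above. It assumes the KMeans model was pickled as kmeans.pkl inside the uploaded artifact folder, and that SampleClass also defines the model property setter as shown in the full example later in this guide.

import pickle

from mac.config.inference import CustomInferenceConfig
from mac.inference.creation import InferenceBuilder


class SampleClassBuilder(InferenceBuilder):
    @property
    def inference(self) -> SampleClass:
        # the Inference subclass this builder instantiates
        return SampleClass

    def create(self, config: CustomInferenceConfig) -> SampleClass:
        # every artifact is loaded relative to config.model_path
        inference = self.inference()
        with open((config.model_path / "kmeans.pkl").as_posix(), "rb") as fp:
            inference.model = pickle.load(fp)
        return inference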

Arbitrary Python Runtime

Arbitrary Python models always run in the containerized model runtime.
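
After a successful upload, the assigned runtime can be verified with the model configuration's runtime() method; for Arbitrary Python models this always returns flight:

model.config().runtime()

'flight'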

Upload Arbitrary Python Model

Arbitrary Python models are uploaded to Wallaroo through the Wallaroo Client upload_model method.

Upload Arbitrary Python Model Parameters

The following parameters are required for Arbitrary Python models. Note that while some fields are considered optional for the upload_model method, they are required for proper uploading of an Arbitrary Python model to Wallaroo.

| Parameter | Type | Description |
|---|---|---|
| name | string (Required) | The name of the model. Model names are unique per workspace. Models that are uploaded with the same name are assigned as a new version of the model. |
| path | string (Required) | The path to the model file being uploaded. |
| framework | string (Required) | Set as Framework.CUSTOM. |
| input_schema | pyarrow.lib.Schema (Required) | The input schema in Apache Arrow schema format. |
| output_schema | pyarrow.lib.Schema (Required) | The output schema in Apache Arrow schema format. |
| convert_wait | bool (Optional) (Default: True) | True: Waits in the script for the model conversion completion. False: Proceeds with the script without waiting for the model conversion process to display complete. |
| arch | wallaroo.engine_config.Architecture | The architecture the model is deployed to. If a model is intended for deployment to an ARM architecture, it must be specified during this step. Values include: X86 (Default): x86 based architectures. ARM: ARM based architectures. |

Once the upload process starts, the model is containerized by the Wallaroo instance. This process may take up to 10 minutes.
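
The following is a sketch of an upload call that also sets the optional arch parameter. The model name, file path, and schema variables here are placeholders for illustration only.

from wallaroo.framework import Framework
from wallaroo.engine_config import Architecture

model = wl.upload_model(
    'sample-byop-model',            # hypothetical model name
    './sample_byop_model.zip',      # hypothetical path to the BYOP ZIP file
    framework=Framework.CUSTOM,
    input_schema=input_schema,      # pyarrow.lib.Schema defined elsewhere
    output_schema=output_schema,    # pyarrow.lib.Schema defined elsewhere
    arch=Architecture.X86,          # use Architecture.ARM for ARM nodepools
    convert_wait=True
)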

Model Config Options

Model version configurations are updated with the wallaroo.model_version.config method and include the following parameters. Parameters are optional unless otherwise specified.

| Parameter | Type | Description |
|---|---|---|
| runtime | String (Optional) | The model runtime from wallaroo.framework. |
| tensor_fields | List[string] (Optional) | A list of alternate input fields. For example, if the model accepts the input fields ['variable1', 'variable2'], tensor_fields allows those inputs to be overridden to ['square_feet', 'house_age'], or other values as required. |
| input_schema | pyarrow.lib.Schema | The input schema for the model in pyarrow.lib.Schema format. |
| output_schema | pyarrow.lib.Schema | The output schema for the model in pyarrow.lib.Schema format. |
| batch_config | List[string] (Optional) | Batch config is either None for multiple-input inferences, or single to accept an inference request with only one row of data. |
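
As a sketch only, and assuming the configuration parameters above are passed as keyword arguments to the model version's configure call, overriding the model's input field names might look like the following. The field names are illustrative.

# hypothetical usage: override the model's input field names after upload
model = model.configure(tensor_fields=['square_feet', 'house_age'])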

Upload Arbitrary Python Model Return

The following is returned with a successful model upload and conversion.

| Field | Type | Description |
|---|---|---|
| name | string | The name of the model. |
| version | string | The model version as a unique UUID. |
| file_name | string | The file name of the model as stored in Wallaroo. |
| image_path | string | The image used to deploy the model in the Wallaroo engine. |
| last_update_time | DateTime | When the model was last updated. |

Arbitrary Python Examples

The following are examples of use cases for BYOP models.

Upload Arbitrary Python Model Example

The following example is of uploading an Arbitrary Python VGG16 Clustering ML Model to a Wallaroo instance.

Arbitrary Python Script Example

The following is an example script that fulfills the requirements for a Wallaroo Arbitrary Python Model, and would be saved as custom_inference.py.

"""This module features an example implementation of a custom Inference and its
corresponding InferenceBuilder."""

import pathlib
import pickle
from typing import Any, Set

import tensorflow as tf
from mac.config.inference import CustomInferenceConfig
from mac.inference import Inference
from mac.inference.creation import InferenceBuilder
from mac.types import InferenceData
from sklearn.cluster import KMeans


class ImageClustering(Inference):
    """Inference class for image clustering, that uses
    a pre-trained VGG16 model on cifar10 as a feature extractor
    and performs clustering on a trained KMeans model.

    Attributes:
        - feature_extractor: The embedding model we will use
        as a feature extractor (i.e. a trained VGG16).
        - expected_model_types: A set of model instance types that are expected by this inference.
        - model: The model on which the inference is calculated.
    """

    def __init__(self, feature_extractor: tf.keras.Model):
        self.feature_extractor = feature_extractor
        super().__init__()

    @property
    def expected_model_types(self) -> Set[Any]:
        return {KMeans}

    @Inference.model.setter  # type: ignore
    def model(self, model) -> None:
        """Sets the model on which the inference is calculated.

        :param model: A model instance on which the inference is calculated.

        :raises TypeError: If the model is not an instance of expected_model_types
            (i.e. KMeans).
        """
        self._raise_error_if_model_is_wrong_type(model) # this will make sure an error will be raised if the model is of wrong type
        self._model = model

    def _predict(self, input_data: InferenceData) -> InferenceData:
        """Calculates the inference on the given input data.
        This is the core function that each subclass needs to implement
        in order to calculate the inference.

        :param input_data: The input data on which the inference is calculated.
        It is of type InferenceData, meaning it comes as a dictionary of numpy
        arrays.

        :raises InferenceDataValidationError: If the input data is not valid.
        Ideally, every subclass should raise this error if the input data is not valid.

        :return: The output of the model, that is a dictionary of numpy arrays.
        """

        # input_data maps to the input_schema we have defined
        # with PyArrow, coming as a dictionary of numpy arrays
        inputs = input_data["images"]

        # Forward inputs to the models
        embeddings = self.feature_extractor(inputs)
        predictions = self.model.predict(embeddings.numpy())

        # Return predictions as dictionary of numpy arrays
        return {"predictions": predictions}


class ImageClusteringBuilder(InferenceBuilder):
    """InferenceBuilder subclass for ImageClustering, that loads
    a pre-trained VGG16 model on cifar10 as a feature extractor
    and a trained KMeans model, and creates an ImageClustering object."""

    @property
    def inference(self) -> ImageClustering:
        return ImageClustering

    def create(self, config: CustomInferenceConfig) -> ImageClustering:
        """Creates an Inference subclass and assigns a model and additionally
        needed attributes to it.

        :param config: Custom inference configuration. In particular, we're
        interested in `config.model_path` that is a pathlib.Path object
        pointing to the folder where the model artifacts are saved.
        Every artifact we need to load from this folder has to be
        relative to `config.model_path`.

        :return: A custom Inference instance.
        """
        feature_extractor = self._load_feature_extractor(
            config.model_path / "feature_extractor.h5"
        )
        inference = self.inference(feature_extractor)
        model = self._load_model(config.model_path / "kmeans.pkl")
        inference.model = model

        return inference

    def _load_feature_extractor(
        self, file_path: pathlib.Path
    ) -> tf.keras.Model:
        return tf.keras.models.load_model(file_path)

    def _load_model(self, file_path: pathlib.Path) -> KMeans:
        with open(file_path.as_posix(), "rb") as fp:
            model = pickle.load(fp)
        return model

The following is the requirements.txt file that would be included in the arbitrary python ZIP file. It is highly recommended that the requirements.txt file pin the same libraries and versions that were used to create the model.

tensorflow==2.8.0
scikit-learn==1.2.2

Upload Arbitrary Python Example

The following example demonstrates uploading the arbitrary python model as vgg_clustering.zip with the following input and output schemas defined.

input_schema = pa.schema([
    pa.field('images', pa.list_(
        pa.list_(
            pa.list_(
                pa.int64(),
                list_size=3
            ),
            list_size=32
        ),
        list_size=32
    )),
])

output_schema = pa.schema([
    pa.field('predictions', pa.int64()),
])

model = wl.upload_model(
    'vgg16-clustering',
    'vgg16_clustering.zip',
    framework=Framework.CUSTOM,
    input_schema=input_schema,
    output_schema=output_schema,
    convert_wait=True
)

Waiting for model loading - this will take up to 10.0min.
Model is pending loading to a container runtime..
Model is attempting loading to a container runtime.......................successful

Ready
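
With the model uploaded, a pipeline can be built and an inference run against it. The following is a minimal sketch assuming a hypothetical pipeline name and randomly generated image data matching the 32x32x3 input_schema above; a deployment configuration may also be supplied, as described in Pipeline Deployment Configurations below.

import numpy as np
import pandas as pd

# hypothetical pipeline name for this sketch
pipeline = wl.build_pipeline('vgg16-clustering-pipeline')
pipeline.add_model_step(model)
pipeline.deploy()

# one 32x32x3 image as nested lists, matching the input_schema defined above
image = np.random.randint(0, 256, size=(32, 32, 3)).tolist()
input_df = pd.DataFrame({"images": [image]})

results = pipeline.infer(input_df)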

Data Shaping

Pre and post processing of data is typically performed with Python models, which use the default Wallaroo Python libraries to shape data for models.

When Python libraries are required for data shaping that are not part of the standard included Wallaroo Python libraries, BYOP models are used to import those additional libraries.

The following example uses the requirements.txt file below to add additional libraries for image conversion. In this example, there is no ML model included in the BYOP model; the ImageResize class extends mac.inference.Inference to perform the data conversion.

tensorflow==2.8.0
pillow>=10.0.0

The following code accepts data from either a pandas DataFrame or Apache Arrow table where the data is in the data column, and reformats that data into the image column.

"""This module features an example implementation of a custom Inference and its
corresponding InferenceBuilder."""

import pathlib
import pickle
from typing import Any, Set
import base64
import numpy as np
from PIL import Image
import logging

from mac.config.inference import CustomInferenceConfig
from mac.inference import Inference
from mac.inference.creation import InferenceBuilder
from mac.types import InferenceData

class ImageResize(Inference):
    """Inference class for image resizing.
    """

    def __init__(self):
        self.model = "conversion-sample"
        super().__init__()

    @property
    def expected_model_types(self) -> Set[Any]:
        return {str}

    @Inference.model.setter  # type: ignore
    def model(self, model) -> None:
        # Hazard: this has to be here because the ABC has the getter
        self._model = "conversion-sample"

    def _predict(self, input_data: InferenceData) -> InferenceData:
        # input_data maps to the input_schema we have defined
        # with PyArrow, coming as a dictionary of numpy arrays
        img = input_data["data"]
        logging.debug(f"In Python  {type(img)}")

        
        res = {"image": img} # sets the `image` field to the incoming data's ['data'] field.
        logging.debug(f"Returning results")
        return res

class ImageResizeBuilder(InferenceBuilder):
    """InferenceBuilder subclass for ImageResize."""

    @property
    def inference(self) -> ImageResize:
        return ImageResize

    def create(self, config: CustomInferenceConfig) -> ImageResize:
        """Creates an Inference subclass and assigns a model and additionally
        needed attributes to it.

        :param config: Custom inference configuration. In particular, we're
        interested in `config.model_path` that is a pathlib.Path object
        pointing to the folder where the model artifacts are saved.
        Every artifact we need to load from this folder has to be
        relative to `config.model_path`.

        :return: A custom Inference instance.
        """
        x = self.inference()
        x.model = "conversion-sample"
        return x

The BYOP model is uploaded to Wallaroo using framework=wallaroo.framework.Framework.CUSTOM as a parameter in the upload_model() method and added to a pipeline as a pipeline step. Here the BYOP model formats the data before submitting it to the actual computer vision model.

# for the BYOP data reshaper model
input_schema = pa.schema([pa.field("data", pa.list_(pa.float32(), list_size=921600))])
output_schema = pa.schema([pa.field("image", pa.list_(pa.float32(), list_size=921600))])

resize = wl.upload_model("resize", "./resize-arrow.zip", framework=wallaroo.framework.Framework.CUSTOM,
                         input_schema = input_schema, output_schema = output_schema, convert_wait=True)

# for the CV model

input_schema = pa.schema([pa.field("data", pa.list_(pa.float32(), list_size=921600))])
output_schema = pa.schema([pa.field("image", pa.list_(pa.float32(), list_size=921600))])

model = wl.upload_model('mobilenet', "./model/mobilenet.pt.onnx", 
                        framework = wallaroo.framework.Framework.ONNX)

# set the engine config
dc = wallaroo.DeploymentConfigBuilder() \
    .cpus(4)\
    .memory("4Gi")\
    .build()

pipeline = wl.build_pipeline('resize-pipeline')
pipeline.add_model_step(resize)
pipeline.add_model_step(model)

# deploy the pipeline
pipeline.deploy(deployment_config = dc)
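
Once deployed, an inference submitted to the pipeline passes through the resize step first, then the mobilenet model. The following sketch assumes a single flattened image of 921600 float values in the data column, matching the input schema above.

import numpy as np
import pandas as pd

# random stand-in for a flattened image (921600 float32 values)
flat_image = np.random.rand(921600).astype(np.float32).tolist()
input_df = pd.DataFrame({"data": [flat_image]})

results = pipeline.infer(input_df)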

Pipeline Deployment Configurations

Pipeline deployments allocate resources from the cluster to the pipeline and its models with the wallaroo.pipeline.deploy(deployment_config: Optional[wallaroo.deployment_config.DeploymentConfig]) method. The wallaroo.deployment_config.DeploymentConfig.DeploymentConfigBuilder class creates DeploymentConfig settings such as the number of CPUs, the amount of RAM, the architecture, etc. For full details, see the Pipeline deployment configurations guides.

The settings for a pipeline configuration are dependent on whether the model is converted to the Native Runtime space, or Containerized Model Runtime space during the model upload process. The method wallaroo.model_config.runtime() displays which runtime the uploaded model was converted to.

| Runtime | Type | Pipeline Deployment Details |
|---|---|---|
| onnx | Wallaroo Native | See Native Runtime Configuration Methods |
| flight | Wallaroo Container | See Containerized Runtime Configuration Methods |

Wallaroo Native Runtime Deployment

Wallaroo Native Runtime models typically use the following settings for pipeline resource allocation. See Native Runtime Configuration Methods for complete options.

| Resource | Method | Description |
|---|---|---|
| Replicas | wallaroo.deployment_config.DeploymentConfigBuilder.replica_count(count: int) | The number of replicas of the Wallaroo Native pipeline resources to allocate. Each replica has the same number of cpus, ram, etc. For example: DeploymentConfigBuilder.replica_count(2) |
| Auto-allocated replicas | wallaroo.deployment_config.DeploymentConfigBuilder.replica_autoscale_min_max(maximum: int, minimum: int = 0) | Replicas that will auto-allocate more replicas to the pipeline from 0 to the set maximum as more inference requests are made. |
| CPU | wallaroo.deployment_config.DeploymentConfigBuilder.cpus(core_count: float) | Fractional number of cpus to allocate. For example: DeploymentConfigBuilder.cpus(0.5) |
| Memory | wallaroo.deployment_config.DeploymentConfigBuilder.memory(memory_spec: string) | Memory resources in Kubernetes Memory resource units. |
| GPUs | wallaroo.deployment_config.DeploymentConfigBuilder.gpus(core_count: int) | Number of GPUs to deploy; GPUs can only be deployed in whole increments. If used, must be paired with the deployment_label pipeline configuration option. |
| Deployment Label | wallaroo.deployment_config.DeploymentConfigBuilder.deployment_label(label: string) | Required if gpus are set; must match the GPU nodepool label. |

The following example shows deploying a Native Wallaroo Runtime model with the pipeline configuration of one replica, half a cpu and 1 Gi of RAM.

Note that for native runtime models, total pipeline resources are shared by all the native runtime models for each replica.

model.config().runtime()

'onnx'

# add the model as a pipeline step
pipeline.add_model_step(model)

# DeploymentConfigBuilder is used to create the pipeline's deployment configuration object
from wallaroo.deployment_config import DeploymentConfigBuilder

# deploy using native runtime deployment
deployment_config_native = DeploymentConfigBuilder() \
    .replica_count(1) \
    .cpus(0.5) \
    .memory('1Gi') \
    .build()

# deploy the pipeline with the pipeline configuration
pipeline.deploy(deployment_config=deployment_config_native)
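
Where autoscaling or GPUs are needed, the same builder methods from the table above apply. The following is a sketch only; the deployment label is a hypothetical placeholder and must match the GPU nodepool label in your cluster.

# sketch: autoscale from 0 to 3 replicas and allocate one GPU per replica
deployment_config_autoscale = DeploymentConfigBuilder() \
    .replica_autoscale_min_max(maximum=3, minimum=0) \
    .cpus(1.0) \
    .memory('2Gi') \
    .gpus(1) \
    .deployment_label('doc-gpu-label:true') \
    .build()

pipeline.deploy(deployment_config=deployment_config_autoscale)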

Wallaroo Containerized Runtime Deployment

Wallaroo Containerized Runtime models typically use the following settings for pipeline resource allocation. See Containerized Runtime Configuration Methods for complete options.

Containerized Runtime models resources are allocated with the sidekick name, with the containerized model specified for resources.

| Resource | Method | Description |
|---|---|---|
| Replicas | wallaroo.deployment_config.DeploymentConfigBuilder.replica_count(count: int) | The number of replicas of the pipeline resources to allocate. Each replica has the same number of cpus, ram, etc. |
| Auto-allocated replicas | wallaroo.deployment_config.DeploymentConfigBuilder.replica_autoscale_min_max(maximum: int, minimum: int = 0) | Replicas that will auto-allocate more replicas to the pipeline from 0 to the set maximum as more inference requests are made. |
| CPU | wallaroo.deployment_config.DeploymentConfigBuilder.sidekick_cpus(model: wallaroo.model.Model, core_count: float) | Fractional number of cpus to allocate for the containerized model. |
| Memory | wallaroo.deployment_config.DeploymentConfigBuilder.sidekick_memory(model: wallaroo.model.Model, memory_spec: string) | Memory resources in Kubernetes Memory resource units. |
| GPUs | wallaroo.deployment_config.DeploymentConfigBuilder.sidekick_gpus(model: wallaroo.model.Model, core_count: int) | Number of GPUs to deploy; GPUs can only be deployed in whole increments. If used, must be paired with the deployment_label pipeline configuration option. |
| Deployment Label | wallaroo.deployment_config.DeploymentConfigBuilder.deployment_label(label: string) | Required if gpus are set; must match the GPU nodepool label. |

The following example shows deploying a Containerized Wallaroo Runtime model with the pipeline configuration of one replica, half a cpu and 1 Gi of RAM.

Note that for containerized models, each containerized model’s resources are set independently of each other and duplicated for each pipeline replica, and are considered separate from the native runtime models.

model_native.config().runtime()

'onnx'

model_containerized.config().runtime()

'flight'

# add the models as a pipeline steps
pipeline.add_model_step(model_native)
pipeline.add_model_step(model_containerized)


# DeploymentConfigBuilder is used to create the pipeline's deployment configuration object
from wallaroo.deployment_config import DeploymentConfigBuilder

# deploy using containerized runtime deployment
# cpus and memory are shared by the native runtime models;
# sidekick_cpus and sidekick_memory are allocated solely for the containerized model
deployment_config_containerized = DeploymentConfigBuilder() \
    .replica_count(1) \
    .cpus(0.5) \
    .memory('1Gi') \
    .sidekick_cpus(model_containerized, 0.5) \
    .sidekick_memory(model_containerized, '1Gi') \
    .build()

# deploy the pipeline with the pipeline configuration
pipeline.deploy(deployment_config=deployment_config_containerized)

Pipeline Deployment Timeouts

Pipeline deployments typically take 45 seconds for Wallaroo Native Runtimes, and 90 seconds for Wallaroo Containerized Runtimes.

If Wallaroo Pipeline deployment times out from a very large or complex ML model being deployed, the timeout is extended with the wallaroo.Client.Client(request_timeout: int) setting, where request_timeout is in integer seconds. Wallaroo Native Runtime deployments are scaled at 1x the request_timeout setting. Wallaroo Containerized Runtimes are scaled at 2x the request_timeout setting.

The following example shows extending the request_timeout to 2 minutes.

wl = wallaroo.Client(request_timeout=120)

wl.timeout

120