wallaroo.pipeline_config


class ValidDataType(builtins.str, enum.Enum):

An enumeration.

f32 = <ValidDataType.f32: 'f32'>
f64 = <ValidDataType.f64: 'f64'>
i8 = <ValidDataType.i8: 'i8'>
u8 = <ValidDataType.u8: 'u8'>
i16 = <ValidDataType.i16: 'i16'>
u16 = <ValidDataType.u16: 'u16'>
i32 = <ValidDataType.i32: 'i32'>
u32 = <ValidDataType.u32: 'u32'>
i64 = <ValidDataType.i64: 'i64'>
u64 = <ValidDataType.u64: 'u64'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class ModelConfigsForStep:
ModelConfigsForStep(model_configs: List[wallaroo.model_config.ModelConfig])
class ModelForStep:
ModelForStep(name, version, sha)
def to_json(self):
@classmethod
def from_json(cls, json_dict: Dict[str, str]):
@classmethod
def from_model(cls, model: wallaroo.model_version.ModelVersion):
class ModelWeight:
ModelWeight(weight: float, model: wallaroo.pipeline_config.ModelForStep)
def to_json(self):
@classmethod
def from_json(cls, json_dict: Dict[str, Any]):
@classmethod
def from_tuple(cls, tup: Tuple[float, wallaroo.model_version.ModelVersion]):
class RowToModel:
RowToModel(row_index: int, model: wallaroo.pipeline_config.ModelForStep)
def to_json(self):
@classmethod
def from_json(cls, json_dict: Dict[str, Any]):
class Step:
Step()
def to_json(self):
def is_inference_step(self):
def shortname(self):

A short name to represent this Step in Jupyter tables.

@staticmethod
def from_json(json_dict: Dict):
class Steps(typing.List[wallaroo.pipeline_config.Step]):

Built-in mutable sequence.

If no argument is given, the constructor creates a new empty list. The argument must be an iterable if specified.

Inherited Members
builtins.list
list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort
class Average(Step):
Average()
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
class AuditResults(Step):
AuditResults(start: int, end: Union[int, NoneType] = None)
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
def shortname(self):

A short name to represent this Step in Jupyter tables.

Inherited Members
class Check(Step):
Check(validations: List[str])
def to_json(self):
@classmethod
def from_validation_dict(cls, validations: Dict[str, polars.expr.expr.Expr]):
@staticmethod
def from_json(json_dict: Dict):
class ColumnsSelect(Step):
ColumnsSelect(columns: List[int])
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
def shortname(self):

A short name to represent this Step in Jupyter tables.

Inherited Members
class ColumnsToRows(Step):
ColumnsToRows()
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
class InputDataToType(Step):
InputDataToType(data_type: wallaroo.pipeline_config.ValidDataType)
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
def shortname(self):

A short name to represent this Step in Jupyter tables.

Inherited Members
class ModelInference(Step):
ModelInference(models: List[wallaroo.pipeline_config.ModelForStep])
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
def is_inference_step(self):
def shortname(self):

A short name to represent this Step in Jupyter tables.

class RowsToModels(Step):
RowsToModels(rows_to_models: List[wallaroo.pipeline_config.RowToModel])
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
def is_inference_step(self):
Inherited Members
class Nth(Step):
Nth(index: int)
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
def shortname(self):

A short name to represent this Step in Jupyter tables.

Inherited Members
class MultiOut(Step):
MultiOut()
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
class MetaValueSplit(Step):
MetaValueSplit( split_key: str, control: wallaroo.pipeline_config.ModelForStep, routes: Dict[str, wallaroo.pipeline_config.ModelForStep])
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
def is_inference_step(self):
def shortname(self):

A short name to represent this Step in Jupyter tables.

class RandomSplit(Step):
RandomSplit( weights: List[wallaroo.pipeline_config.ModelWeight], hash_key: Union[str, NoneType] = None)
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
def is_inference_step(self):
Inherited Members
class PipelineConfig:
PipelineConfig(pipeline_name: str, steps: Iterable[wallaroo.pipeline_config.Step])
@classmethod
def from_json(Klass, json):
def to_json(self):
def to_yaml(self):
class PipelineConfigBuilder:
PipelineConfigBuilder( client: Union[wallaroo.client.Client, NoneType], pipeline_name: str, standalone=False)
def upload(self) -> wallaroo.pipeline.Pipeline:
def remove_step(self, index: int):

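Remove a step at a given index.

def add_model_step( self, model: wallaroo.model_version.ModelVersion) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Perform inference with a single model.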

def replace_with_model_step( self, index: int, model: wallaroo.model_version.ModelVersion) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Replaces the step at the given index with a model step

def add_multi_model_step( self, models: Iterable[wallaroo.model_version.ModelVersion]) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Perform inference on the same input data for any number of models.

def replace_with_multi_model_step( self, index: int, models: Iterable[wallaroo.model_version.ModelVersion]) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Replaces the step at the index with a multi model step

def add_audit(self, audit_slice: str) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Run audit logging on a specified slice of model outputs.

The slice must be in Python-like slice format: `start:`, `start:end`, and `:end` are supported.

def replace_with_audit( self, index: int, audit_slice: str) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Replaces the step at the index with an audit step

def add_select(self, index: int) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Select only the model output with the given index from an array of outputs.

def add_multi_out(self):
def replace_with_select( self, step_index: int, select_index: int) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Replaces the step at the index with a select step

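def add_key_split( self, default: wallaroo.model_version.ModelVersion, meta_key: str, options: Dict[str, wallaroo.model_version.ModelVersion]) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Split traffic based on the value at a given meta_key in the input data, routing to the appropriate model.

If the resulting value is a key in options, the corresponding model is used. Otherwise, the default model is used for inference.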

def replace_with_key_split( self, index: int, default: wallaroo.model_version.ModelVersion, meta_key: str, options: Dict[str, wallaroo.model_version.ModelVersion]) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Replace the step at the index with a key split step

def add_random_split( self, weighted: Iterable[Tuple[float, wallaroo.model_version.ModelVersion]], hash_key: Union[str, NoneType] = None) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Routes inputs to a single model, randomly chosen from the list of weighted options.

Each model receives inputs that are approximately proportional to the weight it is assigned. For example, with two models having weights 1 and 1, each will receive roughly equal amounts of inference inputs. If the weights were changed to 1 and 2, the models would receive roughly 33% and 66% respectively instead.

When choosing the model to use, a random number between 0.0 and 1.0 is generated. The weighted inputs are mapped to that range, and the random input is then used to select the model to use. For example, for the two-models equal-weight case, a random key of 0.4 would route to the first model. 0.6 would route to the second.

To support consistent assignment to a model, a hash_key can be specified. The value stored at this key in the input data must be between 0.0 and 1.0; when present, it is used instead of a random number for model selection.

def replace_with_random_split( self, index: int, weighted: Iterable[Tuple[float, wallaroo.model_version.ModelVersion]], hash_key: Union[str, NoneType] = None) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Replace the step at the index with a random split step

def add_shadow_deploy( self, champion: wallaroo.model_version.ModelVersion, challengers: Iterable[wallaroo.model_version.ModelVersion]) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Create a "shadow deployment" experiment pipeline. The champion model and all challengers are run for each input. The result data for all models is logged, but the output of the champion is the only result returned.

This is particularly useful for "burn-in" testing a new model with real-world data without displacing the currently proven model.

This is currently implemented as three steps: a multi model step, an audit step, and a select step. To remove or replace this step, you need to remove or replace all three. You can remove steps using pipeline.remove_step.

def replace_with_shadow_deploy( self, index: int, champion: wallaroo.model_version.ModelVersion, challengers: Iterable[wallaroo.model_version.ModelVersion]) -> wallaroo.pipeline_config.PipelineConfigBuilder:
def add_validations( self, **validations: polars.expr.expr.Expr) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Add a dict of validations to run on every row.

def replace_with_validations( self, index: int, **validations: polars.expr.expr.Expr) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Replace the step at the given index with a different validation dict

def clear(self) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Remove all steps from the pipeline. This might be desirable if replacing models, for example.