wallaroo.pipeline


def update_timestamp(f):
class Pipeline(wallaroo.object.Object):

A pipeline is an execution context for models. Pipelines contain Steps, which are often Models. Pipelines can be deployed or un-deployed.

Pipeline(client: wallaroo.client.Client, data: Dict[str, Any])

Base constructor.

Each object requires:

  • a GraphQL client - in order to fill its missing members dynamically
  • an initial data blob - typically from unserialized JSON, contains at
  • least the data for required members (typically the object's primary key) and optionally other data members.
def id(self) -> int:
def owner_id(*args, **kwargs):
def create_time(*args, **kwargs):
def last_update_time(*args, **kwargs):
def name(*args, **kwargs):
def versions(*args, **kwargs):
def tags(*args, **kwargs):
def workspace(*args, **kwargs):
def get_pipeline_configuration(self, version: Optional[str] = None) -> Dict[str, Any]:

Get a pipeline configuration for a specific version.

Parameters
  • version: str Version of the pipeline. :return Dict[str, Any] Pipeline configuration.
def logs( self, limit: Optional[int] = None, start_datetime: Optional[datetime.datetime] = None, end_datetime: Optional[datetime.datetime] = None, valid: Optional[bool] = None, dataset: Optional[List[str]] = None, dataset_exclude: Optional[List[str]] = None, dataset_separator: Optional[str] = None, arrow: Optional[bool] = False) -> Union[pyarrow.lib.Table, pandas.core.frame.DataFrame]:

Get inference logs for this pipeline.

Parameters
  • limit: Optional[int]: Maximum number of logs to return.
  • start_datetime: Optional[datetime.datetime]: Start time for logs.
  • end_datetime: Optional[datetime.datetime]: End time for logs.
  • valid: Optional[bool]: If set to False, will include logs for failed inferences
  • dataset: Optional[List[str]] By default this is set to ["*"] which returns, ["time", "in", "out", "anomaly"]. Other available options - ["metadata"]
  • dataset_exclude: Optional[List[str]] If set, allows user to exclude parts of dataset.
  • dataset_separator: Optional[Union[Sequence[str], str]] If set to ".", return dataset will be flattened.
  • arrow: Optional[bool] If set to True, return logs as an Arrow Table. Else, returns Pandas DataFrame.
Returns

Union[pa.Table, pd.DataFrame]

def export_logs( self, directory: Optional[str] = None, file_prefix: Optional[str] = None, data_size_limit: Optional[str] = None, limit: Optional[int] = None, start_datetime: Optional[datetime.datetime] = None, end_datetime: Optional[datetime.datetime] = None, valid: Optional[bool] = None, dataset: Optional[List[str]] = None, dataset_exclude: Optional[List[str]] = None, dataset_separator: Optional[str] = None, arrow: Optional[bool] = False) -> None:

Export logs to a user provided local file.

Parameters
  • directory: Optional[str] Logs will be exported to a file in the given directory. By default, logs will be exported to new "logs" subdirectory in current working directory.
  • file_prefix: Optional[str] Prefix to name the exported file. By default, the file_prefix will be set to the pipeline name.
  • data_size_limit: Optional[str] The maximum size of the exported data in bytes. Size includes all files within the provided directory. By default, the data_size_limit will be set to 100MB.
  • limit: Optional[int] The maximum number of logs to return.
  • start_datetime: Optional[datetime.datetime] The start time to filter logs by.
  • end_datetime: Optional[datetime.datetime] The end time to filter logs by.
  • valid: Optional[bool] If set to False, will return logs for failed inferences.
  • dataset: Optional[List[str]] By default this is set to ["*"] which returns, ["time", "in", "out", "anomaly"]. Other available options - ["metadata"]
  • dataset_exclude: Optional[List[str]] If set, allows user to exclude parts of dataset.
  • dataset_separator: Optional[Union[Sequence[str], str]] If set to ".", return dataset will be flattened.
  • arrow: Optional[bool] If set to True, return logs as an Arrow Table. Else, returns Pandas DataFrame. :return None
def url(self) -> str:

Returns the inference URL for this pipeline.

def deploy( self, pipeline_name: Optional[str] = None, deployment_config: Optional[wallaroo.deployment_config.DeploymentConfig] = None, wait_for_status: Optional[bool] = True) -> Optional[wallaroo.pipeline.Pipeline]:

Deploy pipeline. pipeline_name is optional if deploy was called previously. When specified, pipeline_name must be ASCII alpha-numeric characters, plus dash (-) only.

Parameters
  • pipeline_name: Optional[str] Name of the pipeline to deploy.
  • deployment_config: Optional[DeploymentConfig] Deployment configuration.
  • wait_for_status: If set to False, will not wait for deployment status. If set to True, will wait for deployment status to be running or encountered an error. Default is True.
Returns

Pipeline

def definition(self) -> str:

Get the current definition of the pipeline as a string

def get_topic_name(self) -> str:
def undeploy(self) -> wallaroo.pipeline.Pipeline:
def infer(self, *args, **kwargs):

Inferences are performed on deployed pipelines. A pipeline processes data sequentially through a series of steps, where each step's output becomes the input for the next step. The final output represents the result of the entire pipeline's processing.

Parameters
  • tensor: Union[Dict[str, Any], pd.DataFrame, pa.Table] The data submitted to the pipeline for inference.
  • timeout: Optional[Union[int, float]] infer requests will time out after the amount of seconds provided are exceeded. timeout defaults to 15 secs.
  • dataset: Optional[List[str]] By default this is set to ["*"] which returns, ["time", "in", "out", "anomaly"]. Other available options - ["metadata"]
  • dataset_exclude: Optional[List[str]] If set, allows user to exclude parts of dataset.
  • dataset_separator: Optional[Union[Sequence[str], str]] If set to ".", return dataset will be flattened.
Returns

DataFrame or Arrow format.

def infer_from_file(self, *args, **kwargs):

This method is used to run inference on a deployment using a file. The file can be in one of the following formats: pandas.DataFrame: .arrow, .json which contains data either in the pandas.records format or wallaroo custom json format.

Parameters
  • filename: Union[str, pathlib.Path]. The file to be sent to run inference on.
  • data_format: Optional[str]. The format of the data in the file. If not provided, the format will be inferred from the file extension.
  • timeout: Optional[Union[int, float]] infer requests will time out after the amount of seconds provided are exceeded. timeout defaults to 15 secs.
  • dataset: Optional[List[str]] By default this is set to ["*"] which returns, ["time", "in", "out", "anomaly"]. Other available options - ["metadata"]
  • dataset_exclude: Optional[List[str]] If set, allows user to exclude parts of dataset.
  • dataset_separator: Optional[str] If set to ".", returned dataset will be flattened.
Returns

Inference result in the form of pd.DataFrame, pa.Table, dict or list.

async def async_infer( self, tensor: Union[Dict[str, Any], pandas.core.frame.DataFrame, pyarrow.lib.Table], async_client: httpx.AsyncClient, timeout: Union[int, float, NoneType] = None, retries: Optional[int] = None, dataset: Optional[List[str]] = None, dataset_exclude: Optional[List[str]] = None, dataset_separator: Optional[str] = None):

Runs an async inference and returns an inference result on this deployment, given a tensor.

Parameters
  • tensor: Union[Dict[str, Any], pd.DataFrame, pa.Table] Inference data.
  • async_client: AsyncClient Async client to use for async inference.
  • timeout: Optional[Union[int, float]] infer requests will time out after the amount of seconds provided are exceeded. timeout defaults to 15 secs.
  • retries: Optional[int] Number of retries to use in case of Connection errors.
  • job_id: Optional[int] Job id to use for async inference.
  • dataset: Optional[List[str]] By default this is set to ["*"] which returns, ["time", "in", "out", "anomaly"]. Other available options - ["metadata"]
  • dataset_exclude: Optional[List[str]] If set, allows user to exclude parts of dataset.
  • dataset_separator: Optional[Union[Sequence[str], str]] If set to ".", return dataset will be flattened.
async def parallel_infer( self, tensor: Union[pandas.core.frame.DataFrame, pyarrow.lib.Table], batch_size: Optional[int] = 1, timeout: Union[int, float, NoneType] = None, num_parallel: Optional[int] = None, retries: Optional[int] = None, dataset: Optional[List[str]] = None, dataset_exclude: Optional[List[str]] = None, dataset_separator: Optional[str] = None):

Runs parallel inferences and returns a list of inference results on latest deployment.

Parameters
  • tensor: Union[pd.DataFrame, pa.Table] Inference data.
  • batch_size: Optional[int] Number of examples per batch.
  • timeout: Optional[Union[int, float]] infer requests will time out after the amount of seconds provided are exceeded. timeout defaults to 15 secs.
  • num_parallel: Optional[int] Semaphore to use for async inference.
  • retries: Optional[int] Number of retries to use in case of Connection errors.
  • dataset: Optional[List[str]] By default this is set to ["*"] which returns, ["time", "in", "out", "anomaly"]. Other available options - ["metadata"]
  • dataset_exclude: Optional[List[str]] If set, allows user to exclude parts of dataset.
  • dataset_separator: Optional[Union[Sequence[str], str]] If set to ".", return dataset will be flattened.
Returns

Union[pd.DataFrame, pa.Table] Inference results

def status(self) -> Dict[str, Any]:

Status of pipeline

def steps(self) -> List[wallaroo.pipeline_config.Step]:

Returns a list of the steps of a pipeline. Not exactly a shim

def model_configs(self) -> List[wallaroo.model_config.ModelConfig]:

Returns a list of the model configs of a pipeline. Not exactly a shim

def remove_step(self, index: int) -> wallaroo.pipeline.Pipeline:

Remove a step at a given index

def add_model_step( self, model_version: wallaroo.model_version.ModelVersion) -> wallaroo.pipeline.Pipeline:

Perform inference with a single model.

def replace_with_model_step( self, index: int, model_version: wallaroo.model_version.ModelVersion) -> wallaroo.pipeline.Pipeline:

Replaces the step at the given index with a model step

def add_multi_model_step( self, model_version_list: Iterable[wallaroo.model_version.ModelVersion]) -> wallaroo.pipeline.Pipeline:

Perform inference on the same input data for any number of models.

def replace_with_multi_model_step( self, index: int, model_version_list: Iterable[wallaroo.model_version.ModelVersion]) -> wallaroo.pipeline.Pipeline:

Replaces the step at the index with a multi model step

def add_audit(self, slice) -> wallaroo.pipeline.Pipeline:

Run audit logging on a specified slice of model outputs.

The slice must be in python-like format. start:, start:end, and :end are supported.

def replace_with_audit(self, index: int, audit_slice: str) -> wallaroo.pipeline.Pipeline:

Replaces the step at the index with an audit step

def add_select(self, index: int) -> wallaroo.pipeline.Pipeline:

Select only the model output with the given index from an array of outputs.

def replace_with_select(self, step_index: int, select_index: int) -> wallaroo.pipeline.Pipeline:

Replaces the step at the index with a select step

def add_key_split( self, default: wallaroo.model_version.ModelVersion, meta_key: str, options: Dict[str, wallaroo.model_version.ModelVersion]) -> wallaroo.pipeline.Pipeline:

Split traffic based on the value at a given meta_key in the input data, routing to the appropriate model.

If the resulting value is a key in options, the corresponding model is used. Otherwise, the default model is used for inference.

def replace_with_key_split( self, index: int, default: wallaroo.model_version.ModelVersion, meta_key: str, options: Dict[str, wallaroo.model_version.ModelVersion]) -> wallaroo.pipeline.Pipeline:

Replace the step at the index with a key split step

def add_random_split( self, weighted: Iterable[Tuple[float, wallaroo.model_version.ModelVersion]], hash_key: Optional[str] = None) -> wallaroo.pipeline.Pipeline:

Routes inputs to a single model, randomly chosen from the list of weighted options.

Each model receives inputs that are approximately proportional to the weight it is assigned. For example, with two models having weights 1 and 1, each will receive roughly equal amounts of inference inputs. If the weights were changed to 1 and 2, the models would receive roughly 33% and 66% respectively instead.

When choosing the model to use, a random number between 0.0 and 1.0 is generated. The weighted inputs are mapped to that range, and the random input is then used to select the model to use. For example, for the two-models equal-weight case, a random key of 0.4 would route to the first model. 0.6 would route to the second.

To support consistent assignment to a model, a hash_key can be specified. This must be between 0.0 and 1.0. The value at this key, when present in the input data, will be used instead of a random number for model selection.

def replace_with_random_split( self, index: int, weighted: Iterable[Tuple[float, wallaroo.model_version.ModelVersion]], hash_key: Optional[str] = None) -> wallaroo.pipeline.Pipeline:

Replace the step at the index with a random split step

def add_shadow_deploy( self, champion: wallaroo.model_version.ModelVersion, challengers: Iterable[wallaroo.model_version.ModelVersion]) -> wallaroo.pipeline.Pipeline:

Create a "shadow deployment" experiment pipeline. The champion model and all challengers are run for each input. The result data for all models is logged, but the output of the champion is the only result returned.

This is particularly useful for "burn-in" testing a new model with real world data without displacing the currently proven model.

This is currently implemented as three steps: A multi model step, an audit step, and a select step. To remove or replace this step, you need to remove or replace all three. You can remove steps using pipeline.remove_step

def replace_with_shadow_deploy( self, index: int, champion: wallaroo.model_version.ModelVersion, challengers: Iterable[wallaroo.model_version.ModelVersion]) -> wallaroo.pipeline.Pipeline:

Replace a given step with a shadow deployment

def add_validations(self, **validations: polars.expr.expr.Expr) -> wallaroo.pipeline.Pipeline:

Add a dict of validations to run on every row.

def clear(self) -> wallaroo.pipeline.Pipeline:

Remove all steps from the pipeline. This might be desireable if replacing models, for example.

def publish( self, deployment_config: Optional[wallaroo.deployment_config.DeploymentConfig] = None, replaces: Optional[List[int]] = None):

Create a new version of a pipeline and publish it.

def publishes(self):
def list_edges(self):
def create_version(self) -> wallaroo.pipeline_version.PipelineVersion:

Creates a new PipelineVersion and stores it in the database.

class Pipelines(typing.List[wallaroo.pipeline.Pipeline]):

Wraps a list of pipelines for display in a display-aware environment like Jupyter.

Inherited Members
builtins.list
list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort
class Edge(wallaroo.wallaroo_ml_ops_api_client.models.edge.Edge):

The Edge

Attributes: cpus (float): Number of CPUs id (str): ID memory (str): Amount of memory (in k8s format) name (str): User-given name tags (List[str]): Edge tags created_on_version (Union[Unset, AppVersion]): should_run_publish (Union[None, Unset, int]): The pipeline publish ID this edge is supposed to run spiffe_id (Union[None, Unset, str]): Spiffe ID

class EdgesList(typing.List[wallaroo.pipeline.Edge]):

Built-in mutable sequence.

If no argument is given, the constructor creates a new empty list. The argument must be an iterable if specified.

Inherited Members
builtins.list
list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort