class Pipeline(wallaroo.object.Object)
A pipeline is an execution context for models.
Pipelines contain Steps, which are often Models.
Pipelines can be deployed or un-deployed.
Pipeline(client: Optional[wallaroo.client.Client], data: Dict[str, Any])
Base constructor.
Each object requires:
- a GraphQL client - in order to fill its missing members dynamically
- an initial data blob - typically from deserialized JSON, contains at
  least the data for required members (typically the object's primary key)
  and optionally other data members.
def
builder(self) -> wallaroo.pipeline_config.PipelineConfigBuilder:
def
owner_id(*args, **kwargs):
def
create_time(*args, **kwargs):
def
last_update_time(*args, **kwargs):
def
name(*args, **kwargs):
def
variants(*args, **kwargs):
def
get_pipeline_configuration(self, version: Optional[str] = None) -> Dict[str, Any]:
Get a pipeline configuration for a specific version.
Parameters
- version: str Version of the pipeline.
:return Dict[str, Any] Pipeline configuration.
def
logs( self, limit: Optional[int] = None, start_datetime: Optional[datetime.datetime] = None, end_datetime: Optional[datetime.datetime] = None, valid: Optional[bool] = None, dataset: Optional[List[str]] = None, dataset_exclude: Optional[List[str]] = None, dataset_separator: Optional[str] = None, arrow: Optional[bool] = False) -> Union[wallaroo.logs.LogEntries, pyarrow.lib.Table, pandas.core.frame.DataFrame]:
Get inference logs for this pipeline.
Parameters
- limit: Optional[int]: Maximum number of logs to return.
- start_datetime: Optional[datetime.datetime]: Start time for logs.
- end_datetime: Optional[datetime.datetime]: End time for logs.
- valid: Optional[bool]: If set to False, will include logs for failed inferences
- dataset: Optional[List[str]] By default this is set to ["*"], which returns
["time", "in", "out", "check_failures"]. Other available options: ["metadata"]
- dataset_exclude: Optional[List[str]] If set, allows user to exclude parts of dataset.
- dataset_separator: Optional[Union[Sequence[str], str]] If set to ".", return dataset will be flattened.
- arrow: Optional[bool] If set to True, return logs as an Arrow Table. Else, returns Pandas DataFrame.
Returns
Union[LogEntries, pa.Table, pd.DataFrame]
def
export_logs( self, directory: Optional[str] = None, file_prefix: Optional[str] = None, data_size_limit: Optional[str] = None, limit: Optional[int] = None, start_datetime: Optional[datetime.datetime] = None, end_datetime: Optional[datetime.datetime] = None, valid: Optional[bool] = None, dataset: Optional[List[str]] = None, dataset_exclude: Optional[List[str]] = None, dataset_separator: Optional[str] = None, arrow: Optional[bool] = False) -> None:
Export logs to a user provided local file.
Parameters
- directory: Optional[str] Logs will be exported to a file in the given directory.
By default, logs will be exported to a new "logs" subdirectory in the current working directory.
- file_prefix: Optional[str] Prefix to name the exported file. By default, the file_prefix will be set to
the pipeline name.
- data_size_limit: Optional[str] The maximum size of the exported data in bytes.
Size includes all files within the provided directory. By default, the data_size_limit will be set to 100MB.
- limit: Optional[int] The maximum number of logs to return.
- start_datetime: Optional[datetime.datetime] The start time to filter logs by.
- end_datetime: Optional[datetime.datetime] The end time to filter logs by.
- valid: Optional[bool] If set to False, will return logs for failed inferences.
- dataset: Optional[List[str]] By default this is set to ["*"], which returns
["time", "in", "out", "check_failures"]. Other available options: ["metadata"]
- dataset_exclude: Optional[List[str]] If set, allows user to exclude parts of dataset.
- dataset_separator: Optional[Union[Sequence[str], str]] If set to ".", return dataset will be flattened.
- arrow: Optional[bool] If set to True, return logs as an Arrow Table. Else, returns Pandas DataFrame.
:return None
def
logs_shadow_deploy(self):
def
url(self) -> str:
Returns the inference URL for this pipeline.
def
deploy( self, pipeline_name: Optional[str] = None, deployment_config: Optional[wallaroo.deployment_config.DeploymentConfig] = None) -> wallaroo.pipeline.Pipeline:
Deploy pipeline. `pipeline_name` is optional if deploy was called
previously. When specified, `pipeline_name` must contain only ASCII
alpha-numeric characters, plus dash (-).
def
definition(self) -> str:
Get the current definition of the pipeline as a string
def
get_topic_name(self) -> str:
def
undeploy(self) -> wallaroo.pipeline.Pipeline:
def
infer(self, *args, **kwargs):
Returns an inference result on this deployment, given a tensor.
Parameters
- tensor: Union[Dict[str, Any], pd.DataFrame, pa.Table] Inference data. Should be a dictionary.
Future improvement: will be a pandas dataframe or arrow table
- timeout: Optional[Union[int, float]] infer requests will time out after
the provided number of seconds is exceeded. timeout defaults
to 15 secs.
- dataset: Optional[List[str]] By default this is set to ["*"], which returns
["time", "in", "out", "check_failures"]. Other available options: ["metadata"]
- dataset_exclude: Optional[List[str]] If set, allows user to exclude parts of dataset.
- dataset_separator: Optional[Union[Sequence[str], str]] If set to ".", return dataset will be flattened.
Returns
InferenceResult in dictionary, dataframe or arrow format.
def
infer_from_file(self, *args, **kwargs):
Returns an inference result on this deployment, given tensors in a file.
def
status(self) -> Dict[str, Any]:
def
steps(self) -> List[wallaroo.pipeline_config.Step]:
Returns a list of the steps of a pipeline. Not exactly a shim
def
model_configs(self) -> List[wallaroo.model_config.ModelConfig]:
Returns a list of the model configs of a pipeline. Not exactly a shim
def
remove_step(self, index: int) -> wallaroo.pipeline.Pipeline:
Remove a step at a given index
def
add_model_step(self, model: wallaroo.model.Model) -> wallaroo.pipeline.Pipeline:
Perform inference with a single model.
def
replace_with_model_step( self, index: int, model: wallaroo.model.Model) -> wallaroo.pipeline.Pipeline:
Replaces the step at the given index with a model step
def
add_multi_model_step( self, models: Iterable[wallaroo.model.Model]) -> wallaroo.pipeline.Pipeline:
Perform inference on the same input data for any number of models.
def
replace_with_multi_model_step( self, index: int, models: Iterable[wallaroo.model.Model]) -> wallaroo.pipeline.Pipeline:
Replaces the step at the index with a multi model step
def
add_audit(self, slice) -> wallaroo.pipeline.Pipeline:
Run audit logging on a specified slice
of model outputs.
The slice must be in Python-like format: `start:`, `start:end`, and
`:end` are supported.
def
replace_with_audit(self, index: int, audit_slice: str) -> wallaroo.pipeline.Pipeline:
Replaces the step at the index with an audit step
def
add_select(self, index: int) -> wallaroo.pipeline.Pipeline:
Select only the model output with the given index
from an array of
outputs.
def
replace_with_select(self, step_index: int, select_index: int) -> wallaroo.pipeline.Pipeline:
Replaces the step at the index with a select step
def
add_key_split( self, default: wallaroo.model.Model, meta_key: str, options: Dict[str, wallaroo.model.Model]) -> wallaroo.pipeline.Pipeline:
Split traffic based on the value at a given meta_key
in the input data,
routing to the appropriate model.
If the resulting value is a key in `options`, the corresponding model is
used. Otherwise, the `default` model is used for inference.
def
replace_with_key_split( self, index: int, default: wallaroo.model.Model, meta_key: str, options: Dict[str, wallaroo.model.Model]) -> wallaroo.pipeline.Pipeline:
Replace the step at the index with a key split step
def
add_random_split( self, weighted: Iterable[Tuple[float, wallaroo.model.Model]], hash_key: Optional[str] = None) -> wallaroo.pipeline.Pipeline:
Routes inputs to a single model, randomly chosen from the list of
`weighted` options.
Each model receives inputs that are approximately proportional to the
weight it is assigned. For example, with two models having weights 1
and 1, each will receive roughly equal amounts of inference inputs. If
the weights were changed to 1 and 2, the models would receive roughly
33% and 66% respectively instead.
When choosing the model to use, a random number between 0.0 and 1.0 is
generated. The weighted inputs are mapped to that range, and the random
input is then used to select the model to use. For example, for the
two-models equal-weight case, a random key of 0.4 would route to the
first model. 0.6 would route to the second.
To support consistent assignment to a model, a `hash_key` can be
specified. The value at this key must be between 0.0 and 1.0; when
present in the input data, it will be used instead of a random number
for model selection.
def
replace_with_random_split( self, index: int, weighted: Iterable[Tuple[float, wallaroo.model.Model]], hash_key: Optional[str] = None) -> wallaroo.pipeline.Pipeline:
Replace the step at the index with a random split step
def
add_shadow_deploy( self, champion: wallaroo.model.Model, challengers: Iterable[wallaroo.model.Model]) -> wallaroo.pipeline.Pipeline:
Create a "shadow deployment" experiment pipeline. The `champion` model
and all `challengers` are run for each input. The result data for all
models is logged, but the output of the `champion` is the only result
returned.
This is particularly useful for "burn-in" testing a new model with real
world data without displacing the currently proven model.
This is currently implemented as three steps: A multi model step, an audit step, and
a select step. To remove or replace this step, you need to remove or replace
all three. You can remove steps using pipeline.remove_step
def
replace_with_shadow_deploy( self, index: int, champion: wallaroo.model.Model, challengers: Iterable[wallaroo.model.Model]) -> wallaroo.pipeline.Pipeline:
Replace a given step with a shadow deployment
def
add_validation( self, name: str, validation: wallaroo.checks.Expression) -> wallaroo.pipeline.Pipeline:
Add a `validation` with the given `name`. All validations are run on
all outputs, and all failures are logged.
def
add_alert( self, name: str, alert: wallaroo.checks.Alert, notifications: List[wallaroo.notify.Notification]) -> wallaroo.pipeline.Pipeline:
def
replace_with_alert( self, index: int, name: str, alert: wallaroo.checks.Alert, notifications: List[wallaroo.notify.Notification]) -> wallaroo.pipeline.Pipeline:
Replace the step at the given index with the specified alert
def
clear(self) -> wallaroo.pipeline.Pipeline:
Remove all steps from the pipeline. This might be desirable if replacing models, for example.
def
list_explainability_configs(self) -> List[wallaroo.explainability.ExplainabilityConfig]:
List the explainability configs we've created.
def
get_explainability_config( self, expr: Union[str, wallaroo.explainability.ExplainabilityConfig]) -> wallaroo.explainability.ExplainabilityConfig:
Get the details of an explainability config.
def
create_explainability_config(self, feature_names: Sequence[str], num_points=10):
Create a shap config to be used later for reference and adhoc requests.